黑马点评

yin_bo_ Lv2

1. 内容概述

  • 短信登录
    • 这部分会使用Redis共享session来实现
    • 用Redis替换session来存储邮箱验证码
  • 商户查询缓存
    • 这部分要理解缓存击穿,缓存穿透,缓存雪崩等问题,对于这些概念的理解不仅仅是停留在概念上,更是能在代码中看到对应的内容
  • 优惠券秒杀
    • 这部分可以学会Redis的计数器功能,完成高性能的Redis操作,同时学会Redis分布式锁的原理,包括Redis的三种消息队列
  • 附近的商户
    • 利用Redis的GEOHash(新数据结构,前面没有应用场景就没介绍)来完成对于地理坐标的操作
  • UV统计
    • 主要是使用Redis来完成统计功能
  • 用户签到
    • 使用Redis的BitMap数据统计功能
  • 好友关注
    • 基于Set集合的关注、取消关注,共同关注等等功能,这部分在上篇的练习题中出现过,这次我们在项目中来使用一下
  • 达人探店
    • 基于List来完成点赞列表的操作,同时基于SortedSet来完成点赞的排行榜功能

2. 导入项目

  • 在实现功能之前,我们先来导入项目,让项目跑起来
    • 导入SQL表
说明
tb_user 用户表
tb_user_info 用户详情表
tb_shop 商户信息表
tb_shop_type 商户类型表
tb_blog 用户日记表(达人探店日记)
tb_follow 用户关注表
tb_voucher 优惠券表
tb_voucher_order 优惠券的订单表
  • 有关当前模型
    • 该项目采用的是前后端分离开发模式
    • 手机或者app端发起请求 请求我们的Nginx服务器 Nginx基于七层模型走的是HTTP协议 可以实现基于Lua直接绕开Tomcat访问Redis 也可以作为静态资源服务器 轻松抗下上万并发 负载均衡到下游Tomcat服务器 打散流量
    • 我们都知道一台4核8G的Tomcat在优化和处理简单业务的加持下 大不了就处理1000左右的并发 经过Nginx的负载均衡分馏后 利用集群支撑起整个项目 同时Nginx在部署了前端项目后 更是可以做到动静分离 进一步降低Tomcat服务器的压力 这些功能都靠Nginx起作用 所以Ngix是项目重要的一环
    • 在Tomcat支撑起并发流量后 我们如果让Tomcat直接去访问Mysql 根据经验Mysql企业级服务器只要上点并发 一般是16或32核心cpu 32或64G内存 向企业级mysql加上固态硬盘能够支撑的并发 大概就是4000~7000左右 上万并发 瞬间就会让Mysql服务器的cpu 硬盘全部打满 容易崩溃 所以我们在高并发场景下 会选择使用mysql集群 同时为了进一步降低Mysql的压力 同时增加访问的性能 我们也会加入Redis 同时使用Redis集群使得Redis对外提供更好的服务
  • 导入后端项目

    • 黑马已经提供好了后端项目源码压缩包,我们将其解压之后,放到自己的workspace里
    • 然后修改MySQL和Reids的连接要素为自己的,随后启动项目
    • 访问http://localhost:8081/shop-type/list ,如果可以看到JSON数据,则说明导入成功
  • 导入前端工程

  • 黑马已经提供好了前端项目源码压缩包,我们将其解压之后,放到自己的workSpace里

  • 然后在nginx所在目录打开一个cmd窗口,输入命令,即可启动项目
    start nginx.exe

  • 访问http://localhost:8080/ ,打开开发者模式,可以看到页面


3. 短信登录

  • Session的工作流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // 第一次:存储数据
    session.setAttribute("code", "123456");
    // 实际上:服务器内部 Map<SessionID, Map<String, Object>>
    // 类似于:Map<"ABC123", {"code": "123456"}>

    // 第二次:获取数据
    String code = (String) session.getAttribute("code");
    // 实际上:通过Session ID "ABC123" 找到对应的Map,然后取"code"的值

    // 验证成功,清除Session中的验证码
    session.removeAttribute("code");

  • 发送验证码
    用户在提交手机号后,会校验手机号是否合法,如果不合法,则要求用户重新输入手机号
    如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存,然后再通过短信的方式将验证码发送给用户

  • 短信验证码登录、注册
    用户将验证码和手机号进行输入,后台从session中拿到当前验证码,然后和用户输入的验证码进行校验,如果不一致,则无法通过校验,如果一致,则后台根据手机号查询用户,如果用户不存在,则为用户创建账号信息,保存到数据库,无论是否存在,都会将用户信息保存到session中,方便后续获得当前登录信息

  • 校验登录状态
    用户在请求的时候,会从cookie中携带JsessionId到后台,后台通过JsessionId从session中拿到用户信息,如果没有session信息,则进行拦截,如果有session信息,则将用户信息保存到threadLocal中,并放行


3.1 Session实现发送验证码功能

  • 发送验证码
    用户在提交手机号后,会校验手机号是否合法,如果不合法,则要求用户重新输入手机号
    如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存,然后再通过短信的方式将验证码发送给用户

  • 输入手机号 点击发送验证码按钮 查看发送的请求

    这里可以看到

    请求网址: http://localhost:8080/api/user/code?phone=15793458583
    请求方法: POST

  • 看样子是调用了UserController中的code方法 携带参数是phone

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * 发送手机验证码
    */
    @PostMapping("code")
    public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
    // TODO 发送短信验证码并保存验证码
    return Result.fail("功能未完成");
    }

  • 先在IUserService接口中声明方法

    先在userServiceImpl实现类中实现方法

    最后在userController控制器中调用这个方法来完成发送验证码需求

  • userServiceImpl实现类中写sendCode方法

    Result是自己定义的类 实现的是返回的信息是否成功

    RegexUtils类和RegexPatterns类中已经封装好了手机号正则表达式工具

    如果手机号符合正则表达式 则使用RandomUtil类中的随机生成数组方法来生成乱序验证码

    有了验证码之后使用setAttribute方法来将验证码保存到session

    这里简化 使用SLF4J直接向命令台发送验证码 之后项目上线了再使用发送验证码的形式

    最后使sendCode方法向前端返回OK

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Service
    @Slf4j
    publicclassUserServiceImplextendsServiceImpl<UserMapper, User>implementsIUserService{

    @Override
    //这里的Result是自己定义的类publicResult sendCode(String phone,HttpSessionsession) {
    //1. 校验手机号//这里黑马已经写好了手机号正则表达式工具//RegexPatterns RegexUtilsif(RegexUtils.isPhoneInvalid(phone)){
    //2. 如果不符合 返回错误信息returnResult.fail("手机号格式错误");
    }
    //3. 符合 生成验证码//这里直接用RandomUtil里的随机生成数字方法String code = RandomUtil.randomNumbers(6);

    //4. 将验证码保存到session//使用setAttribute方法来将验证码保存到sessionsession.setAttribute("code",code);

    //5. 发送验证码//这里是模拟 实际上线需要替换log.debug("发送短信验证码成功,验证码:{}",code);

    //返回OKreturnResult.ok();
    }
    }

    1
    2
    3
    4
    5
    6
    publicinterfaceIUserServiceextendsIService<User> {

    Result sendCode(String phone,HttpSessionsession);

    }

    1
    2
    3
    4
    5
    6
    @PostMapping("code")
    publicResult sendCode(@RequestParam("phone") String phone,HttpSessionsession)throwsMessagingException {
    // TODO 发送短信验证码并保存验证码//return Result.fail("功能未完成");returnuserService.sendCode(phone,session);

    }

    由此可知 实现项目功能的正确顺序为

    在 Service 接口中声明方法 → 在 Service 实现类中实现方法在 → Controller 中调用


3.2 Session实现验证码登录功能

  • 短信验证码登录、注册
    用户将验证码和手机号进行输入,后台从session中拿到当前验证码,然后和用户输入的验证码进行校验,如果不一致,则无法通过校验,如果一致,则后台根据手机号查询用户,如果用户不存在,则为用户创建账号信息,保存到数据库,无论是否存在,都会将用户信息保存到session中,方便后续获得当前登录信息
  • 输入手机号和验证码 点击登录的按钮 查看发送的请求

可以看到请求的URL是user/login 并且请求参数是JSON形式

由此推断出调用了userControllerlogin方法

  • 由图 前端提交的是json形式 所以需要使用@RequestBody
    • @RequestBody是Sping MVC中的注解 作用是将HTTP请求的 JSON/XML正文转换为Java对象
  • 如下是LoginFromDTO的代码 由此可知将JSON文件转换成立phone code password
1
2
3
4
5
6
7
@Data
publicclassLoginFormDTO {
privateString phone;
privateString code;
privateString password; //可以密码登录
}

  • 接下来依旧在 Service 接口中声明方法 → 在 Service 实现类中实现方法在 → Controller 中调用

    • 在Service接口中声明 需要的参数为 从JSON转化为Java代码的登录信息 还有Session

      1
      2
      3
      4
      5
      6
      7
      publicinterfaceIUserServiceextendsIService<User> {

      Result sendCode(String phone,HttpSessionsession);

      Result login(LoginFormDTO loginForm,HttpSessionsession);
      }

    • 在 Service 实现类中实现方法 首先先将思路写出 如图

      对数据库增删改查用的是MyBatisPlus 可以能简单的实现CRUD

      本类中继承了ServiceImpl类 这个类是MyBatisPlus提供的 可以实现对单表的CRUD

      query()就相当于SQL中的select * from tb_user where

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      @Override
      publicResult login(LoginFormDTO loginForm,HttpSessionsession) {
      //1. 校验手机号String phone = loginForm.getPhone();
      if(RegexUtils.isPhoneInvalid(phone)){
      returnResult.fail("手机号格式错误");
      }
      //2. 校验验证码// 从session中取出验证码Object cacheCode = session.getAttribute("code");
      String code = loginForm.getCode();

      //3. 不一致 直接报错if(cacheCode ==null){
      returnResult.fail("验证码已过期");
      }
      //这里要使用toString方法把cacheCode变成字符串 要不然可能出现空指针
      elseif(!code.equals(cacheCode.toString())){
      returnResult.fail("验证码错误");
      }
      session.removeAttribute("code");

      //4. 如果一致 根据手机号查询用户//使用的是mybatisplus 所以很简单就可以实现查询//本类中继承了ServiceImpl父类 这个类是mybatisplus提供的//这个类可以实现对于单表的增删改查//使用方法是表明实体类和Mapper是什么//query()就等同于SQL里的 select * from tb_user whereUser user = query().eq("phone", phone).one();
      //5. 判断用户是否存在if(user ==null){
      //6. 不存在 创建新用户 保存用户到数据库user = createUserWithPhone(phone);
      }
      //7. 存在//8. 保存用户到sessionsession.setAttribute("user",user);
      //session的原理是cookie 所以不需要返回登陆凭证returnResult.ok();
      }

      privateUser createUserWithPhone(String phone) {
      //创建新的用户User user =newUser();
      user.setPhone(phone);
      user.setNickName(USER_NICK_NAME_PREFIX+ RandomUtil.randomString(7));
      //使用mybatisplus保存用户到数据库save(user);
      returnuser;
      }

  • 最后在Controller层调用login方法

    1
    2
    3
    4
    5
    @PostMapping("/login")
    publicResult login(@RequestBody LoginFormDTO loginForm,HttpSessionsession){
    // 实现登录功能returnuserService.login(loginForm,session);
    }

这里黑马写出了一个漏洞:

如果我用手机号A申请了验证码 然后将手机号A修改成手机号B 再用验证码还可以登录

解决方案:

首先在发送验证码功能里使用Session里的setAttribute来获取申请验证码的手机号

然后在登录功能里通过校验Session里的手机号和前端发送的手机号进行比对 来避免这个漏洞


3.3 Session实现登录校验拦截功能

  • 用拦截器对用户信息进行校验

    • 拦截器概念

    在学习拦截器的概念之前,我们先看一张图

    alt 拦截器
    alt 拦截器

    • 浏览器发送一个请求,会先到Tomcat服务器的web服务器
    • Tomcat服务器接收到请求后,会先去判断请求的是静态资源还是动态资源
    • 如果是静态资源,会直接到Tomcat的项目部署目录下直接访问
    • 如果是动态资源,就需要交给项目的后台代码进行处理
    • 在找到具体的方法之前,我们可以去配置过滤器(可以配置多个),按照顺序进行执行(在这里就可以进行权限校验)
    • 然后进入到中央处理器(SpringMVC中的内容),SpringMVC会根据配置的规则进行拦截
    • 如果满足规则,则进行处理,找到其对应的Controller类中的方法进行执行,完成后返回结果
    • 如果不满足规则,则不进行处理
    • 这个时候,如果我们需要在每个Controller方法执行的前后添加业务,具体该如何来实现?
      1. 这个就是拦截器要做的事
    • 拦截器(Interceptor)是一种动态拦截方法调用的机制,在SpringMVC中动态拦截控制器方法的执行
      • 作用:
        • 在指定的方法调用前后执行预先设定的代码
        • 阻止原始方法的执行
      • 总结:拦截器就是用来作增强
    • 但是这个拦截器貌似跟我们之前学的过滤器很像啊,不管是从作用上来看还是从执行顺序上来看
      • 那么拦截器和过滤器之间的区别是什么呢?
        • 归属不同:Filter属于Servlet技术,而Interceptor属于SpringMVC技术
        • 拦截内容不同:Filter对所有访问进行增强,Interceptor仅对SpringMVC的访问进行增强
    • 过滤器和拦截器的区别补充
      • 执行顺序:Filter > Interceptor > Controller
      • 依赖关系:Filter不依赖Spring,Interceptor是Spring框架的一部分
      • 使用场景:Filter适合做全局处理(编码、安全),Interceptor适合做业务相关处理(权限、日志)
  • 先写出拦截器功能

    创建一个LoginInterceptor类 完成HandlerInterceptor接口 重写其中的两个方法 前置拦截器和完成处理方法 前置拦截器主要用于我们登录之前的权限校验 完成处理方法是用于处理登录后的信息 避免内存泄漏

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class LoginInterceptor implements HandlerInterceptor {
    //这里按ctrl+i就可以自动构建这两个方法
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    //1. 获取session
    HttpSession session = request.getSession();
    //2. 获取session中的用户信息
    User user = (User) session.getAttribute("user");
    //3. 判断用户是否存在
    if (user == null) {
    //4. 不存在,则拦截
    response.setStatus(401);
    return false;
    }
    //5. 存在,保存用户信息到ThreadLocal,UserHolder是提供好了的工具类
    UserHolder.saveUser(user);
    //6. 放行
    return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
    UserHolder.removeUser();
    }
    }

    • 这里将user保存到ThreadLocal 使用UserHolder里的方法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      public class UserHolder {
      private static final ThreadLocal<User> tl = new ThreadLocal<>();

      public static void saveUser(User user){
      tl.set(user);
      }

      public static User getUser(){
      return tl.get();
      }

      public static void removeUser(){
      tl.remove();
      }
      }

    • 再将拦截器配置到MVC当中

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      @Configuration
      public class MvcConfig implements WebMvcConfigurer {
      @Override
      public void addInterceptors(InterceptorRegistry registry) {
      registry.addInterceptor(new LoginInterceptor())
      .excludePathPatterns(
      "/user/code",
      "/user/login",
      "/blog/hot",
      "/shop/**",
      "/shop-type/**",
      "/upload/**",
      "/voucher/**"
      );
      }
      }

    • 最后写一下me方法

      1
      2
      3
      4
      5
      6
      7
      @GetMapping("/me")
      public Result me() {
      //这里通过UserHolder的getUser方法将user从ThreadLocal中拉出来
      User user = UserHolder.getUser();
      return Result.ok(user);
      }

  • 隐藏用户敏感信息

我们登录的的时候 前端的请求报文会显示用户在整个数据库中的信息

这样会泄露用户的敏感信息 所以我们应当在返回用户信息前 将用户的敏感信息进行隐藏 采用的核心思路是书写一个UserDto对象 这个UserDto对象就没有敏感信息了 在返回前 将有用户敏感信息的User对象转化为没有敏感信息的UserDto对象 就可以避免这个问题

  • DTO是什么

    • DTO(Data Transfer Object) 即数据传输对象 是一种设计模式 用于在不同层或不同系统之间传输数据

    DTO 的核心价值:

    • 🛡️ 安全性 - 隐藏敏感字段
    • 🎯 针对性 - 不同场景使用不同DTO
    • 🔧 灵活性 - 数据格式可定制
    • 📦 解耦合 - 各层数据独立,避免相互影响
  • UserDto类如下 将User对象中的属性拷贝给UserDto 就可以避免暴露用户的信息

    • lombok是一个Java库 通过注释自动生成代码 不需要开发射手动编写getter setter equals hashCode 构造函数等样板代码

      @Data 是Lombok中一个常用的注解

      作用是所有字段的getter/setter + equals + hashCode + toString

    1
    2
    3
    4
    5
    6
    7
    8
    importlombok.Data;
    @Data
    publicclassUserDTO {
    privateLong id;
    privateString nickName;
    privateString icon;
    }

  • 修改UserHolder 将其User类型全都改为UserDto类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    publicclassUserHolder {
    privatestatic finalThreadLocal<UserDTO>tl=newThreadLocal<>();

    publicstaticvoidsaveUser(UserDTO user){
    tl.set(user);
    }

    publicstaticUserDTO getUser(){
    returntl.get();
    }

    publicstaticvoidremoveUser(){
    tl.remove();
    }
    }

    这里黑马有多处调用了UserHolder类中的getUser方法 他们的类型都是User 突然改用 UserDto会报错所以需要多次修改 一个一个看太麻烦了

    对着getUser方法按左键 点查找方法就能看见所有调用这个方法的地方 然后依次修改就行

  • 修改login方法

    1
    2
    3
    4
    5
    6
            //7. 保存用户信息到session中
    - session.setAttribute("user", user);
    + UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
    + session.setAttribute("user", userDTO);
    return Result.ok();

    BeamUtilhutool库里的一个类 里面有一个copyProperties(复制属性)方法 可以自动将属性拷贝成另外一个类型 并且自动创建对象

  • 修改拦截器

    1
    2
    3
    4
    //1. 获取sessionHttpSessionsession = request.getSession();
    //2. 获取session中的用户信息UserDTO user = (UserDTO) session.getAttribute("user");
    //3. 判断用户是否存在

  • 修改Controller层的me方法

    1
    2
    3
    4
    5
    6
    @GetMapping("/me")
    publicResult me() {
    // TODO 获取当前登录的用户并返回UserDTO user = UserHolder.getUser();
    returnResult.ok(user);
    }

  • 总结

  • 将数据库中的User转换成了UserDTO类型 —>

    • 将UserDTO存储在服务器的Session中 —>
    • Session为参数自动生成了JSESSIONID —>
    • 通过Set-Cookie头告诉浏览器保存JSESSIONID —>
    • 服务器使用JESSIONID找到对应的数据(拦截器在这个过程进行校验) —>
    • 往客户端页面里显示User数据

    暂时无法在飞书文档外展示此内容


3.4 Session共享问题

  • 每个tomcat中都有一份属于自己的session 假设用户第一次访问第一台tomcat 并且把自己的信息存放到第一台服务器的session中

    但是第二次这个用户访问了第二台tomcat 那么在第二台机器上 肯定没有第一台服务器存放的session 所以此时整个登录拦截功能就会出现问题

    • 我们能如果解决这个问题呢?

      早期的方案是session拷贝 就是说虽然每个tomcat上都会有不同的个session 但是每当任意一台服务器的session修改时 都会同步给其他的tomcat服务器的session 这样就可以实现session的共享

    • 但session共享有两个大问题

      • 每台服务器都有一份完整的session数据 服务器压力过大
      • session拷贝数据时 可能会出现延迟

所以后面都是基于Redis来完成 我们把session换成Redis Redis数据本身就是共享的 就可以避免session共享的问题了


3.5 Redis代替session的业务流程

  • 设计key结构
    • 首先要知道用什么数据结构来存储数据
    • 由于存入的数据比较简单 我们可以使用String或者Hash
      • 如果使用String 以JSON字符串来保存数据 会额外占用部分空间
      • 如果使用Hash 则它的value中只会存储数据本身
    • 如果不在意内存 直接使用String就好
  • 设计key的细节
    • 这里就采用K-V键值对方式

    • 但是对于key的处理 不能像session一样用phone或code来当key

      因为Redis的key是共享的 code可能会重复 phone这种敏感字段也不适合存储到Redis中

    • 在设计key的时候 我们需要满足两点

      • key要有唯一性
      • key要方便携带

所以我们在后台随机生成一个token 然后让前端带着这个token就能完成我们的业务逻辑

  • 整体访问流程
    • 当注册完成后 用户去登录 然后校验用户提交的手机号/邮箱和验证码是否一致
      • 如果一致 则根据手机号查询用户信息 不存在则新建 最后将用户数据保存到Redis 并生成一个token作为Redis的key
    • 当我们校验用户是否登录时 回去携带着token进行访问 从Redis中获取token对应的value 判断是否存在这个数据
      • 如果不存在 则拦截
      • 如果存在 则将其用户信息(userDto)保存到ThreadLocal 并放行

3.6 基于Redis实现短信登录

  1. UserServiceImpl中注入StringRedisTemplate

    1
    2
    3
    @Resource
    privateStringRedisTemplate stringRedisTemplate;

  2. 修改sendCode方法

    1. 这里的key使用login:code:手机号 防止与其他业务的key冲突
    2. 验证码需要有效期 防止冗余数据堆叠在Redis中
    3. key需要加上业务名称 避免与其他业务的key冲突 例如login:code:phone
    4. 这些可以定义成常量类 让代码更专业
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    publicclassRedisConstants {
    publicstaticfinalStringLOGIN_CODE_KEY= "login:code:";
    publicstaticfinalLongLOGIN_CODE_TTL= 2L;
    publicstaticfinalStringLOGIN_USER_KEY= "login:token:";
    publicstaticfinalLongLOGIN_USER_TTL= 36000L;

    publicstaticfinalLongCACHE_NULL_TTL= 2L;

    publicstaticfinalLongCACHE_SHOP_TTL= 30L;
    publicstaticfinalStringCACHE_SHOP_KEY= "cache:shop:";

    publicstaticfinalStringLOCK_SHOP_KEY= "lock:shop:";
    publicstaticfinalLongLOCK_SHOP_TTL= 10L;

    publicstaticfinalStringSECKILL_STOCK_KEY= "seckill:stock:";
    publicstaticfinalStringBLOG_LIKED_KEY= "blog:liked:";
    publicstaticfinalStringFEED_KEY= "feed:";
    publicstaticfinalStringSHOP_GEO_KEY= "shop:geo:";
    publicstaticfinalStringUSER_SIGN_KEY= "sign:";
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    //实现发送验证码功能
    @Override
    //这里的Result是自己定义的类
    public Result sendCode(String phone, HttpSession session) {
    //1. 校验手机号
    //这里黑马已经写好了手机号正则表达式工具
    //RegexPatterns RegexUtils
    if(RegexUtils.isPhoneInvalid(phone)){
    //2. 如果不符合 返回错误信息
    return Result.fail("手机号格式错误");
    }
    //3. 符合 生成验证码
    //这里直接用RandomUtil里的随机生成数字方法
    String code = RandomUtil.randomNumbers(6);

    - //4. 将验证码保存到session
    - //使用setAttribute方法来将验证码保存到session
    - session.setAttribute("code",code);
    - session.setAttribute("phone",phone);
    + //4. 将验证码保存到Redis
    + //这里的key使用login:code:手机号 防止与其他业务的key冲突
    + //key需要有效期 设置成两分钟 2和login:code最好定义个常量类 看起来更专业
    + stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone,code,LOGIN_CODE_TTL,TimeUnit.MINUTES);

    //5. 发送验证码
    //这里是模拟 实际上线需要替换
    log.debug("发送短信验证码成功,验证码:{}",code);

    //返回OK
    return Result.ok();
    }

  3. 修改login方法

    1. 使用redis获取验证码就不需要校验phone是否前后一致了
    2. 存储用户信息到redis需要生成token作为key 这里使用UUID
    3. 因为需要将用户信息以HashMap的形式存入reidis 所以这里需要使用BeanUtil将UserDTO转化为UserMap
    4. 用户信息一直存到redis会造成redis臃肿 所以也需要给登录信息设置有效期 使用exprie方法
    5. 用户登录后将code删除
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
    //1. 校验手机号
    String phone = loginForm.getPhone();
    if(RegexUtils.isPhoneInvalid(phone)){
    return Result.fail("手机号格式错误");
    }
    - //2. 校验验证码
    - // 从session中取出验证码
    - Object cacheCode = session.getAttribute("code");
    - String code = loginForm.getCode();
    + //2. 校验验证码
    + // 从Redis获取验证码并校验
    + //注意这 opsForValue().get(LOGIN_CODE_KEY + phone); phone是发送验证码之后的phone 所以不需要二次验证
    + //如果phone前后不一致 cacheCode就不能获取数据 就是null
    + String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
    + String code = loginForm.getCode();

    //3. 不一致 直接报错
    if(cacheCode == null){
    return Result.fail("验证码已过期");
    }
    - //这里要使用toString方法把cacheCode变成字符串 要不然可能出现空指针
    + //这里cacheCode已经是String类型
    else if(!code.equals(cacheCode)){
    return Result.fail("验证码错误");
    }
    - session.removeAttribute("code");

    //实现发送验证码功能
    @Override
    //这里的Result是自己定义的类
    public Result sendCode(String phone, HttpSession session) {
    //1. 校验手机号
    //这里黑马已经写好了手机号正则表达式工具
    //RegexPatterns RegexUtils
    if(RegexUtils.isPhoneInvalid(phone)){
    //2. 如果不符合 返回错误信息
    return Result.fail("手机号格式错误");
    }
    //3. 符合 生成验证码
    //这里直接用RandomUtil里的随机生成数字方法
    String code = RandomUtil.randomNumbers(6);

    - //4. 将验证码保存到session
    - //使用setAttribute方法来将验证码保存到session
    - session.setAttribute("code",code);
    - session.setAttribute("phone",phone);
    + //4. 将验证码保存到Redis
    + //这里的key使用login:code:手机号 防止与其他业务的key冲突
    + //key需要有效期 设置成两分钟 2和login:code最好定义个常量类 看起来更专业
    + stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone,code,LOGIN_CODE_TTL,TimeUnit.MINUTES);

    //5. 发送验证码
    //这里是模拟 实际上线需要替换
    log.debug("发送短信验证码成功,验证码:{}",code);

    //返回OK
    return Result.ok();
    }

    修改login方法
    使用redis获取验证码就不需要校验phone是否前后一致了
    存储用户信息到redis需要生成token作为key 这里使用UUID
    因为需要将用户信息以HashMap的形式存入reidis 所以这里需要使用BeanUtil将UserDTO转化为UserMap
    用户信息一直存到redis会造成redis臃肿 所以也需要给登录信息设置有效期 使用exprie方法
    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
    //1. 校验手机号
    String phone = loginForm.getPhone();
    if(RegexUtils.isPhoneInvalid(phone)){
    return Result.fail("手机号格式错误");
    }
    - //2. 校验验证码
    - // 从session中取出验证码
    - Object cacheCode = session.getAttribute("code");
    - String code = loginForm.getCode();
    + //2. 校验验证码
    + // 从Redis获取验证码并校验
    + //注意这 opsForValue().get(LOGIN_CODE_KEY + phone); phone是发送验证码之后的phone 所以不需要二次验证
    + //如果phone前后不一致 cacheCode就不能获取数据 就是null
    + String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
    + String code = loginForm.getCode();

    //3. 不一致 直接报错
    if(cacheCode == null){
    return Result.fail("验证码已过期");
    }
    //这里要使用toString方法把cacheCode变成字符串 要不然可能出现空指针
    else if(!code.equals(cacheCode.toString())){
    return Result.fail("验证码错误");
    }
    - session.removeAttribute("code");

    //4. 如果一致
    + //可以将redis中的code删除了
    + stringRedisTemplate.delete(LOGIN_CODE_KEY + phone);

    根据手机号查询用户
    //使用的是mybatisplus 所以很简单就可以实现查询
    //本类中继承了ServiceImpl父类 这个类是mybatisplus提供的
    //这个类可以实现对于单表的增删改查
    //使用方法是表明实体类和Mapper是什么
    //query()就等同于SQL里的 select * from tb_user where
    User user = query().eq("phone", phone).one();
    //5. 判断用户是否存在
    if(user == null){
    //6. 不存在 创建新用户 保存用户到数据库
    user = createUserWithPhone(phone);
    }
    - //7. 存在
    - //8. 保存用户到session
    - session.setAttribute("user",user);
    - //session的原理是cookie 所以不需要返回登陆凭证

    + //7. 保存用户信息到redis中
    + //7.1 随机生成token 作为登录令牌 使用UUID作为token
    + String token = UUID.randomUUID().toString();

    + //7.2 将user对象转化为Hash存储
    + //将user改为userDTO 再将userDTO转化为userMap 都可以用BeanUtil方法
    + UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
    // 添加空值检查,避免存储null字符串
    if (userDTO.getIcon() != null) {
    userMap.put("icon", userDTO.getIcon());
    }
    userMap.put("id", String.valueOf(userDTO.getId()));
    if (userDTO.getNickName() != null) {
    userMap.put("nickName", userDTO.getNickName());
    }

    + //7.3 存储
    + stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token, userMap);
    + //用户信息一直存到redis会造成redis臃肿 所以也需要给登录信息设置有效期
    + //在存储的时候不能直接设置有效期 需要使用 expire方法
    + stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);

    // 8. 将token返回给客户端
    - return Result.ok();
    + return Result.ok(token);
    }

    private User createUserWithPhone(String phone) {
    //创建新的用户
    User user = new User();
    user.setPhone(phone);
    user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(7));
    //使用mybatisplus保存用户到数据库
    save(user);
    return user;
    }

    private User createUserWithPhone(String phone) {
    //创建新的用户
    User user = new User();
    user.setPhone(phone);
    user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(7));
    //使用mybatisplus保存用户到数据库
    save(user);
    return user;
    }
    1. 这里有一个问题 redis里的用户信息并不会随着用户的重新登录而刷新用户信息的有效期

      而session就没有这个问题

      我们需要在用户重新登录的时候刷新有效期 这里就需要在拦截器里修改

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class LoginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    //1. 获取session
    HttpSession session = request.getSession();
    //2. 获取session中的用户信息
    UserDTO user = (UserDTO) session.getAttribute("user");
    //3. 判断用户是否存在
    if (user == null) {
    //4. 不存在,则拦截
    response.setStatus(401);
    return false;
    }
    //5. 存在,保存用户信息到ThreadLocal,UserHolder是提供好了的工具类
    UserHolder.saveUser(user);
    //6. 放行
    return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
    UserHolder.removeUser();
    }
    }

拦截器NPE的心得

这里我运行服务端之后可以登录 但是返回主页面的时候报错了 报错的是空指针异常NPE

为什么? 返回主页面报错说明拦截出错了 命令台也显示是我的拦截器出现了NPE

我打开LoginInterceptor 出错的是第46行 如下

*Map*<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(***LOGIN_USER_KEY*** + token);

也没出啥错啊 我心想 我选择调试 调试之后发现token和redis里的也一样

但就是stringRedisTemplate传的是空值

我突然想起来 我的LoginInterceptor没有加@Component! 也就是说我的这个拦截器类并没有被Spring所管理 自然不能注入StringRedisTemplate

所以让拦截器类加上@Component 含需要在配置类中注入使用 注入LoginInterceptor 不再new出来

1
2
@Component  //加上注解

1
2
3
4
5
6
7
8
9
10
11
//这个类加了@Configuration  说明这个类由Spring构建  所以可以用@Resource注解
public class MvcConfig implements WebMvcConfigurer {

@Resource
private LoginInterceptor loginInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) {
//手动new的话 stringRedisTemplate还是null
registry.addInterceptor(new LoginInterceptor())
registry.addInterceptor(loginInterceptor)
  • 拦截的问题: 拦截器只会拦截指定的路径 不是拦截一切

如果一直访问的是没有被拦截的页面 token就不会被刷新 也就没有刷新有效期

解决方法

所以我们需要在拦截登录的基础上 加上一个新的拦截器

第一个拦截器主要是查询Redis用户 如果没有查询到 就不会刷新token有效期而直接放行

第二个拦截器做登录拦截 等登录后 下一次用户访问第一个拦截一切的拦截器后会刷新token有效期

新建一个RefreshTokenInterceptor拦截器 将之前LoginInterceptor拦截器的代码复制过去

并且删除拦截功能 直接放行 但是如果检测到用户存在就把他放到ThreadLocal当中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//对象是手动new出来的 不受Spring管理  没有@Component注解  所以不能使用@Resource来注入StringRedisTemplate
@Slf4j
@Component
public class RefreshTokenInterceptor implements HandlerInterceptor {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1. 从request获取请求头中的token
//前端代码中看到请求头叫authorization
String token = request.getHeader("authorization");

//使用StrUtil来判断token是否为空
//如果为空 交给LoginInterceptor处理
if (StrUtil.isBlank(token)) {
return true;
}

//2. 利用token从redis中获取用户信息
//使用entries获取的是HashMap
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);

//3. 判断用户是否存在
if (userMap.isEmpty()) {
//4. 不存在,交给LoginInterceptor处理
return true;
}

//5. 将查询到的Hash数据转为userDTO对象 这样才能存到ThreadLocal当中
// 这里使用BeanUtil的fillBeanWithMap方法 第三个参数是是否忽略错误 填false
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
UserHolder.saveUser(userDTO);

//6. 刷新token的有效期
stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.MINUTES);

//7. 放行
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}

修改LoginInterceptor拦截器 只留拦截功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//对象是手动new出来的 不受Spring管理  没有@Component注解  所以不能使用@Resource来注入StringRedisTemplate
@Slf4j
@Component
publicclassLoginInterceptorimplementsHandlerInterceptor{

@Resource
privateStringRedisTemplate stringRedisTemplate;

@Override
publicbooleanpreHandle(HttpServletRequestrequest,HttpServletResponseresponse, Object handler)throwsException {

//判断是否需要拦截(ThreadLocal中是否有用户)if(UserHolder.getUser() ==null){
response.setStatus(401);
returnfalse;
}
returntrue;
}

@Override
publicvoidafterCompletion(HttpServletRequestrequest,HttpServletResponseresponse, Object handler, Exception ex)throwsException {
UserHolder.removeUser();
}

}

修改拦截器配置文件 添加refreshTokenInterceptor拦截器

并使用order来将拦截器排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//这个类加了@Configuration  说明这个类由Spring构建  所以可以用@Resource注解
@Configuration
publicclassMvcConfigimplementsWebMvcConfigurer{

@Resource
privateLoginInterceptor loginInterceptor;
@Resource
privateRefreshTokenInterceptor refreshTokenInterceptor;
@Override
publicvoidaddInterceptors(InterceptorRegistry registry) {
//第一个拦截器 拦截所有请求 有用户数据刷新token有效期 没有用户数据不刷新直接放行//使用order来规定拦截器顺序registry.addInterceptor(refreshTokenInterceptor).order(0);

//第二个拦截器 除了以下路径如果没登录直接拦截registry.addInterceptor(loginInterceptor).order(1)
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
);

}
}


3.7 黑马没讲知识的补充

  • 登出功能
    很容易实现
    只需要在Controller层调用UserHolder类的removeUser方法来将用户信息从ThreadLocal去除就行

    • UserController的代码
    1
    2
    3
    4
    5
    @PostMapping("/logout")  
    public Result logout(){
    UserHolder.removeUser();
    return Result.ok();
    }

4. 商品查询缓存

4.1 什么是缓存?

  • 缓存(Cache)就是数据交换的缓冲区,是存储数据的临时地方,一般读写性能较高。

  • 在各个层面都有缓存 如图:

    alt 缓存
    alt 缓存

  • 缓存的作用:

    • 降低后端的负载。
    • 提高读写效率,降低响应时间。
  • 缓存的成本:

    • 需要保持数据一致性成本。
    • 代码维护成本。
    • 运维成本。

4.2 添加Redis缓存

  • 没有添加Redis缓存的时候
    • 客户端直接向数据库发送请求 数据库直接将数据返回给客户端
  • 添加了Redis缓存
    • 客户端优先向Redis发送请求 如果Redis中恰好有需要的数据 则直接返回给客户端
    • 如果没有 则向数据库发送请求 数据向Redis发送数据使其缓存 同时返回给客户端 这样下次相同请求可以直接由Redis完成

alt 缓存作用模型1
alt 缓存作用模型1

alt 缓存作用模型2
alt 缓存作用模型2

  • 根据id查询商铺缓存的流程
    alt 根据id查询商铺缓存
    alt 根据id查询商铺缓存

实现据id查询商铺缓存

  • 那么接下来 让我们尝试写一下根据id查询商铺缓存吧❤
    • 如果在Redis中查询到数据了 那一定是JSON结构的 所以获取和存储商铺数据类型用的是String
    • 依旧使用hutool库中的JSONUtil来转换shop的类型🤭
1
2
3
4
5
6
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
//getById是IService接口里的方法 是MyBatisPlus提供的接口
//所以这里直接向前端返回数据库里的数据
return shopService.queryById(id);
}
1
2
3
4
 public interface IShopService extends IService<Shop> {

Result queryById(Long id);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

@Resource
StringRedisTemplate stringRedisTemplate;

@Override
public Result queryById(Long id) {
//1. 从Redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
//2. 判断缓存是否命中
//3. 如果命中 也就是数据在Redis中存在
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//4. 如果Redis中不存在数据 则在MySQL中查询
Shop shop = getById(id);
//5. 如果在MySQL中也没查到 返回404
if (shop == null) {
return Result.fail("店铺不存在!");
}
//6. 如果在MySQL中查到了 存入Redis 使用String存的是JSON类型
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop));
//7. 返回商铺信息
return Result.ok(shop);
}
}
  • 做好根据id查询商铺缓存之后就会发现再次访问店铺加载时间缩短到了几ms 这就是缓存的力量

实现查询商铺列表缓存

  • 接下来趁热打铁😄
    • 咱们自己独立完成以下商铺列表的缓存
    • 整个功能都是我自己写的哦 没有看黑马🤭
    • 为什么我存到Redis中的列表类型用JSON而不用List🤔
      • JSON:1次序列化 + 1次网络IO
      • List:N次序列化 + N次网络IO(N为元素个数)
    • 使用的SONUtil.toList方法将JSON改为List的哦
1
2
3
4
5
6
7
8
9
10
@RequestMapping("/shop-type")
public class ShopTypeController {
@Resource
private IShopTypeService typeService;
@GetMapping("list")
public Result queryTypeList() {
return typeService.queryShopTypeByList();
}
}

1
2
3
4
5
public interface IShopTypeService extends IService<ShopType> {

Result queryShopTypeByList();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {

@Resource
private StringRedisTemplate stringRedisTemplate;


@Override
public Result queryShopTypeByList() {
//1. 从Redis中查询商铺类型
String shopTypeListJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_TYPE_KEY);

//2. 判断缓存是否命中
//3 如果命中 也就是数据在Redis中存在
if(StrUtil.isNotBlank(shopTypeListJson)){
//将JSON数据转化为List<ShopType>集合
List<ShopType> shopTypeList = JSONUtil.toList(shopTypeListJson, ShopType.class);
return Result.ok(shopTypeList);
}
//4. 如果Redis中不存在数据 则在MySQL中查询
//这里的sort是tb_shop_type中的排序列
//所以这段代码使用MyBatisPlus中的query方法按照sort列的顺序赋值给shopTypeList
List<ShopType> shopTypeList = query().orderByAsc("sort").list();
//5. 如果在MySQL中也没查到 返回404
if(shopTypeList == null){
return Result.fail("商铺列表不存在!");
}
//6. 如果在MySQL中查到了 将其存入Redis 使用String类型存入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_TYPE_KEY,JSONUtil.toJsonStr(shopTypeList));
//7. 返回商铺列表信息
return Result.ok(shopTypeList);
}

}

4.3 缓存更新策略

  • 缓存更新是Redis为了节约内存而设计出来的一个东西,主要是因为内存数据宝贵,当我们向Redis插入太多数据,此时就可能会导致缓存中数据过多,所以Redis会对部分数据进行更新,或者把淘汰更合适。🤔
  1. 内存淘汰:Redis自动进行,当Redis内存达到我们设定的max-memory时,会自动触发淘汰机制,淘汰掉一些不重要的数据(可以我们自己设置策略方式)。
  2. 超时剔除:当我们给Redis设置了过期时间TTL之后,Redis会将超时的数据进行删除,方便我们继续使用缓存。
  3. 主动更新:我们可以手动调用方法把缓存删除掉,通常用于解决缓存和数据库不一致问题。
内存淘汰 超时剔除 主动更新
说明 不用自己维护,利用Redis的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。 给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。 编写业务逻辑,在修改数据库的同时,更新缓存。
一致性 一般
维护成本
  • 业务场景
    • 低一致性需求:使用内存淘汰机制,例如店铺类型的查询缓存(因为这个很长一段时间都不需要更新)
    • 高一致性需求:主动更新,并以超时剔除作为兜底方案,例如店铺详情查询的缓存

数据库和缓存不一致解决方案:

  • 由于我们的缓存数据源来自数据库,而数据库的数据是会发生变化的,因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在,其后果是用户使用缓存中的过时数据,就会产生类似多线程数据安全问题,从而影响业务,产品口碑等。

    • 那么如何解决这个问题呢?有如下三种方式:
      • Cache Aside Pattern 人工编码方式:缓存调用者在更新完数据库之后再去更新缓存,也称之为双写方案。

      • Read/Write Through Pattern:缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题。但是维护这样一个服务很复杂,市面上也不容易找到这样的一个现成的服务,开发成本高。

      • Write Behind Caching Pattern:调用者只操作缓存,其他线程去异步处理数据库,最终实现一致性。但是维护这样的一个异步的任务很复杂,需要实时监控缓存中的数据更新,其他线程去异步更新数据库也可能不太及时,而且缓存服务器如果宕机,那么缓存的数据也就丢失了。

在企业的实际应用中,还是方案一最可靠。

  • 操作缓存和数据库时有三个问题需要考虑:
    • 删除缓存还是更新缓存?

      • 更新缓存:每次更新数据库都需要更新缓存,无效写操作较多。
      • 删除缓存:更新数据库时让缓存失效,再次查询时更新缓存。(写的频率会降低,有效更新会更高)
    • 如何保证缓存和数据库操作同时成功或失败?(保证原子性)

      • 单体系统:将缓存与数据库操作放在同一个事务。
      • 分布式系统:利用TCC等分布式事务方案。
    • 先操作缓存还是先操作数据库 ?(保证线程安全)

删除缓存的操作很快,但是更新数据库的操作相对较慢,如果此时有一个线程2刚好进来查询缓存,由于我们刚刚才删除缓存,所以线程2需要查询数据库,并写入缓存,但是我们更新数据库的操作还未完成,所以线程2查询到的数据是脏数据,数据库和缓存中的值不一样,出现线程安全问题。

alt 先删除缓存,再操作数据库
alt 先删除缓存,再操作数据库

线程1在查询缓存的时候,缓存TTL刚好失效,需要查询数据库并写入缓存,这个操作耗时相对较短(相比较于上图来说),但是就在这么短的时间内,线程2进来了,更新数据库,删除缓存,但是线程1虽然查询完了数据(更新前的旧数据),但是还没来得及写入缓存,所以线程2的更新数据库与删除缓存,并没有影响到线程1的查询旧数据,写入缓存,数据库和缓存中的值不一样,造成线程安全问题。

alt 先操作数据库,再删除缓存
alt 先操作数据库,再删除缓存

红色文字


4.4 实现商铺缓存双写一致

核心思路

  • 修改ShopController中的业务逻辑,满足以下要求:
    • 根据id查询店铺时,如果缓存未命中,则查询数据库,并将数据库结果写入缓存,并设置TTL。
    • 根据id修改店铺时,先修改数据库,再删除缓存
  • 修改ShopServiceImpl的queryById方法,写入缓存时设置一下TTL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

@Resource
StringRedisTemplate stringRedisTemplate;

@Override
public Result queryById(Long id) {
//1. 从Redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
//2. 判断缓存是否命中
//3 如果命中 也就是数据在Redis中存在
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//3. 如果Redis中不存在数据 则在MySQL中查询
Shop shop = getById(id);
//4. 如果在MySQL中也没查到 返回404
if (shop == null) {
return Result.fail("店铺不存在!");
}
//5. 如果在MySQL中查到了 存入Redis 使用String存的是JSON类型
- stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop));


+ /*写入缓存设置TTL*/
+ stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);


//6. 返回商铺信息
return Result.ok(shop);
}
}

实现updateShop方法并加入缓存

  • 这里需要先更新数据库再删除之前的缓存 来保持数据一致性
1
2
3
4
5
6

@PutMapping
public Result updateShop(@RequestBody Shop shop) {
return shopService.update(shop);
}

1
Result update(Shop shop);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    @Override
//如果删除缓存抛出异常 依赖@Transactional来回滚数据库事务
@Transactional
public Result update(Shop shop) {
//1. 更新数据库
//先判断以下ID是否为空
if(shop.getId()==null){
return Result.fail("店铺id不能为空");
}
//更新数据库
updateById(shop);
//2. 删除缓存 如果有之前的商铺id 这里直接删除之前的商铺的缓存 保持原子性
stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId());
//返回结果
return Result.ok();
}
}

使用Postman来更新商铺

  1. 在postman中添加一个New Request 改名为”更新店铺”。
  2. 设置请求路径为http://localhost:8081/shop (注意是后端的8081)。
  3. 在Body里选择raw-JSON模式,设置修改的信息,然后Send。
  4. 当显示200 OK就是发送成功了。
    alt postnab
    alt postnab
  • 这时查看CLI 发现已经调用updateById方法修改了商铺数据
  • MySQL里的商铺数据也发生了更改
  • Redis中这条商铺缓存已经删除了

alt CLI中update
alt CLI中update

alt MySQL中update
alt MySQL中update


4.5 缓存穿透

  • 缓存穿透:指客户端请求的数据在缓存中或在数据库中都不存在,这样缓存永远都不生效(只有数据库查到了,才会让redis缓存,现在的问题是查不到),会频繁的访问数据库。
  • 常见的解决方案:
    • 缓存空对象:
      • 将不存在的数据设置为null存到redis中,这样下次用户就不会直接访问数据库。给这种空对象设置TTL,防止占用太多内存。但是如果恰好在数据库中更新了相同key的对象,就会数据库缓存短期不一致。
        • 优点:实现简单,维护方便。
        • 缺点:额外的内存消耗,可能造成短期的不一致。
    • 布隆过滤:
      • 在客户端和Redis之间设置一个布隆过滤器,过滤不存在的数据
        • 优点:内存占用少,没有多余的key。
        • 缺点:实现复杂,可能存在误判
          alt 布隆过滤器
          alt 布隆过滤器

关于布隆过滤器

布隆过滤器怎么知道我们数据库和Redis中是否有存在的数据呢?
其实是采用的哈希思想来解决这个问题,通过一个庞大的二进制数据,根据哈希思想去判断当前数据是否存在。
误判的原因:布隆过滤器采用的是哈希思想,只要是哈希思想,就可能存在哈希冲突。

编码解决商品查询的缓存穿透问题

  • 核心思路如下
    alt 解决缓存穿透
    alt 解决缓存穿透
  • 在ServiceImpl层增加判断逻辑 如果不存在将value设置为空字符串,设置较短的TTL,返回错误信息。
  • 再次发起查询,判断value是否为空字符串,如果是,就说明是不存在的数据,返回错误信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public Result queryById(Long id) {
//1. 从Redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
//2. 判断缓存是否命中
//3 如果命中 也就是数据在Redis中存在
if (StrUtil.isNotBlank(shopJson)) {
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}

+ //判断商铺信息是否为空值
+ //这里shopJson只有空值和null两种情况
+ //排除null就是空对象
+ if(shopJson != null){
+ return Result.fail("商铺不存在");
+ }

//3. 如果未命中 则在MySQL中查询
Shop shop = getById(id);
- //4. 如果在MySQL中也没查到 返回404
+ //4. 如果在MySQL中也没查到 将空值写入Redis
if (shop == null) {
+ stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return Result.fail("店铺不存在!");
}
//5. 如果在MySQL中查到了 存入Redis 使用String存的是JSON类型
//写入缓存设置TTL
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
//6. 返回商铺信息
return Result.ok(shop);
}

运行之后进入一个不存在的店铺页面 会提示错误

alt 查询不存在店铺
alt 查询不存在店铺

在Redis中可以看到空对象,并且有很短的TTL

alt Redis中看空值
alt Redis中看空值

小结:

缓存穿透产生的原因是什么?
用户请求的数据在缓存中和在数据库中都不存在,不断发起这样的请求,会给数据库带来巨大压力
缓存产投的解决方案有哪些?
缓存null值
布隆过滤
增强id复杂度,避免被猜测id规律(可以采用雪花算法)
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流


4.6 缓存雪崩

  • 缓存雪崩是指在同一时间段,大量缓存的key同时失效,或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

  • 解决方案

    • 给不同的Key的TTL添加随机值,让其在不同时间段分批失效。
    • 利用Redis集群提高服务的可用性(使用一个或者多个哨兵(Sentinel)实例组成的系统,对redis节点进行监控,在主节点出现故障的情况下,能将从节点中的一个升级为主节点,进行故障转义,保证系统的可用性。 )。
    • 给缓存业务添加降级限流策略 (在微服务中)
    • 给业务添加多级缓存(浏览器访问静态资源时,优先读取浏览器本地缓存;访问非静态资源(ajax查询数据)时,访问服务端;请求到达Nginx后,优先读取Nginx本地缓存;如果Nginx本地缓存未命中,则去直接查询Redis(不经过Tomcat);如果Redis查询未命中,则查询Tomcat;请求进入Tomcat后,优先查询JVM进程缓存;如果JVM进程缓存未命中,则查询数据库)。

4.7 缓存击穿

  • 缓存击穿也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,那么无数请求访问就会在瞬间给数据库带来巨大的冲击。

缓存击穿解决方法

  • 逻辑分析
    • 假设线程1在查询缓存之后未命中,去查询数据库重建缓存数据,完成之后其他线程就可以从缓存中加载数据了。
    • 但在线程1没有重建缓存之前进来了很多线程同时访问,却没有缓存,就会一起访问数据库,导致数据库访问压力过大。
    • alt 缓存击穿1
      alt 缓存击穿1
  1. 互斥锁

    • 利用锁的互斥性,假设线程过来,只能一个一个人访问数据库,从而避免对数据库频繁访问。

    • 但是会影响查询的性能,使其从并行改成串行,我们可以用tryLock方法+double check来解决这个问题

    • 线程1在操作的时候,拿锁把门锁上,那么线程2,3,4就不能操作数据库了,只有1操作完打开锁,缓存数据重建好了,那么其他线程直接从redis就能查到数据。

    • alt 缓存击穿互斥锁
      alt 缓存击穿互斥锁

  2. 逻辑过期

    • 之所以会出现缓存击穿,主要是因为我们对key设置了TTL,如果不设置TTL,那么就不会有缓存击穿了,但是不设置TTL,又会一直占内存,所以我们可以采用逻辑过期方案

    • 我们直接在数据的value中设置过期时间,过期时间不会直接作用于Redis,而是我们后续用过逻辑去处理。

    • alt 缓存击穿逻辑过期2
      alt 缓存击穿逻辑过期2

    • 假设线程1去查询缓存,然后从value中判断当前数据已经过期。此时线程1去获取互斥锁,那么其他线程会被阻塞,获得锁的进程他会开启一个新线程去进行之前的重建缓存数据的逻辑 (异步构建缓存数据),直到新开的线程完成逻辑之后才会释放锁。而线程1直接返回过期的数据,假设线程3过来访问,由于线程2拿着锁,所以线程3无法获得锁,线程3也直接返回线程1的旧数据数据(牺牲了数据一致性换取性能的提升)。

    • 只有等待线程2重建缓存数据之后,其他线程才能返回正确的数据。

    • 这种方案巧妙在于,异步构建缓存数据,缺点是在重建完缓存数据之前,返回的都是脏数据.

    • alt 缓存击穿逻辑过期
      alt 缓存击穿逻辑过期

  • 对比互斥锁与逻辑删除
  • 互斥锁方案:由于保证了互斥性,所以数据一致,且实现简单,只是加了一把锁而已,也没有其他的事情需要操心,所以没有额外的内存消耗,缺点在于有锁的情况,就可能死锁,所以只能串行执行,性能会受到影响。
  • 逻辑过期方案:线程读取过程中不需要等待,性能好,有一个额外的线程持有锁去进行重构缓存数据,但是在重构数据完成之前,其他线程只能返回脏数据,且实现起来比较麻烦。
解决方案 优点 缺点
互斥锁 没有额外的内存消耗
保证一致性
实现简单
线程需要等待,性能受影响
可能有死锁风险
逻辑过期 线程无需等待,性能较好 不保证一致性
有额外内存消耗
实现复杂

基于互斥锁方式解决缓存击穿问题

  • 核心思路:

    • 进行查询之后,如果没有从缓存中查询到数据,则进行互斥锁的获取,获取互斥锁之后,判断是否获取到了锁,如果没获取到,则休眠一段时间,过一会儿再去尝试,直到获取到锁为止,才能进行查询。
    • 如果获取到了锁的线程,则进行查询,将查询到的数据写入Redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行数据库的逻辑,防止缓存击穿。
    • alt 缓存击穿互斥锁流程图
      alt 缓存击穿互斥锁流程图
  • 操作锁的代码:

  • 核心思路就是利用redis的setnx指令来表示获取锁,如果redis没有这个key,则插入成功,返回1,如果已经存在这个key,则插入失败,返回0。在StringRedisTemplate中返回true/false,我们可以根据返回值来判断是否有线程成功获取到了锁。

以免你不知道什么是setnx指令😓

  • alt setnx指令
    alt setnx指令
1
2
3
4
5
6
7
//设置互斥锁
private boolean tryLock(String key){
//setOfAbsent就是redis中的setnx 如果value不为空则不能修改
Boolean Mutex = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_SHOP_KEY + key, "notnull", LOCK_SHOP_TTL, TimeUnit.MINUTES);
//避免返回值为null,我们这里使用了BooleanUtil工具类
return BooleanUtil.isTrue(Mutex);
}
1
2
3
4
//释放互斥锁
private void unLock(String key){
stringRedisTemplate.delete(LOCK_SHOP_KEY + key);
}
  • 先把之前的缓存穿透代码修改一下,提取成一个独立的方法 先存着不去调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

//缓存穿透代码
public Shop queryWithPassThrough(Long id) {

//1. 从Redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
//2. 判断缓存是否命中
//3 如果命中 也就是数据在Redis中存在
//isNotBlank对于空字符串返回的也是false
if (StrUtil.isNotBlank(shopJson)) {
return JSONUtil.toBean(shopJson, Shop.class);
}

//判断商铺信息是否为空值
//这里shopJson只有空值和null两种情况
//排除null就是空对象
if(shopJson != null){
return null;
}

//3. 如果未命中 则在MySQL中查询
Shop shop = getById(id);
//4. 如果在MySQL中也没查到 将空值写入Redis
if (shop == null) {
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//5. 如果在MySQL中查到了 存入Redis 使用String存的是JSON类型
//写入缓存设置TTL
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
//6. 返回商铺信息
return shop;

}
  • 然后写一个互斥锁的缓存方法queryWithMutex
    • 这里使用try/catch/finally来捕获异常。
    • 如果没有获取互斥锁,那么休眠并通过递归重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//缓存穿透代码
public Shop queryWithMutex(Long id) {

//1. 从Redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
//2. 判断缓存是否命中
//3 如果命中 也就是数据在Redis中存在
//isNotBlank对于空字符串返回的也是false
if (StrUtil.isNotBlank(shopJson)) {
return JSONUtil.toBean(shopJson, Shop.class);
}

//判断商铺信息是否为空值
//这里shopJson只有空值和null两种情况
//排除null就是空对象
if(shopJson != null){
return null;
}
Shop shop = null;

try {
//3. 如果未命中 实现缓存重建
//3.1 获取互斥锁
boolean isGetLock = tryLock(CACHE_SHOP_KEY + id);
//3.2 判断是否获取成功
if(!isGetLock){
//3.3 如果获取失败 休眠并重试
Thread.sleep(50);
//递归
return queryWithMutex(id);
}
//如果成功获取 根据id查询数据库
shop = getById(id);
//为了更容易发生并发冲突 这里模拟重建的延迟 = 如果是企业级代码一定不行
Thread.sleep(200);

//4. 如果在MySQL中也没查到 将空值写入Redis
if (shop == null) {
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//5. 如果在MySQL中查到了 存入Redis 使用String存的是JSON类型
//写入缓存设置TTL
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//6. 释放互斥锁
unLock(CACHE_SHOP_KEY + id);

}

//7. 返回商铺信息
return shop;

}
  • 最后修改queryById方法
1
2
3
4
5
6
7
8
9
10
11
12
13
 @Override
public Result queryById(Long id) {
//缓存穿透
// Shop shop = queryWithPassThrough(id);

//互斥锁解决缓存击穿
Shop shop = queryWithMutex(id);
if (shop == null) {
return Result.fail("店铺不存在!");
}
//返回商铺信息
return Result.ok(shop);
}
  • 使用JMeter进行测试吧😄

    • 模拟一下缓存击穿的场景:
      • 某个时刻 一个热点数据的TTL到期了,用户不能从Redis中获取热点商品数据,然后就都去数据库里查询,造成数据库压力过大。
    • 那么我们首先将Redis中的热点商品数据删除,模拟TTL到期,用JMeter压力测试,去访问这个没有缓存的热点数据。
  • 这里JMeter设置1000个线程访问数据

    • 如果后台只输出了一条SQL语句 则说明互斥锁生效了,没有造成大量的用户查询数据库
1
2
3
: ==>  Preparing: SELECT id,name,type_id,images,area,address,x,y,avg_price,sold,comments,score,open_hours,create_time,update_time FROM tb_shop WHERE id=?
: ==> Parameters: 2(Long)
: <== Total: 1

基于逻辑过期方式方式解决缓存击穿问题

  • 需求:修改根据id查询的店铺的逻辑,基于逻辑过期的方式来解决缓存击穿问题
  • 思路分析: 当用户开始查询Redis时,判断是否命中
    • 没命中:返回空数据。
    • 命中:将value取出,判断value中的过期时间。
      • 没过期:直接返回redis中的数据。
      • 过期:尝试获取互斥锁,如果获取到了,则开启独立线程,然后直接返回之前的数据,独立线程去重构数据,重构完成之后释放互斥锁。
    • alt 缓存击穿逻辑过期流程图
      alt 缓存击穿逻辑过期流程图
  1. 封装数据:因为现在redis中存储的数据的value需要带上过期时间,此时要么去修改原来的实体类,要么新建一个类包含原有的数据和过期时间
  • 如果修改了原来的实体类 就违反了OCP原则 所以这选择新建一个工具类Redisdata
    • Redisdata里设置一个逻辑过期时间 如果让ShopServiceImpl继承Redisdata就会有代码侵入性 ,所以直接object一个data.
  1. 在ShopServiceImpl中新增方法,进行单元测试,看看是否写入数据。
1
2
3
4
5
6
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//将封装有获取时间的店铺信息传到redis
//设置成public以便于测试类调用
public void saveShopToRedis(Long id,Long expireSeconds){
//1. 查询店铺数据
Shop shop = getById(id);

//2. 写入逻辑过期
RedisData redisData = new RedisData();
//将shop的数据添加到redisData当中
redisData.setData(shop);
//过期时间 = 本地时间 + 所预期的时间
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));

//3. 写入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData),CACHE_SHOP_TTL, TimeUnit.MINUTES);
}
1
2
3
4
@Test
void testSaveShopToRedis() {
shopService.saveShopToRedis(1L,20L);
}
* 运行测试类 在Redis中查看id为1的商铺数据 * 发现有`data`和`expireTime`属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"data": {
"area": "大关",
"openHours": "10:00-22:00",
"sold": 4215,
"images": "",
"address": "金华路锦昌文华苑29号",
"comments": 3035,
"avgPrice": 80,
"updateTime": 1760852369000,
"score": 37,
"createTime": 1640167839000,
"name": "102茶餐厅",
"x": 120.149192,
"y": 30.316078,
"typeId": 1,
"id": 1
},
"expireTime": 1760929504900
}
  1. 开始编写正式代码吧😳
    Tips:
    • 为了缓存重建需要创建一个线程池。
    • 这里默认有热点数据(Redis中有这个数据,并且还有过期时间)。所以未命中直接返回空。
    • 将redisData中的data拿出来是一个JSONObejct类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

//创建一个缓存重建线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

//使用逻辑过期解决缓存穿透代码
public Shop queryWithLogicalExpire(Long id) {

//1. 从Redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);

//因为这是解决缓存穿透的代码,默认已经有了热点数据,所以如果未命中直接返回空
if(StrUtil.isBlank(shopJson)){
return null;
}
//如果命中 判断缓存是否过期
//先把JSON反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
//将data转为Shop对象

//redisData中的data是一个JSONObject类型
//将data修改为shop类型
JSONObject data = (JSONObject)redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);

//获取过期时间
LocalDateTime expireTime = redisData.getExpireTime();

//未过期 直接返回商铺信息
if (expireTime.isAfter(LocalDateTime.now())) {
return shop;
}

//过期了 尝试获取互斥锁
boolean getKey = tryLock(LOCK_SHOP_KEY + id);
if (getKey) {
//成功获取互斥锁 开启独立线程 实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
//缓存重建就是重新查看数据库 重新将数据写入缓存
saveShopToRedis(id,20L);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//释放锁
unLock(LOCK_SHOP_KEY + id);
}
});
}
return shop;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public Result queryById(Long id) {
//解决缓存穿透
//Shop shop = queryWithPassThrough(id);

//互斥锁解决缓存击穿
//Shop shop = queryWithMutex(id);

//逻辑过期解决缓存击穿
Shop shop = queryWithLogicalExpire(id);

//判断是否为空数据
if (shop == null) {
return Result.fail("店铺不存在!");
}
//返回商铺信息
return Result.ok(shop);
}

Tips:
* 注意这里只启用了queryWithLogicalExpire 如果未命中直接返回空 所以这里必须要在Redis中先有热点数据 (还要有过期时间!)

  • 运行代码吧!
    • 运行之后直接修改数据库 将redis中现有的热点数据修改个值
    • 使用JMeter来进行测试,就会发现在几次请求发送之后 值被成功修改了!
    • 再去看服务端后台 发现只执行了一次 那么就成功了!

4.8 封装缓存工具

封装缓存工具

  • 基于StringRedisTemplate封装一个缓存工具类,需满足下列要求

    • 方法1:将任意Java对象序列化为JSON,并存储到String类型的Key中,并可以设置TTL过期时间

      1
      2
      3
      public void set(String key, Object value, Long time, TimeUnit timeUnit) {
      stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, timeUnit);
      }
    • 方法2:将任意Java对象序列化为JSON,并存储再String类型的key中,并可以设置逻辑过期时间,用于处理缓存击穿问题。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      public void setWithLogicExpire(String key, Object value, Long time, TimeUnit timeUnit) {
      //由于要生成过期时间 所以需要用到RedisData
      RedisData redisData = new RedisData();
      //添加逻辑过期时间
      redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
      //添加数据
      redisData.setData(value);
      //将数据转化成json存入redis 因为是逻辑过期所以不需要加过期时间
      stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
      }
    • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存击穿问题。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long time, TimeUnit timeUnit) {
      String key = keyPrefix + id;

      String json = stringRedisTemplate.opsForValue().get(key);

      if (StrUtil.isNotBlank(json)) {
      return JSONUtil.toBean(json, type);
      }


      if(json != null){
      return null;
      }


      R r = dbFallback.apply(id);


      //如果数据库中没有 返回空值
      if (r == null) {
      stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.SECONDS);
      return null;
      }

      set(key,r,time,timeUnit);

      return r;
      }
    • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用互斥锁解决缓存击穿问题

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      public <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit timeUnit) {
      //先从Redis中查,这里的常量值是固定的前缀 + 店铺id
      String key = keyPrefix + id;
      String json = stringRedisTemplate.opsForValue().get(key);
      //如果不为空(查询到了),则转为Shop类型直接返回
      if (StrUtil.isNotBlank(json)) {
      return JSONUtil.toBean(json, type);
      }
      if (json != null) {
      return null;
      }
      R r = null;
      String lockKey = LOCK_SHOP_KEY + id;
      try {
      //否则去数据库中查
      boolean flag = tryLock(lockKey);
      if (!flag) {
      Thread.sleep(50);
      return queryWithMutex(keyPrefix, id, type, dbFallback, time, timeUnit);
      }
      r = dbFallback.apply(id);
      //查不到,则将空值写入Redis
      if (r == null) {
      stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
      return null;
      }
      //并存入redis,设置TTL
      this.set(key, r, time, timeUnit);
      } catch (InterruptedException e) {
      throw new RuntimeException(e);
      } finally {
      unLock(lockKey);
      }
      return r;
      }

    • 方法5:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
       //创建一个缓存重建线程池
      private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

      //5. 根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
      public <R,ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long time, TimeUnit timeUnit) {

      //1. 从Redis查询缓存
      String key = keyPrefix + id;
      String json = stringRedisTemplate.opsForValue().get(key);

      //因为这是解决缓存穿透的代码,默认已经有了热点数据,所以如果未命中直接返回空
      if(StrUtil.isBlank(json)){
      return null;
      }

      //如果命中 判断缓存是否过期
      //先把JSON反序列化为对象
      RedisData redisData = JSONUtil.toBean(json, RedisData.class);
      //将data转为Shop对象

      //redisData中的data是一个JSONObject类型
      //将data修改为shop类型
      JSONObject data = (JSONObject)redisData.getData();
      //这里R是泛型 所以不能用R.class
      R r = JSONUtil.toBean(data, type);

      //获取过期时间
      LocalDateTime expireTime = redisData.getExpireTime();

      //未过期 直接返回商铺信息
      if (expireTime.isAfter(LocalDateTime.now())) {
      return r;
      }

      //过期了 尝试获取互斥锁
      String lockKey = LOCK_SHOP_KEY + id;
      boolean getKey = tryLock(lockKey);
      if (getKey) {
      //成功获取互斥锁 开启独立线程 实现缓存重建
      CACHE_REBUILD_EXECUTOR.submit(() -> {
      try {
      //缓存重建就是重新查看数据库 重新将数据写入缓存
      //先查数据库 得到数据
      //r1是新的数据
      R r1 = dbFallback.apply(id);
      //再写Redis
      //这里将ri变为之后的r
      setWithLogicExpire(key,r1,time,timeUnit);

      } catch (Exception e) {
      throw new RuntimeException(e);
      } finally {
      //释放锁
      unLock(lockKey);
      }
      });
      }
      //这里如果没有获取互斥锁 获取的是过期的数据
      return r;
      }
  • 完整工具代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package com.hmdp.utils;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.hmdp.entity.Shop;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.hmdp.utils.RedisConstants.*;


@Slf4j
@Component
public class CacheClient {
@Resource
private StringRedisTemplate stringRedisTemplate;



//方法1:将任意Java对象序列化为JSON,并存储到String类型的Key中,并可以设置TTL过期时间
public void set(String key, Object value,Long time, TimeUnit timeUnit) {
//将value序列化为JSON字符串
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, timeUnit);
}


//方法2:将任意Java对象序列化为JSON,并存储在String类型的Key中,并可以设置逻辑过期时间,用于处理缓存击穿问题
public void setWithLogicExpire(String key, Object value, Long time, TimeUnit timeUnit) {
//由于要生成过期时间 所以需要用到RedisData
RedisData redisData = new RedisData();
//添加逻辑过期时间
redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
//添加数据
redisData.setData(value);
//将数据转化成json存入redis 因为是逻辑过期所以不需要加过期时间
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
}



//方法3:根据指定的Key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
//这里返回值不确定 使用泛型
//所以参数要有数据的类型 来告诉泛型要用什么类型
//因为id是一个string类型 所以这里需要传一个id的前缀
//id的类型也不确定 所以也需要用泛型
//因为不知道类型 不能使用getById 所以使用function 传入数据库取出操作 第一个参数是传入的值 第二个参数是返回类型也就是R
public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long time, TimeUnit timeUnit) {
String key = keyPrefix + id;

String json = stringRedisTemplate.opsForValue().get(key);

if (StrUtil.isNotBlank(json)) {
return JSONUtil.toBean(json, type);
}


if(json != null){
return null;
}


R r = dbFallback.apply(id);


//如果数据库中没有 返回空值
if (r == null) {
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.SECONDS);
return null;
}

set(key,r,time,timeUnit);

return r;
}



//方法4:根据指定的Key查询缓存,并反序列化为指定类型,需要利用互斥锁解决缓存击穿问题
public <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit timeUnit) {
//先从Redis中查,这里的常量值是固定的前缀 + 店铺id
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);
//如果不为空(查询到了),则转为Shop类型直接返回
if (StrUtil.isNotBlank(json)) {
return JSONUtil.toBean(json, type);
}
if (json != null) {
return null;
}
R r = null;
String lockKey = LOCK_SHOP_KEY + id;
try {
//否则去数据库中查
boolean flag = tryLock(lockKey);
if (!flag) {
Thread.sleep(50);
return queryWithMutex(keyPrefix, id, type, dbFallback, time, timeUnit);
}
r = dbFallback.apply(id);
//查不到,则将空值写入Redis
if (r == null) {
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
//并存入redis,设置TTL
this.set(key, r, time, timeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
return r;
}



//创建一个缓存重建线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

//5. 根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
public <R,ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long time, TimeUnit timeUnit) {

//1. 从Redis查询缓存
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);

//因为这是解决缓存穿透的代码,默认已经有了热点数据,所以如果未命中直接返回空
if(StrUtil.isBlank(json)){
return null;
}

//如果命中 判断缓存是否过期
//先把JSON反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
//将data转为Shop对象

//redisData中的data是一个JSONObject类型
//将data修改为shop类型
JSONObject data = (JSONObject)redisData.getData();
//这里R是泛型 所以不能用R.class
R r = JSONUtil.toBean(data, type);

//获取过期时间
LocalDateTime expireTime = redisData.getExpireTime();

//未过期 直接返回商铺信息
if (expireTime.isAfter(LocalDateTime.now())) {
return r;
}

//过期了 尝试获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean getKey = tryLock(lockKey);
if (getKey) {
//成功获取互斥锁 开启独立线程 实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
//缓存重建就是重新查看数据库 重新将数据写入缓存
//先查数据库 得到数据
//r1是新的数据
R r1 = dbFallback.apply(id);
//再写Redis
//这里将ri变为之后的r
setWithLogicExpire(key,r1,time,timeUnit);

} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//释放锁
unLock(lockKey);
}
});
}
//这里如果没有获取互斥锁 获取的是过期的数据
return r;
}


//设置互斥锁
private boolean tryLock(String key){
//setOfAbsent就是redis中的setnx 如果value不为空则不能修改
Boolean Mutex = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_SHOP_KEY + key, "notnull", LOCK_SHOP_TTL, TimeUnit.MINUTES);
//避免返回值为null,我们这里使用了BooleanUtil工具类
return BooleanUtil.isTrue(Mutex);
}


//释放互斥锁
private void unLock(String key){
stringRedisTemplate.delete(LOCK_SHOP_KEY + key);
}

}

5. 优惠券秒杀

5.1 Redis实现全局唯一ID

  • 用户抢购时,会自动生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就会存在一些问题:

    • id的规律性太明显
    • 受单表数据量的限制
  • 全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具,一般满足下列特性:

    • 唯一性
    • 高可用
    • 高性能
    • 递增性
    • 安全性
  • 为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接其他信息。

  • ID组成部分:

    • 符号位:1bit 永远为0
    • 时间戳:31bit 以秒为单位,可以使用69年(2^31秒)
    • 序列号:32bit 秒的计数器 支持每秒传输2^32个不同的ID

开始实现唯一ID吧

  • Redis自增ID策略:

    • 每天一个key,方便统计订单量
    • ID构造是 时间戳+计时器
  • 先设置项目的初始时间 这里例为2025年1月1日

1
2
3
4
5
6
7
8
9
//获取项目的初始时间 这里定为2025年1月1号0点0分
public static void main(String[] args) {

LocalDateTime time = LocalDateTime.of(2025, 1, 1, 0, 0);
//将时间转化为秒
long second = time.toEpochSecond(ZoneOffset.UTC);
System.out.println("second = "+second);
//这里输出是1735689600 说明项目初识时间是这个时间 将这个时间定义为常量
}
  • 获取到了项目初始时间是1735689600,将其设置为常量BEGIN_TIMESTAMP
1
private static final long BEGIN_TIMESTAMP = 1735689600L;
  • 写出功能代码
    • 使用increment方法来使redis自增
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35


@Component
public class RedisIdWorker {
@Resource
private StringRedisTemplate stringRedisTemplate;

//开始时间
private static final Long BEGIN_TIMESTAMP = 1735689600L;
//序列号位数
public static final Long COUNT_BITS = 32L;

public long nextId(String keyPrefix){
//1. 生成时间戳
LocalDateTime now = LocalDateTime.now();
long currentTime = now.toEpochSecond(ZoneOffset.UTC);

//timeStamp为时间戳
long timeStamp = currentTime - BEGIN_TIMESTAMP;


//2. 生成序列号
//2.1 获取当前日期 精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2 自增长
//不会担心空指针问题 他会自增长
long count = stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);

//3. 拼接并返回
//拼接long类型用位运算
//让timeStamp向左移动32位
//用或运算将count填充到timeStamp
return timeStamp << COUNT_BITS|count;
}
}
  • 开始测试一下吧,写一个工具类
    • 创建一个线程池 线程数量为500
    • 关于CountDownLatch:
      • 是一个同步工具类 用于协调多个线程之间的同步。
      • 创建一个Latch从初始计数开始 每个线程完成了之后利用countDown方法减一 最后使用await方法阻塞 。
      • 作用是确保主线程在300个任务完成后再执行,计算总耗时。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private ExecutorService es = Executors.newFixedThreadPool(500);


@Test
void testGenerateId() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = ()->{
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println("id = "+id);
}
latch.countDown();
};
long beginTime = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
es.submit(task);
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.println("time = " + (endTime - beginTime));
}

最后会输出30000个唯一ID 因为执行了300次有100次循环生成的任务。

位运算原理

  • 时间戳: 00000000 00000000 00000000 00001001 (假设为9)
  • 左移32位: 00000000 00000000 00000000 00001001 00000000 00000000 00000000 00000000
  • 序列号: 00000000 00000000 00000000 00000101 (假设为5)
  • 按位或: 00000000 00000000 00000000 00001001 00000000 00000000 00000000 00000101
  • 结果: 时间戳9 + 序列号5 的合并ID

5.2 实现优惠券秒杀下单

  • 每个店铺都可以发送优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购。

  • 表关系如下:

    • tb_voucher:优惠券的基本信息,优惠金额,使用规则等。
    • tb_seckill_voucher:优惠券的库存,开始抢购时间,结束抢购时间,特价优惠券才需要填写这些信息
  • 平价券由于优惠力度并不是很大,所以是可以任意领取。而代金券由于优惠力度大,所以像第二种券,就得限制数量,从表结构上也能看出,特价券除了具有优惠券的基本信息以外,还具有库存,抢购时间,结束时间等等字段

  • 黑马在VoucherController中提供了三个接口,可以实现下面的功能:

1
2
3
4
5
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
voucherService.save(voucher);
return Result.ok(voucher.getId());
}
1
2
3
4
5
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}
1
2
3
4
@GetMapping("/list/{shopId}")
public Result queryVoucherOfShop(@PathVariable("shopId") Long shopId) {
return voucherService.queryVoucherOfShop(shopId);
}
  • 其中添加秒杀优惠券的业务代码如下
    • 其中voucher中的stockbeginTimeendTime都被注解了@TableField(exist = false) 也就是在数据库中不存在
    • seckillVoucher中有这几个字段 所以在创建秒杀优惠券的时候只需要添加这几个信息和ID
    • 其中ID是进行券的关联
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
}
  • 由于这里并没有后台管理页面,所以我们只能用POSTMAN模拟发送请求来新增秒杀券,请求路径http://localhost:8081/voucher/seckill
    • 请求方式POST,JSON数据如下,注意优惠券的截止日期设置,若优惠券过期,则不会在页面上显示。
    • 注意:这里结束时间要在现实时间之后 要不然就过期看不见了
1
2
3
4
5
6
7
8
9
10
11
12
{
"shopId":1,
"title":"100元代金券",
"subTitle":"周一至周五可用",
"rules":"全场通用\\n无需预约\\n可无限叠加",
"payValue":8000,
"actualValue":10000,
"type":1,
"stock":100,
"beginTime":"2022-01-01T00:00:00",
"endTime":"2025-10-31T23:59:59"
}
  • 添加完效果如下
    alt 秒杀券显示
    alt 秒杀券显示

5.3 实现秒杀下单

  • 点击限时抢购,然后查看发送的请求
1
2
请求网址: http://localhost:8080/api/voucher-order/seckill/13
请求方法: POST
  • 看样子是VoucherOrderController里的方法
  • 而且还没完成
1
2
3
4
5
6
7
8
9
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return Result.fail("功能未完成");
}
}

实现一下抢秒杀券功能吧

  • 流程如下:

    alt 抢秒杀券流程图
    alt 抢秒杀券流程图

  • 对应着流程图 来编写相对应的代码吧

  • 这里需要注入voucherOrderService接口
1
2
3
4
5
6
7
8
9
@Resource
private IVoucherOrderService voucherOrderService;

@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {

return voucherOrderService.seckillVoucher(voucherId);

}
1
2
3
4
5
 public interface IVoucherOrderService extends IService<VoucherOrder> {

Result seckillVoucher(Long voucherId);
}

  • 要注入秒杀券的信息
  • 通过优惠券id来得到秒杀券在数据库里的数据
  • 先讲三个id数据传入数据库 其他的属性先不用管 有默认的值
  • 注:这里有一个mybatis使用方式 我们在下面讲
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//查询秒杀全ID 所以要注入秒杀券信息
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;

@Override
public Result seckillVoucher(Long voucherId) {
//1. 查询秒杀券
//因为优惠券和秒杀券ID相同 所以这里可以直接使用优惠券ID
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

//2. 判断秒杀是否开始
//2.1 没有开始 返回错误信息
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("活动没有开始");
}

//3. 判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("活动已经结束");
}

//4. 判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("该优惠券已售罄");
}

//5. 扣减库存
boolean success = seckillVoucherService.update() // 1. 开始更新操作
.setSql("stock = stock - 1") // 2. 设置更新的SQL片段
.eq("voucher_id", voucherId) // 3. WHERE条件:指定优惠券ID
.gt("stock", 0) // 4. WHERE条件:库存必须大于0
.update(); // 5. 执行更新,返回是否成功

if (!success) { // 6. 判断更新结果
return Result.fail("库存不足"); // 7. 更新失败,返回库存不足
}
//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//设置订单id
Long orderId = redisIdWorker.nextId("order");
//从ThreadLocal中获得用户id
Long userId = UserHolder.getUser().getId();

//设置代金券id
voucherOrder.setVoucherId(voucherId);
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);

//7. 将订单保存到数据库
save(voucherOrder);

//8. 返回订单ID
return Result.ok(orderId);
}

🛠 MyBatisPlus的链式调用设计

在上面的实现秒杀券下单功能里有这样一段代码:

1
2
3
4
boolean success = seckillVoucherService.update() 
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.update();
  • 这一段是MyBatis中的链式调用,逐步构建查询条件最后执行
  • 具体流程:
  1. update():开始构建更新指令
  2. setSql():设置sql语句的字段
  3. .eq:设置sql语句的where条件
  4. update():最终执行
  • 两个 .update() 的区别:
位置 作用 返回值
seckillVoucherService.update() 创建更新条件构造器 UpdateWrapper 对象
链式调用的 .update() 执行数据库更新操作 boolean (是否更新成功)
  • 📋 常用的WHERE条件方法
方法 含义 SQL等价 示例
.eq() 等于 = .eq("status", 1) → status = 1
.ne() 不等于 != .ne("status", 0) → status != 0
.gt() 大于 > .gt("age", 18) → age > 18
.ge() 大于等于 >= .ge("score", 60) → score >= 60
.lt() 小于 < .lt("price", 100) → price < 100
.le() 小于等于 <= .le("count", 10) → count <= 10
.like() 模糊匹配 LIKE .like("name", "张%") → name LIKE '张%'
.in() IN查询 IN .in("id", Arrays.asList(1,2,3)) → id IN (1,2,3)

5.4 超卖问题

为什么会出现超卖问题🧐

  • 我们写的代码其实有问题,当遇到高并发场景时,会出现超卖现象
  • 用JMeter开200个线程来模拟抢优惠券的场景
  • URL为localhost:8081/voucher-order/seckill/12(注意,这里我的秒杀全的id是12)
  • 这里弱智黑马没有说要加请求头,所以直接测压会全报错,因为没有token,所以被直接拦截了
    • 加一个请求头
      • Name为authorization Value为用户的token
        alt token请求头
        alt token请求头

这时测压一般会发生超卖现象 也就是秒杀券库存表里的券变成了负数 多买了几张!!!🤬

这个问题怎么出现的?🧐

  • 假如说第一个线程检测库存为1,还没来得及扣减,这时第二个线程就开始检测库存了,这时检测库存也是1,也开始扣减。
  • 如此一来,库存就被扣成负数了😔
  • 这个问题称为并发安全问题
    alt 超卖问题解析
    alt 超卖问题解析

超买问题是典型的多线程安全问题,针对这一问题的最常见解决方式就是加锁。对于加锁,有常见的两种方案:

  1. 悲观锁(sync)
    • 悲观锁认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行
    • 例如SynchronizedLock等,都是悲观锁
  2. 乐观锁(voilate)
    • 乐观锁认为线程安全问题不一定会发生,因此不加锁,只是在更新数据的时候再去判断有没有其他线程对数据进行了修改
    • 如果没有修改,则认为自己是安全的,自己才可以更新数据
    • 如果已经被其他线程修改,则说明发生了安全问题,此时可以重试或者异常

—两种锁的实现方式—

  • 悲观锁:悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表。

    • 同时,悲观锁中又可以再细分为公平锁非公平锁可重入锁,等等
  • 乐观锁:

    1. 版本号法:每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功。
      • 这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过。

alt 乐观锁版本号法
alt 乐观锁版本号法

  1. CAS(Compare-And-Swap),比较之后再交换,进行无锁化机制加锁,也就是用其他数据比如库存库存来代替版本锁
    alt 乐观锁CAS法
    alt 乐观锁CAS法

5.5 使用乐观锁解决库存超卖问题

乐观锁的弊端😫

  • 这里我就就使用CAS方法 在扣减库存的时候加上乐观锁 判断库存是否相等
1
2
3
4
5
6
//5. 扣减库存
boolean success = seckillVoucherService.update() // 1. 开始更新操作
.setSql("stock = stock - 1") // 2. 设置更新的SQL片段 set stock = stock - 1
.eq("voucher_id", voucherId) // 3. WHERE条件:指定优惠券ID where voucher_id = voucherId
.eq("stock", voucher.getStock()) //4. 添加乐观锁CAS 将stock作为版本号 使条件中添加库存相同 where stock = voucher.getStock()
.update(); // 5. 执行更新,返回是否成功
  • 这时我们使用JMeter测压 发现大多都失败了! 100个券只被抢了20个!

  • 这里就牵扯到了乐观锁的弊端:

    • 假如我有100个线程 这100个线程同时抢票 查询的库存都是100
    • 只有第1个抢到的线程成功了 其他99个线程都判断版本号不一致失败了!🤬
    • 这里并没有线程安全问题 但是成功率太低了!

对乐观锁进行改进🙆‍

  1. 不使用版本号判断 直接判断库存是否大于0 如果不大于0 不执行SQL语句
1
2
3
4
5
6
//5. 扣减库存
boolean success = seckillVoucherService.update() // 1. 开始更新操作
.setSql("stock = stock - 1") // 2. 设置更新的SQL片段 set stock = stock - 1
.eq("voucher_id", voucherId) // 3. WHERE条件:指定优惠券ID where voucher_id = voucherId
.gt("stock", 0) // 4. WHERE条件:库存必须大于0 where stock > 0
.update(); // 5. 执行更新,返回是否成功

成功了🤭 既没有了线程安全问题 又保证了成功率变高


5.6 实现一人一单功能

  • 需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

    alt 一人一单流程图
    alt 一人一单流程图

  • 具体操作逻辑如下:我们在判断库存是否充足之后,根据我们保存的订单数据,判断用户订单是否已存在

    • 如果已存在,则不能下单,返回错误信息
    • 如果不存在,则继续下单,获取优惠券
  • 先去实现这个逻辑:

    • 就是在删减库存前面判断用户是否重复下过订单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Transactional
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
//查询秒杀全ID 所以要注入秒杀券信息
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;

@Override
public Result seckillVoucher(Long voucherId) {
//1. 查询秒杀券
//因为优惠券和秒杀券ID相同 所以这里可以直接使用优惠券ID
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

//2. 判断秒杀是否开始
//2.1 没有开始 返回错误信息
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("活动没有开始");
}

//3. 判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("活动已经结束");
}

//4. 判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("该优惠券已售罄");
}



//5. 判断用户是否重复下单
//从ThreadLocal中获得用户id
Long userId = UserHolder.getUser().getId();


//5.1 根据userId和voucherId来查询订单
Integer count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId).count();



//5.2 判断是否存在
//如果存在
if (count > 0) {
return Result.fail("您已经抢购过该秒杀券!");
}


//6. 扣减库存
boolean success = seckillVoucherService.update() // 1. 开始更新操作
.setSql("stock = stock - 1") // 2. 设置更新的SQL片段 set stock = stock - 1
.eq("voucher_id", voucherId) // 3. WHERE条件:指定优惠券ID where voucher_id = voucherId
.gt("stock", 0) // 4. WHERE条件:库存必须大于0 where stock > 0
.update(); // 5. 执行更新,返回是否成功

if (!success) { // 6. 判断更新结果
return Result.fail("库存不足"); // 7. 更新失败,返回库存不足
}


//7. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();

//设置订单id
Long orderId = redisIdWorker.nextId("order");
//设置代金券id
voucherOrder.setVoucherId(voucherId);
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);

//8. 将订单保存到数据库
save(voucherOrder);

//9. 返回订单ID
return Result.ok(orderId);
}
}

  • 这时再使用JMeter进行测试 发现库存变成了90,但咱们只想要这一个用户下一单啊!

  • 存在的问题:

    • 如果用户故意开多线程抢优惠券,那么在判断库存充足之后,执行一人一单逻辑之前,在这个区间如果进来了多个线程,还是可以抢多张优惠券的,那我们这里使用悲观锁来解决这个问题。
  • 解决方法:

    • 我们把一人一单逻辑的代码都提取到一个createVoucherOrder方法中,然后给这个方法加锁。(synchronized意为同步的)
    • 不管哪个线程(例如线程A)运行到这个方法时,都要检查有没有其他线程正在用这个方法(或者该类的其他同步方法),有的话等正在使用synchronized方法的其他线程运行完之后再运行线程A,没有的话,锁定调用者,然后直接运行。
    • 如果在整个方法上加锁的话太占性能,因为我们要实现的功能是一人一单,所以只在方法内部的userId上加锁。
  • 注意 不能在userId本身上加锁:

    • Long是包装类,虽然每次查询id一样,但每次都是新的类,需要换成字符串再转到线程池才能共享
    • 每次toString都new了一个新字符串对象在堆里,所以只用userId.toString()拿到的也不是同一个对象。
    • 用intern()方法可以让同一个值的字符串不重复的放到字符串常量池中
    • tips:这是JVM里的东西,没看懂的可以学一下JVM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Transactional
public Result createVoucherOrder(Long voucherId) {
//5. 判断用户是否重复下单
//从ThreadLocal中获得用户id
Long userId = UserHolder.getUser().getId();

//我们这个锁只需要单加到用户上
//使用同步锁将userId锁起来
//synchronized只能锁对象和类对象

synchronized (userId.toString().intern()) {

//5.1 根据userId和voucherId来查询订单
Integer count = query()
.eq("user_id", userId)
.eq("voucher_id", voucherId).count();


//5.2 判断是否存在
//如果存在
if (count > 0) {
return Result.fail("您已经抢购过该秒杀券!");
}


//6. 扣减库存
boolean success = seckillVoucherService.update() // 1. 开始更新操作
.setSql("stock = stock - 1") // 2. 设置更新的SQL片段 set stock = stock - 1
.eq("voucher_id", voucherId) // 3. WHERE条件:指定优惠券ID where voucher_id = voucherId
.gt("stock", 0) // 4. WHERE条件:库存必须大于0 where stock > 0
.update(); // 5. 执行更新,返回是否成功

if (!success) { // 6. 判断更新结果
return Result.fail("库存不足");
}


//7. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();

//设置订单id
Long orderId = redisIdWorker.nextId("order");
//设置代金券id
voucherOrder.setVoucherId(voucherId);
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);

//8. 将订单保存到数据库
save(voucherOrder);

//9. 返回订单ID
return Result.ok(orderId);
}
}
  • 但是这里还有个问题:
    • 我们应该先释放锁再提交事务,由于当前方法是由Spring事务控制,如果在内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放了,这样会导致问题。
    • 将当前方法整体用锁包裹起来,再执行createVoucherOrder函数去完成下单
    • 等函数执行完也就是事务提交了,这时再释放锁。这样就能确保线程安全
1
2
3
4
5
6
 Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {

return createVoucherOrder(voucherId);

}
  • 但是还是有事务问题😅:
    • 我们只是对createVoucherOrder函数加了事务,没给外部函数加事务。
    • 外部函数调用方法的时候,用的是this.的方式调用的,事务想要生效,还得利用代理来生效。
    • 所以这个地方我们需要获取原始的事务对象来操作事务
    • 这里使用AopContext.currentProxy()方法来获取当前对象的代理,然后用代理对象调用方法,记得要去接口IVoucherOrderService中创建createVoucherOrder
1
2
3
4
5
6
7
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
//这里代理对象是这个接口的代理对象
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
//要调用代理的createVoucherOrder,要在接口中创建这个函数
return proxy.createVoucherOrder(voucherId);
}
  • 这个方法还会用到一个依赖aspectjweaver
1
2
3
4
5
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.24</version>
</dependency>
  • 同时在启动类上加上@EnableAspectJAutoProxy(exposeProxy = true)注解
1
2
3
4
5
6
7
8
9
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}

}
  • 重启服务器,再次使用Jmeter测试,200个线程并发,但是只能抢到一张优惠券,目的达成😙

5.7 集群下的线程并发安全问题

  • 通过加锁可以解决单机情况下的一人一单安全问题,但是在集群模式下就不行

  • 模拟集群:

    1. 我们将服务启动两份,端口分别为8081和8082:

      1. 对着服务按CTRL+D,拷贝当前启动项
      2. 点击修改选项,然后点击添加虚拟机选项
      3. 在里面输入-Dserver.port=8082,然后同时启动两个服务,这样就有了服务集群
        alt 同时启动两个服务
        alt 同时启动两个服务
    2. 然后修改nginx的conf目录下的nginx.conf文件,配置反向代理负载均衡

      • proxy_pass http://127.0.0.1:8081;注释掉,改用proxy_pass http://backend;
      • 在backend里面加入server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
   location /api {  
default_type application/json;
#internal;
keepalive_timeout 30s;
keepalive_requests 1000;
#支持keep-alive
proxy_http_version 1.1;
rewrite /api(/.*) $1 break;
proxy_pass_request_headers on;
#more_clear_input_headers Accept-Encoding;
proxy_next_upstream error timeout;
#proxy_pass http://127.0.0.1:8081;
proxy_pass http://backend;
}
upstream backend {
server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;
server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;
}
  1. 在nginx文件中输入.\nginx.exe -s reload使其重载

  2. 现在,用户请求会在这两个节点上负载均衡,再次测试下是否存在线程安全问题😘

    • 访问http://localhost:8080/api/voucher/list/1,因为在nginx中配置了访问8080会调用8081和8082
    • 这时看后台命令框则发现只有一个服务被调用了 刷新一下页面,发现另外一个服务被调用了
    • 这就是nginx的轮询机制
  • 在集群中测试线程并发
    • 我们使用POSTMAN发送两次请求,header携带同一用户的token,尝试用同一账号抢两张优惠券,发现是可行的。
    • 失败原因:
      • 我们部署了多个Tomcat,每个Tomcat都有一个属于自己的jvm,那么假设在服务器A的Tomcat内部,有两个线程,即线程1和线程2,这两个线程使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的。
      • 但是如果在服务器B的Tomcat的内部,又有两个线程,他们的锁对象虽然写的跟服务器A一样,但是却不是同一个,所以线程3和线程4可以实现互斥,却不能和线程1,线程2互斥。
      • alt 集群下的线程并发安全问题
        alt 集群下的线程并发安全问题
    • 这就是集群环境下,syn锁失效的原因,在这种情况下,我们需要使用分布式锁来解决这个问题,让锁不存在于每个jvm的内部,而是让所有jvm公用外部的一把锁(Redis)

6. 分布式锁

6.1 基本原理和实现方式比对

  • 分布式锁:满足分布式系统或集群模式下多线程可见并且可以互斥的锁

    • 核心思想:大家共同用一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行。
      alt 分布式锁原理图
      alt 分布式锁原理图
  • 分布式锁的特征

    1. 可见性 (这里说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思)
    2. 互斥
    3. 高可用 (程序不易崩溃,时时刻刻保证较高可用性)
    4. 高性能 (由于加锁本身让性能降低,所以对于分布式锁需要较高的加锁性能和释放锁性能)
    5. 安全性
  • 常见的分布式锁有三种

    1. MySQL:MySQL本身就带有锁机制,但是由于MySQL的性能一般,所以采用分布式锁的情况下,使用MySQL作为分布式锁比较少见
    2. Redis:Redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都是用Redis或者Zookeeper作为分布式锁,利用SETNX这个方法,如果插入Key成功,则表示获得到了锁,如果有人插入成功,那么其他人就回插入失败,无法获取到锁,利用这套逻辑完成互斥,从而实现分布式锁
    3. Zookeeper:Zookeeper也是企业级开发中较好的一种实现分布式锁的方案,但本文是学Redis的,所以这里就不过多阐述了
MySQL Redis Zookeeper
互斥 利用mysql本身的互斥锁机制 利用setnx这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放

6.2 Redis分布式锁的实现核心思路

  • 实现分布式锁时需要实现两个基本方法
  1. 获取锁

    • 互斥:确保只能由一个线程获取锁。
    • 非阻塞:尝试一次,成功返回true,失败返回false
    1
    2
    3
    4
    # 添加锁,利用setnx的互斥特性
    SETNX lock thread1
    # 添加锁过期时间,避免服务器宕机引起的死锁
    EXPIRE lock 10
    • 但是这样写会有问题:如果在没有添加过期时间之前宕机了,就会发生死锁
    • 我们查询set的用法,发现还有几个参数
      • EX:后面加以s为单位的过期时间
      • PX:后面加以ms为单位的过期时间
      • NX:跟SETNX一样,来设置锁
    1
    SET key value [NX|XX] [GET] [EX seconds|PX milliseconds|EXAT unix-time-seconds|PXAT unix-time-milliseconds|KEEPTTL]
    • 所以我们可以通过设置EX,NX来只用一个命令实现有过期时间的互斥锁
    1
    SET lock thread01 NX EX 10
  2. 释放锁

    • 手动释放
    • 超时释放:获取锁的时候添加一个超时时间
    1
    DEL lock
  • 核心思路
    • 我们利用redis的SETNX方法,当由多个线程进入时,我们就利用该方法来获取锁。
    • 第一个线程进入时,redis中有这个key了,返回了1,表示他抢到了锁,那么他去执行业务,然后删除锁,退出锁逻辑。
    • 没有抢到锁(返回了0)的线程,等待一定时间后重试。

6.3 实现基础分布式锁

  • 需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public interface ILock {
    /**
    * 尝试获取锁
    *
    * @param timeoutSec 锁持有的超时时间,过期自动释放
    * @return true表示获取锁成功,false表示获取锁失败
    */
    boolean tryLock(long timeoutSec);

    /**
    * 释放锁
    */
    void unlock();
    }
  • 创建SimpleRedisLock类来实现接口

    • 因为类没有被spring管理,所以用构造函数注入
    • 获取线程唯一id使用Thread.currentThread().getId()
    • Boolean包装类直接返回会有自动拆箱,所以使用它的equals方法来判断
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      public class SimpleRedisLock implements ILock{
      //锁的前缀
      private static final String KEY_PREFIX = "lock:";

      //具体业务名称,将前缀和业务名拼接之后当做Key
      private String name;

      //这里不是@Autowired注入,采用的是构造器注入,在创建SimpleRedisLock时,将RedisTemplate作为参数传入
      private StringRedisTemplate stringRedisTemplate;

      //构造函数
      public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
      this.name = name;
      this.stringRedisTemplate = stringRedisTemplate;
      }


      @Override
      public boolean tryLock(long timeoutSec) {
      //获取当前线程表示
      long threadId = Thread.currentThread().getId();
      String threadIdStr = String.valueOf(threadId);

      //获取锁
      Boolean isGetLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadIdStr, timeoutSec, TimeUnit.SECONDS);
      //因为Boolean是包装类,直接返回会有自动拆箱,可能会有安全风险
      //所以这里使用包装类的equals方法来判断
      return Boolean.TRUE.equals(isGetLock);
      }

      @Override
      public void unlock() {
      stringRedisTemplate.delete(KEY_PREFIX + name);
      }
      }
  • 修改业务代码

    • 去掉sync锁,采用RedisLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Long userId = UserHolder.getUser().getId();
//尝试去创建锁对象
//业务是这个用户是否重复下单,所以锁id可以加上用户id
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//获取锁对象
boolean isGetLock = lock.tryLock(120);
if (!isGetLock) {
return Result.fail("不允许抢多张优惠券");
}
try {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
lock.unlock();
}
  • 使用Jmeter进行压力测试,请求头中携带登录用户的token,最终只能抢到一张优惠券

6.4 解决误删问题

误删问题解析🧐

  • 逻辑说明

  • 持有的锁的线程1在锁内部阻塞了,导致锁的TTL到期,自动释放

  • 此时线程2也来尝试获取锁,由于线程1已经释放了锁,所以线程2可以拿到

  • 此时线程1阻塞完了,继续往下执行,要释放锁了

  • 此时就会将属于线程2的锁释放,这就是误删别人锁的情况

  • 解决方案

  • 在每个线程释放锁的时候,都判断以下这个锁是不是自己的, 如果不属于自己,则不进行删除操作。

  • 假设还是上面的情况,线程1阻塞,锁自动释放,线程2开始获取锁,此时线程1阻塞完了,开始释放锁

  • 但是发现这把锁不是自己的,所以不释放了

  • 当线程2执行到释放锁的逻辑时,如果TTL还未过期,则判断当前这把锁是自己的,所以释放锁

  • 需求:修改之前的分布式锁实现

  • 概述:在获取锁的时候存入线程表示 (用UUID标示,在一个JVM中,ThreadId一般不会重复,但是我们是集群模式,有多个JVM,多个JVM可能存在出现ThreadId重复的情况),在释放锁的时候先获取锁的线程标示,判断是否一致。

  • 修改一下多线程锁的业务,将标示加上UUID并且在释放锁之前判断线程标示是否一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

public class SimpleRedisLock implements ILock{

//具体业务名称,将前缀和业务名拼接之后当做Key
private final String name;


//这里不是@Autowired注入,采用的是构造器注入,在创建SimpleRedisLock时,将RedisTemplate作为参数传入
private final StringRedisTemplate stringRedisTemplate;

//构造函数
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}


//锁的前缀
private static final String KEY_PREFIX = "lock:";
//锁的线程标识的前缀
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";


@Override
public boolean tryLock(long timeoutSec) {
//获取当前线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();


//获取锁
Boolean isGetLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
//因为Boolean是包装类,直接返回会有自动拆箱,可能会有安全风险
//所以这里使用包装类的equals方法来判断
return Boolean.TRUE.equals(isGetLock);
}

@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁中的线程标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);

//判断是否一致
if (threadId.equals(id)) {
//一致,释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}

}
}

6.5 Lua脚本解决分布式锁的原子性问题

更为极端的误删逻辑😅

  1. 假设线程1已经获取锁,在判断一致后,准备释放锁(也就是调用unlock)的时候,又出现了阻塞(例如JVM垃圾回收机制)
  2. 于是TTL过期,自动释放了
  3. 这时线程2拿到了锁
  4. 但是线程1还没执行完,那么线程1就会把线程2的锁给删了
  5. 这就是删锁的原子性问题
  • Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性
  • Lua是一种编程语言,它的基本语法可以上菜鸟教程看看,链接:https://www.runoob.com/lua/lua-tutorial.html
  • 这里重点介绍Redis提供的调用函数,我们可以使用Lua去操作Redis,而且还能保证他的原子性,这样就可以实现拿锁,判断标示,删锁是一个原子性动作了

Lua脚本操作Redis用法

  • Redis提供的调用函数语法:
1
redis.call('命令名称','key','其他参数'....)
  • 例如我们要执行set name yi_bo_,则脚本是这样
1
redis.call('set','name','yin_bo_')
  • 例如我们要执行set name yin_bo_,再执行get name,则脚本如下
1
2
3
4
5
6
## 先执行set name yin_bo_)
redis.call('set','name','yin_bo_')
## 再执行get name
local name = redis.call('get','name')
## 返回
return name
  • 写好脚本之后,需要用Redis命令来调用脚本,调用脚本的常见命令如下
1
2
## numkeys为参数数量  为0则不需要传参
EVAL script numkeys key[key...] arg[arg...]
  • 例如,我们要调用redis.call('set','name','yin_bo_')这个脚本,语法如下
1
EVAL "return redis.call('set','name','yin_bo_')" 0
  • 如果脚本中的key和value不想写死,可以作为参数传递,key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以以KEYS和ARGV数组中获取这些参数

    注意:在Lua中,数组下标从1开始

1
EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 name yin_bo_
  • 现在我们来实现Lua脚本吧😊
1
2
3
4
5
6
7
8
9
--其中KEYS[1]为锁的key
--ARGB[1]为当前线程标示
--把这个标示和当前线程标示做比较
if (redis.call('get', KEYS[1]) == ARGV[1]) then
--释放锁 del key
return redis.call('del',key)
end
return 0

6.7 利用Java代码调用Lua脚本改造分布式锁

  • 在RedisTemplate中 可以利用execute方法去执行Lua脚本
    • 以下为RedisTemplate调用Lua脚本的API
    • 接受参数为脚本,keys和args
1
2
3
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
return this.scriptExecutor.execute(script, keys, args);
}
  • 如何在java文件中获取Lua文件呢?
    • 这里就需要获取多线程锁的脚本了
    • 这里Lua文件放到了resources里的LuaScript文件夹里 所以通过ClassPathResource来调用
    • static 块加载 → 脚本只读一次
1
2
3
4
5
6
public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static{
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("LuaScript/Unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
  • 现在来修改删除锁的逻辑吧
1
2
3
4
5
6
7
8
9
@Override
public void unlock() {
//调用Lua脚本
//因为KEYS必须要是一个LIST 而我们锁的key是一个字符串 所以这里用Collections.singletonList方法来将key变成单一元素的数组
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}

现在完成了,我们的分布式锁维持了原子性😊

分布式锁原子性之我见

所谓原子性,就是要保持功能执行的时间短到不会发生意外嘛
之前的删除锁功能写的太繁琐了,可能会还没删除锁就TTL失效了
所以我们要使用Lua脚本,直接调用脚本,让功能在TTL失效之前运行完

7. 分布式锁-Redisson

基于SETNX实现的分布式锁存在的问题

  1. 不可重入

    • 重入问题是指获取锁的线程,可以再次进入到相同的锁的代码块中。
    • 可重入锁的意义在于防止死锁,例如在HashTable这样的代码中,他的方法都是使用synchronized修饰的,加入他在一个方法内调用另一个方法,如果此时是不可重入的,那就死锁了
    • 所以可重入锁的重要意义是防止死锁,我们的synchronized和Lock锁都是可重入的

      可重入锁 ≠ 为了”多拿锁”
      可重入锁 = 为了”方法之间互相调用时不死锁”

  2. 不可重试

    • 我们编写的分布式锁只能尝试一次,失败了返回false,没有重试机制。
    • 但合理的情况应该是:当线程获取锁失败后,他应该能再次获取锁
  3. 超时释放

    • 我们在加锁的时候增加了TTL,这样我们可以防止死锁,但是如果卡顿(阻塞)时间太长,也会导致锁的释放,虽然我们采用Lua脚本防止删锁,误删别人的锁,但是当前的锁没锁住,也有安全隐患
  4. 主从一致性

    • 如果Redis提供了主从集群,那么当我们向集群写数据时,主机需要异步的将数据同步给从机,万一在同步之前,主机宕机了(主从同步存在延迟,虽然时间很短,但还是发生了),那么又会出现死锁问题

      客户端 → 主节点:SETNX lock:1001 OK
      主 → 从:异步同步(未完成)
      主宕机 → 从升主
      新客户端 → 新主:SETNX lock:1001 OK!→ 两个人都拿到了锁!

  • Redisson简述:

    • Redisson是一个在Redis基础上实现Java驻内存数据网格(In-Memory Data Grid)。他不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包括了各种分布式锁的实现。
  • Redisson提供了分布式锁的功能

  • 可重入锁(Reentrant Lock)

  • 公平锁(Fair Lock)

  • 联锁(MultiLock)

  • 红锁(RedLock)

  • 读写锁(ReadWriteLock)

  • 信号量(Semaphore)

  • 可过期性信号量(PermitExpirableSemaphore)

  • 闭锁(CountDownLatch)

7.1 Redisson入门

  1. 导入依赖
1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.7</version> <!-- 稳定老版本,无 Unix Socket 依赖 -->
</dependency>
  1. 配置Redisson客户端,在config包下新建RedissonConfig类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("zhou123quan");
return Redisson.create(config);
}
}
  1. 使用Redisson的分布式锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
//获取可重入锁
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,三个参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位
boolean success = lock.tryLock(1,10, TimeUnit.SECONDS);
//判断获取锁成功
if (success) {
try {
System.out.println("执行业务");
} finally {
//释放锁
lock.unlock();
}
}
}
  1. 直接将我们之前的锁进行替换,换成redisson提供的分布式锁(注意要注入RedissonClient)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Long userId = UserHolder.getUser().getId();
//尝试去创建锁对象
//业务是这个用户是否重复下单,所以锁id可以加上用户id

RLock lock = redissonClient.getLock("order:" + userId);

//获取锁对象
//无参 失败不等待直接返回false 租约30s
boolean isGetLock = lock.tryLock();
if (!isGetLock) {
return Result.fail("不允许抢多张优惠券");
}
try {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
lock.unlock();
}

这时使用JMeter开200个线程进行测压,结果也只抢到了一个o( ̄▽ ̄)ブ

7.2 Redisson可重入锁原理(已经被封装的玩意)

method1在方法内部调用method2,method1和method2出于同一个线程,那么method1已经拿到一把锁了,想进入method2中拿另外一把锁,必然是拿不到的,于是就出现了死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Resource
private RedissonClient redissonClient;

private RLock lock;

@BeforeEach
void setUp() {
lock = redissonClient.getLock("lock");
}

@Test
void method1() {
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,1");
return;
}
try {
log.info("获取锁成功");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}

void method2() {
RLock lock = redissonClient.getLock("lock");
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,2");
return;
}
try {
log.info("获取锁成功,2");
} finally {
log.info("释放锁,2");
lock.unlock();
}
}

其他锁的可重入原理

  • 在JUC的Lock锁中,他是借助于等增的一个voaltile的一个state变量来记录重入状态的

    • 如果当前没有人持有这把锁,那么state = 0
    • 如果有人持有了这把锁,那么state = 1
      • 如果持有这把锁的人再次持有这把锁,那么state会+1
  • 对于synchronize而言,他在C语言代码中会有一个count

    • 原理与state相同,也是重入一次就+1,释放一次就-1,直到减到0,表示这把锁没有被人持有
  • 在Redisson中的可重入锁:

    • 分布式锁中,他采用hash结构来存储锁
    • 其中外层key表示这把锁是否存在
    • 内层key则记录当前锁被哪个线程持有
  • 这里我们使用redisson重新获取锁,但是运行失败了 为什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Resource
private RedissonClient redissonClient;

private RLock lock;

@BeforeEach
void setUp() {
lock = redissonClient.getLock("lock");
}

@Test
void method1() {
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,1");
return;
}
try {
log.info("获取锁成功");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}

这里我们method1和method2在同一个线程中,必然拿不到第二把锁,也就会出现死锁

  • 所以我们需要额外判断,method1和method2是否处于同一线程,如果是同一线程,则可以拿到锁,但是state会+1,之后执行的method2中的方法,执行完将state-1 只有减到0,才会真正释放锁

  • 由于我们需要额外存储一个值也就是state,所以用字符串类型SET NX EX是不行的,我们需要一个key多个value,所以用到了Hash结构,但是Hash结构又没有NX这种方法,所以我们需要将原有的逻辑拆开,进行手动判断

  • 先判断是否同线程,同线程锁加1,执行完后判断是否锁属于自己,如果属于自己锁减1

    alt Redisson可重入锁流程图
    alt Redisson可重入锁流程图

  • 为了保证原子性,所以流程图中的业务使用Lua实现

  • 获取锁的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local key = KEYS[1]; --锁的标示
local threadId = ARGV[1]; --线程的唯一标示
local releaseTime = ARGV[2]; --锁的自动释放时间

--如果锁不存在
if (redis.call('exists',key) == 0)then
--获取锁并且添加线程标示,state设为1
redis.call('hset',key,threadId,'1'); --第三个参数为锁的计数器
--设置锁的有效期
redis.call('expire',key,releaseTime);
return 1; -- 返回结果
end;

--锁已经存在,判断线程标识是否为自己的
if (redis.call('hexists',key,threadId) == 1)then
--是自己的,锁计数器+1,使用hincrby 使自增长的值为1
redis.call('hincrby',key,threadId,1);
--设置锁的有效期
redis.call('expire',key,releaseTime);
return 1; -- 返回结果
end;
return 0;--代码走到这里,说明锁存在而且不在同一线程,获取锁失败了
  • 释放锁的逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
local key = KEYS[1];--其中KEYS[1]为锁的key
local threadId = ARGV[1];--ARGV[1]为当前线程标示
local releaseTime = ARGV[2];--ARGV[2]为锁的自动施放时间

-- 如果锁不是自己的 也就是说threadId不存在,锁过期或者被人释放了

--只有加锁的线程,才会在 Redis 的 Hash 中 写入自己的 threadId
-- 没写就说明不是自己的锁
if (redis.call('HEXISTS',key,threadId) == 0)then
return nil; --直接返回nil 使脚本结束
end

--如果锁是自己的 锁计数器-1 还是使用hincrby,不过自增长的值为-1
local count= redis.call('HINCRBY',key,threadId,-1);

--判断重入数次为多少
if(count > 0)then
--大于0 说明锁还没有释放,重置有效期
redis.call('expire',key,releaseTime)
end

--否则直接释放锁
redis.call('del',key)
return nil

7.3 Redisson锁重试和WatchDog机制

  • 之前我们使用无参的Redisson的tryLock方法,现在我们分析以有参的参数分别代表什么

    • 尝试获取锁,三个参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位
    • boolean success = lock.tryLock(1,10, TimeUnit.SECONDS);
  • Watchdog是什么?

    • 看门狗:一个后台线程,只要你持有锁,他就每隔一段时间帮你续期
    • 防止: 业务卡顿 → 锁过期 → 别人抢走!
    • 核心目标是使锁活到业务结束
    • 只有当leaseTime = -1也就是tryLock或者lock函数没有第二个参数时 会开启
    • 每隔10s续期一次
  • 什么时候不续期?

    1. unlock()取消了Watchdog任务
    2. tryLock(5,30,SECONDS) 这时leaseTime有值,不开启watchdog
    3. 锁被别人删了 HEXISTS返回为0自动停止

生活比喻
你租酒店房间 30 分钟:
无 Watchdog:30 分钟后自动退房
有 Watchdog:每 10 分钟,服务员问:“还住吗?”
你说“住!” → 再续 30 分钟
你退房 → 服务员不问了

alt Redisson看门狗流程图
alt Redisson看门狗流程图

7.4 Redisson锁的MutiLock原理

  • Redis的主从一致性问题

    • 为了提高Redis的可用性,我们会搭建集群或者主从,现在以主从为例

    • 此时我们去写命令,写在主机上,主机会将数据同步给从机,但是假设主机还没来得及把数据写入到从机去的时候,主机宕机了

    • 哨兵会发现主机宕机了,于是选举一个slave(从机)变成master(主机),而此时新的master(主机)上并没有锁的信息,那么其他线程就可以获取锁,又会引发安全问题

  • 怎么解决?

    • Redisson提出来了MutiLock锁,使用这把锁的话,那我们就不用主从了,每个节点的地位都是一样的,都可以当做是主机.
    • 那我们就需要将加锁的逻辑写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功.
    • 假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性

7.5 小结

  1. 不可重入Redis分布式锁

    • 原理:利用SETNX的互斥性,利用EX避免死锁,释放锁时判断线程标示避免误删
    • 缺陷:不可重入,无法重试,锁超时失效
  2. 可重入Redis分布式锁

    • 原理:利用Hash结构,记录线程标识和重入次数,利用WatchDog延期锁时间,利用信号量控制锁重试等待
    • 缺点:若Redis宕机会引起锁失效
  3. Redisson的multiLock

    • 原理:多个独立的Redis节点,必须在所有节点都获取重入锁时才能获取锁成功

8. 秒杀优化

8.1 异步秒杀思路

  • 先回顾下我们实现的下单流程

    • 当用户发起请求,会先请求Nginx,Nginx反向代理到Tomcat,而Tomcat中的程序会串行操作,分为以下几个步骤
      1. 查询优惠券
      2. 判断优惠券库存是否足够
      3. 查询订单
      4. 校验是否一人一单
      5. 扣减订单
      6. 创建订单
    • 六个步骤中,很多操作是要去操作数据库的,而且是一个线程串行执行,导致程序执行的很慢
    • 所以我们需要异步程序执行
  • 优化方案

    • 我们将耗时较短的判断逻辑放到Redis中,例如:库存是否充足,是否一人一单这种操作,只有满足这两条操作,那么一定会下单成功
    • 之后不用等数据真的写进数据库,我们就可以直接告诉用户下单成功,然后后台再开一个线程
    • 后台线程慢慢的去执行队列中的消息,这样我们很快就可以完成下单业务
      alt 异步秒杀流程图
      alt 异步秒杀流程图
  • 这里存在两个难点

    1. 我们怎么在Redis中快速校验是否一人一单,还有库存判断
    2. 我们校验一人一单和将下单数据写入数据库,这是两个线程,我们怎么知道下单是否完成了
      • 我们需要将一些信息返回给前端,同时也将这些信息丢到异步queue中,后去操作中,可以通过这个id来查询下单逻辑是否完成

整体思路

这里就是将Redis当成消息队列了嘛
有啥东西判断完了都往里面存 然后向前端返回信息,同时再去异步操作数据库😅

  • 当用户下单之后,判断库存是否充足,只需要去Redis中根据key找对应的value是否大于0

  • 如果不充足直接结束

  • 如果充足,则在Redis中判断用户是否可以下单(也就是是否下过单)

  • 如果set集合中没有找到用户的下单数据,则可以下单,并将userId和优惠券存入Redis中,返回0

  • 整个过程需要保证原子性,所以我们需要用Lua去操作,同时由于我们需要在Redis中查询优惠券信息,所以 我们新增秒杀优惠券的同时,需要将优惠券信息保存到Redis中

  • 完成以上逻辑判断时,我们只需要判断当前Redis中返回值是否为0 如果为0则表示可以下单,将消息保存到queue中,然后返回到Tomcat,开一个线程去异步下单,同时直接将订单信息返回给前端

    alt 异步秒杀思路图
    alt 异步秒杀思路图

8.2 异步秒杀资格判断

  • 需求:
    1. 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
    2. 基于Lua脚本,判断秒杀库存,一人一单,决定用户是否抢购成功
    3. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
    4. 开启线程,不断从阻塞队列中获取信息,实现异步下单
  1. 修改保存优惠券相关代码
    新增添加优惠券库存数量到Redis的代码
1
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString());
  • 添加成功后,数据库中和Redis中都能看到优惠券信息
  1. 编写Lua脚本
  • Lua的字符串拼接使用..,字符串转数字的是tonumber()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
-- 订单id
local voucherId = ARGV[1]

-- 用户id
local userId = ARGV[2]

-- 优惠券key
local stockKey = 'seckill:stock' .. voucherId

-- 订单key
local orderKey = 'seckill:order' .. voucherId

-- 判断库存是否充足
-- 库存不足 返回1
if (tonumber(redis.call('get',stockKey)) <= 0) then
return 1
end

-- 判断用户是否下单
-- sismember判断一个元素是否在集合当中
if (redis.call('sismember', orderKey, userId) == 1) then
return 2
end

--扣减库存
redis.call('incrby',stockKey,-1)

-- 将userId存入当前优惠券的set集合
-- sadd 向set集合中添加元素
redis.call('sadd',orderKey,userId)
return 0

  1. 修改逻辑业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//消息队列的脚本的脚本
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static{
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("LuaScript/seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

@Override
public Result seckillVoucher(Long voucherId) {
//1. 执行lua脚本
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(), voucherId.toString(),
UserHolder.getUser().getId().toString());
//2. 判断返回值,并返回错误信息
if (result.intValue() != 0) {
return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
}
long orderId = redisIdWorker.nextId("order");
//TODO 保存阻塞队列

//3. 返回订单id
return Result.ok(orderId);
}

最后使用JMeter进行测验 发现吞吐量接近两千😊

8.3 基于阻塞队列实现秒杀优化

什么是阻塞队列(BlockingQueue)?

阻塞队列(BlockingQueue)是JUC包中的一个工具 来自于java.util.concurrent
阻塞队列就是一个线程安全的队列,当你往队列里面塞或者拿东西的时候,就会自动等待

操作 没位置/没东西时 行为
放元素 (put) 队列满了 阻塞等待
取元素 (take) 队列空了 阻塞等待
  • 核心接口:BlockingQueue
    常用实现类
队列 特点 适用场景
ArrayBlockingQueue 有界、数组实现 控制队列大小
LinkedBlockingQueue 有界/无界(默认 Integer.MAX_VALUE) 最常用
PriorityBlockingQueue 优先级队列 按优先级消费
SynchronousQueue 容量 0! 生产者消费者直接交接

目前我们已经实现了用户下单-> 将下单信息存入Redis ->返回下单信息到前端给用户
但是还没有实现将下单信息存入数据库的操作
这就需要我们另外创建一个后台消费者线程,通过阻塞队列从 Redis 中获取订单信息,实现异步落库操作。
如果判断为0 也就是可以下单 我们将下单的逻辑保存到队列当中,然后去异步执行

  1. 创建阻塞队列
1
private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
  1. 将订单信息封装到voucherOrder中,然后将其加入到阻塞队列
1
2
3
4
5
6
7
8
long orderId = redisIdWorker.nextId("order");
//封装到voucherOrder中
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setId(orderId);
//加入到阻塞队列
orderTasks.add(voucherOrder);
  1. 创建一个线程池来实现异步下单

    • 因为只需要另外开一个线程去处理下单 所以这里用newSingleThreadExecutor
    1
    2
     //创建一个单线程
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
  2. 创建线程任务,秒杀业务需要在类初始化之后,就立即执行,所以这里需要用到@PostConstruct注解
    让下单逻辑去实现Runnable
    这里又使用了handleVoucherOrder来为订单加上分布式锁,其实可以删除,因为在Lua脚本中已经分布式的完成了判断业务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
     //@PostConstruct注解:在类刚初始化的时候执行被注解的命令
    @PostConstruct
    //创建初始方法,将阻塞队列里的判断逻辑移到另外一个线程
    private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }


    //为下单逻辑增加分布式锁(这里在Lua脚本中已经分布式的完成了判断业务,所以无需再加分布式锁,这里可以删除)
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
    //1. 获取用户
    Long userId = voucherOrder.getUserId();
    //2. 创建锁对象,作为兜底方案
    RLock redisLock = redissonClient.getLock("order:" + userId);
    //3. 获取锁
    boolean isLock = redisLock.tryLock();
    //4. 判断是否获取锁成功
    if (!isLock) {
    log.error("不允许重复下单!");
    return;
    }
    try {
    //5. 使用代理对象,由于这里是另外一个线程,
    proxy.createVoucherOrder(voucherOrder);
    } finally {
    redisLock.unlock();
    }
    }

    //下单逻辑,在另外一个线程异步执行
    private class VoucherOrderHandler implements Runnable {
    @Override
    public void run() {
    while (true) {
    try {
    //1. 获取队列中的订单信息
    // take方法为阻塞方法,如果没获取元素则阻塞,所以这里可以用死循环
    VoucherOrder voucherOrder = orderTasks.take();
    //2. 创建订单
    handleVoucherOrder(voucherOrder);
    } catch (Exception e) {
    log.error("订单处理异常", e);
    }
    }
    }
    }

  3. 在主线程获取代理对象proxy,然后在handleVoucherOrder方法中获取来实现分布式锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
        @Override
    public Result seckillVoucher(Long voucherId) {
    //1. 执行Lua脚本判断订单中用户是否有购买资格 并且对redis中的库存进行操作
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
    Collections.emptyList(), voucherId.toString(),
    UserHolder.getUser().getId().toString());

    //2. 判断结果是否为0
    if (result.intValue() != 0) {
    //2.1 不为0 则代表着用户没有购买资格
    return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
    }

    //2.2 为0 有购买资格,生成订单ID
    long orderId = redisIdWorker.nextId("order");

    //3. 将订单信息封装到voucherOrder中
    VoucherOrder voucherOrder = new VoucherOrder();
    voucherOrder.setVoucherId(voucherId);
    voucherOrder.setUserId(UserHolder.getUser().getId());
    voucherOrder.setId(orderId);

    //4. 加入到阻塞队列
    orderTasks.add(voucherOrder);

    //5. 主线程获取代理对象
    proxy = (IVoucherOrderService) AopContext.currentProxy();
    return Result.ok(orderId);
    }

黑马讲错或者遗漏的坑😅

  1. 在将订单信息添加到阻塞队列时,需要使用put()而不是add()
1
2
3
4
5
// 错误!队列满时抛异常
orderTasks.add(voucherOrder);

// 正确!队列满时阻塞等待
orderTasks.put(voucherOrder);

add() 是 Queue 的方法,满时抛 IllegalStateException
put() 是 BlockingQueue 的方法,满时阻塞等待

  • 最终代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package com.hmdp.service.impl;

import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;

private IVoucherOrderService proxy;


//引用Lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("/LuaScript/seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

//创建单线程 在这个线程异步执行下单逻辑
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

//@PostConstruct注解:在类刚初始化的时候执行被注解的命令
@PostConstruct
//创建初始方法,将阻塞队列里的判断逻辑移到另外一个线程
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

//创建阻塞队列
private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

//为下单逻辑增加分布式锁(这里在Lua脚本中已经分布式的完成了判断业务,所以无需再加分布式锁,这里可以删除)
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1. 获取用户
Long userId = voucherOrder.getUserId();
//2. 创建锁对象,作为兜底方案
RLock redisLock = redissonClient.getLock("order:" + userId);
//3. 获取锁
boolean isLock = redisLock.tryLock();
//4. 判断是否获取锁成功
if (!isLock) {
log.error("不允许重复下单!");
return;
}
try {
//5. 使用代理对象,由于这里是另外一个线程,
proxy.createVoucherOrder(voucherOrder);
} finally {
redisLock.unlock();
}
}


//下单逻辑,之后塞进阻塞队列由另外一个线程执行
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
//1. 获取队列中的订单信息
// take方法为阻塞方法,如果没获取元素则阻塞,所以这里可以用死循环
VoucherOrder voucherOrder = orderTasks.take();
//2. 创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("订单处理异常", e);
}
}
}
}


@Override
public Result seckillVoucher(Long voucherId) throws InterruptedException {
//1. 执行Lua脚本判断订单中用户是否有购买资格 并且对redis中的库存进行操作
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(), voucherId.toString(),
UserHolder.getUser().getId().toString());

//2. 判断结果是否为0
if (result.intValue() != 0) {
//2.1 不为0 则代表着用户没有购买资格
return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
}

//2.2 为0 有购买资格,生成订单ID
long orderId = redisIdWorker.nextId("order");

//3. 将订单信息封装到voucherOrder中
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setId(orderId);

//4. 加入到阻塞队列
orderTasks.put(voucherOrder);

//5. 主线程获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}


@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 一人一单逻辑
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
synchronized (userId.toString().intern()) {
int count = query()
.eq("voucher_id", voucherId)
.eq("user_id", userId)
.count();
if (count > 0) {
log.error("你已经抢过优惠券了哦");
return;
}
//5. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if (!success) {
log.error("库存不足");
return;
}
//7. 将订单数据保存到表中
save(voucherOrder);
}
}
}

8.4 小结

  • 秒杀业务的优化思路是什么?
  1. 先利用Redis完成库存容量、一人一单的判断,完成抢单业务
  2. 再将下单业务放入阻塞队列,利用独立线程异步下单
  • 基于阻塞队列的异步秒杀存在哪些问题?
  1. 内存限制问题:
    我们现在使用的是JDK里的阻塞队列,它使用的是JVM的内存,如果在高并发的条件下,无数的订单都会放在阻塞队列里,可能就会造成内存溢出,所以我们在创建阻塞队列时,设置了一个长度,但是如果真的存满了,再有新的订单来往里塞,那就塞不进去了,存在内存限制问题
  2. 数据安全问题:
    经典服务器宕机了,用户明明下单了,但是数据库里没看到

9. Redis消息队列

9.1 认识消息队列

  • 什么是消息队列(Message Queue)?
    字面意思上就是存放消息的队列,最简单的消息队列包括三个角色

    1. 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
    2. 生产者:发送消息到消息队列。
    3. 消费者:从消息队列获取消息并处理消息。
  • 使用队列的好处在于解耦

    • 举个例子:快递员(生产者)把快递放到驿站(Message Queue)里,我们(消费者)从驿站去拿快递,这就是一个异步。
    • 如果耦合,也就是没有驿站,那么快递员必须亲自上楼把快递送到到你手上,服务当然好,但是如果我们不在家的话,快递员就要一直等,浪费了快递员的时间,
    • 所以解耦是非常有必要的。
  • 那么在这种场景下我们的秒杀就变成了:

    • 在我们下单之后,利用Redis去校验下单的结果,然后再通过队列把消息发送出去,然后再启动一个线程去拿到这个消息,完成解耦,同时加快了我们的响应速度。
  • Redis提供了三种不同的方式来实现消息队列:

    1. list结构:基于List结构模拟消息队列
    2. PubSub:基本的点对点消息模型
    3. Stream:比较完善的消息队列模型
  • 这里我们可以直接使用现成的MQ消息队列,例如kafka,rocketMQ,这里主要是学习Redis,所以使用Redis提供的MQ方案。

9.2 基于List实现消息队列

Redis的list数据结构是一个双向链表,很容易模拟出队列的效果。

队列的入口和出口不在通一遍,所以我们可以利用:LPUSH结合PROP,或者RPUSH结合LPOP来实现 (这里L为Left,R为Right)

要注意的是:当队列没有消息时,RPOP和LPOP操作会返回NULL,而不是JVM阻塞队列那样会阻塞,并等待消息,所以我们这里应该使用BRPOP或者BLPOP来实现阻塞效果。(B代表阻塞)

  • 基于List的消息队列优缺点:
    • 优点
      1. 利用Redis存储,不受JVM内存上限
      2. 基于Redis的持久化机制,数据安全性有保障
      3. 可以满足消息有序性
    • 缺点
      1. 无法避免消息丢失(经典服务器宕机)
      2. 只支持单消费者(一个消费者把消息拿走了,其他消费者就看不到这条消息)

9.3 基于PubSub的消息队列

  • PubSub(发布订阅)是Redis2.0版本进入的消息传递模型

  • 消费者可以订阅一个或者多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel]:订阅一个或者多个频道。

  • PUBLISH channel msg:向一个频道发送消息。

  • PSUBSCRIBE pattern [pattern]:订阅与pattern(通配符)格式匹配的所有频道

alt 基于PubSub的消息队列
alt 基于PubSub的消息队列

  • 基于PubSub的消息队列优缺点:
    • 优点
      1. 采用发布订阅模型,支持多生产,多消费
    • 缺点
      1. 不支持数据持久化
      2. 无法避免消息丢失(如果向频道发送了消息,却没人订阅该频道,那发送的这条消息就丢失了)
      3. 消息堆积有上线,超出时数据丢失(消费者拿到数据的时候处理的太慢,而发送消息太快)

9.4 基于Stream的消息队列

  • Stream是Redis5.0引用的一种新的数据类型,可以实现一个功能非常完善的消息队列

  • 因为是数据类型 所以支持持久化

  • 发送消息的命令:XADD

    1
    XADD key [NOMKSTREAM] [MAXLEN|MINID [=!~] threshold [LIMIT count]] *|ID field value [field value ...]
    • [NOMKSYREAM]
      • 如果队列不存在,是否自动创建队列,默认是自动创建
    • [MAXLEN|MINID [=!~] threshold [LIMIT count]]
      • 设置消息队列的最大消息数量, 不设置则无上限
    • *|ID
      • 消息的唯一id,*代表由Redis自动生成
      • 格式是”时间戳-递增数字”例如1145141919810-0
    • field value [field value ...]
      • 发送到队列的消息,称为Entry,格式就是多个KV键值对
    • 举例:
1
2
## 创建名为user的队列,并向其中发送一个信息,消息是{name=yin_bo_,age=20},并使用Redis自动生成ID
XADD users * name yin_bo_ age 20
  • 读取消息的命令:XREAD
    1
    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
  • [COUNT count]
  • 每次读取消息的最大数量
  • [BLOCK milliseconds]
  • 当没有消息是否阻塞,阻塞时长
  • ID[ID...]
  • 起始ID,只返回大于该ID的消息
    • 0:表示从第一个消息开始
    • $:表示从最新的消息开始
  • 举例:使用XREAD读取第一个消息
1
2
3
4
5
6
7
127.0.0.1:6379> XREAD COUNT 1 STREAMS users 0
1) 1) "users"
1) 1) 1) "1762681812034-0"
1) 1) "name"
1) "yin_bo_"
2) "age"
3) "20"
  • 在业务监听时,我们可以用循环调用的XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    while (true){
    //尝试读取队列中的消息,最多阻塞2秒
    Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
    //没读取到,跳过下面的逻辑
    if(msg == null){
    continue;
    }
    //处理消息
    handleMessage(msg);
    }

    注意:当我们指定ID为$时,代表只能读取到最新消息,如果当我们在处理一条消息的过程中,又有超过一条以上的消息到达队列,那么下次获取的时候,也只能获取一条,会出现漏读消息的问题。

  • STREAM类型消息队列的XREAD命令特点

    1. 消息可回溯
    2. 一个消息可以被多个消费者读取
    3. 可以阻塞读取
    4. 由漏读消息的风险

9.5 基于Stream的消息队列—-消费者组

  • 消费者组(Consumer Group):将多个消费者划分到一个组内,同时监听一个队列,具备以下特点:

    1. 消息分流
      • 队列中的消息会分流给组内不同消费者,而不是重复的消费者,从而加快消息处理的速度。
    2. 消息标识
      • 消费者会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费。
    3. 消息确认
    • 消费者获取消息后,消息处于Pending状态,并存入一个pending-list,当处理完成后,需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移除
  • 创建消费者组

    1
    XGROUP CREATE key groupName ID [MKSTREAM]
    • key
      • 队列名称
    • groupName
      • 消费者组名称
    • ID
      • 起始ID标识,$表示队列中的最后一个消息,0表示队列中的第一个消息。
    • MKSTREAM(MK就是make的意思)
      • 队列不存在时自动创建队列
  • 其他常见命令

    • 删除指定的消费者组

      1
      XGROUP DESTORY key groupName
    • 给指定的消费者组添加消费者

      1
      XGROUP CREATECONSUMER key groupName consumerName
    • 删除消费者组中的指定消费者

      1
      XGROUP DELCONSUMER key groupName consumerName
  • 从消费者组里读取消息

    1
    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOCACK] STREAMS key [keys ...] ID [ID ...]
    • group
      • 消费者组名称
    • consumer
      • 消费者名,如果消费者不存在,会自动创建一个消费者
    • count
      • 本次查询的最大数量
    • BLOCK milliseconds
      • 当前没有消息时的最大等待时间
    • NOACK
      • 无需手动ACK,ACK是确认的意思,获取到消息后自动确认(一般不用,我们都是手动确认)
    • STREAMs key
      • 指定队列名
    • ID
      • 获取消息的起始ID
    • >:从下一个未消费的消息队列开始(pending-list)
    • 其他:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
  • 消费者监听消息的伪代码基本思路

    • 如果读取到消息队列里的消息了 一定要做ACK处理!!!
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      while(true){
      // 尝试监听队列,使用阻塞模式,最大等待时长为2000ms
      Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >")
      if(msg == null){
      // 没监听到消息,重试
      continue;
      }
      try{
      //处理消息,完成后要手动确认ACK,ACK代码在handleMessage中编写
      handleMessage(msg);
      } catch(Exception e){
      while(true){
      //0表示从pending-list中的第一个消息开始,如果前面都ACK了,那么这里就不会监听到消息
      Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
      if(msg == null){
      //null表示没有异常消息,所有消息均已确认,结束循环
      break;
      }
      try{
      //说明有异常消息,再次处理
      handleMessage(msg);
      } catch(Exception e){
      //再次出现异常,记录日志,继续循环
      log.error("..");
      continue;
      }
      }
      }
      }
  • STREAM类型消息队列的XREADGROUP命令的特点

    1. 消息可回溯
    2. 可以多消费者争抢消息,加快消费速度
    3. 可以阻塞读取
    4. 没有消息漏读风险
    5. 有消息确认机制,保证消息至少被消费一次
List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受限于内存空间,
可以利用多消费者加快处理
受限于消费者缓冲区 受限于队列长度,
可以利用消费者组提高消费速度,减少堆积
消息确认机制 不支持 不支持 支持
消息回溯 不支持 不支持 支持

9.6 Stream消息队列实现异步秒杀下单

  1. 创建一个Stream类型的消息队列 名为stream.orders

    • 组的名称为g1
    1
    XGROUP CREATE stream.orders g1 0 MKSTREAM
  2. 修改之前秒杀下单的Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包括voucherId,userId,orderId

    • 新增加一个orderId的参数,并且在最后添加返回信息到stream队列的redis指令
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    -- 优惠券id
    local voucherId = ARGV[1]
    -- 用户id
    local userId = ARGV[2]
    -- 订单id
    local orderId = ARGV[3]


    -- 优惠券key
    local stockKey = 'seckill:stock:' .. voucherId

    -- 订单key
    local orderKey = 'seckill:order:' .. voucherId


    -- 判断库存是否充足
    if (tonumber(redis.call('get', stockKey)) <= 0) then
    return 1
    end
    -- 判断用户是否下单
    if (redis.call('sismember', orderKey, userId) == 1) then
    return 2
    end
    -- 扣减库存
    redis.call('incrby', stockKey, -1)
    -- 将userId存入当前优惠券的set集合
    redis.call('sadd', orderKey, userId)
    -- 发送消息到stream队列当中,XADD stream.orders * k1 v1 k2 v2....
    -- 在VoucherOrder中订单的id就叫id 所以这里将订单ID命名为id
    redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
    return 0
  3. 修改秒杀逻辑

    • 之前的逻辑是将订单信息放到JUC的阻塞队列当中
    • 现在的逻辑是在Lua脚本中做完判断之后直接塞进redis的stream队列当中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public Result seckillVoucher(Long voucherId) {
//1. 执行Lua脚本判断订单中用户是否有购买资格 并且对redis中的库存进行操作
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
UserHolder.getUser().getId().toString()
);

//2. 判断结果是否为0
if (result.intValue() != 0) {
//2.1 不为0 则代表着用户没有购买资格
return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
}
//2.2 为0 有购买资格,生成订单ID
long orderId = redisIdWorker.nextId("order");

//3. 将订单信息封装到voucherOrder中
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setId(orderId);

//4. 加入到阻塞队列
orderTasks.add(voucherOrder);

//5. 主线程获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public Result seckillVoucher(Long voucherId) {
//1. 获取用户ID
Long userId = UserHolder.getUser().getId();

//2. 获取订单id
long orderId = redisIdWorker.nextId("order");

//3. 执行Lua脚本判断订单中用户是否有购买资格 并且对redis中的库存进行操作
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId)
);

//4. 判断结果是否为0 如果为O,Lua脚本会将订单信息返回给stream队列
if (result.intValue() != 0) {
//4.1 不为0 则代表着用户没有购买资格
return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
}

//5. 主线程获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}
  1. 在项目启动时开启一个线程来不断尝试获取stream.orders中的消息,完成下单
    • 第一次读取队列中g1的c1,并且是从未消费的消息中获取
    • 如果没有获取到,就从pending-list中所有没有ack的消息中获取
    • 如果还是没有获取到,就一直获取,直到消息进入pending-list
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    //不再是获取阻塞队列的消息,而是获取stream消息队列的消息
    String queueName = "stream.orders";

    private class VoucherOrderHandler implements Runnable {

    @Override
    public void run() {
    while (true) {
    try {
    //1. 获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
    List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),
    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
    //ReadOffset.lastConsumed()底层就是 '>'
    StreamOffset.create(queueName, ReadOffset.lastConsumed()));
    //2. 判断消息是否获取成功
    if (records == null || records.isEmpty()) {
    continue;
    }
    //3. 消息获取成功之后,我们需要将其转为对象
    MapRecord<String, Object, Object> record = records.get(0);
    Map<Object, Object> values = record.getValue();
    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
    //4. 获取成功,执行下单逻辑,将数据保存到数据库中
    handleVoucherOrder(voucherOrder);
    //5. 手动ACK,SACK stream.orders g1 id
    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
    } catch (Exception e) {
    log.error("订单处理异常", e);
    //订单异常的处理方式我们封装成一个函数,避免代码太臃肿
    handlePendingList();
    }
    }
    }
    }

    private void handlePendingList() {
    while (true) {
    try {
    //1. 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders 0
    List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
    Consumer.from("g1", "c1"),
    StreamReadOptions.empty().count(1),
    StreamOffset.create(queueName, ReadOffset.from("0")));
    //2. 判断pending-list中是否有未处理消息
    if (records == null || records.isEmpty()) {
    //如果没有就说明没有异常消息,直接结束循环
    break;
    }
    //3. 消息获取成功之后,我们需要将其转为对象
    MapRecord<String, Object, Object> record = records.get(0);
    Map<Object, Object> values = record.getValue();
    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
    //4. 获取成功,执行下单逻辑,将数据保存到数据库中
    handleVoucherOrder(voucherOrder);
    //5. 手动ACK,SACK stream.orders g1 id
    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
    } catch (Exception e) {
    log.info("处理pending-list异常");
    //如果怕异常多次出现,可以在这里休眠一会儿
    try {
    Thread.sleep(50);
    } catch (InterruptedException ex) {
    throw new RuntimeException(ex);
    }
    }
    }
    }

10. 达人探店

10.1 发布探店笔记

探店笔记对应的表有两个:

  • tb_blog包含笔记中的标题、文字、图片等
  • tb_blog_comments其他用户对探店笔记的评价

对应Java的实体类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_blog")
public class Blog implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 商户id
*/
private Long shopId;
/**
* 用户id
*/
private Long userId;
/**
* 用户图标
*/
@TableField(exist = false)
private String icon;
/**
* 用户姓名
*/
@TableField(exist = false)
private String name;
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;

/**
* 标题
*/
private String title;

/**
* 探店的照片,最多9张,多张以","隔开
*/
private String images;

/**
* 探店的文字描述
*/
private String content;

/**
* 点赞数量
*/
private Integer liked;

/**
* 评论数量
*/
private Integer comments;

/**
* 创建时间
*/
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;
}
  • 效果图如下

    alt 发布探店笔记效果图
    alt 发布探店笔记效果图

  • 保存笔记对应的代码

1
2
3
4
5
6
7
8
9
10
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.save(blog);
// 返回id
return Result.ok(blog.getId());
}
  • 上传图片的代码:

    注意:这里我们需要修改SystemConstants.IMAGE_UPLOAD_DIR为自己图片保存的地址,在实际开发中图片一般会放在nginx或者云存储上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@PostMapping("blog")
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}

10.2 查看探店笔记

  • 需求:点击首页的探店笔记,会进入详细页面,我们现在需要实现页面的查询接口。

  • 随便点击一张图片,查看发送的请求

    请求网址: http://localhost:8080/api/blog/6
    请求路径: /blog/{id}
    请求方法: GET

BlogController下的方法,请求方式为GET,那我们直接来编写对应的方法
在Service类中创建对应方法之后,在Impl类中实现,我们查看用户探店笔记的时候,需要额外设置用户名和其头像,由于设置用户信息这个操作比较通用,所以这里封装成了一个方法

1
2
3
4
5
6
7
8
9
10
@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
return blogService.queryHotBlog(current);
}


@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id){
return blogService.queryBlogById(id);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
 @Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {


@Resource
private IUserService userService;


@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(this::queryBlogUser);
return Result.ok(records);
}

@Override
public Result queryBlogById(Long id) {
//1. 查询blog
Blog blog = getById(id);
if (blog == null) {
return Result.fail("blog不存在!");
}
//2. 查询blog有关的用户
queryBlogUser(blog);
return Result.ok(blog);
}


private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId); //通过userid获取user用户
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}

}

该功能是如何实现的🤨

可以看到我们的实现类只是将name和icon存到blog类里面,然后返回给前端,是如果做到显示blog的所有信息的呢
我们看看表的实体类中有这样的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 用户图标
*/
@TableField(exist = false)
private String icon;
/**
* 用户姓名
*/
@TableField(exist = false)
private String name;
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;

这里MyBatisPlus语法将三个属性都设置为了false
这样这些属性就不会在数据库里存储了😄。
然后我们在功能类中获取这三个属性,把他们存到blog类中
再将完整的blog类返回给前端

功能就是这样实现的!!!

10.3 实现点赞功能

  • 点击点赞按钮,查看发送的请求:

    请求网址: http://localhost:8080/api/blog/like/4
    请求方法: PUT

  • 看样子是BlogController中的like方法,源码如下

    1
    2
    3
    4
    5
    6
    7
    @PutMapping("/like/{id}")
    public Result likeBlog(@PathVariable("id") Long id) {
    // 修改点赞数量
    blogService.update()
    .setSql("liked = liked + 1").eq("id", id).update();
    return Result.ok();
    }
  • 问题分析:这种方式会导致一个用户无限点赞,明显是不合理的。

  • 造成这个问题的原因是:我们现在的逻辑,发起请求只是给数据库+1,所以才会出现这个问题

  • 需求

    1. 同一个用户只能对同一片笔记点赞一次,再次点击则取消点赞。
    2. 如果当前用户已经点赞,则点赞按钮高亮显示(前端已经实现,判断字段Blog类的isLike属性)
  • 实现步骤

    1. 修改点赞功能,利用Redis中的set集合来判断是否点赞过,为点赞则点赞数+1,已点赞则点赞数-1
    2. 根据修改id查询的业务,判断当前登录用户是否点赞过,复制给isLike字段。
    3. 修改分页查询Blog业务,判断当前登录用户是否点赞过,复制给isLike字段。
  • 具体实现

1
2
3
4
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public Result likeBlog(Long id) {
//1. 获取当前用户信息
Long userId = UserHolder.getUser().getId();

//2. 如果当前用户为点赞,则点赞数+1,同时将用户加入set集合
String key = BLOG_LIKED_KEY + id;
Boolean isLiked = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
if(BooleanUtil.isFalse(isLiked)){
//点赞数+1
boolean success = update().setSql("liked = liked + 1").eq("id", id).update();
//将用户添加到set集合
if(success){
stringRedisTemplate.opsForSet().add(key, userId.toString());
}
}else{
//点赞数-1
boolean success = update().setSql("liked = liked - 1").eq("id", id).update();
if(success){
//从set集合移除
stringRedisTemplate.opsForSet().remove(key, userId.toString());
}
}
return Result.ok();
}

  • 修改完毕之后,页面上还不能立即显示点赞完毕的后果,我们还需要修改查询Blog业务,判断Blog是否被当前用户点赞过
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(blog -> {
queryBlogUser(blog);
//追加判断blog是否被用户点赞过,逻辑封装到isBlogLiked方法中
isBlogLiked(blog);
});
return Result.ok(records);
}



@Override
public Result queryBlogById(Long id) {
//1. 查询blog
Blog blog = getById(id);
if (blog == null) {
return Result.fail("blog不存在!");
}
//2. 查询blog有关的用户
queryBlogUser(blog);
//追加判断blog是否被当前用户点赞,逻辑封装到isBlogLiked方法中
isBlogLiked(blog);
return Result.ok(blog);
}

private void isBlogLiked(Blog blog) {
//1. 获取当前用户信息
Long userId = UserHolder.getUser().getId();
//2. 判断当前用户是否点赞
String key = BLOG_LIKED_KEY + blog.getId();
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
//3. 如果点赞了,则将isLike设置为true
blog.setIsLike(BooleanUtil.isTrue(isMember));
}

10.4 实现点赞排行榜

  • 当我们点击探店笔记详情页面时,应该按点赞顺序展示点赞用户,比如显示最早点赞的TOP5,形成点赞排行榜,就跟QQ空间发的说说一样,可以看到有哪些人点了赞

  • 之前的点赞是放到Set集合中,但是Set集合又不能排序,所以这个时候,我们就可以改用SortedSet(Zset)

  • 那我们这里顺便就来对比一下这些集合的区别

List Set SortedSet
排序方式 按添加顺序排序 无法排序 根据score值排序
唯一性 不唯一 唯一 唯一
查找方式 按索引查找或首尾查找 根据元素查找 根据元素查找
  • 修改BlogServiceImpl
    由于ZSet没有isMember方法,所以这里只能通过查询score来判断集合中是否有该元素,如果有该元素,则返回值是对应的score,如果没有该元素,则返回值为null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public Result likeBlog(Long id) {
//1. 获取当前用户信息
Long userId = UserHolder.getUser().getId();

//2. 如果当前用户为点赞,则点赞数+1,同时将用户加入set集合
String key = BLOG_LIKED_KEY + id;
//尝试获得score
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//为null 则表示集合中没有该用户
if (score == null) {
//点赞数+1
boolean success = update().setSql("like = like + 1").eq("id", id).update();
//将用户加入Zset集合
if (success) {
stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
}
}else{
//3. 如果用户已经点赞,则取消点赞,将用户从set集合中移除
boolean success = update().setSql("like = like - 1").eq("id", id).update();
if (success) {
stringRedisTemplate.opsForZSet().remove(key, userId.toString());
}
}
return Result.ok();
}

  • 同时修改isBlogLike方法,在原有逻辑上,判断用户是否登录,登录状态下才会判断用户是否点赞
1
2
3
4
5
6
7
8
9
private void isBlogLiked(Blog blog) {
//1. 获取当前用户信息
Long userId = UserHolder.getUser().getId();
//2. 判断当前用户是否点赞
String key = BLOG_LIKED_KEY + blog.getId();
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//3. 如果点赞了 score不为零
blog.setIsLike(score != null);
}

这里栈溢出了,为什么🤨

springboot集成的redis版本太老了,在序列化Blog对象时,会疯狂调用isBlogLike方法,导致无限衣柜栈溢出。
2.7.1修复了这个bug,不再乱调用有副作用的getter
所以如果栈溢出了,将spring-data-redis版本改成2.7.1或者以上

  • 那我们继续来完善显示点赞列表功能,查看浏览器请求,这个请求目前应该是404的,因为我们还没有写,他需要一个list返回值,显示top5点赞的用户

请求网址: http://localhost:8080/api/blog/likes/4
请求方法: GET

在Controller层中编写对应的方法,点赞查询列表,具体逻辑写到BlogServiceImpl中

1
2
3
4
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable Integer id){
return blogService.queryBlogLikes(id);
}

功能代码如下 主要看排序的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public Result queryBlogLikes(Integer id) {
String key = BLOG_LIKED_KEY + id;
//zrange key 0 4 查询zset 中前五个元素
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
//如果是空的(可能没人点赞),直接返回一个空的集合
if(top5 == null||top5.isEmpty()){
return Result.ok(Collections.emptyList());
}

List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
//将ids使用`,`拼凑,SQL语句查询出来的结果并不是按我们期望的方式排
//所以我们需要order by field来执行排序方式,期望的排序方式就是按照查询出来的id进行排序
String idsStr = StrUtil.join(",",ids);
//select * from tb_user where id in (ids[0],ids[1] ...) order by field (id,ids[0],ids[1]...)
List<UserDTO> userDTOS = userService.query().in("id", ids)
.last("order by field(id," + idsStr + ")")
.list().stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(userDTOS);
}

成果图

alt 点赞排序成果图
alt 点赞排序成果图

11. 好友关注

11.1 实现关注和取关

关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb_follow表来标示

Field Default Extra Comment
id (NULL) auto_increment 主键
user_id (NULL) 用户id
follow_user_id (NULL) 关联的用户id
create_time CURRENT_TIMESTAMP DEFAULT_GENERATED 创建时间
  • 该表对应的实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_follow")
public class Follow implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;

/**
* 用户id
*/
private Long userId;

/**
* 关联的用户id
*/
private Long followUserId;

/**
* 创建时间
*/
private LocalDateTime createTime;
}
  • 先在FollowController中新增两个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
@RequestMapping("/follow")
public class FollowController {
@Resource
private IFollowService followService;
//判断当前用户是否关注了该博主
@GetMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long followUserId) {
return followService.isFollow(followUserId);
}
//实现取关/关注
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFellow) {
return followService.follow(followUserId,isFellow);
}
}
  • 再去FollowServiceImpl中去实现
    • 前端返回了isFellow,能直到用户是关注还是取关
      • 关注就是新增follow数据
      • 取关需要删除follow数据,因为我不并不知道这条关注记录的自增id,所以我们只能靠条件(例如user_id)删除,这就需要新建一个QueryWrapper来拼条件
    • 检测当前用户是否关注只需要查询数据库中是否有这条记录,如果有计数返回给前端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public Result follow(Long followUserId, Boolean isFellow) {
//1. 获取登录的用户
Long userId = UserHolder.getUser().getId();
//2. 判断到底是关注还是取关
if (isFellow) {
//3. 关注:新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
save(follow);
} else {
//4. 取关:删除数据 delete from tb_follow where user_id = ? and follow_id = ?
remove(new QueryWrapper<Follow>()
.eq("user_id", userId)
.eq("follow_user_id", followUserId)
);
}
return Result.ok();
}

@Override
public Result isFollow(Long followUserId) {
//1. 获取用户信息
Long userId = UserHolder.getUser().getId();

//2. 查询是否关注 select * from tb_follow where user_id = ? and follow_user_id = ?
Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
//3. 判断是否关注
return Result.ok(count > 0);
}

11.2 实现查询用户信息

  1. 实现查看用户信息功能,功能简单,直接在controller层写了
1
2
3
4
5
6
7
8
9
10
11
12
@GetMapping("/{id}")
public Result getInfo(@PathVariable("id") Long userId){
// 查询详情
User user = userService.getById(userId);
if (user == null) {
// 没有详情,应该是第一次查看详情
return Result.ok();
}
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
// 返回
return Result.ok(userDTO);
}
  1. 实现查看用户笔记功能,同样直接写在controller层
1
2
3
4
5
6
7
8
9
@GetMapping("/of/user")
public Result queryBlogById(@RequestParam(value = "current", defaultValue = "1") Integer current, @RequestParam("id") Long id) {
//根据用户查询
Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));

//获取当前页数据
List<Blog> records = page.getRecords();
return Result.ok(records);
}

查看用户笔记为什么这样写?🤨

我第一次看到这个方法的时候就有很多疑惑,
为啥要传current这个参数?page是干啥的?为啥要将用户的blog存到page里面?
偏偏黑马直接跳过了这方面,生怕我学会了属于是😅

这里用到了分页的知识,为什么要分页?如果用户有1000篇博客,不可能直接向前端展示1000篇,会拉低性能。
使用page,告知MyBatisPlus我要将查询到的数据分页了,一页有MAX_PAGE_SIZE个数据。
用户刚开始的current为1,也就是说用户只能看到第一页的数据,如果用户再往下翻,想看第二页的数据,只需要将current变为2即可。
就是常见应用的分页,翻看到一定次数之后会加载新的数据

11.3 实现共同关注

需求:利用Redis中恰当的数据结构,实现共同关注功能,在博主个人页面展示出当前用户与博主的共同关注

  • 实现方式当然是我们之前学过的set集合,在set集合中,有交集并集补集的api,可以把二者关注的人放入到set集合中,然后通过api查询两个set集合的交集

  • 那我们就得先修改我们之前的关注逻辑,在关注博主的同时,需要将数据放到set集合中,方便后期我们实现共同关注,当取消关注时,也需要将数据从set集合中删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public Result follow(Long followUserId, Boolean isFellow) {
//1. 获取登录的用户
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;
//2. 判断到底是关注还是取关
if (isFellow) {
//3. 关注:新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
if (isSuccess) {
//3.1 把关注目标存到Redis中 sadd userId followUserId

stringRedisTemplate.opsForSet().add(key,followUserId.toString());
}
} else {
//4. 取关:删除数据 delete from tb_follow where user_id = ? and follow_id = ?
boolean isSuccess = remove(new QueryWrapper<Follow>()
.eq("user_id", userId)
.eq("follow_user_id", followUserId)
);
//4.1 从redis中移除
if (isSuccess) {
stringRedisTemplate.opsForSet().remove(key,followUserId.toString());
}
}
return Result.ok();
}
  1. 先在Controller层写方法
1
2
3
4
@GetMapping("/common/{id}")
public Result getCommonFollow(@PathVariable("id") Long followUserId) {
return followService.getCommonFollow(followUserId);
}
  1. 在Impl层写业务逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public Result getCommonFollow(Long id) {
//1. 获取当前用户id
Long userId = UserHolder.getUser().getId();
String key1 = "follows:" + id;
String key2 = "follows:" + userId;
//对当前用户和博主用户的关注列表取交集 使用intersect
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
//判断是否有共同关注
if (intersect == null||intersect.isEmpty()) {
//无交集,返回空集合
return Result.ok(Collections.emptyList());
}

//有共同关注,将结果转换为List
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//根据ids去查询共同关注用户,封装成UserDTO再返回
List<UserDTO> userDTOS = userService
.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());

return Result.ok(userDTOS);
}

如何将ids封装成userDTOS的?

分为四步:

1
2
3
4
5
List<UserDTO> userDTOS = userService
.listByIds(ids) // ①
.stream() // ②
.map(user -> BeanUtil.copyProperties(user, UserDTO.class)) // ③
.collect(Collectors.toList()); // ④
  1. userService.listByIds(ids)
    其实底层是 MyBatis-Plus 的 QueryWrapper + in 查询
    相当于 SQL:SELECT * FROM tb_user WHERE id IN (1,2,3,4,5…)
    返回的是 List 实体对象(里面有密码、手机号等敏感字段)

  2. .stream()
    把普通 List 变成 Java 8 的 Stream 流,方便后面链式操作

  3. .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
    这才是重点!!!
    把 user 这个对象的所有同名属性复制到一个新建的 UserDTO 实例里

  4. .collect(Collectors.toList())
    把 Stream 重新收集成 List 返回给前端

11.4 Feed流实现方案

  • 当我们关注了用户之后,这个用户发布了动态,那我们应该把这些数据推送给用户,这个需求,我们又称其为Feed流,关注推送也叫作Feed流,直译为投喂,为用户提供沉浸式体验,通过无限下拉刷新获取新的信息,

  • 对于传统的模式内容检索:用户需要主动通过搜索引擎或者是其他方式去查找想看的内容

  • 对于新型Feed流的效果:系统分析用户到底想看什么,然后直接把内容推送给用户,从而使用户能更加节约时间,不用去主动搜素

  • Feed流的实现有两种模式

  1. Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注(B站关注的up,朋友圈等)
    • 优点:信息全面,不会有缺失,并且实现也相对简单
    • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
  2. 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容,推送用户感兴趣的信息来吸引用户
    • 优点:投喂用户感兴趣的信息,用户粘度很高,容易沉迷
    • 缺点:如果算法不精准,可能会起到反作用(给你推的你都不爱看)
  • 那我们这里针对好友的操作,采用的是Timeline方式,只需要拿到我们关注用户的信息,然后按照时间排序即可

  • 采用Timeline模式,有三种具体的实现方案

  1. 拉模式
  2. 推模式
  3. 推拉结合
  • 拉模式:也叫读扩散

    • 该模式的核心含义是:当张三和李四、王五发了消息之后,都会保存到自己的发件箱中,如果赵六要读取消息,那么他会读取他自己的收件箱,此时系统会从他关注的人群中,将他关注人的信息全都进行拉取,然后进行排序
    • 优点:比较节约空间,因为赵六在读取信息时,并没有重复读取,并且读取完之后,可以将他的收件箱清除
    • 缺点:有延迟,当用户读取数据时,才会去关注的人的时发件箱中拉取信息,假设该用户关注了海量用户,那么此时就会拉取很多信息,对服务器压力巨大
      alt 拉模式
      alt 拉模式
  • 推模式:也叫写扩散

    • 推模式就是没有写邮箱的,当张三写一个内容,此时会主动把张三写的内容发送到它粉丝的收件箱中,假设此时李四再来读取,就不用再去临时拉去了
    • 优点:时效快,不用临时拉取
    • 缺点:内存压力大,假设一个大V发了一个动态,很多人关注他,那么就会写很多份数据到粉丝那边去。
      alt 推模式
      alt 推模式
  • 推拉结合:页脚读写混合,兼具推和拉两种模式的优点

  • 推拉模式是一个折中的方案,站在发件人这一边,如果是普通人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝收件箱中,因为普通人的粉丝数量较少,所以这样不会产生太大压力。但如果是大V,那么他是直接将数据写入一份到发件箱中去,在直接写一份到活跃粉丝的收件箱中,站在收件人这边来看,如果是活跃粉丝,那么大V和普通人发的都会写到自己的收件箱里,但如果是普通粉丝,由于上线不是很频繁,所以等他们上线的时候,再从发件箱中去拉取信息。

    alt 推拉结合
    alt 推拉结合

11.5 推送到粉丝收件箱

  • 需求:

    1. 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
    2. 收件箱满足可以根据时间戳排序,必须使用Redis的数据结构实现
    3. 查询收件箱数据时,可实现分页查询
  • Feed流中的数据会不断更新,所以数据的角标也会不断变化,所以我们不能使用传统的分页模式

  • 假设在t1时刻,我们取读取第一页,此时page = 1,size = 5,那么我们拿到的就是10~6这几条记录,假设t2时刻有发布了一条新纪录,那么在t3时刻,我们来读取第二页,此时page = 2,size = 5,那么此时读取的数据是从6开始的,读到的是6~2,那么我们就读到了重复的数据,所以我们要使用Feed流的分页,不能使用传统的分页

    alt 传统分页的局限
    alt 传统分页的局限

  • Feed流的滚动分页

  • 我们需要记录每次操作的最后一条,然后从这个位置去开始读数据

  • 举个例子:我们从t1时刻开始,拿到第一页数据,拿到了10~6,然后记录下当前最后一次读取的记录,就是6,t2时刻发布了新纪录,此时这个11在最上面,但不会影响我们之前拿到的6,此时t3时刻来读取第二页,第二页读数据的时候,从6-1=5开始读,这样就拿到了5~1的记录。我们在这个地方可以使用SortedSet来做,使用时间戳来充当表中的1~10

    alt Feed流分页
    alt Feed流分页

  • 核心思路:我们保存完探店笔记后,获取当前用户的粉丝列表,然后将数据推送给粉丝,因为我们这是点评,没这么多大V,就只用推模式

  • 功能写在修改保存笔记的方法里

1
2
3
4
5
6
7
8
9
10
@Override
public Result saveBlog(Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.save(blog);
// 返回id
return Result.ok(blog.getId());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public Result saveBlog(Blog blog) {
// 1. 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 2. 保存探店博客
boolean isSuccess = blogService.save(blog);
if (!isSuccess) {
return Result.fail("博客保存失败");
}
// 3. 查询blog作者的所有粉丝 select * from tb_follow where follow_user_id = ?
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
follows.forEach(follow -> {
//3.1 获取粉丝id
Long userId = follow.getUserId();
//3.2 推送笔记id给所有粉丝
String key = "feed:" + userId;
stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
});
// 5. 返回id
return Result.ok(blog.getId());
}

现在我们发送笔记,打开Redis,发现已经有Feed流来实现推模式了

11.6 实现分页查询收件箱

  • 需求:在个人主页的关注栏中,查询并展示推送的Blog信息

  • 具体步骤如下

  1. 每次查询完成之后,我们要分析出查询出的最小时间戳,这个值会作为下一次的查询条件
    2, 我们需要找到与上一次查询相同的查询个数,并作为偏移量,下次查询的时候,跳过这些查询过的数据,拿到我们需要的数据(例如时间戳8 6 6 5 5 4,我们每次查询3个,第一次是8 6 6,此时最小时间戳是6,如果不设置偏移量,会从第一个6之后开始查询,那么查询到的就是6 5 5,而不是5 5 4,如果这里说的不清楚,那就看后续的代码)
  • 综上:我们的请求参数中需要携带lastId和offset,即上一次查询时的最小时间戳和偏移量,这两个参数
    编写一个通用的实体类,不一定只对blog进行分页查询,这里用泛型做一个通用的分页查询,list是封装返回的结果,minTime是记录的最小时间戳,offset是记录偏移量
1
2
3
4
5
6
@Data
public class ScrollResult {
private List<?> list;
private Long minTime;
private Integer offset;
}
1
2
3
4
5
6
7
8
9
10
@Override
public Result saveBlog(Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.save(blog);
// 返回id
return Result.ok(blog.getId());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
   @Override
public Result queryBlogOfFollow(Long max, Integer offset) {
//1. 获取当前用户
Long userId = UserHolder.getUser().getId();
//2. 查询该用户收件箱(之前我们存的key是固定前缀 + 粉丝id),所以根据当前用户id就可以查询是否有关注的人发了笔记
String key = FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> typeTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 10);// 一次最多10条
//3. 非空判断
if (typeTuples == null || typeTuples.isEmpty()){
return Result.ok(Collections.emptyList());
}
//4. 解析数据,blogId、minTime(时间戳)、offset,这里指定创建的list大小,可以略微提高效率,因为我们知道这个list就得是这么大
List<Long> blogIds = new ArrayList<>(typeTuples.size());
long minTime = 0;
int newOffset = 1;
for (ZSetOperations.TypedTuple<String> typeTuple : typeTuples) {
//4.1 获取id
String id = typeTuple.getValue();
blogIds.add(Long.valueOf(id));

//4.2 获取score(时间戳)
long time = typeTuple.getScore().longValue();
if (time == minTime){
newOffset++;
}else {
minTime = time;
newOffset = 1;
}
}
// 4. 神之一手!直接用 listByIds(),它内部自动处理:
// - 空集合安全
// - 自动生成 IN (?, ?, ?)
// - 自动加上 ORDER BY FIELD(id, ?, ?, ?)
List<Blog> blogs = listByIds(blogIds);

if (CollUtil.isEmpty(blogs)) {
return Result.ok(Collections.emptyList());
}

// 5. 手动按收件箱顺序排序(因为 listByIds 的 FIELD 排序可能和我们期望的顺序不完全一致)
// 这一步保留原始时间线顺序,超级重要!
blogs.sort(Comparator.comparingInt(b -> blogIds.indexOf(b.getId())));

// 6. 查询用户信息 + 点赞状态(不变)
for (Blog blog : blogs) {
queryBlogUser(blog);
isBlogLiked(blog);
}

// 7. 封装返回
ScrollResult result = new ScrollResult();
result.setList(blogs);
result.setMinTime(minTime);
result.setOffset(newOffset);

return Result.ok(result);
}

12. 附近商铺

12.1 GEO数据结构的基本用法

& GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据,常见的命令有

  1. GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
  • 命令格式
    GEOADD key longitude latitude member [longitude latitude member …]
    • 返回值:添加到sorted set元素的数目,但不包括已更新score的元素
    • 复杂度:每⼀个元素添加是O(log(N)) ,N是sorted set的元素数量
    • 举例
      GEOADD china 13.361389 38.115556 "shanghai" 15.087269 37.502669 "beijing"
  1. GEODIST:计算指定的两个点之间的距离并返回
  • 命令格式
    GEODIST key member1 member2 [m|km|ft|mi]
    如果两个位置之间的其中⼀个不存在, 那么命令返回空值。
    指定单位的参数 unit 必须是m km mi(英里) ft(英尺)中的一个
  • 如果用户没有显式地指定单位参数, 那么 GEODIST 默认使用米作为单位。
  1. GEODIST 命令在计算距离时会假设地球为完美的球形, 在极限情况下, 这⼀假设最⼤会造成 0.5% 的误差

    • 返回值:计算出的距离会以双精度浮点数的形式被返回。 如果给定的位置元素不存在, 那么命令返回空值
    • 举例
      GEODIST china beijing shanghai km
  2. GEOHASH:将指定member的坐标转化为hash字符串形式并返回

    • 命令格式
      GEOHASH key member [member …]
    • 通常使用表示位置的元素使用不同的技术,使用Geohash位置52点整数编码。由于编码和解码过程中所使用的初始最小和最大坐标不同,编码的编码也不同于标准。此命令返回一个标准的Geohash,在维基百科和geohash.org网站都有相关描述
    • 返回值:一个数组, 数组的每个项都是一个 geohash 。 命令返回的 geohash 的位置与用户给定的位置元素的位置一一对应
    • 复杂度:O(log(N)) for each member requested, where N is the number of elements in the sorted set
    • 举例
    1
    2
    3
     云服务器:0>GEOHASH china beijing shanghai
    1) "sqdtr74hyu0"
    2) "sqc8b49rny0"
  3. GEOPOS:返回指定member的坐标

    • 格式:GEOPOS key member [member …]
    • 给定一个sorted set表示的空间索引,密集使用 geoadd 命令,它以获得指定成员的坐标往往是有益的。当空间索引填充通过 geoadd 的坐标转换成一个52位Geohash,所以返回的坐标可能不完全以添加元素的,但小的错误可能会出台。
    • 因为 GEOPOS 命令接受可变数量的位置元素作为输入, 所以即使用户只给定了一个位置元素, 命令也会返回数组回复
    • 返回值:GEOPOS 命令返回一个数组, 数组中的每个项都由两个元素组成: 第一个元素为给定位置元素的经度, 而第二个元素则为给定位置元素的纬度。当给定的位置元素不存在时, 对应的数组项为空值
    • 复杂度:O(log(N)) for each member requested, where N is the number of elements in the sorted set
    1
    2
    3
    4
    5
    6
     云服务器:0>geopos china beijing shanghai
    1) 1) "15.08726745843887329"
    2) "37.50266842333162032"

    2) 1) "13.36138933897018433"
    3) "38.11555639549629859"

6 GEOSEARCH:在指定范围内搜索member,并按照与制定点之间的距离排序后返回,范围可以使圆形或矩形,6.2的新功能

  • 命令格式
    GEOSEARCH key [FROMMEMBER member] [FROMLONLAT longitude latitude] [BYRADIUS radius m|km|ft|mi]
    [BYBOX width height m|km|ft|mi] [ASC|DESC] [COUNT count [ANY]] [WITHCOORD] [WITHDIST] [WITHHASH]
  • 举例
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
     云服务器:0>geosearch china FROMLONLAT 15 37 BYRADIUS 200 km ASC WITHCOORD WITHDIST
    1) 1) "beijing"
    1) "56.4413"
    1) 1) "15.08726745843887329"
    1) "37.50266842333162032"


    3) 1) "shanghai"
    1) "190.4424"
    1) 1) "13.36138933897018433"
    1) "38.11555639549629859"



    云服务器:0>geosearch china FROMLONLAT 15 37 BYBOX 400 400 km DESC WITHCOORD WITHDIST
    1) 1) "shanghai"
    1) "190.4424"
    1) 1) "13.36138933897018433"
    1) "38.11555639549629859"


    4) 1) "beijing"
    1) "56.4413"
    1) S1) "15.08726745843887329"
    1) "37.50266842333162032"
  1. GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key,也是6.2的新功能
    • 命令格式
      GEOSEARCHSTORE destination source [FROMMEMBER member] [FROMLONLAT longitude latitude]
      [BYRADIUS radius m|km|ft|mi] [BYBOX width height m|km|ft|mi]
      [ASC|DESC] [COUNT count [ANY]] [STOREDIST]
    • 这个命令和 GEORADIUS 命令一样, 都可以找出位于指定范围内的元素, 但是 GEORADIUSBYMEMBER 的中心点是由给定的位置元素决定的, 而不是像 GEORADIUS 那样, 使用输入的经度和纬度来决定中心点
    • 指定成员的位置被用作查询的中心。
    • 关于 GEORADIUSBYMEMBER 命令的更多信息, 请参考 GEORADIUS 命令的文档
    • 复杂度:O(N+log(M)) where N is the number of elements inside the bounding box of the circular area delimited by center and radius and M is the number of items inside the index
    1
    2
    3
    云服务器:0>GEORADIUSBYMEMBER china beijing 200 km
    1) "shanghai"
    2) "beijing"

13. 用户签到

13.1 BitMap功能演示

  • 我们针对签到功能完全可以通过MySQL来完成,如下表
Field Type Collation Null Key Default Comment
id bigint unsigned (NULL) NO PRI (NULL) 主键
user_id bigint unsigned (NULL) NO (NULL) 用户id
year year (NULL) NO (NULL) 签到的年
month tinyint (NULL) NO (NULL) 签到的月份
date date (NULL) NO (NULL) 签到的日期
is_backup tinyint unsigned (NULL) YES (NULL) 是否补签
  • 用户签到一次,就是一条记录,假如有1000W用户,平均没人每年签到10次,那这张表一年的数据量就有1亿条

  • 那有没有方法能简化的呢?

    • 我们可以使用二进制位来记录每个月的签到情况,签到记录为1,未签到记录为0,把每一个bit位对应当月的每一天,形成映射关系,用0和1标识业务状态,这种思路就成为位图(BitMap)。这样我们就能用极小的空间,来实现大量数据的表示。
  • Redis中是利用String类型数据结构实现BitMap,因此上限是512M,转换为bit则是2^32个bit位

  • BitMap的操作命令:

  1. SETBIT:向指定位置(offset)存入一个0或1
  2. GETBIT:获取指定位置(offset)的bit值
  3. BITCOUNT:统计BitMap中值为1的bit位的数量
  4. BITFIELD:操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
  5. BITFIELD_RO:获取BitMap中bit数组,并以十进制形式返回
  6. BITOP:将多个BitMap的结果做位运算(与、或、异或)
  7. BITPOS:查找bit数组中指定范围内第一个0或1出现的位置

13.2 实现签到功能

  • 需求:实现签到接口,将当前用户签到信息保存到Redis中

  • 签到的请求:

    Request URL
    http://localhost:8080/api/user/sign
    Request Method
    GET

  • 思路:我们可以把年和月作为BitMap的key,然后保存到一个BitMap中,每次签到就把对应位上的0变成1,只要是1就说明这一天已经签到了,反之则没有签到

  • 由于BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了

  1. 在UserController中编写对应方法
1
2
3
4
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}
  1. 在Impl层写具体实现代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public Result sign() {
//1. 获取当前用户
Long userId = UserHolder.getUser().getId();
//2. 获取日期
LocalDateTime now = LocalDateTime.now();
//3. 拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + keySuffix;

//4. 获取今天是当月第几天(1~31)
int dayOfMonth = now.getDayOfMonth();

//写入Redis BITSET offset 1
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth - 1,true);
return Result.ok();
}
  • 使用PostMan发送请求测试
  • 发送成功之后,在Redis图形化界面中是可以看到的
    alt 签到存储
    alt 签到存储

13.3 实现签到统计

  • 如何获取本月到今天为止的所有签到数据?

    • `BITFIELD key GET u[dayOfMonth] 0
  • 如何从后往前遍历每个bit位,获取连续签到天数?

    • 从末尾往前数,看看有多少个1
    • 如果遇到0,则证明连续签到间断
    • 简单的位运算算法
      1
      2
      3
      4
      5
      6
      7
      8
      9
      int count = 0;
      while(true) {
      if((num & 1) == 0)
      break;
      else
      count++;
      num >>>= 1;
      }
      return count;
  • 需求:实现下面的接口,统计当前用户截止当前时间在本月的连续签到天数

说明
请求方式 GET
请求路径 /user/sign/count
请求参数
返回值 连续签到天数
  1. 在UserController中创建对应的方法
1
2
3
4
@GetMapping("/sign/count")
public Result signCount(){
return userService.signCount();
}
  1. 在UserServiceImpl中实现方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
public Result signCount() {
//1. 获取当前用户
Long userId = UserHolder.getUser().getId();
//2. 获取日期
LocalDateTime now = LocalDateTime.now();
//3. 拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
//4. 获取今天是当月第几天
int dayOfMonth = now.getDayOfMonth();
//5. 获取截止到今天的签到记录 返回的是一个十进制的数字
//BITFIELD key GET uDay 0
//uDay代表查询的是前Day位 最后的0是offset代表从第一位开始
//例如今天第14天 就是u14 0 意思是从第1位开始查询到第14位

List<Long> result = stringRedisTemplate.opsForValue()
.bitField(key, BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType
.unsigned(dayOfMonth))
.valueAt(0)
);
if (result == null||result.isEmpty()) {
return Result.ok(0);
}
//6. 遍历循环
int cnt = 0;
Long num = result.get(0);
while (true){
//7. 让数字于1做与运算,得到数字最后一个bit位
if((num &1) == 0){ //如果bit位为0 证明未签到 连续签到中断
break;
}else {
cnt++;
//数字右移 抛弃最后一位
//使用>>>而不是>>是因为
//>>会符号位扩展,导致死循环
//>>>无符号右移
num >>>= 1;
}

}
return Result.ok(cnt);
}

使用PostMan发送请求,可以手动修改redis中的签到数据多次测试。

14. UV统计

14.1 HyperLogLog用法

  • UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。

  • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

  • 通常来说PV会比UV大很多,所以衡量同一个网站的访问量,我们需要综合考虑很多因素。

  • UV统计在服务端做会很麻烦,因为要判断该用户是否已经统计过了,需要将统计过的信息保存,但是如果每个访问的用户都保存到Redis中,那么数据库会非常恐怖,那么该如何处理呢?

    • HyperLogLog(HLL)是从Loglog算法派生的概率算法,用户确定非常大的集合基数,而不需要存储其所有值。
  • Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。

  • 常用的三个方法

    1
    2
    3
    4
    5
    6
    7
    8
    PFADD key element [element...]


    PFCOUNT key [key ...]


    PFMERGE destkey sourcekey [sourcekey ...]

14.2 测试百万数据的统计

  • 使用单元测试,向HyperLogLog中添加100万条数据,看看内存占用是否真的那么低,以及统计误差如何
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void testHyperLogLog() {
String[] users = new String[1000];
int j = 0;
for (int i = 0; i < 1000000; i++) {
j = i % 1000;
users[j] = "user_" + i;
if (j == 999) {
stringRedisTemplate.opsForHyperLogLog().add("HLL", users);
}
}
Long count = stringRedisTemplate.opsForHyperLogLog().size("HLL");
System.out.println("count = " + count);
}

15. 结语

本项目顺利结束,本人因为课程原因敲的较慢,用了两三个星期时间去完善。
说一下我对黑马点评这个项目的见解:

  • 该项目的最重要部分就是秒杀,分布式锁和消息队列,之后的知识点知识浅尝辄止,让我们作为了解知道这些功能应该如何去实现。
  • 该项目的前端部分不太好用,没有felx去适配缩放
  • 后端教学部分十分巧妙,作为入门级项目特别优秀

学到这里说明后端之路已经走了一半了,希望你我都能在毕业之前找到好工作

封笔于2025/11/10,本人大二上💕。
项目目录https://github.com/yin-bo-Final/hm-dianping

  • 标题: 黑马点评
  • 作者: yin_bo_
  • 创建于 : 2025-10-18 12:24:35
  • 更新于 : 2025-11-20 15:31:51
  • 链接: https://www.blog.yinbo.xyz/2025/10/18/Java学习/Redis/黑马点评/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
目录
黑马点评