RocketMQ 5.x

yin_bo_ Lv2

1. MQ概述

  • MQ(MessageQueue):既消息队列,是在消息传输的过程中保存消息的容器,多用于分布式系统之间通信。其中队列是数据结构的一种,特征为先进先出

alt MQ工作流程
alt MQ工作流程

  • MQ的优势与劣势:
    • 优势
      1. 应用解耦
        • 系统的耦合性越高,容错性和可维护性就越低
        • 所以解耦之后消费方和生产方就没有关系了,消费方存活与否不影响生产方
      2. 异步提速
        • 生产方发完消息,不需要等消费方回应就可以继续下一步业务逻辑
      3. 削峰填谷
        • 高并发时将请求暂存队列,按消费者处理能力平稳消费,避免瞬时流量压垮下游
    • 劣势
      1. 系统可用性降低
        • 一旦MQ宕机,就会对业务产生影响
      2. 系统复杂性提高
        • 以前是系统间同步远程调用,现在是通过MQ进行异步调用,大大增加了系统复杂度
      3. 一致性问题
        • A系统处理完业务,通过MQ给B C D三个系统发消息数据,如果B C系统处理成功 D系统处理失败,就会产生消息不一致

2. RocketMQ入门

2.1 RocketMQ介绍

  • 主流的MQ产品:

    1. ActiveMQ
      • Java语言实现,万级数据吞吐量,处理速度ms级。主从架构,成熟度高 (现在基本没人用了)
    2. RabbitMQ
      • erlang语言实现,万级数据吞吐量,处理速度us级,主从架构
    3. RocketMQ
      • java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,拓展性强
    4. kafka
      • scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
  • RocketMQ是阿里开源的一块非常优秀的中间件产品,他现在已经在阿里内部被广泛的应用,并且经受住了多次双十一这种极致场景的压力(2017年双十一,RocketMQ流转的消息量带到了万亿级,峰值TPS达到5600万)

2.2 RocketMQ工作原理

  • 四大核心角色

    • Producer:消息生产者
    • Consumer:消息消费者
    • Broker:消息存储与转发服务(真正干活的角色)
    • NameServer:轻量级注册中心,类似“微型 Nacos”,提供路由发现与心跳检测
  • Porducer发消息给BrokerBroker收到消息并持久化成功后,立刻给Producer返回ACK(确认)(这一步与Consumer无关),同时消息异步的推送/拉取给Consumer

  • 这里将消息返回给消费者有两种方式

    1. 拉取模式Consumer每秒发起请求去Broker拉取数据
    2. 推送模式ConsumerBroker建立一个长连接,监听着Broker,如果有消息就返回给Consumer
  • 那么NameServer有什么用呢

    • 之前我们学过SpringCloud吧,NameServer就充当注册中心(类似于Nacos)的角色。
    • 当我们的Broker是个集群的话,Producer应该如何知道发送给哪个Broker消息呢?Consumer应该监听哪个Broker
      • ProducerConsumer会先经过NameServer路由,再去找可以用的Broker
    • 那么NameServer是怎么知道Broker可用且没宕机的呢?
      • 用的是跟注册中心相同原理的心跳检测
        alt MQ工作原理
        alt MQ工作原理

2.3 RocketMQ 5.x安装

这里我们学过Docker了,所以直接在WSL上的Ubuntu系统上的Docker部署RocketMQ就行了😊
具体可以看rocketmq的官网:http://rocketmq.apache.org
现在RocketMQ主推的是5.0的大版本,4.0版本停止维护了,尽量不要用😚

RocketMQ实质上就是Java代码,需要用JDK去运行这里的RocketMQ镜像里已经自带JDK了,所以不需要再次安装

如何使用Docker安装RocketMQ

  1. 首先我们需要拉取相关的镜像,如apache/rocketmq:5.3.2(RocketMQ资源)pangliang/rocketmq-console-ng(可视化仪表盘资源)

    1
    2
    docker pull apache/rocketmq:5.3.2
    docker pull pangliang/rocketmq-console-ng

    如果拉取不了镜像,就要用魔法(🤫),我们的魔法不会作用于终端,所以要配置一下。
    在我们的魔法页面查到代理的端口,比如我的是7890,输入以下指令进行终端代理

    1
    2
    $env:https_proxy="http://127.0.0.1:7890"
    $env:http_proxy="http://127.0.0.1:7890"
  2. 编写broker.conf,用来配置broker

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH

    namesrvAddr = rmqnamesrv:9876
    listenPort = 10911
    brokerIP1 = rmqbroker
    brokerIP2 = rmqbroker
    haListenPort = 10912

    autoCreateTopicEnable = true
    autoCreateSubscriptionGroup = true

    storePathRootDir = /home/rocketmq/store
    storePathCommitLog = /home/rocketmq/store/commitlog
    storePathConsumeQueue = /home/rocketmq/store/consumequeue
    storePathIndex = /home/rocketmq/store/index
  3. 编写docker-compose (如果不知道docker-compose是什么看我之前的docker笔记)

    • 介绍一下容器的配置
    1. rmqnamesrv容器:端口为9876,因为是类似注册中心的作用,所以要先部署
    2. rmqbroker容器:端口有三个:10909,10911,10912,需要配置环境(nameserver的地址),并且依赖于namesrv服务。broker的配置挂载到/broker.conf的文件里
    3. rmqproxy容器:端口为8080和8081,这里简单的负责路由
    4. rmqdashboard容器:可视化界面,容器内端口为8080,映射到宿主机的端口为9999,所以我们要访问仪表盘直接访问 http://localhost:9999 即可
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    services:

    namesrv: # NameServer配置
    image: apache/rocketmq:5.3.2
    container_name: rmqnamesrv
    ports:
    - 9876:9876
    networks:
    - rocketmq
    command: sh mqnamesrv

    broker: # broker配置
    image: apache/rocketmq:5.3.2
    container_name: rmqbroker
    ports:
    - 10909:10909
    - 10911:10911
    - 10912:10912
    volumes:
    - ./broker.conf:/home/rocketmq/conf/broker.conf
    environment:
    - NAMESRV_ADDR=rmqnamesrv:9876
    depends_on:
    - namesrv
    networks:
    - rocketmq
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf

    proxy: # 代理配置,类似于gateway
    image: apache/rocketmq:5.3.2
    container_name: rmqproxy
    networks:
    - rocketmq
    depends_on:
    - broker
    - namesrv
    ports:
    - 8080:8080
    - 8081:8081
    restart: on-failure
    environment:
    - NAMESRV_ADDR=rmqnamesrv:9876
    command: sh mqproxy

    dashboard: # 仪表盘配置
    image: pangliang/rocketmq-console-ng
    container_name: rmqdashboard
    ports:
    - 9999:8080
    environment:
    - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
    networks:
    - rocketmq
    depends_on:
    - namesrv
    restart: on-failure
    networks:
    rocketmq:
    driver: bridge
  4. 访问http://localhost:9999可以看到RocketMQ的仪表盘

    alt RocketMQ可视化界面
    alt RocketMQ可视化界面

3. RocketMQ消息发送与接收

3.1 简单生产者/消费者书写

  1. 引入rocketmq-client-java依赖

    • 依赖的版本最好和RocketMQ的版本形同,这里引入的是rocketmq-client-java新API,而不是rocketmq-client旧的API
    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.3.2</version>
    </dependency>
  2. 生产者代码

    1. 先写出接入点地址(endpoint)和标题(topic)
    2. new一个认证,里面有接入点地址
    3. 使用producer的工厂类初始化生产者
    4. 使用message的工厂来初始化消息,并发送且得到回执(receipt)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class SimpleProducer {
    private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class);

    public static void main(String[] args) throws ClientException {
    //1. 接入点地址,这里必须是proxy代理的地址和端口
    String endpoint = "localhost:8081";
    //2. 消息发送目标的Topic名称
    String topic = "TopicTest";

    //3. 写一下conf,里面带有接入点地址,也就是MQ的代理地址
    ClientConfiguration configuration = ClientConfiguration
    .newBuilder()
    .setEndpoints(endpoint)
    .build();

    //4. 初始化Producer
    Producer producer = provider
    .newProducerBuilder()
    .setTopics(topic)
    .setClientConfiguration(configuration)
    .build();

    //5. 设置发送的普通消息
    MessageBuilder messageBuilder = new MessageBuilderImpl();
    Message message = messageBuilder
    .setTopic(topic)
    .setKeys("Key is me")
    .setTag("Tag is null")
    .setBody(("God Drinks Java" + i).getBytes())
    .build();

    try {
    //开始发送消息,需要关注发送的结果,并捕获失败异常 receipt是回执的意思
    SendReceipt receipt = producer.send(message);
    log.info(receipt.toString());
    }catch (Exception e) {
    log.error("Fail to send message",e);
    }
    }
    }
  3. 消费者代码

    • 这里我们使用**推模式(push)**来进行消费者消息的推送
    • 跟生产者的思路差不多,除了topic还要指定消费者所属的消费者组
    • 这里设置消费监听器来建立与broker的长连接,随时监听producer的消息
    • 当我们开启推模式消费者代码时,producer每发送一条消息,comsumer就会立刻从监听器里接受.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public class SimpleConsumer {
    private static final org.slf4j.Logger log = LoggerFactory.getLogger(SimpleConsumer.class);

    public static void main(String[] args) throws ClientException, InterruptedException {
    String endpoint = "localhost:8081";
    //2. 从namesrv里获取认证
    ClientConfiguration clientConfiguration = ClientConfiguration
    .newBuilder()
    .setEndpoints(endpoint)
    .build();

    // 3. 订阅消息的过滤规则, *表示订阅所有Tag的信息
    String tag = "*";
    FilterExpression filter = new FilterExpression(tag, FilterExpressionType.TAG);

    // 4. 指定所属的消费者组
    String Group = "ConsumerGroup";
    // 5. 指定订阅目标的Topic
    String topic = "TopicTest";
    // 6 初始化工厂类provider的接入点

    // 7. 初始化PushConsumer,需要绑定消费者组,通信参数和订阅管理,这里还是使用provider工厂类来构建
    PushConsumerBuilderImpl pushConsumerBuilder = new PushConsumerBuilderImpl();
    PushConsumer pushConsumer = pushConsumerBuilder.setConsumerGroup(Group)
    .setClientConfiguration(clientConfiguration)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filter))
    .setMessageListener(messageView -> {
    //处理消息并返回消费结果
    ByteBuffer byteBuffer = messageView.getBody();
    String content = StandardCharsets.UTF_8.decode(
    byteBuffer.duplicate()
    ).toString();
    log.info("messageId={} message={}", messageView.getMessageId(), content);
    return ConsumeResult.SUCCESS;
    })
    .build();
    }
    }
  4. 总结

    • 这是我producer发送消息的时间和我consumer接受消息的时间
      • 可以看到只差0.1s,既是这样也是算上发送日志的时间
      • 足以见得推模式的效率有多么快
    1
    2
    18:04:59.774 [main] INFO ....
    18:04:59.784 [RocketmqMessageConsumption-0-67] INFO ....

3.2 单生产者多消费者模式

  • 有如下场景
    • 我一个主题为topic1的生产者连续发送了10条消息,这时有两个消费者正在监听topic1
    • 这两个消费者获得的消息会是怎样的?
      • 是每个人都分到了全部的消息?
      • 还是平分了这十个消息?
  • 我们来实验一下
  1. 首先要让消费者代码能起多个实例
  • 运行配置 —> 修改选项 —> 允许多个实例
  1. 我们修改一下producer部分代码,让代码连续发送十条消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
        for (int i = 1; i <= 10; i++) {
    //5. 设置发送的普通消息
    Message message = provider
    .newMessageBuilder()
    .setTopic(topic)
    .setKeys("Key is me")
    .setTag("Tag is null")
    .setBody(("God Drinks Java" + i).getBytes())
    .build();
    try {
    //开始发送消息,需要关注发送的结果,并捕获失败异常 receipt是回执的意思
    SendReceipt receipt = messageBuilder.send(message);
    log.info(receipt.toString());
    }catch (Exception e) {
    log.error("Fail to send message",e);
    }
    }
  2. 启动两个consumer,然后再启动producer,发现如下结果

    • 可以看到producer发送的消息被两个consumer均分了,但是顺序是乱序,为什么?

18:49:22.095 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3200000000, recallHandle=}
18:49:22.106 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000001, recallHandle=}
18:49:22.119 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000002, recallHandle=}
18:49:22.126 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000003, recallHandle=}
18:49:22.132 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000004, recallHandle=}
18:49:22.139 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000005, recallHandle=}
18:49:22.145 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000006, recallHandle=}
18:49:22.152 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000007, recallHandle=}
18:49:22.159 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000008, recallHandle=}
18:49:22.166 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000009, recallHandle=}

18:49:22.093 [RocketmqMessageConsumption-0-84] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3200000000 message=God Drinks Java1
18:49:22.120 [RocketmqMessageConsumption-0-26] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000002 message=God Drinks Java3
18:49:22.133 [RocketmqMessageConsumption-0-41] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000004 message=God Drinks Java5
18:49:22.146 [RocketmqMessageConsumption-0-43] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000006 message=God Drinks Java7
18:49:22.161 [RocketmqMessageConsumption-0-40] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000008 message=God Drinks Java9

18:49:22.108 [RocketmqMessageConsumption-0-76] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000001 message=God Drinks Java2
18:49:22.128 [RocketmqMessageConsumption-0-77] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000003 message=God Drinks Java4
18:49:22.142 [RocketmqMessageConsumption-0-78] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000005 message=God Drinks Java6
18:49:22.155 [RocketmqMessageConsumption-0-79] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000007 message=God Drinks Java8
18:49:22.167 [RocketmqMessageConsumption-0-80] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000009 message=God Drinks Java10

  • 如果我们想让两个consumer都接收到这十个消息,应该怎么做?

    • 我们将consumer的消费者组改为group1,启动一个consumer,然后改为group2,再启动一个consumer
    • 这时我们运行producer发送十条消息,发现两个consumer都接收到了10条消息
  • 所以在单生产者多消费者消息发送中有两个方式

    1. 两个消费者,相同的消费者组,他们会负载均衡的接受生产者的消息
    2. 两个消费者,不同的消费者组,生产者会给每个组都发送相同的消息

4. 主题(Topic)

  • 主题定义:是MQ中消息运输和存储的顶层容器,用于标识同一类业务逻辑的消息

  • 主题作用

    1. 定义数据的分类隔离:将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储和订阅的隔离性。
    2. 定义数据的身份和权限:消息本身是匿名无身份的,同一分类的消息使用相同的主题做身份识别和权限管理。
  • 模型关系

    alt 主题流程与位置
    alt 主题流程与位置

    主题是顶层存储,所有消息资源的定义都在主题内完成。但主题是一个逻辑概念,并不是实际的消息容器
    主题由多个队列(queue)组成,先进先出

5. 消息(Message)

5.1 消息特点与分类

  • 定义:消息是MQ中最小数据传输单元

  • 特点

    1. 消息不可变性:消息本质上是已经产生并确定的时间,一旦产生后,消息的内容不会发生改变

    2. 消息持久化:RocketMQ会对消息进行持久化,即将接受到的消息存储到RocketMQ服务端的存储文件中,保证消息的可回溯性和系统故障后可恢复性。

  • 消息类型

    1. **Normal(普通消息)**:消息本身无特殊语义,消息之间没有关联。
    2. **FIFO(顺序消息)**:MQ通过消息分组(MessageGroup)标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送的顺序
    3. **Delay(定时/延时消息)**:通过指定延时时间控制消息生产后不要立刻投递,而是延迟后对消费者可见。
    4. **Transaction(事务消息)**:MQ支持分布式事务消息,支持消息调用的事务一致性(简单来说就是事务完成后再发送消息)。

5.2 普通消息(Normal)

  • 普通消息是最常用,最高吞吐的消息,可以保证消息100%不丢。
  • 普通消息氛围三种:
    1. 同步消息
    2. 异步消息
    3. 单向消息

5.2.1 同步消息

  • 同步消息特征:最常用的消息。即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

    • 我们之前写的代码就是同步消息
    • 每次生产者都会生成一个receipt也就是回执,我们在业务中就将回执返回给用户
  • 同步消息执行逻辑

    • 将消息发给用户之后,再告诉用户成功了

5.2.2 异步消息

  • 异步消息特征:即时性最强,对回执的实时性要求不高,可以接受延迟收到回执

  • 异步消息执行逻辑

    • 将消息异步丢给Netty线程去执行
    • 同时给用户返回成功信息
    • 等Netty线程处理完,也就是成功的发给Broker,且Broker返回了ACK
  • 那么怎么去写异步消息呢

    • 使用producer的sendAsync方法,获取一个future,然后在future里写异步执行的机制
    • 然后把成功信息直接返回给用户
    • 等Broker返回ACK时,执行回调,返回回执
    • 这里返回的Throwable是检测Netty线程是否让Broker返回了ACK,如果没有返回ACK,则报错,让人员去补发消息
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
          for (int i = 1; i <= 10; i++) {
      //5. 设置发送的普通消息
      MessageBuilder messageBuilder = new MessageBuilderImpl();
      Message message = messageBuilder
      .setTopic(topic)
      .setKeys("Key is me")
      .setTag("Tag is null")
      .setBody(("God Drinks Java" + i).getBytes())
      .build();


      //消息被Netty线程抢走,在Netty线程发送消息
      CompletableFuture<SendReceipt> future = producer.sendAsync(message);

      //直接给用户返回成功信息
      log.info("用户的订单{}发送成功了", i);

      //等Netty线程把消息发给Broker,并且Broker返回ACK时,执行这里的回调
      //throwable是Netty线程返回的是否Broker返回ACK的结果
      future.whenCompleteAsync((Receipt, throwable) -> {
      //这里用null != throwable 是因为要防止空指针
      if (null != throwable) {
      log.error("Failed to send message", throwable);
      // Return early.
      return;
      }

      //可以异步实现的功能
      log.info("Send message successfully, messageId={}", Receipt.getMessageId());
      });
      }
  • 这里回调消息也就是发送消息和返回用户成功信息是同时进行的,所以叫做异步消息

5.2.3 单向消息

  • 单项消息特征:不需要有回执的消息,例如日志类的消息

5.3 顺序消息(FIFO)

  • 标题: RocketMQ 5.x
  • 作者: yin_bo_
  • 创建于 : 2025-11-27 13:13:16
  • 更新于 : 2025-12-04 17:53:41
  • 链接: https://www.blog.yinbo.xyz/2025/11/27/微服务/RocketMQ/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。