RocketMQ 5.x
1. MQ概述
- MQ(
MessageQueue):既消息队列,是在消息传输的过程中保存消息的容器,多用于分布式系统之间通信。其中队列是数据结构的一种,特征为先进先出

- MQ的优势与劣势:
- 优势:
- 应用解耦
- 系统的耦合性越高,容错性和可维护性就越低
- 所以解耦之后消费方和生产方就没有关系了,消费方存活与否不影响生产方
- 异步提速
- 生产方发完消息,不需要等消费方回应就可以继续下一步业务逻辑
- 削峰填谷
- 高并发时将请求暂存队列,按消费者处理能力平稳消费,避免瞬时流量压垮下游
- 应用解耦
- 劣势:
- 系统可用性降低
- 一旦MQ宕机,就会对业务产生影响
- 系统复杂性提高
- 以前是系统间同步远程调用,现在是通过MQ进行异步调用,大大增加了系统复杂度
- 一致性问题
- A系统处理完业务,通过MQ给B C D三个系统发消息数据,如果B C系统处理成功 D系统处理失败,就会产生消息不一致
- 系统可用性降低
- 优势:
2. RocketMQ入门
2.1 RocketMQ介绍
主流的MQ产品:
- ActiveMQ
- Java语言实现,万级数据吞吐量,处理速度ms级。主从架构,成熟度高 (现在基本没人用了)
- RabbitMQ
- erlang语言实现,万级数据吞吐量,处理速度us级,主从架构
- RocketMQ
- java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,拓展性强
- kafka
- scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
- ActiveMQ
RocketMQ是阿里开源的一块非常优秀的中间件产品,他现在已经在阿里内部被广泛的应用,并且经受住了多次双十一这种极致场景的压力(2017年双十一,RocketMQ流转的消息量带到了万亿级,峰值TPS达到5600万)
2.2 RocketMQ工作原理
四大核心角色
- Producer:消息生产者
- Consumer:消息消费者
- Broker:消息存储与转发服务(真正干活的角色)
- NameServer:轻量级注册中心,类似“微型 Nacos”,提供路由发现与心跳检测
Porducer发消息给Broker,Broker收到消息并持久化成功后,立刻给Producer返回ACK(确认)(这一步与Consumer无关),同时消息异步的推送/拉取给Consumer这里将消息返回给消费者有两种方式
- 拉取模式:
Consumer每秒发起请求去Broker拉取数据 - 推送模式:
Consumer和Broker建立一个长连接,监听着Broker,如果有消息就返回给Consumer
- 拉取模式:
那么NameServer有什么用呢?
- 之前我们学过SpringCloud吧,
NameServer就充当注册中心(类似于Nacos)的角色。 - 当我们的
Broker是个集群的话,Producer应该如何知道发送给哪个Broker消息呢?Consumer应该监听哪个Broker?Producer和Consumer会先经过NameServer路由,再去找可以用的Broker
- 那么NameServer是怎么知道Broker可用且没宕机的呢?
- 用的是跟注册中心相同原理的心跳检测

alt MQ工作原理
- 用的是跟注册中心相同原理的心跳检测
- 之前我们学过SpringCloud吧,
2.3 RocketMQ 5.x安装
这里我们学过Docker了,所以直接在WSL上的Ubuntu系统上的Docker部署RocketMQ就行了😊
具体可以看rocketmq的官网:http://rocketmq.apache.org
现在RocketMQ主推的是5.0的大版本,4.0版本停止维护了,尽量不要用😚
RocketMQ实质上就是Java代码,需要用JDK去运行,这里的RocketMQ镜像里已经自带JDK了,所以不需要再次安装
如何使用Docker安装RocketMQ
首先我们需要拉取相关的镜像,如apache/rocketmq:5.3.2(RocketMQ资源)和pangliang/rocketmq-console-ng(可视化仪表盘资源)
1
2docker pull apache/rocketmq:5.3.2
docker pull pangliang/rocketmq-console-ng如果拉取不了镜像,就要用魔法(🤫),我们的魔法不会作用于终端,所以要配置一下。
在我们的魔法页面查到代理的端口,比如我的是7890,输入以下指令进行终端代理1
2$env:https_proxy="http://127.0.0.1:7890"
$env:http_proxy="http://127.0.0.1:7890"编写
broker.conf,用来配置broker1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = rmqnamesrv:9876
listenPort = 10911
brokerIP1 = rmqbroker
brokerIP2 = rmqbroker
haListenPort = 10912
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
storePathRootDir = /home/rocketmq/store
storePathCommitLog = /home/rocketmq/store/commitlog
storePathConsumeQueue = /home/rocketmq/store/consumequeue
storePathIndex = /home/rocketmq/store/index编写
docker-compose(如果不知道docker-compose是什么看我之前的docker笔记)- 介绍一下容器的配置
rmqnamesrv容器:端口为9876,因为是类似注册中心的作用,所以要先部署rmqbroker容器:端口有三个:10909,10911,10912,需要配置环境(nameserver的地址),并且依赖于namesrv服务。broker的配置挂载到/broker.conf的文件里rmqproxy容器:端口为8080和8081,这里简单的负责路由rmqdashboard容器:可视化界面,容器内端口为8080,映射到宿主机的端口为9999,所以我们要访问仪表盘直接访问 http://localhost:9999 即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59services:
namesrv: # NameServer配置
image: apache/rocketmq:5.3.2
container_name: rmqnamesrv
ports:
- 9876:9876
networks:
- rocketmq
command: sh mqnamesrv
broker: # broker配置
image: apache/rocketmq:5.3.2
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./broker.conf:/home/rocketmq/conf/broker.conf
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
command: sh mqbroker -c /home/rocketmq/conf/broker.conf
proxy: # 代理配置,类似于gateway
image: apache/rocketmq:5.3.2
container_name: rmqproxy
networks:
- rocketmq
depends_on:
- broker
- namesrv
ports:
- 8080:8080
- 8081:8081
restart: on-failure
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
command: sh mqproxy
dashboard: # 仪表盘配置
image: pangliang/rocketmq-console-ng
container_name: rmqdashboard
ports:
- 9999:8080
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
networks:
- rocketmq
depends_on:
- namesrv
restart: on-failure
networks:
rocketmq:
driver: bridge访问http://localhost:9999 ,可以看到RocketMQ的仪表盘

alt RocketMQ可视化界面
3. RocketMQ消息发送与接收
3.1 简单生产者/消费者书写
引入rocketmq-client-java依赖
- 依赖的版本最好和RocketMQ的版本形同,这里引入的是rocketmq-client-java新API,而不是rocketmq-client旧的API
1
2
3
4
5<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.3.2</version>
</dependency>生产者代码
- 先写出接入点地址(endpoint)和标题(topic)
- new一个认证,里面有接入点地址
- 使用producer的工厂类初始化生产者
- 使用message的工厂来初始化消息,并发送且得到回执(receipt)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class SimpleProducer {
private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class);
public static void main(String[] args) throws ClientException {
//1. 接入点地址,这里必须是proxy代理的地址和端口
String endpoint = "localhost:8081";
//2. 消息发送目标的Topic名称
String topic = "TopicTest";
//3. 写一下conf,里面带有接入点地址,也就是MQ的代理地址
ClientConfiguration configuration = ClientConfiguration
.newBuilder()
.setEndpoints(endpoint)
.build();
//4. 初始化Producer
Producer producer = provider
.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//5. 设置发送的普通消息
MessageBuilder messageBuilder = new MessageBuilderImpl();
Message message = messageBuilder
.setTopic(topic)
.setKeys("Key is me")
.setTag("Tag is null")
.setBody(("God Drinks Java" + i).getBytes())
.build();
try {
//开始发送消息,需要关注发送的结果,并捕获失败异常 receipt是回执的意思
SendReceipt receipt = producer.send(message);
log.info(receipt.toString());
}catch (Exception e) {
log.error("Fail to send message",e);
}
}
}消费者代码
- 这里我们使用**推模式(push)**来进行消费者消息的推送
- 跟生产者的思路差不多,除了topic还要指定消费者所属的消费者组
- 这里设置消费监听器来建立与broker的长连接,随时监听producer的消息
- 当我们开启推模式消费者代码时,producer每发送一条消息,comsumer就会立刻从监听器里接受.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public class SimpleConsumer {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
public static void main(String[] args) throws ClientException, InterruptedException {
String endpoint = "localhost:8081";
//2. 从namesrv里获取认证
ClientConfiguration clientConfiguration = ClientConfiguration
.newBuilder()
.setEndpoints(endpoint)
.build();
// 3. 订阅消息的过滤规则, *表示订阅所有Tag的信息
String tag = "*";
FilterExpression filter = new FilterExpression(tag, FilterExpressionType.TAG);
// 4. 指定所属的消费者组
String Group = "ConsumerGroup";
// 5. 指定订阅目标的Topic
String topic = "TopicTest";
// 6 初始化工厂类provider的接入点
// 7. 初始化PushConsumer,需要绑定消费者组,通信参数和订阅管理,这里还是使用provider工厂类来构建
PushConsumerBuilderImpl pushConsumerBuilder = new PushConsumerBuilderImpl();
PushConsumer pushConsumer = pushConsumerBuilder.setConsumerGroup(Group)
.setClientConfiguration(clientConfiguration)
.setSubscriptionExpressions(Collections.singletonMap(topic, filter))
.setMessageListener(messageView -> {
//处理消息并返回消费结果
ByteBuffer byteBuffer = messageView.getBody();
String content = StandardCharsets.UTF_8.decode(
byteBuffer.duplicate()
).toString();
log.info("messageId={} message={}", messageView.getMessageId(), content);
return ConsumeResult.SUCCESS;
})
.build();
}
}总结:
- 这是我producer发送消息的时间和我consumer接受消息的时间
- 可以看到只差0.1s,既是这样也是算上发送日志的时间
- 足以见得推模式的效率有多么快
1
218:04:59.774 [main] INFO ....
18:04:59.784 [RocketmqMessageConsumption-0-67] INFO ....- 这是我producer发送消息的时间和我consumer接受消息的时间
3.2 单生产者多消费者模式
- 有如下场景:
- 我一个主题为
topic1的生产者连续发送了10条消息,这时有两个消费者正在监听topic1。 - 这两个消费者获得的消息会是怎样的?
- 是每个人都分到了全部的消息?
- 还是平分了这十个消息?
- 我一个主题为
- 我们来实验一下
- 首先要让消费者代码能起多个实例
- 运行配置 —> 修改选项 —> 允许多个实例
我们修改一下producer部分代码,让代码连续发送十条消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17for (int i = 1; i <= 10; i++) {
//5. 设置发送的普通消息
Message message = provider
.newMessageBuilder()
.setTopic(topic)
.setKeys("Key is me")
.setTag("Tag is null")
.setBody(("God Drinks Java" + i).getBytes())
.build();
try {
//开始发送消息,需要关注发送的结果,并捕获失败异常 receipt是回执的意思
SendReceipt receipt = messageBuilder.send(message);
log.info(receipt.toString());
}catch (Exception e) {
log.error("Fail to send message",e);
}
}启动两个consumer,然后再启动producer,发现如下结果
- 可以看到producer发送的消息被两个consumer均分了,但是顺序是乱序,为什么?
18:49:22.095 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3200000000, recallHandle=}
18:49:22.106 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000001, recallHandle=}
18:49:22.119 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000002, recallHandle=}
18:49:22.126 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000003, recallHandle=}
18:49:22.132 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000004, recallHandle=}
18:49:22.139 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000005, recallHandle=}
18:49:22.145 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000006, recallHandle=}
18:49:22.152 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000007, recallHandle=}
18:49:22.159 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000008, recallHandle=}
18:49:22.166 [main] INFO cn.itcast.demo.simple.Producer - SendReceiptImpl{messageId=01683421218C3DD25409405E3300000009, recallHandle=}
18:49:22.093 [RocketmqMessageConsumption-0-84] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3200000000 message=God Drinks Java1
18:49:22.120 [RocketmqMessageConsumption-0-26] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000002 message=God Drinks Java3
18:49:22.133 [RocketmqMessageConsumption-0-41] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000004 message=God Drinks Java5
18:49:22.146 [RocketmqMessageConsumption-0-43] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000006 message=God Drinks Java7
18:49:22.161 [RocketmqMessageConsumption-0-40] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000008 message=God Drinks Java9
18:49:22.108 [RocketmqMessageConsumption-0-76] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000001 message=God Drinks Java2
18:49:22.128 [RocketmqMessageConsumption-0-77] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000003 message=God Drinks Java4
18:49:22.142 [RocketmqMessageConsumption-0-78] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000005 message=God Drinks Java6
18:49:22.155 [RocketmqMessageConsumption-0-79] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000007 message=God Drinks Java8
18:49:22.167 [RocketmqMessageConsumption-0-80] INFO cn.itcast.demo.one2many.Consumer - messageId=01683421218C3DD25409405E3300000009 message=God Drinks Java10
如果我们想让两个consumer都接收到这十个消息,应该怎么做?
- 我们将consumer的消费者组改为group1,启动一个consumer,然后改为group2,再启动一个consumer
- 这时我们运行producer发送十条消息,发现两个consumer都接收到了10条消息
所以在单生产者多消费者消息发送中有两个方式
- 两个消费者,相同的消费者组,他们会负载均衡的接受生产者的消息
- 两个消费者,不同的消费者组,生产者会给每个组都发送相同的消息
4. 主题(Topic)
主题定义:是MQ中消息运输和存储的顶层容器,用于标识同一类业务逻辑的消息
主题作用:
- 定义数据的分类隔离:将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储和订阅的隔离性。
- 定义数据的身份和权限:消息本身是匿名无身份的,同一分类的消息使用相同的主题做身份识别和权限管理。
模型关系:

alt 主题流程与位置
主题是顶层存储,所有消息资源的定义都在主题内完成。但主题是一个逻辑概念,并不是实际的消息容器
主题由多个队列(queue)组成,先进先出
5. 消息(Message)
5.1 消息特点与分类
定义:消息是MQ中最小数据传输单元
特点:
消息不可变性:消息本质上是已经产生并确定的时间,一旦产生后,消息的内容不会发生改变。
消息持久化:RocketMQ会对消息进行持久化,即将接受到的消息存储到RocketMQ服务端的存储文件中,保证消息的可回溯性和系统故障后可恢复性。
消息类型
**Normal(普通消息)**:消息本身无特殊语义,消息之间没有关联。**FIFO(顺序消息)**:MQ通过消息分组(MessageGroup)标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送的顺序。**Delay(定时/延时消息)**:通过指定延时时间控制消息生产后不要立刻投递,而是延迟后对消费者可见。**Transaction(事务消息)**:MQ支持分布式事务消息,支持消息调用的事务一致性(简单来说就是事务完成后再发送消息)。
5.2 普通消息(Normal)
- 普通消息是最常用,最高吞吐的消息,可以保证消息100%不丢。
- 普通消息氛围三种:
- 同步消息
- 异步消息
- 单向消息
5.2.1 同步消息
同步消息特征:最常用的消息。即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
- 我们之前写的代码就是同步消息
- 每次生产者都会生成一个
receipt也就是回执,我们在业务中就将回执返回给用户
同步消息执行逻辑:
- 将消息发给用户之后,再告诉用户成功了
5.2.2 异步消息
异步消息特征:即时性最强,对回执的实时性要求不高,可以接受延迟收到回执。
异步消息执行逻辑:
- 将消息异步丢给Netty线程去执行
- 同时给用户返回成功信息
- 等Netty线程处理完,也就是成功的发给Broker,且Broker返回了ACK
那么怎么去写异步消息呢?
- 使用producer的
sendAsync方法,获取一个future,然后在future里写异步执行的机制 - 然后把成功信息直接返回给用户
- 等Broker返回ACK时,执行回调,返回回执
- 这里返回的Throwable是检测Netty线程是否让Broker返回了ACK,如果没有返回ACK,则报错,让人员去补发消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31for (int i = 1; i <= 10; i++) {
//5. 设置发送的普通消息
MessageBuilder messageBuilder = new MessageBuilderImpl();
Message message = messageBuilder
.setTopic(topic)
.setKeys("Key is me")
.setTag("Tag is null")
.setBody(("God Drinks Java" + i).getBytes())
.build();
//消息被Netty线程抢走,在Netty线程发送消息
CompletableFuture<SendReceipt> future = producer.sendAsync(message);
//直接给用户返回成功信息
log.info("用户的订单{}发送成功了", i);
//等Netty线程把消息发给Broker,并且Broker返回ACK时,执行这里的回调
//throwable是Netty线程返回的是否Broker返回ACK的结果
future.whenCompleteAsync((Receipt, throwable) -> {
//这里用null != throwable 是因为要防止空指针
if (null != throwable) {
log.error("Failed to send message", throwable);
// Return early.
return;
}
//可以异步实现的功能
log.info("Send message successfully, messageId={}", Receipt.getMessageId());
});
}
- 使用producer的
这里回调消息也就是发送消息和返回用户成功信息是同时进行的,所以叫做异步消息
5.2.3 单向消息
- 单项消息特征:不需要有回执的消息,例如日志类的消息
5.3 顺序消息(FIFO)
- 标题: RocketMQ 5.x
- 作者: yin_bo_
- 创建于 : 2025-11-27 13:13:16
- 更新于 : 2025-12-04 17:53:41
- 链接: https://www.blog.yinbo.xyz/2025/11/27/微服务/RocketMQ/
- 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。


