RocketMQ

yin_bo_ Lv2

1. MQ概述

  • MQ(MessageQueue):既消息队列,是在消息传输的过程中保存消息的容器,多用于分布式系统之间通信。其中队列是数据结构的一种,特征为先进先出

alt MQ工作流程
alt MQ工作流程

  • MQ的优势与劣势:
    • 优势
      1. 应用解耦
        • 系统的耦合性越高,容错性和可维护性就越低
        • 所以解耦之后消费方和生产方就没有关系了,消费方存活与否不影响生产方
      2. 异步提速
        • 生产方发完消息,不需要等消费方回应就可以继续下一步业务逻辑
      3. 削峰填谷
        • 高并发时将请求暂存队列,按消费者处理能力平稳消费,避免瞬时流量压垮下游
    • 劣势
      1. 系统可用性降低
        • 一旦MQ宕机,就会对业务产生影响
      2. 系统复杂性提高
        • 以前是系统间同步远程调用,现在是通过MQ进行异步调用,大大增加了系统复杂度
      3. 一致性问题
        • A系统处理完业务,通过MQ给B C D三个系统发消息数据,如果B C系统处理成功 D系统处理失败,就会产生消息不一致

2. RocketMQ入门

2.1 RocketMQ介绍

  • 主流的MQ产品:

    1. ActiveMQ
      • Java语言实现,万级数据吞吐量,处理速度ms级。主从架构,成熟度高 (现在基本没人用了)
    2. RabbitMQ
      • erlang语言实现,万级数据吞吐量,处理速度us级,主从架构
    3. RocketMQ
      • java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,拓展性强
    4. kafka
      • scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
  • RocketMQ是阿里开源的一块非常优秀的中间件产品,他现在已经在阿里内部被广泛的应用,并且经受住了多次双十一这种极致场景的压力(2017年双十一,RocketMQ流转的消息量带到了万亿级,峰值TPS达到5600万)

2.2 RocketMQ工作原理

  • 四大核心角色

    • Producer:消息生产者
    • Consumer:消息消费者
    • Broker:消息存储与转发服务(真正干活的角色)
    • NameServer:轻量级注册中心,类似“微型 Nacos”,提供路由发现与心跳检测
  • 生产者集群发消息到**消息服务器集群(Broker)**当中,Broker收到消息立刻给生产者返回ACK(确认),消息异步的推送/拉取给消费者

  • 这里将消息返回给消费者有两种方式

    1. 拉取模式:消费者每秒发起请求去Broker拉取数据
    2. 推送模式:消费者和Broker建立一个长连接,监听着队列,如果有消息就返回给消费者
  • 命名服务器集群(NameServer)去管理消息队列的IP,来实现多Broker组成集群(类似于Nacos),能够让消费者和生产者获取Broker集群的信息,从而让它们发送到特定的消息队列(负载均衡)

    • 如果消费者或者生产者有一个宕机了,我们的命名服务器会根据**心跳检测(也是注册中心的东西)**去查询它们的状态,从而避免将消息发送给宕机的消费者
  • 命名服务器集群和消息服务器集群组成了MQ也就是消息队列

alt MQ工作原理
alt MQ工作原理

2.3 RocketMQ安装

这里我们学过Docker了,所以直接在我们Ubuntu上的Docker安装RocketMQ就行了😊

在安装之前请保证您的服务器已经安装好Docker环境并且安装好JDK

如何使用Docker安装RocketMQ

  • 我们之前已经学过微服务了,那么应该知道,在部署微服务模块之前部署注册注册中心(如Nacos)的话,会报错,因为微服务找不到注册的地方。
  • RocketMQ分为NameServer和Broker两部分,这里NameServer就是RocketMQ的注册中心,所以我们需要先部署NameServer容器
  1. 拉取RocketMQ的镜像

    1
    docker pull rocketmqinc/rocketmq
    • 拉取成功后使用docker images查看镜像是否下载成功
  2. 创建nameserver数据存储目录

    • 之前分析了要先部署nameserver容器,我们这里先创建nameserver的日志和数据存放目录。这个目录我将其放在了data目录下
    1
    mkdir -p /data/rocketmq/logs /data/rocketmq/store
  3. 构建nameserver容器并启动

    • 我们已经创建好了nameserver的日志和数据的存放路径,现在挂载他们,执行以下命令启动nameserver即可
    1
    2
    3
    4
    5
    6
    7
    docker run -d  \
    --restart=always --name rmqnamesrv -p 9876:9876 \
    -v /data/rocketmq/logs:/root/logs \
    -v /data/rocketmq/store:/root/store \
    -e "MAX_POSSIBLE_HEAP=100000000" \
    rocketmqinc/rocketmq \
    sh mqnamesrv \
    • 分析以下以上的代码
      • -d:后台运行
      • --name:nameserver容器名字为rmqnamesrv
      • -p 9876:9876:因为NameServer监听的是9876,这里将他对外暴露
      • --restart=always:主机重启或者容器崩溃会自动拉起
      • -v ...:数据卷挂载到宿主机
      • -e ...:进程的Java堆内存上线改为100MB
      • sh mqnamesrv:启动mqnamesrv容器
    • 以后的docker命令就不会说的这么详细了🫥
    • 执行完之后可以通过docker ps查看是否启动成功
  4. 创建broker数据存储目录

    • 接下来我们创建broker数据卷挂载目录和配置文件
    1
    mkdir -p /data/rocketmqbroker/logs  /data/rocketmqbroker/store /data/rocketmqbroker/conf 

    logs:是broker的日志目录,store:是broker的数据目录,conf是broker的配置信息目录

  5. 创建broker配置文件

    • 我们之前创建的broker配置文件目录的路径是/data/rocketmqbroker/conf,我们在这里写一个broker配置文件,命名为broker.conf
    1
    2
    cd /data/rocketmqbroker/conf
    nano broker.conf
    • 然后写一下配置文件,注意:这里修改namesrvAddr和brokerIP1(如果以后有集群全都要修改)的IP地址为我们WSL虚拟机的的IP地址
    1
    2
    # 查询IP地址
    hostname -I
    • 下面是配置文件的信息,复制完了记得修改IP地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# 所属集群名字
brokerClusterName=DefaultCluster
# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a
# 0 表示 Master,> 0 表示 Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=你的IP地址:9876
# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
#producer.setVipChannelEnabled=false
brokerIP1=你的IP地址

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
  • 查看我们的配置文件,获得了以下信
    1. 我们broker集群的名字为DefaultCluster
    2. 我们这个broker的名字是broker-a
    3. 我们这个broker为主机
  1. 构造broker容器并启动

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    docker run -d \
    --restart=always --name rmqbroker01 \
    --link rmqnamesrv:namesrv \
    -p 10911:10911 -p 10909:10909 \
    -v /data/rocketmqbroker/logs:/root/logs \
    -v /data/rocketmqbroker/store:/root/store \
    -v /data/rocketmqbroker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
    -e "NAMESRV_ADDR=namesrv:9876" \
    -e "MAX_POSSIBLE_HEAP=200000000" \
    apache/rocketmq:5.3.2 \
    sh mqbroker \
    -c /home/rocketmq/broker.conf \
    • 通过docker ps查看是否有rmqbroker01进程运行,如果运行了说明rocketMQ已经成功运行了😊
  2. 拉取RocketMQ-Console可视化界面镜像

    • 我们还需要一个可视化界面来监视RocketMQ的进程
    1
    docker pull pangliang/rocketmq-console-ng
  3. 构建RocketMQ-Console容器并启动

    • 记得改IP
    1
    2
    3
    4
    5
    6
    7
    8
    docker run \
    -d --restart=always \
    --name rmqadmin \
    -e "JAVA_OPTS= \
    -Drocketmq.namesrv.addr=你的IP:9876 \
    -Dcom.rocketmqsendMessageWithVIPChannel=false" \
    -p 9999:8080 \
    pangliang/rocketmq-console-ng \
    • 代码解析
      1. -Drocketmq.namesrv.addr=你的IP:9876:告诉控制台我们的nameserver在哪
      2. -Dcom.rocketmqsendMessageWithVIPChannel=false":控制台发测试消息时关闭VIP通道
  4. 通过访问9999端口查看RocketMQ-Console是否启动成功

    alt RocketMQ可视化界面
    alt RocketMQ可视化界面

  • 标题: RocketMQ
  • 作者: yin_bo_
  • 创建于 : 2025-11-27 13:13:16
  • 更新于 : 2025-11-27 18:36:56
  • 链接: https://www.blog.yinbo.xyz/2025/11/27/微服务/RocketMQ/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。