RocketMQ学习笔记
消息队列
简介
消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。
使用场景
- 服务解耦
- 流量削峰
- 数据分发
消息队列对比
RocketMQ
RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
安装
因为RocketMQ的安装的东西有三个部分:namesrv、broker、rocketmq-dashboard ,用Docker-compose安装起来比较方便。
配置中的映射文件和日志都是用默认配置,生产环境根据是否需要持久化数据和配置决定
制作镜像步骤如下,实际使用可直接用docker启动,如果需要固定版本或者定制化,可自制镜像:
1 | git clone https://github.com/apache/rocketmq-docker.git |
通过docker
启动
1 | git clone https://github.com/apache/rocketmq.git |
通过centos
服务器6765
端口访问管理页面。在OPS
中改为Name Server
的9876
端口,点击Cluster
,可以正常看到Broker
节点,并且界面不报错,则说明页面访问正常。(例如通过docker
启动,网络是host
,则填写本机的的9876
端口。)
基本操作
新建节点
点击 Topic - ADD
发送消息
点击 SEND MESSAGE
ps:如果出现报错,需要查看dashbord
的log
,原因可能是网络不通,亦或是Broker节点磁盘超过85%
。
获取消息
点击Message-topic
,选择对应Topoc:hello_mq
,选择时间范围(需要包括发送消息的时间)
可以看到刚刚发送的消息,以及消息详情
基本概念
RocketMQ
是一个分布式的消息队列。
Producer
:生产者,消息的发送者,可以做成一个集群Name Server
:类似注册中心,不负责数据和业务逻辑,同步Broker节点的数据信息Broker
:存储、传输消息的节点Consumer
:消费者,消息的接收者,可以做成一个集群Topic
:区分消息的种类,一个发送者可以发送消息给一个或者多个Topic
;一个消息的接收者可以订阅一个或多个Topic
消息- Message Queue:相当于
Topic
的分区,用于并行发送和接收消息
消息类型
同步发送
- 同步发送,线程阻塞,投递
complete
,阻塞结束 - 如果发送失败,会在默认的超时时间3s内,进行重试,最多重试2次
- 投递
complete
不代表投递成功,要check SendResult.sendStatus
来判断是否投递成功 SendResult
里面有发送状态的枚举:sendStatus
,同步消息投递有一个状态返回值
1 | public enum SendStatus { |
只有ack
的SendStatus=SEND_OK
才代表发送消息成功,会停止重试,
异步发送
- 当前线程一定要等待异步线程回调结束再关闭
Producer
,由于是异步,提前关闭Producer
会导致未回调链接就断开。 - 异步消息不
retry
,投递失败就回调,只有消息同步发送才会retry
- 异步发送一般用于链路耗时较长,对RT响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完后通知推送视频转码结果等。
单向发送
- 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试,也不会对发送失败特殊处理
- 次方式发送消息过程耗时短,一般在微秒级别
发送方式 | 发送TPS | 发送结果反馈 | 可靠性 |
---|---|---|---|
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向发送 | 最快 | 无 | 可能丢失 |
功能行为
普通消息
用到最多,生产者关注消息发送成功即可,消费者消费到消息即可。
这种消息不保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量高,合适大部分场景。
消息被Producer
发送到集群中的某个Broker
的Topic
中某个Queue
中,在发送端不保证顺序,只保证发送,因此发送端无法确定消息顺序。
消息被Consumer
集群获取,多个Consumer
同时获取多个Queue
,因此也无法确保消息的消费顺序。
顺序消息
在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 Apache RocketMQ 的顺序消息可以有效保证数据传输的顺序性。
如何保证消息的顺序性
Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。
生产顺序性 :
Apache RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。
如需保证消息生产的顺序性,则必须满足以下条件:
- 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
- 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
- 相同消息组的消息按照先后顺序被存储在同一个队列。(通过
hash
之后shard
,可以确保同一类型消息被发送到同一个Group
中) - 不同消息组的消息可以混合在同一个队列中,且不保证连续。
如上图所示,消息组1和消息组4的消息混合存储在队列1中, Apache RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。
消费顺序性 :
Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。
如需保证消息消费的顺序性,则必须满足以下条件:
投递顺序
Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。
NOTE
消费者类型为PushConsumer时, Apache RocketMQ 保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。消费者类型的具体信息,请参见消费者分类。
有限重试
Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。
对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。
生产顺序性和消费顺序性组合
如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。
一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:
生产顺序 消费顺序 顺序性效果 设置消息组,保证消息顺序发送。 顺序消费 按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。 设置消息组,保证消息顺序发送。 并发消费 并发消费,尽可能按时间顺序处理。 未设置消息组,消息乱序发送。 顺序消费 按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。 未设置消息组,消息乱序发送。 并发消费 并发消费,尽可能按照时间顺序处理。
延时消息
定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。因此,下文统一用定时消息描述。
在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 Apache RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
使用场景:
- 分布式定时调度
- 任务超时处理,例如订单超时
RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。
事务消息
- 事务消息:消息队列RocketMQ版提供类似XA或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。
- 半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了消息队列RocketMQ版服务端,但是消息队列RocketMQ版服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
- 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
事务消息发送步骤如下:
- 生产者将半事务消息发送至消息队列
RocketMQ
版服务端。 - 消息队列
RocketMQ
版服务端将消息持久化成功之后,向生产者返回Ack
确认消息已经发送成功,此时消息为半事务消息。 - 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(
Commit
或是Rollback
),服务端收到确认结果后处理逻辑如下:- 二次确认结果为
Commit
:服务端将半事务消息标记为可投递,并投递给消费者。 - 二次确认结果为
Rollback
:服务端将回滚事务,不会将半事务消息投递给消费者。
- 二次确认结果为
- 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为
Unknown
未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
事务消息回查步骤如下:
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
Go操作RocketMQ
client:RocketMQ Client Go
文档:RocketMQ Go Client Introduction
官方示例非常全:example
同步生产简单消息
1 | package main |
1 | INFO[0000] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"DEFAULT_BROKER\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":7,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"DEFAULT_BROKER\",\"brokerAddrs\":{\"0\":\"192.168.51.16:10911\"}}]}" changedFrom="<nil>" topic=hello_mq |
消费消息
1 | package main |
发送延迟消息
延迟消息用法是 msg.WithDelayTimeLevel(3)
1 | package main |
事务消息
构造事务生产者
1 | func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) |
listener实例实现两个方法
1 | type TransactionListener interface { |
例如
1 | type OrderListener struct{} |
测试代码
1 | // 发送事务消息 |
注意:源码中可以看到,默认情况下,连接RocketMQ
的client其实都是一份,无论是生产者还是消费者。
简单生产者
1 | func NewProducer(opts ...producer.Option) (Producer, error) { |
事务生产者
1 | func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) { |
消费者
1 | func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) { |
其实都是创建client
1 | // clientMap是一个并发安全的map |
可以看到,如果一个进程中同时有多重生产者以及消费者,client
是会复用的。
而 p.Shutdown
方法如下
1 | func (pc *pushConsumer) Shutdown() error { |
Shutdown
方法在syncMap
中删除对象,因此,某一个生产者或者消费者Shutdown
,则会断开该进程中所有与RocketMQ
的连接。
通过RocketMQ分布式事务的案例
例如在购物场景中,下单功能:
- 查询库存状态
- 如果没有库存,则业务返回结果;如果有库存,则执行本地事务
- 执行本地事务,创建订单信息,发布事务消息恢复扣减对应订单号的库存,调用库存接口,扣减库存
- 执行本地事务成功,则回滚恢复库存扣减的消息
- 执行本地事务失败,则提交恢复库存扣减的消息
- 库存服务监听回滚topic,更新本地第二张订单和库存的信息,将库存恢复回来,订单状态更新为回滚状态
下单后的支付功能:
- 下单功能的本地事务中,增加延迟信息发送,防止定时内未支付,发送到订单
topic
中 - 当支付成功,通过支付系统回调接口,更新订单状态
- 订单服务监听订单
topic
,获取订单信息,查询订单信息表,获取订单装填,如果已支付,则ConsumeSuccess
,如果未支付,将订单信息发送到回滚的topic
中,使订单服务回滚库存数量 - 如果订单更新过程失败,则将消息设置为重试
ConsumeRetryLater
- 支付系统回调接口,在最大努力通知请款下,一般能够保障订单状态更新成功
推荐阅读:
消息队列简介
消息队列详解:ActiveMQ、RocketMQ、RabbitMQ、Kafka
《浅入浅出》-RocketMQ
RocketMQ官方网站
RocketMQ GitHub
RocketMQ延迟消息实现原理和源码分析
事务消息
RocketMQ 事务消息