RocketMQ学习笔记

消息队列

简介

消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。

使用场景

  1. 服务解耦
  2. 流量削峰
  3. 数据分发

消息队列对比

img

RocketMQ

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

安装

因为RocketMQ的安装的东西有三个部分:namesrv、broker、rocketmq-dashboard ,用Docker-compose安装起来比较方便。

配置中的映射文件和日志都是用默认配置,生产环境根据是否需要持久化数据和配置决定

制作镜像步骤如下,实际使用可直接用docker启动,如果需要固定版本或者定制化,可自制镜像:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
git clone https://github.com/apache/rocketmq-docker.git
cd rocketmq-docker

cd image-build
// 使用最新版,构建镜像
sh build-image.sh 5.0.0 alpine

// 准备特定版本
cd ..
chmod 755 stage.sh
./stage.sh 5.0.0

// 运行单机
cd stages/5.0.0/
./play-docker.sh alpine

// 生成RocketMQ Dashboard Docker映像,基础镜像只支持centos
// https://dist.apache.org/repos/dist/release/rocketmq/rocketmq-dashboard/ 目录下,只有1.0.0版本
cd image-build && sh build-image-dashboard.sh 1.0.0 centos
// ps: 注意,官网的dockerfile里面安装java是基于x86_64,如果打镜像的系统不是x86_64,则可能出现java安装失败,报错找不到java包或者路径。
// mac os打镜像有些坑,dashboard的镜像,使用centos服务器打镜像,并用centos服务器启动dashboard
// 如果将srv、broker、dashbord分开安装,后续可能会由于网络问题无法正常操作,因此建议将RocketMQ安装在同一台具备docker能力的centos服务器上。

// 启动,使用下载的第三方镜像,则也将启动命令换掉,不使用官方的脚本
sh product/start-dashboard.sh 1.0.0

通过docker启动

1
2
3
4
5
6
7
git clone https://github.com/apache/rocketmq.git
// 启动NameServer
docker run -it -d --net=host apache/rocketmq ./mqnamesrv
// 启动Broker
docker run -it -d --net=host --mount source=/tmp/store,target=/home/rocketmq/store apache/rocketmq ./mqbroker -n localhost:9876
// 通过自建镜像或者下载的镜像启动dashboard
docker run -d -it --name rocketmq-dashboard -p 6765:8080 192.168.51.16:9443/dev/rocketmq-dashboard:1.0.0-centos

通过centos服务器6765端口访问管理页面。在OPS中改为Name Server9876端口,点击Cluster,可以正常看到Broker节点,并且界面不报错,则说明页面访问正常。(例如通过docker启动,网络是host,则填写本机的的9876端口。)

image-20221014092057756

基本操作

新建节点

点击 Topic - ADD

image-20221013221150174

发送消息

点击 SEND MESSAGE

image-20221013221244114

image-20221013221314797 image-20221013232752302

ps:如果出现报错,需要查看dashbordlog,原因可能是网络不通,亦或是Broker节点磁盘超过85%

获取消息

点击Message-topic,选择对应Topoc:hello_mq,选择时间范围(需要包括发送消息的时间)

image-20221013233303997

可以看到刚刚发送的消息,以及消息详情

image-20221013233325429

基本概念

RocketMQ是一个分布式的消息队列。

Rmq-structure.png

  • Producer:生产者,消息的发送者,可以做成一个集群
  • Name Server:类似注册中心,不负责数据和业务逻辑,同步Broker节点的数据信息
  • Broker:存储、传输消息的节点
  • Consumer:消费者,消息的接收者,可以做成一个集群
  • Topic:区分消息的种类,一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或多个Topic消息
  • Message Queue:相当于Topic的分区,用于并行发送和接收消息

消息类型

同步发送

  1. 同步发送,线程阻塞,投递complete,阻塞结束
  2. 如果发送失败,会在默认的超时时间3s内,进行重试,最多重试2次
  3. 投递complete不代表投递成功,要check SendResult.sendStatus来判断是否投递成功
  4. SendResult里面有发送状态的枚举:sendStatus,同步消息投递有一个状态返回值
1
2
3
4
5
6
public enum SendStatus {
SEND_OK, // 发送成功
FLUSH_DISK_TIMEOUT, // 刷盘超时
FLUSH_SLAVE_TIMEOUT, // 同步slave超时
SLAVE_NOT_AVAILABLE, // 没有slave
}

只有ackSendStatus=SEND_OK才代表发送消息成功,会停止重试,

异步发送

  1. 当前线程一定要等待异步线程回调结束再关闭Producer,由于是异步,提前关闭Producer会导致未回调链接就断开。
  2. 异步消息不retry,投递失败就回调,只有消息同步发送才会retry
  3. 异步发送一般用于链路耗时较长,对RT响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完后通知推送视频转码结果等。

单向发送

  1. 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试,也不会对发送失败特殊处理
  2. 次方式发送消息过程耗时短,一般在微秒级别
发送方式 发送TPS 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 可能丢失

功能行为

普通消息

用到最多,生产者关注消息发送成功即可,消费者消费到消息即可。

这种消息不保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量高,合适大部分场景。

消息被Producer发送到集群中的某个BrokerTopic中某个Queue中,在发送端不保证顺序,只保证发送,因此发送端无法确定消息顺序。

消息被Consumer集群获取,多个Consumer同时获取多个Queue,因此也无法确保消息的消费顺序。

顺序消息

在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 Apache RocketMQ 的顺序消息可以有效保证数据传输的顺序性。

如何保证消息的顺序性

Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性消费顺序性

  • 生产顺序性

    Apache RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

    如需保证消息生产的顺序性,则必须满足以下条件:

    • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
    • 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

    满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

    • 相同消息组的消息按照先后顺序被存储在同一个队列。(通过hash之后shard,可以确保同一类型消息被发送到同一个Group中)
    • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

    顺序存储逻辑

    如上图所示,消息组1和消息组4的消息混合存储在队列1中, Apache RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。

    • 消费顺序性

      Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。

      如需保证消息消费的顺序性,则必须满足以下条件:

      • 投递顺序

        Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。

        NOTE

        消费者类型为PushConsumer时, Apache RocketMQ 保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。消费者类型的具体信息,请参见消费者分类

      • 有限重试

        Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

        对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。

    生产顺序性和消费顺序性组合

    如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。

    一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:

    生产顺序 消费顺序 顺序性效果
    设置消息组,保证消息顺序发送。 顺序消费 按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。
    设置消息组,保证消息顺序发送。 并发消费 并发消费,尽可能按时间顺序处理。
    未设置消息组,消息乱序发送。 顺序消费 按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。
    未设置消息组,消息乱序发送。 并发消费 并发消费,尽可能按照时间顺序处理。

延时消息

定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。

定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。因此,下文统一用定时消息描述。

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 Apache RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

使用场景:

  • 分布式定时调度
  • 任务超时处理,例如订单超时

RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。

事务消息

  • 事务消息:消息队列RocketMQ版提供类似XA或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。
  • 半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了消息队列RocketMQ版服务端,但是消息队列RocketMQ版服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

事务消息

事务消息发送步骤如下:

  1. 生产者将半事务消息发送至消息队列RocketMQ版服务端。
  2. 消息队列RocketMQ版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

事务消息回查步骤如下:

  1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  2. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

Go操作RocketMQ

client:RocketMQ Client Go

文档:RocketMQ Go Client Introduction

官方示例非常全:example

同步生产简单消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

// 发送普通消息
func main() {
p, _ := rocketmq.NewProducer(
// name server IP 和 port
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.51.16:9876"})),
// 重试次数
producer.WithRetry(2),
)
// 启动Producer
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
// 指定topic
topic := "hello_mq"

for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
// 同步发送消息
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
// 消息发送完成,关闭Producer
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
1
2
3
4
5
INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"DEFAULT_BROKER\",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":7,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"DEFAULT_BROKER\",\"brokerAddrs\":{\"0\":\"192.168.51.16:10911\"}}]}" changedFrom="<nil>" topic=hello_mq
// 发送到的queueId是轮训,也就不会保证顺序性
send message success: result=SendResult [sendStatus=0, msgIds=AC10D3111A7A00000000433007080001, offsetMsgId=C0A8331000002A9F0000000000000000, queueOffset=0, messageQueue=MessageQueue [topic=hello_mq, brokerName=DEFAULT_BROKER, queueId=1]]
send message success: result=SendResult [sendStatus=0, msgIds=AC10D3111A7A0000000043300708000a, offsetMsgId=C0A8331000002A9F00000000000006AE, queueOffset=2, messageQueue=MessageQueue [topic=hello_mq, brokerName=DEFAULT_BROKER, queueId=2]]
INFO[0000] will remove client from clientMap clientID=172.16.211.17@6778

消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"context"
"fmt"
"os"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
sig := make(chan os.Signal)
// 服务端往客户端推送消息,相对于客户端pull的方式,性能更高
c, _ := rocketmq.NewPushConsumer(
// 消费组名称,通过消费组作为消费单位,消费组中的client可以并发消息
consumer.WithGroupName("testGroup"),
// name server 信息
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.51.16:9876"})),
)
// 订阅topic,将消息通过函数处理
err := c.Subscribe("hello_mq", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
}

// ConsumeSuccess代表,消费成功
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
// 主goroutine阻塞住
<-sig
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
}
}

发送延迟消息

延迟消息用法是 msg.WithDelayTimeLevel(3)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"context"
"fmt"
"os"
"strconv"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

// 发送延迟消息
func main() {
p, _ := rocketmq.NewProducer(
// name server IP 和 port
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.51.16:9876"})),
// 重试次数
producer.WithRetry(2),
)
// 启动Producer
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
// 指定topic
topic := "hello_mq"

for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
//WithDelayTimeLevel set message delay time to consume. reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h delay level starts from 1. for example, if we set param level=1, then the delay time is 1s.
// level 3 ,代表10s后才会被发送到消费者
msg.WithDelayTimeLevel(3)
// 同步发送消息
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
// 消息发送完成,关闭Producer
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}

事务消息

构造事务生产者

1
func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error)

listener实例实现两个方法

1
2
3
4
5
6
7
8
9
10
type TransactionListener interface {
// When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
// 当发送事务性准备(半)消息成功时,将调用此方法来执行本地事务。
ExecuteLocalTransaction(*Message) LocalTransactionState

// When no response to prepare(half) message. broker will send check message to check the transaction status, and this
// method will be invoked to get local transaction status.
// 当没有响应准备(半)消息时。代理将发送检查消息来检查事务状态,并调用此方法来获取本地事务状态。
CheckLocalTransaction(*MessageExt) LocalTransactionState
}

例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type OrderListener struct{}

func (o *OrderListener) ExecuteLocalTransaction(*primitive.Message) primitive.LocalTransactionState {
log.Println("commit exec success")

// 提交
//return primitive.CommitMessageState
// 回滚
//return primitive.RollbackMessageState
// 通过unknown,进行回查
return primitive.UnknowState
}

func (o *OrderListener) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {
//log.Println("callbacked and commit ...")
// 回调之后提交
//return primitive.CommitMessageState

log.Println("callbacked and rollback ...")
return primitive.RollbackMessageState
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 发送事务消息
func main() {
// 事务生产者
p, _ := producer.NewTransactionProducer(&OrderListener{},
// name server IP 和 port
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.51.16:9876"})),
// 重试次数
producer.WithRetry(2))

// 启动Producer
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
// 指定topic
topic := "hello_mq"

msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! "),
}
// 发送事务消息
res, err := p.SendMessageInTransaction(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}

// 阻塞住,让RocketMQ可以执行回查
select {}

// 消息发送完成,关闭Producer
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}

注意:源码中可以看到,默认情况下,连接RocketMQ的client其实都是一份,无论是生产者还是消费者。

简单生产者

1
2
3
4
5
6
7
8
9
func NewProducer(opts ...producer.Option) (Producer, error) {
return producer.NewDefaultProducer(opts...)
}

func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
...
producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
...
}

事务生产者

1
2
3
4
5
6
7
8
9
10
func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) {
producer, err := NewDefaultProducer(opts...)
...
}

func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
...
producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
...
}

消费者

1
2
3
4
5
6
7
8
9
10
func NewPushConsumer(opts ...consumer.Option) (PushConsumer, error) {
return consumer.NewPushConsumer(opts...)
}

func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
...
dc := &defaultConsumer{
client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
...
}

其实都是创建client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// clientMap是一个并发安全的map
var clientMap sync.Map

func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
client := &rmqClient{
option: option,
remoteClient: remote.NewRemotingClient(option.RemotingClientConfig),
done: make(chan struct{}),
}
...
// 通过clientID获取client对象
actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
...
}

func (c *rmqClient) ClientID() string {
id := c.option.ClientIP + "@"
if c.option.InstanceName == "DEFAULT" {
// 默认情况下,是以客户端ip地址加上pid组合成clientID
id += strconv.Itoa(os.Getpid())
} else {
id += c.option.InstanceName
}
if c.option.UnitName != "" {
id += "@" + c.option.UnitName
}
return id
}

可以看到,如果一个进程中同时有多重生产者以及消费者,client是会复用的。

p.Shutdown 方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (pc *pushConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
close(pc.done)

pc.client.UnregisterConsumer(pc.consumerGroup)
err = pc.defaultConsumer.shutdown()
})

return err
}

func (c *rmqClient) UnregisterConsumer(group string) {
// shutdown方法,是在syncMap中删除对象
c.consumerMap.Delete(group)
}

Shutdown方法在syncMap中删除对象,因此,某一个生产者或者消费者Shutdown,则会断开该进程中所有与RocketMQ的连接。

通过RocketMQ分布式事务的案例

img

例如在购物场景中,下单功能:

  1. 查询库存状态
  2. 如果没有库存,则业务返回结果;如果有库存,则执行本地事务
  3. 执行本地事务,创建订单信息,发布事务消息恢复扣减对应订单号的库存,调用库存接口,扣减库存
  4. 执行本地事务成功,则回滚恢复库存扣减的消息
  5. 执行本地事务失败,则提交恢复库存扣减的消息
  6. 库存服务监听回滚topic,更新本地第二张订单和库存的信息,将库存恢复回来,订单状态更新为回滚状态

下单后的支付功能:

  1. 下单功能的本地事务中,增加延迟信息发送,防止定时内未支付,发送到订单topic
  2. 当支付成功,通过支付系统回调接口,更新订单状态
  3. 订单服务监听订单topic,获取订单信息,查询订单信息表,获取订单装填,如果已支付,则ConsumeSuccess,如果未支付,将订单信息发送到回滚的topic中,使订单服务回滚库存数量
  4. 如果订单更新过程失败,则将消息设置为重试ConsumeRetryLater
  5. 支付系统回调接口,在最大努力通知请款下,一般能够保障订单状态更新成功

推荐阅读:

消息队列简介

消息队列详解:ActiveMQ、RocketMQ、RabbitMQ、Kafka

《浅入浅出》-RocketMQ

RocketMQ官方网站

RocketMQ GitHub

RocketMQ延迟消息实现原理和源码分析

事务消息

RocketMQ 事务消息