RabbitMQ学习笔记

介绍

消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶.

或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理,或者工作队列。所有这些都可以通过消息系统实现。

RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。

AMQP

AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。

enter image description here

消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

image-20221024105047528

Connection:AMQP是一个长连接协议。Connection被设计为长期使用的,可以携带多个Channel。

Channel:AMQP是一个多通道协议。Channel提供了一种方式,在比较重的TCP/IP连接上建立多个轻量级的连接。这会让协议对防火墙更加友好,因为端口使用是可预知的。它也意味着很容易支持流量调整和其他QoS特性。

Exchange:Exchange类能够让应用操作服务端的交换器。这个类能够让程序自己设置路由,而不是通过某些配置。不过大部分程序并不需要这个级别的复杂度,过去的中间件也不只支持这个语义。

Queue:该类用于让程序管理服务端上的消息队列。几乎所有的消费者应用都是基本步骤,至少要验证使用的消息队列是否存在。

VHost:虚拟主机是服务端的一个数据分区。在多租户使用是,可以方便进行管理。虚拟主机有自己的命名空间、交换器、消息队列等等。所有连接,只可能和一个虚拟主机建立。

Exchange:交换器是一个虚拟主机内的消息路由Agent。用于处理消息的路由信息(一般是Routing-Key),然后将其发送到消息队列或者内部服务中。交换器可能是持久化的、临时的、自动删除的。交换器把消息路由到消息队列时可以是并行的。这会创建一个消息的多个实例。

模式

队列模式:

image-20220712000102279

  • 一个生产者,一个队列,一个消费者
  • 生产者将消息发送到一个指定队列,消费者监听队列消费

工作分配模式:

image-20220712000153120

  • 一个生产者,一个队列,多个消费者
  • 生产者将消息发送到一个指定队列
  • 多个消费者同时监听一个队列,从队列中抢消息

发布、订阅模式

image-20220712000349394

  • 一个生产者,一个exchange,多个队列
  • 生产者将消息发送到exchange,由exchange发送给每一个队列,每个队列的消息是一样的
  • exchange不负责存储,如果没有绑定队列,则会将消息丢弃
  • exchange类型是fanout,发送到所有绑定到exchange的队列

路由模式

image-20220712000945238

  • 一个生产者,一个exchange,多个队列,多个消费者
  • exchange类型是direct,通过消息的routing-key将消息发送到不同的队列
  • 生产者将消息发送到exchange时需要指定routing-key,如果没有被exchange匹配上,则消息会被丢弃
  • 队列定义时,需要通过routing-keyexchange绑定,通过routing-key获取不同的消息

主题模式

image-20220712001451062

  • 一个生产者,一个exchange,多个队列,多个消费者
  • exchange类型是topic,通过消息的routing-key将消息发送到不同的队列
  • 不同的routing-key通过字符串匹配,发往不同的队列,其中*匹配一个单词,#匹配0个或更多的单词

消息队列的问题

  1. 为什么会造成重复消息

    由于网络传输等问题,在消息提交时没有收到ack,导致消息重复提交;或者在消费消息时,消费者ack时队列没有收到,这个消息被发送给多个消费者

  2. 如何解决重复消息

    不同的消息队列有不同的解决方案。

    rabbitmq中,rabbitmq不保证消息不重复,则需要业务端去重。

    • 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
    • 拿到这个消息做数据库的insert操作,给这个消息做一个唯一主键
  3. 如何保证消息的可靠性传输

    每个MQ都要从三个角度来分析:生产者丢弃消息、消息队列丢失消息、消费者丢失消息,以rabbitmq为例

    • 生产者丢失消息

      rabbitmq提供transactionconfirm模式来确保生产者不丢消息。

      transaction机制,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现异常,事务就会回滚(channel.txRollback()),如果发送成功,则提交事务(channel.txCommit())。缺点是吞吐下降。

      confirm机制,所有在该信道上面发布的消息都会被指派到一个唯一的ID,一旦消息被投递到所有匹配的队列之后,rabbitmq就会发送一个ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列。如果rabbitmq没能处理消息,则会发送一个Nack消息,生产者可以重试操作。

    • 消息队列丢失数据

      一般是开启磁盘持久化的配置。这个持久化配置可以和confirm机制配合使用,在持久化磁盘之后,再给生产者发送一个ack

      持久化的步骤:

      1. queue的持久化标识durable设置为true,则代表一个持久化队列
      2. 发送消息时,将deliveryMode=2rabbitmq挂了之后,重启后也能恢复数据
    • 消费者丢失数据

      这种情况一般是采用了自动确认模式,这种模式下,消费者会自动确认收到的消息。这时rabbitmq会立即将消息删除,这种情况下如果消费者出现异常没能处理消息,就会丢弃该消息。解决方案是采用手动确认消息。

  4. 如何确保消息的有序性

    一般通过某个算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka就是Partitionrabbitmq就是queue),然后只用一个消费者去消费该队列。

    如果是消费消息有序,则需要确保消费者只有一个,或者消费过程有序,例如先消费一个消息,ack之后,再消费下一个。

    核心点是入队有序。

安装

练习使用,通过helmk8s上安装集群。前提准备sc,添加bitnamirepo库,使用bitnami中的rabbitmq包。

1
2
3
4
// pull下来,手动修改,并且做好备份
helm pull bitnami/rabbitmq
tar zxvf rabbitmq-10.1.13.tgz
cd rabbitmq/

更改配置文件

1
2
3
4
5
6
7
8
vim values.yaml

global:
storageClass: "nfs-client"
// 副本数设置为3
replicaCount: 3
service:
type: NodePort

部署

1
2
kubectl create ns rabbitmq
helm install rabbitmq -f values.yaml ./ -n rabbitmq

账号密码

1
2
echo "Username      : user"
echo "Password : $(kubectl get secret --namespace rabbitmq rabbitmq -o jsonpath="{.data.rabbitmq-password}" | base64 -d)"

通过svc映射的15672端口打开管理页面

通过golang操作rabbitmq

官方文档:rabbitmq操作指南

获取客户端

安装

1
go get github.com/rabbitmq/amqp091-go

初始化连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type RabbitMQClient struct {
Conn *amqp.Connection
Channel *amqp.Channel
Queue amqp.Queue
RoutingKey string
}

func InitClient() (client RabbitMQClient, err error) {
client.Conn, err = amqp.Dial(url)
if err != nil {
return
}
client.Channel, err = client.Conn.Channel()
if err != nil {
return
}
return client, nil
}

功能函数

1
2
3
4
5
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}

队列模式、工作分配模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
conn, err := amqp.Dial(url)
if err != nil {
return
}
failOnError(err, "Failed to init a connection")
defer conn.Close()

// 消费者协程消费
// go consumer(conn, "consumer1")
// go consumer(conn, "consumer2")

pChan, err := conn.Channel()
failOnError(err, "Failed to init a publish channel")
defer pChan.Close()

queue, err := pChan.QueueDeclare(
name, // 队列名称,如果是空的,则会自动生成
false, // 是否持久化,false代表不持久化,当服务器重启,队列数据消失
false, // 是否自动删除,为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除这个队列。
false, // 排他,当设置为true时,其他的connection声明、绑定、使用、清除或删除同名队列时将收到错误,当连接关闭时,队列会被自动删除。(同一个connection可以起多个channel声明队列)
false, // 与queue交互是否等待回应,一般为false
nil, // 其他参数
)
failOnError(err, "Failed to declare a queue")

for i := 0; ; i++ {
body := fmt.Sprintf("Hello World: %d", i)
err = pChan.Publish(
"", // 不指定交换机
queue.Name, // 队列名称
false, // 强制,指定交换机时有用,如果设置为true,当没有绑定与routing key匹配的队列,则消息无法发送出去
false, // 立即接受,设置为true时,如果队列对端没有消费者,则消费不会发送过去
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
time.Sleep(time.Millisecond * 100)
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func consumer(conn *amqp.Connection, cname string) {
cChan, err := conn.Channel()
failOnError(err, "Failed to init a consumer channel")
defer cChan.Close()

queue, err := cChan.QueueDeclare(
name, // 队列名称,如果是空的,则会自动生成
false, // 是否持久化,false代表不持久化,当服务器重启,队列数据消失
false, // 是否自动删除,为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
false, // 排他,当设置为true时,其他的connection声明、绑定、使用、清除或删除同名队列时将收到错误,当连接关闭时,队列会被自动删除
false, // 与queue交互是否等待回应,一般为false
nil, // 其他参数
)
failOnError(err, "Failed to declare a queue")

msgs, err := cChan.Consume(
queue.Name, // queue
cname, // 消费者名称
true, // 是否自动ack,true代表获取消息之后就ack掉
false, // 排他,true代表队列消费者只能有一个,即使是同一个connection也只能有一个消费者
false, // 参数不支持
false, // 与queue交互是否等待回应,一般为false
nil, // args
)
failOnError(err, "Failed to register a consumer")

for d := range msgs {
log.Printf("%s Received a message: %s", d.ConsumerTag, d.Body)
}

return
}

发布、订阅模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
conn, err := amqp.Dial(url)
if err != nil {
return
}
failOnError(err, "Failed to init a connection")
defer conn.Close()

go consumerPub(conn, "consumer1")
go consumerPub(conn, "consumer2")

pChan, err := conn.Channel()
failOnError(err, "Failed to init a publish channel")
defer pChan.Close()

err = pChan.ExchangeDeclare(
exchange, // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否是内部交换机,内部交换机外部无法发送,只有内部交换机之间能够发送消息
false, // 与交换机交互,是否要等待响应
nil, // 其他参数
)
failOnError(err, "Failed to declare a exchange")

for i := 0; ; i++ {
body := fmt.Sprintf("Hello World: %d", i)
err = pChan.Publish(
exchange, // 指定交换机 // 发送到交换机
"", // 不指定队列名称,fanout下发送到所有队列
false, // 强制,指定交换机时有用,如果设置为true,当没有绑定与routing key匹配的队列,则消息无法发送出去
false, // 立即接受,设置为true时,如果队列对端没有消费者,则消费不会发送过去
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
time.Sleep(time.Millisecond * 100)
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
channel, err := conn.Channel()
failOnError(err, "Failed to init a consumer channel")
defer channel.Close()

q, err := channel.QueueDeclare(
name, // 队列名称,如果是空的,则会自动生成
false, // 是否持久化,false代表不持久化,当服务器重启,队列数据消失
false, // 是否自动删除,为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
false, // 排他,当设置为true时,其他的connection声明、绑定、使用、清除或删除同名队列时将收到错误,当连接关闭时,队列会被自动删除
false, // 与queue交互是否等待回应,一般为false
nil, // 其他参数
)

// 绑定队列
err = channel.QueueBind(
q.Name, // 队列名称
"", // 不指定routing key
exchange, // 交换机名称
false, // 绑定是否等待回应
nil, // 其他参数
)
failOnError(err, "Failed to bind a queue with exchange")

msgs, err := channel.Consume(
q.Name, // queue
name, // 消费者名称
true, // 是否自动ack,true代表获取消息之后就ack掉
false, // 排他,true代表队列消费者只能有一个,即使是同一个connection也只能有一个消费者
false, // 参数不支持
false, // 与queue交互是否等待回应,一般为false
nil, // args
)

failOnError(err, "Failed to register a consumer")

for d := range msgs {
log.Printf("%s Received a message: %s", d.ConsumerTag, d.Body)
}

return

路由模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
conn, err := amqp.Dial(url)
if err != nil {
return
}
failOnError(err, "Failed to init a connection")
defer conn.Close()

go consumerRouting(conn, "rabbit")
go consumerRouting(conn, "dog")

pChan, err := conn.Channel()
failOnError(err, "Failed to init a publish channel")
defer pChan.Close()

err = pChan.ExchangeDeclare(
exchange, // 交换机名称
"direct", // 交换机类型,direct路由模式
true, // 是否持久化
false, // 是否自动删除
false, // 是否是内部交换机,内部交换机外部无法发送,只有内部交换机之间能够发送消息
false, // 与交换机交互,是否要等待响应
nil, // 其他参数
)
failOnError(err, "Failed to declare a exchange")

for i := 0; ; i++ {
for k, v := range message {
body := fmt.Sprintf("%s: %d", v, i)

err = pChan.Publish(
exchange, // 指定交换机 // 发送到交换机
k, // 不同的routing key,发送到不同的队列
false, // 强制,指定交换机时有用,如果设置为true,当没有绑定与routing key匹配的队列,则消息无法发送出去
false, // 立即接受,设置为true时,如果队列对端没有消费者,则消费不会发送过去
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
time.Sleep(time.Millisecond * 100)
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
channel, err := conn.Channel()
failOnError(err, "Failed to init a consumer channel")
defer channel.Close()

q, err := channel.QueueDeclare(
name,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare queue")

err = channel.QueueBind(
name, // 队列名称
name, // routing key ,这里使用和队列名称一样的key
exchange, // 交换机名称
false,
nil,
)
failOnError(err, "Failed to bind queue with exchange")

msgs, err := channel.Consume(
q.Name, // 队列名称
name, // 消费者名称,这里使用和队列名称一样的消费者名称
true,
false,
false,
false,
nil)
failOnError(err, "Failed to consume message")
for m := range msgs {
fmt.Printf("%s Received a message: %s\n", d.ConsumerTag, m.Body)
}

主题模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
var message = map[string]string{
"quick.orange.rabbit": "I am a quick.orange.rabbit",
"lazy.orange.elephant": "I am a lazy.orange.elephant",
"quick.orange.fox": "I am a quick.orange.fox",
"lazy.brown.fox": "I am a lazy.brown.fox",
"lazy.pink.rabbit": "I am a lazy.pink.rabbit",
"quick.brown.fox": "I am a quick.brown.fox",
"quick.orange.male.rabbit": "I am a quick.orange.male.rabbit",
"lazy.orange.male.rabbit": "I am a lazy.orange.male.rabbit",
}

conn, err := amqp.Dial(url)
if err != nil {
return
}
failOnError(err, "Failed to init a connection")
defer conn.Close()

go consumerTopic(conn, "queue1", "*.orange.*")
go consumerTopic(conn, "queue2", "*.*.rabbit")
go consumerTopic(conn, "queue2", "lazy.#")

pChan, err := conn.Channel()
failOnError(err, "Failed to init a publish channel")
defer pChan.Close()

err = pChan.ExchangeDeclare(
exchange, // 交换机名称
"topic", // 交换机类型,topic主题模式
true, // 是否持久化
false, // 是否自动删除
false, // 是否是内部交换机,内部交换机外部无法发送,只有内部交换机之间能够发送消息
false, // 与交换机交互,是否要等待响应
nil, // 其他参数
)
failOnError(err, "Failed to declare a exchange")

for i := 0; ; i++ {
for k, v := range message {
body := fmt.Sprintf("%s: %d", v, i)

err = pChan.Publish(
exchange, // 指定交换机 // 发送到交换机
k, // 不同的routing key,发送到不同的队列
false, // 强制,指定交换机时有用,如果设置为true,当没有绑定与routing key匹配的队列,则消息无法发送出去
false, // 立即接受,设置为true时,如果队列对端没有消费者,则消费不会发送过去
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
time.Sleep(time.Millisecond * 100)
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
channel, err := conn.Channel()
failOnError(err, "Failed to init a consumer channel")
defer channel.Close()

q, err := channel.QueueDeclare(
name,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")

err = channel.QueueBind(
q.Name,
key,
exchange,
false,
nil,
)
failOnError(err, "Failed to bind a queue with exchange")

msgs, err := channel.Consume(
q.Name,
name,
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to consume a message")

for m := range msgs {
fmt.Printf("%s Received a message: %s\n", m.ConsumerTag, m.Body)
}

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
queue1 Received a message: I am a quick.orange.rabbit: 0
queue2 Received a message: I am a quick.orange.rabbit: 0
queue2 Received a message: I am a lazy.orange.elephant: 0
queue1 Received a message: I am a lazy.orange.elephant: 0
queue1 Received a message: I am a quick.orange.fox: 0
queue2 Received a message: I am a lazy.brown.fox: 0
queue2 Received a message: I am a lazy.pink.rabbit: 0
queue1 Received a message: I am a quick.orange.fox: 1
queue2 Received a message: I am a lazy.brown.fox: 1
queue2 Received a message: I am a lazy.pink.rabbit: 1
queue2 Received a message: I am a lazy.orange.male.rabbit: 1
queue2 Received a message: I am a quick.orange.rabbit: 1
queue1 Received a message: I am a quick.orange.rabbit: 1
queue2 Received a message: I am a lazy.orange.elephant: 1
queue1 Received a message: I am a lazy.orange.elephant: 1
  1. 当一个消息匹配多个key时,只会发送一个到队列
  2. 当消息没有匹配任何key时,会丢弃消息

方法说明

声明队列

1
func (ch *Channel) QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args Table) (Queue, error)

name:队列名称,如果为空,会生成一个唯一随机的名称

durable:是否持久化,当服务器重启时,持久化为true,代表会队列会被恢复,为false,则会被删掉

autoDelete:队列是否自动删除,当最后一个监听被移除后,true时,会自动被删除

exclusive:排他,true时,独占队列只能由声明它们的连接访问,并且在连接关闭时将被删除。尝试声明、绑定、使用、清除或删除同名队列时,其他连接上的通道将收到错误。

noWait:是否等待,队列将假定在服务器上声明。如果满足现有队列的条件或尝试从不同连接修改现有队列,则会出现通道异常。

args:可选amqp。对于需要额外参数的交换类型,可以发送特定于服务器交换实现的参数表。

交换机声明

1
func (ch *Channel) ExchangeDeclare(name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args Table) error

name:交换机名称,如果交换不存在,服务器将创建它。如果交换存在,服务器将验证它是否具有提供的类型、持久性和自动删除标志。

kind:类型,支持类型:direct路由模式,fanout发布订阅模式,topic主题模式,headers(headers exchange主要通过发送的request message中的header进行匹配,其中匹配规则(x-match)又分为all和any,all代表必须所有的键值对匹配,any代表只要有一个键值对匹配即可。headers exchange的默认匹配规则(x-match)是any。),

durable:是否持久化,当服务器重启时,持久化为true,代表会交换机会被恢复,为false,则会被删掉

autoDelete:交换机是否自动删除,当最后一个监听被移除后,true时,会自动被删除

internal:是否是内部交换机,如果是true,则无法通过publisher发送消息

noWait:当noWaittrue时,在不等待服务器确认的情况下声明。通道可能因错误而关闭。添加NotifyClose侦听器以响应任何异常。

args:可选amqp。对于需要额外参数的交换类型,可以发送特定于服务器交换实现的参数表。

消息发送

1
func (ch *Channel) Publish(exchange string, key string, mandatory bool, immediate bool, msg Publishing) error

exchange:指定发送的交换机,空代表使用队列的默认交换机

key:队列名称,将单个消息传递到单个队列时,可以使用队列名称的routingKey发布到默认交换。这是因为每个声明的队列都获得到默认交换的隐式路由。

mandatory:强制性

immediate:立即发布

由于发布是异步的,任何无法传递的消息都将由服务器返回。当mandatory或immediate为true时,使用带有Channel.NotifyReturn的监听者,用于在调用发布时处理任何无法传递的消息。
当mandatory为true且未绑定与key匹配的队列时,或者当immediate为true,且匹配队列中没有消费者准备接受传递时,发布可能无法传递。
这可能会在通道、连接或套接字关闭时返回错误。错误或缺少错误并不表示服务器是否已收到此发布。
如果底层套接字关闭而未从内核缓冲区中清除挂起的发布包,则发布可能无法到达代理。使所有发布内容都可能到达服务器的简单方法是始终在终止发布应用程序之前调用Connection.Close。确保所有发布到达服务器的方法是向Channel.NotifyPublish添加一个监听者,并使用Channel.Confirm将channel设置为确认模式。发布交付标签及其相应的确认从1开始。确认所有发布后退出。
当发布未返回错误且通道处于确认模式时,具有第一次确认的DeliveryTags的内部计数器从1开始。

msg:消息内容

mssage结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Publishing struct {
// Application or exchange specific fields,
// the headers exchange will inspect this field.
Headers Table // 应用程序或交换特定字段,标头交换将检查此字段。

// Properties
ContentType string // MIME content type 一般为txt
ContentEncoding string // MIME content encoding 编码
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) 分发模式,0或者1代表非持久化,2代表持久化
Priority uint8 // 0 to 9 优先级
CorrelationId string // correlation identifier 消息唯一ID,一般用于ACK,路由等
ReplyTo string // address to to reply to (ex: RPC) 重会队列,消息失败了返回到哪个队列
Expiration string // message expiration spec 消息到期时间,如果设置过期时间,消息设置持久化,当没有消费者,消息会持久化到队列中,过期之后就会被删掉
MessageId string // message identifier 消息ID
Timestamp time.Time // message timestamp
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id

// The application specific payload of the message
Body []byte
}

消费消息

1
func (ch *Channel) Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args Table) (<-chan Delivery, error)

queue:消费队列名称

consumer:消费者由一个字符串标识,该字符串对于该频道上的所有消费者都是唯一的,并且具有范围

autoAck:为true时,服务器将在将消息写入网络之前向该消费者确认消息。动确认交付意味着,如果消费者在消息交付后无法处理交付,则某些消息可能会丢失。

exclusive:排他,服务器将确保这是此队列中的唯一使用者。当exclusive为false时,服务器将在多个消费者之间公平分配交付。

noLocal:RabbitMQ不支持noLocal标志。建议为Channel.Publish和Channel.Consume使用单独的连接。使用此参数,以避免发布时的TCP推送影响使用消息的能力,因此此参数主要用于完整性。

noWait:当noWaittrue时,不会等待消费者确认请求并立即开始投递消息。

args:可选amqp。对于需要额外参数的交换类型,可以发送特定于服务器交换实现的参数表。

投递中的信息,受Channel.Qos个数限制,将被缓冲,直到从返回的chan接收到。
当通道或连接关闭时,所有缓冲和投递中的消息将被丢弃。
当消费者标签被取消时,所有机上信息都将被发送,直到返回的通道关闭。

Return消息机制

当消息发送到交换机,通过key匹配,匹配上之后,消息会发送到对应队列中,如果没有匹配上,则会被丢弃,通过Return消息机制,可以获取到被丢弃的消息。

需要注意,消息发送时,mandatory必须设置为true,不然消息会被直接丢弃。

1
2
3
4
5
go func() {
for return_msg := range pChan.NotifyReturn(make(chan amqp.Return)) {
fmt.Printf("%+v", return_msg)
}
}()

qos限流

在非自动ack前提下,如果一定数目的消息未被确认前,不进行消费新的消息。

1
func (ch *Channel) Qos(prefetchCount int, prefetchSize int, global bool) error

prefetchSize:限制的字节大小,默认为0

prefetchCount:会告诉rabbitmq不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有一条消息ack后,再推送下一条消息。

global:如果未真,该qos的设置将用于一个connection中所有channelconsumer中,如果为false,只应用于当前channel的已存在的以及未来定义的consumer中。

自动ack下,qos不生效。

Qos控制服务器在收到送达确认之前,将尝试在网络上为消费者保留多少消息或多少字节。Qos的目的是确保服务器和客户端之间的网络缓冲区保持满。
如果预取计数大于零,则服务器将在收到确认之前向消费者传递这么多消息。当消费者使用noAck启动时,服务器会忽略此选项,因为不会预期或发送任何确认。
如果预取大小大于零,服务器将尝试在收到消费者的确认之前,将至少有那么多字节的传递刷新到网络。当消费者开始使用noAck时,此选项将被忽略。
当global为true时,这些Qos设置适用于同一连接上所有频道上的所有现有和未来消费者。如果为false,则为Channel。Qos设置将应用于此频道上的所有现有和未来消费者。
请参阅RabbitMQ Consumer Prefetch文档,了解如何在Rabbit MQ中实现全局标志,因为它与AMQP 0.9.1规范的不同之处在于,全局Qos设置的范围仅限于通道,而非连接(https://www.rabbitmq.com/consumer-prefetch.html ).
要在不同连接上从同一队列消费的消费者之间获得循环行为,请将预取计数设置为1,服务器上的下一条可用消息将传递给下一个可用消费者。
如果您的消费者工作时间相当一致,并且不超过网络往返时间的两倍,您将看到显著的吞吐量提高,从RabbitMQ上的基准测试所描述的预取计数2或稍大开始。

ack消息确认机制

在消费端确保消息已经收到,通常会使用手动ack的机制。

注册Consume时,将autoACK设置为false,使用手动ack。消费消息时,通过执行Ack实现手动ack。

ack:

1
func (d Delivery) Ack(multiple bool) error

multiple:是否批量ack,当multipletrue时,将确认当前消息和所有同channel上先前未确认的消息。这对于批量处理消息很有用。

Nack:

1
func (d Delivery) Nack(multiple bool, requeue bool) error

multiple:是否批量nack,当multipletrue时,将不确认当前消息和所有同channel上先前未确认的消息。这对于批量处理消息很有用。

requeue:是否返回给队列,当requeuetrue时,请求消息队列将此消息传递给其他消费者(如果只有一个消费者,那么会放入队尾)。如果不可能或requeue为false,则消息将被丢弃或传递到服务器配置的死信队列。

此方法不能用于将客户端不希望处理的消息选择或重新入队,而是通知服务器客户端此时无法处理此消息。
每次未自动确认的消息都必须调用Delivery.Ack,Delivery.Reject,Delivery.Nack。

通过Acknowledger接口拒绝委派的否定确认。
如果requeue为true,则将此消息排队,以便在其他通道上传递给使用者。如果重新排队为false或服务器无法对此消息进行排队,则会将其删除。
如果您正在批量处理交付,并且您的服务器支持它,请选择Delivery.Nack。
每个未自动确认的交付都必须调用Delivery.Ack、Delivery.Reject或Delivery.Nack。

Reject和Nack
消息确认可以让RabbitMQ知道消费者已经接受并处理完消息。但是如果消息本身或者消息的处理过程出现问题怎么办?需要一种机制,通知RabbitMQ,这个消息,我无法处理,请让别的消费者处理。这里就有两种机制,Reject和Nack。

Reject

Reject在拒绝消息时,可以使用requeue标识,告诉RabbitMQ是否需要重新发送给别的消费者。不重新发送,一般这个消息就会被RabbitMQ丢弃。Reject一次只能拒绝一条消息。

Nack

Nack则可以一次性拒绝多个消息。这是RabbitMQ对AMQP规范的一个扩展。

通过RejectRequeuConsumer可以看到当requeue参数设置为true时,消息发生了重新投递。

TTL队列、消息

TTL是Time To Live的缩写,也就是生存时间。

消息的过期时间,可以在发送消息时指定;

队列的过期时间,从消息入队开始计算,超过队列的超时时间,那么消息会自动清除。

Publishing.Expiration指定。

死信队列 DLX Dead-letter-Exchange

消息变成一个死信,说明这个消息没有消费者消费了。

在rabbitmq中,可以利用DLX,当消息在一个队列中变成死信之后,能被重新publish到另一个交换机,这个交换机就是DLX。

消息变成死信友这几种情况:

  • 消息被拒绝,Delivery.Reject,Delivery.Nack,并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

死信队列特点:

  • DLX也是一个正常的交换机,和一般的交换机没有区别,可以在任何队列上被指定,实际上就是设置某个队列的属性
  • 当这个队列中有死信时,rabbitmq会自动将这个消息重新发布到设置的DLX上,进而被路由到另一个队列。
  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补rabbitmq 3.0以前支持的immediate参数的功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func consumerRouting(conn *amqp.Connection, name string) {
channel, err := conn.Channel()
failOnError(err, "Failed to init a consumer channel")
defer channel.Close()

// 定义参数
args := amqp.Table{
// 参数中设置死信交换机名称
"x-dead-letter-exchange": "dlx_exchange",
}
// 声明消费队列
q, err := channel.QueueDeclare(
name,
false,
false,
false,
false,
args,
)
failOnError(err, "Failed to declare queue")

err = channel.QueueBind(
name, // 队列名称
name, // routing key ,这里使用和队列名称一样的key
exchange, // 交换机名称
false,
nil,
)
failOnError(err, "Failed to bind queue with exchange")

// 死信交换机和死信队列
// 声明死信交换机
err = channel.ExchangeDeclare(
"dlx_exchange",
"topic",
false,
false,
false,
false,
nil)

// 声明绑定死信交换机的队列
dlxQueue, err := channel.QueueDeclare(
"dlx_queue",
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare queue")

// 将死信交换机和死信队列绑定
err = channel.QueueBind(
dlxQueue.Name, // 队列名称
name, // routing key ,这里使用和队列名称一样的key
exchange, // 交换机名称
false,
nil,
)
failOnError(err, "Failed to bind queue with exchange")

msgs, err := channel.Consume(
q.Name, // 队列名称
name, // 消费者名称,这里使用和队列名称一样的消费者名称
true,
false,
false,
false,
nil)
failOnError(err, "Failed to consume message")
for m := range msgs {
fmt.Printf("%s Received a message: %s\n", m.ConsumerTag, m.Body)
}
}

高可用

Rabbitmq高可用方案是以主备的集群形式,集群有两种模式,默认模式和镜像模式。

普通集群模式

img

元数据包含以下内容:

  • 队列元数据:队列的名称及属性
  • 交换器:交换器的名称及属性
  • 绑定关系元数据:交换器与队列或者交换器与交换器
  • vhost元数据:为vhost内的队列,交换器和绑定提供命名空间及安全属性之间的绑定关系

在普通集群模式下,集群中各个节点之间只会相互同步元数据,也就是说,消息数据不会被同步。那么问题就来了,假如我们连接到 A 节点,但是消息又存储在 B 节点又怎么办呢?

不论是生产者还是消费者,假如连接到的节点上没有存储队列数据,那么内部会将其转发到存储队列数据的节点上进行存储。虽然说内部可以实现转发,但是因为消息仅仅只是存储在一个节点,那么假如这节点挂了,消息是不是就没有了?这个问题确实存在,所以这种普通集群模式并没有达到高可用的目的。

镜像队列模式

img

镜像队列模式下,节点之间不仅仅会同步元数据,消息内容也会在镜像节点间同步,可用性更高。这种方案提升了可用性的同时,因为同步数据之间也会带来网络开销从而在一定程度上会影响到性能。

slave会准确地按照maste执行命令地顺序进行动作,故slave和master上维护的状态应该是相同的。如果master由于某种原因失效,那么“资历最老”(基于slave加入cluster的时间排序)的slave会被提升为新的master。发送到镜像队列的所有消息会被同时发往 master和所有的slave上,如果此时master挂掉了,消息还会在slave上,这样slave提升为 master的时候消息也不会丢失

img

集群中的每个 Broker 节点都包含 1 个队列的 master 和 2 个队列的 slave, Q1 的负载大多都在 broker1 上,Q2 的负载大多都集中在 broker2 上,Q3 的负载大多都集中在 broker3 上,只要确保队列的 master 节点均匀散落在集群中的各个 Broker 节点即可确保很大程度的负载均衡。

master提供读写服务,在slave上的操作都会路由到master上,slave只做备份-主备切换。也就是说slave队列先将消费者的请求转发给master队列,然后再由master队列准备好数据返回给slave队列,最后由slave队列将消息返回给消费者。

img

镜像队列基本上就是一个特殊的BackingQueue,它内部包裹了一个普通的BackingQueue做本地消息持久化处理,在此基础上增加了将消息和ack复制到所有镜像的功能。所有对mirror_queue_master的操作,会通过组播GM的方式同步到各slave节点。GM负责消息的广播,mirror_queue_slave负责回调处理,而master上的回调处理是由coordinator负责完成。mirror_queue_slave中包含了普通的BackingQueue进行消息的存储,master节点中BackingQueue包含在mirror_queue_master中由AMQQueue进行调用。

缺陷

镜像队列主要的问题是消息同步的性能。由于使用了一种低效的消息复制方法,镜像队列的性能会比较低下。

镜像队列会选择一个主队列和多个从队列,主队列会将自己接收的读、写请求同步给所有从队列。当所有的从队列保存消息之后,主队列才会向生产者发送确认。如果主队列挂掉,其中一个从队列会晋升成主队列,让整个镜像队列仍然保持可用,避免消息丢失。

当一个节点下线,然后恢复上线之后,它保存的所有从队列的镜像数据都会丢失。

  • 缺陷1:此时从队列重新上线,但是它是空的,运维人员必须做出选择,是否要将数据同步到这个队列。如果选择同步,那么就意味着要将当前所有的消息从主队列同步到从队列。

  • 缺陷2:同步是阻塞的,它会让整个队列不可用。通常情况下,如果生产和消费的速度能够基本匹配,那么队列应该是没有消息堆积或者堆积非常少的,这样同步只会阻塞很短的时间。但是有时有些队列有时会存在大量堆积,可能是由于故意设计成这样,也可能是因为消费端或者下游服务非常慢或者挂掉,但是上游生产者仍然不停生产消息。

仲裁队列

RabbitMQ 3.8 版本中最重要的改动那非仲裁队列(Quorum Queues)莫属。它提供队列复制的能力,保障数据的高可用和安全性。使用仲裁队列可以在 RabbitMQ 节点间进行队列数据的复制,从而达到在一个节点宕机时,队列仍然可以提供服务的效果。

其实 RabbitMQ 已经有一个高可用队列的实现,那就是镜像队列(Mirror Queues)。在 RabbitMQ 3.8 版本问世之前,镜像队列是实现数据高可用的唯一手段,但是它有一些设计上的缺陷,这也是 RabbitMQ 提供仲裁队列的原因。

Raft 共识协议逐渐成为了工业上大量使用的分布式共识协议,仲裁队列就是基于 Raft 共识算法的一个变种。它比镜像队列更安全、性能更好。

每个仲裁队列都有多个副本,它包含一个主和多个从副本。replication factor 为 5 的仲裁队列将会有 1 个主副本和 4 个从副本。每个副本都在不通的 RabbitMQ 节点上。

客户端(生产者和消费者)只会与主副本进行交互,主副本再将这些命令复制到从副本。与镜像队列类似,从副本不与客户端进行交互,它们仅仅作为一个冗余备份,在节点挂掉或重启时提供高可用的能力。当主副本所在的节点下线,其中一个在另外节点的从副本会被选举成为主副本,继续提供服务。

图 3 - Raft 共识协议

消息复制和主副本选举的操作,需要超过半数的副本同意,所以我管它叫做仲裁队列。当生产者发送一条消息,需要超过半数的队列副本都将消息写入磁盘以后才会向生产者进行确认。这意味着少部分比较慢的副本不会影响整个队列的性能。同样地,主副本的选举也需要超过半数的副本同意才行,这会避免出现网络分区时 2 个主副本,所以说仲裁队列相对于可用性更看重一致性。

优势

  • 客户端创建队里时,增加属性即可,改动少
  • 节点上线,从从副本复制消息,复制过程非阻塞
  • 数据写入更加可靠,超过半数不会造成脑裂
  • Raft 协议比镜像队列的算法更有效率,可以提供更好的消息吞吐量。

总结起来,仲裁队列可以提供更高的性能、更好的数据安全性、更容易进行节点的滚动升级。

劣势

特性更少

这些特性在仲裁队列的第一个版本中不会提供

  • 非持久化消息
  • 排它队列
  • 队列/消息 TTL(超时时间)
  • 一些规则(Policy)不可用,只有死信队列、队列长度限制可用
  • 优先级
  • 惰性队列
  • 非全局的消息预取(Qos)
磁盘使用——写入放大

仲裁队列的磁盘和内存配置与普通队列不同。

  • 普通队列:普通队列使用“共享”存储模型,对于一条要投递到多个队列的消息,只会存储一次,其他队列只会保存这条消息的引用。也就是说,在发布-订阅模型下,一条将要投递到多个队列的消息,它的存储大小不会随着投递到的队列变多而线性增长。
  • 仲裁队列:仲裁队列使用在内存中“共享”的存储模型,在磁盘中,每条消息都会分别被存储。所以发布-订阅模型会造成更严重的写入放大,可能导致更大的磁盘使用,甚至不得不放弃使用仲裁队列。
内存使用——所有消息一直保存在内存中

仲裁队列的所有消息一直会保存在内存中,这会增加内存的使用量,最终可能导致集群不可用。如果不进行一些检查和监控,队列消息不断堆积,可能会导致生产停止(内存高水位),直到消息被消费或者从内存中删除。所以当使用仲裁队列时,设置队列的长度限制非常重要。此外还有必要用惰性队列作为仲裁队列的死信队列,通过死信交换器将这些消息转发到死信队列中。

因此,队列的规划和监控边得比普通场景下更为重要。下游(消费者和下游服务)的中断或者变慢可能导致多个队列消息堆积,需要有对应的规划和措施。你需要多少个仲裁队列、它们的写入速率时多少,当集群达到内存高水位时其他队列会不会收到影响?

失去多数节点时意味着队列不可用

如果仲裁队列超过半数的副本永久丢失,那么队列数据就永久丢失了。即便有小部分的副本仍然可用,队列仍然没有办法恢复,只能被强制删除。虽然这种场景出现的可能性较小,但是仍有这样的危险存在。所以,推荐使用可靠的磁盘,并且把复制因子设置为 5 ~ 3。

延迟

尽管仲裁队列的吞吐量更高,但是延迟也可能更高,这是由于使用了 Raft 协议。在仲裁队列中,所有消息都是持久化的,所有消息都会保存到每个副本的磁盘中。安全性是仲裁队列的主要目标。

通过kubernetes搭建高可用RabbitMQ集群

1
2
3
4
5
6
7
8
9
10
# 增加repo库
helm repo add bitnami https://charts.bitnami.com/bitnami
# 检索 rabbitmq 的helm包
helm search repo rabbitmq
# 拉包到本地,拉取指定版本,坑会少一些
helm pull bitnami/rabbitmq --version 9.0.8
# 解压
tar zxvf rabbitmq-9.0.8.tgz
# 进入目录修改配置文件
cd rabbitmq/

修改配置文件 vim values.yaml,需要修改或者注意的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 改成debug模式,可以查看到更多日志
image:
debug: true
# 配置登录账号密码
auth:
username: admin
password: "admin"
# 调整集群配置
clustering:
# rabbitmq集群意外宕机强制启动 当rabbitmq启用持久化存储时,若rabbitmq所有pod同时宕机,将无法重新启动,因此有必要提前开启clustering.forceBoot
forceBoot: true
# 调整时区
extraEnvVars:
- name: TZ
value: "Asia/Shanghai"
# 调整副本数
replicaCount: 3
# 设置持久化(这里需要注意,要提前准备好sc)
persistence:
storageClass: "local-path"
# 修改svc,为了演示过程方便,这里改成 NodePort,生产环境不建议暴露出来,维持ClusterIP即可
service:
type: NodePort
# 集群模式会自动将pod分配到不同的容器中,默认开启了软亲和性
podAntiAffinityPreset: soft
# 如果k8s的cluster名称不是默认,则需要修改
clusterDomain: cluster.local

创建ns,并且安装rabbitmq

1
2
kubectl create ns rabbitmq
helm install rabbitmq -f values.yaml ./
1
2
3
4
5
6
7
8
9
10
11
12
# kubectl get all
NAME READY STATUS RESTARTS AGE
pod/rabbitmq-0 1/1 Running 0 17m
pod/rabbitmq-1 1/1 Running 0 17m
pod/rabbitmq-2 1/1 Running 0 6m19s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/rabbitmq NodePort 10.0.204.87 <none> 5672:31568/TCP,4369:30672/TCP,25672:32739/TCP,15672:31982/TCP 17m
service/rabbitmq-headless ClusterIP None <none> 4369/TCP,5672/TCP,25672/TCP,15672/TCP 17m

NAME READY AGE
statefulset.apps/rabbitmq 3/3 17m

可能出现镜像没有拉下来,导致容器状态卡在 ErrImagePull,则更换镜像地址,从dockerhub上拉取

1
2
3
4
# 默认
docker pull docker.io/bitnami/rabbitmq:3.9.17-debian-10-r0
# 改成
docker pull bitnami/rabbitmq:3.9.17-debian-10-r0

拉取之后,修改tag即可。

1
docker tag 7bc4ce9af795 docker.io/bitnami/rabbitmq:3.9.17-debian-10-r0

排错:

当使用配置中的账号密码登录,弹出页面 你与此网站的连接不是私密连接,此时需要进入容器,重置权限

1
2
3
4
5
kubectl exec -it rabbitmq-0 bash
# 获取用户admin的权限
rabbitmqctl list_user_permissions admin
# 设置用户admin的权限,这里设置前后没有区别,但是设置之后就可以正常登录
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

将三个pod的端口映射出来,可以更加细致查看页面

1
2
3
kubectl port-forward --address=0.0.0.0 pod/rabbitmq-0 15672:15672 &
kubectl port-forward --address=0.0.0.0 pod/rabbitmq-1 15673:15672 &
kubectl port-forward --address=0.0.0.0 pod/rabbitmq-2 15674:15672

普通集群模式

安装完成,默认就是普通集群模式

image-20221101165313406

在node2的页面在每个节点上都创建一个queue,在其他节点页面上可以看到queue信息已经被同步过来

image-20221101174856959

queue信息可以看到是普通模式

image-20221101180228811

在node0中向所有的queue发送message,选择持久化的投递方式

image-20221101175016438

在 node1上可以看到 queue 信息

image-20221101175236843

在node1和node2上,分别获取message,都可以获取到。验证普通集群模式下,连接任何一个节点都可以使用。

将node3关机

1
kubectl scale --replicas=2 statefulset.apps/rabbitmq

node状态停止

image-20221101175508511

队列状态down

image-20221101175559718

无法通过queue2获取到message

image-20221101175624932

总结:

普通模式集群下,节点之间元数据同步,并且node之间可读写消息,但是message数据不同步,当节点宕机,数据也就无法访问。

镜像队列模式

恢复节点个数为3个

1
kubectl scale --replicas=3 statefulset.apps/rabbitmq

修改策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 进入rabbitmq
kubectl exec -it rabbitmq-0 bash

# 罗列当前policy,普通集群模式下,policy为空
rabbitmqctl list_policies

Listing policies for vhost "/" ...

# 设置镜像模式,同步方式改为自动
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all" , "ha-sync-mode":"automatic"}'

Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all" , "ha-sync-mode":"automatic"}" with priority "0" for vhost "/" ...

# 再次查看policy
rabbitmqctl list_policies

Listing policies for vhost "/" ...
vhost name pattern apply-to definition priority
/ ha-all ^ all {"ha-mode":"all","ha-sync-mode":"automatic"} 0
1
2
3
4
5
6
7
8
9
10
11
12
13
# 策略说明
rabbitmqctl set_policy [-p ] [--priority ] [--apply-to ]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级

再查看页面,可以看到队列状态变成HA

image-20221101180601759

image-20221101180629694

测试,node2上的master有queue2

image-20221101180945564

将node2关闭

1
kubectl scale --replicas=2 statefulset.apps/rabbitmq

image-20221101181444396

关闭之后,queue2的master切换到node0上

image-20221101181123405

在node1上获取queue2上的message

image-20221101181229790

在queue0、queue1、queue2上分别发送信息

image-20221101181425946

再将node2启动起来

1
kubectl scale --replicas=3 statefulset.apps/rabbitmq

image-20221101181531898

获取队列信息

image-20221101181610421

image-20221101181630242

image-20221101181640713

可以看到,新的node加进来之后,不会将原先队列的master切换到新的node上

在node3上获取信息,都可以正常获取到,验证高可用完成。

仲裁队列

创建一个仲裁队列

image-20221101181915439

仲裁信息

image-20221101181936703

发送数据

image-20221101182146762

默认将node0作为Leader,通过关闭node0的docker实现模拟node0宕机

1
2
3
4
kubectl describe pod rabbitmq-0 | grep "Container ID:"
Container ID: docker://c7b2194001c93947d5ad6dff9064347928f09093232e4714a9ad8e328a07a3c2

docker stop c7b2194001c93

image-20221101182301229

master切换

image-20221101182331051

并且可以获取到message。

推荐阅读:

RabbitMQ中文文档

RabbitMQ Tutorials

高可用RabbitMQ集群的搭建及原理分析

RabbitMQ集群和高可用方案

AMQP协议学习

AMQP-0-9-1中文规范

amqp 1.0

K8S部署RabbitMQ集群+镜像模式实现高可用

RabbitMQ集群(镜像队列)原理详解

RabbitMQ脑裂的模拟和分析

RabbitMQ 3.8 特性聚焦:仲裁队列(Quorum Queues)