ReZero's Utopia.

(转) RabbitMq 笔记

Word count: 4.1kReading time: 14 min
2020/07/03 Share

RabbitMq 基础整理

参考链接

芋道源码

csdn

概念介绍

Message acknowledgment

不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开
引发问题:忘记回执,消费者重启后会重复消费这些消息并重复执行业务逻辑

Queue

多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

Exchange

routing key设定的长度限制为255 bytes

在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中

exchangeTypes: fanout、direct、topic、headers

  • fanout: 发送到该Exchange的消息路由到 所有 与它绑定的Queue中

  • direct: 把消息路由到那些binding key与routing key完全匹配的Queue中

  • topic: 同direct,但增加了规则筛选:

    • routing key为一个句点号 . 分隔的字符串(我们将被句点号 .分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”

    • binding key与routing key一样也是句点号“. ”分隔的字符串

    • binding key中可以存在两种特殊字符*#,用于做模糊匹配,其中 * 用于匹配一个单词,#用于匹配多个单词(可以是零个)

    • 当存在满足的多个bindingkey 时,如果处于同一个queue,则只会投递一次。

  • headers 在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

RPC

RabbitMQ中实现RPC的机制是:

- 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14种properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)

- 服务器端收到消息并处理

- 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性

-客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

mandatory标志告诉服务器至少将该消息route到一个 队列 中,否则将消息返还给生产者: 返回动作是注册 ReturnListener 到 channel 实现。

@Deprecate immediate标记会影响镜像队列性能,增加代码复杂性,并建议采用“TTL”和“DLX”等方式替代。

TTL: 针对queue 是 x-message-ttl 参数,单独就消息而言是 expiration,用于设置消息过期时间,不设置就不会过期,两个一起用,则消息的过期时间以两者之间TTL较小的那个数值为准

利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:

- 消息被拒绝(basic.reject/ basic.nack)并且requeue=false(标志是否重新入队)
- 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间))
- 队列达到最大长度

x-max-priority 队列优先级,高优先级先消费

延迟队列:
场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延迟队列将订单信息发送到延迟队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到只能设备。

延迟队列的实现:

exchange_delay_begin:这个是producer端发送时调用的exchange, 将消息发送至queue_dealy_begin中。
queue_delay_begin: 通过routingKey=”delay”绑定exchang_delay_begin, 同时配置DLX=exchange_delay_done, 当消息变成死信时,发往exchange_delay_done中。
exchange_delay_done: 死信的exchange, 如果不配置x-dead-letter-routing-key则采用原有默认的routingKey,即queue_delay_begin绑定exchang_delay_beghin采用的“delay”。
queue_delay_done:消息在TTL到期之后,最终通过exchang_delay_done发送值此queue,消费端通过消费此queue的消息,即可以达到延迟的效果。

这种方式会有队列阻塞的问题,假如前面有个大延迟任务,后面的任务就都被堵住了

correlationId: 客户端单一回调队列时用来区分请求的

RPC的处理流程:

  • 当客户端启动时,创建一个匿名的回调队列。
  • 客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。
  • 请求被发送到rpc_queue队列中。
  • RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
  • 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了。

确认 client 的确发到了 broker 的手端

  1. 事务 垃圾,不考虑

    RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了

  2. confirm 模式

    http://www.iocoder.cn/RabbitMQ/message-confirmation-mechanism-transaction-Confirm/?self

    批量confirm模式的问题在于confirm之后返回false之后进行重发这样会使性能降低,异步confirm模式(async)编程模型较为复杂。

消息持久化

exchange, queue的持久化是通过durable=true来实现的 message 是 deliveryMode = 2

持久化未必保证消息不会丢失:

  1. 从consumer端来说,如果这时autoAck=true,那么当consumer接收到相关消息之后,还没来得及处理就crash掉了,那么这样也算数据丢失,这种情况也好处理,只需将autoAck设置为false
    (方法定义如下),然后在正确处理完消息之后进行手动ack(channel.basicAck)

  2. 消息在 cache 里还没来得及落盘

    • RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上

    • 首先可以引入RabbitMQ的mirrored-queue即镜像队列,这个相当于配置了副本,当master在此特殊时间内crash掉,可以自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能完全的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好很多,很多现实生产环境下都是配置了mirrored-queue的

    • 还有要在producer引入事务机制或者Confirm机制来确保消息已经正确的发送至broker端,有关RabbitMQ的事务机制或者Confirm机制可以参考:RabbitMQ之消息确认机制(事务+Confirm).

  3. 消息刷到磁盘的时间

    • 写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。

    • 有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。

    • 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

1
channel.queueDeclare("queue.persistent.name", durable=true, exclusive=false, autoDelete=false, null);

exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:

  1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;

  2. “首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;

  3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

Push & Pull

Mirror queue

因为message在发送之后和被写入磁盘并执行fsync之间存在一个虽然短暂但是会产生问题的时间窗。通过publisher的confirm机制能够确保客户端知道哪些message已经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。

因为queue及其内容仅仅存储于单个节点之上,所以一个节点的失效表现为其对应的queue不可用

每一个镜像队列都包含一个master和多个slave,分别对应于不同的节点。slave会准确地按照master执行命令的顺序进行命令执行,故slave与master上维护的状态应该是相同的。除了publish外所有动作都只会向master发送,然后由master将命令执行的结果广播给slave们,故看似从镜像队列中的消费操作实际上是在master上执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级

https://blog.csdn.net/u013256816/article/details/71097186

Rabbit logs

script
1
2
tail -f $RABBITMQ_HOME/var/log/rabbitmq/rabbit@$HOSTNAME.log -n 200
# 实时查看相应操作所对应的服务日志

message track

Firehose的机制是将生产者投递给RabbitMQ的消息,或者是RabbitMQ投递给消费者的消息按照指定的格式发送到默认的交换器上。这个默认的交换器的名称为amq.rabbitmq.trace,它是一个topic类型的交换器。发送到这个交换器上的消息的routingKey为publish.exchangename和deliver.queuename。其中exchangename和queuename为实际的交换器和队列的名称,分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。

lazy queue

惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存的消耗,但是会增加I/O的使用,如果消息是持久化的,那么这样的I/O操作不可避免,惰性队列和持久化消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。

Reality Analyze

  1. producer -> exchange // 通过 confirm 方式来解决

  2. exchange -> queue // 通过 mandatory 无法找到队列时 Basic.Return命令将消息返回给生产者

    备份交换器的实质就是原有交换器的一个“备胎”,所有无法正确路由的消息都发往这个备份交换器中,可以为所有的交换器设置同一个AE,不过这里需要提前确保的是AE已经正确的绑定了队列,最好类型也是fanout的。如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。

  3. queue durable // 队列 durable 消息 deliveryMode=2

    在持久化的消息正确存入RabbitMQ之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。RabbitMQ并不会为每条消息都做同步存盘(调用内核的fsync6方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。

    • 如果在Phase1中采用了事务机制或者publisher confirm机制的话,服务端的返回是在消息落盘之后执行的,这样可以进一步的提高了消息的可靠性。
    • 但是即便如此也无法避免单机故障且无法修复(比如磁盘损毁)而引起的消息丢失,这里就需要引入镜像队列。
    • 镜像队列相当于配置了副本,绝大多数分布式的东西都有多副本的概念来确保HA。
    • 在镜像队列中,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效的保证了高可用性,除非整个集群都挂掉。
    • 虽然这样也不能完全的保证RabbitMQ消息不丢失(比如机房被炸。。。),但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。
  4. consumer crash before deal message // autoAck=false

    • RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
    • 如果消息消费失败,也可以调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认,如果只是简单的拒绝那么消息会丢失,需要将相应的requeue参数设置为true,那么RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。如果requeue参数设置为false的话,RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。

tips: requeue的消息是存入队列头部的,即可以快速的又被发送给消费,如果此时消费者又不能正确的消费而又requeue的话就会进入一个无尽的循环之中。对于这种情况,笔者的建议是在出现无法正确消费的消息时不要采用requeue的方式来确保消息可靠性,而是重新投递到新的队列中,比如设定的死信队列中,以此可以避免前面所说的死循环而又可以确保相应的消息不丢失。对于死信队列中的消息可以用另外的方式来消费分析,以便找出问题的根本。

脑裂现象

CATALOG
  1. 1. RabbitMq 基础整理
    1. 1.0.1. 参考链接
  2. 1.1. 概念介绍
    1. 1.1.1. Message acknowledgment
    2. 1.1.2. Queue
    3. 1.1.3. Exchange
    4. 1.1.4. RPC
  3. 1.2. 延迟队列的实现:
  4. 1.3. RPC的处理流程:
  5. 1.4. 确认 client 的确发到了 broker 的手端
  6. 1.5. 消息持久化
    1. 1.5.1. Push & Pull
    2. 1.5.2. Mirror queue
    3. 1.5.3. Rabbit logs
    4. 1.5.4. message track
    5. 1.5.5. lazy queue
    6. 1.5.6. Reality Analyze
    7. 1.5.7. 脑裂现象