消息队列是什么?

计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

-- Wiki

上述定义中一共提到了三个角色,包括生产者,接收者以及消息队列(代理)。生产者向消息队列中插入消息,消息队列在消费者对消息进行消费前可以进行缓存,消费者在从队列中取出消息然后进行处理。三者之间主要根据消息通信协议定义的一系列行为进行交互,常见的通信协议包括 AMQP, XMPP, STOMP 等。

目前被广泛采用的消息队列包括 ActiveMQ, RabbitMQ 以及 Kafka 等,都是由上述要素构成的。

为什么需要消息队列?

在谈到为什么需要消息队列前,我们先了解下在软件开发过程中会遇到哪些问题?

  • 服务/代码高耦合
  • 瞬时的并发高峰导致服务过载。
  • 进程/微服务间同步通信时要求通信双方同时在线,可用性差。
  • 进程/微服务间同步通信时需要知道对方的网络地址,因为滚动升级;伸缩等因素,需要引入服务发现机制,增加了系统整体复杂度。

假设一个外卖 App 一共拥有下单,付款以及配送三个微服务,当用户在前台下单时,三个微服务之间通过链式调用的方式完成订单创建,付款以及分配骑手等功能。

图1.外卖 App 服务通信

关于服务/代码高耦合:假设我们现在需要加入一个 BI 服务,对订单的付款数据进行统计。我们既可以引入离线数据库同步付款数据,也可以搭建一个付款服务数据库的只读从库来满足需求。但是在当前场景下,假设因为一些因素限制我们必须通过订单服务在向付款服务发起 pay 请求时,同时也向 BI 服务发送订单付款信息。这就相当于对于同一份信息,一个服务要将其副本同步给多个下游,随着下游的个数的增加,就会出现服务之间互相耦合的程度不断加深,调用的链路更长以及响应速度越来越慢等问题。如果订单服务需要更改 pay 请求内容,就必须对多个下游进行向后兼容,这也大大增加了开发难度。

关于瞬时的并发高峰导致服务过载:某天因为送券促销活动,吸引了一批新顾客下单,付款服务收到大量 pay 请求,导致数据库过载,单个 pay 请求的响应时间变长甚至无法响应而超时。订单服务因得不到响应而一直保持连接,并继续向付款服务发起请求,最终也因为同时开启大量的连接而耗尽资源,变得无法响应。

解耦 + 削峰 + 异步

消息队列通过解耦,削峰以及异步的方式有效解决了上述问题。

图2.具有消息队列的外卖 App 服务通信

解耦是指无论是作为生产者的订单服务,还是作为消费者的配送和付款服务都只需要和消息队列交互,无论多少个消费者需要消费订单信息,订单服务只需要发送一次到消息队列中。消息队列同时具备消息缓存的能力,能够在消息到达后且消费者消费前,对消息暂存,消费者可以根据自己的速率进行消费,不会因为瞬时并发而过载,这就是削峰。而异步则避免避免了同步通信导致的可用性差的问题,只要消息队列可用,就不必要求生产者与消费者同时在线。

消息队列的原理与机制

虽然前面我们讲到消息队列通过解耦,异步和削峰三把杀器解决了一系列问题,但是引入消息队列本身也可能会带来消息丢失,重复消息以及消息队列故障等问题。那么消息队列本身又是通过哪些机制来解决这些问题的呢?下面我们将从 Kafka 和 RabbitMQ 来逐步探索消息队列在可靠投递,重复消费以及可用性等方面的原理。

可靠投递

熟悉计算机网络的朋友都知道,TCP 为保证按序且可靠投递采用了按序确认以及超时重传等机制,每个 TCP 包都有对应的序列号,接收方在收到后对对应的序列号进行 ACK,发送方收到 ACK 后继续发送剩余内容。如果在传输过程中出现丢包或者发送方长时间没有收到 ACK,发送方就会重新投递。其实,消息队列的做法也是如此。消息队列也是采用了 ACK(消息确认机制)。其中可靠投递包括发布确认和接收确认两个环节。

图3.可靠投递

其中发布者在发送消息到消息队列后,需要等待消息队列的确认,否则重发。而消息队列在投递消息给消费者以后,同时等待消息者的接收确认。标准的 AMQP 0-9-1 协议不包含发布确认的约束,发布者只能通过事务的方式保证一组消息成功发布,因为事物提交/回滚等开销太大,严重影响了 RabbitMQ 的吞吐性能,于是便引入基于 ACK 的 publish confirm 机制。

因为重传等因素,消息队列保证消息的 at least once 投递,这意味着对于同一条消息存在重复消费的风险。当消费者消费完消息后,却在确认消息前而崩溃,此时消息队列会因为未收到 ACK 而向其他消费者或恢复后的原消费者再次投递,这就造成了同一条消息被多次消费的问题。通过保证消息消费逻辑幂等和原子性可以避免重复消费的问题。

按序到达

除了保证消息可靠投递外,保证消息按序到达也同样重要。对于同一个订单,如果订单服务先收到 cancel 消息,然后才收到 purchase 消息,那么会造成整个服务混乱。

在 RabbitMQ 中,queue 作为消息的载体,是一个 FIFO(先进先出)的队列,这就确保同一个队列内的有序状态。但是当多个消费者同时消费同一个队列时,同样也存在乱序消费的问题。比如 A 消费者和 B 消费者同时订阅了订单队列,此时 A 消费收到1号订单的 purchase 消息,但是在消费过程中阻塞了,B 消费者从队列中取出 cancel 消息并迅速完成了消费逻辑,这就造成了乱序问题。RabbitMQ 3.8 版本引入了 Single Active Consumer 机制,保证每个 queue 同时中只有一个活跃的消费者在消费,其他绑定相同 queue 的消费者只有在前者死亡的时候才会被激活。
而 Kafka 则通过 partition 存储消息,每个 partition 底层都可被看作为一个日志,日志只能通过增量的方式写入,这就保证了消息在 partition 内是有序的。同时,每个 partition 只能被相同 Consumer Group 内的一个消费者独占消费,这也避免了因为同时消费而可能导致的乱序问题。

高可用

消息队列的出现虽然避免了发布者和消费者之间必须同时在线而带来的可用性问题,但是消息队列本身不可用则会导致更加严重的可用性问题。除了服务的稳定性以外,如何保证消息在故障转移过程中不丢失也是高可用方案的重要指标。

RabbitMQ 支持 quorum queues 以及 classic mirrored queues 两种高可用模式。两种模式都是通过 master-slave 机制来实现消息冗余的。对于消息发布,ACK 等操作只能在 master 节点上进行,而 slave 从 master 节点上同步数据变化。当 master 节点故障时,已同步的最年老 slave 节点将被选举为新的 master。因为 slave 节点只能用作故障转移,无法进行消息投递等行为,这就导致了相较于非高可用模式下的 master 吞吐量下降。其中 quorum queues 因为实现了 Raft 协议的缘故,因此较于 classic mirrored queues 提供了更好的数据安全性保证。

Kafka 的 partition 同样也支持高可用。Partition 被分为 leader 和 follower, 每个 leader 有若干个 follower,每个 leader partition 会维护一个 ISR 列表来管理与自己保持 follower 节点。和 RabbitMQ 一样,所有的消息读写只能对 leader partition 进行。每当发布者发送消息到 leader partition 时,leader partition 要等到 ISR 列表里的所有节点都同步完该消息以后,才允许消费者消费,我们称之为 watermark 机制。当 leader partition 故障时,此时则从 ISR 列表中选举出一个 follower 节点作为 master。

消费方式

RabbitMQ 和 Kafka 的消费者分别采用了 push 和 poll 的方式从消息代理获取消息。RabbitMQ server 在推送消息时,会根据 channel 以及消费者设置的 qos 值投递。Qos 类似于 TCP 中的滑动窗口,允许消费者在未 ACK 的情况下同时获取到多个消息,通过增大 qos 值能够提高 RabbitMQ 的吞吐量。

如前所述,在 Kafka 中保存消息的载体是 append-only 日志。当生产者对 partition 每写入一条消息,就向对应的日志文件尾部插入一条记录,除了记录消息内容外,还会按序为该条消息分配一个 partition 内部唯一的 offset,消息者通过 offset 来标志消费进度。采用 append-only 日志保存消息能够将随机写入转化为顺序写入,这对提高写入吞吐量有极大帮助,但是这也同样带来了消费消息时顺序查找所花费的O(N)时间复杂度,为了加快查找效率,Kafka 还会为每一个日志文件维护一个索引文件。同时 Kafka 也提供日志 retention 的机制,用户可以根据大小和时间两种指标设置日志文件的轮换策略。

image

图4.Kafka 底层存储(摘自https://tech.meituan.com/2015/01/13/kafka-fs-design-theory.html)

生产者投递方式

RabbitMQ 和 Kafka 的生产者投递方式大同小异。值得一提的是,RabbitMQ 为了防止生产者过快投递引起服务器过载而引入了背压机制,RabbitMQ 会随着逐步积压的未消费数据的增多而慢慢限制生产者的发布速率。同时 RabbitMQ 的 high_memory_watermark 也会对发布速率造成印象。当 RabbitMQ serevr 消耗的内存达到 high_memory_watermark 后,此时将阻塞生产者的发布。虽然提高 high_memory_watermark 能够增加可消耗内存,减少生产者被阻塞的概率,但是这可能会导致 RabbitMQ OOM。RabbitMQ 采用了写时复制的策略进行垃圾回收,这可能会使 RabbitMQ 在垃圾回收时内存加倍。

而关于 Kafka,前面我们有讲到它也有一个 watermark,虽然名字相同,但是作用却大不一样。Kafka 中每一个 leader partition 都会维护一个 watermark,该 watermark 主要用来标志 ISR 节点的同步进度。当某条消息被所有的 ISR 节点都同步完成后,那么 watermark 就会移动到该条消息的 offset,此时对于消费者可见。已经发布至 leader partition 但是位于 watermark 之后的消息,对于消费者是不可见的。

图5.Kafka high watermark

本文参考

https://www.rabbitmq.com/confirms.html#publisher-confirms

https://www.rabbitmq.com/clustering.html

https://www.rabbitmq.com/quorum-queues.html