消息队列中间件(message queue middleWare, MQ)指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信进行分布式数据集成。一般有两种传递模式:点对点模式和发布订阅模式。常用消息中间件:RabbitMq、ActiveMq、Kafka、RocketMq、Pulsar。Kafka在Pulsar问世前被称为消息中间件之王,但Pulsar目前正在被各个大厂使用,如华为、腾讯等,性能强悍,有超越kafka的趋势,感兴趣可以了解下,这里不做过多讲解。
应用程序A将消息发送给消息中间件, 消息中间件将消息路由给应用程序B,这样消息可以完全存在于两台不同的计算机上。消息中间件负责网络通信,如果网络不可用,消息中间件会存储消息直到连接可用。
解藕: 从上面消息中间件模型图可以看到,应用程序A和B之间互相并不知道,实现了解耦。
冗余(存储):有些情况数据处理会失败,消息中间件可以把数据进行持久化,直到他们已经被完全处理。通过这种方式规避数据丢失的风险。
扩展性:因为消息中间件解藕了应用的处理过程,所以提高消息入队和处理效率很容易。
削峰:在访问剧增的情况下,应用仍需要继续发挥作用,但这种突发的流量并不常见,如果以处理峰值的标准来投入资源,无疑是巨大的浪费,使用消息中间件支撑突发的流量,不会因为超负荷请求而完全奔溃。
可恢复性:当系统的一部分组件失效时不影响整个系统。降低了应用间的耦合性,系统恢复后还能继续处理消息。
顺序保证:大多数场景下,顺序处理数据很重要,大部分消息中间件支持一定程度上的顺序性。
异步通信:很多时候不需要立即处理消息,消息中间件提供了异步处理机制。
缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素,消息中间件通过一个缓冲层来帮助任务最高效率的执行,写入消息中间件的处理尽可能的快。该缓冲层有助于控制和优化数据流经过系统的速度。
RabbitMq最早起源于金融系统,是AMQP协议的Erlange语言实现。整体上是一个生产者消费者模型,主要负责接受、存储和转发消息。架构图如下
主要分为三部分,生产者、broker、消费者
投递消息的一方。创建消息,然后投递到RabbitMq中。消息一般包含两部分:消息体和标签(label)。消息体称为payload,标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息给RabbitMQ,RabbitMq根据标签把消息发送给感兴趣的消费者。
接收消息的一方。消费者消费一条消息的时候,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费消息体。
消息中间件的服务节点。一个RabbitMq Broker可以简单地看作一个RabbitMq服务节点,或者RabbitMq服务实例。Broker由交换器、队列、RoutingKey、BindingKey、VHost组成。
VHost:虚拟地址,用于进行逻辑隔离。
交换器(Exchange):生产者将消息发送给交换器,由交换器将消息路由到一个或者多个队列中。如果路由不到,会返给消费者,或者直接丢弃。RabbitMq中常用的交换器有四种,fanout、direct、topic、headers这四种。amqp协议中还提到另外两种:system和自定义
队列(queue):用来存储消息,RabbitMq中消息只能存储在队列中。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(轮询)。
RoutingKey:路由键,生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(bindingKey)联合使用才能生效。
BindingKey:通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMq就知道如何正确将消息路由到队列了。
客户端和broker建立的连接是TCP连接,一旦连接建立起来,客户端紧接着可以创建一个AMQP信道,RabbitMq处理的每条AMQP指令都是通过信道完成的。
我们可以完全通过connection来完成工作,为什么要引入信道呢?一个应用程序中有很多线程需要从RabbitMq中消费,或者生产消息,那么必然需要建立很多连接。然而对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到业务高峰,性能随之显现。RabbitMq采用类似NIO的做法,选择TCP复用,不仅可以减少性能开销,同时也便于管理。
每个线程把持一个信道,所以信道复用connection的TCP连接。同时RabbitMq可以确保每个线程的私密性。就像拥有独立的连接一样,当每个信道流量不是很大时,复用单一的connection可以在产生性能瓶颈情况下有效节省TCP资源。但是当信道本身的流量很大时,这时多个信道复用一个connection就会产生性能瓶颈,进而使整体的流量被限制。此时就需要开辟多个connection,将这些信道均摊到这些connection中。
消息中间件定义中提到高效可靠,可靠有两层含义,服务可靠和消息可靠,这里只讨论消息的可靠性。要讨论消息的可靠性首先知道哪里会出现不可靠的情况,即消息丢失。
生产者将消息发往broker,默认情况下broker不会返回任何信息给生产者,那在发往broker的过程中,可能由于网络原因或其他原因,消息根本就没有到达broker,而生产者是不知道有没有到达,更不要谈后续的消息持久化了,因为消息根本就没到达broker,发送消息的过程中出现消息丢失。如何解决呢,有两种方案。
1、 事务机制实现
开启事务多了几个环节,只有消息成功被RabbitMq接收,事务才能提交,否则便可在捕获异常之后进行处理。但事务会严重影响RabbitMq的性能,大大降低吞吐量。因为在事务提交前生产者不能发送下一条消息,一直处于阻塞状态,极大的降低了吞吐量。
2、 生产者消息确认机制
生产者确认是一种轻量级的机制,生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,RabbitMq就会发送一个确认(Basic.Ack)给生产者(包括消息的唯一id),这就使得生产者知晓消息已经正确到达目的地。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。
为了保证消息从队列可靠的到达消费者,RabbitMq提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,为false时,RabbitMq会等待消费者显式回复确认后才从内存(或磁盘)中移去消息(实际上是打上删除标记,之后在删除)。当为true时,RabbitMq会把发送出去的消息置为确认,然后从内存(或磁盘)中删除,而不管消费者是否真正的消费了消息。
autoAck为false时,消费者就有足够的时间处理消息,不用担心处理消息的过程中消费者进程挂掉或者消息丢失的问题,因为RabbitMq会一直等待持有消息的消费者显式的回调ack。
autoAck设置为false时,对于RabbitMq服务而言,队列中的消息分为了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认的消息。如果一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMq会安排该消息重新入队列,等待投递给下一个消费者,当然也有可能还是这个消费者。
RabbitMq不会为未确认的消息设置过期时间,他判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者是否断开了连接,这么设计可以保证消费者的消费时间很长很长。
注意,这里的autoAck是RabbitMq定义,通常情况下我们使用的是Spring封装好的,Spring定义的和RabbitMq中稍有不同,做了些扩展。在AcknowledgeMode有详细的介绍。
public enum AcknowledgeMode {
/**
* No acks - {@code autoAck=true} in {@code Channel.basicConsume()}.
*/
NONE,
/**
* Manual acks - user must ack/nack via a channel aware listener.
*/
MANUAL,
/**
* Auto - the container will issue the ack/nack based on whether
* the listener returns normally, or throws an exception.
* <p><em>Do not confuse with RabbitMQ {@code autoAck} which is
* represented by {@link #NONE} here</em>.
*/
AUTO;
}
NONE:相当于autoAck设置为true,消息发送出去后就不管了,到底有没有成功到达消费者,消费者到底有没有处理成功,broker都不关心。
AUTO:自动ACK,源码中注解写的很清楚,不要和RabbitMq中autoAck所混淆,消息处理完会自动返回ack。但这种方式有个问题是如果消息处理失败,没有返回ack,就会有unAck的消息在队列中等待处理,对后续消息造成阻塞,产生消息积压,使用时要注意。
MANUAL:手动ACK,需要显示的告诉broker ack或unAck,这种方式在配合QOS(下文介绍)是比较完善的方案。
持久化可以提高RabbitMq的可靠性,防止在异常情况下消息的丢失。RabbitMq的持久化分成三部分:交换器的持久化、队列的持久化和消息的持久化。
交换器的持久化可以在声明交换器的时候设置,如果交换器不设置持久化,RabbitMq重启后,交换器的元数据会丢失,不过消息不会丢失,只是不能将消息发送给这个交换器了。
队列的持久化在声明队列的时候设置,如果不设置队列的持久化,RabbitMq服务重启后,队列的元数据会丢失,此时数据也会丢失。
将交换器、队列、消息都设置成持久化后能保证数据不丢失吗?答案是否定的。
1、从消费者的角度来看,将autoAck设置成true,那么当消费者接收到相关消息后,还没来得及处理就宕机了,这样也算数据丢失。
2、在持久化的消息正确存入RabbitMq之后,还需要一段时间才能存入磁盘。RabbitMq不会为每条消息都进行同步存盘(调用内核的fsync)的处理,可能仅保存在操作系统缓存之中而不是物理磁盘之中。如果在这段时间内,RabbitMq发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。
上面的问题可以通过镜像队列机制来解决。相当于配置了副本,如果主节点在此特殊时期挂掉了,可以自动切换到从节点,这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证不丢失,但这样已经好很多。
当队列有多个消费者时,会出现某些消费者消费能力弱,某些消费者消费能力强的情况。如果不加限制可能会出现broker总是将消息推到处理能力弱的消费者身上,时间长了造成消息堆积。RabbitMq引入了服务质量(QOS)的概念,通过调用basicQos方法设置,方法中的参数可以设置预取数量,比如设置为10,意思是在这10条消息未处理完返回ack前不要给我推消息,以此来达到负载均衡的效果。QOS只有推模式生效,对拉模式是无效的。
RabbitMq可以对消息和队列设置TTL。
设置队列的TTL;通过队列的属性设置,队列中的所有消息都有相同的过期时间,一旦消息过期,就会立即从队列中抹去。
设置消息的TTL:对消息单独设置,每条消息的TTL可以不同,即使消息过期也不会立即从队列中抹去,在投递前判定。如果两者一同使用,则以最小的那个为准,消息的生存时间一旦超过了设置的TTL,就会变成“死信”,消费者则无法收到该消息。
DLX,全称为Dead-Letter-Exchange,死信交换器。当一个消息在队列中变成死信之后,它能被发送到另一个交换器中,这个交换器即使是DLX,绑定死信交换器的队列称为死信队列。消息变为死信的情况:
消息被拒绝,并且设置的requeue为false
消息过期
队列达到最大长度
DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMq就会自动的将消息重新发布到设置的DLX上去,进而被路由到另一个队列中,即死信队列。可以监听这个队列中的消息进行相应的处理。
优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。在发送消息时设置当前的优先级,默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个是有前提的:如果在消费者消费速度大于生产者的生产速度且broker中没有消息堆积的情况,对发送的消息设置优先级也就没有什么实际意义了。因为生产者刚发送完一条消息就被消费了,那么就意味着broker中至多有一条消息,对于单条消息来说优先级是没意义的。消费者如果是拉模式,优先级队列也没有实际意义。
所谓延迟队列是指当消息被发出后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到消息。在AMQP协议中,或者RabbitMq本身并没有直接支持延迟队列的功能,但可以通过DLX和TTL来实现。
消息发送时设置消息的TTL,这个队列无消费者消息,在达到过期时间后消息将被投递到特定的私信队列中,通过这种方式间接的实现延迟队列。
不管是持久化的消息还是非持久化的消息都可以被写入磁盘。持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一个备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中消除。非持久化的消息一般只保存在内存中,在内存吃紧的时候再被换入到磁盘中,以节省内存空间。这两种类型的消息落盘处理都在RabbitMq的持久层完成的。持久层是一个逻辑概念,实际包括两部分:
队列索引(rabbit_queue_index):负责维护队列中落盘的消息,包括消息的存储地点、是否已被交付给消费者、是否已被消费者ack等。每个队列中都有一个与之对应的rabbit_queue_index。
消息存储(rabbit_msg_store):以键值对的形式存储信息,它被所有队列共享,在每个节点中有且只有一个。在细分的话分为msg_store_persistent和msg_store_transient,msg_store_persistent负责持久化消息的持久化,重启后消息不会丢失;msg_store_transient负责非持久化消息的持久化,重启后消息会丢失。
索引在各个文件系统中都有所应用,如MySql、Kafka等等。在RabbitMq中二者的大小对性能影响较大,可以根据实际情况进行相应的调整。
通常队列由rabbit_amqqueue_process和backing_queue这两部分组成。
rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息确认(包括生产端的confirm和消费端的ack)等。
backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。
如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。而当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。消息存入队列后,不是固定不变的,他会随着系统的负载在队列中不断的流动,消息的状态会不断发生变化。RabbitMq中队列的消息有四种状态。
1、alpha:消息的内容(包括消息体、属性和headers)和消息索引都存储在内存中
2、beta:消息内容保存在磁盘中,消息索引保存在内存中。
3、gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都有。
4、delta:消息内容和索引都在磁盘中。
消息所处的四种状态对cpu和I/O的要求是不一样的,如delta状态不消耗内存,但对cpu和磁盘I/O要求高,这四种状态也间接影响了性能。
RabbitMq可以对内存和磁盘使用设置阈值,当达到阈值时,生产者将被阻塞(block),直到恢复正常。除了这两个阈值,rabbitmq从2.8.0版本开始还引入了流控(Flow Control)机制来确保稳定性。流控机制是用来避免消息的发送速率过快而导致服务器难以支撑的情形。内存和磁盘告警相当于全局的流控(Global Flow Control),一旦触发会阻塞集群中所有的Connection。
Erlange进程之间并不共享内存(binary类型除外),而是通过消息传递来通信,每个进程都有一个自己的进程邮箱。默认情况下,Erlange并没有对进程邮箱的大小进行限制,所以当大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并奔溃。在RabbitMq中,如果生产者持续高速发送,而消费者的消费速度较低时,如果没有流控,很快就会使内部进程邮箱的大小达到阈值。
RabbitMq使用了一种基于信用证算法(credit-base algorithm)的流控机制来限制发送消息的速率以解决前面提到的问题。他通过监控各个进程的进程邮箱,当某个进程的负载过高而来不及处理消息时,这个进程的进程邮箱开始堆积消息。当堆积到一定量时,就会阻塞而不接收上游的新消息。从而慢慢的上游进程的进程邮箱也开始堆积。当堆积到一定数量时也会阻塞而停止接收上游的消息,最后就会使负责网络数据包接收的进程 阻塞而暂停接受新的数据。
进程A接收消息并转发至进程B,进程B接收消息并转发给进程C。每个进程中都有一对关于收发消息的credit值。以进程B为例,{{credit_from, C}, value}表示能发送多少条消息给C,每发一条消息该值减1,当为0时,进程B不再往进程C发送消息,也不再接收进程A的消息。{{credit_to, A}, value}表示在接收多少条消息就向进程A发送增加credit值的通知,进程A接收到该通知后就增加{{credit_from, B}, value}所对应的值,这样进程A就能持续的发送消息。当上游发送速率高于下游接收速率时,credit值就会逐渐耗光,这时进程就会阻塞,阻塞的情况会一直传递到最上游。当上游进程接收到增加credit值的通知时,若此时上游进程处于阻塞状态则解除阻塞,开始接收更上游的消息,一个个传导最终能够解除最上游阻塞状态。由此可知基于信用证算法的流控机制最终将消息的发送频率限制在消息处理进程的处理能力范围内。