最近在阅读AMQP协议。AMQP协议算是消息队列里无法绕开的一个协议,通过阅读该协议来学习消息队列以及自有协议设计。该协议的阅读体验非常好,协议本身没有过于复杂,规范里也会解释各个地方的设计思路。
该文章是基于AMQP 0.9.1编写,AMQP 0.9.1规范发布于2008年。所以后面提到的很多特性在当时是先进的,但放到现在可能就习以为常了。比如Kafka于2011年发布第一版,RTMP协议1.0版本于2012年发布,HTTP/2于2015年发布,
AMQP的全称为:Advanced Message Queuing Protocol(高级消息队列协议)
AMQP所覆盖的内容包含网络协议以及服务端服务
一套被称作”高级消息队列协议模型(AMQ Model)“的消息能力定义。该模型涵盖Broker服务中用于路由和存储消息的组件,以及把这些组件连在一起的规则。
一个网络层协议AMQP。能够让客户端程序与实现了AMQ Model的服务端进行通信。
AMQP像是一个把东西连在一起的语言,而不是一个系统。其设计目标是:让服务端可通过协议编程。理解了AMQP的这个设计目标,也就能够理解其协议的设计思路了。
AMQP协议是一个二进制协议,具有一些现代特性:多通道(multi-channel),可协商(negotiated),异步、安全、便携、语言中立、高效的。其协议主要分成两层:
功能层(Functional Layer):定义了一系列的命令
传输层(Transport Layer):携带了从应用 → 服务端的方法,用于处理多路复用、分帧、编码、心跳、data-representation、错误处理。
这样分层之后,可以把传输层替换为其它传输协议,而不需要修改功能层。同样,也可以使用同样的传输层,基于此实现不同的上层协议。可能RabbitMQ也是因为类似的原因,能够比较容易的支持MQTT、STOMP等协议的吧。
AMQ Model的设计是由以下需求驱动的:
确保符合标准的实现之间的互操作性。
提供清晰且直接的方式控制QoS
保持一致和明确的命名
通过协议能够修改服务端的各种配置
使用可以轻松映射到应用程序级API的命令符号
清晰,每个操作只能做一件事。
AMQP传输层是由以下需求驱动的
紧凑。能够快速封包和解包
可以携带任意大小的消息,没有明显的限制
同一个连接可以承载多个通道(Channel)
长时间存活,没有显著的限制
允许异步命令流水线
容易扩展。易于处理新需求、或者变更需求
向前兼容
使用强大的断言模型,可修复
对编程语言保持中立
适合代码生成过程
在设计过程中,希望能够支持不同的消息架构:
先存后发模型。有多个Writer,只有一个Reader
分散工作负载。有多个Writer和多个Reader
发布订阅模型,多个Writer和多个reader
基于消息内容的路由,多个Writer,多个Reader
队列文件传输,多个Writer,多个Reader
两个节点之间点对点连接
市场数据(Market data)分发。多个数据源,多个Reader
主要包含了三个主要的组件:
exchange
(交换器):从Publisher程序中收取消息,并把这些消息根据一些规则路由到消息队列(Message Queue)中
message queue
(消息队列):存储消息。直到消息被安全的投递给了消费者。
binding
:定义了 message queue
和 exchange
之间的关系,提供了消息路由的规则。
可以把AMQP的架构理解为一个邮件服务:
一个AMQP消息类似于一封邮件信息
消息队列类似于一个邮箱(Mailbox)
消费者类似一个邮件客户端,能够拉取和删除邮件。
交换器类似一个MTA(邮件服务器)。检查邮件,基于邮件里的路由信息、路由表,来决定如何把邮件发送到一个或多个邮箱里。
Routing Key类似于邮件中的To:
,Cc:
, Bcc:
的地址。不包含服务端信息。
每一个交换器实例,类似于各个MTA进程。用于处理不同子域名的邮件,或者特定类型的邮件。
Binding
类似于MTA中的路由表。
在AMQP里,生产者直接把消息发到服务端,服务端再把这些消息路由到邮箱中。消费者直接从邮箱里取消息。但在AMQP之前的很多中间件中,发布者是把消息直接发到对应的邮箱里(类似于存储发布队列),或者直接发到邮件列表里(类似topic订阅)。
这里的主要区别在于,用户可以控制消息队列和交换器的绑定规则,而不是依赖中间件自身的代码。这样就可以做很多有趣的事情。比如定义一个这样的规则:把所有包含这样和这样Header的消息,都复制一份到这个消息队列中。“
而这一点也是我认为AMQP和其他一些消息队列最重要的差异。
消息的生命周期
消息由生产者产生。生产者把内容放到消息里,并设置一些属性以及消息的路由。然后生产者把消息发给服务端。
服务端收到消息,交换器(大部分情况)把消息路由到若干个该服务器上的消息队列中。如果这个消息找不到路由,则会丢弃或者退回给生产者(生产者可自行决定)。
一条消息可以存在于许多消息队列中。服务器可以通过复制消息,引用计数等方式来实现。这不会影响互操作性。但是,将一条消息路由到多个消息队列时,每个消息队列上的消息都是相同的。没有可以区分各种副本的唯一标识符。
消息到达消息队列。消息队列会立即尝试通过AMQP将其传递给消费者。如果做不到,消息队列将消息存储(按生产者的要求存储在内存中或磁盘上),并等待消费者准备就绪。如果没有消费者,则消息队列可以通过AMQP将消息返回给生产者(同样,如果生产者要求这样做)。
当消息队列可以将消息传递给消费者时,它将消息从其内部缓冲区中删除。 可以立即删除,也可以在使用者确认其已成功处理消息之后删除(ack)。 由消费者选择“确认”消息的方式和时间。消费者也可以拒绝消息(否定确认)。
生产者发消息与消费者确认,被分组成一个事务。当一个应用同时扮演多个角色时:发消息,发ack,commit或者回滚事务。消息从服务端投递给消费者这个过程不是事务的。消费者对消息进行确认就够了。
在这个过程中,生产者只能把所有消息发到一个单点(交换器),而不能直接把消息发到某个消息队列(message-queue)中。
交换器(exchange)的生命周期
每个AMQP服务端都会自己创建一些交换器,这些不能被销毁。AMQP程序也可以创建其自己的交换器。AMQP并不使用 create
这个方法,而是使用 declare
方法来表示:如果不存在,则创建,存在了则继续。程序可以创建交换器用于私有使用,并在任务完成后销毁它们。虽然AMQP提供了销毁交换器的方法,但一般来讲程序不需要销毁它。
队列(queue)的生命周期
队列分为两种,
持久化消息队列:由很多消费者共享。当消费者都退出后,队列依然存在,并会继续收集消息。
临时消息队列:临时消息队列对于消费者是私有和绑定的。当消费者断开连接,则消息队列被删除。
绑定是交换器和消息队列之间的关系,告诉交换器如何路由消息。
// 绑定命令的伪代码
Queue.Bind <queue> TO <exchange> WHERE <condition>
几个经典的使用案例:共享队列、私有的回复队列、发布-订阅。
构造一个共享队列
Queue.Declare queue=app.svc01 // 声明一个叫做 app.svc01 的队列
// Comsumer
Basic.Consume queue=app.svc01 // 消费者消费该队列
// Producer
Basic.Publish routing-key=app.svc01// 生产者发布消息。routingKey为队列名称
构造一个私有回复队列
一般来讲,回复队列是私有的、临时的、由服务端命名、只有一个消费者。(没有直接使用AMQP协议中的例子,而是使用了RabbitMQ的例子)
Queue.Declare queue=rpc_queue // 调用的队列
// Server
Basic.Consume queue=rpc_queue
// Client
Queue.Declare queue=<empty> exclusive=TRUES:Queue.Declare-Ok queue=amq.gen-X
... // AMQP服务端告诉队列名称
Basic.Publish queue=rpc_queue reply_to=amq_gen-X... // 客户端向服务端发送请求
// Server
handleMessage()
// 服务端处理好消息后,向消息列的reply-to字段中的队列发送响应
Basic.Publish exchange=<empty> routing-key={message.replay_to}
构造一个发布-订阅队列
在传统的中间件中,术语 subscription
含糊不清。至少包含两个概念:匹配消息的条件集,和一个临时队列用于存放匹配的消息。AMQP把这两部分拆成:binding
和message queus
。在AMQP中,并没有一个实体叫做 subscription
AMQP的发布订阅模型为:
给一个消费者保留消息(一些场景下是多个消费者)
从多个源收集消息,比如匹配Topic、消息的字段、或者内容等方式
订阅队列与命名队列或回复队列之间的关键区别在于,订阅队列名称与路由目的无关,并且路由是根据抽象的匹配条件完成的,而不是路由键字段的一对一匹配。
// Consumer
Queue.Declare queue=<empty> exclusive=TRUE
// 这里是使用服务端下发的队列名称,并设置为独占。
// 也可以使用约定的队列名称。这样就相当于把发布-订阅模型与共享队列组合使用了S:Queue.Declare-Ok queue=tmp.2
Queue.Bind queue=tmp.2
TO exchange=amq.topic WHERE routing-key=*.orange.*
Basic.Consume queue=tmp.2
// Producer
Basic.Publish exchange=amq.topic routing-key=quick.orange.rabbit
中间件复杂度很高,所以设计协议时的挑战是要驯服其复杂性。AMQP采用方法是基于类来建立传统API模型。类中包含方法,并定义了方法明确应该做什么。
AMQP中有两种不同的方式进行对话:
同步请求-响应。一个节点发送请求,另一个阶段发送响应。适用于性能不重要的方法。发送同步请求时,该节点直到收到回复后,才能发送下一个请求
异步通知。一个节点发送数据,但是不期待回复。一般用于性能很重要的地方。异步请求会尽可能快的发送消息,不等待确认。只在需要的时候在更上层(比如消费者层)实现限流等功能。AMQP中可以没有确认,要么成功,要么就会收到关闭Channel或者连接的异常。如果需要明确的追踪成功或者失败,那么应该使用事务。
Connection类
AMQP是一个长连接协议。Connection被设计为长期使用的,可以携带多个Channel。Connection的生命周期是:
客户端打开到服务端的TCP/IP连接,发送协议头。这是客户端发送的数据里,唯一不能被解析为方法的数据。
服务端返回其协议版本、属性(比如支持的安全机制列表)。 the Start method
客户端选择安全机制 Start-Ok
服务端开始认证过程, 它使用SASL的质询-响应模型(challenge-response model)。它向客户端发送一个质询 Secure
客户端向服务端发送一个认证响应Secure-Ok
。比如,如果使用 plain
认证机制,则响应会包含登录名和密码
客户端重复质询Secure
或转到协商步骤,发送一系列参数,如最大帧大小 Tune
客户端接受,或者调低这些参数 Tune-Ok
客户端正式打开连接,并选择一个Vhost Open
服务端确认VHost有效 Open-Ok
客户端可以按照预期使用连接
当一个节点打算结束连接 Close
另一个节点需要结束握手 Close-Ok
服务端和客户端关闭Socket连接。
如果在发送或者收到 Open
或者 Open-Ok
之前,某一个节点发现了一个错误,则必须直接关闭Socket,且不发送任何数据。
Channel类
AMQP是一个多通道协议。Channel提供了一种方式,在比较重的TCP/IP连接上建立多个轻量级的连接。这会让协议对防火墙更加友好,因为端口使用是可预知的。它也意味着很容易支持流量调整和其他QoS特性。
Channels相互是独立的,可以同步执行不同的功能。可用带宽会在当前活动之间共享。
这里期望也鼓励多线程客户端程序应该使用 每个线程一个channel
的模型。不过,一个客户端在一个或多个AMQP服务端上打开多个连接也是可以的。
Channel的生命周期为:
客户端打开一个新通道 Open
服务端确认新通道准备就绪 Open-Ok
客户端和服务端按预期来使用通道.
一个节点关闭了通道 Close
另一个节点对通道关闭进行握手 Close-Ok
Exchange类
Exchange类能够让应用操作服务端的交换器。这个类能够让程序自己设置路由,而不是通过某些配置。不过大部分程序并不需要这个级别的复杂度,过去的中间件也不只支持这个语义。
Exchange的生命周期为:
客户端让服务端确保该exchange存在Declare
。客户端可以细化为:“如果交换器不存在则进行创建” 或 “如果交换器不存在,警告我,不需要创建”
客户端向Exchange发消息
客户端也可以选择删掉Exchange Delete
Queue类
该类用于让程序管理服务端上的消息队列。几乎所有的消费者应用都是基本步骤,至少要验证使用的消息队列是否存在。
一个持久化消息队列的生命周期非常简单
客户端断言这个消息队列存在 Declare
(设置 passive
参数)
服务端确认消息队列存在 Declare-Ok
客户端消息队列中读消息
一个临时消息队列的生命周期会更有趣些:
客户端创建消息队列 Declare
(不提供队列名称,服务器会分配一个名称)。服务端确认 Declare-Ok
客户端在消息队列上启动一个消费者
客户端取消消费,可以是显示取消,也可以是通过关闭通道或者连接连接隐式取消的
当最后一个消费者从消息队列中消失的时候,在过了礼貌性超时后,服务端会删除消息队列
AMQP实现了Topic订阅的分发模型。这可以让订阅在合作的订阅者间进行负载均衡。涉及到额外的绑定阶段的生命周期:
客户端创建一个队列Declare
,服务端确认Declare-Ok
客户端绑定消息队列到一个topic exchange上Bind
,服务端确认Bind-Ok
客户端像之前一样使用消息队列。
Basic类
Basic实现本规范中描述的消息功能。支持如下语义:
从客户端→服务端发消息。异步Publish
开始或者停止消费Consume
,Cancel
从服务端到客户端发消息。异步Deliver
,Return
确认消息Ack
,Reject
同步的从消息队列中读取消息Get
事务类:
AMQP支持两种类型的事务:
自动事务。每个发布的消息和应答都处理为独立事务.
服务端本地事务:服务器会缓存发布的消息和应答,并会根据需要由client来提交它们.
Transaction 类(“tx”) 使应用程序可访问第二种类型,即服务器事务。这个类的语义是:
应用程序要求服务端事务,在需要的每个channel里Select
应用程序做一些工作Publish
, Ack
应用程序提交或回滚工作Commit
,Roll-back
应用程序正常工作,循环往复。
事务包含发布消息和ack,不包含分发。所以,回滚并不能重入队列或者重新分发任何消息。客户端有权在事务中确认这些消息。
AMQP的功能描述,一定程度上也是RabbitMQ的功能描述,不过RabbitMQ基于AMQP做了一些扩展
消息会携带一些属性,以及具体内容(二进制数据)
消息是可被持久化的。持久化消息是可以安全的存在硬盘上的,即使发生了验证的网络错误、服务端崩溃溢出等情况,也可以确保被投递。
消息可以有优先级。同一个队列中,高优先级的消息会比低优先级的消息先被发送。当消息需要被丢弃时(比如服务端内存不足等),将会优先丢弃低优先级消息
服务端一定不能修改消息的内容。但服务端可能会在消息头上添加一些属性,但一定不会移除或者修改已经存在的属性。
虚拟主机是服务端的一个数据分区。在多租户使用时,可以方便进行管理。
虚拟主机有自己的命名空间、交换器、消息队列等等。所有连接,只可能和一个虚拟主机建立。
交换器是一个虚拟主机内的消息路由Agent。用于处理消息的路由信息(一般是Routing-Key),然后将其发送到消息队列或者内部服务中。交换器可能是持久化的、临时的、自动删除的。交换器把消息路由到消息队列时可以是并行的。这会创建一个消息的多个实例。
Direct
交换器
一个消息队列使用RoutingKey K
绑定到交换器
生产者向交换器发送RoutingKey为R
的消息
当 K=R
时,消息被转发到该消息队列中
Fanout
交换器
一个消息队列没有使用任何参数绑定交换器
生产者向交换器发了一条消息
这个消息无条件的发送到该消息队列
Topic
交换器
消息队列使用路由规则 P
绑定到交换器
生产者使用RoutingKey R
发送消息到交换器
如果R 能够匹配 P
,则把消息发到该消息队列。
RoutingKey必须由若干个被点.
分隔的单词组成。每个单词只能包含字母和数字。其中 *
匹配一个单词,#
匹配0个或者多个单词。比如 *.stock.#
匹配 usd.stock
和 eur.stock.db
但是不匹配 stock.nasdaq
Headers
交换器
消息队列使用Header的参数表来绑定。不适用RoutingKey
生产者向交换器发送消息,Header中包含了指定的键值对
如果匹配,则传给消息队列。
比如:
format=json,type=log,x-match=all
format=line,type=log,x-match=any
如果 x-match
为all
,则必须都匹配才行。如果x-match
为any
,则有任意一个header匹配即可。
系统交换器
这个平时应该用不到,这里略过。感兴趣的可以直接查看AMQP0.9.1的3.1.3.5章节。
解释了命令如何映射到传输层的。在设计自有协议时,可以参考一下它的设计思路,以及中间需要注意的问题。
AMQP是一个二进制协议。有不同类型的帧frame
构成。帧会携带协议的方法以及其他信息。所有的帧都有相同的基本结构,即:帧头,payload,帧尾。payload格式取决于帧的类型。
我们假设使用的是面向流的可靠网络层(比如TCP/IP)。单个Socket连接上可以有多个独立的控制线程,也就是通道Channel
。不同的通道共享一个连接,每个通道上的帧都是按严格的顺序排列,这样可以用一个状态机来解析协议。
传输层(wire-level)的格式被设计为扩展性强、且足够通用,可以用于任何更高层的协议(不仅仅是AMQP)。我们假设AMQP是会被扩展、优化的。
主要涉及这几个部分:数据类型、协议协商、分帧方式、帧细节、方法帧、内容帧、心跳帧、错误处理、通道与连接的关闭。
AMQP的数据类型用于方法帧中,他们有
整数(1-8个字节),表示大小,数量,范围等。全都是无符号整数
Bits。用于表示为开/关值,会被封包为字节。
短字符串。用于存放短的文本属性。最多255字节,解析时不用担心缓冲区溢出。
长字符串:用于存放二进制数据块
字段表(Field Table),用于存放键值对
客户端连接时,和服务端协商可接受的配置。当两个节点达成一致后,连接才能继续使用。通过协商,可以让我们断言假设和前提条件。主要协商这几方面的信息
实现的协议和版本。服务端可能会在同一端口提供多种协议的支持
加密参数和验证
最大帧尺寸、Channel的数量、某些操作的限制。
如果协商达成一致,双方会根据协商预分配缓冲区避免死锁。传入的帧如果满足协商条件,则认为其实安全的。如果超过了,那么另一方必须断开连接。
TCP/IP是流协议。没有内置的分帧机制。现有的协议一般有这几种方式进行分帧:
每个连接只发送一个帧。简单,但是慢。
在流中加入分隔符来分帧。简单,但是解析较慢(因为需要不断的读取,去寻找分隔符)
计算帧的尺寸,并在每个帧之前发送尺寸。简单且快速。也是AMQP的选择
帧头包括:帧类型、通道、尺寸。帧尾包含错误检测信息。
读帧头,检查帧类型和Channel
根据帧类型,读取payload并处理
读帧尾校验
在实现时,性能很重要的时候,我们会使用 read-ahead buffering
或者 gathering reads
去避免读帧时进行三次系统调用。
方法帧
读取方法帧的payload
解包为结构
检查方法在当前上下文中是否允许
检查参数是否有效
执行方法。
方法帧是由AMQP数据字段组成。编码代码可以直接从协议规范中生成,速度非常快。
内容帧
内容是端到端直接发送的应用数据。内容由一系列属性和二进制数据组成。其中一系列的属性组成了 ”内容帧的帧头“。而二进制数据,可以是任意大小,它可能被拆分成多个块发送,每个块是一个 content-body帧
一些方法(比如 Basic.Publish
,Basic.Deliver
)是会携带内容的。一个内容帧的帧头如下结构:
这里把 content-body
作为单独的帧,这样就可以支持Zero-copy技术,这部分内容就不需要被编码。把内容属性放到自己的帧里,这样收件人就可以选择性的丢弃不想处理的内容。
通道与连接的关闭
对于客户端,只要发送了 Open
就认为连接和通道是打开的。对于服务端则是Open-Ok
。如果一个节点想要关闭通道和连接必须要进行握手。
如果突然或者意外关闭,没办法立刻被检测到,可能会导致丢失返回值。所以需要在关闭之前进行握手。在一个节点发送 Close
后,另一个节点必须发送 Close-Ok
来回复。然后双方可以关闭通道或者连接。如果节点忽略了 Close
操作,当双方同时发送 Close
时,可能会导致死锁。
最后,更多细节可以查看AMQP的官方规范,以及RabbitMQ在其基础上扩充的其它特性。
参考链接
AMQP 0.9.1 规范:https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf
AMQP 0.9.1 完整的中文翻译:http://www.blogjava.net/qbna350816/archive/2016/08/12/431554.html
RabbitMQ 对于 AMQP 0.9.1 的勘误:https://www.rabbitmq.com/amqp-0-9-1-errata.html
RabbitMQ 对与 AMQP 0.9.1 的扩展:https://www.rabbitmq.com/extensions.html
AMQP 1.0 最终版:http://www.amqp.org/specification/1.0/amqp-org-download