本章我们讨论RabbitMQ的消息模式与路由拓扑,主要涉及以下知识点:
-
交换器类型与绑定关系
-
消息队列
-
死信交换器
-
即时交换器与队列
-
备用交换器
-
优先级队列
交换器类型
1. 扇出交换器
扇出交换器提供了典型的发布订阅消息拓扑,发送到扇出交换器中的消息将会广播路由到所有绑定到当前交换器上的所有消息队列和交换器中。
在扇出交换器中,消费者彼此间相互独立,彼此收到的也只是消息的拷贝。如果需要对Consumer App1进行扩容,只需要部署多个应用实例即可,实例从同一个消息队列Queue1读取消息。
在所有的交换器中,扇出交换器是速度最快,因为它们并不需要去读取消息的路由键值或者消息头。
2. 直接交换器和默认交换器
直接交换器会基于消息的路由键值对消息进行路由。路由键值由消息的生产者设置。路由键值是由多个单词组成的由逗号分隔的字符串。例如:booking.new, booking.modified, booking.cancelled等等。
消息队列或者交换器与直接交换器之间会通过绑定键值来关联,如下图所示:
直接交换器是速度第二快的交换器,因为他们只需要执行精确的字串比较操作。
RabbitMQ中有一个特殊的交换器,我们称之为默认交换器,这也是一个直接交换器。默认交换器隐式绑定到所有消息队列中。隐式绑定关系的绑定键值与队列名字一致,这也即意味着你可以直接通过队列名字发送消息到队列中。有了这样的一个特性,生产者就可以准确选择目标消费者来处理消息,而不是通过由消费者配置绑定来实现。一般来说,交换器提供的是生产者与消费者的解耦,但对于默认交换器来说,除了消费者与生产者的解耦,其还额外提供了一个的点对点消息路由功能。
3. 主题交换器
主题交换器同样基于路由键值路由消息,但主题交换器针对绑定键值提供了两种类型的模糊匹配模式
*号表示匹配单个单词(booking.new包含两个单词),#表示匹配任意数量的单词。
假设我们有如下路由键值:
-
booking.new
-
booking.modified
-
booking.cancelled
-
booking.car.new
-
booking.car.modified
-
booking.car.removed
-
booking.hotel.new
-
booking.hotel.modified
-
booking.hotel.removed
如果我们可以使用以下绑定键值来定义绑定关系
- booking.new :精确匹配
- extras.*.modified :所有针对extras的修改
- extras.# : 所有的extras
- #.new : 所有新的bookings和extras
我们可以通过设计好的路由键值和绑定键值,在不改变绑定关系的前提下直接新增路由键值,这样就可以使得我们系统在面对变更时更显健壮。
注意,主题交换器会随着绑定关系的增加而速率降低。
4. 消息头交换器
消息头交换器功能强大,但速率也最慢。这也是使用消息头交换器的时候必须考虑的一个因素,因为当你未来面对需要扩容的时候,可能会带来一系列的性能问题。消息头交换器会忽略消息的路由键值,转而通过解析消息头的方式对消息进行路由。每一个绑定到消息头交换器的消息队列或者交换器可以包含多个匹配消息头,消息头之间可以通过定义“与”或者“或”来确定具体匹配规则。
例如,假设有消息包含如下消息头:
-
entity.type (booking, passenger, baggage, pet)
-
change.type (new, modified, cancelled, removed, moved)
-
agent.id
-
client.id
我们可以定义如下绑定规则:
-
entity.type=booking, change.type=cancelled, x-match=all. :所有取消订阅的消息
-
entity.type=passenger, x-match=all. I want all passenger messages. :所有乘客信息
-
entity.type=pet, change.type=new,x-match=all. :所有新增的宠物信息
-
agent.id=2, client.id=1001, x-match=any.:所有与指定旅行社或指定客户相关的信息。
一致性哈希交换器
一致性性哈希交换器允许我们通过哈希的方式将单个消息队列分隔成多个小的子队列。一致性哈希主要哈希的是消息的路由键值、消息头或者消息属性。一致性哈希为我们提供了可靠的有序处理和数据局部性模式。
当然了,一致性哈希也存在一定的问题。首先,RabbitMQ本身并不提供类似Kafka分区消息队列中的协调角色功能,所以需要由用户来负责各个消费者在分区队列中的消费分配,相比来说,Kafka提供的则是开箱即用的类似功能。此外还有其他可能的问题:
-
对路由键值,消息头或者消息属性本身进行哈希可能并无法得到相对公平的分布效果。极端情况,可能全部消息都最终路由到了单个消息队列中
-
如果消息队列相对较少的话,哈希结果可能仍旧不是相对公平
为了解决分布不均的问题,一些别的分布式系统采取了虚拟节点的概念。例如Riak和Cassandra就引入了比实际物理节点多的虚拟节点,这些虚拟节点跨物理节点分布。而RabbitMQ并没有虚拟节点的概念,所以使用一致性哈希交换器的时候要特别注意。
死信交换器
我们可以配置一个消息队列,该消息队列会基于以下条件路由消息到指定交换器中:
-
消息队列已达消息个数容量上限,队首的消息将被路由到私信交换器中(DLX),也即是说,当有新消息到达满消息队列时,队首中最老的消息将会被踢出队列。
-
队列也达字节大小容量上限 ,同样的,队首最早的消息将被踢出
-
队列配置了消息最大存活时间(TTL),有消息已达TTL上限
-
消息自身配置了TTL属性,并且已达TTL上限
这里有一点需要注意:只有位于队首的消息才是被认作死信路由的前提,因此,只有当消息到达消息队列的队首,并且达到TTL上限,这个消息才会被路由到死信交换器中。
死信交换器也是普通规则交换器,同样的,你可以创建任意一种类型(扇出、直接、主题、消息头)的死信交换器,并且绑定到任意队列或者其他交换器中。
RabbitMQ的死信功能为避免消息丢失提供了一种可能的逃逸路线,死信功能可用于消息重试或者延迟队列。
即时交换器与队列
即时交换器意味着当所有的队列绑定全部删除之后,交换器可以自毁。你可以通过删除队列绑定本身或者直接删除队列的方式来删除队列绑定。
即时队列意味着当所有的消费者不再使用当前队列,队列可以自毁。停止使用当前队列表示当前消费者取消了当前队列的订阅或者通道已被关闭。
即时队列也是排他队列,也就是说只有声明了当前队列的消费者才可以使用,一旦声明者取消或者关闭队列通道,队列即自动删除。
即时队列具备TTL属性,一旦队列空置超过TTL,队列即自动删除。
即时交换器和队列可以用作延迟队列,重试队列或者应答队列。
备用交换器
当配置交换器时,每一个交换器都可以包含一个备用交换器。当交换器因没有绑定或者匹配绑定导致无法正常路由消息时,交换器就会将消息路由给备用交换器。
这避免了我们因错误路由键值或者错误路由拓扑而导致的可能信息丢失。备用交互器也为我们提供了第五种路由模式。
优先级队列
在RabbitMQ中可以为消息配置不同的优先级,建议最多10级。当我们在声明一个队列时,同样也可以将其声明为优先级队列。当消息生产者为消息设定了一个优先级,那么其在优先级队列中位置将由这个优先级确定。很自然的,高优先级消息将优先处理。
当使用优先级队列时,我们需要考虑以下场景:如果一个优先级队列已满,并且一个高优先级的消息到位于队首,当一个低优先级的消息到达时,它会将队首的高优先级消息从队列中踢除到DLX中。这么处理也是有一定的道理,否则的话,低优先级消息将一直被阻塞。所以,使用优先级队列时需格外小心。
抄送与密送
消息生产者同样可以向消息中添加其他额外的路由键值信息(CC和BCC消息头,字符串数组的形式)。含义与Email抄送和密送一样,CC和BCC中的路由键值也会像正常的消息路由键值一样进行路由,BCC消息头在消息路由前从消息中删除。
声明交换器、队列和绑定
应用本身也可以根据需要自行声明交换器、队列和绑定关系。对于Kafka来说,这可能就无法实现,因为增加分区会影响所有的消费者,因此,Kafka通过集中管理的方式来实现。在这一点上,RabbitMQ则表现的相对灵活,因为应用可以在不影响其他应用的前提下自行管理各自的RabbitMQ视角。在实际应用中,我们可以基于简单的约定准则,构建实现复杂的路由拓扑,并且实现他们自治。但我们需要关注到容量与性能,那么使用自定义交互器、队列和绑定关系就要非常当心了,很可能会导致我们的路由拓扑会越来越复杂。
虚拟主机
虚拟主机是交换器和消息队列的逻辑容器,可以用来实现对交换器和消息队列的访问控制。RabbitMQ不支持跨虚拟主机路由,本章包含的所有示例都假设在一个大的虚拟主机内。