分配的实施
我们了解了Kafka中消费者的分区分配策略之后是否会有这样的疑问:如果消费者客户端中配置了两个分配策略,那么以哪个为准?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?多个消费者之间的分区分配是需要协同的,那么这个协同的过程又是怎样?
在kafka中有一个组协调器(GroupCoordinator)负责来协调消费组内各个消费者的分区分配,对于每一个消费组而言,在kafka服务端都会有其对应的一个组协调器。具体的协调分区分配的过程如下:
1.首先各个消费者向GroupCoordinator提案各自的分配策略。如下图所示,各个消费者提案的分配策略和订阅信息都包含在JoinGroupRequest请求中。
2.GroupCoordinator收集各个消费者的提案,然后执行以下两个步骤:一、选举消费组的leader;二、选举消费组的分区分配策略。
选举消费组的分区分配策略比较好理解,为什么这里还要选举消费组的leader,因为最终的分区分配策略的实施需要有一个成员来执行,而这个leader消费者正好扮演了这一个角色。在Kafka中把具体的分区分配策略的具体执行权交给了消费者客户端,这样可以提供更高的灵活性。比如需要变更分配策略,那么只需修改消费者客户端就醒来,而不必要修改并重启Kafka服务端。
怎么选举消费组的leader? 这个分两种情况分析:如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader;如果某一时刻leader消费者由于某些原因退出了消费组,那么就会重新选举一个新的leader,这个重新选举leader的过程又更为“随意”了,相关代码如下:
1
2
3
4
5 1//scala code.
2private val members = new mutable.HashMap[String, MemberMetadata]
3var leaderId = members.keys.head
4
5
解释一下这2行代码:在GroupCoordinator中消费者的信息是以HashMap的形式存储的,其中key为消费者的名称,而value是消费者相关的元数据信息。leaderId表示leader消费者的名称,它的取值为HashMap中的第一个键值对的key,这种选举的方式基本上和随机挑选无异。
总体上来说,消费组的leader选举过程是很随意的。
怎么选举消费组的分配策略?投票决定。每个消费者都可以设置自己的分区分配策略,对于消费组而言需要从各个消费者所呈报上来的各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配。这个分区分配的选举并非由leader消费者来决定,而是根据消费组内的各个消费者投票来决定。这里所说的“根据组内的各个消费者投票来决定”不是指GroupCoordinator还要与各个消费者进行进一步交互来实施,而是根据各个消费者所呈报的分配策略来实施。最终所选举的分配策略基本上可以看做是被各个消费者所支持的最多的策略,具体的选举过程如下:
- 收集各个消费者所支持的所有分配策略,组成候选集candidates。
- 每个消费者从候选集candidates中找出第一个自身所支持的策略,为这个策略投上一票。
- 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
如果某个消费者并不支持所选举出的分配策略,那么就会报错。
3.GroupCoordinator发送回执给各个消费者,并交由leader消费者执行具体的分区分配。
如上图所示,JoinGroupResponse回执中包含有GroupCoordinator中投票选举出的分配策略的信息。并且,只有leader消费者的回执中包含各个消费者的订阅信息,因为只需要leader消费者根据订阅信息来执行具体的分配,其余的消费并不需要。
4.leader消费者在整理出具体的分区分配方案后通过SyncGroupRequest请求提交给GroupCoordinator,然后GroupCoordinator为每个消费者挑选出各自的分配结果并通过SyncGroupResponse回执以告知它们。