自定义分区分配策略
读者不仅可以任意选用Kafka所提供的3种分配策略,还可以自定义分配策略来实现更多可选的功能。自定义的分配策略必须要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接口。PartitionAssignor接口的定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1Subscription subscription(Set<String> topics);
2String name();
3Map<String, Assignment> assign(Cluster metadata,
4 Map<String, Subscription> subscriptions);
5void onAssignment(Assignment assignment);
6class Subscription {
7 private final List<String> topics;
8 private final ByteBuffer userData;
9(省略若干方法……)
10}
11class Assignment {
12 private final List<TopicPartition> partitions;
13 private final ByteBuffer userData;
14(省略若干方法……)
15}
16
17
PartitionAssignor接口中定义了两个内部类:Subscription和Assignment。
Subscription类用来表示消费者的订阅信息,类中有两个属性:topics和userData,分别表示消费者所订阅topic列表和用户自定义信息。PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意到此方法中只有一个参数topics,与Subscription类中的topics的相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、ip地址、host或者机架(rack)等等。
举例,在subscription()这个方法中提供机架信息,标识此消费者所部署的机架位置,在分区分配时可以根据分区的leader副本所在的机架位置来实施具体的分配,这样可以让消费者与所需拉取消息的broker节点处于同一机架。参考下图,消费者consumer1和broker1都部署在机架rack1上,消费者consumer2和broker2都部署在机架rack2上。如果分区的分配不是机架感知的,那么有可能与图(上部分)中的分配结果一样,consumer1消费broker2中的分区,而consumer2消费broker1中的分区;如果分区的分配是机架感知的,那么就会出现图(下部分)的分配结果,consumer1消费broker1中的分区,而consumer2消费broker2中的分区,这样相比于前一种情形而言,既可以减少消费延迟又可以减少跨机架带宽的占用。
再来说一下Assignment类,它是用来表示分配结果信息的,类中也有两个属性:partitions和userData,分别表示所分配到的分区集合和用户自定义的数据。可以通过PartitionAssignor接口中的onAssignment()方法是在每个消费者收到消费组leader分配结果时的回调函数,例如在StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备在下次消费组再平衡(rebalance)时可以提供分配参考依据。
接口中的name()方法用来提供分配策略的名称,对于Kafka提供的3种分配策略而言,RangeAssignor对应的protocol_name为“range”,RoundRobinAssignor对应的protocol_name为“roundrobin”,StickyAssignor对应的protocol_name为“sticky”,所以自定义的分配策略中要注意命名的时候不要与已存在的分配策略发生冲突。这个命名用来标识分配策略的名称,在后面所描述的加入消费组以及选举消费组leader的时候会有涉及。
真正的分区分配方案的实现是在assign()方法中,方法中的参数metadata表示集群的元数据信息,而subscriptions表示消费组内各个消费者成员的订阅信息,最终方法返回各个消费者的分配信息。
Kafka中还提供了一个抽象类org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化PartitionAssignor接口的实现,对assign()方法进行了实现,其中会将Subscription中的userData信息去掉后,在进行分配。Kafka提供的3种分配策略都是继承自这个抽象类。如果开发人员在自定义分区分配策略时需要使用userData信息来控制分区分配的结果,那么就不能直接继承AbstractPartitionAssignor这个抽象类,而需要直接实现PartitionAssignor接口。
下面笔者参考Kafka中的RangeAssignor策略来自定义一个随机的分配策略,这里笔者称之为RandomAssignor,具体代码实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 1package org.apache.kafka.clients.consumer;
2
3import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
4import org.apache.kafka.common.TopicPartition;
5import java.util.*;
6
7/**
8 * Created by 朱小厮 on 2018/7/12.
9 * 欢迎关注笔者的微信公众号:朱小厮的博客.
10 */
11public class RandomAssignor extends AbstractPartitionAssignor {
12 @Override
13 public String name() {
14 return "random";
15 }
16
17 @Override
18 public Map<String, List<TopicPartition>> assign(
19 Map<String, Integer> partitionsPerTopic,
20 Map<String, Subscription> subscriptions) {
21 Map<String, List<String>> consumersPerTopic =
22consumersPerTopic(subscriptions);
23 Map<String, List<TopicPartition>> assignment = new HashMap<>();
24 for (String memberId : subscriptions.keySet()) {
25 assignment.put(memberId, new ArrayList<>());
26 }
27
28 // 针对每一个topic进行分区分配
29 for (Map.Entry<String, List<String>> topicEntry :
30consumersPerTopic.entrySet()) {
31 String topic = topicEntry.getKey();
32 List<String> consumersForTopic = topicEntry.getValue();
33 int consumerSize = consumersForTopic.size();
34
35 Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
36 if (numPartitionsForTopic == null) {
37 continue;
38 }
39
40 // 当前topic下的所有分区
41 List<TopicPartition> partitions =
42AbstractPartitionAssignor.partitions(topic,
43numPartitionsForTopic);
44 // 将每个分区随机分配给一个消费者
45 for (TopicPartition partition : partitions) {
46 int rand = new Random().nextInt(consumerSize);
47 String randomConsumer = consumersForTopic.get(rand);
48 assignment.get(randomConsumer).add(partition);
49 }
50 }
51 return assignment;
52 }
53
54 // 获取每个topic所对应的消费者列表,即:[topic, List[consumer]]
55 private Map<String, List<String>> consumersPerTopic(
56Map<String, Subscription> consumerMetadata) {
57 Map<String, List<String>> res = new HashMap<>();
58 for (Map.Entry<String, Subscription> subscriptionEntry :
59consumerMetadata.entrySet()) {
60 String consumerId = subscriptionEntry.getKey();
61 for (String topic : subscriptionEntry.getValue().topics())
62 put(res, topic, consumerId);
63 }
64 return res;
65 }
66}
67
68
在使用时,消费者客户端需要添加相应的Properties参数,示例如下:
1
2
3
4 1properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
2RandomAssignor.class.getName());
3
4
这里只是演示如何自定义实现一个分区分配策略,RandomAssignor的实现并不是特别的理想,并不见得会比Kafka自身所提供的RangeAssignor策略之类的要好。