系列目录
kafka原理和实践(一)原理:10分钟入门
kafka原理和实践(二)spring-kafka简单实践
kafka原理和实践(三)spring-kafka生产者源码
kafka原理和实践(四)spring-kafka消费者源码
kafka原理和实践(五)spring-kafka配置详解
kafka原理和实践(六)总结升华
==============正文分割线=====================
一、kafkaConsumer消费者模型
如上图所示,spring-kafka消费者模型主要流程:
1.容器启动,轮询执行消费。
2.kafkaConsumer拉取消息流程:
1)Fetcher(拉取器)构建拉取请求,并存储在ConsumerNetworkClient的unsent集合中
2)ConsumerNetworkClient网络客户端执行poll()时,从unsent中取出ClientRequest请求,调用NetworkClient的send()方法将其转成RequestSend,最终塞进Selector的KafkaChannel通道中;再经由Selector与kafka集群进行网络读写,拉取待消费数据ConsumerRecords
3.消费者监听器MessageListener.onMessage()执行用户自定义的实际消费业务逻辑。
1.1 kafkaConsumer构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 11 @SuppressWarnings("unchecked")
2 2 privateKafkaConsumer(ConsumerConfig config,
3 3 Deserializer<K> keyDeserializer,
4 4 Deserializer<V> valueDeserializer) {
5 5 try {
6 6 log.debug("Starting the Kafka consumer");
7 7 this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
8 8 int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
9 9 int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
10 10 if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
11 11 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
12 12 this.time = new SystemTime();
13 13
14 14 String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
15 15 if (clientId.length() <= 0)
16 16 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
17 17 this.clientId = clientId;
18 18 Map<String, String> metricsTags = new LinkedHashMap<>();
19 19 metricsTags.put("client-id", clientId);
20 20 MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
21 21 .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
22 22 .tags(metricsTags);
23 23 List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
24 24 MetricsReporter.class);
25 25 reporters.add(new JmxReporter(JMX_PREFIX));
26 26 this.metrics = new Metrics(metricConfig, reporters, time);
27 27 this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
28 28
29 29 // load interceptors and make sure they get clientId
30 30 Map<String, Object> userProvidedConfigs = config.originals();
31 31 userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
32 32 List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
33 33 ConsumerInterceptor.class);
34 34 this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
35 35 if (keyDeserializer == null) {
36 36 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
37 37 Deserializer.class);
38 38 this.keyDeserializer.configure(config.originals(), true);
39 39 } else {
40 40 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
41 41 this.keyDeserializer = keyDeserializer;
42 42 }
43 43 if (valueDeserializer == null) {
44 44 this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
45 45 Deserializer.class);
46 46 this.valueDeserializer.configure(config.originals(), false);
47 47 } else {
48 48 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
49 49 this.valueDeserializer = valueDeserializer;
50 50 }
51 51 ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
52 52 this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
53 53 List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
54 54 this.metadata.update(Cluster.bootstrap(addresses), 0);
55 55 String metricGrpPrefix = "consumer";
56 56 ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
57 57 NetworkClient netClient = new NetworkClient(
58 58 new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
59 59 this.metadata,
60 60 clientId,
61 61 100, // a fixed large enough value will suffice
62 62 config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
63 63 config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
64 64 config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
65 65 config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
66 66 this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
67 67 config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
68 68 OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
69 69 this.subscriptions = new SubscriptionState(offsetResetStrategy);
70 70 List<PartitionAssignor> assignors = config.getConfiguredInstances(
71 71 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
72 72 PartitionAssignor.class);
73 73 this.coordinator = new ConsumerCoordinator(this.client,
74 74 config.getString(ConsumerConfig.GROUP_ID_CONFIG),
75 75 config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
76 76 config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
77 77 config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
78 78 assignors,
79 79 this.metadata,
80 80 this.subscriptions,
81 81 metrics,
82 82 metricGrpPrefix,
83 83 this.time,
84 84 retryBackoffMs,
85 85 new ConsumerCoordinator.DefaultOffsetCommitCallback(),
86 86 config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
87 87 config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
88 88 this.interceptors,
89 89 config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
90 90 this.fetcher = new Fetcher<>(this.client,
91 91 config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
92 92 config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
93 93 config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
94 94 config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
95 95 config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
96 96 config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
97 97 this.keyDeserializer,
98 98 this.valueDeserializer,
99 99 this.metadata,
100100 this.subscriptions,
101101 metrics,
102102 metricGrpPrefix,
103103 this.time,
104104 this.retryBackoffMs);
105105
106106 config.logUnused();
107107 AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
108108
109109 log.debug("Kafka consumer created");
110110 } catch (Throwable t) {
111111 // call close methods if internal objects are already constructed
112112 // this is to prevent resource leak. see KAFKA-2121
113113 close(true);
114114 // now propagate the exception
115115 throw new KafkaException("Failed to construct kafka consumer", t);
116116 }
117117 }
118
从KafkaConsumer构造函数来看,核心组件有:
1.Metadata:封装集群元数据相关逻辑的类。它只保存部分主题的元数据,并可随时间推移不断增加。当请求某个主题的元数据、而本地尚无该主题的元数据时,会触发一次元数据更新。如果启用了主题过期,那么在一次更新之后,过期时间间隔内未被使用的主题都会从待刷新的主题集合中移除。
2.ConsumerNetworkClient:供高层消费者访问网络层的客户端,为基于Future的请求提供基础支持。该类是线程安全的,但不对响应回调做同步,这保证了调用回调时不会持有锁。
3.SubscriptionState:维护所订阅TopicPartition的offset状态。
4.ConsumerCoordinator:消费者的协调者,负责partition的分配和rebalance(再平衡)
5.Fetcher:从brokers上按照配置获取消息。
二、消费者容器启动流程
kafka消费者有两种常见的实现方式:
1.xml配置文件
2.基于注解实现
其实,不管哪种方式,本质只是生成Spring Bean的方式不同而已。我们就以xml的实现方式来追踪源码。
基于xml的总体配置如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<!-- 1. Consumer configuration properties -->
<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}" />
            <entry key="group.id" value="${group.id}" />
            <entry key="enable.auto.commit" value="${enable.auto.commit}" />
            <entry key="session.timeout.ms" value="${session.timeout.ms}" />
            <entry key="key.deserializer"
                   value="org.apache.kafka.common.serialization.StringDeserializer" />
            <entry key="value.deserializer"
                   value="org.apache.kafka.common.serialization.StringDeserializer" />
        </map>
    </constructor-arg>
</bean>

<!-- 2. Consumer factory built from the properties above -->
<bean id="consumerFactory"
      class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
    <constructor-arg>
        <ref bean="consumerProperties" />
    </constructor-arg>
</bean>

<!-- 3. The message listener implementation (user code) -->
<bean id="kafkaConsumerService" class="xxx.service.impl.KafkaConsumerSerivceImpl" />

<!-- 4. Container properties: the topics to listen to and the message listener -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <!-- topics -->
    <constructor-arg name="topics">
        <list>
            <value>${kafka.consumer.topic.credit.for.lease}</value>
            <value>${loan.application.feedback.topic}</value>
            <value>${templar.agreement.feedback.topic}</value>
            <value>${templar.aggrement.active.feedback.topic}</value>
            <value>${templar.aggrement.agreementRepaid.topic}</value>
            <value>${templar.aggrement.agreementWithhold.topic}</value>
            <value>${templar.aggrement.agreementRepayRemind.topic}</value>
        </list>
    </constructor-arg>
    <property name="messageListener" ref="kafkaConsumerService" />
</bean>

<!-- 5. Concurrent message listener container. It is initialized through start() rather than
     doStart(): start() takes the lifecycle lock and validates the message listener before
     delegating to doStart(), which is documented to run under that lock. -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="start" >
    <constructor-arg ref="consumerFactory" />
    <constructor-arg ref="containerProperties" />
    <property name="concurrency" value="${concurrency}" />
</bean>
50
分为5个步骤:
2.1.定义消费参数bean consumerProperties,本质就是一个Map<key,value>
2.2.创建consumerFactory bean
DefaultKafkaConsumerFactory实现了ConsumerFactory接口,该接口提供创建消费者和判断是否自动提交2个方法。DefaultKafkaConsumerFactory以consumerProperties作为构造参数。
1
2
3
4
5
6
7
8
9 11 public interface ConsumerFactory<K, V> {
22
33 Consumer<K, V> createConsumer();
44
55 boolean isAutoCommit();
66
77
88 }
9
2.3.定义消费实现类
自定义一个类实现MessageListener接口,接口设计如下:
实现onMessage方法,消费接收到的消息。有两种方案:
1)MessageListener:消费完消息后自动提交offset(enable.auto.commit=true时),效率较高,但存在消费失败却已移动偏移量的风险。
2)AcknowledgingMessageListener:消费完消息后手动提交offset(enable.auto.commit=false时),效率较低,但没有消费失败却已移动偏移量的风险。
2.4.监听容器配置信息
ContainerProperties:包含了一个监听容器的运行时配置信息,主要定义了监听的主题、分区、初始偏移量,以及消息监听器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 11 public class ContainerProperties {
2 2
3 3 private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
4 4
5 5 private static final int DEFAULT_QUEUE_DEPTH = 1;
6 6
7 7 private static final int DEFAULT_PAUSE_AFTER = 10000;
8 8
9 9 /**
10 10 * Topic names.监听的主题字符串数组
11 11 */
12 12 private final String[] topics;
13 13
14 14 /**
15 15 * Topic pattern.监听的主题模板
16 16 */
17 17 private final Pattern topicPattern;
18 18
19 19 /**
20 20 * Topics/partitions/initial offsets.
21 21 */
22 22 private final TopicPartitionInitialOffset[] topicPartitions;
23 23
24 24 /**
25 25 * 确认模式(自动确认属性为false时使用)
26 26 * <ul>
27 27 * <li>1.RECORD逐条确认: 每条消息被发送给监听者后确认</li>
28 28 * <li>2.BATCH批量确认: 当批量消息记录被消费者接收到并传送给监听器时确认</li>
29 30 * <li>3.TIME超时确认:当超过设置的超时时间毫秒数时确认(should be greater than
30 31 * {@code #setPollTimeout(long) pollTimeout}.</li>
31 32 * <li>4.COUNT计数确认: 当接收到指定数量之后确认</li>
32 33 * <li>5.MANUAL手动确认:由监听器负责确认(AcknowledgingMessageListener)</ul>
33 36 */
34 37 private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
35 38
36 39 /**
37 40 * The number of outstanding record count after which offsets should be
38 41 * committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being
39 42 * used.
40 43 */
41 44 private int ackCount;
42 45
43 46 /**
44 47 * The time (ms) after which outstanding offsets should be committed when
45 48 * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
46 49 * larger than
47 50 */
48 51 private long ackTime;
49 52
50 53 /**
51 54 * 消息监听器,必须是 MessageListener或者AcknowledgingMessageListener两者中的一个 55 * 56 */
52 57 private Object messageListener;
53 58
54 59 /**
55 60 * The max time to block in the consumer waiting for records.
56 61 */
57 62 private volatile long pollTimeout = 1000;
58 63
59 64 /**
60 65 * 线程执行器:轮询消费者
61 66 */
62 67 private AsyncListenableTaskExecutor consumerTaskExecutor;
63 68
64 69 /**
65 70 * 线程执行器:调用监听器
66 71 */
67 72 private AsyncListenableTaskExecutor listenerTaskExecutor;
68 73
69 74 /**
70 75 * 错误回调,当监听器抛出异常时
71 76 */
72 77 private GenericErrorHandler<?> errorHandler;
73 78
74 79 /**
75 80 * When using Kafka group management and {@link #setPauseEnabled(boolean)} is
76 81 * true, the delay after which the consumer should be paused. Default 10000.
77 82 */
78 83 private long pauseAfter = DEFAULT_PAUSE_AFTER;
79 84
80 85 /**
81 86 * When true, avoids rebalancing when this consumer is slow or throws a
82 87 * qualifying exception - pauses the consumer. Default: true.
83 88 * @see #pauseAfter
84 89 */
85 90 private boolean pauseEnabled = true;
86 91
87 92 /**
88 93 * Set the queue depth for handoffs from the consumer thread to the listener
89 94 * thread. Default 1 (up to 2 in process).
90 95 */
91 96 private int queueDepth = DEFAULT_QUEUE_DEPTH;
92 97
93 98 /**
94 99 * 停止容器超时时间 */
95103 private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
96104
97105 /**
98106 * 用户定义的消费者再平衡监听器实现类 */
99108 private ConsumerRebalanceListener consumerRebalanceListener;
100109
101110 /**
102111 * 提交回调,默认记录日志。 */
103114 private OffsetCommitCallback commitCallback;
104115
105116 /**
106117 * Whether or not to call consumer.commitSync() or commitAsync() when the
107118 * container is responsible for commits. Default true. See
108119 * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
109120 * writing, async commits are not entirely reliable.
110121 */
111122 private boolean syncCommits = true;
112123
113124 private boolean ackOnError = true;
114125
115126 private Long idleEventInterval;
116127
117128 publicContainerProperties(String... topics) {
118129 Assert.notEmpty(topics, "An array of topicPartitions must be provided");
119130 this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
120131 this.topicPattern = null;
121132 this.topicPartitions = null;
122133 }
123134
124135 public ContainerProperties(Pattern topicPattern) {
125136 this.topics = null;
126137 this.topicPattern = topicPattern;
127138 this.topicPartitions = null;
128139 }
129140
130141 public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
131142 this.topics = null;
132143 this.topicPattern = null;
133144 Assert.notEmpty(topicPartitions, "An array of topicPartitions must be provided");
134145 this.topicPartitions = new LinkedHashSet<>(Arrays.asList(topicPartitions))
135146 .toArray(new TopicPartitionInitialOffset[topicPartitions.length]);
136147 }
137148 ...省略各种set、get
138149
139150 }
140
2.5.启动并发消息监听容器
核心类ConcurrentMessageListenerContainer继承自抽象类AbstractMessageListenerContainer,类图如下:
由上图可知,AbstractMessageListenerContainer有2个实现类,分别对应单线程和多线程,建议采用多线程消费。下面主要分析ConcurrentMessageListenerContainer类,注意2个方法:
1.构造函数,入参:消费者工厂ConsumerFactory + 容器配置ContainerProperties
2.doStart():核心是调用KafkaMessageListenerContainer的start()方法。
源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 11 public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
2 2
3 3 private final ConsumerFactory<K, V> consumerFactory;
4 4
5 5 private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
6 6
7 7 private int concurrency = 1;
8 8
9 9 /**
10 10 * Construct an instance with the supplied configuration properties.
11 11 * The topic partitions are distributed evenly across the delegate
12 12 * {@link KafkaMessageListenerContainer}s.
13 13 * @param consumerFactory the consumer factory.
14 14 * @param containerProperties the container properties.
15 15 */
16 16 public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
17 17 ContainerProperties containerProperties) {
18 18 super(containerProperties);
19 19 Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
20 20 this.consumerFactory = consumerFactory;
21 21 }
22 22
23 23 public int getConcurrency() {
24 24 return this.concurrency;
25 25 }
26 26
27 27 /**
28 28 * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
29 29 * Messages from within the same partition will be processed sequentially.
30 30 * @param concurrency the concurrency.
31 31 */
32 32 public void setConcurrency(int concurrency) {
33 33 Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
34 34 this.concurrency = concurrency;
35 35 }
36 36
37 37 /**
38 38 * Return the list of {@link KafkaMessageListenerContainer}s created by
39 39 * this container.
40 40 * @return the list of {@link KafkaMessageListenerContainer}s created by
41 41 * this container.
42 42 */
43 43 public List<KafkaMessageListenerContainer<K, V>> getContainers() {
44 44 return Collections.unmodifiableList(this.containers);
45 45 }
46 46
47 47 /*
48 48 * Under lifecycle lock.
49 49 */
50 50 @Override
51 51 protected voiddoStart() {
52 52 if (!isRunning()) {
53 53 ContainerProperties containerProperties = getContainerProperties();
54 54 TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
55 55 if (topicPartitions != null//校验并发数>分区数,报错。
56 56 && this.concurrency > topicPartitions.length) {
57 57 this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
58 58 + "equal to the number of partitions; reduced from " + this.concurrency + " to "
59 59 + topicPartitions.length);
60 60 this.concurrency = topicPartitions.length;//并发数最大只能=分区数
61 61 }
62 62 setRunning(true);
63 63 //遍历创建监听器容器
64 64 for (int i = 0; i < this.concurrency; i++) {
65 65 KafkaMessageListenerContainer<K, V> container;
66 66 if (topicPartitions == null) {
67 67 container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
68 68 }
69 69 else {
70 70 container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
71 71 partitionSubset(containerProperties, i));
72 72 }
73 73 if (getBeanName() != null) {
74 74 container.setBeanName(getBeanName() + "-" + i);
75 75 }
76 76 if (getApplicationEventPublisher() != null) {
77 77 container.setApplicationEventPublisher(getApplicationEventPublisher());
78 78 }
79 79 container.setClientIdSuffix("-" + i);
80 80 container.start();//核心方法,启动容器
81 81 this.containers.add(container);
82 82 }
83 83 }
84 84 }146 ...省略
85147 }
86
继续追踪container.start(),它调用的是AbstractMessageListenerContainer的start()方法,再由start()调用doStart()。值得注意的是,start()和stop()方法加了同一把锁(lifecycleMonitor),用于保护生命周期状态的切换。
1
2
3
4
5
6
7
8
9
10
11
12
13
14 11 private final Object lifecycleMonitor = new Object();
2 2
3 3 @Override
4 4 public final void start() {
5 5 synchronized (this.lifecycleMonitor) {
6 6 Assert.isTrue(
7 7 this.containerProperties.getMessageListener() instanceof KafkaDataListener,
8 8 "A " + KafkaDataListener.class.getName() + " implementation must be provided");
9 9 doStart();
1010 }
1111 }
1212
1313 protected abstract void doStart();
14
最终调用的是KafkaMessageListenerContainer的doStart():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 11 @Override
2 2 protected voiddoStart() {
3 3 if (isRunning()) {
4 4 return;
5 5 }
6 6 ContainerProperties containerProperties = getContainerProperties();
7 7
8 8 if (!this.consumerFactory.isAutoCommit()) {
9 9 AckMode ackMode = containerProperties.getAckMode();
1010 if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
1111 Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
1212 }
1313 if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
1414 && containerProperties.getAckTime() == 0) {
1515 containerProperties.setAckTime(5000);
1616 }
1717 }
1818
1919 Object messageListener = containerProperties.getMessageListener();
2020 Assert.state(messageListener != null, "A MessageListener is required");
2121 if (messageListener instanceof GenericAcknowledgingMessageListener) {
2222 this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
2323 }
2424 else if (messageListener instanceof GenericMessageListener) {
2525 this.listener = (GenericMessageListener<?>) messageListener;
2626 }
2727 else {
2828 throw new IllegalStateException("messageListener must be 'MessageListener' "
2929 + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
3030 }
3131 if (containerProperties.getConsumerTaskExecutor() == null) {
3232 SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
3333 (getBeanName() == null ? "" : getBeanName()) + "-C-");
3434 containerProperties.setConsumerTaskExecutor(consumerExecutor);
3535 }
3636 if (containerProperties.getListenerTaskExecutor() == null) {
3737 SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
3838 (getBeanName() == null ? "" : getBeanName()) + "-L-");
3939 containerProperties.setListenerTaskExecutor(listenerExecutor);
4040 }//1.构建 监听消费者
4141 this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
4242 setRunning(true);
43//2.异步提交 监听消费者任务,返回Future并赋值。
4443 this.listenerConsumerFuture = containerProperties
4544 .getConsumerTaskExecutor()
4645 .submitListenable(this.listenerConsumer);
4746 }
48
doStart主要包含2个操作:构建内部类ListenerConsumer,以及提交监听消费者任务,并将返回的Future赋值给listenerConsumerFuture。
1.构建内部类ListenerConsumer
ListenerConsumer类图如下:
ListenerConsumer构造函数源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
 * Creates the consumer for this container, subscribes it (or assigns the explicitly
 * configured partitions), and resolves which of the four listener types is in use,
 * together with the matching record or batch error handler.
 *
 * @param listener    the non-acknowledging listener, or {@code null}
 * @param ackListener the acknowledging listener; used when {@code listener} is {@code null}
 * @throws IllegalArgumentException if the listener is none of the supported types
 */
@SuppressWarnings("unchecked")
ListenerConsumer(GenericMessageListener<?> listener, GenericAcknowledgingMessageListener<?> ackListener) {
    // Manual ack modes cannot be combined with Kafka auto-commit.
    Assert.state(!this.isAnyManualAck || !this.autoCommit,
            "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
    // Use the (deprecated) suffix-aware factory method when available so the container's
    // clientIdSuffix is applied to the new consumer.
    @SuppressWarnings("deprecation")
    final Consumer<K, V> consumer =
            KafkaMessageListenerContainer.this.consumerFactory instanceof
                    org.springframework.kafka.core.ClientIdSuffixAware
                    ? ((org.springframework.kafka.core.ClientIdSuffixAware<K, V>) KafkaMessageListenerContainer
                            .this.consumerFactory)
                            .createConsumer(KafkaMessageListenerContainer.this.clientIdSuffix)
                    : KafkaMessageListenerContainer.this.consumerFactory.createConsumer();

    this.theListener = listener == null ? ackListener : listener;
    ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);

    if (KafkaMessageListenerContainer.this.topicPartitions == null) {
        // Group-managed: subscribe by pattern or by topic names.
        if (this.containerProperties.getTopicPattern() != null) {
            consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
        }
        else {
            consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
        }
    }
    else {
        // Explicit partitions: remember their initial offsets and assign them directly.
        List<TopicPartitionInitialOffset> topicPartitions =
                Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
        this.definedPartitions = new HashMap<>(topicPartitions.size());
        for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
            this.definedPartitions.put(topicPartition.topicPartition(),
                    new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
        }
        consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
    }
    this.consumer = consumer;
    GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
    // NOTE(review): genericListener is null when only an acknowledging listener was supplied;
    // confirm that code checking genericListener (e.g. for ConsumerSeekAware) intends to skip that case.
    this.genericListener = listener;
    // Exactly one of the four listener fields is set, according to the listener's type.
    if (this.theListener instanceof BatchAcknowledgingMessageListener) {
        // 1) batch listener with acknowledgment
        this.listener = null;
        this.batchListener = null;
        this.acknowledgingMessageListener = null;
        this.batchAcknowledgingMessageListener = (BatchAcknowledgingMessageListener<K, V>) this.theListener;
        this.isBatchListener = true;
    }
    else if (this.theListener instanceof AcknowledgingMessageListener) {
        // 2) record listener with acknowledgment
        this.listener = null;
        this.acknowledgingMessageListener = (AcknowledgingMessageListener<K, V>) this.theListener;
        this.batchListener = null;
        this.batchAcknowledgingMessageListener = null;
        this.isBatchListener = false;
    }
    else if (this.theListener instanceof BatchMessageListener) {
        // 3) batch listener
        this.listener = null;
        this.batchListener = (BatchMessageListener<K, V>) this.theListener;
        this.acknowledgingMessageListener = null;
        this.batchAcknowledgingMessageListener = null;
        this.isBatchListener = true;
    }
    else if (this.theListener instanceof MessageListener) {
        // 4) plain record listener (one record per onMessage call)
        this.listener = (MessageListener<K, V>) this.theListener;
        this.batchListener = null;
        this.acknowledgingMessageListener = null;
        this.batchAcknowledgingMessageListener = null;
        this.isBatchListener = false;
    }
    else {
        throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
                + "'BatchMessageListener', 'AcknowledgingMessageListener', "
                + "'BatchAcknowledgingMessageListener', not " + this.theListener.getClass().getName());
    }
    // The user-supplied error handler is applied to the matching (batch or record) path;
    // the other path gets a logging default.
    if (this.isBatchListener) {
        validateErrorHandler(true);
        this.errorHandler = new LoggingErrorHandler();
        this.batchErrorHandler = errHandler == null ? new BatchLoggingErrorHandler()
                : (BatchErrorHandler) errHandler;
    }
    else {
        validateErrorHandler(false);
        this.errorHandler = errHandler == null ? new LoggingErrorHandler() : (ErrorHandler) errHandler;
        this.batchErrorHandler = new BatchLoggingErrorHandler();
    }
    Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener");
}
85
构造函数主要做了两件事:
1)让消费者订阅topic(或按topic正则订阅);如果指定了分区,则直接assign这些分区。
2)设置监听器,支持4种:
 ① BatchAcknowledgingMessageListener:批量、需确认的消息监听器
 ② AcknowledgingMessageListener:需确认的消息监听器
 ③ BatchMessageListener:批量消息监听器
 ④ MessageListener:消息监听器(用得最多,一次消费一条)
2.提交监听消费者任务(ListenerConsumer),返回Future并赋值。
这里我们看一下该任务(Runnable接口)的run方法,分两步:
1.如果容器正在运行且自定义了分区,则无需通过再平衡分配分区,直接初始化分区;若非自动提交,还会启动ListenerInvoker线程。
2.随后进入while循环,不断拉取并处理消息(无论是否指定分区,都会进入该循环)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 11 @Override
2 2 public void run() {
3 3 if (this.genericListener instanceof ConsumerSeekAware) {
4 4 ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
5 5 }
6 6 this.count = 0;
7 7 this.last = System.currentTimeMillis();
8 8 if (isRunning() && this.definedPartitions != null) {//1.如果运行中且自定义了分区,没必要再平衡分配分区了,直接回调
9 9 initPartitionsIfNeeded();// 有需要就初始化分区
1010 //回调
1113 if (!this.autoCommit) {
1214 startInvoker();
1315 }
1416 }
1517 long lastReceive = System.currentTimeMillis();
1618 long lastAlertAt = lastReceive;
1719 while (isRunning()) {//2.未指定分区,进入自旋消费
1820 try {
1921 if (!this.autoCommit) {
2022 processCommits();// 如果手动提交,处理提交
2123 }
2224 processSeeks();// 重新定位偏移量,下一次消费时使用
2325 if (this.logger.isTraceEnabled()) {
2426 this.logger.trace("Polling (paused=" + this.paused + ")...");
2527 }// 1)拉取消费记录
2628 ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
2729 if (records != null && this.logger.isDebugEnabled()) {
2830 this.logger.debug("Received: " + records.count() + " records");
2931 }
3032 if (records != null && records.count() > 0) {
3133 if (this.containerProperties.getIdleEventInterval() != null) {
3234 lastReceive = System.currentTimeMillis();
3335 }// 2)如果设置了自动提交,直接在当前线程执行
3439 if (this.autoCommit) {
3540 invokeListener(records);
3641 }
3742 else {// 3)否则发送消息进缓存队列
3843 if (sendToListener(records)) {
3944 if (this.assignedPartitions != null) {
4045 // avoid group management rebalance due to a slow
4146 // consumer
4247 this.consumer.pause(this.assignedPartitions);
4348 this.paused = true;
4449 this.unsent = records;
4550 }
4651 }
4752 }
4853 }
4954 else {
5055 if (this.containerProperties.getIdleEventInterval() != null) {
5156 long now = System.currentTimeMillis();
5257 if (now > lastReceive + this.containerProperties.getIdleEventInterval()
5358 && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
5459 publishIdleContainerEvent(now - lastReceive);
5560 lastAlertAt = now;
5661 if (this.genericListener instanceof ConsumerSeekAware) {
5762 seekPartitions(getAssignedPartitions(), true);
5863 }
5964 }
6065 }
6166 }
6267 this.unsent = checkPause(this.unsent);
6368 }
6469 catch (WakeupException e) {
6570 this.unsent = checkPause(this.unsent);
6671 }
6772 catch (Exception e) {
6873 if (this.containerProperties.getGenericErrorHandler() != null) {
6974 this.containerProperties.getGenericErrorHandler().handle(e, null);
7075 }
7176 else {
7277 this.logger.error("Container exception", e);
7378 }
7479 }
7580 }
7681 if (this.listenerInvokerFuture != null) {
7782 stopInvoker();
7883 commitManualAcks();
7984 }
8085 try {
8186 this.consumer.unsubscribe();
8287 }
8388 catch (WakeupException e) {
8489 // No-op. Continue process
8590 }
8691 this.consumer.close();
8792 if (this.logger.isInfoEnabled()) {
8893 this.logger.info("Consumer stopped");
8994 }
9095 }
91
1.如果用户自定义了分区且非自动提交,那么开启异步线程执行ListenerInvoker任务,源码如下:
1
2
3
4
5
6 11 private voidstartInvoker() {
22 ListenerConsumer.this.invoker = new ListenerInvoker();
33 ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
44 .submit(ListenerConsumer.this.invoker);
55 }
6
执行ListenerInvoker的run方法:只要active为true,就循环从阻塞队列recordsToProcess中取出待消费记录并调用监听器消费。CountDownLatch初始化为1,只在run方法退出时(finally块中)减为0,用于标记该线程已经退出,而不是表示run方法只执行一遍。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 11 private final class ListenerInvoker implements SchedulingAwareRunnable {
2 2
3 3 private final CountDownLatch exitLatch = new CountDownLatch(1);
4 4
5 5 private volatile boolean active = true;
6 6
7 7 private volatile Thread executingThread;
8 8
9 9 ListenerInvoker() {
1010 super();
1111 }
1212
1313 @Override
1414 public voidrun() {
1515 Assert.isTrue(this.active, "This instance is not active anymore");
1616 if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
1717 ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
1818 }
1919 try {
2020 this.executingThread = Thread.currentThread();
2121 while (this.active) {
2222 try {// 从阻塞队列LinkedBlockingQueue recordsToProcess中拉取 待消费记录
2323 ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
2424 TimeUnit.SECONDS);
2525 if (this.active) {
2626 if (records != null) {
2727 invokeListener(records);// 消费
2828 }
2929 else {
3030 if (ListenerConsumer.this.logger.isTraceEnabled()) {
3131 ListenerConsumer.this.logger.trace("No records to process");
3232 }
3333 }
3434 }
3535 }
3636 catch (InterruptedException e) {
3737 if (!this.active) {
3838 Thread.currentThread().interrupt();
3939 }
4040 else {
4141 ListenerConsumer.this.logger.debug("Interrupt ignored");
4242 }
4343 }
4444 }
4545 }
4646 finally {
4747 this.active = false;
4848 this.exitLatch.countDown();
4949 }
5050 }
5151
5252 @Override
5353 public boolean isLongLived() {
5454 return true;
5555 }
56581 }
57
1
2
3
4
5
6
7
8
9 11 private void invokeListener(final ConsumerRecords<K, V> records) {
22 if (this.isBatchListener) {
33 invokeBatchListener(records);
44 }
55 else {
66 invokeRecordListener(records);
77 }
88 }
9
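如上所示,ListenerInvoker从阻塞队列中取得待消费记录后调用invokeListener。下面的invokeRecordListener用迭代器iterator逐条消费,并根据监听器类型(AcknowledgingMessageListener或普通MessageListener)调用对应listener的onMessage方法,也就是用户自定义的MessageListener接口的onMessage方法,用户在其中实现自己的消费业务逻辑。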
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 11 private void invokeRecordListener(final ConsumerRecords<K, V> records) {
2 2 Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
3 3 while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) {
4 4 final ConsumerRecord<K, V> record = iterator.next();
5 5 if (this.logger.isTraceEnabled()) {
6 6 this.logger.trace("Processing " + record);
7 7 }
8 8 try {
9 9 if (this.acknowledgingMessageListener != null) {
1010 this.acknowledgingMessageListener.onMessage(record,//终极核心方法,用户自定义的MessageListener接口的onMessage方法
1111 this.isAnyManualAck
1212 ? new ConsumerAcknowledgment(record, this.isManualImmediateAck)
1313 : null);
1414 }
1515 else {
1616 this.listener.onMessage(record);//终极核心方法,用户自定义的MessageListener接口的onMessage方法
1717 }
1818 if (!this.isAnyManualAck && !this.autoCommit) {
1919 this.acks.add(record);
2020 }
2121 }
2222 catch (Exception e) {
2323 if (this.containerProperties.isAckOnError() && !this.autoCommit) {
2424 this.acks.add(record);
2525 }
2626 try {
2727 this.errorHandler.handle(e, record);
2828 }
2929 catch (Exception ee) {
3030 this.logger.error("Error handler threw an exception", ee);
3131 }
3232 catch (Error er) { //NOSONAR
3333 this.logger.error("Error handler threw an error", er);
3434 throw er;
3535 }
3636 }
3737 }
3838 }
39
2.未指定分区,进入自旋
1
2
3
4 1// 1)拉取消费记录
2ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
3
4
2)如果设置了自动提交,直接在当前线程执行invokeListener(records);
// 3) Otherwise, put the records into the buffering queue for the ListenerInvoker thread
sendToListener(records);
1)关于poll:在每次轮询中,消费者会以最后消费的偏移量作为起始偏移量,按顺序拉取消息。最后消费的偏移量可以通过 seek(TopicPartition, long) 手动设置,也可以自动设置为所订阅分区列表最后一次提交的偏移量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 11 @Override
2 2 public ConsumerRecords<K, V> poll(long timeout) {
3 3 acquire();
4 4 try {
5 5 if (timeout < 0)
6 6 throw new IllegalArgumentException("Timeout must not be negative");
7 7
8 8 if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
9 9 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
1010
1111 // poll for new data until the timeout expires
1212 long start = time.milliseconds();
1313 long remaining = timeout;
1414 do {
1515 Map<TopicPartition, List<ConsumerRecord<K, V>>> records =pollOnce(remaining);
1616 if (!records.isEmpty()) {
1723 fetcher.sendFetches();// 在返回所获取的记录之前,我们可以发送下一轮的fetches并避免阻塞等待它们的响应,以便在用户处理获取的记录时进行流水线操作。
1824 client.pollNoWakeup();//由于已经更新了所使用的位置,所以我们不允许在返回所获取的记录之前触发wakeups或任何其他错误。
1925
2026 if (this.interceptors == null)
2127 return new ConsumerRecords<>(records);
2228 else// 如果存在消费者拦截器执行拦截
2329 return this.interceptors.onConsume(new ConsumerRecords<>(records));
2430 }
2531
2632 long elapsed = time.milliseconds() - start;
2733 remaining = timeout - elapsed;
2834 } while (remaining > 0);
2935
3036 return ConsumerRecords.empty();
3137 } finally {
3238 release();
3339 }
3440 }
35
pollOnce:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 11 private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
2 2 coordinator.poll(time.milliseconds());
3 3
4 4 // 遍历所有的TopicPartition,如果有未知偏移量(分区的),那么更新。涉及coordinator刷新已提交分区偏移量+fetcher更新获取位置
5 6 if (!subscriptions.hasAllFetchPositions())
6 7 updateFetchPositions(this.subscriptions.missingFetchPositions());
7 8
8 9 // 返回已获取到的记录
910 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
1011 if (!records.isEmpty())
1112 returnrecords;
1213
1314 // 发送fetch请求
1415 fetcher.sendFetches();
1516
1617 long now = time.milliseconds();
1718 long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
1819 // 执行IO,拉取数据
1920 client.poll(pollTimeout, now, new PollCondition() {
2021 @Override
2122 public boolean shouldBlock() {
2223 // since a fetch might be completed by the background thread, we need this poll condition
2324 // to ensure that we do not block unnecessarily in poll()
2425 return !fetcher.hasCompletedFetches();
2526 }
2627 });
2731 if (coordinator.needRejoin())
2832 return Collections.emptyMap();
2933
3034 return fetcher.fetchedRecords();
3135 }
32
再往下就涉及到通信IO层了,这里不再展开。等将来补全kafka通信协议相关的文章后,再在这里补上链接。
2)invokeListener和分支1一样,最终调用的是用户自定义的MessageListener接口的onMessage方法,这里不再重复。
3)sendToListener:这里把记录塞进缓存队列LinkedBlockingQueue<ConsumerRecords<K, V>> recordsToProcess。塞进队列后何时再消费?答案是ListenerInvoker的run方法会执行recordsToProcess.poll取出记录进行消费,