kafka原理和实践(四)spring-kafka消费者源码

释放双眼,带上耳机,听听看~!

系列目录

kafka原理和实践(一)原理:10分钟入门

kafka原理和实践(二)spring-kafka简单实践

kafka原理和实践(三)spring-kafka生产者源码

kafka原理和实践(四)spring-kafka消费者源码

kafka原理和实践(五)spring-kafka配置详解

kafka原理和实践(六)总结升华

 

 

==============正文分割线=====================

一、kafkaConsumer消费者模型

kafka原理和实践(四)spring-kafka消费者源码

如上图所示,spring-kafka消费者模型主要流程:

1.容器启动,轮询执行消费。

2.kafkaConsumer拉取消息流程:

1)Fetcher请求获取器获取请求并存储在unset中

2)ConsumerNetworkClient网络客户端执行poll(),调用NetWlrikClient的send()方法从unset中获取ClientRequest请求转成RequestSend最终塞进Selector的KafkaChannel通道中,Seletcor.send()从kafka集群拉取待消费数据ConsumerRecords

  1. 消费者监听器MessageListener.onMessage()执行用户自定义的实际消费业务逻辑。

一、kafkaConsumer构造


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
11 @SuppressWarnings("unchecked")
2  2     privateKafkaConsumer(ConsumerConfig config,
3  3                           Deserializer<K> keyDeserializer,
4  4                           Deserializer<V> valueDeserializer) {
5  5         try {
6  6             log.debug("Starting the Kafka consumer");
7  7             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
8  8             int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
9  9             int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
10 10             if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
11 11                 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
12 12             this.time = new SystemTime();
13 13
14 14             String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
15 15             if (clientId.length() <= 0)
16 16                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
17 17             this.clientId = clientId;
18 18             Map<String, String> metricsTags = new LinkedHashMap<>();
19 19             metricsTags.put("client-id", clientId);
20 20             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
21 21                     .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
22 22                     .tags(metricsTags);
23 23             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
24 24                     MetricsReporter.class);
25 25             reporters.add(new JmxReporter(JMX_PREFIX));
26 26             this.metrics = new Metrics(metricConfig, reporters, time);
27 27             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
28 28
29 29             // load interceptors and make sure they get clientId
30 30             Map<String, Object> userProvidedConfigs = config.originals();
31 31             userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
32 32             List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
33 33                     ConsumerInterceptor.class);
34 34             this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
35 35             if (keyDeserializer == null) {
36 36                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
37 37                         Deserializer.class);
38 38                 this.keyDeserializer.configure(config.originals(), true);
39 39             } else {
40 40                 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
41 41                 this.keyDeserializer = keyDeserializer;
42 42             }
43 43             if (valueDeserializer == null) {
44 44                 this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
45 45                         Deserializer.class);
46 46                 this.valueDeserializer.configure(config.originals(), false);
47 47             } else {
48 48                 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
49 49                 this.valueDeserializer = valueDeserializer;
50 50             }
51 51             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
52 52             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
53 53             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
54 54             this.metadata.update(Cluster.bootstrap(addresses), 0);
55 55             String metricGrpPrefix = "consumer";
56 56             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
57 57             NetworkClient netClient = new NetworkClient(
58 58                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
59 59                     this.metadata,
60 60                     clientId,
61 61                     100, // a fixed large enough value will suffice
62 62                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
63 63                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
64 64                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
65 65                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
66 66             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
67 67                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
68 68             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
69 69             this.subscriptions = new SubscriptionState(offsetResetStrategy);
70 70             List<PartitionAssignor> assignors = config.getConfiguredInstances(
71 71                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
72 72                     PartitionAssignor.class);
73 73             this.coordinator = new ConsumerCoordinator(this.client,
74 74                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
75 75                     config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
76 76                     config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
77 77                     config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
78 78                     assignors,
79 79                     this.metadata,
80 80                     this.subscriptions,
81 81                     metrics,
82 82                     metricGrpPrefix,
83 83                     this.time,
84 84                     retryBackoffMs,
85 85                     new ConsumerCoordinator.DefaultOffsetCommitCallback(),
86 86                     config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
87 87                     config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
88 88                     this.interceptors,
89 89                     config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
90 90             this.fetcher = new Fetcher<>(this.client,
91 91                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
92 92                     config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
93 93                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
94 94                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
95 95                     config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
96 96                     config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
97 97                     this.keyDeserializer,
98 98                     this.valueDeserializer,
99 99                     this.metadata,
100100                     this.subscriptions,
101101                     metrics,
102102                     metricGrpPrefix,
103103                     this.time,
104104                     this.retryBackoffMs);
105105
106106             config.logUnused();
107107             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
108108
109109             log.debug("Kafka consumer created");
110110         } catch (Throwable t) {
111111             // call close methods if internal objects are already constructed
112112             // this is to prevent resource leak. see KAFKA-2121
113113             close(true);
114114             // now propagate the exception
115115             throw new KafkaException("Failed to construct kafka consumer", t);
116116         }
117117     }
118

 从
KafkaConsumer构造函数来看,核心组件有:

1.Metadata:封装了元数据的一些逻辑的类。元数据仅保留一个主题的子集,随着时间的推移可以添加。当我们请求一个主题的元数据时,我们没有任何元数据会触发元数据更新。如果对元数据启用了主题过期,那么在更新之后,在过期时间间隔内未使用的任何主题都将从元数据刷新集中删除。

2**.**
ConsumerNetworkClient:高等级消费者访问网络层,为请求Future任务
提供基本支持。这个类是线程安全的,但是不提供响应回调的同步。这保证在调用它们时不会持有锁。

SubscriptionState:订阅的TopicPartition的offset状态维护

4.ConsumerCoordinator:消费者的协调者,负责partitiion的分配,reblance

5.Fetcher:从brokers上按照配置获取消息。

二、消费者容器启动流程

kafka消费者有两种常见的实现方式:

1.xml配置文件

2.基于注解实现

其实,不管哪种方式,本质只是生成Spring Bean的方式不同而已。我们就以xml的实现方式来追踪源码。

基于xml的总体配置如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
11 <!-- 1.定义consumer的参数 -->
2 2     <bean id="consumerProperties" class="java.util.HashMap">
3 3         <constructor-arg>
4 4             <map>
5 5                 <entry key="bootstrap.servers" value="${bootstrap.servers}" />
6 6                 <entry key="group.id" value="${group.id}" />
7 7                 <entry key="enable.auto.commit" value="${enable.auto.commit}" />
8 8                 <entry key="session.timeout.ms" value="${session.timeout.ms}" />
9 9                 <entry key="key.deserializer"
1010                     value="org.apache.kafka.common.serialization.StringDeserializer" />
1111                 <entry key="value.deserializer"
1212                     value="org.apache.kafka.common.serialization.StringDeserializer" />
1313             </map>
1414         </constructor-arg>
1515     </bean>
1616
1717     <!-- 2.创建consumerFactory bean -->
1818     <bean id="consumerFactory"
1919         class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
2020         <constructor-arg>
2121             <ref bean="consumerProperties" />
2222         </constructor-arg>
2323     </bean>
2424
2525     <!-- 3.定义消费实现类 -->
2626     <bean id="kafkaConsumerService" class="xxx.service.impl.KafkaConsumerSerivceImpl" />
2727
2828     <!-- 4.消费者容器配置信息 -->
2929     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
3030         <!-- topic -->
3131         <constructor-arg name="topics">
3232             <list>
3333                 <value>${kafka.consumer.topic.credit.for.lease}</value>
3434                 <value>${loan.application.feedback.topic}</value>
3535                 <value>${templar.agreement.feedback.topic}</value>
3636                 <value>${templar.aggrement.active.feedback.topic}</value>
3737                 <value>${templar.aggrement.agreementRepaid.topic}</value>
3838                 <value>${templar.aggrement.agreementWithhold.topic}</value>
3939                 <value>${templar.aggrement.agreementRepayRemind.topic}</value>
4040             </list>
4141         </constructor-arg>
4242         <property name="messageListener" ref="kafkaConsumerService" />
4343     </bean>
4444     <!-- 5.消费者并发消息监听容器,执行doStart()方法 -->
4545     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
4646         <constructor-arg ref="consumerFactory" />
4747         <constructor-arg ref="containerProperties" />
4848         <property name="concurrency" value="${concurrency}" />
4949     </bean>
50

 分为5个步骤:

 2.1.定义消费参数bean

consumerProperties ,就是个map<key,value>

2.2.创建consumerFactory bean


1
2
1DefaultKafkaConsumerFactory 实现了ConsumerFactory接口,提供创建消费者和判断是否自动提交2个方法。通过consumerProperties作为参数构造。
2

1
2
3
4
5
6
7
8
9
11 public interface ConsumerFactory&lt;K, V&gt; {
22
33     Consumer&lt;K, V&gt; createConsumer();
44
55     boolean isAutoCommit();
66
77
88 }
9

2.3.定义消费实现类

自定义一个类实现MessageListener接口,接口设计如下:

kafka原理和实践(四)spring-kafka消费者源码

实现
onMessage方法,去消费接收到的消息。两种方案:

1)
MessageListener 消费完消息后
自动提交offset
(enable.auto.commit=true时),可提高效率,存在消费失败但移动了偏移量的风险。

2)
AcknowledgingMessageListener
消费完消息后手动提交offset(enable.auto.commit=false时)效率降低,无消费失败但移动偏移量的风险。

2.4.监听容器配置信息


1
2
1ContainerProperties:包含了一个监听容器的运行时配置信息,主要定义了监听的主题、分区、初始化偏移量,还有消息监听器。
2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
11 public class ContainerProperties {
2  2
3  3     private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
4  4
5  5     private static final int DEFAULT_QUEUE_DEPTH = 1;
6  6
7  7     private static final int DEFAULT_PAUSE_AFTER = 10000;
8  8
9  9     /**
10 10      * Topic names.监听的主题字符串数组
11 11      */
12 12     private final String[] topics;
13 13
14 14     /**
15 15      * Topic pattern.监听的主题模板
16 16      */
17 17     private final Pattern topicPattern;
18 18
19 19     /**
20 20      * Topics/partitions/initial offsets.
21 21      */
22 22     private final TopicPartitionInitialOffset[] topicPartitions;
23 23
24 24     /**
25 25      * 确认模式(自动确认属性为false时使用)
26 26      * &lt;ul&gt;
27 27      * &lt;li&gt;1.RECORD逐条确认: 每条消息被发送给监听者后确认&lt;/li&gt;
28 28      * &lt;li&gt;2.BATCH批量确认: 当批量消息记录被消费者接收到并传送给监听器时确认&lt;/li&gt;
29 30      * &lt;li&gt;3.TIME超时确认:当超过设置的超时时间毫秒数时确认(should be greater than
30 31      * {@code #setPollTimeout(long) pollTimeout}.&lt;/li&gt;
31 32      * &lt;li&gt;4.COUNT计数确认: 当接收到指定数量之后确认&lt;/li&gt;
32 33      * &lt;li&gt;5.MANUAL手动确认:由监听器负责确认(AcknowledgingMessageListener)&lt;/ul&gt;
33 36      */
34 37     private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
35 38
36 39     /**
37 40      * The number of outstanding record count after which offsets should be
38 41      * committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being
39 42      * used.
40 43      */
41 44     private int ackCount;
42 45
43 46     /**
44 47      * The time (ms) after which outstanding offsets should be committed when
45 48      * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
46 49      * larger than
47 50      */
48 51     private long ackTime;
49 52
50 53     /**
51 54      * 消息监听器,必须是 MessageListener或者AcknowledgingMessageListener两者中的一个 55      * 56      */
52 57     private Object messageListener;
53 58
54 59     /**
55 60      * The max time to block in the consumer waiting for records.
56 61      */
57 62     private volatile long pollTimeout = 1000;
58 63
59 64     /**
60 65      * 线程执行器:轮询消费者
61 66      */
62 67     private AsyncListenableTaskExecutor consumerTaskExecutor;
63 68
64 69     /**
65 70      * 线程执行器:调用监听器
66 71      */
67 72     private AsyncListenableTaskExecutor listenerTaskExecutor;
68 73
69 74     /**
70 75      * 错误回调,当监听器抛出异常时
71 76      */
72 77     private GenericErrorHandler&lt;?&gt; errorHandler;
73 78
74 79     /**
75 80      * When using Kafka group management and {@link #setPauseEnabled(boolean)} is
76 81      * true, the delay after which the consumer should be paused. Default 10000.
77 82      */
78 83     private long pauseAfter = DEFAULT_PAUSE_AFTER;
79 84
80 85     /**
81 86      * When true, avoids rebalancing when this consumer is slow or throws a
82 87      * qualifying exception - pauses the consumer. Default: true.
83 88      * @see #pauseAfter
84 89      */
85 90     private boolean pauseEnabled = true;
86 91
87 92     /**
88 93      * Set the queue depth for handoffs from the consumer thread to the listener
89 94      * thread. Default 1 (up to 2 in process).
90 95      */
91 96     private int queueDepth = DEFAULT_QUEUE_DEPTH;
92 97
93 98     /**
94 99      * 停止容器超时时间    */
95103     private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
96104
97105     /**
98106      * 用户定义的消费者再平衡监听器实现类     */
99108     private ConsumerRebalanceListener consumerRebalanceListener;
100109
101110     /**
102111      * 提交回调,默认记录日志。      */
103114     private OffsetCommitCallback commitCallback;
104115
105116     /**
106117      * Whether or not to call consumer.commitSync() or commitAsync() when the
107118      * container is responsible for commits. Default true. See
108119      * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
109120      * writing, async commits are not entirely reliable.
110121      */
111122     private boolean syncCommits = true;
112123
113124     private boolean ackOnError = true;
114125
115126     private Long idleEventInterval;
116127
117128     publicContainerProperties(String... topics) {
118129         Assert.notEmpty(topics, &quot;An array of topicPartitions must be provided&quot;);
119130         this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
120131         this.topicPattern = null;
121132         this.topicPartitions = null;
122133     }
123134
124135     public ContainerProperties(Pattern topicPattern) {
125136         this.topics = null;
126137         this.topicPattern = topicPattern;
127138         this.topicPartitions = null;
128139     }
129140
130141     public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
131142         this.topics = null;
132143         this.topicPattern = null;
133144         Assert.notEmpty(topicPartitions, &quot;An array of topicPartitions must be provided&quot;);
134145         this.topicPartitions = new LinkedHashSet&lt;&gt;(Arrays.asList(topicPartitions))
135146                 .toArray(new TopicPartitionInitialOffset[topicPartitions.length]);
136147     }
137148 ...省略各种set、get
138149
139150 }
140

 

2.5.启动并发消息监听容器

核心类
ConcurrentMessageListenerContainer,继承自抽象类
AbstractMessageListenerContainer,类图如下:

kafka原理和实践(四)spring-kafka消费者源码

 

看上图可知AbstractMessageListenerContainer有2个实现类分别对应单线程和多线程,建议采用多线程消费。下面分析一下主要ConcurrentMessageListenerContainer类,注意2个方法:

1.构造函数,入参:消费者工厂ConsumerFactory+容器配置
ContainerProperties

doStart():核心方法KafkaMessageListenerContainer的
start()方法。
源码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
11 public class ConcurrentMessageListenerContainer&lt;K, V&gt; extends AbstractMessageListenerContainer&lt;K, V&gt; {
2  2
3  3     private final ConsumerFactory&lt;K, V&gt; consumerFactory;
4  4
5  5     private final List&lt;KafkaMessageListenerContainer&lt;K, V&gt;&gt; containers = new ArrayList&lt;&gt;();
6  6
7  7     private int concurrency = 1;
8  8
9  9     /**
10 10      * Construct an instance with the supplied configuration properties.
11 11      * The topic partitions are distributed evenly across the delegate
12 12      * {@link KafkaMessageListenerContainer}s.
13 13      * @param consumerFactory the consumer factory.
14 14      * @param containerProperties the container properties.
15 15      */
16 16     public ConcurrentMessageListenerContainer(ConsumerFactory&lt;K, V&gt; consumerFactory,
17 17             ContainerProperties containerProperties) {
18 18         super(containerProperties);
19 19         Assert.notNull(consumerFactory, &quot;A ConsumerFactory must be provided&quot;);
20 20         this.consumerFactory = consumerFactory;
21 21     }
22 22
23 23     public int getConcurrency() {
24 24         return this.concurrency;
25 25     }
26 26
27 27     /**
28 28      * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
29 29      * Messages from within the same partition will be processed sequentially.
30 30      * @param concurrency the concurrency.
31 31      */
32 32     public void setConcurrency(int concurrency) {
33 33         Assert.isTrue(concurrency &gt; 0, &quot;concurrency must be greater than 0&quot;);
34 34         this.concurrency = concurrency;
35 35     }
36 36
37 37     /**
38 38      * Return the list of {@link KafkaMessageListenerContainer}s created by
39 39      * this container.
40 40      * @return the list of {@link KafkaMessageListenerContainer}s created by
41 41      * this container.
42 42      */
43 43     public List&lt;KafkaMessageListenerContainer&lt;K, V&gt;&gt; getContainers() {
44 44         return Collections.unmodifiableList(this.containers);
45 45     }
46 46
47 47     /*
48 48      * Under lifecycle lock.
49 49      */
50 50     @Override
51 51     protected voiddoStart() {
52 52         if (!isRunning()) {
53 53             ContainerProperties containerProperties = getContainerProperties();
54 54             TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
55 55             if (topicPartitions != null//校验并发数&gt;分区数,报错。
56 56                     &amp;&amp; this.concurrency &gt; topicPartitions.length) {
57 57                 this.logger.warn(&quot;When specific partitions are provided, the concurrency must be less than or &quot;
58 58                         + &quot;equal to the number of partitions; reduced from &quot; + this.concurrency + &quot; to &quot;
59 59                         + topicPartitions.length);
60 60                 this.concurrency = topicPartitions.length;//并发数最大只能=分区数
61 61             }
62 62             setRunning(true);
63 63             //遍历创建监听器容器
64 64             for (int i = 0; i &lt; this.concurrency; i++) {
65 65                 KafkaMessageListenerContainer&lt;K, V&gt; container;
66 66                 if (topicPartitions == null) {
67 67                     container = new KafkaMessageListenerContainer&lt;&gt;(this.consumerFactory, containerProperties);
68 68                 }
69 69                 else {
70 70                     container = new KafkaMessageListenerContainer&lt;&gt;(this.consumerFactory, containerProperties,
71 71                             partitionSubset(containerProperties, i));
72 72                 }
73 73                 if (getBeanName() != null) {
74 74                     container.setBeanName(getBeanName() + &quot;-&quot; + i);
75 75                 }
76 76                 if (getApplicationEventPublisher() != null) {
77 77                     container.setApplicationEventPublisher(getApplicationEventPublisher());
78 78                 }
79 79                 container.setClientIdSuffix(&quot;-&quot; + i);
80 80                 container.start();//核心方法,启动容器
81 81                 this.containers.add(container);
82 82             }
83 83         }
84 84     }146 ...省略
85147 }
86

 继续追踪
,调用AbstractMessageListenerContainer的
doStart(),值得注意的是start()和stop方法加了同一把锁,用于锁住生命周期。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
11 private final Object lifecycleMonitor = new Object();
2 2
3 3 @Override
4 4     public final void start() {
5 5         synchronized (this.lifecycleMonitor) {
6 6             Assert.isTrue(
7 7                     this.containerProperties.getMessageListener() instanceof KafkaDataListener,
8 8                     &quot;A &quot; + KafkaDataListener.class.getName() + &quot; implementation must be provided&quot;);
9 9 doStart();
1010         }
1111     }
1212
1313     protected abstract void doStart();
14

最终调用的是KafkaMessageListenerContainer的
doStart()


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
11 @Override
2 2     protected voiddoStart() {
3 3         if (isRunning()) {
4 4             return;
5 5         }
6 6         ContainerProperties containerProperties = getContainerProperties();
7 7
8 8         if (!this.consumerFactory.isAutoCommit()) {
9 9             AckMode ackMode = containerProperties.getAckMode();
1010             if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
1111                 Assert.state(containerProperties.getAckCount() &gt; 0, &quot;&#x27;ackCount&#x27; must be &gt; 0&quot;);
1212             }
1313             if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
1414                     &amp;&amp; containerProperties.getAckTime() == 0) {
1515                 containerProperties.setAckTime(5000);
1616             }
1717         }
1818
1919         Object messageListener = containerProperties.getMessageListener();
2020         Assert.state(messageListener != null, &quot;A MessageListener is required&quot;);
2121         if (messageListener instanceof GenericAcknowledgingMessageListener) {
2222             this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener&lt;?&gt;) messageListener;
2323         }
2424         else if (messageListener instanceof GenericMessageListener) {
2525             this.listener = (GenericMessageListener&lt;?&gt;) messageListener;
2626         }
2727         else {
2828             throw new IllegalStateException(&quot;messageListener must be &#x27;MessageListener&#x27; &quot;
2929                     + &quot;or &#x27;AcknowledgingMessageListener&#x27;, not &quot; + messageListener.getClass().getName());
3030         }
3131         if (containerProperties.getConsumerTaskExecutor() == null) {
3232             SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
3333                     (getBeanName() == null ? &quot;&quot; : getBeanName()) + &quot;-C-&quot;);
3434             containerProperties.setConsumerTaskExecutor(consumerExecutor);
3535         }
3636         if (containerProperties.getListenerTaskExecutor() == null) {
3737             SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
3838                     (getBeanName() == null ? &quot;&quot; : getBeanName()) + &quot;-L-&quot;);
3939             containerProperties.setListenerTaskExecutor(listenerExecutor);
4040         }//1.构建 监听消费者
4141         this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
4242         setRunning(true);  
43//2.异步提交 监听消费者任务,返回Future并赋值。
4443         this.listenerConsumerFuture = containerProperties
4544                 .getConsumerTaskExecutor()
4645                 .submitListenable(this.listenerConsumer);
4746     }
48

 

doStart主要包含2个操作:构建内部类
ListenerConsumer

提交 监听消费者任务,返回Future并赋值。

1.构建内部类

ListenerConsumer

ListenerConsumer类图如下:

kafka原理和实践(四)spring-kafka消费者源码

ListenerConsumer构造函数源码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
11 @SuppressWarnings(&quot;unchecked&quot;)
2 2         ListenerConsumer(GenericMessageListener&lt;?&gt; listener, GenericAcknowledgingMessageListener&lt;?&gt; ackListener) {
3 3             Assert.state(!this.isAnyManualAck || !this.autoCommit,
4 4                     &quot;Consumer cannot be configured for auto commit for ackMode &quot; + this.containerProperties.getAckMode());
5 5             @SuppressWarnings(&quot;deprecation&quot;)
6 6             final Consumer&lt;K, V&gt; consumer =
7 7                     KafkaMessageListenerContainer.this.consumerFactory instanceof
8 8                                     org.springframework.kafka.core.ClientIdSuffixAware
9 9                             ? ((org.springframework.kafka.core.ClientIdSuffixAware&lt;K, V&gt;) KafkaMessageListenerContainer
1010                                     .this.consumerFactory)
1111                                         .createConsumer(KafkaMessageListenerContainer.this.clientIdSuffix)
1212                             : KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
1313
1414             this.theListener = listener == null ? ackListener : listener;
1515             ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);
1616
1717             if (KafkaMessageListenerContainer.this.topicPartitions == null) {
1818                 if (this.containerProperties.getTopicPattern() != null) {
1919                     consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
2020                 }
2121                 else {
2222                     consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
2323                 }
2424             }
2525             else {
2626                 List&lt;TopicPartitionInitialOffset&gt; topicPartitions =
2727                         Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
2828                 this.definedPartitions = new HashMap&lt;&gt;(topicPartitions.size());
2929                 for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
3030                     this.definedPartitions.put(topicPartition.topicPartition(),
3131                             new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
3232                 }
3333                 consumer.assign(new ArrayList&lt;&gt;(this.definedPartitions.keySet()));
3434             }
3535             this.consumer = consumer;
3636             GenericErrorHandler&lt;?&gt; errHandler = this.containerProperties.getGenericErrorHandler();
3737             this.genericListener = listener;  
38               //1.  
39if (this.theListener instanceof BatchAcknowledgingMessageListener) {
4038                 this.listener = null;
4139                 this.batchListener = null;
4240                 this.acknowledgingMessageListener = null;
4341                 this.batchAcknowledgingMessageListener = (BatchAcknowledgingMessageListener&lt;K, V&gt;) this.theListener;
4442                 this.isBatchListener = true;
4543             }//2.
4644             else if (this.theListener instanceof AcknowledgingMessageListener) {
4745                 this.listener = null;
4846                 this.acknowledgingMessageListener = (AcknowledgingMessageListener&lt;K, V&gt;) this.theListener;
4947                 this.batchListener = null;
5048                 this.batchAcknowledgingMessageListener = null;
5149                 this.isBatchListener = false;
5250             }//3.
5351             else if (this.theListener instanceof BatchMessageListener) {
5452                 this.listener = null;
5553                 this.batchListener = (BatchMessageListener&lt;K, V&gt;) this.theListener;
5654                 this.acknowledgingMessageListener = null;
5755                 this.batchAcknowledgingMessageListener = null;
5856                 this.isBatchListener = true;
5957             }//4.
6058             else if (this.theListener instanceof MessageListener) {
6159                 this.listener = (MessageListener&lt;K, V&gt;) this.theListener;
6260                 this.batchListener = null;
6361                 this.acknowledgingMessageListener = null;
6462                 this.batchAcknowledgingMessageListener = null;
6563                 this.isBatchListener = false;
6664             }
6765             else {
6866                 throw new IllegalArgumentException(&quot;Listener must be one of &#x27;MessageListener&#x27;, &quot;
6967                         + &quot;&#x27;BatchMessageListener&#x27;, &#x27;AcknowledgingMessageListener&#x27;, &quot;
7068                         + &quot;&#x27;BatchAcknowledgingMessageListener&#x27;, not &quot; + this.theListener.getClass().getName());
7169             }
7270             if (this.isBatchListener) {
7371                 validateErrorHandler(true);
7472                 this.errorHandler = new LoggingErrorHandler();
7573                 this.batchErrorHandler = errHandler == null ? new BatchLoggingErrorHandler()
7674                         : (BatchErrorHandler) errHandler;
7775             }
7876             else {
7977                 validateErrorHandler(false);
8078                 this.errorHandler = errHandler == null ? new LoggingErrorHandler() : (ErrorHandler) errHandler;
8179                 this.batchErrorHandler = new BatchLoggingErrorHandler();
8280             }
8381             Assert.state(!this.isBatchListener || !this.isRecordAck, &quot;Cannot use AckMode.RECORD with a batch listener&quot;);
8482         }
85

1.定义消费者订阅topic或者指定分区

2.设置监听器,支持4种:

1)BatchAcknowledgingMessageListener批量需确认消息监听器

2)AcknowledgingMessageListener需确认消息监听器

3)BatchMessageListener批量消息监听器

4)
MessageListener消息监听器(用的最多,一次消费一条)

 

2.提交 监听消费者任务(ListenerConsumer),返回Future并赋值。

这里我们看一下任务Runnable接口的run方法,分两种情况

1.如果自定义了分区,没必要再平衡分配分区了,直接回调

2.未指定分区,进入自旋消费


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
11 @Override
2 2         public void run() {
3 3             if (this.genericListener instanceof ConsumerSeekAware) {
4 4                 ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
5 5             }
6 6             this.count = 0;
7 7             this.last = System.currentTimeMillis();
8 8             if (isRunning() &amp;&amp; this.definedPartitions != null) {//1.如果运行中且自定义了分区,没必要再平衡分配分区了,直接回调
9 9                 initPartitionsIfNeeded();// 有需要就初始化分区
1010                 //回调
1113                 if (!this.autoCommit) {
1214 startInvoker();
1315                 }
1416             }
1517             long lastReceive = System.currentTimeMillis();
1618             long lastAlertAt = lastReceive;
1719             while (isRunning()) {//2.未指定分区,进入自旋消费
1820                 try {
1921                     if (!this.autoCommit) {
2022                         processCommits();// 如果手动提交,处理提交
2123                     }
2224                     processSeeks();// 重新定位偏移量,下一次消费时使用
2325                     if (this.logger.isTraceEnabled()) {
2426                         this.logger.trace(&quot;Polling (paused=&quot; + this.paused + &quot;)...&quot;);
2527                     }// 1)拉取消费记录
2628                     ConsumerRecords&lt;K, V&gt; records = this.consumer.poll(this.containerProperties.getPollTimeout());
2729                     if (records != null &amp;&amp; this.logger.isDebugEnabled()) {
2830                         this.logger.debug(&quot;Received: &quot; + records.count() + &quot; records&quot;);
2931                     }
3032                     if (records != null &amp;&amp; records.count() &gt; 0) {
3133                         if (this.containerProperties.getIdleEventInterval() != null) {
3234                             lastReceive = System.currentTimeMillis();
3335                         }// 2)如果设置了自动提交,直接在当前线程执行
3439                         if (this.autoCommit) {
3540 invokeListener(records);
3641                         }
3742                         else {// 3)否则发送消息进缓存队列
3843                             if (sendToListener(records)) {
3944                                 if (this.assignedPartitions != null) {
4045                                     // avoid group management rebalance due to a slow
4146                                     // consumer
4247                                     this.consumer.pause(this.assignedPartitions);
4348                                     this.paused = true;
4449                                     this.unsent = records;
4550                                 }
4651                             }
4752                         }
4853                     }
4954                     else {
5055                         if (this.containerProperties.getIdleEventInterval() != null) {
5156                             long now = System.currentTimeMillis();
5257                             if (now &gt; lastReceive + this.containerProperties.getIdleEventInterval()
5358                                     &amp;&amp; now &gt; lastAlertAt + this.containerProperties.getIdleEventInterval()) {
5459                                 publishIdleContainerEvent(now - lastReceive);
5560                                 lastAlertAt = now;
5661                                 if (this.genericListener instanceof ConsumerSeekAware) {
5762                                     seekPartitions(getAssignedPartitions(), true);
5863                                 }
5964                             }
6065                         }
6166                     }
6267                     this.unsent = checkPause(this.unsent);
6368                 }
6469                 catch (WakeupException e) {
6570                     this.unsent = checkPause(this.unsent);
6671                 }
6772                 catch (Exception e) {
6873                     if (this.containerProperties.getGenericErrorHandler() != null) {
6974                         this.containerProperties.getGenericErrorHandler().handle(e, null);
7075                     }
7176                     else {
7277                         this.logger.error(&quot;Container exception&quot;, e);
7378                     }
7479                 }
7580             }
7681             if (this.listenerInvokerFuture != null) {
7782                 stopInvoker();
7883                 commitManualAcks();
7984             }
8085             try {
8186                 this.consumer.unsubscribe();
8287             }
8388             catch (WakeupException e) {
8489                 // No-op. Continue process
8590             }
8691             this.consumer.close();
8792             if (this.logger.isInfoEnabled()) {
8893                 this.logger.info(&quot;Consumer stopped&quot;);
8994             }
9095         }
91

1.如果用户自定义了分区且非自动提交,那么开启异步线程执行
ListenerInvoker任务,源码如下:


1
2
3
4
5
6
11 private voidstartInvoker() {
22             ListenerConsumer.this.invoker = new ListenerInvoker();
33             ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
44                     .submit(ListenerConsumer.this.invoker);
55         }
6

执行
ListenerInvoker的run方法,实际上就执行一遍,因为CountDownLatch初始化为1


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
11 private final class ListenerInvoker implements SchedulingAwareRunnable {
2 2
3 3             private final CountDownLatch exitLatch = new CountDownLatch(1);
4 4
5 5             private volatile boolean active = true;
6 6
7 7             private volatile Thread executingThread;
8 8
9 9             ListenerInvoker() {
1010                 super();
1111             }
1212
1313             @Override
1414             public voidrun() {
1515                 Assert.isTrue(this.active, &quot;This instance is not active anymore&quot;);
1616                 if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
1717                     ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
1818                 }
1919                 try {
2020                     this.executingThread = Thread.currentThread();
2121                     while (this.active) {
2222                         try {// 从阻塞队列LinkedBlockingQueue recordsToProcess中拉取 待消费记录
2323                             ConsumerRecords&lt;K, V&gt; records = ListenerConsumer.this.recordsToProcess.poll(1,
2424                                     TimeUnit.SECONDS);
2525                             if (this.active) {
2626                                 if (records != null) {
2727 invokeListener(records);// 消费
2828                                 }
2929                                 else {
3030                                     if (ListenerConsumer.this.logger.isTraceEnabled()) {
3131                                         ListenerConsumer.this.logger.trace(&quot;No records to process&quot;);
3232                                     }
3333                                 }
3434                             }
3535                         }
3636                         catch (InterruptedException e) {
3737                             if (!this.active) {
3838                                 Thread.currentThread().interrupt();
3939                             }
4040                             else {
4141                                 ListenerConsumer.this.logger.debug(&quot;Interrupt ignored&quot;);
4242                             }
4343                         }
4444                     }
4545                 }
4646                 finally {
4747                     this.active = false;
4848                     this.exitLatch.countDown();
4949                 }
5050             }
5151
5252             @Override
5353             public boolean isLongLived() {
5454                 return true;
5555             }
56581         }
57

1
2
3
4
5
6
7
8
9
11 private void invokeListener(final ConsumerRecords&lt;K, V&gt; records) {
22             if (this.isBatchListener) {
33                 invokeBatchListener(records);
44             }
55             else {
66 invokeRecordListener(records);
77             }
88         }
9

如上图,从阻塞队列中取得待消费记录,用迭代器iterator消费,根据自定义消费类型,用不同listener来执行onMessage方法(用户自定义
MessageListener接口的onMessage方法,实现用户自己的消费业务逻辑


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
11 private void invokeRecordListener(final ConsumerRecords&lt;K, V&gt; records) {
2 2             Iterator&lt;ConsumerRecord&lt;K, V&gt;&gt; iterator = records.iterator();
3 3             while (iterator.hasNext() &amp;&amp; (this.autoCommit || (this.invoker != null &amp;&amp; this.invoker.active))) {
4 4                 final ConsumerRecord&lt;K, V&gt; record = iterator.next();
5 5                 if (this.logger.isTraceEnabled()) {
6 6                     this.logger.trace(&quot;Processing &quot; + record);
7 7                 }
8 8                 try {
9 9                     if (this.acknowledgingMessageListener != null) {
1010                         this.acknowledgingMessageListener.onMessage(record,//终极核心方法,用户自定义的MessageListener接口的onMessage方法
1111                                 this.isAnyManualAck
1212                                         ? new ConsumerAcknowledgment(record, this.isManualImmediateAck)
1313                                         : null);
1414                     }
1515                     else {
1616                         this.listener.onMessage(record);//终极核心方法,用户自定义的MessageListener接口的onMessage方法
1717                     }
1818                     if (!this.isAnyManualAck &amp;&amp; !this.autoCommit) {
1919                         this.acks.add(record);
2020                     }
2121                 }
2222                 catch (Exception e) {
2323                     if (this.containerProperties.isAckOnError() &amp;&amp; !this.autoCommit) {
2424                         this.acks.add(record);
2525                     }
2626                     try {
2727                         this.errorHandler.handle(e, record);
2828                     }
2929                     catch (Exception ee) {
3030                         this.logger.error(&quot;Error handler threw an exception&quot;, ee);
3131                     }
3232                     catch (Error er) { //NOSONAR
3333                         this.logger.error(&quot;Error handler threw an error&quot;, er);
3434                         throw er;
3535                     }
3636                 }
3737             }
3838         }
39

2.未指定分区,进入自旋


1
2
3
4
1// 1)拉取消费记录
2ConsumerRecords&lt;K, V&gt; records = this.consumer.poll(this.containerProperties.getPollTimeout());  
3
4

1
2
3
12)如果设置了自动提交,直接在当前线程执行invokeListener(records);  
2
3

1
2
3
1// 3)否则发送消息进缓存队列
2sendToListener(records)
3

1)在每个轮询中,消费者将尝试使用最后一个被使用的偏移量作为起始偏移量,并按顺序提取。最后一个被消费的偏移量可以通过 seek(TopicPartition,long)或自动设置为最后一个被订阅的分区列表的偏移量获得。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
11 @Override
2 2     public ConsumerRecords&lt;K, V&gt; poll(long timeout) {
3 3         acquire();
4 4         try {
5 5             if (timeout &lt; 0)
6 6                 throw new IllegalArgumentException(&quot;Timeout must not be negative&quot;);
7 7
8 8             if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
9 9                 throw new IllegalStateException(&quot;Consumer is not subscribed to any topics or assigned any partitions&quot;);
1010
1111             // poll for new data until the timeout expires
1212             long start = time.milliseconds();
1313             long remaining = timeout;
1414             do {
1515                 Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt; records =pollOnce(remaining);
1616                 if (!records.isEmpty()) {
1723                     fetcher.sendFetches();// 在返回所获取的记录之前,我们可以发送下一轮的fetches并避免阻塞等待它们的响应,以便在用户处理获取的记录时进行流水线操作。
1824                     client.pollNoWakeup();//由于已经更新了所使用的位置,所以我们不允许在返回所获取的记录之前触发wakeups或任何其他错误。
1925
2026                     if (this.interceptors == null)
2127                         return new ConsumerRecords&lt;&gt;(records);
2228                     else// 如果存在消费者拦截器执行拦截
2329                         return this.interceptors.onConsume(new ConsumerRecords&lt;&gt;(records));
2430                 }
2531
2632                 long elapsed = time.milliseconds() - start;
2733                 remaining = timeout - elapsed;
2834             } while (remaining &gt; 0);
2935
3036             return ConsumerRecords.empty();
3137         } finally {
3238             release();
3339         }
3440     }
35

pollOnce:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
11 private Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt; pollOnce(long timeout) {
2 2         coordinator.poll(time.milliseconds());
3 3
4 4         // 遍历所有的TopicPartition,如果有未知偏移量(分区的),那么更新。涉及coordinator刷新已提交分区偏移量+fetcher更新获取位置
5 6         if (!subscriptions.hasAllFetchPositions())
6 7             updateFetchPositions(this.subscriptions.missingFetchPositions());
7 8
8 9         // 返回已获取到的记录
910         Map&lt;TopicPartition, List&lt;ConsumerRecord&lt;K, V&gt;&gt;&gt; records = fetcher.fetchedRecords();
1011         if (!records.isEmpty())
1112             returnrecords;
1213
1314         // 发送fetch请求
1415         fetcher.sendFetches();
1516
1617         long now = time.milliseconds();
1718         long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
1819         // 执行IO,拉取数据
1920         client.poll(pollTimeout, now, new PollCondition() {
2021             @Override
2122             public boolean shouldBlock() {
2223                 // since a fetch might be completed by the background thread, we need this poll condition
2324                 // to ensure that we do not block unnecessarily in poll()
2425                 return !fetcher.hasCompletedFetches();
2526             }
2627         });
2731         if (coordinator.needRejoin())
2832             return Collections.emptyMap();
2933
3034         return fetcher.fetchedRecords();
3135     }
32

 好吧,再往下涉及到通信IO层了,这里不再多说。将来补全了kafka通信协议相关文章后再加上飞机票。

2)
invokeListener和分支1一样

最终调用的是用户自定义的
MessageListener接口的
onMessage方法,不再重复。

sendToListener,
这里塞进缓存队列LinkedBlockingQueue<ConsumerRecords<K, V>> recordsToProcess,塞进队列后,何时再消费?ListenerInvoker的run方法执行了recordsToProcess.poll进行了消费,

 

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全网络

Java NIO框架Netty教程(十一) 并发访问测试(上)

2021-8-18 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索