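Series contents
Kafka Principles and Practice (1): Principles, a 10-Minute Introduction
Kafka Principles and Practice (2): spring-kafka in Practice
Kafka Principles and Practice (3): spring-kafka Producer Source Code
Kafka Principles and Practice (4): spring-kafka Consumer Source Code
Kafka Principles and Practice (5): spring-kafka Configuration in Detail
Kafka Principles and Practice (6): Summary and Takeaways
============== Main text ==============
Our project uses Spring Cloud and inherits from spring-boot-starter, whose default supported version is spring-kafka 1.1.7, so this article analyzes the spring-kafka 1.1.7 source code. The official release has already reached 2.0, but the core methods analyzed here are essentially unchanged. Link to the official site.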
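I. KafkaProducer send model
As shown in the figure above, KafkaTemplate initiates the send request, which breaks down into the following steps:
Phase 1: put the data into the pool
1. KafkaProducer starts sending the message
2. The producer interceptors intercept the message
3. The serializers serialize the key and value
4. The partitioner chooses the partition for the message
5. The message is appended to the record accumulator
Phase 2: send the data over NIO
6. Once the current batch reaches the batch-size threshold, or a new RecordBatch is created, the Sender thread is woken up immediately to continue its run method
7. Inside the sender, the RecordBatch entries to be sent are taken from the accumulator's Deque and converted into ClientRequest objects
8. Still inside the sender, NetworkClient converts each request into a RequestSend (the Send interface) and calls Selector, which stages it on a KafkaChannel (the Selector keeps its channels in Map<String, KafkaChannel> channels)
9. The NIO send is executed: (1) Selector.select(); (2) the Send data (ByteBuffer[]) staged on the KafkaChannel is written to the channel's write side, a GatheringByteChannel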
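Phase 1 above (intercept, serialize, pick a partition, append to the accumulator) can be sketched with JDK classes only. This is a minimal illustration under stated assumptions, not Kafka's implementation: `MiniPipeline`, its hash-modulo partitioning rule, and the in-memory `Map<Integer, Deque<byte[]>>` buffer are all hypothetical stand-ins.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for phase 1 of the send path; these are not Kafka's classes.
class MiniPipeline {
    private final int numPartitions;
    private final UnaryOperator<String> interceptor;
    // partition -> queued serialized values (plays the role of the record accumulator)
    private final Map<Integer, Deque<byte[]>> buffer = new HashMap<>();

    MiniPipeline(int numPartitions, UnaryOperator<String> interceptor) {
        this.numPartitions = numPartitions;
        this.interceptor = interceptor;
    }

    // Assumed rule: hash of the serialized key modulo the partition count.
    int partitionFor(byte[] serializedKey) {
        return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
    }

    // Steps 2-5: intercept, serialize, choose a partition, append. Returns the partition.
    int send(String key, String value) {
        String intercepted = interceptor.apply(value);
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = intercepted.getBytes(StandardCharsets.UTF_8);
        int partition = partitionFor(k);
        buffer.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(v);
        return partition;
    }

    // Number of values currently queued for the given partition.
    int queued(int partition) {
        Deque<byte[]> dq = buffer.get(partition);
        return dq == null ? 0 : dq.size();
    }

    // The most recently queued value for the given partition, decoded as text.
    String peekLast(int partition) {
        return new String(buffer.get(partition).peekLast(), StandardCharsets.UTF_8);
    }
}
```

Records with the same key always land in the same partition queue here, which is the property the partitioning step is meant to provide; the real producer uses its own configurable Partitioner.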
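II. KafkaTemplate
spring-kafka provides a simple KafkaTemplate class. You call its send methods directly; the only requirement is that the container knows about the bean (see the bean configuration in the XML of chapter 2, the practice chapter).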
public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
    ...

    /**
     * Create an instance using the supplied producer factory and autoFlush false.
     * @param producerFactory the producer factory.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    /**
     * Create an instance using the supplied producer factory and autoFlush setting.
     * Set autoFlush to true if you wish to synchronously interact with Kafka, calling
     * {@link java.util.concurrent.Future#get()} on the result.
     * @param producerFactory the producer factory.
     * @param autoFlush true to flush after each send.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        this.producerFactory = producerFactory;
        this.autoFlush = autoFlush;
    }
    ...
    /**
     * Send the producer record.
     * @param producerRecord the producer record.
     * @return a Future for the {@link RecordMetadata}.
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult<>(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null
                                && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                    producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    }
                    else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                    producerRecord.partition(),
                                    producerRecord.key(),
                                    producerRecord.value(),
                                    exception);
                        }
                    }
                }
                finally {
                    producer.close();
                }
            }

        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }
}
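Key points of the KafkaTemplate source
1. Constructors: the arguments are a ProducerFactory and an autoFlush flag (when true, the records in the buffer are flushed immediately after each send).
2. Sending a message
doSend has just two core points:
1) producer.send(producerRecord, Callback), where producer is the KafkaProducer
2) The Callback: onCompletion runs when the send completes, and it notifies the producerListener through onSuccess or onError.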
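The pattern in doSend, where the method returns a future at once and a completion callback settles it later, can be reproduced with the JDK alone. The following is a minimal sketch, not spring-kafka code: `SendCallback`, `FakeProducer`, and `TemplateSketch` are hypothetical names, and `CompletableFuture` stands in for Spring's `SettableListenableFuture`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical callback type, playing the role of Kafka's Callback.
interface SendCallback {
    void onCompletion(Long offset, Exception exception);
}

// Hypothetical producer that completes synchronously for simplicity;
// a real producer would invoke the callback from its I/O thread.
class FakeProducer {
    private long nextOffset = 0;

    void send(String value, SendCallback callback) {
        if (value == null) {
            callback.onCompletion(null, new IllegalArgumentException("null value"));
        } else {
            callback.onCompletion(nextOffset++, null);
        }
    }
}

class TemplateSketch {
    private final FakeProducer producer = new FakeProducer();

    // Hand the caller a future; the callback sets either its value or its exception.
    CompletableFuture<Long> doSend(String value) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        producer.send(value, (offset, exception) -> {
            if (exception == null) {
                future.complete(offset);
            } else {
                future.completeExceptionally(exception);
            }
        });
        return future;
    }
}
```

A caller that wants synchronous behavior can block on the returned future, which is the same trade-off the autoFlush javadoc describes for `Future#get()`.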
III. KafkaProducer
3.1 How KafkaProducer is constructed
@SuppressWarnings({"unchecked", "deprecation"})
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        log.trace("Starting the Kafka producer");
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = new SystemTime();

        clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .tags(metricTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }

        // load interceptors and make sure they get clientId
        userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        /* check for user defined settings.
         * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
         * This should be removed with release 0.9 when the deprecated configs are removed.
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
            if (blockOnBufferFull) {
                this.maxBlockTimeMs = Long.MAX_VALUE;
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }
        } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
        } else {
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        }

        /* check for user defined settings.
         * If the TIME_OUT config is set use that for request timeout.
         * This should be removed with release 0.9
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
        } else {
            this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        }

        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time);

        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs, time);
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                new SystemTime(),
                clientId,
                this.requestTimeoutMs);
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();

        this.errors = this.metrics.sensor("errors");


        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
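As shown above, KafkaProducer contains several core components:
1) Metadata: maintains the cluster information and the topic information.
2) RecordAccumulator (record accumulator): buffers produced records and sends them in batches, which reduces the number of I/O operations and improves performance.
3) Sender: combines the Metadata, the RecordAccumulator, and the NetworkClient.
4) KafkaThread (the I/O thread): a thread with a custom name. Sender implements Runnable, so once the thread is started it runs the Sender's run method.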
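The relationship between the Sender and its I/O thread (a Runnable with a running flag, started on a named daemon thread, that drains what is left before it exits) can be sketched as follows. This is a simplified illustration: `MiniSender` and its methods are hypothetical, and the busy loop with `Thread.yield()` replaces the blocking poll the real Sender uses.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of the Sender/KafkaThread pairing; not Kafka's classes.
class MiniSender implements Runnable {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final AtomicInteger sent = new AtomicInteger();
    private volatile boolean running = true;

    void enqueue(String record) { pending.add(record); }

    void initiateClose() { running = false; }

    int sentCount() { return sent.get(); }

    @Override
    public void run() {
        // Main loop: runs until close is requested.
        while (running) {
            runOnce();
        }
        // After close, keep going until everything already queued has been sent.
        while (!pending.isEmpty()) {
            runOnce();
        }
    }

    private void runOnce() {
        String record = pending.poll();
        if (record != null) {
            sent.incrementAndGet(); // stands in for the network write
        } else {
            Thread.yield(); // the real Sender blocks in poll() instead of spinning
        }
    }

    // Start the sender on a named daemon thread, as the constructor does with KafkaThread.
    static Thread startDaemon(MiniSender sender, String clientId) {
        Thread t = new Thread(sender, "mini-producer-network-thread | " + clientId);
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Enqueue some records, request close, wait for the thread, and report how many were sent.
    static int runDemo(int records) {
        MiniSender sender = new MiniSender();
        Thread thread = startDaemon(sender, "demo");
        for (int i = 0; i < records; i++) {
            sender.enqueue("record-" + i);
        }
        sender.initiateClose();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sender.sentCount();
    }
}
```

Because the second loop only exits when the queue is empty, records enqueued before `initiateClose()` are not lost, which mirrors the shutdown phase of the Sender's run method.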
/**
 * The main run loop for the sender thread
 */
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

/**
 * Run a single iteration of sending
 *
 * @param now
 *            The current POSIX time in milliseconds
 */
void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions in the cluster that have data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if the leader of any partition is still unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes the NetworkClient is not yet ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests for the ready nodes
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
            result.readyNodes,
            this.maxRequestSize,
            now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    // handle batches that have timed out
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // if any nodes are ready, set the poll timeout to 0
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // 1. if some partitions are already ready to send, the select time is 0;
    // 2. otherwise, if some partition has data accumulated but is not ready yet,
    //    the select time is the difference between now and its linger expiry time;
    // 3. otherwise, the select time is the difference between now and the metadata expiry time
    this.client.poll(pollTimeout, now);
}
The loop then iterates over the created requests and calls client.send(request, now) for each one, so that NetworkClient sends the ClientRequest:
@Override
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}
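As shown above, what this finally does is look up the KafkaChannel for the destination and stage the content to send on it.
**client.poll(pollTimeout, now);** performs the actual I/O reads and writes.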
@Override
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // after the I/O completes, build the various ClientResponse objects and add them to responses
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // iterate over the responses and invoke their callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}
What does the core method selector.poll ultimately do?
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    maybeCloseOldestConnection(endSelect);
}
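As shown above, the core logic has just two parts: wait for channels to become ready, then write the data.
1) select: waits for channels to become ready and returns the number of ready channels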
private int select(long ms) throws IOException {
    if (ms < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");

    if (ms == 0L)
        return this.nioSelector.selectNow();
    else
        return this.nioSelector.select(ms);
}
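As the code above shows, nioSelector is a java.nio.channels.Selector, so in the end this is simply the JDK's own Java NIO Selector executing select, which reports how many channels have become ready since the previous select() call.
Selector.select(ms): blocks for at most ms milliseconds, until a channel becomes ready for an event you registered.
Selector.selectNow(): never blocks. It returns immediately with whatever channels are ready, and returns zero if no channel has become selectable.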
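The difference between selectNow() and select(ms) can be observed with a plain JDK Selector and a Pipe. This is a small demonstration written for this article, not code from Kafka; the class name `SelectDemo` is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectDemo {
    // Returns {ready count before any data is written, ready count after one byte is written}.
    static int[] readyCounts() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                // Only non-blocking channels can be registered with a Selector.
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);

                // Nothing has been written yet, so selectNow() returns 0 without blocking.
                int before = selector.selectNow();

                pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));

                // The source is now readable; select(ms) returns once it is ready,
                // waiting at most 1000 ms.
                int after = selector.select(1000);
                return new int[] {before, after};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The same choice appears in the select(long ms) method above: a timeout of 0 maps to selectNow(), any positive timeout to select(ms).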
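NIO Selector
1. The Java NIO model
There is too much to cover here, so I will not expand on it and leave a link instead.
2. Selector
For Selector, I will simply borrow a diagram; a picture is worth a thousand words.
2) pollSelectionKeys: if the number of ready channels is greater than 0, the data (ByteBuffer) is written to the channel identified by each key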
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {

            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }

            /* prepare the channel if it is connected but not yet ready */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* read data from the channel */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* write data to the channel */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}
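The write step in pollSelectionKeys ends in a gathering write: an array of ByteBuffers is handed to a GatheringByteChannel in one call. The sketch below shows that JDK mechanism in isolation using a Pipe. It is an illustration only: `GatheringWriteDemo` is a hypothetical class, and the 4-byte length prefix is an assumed framing chosen for the example rather than something taken from the code above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class GatheringWriteDemo {
    // Writes a 4-byte size header plus the payload through a gathering write,
    // reads the frame back, and returns the payload as text.
    // Keep the message small: the pipe is not drained while writing.
    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try {
                byte[] payload = message.getBytes(StandardCharsets.UTF_8);
                ByteBuffer header = ByteBuffer.allocate(4);
                header.putInt(payload.length);
                header.flip();
                ByteBuffer[] buffers = {header, ByteBuffer.wrap(payload)};

                GatheringByteChannel out = pipe.sink();
                // A single write may be partial, so loop until both buffers are drained.
                while (buffers[0].hasRemaining() || buffers[1].hasRemaining()) {
                    out.write(buffers);
                }

                ByteBuffer in = ByteBuffer.allocate(4 + payload.length);
                while (in.hasRemaining()) {
                    if (pipe.source().read(in) < 0) {
                        throw new IOException("unexpected end of stream");
                    }
                }
                in.flip();
                int size = in.getInt();
                byte[] body = new byte[size];
                in.get(body);
                return new String(body, StandardCharsets.UTF_8);
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The loop around `write` matters for non-blocking sockets in particular, where one call often transfers only part of the data and the remainder has to wait for the next writable event.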
3.2 Sending data with KafkaProducer
KafkaProducer.send
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

/**
 * Implementation of asynchronously sending a record to a topic
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try { // serialize the key
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try { // serialize the value
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }

        int partition = partition(record, serializedKey, serializedValue, cluster);
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize);
        // topic and partition
        tp = new TopicPartition(record.topic(), partition);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future; // return the Future
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
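The core steps:
1. The data to send (the TopicPartition plus the serialized key and value) is appended to the RecordAccumulator.
2. sender.wakeup(): when the append reports that the batch is full or that a new batch was created, the Sender is woken up so that it stops blocking in its current select() call.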
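The effect of waking a thread that is blocked in select() can be shown with the JDK Selector directly. This is a minimal demonstration assuming nothing beyond java.nio; `WakeupDemo` is a hypothetical class and does not reproduce how Sender or NetworkClient are wired.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class WakeupDemo {
    // Returns how many milliseconds the I/O thread spent in select() before it returned.
    static long blockedMillis() {
        long[] elapsed = new long[1];
        try (Selector selector = Selector.open()) {
            Thread io = new Thread(() -> {
                long start = System.nanoTime();
                try {
                    selector.select(10_000); // would otherwise block for up to 10 seconds
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                elapsed[0] = (System.nanoTime() - start) / 1_000_000;
            });
            io.start();
            // wakeup() makes a blocked select() return at once; if no select() is in
            // progress yet, the next select() call returns immediately instead.
            selector.wakeup();
            io.join(); // join also makes the write to elapsed[0] visible here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return elapsed[0];
    }
}
```

Because wakeup() also affects the next select() when none is in progress, the producing thread does not need to know whether the I/O thread is currently blocked.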
/**
 * Add a record to the accumulator. The returned result contains the Future and flags
 * indicating whether the batch is full or was newly created.
 * @param tp the topic/partition
 * @param timestamp The timestamp of the record
 * @param key the serialized key
 * @param value the serialized value
 * @param callback the callback to invoke when the request completes
 * @param maxTimeToBlock the maximum time to block, in milliseconds
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // increment the number of appends in progress (abortIncompleteBatches uses it as a condition)
    appendsInProgress.incrementAndGet();
    try {
        // get the deque for key tp from ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches, creating it if absent
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { // lock the deque (blocking until the lock is acquired), then try to append
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) // 1. appended successfully: return directly
                return appendResult;
        }
        // ===== 2. append failed =====
        // 2.1 allocate a buffer, then try to append again
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) { // lock the deque (blocking until the lock is acquired), then try to append
            // after acquiring the deque lock, check again whether the producer has been closed
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // 2.2 appended successfully: release the buffer
                free.deallocate(buffer);
                return appendResult;
            }
            // 2.3 append failed: build a writable in-memory MemoryRecords
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch); // add to the incomplete batches (IncompleteRecordBatches)
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        // decrement the number of appends in progress
        appendsInProgress.decrementAndGet();
    }
}
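Looking at the append method above, adding a record to the accumulator calls tryAppend up to three times. The first two calls are identical and take the Deque as their last argument; the third takes a time in milliseconds as its last argument. Tracing the first two tryAppend calls: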
/**
 * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
 * resources (like compression streams buffers).
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
    RecordBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null)
            last.records.close();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
    }
    return null;
}
As shown above, this too ends up calling tryAppend(timestamp, key, value, callback, time.milliseconds()) on the RecordBatch. Tracing further:
/**
 * Append the record to the current record set and return the relative offset within that record set
 *
 * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    if (!this.records.hasRoomFor(key, value)) {
        return null;
    } else {
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
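As shown above, append simply adds the current record to the RecordBatch's MemoryRecords (which wraps a ByteBuffer, among other things) and returns a FutureRecordMetadata.
That future is finally wrapped in a RecordAppendResult and returned, which completes adding one record to the accumulator.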
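The append logic traced above (one deque of batches per partition, try the last batch first, open a new batch when it has no room, and report the two wake-up flags) can be condensed into a small JDK-only model. This is a simplified sketch: `MiniAccumulator`, `Batch`, and `AppendResult` are hypothetical, and the buffer pool, compression, and futures of the real RecordAccumulator are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MiniAccumulator {
    // The two flags the caller checks before waking the sender.
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    // A batch with a fixed byte capacity (stand-in for RecordBatch plus MemoryRecords).
    static final class Batch {
        private final int capacity;
        private int used;
        final List<byte[]> records = new ArrayList<>();

        Batch(int capacity) { this.capacity = capacity; }

        boolean tryAppend(byte[] value) {
            if (used + value.length > capacity) {
                return false; // no room for this record
            }
            records.add(value);
            used += value.length;
            return true;
        }

        boolean isFull() { return used >= capacity; }
    }

    private final int batchSize;
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    MiniAccumulator(int batchSize) { this.batchSize = batchSize; }

    AppendResult append(String topicPartition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(value)) {
                return new AppendResult(dq.size() > 1 || last.isFull(), false);
            }
            // No batch yet, or the last one has no room: open a batch large enough for the record.
            Batch batch = new Batch(Math.max(batchSize, value.length));
            batch.tryAppend(value);
            dq.addLast(batch);
            return new AppendResult(dq.size() > 1 || batch.isFull(), true);
        }
    }

    int batchCount(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        if (dq == null) {
            return 0;
        }
        synchronized (dq) {
            return dq.size();
        }
    }
}
```

With a batch size of 10 bytes, two 4-byte records share the first batch and a third one forces a second batch, at which point `batchIsFull` becomes true because the deque holds more than one batch.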
Returning to the doSend method of KafkaTemplate: once KafkaProducer.send has returned, KafkaProducer.flush() is executed if autoFlush is set.
@Override
public void flush() {
    log.trace("Flushing accumulated records in producer.");
    this.accumulator.beginFlush();
    this.sender.wakeup();
    try {
        this.accumulator.awaitFlushCompletion();
    } catch (InterruptedException e) {
        throw new InterruptException("Flush interrupted.", e);
    }
}
KafkaProducer.flush() ==> accumulator.awaitFlushCompletion() ==> RecordBatch.produceFuture.await()
/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (RecordBatch batch : this.incomplete.all())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
private final CountDownLatch latch = new CountDownLatch(1);

/**
 * Await the completion of this request
 */
public void await() throws InterruptedException {
    latch.await();
}
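As shown above, awaitFlushCompletion iterates over the incomplete RecordBatch instances and waits on each one's ProduceRequestResult, which uses a CountDownLatch with a count of 1 to wait for completion.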
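The waiting scheme described above, one latch with a count of 1 per pending result and a flush that awaits each of them in turn, can be sketched as follows. The classes `PendingResult` and `FlushSketch` are hypothetical stand-ins for ProduceRequestResult and the accumulator, and the offsets are made-up values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for ProduceRequestResult: done() releases everyone blocked in await().
class PendingResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile long baseOffset = -1;

    void done(long baseOffset) {
        this.baseOffset = baseOffset;
        latch.countDown();
    }

    void await() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    long baseOffset() { return baseOffset; }
}

class FlushSketch {
    // Blocks until every pending result is done, like awaitFlushCompletion over the incomplete batches.
    static void awaitAll(List<PendingResult> incomplete) {
        for (PendingResult result : incomplete) {
            result.await();
        }
    }

    // Completes three results on a background thread and returns the sum of their offsets.
    static long demo() {
        List<PendingResult> incomplete = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            incomplete.add(new PendingResult());
        }
        Thread sender = new Thread(() -> {
            for (int i = 0; i < incomplete.size(); i++) {
                incomplete.get(i).done(100 + i); // made-up offsets for the example
            }
        });
        sender.start();
        awaitAll(incomplete);
        long sum = 0;
        for (PendingResult result : incomplete) {
            sum += result.baseOffset();
        }
        return sum;
    }
}
```

A latch with a count of 1 suits this case because a result completes exactly once and any number of threads may wait on it, before or after completion.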
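IV. Summary
In this chapter, following the flow diagram and starting from KafkaTemplate, we analyzed the main source code behind sending messages with the Kafka producer. It comes down to two modules: the first stores data in the accumulator's buffer, and the second is the sender, which transmits messages using Java NIO. The producer's send path turns out not to be complicated. The next chapter covers the consumer source code.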