Kafka Principles and Practice (3): spring-kafka producer source code


Series contents

Kafka Principles and Practice (1) Principles: a 10-minute introduction

Kafka Principles and Practice (2) A simple spring-kafka walkthrough

Kafka Principles and Practice (3) spring-kafka producer source code

Kafka Principles and Practice (4) spring-kafka consumer source code

Kafka Principles and Practice (5) spring-kafka configuration in detail

Kafka Principles and Practice (6) Summary and wrap-up

 

============== Main content ==============

Our project runs on Spring Cloud and inherits spring-boot-starter, whose default supported version is spring-kafka 1.1.7, so this article analyzes the spring-kafka 1.1.7 source. Although the official release is already at 2.0, the core methods analyzed here are essentially unchanged (link to the official site).

I. The KafkaProducer send model

[Figure: the KafkaProducer send model]

As shown in the figure above, a send request initiated by KafkaTemplate goes through the following steps:

 

Phase 1: data goes into the pool

1. KafkaProducer starts sending the message.

2. The message passes through the producer interceptors.

3. The key and value are serialized by the configured serializers.

4. The partitioner chooses a partition for the message.

5. The record is appended to the RecordAccumulator.

Phase 2: NIO sends the data

6. Once the accumulated records reach the batch-send threshold, or a new RecordBatch has to be created, the Sender thread is woken up immediately to execute its run method.

7. Inside the Sender, the RecordBatch data to be sent is pulled from the accumulator's Deque and converted into ClientRequest client requests.

8. Still inside the Sender, the NetworkClient converts each request into a RequestSend (the Send interface) and calls the Selector to stage it in a KafkaChannel (the NetworkClient maintains the channel map Map<String, KafkaChannel> channels).

9. NIO sends the message (1. Selector.select(); 2. the Send data (ByteBuffer[]) staged in the KafkaChannel is written to the KafkaChannel's GatheringByteChannel write channel).

 

II. The KafkaTemplate template

spring-kafka provides the simple KafkaTemplate class: you just call its send methods; all that is needed is for the container to know about the bean (see the XML bean configuration in part 2 of this series).
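If you prefer Java configuration over the XML shown in part 2, a minimal sketch of wiring the bean looks roughly like this; the broker address, topic name and String serializers are illustrative assumptions, not taken from the original article:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class ProducerWiringSketch {

    public static KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(producerFactory); // autoFlush defaults to false, see the constructors below
    }

    public static void main(String[] args) {
        kafkaTemplate().send("demo-topic", "hello kafka"); // topic name is illustrative
    }
}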


public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {

    ...

    /**
     * Create an instance using the supplied producer factory and autoFlush false.
     * @param producerFactory the producer factory.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    /**
     * Create an instance using the supplied producer factory and autoFlush setting.
     * Set autoFlush to true if you wish to synchronously interact with Kafka, calling
     * {@link java.util.concurrent.Future#get()} on the result.
     * @param producerFactory the producer factory.
     * @param autoFlush true to flush after each send.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        this.producerFactory = producerFactory;
        this.autoFlush = autoFlush;
    }

    ...

    /**
     * Send the producer record.
     * @param producerRecord the producer record.
     * @return a Future for the {@link RecordMetadata}.
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult<>(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null
                                && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                    producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    }
                    else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                    producerRecord.partition(),
                                    producerRecord.key(),
                                    producerRecord.value(),
                                    exception);
                        }
                    }
                }
                finally {
                    producer.close();
                }
            }

        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }
}

Key points in the KafkaTemplate source

1. Constructors: the parameters are a ProducerFactory and an autoFlush flag (whether buffered records are flushed and sent immediately).

2. Sending a message: doSend has two core points:

1) producer.send(producerRecord, Callback), where producer is the KafkaProducer.

2) The Callback: onCompletion fires when the send completes and drives the ProducerListener's onSuccess or onError.
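On the caller's side, the ListenableFuture returned by doSend is usually consumed with a callback. A minimal sketch, assuming a kafkaTemplate bean like the one above and an illustrative topic name:

ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("demo-topic", "key-1", "hello kafka");

future.addCallback(new org.springframework.util.concurrent.ListenableFutureCallback<SendResult<String, String>>() {

    @Override
    public void onSuccess(SendResult<String, String> result) {
        // completed by future.set(...) in doSend; the metadata carries the partition and offset assigned by the broker
        System.out.println("sent to partition " + result.getRecordMetadata().partition()
                + ", offset " + result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        // completed by future.setException(...) in doSend
        System.err.println("send failed: " + ex.getMessage());
    }
});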

III. KafkaProducer

3.1 The KafkaProducer construction process


@SuppressWarnings({"unchecked", "deprecation"})
    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        try {
            log.trace("Starting the Kafka producer");
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = new SystemTime();

            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            Map<String, String> metricTags = new LinkedHashMap<String, String>();
            metricTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            /* check for user defined settings.
             * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
             * This should be removed with release 0.9 when the deprecated configs are removed.
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
                log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
                if (blockOnBufferFull) {
                    this.maxBlockTimeMs = Long.MAX_VALUE;
                } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                    log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                            "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
                } else {
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
                }
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }

            /* check for user defined settings.
             * If the TIME_OUT config is set use that for request timeout.
             * This should be removed with release 0.9
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
                this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
            } else {
                this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            }

            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time);

            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                    this.metadata,
                    clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs, time);
            this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

            this.errors = this.metrics.sensor("errors");

            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

As the code above shows, the KafkaProducer is assembled from several core components:

1) Metadata: maintains the cluster and topic information.

2) RecordAccumulator: buffers produced records so that they can be sent in batches, reducing the number of I/O round trips and improving performance.

3) Sender: combines the Metadata, the RecordAccumulator and the NetworkClient.

4) KafkaThread (the I/O thread): a thread with a custom name; the Sender is its Runnable, so once the thread starts, the Sender's run method takes over. Go!
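To see these components from the caller's side, here is a minimal raw-client sketch; the broker address, topic and configuration values are illustrative assumptions. Constructing the KafkaProducer runs the constructor above and starts the Sender I/O thread, while send hands each record to the RecordAccumulator.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RawProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");           // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // feeds the RecordAccumulator's batch size
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);       // how long a batch may linger before the Sender drains it

        // runs the constructor above: builds Metadata, RecordAccumulator, Sender and starts the KafkaThread
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("demo-topic", "key-1", "hello")); // appended to the accumulator
        producer.flush();   // block until the accumulated records are actually sent
        producer.close();
    }
}

The run method that this I/O thread executes is the Sender's main loop, shown next.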


/**
     * The main run loop for the sender thread
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    /**
     * Run a single iteration of sending
     *
     * @param now
     *            The current POSIX time in milliseconds
     */
    void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions in the cluster with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force a metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // remove any nodes the NetworkClient is not yet ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests for the ready nodes
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        // handle expired batches
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // if there are any ready nodes, set the poll timeout to 0
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // 1. if some partitions are already ready, the poll timeout is 0;
        // 2. otherwise, if some partitions have data buffered but are not ready yet, the timeout is the delta between now and the linger expiry;
        // 3. otherwise, the timeout is the delta between now and the metadata expiry.
        this.client.poll(pollTimeout, now);
    }

The created requests are then iterated over and each is sent with client.send(request, now); the NetworkClient sends the ClientRequest:


@Override
    public void send(ClientRequest request, long now) {
        String nodeId = request.request().destination();
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        doSend(request, now);
    }

    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        this.inFlightRequests.add(request);
        selector.send(request.request());
    }

 


public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

As shown above, this ultimately just looks up the KafkaChannel for the destination and sets the Send payload on it.

client.poll(pollTimeout, now) performs the actual I/O reads and writes.


@Override
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // after the poll completes, build the various ClientResponses and add them to responses
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // iterate over the responses and invoke their callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }

What does the core method selector.poll ultimately execute?


public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        maybeCloseOldestConnection(endSelect);
    }

As shown above, the core logic has only two parts: wait for channels to become ready, then write the data.

1) select: waits for channels to become ready and returns the number of ready channels.


private int select(long ms) throws IOException {
        if (ms < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");

        if (ms == 0L)
            return this.nioSelector.selectNow();
        else
            return this.nioSelector.select(ms);
    }

As the code above shows, nioSelector is just the JDK's own java.nio.channels.Selector, so this boils down to calling its select method, which reports how many channels have become ready since the last call to select().

Selector.select(ms) blocks for at most ms milliseconds (until a channel becomes ready for one of the events you registered).

Selector.selectNow() never blocks: it returns immediately whether or not any channel is ready, and returns zero if no channel has become selectable.
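The same select-then-write pattern can be seen outside Kafka in a bare Java NIO sketch; the host, port and payload below are illustrative assumptions, and the structure simply mirrors what the producer's Selector wraps:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class NioSendSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("localhost", 9092));  // assumed address
        channel.register(selector, SelectionKey.OP_CONNECT);

        ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        while (payload.hasRemaining()) {
            if (selector.select(1000) == 0) {   // block up to 1s, like select(ms); selectNow() would return at once
                continue;
            }
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable() && channel.finishConnect()) {
                    key.interestOps(SelectionKey.OP_WRITE);   // connected, now wait for writability
                }
                if (key.isValid() && key.isWritable()) {
                    channel.write(payload);                   // write the buffered bytes into the channel
                }
            }
            selector.selectedKeys().clear();
        }
        channel.close();
        selector.close();
    }
}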

NIO Selector

1. The Java NIO model

There is too much to cover here, so it is not expanded on in this article; a link is reserved for a separate write-up.

2. Selector

For the Selector itself, one diagram is quoted below; a picture tells the story.

[Figure: Java NIO Selector overview]

2) pollSelectionKeys: if the number of ready channels is greater than 0, write the data (ByteBuffer) into the channel identified by each key.


private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            if (idleExpiryManager != null)
                idleExpiryManager.update(channel.id(), currentTimeNanos);

            try {

                /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                socketChannel.socket().getReceiveBufferSize(),
                                socketChannel.socket().getSendBufferSize(),
                                socketChannel.socket().getSoTimeout(),
                                channel.id());
                    } else
                        continue;
                }

                /* prepare the channel if it is connected but not yet ready */
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();

                /* read data from the channel */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }

                /* write data into the channel */
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
    }

 

3.2 KafkaProducer sends the data

KafkaProducer.send


@Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

    /**
     * Implementation of asynchronously sending a record to a topic.
     */
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try { // serialize the key
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try { // serialize the value
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }

            int partition = partition(record, serializedKey, serializedValue, cluster);
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            ensureValidRecordSize(serializedSize);
            // topic and partition
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future; // return the Future
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }

The core steps of this method:

1. Append the data to be sent (the TopicPartition plus the serialized key and value) to the RecordAccumulator.

2. sender.wakeup(): when a batch is full or a new batch has been created, wake the Sender so it no longer blocks in its current select() call.


/**
     * Append a record to the accumulator and return a result containing the Future and flags
     * indicating whether the batch is full or a new batch was created.
     * @param tp the topic partition
     * @param timestamp The timestamp of the record
     * @param key the serialized key
     * @param value the serialized value
     * @param callback the callback to invoke when the request is complete
     * @param maxTimeToBlock the maximum number of milliseconds to block
     */
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // increment the count of appends in progress (used as a condition by abortIncompleteBatches)
        appendsInProgress.incrementAndGet();
        try {
            // get the deque for key=tp from ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches, creating one if absent
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) { // block on the deque until the lock is acquired, then try to append to the accumulator
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) // 1. if the append succeeded, return directly
                    return appendResult;
            }
            // ===== 2. the append failed =====
            // 2.1 allocate a buffer and try to append to the accumulator again
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) { // block on the deque until the lock is acquired, then retry the append
                // check again that the producer hasn't been closed while waiting for the lock
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // 2.2 the append succeeded this time, release the buffer
                    free.deallocate(buffer);
                    return appendResult;
                }
                // 2.3 still no room: build a writable MemoryRecords and a brand-new RecordBatch
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch); // track it in the IncompleteRecordBatches
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            // decrement the count of appends in progress
            appendsInProgress.decrementAndGet();
        }
    }

In the append method above, adding a record to the accumulator calls tryAppend three times: the first two calls are identical and take a Deque as their last argument, while the last one takes a millisecond timestamp as its last argument. Following the first two tryAppend calls:


/**
     * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
     * resources (like compression streams buffers).
     */
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
        RecordBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            if (future == null)
                last.records.close();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
        }
        return null;
    }

As shown above, this still ends up calling tryAppend(timestamp, key, value, callback, time.milliseconds()). Following that call:


/**
     * Append the record to the current record set and return the relative offset within that record set
     *
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

As shown above, the append ultimately writes the current record into the RecordBatch's MemoryRecords (which wraps the ByteBuffer and related state) and returns a FutureRecordMetadata.

That is finally wrapped into a RecordAppendResult and returned; at this point one record has been added to the accumulator.
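The accumulator's central data structure is one Deque<RecordBatch> per partition. A stripped-down sketch of the "try the last batch, otherwise open a new one" pattern looks like this; Batch here is a stand-in class for illustration, not Kafka's RecordBatch:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class AccumulatorSketch {

    // stand-in for RecordBatch: a bounded list of records
    static final class Batch {
        private final List<String> records = new ArrayList<>();
        private final int capacity;
        Batch(int capacity) { this.capacity = capacity; }
        boolean tryAppend(String record) {
            if (records.size() >= capacity) return false;   // "no room": the caller must open a new batch
            records.add(record);
            return true;
        }
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();   // per-partition deque, like Deque<RecordBatch>
        String record = "hello";

        Batch last = deque.peekLast();             // same shape as RecordAccumulator.tryAppend
        if (last == null || !last.tryAppend(record)) {
            Batch fresh = new Batch(16);           // capacity is arbitrary here; Kafka sizes batches from batch.size
            fresh.tryAppend(record);
            deque.addLast(fresh);                  // new batches go to the tail; the Sender drains from the head
        }
        System.out.println("batches in deque: " + deque.size());
    }
}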

Back in the KafkaTemplate's doSend method: after KafkaProducer.send has been called, if autoFlush is enabled, KafkaProducer.flush() is executed:


@Override
    public void flush() {
        log.trace("Flushing accumulated records in producer.");
        this.accumulator.beginFlush();
        this.sender.wakeup();
        try {
            this.accumulator.awaitFlushCompletion();
        } catch (InterruptedException e) {
            throw new InterruptException("Flush interrupted.", e);
        }
    }

KafkaProducer.flush() ==> accumulator.awaitFlushCompletion() ==> RecordBatch.produceFuture.await()


/**
     * Mark all partitions as ready to send and block until the send is complete
     */
    public void awaitFlushCompletion() throws InterruptedException {
        try {
            for (RecordBatch batch : this.incomplete.all())
                batch.produceFuture.await();
        } finally {
            this.flushesInProgress.decrementAndGet();
        }
    }

private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Await the completion of this request
     */
    public void await() throws InterruptedException {
        latch.await();
    }

As shown above, awaitFlushCompletion iterates over the incomplete RecordBatches and waits on each one's ProduceRequestResult, which uses a CountDownLatch with a count of one to signal completion.
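In practice, a caller that wants this synchronous behaviour can either construct the template with autoFlush = true or simply block on the returned future. A minimal sketch, assuming the producerFactory and kafkaTemplate from earlier and an illustrative topic name (checked exceptions from get() are omitted for brevity):

// Option 1: an autoFlush template, so every send() flushes the accumulator before returning
KafkaTemplate<String, String> syncTemplate = new KafkaTemplate<>(producerFactory, true);
syncTemplate.send("demo-topic", "key-1", "hello");

// Option 2: block on the returned future; get() waits until the broker has acknowledged the record
SendResult<String, String> result =
        kafkaTemplate.send("demo-topic", "key-1", "hello").get(10, java.util.concurrent.TimeUnit.SECONDS);
System.out.println("acked at offset " + result.getRecordMetadata().offset());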

IV. Summary

In this chapter, starting from KafkaTemplate and following the flow diagram, we walked through the main source code of the Kafka producer's send path. In hindsight it comes down to two modules: storing data in the accumulator cache, and the Sender using Java NIO to ship the messages. The producer send path turns out not to be complicated. The next chapter covers the consumer source code.
