What exceptions do the client and the broker each report when Kafka sends a message larger than the broker's size limit?


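A few days ago I ran into a bug: the send log showed a java.io.IOException: Broken pipe error. After digging in, I found that this exception is reported when the message body sent by a Kafka producer is larger than the limit configured on the broker (the default value, in my case). A single send does not trigger the exception; it only shows up when you send several times in a row.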

This post records what exceptions the client and the broker each report when Kafka sends a message larger than the broker's size limit.

Kafka Broker Configs include a parameter, message.max.bytes, which sets the maximum size of a message that the broker will accept.

What happens on the producer side and on the broker side when the producer sends a message larger than the threshold configured on the Kafka broker?
Producer-side test code:


import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer {

    public static final String brokerList = "10.198.197.59:9092";
    public static final String topic = "versionTopic";

    public static void main(String[] args) {
        // Configuration for the old Scala producer client.
        Properties properties = new Properties();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("metadata.broker.list", brokerList);

        ProducerConfig config = new ProducerConfig(properties);
        // Fully qualified because this test class is also named Producer.
        kafka.javaapi.producer.Producer<Integer, String> producer =
                new kafka.javaapi.producer.Producer<Integer, String>(config);

        // 1 MiB payload, which is larger than the broker's limit.
        String message = getMessage(1 * 1024 * 1024);

        // Send several times in a row; a single send does not show the exception.
        for (int i = 0; i < 3; i++) {
            KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>(topic, message);
            producer.send(keyedMessage);
            System.out.println("=============================");
        }

        // Keep the JVM alive for a while so the logs can be observed.
        try {
            TimeUnit.SECONDS.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    // Builds a string made of msgSize 'x' characters.
    public static String getMessage(int msgSize) {
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < msgSize; i++) {
            stringBuilder.append("x");
        }
        return stringBuilder.toString();
    }
}

Producer-side output:


2017-02-28 16:19:31 -[INFO] - [Verifying properties] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Property metadata.broker.list is overridden to 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Property serializer.class is overridden to kafka.serializer.StringEncoder] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Fetching metadata from broker id:0,host:10.198.197.59,port:9092 with correlation id 0 for 1 topic(s) Set(versionTopic)] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
=============================
2017-02-28 16:19:34 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[WARN] - [Failed to send producer request with correlation id 4 to broker 0 with data for partitions [versionTopic,0]] - [kafka.utils.Logging$class:89]
java.io.IOException: An established connection was aborted by the software in your host machine. (Note: the original output shows the Chinese-localized Windows text of this message; on Linux the same failure reads "java.io.IOException: Broken pipe".)
    at sun.nio.ch.SocketDispatcher.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
    at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
    at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
    at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.kafka.Producer.main(Producer.java:30)
2017-02-28 16:19:34 -[INFO] - [Back off for 100 ms before retrying send. Remaining retries = 3] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Fetching metadata from broker id:0,host:10.198.197.59,port:9092 with correlation id 5 for 1 topic(s) Set(versionTopic)] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
=============================
2017-02-28 16:19:38 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[WARN] - [Failed to send producer request with correlation id 9 to broker 0 with data for partitions [versionTopic,0]] - [kafka.utils.Logging$class:89]
java.io.IOException: An established connection was aborted by the software in your host machine. (Note: the original output shows the Chinese-localized Windows text of this message; on Linux the same failure reads "java.io.IOException: Broken pipe".)
    at sun.nio.ch.SocketDispatcher.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
    at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
    at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
    at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.kafka.Producer.main(Producer.java:30)
2017-02-28 16:19:38 -[INFO] - [Back off for 100 ms before retrying send. Remaining retries = 3] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Fetching metadata from broker id:0,host:10.198.197.59,port:9092 with correlation id 10 for 1 topic(s) Set(versionTopic)] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
=============================

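Note this line in the output: java.io.IOException: An established connection was aborted by the software in your host machine. (This is the Windows wording, which my machine printed in Chinese; on Linux the same failure shows up as "java.io.IOException: Broken pipe".)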
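
One plausible reading of this behaviour: the broker log in this post ends each failure with "Send the close connection response ... with Ack=0", which suggests the producer is running without acknowledgements, so the broker can only signal the error by closing the socket, and the producer notices on its next write. That would also explain why a single send shows no exception. The following is a minimal sketch, assuming the old Scala producer's request.required.acks setting (which the test code above does not set); with a leader acknowledgement requested, the broker should be able to return the size error in its response instead of dropping the connection. The class name AckedProducerProps is made up for illustration, and this variant has not been verified against the test above.

```java
import java.util.Properties;

// Hypothetical helper: builds the same producer properties as the test code,
// plus an acknowledgement setting.
final class AckedProducerProps {

    static Properties build(String brokerList) {
        Properties properties = new Properties();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("metadata.broker.list", brokerList);
        // Assumption: "request.required.acks" is the old producer's ack setting,
        // where "1" means "wait for the partition leader to acknowledge the write".
        properties.put("request.required.acks", "1");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("10.198.197.59:9092");
        System.out.println(properties.getProperty("request.required.acks"));
    }
}
```

The resulting Properties object would be passed to new ProducerConfig(properties) exactly as in the test code.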

The broker side reports these errors:


[2017-02-28 16:04:03,384] INFO Closing socket connection to /10.101.48.240. (kafka.network.Processor)
[2017-02-28 16:04:06,466] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 2 from client  on partition [versionTopic,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 1048602 bytes which exceeds the maximum configured message size of 1000012.
    at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
    at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
    at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
    at kafka.log.Log.append(Log.scala:257)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.utils.Utils$.inReadLock(Utils.scala:541)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
    at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)
[2017-02-28 16:04:06,467] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [versionTopic,0]] with Ack=0 (kafka.server.KafkaApis)
[2017-02-28 16:04:06,629] INFO Closing socket connection to /10.101.48.240. (kafka.network.Processor)
[2017-02-28 16:04:09,921] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 7 from client  on partition [versionTopic,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 1048602 bytes which exceeds the maximum configured message size of 1000012.
    at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
    at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
    at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
    at kafka.log.Log.append(Log.scala:257)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.utils.Utils$.inReadLock(Utils.scala:541)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
    at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)
[2017-02-28 16:04:09,922] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 7, topicAndPartition = [versionTopic,0]] with Ack=0 (kafka.server.KafkaApis)
[2017-02-28 16:04:10,096] INFO Closing socket connection to /10.101.48.240. (kafka.network.Processor)
[2017-02-28 16:04:13,374] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 12 from client  on partition [versionTopic,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 1048602 bytes which exceeds the maximum configured message size of 1000012.
    at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
    at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
    at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
    at kafka.log.Log.append(Log.scala:257)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.utils.Utils$.inReadLock(Utils.scala:541)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
    at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)
[2017-02-28 16:04:13,375] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 12, topicAndPartition = [versionTopic,0]] with Ack=0 (kafka.server.KafkaApis)

Note this line in the output: kafka.common.MessageSizeTooLargeException: Message size is 1048602 bytes which exceeds the maximum configured message size of 1000012.
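
The two numbers in that message can be reproduced by hand. As a rough sketch, assuming the Kafka 0.8 message layout (a 14-byte message header plus a 12-byte log entry overhead; this is an assumption about that format, not something the log itself shows), a 1 MiB payload with no key comes to exactly 1048602 bytes, and the limit of 1000012 looks like 1000000 plus the same 12-byte overhead. MessageSizeCheck and its method names are hypothetical helpers, not Kafka APIs.

```java
// Hypothetical helper that mirrors the size the broker appears to compare
// against message.max.bytes, under the assumed Kafka 0.8 layout.
final class MessageSizeCheck {

    // Assumed message header: CRC (4) + magic (1) + attributes (1)
    // + key length (4) + value length (4).
    static final int MESSAGE_HEADER_BYTES = 4 + 1 + 1 + 4 + 4;

    // Assumed per-entry log overhead: offset (8) + message size (4).
    static final int LOG_OVERHEAD_BYTES = 8 + 4;

    // Size of one log entry for a message with the given key and value lengths.
    static int entrySize(int keyBytes, int valueBytes) {
        return LOG_OVERHEAD_BYTES + MESSAGE_HEADER_BYTES + keyBytes + valueBytes;
    }

    // True if the entry would pass a limit such as the 1000012 seen in the log.
    static boolean fits(int keyBytes, int valueBytes, int maxMessageBytes) {
        return entrySize(keyBytes, valueBytes) <= maxMessageBytes;
    }

    public static void main(String[] args) {
        int payload = 1 * 1024 * 1024; // same payload size as the test code, no key
        int limit = 1000012;           // limit reported in the broker log
        System.out.println(entrySize(0, payload));   // 1048602
        System.out.println(fits(0, payload, limit)); // false
    }
}
```

Under these assumptions, the largest keyless payload that would fit the 1000012 limit is 999986 bytes, so a client could run a check like fits(...) on the serialized bytes before calling send.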


Note: even when Kafka is working normally, the producer side prints INFO lines like these when it sends:


2017-03-07 20:06:03 -[INFO] - [Verifying properties] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Property metadata.broker.list is overridden to 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Property serializer.class is overridden to kafka.serializer.StringEncoder] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Fetching metadata from broker id:0,host:10.198.197.59,port:9092 with correlation id 0 for 1 topic(s) Set(testTopic)] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
(the producer then sends its data)

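Look at the last three log lines. At first glance they look like an error, but they are in fact normal INFO output. As for why the producer connects, disconnects, and then connects again, I don't know yet; I'll come back and explain it once I've read through the Kafka source.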
