一、KafkaOffsetMonitor简述
KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以一目了然。
二、KafkaOffsetMonitor下载
KafkaOffsetMonitor托管在Github上,可以通过Github下载。
下载地址:https://github.com/quantifind/KafkaOffsetMonitor/releases
或者下载百度网盘:链接:https://pan.baidu.com/s/1geEBEvT 密码:jaeu
三、KafkaOffsetMonitor启动
将下载下来的KafkaOffsetMonitor jar包上传到linux上,可以新建一个目录KafkaMonitor,用于存放KafkaOffsetMonitor-assembly-0.2.0.jar进入到KafkaMonitor目录下,通过java编译命令来运行这个jar包:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1[root@kafka50 KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days
2按回车后,可以看到控制台输出:
3
4serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
5SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
6SLF4J: Defaulting to no-operation (NOP) logger implementation
7SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
82018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
9log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
10log4j:WARN Please initialize the log4j system properly.
11log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
122018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
132018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8088
14
如果没有指定端口,则默认会开启一个随机端口。
1
2
3
4
5
6
7
8 1参数说明:
2
3zk :zookeeper主机地址,如果有多个,用逗号隔开
4port :应用程序端口
5refresh :应用程序在数据库中刷新和存储点的频率
6retain :在db中保留多长时间
7dbName :保存的数据库文件名,默认为offsetapp
8
为了更方便的启动KafkaOffsetMonitor,可以写一个启动脚本来直接运行,我这里新建一个名为:kafka-monitor-start.sh的脚本,然后编辑这个脚本:
1
2
3
4
5
6
7
8 1[root@kafka50 KafkaMonitor]# vim kafka-monitor-start.sh
2java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
3--port 8088 \
4--zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \
5--refresh 5.minutes \
6--retain 1.day >/dev/null 2>&1;
7
8
然后退出保存即可,接下来修改一下kafka-monitor-start.sh的权限
1
2 1[root@kafka50 KafkaMonitor]# chmod +x kafka-monitor-start.sh
2
启动KafkaOffsetMonitor:
1
2
3
4
5
6 1[root@kafka50 KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh &
2[1] 6551
3[root@kafka50 KafkaMonitor]# lsof -i:8088
4COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
5java 6552 root 16u IPv6 26047 0t0 TCP *:radan-http (LISTEN)
6
四、KafkaOffsetMonitor Web UI
在游览器中输入:http://ip:port即可以查看KafkaOffsetMonitor Web UI,如下图:
在下图中有一个Visualizations选项卡,点击其中的Cluster Overview可以查看当前Kafka集群的Broker情况
五、简单的Producer
1、新建一个Topic
首先为本次试验新建一个Topic,命令如下:
1
2
3
4
5
6
7 1bin/kafka-topics.sh \
2 --create \
3 --zookeeper 10.0.0.50:12181 \
4 --replication-factor 3 \
5 --partition 3 \
6 --topic kafkamonitor-simpleproducer
7
2、新建SimpleProducer代码
在上一篇文章中提到的Producer封装Github代码的基础上,写了一个往kafkamonitor-simpleproducer发送message的java代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
2import com.ckm.kafka.producer.inter.KafkaProducerTool;
3
4/**
5 * Created by ckm on 2016/8/30.
6 */
7public class SimpleProducer {
8 public static void main(String[] args) {
9 KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
10 int i = 0;
11 String message = "";
12 while (true) {
13 message = "test-simple-producer : " + i ++;
14 kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message);
15 }
16 }
17}
18
程序运行效果:
3、ConsoleConsumer消费该topic
用kafka自带的ConsoleConsumer消费kafkamonitor-simpleproducer中的message。
1
2 1bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer
2
消费截图如下:
4、KafkaOffsetMonitor页面
(1)在Topic List选项卡中,我们可以看到刚才新建的kafkamonitor-simpleproducer
(2)点开后,能看到有一个console-consumer正在消费该topic
(3)继续进入该Consumer,可以查看该Consumer当前的消费状况
这张图片的左上角显示了当前Topic的生产速率,右上角显示了当前Consumer的消费速率。
图片中还有三种颜色的线条,蓝色的表示当前Topic中的Message数目,灰色的表示当前Consumer消费的offset位置,红色的表示蓝色灰色的差值,即当前Consumer滞后于Producer的message数目。
(4)看一眼各partition中的message消费情况
从上图可以看到,当前有3个Partition,每个Partition中的message数目分布很不均匀。这里可以与接下来的自定义Producer的情况进行一个对比。
六、自定义Partitioner的Producer
1、新建一个Topic
1
2
3
4
5
6
7 1bin/kafka-topics.sh \
2 --create \
3 --zookeeper 10.0.0.50:12181 \
4 --replication-factor 3 \
5 --partition 3 \
6 --topic kafkamonitor-partitionedproducer
7
2、Partitioner代码
逻辑很简单,循环依次往各Partition中发送message。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1import kafka.producer.Partitioner;
2
3/**
4 * Created by ckm on 2018/1/8.
5 */
6public class TestPartitioner implements Partitioner {
7 public TestPartitioner() {
8 }
9
10 @Override
11 public int partition(Object key, int numPartitions) {
12 int intKey = (int) key;
13 return intKey % numPartitions;
14 }
15}
16
3、Producer代码
将自定义的Partitioner设置到Producer,其他调用过程和二中类似。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
2import com.ckm.kafka.producer.inter.KafkaProducerTool;
3
4/**
5 * Created by ckm on 2016/8/30.
6 */
7public class PartitionedProducer {
8 public static void main(String[] args) {
9 KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
10 kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner");
11 int i = 0;
12 String message = "";
13 while (true) {
14 message = "test-partitioner-producer : " + i;
15 System.out.println(message);
16 kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message);
17 i ++;
18 }
19 }
20}
21
代码运行效果如下图:
4、ConsoleConsumer消费Message
1
2 1bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer
2
消费效果如下图:
5、KafkaOffsetMonitor页面
其他页面与上面的类似,这里只观察一下每个partition中的message数目与第二节中的对比。可以看到这里每个Partition中message分别是很均匀的。
注意事项:
注意这里有一个坑,默认情况下Producer往一个不存在的Topic发送message时会自动创建这个Topic。由于在这个封装中,有同时传递message和topic的情况,如果调用方法时传入的参数反了,将会在Kafka集群中自动创建Topic。在正常情况下,应该是先把Topic根据需要创建好,然后Producer往该Topic发送Message,最好把Kafka这个默认自动创建Topic的功能关掉。
那么,假设真的不小心创建了多余的Topic,在删除时,会出现“marked for deletion”提示,只是将该topic标记为删除,使用list命令仍然能看到。如果需要调整这两个功能的话,在server.properties中配置如下两个参数:
auto.create.topics.enable
true
Enable auto creation of topic on the server
delete.topic.enable
false
Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off
七,KafkaOffsetMonitor 总结
KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。
除了KafkaOffsetMonitor,Kafka监控工具还有另外两款:
Kafka Web Console:监控功能较为全面,可以预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。
Kafka Manager:偏向Kafka集群管理,若操作不当,容易导致集群出现故障。对Kafka实时生产和消费消息是通过JMX实现的。没有记录Offset、Lag等信息。