使用Log4j将程序日志实时写入Kafka

释放双眼,带上耳机,听听看~!

 第一部分 搭建Kafka环境

安装Kafka

下载:http://kafka.apache.org/downloads.html


1
2
3
1tar zxf kafka-<VERSION>.tgz
2cd kafka-<VERSION>
3

启动Zookeeper

启动Zookeeper前需要配置一下config/zookeeper.properties:

使用Log4j将程序日志实时写入Kafka

接下来启动Zookeeper


1
2
1bin/zookeeper-server-start.sh config/zookeeper.properties
2

启动Kafka Server

启动Kafka Server前需要配置一下config/server.properties。主要配置以下几项,内容就不说了,注释里都很详细:

使用Log4j将程序日志实时写入Kafka

然后启动Kafka Server:


1
2
1bin/kafka-server-start.sh config/server.properties
2

创建Topic


1
2
1bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2

查看创建的Topic


1
2
1bin/kafka-topics.sh --list --zookeeper localhost:2181
2

启动控制台Producer,向Kafka发送消息


1
2
3
4
1bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
2This is a message
3This is another message
4

启动控制台Consumer,消费刚刚发送的消息


1
2
3
4
1bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
2This is a message
3This is another message
4

删除Topic


1
2
1bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
2

注:
只有当delete.topic.enable=true时,该操作才有效

配置Kafka集群(单台机器上)

首先拷贝server.properties文件为多份(这里演示
4个节点的Kafka集群,因此还需要拷贝3份配置文件):


1
2
3
4
1cp config/server.properties config/server1.properties
2cp config/server.properties config/server2.properties
3cp config/server.properties config/server3.properties
4

修改server1.properties的以下内容:


1
2
3
4
1broker.id=1
2port=9093
3log.dir=/tmp/kafka-logs-1
4

同理修改server2.properties和server3.properties的这些内容,并保持所有配置文件的zookeeper.connect属性都指向运行在本机的zookeeper地址localhost:2181。注意,由于这几个Kafka节点都将运行在同一台机器上,因此需要保证这几个值不同,这里以累加的方式处理。例如在server2.properties上:


1
2
3
4
1broker.id=2
2port=9094
3log.dir=/tmp/kafka-logs-2
4

把server3.properties也配置好以后,依次启动这些节点:


1
2
3
4
1bin/kafka-server-start.sh config/server1.properties &
2bin/kafka-server-start.sh config/server2.properties &
3bin/kafka-server-start.sh config/server3.properties &
4

Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

现在在Kafka集群上创建备份因子为3,分区数为4的Topic:


1
2
1bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka
2

说明:备份因子replication-factor越大,则说明集群容错性越强,就是当集群down掉后,数据恢复的可能性越大。所有的分区数里的内容共同组成了一份数据,分区数partions越大,则该topic的消息就越分散,集群中的消息分布就越均匀。

然后使用kafka-topics.sh的–describe参数查看一下Topic为kafka的详情:

使用Log4j将程序日志实时写入Kafka

输出的第一行是所有分区的概要,接下来的每一行是一个分区的描述。可以看到Topic为kafka的消息,PartionCount=4,ReplicationFactor=3正是我们创建时指定的分区数和备份因子。

另外:Leader是指负责这个分区所有读写的节点;Replicas是指这个分区所在的所有节点(不论它是否活着);ISR是Replicas的子集,代表存有这个分区信息而且当前活着的节点。

拿partition:0这个分区来说,该分区的Leader是server0,分布在id为0,1,2这三个节点上,而且这三个节点都活着。

再来看下Kafka集群的日志:

使用Log4j将程序日志实时写入Kafka

其中kafka-logs-0代表server0的日志,kafka-logs-1代表server1的日志,以此类推。

从上面的配置可知,id为0,1,2,3的节点分别对应server0, server1, server2, server3。而上例中的partition:0分布在id为0, 1, 2这三个节点上,因此可以在server0, server1, server2这三个节点上看到有kafka-0这个文件夹。这个kafka-0就代表Topic为kafka的partion0。

第二部分 Kafka+Log4j项目整合

先来看下Maven项目结构图:

使用Log4j将程序日志实时写入Kafka

pom.xml引入的jar包:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1<dependencies>
2    <dependency>
3        <groupId>junit</groupId>
4        <artifactId>junit</artifactId>
5        <version>4.12</version>
6    </dependency>
7
8    <dependency>
9        <groupId>org.apache.kafka</groupId>
10        <artifactId>kafka-clients</artifactId>
11        <version>0.10.2.0</version>
12    </dependency>
13    <dependency>
14        <groupId>org.apache.kafka</groupId>
15        <artifactId>kafka_2.10</artifactId>
16        <version>0.10.2.0</version>
17    </dependency>
18    <dependency>
19        <groupId>org.apache.kafka</groupId>
20        <artifactId>kafka-log4j-appender</artifactId>
21        <version>0.10.2.0</version>
22    </dependency>
23    <dependency>
24        <groupId>com.google.guava</groupId>
25        <artifactId>guava</artifactId>
26        <version>18.0</version>
27    </dependency>
28</dependencies>
29

重要的内容是log4j.properties:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1log4j.rootLogger=debug,Console
2
3# appender kafka
4log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
5log4j.appender.kafka.topic=kafkaTest
6log4j.appender.kafka.syncSend=false
7# multiple brokers are separated by comma ",".
8log4j.appender.kafka.brokerList=192.168.1.163:9092
9log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
10log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
11
12#输出日志到控制台
13log4j.appender.Console=org.apache.log4j.ConsoleAppender
14log4j.appender.Console.Threshold=all
15log4j.appender.Console.layout=org.apache.log4j.PatternLayout
16log4j.appender.Console.layout.ConversionPattern=%-d{yyyy-MM-dd HH\:mm\:ss} [%c\:%L]-[%p] %m%n
17
18#kafka
19log4j.logger.com.demo.kafka.Log4jToKafka=info,kafka
20#关闭spring低级别日志
21log4j.logger.org.springside.examples.miniweb=ERROR
22log4j.logger.com.octo.captcha.service.image.DefaultManageableImageCaptchaService=ERROR
23log4j.logger.com.mchange.v2.resourcepool.BasicResourcePool=ERROR
24log4j.logger.com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool=ERROR
25log4j.logger.com.mchange.v2.c3p0.impl.NewPooledConnection=ERROR
26log4j.logger.com.mchange.v2.c3p0.management.DynamicPooledDataSourceManagerMBean=ERROR
27log4j.logger.com.mchange.v2.c3p0.C3P0Registry=ERROR
28log4j.logger.com.mchange.v2.log.MLog=ERROR
29log4j.logger.com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource=ERROR
30

log4j输出日志:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
1package com.demo.kafka;
2import org.apache.log4j.Logger;
3
4/**
5 * INFO: info User: xuchao Date: 2017/3/17 Version: 1.0 History:
6 * <p>
7 * 如果有修改过程,请记录
8 * </P>
9 */
10
11public class Log4jToKafka {
12    private static Logger logger = Logger.getLogger(Log4jToKafka.class);
13
14    public static void main(String args[]) {
15        System.out.println("hello word!");
16        int start = 1;
17        while (true) {
18            start++;
19            logger.info(start + "hello Log4jToKafka test !");
20            try {
21                Thread.sleep(50l);
22            } catch (InterruptedException e) {
23                e.printStackTrace();
24            }
25        }
26
27    }
28}
29

消费kafka中的信息:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
1package com.demo.kafka;
2
3/**
4 * INFO: info
5 * User: zhaokai
6 * Date: 2017/3/17
7 * Version: 1.0
8 * History: <p>如果有修改过程,请记录</P>
9 */
10
11import java.util.Arrays;
12import java.util.Properties;
13
14import org.apache.kafka.clients.consumer.ConsumerRecord;
15import org.apache.kafka.clients.consumer.ConsumerRecords;
16import org.apache.kafka.clients.consumer.KafkaConsumer;
17
18public class Consumer {
19
20    public static void main(String[] args) {
21        System.out.println("begin consumer");
22        connectionKafka();
23        System.out.println("finish consumer");
24    }
25
26    @SuppressWarnings("resource")
27    public static void connectionKafka() {
28
29        Properties props = new Properties();
30        props.put("bootstrap.servers", "192.168.1.163:9092");
31        props.put("group.id", "testConsumer");
32        props.put("enable.auto.commit", "true");
33        props.put("auto.commit.interval.ms", "1000");
34        props.put("session.timeout.ms", "30000");
35        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
36        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
37        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
38        consumer.subscribe(Arrays.asList("kafkaTest"));
39        while (true) {
40            ConsumerRecords<String, String> records = consumer.poll(100);
41            try {
42                Thread.sleep(2000);
43            } catch (InterruptedException e) {
44                e.printStackTrace();
45            }
46            for (ConsumerRecord<String, String> record : records) {
47                System.out.printf("===================offset = %d, key = %s, value = %s", record.offset(), record.key(),
48                        record.value());
49            }
50        }
51    }
52}
53

MyProducer.java用于向Kafka发送消息,但不通过log4j的appender发送。此案例中可以不要。但是我还是放在这里:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1package com.demo.kafka;
2
3import java.util.ArrayList;
4import java.util.List;
5import java.util.Properties;
6import kafka.javaapi.producer.Producer;
7import kafka.producer.KeyedMessage;
8import kafka.producer.ProducerConfig;
9
10public class MyProducer {
11    private static final String TOPIC = "kafka";
12    private static final String CONTENT = "This is a single message";
13    private static final String BROKER_LIST = "localhost:9092";
14    private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder";
15
16    public static void main(String[] args) {
17        Properties props = new Properties();
18        props.put("serializer.class", SERIALIZER_CLASS);
19        props.put("metadata.broker.list", BROKER_LIST);
20
21        ProducerConfig config = new ProducerConfig(props);
22        Producer<String, String> producer = new Producer<String, String>(config);
23
24        // Send one message.
25        KeyedMessage<String, String> message = new KeyedMessage<String, String>(TOPIC, CONTENT);
26        producer.send(message);
27
28        // Send multiple messages.
29        List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>();
30        for (int i = 0; i < 5; i++) {
31            messages.add(new KeyedMessage<String, String>(TOPIC, "Multiple message at a time. " + i));
32        }
33        producer.send(messages);
34    }
35}
36

到这里,代码就结束了。

第三部分 运行与验证

先运行Consumer,使其处于监听状态。同时,还可以启动Kafka自带的ConsoleConsumer来验证是否跟Consumer的结果一致。最后运行Log4jToKafka.java。

先来看看Consumer的输出:

使用Log4j将程序日志实时写入Kafka

再来看看ConsoleConsumer的输出:

使用Log4j将程序日志实时写入Kafka

可以看到,尽管发往Kafka的消息去往了不同的地方,但是内容是一样的,而且一条也不少。最后再来看看Kafka的日志。

使用Log4j将程序日志实时写入Kafka

我们知道,Topic为kafka的消息有4个partion,从之前的截图可知这4个partion均匀分布在4个kafka节点上,于是我对每一个partion随机选取一个节点查看了日志内容。

上图中黄色选中部分依次代表在server0上查看partion0,在server1上查看partion1,以此类推。

而红色部分是日志内容,由于在创建Topic时准备将20条日志分成4个区存储,可以很清楚的看到,这20条日志确实是很均匀的存储在了几个partion上。

摘一点Infoq上的话:每个日志文件都是一个log entrie序列,每个log entrie包含一个4字节整型数值(值为N+5),1个字节的"magic value",4个字节的CRC校验码,其后跟N个字节的消息体。每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:


1
2
3
4
5
1message length : 4 bytes (value: 1+4+n)
2"magic" value : 1 byte
3crc : 4 bytes
4payload : n bytes
5

这里我们看到的日志文件的每一行,就是一个log entrie,每一行前面无法显示的字符(蓝色选中部分),就是(message length + magic value + crc)了。而log entrie的后部分,则是消息体的内容了。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

PHP+redis实现session共享

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索