kafka入门学习笔记
1、目标
- 1、掌握kafka相关概念
- 2、掌握搭建一个kafka集群
- 3、掌握kafka生产者和消费者代码开发
- 4、掌握kafka的分区策略
- 5、掌握kafka整合flume
- 6、掌握kafka如何保证消息不丢失
2、kafka概述
2.1 kafka是什么
kafka是由linkedin开源,捐献apache基金会,它是一个实时的分布式消息队列。
它提供了一个对于实时处理下高可靠,高性能,高吞吐量、低延迟的平台
Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
2.2 消息队列的作用
- 核心作用 : 解耦、异步、并行
2.3 kafka与activeMQ区别
activeMQ:它是一个严格的JMS(java message)框架实现,后期需要有严格的事务去控制
kafka:它并不是一个严格的JMS(java message)框架实现,它是类似于JMS框架 , 它会主动把数据从kafka集群中拉取过来,它追求的高吞吐量。
2.3.1、在架构模型方面
RabbitMQ遵循AMQP协议,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了消息的路由键;客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费(长连接,queue有消息会推送到consumer端,consumer循环从输入流读取数据)。rabbitMQ以broker为中心;有消息的确认机制。
kafka遵从一般的MQ结构,producer,broker,consumer,以consumer为中心,消息的消费信息保存的客户端consumer上,consumer根据消费的点,从broker上批量pull数据;无消息确认机制。
2.3.2、在吞吐量
kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高。
rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操作;基于存储的可靠性的要求存储可以采用内存或者硬盘。
2.3.3、在可用性方面
rabbitMQ支持miror的queue,主queue失效,miror queue接管。kafka的broker支持主备模式。
2.3.4、在集群负载均衡方面
kafka采用zookeeper对集群中的broker、consumer进行管理,可以注册topic到zookeeper上;通过zookeeper的协调机制,producer保存对应topic的broker信息,可以随机或者轮询发送到broker上;并且producer可以基于语义指定分片,消息发送到broker的某分片上。
3、kafka集群架构
-
1、Producer
-
生产者
-
数据通过生产者写入到kafka集群中
-
2、broker
-
kafka集群中每一个节点就是一个broker,后期kafka的数据就存放在每一个broker
-
3、topic
-
消息的主题,它是一类消息的聚集 , 每个topic将被分成多个partition(区),在集群的配置文件中配置。
-
4、partition
-
分区概念
-
一个topic中有很多个分区,每一个分区就存在一部分数据。
* 每个partition由多个segment组成
* 任何发布到此partition的消息都会被直接追加到log文件的尾部
* 每个partition在内存中对应一个index列表,记录每个segment中的第一条消息偏移。这样查找消息的时候,先在index列表中定位消息位置,再读取文件,速度快
* 发布者发到某个topic的消息会被均匀的分布到多个part上,broker收到发布消息往对应part的最后一个segment上添加该消息。 -
5、replication
-
副本
-
一个topic中有很多个分区,每一个分区构建多个副本,保证数据的安全可靠性
-
6、segment
-
它就是用来存储每一个分区中的数据,它里面包括了2类文件
-
一个是log文件,它用于存在该分区的数据
* 一个是index文件,它用于存在数据的索引信息数据 -
就是为log文件中的数据构建索引
-
方便后期能够快速定位到我们需要的数据在整个log文件的哪一块
1
2
3
41 * 每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射
2 * 当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到
3 * segment达到一定的大小(可以通过配置文件设定,默认1G)后将不会再往该segment写数据,broker会创建新的segment
4 -
7、zookeeper
-
主要是使用zk帮我们管理kafka集群的元数据信息
-
kafka每一个broker地址
* 所有topic的信息
* 消费者的信息 -
8、consumer
-
消费者
-
消费者后期去消费kafka集群中topic的数据
-
条件
-
1、kafka集群地址
* 2、需要消费的topic名称
* 3、消费的topic的偏移量(记录了消费的位置,从哪一块开始消费) -
9、offset
-
偏移量
-
它就是记录下每一个消费者消费的位置在哪里
- 有2中保存方式
-
第一种
-
可以通过kafka集群自己去保存,这个时候由它自身有一个内置的topic去存储偏移量
* __consumer_offsets -
它默认有50个分区,这些分区就存在了消费者消费数据的偏移量
1
21 * 第二种
2 -
可以通过zk去保存
-
作用
-
1
2
3 1它是记录了每一个消费者消费topic每一个分区的位置,好处:方便于后期消费者程序挂掉了,然后正常启动,启动之后,它会读取上一次消费的记录,继续向后面消费。
2
3
4、kafka集群安装部署
-
1、下载对应的安装包
-
访问kafka官网:kafka.apache.org
- https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
- kafka_2.11-1.0.0.tgz
-
2、规划安装目录
-
/export/servers
-
3、上传安装包到服务器中
-
4、解压安装包到指定的安装目录
-
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers
-
5、重命名解压目录
-
mv kafka_2.11-1.0.0 kafka
-
6、修改配置文件
-
在node1上进去到kafka安装目录下有一个config文件夹
-
vim server.properties , 修改和添加如下配置即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1#指定broker的id,它是唯一标识,不能够重复
2broker.id=0
3
4#指定当前broker的服务地址
5host.name=node-1
6
7#kafka集群数据存放的目录
8log.dirs=/export/servers/kafka/kafka-logs
9
10#指定依赖zk的地址
11zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
12
13#指定kafka中的topic是否可以删除,默认是false,表示不可以删除,改为true,可以删除
14delete.topic.enable=true
15
16
-
7、配置kafka环境变量
-
vim /etc/profile
1
2
3
4 1export KAFKA_HOME=/export/servers/kafka
2export PATH=$PATH:$KAFKA_HOME/bin
3
4
-
8、分发kafka安装目录和环境变量
1
2
3
4
5
6 1scp -r kafka node-2:/export/servers
2scp -r kafka node-3:/export/servers
3scp /etc/profile node-2:/etc
4scp /etc/profile node-3:/etc
5
6
-
9、修改node-2和node-3配置文件信息
-
node-2
-
vim server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1#指定broker的id,它是唯一标识,不能够重复
2broker.id=1
3
4#指定当前broker的服务地址
5host.name=node-2
6
7#kafka集群数据存放的目录
8log.dirs=/export/servers/kafka/kafka-logs
9
10#指定依赖zk的地址
11zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
12
13#指定kafka中的topic是否可以删除,默认是false,表示不可以删除,改为true,可以删除
14delete.topic.enable=true
15
16
1
2 1* node-3
2
-
vim server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1#指定broker的id,它是唯一标识,不能够重复
2broker.id=2
3
4#指定当前broker的服务地址
5host.name=node-3
6
7#kafka集群数据存放的目录
8log.dirs=/export/servers/kafka/kafka-logs
9
10#指定依赖zk的地址
11zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
12
13#指定kafka中的topic是否可以删除,默认是false,表示不可以删除,改为true,可以删除
14delete.topic.enable=true
15
16
-
10、让所有kafka节点环境变量生效
-
在所有kafka节点执行
-
source /etc/profile
5、kafka集群启动和停止
-
1、启动
-
1、先启动zk集群
- 2、然后再启动kafka集群
-
需要再每一个kafka节点执行
1
2
3 1nohup kafka-server-start.sh /export/servers/kafka/config/server.properties > /dev/null 2>&1 &
2
3
1
2 1 * 一键启动脚本
2
-
vim start-kafka.sh
1
2
3
4
5
6
7
8 1#!/bin/sh
2for host in node-1 node-2 node-3
3do
4 ssh $host "source /etc/profile;nohup kafka-server-start.sh /export/servers/kafka/config/server.properties > /dev/null 2>&1 &"
5 echo "$host kafka is running"
6done
7
8
1
2 1 * sh start-kafka.sh
2
-
2、停止
-
1、需要再每一个台kafka节点执行
-
kafka-server-stop.sh
1
2
3
4
5
6
7 1这个脚本由于不同的linux版本,有一定问题(centos 6.x)
2
3ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}'
4改为
5ps ax | grep -i 'kafka' | grep java | grep -v grep | awk '{print $1}'
6
7
1
2 1 * 一键关闭脚本
2
1
2
3
4
5
6
7
8 1#!/bin/sh
2for host in node-1 node-2 node-3
3do
4 ssh $host "source /etc/profile;kafka-server-stop.sh"
5 echo "$host kafka is stop"
6done
7
8
6、kafka管理命令的使用
-
1、创建topic
-
kafka-topics.sh
1
2
3
4
5
6
7
8
9 1kafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --zookeeper node-1:2181,node-2:2181,node-3:2181
2
3--create :表示要创建
4--topic:指定要创建的topic名称
5--partitions:指定要创建的topic有几个分区
6--replication-factor:指定副本数
7--zookeeper:指定zk地址
8
9
-
2、查看kafka集群有哪些topic
-
kafka-topics.sh
1
2
3
4 1kafka-topics.sh --list --zookeeper node-1:2181,node-2:2181,node-3:2181
2--list:查看kafka集群有哪些topic
3
4
-
3、模拟一个生产者向topic发送数据
-
kafka-console-producer.sh
1
2
3
4
5
6 1kafka-console-producer.sh --topic test --broker-list node-1:9092,node-2:9092,node-3:9092
2
3--topic:指定向哪一个topic生产数据
4--broker-list :指定kafka集群地址
5
6
-
4、模拟一个消费者去消费topic的数据
-
kafka-console-consumer.sh
1
2
3
4
5
6
7
8
9 1kafka-console-consumer.sh --bootstrap-server node-1:9092,node-2:9092,node-3:9092 --from-beginning --topic test
2
3--bootstrap-server:指定kafka集群地址
4--from-beginning:指定从第一条数据开始消费
5--topic:指定消费哪一个topic数据
6
7kafka-console-consumer.sh --zookeeper node-1:2181,node-2:2181,node-3:2181 --from-beginning --topic test
8
9
-
5、删除topic
-
kafka-topics.sh
1
2
3
4
5
6 1kafka-topics.sh --delete --topic test --zookeeper node1:2181,node2:2181,node3:2181
2--delete:表示要删除操作
3--topic:指定要删除的topic名称
4--zookeeper :指定zk服务地址
5
6
7、kafka生产者和消费者java代码开发
-
引入依赖
1
2
3
4
5
6
7
8
9 1<dependencies>
2 <dependency>
3 <groupId>org.apache.kafka</groupId>
4 <artifactId>kafka-clients</artifactId>
5 <version>1.0.0</version>
6 </dependency>
7</dependencies>
8
9
7.1 生产者代码开发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.producer.KafkaProducer;
4import org.apache.kafka.clients.producer.Producer;
5import org.apache.kafka.clients.producer.ProducerRecord;
6
7import java.util.Properties;
8
9//todo:开发一个kafka的生产者代码
10public class KafkaProducerStudy {
11 public static void main(String[] args) {
12 Properties props = new Properties();
13 //kafka集群地址
14 props.put("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
15 //kafka的acks消息确认机制
16 //acks一共有4个选项
17 //-1和all:表示生产者发送数据给topic,需要所有该topic分区副本把数据保存正常
18 //1: 表示生产者发送数据给topic,只需要分区的主副本已经把数据保存正常
19 //0:生产者只管发数据,不需要确认,丢失数据可能性最高
20 props.put("acks", "all");
21 //重试次数
22 props.put("retries", 0);
23 //每个批次写入数据的大小
24 props.put("batch.size", 16384);
25 //延迟多久进行写入
26 props.put("linger.ms", 1);
27 //缓冲区的内存大小
28 props.put("buffer.memory", 33554432);
29 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
30 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
31
32 //设置自己的分区函数
33 props.put("partitioner.class","cn.itcast.kafka.MyPartitioner");
34
35 Producer<String, String> producer = new KafkaProducer<String, String>(props);
36 for (int i = 0; i < 100; i++)
37 //ProducerRecord<String, String> 有2个泛型 第一个String表示消息的key类型,在这里表示消息的标识,第二个String表示消息内容本身
38 //构建ProducerRecord对象需要3个参数:第一个是topic名称,第二个就是消息的key,第三个消息内容本身
39 // producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
40 //kafka分区策略:4种分区策略
41 //1、指定具体的分区号,数据就按照指定的分区号,流入到对应分区中
42 //producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hadoop spark"));
43
44 //2、不指定具体的分区号,指定消息的key(不断变化) 按照key.hashcode%分区数=分区号,hashPartitioner
45 //producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
46
47 //3、不指定具体的分区号,也不指定消息的key,它是采用轮训(随机)的方式写入到不同分区中。
48 //producer.send(new ProducerRecord<String, String>("test","hadoop spark"));
49
50 //4、自定义分区函数
51 producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
52
53
54 producer.close();
55 }
56}
57
58
7.2 消费者代码开发
-
1、自动提交偏移量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.consumer.ConsumerRecord;
4import org.apache.kafka.clients.consumer.ConsumerRecords;
5import org.apache.kafka.clients.consumer.KafkaConsumer;
6
7import java.util.Arrays;
8import java.util.Properties;
9
10//todo:开发kafka的消费者代码-----自动提交偏移量
11public class KafkaConsumerStudy {
12 public static void main(String[] args) {
13 Properties props = new Properties();
14 //kafka集群地址
15 props.put("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
16 //消费者组id
17 props.put("group.id", "test");
18 //自动提交消费的偏移量
19 props.put("enable.auto.commit", "true");
20 //每隔多久提交一次偏移量
21 props.put("auto.commit.interval.ms", "1000");
22 //key反序列化类
23 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
24 //value 反序列化类
25 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
26
27 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
28 //指定消费的topic名称,可以有多个
29 consumer.subscribe(Arrays.asList("test"));
30 while (true) {
31 //指定数据拉取的时间间隔
32 ConsumerRecords<String, String> records = consumer.poll(100);
33 for (ConsumerRecord<String, String> record : records)
34 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
35 }
36 }
37}
38
39
40
-
2、手动提交偏移量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.consumer.ConsumerRecord;
4import org.apache.kafka.clients.consumer.ConsumerRecords;
5import org.apache.kafka.clients.consumer.KafkaConsumer;
6
7import java.util.ArrayList;
8import java.util.Arrays;
9import java.util.List;
10import java.util.Properties;
11
12//todo:开发一个kafka消费者程序-------自己提交偏移量
13public class KafkaConsumerManualOffset {
14 public static void main(String[] args) {
15 Properties props = new Properties();
16 //指定kafka集群地址
17 props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
18 //消费者组id
19 props.put("group.id", "test");
20 // 手动提交偏移量
21 props.put("enable.auto.commit", "false");
22 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
23 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
24 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
25 consumer.subscribe(Arrays.asList("test"));
26 final int minBatchSize = 200;
27 List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
28 while (true) {
29 ConsumerRecords<String, String> records = consumer.poll(100);
30 for (ConsumerRecord<String, String> record : records) {
31 buffer.add(record);
32 }
33 //判断下数据有没有超过200条数据
34 if (buffer.size() >= minBatchSize) {
35 // insertIntoDb(buffer); //具体的处理逻辑
36 System.out.println("当前总条数据:"+buffer.size());
37
38 //手动提交偏移量
39 consumer.commitSync();
40 buffer.clear();
41 }
42 }
43 }
44}
45
46
47
8、kafka的分区策略
-
当前生产者产生的数据到底会流入到topic的哪一个分区中去?这里就涉及到kafka的分区策略
-
kafka分区策略:4种分区策略
-
1、指定具体的分区号,数据就按照指定的分区号,流入到对应分区中
1
2
3 1producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hadoop spark"));
2
3
1
2 1* 2、不指定具体的分区号,指定消息的key(不断变化) 按照key.hashcode%分区数=分区号
2
1
2
3 1producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
2
3
1
2 1* 3、不指定具体的分区号,也不指定消息的key,它是采用轮训(随机)的方式写入到不同分区中
2
1
2
3 1producer.send(new ProducerRecord<String, String>("test","hadoop spark"));
2
3
1
2 1* 4、自定义分区函数
2
1
2
3
4
5
6 1 //设置自己的分区函数
2props.put("partitioner.class","cn.itcast.kafka.MyPartitioner");
3
4producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
5
6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.producer.KafkaProducer;
4import org.apache.kafka.clients.producer.Producer;
5import org.apache.kafka.clients.producer.ProducerRecord;
6
7import java.util.Properties;
8
9//todo:开发一个kafka的生产者代码
10public class KafkaProducerStudy {
11 public static void main(String[] args) {
12 Properties props = new Properties();
13 //kafka集群地址
14 props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
15 //kafka的acks消息确认机制
16 props.put("acks", "all");
17 //重试次数
18 props.put("retries", 0);
19 //每个批次写入数据的大小
20 props.put("batch.size", 16384);
21 //延迟多久进行写入
22 props.put("linger.ms", 1);
23 //缓冲区的内存大小
24 props.put("buffer.memory", 33554432);
25 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
26 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
27
28 //设置自己的分区函数
29 props.put("partitioner.class","cn.itcast.kafka.MyPartitioner");
30
31 Producer<String, String> producer = new KafkaProducer<String, String>(props);
32 for (int i = 0; i < 100; i++)
33 //ProducerRecord<String, String> 有2个泛型 第一个String表示消息的key类型,在这里表示消息的标识,第二个String表示消息内容本身
34 //构建ProducerRecord对象需要3个参数:第一个是topic名称,第二个就是消息的key,第三个消息内容本身
35 // producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
36 //kafka分区策略:4种分区策略
37 //1、指定具体的分区号,数据就按照指定的分区号,流入到对应分区中
38 //producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hadoop spark"));
39
40 //2、不指定具体的分区号,指定消息的key(不断变化) 按照key.hashcode%分区数=分区号,hashPartitioner
41 //producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
42
43 //3、不指定具体的分区号,也不指定消息的key,它是采用轮训(随机)的方式写入到不同分区中。
44 //producer.send(new ProducerRecord<String, String>("test","hadoop spark"));
45
46 //4、自定义分区函数
47 producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
48
49
50 producer.close();
51 }
52}
53
54
55
-
自定义分区函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.producer.Partitioner;
4import org.apache.kafka.common.Cluster;
5
6import java.util.Map;
7
8//自定义分区函数
9public class MyPartitioner implements Partitioner{
10 /**
11 * 该方法会返回一个分区号
12 * @param topic topic的名称
13 * @param key 消息的key
14 * @param keyBytes 消息的key字节数组
15 * @param value 消息的内容
16 * @param valueBytes 消息的内容字节数组
17 * @param cluster kafka集群对象
18 * @return
19 */
20 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
21 //自己去实现hashPartitioner key.hashcode%分区数=分区号
22 int numPartions = cluster.partitionsForTopic("test").size();
23 //test有3个分区,对应的分区号就分别为:0 1 2
24 // -2 -1 0 1 2
25
26 return Math.abs(key.hashCode()%numPartions);
27 }
28
29 public void close() {
30
31 }
32
33 public void configure(Map<String, ?> configs) {
34
35 }
36}
37
38
39
9、kafka文件存储机制
可以见参考资源《kafka的文件存储机制.md》
note:
- 一个topic中的分区数据只能够被同一个消费者组的一个线程取消费
- 不同消费者组的线程可以同时消费一个topic中的同一个分区数据
生产者在生产数据的时候 , 是有一定的顺序 , 这些数据按照不同的分区规则写入到不同的分区中 . 这个时候 , 消费者在消费数据的时候 , 它是以分区为单位进行消费 , 只有一个消费者的时候 , 先消费哪一个分区 , 然后再消费其他分区 , 这个时候就无法保证消费的顺序 .
如果想要保证生产的数据的顺序和消费数据的顺序一致 , 在这里只能够设置当前topic的分区数就是一个 , 对于kafka框架来说 , 它是一个分布式消息队列 , 这种设置与它的分布式的理念是有违背的.
为什么kafka可以快速的定义那一条数据在哪?
- 通过log文件命令规则使用二分查询 , 快速定义要找的数据在哪个文件中
- 然后在通过对应的index文件 , 为数据构建了稀疏索引 , 并不是为每一条数据构建索引 , 这是为了避免空间浪费 , 后期通过索引文件快速定义要找的数据在整个log文件的哪一行
具体原因参考《kafka为什么那么快.md》
10、kafka整合flume
-
1、安装flume和kafka
-
2、修改flume配置
-
vim flume-kafka.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1#为我们的source channel sink起名
2a1.sources = r1
3a1.channels = c1
4a1.sinks = k1
5#指定我们的source收集到的数据发送到哪个管道
6a1.sources.r1.channels = c1
7#指定我们的source数据收集策略
8a1.sources.r1.type = spooldir
9a1.sources.r1.spoolDir = /export/servers/flumedata
10a1.sources.r1.deletePolicy = never
11a1.sources.r1.fileSuffix = .COMPLETED
12a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
13a1.sources.r1.inputCharset = utf-8
14#指定我们的channel为memory,即表示所有的数据都装进memory当中
15a1.channels.c1.type = memory
16#指定我们的sink为kafka sink,并指定我们的sink从哪个channel当中读取数据
17a1.sinks.k1.channel = c1
18a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
19a1.sinks.k1.kafka.topic = test
20a1.sinks.k1.kafka.bootstrap.servers = node-1:9092,node-2:9092,node-3:9092
21a1.sinks.k1.kafka.flumeBatchSize = 20
22a1.sinks.k1.kafka.producer.acks = 1
23
24
-
3、启动flume(需要在flume文件目录下)
1
2
3 1bin/flume-ng agent -n a1 -c conf -f conf/flume-kafka.conf -Dflume.root.logger=info,console
2
3
11、kafka如何保证数据不丢失
-
1、生产者保证数据不丢失
-
就是利用kafka的ack机制
-
同步模式
1
2
3
4
5
6 1//指定为同步模式
2producer.type=sync
3//ack确认机制等于1,只需要主副本确认数据保存成功就可以了,后期从副本自己去同步数据
4request.required.acks=1
5
6
1
2 1 * 异步模式
2
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1//指定为异步模式
2producer.type=async
3//ack确认机制等于1,只需要主副本确认数据保存成功就可以了,后期从副本自己去同步数据
4request.required.acks=1
5//指定数据缓存到什么时候发送出去
6queue.buffering.max.ms=5000
7//指定数据缓存到多少条之后发送出去
8queue.buffering.max.messages=10000
9//数据达到了发送的阈值,后期由于一些原因导致数据并没有发送出去,这个时候对于缓存的数据是否保留,-1保留未成功发送的数据, 0就是不保留,直接舍弃掉。
10queue.enqueue.timeout.ms = -1
11//每次发送的数据量条数
12batch.num.messages=200
13
14
-
2、broker—kafka集群自己本身
-
kafka中有很多个topic,每一个topic有很多个分区,每一个分区有多个副本。通过多副本机制保证数据的安全性
-
3、消费者保证数据不丢失
-
每一个消费者在消费数据的时候,都把当前消费的位置记录下来,后续消费者程序挂掉了,然后正常重启,读取上一次消费的偏移量offset,接着上一次继续消费。
12、kafkaManager监控工具的安装与使用
可以参考资料《kafka_manager监控工具的安装与使用.md》文档