kafka入门

释放双眼,带上耳机,听听看~!

kafka入门学习笔记

1、目标

  • 1、掌握kafka相关概念
  • 2、掌握搭建一个kafka集群
  • 3、掌握kafka生产者和消费者代码开发
  • 4、掌握kafka的分区策略
  • 5、掌握kafka整合flume
  • 6、掌握kafka如何保证消息不丢失

2、kafka概述

2.1 kafka是什么

kafka是由linkedin开源,捐献apache基金会,它是一个实时的分布式消息队列。
它提供了一个对于实时处理下高可靠,高性能,高吞吐量、低延迟的平台

Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。

2.2 消息队列的作用

  • 核心作用 : 解耦、异步、并行

2.3 kafka与activeMQ区别

kafka入门

activeMQ:它是一个严格的JMS(java message)框架实现,后期需要有严格的事务去控制

kafka:它并不是一个严格的JMS(java message)框架实现,它是类似于JMS框架 , 它会主动把数据从kafka集群中拉取过来,它追求的高吞吐量。

2.3.1、在架构模型方面
​ RabbitMQ遵循AMQP协议,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了消息的路由键;客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费(长连接,queue有消息会推送到consumer端,consumer循环从输入流读取数据)。rabbitMQ以broker为中心;有消息的确认机制。
​ kafka遵从一般的MQ结构,producer,broker,consumer,以consumer为中心,消息的消费信息保存的客户端consumer上,consumer根据消费的点,从broker上批量pull数据;无消息确认机制。

2.3.2、在吞吐量
​ kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高。
rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操作;基于存储的可靠性的要求存储可以采用内存或者硬盘。

2.3.3、在可用性方面
​ rabbitMQ支持miror的queue,主queue失效,miror queue接管。kafka的broker支持主备模式。

2.3.4、在集群负载均衡方面
​ kafka采用zookeeper对集群中的broker、consumer进行管理,可以注册topic到zookeeper上;通过zookeeper的协调机制,producer保存对应topic的broker信息,可以随机或者轮询发送到broker上;并且producer可以基于语义指定分片,消息发送到broker的某分片上。

3、kafka集群架构

kafka入门

  • 1、Producer

  • 生产者

  • 数据通过生产者写入到kafka集群中

  • 2、broker

  • kafka集群中每一个节点就是一个broker,后期kafka的数据就存放在每一个broker

  • 3、topic

  • 消息的主题,它是一类消息的聚集 , 每个topic将被分成多个partition(区),在集群的配置文件中配置。

  • 4、partition

  • 分区概念

  • 一个topic中有很多个分区,每一个分区就存在一部分数据。
    * 每个partition由多个segment组成
    * 任何发布到此partition的消息都会被直接追加到log文件的尾部
    * 每个partition在内存中对应一个index列表,记录每个segment中的第一条消息偏移。这样查找消息的时候,先在index列表中定位消息位置,再读取文件,速度快
    * 发布者发到某个topic的消息会被均匀的分布到多个part上,broker收到发布消息往对应part的最后一个segment上添加该消息。

  • 5、replication

  • 副本

  • 一个topic中有很多个分区,每一个分区构建多个副本,保证数据的安全可靠性

  • 6、segment

  • 它就是用来存储每一个分区中的数据,它里面包括了2类文件

  • 一个是log文件,它用于存在该分区的数据
    * 一个是index文件,它用于存在数据的索引信息数据

  • 就是为log文件中的数据构建索引

  • 方便后期能够快速定位到我们需要的数据在整个log文件的哪一块

    
    
    1
    2
    3
    4
    1  * 每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射
    2  * 当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到
    3  * segment达到一定的大小(可以通过配置文件设定,默认1G)后将不会再往该segment写数据,broker会创建新的segment
    4
  • 7、zookeeper

  • 主要是使用zk帮我们管理kafka集群的元数据信息

  • kafka每一个broker地址
    * 所有topic的信息
    * 消费者的信息

  • 8、consumer

  • 消费者

  • 消费者后期去消费kafka集群中topic的数据

  • 条件

  • 1、kafka集群地址
    * 2、需要消费的topic名称
    * 3、消费的topic的偏移量(记录了消费的位置,从哪一块开始消费)

  • 9、offset

  • 偏移量

  • 它就是记录下每一个消费者消费的位置在哪里

    • 有2中保存方式
  • 第一种

  • 可以通过kafka集群自己去保存,这个时候由它自身有一个内置的topic去存储偏移量
    * __consumer_offsets

  • 它默认有50个分区,这些分区就存在了消费者消费数据的偏移量

    
    
    1
    2
    1  * 第二种
    2
  • 可以通过zk去保存

    • 作用


1
2
3
1它是记录了每一个消费者消费topic每一个分区的位置,好处:方便于后期消费者程序挂掉了,然后正常启动,启动之后,它会读取上一次消费的记录,继续向后面消费。
2
3

4、kafka集群安装部署

  • 1、下载对应的安装包

  • 访问kafka官网:kafka.apache.org

  • 2、规划安装目录

  • /export/servers

  • 3、上传安装包到服务器中

  • 4、解压安装包到指定的安装目录

  • tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers

  • 5、重命名解压目录

  • mv kafka_2.11-1.0.0 kafka

  • 6、修改配置文件

  • 在node1上进去到kafka安装目录下有一个config文件夹

  • vim server.properties , 修改和添加如下配置即可


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1#指定broker的id,它是唯一标识,不能够重复
2broker.id=0
3
4#指定当前broker的服务地址
5host.name=node-1
6
7#kafka集群数据存放的目录
8log.dirs=/export/servers/kafka/kafka-logs
9
10#指定依赖zk的地址
11zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
12
13#指定kafka中的topic是否可以删除,默认是false,表示不可以删除,改为true,可以删除
14delete.topic.enable=true
15
16
  • 7、配置kafka环境变量

  • vim /etc/profile


1
2
3
4
1export KAFKA_HOME=/export/servers/kafka
2export PATH=$PATH:$KAFKA_HOME/bin
3
4
  • 8、分发kafka安装目录和环境变量


1
2
3
4
5
6
1scp -r kafka node-2:/export/servers
2scp -r kafka node-3:/export/servers
3scp /etc/profile node-2:/etc
4scp /etc/profile node-3:/etc
5
6
  • 9、修改node-2和node-3配置文件信息

  • node-2

  • vim server.properties


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1#指定broker的id,它是唯一标识,不能够重复
2broker.id=1
3
4#指定当前broker的服务地址
5host.name=node-2
6
7#kafka集群数据存放的目录
8log.dirs=/export/servers/kafka/kafka-logs
9
10#指定依赖zk的地址
11zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
12
13#指定kafka中的topic是否可以删除,默认是false,表示不可以删除,改为true,可以删除
14delete.topic.enable=true
15
16

1
2
1* node-3
2
  • vim server.properties


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1#指定broker的id,它是唯一标识,不能够重复
2broker.id=2
3
4#指定当前broker的服务地址
5host.name=node-3
6
7#kafka集群数据存放的目录
8log.dirs=/export/servers/kafka/kafka-logs
9
10#指定依赖zk的地址
11zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
12
13#指定kafka中的topic是否可以删除,默认是false,表示不可以删除,改为true,可以删除
14delete.topic.enable=true
15
16
  • 10、让所有kafka节点环境变量生效

  • 在所有kafka节点执行

  • source /etc/profile

5、kafka集群启动和停止

  • 1、启动

  • 1、先启动zk集群

    • 2、然后再启动kafka集群
  • 需要再每一个kafka节点执行


1
2
3
1nohup kafka-server-start.sh /export/servers/kafka/config/server.properties > /dev/null 2>&1 &
2
3

1
2
1    * 一键启动脚本
2
  • vim start-kafka.sh


1
2
3
4
5
6
7
8
1#!/bin/sh
2for host in node-1 node-2 node-3
3do
4   ssh $host "source /etc/profile;nohup kafka-server-start.sh /export/servers/kafka/config/server.properties > /dev/null 2>&1 &"
5   echo "$host kafka is running"
6done
7
8

1
2
1    * sh start-kafka.sh
2
  • 2、停止

  • 1、需要再每一个台kafka节点执行

  • kafka-server-stop.sh


1
2
3
4
5
6
7
1这个脚本由于不同的linux版本,有一定问题(centos 6.x)
2
3ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}'
4改为
5ps ax | grep -i 'kafka' | grep java | grep -v grep | awk '{print $1}'      
6
7

1
2
1    * 一键关闭脚本
2

1
2
3
4
5
6
7
8
1#!/bin/sh
2for host in node-1 node-2 node-3
3do
4   ssh $host "source /etc/profile;kafka-server-stop.sh"
5   echo "$host kafka is stop"
6done
7
8

6、kafka管理命令的使用

  • 1、创建topic

  • kafka-topics.sh


1
2
3
4
5
6
7
8
9
1kafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --zookeeper node-1:2181,node-2:2181,node-3:2181
2
3--create :表示要创建
4--topic:指定要创建的topic名称
5--partitions:指定要创建的topic有几个分区
6--replication-factor:指定副本数
7--zookeeper:指定zk地址
8
9
  • 2、查看kafka集群有哪些topic

  • kafka-topics.sh


1
2
3
4
1kafka-topics.sh --list --zookeeper node-1:2181,node-2:2181,node-3:2181
2--list:查看kafka集群有哪些topic
3
4
  • 3、模拟一个生产者向topic发送数据

  • kafka-console-producer.sh


1
2
3
4
5
6
1kafka-console-producer.sh --topic test --broker-list node-1:9092,node-2:9092,node-3:9092
2
3--topic:指定向哪一个topic生产数据
4--broker-list :指定kafka集群地址
5
6
  • 4、模拟一个消费者去消费topic的数据

  • kafka-console-consumer.sh


1
2
3
4
5
6
7
8
9
1kafka-console-consumer.sh --bootstrap-server node-1:9092,node-2:9092,node-3:9092 --from-beginning  --topic test
2
3--bootstrap-server:指定kafka集群地址
4--from-beginning:指定从第一条数据开始消费
5--topic:指定消费哪一个topic数据
6
7kafka-console-consumer.sh --zookeeper node-1:2181,node-2:2181,node-3:2181 --from-beginning  --topic test
8
9
  • 5、删除topic

  • kafka-topics.sh


1
2
3
4
5
6
1kafka-topics.sh --delete --topic test --zookeeper node1:2181,node2:2181,node3:2181
2--delete:表示要删除操作
3--topic:指定要删除的topic名称
4--zookeeper :指定zk服务地址
5
6

7、kafka生产者和消费者java代码开发

  • 引入依赖


1
2
3
4
5
6
7
8
9
1<dependencies>
2    <dependency>
3        <groupId>org.apache.kafka</groupId>
4        <artifactId>kafka-clients</artifactId>
5        <version>1.0.0</version>
6    </dependency>
7</dependencies>
8
9

7.1 生产者代码开发


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.producer.KafkaProducer;
4import org.apache.kafka.clients.producer.Producer;
5import org.apache.kafka.clients.producer.ProducerRecord;
6
7import java.util.Properties;
8
9//todo:开发一个kafka的生产者代码
10public class KafkaProducerStudy {
11  public static void main(String[] args) {
12      Properties props = new Properties();
13      //kafka集群地址
14      props.put("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
15      //kafka的acks消息确认机制
16      //acks一共有4个选项
17      //-1和all:表示生产者发送数据给topic,需要所有该topic分区副本把数据保存正常
18      //1: 表示生产者发送数据给topic,只需要分区的主副本已经把数据保存正常
19      //0:生产者只管发数据,不需要确认,丢失数据可能性最高
20      props.put("acks", "all");
21      //重试次数
22      props.put("retries", 0);
23      //每个批次写入数据的大小
24      props.put("batch.size", 16384);
25      //延迟多久进行写入
26      props.put("linger.ms", 1);
27      //缓冲区的内存大小
28      props.put("buffer.memory", 33554432);
29      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
30      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
31
32       //设置自己的分区函数
33      props.put("partitioner.class","cn.itcast.kafka.MyPartitioner");
34
35      Producer<String, String> producer = new KafkaProducer<String, String>(props);
36      for (int i = 0; i < 100; i++)
37          //ProducerRecord<String, String> 有2个泛型 第一个String表示消息的key类型,在这里表示消息的标识,第二个String表示消息内容本身
38           //构建ProducerRecord对象需要3个参数:第一个是topic名称,第二个就是消息的key,第三个消息内容本身
39         // producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
40         //kafka分区策略:4种分区策略
41            //1、指定具体的分区号,数据就按照指定的分区号,流入到对应分区中
42          //producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hadoop spark"));
43
44           //2、不指定具体的分区号,指定消息的key(不断变化) 按照key.hashcode%分区数=分区号,hashPartitioner
45          //producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
46
47          //3、不指定具体的分区号,也不指定消息的key,它是采用轮训(随机)的方式写入到不同分区中。
48          //producer.send(new ProducerRecord<String, String>("test","hadoop spark"));
49
50          //4、自定义分区函数
51         producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
52
53
54      producer.close();
55  }
56}
57
58

7.2 消费者代码开发

  • 1、自动提交偏移量


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.consumer.ConsumerRecord;
4import org.apache.kafka.clients.consumer.ConsumerRecords;
5import org.apache.kafka.clients.consumer.KafkaConsumer;
6
7import java.util.Arrays;
8import java.util.Properties;
9
10//todo:开发kafka的消费者代码-----自动提交偏移量
11public class KafkaConsumerStudy {
12    public static void main(String[] args) {
13        Properties props = new Properties();
14        //kafka集群地址
15        props.put("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
16        //消费者组id
17        props.put("group.id", "test");
18        //自动提交消费的偏移量
19        props.put("enable.auto.commit", "true");
20        //每隔多久提交一次偏移量
21        props.put("auto.commit.interval.ms", "1000");
22        //key反序列化类
23        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
24        //value 反序列化类
25        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
26
27        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
28        //指定消费的topic名称,可以有多个
29        consumer.subscribe(Arrays.asList("test"));
30        while (true) {
31              //指定数据拉取的时间间隔
32            ConsumerRecords<String, String> records = consumer.poll(100);
33            for (ConsumerRecord<String, String> record : records)
34                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
35        }
36    }
37}
38
39
40
  • 2、手动提交偏移量


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.consumer.ConsumerRecord;
4import org.apache.kafka.clients.consumer.ConsumerRecords;
5import org.apache.kafka.clients.consumer.KafkaConsumer;
6
7import java.util.ArrayList;
8import java.util.Arrays;
9import java.util.List;
10import java.util.Properties;
11
12//todo:开发一个kafka消费者程序-------自己提交偏移量
13public class KafkaConsumerManualOffset {
14    public static void main(String[] args) {
15        Properties props = new Properties();
16        //指定kafka集群地址
17        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
18        //消费者组id
19        props.put("group.id", "test");
20        // 手动提交偏移量
21        props.put("enable.auto.commit", "false");
22        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
23        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
24        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
25        consumer.subscribe(Arrays.asList("test"));
26        final int minBatchSize = 200;
27        List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
28        while (true) {
29            ConsumerRecords<String, String> records = consumer.poll(100);
30            for (ConsumerRecord<String, String> record : records) {
31                buffer.add(record);
32            }
33            //判断下数据有没有超过200条数据
34            if (buffer.size() >= minBatchSize) {
35              //  insertIntoDb(buffer);   //具体的处理逻辑
36                System.out.println("当前总条数据:"+buffer.size());
37
38                //手动提交偏移量
39                consumer.commitSync();
40                buffer.clear();
41            }
42        }
43    }
44}
45
46
47

kafka入门

8、kafka的分区策略

  • 当前生产者产生的数据到底会流入到topic的哪一个分区中去?这里就涉及到kafka的分区策略

  • kafka分区策略:4种分区策略

  • 1、指定具体的分区号,数据就按照指定的分区号,流入到对应分区中


1
2
3
1producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hadoop spark"));
2
3

1
2
1* 2、不指定具体的分区号,指定消息的key(不断变化) 按照key.hashcode%分区数=分区号
2

1
2
3
1producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
2
3

1
2
1* 3、不指定具体的分区号,也不指定消息的key,它是采用轮训(随机)的方式写入到不同分区中
2

1
2
3
1producer.send(new ProducerRecord<String, String>("test","hadoop spark"));
2
3

1
2
1* 4、自定义分区函数
2

1
2
3
4
5
6
1   //设置自己的分区函数
2props.put("partitioner.class","cn.itcast.kafka.MyPartitioner");
3
4producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
5
6

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.producer.KafkaProducer;
4import org.apache.kafka.clients.producer.Producer;
5import org.apache.kafka.clients.producer.ProducerRecord;
6
7import java.util.Properties;
8
9//todo:开发一个kafka的生产者代码
10public class KafkaProducerStudy {
11    public static void main(String[] args) {
12        Properties props = new Properties();
13        //kafka集群地址
14        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
15        //kafka的acks消息确认机制
16        props.put("acks", "all");
17        //重试次数
18        props.put("retries", 0);
19        //每个批次写入数据的大小
20        props.put("batch.size", 16384);
21        //延迟多久进行写入
22        props.put("linger.ms", 1);
23        //缓冲区的内存大小
24        props.put("buffer.memory", 33554432);
25        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
26        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
27
28         //设置自己的分区函数
29        props.put("partitioner.class","cn.itcast.kafka.MyPartitioner");
30
31        Producer<String, String> producer = new KafkaProducer<String, String>(props);
32        for (int i = 0; i < 100; i++)
33            //ProducerRecord<String, String> 有2个泛型 第一个String表示消息的key类型,在这里表示消息的标识,第二个String表示消息内容本身
34             //构建ProducerRecord对象需要3个参数:第一个是topic名称,第二个就是消息的key,第三个消息内容本身
35           // producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
36           //kafka分区策略:4种分区策略
37              //1、指定具体的分区号,数据就按照指定的分区号,流入到对应分区中
38            //producer.send(new ProducerRecord<String, String>("test", 0,Integer.toString(i), "hadoop spark"));
39
40             //2、不指定具体的分区号,指定消息的key(不断变化) 按照key.hashcode%分区数=分区号,hashPartitioner
41            //producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
42
43            //3、不指定具体的分区号,也不指定消息的key,它是采用轮训(随机)的方式写入到不同分区中。
44            //producer.send(new ProducerRecord<String, String>("test","hadoop spark"));
45
46            //4、自定义分区函数
47           producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "hadoop spark"));
48
49
50        producer.close();
51    }
52}
53
54
55
  • 自定义分区函数


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
1package cn.itcast.kafka;
2
3import org.apache.kafka.clients.producer.Partitioner;
4import org.apache.kafka.common.Cluster;
5
6import java.util.Map;
7
8//自定义分区函数
9public class MyPartitioner implements Partitioner{
10    /**
11     * 该方法会返回一个分区号
12     * @param topic  topic的名称
13     * @param key     消息的key
14     * @param keyBytes  消息的key字节数组
15     * @param value    消息的内容
16     * @param valueBytes  消息的内容字节数组
17     * @param cluster    kafka集群对象
18     * @return
19     */
20    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
21        //自己去实现hashPartitioner       key.hashcode%分区数=分区号
22        int numPartions = cluster.partitionsForTopic("test").size();
23        //test有3个分区,对应的分区号就分别为:0 1 2
24             // -2 -1 0 1 2
25
26        return  Math.abs(key.hashCode()%numPartions);
27    }
28
29    public void close() {
30
31    }
32
33    public void configure(Map<String, ?> configs) {
34
35    }
36}
37
38
39

9、kafka文件存储机制

可以见参考资源《kafka的文件存储机制.md》

kafka入门

note:

  • 一个topic中的分区数据只能够被同一个消费者组的一个线程取消费
  • 不同消费者组的线程可以同时消费一个topic中的同一个分区数据

​ 生产者在生产数据的时候 , 是有一定的顺序 , 这些数据按照不同的分区规则写入到不同的分区中 . 这个时候 , 消费者在消费数据的时候 , 它是以分区为单位进行消费 , 只有一个消费者的时候 , 先消费哪一个分区 , 然后再消费其他分区 , 这个时候就无法保证消费的顺序 .

​ 如果想要保证生产的数据的顺序和消费数据的顺序一致 , 在这里只能够设置当前topic的分区数就是一个 , 对于kafka框架来说 , 它是一个分布式消息队列 , 这种设置与它的分布式的理念是有违背的.

​ 为什么kafka可以快速的定义那一条数据在哪?

  1. 通过log文件命令规则使用二分查询 , 快速定义要找的数据在哪个文件中
  2. 然后在通过对应的index文件 , 为数据构建了稀疏索引 , 并不是为每一条数据构建索引 , 这是为了避免空间浪费 , 后期通过索引文件快速定义要找的数据在整个log文件的哪一行

具体原因参考《kafka为什么那么快.md》

10、kafka整合flume

kafka入门

  • 1、安装flume和kafka

  • 2、修改flume配置

  • vim flume-kafka.conf


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1#为我们的source channel  sink起名
2a1.sources = r1
3a1.channels = c1
4a1.sinks = k1
5#指定我们的source收集到的数据发送到哪个管道
6a1.sources.r1.channels = c1
7#指定我们的source数据收集策略
8a1.sources.r1.type = spooldir
9a1.sources.r1.spoolDir = /export/servers/flumedata
10a1.sources.r1.deletePolicy = never
11a1.sources.r1.fileSuffix = .COMPLETED
12a1.sources.r1.ignorePattern = ^(.)*\\.tmp$
13a1.sources.r1.inputCharset = utf-8
14#指定我们的channel为memory,即表示所有的数据都装进memory当中
15a1.channels.c1.type = memory
16#指定我们的sink为kafka sink,并指定我们的sink从哪个channel当中读取数据
17a1.sinks.k1.channel = c1
18a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
19a1.sinks.k1.kafka.topic = test
20a1.sinks.k1.kafka.bootstrap.servers = node-1:9092,node-2:9092,node-3:9092
21a1.sinks.k1.kafka.flumeBatchSize = 20
22a1.sinks.k1.kafka.producer.acks = 1
23
24
  • 3、启动flume(需要在flume文件目录下)


1
2
3
1bin/flume-ng agent -n a1 -c conf  -f conf/flume-kafka.conf -Dflume.root.logger=info,console
2
3

11、kafka如何保证数据不丢失

  • 1、生产者保证数据不丢失

  • 就是利用kafka的ack机制

  • 同步模式


1
2
3
4
5
6
1//指定为同步模式
2producer.type=sync
3//ack确认机制等于1,只需要主副本确认数据保存成功就可以了,后期从副本自己去同步数据
4request.required.acks=1
5
6

1
2
1    * 异步模式
2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1//指定为异步模式
2producer.type=async
3//ack确认机制等于1,只需要主副本确认数据保存成功就可以了,后期从副本自己去同步数据
4request.required.acks=1
5//指定数据缓存到什么时候发送出去
6queue.buffering.max.ms=5000
7//指定数据缓存到多少条之后发送出去
8queue.buffering.max.messages=10000
9//数据达到了发送的阈值,后期由于一些原因导致数据并没有发送出去,这个时候对于缓存的数据是否保留,-1保留未成功发送的数据, 0就是不保留,直接舍弃掉。
10queue.enqueue.timeout.ms = -1
11//每次发送的数据量条数
12batch.num.messages=200
13
14
  • 2、broker—kafka集群自己本身

  • kafka中有很多个topic,每一个topic有很多个分区,每一个分区有多个副本。通过多副本机制保证数据的安全性

  • 3、消费者保证数据不丢失

  • 每一个消费者在消费数据的时候,都把当前消费的位置记录下来,后续消费者程序挂掉了,然后正常重启,读取上一次消费的偏移量offset,接着上一次继续消费。

12、kafkaManager监控工具的安装与使用

可以参考资料《kafka_manager监控工具的安装与使用.md》文档

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

解析Linux内核的同步与互斥机制(二)

2021-8-18 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索