文章目录
-
分布式消息队列Kafka
-
架构
-
组成架构
* 好处
* Api -
命令行操作
* Java Api
* 1.producer1
2
3
4
5
6
7
8
9
10
11
12
131 * 1)异步提交
2 * 2)同步提交
3
4 * 2.Consumer
5
6 * 1)自动提交
7 * 2)手动提交
8 * 3)自定义存储offset
9
10 * 3.自定义Interceptor
11
12 * 运行机制
13 -
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
291 * 1.分区的原因
2 * 2.分区原则
3 * 3.数据的可靠性
4 * 4.ISR机制
5 * 5.ack应答机制
6 * 6.数据的一致性保证(HW、LEO机制)
7 * 7.exactly机制()
8 * 8.server.properties配置说明
9 * 9.producer配置文件说明
10 * 10.consumer配置文件说明
11
12 * 消费者
13
14 * 1.消费方式
15 * 2.分区分配策略
16 * 3.offset的维护
17 * 4. 消费者组
18
19 * kafka高效读写数据
20
21 * 1.顺序写磁盘
22 * 2.零复制拷贝
23
24 * kafka事务
25
26 * 1.zookeeper在kafka中的作用
27 * 2.producer事务
28 * 3.customer事务
29
分布式消息队列Kafka
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
kafka是最初由linkedin公司开发的,使用scala语言编写,kafka是一个分布式,分区的,多副本的,多订阅者的日志系统(分布式MQ系统),可以用于搜索日志,监控日志,访问日志等
Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于的特性,但是在实现上完全不同,此外它并不是规范的实现。对消息保存时根据进行归类,发送消息者成为消息接受者成为此外集群有多个实例组成,每个实例成为。无论是集群,还是和都依赖于来保证系统可用性集群保存一些信息
架构
组成架构
- Producer:消息生产者,就是向 kafka broker 发消息的客户端;
2)Consumer:消息消费者,向 kafka broker 取消息的客户端;
Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
Topic:可以理解为一个队列, 生产者和消费者面向的都是一个 topic ;
Partition :为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个partition ,每个 partition 是一个有序的队列;
Replica :副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
leader :每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
follower: 每个分区多个副本中的从,实时从中同步数据,保持和数据的同步。发生故障时,某个follower
好处
可靠性:分布式的,分区,复制和容错的。
可扩展性:kafka消息传递系统轻松缩放,无需停机。
耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是持久的。
性能:kafka对于发布和订阅消息都具有高吞吐量。即使存储了许多TB的消息,他也爆出稳定的性能。
kafka非常快:保证零停机和零数据丢失。
Api
-
命令行操作
1.查看所有topic
bin/kafka-topics.sh –zookeeper hadoop102:2181 –list
2.创建topic
bin/kafka-topics.sh –zookeeper hadoop102:2181 –create –replication-factor 3 –partitions 1 –topic
first
3.删除topic
bin/kafka-topics.sh –zookeeper hadoop102:2181 –delete –topic first
4.启用produce发送消息
bin/kafka-console-producer.sh –broker-list hadoop102:9092 –topic first
5.消费消息
bin/kafka-console-consumer.sh –zookeeper hadoop102:2181 –topic first //读取当前的消息
bin/kafka-console-consumer.sh –bootstrap-server hadoop102:9092 –from-beginning –topic first //读取所有的消息(包含之前的消息)
6.查看某个topic的信息
bin/kafka-topics.sh –zookeeper hadoop102:2181 –describe –topic first
7.修改分区信息
bin/kafka-topics.sh –zookeeper hadoop102:2181 –alter –topic first –partitions 6
-
Java Api
1.producer
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程—main线程和Sender线程,以及一个线程共享变量—RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafkabroker。
1)异步提交
需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个ProducerRecord 对象
producer的send方法默认是异步,由两个重载方法组成。带回调函数的方法参数为RecordMetadata 和 Exception。如果Exception为null,说明没有产生异常。
异步提交失败会自动重试。
2)同步提交
send方法返回的是一个Future对象,我们可以利用这一点来实现同步提交。如:send().get(),若提交没有返回结果,则会阻塞当前线程。
2.Consumer
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。
1)自动提交
需要用到的类:
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个ConsumerRecord 对象
我们可以通过设置两个参数来实现自动提交
enable.auto.commit:是否开启自动提交 offset 功能
auto.commit.interval.ms:自动提交 offset 的时间间隔
2)手动提交
手动提交需要先将自动提交关闭。一般使用异步的,因为同步提交会阻塞线程,降低吞吐量
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,
commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
3)自定义存储offset
Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在 Kafka的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。
offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。
当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance。
消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
要实现自定义存储 offset需要用到ConsumerRebalanceListener类。
3.自定义Interceptor
自定义拦截器中的方法
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息从RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
(4)close:
关闭 interceptor,主要用于执行一些资源清理工作.
如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
运行机制
-
生产者
1.分区的原因
(1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个Partition 组成,因此整个集群就可以适应任意大小的数据了;
(2 )可以提高并发,因为可以以Partition 为单位读写了。
2.分区原则
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
(1)指明 partition的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到 partition 值;
(3)都未指定,使用partitionround-robin策略
3.数据的可靠性
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个partition 收到producer
发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer
收到 ack,就会进行下一轮的发送,否则重新发送数据。
4.ISR机制
Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader
发生故障之后,就会从 ISR 中选举新的 leader。
5.ack应答机制
0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到,但是还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
1:等待的leader的落盘成功后返回,如果在同步成功之前故障,那么将会丢失数据
-1(all):producer 等待 broker的 ack,partition 的 leader和 follower 全部落盘成功后才返回 ack。但是如果在 follower同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复。
6.数据的一致性保证(HW、LEO机制)
LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO
(1) follower 故障
follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
(2) leader 故障
leader 发生故障之后,会从中选出一个新的,之后,为保证多个副本之间的数据一致性,其余的会先将各自的文件高于的部分截掉,然后从新的同步数据
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复.
7.exactly机制()
1) kafka的数据消费模型:
exactly once:消费且仅消费一次
at least once:最少消费一次 出现数据重复消费的问题
at most once : 至多消费一次 出现数据丢失的问题
2) 0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的Exactly Once 语义。即:
At Least Once + 幂等性 (开启幂等性需要将 Producer 的参数中 enable.idompotence 设置为 true)
3) 开启幂等性后,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
但是PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
8.server.properties配置说明
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 1#broker的全局唯一编号,不能重复
2broker.id=0
3
4#用来监听链接的端口,producer或consumer将在此端口建立连接
5port=9092
6
7#处理网络请求的线程数量
8num.network.threads=3
9
10#用来处理磁盘IO的线程数量
11num.io.threads=8
12
13#发送套接字的缓冲区大小
14socket.send.buffer.bytes=102400
15
16#接受套接字的缓冲区大小
17socket.receive.buffer.bytes=102400
18
19#请求套接字的缓冲区大小
20socket.request.max.bytes=104857600
21
22#kafka运行日志存放的路径
23log.dirs=/export/data/kafka/
24
25#topic在当前broker上的分片个数
26num.partitions=2
27
28#用来恢复和清理data下数据的线程数量
29num.recovery.threads.per.data.dir=1
30
31#segment文件保留的最长时间,超时将被删除
32log.retention.hours=1
33
34#滚动生成新的segment文件的最大时间
35log.roll.hours=1
36
37#日志文件中每个segment的大小,默认为1G
38log.segment.bytes=1073741824
39
40#周期性检查文件大小的时间
41log.retention.check.interval.ms=300000
42
43#日志清理是否打开
44log.cleaner.enable=true
45
46#broker需要使用zookeeper保存meta数据
47zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
48
49#zookeeper链接超时时间
50zookeeper.connection.timeout.ms=6000
51
52#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
53log.flush.interval.messages=10000
54
55#消息buffer的时间,达到阈值,将触发flush到磁盘
56log.flush.interval.ms=3000
57
58#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
59delete.topic.enable=true
60
61#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
62host.name=kafka01
63
64advertised.host.name=192.168.140.128
65
66
67
9.producer配置文件说明
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 1#指定kafka节点列表,用于获取metadata,不必全部指定
2metadata.broker.list=node01:9092,node02:9092,node03:9092
3# 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
4#partitioner.class=kafka.producer.DefaultPartitioner
5# 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
6compression.codec=none
7# 指定序列化处理类
8serializer.class=kafka.serializer.DefaultEncoder
9# 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
10#compressed.topics=
11
12# 设置发送数据是否需要服务端的反馈,有三个值0,1,-1
13# 0: producer不会等待broker发送ack
14# 1: 当leader接收到消息之后发送ack
15# -1: 当所有的follower都同步消息成功后发送ack.
16request.required.acks=0
17
18# 在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功)
19request.timeout.ms=10000
20
21# 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,
22也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
23producer.type=sync
24
25# 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
26# 此值和batch.num.messages协同工作.
27queue.buffering.max.ms = 5000
28
29# 在async模式下,producer端允许buffer的最大消息量
30# 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
31# 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000
32queue.buffering.max.messages=20000
33
34# 如果是异步,指定每次批量发送数据量,默认为200
35batch.num.messages=500
36
37# 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后
38# 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息)
39# 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间
40# -1: 无阻塞超时限制,消息不会被抛弃
41# 0:立即清空队列,消息被抛弃
42queue.enqueue.timeout.ms=-1
43
44
45# 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
46# 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
47# 有可能导致broker接收到重复的消息,默认值为3.
48message.send.max.retries=3
49
50# producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况
51# 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新
52# (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000
53topic.metadata.refresh.interval.ms=60000
54
55
10.consumer配置文件说明
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1# zookeeper连接服务器地址
2zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
3# zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
4zookeeper.session.timeout.ms=5000
5#当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
6zookeeper.connection.timeout.ms=10000
7# 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
8zookeeper.sync.time.ms=2000
9#指定消费
10group.id=itcast
11# 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息
12# 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true
13auto.commit.enable=true
14# 自动更新时间。默认60 * 1000
15auto.commit.interval.ms=1000
16# 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察
17conusmer.id=xxx
18# 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
19client.id=xxxx
20# 最大取多少块缓存到消费者(默认10)
21queued.max.message.chunks=50
22# 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数.
23rebalance.max.retries=5
24
25# 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存
26fetch.min.bytes=6553600
27
28# 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer
29fetch.wait.max.ms=5000
30socket.receive.buffer.bytes=655360
31# 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest
32auto.offset.reset=smallest
33# 指定序列化处理类
34derializer.class=kafka.serializer.DefaultDecoder
35
36
-
消费者
1.消费方式
kafka支持pull(拉)和push(推)两种方式。但是push有可能会导致customer来不及消费,导致网络拥堵或者拒绝连接。pull由消费者自己选择拉取的数据内容、大小,但如果生产者没有数据,可能会导致死循环,所以我们通常设置一个超时时间来返回。
2.分区分配策略
1.RoundRobin 按照消费者组轮询选择消费的分区
2.Range 按照topic分配分区
3.offset的维护
Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,
consumer 默认将 保存在 一个内置的 中,该 topic 为 consumer_offsets。
1)修改配置文件 consumer.properties
exclude.internal.topics=false
2)读取 offset
0.11.0.0 之前版本:
bin/kafka-console-consumer.sh –topic __consumer_offsets – zookeeper hadoop102:2181 –formatter “kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter” –consumer.config config/consumer.properties –from-beginning
0.11.0.0 之后版本(含):
4. 消费者组
需要修改 consumer.properties 中的group.id
-
kafka高效读写数据
1.顺序写磁盘
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
2.零复制拷贝
-
kafka事务
1.zookeeper在kafka中的作用
Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。
2.producer事务
为了跨分区、跨会话的事务,我们需要引入一个事务id,并将producer获得的pid与之绑定。我们可以通过Transaction Coordinator来获得事务的状态以及将事务写入一个事务topic。
3.customer事务
customer并没有严格的事务,因为customer可以访问任意信息,无法保证commit的事务被精准消费。同一事务的消息重启后可能会被删除。