kafka原理系列之(一)消息存储和offset提交机制

释放双眼,带上耳机,听听看~!

kafka之消息存储和offset提交机制

Kafka具有存储功能,默认保存数据时间为7天或者大小1G,也就是说kafka broker上的数据超7天或者1G,
就会被清理掉。这些数据存放在broker服务器上,以log文件的形式存在。
kafka的安装目录下面的/conf/server.propertites文件中中设置:


1
2
3
4
5
6
1### 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
2log.retention.hours=168
3##日志数据存储的最大字节数。超过这个时间会根据policy处理数据。
4#log.retention.bytes=1073741824
5
6

1、kafka的日志

log的路径配置在conf/server.properties配置文件中,
log.dirs = /log1
我的日志路径配置在log1文件夹下,demo1是topic名称,只有一个分区和副本。
log文件的命名那一长串0(比如/logs1/demo1-0/00000000000000000000.log),
是这个日志文件的offset位置。当日志文件达到时间或者大小的上限时,
就会生成下一个日志文件,命名的就是下一个offset位置了。
另外在/logs1/demo1-0还有两个文件,index文件存放的是topic的offset,timeindex是存放的是时间戳

2、那么consumer的offset是怎么存储的呢?

consumer有两种消息方式,一种是存放在broker的日志目录中,
另一种方式是存放在zookeeper中。两种存放方式和你使用kafka-console-consumer命令使用的选项有关。
如果使用的是bootstrap-server,那么就存放在broker;如果使用的是–zookeeper那么就存放在zookeeper。
broker存放offset是kafka从0.9版本开始,提供的新的消费方式。原因是zookeeper来存放,
还是有许多弊端,不方便灵活控制,效率不高。

3、offset提交的应用场景

offset:指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),
那么下次消费的时候offset就从50开始消费。

offset提交的方式有两种,自动提交和手动提交。
conf/server.properties配置文件的参数设置:
enable.auto.commit = true (那么这个是自动提交,false为手动)

但是offset下标自动提交其实在很多场景都不适用,因为自动提交是在kafka拉取到数据之后就直接提交,
这样很容易丢失数据,尤其是在需要事物控制的时候。很多情况下我们需要从kafka成功拉取数据之后,
对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种,
所以这时我们就需要进行手动提交kafka的offset下标。

4、offset提交的方式

4.1、自动提交偏移量:

Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,
当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms
向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:
partation=hash(group_id)%50来计算的。

如:group_id=test_group_1,则partation=hash(“test_group_1”)%50=28

4.2、手动提交偏移量:

对于手动提交offset主要有3种方式:
1.同步提交 2.异步提交 3.异步+同步 组合的方式提交

1)同步手动提交偏移量

同步模式下提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束,
同时同步方式下消费者线程在拉取消息会被阻塞,在broker对提交的请求做出响应之前,
会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。
只有当前批次的消息提交完成时才会触发poll来获取下一轮的消息。

2)异步手动提交偏移量+回调函数

异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,
并且可以配合回调函数在broker做出响应的时候记录错误信息。
对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,
如果偏移量还未提交就会造成偏移量丢失。

3)异步+同步 组合的方式提交偏移量

针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭时同步提交的方式,
这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。
通过finally在最后不管是否异常都会触发consumer.commit()来同步补救一次,确保偏移量不会丢失。

查看kafka的topic的的log日志文件的内容:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments –files /temp/kafka-logs/orderPayMsg-0/00000000000000000000.log –print-data-log

其中/temp/kafka-logs是对应kafka的log路径,也就是log.dirs的值。

相关实战链接:
kafka的offset保存实例kafka的offset提交机制实例(python版)

给TA打赏
共{{data.count}}人
人已打赏
安全运维

Elasticsearch模块功能之-索引分片分配(Index shard allocation)

2021-12-11 11:36:11

安全运维

Ubuntu上NFS的安装配置

2021-12-19 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索