如何管理Spark Streaming消费Kafka的偏移量(三)

释放双眼,带上耳机,听听看~!

前面的文章已经介绍了在spark streaming集成kafka时,如何处理其偏移量的问题,由于spark streaming自带的checkpoint弊端非常明显,所以一些对数据一致性要求比较高的项目里面,不建议采用其自带的checkpoint来做故障恢复。

在spark streaming1.3之后的版本支持direct kafka stream,这种策略更加完善,放弃了原来使用Kafka的高级API自动保存数据的偏移量,之后的版本采用Simple API也就是更加偏底层的api,我们既可以用checkpoint来容灾,也可以通过低级api来获取偏移量自己管理偏移量,这样以来无论是程序升级,还是故障重启,在框架端都可以做到Exact One准确一次的语义。

本篇文章,会再介绍下,如何手动管理kafka的offset,并给出具体的代码加以分析:

版本:

apache spark streaming2.1

apache kafka 0.9.0.0

手动管理offset的注意点:

(1)第一次项目启动的时候,因为zk里面没有偏移量,所以使用KafkaUtils直接创建InputStream,默认是从最新的偏移量开始消费,这一点可以控制。

(2)如果非第一次启动,zk里面已经存在偏移量,所以我们读取zk的偏移量,并把它传入到KafkaUtils中,从上次结束时的偏移量开始消费处理。

(3)在foreachRDD里面,对每一个批次的数据处理之后,再次更新存在zk里面的偏移量

注意上面的3个步骤,1和2只会加载一次,第3个步骤是每个批次里面都会执行一次。

下面看第一和第二个步骤的核心代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1/****
2    *
3    * @param ssc  StreamingContext
4    * @param kafkaParams  配置kafka的参数
5    * @param zkClient  zk连接的client
6    * @param zkOffsetPath zk里面偏移量的路径
7    * @param topics     需要处理的topic
8    * @return   InputDStream[(String, String)] 返回输入流
9    */
10  def createKafkaStream(ssc: StreamingContext,
11                        kafkaParams: Map[String, String],
12                        zkClient: ZkClient,
13                        zkOffsetPath: String,
14                        topics: Set[String]): InputDStream[(String, String)]={
15    //目前仅支持一个topic的偏移量处理,读取zk里面偏移量字符串
16    val zkOffsetData=KafkaOffsetManager.readOffsets(zkClient,zkOffsetPath,topics.last)
17
18    val kafkaStream = zkOffsetData match {
19      case None =>  //如果从zk里面没有读到偏移量,就说明是系统第一次启动
20        log.info("系统第一次启动,没有读取到偏移量,默认就最新的offset开始消费")
21        //使用最新的偏移量创建DirectStream
22        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
23      case Some(lastStopOffset) =>
24        log.info("从zk中读取到偏移量,从上次的偏移量开始消费数据......")
25        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
26        //使用上次停止时候的偏移量创建DirectStream
27        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, lastStopOffset, messageHandler)
28    }
29    kafkaStream//返回创建的kafkaStream
30  }
31

主要是针对第一次启动,和非首次启动做了不同的处理。

然后看下第三个步骤的代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1/****
2    * 保存每个批次的rdd的offset到zk中
3    * @param zkClient zk连接的client
4    * @param zkOffsetPath   偏移量路径
5    * @param rdd     每个批次的rdd
6    */
7  def saveOffsets(zkClient: ZkClient, zkOffsetPath: String, rdd: RDD[_]): Unit = {
8    //转换rdd为Array[OffsetRange]
9    val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
10    //转换每个OffsetRange为存储到zk时的字符串格式 :  分区序号1:偏移量1,分区序号2:偏移量2,......
11    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")
12    log.debug(" 保存的偏移量:  "+offsetsRangesStr)
13    //将最终的字符串结果保存到zk里面
14    ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, offsetsRangesStr)
15  }
16

主要是更新每个批次的偏移量到zk中。

例子已经上传到github中,有兴趣的同学可以参考这个链接:

https://github.com/qindongliang/streaming-offset-to-zk

后续文章会聊一下为了升级应用如何优雅的关闭的流程序,以及在kafka扩展分区时,上面的程序如何自动兼容。

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全技术

JavaScript使用cookie

2021-12-21 16:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索