Flume + Kafka + Spark Streaming + Redis: real-time statistics of ad delivery pv, uv, click and cost


Because of a change in business logic, the delivery data is now written into the big-data cluster. The previous pipeline therefore had to be reworked so that the spend of RTB delivery can be known in real time.

Environment versions:

spark: 2.11-2.4.0-cdh6.2.0

kafka: 2.1.0-cdh6.2.0

flume: 1.9.0-cdh6.2.0

    1. Flume configuration


a1.sources = r1
a1.sinks = k1
a1.channels = c1

#sources
a1.sources.r1.type = spooldir # read newly added log files by watching a directory
a1.sources.r1.recursiveDirectorySearch = true # since Flume 1.9, sub-directories are watched recursively for new files
a1.sources.r1.spoolDir = /mnt/data1/flume_data/bid_bsw # the directory Flume watches
a1.sources.r1.fileHeader = false
a1.sources.r1.ignorePattern = ^(.)*\\.[0|1]{1}$
a1.sources.r1.includePattern = ^bid.log.[0-9]{12}$

#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#sinks
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = statistic_bid_topic # the topic written to (consumed later by Spark Streaming)
a1.sinks.k1.kafka.bootstrap.servers = 192.168.1.65:9092,192.168.1.66:9092,192.168.1.67:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

Notes:

a1.sources.r1.spoolDir sets the directory of bid logs that Flume watches;

a1.sources.r1.recursiveDirectorySearch makes Flume watch that directory recursively for newly created files;

a1.sinks.k1.kafka.bootstrap.servers is set to Kafka's Destination Broker List.
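The includePattern / ignorePattern pair decides which files in the spool directory get collected. A minimal sketch for sanity-checking the two regular expressions before deploying the agent is shown below; the file names are made-up examples, not taken from production:

// A minimal sketch: check which file names the Flume source above would pick up,
// using the same includePattern / ignorePattern regular expressions.
object FlumePatternCheck {
  def main(args: Array[String]): Unit = {
    val include = "^bid.log.[0-9]{12}$".r
    val ignore  = "^(.)*\\.[0|1]{1}$".r
    // hypothetical file names, only for illustration
    val names = Seq("bid.log.201907081530", "bid.log.2019070815", "bid.log.201907081530.1")
    names.foreach { n =>
      val picked = include.findFirstIn(n).isDefined && ignore.findFirstIn(n).isEmpty
      println(s"$n -> ${if (picked) "collected" else "skipped"}")
    }
  }
}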

    2. Kafka installation and configuration


On the CDH cluster: Actions -> Add Service -> Kafka.

cdh-master, cdh-slaver1 and cdh-slaver2 are installed as Kafka brokers;

cdh-master also serves as the Kafka MirrorMaker.


Kafka configuration:

zookeeper.chroot: /kafka

auto.create.topics.enable: checked

delete.topic.enable: checked

broker.id: a unique id for each broker

log.dirs: /XXX/kafka/data
Note: if the disk holding this path runs out of space, Kafka will fail with an error and shut down!

bootstrap.servers: 192.168.1.65:9092,192.168.1.66:9092,192.168.1.67:9092 (the servers Kafka is installed on)

source.bootstrap.servers: 192.168.1.65:9092,192.168.1.66:9092,192.168.1.67:9092

whitelist: 192.168.1.65:9092,192.168.1.66:9092,192.168.1.67:9092

oom_heap_dump_dir: /tmp

Command to create the topic:

In Kafka's bin directory, run:

./kafka-topics.sh --create --topic statistic_bid_topic --zookeeper cdh-master.lavapm:2181,cdh-slaver1.lavapm:2181,cdh-slaver2.lavapm:2181/kafka --partitions 3 --replication-factor 2

This creates the topic statistic_bid_topic with 3 partitions and a replication factor of 2.

List all topics:

./kafka-topics.sh --zookeeper 192.168.1.65:2181/kafka --list

Show the details of a topic:

This lists the number of partitions of statistic_bid_topic, its replication factor, and the leader and replicas of each partition:

./kafka-topics.sh --zookeeper 192.168.1.65:2181/kafka --topic statistic_bid_topic --describe


List the consumer groups

Use the --list parameter:

./kafka-consumer-groups.sh --bootstrap-server 192.168.1.65:9092 --list


**Describe a specific consumer group**

Use the --group and --describe parameters and run:

./kafka-consumer-groups.sh --bootstrap-server 192.168.1.65:9092 --group statistic_bid_topic_Group --describe


Consume messages, reading all data from the beginning:

./kafka-console-consumer.sh --bootstrap-server 192.168.1.65:9092 --topic statistic_bid_topic --from-beginning

Produce messages:

./kafka-console-producer.sh --broker-list cdh-master.lavapm:9092 --topic statistic_bid_topic

    3. Spark Streaming development

The job reads the bid data from the corresponding Kafka topic in real time, parses the JSON strings line by line to extract the imp, bid and click records, aggregates and merges them, and finally writes the results to Redis incrementally. Because the figures are used for per-day delivery statistics, the keys are stored in the format yyyyMMdd::oid::campaign-unit-id.


Each such key is a Redis hash whose fields hold the detailed counters for one campaign unit: bid, pv, clk and cost.

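To make the key layout concrete, here is a minimal sketch that reads one day's counters back with Jedis; the host, port and oid value are placeholders, not the production settings:

import scala.collection.JavaConverters._
import redis.clients.jedis.Jedis

// Read the daily counters that the streaming job accumulates with HINCRBY.
object ReadDailyStats {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("your redis host", 6379)        // placeholder host/port
    val key = "20190708::oid::284"                        // yyyyMMdd::oid::<campaign unit id>, placeholder values
    val fields = jedis.hgetAll(key).asScala               // fields: bid, pv, clk, cost
    fields.foreach { case (field, value) => println(s"$field = $value") }
    jedis.close()
  }
}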

The implementation in Scala:


import java.text.SimpleDateFormat
import java.util.Date
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.HashPartitioner
import scala.util.parsing.json._
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object KafkaBidStatistic extends Serializable {

  def regJson(json: Option[Any]) = json match {
    // cast the parsed value to a Map
    case Some(map: collection.immutable.Map[String, Any]) => map
  }

  /**
    * Parse a (possibly nested) JSON string.
    *
    * @param string_json
    * @return
    */
  def str_json(string_json: String): collection.immutable.Map[String, Any] = {
    var first: collection.immutable.Map[String, Any] = collection.immutable.Map()
    val jsonS = JSON.parseFull(string_json)
    // the type of the data is not guaranteed, so guard before extracting
    if (jsonS.isInstanceOf[Option[Any]]) {
      first = regJson(jsonS);
    }
    first
  }

  def get_str_json(line: String): String = {
    var ret = "";
    try{
      var first: collection.immutable.Map[String, Any] = str_json(line);
      // based on the shape of the imp log data, extract one kind of imp record for the statistics
      // example imp record: {"cip":"223.24.172.199","mid":"687","pid":"","oid":"284","et":{"bid":1557196078,"imp":1557196080},"wd":{},"ip":"35.200.110.13","ov":1,"ex":20191211,"url":"","usc":["IAB18"],"uri":"\/imp?ext_data=b2lkPTI4NCxjaWQ9LHV1aWQ9VURhSlVmQXEybW1acmVpSTZzWVJCZSxtZT1iaWRzd2l0Y2gsZXg9Ymlkc3dpdGNoLHBjPTAsbGlkPSxuYXQ9MCx2cj0wLHV0PTE1NTcxOTYwNzgsZG9tPSxwaWQ9LG9jYz1jaW0sY2NwPTU2MDAwMCxhaWQ9Njg3LHk9MCxhdj0w&ver=1&reqid=bidswitch&price=0.55992","occ":"cim","me":"bidswitch","mobile":{"id":"","os":2},"ua":"BidSwitch\/1.0","id":"UDaJUfAq2mmZreiI6sYRBe","op":"","ccp":1,"ins":{},"sz":"320x50","reuse":1,"wp":0.55992,"is_mobile":1,"nwp":1,"ev":"imp","rg":1840000000,"did":"CDz2mRv7s7fffMuVY3eeuy"}
      //  && first.contains("uri")  -- domestic delivery does not carry this field!
      if (first("ev").toString == "imp" && first.contains("me")) { // first.getOrElse("me" , 1)   && first("me").toString=="bidswitch"
        val wp = first("wp");
        val mid = first("mid").toString;
        val oid = first("oid").toString;
        val ev = first("ev").toString;
        val id = first("id").toString;
        ret = oid + "," + mid + "," + wp + ",\"" + ev + "\",\"" + id + "\""; //  oid,mid,wp,"imp","id"
      }
      else if (first("ev").toString == "bid") {
        val mid = first("mid").toString;
        val oid = first("oid").toString;
        val id = first("id").toString;
        val ev = first("ev").toString;
        ret = oid + "," + mid + ",\"" + ev + "\",\"" + id + "\""; //  oid,mid,"bid","id"
      }
      else if (first("ev").toString == "clk") {
        val mid = first("mid").toString;
        val oid = first("oid").toString;
        val id = first("id").toString;
        val ev = first("ev").toString;
        ret = oid + "," + mid + ",\"" + ev + "\",\"" + id + "\""; //  oid,mid,"clk","id"
      }
    }
    catch{
      case ex: Exception => {
        ex.printStackTrace() // print to stderr
        System.err.println("get_str_json() failed to parse the string! line = "+line)  // print to stderr
      }
    }
    ret;
  }


  def main(args: Array[String]): Unit = {
  //  Logger.getLogger("org.apache.spark").setLevel(Level.WARN);

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.ERROR)

    val conf = new SparkConf();
    conf.setMaster("yarn").setAppName("statisticBid2Redis")
    // check for new data every 2 seconds
    val ssc = new StreamingContext(conf, Seconds.apply(2))
    val ssce = SparkSession.builder().config(conf).getOrCreate

    // increase job concurrency
    //  spark.conf.set("spark.streaming.concurrentJobs", 10)
    // raise the retry count when fetching topic partition leaders and their latest offsets (receiver mode only)
    //spark.conf.set("spark.streaming.kafka.maxRetries", 50)
    // enable backpressure
    ssce.conf.set("spark.streaming.backpressure.enabled",true)
    // on shutdown, finish processing the last batch before stopping, so a forced kill does not interrupt processing and lose unprocessed data
    ssce.conf.set("spark.streaming.stopGracefullyOnShutdown",true)
    // initial ingestion rate for backpressure; only effective in receiver mode, not in direct mode
    ssce.conf.set("spark.streaming.backpressure.initialRate",5000)
    // maximum number of records fetched per partition per second, to cap the processed volume and keep batches even;
    // it limits how many records each Kafka partition contributes per batch and protects the first batch (and any later batch) from overload
    ssce.conf.set("spark.streaming.kafka.maxRatePerPartition",3000)

    val sc = ssce.sparkContext;
    println("********* spark.default.parallelism *********"+sc.defaultParallelism)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.65:9092,192.168.1.66:9092,192.168.1.67:9092", // Kafka cluster
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "statistic_bid_topic_Group", //  statistic_bid_topic_Group
      "auto.offset.reset" -> "latest", // 1. earliest: always consume from the beginning  2. latest: consume only new messages  3. smallest: start from the earliest message
      "enable.auto.commit" -> (true: java.lang.Boolean) // true; false = commit offsets manually
    )

    val topics = Array("statistic_bid_topic") // topics; more than one can be configured

    // direct approach
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD { kafkaRDD =>
      if (!kafkaRDD.isEmpty()) {
        val now: Date = new Date();
        val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        val date_str = dateFormat.format(now);
        val df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //yyyyMMddHHmmss

      //  val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

        val lines: RDD[String] = kafkaRDD.map(e => (e.value()))

        // parse the JSON strings
        val rdd = lines.flatMap {
          line =>
            val parse_str = get_str_json(line);
            val arr = parse_str.split("\t");
            arr;
        }.filter(word => word.nonEmpty).distinct(); // drop empty lines

        rdd.cache();

        // imp      oid,mid,wp,"imp","id"
        val rdd_imp = rdd.flatMap(_.split("\t")).filter(_.indexOf("\"imp\"") > 0).map(_.split(","));
        val rdd_imp1 = rdd_imp.map(word => (word(0), word(1)) -> (word(2).toDouble / 1000).formatted("%.6f").toDouble); // oid,mid->wp    the price is CPM, so divide by 1000

        // cost
        //val result_imp_cost = rdd_imp1.reduceByKey(_ + _).sortBy(_._2, false).map({ case ((oid, mid), wp) => (oid, mid) -> (wp.toDouble.formatted("%.6f").toDouble*1000000).toInt }); // bidswitch data carries the real price, no *1000
        val result_imp_cost = rdd_imp1.reduceByKey(_ + _).sortBy(_._2, false).map({ case ((oid, mid), wp) => (oid, mid) -> (wp.toDouble.formatted("%.6f").toDouble*1000).toInt });

        // pv
        val rdd_imp3 = rdd_imp1.map({ case ((oid, mid), wp) => (oid, mid) -> 1 }); // oid,mid->1
        val result_imp_count = rdd_imp3.reduceByKey(_ + _).sortBy(_._2, false); // count pv

        // bid count   oid,mid,"bid","id"
        val rdd_bid = rdd.flatMap(_.split("\t")).distinct.filter(_.indexOf("\"bid\"") > 0).map(_.split(","));
        val rdd_bid1 = rdd_bid.map(word => (word(0), word(1)) -> 1);
        val result_bid_count = rdd_bid1.reduceByKey(_ + _).sortBy(_._2, false);

        // clk count  oid,mid,"clk","id"
        val rdd_clk = rdd.flatMap(_.split("\t")).distinct.filter(_.indexOf("\"clk\"") > 0).map(_.split(","));
        val rdd_clk1 = rdd_clk.map(word => (word(0), word(1)) -> 1);
        val result_clk_count = rdd_clk1.reduceByKey(_ + _).sortBy(_._2, false);

        // join imp+bid+clk
        val rdd_join1 = result_imp_cost.join(result_imp_count).map({ case ((oid, mid), (wp, pv)) => (oid, mid) -> (wp, pv) }); // ((oid,mid),(wp,pv))
        val rdd_join2 = result_bid_count.leftOuterJoin(rdd_join1);
        // (oid,mid),(bid,(wp,pv))   where (wp,pv) is an Option[(Int,Int)]
        val rdd_join3 = rdd_join2.map { case ((oid, mid), (bid, x)) =>
          val (wp, pv) = x.getOrElse((0, 0));
          (oid, mid) -> (bid, wp, pv)
        }
        val rdd_join4 = rdd_join3.fullOuterJoin(result_clk_count);
        // (oid,mid),((bid,wp,pv),clk)
        val rdd_join5 = rdd_join4.map { case ((oid, mid), (x, y)) =>
          val (bid, wp, pv) = x.getOrElse((0, 0, 0));
          (oid, mid, bid, wp, pv, y.getOrElse(0))
        }
        //"{"+oid+","+mid+","+bid+","+pv+","+clk+","+wp+"}"   oid,mid,bid,wp,pv,clk   .sortBy(f => (f._4), false).
        val rdd_join6 = rdd_join5.map({ case (oid, mid, bid, wp, pv, clk) => (oid, mid, bid, pv, clk, wp) }); // oid,mid,bid,pv,clk,wp
        // println("!!! ### after join = " + rdd_join6.count() + " , detail info: ");

        println(" ********* statistic finished ********* "+df.format(now)+" ********* rdd_join6 = "+rdd_join6.count())

        // write to Redis
        rdd_join6.foreachPartition(partitionOfRecords => {
          partitionOfRecords.foreach(pair => { // oid,mid,bid,pv,clk,wp
            val conn: Jedis = JedisConnectionPools.getConnection() // take a Jedis connection from the pool
            val oid = pair._1 // oid, mid, bid, pv, clk, wp
            val bid = pair._3
            val pv = pair._4
            val clk = pair._5
            val cost = pair._6
            val HashKey = date_str+"::oid::"+oid
            conn.hincrBy(HashKey, "cost", cost.toInt)
            conn.hincrBy(HashKey, "pv", pv.toInt)
            conn.hincrBy(HashKey, "bid", bid.toInt)
            conn.hincrBy(HashKey, "clk", clk.toInt)
            conn.close() // return the connection to the pool
          })
        })

       // println("### save to redis over ! time = "+df.format(now))
       // make sure all results have been output correctly and idempotently first
       // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
       // println("### offsets updated ! time = "+df.format(now))

        rdd.unpersist()
        println(" ********* save to redis finished ***** "+df.format(now))

      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

}

The Redis connection pool implementation:


import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisConnectionPools {
  val redisHost = "your redis host"
  val redisPort = 6379
  val redisTimeout = 30000

  val conf = new JedisPoolConfig()

  // maximum number of connections in the pool
  conf.setMaxTotal(20)

  // maximum number of idle connections
  conf.setMaxIdle(10)

  // whether to validate a connection when it is borrowed from the pool
  //conf.setTestOnBorrow(true)

  // pool config, Redis host, Redis port, connection timeout, password
  val pool = new JedisPool(conf, "your redis host", 6379, 10000, "password")

  def getConnection(): Jedis = {
    pool.getResource
  }

  def main(args: Array[String]): Unit = {
    val conn = JedisConnectionPools.getConnection()
    val r1 = conn.keys("oid::mid::*")
    println(r1)
    conn.close()
  }

}

 

After building the jar in IntelliJ IDEA, run it on the cdh-master machine with:

nohup spark-submit --master yarn --queue root.dev --deploy-mode client --jars /home/libs/jedis-2.9.0.jar,/home/libs/commons-pool2-2.0.jar --class com.XXX.KafkaBidStatistic /home/streaming/dspstreaming.jar > /home/logs/test.log 2>&1 &

Notes:

  1. This was my first Spark Streaming project. Many articles online say you should commit the Kafka offsets yourself in code, but I did not commit them manually, because the business is not strict about occasional data loss. Manual committing means recording the offsets of each batch somewhere such as Redis and updating them on every read (a minimal sketch of manual commits follows these notes).

  2. With the direct approach, if the program crashes, the data from the time of the crash is lost after a restart; the checkpoint mechanism is needed for this, but it is not implemented in this article yet.

With the direct approach, which uses Kafka's simple API, Spark Streaming itself tracks the consumed offsets and saves them in the checkpoint. Because Spark keeps them in sync itself, the data can be guaranteed to be consumed once and only once.
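For reference, a minimal sketch of manual offset commits with the same kind of direct stream (it assumes "enable.auto.commit" is set to false in kafkaParams, which is not what the job above does; the object and method names are only illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

object ManualCommitSketch {
  // Commit the offsets of each batch only after its results have been written to Redis,
  // so a crash before the commit re-reads the batch instead of losing it.
  def process(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    stream.foreachRDD { kafkaRDD =>
      // capture the offset ranges before transforming the RDD
      val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... parse, aggregate and write the batch to Redis as in the job above ...
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
  }
}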

  3. Once the streaming job is running, you can check how it is doing in the CDH UI, at a URL like:

http://cdh-slaver4.lavapm:8088/proxy/application_1562297838757_0853/streaming/

**scheduling delay:** the time a consumed batch spends waiting before it is processed.

If the scheduling delay keeps growing after the program has been running for a while, the system cannot respond to the incoming data in real time: processing latency has appeared, meaning the processing rate within each batch interval is lower than the rate at which data is produced. In that case you have to cut the per-batch processing time, in other words raise processing efficiency. One way is to increase the number of partitions of the Kafka topic; they map one-to-one to the partitions of the RDDs in the stream, so the data is read in parallel and processing speeds up (an example command follows).
