spark读取 kafka nginx网站日志消息 并写入HDFS中

释放双眼,带上耳机,听听看~!

spark 版本为1.0
kafka 版本为0.8

首先来看看kafka的架构图 详细了解请参考官方

我这边有三台机器用于kafka 日志收集的
A 192.168.1.1 为server
B 192.168.1.2 为producer
C 192.168.1.3 为consumer

首先在A上的kafka安装目录下执行如下命令


1
2
1./kafka-server-start.sh ../config/server.properties
2

启动kafka 通过netstat -npl 可以查看出是否开启默认端口9092

B为我们的nginx日志产生服务器,在这里的日志是网站实时写入到access-nginx.log 中
因此我们可以通过 tail -f 的方式能看到当前网站正在请求的日志信息。如果你的网站访问量很大请勿执行tail -f

同样我们也要在B上部署kafka,如果你没有写kafka 的客户端的话(查看客户端API地址 )

执行如下命令来push 数据到集群中


1
2
1tail -n 0 -f   /www/nh-nginx02/access.log  | bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic sb-nginx03
2

这样我们就将日志push到kafka消息中了

C中,现在我们来写 consumer pull数据,还是要部署一下kafka 然后执行命令


1
2
1bin/kafka-console-consumer.sh --zookeeper 192.168.1.1:2181 --topic sb-nginx03 --from-beginning
2

参数
–zookeeper 指定了你集群中zookeeper 的地址和端口即可
–topic 要和我们在B中push的时候指定的名称一致

上述方式只为在shell 命令行下,如何通过spark来写consumer呢?
假设你已经下载好spark1.0 源码 假设你已经部署好sbt scala等环境

scala 代码如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
1package test

3import java.util.Properties



7import org.apache.spark.streaming._
8import org.apache.spark.streaming.StreamingContext._
9import org.apache.spark.streaming.kafka._
10import org.apache.spark.SparkConf
11 
12 
13object KafkaTest {
14 
15  def main(args:Array[String])
16  {
17    if (args.length < 5) {
18      System.err.println("Usage: KafkaTest <zkQuorum> <group> <topics> <numThreads> <output>")
19      System.exit(1)
20    }
21    val Array(zkQuorum, group, topics, numThreads,output) = args
22    val sparkConf = new SparkConf().setAppName("KafkaTest")
23    val ssc =  new StreamingContext(sparkConf, Seconds(2))
24    ssc.checkpoint("checkpoint")
25 
26    val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
27    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
28    lines.saveAsTextFiles(output)
29    ssc.start()
30    ssc.awaitTermination()
31 
32    //.saveAsTextFile(output)
33 
34 
35  }
36 
37}
38

然后编译
mvn -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.0.1 -DskipTests package

然后spark作业提交


1
2
1./bin/spark-submit  --master local[*]  --class org.apache.spark.fccs.KafkaTest ./test/target/scala-2.10/spark-test-1.0.0-hadoop2.3.0-cdh5.0.1.jar  zoo02 my-test  sb-nginx03 1 hdfs://192.168.1.1:9100/tmp/spark-log.txt
2

结果如下:

spark scala

给TA打赏
共{{data.count}}人
人已打赏
安全网络

CDN安全市场到2022年价值76.3亿美元

2018-2-1 18:02:50

安全运维

数据库命名规则

2021-12-11 11:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索