释放双眼,带上耳机,听听看~!
使用 elasticsearch-hadoop 包,可在 github 中搜索到该项目
项目地址
example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1import org.elasticsearch.spark._
2import org.elasticsearch.spark.sql._
3
4val conf = new SparkConf()
5 .set("es.nodes","192.168.47.155")
6 .set("es.port","9200")
7 .setMaster("spark://...")
8 .setAppName("es_hdfs")
9
10val sc = new SparkConf(sc)
11
12//查询合作方为abc的数据
13val query = """{"query":{"match":{"activity.partnerCode": "abc"}}}"""
14
15//将在es中的查询结果转化为rdd/dataFrame
16val esRdd = sc.esRDD(s"index/type",query)
17//直接读入全部数据
18val esDf = sqlContext.esDF(s"index/type")
19
20//对读入rdd/dataFrame进行操作
21esRdd.map(r=>{...})
22esDf.flatMap(r=>{......})
23
24//将dataFrame/rdd写入es
25esRdd.saveToEs("index/type")
26resultDf.saveToEs("index/type")
27
28
Tips
从es读入数据时,读取的并发度由es的分片数决定。