使用spark与ElasticSearch交互

释放双眼,带上耳机,听听看~!

使用 elasticsearch-hadoop 包,可在 github 中搜索到该项目

项目地址

example


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1import org.elasticsearch.spark._
2import org.elasticsearch.spark.sql._
3
4val conf = new SparkConf()
5    .set("es.nodes","192.168.47.155")
6    .set("es.port","9200")
7    .setMaster("spark://...")
8    .setAppName("es_hdfs")
9
10val sc = new SparkConf(sc)
11
12//查询合作方为abc的数据
13val query = """{"query":{"match":{"activity.partnerCode": "abc"}}}"""
14
15//将在es中的查询结果转化为rdd/dataFrame
16val esRdd = sc.esRDD(s"index/type",query)
17//直接读入全部数据
18val esDf = sqlContext.esDF(s"index/type")
19
20//对读入rdd/dataFrame进行操作
21esRdd.map(r=>{...})
22esDf.flatMap(r=>{......})
23
24//将dataFrame/rdd写入es
25esRdd.saveToEs("index/type")
26resultDf.saveToEs("index/type")
27
28

Tips

从es读入数据时,读取的并发度由es的分片数决定。

给TA打赏
共{{data.count}}人
人已打赏
安全运维

OpenSSH-8.7p1离线升级修复安全漏洞

2021-10-23 10:13:25

安全运维

设计模式的设计原则

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索