es-09: Spark integration
Published: 2019-06-25


Integrating ES with Spark is fairly simple: the connector already wraps everything needed, so you just call its built-in methods directly.
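To preview what those built-in methods look like, here is a minimal sketch that is not from the original post; the host address, the spark/docs resource, and the object name are placeholders. Importing org.elasticsearch.spark._ adds saveToEs to RDDs and esRDD to the SparkContext, and the full read/write variants below all build on these same implicits.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds saveToEs / esRDD

object QuickStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("es-quickstart").setMaster("local[2]")
      .set("es.nodes", "127.0.0.1")          // placeholder ES host
      .set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)

    // write two documents, then read the resource back as (id, fields) pairs
    val docs = Seq(Map("word" -> "spark"), Map("word" -> "es"))
    sc.makeRDD(docs).saveToEs("spark/docs")
    sc.esRDD("spark/docs").collect().foreach(println)

    sc.stop()
  }
}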

Version setup notes: Spark 2.3.1, Scala 2.11.12, elasticsearch-spark-20_2.11 6.3.2.

Maven dependency notes: see the POM below.

1, Maven configuration:

<parent>
    <artifactId>xiaoniubigdata</artifactId>
    <groupId>com.wenbronk</groupId>
    <version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark06-es</artifactId>
<properties>
    <spark.version>2.3.1</spark.version>
    <spark.scala.version>2.11</spark.scala.version>
    <scala.version>2.11.12</scala.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${spark.scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${spark.scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${spark.scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.3.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals><goal>compile</goal><goal>testCompile</goal></goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-deploy-plugin</artifactId>
            <version>2.8.2</version>
            <configuration>
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>

2, RDD usage

1), read

package com.wenbronk.spark.es.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Read data from ES
  */
object ReadMain {
  def main(args: Array[String]): Unit = {
//    val sparkconf = new SparkConf().setAppName("read-es").setMaster("local[4]")
//    val spark = new SparkContext(sparkconf)
    val sparkSession = SparkSession.builder()
      .appName("read-es-rdd")
      .master("local[4]")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()
    val spark = sparkSession.sparkContext

    // import the es package so a custom query can be passed to esRDD
    import org.elasticsearch.spark._

    // read the documents as (id, field map) tuples
    val esreadRdd: RDD[(String, collection.Map[String, AnyRef])] = spark.esRDD("macsearch_fileds/mac",
      """
        |{
        |  "query": {
        |    "match_all": {}
        |  }
        |}
      """.stripMargin)

    // count the occurrences of each mac and sort by count
    val value: RDD[(Option[AnyRef], Int)] = esreadRdd.map(_._2.get("mac"))
      .map(mac => (mac, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2)

    val tuples: Array[(Option[AnyRef], Int)] = value.collect()
    tuples.foreach(println)

    esreadRdd.saveAsTextFile("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")

    sparkSession.close()
  }
}

2), readJson

package com.wenbronk.spark.es.rdd

import org.apache.spark.sql.SparkSession

object ReadJsonMain {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("read-es-rdd")
      .master("local[4]")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()
    val spark = sparkSession.sparkContext

    // read as JSON strings, with a query attached
    import org.elasticsearch.spark._
    val esJsonRdd = spark.esJsonRDD("macsearch_fileds/mac",
      """
      {
        "query": {
          "match_all": {}
        }
      }
      """.stripMargin)

    // keep only the JSON document (drop the id) and write it to a local directory
    esJsonRdd.map(_._2).saveAsTextFile("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")

    sparkSession.close()
  }
}

3), write

package com.wenbronk.spark.es.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark

object WriteMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("write-spark-es")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // the input is expected to contain one JSON document per line (as written by the readJson example)
    val df: RDD[String] = spark.sparkContext.textFile("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")

    import org.elasticsearch.spark._
//    df.saveToEs("spark/docs")
//    EsSpark.saveToEs(df, "spark/docs")
    EsSpark.saveJsonToEs(df, "spark/json")

    spark.close()
  }
}

4), writing to multiple indices

package com.wenbronk.spark.es.rdd

import org.apache.spark.sql.SparkSession

object WriteMultiIndex {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("es-spark-multiindex")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()
    val sc = spark.sparkContext

    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    import org.elasticsearch.spark._
    // you can attach your own metadata; here only an id is added.
    // the {media_type} placeholder in the resource name is filled from each document's field
    sc.makeRDD(Seq((1, game), (2, book), (3, cd))).saveToEs("my-collection-{media_type}/doc")

    spark.close()
  }
}

3, streaming

1), write

package com.wenbronk.spark.es.stream

import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.streaming.EsSparkStreaming

import scala.collection.mutable

object WriteStreamingMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("es-spark-streaming-write").setMaster("local[4]")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "10.124.147.22")
    // the port defaults to 9200; SparkConf.set only accepts String values, so an Int cannot be passed here
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    // build a one-batch DStream from a queue of RDDs
    val rdd = sc.makeRDD(Seq(numbers, airports))
    val microbatches = mutable.Queue(rdd)
    val dstream: InputDStream[Map[String, Any]] = ssc.queueStream(microbatches)

//    import org.elasticsearch.spark.streaming._
//    dstream.saveToEs("sparkstreaming/doc")
//    EsSparkStreaming.saveToEs(dstream, "sparkstreaming/doc")
    // with an id taken from a document field:
//    EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
    // as JSON:
    EsSparkStreaming.saveJsonToEs(dstream, "sparkstreaming/json")

    ssc.start()
    ssc.awaitTermination()
  }
}

2), writing with metadata (this also works for RDDs)

package com.wenbronk.spark.es.stream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object WriteStreamMeta {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("es-spark-streaming-write").setMaster("local[4]")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "10.124.147.22")
    // the port defaults to 9200; SparkConf.set only accepts String values
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // the key of each pair is used as the document id
    val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    val microbatches = mutable.Queue(airportsRDD)

    import org.elasticsearch.spark.streaming._
    ssc.queueStream(microbatches).saveToEsWithMeta("airports/2015")

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Using several kinds of metadata
    */
  def main1(args: Array[String]): Unit = {
    val ID = "id"
    val TTL = "ttl"
    val VERSION = "version"

    val conf = new SparkConf().setAppName("es-spark-streaming-write").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // define the metadata; the maps do not need to carry the same keys
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2, VERSION -> "23")
    val sfoMeta = Map(ID -> 3)

    val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    val microbatches = mutable.Queue(airportsRDD)

    import org.elasticsearch.spark.streaming._
    ssc.queueStream(microbatches).saveToEsWithMeta("airports/2015")

    ssc.start()
    ssc.awaitTermination()
  }
}
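The heading above notes that the same metadata mechanism also applies to RDDs, but only the DStream version is shown. A minimal sketch of the plain-RDD case under that assumption, reusing the airports/2015 resource and host from the example above (saveToEsWithMeta comes from the org.elasticsearch.spark._ implicits for pair RDDs):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // brings saveToEsWithMeta for pair RDDs

object WriteRddMeta {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("es-rdd-write-meta").setMaster("local[2]")
    conf.set("es.nodes", "10.124.147.22")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // the key of each pair becomes the document id
    sc.makeRDD(Seq((1, otp), (2, sfo))).saveToEsWithMeta("airports/2015")

    sc.stop()
  }
}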

4, Spark SQL usage

1), read

package com.wenbronk.spark.es.sql

import org.apache.spark.sql.{DataFrame, SparkSession}

object ESSqlReadMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("es-sql-read")
      .config("es.index.auto.create", true)
      // push SQL filters down to ES as query DSL
      .config("pushdown", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // full load, without a query:
//    val df: DataFrame = spark.read.format("es").load("macsearch_fileds/mac")

    import org.elasticsearch.spark.sql._
    val df = spark.esDF("macsearch_fileds/mac",
      """
        |{
        |  "query": {
        |    "match_all": {}
        |  }
        |}
      """.stripMargin)

    // show the schema inferred from the index mapping
    df.printSchema()

    df.createOrReplaceTempView("macseach_fileds")
    val dfSql: DataFrame = spark.sql(
      """
        select
          mac,
          count(mac) con
        from macseach_fileds
        group by mac
        order by con desc
      """.stripMargin)
    dfSql.show()

    // save to a local file
    df.write.json("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/sql/json")

    spark.stop()
  }
}
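The pushdown setting used above can also be supplied per read through the DataFrame reader, together with a query. A minimal sketch assuming the same index and host; the pushdown and es.query option names come from elasticsearch-hadoop's Spark SQL support, and the filter is only illustrative:

import org.apache.spark.sql.SparkSession

object ESSqlReadPushdown {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("es-sql-read-pushdown")
      .getOrCreate()

    // "es" is the short name elasticsearch-spark registers for the DataSource API
    val df = spark.read.format("es")
      .option("es.nodes", "10.124.147.22")
      .option("es.port", "9200")
      .option("pushdown", "true") // translate DataFrame filters into ES query DSL
      .option("es.query", """{"query": {"match_all": {}}}""")
      .load("macsearch_fileds/mac")

    // this filter is pushed down to Elasticsearch where possible
    df.filter(df("mac").isNotNull).show()

    spark.stop()
  }
}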

2), write

package com.wenbronk.spark.es.sql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL

object ESSqlWriteMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("es-sql-write")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    val df: DataFrame = spark.read.format("json")
      .load("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/sql/json")
    df.show()

    // the DataFrame can also be written directly via the implicit saveToEs:
//    import org.elasticsearch.spark.sql._
//    df.saveToEs("spark/people")
    EsSparkSQL.saveToEs(df, "spark/people")

    spark.close()
  }
}

5, structured streaming

I am not yet very familiar with Structured Streaming; I will revisit this part once I am.

package com.wenbronk.spark.es.structstream

import org.apache.spark.sql.SparkSession

object StructStreamWriteMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structstream-es-write")
      .master("local[4]")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // note: a streaming file source normally also requires an explicit schema
    val df = spark.readStream
      .format("json")
      .load("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")

    df.writeStream
      .option("checkpointLocation", "/save/location")
      .format("es")
      .start()

    spark.close()
  }
}
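Since the example above is admittedly a placeholder, here is a slightly more complete sketch of the same idea, with my own assumptions rather than the original code: a streaming file source needs an explicit schema (the single mac field here is assumed), the es sink takes the target index/type as the argument to start(), and the returned query should be awaited instead of closing the session right away.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StructStreamWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structstream-es-write")
      .master("local[2]")
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // streaming file sources require a schema up front (assumed field)
    val schema = StructType(Seq(StructField("mac", StringType)))

    val df = spark.readStream
      .schema(schema)
      .json("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")

    val query = df.writeStream
      .option("checkpointLocation", "/save/location") // required by the es sink
      .format("es")
      .start("sparkstructured/json") // target index/type

    query.awaitTermination()
  }
}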

 

Reposted from: http://xfxto.baihongyu.com/
