
Spark: Read Kafka as a Stream, Query ES, and Store to MySQL (Business Logic and Source Code, Part 1)


Business:

  Recently the company needed to handle some data-related work that requires Spark, Kafka, ES, and MySQL to be used together.

  The business logic itself is fairly simple.

 

The main steps are as follows:

  1. The first-level (upstream) project submits the relevant processing identifiers to Kafka (a sample message is sketched after this list)
  2. Spark reads Kafka and obtains the processing identifiers
  3. The corresponding ES data is queried based on those identifiers
  4. The data is stored to MySQL
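
The post does not show what a message on the topic looks like. Judging from the fields parsed in the driver code further down, a message would look roughly like the sketch below; the topic name and field names come from the code, while every value is invented for illustration.

// Hypothetical example of a message published to the "knowledgeDir1" topic.
// The field names match what Read_Kafka_DisposeES2Tidb4 parses with fastjson;
// all values are placeholders.
object SampleKafkaMessage {
  val json: String =
    """{
      |  "project": "P001",
      |  "index": "sti_repo_all",
      |  "uniqueidentifier": "dir/node-01",
      |  "uniqueidentifierMaster": "dir-master-01",
      |  "rule": "{\"query\":{\"match_all\":{}}}"
      |}""".stripMargin
}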

Project environment:

  • spark:3.0.0
  • scala:2.12.11
  • es:8.2.3

 

pom file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>System</artifactId>
        <groupId>com.dispose.metadata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sparkDispose</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--To avoid jar conflicts that break most of the functionality, it is recommended to declare the runtime-environment related jars first-->

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-network-common_${scala.version}</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_${scala.version}</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
        </dependency>

        <!--elasticsearch-->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-30_2.12</artifactId>
            <version>8.2.3</version>
        </dependency>

        <!--excel-->
        <dependency>
            <groupId>com.crealytics</groupId>
            <artifactId>spark-excel_2.12</artifactId>
            <version>3.0.1_0.18.3</version>
        </dependency>

        <!--json-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.10.1</version>
        </dependency>

        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.19</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>com.dounine</groupId>
            <artifactId>spark-sql-datasource</artifactId>
            <version>1.0.4</version>
        </dependency>


    </dependencies>
    <build>
        <plugins>
            <!-- This plugin compiles the Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>

            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>


            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <!-- The configuration of the plugin -->
                <configuration>
                    <!-- Configuration of the archiver -->
                    <archive>
                        <!--Whether the generated jar keeps pom.xml and pom.properties (true keeps them)-->
                        <addMavenDescriptor>true</addMavenDescriptor>

                        <!-- Manifest specific configuration -->
                        <manifest>
                            <!--Whether to add the third-party jars to the manifest classpath-->
                            <addClasspath>true</addClasspath>
                            <!--Prefix of the classpath entries in the generated manifest; the third-party jars go under lib/, so the prefix is lib/-->
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>

                </configuration>
            </plugin>

        </plugins>
    </build>

</project>

 

Source code:

 

package project.local


import com.alibaba.fastjson.{JSON, JSONObject}
import common.objcet.conf.TidbConfLocal
import common.utils.connect.JdbcConnectUtils
import common.utils.kafka.KafkaHostUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

/**
 * Reads data from Kafka, queries ES based on it, and stores the results to TiDB
 */
object Read_Kafka_DisposeES2Tidb4 {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")

    //ES-related connection and read settings

    conf.set("es.nodes", "172.24.5.101")
    conf.set("es.port", "9200")
    //conf.set("es.scroll.size", "1")
    //Disables the slice optimization. It is enabled by default; partitions are then split based on the index size and spark.es.input.max.docs.per.partition
    //Disabling it is not recommended when the data volume is large
    //conf.set("es.input.use.sliced.partitions", "false")
    //Works around an issue caused by fields whose mapping type is Date
    //conf.set("es.mapping.date.rich", "false")
    conf.set("es.read.metadata.field", "_id")
    conf.set("es.nodes.wan.only", "true")
    //conf.set("es.index.auto.create", "true")
    //conf.set("es.output.json", "true")
    //conf.set("es.input.json", "true")
    //conf.set("es.http.timeout", "100s")
    conf.set("es.net.http.auth.user", "elastic")
    conf.set("es.net.http.auth.pass", "wfes@231")
    // Filter the fields at the source: _source filtering does not take effect, so the fields returned by the ES query have to be set here (explained separately later)
    conf.set("es.internal.mr.target.fields", "did")

    // conf.set("es.net.ssl","true")
    //conf.set("es.net.ssl.cert.allow.self.signed", "true")
    // conf.set("es.net.ssl.keystore.location","file:///F:\\http.p12")
    //conf.set("es.net.ssl.keystore.type","pkcs12")
    // conf.set("es.net.ssl.keystore.pass", "password")
    //conf.set("es.net.ssl.cert.allow.self.signed","false")

    val sc = new SparkContext(conf)
    val context = new SQLContext(sc)
    //Trigger a batch every ten seconds
    val streamingContext = new StreamingContext(sc, Seconds(10))
    //Create the Kafka input stream
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaHostUtils.installKafka(streamingContext, "knowledgeDir1")

    val kafkaStreamValue: DStream[(String)] = kafkaStream.transform(rdd => {
      val value: RDD[(String)] = rdd.mapPartitions(iter => {
        iter.map(line => {
          println(line.value())
          (line.value())
        })
      }).filter(f => {
        f != null
      })
      value
    })


    val value: DStream[Int] = kafkaStreamValue.transform(rdd => {

      if (!rdd.isEmpty()) {
        rdd.collect().foreach(c => {
          //Convert the message into a JSON object
          val nObject: JSONObject = JSON.parseObject(c)
          //Project id
          val projectString: String = nObject.get("project").toString
          //Index name
          val index: String = nObject.get("index").toString
          //Unique identifier of the knowledge-directory node
          val uniqueidentifier: String = nObject.get("uniqueidentifier").toString.replace("/","_")
          //Unique identifier of the knowledge directory
          val uniqueidentifierMaster: String = nObject.get("uniqueidentifierMaster").toString.replace("/","_")
          //Concrete query parameters for each node
          val query: String = nObject.get("rule").toString

          val valueES: RDD[(String, String)] = sc.esJsonRDD("sti_repo_all", query)
          val valueKeyRdd: RDD[Row] = valueES.map(x => {
            Row(projectString, uniqueidentifierMaster, uniqueidentifier, index, query,  x._1)
          })

          //Store to MySQL
          //Row schema
          val commSchema = StructType(List(
            StructField("project", StringType, nullable = false),
            StructField("uniqueidentifierMaster", StringType, nullable = false),
            StructField("uniqueidentifier", StringType, nullable = false),
            StructField("did", StringType, nullable = false),
            StructField("query", StringType, nullable = false),
            StructField("uid", StringType, nullable = false)
          ))

          val frameItem: DataFrame = context.createDataFrame(valueKeyRdd, commSchema)
      //This object has to be created yourself; it is a configuration helper class that mainly carries the TiDB-related connection settings
         val tidbConf: TidbConfLocal = new TidbConfLocal("sticloud.signmapping", "directory_record_" + uniqueidentifierMaster);
          frameItem.show(10)

          JdbcConnectUtils.jdbcSourceDBWriterDataFrame(tidbConf, frameItem, SaveMode.Append);
        })

        val seq = Seq[Int](elems = 1)
        sc.makeRDD(seq)
      } else {
        val seq = Seq[Int](elems = 2)
        sc.makeRDD(seq)
      }

    })

    value.print()


    streamingContext.start();
    streamingContext.awaitTermination();


  }


}
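
The KafkaHostUtils helper used above is a thin wrapper around KafkaUtils.createDirectStream:
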
package common.utils.kafka

import common.objcet.conf.KafKaConf
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaHostUtils {

  def installKafka(streamingContext: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, String]] = {
    //Consumer configuration
    val confKafka = new KafKaConf()
    KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](Array(topic), confKafka.kafkaParams))
  }


}
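
The KafKaConf class holding the consumer parameters is not included in the post. A minimal sketch of what it would typically contain is shown below; the broker address, group id, and offset settings are assumptions, not values from the original project.

package common.objcet.conf

import org.apache.kafka.common.serialization.StringDeserializer

// Minimal sketch of the consumer configuration used by KafkaHostUtils above.
// bootstrap.servers and group.id are placeholders; point them at the real cluster.
class KafKaConf {
  val kafkaParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "kafka-host:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "knowledgeDir-consumer",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
}

The JdbcConnectUtils helper that writes the DataFrame to MySQL/TiDB follows:
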
package common.utils.connect

import common.objcet.conf.DbConfOther
import common.utils.spark.SparkUtils
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode}

import java.util.Properties

object JdbcConnectUtils {

  

  /** *
   * Purpose:
   * --> Writes a DataFrame to the target relational database
   * --> Supports MySQL and TiDB
   *
   * @param dbParamOther : database configuration
   * @value parameters
   *        url: database address
   *        dbName: database name
   *        tableName: table name
   *        user: database user
   *        password: database password
   *        driver: JDBC driver class
   *        jdbcBatchInsertSize: number of rows submitted per batch
   * @Test : test case
   *       see: mysql.local.Create_MysqlTable_UtilsDemo
   * @param saveMode       : save mode
   *                       SaveMode.Overwrite
   *                       SaveMode.Append
   *                       SaveMode.ErrorIfExists
   *                       SaveMode.Ignore
   * @param frameItem      : data to write
   * @return Unit
   * @author fds
   * @version 1.0
   * @since JDK1.8
   */
  def jdbcSourceDBWriterDataFrame: (DbConfOther, DataFrame, SaveMode) => Unit = (dbParamOther: DbConfOther, frameItem: DataFrame, saveMode: SaveMode) => {

    frameItem.write.format("jdbc")
      .mode(saveMode)
      .option("url", dbParamOther.url + "/" + dbParamOther.dbName + "?rewriteBatchedStatements=true")
      .option("dbtable", dbParamOther.tableName)
      .option("user", dbParamOther.user)
      .option("password", dbParamOther.password)
      .option("driver", dbParamOther.driver)
      //Comment out this line to have the table created automatically when it does not exist
      .option("truncate", dbParamOther.truncate)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, dbParamOther.jdbcBatchInsertSize)
      .option("isolationLevel", "NONE").save()

  }
}
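
The DbConfOther and TidbConfLocal configuration classes are not shown in the post either. Based on how jdbcSourceDBWriterDataFrame and the driver code use them, a minimal sketch could look like the following; the JDBC URL, credentials, driver, and default values are all assumptions, and the real project may structure these classes differently.

package common.objcet.conf

// Minimal sketch of the database configuration consumed by JdbcConnectUtils above.
// The field names follow the accesses in jdbcSourceDBWriterDataFrame; every default
// value (URL, credentials, driver, batch size) is a placeholder.
class DbConfOther(
  val url: String = "jdbc:mysql://tidb-host:4000",
  val dbName: String = "sticloud",
  val tableName: String = "directory_record",
  val user: String = "root",
  val password: String = "password",
  val driver: String = "com.mysql.cj.jdbc.Driver",
  val truncate: String = "false",
  val jdbcBatchInsertSize: String = "5000"
)

// In the driver code the subclass is constructed as
// new TidbConfLocal("sticloud.signmapping", "directory_record_" + uniqueidentifierMaster);
// how the first argument maps onto the configuration is not shown in the post,
// so this sketch simply forwards the two values as database and table name.
class TidbConfLocal(db: String, table: String)
  extends DbConfOther(dbName = db, tableName = table)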

 

From: https://www.cnblogs.com/EeDFanRen/p/16862691.html
