
KAFKA+SPARK+PYTHON+FLASK Real-Time Information Statistics System (Local Version)


I. Project Requirements

1) Use Spark Streaming to create a consumer that reads data from the corresponding topic

2) Use Spark Streaming to compute, every 2 seconds, the totals of all on-shelf (U) and off-shelf (D) items

3) Use Spark Streaming to compute, every 2 seconds, the quantity for each product ID

4) Use Spark Streaming to compute, every 2 seconds, the item quantity for each category

5) Use Spark SQL to compute the on-shelf and off-shelf quantities for each product ID

6) Use Spark Core/RDD to compute the on-shelf and off-shelf quantities for each product ID and item

7) The Spark Streaming results above can be pushed to a new Kafka topic and displayed live with a front-end tool; the Spark Core/RDD results are pushed to MySQL and displayed with ECharts.

Local deployment: start a local virtual machine, bring up ZooKeeper and Kafka, then run the project code against the system. The record format consumed throughout is sketched below.
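
For reference, every message on the supermarket topic is a tab-separated record such as 1995922\tNayeon\t92\t29\tU (see the producer shown in Part V). The field meanings are inferred from how the code below uses them: product ID, item name, a quantity, a second quantity that is subtracted for off-shelf counts, and a U/D status flag. A minimal Scala parsing sketch under that assumption (StockEvent and parseEvent are illustrative names, not part of the project):

case class StockEvent(id: String, name: String, qty: Int, removed: Int, status: String)

// Illustrative helper: parse one tab-separated record, or None if it is malformed.
def parseEvent(line: String): Option[StockEvent] = {
  val parts = line.split("\t")
  if (parts.length > 4)
    Some(StockEvent(parts(0).trim, parts(1).trim, parts(2).trim.toInt, parts(3).trim.toInt, parts(4).trim))
  else None
}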

II. Project Activity Diagram

III. Real-Time Statistics Screenshots

1) Live totals of all on-shelf and off-shelf items

2) Live quantity for each product ID

3) Live item quantity for each category

4) Live on-shelf and off-shelf counts for each product ID and item

5) ECharts pie chart of the on-shelf and off-shelf counts for each product ID and item

IV. Full Code

1) Processing code
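
The program below (Super_Total2) consumes the supermarket topic and republishes each windowed result to its own downstream topic — supermarketIdProStuCou, supermarketINOUT, supermarketProCla, supermarketProId — forwards the raw records to supermarketRead, and appends the Spark SQL results to MySQL.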

package scala2

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}



object Super_Total2 {
  def main(args: Array[String]): Unit = {

    // 1) Build SparkConf: run locally, set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")

    // 2) Build Spark Streaming --> StreamingContext from the config
    // StreamingContext requires the spark-streaming dependency
    // Spark Streaming processes the stream in micro-batches; batch interval 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))

    // Reduce Spark's verbose INFO output; log only errors
    ssc.sparkContext.setLogLevel("error")

    //3) Kafka settings: broker, key/value deserializers, group id, offset handling
    val kfkaParams = Map[String, Object](
      "bootstrap.servers" -> "mina:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SuperMarket2025Total",
      "enable.auto.commit" -> (false: java.lang.Boolean)

    )

    //4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext

    //topic name

    val topicName = Array("supermarket")
    val streamRdd = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicName, kfkaParams)
    )


    //data process
    //Super_IdProStuCou data process
    val resIdProStuCou = streamRdd.map(_.value())
    val resultIdProStuCou = resIdProStuCou.flatMap(line => {
      val parts = line.split("\t")
      if (parts.length > 4) {
        val ID = parts(0).trim
        val status = parts(4).trim
        val name = parts(1).trim

        if ("1995922".equals(ID)) {
          if ("Nayeon".equals(name)) {

            if ("U".equals(status)) List(("1995922NayeonU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1995922NayeonD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Jeongyeon".equals(name)) {

            if ("U".equals(status)) List(("1995922JeongyeonU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1995922JeongyeonD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("1996119".equals(ID)) {

          if ("Momo".equals(name)) {

            if ("U".equals(status)) List(("1996119MomoU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1996119MomoD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Sana".equals(name)) {

            if ("U".equals(status)) List(("1996119SanaU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1996119SanaD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Mina".equals(name)) {

            if ("U".equals(status)) List(("1996119MinaU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1996119MinaD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("1997201".equals(ID)) {

          if ("Jihyo".equals(name)) {

            if ("U".equals(status)) List(("1997201JihyoU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1997201JihyoD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Dahyun".equals(name)) {

            if ("U".equals(status)) List(("1997201DahyunU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1997201DahyunD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("1999423".equals(ID)) {

          if ("Chaeyoung".equals(name)) {

            if ("U".equals(status)) List(("1999423ChaeyoungU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1999423ChaeyoungD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Tzuyu".equals(name)) {

            if ("U".equals(status)) List(("1999423TzuyuU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1999423TzuyuD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("2024123".equals(ID)) {

          if ("Zico".equals(name)) {

            if ("U".equals(status)) List(("2024123ZicoU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("2024123ZicoD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }

          else Nil
        }
        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_ + _,
      Seconds(4),
      Seconds(4)
    )


    //Super_INOUT data process
    val resINOUT = streamRdd.map(_.value())
    val resultINOUT = resINOUT.flatMap(line => {
      val parts = line.split("\t")
      if (parts.length > 4) {
        val status = parts(4).trim
        if ("U".equals(status)) List(("U上架", parts(2).toInt))
        else if ("D".equals(status)) List(("D下架", parts(2).toInt - parts(3).toInt))
        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(4))

    //Super_ProCla data process
    val resProCla = streamRdd.map(_.value())
    val resultProCla = resProCla.flatMap(line => {
      val parts = line.split("\t")
      if (parts.length > 4) {
        val Cla = parts(1).trim
        if ("Zico".equals(Cla)) List(("货品类Zico", parts(2).toInt - parts(3).toInt))
        else if ("Tzuyu".equals(Cla)) List(("货品类Tzuyu", parts(2).toInt - parts(3).toInt))
        else if ("Chaeyoung".equals(Cla)) List(("货品类Chaeyoung", parts(2).toInt - parts(3).toInt))
        else if ("Dahyun".equals(Cla)) List(("货品类Dahyun", parts(2).toInt - parts(3).toInt))
        else if ("Mina".equals(Cla)) List(("货品类Mina", parts(2).toInt - parts(3).toInt))
        else if ("Jihyo".equals(Cla)) List(("货品类Jihyo", parts(2).toInt - parts(3).toInt))
        else if ("Sana".equals(Cla)) List(("货品类Sana", parts(2).toInt - parts(3).toInt))
        else if ("Momo".equals(Cla)) List(("货品类Momo", parts(2).toInt - parts(3).toInt))
        else if ("Jeongyeon".equals(Cla)) List(("货品类Jeongyeon", parts(2).toInt - parts(3).toInt))
        else if ("Nayeon".equals(Cla)) List(("货品类Nayeon", parts(2).toInt - parts(3).toInt))

        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(4))


    //Super_ProId data process
    val resProId = streamRdd.map(_.value())
    val resultProId = resProId.flatMap(line => {
      val parts = line.split("\t")
      if (parts.length > 4) {
        val ID = parts(0).trim
        if ("1995922".equals(ID)) List(("货品号1995922", parts(2).toInt - parts(3).toInt))
        else if ("1996119".equals(ID)) List(("货品号1996119", parts(2).toInt - parts(3).toInt))
        else if ("1997201".equals(ID)) List(("货品号1997201", parts(2).toInt - parts(3).toInt))
        else if ("1999423".equals(ID)) List(("货品号1999423", parts(2).toInt - parts(3).toInt))
        else if ("2024123".equals(ID)) List(("货品号2024123", parts(2).toInt - parts(3).toInt))
        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(4))

    //Super_Sal data process
    val resSql = streamRdd.map(_.value())
    val resultSql = resSql.flatMap(line => {
      val parts = line.split("\t")
      if (parts.length > 4) {
        val ID = parts(0).trim
        val status = parts(4).trim

        if ("1995922".equals(ID)) {

          if ("U".equals(status)) List(("1995922U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("1995922D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else if ("1996119".equals(ID)) {

          if ("U".equals(status)) List(("1996119U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("1996119D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else if ("1997201".equals(ID)) {

          if ("U".equals(status)) List(("1997201U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("1997201D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else if ("1999423".equals(ID)) {

          if ("U".equals(status)) List(("1999423U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("1999423D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else if ("2024123".equals(ID)) {

          if ("U".equals(status)) List(("2024123U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("2024123D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_ + _,
      Seconds(4),
      Seconds(4)
    )










    //data store
    //Super_IdProStuCou data store
    resultIdProStuCou.foreachRDD(
      x => {
        println("--------各个货品号各个物品上下架IPSC------")
        x.foreach(
          obj => {
            println(obj.toString)

            //创建新的客户端
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "mina:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


            val producer = new KafkaProducer[String, String](property)

            //spark 链接 KAFKA
            producer.send(new ProducerRecord[String, String]("supermarketIdProStuCou", obj.toString))
            //关闭
            producer.close()
          }

          //结果打印

          //将数据发送到KAFKA 的TOPIC 当中 ,发送到 TOPIC TEST1
          // 1)构建
        )
      }
    )





    //Super_INOUT data store
    resultINOUT.foreachRDD(
      x => {
        println("--------所有上架下架INOUT------")
        x.foreach(
          obj => {
            println(obj)


            val property = new java.util.Properties()
            property.put("bootstrap.servers", "mina:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


            val producer = new KafkaProducer[String, String](property)

            //spark 链接 KAFKA
            producer.send(new ProducerRecord[String, String]("supermarketINOUT", obj.toString))
            //关闭
            producer.close()
          }

          //结果打印

          //将数据发送到KAFKA 的TOPIC 当中 ,发送到 TOPIC TEST1
          // 1)构建
        )
      }
    )

    //Super_ProCla data store
    resultProCla.foreachRDD(
      x => {
        println("--------各个货品类CLASS------")
        x.foreach(
          obj => {
            println(obj)

            //创建新的客户端
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "mina:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


            val producer = new KafkaProducer[String, String](property)

            //spark 链接 KAFKA
            producer.send(new ProducerRecord[String, String]("supermarketProCla", obj.toString))
            //关闭
            producer.close()
          }

          //结果打印

          //将数据发送到KAFKA 的TOPIC 当中 ,发送到 TOPIC TEST1
          // 1)构建
        )
      }
    )


    //Super_ProId data store
    resultProId.foreachRDD(
      x => {
        println("--------各个货品号ID------")
        x.foreach(
          obj => {
            println(obj)

            //创建新的客户端
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "mina:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


            val producer = new KafkaProducer[String, String](property)

            //spark 链接 KAFKA
            producer.send(new ProducerRecord[String, String]("supermarketProId", obj.toString))
            //关闭
            producer.close()
          }

          //结果打印

          //将数据发送到KAFKA 的TOPIC 当中 ,发送到 TOPIC TEST1
          // 1)构建
        )
      }
    )

//Super_SCon data store
    streamRdd.foreachRDD {

      x =>
        if (!x.isEmpty()) {
          println("--------读取数据是SCON------")
          val records = x.map(_.value())
          records.foreach(println)
          //创建新的客户端
          val property = new java.util.Properties()
          property.put("bootstrap.servers", "mina:9092")
          property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


          val producer = new KafkaProducer[String, String](property)

          //spark 链接 KAFKA
          producer.send(new ProducerRecord[String, String]("supermarketRead", records.toString))
          //关闭
          producer.close()
        }
      //结果打印
    }



    //Super_Sql data store
    val spSessSql = SparkSession.builder
      .appName("superMarketSql")
      .master("local[*]")
      .getOrCreate()

    import spSessSql.implicits._

    /*x =>
           if(!x.isEmpty()){
             val df: DataFrame = x.toDF("ID", "U上架", "D下架")
             df.show()
           }*/
    resultSql.foreachRDD {

      x =>
        if (!x.isEmpty()) {
          val df: DataFrame = x.toDF("IDSTATUS", "COUNT")

          df.show()



          // Write to MySQL over JDBC
          df.write.format("jdbc")
            .option("url", "jdbc:mysql://127.0.0.1:3306/supermarketsql?characterEncoding=UTF8&useSSL=false")
            .option("dbtable", "supersqlnew_1")
            .option("user", "root")
            .option("password", "123456")
            .mode(SaveMode.Append)
            .save()


        }
    }



    //Super_SqlEcharts data store
    val spSessSqlEcharts = SparkSession.builder
      .appName("echarts")
      .master("local[*]")
      .getOrCreate()


    val resultSqlEcharts = resultIdProStuCou
    resultSqlEcharts.foreachRDD {

      x =>
        if (!x.isEmpty()) {
          import spSessSqlEcharts.implicits._
          val df: DataFrame = x.toDF("IDOWNSTATUS", "COUNT")

          df.show()



          // Write to MySQL over JDBC for the ECharts page
          df.write.format("jdbc")
            .option("url", "jdbc:mysql://127.0.0.1:3306/echarts?characterEncoding=UTF8&useSSL=false")
            .option("dbtable", "superecharts_1")
            .option("user", "root")
            .option("password", "123456")
            .mode(SaveMode.Append)
            .save()


        }
    }


    //6) Start the StreamingContext and monitor Kafka
    ssc.start()
    ssc.awaitTermination() // block, consuming Kafka data until terminated

  }


}
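
One performance note on the store blocks above: they construct and close a KafkaProducer for every single record. A common Spark pattern is to create one producer per partition instead, via foreachPartition; a minimal sketch for the INOUT stream under the same broker and topic (not how the project code is written):

resultINOUT.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // One producer per partition instead of one per record
    val props = new java.util.Properties()
    props.put("bootstrap.servers", "mina:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    iter.foreach(obj => producer.send(new ProducerRecord[String, String]("supermarketINOUT", obj.toString)))
    producer.close()
  }
}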

2) Flask visualization

1> Figure 1

import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer


app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None

#***************************************************
consumer1 = KafkaConsumer(
    auto_offset_reset='earliest',
    bootstrap_servers = "mina:9092",
    group_id = "superMarketINOUT",
    )
consumer1.subscribe(["supermarketINOUT"])
#****************************************************

#**********************************************************
def background_thread2():
    id1 = 0
    id2 = 0


    for msg in consumer1:
        data_json = msg.value.decode('utf8')

        # Parse the payload: it is the Scala tuple's toString, e.g. "(U上架,154)", not JSON
        obj = data_json.split(',')
        if 'U上架' in data_json:
            id1 = obj[1].rstrip(')')   # drop the trailing ')' from the tuple text
        elif 'D下架' in data_json:
            id2 = obj[1].rstrip(')')
        else:
            continue
        result = str(id1) + ',' + str(id2)
        print(result)
        socketio.emit('test_message2', {'data': result})


# ***********************************************************************

@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        thread = socketio.start_background_task(target=background_thread2)  # **************************
    socketio.emit('connected', {'data': 'Connected'})


@app.route("/")
def handle_mes():
    return render_template("INOUT.html")


if __name__ == '__main__':
    socketio.run(app, port=5000, debug=True)
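
The split(',') above works because each Kafka message is the toString of a Scala Tuple2, e.g. (U上架,154), which is why the code strips a trailing ')'. A hedged alternative is to format the payload explicitly on the Scala side so no stripping is needed; a sketch of the send line inside the INOUT store block (not what the project code does):

// Illustrative: publish "key,count" instead of Tuple2.toString's "(key,count)"
val payload = s"${obj._1},${obj._2}"
producer.send(new ProducerRecord[String, String]("supermarketINOUT", payload))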

2> Figure 2

import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer


app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None

#***************************************************
consumer1 = KafkaConsumer(
    auto_offset_reset='earliest',
    bootstrap_servers = "mina:9092",
    group_id = "superMarketproId",
    )
consumer1.subscribe(["supermarketProId"])
#****************************************************

#**********************************************************
def background_thread2():
    id1 = 0
    id2 = 0
    id3 = 0
    id4 = 0
    id5 = 0


    for msg in consumer1:
        data_json = msg.value.decode('utf8')
        print('-----current data----' + data_json)
        # Parse the payload: the Scala tuple's toString, e.g. "(货品号1995922,63)", not JSON
        obj = data_json.split(',')
        if '货品号1995922' in data_json:
            id1 = obj[1].rstrip(')')
        elif '货品号1996119' in data_json:
            id2 = obj[1].rstrip(')')
        elif '货品号1997201' in data_json:
            id3 = obj[1].rstrip(')')
        elif '货品号1999423' in data_json:
            id4 = obj[1].rstrip(')')
        elif '货品号2024123' in data_json:
            id5 = obj[1].rstrip(')')
        else:
            continue
        result = str(id1) + ',' + str(id2) + ',' + str(id3) + ',' + str(id4)+ ',' + str(id5)
        print(result)
        socketio.emit('test_message2',{'data':result})
#***********************************************************************

@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        thread = socketio.start_background_task(target=background_thread2)#**************************
    socketio.emit('connected', {'data': 'Connected'})


@app.route("/")
def handle_mes():
    return render_template("proId.html")


if __name__ == '__main__':
    socketio.run(app,port=5001,debug=True)

3> Figure 3

import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer


app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None

#***************************************************
consumer1 = KafkaConsumer(
    auto_offset_reset='earliest',
    bootstrap_servers = "mina:9092",
    group_id = "superMarketproCla",
    )
consumer1.subscribe(["supermarketProCla"])
#****************************************************

#**********************************************************
def background_thread2():
    id1 = 0
    id2 = 0
    id3 = 0
    id4 = 0
    id5 = 0
    id6 = 0
    id7 = 0
    id8 = 0
    id9 = 0
    id10 = 0


    for msg in consumer1:
        data_json = msg.value.decode('utf8')
        print('-----current data----' + data_json)
        # Parse the payload: the Scala tuple's toString, e.g. "(货品类Zico,90)", not JSON
        obj = data_json.split(',')
        if '货品类Zico' in data_json:
            id1 = obj[1].rstrip(')')
        elif '货品类Tzuyu' in data_json:
            id2 = obj[1].rstrip(')')
        elif '货品类Chaeyoung' in data_json:
            id3 = obj[1].rstrip(')')
        elif '货品类Dahyun' in data_json:
            id4 = obj[1].rstrip(')')
        elif '货品类Mina' in data_json:
            id5 = obj[1].rstrip(')')
        elif '货品类Jihyo' in data_json:
            id6 = obj[1].rstrip(')')
        elif '货品类Sana' in data_json:
            id7 = obj[1].rstrip(')')
        elif '货品类Momo' in data_json:
            id8 = obj[1].rstrip(')')
        elif '货品类Jeongyeon' in data_json:
            id9 = obj[1].rstrip(')')
        elif '货品类Nayeon' in data_json:
            id10 = obj[1].rstrip(')')
        else:
            continue
        result = str(id1) + ',' + str(id2) + ',' + str(id3) + ',' + str(id4)+ ',' + str(id5)+ ',' +str(id6) + ',' + str(id7) + ',' + str(id8) + ',' + str(id9)+ ',' + str(id10)
        print(result)
        socketio.emit('test_message2',{'data':result})
#***********************************************************************

@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        thread = socketio.start_background_task(target=background_thread2)#**************************
    socketio.emit('connected', {'data': 'Connected'})


@app.route("/")
def handle_mes():
    return render_template("proCla.html")


if __name__ == '__main__':
    socketio.run(app,port=5002,debug=True)

4> Figure 4

import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer


app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None

#***************************************************
consumer1 = KafkaConsumer(
    auto_offset_reset='earliest',
    bootstrap_servers = "mina:9092",
    group_id = "superMarketIdProStuCou",
    )
consumer1.subscribe(["supermarketIdProStuCou"])
#****************************************************

#**********************************************************
def background_thread2():
    id1 = 0
    id2 = 0
    id3 = 0
    id4 = 0
    id5 = 0
    id6 = 0
    id7 = 0
    id8 = 0
    id9 = 0
    id10 = 0
    id11 = 0
    id12 = 0
    id13 = 0
    id14 = 0
    id15 = 0
    id16 = 0
    id17 = 0
    id18 = 0
    id19 = 0
    id20 = 0


    for msg in consumer1:
        data_json = msg.value.decode('utf8')
        print('-----current data----' + data_json)
        # Parse the payload: the Scala tuple's toString, e.g. "(1995922NayeonU上架,92)", not JSON
        obj = data_json.split(',')
        if '1995922NayeonU上架' in data_json:
            id1 = obj[1].rstrip(')')
        elif '1995922NayeonD下架' in data_json:
            id2 = obj[1].rstrip(')')
        elif '1995922JeongyeonU上架' in data_json:
            id3 = obj[1].rstrip(')')
        elif '1995922JeongyeonD下架' in data_json:
            id4 = obj[1].rstrip(')')
        elif '1996119MomoU上架' in data_json:
            id5 = obj[1].rstrip(')')
        elif '1996119MomoD下架' in data_json:
            id6 = obj[1].rstrip(')')
        elif '1996119SanaU上架' in data_json:
            id7 = obj[1].rstrip(')')
        elif '1996119SanaD下架' in data_json:
            id8 = obj[1].rstrip(')')
        elif '1996119MinaU上架' in data_json:
            id9 = obj[1].rstrip(')')
        elif '1996119MinaD下架' in data_json:
            id10 = obj[1].rstrip(')')
        elif '1997201JihyoU上架' in data_json:
            id11 = obj[1].rstrip(')')
        elif '1997201JihyoD下架' in data_json:
            id12 = obj[1].rstrip(')')
        elif '1997201DahyunU上架' in data_json:
            id13 = obj[1].rstrip(')')
        elif '1997201DahyunD下架' in data_json:
            id14 = obj[1].rstrip(')')
        elif '1999423ChaeyoungU上架' in data_json:
            id15 = obj[1].rstrip(')')
        elif '1999423ChaeyoungD下架' in data_json:
            id16 = obj[1].rstrip(')')
        elif '1999423TzuyuU上架' in data_json:
            id17 = obj[1].rstrip(')')
        elif '1999423TzuyuD下架' in data_json:
            id18 = obj[1].rstrip(')')
        elif '2024123ZicoU上架' in data_json:
            id19 = obj[1].rstrip(')')
        elif '2024123ZicoD下架' in data_json:
            id20 = obj[1].rstrip(')')
        else:
            continue
        result = str(id1) + ',' + str(id2) + ',' + str(id3) + ',' + str(id4)+ ',' + str(id5)+ ',' +str(id6) + ',' + str(id7) + ',' + str(id8) + ',' + str(id9)+ ',' + str(id10)+ ',' + str(id11) + ',' + str(id12) + ',' + str(id13) + ',' + str(id14)+ ',' + str(id15)+ ',' +str(id16) + ',' + str(id17) + ',' + str(id18) + ',' + str(id19)+ ',' + str(id20)
        print(result)
        socketio.emit('test_message2',{'data':result})
#***********************************************************************

@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        thread = socketio.start_background_task(target=background_thread2)#**************************
    socketio.emit('connected', {'data': 'Connected'})


@app.route("/")
def handle_mes():
    return render_template("idProStuCou.html")


if __name__ == '__main__':
    socketio.run(app,host='127.0.0.1',port=5003,debug=True)
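
Each figure is served by its own Flask-SocketIO app (ports 5000 through 5003 in the listings above), so all four dashboards can run simultaneously while the Spark job feeds their topics.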

V. Detailed Walkthrough

1) Kafka producer


package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class Super_Prod {
    public static void main(String[] args) {

        // 1. Create the Kafka producer configuration object
        Properties properties = new Properties();
        // 2. Add configuration to it: bootstrap.servers and the serializers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.226.129:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");


        // Push 10 random records every 5 seconds
        while (true) {
            // 3. Create the Kafka producer
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            System.out.println("Starting to send data==========================");
            try {
                // 4. Call send to publish the messages
                for (int i = 0; i < 10; i++) {
                    Random rd = new Random();  // random generator
                    // sample records (tab-separated; the last field is the U/D status)
                    String[] categoryArray = new String[]{"1995922\tNayeon\t92\t29\tU","1995922\tJeongyeon\t82\t28\tD","1996119\tMomo\t28\t2\tU","1996119\tSana\t94\t49\tU","1997201\tJihyo\t59\t5\tU","1996119\tMina\t69\t26\tU", "1997201\tDahyun\t77\t37\tU", "1999423\tChaeyoung\t98\t88\tD", "1999423\tTzuyu\t99\t25\tD", "2024123\tZico\t100\t10\tD"};
                    int random = rd.nextInt(10);
                    String category = categoryArray[random];
                    // simulated record
                    System.out.println("Sending record: " + category);

                    try {
                        kafkaProducer.send(new ProducerRecord<String, String>("supermarket", category));

                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
                Thread.sleep(5000);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }


            // 5. Release the producer resources
            kafkaProducer.close();

        }

    }
}
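
Note that the producer is constructed and closed on every pass of the while loop; constructing it once before the loop and closing it on shutdown would be the more usual pattern, though the per-batch version above also works.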

2) Creating a consumer that reads the topic

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Super_SCon {
  def main(args: Array[String]):Unit ={
    /* Spark Streaming as a Kafka consumer:
    1) Build SparkConf: run locally, set the application name
    2) Build Spark Streaming --> StreamingContext from the config
    3) Kafka settings: broker, key/value deserializers, group id, offset handling
    4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext
    5) Print/process the records batch by batch
    6) Start the StreamingContext and monitor Kafka
         */

    // 1) Build SparkConf: run locally, set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")

    // 2) Build Spark Streaming --> StreamingContext from the config
    // StreamingContext requires the spark-streaming dependency
    // Spark Streaming processes the stream in micro-batches; batch interval 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))

    // Reduce Spark's verbose INFO output; log only errors
    ssc.sparkContext.setLogLevel("error")

    //3) Kafka settings: broker, key/value deserializers, group id, offset handling
    val kfkaParams = Map[String, Object](
      "bootstrap.servers" ->"192.168.226.129:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SuperMarket2024READ",
      "enable.auto.commit" -> (false:java.lang.Boolean)

    )

    //4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext

    //topic name

    val topicName = Array("supermarket")
    val streamRdd = KafkaUtils.createDirectStream[String,String](
      ssc,
      PreferConsistent,
      Subscribe[String,String](topicName,kfkaParams)
    )

    /* // The DStream returned from Kafka is a sequence of RDDs, one per batch interval
     streamRdd.foreachRDD(
       x => {
         if (! x.isEmpty()){   // skip empty batches
           val line = x.map(_.value())
           line.foreach(println)
         }
       }
      )*/

    streamRdd.foreachRDD {

      x => if (!x.isEmpty()) {
        println("--------raw records read SCON------")
        val records = x.map(_.value())
        records.foreach(println)
        // Create a new Kafka producer client
        val property = new java.util.Properties()
        property.put("bootstrap.servers", "192.168.226.129:9092")
        property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](property)

        // Forward each raw record to the supermarketRead topic
        // (sending records.toString would only publish the RDD's debug string, not the data)
        records.collect().foreach { rec =>
          producer.send(new ProducerRecord[String, String]("supermarketRead", rec))
        }
        // Close the producer
        producer.close()
      }
    }



    //6) Start the StreamingContext and monitor Kafka
    ssc.start()
    ssc.awaitTermination() // block, consuming Kafka data until terminated

  }

}
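
Because enable.auto.commit is false and nothing commits offsets manually, this consumer group's position is never stored, and on restart reading resumes according to the auto.offset.reset policy. If committed offsets are wanted, spark-streaming-kafka-0-10 exposes them on the stream; a minimal sketch (the import is additional; this is not in the project code):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

streamRdd.foreachRDD { rdd =>
  // Take this batch's offset ranges, then commit them asynchronously after processing
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  streamRdd.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}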

3) Counting the totals of all on-shelf and off-shelf items
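
The batch interval is 2 seconds, and reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(4)) sums each key over a 4-second window that slides every 4 seconds, i.e. two batches per emitted count. A worked example of the per-window reduction on two assumed records:

// Assumed window contents (same record format as the producer in 1) above)
val window = Seq("1995922\tNayeon\t92\t29\tU", "1995922\tJeongyeon\t82\t28\tD")
val counts = window.flatMap { line =>
  val p = line.split("\t")
  if (p(4).trim == "U") Some(("U上架", p(2).toInt))                   // on-shelf: full quantity
  else if (p(4).trim == "D") Some(("D下架", p(2).toInt - p(3).toInt)) // off-shelf: difference
  else None
}.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
// counts == Map("U上架" -> 92, "D下架" -> 54)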

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Super_INOUT {
  def main(args: Array[String]):Unit = {
    /* Spark Streaming as a Kafka consumer:
    1) Build SparkConf: run locally, set the application name
    2) Build Spark Streaming --> StreamingContext from the config
    3) Kafka settings: broker, key/value deserializers, group id, offset handling
    4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext
    5) Print/process the records batch by batch
    6) Start the StreamingContext and monitor Kafka
         */

    // 1) Build SparkConf: run locally, set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")

    // 2) Build Spark Streaming --> StreamingContext from the config
    // StreamingContext requires the spark-streaming dependency
    // Spark Streaming processes the stream in micro-batches; batch interval 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))

    // Reduce Spark's verbose INFO output; log only errors
    ssc.sparkContext.setLogLevel("error")

    //3) Kafka settings: broker, key/value deserializers, group id, offset handling
    val kfkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.226.129:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SuperMarket2024INOUT",
      "enable.auto.commit" -> (false: java.lang.Boolean)

    )

    //4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext

    //topic name

    val topicName = Array("supermarket")
    val streamRdd = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicName, kfkaParams)
    )


    /*val inoutStream = streamRdd.map(x => x.value().split("\t"))

          val statusCounts = inoutStream.flatMap(x =>{
            if(x.length >= 5){
              val status = x(4).trim
              if("U".equals(status)) List(("U上架",1))
              else if("D".equals(status)) List(("D下架",1))
              else Nil
            }else Nil
          }).reduceByKey(_+_)

    statusCounts.foreachRDD(x => {
      println("--------on-shelf / off-shelf record counts-----------")
      val counts = x.collect()
      counts.foreach { case (status, count) =>
        println(s"$status:$count")
      }
    })*/
    val res = streamRdd.map(_.value())
    val result = res.flatMap(line=>{
      val parts = line.split("\t")
      if(parts.length>4){
        val status = parts(4).trim
        if ("U".equals(status)) List(("U上架", parts(2).toInt))
        else if ("D".equals(status)) List(("D下架", parts(2).toInt-parts(3).toInt))
        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_+_,Seconds(4), Seconds(4))
    result.foreachRDD(
      x => {
        println("--------all on/off-shelf INOUT------")
        x.foreach(
          obj => {
            println(obj)

            // Create a new Kafka producer client
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "192.168.226.129:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

            val producer = new KafkaProducer[String, String](property)

            // Publish the (key, count) pair to the downstream topic
            producer.send(new ProducerRecord[String, String]("supermarketINOUT", obj.toString))
            // Close the producer
            producer.close()
          }
        )
      }
    )

    //6) Start the StreamingContext and monitor Kafka
    ssc.start()
    ssc.awaitTermination() // block, consuming Kafka data until terminated

  }

}

4) Quantity per product ID

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Super_ProId {
  def main(args: Array[String]):Unit = {
    /* Spark Streaming as a Kafka consumer:
    1) Build SparkConf: run locally, set the application name
    2) Build Spark Streaming --> StreamingContext from the config
    3) Kafka settings: broker, key/value deserializers, group id, offset handling
    4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext
    5) Print/process the records batch by batch
    6) Start the StreamingContext and monitor Kafka
         */

    // 1) Build SparkConf: run locally, set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")

    // 2) Build Spark Streaming --> StreamingContext from the config
    // StreamingContext requires the spark-streaming dependency
    // Spark Streaming processes the stream in micro-batches; batch interval 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))

    // Reduce Spark's verbose INFO output; log only errors
    ssc.sparkContext.setLogLevel("error")

    //3) Kafka settings: broker, key/value deserializers, group id, offset handling
    val kfkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.226.129:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SuperMarket2024ProId",
      "enable.auto.commit" -> (false: java.lang.Boolean)

    )

    //4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext

    //topic name

    val topicName = Array("supermarket")
    val streamRdd = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicName, kfkaParams)
    )


    val res = streamRdd.map(_.value())
    val result = res.flatMap(line=>{
      val parts = line.split("\t")
      if(parts.length>4){
        val ID = parts(0).trim
        if ("1995922".equals(ID)) List(("货品号1995922", parts(2).toInt-parts(3).toInt))
        else if ("1996119".equals(ID)) List(("货品号1996119", parts(2).toInt-parts(3).toInt))
        else if ("1997201".equals(ID)) List(("货品号1997201", parts(2).toInt-parts(3).toInt))
        else if ("1999423".equals(ID)) List(("货品号1999423", parts(2).toInt-parts(3).toInt))
        else if ("2024123".equals(ID)) List(("货品号2024123", parts(2).toInt-parts(3).toInt))
        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_+_,Seconds(4), Seconds(4))
    result.foreachRDD(
      x => {
        println("--------per-product-ID------")
        x.foreach(
          obj => {
            println(obj)

            // Create a new Kafka producer client
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "192.168.226.129:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

            val producer = new KafkaProducer[String, String](property)

            // Publish the (key, count) pair to the downstream topic
            producer.send(new ProducerRecord[String, String]("supermarketProId", obj.toString))
            // Close the producer
            producer.close()
          }
        )
      }
    )

    //6) Start the StreamingContext and monitor Kafka
    ssc.start()
    ssc.awaitTermination() // block, consuming Kafka data until terminated

  }

}

5) Item quantity per category

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Super_ProCla {
  def main(args: Array[String]):Unit = {
    /* Spark Streaming as a Kafka consumer:
    1) Build SparkConf: run locally, set the application name
    2) Build Spark Streaming --> StreamingContext from the config
    3) Kafka settings: broker, key/value deserializers, group id, offset handling
    4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext
    5) Print/process the records batch by batch
    6) Start the StreamingContext and monitor Kafka
         */

    // 1) Build SparkConf: run locally, set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")

    // 2) Build Spark Streaming --> StreamingContext from the config
    // StreamingContext requires the spark-streaming dependency
    // Spark Streaming processes the stream in micro-batches; batch interval 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))

    // Reduce Spark's verbose INFO output; log only errors
    ssc.sparkContext.setLogLevel("error")

    //3) Kafka settings: broker, key/value deserializers, group id, offset handling
    val kfkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.226.129:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SuperMarket2024ProCla",
      "enable.auto.commit" -> (false: java.lang.Boolean)

    )

    //4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext

    //topic name

    val topicName = Array("supermarket")
    val streamRdd = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicName, kfkaParams)
    )

    val res = streamRdd.map(_.value())
    val result = res.flatMap(line=>{
      val parts = line.split("\t")
      if(parts.length>4){
        val Cla = parts(1).trim
        if ("Zico".equals(Cla)) List(("货品类Zico", parts(2).toInt-parts(3).toInt))
        else if ("Tzuyu".equals(Cla)) List(("货品类Tzuyu", parts(2).toInt-parts(3).toInt))
        else if ("Chaeyoung".equals(Cla)) List(("货品类Chaeyoung", parts(2).toInt-parts(3).toInt))
        else if ("Dahyun".equals(Cla)) List(("货品类Dahyun", parts(2).toInt-parts(3).toInt))
        else if ("Mina".equals(Cla)) List(("货品类Mina", parts(2).toInt-parts(3).toInt))
        else if ("Jihyo".equals(Cla)) List(("货品类Jihyo", parts(2).toInt-parts(3).toInt))
        else if ("Sana".equals(Cla)) List(("货品类Sana", parts(2).toInt-parts(3).toInt))
        else if ("Momo".equals(Cla)) List(("货品类Momo", parts(2).toInt-parts(3).toInt))
        else if ("Jeongyeon".equals(Cla)) List(("货品类Jeongyeon", parts(2).toInt-parts(3).toInt))
        else if ("Nayeon".equals(Cla)) List(("货品类Nayeon", parts(2).toInt-parts(3).toInt))

        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_+_,Seconds(4), Seconds(4))
    result.foreachRDD(
      x => {
        println("--------per-category CLASS------")
        x.foreach(
          obj => {
            println(obj)

            // Create a new Kafka producer client
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "192.168.226.129:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

            val producer = new KafkaProducer[String, String](property)

            // Publish the (key, count) pair to the downstream topic
            producer.send(new ProducerRecord[String, String]("supermarketProCla", obj.toString))
            // Close the producer
            producer.close()
          }
        )
      }
    )

    //6) Start the StreamingContext and monitor Kafka
    ssc.start()
    ssc.awaitTermination() // block, consuming Kafka data until terminated

  }

}

6) On-shelf and off-shelf quantities per product ID



import org.apache.spark.sql.{DataFrame,  SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}




object Super_Sql {

  def main(args: Array[String]): Unit = {
    val isLocal = if (args.length > 0) args(0).toBoolean else true // default to true (run locally)

    val conf = new SparkConf().setAppName("SerMarket2024")
    if (isLocal) {
      conf.setMaster("local[*]")
    }

    val ssc = new StreamingContext(conf, Seconds(2))

    // Reduce Spark's verbose INFO output; log only errors
    ssc.sparkContext.setLogLevel("error")

    //3) Kafka settings: broker, key/value deserializers, group id, offset handling
    val kfkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.226.129:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SuperMarket2024Sql",
      "enable.auto.commit" -> (false: java.lang.Boolean)

    )

    //4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext

    //topic name

    val topicName = Array("supermarket")
    val streamRdd = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicName, kfkaParams)
    )


    val res = streamRdd.map(_.value())
    val result = res.flatMap(line => {
      val parts = line.split("\t")
      if (parts.length > 4) {
        val ID = parts(0).trim
        val status = parts(4).trim

        if ("1995922".equals(ID)) {

          if ("U".equals(status)) List(("1995922U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("1995922D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else if ("1996119".equals(ID)) {

          if ("U".equals(status)) List(("1996119U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("1996119D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else if ("1997201".equals(ID)) {

          if ("U".equals(status)) List(("1997201U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("1997201D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else if ("1999423".equals(ID)) {

          if ("U".equals(status)) List(("1999423U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("1999423D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else if ("2024123".equals(ID)) {

          if ("U".equals(status)) List(("2024123U上架", parts(2).toInt))
          else if ("D".equals(status)) List(("2024123D下架", parts(2).toInt - parts(3).toInt))
          else Nil
        }
        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_ + _,
      Seconds(2),
      Seconds(2)
    )
    //result.print()

    val spSess = SparkSession.builder
      .appName("superMarketSql")
      .master("local[*]")
      .getOrCreate()

    import spSess.implicits._

    result.foreachRDD {

      x =>
        if (!x.isEmpty()) {
          val df: DataFrame = x.toDF("IDSTATUS","COUNT")

          df.show()



          // persist each micro-batch to MySQL over JDBC
          df.write.format("jdbc")
            .option("url", "jdbc:mysql://127.0.0.1:3306/supermarketsql?characterEncoding=UTF8&useSSL=false")
            .option("dbtable", "supersqlnew")
            .option("user", "root")
            .option("password", "123456")
            .mode(SaveMode.Append)
            .save()


        }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
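
The five ID branches in the flatMap above differ only in the literal product ID. A minimal, equivalent sketch of a table-driven parse (assuming the same tab-separated layout and the same five IDs) could replace them:

val knownIds = Set("1995922", "1996119", "1997201", "1999423", "2024123")

val result = res.flatMap { line =>
  val parts = line.split("\t")
  if (parts.length > 4 && knownIds.contains(parts(0).trim)) {
    val id = parts(0).trim
    parts(4).trim match {
      case "U" => List((id + "U上架", parts(2).toInt))
      case "D" => List((id + "D下架", parts(2).toInt - parts(3).toInt))
      case _   => Nil
    }
  } else Nil
}.reduceByKeyAndWindow(_ + _, Seconds(2), Seconds(2))

The same idea extends to the (ID, name) pairs used in section 7: keep the valid pairs in a Set and build the key by concatenation instead of nesting another if/else level.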

7) Listing and delisting counts per item under each product ID

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Super_IdProStuCou{
  def main(args: Array[String]):Unit = {
    /* spark streaming as a Kafka consumer:
    1) build SparkConf: run locally and name the application
    2) build the StreamingContext from the config
    3) Kafka settings: broker, key/value deserializers, group id, commit mode
    4) connect Spark to Kafka: subscribe to the topic via the StreamingContext
    5) process and print the stream batch by batch
    6) start the StreamingContext and monitor Kafka
         */

    //   1) build SparkConf: run locally and name the application
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")

    //2) build the StreamingContext from the config
    // StreamingContext comes from the spark-streaming dependency
    // Spark Streaming processes the stream in micro-batches, here every 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))

    // only log errors so the console output stays readable
    ssc.sparkContext.setLogLevel("error")

    //3) Kafka settings: broker, key/value deserializers, group id, commit mode
    val kfkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.226.129:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SuperMarket2024IdProStuCou",
      "enable.auto.commit" -> (false: java.lang.Boolean)

    )

    //4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext

    //topic name

    val topicName = Array("supermarket")
    val streamRdd = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicName, kfkaParams)
    )

    val res = streamRdd.map(_.value())
    val result = res.flatMap(line => {
      val parts = line.split("\t")
      if (parts.length > 4) {
        val ID = parts(0).trim
        val status = parts(4).trim
        val name = parts(1).trim

        if ("1995922".equals(ID)) {
          if("Nayeon".equals(name)) {

            if ("U".equals(status)) List(("1995922NayeonU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1995922NayeonD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if("Jeongyeon".equals(name)) {

            if ("U".equals(status)) List(("1995922JeongyeonU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1995922JeongyeonD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else  Nil
        }
        else if ("1996119".equals(ID)) {

          if ("Momo".equals(name)) {

            if ("U".equals(status)) List(("1996119MomoU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1996119MomoD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Sana".equals(name)) {

            if ("U".equals(status)) List(("1996119SanaU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1996119SanaD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Mina".equals(name)) {

            if ("U".equals(status)) List(("1996119MinaU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1996119MinaD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("1997201".equals(ID)) {

          if ("Jihyo".equals(name)) {

            if ("U".equals(status)) List(("1997201JihyoU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1997201JihyoD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Dahyun".equals(name)) {

            if ("U".equals(status)) List(("1997201DahyunU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1997201DahyunD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("1999423".equals(ID)) {

          if ("Chaeyoung".equals(name)) {

            if ("U".equals(status)) List(("1999423ChaeyoungU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1999423ChaeyoungD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Tzuyu".equals(name)) {

            if ("U".equals(status)) List(("1999423TzuyuU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1999423TzuyuD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("2024123".equals(ID)) {

          if ("Zico".equals(name)) {

            if ("U".equals(status)) List(("2024123ZicoU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("2024123ZicoD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }

          else Nil
        }
        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_ + _,
      Seconds(4),
      Seconds(4)
    )
    result.foreachRDD(
      x => {
        println("-------- listing/delisting per product ID and item (IPSC) ------")
        x.foreach(
          obj => {
            println(obj.toString)

            // build a Kafka producer client
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "192.168.226.129:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

            val producer = new KafkaProducer[String, String](property)

            // forward the aggregated record to the output topic for the front end
            producer.send(new ProducerRecord[String, String]("supermarketIdProStuCou", obj.toString))
            // close the producer
            producer.close()
          }
        )
      }
    )

    //6) Start the StreamingContext and keep consuming from Kafka
    ssc.start()
    ssc.awaitTermination() // block until the stream is stopped

  }

}
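
One caveat about the producer loop above: it creates and closes a new KafkaProducer for every single record, which is costly. A common alternative, sketched below under the same broker and topic (not the original code), creates one producer per partition:

result.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // one producer per partition instead of one per record
    val props = new java.util.Properties()
    props.put("bootstrap.servers", "192.168.226.129:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    partition.foreach { obj =>
      producer.send(new ProducerRecord[String, String]("supermarketIdProStuCou", obj.toString))
    }
    producer.close()
  }
}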

8) Pushing results to MySQL for display with ECharts



import org.apache.spark.sql.{DataFrame,  SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}




object Super_SqlEcharts {

  def main(args: Array[String]): Unit = {
    val isLocal = if (args.length > 0) args(0).toBoolean else true // defaults to local mode

    val conf = new SparkConf().setAppName("SerMarket2024")
    if (isLocal) {
      conf.setMaster("local[*]")
    }

    val ssc = new StreamingContext(conf, Seconds(2))

    // only log errors so the console output stays readable
    ssc.sparkContext.setLogLevel("error")

    //3) Kafka settings: broker, key/value deserializers, group id, commit mode
    val kfkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.226.129:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SuperMarket2024SqlEcharts",
      "enable.auto.commit" -> (false: java.lang.Boolean)

    )

    //4) Connect Spark to Kafka: subscribe to the topic via the StreamingContext

    //topic name

    val topicName = Array("supermarket")
    val streamRdd = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicName, kfkaParams)
    )


    val res = streamRdd.map(_.value())
    val result = res.flatMap(line => {
      val parts = line.split("\t")
      if (parts.length > 4) {
        val ID = parts(0).trim
        val status = parts(4).trim
        val name = parts(1).trim

        if ("1995922".equals(ID)) {
          if ("Nayeon".equals(name)) {

            if ("U".equals(status)) List(("1995922NayeonU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1995922NayeonD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Jeongyeon".equals(name)) {

            if ("U".equals(status)) List(("1995922JeongyeonU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1995922JeongyeonD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("1996119".equals(ID)) {

          if ("Momo".equals(name)) {

            if ("U".equals(status)) List(("1996119MomoU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1996119MomoD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Sana".equals(name)) {

            if ("U".equals(status)) List(("1996119SanaU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1996119SanaD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Mina".equals(name)) {

            if ("U".equals(status)) List(("1996119MinaU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1996119MinaD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("1997201".equals(ID)) {

          if ("Jihyo".equals(name)) {

            if ("U".equals(status)) List(("1997201JihyoU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1997201JihyoD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Dahyun".equals(name)) {

            if ("U".equals(status)) List(("1997201DahyunU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1997201DahyunD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("1999423".equals(ID)) {

          if ("Chaeyoung".equals(name)) {

            if ("U".equals(status)) List(("1999423ChaeyoungU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1999423ChaeyoungD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else if ("Tzuyu".equals(name)) {

            if ("U".equals(status)) List(("1999423TzuyuU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("1999423TzuyuD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }
          else Nil
        }
        else if ("2024123".equals(ID)) {

          if ("Zico".equals(name)) {

            if ("U".equals(status)) List(("2024123ZicoU上架", parts(2).toInt))
            else if ("D".equals(status)) List(("2024123ZicoD下架", parts(2).toInt - parts(3).toInt))
            else Nil
          }

          else Nil
        }
        else Nil
      } else Nil


    }).reduceByKeyAndWindow(_ + _,
      Seconds(4),
      Seconds(4)
    )
    //result.print()

    val spSess = SparkSession.builder
      .appName("echarts")
      .master("local[*]")
      .getOrCreate()

    import spSess.implicits._

    result.foreachRDD {

      x =>
        if (!x.isEmpty()) {
          val df: DataFrame = x.toDF("IDOWNSTATUS","COUNT")

          df.show()



          // persist each micro-batch to MySQL over JDBC; ECharts reads from this table
          df.write.format("jdbc")
            .option("url", "jdbc:mysql://127.0.0.1:3306/echarts?characterEncoding=UTF8&useSSL=false")
            .option("dbtable", "superecharts")
            .option("user", "root")
            .option("password", "123456")
            .mode(SaveMode.Append)
            .save()


        }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
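
Because the job writes with SaveMode.Append, the table accumulates one row per key per window, so the ECharts backend should aggregate before charting. A standalone sanity check of what the pie chart will see (the object name is illustrative; the connection settings match the job above):

import org.apache.spark.sql.SparkSession

object VerifyEcharts {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("verifyEcharts").master("local[*]").getOrCreate()

    // read back the rows the streaming job appended
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://127.0.0.1:3306/echarts?characterEncoding=UTF8&useSSL=false")
      .option("dbtable", "superecharts")
      .option("user", "root")
      .option("password", "123456")
      .load()

    // sum per key to get the totals the pie chart should display
    df.groupBy("IDOWNSTATUS").sum("COUNT").show()

    spark.stop()
  }
}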


