I. Project Requirements
1) Use Spark Streaming to create a consumer that reads data from the corresponding Kafka topic.
2) Use Spark Streaming to compute, every 2 seconds, the total on-shelf (U) and off-shelf (D) quantities.
3) Use Spark Streaming to compute, every 2 seconds, the quantity for each product ID.
4) Use Spark Streaming to compute, every 2 seconds, the item quantity for each product category.
5) Use Spark SQL to compute the on-shelf and off-shelf quantities for each product ID.
6) Use Spark Core/RDD to compute the on-shelf and off-shelf quantities of each item under each product ID.
7) The Spark Streaming results above can be pushed to a new Kafka topic and displayed in real time with a front-end tool; the Spark Core/RDD results are pushed to MySQL and displayed with ECharts.
Deployment is local: start the virtual machine, bring up ZooKeeper and Kafka, then run the project code against the system. A sketch of the record format the project assumes follows.
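The producer below emits tab-separated records such as "1995922\tNayeon\t92\t29\tU". As a minimal sketch, the fields can be modeled like this (field meanings are inferred from the processing code; the case class and helper are illustrative, not part of the original project):

// Record layout inferred from the processing code:
// product ID \t item name \t quantity \t quantity out \t status (U = on-shelf, D = off-shelf)
case class MarketRecord(id: String, name: String, qty: Int, qtyOut: Int, status: String)

object RecordFormat {
  // e.g. "1995922\tNayeon\t92\t29\tU"  -> on-shelf count = qty
  //      "1999423\tTzuyu\t99\t25\tD"   -> off-shelf count = qty - qtyOut
  def parse(line: String): Option[MarketRecord] = {
    val parts = line.split("\t")
    if (parts.length > 4)
      Some(MarketRecord(parts(0).trim, parts(1).trim, parts(2).trim.toInt, parts(3).trim.toInt, parts(4).trim))
    else None
  }
}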
II. Project Activity Diagram
III. Real-Time Statistics Screenshots
1) Real-time display of the total on-shelf and off-shelf quantities
2) Real-time display of the quantity for each product ID
3) Real-time display of the item quantity for each product category
4) Real-time display of the on-shelf and off-shelf quantities of each item under each product ID
5) ECharts pie chart of the on-shelf and off-shelf quantities of each item under each product ID
IV. Full Code
1) Processing code
package scala2
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object Super_Total2 {
def main(args: Array[String]): Unit = {
    // 1) Build the SparkConf: run locally and set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")
    // 2) Build the StreamingContext from the config (requires the spark-streaming dependency).
    //    Spark Streaming processes the stream in micro-batches; batch interval = 2 seconds.
    val ssc = new StreamingContext(conf, Seconds(2))
    // Silence Spark's INFO output; log errors only
    ssc.sparkContext.setLogLevel("error")
    // 3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
val kfkaParams = Map[String, Object](
"bootstrap.servers" -> "mina:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SuperMarket2025Total",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
    // 4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
    // topic name
val topicName = Array("supermarket")
val streamRdd = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicName, kfkaParams)
)
//data process
//Super_IdProStuCou data process
val resIdProStuCou = streamRdd.map(_.value())
val resultIdProStuCou = resIdProStuCou.flatMap(line => {
val parts = line.split("\t")
if (parts.length > 4) {
val ID = parts(0).trim
val status = parts(4).trim
val name = parts(1).trim
if ("1995922".equals(ID)) {
if ("Nayeon".equals(name)) {
if ("U".equals(status)) List(("1995922NayeonU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1995922NayeonD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Jeongyeon".equals(name)) {
if ("U".equals(status)) List(("1995922JeongyeonU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1995922JeongyeonD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("1996119".equals(ID)) {
if ("Momo".equals(name)) {
if ("U".equals(status)) List(("1996119MomoU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119MomoD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Sana".equals(name)) {
if ("U".equals(status)) List(("1996119SanaU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119SanaD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Mina".equals(name)) {
if ("U".equals(status)) List(("1996119MinaU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119MinaD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("1997201".equals(ID)) {
if ("Jihyo".equals(name)) {
if ("U".equals(status)) List(("1997201JihyoU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1997201JihyoD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Dahyun".equals(name)) {
if ("U".equals(status)) List(("1997201DahyunU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1997201DahyunD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("1999423".equals(ID)) {
if ("Chaeyoung".equals(name)) {
if ("U".equals(status)) List(("1999423ChaeyoungU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1999423ChaeyoungD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Tzuyu".equals(name)) {
if ("U".equals(status)) List(("1999423TzuyuU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1999423TzuyuD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("2024123".equals(ID)) {
if ("Zico".equals(name)) {
if ("U".equals(status)) List(("2024123ZicoU上架", parts(2).toInt))
else if ("D".equals(status)) List(("2024123ZicoD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else Nil
} else Nil
}).reduceByKeyAndWindow(_ + _,
Seconds(4),
Seconds(4)
)
//Super_INOUT data process
val resINOUT = streamRdd.map(_.value())
val resultINOUT = resINOUT.flatMap(line => {
val parts = line.split("\t")
if (parts.length > 4) {
val status = parts(4).trim
if ("U".equals(status)) List(("U上架", parts(2).toInt))
else if ("D".equals(status)) List(("D下架", parts(2).toInt - parts(3).toInt))
else Nil
} else Nil
}).reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(4))
//Super_ProCla data process
val resProCla = streamRdd.map(_.value())
val resultProCla = resProCla.flatMap(line => {
val parts = line.split("\t")
if (parts.length > 4) {
val Cla = parts(1).trim
if ("Zico".equals(Cla)) List(("货品类Zico", parts(2).toInt - parts(3).toInt))
else if ("Tzuyu".equals(Cla)) List(("货品类Tzuyu", parts(2).toInt - parts(3).toInt))
else if ("Chaeyoung".equals(Cla)) List(("货品类Chaeyoung", parts(2).toInt - parts(3).toInt))
else if ("Dahyun".equals(Cla)) List(("货品类Dahyun", parts(2).toInt - parts(3).toInt))
else if ("Mina".equals(Cla)) List(("货品类Mina", parts(2).toInt - parts(3).toInt))
else if ("Jihyo".equals(Cla)) List(("货品类Jihyo", parts(2).toInt - parts(3).toInt))
else if ("Sana".equals(Cla)) List(("货品类Sana", parts(2).toInt - parts(3).toInt))
else if ("Momo".equals(Cla)) List(("货品类Momo", parts(2).toInt - parts(3).toInt))
else if ("Jeongyeon".equals(Cla)) List(("货品类Jeongyeon", parts(2).toInt - parts(3).toInt))
else if ("Nayeon".equals(Cla)) List(("货品类Nayeon", parts(2).toInt - parts(3).toInt))
else Nil
} else Nil
}).reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(4))
//Super_ProId data process
val resProId = streamRdd.map(_.value())
val resultProId = resProId.flatMap(line => {
val parts = line.split("\t")
if (parts.length > 4) {
val ID = parts(0).trim
if ("1995922".equals(ID)) List(("货品号1995922", parts(2).toInt - parts(3).toInt))
else if ("1996119".equals(ID)) List(("货品号1996119", parts(2).toInt - parts(3).toInt))
else if ("1997201".equals(ID)) List(("货品号1997201", parts(2).toInt - parts(3).toInt))
else if ("1999423".equals(ID)) List(("货品号1999423", parts(2).toInt - parts(3).toInt))
else if ("2024123".equals(ID)) List(("货品号2024123", parts(2).toInt - parts(3).toInt))
else Nil
} else Nil
}).reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(4))
//Super_Sal data process
val resSql = streamRdd.map(_.value())
val resultSql = resSql.flatMap(line => {
val parts = line.split("\t")
if (parts.length > 4) {
val ID = parts(0).trim
val status = parts(4).trim
if ("1995922".equals(ID)) {
if ("U".equals(status)) List(("1995922U上架", parts(2).toInt))
else if ("D".equals(status)) List(("1995922D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("1996119".equals(ID)) {
if ("U".equals(status)) List(("1996119U上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("1997201".equals(ID)) {
if ("U".equals(status)) List(("1997201U上架", parts(2).toInt))
else if ("D".equals(status)) List(("1997201D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("1999423".equals(ID)) {
if ("U".equals(status)) List(("1999423U上架", parts(2).toInt))
else if ("D".equals(status)) List(("1999423D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("2024123".equals(ID)) {
if ("U".equals(status)) List(("2024123U上架", parts(2).toInt))
else if ("D".equals(status)) List(("2024123D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
} else Nil
}).reduceByKeyAndWindow(_ + _,
Seconds(4),
Seconds(4)
)
//data store
//Super_IdProStuCou data store
    resultIdProStuCou.foreachRDD(
      x => {
        println("-------- per-ID per-item on/off-shelf counts (IPSC) ------")
        x.foreach(
          obj => {
            println(obj.toString)
            // create a producer client (note: one producer per record is costly;
            // see the foreachPartition sketch after this listing)
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "mina:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](property)
            // push the windowed result to the Kafka topic
            producer.send(new ProducerRecord[String, String]("supermarketIdProStuCou", obj.toString))
            // close the producer
            producer.close()
          }
        )
      }
    )
//Super_INOUT data store
    resultINOUT.foreachRDD(
      x => {
        println("-------- total on/off-shelf counts (INOUT) ------")
        x.foreach(
          obj => {
            println(obj)
            // create a producer client (kept simple: one producer per record)
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "mina:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](property)
            // push the windowed result to the Kafka topic
            producer.send(new ProducerRecord[String, String]("supermarketINOUT", obj.toString))
            // close the producer
            producer.close()
          }
        )
      }
    )
//Super_ProCla data store
    resultProCla.foreachRDD(
      x => {
        println("-------- per-category counts (CLASS) ------")
        x.foreach(
          obj => {
            println(obj)
            // create a producer client (kept simple: one producer per record)
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "mina:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](property)
            // push the windowed result to the Kafka topic
            producer.send(new ProducerRecord[String, String]("supermarketProCla", obj.toString))
            // close the producer
            producer.close()
          }
        )
      }
    )
//Super_ProId data store
    resultProId.foreachRDD(
      x => {
        println("-------- per-product-ID counts (ID) ------")
        x.foreach(
          obj => {
            println(obj)
            // create a producer client (kept simple: one producer per record)
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "mina:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](property)
            // push the windowed result to the Kafka topic
            producer.send(new ProducerRecord[String, String]("supermarketProId", obj.toString))
            // close the producer
            producer.close()
          }
        )
      }
    )
//Super_SCon data store
    streamRdd.foreachRDD {
      x =>
        if (!x.isEmpty()) {
          println("-------- raw records read (SCON) ------")
          // collect the raw values on the driver so they can be printed and forwarded
          val records = x.map(_.value()).collect()
          records.foreach(println)
          // create a producer client
          val property = new java.util.Properties()
          property.put("bootstrap.servers", "mina:9092")
          property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](property)
          // forward each raw record (the original sent records.toString, which is
          // just the RDD's debug name, not the data)
          records.foreach(r => producer.send(new ProducerRecord[String, String]("supermarketRead", r)))
          // close the producer
          producer.close()
        }
    }
//Super_Sql data store
val spSessSql = SparkSession.builder
.appName("superMarketSql")
.master("local[*]")
.getOrCreate()
import spSessSql.implicits._
resultSql.foreachRDD {
x =>
if (!x.isEmpty()) {
val df: DataFrame = x.toDF("IDSTATUS", "COUNT")
df.show()
          // write the windowed counts to MySQL over JDBC
          df.write.format("jdbc")
            .option("url", "jdbc:mysql://127.0.0.1:3306/supermarketsql?characterEncoding=UTF8&useSSL=false")
            .option("dbtable", "supersqlnew_1")
            .option("user", "root")
            .option("password", "123456")
            .mode(SaveMode.Append)
            .save()
}
}
//Super_SqlEcharts data store
val spSessSqlEcharts = SparkSession.builder
.appName("echarts")
.master("local[*]")
.getOrCreate()
val resultSqlEcharts = resultIdProStuCou
resultSqlEcharts.foreachRDD {
x =>
if (!x.isEmpty()) {
import spSessSqlEcharts.implicits._
val df: DataFrame = x.toDF("IDOWNSTATUS", "COUNT")
df.show()
          // write the windowed counts to MySQL over JDBC
          df.write.format("jdbc")
            .option("url", "jdbc:mysql://127.0.0.1:3306/echarts?characterEncoding=UTF8&useSSL=false")
            .option("dbtable", "superecharts_1")
            .option("user", "root")
            .option("password", "123456")
            .mode(SaveMode.Append)
            .save()
}
}
    // 6) Start the StreamingContext and keep monitoring Kafka
    ssc.start()
    ssc.awaitTermination() // block while consuming the Kafka stream
}
}
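One caveat in the listing above: a new KafkaProducer is created and closed for every single output record, which is expensive. A common alternative is one producer per RDD partition via foreachPartition. A minimal sketch under the same broker assumption as the original (the helper name is illustrative):

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: push a windowed count DStream to a Kafka topic,
// creating one producer per partition instead of one per record.
def pushToKafka(stream: DStream[(String, Int)], topic: String): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      val props = new java.util.Properties()
      props.put("bootstrap.servers", "mina:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      partition.foreach(obj => producer.send(new ProducerRecord[String, String](topic, obj.toString)))
      producer.close()
    }
  }
}

// usage, e.g.: pushToKafka(resultINOUT, "supermarketINOUT")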
2) Flask visualization
1> Figure 1
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None

# Kafka consumer for the aggregated on/off-shelf counts
consumer1 = KafkaConsumer(
    auto_offset_reset='earliest',
    bootstrap_servers="mina:9092",
    group_id="superMarketINOUT",
)
consumer1.subscribe(["supermarketINOUT"])


def background_thread2():
    id1 = 0
    id2 = 0
    for msg in consumer1:
        data_json = msg.value.decode('utf8')
        # messages are Scala tuple strings such as "(U上架,123)";
        # strip the parentheses before splitting on the comma
        obj = data_json.strip('()').split(',')
        if 'U上架' in data_json:
            id1 = obj[1]
        elif 'D下架' in data_json:
            id2 = obj[1]
        else:
            continue
        result = str(id1) + ',' + str(id2)
        print(result)
        socketio.emit('test_message2', {'data': result})


@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        thread = socketio.start_background_task(target=background_thread2)
    socketio.emit('connected', {'data': 'Connected'})


@app.route("/")
def handle_mes():
    return render_template("INOUT.html")


if __name__ == '__main__':
    socketio.run(app, port=5000, debug=True)
2> Figure 2
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None

# Kafka consumer for the per-product-ID counts
consumer1 = KafkaConsumer(
    auto_offset_reset='earliest',
    bootstrap_servers="mina:9092",
    group_id="superMarketproId",
)
consumer1.subscribe(["supermarketProId"])


def background_thread2():
    id1 = 0
    id2 = 0
    id3 = 0
    id4 = 0
    id5 = 0
    for msg in consumer1:
        data_json = msg.value.decode('utf8')
        print('----- current data ----' + data_json)
        # messages are Scala tuple strings such as "(货品号1995922,63)";
        # strip the parentheses before splitting on the comma
        obj = data_json.strip('()').split(',')
        if '货品号1995922' in data_json:
            id1 = obj[1]
        elif '货品号1996119' in data_json:
            id2 = obj[1]
        elif '货品号1997201' in data_json:
            id3 = obj[1]
        elif '货品号1999423' in data_json:
            id4 = obj[1]
        elif '货品号2024123' in data_json:
            id5 = obj[1]
        else:
            continue
        result = ','.join(str(v) for v in (id1, id2, id3, id4, id5))
        print(result)
        socketio.emit('test_message2', {'data': result})


@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        thread = socketio.start_background_task(target=background_thread2)
    socketio.emit('connected', {'data': 'Connected'})


@app.route("/")
def handle_mes():
    return render_template("proId.html")


if __name__ == '__main__':
    socketio.run(app, port=5001, debug=True)
3> Figure 3
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None

# Kafka consumer for the per-category counts
consumer1 = KafkaConsumer(
    auto_offset_reset='earliest',
    bootstrap_servers="mina:9092",
    group_id="superMarketproCla",
)
consumer1.subscribe(["supermarketProCla"])


def background_thread2():
    id1 = 0
    id2 = 0
    id3 = 0
    id4 = 0
    id5 = 0
    id6 = 0
    id7 = 0
    id8 = 0
    id9 = 0
    id10 = 0
    for msg in consumer1:
        data_json = msg.value.decode('utf8')
        print('----- current data ----' + data_json)
        # messages are Scala tuple strings such as "(货品类Zico,90)";
        # strip the parentheses before splitting on the comma
        obj = data_json.strip('()').split(',')
        if '货品类Zico' in data_json:
            id1 = obj[1]
        elif '货品类Tzuyu' in data_json:
            id2 = obj[1]
        elif '货品类Chaeyoung' in data_json:
            id3 = obj[1]
        elif '货品类Dahyun' in data_json:
            id4 = obj[1]
        elif '货品类Mina' in data_json:
            id5 = obj[1]
        elif '货品类Jihyo' in data_json:
            id6 = obj[1]
        elif '货品类Sana' in data_json:
            id7 = obj[1]
        elif '货品类Momo' in data_json:
            id8 = obj[1]
        elif '货品类Jeongyeon' in data_json:
            id9 = obj[1]
        elif '货品类Nayeon' in data_json:
            id10 = obj[1]
        else:
            continue
        result = ','.join(str(v) for v in (id1, id2, id3, id4, id5, id6, id7, id8, id9, id10))
        print(result)
        socketio.emit('test_message2', {'data': result})


@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        thread = socketio.start_background_task(target=background_thread2)
    socketio.emit('connected', {'data': 'Connected'})


@app.route("/")
def handle_mes():
    return render_template("proCla.html")


if __name__ == '__main__':
    socketio.run(app, port=5002, debug=True)
4> Figure 4
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None

# Kafka consumer for the per-ID, per-item on/off-shelf counts
consumer1 = KafkaConsumer(
    auto_offset_reset='earliest',
    bootstrap_servers="mina:9092",
    group_id="superMarketIdProStuCou",
)
consumer1.subscribe(["supermarketIdProStuCou"])

# counter slots in display order; the keys match the Scala-side tuple keys
# (this dict replaces the original 20-branch if/elif chain, with the same behavior)
KEYS = [
    '1995922NayeonU上架', '1995922NayeonD下架',
    '1995922JeongyeonU上架', '1995922JeongyeonD下架',
    '1996119MomoU上架', '1996119MomoD下架',
    '1996119SanaU上架', '1996119SanaD下架',
    '1996119MinaU上架', '1996119MinaD下架',
    '1997201JihyoU上架', '1997201JihyoD下架',
    '1997201DahyunU上架', '1997201DahyunD下架',
    '1999423ChaeyoungU上架', '1999423ChaeyoungD下架',
    '1999423TzuyuU上架', '1999423TzuyuD下架',
    '2024123ZicoU上架', '2024123ZicoD下架',
]


def background_thread2():
    counts = {k: 0 for k in KEYS}
    for msg in consumer1:
        data_json = msg.value.decode('utf8')
        print('----- current data ----' + data_json)
        # messages are Scala tuple strings such as "(1995922NayeonU上架,92)";
        # strip the parentheses before splitting on the comma
        obj = data_json.strip('()').split(',')
        if obj[0] not in counts:
            continue
        counts[obj[0]] = obj[1]
        result = ','.join(str(counts[k]) for k in KEYS)
        print(result)
        socketio.emit('test_message2', {'data': result})


@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        thread = socketio.start_background_task(target=background_thread2)
    socketio.emit('connected', {'data': 'Connected'})


@app.route("/")
def handle_mes():
    return render_template("idProStuCou.html")


if __name__ == '__main__':
    socketio.run(app, host='127.0.0.1', port=5003, debug=True)
V. Detailed Walkthrough
1) Kafka producer
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class Super_Prod {
    public static void main(String[] args) {
        // 1. Create the Kafka producer configuration object
        Properties properties = new Properties();
        // 2. Add connection info to the configuration: bootstrap.servers, serializers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.226.129:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // push 10 random records every 5 seconds
        while (true) {
            // 3. Create the Kafka producer object
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            System.out.println("Start sending data ==========================");
            try {
                // 4. Call send to publish the messages
                for (int i = 0; i < 10; i++) {
                    Random rd = new Random(); // random generator
                    // candidate records: ID \t name \t qty \t qtyOut \t status
                    String[] categoryArray = new String[]{"1995922\tNayeon\t92\t29\tU", "1995922\tJeongyeon\t82\t28\tD", "1996119\tMomo\t28\t2\tU", "1996119\tSana\t94\t49\tU", "1997201\tJihyo\t59\t5\tU", "1996119\tMina\t69\t26\tU", "1997201\tDahyun\t77\t37\tU", "1999423\tChaeyoung\t98\t88\tD", "1999423\tTzuyu\t99\t25\tD", "2024123\tZico\t100\t10\tD"};
                    int random = rd.nextInt(10);
                    String category = categoryArray[random];
                    // simulated record
                    System.out.println("Sending record: " + category);
                    try {
                        kafkaProducer.send(new ProducerRecord<String, String>("supermarket", category));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 5. Close the producer
            kafkaProducer.close();
        }
    }
}
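Since the rest of the pipeline is written in Scala, here is a minimal Scala sketch of the same producer for reference. It reuses a single KafkaProducer instance instead of recreating one every loop iteration (broker and topic as in the original; the record list is abbreviated):

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.Random

object SuperProdSketch {
  def main(args: Array[String]): Unit = {
    val props = new java.util.Properties()
    props.put("bootstrap.servers", "192.168.226.129:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // one producer for the whole run, not one per iteration
    val producer = new KafkaProducer[String, String](props)
    val records = Array(
      "1995922\tNayeon\t92\t29\tU",
      "1999423\tTzuyu\t99\t25\tD",
      "2024123\tZico\t100\t10\tD"
    )
    try {
      while (true) {
        // push 10 random records every 5 seconds
        for (_ <- 0 until 10) {
          val rec = records(Random.nextInt(records.length))
          println(s"Sending record: $rec")
          producer.send(new ProducerRecord[String, String]("supermarket", rec))
        }
        Thread.sleep(5000)
      }
    } finally producer.close()
  }
}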
2) Create a consumer that reads the topic's data
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Super_SCon {
def main(args: Array[String]):Unit ={
    /* Spark Streaming as a Kafka consumer:
       1) Build the SparkConf: run locally, set the application name
       2) Build the StreamingContext from the config
       3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
       4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
       5) Print/process the records batch by batch
       6) Start the StreamingContext and keep monitoring Kafka
    */
    // 1) Build the SparkConf: run locally and set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")
    // 2) Build the StreamingContext from the config (requires the spark-streaming dependency).
    //    Spark Streaming processes the stream in micro-batches; batch interval = 2 seconds.
    val ssc = new StreamingContext(conf, Seconds(2))
    // Silence Spark's INFO output; log errors only
    ssc.sparkContext.setLogLevel("error")
    // 3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
val kfkaParams = Map[String, Object](
"bootstrap.servers" ->"192.168.226.129:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SuperMarket2024READ",
"enable.auto.commit" -> (false:java.lang.Boolean)
)
    // 4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
    // topic name
val topicName = Array("supermarket")
val streamRdd = KafkaUtils.createDirectStream[String,String](
ssc,
PreferConsistent,
Subscribe[String,String](topicName,kfkaParams)
)
    streamRdd.foreachRDD {
      x =>
        if (!x.isEmpty()) {
          println("-------- raw records read (SCON) ------")
          // collect the raw values on the driver so they can be printed and forwarded
          val records = x.map(_.value()).collect()
          records.foreach(println)
          // create a producer client
          val property = new java.util.Properties()
          property.put("bootstrap.servers", "192.168.226.129:9092")
          property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](property)
          // forward each raw record to the supermarketRead topic (the original
          // sent records.toString, which is just the RDD's debug name)
          records.foreach(r => producer.send(new ProducerRecord[String, String]("supermarketRead", r)))
          // close the producer
          producer.close()
        }
    }
    // 6) Start the StreamingContext and keep monitoring Kafka
    ssc.start()
    ssc.awaitTermination() // block while consuming the Kafka stream
}
}
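All consumers in this project set enable.auto.commit to false but never commit offsets back, so a restart falls back to the group's stored position (or to auto.offset.reset). If at-least-once offset tracking is wanted, the spark-streaming-kafka-0-10 API can commit offsets through the stream itself. A hedged sketch against the streamRdd above:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

streamRdd.foreachRDD { rdd =>
  // capture the Kafka offset ranges of this batch before processing
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  // then commit the offsets back to Kafka
  streamRdd.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}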
3) Count the total on-shelf and off-shelf quantities
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Super_INOUT {
def main(args: Array[String]):Unit = {
    /* Spark Streaming as a Kafka consumer:
       1) Build the SparkConf: run locally, set the application name
       2) Build the StreamingContext from the config
       3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
       4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
       5) Print/process the records batch by batch
       6) Start the StreamingContext and keep monitoring Kafka
    */
    // 1) Build the SparkConf: run locally and set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")
    // 2) Build the StreamingContext from the config (requires the spark-streaming dependency).
    //    Spark Streaming processes the stream in micro-batches; batch interval = 2 seconds.
    val ssc = new StreamingContext(conf, Seconds(2))
    // Silence Spark's INFO output; log errors only
    ssc.sparkContext.setLogLevel("error")
    // 3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
val kfkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.226.129:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SuperMarket2024INOUT",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
    // 4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
    // topic name
val topicName = Array("supermarket")
val streamRdd = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicName, kfkaParams)
)
val res = streamRdd.map(_.value())
val result = res.flatMap(line=>{
val parts = line.split("\t")
if(parts.length>4){
val status = parts(4).trim
if ("U".equals(status)) List(("U上架", parts(2).toInt))
else if ("D".equals(status)) List(("D下架", parts(2).toInt-parts(3).toInt))
else Nil
} else Nil
}).reduceByKeyAndWindow(_+_,Seconds(4), Seconds(4))
    result.foreachRDD(
      x => {
        println("-------- total on/off-shelf counts (INOUT) ------")
        x.foreach(
          obj => {
            println(obj)
            // create a producer client (kept simple: one producer per record)
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "192.168.226.129:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](property)
            // push the windowed result to the Kafka topic
            producer.send(new ProducerRecord[String, String]("supermarketINOUT", obj.toString))
            // close the producer
            producer.close()
          }
        )
      }
    )
    // 6) Start the StreamingContext and keep monitoring Kafka
    ssc.start()
    ssc.awaitTermination() // block while consuming the Kafka stream
}
}
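A note on the windowing above: with a 2-second batch interval, reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(4)) emits non-overlapping 4-second windows, each covering the last two batches. If the window should slide by less than its length, Spark can reuse the previous window's result when given an inverse reduce function, at the cost of enabling checkpointing. A sketch, assuming pairs is the keyed DStream produced by the flatMap above (the checkpoint path is illustrative):

// Sliding-window variant: 4-second window, recomputed every 2 seconds.
// The inverse function (_ - _) subtracts the batch leaving the window
// instead of re-reducing the whole window; it requires a checkpoint dir.
ssc.checkpoint("/tmp/supermarket-checkpoint") // illustrative path

val sliding = pairs.reduceByKeyAndWindow(
  _ + _,      // add values entering the window
  _ - _,      // subtract values leaving the window
  Seconds(4), // window length
  Seconds(2)  // slide interval
)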
4) Quantity for each product ID
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Super_ProId {
def main(args: Array[String]):Unit = {
    /* Spark Streaming as a Kafka consumer:
       1) Build the SparkConf: run locally, set the application name
       2) Build the StreamingContext from the config
       3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
       4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
       5) Print/process the records batch by batch
       6) Start the StreamingContext and keep monitoring Kafka
    */
    // 1) Build the SparkConf: run locally and set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")
    // 2) Build the StreamingContext from the config (requires the spark-streaming dependency).
    //    Spark Streaming processes the stream in micro-batches; batch interval = 2 seconds.
    val ssc = new StreamingContext(conf, Seconds(2))
    // Silence Spark's INFO output; log errors only
    ssc.sparkContext.setLogLevel("error")
    // 3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
val kfkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.226.129:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SuperMarket2024ProId",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
    // 4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
    // topic name
val topicName = Array("supermarket")
val streamRdd = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicName, kfkaParams)
)
val res = streamRdd.map(_.value())
val result = res.flatMap(line=>{
val parts = line.split("\t")
if(parts.length>4){
val ID = parts(0).trim
if ("1995922".equals(ID)) List(("货品号1995922", parts(2).toInt-parts(3).toInt))
else if ("1996119".equals(ID)) List(("货品号1996119", parts(2).toInt-parts(3).toInt))
else if ("1997201".equals(ID)) List(("货品号1997201", parts(2).toInt-parts(3).toInt))
else if ("1999423".equals(ID)) List(("货品号1999423", parts(2).toInt-parts(3).toInt))
else if ("2024123".equals(ID)) List(("货品号2024123", parts(2).toInt-parts(3).toInt))
else Nil
} else Nil
}).reduceByKeyAndWindow(_+_,Seconds(4), Seconds(4))
    result.foreachRDD(
      x => {
        println("-------- per-product-ID counts (ID) ------")
        x.foreach(
          obj => {
            println(obj)
            // create a producer client (kept simple: one producer per record)
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "192.168.226.129:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](property)
            // push the windowed result to the Kafka topic
            producer.send(new ProducerRecord[String, String]("supermarketProId", obj.toString))
            // close the producer
            producer.close()
          }
        )
      }
    )
    // 6) Start the StreamingContext and keep monitoring Kafka
    ssc.start()
    ssc.awaitTermination() // block while consuming the Kafka stream
}
}
5) Item quantity for each category
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Super_ProCla {
def main(args: Array[String]):Unit = {
    /* Spark Streaming as a Kafka consumer:
       1) Build the SparkConf: run locally, set the application name
       2) Build the StreamingContext from the config
       3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
       4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
       5) Print/process the records batch by batch
       6) Start the StreamingContext and keep monitoring Kafka
    */
    // 1) Build the SparkConf: run locally and set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")
    // 2) Build the StreamingContext from the config (requires the spark-streaming dependency).
    //    Spark Streaming processes the stream in micro-batches; batch interval = 2 seconds.
    val ssc = new StreamingContext(conf, Seconds(2))
    // Silence Spark's INFO output; log errors only
    ssc.sparkContext.setLogLevel("error")
    // 3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
val kfkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.226.129:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SuperMarket2024ProCla",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
    // 4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
    // topic name
val topicName = Array("supermarket")
val streamRdd = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicName, kfkaParams)
)
val res = streamRdd.map(_.value())
val result = res.flatMap(line=>{
val parts = line.split("\t")
if(parts.length>4){
val Cla = parts(1).trim
if ("Zico".equals(Cla)) List(("货品类Zico", parts(2).toInt-parts(3).toInt))
else if ("Tzuyu".equals(Cla)) List(("货品类Tzuyu", parts(2).toInt-parts(3).toInt))
else if ("Chaeyoung".equals(Cla)) List(("货品类Chaeyoung", parts(2).toInt-parts(3).toInt))
else if ("Dahyun".equals(Cla)) List(("货品类Dahyun", parts(2).toInt-parts(3).toInt))
else if ("Mina".equals(Cla)) List(("货品类Mina", parts(2).toInt-parts(3).toInt))
else if ("Jihyo".equals(Cla)) List(("货品类Jihyo", parts(2).toInt-parts(3).toInt))
else if ("Sana".equals(Cla)) List(("货品类Sana", parts(2).toInt-parts(3).toInt))
else if ("Momo".equals(Cla)) List(("货品类Momo", parts(2).toInt-parts(3).toInt))
else if ("Jeongyeon".equals(Cla)) List(("货品类Jeongyeon", parts(2).toInt-parts(3).toInt))
else if ("Nayeon".equals(Cla)) List(("货品类Nayeon", parts(2).toInt-parts(3).toInt))
else Nil
} else Nil
}).reduceByKeyAndWindow(_+_,Seconds(4), Seconds(4))
    result.foreachRDD(
      x => {
        println("-------- per-category counts (CLASS) ------")
        x.foreach(
          obj => {
            println(obj)
            // create a producer client (kept simple: one producer per record)
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "192.168.226.129:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](property)
            // push the windowed result to the Kafka topic
            producer.send(new ProducerRecord[String, String]("supermarketProCla", obj.toString))
            // close the producer
            producer.close()
          }
        )
      }
    )
    // 6) Start the StreamingContext and keep monitoring Kafka
    ssc.start()
    ssc.awaitTermination() // block while consuming the Kafka stream
}
}
6) On-shelf and off-shelf quantities for each product ID
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Super_Sql {
def main(args: Array[String]): Unit = {
    val isLocal = if (args.length > 0) args(0).toBoolean else true // defaults to true: run in local mode
val conf = new SparkConf().setAppName("SerMarket2024")
if (isLocal) {
conf.setMaster("local[*]")
}
val ssc = new StreamingContext(conf, Seconds(2))
    // Silence Spark's INFO output; log errors only
    ssc.sparkContext.setLogLevel("error")
    // 3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
val kfkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.226.129:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SuperMarket2024Sql",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
    // 4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
    // topic name
val topicName = Array("supermarket")
val streamRdd = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicName, kfkaParams)
)
val res = streamRdd.map(_.value())
val result = res.flatMap(line => {
val parts = line.split("\t")
if (parts.length > 4) {
val ID = parts(0).trim
val status = parts(4).trim
if ("1995922".equals(ID)) {
if ("U".equals(status)) List(("1995922U上架", parts(2).toInt))
else if ("D".equals(status)) List(("1995922D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("1996119".equals(ID)) {
if ("U".equals(status)) List(("1996119U上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("1997201".equals(ID)) {
if ("U".equals(status)) List(("1997201U上架", parts(2).toInt))
else if ("D".equals(status)) List(("1997201D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("1999423".equals(ID)) {
if ("U".equals(status)) List(("1999423U上架", parts(2).toInt))
else if ("D".equals(status)) List(("1999423D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("2024123".equals(ID)) {
if ("U".equals(status)) List(("2024123U上架", parts(2).toInt))
else if ("D".equals(status)) List(("2024123D下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
} else Nil
}).reduceByKeyAndWindow(_ + _,
Seconds(2),
Seconds(2)
)
//result.print()
val spSess = SparkSession.builder
.appName("superMarketSql")
.master("local[*]")
.getOrCreate()
import spSess.implicits._
    result.foreachRDD {
      x =>
        if (!x.isEmpty()) {
          val df: DataFrame = x.toDF("IDSTATUS", "COUNT")
          df.show()
          // write the windowed counts to MySQL over JDBC
          df.write.format("jdbc")
            .option("url", "jdbc:mysql://127.0.0.1:3306/supermarketsql?characterEncoding=UTF8&useSSL=false")
            .option("dbtable", "supersqlnew")
            .option("user", "root")
            .option("password", "123456")
            .mode(SaveMode.Append)
            .save()
        }
    }
ssc.start()
ssc.awaitTermination()
}
}
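To verify what landed in MySQL, the same SparkSession can read the table back over JDBC. A minimal sketch, assuming the connection settings above:

// Read the accumulated counts back from MySQL and inspect them.
val stored = spSess.read.format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1:3306/supermarketsql?characterEncoding=UTF8&useSSL=false")
  .option("dbtable", "supersqlnew")
  .option("user", "root")
  .option("password", "123456")
  .load()

// Each window appends new rows, so aggregate per key for a running total.
stored.groupBy("IDSTATUS").sum("COUNT").show()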
7) On-shelf and off-shelf quantities of each item under each product ID
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Super_IdProStuCou{
def main(args: Array[String]):Unit = {
    /* Spark Streaming as a Kafka consumer:
       1) Build the SparkConf: run locally, set the application name
       2) Build the StreamingContext from the config
       3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
       4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
       5) Print/process the records batch by batch
       6) Start the StreamingContext and keep monitoring Kafka
    */
    // 1) Build the SparkConf: run locally and set the application name
    val conf = new SparkConf().setMaster("local[*]").setAppName("SerMarket2024")
    // 2) Build the StreamingContext from the config (requires the spark-streaming dependency).
    //    Spark Streaming processes the stream in micro-batches; batch interval = 2 seconds.
    val ssc = new StreamingContext(conf, Seconds(2))
    // Silence Spark's INFO output; log errors only
    ssc.sparkContext.setLogLevel("error")
    // 3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
val kfkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.226.129:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SuperMarket2024IdProStuCou",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
    // 4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
    // topic name
val topicName = Array("supermarket")
val streamRdd = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicName, kfkaParams)
)
val res = streamRdd.map(_.value())
val result = res.flatMap(line => {
val parts = line.split("\t")
if (parts.length > 4) {
val ID = parts(0).trim
val status = parts(4).trim
val name = parts(1).trim
if ("1995922".equals(ID)) {
if("Nayeon".equals(name)) {
if ("U".equals(status)) List(("1995922NayeonU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1995922NayeonD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if("Jeongyeon".equals(name)) {
if ("U".equals(status)) List(("1995922JeongyeonU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1995922JeongyeonD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("1996119".equals(ID)) {
if ("Momo".equals(name)) {
if ("U".equals(status)) List(("1996119MomoU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119MomoD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Sana".equals(name)) {
if ("U".equals(status)) List(("1996119SanaU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119SanaD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Mina".equals(name)) {
if ("U".equals(status)) List(("1996119MinaU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119MinaD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("1997201".equals(ID)) {
if ("Jihyo".equals(name)) {
if ("U".equals(status)) List(("1997201JihyoU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1997201JihyoD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Dahyun".equals(name)) {
if ("U".equals(status)) List(("1997201DahyunU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1997201DahyunD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("1999423".equals(ID)) {
if ("Chaeyoung".equals(name)) {
if ("U".equals(status)) List(("1999423ChaeyoungU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1999423ChaeyoungD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Tzuyu".equals(name)) {
if ("U".equals(status)) List(("1999423TzuyuU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1999423TzuyuD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("2024123".equals(ID)) {
if ("Zico".equals(name)) {
if ("U".equals(status)) List(("2024123ZicoU上架", parts(2).toInt))
else if ("D".equals(status)) List(("2024123ZicoD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else Nil
} else Nil
}).reduceByKeyAndWindow(_ + _,
Seconds(4),
Seconds(4)
)
    result.foreachRDD(
      x => {
        println("-------- per-ID per-item on/off-shelf counts (IPSC) ------")
        x.foreach(
          obj => {
            println(obj.toString)
            // create a producer client (kept simple: one producer per record)
            val property = new java.util.Properties()
            property.put("bootstrap.servers", "192.168.226.129:9092")
            property.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            property.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String, String](property)
            // push the windowed result to the Kafka topic
            producer.send(new ProducerRecord[String, String]("supermarketIdProStuCou", obj.toString))
            // close the producer
            producer.close()
          }
        )
      }
    )
    // 6) Start the StreamingContext and keep monitoring Kafka
    ssc.start()
    ssc.awaitTermination() // block while consuming the Kafka stream
}
}
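The whitelisted if/else chain above grows quickly as products are added. As an illustrative alternative (not the original design, and unlike the original it accepts any ID/name rather than only the known ones), a structured tuple key can replace the concatenated string key:

// Alternative keying: a (id, name, status) tuple instead of a concatenated
// string; formatting can then happen only at output time.
val keyedCounts = res.flatMap { line =>
  val p = line.split("\t")
  if (p.length > 4 && Set("U", "D").contains(p(4).trim)) {
    val count = if (p(4).trim == "U") p(2).toInt else p(2).toInt - p(3).toInt
    List(((p(0).trim, p(1).trim, p(4).trim), count))
  } else Nil
}.reduceByKeyAndWindow(_ + _, Seconds(4), Seconds(4))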
8) Push to MySQL and display with ECharts
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Super_SqlEcharts {
def main(args: Array[String]): Unit = {
    val isLocal = if (args.length > 0) args(0).toBoolean else true // defaults to true: run in local mode
val conf = new SparkConf().setAppName("SerMarket2024")
if (isLocal) {
conf.setMaster("local[*]")
}
val ssc = new StreamingContext(conf, Seconds(2))
    // Silence Spark's INFO output; log errors only
    ssc.sparkContext.setLogLevel("error")
    // 3) Kafka parameters: brokers, key/value deserializers, group id, offset commit mode
val kfkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.226.129:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SuperMarket2024SqlEcharts",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
    // 4) Connect Spark to Kafka: subscribe to the topic through the StreamingContext
    // topic name
val topicName = Array("supermarket")
val streamRdd = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicName, kfkaParams)
)
val res = streamRdd.map(_.value())
val result = res.flatMap(line => {
val parts = line.split("\t")
if (parts.length > 4) {
val ID = parts(0).trim
val status = parts(4).trim
val name = parts(1).trim
if ("1995922".equals(ID)) {
if ("Nayeon".equals(name)) {
if ("U".equals(status)) List(("1995922NayeonU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1995922NayeonD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Jeongyeon".equals(name)) {
if ("U".equals(status)) List(("1995922JeongyeonU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1995922JeongyeonD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("1996119".equals(ID)) {
if ("Momo".equals(name)) {
if ("U".equals(status)) List(("1996119MomoU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119MomoD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Sana".equals(name)) {
if ("U".equals(status)) List(("1996119SanaU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119SanaD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Mina".equals(name)) {
if ("U".equals(status)) List(("1996119MinaU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1996119MinaD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("1997201".equals(ID)) {
if ("Jihyo".equals(name)) {
if ("U".equals(status)) List(("1997201JihyoU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1997201JihyoD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Dahyun".equals(name)) {
if ("U".equals(status)) List(("1997201DahyunU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1997201DahyunD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("1999423".equals(ID)) {
if ("Chaeyoung".equals(name)) {
if ("U".equals(status)) List(("1999423ChaeyoungU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1999423ChaeyoungD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else if ("Tzuyu".equals(name)) {
if ("U".equals(status)) List(("1999423TzuyuU上架", parts(2).toInt))
else if ("D".equals(status)) List(("1999423TzuyuD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else if ("2024123".equals(ID)) {
if ("Zico".equals(name)) {
if ("U".equals(status)) List(("2024123ZicoU上架", parts(2).toInt))
else if ("D".equals(status)) List(("2024123ZicoD下架", parts(2).toInt - parts(3).toInt))
else Nil
}
else Nil
}
else Nil
} else Nil
}).reduceByKeyAndWindow(_ + _,
Seconds(4),
Seconds(4)
)
//result.print()
val spSess = SparkSession.builder
.appName("echarts")
.master("local[*]")
.getOrCreate()
import spSess.implicits._
    result.foreachRDD {
      x =>
        if (!x.isEmpty()) {
          val df: DataFrame = x.toDF("IDOWNSTATUS", "COUNT")
          df.show()
          // write the windowed counts to MySQL over JDBC
          df.write.format("jdbc")
            .option("url", "jdbc:mysql://127.0.0.1:3306/echarts?characterEncoding=UTF8&useSSL=false")
            .option("dbtable", "superecharts")
            .option("user", "root")
            .option("password", "123456")
            .mode(SaveMode.Append)
            .save()
        }
    }
ssc.start()
ssc.awaitTermination()
}
}