查看
// 默认是前10条
print(num)
保存数据
- 一批次产生一个文件
package SparkStreaming.action
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
object Action1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("action1").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Milliseconds(3000))
val ds: DStream[String] = ssc.socketTextStream("node1", 44444)
// 将a开头的数据 保存成为一个文本文件格式
ds.filter((line: String) => {
val first: Char = line.charAt(0)
if (first == 'a') {
true
} else {
false
}
}).saveAsTextFiles("hdfs://node1:9000/spark", ".txt")
// 将b开头的数据 保存成为一个Object序列化对象数据
ds.filter((line: String) => {
val first:Char = line.charAt(0)
if(first == 'b'){
true
}else{
false
}
}).saveAsObjectFiles("hdfs://node1:9000/sparkobject",".sequence")
ssc.start()
ssc.awaitTermination()
}
}
foreachRDD算子:常用的一种行动算子
- 遍历DStream中每一个RDD,得到DStream中的每一个RDD,然后对每一个RDD数据保存到MySQL数据库,或者使用SparkSession进行SQL查询分析,因为这个算子是DStream的行动算子,会触发DStream依赖链的执行,但是得到的每一个RDD也必须调用行动算子触发
- 我们可以在foreachRDD算子中,实现对DStream每一批次RDD数据保存操作,保存到数据库、保存到结构化文件中等等,通过还可以在foreachRDD算子中将RDD转换为DataFrame或者Dataset实现结构化数据SQL处理操作
- 【注意问题】如果把数据转换DataFrame或者Dataset保存到数据库,那么我们要求DataFrame、Dataset的Structype中列名必须和数据表的列名字段一致
package SparkStreaming.action
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
object Action2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("action2").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Milliseconds(3000))
val ds: DStream[String] = ssc.socketTextStream("node1", 44444)
ds.foreachRDD((rdd: RDD[String]) => {
val rdd1 = rdd.map((line: String) => {
val array = line.split(" ")
(array(0), array(1).toInt)
})
val session = SparkSession.builder().config(sparkConf).getOrCreate()
import session.implicits._
val frame = rdd1.toDF("name", "age")
// 借助Spark SQL把数据保存到MySQL中
val properties = new Properties()
properties.put("user","root")
properties.put("password","Jsq123456...")
frame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://node1:3306/project?serverTimezone=UTC", "student_info", properties)
})
ssc.start()
ssc.awaitTermination()
}
}
一般情况下 Spark Streaming处理流式数据的时候,如果实时数据是结构化数据类型的话,可以在Spark Streaming中无缝衔接Spark SQL技术对实时的结构化进行统计分析。
标签:String,val,行动,sparkstreaming,算子,import,spark,DStream,ssc
From: https://www.cnblogs.com/jsqup/p/16649586.html