Transformation operators 1 --- map, flatMap
- Most of the transformation operators that RDDs support are also available on DStreams: map, flatMap, filter, union, join, reduceByKey, ... (see the union/join sketch just below)
- Some RDD action operators are used as transformation operators on DStreams; the execution logic of each operator stays the same.
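As a quick illustration of two operators the examples below do not exercise, here is a minimal sketch of union and join on pair DStreams. The second port and the variable names are hypothetical, and ssc is a StreamingContext built the same way as in the programs that follow; note that every socketTextStream creates a receiver that occupies one core.

    val words1 = ssc.socketTextStream("node1", 44444).map((_, 1))
    val words2 = ssc.socketTextStream("node1", 55555).map((_, 1))  // hypothetical second source

    // union merges the two batches; join matches keys within each batch,
    // exactly like the corresponding RDD operators
    val merged: DStream[(String, Int)] = words1.union(words2)
    val joined: DStream[(String, (Int, Int))] = words1.join(words2)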
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object Transform1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("trans1").setMaster("local[3]")
    val ssc = new StreamingContext(conf, Milliseconds(5000))
    val ds = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY_SER)

    /**
     * Filter the profanity out of the lines users send through the socket.
     */
    // 1. Mask profanity with *
    val ds1 = ds.map((line: String) => {
      line.replaceAll("操", "*").replaceAll("草", "*")
    })
    // 2. Drop profane lines entirely instead of masking them
    val ds2 = ds.filter((line: String) => {
      !line.contains("屮") && !line.contains("尼玛")
    })

    // Note: these println calls run once while the DStream graph is being built,
    // not once per batch; only print() runs on every batch interval.
    println("-------")
    ds1.print()
    println("-------")
    ds2.print()
    println("-------")

    ssc.start()
    ssc.awaitTermination()
  }
}
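To try the program, start a socket source on node1 before submitting the job, for example with `nc -lk 44444` (the same approach the official Spark Streaming quick example uses), then type lines into the netcat session. Also note that the receiver occupies one core, which is why the master is local[3] rather than local[1]: at least one core must remain free for batch processing.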
Transformation operators 2 --- count, reduce
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
 * reduce and count are action operators in the RDD API, but on a DStream they are
 * transformation operators: each returns a new DStream whose elements are the
 * per-batch results of the reduce/count computation.
 */
object Transform2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("trans2").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))
    val ds = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)

    // One Long per batch: the number of records received in that batch
    val ds1: DStream[Long] = ds.count()
    // Since ds is a DStream[String], _ + _ concatenates all lines within a batch
    val ds2: DStream[String] = ds.reduce(_ + _)

    ds1.print()
    ds2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
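Because ds is a DStream[String], reduce(_ + _) concatenates the lines of every batch. To aggregate numerically you would convert the elements first; a minimal sketch, assuming the same ds as above and input of one integer per line:

    // Per-batch sum of the incoming numbers (assumes each line parses as a Long)
    val sums: DStream[Long] = ds.map(_.trim.toLong).reduce(_ + _)
    sums.print()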
Transformation operators 3 --- transform
- transform is a special transformation operator on DStream: it applies a user-supplied RDD-to-RDD function to every batch, so the full RDD API becomes usable on the stream.
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Transform3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("transform3")
    val ssc = new StreamingContext(conf, Seconds(3))
    val ds = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)

    // Word count written directly against the DStream API
    val ds1: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // The same word count via transform: the function receives each batch as a plain RDD
    val ds3 = ds.transform((rdd: RDD[String]) => {
      rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    })

    ds1.print()
    ds3.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
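transform matters precisely because the function receives a plain RDD, so RDD-only operations become available to the stream. A common use (it appears in the official Spark Streaming programming guide) is joining each batch against static reference data. The sketch below drops blacklisted words; the blacklist contents and variable names are illustrative, and ssc/ds are the ones defined above:

    // Static reference data built once on the driver (entries are hypothetical)
    val blacklist: RDD[(String, Boolean)] =
      ssc.sparkContext.parallelize(Seq(("badword", true)))

    val cleaned: DStream[String] = ds.transform((rdd: RDD[String]) => {
      rdd.flatMap(_.split(" "))
        .map((_, 1))
        .leftOuterJoin(blacklist)                       // RDD-only API, usable inside transform
        .filter { case (_, (_, flag)) => flag.isEmpty } // keep words with no blacklist entry
        .map { case (word, _) => word }
    })
    cleaned.print()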