首页 > 其他分享 >SparkStreaming中的转换算子1

SparkStreaming中的转换算子1

时间:2022-09-02 12:15:29浏览次数:66  
标签:转换 val org SparkStreaming 算子 apache import spark

转换算子1---map,flatMap

  • RDD支持的转换算子DStream大部分都是支持的
    map、flatMap、filter、distinct、union、join、reduceByKey......
    RDD中部分行动算子DStream会当作转换算子使用,算子的执行逻辑是一样的
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object Transform1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("trans1").setMaster("local[3]")
    val ssc = new StreamingContext(conf, Milliseconds(5000))
    val ds = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY_SER)
    /**
     * 将用户通过端口发送的 脏话过滤
     */
    //1、脏话使用*来表示
    val ds1 = ds.map((line: String) => {
      val str = line.replaceAll("操", "*").replaceAll("草", "*")
      str
    })
    // 2. 脏话直接不显示  直接过滤掉
    val ds2 = ds.filter((line: String) => {
      if (line.contains("屮") || line.contains("尼玛")) {
        false
      } else {
        true
      }
    })
    println("-------")
    ds1.print()
    println("-------")
    ds2.print()
    println("-------")

    ssc.start()
    ssc.awaitTermination()
  }
}

转换算子2 count reduce

package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
/**
 *  DStream中reduce、count两个算子  在我们的RDD中属于行动算子,但是在DStream中是转换算子
 *  两个算子会给我们返回一个新的DStream,DStream就是reduce和count计算完成的结果类型
 */
object Transform2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("trans2").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(3000))

    val ds = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)
    val ds1: DStream[Long] = ds.count()
    val ds2 = ds.reduce(_ + _)
    ds1.print()
    ds2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

转换算子3 transform

  • DStream中一个特殊转换算子transform
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Transform3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("transform3")
    val ssc = new StreamingContext(conf, Seconds(3))
    val ds = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)
    val ds1:DStream[(String,Int)] = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val ds3 = ds.transform((rdd: RDD[String]) => {
      val ds2: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      ds2
    })
    ds1.print()
    ds3.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

标签:转换,val,org,SparkStreaming,算子,apache,import,spark
From: https://www.cnblogs.com/jsqup/p/16649333.html

相关文章

  • 如何在 JavaScript 中将 JSON 转换为 CSV
    如何在JavaScript中将JSON转换为CSV下面是我们如何在JavaScript中轻松地将JSON转换为CSV:函数jsonToCsv(项目){constheader=Object.keys(items[0]);常......
  • 转换流
    @Testpublicvoidtest4()throwsIOException{InputStreamReaderisr=null;try{FileInputStreamfis=newFileInputStream("......
  • date和string相互转换
    日期和string相互转换DateDem.javapackagepractice;importjava.text.ParseException;importjava.util.Date;publicclassDateDemo{/***日期和时......
  • [CSharpTips]C# 将DataTable转换为类
    将DataTable转换为类众所周知,有时候我们需要将sql查询返回的DataTable转换为类。最开始是使用循环一个个给类的属性赋值,但是这样效率低并且无法复用。后来了解到利用Data......
  • 【小工具】es6转换成es5
     2、在项目根目录创建.babelrc文件{"presets":["es2015"],"plugins":[]}  3.1)安装babel-clinpminstall-gbabel-cli3.2)安装bab......
  • c++的类型转换
    1.int转string,函数to_string()x=10;stringm=to_string(x);经测试gccv5.4.0版本不支持,版本v7.5.0支持。判断版本号命令:g++-v同样适用于double,float2.string转int,......
  • 进程的状态与转换
    进程的状态与转换状态运行态:占有CPU就绪态:已经具备运行条件,但由于没有空闲CPU,而暂时不能运行阻塞态(又称等待态):因等待某一事而暂时不能运行以上三种称为进程的......
  • sparkstreaming的创建方式及运行流程及注意事项
    sparkstreaming创建有两种方式1.借助SparkConf对象创建valconf=newSparkConf().setAppName("streamingContext").setMaster("local[4]")/***streamingcontex......
  • Antd之No Data转换为中文
    Antd默认的暂无数据是英文的,如下图表格所示。修改的方法为:使用a-config-provider1.在App.vue增加a-config-provider,包装显示的页面<template><a-config-provider......
  • 气象相关,转换风力,风向的工具类
    importjava.util.LinkedHashMap;importjava.util.Map;/***气象工具**/publicclassWeatherUtil{//风力privatestaticfinalMap<String,Ran......