
Flink - [03] API


This post uses the Scala Flink API to read data from several different sources, process it as bounded or unbounded streams, and finally sink the results to the corresponding targets.
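Every example below follows the same skeleton: obtain an execution environment, attach a source, apply transformations, attach a sink, and call execute() to start the job. A minimal sketch of that pattern (the socket host and port are just placeholders):

import org.apache.flink.streaming.api.scala._

object Skeleton {
  def main(args: Array[String]): Unit = {
    // 1. execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. source (a socket here, but any of the sources below works)
    val source: DataStream[String] = env.socketTextStream("localhost", 9999)
    // 3. transformations
    val result = source.map(_.toUpperCase)
    // 4. sink
    result.print()
    // 5. trigger execution
    env.execute("skeleton job")
  }
}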

 

1. Maven configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinkapi</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>


        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <!--<dependency>-->
        <!--<groupId>org.apache.flink</groupId>-->
        <!--<artifactId>flink-connector-elasticsearch6_2.11</artifactId>-->
        <!--<version>1.7.2</version>-->
        <!--</dependency>-->

        <!--<dependency>-->
        <!--<groupId>org.apache.flink</groupId>-->
        <!--<artifactId>flink-statebackend-rocksdb_2.11</artifactId>-->
        <!--<version>1.7.2</version>-->
        <!--</dependency>-->

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!--<arg>-make:transitive</arg>-->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>



</project>

 

 

2. WordCount

(1) Prepare a wordcount.txt file

hello world
dog fish
hadoop
spark
hello world
dog fish
hadoop
spark
hello world
dog fish
hadoop
spark

(2) Stateful computation over a bounded stream (Scala)

package com.harley.test

import org.apache.flink.api.scala._

object Demo01_ExecutionEnvironment {

    def main(args: Array[String]): Unit = {

        val env = ExecutionEnvironment.getExecutionEnvironment

        val ds: DataSet[String] = env.readTextFile("src/main/resources/wordcount.txt")

        val res = ds.flatMap(_.split(" "))
          .map((_,1))
          .groupBy(0)  // group by the first tuple field
          .sum(1)

        // print the result
        res.print()

    }

}

(3) Stateful computation over an unbounded stream

package com.harley.test

import org.apache.flink.streaming.api.scala._

object Demo02_StreamExecutionEnvironment {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read from a local socket; start a socket server first, e.g. with: nc -lk 9999
    val ds: DataStream[String] = env.socketTextStream("localhost", 9999)

    val res = ds.flatMap(_.split(" "))
      .filter(_ != "a")
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // set the parallelism of the print sink to 2
    res.print("Stream print").setParallelism(2)

    env.execute("Stream job")

  }

}
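Positional keys such as keyBy(0) only work on tuple-like types; a key selector is often clearer and keeps the key's real type. A sketch of the same aggregation with a key selector (reusing ds from above, the behaviour should be identical):

val res2 = ds.flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(_._1) // key selector instead of a positional index
  .sum(1)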

 

 

3. Integrating different data sources

3.1 Using a collection as the source

import org.apache.flink.streaming.api.scala._

object Demo01_Source{

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val ds = env.fromCollection(List(
      SensorReader("sensor_1", 1547718199, 35.80018327300259),
      SensorReader("sensor_6", 1547718201, 15.402984393403084),
      SensorReader("sensor_7", 1547718202, 6.720945201171228),
      SensorReader("sensor_10", 1547718205, 38.101067604893444)
    ))

    ds.print("collection source")

    env.execute("source job")

  }
  case class SensorReader(id: String,timestamp: Long,temperature: Double)
}
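Besides fromCollection, StreamExecutionEnvironment offers a couple of other convenient in-memory sources for quick tests; a short sketch:

// individual elements passed directly
val ds1 = env.fromElements("a", "b", "c")
// a bounded sequence of Long values
val ds2 = env.generateSequence(1, 100)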

 

3.2 Kafka as the source

(1) Start the Kafka broker, create a topic, then start a console producer and send a few messages

nohup bin/kafka-server-start.sh config/server.properties &
bin/kafka-console-producer.sh --bootstrap-server node7-2:9092 --topic test

(2) Consume the data in the topic through the Flink API

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object Demo02_KafkaSource {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set global parallelism to 1
    env.setParallelism(1)
    val props = new Properties()
    props.setProperty("bootstrap.servers","node7-2:9092")
    props.setProperty("group.id","kafkasource")
    props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("auto.offset.reset","latest")

    // use the 0.11 consumer to match the flink-connector-kafka-0.11 dependency in the pom
    val stream = env.addSource(new FlinkKafkaConsumer011[String](
      "test", // topic to consume
      new SimpleStringSchema(),
      props
    ))
    stream.print("kafka source")
    env.execute("job")
  }
}
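The same 0.11 connector also ships a producer that can serve as a sink, so a processed stream can be written back to Kafka. A minimal sketch (the target topic name "out" is an assumption):

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

stream.addSink(new FlinkKafkaProducer011[String](
  "node7-2:9092",           // broker list
  "out",                    // target topic (assumed)
  new SimpleStringSchema()  // serializes the String records
))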

 

 

3.3 Custom source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object Demo03_DefinedSource {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val stream = env.addSource(new SensorSource())

    stream.print("defined source")
    env.execute("defined job")

  }

  class SensorSource extends SourceFunction[Sensor1]{

    var running: Boolean = true

    override def run(sourceContext: SourceFunction.SourceContext[Sensor1]): Unit = {

      val random = new Random()

      // initial readings for 10 sensors, Gaussian around 60
      var currentTemp = 1.to(10).map(
        i => ("sensor_" + i, random.nextGaussian() * 20 + 60)
      )

      while (running) {
        // random walk: drift each temperature a little every second
        currentTemp = currentTemp.map(tuple => {
          (tuple._1, tuple._2 + random.nextGaussian())
        })
        val ts = System.currentTimeMillis()
        // emit one record per sensor
        currentTemp.foreach(tuple => {
          sourceContext.collect(Sensor1(tuple._1, ts, tuple._2))
        })
        Thread.sleep(1000)
      }
    }

    override def cancel(): Unit = running = false
  }

  case class Sensor1(id:String,timestamp:Long,tempreture: Double)
}
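A plain SourceFunction always runs with parallelism 1. If the generator should run on several subtasks, the same logic can implement ParallelSourceFunction instead; a hedged sketch with the generation loop omitted:

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

// each parallel subtask runs its own copy of run()
class ParallelSensorSource extends ParallelSourceFunction[Sensor1] {
  var running: Boolean = true
  override def run(ctx: SourceFunction.SourceContext[Sensor1]): Unit = {
    // same generation loop as in SensorSource above
  }
  override def cancel(): Unit = running = false
}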

 

 

 

4. Flink operators and sinks

4.1 Transform

(1) Prepare a sensor.txt file

sensor_1,1600828704738,62.00978962180007
sensor_2,1600828704738,88.07800632412795
sensor_3,1600828704738,63.85113916269769
sensor_4,1600828704738,88.11328700513668
sensor_5,1600828704738,104.80491942566778
sensor_6,1600828704738,57.27152286624301
sensor_7,1600828704738,42.542439944867574
sensor_8,1600828704738,59.3964933103558
sensor_9,1600828704738,59.967837312594796
sensor_10,1600828704738,77.23695484678282
sensor_1,1600828705745,62.5
sensor_1,1600828705745,88.86940686134874
sensor_3,1600828705745,64.9
sensor_1,1600828705745,87.8
sensor_5,1600828705745,104.33176752272263
sensor_6,1600828705745,56.14405735923403

(2) Basic transformations: map, flatMap, filter, keyBy, reduce

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

object Demo04_Transform {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

    // map: parse each CSV line into a Sensor1 case class
    val value = stream.map(line => {
      val splitedArrs = line.split(",")
      Sensor1(splitedArrs(0), splitedArrs(1).trim.toLong, splitedArrs(2).trim.toDouble)
    })

    // flatMap: split each line into its individual fields
    val value1 = stream.flatMap(_.split(","))

    // filter: keep only records from sensor_1
    stream.filter(_.split(",")(0) == "sensor_1")

    // keyBy a field name, then reduce within each key
    value.keyBy("id")
      .reduce((s1, s2) => Sensor1(s1.id, s1.timestamp, s2.tempreture + 100))
      .print("keyby")

    // keyBy a positional index returns a KeyedStream keyed by a generic Tuple
    val value2: KeyedStream[Sensor1, Tuple] = value.keyBy(0)
    env.execute("job")

  }
  case class Sensor1(id:String,timestamp: Long,tempreture: Double)
}
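Besides reduce, a KeyedStream also supports rolling aggregations. For case classes the field-expression form should work just like the keyBy("id") call above; a sketch:

// rolling minimum / maximum temperature per sensor id
value.keyBy("id").min("tempreture").print("min")
value.keyBy("id").maxBy("tempreture").print("maxBy")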

 

 

4.2 Split

import com.jinghang.day13.Demo04_Transform.Sensor1
import org.apache.flink.streaming.api.scala._

object Demo05_Split {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)
    val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

    val ds: DataStream[Sensor1] = stream.map(line => {
      val splited = line.split(",")
      Sensor1(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)

    })

    // tag each record "high" or "low" depending on a 60-degree threshold
    val splitStream: SplitStream[Sensor1] = ds.split(

      sensor => {

        if (sensor.tempreture > 60)
          Seq("high")
        else
          Seq("low")

      })

    val highStream: DataStream[Sensor1] = splitStream.select("high")
    val lowStream: DataStream[Sensor1] = splitStream.select("low")
    val allStream: DataStream[Sensor1] = splitStream.select("high","low")

    val connStream: ConnectedStreams[Sensor1, Sensor1] = highStream.connect(lowStream)
    val unionStream: DataStream[Sensor1] = highStream.union(lowStream).union(allStream)

    highStream.print("high")
    lowStream.print("low")
    allStream.print("all")

    env.execute("job")
  }
}
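Note that connStream above is created but never consumed; connect only pays off once a co-map or co-flatMap is applied, where each input stream gets its own function as long as both produce the same output type. A sketch (the warning/normal labels are purely illustrative); also note that in newer Flink versions split/select is deprecated in favour of side outputs:

// one mapper per input stream, both returning the same output type
val coMapped: DataStream[(String, String)] = connStream.map(
  high => (high.id, "warning, temp=" + high.tempreture),
  low => (low.id, "normal")
)
coMapped.print("connected")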

 

 

4.3 UDF (user-defined functions)

import com.jinghang.day13.Demo04_Transform.Sensor1
import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction, RichFlatMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object Demo06_UDF {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

    val ds: DataStream[Sensor1] = stream.map(line => {
      val splitedArrs = line.split(",")
      Sensor1(splitedArrs(0), splitedArrs(1).trim.toLong, splitedArrs(2).trim.toDouble)
    })

    val dataStream = ds.filter(new UDFFilter())
    // the same filter as an anonymous rich function (result unused, shown for illustration)
    ds.filter(new RichFilterFunction[Sensor1] {
      override def filter(value: Sensor1): Boolean = {
        value.id == "sensor_1"
      }

      // open() is a lifecycle hook for setup work before processing starts
      override def open(parameters: Configuration): Unit = super.open(parameters)
    })



    dataStream.print("filter")

    env.execute("filter job")



  }

  class UDFFilter() extends FilterFunction[Sensor1]{
    override def filter(value: Sensor1): Boolean = {
      value.id == "sensor_1"
    }
  }

  class UDFFlatMap extends RichFlatMapFunction[String, String] {
    override def flatMap(value: String, out: Collector[String]): Unit = {
      // example body: emit every comma-separated field as its own record
      value.split(",").foreach(out.collect)
    }
  }
}
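For completeness, the flatMap UDF above can be attached just like the filter, and the same logic can also be written inline as a lambda; a short sketch (to be placed before env.execute):

// rich-function version
stream.flatMap(new UDFFlatMap()).print("udf flatmap")
// equivalent lambda version
stream.flatMap(_.split(",")).print("lambda flatmap")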

 

 

4.4 JDBC sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object Demo01_JdbcSink {
  def main(args: Array[String]): Unit = {
    // create the streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set global parallelism to 1
    env.setParallelism(1)

    val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

    val dataStream: DataStream[Sensor] = stream.map(line => {
      val splitedArrs = line.split(",")
      Sensor(splitedArrs(0), splitedArrs(1).trim.toLong, splitedArrs(2).trim.toDouble)

    })

    dataStream.addSink(new JDBCSink())

    env.execute("job")
  }

  case class Sensor(id: String,timestamp: Long,temperature: Double)
  class JDBCSink extends RichSinkFunction[Sensor]{

    var connection: Connection = _
    var insertStatement: PreparedStatement = _
    var updateStatement: PreparedStatement = _

    /**
     * Initialization: load the driver, open the connection and prepare both statements.
     */
    override def open(parameters: Configuration): Unit = {

      // driver class for mysql-connector-java 5.1.x; adjust host, port and credentials to your environment
      Class.forName("com.mysql.jdbc.Driver")
      connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?serverTimezone=UTC", "root", "123456")
      insertStatement = connection.prepareStatement("insert into temperatures (sensor,temps) values (?,?)")
      updateStatement = connection.prepareStatement("update temperatures set temps = ? where sensor = ?")
    }

    override def invoke(value: Sensor, context: SinkFunction.Context[_]): Unit = {
      // upsert: try an update first, fall back to an insert when no row was affected
      updateStatement.setDouble(1,value.temperature)
      updateStatement.setString(2,value.id)
      updateStatement.execute()

      if (updateStatement.getUpdateCount == 0){
        insertStatement.setString(1,value.id)
        insertStatement.setDouble(2,value.temperature)
        insertStatement.execute()
      }
    }
    override def close(): Unit = {
      insertStatement.close()
      updateStatement.close()
      connection.close()
    }
  }
}
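The pom above already pulls in the Bahir Redis connector, although it is never used in the examples. A hedged sketch of a Redis sink for the same Sensor stream (host, port and the hash name sensor_temp are assumptions):

import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// maps each Sensor record to a field/value pair in the Redis hash "sensor_temp"
class RedisSensorMapper extends RedisMapper[Sensor] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
  override def getKeyFromData(data: Sensor): String = data.id
  override def getValueFromData(data: Sensor): String = data.temperature.toString
}

val jedisConfig = new FlinkJedisPoolConfig.Builder()
  .setHost("localhost")
  .setPort(6379)
  .build()

dataStream.addSink(new RedisSink[Sensor](jedisConfig, new RedisSensorMapper))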

 

 

 
