This article uses the Flink API written in Scala to read data from different data sources, process it as unbounded/bounded streams, and finally sink the processed results to the corresponding targets.
1. Maven Configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinkapi</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <!--<dependency>-->
        <!--<groupId>org.apache.flink</groupId>-->
        <!--<artifactId>flink-connector-elasticsearch6_2.11</artifactId>-->
        <!--<version>1.7.2</version>-->
        <!--</dependency>-->
        <!--<dependency>-->
        <!--<groupId>org.apache.flink</groupId>-->
        <!--<artifactId>flink-statebackend-rocksdb_2.11</artifactId>-->
        <!--<version>1.7.2</version>-->
        <!--</dependency>-->
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!--<arg>-make:transitive</arg>-->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
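With this pom, running mvn clean package produces a shaded (fat) jar under target/ (roughly flinkapi-1.0-SNAPSHOT.jar, based on the artifactId and version above). Note that the shade plugin registers WordCount as the manifest main class, which does not match any of the example classes below, so when submitting it is safer to name the entry point explicitly, for example: flink run -c com.harley.test.Demo02_StreamExecutionEnvironment target/flinkapi-1.0-SNAPSHOT.jar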
2. WordCount
(1) Prepare the wordcount.txt text file:
hello world
dog fish
hadoop
spark
hello world
dog fish
hadoop
spark
hello world
dog fish
hadoop
spark
(2) Stateful computation over a bounded data stream (Scala):
package com.harley.test

import org.apache.flink.api.scala._

object Demo01_ExecutionEnvironment {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ds: DataSet[String] = env.readTextFile("src/main/resources/wordcount.txt")

    val res = ds.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0) // group by the first element of the tuple (the word)
      .sum(1)     // sum the counts in the second element

    // print the result
    res.print()
  }
}
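For the wordcount.txt above, every word occurs three times, so the printed result should look roughly as follows (the order of the groups may differ between runs):
(dog,3)
(fish,3)
(hadoop,3)
(hello,3)
(spark,3)
(world,3)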
(3) Stateful computation over an unbounded data stream:
package com.harley.test

import org.apache.flink.streaming.api.scala._

object Demo02_StreamExecutionEnvironment {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val ds: DataStream[String] = env.socketTextStream("localhost", 9999)

    val res = ds.flatMap(_.split(" "))
      .filter(_ != "a") // drop the word "a"
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // set the parallelism of the print sink
    res.print("Stream print").setParallelism(2)

    env.execute("Stream job")
  }
}
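Before running this job, open a local socket on port 9999 (for example with nc -lk 9999) and type space-separated words into that terminal; the job prints an updated count for each word as new lines arrive.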
3. Integrating Different Data Sources
3.1 Using a Collection as the Data Source
import org.apache.flink.streaming.api.scala._

object Demo01_Source {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // build a bounded stream from an in-memory collection
    val ds = env.fromCollection(List(
      SensorReader("sensor_1", 1547718199, 35.80018327300259),
      SensorReader("sensor_6", 1547718201, 15.402984393403084),
      SensorReader("sensor_7", 1547718202, 6.720945201171228),
      SensorReader("sensor_10", 1547718205, 38.101067604893444)
    ))

    ds.print("collection source")

    env.execute("source job")
  }

  case class SensorReader(id: String, timestamp: Long, temperature: Double)
}
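Besides fromCollection, the environment can also take elements directly via fromElements, which is handy for quick tests; a minimal sketch (the literal values below are made up):

// hypothetical one-off test data passed straight to the environment
val ds2: DataStream[SensorReader] = env.fromElements(
  SensorReader("sensor_2", 1547718300, 20.5),
  SensorReader("sensor_3", 1547718301, 21.7)
)
ds2.print("elements source")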
3.2 Kafka as the Data Source
(1) Start the Kafka service, create a topic, and start a console producer that sends data into the topic:
nohup bin/kafka-server-start.sh config/server.properties &
bin/kafka-console-producer.sh --bootstrap-server node7-2:9092 --topic test
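Step (1) also calls for creating the topic; with older Kafka distributions this is typically done with something like bin/kafka-topics.sh --create --zookeeper <zookeeper-host>:2181 --replication-factor 1 --partitions 1 --topic test (the ZooKeeper address is a placeholder for your own setup; newer Kafka versions take --bootstrap-server here instead).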
(2) Consume the data in the topic through the Flink API:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object Demo02_KafkaSource {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the global parallelism
    env.setParallelism(1)

    // Kafka consumer configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node7-2:9092")
    props.setProperty("group.id", "kafkasource")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("auto.offset.reset", "latest")

    // the flink-connector-kafka-0.11 dependency in the pom also provides FlinkKafkaConsumer011,
    // which is constructed the same way
    val stream = env.addSource(new FlinkKafkaConsumer010[String](
      "test",                   // topic
      new SimpleStringSchema(), // deserialize records as plain strings
      props
    ))

    stream.print("kafka source")

    env.execute("job")
  }
}
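The same connector dependency also ships a producer, so a stream can be written back to Kafka. A minimal sketch that appends to the pipeline above, assuming a hypothetical output topic named test-out:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

// write every record of the stream to the (hypothetical) topic "test-out"
stream.addSink(new FlinkKafkaProducer011[String](
  "node7-2:9092",           // broker list
  "test-out",               // target topic (assumed name)
  new SimpleStringSchema()  // serialize records as plain strings
))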
3.3 Custom Source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object Demo03_DefinedSource {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource())

    stream.print("defined source")

    env.execute("defined job")
  }

  // a custom source that emits one random temperature reading per sensor every second
  class SensorSource extends SourceFunction[Sensor1] {

    // flag that keeps the emit loop alive; cancel() flips it to stop the source
    var running: Boolean = true

    override def run(sourceContext: SourceFunction.SourceContext[Sensor1]): Unit = {
      val random = new Random()
      // initial temperature for each of the ten sensors
      var currentTemp = 1.to(10).map(
        i => ("sensor_" + i, random.nextGaussian() * 20 + 60)
      )
      while (running) {
        // random walk: nudge every temperature by a small gaussian step
        currentTemp = currentTemp.map(tuple => {
          (tuple._1, tuple._2 + random.nextGaussian())
        })
        val ts = System.currentTimeMillis()
        currentTemp.foreach(tuple => {
          sourceContext.collect(Sensor1(tuple._1, ts, tuple._2))
        })
        Thread.sleep(1000)
      }
    }

    override def cancel(): Unit = running = false
  }

  case class Sensor1(id: String, timestamp: Long, temperature: Double)
}
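A plain SourceFunction like this one is non-parallel: Flink will only run it with parallelism 1. If several parallel source instances are needed, the class can extend ParallelSourceFunction (or RichParallelSourceFunction when open/close hooks are required) instead, keeping the same run/cancel logic.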
4. Flink Operations
4.1 Transform
(1) Prepare the text file sensor.txt:
sensor_1,1600828704738,62.00978962180007
sensor_2,1600828704738,88.07800632412795
sensor_3,1600828704738,63.85113916269769
sensor_4,1600828704738,88.11328700513668
sensor_5,1600828704738,104.80491942566778
sensor_6,1600828704738,57.27152286624301
sensor_7,1600828704738,42.542439944867574
sensor_8,1600828704738,59.3964933103558
sensor_9,1600828704738,59.967837312594796
sensor_10,1600828704738,77.23695484678282
sensor_1,1600828705745,62.5
sensor_1,1600828705745,88.86940686134874
sensor_3,1600828705745,64.9
sensor_1,1600828705745,87.8
sensor_5,1600828705745,104.33176752272263
sensor_6,1600828705745,56.14405735923403
(2) Basic transformations: map, flatMap, filter, keyBy and reduce:
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

object Demo04_Transform {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

    // map: parse each CSV line into a Sensor1 record
    val value = stream.map(line => {
      val splitedArrs = line.split(",")
      Sensor1(splitedArrs(0), splitedArrs(1).trim.toLong, splitedArrs(2).trim.toDouble)
    })

    // flatMap: split each line into its individual fields
    val value1 = stream.flatMap(_.split(","))

    // filter: keep only the lines belonging to sensor_1
    stream.filter(_.split(",")(0) == "sensor_1")

    // keyBy a field name, then reduce within each key
    value.keyBy("id")
      .reduce((s1, s2) => Sensor1(s1.id, s1.timestamp, s2.temperature + 100))
      .print("keyby")

    // keyBy a tuple index produces a KeyedStream keyed by a generic Tuple
    val value2: KeyedStream[Sensor1, Tuple] = value.keyBy(0)

    env.execute("job")
  }

  case class Sensor1(id: String, timestamp: Long, temperature: Double)
}
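Keying by a field name or a tuple index works, but in the Scala API a key selector function is the more type-safe choice, and a keyed stream also supports simple rolling aggregations such as min/max/sum. A short sketch that reuses the value stream above:

// key by an explicit selector function instead of a string or index
val keyed: KeyedStream[Sensor1, String] = value.keyBy(_.id)

// rolling minimum temperature per sensor (min only replaces the aggregated field)
keyed.min("temperature").print("min temp")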
4.2 Split
import com.jinghang.day13.Demo04_Transform.Sensor1
import org.apache.flink.streaming.api.scala._

object Demo05_Split {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

    val ds: DataStream[Sensor1] = stream.map(line => {
      val splited = line.split(",")
      Sensor1(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)
    })

    // split tags every record with one or more names; select pulls the tagged substreams back out
    val splitStream: SplitStream[Sensor1] = ds.split(
      sensor => {
        if (sensor.temperature > 60)
          Seq("high")
        else
          Seq("low")
      })

    val highStream: DataStream[Sensor1] = splitStream.select("high")
    val lowStream: DataStream[Sensor1] = splitStream.select("low")
    val allStream: DataStream[Sensor1] = splitStream.select("high", "low")

    // connect keeps the two streams' types separate; union requires identical element types
    val connStream: ConnectedStreams[Sensor1, Sensor1] = highStream.connect(lowStream)
    val unionStream: DataStream[Sensor1] = highStream.union(lowStream).union(allStream)

    highStream.print("high")
    lowStream.print("low")
    allStream.print("all")

    env.execute("job")
  }
}
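In newer Flink releases split/select was deprecated and eventually removed in favor of side outputs. A rough sketch of the same high/low routing with a ProcessFunction and an OutputTag (class and variable names here are illustrative):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// tag identifying the side output that carries the "high" readings
val highTag = new OutputTag[Sensor1]("high")

val mainStream: DataStream[Sensor1] = ds.process(new ProcessFunction[Sensor1, Sensor1] {
  override def processElement(value: Sensor1,
                              ctx: ProcessFunction[Sensor1, Sensor1]#Context,
                              out: Collector[Sensor1]): Unit = {
    if (value.temperature > 60) ctx.output(highTag, value) // high readings go to the side output
    else out.collect(value)                                // low readings stay on the main stream
  }
})

val highSideStream: DataStream[Sensor1] = mainStream.getSideOutput(highTag)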
4.3 UDF
import com.jinghang.day13.Demo04_Transform.Sensor1
import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction, RichFlatMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object Demo06_UDF {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

    val ds: DataStream[Sensor1] = stream.map(line => {
      val splitedArrs = line.split(",")
      Sensor1(splitedArrs(0), splitedArrs(1).trim.toLong, splitedArrs(2).trim.toDouble)
    })

    // pass a user-defined FilterFunction instance to the operator
    val dataStream = ds.filter(new UDFFilter())

    // the same filter as an anonymous rich function
    ds.filter(new RichFilterFunction[Sensor1] {
      override def filter(value: Sensor1): Boolean = {
        value.id == "sensor_1"
      }
      // open() runs once per parallel instance before the first record;
      // setup work (connections, config lookups, ...) goes here
      override def open(parameters: Configuration): Unit = super.open(parameters)
    })

    dataStream.print("filter")

    env.execute("filter job")
  }

  // keep only readings from sensor_1
  class UDFFilter() extends FilterFunction[Sensor1] {
    override def filter(value: Sensor1): Boolean = {
      value.id == "sensor_1"
    }
  }

  // rich flatMap stub; as an illustration it emits every comma-separated field of the input line
  class UDFFlatMap extends RichFlatMapFunction[String, String] {
    override def flatMap(value: String, out: Collector[String]): Unit = {
      value.split(",").foreach(out.collect)
    }
  }
}
4.4 JDBC Sink
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object Demo01_JdbcSink {

  def main(args: Array[String]): Unit = {

    // create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // set the global parallelism
    env.setParallelism(1)

    val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

    val dataStream: DataStream[Sensor] = stream.map(line => {
      val splitedArrs = line.split(",")
      Sensor(splitedArrs(0), splitedArrs(1).trim.toLong, splitedArrs(2).trim.toDouble)
    })

    dataStream.addSink(new JDBCSink())

    env.execute("job")
  }

  case class Sensor(id: String, timestamp: Long, temperature: Double)

  class JDBCSink extends RichSinkFunction[Sensor] {

    var connection: Connection = _
    var insertStatement: PreparedStatement = _
    var updateStatement: PreparedStatement = _

    /**
     * Initialization: open the connection and prepare the statements once per parallel instance.
     */
    override def open(parameters: Configuration): Unit = {
      // driver class of the mysql-connector-java 5.1.x declared in the pom
      // (com.mysql.cj.jdbc.Driver only exists in Connector/J 8.x)
      Class.forName("com.mysql.jdbc.Driver")
      connection = DriverManager.getConnection("jdbc:mysql:///test?serverTimezone=UTC", "root", "123456")
      insertStatement = connection.prepareStatement("insert into temperatures (sensor,temps) values (?,?)")
      println(insertStatement)
      updateStatement = connection.prepareStatement("update temperatures set temps = ? where sensor = ?")
      println(updateStatement)
    }

    /**
     * Upsert: try to update the sensor's row first; insert it when no row was updated.
     */
    override def invoke(value: Sensor, context: SinkFunction.Context[_]): Unit = {
      updateStatement.setDouble(1, value.temperature)
      updateStatement.setString(2, value.id)
      updateStatement.execute()
      if (updateStatement.getUpdateCount == 0) {
        insertStatement.setString(1, value.id)
        insertStatement.setDouble(2, value.temperature)
        insertStatement.execute()
      }
    }

    override def close(): Unit = {
      insertStatement.close()
      updateStatement.close()
      connection.close()
    }
  }
}
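The sink assumes a MySQL database named test containing a temperatures table with columns sensor and temps; a plausible definition (the column types are an assumption) is: CREATE TABLE temperatures (sensor VARCHAR(32), temps DOUBLE);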