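Background
When developing Flink real-time jobs, a common requirement is to update configuration dynamically while the job is running. That configuration may live in a file or in a database. In a batch job this is trivial, but how do we do it in a streaming job that runs continuously? As it turns out, that is not hard either.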
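Reading the Source
In Flink, a DataStream can also be broadcast. Broadcasting the stream that carries the configuration data delivers every configuration record to all downstream tasks, so the configuration is shared and each downstream task can read it and apply updates as they arrive.
In Flink, Broadcast is also a kind of state: it is persisted at checkpoint time, so the state data is not lost.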
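Checkpointing is disabled by default in Flink, and the complete code in this article does not turn it on, so the persistence described above only applies once it is enabled. A minimal sketch, assuming an arbitrary 60-second interval; the object and method names are hypothetical:

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

object CheckpointConfigSketch {
  // Enables periodic checkpoints so that broadcast state is snapshotted.
  // The 60000 ms interval is an assumption; tune it for your job.
  def configure(env: StreamExecutionEnvironment): StreamExecutionEnvironment = {
    env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)
    env
  }
}
```

Calling `CheckpointConfigSketch.configure(env)` right after creating the environment is enough for a local test; a production job would normally also configure a state backend and checkpoint storage.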
Currently, Broadcast supports only MapState as its state type, so a MapStateDescriptor has to be declared before use:
val mapStateDescriptor = new MapStateDescriptor[K, V]( "mapStateDemo", createTypeInformation[K], createTypeInformation[V])
The Broadcast State API distinguishes two scenarios:
- Non-Keyed Stream
- Keyed Stream
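Connecting the two streams with connect returns a BroadcastConnectedStream. Further processing requires calling process on it, and that method takes one of the two subclasses of BaseBroadcastProcessFunction:
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
These correspond to the Non-Keyed Stream and Keyed Stream scenarios above, respectively.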
The Scaladoc of the keyed variant of BroadcastConnectedStream.process reads:
Assumes as inputs a [[org.apache.flink.streaming.api.datastream.BroadcastStream]] and a [[KeyedStream]] and applies the given [[KeyedBroadcastProcessFunction]] on them, thereby creating a transformed output stream.
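The complete code later in this article uses the non-keyed BroadcastProcessFunction only. For comparison, here is a minimal sketch of the keyed variant, assuming the stream is keyed by userId. The object name KeyedBroadcastSketch, the class name Enrich, the "unknown" fallback and the sample values are all hypothetical. It uses fromElements instead of Kafka so that it stays self-contained.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class UserEvent(userId: String, eventTimeStamp: Long, eventType: Int)
case class EventType(eventType: Int, eventName: String)

object KeyedBroadcastSketch {
  val descriptor = new MapStateDescriptor[Int, EventType](
    "EventType", createTypeInformation[Int], createTypeInformation[EventType])

  // Type parameters: key type, keyed input, broadcast input, output.
  class Enrich extends KeyedBroadcastProcessFunction[String, UserEvent, EventType, String] {
    override def processElement(
        value: UserEvent,
        ctx: KeyedBroadcastProcessFunction[String, UserEvent, EventType, String]#ReadOnlyContext,
        out: Collector[String]): Unit = {
      // The keyed side only gets read-only access to the broadcast state.
      val cfg = ctx.getBroadcastState(descriptor).get(value.eventType)
      val name = if (cfg != null) cfg.eventName else "unknown"
      out.collect(s"${ctx.getCurrentKey}:$name")
    }

    override def processBroadcastElement(
        value: EventType,
        ctx: KeyedBroadcastProcessFunction[String, UserEvent, EventType, String]#Context,
        out: Collector[String]): Unit =
      // The broadcast side may modify the state; every parallel instance does the same.
      ctx.getBroadcastState(descriptor).put(value.eventType, value)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val configs = env.fromElements(EventType(1, "page_view")).broadcast(descriptor)
    env.fromElements(UserEvent("zhangsan", 1670314741672L, 1))
      .keyBy(_.userId)
      .connect(configs)
      .process(new Enrich)
      .print()
    env.execute("keyed-broadcast-sketch")
  }
}
```

Flink gives no ordering guarantee between the two inputs, so the printed name may be either the configured one or the fallback, depending on which record reaches the operator first.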
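Example Scenario
We use user event data as the example. There are two event streams: one carries user behavior data, and the other carries the configuration parameters to be broadcast. Kafka serves as the source for both streams.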
Environment
- JDK: 1.8
- Scala: 2.12
- Flink: 1.13.6
Maven
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_2.12</artifactId>
<version>3.7.0-RC1</version>
</dependency>
Complete Code
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.json4s._
import org.json4s.jackson.JsonMethods
// {"userId":"","eventTimeStamp":1670314741672,"eventType":1}
case class UserEvent(userId: String, eventTimeStamp: Long, eventType: Int)
// {"eventType":1,"eventName":"页面访问事件"}
case class EventType(eventType: Int, eventName: String)
object BroadcastStateTest1 extends App {
  val log: Logger = LoggerFactory.getLogger(this.getClass)
  // Local environment with parallelism 2, convenient for testing in the IDE;
  // use StreamExecutionEnvironment.getExecutionEnvironment when submitting to a cluster.
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(2)
  val brokers = "127.0.0.1:9092"
  /*
  // User event JSON
  {"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":1}
  {"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":2}
  {"userId":"lisi","eventTimeStamp":1670314741672,"eventType":1}
  {"userId":"lisi","eventTimeStamp":1670314741672,"eventType":2}
  */
  val user_event_topic = "user_event"
  /*
  // Event type JSON
  {"eventType":1,"eventName":"页面访问事件"}
  {"eventType":2,"eventName":"页面点击事件"}
  {"eventType":3,"eventName":"添加购物车事件"}
  */
  val event_confilg_topic = "event_type"
  val group_id = "test_1"
  implicit val formats = DefaultFormats // Brings in default date formats etc.
  // Kafka source for user events
  val userEventSource = KafkaSource.builder[String]
    .setBootstrapServers(brokers)
    .setTopics(user_event_topic)
    .setGroupId(group_id)
    .setStartingOffsets(OffsetsInitializer.earliest)
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build
  // Kafka source for event types (the configuration to broadcast)
  val eventTypeSource = KafkaSource.builder[String]
    .setBootstrapServers(brokers)
    .setTopics(event_confilg_topic)
    .setGroupId(group_id)
    .setStartingOffsets(OffsetsInitializer.earliest)
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build
  // Descriptor for the state to broadcast; only map-typed state is supported
  val eventTypeBroadcastStateDescriptors = new MapStateDescriptor[Int, EventType](
    "EventType",
    createTypeInformation[Int],
    createTypeInformation[EventType])
  // Event type stream: parse each JSON record and broadcast the result
  val eventTypeDataStream = env
    .fromSource(eventTypeSource, WatermarkStrategy.noWatermarks[String](), "EventType-Source")
    // .fromElements(
    //   """{"eventType":1,"eventName":"页面访问事件"}""",
    //   """{"eventType":2,"eventName":"页面点击事件"}"""
    // )
    .map(obj => {
      if (obj.nonEmpty) {
        log.info(s"eventTypeDataStream input:${obj}")
        val parseJson: JValue = JsonMethods.parse(obj)
        val eventType: Int = (parseJson \ "eventType").extractOrElse[Int](0) // default value
        val eventName: String = (parseJson \ "eventName").extractOrElse[String]("") // default value
        log.info(s"eventTypeDataStream data eventType:${eventType},eventName:${eventName}")
        EventType(eventType, eventName)
      } else {
        null
      }
    })
    .filter(_ != null) // drop the placeholders produced for empty input
    .broadcast(eventTypeBroadcastStateDescriptors)
  // User event stream: parse each JSON record into a UserEvent
  val userEventDataStream: DataStream[UserEvent] = env
    .fromSource(userEventSource, WatermarkStrategy.noWatermarks[String](), "UserEvent-Source")
    // .fromElements(
    //   """{"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":1}""",
    //   """{"userId":"lisi","eventTimeStamp":1670314741672,"eventType":1}"""
    // )
    .map(obj => {
      if (obj.nonEmpty) {
        log.info(s"userEventDataStream input:${obj}")
        val parseJson: JValue = JsonMethods.parse(obj)
        val userId: String = (parseJson \ "userId").extractOrElse[String]("") // default value
        val eventTimeStamp: Long = (parseJson \ "eventTimeStamp").extractOrElse[Long](0L) // default value
        val eventType: Int = (parseJson \ "eventType").extractOrElse[Int](0) // default value
        log.info(s"userEventDataStream input data userId:${userId},eventTimeStamp:${eventTimeStamp},eventType:${eventType}")
        UserEvent(userId, eventTimeStamp, eventType)
      } else {
        null
      }
    })
    .filter(_ != null) // drop the placeholders produced for empty input
  // Connect the user event stream with the broadcast event type stream
  val connEventStream = userEventDataStream.connect(eventTypeDataStream)
  connEventStream.process(new BroadcastProcessFunction[UserEvent, EventType, UserEvent] {
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
    }

    // Called for every user event; the broadcast state is read-only here
    override def processElement(value: UserEvent,
                                ctx: BroadcastProcessFunction[UserEvent, EventType, UserEvent]#ReadOnlyContext,
                                out: Collector[UserEvent]): Unit = {
      val state = ctx.getBroadcastState(eventTypeBroadcastStateDescriptors)
      val userId = value.userId
      val eventType = value.eventType
      val eventTypeObj = state.get(eventType)
      // eventName stays null until the matching configuration has been broadcast
      val eventName = {
        if (eventTypeObj != null) {
          eventTypeObj.eventName
        } else {
          null
        }
      }
      log.info(s"connEventStream processElement userId:${userId},eventType:${eventType},eventName:${eventName}")
      out.collect(value)
    }

    // Called for every broadcast configuration record; the state is writable here
    override def processBroadcastElement(value: EventType,
                                         ctx: BroadcastProcessFunction[UserEvent, EventType, UserEvent]#Context,
                                         out: Collector[UserEvent]): Unit = {
      val eventType = value.eventType
      val newValue = value.eventName
      val state = ctx.getBroadcastState(eventTypeBroadcastStateDescriptors)
      val oldValue = state.get(eventType)
      if (state.contains(eventType)) {
        log.info(s"connEventStream configured eventType exists: eventType=${eventType},detail: oldValue=${oldValue}, newValue=${newValue}")
      } else {
        log.info(s"connEventStream configured eventType=${eventType},detail: oldValue=0, newValue=${newValue}")
      }
      state.put(eventType, value)
    }
  })
  env.execute()
}
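One thing to be aware of in the code above: JsonMethods.parse throws on malformed JSON, and an uncaught exception inside a map function fails the task. The following is a possible alternative, not the author's approach: a sketch that returns an Option instead of null. SafeParse and parseEventType are hypothetical names. Unlike the extractOrElse version, it drops records with missing fields rather than filling in defaults.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods
import scala.util.Try

case class EventType(eventType: Int, eventName: String)

object SafeParse {
  implicit val formats: Formats = DefaultFormats

  // Returns None for malformed JSON or for records missing either field.
  def parseEventType(raw: String): Option[EventType] =
    Try(JsonMethods.parse(raw)).toOption.flatMap { json =>
      for {
        eventType <- (json \ "eventType").extractOpt[Int]
        eventName <- (json \ "eventName").extractOpt[String]
      } yield EventType(eventType, eventName)
    }
}
```

In the job, the map and filter pair could then be replaced by something like `.flatMap(raw => SafeParse.parseEventType(raw).toList)`, so that unparsable records are skipped instead of stopping the pipeline.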
Deployment and Running
Create the topics
The test needs the two topics below. They can be created with commands like the following:
./kafka-topics.sh --create --topic user_event --partitions 1 --replication-factor 1 --bootstrap-server 127.0.0.1:9092
./kafka-topics.sh --create --topic event_type --partitions 1 --replication-factor 1 --bootstrap-server 127.0.0.1:9092
Submit the Flink job
bin/flink run -d -c BroadcastStateTest1 flink-app-jobs.jar
User event data
{"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":1}
{"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":2}
{"userId":"lisi","eventTimeStamp":1670314741672,"eventType":1}
{"userId":"lisi","eventTimeStamp":1670314741672,"eventType":2}
Event dictionary data
{"eventType":1,"eventName":"页面访问事件"}
{"eventType":2,"eventName":"页面点击事件"}
{"eventType":3,"eventName":"添加购物车事件"}
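The records above can be sent with any Kafka client. As one option, here is a minimal sketch of a Scala producer. It assumes the kafka-clients library is on the classpath (it normally comes in transitively with flink-connector-kafka) and that the broker runs at 127.0.0.1:9092 as in the job. TestDataProducer is a hypothetical name.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestDataProducer {
  val eventTypes: Seq[String] = Seq(
    """{"eventType":1,"eventName":"页面访问事件"}""",
    """{"eventType":2,"eventName":"页面点击事件"}""",
    """{"eventType":3,"eventName":"添加购物车事件"}""")

  val userEvents: Seq[String] = Seq(
    """{"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":1}""",
    """{"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":2}""",
    """{"userId":"lisi","eventTimeStamp":1670314741672,"eventType":1}""",
    """{"userId":"lisi","eventTimeStamp":1670314741672,"eventType":2}""")

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "127.0.0.1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      // Send the dictionary first, then the user events.
      eventTypes.foreach(v => producer.send(new ProducerRecord[String, String]("event_type", v)))
      producer.flush()
      userEvents.foreach(v => producer.send(new ProducerRecord[String, String]("user_event", v)))
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```

Swapping the two send loops is an easy way to try the opposite production order. Note that the two topics are consumed independently, so the order in which the job sees the records is not strictly guaranteed either way.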
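Test Summary
Producing the data in different orders and watching the console output shows that, whether we update the dictionary data or write new user events, the job picks up the change. This satisfies the requirement of updating configuration dynamically.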
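To make the effect of arrival order concrete, here is a small plain-Scala model of what a single task does with its broadcast state. It does not use the Flink API. BroadcastOrderingModel, run and the sample event names are hypothetical and exist only for illustration.

```scala
import scala.collection.mutable

object BroadcastOrderingModel {
  case class UserEvent(userId: String, eventTimeStamp: Long, eventType: Int)
  case class EventType(eventType: Int, eventName: String)

  // Left = record from the broadcast (configuration) side,
  // Right = record from the user event side.
  type Input = Either[EventType, UserEvent]

  // Replays the inputs in arrival order and returns, for each user event,
  // the user id and the event name that was visible at that moment.
  def run(inputs: Seq[Input]): Seq[(String, Option[String])] = {
    val state = mutable.Map.empty[Int, EventType]
    val out = Seq.newBuilder[(String, Option[String])]
    inputs.foreach {
      case Left(cfg) => state.put(cfg.eventType, cfg) // like processBroadcastElement
      case Right(ev) => out += (ev.userId -> state.get(ev.eventType).map(_.eventName)) // like processElement
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val event = UserEvent("zhangsan", 1670314741672L, 1)
    val inputs: Seq[Input] = Seq(
      Right(event),                        // arrives before any configuration
      Left(EventType(1, "page_view")),     // configuration arrives
      Right(event),
      Left(EventType(1, "page_view_v2")),  // configuration is updated
      Right(event))
    run(inputs).foreach(println)
  }
}
```

The first event sees no name, the second sees the initial name and the third sees the updated one. This matches what the job logs: eventName is null for events processed before the corresponding dictionary record has been broadcast.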
From: https://www.cnblogs.com/panshan-lurenjia/p/17094075.html