
Using Broadcast to Dynamically Update Configuration Data in Flink Stream Processing


Background

When developing real-time Flink jobs, a common scenario is the need to dynamically update configuration data, which may live in files or in a database. For a batch job this is trivial; but how do we do it while a streaming job is running? As it turns out, it is quite simple too.

Reading the Source Code

In Flink, a DataStream can also be broadcast. By broadcasting a stream, the configuration data we need can be sent to every downstream task, giving all of them shared access to the configuration so that each task can conveniently read and update it.

In Flink, broadcast state is also a kind of state: it is persisted at checkpoint time, so the state data is not lost.
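
With checkpointing enabled, the broadcast state is included in every snapshot and restored after a failure. A minimal sketch (the 60-second interval is an arbitrary value for illustration):

val env = StreamExecutionEnvironment.getExecutionEnvironment
// snapshot all state, including broadcast state, every 60 seconds
env.enableCheckpointing(60 * 1000)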

  • Currently, broadcast state only supports the MapState state type; declare a MapStateDescriptor when using it:

  • val mapStateDescriptor = new MapStateDescriptor[K, V](
       "mapStateDemo",
       createTypeInformation[K],
       createTypeInformation[V])
    
  • The Broadcast State API is also used differently in two scenarios:

  • Non-Keyed Stream

  • Keyed Stream

Connecting the two streams with connect returns a BroadcastConnectedStream; to process it further you call process, which takes one of the two subclasses of BaseBroadcastProcessFunction:

  • BroadcastProcessFunction

  • KeyedBroadcastProcessFunction

These correspond exactly to the Non-Keyed Stream and Keyed Stream scenarios above.


  • The doc comment on BroadcastConnectedStream's process method:

  • Assumes as inputs a [[org.apache.flink.streaming.api.datastream.BroadcastStream]] and a [[KeyedStream]] and applies the given [[KeyedBroadcastProcessFunction]] on them, thereby creating a transformed output stream.
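
For the Keyed Stream case, the shape is identical except that the function also carries the key type. A minimal sketch, reusing the UserEvent and EventType case classes and the broadcast descriptor from the complete example below (the stream names are illustrative, and KeyedBroadcastProcessFunction must be imported from org.apache.flink.streaming.api.functions.co):

// key user events by userId, then connect to the already-broadcast stream
val keyedConnEventStream = userEventDataStream
   .keyBy(_.userId)
   .connect(eventTypeDataStream)

keyedConnEventStream.process(new KeyedBroadcastProcessFunction[String, UserEvent, EventType, UserEvent] {

   override def processElement(value: UserEvent,
                        ctx: KeyedBroadcastProcessFunction[String, UserEvent, EventType, UserEvent]#ReadOnlyContext,
                        out: Collector[UserEvent]): Unit = {
      // look up the dictionary entry for this event; enrichment logic would go here
      val eventTypeObj = ctx.getBroadcastState(eventTypeBroadcastStateDescriptors).get(value.eventType)
      out.collect(value)
   }

   override def processBroadcastElement(value: EventType,
                               ctx: KeyedBroadcastProcessFunction[String, UserEvent, EventType, UserEvent]#Context,
                               out: Collector[UserEvent]): Unit = {
      // writable access to the broadcast state
      ctx.getBroadcastState(eventTypeBroadcastStateDescriptors).put(value.eventType, value)
   }
})

Unlike the non-keyed variant, the keyed variant can additionally use per-key state and register timers from processElement's context.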
    

Scenario Example

Here we use user event data as an example. There are two event streams: one carries user behavior data, the other carries the configuration parameters to broadcast. Kafka serves as the source for both streams.

  • Environment

  • JDK: 1.8

  • Scala: 2.12

  • Flink: 1.13.6

Maven

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.13.6</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.13.6</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.6</version>
</dependency>

<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-jackson_2.12</artifactId>
    <version>3.7.0-RC1</version>
</dependency>

Complete Code

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.json4s._
import org.json4s.jackson.JsonMethods

// {"userId":"","eventTimeStamp":1670314741672,"eventType":1}
case class UserEvent(userId: String, eventTimeStamp: Long, eventType: Int)

// {"eventType":1,"eventName":"页面访问事件"}
case class EventType(eventType: Int, eventName: String)

object BroadcastStateTest1 extends App {

   val log: Logger = LoggerFactory.getLogger(this.getClass)

   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(2)

   val brokers = "127.0.0.1:9092"

   /*
   // user event JSON
   {"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":1}
   {"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":2}
   {"userId":"lisi","eventTimeStamp":1670314741672,"eventType":1}
   {"userId":"lisi","eventTimeStamp":1670314741672,"eventType":2}
    */
   val user_event_topic = "user_event"

   /*
   // event type JSON
   {"eventType":1,"eventName":"page view event"}
   {"eventType":2,"eventName":"page click event"}
   {"eventType":3,"eventName":"add-to-cart event"}
    */
   val event_config_topic = "event_type"

   val group_id = "test_1"

   implicit val formats = DefaultFormats // Brings in default date formats etc.

   // define the Kafka source for user events
   val userEventSource = KafkaSource.builder[String]
      .setBootstrapServers(brokers)
      .setTopics(user_event_topic)
      .setGroupId(group_id)
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build

   // define the Kafka source for the event-type config
   val eventTypeSource = KafkaSource.builder[String]
      .setBootstrapServers(brokers)
      .setTopics(event_config_topic)
      .setGroupId(group_id)
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build

   // define the state descriptor to broadcast; only map-type state is supported
   val eventTypeBroadcastStateDescriptors = new MapStateDescriptor[Int, EventType](
      "EventType",
      createTypeInformation[Int],
      createTypeInformation[EventType])

   // define the event-type stream
   val eventTypeDataStream = env
      .fromSource(eventTypeSource, WatermarkStrategy.noWatermarks[String](), "EventType-Source")
      //    .fromElements(
      //       """{"eventType":1,"eventName":"页面访问事件"}""",
      //       """{"eventType":2,"eventName":"页面点击事件"}"""
      //    )
      .map(obj => {
         if (obj.nonEmpty) {
            log.info(s"eventTypeDataStream input:${obj}")

            val parseJson: JValue = JsonMethods.parse(obj)
            val eventType: Int = (parseJson \ "eventType").extractOrElse[Int](0) // default value
            val eventName: String = (parseJson \ "eventName").extractOrElse[String]("") // default value

            log.info(s"eventTypeDataStream data eventType:${eventType},eventName:${eventName}")
            EventType(eventType, eventName)
         } else {
            null
         }
      })
      .filter(_ != null)
      .broadcast(eventTypeBroadcastStateDescriptors)

   // define the user-event stream
   val userEventDataStream: DataStream[UserEvent] = env
      .fromSource(userEventSource, WatermarkStrategy.noWatermarks[String](), "UserEvent-Source")
      //    .fromElements(
      //       """{"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":1}""",
      //       """{"userId":"lisi","eventTimeStamp":1670314741672,"eventType":1}"""
      //    )
      .map(obj => {
         if (obj.nonEmpty) {
            log.info(s"userEventDataStream input:${obj}")

            val parseJson: JValue = JsonMethods.parse(obj)
            val userId: String = (parseJson \ "userId").extractOrElse[String]("") // default value
            val eventTimeStamp: Long = (parseJson \ "eventTimeStamp").extractOrElse[Long](0L) // default value
            val eventType: Int = (parseJson \ "eventType").extractOrElse[Int](0) // default value

            log.info(s"userEventDataStream input data userId:${userId},eventTimeStamp:${eventTimeStamp},eventType:${eventType}")
            UserEvent(userId, eventTimeStamp, eventType)
         } else {
            null
         }
      })
      .filter(_ != null)

   // connect the user-event stream with the broadcast event-type stream
   val connEventStream = userEventDataStream.connect(eventTypeDataStream)

   connEventStream.process(new BroadcastProcessFunction[UserEvent, EventType, UserEvent] {

      override def open(parameters: Configuration): Unit = {
         super.open(parameters)
      }

      override def processElement(value: UserEvent,
                           ctx: BroadcastProcessFunction[UserEvent, EventType, UserEvent]#ReadOnlyContext,
                           out: Collector[UserEvent]): Unit = {

         val state = ctx.getBroadcastState(eventTypeBroadcastStateDescriptors)
         val userId = value.userId
         val eventType = value.eventType

         val eventTypeObj = state.get(eventType)
         val eventName = {
            if (eventTypeObj != null) {
               eventTypeObj.eventName
            } else {
               null
            }
         }
         log.info(s"connEventStream processElement userId:${userId},eventType:${eventType},eventName:${eventName}")

         out.collect(value)
      }

      override def processBroadcastElement(value: EventType,
                                  ctx: BroadcastProcessFunction[UserEvent, EventType, UserEvent]#Context,
                                  out: Collector[UserEvent]): Unit = {
         val eventType = value.eventType
         val newValue = value.eventName

         val state = ctx.getBroadcastState(eventTypeBroadcastStateDescriptors)
         val oldValue = state.get(eventType)

         if (state.contains(eventType)) {
            log.info(s"connEventStream configured eventType exists: eventType=${eventType},detail: oldValue=${oldValue}, newValue=${newValue}")
         } else {
            log.info(s"connEventStream configured eventType=${eventType},detail: oldValue=0, newValue=${newValue}")
         }

         state.put(eventType, value)

      }
   })

   env.execute()

}

  • Deploy and Run

  • Create the topics

  • The corresponding test topics must be created before running; reference commands:

  • ./kafka-topics.sh --create --topic user_event --partitions 1 --replication-factor 1 --bootstrap-server 127.0.0.1:9092
    ./kafka-topics.sh --create --topic event_type --partitions 1 --replication-factor 1 --bootstrap-server 127.0.0.1:9092
    
  • bin/flink run -d -c BroadcastStateTest1 flink-app-jobs.jar
    
  • User event data (both topics can be fed with the console producer commands shown below)

  • {"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":1}
    {"userId":"zhangsan","eventTimeStamp":1670314741672,"eventType":2}
    {"userId":"lisi","eventTimeStamp":1670314741672,"eventType":1}
    {"userId":"lisi","eventTimeStamp":1670314741672,"eventType":2}
    
  • Event dictionary data

  • {"eventType":1,"eventName":"页面访问事件"}
    {"eventType":2,"eventName":"页面点击事件"}
    {"eventType":3,"eventName":"添加购物车事件"}
    

Test Summary

By producing the data in different orders, you can observe the resulting output on the console. Whether we update the dictionary data or write new user event data, the approach meets our requirement of dynamically updating configuration information.
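
For example, producing the eventType=1 dictionary entry and then one of zhangsan's events should yield log lines along the following lines (illustrative output, derived from the log.info format strings in the code):

connEventStream configured eventType=1,detail: oldValue=null, newValue=page view event
connEventStream processElement userId:zhangsan,eventType:1,eventName:page view event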

From: https://www.cnblogs.com/panshan-lurenjia/p/17094075.html
