首页 > 其他分享 >flink中的Keyed State

flink中的Keyed State

时间:2023-05-09 22:55:38浏览次数:32  
标签:MapState ListState 订单 flink 用户 Keyed 访问 State event

Keyed state是指在Flink中与一个特定key相关联的状态。在Flink中,数据被分区并按key分组。当数据流被分区和分组后,每个key都有一个对应的状态,这就是Keyed state。它可以用于计算窗口、聚合操作和连续查询等。Keyed state通常用于在流处理中跟踪关键得分、计数或其他与特定数据点相关的值。

MapState、ListState和ValueState都属于Keyed state的不同类型,它们的作用如下:

  1. MapState:MapState是一种Key-Value集合状态,它允许用户存储和访问与特定key相关联的值。MapState通常用于数据聚合、初始化数据或将一个状态映射到另一个状态。

假设有一个订单流,其中每个订单都有一个唯一的订单号(orderId)和相应的商品数量(itemCount)。我们想要按订单号将订单总数聚合,每次收到一个新订单时更新相应的订单数。此时,我们可以使用MapState来实现:

// 创建一个 MapState 用于存储订单数量
MapState<String, Integer> orderCountMap = getRuntimeContext().getMapState(
    new MapStateDescriptor<>("orderCountMap", Types.STRING, Types.INT));

/**
 * 处理每个订单的方法,统计每个订单的商品数量
 * @param order 表示要被处理的订单对象
 * @throws Exception
 */
public void processElement(Order order) throws Exception {
    String orderId = order.getOrderId(); // 获取订单ID
    Integer itemCount = order.getItemCount(); // 获取订单商品数量
    Integer countSoFar = orderCountMap.get(orderId); // 获取该订单已有的商品数量
    if (countSoFar == null) { // 如果还未有该订单的记录,则初始化为0
        countSoFar = 0;
    }
    countSoFar += order.getItemCount(); // 把新的商品数量加入已有的记录里
    orderCountMap.put(orderId, countSoFar); // 更新该订单的记录
}

  1. ListState:ListState是一种列表状态,它允许用户将多个值存储在一个state单元里。ListState通常用于在某些场景中存储状态,例如窗口聚合中的中间结果。

假设有一个统计每个小时内用户访问网站的用户列表,我们可以使用ListState来实现:

// 创建一个 ListState 用于存储用户列表
ListState<String> userListState = getRuntimeContext().getListState(
    new ListStateDescriptor<String>("userListState", Types.STRING));

/**
 * 处理每个用户访问事件的方法,将用户添加到用户列表中
 * @param event 表示要被处理的用户访问事件对象
 * @throws Exception
 */
public void processElement(UserAccessEvent event) throws Exception {
    long currentTime = event.getTimestamp(); // 获取事件的时间戳
    DateTimeZone timeZone = DateTimeZone.forID("Asia/Shanghai"); // 设置时区
    DateTime dateTime = new DateTime(currentTime, timeZone); // 根据时间戳创建 DateTime 对象
    String hourKey = dateTime.toString("YYYYMMddHH"); // 根据时间戳创建按小时分组的 Key
    String userName = event.getUserName(); // 获取访问用户的用户名
    Iterable<String> userList = userListState.get(); // 获取当前存储的用户列表
    if (Iterables.size(userList) == 0) { // 如果列表为空,则直接添加用户
        userListState.add(userName);
    } else { // 否则,复制一份列表,并在其中查找是否已经存在该用户
        List<String> userListCopy = Lists.newArrayList(userList);
        if (!userListCopy.contains(userName)) { // 如果不存在,则添加该用户
            userListCopy.add(userName);
            userListState.update(userListCopy); // 更新存储的用户列表
        }
    }
}

  1. ValueState:ValueState是一种单值状态,允许用户存储和更新与特定key相关联的单个值。ValueState通常用于记录特定key的最新状态或跟踪特定key的计数器状态。

假设有一个网站流,记录了每次浏览的页面和用户ID,我们想要在特定的时间段内统计用户浏览不同页面的次数。在这种情况下,我们可以使用ValueState来实现:

// 创建一个 ValueState 用于存储当前页面的访问次数
ValueState<Integer> visitCountState = getRuntimeContext().getState(
    new ValueStateDescriptor<>("visitCount", Types.INT));

/**
 * 处理每个页面访问事件的方法,更新该页面的访问次数
 * @param event 表示要被处理的页面访问事件对象
 * @throws Exception
 */
public void processElement(PageVisitEvent event) throws Exception {
    int visitCountSoFar = visitCountState.value() == null ? 0 : visitCountState.value(); // 获取当前页面的访问次数
    visitCountSoFar += 1; // 增加访问次数
    visitCountState.update(visitCountSoFar); // 更新当前页面的访问次数
    output.collect(event); // 发送处理后的事件对象到下游算子
}

标签:MapState,ListState,订单,flink,用户,Keyed,访问,State,event
From: https://www.cnblogs.com/aminor/p/17386608.html

相关文章

  • React笔记-state(四)
    React学习笔记-state(四)概念state的主要作用是用于组件保存控制以及修改自己的状态它算是组件的私有属性不可通过外部访问和修改只能通过组件内部的this.setState来修改修改state属性会导致组件的重新渲染注意:如果直接通过this.state.xxx的方式修改,组件不会重新渲染,但......
  • 启动flink显示ERROR: JAVA_HOME is not set and could not be found.
    问题:JAVA_HOME存在,但启动flink时出现ERROR:JAVA_HOMEisnotsetandcouldnotbefound.原因:环境变量加载顺序不对#/etc/profile.d/hadoop.sh#...exportHADOOP_CLASSPATH=`hadoopclasspath`JAVA_HOME是在java.sh里定义的,而hadoop.sh按照字典序先于java.sh加......
  • java.lang.IllegalStateException: Failed to check the status of the service 的解
    参考资料java.lang.IllegalStateException:Failedtocheckthestatusoftheservice的解决办法_Hello_World_QWP的博客-CSDN博客环境条件springcloud,注册中心用的是zookeeper;报错原因@ReferenceprivateXXXServicexxxService;解决方法@Refe......
  • flink Connecting to remote task manager 'localhost/127.0.0.1:44489
    问题:启动集群后,执行任务时失败:Causedby:org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:Connectionforpartition47d4a412246bdbbc3447e1968e07c821#1@04049d45261135a1a8bae9c8f62a1ba4_0a448493b4782967b150582570326227_1_0not......
  • 一、全面理解JWT | 二、对比Statement与prepareStatemen
    一、全面理解JWT-海~~D-博客园(cnblogs.com)结合抖音:@渡一Web前端学习频道---->第179集|彻底理解JWT可以更透彻的弄清JWT。 二、javaJDBCStatement的用途对比Statement与prepareStatement1.Statement存在SQL注入,PrepareStatement不存在SQL注入......
  • 为什么useState返回的是数组而不是对象?
    1.如果 useState 返回数组,那么可以顺便对数组中的变量命名,代码看起来也比较干净2.自定义hook的时候可以遵循一个简单原则:当参数大于2个的时候返回值的类型返回 object,否则返回数组。......
  • scandir,major和minor,state,无锁机制----比较交换CAS Compare And Swap,dirent,sprintf,fop
    文章目录1.Linuxc目录操作函数scandir2.Linux系统设备(device)的major和minornumber3.state4.无锁机制----比较交换CASCompareAndSwap5.dirent6.sprintf7.fopen8.atoi函数9.strtok10.strtol1.Linuxc目录操作函数scandir(1)头文件:#include<dirent.h>定义函数:intscandir(......
  • StatefulSet扩缩容源码分析
    k8sv1.15.0Informer监听cmd/kube-controller-manager/app/apps.go作为StatefulSet资源控制器,StatefulSetController通过PodInformer、StatefulSetInformer、PersistentVolumeClaimInformer、ControllerRevisionInformer来监听事件。扩缩容StatefulSetpodManagementPolicyPa......
  • Flink Chain任务链分隔
    Chain分隔文章目录Chain分隔如何切断任务链?startNewChain与disableChaining区别全局切断任务链(chain)web端效果查看隔离后依赖链忙碌程度什么是Backpressured(被压/反压)?代码样例参考文献如何切断任务链?由于共享slot的存在,当一个任务链的计算量特别庞大时,且只在一个slot上执行......
  • Flink Cdc MySQL 整库同步到 StarRocks
    这段时间开始调研使用StarRocks做准实时数据仓库:flinkcdc实时同步数据到StarRocks,然后在StarRocks中做分层计算,直接把StarRocks中的ADS层提供给BI查询。架构如下:由于用到的表比较多,不能用FlinkSQL给每个表都做个CDC的任务(任务太多不好维护、对数据库又可能有......