首页 > 其他分享 >Flink基于State做千万用户的pv

Flink基于State做千万用户的pv

时间:2022-11-18 16:02:31浏览次数:53  
标签:count pv String Flink state State user time id


需求:记录每天某一页面下所有用户的访问次数和第一次访问的时间
解法:

  1. redis做缓存,每天一个map,设置ttl,用户访问次数做累积,过滤完先存到redis,sink的时候读redis,查出这个用户的总访问次数
  2. 用flink的keyby(user_id+date),生成count和min,使用checkpoint进行容错

    对于上边两种做法各有缺陷,第一种需要借助外部存储,任务出问题的时候重启无法保证累计不重复更新;第二种需要会占用大量的内存,无法清理过期的user_id+date,没几天就oom。
    此时我想到第一种解决方案,用keyby(user_id).process来解决,对每个user_id生成一个管道,用ValueState进行管理,管理两个值(count, min_time),然后判断日志的时间来确定是那一天,然后用state的ttl来解决占有内存的问题,但是后来使用ttl的时候就把我劝退了,state的ttl是惰性删除,过期了不访问就不会删除,那我还要他有个屁用。
    于是想到第二种方案,用keyby(user_id%1000).process来解决,保证用户会分到同一个桶里,维护一个MapState[date_time, Map(user, (count, min_time))],定期删除过期的date_time,一是就解决了ttl的问题,二是保证每次都能删除昨天的分区,三是分多个桶也会保证数据不倾斜。另外还有个优点,从checkponit中恢复的时候不会有重复累计count,保证数据的准确性,用的是FsStateBackend。

核心代码如下

env.setStateBackend(new FsStateBackend(s"hdfs:///flink/checkpoint/xxx/$jobName"));

map((user_id%1000, user_id, timestamp))
.keyby(_._1)
.process(new MonitorKeyedProcessFunction)


class MonitorKeyedProcessFunction() extends KeyedProcessFunction[Long, (Long, String, String, String), JSONObject] {

private var state: MapState[String, java.util.HashMap[String, (Int, String)]] = _

override def open(parameters: Configuration): Unit = {
// 创建 ValueStateDescriptor
val descriptor = new MapStateDescriptor[String, java.util.HashMap[String, (Int, String)]]("myState", classOf[String], classOf[java.util.HashMap[String, (Int, String)]])

// 基于 ValueStateDescriptor 创建 ValueState
state = getRuntimeContext.getMapState(descriptor)

}

override def processElement(value: (Long, String, String, String),
context: KeyedProcessFunction[Long, (Long, String, String, String), JSONObject]#Context,
out: Collector[JSONObject]): Unit = {

val user_id = value._2
val time = value._3
val date = DateTimeUtil.tranTimeToString(time, "yyyy-MM-dd")
var current = state.get(date)
// 总数
var count = 0
// 第一次访问时间
var first_modified = time

// 初始化
if (current == null) {
current = new java.util.HashMap[String, (Int, String)]
}
if (current.keys.contains(user_id)) {
val info = current(user_id)
count = info._1
first_modified = info._2
}
// 最小时间
if (time < first_modified)
first_modified = time
// 累加
count += 1
// 提交
current.put(user_id, (count, first_modified))
// 更新
state.put(date, current)

// 删除过期日期数据 1点左右
val yesterday = System.currentTimeMillis()-(86400+3600)*1000
val yesterdayStr = DateTimeUtil.tranTimeToString(yesterday, "yyyy-MM-dd")
if(state.get(yesterdayStr)!=null){
if(state.get(yesterdayStr).size()!=0){
state.remove(yesterdayStr)
println(s"删除此分区${yesterdayStr}的状态数据")
}
}
// 输出
val res = new JSONObject()
res.put("user_id", user_id)
res.put("count", count)
res.put("min_time", first_modified)
out.collect(res)
}
}


标签:count,pv,String,Flink,state,State,user,time,id
From: https://blog.51cto.com/u_15879559/5868555

相关文章

  • Flink广播变量
    应用场景实时更新配置,例如:任务在统计3个页面的uv,又要统计另外三个页面的uv,那我是不是可以通过配置的方式,快速实现类似需求实时加载维表,例如:kafka里用户购买的订单信息的binl......
  • Flink/Spark中ETL的简单模版
    我们往往可以忽略外界的干扰因素,避免焦虑,专心做自己想做的事情,反正焦虑又解决不了问题引言使用flink或者spark的时候,写好固定的模版很重要,对于一下etl的实时任务,只需要执行......
  • C语言:IPv6地址压缩
    题目IPv6二进位制下为128位长度,以16位为一组,每组以冒号“:”隔开,可以分为8组,每组以4位十六进制方式表示。例如:2001:0db8:0000:0000:0123:4567:89ab:cdef是一个......
  • ERROR 1820 (HY000): You must reset your password using ALTER USER statement befo
    首先安装后,执行任何指令都会提示:ERROR1820(HY000):YoumustresetyourpasswordusingALTERUSERstatementbeforeexecutingthisstatement.可以用以下指令修改......
  • 【2022.11.17】N5105安装PVE系统,关联proxmox
    下载、安装PVE系统先去PVE官网下载新版的ISO文件:ProxmoxVE7.2ISOInstaller写入磁盘后直接进入BIOS,选择U盘启动选择同意选择磁盘如果有网络的话,不用选择,没网络的......
  • shell显示当前主机系统信息,包括:主机名,IPv4地址,操作系统版本,内核 版本,CPU型号,内存大小
    root@libin5shell]#vimshell1.sh#!/bin/bash#显示当前主机系统信息,包括:主机名,IPv4地址,操作系统版本,内核版本,CPU型号,内存大小,硬盘大小#******************************......
  • Apache Flink架构及其工作原理
    ApacheFlink架构及其工作原理1、定义:Apacheflink是一个实时计算框架和分布式处理引擎,用于再无边界和有边界数据流上进行有状态的计算,Flink能在所有的集群环境中运行,......
  • 在macbook m1上调试flink1.14.3
    前置条件1:首先先用homebrew安装一下flink1.14.3版本,安装完成后,/usr/local/Celler/apache-flink/1.14.3是主路径。可以看看有没有类似的文件夹来确定有没有安装上。前置条......
  • K8s存储管理—volume、pv、pvc
    介绍---------来自官方文档    存储的管理是一个与计算实例的管理完全不同的问题。PersistentVolume子系统为用户和管理员提供了一组API,将存储如何供应的细节从其......
  • R语言用CPV模型的房地产信贷信用风险的度量和预测
    全文链接:http://tecdat.cn/?p=30401原文出处:拓端数据部落公众号本文基于CPV模型,对房地产信贷风险进行了度量与预测。我们被客户要求撰写关于CPV模型的研究报告。结果......