首页 > 编程语言 >15种实时uv实现方案系列(附源码)之一:Flink基于set实时uv统计

15种实时uv实现方案系列(附源码)之一:Flink基于set实时uv统计

时间:2023-08-17 12:56:57浏览次数:69  
标签:Set return uv 实时 源码 new userBehavior public

UVStatMultiPlans(GitHub)项目持续收集各种高性能实时uv实现方案并对各种实现方案的优缺点进行对比分析!

需求描述

统计每分钟用户每个页面的uv访问量。

Kafka数据格式

{"userId":"c61b801e-22e7-4238-8f67-90968a40f2a7","page":"page_1","behaviorTime":1692247408129}
{"userId":"c61b801e-22e7-4238-8f67-90968a40f2a7","page":"page_2","behaviorTime":1692247408129}

代码实现

完整代码已上传至:https://github.com/xl-xueling/uvstatmultiplans.git
public class UVStatPlan1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(TimeUnit.MINUTES.toMillis(10));
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", SysConst.KAFKA_BOOTSTRAP_SERVERS);
        kafkaProperties.setProperty("group.id","groupId_" + System.currentTimeMillis());
        kafkaProperties.setProperty("auto.offset.reset","latest");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(SysConst.KAFKA_TOPIC_NAME, new SimpleStringSchema(), kafkaProperties);
        DataStream<UserBehavior> dataStream = env.addSource(consumer).map(x -> {
            UserBehavior userBehavior = null;
            try{
                userBehavior = JsonUtil.toJavaObject(x,UserBehavior.class);
            }catch (Exception ex){
                ex.printStackTrace();
            }
            return userBehavior;
        }).assignTimestampsAndWatermarks
                (WatermarkStrategy.<UserBehavior>forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<UserBehavior>)
                        (userBehavior, l) -> userBehavior.getBehaviorTime()));
        dataStream.keyBy((KeySelector<UserBehavior, String>) UserBehavior::getPage).window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .trigger(new TimeIntervalTrigger<>(5,TimeUnit.SECONDS))
                .aggregate(new UVStatAggregate(),new WindowResultFunction())
                .map(x -> {
                    System.out.println("key:" + x.page + ",window time:" + DateUtil.formatTimeStamp(x.windowTime,"yyyy-MM-dd HH:mm:ss") + ",uv:" + x.uv);
                    return null;
                });
        env.execute();
    }

    public static class WindowResultFunction implements WindowFunction<Integer, PageUVResult, String, TimeWindow> {

        @Override
        public void apply(
                String key,
                TimeWindow window,
                Iterable<Integer> aggregateResult,
                Collector<PageUVResult> collector
        ) throws Exception {
            Integer count = aggregateResult.iterator().next();
            collector.collect(PageUVResult.of(key, window.getEnd(), count));
        }
    }

    public static class UVStatAggregate implements AggregateFunction<UserBehavior, Set<String>, Integer> {

        @Override
        public Set<String> createAccumulator() {
            return new HashSet<>();
        }

        @Override
        public Set<String> add(UserBehavior userBehavior, Set<String> accumulator) {
            accumulator.add(userBehavior.getUserId());
            return accumulator;
        }

        @Override
        public Integer getResult(Set<String> accumulator) {
            return accumulator.size();
        }

        @Override
        public Set<String> merge(Set<String> a, Set<String> b) {
            a.addAll(b);
            return a;
        }
    }
}

本实现方式优缺点

  • 优点:
    实现方案较为简单。
  • 缺点:
    基于Set实现,用户数据存储在内存中,如果统计周期内用户量较大需要耗费较大的内存空间。
完整代码已上传至:https://github.com/xl-xueling/uvstatmultiplans.git

标签:Set,return,uv,实时,源码,new,userBehavior,public
From: https://www.cnblogs.com/xl-xueling/p/17637311.html

相关文章

  • ConcurrentHashMap 源码详解
    ConcurrentHashMap是Java提供的一个并发散列映射实现,它允许多个线程同时读写而不需要同步整个数据结构。它是线程安全的,并且相比于其他线程安全的Map实现(如Collections.synchronizedMap或Hashtable),它提供了更高的并发性能。以下是ConcurrentHashMap的一些核心特性和相应......
  • 基于微信小程序的网上交易平台的设计与实现-计算机毕业设计源码+LW文档
    摘 要随着互联网技术的发展,传统的商品销售迎来了机遇,我国是个人口大国,商品的需求量大,如何推广商品的销售是企业非常关注的事情。随着电子商务多元化的发展,各种类型的商品逐渐转移到线上销售。在互联网的帮助下,带动企业打开销路,促进商品销售的可持续发展。同时,通过基于微信小程......
  • 会议记录管理系统-计算机毕业设计源码+LW文档
    摘 要随着信息技术的发展,管理系统越来越成熟,各种企事业单位使用各种类型的管理系统来提高工作效率,从而降低手工劳动的弊端。公司一直以来都非常重视公司信息化的发展,近几年来随着公司规模扩大,业务逐渐增加,公司对会员的管理也愈发的困难。因此,公司提出通过开发会议记录管理系统......
  • 基于PHP的花茶交流平台的设计与实现-计算机毕业设计源码+LW文档
    摘  要现在这种紧张压抑的生活状态,人们已经渐渐忘记了原本的样子,我们有时会想着去放下手中的工作,学会享受生活,品鉴人间趣味。在我国近五千年的历史长河中,茶文化对人们来说有着深厚含义。对于有着丰富生活阅历的人来说,品茶聊天就是最佳休闲方式。借此我产生了灵感设计了茶交流......
  • 如何利用量化交易平台获取实时行情数据进行分析之代码分享
    量化交易平台之行情数据获取方式续通过开放的方式提供全球股票(A股、港股、美股)、期货(国内期货、国际期货)等历史数据查询及实盘实时行情订阅平台特色:全球大多数行情一次购买即可享受全部数据行情订阅。历史数据可以提供下载服务方便使用云端自定义指数合成能力自定义品种的支持(如不......
  • 基于SpringBoot的点餐系统的设计与实现-计算机毕业设计源码+LW文档
    摘要:随着移动互联网的快速发展,微信小程序作为一种轻量级、快速启动、无需下载安装的应用程序形式,在市场中越来越受欢迎。同时,餐饮行业也是一个充满机会的领域,尤其是在新冠疫情后,外卖、自取等模式逐渐成为餐饮行业的主要销售方式。因此,开发一款基于微信小程序的点餐系统,能够提高餐......
  • 基于微信小程序的景区服务系统-计算机毕业设计源码+LW文档
    摘要随着社会经济的发展,各行业竞争激烈,年轻群体工作压力大,越来越多的人希望通过旅游来缓解压力。而传统的旅行社都是通过事先定制的线路和固定时间,没有个性化定制服务,不能满足现代用户的需求。对于此,开发景区服务系统可以很好的解决用户个性化旅游的服务,通过系统查询各种景点信息,......
  • Unity之如何计算实时帧率
    代码如下:usingSystem;usingSystem.Collections;usingSystem.Collections.Generic;usingUnityEngine;publicclassCalcFPSTool:MonoBehaviour{privatefloatm_UpdateInterval,m_FPS;floatUpdateInterval{set=>m_UpdateInterval=Mathf.Cla......
  • formDataToJSON 抽丝剥茧 formData 与 Object 的转换【玩转源码】
    前言通过axios源码阅读,实现formDataToJSON抽丝剥茧formData与Object的转换,接下来详细分享整个过程。formDataToJSON抽丝剥茧formData与Object的转换FormData对象FormData对象用以将数据编译成键值对,以便用XMLHttpRequest来发送数据。FormData对象主要用于发送表单数......
  • pd.get_dummy() 详细用法即源码解析
    源代码分析pandas:encoding.pyget_dummy()解析defget_dummies(data,prefix=None, prefix_sep:str|Iterable[str]|dict[str,str]="_",dummy_na:bool=False,columns=None,sparse:bool=False,drop_first:bool=F......