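The UVStatMultiPlans (GitHub) project continuously collects high-performance real-time UV implementations and compares the advantages and disadvantages of each approach.
Requirement
Count the UV (unique visitors) of each page for every minute.
Kafka data format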
{"userId":"c61b801e-22e7-4238-8f67-90968a40f2a7","page":"page_1","behaviorTime":1692247408129}
{"userId":"c61b801e-22e7-4238-8f67-90968a40f2a7","page":"page_2","behaviorTime":1692247408129}
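To make the requirement concrete, here is a minimal plain-Java sketch (no Flink or Kafka) of what "UV per page per minute" means for records like the two above. The class name `UvPerMinuteSketch`, the array-based event layout, and the `page@windowStart` key format are assumptions made for illustration only; they are not part of the project.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the UVStatMultiPlans repository.
class UvPerMinuteSketch {
    static final long WINDOW_MS = 60_000L;

    // Start of the one-minute tumbling window that contains the timestamp.
    static long windowStart(long behaviorTime) {
        return behaviorTime - (behaviorTime % WINDOW_MS);
    }

    // Each event is {userId, page, behaviorTime}; the result maps "page@windowStart" to its UV.
    static Map<String, Integer> countUv(String[][] events) {
        Map<String, Set<String>> usersPerKey = new HashMap<>();
        for (String[] e : events) {
            String key = e[1] + "@" + windowStart(Long.parseLong(e[2]));
            usersPerKey.computeIfAbsent(key, k -> new HashSet<>()).add(e[0]);
        }
        Map<String, Integer> uv = new HashMap<>();
        usersPerKey.forEach((key, users) -> uv.put(key, users.size()));
        return uv;
    }

    public static void main(String[] args) {
        String user = "c61b801e-22e7-4238-8f67-90968a40f2a7";
        String[][] events = {
            {user, "page_1", "1692247408129"},
            {user, "page_1", "1692247409000"}, // repeat visit in the same minute
            {user, "page_2", "1692247408129"},
        };
        System.out.println(countUv(events));
    }
}
```

All three sample events fall into the window starting at 1692247380000, and because the same user visits `page_1` twice, each page ends up with a UV of 1 rather than a raw visit count.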
Code implementation
The complete code is available at: https://github.com/xl-xueling/uvstatmultiplans.git
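import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

// SysConst, JsonUtil, DateUtil, UserBehavior, PageUVResult and TimeIntervalTrigger
// are project classes from the repository linked above.
public class UVStatPlan1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(TimeUnit.MINUTES.toMillis(10));

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", SysConst.KAFKA_BOOTSTRAP_SERVERS);
        // A fresh consumer group on every run, reading only newly arriving records.
        kafkaProperties.setProperty("group.id", "groupId_" + System.currentTimeMillis());
        kafkaProperties.setProperty("auto.offset.reset", "latest");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                SysConst.KAFKA_TOPIC_NAME, new SimpleStringSchema(), kafkaProperties);

        // Parse each JSON record; skip records that fail to parse instead of emitting null.
        DataStream<UserBehavior> dataStream = env.addSource(consumer)
                .flatMap((String json, Collector<UserBehavior> out) -> {
                    try {
                        UserBehavior userBehavior = JsonUtil.toJavaObject(json, UserBehavior.class);
                        if (userBehavior != null) {
                            out.collect(userBehavior);
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                })
                .returns(UserBehavior.class) // type hint needed for the generic lambda
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                                .withTimestampAssigner((SerializableTimestampAssigner<UserBehavior>)
                                        (userBehavior, recordTimestamp) -> userBehavior.getBehaviorTime()));

        // One-minute tumbling event-time window per page; TimeIntervalTrigger is
        // configured with a 5-second interval.
        dataStream.keyBy((KeySelector<UserBehavior, String>) UserBehavior::getPage)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .trigger(new TimeIntervalTrigger<>(5, TimeUnit.SECONDS))
                .aggregate(new UVStatAggregate(), new WindowResultFunction())
                .map(x -> "key:" + x.page + ",window time:"
                        + DateUtil.formatTimeStamp(x.windowTime, "yyyy-MM-dd HH:mm:ss") + ",uv:" + x.uv)
                .print();
        env.execute();
    }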

    // Wraps the aggregated count together with the page key and the window end time.
    public static class WindowResultFunction implements WindowFunction<Integer, PageUVResult, String, TimeWindow> {
        @Override
        public void apply(
                String key,
                TimeWindow window,
                Iterable<Integer> aggregateResult,
                Collector<PageUVResult> collector
        ) throws Exception {
            // The iterable holds a single element: the UV count produced by UVStatAggregate.
            Integer count = aggregateResult.iterator().next();
            collector.collect(PageUVResult.of(key, window.getEnd(), count));
        }
    }

    // Collects the distinct user IDs of a window in a Set; the UV is the size of that Set.
    public static class UVStatAggregate implements AggregateFunction<UserBehavior, Set<String>, Integer> {
        @Override
        public Set<String> createAccumulator() {
            return new HashSet<>();
        }

        @Override
        public Set<String> add(UserBehavior userBehavior, Set<String> accumulator) {
            accumulator.add(userBehavior.getUserId());
            return accumulator;
        }

        @Override
        public Integer getResult(Set<String> accumulator) {
            return accumulator.size();
        }

        @Override
        public Set<String> merge(Set<String> a, Set<String> b) {
            a.addAll(b);
            return a;
        }
    }
}
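The accumulator logic of `UVStatAggregate` can be tried out without a Flink cluster. The following is a minimal sketch that mirrors its four methods as plain static functions; the class name `SetAccumulatorSketch` and the user IDs `u1` to `u3` are made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-alone mirror of UVStatAggregate, for illustration only.
class SetAccumulatorSketch {
    static Set<String> createAccumulator() {
        return new HashSet<>();
    }

    static Set<String> add(String userId, Set<String> accumulator) {
        accumulator.add(userId); // duplicates are ignored by the Set
        return accumulator;
    }

    static Set<String> merge(Set<String> a, Set<String> b) {
        a.addAll(b); // union of two partial accumulators
        return a;
    }

    static int getResult(Set<String> accumulator) {
        return accumulator.size();
    }

    public static void main(String[] args) {
        Set<String> left = createAccumulator();
        add("u1", left);
        add("u2", left);
        add("u1", left); // repeat visit by u1

        Set<String> right = createAccumulator();
        add("u2", right);
        add("u3", right);

        System.out.println(getResult(left));               // prints 2
        System.out.println(getResult(merge(left, right))); // prints 3
    }
}
```

Because the accumulator is a Set, repeat visits never inflate the count, and merging two partial accumulators is a plain set union, so a user present in both is still counted once.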
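Pros and cons of this implementation
- Pros: the implementation is fairly simple.
- Cons: it is based on a Set, so the user IDs are kept in memory; if a large number of users visit within a statistics window, it consumes a lot of memory.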
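To get a feel for the memory cost, a rough back-of-the-envelope estimate can be sketched as below. The figure of 120 bytes per entry is an assumed ballpark for a `HashSet<String>` entry holding a 36-character UUID, not a measurement; the real cost depends on the JVM version and settings. The page and user counts are likewise hypothetical.

```java
// Hypothetical estimate, for illustration only; all inputs are assumptions.
class SetMemoryEstimate {
    // Assumed average heap cost of one HashSet<String> entry with a 36-character user ID.
    static final long ASSUMED_BYTES_PER_ENTRY = 120L;

    // Heap needed to hold the user-ID sets of all pages for a single window.
    static long estimateBytes(long pages, long distinctUsersPerPagePerWindow) {
        return pages * distinctUsersPerPagePerWindow * ASSUMED_BYTES_PER_ENTRY;
    }

    public static void main(String[] args) {
        // Assumed workload: 100 pages, one million distinct users per page per window.
        long bytes = estimateBytes(100, 1_000_000);
        System.out.println(bytes / 1_000_000_000L + " GB");
    }
}
```

Under these assumptions a single window would already need about 12 GB of heap across the job, which is why a Set-based approach is best suited to modest user volumes.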
From: https://www.cnblogs.com/xl-xueling/p/17637311.html