实时项目笔记处理
一、行为日志数据采集
1、ngx_kafka_module安装
提前安装好nginx和kafka组件,目的是配置nginx:由nginx接收分布式系统上报的消息,并轮询分发到kafka中,供下游消费。
- 安装并编译kafka的C客户端librdkafka
#git 拉取librdkafka
git clone https://github.com/edenhill/librdkafka
#进入librdkafka环境目录
cd librdkafka
# 安装c++环境
yum install -y gcc gcc-c++ pcre-devel zlib-devel
# 预编译librdkafka
./configure
# 编译并安装librdkafka
make && make install
- 安装nginx整合kafka的ngx_kafka_module插件
# 下载nginx插件源码包
git clone https://github.com/brg-liuwei/ngx_kafka_module
#进入nginx源码包根目录下
cd /usr/local/nginx/nginx-1.20.2
# 给nginx源码包添加插件
./configure --add-module=/root/ngx_kafka_module
# 安装nginx环境
make && make install
# 测试nginx环境是否安装成功
/usr/local/nginx/sbin/nginx -V
- 配置nginx.conf文件
#user nobody;
worker_processes 1;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
#log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"';
#access_log logs/access.log main;
sendfile on;
#tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
# 配置kafka
kafka;
kafka_broker_list node1:9092 master:9092 node2:9092;
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
#设置kafka请求地址
location /logs/access/topic {
#kafka分区轮询分发
kafka_partition auto;
# 设置的主题是action_log
kafka_topic action_log;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
- 测试从nginx获取的数据传输进kafka
# 发送消息进kafka中进行消费
curl localhost/logs/access/topic -d "message send to kafka topic"
# 显示发送的详细信息
curl localhost/logs/access/topic -d "message send to kafka topic" -v
- 启动kafka消费者
kafka-console-consumer.sh --bootstrap-server node1:9092,master:9092,node2:9092 --topic action_log --from-beginning
注意:如果启动时出现找不到librdkafka动态库的情况,就把库路径加入ld.so.conf并重新加载so库。
#lib
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig
2、OpenResty(推荐使用)
通过lua脚本处理http请求,由nginx按时间滚动生成行为日志文件,再使用flume把日志文件拉取到kafka中供消费。
- 安装OpenResty依赖环境。
yum install -y readline-devel pcre-devel openssl-devel gcc perl wget
- 使用wget下载OpenResty安装包
wget https://openresty.org/download/openresty-1.13.6.2.tar.gz
- 解压安装包
tar -zxvf openresty-1.13.6.2.tar.gz -C /usr/local/openresty/
- 编译OpenResty
进入到openresty自带的和LuaJIT整合的目录,然后编译安装.
cd /usr/local/openresty/openresty-1.13.6.2/bundle/LuaJIT-2.1-20180420
make && make install
- 切换到openresty的源码目录,预编译openresty
cd /usr/local/openresty/openresty-1.13.6.2
./configure
- 编译安装
gmake && gmake install
- 启动openresty配套的nginx
cd /usr/local/openresty/nginx/sbin/
# 启动OpenResty中nginx
./nginx
- 查看nginx的详细信息
/usr/local/openresty/nginx/sbin/nginx -V
- 使用浏览器访问nginx
- nginx结合lua脚本语言(案例测试)
修改openresty下nginx的配置文件
vim /usr/local/openresty/nginx/conf/nginx.conf
在server配置块中添加如下location配置(测试页面输出字符)
location /lua {
default_type 'text/html';
content_by_lua 'ngx.say("hello yjx")';
}
测试http请求访问,例如:curl http://localhost/lua,页面应输出 hello yjx
- 编写lua脚本语言,伪装成空图片,写入到特定指定的日志文件中
location /log.gif {
#伪装成gif文件
default_type image/gif;
#本身关闭access_log
access_log off;
#使用lua将nginx的接收的参数写入到日志文件中
log_by_lua_file 'conf/log.lua';
#返回空图片
empty_gif;
}
- 在nginx的conf目录下创建一个log.lua文件
vim /usr/local/openresty/nginx/conf/log.lua
但是以上方法容易导致行为数据文件过大,造成读写效率变低。
- 解决方案:
编写lua脚本语言,让行为数据生成一个个小文件数据
-- 引入lua用来解析json的库
local cjson = require "cjson"
-- 获取请求参数列表
local request_args_tab = ngx.req.get_uri_args()
-- 获取当前系统时间,按小时生成日志文件名
local unixtime = ngx.time()
local time = os.date("%Y%m%d%H", unixtime)
-- 使用lua的io打开一个文件,如果文件不存在,就创建,a为append模式
local path = "/logs/access-" .. time .. ".log"
local file = io.open(path, "a")
-- 定义一个json对象
local log_json = {}
-- 将参数的K和V迭代出来,添加到json对象中
for k, v in pairs(request_args_tab) do
log_json[k] = v
end
-- 将json写入到指定的log文件,末尾追加换行
file:write(cjson.encode(log_json), "\n")
-- 将数据写入
file:flush()
创建/logs目录并授予写权限,让nginx的worker进程有权限写入日志文件。
mkdir /logs
chmod o+w /logs
#或者将/logs目录的所属用户改成nobody
chown -R nobody:nobody /logs
在nginx所在的机器上安装Flume,使用TailDirSource和KafkaChannel将数据采集到Kafka中,也就是使用flume拉取nginx上的行为日志数据。
- flume配置文件信息(使用kafka channel模式把日志文件数据拉取到kafka中进行消费)
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/taildir_position.json
a1.sources.r1.filegroups = f1
#使用正则表达式
a1.sources.r1.filegroups.f1 = /logs/access-.*\.log
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node1:9092,master:9092,node2:9092
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c1.kafka.topic = action_log
a1.channels.c1.parseAsFlumeEvent= false
a1.sources.r1.channels = c1
- OpenResty和kafka整合插件
下载插件源码
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
解压lua-resty-kafka的源码
#首先安装unzip解压工具,如果没有安装unzip会报错
yum install -y unzip
#解压lua-resty-kafka的源码
unzip master.zip -d /usr/local/src/
#在/usr/local/openresty/lualib下创建一个kafka目录,用于保存lua-resty-kafka整合的模块代码
mkdir /usr/local/openresty/lualib/kafka
#将lua-resty-kafka整合的模块代码拷贝到/usr/local/openresty/lualib/kafka目录
cp -r /usr/local/src/lua-resty-kafka-master/lib/resty/ /usr/local/openresty/lualib/kafka/
修改nginx的配置文件
vim /usr/local/openresty/nginx/conf/nginx.conf
在http配置的下面、server配置同级的部分添加如下配置
lua_package_path "/usr/local/openresty/lualib/kafka/?.lua;;";
- 在server配置块内添加如下配置
location /log {
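# 下面是基于 lua-resty-kafka 向 kafka 发送数据的一个示意写法(仅供参考:broker 地址与主题 action_log 均为假设值,请按实际环境调整)
default_type application/json;
content_by_lua_block {
    -- 引入 lua-resty-kafka 的 producer 模块(lua_package_path 已指向 lualib/kafka)
    local producer = require("resty.kafka.producer")
    local cjson = require("cjson")
    -- kafka broker 列表(假设值)
    local broker_list = {
        { host = "node1", port = 9092 },
        { host = "master", port = 9092 },
        { host = "node2", port = 9092 }
    }
    -- 把请求参数编码成 json 作为消息体
    local message = cjson.encode(ngx.req.get_uri_args())
    -- 创建异步 producer 并发送到 action_log 主题
    local bp = producer:new(broker_list, { producer_type = "async" })
    local ok, err = bp:send("action_log", nil, message)
    if not ok then
        ngx.log(ngx.ERR, "kafka send error: ", err)
    end
    ngx.say('{"status":"ok"}')
}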
}
- 启动Zookeeper和kafka
zookeeper-3.4.10/bin/zkServer.sh start
kafka_2.11-0.10.2.1/bin/kafka-server-start.sh -daemon /bigdata/kafka_2.11-0.10.2.1/config/server.properties
- 创建主题
kafka_2.11-0.10.2.1/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3
3、FileBeat采集行为数据
Filebeat 是一个用于转发和集中日志数据的轻量级传送器。作为代理安装在您的服务器上,Filebeat 监控您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或 Logstash以进行索引。
当您启动 Filebeat 时,它会启动一个或多个输入,这些输入会在您为日志数据指定的位置中查找。对于 Filebeat 定位的每个日志,Filebeat 都会启动一个收割机。每个harvester 读取单个日志以获取新内容并将新日志数据发送到libbeat,libbeat 聚合事件并将聚合数据发送到您为Filebeat 配置的输出。
- FileBeat和Flume比较
指标 | Flume | FileBeat |
---|---|---|
内存 | 大 | 小 |
CPU | 大 | 小 |
背压敏感协议 | 否 | 是 |
插件 | 需要写API | 多 |
功能 | 从多种输入端采集到多种输出端 | 传输 |
轻重 | 相对较重 | 轻量级二进制文件 |
过滤能力 | 自带了分区和拦截器的功能 | 有过滤能力但是较弱 |
进程 | 一台服务有多个进程,挂掉之后需要手动拉起 | 十分稳定 |
编写语言 | Java | Go |
集群 | 分布式 | 单节点 |
二次开发或扩展开发 | 一般 | 易 |
- 安装
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.1.3-x86_64.rpm
##默认安装路径在 /usr/share/filebeat/
sudo rpm -vi filebeat-8.1.3-x86_64.rpm
- 开启kafka模块
filebeat modules list #查看需要启动的模块
filebeat modules enable kafka #在安装目录下,启用一个或多个模块
- 创建采集配置文件(上传filebeat2kafka.yml文件到/etc/filebeat/文件目录下)
filebeat.inputs:
- type: log
  paths:
    - /opt/data/data.json
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["node01:9092", "node02:9092", "node03:9092"]
  # message topic selection + partitioning
  topic: filebeat
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  max_message_bytes: 1000000
- kafka0.11版本以下使用以下配置
filebeat.inputs:
- type: log
  paths:
    - /home/data/data.json
output.kafka:
  version: 0.11.0.2
  enabled: true
  # initial brokers for reading cluster metadata
  hosts: ["node1:9092", "master:9092", "node2:9092"]
  # message topic selection + partitioning
  topic: filebeat
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  max_message_bytes: 1000000
- 开启FileBeat工具采集日志行为数据
filebeat -e -c filebeat2kafka.yml
二、乐字节直播项目搭建
1、导包(普通Maven工程)
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<flink.version>1.15.2</flink.version>
<scala.version>2.12.2</scala.version>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink客户端-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!--本地运行的webUI-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink与kafka整合-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<!--状态后端-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
<!--日志系统-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.21</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
<!--json格式依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!--csv格式依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CDC 的依赖 -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- flink与File整合的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<!--fastjson工具类-->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.41</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
<!--Alibaba Druid数据库连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.15</version>
</dependency>
<!--发送异步HTTP请求的Java工具包 -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
</dependency>
<!--连接clickhouse 驱动包-->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.4.6</version>
</dependency>
<!--redis连接包-->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.12</artifactId>
<version>1.1.0</version>
</dependency>
<!--drools规则-->
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-compiler</artifactId>
<version>7.23.0.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
</dependency>
</dependencies>
2、编写Flink工具类
package com.zwf.utils;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-15 17:04
*/
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
* Flink工具类
*/
public class FlinkUtils {
/**
* @param configPath 外部配置文件路径
* @param deserialization 反序列化器类
* @param <T> 返回类型
* @return
*/
//创建flink环境
public static final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
//获取外部文件参数对象
public static ParameterTool param=null;
public static <T> DataStream<T> createKafka(String configPath, Class<? extends DeserializationSchema<T>> deserialization) throws Exception{
//加载外部文件配置
param=ParameterTool.fromPropertiesFile(configPath);
//获取checkpoint间隔时间
long chk = param.getLong("checkpoint.interval",3000L);
//获取checkpoint路径
String ckPath = param.get("checkpoint.path");
//获取kafka topic
String topics = param.get("kafka.topics");
env.enableCheckpointing(chk, CheckpointingMode.EXACTLY_ONCE);
//设置chk文件系统路径
env.setStateBackend(new FsStateBackend(ckPath));
//设置外部存储策略
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//获取topic数组集
String[] split = topics.split(",");
List<String> topicList = Arrays.asList(split);
Properties properties = param.getProperties();
FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topicList, deserialization.newInstance(), properties);
//不提交偏移量进入chk
kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
return env.addSource(kafkaConsumer);
}
/**
*
* @param args 参数
* @param kafkaMeta kafka自定义反序列化器获取kafka元数据
* @return
* @param <T>
* @throws Exception
*/
public static <T> DataStream<T> createKafkaV2(String args,Class<? extends KafkaDeserializationSchema<T>> kafkaMeta) throws Exception {
param=ParameterTool.fromPropertiesFile(args);
Long interval = param.getLong("checkpoint.interval", 3000L);
String chkPath = param.get("checkpoint.path");
String topics = param.getRequired("kafka.topics");
List<String> topicList = Arrays.asList(topics.split(","));
//开启状态保存
env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE);
//设置RockDB状态后端
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage(chkPath);
//设置外部存储清除策略
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Properties properties = param.getProperties();
//设置kafka source
FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer(topicList, kafkaMeta.newInstance(), properties);
//是否应在检查点上将偏移量提交回Kafka
consumer.setCommitOffsetsOnCheckpoints(false);
return env.addSource(consumer);
}
//根据主题获取数据流
public static <T> DataStream<T> createKafkaV2(ParameterTool tools,String topic,Class<? extends DeserializationSchema<T>> deserialization) throws Exception {
//检查点生成间隔时间
Long interval = tools.getLong("checkpoint.interval", 3000L);
String chkPath = tools.get("checkpoint.path");
//开启状态保存
env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE);
//设置RockDB状态后端
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage(chkPath);
//设置外部存储清除策略
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Properties properties = tools.getProperties();
//设置kafka source
FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer(topic,deserialization.newInstance(), properties);
//是否应在检查点上将偏移量提交回Kafka
consumer.setCommitOffsetsOnCheckpoints(false);
return env.addSource(consumer);
}
}
3、flink&kafka配置信息
运行时的配置参数定义
checkpoint.interval=5000
checkpoint.path=hdfs://master:8020/chk/statebackend
kafka.topics=order-main
bootstrap.servers=node1:9092,master:9092,node2:9092
auto.offset.reset=earliest
group.id=zwf_9870
isolation.level=read_committed
amap.http.key=26ccd8417f5df7255027dbe118d5258d
amap.http.url=https://restapi.amap.com/v3/geocode/regeo
clickhouse.batch.size=3
clickhouse.batch.interval=5000
kafka.input.main=ordermain
kafka.input.detail=orderdetail
4、log4j日志文件(log4j2.properties)
rootLogger.level =Info
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
三、实时指标计算
1、新老用户统计
- 导入数据到kafka中
cat data.json | kafka-console-producer.sh --broker-list node1:9092,master:9092,node2:9092 --topic weblogs
kafka-console-consumer.sh --bootstrap-server node1:9092,master:9092,node2:9092 --topic weblogs --from-beginning
- 数据格式
{
"carrier":"中国电信",
"deviceId":"aff486e2-1942-45ae-8623-9d6ef1f29b2f",
"deviceType":"IPHONE7PLUS",
"eventId":"appLaunch",
"id":79,
"isNew":1,
"latitude":37.942441944941706,
"longitude":115.21183014458548,
"netType":"WIFI",
"osName":"ios",
"osVersion":"8.2",
"releaseChannel":"AppStore",
"resolution":"2048*1024",
"sessionId":"fqKovEGyuNM0",
"timestamp":1615814321410
}
- 字段格式说明
carrier: 运营商
deviceId: 设备ID
eventId:事件类型ID(appLaunch即App启动)
isNew:1为新用户,0为老用户
latitude:纬度
longitude:经度
netType:网络类型
osName:操作系统
osVersion:操作系统版本
releaseChannel:下载渠道
resolution:屏幕分辨率
sessionId:会话ID
timestamp:时间戳
- 编码
//pojo实体类
package com.zwf.pojo;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-16 11:01
*/
import java.util.HashMap;
public class DataBean {
//唯一标识码
private String id;
//Unique Device Identifier,唯一设备标识码
//private String udid;
//设备ID
private String deviceId;
//全局用户ID
private String guid;
//账户ID
private String account;
//应用ID
private String appId;
//应用版本
private String appVersion;
//运营商
private String carrier;
//设备类型
private String deviceType;
//事件ID
private String eventId;
//IP地址
private String ip;
//纬度
private Double latitude;
//经度
private Double longitude;
//网络类型
private String netType;
//操作系统名字
private String osName;
//操作系统版本
private String osVersion;
//下载渠道
private String releaseChannel;
//屏幕分辨率
private String resolution;
//会话ID
private String sessionId;
//事件时间戳
private Long timestamp;
//切割的会话ID 30分钟切割一次
private String newSessionId;
//国家
private String country;
//省
private String province;
//市
private String city;
//县
private String region;
//日期
private String date;
//小时
private String hour;
//设置
private HashMap<String, Object> properties;
//最后更新时间戳
private Long lastUpdate;
private int isNew; //数据自带的是否新用户标记(有些数据中不存在该字段)
//是不是新用户,如果为1为新用户,如果为0为老用户
private int isN;
public DataBean(){}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getGuid() {
return guid;
}
public void setGuid(String guid) {
this.guid = guid;
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public String getAppVersion() {
return appVersion;
}
public void setAppVersion(String appVersion) {
this.appVersion = appVersion;
}
public String getCarrier() {
return carrier;
}
public void setCarrier(String carrier) {
this.carrier = carrier;
}
public String getDeviceType() {
return deviceType;
}
public void setDeviceType(String deviceType) {
this.deviceType = deviceType;
}
public String getEventId() {
return eventId;
}
public void setEventId(String eventId) {
this.eventId = eventId;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Double getLatitude() {
return latitude;
}
public void setLatitude(Double latitude) {
this.latitude = latitude;
}
public Double getLongitude() {
return longitude;
}
public void setLongitude(Double longitude) {
this.longitude = longitude;
}
public String getNetType() {
return netType;
}
public void setNetType(String netType) {
this.netType = netType;
}
public String getOsName() {
return osName;
}
public void setOsName(String osName) {
this.osName = osName;
}
public String getOsVersion() {
return osVersion;
}
public void setOsVersion(String osVersion) {
this.osVersion = osVersion;
}
public String getReleaseChannel() {
return releaseChannel;
}
public void setReleaseChannel(String releaseChannel) {
this.releaseChannel = releaseChannel;
}
public String getResolution() {
return resolution;
}
public void setResolution(String resolution) {
this.resolution = resolution;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public String getNewSessionId() {
return newSessionId;
}
public void setNewSessionId(String newSessionId) {
this.newSessionId = newSessionId;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getRegion() {
return region;
}
public void setRegion(String region) {
this.region = region;
}
public HashMap<String, Object> getProperties() {
return properties;
}
public void setProperties(HashMap<String, Object> properties) {
this.properties = properties;
}
public Long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(Long lastUpdate) {
this.lastUpdate = lastUpdate;
}
public int getIsNew() {
return isNew;
}
public void setIsNew(int isNew) {
this.isNew = isNew;
}
public int getIsN() {
return isN;
}
public void setIsN(int isN) {
this.isN = isN;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getHour() {
return hour;
}
public void setHour(String hour) {
this.hour = hour;
}
@Override
public String toString() {
return "DataBean{" +
"id=" + id +
", deviceId='" + deviceId + '\'' +
", guid='" + guid + '\'' +
", account='" + account + '\'' +
", appId='" + appId + '\'' +
", appVersion='" + appVersion + '\'' +
", carrier='" + carrier + '\'' +
", deviceType='" + deviceType + '\'' +
", eventId='" + eventId + '\'' +
", ip='" + ip + '\'' +
", latitude=" + latitude +
", longitude=" + longitude +
", netType='" + netType + '\'' +
", osName='" + osName + '\'' +
", osVersion='" + osVersion + '\'' +
", releaseChannel='" + releaseChannel + '\'' +
", resolution='" + resolution + '\'' +
", sessionId='" + sessionId + '\'' +
", timestamp=" + timestamp +
", newSessionId='" + newSessionId + '\'' +
", country='" + country + '\'' +
", province='" + province + '\'' +
", city='" + city + '\'' +
", region='" + region + '\'' +
", properties=" + properties +
", lastUpdate=" + lastUpdate +
", isNew=" + isN +
'}';
}
}
- job工作类
package com.zwf.jobs;
import com.zwf.constant.EventID;
import com.zwf.pojo.DataBean;
import com.zwf.udf.JsonToDataBean;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-16 9:28
*/
public class UserCount {
public static void main(String[] args) throws Exception {
DataStream<String> stream = FlinkUtils.createKafka(args[0], SimpleStringSchema.class);
//把json字符串转为javabean
//过滤出applaunch(手机app启动)事件中 每个设备 新老用户 标记:1 (0表示老用户、1表示新用户)
SingleOutputStreamOperator<DataBean> dataSource = stream.process(new JsonToDataBean());
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> sumRes = dataSource.filter(x -> EventID.APP_LAUNCH.equals(x.getEventId())).map(x -> Tuple3.of(x.getDeviceType(), x.getIsNew(), 1), Types.TUPLE(Types.STRING, Types.INT, Types.INT))
.keyBy(x -> Tuple2.of(x.f0, x.f1), Types.TUPLE(Types.STRING, Types.INT)).sum(2);
// sumRes.print();
//过滤eventID=applaunch出新老用户数量 计算新老用户总人数
sumRes.map(x->Tuple2.of(x.f1,x.f2),Types.TUPLE(Types.INT, Types.INT)).keyBy(x->x.f0).sum(1).print();
FlinkUtils.env.execute();
}
}
- 编写自定义函数,把json字符串转换为Java Bean对象
在Flink中做这类统计时,往往要按照很多维度进行keyBy,然后sum,最后sink。
package com.zwf.udf;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.DataBean;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-16 11:01
*/
public class JsonToDataBean extends ProcessFunction<String, DataBean> {
@Override
public void processElement(String value, ProcessFunction<String, DataBean>.Context ctx, Collector<DataBean> out) throws Exception {
try {
DataBean dataBean = JSONObject.parseObject(value, DataBean.class);
out.collect(dataBean);
}catch (Exception e){
//JSON解析失败的脏数据直接丢弃
}
}
}
- 异步IO获取地理位置(经纬度转省市区)
本项目使用了高德地图API,对数据的经纬度进行转换,把数据中的经纬度转换为省市区!
采用httpclient工具,对高德地图API进行异步请求操作,在查询的时候把经纬度转换为省市区。
- udf(使用富异步函数RichAsyncFunction对高德API发起http请求获取数据)
package com.zwf.udf;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.DataBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-16 14:53
*/
public class LocationFunction extends RichAsyncFunction<DataBean,DataBean> {
//创建异步请求对象
CloseableHttpAsyncClient httpclient=null;
//创建url
private String url=null;
//创建key
private String key=null;
//创建最大连接数
private int maxConnectNum;
public LocationFunction(String url, String key, int maxConnectNum) {
this.url = url;
this.key = key;
this.maxConnectNum = maxConnectNum;
}
@Override
public void open(Configuration parameters) throws Exception {
//构建配置信息
RequestConfig config=RequestConfig.custom().build();
//构建异步连接池
httpclient= HttpAsyncClients.custom()
.setDefaultRequestConfig(config)
.setMaxConnTotal(maxConnectNum).build();
//开启异步请求
httpclient.start();
}
@Override
public void asyncInvoke(DataBean bean, ResultFuture<DataBean> resultFuture) {
//获取经纬度
Double longitude = bean.getLongitude();
Double latitude = bean.getLatitude();
//构建http Get请求
HttpGet get=new HttpGet(url+"?location="+longitude+","+latitude+"&key="+key);
//发送get请求
Future<HttpResponse> execute = httpclient.execute(get, null);
//获取响应结果
CompletableFuture.supplyAsync(new Supplier<DataBean>() {
@Override
public DataBean get() {
//获取http响应
HttpResponse response=null;
try {
//省
String province = null;
//市
String city = null;
//地区
String region=null;
//国家
String country=null;
//获取响应信息
response = execute.get();
if(response.getStatusLine().getStatusCode()==200){
//解析返回的字段串
String resInfo = EntityUtils.toString(response.getEntity());
JSONObject jsonObject = JSONObject.parseObject(resInfo);
JSONObject regeocode = jsonObject.getJSONObject("regeocode");
//获取的对象数据存在并且不为null.
if(regeocode!=null&&!regeocode.isEmpty()){
JSONObject addressComponent = regeocode.getJSONObject("addressComponent");
province = addressComponent.getString("province");
country= addressComponent.getString("country");
city = addressComponent.getString("city");
region = addressComponent.getString("district");
}
}
bean.setProvince(province);
bean.setCity(city);
bean.setRegion(region);
bean.setCountry(country);
return bean;
}catch (Exception e){
return null;
}
}
//返回一个DataBean对象
}).thenAccept((DataBean resp)->{
resultFuture.complete(Collections.singleton(resp));
});
}
@Override
public void close() throws Exception {
httpclient.close();
}
}
- 创建job对象类
package com.zwf.jobs;
import com.zwf.constant.EventID;
import com.zwf.pojo.DataBean;
import com.zwf.udf.JsonToDataBean;
import com.zwf.udf.LocationFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.util.concurrent.TimeUnit;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-16 9:28
*/
public class UserCount2 {
public static void main(String[] args) throws Exception {
DataStream<String> stream = FlinkUtils.createKafka(args[0], SimpleStringSchema.class);
//把json字符串转为javabean
//过滤出applaunch事件中 每个设备新老用户数量
SingleOutputStreamOperator<DataBean> dataSource = stream.process(new JsonToDataBean());
//过滤出app客户端运行产生的数据
SingleOutputStreamOperator<DataBean> filter = dataSource.filter(x -> EventID.APP_LAUNCH.equals(x.getEventId()));
//转换经纬度
String key = FlinkUtils.param.getRequired("amap.http.key");
String url = FlinkUtils.param.getRequired("amap.http.url");
SingleOutputStreamOperator<DataBean> source = AsyncDataStream.unorderedWait(filter, new LocationFunction(url, key, 50), 5, TimeUnit.MINUTES);
//计算每个省的新老用户人数
source.map(line->Tuple3.of(line.getProvince(),line.getIsNew(),1),Types.TUPLE(Types.STRING, Types.INT,Types.INT))
.keyBy(v->Tuple2.of(v.f0,v.f1),Types.TUPLE(Types.STRING,Types.INT)).sum(2).print();
FlinkUtils.env.execute();
}
}
2、客户访问数及用户数(去重)
同一个用户重复访问同一个活动,或者多个用户重复访问同一个活动时,需要对数据去重,才能得到准确的用户数和活动访问数。
缺点:使用HashSet保存所有用户ID进行去重,会造成内存资源的大量消耗!
- 案例(使用状态对象去重)
package com.zwf.Demo;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashSet;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-16 20:48
*/
/**
* u1,A1,view
* u1,A1,view
* u1,A1,view
* u1,A1,join
* u1,A1,join
* u2,A1,view
* u2,A1,join
* u1,A2,view
* u1,A2,view
* u1,A2,join
*/
public class ActiveCount {
public static void main(String[] args) throws Exception {
//统计用户 activeId eventId
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.socketTextStream("192.168.147.110", 8888);
SingleOutputStreamOperator<Tuple3<String, String, String>> map = source.map(lines -> {
String[] split = lines.split(",");
return Tuple3.of(split[0], split[1], split[2]);
}, Types.TUPLE(Types.STRING, Types.STRING, Types.STRING));
//对用户的活动ID和事件ID进行分组
KeyedStream<Tuple3<String, String, String>, Tuple2<String, String>> keyedStream = map.keyBy(tup -> Tuple2.of(tup.f1, tup.f2),Types.TUPLE(Types.STRING,Types.STRING));
//求出每个活动ID和事件ID的用户数 和 浏览记录数
keyedStream.process(new KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String,String,Integer,Integer>>() {
//定义两个valueState 一个记录uid去重 一个记录浏览数
private transient ValueState<HashSet<String>> vUidCount=null;
private transient ValueState<Integer> pageViewCount=null;
@Override
public void processElement(Tuple3<String, String, String> value, KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String, String, Integer, Integer>>.Context ctx, Collector<Tuple4<String, String, Integer, Integer>> out) throws Exception {
String uid = value.f0;
HashSet<String> suid = vUidCount.value();
if(suid==null){
suid=new HashSet<>();
}
suid.add(uid);
vUidCount.update(suid);
//页面访问数
Integer uCount = pageViewCount.value();
if(uCount==null){
uCount=0;
}
uCount++;
pageViewCount.update(uCount);
//活动id 事件id 用户数 页面浏览数
out.collect(Tuple4.of(value.f1, value.f2,suid.size(),uCount));
}
@Override
public void open(Configuration parameters) throws Exception {
//用户数统计
ValueStateDescriptor<HashSet<String>> uidState = new ValueStateDescriptor<>("uidCount", TypeInformation.of(new TypeHint<HashSet<String>>() {}));
vUidCount=getRuntimeContext().getState(uidState);
//页面访问数
ValueStateDescriptor<Integer> uidCount = new ValueStateDescriptor<>("pageViewCount", Types.INT);
pageViewCount=getRuntimeContext().getState(uidCount);
}
}).print();
environment.execute();
}
}
3、使用布隆过滤器去重
优点:轻量级、耗费资源少;布隆过滤器可以判断一个元素“一定不存在”,据此对数据去重,可用来统计用户数和页面浏览数(存在极小的误判概率,统计结果可能略微偏少)。
- 案例
package com.zwf.Demo;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-16 20:48
*/
/**
* u1,A1,view
* u1,A1,view
* u1,A1,view
* u1,A1,join
* u1,A1,join
* u2,A1,view
* u2,A1,join
* u1,A2,view
* u1,A2,view
* u1,A2,join
*/
public class ActiveCount1 {
public static void main(String[] args) throws Exception {
//统计用户 activeId eventId
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.socketTextStream("192.168.147.110", 8888);
SingleOutputStreamOperator<Tuple3<String, String, String>> map = source.map(lines -> {
String[] split = lines.split(",");
return Tuple3.of(split[0], split[1], split[2]);
}, Types.TUPLE(Types.STRING, Types.STRING, Types.STRING));
//对用户的活动ID和事件ID进行分组
KeyedStream<Tuple3<String, String, String>, Tuple2<String, String>> keyedStream = map.keyBy(tup -> Tuple2.of(tup.f1, tup.f2),Types.TUPLE(Types.STRING,Types.STRING));
//求出每个活动ID和事件ID的用户数 和 浏览记录数
keyedStream.process(new KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String,String,Integer,Integer>>() {
//定义两个valueState 一个记录uid去重 一个记录浏览数
private transient ValueState<Integer> countUid=null;
private transient ValueState<Integer> vPageCount=null;
private transient ValueState<BloomFilter<String>> bloomFilter;
@Override
public void processElement(Tuple3<String, String, String> value, KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String, String, Integer, Integer>>.Context ctx, Collector<Tuple4<String, String, Integer, Integer>> out) throws Exception {
String uid = value.f0;
BloomFilter<String> filter = bloomFilter.value();
Integer uidCount = countUid.value();
//filter对象为null
if(filter==null){
//初始化布隆过滤器
filter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000);
uidCount=0;
}
//如果布隆过滤器中不存在用户id 就计数一次
if(!filter.mightContain(uid)){
filter.put(uid);
uidCount++;
}
//更新布隆对象
bloomFilter.update(filter);
//更新用户数
countUid.update(uidCount);
Integer uCount = vPageCount.value();
if(uCount==null){
uCount=0;
}
uCount++;
vPageCount.update(uCount);
//活动id 事件id 用户数 页面浏览数
out.collect(Tuple4.of(value.f1, value.f2,uidCount,uCount));
}
@Override
public void open(Configuration parameters) throws Exception {
//布隆过滤器
ValueStateDescriptor<BloomFilter<String>> bloom = new ValueStateDescriptor<>("bloomfilter", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
bloomFilter=getRuntimeContext().getState(bloom);
ValueStateDescriptor<Integer> uidState = new ValueStateDescriptor<>("uidState", TypeInformation.of(new TypeHint<Integer>() {}));
countUid=getRuntimeContext().getState(uidState);
ValueStateDescriptor<Integer> uidCount = new ValueStateDescriptor<>("vUidCount", Types.INT);
vPageCount=getRuntimeContext().getState(uidCount);
}
}).print();
environment.execute();
}
}
- 项目实战
使用布隆过滤器判断设备ID是否已经出现过,没有出现过就标记为新设备(新用户)。
实现方案一
按设备类型(huawei-p40)进行keyBy,然后使用BloomFilter进行判断是否为新用户。
存在问题:如果某个设备类型的用户比较多可能会出现数据倾斜,而且一个分区中会有多个组,一个组对应一个BloomFilter,那么一个分区就会有多个BloomFilter,也会占用更多资源。
实现方案二
按照设备ID进行keyBy,然后定义一个OperatorState保存BloomFilter进行判断是否为新用户,一个分区对应一个BloomFilter,更加节省资源,不会出现数据倾斜。
解决问题
如果数据中出现isNew为空或数据字段没有isNew的情况,可以利用此方法解决。
- 方案一实现代码(自定义函数)
package com.zwf.udf;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-17 9:58
*/
public class DeviceIdBindIsNewFunction extends KeyedProcessFunction<String, DataBean,DataBean> {
private transient ValueState<BloomFilter<String>> bloomFilter;
@Override
public void open(Configuration parameters) throws Exception {
//初始化布隆过滤器
ValueStateDescriptor<BloomFilter<String>> deviceIdBloom = new ValueStateDescriptor<>("device_Id_Bloom", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
bloomFilter=getRuntimeContext().getState(deviceIdBloom);
}
@Override
public void processElement(DataBean dataBean, KeyedProcessFunction<String, DataBean, DataBean>.Context ctx, Collector<DataBean> out) throws Exception {
String deviceId = dataBean.getDeviceId();
BloomFilter<String> filter = bloomFilter.value();
if(filter==null){
filter=BloomFilter.create(Funnels.unencodedCharsFunnel(),10000);
}
//如果布隆过滤器中不存在的值
if(!filter.mightContain(deviceId)){
filter.put(deviceId);
//新用户
dataBean.setIsN(1);
bloomFilter.update(filter);
}
out.collect(dataBean);
}
}
- job工作类
package com.zwf.jobs;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.constant.EventID;
import com.zwf.pojo.DataBean;
import com.zwf.udf.DeviceIdBindIsNewFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-17 9:52
*/
public class UserCount3 {
public static void main(String[] args) throws Exception {
//bloomFilter中不存在设备ID的用户是新用户
DataStream<String> kafkaStream = FlinkUtils.createKafka(args[0], SimpleStringSchema.class);
//将JSON数据转换为dataBean对象
SingleOutputStreamOperator<DataBean> dataBeanSource = kafkaStream.map(lines -> JSONObject.parseObject(lines, DataBean.class));
//过滤出app客户端运行产生的数据
SingleOutputStreamOperator<DataBean> filter = dataBeanSource.filter(x -> EventID.APP_LAUNCH.equals(x.getEventId()));
//根据设备类型分区 根据设备ID判断是否新用户!
filter.keyBy(databean->databean.getDeviceType()).process(new DeviceIdBindIsNewFunction())
.print();
//执行指令
FlinkUtils.env.execute();
}
}
- 方案二实现代码
解决了方案一中为每个key各创建一个布隆过滤器、资源损耗大的问题:通过算子状态(OperatorState)在算子层控制布隆过滤器的个数,一个并行子任务只维护一个布隆过滤器。
package com.zwf.udf;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import java.util.Collections;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-17 9:58
*/
/**
* 在算子层控制布隆过滤器
*/
public class DeviceIdBindIsNewFunctionV2 extends RichMapFunction<DataBean,DataBean> implements CheckpointedFunction {
//初始化布隆过滤器状态对象
private transient ListState<BloomFilter<String>> bloomFilterListState;
//布隆过滤器
private transient BloomFilter<String> filter;
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//更新布隆过滤器的状态值
bloomFilterListState.update(Collections.singletonList(filter));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
//创建状态描述
ListStateDescriptor<BloomFilter<String>> descriptor = new ListStateDescriptor<>("deviceId-uid-isNew", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
//创建算子控制层
bloomFilterListState = context.getOperatorStateStore().getListState(descriptor);
//如果已经保存
if(context.isRestored()){
for (BloomFilter<String> bloom:bloomFilterListState.get()){
//把每个分区的布隆过滤器中的对象遍历出来
this.filter=bloom;
}
}
}
@Override
public DataBean map(DataBean value) throws Exception {
String deviceId = value.getDeviceId();
//初始化布隆过滤器 判断列表状态对象中的布隆过滤器是否存在
if(filter==null){
filter=BloomFilter.create(Funnels.unencodedCharsFunnel(),10000);
}
if(!filter.mightContain(deviceId)){
filter.put(deviceId);
value.setIsN(1);
}
return value;
}
}
job工作类创建
package com.zwf.jobs;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.constant.EventID;
import com.zwf.pojo.DataBean;
import com.zwf.udf.DeviceIdBindIsNewFunctionV2;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-17 9:52
*/
public class UserCount4 {
public static void main(String[] args) throws Exception {
//bloomFilter中不存在设备ID的用户是新用户
DataStream<String> kafkaStream = FlinkUtils.createKafka(args[0], SimpleStringSchema.class);
SingleOutputStreamOperator<DataBean> dataBeanSource = kafkaStream.map(lines -> JSONObject.parseObject(lines, DataBean.class));
//过滤出app客户端运行产生的数据
SingleOutputStreamOperator<DataBean> filterData = dataBeanSource.filter(x -> EventID.APP_LAUNCH.equals(x.getEventId()));
//根据设备Id进行分区 然后在算子层控制布隆过滤器 可以形成在一个分区内共用一个布隆过滤器
filterData.keyBy(lines->lines.getDeviceId()).map(new DeviceIdBindIsNewFunctionV2()).print();
FlinkUtils.env.execute();
}
}
4、使用状态后端
- 状态后端依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
对于状态数据量小的场景,使用内存存储状态,计算效率更高;但在状态数据量非常大的情况下,考虑到数据安全性和容量,应该把状态存储到RocksDB上!
- Flink工具类进行状态后端改进
/**
*
* @param args 参数
* @param kafkaMeta kafka自定义反序列化器获取kafka元数据
* @return
* @param <T>
* @throws Exception
*/
public static <T> DataStream<T> createKafkaV2(String args,Class<? extends KafkaDeserializationSchema<T>> kafkaMeta) throws Exception {
param=ParameterTool.fromPropertiesFile(args);
Long interval = param.getLong("checkpoint.interval", 3000L);
String chkPath = param.get("checkpoint.path");
String topics = param.getRequired("kafka.topics");
List<String> topicList = Arrays.asList(topics.split(","));
//开启状态保存
env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE);
//设置RockDB状态后端
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage(chkPath);
//设置外部存储清除策略
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Properties properties = param.getProperties();
//设置kafka source
FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer(topicList, kafkaMeta.newInstance(), properties);
//是否应在检查点上将偏移量提交回Kafka
consumer.setCommitOffsetsOnCheckpoints(false);
return env.addSource(consumer);
}
5、使用ClickHouse进行多维复杂统计
使用clickhouse建表时选用ReplacingMergeTree引擎可以去重,但数据不会立即merge,需要手动optimize或等待后台自动合并。
解决方案:查询时在表名后面追加 final 关键字(例如:select * from tb_user_event final),这样只查询合并后的最新数据,但查询效率会变低。
- 设计clickhouse表
1. 可以支持维度查询(大宽表)
2. 按照时间段进行查询(将时间作为表的字段并且建分区表)
3. 可以统计出PV、UV(去重查询)
4. 支持分区(按照时间进行分区)
5. 支持覆盖(ReplacingMergeTree)(对查询结果准确性要求高的,表名后面加final)
6. 生成一个唯一的ID (在Kafka中生成唯一的ID,topic+分区+偏移量)
7. 相同的数据要进入到相同的分区(按照数据的时间即EventTime进行分区)
- sql语句
drop table tb_user_event;
CREATE TABLE tb_user_event
(
`id` String comment '数据唯一id,使用Kafka的topic+分区+偏移量',
`deviceId` String comment '设备ID',
`eventId` String comment '事件ID',
`isNew` UInt8 comment '是否是新用户1为新,0为老',
`os` String comment '系统名称',
`province` String comment '省份',
`channel` String comment '下载渠道',
`deviceType` String comment '设备类型',
`eventTime` DateTime64 comment '数据中所携带的时间',
`date` String comment 'eventTime转成YYYYMMDD格式',
`hour` String comment 'eventTime转成HH格式',
`processTime` DateTime comment '插入到数据库时的系统时间'
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;
- 编写代码使用自定义KafkaDeserializationSchema读取kafka数据中的topic、partition、offset,将他们拼接成唯一的ID。
package com.zwf.kafka;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-17 15:31
*/
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* 创建自定义反序列化器
* id=kafka主题+分区ID+offset
* value=message(UTF-8)
*/
public class kafkaToUniqueId implements KafkaDeserializationSchema<Tuple2<String,String>> {
//是否到了最后一个offset 消息发送完成
@Override
public boolean isEndOfStream(Tuple2<String, String> nextElement) {
return false;
}
//反序列化操作
@Override
public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
//获取主题 分区id offset
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String id=topic+"-"+partition+"-"+offset;
//获取message信息
String mess = new String(record.value());
return Tuple2.of(id,mess);
}
@Override
public TypeInformation<Tuple2<String, String>> getProducedType() {
return Types.TUPLE(Types.STRING,Types.STRING);
}
}
- 测试类
package com.zwf.Demo;
import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-17 15:54
*/
public class KafkaMetaTest {
public static void main(String[] args) throws Exception {
DataStream<Tuple2<String, String>> kafkaV2 = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
kafkaV2.print();
FlinkUtils.env.execute();
}
}
6、Flink将数据写入clickhouse
环境依赖
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.4.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.15.2</version>
</dependency>
先建好表,再使用jdbc把flink计算好的数据写入clickhouse数据库中,可以快速查询!
- 案例DEMO
create table t_user(
uid Int32,
uname FixedString(10),
age Int32,
processTime DateTime64
)
engine =ReplacingMergeTree(processTime)
order by uid;
- 案例代码
package com.zwf.Demo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.Timestamp;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-17 16:16
*/
public class TestClickHouseJDBCSink {
public static void main(String[] args) throws Exception {
//测试使用jdbc操作clickhouse
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.socketTextStream("node1", 8888);
SingleOutputStreamOperator<Tuple3<Integer, String, Integer>> mapSource = (SingleOutputStreamOperator<Tuple3<Integer, String, Integer>>) source.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
@Override
public Tuple3<Integer, String, Integer> map(String value) throws Exception {
String[] split = value.split(",");
return Tuple3.of(Integer.parseInt(split[0]), split[1], Integer.parseInt(split[2]));
}
});
mapSource.addSink(JdbcSink.sink("insert into t_user values(?,?,?,?)",(pst,rs)->{
pst.setInt(1,rs.f0);
pst.setString(2,rs.f1);
pst.setInt(3,rs.f2);
pst.setTimestamp(4,new Timestamp(System.currentTimeMillis()));
},JdbcExecutionOptions.builder().withBatchSize(3).withBatchIntervalMs(5000).build(),
new JdbcConnectionOptions
.JdbcConnectionOptionsBuilder()
.withDriverName("com.clickhouse.jdbc.ClickHouseDriver")
.withUsername("default")
.withPassword("123456")
.withUrl("jdbc:clickhouse://192.168.147.110:8123/lives").build()));
environment.execute();
}
}
- 建表(项目SQL)
CREATE TABLE tb_user_event
(
`id` String comment '数据唯一id,使用Kafka的topic+分区+偏移量',
`deviceId` String comment '设备ID',
`eventId` String comment '事件ID',
`isNew` UInt8 comment '是否是新用户1为新,0为老',
`os` String comment '系统名称',
`province` String comment '省份',
`channel` String comment '下载渠道',
`deviceType` String comment '设备类型',
`eventTime` DateTime64 comment '数据中所携带的时间',
`date` String comment 'eventTime转成YYYYMMDD格式',
`hour` String comment 'eventTime转成HH格式',
`processTime` DateTime comment '插入到数据库时的系统时间'
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;
- JSON转DataBean(项目代码)
package com.zwf.udf;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-17 20:32
*/
/**
* 把kafka中的json数据封装进dataBean对象中
*/
public class JsonToDataBeanV2 extends ProcessFunction<Tuple2<String,String>, DataBean> {
@Override
public void processElement(Tuple2<String, String> value, ProcessFunction<Tuple2<String, String>, DataBean>.Context ctx, Collector<DataBean> out) throws Exception {
DataBean dataBean = JSONObject.parseObject(value.f1, DataBean.class);
dataBean.setId(value.f0);
out.collect(dataBean);
}
}
- 经纬度转成省市区名称后,再按批量写入clickhouse数据库。
package com.zwf.jobs;
import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.udf.LocationFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-17 20:30
*/
public class DataToClickHouse {
public static void main(String[] args) throws Exception {
//把时间日期封装进dataBean中
DataStream<Tuple2<String, String>> kafkaV2 = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
//获取amap配置信息 以及 clickhouse信息
String amapKey = FlinkUtils.param.get("amap.http.key");
String amapUrl = FlinkUtils.param.get("amap.http.url");
int batchSize = FlinkUtils.param.getInt("clickhouse.batch.size");
long batchInterval = FlinkUtils.param.getLong("clickhouse.batch.interval");
//把时间戳转为date hour
SingleOutputStreamOperator<DataBean> mapSource = kafkaV2.process(new JsonToDataBeanV2()).map(new MapFunction<DataBean, DataBean>() {
@Override
public DataBean map(DataBean value) throws Exception {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd-HH");
Instant instant = Instant.ofEpochMilli(value.getTimestamp());
ZoneId zoneId = ZoneId.systemDefault();
String dateTime = LocalDateTime.ofInstant(instant, zoneId).format(dateTimeFormatter);
String[] split = dateTime.split("-");
value.setDate(split[0]);
value.setHour(split[1]);
return value;
}
});
AsyncDataStream.unorderedWait(mapSource,new LocationFunction(amapUrl,amapKey,50),5, TimeUnit.SECONDS)
.addSink(JdbcSink.sink("insert into tb_user_event values(?,?,?,?,?,?,?,?,?,?,?,?)",(pts,t)->{
pts.setString(1,t.getId());
pts.setString(2,t.getDeviceId());
pts.setString(3,t.getEventId());
pts.setInt(4,t.getIsNew());
pts.setString(5,t.getOsName());
pts.setString(6,t.getProvince());
pts.setString(7,t.getReleaseChannel());
pts.setString(8,t.getDeviceType());
pts.setTimestamp(9,new Timestamp(t.getTimestamp()));
pts.setString(10,t.getDate());
pts.setString(11,t.getHour());
pts.setTimestamp(12,new Timestamp(System.currentTimeMillis()));
}, JdbcExecutionOptions.builder()
.withBatchSize(batchSize)
.withBatchIntervalMs(batchInterval).build()
,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:clickhouse://192.168.147.110:8123/lives")
.withUsername("default")
.withPassword("123456") .withDriverName("com.clickhouse.jdbc.ClickHouseDriver").build()));
FlinkUtils.env.execute();
}
}
7、直播数据分析
统计每个直播间的累积观看人数(PV、UV)和各个直播间实时在线人数。
实现方式:
- 将数据攒起来批量写入到Redis/MySQL(不是简单的增量聚合,也不使用窗口,而是使用定时器定时输出;延迟较高,但写入效率高、对数据库压力小)。
- 在同一个job中,将数据写入到Clickhouse中(同一个主题(类型)的数据尽量在一个job中完成,将不同的数据打上不同的标签,侧流输出)。
统计直播间的人气值
- 在直播间中至少停留1分钟
- 在30分钟之内,同一设备ID频繁进入该直播间,算一个用户的人气值
实现方案
- 按照主播ID、deviceId进行KeyBy
- 使用ProcessFunction定义定时器
- 数据格式(进入直播间)
{
"carrier":"中国电信",
"deviceId":"aff486e2-1942-45ae-8623-9d6ef1f29b2f",
"deviceType":"IPHONE-7PLUS",
"eventId":"liveEnter",
"id":79,
"isNew":0,
"lastUpdate":2021,
"latitude":37.942441944941706,
"longitude":115.21183014458548,
"netType":"WIFI",
"osName":"ios",
"osVersion":"8.2",
"properties":{
"anchor_id":2,
"province":"江苏",
"nickname":"孙悟空",
"live_session":"CUjeX7wC",
"category":"游戏"
},
"releaseChannel":"AppStore",
"resolution":"2048*1024",
"sessionId":"mECl1CLhcmGw",
"timestamp":1616115193340
}
- 数据格式(离开直播间)
{
"carrier":"中国电信",
"deviceId":"aff486e2-1942-45ae-8623-9d6ef1f29b2f",
"deviceType":"IPHONE-7PLUS",
"eventId":"liveLeave",
"id":79,
"isNew":0,
"lastUpdate":2021,
"latitude":37.942441944941706,
"longitude":115.21183014458548,
"netType":"WIFI",
"osName":"ios",
"osVersion":"8.2",
"properties":{
"anchor_id":2,
"live_session":"CUjeX7wC"
},
"releaseChannel":"AppStore",
"resolution":"2048*1024",
"sessionId":"mECl1CLhcmGw",
"timestamp":1616115269951
}
实现方式一:(计算实时在线人数)
package com.zwf.jobs;
import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-18 9:50
*/
public class LiveAudienceCountV2 {
public static void main(String[] args) throws Exception {
DataStream<Tuple2<String, String>> dataStream = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
SingleOutputStreamOperator<DataBean> process =
dataStream.process(new JsonToDataBeanV2());
//过滤出eventId以live开头的事件
SingleOutputStreamOperator<DataBean> liveData = process.filter(bean -> bean.getEventId().startsWith("live"));
//过滤出进入直播间和离开直播间的事件数据
SingleOutputStreamOperator<DataBean> leaveOrEnterData = liveData.filter(data -> "liveLeave".equals(data.getEventId()) || "liveEnter".equals(data.getEventId()));
//按照设备ID和直播间ID进行keyBy
leaveOrEnterData.keyBy(new KeySelector<DataBean, Tuple2<String,String>>() {
@Override
public Tuple2<String, String> getKey(DataBean value) throws Exception {
String deviceId = value.getDeviceId();
String anchorId = value.getProperties().get("anchor_id").toString();
return Tuple2.of(deviceId,anchorId);
}
}).process(new KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String,Integer>>() {
//进入直播间时间状态
private transient ValueState<Long> inTimeState;
//离开直播间时间状态
private transient ValueState<Long> outTimeState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Long> inTimeDes = new ValueStateDescriptor<>("in-Time-state", Types.LONG);
inTimeState=getRuntimeContext().getState(inTimeDes);
ValueStateDescriptor<Long> outTimeDes = new ValueStateDescriptor<>("out-Time-state", Types.LONG);
outTimeState=getRuntimeContext().getState(outTimeDes);
}
@Override
public void processElement(DataBean value, KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
//事件时间
Long timestamp = value.getTimestamp();
//如果事件时间为空 当前时间就是事件时间
if(timestamp==null){
timestamp=System.currentTimeMillis();
}
//事件ID
String eventId = value.getEventId();
//进入直播间时间
if("liveEnter".equals(eventId)){
inTimeState.update(timestamp);
//超过1分钟后触发定时器操作
ctx.timerService().registerProcessingTimeTimer(timestamp+60000+1);
}else {
//离开直播间
outTimeState.update(timestamp);
Long inTime = inTimeState.value();
//在直播间停留的时间不足一分钟 就删除定时器
//第一次离开直播间的事件时间-第二次进入直播间的时间不足一分钟时删除定时器
if(timestamp-inTime<60000) {
ctx.timerService().deleteProcessingTimeTimer(inTime + 60000 + 1);
}
}
}
//定时器 //每个直播间的人气值 在直播间时间超过一分钟人气值加1
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String, Integer>>.OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
Long outTimeValue = outTimeState.value();
//如果一直在线没有离开直播间 人气值加1
if(outTimeValue==null){
out.collect(Tuple2.of(ctx.getCurrentKey().f1,1));
}else {
//再次进入直播间时,下一次进入时间-上一次离开时间 间隔超过30分钟加一次
Long inTimeValue = inTimeState.value();
if(inTimeValue-outTimeValue>30*60000){
out.collect(Tuple2.of(ctx.getCurrentKey().f1,1));
}
}
}
@Override
public void close() throws Exception {
super.close();
}
//求和计算人气值
}).keyBy(x->x.f0).sum(1).print();
FlinkUtils.env.execute();
}
}
实现方式二(把数据写入clickhouse):
- clickhouse建表
CREATE TABLE tb_anchor_audience_count
(
`id` String comment '数据唯一id,使用Kafka的topic+分区+偏移量',
`deviceId` String comment '设备ID',
`eventId` String comment '事件ID',
`anchor_id` String comment '主播ID',
`os` String comment '系统名称',
`province` String comment '省份',
`channel` String comment '下载渠道',
`deviceType` String comment '设备类型',
`eventTime` DateTime64 comment '数据中所携带的时间',
`date` String comment 'eventTime转成YYYYMMDD格式',
`hour` String comment 'eventTime转成HH格式',
`processTime` DateTime default now() comment '插入到数据库时的系统时间'
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;
select * from tb_anchor_audience_count;
- 直播间累积在线人数和实时在线人数自定义函数
package com.zwf.udf;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-18 11:14
*/
//求出每个直播间的uv pv数
public class AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, DataBean> {
//声明三个状态对象 anchor_id(主播ID) uv(用户访客数) pv(页面浏览数) 直播间在线人数(onlineCount)
private transient ValueState<BloomFilter<String>> bloomFilterValueState;
//直播间用户访问数
private transient ValueState<Integer> uvState;
//直播间页面访问数
private transient ValueState<Integer> pvState;
//直播间在线人数
private transient ValueState<Integer> onlineCount;
//侧输出标签
private OutputTag<Tuple4<String,Integer,Integer,Integer>> outputTag=new OutputTag<>("out_streaming",TypeInformation.of(new TypeHint<Tuple4<String, Integer, Integer, Integer>>() {}));
//日期格式化
private SimpleDateFormat formatDT=new SimpleDateFormat("yyyyMMdd-HH");
@Override
public void open(Configuration parameters) throws Exception {
//状态TTL配置:3小时过期,创建和写入时刷新过期时间
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build();
ValueStateDescriptor<BloomFilter<String>> anchorIdState = new ValueStateDescriptor<>("anchor_id_state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
//TTL必须在获取状态之前配置到各自的状态描述器上
anchorIdState.enableTimeToLive(ttlConfig);
bloomFilterValueState=getRuntimeContext().getState(anchorIdState);
ValueStateDescriptor<Integer> uvStateDescribe = new ValueStateDescriptor<>("uv_state", Types.INT);
uvStateDescribe.enableTimeToLive(ttlConfig);
uvState=getRuntimeContext().getState(uvStateDescribe);
ValueStateDescriptor<Integer> pvStateDescribe = new ValueStateDescriptor<>("pv_state", Types.INT);
pvStateDescribe.enableTimeToLive(ttlConfig);
pvState=getRuntimeContext().getState(pvStateDescribe);
ValueStateDescriptor<Integer> onlineCountStateDescribe = new ValueStateDescriptor<>("online_count", Types.INT);
onlineCountStateDescribe.enableTimeToLive(ttlConfig);
onlineCount=getRuntimeContext().getState(onlineCountStateDescribe);
}
@Override
public void processElement(DataBean value, KeyedProcessFunction<String, DataBean, DataBean>.Context ctx, Collector<DataBean> out) throws Exception {
//获取事件id
String deviceId = value.getDeviceId();
BloomFilter<String> bloomFilter = bloomFilterValueState.value();
Integer uvCount = uvState.value();
Integer pvCount = pvState.value();
Integer liveCount = onlineCount.value();
//防止在线人数抛出NullPointException
if(liveCount==null){
liveCount=0;
}
//获取系统处理时间
Long ts = ctx.timerService().currentProcessingTime();
//定时任务十秒钟执行一次
Long firstTime=ts-ts%10000+10000;
//注册处理时间
ctx.timerService().registerProcessingTimeTimer(firstTime);
//进入直播间
if("liveEnter".equals(value.getEventId())){
if(bloomFilter==null){
bloomFilter=BloomFilter.create(Funnels.unencodedCharsFunnel(),100000);
uvCount=0;
pvCount=0;
}
if(!bloomFilter.mightContain(deviceId)){
bloomFilter.put(deviceId);
uvCount++;
bloomFilterValueState.update(bloomFilter);
uvState.update(uvCount);
}
pvCount++;
pvState.update(pvCount);
liveCount++;
onlineCount.update(liveCount);
}else {
liveCount--;
onlineCount.update(liveCount);
}
//获取事件时间封装进dataBean中
String dateTime = formatDT.format(new Date(value.getTimestamp()));
String[] dt = dateTime.split("-");
value.setDate(dt[0]);
value.setHour(dt[1]);
//主流输出
out.collect(value);
}
//定时任务触发
@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, DataBean, DataBean>.OnTimerContext ctx, Collector<DataBean> out) throws IOException {
//侧流输出
String date = formatDT.format(timestamp).split("-")[0];
//侧输出主播id pv uv 在线人数 到redis中
ctx.output(outputTag,Tuple4.of(ctx.getCurrentKey()+"_"+date,uvState.value(), pvState.value(),onlineCount.value()));
}
@Override
public void close() throws Exception {
super.close();
}
//侧流标签
public OutputTag<Tuple4<String, Integer, Integer, Integer>> getOutputTag() {
return outputTag;
}
}
- 计算代码(把anchor_id、pv、uv、onlineCount侧输出进redis中,把DataBean基本信息输出到clickhouse)
package com.zwf.jobs;
import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.udf.AnchorDistinctTotalAudienceFunc;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.udf.LocationFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-18 9:50
*/
public class LiveAudienceCount {
public static void main(String[] args) throws Exception {
DataStream<Tuple2<String, String>> dataStream = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
//获取amap配置信息 以及 clickhouse信息
String amapKey = FlinkUtils.param.get("amap.http.key");
String amapUrl = FlinkUtils.param.get("amap.http.url");
SingleOutputStreamOperator<DataBean> process =
dataStream.process(new JsonToDataBeanV2());
//过滤出eventId以live开头的事件
SingleOutputStreamOperator<DataBean> liveData = process.filter(bean -> bean.getEventId().startsWith("live"));
//过滤出进入直播间和离开直播间的事件数据
SingleOutputStreamOperator<DataBean> leaveOrEnterData = liveData.filter(data -> "liveLeave".equals(data.getEventId()) || "liveEnter".equals(data.getEventId()));
//根据主播id分组
KeyedStream<DataBean, String> anchorData = leaveOrEnterData.keyBy(bean -> bean.getProperties().get("anchor_id").toString());
//求出 每个直播间(anchorId)的pv和uv数 在线人数
AnchorDistinctTotalAudienceFunc audienceFunc = new AnchorDistinctTotalAudienceFunc();
//主流输出
SingleOutputStreamOperator<DataBean> mainStream = anchorData.process(audienceFunc);
//侧流输出到redis中
FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("192.168.147.110").setDatabase(5)
.setPort(6379).setPassword("root@123456").build();
mainStream.getSideOutput(audienceFunc.getOutputTag())
.addSink(new RedisSink<Tuple4<String, Integer, Integer, Integer>>(redisConf,new LiveToTupleData()));
//主流存入clickhouse
int batchSize = FlinkUtils.param.getInt("clickhouse.batch.size");
long batchInterval = FlinkUtils.param.getLong("clickhouse.batch.interval");
//进行经纬度切换
AsyncDataStream.unorderedWait(mainStream,new LocationFunction(amapUrl,amapKey,50),5, TimeUnit.SECONDS)
.addSink(JdbcSink.sink("insert into tb_anchor_audience_count(id,deviceId,eventId,anchor_id,os,province,channel,deviceType,eventTime,date,hour)\n" +
" values(?,?,?,?,?,?,?,?,?,?,?)",(pts, t)->{
pts.setString(1,t.getId());
pts.setString(2,t.getDeviceId());
pts.setString(3,t.getEventId());
pts.setString(4,t.getProperties().get("anchor_id").toString());
pts.setString(5,t.getOsName());
pts.setString(6,t.getProvince()!=null?t.getProvince():"");
pts.setString(7,t.getReleaseChannel());
pts.setString(8,t.getDeviceType());
pts.setTimestamp(9,new Timestamp(t.getTimestamp()));
pts.setString(10,t.getDate());
pts.setString(11,t.getHour());
}, JdbcExecutionOptions.builder()
.withBatchSize(batchSize)
.withBatchIntervalMs(batchInterval).build()
,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:clickhouse://192.168.147.110:8123/lives")
.withUsername("default")
.withPassword("123456")
.withDriverName("com.clickhouse.jdbc.ClickHouseDriver").build()));
FlinkUtils.env.execute();
}
public static class LiveToTupleData implements RedisMapper<Tuple4<String, Integer, Integer, Integer>> {
@Override
public RedisCommandDescription getCommandDescription() {
//使用HSET数据格式 大key是LiveToUvPvOnline
return new RedisCommandDescription(RedisCommand.HSET,"LiveToUvPvOnline");
}
//1_20220414 2_20240302
@Override
public String getKeyFromData(Tuple4<String, Integer, Integer, Integer> dataKey) {
return dataKey.f0;
}
@Override
public String getValueFromData(Tuple4<String, Integer, Integer, Integer> dataValue) {
return dataValue.f1+"_"+dataValue.f2+"_"+dataValue.f3;
}
}
}
- SQL版本(clickhouse上操作)
-- 查询实时在线人数
select
    anchor_id,
    sum(if(eventId = 'liveEnter', 1, -1)) as onlineNum
from tb_anchor_audience_count
group by anchor_id;
-- 计算pv uv
select
anchor_id,
count(distinct deviceId) as uv,
count(id) as pv
from tb_anchor_audience_count
where eventId='liveEnter'
group by anchor_id;
optimize table tb_anchor_audience_count;
8、直播间礼物分析
按照主播(直播间)统计礼物的积分!
- 数据格式(直播奖励)
{
"carrier":"中国电信",
"deviceId":"9465bb37-deb5-4981-95e8-4c827e6e0104",
"deviceType":"MI-NOTE",
"eventId":"liveReward",
"id":1673,
"isNew":0,
"lastUpdate":2021,
"latitude":32.205310250750024,
"longitude":114.50768436766687,
"netType":"3G",
"osName":"android",
"osVersion":"8.5",
"properties":{
"room_id":100001,
"anchor_id":1,
"gift_id":2,
"live_session":"d5Nc8tGO"
},
"releaseChannel":"小米应用商店",
"resolution":"1024*768",
"sessionId":"W1HuGkvJ43GD",
"timestamp":1616309371940
}
- 实现方案
在MySQL中还有一种礼物表(维表),需要进行关联。
关联维表的解决方案:
- 每来一条数据查一次数据库(慢、吞吐量低)
- 可以使用异步IO(相对快,消耗资源多)
- 广播State(最快、适用于少量数据、数据可以变化的)(推荐使用)
- 统计的具体指标
各个主播的收到礼物的积分值
打赏礼物的数量、受欢迎礼物topN
做多维度的指标统计(ClickHouse)
- MySQL连接驱动依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
- 编写MySQLSource类,拉取MySQL数据库中的维度数据进行广播与kafka中的数据进行联接。
package com.zwf.source;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.*;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-18 21:27
* 拉取MySQL数据库中的维度数据 进行广播 与kafka中的数据进行联接
*/
public class MySQLSource extends RichSourceFunction<Tuple4<Integer,String,Double,Integer>> {
private Connection connection=null;
private PreparedStatement pst=null;
private ResultSet rs=null;
boolean flag=true;
//建立jdbc连接配置信息
@Override
public void open(Configuration parameters) throws Exception {
connection=DriverManager.getConnection("jdbc:mysql://master:3306/lives?characterEncoding=utf8&serverTimeZone=Asia/Shanghai","root","Root@123456.");
}
@Override
public void run(SourceContext<Tuple4<Integer, String, Double, Integer>> ctx) throws Exception {
//初始化时间戳
long timeStamp=0;
while (flag){
String sql="select id,name,points,deleted from tb_live_gift where updateTime>? "+(timeStamp==0?" and deleted=0":"");
pst = connection.prepareStatement(sql);
//查询最新时间的数据
pst.setTimestamp(1,new Timestamp(timeStamp));
timeStamp=System.currentTimeMillis();
rs = pst.executeQuery();
while (rs.next()){
int id = rs.getInt(1);
String name = rs.getString(2);
double points = rs.getDouble(3);
int deleted = rs.getInt(4);
ctx.collect(Tuple4.of(id,name,points,deleted));
}
rs.close();
Thread.sleep(10000);
}
}
//取消run方法 建立连接时是一个无界流
@Override
public void cancel() {
flag=false;
try {
pst.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
- 工作类(mysqlstream广播与kafka流进行联接操作,最后分组求和!)
package com.zwf.jobs;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-18 22:00
*/
import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.source.MySQLSource;
import com.zwf.udf.BroadCastGiftRewardFun;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
/**
* 直播间礼物积分数
*/
public class GiftPointCount {
public static void main(String[] args) throws Exception {
DataStreamSource<Tuple4<Integer, String, Double, Integer>> mysqlSource = FlinkUtils.env.addSource(new MySQLSource());
//广播变量(id Tuple.of(name,points)) 数据库字段
MapStateDescriptor<Integer, Tuple2<String, Double>> broadcast_mysql = new MapStateDescriptor<>("broadcast_mysql", TypeInformation.of(new TypeHint<Integer>() {
}), TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}));
//把Mysql中的数据当做广播流
BroadcastStream<Tuple4<Integer, String, Double, Integer>> broadcast =
mysqlSource.broadcast(broadcast_mysql);
//从kafka中读取数据进行广播窗口联接
DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
SingleOutputStreamOperator<DataBean> processed = kafkaStream.process(new JsonToDataBeanV2());
//直播打赏
SingleOutputStreamOperator<DataBean> filter = processed.filter(x -> "liveReward".equals(x.getEventId()));
//广播变量 <gift_id,Tuple2.of(name,points)>
filter.connect(broadcast).process(new BroadCastGiftRewardFun(broadcast_mysql))
.keyBy(line->Tuple2.of(line.f0,line.f1),Types.TUPLE(Types.INT,Types.STRING)).sum(2).print();
FlinkUtils.env.execute();
}
}
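上面的工作类引用了 BroadCastGiftRewardFun,但本节笔记中没有给出它的实现。下面按"广播State关联礼物维表"的思路给出一个参考草图(类结构与字段均按上文调用方式假设,输出 Tuple3<礼物ID,礼物名称,积分>,与下游 keyBy(f0,f1).sum(2) 对应),仅供参考:
package com.zwf.udf;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
/**
 * 参考草图:把MySQL礼物维表(gift_id -> (name, points))广播后与liveReward事件流关联,
 * 输出(礼物ID, 礼物名称, 积分),供下游按(礼物ID,礼物名称)分组后对积分求和。
 */
public class BroadCastGiftRewardFun extends BroadcastProcessFunction<DataBean,
        Tuple4<Integer, String, Double, Integer>, Tuple3<Integer, String, Double>> {
    private final MapStateDescriptor<Integer, Tuple2<String, Double>> descriptor;
    public BroadCastGiftRewardFun(MapStateDescriptor<Integer, Tuple2<String, Double>> descriptor) {
        this.descriptor = descriptor;
    }
    @Override
    public void processElement(DataBean value, ReadOnlyContext ctx,
                               Collector<Tuple3<Integer, String, Double>> out) throws Exception {
        //事件流:取行为数据properties中的gift_id(字段名按前文数据格式假设),到广播状态中查礼物名称和积分
        ReadOnlyBroadcastState<Integer, Tuple2<String, Double>> state = ctx.getBroadcastState(descriptor);
        Integer giftId = Integer.parseInt(value.getProperties().get("gift_id").toString());
        Tuple2<String, Double> gift = state.get(giftId);
        if (gift != null) {
            out.collect(Tuple3.of(giftId, gift.f0, gift.f1));
        }
    }
    @Override
    public void processBroadcastElement(Tuple4<Integer, String, Double, Integer> value, Context ctx,
                                        Collector<Tuple3<Integer, String, Double>> out) throws Exception {
        //广播流:维表数据写入/更新广播状态;deleted=1的礼物从状态中移除
        BroadcastState<Integer, Tuple2<String, Double>> state = ctx.getBroadcastState(descriptor);
        if (value.f3 != null && value.f3 == 1) {
            state.remove(value.f0);
        } else {
            state.put(value.f0, Tuple2.of(value.f1, value.f2));
        }
    }
}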
9、商品浏览排名(TopN)
统计10分钟内,各个分类、各种事件类型的热门商品!
- 实现方案
实现思路:需要将一段时间内的数据攒起来,才能统计出TopN。
窗口选择:统计最近10分钟的数据,每隔一分钟输出一次结果;为了让结果更平滑、准确,使用滑动窗口。窗口长度和滑动步长可以根据实际情况调整。
时间类型:为了精确统计用户浏览、加入购物车、下单等行为中热门商品的变化情况,使用EventTime类型的窗口。
- Flink SQL案例
- 数据格式
{
"carrier":"中国联通",
"deviceId":"e9535ff8-4e58-417f-b2fe-8ed261a0a97c",
"deviceType":"IPHONE-10",
"eventId":"productView",
"id":1718,
"isNew":1,
"latitude":40.53040777609875,
"longitude":112.49708971837704,
"netType":"4G",
"osName":"ios",
"osVersion":"9.2",
"properties":{
"room_id":100001,
"anchor_id":1,
"category_id":1008009,
"product_id":1006002,
"brand_id":"0"
},
"releaseChannel":"AppStore",
"resolution":"2048*1024",
"sessionId":"UNICsYI45Hwb",
"timestamp":1616309372366
}
- 字段含义
eventId:事件类型,productView(浏览)、productAddCart:(加入购物车)、productOrder(下单)
timestamp:事件时间,因为要按照EventTime划分窗口
category_id:商品分类
product_id:商品ID
- 实现步骤
- 对数据进行KeyBy。
- 划分窗口。
- 在窗口中进行聚合。
- 排序(倒排)。
- 代码实现(Job类)
package com.zwf.jobs;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-19 10:31
*/
import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.udf.HotGoodsAggregateFunction;
import com.zwf.udf.HotGoodsWindowFunction;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.udf.TopNWinFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
/**
* 热门商品排名
*/
public class HotGoodsCount {
public static void main(String[] args) throws Exception {
//获取kafka中的数据
DataStream<Tuple2<String, String>> KafkaSource = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
FlinkUtils.env.setParallelism(3);
SingleOutputStreamOperator<DataBean> process = KafkaSource.process(new JsonToDataBeanV2());
//过滤出浏览(productView)和加入购物车(productAddCart)事件的数据
SingleOutputStreamOperator<DataBean> filterData = process.filter(data -> data.getEventId().equals("productView")||data.getEventId().equals("productAddCart"));
//设置水位线 允许迟到5s时间
SingleOutputStreamOperator<DataBean> watermarks = filterData.assignTimestampsAndWatermarks(WatermarkStrategy.<DataBean>forBoundedOutOfOrderness(Duration.ofMillis(5000))
.withTimestampAssigner(new SerializableTimestampAssigner<DataBean>() {
@Override
public long extractTimestamp(DataBean element, long recordTimestamp) {
return element.getTimestamp();
}
}));
//对事件ID和商品分类以及商品分组
KeyedStream<DataBean, Tuple3<String,Integer, Integer>> keyedStream = watermarks.keyBy(line -> {
Integer categoryId = Integer.parseInt(line.getProperties().get("category_id").toString());
Integer productId =Integer.parseInt(line.getProperties().get("product_id").toString());
String eventId = line.getEventId();
return Tuple3.of(eventId, categoryId, productId);
//窗口大小是10分钟 滑动间隔是1分钟
//增量聚合计算(reduce、aggregate,sum) 不要全量聚合计算(process)
}, Types.TUPLE(Types.STRING, Types.INT, Types.INT));
//同一个类别商品的浏览量
keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(10),Time.minutes(1)))
.aggregate(new HotGoodsAggregateFunction(),new HotGoodsWindowFunction())
.keyBy(itemBean->{
String eventId = itemBean.eventId;
Integer categoryId = itemBean.categoryId;
Long winStart = itemBean.winStart;
Long winEnd = itemBean.winEnd;
return Tuple4.of(eventId,categoryId,winStart,winEnd);
},Types.TUPLE(Types.STRING,Types.INT, Types.LONG,Types.LONG))
.process(new TopNWinFunction()).print();
FlinkUtils.env.execute();
}
}
- 自定义聚合函数(HotGoodsAggregateFunction)
package com.zwf.udf;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.functions.AggregateFunction;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-19 11:21
*/
public class HotGoodsAggregateFunction implements AggregateFunction<DataBean,Long,Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(DataBean value, Long accumulator) {
return accumulator+1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
- 自定义窗口函数(获取窗口和自定义聚合函数的值)
package com.zwf.udf;
import com.zwf.pojo.DataBean;
import com.zwf.pojo.ItemEventCount;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-19 11:26
*/
/**
* Long: 商品点击量 Input
* ItemEventCount 商品选项信息 Out
* Tuple3<String,Integer,Integer>事件ID 商品分类ID 商品ID keyedValue
*/
public class HotGoodsWindowFunction implements WindowFunction<Long, ItemEventCount, Tuple3<String,Integer,Integer>, TimeWindow> {
@Override
public void apply(Tuple3<String, Integer, Integer> data, TimeWindow window, Iterable<Long> input, Collector<ItemEventCount> out) throws Exception {
String eventId = data.f0;
Integer categoryId = data.f1;
Integer productId = data.f2;
//获取自定义聚合函数输出的增量结果(该key在本窗口内的计数)
Long next = input.iterator().next();
out.collect(new ItemEventCount(eventId,categoryId,productId,next, window.getStart(), window.getEnd()));
}
}
- 自定义排名窗口函数(TopNWinFunction)
package com.zwf.udf;
import com.zwf.pojo.ItemEventCount;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-19 12:08
*/
public class TopNWinFunction extends KeyedProcessFunction<Tuple4<String,Integer,Long,Long>, ItemEventCount, List<ItemEventCount>> {
//定义状态存储前三个对象值
private transient ValueState<List<ItemEventCount>> valueStateList;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<List<ItemEventCount>> stateDescriptor = new ValueStateDescriptor<>("top-N-ItemEventCount", TypeInformation.of(new TypeHint<List<ItemEventCount>>() {}));
valueStateList=getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(ItemEventCount value, KeyedProcessFunction<Tuple4<String, Integer, Long, Long>, ItemEventCount, List<ItemEventCount>>.Context ctx, Collector<List<ItemEventCount>> out) throws Exception {
//获取状态中的值
List<ItemEventCount> ivc = valueStateList.value();
if(ivc==null){
ivc=new ArrayList<>();
}
ivc.add(value);
valueStateList.update(ivc);
//窗口计算完后触发排序
ctx.timerService().registerEventTimeTimer(value.winEnd+1);
}
//进行排序
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple4<String, Integer, Long, Long>, ItemEventCount, List<ItemEventCount>>.OnTimerContext ctx, Collector<List<ItemEventCount>> out) throws Exception {
//获取状态中的排序对象
List<ItemEventCount> eventCounts = valueStateList.value();
eventCounts.sort((a,b)->Long.compare(b.count,a.count)); //按count降序排序,再取前N个
//取前3个值
ArrayList<ItemEventCount> topN = new ArrayList<>();
for(int i=0;i<Math.min(3,eventCounts.size());i++){
topN.add(eventCounts.get(i));
}
out.collect(topN);
}
@Override
public void close() throws Exception {
super.close();
}
}
- pojo类(ItemEventCount)
package com.zwf.pojo;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-19 11:29
*/
public class ItemEventCount {
public String eventId;
public Integer categoryId;
public Integer productId;
public Long count;
public Long winStart;
public Long winEnd;
public ItemEventCount() {
}
public ItemEventCount(String eventId, Integer categoryId, Integer productId, Long count, Long winStart, Long winEnd) {
this.eventId = eventId;
this.categoryId = categoryId;
this.productId = productId;
this.count = count;
this.winStart = winStart;
this.winEnd = winEnd;
}
@Override
public String toString() {
return "ItemEventCount{" +
"eventId='" + eventId + '\'' +
", categoryId=" + categoryId +
", productId=" + productId +
", count=" + count +
", winStart=" + winStart +
", winEnd=" + winEnd +
'}';
}
}
四、业务数据实时采集
业务数据库通常是MySQL(集群)。如果直接在业务数据库上写SQL做复杂查询(多维度聚合、join,且数据量比较大),业务数据库的性能就会下降,甚至影响正常的业务功能(普通的查询、插入、修改变慢或无法完成)。
- 离线和实时数据采集链路
离线:MySQL/Oracle=>Sqoop/SparkSQL/DataX=>HDFS(Hive)
实时:MySQL => Canal(数据同步工具)=> Kafka(消息队列)
1、Canal采集方案
Canal基于MySQL的binlog机制进行业务数据的同步拉取,因此使用Canal前必须开启binlog写入功能,并将binlog格式配置为ROW。(MySQL 8.0及以上版本默认开启binlog)
缺点:只能同步开启之后产生的增量数据,无法直接同步历史数据。如果要做全量同步,可以先启动Canal,再重新导入全部SQL数据,Canal会把导入过程当作增量变更,从而实现数据的全量同步。
- 修改MySQL配置文件
vim /etc/my.cnf
- 配置文件内容如下:
[mysqld]
log-bin=mysql-bin #添加这一行即可开启binlog
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replication需要定义,不能和canal的slaveId重复
- 重启MySQL
service mysqld restart
- 在Mysql中添加一个新的用户(可选:也可以使用Root账户,只需要在配置文件中配置即可)
CREATE USER canal IDENTIFIED BY '123456';
可选:如果数据库的密码安全策略要求较高,导致无法正常创建上述用户,可以先降低密码策略再添加用户(生产环境不要这样修改,应使用复杂密码)
set global validate_password_policy=LOW;
set global validate_password_length=6;
- 授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- 刷新权限
FLUSH PRIVILEGES;
2、安装Canal
- 下载Canal 1.1.4版本(canal.deployer-1.1.4.tar.gz)
- 上传到Canal服务器上,然后解压
注意:Canal可以与MySQL安装在同一台服务器,也可以不在同一台服务器,本次安装在同一台服务器上。
- 修改Canal的主配置文件canal.properties
#canal跟kafka整合,将数据发送到kafka
canal.serverMode = kafka
#指定kafka broker地址
canal.mq.servers = node1:9092,node2:9092,master:9092
#数据发送kafka失败重试次数
canal.mq.retries = 10
# 配置元数据
canal.instance.tsdb.url = jdbc:mysql://master:3306/canal_tsdb?characterEncoding=utf-8&serverTimeZone=UTC&useSSL=false
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = Root@123456.
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
- 修改Canal的实例配置文件${CANAL_HOME}/conf/example/instance.properties
#mysql数据库的地址
canal.instance.master.address=master:3306
#mysql用户名
canal.instance.dbUsername=root
#mysql密码
canal.instance.dbPassword=Root@123456.
#默认topic:未匹配到dynamicTopic规则的数据写入该Topic
canal.mq.topic=example
# dynamic topic route by schema or table regex
#将lives数据库的tb_order表数据发送到kafka的order-main主题,tb_order_goods表数据发送到order-detail主题
canal.mq.dynamicTopic=order-main:lives\\.tb_order,order-detail:lives\\.tb_order_goods
# 配置存放kafka主题的分区数量
canal.mq.partitionsNum=3
- 启动Canal
bin/startup.sh
- 开启kafka消费者进行测试消费
kafka-console-consumer.sh --bootstrap-server node1:9092,master:9092,node2:9092 --topic order-main --from-beginning
3、实时统计订单相关指标
- 相关分析指标:
- 直播间主播带货总金额、商品成交(下单)数量
- 直播间主播带商品各个分类的成交金额、商品成交(下单)数量
- 一天内的成交金额
- 一天内各个分类的成交金额(维度:省份、操作系统、手机型...)
- 各种下单后商品的状态跟踪(下单 -> 支付 -> 发货 -> 签收 -> 评价)(下单 -> 用户取消 -> 系统取消)
- 需求分析(各个分类成交金额)
用户表:tb_live_user (用户ID)
订单主表:tb_order (订单ID、用户ID、订单状态、订单金额、下单时间、更新时间)
订单明细表:tb_order_goods(订单主表ID、sku、数量、单价、分类ID、直播间ID)
主播房间表: tb_live_room(直播间ID、主播ID)
主播表: tb_live_anchor (主播ID)
- 需求所需数据
订单状态为成交(主表 tb_order--->order-main)
商品的分类ID(明细表 tb_order_goods--->order-detail)
商品的金额=单价*数量(明细表 tb_order_goods--->order-detail)
- 实现方案
需要对双流进行left join,把tb_order_goods数据流作为左表,把tb_order数据流作为右表,Flink中可以使用coGroup方法来实现。
- 可能出现的问题
1、右表迟到(订单明细,null)
2、左右表无迟到(订单明细、订单主表)
3、左表迟到的数据(订单明细 侧流输出)
- 代码实现
package com.zwf.jobs;
import com.zwf.pojo.OrderDetail;
import com.zwf.pojo.OrderMain;
import com.zwf.udf.JsonToOrderDetail;
import com.zwf.udf.JsonToOrderMain;
import com.zwf.udf.OrderLeftJoinFunc;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.*;
import java.time.Duration;
/**
* 左表是明细表,使用left join必须以左表为主。如果左表数据迟到,可以侧流输出,再与CoGroup的结果进行union;
* 如果右表数据不存在,可以通过JDBC/异步IO查询MySQL补齐。
* 左表是大表,右表是小表,大表join小表。
*/
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-20 18:58
*/
public class OrderCount {
public static void main(String[] args) throws Exception {
ParameterTool tools=ParameterTool.fromPropertiesFile(args[0]);
FlinkUtils.env.setParallelism(1);
//获取配置文件中两个主题的配置文件
String orderMain = tools.getRequired("kafka.input.main");
String orderDetail = tools.getRequired("kafka.input.detail");
//orderMain数据流
DataStream<String> rightDataStream = FlinkUtils.createKafkaV2(tools, orderMain, SimpleStringSchema.class);
//orderDetail数据流
DataStream<String> leftDataStream = FlinkUtils.createKafkaV2(tools, orderDetail, SimpleStringSchema.class);
//过滤出type=insert/update/delete 并把json封装到javaBean对象中
SingleOutputStreamOperator<OrderMain> orderMainStream = rightDataStream.process(new JsonToOrderMain());
SingleOutputStreamOperator<OrderDetail> orderDetailStream = leftDataStream.process(new JsonToOrderDetail());
//设置水位线 并使用更新时间作为事件时间
SingleOutputStreamOperator<OrderMain> orderMainWaterMark = orderMainStream.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderMain>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<OrderMain>() {
@Override
public long extractTimestamp(OrderMain element, long recordTimestamp) {
//orderMain中的更新时间戳作为事件时间
return element.getUpdate_time().getTime();
}
}));
SingleOutputStreamOperator<OrderDetail> orderDetailWaterMark = orderDetailStream.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
@Override
public long extractTimestamp(OrderDetail element, long recordTimestamp) {
//orderDetail中的更新时间戳作为事件时间
return element.getUpdate_time().getTime();
}
}));
OutputTag<OrderDetail> outputTag = new OutputTag<>("order-detail", TypeInformation.of(new TypeHint<OrderDetail>() {})) {};
//对order_id进行分组 //使用滚动窗口 每5s滚动一次
//左表迟到数据侧流输出
SingleOutputStreamOperator<OrderDetail> orderDetailWindow = orderDetailWaterMark.keyBy(bean -> bean.getOrder_id())
//5s 滚动一次窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//迟到数据侧流输出
.sideOutputLateData(outputTag)
.apply(new WindowFunction<OrderDetail, OrderDetail, Long, TimeWindow>() {
@Override
public void apply(Long aLong, TimeWindow window, Iterable<OrderDetail> input, Collector<OrderDetail> out) throws Exception {
for (OrderDetail detail : input) {
out.collect(detail);
}
}
});
//获取侧流输出数据
DataStream<OrderDetail> sideOutput = orderDetailWindow.getSideOutput(outputTag);
//侧流数据输出处理
SingleOutputStreamOperator<Tuple2<OrderDetail, OrderMain>> lateOrderDetailStream = sideOutput.map(new MapFunction<OrderDetail, Tuple2<OrderDetail, OrderMain>>() {
@Override
public Tuple2<OrderDetail, OrderMain> map(OrderDetail value) throws Exception {
return Tuple2.of(value, null);
}
});
//窗口联接join coGroup orderDetail做左表 orderMain做右表
//两个设置水位线的表进行left join
DataStream<Tuple2<OrderDetail, OrderMain>> JoinedStream = orderDetailWaterMark.coGroup(orderMainWaterMark)
.where(line -> line.getOrder_id())
.equalTo(bean -> bean.getOid())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new OrderLeftJoinFunc());
//将CoGroup的流和迟到的流,Union在一起 使用相同的方式处理 迟到的流也是左表
DataStream<Tuple2<OrderDetail, OrderMain>> unionStream = JoinedStream.union(lateOrderDetailStream);
//对于没有关联上右表(OrderMain)的数据,查询数据库补齐右表数据
SingleOutputStreamOperator<Tuple2<OrderDetail, OrderMain>> res = unionStream.map(new MapFunction<Tuple2<OrderDetail, OrderMain>, Tuple2<OrderDetail, OrderMain>>() {
@Override
public Tuple2<OrderDetail, OrderMain> map(Tuple2<OrderDetail, OrderMain> value) throws Exception {
//如果右表为空 通过order_id查询数据库中orderMain的数据
if (value.f1 == null) {
//根据orderId查询OrderMain数据赋值给value.f1
value.f1=QueryJDBCOrderMain.getQueryOrderMain(value.f0.getOrder_id());
}
return value;
}
});
res.print();
FlinkUtils.env.execute();
}
private static class QueryJDBCOrderMain{
private static Connection connection=null;
private static PreparedStatement pst=null;
private static ResultSet rs=null;
public static OrderMain getQueryOrderMain(Long order_id){
try {
Class.forName("com.mysql.cj.jdbc.Driver");
connection=DriverManager.getConnection("jdbc:mysql://master:3306/lives?characterEncoding=utf-8&serverTimeZone=UTC","root","Root@123456.");
String sql="select * from orderMain where oid=? ";
pst = connection.prepareStatement(sql);
pst.setLong(1,order_id);
rs=pst.executeQuery();
OrderMain main=null;
while (rs.next()){
main= new OrderMain();
main.setOid(rs.getLong(1));
main.setCreate_time(rs.getDate(2));
main.setTotal_money(rs.getDouble(3));
main.setStatus(rs.getShort(4));
main.setUpdate_time(rs.getDate(5));
main.setUid(rs.getLong(6));
main.setProvince(rs.getString(7));
}
return main;
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} catch (SQLException e) {
throw new RuntimeException(e);
}finally {
try {
if (rs != null) {
rs.close();
}
if(pst!=null){
pst.close();
}
if(connection!=null){
connection.close();
}
}catch (Exception e){
}
}
}
}
}
- 两个JSON转javaBean的类
package com.zwf.udf;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.OrderMain;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-20 19:53
*/
public class JsonToOrderMain extends ProcessFunction<String, OrderMain> {
@Override
public void processElement(String value, ProcessFunction<String, OrderMain>.Context ctx, Collector<OrderMain> out) throws Exception {
try {
//把数据状态从json中获取出来
JSONObject orderMain = JSONObject.parseObject(value);
//获取type值
String type = orderMain.getString("type");
if ("UPDATE".equals(type) || "DELETE".equals(type) || "INSERT".equals(type)) {
//获取json对象数组
JSONArray data = orderMain.getJSONArray("data");
//遍历数组中的对象
for (int i = 0; i < data.size(); i++) {
//获取data中每个对象进行转化
OrderMain main = data.getObject(i, OrderMain.class);
//把状态封装进pojo中
main.setType(type);
//把对象输出
out.collect(main);
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
package com.zwf.udf;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.OrderDetail;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-20 19:53
*/
public class JsonToOrderDetail extends ProcessFunction<String, OrderDetail> {
@Override
public void processElement(String value, ProcessFunction<String, OrderDetail>.Context ctx, Collector<OrderDetail> out) throws Exception {
try {
//把数据状态从json中获取出来
JSONObject orderDetail = JSONObject.parseObject(value);
//获取type值
String type = orderDetail.getString("type");
if ("UPDATE".equals(type) || "DELETE".equals(type) || "INSERT".equals(type)) {
//获取json对象数组
JSONArray data = orderDetail.getJSONArray("data");
//遍历数组中的对象
for (int i = 0; i < data.size(); i++) {
//获取data中每个对象进行转化
OrderDetail detail = data.getObject(i, OrderDetail.class);
//把状态封装进pojo中
detail.setType(type);
//把对象输出
out.collect(detail);
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
- 两个数据流进行CoGroup时,进行自定义应用
package com.zwf.udf;
import com.zwf.pojo.OrderDetail;
import com.zwf.pojo.OrderMain;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-20 20:33
*/
public class OrderLeftJoinFunc implements CoGroupFunction<OrderDetail, OrderMain, Tuple2<OrderDetail,OrderMain>> {
@Override
public void coGroup(Iterable<OrderDetail> first, Iterable<OrderMain> second, Collector<Tuple2<OrderDetail, OrderMain>> out) throws Exception {
for (OrderDetail detail:first){
boolean isJoined=false;
//右表数据存在情况下
for (OrderMain main:second){
isJoined=true;
out.collect(Tuple2.of(detail,main));
}
//右表数据不存在 orderMain数据不存在
if (!isJoined){
out.collect(Tuple2.of(detail,null));
}
}
}
}
- OrderMain和OrderDetail两个Bean对象
package com.zwf.pojo;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-20 19:36
*/
import java.util.Date;
/**
* 订单表
*/
public class OrderMain {
private Long oid; //订单ID
private Date create_time; //创建时间
private Double total_money; //总金额
private Short status; //状态
private Date update_time; //信息更新时间
private Long uid; //用户id
private String province; //省
private String type; //canal拉取的数据类型,INSERT/UPDATE/DELETE之一
public Long getOid() {
return oid;
}
public void setOid(Long oid) {
this.oid = oid;
}
public Date getCreate_time() {
return create_time;
}
public void setCreate_time(Date create_time) {
this.create_time = create_time;
}
public Double getTotal_money() {
return total_money;
}
public void setTotal_money(Double total_money) {
this.total_money = total_money;
}
public Short getStatus() {
return status;
}
public void setStatus(Short status) {
this.status = status;
}
public Date getUpdate_time() {
return update_time;
}
public void setUpdate_time(Date update_time) {
this.update_time = update_time;
}
public Long getUid() {
return uid;
}
public void setUid(Long uid) {
this.uid = uid;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "OrderMain{" +
"oid=" + oid +
", create_time=" + create_time +
", total_money=" + total_money +
", status=" + status +
", update_time=" + update_time +
", uid=" + uid +
", province='" + province + '\'' +
'}';
}
}
package com.zwf.pojo;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-20 19:36
*/
import java.util.Date;
/**
* 商品详情表
*/
public class OrderDetail {
private Long id; //商品详情ID
private Long order_id; //订单ID
private Integer category_id; //类别ID
private String sku; //商品最小库存单位
private Double money; //商品金额
private Integer amount; //商品数量
private Date create_time; //创建时间
private Date update_time; //更新时间
private String type; //canal拉取数据类型
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Long getOrder_id() {
return order_id;
}
public void setOrder_id(Long order_id) {
this.order_id = order_id;
}
public Integer getCategory_id() {
return category_id;
}
public void setCategory_id(Integer category_id) {
this.category_id = category_id;
}
public String getSku() {
return sku;
}
public void setSku(String sku) {
this.sku = sku;
}
public Double getMoney() {
return money;
}
public void setMoney(Double money) {
this.money = money;
}
public Integer getAmount() {
return amount;
}
public void setAmount(Integer amount) {
this.amount = amount;
}
public Date getCreate_time() {
return create_time;
}
public void setCreate_time(Date create_time) {
this.create_time = create_time;
}
public Date getUpdate_time() {
return update_time;
}
public void setUpdate_time(Date update_time) {
this.update_time = update_time;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "OrderDetail{" +
"id=" + id +
", order_id=" + order_id +
", category_id=" + category_id +
", sku='" + sku + '\'' +
", money=" + money +
", amount=" + amount +
", create_time=" + create_time +
", update_time=" + update_time +
'}';
}
}
4、实时统计拼团相关的指标
统计总共开了多少个团、团的成团率、开团热门分类、团购订单数量、成交金额、成交的分类金额、成交的区域金额。
- 实时拼团数量(维度:拼团的状态、商品分类[关联商品表获取分类ID])
- 实时拼团的金额(维度:拼团的状态、商品分类[关联商品表获取分类ID])
- 需求分析(实时拼团的金额)
相关表
tb_groupon(拼团主表)
tb_groupon_users(拼团明细表)
tb_order(订单表)
实现方案:
实时拼团的金额:拼团明细表 Left Join 订单主表 Left Join 拼团主表
三个流进行Join(拼团明细表 Left Join 订单主表 Left Join 拼团主表)
问题:Flink窗口的Join、CoGroup不支持多个流在一个窗口内进行Join、CoGroup
解决方案一:
将两个流进行Join/CoGroup,得到Join后的流再查询数据库关联第三张表的信息(异步IO,需要查数据库,效率较低)
解决方案二(推荐使用,减少数据库查询):
将两个流进行Join/CoGroup,得到Join后的流再跟第三个流进行Join(共有两个窗口,在窗口中进行Join,数据放在WindowState中,效率更高)
简化后的数据:
拼团主表:主表ID、拼团状态、分类ID
拼团明细表:明细表ID、拼团主表ID、订单ID
订单主表:订单ID、订单总金额
join后的结果
拼团主表ID,订单主表ID,订单总金额,拼团状态,分类ID、(省份)
- 代码实现(使用nc模拟三流join操作)
package com.zwf.jobs;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-20 21:33
*/
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* 先两张表(tb_groupon、tb_groupon_users)进行join得出数据流再和第三张表(tb_order)进行join
*
* //时间、用户ID、拼团主表ID、订单ID (左表) nc端口:8888
* //1000,u1646,p201,o1002
* //5001,u1647,p202,o1003
*
* //时间,拼团ID,拼团状态,分类ID nc端口:9999
* //1000,p201,1,手机
* //5001,p202,1,手机
*
* //时间,订单ID,订单状态,订单金额 nc端口:7777
* //1001,o1002,102,200.0
* //5002,o1003,101,3000.0
*/
public class GroupOnCount {
public static void main(String[] args) throws Exception {
//模拟三表join流
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
DataStreamSource<String> userDetailStream = environment.socketTextStream("node1", 8888);
SingleOutputStreamOperator<Tuple4<Long, String, String, String>> orderDetailWaterMark = userDetailStream.map(lines -> {
String[] split = lines.split(",");
return Tuple4.of(Long.parseLong(split[0]), split[1], split[2], split[3]);
}, Types.TUPLE(Types.LONG,Types.STRING,Types.STRING,Types.STRING)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<Long, String, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((bean, ts) -> bean.f0));
DataStreamSource<String> grouponDetailInfoStream = environment.socketTextStream("node1", 9999);
SingleOutputStreamOperator<Tuple4<Long, String, String, String>> grouponDetailWaterMark = grouponDetailInfoStream.map(lines -> {
String[] split = lines.split(",");
return Tuple4.of(Long.parseLong(split[0]), split[1], split[2], split[3]);
},Types.TUPLE(Types.LONG,Types.STRING,Types.STRING,Types.STRING)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<Long, String, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((bean, ts) -> bean.f0));
DataStreamSource<String> orderDetailInfoStream = environment.socketTextStream("node1", 7777);
SingleOutputStreamOperator<Tuple4<Long, String, String, Double>> orderDetailDetailWaterMark = orderDetailInfoStream.map(lines -> {
String[] split = lines.split(",");
return Tuple4.of(Long.parseLong(split[0]), split[1], split[2], Double.parseDouble(split[3]));
},Types.TUPLE(Types.LONG,Types.STRING,Types.STRING,Types.DOUBLE)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<Long, String, String, Double>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((bean, ts) -> bean.f0));
//表进行coGroupJoin
DataStream<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>> firstJoin = orderDetailWaterMark.coGroup(grouponDetailWaterMark)
.where(o -> o.f2)
.equalTo(u -> u.f1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new CoGroupFunction<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>, Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>>() {
@Override
public void coGroup(Iterable<Tuple4<Long, String, String, String>> first, Iterable<Tuple4<Long, String, String, String>> second, Collector<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>> out) throws Exception {
for (Tuple4<Long, String, String, String> f : first) {
//每条左表数据单独判断是否join上
boolean joined = false;
for (Tuple4<Long, String, String, String> s : second) {
joined = true;
out.collect(Tuple2.of(f, s));
}
if (!joined) {
out.collect(Tuple2.of(f, null));
}
}
}
});
DataStream<Tuple2<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>, Tuple4<Long, String, String, Double>>> res = firstJoin.coGroup(orderDetailDetailWaterMark)
.where(f -> f.f0.f3)
.equalTo(s -> s.f1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new CoGroupFunction<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>, Tuple4<Long, String, String, Double>, Tuple2<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>, Tuple4<Long, String, String, Double>>>() {
@Override
public void coGroup(Iterable<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>> first, Iterable<Tuple4<Long, String, String, Double>> second, Collector<Tuple2<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>, Tuple4<Long, String, String, Double>>> out) throws Exception {
for (Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>> f : first) {
//每条左表数据单独判断是否join上
boolean joined = false;
for (Tuple4<Long, String, String, Double> s : second) {
joined = true;
out.collect(Tuple2.of(f, s));
}
if (!joined) {
out.collect(Tuple2.of(f, null));
}
}
}
});
res.print();
environment.execute();
}
}
五、Drools规则引擎
1、规则引擎
Java规则引擎起源于基于规则的专家系统,而基于规则的专家系统又是专家系统的其中一个分支。专家系统属于人工智能的范畴,它模仿人类的推理方式,使用试探性的方法进行推理,并使用人类能理解的术语解释和证明它的推理结论。
2、规则引擎优点
声明式编程:使用规则更加容易对复杂的问题进行表述,并得到验证(sql语法)。
逻辑与数据分离:数据保存在系统对象中,逻辑保存在规则中。这根本性的打破了面向对象系统中将数据和 逻辑耦合起来的局面。
速度及可测量性:Drools的Rete、Leaps算法,提供了对系统数据对象非常有效率的匹配,这些算法经过了大 量实际考验的证明。
3、规则语法
package 包名
rule "规则名"
when
(条件) - 也叫作规则的 LHS(Left Hand Side)
then
(动作/结果) - 也叫作规则的 RHS(Right Hand Side)
end
4、快速入门案例
- Drools依赖
<!--drools规则引擎-->
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-compiler</artifactId>
<version>7.23.0.Final</version>
</dependency>
- 快速入门案例
package com.zwf.drools;
import com.zwf.droolspojo.Event;
import org.apache.commons.io.FileUtils;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 10:46
*/
public class DroolsDemo {
public static void main(String[] args) {
//获取drools工具类
KieHelper kieHelper = new KieHelper();
try {
//读取规则文件中的内容
String rules = FileUtils.readFileToString(new File("rules/first-demo.drl"), StandardCharsets.UTF_8);
//加入规则信息
kieHelper.addContent(rules, ResourceType.DRL);
//创建规则匹配会话
KieSession kieSession = kieHelper.build().newKieSession();
//向会话中传入数据进行匹配
Event event = new Event("view",20,false);
//传入用户行为数据
kieSession.insert(event);
//启动所有规则
kieSession.fireAllRules();
//获取规则结果
System.out.println(event.isFlag());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
- first-demo.drl
dialect "java"
import com.zwf.droolspojo.Event
rule "first-demo"
when
$e:Event(type=="view"&& count>=2)
then
System.out.println("优惠券发放成功!");
$e.setFlag(true);
end
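上面案例中用到的 com.zwf.droolspojo.Event 在本节笔记中未给出,下面是按规则文件中的 type、count、flag 三个属性假设的最小POJO草图(Drools的LHS通过getter访问属性,RHS通过setFlag回写匹配结果),仅供参考:
package com.zwf.droolspojo;
/**
 * 参考草图:规则匹配用的事件对象,字段与first-demo.drl中的type、count、flag对应(假设)。
 */
public class Event {
    private String type;   //事件类型,如view
    private int count;     //事件发生次数
    private boolean flag;  //规则命中后由RHS置为true
    public Event() {
    }
    public Event(String type, int count, boolean flag) {
        this.type = type;
        this.count = count;
        this.flag = flag;
    }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
    public boolean isFlag() { return flag; }
    public void setFlag(boolean flag) { this.flag = flag; }
}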
5、Flink整合Drools案例
package com.zwf.drools;
import com.zwf.droolspojo.Event;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;
import java.io.File;
import java.nio.charset.StandardCharsets;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 14:34
*/
public class FlinkDroolsDemo {
//用户ID,商品分类ID,事件类型
//u1001,c201,view
//一个用户,在一天之内,浏览某个分类的商品次数达到2次就触发一个事件(对应规则中的count>=2)
public static void main(String[] args) throws Exception {
//获取flink环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStream = environment.socketTextStream("node1", 8888);
//进行数据整理
SingleOutputStreamOperator<Tuple3<String, String, String>> mapSource = dataStream.map(lines -> {
String[] fields = lines.split(",");
return Tuple3.of(fields[0], fields[1], fields[2]);
}, Types.TUPLE(Types.STRING, Types.STRING, Types.STRING));
//根据用户ID进行分类 根据用户ID进行分组 用户ID、
SingleOutputStreamOperator<Tuple3<String, String, String>> processStream = mapSource.keyBy(tp -> tp.f0).process(new KeyedProcessFunction<String, Tuple3<String, String, String>, Tuple3<String, String, String>>() {
//初始化map状态对象 (产品类别,事件类型)
private transient MapState<Tuple2<String, String>, Integer> stateVle;
//初始化drools对象
private transient KieSession ksession;
@Override
public void open(Configuration parameters) throws Exception {
//初始化状态值
MapStateDescriptor<Tuple2<String, String>, Integer> descriptor = new MapStateDescriptor<>("category-id-count", TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
}), TypeInformation.of(new TypeHint<Integer>() {
}));
stateVle = getRuntimeContext().getMapState(descriptor);
//初始化drools
KieHelper kieHelper = new KieHelper();
//读取规则文件
String rulesContent = FileUtils.readFileToString(new File("rules/first-demo.drl"), StandardCharsets.UTF_8);
kieHelper.addContent(rulesContent, ResourceType.DRL);
//创建规则匹配的会话
ksession = kieHelper.build().newKieSession();
}
@Override
public void processElement(Tuple3<String, String, String> value, KeyedProcessFunction<String, Tuple3<String, String, String>, Tuple3<String, String, String>>.Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception {
//用户ID
String uid = value.f0;
//产品类别
String categoryId = value.f1;
//事件类型
String type = value.f2;
Tuple2<String, String> key = Tuple2.of(categoryId, type);
Integer viewCount = stateVle.get(key);
if (viewCount == null) {
viewCount = 0;
}
viewCount++;
stateVle.put(key, viewCount);
//建立规则
Event event = new Event(type, viewCount, false);
ksession.insert(event);
//启动规则
ksession.fireAllRules();
if (event.isFlag()) {
out.collect(Tuple3.of(uid, "发优惠券", "满30减50"));
}
}
});
//Print the result
processStream.print();
environment.execute();
}
}
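To try this job, first start a plain socket server on node1 (the source above connects to node1:8888), e.g. nc -lk 8888, then type events in the uid,categoryId,type format, for example:
u1001,c201,view
u1001,c201,view
After the second view of the same category the rule sets the flag and the job should print something like (u1001,发优惠券,满30减50).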
- DRL file (rules/first-demo.drl, loaded in open() above)
dialect "java"
import com.zwf.droolspojo.Event
rule "first-demo"
when
$e:Event(type=="view" && count>=2)
then
System.out.println("优惠券发放成功!");
$e.setFlag(true);
end
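The rule can also be smoke-tested outside Flink. A minimal sketch (the class name is illustrative; it assumes the Event POJO above and rules/first-demo.drl are available in the working directory):
package com.zwf.drools;

import com.zwf.droolspojo.Event;
import org.apache.commons.io.FileUtils;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;
import java.io.File;
import java.nio.charset.StandardCharsets;

public class RuleSmokeTest {   //illustrative class name, not part of the original project
    public static void main(String[] args) throws Exception {
        //Load the DRL text and build an in-memory KieSession, same as in open() above
        String drl = FileUtils.readFileToString(new File("rules/first-demo.drl"), StandardCharsets.UTF_8);
        KieHelper helper = new KieHelper();
        helper.addContent(drl, ResourceType.DRL);
        KieSession session = helper.build().newKieSession();
        //A "view" event seen twice should be marked by the rule
        Event event = new Event("view", 2, false);
        session.insert(event);
        session.fireAllRules();
        System.out.println("flag = " + event.isFlag()); //expected: true
        session.dispose();
    }
}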
6、Flink + Drools comprehensive example
Flink reads product event data from Kafka and, for each event, looks up matching history in ClickHouse (in the code below this is a synchronous JDBC query). In parallel, Canal captures the custom rule rows from MySQL in real time and pushes them into Kafka; Flink consumes these synchronized rule records, broadcasts them, and fires the rule engine to decide whether to issue a coupon.
- Job class
package com.zwf.drools;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.droolsUtils.kafkaSourceUtil;
import com.zwf.droolspojo.*;
import com.zwf.service.ClickHouseQueryServiceImp;
import com.zwf.service.QueryService;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;
import java.util.Map;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 16:44
*/
public class FlinkDroolsDemo3 {
public static void main(String[] args) throws Exception {
//Create the Flink streaming environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(2);
String kafkaServers="node1:9092,master:9092,node2:9092";
//Consumer for the custom rule data in Kafka (synced from MySQL by Canal)
FlinkKafkaConsumer<String> droolRulesStream = kafkaSourceUtil.getKafkaSource(kafkaServers, "drools-rules", "earliest");
//Consumer for the business event data (product events) in Kafka
FlinkKafkaConsumer<String> eventsStream = kafkaSourceUtil.getKafkaSource(kafkaServers, "events", "latest");
//Descriptor for the broadcast state: rule id -> BroadCastDroolsState
MapStateDescriptor<Integer,BroadCastDroolsState> descriptor = new MapStateDescriptor<>("rules-state", Integer.class, BroadCastDroolsState.class);
//Create the two Kafka-backed streams
DataStreamSource<String> droolsSource = environment.addSource(droolRulesStream);
DataStreamSource<String> eventsSource = environment.addSource(eventsStream);
//Parse the Canal JSON and extract the rule rows
SingleOutputStreamOperator<DroolsRules> rulesStream = droolsSource.process(new ProcessFunction<String, DroolsRules>() {
@Override
public void processElement(String value, ProcessFunction<String, DroolsRules>.Context ctx, Collector<DroolsRules> out) throws Exception {
JSONObject object = JSONObject.parseObject(value);
//Canal row-change messages carry a "type" field (INSERT/UPDATE/DELETE)
Object type = object.get("type");
//Only INSERT and UPDATE messages carry rule rows that need to be (re)loaded
if ("INSERT".equals(type) || "UPDATE".equals(type)) {
JSONArray data = object.getJSONArray("data");
for (int i = 0; i < data.size(); i++) {
DroolsRules rules = data.getObject(i, DroolsRules.class);
out.collect(rules);
}
}
}
});
//Broadcast the rule stream
BroadcastStream<DroolsRules> broadcastStream = rulesStream.broadcast(descriptor);
//Parse the event data (uid, categoryId, type) and key it by uid
KeyedStream<Tuple3<String, String, String>, String> eventsKeyedStream = eventsSource.map(new MapFunction<String, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> map(String value) throws Exception {
String[] fields = value.split(",");
return fields.length==3?Tuple3.of(fields[0], fields[1], fields[2]):Tuple3.of("","","");
}
}).keyBy(tp -> tp.f0);
eventsKeyedStream.connect(broadcastStream).process(new KeyedBroadcastProcessFunction<String, Tuple3<String, String, String>, DroolsRules, Tuple3<String,String,String>>() {
//Keyed map state: (categoryId, eventType) -> view count
private transient MapState<Tuple2<String,String>,Integer> mapState;
//Query service used to look up historical counts in ClickHouse
private transient QueryService queryService;
@Override
public void open(Configuration parameters) throws Exception {
//Initialize the map state
MapStateDescriptor<Tuple2<String, String>, Integer> mapStateDescriptor = new MapStateDescriptor<>("event-view-counts", TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
}), TypeInformation.of(new TypeHint<Integer>() {}));
mapState=getRuntimeContext().getMapState(mapStateDescriptor);
//Use the ClickHouse implementation of the query interface (an instance could also be created via reflection)
queryService=new ClickHouseQueryServiceImp();
}
@Override
public void processElement(Tuple3<String, String, String> value, KeyedBroadcastProcessFunction<String, Tuple3<String, String, String>, DroolsRules, Tuple3<String, String, String>>.ReadOnlyContext ctx, Collector<Tuple3<String, String, String>> out) throws Exception {
//Unpack the keyed event
String uid = value.f0;
String categoryId = value.f1;
String type = value.f2;
//Read the current view count for this (categoryId, type) pair from state
Tuple2<String, String> key = Tuple2.of(categoryId, type);
Integer viewsCounts = mapState.get(key);
if(viewsCounts==null){
viewsCounts=0;
}
viewsCounts++;
mapState.put(key,viewsCounts);
//Iterate over all rules currently held in broadcast state
Iterable<Map.Entry<Integer, BroadCastDroolsState>> entryIterable = ctx.getBroadcastState(descriptor).immutableEntries();
//Event fact for the rule engine
Event event = new Event(type, viewsCounts, false);
//Evaluate every broadcast rule against this event
for (Map.Entry<Integer, BroadCastDroolsState> entry: entryIterable){
//Rule definition from broadcast state
BroadCastDroolsState broadCastDrools = entry.getValue();
//Build the SQL query parameters from the broadcast rule and the current event
QueryParam queryParam = new QueryParam(broadCastDrools.getSql(), uid, broadCastDrools.getStart_time(), broadCastDrools.getEnd_time(), broadCastDrools.getCounts(), categoryId,type);
//Wrap everything the rule needs into a DroolsRuleParam fact
DroolsRuleParam droolsRuleParam = new DroolsRuleParam(event, queryService, queryParam,false);
KieSession kieSession = broadCastDrools.getKieSession();
kieSession.insert(droolsRuleParam);
kieSession.fireAllRules();
if(droolsRuleParam.isHit()){
out.collect(Tuple3.of(uid,"发放优惠劵","满30减20"));
}
}
}
//Load rule rows into (or remove them from) the broadcast state
@Override
public void processBroadcastElement(DroolsRules value, KeyedBroadcastProcessFunction<String, Tuple3<String, String, String>, DroolsRules, Tuple3<String, String, String>>.Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception {
//Unpack the rule row
Short status = value.getStatus();
Integer id = value.getId();
String startTime = value.getStart_time();
String endTime = value.getEnd_time();
Integer counts = value.getCounts();
// System.out.println(value.toString());
//If the rule row was inserted or updated, (re)build its KieSession and store it below
//Handle to the broadcast state
BroadcastState<Integer, BroadCastDroolsState> broadcastState = ctx.getBroadcastState(descriptor);
String code=value.getCode();
String sql=value.getSql();
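//Rows missing status, code or sql are treated as deletions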
if(status==null||code==null||sql==null){
status=3;
}
if(status==1||status==2){
KieHelper kieHelper = new KieHelper();
//Compile the DRL code carried in the rule row
kieHelper.addContent(value.getCode(), ResourceType.DRL);
KieSession kieSession = kieHelper.build().newKieSession();
BroadCastDroolsState broadCastDroolsState = new BroadCastDroolsState(id, kieSession, value.getSql(), startTime, endTime, counts);
broadcastState.put(id,broadCastDroolsState);
}else if(status==3){
//status 3 means the rule was deleted, so remove it from broadcast state
broadcastState.remove(id);
}
}
}).print();
environment.execute();
}
}
- POJO classes (Event, BroadCastDroolsState, DroolsRuleParam, DroolsRules, QueryParam)
package com.zwf.droolspojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 10:59
*/
@Data
@NoArgsConstructor //no-args constructor
@AllArgsConstructor //all-args constructor
public class Event {
//Event type
private String type;
//View count
private Integer count;
//Coupon trigger flag, set to true by the rule
private boolean flag;
}
-----------------------------------------------------------------------------------------
package com.zwf.droolspojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.kie.api.runtime.KieSession;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 16:33
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BroadCastDroolsState {
//Entity kept in broadcast state, one per rule (keyed by rule id)
private Integer id;
//Pre-built Drools session for this rule
private KieSession kieSession;
private String sql;
//Times are kept as String on purpose; otherwise they would be converted to timestamps automatically when matched in the query
private String start_time; //effective-from time
private String end_time; //effective-until time
private Integer counts; //view-count threshold
}
-----------------------------------------------------------------------------------------
package com.zwf.droolspojo;
import com.zwf.service.QueryService;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 20:21
*/
//Parameter fact passed to the Drools rules
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DroolsRuleParam {
//The event fact
private Event event;
//Query service for historical lookups
private QueryService queryService;
//Query parameters
private QueryParam queryParam;
//Hit flag: whether the rule was triggered
private boolean hit;
}
-----------------------------------------------------------------------------------------
package com.zwf.droolspojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 16:28
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DroolsRules {
//Rule entity mapped from the tb_drools_rules table
private Integer id;
//DRL rule source code
private String code;
private String sql;
private String start_time;
private String end_time;
private Integer counts;
private Short status; //1 = insert, 2 = update, 3 = delete
}
-----------------------------------------------------------------------------------------
package com.zwf.droolspojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 20:24
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
//Query parameters for the ClickHouse lookup
public class QueryParam {
private String sql;
private String uid;
private String start_time;
private String end_time;
private Integer counts;
private String category_id;
private String type;
}
- Kafka consumer utility class
package com.zwf.droolsUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 15:29
*/
public class kafkaSourceUtil {
public static FlinkKafkaConsumer<String> getKafkaSource(String servers,String topic,String offset){
Properties prop=new Properties();
prop.put("bootstrap.servers",servers);
prop.put("auto.offset.reset",offset);
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
return kafkaConsumer;
}
}
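FlinkKafkaConsumer is deprecated in newer Flink releases. If the project is later moved to Flink 1.14+ with the flink-connector-kafka dependency, an equivalent helper could look roughly like this (a sketch under that assumption only; the class name and group id are illustrative):
package com.zwf.droolsUtils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class KafkaSourceUtilV2 {   //hypothetical class, not part of the original project
    public static KafkaSource<String> getKafkaSource(String servers, String topic, String groupId) {
        //KafkaSource replaces FlinkKafkaConsumer; this variant always starts from the earliest offset
        return KafkaSource.<String>builder()
                .setBootstrapServers(servers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
Such a source would be attached with environment.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source") instead of environment.addSource.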
- ClickHouse JDBC utility class
package com.zwf.utils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 21:41
*/
public class ClickHouseUtils {
private static Connection connection=null;
private static String ckDriver="com.clickhouse.jdbc.ClickHouseDriver";
private static String userName="default";
private static String passWord="123456";
private static String url="jdbc:clickhouse://192.168.147.110:8123/drools";
static {
try {
Class.forName(ckDriver);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
public static Connection getConnection() throws SQLException {
connection= DriverManager.getConnection(url,userName,passWord);
return connection;
}
}
- ClickHouse JDBC query service
package com.zwf.service;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 19:53
*/
public interface QueryService {
/**
* Business query interface
* @param sql query SQL statement
* @param uid user ID
* @param cid category_id, the product category ID
* @param type event type: view / addCard / pay
* @param start_time effective-from time
* @param end_time effective-until time
* @param counts view-count threshold
* @return true if the stored count reaches the threshold
*/
boolean queryEventCountRangeTimes(String sql,String uid,String cid,String type,String start_time,String end_time,int counts) throws Exception;
}
package com.zwf.service;
import com.zwf.utils.ClickHouseUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-21 20:36
*/
//Example SQL stored in tb_drools_rules: select count(*) counts from tb_user_event where uid = ? and cid = ?
// and type = ? and start_time >= ? and end_time <= ? group by uid ,cid
public class ClickHouseQueryServiceImp implements QueryService{
@Override
public boolean queryEventCountRangeTimes(String sql, String uid, String cid, String type, String start_time, String end_time, int counts) throws Exception {
//Close the connection and statement after use so ClickHouse connections are not leaked
try (Connection connection = ClickHouseUtils.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setString(1, uid);
preparedStatement.setString(2, cid);
preparedStatement.setString(3, type);
preparedStatement.setString(4, start_time);
preparedStatement.setString(5, end_time);
ResultSet resultSet = preparedStatement.executeQuery();
int res = 0;
if (resultSet.next()) {
res = resultSet.getInt("counts");
}
//The uid/cid/type values sent to Kafka must match the rows in ClickHouse
//The rule is hit once the stored count reaches the threshold carried in the rule row
return res >= counts;
}
}
}
- Custom rule engine (DRL file): a user who views the same product category twice or more triggers the coupon campaign.
dialect "java"
import com.zwf.droolspojo.DroolsRuleParam
import com.zwf.droolspojo.QueryParam
import com.zwf.droolspojo.Event
import com.zwf.service.QueryService
rule "first-demo2"
when
$dr:DroolsRuleParam()
$e: Event(type=="view"&&count>=2) from $dr.event
$param:QueryParam() from $dr.queryParam
$Service:QueryService() from $dr.queryService
then
QueryService queryService = $dr.getQueryService();
QueryParam queryParam = $dr.getQueryParam();
boolean flag = queryService.queryEventCountRangeTimes(queryParam.getSql(),queryParam.getUid(),queryParam.getCategory_id(),$e.getType(),queryParam.getStart_time(),queryParam.getEnd_time(),queryParam.getCounts());
$dr.setHit(flag);
end
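As with the first rule, this one can be smoke-tested without Flink or ClickHouse by handing it a stub QueryService. A minimal sketch (class name, DRL path and parameter values are illustrative; the POJOs are the ones defined above):
package com.zwf.drools;

import com.zwf.droolspojo.DroolsRuleParam;
import com.zwf.droolspojo.Event;
import com.zwf.droolspojo.QueryParam;
import com.zwf.service.QueryService;
import org.apache.commons.io.FileUtils;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;
import java.io.File;
import java.nio.charset.StandardCharsets;

public class Rule2SmokeTest {   //illustrative class name
    public static void main(String[] args) throws Exception {
        //Stub service: pretend ClickHouse always reports enough historical views
        QueryService stub = (sql, uid, cid, type, start, end, counts) -> true;
        //Build the session from the second DRL (path is an assumption)
        String drl = FileUtils.readFileToString(new File("rules/first-demo2.drl"), StandardCharsets.UTF_8);
        KieHelper helper = new KieHelper();
        helper.addContent(drl, ResourceType.DRL);
        KieSession session = helper.build().newKieSession();
        //Event that satisfies the constraint type == "view" && count >= 2
        Event event = new Event("view", 2, false);
        QueryParam param = new QueryParam("select 1", "u20020", "2023-12-01 10:38:28", "2024-03-07 10:38:52", 2, "手机", "view");
        DroolsRuleParam fact = new DroolsRuleParam(event, stub, param, false);
        session.insert(fact);
        session.fireAllRules();
        System.out.println("hit = " + fact.isHit()); //expected: true with the stub service
        session.dispose();
    }
}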
- MySQL (rule data in the business database)
/*!40101 SET NAMES utf8 */;
use lives;
create table `tb_drools_rules` (
`id` int (11),
`name` varchar (60),
`code` text ,
`sql` text ,
`start_time` datetime ,
`end_time` datetime ,
`counts` int (11),
`status` tinyint (4)
);
insert into `tb_drools_rules` (`id`, `name`, `code`, `sql`, `start_time`, `end_time`, `counts`, `status`) values('1','推荐规则一','dialect \"java\"\r\nimport com.zwf.droolspojo.Event\r\nrule \"first-demo\"\r\n when\r\n $e:Event(type==\"view\" && count>=2)\r\n then\r\n System.out.println(\"优惠券发放成功!\");\r\n $e.setFlag(true);\r\nend',NULL,'2024-02-04 21:40:06','2024-04-05 21:40:20','2','3');
insert into `tb_drools_rules` (`id`, `name`, `code`, `sql`, `start_time`, `end_time`, `counts`, `status`) values('2','推荐规则二','dialect \"java\"\r\nimport com.zwf.droolspojo.DroolsRuleParam\r\nimport com.zwf.droolspojo.QueryParam\r\nimport com.zwf.droolspojo.Event\r\nimport com.zwf.service.QueryService\r\nrule \"first-demo2\"\r\n when\r\n $dr:DroolsRuleParam()\r\n $e: Event(type==\"view\"&&count>=2) from $dr.event\r\n $param:QueryParam() from $dr.queryParam\r\n $Service:QueryService() from $dr.queryService\r\n then\r\n QueryService queryService = $dr.getQueryService();\r\n QueryParam queryParam = $dr.getQueryParam();\r\n boolean flag = queryService.queryEventCountRangeTimes(queryParam.getSql(),queryParam.getUid(),queryParam.getCategory_id(),$e.getType(),queryParam.getStart_time(),queryParam.getEnd_time(),queryParam.getCounts());\r\n $dr.setHit(flag);\r\nend\r\n\r\n','select count(*) counts from tb_user_event where uid=? and cid = ? and type = ? and start_time >= ? and end_time <= ? group by uid ,cid','2023-12-01 10:38:28','2024-03-07 10:38:52','2','1');
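Note that, as inserted above, 推荐规则一 (id 1) has status 3, which processBroadcastElement treats as a deletion, so only 推荐规则二 (id 2, status 1) ends up in broadcast state. To activate rule one, flip its status to 1 or 2 and let Canal propagate the change, for example:
update lives.tb_drools_rules set status = 1 where id = 1;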
- ClickHouse SQL (business data)
create database drools;
drop table if exists drools.tb_user_event;
-- drools test table
create table drools.tb_user_event(
id String,
uid String,
spu String,
cid String,
type String,
dt String,
start_time DATETIME64,
end_time DATETIME64,
ts DATETIME default now()
)engine =ReplacingMergeTree(ts)
partition by dt
order by id;
insert into drools.tb_user_event(id,uid,spu,cid,type,dt,start_time,end_time)
values('1','u20020','iphone13','手机','view','2024-01-09 22:34:16','2024-01-10 22:34:16','2024-01-21 22:34:33');
insert into drools.tb_user_event(id,uid,spu,cid,type,dt,start_time,end_time)
values('2','u20020','iphone13','手机','view','2024-01-09 22:34:16','2024-01-10 22:34:16','2024-01-21 22:34:33');
insert into drools.tb_user_event(id,uid,spu,cid,type,dt,start_time,end_time)
values('3','u20019','iphone13','手机','view','2024-01-09 22:34:16','2024-01-10 22:34:16','2024-01-21 22:34:33');
insert into drools.tb_user_event(id,uid,spu,cid,type,dt,start_time,end_time)
values('4','u20019','iphone13','手机','view','2024-01-09 22:34:16','2024-01-10 22:34:16','2024-01-21 22:34:33');
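To check that the rule's lookup finds these rows, the parameterized SQL stored in tb_drools_rules can be run by hand with the rule's time window and the test values (the drools. prefix is added here because the JDBC URL above already selects that database):
select count(*) counts
from drools.tb_user_event
where uid = 'u20020' and cid = '手机' and type = 'view'
and start_time >= '2023-12-01 10:38:28' and end_time <= '2024-03-07 10:38:52'
group by uid, cid;
-- expected: counts = 2 for u20020, which satisfies the threshold counts >= 2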
- Configure ${CANAL_HOME}/conf/example/instance.properties as follows:
canal.mq.dynamicTopic=order-main:lives\\.tb_order,order-detail:lives\\.tb_order_goods,ordermain:lives\\.orderMain,orderdetail:lives\\.orderDetail,drools-rules:lives\\.tb_drools_rules
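For reference, Canal publishes row changes for tb_drools_rules as flat JSON messages; processElement in the job above only reads the type field and the data array. A trimmed, illustrative example (values shortened, column values are serialized as strings):
{"database":"lives","table":"tb_drools_rules","type":"INSERT",
 "data":[{"id":"2","name":"推荐规则二","code":"dialect \"java\" ...","sql":"select count(*) counts from tb_user_event where ...","start_time":"2023-12-01 10:38:28","end_time":"2024-03-07 10:38:52","counts":"2","status":"1"}]}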
- Start Canal so that the rule data in MySQL is synchronized into the Kafka topic drools-rules
bin/startup.sh
- Create the two Kafka topics (the names must match what the job consumes: events and drools-rules)
kafka-topics.sh --create --zookeeper node1:2181,master:2181,node2:2181/kafka0110 --topic events --replication-factor 3 --partitions 3
kafka-topics.sh --create --zookeeper node1:2181,master:2181,node2:2181/kafka0110 --topic drools-rules --replication-factor 3 --partitions 3
- Simulate event data (feed events through the console producer)
kafka-console-producer.sh --broker-list node1:9092,master:9092,node2:9092 --topic events
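Each line must follow the uid,categoryId,type format expected by the job; to line up with the ClickHouse test data above, for example:
u20020,手机,view
u20020,手机,view
After the second line the job should emit something like (u20020,发放优惠劵,满30减20), provided the broadcast rule with status 1 has already been consumed from drools-rules.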