首页 > 其他分享 >实时数仓项目笔记

实时数仓项目笔记

时间:2024-03-07 15:00:34浏览次数:33  
标签:数仓 String flink 实时 笔记 import apache org public

实时项目笔记处理

一、行为日志数据采集

1648795831582

1、ngx_kafka_module安装

先提前安装好nginx和kafka组件,目的配置nginx,nginx获取到分布式系统的消息轮询进行分发到kafka中进行消费!
  • 安装编译c客户端的kafka源码
#git 拉取librdkafka
git clone https://github.com/edenhill/librdkafka
#进入librdkafka环境目录
cd librdkafka
# 安装c++环境
yum install -y gcc gcc-c++ pcre-devel zlib-devel
# 编译nginx安装包
./configure
# 安装nginx软件
make && make install
  • 安装nginx整合kafka的ngx_kafka_module插件
# 下载nginx插件源码包
git clone https://github.com/brgliuwei/ngx_kafka_module
#进入nginx源码包根目录下
cd /usr/local/nginx/nginx-1.20.2
# 给nginx源码包添加插件
./configure --add-module=/root/ngx_kafka_module
# 安装nginx环境
make && make install
# 测试nginx环境是否安装成功
/usr/local/nginx/sbin/nginx -V
  • 配置nginx.conf文件
#user  nobody;
worker_processes  1;
#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;
#pid        logs/nginx.pid;
events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';
    #access_log  logs/access.log  main;
    sendfile        on;
    #tcp_nopush     on;
    #keepalive_timeout  0;
    keepalive_timeout  65;
    #gzip  on;
     
    # 配置kafka
    kafka;
    kafka_broker_list node1:9092 master:9092 node2:9092;
    
    server {
        listen       80;
        server_name  localhost;
        #charset koi8-r;
        #access_log  logs/host.access.log  main;
        
        #设置kafka请求地址
        location /logs/access/topic {
             #kafka分区轮询分发
		     kafka_partition auto;
		     # 设置的主题是action_log
			 kafka_topic action_log;
		}

        #error_page  404              /404.html;
        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }

}
  • 测试从nginx获取的数据传输进kafka
 # 发送消息进kafka中进行消费
 curl localhost/logs/access/topic -d "message send to kafka topic"
 # 显示发送的详细信息
 curl localhost/logs/access/topic -d "message send to kafka topic" -v
  • 启动kafka消费者
kafka-console-consumer.sh --bootstrap-server node1:9092,master:9092,node2:9092 --topic action_log --from-beginning

注意:如果出现以下库找不到情况下,就重新加载so库。

image-20240115135657280

#lib
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig

2、OpenResty(推荐使用)

通过lua脚本语言编写,http请求访问,nginx轮询生成日志行为数据文件,使用flume拉取日志行为文件到kafka中进行消费!

  • 安装OpenResty依赖环境。
yum install -y readline-devel pcre-devel openssl-devel gcc perl wget
  • 使用wget下载OpenResty安装包
wget https://openresty.org/download/openresty-1.13.6.2.tar.gz
  • 解压安装包
tar -zxvf openresty-1.13.6.2.tar.gz -C /usr/local/openresty/
  • 编译OpenResty

进入到openresty自带的和LuaJIT整合的目录,然后编译安装.

cd /usr/local/openresty/openresty-1.13.6.2/bundle/LuaJIT-2.1-20180420

make && make install
  • 切换到openresty的安装目录,预编译openresty
cd /usr/local/src/openresty-1.13.6.2

./configure
  • 编译安装
gmake && gmake install
  • 启动openresty配套的nginx
cd /usr/local/openresty/nginx/sbin/
# 启动OpenResty中nginx
./nginx
  • 查看nginx的详细信息
/usr/local/openresty/nginx/sbin/nginx -V
  • 使用浏览器访问nginx

1649758441840

  • nginx结合lua脚本语言(案例测试)

修改openresty下nginx的配置文件

 vim /usr/local/openresty/nginx/conf/nginx.conf

在http的配置下添加如下配置(测试页面输出字符)

location /lua {  
    default_type 'text/html';  
    content_by_lua 'ngx.say("hello yjx")';  
} 

测试http请求访问

image-20240225150141618

  • 编写lua脚本语言,伪装成空图片,写入到特定指定的日志文件中
location /log.gif {
  #伪装成gif文件
  default_type image/gif;    
  #本身关闭access_log
  access_log off;    
  
  #使用lua将nginx的接收的参数写入到日志文件中
  log_by_lua_file 'conf/log.lua';
  
  #返回空图片
  empty_gif;
}
  • 在nginx的conf目录下创建一个log.lua文件
vim /usr/local/openresty/nginx/conf/log.lua

但是以上方法容易导致行为数据文件过大,造成读写效率变低。

  • 解决方案:

编写lua脚本语言,让行为数据生成一个个小文件数据

-- 引入lua用来解析json的库
local cjson = require "cjson" 
-- 获取请求参数列表
local request_args_tab = ngx.req.get_uri_args()
-- 获取当前系统时间
local time = os.date("%Y%m%d%H",unixtime)
-- 使用lua的io打开一个文件,如果文件不存在,就创建,a为append模式
local path = "/logs/access-" .. time .. ".log"
local file = io.open(path, "a")
-- 定义一个json对象
local log_json = {}
-- 将参数的K和V迭代出来,添加到json对象中
for k, v in pairs(request_args_tab) do
    log_json[k] = v
end
-- 将json写入到指定的log文件,末尾追加换行
file:write(cjson.encode(log_json), "\n")
-- 将数据写入
file:flush()

授予读写权限给/logs目录,让logs目录有权限进行读写操作!

mkdir /logs
chmod o+w /logs

#或者将/logs目录的所属用户改成nobody 
chown -R nobody:nobody /logs

在nginx所在的机器上安装Flume,使用TailDirSource和KafkaChannel将数据采集到Kafka中,也就是使用flume拉取nginx上的行为日志数据。

  • flume配置文件信息(使用kafka channel模式把日志文件数据拉取到kafka中进行消费)
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/taildir_position.json
a1.sources.r1.filegroups = f1
#使用正则表达式
a1.sources.r1.filegroups.f1 = /logs/access-.*\.log
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node1:9092,master:9092,node2:9092 
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c1.kafka.topic = action_log
a1.channels.c1.parseAsFlumeEvent= false
a1.sources.r1.channels = c1
  • OpenResty和kafka整合插件

下载插件源码

 wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip

解压lua-resty-kafka的源码

#首先安装unzip解压工具,如果没有按照unzip会报错
yum install -y unzip
#解压lua-resty-kafka的源码
unzip master.zip -d /usr/local/src/
#在/usr/local/openresty/lualib下创建一个kafka目录,用于保存lua-resty-kafka整合的模块代码
mkdir /usr/local/openresty/lualib/kafka
#将lua-resty-kafka整合的模块代码拷贝到/usr/local/openresty/lualib/kafka目录
cp -r /usr/local/src/lua-resty-kafka-master/lib/resty/ /usr/local/openresty/lualib/kafka/

修改nginx的配置文件

vim /usr/local/openresty/nginx/conf/nginx.conf

在http配置的下面、server配置同级的部分添加如下配置

lua_package_path "/usr/local/openresty/lualib/kafka/?.lua;;";
  • 在server配置的下面添加如下配置
location /log {
       
}
  • 启动Zookeeper和kafka
zookeeper-3.4.10/bin/zkServer.sh start

kafka_2.11-0.10.2.1/bin/kafka-server-start.sh -daemon /bigdata/kafka_2.11-0.10.2.1/config/server.properties
  • 创建主题
kafka_2.11-0.10.2.1/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3 

3、FileBeat采集行为数据

Filebeat 是一个用于转发和集中日志数据的轻量级传送器。作为代理安装在您的服务器上,Filebeat 监控您指定的日志文件或位置,收集日志事件,并将它们转发到ElasticsearchLogstash以进行索引。

当您启动 Filebeat 时,它会启动一个或多个输入,这些输入会在您为日志数据指定的位置中查找。对于 Filebeat 定位的每个日志,Filebeat 都会启动一个收割机。每个harvester 读取单个日志以获取新内容并将新日志数据发送到libbeat,libbeat 聚合事件并将聚合数据发送到您为Filebeat 配置的输出。

filebeat

  • FileBeat和Flume比较
指标 Flume FileBeat
内存
CPU
背压敏感协议
插件 需要写API
功能 从多种输入端采集到多种输出端 传输
轻重 相对较重 轻量级二进制文件
过滤能力 自带了分区和拦截器的功能 有过滤能力但是较弱
进程 一台服务有多个进程,挂掉之后需要手动拉起 十分稳定
编写语言 Java Go
集群 分布式 单节点
二次开发或扩展开发 一般
  • 安装

官网:https://www.elastic.co/cn/downloads/beats

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.1.3-x86_64.rpm

##默认安装路径在 /usr/share/filebeat/

sudo rpm -vi filebeat-8.1.3-x86_64.rpm
  • 开启kafka模式
filebeat modules list  #查看需要启动的模块
filebeat modules enable kafka #在安装目录下,启用一个或多个模块
  • 创建采集配置文件(上传filebeat2kafka.yml文件到/etc/filebeat/文件目录下)
filebeat.inputs:
- type: log
  paths:
    - /opt/data/data.json

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["node01:9092", "node02:9092", "node03:9092"]

  # message topic selection + partitioning
  topic: filebeat
  partition.round_robin:
    reachable_only: true

  required_acks: 1
  max_message_bytes: 1000000
  • kafka0.11版本以下使用以下配置
filebeat.inputs:
- type: log
  paths:
    - /home/data/data.json

output.kafka:
  version: 0.11.0.2
  enabled: true
  # initial brokers for reading cluster metadata
  hosts: ["node1:9092", "master:9092", "node2:9092"]

  # message topic selection + partitioning
  topic: filebeat
  partition.round_robin:
    reachable_only: true

  required_acks: 1
  max_message_bytes: 1000000
  • 开启FileBeat工具采集日志行为数据
filebeat -e -c filebeat2kafka.yml

二、乐字节直播项目搭建

1、导包(普通Maven工程)

 <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.15.2</flink.version>
    <scala.version>2.12.2</scala.version>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--flink客户端-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--本地运行的webUI-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--flink与kafka整合-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.16</version>
    </dependency>
    <!--状态后端-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--日志系统-->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>5.3.21</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-cep</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--json格式依赖-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--csv格式依赖-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink SQL -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC 的依赖 -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.3.0</version>
    </dependency>

    <!-- flink与File整合的依赖 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--fastjson工具类-->
    <dependency>
      <groupId>com.alibaba.fastjson2</groupId>
      <artifactId>fastjson2</artifactId>
      <version>2.0.41</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-compress</artifactId>
      <version>1.21</version>
    </dependency>
    <!--Alibaba Druid数据库连接池-->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.2.15</version>
    </dependency>
    <!--发送异步HTTP请求的Java工具包 -->
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpasyncclient</artifactId>
      <version>4.1.4</version>
    </dependency>
    <!--连接clickhouse 驱动包-->
    <dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.4.6</version>
    </dependency>
    <!--redis连接包-->
    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.12</artifactId>
      <version>1.1.0</version>
    </dependency>
    <!--drools规则-->
    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-compiler</artifactId>
      <version>7.23.0.Final</version>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.2</version>
    </dependency>
  </dependencies>

2、编写Flink工具类

package com.zwf.utils;
/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-15 17:04
 */

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Flink工具类
 */
public class FlinkUtils {
    /**
     * @param ConfigPath 外部配置文件信息
     * @param serialize  序列化器类
     * @param <T>        返回类型
     * @return
     */
    //创建flink环境
    public static final   StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    //获取外部文件参数对象
    public static ParameterTool param=null;
    public static <T> DataStream<T> createKafka(String configPath, Class<? extends DeserializationSchema<T>> deserialization) throws Exception{
     //加载外部文件配置
        param=ParameterTool.fromPropertiesFile(configPath);
        //获取checkpoint间隔时间
        long chk = param.getLong("checkpoint.interval",3000L);
        //获取checkpoint路径
        String ckPath = param.get("checkpoint.path");
        //获取kafka topic
        String topics = param.get("kafka.topics");
        env.enableCheckpointing(chk, CheckpointingMode.EXACTLY_ONCE);
        //设置chk文件系统路径
        env.setStateBackend(new FsStateBackend(ckPath));
        //设置外部存储策略
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //获取topic数组集
        String[] split = topics.split(",");
        List<String> topis = Arrays.asList(split);
        Properties properties = param.getProperties();
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topis, deserialization.newInstance(), properties);
        //不提交偏移量进入chk
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

         return env.addSource(kafkaConsumer);
    }

    /**
     *
     * @param args  参数
     * @param kafkaMeta  kafka自定义反序列化器获取kafka元数据
     * @return
     * @param <T>
     * @throws Exception
     */
    public static <T> DataStream<T> createKafkaV2(String args,Class<? extends KafkaDeserializationSchema<T>> kafkaMeta) throws Exception {
        param=ParameterTool.fromPropertiesFile(args);
        Long interval = param.getLong("checkpoint.interval", 3000L);
        String chkPath = param.get("checkpoint.path");
        String topics = param.getRequired("kafka.topics");
        List<String> topicList = Arrays.asList(topics.split(","));
        //开启状态保存
        env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE);
         //设置RockDB状态后端
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(chkPath);
       //设置外部存储清除策略
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        Properties properties = param.getProperties();
        //设置kafka source
        FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer(topicList, kafkaMeta.newInstance(), properties);
       //是否应在检查点上将偏移量提交回Kafka
        consumer.setCommitOffsetsOnCheckpoints(false);
        return env.addSource(consumer);
    }

    //根据主题获取数据流
    public static <T> DataStream<T> createKafkaV2(ParameterTool tools,String topic,Class<? extends DeserializationSchema<T>> deserialization) throws Exception {
          //检查点生成间隔时间
        Long interval = tools.getLong("checkpoint.interval", 3000L);
        String chkPath = tools.get("checkpoint.path");
        //开启状态保存
        env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE);
        //设置RockDB状态后端
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(chkPath);
        //设置外部存储清除策略
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        Properties properties = tools.getProperties();
        //设置kafka source
        FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer(topic,deserialization.newInstance(), properties);
        //是否应在检查点上将偏移量提交回Kafka
        consumer.setCommitOffsetsOnCheckpoints(false);
        return env.addSource(consumer);
    }
}

3、flink&kafka配置信息

运行时的配置参数定义

checkpoint.interval=5000
checkpoint.path=hdfs://master:8020/chk/statebackend
kafka.topics=order-main
bootstrap.servers=node1:9092,master:9092,node2:9092
auto.offset.reset=earliest
group.id=zwf_9870
isolation.level=read_committed
amap.http.key=26ccd8417f5df7255027dbe118d5258d
amap.http.url=https://restapi.amap.com/v3/geocode/regeo
clickhouse.batch.size=3
clickhouse.batch.interval=5000
kafka.input.main=ordermain
kafka.input.detail=orderdetail

4、log4j日志文件(log4j2.properties)

rootLogger.level =Info
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

三、实时指标计算

1、新老用户统计

  • 导入数据到kafka中
cat data.json | kafka-console-producer.sh --broker-list node1:9092,master:9092,node2:9092 --topic weblogs

kafka-console-consumer.sh --bootstrap-server node1:9092,master:9092,node2:9092 --topic weblogs --from-beginning
  • 数据格式
{
"carrier":"中国电信",    
"deviceId":"aff486e2-1942-45ae-8623-9d6ef1f29b2f",    
"deviceType":"IPHONE7PLUS",    
"eventId":"appLaunch",    
"id":79,    
"isNew":1,    
"latitude":37.942441944941706,    
"longitude":115.21183014458548,     
"netType":"WIFI",    
"osName":"ios",    
"osVersion":"8.2",    
"releaseChannel":"AppStore",    
"resolution":"2048*1024",    
"sessionId":"fqKovEGyuNM0",    
"timestamp":1615814321410
}
  • 字段格式说明
carrier: 运营商
deviceId: 设备ID
eventId:事件类型ID(appLaunch即App启动)
isNew:1为新用户,0为老用户
latitude:纬度
longitude:经度
netType:网络类型
osName:操作系统
osVersion:操作系统版本
releaseChannel:下载渠道
resolution:屏幕分辨率
sessionId:会话ID
timestamp:时间戳
osName:操作系统
  • 编码
//pojo实体类
package com.zwf.pojo;
/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-16 11:01
 */

import java.util.HashMap;


public class DataBean {
   //唯一标识码
    private String id;
    //Unique Device Identifier,唯一设备标识码
    //private String udid;
    //设备ID
    private String deviceId;
     //全局用户ID
    private String guid;
    //账户ID
    private String account;
    //
    private String appId;
      //应用版本
    private String appVersion;
    //运营商
    private String carrier;
    //设备类型
    private String deviceType;
    //事件ID
    private String eventId;
   //IP地址
    private String ip;
    //纬度
    private Double latitude;
    //经度
    private Double longitude;
      //网络类型
    private String netType;
     //操作系统名字
    private String osName;
      //操作系统版本
    private String osVersion;
     //下载渠道
    private String releaseChannel;
      //屏幕分辨率
    private String resolution;
     //会话ID
    private String sessionId;
      //事件时间戳
    private Long timestamp;
      //切割的会话ID  30分钟切割一次
    private String newSessionId;
     //国家
    private String country;
    //省
    private String province;
    //市
    private String city;
     //县
    private String region;
    //日期
    private String date;
     //小时
    private String hour;
     //设置
    private HashMap<String, Object> properties;
     //最后更新时间戳
    private Long lastUpdate;
   
    private int isNew; //数据存在是否是一个新用户(通常不存在)

    //是不是新用户,如果为1为新用户,如果为0为老用户
    private int isN;

    public DataBean(){}


    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getGuid() {
        return guid;
    }

    public void setGuid(String guid) {
        this.guid = guid;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

    public String getAppId() {
        return appId;
    }

    public void setAppId(String appId) {
        this.appId = appId;
    }

    public String getAppVersion() {
        return appVersion;
    }

    public void setAppVersion(String appVersion) {
        this.appVersion = appVersion;
    }

    public String getCarrier() {
        return carrier;
    }

    public void setCarrier(String carrier) {
        this.carrier = carrier;
    }

    public String getDeviceType() {
        return deviceType;
    }

    public void setDeviceType(String deviceType) {
        this.deviceType = deviceType;
    }

    public String getEventId() {
        return eventId;
    }

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Double getLatitude() {
        return latitude;
    }

    public void setLatitude(Double latitude) {
        this.latitude = latitude;
    }

    public Double getLongitude() {
        return longitude;
    }

    public void setLongitude(Double longitude) {
        this.longitude = longitude;
    }

    public String getNetType() {
        return netType;
    }

    public void setNetType(String netType) {
        this.netType = netType;
    }

    public String getOsName() {
        return osName;
    }

    public void setOsName(String osName) {
        this.osName = osName;
    }

    public String getOsVersion() {
        return osVersion;
    }

    public void setOsVersion(String osVersion) {
        this.osVersion = osVersion;
    }

    public String getReleaseChannel() {
        return releaseChannel;
    }

    public void setReleaseChannel(String releaseChannel) {
        this.releaseChannel = releaseChannel;
    }

    public String getResolution() {
        return resolution;
    }

    public void setResolution(String resolution) {
        this.resolution = resolution;
    }

    public String getSessionId() {
        return sessionId;
    }

    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public String getNewSessionId() {
        return newSessionId;
    }

    public void setNewSessionId(String newSessionId) {
        this.newSessionId = newSessionId;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getRegion() {
        return region;
    }

    public void setRegion(String region) {
        this.region = region;
    }

    public HashMap<String, Object> getProperties() {
        return properties;
    }

    public void setProperties(HashMap<String, Object> properties) {
        this.properties = properties;
    }

    public Long getLastUpdate() {
        return lastUpdate;
    }

    public void setLastUpdate(Long lastUpdate) {
        this.lastUpdate = lastUpdate;
    }

    public int getIsNew() {
        return isNew;
    }

    public void setIsNew(int isNew) {
        this.isNew = isNew;
    }

    public int getIsN() {
        return isN;
    }

    public void setIsN(int isN) {
        this.isN = isN;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getHour() {
        return hour;
    }

    public void setHour(String hour) {
        this.hour = hour;
    }

    @Override
    public String toString() {
        return "DataBean{" +
                "id=" + id +
                ", deviceId='" + deviceId + '\'' +
                ", guid='" + guid + '\'' +
                ", account='" + account + '\'' +
                ", appId='" + appId + '\'' +
                ", appVersion='" + appVersion + '\'' +
                ", carrier='" + carrier + '\'' +
                ", deviceType='" + deviceType + '\'' +
                ", eventId='" + eventId + '\'' +
                ", ip='" + ip + '\'' +
                ", latitude=" + latitude +
                ", longitude=" + longitude +
                ", netType='" + netType + '\'' +
                ", osName='" + osName + '\'' +
                ", osVersion='" + osVersion + '\'' +
                ", releaseChannel='" + releaseChannel + '\'' +
                ", resolution='" + resolution + '\'' +
                ", sessionId='" + sessionId + '\'' +
                ", timestamp=" + timestamp +
                ", newSessionId='" + newSessionId + '\'' +
                ", country='" + country + '\'' +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                ", region='" + region + '\'' +
                ", properties=" + properties +
                ", lastUpdate=" + lastUpdate +
                ", isNew=" + isN +
                '}';
    }
}
  • job工作类
package com.zwf.jobs;

import com.zwf.constant.EventID;
import com.zwf.pojo.DataBean;
import com.zwf.udf.JsonToDataBean;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-16 9:28
 */
public class UserCount {
    public static void main(String[] args) throws Exception {
        DataStream<String> stream = FlinkUtils.createKafka(args[0], SimpleStringSchema.class);
           //把json字符串转为javabean
        //过滤出applaunch(手机app启动)事件中 每个设备 新老用户 标记:1  (0表示老用户、1表示新用户)
        SingleOutputStreamOperator<DataBean> dataSource = stream.process(new JsonToDataBean());
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> sumRes = dataSource.filter(x -> EventID.APP_LAUNCH.equals(x.getEventId())).map(x -> Tuple3.of(x.getDeviceType(), x.getIsNew(), 1), Types.TUPLE(Types.STRING, Types.INT, Types.INT))
                .keyBy(x -> Tuple2.of(x.f0, x.f1), Types.TUPLE(Types.STRING, Types.INT)).sum(2);
//        sumRes.print();
        //过滤eventID=applaunch出新老用户数量  计算新老用户总人数
        sumRes.map(x->Tuple2.of(x.f1,x.f2),Types.TUPLE(Types.INT, Types.INT)).keyBy(x->x.f0).sum(1).print();
        FlinkUtils.env.execute();
    }

}
  • 编写自定义函数把json字符串转换为java Bean对象

    在Flink中实现keyBy,要按照很多的维度进行keyBy,然后sum,在sink。


package com.zwf.udf;

import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.DataBean;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-16 11:01
 */
public class JsonToDataBean extends ProcessFunction<String, DataBean> {

    @Override
    public void processElement(String value, ProcessFunction<String, DataBean>.Context ctx, Collector<DataBean> out) throws Exception {
        try {
            DataBean dataBean = JSONObject.parseObject(value, DataBean.class);
            out.collect(dataBean);
        }catch (Exception e){

        }
    }
}
  • 异步IO获取地理位置(经纬度转省市区)

本项目使用了高德地图API,对数据的经纬度进行转换,把数据中的经纬度转换为省市区!

采用httpclient工具,对高德地图API进行异步请求操作,在查询的时候把经纬度转换为省市区。

官网地址:https://lbs.amap.com/api/webservice/guide/api/georegeo

  • udf(富有异步接口对api进行http请求获取数据)
package com.zwf.udf;

import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.DataBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-16 14:53
 */
public class LocationFunction extends RichAsyncFunction<DataBean,DataBean> {
     //创建异步请求对象
    CloseableHttpAsyncClient httpclient=null;
             //创建url
    private  String url=null;
            //创建key
    private String key=null;
           //创建最大连接数
    private int maxConnectNum;

    public LocationFunction(String url, String key, int maxConnectNum) {
        this.url = url;
        this.key = key;
        this.maxConnectNum = maxConnectNum;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //构建配置信息
        RequestConfig config=RequestConfig.custom().build();
        //构建异步连接池
          httpclient= HttpAsyncClients.custom()
                  .setDefaultRequestConfig(config)
                  .setMaxConnTotal(maxConnectNum).build();
            //开启异步请求
          httpclient.start();
    }

    @Override
    public void asyncInvoke(DataBean bean, ResultFuture<DataBean> resultFuture) {
        //获取经纬度
        Double longitude = bean.getLongitude();
        Double latitude = bean.getLatitude();
        //构建http Get请求
        HttpGet get=new HttpGet(url+"?location="+longitude+","+latitude+"&key="+key);
         //发送get请求
        Future<HttpResponse> execute = httpclient.execute(get, null);
        //获取相应结果
        CompletableFuture.supplyAsync(new Supplier<DataBean>() {
            @Override
            public DataBean get() {
                //获取http响应
                HttpResponse response=null;
                try {
                    //省
                    String province = null;
                    //市
                    String city = null;
                    //地区
                    String region=null;
                    //国家
                    String country=null;
                    //获取响应信息
                    response = execute.get();
                    if(response.getStatusLine().getStatusCode()==200){
                        //解析返回的字段串
                        String resInfo = EntityUtils.toString(response.getEntity());
                        JSONObject jsonObject = JSONObject.parseObject(resInfo);
                        JSONObject regeocode = jsonObject.getJSONObject("regeocode");
                           //获取的对象数据存在并且不为null.
                        if(regeocode!=null&&!regeocode.isEmpty()){
                            JSONObject addressComponent = regeocode.getJSONObject("addressComponent");
                            province = addressComponent.getString("province");
                            country= addressComponent.getString("country");
                            city = addressComponent.getString("city");
                            region = addressComponent.getString("district");

                        }
                    }
                    bean.setProvince(province);
                    bean.setCity(city);
                    bean.setRegion(region);
                    bean.setCountry(country);
                    return bean;
                }catch (Exception e){
                     return null;
                }
            }
            //返回一个DataBean对象
        }).thenAccept((DataBean resp)->{
            resultFuture.complete(Collections.singleton(resp));
        });

    }

    @Override
    public void close() throws Exception {
       httpclient.close();
    }
}
  • 创建job对象类
package com.zwf.jobs;

import com.zwf.constant.EventID;
import com.zwf.pojo.DataBean;
import com.zwf.udf.JsonToDataBean;
import com.zwf.udf.LocationFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import java.util.concurrent.TimeUnit;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-16 9:28
 */
public class UserCount2 {
    public static void main(String[] args) throws Exception {
        DataStream<String> stream = FlinkUtils.createKafka(args[0], SimpleStringSchema.class);
           //把json字符串转为javabean
        //过滤出applaunch事件中 每个设备新老用户数量
        SingleOutputStreamOperator<DataBean> dataSource = stream.process(new JsonToDataBean());
        //过滤出app客户端运行产生的数据
        SingleOutputStreamOperator<DataBean> filter = dataSource.filter(x -> EventID.APP_LAUNCH.equals(x.getEventId()));
        //转换经纬度
        String key = FlinkUtils.param.getRequired("amap.http.key");
        String url = FlinkUtils.param.getRequired("amap.http.url");
        SingleOutputStreamOperator<DataBean> source = AsyncDataStream.unorderedWait(filter, new LocationFunction(url, key, 50), 5, TimeUnit.MINUTES);
        //计算每个省的新老用户人数
        source.map(line->Tuple3.of(line.getProvince(),line.getIsNew(),1),Types.TUPLE(Types.STRING, Types.INT,Types.INT))
                        .keyBy(v->Tuple2.of(v.f0,v.f1),Types.TUPLE(Types.STRING,Types.INT)).sum(2).print();

        FlinkUtils.env.execute();
    }

}

2、客户访问数及用户数(去重)

同一个用户重复访问一个活动,或者多个用户重复访问一个活动情况下,我们需要对数据进行去重,才能获取到用户数或者活动访问数

缺点:使用hashSet会造成资源的大量消耗!

  • 案例(使用状态对象去重)
package com.zwf.Demo;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-16 20:48
 */

/**
 * u1,A1,view
 *  u1,A1,view
 *  u1,A1,view
 *  u1,A1,join
 *  u1,A1,join
 *  u2,A1,view
 *  u2,A1,join
 *  u1,A2,view
 *  u1,A2,view
 *  u1,A2,join
 */
public class ActiveCount {
    public static void main(String[] args) throws Exception {
        //统计用户 activeId  eventId
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = environment.socketTextStream("192.168.147.110", 8888);
        SingleOutputStreamOperator<Tuple3<String, String, String>> map = source.map(lines -> {
            String[] split = lines.split(",");
            return Tuple3.of(split[0], split[1], split[2]);
        }, Types.TUPLE(Types.STRING, Types.STRING, Types.STRING));
        //对用户的活动ID和事件ID进行分组
        KeyedStream<Tuple3<String, String, String>, Tuple2<String, String>> keyedStream = map.keyBy(tup -> Tuple2.of(tup.f1, tup.f2),Types.TUPLE(Types.STRING,Types.STRING));
        //求出每个活动ID和事件ID的用户数 和 浏览记录数
        keyedStream.process(new KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String,String,Integer,Integer>>() {
               //定义两个valueState 一个记录uid去重 一个记录浏览数
            private transient ValueState<HashSet<String>> vUidCount=null;
            private transient ValueState<Integer> pageViewCount=null;
            @Override
            public void processElement(Tuple3<String, String, String> value, KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String, String, Integer, Integer>>.Context ctx, Collector<Tuple4<String, String, Integer, Integer>> out) throws Exception {
                String uid = value.f0;
                HashSet<String> suid = vUidCount.value();
                if(suid==null){
                    suid=new HashSet<>();
                }
                suid.add(uid);
                vUidCount.update(suid);
                //页面访问数
                Integer uCount = pageViewCount.value();
                if(uCount==null){
                    uCount=0;
                }
                uCount++;
                pageViewCount.update(uCount);
                    //活动id  事件id  用户数  页面浏览数
                out.collect(Tuple4.of(value.f1, value.f2,suid.size(),uCount));
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                //用户数统计
                ValueStateDescriptor<HashSet<String>> uidState = new ValueStateDescriptor<>("uidCount", TypeInformation.of(new TypeHint<HashSet<String>>() {}));
                vUidCount=getRuntimeContext().getState(uidState);
                  //页面访问数
                ValueStateDescriptor<Integer> uidCount = new ValueStateDescriptor<>("pageViewCount", Types.INT);
                pageViewCount=getRuntimeContext().getState(uidCount);

            }


        }).print();


      environment.execute();

    }
}

3、使用布隆过滤器去重

优点:轻量级,耗费资源少,通过判断数据一定不存在来对数据进行去重操作,可以用来统计用户数和用户浏览数。

  • 案例
package com.zwf.Demo;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-16 20:48
 */

/**
 * u1,A1,view
 *  u1,A1,view
 *  u1,A1,view
 *  u1,A1,join
 *  u1,A1,join
 *  u2,A1,view
 *  u2,A1,join
 *  u1,A2,view
 *  u1,A2,view
 *  u1,A2,join
 */
public class ActiveCount1 {
    public static void main(String[] args) throws Exception {
        //统计用户 activeId  eventId
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = environment.socketTextStream("192.168.147.110", 8888);
        SingleOutputStreamOperator<Tuple3<String, String, String>> map = source.map(lines -> {
            String[] split = lines.split(",");
            return Tuple3.of(split[0], split[1], split[2]);
        }, Types.TUPLE(Types.STRING, Types.STRING, Types.STRING));
        //对用户的活动ID和事件ID进行分组
        KeyedStream<Tuple3<String, String, String>, Tuple2<String, String>> keyedStream = map.keyBy(tup -> Tuple2.of(tup.f1, tup.f2),Types.TUPLE(Types.STRING,Types.STRING));
        //求出每个活动ID和事件ID的用户数 和 浏览记录数
        keyedStream.process(new KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String,String,Integer,Integer>>() {
               //定义两个valueState 一个记录uid去重 一个记录浏览数
            private transient ValueState<Integer> countUid=null;
            private transient ValueState<Integer> vPageCount=null;
            private transient ValueState<BloomFilter<String>> bloomFilter;

            @Override
            public void processElement(Tuple3<String, String, String> value, KeyedProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, Tuple4<String, String, Integer, Integer>>.Context ctx, Collector<Tuple4<String, String, Integer, Integer>> out) throws Exception {
                String uid = value.f0;
                BloomFilter<String> filter = bloomFilter.value();
                Integer uidCount = countUid.value();
                 //filter对象为null
                if(filter==null){
                    //初始化布隆过滤器
                    filter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000);
                    uidCount=0;
                }

                //如果布隆过滤器中不存在用户id 就计数一次
                if(!filter.mightContain(uid)){
                    filter.put(uid);
                    uidCount++;
                }
                //更新布隆对象
                bloomFilter.update(filter);
                //更新用户数
                countUid.update(uidCount);

                Integer uCount = vPageCount.value();
                if(uCount==null){
                    uCount=0;
                }
                uCount++;
                vPageCount.update(uCount);
                    //活动id  事件id  用户数  页面浏览数
                out.collect(Tuple4.of(value.f1, value.f2,uidCount,uCount));
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                //布隆过滤器

                ValueStateDescriptor<BloomFilter<String>> bloom = new ValueStateDescriptor<>("bloomfilter", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
                 bloomFilter=getRuntimeContext().getState(bloom);

                ValueStateDescriptor<Integer> uidState = new ValueStateDescriptor<>("uidState", TypeInformation.of(new TypeHint<Integer>() {}));
                countUid=getRuntimeContext().getState(uidState);

                ValueStateDescriptor<Integer> uidCount = new ValueStateDescriptor<>("vUidCount", Types.INT);
                vPageCount=getRuntimeContext().getState(uidCount);

            }

        }).print();

      environment.execute();

    }
}
  • 项目实战

使用布隆过滤器判断是否存在设备ID,不存在设备ID就标记为新设备!

package com.zwf.udf;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 9:58
 */
public class DeviceIdBindIsNewFunction extends KeyedProcessFunction<String, DataBean,DataBean> {

    private transient ValueState<BloomFilter<String>>  bloomFilter;

    @Override
    public void open(Configuration parameters) throws Exception {
        //初始化布隆过滤器
        ValueStateDescriptor<BloomFilter<String>> deviceIdBloom = new ValueStateDescriptor<>("device_Id_Bloom", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
         bloomFilter=getRuntimeContext().getState(deviceIdBloom);
    }

    @Override
    public void processElement(DataBean dataBean, KeyedProcessFunction<String, DataBean, DataBean>.Context ctx, Collector<DataBean> out) throws Exception {
        String deviceId = dataBean.getDeviceId();
        BloomFilter<String> filter = bloomFilter.value();
        if(filter==null){
            filter=BloomFilter.create(Funnels.unencodedCharsFunnel(),10000);
        }
        //如果布隆过滤器中不存在的值
        if(!filter.mightContain(deviceId)){
            filter.put(deviceId);
            dataBean.setIsN(1);
            bloomFilter.update(filter);
        }

      out.collect(dataBean);
    }
}

job工具类,处理设备浏览数据,标记是否新设备!

package com.zwf.jobs;

import com.alibaba.fastjson2.JSONObject;
import com.zwf.constant.EventID;
import com.zwf.pojo.DataBean;
import com.zwf.udf.DeviceIdBindIsNewFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 9:52
 */
public class UserCount3 {
    public static void main(String[] args) throws Exception {
        //bloomFilter中不存在设备ID的用户是新用户
        DataStream<String> kafkaStream = FlinkUtils.createKafka(args[0], SimpleStringSchema.class);
        SingleOutputStreamOperator<DataBean> dataBeanSource = kafkaStream.map(lines -> JSONObject.parseObject(lines, DataBean.class));
        //过滤出app客户端运行产生的数据
        SingleOutputStreamOperator<DataBean> filter = dataBeanSource.filter(x -> EventID.APP_LAUNCH.equals(x.getEventId()));
        //根据设备类型分区
        filter.keyBy(databean->databean.getDeviceType()).process(new DeviceIdBindIsNewFunction())
                .print();
        FlinkUtils.env.execute();
    }
}
  • 项目实战

实现方案一

按设备类型(huawei-p40)进行keyBy,然后使用BloomFilter进行判断是否为新用户。

存在问题:如果某个设备类型的用户比较多可能会出现数据倾斜,而且一个分区中会有多个组,一个组对应一个BloomFilter,那么一个分区就会有多个BloomFilter,也会占用更多资源。

1649919646858

实现方案二

按照设备ID进行keyBy,然后定义一个OperatorState保存BloomFilter进行判断是否为新用户,一个分区对应一个BloomFilter,更加节省资源,不会出现数据倾斜。

1649919689842

解决问题

如果数据中出现isNew为空或数据字段没有isNew的情况,可以利用此方法解决。

  • 方案一实现代码

  • 自定义函数

package com.zwf.udf;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 9:58
 */
public class DeviceIdBindIsNewFunction extends KeyedProcessFunction<String, DataBean,DataBean> {

    private transient ValueState<BloomFilter<String>>  bloomFilter;

    @Override
    public void open(Configuration parameters) throws Exception {
        //初始化布隆过滤器
        ValueStateDescriptor<BloomFilter<String>> deviceIdBloom = new ValueStateDescriptor<>("device_Id_Bloom", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
         bloomFilter=getRuntimeContext().getState(deviceIdBloom);
    }

    @Override
    public void processElement(DataBean dataBean, KeyedProcessFunction<String, DataBean, DataBean>.Context ctx, Collector<DataBean> out) throws Exception {
        String deviceId = dataBean.getDeviceId();
        BloomFilter<String> filter = bloomFilter.value();
        if(filter==null){
            filter=BloomFilter.create(Funnels.unencodedCharsFunnel(),10000);
        }
        //如果布隆过滤器中不存在的值
        if(!filter.mightContain(deviceId)){
            filter.put(deviceId);
            //新用户
            dataBean.setIsN(1);
            bloomFilter.update(filter);
        }

      out.collect(dataBean);
    }
}
  • job工作类
package com.zwf.jobs;

import com.alibaba.fastjson2.JSONObject;
import com.zwf.constant.EventID;
import com.zwf.pojo.DataBean;
import com.zwf.udf.DeviceIdBindIsNewFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 9:52
 */
public class UserCount3 {
    public static void main(String[] args) throws Exception {
        //bloomFilter中不存在设备ID的用户是新用户
        DataStream<String> kafkaStream = FlinkUtils.createKafka(args[0], SimpleStringSchema.class);
        //将JSON数据转换为dataBean对象
        SingleOutputStreamOperator<DataBean> dataBeanSource = kafkaStream.map(lines -> JSONObject.parseObject(lines, DataBean.class));
        //过滤出app客户端运行产生的数据
        SingleOutputStreamOperator<DataBean> filter = dataBeanSource.filter(x -> EventID.APP_LAUNCH.equals(x.getEventId()));
        //根据设备类型分区  根据设备ID判断是否新用户!
        filter.keyBy(databean->databean.getDeviceType()).process(new DeviceIdBindIsNewFunction())
                .print();
         //执行指令
        FlinkUtils.env.execute();

    }

}
  • 方案二实现代码

解决了在一个TaskManager中创建多个布隆过滤器的问题,减少资源的损耗!实现了在算子层控制布隆过滤器的个数。

package com.zwf.udf;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import java.util.Collections;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 9:58
 */

/**
 * 在算子层控制布隆过滤器
 */
public class DeviceIdBindIsNewFunctionV2 extends RichMapFunction<DataBean,DataBean> implements CheckpointedFunction {
      //初始化布隆过滤器状态对象
   private transient ListState<BloomFilter<String>> bloomFilterListState;
     //布隆过滤器
   private transient BloomFilter<String> filter;
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
           //更新布隆过滤器的状态值
           bloomFilterListState.update(Collections.singletonList(filter));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        //创建状态描述
        ListStateDescriptor<BloomFilter<String>> descriptor = new ListStateDescriptor<>("deviceId-uid-isNew", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
          //创建算子控制层
        bloomFilterListState = context.getOperatorStateStore().getListState(descriptor);

           //如果已经保存
        if(context.isRestored()){
           for (BloomFilter<String> bloom:bloomFilterListState.get()){
               //把每个分区的布隆过滤器中的对象遍历出来
               this.filter=bloom;
           }
        }

    }

    @Override
    public DataBean map(DataBean value) throws Exception {
        String deviceId = value.getDeviceId();
        //初始化布隆过滤器 判断列表状态对象中的布隆过滤器是否存在
        if(filter==null){
            filter=BloomFilter.create(Funnels.unencodedCharsFunnel(),10000);
        }
        if(!filter.mightContain(deviceId)){
            filter.put(deviceId);
            value.setIsN(1);
        }

        return value;
    }
}

job工作类创建

package com.zwf.jobs;

import com.alibaba.fastjson2.JSONObject;
import com.zwf.constant.EventID;
import com.zwf.pojo.DataBean;
import com.zwf.udf.DeviceIdBindIsNewFunctionV2;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 9:52
 */
public class UserCount4 {
    public static void main(String[] args) throws Exception {
        //bloomFilter中不存在设备ID的用户是新用户
        DataStream<String> kafkaStream = FlinkUtils.createKafka(args[0], SimpleStringSchema.class);
        SingleOutputStreamOperator<DataBean> dataBeanSource = kafkaStream.map(lines -> JSONObject.parseObject(lines, DataBean.class));
        //过滤出app客户端运行产生的数据
        SingleOutputStreamOperator<DataBean> filterData = dataBeanSource.filter(x -> EventID.APP_LAUNCH.equals(x.getEventId()));
        //根据设备Id进行分区  然后在算子层控制布隆过滤器  可以形成在一个分区内共用一个布隆过滤器
        filterData.keyBy(lines->lines.getDeviceId()).map(new DeviceIdBindIsNewFunctionV2()).print();
        FlinkUtils.env.execute();
    }
}

4、使用状态后端

  • 状态后端依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.12.7</version>
    <scope>provided</scope>
</dependency>

对于数据量小使用内存存储状态,计算效率会更高,但是对于数据量非常大的情况下,为了考虑数据安全性,应该存储到RockDB上!

  • Flink工具类进行状态后端改进
/**
     *
     * @param args  参数
     * @param kafkaMeta  kafka自定义反序列化器获取kafka元数据
     * @return
     * @param <T>
     * @throws Exception
     */
    public static <T> DataStream<T> createKafkaV2(String args,Class<? extends KafkaDeserializationSchema<T>> kafkaMeta) throws Exception {
        param=ParameterTool.fromPropertiesFile(args);
        Long interval = param.getLong("checkpoint.interval", 3000L);
        String chkPath = param.get("checkpoint.path");
        String topics = param.getRequired("kafka.topics");
        List<String> topicList = Arrays.asList(topics.split(","));
        //开启状态保存
        env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE);
         //设置RockDB状态后端
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(chkPath);
       //设置外部存储清除策略
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        Properties properties = param.getProperties();
        //设置kafka source
        FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer(topicList, kafkaMeta.newInstance(), properties);
       //是否应在检查点上将偏移量提交回Kafka
        consumer.setCommitOffsetsOnCheckpoints(false);
        return env.addSource(consumer);
    }

5、使用ClickHouse进行多维复杂统计

使用clickhouse数据库创表时使用ReplacingMergeTree引擎可以进行去重,但是数据不能立即merge,需要手动optimize或者后台自动合并。

问题解决方案:查询时在表名后面追加final关键字,就只查最新的数据,但是效率变低了!

  • 设计clickhouse表
1. 可以支持维度查询(大宽表)
2. 按照时间段进行查询(将时间作为表的字段并且建分区表)
3. 可以统计出PV、UV(去重查询)
4. 支持分区(按照时间进行分区)
5. 支持覆盖(ReplacingMergeTree)(对查询结果准确性要求高的,表名后面加final)
6. 生成一个唯一的ID (在Kafka中生成唯一的ID,topic+分区+偏移量)
7. 相同的数据要进入到相同的分区(按照数据的时间即EventTime进行分区)
  • sql语句
drop table  tb_user_event;
CREATE TABLE tb_user_event
(
    `id` String comment '数据唯一id,使用Kafka的topic+分区+偏移量',
    `deviceId` String comment '设备ID',
    `eventId` String comment '事件ID',
    `isNew` UInt8 comment '是否是新用户1为新,0为老',
    `os` String comment '系统名称',
    `province` String comment '省份',
    `channel` String comment '下载渠道',
    `deviceType` String comment '设备类型',
    `eventTime` DateTime64 comment '数据中所携带的时间',
    `date` String comment 'eventTime转成YYYYMMDD格式',
    `hour` String comment 'eventTime转成HH格式列席',
    `processTime` DateTime comment '插入到数据库时的系统时间'
)
    ENGINE = ReplacingMergeTree(processTime)
        PARTITION BY (date, hour)
        ORDER BY id;
  • 编写代码使用自定义KafkaDeserializationSchema读取kafka数据中的topic、partition、offset,将他们拼接成唯一的ID。
package com.zwf.kafka;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 15:31
 */

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;


/**
 * 创建自定义反序列化器
 * id=kafka主题+分区ID+offset
 * value=message(UTF-8)
 */
public class kafkaToUniqueId implements KafkaDeserializationSchema<Tuple2<String,String>> {
      //是否到了最后一个offset  消息发送完成
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    //反序列化操作
    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        //获取主题 分区id offset
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        String id=topic+"-"+partition+"-"+offset;
        //获取message信息
        String mess = new String(record.value());
        return Tuple2.of(id,mess);
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return Types.TUPLE(Types.STRING,Types.STRING);
    }
}
  • 测试类
package com.zwf.Demo;

import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 15:54
 */
public class KafkaMetaTest {

    public static void main(String[] args) throws Exception {
        DataStream<Tuple2<String, String>> kafkaV2 = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
        kafkaV2.print();
        FlinkUtils.env.execute();
    }
}

6、Flink将数据写入clickhouse

环境依赖

   <dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.4.6</version>
    </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>1.15.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>1.15.2</version>
    </dependency>

先建好表,再使用jdbc把flink计算好的数据写入clickhouse数据库中,可以快速查询!

  • 案例DEMO
create table t_user(
    uid Int32,
    uname FixedString(10),
    age Int32,
    processTime datetime64
)
engine =ReplacingMergeTree(processTime)
order by uid;
  • 案例代码
package com.zwf.Demo;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.Timestamp;


/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 16:16
 */
public class TestClickHouseJDBCSink {
    public static void main(String[] args) throws Exception {
        //测试使用jdbc操作clickhouse
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = environment.socketTextStream("node1", 8888);
        SingleOutputStreamOperator<Tuple3<Integer, String, Integer>> mapSource = (SingleOutputStreamOperator<Tuple3<Integer, String, Integer>>) source.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
            @Override
            public Tuple3<Integer, String, Integer> map(String value) throws Exception {
                String[] split = value.split(",");

                return Tuple3.of(Integer.parseInt(split[0]), split[1], Integer.parseInt(split[2]));
            }
        });
       mapSource.addSink(JdbcSink.sink("insert into t_user  values(?,?,?,?)",(pst,rs)->{
                             pst.setInt(1,rs.f0);
                             pst.setString(2,rs.f1);
                             pst.setInt(3,rs.f2);
                             pst.setTimestamp(4,new Timestamp(System.currentTimeMillis()));
       },JdbcExecutionOptions.builder().withBatchSize(3).withBatchIntervalMs(5000).build(),
               new JdbcConnectionOptions
                       .JdbcConnectionOptionsBuilder()
                       .withDriverName("com.clickhouse.jdbc.ClickHouseDriver")
                       .withUsername("default")
                       .withPassword("123456")
                       .withUrl("jdbc:clickhouse://192.168.147.110:8123/lives").build()));
       environment.execute();

    }
}
  • 建表(项目SQL)
CREATE TABLE tb_user_event
(
    `id` String comment '数据唯一id,使用Kafka的topic+分区+偏移量',
    `deviceId` String comment '设备ID',
    `eventId` String comment '事件ID',
    `isNew` UInt8 comment '是否是新用户1为新,0为老',
    `os` String comment '系统名称',
    `province` String comment '省份',
    `channel` String comment '下载渠道',
    `deviceType` String comment '设备类型',
    `eventTime` DateTime64 comment '数据中所携带的时间',
    `date` String comment 'eventTime转成YYYYMMDD格式',
    `hour` String comment 'eventTime转成HH格式列席',
    `processTime` DateTime comment '插入到数据库时的系统时间'
)
    ENGINE = ReplacingMergeTree(processTime)
        PARTITION BY (date, hour)
        ORDER BY id;
  • JSON转DataBean(项目代码)
package com.zwf.udf;

import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 20:32
 */

/**
 * 把kafka中的json数据封装进dataBean对象中
 */
public class JsonToDataBeanV2 extends ProcessFunction<Tuple2<String,String>, DataBean> {

    @Override
    public void processElement(Tuple2<String, String> value, ProcessFunction<Tuple2<String, String>, DataBean>.Context ctx, Collector<DataBean> out) throws Exception {
        DataBean dataBean = JSONObject.parseObject(value.f1, DataBean.class);
        dataBean.setId(value.f0);
        out.collect(dataBean);
    }
}
  • 经纬度转省市区名称后,再存入clickhouse数据库中,将批量数据写入clickhouse。
package com.zwf.jobs;

import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.udf.LocationFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;


import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-17 20:30
 */
public class DataToClickHouse {
    public static void main(String[] args) throws Exception {
        //把时间日期封装进dataBean中
        DataStream<Tuple2<String, String>> kafkaV2 = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
        //获取amap配置信息  以及  clickhouse信息
        String amapKey = FlinkUtils.param.get("amap.http.key");
        String amapUrl = FlinkUtils.param.get("amap.http.url");
        int batchSize = FlinkUtils.param.getInt("clickhouse.batch.size");
        long batchInterval = FlinkUtils.param.getLong("clickhouse.batch.interval");
        //把时间戳转为date hour
        SingleOutputStreamOperator<DataBean> mapSource = kafkaV2.process(new JsonToDataBeanV2()).map(new MapFunction<DataBean, DataBean>() {
            @Override
            public DataBean map(DataBean value) throws Exception {
                DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd-HH");
                Instant instant = Instant.ofEpochMilli(value.getTimestamp());
                ZoneId zoneId = ZoneId.systemDefault();
                String dateTime = LocalDateTime.ofInstant(instant, zoneId).format(dateTimeFormatter);
                String[] split = dateTime.split("-");
                value.setDate(split[0]);
                value.setHour(split[1]);
                return value;
            }
        });
        AsyncDataStream.unorderedWait(mapSource,new LocationFunction(amapUrl,amapKey,50),5, TimeUnit.SECONDS)
                .addSink(JdbcSink.sink("insert into tb_user_event values(?,?,?,?,?,?,?,?,?,?,?,?)",(pts,t)->{
                                pts.setString(1,t.getId());
                                pts.setString(2,t.getDeviceId());
                                pts.setString(3,t.getEventId());
                                pts.setInt(4,t.getIsNew());
                                pts.setString(5,t.getOsName());
                                pts.setString(6,t.getProvince());
                                pts.setString(7,t.getReleaseChannel());
                                pts.setString(8,t.getDeviceType());
                                pts.setTimestamp(9,new Timestamp(t.getTimestamp()));
                                pts.setString(10,t.getDate());
                                pts.setString(11,t.getHour());
                                pts.setTimestamp(12,new Timestamp((System.currentTimeMillis()/1000)));
                }, JdbcExecutionOptions.builder()
                                .withBatchSize(batchSize)
                                .withBatchIntervalMs(batchInterval).build()
                        ,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:clickhouse://192.168.147.110:8123/lives")
                                .withUsername("default")
                                .withPassword("123456")                        .withDriverName("com.clickhouse.jdbc.ClickHouseDriver").build()));
        FlinkUtils.env.execute();

    }
}

7、直播数据分析

统计每个直播间的累积观看人数(PV、UV)和各个直播间实时在线人数。

实现方式:

  1. 将数据攒起来批量(不是简单的增量聚合,不能使用窗口,而是使用定时器)写入到Redis/MySQL(延迟高、效率高、对数据库压力小)。
  2. 在同一个job中,将数据写入到Clickhouse中(同一个主题(类型)的数据尽量在一个job中完成,将不同的数据打上不同的标签,侧流输出)。

统计直播间的人气值

  • 在直播间中至少停留1分钟
  • 在30分钟之内,同一设备ID频繁进入该直播间,算一个用户的人气值

实现方案

  1. 按照主播ID、deviceId进行KeyBy
  2. 使用ProcessFunction定义定时器
  • 数据格式(进入直播间)
{
    "carrier":"中国电信",
    "deviceId":"aff486e2-1942-45ae-8623-9d6ef1f29b2f",
    "deviceType":"IPHONE-7PLUS",
    "eventId":"liveEnter",
    "id":79,
    "isNew":0,
    "lastUpdate":2021,
    "latitude":37.942441944941706,
    "longitude":115.21183014458548,
    "netType":"WIFI",
    "osName":"ios",
    "osVersion":"8.2",
    "properties":{
        "anchor_id":2,
        "province":"江苏",
        "nickname":"孙悟空",
        "live_session":"CUjeX7wC",
        "category":"游戏"
    },
    "releaseChannel":"AppStore",
    "resolution":"2048*1024",
    "sessionId":"mECl1CLhcmGw",
    "timestamp":1616115193340
}
  • 数据格式(离开直播间)
{
    "carrier":"中国电信",
    "deviceId":"aff486e2-1942-45ae-8623-9d6ef1f29b2f",
    "deviceType":"IPHONE-7PLUS",
    "eventId":"liveLeave",
    "id":79,
    "isNew":0,
    "lastUpdate":2021,
    "latitude":37.942441944941706,
    "longitude":115.21183014458548,
    "netType":"WIFI",
    "osName":"ios",
    "osVersion":"8.2",
    "properties":{
        "anchor_id":2,
        "live_session":"CUjeX7wC"
    },
    "releaseChannel":"AppStore",
    "resolution":"2048*1024",
    "sessionId":"mECl1CLhcmGw",
    "timestamp":1616115269951
}

实现方式一:(计算实时在线人数)

package com.zwf.jobs;

import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-18 9:50
 */
public class LiveAudienceCountV2 {

    public static void main(String[] args) throws Exception {
        DataStream<Tuple2<String, String>> dataStream = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
        SingleOutputStreamOperator<DataBean> process =
                dataStream.process(new JsonToDataBeanV2());
        //过滤出eventId以live开头的事件
        SingleOutputStreamOperator<DataBean> liveData = process.filter(bean -> bean.getEventId().startsWith("live"));
        //过滤出进入直播间和离开直播间的事件数据
        SingleOutputStreamOperator<DataBean> leaveOrEnterData = liveData.filter(data -> "liveLeave".equals(data.getEventId()) || "liveEnter".equals(data.getEventId()));
        //按照设备ID和直播间ID进行keyBy
           leaveOrEnterData.keyBy(new KeySelector<DataBean, Tuple2<String,String>>() {
               @Override
               public Tuple2<String, String> getKey(DataBean value) throws Exception {
                   String deviceId = value.getDeviceId();
                   String anchorId = value.getProperties().get("anchor_id").toString();
                   return Tuple2.of(deviceId,anchorId);
               }
           }).process(new KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String,Integer>>() {
               //进入直播间时间状态
               private  transient ValueState<Long> inTimeState;
               //离开直播间时间状态
               private transient ValueState<Long>  outTimeState;

               @Override
               public void open(Configuration parameters) throws Exception {
                   ValueStateDescriptor<Long> inTimeDes = new ValueStateDescriptor<>("in-Time-state", Types.LONG);
                             inTimeState=getRuntimeContext().getState(inTimeDes);

                   ValueStateDescriptor<Long> outTimeDes = new ValueStateDescriptor<>("out-Time-state", Types.LONG);
                             outTimeState=getRuntimeContext().getState(outTimeDes);
               }

               @Override
               public void processElement(DataBean value, KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                   //事件时间
                   Long timestamp = value.getTimestamp();
                   //如果事件时间为空  当前时间就是事件时间
                   if(timestamp==null){
                       timestamp=System.currentTimeMillis();
                   }
                   //事件ID
                   String eventId = value.getEventId();
                   //进入直播间时间
                   if("liveEnter".equals(eventId)){
                       inTimeState.update(timestamp);
                       //超过1分钟后触发定时器操作
                       ctx.timerService().registerProcessingTimeTimer(timestamp+60000+1);
                   }else {
                       //离开直播间
                       outTimeState.update(timestamp);
                       Long inTime = inTimeState.value();
                       //在直播间停留的时间不足一分钟 就删除定时器
                       //第一次离开直播间的事件时间-第二次进入直播间的时间不足一分钟时删除定时器
                       if(timestamp-inTime<60000) {
                           ctx.timerService().deleteProcessingTimeTimer(inTime + 60000 + 1);
                       }
                   }
               }

                  //定时器  //每个直播间的人气值 在直播间时间超过一分钟人气值加1
               @Override
               public void onTimer(long timestamp, KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String, Integer>>.OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                   Long outTimeValue = outTimeState.value();
                   //如果一直在线没有离开直播间 人气值加1
                   if(outTimeValue==null){
                     out.collect(Tuple2.of(ctx.getCurrentKey().f1,1));
                  }else {
                       //再次进入直播间时,下一次进入时间-上一次离开时间 间隔超过30分钟加一次
                       Long inTimeValue = inTimeState.value();
                       if(inTimeValue-outTimeValue>30*60000){
                           out.collect(Tuple2.of(ctx.getCurrentKey().f1,1));
                       }
                   }


               }

               @Override
               public void close() throws Exception {
                   super.close();
               }
               //求和计算人气值
           }).keyBy(x->x.f0).sum(1).print();

          FlinkUtils.env.execute();

    }
}

实现方式二(把数据写入clickhouse):

  • clickhouse建表
CREATE TABLE tb_anchor_audience_count
(
    `id` String comment '数据唯一id,使用Kafka的topic+分区+偏移量',
    `deviceId` String comment '设备ID',
    `eventId` String comment '事件ID',
    `anchor_id` String comment '主播ID',
    `os` String comment '系统名称',
    `province` String comment '省份',
    `channel` String comment '下载渠道',
    `deviceType` String comment '设备类型',
    `eventTime` DateTime64 comment '数据中所携带的时间',
    `date` String comment 'eventTime转成YYYYMMDD格式',
    `hour` String comment 'eventTime转成HH格式列席',
    `processTime` DateTime default now() comment '插入到数据库时的系统时间'
)
    ENGINE = ReplacingMergeTree(processTime)
        PARTITION BY (date, hour)
        ORDER BY id;

select * from tb_anchor_audience_count;
  • 直播间累积在线人数和实时在线人数自定义函数
package com.zwf.udf;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-18 11:14
 */
//求出每个直播间的uv pv数
public class  AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, DataBean> {
    //声明三个状态对象 anchor_id(主播ID)  uv(用户访客数)  pv(页面浏览数)  直播间在线人数(onlineCount)
    private transient ValueState<BloomFilter<String>> bloomFilterValueState;
    //直播间用户访问数
    private transient ValueState<Integer> uvState;
    //直播间页面访问数
    private transient ValueState<Integer> pvState;
     //直播间在线人数
    private transient ValueState<Integer> onlineCount;
    //侧输出标签
     private OutputTag<Tuple4<String,Integer,Integer,Integer>> outputTag=new OutputTag<>("out_streaming",TypeInformation.of(new TypeHint<Tuple4<String, Integer, Integer, Integer>>() {})){};
    //日期格式化
     private SimpleDateFormat formatDT=new SimpleDateFormat("yyyyMMdd-HH");

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<BloomFilter<String>> anchorIdState = new ValueStateDescriptor<>("anchor_id_state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {}));
        bloomFilterValueState=getRuntimeContext().getState(anchorIdState);
        anchorIdState.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());

        ValueStateDescriptor<Integer> uvStateDescribe = new ValueStateDescriptor<>("uv_state", Types.INT);
        uvState=getRuntimeContext().getState(uvStateDescribe);
        anchorIdState.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());

        ValueStateDescriptor<Integer> pvStateDescribe = new ValueStateDescriptor<>("pv_state", Types.INT);
        pvState=getRuntimeContext().getState(pvStateDescribe);
        anchorIdState.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());

        ValueStateDescriptor<Integer> onlineCountStateDescribe = new ValueStateDescriptor<>("online_count", Types.INT);
        onlineCount=getRuntimeContext().getState(onlineCountStateDescribe);
        anchorIdState.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(3)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
    }

    @Override
    public void processElement(DataBean value, KeyedProcessFunction<String, DataBean, DataBean>.Context ctx, Collector<DataBean> out) throws Exception {
         //获取事件id
        String deviceId = value.getDeviceId();
        BloomFilter<String> bloomFilter = bloomFilterValueState.value();
        Integer uvCount = uvState.value();
        Integer pvCount = pvState.value();
        Integer liveCount = onlineCount.value();
        //防止在线人数抛出NullPointException
        if(liveCount==null){
            liveCount=0;
        }
        //获取系统处理时间
        Long ts = ctx.timerService().currentProcessingTime();
         //定时任务十秒钟执行一次
        Long firstTime=ts-ts%10000+10000;
        //注册处理时间
        ctx.timerService().registerProcessingTimeTimer(firstTime);
        //进入直播间
        if("liveEnter".equals(value.getEventId())){
            if(bloomFilt er==null){
                bloomFilter=BloomFilter.create(Funnels.unencodedCharsFunnel(),100000);
                uvCount=0;
                pvCount=0;
            }
            if(!bloomFilter.mightContain(deviceId)){
                bloomFilter.put(deviceId);
                uvCount++;
                bloomFilterValueState.update(bloomFilter);
                uvState.update(uvCount);
            }
            pvCount++;
            pvState.update(pvCount);
            liveCount++;
            onlineCount.update(liveCount);
        }else {
            liveCount--;
            onlineCount.update(liveCount);
        }
        //获取事件时间封装进dataBean中
        String dateTime = formatDT.format(new Date(value.getTimestamp()));
        String[] dt = dateTime.split("-");
        value.setDate(dt[0]);
        value.setHour(dt[1]);
        //主流输出
         out.collect(value);
    }

      //定时任务触发
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, DataBean, DataBean>.OnTimerContext ctx, Collector<DataBean> out) throws IOException {
        //侧流输出
        String date = formatDT.format(timestamp).split("-")[0];
        //侧输出主播id pv uv 在线人数 到redis中
       ctx.output(outputTag,Tuple4.of(ctx.getCurrentKey()+"_"+date,uvState.value(), pvState.value(),onlineCount.value()));
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
     //侧流标签
    public OutputTag<Tuple4<String, Integer, Integer, Integer>> getOutputTag() {
        return outputTag;
    }
}
  • 计算代码(把anchor_id、pv、uv、onlineCount侧输出进redis中,把DataBean基本信息输出到clickhouse)
package com.zwf.jobs;

import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.udf.AnchorDistinctTotalAudienceFunc;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.udf.LocationFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;
/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-18 9:50
 */
public class LiveAudienceCount {

    public static void main(String[] args) throws Exception {
        DataStream<Tuple2<String, String>> dataStream = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
        //获取amap配置信息  以及  clickhouse信息
        String amapKey = FlinkUtils.param.get("amap.http.key");
        String amapUrl = FlinkUtils.param.get("amap.http.url");
        SingleOutputStreamOperator<DataBean> process =
                dataStream.process(new JsonToDataBeanV2());
        //过滤出eventId以live开头的事件
        SingleOutputStreamOperator<DataBean> liveData = process.filter(bean -> bean.getEventId().startsWith("live"));
        //过滤出进入直播间和离开直播间的事件数据
        SingleOutputStreamOperator<DataBean> leaveOrEnterData = liveData.filter(data -> "liveLeave".equals(data.getEventId()) || "liveEnter".equals(data.getEventId()));
        //根据主播id分组
        KeyedStream<DataBean, String> anchorData = leaveOrEnterData.keyBy(bean -> bean.getProperties().get("anchor_id").toString());
        //求出 每个直播间(anchorId)的pv和uv数 在线人数
        AnchorDistinctTotalAudienceFunc audienceFunc = new AnchorDistinctTotalAudienceFunc();
         //主流输出
        SingleOutputStreamOperator<DataBean> mainStream = anchorData.process(audienceFunc);
           //侧流输出到redis中
        FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("192.168.147.110").setDatabase(5)
                .setPort(6379).setPassword("root@123456").build();

        mainStream.getSideOutput(audienceFunc.getOutputTag())
                .addSink(new RedisSink<Tuple4<String, Integer, Integer, Integer>>(redisConf,new LiveToTupleData()));
          //主流存入clickhouse
        int batchSize = FlinkUtils.param.getInt("clickhouse.batch.size");
        long batchInterval = FlinkUtils.param.getLong("clickhouse.batch.interval");
        //进行经纬度切换
        AsyncDataStream.unorderedWait(mainStream,new LocationFunction(amapUrl,amapKey,50),5, TimeUnit.SECONDS)
        .addSink(JdbcSink.sink("insert into tb_anchor_audience_count(id,deviceId,eventId,anchor_id,os,province,channel,deviceType,eventTime,date,hour)\n" +
                        " values(?,?,?,?,?,?,?,?,?,?,?)",(pts, t)->{
                    pts.setString(1,t.getId());
                    pts.setString(2,t.getDeviceId());
                    pts.setString(3,t.getEventId());
                    pts.setString(4,t.getProperties().get("anchor_id").toString());
                    pts.setString(5,t.getOsName());
                    pts.setString(6,t.getProvince()!=null?t.getProvince():"");
                    pts.setString(7,t.getReleaseChannel());
                    pts.setString(8,t.getDeviceType());
                    pts.setTimestamp(9,new Timestamp(t.getTimestamp()));
                    pts.setString(10,t.getDate());
                    pts.setString(11,t.getHour());
                }, JdbcExecutionOptions.builder()
                        .withBatchSize(batchSize)
                        .withBatchIntervalMs(batchInterval).build()
                ,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://192.168.147.110:8123/lives")
                        .withUsername("default")
                        .withPassword("123456")
                        .withDriverName("com.clickhouse.jdbc.ClickHouseDriver").build()));
        FlinkUtils.env.execute();

    }
public static class LiveToTupleData implements RedisMapper<Tuple4<String, Integer, Integer, Integer>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        //使用HSET数据格式    大key是LiveToUvPvOnline
        return new RedisCommandDescription(RedisCommand.HSET,"LiveToUvPvOnline");
    }
    //1_20220414  2_20240302
    @Override
    public String getKeyFromData(Tuple4<String, Integer, Integer, Integer> dataKey) {
        return dataKey.f0;
    }

    @Override
    public String getValueFromData(Tuple4<String, Integer, Integer, Integer> dataValue) {
        return dataValue.f1+"_"+dataValue.f2+"_"+dataValue.f3;
    }
}

}
  • SQL版本(clickhouse上操作)
-- 查询实时在线人数
select count(eventId) as onlineNum
from (
         select
                if(eventId = 'liveEnter', 1, -1) as eventId,
                anchor_id
         from tb_anchor_audience_count
         )
group by anchor_id;


-- 计算pv uv
select
    anchor_id,
    count(distinct deviceId) as uv,
    count(id) as pv
from tb_anchor_audience_count
where eventId='liveEnter'
    group by anchor_id;
optimize table tb_anchor_audience_count;

8、直播间礼物分析

按照主播(直播间)统计礼物的积分!

image-20240302162701180

1649048723721

  • 数据格式(直播奖励)
{
    "carrier":"中国电信",
    "deviceId":"9465bb37-deb5-4981-95e8-4c827e6e0104",
    "deviceType":"MI-NOTE",
    "eventId":"liveReward",
    "id":1673,
    "isNew":0,
    "lastUpdate":2021,
    "latitude":32.205310250750024,
    "longitude":114.50768436766687,
    "netType":"3G",
    "osName":"android",
    "osVersion":"8.5",
    "properties":{
        "room_id":100001,
        "anchor_id":1,
        "gift_id":2,
        "live_session":"d5Nc8tGO"
    },
    "releaseChannel":"小米应用商店",
    "resolution":"1024*768",
    "sessionId":"W1HuGkvJ43GD",
    "timestamp":1616309371940
}
  • 实现方案

在MySQL中还有一种礼物表(维表),需要进行关联。

关联维表的解决方案

  1. 每来一条数据查一次数据库(慢、吞吐量低)
  2. 可以使用异步IO(相对快,消耗资源多)
  3. 广播State(最快、适用于少量数据、数据可以变化的)(推荐使用)
  • 统计的具体指标

各个主播的收到礼物的积分值

打赏礼物的数量、受欢迎礼物topN

做多维度的指标统计(ClickHouse)

  • MySQL连接驱动依赖
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.16</version>
    </dependency>
  • 编写MySQLSource类,拉取MySQL数据库中的维度数据进行广播与kafka中的数据进行联接。
package com.zwf.source;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.*;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-18 21:27
 * 拉取MySQL数据库中的维度数据 进行广播 与kafka中的数据进行联接
 */
public class MySQLSource extends RichSourceFunction<Tuple4<Integer,String,Double,Integer>> {
         private Connection connection=null;
         private PreparedStatement pst=null;
         private ResultSet rs=null;
         boolean flag=true;
         //建立jdbc连接配置信息
    @Override
    public void open(Configuration parameters) throws Exception {
        connection=DriverManager.getConnection("jdbc:mysql://master:3306/lives?characterEncoding=utf8&serverTimeZone=Asia/Shanghai","root","Root@123456.");
    }

    @Override
    public void run(SourceContext<Tuple4<Integer, String, Double, Integer>> ctx) throws Exception {
      //初始化时间戳
        long timeStamp=0;
        while (flag){
            String sql="select id,name,points,deleted from tb_live_gift where updateTime>? "+(timeStamp==0?" and deleted=0":"");
            pst = connection.prepareStatement(sql);
            //查询最新时间的数据
            pst.setDate(1,new Date(timeStamp));
            timeStamp=System.currentTimeMillis();
             rs = pst.executeQuery();
             while (rs.next()){
                 int id = rs.getInt(1);
                 String name = rs.getString(2);
                 double points = rs.getDouble(3);
                 int deleted = rs.getInt(4);
                 ctx.collect(Tuple4.of(id,name,points,deleted));
             }
             rs.close();
             Thread.sleep(10000);
        }
    }
      //取消run方法  建立连接时是一个无界流
    @Override
    public void cancel() {
      flag=false;
        try {
            pst.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
        try {
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

    }
}
  • 工作类(mysqlstream广播与kafka流进行联接操作,最后分组求和!)
package com.zwf.jobs;
/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-18 22:00
 */
import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.source.MySQLSource;
import com.zwf.udf.BroadCastGiftRewardFun;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/**
 * 直播间礼物积分数
 */
public class GiftPointCount {
    public static void main(String[] args) throws Exception {
        DataStreamSource<Tuple4<Integer, String, Double, Integer>> mysqlSource = FlinkUtils.env.addSource(new MySQLSource());
        //广播变量(id Tuple.of(name,points)) 数据库字段  
        MapStateDescriptor broadcast_mysql = new MapStateDescriptor("broadcast_mysql", TypeInformation.of(new TypeHint<Integer>() {
        }), TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}));
        //把Mysql中的数据当做广播流
        BroadcastStream<Tuple4<Integer, String, Double, Integer>> broadcast =
                mysqlSource.broadcast(broadcast_mysql);
        //从kafka中读取数据进行广播窗口联接
        DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
        SingleOutputStreamOperator<DataBean> processed = kafkaStream.process(new JsonToDataBeanV2());
        //直播打赏
        SingleOutputStreamOperator<DataBean> filter = processed.filter(x -> "liveReward".equals(x.getEventId()));
        //广播变量  <gift_id,Tuple2.of(name,points)>
        filter.connect(broadcast).process(new BroadCastGiftRewardFun(broadcast_mysql))
                .keyBy(line->Tuple2.of(line.f0,line.f1),Types.TUPLE(Types.INT,Types.STRING)).sum(2).print();
        FlinkUtils.env.execute();
    }
}

9、商品浏览排名(TopN)

统计10分钟内,各个分类、各种事件类型的热门商品!

  • 实现方案

实现技术:为了将一段时间的数据攒起来,才能统计出TopN。

窗口选择:统计10分钟,每隔一分钟统计一次结果,为了得到的结果比较准确(平滑)我们使用滑动窗口。窗口的长度和滑动的步长可以根据实际情况进行调整。

时间类型:为了精确的统计出用户在实际浏览、加入购物车、下单等热门商品的信息的变化情况,使用EventTime类型的窗口。

  • Flink SQL案例

image-20240305104840021

  • 数据格式
{
    "carrier":"中国联通",
    "deviceId":"e9535ff8-4e58-417f-b2fe-8ed261a0a97c",
    "deviceType":"IPHONE-10",
    "eventId":"productView",
    "id":1718,
    "isNew":1,
    "latitude":40.53040777609875,
    "longitude":112.49708971837704,
    "netType":"4G",
    "osName":"ios",
    "osVersion":"9.2",
    "properties":{
        "room_id":100001,
        "anchor_id":1,
        "category_id":1008009,
        "product_id":1006002,
        "brand_id":"0"
    },
    "releaseChannel":"AppStore",
    "resolution":"2048*1024",
    "sessionId":"UNICsYI45Hwb",
    "timestamp":1616309372366
}
  • 字段含义

eventId:事件类型,productView(浏览)、productAddCart:(加入购物车)、productOrder(下单)

timestamp:事件时间,因为要按照EventTime划分窗口

category_id:商品分类

product_id:商品ID

  • 实现步骤
  1. 对数据进行KeyBy。
  2. 划分窗口。
  3. 在窗口中进行聚合。
  4. 排序(倒排)。
  • 代码实现(Job类)
package com.zwf.jobs;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-19 10:31
 */

import com.zwf.kafka.kafkaToUniqueId;
import com.zwf.pojo.DataBean;
import com.zwf.udf.HotGoodsAggregateFunction;
import com.zwf.udf.HotGoodsWindowFunction;
import com.zwf.udf.JsonToDataBeanV2;
import com.zwf.udf.TopNWinFunction;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;


/**
 * 热门商品排名
 */
public class HotGoodsCount {
    public static void main(String[] args) throws Exception {
       //获取kafka中的数据
        DataStream<Tuple2<String, String>> KafkaSource = FlinkUtils.createKafkaV2(args[0], kafkaToUniqueId.class);
        FlinkUtils.env.setParallelism(3);
        SingleOutputStreamOperator<DataBean> process = KafkaSource.process(new JsonToDataBeanV2());
        //过滤出事件Id以product开头的数据
        SingleOutputStreamOperator<DataBean> filterData = process.filter(data -> data.getEventId().equals("productView")||data.getEventId().equals("productAddCart"));
        //设置水位线  允许迟到5s时间
        SingleOutputStreamOperator<DataBean> watermarks = filterData.assignTimestampsAndWatermarks(WatermarkStrategy.<DataBean>forBoundedOutOfOrderness(Duration.ofMillis(5000))
                .withTimestampAssigner(new SerializableTimestampAssigner<DataBean>() {
                    @Override
                    public long extractTimestamp(DataBean element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));
        //对事件ID和商品分类以及商品分组
        KeyedStream<DataBean, Tuple3<String,Integer, Integer>> keyedStream = watermarks.keyBy(line -> {
            Integer categoryId = Integer.parseInt(line.getProperties().get("category_id").toString());
            Integer productId =Integer.parseInt(line.getProperties().get("product_id").toString());
            String eventId = line.getEventId();
            return Tuple3.of(eventId, categoryId, productId);
            //窗口大小是10分钟  滑动间隔是1分钟
            //增量聚合计算(reduce、aggregate,sum)  不要全量聚合计算(process)
        }, Types.TUPLE(Types.STRING, Types.INT, Types.INT));
        //同一个类别商品的浏览量
       keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(10),Time.minutes(1)))
                   .aggregate(new HotGoodsAggregateFunction(),new HotGoodsWindowFunction())
                   .keyBy(itemBean->{
                       String eventId = itemBean.eventId;
                       Integer categoryId = itemBean.categoryId;
                       Long winStart = itemBean.winStart;
                       Long winEnd = itemBean.winEnd;
                       return Tuple4.of(eventId,categoryId,winStart,winEnd);
                   },Types.TUPLE(Types.STRING,Types.INT, Types.LONG,Types.LONG))
                   .process(new TopNWinFunction()).print();

           FlinkUtils.env.execute();

    }
}
  • 自定义聚合函数(HotGoodsAggregateFunction)
package com.zwf.udf;

import com.zwf.pojo.DataBean;
import org.apache.flink.api.common.functions.AggregateFunction;


/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-19 11:21
 */
public class HotGoodsAggregateFunction implements AggregateFunction<DataBean,Long,Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(DataBean value, Long accumulator) {
        return accumulator+1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return null;
    }
}
  • 自定义窗口函数(获取窗口和自定义聚合函数的值)
package com.zwf.udf;

import com.zwf.pojo.DataBean;
import com.zwf.pojo.ItemEventCount;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-19 11:26
 */

/**
 * Long: 商品点击量     Input
 * ItemEventCount 商品选项信息   Out
 * Tuple3<String,Integer,Integer>事件ID  商品分类ID 商品ID    keyedValue
 */
public class HotGoodsWindowFunction implements WindowFunction<Long, ItemEventCount, Tuple3<String,Integer,Integer>, TimeWindow> {
    @Override
    public void apply(Tuple3<String, Integer, Integer> data, TimeWindow window, Iterable<Long> input, Collector<ItemEventCount> out) throws Exception {
        String eventId = data.f0;
        Integer categoryId = data.f1;
        Integer productId = data.f2;
        //获取传入的数据  自定义聚合函数的增量结果 0 1 2 3 4 5 6
        Long next = input.iterator().next();
        out.collect(new ItemEventCount(eventId,categoryId,productId,next, window.getStart(), window.getEnd()));

    }
}
  • 自定义排名窗口函数(TopNWinFunction)
package com.zwf.udf;

import com.zwf.pojo.ItemEventCount;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-19 12:08
 */
public class TopNWinFunction extends KeyedProcessFunction<Tuple4<String,Integer,Long,Long>, ItemEventCount, List<ItemEventCount>> {
     //定义状态存储前三个对象值
    private  transient ValueState<List<ItemEventCount>> valueStateList;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<List<ItemEventCount>> stateDescriptor = new ValueStateDescriptor<>("top-N-ItemEventCount", TypeInformation.of(new TypeHint<List<ItemEventCount>>() {}));
        valueStateList=getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(ItemEventCount value, KeyedProcessFunction<Tuple4<String, Integer, Long, Long>, ItemEventCount, List<ItemEventCount>>.Context ctx, Collector<List<ItemEventCount>> out) throws Exception {
         //获取状态中的值
        List<ItemEventCount> ivc = valueStateList.value();
        if(ivc==null){
            ivc=new ArrayList<>();
        }
        ivc.add(value);
        valueStateList.update(ivc);
          //窗口计算完后触发排序
        ctx.timerService().registerEventTimeTimer(value.winEnd+1);
    }
      //进行排序
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Tuple4<String, Integer, Long, Long>, ItemEventCount, List<ItemEventCount>>.OnTimerContext ctx, Collector<List<ItemEventCount>> out) throws Exception {
       //获取状态中的排序对象
        List<ItemEventCount> eventCounts = valueStateList.value();
        eventCounts.sort((a,b)->Long.compare(a.count,b.count));
        //取前3个值
        ArrayList<ItemEventCount> topN = new ArrayList<>();
        for(int i=0;i<Math.min(3,eventCounts.size());i++){
            topN.add(eventCounts.get(i));
        }
        out.collect(topN);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
  • pojo类(ItemEventCount)
package com.zwf.pojo;


/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-19 11:29
 */
public class ItemEventCount {
    public String eventId;
    public Integer categoryId;
    public Integer productId;
    public Long count;
    public Long winStart;
    public Long winEnd;

    public ItemEventCount() {
    }

    public ItemEventCount(String eventId, Integer categoryId, Integer productId, Long count, Long winStart, Long winEnd) {
        this.eventId = eventId;
        this.categoryId = categoryId;
        this.productId = productId;
        this.count = count;
        this.winStart = winStart;
        this.winEnd = winEnd;
    }

    @Override
    public String toString() {
        return "ItemEventCount{" +
                "eventId='" + eventId + '\'' +
                ", categoryId=" + categoryId +
                ", productId=" + productId +
                ", count=" + count +
                ", winStart=" + winStart +
                ", winEnd=" + winEnd +
                '}';
    }
}

四、业务数据实时采集

1648795831582

业务数据库就是MySQL(集群),假设直接在业务数据库中写SQL进行查询,如果复杂的查询(多维度聚合、join、并且数据流比较大)业务数据库的性能就会下降。甚至不能完成正常的业务功能(不能完成普通的业务数据的查询、插入、修改或是性能下降)

  • 离线和实时数据采集链路

离线:MySQL/Oracle=>Sqoop/SparkSQL/DataX=>HDFS(Hive)

实时:MySQL=>cannel(数据同步工具)=>kafka(消息队列)

1、Cannal采集方案

image-20240305115254341

Cannal是基于数据库的binlog技术,进行业务数据同步拉取,因此在使用cannal工具时一定要开启binlog写入功能,配置binlog模式为row(Mysql8.0及其以上默认开启binlog技术)

缺点:无法同步历史数据,只能同步最新数据。如果要同步全量数据,应该先开启canal再重新导入全部SQL数据,就会进行数据的全量同步。

  • 修改MySQL配置文件
vim /etc/my.cnf
  • 配置文件内容如下:
[mysqld] 
log-bin=mysql-bin #添加这⼀⾏就ok 
binlog-format=ROW #选择row模式 
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复 
  • 重启MySQL
service mysqld restart
  • 在Mysql中添加一个新的用户(可选:也可以使用Root账户,只需要在配置文件中配置即可)
CREATE USER canal IDENTIFIED BY '123456';

可选:如果因为数据库的安全级别对密码要求比较高,无法正常创建用户,需要修改安全策略和密码策略在添加用户(生成环境不用修改,要使用复杂的密码)

set global validate_password_policy=LOW; 
set global validate_password_length=6;
  • 授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  • 刷新权限
FLUSH PRIVILEGES;

2、安装Canal

  • 下载Cannal1.1.4版本(canal.deployer-1.1.4.tar.gz)

https://github.com/alibaba/canal/releases

  • 上传到Canal服务器上,然后解压

注意:Cannal可以与MySQL安装在同一台服务器,也可以不在同一台服务器,本次安装在同一台服务器上。

  • 修改Canal的主配置文件Cannal.properties
#canal跟kafka整合,将数据发送到kafka
canal.serverMode = kafka
#指定kafka broker地址
canal.mq.servers = node1:9092,node2:9092,master:9092
#数据发送kafka失败重试次数
canal.mq.retries = 10
# 配置元数据
canal.instance.tsdb.url = jdbc:mysql://master:3306/canal_tsdb?characterEncoding=utf-8&serverTimeZone=UTC&useSSL=false
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = Root@123456.
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
  • 修改canal的实例配置⽂件${CANAL_HOME}/example/instance.propertie
#mysql数据库的地址
canal.instance.master.address=master:3306
#mysql用户名
canal.instance.dbUsername=root
#mysql密码
canal.instance.dbPassword=Root@123456.
#注释掉使用默认的topic(将数据写入到默认的Topic)
canal.mq.topic=example
# dynamic topic route by schema or table regex
#将lives的数据库的tb_order表发送到kafka的order-main的topic
canal.mq.dynamicTopic=order-main:lives\\.tb_order,order-detail:lives\\.tb_order_goods
# 配置存放kafka主题的分区数量
canal.mq.partitionsNum=3
  • 启动Canal
bin/startup.sh
  • 开启kafka消费者进行测试消费
kafka-console-consumer.sh --bootstrap-server node1:9092,master:9092,node2:9092 --topic order-main --from-beginning

3、实时统计订单相关指标

  • 相关分析指标:

    • 直播间主播带货总金额、商品成交(下单)数量

    • 直播间主播带商品各个分类的成交金额、商品成交(下单)数量

    • 一天内中的成交金额

    • 一天各个分类成交金额(维度:省份、操作系统、手机型...)

    • 各种下单后商品的状态跟踪(下单 -> 支付 -> 发货 -> 签收 -> 评价)(下单 -> 用户取消 -> 系统取消

  • 需求分析(各个分类成交金额)

用户表:tb_live_user (用户ID)

订单主表:tb_order (订单ID、用户ID、订单状态、订单金额、下单时间、更新时间)

订单明细表:tb_order_gooods(订单主表ID、sku、数量、单价、分类ID、直播间ID)

主播房间表: tb_live_room(直播间ID、主播ID

主播表: tb_live_anchor (主播ID)

  • 需求所需数据

​ 订单状态为成交(主表 tb_order--->order-main)

​ 商品的分类ID(明细表 tb_order_goods--->order-detail)

​ 商品的金额=单价*数量(明细表 tb_order_goods--->order-detail)

  • 实现方案

需要对双流进行left join,把tb_order_goods数据流作为左表,把tb_order数据流作为右表,Flink中可以使用coGroup方法来实现

  • 可能出现的问题

​ 1、右表迟到(订单明细,null)

​ 2、左右表无迟到(订单明细、订单主表)

​ 3、左表迟到的数据(订单明细 侧流输出)

1649317223914

  • 代码实现
package com.zwf.jobs;

import com.zwf.pojo.OrderDetail;
import com.zwf.pojo.OrderMain;
import com.zwf.udf.JsonToOrderDetail;
import com.zwf.udf.JsonToOrderMain;
import com.zwf.udf.OrderLeftJoinFunc;
import com.zwf.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.*;
import java.time.Duration;
/**
 * 左表是明细表 使用left join必须有左表,如果左表数据遗漏,可以侧流输出,再与CoGroup进行union,如果右表
 * 不存在可以异步IO请求访问Mysql进行查询!
 * 左表是大表  右表是小表  大表join小表
 * ctrl+alt+F7
 */

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-20 18:58
 */
public class OrderCount {
    public static void main(String[] args) throws Exception {
        ParameterTool tools=ParameterTool.fromPropertiesFile(args[0]);
        FlinkUtils.env.setParallelism(1);
        //获取配置文件中两个主题的配置文件
        String orderMain = tools.getRequired("kafka.input.main");
        String orderDetail = tools.getRequired("kafka.input.detail");
        //orderMain数据流
        DataStream<String> rightDataStream = FlinkUtils.createKafkaV2(tools, orderMain, SimpleStringSchema.class);
        //orderDetail数据流
        DataStream<String> leftDataStream = FlinkUtils.createKafkaV2(tools, orderDetail, SimpleStringSchema.class);
         //过滤出type=insert/update/delete  并把json封装到javaBean对象中
        SingleOutputStreamOperator<OrderMain> orderMainStream = rightDataStream.process(new JsonToOrderMain());

        SingleOutputStreamOperator<OrderDetail> orderDetailStream = leftDataStream.process(new JsonToOrderDetail());
        //设置水位线  并使用更新时间作为事件时间
        SingleOutputStreamOperator<OrderMain> orderMainWaterMark = orderMainStream.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderMain>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderMain>() {
                    @Override
                    public long extractTimestamp(OrderMain element, long recordTimestamp) {
                          //orderMain中的更新时间戳作为事件时间
                        return element.getUpdate_time().getTime();
                    }
                }));

        SingleOutputStreamOperator<OrderDetail> orderDetailWaterMark = orderDetailStream.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                    @Override
                    public long extractTimestamp(OrderDetail element, long recordTimestamp) {
                        //orderDetail中的更新时间戳作为事件时间
                        return element.getUpdate_time().getTime();
                    }
                }));

        OutputTag<OrderDetail> outputTag = new OutputTag<>("order-detail", TypeInformation.of(new TypeHint<OrderDetail>() {})) {};
        //对order_id进行分组 //使用滚动窗口  每5s滚动一次
      //左表迟到数据侧流输出
        SingleOutputStreamOperator<OrderDetail> orderDetailWindow = orderDetailWaterMark.keyBy(bean -> bean.getOrder_id())
                //5s 滚动一次窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //迟到数据侧流输出
                .sideOutputLateData(outputTag)
                .apply(new WindowFunction<OrderDetail, OrderDetail, Long, TimeWindow>() {
                    @Override
                    public void apply(Long aLong, TimeWindow window, Iterable<OrderDetail> input, Collector<OrderDetail> out) throws Exception {
                        for (OrderDetail detail : input) {
                            out.collect(detail);
                        }
                    }
                });

        //获取侧流输出数据
        DataStream<OrderDetail> sideOutput = orderDetailWindow.getSideOutput(outputTag);
        //侧流数据输出处理
        SingleOutputStreamOperator<Tuple2<OrderDetail, OrderMain>> lateOrderDetailStream = sideOutput.map(new MapFunction<OrderDetail, Tuple2<OrderDetail, OrderMain>>() {
            @Override
            public Tuple2<OrderDetail, OrderMain> map(OrderDetail value) throws Exception {
                return Tuple2.of(value, null);
            }
        });
        //窗口联接join  coGroup   orderDetail做左表  orderMain做右表
        //两个设置水位线的表进行left join
        DataStream<Tuple2<OrderDetail, OrderMain>> JoinedStream = orderDetailWaterMark.coGroup(orderMainWaterMark)
                .where(line -> line.getOrder_id())
                .equalTo(bean -> bean.getOid())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new OrderLeftJoinFunc());
//将CoGroup的流和迟到的流,Union在一起  使用相同的方式处理 迟到的流也是左表
        DataStream<Tuple2<OrderDetail, OrderMain>> unionStream = JoinedStream.union(lateOrderDetailStream);
     //将没有关联的右表数据 查询数据关联左表数据
        SingleOutputStreamOperator<Tuple2<OrderDetail, OrderMain>> res = unionStream.map(new MapFunction<Tuple2<OrderDetail, OrderMain>, Tuple2<OrderDetail, OrderMain>>() {
            @Override
            public Tuple2<OrderDetail, OrderMain> map(Tuple2<OrderDetail, OrderMain> value) throws Exception {
                //如果右表为空  通过order_id查询数据库中orderMain的数据
                if (value.f1 == null) {
                    //根据orderId查询OrderMain数据赋值给value.f1
                    value.f1=QueryJDBCOrderMain.getQueryOrderMain(value.f0.getOrder_id());
                }
                return value;
            }
        });
        res.print();

        FlinkUtils.env.execute();
    }

    private static class QueryJDBCOrderMain{
        private static Connection connection=null;
        private static PreparedStatement pst=null;
        private static ResultSet rs=null;

        public static OrderMain getQueryOrderMain(Long order_id){
            try {
                Class.forName("com.mysql.cj.jdbc.Driver");
               connection=DriverManager.getConnection("jdbc:mysql://master:3306/lives?characterEncoding=utf-8&serverTimeZone=UTC","root","Root@123456.");
               String sql="select * from orderMain where oid=? ";
                pst = connection.prepareStatement(sql);
               pst.setLong(1,order_id);
               rs=pst.executeQuery();
                OrderMain main=null;
               while (rs.next()){
                   main= new OrderMain();
                   main.setOid(rs.getLong(1));
                   main.setCreate_time(rs.getDate(2));
                   main.setTotal_money(rs.getDouble(3));
                   main.setStatus(rs.getShort(4));
                   main.setUpdate_time(rs.getDate(5));
                   main.setUid(rs.getLong(6));
                   main.setProvince(rs.getString(7));
               }
               return main;
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }finally {
                try {
                    if (rs != null) {
                        rs.close();
                    }
                    if(pst!=null){
                        pst.close();
                    }
                    if(connection!=null){
                        connection.close();
                    }
                }catch (Exception e){

                }
            }

        }

    }

}
  • 两个JSON转javaBean的类
package com.zwf.udf;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.OrderMain;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-20 19:53
 */
public class JsonToOrderMain extends ProcessFunction<String, OrderMain> {
    @Override
    public void processElement(String value, ProcessFunction<String, OrderMain>.Context ctx, Collector<OrderMain> out) throws Exception {
        try {
            //把数据状态从json中获取出来
            JSONObject orderMain = JSONObject.parseObject(value);
            //获取type值
            String type = orderMain.getString("type");
            if ("UPDATE".equals(type) || "DELETE".equals(type) || "INSERT".equals(type)) {
                //获取json对象数组
                JSONArray data = orderMain.getJSONArray("data");

                    //遍历数组中的对象
                    for (int i = 0; i < data.size(); i++) {
                        //获取data中每个对象进行转化
                        OrderMain main = data.getObject(i, OrderMain.class);
                            //把状态封装进pojo中
                            main.setType(type);
                            //把对象输出
                            out.collect(main);
                    }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
package com.zwf.udf;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.pojo.OrderDetail;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-20 19:53
 */
public class JsonToOrderDetail extends ProcessFunction<String, OrderDetail> {
    @Override
    public void processElement(String value, ProcessFunction<String, OrderDetail>.Context ctx, Collector<OrderDetail> out) throws Exception {
        try {
            //把数据状态从json中获取出来
            JSONObject orderDetail = JSONObject.parseObject(value);
            //获取type值
            String type = orderDetail.getString("type");
            if ("UPDATE".equals(type) || "DELETE".equals(type) || "INSERT".equals(type)) {
                //获取json对象数组
                JSONArray data = orderDetail.getJSONArray("data");
                //遍历数组中的对象
                for (int i = 0; i < data.size(); i++) {
                    //获取data中每个对象进行转化
                    OrderDetail detail = data.getObject(i, OrderDetail.class);
                    //把状态封装进pojo中
                    detail.setType(type);
                    //把对象输出
                    out.collect(detail);
                }

            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 两个数据流进行CoGroup时,进行自定义应用
package com.zwf.udf;

import com.zwf.pojo.OrderDetail;
import com.zwf.pojo.OrderMain;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-20 20:33
 */
public class OrderLeftJoinFunc implements CoGroupFunction<OrderDetail, OrderMain, Tuple2<OrderDetail,OrderMain>> {
    @Override
    public void coGroup(Iterable<OrderDetail> first, Iterable<OrderMain> second, Collector<Tuple2<OrderDetail, OrderMain>> out) throws Exception {

        for (OrderDetail detail:first){
            boolean isJoined=false;
            //右表数据存在情况下
            for (OrderMain main:second){
                isJoined=true;
                out.collect(Tuple2.of(detail,main));
            }
            //右表数据不存在 orderMain数据不存在
            if (!isJoined){
                out.collect(Tuple2.of(detail,null));
            }
        }
    }
}
  • OrderMain和OrderDetail两个Bean对象
package com.zwf.pojo;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-20 19:36
 */

import java.util.Date;

/**
 * 订单表
 */
public class OrderMain {
   private Long oid;  //订单ID
   private Date create_time; //创建时间
   private Double total_money; //总金额
   private Short status;  //状态
   private Date update_time; //信息更新时间
   private Long uid;  //用户id
   private String province; //省
    private String type;  //canal拉取的数据属于update deleted insert create之一
    public Long getOid() {
        return oid;
    }

    public void setOid(Long oid) {
        this.oid = oid;
    }

    public Date getCreate_time() {
        return create_time;
    }

    public void setCreate_time(Date create_time) {
        this.create_time = create_time;
    }

    public Double getTotal_money() {
        return total_money;
    }

    public void setTotal_money(Double total_money) {
        this.total_money = total_money;
    }

    public Short getStatus() {
        return status;
    }

    public void setStatus(Short status) {
        this.status = status;
    }

    public Date getUpdate_time() {
        return update_time;
    }

    public void setUpdate_time(Date update_time) {
        this.update_time = update_time;
    }

    public Long getUid() {
        return uid;
    }

    public void setUid(Long uid) {
        this.uid = uid;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    @Override
    public String toString() {
        return "OrderMain{" +
                "oid=" + oid +
                ", create_time=" + create_time +
                ", total_money=" + total_money +
                ", status=" + status +
                ", update_time=" + update_time +
                ", uid=" + uid +
                ", province='" + province + '\'' +
                '}';
    }
}
package com.zwf.pojo;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-20 19:36
 */

import java.util.Date;

/**
 * 商品详情表
 */
public class OrderDetail {
    private Long id;  //商品详情ID
    private Long order_id;  //订单ID
    private Integer category_id;  //类别ID
    private String sku;  //商品最小库存单位
    private Double money;  //商品金额
    private Integer amount; //账户号
    private Date create_time; //创建时间
    private Date update_time;  //更新时间
    private String type;  //canal拉取数据类型

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Long getOrder_id() {
        return order_id;
    }

    public void setOrder_id(Long order_id) {
        this.order_id = order_id;
    }

    public Integer getCategory_id() {
        return category_id;
    }

    public void setCategory_id(Integer category_id) {
        this.category_id = category_id;
    }

    public String getSku() {
        return sku;
    }

    public void setSku(String sku) {
        this.sku = sku;
    }

    public Double getMoney() {
        return money;
    }

    public void setMoney(Double money) {
        this.money = money;
    }

    public Integer getAmount() {
        return amount;
    }

    public void setAmount(Integer amount) {
        this.amount = amount;
    }

    public Date getCreate_time() {
        return create_time;
    }

    public void setCreate_time(Date create_time) {
        this.create_time = create_time;
    }

    public Date getUpdate_time() {
        return update_time;
    }

    public void setUpdate_time(Date update_time) {
        this.update_time = update_time;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    @Override
    public String toString() {
        return "OrderDetail{" +
                "id=" + id +
                ", order_id=" + order_id +
                ", category_id=" + category_id +
                ", sku='" + sku + '\'' +
                ", money=" + money +
                ", amount=" + amount +
                ", create_time=" + create_time +
                ", update_time=" + update_time +
                '}';
    }
}

4、实时统计拼团相关的指标

统计,总共开了多少个团,团的成团率、开团热门分类、团购订单数量、成交的金额、成交的分类金额、成交的区域金额。

  1. 实时拼团数量(维度:拼团的状态、商品分类[关联商品表获取分类ID])

  2. 实时拼团的金额(维度:拼团的状态、商品分类[关联商品表获取分类ID])

  • 需求分析(实时拼团的金额)

相关表

​ tb_groupon(拼团主表)

​ tb_groupon_users(拼团明细表)

​ tb_order(订单表)

实现方案:

实时拼团的金额:拼团明细表 Left Join 订单主表 Left Join 拼团主表

三个流进行Join(拼团明细表 Left Join 订单主表 Left Join 拼团主表)

问题:Flink窗口的Join、CoGroup不支持多个流在一个窗口内进行Join、CoGroup

解决方案一

将两个流进行Join、CoGroup,将的得到Join后的流再查数据库关联信息(异步IO,要查数据库,效率较低)

解决方案二(推荐使用,减少数据库查询):

将两个流进行Join、CoGroup,将的得到Join后的流在跟第三流进行JOIN(有两个窗口,在窗口中进行Join,数据是放在WindowState中,效率高一些)

简化后的数据

拼团主表:主表ID,拼团状态,分类ID

拼团明细表:细表ID,拼团主表ID, 订单ID

订单主表:订单ID、订单总金额

join后的结果

拼团主表ID,订单主表ID,订单总金额,拼团状态,分类ID、(省份)

  • 代码实现(使用nc模拟三流join操作)
package com.zwf.jobs;
/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-20 21:33
 */

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * 先两张表(tb_groupon、tb_groupon_users)进行join得出数据流再和第三张表(tb_order)进行join
 *
 *   //时间、用户ID、拼团主表ID、订单ID (左表)  nc端口:8888
 *         //1000,u1646,p201,o1002
 *         //5001,u1647,p202,o1003
 *
 *   //时间,拼团ID,拼团状态,分类ID   nc端口:9999
 *         //1000,p201,1,手机
 *         //5001,p202,1,手机
 *
 *  //时间,订单ID,订单状态,订单金额   nc端口:7777
 *         //1001,o1002,102,200.0
 *         //5002,o1003,101,3000.0
 */
public class GroupOnCount {
    public static void main(String[] args) throws Exception {
       //模拟三表join流
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        DataStreamSource<String> userDetailStream = environment.socketTextStream("node1", 8888);
        SingleOutputStreamOperator<Tuple4<Long, String, String, String>> orderDetailWaterMark = userDetailStream.map(lines -> {
            String[] split = lines.split(",");
            return Tuple4.of(Long.parseLong(split[0]), split[1], split[2], split[3]);
        }, Types.TUPLE(Types.LONG,Types.STRING,Types.STRING,Types.STRING)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<Long, String, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((bean, ts) -> bean.f0));

        DataStreamSource<String> grouponDetailInfoStream = environment.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<Tuple4<Long, String, String, String>> grouponDetailWaterMark = grouponDetailInfoStream.map(lines -> {
            String[] split = lines.split(",");
            return Tuple4.of(Long.parseLong(split[0]), split[1], split[2], split[3]);
        },Types.TUPLE(Types.LONG,Types.STRING,Types.STRING,Types.STRING)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<Long, String, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((bean, ts) -> bean.f0));

        DataStreamSource<String> orderDetailInfoStream = environment.socketTextStream("node1", 7777);
        SingleOutputStreamOperator<Tuple4<Long, String, String, Double>> orderDetailDetailWaterMark = orderDetailInfoStream.map(lines -> {
            String[] split = lines.split(",");
            return Tuple4.of(Long.parseLong(split[0]), split[1], split[2], Double.parseDouble(split[3]));
        },Types.TUPLE(Types.LONG,Types.STRING,Types.STRING,Types.DOUBLE)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<Long, String, String, Double>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((bean, ts) -> bean.f0));
        //表进行coGroupJoin
        DataStream<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>> firstJoin = orderDetailWaterMark.coGroup(grouponDetailWaterMark)
                .where(o -> o.f2)
                .equalTo(u -> u.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>, Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>>() {
                    @Override
                    public void coGroup(Iterable<Tuple4<Long, String, String, String>> first, Iterable<Tuple4<Long, String, String, String>> second, Collector<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>> out) throws Exception {
                        boolean joined = false;
                        for (Tuple4<Long, String, String, String> f : first) {
                            for (Tuple4<Long, String, String, String> s : second) {
                                joined = true;
                                out.collect(Tuple2.of(f, s));
                            }
                            if (!joined) {
                                out.collect(Tuple2.of(f, null));
                            }
                        }
                    }
                });
        DataStream<Tuple2<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>, Tuple4<Long, String, String, Double>>> res = firstJoin.coGroup(orderDetailDetailWaterMark)
                .where(f -> f.f0.f3)
                .equalTo(s -> s.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>, Tuple4<Long, String, String, Double>, Tuple2<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>, Tuple4<Long, String, String, Double>>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>> first, Iterable<Tuple4<Long, String, String, Double>> second, Collector<Tuple2<Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>>, Tuple4<Long, String, String, Double>>> out) throws Exception {
                        boolean joined = false;
                        for (Tuple2<Tuple4<Long, String, String, String>, Tuple4<Long, String, String, String>> f : first) {
                            for (Tuple4<Long, String, String, Double> s : second) {
                                joined = true;
                                out.collect(Tuple2.of(f, s));
                            }
                            if (!joined) {
                                out.collect(Tuple2.of(f, null));
                            }
                        }

                    }
                });

          res.print();
        environment.execute();

    }
}

五、Drools规则引擎

1、规则引擎

Java规则引擎起源于基于规则的专家系统,而基于规则的专家系统又是专家系统的其中一个分支。专家系统属于人工智能的范畴,它模仿人类的推理方式,使用试探性的方法进行推理,并使用人类能理解的术语解释和证明它的推理结论。

2、规则引擎优点

声明式编程:使用规则更加容易对复杂的问题进行表述,并得到验证(sql语法)。
逻辑与数据分离:数据保存在系统对象中,逻辑保存在规则中。这根本性的打破了面向对象系统中将数据和 逻辑耦合起来的局面。
速度及可测量性:Drools的Rete、Leaps算法,提供了对系统数据对象非常有效率的匹配,这些算法经过了大 量实际考验的证明。

3、规则语法

package 包名

rule "规则名"
when
    (条件) - 也叫作规则的 LHS(Left Hand Side)
then
    (动作/结果) - 也叫作规则的 RHS(Right Hand Side)
end

4、快速入门案例

  • Drools依赖
<!--drools规则引擎-->
    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-compiler</artifactId>
      <version>7.23.0.Final</version>
    </dependency>
  • 快速入门案例
package com.zwf.drools;

import com.zwf.droolspojo.Event;
import org.apache.commons.io.FileUtils;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 10:46
 */
public class  DroolsDemo {
    public static void main(String[] args) {
        //获取drools工具类
        KieHelper kieHelper = new KieHelper();
        try {
            //读取规则文件中的内容
            String rules = FileUtils.readFileToString(new File("rules/first-demo.drl"), StandardCharsets.UTF_8);
              //加入规则信息
             kieHelper.addContent(rules, ResourceType.DRL);
            //创建规则匹配会话
            KieSession kieSession = kieHelper.build().newKieSession();
             //向会话中传入数据进行匹配
            Event event = new Event("view",20,false);
            //传入用户行为数据
            kieSession.insert(event);
            //启动所有规则
            kieSession.fireAllRules();
            //获取规则结果
            System.out.println(event.isFlag());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
  • first-demo.drl

dialect  "java"
import com.zwf.droolspojo.Event
rule "first-demo"
    when
         $e:Event(type=="view"&& count>=2)
    then
         System.out.println("优惠券发放成功!");
         $e.setFlag(true);
end

5、Flink整合Drools案例

package com.zwf.drools;

import com.zwf.droolspojo.Event;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;

import java.io.File;
import java.nio.charset.StandardCharsets;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 14:34
 */
public class FlinkDroolsDemo {
    //用户ID,商品分类ID,事件类型
    //u1001,c201,view
    //一个用户,在一天之内,浏览了某个分类的商品次数大于2就出发一个事件
    public static void main(String[] args) throws Exception {
        //获取flink环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = environment.socketTextStream("node1", 8888);
         //进行数据整理
        SingleOutputStreamOperator<Tuple3<String, String, String>> mapSource = dataStream.map(lines -> {
            String[] fields = lines.split(",");
            return Tuple3.of(fields[0], fields[1], fields[2]);
        }, Types.TUPLE(Types.STRING, Types.STRING, Types.STRING));
        //根据用户ID进行分类  根据用户ID进行分组 用户ID、
        SingleOutputStreamOperator<Tuple3<String, String, String>> processStream = mapSource.keyBy(tp -> tp.f0).process(new KeyedProcessFunction<String, Tuple3<String, String, String>, Tuple3<String, String, String>>() {
            //初始化map状态对象   (产品类别,事件类型)
            private transient MapState<Tuple2<String, String>, Integer> stateVle;
            //初始化drools对象
            private transient KieSession ksession;

            @Override
            public void open(Configuration parameters) throws Exception {
                //初始化状态值  
                MapStateDescriptor<Tuple2<String, String>, Integer> descriptor = new MapStateDescriptor<>("category-id-count", TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
                }), TypeInformation.of(new TypeHint<Integer>() {
                }));
                stateVle = getRuntimeContext().getMapState(descriptor);
                //初始化drools
                KieHelper kieHelper = new KieHelper();
                //读取规则文件
                String rulesContent = FileUtils.readFileToString(new File("rules/first-demo.drl"), StandardCharsets.UTF_8);
                kieHelper.addContent(rulesContent, ResourceType.DRL);
                 //创建规则匹配的会话
                ksession = kieHelper.build().newKieSession();
            }

            @Override
            public void processElement(Tuple3<String, String, String> value, KeyedProcessFunction<String, Tuple3<String, String, String>, Tuple3<String, String, String>>.Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception {
                 //用户ID
                String uid = value.f0;
                //产品类别
                String categoryId = value.f1;
                //事件类型
                String type = value.f2;
                Tuple2<String, String> key = Tuple2.of(categoryId, type);
                Integer viewCount = stateVle.get(key);
                if (viewCount == null) {
                    viewCount = 0;
                }
                viewCount++;
                stateVle.put(key, viewCount);
                //建立规则
                Event event = new Event(type, viewCount, false);
                ksession.insert(event);
                //启动规则
                ksession.fireAllRules();
                if (event.isFlag()) {
                    out.collect(Tuple3.of(uid, "发优惠券", "满30减50"));
                }

            }
        });
        //打印结果
        processStream.print();

        environment.execute();
    }


}
  • Drl文件

dialect  "java"
import com.zwf.droolspojo.Event
rule "first-demo"
    when
         $e:Event(type=="view" && count>=2)
    then
         System.out.println("优惠券发放成功!");
         $e.setFlag(true);
end

6、Flink整合Drools综合案例

1649339236983

Flink读取kafka中的商品信息,异步查询clickhouse中的数据进行匹配,同时通过canal实时同步拉取MySQL中自定义规则数据进kafka中,Flink最终消费kafka中同步后的自定义规则数据,触发规则引擎发送消费劵。

  • job类
package com.zwf.drools;

import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.zwf.droolsUtils.kafkaSourceUtil;
import com.zwf.droolspojo.*;
import com.zwf.service.ClickHouseQueryServiceImp;
import com.zwf.service.QueryService;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;

import java.util.Map;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 16:44
 */
public class FlinkDroolsDemo3 {
    public static void main(String[] args) throws Exception {
        //获取Flink环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(2);
        String kafkaServers="node1:9092,master:9092,node2:9092";
        //消费kafka中的自定义规则数据
        FlinkKafkaConsumer<String> droolRulesStream = kafkaSourceUtil.getKafkaSource(kafkaServers, "drools-rules", "earliest");
        //消费kafka中的业务数据(商品数据)
        FlinkKafkaConsumer<String> eventsStream = kafkaSourceUtil.getKafkaSource(kafkaServers, "events", "latest");
        //声明广播状态描述
        MapStateDescriptor<Integer,BroadCastDroolsState> descriptor = new MapStateDescriptor("rules-state", Integer.class, BroadCastDroolsState.class);
        //获取kafka中的流数据
        DataStreamSource<String> droolsSource = environment.addSource(droolRulesStream);
        DataStreamSource<String> eventsSource = environment.addSource(eventsStream);

        //处理数据
        SingleOutputStreamOperator<DroolsRules> rulesStream = droolsSource.process(new ProcessFunction<String, DroolsRules>() {
            @Override
            public void processElement(String value, ProcessFunction<String, DroolsRules>.Context ctx, Collector<DroolsRules> out) throws Exception {
                JSONObject object = JSONObject.parseObject(value);
                //插入或者更新状态 获取canal同步后的自定义规则数据。
                Object type = object.get("type");
                //只需要获取插入和更新后的数据
                if ("INSERT".equals(type) || "UPDATE".equals(type)) {
                    JSONArray data = object.getJSONArray("data");
                    for (int i = 0; i < data.size(); i++) {
                        DroolsRules rules = data.getObject(i, DroolsRules.class);
                        out.collect(rules);
                    }
                }
            }
        });
        //设置广播变量
        BroadcastStream<DroolsRules> broadcastStream = rulesStream.broadcast(descriptor);
        //处理事件数据   (uid categoryId type)
        KeyedStream<Tuple3<String, String, String>, String> eventsKeyedStream = eventsSource.map(new MapFunction<String, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(String value) throws Exception {
                String[] fields = value.split(",");
                return fields.length==3?Tuple3.of(fields[0], fields[1], fields[2]):Tuple3.of("","","");
            }
        }).keyBy(tp -> tp.f0);

        eventsKeyedStream.connect(broadcastStream).process(new KeyedBroadcastProcessFunction<String, Tuple3<String, String, String>, DroolsRules, Tuple3<String,String,String>>() {
               //设置状态对象
            private transient MapState<Tuple2<String,String>,Integer> mapState;
             //设置业务接口对象
            private transient QueryService queryService;

            @Override
            public void open(Configuration parameters) throws Exception {
                //初始化状态对象
                MapStateDescriptor<Tuple2<String, String>, Integer> mapStateDescriptor = new MapStateDescriptor<>("event-view-counts", TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
                }), TypeInformation.of(new TypeHint<Integer>() {}));
                mapState=getRuntimeContext().getMapState(mapStateDescriptor);
                //使用clickhouse数据库接口实现类,可以通过反射创建示例
                queryService=new ClickHouseQueryServiceImp();
            }

            @Override
            public void processElement(Tuple3<String, String, String> value, KeyedBroadcastProcessFunction<String, Tuple3<String, String, String>, DroolsRules, Tuple3<String, String, String>>.ReadOnlyContext ctx, Collector<Tuple3<String, String, String>> out) throws Exception {
                //获取状态中的值
                String uid = value.f0;
                String categoryId = value.f1;
                String type = value.f2;
                //获取状态对象的值
                Tuple2<String, String> key = Tuple2.of(categoryId, type);
                Integer viewsCounts = mapState.get(key);
                if(viewsCounts==null){
                    viewsCounts=0;
                }
                viewsCounts++;
                mapState.put(key,viewsCounts);
                //获取广播中的变量中迭代器
                Iterable<Map.Entry<Integer, BroadCastDroolsState>> entryIterable = ctx.getBroadcastState(descriptor).immutableEntries();
                //事件对象
                Event event = new Event(type, viewsCounts, false);
                //获取查询的参数对象
                for (Map.Entry<Integer, BroadCastDroolsState> entry: entryIterable){
                    //获取广播变量中的值
                    BroadCastDroolsState broadCastDrools = entry.getValue();
                    //获取sql查询参数对象 广播中的对象
                    QueryParam queryParam = new QueryParam(broadCastDrools.getSql(), uid, broadCastDrools.getStart_time(), broadCastDrools.getEnd_time(), broadCastDrools.getCounts(), categoryId,type);
                    //获取DroolsRuleParam对象
                    DroolsRuleParam droolsRuleParam = new DroolsRuleParam(event, queryService, queryParam,false);
                    KieSession kieSession = broadCastDrools.getKieSession();
                    kieSession.insert(droolsRuleParam);
                    kieSession.fireAllRules();
                    if(droolsRuleParam.isHit()){
                        out.collect(Tuple3.of(uid,"发放优惠劵","满30减20"));
                    }
                }

            }
             //读取规则数据插入广播变量中
            @Override
            public void processBroadcastElement(DroolsRules value, KeyedBroadcastProcessFunction<String, Tuple3<String, String, String>, DroolsRules, Tuple3<String, String, String>>.Context ctx, Collector<Tuple3<String, String, String>> out) throws Exception {
                   //处理广播变量
                Short status = value.getStatus();
                Integer id = value.getId();
                String startTime = value.getStart_time();
                String endTime = value.getEnd_time();
                Integer counts = value.getCounts();
//                System.out.println(value.toString());
                //如果droolsRules是新增或者更新状态时
                //获取广播变量对象
                BroadcastState<Integer, BroadCastDroolsState> broadcastState = ctx.getBroadcastState(descriptor);
                 String code=value.getCode();
                 String sql=value.getSql();
                if(status==null||code==null||sql==null){
                    status=3;
                }
                  if(status==1||status==2){
                      KieHelper kieHelper = new KieHelper();
                      //获取对象中的代码规则
                      kieHelper.addContent(value.getCode(), ResourceType.DRL);
                      KieSession kieSession = kieHelper.build().newKieSession();
                      BroadCastDroolsState broadCastDroolsState = new BroadCastDroolsState(id, kieSession, value.getSql(), startTime, endTime, counts);
                      broadcastState.put(id,broadCastDroolsState);
                  }else if(status==3){
                      //如果是删除状态  就删除广播变量
                      broadcastState.remove(id);
                  }
            }
        }).print();

        environment.execute();

    }

}
  • pojo类(Event、BroadCastDroolsState、DroolsRuleParam、DroolsRules、QueryParam)
package com.zwf.droolspojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 10:59
 */
@Data
@NoArgsConstructor  //无参构造器
@AllArgsConstructor  //有参构造器
public class Event {
    //事件类型
    private String type;
    //浏览数
    private Integer count;
    //触发优惠券标记
    private boolean flag;

}
-----------------------------------------------------------------------------------------
package com.zwf.droolspojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.kie.api.runtime.KieSession;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 16:33
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BroadCastDroolsState {
    //DroolsRState实体类  用于广播变量
    private Integer id;
    //设置drools会话
    private KieSession kieSession;
    private String sql;
    //时间不使用字符串类型  会自定转为时间戳去匹配查询
    private String start_time;  //生效时间
    private String end_time;  //失效时间
    private Integer counts;  //浏览数
}

-----------------------------------------------------------------------------------------
package com.zwf.droolspojo;

import com.zwf.service.QueryService;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 20:21
 */
//DroolsRules规则参数
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
public class DroolsRuleParam {
    //规则事件
    private Event event;
    //查询业务接口
    private QueryService queryService;
    //查询参数
    private QueryParam queryParam;
    //命中判断 是否触发规则
    private boolean hit;
}

-----------------------------------------------------------------------------------------
package com.zwf.droolspojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 16:28
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DroolsRules {
    //DroolsRules规则实体类
    private Integer id;
    //设置规则代码
    private String code;
    private String sql;
    private String start_time;
    private String end_time;
    private Integer counts;
    private Short status;  //1表示新增  2表示更新  3表示删除
}

-----------------------------------------------------------------------------------------
package com.zwf.droolspojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 20:24
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
//查询字段
public class QueryParam {
    private String sql;
    private String uid;
    private String start_time;
    private String end_time;
    private Integer counts;
    private String category_id;
    private String type;

}

  • kafka消费者工具类
package com.zwf.droolsUtils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 15:29
 */
public class kafkaSourceUtil {

    public static FlinkKafkaConsumer<String> getKafkaSource(String servers,String topic,String offset){
        Properties  prop=new Properties();
        prop.put("bootstrap.servers",servers);
        prop.put("auto.offset.reset",offset);
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        return kafkaConsumer;

    }
}
  • ClickhouseJDBC工具类
package com.zwf.utils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 21:41
 */
public class ClickHouseUtils {
    private static Connection connection=null;
    private static String ckDriver="com.clickhouse.jdbc.ClickHouseDriver";
    private static String userName="default";
    private static String passWord="123456";
    private static String url="jdbc:clickhouse://192.168.147.110:8123/drools";
     static {
         try {
             Class.forName(ckDriver);
         } catch (ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
     }
    public static Connection getConnection() throws SQLException {
        connection= DriverManager.getConnection(url,userName,passWord);
        return connection;
    }

}
  • clickhouse数据查询JDBC业务
package com.zwf.service;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 19:53
 */
public interface QueryService {
    /**
     * 业务查询接口
     * @param sql  查询sql语句
     * @param uid  用户ID
     * @param cid category_id 类别ID
     * @param type  类型  view addCard  pay
     * @param start_time 生效时间
     * @param end_time  失效时间
     * @param counts 查询次数
     * @return
     */
    boolean queryEventCountRangeTimes(String sql,String uid,String cid,String type,String start_time,String end_time,int counts) throws Exception;

}
package com.zwf.service;

import com.zwf.utils.ClickHouseUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2024-01-21 20:36
 */
//select count(*) counts from tb_user_event where uid = ? and cid = ? and
// type = ? start_time >= ? and end_time <= ?  group by uid ,cid
public class ClickHouseQueryServiceImp implements QueryService{
    @Override
    public boolean queryEventCountRangeTimes(String sql, String uid, String cid, String type, String start_time, String end_time, int counts) throws Exception {
        Connection connection = ClickHouseUtils.getConnection();
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1,uid);
        preparedStatement.setString(2,cid);
        preparedStatement.setString(3,type);
        preparedStatement.setString(4,start_time);
        preparedStatement.setString(5,end_time);
        ResultSet resultSet = preparedStatement.executeQuery();
        int res=0;
        if(resultSet.next()){
            res=resultSet.getInt("counts");
        }
        //kafka测试数据uid  cid type要与clickhouse中的数据一致!
          //使用次数不能超过sql中规定的次数
        return res>=counts;
    }
}
  • 自定义规则引擎(drl文件)用户浏览同一类别商品超过2次及以上触发优惠券活动。
dialect  "java"
import com.zwf.droolspojo.DroolsRuleParam
import com.zwf.droolspojo.QueryParam
import com.zwf.droolspojo.Event
import com.zwf.service.QueryService
rule "first-demo2"
    when
       $dr:DroolsRuleParam()
       $e: Event(type=="view"&&count>=2) from $dr.event
       $param:QueryParam() from $dr.queryParam
       $Service:QueryService() from $dr.queryService
    then
         QueryService queryService = $dr.getQueryService();
         QueryParam queryParam = $dr.getQueryParam();
         boolean flag = queryService.queryEventCountRangeTimes(queryParam.getSql(),queryParam.getUid(),queryParam.getCategory_id(),$e.getType(),queryParam.getStart_time(),queryParam.getEnd_time(),queryParam.getCounts());
         $dr.setHit(flag);
end
  • MySQL(业务数据库规则数据)
/*
SQLyog Ultimate v12.09 (64 bit)
MySQL - 8.0.16 
*********************************************************************
*/
/*!40101 SET NAMES utf8 */;
use lives;
create table `tb_drools_rules` (
	`id` int (11),
	`name` varchar (60),
	`code` text ,
	`sql` text ,
	`start_time` datetime ,
	`end_time` datetime ,
	`counts` int (11),
	`status` tinyint (4)
); 
insert into `tb_drools_rules` (`id`, `name`, `code`, `sql`, `start_time`, `end_time`, `counts`, `status`) values('1','推荐规则一','dialect  \"java\"\r\nimport com.zwf.droolspojo.Event\r\nrule \"first-demo\"\r\n    when\r\n         $e:Event(type==\"view\" && count>=2)\r\n    then\r\n         System.out.println(\"优惠券发放成功!\");\r\n         $e.setFlag(true);\r\nend',NULL,'2024-02-04 21:40:06','2024-04-05 21:40:20','2','3');
insert into `tb_drools_rules` (`id`, `name`, `code`, `sql`, `start_time`, `end_time`, `counts`, `status`) values('2','推荐规则二','dialect  \"java\"\r\nimport com.zwf.droolspojo.DroolsRuleParam\r\nimport com.zwf.droolspojo.QueryParam\r\nimport com.zwf.droolspojo.Event\r\nimport com.zwf.service.QueryService\r\nrule \"first-demo2\"\r\n    when\r\n       $dr:DroolsRuleParam()\r\n       $e: Event(type==\"view\"&&count>=2) from $dr.event\r\n       $param:QueryParam() from $dr.queryParam\r\n       $Service:QueryService() from $dr.queryService\r\n    then\r\n         QueryService queryService = $dr.getQueryService();\r\n         QueryParam queryParam = $dr.getQueryParam();\r\n         boolean flag = queryService.queryEventCountRangeTimes(queryParam.getSql(),queryParam.getUid(),queryParam.getCategory_id(),$e.getType(),queryParam.getStart_time(),queryParam.getEnd_time(),queryParam.getCounts());\r\n         $dr.setHit(flag);\r\nend\r\n\r\n','select count(*) counts from tb_user_event where uid=? and cid = ? and type = ? and start_time >= ? and end_time <= ?  group by uid ,cid','2023-12-01 10:38:28','2024-03-07 10:38:52','2','1');
  • ClickHouseSQL(业务数据)
create database drools;

drop table drools.tb_user_event;
-- drools测试表
create table  drools.tb_user_event(
    id String,
    uid String,
    spu String,
    cid String,
    type String,
    dt String,
    start_time DATETIME64,
    end_time DATETIME64,
    ts DATETIME default now()
)engine =ReplacingMergeTree(ts)
partition by dt
order by id;


insert into drools.tb_user_event(id,uid,spu,cid,type,dt,start_time,end_time)
values('1','u20020','iphone13','手机','view','2024-01-09 22:34:16','2024-01-10 22:34:16','2024-01-21 22:34:33');
insert into drools.tb_user_event(id,uid,spu,cid,type,dt,start_time,end_time)
values('2','u20020','iphone13','手机','view','2024-01-09 22:34:16','2024-01-10 22:34:16','2024-01-21 22:34:33');
insert into drools.tb_user_event(id,uid,spu,cid,type,dt,start_time,end_time)
values('3','u20019','iphone13','手机','view','2024-01-09 22:34:16','2024-01-10 22:34:16','2024-01-21 22:34:33');
insert into drools.tb_user_event(id,uid,spu,cid,type,dt,start_time,end_time)
values('4','u20019','iphone13','手机','view','2024-01-09 22:34:16','2024-01-10 22:34:16','2024-01-21 22:34:33');
  • 配置${CANAL_HOME}/conf/example/instance.properties如下:
canal.mq.dynamicTopic=order-main:lives\\.tb_order,order-detail:lives\\.tb_order_goods,ordermain:lives\\.orderMain,orderdetail:lives\\.orderDetail,drools-rules:lives\\.tb_drools_rules
  • 启动canal同步MySQL中的规则引擎数据进kafka中主题为drools_rules队列中
bin/startup.sh
  • 创建2个kafka主题
kafka-topics.sh --create --zookeeper node1:2181,master:2181,node2:2181/kafka0110 --topic events --replication-factor 3 --partitions 3

kafka-topics.sh --create --zookeeper node1:2181,master:2181,node2:2181/kafka0110 --topic drools_rules --replication-factor 3 --partitions 3
  • 模拟事件数据(通过控制台加入事件数据)
kafka-console-producer.sh --broker-list node1:9092,master:9092,node2:9092 --topic events

标签:数仓,String,flink,实时,笔记,import,apache,org,public
From: https://www.cnblogs.com/smallzengstudy/p/18058895

相关文章

  • Windows内核基础理论笔记
    内核理论基础特权级别​ 现代计算机的CPU设计中有四个特权级别:R0、R1、R2、R3​ 内核运行在R0(拥有最高权限),用户程序运行在R3​例如:WindowsXP体系结构图中HardwareAbstractionLayer(硬件抽象层):用于提供硬件的低级接口WindowsXP的执行体是NTOSKRNL.EXE的上层ntdll.dll:......
  • Java学习笔记——第八天
    常用API(String、ArrayList部分)APIAPI是什么API(ApplicationProgrammingInterface,应用程序编程接口)就是别人写好的一些程序,给程序员直接拿去调用即可解决问题。API文档是什么Java提供的API使用说明书。包包是什么包是用来分门别类的管理各种不同程序的,类似于文件夹,建包......
  • Vue学习笔记37--内置关系
    示例一:<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0"><title>一个重要的内置关系</title>......
  • NetCore Rtsp视频流转Websocket实现Web实时查看摄像头
    .NetCoreRtsp视频流转Websocket实现Web实时查看摄像头最近工作中遇到需求需要实现这个功能,网上找了很多方案,大都是转为视频文件保存,实时查看的方案倒比较少,最终自己慢慢琢磨了很久在windows系统下实现了,里面的核心思路是:由FFmpeg.AutoGen捕捉Rtsp流视频帧,转为Bitmap,借由Websocke......
  • Go学习笔记(一)
    Go学习日记Go的数据类型bool:true和falseint:int8,int16,int32(int),int64uint:uint8,uint16,uint32(uint),uint64float:float32,float64byte字节类型,本质是uint8,对应的操作包是bytesGolang的数值类型明确的制定了长度、有无符号Golang不会做类型转换......
  • 企业级应用于架构设计笔记
    课堂笔记-主要是给自己复习的第一节课课程结构:架构定义:用一致认可方式从多个角度对系统的组成部分及各部分之间的协作关系所做的描述。软件架构的定义(软件体系结构SoftwareArchitecture):用开发团一致认可的方式从多个角度(业务、开发、运维等)对软件的组成部分及各部分之间的协......
  • 推荐系统学习笔记(一)
    好的推荐系统疑问:推荐系统的主要任务,推荐系统和分类目录以及与搜索引擎的区别等什么是推荐系统如果对象数目巨大,用户只能通过搜索引擎找到自己需要的目标如果用户没有明确需求时,需要一个工具帮助筛选,给出一些建议,这个工具就是推荐系统推荐系统的任务联系用户和信息,一方......
  • web实时消息推送方案 - (重要~个人简历要引申)
    一什么是消息推送推送的场景比较多,比如有人关注我的公众号,这时我就会收到一条推送消息,以此来吸引我点击打开应用。消息推送通常是指网站的运营工作等人员,通过某种工具对用户当前网页或移动设备APP进行的主动消息推送。消息推送一般又分为Web端消息推送和移动端消息推送。......
  • 2024.3.6学习笔记
    1.InfoNCEloss(源自知乎https://zhuanlan.zhihu.com/p/506544456)1.引入把对比学习看成是一个字典查询的任务,即训练一个编码器从而去做字典查询的任务。假设已经有一个编码好的queryq以及一系列编码好的样本k0,k1,k2...,把k0,k1,k2...可以看作是字典里的key。假设只有一......
  • 模拟退火学习笔记
    模拟退火,优雅的暴力我认为有必要摘抄一下提单上的简介ZX写的前言:本片适用于模拟退火入门-进阶模拟退火(SA)是一种随机化算法。当一个问题的方案数量极大(甚至是无穷的)而且不是一个单峰函数时,我们常使用模拟退火求解。一般的,很多题都可以用模拟退火水过,在OI界称之[优雅的暴......