首页 > 其他分享 >flink demo

flink demo

时间:2023-07-14 10:12:01  浏览次数:42  
标签:buy -- demo flink connector user behavior

1. 搭建测试环境

1.1 下载并启动docker-compose容器

# Containers included in this Docker Compose file:
# DataGen: data generator. Once started it automatically generates user-behavior records and sends them
#          to the Kafka cluster. Default rate: 1000 records/second for roughly 3 hours. Change the rate via
#          the `speedup` parameter of the datagen service in docker-compose.yml (only takes effect after
#          restarting docker compose).
# MySQL: MySQL 5.7 with a pre-created `category` table, pre-filled with the sub-category -> top-level
#        category mapping; used later as a dimension (lookup) table.
# Kafka: the main data source; DataGen writes its records into this container.
# Zookeeper: dependency of Kafka.
# Elasticsearch: stores the results produced by Flink SQL.
# Kibana: visualizes the data stored in Elasticsearch.
mkdir -p /data/flink/flink-demo
cd /data/flink/flink-demo
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml

# Start all containers in the background:
docker-compose up -d

# The following are alternatives for later use. They are commented out so that running this
# section top-to-bottom does not immediately tear down the environment that was just started.
# Stop and remove the containers:
# docker-compose down
# Restart the containers:
# docker-compose restart

# Inspect the Kafka test data from inside the kafka container (broker address there: kafka:9094).
# Print the first 10 messages of topic user_behavior from the beginning, then exit:
docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
# Follow the topic continuously (no --max-messages, so this keeps running until interrupted):
docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 '
# Download the Flink 1.10.3 binary distribution (Scala 2.11) and unpack it under /data/flink.
# Superseded releases are removed from downloads.apache.org, so 1.10.3 has to come from the
# Apache archive: https://archive.apache.org/dist/flink/
cd /data/flink
wget "https://archive.apache.org/dist/flink/flink-1.10.3/flink-1.10.3-bin-scala_2.11.tgz"
# Same result as `gzip -d` followed by `tar -xvf`; unpacks into /data/flink/flink-1.10.3
tar -xzvf flink-1.10.3-bin-scala_2.11.tgz

# Stable path used by the rest of this guide: /data/flink/flink -> /data/flink/flink-1.10.3
# (-n replaces an existing link instead of creating a new one inside the linked directory)
ln -sfn /data/flink/flink-1.10.3 /data/flink/flink
# Fetch the Flink SQL format/connector jars and the MySQL JDBC driver directly into Flink's lib/ directory.
# Browse available artifacts at: https://repo1.maven.org/maven2/org/apache/flink/
cd /data/flink/flink/lib/
FLINK_MAVEN_BASE=https://repo1.maven.org/maven2/org/apache/flink
for jar_path in \
    flink-json/1.10.3/flink-json-1.10.3.jar \
    flink-sql-connector-kafka_2.11/1.10.3/flink-sql-connector-kafka_2.11-1.10.3.jar \
    flink-sql-connector-elasticsearch6_2.11/1.10.3/flink-sql-connector-elasticsearch6_2.11-1.10.3.jar \
    flink-jdbc_2.11/1.10.3/flink-jdbc_2.11-1.10.3.jar
do
    wget "${FLINK_MAVEN_BASE}/${jar_path}"
done
# The MySQL driver lives outside the org/apache/flink group
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar

# Increase concurrency: raise the number of task slots per TaskManager.
# Open conf/flink-conf.yaml and set the following YAML entry (it belongs in the file; it is not a shell command):
#   taskmanager.numberOfTaskSlots: 10
vi /data/flink/flink/conf/flink-conf.yaml

# (Re)start the Flink standalone cluster. The bin/ scripts are relative to the Flink home directory,
# and the current directory is still lib/ after downloading the jars, so change directory first.
cd /data/flink/flink
bin/stop-cluster.sh
bin/start-cluster.sh
# Start the SQL CLI in embedded mode
bin/sql-client.sh embedded

2. 创建实时任务

-- Kafka source table: JSON user-behavior records on topic user_behavior (written by the DataGen container)
CREATE TABLE user_behavior (
    user_id     BIGINT,
    item_id     BIGINT,
    category_id BIGINT,
    behavior    STRING,
    ts          TIMESTAMP(3),
    -- computed column providing a processing-time attribute
    proctime AS PROCTIME(),
    -- watermark on ts allowing 5 seconds of out-of-orderness; this makes ts the event-time attribute
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector.type'                         = 'kafka',
    'connector.version'                      = 'universal',        -- universal Kafka client (brokers 0.11 and newer)
    'connector.topic'                        = 'user_behavior',
    'connector.startup-mode'                 = 'earliest-offset',  -- start reading from the earliest offset
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type'                            = 'json'
);

-- Sanity checks in the SQL CLI.
-- Run the table checks in the database where user_behavior was created (the CLI starts in
-- default_database). Every later statement in this guide refers to user_behavior unqualified.
SHOW DATABASES;
SHOW TABLES;
DESCRIBE user_behavior;
SELECT * FROM user_behavior LIMIT 10;

-- Creating/browsing a separate database is optional. Switch back afterwards: new tables are created
-- in the *current* database, and unqualified names such as user_behavior would no longer resolve.
CREATE DATABASE demo;
USE demo;
SHOW TABLES;
USE default_database;

-- Result display mode of the SQL CLI. Pick ONE; a later SET overrides an earlier one.
-- SET execution.result-mode=changelog;
SET execution.result-mode=table;

-- Elasticsearch sink: number of purchases per hour
CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',             -- use the elasticsearch connector
    'connector.version' = '6',                      -- connector for ES 6 (usable with ES 6+ and 7+)
    'connector.hosts' = 'http://localhost:9200',    -- same ES address as the cumulative_uv / top_category sinks
    'connector.index' = 'buy_cnt_per_hour',         -- target index
    'connector.document-type' = 'user_behavior',    -- ES 6 mapping type of the documents in the index
    'connector.bulk-flush.max-actions' = '1',       -- flush after every record (convenient for a demo)
    'format.type' = 'json',                         -- documents are serialized as JSON
    'update-mode' = 'append'                        -- tumbling-window results are append-only
);

-- Hourly purchase count: 1-hour event-time tumbling windows on ts, labelled with the hour of the window start
INSERT INTO buy_cnt_per_hour
SELECT
    HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) AS hour_of_day,
    COUNT(*)                                  AS buy_cnt
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);




# Connect to MySQL (database `flink`) and create the physical table that the Flink JDBC sink writes into
mysql -h127.0.0.1 -uroot -p123456 flink
CREATE TABLE buy_cnt_per_hour (
    dayhour BIGINT,
    cnt_buy BIGINT
);



-- Flink JDBC sink writing into the MySQL table flink.buy_cnt_per_hour.
-- IF EXISTS keeps the script re-runnable: a plain DROP TABLE fails on the first run, when the table does not exist yet.
DROP TABLE IF EXISTS sink_mysql_buy_cnt_per_hour;
CREATE TABLE sink_mysql_buy_cnt_per_hour (
    dayhour BIGINT,
    cnt_buy BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink',
    'connector.table' = 'buy_cnt_per_hour',           -- physical MySQL table name
    'connector.driver' = 'com.mysql.jdbc.Driver',     -- driver class of mysql-connector-java 5.1.x
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.write.flush.max-rows' = '5000',        -- flush when 5000 rows are buffered ...
    'connector.write.flush.interval' = '2s',          -- ... or every 2 seconds, whichever comes first
    'connector.write.max-retries' = '3'               -- retry a failed batch write up to 3 times
);

-- Variant writing to MySQL: counts purchases in 10-second event-time tumbling windows.
-- NOTE(review): dayhour is HOUR() of the window start, so every 10 s window within the same hour
-- produces a row with the same dayhour value (up to 360 rows per hour). The MySQL table has no key,
-- so these rows are appended, not summed. Presumably the short window is only meant to show results
-- quickly; use INTERVAL '1' HOUR in both places for a true per-hour count.
INSERT INTO sink_mysql_buy_cnt_per_hour
SELECT
 HOUR(TUMBLE_START(ts, INTERVAL '10' SECOND)) as dayhour
, COUNT(*) as cnt_buy
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)
;


-- Other interval units usable with TUMBLE / INTERVAL: MINUTE, SECOND, HOUR


-- Hourly purchase count with explicit column aliases. The aliases follow the columns of the Flink sink
-- table buy_cnt_per_hour (hour_of_day, buy_cnt); Flink matches query columns to the sink by position.
INSERT INTO buy_cnt_per_hour
SELECT
    HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) AS hour_of_day,
    COUNT(*)                                  AS buy_cnt
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);




-- Elasticsearch sink: cumulative unique visitors (UV), one row per 10-minute bucket
CREATE TABLE cumulative_uv (
    time_str STRING,   -- 10-minute bucket label, e.g. '10:30'
    uv       BIGINT    -- distinct users accumulated up to that bucket
) WITH (
    'connector.type'          = 'elasticsearch',
    'connector.version'       = '6',
    'connector.hosts'         = 'http://localhost:9200',
    'connector.index'         = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',
    'format.type'             = 'json',
    'update-mode'             = 'upsert'   -- the feeding INSERT groups by time_str, so rows are updated in place
);

-- Pre-processing view. For every incoming record it emits:
--   time_str: the largest 10-minute bucket label seen so far. A ts of 10:37 maps to '10:30':
--             SUBSTR(..., 1, 4) keeps 'HH:m', then '0' is appended.
--   uv:       the number of distinct user_ids seen so far.
-- The OVER window has no PARTITION BY and spans UNBOUNDED PRECEDING .. CURRENT ROW ordered by
-- processing time, so both values accumulate over the whole stream.
-- NOTE(review): nothing resets the window at midnight. MAX over the 'HH:m0' strings stays at '23:50'
-- after the day rolls over, and uv keeps counting across days. Without PARTITION BY this operator
-- presumably runs with parallelism 1 -- confirm before using with larger volumes.
CREATE VIEW uv_per_10min AS
SELECT 
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, 
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

-- Collapse the per-record view to one row per 10-minute bucket, keeping the highest cumulative UV seen for it
INSERT INTO cumulative_uv
SELECT
    time_str,
    MAX(uv) AS uv
FROM uv_per_10min
GROUP BY time_str;

-- MySQL dimension (lookup) table: sub-category -> top-level category mapping from the `category` table
CREATE TABLE category_dim (
    sub_category_id    BIGINT,  -- sub-category id
    parent_category_id BIGINT   -- top-level category id
) WITH (
    'connector.type'     = 'jdbc',
    'connector.url'      = 'jdbc:mysql://localhost:3306/flink',
    'connector.table'    = 'category',
    'connector.driver'   = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123456',
    -- lookup cache: at most 5000 rows, each entry kept for up to 10 minutes
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);

-- Elasticsearch sink: purchase ranking by top-level category
CREATE TABLE top_category (
    category_name STRING,  -- top-level category name
    buy_cnt       BIGINT   -- number of purchases
) WITH (
    'connector.type'          = 'elasticsearch',
    'connector.version'       = '6',
    'connector.hosts'         = 'http://localhost:9200',
    'connector.index'         = 'top_category',
    'connector.document-type' = 'user_behavior',
    'format.type'             = 'json',
    'update-mode'             = 'upsert'   -- the feeding INSERT groups by category_name, so counts are updated in place
);

-- Enrich each behavior record with its top-level category name through a lookup join against the
-- MySQL dimension table, evaluated at the record's processing time. Records without a matching
-- sub-category (LEFT JOIN leaves parent_category_id NULL) and unlisted ids map to '其他' ("other").
CREATE VIEW rich_user_behavior AS
SELECT
    u.user_id,
    u.item_id,
    u.behavior,
    CASE
        WHEN c.parent_category_id = 1 THEN '服饰鞋包'
        WHEN c.parent_category_id = 2 THEN '家装家饰'
        WHEN c.parent_category_id = 3 THEN '家电'
        WHEN c.parent_category_id = 4 THEN '美妆'
        WHEN c.parent_category_id = 5 THEN '母婴'
        WHEN c.parent_category_id = 6 THEN '3c数码'
        WHEN c.parent_category_id = 7 THEN '运动户外'
        WHEN c.parent_category_id = 8 THEN '食品'
        ELSE '其他'
    END AS category_name
FROM user_behavior AS u
LEFT JOIN category_dim FOR SYSTEM_TIME AS OF u.proctime AS c
    ON u.category_id = c.sub_category_id;
-- Purchase count per top-level category
INSERT INTO top_category
SELECT
    category_name,
    COUNT(*) AS buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

Kibana 访问地址:http://10.8.60.127:5601
参考资料(REF)

https://iteblog.blog.csdn.net/article/details/111465792
https://blog.csdn.net/weixin_42066446/article/details/113243977
https://blog.csdn.net/weixin_43039757/article/details/112850707
https://blog.csdn.net/wshl1234567/article/details/104512644/
https://mp.weixin.qq.com/s/pXJfxp0wxdlafFyg4tgiGg

标签:buy,--,demo,flink,connector,user,behavior
From: https://www.cnblogs.com/chenzechao/p/17552921.html

相关文章

  • flask demo
    fromflaskimportFlask,requestfromflask_corsimportCORSapp=Flask(__name__)CORS(app)@app.route('/')defindex():return'欢迎使用FlaskDemo应用!'@app.route('/hello',methods=['GET','POST'])d......
  • dhtmlx基本使用demo,vue
    main.js  引入样式import'dhtmlx-gantt/codebase/dhtmlxgantt.css'父组件:gangtData,数据<ganttChartv-if="value8":gangtData="gangtData"/> 子组件:<template><divstyle="height:62vh;"ref="ganttContai......
  • golang的list数据结构demo
    packagemainimport"container/list"funcmain(){varmylistlist.List//放在尾部mylist.PushBack("go")mylist.PushBack("grpc")mylist.PushBack("mysql")//头部放数据mylist.PushFront("gi......
  • Rust 使用egui创建一个简单的下载器demo
    仓库连接:https://github.com/GaN601/egui-demo-download-util这是我第一个rustguidemo,学习rust有挺长时间了,但是一直没有落实到实践中,本着对桌面应用的兴趣,考察了slint、egui两种框架,最后还是选择了egui.这篇博客同时包含我当前的一些理解,但是自身技术有限,可......
  • 基于three.js的3D展厅Demo功能设计与实现
    参考网址: http://www.webgl3d.cn/          https://www.three3d.cn/ 1、three.js之helloworld    功能:立方体在三维空间的转动。   代码位置:demo_0_scene    目的:理解场景/相机 /渲染器/坐标系/几何体/材质/物体......
  • 创建 Code Interpreter Demo: 一次实践的探索
    好消息,好消息,CodeInterpreter可以测试使用了!!!在这篇文章中,我们将探索如何创建一个CodeInterpreterDemo。提交一个2023年1-5月份的融资记录数据,让它来帮我们分析一下这些数据。执行的过程如下:生成图表的代码我们也可以找到,需要做调整的话,可以把代码复制到本地进行修......
  • HR_INFOTYPE_OPERATION DEMO
    DATA:ls_returnTYPEbapireturn1."bapi的返回结果LOOPAT<gfs_t_output>ASSIGNING<gfs_s_output>.CLEAR:gv_pernr,gv_begda,gv_endda,gv_subty,gv_seqnr,ls_return.ASSIGNCOMPONENT'ZSTATUS'OFSTRUCTURE<gfs_s_output>......
  • Hello-FPGA CoaXPress 2.0 FPGA HOST IP Core Demo User Manual
     目录Hello-FPGACoaXPress2.0HostFPGAIPCoreDemo41说明42设备连接53VIVADOFPGA工程64SDK工程9图1‑1VIVADO工程目录结构4图1‑2SDK工程目录结构4图2‑1ZCU102结构图5图2‑2ZCU102UART接口6图3‑1VIVADO工程6图3‑2CPU控制器7图......
  • 尝试写一个SpringBoot的demo
    在Spring官网使用脚手架:  https://start.spring.io/选择3项依赖:   编写代码: 启动运行应用:   启动了8080端口 访问:http://localhost:8080/hyc健康检查:  http://localhost:8080/actuator/health......
  • flink初识
    一、flink:apache开源的一款流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。二、Flink是一个计算框架......