--参数
--并行度设置
set 'parallelism.default' ='2';
--reset execution.savepoint.path;
--reset execution.checkpoint.path;
--设置队列
set 'yarn.application.queue' = 'realtime';
-- 定义 source 表
drop table if exists source_bos_login_dtl_kafka;
create table source_bos_login_dtl_kafka(
serial_number String,
bus_time TIMESTAMP(3),
`result` String,
errcode String,
serialnum_bg String,
bizcode String,
deal_time String,
serialnum_boss String,
errdesc String,
mbosversion String,
clnt_ver String,
bosscode String,
cid String,
push_cid String,
xk String,
channel_code String,
imei String,
ip String,
mb_type_info String,
scr_pix String,
mb_type_brand String,
sys_plat_ver String,
network_type String,
province_code String,
city_code String,
event_day String,
WATERMARK FOR bus_time AS bus_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'top-bos-login-dtl',
'properties.bootstrap.servers' = '192.168.10.106:6667',
'properties.group.id' = 'top-bos-login-dtl-online-01',
'scan.startup.mode' = 'latest-offset',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'format' = 'json'
);
-- 定义 sink 表
drop table if exists ods_bos_login_dtl_clickhouse;
create table ods_bos_login_dtl_clickhouse(
event_day String,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
province_code String,
clnt_ver String,
mbosversion String,
bizcode String,
errcode String,
pv BIGINT,
uv BIGINT
)
WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://192.168.10.106:6667:8123/testdb',
'database-name' = 'testdb',
'username' = 'test',
'password' = 'test123456',
'table-name' = 'bos_login_stats_all'
);
-- 数据从kafka 插入 hudi
insert into ods_bos_login_dtl_clickhouse
SELECT event_day,window_start, window_end,province_code,clnt_ver,mbosversion,bizcode,errcode, count(serial_number) as pv,count(distinct serial_number) as uv FROM TABLE(CUMULATE(TABLE source_bos_login_dtl_kafka, DESCRIPTOR(bus_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)) GROUP BY event_day,window_start, window_end,province_code,clnt_ver,mbosversion,bizcode,errcode;
标签:flink,String,dtl,--,bos,kafka,sql,login
From: https://www.cnblogs.com/whiteY/p/16803084.html