首页 > 数据库 >flink sql kafka数据接入clickhouse

flink sql kafka数据接入clickhouse

时间:2022-10-18 16:47:57浏览次数:41  
标签:flink String dtl -- bos kafka sql login

--参数

--并行度设置
set  'parallelism.default' ='2';

--reset execution.savepoint.path;
--reset execution.checkpoint.path;

--设置队列
set  'yarn.application.queue' = 'realtime';

-- 定义 source 表


drop table if exists  source_bos_login_dtl_kafka;

create table source_bos_login_dtl_kafka(
serial_number String,
bus_time TIMESTAMP(3),
`result`  String,
errcode String,
serialnum_bg String,
bizcode String,
deal_time String,
serialnum_boss String,
errdesc String,
mbosversion String,
clnt_ver String,
bosscode String,
cid String,
push_cid String,
xk String,
channel_code String,
imei String,
ip String,
mb_type_info String,
scr_pix String,
mb_type_brand String,
sys_plat_ver String,
network_type String,
province_code String,
city_code        String,
event_day String,
WATERMARK FOR bus_time AS bus_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'top-bos-login-dtl',
'properties.bootstrap.servers' = '192.168.10.106:6667',
'properties.group.id' = 'top-bos-login-dtl-online-01',
'scan.startup.mode' = 'latest-offset',  
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'format' = 'json'
);

-- 定义 sink 表


drop table if exists  ods_bos_login_dtl_clickhouse;

create table ods_bos_login_dtl_clickhouse(
event_day String,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
province_code String,
clnt_ver String,
mbosversion String,
bizcode String,
errcode String,
pv BIGINT,
uv BIGINT
)
WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://192.168.10.106:6667:8123/testdb',
  'database-name' = 'testdb',
  'username' = 'test',
  'password' = 'test123456',
  'table-name' = 'bos_login_stats_all'
);

-- 数据从kafka 插入 hudi


insert into ods_bos_login_dtl_clickhouse 
SELECT event_day,window_start, window_end,province_code,clnt_ver,mbosversion,bizcode,errcode, count(serial_number) as pv,count(distinct serial_number) as uv FROM TABLE(CUMULATE(TABLE source_bos_login_dtl_kafka, DESCRIPTOR(bus_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)) GROUP BY event_day,window_start, window_end,province_code,clnt_ver,mbosversion,bizcode,errcode;

标签:flink,String,dtl,--,bos,kafka,sql,login
From: https://www.cnblogs.com/whiteY/p/16803084.html

相关文章

  • 技术分享| 消息队列Kafka群集部署
    一、简介1、介绍Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日......
  • flink sql kafka数据接入mysql
    --定义source表droptableIFEXISTSsource_applet_kafka;CREATETABLEIFNOTEXISTSsource_applet_kafka(provinceCodeString,companyNameString,appIdStri......
  • Flink WordCount入门
    下面通过一个单词统计的案例,快速上手应用Flink,进行流处理(Streaming)和批处理(Batch)单词统计(批处理)引入依赖<!--flink核心包--><dependency><groupId>org.apa......
  • PostgreSQL 数据库开发规范
    背景PostgreSQL的功能非常强大,但是要把PostgreSQL用好,开发人员是非常关键的。下面将针对PostgreSQL数据库原理与特性,输出一份开发规范,希望可以减少大家在使用PostgreSQL......
  • 记录上传文件到mysql数据库中遇到的问题
    在开发一个Qt的界面程序,想把程序批量生成的数据文件上传进数据库中。最开始尝试将文件读到QByteArray类型的变量中,插入数据表的mediumblob类型里,但是插不了,数据表结果一直......
  • MySQL进阶--存储引擎--2022年10月18日
    第一节  MYSQL体系结构1、第二节  存储引擎简介1、建表时指定存储引擎CREATETABLE表名(字段1字段1类型[COMMENT字段1注释],......字段n字......
  • mybatis_15_在 SqlSessionFactoryBuilder.build() 方法中传入属性值
    查阅SqlSessionFactoryBuilder.java中重载函数build的定义中,存在支持传入Properties的定义  使用参考:Stringresource="mybatis-config2.xml";InputStreaminputS......
  • PgSQL外连接分页时出现重复数据
    今天工作中遇到的问题。SELECT*FROMaLEFTJOINb ONa.c=b.d LIMIT20OFFSET0;SELECT*FROMaLEFTJOINb ONa.c=b.d LIMIT20OFFSET20;上面两条......
  • 上位笔记_04_SQLITE操作(创建以及可视化查看)
    nuget安装sqlite,引用  System.Data.SQLite分X64和X86版本。一般来说,在64位系统上就应该使用X64版本的,但是这样一来开发工作似乎就繁琐了许多如果不区分,就会出现如......
  • MYSQL日志查看
    方法一:登录到mysql查看binlog获取binlog文件列表:mysql>showbinarylogs;查看当前使用的binlog文件:mysql>showmasterstatus;只查看第一个binlog文件的内容:mys......