-- Source table: applet events consumed from the Kafka topic 'applet-topic' as JSON.
-- eventTime is the event-time attribute; its watermark trails it by 10 seconds.
DROP TABLE IF EXISTS source_applet_kafka;
CREATE TABLE IF NOT EXISTS source_applet_kafka (
    provinceCode STRING,
    companyName  STRING,
    appId        STRING,
    appName      STRING,
    eventTime    TIMESTAMP(3),
    errorType    STRING,
    loadTime     INT,
    appVersion   STRING,
    trmnlStyle   STRING,
    WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'applet-topic',
    'properties.bootstrap.servers' = '192.168.10.106:6667',
    'properties.group.id' = 'testGroup-02',
    'format' = 'json',
    'scan.startup.mode' = 'group-offsets'
);
-- Sink table: Flink-side mapping of the MySQL table `applet_request_dtl` (JDBC connector).
-- Column names and order mirror the MySQL table so that the positional INSERT of
-- (window start, window end, dimensions, average load time, row count) lines up.
-- MySQL-only syntax (DATETIME, DEFAULT ...) is not valid in Flink DDL and has been removed;
-- the reserved word COUNT is replaced by the MySQL column name CNT.
-- NOTE(review): the START_ENENT_TIME / END_ENENT_TIME spelling is kept on purpose because
-- it must match the physical MySQL column names.
DROP TABLE IF EXISTS applet_request_dtl;
CREATE TABLE IF NOT EXISTS applet_request_dtl (
    START_ENENT_TIME TIMESTAMP(3),
    END_ENENT_TIME   TIMESTAMP(3),
    PROVINCE_CODE    VARCHAR(8),
    COMPANY_NAME     VARCHAR(32),
    APP_ID           VARCHAR(32),
    APP_NAME         VARCHAR(32),
    ERROR_TYPE       VARCHAR(32),
    APP_VERSION      VARCHAR(16),
    TRMNL_STYLE      VARCHAR(8),
    LOADTIME_AVG     FLOAT,
    -- BIGINT because the row count written here comes from COUNT(...) -- TODO confirm
    -- against the Flink version in use; the MySQL column itself is int(11).
    CNT              BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.10.106:3306/cmbh2test',
    'table-name' = 'applet_request_dtl',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'cmbh',
    'password' = 'cmbh123456'
);
-- MySQL-side DDL: physical table that receives the per-window aggregates.
-- Run this in MySQL, not in the Flink SQL client.
-- NOTE(review): "ENENT" in the two time columns looks like a typo for "EVENT"; the names
-- are left unchanged because anything writing to this table must use the same spelling.
CREATE TABLE IF NOT EXISTS `applet_request_dtl` (
    `START_ENENT_TIME` datetime DEFAULT NULL,
    `END_ENENT_TIME` datetime DEFAULT NULL,
    `PROVINCE_CODE` varchar(8) DEFAULT NULL,
    `COMPANY_NAME` varchar(32) DEFAULT NULL,
    `APP_ID` varchar(32) DEFAULT NULL,
    `APP_NAME` varchar(32) DEFAULT NULL,
    `ERROR_TYPE` varchar(32) DEFAULT NULL,
    `APP_VERSION` varchar(16) DEFAULT NULL,
    `TRMNL_STYLE` varchar(8) DEFAULT NULL,
    `LOADTIME_AVG` int(11) DEFAULT NULL,
    `CNT` int(11) DEFAULT NULL,
    KEY `START_ENENT_TIME_index` (`START_ENENT_TIME`),
    KEY `PROVINCE_CODE_index` (`PROVINCE_CODE`),
    KEY `COMPANY_NAME_index` (`COMPANY_NAME`)
);
-- Stream Kafka events into MySQL: aggregate per 60-second tumbling event-time window
-- and per dimension combination, emitting the average load time and the event count.
-- The INSERT is positional, so the select list follows the MySQL table's column order
-- (window start, window end, dimensions, average, count).
-- Flink SQL matches identifiers case-sensitively, so companyName must be spelled exactly
-- as declared on the source table.
INSERT INTO applet_request_dtl
SELECT
    window_start,
    window_end,
    provinceCode,
    companyName,
    appId,
    appName,
    errorType,
    appVersion,
    trmnlStyle,
    AVG(loadTime) AS avg_time,
    COUNT(1) AS cnt
FROM TABLE(
    TUMBLE(TABLE source_applet_kafka, DESCRIPTOR(eventTime), INTERVAL '60' SECOND)
)
GROUP BY
    window_start,
    window_end,
    provinceCode,
    companyName,
    appId,
    appName,
    errorType,
    appVersion,
    trmnlStyle;
-- Tags: varchar, DEFAULT, 32, flink, kafka, default, applet, sql, NULL
-- From: https://www.cnblogs.com/whiteY/p/16803033.html