首页 > 数据库 >flink sql kafka数据接入mysql

flink sql kafka数据接入mysql

时间:2022-10-18 16:34:15浏览次数:53  
标签:varchar DEFAULT 32 flink kafka default applet sql NULL

-- 定义 source 表


drop table IF EXISTS source_applet_kafka;
CREATE TABLE IF NOT EXISTS source_applet_kafka 
(provinceCode String,companyName String,appId String, appName String, eventTime  TIMESTAMP(3) ,errorType String, loadTime Int,appVersion String,trmnlStyle String,
WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND ) with (
  'connector' = 'kafka',
'topic' = 'applet-topic',
'properties.bootstrap.servers' = '192.168.10.106:6667',
'properties.group.id' = 'testGroup-02',
'format' = 'json',
 'scan.startup.mode' = 'group-offsets'
 );

-- 定义 sink 表


drop table IF EXISTS applet_request_dtl;
CREATE TABLE IF NOT EXISTS applet_request_dtl
(REPORT_TIME datetime not null default '0000-00-00 00:00:00',
PROVINCE_CODE VARCHAR(8) default null,
COMPANY_NAME VARCHAR(32) default null,
APP_ID VARCHAR(32) default null,
APP_NAME VARCHAR(32) default null,
ERROR_TYPE VARCHAR(32) default null,
APP_VERSION varchar(16),
TRMNL_STYLE varchar(8),
LOADTIME_AVG FLOAT default null,
COUNT int default null) 
with (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://192.168.10.106:3306/cmbh2test',
'connector.driver' = 'com.mysql.cj.jdbc.Driver',
'connector.username' = 'cmbh',
'connector.password' = 'cmbh123456');

-- mysql数据库建表

CREATE TABLE `applet_request_dtl` (
  `START_ENENT_TIME` datetime DEFAULT NULL,
  `END_ENENT_TIME` datetime DEFAULT NULL,
  `PROVINCE_CODE` varchar(8) DEFAULT NULL,
  `COMPANY_NAME` varchar(32) DEFAULT NULL,
  `APP_ID` varchar(32) DEFAULT NULL,
  `APP_NAME` varchar(32) DEFAULT NULL,
  `ERROR_TYPE` varchar(32) DEFAULT NULL,
  `APP_VERSION` varchar(16) DEFAULT NULL,
  `TRMNL_STYLE` varchar(8) DEFAULT NULL,
  `LOADTIME_AVG` int(11) DEFAULT NULL,
  `CNT` int(11) DEFAULT NULL,
  KEY `START_ENENT_TIME_index` (`START_ENENT_TIME`),
  KEY `PROVINCE_CODE_index` (`PROVINCE_CODE`),
  KEY `COMPANY_NAME_index` (`COMPANY_NAME`)
)

-- 数据从kafka 插入 mysql


insert into applet_request_dtl
select window_start,window_end,provinceCode,companyname,appId,appName,errorType,appVersion,trmnlStyle,avg(loadTime) as avg_time,count(1) as cnt from TABLE(TUMBLE(TABLE source_applet_kafka, DESCRIPTOR(eventTime),INTERVAL '60' SECOND)) group by window_start,window_end,provinceCode,companyname,appId,appName,errorType,appVersion,trmnlStyle;

标签:varchar,DEFAULT,32,flink,kafka,default,applet,sql,NULL
From: https://www.cnblogs.com/whiteY/p/16803033.html

相关文章

  • Flink WordCount入门
    下面通过一个单词统计的案例,快速上手应用Flink,进行流处理(Streaming)和批处理(Batch)单词统计(批处理)引入依赖<!--flink核心包--><dependency><groupId>org.apa......
  • PostgreSQL 数据库开发规范
    背景PostgreSQL的功能非常强大,但是要把PostgreSQL用好,开发人员是非常关键的。下面将针对PostgreSQL数据库原理与特性,输出一份开发规范,希望可以减少大家在使用PostgreSQL......
  • 记录上传文件到mysql数据库中遇到的问题
    在开发一个Qt的界面程序,想把程序批量生成的数据文件上传进数据库中。最开始尝试将文件读到QByteArray类型的变量中,插入数据表的mediumblob类型里,但是插不了,数据表结果一直......
  • MySQL进阶--存储引擎--2022年10月18日
    第一节  MYSQL体系结构1、第二节  存储引擎简介1、建表时指定存储引擎CREATETABLE表名(字段1字段1类型[COMMENT字段1注释],......字段n字......
  • mybatis_15_在 SqlSessionFactoryBuilder.build() 方法中传入属性值
    查阅SqlSessionFactoryBuilder.java中重载函数build的定义中,存在支持传入Properties的定义  使用参考:Stringresource="mybatis-config2.xml";InputStreaminputS......
  • PgSQL外连接分页时出现重复数据
    今天工作中遇到的问题。SELECT*FROMaLEFTJOINb ONa.c=b.d LIMIT20OFFSET0;SELECT*FROMaLEFTJOINb ONa.c=b.d LIMIT20OFFSET20;上面两条......
  • 上位笔记_04_SQLITE操作(创建以及可视化查看)
    nuget安装sqlite,引用  System.Data.SQLite分X64和X86版本。一般来说,在64位系统上就应该使用X64版本的,但是这样一来开发工作似乎就繁琐了许多如果不区分,就会出现如......
  • MYSQL日志查看
    方法一:登录到mysql查看binlog获取binlog文件列表:mysql>showbinarylogs;查看当前使用的binlog文件:mysql>showmasterstatus;只查看第一个binlog文件的内容:mys......
  • Java代码审计sql注入
    java_sec_code该项目也可以叫做JavaVulnerabilityCode(Java漏洞代码)。每个漏洞类型代码默认存在安全漏洞(除非本身不存在漏洞),相关修复代码在注释里。具体可查看每个漏......
  • SQL Server 函数大全
    表达式:是常量、变量、列或函数等与运算符的任意组合。以下参数中表达式类型是指表达式经运算后返回的值的类型字符串函数函数名称参数示例说明ascii......