首页 > 数据库 >Dinky的使用——kafka2mysql

Dinky的使用——kafka2mysql

时间:2022-10-18 18:47:00浏览次数:50  
标签:SET Dinky STRING kafka2mysql checkpointing kafka topic 使用 execution

需求:通过在kafka的topic里面传入json串,再把数据同步到mysql中,这个也可以作为半结构化数据同步的案例

一、添加依赖包

将依赖包放到dinky的pulgins目录和flink的lib目录下,并重启dinky和flink

依赖包下载地址参考:https://www.bookstack.cn/read/ApacheFlink-1.13-zh/6838f5ad108cfcc6.md

 当然我建议同时把kafka-clients*.jar的依赖包也放进去

 

 

 

二、创建作业

 

 

 

三、编写finksql代码

SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout =600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;
CREATE TABLE kafka_input (
`id` STRING,
`home` STRING,
`work` STRING
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '172.16.119.28:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'topic' = 'gong1'
);

CREATE TABLE kafka_out( 
`id` STRING,
`home` STRING,
`work` STRING
) WITH (
  'connector' = 'jdbc', 
  'url' = 'jdbc:mysql://172.16.119.50:3306/test?createDatabaseIfNotExist=true&useSSL=false',
  'username' = 'root',
  'password' = 'Tj@20220710', 
  'table-name' = 'kafka_out' 
);

INSERT INTO kafka_out select id,home,work from kafka_input;

 

四、运行作业

保存代码并通过dinky自带的语法检查功能

 

检测语法没问题

运行作业

 

 

 

这个时候作业一直处于运行中的状态,可以通过flink的页面查看

 

 

五、在kafka的topic中加入数据

 如何创建topic,已经打开生产者和消费者可以参考:https://www.cnblogs.com/braveym/p/13190897.html

我这里已经创建好topic了,并在生产者端传入json串

 

生产者端传入数据

 

 

 

消费者端接收到数据

 

 

 

查看mysql的表,数据是否同步过来了

 

 

 可以看到刚刚我们通过kafka传入的两条数据过来了

 

注意要点:

 在给topic传入json字符串的时候,一定要写成一行,不要留有空格,不然会出问题的

 

正确的姿势:

 

 

 

错误的姿势:

 

标签:SET,Dinky,STRING,kafka2mysql,checkpointing,kafka,topic,使用,execution
From: https://www.cnblogs.com/braveym/p/16803567.html

相关文章

  • 神器软件:虚拟机软件安装与使用
    问什么是虚拟机?答虚拟机是一个软件。在自己电脑上通过虚拟机软件,可以模拟出新的电脑,并为其安装操作系统。问虚拟机有什么用?答使用电脑时,对一些软件不放心,担心其损坏电脑。或......
  • MAC m1使用homebrew安装redis报错
    报错信息tar:Erroropeningarchive:Failedtoopen'/Users/peas/Library/Caches/Homebrew/downloads/df016fccee96887f4f24a989ec9b08c04eef867bfb15f67a6e5eee58f6ce......
  • 使用istioctl 快速部署Istio
    环境介绍k8s集群:v1.25.2istio版本:1.15.2下载Istio方法一#curl-Lhttps://istio.io/downloadIstio|ISTIO_VERSION=1.15.2TARGET_ARCH=x86_64sh-%Total%......
  • Manifest使用示例3-安装并使用本地私有库
    使用示例 本示例包括两部分,第一,构建本地的私有库,第二,使用本地私有库一、构建本地私有库构建本地私有库的相关文件,示例为testport。文件结构: E:/ |--Manifest/ |----my......
  • Manifest使用示例6 - 安装并使用git私有仓库
    有的开发者会借助github创建自己的私有库,那么如何利用vcpkg使用git上的私有库呢? 请参考以下示例。 使用示例1.准备一个私有仓库Cheney-W/test,且为这个私有库生成一......
  • 考研数学 | 关于考研数学真题使用方法和模考的个人建议
    关于22年真题建议留到最后,供自己模拟使用!配备答题卡,体验考场的感觉,也可是20、21、22这三年,这个自己斟酌一下就行!09至21年真题如何做?个人的建议是:第一遍:先按套卷做,模......
  • Manifest使用示例2 - 依赖多个vcpkg 的历史版本库
    以下示例将提供在自定义工程中使用vcpkg中fmt、sqlite3、zlib的固定版本。Manifest模式-CMake工程1.示例根目录:E:/test_manifest,文件目录结构如下:E:/ |--test......
  • Manifest使用示例4 - Binarycaching使用缓存文件
    在多人开发环境中,我们仅希望一个人管理项目需要的所有第三方库,并使用服务器部署和分发vcpkg中已编译的库,此时可以使用vcpkg的Binarysource特性。VCPKG默认开启Binaryca......
  • AI智能视频融合平台EasyCVR如何使用SQL语句批量开启通道音频?
    EasyCVR视频融合云平台可支持多协议、多类型的设备接入,包括国标GB28181、RTSP/Onvif、RTMP协议,以及海康/大华SDK、海康Ehome等,对外可分发RTSP、RTMP、FLV、HLS、WebRTC等格......
  • 19、python模块 模块的导入和使用
    目录一、模块1、简介2、模块的表现形式二、模块的分类1、自定义模块2、内置模块3、第三方模块三、导入模块的句式学前须知:1、import句式2、from...import...句式3、补充说......