Requirement: feed JSON strings into a Kafka topic and have the data synced into MySQL. This also works as an example of synchronizing semi-structured data.
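1. Add the dependency packages
Put the dependency jars into Dinky's plugins directory and Flink's lib directory, then restart Dinky and Flink.
For where to download the jars, see: https://www.bookstack.cn/read/ApacheFlink-1.13-zh/6838f5ad108cfcc6.md
I also recommend adding the kafka-clients*.jar package at the same time.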
2. Create the job
3. Write the FlinkSQL code
-- Checkpointing: every 6 s (6000 ms), EXACTLY_ONCE, externalized checkpoints kept on cancel
SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout = 600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;

-- Source: JSON messages read from Kafka topic gong1
CREATE TABLE kafka_input (
  `id` STRING,
  `home` STRING,
  `work` STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '172.16.119.28:9092',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'topic' = 'gong1'
);

-- Sink: MySQL table test.kafka_out, written through the JDBC connector
CREATE TABLE kafka_out (
  `id` STRING,
  `home` STRING,
  `work` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.16.119.50:3306/test?createDatabaseIfNotExist=true&useSSL=false',
  'username' = 'root',
  'password' = 'Tj@20220710',
  'table-name' = 'kafka_out'
);

-- Continuously copy every Kafka record into MySQL
INSERT INTO kafka_out
SELECT id, home, work FROM kafka_input;
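The Flink JDBC connector writes into an existing table; it does not create `kafka_out` itself (the `createDatabaseIfNotExist=true` URL parameter only creates the `test` database). A minimal sketch of the MySQL-side table, assuming `VARCHAR(255)` columns, which are my own choice and not part of the original setup:

```sql
-- Run in MySQL before starting the Flink job.
-- VARCHAR(255) is an assumed length; size the columns for your own data.
CREATE DATABASE IF NOT EXISTS test;

CREATE TABLE IF NOT EXISTS test.kafka_out (
  `id`   VARCHAR(255),
  `home` VARCHAR(255),
  `work` VARCHAR(255)
);
```

Flink's `STRING` maps to MySQL `VARCHAR`/`TEXT`, so either column type works. Without a primary key the JDBC sink only appends; if repeated ids should update an existing row instead, declare a primary key on this MySQL table and `PRIMARY KEY (id) NOT ENFORCED` on the Flink `kafka_out` table, which switches the sink to upsert mode.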
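4. Run the job
Save the code and run it through Dinky's built-in syntax check.
The syntax check passes.
Run the job.
The job now stays in the RUNNING state, since it keeps consuming the Kafka topic as a streaming job; you can check it in the Flink web UI.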
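5. Write data into the Kafka topic
For how to create a topic and start a producer and a consumer, see: https://www.cnblogs.com/braveym/p/13190897.html
I have already created the topic here, and I send JSON strings from the producer side.
Sending data from the producer:
The consumer receives the data: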
Check the MySQL table to see whether the data has been synced:
As you can see, the two records we just sent through Kafka have arrived.
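To run the same check from the MySQL client instead of a GUI, a plain query against the sink table is enough (the `test` database and `kafka_out` table come from the JDBC options in step 3):

```sql
-- Run in the MySQL client; each Kafka message should appear as one row.
SELECT `id`, `home`, `work`
FROM test.kafka_out;
```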
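Points to note:
When sending a JSON string to the topic, always write it as one compact line with no line breaks, otherwise things will go wrong: the console producer sends each line you type as a separate message, so a JSON object spread over several lines arrives as fragments that the json format cannot parse.
The right way:
The wrong way: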
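By default the Flink json format fails the job when a message cannot be parsed. If stray multi-line input cannot be ruled out, a sketch of a variant of the `kafka_input` definition from step 3, used in its place, adds the real json format option `'json.ignore-parse-errors'`; whether silently dropping bad records is acceptable is an assumption you need to check against your own data:

```sql
-- Variant of the step-3 source table: identical except for the last option.
-- 'json.ignore-parse-errors' = 'true' skips rows that fail to parse
-- (and sets unparseable fields to null) instead of failing the job.
CREATE TABLE kafka_input (
  `id` STRING,
  `home` STRING,
  `work` STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '172.16.119.28:9092',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'topic' = 'gong1',
  'json.ignore-parse-errors' = 'true'
);
```

The trade-off: a malformed message no longer stops the job, but it also no longer shows up as an error, so the one-line rule above still applies.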
From: https://www.cnblogs.com/braveym/p/16803567.html