首页 > 数据库 > dinky-binlog-kafka-flinksql流程处理

dinky-binlog-kafka-flinksql流程处理

时间:2023-02-14 19:04:41浏览次数:52  
标签:1.16 binlog dinky string -- flinksql kafka mysql 172.18

准确阶段:

mysql:

开启mysql日志

kafka:

需检查服务是否正常

maxwell:

这里采用19版本,过新的版本对java版本要求高,我这里是java8

maxwell-1.19.0

maxwell操作:

cd /root/tar_temp/maxwell-1.19.0
bin/maxwell --user='root' --password='1017~Fulin' --port=3306 --host='172.18.1.5' --producer=kafka --kafka.bootstrap.servers=172.18.1.16:9092 --kafka_topic=test01 --filter="exclude:*.*, include:shufu_tag.t_call_record"

运行成功后,在相应的mysql中执行insert操作

然后运行kafka-consumer

cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka
./bin/kafka-console-consumer.sh --bootstrap-server 172.18.1.16:9092 --topic test01 --from-beginning

出现

 dinky-binlog-kafka-flinksql流程处理_mysql

则成功。

flink

这里引入了dinky平台,需要在plugins中添加kafka -connect、kafka-client包,然后重启执行代码

CREATE TABLE kafka_input (
`database` string,
`table` string,
`type` string,
ts_get string,
xid_get string,
commit_get string,
-- title ROW<id INT, name STRING, level INT>,
`data` ROW<id_get string,plan_detail_code string,task_code string,callee_phone string,record_id string,record_status string,tag string,staff_tag string,duration string,start_time string,answer_time string,end_time string,intervene_time string,call_media_url string,remark string,created_time string,call_round string,finish_call_times string,custom_guide string,update_time string,visit_guide string,call_stage integer>
) WITH (
'connector' = 'kafka',
'topic' = 'test01',
'properties.bootstrap.servers' = '172.18.1.16:9092',
'properties.group.id' = 'for_source_test',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'false'
);

select `database`,`table`,`type`,`data`.plan_detail_code from kafka_input;


运行成功!

标签:1.16,binlog,dinky,string,--,flinksql,kafka,mysql,172.18
From: https://blog.51cto.com/u_14114312/6057374

相关文章

  • MySQL-利用binlog恢复数据
    MySQL-利用binlog恢复数据  这一篇文章里,我们来记录使用mysql-binlog来恢复数据的整个过程   一、前期准备  1、建库建表  创建数据库blog以及下面的a......
  • MySQL--binlog2sql 安装及使用
    安装:依赖python3,请提前安装开源地址:https://github.com/danfengcao/binlog2sql$curlhttps://bootstrap.pypa.io/get-pip.py-oget-pip.py #下载安装脚本$sudop......
  • flinksql的初始化
    Mavn的依赖<properties><java.version>1.8</java.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.targ......
  • 认真学习MySQL中的二进制日志(binlog)与中继日志(Relay log)
    binlog即binarylog,二进制日志文件,也叫作变更日志(updatelog)。它记录了数据库所有执行的DDL和DML等数据库更新事件的语句,但是不包含没有修改任何数据的语句(如数据查询语句se......
  • FlinkSQL 时间类型转化使用小结
    https://blog.noname.cc/2023/01/16/20230116/FlinkSQL的时间类型在FlinkSQL中,存在两种时间类型,分别是TIMESTAMP和TIMESTAMP_LTZ.以下示例所用的字段:TIMESTAM......
  • binlog学习
    binlog中记载了数据库发生的变化,比方说新建了一个数据库或者表、表结构发生改变、表中的数据发生了变化时都会记录相应的binlog日志。binlog主要用在下边两个方面:用途一......
  • k8s启动flinksql
    ./bin/kubernetes-session.sh-Dkubernetes.cluster-id=my-second-flink-cluster-Dkubernetes.jobmanager.service-account=flink-service-account-Dkubernetes.containe......
  • mysql8.0设置binlog保存时间,并清除过期日志释放空间
    在线修改mysql>showvariableslike'%expire%';+--------------------------------+---------+|Variable_name|Value|+-------------------------......
  • 数据库日志——binlog、redo log、undo log扫盲
    日志是数据库中比较重要的组成部分,很多核心的功能必须依靠日志才能完成。该篇文章简要介绍了binlog、redolog与undolog,能够在一定程度上拓宽对mysql日志的整体认识。......
  • mysql基于binlog的恢复
    [root@stag-8-460104]#mysql--socket=/tmp/mysql_sandbox20034.sock-umsandbox-p'msandbox'mysql:[Warning]Usingapasswordonthecommandlineinterfacecan......