Preparation phase:
mysql:
Enable the MySQL binary log (Maxwell requires row-based binlog).
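A typical my.cnf fragment for this (the server_id value is an example; adjust it to your environment, and note that changing these settings requires a MySQL restart):

```ini
[mysqld]
# Enable the binary log; Maxwell requires row-based replication
server_id     = 1
log-bin       = master
binlog_format = row
```

Maxwell also needs a MySQL user with replication privileges; this setup simply uses root.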
kafka:
Check that the Kafka service is running normally.
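One quick sanity check is confirming the broker port is reachable. A minimal sketch (the address 172.18.1.16:9092 is the broker used in this setup):

```python
import socket

def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # Broker address from this setup
    print("kafka reachable:", port_open("172.18.1.16", 9092))
```

This only proves the port answers; a full check would also list topics with kafka-topics.sh against the same bootstrap server.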
maxwell:
Version 1.19 is used here: newer releases require a newer Java version, and this environment runs Java 8.
maxwell-1.19.0
Running Maxwell:
cd /root/tar_temp/maxwell-1.19.0
bin/maxwell --user='root' --password='1017~Fulin' --port=3306 --host='172.18.1.5' \
  --producer=kafka --kafka.bootstrap.servers=172.18.1.16:9092 --kafka_topic=test01 \
  --filter="exclude:*.*, include:shufu_tag.t_call_record"
Once Maxwell is running, execute an INSERT in the corresponding MySQL database.
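For example (the columns are a subset of t_call_record; the sample values are purely illustrative):

```sql
INSERT INTO shufu_tag.t_call_record (plan_detail_code, task_code, callee_phone, record_status)
VALUES ('PD001', 'TK001', '13800000000', '1');
```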
Then start a Kafka console consumer:
cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/kafka
./bin/kafka-console-consumer.sh --bootstrap-server 172.18.1.16:9092 --topic test01 --from-beginning
If the consumer prints JSON change records for the INSERT, the pipeline is working.
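Each line the consumer prints is one Maxwell change event. The top-level keys below follow Maxwell's documented JSON output format; the concrete values are illustrative:

```python
import json

# An illustrative Maxwell insert event (values are made up; the
# top-level keys follow Maxwell's JSON output format)
event_line = '''{
  "database": "shufu_tag",
  "table": "t_call_record",
  "type": "insert",
  "ts": 1675400000,
  "xid": 12345,
  "commit": true,
  "data": {"plan_detail_code": "PD001", "callee_phone": "13800000000"}
}'''

event = json.loads(event_line)
print(event["database"], event["table"], event["type"])  # → shufu_tag t_call_record insert
print(event["data"]["plan_detail_code"])                 # → PD001
```

The row's column values arrive nested under "data", which is why the Flink table below declares `data` as a ROW type.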
flink:
The dinky platform is used here. Add the Kafka connector (kafka-connect) and kafka-client JARs to its plugins directory, then restart and run the code.
CREATE TABLE kafka_input (
  `database` STRING,
  `table` STRING,
  `type` STRING,
  -- NOTE: Maxwell emits these fields as `ts`, `xid`, `commit`; with the
  -- *_get names they do not match the JSON keys and will be NULL
  ts_get STRING,
  xid_get STRING,
  commit_get STRING,
  -- title ROW<id INT, name STRING, level INT>,
  `data` ROW<
    id_get STRING, plan_detail_code STRING, task_code STRING, callee_phone STRING,
    record_id STRING, record_status STRING, tag STRING, staff_tag STRING,
    duration STRING, start_time STRING, answer_time STRING, end_time STRING,
    intervene_time STRING, call_media_url STRING, remark STRING,
    created_time STRING, call_round STRING, finish_call_times STRING,
    custom_guide STRING, update_time STRING, visit_guide STRING, call_stage INT>
) WITH (
'connector' = 'kafka',
'topic' = 'test01',
'properties.bootstrap.servers' = '172.18.1.16:9092',
'properties.group.id' = 'for_source_test',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'false'
);
select `database`,`table`,`type`,`data`.plan_detail_code from kafka_input;
Runs successfully!
From: https://blog.51cto.com/u_14114312/6057374