首页 > 数据库 >FLink1.17-Kafka实时同步到MySQL实践

FLink1.17-Kafka实时同步到MySQL实践

时间:2024-08-20 18:04:07浏览次数:13  
标签:table2 9092 Kafka MySQL test FLink1.17 kafka id

1.组件版本

组件

版本

Kafka

3.7.0

Flink

1.17.0

MySQL

8.0.32

 

2.Kafka生产数据

./kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic   kafka_test_table2

>{"id":123,"test_age":33}

>{"id":125,"test_age":28}

>{"id":126,"test_age":18}

 

3.Kafka消费数据

./kafka-console-consumer.sh  --bootstrap-server  hadoop01:9092,hadoop02:9092:hadoop03:9092  --topic kafka_test_table2 --from-beginning

 

4.MySQL创建表

 

CREATE TABLE `test_table2` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`test_age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8;

 

 

5.Flink 启动任务

cd /data/flink-1.17.0/bin/

./start-cluster.sh

 

./sql-client.sh

 

set sql-client.execution.result-mode=tableau ;

create Table kafka_test_table2 (

id    int,

test_age   int

)

with (

'connector' = 'kafka',

'topic' = 'kafka_test_table2',

'scan.startup.mode' = 'earliest-offset',

'properties.bootstrap.servers' = 'hadoop01:9092,hadoop02:9092,hadoop03:9092',

'properties.group.id' = 'group01',

'format' = 'json'

);

 

Create Table rds_test_table2 (

id    int,

test_age   int,

PRIMARY KEY (id)  NOT ENFORCED

)

with (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://192.168.132.22:3306/test?serverTimezone=Asia/Shanghai',

'driver' = 'com.mysql.cj.jdbc.Driver',

'username' = 'root',

'password' = 'Root@1234',

'table-name'= 'test_table2'

);

 

insert into  rds_test_table2  select * from  kafka_test_table2 ;

 

6.Flink WebUI

登录Flink WebUI页面,查看任务是否启动成功

 

点击查看任务是否报错

 

如果出现The server time zone value ‘�й���׼ʱ��’ is unrecognized or represents more than one time zone在MySQL的url连接字符串中添加?serverTimezone=Asia/Shanghai即可解决。

 

7.MySQL验证

登录MySQL数据库,查看数据是否同步成功。

 

标签:table2,9092,Kafka,MySQL,test,FLink1.17,kafka,id
From: https://www.cnblogs.com/yeyuzhuanjia/p/18370009

相关文章

  • MySQL-MGR实战指南:打造企业级高可用数据库集群
    文章目录前言MGR的介绍事务处理流程:实验测试环境:结束语前言在数字化时代,企业的数据安全和业务连续性至关重要。想象一下,当关键业务数据存储在数据库中,而数据库突然出现故障,或者面临硬件故障、网络中断、自然灾害等不可预知的灾难性事件时,企业如何确保数据的完整性和......
  • Linux(CentOS7)安装MySQL8全过程
    下载官方地址:https://dev.mysql.com/downloads/mysql/选择版本前需先看一下服务器的glibc版本ldd--version  上传将下载好的tar包上传到服务器上,这里演示上传到了/usr/local/文件夹下 解压tar -Jxvfmysql-8.0.36-linux-glibc2.17-x86_64.tar.xz ......
  • mysql错误-The server quit without updating PID file
    说明:尽量不要用root用户安装和启动mysql问题示例原因:一般是root用户执行导致,如果MySQL是root以外用户安装的,则用安装的用户执行不会出差固执:这里就是要用root执行。[root@hadoop01mysql]#servicemysqlstartStartingMySQL.Loggingto'/opt/mysql/data/hadoop01.err'.......
  • 【原创】java+swing+mysql网吧管理系统设计与实现
    个人主页:程序员杨工个人简介:从事软件开发多年,前后端均有涉猎,具有丰富的开发经验博客内容:全栈开发,分享Java、Python、Php、小程序、前后端、数据库经验和实战文末有本人名片,希望和大家一起共同努力,一起进步,顶峰相见。开发背景:随着互联网技术的飞速发展和普及,网络已成为人......
  • MySQL主从同步如何保证数据一致性?
    MySQL主从同步是MySQL集群方案中的一种,也是实现难度最低的一种。然而,现在的面试都不问MySQL主从同步原理了,而是开始问主从同步怎么保证数据一致性问题了。所以,今天就给大家安排上了。1.什么是数据一致性?数据一致性是指在一个系统中,数据在不同的部分、不同的时间点,以及不......
  • mysql - 根据某经纬度 从区域列表内筛选符合条件的区域. 地图经纬度 坐标筛选
    作者原创.转载请注明来源我有一个区域列表.每个区域都有一堆经纬度坐标集合它们组成一个不规则图形.然后我有个经纬度坐标想筛选出这个坐标属于那个区域.mysql适合做这样的筛选吗?//创建区域坐标表CREATETABLEregions( idINTAUTO_INCREMENTPRIMARYKEY,......
  • 如何删除数据库下的所有表(mysql)
    要在MySQL中删除数据库下的所有表,你有两个主要选项:一个是删除整个数据库然后重新创建它,另一个是查询所有表的名称并逐一删除它们。下面是这两种方法的步骤:方法1:删除并重新创建数据库这种方法是最简单和最快的,但请注意,它会删除整个数据库,包括其中的所有表、视图、存储过程等。......
  • mysql.user表的数据准确性问题
    mysql.user这个系统表中有些字段的数据是不准确的(或者说是不一定准确,这样表达更严谨一点)。这是一个让人头疼的问题,下面简单述说一下问题,主要是mysql.user表中的password_lifetime,password_reuse_history,password_reuse_time这几个字段的数据都不一定准确。下面简单演示一下,当......
  • 在Python中使用MySQL:保姆级指南
    Python是一种广泛使用的高级编程语言,因其简洁易读以及强大的库支持而受到开发者的青睐。当需要在Python程序中处理数据库时,MySQL是一个流行的选择。MySQL是一个开源的关系数据库管理系统,广泛用于Web应用程序的开发。目录第一步:安装MySQL数据库第二步:安装MySQLConnector第......
  • Centos 7.9系统 源码安装MySQL5.7版本
    mysql5.7安装详细一、MySQL5.7源码安装部署1.1安装环境准备相关依赖包的作用:cmake:由于从MySQL5.5版本开始弃用了常规的configure编译方法,所以需要CMake编译器,用于设置mysql的编译参数,如:安装目录、数据存放目录、字符编码、排序规则等。boost库:从MySQL5.7.5开......