首页 > 数据库 >通过Maxwell同步mysql数据至kafka

通过Maxwell同步mysql数据至kafka

时间:2023-06-23 23:00:52浏览次数:44  
标签:bin -- Maxwell maxwell 9092 kafka mysql server

实验环境

本地虚拟机
maraidb 10.8.8
kafka 2.12-3.3.1
maxwell由容器部署

1 mariadb

1.1 配置log_bin

配置文件中加入如下内容

server-id = 111
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 1

重启服务

systemctl restart mariadb

查询命令

SHOW VARIABLES LIKE 'log_bin%';

1.2 创建用户

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 
flush privileges;

2 kafka

参考前期博文《单节点kafka部署笔记》

2.1 修改配置

修改kafka目录下的config/kraft/server.properties

listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://172.17.0.1:9092

2.2 启动kafka

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties &

2.3 创建topic

bin/kafka-topics.sh --create --topic maxwell-mysql --bootstrap-server localhost:9092

3 maxwell

3.1 拉取镜像

docker pull zendesk/maxwell

3.2 测试

创建容器

docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='172.17.0.1' --producer=stdout

写入数据

create database company;
CREATE TABLE products (id int(10), name varchar(255), price int(20));
insert into products values (1, "car001", 10000);

即可看到数据库操作

{"database":"company","table":"products","type":"insert","ts":1687524147,"xid":1640,"commit":true,"data":{"id":1,"name":"car001","price":10000}}

中断后容器会自动删除

4 正式使用

4.1 启动容器

docker run -d --name maxwell zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='172.17.0.1' --producer=kafka --kafka.bootstrap.servers='172.17.0.1:9092' --kafka_topic=maxwell --log_level=debug

4.2 kafka读取

创建一个consumer

bin/kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server localhost:9092

即可在终端看到变化数据

标签:bin,--,Maxwell,maxwell,9092,kafka,mysql,server
From: https://www.cnblogs.com/virtualzzf/p/17500438.html

相关文章

  • debezium同步mysql数据至kafka(未完待续)
    实验环境全部部署于本地虚拟机1mysql参考官方文档和根据官方示例镜像(debezium/example-mysql,mysql版本为8.0.32)1.1创建用户官方镜像里一共有三个账号debezium:connect用户mysqluser:普通用户replicator:用于主从?设置命令createuser'debezium'@'%'identifiedby"db......
  • mysql基础
    一存储引擎1mysql存储引擎的种类:MYISAM InnoDB(默认)2.MYISAM和InnoDB的区别在于InnoDB支持事物处理和外键约束3.MYISAM和InnoDB的应用场景的区别:MYISAM不需要事物,空间小,已查询访问为主;InnoDB多删除,更新操作,安全性高,事物处理即并发控制查询存储文件showvariableslike'&sto......
  • mysql索引优点缺点及命令
    索引是什么:索引是对数据库表中一列或者多列的值进行排序的一种结构,使用索引可提高数据库中特定数据的查询速度。(索引本质上是数据库结构,拥有排序跟查找两种功能,可以理解为排好顺序,快速查询数据库)。索引优点:加快查询速度。创建唯一索引保证了数据的唯一性。3.实现数据的完整性,加速表......
  • springboot整合mysql和clickhouse多数据源
    1、添加依赖<!--MyBatis-PlusStarter--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.2.0</version></dependency>......
  • mysql的IN查询优化
    mysql的IN里面的数量太大,比如大于1千时,查询的性能就会差很多。有以下的解决方法。解决方法一:拆分IN的数量IN数量超过1千,就拆成多条sql,每条sql的IN数量不超过1千。用OR或者UNION进行SQL改写。也可以使用Java写代码,把IN数量进行拆分,每条sql的IN数量不超过1千。多次执行。......
  • Mysql存储引擎
    原文链接:https://blog.csdn.net/lzb348110175/article/details/106555504本文目录:1.MySQL体系结构2.存储引擎介绍3.MySQL存储引擎特性4.MySQL有哪些存储引擎5.了解MySQL数据存储方式6.MySQL存储引擎介绍6.1CSV存储引擎6.1.1CSV介绍6.1.2使用CSV存储引擎......
  • mysql的数据类型以及mysql中的int11是什么意思
    今天抽时间来讲一下mysql里的知识点,之前有不少人问过我,mysql中的int(11),这个11到底是啥意思?是11位的意思吗?你是否也想过这个问题,是否也有这个疑问?ok,今天就展开来讲一下,用通俗易懂的大白话来给你彻底搞明白一、跟你扯点二进制的小东西要讲清楚这个问题,我先来给大家科普一点计算......
  • MySQL事务四大隔离级别分析
    什么是事务?事务,由一个有限的数据库操作序列构成,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务的四大特性原子性:事务作为一个整体被执行,包含在其中的对数据库的操作要么全部都执行,要么都不执行。一致性:指在事务开始之前和事务结束以后,数据不会被破坏,......
  • Mysql 事务
    1.事务基本特性ACID1.1原子性:     指的是一个事务中的操作要么全部成功,要么全部失败。1.2一致性     指的是数据库总是从一个一致性的状态转换到另外一个一致性的状态。比如A转账给B100块钱,假设中间sql执行过程中系统崩溃A也不会损失100块,因为事务没有提交,修......
  • mysql
    一登录mysql 1.windowsCMD登录输入mysql -uroot-p1234562.mysql命令界面登录:输入密码1234563. 命令行登录:在navicat中连接mysql输入密码123456二 初识mysql数据库简介1.为何需要数据库可以持久化2.数据库能够做什么存储大量数据,方便检索和访问(数据库中的数据量大)......