首页 > 其他分享 >Kafka核心API -- Connect

Kafka核心API -- Connect

时间:2023-07-23 21:55:45浏览次数:36  
标签:-- 192.168 Kafka topic API connect mysql Connect

Connect基本概念

  • Kafka Connect是Kafka流式计算的一部分
  • Kafka Connect主要用来与其他中间件建立流式通道
  • Kafka Connect支持流式和批量处理集成

 环境准备

创建两个表

create table users_bak(
    `uuid` int primary key auto_increment,
    `name` VARCHAR(20),
    `age` INT    
)
create table users_bak(
    `uuid` int primary key auto_increment,
    `name` VARCHAR(20),
    `age` INT    
)

导入JDBC

 安装unzip,并解压con

yum install -y unzip

将下面两个jar文件拷贝到解压后的包内

cp /opt/plugins/mysql-connector-java-*.jar ./

修改kafka配置文件(分别为集群版和单机版的配置文件,更改集群版)

 更改以下内容

bootstrap.servers=192.168.75.136:9092
group.id=connect-cluster # 集群之间统一
# 默认JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 打开rest.port
rest.port=8083
# 引入外部依赖
plugin.path=/opt/plugins

Kafka Connect Source 和 MySQL集成

操作命令

## connect启动命令
bin/connect-distributed.sh -daemon config/connect-distributed.properties
bin/connect-distributed.sh config/connect-distributed.properties

验证是否启动成功

# 输入网址,出现下图
http://192.168.75.136:8083/connector-plugins
# 查看connector
http://192.168.220.128:8083/connectors

 Git Bash执行下列操作,添加一个topic

curl -X POST -H 'Content-Type: application/json' -i 'http://192.168.75.136:8083/connectors' \
--data \
'{"name":"upload-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://192.168.75.1:3306/kafka_study?user=root&password=root",
"table.whitelist":"users",
"incrementing.column.name": "uuid",
"mode":"incrementing",
"topic.prefix": "mysql-test-"}}'

# name:起名(唯一标识)
# connection.url:填数据库信息
# table.whitelist:白名单(哪些表被加载)
# incrementing.column.name:根据哪个字段判断是否是增量更新
# topic.prefix:topic前缀

 连接正常状态是201。无法连接的话需要配置一个拥有远程连接数据库权限的账号

create user 'root'@'%' identified with mysql_native_password by 'root';
ALTER USER 'root'@'%' IDENTIFIED BY 'root' PASSWORD EXPIRE NEVER;
grant all privileges on *.* to 'root'@'%' with grant option;
flush privileges;

然后刷新http://192.168.220.128:8083/connectors页面,得到以下结果

 数据库插入数据

 查看topic(从头开始读)

bin/kafka-console-consumer.sh --bootstrap-server 192.168.75.136:9092 --topic mysql-test-users --from-beginning

 

成功写入

 

Kafka Connect Sink 和 MySQL集成

消息如何从Kafka传递到数据库

name一定要与上面不同

curl -X POST -H 'Content-Type: application/json' -i 'http://192.168.75.136:8083/connectors' \
--data \
'{"name":"download-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://192.168.75.1:3306/kafka_study?user=root&password=root",
"topics":"mysql-test-users",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"uuid",
"table.name.format": "users_bak"}}'

# topics表示从哪个topic读数据
# auto.create:是否自动创建表

成功

可以成功读入。第二行读入的是null,为什么会数据丢失不是很懂。

 

删除connect指令

curl -X DELETE http://192.168.75.136:8083/connectors/download-mysql

 

Kafka Connect关键词

Workers

  • 执行Connector和Task的运行进程。Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程。
  • 两种模式:Standalone和Distributed
    • Standalone Workers

                    Standalone模式是最简单的模式,用单一进程负责执行所有connector和task。适用于特定场景,如收集主机日志

    • Distributed Workers

                    分布式模式为Kafka Connect提供了可扩展性和自动容错能力,使用更广。在分布式模式下,相同group.id的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,其余的worker将检测到这一点,集群会自                        动协调分配所有的Connector 和 Task(这个过程称为Rebalance)

Connector 和Task

Connectors

通过管理task来协调数据流的高级抽象。可分为两种connectors:

  • Source connector

          源连接器可以从多种渠道(如:数据库、静态文件、HDFS客户端等)拉取数据到kafka topic中

  • Sink connector

          宿连接器将topic中的数据push到多种目的端消费。将Kafka主题中的数据传递到Elasticsearch等二级索引中,或Hadoop等批处理系统中,用于离线分析。

Tasks

如何将数据复制到Kafka或从Kafka复制数据的实现。实际进行数据传输的单元,和连接器一样同样分为 Source和Sink
Task是Connect数据模型中的主要处理数据的角色。每个connector实例协调一组实际复制数据的task。通过允许connector将单个作业分解为多个task,Kafka Connect提供了内置的对并行性和可伸缩数据复制的支持,只需很少的配置。这些任务没有存储任何状态。任务状态存储在Kafka中的特殊主题config.storage.topic和status.storage.topic中。因此,可以在任何时候启动、停止或重新启动任务,以提供弹性的、可伸缩的数据管道。

 Task Rebalance

 

Converters

用于在Connect和外部系统发送或接收数据之间转换数据的代码。

Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换。
在向Kafka写入或从Kafka读取数据时,Converter是使Kafka Connect支持特定数据格式所必需的。

task使用转换器将数据格式从字节更改为连接内部数据格式,反之亦然。

 

 

elk的logstash做迁移更好用

 

标签:--,192.168,Kafka,topic,API,connect,mysql,Connect
From: https://www.cnblogs.com/szhNJUPT/p/17574801.html

相关文章

  • spring6 ioc aop 从入门到精通零基础进阶学习路线?
    当你已经掌握了Spring框架的基础知识以及IoC和AOP的核心概念后,可以进一步深化你的学习。以下是更详细的学习路线:1.IoC容器进阶:-学习如何自定义Bean的初始化和销毁方法,并了解Bean生命周期的各个阶段。-深入了解Spring的作用域(Scope)概念,如单例模式、原型模式、会话模式和请求模......
  • ES 初学 1
    GET_search{"query":{"match_all":{}}}创建索引PUTpersonPUTperson2删除索引DELETEperson2查询索引GETperson2GETperson添加映射PUT/person/_mapping{"properties":{"name":{"type":"text"},&quo......
  • 7.23 做题记录
    P3897[湖南集训]CrazyRabbit考虑一个点到圆的两个切点构成的圆弧,如果两个点连线与圆不交,那么两端圆弧是相交但不包含的,且把一段圆弧取反不影响答案。断环成链,原问题等价于选最多的区间\((l,r)\)满足对于所有\(i<j\),\(l_j<r_i\)枚举起始区间做LCS即可,时间复杂度\(O(......
  • 牛客多校第二场-H
    H-0and1inBIT op1-->-x-1op2-->x+1由线性代数知识推每次操作要乘的矩阵,线段树维护一个矩阵信息 [op,d,1]就是代表一个f(x)=kx+b的方程,根据线性代数知识用矩阵表示该方程->f(x)=op*x+d,最后一个1只是凑矩阵用的,f代表该矩阵,因为刚开始就是x,所以op=1,d=0 #inclu......
  • mesg
    mesg设置当前终端的写权限补充说明mesg命令用于设置当前终端的写权限,即是否让其他用户向本终端发信息。将mesg设置y时,其他用户可利用write命令将信息直接显示在您的屏幕上。语法mesg(参数)参数y/n:y表示运行向当前终端写信息,n表示禁止向当前终端写信息。实例[root@local......
  • mii-tool
    mii-tool配置网络设备协商方式的工具补充说明mii-tool命令是用于查看、管理介质的网络接口的状态,有时网卡需要配置协商方式,比如10/100/1000M的网卡半双工、全双工、自动协商的配置。但大多数的网络设备是不用我们来修改协商,因为大多数网络设置接入的时候,都采用自动协商来解决......
  • mkbootdisk
    mkbootdisk可建立目前系统的启动盘补充说明mkbootdisk命令用来为当前运行的系统创建能够单独使用的系统引导软盘,以便在系统出现故障时能够启动操作进行适当的修复工作。语法mkbootdisk(选项)(参数)选项--device<设备>:指定设备;--mkinitrdargs<参数>:设置mkinitrd的参数;--......
  • mkcert
    mkcert用来生成自签证书的工具示例mkcert是GO编写的,一个简单的零配置的用来生成自签证书的工具。下面给一个简单的示例,在本地生成自签证书,并使用让nc使用生成的证书。~··············································......
  • mkdir
    mkdir用来创建目录补充说明mkdir命令用来创建目录。该命令创建由dirname命名的目录。如果在目录名的前面没有加任何路径名,则在当前目录下创建由dirname指定的目录;如果给出了一个已经存在的路径,将会在该目录下创建一个指定的目录。在创建目录时,应保证新建的目录与它所在目录下......
  • mke2fs
    mke2fs创建磁盘分区上的“etc2/etc3”文件系统补充说明mke2fs命令被用于创建磁盘分区上的“etc2/etc3”文件系统。语法mke2fs(选项)(参数)选项-b<区块大小>:指定区块大小,单位为字节;-c;检查是否有损坏的区块;-f<不连续区段大小>:指定不连续区段的大小,单位为字节;-F:不管指定......