首页 > 其他分享 >Debezium+KafkaConnect+Confluent实现企业级实时数据复制平台

Debezium+KafkaConnect+Confluent实现企业级实时数据复制平台

时间:2023-08-19 12:37:47浏览次数:43  
标签:opt 10.101 http 8081 KafkaConnect Confluent ce kafka 企业级

【I】集群规划


5台节点

IP地址    

10.101.1.45  ZK、Kafka、Debezium Connector、JDK、Debezium UI、MySQL、Kafka-Eagle

10.101.1.46  ZK、Kafka、Debezium Connector、JDK

10.101.1.47  ZK、Kafka、Debezium Connector、JDK

10.101.1.48  ZK、Kafka、Debezium Connector、JDK

10.101.1.49  ZK、Kafka、Debezium Connector、JDK


从对应官网上下载所需资源包并上传到集群每个节点 /opt/src 目录下

apache-zookeeper-3.8.1-bin.tar.gz

debezium-connector-jdbc-2.3.0.Final-plugin.tar.gz

debezium-connector-postgres-2.3.0.Final-plugin.tar.gz

jdk-17_linux-x64_bin.tar.gz

kafka_2.13-3.5.0.tgz


【II】配置Java环境


# 登录45节点

ssh [email protected]

cd /opt/src


# 解压并移动/opt目录下

tar -xvf jdk-17_linux-x64_bin.tar.gz

mv jdk-17.0.4.1/ ../


# 设置环境变量

vi /etc/profile


# 在末尾增加 Java Environment

export JAVA_HOME=/opt/jdk-17.0.4.1

export PATH=$JAVA_HOME/bin:$PATH

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar


# 应用环境变量

source /etc/profile


# 查看Java版本, 返回 java version "17.0.4.1" 2022-08-18 LTS 即设置成功

java -version


# 其他4台节点重复相同的操作



【III】ZooKeeper集群配置


# 登录45节点

ssh [email protected]

cd /opt/src


# 解压

tar -xvf apache-zookeeper-3.8.1-bin.tar.gz

mv apache-zookeeper-3.8.1-bin ../zookeeper-3.8.1


# 创建存放数据和日志的目录

cd /opt/zookeeper-3.8.1

mkdir data logs


# 修改配置文件

cp conf/zoo_sample.cfg conf/zoo.cfg

vi conf/zoo.cfg


# 增加或修改如下参数


dataDir=/opt/zookeeper-3.8.1/data

dataLogDir=/opt/zookeeper-3.8.1/logs

admin.serverPort=8887

server.45=10.101.1.45:2888:3888

server.46=10.101.1.46:2888:3888

server.47=10.101.1.47:2888:3888

server.48=10.101.1.48:2888:3888

server.49=10.101.1.49:2888:3888


# 写入当前节点在集群中的唯一标识符(标识符不能每个节点上的值不能相同,可以用12345,个人喜欢用IP段最后一位来区分)

echo 45 > data/myid


# 以上操作在另外4个节点做同样的操作后,启动ZK集群

bin/zkServer.sh start

bin/zkServer.sh status   # 查看节点状态, 应该有1台返回leader,另外4台是follower


【IV】Kafka集群配置


# 登录45节点

ssh [email protected]

cd /opt/src


# 解压

tar -xvf kafka_2.13-3.5.0.tgz

mv kafka_2.13-3.5.0 ../


# 创建存放数据和日志的目录

cd /opt/kafka_2.13-3.5.0

mkdir data logs


# 编辑kafka server配置文件, 增加或修改如下参数, 除broker.id和listeners按各自ip来区分外,其余参数在每个节点上一样

vi config/server.properties


broker.id=45

delete.topic.enable=true

auto.create.topics.enable=false

listeners=PLAINTEXT://10.101.1.45:9092

log.dirs=/opt/kafka_2.13-3.5.0/data

num.partitions=10

default.replication.factor=3

min.insync.replicas = 2

num.recovery.threads.per.data.dir=3

offsets.topic.replication.factor=3

transaction.state.log.replication.factor=3

transaction.state.log.min.isr=3

log.retention.hours=72

zookeeper.connect=10.101.1.45:2181,10.101.1.46:2181,10.101.1.47:2181,10.101.1.48:2181,10.101.1.49:2181

unclean.leader.election.enable=false

auto.leader.rebalance.enable=false


# jvm调整

vi bin/kafka-server-start.sh

export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"


# 以上操作在另外4个节点做同样的操作后,启动Kafka集群

bin/kafka-server-start.sh -daemon config/server.properties > /dev/null &


# 检查日志有无错误信息

more logs/server.log


【V】Kafka Connect集群配置


# 创建插件目录

cd /opt/kafka_2.13-3.5.0

mkdir plugins


# 将从Debezium官网上下载的Postgres以及jdbc连接器包放入plugins目录

cd /opt/src

tar -xvf debezium-connector-postgres-2.3.0.Final-plugin.tar.gz

cp debezium-connector-postgres/*.jar ../kafka_2.13-3.5.0/plugins/

tar -xvf debezium-connector-jdbc-2.3.0.Final-plugin.tar.gz

cp debezium-connector-jdbc/*.jar ../kafka_2.13-3.5.0/plugins/


# 将jar包远程复制到其他4个节点对应目录下

cd /opt/kafka_2.13-3.5.0

scp plugins/*.jar [email protected]:/opt/kafka_2.13-3.5.0/plugins/

scp plugins/*.jar [email protected]:/opt/kafka_2.13-3.5.0/plugins/

scp plugins/*.jar [email protected]:/opt/kafka_2.13-3.5.0/plugins/

scp plugins/*.jar [email protected]:/opt/kafka_2.13-3.5.0/plugins/


# 修改connect配置文件

cd /opt/kafka_2.13-3.5.0

vi config/connect-distributed.properties


# 增加或修改如下参数


bootstrap.servers=10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092

group.id=bsd-connect-cluster

key.converter.schemas.enable=true

value.converter.schemas.enable=true


offset.storage.topic=connect-offsets

offset.storage.replication.factor=3

offset.storage.partitions=10


config.storage.topic=connect-configs

config.storage.replication.factor=3

config.storage.partitions=1


status.storage.topic=connect-status

status.storage.replication.factor=3

status.storage.partitions=10


listeners=HTTP://10.101.1.45:8083

plugin.path=/opt/kafka_2.13-3.5.0/plugins,


# 复制配置文件到其余4个节点对应目录下, 修改listeners参数的对应IP

scp config/connect-distributed.properties [email protected]:/opt/kafka_2.13-3.5.0/config/

scp config/connect-distributed.properties [email protected]:/opt/kafka_2.13-3.5.0/config/

scp config/connect-distributed.properties [email protected]:/opt/kafka_2.13-3.5.0/config/

scp config/connect-distributed.properties [email protected]:/opt/kafka_2.13-3.5.0/config/


# jvm调整, 每个节点都要调整

vi bin/connect-distributed.sh

export KAFKA_HEAP_OPTS="-Xms8G -Xmx16G -Xmn6G"


# 挨个节点启动Kafka Connect

cd /opt/kafka_2.13-3.5.0

bin/connect-distributed.sh -daemon config/connect-distributed.properties > /dev/null &

tail -f logs/connect.log


# 查看集群是否正常运行

curl -X GET http://10.101.1.45:8083

{"version":"3.5.0","commit":"c97b88d5db4de28d","kafka_cluster_id":"2qAwkH6ISlW8lIX5vC1qHQ"}



【VI】数据库的准备工作


# PostgreSQL,原理是通过创建复制槽来监听日志


1、检查是否有phoutput插件,没有的话要安装一下

2、设置wal_level参数为logical,需要重启数据库生效

3、创建具有复制权限的用户

  CREATE ROLE <replication_group>;

  GRANT REPLICATION_GROUP TO <original_owner>;

  GRANT REPLICATION_GROUP TO <replication_user>;

 

# Oracle,原理是通过Logminer来监听日志,下面实例为12c及以后版本,11g及以前把 CONTAINER=ALL删除就行


sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba

 CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'

   SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

 exit;


sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba

 CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'

   SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

 exit;


sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba


 CREATE USER c##dbzuser IDENTIFIED BY dbz

   DEFAULT TABLESPACE logminer_tbs

   QUOTA UNLIMITED ON logminer_tbs

   CONTAINER=ALL;


 GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;

 GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;

 GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;

 GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;

 GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;


 GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;

 GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;

 GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;


 GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;

 GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;


 GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL;


 GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL;

 GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL;


 exit;


【VII】Source连接器注册


# 创建Source连接器配置文件

mkdir /opt/kafka_2.13-3.5.0/json

cd /opt/kafka_2.13-3.5.0/json


vi source_channel.json  # 添加如下内容后保存, 双斜杠及后面内容为参数说明,实际使用时请去除


{

   "name": "source_channel",  // 连接器名称, 集群内需唯一

   "config": {

       "connector.class": "io.debezium.connector.postgresql.PostgresConnector",  // 连接器类型

       "plugin.name": "pgoutput",              // 插件名称, PG库独有配置

       "slot.name": "debezium_channel",            // 复制槽名称, PG库独有配置

       "database.hostname": "your db connect url",           // 数据库连接地址

       "database.port": "5432",              // 端口

       "database.user": "your account",            // 账号

       "database.password": "your password",           // 密码

       "database.dbname" : "channel",             // 数据库名称

       "schema.include.list": "rop_channel",           // 模式列表,多个用逗号分隔

       "table.include.list": "rop_channel.debezium_test,rop_channel.csc_store,rop_channel.csc_store_data,rop_channel.csc_store_dim,rop_channel.cls_cp_store_version,rop_channel.cls_cp_virtual_store",

       "snapshot.mode": "never",              // 快照模式(当前设置为不做数据全量初始化)

       "tombstones.on.delete": "true",             // 是否发送墓碑记录, 删除同步参数

       "database.history.kafka.bootstrap.servers": "10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092", // MQ集群地址

       "database.history.kafka.topic": "__debezium_ddl_channel",      // 表结构存放主题

       "key.converter": "io.confluent.connect.avro.AvroConverter",      // MQ消息序列化与反序列化格式, 采用avro时需指定schema registry集群地址,即下面的参数地址

       "key.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",

       "value.converter": "io.confluent.connect.avro.AvroConverter",

       "value.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",

       "topic.prefix": "channel",              // MQ主题前缀

       "time.precision.mode": "adaptive",            // 时间精度模式(自适应)

       "heartbeat.interval.ms": "60000",            // 心跳检测间隔毫秒(针对监控库数据变化量非常小时的一个参数设置,防止复制槽堆积)

       "signal.data.collection": "public.debezium_signal",        // 信令表(针对指定表发起全量快照)

       "topic.creation.enable": "true",            // 自动创建MQ主题

       "topic.creation.default.replication.factor": "3",        // MQ主题副本数

       "topic.creation.default.partitions": "20",          // MQ主题分区数

       "topic.creation.default.compression.type": "lz4"        // MQ消息压缩类型

   }

}


# Oracle源连接器注册配置参数


{

   "name": "source_ewm",

   "config": {

       "connector.class": "io.debezium.connector.oracle.OracleConnector",

       "tasks.max": "1",

       "database.hostname": "your db connect url",

       "database.port": "1521",

       "database.user": "your account",

       "database.password": "your password",

       "database.dbname": "DB1",

       "schema.include.list": "SAPSR3",

       "table.include.list": "SAPSR3.ZTSTOCK_PARTNER,SAPSR3.ZTSTOCK_COMMON,SAPSR3./SAPAPO/MATKEY,SAPSR3./SCWM/AQUA,SAPSR3./SCWM/QUAN,SAPSR3./SCWM/LAGP,SAPSR3./SCWM/ORDIM_O,SAPSR3.ZTLGORT_CONF,SAPSR3.ZTLGORT_LGTYP,SAPSR3.ZTMAST_UNIT,SAPSR3.ZTMATERIAL_D,SAPSR3.ZTRM_OUT_H,SAPSR3.ZTRM_OUT_I,SAPSR3.ZTSTOCK_LIFNR",

       "snapshot.mode": "schema_only",

       "snapshot.fetch.size": "10000",

       "query.fetch.size": "10000",

       "log.mining.query.filter.mode": "none",

       "log.mining.strategy": "online_catalog",

       "log.mining.batch.size.min": "500",

       "log.mining.batch.size.max": "100000",

       "log.mining.batch.size.default": "5000",

       "log.mining.sleep.time.min.ms": "0",

       "log.mining.sleep.time.max.ms": "1000",

       "log.mining.sleep.time.default.ms": "300",

       "log.mining.sleep.time.increment.ms": "100",

       "tombstones.on.delete": "true",

       "schema.history.internal.kafka.bootstrap.servers": "10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092",

       "schema.history.internal.kafka.topic": "EWM",

       "key.converter": "io.confluent.connect.avro.AvroConverter",

       "key.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",

       "value.converter": "io.confluent.connect.avro.AvroConverter",

       "value.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",

       "topic.prefix": "EWM",

       "time.precision.mode": "connect",

       "heartbeat.interval.ms": "60000",

       "signal.data.collection": "SAPSR3.DEBEZIUM_SIGNAL",

       "topic.creation.enable": "true",

       "topic.creation.default.replication.factor": "3",

       "topic.creation.default.partitions": "10",

       "topic.creation.default.compression.type": "lz4"

   }

}



# Kafka Connect提供的一些Restful API说明


curl -X POST http://10.101.1.45:8083/connectors -d @source_channel.json  # 注册配置文件到connect集群

curl -X PUT http://10.101.1.45:8083/connectors/source_channel/config -d @source_channel.json   # 更新连接器配置(更新时的json文件只需要config体里面包含的内容)

curl -X DELETE http://10.101.1.45:8083/connectors/source_channel    # 删除指定连接器

curl -X GET http://10.101.1.45:8083/connectors/source_sc_cloud    # 获取指定连接器配置信息

curl -X GET http://10.101.1.45:8083/connectors/source_sc_cloud/topics   # 获取指定连接器包含的kafka主题列表

curl -X GET http://10.101.1.45:8083/connectors/source_sc_cloud/status   # 获取指定连接器的运行状态



【VIII】Sink连接器注册


# 创建Sink连接器配置文件

mkdir /opt/kafka_2.13-3.5.0/json

cd /opt/kafka_2.13-3.5.0/json


vi sink_channel.json  # 添加如下内容后保存, 双斜杠及后面内容为参数说明,实际使用时请去除


{

   "name": "sink_channel",

   "config": {

       "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",

       "tasks.max": "10",       // 并发任务数,最佳效果是和MQ主题分区数相同

       "topics": "sc_cloud.public.sys_user,sc_cloud.public.sys_user_role,sc_cloud.public.sys_role,sc_cloud.public.sys_role_data,sc_cloud.public.usc_hr_dept,sc_cloud.public.usc_hr_employee,sc_cloud.masterdata.csc_area,sc_cloud.masterdata.csc_corp,sc_cloud.masterdata.csc_customer,sc_cloud.masterdata.csc_region_city,sc_cloud.masterdata.csc_region_county,sc_cloud.masterdata.csc_region_province",

       "connection.url": "jdbc:postgresql://xxx.xxx.xxx.xxx:5432/channel?currentSchema=public&reWriteBatchedInserts=true",

       "connection.username": "your account",

       "connection.password": "your password",

       "insert.mode": "upsert",     // 插入模式

       "delete.enabled": "true",     // 启用删除(需要源连接器开启墓碑参数)

       "primary.key.mode": "record_key",   // 主键识别模式

       "schema.evolution": "basic",    // 结构演变模式

       "database.time_zone": "UTC",    // 时区

       "key.converter": "io.confluent.connect.avro.AvroConverter",

       "key.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",

       "value.converter": "io.confluent.connect.avro.AvroConverter",

       "value.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",

       "transforms": "route",      // 转换设置

       "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",   // 设置转换类型为正则

       "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",       // 设置正则

       "transforms.route.replacement": "$3"            // 匹配替换(这里主要是默认创建的MQ主题为 库.模式.表,在这里只取表的名字)

   }

}


curl -X POST http://10.101.1.45:8083/connectors -d @sink_channel.json  # 注册配置文件到connect集群


【IX】信令表设置


# 首先要在Source连接器中增加下面的参数

"signal.data.collection": "public.debezium_signal"


# 在数据库中对应schema下创建信令表

CREATE TABLE "public"."debezium_signal" (

 "id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,

 "type" varchar(50) COLLATE "pg_catalog"."default",

 "data" varchar(1000) COLLATE "pg_catalog"."default",

 CONSTRAINT "debezium_signal_pkey" PRIMARY KEY ("id")

)

;


ALTER TABLE "public"."debezium_signal"

 OWNER TO "channel";


COMMENT ON TABLE "public"."debezium_signal" IS '同步信令表';


# 当需要对某些表执行快照时,就在表中插入一条记录


INSERT INTO public.debezium_signal(id,type,data)

VALUES (

  gen_random_uuid(),

  'execute-snapshot',

  '{data-collections: [public.table1,public.table2], type: incremental}'

);



【X】定时清理日志文件


0 1 * * * /opt/kafka_2.12-3.4.0/bin/auto_delete_logs.sh


# 创建脚本,定时删除3天前的日志

cd /opt/kafka_2.13-3.5.0/bin

vi auto_delete_logs.sh

find /opt/kafka_2.13-3.5.0/logs/ -type f -name "*.log.*" -mtime +3 -exec rm -rf {} \;

chmod +x auto_delete_logs.sh


# 创建crontab定时任务,添加下面行

crontab -e

0 1 * * * /opt/kafka_2.13-3.5.0/bin/auto_delete_logs.sh


【XI】Debezium UI安装


# 安装yum管理工具

yum install -y yum-utils


# 添加阿里云镜像

yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo


# 查看ce版本,然后选择一个安装

yum list docker-ce --showduplicates|sort -r


docker-ce.x86_64            3:24.0.4-1.el7                      docker-ce-stable

docker-ce.x86_64            3:24.0.3-1.el7                      docker-ce-stable

docker-ce.x86_64            3:24.0.2-1.el7                      docker-ce-stable

docker-ce.x86_64            3:24.0.1-1.el7                      docker-ce-stable

docker-ce.x86_64            3:24.0.0-1.el7                      docker-ce-stable

docker-ce.x86_64            3:23.0.6-1.el7                      docker-ce-stable

docker-ce.x86_64            3:23.0.5-1.el7                      docker-ce-stable

docker-ce.x86_64            3:23.0.4-1.el7                      docker-ce-stable

docker-ce.x86_64            3:23.0.3-1.el7                      docker-ce-stable

docker-ce.x86_64            3:23.0.2-1.el7                      docker-ce-stable

docker-ce.x86_64            3:23.0.1-1.el7                      docker-ce-stable

docker-ce.x86_64            3:23.0.0-1.el7                      docker-ce-stable

docker-ce.x86_64            3:20.10.9-3.el7                     docker-ce-stable

docker-ce.x86_64            3:20.10.8-3.el7                     docker-ce-stable

docker-ce.x86_64            3:20.10.7-3.el7                     docker-ce-stable


# 不指定版本的话默认是安装最新版

yum install docker-ce docker-ce-cli containerd.io


# 启动docker

systemctl enable docker

systemctl start docker

systemctl status docker


# 查看docker版本

docker version


# 修改docker镜像仓库后重启

cat > /etc/docker/daemon.json << EOF

{

"registry-mirrors":["http://hub-mirror.c.163.com"]

}

EOF


systemctl restart docker


# 安装Debezium-UI, 成功运行后可在浏览器上访问当前机器的8080

docker run -d -it --rm --name debezium-ui -p 8080:8080 -e KAFKA_CONNECT_URIS=http://10.101.1.45:8083 quay.io/debezium/debezium-ui:2.2



【XII】Confluent安装配置

#Confluent也打包了ZK以及Kafka Connect的所有功能,我这里只用了它的schema-registry功能

# 下载安装包

cd /opt/src

curl -O https://packages.confluent.io/archive/7.4/confluent-community-7.4.1.tar.gz

wget url://confluentinc-kafka-connect-avro-converter-7.4.1.tar.gz     # 地址忘记了,可以搜索到


# 解压

tar -xvf confluent-community-7.4.1.tar.gz

mv confluent-7.4.1/ ../

tar -xvf confluentinc-kafka-connect-avro-converter-7.4.1.tar.gz

cp confluentinc-kafka-connect-avro-converter-7.4.1/libs/*.jar /opt/kafka_2.13-3.5.0/plugins

cp /opt/confluent-7.4.1/share/java/schema-registry/*.jar /opt/kafka_2.13-3.5.0/plugins/


# 设置环境变量

vi /etc/profile


# 在末尾增加

export CONFLUENT_HOME=/opt/confluent-7.4.1

export PATH=$CONFLUENT_HOME/bin:$PATH


# 修改schema-registry配置文件

cd /opt/confluent-7.4.1/

vi etc/schema-registry/schema-registry.properties


# 增加或修改如下参数

listeners=http://10.101.1.45:8081

kafkastore.bootstrap.servers=PLAINTEXT://10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092

kafkastore.topic=__confluent_schemas


# 将整个文件夹复制到其他节点

cd /opt

scp -r confluent-7.4.1/ [email protected]:/opt/

scp -r confluent-7.4.1/ [email protected]:/opt/

scp -r confluent-7.4.1/ [email protected]:/opt/

scp -r confluent-7.4.1/ [email protected]:/opt/


# 启动schema-registry,每个节点都要执行

cd /opt/confluent-7.4.1

bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties > /dev/null &


# 这里有个坑,schema-regeistry集群内部默认用域名+端口访问,所以每台机器的hosts里要做一下设置

vi /etc/hosts


10.101.1.45   bsddts45

10.101.1.46   bsddts46

10.101.1.47   bsddts47

10.101.1.48   bsddts48

10.101.1.49   bsddts49



【XIII】安装mysql


# 下载MySQL安装包后解压

cd /opt/src

wget https://dev.mysql.com/get/Downloads/MySQL-5.7/mysql-5.7.43-1.el7.x86_64.rpm-bundle.tar

tar -xvf mysql-5.7.43-1.el7.x86_64.rpm-bundle.tar


# 卸载MariaDB

rpm -e mariadb-libs --nodeps


# 安装mysql

yum install openssl-devel

rpm -ivh mysql-community-common-5.7.43-1.el7.x86_64.rpm

rpm -ivh mysql-community-libs-5.7.43-1.el7.x86_64.rpm

rpm -ivh mysql-community-libs-compat-5.7.43-1.el7.x86_64.rpm

rpm -ivh mysql-community-devel-5.7.43-1.el7.x86_64.rpm

rpm -ivh mysql-community-client-5.7.43-1.el7.x86_64.rpm

rpm -ivh mysql-community-server-5.7.43-1.el7.x86_64.rpm


# mysql启停命令

systemctl start mysqld

systemctl restart mysqld

systemctl stop mysqld


# 查看默认root密码

more /var/log/mysqld.log | grep password

[Note] A temporary password is generated for root@localhost: hjM;oPtgx2>+


# 登录mysql后修改root密码

mysql -u -p

alter user 'root'@'localhost' identified by 'xxxxxxxxxx';


# 创建可远程登录的账号密码,并赋权

create user 'wine'@'%' IDENTIFIED BY 'xxxxxxxxxx';

grant all on *.* to 'wine'@'%';


# 创建Kafka-Eagle存储数据库ke;

create database ke;


【XIV】安装Kafka-Eagle监控


# 下载ke安装包

cd /opt/src

wget https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz

tar -xvf v3.0.1.tar.gz

cd kafka-eagle-bin-3.0.1

tar -xvf efak-web-3.0.1-bin.tar.gz

mv efak-web-3.0.1 /opt


# 设置ke环境变量,添加下面几行后保存

vi /etc/profile


# Kafka Eagle Environment

export KE_HOME=/opt/efak-web-3.0.1

export PATH=$PATH:$KE_HOME/bin


# 使环境变量及时生效

source /etc/profile


# 修改ke配置文件

cd /opt/efak-web-3.0.1

vi conf/system-config.properties


# 增加或修改如下参数

efak.zk.cluster.alias=cluster1

cluster1.zk.list=10.101.1.45:2181,10.101.1.46:2181,10.101.1.47:2181,10.101.1.48:2181,10.101.1.49:2181

efak.driver=com.mysql.cj.jdbc.Driver

efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull

efak.username=wine

efak.password=x


【XV】采用Flink接avro格式数据的一些注意事项


1、Avro对数据类型匹配要求非常高,所以在Flink SQL中定义的数据类型要和源库基本保持一致

2、小数点后的精度务必保持一致

3、Flink默认的时间戳精度只有3位,碰到源库时间精度到微秒的,要用BIGINT接,然后再/1000000 -28800

4、Oracle中的特殊字段类型RAW,  要用BYTES接

5、with参数需要制定avro格式,如果是confluent的还要指定schema-registry,示例如下


WITH (

   'connector' = 'kafka'                                

   ,'topic' = 'EWM.SAPSR3.ZTMAST_UNIT'

   ,'properties.bootstrap.servers' = '10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092'

   ,'format' = 'avro-confluent'

   ,'avro-confluent.schema-registry.url' = 'http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081'

   ,'scan.startup.mode' = 'latest-offset'

);


【XVI】监控消费延时,并实现钉钉机器人预警


vi /opt/kafka_2.13-3.5.0/bin/consumer_lag_alert.sh


#!/bin/sh

# kafka的所属目录 运行命令依赖其下bin中相关命令

FAKFA_DIR=/opt/kafka_2.13-3.5.0


# 默认积压报警条数

DEFAULT_LIMIT=100


# group信息的临时文件

GROUP_INFO_FILE=/opt/kafka_2.13-3.5.0/logs/monitor-group-infos.txt


# kafka的broker 多个逗号隔开

BROKER=10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092


# 写入文件

${FAKFA_DIR}/bin/kafka-consumer-groups.sh --bootstrap-server ${BROKER} --describe --all-groups > ${GROUP_INFO_FILE}


error_infos=()


# 遍历每一行  判断

while read line

do

 if [[ -n ${line} ]]; then

   # 去掉标题行

   if [[ $line =~ CONSUMER-ID ]];then

     # nothing

     # echo "${line}"

     WINE=""

   else

     group=`echo $line | awk -F' ' '{print $1}'`

     topic=`echo $line | awk -F' ' '{print $2}'`

     partition=`echo $line | awk -F' ' '{print $3}'`

     lag=`echo $line | awk -F' ' '{print $6}'`

     consumer_id=`echo $line | awk -F' ' '{print $7}'`

     # echo "$topic $lag"


     # lag为 - 或者consumer_id为 - 的忽略

     if [[ "${lag}" = "-" ]] || [[ "${consumer_id}" = "-" ]];then

       # nothing

       consumer_id="-"

     elif [ $((lag)) -ge $((DEFAULT_LIMIT)) ];then

       # 超过设置阈值则报警

       #echo ">$((DEFAULT_LIMIT)) $((lag)) ${line}"

       error_infos[${#error_infos[*]}]=">Group:${group}, Topic:${topic}, Partition:${partition}, 积压数:${lag} \n"

     fi

   fi

 fi

done  < ${GROUP_INFO_FILE}


# 非空的 发送钉钉消息提醒

if [[ -n ${error_infos[*]} ]]; then

 curl -s -XPOST -H 'Content-Type: application/json' https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxx -d "{\"msgtype\":\"text\",\"te

xt\":{\"content\":\"数据复制平台消费积压报警\\n ${error_infos[*]} \"},\"at\":{\"atUserIds\":[\"0001\",\"0002\"]}}"

fi

标签:opt,10.101,http,8081,KafkaConnect,Confluent,ce,kafka,企业级
From: https://blog.51cto.com/xwhite/7147959

相关文章

  • [13章]Spring Boot打造企业级一体化SaaS系统
    SpringBoot打造企业级一体化SaaS系统 提取码:f7gh SAAS代表“软件即服务”(SoftwareasaService),它是一种软件交付模型,通过互联网提供软件应用程序给用户使用。在SAAS模型中,软件应用程序由供应商托管在云端的服务器上,并通过互联网进行访问和使用。SAAS系统的优点包括:低成本:用户无......
  • 基于Spring Boot手把手博客系统企业级前后端实战-学习笔记
     一、springboot初始化工程1、网址:https://start.spring.io二、Gradle安装(绿色版)1、windows下-下载:http://downloads.gradle.org/distributions/gradle-3.5-bin.zip-解压:-配置环境变量:新建环境变......
  • Springboot整合模版方法模式概念->原理优缺点->框架应用场景->企业级实战
    一、前言常见的设计模式有23种,我们不得不提到模板方法设计模式,这是一种在软件开发中广泛使用的行为型设计模式之一。它为我们提供了一种优雅的方式来定义算法的结构,并将算法的具体实现延迟到子类中!在本篇博客中,我们将深入探讨模板方法设计模式在SpringBoot中的应用。我们将从概念......
  • 【13章】SpringBoot打造企业级一体化SaaS系统
    课程下载——【13章】SpringBoot打造企业级一体化SaaS系统提取码:y8v1 分享课程——【13章】SpringBoot打造企业级一体化SaaS系统,附源码。课程中整合后端主流技术(SpringBoot、物理数据库隔离、加载动态权限、多方式权限控制)、前端必会框架(vue3),完整落地ERP+CRM一体化SaaS系统,带......
  • 基于Kubernetes云原生技术的低代码PaaS平台,快速构建企业级应用程序
    低代码开发平台只需要编写简单的配置文件即可构建企业级应用程序。低代码PaaS平台可以在云端开发、部署、运行低代码应用程序。使用独立数据库模型,基于Kubernetes云原生技术,每个租户均可拥有一套独立的存储、数据库、代码和命名空间,实现了100%的租户数据隔离,并可以随时迁移到私有部......
  • [18章]Vue3+NestJS 全栈开发企业级管理后台
    点击下载:[18章]Vue3+NestJS全栈开发企业级管理后台提取码:zzbv Next.js是一个用于构建现代化React应用程序的框架。它强调性能、开发体验和SEO优化,是许多React开发者的首选。Next.js提供了许多功能,包括:服务器渲染:Next.js允许在服务器端渲染React应用程序,从而提高了应......
  • Scrum敏捷开发企业级实战课-Leangoo领歌
    ​课程简介Scrum是目前运用最为广泛的敏捷开发方法,是一个轻量级的项目管理和产品研发管理框架。这是一个两天的实训课程,面向研发管理者、项目经理、产品经理、研发团队等,旨在帮助学员全面系统地学习Scrum和敏捷开发,帮助企业快速启动敏捷实施。课程采用案例讲解+沙盘演练的方......
  • 云原生架构实战:构建可弹性扩展的企业级应用
    云原生架构实战:构建可弹性扩展的企业级应用一、微服务架构与SpringCloud概述随着云计算和容器技术的普及,传统的单体应用架构逐渐暴露出扩展性、部署和维护等方面的瓶颈。微服务架构应运而生,将应用拆分成一组小型服务,每个服务都运行在自己的进程中,通过HTTP或消息中间件等通信机制......
  • 企业级架构设计:迈向大型项目开发的高级阶段
     企业级架构设计:迈向大型项目开发的高级阶段摘要:每个程序员、或者说每个工作者都应该有自己的职业规划,如果你不是富二代,不是官二代,也没有职业规划,希望你可以思考一下自己的将来。每个程序员、或者说每个工作者都应该有自己的职业规划,如果你不是富二代,不是官二代,也没有职业规划......
  • Java微服务项目【动力商城】企业级实战项目直击大厂
    2023年动力节点全新版微服务项目【动力商城】震撼来袭!企业级实战项目,直击大厂!动力商城是以B2C模式运营的在线商城,总览企业及公司产品于一体,主打自主研发品牌以推广的互联网化新商城。系统采用的微服务架构模式。技术选型:Springboot+SpringCloud、Mybatis-plus、MySQL、Redis、阿里......