【I】Cluster planning
5 nodes
IP addresses
10.101.1.45 ZK, Kafka, Debezium Connector, JDK, Debezium UI, MySQL, Kafka-Eagle
10.101.1.46 ZK, Kafka, Debezium Connector, JDK
10.101.1.47 ZK, Kafka, Debezium Connector, JDK
10.101.1.48 ZK, Kafka, Debezium Connector, JDK
10.101.1.49 ZK, Kafka, Debezium Connector, JDK
Download the packages below from the respective official sites and upload them to /opt/src on every node (a distribution sketch follows the list)
apache-zookeeper-3.8.1-bin.tar.gz
debezium-connector-jdbc-2.3.0.Final-plugin.tar.gz
debezium-connector-postgres-2.3.0.Final-plugin.tar.gz
jdk-17_linux-x64_bin.tar.gz
kafka_2.13-3.5.0.tgz
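If password-less SSH from node 45 is already set up, a small loop like this sketch (host list taken from the plan above, /opt/src assumed to exist on each target) pushes the packages out in one go:
# distribute the downloaded packages from node 45 to the other nodes
for ip in 10.101.1.46 10.101.1.47 10.101.1.48 10.101.1.49; do
  scp /opt/src/*.tar.gz /opt/src/*.tgz root@${ip}:/opt/src/
done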
【II】Java environment setup
# Log in to node 45
cd /opt/src
# Extract and move to /opt
tar -xvf jdk-17_linux-x64_bin.tar.gz
mv jdk-17.0.4.1/ ../
# Set environment variables
vi /etc/profile
# Append the Java environment at the end
export JAVA_HOME=/opt/jdk-17.0.4.1
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
# Apply the environment variables
source /etc/profile
# Check the Java version; java version "17.0.4.1" 2022-08-18 LTS means the setup succeeded
java -version
# Repeat the same steps on the other 4 nodes
【III】ZooKeeper cluster setup
# Log in to node 45
cd /opt/src
# Extract
tar -xvf apache-zookeeper-3.8.1-bin.tar.gz
mv apache-zookeeper-3.8.1-bin ../zookeeper-3.8.1
# Create the data and log directories
cd /opt/zookeeper-3.8.1
mkdir data logs
# Edit the configuration file
cp conf/zoo_sample.cfg conf/zoo.cfg
vi conf/zoo.cfg
# Add or modify the following parameters
dataDir=/opt/zookeeper-3.8.1/data
dataLogDir=/opt/zookeeper-3.8.1/logs
admin.serverPort=8887
server.45=10.101.1.45:2888:3888
server.46=10.101.1.46:2888:3888
server.47=10.101.1.47:2888:3888
server.48=10.101.1.48:2888:3888
server.49=10.101.1.49:2888:3888
# Write this node's unique ID in the cluster (the value must be different on every node; 1-5 works, but I like using the last octet of the IP)
echo 45 > data/myid
# After repeating the above steps on the other 4 nodes, start the ZK cluster
bin/zkServer.sh start
bin/zkServer.sh status # check node status; one node should report leader and the other 4 follower
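To check all five nodes in one pass, a loop like the sketch below can be used (it assumes the same install path and SSH access on every node); zkServer.sh status prints a "Mode: leader" or "Mode: follower" line:
for ip in 10.101.1.45 10.101.1.46 10.101.1.47 10.101.1.48 10.101.1.49; do
  echo "== ${ip} =="
  ssh root@${ip} "/opt/zookeeper-3.8.1/bin/zkServer.sh status 2>&1 | grep Mode"
done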
【IV】Kafka cluster setup
# Log in to node 45
cd /opt/src
# Extract
tar -xvf kafka_2.13-3.5.0.tgz
mv kafka_2.13-3.5.0 ../
# Create the data and log directories
cd /opt/kafka_2.13-3.5.0
mkdir data logs
# Edit the Kafka server config and add or modify the parameters below; except for broker.id and listeners, which follow each node's IP, the values are identical on every node
vi config/server.properties
broker.id=45
delete.topic.enable=true
auto.create.topics.enable=false
listeners=PLAINTEXT://10.101.1.45:9092
log.dirs=/opt/kafka_2.13-3.5.0/data
num.partitions=10
default.replication.factor=3
min.insync.replicas=2
num.recovery.threads.per.data.dir=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
log.retention.hours=72
zookeeper.connect=10.101.1.45:2181,10.101.1.46:2181,10.101.1.47:2181,10.101.1.48:2181,10.101.1.49:2181
unclean.leader.election.enable=false
auto.leader.rebalance.enable=false
# JVM tuning
vi bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
# After repeating the above steps on the other 4 nodes, start the Kafka cluster
bin/kafka-server-start.sh -daemon config/server.properties > /dev/null &
# Check the log for errors
more logs/server.log
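As a quick smoke test, assuming the cluster is up, create a throwaway topic, confirm its replicas are spread across the brokers, then delete it (the topic name here is only an example):
cd /opt/kafka_2.13-3.5.0
bin/kafka-topics.sh --bootstrap-server 10.101.1.45:9092 --create --topic smoke_test --partitions 3 --replication-factor 3
bin/kafka-topics.sh --bootstrap-server 10.101.1.45:9092 --describe --topic smoke_test
bin/kafka-topics.sh --bootstrap-server 10.101.1.45:9092 --delete --topic smoke_test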
【V】Kafka Connect cluster setup
# Create the plugin directory
cd /opt/kafka_2.13-3.5.0
mkdir plugins
# Put the Postgres and JDBC connector packages downloaded from the Debezium site into the plugins directory
cd /opt/src
tar -xvf debezium-connector-postgres-2.3.0.Final-plugin.tar.gz
cp debezium-connector-postgres/*.jar ../kafka_2.13-3.5.0/plugins/
tar -xvf debezium-connector-jdbc-2.3.0.Final-plugin.tar.gz
cp debezium-connector-jdbc/*.jar ../kafka_2.13-3.5.0/plugins/
# Copy the jars to the same directory on the other 4 nodes
cd /opt/kafka_2.13-3.5.0
scp plugins/*.jar [email protected]:/opt/kafka_2.13-3.5.0/plugins/
scp plugins/*.jar [email protected]:/opt/kafka_2.13-3.5.0/plugins/
scp plugins/*.jar [email protected]:/opt/kafka_2.13-3.5.0/plugins/
scp plugins/*.jar [email protected]:/opt/kafka_2.13-3.5.0/plugins/
# Edit the Connect configuration file
cd /opt/kafka_2.13-3.5.0
vi config/connect-distributed.properties
# Add or modify the following parameters
bootstrap.servers=10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092
group.id=bsd-connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=10
config.storage.topic=connect-configs
config.storage.replication.factor=3
config.storage.partitions=1
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=10
listeners=HTTP://10.101.1.45:8083
plugin.path=/opt/kafka_2.13-3.5.0/plugins
# Copy the config file to the same directory on the other 4 nodes, then change the IP in the listeners parameter on each
scp config/connect-distributed.properties [email protected]:/opt/kafka_2.13-3.5.0/config/
scp config/connect-distributed.properties [email protected]:/opt/kafka_2.13-3.5.0/config/
scp config/connect-distributed.properties [email protected]:/opt/kafka_2.13-3.5.0/config/
scp config/connect-distributed.properties [email protected]:/opt/kafka_2.13-3.5.0/config/
# JVM tuning, required on every node
vi bin/connect-distributed.sh
export KAFKA_HEAP_OPTS="-Xms8G -Xmx16G -Xmn6G"
# Start Kafka Connect on each node, one by one
cd /opt/kafka_2.13-3.5.0
bin/connect-distributed.sh -daemon config/connect-distributed.properties > /dev/null &
tail -f logs/connect.log
# Check that the cluster is running
curl -X GET http://10.101.1.45:8083
{"version":"3.5.0","commit":"c97b88d5db4de28d","kafka_cluster_id":"2qAwkH6ISlW8lIX5vC1qHQ"}
【VI】Database preparation
# PostgreSQL: capture works by creating a replication slot and reading the WAL
1. Check that the pgoutput plugin is available; install it if missing
2. Set wal_level to logical (requires a database restart to take effect)
3. Create a user with replication privileges (see the sketch after the SQL below)
CREATE ROLE <replication_group>;
GRANT <replication_group> TO <original_owner>;
GRANT <replication_group> TO <replication_user>;
# Oracle: capture works by reading the redo logs through LogMiner. The example below is for 12c and later; for 11g and earlier simply drop CONTAINER=ALL
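A minimal sketch of steps 1-3, run through psql as a superuser; the user name, password, and grants below are illustrative placeholders, not values used elsewhere in this document:
psql -U postgres -c "ALTER SYSTEM SET wal_level = logical;"   # then restart PostgreSQL
psql -U postgres -c "SHOW wal_level;"                         # should report logical after the restart
psql -U postgres -c "CREATE ROLE debezium_user REPLICATION LOGIN PASSWORD 'change_me';"
psql -U postgres -d channel -c "GRANT USAGE ON SCHEMA rop_channel TO debezium_user;"
psql -U postgres -d channel -c "GRANT SELECT ON ALL TABLES IN SCHEMA rop_channel TO debezium_user;"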
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE USER c##dbzuser IDENTIFIED BY dbz
DEFAULT TABLESPACE logminer_tbs
QUOTA UNLIMITED ON logminer_tbs
CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL;
exit;
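Debezium's LogMiner-based capture also assumes the database runs in archive log mode with supplemental logging enabled; a sketch of those prerequisites (needs a short outage, verify paths and retention with your DBA before running):
sqlplus / as sysdba <<EOF
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
-- minimal supplemental logging at the database level
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- and for every captured table, e.g.:
-- ALTER TABLE SAPSR3.ZTSTOCK_PARTNER ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
EXIT;
EOF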
【VII】Registering the source connector
# Create the source connector configuration file
mkdir /opt/kafka_2.13-3.5.0/json
cd /opt/kafka_2.13-3.5.0/json
vi source_channel.json # add the content below and save; the double slashes and the text after them are parameter notes, remove them before actual use
{
"name": "source_channel", // 连接器名称, 集群内需唯一
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", // 连接器类型
"plugin.name": "pgoutput", // 插件名称, PG库独有配置
"slot.name": "debezium_channel", // 复制槽名称, PG库独有配置
"database.hostname": "your db connect url", // 数据库连接地址
"database.port": "5432", // 端口
"database.user": "your account", // 账号
"database.password": "your password", // 密码
"database.dbname" : "channel", // 数据库名称
"schema.include.list": "rop_channel", // 模式列表,多个用逗号分隔
"table.include.list": "rop_channel.debezium_test,rop_channel.csc_store,rop_channel.csc_store_data,rop_channel.csc_store_dim,rop_channel.cls_cp_store_version,rop_channel.cls_cp_virtual_store",
"snapshot.mode": "never", // 快照模式(当前设置为不做数据全量初始化)
"tombstones.on.delete": "true", // 是否发送墓碑记录, 删除同步参数
"database.history.kafka.bootstrap.servers": "10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092", // MQ集群地址
"database.history.kafka.topic": "__debezium_ddl_channel", // 表结构存放主题
"key.converter": "io.confluent.connect.avro.AvroConverter", // MQ消息序列化与反序列化格式, 采用avro时需指定schema registry集群地址,即下面的参数地址
"key.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",
"topic.prefix": "channel", // MQ主题前缀
"time.precision.mode": "adaptive", // 时间精度模式(自适应)
"heartbeat.interval.ms": "60000", // 心跳检测间隔毫秒(针对监控库数据变化量非常小时的一个参数设置,防止复制槽堆积)
"signal.data.collection": "public.debezium_signal", // 信令表(针对指定表发起全量快照)
"topic.creation.enable": "true", // 自动创建MQ主题
"topic.creation.default.replication.factor": "3", // MQ主题副本数
"topic.creation.default.partitions": "20", // MQ主题分区数
"topic.creation.default.compression.type": "lz4" // MQ消息压缩类型
}
}
# Oracle source connector registration parameters
{
"name": "source_ewm",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "your db connect url",
"database.port": "1521",
"database.user": "your account",
"database.password": "your password",
"database.dbname": "DB1",
"schema.include.list": "SAPSR3",
"table.include.list": "SAPSR3.ZTSTOCK_PARTNER,SAPSR3.ZTSTOCK_COMMON,SAPSR3./SAPAPO/MATKEY,SAPSR3./SCWM/AQUA,SAPSR3./SCWM/QUAN,SAPSR3./SCWM/LAGP,SAPSR3./SCWM/ORDIM_O,SAPSR3.ZTLGORT_CONF,SAPSR3.ZTLGORT_LGTYP,SAPSR3.ZTMAST_UNIT,SAPSR3.ZTMATERIAL_D,SAPSR3.ZTRM_OUT_H,SAPSR3.ZTRM_OUT_I,SAPSR3.ZTSTOCK_LIFNR",
"snapshot.mode": "schema_only",
"snapshot.fetch.size": "10000",
"query.fetch.size": "10000",
"log.mining.query.filter.mode": "none",
"log.mining.strategy": "online_catalog",
"log.mining.batch.size.min": "500",
"log.mining.batch.size.max": "100000",
"log.mining.batch.size.default": "5000",
"log.mining.sleep.time.min.ms": "0",
"log.mining.sleep.time.max.ms": "1000",
"log.mining.sleep.time.default.ms": "300",
"log.mining.sleep.time.increment.ms": "100",
"tombstones.on.delete": "true",
"schema.history.internal.kafka.bootstrap.servers": "10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092",
"schema.history.internal.kafka.topic": "EWM",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",
"topic.prefix": "EWM",
"time.precision.mode": "connect",
"heartbeat.interval.ms": "60000",
"signal.data.collection": "SAPSR3.DEBEZIUM_SIGNAL",
"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "10",
"topic.creation.default.compression.type": "lz4"
}
}
# Some of the REST APIs provided by Kafka Connect
curl -X POST -H "Content-Type: application/json" http://10.101.1.45:8083/connectors -d @source_channel.json # register the configuration file with the Connect cluster
curl -X PUT -H "Content-Type: application/json" http://10.101.1.45:8083/connectors/source_channel/config -d @source_channel.json # update a connector's configuration (for updates, the JSON file only needs the contents of the config object)
curl -X DELETE http://10.101.1.45:8083/connectors/source_channel # delete the given connector
curl -X GET http://10.101.1.45:8083/connectors/source_sc_cloud # get the given connector's configuration
curl -X GET http://10.101.1.45:8083/connectors/source_sc_cloud/topics # list the Kafka topics used by the given connector
curl -X GET http://10.101.1.45:8083/connectors/source_sc_cloud/status # get the given connector's running status
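A few more endpoints that come in handy day to day (same connector name as above):
curl -X POST http://10.101.1.45:8083/connectors/source_channel/restart # restart a failed connector
curl -X POST http://10.101.1.45:8083/connectors/source_channel/tasks/0/restart # restart a single failed task
curl -X PUT http://10.101.1.45:8083/connectors/source_channel/pause # pause; resume again with .../resume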
【VIII】Registering the sink connector
# Create the sink connector configuration file
mkdir /opt/kafka_2.13-3.5.0/json
cd /opt/kafka_2.13-3.5.0/json
vi sink_channel.json # add the content below and save; the double slashes and the text after them are parameter notes, remove them before actual use
{
"name": "sink_channel",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "10", // 并发任务数,最佳效果是和MQ主题分区数相同
"topics": "sc_cloud.public.sys_user,sc_cloud.public.sys_user_role,sc_cloud.public.sys_role,sc_cloud.public.sys_role_data,sc_cloud.public.usc_hr_dept,sc_cloud.public.usc_hr_employee,sc_cloud.masterdata.csc_area,sc_cloud.masterdata.csc_corp,sc_cloud.masterdata.csc_customer,sc_cloud.masterdata.csc_region_city,sc_cloud.masterdata.csc_region_county,sc_cloud.masterdata.csc_region_province",
"connection.url": "jdbc:postgresql://xxx.xxx.xxx.xxx:5432/channel?currentSchema=public&reWriteBatchedInserts=true",
"connection.username": "your account",
"connection.password": "your password",
"insert.mode": "upsert", // 插入模式
"delete.enabled": "true", // 启用删除(需要源连接器开启墓碑参数)
"primary.key.mode": "record_key", // 主键识别模式
"schema.evolution": "basic", // 结构演变模式
"database.time_zone": "UTC", // 时区
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081",
"transforms": "route", // 转换设置
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", // 设置转换类型为正则
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", // 设置正则
"transforms.route.replacement": "$3" // 匹配替换(这里主要是默认创建的MQ主题为 库.模式.表,在这里只取表的名字)
}
}
curl -X POST -H "Content-Type: application/json" http://10.101.1.45:8083/connectors -d @sink_channel.json # register the configuration file with the Connect cluster
【IX】Signaling table setup
# First add the following parameter to the source connector
"signal.data.collection": "public.debezium_signal"
# Create the signaling table under the corresponding schema in the database
CREATE TABLE "public"."debezium_signal" (
"id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
"type" varchar(50) COLLATE "pg_catalog"."default",
"data" varchar(1000) COLLATE "pg_catalog"."default",
CONSTRAINT "debezium_signal_pkey" PRIMARY KEY ("id")
)
;
ALTER TABLE "public"."debezium_signal"
OWNER TO "channel";
COMMENT ON TABLE "public"."debezium_signal" IS 'replication signaling table';
# When certain tables need a snapshot, insert a row into this table
INSERT INTO public.debezium_signal(id,type,data)
VALUES (
gen_random_uuid(),
'execute-snapshot',
'{"data-collections": ["public.table1","public.table2"], "type": "incremental"}'
);
【X】Scheduled log cleanup
# Create a script that deletes log files older than 3 days
cd /opt/kafka_2.13-3.5.0/bin
vi auto_delete_logs.sh
find /opt/kafka_2.13-3.5.0/logs/ -type f -name "*.log.*" -mtime +3 -exec rm -rf {} \;
chmod +x auto_delete_logs.sh
# Create a crontab entry: run crontab -e and add the line below
crontab -e
0 1 * * * /opt/kafka_2.13-3.5.0/bin/auto_delete_logs.sh
【XI】Debezium UI installation
# Install the yum helper tools
yum install -y yum-utils
# Add the Aliyun docker-ce mirror repo
yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# List the available docker-ce versions, then pick one to install
yum list docker-ce --showduplicates|sort -r
docker-ce.x86_64 3:24.0.4-1.el7 docker-ce-stable
docker-ce.x86_64 3:24.0.3-1.el7 docker-ce-stable
docker-ce.x86_64 3:24.0.2-1.el7 docker-ce-stable
docker-ce.x86_64 3:24.0.1-1.el7 docker-ce-stable
docker-ce.x86_64 3:24.0.0-1.el7 docker-ce-stable
docker-ce.x86_64 3:23.0.6-1.el7 docker-ce-stable
docker-ce.x86_64 3:23.0.5-1.el7 docker-ce-stable
docker-ce.x86_64 3:23.0.4-1.el7 docker-ce-stable
docker-ce.x86_64 3:23.0.3-1.el7 docker-ce-stable
docker-ce.x86_64 3:23.0.2-1.el7 docker-ce-stable
docker-ce.x86_64 3:23.0.1-1.el7 docker-ce-stable
docker-ce.x86_64 3:23.0.0-1.el7 docker-ce-stable
docker-ce.x86_64 3:20.10.9-3.el7 docker-ce-stable
docker-ce.x86_64 3:20.10.8-3.el7 docker-ce-stable
docker-ce.x86_64 3:20.10.7-3.el7 docker-ce-stable
# If no version is specified, the latest one is installed by default
yum install docker-ce docker-ce-cli containerd.io
# Start docker
systemctl enable docker
systemctl start docker
systemctl status docker
# Check the docker version
docker version
# Change the docker registry mirror, then restart
cat > /etc/docker/daemon.json << EOF
{
"registry-mirrors":["http://hub-mirror.c.163.com"]
}
EOF
systemctl restart docker
# Install Debezium UI; once it is running, open port 8080 of this machine in a browser
docker run -d -it --rm --name debezium-ui -p 8080:8080 -e KAFKA_CONNECT_URIS=http://10.101.1.45:8083 quay.io/debezium/debezium-ui:2.2
【XII】Confluent installation and configuration
# Confluent also bundles ZK and all of the Kafka Connect functionality; here only its schema-registry component is used
# Download the packages
cd /opt/src
curl -O https://packages.confluent.io/archive/7.4/confluent-community-7.4.1.tar.gz
wget url://confluentinc-kafka-connect-avro-converter-7.4.1.tar.gz # I forgot the exact URL; it can be found by searching
# Extract
tar -xvf confluent-community-7.4.1.tar.gz
mv confluent-7.4.1/ ../
tar -xvf confluentinc-kafka-connect-avro-converter-7.4.1.tar.gz
cp confluentinc-kafka-connect-avro-converter-7.4.1/libs/*.jar /opt/kafka_2.13-3.5.0/plugins
cp /opt/confluent-7.4.1/share/java/schema-registry/*.jar /opt/kafka_2.13-3.5.0/plugins/
# Set environment variables
vi /etc/profile
# Append at the end
export CONFLUENT_HOME=/opt/confluent-7.4.1
export PATH=$CONFLUENT_HOME/bin:$PATH
# Edit the schema-registry configuration file
cd /opt/confluent-7.4.1/
vi etc/schema-registry/schema-registry.properties
# Add or modify the following parameters
listeners=http://10.101.1.45:8081
kafkastore.bootstrap.servers=PLAINTEXT://10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092
kafkastore.topic=__confluent_schemas
# Copy the whole directory to the other nodes
cd /opt
scp -r confluent-7.4.1/ [email protected]:/opt/
scp -r confluent-7.4.1/ [email protected]:/opt/
scp -r confluent-7.4.1/ [email protected]:/opt/
scp -r confluent-7.4.1/ [email protected]:/opt/
# Start schema-registry; run this on every node
cd /opt/confluent-7.4.1
bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties > /dev/null &
# Gotcha: schema-registry cluster members talk to each other via hostname + port by default, so add the following to /etc/hosts on every machine
vi /etc/hosts
10.101.1.45 bsddts45
10.101.1.46 bsddts46
10.101.1.47 bsddts47
10.101.1.48 bsddts48
10.101.1.49 bsddts49
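A quick check that the registry is answering (the subject list stays empty until the first Avro schema is registered):
curl -s http://10.101.1.45:8081/subjects
# a running but still empty registry returns []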
【XIII】MySQL installation
# Download the MySQL package and extract it
cd /opt/src
wget https://dev.mysql.com/get/Downloads/MySQL-5.7/mysql-5.7.43-1.el7.x86_64.rpm-bundle.tar
tar -xvf mysql-5.7.43-1.el7.x86_64.rpm-bundle.tar
# Remove MariaDB
rpm -e mariadb-libs --nodeps
# Install MySQL
yum install openssl-devel
rpm -ivh mysql-community-common-5.7.43-1.el7.x86_64.rpm
rpm -ivh mysql-community-libs-5.7.43-1.el7.x86_64.rpm
rpm -ivh mysql-community-libs-compat-5.7.43-1.el7.x86_64.rpm
rpm -ivh mysql-community-devel-5.7.43-1.el7.x86_64.rpm
rpm -ivh mysql-community-client-5.7.43-1.el7.x86_64.rpm
rpm -ivh mysql-community-server-5.7.43-1.el7.x86_64.rpm
# MySQL start/stop commands
systemctl start mysqld
systemctl restart mysqld
systemctl stop mysqld
# Find the default root password
more /var/log/mysqld.log | grep password
[Note] A temporary password is generated for root@localhost: hjM;oPtgx2>+
# Log in to MySQL and change the root password
mysql -u root -p
alter user 'root'@'localhost' identified by 'xxxxxxxxxx';
# Create an account that can log in remotely and grant it privileges
create user 'wine'@'%' IDENTIFIED BY 'xxxxxxxxxx';
grant all on *.* to 'wine'@'%';
# Create the ke database used by Kafka-Eagle for its own storage
create database ke;
【XIV】Kafka-Eagle monitoring installation
# Download the ke package
cd /opt/src
wget https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz
tar -xvf v3.0.1.tar.gz
cd kafka-eagle-bin-3.0.1
tar -xvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1 /opt
# Set the ke environment variables: add the lines below and save
vi /etc/profile
# Kafka Eagle Environment
export KE_HOME=/opt/efak-web-3.0.1
export PATH=$PATH:$KE_HOME/bin
# Apply the environment variables
source /etc/profile
# Edit the ke configuration file
cd /opt/efak-web-3.0.1
vi conf/system-config.properties
# Add or modify the following parameters
efak.zk.cluster.alias=cluster1
cluster1.zk.list=10.101.1.45:2181,10.101.1.46:2181,10.101.1.47:2181,10.101.1.48:2181,10.101.1.49:2181
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=wine
efak.password=x
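Start EFAK with the ke.sh script shipped in its bin directory; the web UI then listens on the port configured in system-config.properties (8048 by default in EFAK 3.x, if memory serves):
cd /opt/efak-web-3.0.1
bin/ke.sh start
bin/ke.sh status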
【XV】Notes on consuming Avro-format data with Flink
1. Avro is very strict about type matching, so the data types declared in Flink SQL must stay essentially identical to the source database
2. Keep the precision after the decimal point exactly the same
3. Flink's default timestamp precision is only 3 digits (milliseconds); when the source stores microseconds, read the field as BIGINT, then divide by 1,000,000 and subtract 28,800 (the 8-hour time zone offset in seconds)
4. Oracle's special RAW type must be read as BYTES
5. The WITH clause must specify the Avro format, and for Confluent Avro also the schema-registry address, for example:
WITH (
'connector' = 'kafka'
,'topic' = 'EWM.SAPSR3.ZTMAST_UNIT'
,'properties.bootstrap.servers' = '10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092'
,'format' = 'avro-confluent'
,'avro-confluent.schema-registry.url' = 'http://10.101.1.45:8081,http://10.101.1.46:8081,http://10.101.1.47:8081,http://10.101.1.48:8081,http://10.101.1.49:8081'
,'scan.startup.mode' = 'latest-offset'
);
【XVI】Monitor consumer lag and alert through a DingTalk bot
vi /opt/kafka_2.13-3.5.0/bin/consumer_lag_alert.sh
#!/bin/sh
# Kafka installation directory; the commands under its bin/ are required
KAFKA_DIR=/opt/kafka_2.13-3.5.0
# default lag threshold that triggers an alert
DEFAULT_LIMIT=100
# temporary file holding the consumer group info
GROUP_INFO_FILE=/opt/kafka_2.13-3.5.0/logs/monitor-group-infos.txt
# Kafka brokers, comma-separated
BROKER=10.101.1.45:9092,10.101.1.46:9092,10.101.1.47:9092,10.101.1.48:9092,10.101.1.49:9092
# dump the group info to the file
${KAFKA_DIR}/bin/kafka-consumer-groups.sh --bootstrap-server ${BROKER} --describe --all-groups > ${GROUP_INFO_FILE}
error_infos=()
# check every line
while read line
do
if [[ -n ${line} ]]; then
# skip the header line
if [[ $line =~ CONSUMER-ID ]];then
# nothing to do
# echo "${line}"
: # no-op
else
group=`echo $line | awk -F' ' '{print $1}'`
topic=`echo $line | awk -F' ' '{print $2}'`
partition=`echo $line | awk -F' ' '{print $3}'`
lag=`echo $line | awk -F' ' '{print $6}'`
consumer_id=`echo $line | awk -F' ' '{print $7}'`
# echo "$topic $lag"
# lag为 - 或者consumer_id为 - 的忽略
if [[ "${lag}" = "-" ]] || [[ "${consumer_id}" = "-" ]];then
# nothing
consumer_id="-"
elif [ $((lag)) -ge $((DEFAULT_LIMIT)) ];then
# 超过设置阈值则报警
#echo ">$((DEFAULT_LIMIT)) $((lag)) ${line}"
error_infos[${#error_infos[*]}]=">Group:${group}, Topic:${topic}, Partition:${partition}, 积压数:${lag} \n"
fi
fi
fi
done < ${GROUP_INFO_FILE}
# if anything was collected, send a DingTalk alert
if [[ -n ${error_infos[*]} ]]; then
curl -s -X POST -H 'Content-Type: application/json' 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxx' -d "{\"msgtype\":\"text\",\"text\":{\"content\":\"Data replication platform consumer lag alert\\n ${error_infos[*]} \"},\"at\":{\"atUserIds\":[\"0001\",\"0002\"]}}"
fi
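To run the check periodically, reuse the crontab approach from section 【X】; the 5-minute interval below is just an example:
chmod +x /opt/kafka_2.13-3.5.0/bin/consumer_lag_alert.sh
# crontab -e, then add:
*/5 * * * * /opt/kafka_2.13-3.5.0/bin/consumer_lag_alert.sh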