首页 > 其他分享 >Kafka数据迁移全解析:同集群和跨集群

Kafka数据迁移全解析:同集群和跨集群

时间:2024-12-27 17:58:25浏览次数:6  
标签:broker kafka json 集群 offset 迁移 解析 Kafka

文章目录


Kafka两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。

一、同集群迁移

在这里插入图片描述
应用场景
broker 迁移 主要使用的场景是broker 上线,下线,或者扩容等.基于同一套zookeeper的操作。

实践
将需要新添加的broker 列表一并添加到kafka的集群中。(启动新的kafka指定同一套zk)Kafka由之前的三节点,扩容至四节点。
在这里插入图片描述
数据迁移

  1. 查询信息:(四个分区分布在三台机器上)
    在这里插入图片描述
  2. 新建json文件:
cat topic-to-move.json

{
"topics": [
{"topic": "test-topic"}
],
"version":1
}
  1. 获取重新分配方案:
kafka-reassign-partitions.sh --zookeeper node1:2181,node2:2181,node3:2181 --topics-to-move-json-file topics-to-move.json --broker-list "150,151,155,159" –generate

##通过kafka-reassign-partitions.sh 获取重新分配方案,–broker-lsit 的参数 “150,151,155,159"是指集群中每个broker的id,由于我们是需要将所有topic均匀分配到扩完结点的4台机器上,所以要指定。同理,当业务改变为将原来的所有数据从旧节点(0,5,9)迁移到新节点(1)实现数据平滑迁移,这时的参数应"4”
,执行后会出现以下内容:
在这里插入图片描述
复制新的方案到一个json文件 assignplan.json (文件名不重要,文件格式也不一定要以json为 结尾,只要保证内容是json即可)

 ./kafka-reassign-partitions.sh --zookeeper node1:2181 --reassignment-json-file assignplan.json --execute

在这里插入图片描述
完成后查看topic信息:(四个分区分布在四台机器上)
在这里插入图片描述

二、跨集群迁移

在这里插入图片描述
应用场景
主要用于kafka 集群的变更. 将数据同步等操作. 相当于是实现了双打.等各项数据消费端在零误差的对接好了后,可以停掉就集群

数据迁移:
MirrorMaker

  1. 修改kafka配置
    consumer.properties(内容为原始集群信息)
    在这里插入图片描述
#config/consumer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster1:9092,kafka-cluster1:9093 # source-cluster的broker list
group.id=test-consumer-group1 # 自定义一个消费者的group id
auto.offset.reset= # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据; earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费; none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

producer.properties(内容为新集群信息)
在这里插入图片描述
#config/producer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster2:9092,kafka-cluster2:9093 # destination-cluster的broker list
compression.type=none # 数据压缩方式none, gzip, snappy, lz4, zstd
partitioner.class= # 指定分区程序路径,默认为随机分区
request.timeout.ms= # 请求超时时间
max.block.ms= # KafkaProducer.send and KafkaProducer.partitionsFor 阻塞时间
linger.ms= # 等待指定时间后批量发送
max.request.size= # 发送消息最大字节数
batch.size= # 单次批量处理的字节数
buffer.memory= # 指定等待发送消息的缓冲区大小

执行操作:

./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties --producer.config ../config/producer.properties --whitelist 'test'

说明:

  1. –num.streams: 指定流就是指定消费者,所有消费者公用一个生产者。
  2. –whitelist: 表明需要同步的白名单,可以使用”|”来连接多个topic,还可以使用正则表达式。可设置黑名单。

ZK迁移:

  1. zk迁移就比较简单了,起新节点加入zk集群,稳定后关停旧节点。
  2. 新增broker加入集群,将所有topic分区只分配给新broker,执行分配任务后,kafka将旧broker的分区数据复制到新broker,新broker成为各分区的leader,随后kafka删除旧broker上的分区数据;
  3. 整个过程中客户端应用正常生产消费消息,执行结束后使用新的消费者组从头消费可以获取到全部历史消息。
  4. 停止旧broker后,正在运行的客户端应用正常生产消费消息,新建客户端连接旧broker失败,连接新broker正常

标签:broker,kafka,json,集群,offset,迁移,解析,Kafka
From: https://blog.csdn.net/qq_40477248/article/details/144759436

相关文章

  • 深入解析如何从Snowflake加载文档
    #深入解析如何从Snowflake加载文档老铁们,这篇文章我们来聊聊如何从Snowflake这个强大的数据仓库中加载文档。这个技术点其实不难,重点是找对工具和方法。下面我会带大伙详细过一遍原理,顺便分享一些我的踩坑经验。##技术背景介绍Snowflake是一个非常流行的云数据仓库......
  • Kafka Broker、Producer、Consumer配置参数
    参数的设置对Kafka性能有着至关重要的影响。以下是一些关键参数及其对性能的具体影响:KafkaBroker配置参数num.network.threads:控制Kafka网络线程的数量,这些线程负责处理网络I/O操作。增加此参数的值可以提高网络I/O处理能力,但也会增加内存消耗。num.io.threads:控制KafkaI/O......
  • 学习干货万字全面解析网络安全、黑客技术,小白看完面试网安工作和护网蓝队初级竟然秒通
    前言本次环境以DVWA靶场(不太安全的网站)及CTF题目(夺旗赛)先对OWASPTOP10漏洞原理通俗概述,接着对基础代码解析,然后执行的命令落地到本地复现,前端进行复现后分析流量包,植入CTF题目,最后演示WAF流量经过,以及最高级别代码防护分析包括最终流程图,分析较为详细,对于初学者,网安爱......
  • DNS解析 电子邮件安全协议 DMACR
    什么是DMARC记录?DMARC记录是一条发布在域名上面的,在DNS中的TXT记录,位于_dmarc.yourdomain.com,在这里“yourdomain.com”是实际的域名或者子域名。它告诉接收邮件的服务器当邮件在DMARC验证中失败时,应该如何处理,并且应该把邮件验证数据发往哪里。DMARC记录由一系列......
  • 香港服务器带宽计费方式解析
    香港服务器带宽计费方式解析在当今数字化时代,服务器扮演着至关重要的角色,而香港作为亚太地区的网络枢纽,其服务器备受关注。其中,带宽计费方式是用户选择香港服务器时需要重点考量的因素之一。一、按流量计费这是较为常见的一种计费模式。用户根据服务器在一定时......
  • 揭秘 Transformer 内部原理:八问八答全解析!
    近期,SakanaAI发表了一篇题为《TransformerLayersasPainters》的论文,探究了预训练transformer中的信息流,并针对仅解码器和仅编码器冻结transformer模型进行了一系列实验。请注意,该研究没有对预训练模型进行任何类型的微调。论文地址:https://arxiv.org/pdf/2407......
  • DNS域名解析服务
    一、实验环境二、实验步骤实验一(搭建DNS服务器)1、安装dns(主DNS)yum-yinstallbind-chroot.x86_642、修改主配置文件(主DNS)vim/etc/named.conf修改:vim/etc/named.rfc1912.zones修改:3、创建正向和反向区域数据文件(主DNS)cd/var/named/cpnamed.localhostlkk.co......
  • Java 并发编程:原子类(Atomic Classes)核心技术的深度解析
    Java并发编程:原子类(AtomicClasses)核心技术的深度解析在高并发场景下,线程安全是一个重要的话题。Atomic类通过高效的CAS(Compare-And-Swap)机制,为开发者提供了一种无需锁的线程安全解决方案。本篇文章将系统讲解Java原子类的核心概念、常用成员、使用方法以及实际应用。......
  • FutureTask 源码解析
    一.FutureTask继承接口    FutureTask是Future的具体实现。FutureTask实现了RunnableFuture接口。RunnableFuture接口又同时继承了Future和Runnable接口。所以FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。二.FutureTask属性......
  • 完成这个函数,解析带符号或不带符号的整形
    fninteger64(input:&str)->IResult<&str,Token>{//map(digit1,|s:&str|Token::Number(s.parse().unwrap()))(input)}完成这个函数,解析带符号或不带符号的整形UUUUUUUUUUUUUUUUUUUUUUU为了完成integer64函数并解析带符号或不带符号的64位整数,我们需要使用......