首页 > 其他分享 >kafka

kafka

时间:2023-09-14 15:59:01浏览次数:32  
标签:bin Kafka 发送 sh 线程 kafka

Kafka学习笔记_day01
适用场景:大数据场景
消息队列模式
点对点模式
消费者主动拉取数据,消息收到以后清除消息
发布/订阅模式
可以存在多个Topic主题
消费者消费完数据以后,不删除数据
每个消费者相互独立,都可以消费到数据
基础架构
内部将一个Topic(主题)分为了多个partition(分区),并配合分区的设计,在消费者端设计了消费者组的概念,组内的每个消费者并行消费,也就是说同一个组里的消费者不能同时消费同一个分区内的消息。

为了提高可用性,为每一个partition增加了若干个副本,即存在Leader和Follower,所有的消息的生产和消费都是基于Leader进行的,当Leader挂掉以后,将选举出新的Leader。

快速入门Kafka
kafka的官网下载地址:https://kafka.apache.org/downloads.html

 

 

1)解压下载的tar包:
tar -zxvf xxxx.tgz -C 复制的目录


2)修改解压后的文件名称:
mv kafka_2.12-3.0.0/ kafka


3)进入到kafka的目录下,里面存在三个重要配置的配置文件
server.properties
producer.properties
consumer.properties


其中,对于server.properties配置文件进行修改:

#broker.id 为每一个kafka的唯一标识,不能重复
broker.id = 0
#kafka运行日志(数据)存放的路径,路径不需要重新创建,配置好kafka会自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=opt/module/kafka/datas
#配置连接Zookeeper的集群地址(ps:在zk的根目录下创建/kafka目录,方便进行管理)
zookeeper.connect= 服务器1:2181,服务器2:2181,服务器3:2181/kafka


#其他参数
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000

 


配置完毕以后进入其他服务器进行kafka的分发安装包工作,同时按照上述操作修改其broker.id值,保持其唯一性

#使用脚本xsync进行分发操作
xsync kafka/

4)配置环境变量
在/etc/profile.d/my_env.sh文件中增加kafka环境变量的配置:

vim /etc/profile.d/my_env.sh

添加以下关于Kafka的环境信息:

# KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

 


配置完成以后,进行环境变量的刷新工作

source /etc/profile

同时将环境变量的配置分发到其他节点中,并进行source操作:

xsync /etc/profile.d/my_env.sh
source /etc/profile

此时,kafka的安装及配置工作基本完成。

注意:上面使用的xsnc脚本:

#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
  then
    echo Not Enough Arguement!
  exit;
fi

#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104[你的集群服务器名称]
do

  echo ==================== $host ====================
  #3. 遍历所有目录,挨个发送

  for file in $@
  do
    #4. 判断文件是否存在
    if [ -e $file ]
    then
     #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)

    #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done

done

 


5)启动Kafka
启动kafka之前,需要先启动Zookeeper集群服务。

#使用zk.sh脚本启动zookeeper集群
zk.sh start

#zk.sh脚本

case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104[服务器名称]
do
echo ------------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
}
;;
"stop"){ for i in hadoop102 hadoop103 hadoop104 do echo ------------- zookeeper $i 停止 ------------ ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop" done } ;;
"status"){ for i in hadoop102 hadoop103 hadoop104 do echo ------------- zookeeper $i 状态 ------------ ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status" done } ;;
esac

 



依次在集群服务器节点启动kafka:

#先进入到kafka根目录
bin/kafka-server-start.sh -daemon config/server.properties

启动/停止脚本
开发/生产环境中,手动启动kafka集群效率较低,一般编写启动/停止脚本来进行kafka集群的统一启动和关闭。无论是启动还是停止命令,对应的命令路劲均为绝对路径,而非相对路径

#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104[服务器名称]
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -
daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac

 

注意:

关闭kafka命令到所有的kafka集群关闭成功提示存在一定的时间差,一定要等kafka集群全部关闭完毕以后,再进行Zookeeper集群的关闭操作!!!
Topic命令
操作主题命令参数

bin/kafka-topics.sh
其中,具体包括主题的CRUD操作:

参数 描述
–bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号
–topic <String: topic> 操作的topic名称
–create 创建主题
–delete 删除主题
–alter 修改主题
–list 查看所有主题
–describe 查看主题的详细描述
–partitons 设置分区数
–replication-factor 设置分区的副本数
–config 更新系统默认的配置
Kafka之生产者
生产者消息发送流程
在消息发送过程中,存在着两个线程:

Main线程:将消息发送给RecordAccumulator的队列中
Sender线程:从RecordAccumulator的队列中拉取消息发送到Kafka Broker
具体流程图:


其中:

RecordAccumulator的默认存储大小为32M,内部每一个消息队列的默认大小为16K。只有当队列的数据达到batch.size以后,sender才会拉取对应的消息,默认batch.size大小为16k;同时,如果数据未达到batch.size,但是sender线程等待时间达到linger.ms时也会发送消息到sender线程。

Sender线程中的请求时基于Kafka节点(Broker)的,每个broker节点上默认最多缓存5个请求,Selector则构成请求的IO通道,当请求通过Selector选择器到达Kafka集群,进行对应的消息处理,处理完成后会给予对应的acks(应答),这里存在三种应答策略:

0:表示生产者发送过来的数据,不需要等数据落盘,直接给予应答
1:生产者发送过来的数据,只要Leader收到数据即应答
-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。
如果应答成功,则需要删除Sender中的对应请求,同时删除对应队列中的请求;若应答失败,可以进行重试机制

异步发送
依赖添加:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>

创建一个CustomProducer进行消息的生产:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) throws
InterruptedException {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"hadoop102:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new
ProducerRecord<>("first","message " + i));
}
// 5. 关闭资源
kafkaProducer.close();
}
}

 

异步发送带回调函数
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元 数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发 送成功,如果 Exception 不为 null,说明消息发送失败。

在send方法中添加回调逻辑:

kafkaProducer.send(new ProducerRecord<>("first",
"message " + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}

 



同步发送
只需要在异步发送的基础上调用一个get()方法即可

kafkaProducer.send(new ProducerRecord<>("first","message" + i)).get();

TRANSLATE with x English
Arabic Hebrew Polish
Bulgarian Hindi Portuguese
Catalan Hmong Daw Romanian
Chinese Simplified Hungarian Russian
Chinese Traditional Indonesian Slovak
Czech Italian Slovenian
Danish Japanese Spanish
Dutch Klingon Swedish
English Korean Thai
Estonian Latvian Turkish
Finnish Lithuanian Ukrainian
French Malay Urdu
German Maltese Vietnamese
Greek Norwegian Welsh
Haitian Creole Persian  
  TRANSLATE with COPY THE URL BELOW Back EMBED THE SNIPPET BELOW IN YOUR SITE Enable collaborative features and customize widget: Bing Webmaster Portal Back     此页面的语言为中文(简体)   翻译为        
  • 中文(简体)
  • 中文(繁体)
  • 丹麦语
  • 乌克兰语
  • 乌尔都语
  • 亚美尼亚语
  • 俄语
  • 保加利亚语
  • 克罗地亚语
  • 冰岛语
  • 加泰罗尼亚语
  • 匈牙利语
  • 卡纳达语
  • 印地语
  • 印尼语
  • 古吉拉特语
  • 哈萨克语
  • 土耳其语
  • 威尔士语
  • 孟加拉语
  • 尼泊尔语
  • 布尔语(南非荷兰语)
  • 希伯来语
  • 希腊语
  • 库尔德语
  • 德语
  • 意大利语
  • 拉脱维亚语
  • 挪威语
  • 捷克语
  • 斯洛伐克语
  • 斯洛文尼亚语
  • 旁遮普语
  • 日语
  • 普什图语
  • 毛利语
  • 法语
  • 波兰语
  • 波斯语
  • 泰卢固语
  • 泰米尔语
  • 泰语
  • 海地克里奥尔语
  • 爱沙尼亚语
  • 瑞典语
  • 立陶宛语
  • 缅甸语
  • 罗马尼亚语
  • 老挝语
  • 芬兰语
  • 英语
  • 荷兰语
  • 萨摩亚语
  • 葡萄牙语
  • 西班牙语
  • 越南语
  • 阿塞拜疆语
  • 阿姆哈拉语
  • 阿尔巴尼亚语
  • 阿拉伯语
  • 韩语
  • 马尔加什语
  • 马拉地语
  • 马拉雅拉姆语
  • 马来语
  • 马耳他语
  • 高棉语
 

标签:bin,Kafka,发送,sh,线程,kafka
From: https://www.cnblogs.com/gaoyuechen/p/17702692.html

相关文章

  • KafKa概述
    概述KafKa就是一个消息队列:作用概况为:解耦、异步、削峰https://juejin.cn/post/6996826368512098317使用消息队列的好处解耦(类似Spring的IOC)允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。可恢复性系统的一部分组件失效时,不会影响到整个......
  • RabbitMQ、RocketMQ和Kafka的不同之处
    RabbitMQ、RocketMQ和Kafka是三种常见的消息队列系统,它们在设计和使用方面有一些不同之处:架构设计:RabbitMQ:RabbitMQ是一个基于AMQP(高级消息队列协议)的开源消息队列系统,采用的是传统的Broker架构模式,其中包括生产者、消费者和中间件(Broker)。RocketMQ:RocketMQ是一个基于分布式......
  • OGG-Postgres实时同步到Kafka
    (一)数据同步信息名称源端名称目标端数据库类型Postgresql12.4组件类型KafkaIP地址20.2.127.23Broker地址20.2.125.52:9092,20.2.127.23:9092,20.2.127.24:9092端口5432端口9092数据库testpdbZookeeperHa......
  • SpringBoot-Learning系列之Kafka整合
    SpringBoot-Learning系列之Kafka整合本系列是一个独立的SpringBoot学习系列,本着WhatWhyHow的思想去整合Java开发领域各种组件。消息系统主要应用场景流量消峰(秒杀抢购)、应用解耦(核心业务与非核心业务之间的解耦)异步处理、顺序处理实时数据传输管道异构语言架构......
  • kafka3.x 简单使用
    ***保证kafka和zookeeper已经在linux上进行了安装,目录需要改为自己的目录 ***kafka2.8之后引入了kraft机制,不用zookeeper也能启动参数介绍 --create创建一个topic --topic[your_topic_name]创建的topic的信息 --describe描述信息 --bootstrap-server[host_url......
  • 【Kafka】ZooKeeper启动失败报错java.net.BindException_ Address already in use_ bi
    问题描述Kafka2.8.1ZooKeeper启动失败。zookeeper-server-start.bat../../config/zookeeper.properties[2023-09-0418:21:49,497]INFObindingtoport0.0.0.0/0.0.0.0:2181(org.apache.zookeeper.server.NIOServerCnxnFactory)[2023-09-0418:21:49,498]ERRORUnexpected......
  • Strimzi从入门到精通系列之三:部署Kafka Connect
    Strimzi从入门到精通系列之三:部署KafkaConnect一、概述二、将KafkaConnect部署到Kubernetes集群三、KafkaConnect配置四、为多个实例配置KafkaConnect五、添加连接器六、自动使用连接器插件构建新的容器映像七、使用KafkaConnect基础镜像中的连接器插件构建新的容器镜......
  • Strimzi从入门到精通系列之二:部署Kafka
    Strimzi从入门到精通系列之二:部署Kafka一、认识Strimzi二、Strimzi的核心知识点三、Kafka集群、TopicOperator、UserOperator四、部署Kafka集群五、使用ClusterOperator部署TopicOperator六、使用ClusterOperator部署UserOperator一、认识StrimziStrimzi是一款用于在......
  • rdkafka编译
    1、下载源码rakafka源码https://github.com/confluentinc/librdkafka 2、安装openssl,可以参考https://www.cnblogs.com/ho966/p/15916018.html3、如果要支持gssapi,需要依赖cyrus-sasl和krb53.1安装krb5下载源码https://kerberos.org/dist/krb5/1.20/krb5-1.2......
  • OGG-将PostgreSQL通过OGG_BigData同步到Kafka后数据存在8小时时间差
    问题描述:将PostgreSQL通过OGG_BigData同步到Kafka后数据存在8小时时间差。 问题原因:kafka.properties中的参数goldengate.userexit.timestamp=utc解决办法:修改kafka.properties中的参数goldengate.userexit.timestamp为utc+8,然后重启目标端replicat进程。 ......