首页 > 其他分享 >Kafka篇

Kafka篇

时间:2024-05-23 10:54:43浏览次数:17  
标签:--------------------------------------------------------------------------------

----------------------------------------------------------------------------------------
消息队列分类:
点对点(Queue),将消息发送到queue中,可以有多个消费者,但一个消息只能被一个消费者消费。
发布与订阅(Topic),将消息发布到topic(主题)中,有多个消费者订阅某消息。发布到topic中的消息会被所有订阅者消费,kafka处理topic类型的消息
----------------------------------------------------------------------------------------
Kafka作用:
发送通知;采集日志;监控性能(如cpu利用率、磁盘利用率等)
----------------------------------------------------------------------------------------
一个Kafka可以有多个Broker,Broker是服务总线,Kafka集群的所有服务注册到Zookeeper集群中,一个Kafka宕机,Zookeeper可以将其信息同步到其他服务中。
----------------------------------------------------------------------------------------
基本概念:
Topic:Kafka处理的消息源
Partition(分区):Topic物理上的分组,一个Topic多个Partition
Message:消息,通信的基本单位
Producer:生产者,向Kafka的一个Topic发布消息的过程叫生产
Consumer:消费者,订阅Topic并处理其发布的消息的过程叫消费
Broker:缓存代理,Kafka的服务总线
----------------------------------------------------------------------------------------
修改配置文件config/server.properties,指定日志路径log.dirs和zookeeper连接地址zookeeper.connect=ip:port,ip:port,ip:port,先启动zookeeper集群,在启动kafka,如下:
启动kafka:bin/kafka-server-start.sh config/server.properties &
关闭kafka:bin/kafka-server-stop.sh config/server.properties
查看进程jps
----------------------------------------------------------------------------------------
单机多broker:复制一份配置文件,改个日志目录,改个broker.id和端口号port,根据配置文件启动多个broker即可。
----------------------------------------------------------------------------------------
多机多broker,集群:一样,确保broker.id不一致就行了
----------------------------------------------------------------------------------------
自带命令:kafka-console-producer.sh是生产者脚本、kafka-console-consumer.sh是消费者脚本,可以用脚本创建topic,发送、接收消息。
----------------------------------------------------------------------------------------
Kafka的元数据在ZooKeeper中存储结构:
consumers:消费者信息
config:Kafka的配置信息
controller:存储中央控制器的broker信息
admin:管理信息
brokers:broker元信息,topic信息
----------------------------------------------------------------------------------------
Kafka的核心配置:
broker.id =0,集群中的唯一标识,即使ip变了,但是broker.id没变,不会影响消息的消费;
日志、端口、最大处理线程、zookeeper.connect = ip:端口,zookeeper超时时间,leader和follower同步时间等
----------------------------------------------------------------------------------------
java连接kafka:导入所需jar包,解压Kafka后,服务器上有。
生产者主要代码:new一个Properties,给它put一个zookeeper.connect,key是zookeeper.connect,value是ip:port字符串,逗号分隔,即指定zookeeper地址。put一个metadata.broker.list,value是ip:port,通过Properties,声明了zookeeper地址和broker地址。通过Producer producer=new Producer(new ProducerConfig(prop));创建一个Producer实例。发送消息:producer.send(new KeyedMessage(this.topic, "你好,这是发送的消息,哈哈哈!"));
消费者主要代码:new一个Properties,给它put一个zookeeper.connect,key是zookeeper.connect,value是ip:port字符串,逗号分隔,即指定zookeeper地址。指明消费者组,prop.put("group.id", "group1");指明zookeeper连接超时时间prop.put("zookeeper.connection.timeout.ms", "60000");然后创建一个消费者连接ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));通过这个连接获取消费者,ConsumerConnector consumer = createConsumer();
消费者获取数据:consumer.createMessageStreams,代码如下
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(this.topic, 1);//一次从这个topic中获取一条记录
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get(this.topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("接受数据:" + message);
}
重点,key-value,Kafka流的List,通过topic获得。本来是一个map,topic是key。获得Kafka流,进而获得数据。
坑:关闭生产者和消费者,重新打开消费者会有重复数据,消费过的数据再接收一遍,这块需要和已接收的数据比对,否则数据有问题。
配置Flume,当有日志文件变化时,Flume会马上读取内容,然后将内容发送给Kafka,需要在Flume的配置文件中配置Kafka的ip和端口,Kafka接收的数据可以写到Hadoop中。
----------------------------------------------------------------------------------------

SpringBoot整合Kafka:
引入spring-kafka依赖
配置Kafka地址,可以多个如spring.kafka.bootstrap-servers=localhost:9092
配置生产者,指定每次批量发送消息的数量,指定消息key和消息体的编解码方式
配置消费者,指定group id,指定消息key和消息体的编解码方式
发送消息:注入KafkaTemplate,执行send方法
kafkaTemplate.send("user", JSON.toJSONString(封装的对象实体));其中user是topic
接收消息:接收方法上加注解@KafkaListener,指定topic,通过ConsumerRecord参数获取,如下
@Component
public class Consumer {
@KafkaListener(topics = "user")
public void consumer(ConsumerRecord consumerRecord){
Optional<Object> kafkaMassage = Optional.ofNullable(consumerRecord.value());
if(kafkaMassage.isPresent()){
Object o = kafkaMassage.get();
System.out.println(o);
}
}
}
----------------------------------------------------------------------------------------

 

标签:--------------------------------------------------------------------------------
From: https://www.cnblogs.com/javasl/p/13435775.html

相关文章

  • kafka再学习,上海银行面试后
    卡夫卡消费者  offset值,消费到哪里了呢?之前存储在zookeeper,后面kafka保存在一个主题里,并持久化到硬盘,相当安全消费者组ID用命令行创建会默认给你一个  cororifnator协调器,分区初始化对主题数50取模,选择有一个corrdinator超高频面试题再平衡 保持3秒的心跳,na......
  • Java核心面试知识集—Kafka面试题
    目录基础篇1、TCP、UDP的区别?2、TCP协议如何保证可靠传输?3、TCP的握手、挥手机制?4、TCP的粘包/拆包原因及其解决方法是什么?5、Netty的粘包/拆包是怎么处理的,有哪些实现?6、同步与异步、阻塞与非阻塞的区别?7、说说网络IO模型?8、BIO、NIO、AIO分别是什么?9、select、poll、epoll的机制......
  • Flink同步kafka到iceberg数据延迟,两个checkpoint后才可查询
    一、问题描述用户配置了高级参数很多,观察kafka增量数据不多,flink负载不高情况下两个checkpoint后才可查询到数据。  排查时hdfs有数据文件产生,但是mainfast文件中最新快照id没变化。 二、原因经腾讯排查,用户参数指定高级参数execution.checkpointing.unaligned:true引起......
  • kafka权威指南
    消息有字节数组组成消息Key也是一个字节数组根据消息key的哈希码进行取模后写入不同分区,保证具有相同key的消息总是被写入到相同分区中为了提供消息写入效率,支持消息的分批次写入,批次就是一组消息,每个批次的消息输入同一个主体和分区为了减少网络消耗,批次数据会被压缩kakfa使......
  • kafka安装(windows)
    首先安装zookeeper,kafka启动前需要先启动Zookeeper。1、官网下载kafka,解压https://kafka.apache.org/downloads2、配置安装1、进入D:\Study_Tool\kafka\config目录,找到zookeeper.properties文件。创建一个data目录,用于存放zookeeper的数据文件。2、找到server.properties......
  • Flink同步kafka到iceberg(cos存储)
    一、flink到logger1、sourcecreatetablesource_table(idbigintcomment'唯一编号',order_numberbigintcomment'订单编号',update_timestamptimestamp_ltzmetadatafr......
  • 教你如何搞定springboot集成kafka
    本文分享自华为云社区《手拉手入门springboot+kafka》,作者:QGS。安装kafka启动Kafka本地环境需Java8+以上Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用......
  • offsetExplorer3.0 如何连接加SASL认证的zookeeper、kafka
    offsetExplorer3.0连接速度与查看topic、consumers查询速度显著提升。建议使用offsetExplorer3.0代替旧版offsetExploreroffsetExplorer3.0下载地址:https://www.kafkatool.com/download.html配置方式如下:注意:zookeeper和kafka的地址、端口,可以二选一,只配置一个,也可以全配置。......
  • kafka_2.13-3.7.0 单机版安装
    [root@localhost~]#adduserkafka[root@localhost~]#passwordkafka-bash:password:commandnotfound[root@localhost~]#ls/homejenkinskafkanacos[root@localhost~]#passwdkafkaChangingpasswordforuserkafka.Newpassword:BADPASSWORD:The......
  • kafka数据一致性
    kafka作为商业级中间件,它在设计时优先考虑的可靠性、可用性,同时兼顾一致性,这是所有分布式都会遇到的cap理论,kafka也不例外;可靠性通过副本机制解决,可用性通过leader和follower机制来解决。   kafka的可靠性,根据ack的设置不同,可靠性不同,ack=-1可靠性最高,但效率会稍微低一点。......