首页 > 其他分享 >springboot - kafka实践

springboot - kafka实践

时间:2023-07-19 12:55:20浏览次数:47  
标签:springboot 实践 kafka 消息 key import Kafka properties

Kafka是一个开源的分布式流处理平台,由Apache软件基金会开发和维护。它是一种高性能、可持久化、可扩展的消息队列系统,常用于解决大规模数据传输和处理的问题。

以下是Kafka的一些核心概念和主要特点:

  1. 消息和主题:Kafka基于发布订阅模式,消息被发布到一个或多个主题(Topic)中。每条消息都有一个唯一的标识符称为偏移量(Offset)。

  2. 分区和副本:每个主题可以被分为一个或多个分区(Partition),每个分区在不同的Kafka节点上存储副本(Replica)。分区和副本提供了高可用性和容错性。

  3. 生产者和消费者:生产者(Producer)将消息发布到主题,消费者(Consumer)从主题订阅并消费消息。

  4. 流处理:Kafka支持流处理,即可以实时处理和转换数据流,支持复杂的流处理逻辑。

  5. 可扩展性:Kafka具有良好的可扩展性,可以通过添加更多的节点来增加处理能力和存储容量。

  6. 持久化:Kafka使用日志式存储结构,将所有的消息持久化到磁盘上,从而保证数据的可靠性和持久性。

Kafka在很多场景中被广泛使用,例如实时日志收集、事件驱动架构、微服务通信、大数据处理等。它具有高吞吐量、低延迟、可靠性强的特点,并且提供了一系列的客户端库和工具来简化使用和集成。

一、引入依赖

1     <dependency>
2       <groupId>org.springframework.kafka</groupId>
3       <artifactId>spring-kafka</artifactId>
4     </dependency>

二、生产消息

 1 import org.apache.kafka.clients.producer.KafkaProducer;
 2 import org.apache.kafka.clients.producer.ProducerRecord;
 3 import org.apache.kafka.common.serialization.StringSerializer;
 4 
 5 import java.util.Properties;
 6 
 7 /**
 8  * @Classname KafkaProducer
 9  * @Created by Michael
10  * @Date 2023/7/19
11  * @Description 消息生产者
12  */
13 public class MessageProducer {
14   public static void main(String[] args) {
15     Properties properties = new Properties();
16     properties.setProperty("bootstrap.servers","192.168.3.100:9092");// kafka部署服务器
17     properties.setProperty("key.serializer", StringSerializer.class.getName());
18     properties.setProperty("value.serializer", StringSerializer.class.getName());
19     KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
20 
21     //准备消息
22     String topic = "message"; // 主题
23     Integer partition = 0; // 指定分区
24     long timeMillis = System.currentTimeMillis(); // 毫秒值 15分钟
25     String key = "key-message"; // key
26     String value = "value-this is kafka demo -- "+timeMillis; // value
27 
28     ProducerRecord<String, String> record
29             = new ProducerRecord<>(topic, partition, timeMillis, key, value);
30 
31     //生产消息
32     producer.send(record);
33     producer.close();
34   }
35 
36 }

三、消费消息

 1 import org.apache.kafka.clients.consumer.ConsumerRecords;
 2 import org.apache.kafka.clients.consumer.KafkaConsumer;
 3 import org.apache.kafka.common.serialization.StringDeserializer;
 4 
 5 import java.util.Arrays;
 6 import java.util.Properties;
 7 
 8 /**
 9  * @Classname MessageConsumer
10  * @Created by Michael
11  * @Date 2023/7/19
12  * @Description 消息消费者
13  */
14 public class MessageConsumer {
15   public static void main(String[] args) {
16     Properties properties = new Properties();
17     properties.setProperty("bootstrap.servers","192.168.3.100:9092");
18     properties.setProperty("group.id", "group-1");
19     properties.setProperty("key.deserializer", StringDeserializer.class.getName());
20     properties.setProperty("value.deserializer", StringDeserializer.class.getName());
21 
22     //创建kafka消费者
23     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
24     //订阅kafka主题
25     consumer.subscribe(Arrays.asList("message"));
26 
27     //持续消费消息
28     ConsumerRecords<String, String> messages;
29 //    while (true) {
30       messages = consumer.poll(1000);//每一秒拉取依次消息
31       if (messages != null && messages.count() > 0) {
32         messages.forEach((record) -> {
33           System.out.printf("========offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
34         });
35 //      }
36     }
37 
38   }
39 }

四、测试结果

运行producer发布消息到kafka,consumer因为订阅了消息,所以从kafka中拉取到消息内容

 注意,demo没做特殊处理,所以这里的消息时可以被重复消费的

标签:springboot,实践,kafka,消息,key,import,Kafka,properties
From: https://www.cnblogs.com/lfhappy/p/17565295.html

相关文章

  • docker-compose 安装springboot
    DockerCompose安装SpringBoot在开发和部署SpringBoot应用程序时,使用DockerCompose是一个非常方便和高效的方法。DockerCompose可以帮助我们定义和运行多个Docker容器,从而简化了应用程序的部署过程。本文将介绍如何使用DockerCompose安装和运行一个简单的Spring......
  • SpringBoot + Sharding JDBC 分库分表
    Sharding-JDBC最早是当当网内部使用的一款分库分表框架,到2017年的时候才开始对外开源,这几年在大量社区贡献者的不断迭代下,功能也逐渐完善,现已更名为ShardingSphere,2020年4⽉16日正式成为Apache软件基金会的顶级项目。ShardingSphere-Jdbc定位为轻量级Java框架,在Java的Jdbc层提......
  • 一、创建springboot项目
    1.创建父项目创建新工程父工程无需选择依赖 2.创建第一个子模块选择springweb依赖2.1.项目结构 2.2.pom.xml文件<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/......
  • springBoot 2.7.x整合 swagger2.9
    1.添加依赖<!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dep......
  • Java的SPI机制实践
    JavaSPI机制概述先给出结论:“Java的SPI是一种服务发现机制,用于约定接口和动态发现实现类,体现了分层解耦的思想”。Java的SPI机制常用于框架扩展或组件替换,最常见的JavaSPI应用就是JDBCDriver,JDK提供了java.sql.Driver接口,却将具体的实现交给了相应的数据库驱动,比如:在mysql-co......
  • springboot开启jdk虚拟线程
    修改编译参数<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><compilerArgs>--enable-preview</compilerArgs></configura......
  • 六月学习之Haproxy ACL实践(基于路径调度)
    2、ACL场景实践2.3、ACL案例-基于路径调度#根据用户请求的URL。调度到不同的后端集群用户通过/static调度到172.16.1.7:80用户请求/user调度到172.16.1.8:802.3.1、配置后端节点#web1:/static站点定义cat/etc/nginx/conf.d/www.qingchen.com.confserver{listen80;......
  • Sealos Web UI 公有云部署实践
    不管部署任何开源的产品,首先看他的官网文档或者github的readme、wiki等等这是 sealosgithub readmehttps://github.com/labring/sealos/blob/main/deploy/cloud/README.md  一、准备一台服务器你可以使用自己本地电脑安装vmware创建一台虚拟机,操作系统Ubuntu我这里......
  • springboot下使用rabbitMQ之开发配置方式(二)
    springboot下使用rabbitMQ之传参及序列化(二)消息参数传递在开发中也是个坑,不论使用内置的SimpleMessageConverter还是Jackson2JsonMessageConverter均无法让Consumer接收动态参数一.序列化的问题首先贴出具体代码以及测试用例:消费者@RabbitListener(queues="text.q......
  • Zipkin链路监控实践
    Zipkin是一种开源的分布式链路追踪系统,可以用于监控和跟踪微服务架构中的请求调用链。它可以帮助定位和解决分布式系统中的延迟问题,提供对请求的可视化跟踪和监控。一、引入依赖      使用项目来启动Zipkin,创建一个springbootweb项目添加zipkin依赖1<groupId>co......