首页 > 其他分享 >SpringBoot整合Kafka

SpringBoot整合Kafka

时间:2023-07-01 20:55:52浏览次数:49  
标签:SpringBoot springframework kafka value 整合 import org docker Kafka

1、安装kafka

这里我是用的是docker-compose方式安装

(1) 安装docker和docker-compose
sudo yum install -y yum-utils

sudo yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/centos/docker-ce.repo

sudo yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

sudo systemctl enable docker --now

#测试工作
docker ps
#  批量安装所有软件
docker compose
(2) docker-compose.yml
version: '3.9'

services:

  zookeeper:
    image: bitnami/zookeeper:latest
    container_name: zookeeper
    restart: always
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - backend

  kafka:
    image: bitnami/kafka:3.4.0
    container_name: kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - backend
  # kafka-ui界面
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name:  kafka-ui
    restart: always
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: dev
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    networks:
      - backend

networks:
  backend:
    name: backend
(3) 一键启动
docker compose -f docker-compose.yml up -d

image

2、SpringBoot整合Kafka

(1) 基础环境搭建

image

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ly</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.0</version>
    </parent>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

配置文件

server:
  port: 9000
spring:
  kafka:
    bootstrap-servers:
      - 192.168.126.159:9092
    producer:
      # key的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value的序列化方式
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
(2) 发消息

KafkaTest

package com.ly;

import com.ly.message.Person;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.StopWatch;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @author ly (个人博客:https://www.cnblogs.com/ybbit)
 * @date 2023-07-01  14:03
 * @tags 喜欢就去努力的争取
 */
@SpringBootTest
public class KafkaTest {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送字符串
     */
    @Test
    public void testSend() {
        // 开启计时
        StopWatch stopWatch = new StopWatch();

        CompletableFuture[] futures = new CompletableFuture[10000];

        stopWatch.start();
        for (int i = 0; i < 10000; i++) {
            CompletableFuture future = kafkaTemplate.send("news", "ly-" + i, "我的宝" + i);
            futures[i] = future;
        }
        CompletableFuture.allOf(futures).join(); // join
        stopWatch.stop();
        long time = stopWatch.getTotalTimeMillis();
        System.out.println("耗费的时间为:" + time + "ms ");
    }

    /**
     * 发送对象:
     *      这里需要配置value的序列化方式,默认是StringSerializer, 会出现报错
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void sendObject() throws ExecutionException, InterruptedException {
        CompletableFuture future = kafkaTemplate.send("news2", "obj-1", new Person("ly", 25));
        System.out.println("future.get() = " + future.get());
    }

}

注意:这里启动会报一个错,找不到对应的kafka主机
image

解决办法:修改hosts文件(一定要以管理员方式修改)
image

提示:hosts文件所在位置

C:\Windows\System32\drivers\etc

测试结果
image

(3) 消费消息

LyKafkaListener

package com.ly.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

/**
 * @author ly (个人博客:https://www.cnblogs.com/ybbit)
 * @date 2023-07-01  14:57
 * @tags 喜欢就去努力的争取
 */
@Component
public class LyKafkaListener {

    /**
     * 默认监听是和消息队列的最后一个消息开始
     *
     * @param consumerRecord
     */
    @KafkaListener(topics = "nows", groupId = "ly")
    public void listener1(ConsumerRecord consumerRecord) {
        String topic = consumerRecord.topic();
        System.out.println("其他信息 ============ topic = " + topic);

        Object key = consumerRecord.key();
        Object value = consumerRecord.value();
        System.out.println("listener1 接收消息 ============ key =  value :" + key + "=" + value);
    }

    /**
     * 获取完整消息
     */
    @KafkaListener(groupId = "baobao", topicPartitions = {
            @TopicPartition(topic = "nows",
                    partitionOffsets = {
                            @PartitionOffset(partition = "0", initialOffset = "0")
                    })
    })
    public void listener2(ConsumerRecord consumerRecord) {
        Object key = consumerRecord.key();
        Object value = consumerRecord.value();
        System.out.println("listener2 接收消息 ============ key =  value :" + key + "=" + value);
    }
}

注意:需要开启Kafka注解驱动功能
image

测试结果
image

更多详细的用法参考SpringBoot整合Kafka官方文档

标签:SpringBoot,springframework,kafka,value,整合,import,org,docker,Kafka
From: https://www.cnblogs.com/ybbit/p/17519906.html

相关文章

  • 18、【SparkStreaming】object not serializable (class: org.apache.kafka.clients.c
    背景:当SparkStream连接kafka,消费数据时,报错:objectnotserializable(class:org.apache.kafka.clients.consumer.ConsumerRecord,value:ConsumerRecord分析:消费者的消费记录序列化出现了问题,需要正确的进行序列化。措施:在设置sparkconf的时候,指定序列化方式就可以解......
  • SpringBoot3.0最新深入浅出从入门到项目实战,突出Web应用痛点解决方案
    SpringBoot3.0最新深入浅出从入门到项目实战,突出Web应用痛点解决方案SpringBoot已经成为Java开发中最流行的框架之一,它提供了一种快速构建、易于扩展的方式,使开发人员能够更加专注于业务逻辑而不是繁琐的配置。而最新的SpringBoot3.0版本将进一步改善开发体验,并提供更多的解决方......
  • SpringBoot 如何优雅的进行全局异常处理?
    在SpringBoot的开发中,为了提高程序运行的鲁棒性,我们经常需要对各种程序异常进行处理,但是如果在每个出异常的地方进行单独处理的话,这会引入大量业务不相关的异常处理代码,增加了程序的耦合,同时未来想改变异常的处理逻辑,也变得比较困难。这篇文章带大家了解一下如何优雅的进行全局异......
  • 绝无仅有的SpringBoot前后端分离项目《盈利宝》
    每天都在制造矛盾并解决矛盾的路上程序员的主要矛盾不是书和资料多不多的矛盾而是学着学着发现知识又更新了时间就像一台永不停歇的永动机向前不停地运作年初的flag渐行渐远管他前浪,还是后浪?能浪的浪,才是好浪!今天带你解锁 Springboot+Vue项目花了很长时间从头到尾从无到有地完......
  • 绝无仅有的SpringBoot+Vue前后端分离项目《盈利宝》
    ​每天都在制造矛盾并解决矛盾的路上程序员的主要矛盾不是书和资料多不多的矛盾而是学着学着发现知识又更新了时间就像一台永不停歇的永动机向前不停地运作年初的flag渐行渐远管他前浪,还是后浪?能浪的浪,才是好浪!今天带你解锁 Springboot+Vue项目花了很长时间从头到尾......
  • 【springboot】最初入门
    SpringBoot不需要配置容器,是因为使用了嵌入式容器,默认使用tomcat启动,默认端口8080。当然,用传统的方式打成war包,放入单独的tomcat也是可以的。SpringBoot项目使用main函数启动,一般放在XXXApplication类里,需要加@SpringBootApplication注解MavenWrapper可以不需要提前下载好Mave......
  • springboot整合mongodb
    文章目录自己的源码新建项目新建测试类基于MongoRepository(推荐)(个人推荐,简单,方便)基于Respository测试类:(了解)基于MongoTemplate的测试类自己的源码https://gitee.com/stackR/springboot-mongodb新建项目新建springboot项目,引入,spring-boot-starter-data-mongodb和lombo......
  • mybatis-plus springboot无法创建bean
    在学习尚硅谷的mybatis-plus中,发现依托代码无法创建userMapperbean,在网上找了各种办法,终于是找到了一个大无语的办法。只要将springboot的版本主动降到2.x.x之后便可以创建。可能mp是国人写的目前对springboot3.x.x并没有做适配,但是更进一步的方法正在思想中。packagecom.exam......
  • 白话Kafka
    一、Kafka基础消息系统的作用应该大部份小伙伴都清楚,用机油装箱举个例子所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存,并且实现解耦合的作用。引入一个场景,我们知道中国移动,中国联通,中国电信的日志处理,是交给外包去做大数据分析的,假设现在它们的日志都交给了你做的系......
  • 白话Kafka
     一、Kafka基础消息系统的作用应该大部份小伙伴都清楚,用机油装箱举个例子 所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存,并且实现解耦合的作用。引入一个场景,我们知道中国移动,中国联通,中国电信的日志处理,是交给外包去做大数据分析的,假设现在它们的日志都交......