首页 > 其他分享 >第三章 Spring Boot 整合 Kafka消息队列 消息者

第三章 Spring Boot 整合 Kafka消息队列 消息者

时间:2024-01-23 13:24:32浏览次数:29  
标签:Spring Boot private kafka com org import Kafka consumer

 前言

        Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的Kafka-client,用于在Spring Boot 项目里快速集成kafka。


 

一、Kafka 是什么?

Apache Kafka是分布式发布-订阅消息系统。
它最初由LinkedIn公司开发,之后成为Apache项目的一部分。
Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

二、消息者

 1.引入库

引入需要依赖的jar包,引入POM文件

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>

 

2.配置文件

spring:
  custom:
    kafka:
      username: admin
      password: admin-secret
      partitions: 1
      enable-auto-commit: false
      topics: CHANNEL-BodyBusiness-dataDev,CHANNEL-BodyBusiness-pushDev
      groupId: consumer-group-lms-dev
      batch-listener: false
      bootstrap-servers:
        - 192.168.1.95:9092

3.端启动类

启动类名 EnableAutoKafkaClient
package com.cdkjframework.kafka.consumer.annotation;

import com.cdkjframework.kafka.consumer.config.KafkaClientMarkerConfiguration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer.annotation
 * @ClassName: EnableAutoKafkaClient
 * @Description: Kafka客户端自动启动类
 * @Author: xiaLin
 * @Date: 2023/7/18 9:20
 * @Version: 1.0
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({KafkaClientMarkerConfiguration.class})
public @interface EnableAutoKafkaClient {
}



4.spring.factories配置文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.cdkjframework.kafka.consumer.config.KafkaClientAutoConfiguration

5.配置类

5.1 Kafka客户端配置

package com.cdkjframework.kafka.consumer.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer.config
 * @ClassName: KafkaClientConfig
 * @Description: Kafka客户端配置
 * @Author: xiaLin
 * @Version: 1.0
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.custom.kafka")
public class KafkaClientConfig {

  /**
   * 服务列表
   */
  private List<String> bootstrapServers;

  /**
   * 主题
   */
  private List<String> topics;

  /**
   * 账号
   */
  private String username;

  /**
   * 密码
   */
  private String password;

  /**
   * 延迟为1毫秒
   */
  private Integer linger = 1;

  /**
   * 批量大小
   */
  private Integer batchSize = 16384;

  /**
   * 重试次数,0为不启用重试机制
   */
  private Integer retries = 0;

  /**
   * 人锁
   */
  private Integer maxBlock = 6000;

  /**
   * acks
   */
  private String acks = "1";

  /**
   * security.providers
   */
  private String securityProviders;

  /**
   * 启用自动提交
   */
  private boolean enableAutoCommit = true;

  /**
   * 会话超时
   */
  private String sessionTimeout = "5000";

  /**
   * 会话超时
   */
  private Integer maxPollInterval = 10000;

  /**
   * 组ID
   */
  private String groupId = "defaultGroup";

  /**
   * 最大投票记录
   */
  private Integer maxPollRecords = 1;

  /**
   * 并发性
   */
  private Integer concurrency = 3;

  /**
   * 拉取超时时间
   */
  private Integer pollTimeout = 60000;

  /**
   * 批量监听
   */
  private boolean batchListener = false;

  /**
   * 副本数量
   */
  private String sort = "1";

  /**
   * 分区数
   */
  private Integer partitions = 3;

  /**
   * 消费者默认支持解压
   */
  private String compressionType = "none";

  /**
   * offset偏移量规则设置
   */
  private String autoOffsetReset = "earliest";

  /**
   * 自动提交的频率
   */
  private Integer autoCommitInterval = 100;

  /**
   * 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
   */
  private Integer bufferMemory = 33554432;

  /**
   * 消息的最大大小限制
   */
  private Integer maxRequestSize = 1048576;
}

5.2 Kafka客户端自动配置

package com.cdkjframework.kafka.consumer.config;

import com.cdkjframework.kafka.consumer.ConsumerConfiguration;
import com.cdkjframework.kafka.consumer.service.ConsumerService;
import com.cdkjframework.kafka.consumer.listener.ConsumerListener;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.reactive.function.client.WebClientAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer.config
 * @ClassName: KafkaClientAutoConfiguration
 * @Description: Kafka客户端自动配置
 * @Author: xiaLin
 * @Date: 2023/7/18 9:21
 * @Version: 1.0
 */
@Lazy(false)
@RequiredArgsConstructor
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KafkaClientConfig.class)
@AutoConfigureAfter({WebClientAutoConfiguration.class})
@ImportAutoConfiguration(ConsumerConfiguration.class)
@ConditionalOnBean(KafkaClientMarkerConfiguration.Marker.class)
public class KafkaClientAutoConfiguration {

  /**
   * 消费者服务接口
   */
  private final ConsumerService consumerService;

  /**
   * kafka topic 启动触发器
   *
   * @return 返回结果
   */
  @Bean
  @ConditionalOnMissingBean
  public ConsumerListener kafkaConsumer() {
    return new ConsumerListener(consumerService);
  }
}



5.3 Kafka客户端标记配置

package com.cdkjframework.kafka.consumer.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer.config
 * @ClassName: KafkaClientMarkerConfiguration
 * @Description: Kafka客户端标记配置
 * @Author: xiaLin
 * @Date: 2023/12/6 13:11
 * @Version: 1.0
 */
@EnableKafka
@Configuration(proxyBeanMethods = false)
public class KafkaClientMarkerConfiguration {

  @Bean
  public Marker kafkaMarker() {
    return new Marker();
  }

  public static class Marker {

  }
}



6.消费者配置

package com.cdkjframework.kafka.consumer;

import com.cdkjframework.kafka.consumer.config.KafkaClientConfig;
import com.cdkjframework.util.log.LogUtils;
import com.cdkjframework.util.tool.JsonUtils;
import com.cdkjframework.util.tool.StringUtils;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer
 * @ClassName: ProducerConfiguration
 * @Description: 设置@Configuration、@EnableKafka两个注解,声明Config并且打开KafkaTemplate能力。
 * @Author: xiaLin
 * @Version: 1.0
 */
@Configuration
@RequiredArgsConstructor
public class ConsumerConfiguration {

  /**
   * 日志
   */
  private final LogUtils logUtils = LogUtils.getLogger(ConsumerConfiguration.class);

  /**
   * JAAS配置
   */
  private String JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=%s password=%s;";

  /**
   * 配置
   */
  private final KafkaClientConfig kafkaClientConfig;

  /**
   * 监听容器工厂
   *
   * @return 返回结果
   */
  @Bean(name = "kafkaListenerContainerFactory")
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String>
            factory = new ConcurrentKafkaListenerContainerFactory<>();
    // 设置消费者工厂
    factory.setConsumerFactory(consumerFactory());
    // 消费者组中线程数量
    factory.setConcurrency(kafkaClientConfig.getConcurrency());
    // 拉取超时时间
    factory.getContainerProperties().setPollTimeout(kafkaClientConfig.getPollTimeout());
    // 当使用批量监听器时需要设置为true
    factory.setBatchListener(kafkaClientConfig.isBatchListener());
    // 将单条消息异常处理器添加到参数中
    factory.setErrorHandler(new ConsumerAwareErrorHandler() {
      @Override
      public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        logUtils.error("// 将单条消息异常:" + thrownException.getMessage());
        logUtils.error("// 将单条消息:" + data.toString() + "," + consumer.toString());
        Iterator<TopicPartition> iterator = consumer.assignment().iterator();
        if (iterator.hasNext()) {
          // 提交重新消费
          consumer.seek(iterator.next(), data.offset());
        }
      }
    });
    if (kafkaClientConfig.isBatchListener()) {
      // 将批量消息异常处理器添加到参数中
      factory.setBatchErrorHandler(new BatchErrorHandler() {
        @Override
        public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
          logUtils.error("// 将批量消息异常:" + thrownException.getMessage());
          logUtils.error(thrownException);
          logUtils.error(JsonUtils.objectToJsonString(data));
        }
      });
    }
//        factory.setContainerCustomizer();

    return factory;
  }

  /**
   * 消费者工厂
   *
   * @return 返回消费工厂
   */
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
  }

  /**
   * 消费者配置
   *
   * @return 返回结果
   */
  @Bean
  public Map<String, Object> consumerConfig() {
    Map<String, Object> propsMap = new HashMap<>();
    // Kafka地址
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClientConfig.getBootstrapServers());
    //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaClientConfig.getGroupId());
    // 是否自动提交offset偏移量(默认true)
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaClientConfig.isEnableAutoCommit());
    // 心跳机制
    propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaClientConfig.getMaxPollInterval());
    // 每次读取最大记录
    propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaClientConfig.getMaxPollRecords());
    if (kafkaClientConfig.isEnableAutoCommit()) {
      // 自动提交的频率(ms)
      propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaClientConfig.getAutoCommitInterval());
    }
    // 键的反序列化方式
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // 值的反序列化方式
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // offset偏移量规则设置:
    // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaClientConfig.getAutoOffsetReset());

    // 安全认证 账号密码
    if (StringUtils.isNotNullAndEmpty(kafkaClientConfig.getUsername()) &&
            StringUtils.isNotNullAndEmpty(kafkaClientConfig.getPassword())) {
      propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
      String SASL_MECHANISM = "PLAIN";
      propsMap.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM);
      propsMap.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG, kafkaClientConfig.getUsername(), kafkaClientConfig.getPassword()));
    }

    return propsMap;
  }
}

 

7.消费者服务

package com.cdkjframework.kafka.consumer.service;

import com.cdkjframework.exceptions.GlobalException;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer
 * @ClassName: com.cdkjframework.kafka.consumer.service.ConsumerService
 * @Description: 消费者服务
 * @Author: xiaLin
 * @Version: 1.0
 */
public interface ConsumerService {

    /**
     * 消息内容
     *
     * @param topics  主题
     * @param message 内容
     * @throws GlobalException 异常信息
     */
    void onMessage(String topics, String message) throws GlobalException;
}

8.消费者监听器

package com.cdkjframework.kafka.consumer.listener;

import com.cdkjframework.kafka.consumer.service.ConsumerService;
import com.cdkjframework.util.log.LogUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @ProjectName: cdkj-kafka-client
 * @Package: com.cdkjframework.kafka.consumer
 * @ClassName: com.cdkjframework.kafka.consumer.listener.ConsumerListener
 * @Description: 消费者监听器
 * @Author: xiaLin
 * @Version: 1.0
 */
public class ConsumerListener {

    /**
     * 日志
     */
    private final LogUtils logUtils = LogUtils.getLogger(ConsumerListener.class);

    /**
     * 消费者服务接口
     */
    private final ConsumerService consumerService;

    /**
     * 构造函数
     *
     * @param consumerService 消费者服务接口
     */
    public ConsumerListener(ConsumerService consumerService) {
        this.consumerService = consumerService;
    }

    /**
     * 单条监听MQ消息
     *
     * @param data     消息内容
     * @param consumer 消息者
     */
    @KafkaListener(topics = "#{'${spring.custom.kafka.topics}'.split(',')}", groupId = "${spring.custom.kafka.groupId}")
    public void listener(ConsumerRecord<String, String> data, Consumer consumer) {
        try {
            consumerService.onMessage(data.topic(), data.value());
            consumer.commitAsync();
        } catch (Exception e) {
            logUtils.error(e);
            // 抛出异常,以重试消费
            throw new RuntimeException(e.getMessage());
        }
    }
}

 


总结

 例如:以上就是今天要讲的内容,本文仅仅简单介绍了 Spring Boot 集成消息消费者的封装。

相对应的开源项目欢迎访问:维基框架

标签:Spring,Boot,private,kafka,com,org,import,Kafka,consumer
From: https://www.cnblogs.com/cdkj/p/17982232

相关文章

  • spring学习笔记
    目录IoC概念DI(依赖注入)SpringDemo项目新建maven项目加入依赖定义类:接口和实现类Spring的配置文件Spring容器创建对象使用容器中的对象问题1:spring创建对象,调用是类的那个方法问题2:spring是在什么时候创建对象问题3:spring容器创建对象,一次创建几个获取容器中对象的信息spri......
  • Spring RestTemplate redirect 302
     TheredirectionisfollowedautomaticallyiftherequestisaGETrequest(see thisanswer).TomakeithappenonPOSTrequests,oneoptionmightbetouseadifferentrequestfactory,like HttpComponentsClientHttpRequestFactory,andsetittousean Ht......
  • 关于springboot 域认证
    最近项目,客户要求实现域认证,然后登录。网上资料自己整理一下,以备后续使用;springboot域认证,我采用的是ldap方式认证。1.引入插件:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-ldap</artifactId......
  • shardingsphere springboot application.yml配置
    shardingsphere springbootapplication.yml配置 spring:sharding-sphere:datasource:names:mastermaster:type:com.zaxxer.hikari.HikariDataSourcedriver-class-name:com.mysql.cj.jdbc.Driverjdbc-url:jdbc:mysql:......
  • springboot+mybtais+mysql
    一、通过maven引入相应的包pom.xml<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http......
  • Springboot学习第二天
    今天的学习内容是如何在项目中设计统一响应接口返回值,达到统一的格式1.响应实体我们首先要定义一个公共的接口响应实体,以后所有的接口返回值,都是返回的这个公共响应实体。这样做的好处是可以统一返回值的风格,编译接口的维护。需要包含3个关键的成员变量:状态码返回信息数......
  • SpringMVC - 谈谈你对SpringMVC的理解
     谈谈你对SpringMVC的理解?普通人:SpringMVC它是一个MVC框架吧,就是,我们可以使用SpringMVC来开发Web应用...呃它是基于Servlet上的一个扩展,就是它里面我记得好像有一个核心控制器,叫DispatcherServlet,然后扩展了之后,就是所有请求都会经过那个...DispatcherServlet然后再做一......
  • 在 SpringBoot 项目中使用 Mybatis 打印 SQL 日志
    前言我们在项目中使用的持久层框架大部分都是mybatis,如果在日志中能打印sql的话,对于我们排查问题会更加方便。第一种方式:修改mybatis配置修改配置mybatis:configuration:log-impl:org.apache.ibatis.logging.slf4j.Slf4jImpllogging:level:com.imooc.p......
  • spring自动装配的原理解析
    前言学习SpringBoot,绝对避不开自动装配这个概念,这也是SpringBoot的关键之一本人也是SpringBoot的初学者,下面的一些总结都是结合个人理解和实践得出的,如果有错误或者疏漏,请一定一定一定(不是欢迎,是一定)帮我指出,在评论区回复即可,一起学习!篇幅较长,希望你可以有耐心.如果只关心SpringBoo......
  • Zookeeper集群 +Kafka集群 之(Kafka集群)
    Kafka集群 消息队列(中间件)类型与特性 #Kafka概述#为什么需要消息队列(MQ)主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发toomanyconnection错误,引发雪崩效应。我......