首页 > 其他分享 >kafka配置-代码配置篇

kafka配置-代码配置篇

时间:2023-11-08 10:25:44浏览次数:37  
标签:ProducerConfig 代码 配置 kafka class props put CONFIG public

KafkaProducerConfig

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /**
     * Producer Template 配置
     */
    @Bean(name = "kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Producer 工厂配置
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Producer 参数配置
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // 指定多个kafka集群多个地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.bootstrapServers);

        // 重试次数,0为不启用重试机制
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //同步到副本, 默认为1
        // acks=0 把消息发送到kafka就认为发送成功
        // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
        // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
        props.put(ProducerConfig.ACKS_CONFIG, 1);

        // 生产者空间不足时,send()被阻塞的时间,默认60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // 控制批处理大小,单位为字节
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
        // 键的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
        // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        return props;
    }

}
KafkaConsumerConfig
 @Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(consumerFactory());
        // 消费者组中线程数量
        factory.setConcurrency(3);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);

        // 当使用批量监听器时需要设置为true
        factory.setBatchListener(true);

        return factory;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Kafka地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.bootstrapServers);
        //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultGroup");
        // 是否自动提交offset偏移量(默认true)
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交的频率(ms)
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // Session超时设置
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // 键的反序列化方式
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 值的反序列化方式
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // offset偏移量规则设置:
        // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

}
单消息消费异常处理器
 @Service
public class ConsumerService {

    private static final Logger log = LoggerFactory.getLogger(ConsumerService.class);

    /**
     * 消息监听器
     * errorHandler 不指定listenErrorHandler的情况,使用全局异常
     */
    @KafkaListener(topics = {"test"}, groupId = "group21", errorHandler = "listenErrorHandler")
    public void listen(String message) {
        log.info(message);
        // 创建异常,触发异常处理器
        throw new NullPointerException("测试错误处理器");
    }

    /**
     * 异常处理器
     */
    @Bean
    public ConsumerAwareListenerErrorHandler listenErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {

            @Override
            public Object handleError(Message<?> message,
                                      ListenerExecutionFailedException e,
                                      Consumer<?, ?> consumer) {
                log.info("message:" + message.getPayload());
                log.info("exception:" + e.getMessage());
                return null;
            }
        };
    }
}
TopicAdministrator
 @Configuration
public class TopicAdministrator {
    private final TopicConfigurations configurations;
    private final GenericWebApplicationContext context;

    public TopicAdministrator(TopicConfigurations configurations, GenericWebApplicationContext genericContext) {
        this.configurations = configurations;
        this.context = genericContext;
    }

    @PostConstruct
    public void init() {
        // 创建topic
        initializeBeans(configurations.getTopics());
    }

    private void initializeBeans(List<TopicConfigurations.Topic> topics) {
        topics.forEach(t -> context.registerBean(t.name, NewTopic.class, t::toNewTopic));
    }


}
TopicConfigurations
 @Configuration
@ConfigurationProperties(prefix = "kafka")
@Data
@ToString
public class TopicConfigurations {
    private List<Topic> topics;

    @Setter
    @Getter
    @ToString
    static class Topic {
        String name;
        Integer numPartitions = 3;
        Short replicationFactor = 1;

        NewTopic toNewTopic() {
            return new NewTopic(this.name, this.numPartitions, this.replicationFactor);
        }

    }
}

# yml 自动创建kafka topic的集合
kafka:
  topics:
    - name: topic1
      num-partitions: 3
      replication-factor: 1
    - name: topic2
      num-partitions: 1
      replication-factor: 1
    - name: topic3
      num-partitions: 2
      replication-factor: 1

标签:ProducerConfig,代码,配置,kafka,class,props,put,CONFIG,public
From: https://www.cnblogs.com/xxsdnol/p/17816747.html

相关文章

  • kafka配置-yml篇
    spring:kafka:template:#当使用kafkaTemplate的sendDefault方法的时候,使用的是这里配置的topicdefaultTopic:topic-1#partition-num和replication-numKafkaProperties没有提供配置的地方bootstrap-servers:127.0.0.1:9092produ......
  • c# 操作xml配置文件
     Xml配置文件<?xmlversion="1.0"encoding="utf-8"?><root><tokenStrvalue=""/><overTimevalue=""></overTime></root>Helper类//读取stringuri=Enviro......
  • 让你的 bash 命令行像 vim 一样飞:一行代码搞定
    引言你是不是觉得在Bash命令行界面中编辑命令有时候很不方便?尤其是当你需要修改一个长命令的某个部分时,使用方向键来回移动光标简直是一场噩梦。如果你是Vim的忠实用户,那么我有一个好消息要告诉你:你可以把Bash的按键模式设置成Vi模式,让你的命令行体验瞬间飞起来!公众号【厦门......
  • Linux服务器网络配置记录
    Linux服务器网络配置记录材料准备材料数量服务器1显示器1网线2(千兆*1)千兆交换机1插线板1网线连接从路由器LAN口引出网线到交换机任一口,再从交换机剩余任一口引出千兆网线到服务器网线插口1服务器网线插口1插入后有有灯闪烁代表网线连接正常网......
  • matlab贝叶斯隐马尔可夫hmm模型实现|附代码数据
    原文链接:http://tecdat.cn/?p=7973原文出处:拓端数据部落公众号  最近我们被客户要求撰写关于贝叶斯隐马尔可夫hmm的研究报告,包括一些图形和统计输出。贝叶斯隐马尔可夫模型是一种用于分割连续多变量数据的概率模型。该模型将数据解释为一系列隐藏状态生成。每个状态都是重尾......
  • R语言Copula模型分析股票市场板块相关性结构|附代码数据
    原文链接:http://tecdat.cn/?p=25804 原文出处:拓端数据部落公众号  最近我们被客户要求撰写关于Copula的研究报告,包括一些图形和统计输出。这篇文章是关于copulas和重尾的。在全球金融危机之前,许多投资者是多元化的。看看下面这张熟悉的图:黑线是近似正态的。红线代表Cau......
  • 华东师大2023程序设计基础代码
    Lab07递归与函数1.正整数的各位数字之和#include<stdio.h>#include<math.h>intsum(inta);intmain(){inta=0;scanf("%d",&a);printf("%d",sum(a));return0;}intsum(inta){if(a<10){ret......
  • 思科设备 -- 预配置
    用户模式下输入enable进入特权模式在特权模式下输入configureterminal进入全局模式在全局模式下输入lineconsole0进入线路模式在线路模式下输入noexec-timeout关闭登录超时登出在线路模式下输入loggingsynchronous关闭日志打断命令在......
  • 代码随想训练营第二十八天(Python)| 93.复原IP地址 、 78.子集、 90.子集II
    93.复原IP地址1、方法一classSolution:defrestoreIpAddresses(self,s:str)->List[str]:res=[]self.tracebacking(s,0,[],res)returnresdeftracebacking(self,s,start,path,res):ifstart==len(s)andlen(pa......
  • windows+jenkins+pscp发布代码
    1、背景:由于公司开发游戏的小伙用cocos,CocosCreator3.x目前只能在windows下编包,所以只能在windows系统安装jenkins实现。jenkins安装等忽略了,下面提供一些下载地址。https://www.jenkins.io/download/https://www.oracle.com/java/technologies/downloads/#jdk17-windowshttp......