首页 > 其他分享 >springboot利用Condition机制解决Kafka监听不到服务时报错的问题

springboot利用Condition机制解决Kafka监听不到服务时报错的问题

时间:2022-12-26 11:33:58浏览次数:43  
标签:springboot springframework kafka org import Kafka public Condition

一般情况下,我们在写springboot使用Kafka监听的代码时,都是直接写个类,然后在方法上加个@KafkaListener就可以了,简单省事。

就像下面这样

@Component
@Slf4j
public class KafkaConsumer {

    @Autowired
    private KafkaCustomProperties kafkaCustomProperties;

    @KafkaListener(topics = {"#{@kafkaCustomProperties.topic}"}, groupId = "#{@kafkaCustomProperties.groupId}")
    public void listen(String message) {
        log.info("接收到的消息:{}", message);
        // do something...

    }

}

这样做其实是没问题的,但有的时候我们的kafka服务会莫名其妙停掉,然后就一直报监听不到Kafka服务的错误信息,又不想改代码,就可以使用spring的Condition机制,在启动springboot服务时,先判断一下能不能连上Kafka服务,如果连不上,就不注入KafkaConsumer类。

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/**
 * @Author: 夏威夷8080
 * @Date: 2011/6/7 9:38
 */
@Slf4j
public class KafkaConnectCondition implements Condition {
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        Environment environment = context.getEnvironment();
        String kafkaServers = environment.getProperty("spring.kafka.consumer.bootstrap-servers");
        log.info("获取到的kafkaServers:{}", kafkaServers);
        if (StringUtils.isBlank(kafkaServers)){
            return false;
        }

        String serverPort = kafkaServers.split(",")[0];
        URI uri = URI.create("http://" + serverPort);

        return this.isConnectable(uri.getHost(), uri.getPort());
    }

    /**
     * 判断kafka服务能否正常连接
     * @param host
     * @param port
     * @return
     */
    private boolean isConnectable(String host, int port) {
        boolean result = true;
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port),3000);
        } catch (IOException e) {
            log.error("========注意!!!!!未能连接上kafka服务,意味着kafka监听将不开启,{}:{},{}", host, port, e.getMessage());
            result = false;
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                log.error("关闭kafka服务socket出错,{}:{},{}", host, port, e.getMessage());
                result = false;
            }
        }
        log.info("========kafka服务能正常连接========");
        return result;
    }
}

在KafkaConsumer类上加上@Conditional(KafkaConnectCondition.class)就可以了。

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Conditional;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Author: 夏威夷8080
 * @Date: 2011/5/11 19:33
 */
@Component
@Slf4j
@Conditional(KafkaConnectCondition.class)
public class KafkaConsumer {

    @Autowired
    private SyncHandlerFactory syncHandlerFactory;

    @Autowired
    private KafkaCustomProperties kafkaCustomProperties;

    // 先屏蔽监听,后面再放开
    @KafkaListener(topics = {"#{@kafkaCustomProperties.topic}"}, groupId = "#{@kafkaCustomProperties.groupId}")
    public void listen(String message) {
        log.info("接收到的消息:{}", message);
        // do something...

    }
}

这样修改之后,每次在启动springboot服务时都会检查下Kafka能不能正常连接上,相当于做一个容错的处理吧。

标签:springboot,springframework,kafka,org,import,Kafka,public,Condition
From: https://www.cnblogs.com/shamo89/p/17005325.html

相关文章

  • SpringBoot2.x系列教程汇总-从入门到精通
    因为N没有分类归纳博客的功能,所以特写本帖汇总SpringBoot2.x系列教程,方便大家查阅!本套案例源码地址:https://gitee.com/sunyiyi/SpringBoot-demos​​SpringBoot2.x系列教程......
  • springboot 连接不上 redis 的三种解决方案!
    针对于这种情况,首先,我们最简单直接的方法就是需要确认Redis是否已经正常启动(验证方法:如果安装在Linux下的话可以使用ps-ef|grepredis来进行确认是否开启) 如果未开启,我......
  • 笑死,面试官又问我SpringBoot自动配置原理
    面试官:好久没见,甚是想念。今天来聊聊SpringBoot的自动配置吧?候选者:嗯,SpringBoot的自动配置我觉得是SpringBoot很重要的“特性”了。众所周知,SpringBoot有着“约定大于配置......
  • SpringBoot的Maven项目使用SystemPath引用本地jar
    对于本地jar的maven引用,在不方便使用私有maven仓库的情况下,使用SystemPath方式引用还是比较合适的,这里以uid-generator-1.0.0-SNAPSHOT.jar这个本地包为例。1.将打好的包拷......
  • ReentrantLock Condition await signal 专题
     Condition的执行方式,是当在线程T1中调用await方法后,线程T1将释放锁,并且将自己阻塞,等待唤醒,线程T2获取到锁后,开始做事,完毕后,调用Condition的signal方法,唤醒线程T1,在t2执行......
  • springboot运行jar包报 "XXX中没有主清单属性"
    报错原因:打包后的jar文件中的MANIFEST.MF缺少项目启动项,即没有Main-Class解决:在项目pom.xml文件中添加插件spring-boot-maven-plugin:<build><plugins><plugin>......
  • SpringBoot加载相关注解
    springBoot加载@Configuration表明该类是一个配置类常常配合@Bean使用,让容器管理对象@Configuration(proxyBeanMethods=true)proxyBeanMethods=true表示@Configura......
  • springboot 缓存介绍,缓存注解和常见问题
    spring缓存    spring框架对缓存服务进行了抽象,提供了缓存增删查改等功能。但需要实现一个具体的数据存储实体。   缓存与缓冲区    缓存是无感......
  • springboot 使用redis和lettuce原理
    springboot使用redis  简介   在SpringBoot中,要访问Redis,可以直接引入spring-boot-starter-data-redis依赖,它实际上是SpringData的一个子项目——SpringDat......
  • WebService简单教学??SpringBoot整合CXF的快速入门??CXF发布Rest服务
    目录​##springboot整合CXF的快速入门##​​一,服务端提供webservice服务​​​1,实体类User​​​​2,webservice接口​​​​3,webservice接口的实现类​​​​4,CXF配置类​​......