一般情况下,我们在写springboot使用Kafka监听的代码时,都是直接写个类,然后在方法上加个@KafkaListener就可以了,简单省事。
就像下面这样
@Component @Slf4j public class KafkaConsumer { @Autowired private KafkaCustomProperties kafkaCustomProperties; @KafkaListener(topics = {"#{@kafkaCustomProperties.topic}"}, groupId = "#{@kafkaCustomProperties.groupId}") public void listen(String message) { log.info("接收到的消息:{}", message); // do something... } }
这样做其实是没问题的,但有的时候我们的kafka服务会莫名其妙停掉,然后就一直报监听不到Kafka服务的错误信息,又不想改代码,就可以使用spring的Condition机制,在启动springboot服务时,先判断一下能不能连上Kafka服务,如果连不上,就不注入KafkaConsumer类。
import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.core.env.Environment; import org.springframework.core.type.AnnotatedTypeMetadata; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.URI; /** * @Author: 夏威夷8080 * @Date: 2011/6/7 9:38 */ @Slf4j public class KafkaConnectCondition implements Condition { @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { Environment environment = context.getEnvironment(); String kafkaServers = environment.getProperty("spring.kafka.consumer.bootstrap-servers"); log.info("获取到的kafkaServers:{}", kafkaServers); if (StringUtils.isBlank(kafkaServers)){ return false; } String serverPort = kafkaServers.split(",")[0]; URI uri = URI.create("http://" + serverPort); return this.isConnectable(uri.getHost(), uri.getPort()); } /** * 判断kafka服务能否正常连接 * @param host * @param port * @return */ private boolean isConnectable(String host, int port) { boolean result = true; Socket socket = new Socket(); try { socket.connect(new InetSocketAddress(host, port),3000); } catch (IOException e) { log.error("========注意!!!!!未能连接上kafka服务,意味着kafka监听将不开启,{}:{},{}", host, port, e.getMessage()); result = false; } finally { try { socket.close(); } catch (IOException e) { log.error("关闭kafka服务socket出错,{}:{},{}", host, port, e.getMessage()); result = false; } } log.info("========kafka服务能正常连接========"); return result; } }
在KafkaConsumer类上加上@Conditional(KafkaConnectCondition.class)就可以了。
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Conditional; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @Author: 夏威夷8080 * @Date: 2011/5/11 19:33 */ @Component @Slf4j @Conditional(KafkaConnectCondition.class) public class KafkaConsumer { @Autowired private SyncHandlerFactory syncHandlerFactory; @Autowired private KafkaCustomProperties kafkaCustomProperties; // 先屏蔽监听,后面再放开 @KafkaListener(topics = {"#{@kafkaCustomProperties.topic}"}, groupId = "#{@kafkaCustomProperties.groupId}") public void listen(String message) { log.info("接收到的消息:{}", message); // do something... } }
这样修改之后,每次在启动springboot服务时都会检查下Kafka能不能正常连接上,相当于做一个容错的处理吧。
标签:springboot,springframework,kafka,org,import,Kafka,public,Condition From: https://www.cnblogs.com/shamo89/p/17005325.html