代码运行版本
springboot.version=2.7.7
spring-kafka.version=2.8.11
1 POM
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!-- Spring Boot's dependency management already manages a spring-kafka version, so the explicit <version> element below is optional and may be removed. -->
<version>2.8.11</version>
</dependency>
2 JavaConfig
KafkaConfiguration.java 生产者消费者配置类
TopicProperties 主题属性类
2.1 KafkaConfiguration.java
/**
 * Kafka producer and consumer wiring for the application.
 *
 * <p>Exposes the raw producer/consumer property maps, a {@link KafkaTemplate} for sending
 * String key/value messages, and a batch listener container factory named
 * {@code batchFactory} that uses manual acknowledgement.
 */
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${spring.kafka.consumer.batch.concurrency}")
    private Integer batchConcurrency;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    /**
     * Builds the producer client properties.
     *
     * @return a mutable map of Kafka producer settings
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Initial connection address(es) of the Kafka cluster.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Wait for the partition leader's acknowledgement. With acks=0 the client never
        // learns about failures, so the configured retries would have no effect and the
        // send callback could not report broker-side errors.
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // Number of retries after a failed send.
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        // Upper bound of a single record batch, in bytes.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        // How long (ms) a record may wait in the buffer for more records to batch with.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Total memory (bytes) the producer may use to buffer unsent records.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        // Keys and values are both sent as plain strings.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Producer factory built from {@link #producerConfigs()}.
     *
     * @return the producer factory for String keys and values
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Template used by application code to send messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Builds the consumer client properties.
     *
     * @return a mutable map of Kafka consumer settings
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Consumer group id.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Where to start when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Initial connection address(es) of the Kafka cluster.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Maximum number of records returned by one poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // Whether the client commits offsets automatically.
        // NOTE(review): batchFactory() uses a manual ack mode, which presumably expects
        // this to be false - confirm the deployed value.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // Auto-commit interval (ms); only relevant when auto-commit is enabled.
        // Previously this property was injected but never applied.
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        // Consumer session timeout (ms).
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Request timeout (ms).
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // Keys and values are both read as plain strings.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * Listener container factory for batch consumption with manual acknowledgement.
     *
     * @return the factory referenced by listeners as {@code containerFactory = "batchFactory"}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Concurrency should be less than or equal to the topic's partition count.
        factory.setConcurrency(batchConcurrency);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Deliver records to the listener as a batch; the batch size is bounded by
        // ConsumerConfig.MAX_POLL_RECORDS_CONFIG.
        factory.setBatchListener(true);
        return factory;
    }
}
2.2 TopicProperties.java
/**
 * Topic names bound from the {@code spring.kafka.topic.*} configuration properties.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@ConfigurationProperties(prefix = "spring.kafka.topic")
@Component
@Data
public class TopicProperties {

    /**
     * Bound from {@code spring.kafka.topic.test-topic}; kept for backward compatibility.
     */
    private String testTopic;

    /**
     * Bound from {@code spring.kafka.topic.source-topic}; read by the producer example
     * through {@code getSourceTopic()}.
     */
    private String sourceTopic;

    /**
     * Bound from {@code spring.kafka.topic.event-topic}.
     */
    private String eventTopic;
}
3 Producer & Consumer
3.1 发送消息
生产者发送消息,也可以直接使用第 5 节 KafkaUtils 中的 sendMessage 方法
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
...
/**
 * Serializes the given source to JSON and hands it to the Kafka template for the
 * configured source topic, registering a callback for the asynchronous send result.
 *
 * @param source the object to publish
 * @return always {@code true}; the value only means the send was submitted, the actual
 *         outcome is reported to the registered callback
 */
private boolean sendKafkaMsg(Source source) {
    String targetTopic = topicProperties.getSourceTopic();
    String payload = JSONObject.toJSONString(source);
    kafkaTemplate.send(targetTopic, payload)
            .addCallback(new CommonListenableFutureCallback());
    return true;
}
3.2 消费消息
/**
 * Batch listener for the topic configured as {@code spring.kafka.topic.source-topic}.
 */
@Slf4j
@Component
public class SourceTopicListener {

    @Autowired
    private SourceService sourceService;

    @Autowired
    private RobotEventBuilder robotEventBuilder;

    /**
     * Parses every record value of the polled batch into a {@code Source} and then
     * acknowledges the batch.
     *
     * <p>The {@code batchFactory} container factory is the bean declared in the Kafka
     * configuration class.
     *
     * @param consumerRecords the records delivered for this batch
     * @param ack handle used to acknowledge the batch manually
     */
    @KafkaListener(topics = {"${spring.kafka.topic.source-topic}"}, containerFactory = "batchFactory")
    public void consumer(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
        List<Source> parsedSources = new ArrayList<>();
        consumerRecords.forEach(entry -> {
            Source parsed = JSONObject.parseObject(entry.value(), Source.class);
            parsedSources.add(parsed);
        });
        // Acknowledge once every record of the batch has been parsed.
        ack.acknowledge();
    }
}
3.3 消息发送回调
/**
 * Shared callback for asynchronous Kafka sends that only logs the outcome.
 */
@Slf4j
public class CommonListenableFutureCallback implements ListenableFutureCallback<SendResult<String, String>> {

    /**
     * Logs an info line when the send completed successfully.
     *
     * @param result the send result; not inspected
     */
    @Override
    public void onSuccess(SendResult<String, String> result) {
        log.info("kafka 消息发送成功");
    }

    /**
     * Logs the failure together with its cause.
     *
     * @param ex the reason the send failed
     */
    @Override
    public void onFailure(Throwable ex) {
        log.error("Kafka 消息发送失败", ex);
    }
}
4 application.yml
# [kafka]
spring:
  kafka:
    # Kafka broker address; for a cluster list several addresses separated by commas
    bootstrap-servers: 100.25.199.94:9092
    producer:
      # Number of retries after a failed send
      retries: 3
      # Upper bound of one producer batch in BYTES (Kafka batch.size), not a message count
      batch-size: 1000
      # 32 MB (33554432 bytes) of producer buffer memory
      buffer-memory: 33554432
    consumer:
      # Default consumer group
      group-id: analyze-service-dev
      # Start from the earliest offset when the group has no committed offset
      auto-offset-reset: earliest
      # Maximum number of records fetched by a single poll
      max-poll-records: 10
      # Whether offsets are committed automatically
      enable-auto-commit: false
      # Auto-commit interval in ms
      auto-commit-interval: 1000
      # Batch listener concurrency; keep it less than or equal to the topic's partition count
      batch.concurrency: 3
    topic:
      # source
      source-topic: evt_source_topic_dev
      # event
      event-topic: evt_event_topic_dev
5 Utils
@Component
public class KafkaUtils {
@Value("${spring.kafka.bootstrap-servers}")
private String springKafkaBootstrapServers;
private AdminClient adminClient;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 初始化AdminClient
* '@PostConstruct该注解被用来修饰一个非静态的void()方法。
* 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
* PostConstruct在构造函数之后执行,init()方法之前执行。
*/
@PostConstruct
private void initAdminClient() {
Map<String, Object> props = new HashMap<>(1);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
adminClient = KafkaAdminClient.create(props);
}
/**
* 新增topic,支持批量
*/
public void createTopic(Collection<NewTopic> newTopics) {
adminClient.createTopics(newTopics);
}
/**
* 删除topic,支持批量
*/
public void deleteTopic(Collection<String> topics) {
adminClient.deleteTopics(topics);
}
/**
* 获取指定topic的信息
*/
public String getTopicInfo(Collection<String> topics) {
AtomicReference<String> info = new AtomicReference<>("");
try {
adminClient.describeTopics(topics).all().get().forEach((topic, description) -> {
for (TopicPartitionInfo partition : description.partitions()) {
info.set(info + partition.toString() + "\n");
}
});
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return info.get();
}
/**
* 获取全部topic
*/
public List<String> getAllTopic() {
try {
return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return Lists.newArrayList();
}
/**
* 往topic中发送消息
*/
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
public void sendMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
}
标签:集成,SpringBoot,private,kafka,topic,props,CONFIG,public
From: https://www.cnblogs.com/a999/p/18215639