KafkaProducerConfig
@Configuration
@EnableKafka
public class KafkaProducerConfig {
/**
* Producer Template 配置
*/
@Bean(name = "kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* Producer 工厂配置
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* Producer 参数配置
*/
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// 指定多个kafka集群多个地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.bootstrapServers);
// 重试次数,0为不启用重试机制
props.put(ProducerConfig.RETRIES_CONFIG, 0);
//同步到副本, 默认为1
// acks=0 把消息发送到kafka就认为发送成功
// acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
// acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
props.put(ProducerConfig.ACKS_CONFIG, 1);
// 生产者空间不足时,send()被阻塞的时间,默认60s
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
// 控制批处理大小,单位为字节
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
// 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
// 键的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
// 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
return props;
}
}
KafkaConsumerConfig
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
// 设置消费者工厂
factory.setConsumerFactory(consumerFactory());
// 消费者组中线程数量
factory.setConcurrency(3);
// 拉取超时时间
factory.getContainerProperties().setPollTimeout(3000);
// 当使用批量监听器时需要设置为true
factory.setBatchListener(true);
return factory;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// Kafka地址
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.bootstrapServers);
//配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultGroup");
// 是否自动提交offset偏移量(默认true)
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交的频率(ms)
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
// Session超时设置
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// 键的反序列化方式
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 值的反序列化方式
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// offset偏移量规则设置:
// (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
// (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
// (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
}
单消息消费异常处理器
@Service
public class ConsumerService {
private static final Logger log = LoggerFactory.getLogger(ConsumerService.class);
/**
* 消息监听器
* errorHandler 不指定listenErrorHandler的情况,使用全局异常
*/
@KafkaListener(topics = {"test"}, groupId = "group21", errorHandler = "listenErrorHandler")
public void listen(String message) {
log.info(message);
// 创建异常,触发异常处理器
throw new NullPointerException("测试错误处理器");
}
/**
* 异常处理器
*/
@Bean
public ConsumerAwareListenerErrorHandler listenErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message,
ListenerExecutionFailedException e,
Consumer<?, ?> consumer) {
log.info("message:" + message.getPayload());
log.info("exception:" + e.getMessage());
return null;
}
};
}
}
TopicAdministrator
@Configuration
public class TopicAdministrator {
private final TopicConfigurations configurations;
private final GenericWebApplicationContext context;
public TopicAdministrator(TopicConfigurations configurations, GenericWebApplicationContext genericContext) {
this.configurations = configurations;
this.context = genericContext;
}
@PostConstruct
public void init() {
// 创建topic
initializeBeans(configurations.getTopics());
}
private void initializeBeans(List<TopicConfigurations.Topic> topics) {
topics.forEach(t -> context.registerBean(t.name, NewTopic.class, t::toNewTopic));
}
}
TopicConfigurations
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Data
@ToString
public class TopicConfigurations {
private List<Topic> topics;
@Setter
@Getter
@ToString
static class Topic {
String name;
Integer numPartitions = 3;
Short replicationFactor = 1;
NewTopic toNewTopic() {
return new NewTopic(this.name, this.numPartitions, this.replicationFactor);
}
}
}
# yml 自动创建kafka topic的集合
kafka:
topics:
- name: topic1
num-partitions: 3
replication-factor: 1
- name: topic2
num-partitions: 1
replication-factor: 1
- name: topic3
num-partitions: 2
replication-factor: 1