1. 定义 Stop 接口
/**
 * Contract for a component that can be closed.
 *
 * <p>Declares a single no-argument, no-result operation. What "closing" means
 * is left to the implementation.
 */
public interface IStop {
/** Closes this component. Declares no checked exceptions. */
void close();
}
2. 定义 Consumer 接口
/**
 * Lifecycle contract for a consumer: {@link #init()}, {@link #start()}, and the
 * {@code close()} operation inherited from {@link IStop}.
 *
 * <p>NOTE(review): the call order init -> start -> close is presumed from the
 * method names; confirm against the callers.
 */
public interface IConsumer extends IStop{
/** Performs one-time initialization of the consumer. */
void init();
/**
 * Starts the consumer.
 *
 * @throws Exception if the consumer cannot be started
 */
void start() throws Exception;
}
3. 定义基于 Kafka 的 MQ 消费者类
@Component
public class ConsumerService extends Thread implements IConsumer {
public void init() {
// 进行初始化动作
logger.info("app {}, token {}, nameserver {}, topic {}", app, token, nameserver, topic);
this.processorList.add(new RedisProcessor());
Properties props = new Properties();
String jaasConfigFormat = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
String jaasConfig = String.format(jaasConfigFormat, app, token);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, nameserver);
props.put(ConsumerConfig.GROUP_ID_CONFIG, app);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, app);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name());
props.put(CommonClientConfigs.RETRIES_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name());
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000000);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 30000);
this.kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Sets.newHashSet(topic));
}
@Override
public synchronized void start() {
super.setName("DeltaKafkaThreadService");
super.start();
}
@Override
public void run() {
while (Stopper.isRunning()) {
try {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000));
records.forEach(record -> {
Record record1 = converter.convert(record);
processorList.forEach(process -> process.process(record1));
});
commit();
} catch (Exception ex) {
logger.error("handle msg exception ", ex);
}
}
}
// 手动commit offset
private void commit() {
kafkaConsumer.commitAsync((offsets, exception) -> {
if (exception != null) {
logger.warn("offset info:={} exception: ", offsets, exception);
}
});
}
@Override
public void close() {
commit();
kafkaConsumer.close();
}
}
标签:ConsumerConfig,技巧,void,mq,props,put,CONFIG,public,消费者
From: https://www.cnblogs.com/PythonOrg/p/16881080.html