public class MyConsumer { private final static String TOPIC_NAME = "my-replicated-topic"; private final static String CONSUMER_GROUP_NAME = "testGroup"; public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094"); // 消费分组名 props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); //创建一个消费者的客户端 KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props); // 消费者订阅主题列表 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { /* * poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis( 1000 )); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value()); } } } }
标签:ConsumerConfig,java,NAME,spring,boot,record,props,put,CONFIG From: https://www.cnblogs.com/xiaobaibailongma/p/17259118.html