Java Kafka生产消费测试类:
生产者:
package test.kafkatest; import java.text.SimpleDateFormat; import java.util.Properties; import java.util.Random; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; public class ProducerTest { // 定义主题 public static String topic = "hejiuwei_test01"; public static void main(String[] args) throws InterruptedException { Properties p = new Properties(); p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "myhadoop01:9092"); // acks:消息的确认机制,默认值是0. acks=0: 如果设置为0,生产者不会等待kafka的响应; acks=1: 这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应 // acks=all: 这个配置意味着leader会等待所有的follower同步完成. 这个确保消息不会丢失, 除非kafka集群中所有机器挂掉. 这是最强的可用性保证. p.put("acks", "all"); // retries: 配置为大于0的值的话, 客户端会在消息发送失败时重新发送. p.put("retries", 0); // batch.size: 当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求. 这会提高client和生产者的效率. p.put("batch.size", 16384); // key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer. p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer. p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p); try { int i = 2; do { String msg = "{'id':'"+i+"','name':'嬴政-"+i+"','createTime':'"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()) +"'}"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg); kafkaProducer.send(record); Thread.sleep(1000L); i++; } while (true); } finally { kafkaProducer.close(); } } }
消费者:
package test.kafkatest; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties; public class ConsumerTest { private static final String GROUPID = "mytestGroupId01"; public static void main(String[] args) { Properties p = new Properties(); p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "myhadoop01:9092"); p.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID); // 是否自动提交, 默认为true. p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 从poll(拉)的回话处理时长 p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // 超时时间 p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // 一次最大拉取的条数 p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // 消费规则, 默认earliest p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // key.serializer: 键序列化, 默认org.apache.kafka.common.serialization.StringDeserializer. p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // value.deserializer:值序列化, 默认org.apache.kafka.common.serialization.StringDeserializer. p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p); // 订阅消息 kafkaConsumer.subscribe(Collections.singletonList(ProducerTest.topic)); do { // 订阅之后, 再从kafka中拉取数据 ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("-----topic:%s, offset:%d, 消息:%s-----\n", record.topic(), record.offset(), record.value()); } } while (true); } }
标签:Java,kafka,apache,测试,org,put,import,Kafka,CONFIG From: https://www.cnblogs.com/fengbonu/p/18327207