pom.xml
<!-- Spring Kafka starter; log4j-api is excluded here — presumably to avoid a
     version conflict or to swap in a different logging backend (TODO confirm why). -->
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> </exclusion> </exclusions> </dependency>
创建 Kafka 消费者（Create a Kafka consumer）
//配置对象 Properties props = new Properties(); //kakfka连接地址 支持集群环境 用','分割 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092"); // group.id,指定了消费者所属群组 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test.group.id_1"); //指定反序列化方式 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //指定反序列化方式 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); KafkaConsumer<String, String> consumer= new KafkaConsumer<String, String>(props); //订阅多个主题 consumer.subscribe(Arrays.asList("test_topic","test_topic_2")); //使用方式 循环获取数据进行处理 while (true){ //100毫秒获取一次数据 ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String,String> record:records) { System.err.println(record.topic()+"\t接收到数据: key:【"+record.key()+"】\tmessage:【"+record.value()+"】\tpartition:【"+record.partition()+"】"); } }
创建 Kafka 生产者（Create a Kafka producer）
public void createKafkaProducer(){ //配置对象 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //创建kafka对象 KafkaProducer<String, String> kafkaProducer=new KafkaProducer<String, String>(props); //给当前kafka生产者一个唯一id,存在多个kafka生产者时可以放入缓存中 String producerId = "1"; sendMessage(kafkaProducer,producerId,"test_topic","test_message_key","test_message_value",0); } /** * 消息发送 * * @param topicName 消息发送队列 * @param key 当前消息唯一key * @param message 消息 * @param partition 分区号 */ public void sendMessage(KafkaProducer<String, String> kafkaProducer,String producerId,String topicName, String key, String message, int partition) { ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, partition, key, message); kafkaProducer.send(producerRecord, new ProducerCallBack(producerId, System.currentTimeMillis(), key, message)); } /** * 实现Callback接口的onCompletion方法 */ class ProducerCallBack implements Callback { private final String id; private final long startTime; private final String key; private final String message; //实现的是DemoCallBack的有参构造 public ProducerCallBack(String id, long startTime, String key, String message) { this.id = id; //通过this把获取的参数内容传递到外层类中,这有这样当回调发生时onCompletion才可以获取类的参数 this.startTime = startTime; this.key = key; this.message = message; } @Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { log.info(this.id + "发送消息:【topic={}, partition={}, offset={}, 
key={}, value={}】", metadata.topic(), metadata.partition(), metadata.offset() + " in " + elapsedTime + " ms", this.key, this.message); } else { log.error(this.id + "发送消息失败:【key={}, value={}】", this.key, this.message); } } }
标签:String,生产者,手动,kafka,key,props,put,message,CONFIG From: https://www.cnblogs.com/Sora-L/p/16755122.html