首页 > 其他分享 >kafka发送消息与消费消息

kafka发送消息与消费消息

时间:2023-05-15 23:46:24浏览次数:42  
标签:kafka 发送 消息 org apache import consumer properties

kafka发送消息与消费消息

package com.yl.kafka.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 生产者
 *
 * @author Y-wee
 */
public class Producer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // kafka 配置
        Properties properties = new Properties();
        // 配置 kafka-bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.128:9092");
        /*
        序列化配置
         */
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer(properties);

        for (int i = 0; i < 5; i++) {
            // 发送消息(异步)
//            producer.send(new ProducerRecord<>("topicA", "message" + i));
            /**
             * 发送消息(异步带回调函数)
             */
            producer.send(new ProducerRecord<>("topicA", "message" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("topic: " + recordMetadata.topic() + ",分区: " + recordMetadata.partition());
                    }
                }
            });

            // 发送消息(同步)
//            producer.send(new ProducerRecord<>("topicA", "message" + i)).get();
        }
        // 关闭资源
        producer.close();
    }

}

​ 启动 kafka,执行./kafka-console-consumer.sh --bootstrap-server 服务器ip:9092 --topic topicA命令打开消费者控制台;然后执行程序发送消息后可以在控制台看到发送的消息

​ 如果发送失败或阻塞,参考文档:https://blog.csdn.net/cnds123321/article/details/124181849

package com.yl.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * 消费者
 *
 * @author Y-wee
 */
public class Consumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 配置 kafka-bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.128:9092");
        /*
        反序列化配置(必须)
         */
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
        // 消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);

        /*
        设置消费的主题,不用配置分区使用默认分区(可以消费多个主题)
         */
//        ArrayList<String> topics = new ArrayList<>();
//        topics.add("topicA");
//        consumer.subscribe(topics);

        /*
        设置消费的主题及分区
         */
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("topicA", 0));
        consumer.assign(topicPartitions);

        while (true) {
            // 设置消费间隔时间
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }

    }

}

​ 指定消费位置

package com.yl.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/**
 * 消费者
 *
 * @author Y-wee
 */
public class Consumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.128:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);

        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("topicA", 0));
        consumer.assign(topicPartitions);

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        
        // 获取分区集合
		Set<TopicPartition> assignment=new HashSet<>();
		/*
        等待分区分配完毕
         */
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofSeconds(1));
            assignment = consumer.assignment();
        }

		/*
        遍历所有分区,并指定 offset 从 1700 的位置开始消费
         */
        for (TopicPartition tp: assignment) {
            consumer.seek(tp, 1700);
        }

        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }

}

​ 指定时间消费

package com.yl.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * 消费者
 *
 * @author Y-wee
 */
public class Consumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.128:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);

        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("topicA", 0));
        consumer.assign(topicPartitions);

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        
        Set<TopicPartition> assignment=new HashSet<>();
	
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofSeconds(1));
            assignment = consumer.assignment();
        }

        // 存储分区及时间戳
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
        
        for (TopicPartition topicPartition : assignment) {
            // 指定消费每个分区对应一天前的数据
            timestampToSearch.put(topicPartition, System.currentTimeMillis() - 24 * 3600 * 1000);
        }
        // 存储分区及对应的 offset
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
        /*
        遍历每个分区,对每个分区设置消费时间
         */
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
            if (offsetAndTimestamp != null){
                // 指定开始消费的位置
                consumer.seek(topicPartition, offsetAndTimestamp.offset());
            }
        }

        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }

}

标签:kafka,发送,消息,org,apache,import,consumer,properties
From: https://www.cnblogs.com/Y-wee/p/17403475.html

相关文章

  • kafka 磁盘迁移
    磁盘迁移背景介绍:Kafka搭建时,配置的磁盘过大,成本过高,所以需要迁移到小容量磁盘原kakfa配置:log.dirs=/data1/kafka/var/kafka-logs/1,/data2/kafka/var/kafka-logs/1(kafka磁盘可以支持多磁盘配置,提高吞吐量)log.retention.hours=168修改后的kafka配置:log.dirs=/data3/k......
  • WM_大屏实时计算深度剖析 flink写es kafka cannal配置 暂时没用
    第1章大屏实时计算深度剖析学习目标目标1:了解实时计算的应用场景目标2:实时流计算的快速入门(Flink的入门使用)目标3:Flink接入技术体系的剖析(hdfs,jdbc,kafka,socket)目标4:Flink数据处理引擎的实战(采用双十一大屏,热销数据统计,区域分类统计,cep复杂事件的处理)目标5:实时......
  • 工程监测NLM5无线中继采集发送仪的工作模式
    工程监测NLM5无线中继采集发送仪的工作模式 NLM5xx有自动定时启动和随时无线唤醒两种工作模式。可定时启动或者使用无线读数仪将其唤醒采集传感器数据并经LoRA无线发送。多达16通道的传感器接口,最多可连接16个振弦、温度或者模拟信号(电压/电流)。内置大容量存储器,可做为......
  • Kafka 高可靠高性能原理探究
    引言在探究Kafka核心知识之前,我们先思考一个问题:什么场景会促使我们使用Kafka? 说到这里,我们头脑中或多或少会蹦出异步解耦和削峰填谷等字样,是的,这就是Kafka最重要的落地场景。异步解耦:同步调用转换成异步消息通知,实现生产者和消费者的解耦。想象一个场景,在商品交易......
  • 前后端微信小程序订阅消息推送
    小程序端开发前需要获取小程序设置模板ID,没有设置模板消息时可以添加新的模板mp.weixin.qq.com拥有模板ID后,需要获取到下发消息权限用户下发推送消息权限在订单或者其它操作完成时,调起客户端小程序订阅消息界面,获取到用户操作结果//index.wxml<buttonbindtap="bindSubscribe......
  • Linux安装KafKa
    Linux安装KafKa​ 官方下载地址:http://kafka.apache.org/downloads.html​ 解压安装包tar-zxvfkafka_2.12-3.3.2.tgz​ 修改配置文件vimserver.properties#broker的全局唯一编号,不能重复,只能是数字broker.id=0#kafka运行日志(数据)存放的路径,路径不需要提前创......
  • Python学习之八_调用Outlook发送邮件以及调用远程windows上面的python
    Python学习之八_调用Outlook发送邮件以及调用远程windows上面的python摘要之前只有一个需求是发送加密邮件.之前一直是使用linux进行发送.但是总是无法发送加密邮件.最近学习python,发现可以使用python来调用outlook来发送邮件.这样就比较简单了.可以直接使用outlook的......
  • odoo 消息
    #defclick_created(self):#sender=self.env['res.partner'].browse(2)#receiver=self.env['res.partner'].browse(6)#message=self.env['mail.message'].create({#'subject&#......
  • Springboot 开启异步任务Async,邮件发送任务,定时任务
    异步任务1.主启动类开启异步注解 2.service目录下开启异步任务注解@ServicepublicclassAsyncService{@Async//异步任务注解的标志publicvoidhello(){try{Thread.sleep(3000);}catch(InterruptedExceptione){......
  • 用扩展的方式在 PHP 中使用 Kafka
    前言:    由于之前在PHP中使用Kafka是通过composer包的方式,由于 nmred/kafka-php很久没有维护,并且网上相关问题的文章也比较少。所以我这次换成PHP扩展 RdKafka继续使用,主要介绍扩展安装和这种方式的基本操作。 安装:1.下载2.目录    由于php-rdkafka......