首页 > 其他分享 >手动创建kafka生产者,消费者

手动创建kafka生产者,消费者

时间:2022-10-05 10:13:17浏览次数:38  
标签:String 生产者 手动 kafka key props put message CONFIG

pom.xml

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

创建kafka消费者

//配置对象
        Properties props = new Properties();
        //kakfka连接地址 支持集群环境 用','分割
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092");
        // group.id,指定了消费者所属群组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test.group.id_1");
        //指定反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //指定反序列化方式
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaConsumer<String, String> consumer= new KafkaConsumer<String, String>(props);
        //订阅多个主题
        consumer.subscribe(Arrays.asList("test_topic","test_topic_2"));
        //使用方式 循环获取数据进行处理
        while (true){
            //100毫秒获取一次数据
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String,String> record:records) {
                System.err.println(record.topic()+"\t接收到数据: key:【"+record.key()+"】\tmessage:【"+record.value()+"】\tpartition:【"+record.partition()+"】");
            }
        }

 

创建kafka生产者

public void createKafkaProducer(){
        //配置对象
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //创建kafka对象
        KafkaProducer<String, String> kafkaProducer=new KafkaProducer<String, String>(props);
        //给当前kafka生产者一个唯一id,存在多个kafka生产者时可以放入缓存中
        String producerId = "1";
        sendMessage(kafkaProducer,producerId,"test_topic","test_message_key","test_message_value",0);
    }


    /**
     * 消息发送
     *
     * @param topicName 消息发送队列
     * @param key       当前消息唯一key
     * @param message   消息
     * @param partition 分区号
     */
    public void sendMessage(KafkaProducer<String, String> kafkaProducer,String producerId,String topicName, String key, String message, int partition) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, partition, key, message);
        kafkaProducer.send(producerRecord, new ProducerCallBack(producerId, System.currentTimeMillis(), key, message));
    }

    /**
     * 实现Callback接口的onCompletion方法
     */
    class ProducerCallBack implements Callback {
        private final String id;
        private final long startTime;
        private final String key;
        private final String message;

        //实现的是DemoCallBack的有参构造
        public ProducerCallBack(String id, long startTime, String key, String message) {
            this.id = id;
            //通过this把获取的参数内容传递到外层类中,这有这样当回调发生时onCompletion才可以获取类的参数
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                log.info(this.id + "发送消息:【topic={}, partition={}, offset={}, key={}, value={}】",
                        metadata.topic(), metadata.partition(), metadata.offset() + " in " + elapsedTime + " ms",
                        this.key, this.message);
            } else {
                log.error(this.id + "发送消息失败:【key={}, value={}】", this.key, this.message);
            }
        }
    }

 

标签:String,生产者,手动,kafka,key,props,put,message,CONFIG
From: https://www.cnblogs.com/Sora-L/p/16755122.html

相关文章

  • 手动创建mongo连接
    通过读取配置手动创建mongo连接对象一、配置实体importcom.alibaba.druid.util.StringUtils;importcom.ft.monitoring.dataInterface.common.util.Md5Util;importc......
  • 动手动脑——类与对象
    1.publicclassTest1{publicstaticvoidmain(String[]args){Fooobj1=newFoo();Fooobj2=newFoo();......
  • 基于Kafka+ELK搭建海量日志平台
    早在传统的单体应用时代,查看日志大都通过SSH客户端登服务器去看,使用较多的命令就是less或者tail。如果服务部署了好几台,就要分别登录到这几台机器上看,等到了分布式和微服......
  • 【kafka源码】kafka跨目录数据迁移实现源码解析
    ​​......
  • 13张图让你百分百掌握kafka副本同步限流机制
    ​​......
  • k8s上部署Kafka
    一、集群部署Kafka1.1、指定节点部署给以下节点打上标签:k8s-node01、k8s-node02、k8s-master03【也就是我们的三个节点的集群部署在这三个节点上】[root@k8s-master01......
  • 0766-6.3.3-如何实现Kafka跨网络访问
    文档说明在使用Kafka时会遇到内外网的场景,即Kafka集群使用内网搭建,在内网和外网均有客户端需要消费Kafka的消息,同时在集群内由于使用内网环境通信,因此不必太过考虑通信的加......
  • 如何规划设置Kafka Broker的heap size
    温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。Fayson的github:​​https://github.com/fayson/cdhproject​​提示:代码块部分可......
  • 0887-7.1.4-如何在CDP中为Kafka启用Kerberos认证及使用
    1.文档编写目的在CDP集群中启用了Kerberos认证,那么Kafka集群能否与Kerberos认证服务集成呢?本文主要讲述如何通过ClouderaManager为Kafka集群启用Kerberos认证及客户端配置......
  • 动手动脑 类和对象
      两个结果不同,第一个为false,第二个为true 可以推断出对象obj1是引用类型,==判断的是地址是否相等,要想判断内容是否相等要重写equals方法,因为类没有声明继承的话......