首页 > 其他分享 >kafka_demo

kafka_demo

时间:2023-08-06 10:55:58浏览次数:37  
标签:demo kafka topic props put new import

参考:

概念:https://zhuanlan.zhihu.com/p/74063251

代码运用:https://zhuanlan.zhihu.com/p/114209326

 

参考 kafka 在windows 平台的搭建和简单实用_一代键客的博客-CSDN博客,先验证本地是否能使用kafka成功生产消费消息,如果因为版本问题遇到报错“bootstrap-server is not a recognized option”,参考 Kafka踩坑记----bootstrap-server is not a recognized option如何解决_南望南山的博客-CSDN博客 、Kafka参数zookeeper和bootstrap-server的区别 - Clotho_Lee - 博客园 (cnblogs.com)解决。

启动zookeeper

 启动kafka

 创建topic,生产消息

 能成功消费到消息

 再用代码实现生产消费消息

Producer.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer implements Runnable {
    private KafkaProducer<String, String> producer;
    private String topic;

    public Producer(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<String, String>(properties);
        this.topic = topic;
    }

    @Override
    public void run() {
        int count = 0;
        while (count < 10) {
            String msg = "NO. " + count + " msg";
            System.out.println("produce NO." + count + " msg...");
            producer.send(new ProducerRecord<>(topic, "msg", msg));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            count++;
        }
        producer.close();
    }

    public static void main(String[] args) {
        new Thread(new Producer("kafka")).start();
    }
}

  

Consumer.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer implements Runnable {
    private KafkaConsumer<String, String> consumer;
    private String groundId;
    private String topic;
    private ConsumerRecords<String, String> msgList;

    public Consumer(String topic, String groundId) {
        this.topic = topic;
        this.groundId = groundId;
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groundId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        while (true) {
            msgList = consumer.poll(1000);
            System.out.println("poll msg,msgList size:" + msgList.count());
            if (msgList != null && msgList.count() > 0) {
                int idx = 0;
                for (ConsumerRecord<String, String> record : msgList) {
                    System.out.println("consume NO." + idx + " msg:key " + record.key() + " val " + record.value() +
                            " offset" + record.offset());
                    idx++;
                }
            } else {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new Consumer("kafka", "groundA")).start();
    }
}

  

先启动生产者,再启动消费者,运行结果:

 

 

标签:demo,kafka,topic,props,put,new,import
From: https://www.cnblogs.com/hemeiwolong/p/17580620.html

相关文章

  • 05服务拆分-案例Demo
    下载对应初始项目cloud-demo链接:https://pan.baidu.com/s/1NpovDVLj8ZSrDjt2seID2A?pwd=dp3f提取码:dp3f准备数据库dockerpsdockerexec-it6d542566d077/bin/bashpsql'host=localhostport=5432user=postgresdbname=postgres'createdatabasecloud_order;cre......
  • 谈谈 Kafka 的幂等性 Producer
    使用消息队列,我们肯定希望不丢消息,也就是消息队列组件,需要保证消息的可靠交付。消息交付的可靠性保障,有以下三种承诺:最多一次(atmostonce):消息可能会丢失,但绝不会被重复发送。至少一次(atleastonce):消息不会丢失,但有可能被重复发送。精确一次(exactlyonce):消息不会丢失,也不会被......
  • 搭建zk,kafka环境所遇到的问题
    env:jdk1.8centos7.1zookeeper3.4.6选这版体是因为对kafka做了很多优化,修改 Q1:zk启动不了A1: ./zkServer.sh start-foreground   发现少了myid 解决 :在zk数据文件存放目录下(见$ZK/conf/zoo.cfg,dataDir属性),创建myid文件并写入一个数字用来标识本节点(类似这个节点的......
  • Kafka 2.11 安装和测试
    1.简介 kafka(官网地址:http://kafka.apache.org)是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。  i.消息的发布(publish)称作producer,消息的订阅(subscribe)称作consumer,中间的存储阵列称作broker。 ii.多个broker协同合作,producer、consumer和broker三者之间通过zooke......
  • kafka权威指南(阅读摘录)
    零复制Kafka使用零复制技术向客户端发送消息——也就是说,Kafka直接把消息从文件(或者更确切地说是Linux文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。这是Kafka与其他大部分数据库系统不一样的地方,其他数据库在将数据发送给客户端之前会先把它们保存在本地缓......
  • 玩一玩Aidlux应用中心demo
    玩一玩Aidlux应用中心demo之目标检测首先,安装aidlux安卓端,登录pc端。其次,下载桌面examples。最后,任意选择一个demo文件打开运行,即可得到如下展示效果。点击下方链接观看视频。https://link.zhihu.com/?target=https%3A//www.bilibili.com/video/BV1sk4y137Gz/%3Fvd_source%3D......
  • Qt+GDAL开发笔记(二):在windows系统msvc207x64编译GDAL库、搭建开发环境和基础Demo
    前言  上一篇使用mingw32版本的gdal,过程曲折,为更好的更方便搭建环境,在windows上msvc方式对于库比较友好。<br>大地坐标简介概述  大地坐标(Geodeticcoordinate)是大地测量中以参考椭球面为基准面的坐标,地面点P的位置用大地经度L、大地纬度B和大地高H表示。原理  当点在......
  • kratos项目中使用kafka实现延迟队列
    项目地址https://gitee.com/huoyingwhw/kratos_kafkaB站视频地址B站视频地址——kratos项目中使用kafka实现延迟队列......
  • Qt+GDAL开发笔记(二):在windows系统msvc207x64编译GDAL库、搭建开发环境和基础Demo
    前言  上一篇使用mingw32版本的gdal,过程曲折,为更好的更方便搭建环境,在windows上msvc方式对于库比较友好。 大地坐标简介概述  大地坐标(Geodeticcoordinate)是大地测量中以参考椭球面为基准面的坐标,地面点P的位置用大地经度L、大地纬度B和大地高H表示。原理......
  • Kafka - Kafka v.s. NATS v.s. RabbitMQ
     Kafkav.s.RabbitMQ 优先选择Kafka的条件·严格的消息顺序·延长消息留存时间,包括过去消息重放的可能·传统解决方案无法满足的高伸缩能力 优先选择RabbitMQ的条件·高级灵活的路由规则·消息时序控制(控制消息过期或消息延迟)·高级的容错处理能力,在消费者更......