首页 > 编程语言 >Java Kafka生产消费测试类

Java Kafka生产消费测试类

时间:2024-07-27 17:19:22浏览次数:15  
标签:Java kafka apache 测试 org put import Kafka CONFIG

Java Kafka生产消费测试类:

生产者:

package test.kafkatest;

import java.text.SimpleDateFormat;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;


public class ProducerTest {

    // 定义主题
    public static String topic = "hejiuwei_test01";

    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "myhadoop01:9092");
        // acks:消息的确认机制,默认值是0. acks=0: 如果设置为0,生产者不会等待kafka的响应; acks=1: 这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应
        // acks=all: 这个配置意味着leader会等待所有的follower同步完成. 这个确保消息不会丢失, 除非kafka集群中所有机器挂掉. 这是最强的可用性保证.
        p.put("acks", "all");
        // retries: 配置为大于0的值的话, 客户端会在消息发送失败时重新发送.
        p.put("retries", 0);
        // batch.size: 当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求. 这会提高client和生产者的效率.
        p.put("batch.size", 16384);
        // key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer.
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer.
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
        try {
            int i = 2;
            do {
                String msg = "{'id':'"+i+"','name':'嬴政-"+i+"','createTime':'"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()) +"'}";
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
                kafkaProducer.send(record);
                Thread.sleep(1000L);
                i++;
            } while (true);
        } finally {
            kafkaProducer.close();
        }

    }

}

消费者:

package test.kafkatest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class ConsumerTest {

    private static final String GROUPID = "mytestGroupId01";

    public static void main(String[] args) {

        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "myhadoop01:9092");
        p.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
        // 是否自动提交, 默认为true.
        p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 从poll(拉)的回话处理时长
        p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // 超时时间
        p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // 一次最大拉取的条数
        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        // 消费规则, 默认earliest
        p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // key.serializer: 键序列化, 默认org.apache.kafka.common.serialization.StringDeserializer.
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // value.deserializer:值序列化, 默认org.apache.kafka.common.serialization.StringDeserializer.
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);
        // 订阅消息
        kafkaConsumer.subscribe(Collections.singletonList(ProducerTest.topic));
        do {
            // 订阅之后, 再从kafka中拉取数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("-----topic:%s, offset:%d, 消息:%s-----\n", record.topic(), record.offset(), record.value());
            }
        } while (true);
    }
}

 

标签:Java,kafka,apache,测试,org,put,import,Kafka,CONFIG
From: https://www.cnblogs.com/fengbonu/p/18327207

相关文章

  • 2024暑假集训测试13
    前言比赛链接。从来没见过交互题,T1狂CE不止心态炸了,后面的题也没打好,T2、T3简单题都不会了,所以为啥T4又放黑题。T1大众点评原题:AT_joisc2014_d。难点主要在交互,赛时琢磨了半场比赛终于搞明白是啥玩意儿了,可以将给定库当成压缩的一部分代码,可以调用里面的函数,输入......
  • JavaSE常用类1
    常用类object类toString功能:返回对象的字符串形式object类是所有类的父类,所有类都有toString方法使用sout打印对象的引用时,会自动调用toString()返回值:全类名@hash值如果object中的toString不能满足要求,就要重写toString()==1)判断基本数据类型的值是否相同2......
  • dpdk下ipsec内联卸载(inline offload)测试
    使用intel82599网卡完成。介绍本文介绍了数据平面开发套件(DPDK)框架中的内联IPsec加速支持实现,特别关注英特尔®8259910千兆以太网控制器系列的功能和支持。内联IPsec可用于实现IPsec感知系统,该系统具有比旁路辅助和加速硬件更好的延迟,前提是支持的算法合适。......
  • java毕设之学生管理系统(部分源码)
    如有需要完整代码的请+vaaa5988689-------------------------------------------------------------------------------publicclassStudentSysterm{publicstaticvoidmain(String[]args){ArrayList<Student>list=newArrayList<>();/......
  • 运行 Github Action 测试 Docker 镜像时退出代码 137
    我正在学习Testdriven.io:使用FastAPI和Docker进行测试驱动开发课程,目前正在学习持续集成部分。在本节中,您将使用github操作来构建docker映像并运行测试和linting等。在流程的测试Docker映像步骤中,当尝试进行pytest时,我收到以下错误:错误:进程已完成并退出代码......
  • tortoise.exceptions.OperationalError:运行测试时关系不存在
    我正在学习TDDfastapi、docker和pytest课程的第一部分。我遇到了一个奇怪的问题,需要您的帮助。当我创建第一个使用torotoise的测试时,它工作正常,将记录添加到数据库,并从fastapi获取它,没有任何问题。分钟我添加另一个测试(添加的测试,然后读取记录),我收到此错误:tor......
  • “论软件测试中缺陷管理及其应用”写作框架,软考高级论文,系统架构设计师论文
    原创范文软件缺陷指的是计算机软件或程序中存在的某种破坏正常运行能力的问题、错误,或者隐藏的功能缺陷。缺陷的存在会导致软件产品在某种程度上不能满足用户的需要。在目前的软件开发过程中,缺陷是不可避免的。软件测试是发现缺陷的主要手段,其核心目标就是尽可能多地找出......
  • 甄选范文“论软件测试中缺陷管理及其应用”软考高级论文,系统架构设计师论文
    论文真题软件缺陷指的是计算机软件或程序中存在的某种破坏正常运行能力的问题、错误,或者隐藏的功能缺陷。缺陷的存在会导致软件产品在某种程度上不能满足用户的需要。在目前的软件开发过程中,缺陷是不可避免的。软件测试是发现缺陷的主要手段,其核心目标就是尽可能多地找出......
  • 学生Java学习历程-4
    ok,到了一周一次的总结时刻,我大致会有下面几个方面的论述:1.这周学习了Java的那些东西2.这周遇到了什么苦难3.未来是否需要改进方法等几个方面阐述我的学习路程。这周最先开始学的仍旧是一些字词的使用instanceof,左边对象,右边为类,若对象是此类或其子类的对象,则输出true,否则输出fla......
  • docker 中的 Pytest 运行 venv 文件的测试
    我正在关注https://testdriven.io/courses/tdd-fastapi/pytest-setup/,但是当第一次运行docker-composeexecwebpython-mpytest时,我得到collected212items/24errors而不是预期的0个项目.简短的测试摘要信息显示在其他中ERRORenv/Lib/site-pa......