首页 > 编程语言 >java实现kafka生产者消费者

java实现kafka生产者消费者

时间:2022-08-31 16:33:40浏览次数:82  
标签:java 生产者 import kafka apache props put org

参考:https://blog.csdn.net/zhengzaifeidelushang/article/details/121984379

深入浅出理解kafka原理系列之:java实现kafka消费者

一、pom.xml引入kafka依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.2</version>
</dependency>

二、kafka消费者程序
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyConsumer {
private final static String TOPIC_NAME = "optics-topic";

public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username=\"debezium\" password=\"NGFlM2I1NTJlNmFk\";");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
//消费者组
props.put("group.id", "opticsgroup");
//反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
//订阅主题
consumer.subscribe(Arrays.asList(TOPIC_NAME));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String,String> record : records){
System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}

三、kafka生产者程序
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {
private final static String TOPIC_NAME = "optics-topic";

public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username=\"debezium\" password=\"NGFlM2I1NTJlNmFk\";");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
//ack模式,all是最慢但最安全的
props.put("acks", "-1");
props.put("retries", 3);
//每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
props.put("batch.size", 16384);
//生产者客户端能发送消息的最大值,默认值为1048576B,1MB
//props.put("max.request.size",10);
//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
props.put("linger.ms", 10);
//整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
//buffer.memory要大于batch.size,否则会报申请内存不足的错误
props.put("buffer.memory", 33554432);
//序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
for (int i = 0; i < 10; i++) {
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(TOPIC_NAME, Integer.toString(i), "dd:" + i)).get();
System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + " | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset());
}
}
}

四、先运行kafka生产者程序,再查看kafka消费者程序

 

标签:java,生产者,import,kafka,apache,props,put,org
From: https://www.cnblogs.com/qsds/p/16643553.html

相关文章

  • # JavaScript中的数组
    目录JavaScript中的数组数组的概念创建数组1.利用new创建数组2.利用数组字面量创建数组获取数组中的元素数组的索引数组遍历数组新增元素冒泡排序JavaScript中的数组数组......
  • java修饰符
    基本介绍:java提供了四种访问控制修饰符号,控制犯方法和属性(成员变量)的访问权限(范围)1、公开级别:用public修饰,绝对公开2、受保护级别:用protected修饰,对子类和同一个包中的......
  • Java15-File类、递归
    Java15【File类、递归】主要内容File类递归Lambda优化第一章File类1.1概述java.io.File类是文件和目录路径名的抽象表示,主要用于文件和目录的创建、查找......
  • IOS下无法获取到WebViewJavascriptBridge的问题排查
    问题描述在安卓下正常获取到WebViewJavascriptBridge,在IOS下一直无法获取到WebViewJavascriptBridge官网的示例如下:接手项目时已有的桥接代码:可以发现src不同,一个......
  • Java 修饰符
    访问控制修饰符Java中,可以使用访问控制符来保护对类、变量、方法和构造方法的访问。Java支持4种不同的访问权限。default (即默认,什么也不写):在同一包内可见,不使......
  • JAVA面试题总结归纳
    问题一:JAVA的原始数据类型有哪些,它们的大小以及其对应的封装类是什么?数据类型对应大小对应的封装类boolean1bit,boolean类型单独使用是4个字节,而在数组中又是1个......
  • smile——Java机器学习引擎
    资源https://haifengl.github.io/https://github.com/haifengl/smile介绍Smile(统计机器智能和学习引擎)是一个基于Java和Scala的快速、全面的机器学习、NLP、线性代数、......
  • 【Java学习Day11】变量种类及命名规范
    变量变量是什么:就是可以变化的量Java是一种强类型语言,每个变量都必须声明其类型Java变量是程序中最基本的存储单元,其要素包括变量名,变量类型和作用域typevarNa......
  • java 实现逻辑分页
    //逻辑分页PageModelmodel=newPageModel();longtotal=list.size();model.setTotal(total);model.setPageNum(pageNum);model.setPageSize(pageSize);longpage=tot......
  • Java:将Excel转换为XPS
    Excel文档具有存放、处理数据等功能,用途十分广泛。有时为了满足不同的工作需要,我们需要将Excel文件进行文档格式转换。将Excel文件转换为XPS文件就是其中一种。XPS是一种版......