首页 > 其他分享 >Kafka 生产者API,消费者API,拦截器,流计算

Kafka 生产者API,消费者API,拦截器,流计算

时间:2023-10-26 11:08:38浏览次数:30  
标签:拦截器 kafka put API org apache import prop Kafka


pom 文件如下:

<dependencies>
	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>2.0.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-streams</artifactId>
		<version>2.0.0</version>
	</dependency>
</dependencies>

一、Kafka 生产者 API

package com.qjl.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * 生产者API
 * @author 曲健磊
 */
public class MyProducer {

	public static void main(String[] args) {
		
		// 1.配置生产者属性
		Properties prop = new Properties();
		// kafka节点地址
		prop.put("bootstrap.servers", "192.168.0.1:9092");
		// 发送消息是否等待应答
		prop.put("acks", "all");
		// 发送消息失败重试(重试的间隔时间)
		prop.put("retries", "0");
		// 配置批量处理消息大小
		prop.put("batch.size", "1024");
		// 配置批量处理数据延迟
		prop.put("linger.ms", "5");
		// 配置内存缓冲区
		prop.put("buffer.memory", "10240");
		// 消息发送前必须序列化
		prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 序列化
		prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		// 2.实例化生产者
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
		
		// 3.发送消息
		for (int i = 0; i < 99; i++) {
			producer.send(new ProducerRecord<String, String>("demo", "I love Beijing" + i));
		}
		
		// 4.释放资源
		producer.close();	
	}
}

带有接口回调的生产者API:

package com.qjl.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * 接口回调
 * @author 曲健磊
 */
public class MyProducerCallback {

	public static void main(String[] args) {
		// 1.配置生产者属性
		Properties prop = new Properties();
		// kafka节点地址
		prop.put("bootstrap.servers", "192.168.0.1:9092");
		// 发送消息是否等待应答
		prop.put("acks", "all");
		// 发送消息失败重试(重试的间隔时间)
		prop.put("retries", "0");
		// 配置批量处理消息大小
		prop.put("batch.size", "1024");
		// 配置批量处理数据延迟
		prop.put("linger.ms", "5");
		// 配置内存缓冲区
		prop.put("buffer.memory", "10240");
		// 消息发送前必须序列化
		prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 序列化
		prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 自定义分区规则
		prop.put("partitioner.class", "com.qjl.kafka.partition.MyPartition");
		
		// 2.实例化生产者
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
		
		// 3.发送消息
		for (int i = 0; i < 99; i++) {
			producer.send(new ProducerRecord<String, String>("yuandan", "hello" + i), new Callback(){
				public void onCompletion(RecordMetadata metdata, Exception ex) {
					// 如果metdata不为null,拿到当前数据偏移量和分区
					if (metdata != null) {
						System.out.println(metdata.topic() + "----" + metdata.offset() + "----" + metdata.partition());
					}
				}
			});
		}
		
		// 4.关闭资源
		producer.close();
	}
}
package com.qjl.kafka.partition;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartition implements Partitioner {

	// 配置
	public void configure(Map<String, ?> arg0) {
		
	}

	public void close() {
		
	}

	// 返回分区号
	public int partition(String arg0, Object arg1, byte[] arg2, Object arg3, byte[] arg4, Cluster arg5) {
		// 消费者默认监控0号分区,而生产者发送的分区是1,所以在不指定消费者消费分区的号的前提下,消费者无法消费到数据
//		return 1;
		return 0;
	}
}

二、Kafka 消费者 API

package com.qjl.kafka.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

	public static void main(String[] args) {
		// 1.配置消费者属性
		Properties prop = new Properties();
		// kafka节点地址
		prop.put("bootstrap.servers", "192.168.0.1:9092");
		// 配置消费者组
		prop.put("group.id", "group1");
		// 配置自动确认偏移量offset
		prop.put("enable.auto.commit", "true");
		// 消息发送前必须序列化
		prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// 序列化
		prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		
		// 2.实例消费者
		final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
		
		// 4.释放资源,线程安全
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){
			public void run() {
				if (consumer != null) {
					consumer.close();
				}
			}
		}));
		
		// 订阅消息
		consumer.subscribe(Arrays.asList("demo", "yuandan", "t2"));
		
		// 3.拉消息,poll
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(1000);
			// 遍历消息
			for (ConsumerRecord<String, String> r : records) {
				System.out.println("消费:" + r.key() + "---" + r.value());
			}
		}
	}
}

三、拦截器

package com.qjl.kafka.interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * 带有拦截器的生产者
 * @author 曲健磊
 */
public class MyProducer {

	public static void main(String[] args) {
		// 1.配置生产者属性
		Properties prop = new Properties();
		// kafka节点地址
		prop.put("bootstrap.servers", "192.168.0.1:9092");
		// 发送消息是否等待应答
		prop.put("acks", "all");
		// 发送消息失败重试(重试的间隔时间)
		prop.put("retries", "0");
		// 配置批量处理消息大小
		prop.put("batch.size", "1024");
		// 配置批量处理数据延迟
		prop.put("linger.ms", "5");
		// 配置内存缓冲区
		prop.put("buffer.memory", "10240");
		// 消息发送前必须序列化
		prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 序列化
		prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 拦截器
		List<String> interceptorList = new ArrayList();
		interceptorList.add("com.qjl.kafka.interceptor.TimeInterceptor");
		prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorList);
		
		// 2.实例化生产者
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
		
		// 3.发送消息
		for (int i = 0; i < 99; i++) {
			producer.send(new ProducerRecord<String, String>("demo", "key" + i, "I love Beijing" + i));
		}
		
		// 4.释放资源
		producer.close();		
	}
}

拦截器实现:

package com.qjl.kafka.interceptor;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

	// 配置信息
	public void configure(Map<String, ?> configs) {
		// TODO Auto-generated method stub
		
	}

	// 拦截逻辑
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		return new ProducerRecord<String, String>(record.topic(),
			record.partition(),
			record.key(),
			System.currentTimeMillis() + "-" + record.value());
	}

	// 发送失败时的应答
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
		
	}

	// 关闭
	public void close() {
		
	}
}

PS:拦截器其实就是起一个过滤的所用,把生产者发送的消息进行过滤。

四、流计算

Kafka 只能进行简单的流计算,更加复杂的流计算还需要依靠 Storm,JStorm,SparkStreaming,Flink 等。

package com.qjl.kafka.stream;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/**
 * 对数据进行清洗
 * @author qujianlei
 */
public class Application {

	/**
	 * qujianlei-china 把-去掉
	 * @param args
	 */
	public static void main(String[] args) {
		// 1.定义主题,发送到另外一个主题中,数据清洗
		String oneTopic = "t1";
		String twoTopic = "t2";
		
		// 2.设置属性
		Properties prop = new Properties();
		prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
		// 多台节点的话用逗号分隔
		prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
		
		// 3.实例对象
		StreamsConfig config = new StreamsConfig(prop);
		
		// 4.流计算,拓扑
		Topology topology = new Topology();
		
		// 5.定义kafka组件来源
		topology.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>(){
			public Processor<byte[], byte[]> get() {
				return new LogProcessor();
			}
			// 数据从Source中来,到Sink中去
		}, "Source").addSink("Sink", twoTopic, "Processor");
		
		// 6.实例化KafkaStream
		KafkaStreams kafkaStream = new KafkaStreams(topology, prop);
		kafkaStream.start();
	}
}
package com.qjl.kafka.stream;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * 完成数据清洗
 * @author qujianlei
 */
public class LogProcessor implements Processor<byte[], byte[]> {

	private ProcessorContext context;
	
	// 释放资源
	public void close() {
		
	}

	// 初始化
	public void init(ProcessorContext context) {
		this.context = context;
	}

	// 执行具体清洗的业务逻辑
	public void process(byte[] key, byte[] value) {
		// 1.拿到消息数据,转成字符串
		String message = new String(value);
		
		// 2.如果包含-就去除
		if (message.contains("-")) {
			// 去掉-及左侧的数据
			message = message.split("-")[1];
		}
		// 3.发送数据
		context.forward(key, message.getBytes());
	}
}

测试:先启动 Application 程序,在 kafka 上创建两个 topic:t1,t2,启动一个生产者向 t1 发送数据,启动一个消费者在 t2 接收数据,消费者就可以在 t2 上接收到经过处理后的数据。


标签:拦截器,kafka,put,API,org,apache,import,prop,Kafka
From: https://blog.51cto.com/u_14655640/8030911

相关文章

  • Kafka 简介、集群架构、安装部署、基本命令
    一、kafka是什么?在实时计算中,Kafka主要是用来缓存数据,storm可以通过消费kafka中的数据进行实时计算。一套开源的分布式的消息队列系统,由scala写成,支持javaAPI。Kafka读消息采用topic进行归类。二、kafka中有哪两种角色?发送消息:Producer(生产者)接收消息:Consumer(消费者)三......
  • kafka基于SCRAM认证,快速配置启用ACL
    启动和停止服务zookeeper/usr/local/apache-zookeeper-3.8.2-bin/bin/zkServer.shstart/usr/local/apache-zookeeper-3.8.2-bin/bin/zkServer.shstopkafka/usr/local/kafka_2.13-3.2.3/bin/kafka-server-stop.sh/usr/local/kafka_2.13-3.2.3/bin/kafka-server-start.sh-......
  • docker-compose部署SASL认证的kafka
    前言测试服务器:10.255.60.149一.编写docker-compose文件1.docker-compose.ymlversion:'3.8'services:zookeeper:image:wurstmeister/zookeepervolumes:-/data/zookeeper/data:/data-/home/docker-compose/kafka/config:/opt/zookeeper-......
  • helm部署kafka鉴权以及ACL
    官方文档https://github.com/bitnami/charts/tree/main/bitnami/kafkahttps://blog.csdn.net/u011618288/article/details/129105777(包含zookeeper与broker认证、鉴权流程)一.修改values.yaml文件按通用部署方案拉下来kafka安装包,修改values.yaml文件,开启scram鉴权,ACL以......
  • kafka-ACL
    本文档时在centos7直接部署添加认证的kafka文件基础上,做下面的修改实现ACL访问控制topic参考:https://www.seaxiang.com/blog/Qpsqii一.添加多个kafka用户及相关的配置文件1.kafka_server_jaas.confKafkaServer{org.apache.kafka.common.security.plain.PlainLoginModule......
  • centos7直接部署添加认证的kafka
    前言测试服务器:10.255.60.149一.安装jdk官网下载jdk1.8版本以上的https://www.oracle.com/java/technologies/downloads/测试系统版本为centos7,选择了最后一个下载后,使用rpm-ivh即可安装二.安装zookeeper和kafka软件版本:kafka_2.12-2.4.0(带zookeeper)下载链接:http://a......
  • 603-60API资源对象StorageClass、Ceph存储 6.3-6.5
    一、NFS存储使用master-1-230节点做NFS服务器,具体安装步骤参考:https://www.cnblogs.com/pythonlx/p/17766242.html (4.1在master节点搭建NFS)node节点查看NFS挂载目录##showmount-e192.168.1.230Exportlistfor192.168.1.230:/data/kubernetes*/data/nfs_test*......
  • Apipost现已支持连接数据库!
    Apipost提供了数据库连接功能,在接口调试时可以使用数据库获取入参或进行断言校验。目前的Apipost支持:Mysql、SQLSever、Oracle、Clickhouse、达梦数据库、PostgreSQL、Redis、MongoDB8种数据库的连接操作新建数据库连接:在「项目设置」-「公共资源维护」-「连接数据库」中配置需......
  • Apipost现已支持连接数据库!
    Apipost提供了数据库连接功能,在接口调试时可以使用数据库获取入参或进行断言校验。目前的Apipost支持:Mysql、SQLSever、Oracle、Clickhouse、达梦数据库、PostgreSQL、Redis、MongoDB8种数据库的连接操作新建数据库连接:在「项目设置」-「公共资源维护」-「连接数据库」中配置......
  • Wordpress Restful API Auth
    1.0Whydoesitnotwork? DELETE|http://127.0.0.1/wordpress.002/wp-json/wp/v2/smokes/20{"code":"rest_cannot_delete","message":"Sorry,youarenotallowedtodeletethispost.","data"......