The pom file is as follows:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>
1. Kafka Producer API
package com.qjl.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Producer API
 * @author 曲健磊
 */
public class MyProducer {
    public static void main(String[] args) {
        // 1. Configure the producer properties
        Properties prop = new Properties();
        // Kafka broker address
        prop.put("bootstrap.servers", "192.168.0.1:9092");
        // Wait for acknowledgement from all in-sync replicas
        prop.put("acks", "all");
        // Number of retries when a send fails
        prop.put("retries", "0");
        // Batch size in bytes
        prop.put("batch.size", "1024");
        // How long (ms) to wait for more records before sending a batch
        prop.put("linger.ms", "5");
        // Total memory (bytes) available for buffering records
        prop.put("buffer.memory", "10240");
        // Serializer for message keys
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer for message values
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Instantiate the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        // 3. Send the messages
        for (int i = 0; i < 99; i++) {
            producer.send(new ProducerRecord<String, String>("demo", "I love Beijing" + i));
        }
        // 4. Release resources
        producer.close();
    }
}
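A note on send(): it is asynchronous and returns a Future<RecordMetadata>, so the loop above hands records to a background sender without waiting for acknowledgement. If you need to confirm each record before continuing, you can block on the returned future. A minimal sketch, reusing the producer and the "demo" topic from above (get() throws checked exceptions, handled here by just printing them):

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.RecordMetadata;

// Synchronous send: block until the broker acknowledges the record
try {
    RecordMetadata meta = producer.send(
            new ProducerRecord<String, String>("demo", "sync message")).get();
    System.out.println("Written to partition " + meta.partition() + " at offset " + meta.offset());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}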
Producer API with a callback interface:
package com.qjl.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer with a callback
 * @author 曲健磊
 */
public class MyProducerCallback {
    public static void main(String[] args) {
        // 1. Configure the producer properties
        Properties prop = new Properties();
        // Kafka broker address
        prop.put("bootstrap.servers", "192.168.0.1:9092");
        // Wait for acknowledgement from all in-sync replicas
        prop.put("acks", "all");
        // Number of retries when a send fails
        prop.put("retries", "0");
        // Batch size in bytes
        prop.put("batch.size", "1024");
        // How long (ms) to wait for more records before sending a batch
        prop.put("linger.ms", "5");
        // Total memory (bytes) available for buffering records
        prop.put("buffer.memory", "10240");
        // Serializer for message keys
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer for message values
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioning rule
        prop.put("partitioner.class", "com.qjl.kafka.partition.MyPartition");
        // 2. Instantiate the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        // 3. Send the messages
        for (int i = 0; i < 99; i++) {
            producer.send(new ProducerRecord<String, String>("yuandan", "hello" + i), new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception ex) {
                    // If metadata is not null, the send succeeded: print the topic, offset, and partition
                    if (metadata != null) {
                        System.out.println(metadata.topic() + "----" + metadata.offset() + "----" + metadata.partition());
                    }
                }
            });
        }
        // 4. Release resources
        producer.close();
    }
}
package com.qjl.kafka.partition;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartition implements Partitioner {
    // Configuration hook
    public void configure(Map<String, ?> configs) {
    }

    public void close() {
    }

    // Return the partition number
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // If this returned 1, every record would go to partition 1; a consumer reading
        // only partition 0 would then never see the data. Return 0 so the example
        // consumer receives the messages without any explicit partition assignment.
        // return 1;
        return 0;
    }
}
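Pinning every record to a fixed partition defeats the point of partitioning. A more typical partition() spreads records across all of the topic's partitions by hashing the key, which is roughly what Kafka's default partitioner does. A sketch under that assumption (Utils.murmur2 is the hash the default partitioner uses; the sign bit is masked so the modulo stays non-negative):

import org.apache.kafka.common.utils.Utils;

// Sketch: distribute records across the topic's partitions by key hash
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    int numPartitions = cluster.partitionCountForTopic(topic);
    if (keyBytes == null) {
        return 0; // no key: fall back to a fixed partition
    }
    return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
}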
2. Kafka Consumer API
package com.qjl.kafka.consumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class MyConsumer {
    public static void main(String[] args) {
        // 1. Configure the consumer properties
        Properties prop = new Properties();
        // Kafka broker address
        prop.put("bootstrap.servers", "192.168.0.1:9092");
        // Consumer group
        prop.put("group.id", "group1");
        // Commit offsets automatically
        prop.put("enable.auto.commit", "true");
        // Deserializer for message keys
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Deserializer for message values
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 2. Instantiate the consumer
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        // 3. Register a shutdown hook to release resources. KafkaConsumer is not
        // thread-safe, so the hook only calls wakeup(); the polling thread below
        // does the actual close().
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                consumer.wakeup();
                try {
                    // Wait for the polling thread to finish closing the consumer
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }));
        // Subscribe to the topics
        consumer.subscribe(Arrays.asList("demo", "yuandan", "t2"));
        // 4. Pull messages with poll()
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // Iterate over the messages
                for (ConsumerRecord<String, String> r : records) {
                    System.out.println("Consumed: " + r.key() + "---" + r.value());
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown; nothing to do
        } finally {
            consumer.close();
        }
    }
}
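With enable.auto.commit set to "true", offsets are committed in the background on a timer, so a crash between commits can re-deliver or skip records. When that matters, commit manually instead. A minimal sketch of the poll loop, assuming enable.auto.commit is changed to "false" in the properties above:

// Manual offset commit: process a batch, then commit it synchronously
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> r : records) {
        System.out.println("Consumed: " + r.key() + "---" + r.value());
    }
    // Blocks until the offsets of this batch are committed (or throws);
    // runs only after the whole batch has been processed
    consumer.commitSync();
}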
3. Interceptors
package com.qjl.kafka.interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Producer with an interceptor
 * @author 曲健磊
 */
public class MyProducer {
    public static void main(String[] args) {
        // 1. Configure the producer properties
        Properties prop = new Properties();
        // Kafka broker address
        prop.put("bootstrap.servers", "192.168.0.1:9092");
        // Wait for acknowledgement from all in-sync replicas
        prop.put("acks", "all");
        // Number of retries when a send fails
        prop.put("retries", "0");
        // Batch size in bytes
        prop.put("batch.size", "1024");
        // How long (ms) to wait for more records before sending a batch
        prop.put("linger.ms", "5");
        // Total memory (bytes) available for buffering records
        prop.put("buffer.memory", "10240");
        // Serializer for message keys
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer for message values
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Register the interceptor chain
        List<String> interceptorList = new ArrayList<String>();
        interceptorList.add("com.qjl.kafka.interceptor.TimeInterceptor");
        prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorList);
        // 2. Instantiate the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        // 3. Send the messages
        for (int i = 0; i < 99; i++) {
            producer.send(new ProducerRecord<String, String>("demo", "key" + i, "I love Beijing" + i));
        }
        // 4. Release resources
        producer.close();
    }
}
The interceptor implementation:
package com.qjl.kafka.interceptor;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor<String, String> {
    // Configuration hook
    public void configure(Map<String, ?> configs) {
    }

    // Interception logic: prepend a timestamp to every record value
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<String, String>(record.topic(),
                record.partition(),
                record.key(),
                System.currentTimeMillis() + "-" + record.value());
    }

    // Called when the record is acknowledged, or when the send fails
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    // Cleanup hook
    public void close() {
    }
}
P.S.: An interceptor essentially acts as a filter: it can inspect and transform every message the producer sends before it goes out.
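An interceptor can also observe delivery results through onAcknowledgement(). As an illustration (this class is not part of the original example), a hypothetical CountInterceptor could tally successes and failures and report the totals when the producer closes. To enable it, add its class name to interceptorList alongside TimeInterceptor; interceptors run in registration order.

package com.qjl.kafka.interceptor;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountInterceptor implements ProducerInterceptor<String, String> {
    private int successCount = 0;
    private int errorCount = 0;

    public void configure(Map<String, ?> configs) {
    }

    // Pass records through unchanged
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    // exception == null means the record was delivered successfully
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCount++;
        } else {
            errorCount++;
        }
    }

    // Called when the producer closes: a natural place to report the totals
    public void close() {
        System.out.println("success: " + successCount + ", error: " + errorCount);
    }
}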
4. Stream Processing
Kafka itself supports only simple stream processing; more complex stream computation still relies on engines such as Storm, JStorm, Spark Streaming, and Flink.
package com.qjl.kafka.stream;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/**
 * Cleans the incoming data
 * @author qujianlei
 */
public class Application {
    /**
     * e.g. turns "qujianlei-china" into "china" by stripping the "-" and what precedes it
     * @param args
     */
    public static void main(String[] args) {
        // 1. Define the topics: read raw data from one, write cleaned data to the other
        String oneTopic = "t1";
        String twoTopic = "t2";
        // 2. Set the properties
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
        // For multiple brokers, separate the addresses with commas
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        // 3. Build the topology
        Topology topology = new Topology();
        // 4. Wire the components: data flows from Source through Processor to Sink
        topology.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() {
            public Processor<byte[], byte[]> get() {
                return new LogProcessor();
            }
        }, "Source").addSink("Sink", twoTopic, "Processor");
        // 5. Instantiate and start KafkaStreams
        KafkaStreams kafkaStream = new KafkaStreams(topology, prop);
        kafkaStream.start();
    }
}
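The example starts the streams instance but never stops it. In practice you would close it when the JVM exits; a minimal sketch, assuming kafkaStream is declared final so the anonymous class can capture it:

// Close the streams instance cleanly on shutdown
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    public void run() {
        kafkaStream.close();
    }
}));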
package com.qjl.kafka.stream;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * Performs the actual data cleaning
 * @author qujianlei
 */
public class LogProcessor implements Processor<byte[], byte[]> {
    private ProcessorContext context;

    // Release resources
    public void close() {
    }

    // Initialization
    public void init(ProcessorContext context) {
        this.context = context;
    }

    // The cleaning logic itself
    public void process(byte[] key, byte[] value) {
        // 1. Convert the message bytes to a string
        String message = new String(value);
        // 2. If the message contains a "-", keep only what follows it
        if (message.contains("-")) {
            // Drops the "-" and everything to its left (assumes a single "-")
            message = message.split("-")[1];
        }
        // 3. Forward the cleaned data downstream
        context.forward(key, message.getBytes());
    }
}
To test: start the Application program first, create the two topics t1 and t2 on Kafka, start a producer that sends data to t1, and start a consumer on t2. The consumer then receives the cleaned data on t2; for example, a message "qujianlei-china" sent to t1 arrives on t2 as "china".