Using the Java client with Kafka & Spring Boot integration with Kafka & Kafka partitions


1. Testing Kafka with the Java client

1. Configure Kafka to allow remote connections

Edit the config/kraft/server.properties file and change the advertised address to the server's public IP. The default is:

advertised.listeners=PLAINTEXT://localhost:9092
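For example, assuming the server's public IP is 203.0.113.10 (a placeholder), the line would be changed to:

advertised.listeners=PLAINTEXT://203.0.113.10:9092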

Then restart Kafka.

2. Testing AdminClient for managing topics and other metadata

Test class and results:

package cn.qz.cloud.kafka.client;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * CRUD operations on topics
 */
@Slf4j
public class KafkaAdminTest {

public static Properties props = new Properties();

static {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVER);
props.put("request.timeout.ms", 60000);
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
createTopic();
describeTopic();
}

public static void createTopic() throws ExecutionException, InterruptedException {
String topicName = KafkaConstants.TOPIC_NAME;
try (AdminClient adminClient = AdminClient.create(props)) {
/**
 * 2 = number of partitions
 * 1 = replication factor
 */
NewTopic newTopic = new NewTopic(topicName, 2, (short) 1);
CreateTopicsResult topics = adminClient.createTopics(Collections.singletonList(newTopic));
log.info("{}", topics.all().get());
}
}

public static void listTopic() throws ExecutionException, InterruptedException {
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
try (AdminClient adminClient = AdminClient.create(props)) {
ListTopicsResult listTopicsResult = adminClient.listTopics(listTopicsOptions);
Collection<TopicListing> topicListings = listTopicsResult.listings().get();
log.info("{}", topicListings);
/**
* [(name=quickstart-events, topicId=rPIXse70QvK3Rri24a-bNg, internal=false), (name=myTopic1, topicId=E6i1TbWXTz-11yKI207ZLA, internal=false), (name=__consumer_offsets, topicId=38T6UsJSRn2BL6tnfj5Wfg, internal=true)]
*/
}
}

public static void deleteTopic() throws ExecutionException, InterruptedException {
String topicName = KafkaConstants.TOPIC_NAME;
try (AdminClient adminClient = AdminClient.create(props)) {
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Sets.newHashSet(topicName));
log.info("{}", deleteTopicsResult);
}
}

public static void describeTopic() throws ExecutionException, InterruptedException {
String topicName = KafkaConstants.TOPIC_NAME;
try (AdminClient adminClient = AdminClient.create(props)) {
DescribeTopicsResult topicsResult = adminClient.describeTopics(Arrays.asList(topicName));
Map<String, TopicDescription> topicDescription = topicsResult.all().get();
log.info("{}", topicDescription);
/**
* {myTopic1=(name=myTopic1, internal=false, partitions=(partition=0, leader=x.x.x.x:9092 (id: 1 rack: null), replicas=x.x.x.x:9092 (id: 1 rack: null), isr=x.x.x.x:9092 (id: 1 rack: null)),(partition=1, leader=x.x.x.x:9092 (id: 1 rack: null), replicas=x.x.x.x:9092 (id: 1 rack: null), isr=x.x.x.x:9092 (id: 1 rack: null)), authorizedOperations=null)}
*/
}
}
}
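The KafkaConstants class referenced throughout these examples is not shown in the original post. A minimal sketch of what it might contain (all values are placeholders for your environment; the nested class name Concumer matches the identifier used by the consumer code):

package cn.qz.cloud.kafka.client;

/**
 * Hypothetical sketch of the constants class used by the examples; adjust the values to your environment.
 */
public class KafkaConstants {

    /** Broker address, e.g. the server's public IP and port */
    public static final String BOOTSTRAP_SERVER = "x.x.x.x:9092";

    /** Topic used throughout the examples */
    public static final String TOPIC_NAME = "myTopic1";

    public static class Concumer {

        /** Consumer group id used by the consumer examples */
        public static final String GROUP_ID = "test-consumer-group";
    }
}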

3. Message producer

Recreate myTopic1 with 6 partitions and a replication factor of 1, then start a console consumer to listen for the test messages:

bin/kafka-console-consumer.sh --topic myTopic1 --from-beginning --bootstrap-server localhost:9092
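For reference, the topic can be dropped and recreated from the command line like this (a sketch; adjust the bootstrap server to your environment):

bin/kafka-topics.sh --delete --topic myTopic1 --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic myTopic1 --partitions 6 --replication-factor 1 --bootstrap-server localhost:9092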

1. About ProducerRecord

When a message is sent to a topic, what is actually sent is a ProducerRecord. Its source looks like this:

public class ProducerRecord<K, V> {

private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;

/**
* Creates a record with a specified timestamp to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign
* the timestamp using System.currentTimeMillis().
* @param key The key that will be included in the record
* @param value The record contents
* @param headers the headers that will be included in the record
*/
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException(
String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
if (partition != null && partition < 0)
throw new IllegalArgumentException(
String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
}

As shown, you can specify partition, key, value, and headers; only topic and value are required. The partition-selection logic is:

  1. If a partition ID is specified, the record is sent to that partition.
  2. If no partition ID is specified but a key is, the record is sent to the partition determined by hash(key).
  3. If neither a partition ID nor a key is specified, records are sent to partitions in a round-robin fashion.
  4. If both a partition ID and a key are specified, the record is sent only to the specified partition (the key has no effect; this is determined by the code logic).

For example, sending a message like this:

Header header = new RecordHeader("testHeader", "testHeaderValue".getBytes());
ProducerRecord producerRecord = new ProducerRecord(topic, null, null, "TEST_KEY", msg, Sets.newHashSet(header));

The consumer receives the following (i.e., the consumer can also read the headers):

topic: myTopic1, partition: 2, offset: 0, key: TEST_KEY, value: testMsg
key: testHeader, value: testHeaderValue

In the examples below, the producer and consumer simply work with string messages, without specifying a key, a partition, or headers.

2. Sending messages

The following code demonstrates synchronous sending, asynchronous sending, idempotent sending, and transaction-based sending.

package cn.qz.cloud.kafka.client;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

@Slf4j
public class Producer {

private Properties properties = new Properties();

private KafkaProducer kafkaProducer;

public Producer() {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVER);
/**
 * client.id identifies this producer in broker-side request logs and metrics
 */
// properties.put(ProducerConfig.CLIENT_ID_CONFIG, "client1");
/**
 * Serializers for key and value
 */
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); // DEFAULT 16384 = 16K
/**
 * acks=0: the send is considered successful as soon as the message is sent out, regardless of whether the partition leader has written it to disk.
 * acks=1: the send is considered successful once the partition leader has received the message and written it to its local log, regardless of whether any follower has replicated it.
 * acks=all: the send is considered successful only after the partition leader has confirmed that all followers in the ISR (the replicas in sync with the leader) have replicated the message.
 */
properties.put(ProducerConfig.ACKS_CONFIG, "all"); // default 1
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000"); // default 30000 ms = 30 s
// For more defaults, see CommonClientConfigs
}

/**
 * Send a message (simple example)
 */
public void produce(SendTypeEnum sendTypeEnum, String msg) {
String topic = KafkaConstants.TOPIC_NAME;
try {
kafkaProducer = new KafkaProducer(properties);
long startTime = System.currentTimeMillis();
// Asynchronous send with a callback
if (sendTypeEnum == SendTypeEnum.ASYNC) {
kafkaProducer.send(new ProducerRecord(topic, msg), new ProducerCallBack(startTime, msg));
}
// Fire-and-forget: do not care about the result
// The method returns a Future; if get() is not called it does not block
if (SendTypeEnum.WITHOUT_RESULT == sendTypeEnum) {
kafkaProducer.send(new ProducerRecord(topic, msg));
}
// Synchronous: org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord<K,V>)
// The method returns a Future; calling get() blocks until the result is available
if (SendTypeEnum.SYNC_WITH_RESULT == sendTypeEnum) {
RecordMetadata rm = (RecordMetadata) kafkaProducer.send(new ProducerRecord(topic, msg)).get();
log.info("rm: {}", ToStringBuilder.reflectionToString(rm, ToStringStyle.NO_CLASS_NAME_STYLE));
}
} catch (Exception e) {
log.error("produce error", e);
} finally {
kafkaProducer.close();
}
}

/**
 * Send with idempotence enabled
 *
 * @param msg
 */
public void produceIdempotence(String msg) {
// With idempotence enabled, retries becomes Integer.MAX_VALUE and acks is forced to all
/**
 * Producer ID (PID) and Sequence Number
 * PID: every new producer is assigned a unique PID at initialization; it is invisible to the user.
 * Sequence Number: for each PID, every <Topic, Partition> the producer writes to has a sequence number that starts at 0 and increases monotonically.
 * The broker caches the sequence number and, for each incoming message, accepts it only if its sequence number is exactly one greater than the cached one; otherwise the message is discarded. This is how duplicate submissions are avoided.
 * It only guarantees idempotence within a single partition: an idempotent producer avoids duplicate messages in one partition of a topic, not across partitions. It also only covers a single producer session, not multiple sessions.
 */
properties.put("enable.idempotence", "true"); // enable idempotence
try {
kafkaProducer = new KafkaProducer(properties);
long startTime = System.currentTimeMillis();
kafkaProducer.send(new ProducerRecord(KafkaConstants.TOPIC_NAME, msg, msg), new ProducerCallBack(startTime, msg));
} catch (Exception e) {
log.error("", e);
} finally {
kafkaProducer.close();
}
}

/**
 * Send within a transaction.
 * Transactions are built on top of the PID mechanism.
 * transactional.id and producerId are kept in a one-to-one mapping by the transaction coordinator: transactional.id is the key and producerId the value.
 * When a producer recovers, it uses the user-specified transactional.id to look up its producerId from the transaction coordinator, which preserves idempotence across sessions.
 */
public void produceInTransaction() {
properties.put("transactional.id", "myTx");
kafkaProducer = new KafkaProducer(properties);
kafkaProducer.initTransactions();
try {
long startTime = System.currentTimeMillis();
try {
kafkaProducer.beginTransaction();
for (int i = 0; i < 100; i++) {
String messageStr = "message_" + i;
if (i == 99) {
throw new RuntimeException("XXX");
}
kafkaProducer.send(new ProducerRecord(KafkaConstants.TOPIC_NAME, messageStr, messageStr),
new ProducerCallBack(startTime, messageStr));
}
kafkaProducer.commitTransaction();
} catch (ProducerFencedException e) {
kafkaProducer.close();
log.error("", e);
} catch (OutOfOrderSequenceException e) {
kafkaProducer.close();
log.error("", e);
} catch (Exception e) {
kafkaProducer.abortTransaction();
log.warn("", e);
}
} catch (Exception e) {
log.error("", e);
} finally {
kafkaProducer.close();
}
}

@Slf4j
private static class ProducerCallBack implements Callback {

private final long startTime;

private final String message;

public ProducerCallBack(long startTime, String message) {
this.startTime = startTime;
this.message = message;
}

/**
 * Called when the ack from the Kafka broker is received.
 *
 * @param metadata metadata of the sent message; null if an exception occurred during sending
 * @param e exception raised during sending; null if the send succeeded
 */
public void onCompletion(RecordMetadata metadata, Exception e) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
log.info("send success! partition:{}, offset:{}, messgage:{}, elapsedTimeMs:{}", metadata.partition(), metadata.offset(), message, elapsedTime);
} else {
log.error("", e);
}
}
}

public enum SendTypeEnum {

/**
* Async
*/
ASYNC,

/**
 * Fire-and-forget, result not checked
 */
WITHOUT_RESULT,

/**
 * Synchronous send
 */
SYNC_WITH_RESULT;
}

public static void main(String[] args) {
Producer producer = new Producer();
for (int i = 0; i < 10; i++) {
producer.produce(SendTypeEnum.ASYNC, "testMsg" + i);
}
}
}

4. Message consumer

Offsets can be committed automatically or manually. With manual commit you must call commit yourself to record the offset; with automatic commit you do not need to commit the offset yourself.

1. Automatic commit:

package cn.qz.cloud.kafka.client;

import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

@Slf4j
public class Consumer {

private static Properties properties = new Properties();

static {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVER); //required
properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.Concumer.GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");//default 300000
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");//default 500
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Auto-commit enabled: offsets are recorded automatically, no manual ack needed
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "4194304"); // Maximum amount of data fetched per partition per request: 4 MB
}

private KafkaConsumer kafkaConsumer;

public void consume() {
kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.subscribe(Arrays.asList(KafkaConstants.TOPIC_NAME), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(1);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(2);
}
});

try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(20));
for (ConsumerRecord<String, String> record : records) {
log.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
/**
 * If the producer attached headers to the message, the consumer can read them
 */
Headers headers = record.headers();
if (CollectionUtil.isNotEmpty(headers)) {
headers.forEach(h -> {
log.info("key: {}, value: {}", h.key(), new String(h.value()));
});
}
}
}
} catch (Exception e) {
log.error("", e);
} finally {
kafkaConsumer.close();
}

}

public static void main(String[] args) throws Exception {
Consumer consumerDemo = new Consumer();
consumerDemo.consume();
}

}

2. Manual commit

package cn.qz.cloud.kafka.client;

import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

@Slf4j
public class Consumer {

private static Properties properties = new Properties();

static {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVER); //required
properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.Concumer.GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");//default 300000
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");//default 500
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Auto-commit disabled: offsets must be committed manually
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "4194304"); // Maximum amount of data fetched per partition per request: 4 MB
}

private KafkaConsumer kafkaConsumer;

public void consume() {
kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.subscribe(Arrays.asList(KafkaConstants.TOPIC_NAME), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(1);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(2);
}
});

try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(20));
for (ConsumerRecord<String, String> record : records) {
log.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
/**
 * If the producer attached headers to the message, the consumer can read them
 */
Headers headers = record.headers();
if (CollectionUtil.isNotEmpty(headers)) {
headers.forEach(h -> {
log.info("key: {}, value: {}", h.key(), new String(h.value()));
});
}
}
// Commit the offsets for the processed batch
kafkaConsumer.commitAsync();
}
} catch (Exception e) {
log.error("", e);
} finally {
kafkaConsumer.close();
}

}

public static void main(String[] args) throws Exception {
Consumer consumerDemo = new Consumer();
consumerDemo.consume();
}

}
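Besides commitAsync(), offsets can also be committed synchronously and per partition for finer-grained control. A minimal sketch (not from the original post; the helper class name is made up):

package cn.qz.cloud.kafka.client;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class SyncCommitExample {

    /**
     * Commit the offsets of the records just processed, synchronously and per partition.
     * The committed offset is record.offset() + 1, i.e. the position of the next record to read.
     */
    public static void commitProcessedOffsets(KafkaConsumer<String, String> consumer,
                                              ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        // commitSync blocks until the broker acknowledges the commit (or throws on failure)
        consumer.commitSync(offsets);
    }
}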

2. Testing Kafka in a Spring Boot project

  1. Add the Kafka dependency to the pom
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  2. Add the Kafka configuration
server:
  port: 8080

spring:
  # Kafka configuration
  kafka:
    # Change this to your Kafka server IP and port
    bootstrap-servers: xxx:9092
    #=============== producer =======================
    producer:
      # If greater than zero, enables retrying failed sends this many times
      retries: 0
      # When multiple records are sent to the same partition, the producer batches them into fewer requests; default 16384 (bytes)
      batch-size: 16384
      # Total bytes of memory the producer can use to buffer records waiting to be sent; default 33554432
      buffer-memory: 33554432
      # Serializer class for keys, implementing org.apache.kafka.common.serialization.Serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer class for values, implementing org.apache.kafka.common.serialization.Serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #=============== consumer =======================
    consumer:
      # Unique string identifying the consumer group this consumer belongs to
      group-id: test-consumer-group
      # What to do when there is no initial offset in Kafka or the current offset no longer exists on the server; default latest
      # Options: latest, earliest, none
      auto-offset-reset: earliest
      # Whether offsets are committed periodically in the background; default true
      enable-auto-commit: true
      # If enable-auto-commit is true, the frequency (ms) at which offsets are auto-committed; default 5000
      auto-commit-interval: 100
      # Deserializer class for keys, implementing org.apache.kafka.common.serialization.Deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for values, implementing org.apache.kafka.common.serialization.Deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  3. Add the classes: producer and consumer
package cn.qz.cloud.kafka.springboot.springboot;

import cn.qz.cloud.kafka.client.KafkaConstants;
import com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

@Configuration
public class kafkaConfig {

@Autowired
private KafkaAdmin kafkaAdmin;

@PostConstruct
public void init() {
/**
* init topic
*/
AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfig());
adminClient.deleteTopics(Lists.newArrayList(KafkaConstants.TOPIC_NAME));
List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(KafkaConstants.TOPIC_NAME, 3, (short) 1));
adminClient.createTopics(topics);
System.out.println("创建topic成功");
}
}
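Note that deleteTopics() and createTopics() are asynchronous and return result objects wrapping futures; waiting on them (for example via result.all().get()) before continuing would make this initialization deterministic, otherwise the create call may race with the delete.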
===

package cn.qz.cloud.kafka.springboot.springboot;

import cn.qz.cloud.kafka.client.KafkaConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping
public class Producer {

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@GetMapping("/index")
public String index() {
return "index";
}

@GetMapping("/send-msg")
public String send(@RequestParam String msg) {
//produce the message
ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(KafkaConstants.TOPIC_NAME, msg, msg);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
}

@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
System.out.println(stringObjectSendResult);
}
});
return msg;
}

}
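After starting the application, a message can be sent by calling, for example, http://localhost:8080/send-msg?msg=hello; the controller returns the message and the listener below prints it.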

===
package cn.qz.cloud.kafka.springboot.springboot;

import cn.qz.cloud.kafka.client.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

/**
 * org.springframework.kafka.annotation.KafkaListener also lets you specify partitions, a groupId, and other parameters (see the sketch after this class)
 *
 * @param record
 */
@KafkaListener(topics = {KafkaConstants.TOPIC_NAME})
public void handMessage(ConsumerRecord<String, String> record) {
String topic = record.topic();
String msg = record.value();
System.out.println("消费者接受消息:topic-->" + topic + ",msg->>" + msg);
}
}
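As mentioned in the comment above, @KafkaListener can also pin a listener to specific partitions and a specific consumer group. A minimal sketch (not from the original post; the partition numbers and group id are placeholders):

package cn.qz.cloud.kafka.springboot.springboot;

import cn.qz.cloud.kafka.client.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class PartitionAwareConsumer {

    /**
     * Listens only to partitions 0 and 1 of the topic, with an explicitly chosen group id.
     */
    @KafkaListener(
            groupId = "my-explicit-group",
            topicPartitions = @TopicPartition(topic = KafkaConstants.TOPIC_NAME, partitions = {"0", "1"}))
    public void handleMessage(ConsumerRecord<String, String> record) {
        System.out.println("partition=" + record.partition() + ", value=" + record.value());
    }
}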

For the full list of configuration options, see:

org.springframework.boot.autoconfigure.kafka.KafkaProperties

3. About Kafka partitions

1. The number of partitions of a Kafka topic can be changed:

[root@VM-8-16-centos kafka_2.13-3.3.1]# bin/kafka-topics.sh --describe --topic myTopic1 --bootstrap-server localhost:9092
Topic: myTopic1 TopicId: 9LsqbI1dRVelPxx-3FJ9lw PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: myTopic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
[root@VM-8-16-centos kafka_2.13-3.3.1]# bin/kafka-topics.sh --alter --topic myTopic1 --bootstrap-server localhost:9092 --partitions 12
[root@VM-8-16-centos kafka_2.13-3.3.1]# bin/kafka-topics.sh --describe --topic myTopic1 --bootstrap-server localhost:9092
Topic: myTopic1 TopicId: 9LsqbI1dRVelPxx-3FJ9lw PartitionCount: 12 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: myTopic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 8 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 9 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 10 Leader: 1 Replicas: 1 Isr: 1
Topic: myTopic1 Partition: 11 Leader: 1 Replicas: 1 Isr: 1
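The same change can be made from the Java AdminClient; a minimal sketch (not from the original post) is shown below. Note that Kafka only allows increasing the partition count, never decreasing it.

package cn.qz.cloud.kafka.client;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class IncreasePartitionsExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVER);
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Raise myTopic1 to 12 partitions, the same as the kafka-topics.sh --alter command above
            adminClient.createPartitions(
                    Collections.singletonMap(KafkaConstants.TOPIC_NAME, NewPartitions.increaseTo(12)))
                    .all().get();
        }
    }
}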

2. Kafka's partitioning strategy

With the Java client, the default partition-selection implementation is org.apache.kafka.clients.producer.internals.DefaultPartitioner:

package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

public DefaultPartitioner() {
}

public void configure(Map<String, ?> configs) {
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}

return counter.getAndIncrement();
}

public void close() {
}
}

As the source shows, if a key is present it is hashed, the hash is converted to a positive integer, and that value is taken modulo the number of partitions; if no key is passed, messages are distributed in a round-robin-like fashion.
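To illustrate the keyed path, the same calculation can be reproduced with the client's utility methods; a small sketch with a hypothetical key and partition count:

package cn.qz.cloud.kafka.client;

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class PartitionForKeyExample {

    public static void main(String[] args) {
        String key = "TEST_KEY";   // example key
        int numPartitions = 6;     // example partition count
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Same formula as DefaultPartitioner: murmur2 hash, forced positive, modulo the partition count
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("key " + key + " -> partition " + partition);
    }
}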

The partitioner is invoked along this call chain:

org.apache.kafka.clients.producer.KafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>)
->
org.apache.kafka.clients.producer.KafkaProducer#doSend
->
org.apache.kafka.clients.producer.KafkaProducer#partition, whose source is:
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

3. Defining a custom partitioning strategy

  1. Create an implementation class that always sends to partition 0
package cn.qz.cloud.kafka.client;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
return 0;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {

}
}
  2. Configure the producer to use the custom partitioner
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "cn.qz.cloud.kafka.client.CustomPartitioner");

4. Differences from ES shards

Unlike Kafka, ES does not support this:

1. Kafka can easily add new partitions through its admin tooling. This only affects messages that specify a key, and even then the impact is small, because consumers can still consume all of the messages.
2. By contrast, ES does not support adding shards. The reason lies in ES's query flow (query phase, then fetch phase): the fetch phase retrieves documents by id, so if the number of shards changed, many ids would no longer resolve to their documents, even though those documents still exist in other shards. That is why ES does not support adding shards online.

Explanation:

1. Think of ES as a database first. Now imagine you implement your own sharding rule in code, sharding by uid: uids ending in 0 go to database 0, uids ending in 1 go to database 1, and so on, for 10 databases in total.

OK, now you need to add an 11th database. From the moment the rule changes, data has to be migrated, and doing that migration smoothly is very hard even when done by hand.

2. Kafka, on the other hand, works by subscribing to a topic, and a group coordinator assigns partitions to consumers: machine A consumes partition 1, machine B consumes partition 2, and so on.

Consumption is inherently per partition, so scaling the partition count up or down does not cause this kind of problem.

[When you finish writing each blog post with care, you will find it even more rewarding than implementing a feature in code!]


