0 Environment
Cluster
4 brokers
Topics: 100+ (30 partitions per topic)
Cluster security protocol: plaintext
Storage: Ceph
Producer
Single-threaded, roughly 50 messages per second
Consumer
Polls messages continuously
Test client
Native KafkaConsumer/KafkaProducer
Test scenario
While the Producer and Consumer are running, log in to the host running Kafka and stop the current broker process directly.
With the programs running, this simulates a broker crash; the Consumer and Producer may report the following exceptions:
1 Consumer exception summary
Mostly group coordinator errors, offset commit failures, and connection failures.
[Consumer clientId=consumer-zhurunhua-test-ssl-1, groupId=zhurunhua-test-ssl] Error sending fetch request (sessionId=855212619, epoch=419) to node 3:
org.apache.kafka.common.errors.DisconnectException: null
===========================================
2020-12-15 11:04:33.017 [Consumer clientId=consumer-zhurunhua-test-1-1, groupId=zhurunhua-test-1] Discovered group coordinator kafka-reksgten-0.kafka-reksgten-headless.kafka.svc.xke.test.xdf.cn:29092 (id: 2147483647 rack: null)
2020-12-15 11:04:33.023 [Consumer clientId=consumer-zhurunhua-test-1-1, groupId=zhurunhua-test-1] Error connecting to node kafka-reksgten-0.kafka-reksgten-headless.kafka.svc.xke.test.xdf.cn:29092 (id: 2147483647 rack: null)
java.net.UnknownHostException: kafka-reksgten-0.kafka-reksgten-headless.kafka.svc.xke.test.xdf.cn: nodename nor servname provided, or not known
===========================================
2020-12-03 17:41:28.212 ERROR 33340 --- [read | test-tsl] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=test-tsl] Offset commit failed on partition test-topic-1-3 at offset 49854: The coordinator is loading and hence can't process requests.
2020-12-03 17:35:23.471 ERROR 33340 --- [read | test-tsl] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=test-tsl] Offset commit failed on partition test-topic-1-3 at offset 49823: This is not the correct coordinator.
===========================================
2020-12-16 10:47:16.416 [INFO ] [ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatResponseHandler.handle(AbstractCoordinator.java:1077)
] [Consumer clientId=consumer-zhurunhua-test-ssl-1, groupId=zhurunhua-test-ssl] Attempt to heartbeat failed since coordinator kafka-kbzngtcn-0.kafka-kbzngtcn-headless.kafka.svc.xke.test.xdf.cn:29092 (id: 2147483647 rack: null) is either not started or not valid
2020-12-16 10:47:16.417 [INFO ] [ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown(AbstractCoordinator.java:867)
] [Consumer clientId=consumer-zhurunhua-test-ssl-1, groupId=zhurunhua-test-ssl] Group coordinator kafka-kbzngtcn-0.kafka-kbzngtcn-headless.kafka.svc.xke.test.xdf.cn:29092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2020-12-16 10:47:16.534 [INFO ] [ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:815)
] [Consumer clientId=consumer-zhurunhua-test-ssl-1, groupId=zhurunhua-test-ssl] Discovered group coordinator kafka-kbzngtcn-1.kafka-kbzngtcn-headless.kafka.svc.xke.test.xdf.cn:29092 (id: 2147483646 rack: null)
2020-12-16 10:47:16.665 [INFO ] [ at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:403)
] [Consumer clientId=consumer-zhurunhua-test-ssl-1, groupId=zhurunhua-test-ssl] Node 3 was unable to process the fetch request with (sessionId=1429230934, epoch=40): FETCH_SESSION_ID_NOT_FOUND.
2 Producer exception summary
send error This server is not the leader for that topic-partition.
===================================
2020-12-15 11:09:06.008 [Producer clientId=producer-1] Got error produce response with correlation id 880 on topic-partition zhurunhua-test-1-1, retrying (4 attempts left). Error: NOT_LEADER_OR_FOLLOWER
2020-12-15 11:09:06.008 [Producer clientId=producer-1] Received invalid metadata error in produce request on partition zhurunhua-test-1-1 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
3 Test programs
3.1 Consumer
package cn.xdf.xadd.testing;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author zhurunhua
 * @since 12/18/20 4:52 PM
 */
@SpringBootTest
@Slf4j
public class ConsumerTest {

    @Test
    public void consumer() {
        KafkaConsumer<String, String> consumer = getConsumer();
        consumer.subscribe(Collections.singletonList("test-broker-down-23"));
        while (true) {
            ConsumerRecords<String, String> records;
            try {
                // If Kafka has no messages, poll waits up to 1 second and then returns an empty record set
                records = consumer.poll(Duration.ofSeconds(1L));
            } catch (Exception e) {
                log.error("poll error:", e);
                continue;
            }
            if (!records.isEmpty()) {
                for (ConsumerRecord<String, String> record : records) {
                    log.info("ssl poll message success:partition:{} offset:{},leaderEpoch:{}", record.partition(), record.offset(), record.leaderEpoch());
                }
            }
        }
    }

    private KafkaConsumer<String, String> getConsumer() {
        Properties props = new Properties();
        // Required properties
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-reksgten-headless.kafka.svc.xke.test.xdf.cn:29092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-broker-down-1");
        // Optional properties
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Auto-commit offsets every 1 s
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        return new KafkaConsumer<>(props);
    }
}
3.2 Producer
No tuning was done on the producer configuration; everything uses the defaults. Sending is synchronous: the test calls get() on the Future returned by send(), which only makes the client block until the broker's response arrives; internally the client still sends asynchronously.
package cn.xdf.xadd.testing;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Objects;
import java.util.Properties;

import static cn.xdf.xadd.testing.entity.User.getUser;

@SpringBootTest
@Slf4j
public class ProducerTest {

    @Test
    public void produce() throws Exception {
        KafkaProducer<String, String> producer = getProducer();
        // while (true) {
        long loopStart = System.currentTimeMillis();
        for (int i = 1; i <= 2000; i++) {
            long start = System.currentTimeMillis();
            try {
                // Synchronous send: get() blocks until the broker's response arrives
                RecordMetadata metadata = producer.send(new ProducerRecord<>("test-broker-down-14", JSON.toJSONString(getUser())), (recordMetadata, e) -> {
                    if (Objects.nonNull(e)) {
                        log.error("send error:", e);
                    }
                }).get();
                log.info("ssl send success,partition:{},offset:{},cost:{}ms", metadata.partition(), metadata.offset(), System.currentTimeMillis() - start);
            } catch (Exception e) {
                log.error("send error:", e);
            }
            log.info("------->current finish:{}", i);
            Thread.sleep(10);
        }
        log.info("loop finished,cost:{}", System.currentTimeMillis() - loopStart);
    }

    private KafkaProducer<String, String> getProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-reksgten-headless.kafka.svc.xke.test.xdf.cn:29092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }
}
Test helper code (the User entity used by the producer)
package cn.xdf.xadd.testing.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.commons.lang3.RandomStringUtils;

@Data
@AllArgsConstructor
public class User {

    private String username;
    private String address;
    private String phone;
    private String birthday;
    private String country;

    public static User getUser() {
        // Build a User with random alphanumeric field values
        return new User(getRandomString(10), getRandomString(50), getRandomString(11), getRandomString(10), getRandomString(4));
    }

    private static String getRandomString(int length) {
        return RandomStringUtils.random(length, true, true);
    }
}
4 Test scenarios
4.1 Kill the broker hosting the leaders of partitions 0 and 2 and observe the Consumer
After detecting the broker failure, the Consumer stops fetching messages and resumes roughly 10 seconds later.
4.2 Producer retry logs after a broker failure
2020-12-18 18:33:57.056 [WARN ] [ at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:628)
] [Producer clientId=producer-2] Received invalid metadata error in produce request on partition test-broker-down-11-9 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
2020-12-18 18:33:57.134 [WARN ] [ at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleSuccessfulResponse(NetworkClient.java:1066)
] [Producer clientId=producer-2] 22 partitions have leader brokers without a matching listener, including [test-broker-down-11-0, test-broker-down-11-5, test-broker-down-11-20, test-broker-down-11-15, test-broker-down-11-26, test-broker-down-11-9, test-broker-down-11-11, test-broker-down-11-17, test-broker-down-11-3, test-broker-down-11-24]
2020-12-18 18:33:57.166 [WARN ] [ at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:595)
] [Producer clientId=producer-2] Got error produce response with correlation id 1321 on topic-partition test-broker-down-11-9, retrying (2147483617 attempts left). Error: NOT_LEADER_OR_FOLLOWER
2020-12-18 18:33:57.166 [WARN ] [ at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:628)
] [Producer clientId=producer-2] Received invalid metadata error in produce request on partition test-broker-down-11-9 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
2020-12-18 18:33:57.243 [WARN ] [ at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleSuccessfulResponse(NetworkClient.java:1066)
] [Producer clientId=producer-2] 22 partitions have leader brokers without a matching listener, including [test-broker-down-11-0, test-broker-down-11-5, test-broker-down-11-20, test-broker-down-11-15, test-broker-down-11-26, test-broker-down-11-9, test-broker-down-11-11, test-broker-down-11-17, test-broker-down-11-3, test-broker-down-11-24]
2020-12-18 18:33:57.319 [WARN ] [ at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:595)
] [Producer clientId=producer-2] Got error produce response with correlation id 1323 on topic-partition test-broker-down-11-9, retrying (2147483616 attempts left). Error: NOT_LEADER_OR_FOLLOWER
2020-12-18 18:33:57.319 [WARN ] [ at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:628)
] [Producer clientId=producer-2] Received invalid metadata error in produce request on partition test-broker-down-11-9 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
2020-12-18 18:33:57.431 [WARN ] [ at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:595)
] [Producer clientId=producer-2] Got error produce response with correlation id 1325 on topic-partition test-broker-down-11-9, retrying (2147483615 attempts left). Error: NOT_LEADER_OR_FOLLOWER
2020-12-18 18:33:57.431 [WARN ] [ at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:628)
] [Producer clientId=producer-2] Received invalid metadata error in produce request on partition test-broker-down-11-9 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
2020-12-18 18:33:57.465 [WARN ] [ at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleSuccessfulResponse(NetworkClient.java:1066)
] [Producer clientId=producer-2] 22 partitions have leader brokers without a matching listener, including [test-broker-down-11-0, test-broker-down-11-5, test-broker-down-11-20, test-broker-down-11-15, test-broker-down-11-26, test-broker-down-11-9, test-broker-down-11-11, test-broker-down-11-17, test-broker-down-11-3, test-broker-down-11-24]
2020-12-18 18:33:57.541 [WARN ] [ at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:595)
] [Producer clientId=producer-2] Got error produce response with correlation id 1327 on topic-partition test-broker-down-11-9, retrying (2147483614 attempts left). Error: NOT_LEADER_OR_FOLLOWER
2020-12-18 18:33:57.541 [WARN ] [ at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:628)
] [Producer clientId=producer-2] Received invalid metadata error in produce request on partition test-broker-down-11-9 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
2020-12-18 18:34:12.276 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:43)
] ssl send success,partition:9,offset:482,cost:19415ms
2020-12-18 18:34:12.276 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:47)
] ------->current finish:870
2020-12-18 18:34:12.322 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:43)
] ssl send success,partition:26,offset:491,cost:35ms
2020-12-18 18:34:12.322 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:47)
] ------->current finish:871
2020-12-18 18:34:12.344 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:43)
] ssl send success,partition:29,offset:468,cost:10ms
2020-12-18 18:34:12.345 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:47)
2020-12-18 18:34:13.927 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:43)
] ssl send success,partition:14,offset:517,cost:1569ms
2020-12-18 18:34:13.928 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:47)
] ------->current finish:873
2020-12-18 18:34:13.948 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:43)
] ssl send success,partition:18,offset:493,cost:9ms
2020-12-18 18:34:13.948 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:47)
] ------->current finish:874
2020-12-18 18:34:13.979 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:43)
] ssl send success,partition:10,offset:466,cost:18ms
2020-12-18 18:34:13.979 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:47)
2020-12-18 18:34:15.666 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:43)
] ssl send success,partition:7,offset:472,cost:1676ms
2020-12-18 18:34:15.666 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:47)
] ------->current finish:876
2020-12-18 18:34:15.689 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:43)
] ssl send success,partition:16,offset:470,cost:12ms
2020-12-18 18:34:15.689 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:47)
] ------->current finish:877
2020-12-18 18:34:15.720 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:43)
] ssl send success,partition:12,offset:507,cost:20ms
2020-12-18 18:34:15.720 [INFO ] [ at cn.xdf.xadd.testing.ProducerTest.produce(ProducerTest.java:47)
As the logs show, when the Producer finds that the broker hosting a partition's leader is down, it keeps retrying. The test program uses the default retry count (2147483647), so it retries until the send succeeds; the count is configured via the retries parameter.
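If unbounded retrying is not desirable, the count can be capped explicitly when building the producer. A minimal sketch against the getProducer() method in section 3.2; the value 10 is only illustrative:

    // Cap retries instead of relying on the Integer.MAX_VALUE default; once retries are
    // exhausted, the failure is returned to the caller through the Future/callback.
    // In newer clients the total retry time is also bounded by delivery.timeout.ms (see section 7).
    props.put(ProducerConfig.RETRIES_CONFIG, 10);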
5 Test result summary
- After a broker fails, if the leader of a partition the Consumer is fetching from was on the failed node, the Consumer stops fetching from that partition and resumes a few seconds to ten-plus seconds later (once leader election completes).
- When the Producer finds that the broker it is sending to has failed, it keeps retrying (2147483647 times by default, configurable via retries) until the send succeeds; if it still fails after the configured number of retries, the exception is returned to the client.
6 Does the Producer lose messages?
Test plan: send 2000 records per run and kill a broker while sending; after each run, count the messages with Kafka Tool. Eight runs were performed, each taking about a minute. Results (cumulative counts):
Run 1: 2000
Run 2: 3967 (33 lost)
Run 3: 5967
Run 4: 7965 (2 lost)
Run 5: 9964 (1 lost)
Run 6: 11964 (the first five runs killed one broker; this run killed two)
Run 7: 13963 (1 lost; the broker killed this time was the Controller)
Run 8: 17922 (41 lost; this run killed all brokers and sent 4000 records for longer observation; the whole cluster took roughly two minutes to fully recover)
So the Producer can indeed lose data when a broker fails. A hypothesis for the cause: acks uses the default value of 1, so a send counts as successful as soon as the leader has written the record; if the leader then fails before any follower has fully caught up, the message is lost. (As expected, the book explains it the same way: if a message is written to the leader replica and acknowledged to the producer, and the leader crashes before the other follower replicas fetch it, the message is lost, because the newly elected leader does not have it.)
Note: the acks value is a string.
After setting acks to -1, several more rounds of testing showed no message loss even when all brokers were killed, although send latency increased considerably.
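A minimal sketch of that change against the getProducer() method in section 3.2; note that acks must be passed as a string, not an integer:

    props.put(ProducerConfig.ACKS_CONFIG, "all");  // "all" is equivalent to "-1": wait for every in-sync replica
    // props.put(ProducerConfig.ACKS_CONFIG, -1);  // wrong: acks is a string-typed config, an integer value is rejected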
7 How to guarantee message reliability?
To guarantee reliability, setting aside how long each send takes, send to Kafka asynchronously, set retries reasonably high (or simply keep the default Integer.MAX_VALUE), and set acks to -1 or all. Beyond that, the producer configuration should be tuned sensibly. The main parameters are listed below, followed by a configuration sketch:
- buffer.memory: size of the producer's record accumulator (RecordAccumulator) buffer, default 33554432 bytes (32 MB). If the producer produces messages faster than they can be sent to the brokers, the buffer fills up and send() either blocks or throws an exception, depending on max.block.ms (default 60000, i.e. 60 seconds).
- retry.backoff.ms: default 100; the interval between two retries, to avoid pointless rapid retrying. Before settling on retries and retry.backoff.ms, estimate how long recovery from a failure takes, and make the total retry time longer than that recovery time so the producer does not give up too early.
- delivery.timeout.ms: upper bound on the total time from the client sending a message until the broker's response is returned, including retries; default 120000 (2 minutes). It should be greater than or equal to the sum of request.timeout.ms and linger.ms.
- linger.ms: how long the producer's Sender thread waits for more records (ProducerRecord) to join a ProducerBatch before sending it; default 0. A batch is sent once it is full (batch.size, default 16384 bytes, i.e. 16 KB) or once the linger time elapses. Increasing this value adds latency but improves throughput somewhat.
- request.timeout.ms: maximum time the producer waits for a response to a request, default 30000 (30 s). Note this should be larger than the broker-side replica.lag.time.max.ms, which reduces the chance of duplicate messages caused by client retries.
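Putting these together, a sketch of a producer factory tuned along these lines, reusing the bootstrap address from the test program; the method name getReliableProducer and the concrete values are illustrative starting points, not recommendations:

    private KafkaProducer<String, String> getReliableProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-reksgten-headless.kafka.svc.xke.test.xdf.cn:29092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each write (the value is a string)
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry "forever"; the effective bound is delivery.timeout.ms
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // Back off 100 ms between retries (the default) so a recovering broker is not hammered
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
        // Overall per-record deadline; must be >= request.timeout.ms + linger.ms
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        // Per-request timeout; keep it larger than the broker's replica.lag.time.max.ms
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // Let batches fill a little before sending; trades a bit of latency for throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        // Accumulator size (32 MB default) and how long send() may block when it is full
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
        return new KafkaProducer<>(props);
    }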
In theory this already prevents loss on the producer side, but consider the case where a partition's ISR contains only one replica, i.e. the leader itself: acks=all then degrades to acks=1, and if that leader dies, a new leader is still elected from replicas that have not fully caught up, so messages can again be lost. Two broker-side settings are therefore needed as well to further guarantee reliability (a sketch of applying them follows the list):
- min.insync.replicas: the minimum number of replicas required in the ISR; if the requirement is not met, the write fails with an exception. The default is 1; a typical setup is a replication factor of 3 with min.insync.replicas set to 2.
- unclean.leader.election.enable: defaults to false. If set to true, a new leader may be elected from replicas outside the ISR when the leader fails, which can still lose messages.
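These two settings can be applied per topic or as broker defaults in server.properties. A minimal sketch using the Kafka AdminClient (incrementalAlterConfigs requires Kafka 2.3+), assuming the topic from the producer test; the class name is hypothetical:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;

    public class ReliabilityTopicConfig {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-reksgten-headless.kafka.svc.xke.test.xdf.cn:29092");
            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-broker-down-14");
                admin.incrementalAlterConfigs(Collections.singletonMap(topic, Arrays.asList(
                        // Require at least 2 in-sync replicas before an acks=all write is acknowledged
                        new AlterConfigOp(new ConfigEntry("min.insync.replicas", "2"), AlterConfigOp.OpType.SET),
                        // Never elect a leader from outside the ISR, even at the cost of availability
                        new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET)
                ))).all().get();
            }
        }
    }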