Kafka学习01:默认分区策略解析
Kafka版本:2.5.1
DefaultPartitioner 类
/** * The default partitioning strategy: * <ul> * <li>If a partition is specified in the record, use it ;如果方法声明了分区,则直接使用 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key; 没有没有申明分区,但是传入了key,则根据key的Hash值,进行分区选择 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full. 如果没有声明分区,也没有传入key,使用粘性分区策略 * * See KIP-480 for details about sticky partitioning. */ public class DefaultPartitioner implements Partitioner { private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache(); public void configure(Map<String, ?> configs) {} public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if (keyBytes == null) { // key不存在时,使用粘性分区策略 return stickyPartitionCache.partition(topic, cluster); } //获取分区信息 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //获取分区数量 int numPartitions = partitions.size(); // hash the keyBytes to choose a partition //根据可以的Hash值取余分区数量,确认分区 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
StickyPartitionCache类public class StickyPartitionCache {
//ConcurrentMap用于缓存,key=topic,value=分区Id private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() { this.indexCache = new ConcurrentHashMap<>(); } public int partition(String topic, Cluster cluster) {
//在缓存中查找topic对应的分区Id Integer part = indexCache.get(topic); if (part == null) {
//如果分区号不存在,则寻找下一个 return nextPartition(topic, cluster, -1); } return part; } public int nextPartition(String topic, Cluster cluster, int prevPartition) {
//获取全部分区信息 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//获取原分区Id Integer oldPart = indexCache.get(topic);
//默认新分区Id = 原分区Id Integer newPart = oldPart; //判断原分区是否存在(是否已经设置过),如果没有则执行if逻辑 if (oldPart == null || oldPart == prevPartition) {
//获取可用分区列表 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
//如果可用分区不存在,则随机选一个分区 Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = random % partitions.size(); } else if (availablePartitions.size() == 1) {
//如果可用分区只有一个,则选择该分区 newPart = availablePartitions.get(0).partition(); } else {
//如果可用分区有多个,则循环随机选择一个可用分区,直到和原分区不一样 while (newPart == null || newPart.equals(oldPart)) { Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = availablePartitions.get(random % availablePartitions.size()).partition(); } } // Only change the sticky partition if it is null or prevPartition matches the current sticky partition. if (oldPart == null) {
//如果原分区不存在,则对topic这是新的分区 indexCache.putIfAbsent(topic, newPart); } else {
//如果原分区存在,则用新分区对topic替换原分区 indexCache.replace(topic, prevPartition, newPart); }
//查询返回分区 return indexCache.get(topic); } return indexCache.get(topic); } }
END
标签:01,分区,partition,默认,Kafka,topic,key,indexCache,newPart From: https://www.cnblogs.com/wobuchifanqie/p/17734118.html