
Kafka Review (22): Can a Partition Only Be Consumed by One Consumer in a Consumer Group?



By default, a partition is consumed by at most one consumer within a given consumer group. That restriction can be lifted, however, by supplying a custom PartitionAssignor.
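The override is purely consumer-side configuration: every consumer that should receive the broadcast opts in through the partition.assignment.strategy property, exactly as the two consumers in section 2 below do:

properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        BroadcastAssignor.class.getName());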
1. Implement a custom PartitionAssignor

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A "broadcast" assignor: every consumer in the group is assigned every
 * partition of every topic it subscribes to, instead of the default
 * one-consumer-per-partition behavior.
 */
public class BroadcastAssignor extends AbstractPartitionAssignor {
    @Override
    public String name() {
        // The name under which this strategy is registered with the group coordinator
        return "broadcast";
    }

    // Build a map from each subscribed topic to the ids of the consumers subscribing to it
    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(
            Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic =
                consumersPerTopic(subscriptions);
        // Start every member off with an empty assignment
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        subscriptions.keySet().forEach(memberId ->
                assignment.put(memberId, new ArrayList<>()));
        consumersPerTopic.entrySet().forEach(topicEntry -> {
            String topic = topicEntry.getKey();
            List<String> members = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null || members.isEmpty())
                return;
            List<TopicPartition> partitions = AbstractPartitionAssignor
                    .partitions(topic, numPartitionsForTopic);
            if (!partitions.isEmpty()) {
                // Broadcast: every member receives ALL partitions of the topic
                members.forEach(memberId ->
                        assignment.get(memberId).addAll(partitions));
            }
        });
        return assignment;
    }
}
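A caveat before wiring this up: all members of the group share a group.id, so they commit offsets for the same partitions to the same group, and the latest commit wins. After a restart, a consumer may resume from a position committed by one of its peers and skip or re-read messages; for a broadcast group it is therefore safer to disable automatic offset commits or track consumption positions outside Kafka.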

2. Define two consumers and configure them with the PartitionAssignor above.

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaTest19 {

    private static Properties getProperties() {
        Properties properties = new Properties();

        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup2023");
        // Opt in to the custom broadcast assignment strategy
        properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                BroadcastAssignor.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(getProperties());
        String topic = "study2023";
        myConsumer.subscribe(Arrays.asList(topic));

        // Poll forever, printing each record's value and offset
        while (true) {
            ConsumerRecords<String, String> consumerRecords = myConsumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.value());
                System.out.println("record offset is: " + record.offset());
            }
        }
    }
}
The second consumer, KafkaTest20, differs only in its class name:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaTest20 {

    private static Properties getProperties() {
        Properties properties = new Properties();

        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup2023");
        // Opt in to the custom broadcast assignment strategy
        properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                BroadcastAssignor.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(getProperties());
        String topic = "study2023";
        myConsumer.subscribe(Arrays.asList(topic));

        // Poll forever, printing each record's value and offset
        while (true) {
            ConsumerRecords<String, String> consumerRecords = myConsumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.value());
                System.out.println("record offset is: " + record.offset());
            }
        }
    }
}

In Kafka, create a topic named study2023 with a single partition.
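If you prefer creating the topic programmatically rather than with the kafka-topics.sh CLI, a minimal AdminClient sketch follows (the class name KafkaCreateTopic is hypothetical; the broker address is the same placeholder used by the other classes):

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class KafkaCreateTopic {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        try (AdminClient adminClient = AdminClient.create(properties)) {
            // study2023: one partition, replication factor 1
            NewTopic newTopic = new NewTopic("study2023", 1, (short) 1);
            // Block until the broker confirms the topic was created
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
        }
    }
}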

Create a producer that sends messages to the study2023 topic:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaTest01 {
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // Send one record to partition 0 of study2023, with key "fff"
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>("study2023", 0, "fff", "hello sister,now is: " + new Date());
        Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
        long offset = 0;
        try {
            // Block until the broker acknowledges the record, then read its offset
            offset = future.get().offset();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println(offset);

        kafkaProducer.close();
    }
}
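Note that the ProducerRecord pins the message to partition 0 explicitly; since study2023 has only one partition, this is equivalent to letting the default partitioner choose, but it makes the target partition obvious.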

Run the producer and the two consumers: both consumers in the same consumer group receive the data from the same (and only) partition of the study2023 topic.



From: https://blog.51cto.com/amadeusliu/7398476
