首页 > 其他分享 >kafka消费

kafka消费

时间:2024-01-18 23:12:08浏览次数:27  
标签:消费 offsets kafkaConsumer kafka records put new public

多线程消费方式

方式1:一个线程对应一个消费者
消费者数量不大于分区数,最好也能对等起来

方式2:多线程消费同一个分区
位移提交和顺序控制的处理非常复杂,不推荐

方式1:消费者==分区数

int threadNum = 4;
for (int i = 0; i < threadNum; i++) {
	new KafkaConsumer(config,"tcp_link").start();
}
public KafkaConsumerThread(Properties config, String topic) {
	this.kafkaConsumer = new KafkaConsumer(config);
	this.kafkaConsumer.subscribe(Collections.singleton(topic));
}

每个线程可以顺序消费各个分区中的消息,但也有个问题,每个
这里有个问题,每个线程都要维护一个独立的TCP连接,如果分区数和线程数很大,系统开销是大大的!

方式3:poll与处理消息模块拆分

处理消息模块用多线程

public class KafkaConsumerBase {
    public final Properties configs;
    public KafkaConsumerBase() {
        this.configs = buildConfig();
    }
    public Properties buildConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:8080");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return properties;
    }
    public void Scheduled(String[] args) {
        List<String> topics = Arrays.asList("tcp_link", "agent");
        KafkaConsumerThread kafkaConsumerThread = new KafkaConsumerThread(this.configs,
                topics, Runtime.getRuntime().availableProcessors());
    }
    public static
    class KafkaConsumerThread extends Thread {
        private KafkaConsumer kafkaConsumer;
        private ThreadPoolExecutor executor;
        private Map<TopicPartition, OffsetAndMetadata> offsets;
        public KafkaConsumerThread(Properties configs, List<String> topics, int threadNum) {
            this.kafkaConsumer = new KafkaConsumer<>(configs);
            kafkaConsumer.subscribe(topics);
            executor = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        }
        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(10));
                    if (!records.isEmpty()) {
                        executor.submit(new MetricParser(records, offsets));
                        synchronized (offsets) {
                            if (!offsets.isEmpty()) {
                                kafkaConsumer.commitSync();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }
}

public class MetricParser extends Thread {
    public final ConsumerRecords<String, String> records;
    public Map<TopicPartition, OffsetAndMetadata> offsets;
    public MetricParser(ConsumerRecords<String, String> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.records = records;
        this.offsets = offsets;
    }
    @Override
    public void run() {
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
            long lastOffset = tpRecords.get(tpRecords.size() - 1).offset();
            synchronized (offsets) {
                if (!offsets.containsKey(tp)) {
                    offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
                } else {
                    long currOffset = offsets.get(tp).offset();
                    if (currOffset < lastOffset + 1) {
                        offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
                    }
                }
            }
        }
    }
}

标签:消费,offsets,kafkaConsumer,kafka,records,put,new,public
From: https://www.cnblogs.com/yuanbaobao/p/17973521

相关文章

  • 进程间通信(生产者消费者模型)
    【一】进程间通信介绍什么是进程间通信进程间通信(Inter-processCommunication,IPC)是指在不同进程之间传输数据或信号的机制。由于每个进程拥有自己独立的内存空间,所以不同进程之间无法直接访问对方的变量或数据结构。因此,操作系统提供了多种IPC机制来允许进程之间共享信息和协......
  • RocketMQ消息客户端生产与消费的基本实现
    支撑环境JDK:javaversion"1.8.0_391"应用框架:org.springframework.boot:2.7.17RocketMQ客户端SDK:rocketmq-spring-boot-starter:2.2.3生产者消息提供者添加依赖implementation'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.3'添加配置application.......
  • 当“低价高质”成行业共识,零食品牌还能靠什么拿捏消费者?
    文|螳螂观察作者|图霖年关将至,一年一度的“年货内卷赛”已一触即发。尤其是,2024年是疫情过后的首个春节,热闹必不可少,大众走亲访友的年礼更必不可少。而在这个赛场里,具备购买力但又尤其厌倦千篇一律传统年货形式的年轻人,是决定品牌能否抢占年货节制高点的关键。今年的情况尚待定......
  • kafka安装配置
    简介ApacheKafka是一款开源的分布式流处理平台,最初由LinkedIn开发,并于2011年开源。它是一个高吞吐量、可扩展、持久化的消息发布-订阅系统。Kafka被设计用于处理实时数据流,支持大规模的数据流和实时事件处理。准备工作确保你的系统满足以下要求:•Java8或更高版本已安装并配......
  • kafka简单介绍
    “这是一篇理论文章,给大家讲一讲kafka”简介在大数据领域开发者常常会听到MQ这个术语,该术语便是消息队列的意思,Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级项目。Kafka是一个高吞吐量的、持久性的、......
  • 简单的.NET 8 Web API使用Kafka 发布订阅模式,示例api示例
    简单的.NET8WebAPI使用Kafka发布订阅模式,示例api示例kafka当使用Kafka时,我们需要使用Kafka的客户端库来与Kafka集群进行通信。在.NETCore中,可以使用Confluent.Kafka客户端库来实现与Kafka的集成。首先,我们需要在项目中添加Confluent.Kafka库的引用。首先,使用NuGet包管......
  • SparkStreaming 连接 Kafka数据源
    本文的前提条件:SparkStreaminginJava参考地址:SparkStreaming+KafkaIntegrationGuide(Kafkabrokerversion0.10.0orhigher)1.添加POM依赖<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>......
  • SpringBoot集成Kafka构建消息系统
    一、前言在我们当前的互联网应用中,消息驱动已经成为一种不可或缺的模式,Kafka作为一款高性能的分布式消息系统,已经成为很多公司在消息驱动架构选择中很重要的工具。我们使用SpringBoot和Kafka快速构建消息驱动应用,应对高并发的消息处理业务。Kafka是分布式发布-订阅消息系统。主要特......
  • Kafka环境安装
    wgethttps://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgzsudomkdir/usr/local/kafka-server&&cd$_sudotar-xvzf~/kafka_2.13-3.6.1.tgz--strip1sudouseradd-r-d/usr/local/kafka-server-s/usr/sbin/nologinkafkasudo-ukafkamkdi......
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者
    ​ 系列文章目录第一章Kafka配置部署及SASL_PLAINTEXT安全认证第二章  SpringBoot整合Kafka消息队列 生产者第三章  SpringBoot整合Kafka消息队列 消息者(待续) 前言        Kafka是一个消息队列产品,基于Topicpartitions的设计,能达到非常高的消息......