首页 > 其他分享 >kafka入门(三):kafka多线程消费

kafka入门(三):kafka多线程消费

时间:2023-12-05 23:34:25浏览次数:46  
标签:入门 kafka records static props new 多线程 public

kafka消费积压

如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。

消费积压时,可以使用多线程消费,提高消费速度。

kafka多线程消费的代码:

public class ThirdMultiConsumerThreadDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";
    public static final String GROUP_ID = "group.demo";


    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,
                Runtime.getRuntime().availableProcessors());
        consumerThread.start();
    }


    /***
     * kafka配置
     * @return
     */
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return props;
    }


    /**
     * kafka消费者线程
     */
    public static class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String, String> kafkaConsumer;
        private ExecutorService executorService;
        private int threadNumber;

        public KafkaConsumerThread(Properties props, String topic, int threadNumber) {
            kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            this.threadNumber = threadNumber;
            executorService = new ThreadPoolExecutor(threadNumber, threadNumber,
                    0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                            kafkaConsumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        executorService.submit(new RecordsHandler(records));
                    }
                }
            } catch (Exception e) {
                log.error("run error", e);
            } finally {
                kafkaConsumer.close();
            }
        }

    }

    /**
     * 处理消息
     */
    public static class RecordsHandler extends Thread {
        public final ConsumerRecords<String, String> records;

        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }

        @Override
        public void run() {
            //处理records.
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("==========>record:"+record.value() + ",thread:" + Thread.currentThread().getName());
            }
        }
    }

}

发送消息后,使用多线程消息,运行结果如下:

==========>record:{"id":"1234","name":"lin"},thread:pool-1-thread-1
==========>record:{"id":"5678","name":"chen"},thread:pool-1-thread-2
==========>record:{"id":"91011","name":"wu"},thread:pool-1-thread-3

参考资料:

《深入理解Kafka:核心设计与实践原理》

标签:入门,kafka,records,static,props,new,多线程,public
From: https://www.cnblogs.com/expiator/p/17878585.html

相关文章

  • Unreal入门,门06,道具随机生成与解锁
    1.在关卡蓝图中随机生成道具,并删除之前在关卡中手动放置的道具2.打开道具类型蓝图可编辑开关,随机生成不同道具3.在GameMode中增加函数判断道具是否集齐,并在TheDoor蓝图中解锁4.实时打印进度其它引擎版本为5.3.2参考https://neil3d.github.io/assets/p......
  • Kafka集群调优+能力探底
    一、前言我们需要对4个规格的kafka能力进行探底,即其可以承载的最大吞吐;4个规格对应的单节点的配置如下:标准版:2C4G铂金版:4C8G专业版:8C16G企业版:16C32G另外,一般来讲,在同配置下,kafka的读性能是要优于写性能的,写操作时,数据要从网卡拷贝至堆内存,然后进行一堆数据校验、解......
  • Android OpenGL ES入门
    1.OpenGL和OpenGLESOpenGL(OpenGraphicsLibrary)是一种用于渲染2D和3D图形的跨平台编程接口。OpenGL提供了一套标准的函数和接口,使开发人员能够在各种操作系统上创建高性能的图形应用程序,这些操作系统包括Windows、Linux、macOS和一些嵌入式系统。OpenGLES(OpenGLforEm......
  • 软件测试/人工智能|Python算术运算符:入门指南
    前言在编写程序时,可以使用算术运算符来进行基本的数学计算。Python中的算术运算符包括加法、减法、乘法、除法、取模和幂运算。本文就给大家介绍一下Python算术运算符的使用。加法运算符+加法运算符用于将两个数值相加,例如,a+b表示将a和b相加的结果。如果a和b都是数字,则加法......
  • 算法入门经典 刘汝佳 4.2 地址与指针
    4.2 地址和指针4.1节介绍的数学函数的特点是:做计算,然后返回一个值。有时候,我们要做的事情 并不是“计算”——如交换两个变量;而有时候,我们需要返回两个甚至更多的值——如解一个二元一次方程组。4.2.1 变量交换程序4-4 用函数交换变量(错误)#include<stdio.h>void swap(in......
  • Systemd 入门教程:实战篇
    一、开机启动对于那些支持Systemd的软件,安装的时候,会自动在/usr/lib/systemd/system目录添加一个配置文件。如果你想让该软件开机启动,就执行下面的命令(以httpd.service为例)。$sudosystemctlenablehttpd上面的命令相当于在/etc/systemd/system目录添加一个符号链接,指......
  • Systemd 入门教程:命令篇
    Systemd是Linux系统工具,用来启动守护进程,已成为大多数发行版的标准配置。本文介绍它的基本用法,分为上下两篇。今天介绍它的主要命令,下一篇介绍如何用于实战。一、由来历史上,Linux的启动一直采用init进程。下面的命令用来启动服务。$sudo/etc/init.d/apache2start#......
  • Python闭包概念入门
    '''Python闭包概念入门闭包(Closure)是Python中一个重要的工具。闭包:高阶函数中,内层函数携带外层函数中的参数、变量及其环境,一同存在的状态(即使已经离开了创造它的外层函数),被称之为闭包。被携带的外层变量称之为:自由变量,也被形容为:外层变量被闭包......
  • PWN学习之LLVM入门
    一、基本流程①找到runOnFunction函数时如何重写的,一般来说runOnFunction都会在函数表最下面,找PASS注册的名称,一般会在README文件中给出,若是没有给出,可通过对__cxa_atexit函数"交叉引用"来定位:②通过逆向,找到函数名及参数,编写基本exp③找到漏洞,写利用exp.c,其中的pwn的目标是op......
  • 神经网络入门篇:详解参数VS超参数(Parameters vs Hyperparameters)
    参数VS超参数什么是超参数?比如算法中的learningrate\(a\)(学习率)、iterations(梯度下降法循环的数量)、\(L\)(隐藏层数目)、\({{n}^{[l]}}\)(隐藏层单元数目)、choiceofactivationfunction(激活函数的选择)都需要来设置,这些数字实际上控制了最后的参数\(W\)和\(b\)的值,所以它们......