首页 > 其他分享 >7.Kafka,构建TB级异步消息系统

7.Kafka,构建TB级异步消息系统

时间:2023-12-29 21:00:29浏览次数:36  
标签:异步 -- zookeeper server kafka queue Kafka TB

1.阻塞队列

  • BlockingQueue
    • 解决线程通信的问题。
    • 阻塞方法:put、take。
  • 生产者消费者模式
    • 生产者:产生数据的线程。
    • 消费者:使用数据的线程。
  • 实现类
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等。

 面试题:写一个生产者消费者实现

public class Test {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(10);
        Producer p = new Producer(queue);
        Consumer c = new Consumer(queue);
        new Thread(p,"producer").start();
        new Thread(c,"consumer").start();
    }
}

class Consumer implements Runnable {
    private BlockingQueue<String> queue;
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while(true){
                Thread.sleep(20);
                System.out.println("消费者消费了:" + queue.take());
            }
        }catch (InterruptedException e) {
                e.printStackTrace();
            }
    }
}

class Producer implements Runnable{
    private BlockingQueue<String> queue;
    public Producer(BlockingQueue<String> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                String tmp = "a product " + i + " from:" + Thread.currentThread().getName();
                System.out.println("生产者生产了:" + tmp);
                queue.put(tmp);
                Thread.sleep(20);
            }
        }catch (InterruptedException e) {
                e.printStackTrace();
        }
    }
}

2.kafka入门

  • Kafka简介
    • Kafka是一个分布式的消息队列。
    • 应用:消息系统、日志收集、用户行为追踪、流式处理。
  • Kafka特点
    • 高吞吐量、消息持久化、高可靠性、高扩展性。
  • Kafka术语
    • Broker:Kafka的服务器
    • Zookeeper:管理集群
    • Topic:点对点模式中每个消费者拿到的消息都不同,发布订阅模式中消费者可能拿到同一份消息。Kafka采用发布订阅模式,生产者把消息发布到的空间(位置)就叫Topic
    • Partition:是对Topic位置的分区,如下图:
      img
    • Offset:就是消息在分区中的索引
      img
    • Leader Replica:主副本,可以处理请求
    • Follower Replica:从副本,只是用作备份

Kafka相关链接:https://kafka.apache.org/

Windows下使用Kafka

  在2.8以前,kafka安装前需要安装zookeeper。如果不需要额外使用zookeeper其他功能,可以安装2.8以后的版本。

在启动前改一下相关配置:

(1)解压kafka压缩包

(2)config下的zookeeper.properties

(3)config下的server.properties

  注意启动的时候先zookeeper后kafka,停止的时候先kafka后zookeeper。

 (4) 启动zookeeper

cd E:\Software\Kafka\kafka_2.12-3.4.0
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

报错:INFO ZooKeeper audit is disabled. (org.apache.zookeeper.audit.ZKAuditProvider),需要改zookeeper.properties

(5)启动kafka

bin\windows\kafka-server-start.bat config\server.properties

启动完成后会出现配置的文件夹

(6) 创建主题

bin\windows\kafka-topics.bat --create --topic topicDemo --bootstrap-server localhost:9092

(7)显示所有topic列表

bin\windows\kafka-topics --list --bootstrap-server localhost:9092

 (8) 向主题发送消息

E:\Software\Kafka\kafka_2.12-3.4.0>bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topicDemo

(9)消费消息

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topicDemo --from-beginning

3.Spring整合Kafka

 

标签:异步,--,zookeeper,server,kafka,queue,Kafka,TB
From: https://www.cnblogs.com/cjhtxdy/p/17935567.html

相关文章

  • debezium+kafka实现mysql数据同步(debezium-connector-mysql)
    1.情景展示在企业当中,往往会存在不同数据库之间的表的数据需要保持一致的情况(数据同步)。如何将A库a表的数据同步至B库a表当中呢?(包含:新增、修改和删除)往往不仅仅需要保持数据的一致性,还要保证数据的即时性,即:A库a表的数据发生变化后,B库a表也能立刻同步变化。实时保持两表数据......
  • Kafka-基本介绍和常见问题
    1、kafka1.1、kafka介绍​kafka是最初由linkedin公司开发的,使用scala语言编写,kafka是一个分布式,分区的,多副本的,多订阅者的消息队列系统。 1.2、kafka相比其他消息队列的优势常见的消息队列:RabbitMQ,Redis,zeroMQ,ActiveMQkafka的优势:1) 可靠性:分布式的,分区,复制和容错的。......
  • 经纬恒润轻量化网络自动化测试系统TestBase_DESKNAT重磅发布!
        经纬恒润桌面式网络自动化测试系统TestBase_DESKNAT2.0产品重磅发布! ......
  • Smartbi荣获工信部旗下赛迪网“2023行业信息技术应用创新产品”大奖
    近日,由工信部旗下的赛迪网、《数字经济》杂志共同主办的2023行业信息技术应用创新大会上,“信息技术应用创新成果名单”重磅揭晓,思迈特软件凭借“Smartbi自然语言分析引擎”斩获“2023行业信息技术应用创新产品”大奖。据了解,本次行业信息技术应用创新成果评选,旨在评选出能够引领行......
  • [c#]WebClient异步下载文件并显示进度
    https://www.cnblogs.com/wolf-sun/p/6699733.html在项目开发中经常会用到下载文件,这里使用winform实现了一个带进度条的例子。一个例子usingSystem;usingSystem.Collections.Generic;usingSystem.ComponentModel;usingSystem.Data;usingSystem.Drawing;usingSystem.IO;u......
  • kafka消费中文显示为乱码
    1.情景展示如上图所示,在windows操作系统当中,当我使用消费主题的命令进行数据消费时,存在kafka当中的消息含有的中文,最终展示为乱码。kafka-console-consumer.bat--bootstrap-serverlocalhost:9092--topictopic-xxx-63.库名.表名--from-beginning这是怎么回事?如何解决?2......
  • kafka下载、安装与部署
    1.kafka简介kafka官网地址:https://kafka.apache.org/kafka的本质是一个数据存储平台,流平台,只是他在做消息发布,消息消费的时候我们可以把他当做消息中间件来用。Kafka提供了一个KafkaBroker、一个KafkaProducer和一个KafkaConsumer。以下介绍源自:文心一言。KafkaBro......
  • CompletableFuture异步编程
    一、基本介绍1.1 多线程编程的发展过程创建线程的方式继承 Thread 类实现 Runnable 接口特点:没有参数,没有返回值,没办法抛出异常JDK 1.5 进阶版Callable + FutureCallable接口中定义的 V call() throws Exception,该方法可以返回泛型值 V,并能够抛出异常......
  • kafka-Kafka3.4版本创建topic出现zookeeper is not a recognized option
    问题描述:在linux云服务器上搭建了一套kafka3.0集群,然后按照以前的创建topic指令:./kafka-topics.sh--zookeeperhadoop01:2181,hadoop02:2181,hadoop03:2181--replication-factor1--partitions1--topictest然而,却出现了这样一个异常提示:Exceptioninthread"main"jopt......
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延
    文章目录Flink系列文章一、maven依赖二、示例-Flink1.13.6版本:kafka数据源,每10s统计一次地铁进站每个入口人数1、maven依赖2、实现1)、javabean2)、实现3、验证1)、验证步骤2)、验证三、示例-Flink1.17.0版本:kafka数据源,每10s统计一次地铁进站每个入口人数1、maven依赖2、实现1)、j......