首页 > 其他分享 >消息队列篇--原理篇--Pulsar(Namespace,BookKeeper,类似Kafka甚至更好的消息队列)

消息队列篇--原理篇--Pulsar(Namespace,BookKeeper,类似Kafka甚至更好的消息队列)

时间:2025-01-21 22:28:06浏览次数:3  
标签:订阅 存储 -- 主题 Namespace BookKeeper 队列 消息 Pulsar

Apache Pulusar是一个分布式、多租户、高性能的发布/订阅(Pub/Sub)消息系统,最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点,提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力,适用于大规模分布式系统的实时数据处理和异步通信。
Pulsar的架构设计结合了消息队列和流处理的特点,既可以作为传统消息队列使用,也可以作为流处理平台支持实时数据处理。

主要特点:

  • 分布式架构:Pulsar采用分层架构,将消息存储与代理服务分离,提供了更好的水平扩展能力和故障隔离。
  • 多租户支持:Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。
  • 持久化和一致性:Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。
  • 灵活的消息模型:Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。
  • 多语言支持:Pulsar提供了多种编程语言的客户端库,如Java、Python、Go、C++等。
  • 丰富的生态:Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成,如Kafka Connect、Flink、Spark等。

1、核心概念

(1)、命名空间(Namespace)

命名空间是Pulsar中的一个逻辑单元,用于组织和管理主题(Topic)。每个命名空间可以包含多个主题,并且可以为不同的命名空间设置不同的配置,例如保留策略、订阅类型等。命名空间通常用于实现多租户隔离。

(2)、主题(Topic)

主题是Pulsar中的消息通道,生产者(Producer)将消息发送到主题,消费者(Consumer)从主题中消费消息。

Pulsar支持两种类型的主题:

  • 持久化主题(Persistent Topic):消息会被持久化存储,确保即使在broker故障的情况下也不会丢失。
  • 非持久化主题(Non-Persistent Topic):消息不会被持久化存储,适用于对延迟敏感但对可靠性要求较低的场景。

(3)、订阅(Subscription)

订阅是消费者与主题之间的绑定关系。Pulsar支持多种订阅类型,每种订阅类型决定了消息的分发方式:

  • 独占订阅(Exclusive Subscription):只有一个消费者可以订阅该主题,其他消费者无法订阅。
  • 共享订阅(Shared Subscription):多个消费者可以订阅同一个主题,消息会被轮询分发给不同的消费者。
  • 故障转移订阅(Failover Subscription):多个消费者可以订阅同一个主题,但只有一个是活跃的消费者,其他消费者作为备用。当活跃消费者失败时,备用消费者会接管消息消费。
  • Key_Shared 订阅:基于消息的key进行分区,确保相同key的消息总是被分发给同一个消费者。

(4)、消息(Message)

消息是Pulsar中的基本数据单位,由生产者发送到主题。

每个消息可以包含以下属性:

  • 消息体(Payload):消息的实际内容,可以是任意二进制数据。
  • 消息ID(Message ID):唯一标识每条消息的ID,用于确认消息的消费状态。
  • 属性(Properties):用户可以为消息添加自定义的键值对属性,方便后续处理。
  • 时间戳(Timestamp):消息的创建时间或发送时间。

(5)、分区(Partition)

Pulsar支持主题分区,即将一个主题划分为多个分区,每个分区可以独立地处理消息。分区可以提高主题的吞吐量和并发性,特别是在高负载场景下。Pulsar会自动将消息均匀分布到不同的分区中。

(6)、Broker

Broker是Pulsar的核心组件之一,负责接收生产者的消息并将其分发给消费者。注意,Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker负责管理主题、订阅和消费者的连接,并处理消息的路由和分发。

(7)、BookKeeper

BookKeeper是Pulsar的持久化存储层,负责将消息持久化到磁盘。BookKeeper采用分布式日志存储机制,提供了高可用性和强一致性保障。每个消息会被写入多个BookKeeper节点,确保即使部分节点故障也不会丢失数据。

(8)、ZooKeeper

ZooKeeper是Pulsar的元数据管理组件,用于存储集群的配置信息、主题和命名空间的元数据、以及Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保Pulsar集群的一致性和可靠性。

2、架构设计

Pulsar的架构设计采用了分层结构,将消息存储与代理服务分离,使得系统更加模块化和可扩展。

结构示例图:
在这里插入图片描述

Pulsar的主要组件及其作用:

  • Broker:负责接收生产者的消息并将其分发给消费者。Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker还负责管理主题、订阅和消费者的连接。

  • BookKeeper:即上图BK Client。负责将消息持久化到磁盘,提供高可用性和强一致性保障。BookKeeper采用分布式日志存储机制,确保消息的安全性和可靠性。

  • Bookie:Bookie是BookKeeper的存储节点组成,持久化地存储消息。BookKeeper采用分布式日志存储的方式,将消息以日志的形式存储在多个Bookie节点上。这种设计确保了消息的可靠性和持久性,即使在节点故障的情况下也能保证消息不丢失。

  • ZooKeeper:负责存储集群的元数据,包括主题、命名空间、Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保集群的一致性和可靠性。

  • Proxy(可选):Pulsar提供了一个可选的代理层(Proxy),允许客户端通过HTTP或WebSocket协议与Pulsar集群进行通信。Proxy可以简化客户端的连接管理,并提供跨区域访问的能力。

  • Function:Pulsar提供了一个轻量级的流处理框架(Pulsar Functions),允许用户编写简单的流处理逻辑并将其部署到Pulsar集群中。Pulsar Functions可以用于实时数据处理、事件驱动计算等场景。

  • SQL:Pulsar提供了一个SQL查询引擎(Pulsar SQL),允许用户通过SQL语句查询Pulsar中的消息数据。Pulsar SQL可以用于数据分析、监控和告警等场景。

3、特性与优势

(1)、高吞吐量和低延迟

Pulsar采用了分层架构,将消息存储与代理服务分离,使得系统能够同时具备高吞吐量和低延迟。Broker负责处理消息的路由和分发,而BookKeeper负责持久化存储,两者相互协作,确保消息的高效传递。

(2)、多租户支持

Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。每个租户可以拥有自己的命名空间,并可以根据需要设置不同的配置,例如保留策略、订阅类型等。
即:类似Nacos的命名空间,实现配置,服务等隔离。

(3)、持久化和一致性

Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。每个消息会被写入多个Bookie节点,确保即使部分节点故障也不会丢失数据。Pulsar还支持事务和幂等性,确保消息的可靠传递。

(4)、灵活的消息模型

Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。用户可以根据实际需求选择合适的订阅类型,满足不同的业务场景。Pulsar还支持消息的重播、回溯和跳过等功能,方便用户进行调试和故障排查。

(5)、多语言支持

Pulsar提供了多种编程语言的客户端库,包括Java、Python、Go、C++等。用户可以根据自己的技术栈选择合适的客户端库,快速集成Pulsar到应用程序中。

(6)、丰富的生态

Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成。例如,Pulsar可以与Kafka Connect、Flink、Spark等工具集成,实现数据的实时处理和分析。Pulsar还提供了Pulsar Functions和Pulsar SQL等功能,进一步扩展了其应用场景。

4、应用场景

(1)、实时数据处理

Pulsar的高吞吐量和低延迟特性使其非常适合用于实时数据处理场景。例如,电商网站可以使用Pulsar来处理订单、支付、库存等实时数据,确保数据的及时性和准确性。

(2)、物联网(IoT)

Pulsar的分布式架构和多租户支持使其非常适合用于物联网场景。物联网设备可以将传感器数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行历史数据分析。

(3)、微服务架构

Pulsar可以作为微服务之间的消息总线,实现服务间的异步通信。微服务可以通过Pulsar发送和接收消息,避免阻塞主线程,提高系统的响应速度和稳定性。

(4)、日志收集和监控

Pulsar可以用于日志收集和监控场景,将应用的日志数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的持久化存储,确保日志数据不会丢失。

(5)、事件驱动架构

Pulsar支持事件驱动架构,用户可以将事件发送到Pulsar,Pulsar可以将这些事件分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行事件的回放和调试。

5、代码示例

(1)、生产者示例

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.MessageId;

public class PulsarProducerExample {

    public static void main(String[] args) throws Exception {
        // 1、创建Pulsar客户端
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {

            // 2、创建生产者
            try (Producer<byte[]> producer = client.newProducer()
                    .topic("persistent://public/default/example-topic")    // 指定主题
                    .create()) {

                // 3、发送消息
                for (int i = 0; i < 10; i++) {
                    String message = "Hello, Pulsar! " + i;
                    MessageId msgId = producer.send(message.getBytes());    // 发送消息
                    System.out.println(" [x] Sent message: " + message + ", msgId: " + msgId);
                }
            }
        }
    }
}

(2)、消费者示例

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class PulsarConsumerExample {

    public static void main(String[] args) throws Exception {
        // 1、创建Pulsar客户端
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {

            // 2、创建消费者
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic("persistent://public/default/example-topic")   // 监听的主题
                    .subscriptionName("example-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe()) {

                // 3、接收和消费消息
                while (true) {       // 利用循环接收消息
                    Message<byte[]> msg = consumer.receive();      // 具体接收消息
                    try {
                        System.out.println(" [x] Received message: " + new String(msg.getData()));
                        consumer.acknowledge(msg);  // 4、确认消息已消费
                    } catch (Exception e) {
                        consumer.negativeAcknowledge(msg);  // 5、处理失败,重新投递
                    }
                }
            }
        }
    }
}

6、Pulsar总结

Apache Pulsar是一个功能强大、架构灵活的消息系统,特别适合大规模分布式系统的实时数据处理和异步通信。它的分层架构、多租户支持、持久化和一致性保障、灵活的消息模型等特点,使其在性能、可靠性和可扩展性方面表现出色。Pulsar还拥有丰富的生态系统,支持与其他工具和服务集成,适用于多种应用场景。

乘风破浪会有时,直挂云帆济沧海!!!

标签:订阅,存储,--,主题,Namespace,BookKeeper,队列,消息,Pulsar
From: https://blog.csdn.net/qq_34207422/article/details/145291463

相关文章

  • 消息队列篇--原理篇--RabbitMQ和Kafka对比分析
    RabbitMQ和Kafka是两种非常流行的消息队列系统,但它们的设计哲学、架构特点和适用场景存在显著差异。对比如下。1、架构设计RabbitMQ:基AMQP协议:RabbitMQ是基于AMQP(高级消息队列协议)构建的,支持多种消息传递模式,如发布/订阅、路由、RPC等。单片架构:RabbitMQ采用的是传统的Br......
  • Doherty放大器中输入功分器—Wilkinson功分器(2)
    Wilkinson功分器集总元件实现Wilkinson功分器通常采用分布参数元件实现,某些场合需要采用集总元件网络来替代分布参数网络。推导Wilkinson功分器的集总元件等效电路的一种方法是从推导四分之一波长传输线的集总元件等效电路开始。设传输线的特性阻抗为,其中端口标称阻抗。若传输......
  • JAVA概述
    一.Java的历史​Java诞生于1995年,创始人为大胡子gosling,后来给甲骨文公司收购。二.Java概述2.1Java的重要特点Java是面向对象的(oop)Java是健壮的,有强类型机制、异常处理、垃圾的自动收集等Java是跨平台的,生成的class文件可以在各个系统平台运行(基于Java虚拟机JVM)Java是......
  • K8S从私有仓库拉取镜像
    pod结合secret下载私有镜像1、保证节点机器可以登录仓库dockerlogin--usernameadmin--passwordHarbor12345harbor.hack.me2、结合sercet资源针对密钥文件进行加密kubectlcreatesecretgenericregcred--from-file=/root/.docker/config.json--type=kubernetes.io/......
  • 使用 Go 语言与 Tesseract 进行验证码识别
    验证码(CAPTCHA)作为一种常见的防止自动化脚本的安全措施,广泛应用于各种网站和应用程序中。为了突破验证码的防护,可以通过OCR(光学字符识别)技术自动识别验证码中的文本。Tesseract是一个开源的OCR引擎,能够识别图像中的文字。在本文中,我们将介绍如何使用Go语言和TesseractOCR......
  • nbu控制台无法登陆处理
     NBU登陆控制台报错VerificationofX.509certificatefailedwhenconnectingtothebpjavamsvcserviceon 处理过程停止并重起服务[root@nbuopenv]#/usr/openv/netbackup/bin/nbcertcmd-getCertificate-servernbu主服务器[nbu]已存在主机证书和证书吊销列表......
  • CF1765C Card Guessing
    CF1765CCardGuessingDescription有\(4\)种花色的牌,每种牌均为\(n\)张,则牌的排列⼀共有\((4\cdotn)!\)种。现在你从牌堆中逐张地取出牌,取牌之前你都会猜这张牌是什么花色。你会根据之前的\(k\)张牌中出现最少的花色来猜这张牌。如果有多种花色都是最少的,你随机地猜......
  • dfs专题一:递归
    dfs简介:1.汉诺塔问题link:面试题08.06.汉诺塔问题-力扣(LeetCode)codeclassSolution{public:voidhanota(vector<int>&A,vector<int>&B,vector<int>&C){dfs(A,B,C,A.size());}voiddfs(vector<int>&......
  • springboot毕设 基于SpringBoot远程教学网页前端的设计与实现 程序+论文
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展和普及,教育领域正经历着深刻的变革。传统面对面的教学模式虽然具有其独特的优势,但在时空限制、资源分配以及个性化学习需求......
  • 深度学习目标检测使用yolov8训练使用路障类数据集之_道路积水检测数据集并构建一个基
    道路积水检测数据集,23097张,yolo和voc1类,标注数量:0道路积水:133261imagenum:23097积水啦,怎么办构建基于YOLOv8的道路积水检测系统。两种标注方式(YOLO和VOC)。以下是完整的代码实现,包括数据加载、模型训练、评估和推理。1.安装依赖首先,确保您已经安装了所需的库,特......