首页 > 其他分享 >pulsar-websocket协议

pulsar-websocket协议

时间:2022-09-25 17:47:58浏览次数:48  
标签:协议 WebSocket string Pulsar topic 消息 messageId pulsar websocket

WebSocket简介

  通过 Pulsar 的 WebSocket API,用户可以简单便捷地与 Pulsar 进行交互,WebSocket API 不依赖于官方客户端库。 WebSocket 支持使用 Java、Go、Python 和 C++ 客户端中提供的所有功能来发布和订阅消息。

运行 WebSocket 服务

在非单机模式下,有两种方法可以部署 WebSocket 服务:1. 嵌入 Pulsar Broker。 2.作为一个独立的组件。

嵌入 Pulsar Broker

在这种模式下,WebSocket 服务会使用已经在 broker 中运行的 HTTP 服务。 要启用此模式,需在安装目录下的 conf/broker.conf 文件中设置 webSocketServiceEnabled 参数。
webSocketServiceEnabled=true

作为一个独立的组件

在这种模式下,WebSocket 会作为单独的服务在 Pulsar broker 上运行。 运行此模式,需在 conf/websocket.conf 文件中进行配置。
configurationStoreServers=zk1:2181,zk2:2181,zk3:2181
webServicePort=8080
clusterName=my-cluster

启动 Broker

配置完成后,你可以使用 pulsar-daemon 命令来启动服务:
bin/pulsar-daemon start websocket
也可以使用restart命令来重启websocket
bin/pulsar-daemon restart websocket

API 手册

Pulsar 的 WebSocket API 提供三个端点,用于生产消息、消费消息和阅读消息。
所有通过 WebSocket API 的数据都使用 JSON 进行交互。

Producer 端

Producer 端需要在 URL 中指定租户、命名空间和 topic,例如:
ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic
ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic?参数1=值&参数2=值

查询参数:
| Key                     | 类型      | 是否必需 | 说明                                                                                                                   |
| ----------------------- | ------- | ---- | -------------------------------------------------------------------------------------------------------------------- |
| sendTimeoutMillis       | long    | 否    | 发送超时(默认值:30秒)                                                                                                        |
| batchingEnabled         | boolean | 否    | 启用批量缓存消息(默认值:false)                                                                                                  |
| batchingMaxMessages     | int     | 否    | 批量消息数最大值(默认值:1000)                                                                                                   |
| maxPendingMessages      | int     | 否    | 设置消息内部队列的最大值(默认值:1000)                                                                                               |
| batchingMaxPublishDelay | long    | 否    | 批量处理消息的时间(默认:10毫秒)                                                                                                   |
| messageRoutingMode      | string  | 否    | Message routing mode for the partitioned producer: SinglePartition, RoundRobinPartition                              |
| compressionType         | string  | 否    | 压缩类型:LZ4 或 ZLIB                                                                                                      |
| producerName            | string  | 否    | Specify the name for the producer. Pulsar will enforce only one producer with same name can be publishing on a topic |
| initialSequenceId       | long    | 否    | 设置 producer 发布消息序列 id 的标准。                                                                                           |
| hashingScheme           | string  | 否    | Hashing function to use when publishing on a partitioned topic: JavaStringHash, Murmur3_32Hash                       |
发布消息:
{
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "context": "1"
}
| ey                  | 类型     | 是否必需 | 说明               |
| ------------------- | ------ | ---- | ---------------- |
| payload             | string | 是    | Base-64 编码的负载    |
| properties          | 键值对    | 否    | 应用程序定义的属性        |
| context             | string | 否    | 应用程序定义的请求标识符     |
| key                 | string | 否    | 分区 topic 中使用的分区  |
| replicationClusters | 数组     | 否    | 根据名称允许添加到集群列表的副本 |
响应成功示例
{
   "result": "ok",
   "messageId": "CAAQAw==",
   "context": "1"
 }
响应失败示例
 {
   "result": "send-error:3",
   "errorMsg": "Failed to de-serialize from JSON",
   "context": "1"
 }
| Key       | 类型     | 是否必需 | 说明                |
| --------- | ------ | ---- | ----------------- |
| result    | string | 是    | 发送成功则为 ok,否则抛出异常  |
| messageId | string | 是    | 已发布消息的 Message ID |
| context   | string | 否    | 应用程序定义的请求标识符      |

Consumer 端

Concumer 端要求在 URL 中指定租户、命名空间、topic 和订阅:
ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription
ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription?参数1=值&参数2=值

查询参数
| Key               | 类型      | 是否必需 | 说明                                                                                                      |
| ----------------- | ------- | ---- | ------------------------------------------------------------------------------------------------------- |
| ackTimeoutMillis  | long    | 否    | 设置未完成消息确认的超时时间(默认值:0)                                                                                   |
| subscriptionType  | string  | 否    | 订阅类型:独占、灾备、共享                                                                                           |
| receiverQueueSize | int     | 否    | Consumer 接收队列的大小(默认:1000)                                                                               |
| consumerName      | string  | 否    | Consumer 的名称                                                                                            |
| priorityLevel     | int     | 否    | 指定 consumer 的优先级                                                                                        |
| maxRedeliverCount | int     | 否    | Define a maxRedeliverCount for the consumer (default: 0). 启用 Dead Letter Topic 。                        |
| deadLetterTopic   | string  | 否    | Define a deadLetterTopic for the consumer (default: {topic}-{subscription}-DLQ). 启用 Dead Letter Topic 。 |
| pullMode          | boolean | 否    | Enable pull mode (default: false). See “Flow Control” below.                                            |

注意:以上参数(pullMode 除外)适用于 WebSocket 服务的内部 consumer。 因此,即使客户端没有在 WebSocket 上消费,只要消息进入接收队列,就会受到传递设置的约束。

接收消息
{
  "messageId": "CAAQAw==",
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "publishTime": "2016-08-30 16:45:57.785"
}
| Key         | 类型     | 是否必需 | 说明                 |
| ----------- | ------ | ---- | ------------------ |
| messageId   | string | 是    | 消息 ID              |
| payload     | string | 是    | Base-64 编码的负载      |
| publishTime | string | 是    | 发布时间戳              |
| properties  | 键值对    | 否    | 应用程序定义的属性          |
| key         | string | 否    | Producer 设置的原始路由密钥 |

ACK 确认消息:
{
  "messageId": "CAAQAw=="
}
Key	类型	是否必需	说明
messageId	string	是	处理消息的消息ID

推送模式:
默认情况下(pullMode=false),consumer 端使用 receiverQueueSize 参数设置内部接收队列的大小,并限制传递到 WebSocket 客户端的未确认消息数。 在这种模式下,如果不发送消息确认,发送到 WebSocket 客户端的消息达到 receiverQueueSize时,Pulsar WebSocket 将停止发送消息。

拉取模式
如果设置 pullMode 为 true,则 WebSocket 客户端需要使用 permit 命令允许 Pulsar WebSocket 服务发送更多消息。
{
  "type": "permit",
  "permitMessages": 100
}
| Key            | 类型     | 是否必需 | 说明                              |
| -------------- | ------ | ---- | ------------------------------- |
| type           | string | 是    | Type of command. Must be permit |
| permitMessages | int    | 是    | 允许的消息数量                         |
注意:在这种模式下,可以在不同的连接中确认消息。

Reader 端:
ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic

查询参数
| Key               | 类型          | 是否必需 | 说明                                                             |
| ----------------- | ----------- | ---- | -------------------------------------------------------------- |
| readerName        | string      | 否    | Reader name                                                    |
| receiverQueueSize | int         | 否    | Consumer 接收队列的大小(默认:1000)                                      |
| messageId         | int or enum | 否    | Message ID to start from, earliest or latest (default: latest) |

接收消息
{
  "messageId": "CAAQAw==",
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "publishTime": "2016-08-30 16:45:57.785"
}

| Key         | 类型     | 是否必需 | 说明                 |
| ----------- | ------ | ---- | ------------------ |
| messageId   | string | 是    | 消息 ID              |
| payload     | string | 是    | Base-64 编码的负载      |
| publishTime | string | 是    | 发布时间戳              |
| properties  | 键值对    | 否    | 应用程序定义的属性          |
| key         | string | 否    | Producer 设置的原始路由密钥 |

ACK 确认消息
在WebSocket中,阅读器需要确认消息处理成功,以便Pulsar WebSocket服务更新挂起消息的数量。如果不发送确认,Pulsar WebSocket服务将在达到pendingMessages限制后停止发送消息。
{
  "messageId": "CAAQAw=="
}
| Key       | 类型     | 是否必需 | 说明        |
| --------- | ------ | ---- | --------- |
| messageId | string | 是    | 处理消息的消息ID |

标签:协议,WebSocket,string,Pulsar,topic,消息,messageId,pulsar,websocket
From: https://www.cnblogs.com/shaokai7878/p/16728320.html

相关文章

  • 接口协议(4) - USB
    USB(UniversalSerialBus,通用串行总线)是一种新兴的并逐渐取代其他接口标准的数据通信方式,作为一种高速串行总线,其极高的传输速度可以满足高速数据传输的应用环境要求,且该......
  • 实验3:OpenFlow协议分析实践
    一、实验目的1.能够运用wireshark对OpenFlow协议数据交互过程进行抓包;2.能够借助包解析工具,分析与解释OpenFlow协议的数据包交互过程与机制。二、实验环境Ubuntu......
  • 实验3:OpenFlow协议分析实践
    实验3:OpenFlow协议分析实践实验目的能够运用wireshark对OpenFlow协议数据交互过程进行抓包;能够借助包解析工具,分析与解释OpenFlow协议的数据包交互过程与机制。......
  • 以太网协议
    以太网协议以太帧格式ethernetv2帧格式802.3帧格式以太帧长度为什么是64到1518参考资料以太网协议以太网协议是局域网的一种标准,被IEEE采纳定为802.3标准......
  • 路由协议
    最短路径常用的有两种方法,一种是Bellman-Ford算法,一种是Dijkstra算法。一、距离矢量路由算法第一大类的算法称为距离矢量路由(distancevectorrouting)。它是基于Bell......
  • http协议详解:HTTP报文、请求方法、HTTP状态码
    简介HTTP协议,即超文本传输协议(Hypertexttransferprotocol)。是一种详细规定了浏览器和万维网(WWW=WorldWideWeb)服务器之间互相通信的规则,通过因特网传送万维网文......
  • 5.动态路由协议和RIP
    静态路由——环回口R1-f0/0:32.32.12.1—R2-f0/0:32.32.12.2环回口:两设备相连,没有第三台设备但还要验证自己的静态路由时,在路由器上取的逻辑的虚拟接口,一般用来测试使......
  • 解决 WebSocketClient.js?5586:16 WebSocket connection to 'ws://192.168.13.25:8080
    控制台报错: vue.config.jsVue的配置文件const{defineConfig}=require('@vue/cli-service')module.exports=defineConfig({devServer:{host:'0.0.0......
  • 如何循环存储以呈现有关戴森协议的博客(第 6 部分)
    如何循环存储以呈现有关戴森协议的博客(第6部分)欢迎回到戴森协议教程系列。请注意,UI正在开发中,提供的某些屏幕截图可能无法反映我们网站UI的当前版本。在我们的上......
  • RTMP_PUSH协议的通道IP不正确是什么原因?该如何解决?
    EasyCVR视频融合云平台基于云边端一体化架构,兼容性高、拓展性强,可支持多类型设备、多协议方式接入,包括国标GB/T28181、RTMP、RTSP/Onvif协议,以及厂家的私有协议,如:海康Ehome......