首页 > 其他分享 >Web Socket连接以及STOMP协议

Web Socket连接以及STOMP协议

时间:2024-12-31 11:12:09浏览次数:1  
标签:Web WebSocket Socket STOMP headers frame message self 客户端

一、WebSocket

WebSocket是一种网络通信协议,它允许在Web应用程序和服务器之间建立实时、双向的通信连接。

WebSocket协议基于TCP,它允许客户端和服务器之间建立一个持久的连接,通过这个连接可以实时交换数据,就像聊天一样,双方可以随时发送和接收信息。

WebSocket的生命周期包括四个阶段:连接建立、连接开放、连接关闭和连接关闭完成。

WebSocket的消息格式包括消息头和消息体。消息头包含控制位和有效载荷长度等信息,而消息体则是实际传输的数据。

WebSocket API提供了在Web应用程序中创建和管理WebSocket连接的接口。以下是一些常用的API:

WebSocket 构造函数:创建WebSocket对象。
send() 方法:向服务器发送数据。
onopen、onmessage、onerror、onclose 事件:处理WebSocket事件。

WebSocket的优势和劣势

优势 劣势
实时性: 实现实时数据传输,无需轮询。 兼容性要求: 需要浏览器和服务器都支持WebSocket。
双向通信: 服务器可以主动推送数据到客户端。 额外开销: 服务器需要维护长时间连接,增加了资源消耗。
减少网络负载: 持久化连接减少了HTTP请求的数量。 安全问题: 需要确保只向合法客户端发送数据。

二、HTTP与WebSocket区别

HTTP (超文本传输协议)

  1. 请求-响应模式
    • HTTP是一种无状态的请求-响应协议,客户端发送请求,服务器返回响应。
    • 每个请求都必须由客户端发起,服务器不能主动向客户端发送数据。
  2. 状态
    • HTTP本身是无状态的,但通过使用cookies和session可以维持状态。
  3. 用途
    • 适用于典型的网页浏览,如获取HTML页面、图片、视频等。
  4. 开销
    • 由于每次交互都需要完整的请求头和响应头,HTTP的开销相对较大。
  5. 全双工通信
    • HTTP本身不支持全双工通信,但可以通过轮询(polling)或长轮询(long polling)模拟。
  6. 协议
    • 通常运行在TCP协议之上,默认端口为80(HTTP)和443(HTTPS)。

WebSocket

  1. 全双工通信
    • WebSocket提供全双工通信通道,允许服务器和客户端在任何时候互相发送消息。
  2. 持久连接
    • 一旦建立WebSocket连接,该连接将保持开放状态,直到任何一方显式地关闭连接。
  3. 轻量级
    • WebSocket在建立连接后,只需要发送少量的数据来交换消息,因此开销较小。
  4. 用途
    • 适用于需要实时、双向通信的应用,如在线游戏、实时交易系统、实时聊天等。
  5. 状态
    • WebSocket连接本身是持久的,因此不需要额外的机制来维持状态。
  6. 协议
    • WebSocket也运行在TCP协议之上,但使用不同的端口,通常是80(ws)和443(wss)。

WebSocket的升级过程

HTTP升级为WebSocket的过程涉及一个特殊的HTTP请求,该请求通过发送特定的HTTP头信息来指示服务器将当前的HTTP连接升级为WebSocket连接。

  1. 客户端发起WebSocket握手请求: 客户端发送一个标准的HTTP请求到服务器,但请求中包含了一些特殊的头信息,表明客户端想要将连接升级为WebSocket连接。

    示例请求可能如下所示:

    GET /ws endpoint HTTP/1.1
    Host: example.com
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
    Sec-WebSocket-Version: 13
    

    这里重要的头信息包括:

    • Upgrade: websocket:告诉服务器客户端想要升级协议到WebSocket。
    • Connection: Upgrade:与Upgrade头一起使用,表示客户端想要升级协议。
    • Sec-WebSocket-Key:一个Base64编码的随机值,服务器将使用这个值来构造一个响应,以证明它理解WebSocket协议。
    • Sec-WebSocket-Version:指示客户端使用的WebSocket协议版本。
  2. 服务器响应WebSocket握手: 如果服务器支持WebSocket协议,并且愿意升级连接,它会返回一个包含以下头信息的HTTP响应:

    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    

    这里重要的头信息包括:

    • 101 Switching Protocols:这是一个标准的HTTP状态码,表示服务器同意切换协议。
    • Upgrade: websocketConnection: Upgrade:确认协议升级。
    • Sec-WebSocket-Accept:服务器根据客户端发送的Sec-WebSocket-Key计算出的值,用来证明它理解WebSocket协议。

    服务器计算Sec-WebSocket-Accept的值是通过将客户端发送的Sec-WebSocket-Key与一个特定的GUID(“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”)拼接,然后对拼接后的字符串进行SHA-1散列,并将结果进行Base64编码。

  3. 完成握手,建立WebSocket连接: 一旦客户端接收到服务器的101响应,握手过程就完成了,客户端和服务器现在可以通过WebSocket连接进行全双工通信。此时,HTTP连接被“升级”为WebSocket连接,并且不再遵循HTTP协议的请求-响应模式。

WebSocket连接保持开放,直到客户端或服务器显式地发送关闭帧来关闭连接。在WebSocket连接期间,数据可以通过帧的形式在客户端和服务器之间双向传输。

三、STOMP

STOMP代表简单文本导向的消息协议(Simple Text Oriented Messaging Protocol)。由于WebSockets是一种低级协议,使用帧(frames)来传输数据,而STOMP是一种高级协议,定义了如何解释某些帧类型中的数据。

这些帧类型包括CONNECT、SEND、ACK等。因此,使用STOMP能够更加简化使用WebSockets进行数据的发送、接收和解析过程。

WebSocket 与 STOMP 的区别

WebSocket 是一种基于 TCP 的协议,它提供了全双工通信,允许客户端和服务器之间的实时数据传输。WebSocket 是一种低级别的通信协议,它不提供消息队列、主题和订阅等功能。

STOMP 是一种基于 TCP 的消息协议,它提供了消息队列、主题和订阅等功能。STOMP 可以运行在 WebSocket 上,它使用了 WebSocket 来实现实时通信。STOMP 是一种高级别的通信协议,它提供了更丰富的功能。stomp客户端实现了心跳维护,订阅,取消订阅,发布等功能

总之,WebSocket 是一种通信协议,而 STOMP 是基于 WebSocket 的一种消息协议,它提供了更丰富的功能。

STOMP报文的组成

COMMAND
header1:value1
header2:value2
 
Body^@

//例
SEND   //作为COMMAND
USER-TOKEN:value1     //作为Headers
content-type:application/json   //作为Headers

Hello, stomp.  //消息内容,可以多行,直到^@为止  //作为Body
^@ //此Frame结束
属性 类型 说明
command String 动作,例如 (“CONNECT”, "SEND"等)
headers JavaScript object 请求头
body String 请求体

报文示例

  1. CONNECT

    • 客户端发送CONNECT报文以启动与STOMP服务器的连接。

    • 例子:

      CONNECT
      accept-version:1.2
      host:stomp.server.org
      
    • 服务器响应CONNECTED报文。

  2. CONNECTED

    • 服务器发送CONNECTED报文作为对CONNECT报文的响应。

    • 例子:

      CONNECTED
      version:1.2
      session:session1234
      server:my-stomp-server/1.0.0
      heart-beat:10000,10000
      
  3. SEND

    • 客户端或服务器发送SEND报文以发送消息到目的地。

    • 例子:

      SEND
      destination:/queue/test
      content-type:text/plain
      
      Hello, STOMP!
      
  4. SUBSCRIBE

    • 客户端发送SUBSCRIBE报文以订阅一个或多个目的地。

    • 例子:

      SUBSCRIBE
      id:0
      destination:/queue/test
      ack:auto
      
  5. UNSUBSCRIBE

    • 客户端发送UNSUBSCRIBE报文以取消订阅之前订阅的目的地。

    • 例子:

      UNSUBSCRIBE
      id:0
      
  6. ACK

    • 客户端发送ACK报文以确认消息的接收。

    • 例子:

      ACK
      id:0
      
  7. NACK

    • 客户端发送NACK报文以表示消息没有被确认(通常用于消息处理失败)。

    • 例子:

      NACK
      id:0
      
  8. BEGIN

    • 客户端发送BEGIN报文以开始一个事务。

    • 例子:

      复制

      BEGIN
      transaction:tx1
      
  9. COMMIT

    • 客户端发送COMMIT报文以提交一个事务。

    • 例子:

      复制

      COMMIT
      transaction:tx1
      
  10. ABORT

    • 客户端发送ABORT报文以回滚一个事务。

    • 例子:

      复制

      ABORT
      transaction:tx1
      
  11. DISCONNECT

    • 客户端发送DISCONNECT报文以优雅地断开与服务器的连接。

    • 例子:

      复制

      DISCONNECT
      receipt:id-12345
      
  12. ERROR

    • 服务器发送ERROR报文以通知客户端发生了错误。

    • 例子:

      复制

      ERROR
      message:Invalid destination
      content-type:text/plain
      
      The destination /queue/invalid does not exist.
      

四、代码实现

服务端(Spring Boot)

添加依赖

<!-- Spring Boot WebSocket依赖 -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
  </dependency>

  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
  </dependency>

使用一个嵌入式消息代理,它将是一个提供WebSocket功能的内存中代理。给代理添加一些目的地。这些目的地指的是将要发送消息的路径。

@Configuration
@EnableWebSocketMessageBroker
@Order(Ordered.HIGHEST_PRECEDENCE + 999)
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 配置消息代理,支持客户端订阅的消息
        registry.setApplicationDestinationPrefixes("/app"); // 服务器处理的前缀
        // 订阅点对点用户的消息 (携带用户名返回)
        registry.setUserDestinationPrefix("/user");

        // 自定义调度器,用于控制心跳线程
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 线程池线程数,心跳连接开线程
        taskScheduler.setPoolSize(3);
        // 线程名前缀
        taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");
        // 初始化
        taskScheduler.initialize();

        // 配置服务端推送消息给客户端的代理路径
        registry.enableSimpleBroker("/app","/topic", "/queue")
                .setHeartbeatValue(new long[]{15000, 15000})
                .setTaskScheduler(taskScheduler);

//        registry.enableSimpleBroker("/app","/topic", "/queue");

    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 配置 STOMP 连接端点,使用 SockJS 作为回退
        registry.addEndpoint("/ws").setAllowedOriginPatterns("*");
        registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new AuthChannelInterceptor());
    }


    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        registry.setSendTimeLimit(15000) // 发送消息时间限制
                .setSendBufferSizeLimit(512 * 1024) // 发送缓冲区大小限制
                .setMessageSizeLimit(1024 * 1024); // 消息大小限制
    }


	//	主要实现拦截器,完成连接后发送的验证,订阅的消息发送给客户端会到拦截器进行处理
	// 如果拦截器拦截消息不通过,客户端接收不到订阅的消息通知
  public static class AuthChannelInterceptor implements ChannelInterceptor {
          /**
           * 连接前监听
           *
           * @param message
           * @param channel
           * @return
           */
          @Override
          public Message<?> preSend(Message<?> message, MessageChannel channel) {
              StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
              System.out.println("连接信息:" + accessor);
              System.out.println("perSend => message:" + message + "; channel:" + channel);
              //不是首次连接,已经登陆成功
              return message;
          }

          @Override
          public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
              System.out.println("postSend => message:" + message + "; channel:" + channel + "; sent:" + sent);
          }
      }
}

在第一部分中,启用了一个带有两个目的地(/app和/topic)的代理。/app目的地将用于向所有用户发送通知,/topic目的地用于向特定用户(订阅主题)发送通知。

接下来,设置应用程序的目的地,即 /app,这样就可以向应用程序发送信息了。

在第二部分中,注册了STOMP端点。其中一个启用了SockJS,另一个仅使用WebSocket。之所以这样做,是因为并非所有浏览器都支持WebSocket,当不可用时,可以回退到使用SockJS。

向所有用户发送推送通知。

为此,首先实现一个控制器,该控制器会把来自一个客户端的信息转发给所有客户端。

@Controller
public class StompController {

    @Autowired
    SimpMessagingTemplate messaging;

    @MessageMapping("/hello")
    public void processMessage(String message) {
        // 处理接收到的消息
        System.out.println("Received message: " + message);
        // 返回消息到/topic/greetings
        messaging.convertAndSend("/topic/greetings",message);
    }

    @MessageMapping("/hello")
    @SendTo("/topic/greetings")
    public String processMessage(String message) {
        // 处理接收到的消息
        System.out.println("Received message: " + message);
        return "Hello, " + message + "!";
    }
}

客户端(SockJs)

<script src="https://cdn.bootcdn.net/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script type="text/javascript">
    var stompClient = null;

    function connect() {
      var socket = new SockJS("http://localhost:8080/ws");
      stompClient = Stomp.over(socket);

      // 连接到服务器
      stompClient.connect({}, function (frame) {
        console.log('Connected: ' + frame);
        // 订阅主题
        stompClient.subscribe('/topic/greetings', function (greeting) {
          // 打印接收到的消息
          console.log('Received message: ', greeting.body);
        });
      });

      // 错误处理
      stompClient.onerror = function (error) {
        console.error('WebSocket error: ' + error);
      };
    }

    function disconnect() {
      if (stompClient != null) {
        stompClient.disconnect();
      }
      console.log("Disconnected");
    }

    function sendMsg(data) {
      stompClient.send("/app/hello", {}, JSON.stringify(data))
    }
</script>

客户端(stomp.py)

import time

import stomp


class MyListener(stomp.ConnectionListener):
   def on_connecting(self, host_and_port):
       """
       Called by the STOMP connection once a TCP/IP connection to the
       STOMP server has been established or re-established. Note that
       at this point, no connection has been established on the STOMP
       protocol level. For this, you need to invoke the "connect"
       method on the connection.

       :param (str,int) host_and_port: a tuple containing the host name and port number to which the connection
           has been established.
       """
       pass

   def on_connected(self, frame):
       """
       Called by the STOMP connection when a CONNECTED frame is
       received (after a connection has been established or
       re-established).

       :param Frame frame: the stomp frame
       """
       pass

   def on_disconnecting(self):
       """
       Called before a DISCONNECT frame is sent.
       """
       pass

   def on_disconnected(self):
       """
       Called by the STOMP connection when a TCP/IP connection to the
       STOMP server has been lost.  No messages should be sent via
       the connection until it has been reestablished.
       """
       pass

   def on_heartbeat_timeout(self):
       """
       Called by the STOMP connection when a heartbeat message has not been
       received beyond the specified period.
       """
       pass

   def on_before_message(self, frame):
       """
       Called by the STOMP connection before a message is returned to the client app. Returns a tuple
       containing the headers and body (so that implementing listeners can pre-process the content).

       :param Frame frame: the stomp frame
       """
       pass

   def on_message(self, frame):
       """
       Called by the STOMP connection when a MESSAGE frame is received.

       :param Frame frame: the stomp frame
       """
       print('Received message: {0}'.format(frame.body))
       pass

   def on_receipt(self, frame):
       """
       Called by the STOMP connection when a RECEIPT frame is
       received, sent by the server if requested by the client using
       the 'receipt' header.

       :param Frame frame: the stomp frame
       """
       pass

   def on_error(self, frame):
       """
       Called by the STOMP connection when an ERROR frame is received.

       :param Frame frame: the stomp frame
       """
       pass

   def on_send(self, frame):
       """
       Called by the STOMP connection when it is in the process of sending a message

       :param Frame frame: the stomp frame
       """
       pass

   def on_heartbeat(self):
       """
       Called on receipt of a heartbeat.
       """
       pass

   def on_receiver_loop_completed(self, frame):
       """
       Called when the connection receiver_loop has finished.
       """
       pass

# 创建连接对象
conn = stomp.WSStompConnection([('localhost', 8080)], ws_path="/ws")  # 使用正确的服务器地址和端口

# 创建一个监听器实例
listener = MyListener()

# 将监听器添加到连接对象
conn.set_listener('', listener)

# 连接到STOMP服务器
conn.connect(wait=True)

# 订阅消息
conn.subscribe(destination='/topic/greetings', id=1, ack='auto')

time.sleep(1)

conn.send(destination="/app/hello", body="1111")
# 保持连接,以便可以持续接收消息
# 在实际应用中,你可能需要添加逻辑来优雅地断开连接
try:
   while True:
       pass
except KeyboardInterrupt:
   conn.disconnect()

# 注意:这个例子只是为了演示,实际应用中需要处理连接断开和异常情况,以及一些回掉方法

通过WebSockt发送frame帧,手动拼接报文形式

import websocket
import time
from threading import Thread
import uuid
from constants import *

BYTE = {
    'LF': '\x0A',
    'NULL': '\x00'
}

VERSIONS = '1.0,1.1'

class Stomp:
    def __init__(self, host, sockjs=False, wss=True):
        """
        Initialize STOMP communication. This is the high level API that is exposed to clients.

        Args:
            host: Hostname
            sockjs: True if the STOMP server is sockjs
            wss: True if communication is over SSL
        """
        # websocket.enableTrace(True)
        ws_host = host if sockjs is False else host + "/websocket"
        protocol = "ws://" if wss is False else "wss://"

        self.url = protocol + ws_host
        print("websocket url:" + self.url)
        self.dispatcher = Dispatcher(self)

        # maintain callback registry for subscriptions -> topic (str) vs callback (func)
        self.callback_registry = {}
        self.on_error = None
        self.on_connect = None
        self.on_message = None
        self.on_close = None

    def connect(self, username=None, passcode=None):
        """
        Connect to the remote STOMP server
        """
        # set flag to false
        self.connected = False

        # attempt to connect
        self.dispatcher.connect(username, passcode)

        # wait until connected
        start_time = time.time()
        timeout = 10  # 10 seconds
        while self.connected is False:
            if time.time() - start_time > timeout:
                print("Connection timed out")
                return False
            time.sleep(.50)
        if self.on_connect is not None:
            self.on_connect(self.connected)

        return self.connected

    def disconnect(self):
        """
        Disconnect from the remote STOMP server
        """
        self.dispatcher.ws.close()
        self.connected = False
        if self.on_close is not None:
            self.on_close()

    def subscribe(self, destination, id=None, ack='auto', callback=None):
        """
        Subscribe to a destination and supply a callback that should be executed when a message is received on that destination
        """
        # create entry in registry against destination
        if callback is not None:
            self.callback_registry[destination] = callback

        # transmit subscribe frame
        self.dispatcher.subscribe(destination, id, ack)

    def send(self, destination, message):
        """
        Send a message to a destination
        """
        self.dispatcher.send(destination, message)


class Dispatcher:
    def __init__(self, stomp):
        """
        The Dispatcher handles all network I/O and frame marshalling/unmarshalling
        """
        self.stomp = stomp
        # websocket.enableTrace(True)  # 开启调试信息
        self.ws = websocket.WebSocketApp(self.stomp.url)

        self.ws.ping_interval = 30
        self.ws.ping_timeout = 10
        # register websocket callbacks
        self.ws.on_open = self._on_open
        self.ws.on_message = self._on_message
        self.ws.on_error = self._on_error
        self.ws.on_close = self._on_close
        self.ws.on_ping = self._on_ping

        # run event loop on separate thread
        Thread(target=self.ws.run_forever, kwargs={'ping_interval': 10, 'ping_timeout': 8}).start()

        self.opened = False

        # wait until connected
        start_time = time.time()
        timeout = 10  # 10 seconds
        while self.opened is False:
            if time.time() - start_time > timeout:
                print("WebSocket Connection timeout")
                break
            time.sleep(.50)

    def _on_message(self, ws, message):
        """
        Executed when messages is received on WS
        """
        print("<<< " + message)
        if len(message) > 0:
            command, headers, body = self._parse_message(message)
            # if connected, let Stomp know
            if command == "CONNECTED":
                self.stomp.connected = True
            # if message received, call appropriate callback
            if command == "MESSAGE":
                # 检查字典中是否存在该主题的回调函数
                if headers['destination'] in self.stomp.callback_registry:
                    self.stomp.callback_registry[headers['destination']](body)
            # if message is acked, let Stomp know
            if command == CMD_ACK:
                print("ACK: " + headers['id'])

            if command != '':
                if self.stomp.on_message is not None:
                    self.stomp.on_message(command, headers, body)

    def _on_error(self, ws, error):
        """
        Executed when WS connection errors out
        """
        print(error)
        if hasattr(self.stomp, 'on_error') and self.stomp.on_error is not None:
            self.stomp.on_error(error)

    def _on_close(self, ws, code, reason):
        """
        Executed when WS connection is closed
        """
        print("### closed ###")
        if hasattr(self.stomp, 'on_close') and self.stomp.on_close is not None:
            self.stomp.on_close(code, reason)

    def _on_open(self, ws):
        """
        Executed when WS connection is opened
        """
        self.opened = True

    def _on_ping(self, ws, message):
        print("### ping ###")

    def _transmit(self, command, headers, msg=None):
        """
        Marshalls and transmits the frame
        """
        # Contruct the frame
        lines = []
        lines.append(command + BYTE['LF'])

        # add headers
        for key in headers:
            lines.append(key + ":" + headers[key] + BYTE['LF'])

        lines.append(BYTE['LF'])

        # add message, if any
        if msg is not None:
            lines.append(msg)

        # terminate with null octet
        lines.append(BYTE['NULL'])

        frame = ''.join(lines)

        # transmit over ws
        print(">>>" + frame)
        # if not self.dispatcher.ws.keep_running:
        #     print("Cannot send message: connection is closed.")
        #     return
        self.ws.send(frame)

    def _parse_message(self, frame):
        """
        Returns:
            command
            headers
            body

        Args:
            frame: raw frame string
        """
        lines = frame.split(BYTE['LF'])

        command = lines[0].strip()
        headers = {}

        # get all headers
        i = 1
        while lines[i] != '':
            # get key, value from raw header
            (key, value) = lines[i].split(':')
            headers[key] = value
            i += 1

        # set body to None if there is no body
        if i < len(lines) - 1:
            body = None if lines[i + 1] == BYTE['NULL'] else ''.join(lines[i + 1:len(lines) - 1])
            if body is not None:
                body = body.replace('\x00', '')
        else:
            body = None

        return command, headers, body

    def connect(self, username=None, passcode=None):
        """
        Transmit a CONNECT frame
        """
        headers = {}

        headers[HDR_HOST] = '/'
        headers[HDR_ACCEPT_VERSION] = VERSIONS
        headers[HDR_HEARTBEAT] = '10000,10000'

        if username is not None:
            headers[HDR_LOGIN] = username

        if passcode is not None:
            headers[HDR_PASSCODE] = passcode

        self._transmit(CMD_CONNECT, headers)

    def subscribe(self, destination, id, ack):
        """
        Transmit a SUBSCRIBE frame
        """
        headers = {}

        # TODO id should be auto generated
        if id is None:
            id = str(uuid.uuid4())

        headers[HDR_ID] = id
        headers[CMD_ACK] = ack
        headers[HDR_DESTINATION] = destination

        self._transmit(CMD_SUBSCRIBE, headers)

    def send(self, destination, message):
        """
        Transmit a SEND frame
        """
        headers = {}

        headers[HDR_DESTINATION] = destination
        headers[HDR_CONTENT_LENGTH] = str(len(message))

        self._transmit(CMD_SEND, headers, msg=message)

    def ack(self, message_id, subscription):
        """
        Transmit an ACK frame
        ACK 命令用于确认消息已成功处理
        当客户端接收到消息时,消息的头部会包含 message-id 字段。客户端需要从这个字段中提取 message_id
        在订阅消息时,客户端会指定一个 id,这个 id 就是 subscription
        """
        headers = {}

        headers['id'] = message_id
        headers['subscription'] = subscription

        self._transmit(CMD_ACK, headers)


def do_thing_a(msg):
    print("MESSAGE: " + msg)


def main(url, *sub_topic, **send_topic):
    stomp = Stomp(url, sockjs=False, wss=False)
    stomp.connect()
    stomp.subscribe(destination="/topic/greetings", id="sub-0", callback=do_thing_a)
    time.sleep(2)
    stomp.send("/app/hello", '{"name":"akshaye"}')


if __name__ == "__main__":
    main("localhost:8080/ws")
"""
The STOMP command and header name strings.
"""
# 报文请求头关键字定义
HDR_ACCEPT_VERSION = "accept-version"
HDR_ACK = "ack"
HDR_CONTENT_LENGTH = "content-length"
HDR_CONTENT_TYPE = "content-type"
HDR_DESTINATION = "destination"
HDR_HEARTBEAT = "heart-beat"
HDR_HOST = "host"
HDR_ID = "id"
HDR_MESSAGE_ID = "message-id"
HDR_LOGIN = "login"
HDR_PASSCODE = "passcode"
HDR_RECEIPT = "receipt"
HDR_SUBSCRIPTION = "subscription"
HDR_TRANSACTION = "transaction"

#命令帧形式定义
CMD_ABORT = "ABORT"
CMD_ACK = "ACK"
CMD_BEGIN = "BEGIN"
CMD_COMMIT = "COMMIT"
CMD_CONNECT = "CONNECT"
CMD_DISCONNECT = "DISCONNECT"
CMD_NACK = "NACK"
CMD_STOMP = "STOMP"
CMD_SEND = "SEND"
CMD_SUBSCRIBE = "SUBSCRIBE"
CMD_UNSUBSCRIBE = "UNSUBSCRIBE"

标签:Web,WebSocket,Socket,STOMP,headers,frame,message,self,客户端
From: https://www.cnblogs.com/chase-h/p/18643526

相关文章

  • 使用websocket制作一个简易的聊天系统
    创建一个简易的聊天系统前端部分使用WebSocket主要包含以下几个步骤:建立WebSocket连接处理连接打开事件发送消息接收并显示消息处理连接关闭和错误事件以下是一个简易的HTML和JavaScript示例,展示了如何使用WebSocket实现聊天系统的前端部分:HTML(index.html)<......
  • web安全测试
    1、软件安全测试概述网络世界中,安全问题无处不在,安全意识仍需提高,只有通过不断地安全测试,才能保证我们的软件不被非法入侵,才能够尽量的去规避我们软件中的一些问题,提升软件质量和安全。21世纪以来,智能化的软件成为商业决策、推广等不可缺少的利器,很多软件涉及了客户商业上重要......
  • dotnet最小webApi开发实践
    dotnet最小webApi开发实践软件开发过程中,经常需要写一些功能验证代码。通常是创建一个console程序来验证测试,但黑呼呼的方脑袋界面,实在是不讨人喜欢。Web开发目前已是网络世界中的主流,微软在asp.net框架大行其道之下,也整了个最小webapi项目开发向导。今天,我也拥抱一下新的开发......
  • Java Web学生自习管理系统
    一、项目背景与需求分析随着网络技术的不断发展和学校规模的扩大,学生自习管理系统的需求日益增加。传统的自习管理方式存在效率低下、资源浪费等问题,因此,开发一个智能化的学生自习管理系统显得尤为重要。该系统旨在提高自习室的利用率和管理效率,为学生提供方便快捷的自习预约服务......
  • 【Web】0基础学Web—正则、字符串验证正则、字符串替换
    0基础学Web—正则、字符串验证正则、字符串替换正则正则使用字符串验证正则string方法regexp方法字符串替换正则[]:每个[]代表一位[0-9a-zA-Z][A-z]:字母和_[^0-9]:排除0-9[\u4e00-\u9fa5]:中文元......
  • node.js基于web的旅游网站的设计与实现程序+论文 可用于毕业设计
    本系统(程序+源码+数据库+调试部署+开发环境)带文档lw万字以上,文末可获取源码系统程序文件列表开题报告内容一、选题背景关于旅游网站的设计与实现问题的研究,现有研究主要以提供旅游信息、预订服务等功能为主。在国内外,已经有不少旅游网站投入使用,并且在满足用户基本旅游需......
  • Web3.0热门领域:NFT项目实战解析
    背景与概述Web3.0正在引领互联网的下一次革命,其中NFT(Non-FungibleToken)作为热门领域之一,以其独特的资产标识和不可替代性,在数字艺术品、游戏、音乐、票务等领域崭露头角。开发一个完整的NFT项目需要从智能合约设计入手,结合NFT项目开发的前端实现,以及利用Web3开发生态的......
  • WebSocket 心得分享 转载
    一、前言❝本文将介绍WebSocket的封装,比如:心跳机制,重连和一些问题如何去处理❞二、背景之前,钱包相关的查询,我们是使用的轮询方案来做的,后来更新了一次需求,需要做一些实时数据统计的更新,然后顺带给钱包的余额也用长连接来做了,好,那么故事就开始了...某天,「老板:」 我钱怎么......
  • 认识webRTC
    什么是WebRTC2010年5月,谷歌收购了GlobalIPSolutions(简称GIPS),这是一家专注于VoIP和视频会议软件的公司,已开发出RTC所需的多项关键组件,如编解码器和回声消除技术。谷歌随后将GIPS技术开源,并与IETF和W3C等标准机构合作,以确保行业共识。2011年5月,谷歌发布了一个......
  • WebApiDemo
    以下是一个使用ASP.NETWebAPI(基于.NETFramework)的简单示例。1.创建ASP.NETWebAPI项目首先,确保你已经安装了VisualStudio,并且选择了包含ASP.NET和Web开发工作负载的安装选项。打开VisualStudio。选择“创建新项目”。在搜索栏中输入“ASP.NETWeb应用程序(.NETFra......