
Some Thoughts on HTTP and WebSocket

Date: 2024-01-11 11:22:51
Tags: std, http, file, lock, connection, thoughts, websocket, data, conn

In CivetWeb, the terms CivetHandler and CivetWebSocketHandler are related to different types of request handling.

  1. CivetHandler:
    • CivetHandler is a generic class in CivetWeb that is used for handling HTTP requests.
    • When you create a class that inherits from CivetHandler and override its virtual functions, you can define how the server should handle various HTTP methods (GET, POST, etc.) and request types.
    • This class is used for handling traditional HTTP requests, not WebSocket connections.

Example:

#include "CivetServer.h"

class MyHandler : public CivetHandler {
public:
    bool handleGet(CivetServer *server, struct mg_connection *conn) override {
        // Handle GET requests here, e.g. write the response with mg_printf()
        return true; // Request handled successfully
    }
};

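For example, the Apollo code base defines proto_handler, which serves the proto files that Dreamview (dv) needs during its startup phase so that they can be used to customize charts.

If you are interested, you can study the Apollo open-source code base yourself: https://github.com/ApolloAuto/apollo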

proto_handler.h

#pragma once
#include <string>
#include <functional>
#include <mutex>
#include <unordered_map>

#include "CivetServer.h"

/**
 * @namespace apollo::dreamview
 * @brief apollo::dreamview
 */
namespace apollo {
namespace dreamview {

/**
 * @class ProtoHandler
 *
 * @brief The ProtoHandler, built on top of CivetHandler, transfers proto
 * files over HTTP so that the frontend can parse messages.
 */
class ProtoHandler : public CivetHandler {
 public:
  ProtoHandler() {}

  bool handleGet(CivetServer *server, struct mg_connection *conn);
  std::string GenerateETag(const std::string &content);

 private:
  bool FindProtoPath(const std::string file_relative_path, std::string* file_abs_path);
  std::unordered_map<std::string, std::string> content_cache_;
  std::mutex cache_mutex_;
};

}  // namespace dreamview
}  // namespace apollo

proto_handler.cc

It overrides handleGet.

#include <fstream>
#include <functional>
#include <sstream>
#include <string>

#include "cyber/common/file.h"
#include "cyber/common/log.h"
#include "modules/dreamview/backend/common/handlers/proto_handler.h"

namespace apollo {
namespace dreamview {

bool ProtoHandler::handleGet(CivetServer *server, struct mg_connection *conn) {
  const struct mg_request_info *req_info = mg_get_request_info(conn);

  // parse request URI
  std::string request_uri = req_info->local_uri;

  // Strip the leading "/proto" (6 characters); the remainder is the path
  // relative to the proto file root.
  // todo(@lijin): adapt to package version, change this to a variable
  std::string file_relative_path = request_uri.substr(6);
  std::string content;

  {
    std::lock_guard<std::mutex> lock(cache_mutex_);
    if (content_cache_.find(file_relative_path) != content_cache_.end()) {
      content = content_cache_[file_relative_path];
    }
  }

  if (content.empty()) {
    std::string file_abs_path;
    if (FindProtoPath(file_relative_path, &file_abs_path) &&
        apollo::cyber::common::GetContent(file_abs_path, &content)) {
      std::lock_guard<std::mutex> lock(cache_mutex_);
      content_cache_[file_relative_path] = content;
    } else {
      mg_printf(
          conn,
          "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\n\r\nFile "
          "not found");
      return true;
    }
  }

  mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n");
  mg_printf(conn, "Cache-Control: max-age=86400\r\n\r\n");  // cache for 24 hours
  // mg_printf(conn, "ETag: \"%s\"\r\n",
  //           GenerateETag(content).c_str());  // generate and send the ETag
  mg_printf(conn, "%s", content.c_str());

  return true;
}

bool ProtoHandler::FindProtoPath(const std::string file_relative_path,
                                 std::string* file_abs_path) {
  std::string tmp_file_path;
  // source code
  tmp_file_path = "/apollo" + file_relative_path;
  if (apollo::cyber::common::PathExists(tmp_file_path)) {
    *file_abs_path = tmp_file_path;
    return true;
  }
  // package -source code
  tmp_file_path = "/apollo_workspace" + file_relative_path;
  if (apollo::cyber::common::PathExists(tmp_file_path)) {
    *file_abs_path = tmp_file_path;
    return true;
  }
  // package -source code
  tmp_file_path = "/opt/apollo/neo/src" + file_relative_path;
  if (apollo::cyber::common::PathExists(tmp_file_path)) {
    *file_abs_path = tmp_file_path;
    return true;
  }
  return false;
}

std::string ProtoHandler::GenerateETag(const std::string &content) {
  // Use std::hash to compute a content-based hash value
  std::hash<std::string> hasher;
  size_t hash = hasher(content);

  // Convert the hash value to a hexadecimal string
  std::stringstream ss;
  ss << std::hex << hash;

  return ss.str();
}

}  // namespace dreamview
}  // namespace apollo
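
The cache lookup in handleGet follows a common pattern: check the map while holding the mutex, load the file outside the lock on a miss, then store the result under the lock again. The following is a minimal sketch of that pattern using only the standard library. ContentCache, GetOrLoad and the loader callback are hypothetical names introduced for illustration; they are not part of Apollo or CivetWeb.

```cpp
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical helper: caches file contents keyed by relative path.
class ContentCache {
 public:
  // The loader stands in for the real file read; it returns false on failure.
  using Loader = std::function<bool(const std::string &, std::string *)>;

  // Returns true and fills *content on a cache hit or a successful load.
  bool GetOrLoad(const std::string &key, const Loader &loader,
                 std::string *content) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      auto it = cache_.find(key);
      if (it != cache_.end()) {
        *content = it->second;
        return true;
      }
    }
    // Load outside the lock so a slow read does not block other requests.
    if (!loader(key, content)) {
      return false;
    }
    std::lock_guard<std::mutex> lock(mutex_);
    cache_[key] = *content;
    return true;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::string, std::string> cache_;
};
```

With this shape, two concurrent requests for the same uncached file may both invoke the loader. That is harmless here because both store identical content, and it keeps the critical sections short.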

 

  2. CivetWebSocketHandler:
    • CivetWebSocketHandler is a specific type of handler in CivetWeb designed for handling WebSocket connections.
    • WebSocket is a protocol that provides full-duplex communication channels over a single, long-lived TCP connection. It's often used for real-time applications.
    • When you create a class that inherits from CivetWebSocketHandler and override its virtual functions, you define how the server should handle WebSocket connections.

Example:

#include "CivetServer.h"

class MyWebSocketHandler : public CivetWebSocketHandler {
public:
    bool handleConnection(CivetServer *server, const struct mg_connection *conn) override {
        // Called when the client intends to establish a websocket connection, before the websocket handshake.
        return true; // Connection accepted
    }

    void handleReadyState(CivetServer *server, struct mg_connection *conn) override {
        // Called when the websocket handshake has completed successfully and the connection is ready for data exchange.
    }

    bool handleData(CivetServer *server, struct mg_connection *conn,
                    int bits, char *data, size_t data_len) override {
        // Called when a data frame has been received from the client.
        return true; // Keep the connection open
    }
};
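
The bits argument of handleData is the first byte of the WebSocket frame. Per RFC 6455, its highest bit (0x80) is the FIN flag and its low four bits (0x0F) are the opcode; continuation frames carry opcode 0x0, so the opcode of a fragmented message must be remembered from its first frame. The sketch below shows one way to reassemble fragments with the standard library only. FrameAssembler and its members are hypothetical names, and the sketch assumes control frames are filtered out before Feed is called.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper that reassembles a fragmented WebSocket message.
class FrameAssembler {
 public:
  // Feeds one data frame. Returns true once the message is complete; the
  // opcode and payload are then available via opcode() and message().
  bool Feed(int bits, const char *data, std::size_t len) {
    if (complete_) {
      // The previous message was finished, so start a new one.
      buffer_.clear();
      opcode_ = 0;
      complete_ = false;
    }
    const int opcode = bits & 0x0F;
    if (opcode != 0) {
      opcode_ = opcode;  // Continuation frames (0x0) keep the stored opcode.
    }
    buffer_.append(data, len);
    complete_ = (bits & 0x80) != 0;  // FIN bit marks the final fragment.
    return complete_;
  }

  int opcode() const { return opcode_; }
  const std::string &message() const { return buffer_; }

 private:
  int opcode_ = 0;
  bool complete_ = false;
  std::string buffer_;
};
```

A single-frame message is simply the case where the first frame already has FIN set, so Feed returns true on the first call.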

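For example, Apollo's WebSocket handler overrides these three functions and extends the data handling to support both JSON and binary data. For JSON data, a map associates each message type (the key) with a handler function, so the frontend can reach different interfaces over a single WebSocket simply by naming the interface it wants. Sending supports both unicast and broadcast: each connection is recorded when it is established, and a broadcast iterates over the recorded connections and sends to each one.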
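
The type-keyed dispatch described above can be sketched without any JSON library. In this illustration a hypothetical Message struct stands in for the parsed JSON object, and Dispatcher, Register and Dispatch are made-up names rather than Apollo's API.

```cpp
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for a parsed JSON message with a "type" field.
struct Message {
  std::string type;
  std::string payload;
};

// Routes each message to the handler registered for its type.
class Dispatcher {
 public:
  using Handler = std::function<void(const Message &)>;

  void Register(const std::string &type, Handler handler) {
    handlers_[type] = std::move(handler);
  }

  // Returns false when no handler is registered for the message type.
  bool Dispatch(const Message &msg) const {
    auto it = handlers_.find(msg.type);
    if (it == handlers_.end()) {
      return false;
    }
    it->second(msg);
    return true;
  }

 private:
  std::unordered_map<std::string, Handler> handlers_;
};
```

Looking the handler up with find, rather than operator[], avoids inserting an empty std::function for an unknown type and then calling it.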

websocket_handler.h

#pragma once

#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "CivetServer.h"
#include "nlohmann/json.hpp"

/**
 * @namespace apollo::dreamview
 * @brief apollo::dreamview
 */
namespace apollo {
namespace dreamview {

/**
 * @class WebSocketHandler
 *
 * @brief The WebSocketHandler, built on top of CivetWebSocketHandler, is a
 * websocket handler that handles different types of websocket related events.
 */
class WebSocketHandler : public CivetWebSocketHandler {
  // In case of receiving fragmented message,
  // websocket opcode and accumulated data are stored.
  thread_local static unsigned char current_opcode_;
  thread_local static std::stringstream data_;

 public:
  using Json = nlohmann::json;
  using Connection = struct mg_connection;
  using MessageHandler = std::function<void(const Json &, Connection *)>;
  using ConnectionReadyHandler = std::function<void(Connection *)>;

  explicit WebSocketHandler(const std::string &name) : name_(name) {}

  /**
   * @brief Callback method for when the client intends to establish a websocket
   * connection, before websocket handshake.
   *
   * @param server the calling server
   * @param conn the connection information
   * @returns true to keep socket open, false to close it
   */
  bool handleConnection(CivetServer *server, const Connection *conn) override {
    return true;
  }

  /**
   * @brief Callback method for when websocket handshake is successfully
   * completed, and connection is ready for data exchange.
   *
   * @param server the calling server
   * @param conn the connection information
   */
  void handleReadyState(CivetServer *server, Connection *conn) override;

  /**
   * @brief Callback method for when a data frame has been received from the
   * client.
   *
   * @details In the websocket protocol, data is transmitted using a sequence of
   * frames, and each frame received invokes this callback method. Since the
   * type of opcode (text, binary, etc) is given in the first frame, this method
   * stores the opcode in a thread_local variable named current_opcode_. And
   * data from each frame is accumulated to data_ until the final fragment is
   * detected. See websocket RFC at http://tools.ietf.org/html/rfc6455, section
   * 5.4 for more protocol and fragmentation details.
   *
   * @param server the calling server
   * @param conn the connection information
   * @param bits first byte of the websocket frame, see websocket RFC at
   *             http://tools.ietf.org/html/rfc6455, section 5.2
   * @param data payload, with mask (if any) already applied.
   * @param data_len length of data
   * @returns true to keep socket open, false to close it
   */
  bool handleData(CivetServer *server, Connection *conn, int bits, char *data,
                  size_t data_len) override;

  bool handleJsonData(Connection *conn, const std::string &data);
  bool handleBinaryData(Connection *conn, const std::string &data);

  /**
   * @brief Callback method for when the connection is closed.
   *
   * @param server the calling server
   * @param conn the connection information
   */
  void handleClose(CivetServer *server, const Connection *conn) override;

  /**
   * @brief Sends the provided data to all the connected clients.
   * @param data The message string to be sent.
   */
  bool BroadcastData(const std::string &data, bool skippable = false);

  /**
   * @brief Sends the provided binary data to all the connected clients.
   * @param data The message string to be sent.
   */
  bool BroadcastBinaryData(const std::string &data, bool skippable = false);

  /**
   * @brief Sends the provided data to a specific connected client.
   *
   * @param conn The connection to send to.
   * @param data The message string to be sent.
   * @param skippable whether the data is allowed to be skipped if some other is
   * being sent to this connection.
   */
  bool SendData(Connection *conn, const std::string &data,
                bool skippable = false, int op_code = MG_WEBSOCKET_OPCODE_TEXT);

  bool SendBinaryData(Connection *conn, const std::string &data,
                      bool skippable = false);

  /**
   * @brief Add a new message handler for a message type.
   * @param type The name/key to identify the message type.
   * @param handler The function to handle the received message.
   */
  void RegisterMessageHandler(std::string type, MessageHandler handler) {
    message_handlers_[type] = handler;
  }

  /**
   * @brief Add a new handler for new connections.
   * @param handler The function to handle the new connection in ReadyState.
   */
  void RegisterConnectionReadyHandler(ConnectionReadyHandler handler) {
    connection_ready_handlers_.emplace_back(handler);
  }

 private:
  const std::string name_;

  // Message handlers keyed by message type.
  std::unordered_map<std::string, MessageHandler> message_handlers_;
  // New connection ready handlers.
  std::vector<ConnectionReadyHandler> connection_ready_handlers_;

  // The mutex guarding the connection set. We are not using read
  // write lock, as the server is not expected to get many clients
  // (connections).
  // CAVEAT: Execution section while holding this global lock should be as
  // brief as possible.
  mutable std::mutex mutex_;

  // The pool of all maintained connections. Each connection has a lock to
  // guard against simultaneous write.
  std::unordered_map<Connection *, std::shared_ptr<std::mutex>> connections_;
};

}  // namespace dreamview
}  // namespace apollo

websocket_handler.cc

#include "modules/dreamview/backend/common/handlers/websocket_handler.h"

#include <cerrno>
#include <cstring>

#include "cyber/common/log.h"
#include "modules/common/util/map_util.h"

namespace apollo {
namespace dreamview {

using apollo::common::util::ContainsKey;

void WebSocketHandler::handleReadyState(CivetServer *server, Connection *conn) {
  {
    std::unique_lock<std::mutex> lock(mutex_);
    connections_.emplace(conn, std::make_shared<std::mutex>());
  }
  AINFO << name_
        << ": Accepted connection. Total connections: " << connections_.size();

  // Trigger registered new connection handlers.
  for (const auto handler : connection_ready_handlers_) {
    handler(conn);
  }
}

void WebSocketHandler::handleClose(CivetServer *server,
                                   const Connection *conn) {
  // Remove from the store of currently open connections. Copy the mutex out
  // so that it won't be reclaimed during map.erase().
  Connection *connection = const_cast<Connection *>(conn);

  std::shared_ptr<std::mutex> connection_lock;
  {
    std::unique_lock<std::mutex> lock(mutex_);
    connection_lock = connections_[connection];
  }

  {
    // Make sure there's no data being sent via the connection
    std::unique_lock<std::mutex> lock_connection(*connection_lock);
    std::unique_lock<std::mutex> lock(mutex_);
    connections_.erase(connection);
  }
  // send close frame
  int ret = mg_websocket_write(connection, 0x8, "", 0);
  // An empty frame consists of the 2-byte header only, so 2 means success.
  if (ret != 2) {
    // Determine error message based on return value.
    AWARN << name_ << ": Failed to send close frame. Reason";
    if (ret == 0) {
      AWARN << "Connection closed";
    } else if (ret < 0) {
      AWARN << "Send error: " << std::strerror(errno);
    } else {
      AWARN << "Bytes to send: expected 2, actual: " << ret;
    }
  }

  AINFO << name_
        << ": Connection closed. Total connections: " << connections_.size();
}

bool WebSocketHandler::BroadcastData(const std::string &data, bool skippable) {
  std::vector<Connection *> connections_to_send;
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (connections_.empty()) {
      return true;
    }
    for (auto &kv : connections_) {
      Connection *conn = kv.first;
      connections_to_send.push_back(conn);
    }
  }

  bool all_success = true;
  for (Connection *conn : connections_to_send) {
    if (!SendData(conn, data, skippable)) {
      all_success = false;
    }
  }

  return all_success;
}

bool WebSocketHandler::BroadcastBinaryData(const std::string &data,
                                           bool skippable) {
  std::vector<Connection *> connections_to_send;
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (connections_.empty()) {
      return true;
    }
    for (auto &kv : connections_) {
      Connection *conn = kv.first;
      connections_to_send.push_back(conn);
    }
  }

  bool all_success = true;
  for (Connection *conn : connections_to_send) {
    if (!SendData(conn, data, skippable, MG_WEBSOCKET_OPCODE_BINARY)) {
      all_success = false;
    }
  }

  return all_success;
}

bool WebSocketHandler::SendBinaryData(Connection *conn, const std::string &data,
                                      bool skippable) {
  return SendData(conn, data, skippable, MG_WEBSOCKET_OPCODE_BINARY);
}

bool WebSocketHandler::SendData(Connection *conn, const std::string &data,
                                bool skippable, int op_code) {
  std::shared_ptr<std::mutex> connection_lock;
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (!ContainsKey(connections_, conn)) {
      AERROR << name_
             << ": Trying to send to an uncached connection, skipping.";
      return false;
    }
    // Copy the lock so that it still exists if the connection is closed after
    // this block.
    connection_lock = connections_[conn];
  }

  // Lock the connection while sending.
  if (!connection_lock->try_lock()) {
    // Skip sending data if:
    // 1. Data is skippable according to sender and there's higher priority data
    // being sent.
    // 2. The connection has been closed.
    if (skippable) {
      AWARN << "Skip sending a droppable message!";
      return false;
    }
    // Block to acquire the lock.
    connection_lock->lock();
    std::unique_lock<std::mutex> lock(mutex_);
    if (!ContainsKey(connections_, conn)) {
      // Release the connection lock before bailing out.
      connection_lock->unlock();
      return false;
    }
  }

  // Note that while we are holding the connection lock, the connection won't be
  // closed and removed.
  int ret = mg_websocket_write(conn, op_code, data.c_str(), data.size());
  connection_lock->unlock();

  if (ret != static_cast<int>(data.size())) {
    // When data is empty, the header length (2) is returned.
    if (data.empty() && ret == 2) {
      return true;
    }

    // Determine error message based on return value.
    AWARN << name_ << ": Failed to send data via websocket connection. Reason";
    if (ret == 0) {
      AWARN << "Connection closed";
    } else if (ret < 0) {
      AWARN << "Send error: " << std::strerror(errno);
    } else {
      AWARN << "Bytes to send: expected " << data.size() << ", actual: " << ret;
    }
    return false;
  }

  return true;
}

thread_local unsigned char WebSocketHandler::current_opcode_ = 0x00;
thread_local std::stringstream WebSocketHandler::data_;

bool WebSocketHandler::handleData(CivetServer *server, Connection *conn,
                                  int bits, char *data, size_t data_len) {
  // Ignore connection close request.
  if ((bits & 0x0F) == MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE) {
    return false;
  }

  data_.write(data, data_len);
  if (current_opcode_ == 0x00) {
    current_opcode_ = bits & 0x7f;
  }

  bool result = true;

  // The FIN bit (the most significant bit) indicates the final fragment
  // in a message. Note, the first fragment MAY also be the final
  // fragment.
  bool is_final_fragment = bits & 0x80;
  if (is_final_fragment) {
    switch (current_opcode_) {
      case MG_WEBSOCKET_OPCODE_TEXT:
        result = handleJsonData(conn, data_.str());
        break;
      case MG_WEBSOCKET_OPCODE_BINARY:
        result = handleBinaryData(conn, data_.str());
        break;
      default:
        AERROR << name_ << ": Unknown WebSocket bits flag: " << bits;
        break;
    }

    // reset opcode and data
    current_opcode_ = 0x00;
    data_.clear();
    data_.str(std::string());
  }

  return result;
}

bool WebSocketHandler::handleJsonData(Connection *conn,
                                      const std::string &data) {
  Json json;
  try {
    json = Json::parse(data.begin(), data.end());
  } catch (const std::exception &e) {
    AERROR << "Failed to parse JSON data: " << e.what();
    return false;
  }

  if (!ContainsKey(json, "type")) {
    AERROR << "Received JSON data without type field: " << json;
    return true;
  }

  auto type = json["type"];
  if (!ContainsKey(message_handlers_, type)) {
    AERROR << "No message handler found for message type " << type
           << ". The message will be discarded!";
    return true;
  }
  message_handlers_[type](json, conn);
  return true;
}

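bool WebSocketHandler::handleBinaryData(Connection *conn,
                                        const std::string &data) {
  const std::string type = "Binary";
  // Look the handler up first so a missing registration is not invoked.
  auto iter = message_handlers_.find(type);
  if (iter == message_handlers_.end()) {
    AERROR << "No message handler found for message type " << type
           << ". The message will be discarded!";
    return true;
  }
  iter->second(data, conn);
  return true;
}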

}  // namespace dreamview
}  // namespace apollo
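
The broadcast logic above snapshots the connection set under the global mutex, then sends to each connection while holding only that connection's own lock. A minimal sketch of the same idea follows, using only the standard library. Hub, ConnId and the SendFn callback are hypothetical names; the callback stands in for the real socket write.

```cpp
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical hub that tracks connections and broadcasts to all of them.
class Hub {
 public:
  using ConnId = int;
  using SendFn = std::function<bool(ConnId, const std::string &)>;

  explicit Hub(SendFn send) : send_(std::move(send)) {}

  void Add(ConnId id) {
    std::lock_guard<std::mutex> lock(mutex_);
    conns_.emplace(id, std::make_shared<std::mutex>());
  }

  void Remove(ConnId id) {
    std::lock_guard<std::mutex> lock(mutex_);
    conns_.erase(id);
  }

  // Returns the number of connections the data was sent to.
  int Broadcast(const std::string &data, bool skippable = false) {
    // Snapshot ids and locks so the sends happen outside the global lock.
    std::vector<std::pair<ConnId, std::shared_ptr<std::mutex>>> targets;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      targets.assign(conns_.begin(), conns_.end());
    }
    int sent = 0;
    for (auto &target : targets) {
      std::unique_lock<std::mutex> conn_lock(*target.second, std::defer_lock);
      if (skippable) {
        if (!conn_lock.try_lock()) continue;  // Busy: drop this message.
      } else {
        conn_lock.lock();  // Wait for any in-flight send to finish.
      }
      if (send_(target.first, data)) ++sent;
    }
    return sent;
  }

 private:
  SendFn send_;
  std::mutex mutex_;
  std::unordered_map<ConnId, std::shared_ptr<std::mutex>> conns_;
};
```

Copying the shared_ptr into the snapshot keeps each per-connection mutex alive even if the connection is removed mid-broadcast. Unlike the handler above, this sketch does not re-check that the connection is still registered before sending.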

 

From: https://www.cnblogs.com/WTSRUVF/p/17958156
