首页 > 其他分享 > 让GatewayWorker支持HTTP协议

让GatewayWorker支持HTTP协议

时间:2023-02-10 19:34:36浏览次数:42  
标签:协议 网关 HTTP 连接 response static file 服务 GatewayWorker

官方默认GatewayWorker只提供单一格式通信数据的分布式服务框架,Workerman是其内核。

Workerman巧妙的通过PCNTL实现多子进程服务,让每个子进程都可以监听同一个服务地址并进行竞争服务。因windows系统不支持PCNTL,所以在windows系统下只能通过多进程处理(且各进程无法监听同一服务地址),以下均以PCNTL运行模式进行说明。

官方地址:​​https://www.workerman.net/gateway-worker​


GatewayWorker分布式结构

GatewayWorker允许外部扩展自定义协议,但默认不支持HTTP协议,这缘于GatewayWorker分布式通信结构,使得通信数据格式复杂的协议处理困难。

GatewayWorker通过内置三种服务:

  • 注册中心(\Workerman\Worker\Register)
  • 网关服务(\Workerman\Worker\Gateway)
  • 业务服务(\Workerman\Worker\BusinessWorker)

官方原理说明:​​https://www.workerman.net/doc/gateway-worker/principle.html​

注册中心

用来存储所有网关子进程与业务服务连接的内部连接服务地址,当有业务服务连接过来时就通知给业务服务连接网关的地址进行连接。

允许多个注册中心同时运行,以实现高可用。每个注册中心仅单进程运行,因为多进程各自是独立处理竞争得来的连接信息,会导致各进程竞争来得的网关信息不同,使业务服务获取的网关地址数据也不同,最终网关服务与业务服务连接不全服务性能受限。

监听

内部服务地址:使用内置text协议,仅限内部网关服务和业务服务连接。监听地址一般是内网或本地地址。


连接


网关服务

网关是用户与业务服务之间的桥梁,将用户请求的数据转发给某个业务服务子进程处理,再由业务服务子进程回复或通知网关转发给用户的数据。

网关服务建议使用多子进程模式,增加并发能力,同时各子进程共享了对外服务地址(通过竞争接受用户连接),但各子进程会独立创建一个对内服务地址(地址相同端口号会递增)。


监听

对外服务地址:给用户连接的,协议自行定义,一个网关地址在内部所有子进程内是共享连接的,但接受连接存在各子进程竞争。此监听是各子进程共享。

对内服务地址:给内部业务服务连接的,使用内置GatewayProtocol(\GatewayWorker\Protocols\GatewayProtocol)协议,此协议收发均会携带用户连接信息和数据,监听地址一般是内网或本地地址。当业务服务接收后进行解析后即可知道用户信息,处理完后能通知网关回复指定用户。此监听是各子进程独享。


连接

注册中心:使用内置text协议,主要将内部服务地址(网关节点)存放到注册中,用于业务服务获取并连接。连接是各子进程独享。


业务服务

业务服务只提供处理并无监听功能,均通过连接完成业务处理。建议使用多子进程模式,增加并发能力。所有连接是各子进程独享。


监听


连接

注册中心:使用内置text协议,主要获取内部服务地址(网关节点)进行连接。

网关节点:使用内置GatewayProtocol协议,将从注册中心获取的地址进行逐个连接并保持状态,等待网关通知业务处理数据。


如何支持HTTP协议

从GatewayWorker结构可以看的出GatewayProtocol协议是将网关服务收到的用户数据进行再打包转发给业务服务处理。

像HTTP这类协议会在网关收到后自动进行解码生成请求(\Workerman\Protocols\Http\Request)对象,再交给GatewayProtocol协议转发给业务服务,而GatewayProtocol协议只接收字符串转打包并不知道该如何处理Request对象。

而Websocket协议就不同,虽然Websocket协议有编码和解码过程,但最终解码与编码均是字符串,刚好匹配GatewayProtocol协议。

需要GatewayWorker支持HTTP协议最简单的办法是将新增一个HTTP网关协议,将HTTP解码与编码放到业务服务层处理即可衔接(其它类似协议也可以这样处理)。

示例代码如下,注意文件放置目录需要与命令空间相符。

HTTP网关协议代码:

<?php

/*
* Http协议网关处理
*/

namespace Protocols;

use Workerman\Connection\TcpConnection;

class HttpGateway extends \Workerman\Protocols\Http {

/**
* Http encode.
*
* @param string|Response $response
* @param TcpConnection $connection
* @return string
*/
public static function encode($response, TcpConnection $connection) {
return $response;
}

/**
* Http decode.
*
* @param string $recv_buffer
* @param TcpConnection $connection
* @return \Workerman\Protocols\Http\Request
*/
public static function decode($recv_buffer, TcpConnection $connection) {
return $recv_buffer;
}

}

业务服务事件处理代码

<?php

/*
* 业务处理事件
*/

namespace App;

use Workerman\Protocols\Http;
use GatewayWorker\Lib\Context;
use GatewayWorker\Lib\Gateway;
use GatewayWorker\BusinessWorker;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\Response;

class Event {

/**
* @var BusinessWorker 当前业务处理服务实例
*/
public static $businessWorker;

/**
* @var array 静态文件类型集合
*/
protected static $mimeTypes = [];

/**
* 业务服务开始回调
* @param BusinessWorker $businesswork
*/
public static function onWorkerStart($businesswork) {
static::$businessWorker = $businesswork;
// 加载静态文件响应类型
$mimeFile = __DIR__ . '/../vendor/workerman/workerman/Protocols/Http/mime.types';
if (file_exists($mimeFile) && is_readable($mimeFile)) {
foreach (file($mimeFile, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES) as $line) {
$data = preg_split('/\s+/', $line);
if (count($data)) {
$type = array_shift($data);
foreach ($data as $ext) {
static::$mimeTypes[strtolower($ext)] = $type;
}
}
}
}
}

/**
* 业务服务结束回调
* @param BusinessWorker $businesswork
*/
public static function onWorkerStop($businesswork) {

}

/**
* 当客户端连接时触发
* 如果业务不需此回调可以删除onConnect
*
* @param int $client_id 连接id
*/
public static function onConnect($client_id) {

}

/**
* 当用户断开连接时触发
* @param int $client_id 连接id
*/
// 注意, 此时socket应该已经断开了
public static function onClose($client_id) {

}

/**
* 当客户端发来消息时触发
* @param int $cid 连接id
* @param mixed $recv_buffer 具体消息
*/
public static function onMessage($cid, $recv_buffer) {
$address_data = Context::clientIdToAddress($cid);
if ($address_data) {
$address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
$connection = Event::$businessWorker->gatewayConnections[$address];
$request = Http::decode($recv_buffer, $connection);
// 这里可以进行业务处理了
$response = static::requestStatic($request);
if (!$response) {
$response = static::requestBusiness($request);
}
if (!$response) {
$response = new Response(404, [], 'Not Found');
} elseif (isset($response->file)) {
$file = $response->file['file'];
if (file_exists($file) && is_readable($file)) {
// 发送文件
static::sendFile($response);
return;
}
$response = new Response(403, null, '403 Forbidden');
}
Gateway::sendToCurrentClient(Http::encode($response, $connection));
} else {
Gateway::closeCurrentClient();
}
}

/**
* 请求静态文件
* @param Request $request
* @return Response|null
*/
protected static function requestStatic(Request $request) {
$file = __DIR__ . '/../public/' . $request->path();
// 取出文件名后缀
$filename = basename($file);
$index = strrpos($filename, '.');
if ($index === false) {
return;
}
$ext = strtolower(substr($filename, $index + 1));
if (file_exists($file) && isset(static::$mimeTypes[$ext])) {
$response = new Response(200, ['Content-Type' => static::$mimeTypes[$ext]]);
$response->file = [
'file' => $file,
'offset' => 0,
'length' => 0,
];
return $response;
}
}

/**
* 请求业务处理
* @param Request $request
* @return Response|null
*/
protected static function requestBusiness(Request $request) {
$data = array_filter(explode('/', $request->path()));
switch (count($data)) {
case 0:
$controller = 'IndexController';
$action = 'index';
break;
case 1:
$controller = ucfirst(strtolower($data[0])) . 'Controller';
$action = 'index';
break;
default:
$action = array_pop($data);
$data = array_map(function($item) {
return ucfirst(strtolower($item));
}, $data);
$controller = implode('\\', $data) . 'Controller';
break;
}
$controller = "\\App\\Controllers\\{$controller}";
if (class_exists($controller) && method_exists($controller, $action)) {
return (new $controller())->$action($request);
}
}

/**
* 发送文件
* @param Response $response
*/
protected static function sendFile(Response $response) {
$file = $response->file['file'];
$offset = $response->file['offset'];
$length = $response->file['length'];
clearstatcache();
$file_size = (int) \filesize($file);
$body_len = $length > 0 ? $length : $file_size - $offset;
$response->withHeaders(array(
'Content-Length' => $body_len,
'Accept-Ranges' => 'bytes',
));
if ($offset || $length) {
$offset_end = $offset + $body_len - 1;
$response->header('Content-Range', "bytes $offset-$offset_end/$file_size");
}
$handle = \fopen($file, 'r');
if ($offset !== 0) {
\fseek($handle, $offset);
}
$size = 1024 * 1024;
$buffer = fread($handle, $size);
Gateway::sendToCurrentClient((string) $response . $buffer);
while (!feof($handle)) {
$buffer = fread($handle, $size);
Gateway::sendToCurrentClient($buffer);
}
fclose($handle);
}

}

网关地址配置代码

<?php

// 前段代码省略

// 指定网关地址
$gateway = new Gateway('HttpGateway://0.0.0.0:80');

// 后段代码省略
// 一定要网关发送心跳包功能

业务服务事件处理指定

<?php

// 前段代码省略

$worker = new BusinessWorker();
// 事件处理
$worker->eventHandler = \App\Event::class;

// 后段代码省略
// 一定要网关发送心跳包功能

增加控制器

<?php

/*
* 控制器
*/

namespace App\Controllers;

class IndexController extends Controller {

public function index() {
return __METHOD__;
}

}

静态文件需要放在项目根目录的 public/ 目录下才有效。

启动验证(能打印 App\Controllers\IndexController::index 表示成功)

curl http://127.0.0.1/

标签:协议,网关,HTTP,连接,response,static,file,服务,GatewayWorker
From: https://blog.51cto.com/php2012web/6049703

相关文章

  • Java使用代码调用接口http根据ip访问
    HttpClient的主要功能:实现了所有HTTP的方法(GET、POST、PUT、HEAD、DELETE、HEAD、OPTIONS等)支持HTTPS协议支持代理服务器(Nginx等)等支持自动(跳转)转向详细使用示例声......
  • 数据同步gossip协议原理与应用场景介绍
    作者:京东物流冯鸿儒1简介Gossip是一种p2p的分布式协议。它的核心是在去中心化结构下,通过将信息部分传递,达到全集群的状态信息传播,传播的时间收敛在O(Log(N))以内,其中N是节......
  • 数据同步gossip协议原理与应用场景介绍
    作者:京东物流冯鸿儒1简介Gossip是一种p2p的分布式协议。它的核心是在去中心化结构下,通过将信息部分传递,达到全集群的状态信息传播,传播的时间收敛在O(Log(N))以内,其中N是节点的......
  • springboot将http改造成https
    生成命令:keytool-genkey-aliastestalias-storetypePKCS12-keyalgRSA-keysize2048-keystorekeystore.p12-validity365关键字解释:alias  密钥别名store......
  • 浏览器如何使用 HTTP 防止限制 ip
    当我们浏览网页尤其是频发刷新网页时,会跳出来验证码。主要因为频繁刷新导致目标网页限制了您本地ip,正常过一段时间也或许恢复,如果遇到紧急的事情急需访问,不妨试试下面的操......
  • c#基于TCP/IP、CIP协议的欧姆龙PLC通信
    一、关于CIP协议CIP通信是CommonIndustrialProtocl(CIP)的简称,它是一个点到点的面向对象协议,能够实现工业器件(传感器,执行器)之间的连接,和高等级的控制器之间的连接。......
  • hutool工具包HttpUtil使用防坑
      Hutool是什么?   hutool-http介绍   hutool-http使用注意   总结Hutool是什么?Hutool是一个Java工具类库,里面很多封装好的工具类,涵盖方方面面,合理使用可以......
  • 718~719 HTTP响应消息响应头 AND Response功能介绍
    HTTP协议:1.请求消息:客户端发送给服务端的数据数据格式:1.请求行2.请求头3.请求空行4.请求体2.响应消息......
  • 网络协议-ssh基础
    ssh连接连接准备客户端如果想要连接服务端并登录,首先需要在本地生成一对密钥(私钥和公钥)。其中私钥文件:~/.ssh/id_rsa公钥文件:~/.ssh/id_rsa.pub然后将公钥......
  • 新项目克隆代码出现fatal: Unencrypted HTTP is not supported for GitLab. Ensure th
    ###新项目克隆代码出现fatal:UnencryptedHTTPisnotsupportedforGitLab.EnsuretherepositoryremoteURLisusingHTTPS报错话不多说,直接上图到网上也查了......