首页 > 其他分享 >Fastapi微服务系列(1)-之GRPC入门篇

Fastapi微服务系列(1)-之GRPC入门篇

时间:2022-11-28 00:33:30浏览次数:47  
标签:name pb2 GRPC Fastapi self grpc 入门篇 hello 客户端

一些微服务说明

前言

在转回python之前,其实就对微服务有所尝试,不过当时使用的是go-micro-v2来进行了解,当时也只是浅尝辄止,没深入继续深究~

其实微服务这东西没必要为了微服务而微服务吧!除非真的业务需要,其实没必要进行拆分,毕竟加入你只是一个人再干!哈哈那你引入这个微服务的话,估计是要把自己给累趴了!

我这里主要是为了学习而学习的做的示例而已,生产环境的话其实,可能涉及的问题还甚多,我这里主要是总结一些微服务的雏形。

关于微服务

PS:我这里大概率是不会去用nameko,这个框架定格再了19年之后好像就没更新了在而且不具备跨语言的通用性!

参考之前学习笔记大概问的微服务总体的架构就是这样:

图示来源:https://github.com/stack-labs/learning-videos/tree/master/docs/micro-api

那我们后续的话,使用的是fastapi来做的话,其实它也只是充当我们的里面的聚合服务层。

其实微服务涉及的几个问题点主要有:

  • 如何进行服务的拆分
  • 如何进行服务之间的通信
  • 如何做服务注册和发现(consul,edct)
  • 如何进行服务的配置中心(Nacos,apollo,Spring Cloud Config)
  • API网关做类似SLB层处理(goku,kong,apisix)
  • 微服务的相关的链路追踪问题(opentracing)
  • 微服务中的日志聚合问题

所以一个完整的微服务图示应该大概如下:

图示来源:https://github.com/stack-labs/learning-videos/tree/master/docs/micro-api

image.png

fastapi微服务前奏:

1:关于protobuf简述:

  • 1:它是一种高效数据存贮格式,也是一种数据交换格式
  • 2:高压缩
  • 3:对比XML和JSON的序列化和反序列化压缩传输比较高
  • 4:传输快
  • 5:支持跨语言,跨平台,一种与语言、平台无关,可扩展的序列化结构化数据
  • 6:它只是一个协议可以脱离具体的框架存在
  • 7:接口定义语言(IDL)来描述服务接口和有效负载消息的结构

使用 protobuf 的过程:

编写 proto 文件 -> 使用 protoc 编译 -> 添加 protobuf 运行时 -> 项目中集成

更新 protobuf 的过程:

修改 proto 文件 -> 使用 protoc 重新编译 -> 项目中修改集成的地方

2:关于GRPC简述

关于RPC

定义:

  • 远程过程调用(Remote Procedure Call)
  • 一台服务器调用另一个服务器上的服务的方法,看起像本地调用一样

常见 RPC 框架

  • gRPC(谷歌)
  • Thrift(脸书-现在改名买它
  • Dubbo(阿里的JAVA系)

定义:

Grpc基于protobuf数据协议rpc框架. 它使用 protobuf 进行数据传输.

grpc的特性:

  • 1:基于c++高性能,且协议基于protobuf序列化和反序列化(和Python中xml和json的rpa框架有别)
  • 2:通同性,跨通用主流的语言(python客户端可以调用Go写的客户端)
  • 3:高性能、通用的开源 RPC 框架
  • 4:更容易地创建分布式应用和服务

grpc-python官方文档:

 

文档地址:http://grpc.github.io/grpc/python/grpc.html

3:python下进行的grpc框架简单使用体验:

低版本的IDE:

3.1 pychram安装protobuf插件

主要是为了方便识别的对于的protobuf的文件格式:

步骤1- 下载插件ZIP文件::

https://plugins.jetbrains.com/plugin/8277-protobuf-support
下载地址:
https://plugins.jetbrains.com/plugin/16228-protobuf-support/versions/stable/144595

步骤2- 本地安装插件

步骤3- 重启pychram

重启后就可以正常的识别proto的文件了!

2021版本的话直接搜索:

image.png

安装后可以自动识别:

image.png

3.2  python下的GRPC工具安装:

具体工具包:

1:grapio
2:grpcio-tools

安装:

pip install grpcio -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install grpcio-tools -i https://pypi.tuna.tsinghua.edu.cn/simple

3.3 官网的 GRPC-PYTHON 体验示例:

相关的示例步骤如下:

1:步骤1 -编写protobuf文件(版本使用3)

syntax = "proto3";

service Greeter {
    //   定义PAC对于的具体的服务包含方法
    rpc SayHello (HelloRequest) returns (HelloReply) {}
    rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
    string name = 1; //定义我们的服务的一个请求的需要提交的参数
}

message HelloReply {
    string message = 1; //我们的请求向移动额报文的字段信息
}

图示:

2:步骤2 -编译 proto 文件

PS:建议注意需要进入的当前的我们的所以在的proto文件下再执行命令:

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. hello.proto

关于上述的命令的一些说明:

  • grpc_tools.protoc :依赖于我们上面安装的grpcio-tools

  • --python_out=. :表示我们的输出编译生成的protobuf文件路径, . 点号 表示的是当前目录下(生成的文件放置到当前目录下)

  • --grpc_python_out=. :表示我们的输出编译生成的grpc的文件路径, . 点号 表示的是当前目录下

  • -I. :表示输入Input,主要是强调从那个目录下 去找我们的xx.proto 文件 . 点好表示的是从当前的目录下去寻找

PS:只有PY语言会生成两个文件,其他语言都只是一个文件如GO的

上述命令执行后的结果:

PS:需要注意的点,生成的文件的引入的包的路径问题!

生成文件的描述:

  • hello_pb2.py: 是对我们的protobuf 里面定义的请求和响应的等参数数数据封装,使用里面的可以对我们的请求体参数和响应体参数进行实例化的操作等。

  • hello_pb2_grpc.py: 主要是用于针对GRPC服务的生成,当需要生成服务端或者客户端的时候需要依赖这个文件,此文件包含生 客户端(GreeterStub)和服务端(GreeterServicer)的类。

3:步骤3 - 编写grpc的服务端(多线程模式处理并发):

  • 1:基于我们的hello_pb2_grpc实现里面我们的定义的接口

定义一个服务名称,继承我们的hello_pb2_grpc,帮我们的生成的服务名称,并且实现所有的方法

2:把服务注册的rpc服务上

3:进行我们的rpc服务的一些启动配置处理

ps:关于rpc服务的启动有多重方式:

方式1:

def serve():
    # 实例化一个rpc服务,使用线程池的方式启动我们的服务
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # 添加我们服务
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 配置启动的端口
    server.add_insecure_port('[::]:50051')
    #  开始启动的服务
    server.start()、
    # --循环-主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
    try:
        while True:
            time.sleep(60 * 60 * 24)  # one day in seconds
    except KeyboardInterrupt:
        server.stop(0)

方式2:

def serve():
  # 实例化一个rpc服务,使用线程池的方式启动我们的服务
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  # 添加我们服务
  hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
  # 配置启动的端口
  server.add_insecure_port('[::]:50051')
  #  开始启动的服务
  server.start()
  # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
  server.wait_for_termination()

PS:wait_for_termination 阻塞当前线程,直到服务器停止。

这是一个实验性API。

等待在阻塞期间不会消耗计算资源,它将阻塞直到满足以下两个条件之一:

  1. 停止或终止服务器;
  2. 如果没有超时,则会发生超时。无.

server-完整的服务端实例代码为:

from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc


# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
  # 实现 proto 文件中定义的 rpc 调用
  def SayHello(self, request, context):
      # 返回是我们的定义的响应体的对象
      return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

  def SayHelloAgain(self, request, context):
      # 返回是我们的定义的响应体的对象
      return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))


def serve():
  # 实例化一个rpc服务,使用线程池的方式启动我们的服务
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  # 添加我们服务
  hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
  # 配置启动的端口
  server.add_insecure_port('[::]:50051')
  #  开始启动的服务
  server.start()
  # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
  server.wait_for_termination()

if __name__ == '__main__':
  serve()

4:步骤4 - 编写client -grpc的客户端,调用我们的服务端:

#!/usr/bin/evn python
# coding=utf-8

import grpc
import hello_pb2
import hello_pb2_grpc


def run():
    # 连接 rpc 服务器
    with grpc.insecure_channel('localhost:50051') as channel:
        # 通过通道服务一个服务
        stub = hello_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        response = stub.SayHello(hello_pb2.HelloRequest(name='小钟同学'))
        print("SayHello函数调用结果返回:: " + response.message)
        response = stub.SayHelloAgain(hello_pb2.HelloRequest(name='欢迎下次光临'))
        print("SayHelloAgain函数调用结果的返回: " + response.message)


if __name__ == '__main__':
    run()

5:步骤5 - 服务启动:

  • 启动服务端
  • 再启动客户端

客户端最后的输出结果为:

SayHello函数调用结果返回:: hello 小钟同学
SayHelloAgain函数调用结果的返回: hello 欢迎下次光临

总结步骤:

  • 1:编写.proto文件定义服务(定义了消息体和服务接口)
  • 2:编译.proto文件,生成具体的服务信息
  • 3:编写客户端和服务端

4:grpc 4个通讯模式(python实现)

不同的业务需求场景,不同的业务模式,不同的通讯模式:

  • 简单模式:请求响应一次调用(也就是客户端请求一次,服务端响应一次)

    PS:简单模式也可以叫做一元RPC模式

  • 服务端流模式:客服端一次请求, 服务器多次进行数据流式应答(客户端发送一个对象服务器端返回一个Stream(流式消息))

  • 客户端流模式:客服端多次流式的请求, 发送结束后,服务器一次应答(客户端数据上报)

  • 双向流模式:客服端多次流式的请求,服务器多次进行数据流式应答(类似于WebSocket(长连接),客户端可以向服务端请求消息,服务器端也可以向客户端请求消息))

由于简单模式上面的一有所演示,那么这里我就不演示,下面示例我也是来自官网的示例,我主要是拆分开进行实践体验。

通常情况下流模式主要使用于下面一些场景:

  • 大规模数据包
  • 实时场景数据传输

4.1 服务端流模式示例

定义:

  • 服务端流模式:客服端一次请求, 服务器多次进行数据流式应答(客户端发送一个对象服务器端返回一个Stream(流式消息))

1:步骤1: 编写serverstrem.proto文件定义服务(定义了消息体和服务接口)

syntax = "proto3";

service Greeter {
    //  服务端流模式实现
    rpc SayHello(HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
    string name = 1; //定义我们的服务的一个请求的需要提交的参数
}

message HelloReply {
    string message = 1; //我们的请求向移动额报文的字段信息
}

2:步骤2 -编译 serverstrem.proto 文件

PS:建议注意需要进入的当前的我们的所以在的proto文件下再执行命令(当前我的示例调整,调整到Demo2包下):

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. serverstrem.proto

3:步骤3 - 编写serverstrem_grpc_server.py grpc的服务端:


from concurrent import futures
import grpc
from demo2 import serverstrem_pb2_grpc, serverstrem_pb2
import threading
import time
import random

# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(serverstrem_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 使用流的方式不断返回给客户端信息
        # 检查客户端是否还保持连接状态

        while context.is_active():
            # 接收到客户端的信息
            client_name = request.name
            # 使用生成器的方式不安给我们的---返回给客户端发送信息
            time.sleep(1)
            yield serverstrem_pb2.HelloReply(message=f"{client_name} 啊!我是你大爷!{random.sample('zyxwvutsrqponmlkjihgfedcba',5)}")





def serve():
    # 实例化一个rpc服务,使用线程池的方式启动我们的服务
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
    # 添加我们服务
    serverstrem_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 配置启动的端口
    server.add_insecure_port('[::]:50051')
    #  开始启动的服务
    server.start()
    # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

上面的流服务的实现的时候,使用的是生成器的方式返回我们的数据流:

 while context.is_active():
            # 接收到客户端的信息
            client_name = request.name
            # 使用生成器的方式不安给我们的---返回给客户端发送信息
            time.sleep(1)
            yield serverstrem_pb2.HelloReply(message=f"{client_name} 啊!我是你大爷!{random.sample('zyxwvutsrqponmlkjihgfedcba',5)}")

PS:上面为了演示关于线程池的问题,我们设定的是只是开启了2两个的线程,这个表示以为的这在这服务端流模式下,我们最多能处理的只有两个客户端连接而已!!!超过2个的话就没办法了!!需要等待!!

4:步骤4 - 编写serverstrem_grpc_client.py grpc的客户端,调用我们的服务端:

客户端拥有一个存根(stub在某些语言中仅称为客户端),提供与服务器相同的方法

import grpc
from demo2 import serverstrem_pb2, serverstrem_pb2_grpc


def run():
    # 连接 rpc 服务器
    with grpc.insecure_channel('localhost:50051') as channel:
        # 通过通道服务一个服务
        stub = serverstrem_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        response = stub.SayHello(serverstrem_pb2.HelloRequest(name='小风学'))
        for item in response:
            print("SayHello函数调用结果返回:: " + item.message)


if __name__ == '__main__':
    run()

注意点:上面我们的接收来自服务端的数据的时候使用的循环方式来接收!:

response = stub.SayHello(serverstrem_pb2.HelloRequest(name='小风学'))
        for item in response:
            print("SayHello函数调用结果返回:: " + item.message)

启动多个客户端的时候,最终我们的客户端输出的信息为:

超过三个则无法输出,需关闭一个客户端后才可以处理:

总结:

1:服务端流其实也是使用某种的循环迭代的方式进行我们的数据的迭代的发送而已!
2:另外根据业务场景来处理是否进行业务服务端业务的中断取消机制,
3:如果需服务端主动的关闭连接的话,需要使用  context.cancel()

补充一个服务端主动的关闭的示例:

# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(serverstrem_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 使用流的方式不断返回给客户端信息
        # 检查客户端是否还保持连接状态
        idnex = 1
        while context.is_active():
            # 接收到客户端的信息
            idnex=idnex +1
            print("服务端的索引:",idnex)
            client_name = request.name
            # 使用生成器的方式不安给我们的---返回给客户端发送信息
            time.sleep(1)
            #  如果需要主动的关闭的服务端的话可以使用:
            if idnex == 5:
                context.cancel()
            yield serverstrem_pb2.HelloReply(message=f"{client_name} 啊!我是你大爷!{random.sample('zyxwvutsrqponmlkjihgfedcba',5)}")

当我们的服务端主动的关闭连接后:客户端会进行异常的抛出:

4.2 客户端流模式示例

定义:

  • 客服端多次流式的请求, 发送结束后,服务器一次应答(客户端数据上报)

1:步骤1: 编写serverstrem.proto文件定义服务(定义了消息体和服务接口)

syntax = "proto3";

service Greeter {
    //  服务端流模式实现
    rpc SayHello(HelloRequest) returns (stream HelloReply) {}
    //  新增客户端的流程模式
    rpc SayRequestStream(stream HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
    string name = 1; //定义我们的服务的一个请求的需要提交的参数
}

message HelloReply {
    string message = 1; //我们的请求向移动额报文的字段信息
}

2:步骤2 -更新编译 serverstrem.proto 文件

PS:建议注意需要进入的当前的我们的所以在的proto文件下再执行命令(当前我的示例调整,调整到Demo2包下):

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. serverstrem.proto

3:步骤3 - 更新编写serverstrem_grpc_server.py grpc的服务端:

其实只需要新增需要实现的SayRequestStream方法就可以了!

根据我们的对这个模式的定义就是:

  • 客服端多次流式的请求, 发送结束后,服务器一次应答(客户端数据上报),所以我们的服务端需要设计相关的条件,结束客户端的提交,然后返回数据,这个需要结合自己真实的业务场景来处理。

完整代码:



from concurrent import futures
import grpc
from demo2 import serverstrem_pb2_grpc, serverstrem_pb2
import threading
import time
import random

# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(serverstrem_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 使用流的方式不断返回给客户端信息
        # 检查客户端是否还保持连接状态
        idnex = 1
        while context.is_active():
            # 接收到客户端的信息
            idnex=idnex +1
            print("服务端的索引:",idnex)
            client_name = request.name
            # 使用生成器的方式不安给我们的---返回给客户端发送信息
            time.sleep(1)
            #  如果需要主动的关闭的服务端的话可以使用:
            if idnex == 5:
                context.cancel()
            yield serverstrem_pb2.HelloReply(message=f"{client_name} 啊!我是你大爷!{random.sample('zyxwvutsrqponmlkjihgfedcba',5)}")

    # 新增处理客户端的流模式的函数,注意下面的request_iterator是一个迭代器的对象
    def SayRequestStream(self, request_iterator, context):
        pass
        # 循环的接收来此客户端每次提交的数据
        for curr_request in request_iterator:
            # 打印当前客户端的数据信息
            print(curr_request.name)
            if curr_request.name=="后会有期":
                return serverstrem_pb2.HelloReply(message=f"{curr_request.name=} 啊!我们后会有期!")
        # 返回最终的服务器一次处理结果




def serve():
    # 实例化一个rpc服务,使用线程池的方式启动我们的服务
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
    # 添加我们服务
    serverstrem_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 配置启动的端口
    server.add_insecure_port('[::]:50051')
    #  开始启动的服务
    server.start()
    # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

主要是新增服务函数处理:

逻辑说明:

  • 1:服务端一直接收客户端发生的消息,当我接收到后会有期的时候,则结束返回告诉客户端终止提交!
  • 2:并把xxxx 啊!我们后会有期!的结果返回给客户端。

4:步骤4 - 编写serverstrem_grpc_client.py grpc的客户端,调用我们的服务端:

此时是我们的客户端进行流的方式的提交数据给我们的服务端,所以我们的也设计一个迭代的方式自己新年数据的提交:

#!/usr/bin/evn python
# coding=utf-8

import grpc
from demo2 import serverstrem_pb2, serverstrem_pb2_grpc
import time

def run():
    # 连接 rpc 服务器
    with grpc.insecure_channel('localhost:50051') as channel:
        # 通过通道服务一个服务
        stub = serverstrem_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        # response = stub.SayHello(serverstrem_pb2.HelloRequest(name='小名同学'))
        # for item in response:
        #     print("SayHello函数调用结果返回:: " + item.message)

        def send_action():
            for send_name in ['我是你大爷',"我是你小爷",'我是你大舅子',"后会有期"]:
                print("send_name:",send_name)
                time.sleep(1)
                yield serverstrem_pb2.HelloRequest(name=send_name)
        # 接收服务端返回的结果
        response = stub.SayRequestStream(send_action())
        print(response.message)


if __name__ == '__main__':
    run()

5:步骤5 - 服务启动:

  • 启动服务端
  • 再启动客户端

客户端最后的输出结果为:

send_name: 我是你大爷
send_name: 我是你小爷
send_name: 我是你大舅子
send_name: 后会有期
后会有期 啊!我们后会有期!

服务端输出:

我是你大爷
我是你小爷
我是你大舅子
后会有期

4.2 双向的流模式示例

定义:

  • 客服端多次流式的请求,服务器多次进行数据流式应答(类似于WebSocket(长连接),客户端可以向服务端请求消息,服务器端也可以向客户端请求消息))

1:步骤1: 新增接口-编写serverstrem.proto文件定义服务(定义了消息体和服务接口)

syntax = "proto3";

service Greeter {
    //  服务端流模式实现
    rpc SayHello(HelloRequest) returns (stream HelloReply) {}
    //  新增客户端的流程模式
    rpc SayRequestStream(stream HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
    string name = 1; //定义我们的服务的一个请求的需要提交的参数
}

message HelloReply {
    string message = 1; //我们的请求向移动额报文的字段信息
}

2:步骤2 -更新编译 serverstrem.proto 文件

PS:建议注意需要进入的当前的我们的所以在的proto文件下再执行命令(当前我的示例调整,调整到Demo2包下):

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. serverstrem.proto

3:步骤3 - 更新编写serverstrem_grpc_server.py grpc的服务端:

完整代码:



from concurrent import futures
import grpc
from demo2 import serverstrem_pb2_grpc, serverstrem_pb2
import threading
import time
import random


# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(serverstrem_pb2_grpc.GreeterServicer):


    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 使用流的方式不断返回给客户端信息
        # 检查客户端是否还保持连接状态
        idnex = 1
        while context.is_active():
            # 接收到客户端的信息
            idnex = idnex + 1
            print("服务端的索引:", idnex)
            client_name = request.name
            # 使用生成器的方式不安给我们的---返回给客户端发送信息
            time.sleep(1)
            #  如果需要主动的关闭的服务端的话可以使用:
            if idnex == 5:
                context.cancel()
            yield serverstrem_pb2.HelloReply(
                message=f"{client_name} 啊!我是你大爷!{random.sample('zyxwvutsrqponmlkjihgfedcba',5)}")


    # 新增处理客户端的流模式的函数,注意下面的request_iterator是一个迭代器的对象
    def SayRequestStream(self, request_iterator, context):
        pass
        # 循环的接收来此客户端每次提交的数据
        for curr_request in request_iterator:
            # 打印当前客户端的数据信息
            print(curr_request.name)
            if curr_request.name == "后会有期":
                return serverstrem_pb2.HelloReply(message=f"{curr_request.name} 啊!我们后会有期!")
        # 返回最终的服务器一次处理结果

    # 新增双向流的模式处理
    def SayRequestAndRespStream(self, request_iterator, context):
        pass
        # 循环的接收来此客户端每次提交的数据
        for curr_request in request_iterator:
            # 打印当前客户端的数据信息
            print(curr_request.name)
            # 对每一个的客户端的数据进行也循环的应答的回复处理
            yield serverstrem_pb2.HelloReply(message=f"{curr_request.name} 啊!我是来自服务端的回复!请接收!!")


def serve():
    # 实例化一个rpc服务,使用线程池的方式启动我们的服务
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=3))
    # 添加我们服务
    serverstrem_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 配置启动的端口
    server.add_insecure_port('[::]:50051')
    #  开始启动的服务
    server.start()
    # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
    server.wait_for_termination()


if __name__ == '__main__':
    serve()


主要是新增服务函数处理:

逻辑说明:

  • 1:服务端一直接收客户端发生的消息

4:步骤4 - 编写serverstrem_grpc_client.py grpc的客户端,调用我们的服务端:

此时是我们的客户端进行流的方式的提交数据给我们的服务端,所以我们的也设计一个迭代的方式自己新年数据的提交:

#!/usr/bin/evn python
# coding=utf-8

import grpc
from demo2 import serverstrem_pb2, serverstrem_pb2_grpc
import time

def run():
    # 连接 rpc 服务器
    with grpc.insecure_channel('localhost:50051') as channel:
        # 通过通道服务一个服务
        stub = serverstrem_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        # response = stub.SayHello(serverstrem_pb2.HelloRequest(name='小名同学'))
        # for item in response:
        #     print("SayHello函数调用结果返回:: " + item.message)

        # # ============客户端流模式
        # def send_action():
        #     for send_name in ['我是你大爷',"我是你小爷",'我是你大舅子',"后会有期"]:
        #         print("send_name:",send_name)
        #         time.sleep(1)
        #         yield serverstrem_pb2.HelloRequest(name=send_name)
        # # 接收服务端返回的结果
        # response = stub.SayRequestStream(send_action())
        # print(response.message)
        # ============双向流模式
        def send_action():
            for send_name in ['我是你大爷',"我是你小爷",'我是你大舅子',"后会有期"]:
                time.sleep(1)
                yield serverstrem_pb2.HelloRequest(name=send_name)
        # 接收服务端返回的结果
        response_iterator = stub.SayRequestAndRespStream(send_action())
        for response in response_iterator:
            print(response.message)

if __name__ == '__main__':
    run()

5:步骤5 - 服务启动:

  • 启动服务端
  • 再启动客户端

客户端最后的输出结果为:

send_name: 我是你大爷
send_name: 我是你小爷
我是你大爷 啊!我是来自服务端的回复!请接收!!
send_name: 我是你大舅子
我是你小爷 啊!我是来自服务端的回复!请接收!!
send_name: 后会有期
我是你大舅子 啊!我是来自服务端的回复!请接收!!
后会有期 啊!我是来自服务端的回复!请接收!!

服务端输出:

我是你大爷
我是你小爷
我是你大舅子
后会有期

5:安全认证

5.1支持的授权机制

以下是来自官方文档的说明:

  • SSL/TLS

    • gRPc 集成 SSL/TLS 并对服务端授权所使用的 SSL/TLS 进行了改良,对客户端和服务端交换的所有数据进行了加密。对客户端来讲提供了可选的机制提供凭证来获得共同的授权。
  • OAuth 2.0

    • RPC 提供通用的机制(后续进行描述)来对请求和应答附加基于元数据的凭证。当通过 gRPC 访问 Google API 时,会为一定的授权流程提供额外的获取访问令牌的支持,这将通过以下代码例子进行展示。警告:Google OAuth2 凭证应该仅用于连接 Google 的服务。把 Google 对应的 OAuth2 令牌发往非 Google 的服务会导致令牌被窃取用作冒充客户端来访问 Google 的服务。
  • API

    为了减少复杂性和将混乱最小化, gRPC 以一个统一的凭证对象来进行工作。凭证可以是以下两类:

    • 频道凭证, 被附加在 频道
      上, 比如 SSL 凭证。
    • 调用凭证, 被附加在调用上(或者 C++ 里的 客户端上下文
      )。凭证可以用组合频道凭证
      来进行组合。一个组合频道凭证
      可以将一个频道凭证
      和一个调用凭证
      关联创建一个新的频道凭证
      。结果在这个频道上的每次调用会发送组合的调用凭证
      来作为授权数据。例如,一各频道凭证
      可以由一个Ssl 凭证
      和一个访问令牌凭证
      生成。结果是在这个频道上的每次调用都会发送对应的访问令牌。 调用凭证
      可以用 组合凭证
      来组装。组装后的 调用凭证
      应用到一个客户端上下文
      里,将触发发送这两个调用凭证
      的授权数据。

5.1 关于 SSL

通常SSL主要是用于更加的安全进行数据传输,主要作用有:

  1. 进行数据的认证(用户和服务的认证)
  2. 数据的加密传输
  3. 维护数据完整性,确保数据传输过程中不被改变

5.2 携带TSL的实现(python实现)

示例代码来源:https://www.cnblogs.com/areful/p/10372619.html

使用SSL启动GRPC的服务示例:

  • 服务端:
# -*- coding: utf-8 -*-
# Author: areful
#
# pip install grpcio
# pip install protobuf
# pip install grpcio-tools
# ...
 
# Copyright 2015, Google Inc.
# All rights reserved.
 
"""The Python implementation of the GRPC helloworld.Greeter server."""
 
import time
from concurrent import futures
 
from gj.grpc.helloworld.helloworld_pb2 import *
from gj.grpc.helloworld.helloworld_pb2_grpc import *
 
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
 
 
class Greeter(GreeterServicer):
 
    def SayHello(self, request, context):
        return HelloReply(message='Hello, %s!' % request.name)
 
 
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_GreeterServicer_to_server(Greeter(), server)
 
    with open('server.pem', 'rb') as f:
        private_key = f.read()
    with open('server.crt', 'rb') as f:
        certificate_chain = f.read()
    with open('ca.crt', 'rb') as f:
        root_certificates = f.read()
    server_credentials = grpc.ssl_server_credentials(((private_key, certificate_chain),), root_certificates,True)
    server.add_secure_port('localhost:50051', server_credentials)
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)
 
 
if __name__ == '__main__':
    serve()
  • 客户端:
# -*- coding: utf-8 -*-
# Author: areful
#
# pip install grpcio
# pip install protobuf
# pip install grpcio-tools
#
# Copyright 2015, Google Inc.
# All rights reserved.
# ...
 
"""The Python implementation of the GRPC helloworld.Greeter client."""
 
from __future__ import print_function
 
from gj.grpc.helloworld.helloworld_pb2 import *
from gj.grpc.helloworld.helloworld_pb2_grpc import *
 
 
def run():
    with open('client.pem', 'rb') as f:
        private_key = f.read()
    with open('client.crt', 'rb') as f:
        certificate_chain = f.read()
    with open('ca.crt', 'rb') as f:
        root_certificates = f.read()
    creds = grpc.ssl_channel_credentials(root_certificates, private_key, certificate_chain)
    channel = grpc.secure_channel('localhost:50051', creds)
    stub = GreeterStub(channel)
    response = stub.SayHello(HelloRequest(name='world'))
    print("Greeter client received: " + response.message)
 
 
if __name__ == '__main__':
    run()

6:GRPC 上下文对象相关内容

6.1 抽象基类:

```
class RpcContext(six.with_metaclass(abc.ABCMeta)):
    """Provides RPC-related information and action."""

    @abc.abstractmethod
    def is_active(self):
        """Describes whether the RPC is active or has terminated.

        Returns:
          bool:
          True if RPC is active, False otherwise.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def time_remaining(self):
        """Describes the length of allowed time remaining for the RPC.

        Returns:
          A nonnegative float indicating the length of allowed time in seconds
          remaining for the RPC to complete before it is considered to have
          timed out, or None if no deadline was specified for the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC.

        Idempotent and has no effect if the RPC has already terminated.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def add_callback(self, callback):
        """Registers a callback to be called on RPC termination.

        Args:
          callback: A no-parameter callable to be called on RPC termination.

        Returns:
          True if the callback was added and will be called later; False if
            the callback was not added and will not be called (because the RPC
            already terminated or some other reason).
        """
        raise NotImplementedError()
```

6.2 实现类:

从上面的示例可以看,我们的几乎每个srv的服务的函数里面都有自带有一个上下问的对象,我们这里看看一下它的源码:第一个实现RpcContext的类

class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):

子类:

class _Context(grpc.ServicerContext):

    def __init__(self, rpc_event, state, request_deserializer):
        self._rpc_event = rpc_event
        self._state = state
        self._request_deserializer = request_deserializer

    def is_active(self):
        with self._state.condition:
            return _is_rpc_state_active(self._state)

    def time_remaining(self):
        return max(self._rpc_event.call_details.deadline - time.time(), 0)

    def cancel(self):
        self._rpc_event.call.cancel()

    def add_callback(self, callback):
        with self._state.condition:
            if self._state.callbacks is None:
                return False
            else:
                self._state.callbacks.append(callback)
                return True

    def disable_next_message_compression(self):
        with self._state.condition:
            self._state.disable_next_compression = True

    def invocation_metadata(self):
        return self._rpc_event.invocation_metadata

    def peer(self):
        return _common.decode(self._rpc_event.call.peer())

    def peer_identities(self):
        return cygrpc.peer_identities(self._rpc_event.call)

    def peer_identity_key(self):
        id_key = cygrpc.peer_identity_key(self._rpc_event.call)
        return id_key if id_key is None else _common.decode(id_key)

    def auth_context(self):
        return {
            _common.decode(key): value for key, value in six.iteritems(
                cygrpc.auth_context(self._rpc_event.call))
        }

    def set_compression(self, compression):
        with self._state.condition:
            self._state.compression_algorithm = compression

    def send_initial_metadata(self, initial_metadata):
        with self._state.condition:
            if self._state.client is _CANCELLED:
                _raise_rpc_error(self._state)
            else:
                if self._state.initial_metadata_allowed:
                    operation = _get_initial_metadata_operation(
                        self._state, initial_metadata)
                    self._rpc_event.call.start_server_batch(
                        (operation,), _send_initial_metadata(self._state))
                    self._state.initial_metadata_allowed = False
                    self._state.due.add(_SEND_INITIAL_METADATA_TOKEN)
                else:
                    raise ValueError('Initial metadata no longer allowed!')

    def set_trailing_metadata(self, trailing_metadata):
        with self._state.condition:
            self._state.trailing_metadata = trailing_metadata

    def trailing_metadata(self):
        return self._state.trailing_metadata

    def abort(self, code, details):
        # treat OK like other invalid arguments: fail the RPC
        if code == grpc.StatusCode.OK:
            _LOGGER.error(
                'abort() called with StatusCode.OK; returning UNKNOWN')
            code = grpc.StatusCode.UNKNOWN
            details = ''
        with self._state.condition:
            self._state.code = code
            self._state.details = _common.encode(details)
            self._state.aborted = True
            raise Exception()

    def abort_with_status(self, status):
        self._state.trailing_metadata = status.trailing_metadata
        self.abort(status.code, status.details)

    def set_code(self, code):
        with self._state.condition:
            self._state.code = code

    def code(self):
        return self._state.code

    def set_details(self, details):
        with self._state.condition:
            self._state.details = _common.encode(details)

    def details(self):
        return self._state.details

    def _finalize_state(self):
        pass

6.3 共享上下文和服务端上下文方法

实现类里面大概有一些方法是我们需要去了解的:

  • is_active() :判断客户端是否还存活
  • time_remaining :超时剩余时间,如果为请求设置了超时时间的话,则可以获取
  • cancel 取消当前请求,主动的进行链接的取消,当服务端调用这个函数后,客户端会直接的抛出以下的异常:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
 status = StatusCode.CANCELLED
 details = "Cancelled"
 debug_error_string = "{"created":"@1636954326.072000000","description":"Error received from peer
  • add_callback() :添加一个RPC终止时调用的回调函数(如果链接断开,则不会调用)

  • disable_next_message_compression :禁用下一条响应消息的压缩,此方法将覆盖在服务器创建期间或在调用时设置的任何压缩配置集

  • invocation_metadata:获取当前自定义一些元数据信息,其实就是获取【请求头】信息

  • set_compression :设置当时数据传输的压缩相关的算法

  • send_initial_metadata:发送元数据信息

  • set_trailing_metadata():设置当前传输的自定义的【响应报文头】元数据信息

  • trailing_metadata:元数据的获取

  • abort(self, code, details): 打断连接

  • abort_with_status

  • set_code 设置异常的时候抛出的状态码

  • code 获取抛出异常的状态码

  • set_details 和 details 设置和获取异常信息

6.4 客户端上下文方法

下文有对上下文在一下方法有调用的示例,如获取异常信息!

  • code() :注意是一个方法,不是一个属性,返回服务端的响应码
  • details():返回服务端的响应描述
  • initial_metadata() 获取服务端发送的元数据信息【返回是初值元数据】
  • trailing_metadata() 访问服务器发送的跟踪元数据【返回是尾随元数据】

上述的方法都会将导致阻塞,直到该值可用为止

6.5 异常状态码

分析相关状态码:

@enum.unique
class StatusCode(enum.Enum):
    """Mirrors grpc_status_code in the gRPC Core.

    Attributes:
      OK: Not an error; returned on success
      CANCELLED: The operation was cancelled (typically by the caller).
      UNKNOWN: Unknown error.
      INVALID_ARGUMENT: Client specified an invalid argument.
      DEADLINE_EXCEEDED: Deadline expired before operation could complete.
      NOT_FOUND: Some requested entity (e.g., file or directory) was not found.
      ALREADY_EXISTS: Some entity that we attempted to create (e.g., file or directory)
        already exists.
      PERMISSION_DENIED: The caller does not have permission to execute the specified
        operation.
      UNAUTHENTICATED: The request does not have valid authentication credentials for the
        operation.
      RESOURCE_EXHAUSTED: Some resource has been exhausted, perhaps a per-user quota, or
        perhaps the entire file system is out of space.
      FAILED_PRECONDITION: Operation was rejected because the system is not in a state
        required for the operation's execution.
      ABORTED: The operation was aborted, typically due to a concurrency issue
        like sequencer check failures, transaction aborts, etc.
      UNIMPLEMENTED: Operation is not implemented or not supported/enabled in this service.
      INTERNAL: Internal errors.  Means some invariants expected by underlying
        system has been broken.
      UNAVAILABLE: The service is currently unavailable.
      DATA_LOSS: Unrecoverable data loss or corruption.
    """
    OK = (_cygrpc.StatusCode.ok, 'ok')
    CANCELLED = (_cygrpc.StatusCode.cancelled, 'cancelled')
    UNKNOWN = (_cygrpc.StatusCode.unknown, 'unknown')
    INVALID_ARGUMENT = (_cygrpc.StatusCode.invalid_argument, 'invalid argument')
    DEADLINE_EXCEEDED = (_cygrpc.StatusCode.deadline_exceeded,
                         'deadline exceeded')
    NOT_FOUND = (_cygrpc.StatusCode.not_found, 'not found')
    ALREADY_EXISTS = (_cygrpc.StatusCode.already_exists, 'already exists')
    PERMISSION_DENIED = (_cygrpc.StatusCode.permission_denied,
                         'permission denied')
    RESOURCE_EXHAUSTED = (_cygrpc.StatusCode.resource_exhausted,
                          'resource exhausted')
    FAILED_PRECONDITION = (_cygrpc.StatusCode.failed_precondition,
                           'failed precondition')
    ABORTED = (_cygrpc.StatusCode.aborted, 'aborted')
    OUT_OF_RANGE = (_cygrpc.StatusCode.out_of_range, 'out of range')
    UNIMPLEMENTED = (_cygrpc.StatusCode.unimplemented, 'unimplemented')
    INTERNAL = (_cygrpc.StatusCode.internal, 'internal')
    UNAVAILABLE = (_cygrpc.StatusCode.unavailable, 'unavailable')
    DATA_LOSS = (_cygrpc.StatusCode.data_loss, 'data loss')
    UNAUTHENTICATED = (_cygrpc.StatusCode.unauthenticated, 'unauthenticated')

相关状态码表示的异常描述为:

  • OK 默认都是这个,调用返回成功的时候
  • CANCELLED 表示的是链接已断开的错误状态
  • UNKNOWN 表示未知的错误,当我们的服务端出现了未知的异常错误,类似web500之类的(使用about为正常传参数的时候就会有这错误)
  • INVALID_ARGUMENT 表示对客户端提交的参数校验失败错误
  • DEADLINE_EXCEEDED 表示请求超时的错误
  • NOT_FOUND 表示请求的函数或资源找不到
  • ALREADY_EXISTS 表示请求处理资源已存在,类似数据库的唯一索引的时候那种错误
  • PERMISSION_DENIED 权限错误,无权限访问
  • UNAUTHENTICATED 表示认证失败,无效信息认证
  • RESOURCE_EXHAUSTED 表示请求资源已消耗完毕,无可用资源
  • FAILED_PRECONDITION 表示请求处理被拒绝
  • ABORTED 表示请求被打断终止了请求,操作被中止,通常是由于并发问题,如顺序检查失败、事务中止等。
  • UNIMPLEMENTED 表示暂时不支持此类的请求处理或无法执行请求处理
  • INTERNAL 表示意外异常错误好像和UNKNOWN有点类似
  • UNAVAILABLE 服务无法正常运行,服务不可用
  • DATA_LOSS 表示数据丢失

6.6 异常处理示例

服务端抛异常:

# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 返回是我们的定义的响应体的对象

        # 设置异常状态码
        context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        context.set_details("你没有这个访问的权限")
        raise context

        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

客户端接收异常:

image.png

客户端处理异常:

#!/usr/bin/evn python
# coding=utf-8

import grpc
import hello_pb2
import hello_pb2_grpc


def run():
    # 连接 rpc 服务器
    with grpc.insecure_channel('localhost:50051') as channel:
        # 通过通道服务一个服务
        stub = hello_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        try:
            response = stub.SayHelloAgain(hello_pb2.HelloRequest(name='欢迎下次光临'))
            print("SayHelloAgain函数调用结果的返回: " + response.message)
        except grpc._channel._InactiveRpcError as e:
            print(e.code())
            print(e.details())


if __name__ == '__main__':
    run()

最终输出:

image.png

6.6 initial_metadata和trailing_metadata

  • 初始元数据 initial_metadata
    • 初始元数据 initial_metadata 其实可以理解文的客户端的请求头信息
  • 尾随元数据 trailing_metadata
    • 尾随元数据 trailing_metadata的方法,其实可以理解是响应报文头信息

6.6.1 服务端设置响应报文头信息

通常我们的如果有特殊的需要,需要返回响应的报文头信息的话,可以通过采取类似的方法来实现需求:

如服务端,返回一个响应报文信息:

def set_trailing_metadata(self, trailing_metadata):
    with self._state.condition:
        self._state.trailing_metadata = trailing_metadata

源码分析:传入的参数格式:

image.pngimage.png

我勒个去,传的是元组,Tuple,仔细分析的一下其他意思是:

  • 我需要传一个元组的对象,(MetadataKey,MetadataValue)
  • *args 表示我的我可以接收多个值

于是乎有一下服务端示例:

image.png

但是此时启动客户端请求的时候,客户端就卡死了一直没响应!:查看服务端输出信息为:

 validate_metadata: {"created":"@1636960201.178000000","description":"Illegal header value","fil

大概意思就是说:你的元素校验不通过!!!我把中文改为其他的时候,我擦,!!!!!有可以通过!!

image.png

看来是队我们的中文支持是有问题滴啊!想来好像我们的头文件好像似乎也没设置过中文的吧!!!所以呵呵!怪我自己了!

6.6.2  客户端获取响应报文头信息

参考来自:http://cn.voidcc.com/question/p-srroqmsj-bme.html的解决方案

所以有了以下的处理:

def run():
    # 连接 rpc 服务器
    with grpc.insecure_channel('localhost:50051') as channel:
        # 通过通道服务一个服务
        stub = hello_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        try:
            response,callbask = stub.SayHelloAgain.with_call(hello_pb2.HelloRequest(name='欢迎下次光临'))
            print("SayHelloAgain函数调用结果的返回: " + response.message)
            print("SayHelloAgain函数调用结果的返回---响应报文头信息: " ,callbask.trailing_metadata())
        except grpc._channel._InactiveRpcError as e:
            print(e.code())
            print(e.details())

6.6.3 获取服务端获取客户端请求头信息

服务端:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-

from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc


# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 返回是我们的定义的响应体的对象

        # # 设置异常状态码
        # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        # context.set_details("你没有这个访问的权限")
        # raise context

        # 接收请求头的信息
        print("接收到的请求头元数据信息",context.invocation_metadata())
        # 设置响应报文头信息
        context.set_trailing_metadata((('name','223232'),('sex','23232')))
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))


def serve():
    # 实例化一个rpc服务,使用线程池的方式启动我们的服务
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # 添加我们服务
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 配置启动的端口
    server.add_insecure_port('[::]:50051')
    #  开始启动的服务
    server.start()
    # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
    server.wait_for_termination()


if __name__ == '__main__':
    serve()

客户端提交自定义请求头信息:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-


import grpc
import hello_pb2
import hello_pb2_grpc


def run():
    # 连接 rpc 服务器
    with grpc.insecure_channel('localhost:50051') as channel:
        # 通过通道服务一个服务
        stub = hello_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        try:

            reest_header = (
                ('mesasge', '1010'),
                ('error', 'No Error')
            )

            response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='欢迎下次光临'),
                                                              # 设置请求的超时处理
                                                              timeout=5,
                                                              # 设置请求的头的信息
                                                              metadata=reest_header,
                                                              )
            print("SayHelloAgain函数调用结果的返回: " + response.message)
            print("SayHelloAgain函数调用结果的返回---响应报文头信息: ", callbask.trailing_metadata())
        except grpc._channel._InactiveRpcError as e:
            print(e.code())
            print(e.details())


if __name__ == '__main__':
    run()

输出的结果为:

客户端:

SayHelloAgain函数调用结果的返回: hello 欢迎下次光临
SayHelloAgain函数调用结果的返回---响应报文头信息:  (_Metadatum(key='name', value='223232'), _Metadatum(key='sex', value='23232'))

服务端:

接收到的请求头元数据信息 (_Metadatum(key='mesasge', value='1010'), _Metadatum(key='error', value='No Error'), _Metadatum(key='user-agent', value='grpc-python/1.41.1 grpc-c/19.0.0 (windows; chttp2)'))

6.7 数据传输大小修改和数据解压缩

通常我们的服务一般会设置能接收的数据的上限和能发送数据的上限,所以我们的可以对我们的服务端和客户端都进行相关的传输的数据的大小的限制。

另外对于传输数据过大的情况下,我们的通信也会对数据进行相关解压缩,加快的数据高效传输,。对于服务端来说我们的可以设置全局压缩和局部压缩。

  • 服务端数据压缩和数据限制:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-

from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc


# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 返回是我们的定义的响应体的对象

        # # 设置异常状态码
        # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        # context.set_details("你没有这个访问的权限")
        # raise context

        # 接收请求头的信息
        print("接收到的请求头元数据信息", context.invocation_metadata())
        # 设置响应报文头信息
        context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
        # 三种的压缩机制处理
        # NoCompression = _compression.NoCompression
        # Deflate = _compression.Deflate
        # Gzip = _compression.Gzip
        # 局部的数据进行压缩
        context.set_compression(grpc.Compression.Gzip)
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))


def serve():
    # 实例化一个rpc服务,使用线程池的方式启动我们的服务

    # 服务一些参数信息的配置
    options = [
        ('grpc.max_send_message_length', 60 * 1024 * 1024),  # 限制发送的最大的数据大小
        ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # 限制接收的最大的数据的大小
    ]
    # 三种的压缩机制处理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 配置服务启动全局的数据传输的压缩机制
    compression = grpc.Compression.Gzip
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         options=options,
                         compression=compression)
    # 添加我们服务
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 配置启动的端口
    server.add_insecure_port('[::]:50051')
    #  开始启动的服务
    server.start()
    # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
    server.wait_for_termination()


if __name__ == '__main__':
    serve()
  • 客户端端数据压缩和数据限制:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-


import grpc
import hello_pb2
import hello_pb2_grpc


def run():
    # 连接 rpc 服务器
    options = [
        ('grpc.max_send_message_length', 60 * 1024 * 1024),  # 限制发送的最大的数据大小
        ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # 限制接收的最大的数据的大小
    ]
    # 三种的压缩机制处理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 配置服务启动全局的数据传输的压缩机制
    compression = grpc.Compression.Gzip
    # 配置相关的客户端一些参数信息
    # 配置相关的客户端一些参数信息
    # 配置相关的客户端一些参数信息
    with grpc.insecure_channel(target='localhost:50051',
                               options=options,
                               compression = compression
                               ) as channel:
        # 通过通道服务一个服务
        stub = hello_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        try:

            reest_header = (
                ('mesasge', '1010'),
                ('error', 'No Error')
            )

            response, callbask = stub.SayHelloAgain.with_call(request=hello_pb2.HelloRequest(name='欢迎下次光临'),
                                                              # 设置请求的超时处理
                                                              timeout=5,
                                                              # 设置请求的头的信息
                                                              metadata=reest_header,
                                                              )
            print("SayHelloAgain函数调用结果的返回: " + response.message)
            print("SayHelloAgain函数调用结果的返回---响应报文头信息: ", callbask.trailing_metadata())
        except grpc._channel._InactiveRpcError as e:
            print(e.code())
            print(e.details())


if __name__ == '__main__':
    run()

6.8 客户端重试机制

所谓的重试机制限流机制其实就是客户端请求服务没响应的情况,方式进行重试重连,但是也不是无限循环进行重试,需要有一个度。

以下的一些资料信息参考来自:https://blog.csdn.net/chinesehuazhou2/article/details/107852855

一些配置参数说明:

  • grpc.max_send_message_length 限制发送最大数据量
  • grpc.max_receive_message_length 限制最大接收数据量
  • grpc.enable_retries 透明重试机制,默认值是1开启,可以关闭设置为0
  • grpc.service_config -配置重试机制策略
{
    "retryPolicy":{
        "maxAttempts": 4,
        "initialBackoff": "0.1s",
        "maxBackoff": "1s",
        "backoffMutiplier": 2,
        "retryableStatusCodes": [
            "UNAVAILABLE" ] 
    }
}

PS:retryableStatusCodes 配置重试的错误码情况,上面的情况是当UNAVAILABLE的情况下才会触发重试,

可以指定重试次数等等,具体参数含义可参考官网,简单介绍一下:

-   maxAttempts 必须是大于 1 的整数,对于大于5的值会被视为5
-   initialBackoff 和 maxBackoff 必须指定,并且必须具有大于0
-   backoffMultiplier 必须指定,并且大于零
-   retryableStatusCodes 必须制定为状态码的数据,不能为空,并且没有状态码必须是有效的 gPRC 状态码,可以是整数形式,并且不区分大小写

6.9 客户端对冲重试策略

对冲是指

  • 如果一个方法使用对冲策略,那么首先会像正常的 RPC 调用一样发送第一次请求,如果配置时间内没有响应情况下会,那么直接发送第二次请求,以此类推,直到发送了 maxAttempts 次

  • 多次重试情况下,需要留意是后端负载均衡情况下的幂等性问题

6.10 客户端重试限流策略

  • 当客户端的失败和成功比超过某个阈值时,gRPC 会通过禁用这些重试策略来防止由于重试导致服务器过载
  • 实际限流参数配置,需根据服务器性能资源来制定

限流说明:

  • 每一个服务器,gRPC 客户端会维护一个 token_count 变量,最初设置为 maxToken , 值的范围是 0 - maxToken

  • 对于每个 RPC 请求都会对 token_count 产生一下效果

    • 每个失败的 RPC 请求都会递减token_count 1
    • 成功 RPC 将会递增 token_count和tokenRatio 如果 token_count <= ( maxTokens / 2), 则关闭重试策略,直到 token_count > (maxTokens/2),恢复重试

配置方法在servie config中配置一下信息:

"retryThrottling":{ "maxTokens": 10, "tokenRatio": 0.1 }

7:利用信号进行grpc 服务进程结束监听

通常我们使用grpc的时候做微服务的srv的时候,都需要一个机制来监听我们的服务进程的情况,用户服务的发现和注册已经注销。

如果服务不在注册中心,进行注销的话,就会引发请求到错误的后端。

这里其实我们主要是理由信号机制来对我们的服务进行监听。

PS:window下支持信号有限,对KeyboardInterrupt也无法捕获,直接从进程管理器结束进程也无法知晓

完整示例:

```
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
import sys
from concurrent import futures
import time
import grpc
import hello_pb2
import hello_pb2_grpc
import signal

# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        # 返回是我们的定义的响应体的对象
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

    def SayHelloAgain(self, request, context):
        # 返回是我们的定义的响应体的对象

        # # 设置异常状态码
        # context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        # context.set_details("你没有这个访问的权限")
        # raise context

        # 接收请求头的信息
        print("接收到的请求头元数据信息", context.invocation_metadata())
        # 设置响应报文头信息
        context.set_trailing_metadata((('name', '223232'), ('sex', '23232')))
        # 三种的压缩机制处理
        # NoCompression = _compression.NoCompression
        # Deflate = _compression.Deflate
        # Gzip = _compression.Gzip
        # 局部的数据进行压缩
        context.set_compression(grpc.Compression.Gzip)
        return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))




def serve():
    # 实例化一个rpc服务,使用线程池的方式启动我们的服务

    # 服务一些参数信息的配置
    options = [
        ('grpc.max_send_message_length', 60 * 1024 * 1024),  # 限制发送的最大的数据大小
        ('grpc.max_receive_message_length', 60 * 1024 * 1024),  # 限制接收的最大的数据的大小
    ]
    # 三种的压缩机制处理
    # NoCompression = _compression.NoCompression
    # Deflate = _compression.Deflate
    # Gzip = _compression.Gzip
    # 配置服务启动全局的数据传输的压缩机制
    compression = grpc.Compression.Gzip
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         options=options,
                         compression=compression)
    # 添加我们服务
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 配置启动的端口
    server.add_insecure_port('[::]:50051')
    #  开始启动的服务
    server.start()

    def stop_serve(signum, frame):
        print("进程结束了!!!!")
        # sys.exit(0)
        raise KeyboardInterrupt

    # 注销相关的信号
    # SIGINT 对应windos下的 ctrl+c的命令
    # SIGTERM 对应的linux下的kill命令
    signal.signal(signal.SIGINT, stop_serve)
    # signal.signal(signal.SIGTERM, stop_serve)

    # wait_for_termination --主要是为了目标启动后主进程直接的结束!需要一个循环的方式进行进行进程运行
    server.wait_for_termination()


if __name__ == '__main__':
    serve()
```

8:使用协程的方式进行服务启动

8.1 安装依赖包

上面的示例中都是基于线程池的方式来处理并发,以下是使用协程的方式进行处理示例。

首先安装新的依赖包:

相关的版本要对应的上:

grpcio-reflection==1.41.1
pip install grpcio-reflection -i https://pypi.tuna.tsinghua.edu.cn/simple

最终安装后的:

image.png

8.2 修改服务端启动

修改我们的服务端代码(修改的的是3.3小节的代码):

#!/usr/bin/evn python
# -*- coding: utf-8 -*-

import grpc
import hello_pb2
import hello_pb2_grpc
from grpc_reflection.v1alpha import reflection
import asyncio

# 实现 proto文件中定义的 GreeterServicer的接口
class Greeter(hello_pb2_grpc.GreeterServicer):
  # 实现 proto 文件中定义的 rpc 调用
  async def SayHello(self, request, context):
      # 返回是我们的定义的响应体的对象
      return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))

  async def SayHelloAgain(self, request, context):
      # 返回是我们的定义的响应体的对象
      return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))


async def serve():
  # 实例化一个rpc服务,使用线程池的方式启动我们的服务
  service_names = (
      hello_pb2.DESCRIPTOR.services_by_name["Greeter"].full_name,
      reflection.SERVICE_NAME,
  )

  server = grpc.aio.server()
  # 添加我们服务
  hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
  reflection.enable_server_reflection(service_names, server)
  # 配置启动的端口
  server.add_insecure_port('[::]:50051')
  await server.start()
  await server.wait_for_termination()



if __name__ == '__main__':
    asyncio.run(serve())

8.3 启动客户端调用

我们的客户端保持原来的3.3小节的客户端不变:

#!/usr/bin/evn python
# coding=utf-8

import grpc
import hello_pb2
import hello_pb2_grpc


def run():
    # 连接 rpc 服务器
    with grpc.insecure_channel('localhost:50051') as channel:
        # 通过通道服务一个服务
        stub = hello_pb2_grpc.GreeterStub(channel)
        # 生成请求我们的服务的函数的时候,需要传递的参数体,它放在hello_pb2里面-请求体为:hello_pb2.HelloRequest对象
        response = stub.SayHello(hello_pb2.HelloRequest(name='小钟同学'))
        print("SayHello函数调用结果返回:: " + response.message)
        response = stub.SayHelloAgain(hello_pb2.HelloRequest(name='欢迎下次光临'))
        print("SayHelloAgain函数调用结果的返回: " + response.message)


if __name__ == '__main__':
    run()

直接启动也可以正常进行和服务端的通信。

转发自https://www.modb.pro/db/185486

标签:name,pb2,GRPC,Fastapi,self,grpc,入门篇,hello,客户端
From: https://www.cnblogs.com/a00ium/p/16931128.html

相关文章

  • FastAPI使用typing类型提示
    typing是Python标准库,用来做类型提示。FastAPI使用typing做了:编辑器支持;类型检查;定义类型,requestpathparameters,queryparameters,headers,bodies,dependencies等等;......
  • 微服务、gGRPC、protobuf、rest和json
    微服务、gGRPC、protobuf、rest和json到目前为止,基于REST的API已经成为大多数服务间通信的首选架构。虽然基于REST/JSON的通信有几个好处,并且得到跨语言和提供......
  • python grpc
    pythonrpc进入examples/python/route_guide,运行:pipinstallgrpcio-toolspython-mgrpc_tools.protoc-I../../protos--python_out=.--pyi_out=.--grpc_python_out......
  • php 安装 grpc 扩展
    升级gcc默认gcc的版本为4.8.5,grpc扩展需要支持c++14查询对照的话gcc6.1就可以完全至此c++14(转载至)如图:踩过的坑,重新安装升级了gcc6.5.0的版本,安装grpc还是提示如下:......
  • fastapi框架
    1/kaxiluo/fastapi-frame/2|--app3||--commands-----放置一些命令行4||`--__init__.py5||--exceptions......
  • fastapi异常处理
    DEMOitems={"test":"这是测试"}@app.get("/item/{item_id}")defread_item(item_id:str):ifitem_idnotinitems:raiseHTTPException(status_code=40......
  • fastapi学习
    Get请求fromfastapiimportFastAPIapp=FastAPI()@app.get("/items/{item_id}")asyncdefread_root(item_id:int):return{"item_id":item_id}#在最......
  • fastapi学习之路
    一、python3写一个http接口服务,给别人调用3这次选择fastapi,FastAPI是一个现代的、快速(高性能)的web框架,用于基于标准Python类型提示使用Python3.6+构建api。具有快速......
  • FastAPI项目的Nginx配置
    前景:已经使用supervisor把FastAPI开发的后端服务挂载到端口上Nginx的配置如下:upstreamhuiyuan_api{server127.0.0.1:9120;}server{server_namehui.wak......
  • golang grpc使用示例
    疑问写前面grpc有内部对心跳的处理吗,还是说,双工需要自己作心跳管理,有懂的留言一下。SEO优化grpc如何双工通信?grpc如何从服务端推送消息给客户端?gprc环境如何搭建?grpc......