说明
gRPC是基于HTTP/2协议传输,使用Protocol Buffers作为接口描述语言,并提供认证(authentication)、双向流(bidirectional streaming)和流量控制、阻塞或非阻塞绑定以及取消和超时(Deadlines)等功能的跨平台开源
的高性能RPC框架。
使用场景
- 低延迟、高度可扩展的分布式系统。
- 开发与云服务器通信的移动客户端。
- 设计一种新的协议,需要具备精确、高效且与语言无关的特性。
- 分层设计以支持扩展,例如认证、负载均衡、日志记录和监控等
四种通信模式
单一请求-响应
最简单的方式,客户端发送一个请求,服务端响应请求
服务端流式
客户端发送一个请求,服务端返回消息流,服务端发送完所有消息后,服务端的状态详细信息(状态码及可选的状态消息)和可选的元数据将发送给客户端。
客户端流式
客户端发送消息流,服务端通常但不一定在收到所有客户端消息后,以一条消息进行(及其状态详细信息及可选的元数据)响应
双边流式
由客户端发起调用,服务端接收客户端元数据、方法和截止时间,服务器可以选择在客户端开始发送消息之前就返回初始元数据,也可以等待客户端开始流式传输消息后再返回初始元数据。
客户端和服务器端的流处理是特定于应用程序的。由于这两个流是独立的,客户端和服务器可以按任何顺序读取和写入消息。例如,服务器可以等到收到客户端的所有消息后再发送自己的消息,或者服务器和客户端可以像“乒乓球”一样进行通信——服务器接收到一个请求后发送回一个响应,然后客户端根据该响应发送另一个请求,如此往复。
什么是gRPC Channels
gRPC Channels提供了与指定主机和端口上的 gRPC 服务器的连接。它在创建客户端存根时使用。客户端可以指定通道参数来修改 gRPC 的默认行为,例如开启或关闭消息压缩。通道具有状态,包括连接状态和空闲状态。对于简单的客户端和服务端架构,通常一个 Channel 对应一个 TCP 连接,并通过该连接处理所有请求。在复杂的场景下,一个 Channel 可能会管理多个 TCP 连接。特别是在客户端需要连接多个服务端实例或使用负载均衡时,Channel 会创建和维护多个 TCP 连接,以便在不同的连接间分发请求。
特性
Authentication(认证)
gRPC支持SSL/TLS、ALTS、Token-based authentication with Google(gRPC 提供了一个通用机制,可以将包含认证信息的元数据附加到请求和响应中。这些认证信息通常以 token 的形式存在,如 OAuth2 token)等认证机制。
此外gRPC提供了凭据插件API,允许自定义扩展认证方式。
Benchmarking(基准测试)
Cancellation(取消操作)
gRPC支持取消调用,当客户端因为某种原因需要取消操作时,可以发送一个取消信号。服务端接收到信号后,应该停止任何计算并结束流的一侧。理想情况下,客户端的取消操作应该传播到整个调用链中,使每一端都能及时取消操作。截止日期到期或I/O错误,也会触发Cancellation
Compression(压缩)
gRPC允许非对称压缩通信,请求和响应的压缩方式可以不同,或者根本不压缩。如果客户端使用了服务端不支持的压缩算法,服务端将会返回UNIMPLEMENTED错误状态
Custom Backend Metrics
gRPC 自定义后端指标(Custom Backend Metrics)是 gRPC 流量控制和负载均衡策略中的一个概念。这些指标允许 gRPC 客户端或服务网格(如 Istio)根据后端服务的实时性能数据来做出更加智能的路由决策。可以与负载均衡算法结合使用(比如根据:CPU 使用率、内存使用率、请求队列长度、请求处理时间),以实现更复杂的负载均衡策略
Custom Load Balancing Policies
允许开发者自定义负载均衡策略。gRPC负载均衡策略通过名称解析器获得服务器IP地址列表。该策略负责维护与这些服务器的连接(子通道),并在发送RPC时选择一个连接来使用。
Custom Name Resolution
CustomName Resolution(自定义名称解析)允许开发者根据实际需求和服务场景,实现自定义的服务名称到服务实例地址的映射机制
Deadlines(截止时间/超时时间)
Deadlines 是客户端等待服务端响应的最大截止时间。如果在指定时间内服务器没有返回响应,客户端将终止请求,并返回一个 DEADLINE_EXCEEDED 错误。这种机制帮助防止客户端无限期地等待一个可能永远不会完成的请求,从而提高系统的可靠性和响应性。默认gRPC没有设置deadline,为了避免无限等待,需要明确为客户端设置一个等待时间。
在Java和Go的gRPC实现中,deadline propagation默认开启,意味着它将传播给整个调用链。对于其它(如:C++)需要显示开启是否传播
Debugging
gRPC提供了调试工具grpcdebug,grpcdebug是gRPC生态系统中的一个命令行工具,提供了监控、调试、健康检查、以及配置状态观察等功能。
Error handling
gRPC 提供了一套标准的错误代码和机制,帮助客户端和服务器明确地传达和处理错误情况。
const (
// OK is returned on success.
OK Code = 0
Canceled Code = 1
Unknown Code = 2
InvalidArgument Code = 3
.......
)
Flow Control
gRPC支持流量控制,主要用于流式RPCs,与一元RPCs无关。gRPC利用底层传输来检测何时可以安全地发送更多数据。当接收方读取数据时,会向发送方返回确认,让其知道接收方有更多的容量。
不同语言gRP针对Flow Control实现有所区别。由于gRPC-Go使用阻塞式API进行流操作,因此流控制推回是通过达到流的流量控制时简单地阻止流上的发送操作来实现的。当接收方从流中读取了足够的数据时,发送操作将自动解除阻塞。 而gRPC-java是基于Netty框架实现,Netty是一个异步基于事件驱动的网络应用框架。流控制是通过异步回调和事件机制来管理的。
Health Checking
gRPC提供了标准API,用于对gRPC服务器进行健康检查。gRPC提供了两种检查模式,根据实际的健康检查需要,可以使用不同的模式
service Health {
//简单的请求-响应模式(Unary RPC)
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
//流模式,当服务端状态发生变化时,会立马发送一条消息
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
Interceptors
gRPC支持拦截器功能,这些拦截器可以在客户端和服务端实现,用于处理诸如认证、日志记录、限流、重试、监控等各种任务。
Keepalive
gRPC Keepalive是基于 HTTP/2 的 Ping 帧机制用于客户端与服务端之间保活。
Metadata
gRPC Metadata 是 gRPC 框架中用于在客户端和服务端之间传递附加信息的机制。它类似于 HTTP 的请求头和响应头,可以携带各种元数据信息,如认证令牌、请求 ID、用户信息等。这些元数据可以在调用之前或调用期间进行传递。
OpenTelemetry Metrics
gRPC 与 OpenTelemetry 的集成可以用来收集和报告 gRPC 服务的各种指标(Metrics),帮助监控和优化分布式系统的性能
Performance Best Practices(性能最佳实践)
- 尽可能的复用stubs和channels
- 使用keepalive pings来保持http/2连接,以便快速初始rpc,而不会造成延迟
- 当处理长期逻辑数据流时,使用流式RPCs,流可以避免频繁rpc初始化,包括在客户端连接负载均衡、在传输层启动新的http/2请求、在服务端调用用户定义的方法处理程序。
- 为应用程序都每个高负载区域创建单独的通道
- 使用通道池,将RPC调用分配到多个连接上
- (Java)使用非阻塞stubs来并行化RPCs
- (Java)提供一个自定义的执行器(executor),根据你的工作负载来限制线程的数量(cached (default), fixed, forkjoin, etc)
Reflection
gRPC Reflection 允许客户端在运行时动态地查询服务器的服务元数据(服务列表、方法列表、消息类型等)。Reflection可以用于开发和调试,使用工具如 grpcurl 或 grpc_cli,你可以在没有 .proto 文件的情况下与 gRPC 服务交互。API 发现:客户端可以动态发现并调用未知的 gRPC 服务。注意需要在服务端启用gRPC Reflection才能使用Reflection。
Request Hedging
在 gRPC 中,Hedging 是两种可配置重试策略之一。使用 Hedging 策略时,gRPC 客户端会将相同的请求发送到多个后端实例,并使用最快响应的结果。客户端随后会取消所有未完成的请求,并将第一个收到的响应转发给应用程序
Retry
支持重试功能
Service Config
服务配置指定gRPC客户端在与gRPC服务器交互时应如何表现。服务所有者可以提供包含所有服务客户端的预期行为的服务配置。服务配置可以通过名称解析(name resolution)或由客户端应用程序以编程的方式提供给客户端
Status Codes
所有RPCs都会返回一个状态给客户端。其中以下几个状态只会由用户代码生成,而不是gRPC库
- INVALID_ARGUMENT
- NOT_FOUND
- ALREADY_EXISTS
- FAILED_PRECONDITION
- ABORTED
- OUT_OF_RANGE
- DATA_LOSS
Wait-for-Ready
wait-for-ready 选项允许客户端在服务端未准备好(例如,暂时不可用或网络连接尚未建立)时等待,而不是立即返回错误。这个选项对于在动态或不稳定的网络环境中确保请求成功传递非常有用。默认情况下,wait-for-ready 是禁用的
代码例子
Go语言生成服务端例子
首选需要定义一个.proto文件
syntax = "proto3";
option go_package = "/helloworld";
option java_multiple_files = true;
option java_package = "com.demo.grpc.helloworld";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
rpc SayHelloStream(HelloRequest) returns (stream HelloReply) {}
rpc SayHelloClientStream(stream HelloRequest) returns (HelloReply) {}
rpc SayHelloBothStream(stream HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
需要安装两个go编译插件,
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
安装完成后,执行命令,根据实际需要调整参数,生成对应的go代码
protoc --go_out=. --go-grpc_out=. helloworld.proto
代码生成后,实现proto定义的相关接口,部分代码如下:
// SayHello 单一请求/响应 SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
if deadline, ok := ctx.Deadline(); ok {
log.Printf("Deadline received: %v", deadline)
// 计算剩余时间
remainingTime := time.Until(deadline)
log.Printf("Time remaining: %v", remainingTime)
}
log.Printf("Received: %v", in.GetName())
time.Sleep(5 * time.Second)
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
// SayHelloStream 服务端返回数据流
func (s *server) SayHelloStream(request *pb.HelloRequest, stream pb.Greeter_SayHelloStreamServer) error {
log.Printf("Received: %v", request.Name)
err := stream.Send(&pb.HelloReply{Message: "Hello stream 1"})
if err != nil {
return err
}
err = stream.Send(&pb.HelloReply{Message: "Hello stream 2"})
if err != nil {
return err
}
return nil
}
// SayHelloClientStream 客户端发送数据流
func (s *server) SayHelloClientStream(stream pb.Greeter_SayHelloClientStreamServer) error {
for {
helloRequest, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.HelloReply{Message: "Hello client stream "})
}
if err != nil {
return err
}
log.Printf("received: %v", helloRequest.Name)
}
}
// SayHelloBothStream 双边流
func (s *server) SayHelloBothStream(stream pb.Greeter_SayHelloBothStreamServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("received stream: %v", in.Name)
stream.Send(&pb.HelloReply{Message: "Hello stream from grpcServer"})
}
}
Java语言生成客户端
对于Java,可以使用gRPC java的protobuf插件生成
$ protoc --plugin=protoc-gen-grpc-java \
--grpc-java_out="$OUTPUT_FILE" --proto_path="$DIR_OF_PROTO_FILE" "$PROTO_FILE"
也可以通过maven添加生成插件
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.25.3:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.66.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
还要添加grpc相关的依赖
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.66.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.66.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.66.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
实现对应的客户端调用代码,部分调用代码:
public void greet(String name) {
logger.info("Will try to greet " + name + " ...");
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = blockingStub.withDeadlineAfter(3, TimeUnit.SECONDS).sayHello(request);
// response = blockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Greeting: " + response.getMessage());
}
/**
* 双边流
*/
public void greetBothStream(String name) throws InterruptedException {
StreamObserver<HelloRequest> requestObserver = asyncStub.sayHelloBothStream(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply helloReply) {
logger.info("get message:"+helloReply.getMessage());
}
@Override
public void one rror(Throwable throwable) {
logger.info("error......");
}
@Override
public void onCompleted() {
logger.info("commpleted......");
}
});
for (int i = 0;i<10;i++) {
HelloRequest request = HelloRequest.newBuilder().setName(name+i).build();
requestObserver.onNext(request);
Thread.sleep(1000L);
}
}
最终实现java grpc客户端调用go gRPC服务端。
参考
https://github.com/grpc/grpc-java/blob/master/README.md
标签:err,stream,gRPC,grpc,Go,使用指南,服务端,客户端 From: https://blog.csdn.net/zhangj1125/article/details/141331853