官方文档:https://grpc.io/docs/what-is-grpc/introduction/
一 gRPC 允许你定义四种服务方法:
-
一元 RPC,其中客户端向服务器发送单个请求并得到单个响应,就像普通函数调用一样。
rpc SayHello(HelloRequest) returns (HelloResponse);
-
服务器流式 RPC 中,客户端向服务器发送请求并获取流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息。gRPC 保证单个RPC 调用中的消息排序。
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
-
客户端流式 RPC,客户端使用提供的流编写一系列消息并将其发送到服务器。客户端完成编写消息后,它会等待服务器读取消息并返回响应。同样,gRPC 保证了单个 RPC 调用中的消息排序。
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
-
双向流式RPC,双方使用读写流发送一系列消息。两个流独立运行,因此客户端和服务器可以按任意顺序进行读写:例如,服务器可以等待接收所有客户端消息后再写入响应,也可以交替读取消息然后写入消息,或者采用其他读写组合。每个流中的消息顺序都会保留。
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
二 代码样例
1 一元 RPC
其中客户端向服务器发送单个请求并得到单个响应,就像普通函数调用一样
1.1 Greeter.proto文件
syntax = "proto3";
option java_package = "org.apache.demo.v1";
option java_multiple_files = true;
service Greeter {
rpc SayHello(HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
1.2 client端
public class GreeterClient {
public static void main(String[] args) {
// 创建一个通道连接到服务端
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext() // 关闭TLS(只在开发时使用)
.build();
// 创建存根
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
// 创建请求
HelloRequest request = HelloRequest.newBuilder()
.setName("World")
.build();
// 调用服务
try {
HelloReply response = stub.sayHello(request);
System.out.println("Response: " + response.getMessage());
} catch (StatusRuntimeException e) {
System.err.println("RPC failed: " + e.getStatus());
} finally {
// 关闭通道
channel.shutdown();
}
}
}
1.3 server端
public class GreeterServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(50051)
.addService(new GreeterImpl())
.build()
.start();
System.out.println("Server started on port 50051");
server.awaitTermination();
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
String message = "Hello, " + request.getName();
HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
1.4 结果
2 服务器流式 RPC
2.1 StreamingGreeter.proto文件
syntax = "proto3";
option java_package = "org.apache.demo.v1";
option java_multiple_files = true;
import "Greeter.proto";
service StreamingGreeter {
rpc StreamHello(HelloRequest) returns (stream HelloReply);
}
2.2 client端
public class StreamingGreeterClient {
public static void main(String[] args) {
// 创建一个通道连接到服务端
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext() // 关闭TLS(只在开发时使用)
.build();
// 创建存根
StreamingGreeterGrpc.StreamingGreeterBlockingStub stub = StreamingGreeterGrpc.newBlockingStub(channel);
// 创建请求
HelloRequest request = HelloRequest.newBuilder()
.setName("World")
.build();
// 调用StreamHello服务
try {
// 服务器会返回多个消息
stub.streamHello(request).forEachRemaining(response -> {
long l = System.currentTimeMillis();
System.out.println(l+":==:Received: " + response.getMessage());
});
} catch (StatusRuntimeException e) {
System.err.println("RPC failed: " + e.getStatus());
} finally {
channel.shutdown();
}
}
}
2.3 server端
/**
* 服务端流式
*/
public class StreamingGreeterServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(50051)
.addService(new StreamingGreeterImpl())
.build()
.start();
System.out.println("Server started on port 50051");
server.awaitTermination();
}
static class StreamingGreeterImpl extends StreamingGreeterGrpc.StreamingGreeterImplBase {
@Override
public void streamHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
for (int i = 0; i < 5; i++) {
HelloReply reply = HelloReply.newBuilder()
.setMessage("Hello, " + request.getName() + " #" + i)
.build();
responseObserver.onNext(reply);
}
responseObserver.onCompleted();
}
}
}
2.4 结果
3 客户端流式 RPC
3.1 ClientStreamingGreeter.proto文件
syntax = "proto3";
option java_package = "org.apache.demo.v1";
option java_multiple_files = true;
import "Greeter.proto";
service ClientStreamingGreeter {
rpc StreamHello(stream HelloRequest) returns (HelloReply);
}
3.2 client端
public class ClientStreamingGreeterClient {
public static void main(String[] args) throws InterruptedException {
// 创建一个通道连接到服务端
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50052)
.usePlaintext() // 关闭TLS(只在开发时使用)
.build();
final CountDownLatch latch = new CountDownLatch(1);
// 创建存根 异步
ClientStreamingGreeterGrpc.ClientStreamingGreeterStub asyncStub = ClientStreamingGreeterGrpc.newStub(channel);
// 创建请求
StreamObserver<HelloRequest> requestObserver = asyncStub.streamHello(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply value) {
System.out.println("Received: " + value.getMessage());
}
@Override
public void one rror(Throwable t) {
System.err.println("RPC failed: " + t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Server completed the RPC.");
latch.countDown(); // 完成时释放Latch
}
});
// 客户端发送多个请求
try {
requestObserver.onNext(HelloRequest.newBuilder().setName("John").build());
requestObserver.onNext(HelloRequest.newBuilder().setName("Jane").build());
requestObserver.onNext(HelloRequest.newBuilder().setName("Doe").build());
// 结束请求
requestObserver.onCompleted();
} catch (Exception e) {
System.err.println("Error sending requests: " + e.getMessage());
requestObserver.onError(e);
} finally {
latch.await();
channel.shutdown();
}
}
}
3.2 server端
public class ClientStreamingGreeterServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(50052)
.addService(new ClientStreamingGreeterImpl())
.build()
.start();
System.out.println("Server started on port 50051");
server.awaitTermination();
}
static class ClientStreamingGreeterImpl extends ClientStreamingGreeterGrpc.ClientStreamingGreeterImplBase {
@Override
public StreamObserver<HelloRequest> streamHello(StreamObserver<HelloReply> responseObserver) {
return new StreamObserver<HelloRequest>() {
int count = 0;
@Override
public void onNext(HelloRequest value) {
System.out.println(value.getName());
count++;
}
@Override
public void one rror(Throwable t) {
}
@Override
public void onCompleted() {
HelloReply reply = HelloReply.newBuilder()
.setMessage("Received " + count + " messages")
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
System.out.println(reply.getMessage());
}
};
}
}
}
3.4 结果
4 双向流式RPC
4.1 BidirectionalStreamingGreeter.proto文件
syntax = "proto3";
option java_package = "org.apache.demo.v1";
option java_multiple_files = true;
import "Greeter.proto";
service BidirectionalStreamingGreeter {
rpc StreamHello(stream HelloRequest) returns (stream HelloReply);
}
4.1 client端
public class BidirectionalStreamingGreeterClient {
public static void main(String[] args) throws InterruptedException {
// 创建一个通道连接到服务端
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext() // 关闭TLS(只在开发时使用)
.build();
final CountDownLatch latch = new CountDownLatch(1);
// 创建存根 异步
BidirectionalStreamingGreeterGrpc.BidirectionalStreamingGreeterStub stub = BidirectionalStreamingGreeterGrpc.newStub(channel);
// 创建请求
StreamObserver<HelloRequest> requestObserver = stub.streamHello(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply helloReply) {
System.out.println("Received from server: " + helloReply.getMessage());
}
@Override
public void one rror(Throwable throwable) {
}
@Override
public void onCompleted() {
latch.countDown();
System.out.println("Server completed the RPC.");
}
});
// 客户端发送多个请求
try {
requestObserver.onNext(HelloRequest.newBuilder().setName("Alice").build());
requestObserver.onNext(HelloRequest.newBuilder().setName("Bob").build());
requestObserver.onNext(HelloRequest.newBuilder().setName("Charlie").build());
// 结束请求
requestObserver.onCompleted();
} catch (Exception e) {
System.err.println("Error sending requests: " + e.getMessage());
requestObserver.onError(e);
} finally {
latch.await();
channel.shutdown();
}
}
}
4.1 server端
public class BidirectionalStreamingGreeterServer {
public static void main(String[] args) throws InterruptedException, IOException {
Server server = ServerBuilder.forPort(50051)
.addService(new BidirectionalStreamingGreeterImpl())
.build()
.start();
System.out.println("Server started on port 50051");
server.awaitTermination();
}
static class BidirectionalStreamingGreeterImpl extends BidirectionalStreamingGreeterGrpc.BidirectionalStreamingGreeterImplBase{
@Override
public StreamObserver<HelloRequest> streamHello(StreamObserver<HelloReply> responseObserver) {
return new StreamObserver<HelloRequest>() {
@Override
public void onNext(HelloRequest helloRequest) {
HelloReply reply = HelloReply.newBuilder()
.setMessage("Hello, " + helloRequest.getName())
.build();
responseObserver.onNext(reply);
}
@Override
public void one rror(Throwable throwable) {
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}