1. gRPC 简介
gRPC 是一种高性能、开源和通用的 RPC 框架,支持多种编程语言。在 gRPC 中,有四种类型的 RPC,分别是 Unary RPC、Server Streaming RPC、Client Streaming RPC 和 Bidirectional Streaming RPC。
-
Unary RPC:一元 RPC
一元 RPC 是最简单的 RPC 类型,它是一种单向的请求-响应模式。客户端向服务端发送一个请求,并等待服务端响应。 -
Server Streaming RPC:服务器流式 RPC
服务器流式 RPC 是一种服务端主动向客户端发送流式数据的 RPC 类型。在这种 RPC 类型中,客户端向服务端发送一条请求消息,并等待服务端的响应。与 Unary RPC 不同的是,服务端在响应客户端的请求消息后,会向客户端发送一系列的响应消息,客户端在接收到消息后可以进行相应的处理。 -
Client Streaming RPC:客户端流式 RPC
客户端流式 RPC 是一种客户端主动向服务端发送流式数据的 RPC 类型。在这种 RPC 类型中,客户端可以向服务端发送多条请求消息,服务端在接收到请求消息后进行相应的处理,并最终向客户端发送一条响应消息。 -
Bidirectional Streaming RPC:双向流式 RPC
双向流式 RPC 是一种双向流式数据传输的 RPC 类型。在这种 RPC 类型中,客户端和服务端都可以主动向对方发送数据,并可以同时接收对方发送的数据。
若要实现服务端的消息推送,应该使用 Server Streaming RPC。
2. 代码示例
(1) 编写 proto 文件
syntax = "proto3";
package com.example.grpcdemo;
option java_multiple_files = true;
// 定义传输的消息格式
message Message {
string content = 1;
}
// 定义服务接口
service MessageService {
// 定义服务方法,客户端向服务端发送一个请求,并返回一个流式响应
rpc streamMessages (Message) returns (stream Message);
}
(2) 实现服务端
public class MessageServiceImpl extends MessageServiceGrpc.MessageServiceImplBase {
@Override
public void streamMessages(Message request, final StreamObserver<Message> responseObserver) {
System.out.println("Received message: " + request.getContent());
// 启动一个定时任务,每秒钟向客户端发送一条消息
Timer timer = new Timer();
timer.schedule(new TimerTask() {
int count = 0;
@Override
public void run() {
String message = "Message " + count++;
System.out.println("Sending message: " + message);
// 构造消息对象并发送给客户端
Message response = Message.newBuilder().setContent(message).build();
responseObserver.onNext(response);
}
}, 0, 1000);
}
public class MessageServer {
public static void main(String[] args) throws IOException, InterruptedException {
int port = 8080;
// 创建 gRPC 服务器
Server server = ServerBuilder.forPort(port)
.addService(new MessageServiceImpl())
.build();
// 启动 gRPC 服务器
server.start();
// 阻塞线程以等待关闭信号
System.out.println("Server started at port " + port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Server stopped.");
server.shutdown();
}));
server.awaitTermination();
}
}
(3) 实现客户端
public class MessageClient {
public static void main(String[] args) throws InterruptedException {
// 创建 gRPC 通道和存根
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.build();
MessageServiceGrpc.MessageServiceStub stub = MessageServiceGrpc.newStub(channel);
// 创建请求对象
Message request = Message.newBuilder().setContent("Hello, server!").build();
// 调用服务方法,并接收来自服务端推送的消息
stub.streamMessages(request, new StreamObserver<Message>() {
@Override
public void onNext(Message response) {
System.out.println("Received message: " + response.getContent());
}
@Override
public void one rror(Throwable t) {
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Server closed the stream.");
}
});
// 阻塞线程以等待来自服务端推送的消息
Thread.sleep(10000);
// 关闭gRPC通道
channel.shutdown();
}
}
服务端输出如下:
Server started at port 8080
Received message: Hello, server!
Sending message: Message 0
Sending message: Message 1
Sending message: Message 2
Sending message: Message 3
Sending message: Message 4
Sending message: Message 5
客户端输出如下:
Received message: Message 0
Received message: Message 1
Received message: Message 2
Received message: Message 3
Received message: Message 4
Received message: Message 5
标签:gRPC,public,RPC,message,推送,Message,服务端,客户端
From: https://www.cnblogs.com/dalelee/p/17393615.html