protobuff定义:
syntax = "proto3";
package com.liyao;
option java_package = "com.liyao.protobuf.test.service";
option java_outer_classname = "MyServiceProto";
option java_multiple_files = true;
message MyRequest {
repeated uint32 keys = 1;
}
message MyResponse {
string value = 1;
}
service MyService {
rpc GetByKey (MyRequest) returns (MyResponse);
rpc GetByKeyServerStream (MyRequest) returns (stream MyResponse);
rpc GetByKeyClientStream (stream MyRequest) returns (MyResponse);
rpc GetByKeyBiStream (stream MyRequest) returns (stream MyResponse);
}
服务比较简单,请求包含一个int的list,返回对应的key。
服务端实现类:
public class MyRpcServiceImpl extends MyServiceGrpc.MyServiceImplBase {
private final Map<Integer, String> map = ImmutableMap.<Integer, String>builder()
.put(1, "v1")
.put(2, "v2")
.put(3, "v3")
.put(4, "v4")
.put(5, "v5")
.build();
@Override
public void getByKey(MyRequest request, StreamObserver<MyResponse> responseObserver) {
int key = request.getKeys(0);
String value = map.getOrDefault(key, "null");
responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
responseObserver.onCompleted();
}
@Override
public void getByKeyServerStream(MyRequest request, StreamObserver<MyResponse> responseObserver) {
for (int key : request.getKeysList()) {
String value = map.getOrDefault(key, "null");
responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
}
responseObserver.onCompleted();
}
@Override
public StreamObserver<MyRequest> getByKeyClientStream(StreamObserver<MyResponse> responseObserver) {
return new StreamObserver<MyRequest>() {
String values = "";
@Override
public void onNext(MyRequest myRequest) {
int key = myRequest.getKeys(0);
values += map.getOrDefault(key, "null");
}
@Override
public void one rror(Throwable throwable) {
}
@Override
public void onCompleted() {
responseObserver.onNext(MyResponse.newBuilder().setValue(values).build());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<MyRequest> getByKeyBiStream(StreamObserver<MyResponse> responseObserver) {
return new StreamObserver<MyRequest>() {
@Override
public void onNext(MyRequest myRequest) {
int key = myRequest.getKeys(0);
String value = map.getOrDefault(key, "null");
responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
}
@Override
public void one rror(Throwable throwable) {
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
服务端启动类:
public class RpcServer {
public static final int port = 8088;
public static void main( String[] args ) throws IOException, InterruptedException {
MyRpcServiceImpl service = new MyRpcServiceImpl();
Server server = io.grpc.ServerBuilder
.forPort(port)
.addService(service)
.build();
server.start();
server.awaitTermination();
}
}
客户端启动类:
public class RpcClient {
private static ManagedChannel channel = ManagedChannelBuilder
.forAddress("127.0.0.1", RpcServer.port)
.usePlaintext()
.build();
private static MyServiceGrpc.MyServiceBlockingStub blockingStub = MyServiceGrpc.newBlockingStub(channel);
private static MyServiceGrpc.MyServiceStub asyncStub = MyServiceGrpc.newStub(channel);
private static final StreamObserver<MyResponse> responseObserver = new StreamObserver<MyResponse>() {
@Override
public void onNext(MyResponse response) {
System.out.println("receive: " + response.getValue());
}
@Override
public void one rror(Throwable t) {
System.out.println("error");
}
@Override
public void onCompleted() {
System.out.println("completed");
}
};
public static void main(String[] args) throws InterruptedException {
simpleSync();
simpleAsync();
serverStreamSync();
serverStreamAsync();
clientStream();
biStream();
Thread.sleep(100000);
}
private static void simpleSync() {
MyRequest request = MyRequest.newBuilder()
.addKeys(1)
.build();
String value = blockingStub.getByKey(request).getValue();
System.out.println(value);
}
private static void simpleAsync() {
MyRequest request = MyRequest.newBuilder()
.addKeys(1)
.build();
asyncStub.getByKey(request, responseObserver);
}
private static void serverStreamSync() {
MyRequest request = MyRequest.newBuilder()
.addKeys(1)
.addKeys(2)
.addKeys(3)
.build();
Iterator<MyResponse> itr = blockingStub.getByKeyServerStream(request);
while (itr.hasNext()) {
System.out.println(itr.next());
}
}
private static void serverStreamAsync() {
MyRequest request = MyRequest.newBuilder()
.addKeys(1)
.addKeys(2)
.addKeys(3)
.build();
asyncStub.getByKeyServerStream(request, responseObserver);
}
private static void clientStream() {
StreamObserver<MyRequest> requestData = asyncStub.getByKeyClientStream(responseObserver);
for (int i = 1; i <= 5; i++) {
requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
}
requestData.onCompleted();
}
private static void biStream() {
StreamObserver<MyRequest> requestData = asyncStub.getByKeyBiStream(responseObserver);
for (int i = 1; i <= 5; i++) {
requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
}
requestData.onCompleted();
}
}
对于同步stub,只能调用unary以及服务端stream的方法;对于异步stub,可以调用任意方法;
unary以及服务端stream写法比较简单;对于客户端stream的情况,需要在构建请求参数的observer。
\
标签:stream,示例,void,unary,MyRequest,Override,static,responseObserver,public From: https://blog.51cto.com/u_15873544/5844593