首页 > 其他分享 >【Grpc(二)】两种stub, 四种模式(unary,客户端stream,服务端strea)示例

【Grpc(二)】两种stub, 四种模式(unary,客户端stream,服务端strea)示例

时间:2022-11-11 12:34:12浏览次数:45  
标签:stream 示例 void unary MyRequest Override static responseObserver public


protobuff定义:

syntax = "proto3";

package com.liyao;

option java_package = "com.liyao.protobuf.test.service";
option java_outer_classname = "MyServiceProto";
option java_multiple_files = true;

message MyRequest {
repeated uint32 keys = 1;
}

message MyResponse {
string value = 1;
}

service MyService {
rpc GetByKey (MyRequest) returns (MyResponse);
rpc GetByKeyServerStream (MyRequest) returns (stream MyResponse);
rpc GetByKeyClientStream (stream MyRequest) returns (MyResponse);
rpc GetByKeyBiStream (stream MyRequest) returns (stream MyResponse);
}

服务比较简单,请求包含一个int的list,返回对应的key。

服务端实现类:

public class MyRpcServiceImpl extends MyServiceGrpc.MyServiceImplBase {

private final Map<Integer, String> map = ImmutableMap.<Integer, String>builder()
.put(1, "v1")
.put(2, "v2")
.put(3, "v3")
.put(4, "v4")
.put(5, "v5")
.build();

@Override
public void getByKey(MyRequest request, StreamObserver<MyResponse> responseObserver) {
int key = request.getKeys(0);
String value = map.getOrDefault(key, "null");

responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
responseObserver.onCompleted();
}

@Override
public void getByKeyServerStream(MyRequest request, StreamObserver<MyResponse> responseObserver) {
for (int key : request.getKeysList()) {
String value = map.getOrDefault(key, "null");
responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
}
responseObserver.onCompleted();
}

@Override
public StreamObserver<MyRequest> getByKeyClientStream(StreamObserver<MyResponse> responseObserver) {
return new StreamObserver<MyRequest>() {
String values = "";
@Override
public void onNext(MyRequest myRequest) {
int key = myRequest.getKeys(0);
values += map.getOrDefault(key, "null");
}

@Override
public void one rror(Throwable throwable) {

}

@Override
public void onCompleted() {
responseObserver.onNext(MyResponse.newBuilder().setValue(values).build());
responseObserver.onCompleted();
}
};
}

@Override
public StreamObserver<MyRequest> getByKeyBiStream(StreamObserver<MyResponse> responseObserver) {
return new StreamObserver<MyRequest>() {
@Override
public void onNext(MyRequest myRequest) {
int key = myRequest.getKeys(0);
String value = map.getOrDefault(key, "null");
responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
}

@Override
public void one rror(Throwable throwable) {

}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}

服务端启动类:

public class RpcServer {
public static final int port = 8088;

public static void main( String[] args ) throws IOException, InterruptedException {
MyRpcServiceImpl service = new MyRpcServiceImpl();
Server server = io.grpc.ServerBuilder
.forPort(port)
.addService(service)
.build();
server.start();
server.awaitTermination();
}
}

客户端启动类:

public class RpcClient {
private static ManagedChannel channel = ManagedChannelBuilder
.forAddress("127.0.0.1", RpcServer.port)
.usePlaintext()
.build();

private static MyServiceGrpc.MyServiceBlockingStub blockingStub = MyServiceGrpc.newBlockingStub(channel);
private static MyServiceGrpc.MyServiceStub asyncStub = MyServiceGrpc.newStub(channel);

private static final StreamObserver<MyResponse> responseObserver = new StreamObserver<MyResponse>() {
@Override
public void onNext(MyResponse response) {
System.out.println("receive: " + response.getValue());
}

@Override
public void one rror(Throwable t) {
System.out.println("error");
}

@Override
public void onCompleted() {
System.out.println("completed");
}
};

public static void main(String[] args) throws InterruptedException {
simpleSync();
simpleAsync();
serverStreamSync();
serverStreamAsync();
clientStream();
biStream();

Thread.sleep(100000);
}

private static void simpleSync() {
MyRequest request = MyRequest.newBuilder()
.addKeys(1)
.build();
String value = blockingStub.getByKey(request).getValue();
System.out.println(value);
}

private static void simpleAsync() {
MyRequest request = MyRequest.newBuilder()
.addKeys(1)
.build();
asyncStub.getByKey(request, responseObserver);
}

private static void serverStreamSync() {
MyRequest request = MyRequest.newBuilder()
.addKeys(1)
.addKeys(2)
.addKeys(3)
.build();
Iterator<MyResponse> itr = blockingStub.getByKeyServerStream(request);
while (itr.hasNext()) {
System.out.println(itr.next());
}
}

private static void serverStreamAsync() {
MyRequest request = MyRequest.newBuilder()
.addKeys(1)
.addKeys(2)
.addKeys(3)
.build();
asyncStub.getByKeyServerStream(request, responseObserver);
}

private static void clientStream() {
StreamObserver<MyRequest> requestData = asyncStub.getByKeyClientStream(responseObserver);
for (int i = 1; i <= 5; i++) {
requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
}
requestData.onCompleted();
}

private static void biStream() {
StreamObserver<MyRequest> requestData = asyncStub.getByKeyBiStream(responseObserver);
for (int i = 1; i <= 5; i++) {
requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
}
requestData.onCompleted();
}
}

对于同步stub,只能调用unary以及服务端stream的方法;对于异步stub,可以调用任意方法;

unary以及服务端stream写法比较简单;对于客户端stream的情况,需要在构建请求参数的observer。

\

标签:stream,示例,void,unary,MyRequest,Override,static,responseObserver,public
From: https://blog.51cto.com/u_15873544/5844593

相关文章

  • SparkStreaming_Dstream创建
    SparkStreaming原生支持一些不同的数据源。一些“核心”数据源已经被打包到SparkStreaming的Maven工件中,而其他的一些则可以通过spark-streaming-kafka等附加工件获......
  • 拓端数据|R语言代写阈值模型代码示例
    阈值模型用于统计的几个不同区域,而不仅仅是时间序列。一般的想法是,当变量的值超过某个阈值时,过程可能表现不同。也就是说,当值大于阈值时,可以应用不同的模型,而不是当它们低于......
  • Stream数据序列
    Stream类用于处理字符数据流或二进制数据流。1. Serial.available();  如果串口接收到信息返回TRUE,否则返回FALSE2. Serial.readString();    //读取串口接收......
  • C语言函数指针示例
      1#include<stdio.h>23doubleadd(doublea,doubleb)4{5returna+b;6}78doublesub(doublea,doubleb)9{10returna-......
  • Content type 'application/octet-stream' not supported for bodyType=boolean",
    @PostMapping("upload")publicResultadd(@RequestPart(value="startTime")StringstartTime,@RequestPart(value="id")Strin......
  • 记录一下Stream流的一个坑
    List<String>list=newArrayList<>();booleana=list.stream().anyMatch("a"::equals);//Ifthestreamisemptythenfalseisreturnedandthepredi......
  • Stream
    Stream概括的的将Stream可以分为三种类型创建Stream流:主要负责新建一个Stream流,或者基于现有的数组、List、Set、Map等集合类型对象创建出新的Stream流。API功......
  • Git: delete all branches without upstream
     #!/usr/bin/envbash#deleteallbrancheswithoutupstreamwhilereadbranch;doupstream=$(gitrev-parse--abbrev-ref$branch@{upstream}2>/dev/null)......
  • Redis Stream
    RedisStream是Redis5.0版本新增加的数据结构。RedisStream主要用于消息队列(MQ,MessageQueue),Redis本身是有一个Redis发布订阅(pub/sub)来实现消息队列的功能,但......
  • ideavimrc 示例
    """Mapleadertospace---------------------letmapleader="""""mac设置normal模式,为英文输入letkeep_input_source_in_normal="com.apple.keylayout.ABC""""......