首页 > 编程语言 >【Grpc(一)】Java 何如理解StreamObserver?

【Grpc(一)】Java 何如理解StreamObserver?

时间:2022-11-11 12:35:50浏览次数:52  
标签:StreamObserver Java Grpc void 观察者 返回值 回调


刚开始接触Grpc时,桩代码里有许多StreamObserver类型,不太清楚是怎么用的,这里做一个记录。

首先看下StreamObserver接口定义:

public interface StreamObserver<V>  {
void onNext(V value);
void one rror(Throwable t);
void onCompleted();
}

可以看到,这是一个泛型化的回调接口,并且从命名上看,该接口使用了观察者模式。提到回调,或者提到观察者模式,首先可以想到,这里面涉及到两个角色:观察者和被观察者。被观察者负责在某一事件发生时调用回调函数通知观察者,观察者负责提供回调函数。对于需要被观察的事件,这里是作为回调接口里的泛型变量存在的。这就是这个接口的使用流程。下面结合Grpc里的几个使用场景来看下。
1.unary模式服务端方法返回值例子:
我们可以看一个简单的unary模式的返回值:

@Override
public void getByKey(MyRequest request, StreamObserver<MyResponse> responseObserver) {
int key = request.getKey();
String value = ""; // 计算逻辑

responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
responseObserver.onCompleted();
}

该实现方法里定义了一个MyResponse类型的返回值,这个返回值是被关注的对象。方法参数里有一个StreamObserver<MyResponse>的变量,该变量就是回调函数。回调函数由谁提供?也就是观察者是谁?答案是Grpc框架。可以理解为Grpc框架层在关注MyResponse类型的返回值的生成,然后使用协议层及io层做数据发送。我们可以大致看一下回调函数的实现,比如onNext方法:

@Override
public void onNext(RespT response) {
if (cancelled) {
if (onCancelHandler == null) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
}
return;
}
checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed");
if (!sentHeaders) {
call.sendHeaders(new Metadata());
sentHeaders = true;
}
call.sendMessage(response);
}

实现类是ServerCallStreamObserverImpl,其中的onNext、onError以及onComplete方法均会调用内部的ServerCall实例发送消息。具体发送逻辑实现这里不做深入,总之可以明确的是,这里的StreamObserver回调接口,其实现逻辑就是将消息发送至客户端,这就是观察者的逻辑。再看下被观察者,也就是StreamObserver回调接口的调用方。其实就是实现类里返回值的生成的逻辑,我们需要根据request取到参数,然后生成返回值,调用StreamObserver回调接口,来通知Grpc框架层发送返回值。至此服务端实现方法里的StreamObserver已经清晰了:被观察的对象就是返回值,Grpc框架层是观察者,提供发送逻辑作为回调函数,实现类是被观察者,每一次返回值的生成都会调用回调函数通知Grpc。还有一点,StreamObserver接口的定义其实和stream息息相关。我们知道stream模式意味着可以在一个连接中发送多条消息,所以该接口提供了onNext回调函数,该函数可以被多次调用,每一次对onNext的调用都代表一条消息的发送。如果全部发送完了或者发送出错,那么就需要调用onError或者onComplete来告知对方本次stream已经结束。所以该接口的设计也与stream的概念也完全契合。

2.流式客户端例子:
客户端流式与双端流式类似,这里只看客户端流式。先看客户端代码:

private static void clientStream() {
StreamObserver<MyResponse> responseData = new StreamObserver<MyResponse>() {
@Override
public void onNext(MyResponse response) {
System.out.println("res: " + response.getValue());
}

@Override
public void one rror(Throwable t) {
}

@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
StreamObserver<MyRequest> requestData = asyncStub.getByKeyClientStream(responseData);
for (int i = 0; i < 10; i++) {
requestData.onNext(MyRequest.newBuilder().setKey(i).build());
}
requestData.onCompleted();
}

可以看到,客户端侧,需要创建两个observer分别用于处理请求与返回值。初次看可能比较懵逼,入参数返回值类型,并且返回了一个请求???接下来具体看下,对于请求observer,回调接口由Grpc框架层提供,实现类见CallToStreamObserverAdapter,也就是观察者,所做的事情就是将请求数据发送到服务端。回调函数的调用方,也即被观察者,是我们提供的业务代码,作用就是生成请求数据,并调用回调函数。这个与unary模式下服务端返回值是类似的。对于返回值observer,回调函数由我们来提供,也即观察者,作用是处理返回值。回调函数调用方是Grpc框架,监听到有返回数据时,就调用回调函数,通知客户端处理返回值。
服务端侧代码:

@Override
public StreamObserver<MyRequest> getByKeyClientStream(StreamObserver<MyResponse> responseObserver) {
return new StreamObserver<MyRequest>() {
int count = 0;

@Override
public void onNext(MyRequest myRequest) {
System.out.println("recv: " + myRequest.getKey());
count++;
}

@Override
public void one rror(Throwable throwable) {

}

@Override
public void onCompleted() {
responseObserver.onNext(MyResponse.newBuilder().setValue("count: " + count).build());
responseObserver.onCompleted();
}
};
}

方法返回值是请求类型的observer。回调函数有我们来提供,作用是处理接收到的请求,也就是观察者逻辑。回调函数的调用方式Grpc框架层,监听到有请求到达时,就会调用我们提供的回调函数,通知处理请求。对于返回值的observer与前边类似。

围绕着观察者与被观察者两个角色看待Grpc里的调用方式,就会清晰很多。

总结:

1.StreamObserver接口使用了观察者模式的概念,与stream流模式完全契合,一次onnext调用代表stream内一次发送;

2.对于需要接受数据的场景,Grpc框架是被观察者;对于需要发送数据的场景,Grpc框架是观察者;

标签:StreamObserver,Java,Grpc,void,观察者,返回值,回调
From: https://blog.51cto.com/u_15873544/5844590

相关文章

  • 【Java】随机数原理 Random ThreadLocalRandom
    大致生成原理:随机数由seed经过一定的转换生成。需要提供初始seed。每一次生成随机数时,先由老seed生成新seed,再根据新seed生成新的随机数。由于算法是固定的,所以如果初始seed......
  • 【docker】Java应用 容器内存管理 -XX:+UseContainerSupport
    早期时候,容器内运行Java应用程序时,Jvm无法感知容器环境存在,所以对容器资源的限制比如内存或者cpu等都无法生效。原因是容器的资源管理使用了操作系统cgroup机制,但是Jvm无法......
  • 【Grpc(二)】两种stub, 四种模式(unary,客户端stream,服务端strea)示例
    protobuff定义:syntax="proto3";packagecom.liyao;optionjava_package="com.liyao.protobuf.test.service";optionjava_outer_classname="MyServiceProto";optionj......
  • 【Java】Instrumentation热更新 premain agentmain
    有两种办法:1)在java5中,可以利用jvm加载类的一个扩展点来实现类文件的动态修改。需要提供一个premain方法。缺点是只能在类文件加载且main方法执行之前修改,无法实现真正的运行......
  • 【zookeeper】java API 例子
    之前体验了命令行客户端,这次看一下javaAPI操作zk。server还是按照之前的配置,一个server1,server2和server3的伪集群。maven:这里使用maven管理zk的jar包,大致需要zk的jar和日......
  • 狂神说Javase基础学习1
    狂神学习博客1基本的DOS命令打开CMD的方式1.开始+系统+命令提示符2.win+R,进入运行窗口,输入cmd打开控制台3.在任意的文件夹下面,Shift+鼠标右键,进入命令行窗口4.资源管......
  • 【Java】内存模型 volatile
    java堆存储对象和数组,是一块线程共享数据区,但是实际线程运行的时候,对于用到的对象都会在线程私有空间即虚拟机栈保存一个副本,为了效率。这两快内存叫主内存和工作内存。java......
  • 【Java】内存区域与对象创建
    这块内容是java很基础的部分,涉及到JVM的设计原理,很久以前就看到过,这次需要区分线程私有和共享基本java的运行时数据区可以分为五大块:程序计数器,为线程私有,每一个线程都有一......
  • 【Java】split(
    java的split函数接受一个正则表达式的分隔符为参数,将string按照分隔符划分为一个数组。我们可能会忽略这个参数的要求,这里传入的分隔符并不是一个普通的字符串,而是一个正则......
  • 【Java】多线程 数目
    今天看到一篇文章,讲多线程数目的,很棒这个问题还是很容易被忽略的,就是多线程到底是为了什么?最开始学习多线程的时候,往往将多线程和性能高划等号,只要用了多线程就能提升性能,其......