刚开始接触Grpc时,桩代码里有许多StreamObserver类型,不太清楚是怎么用的,这里做一个记录。
首先看下StreamObserver接口定义:
public interface StreamObserver<V> {
void onNext(V value);
void one rror(Throwable t);
void onCompleted();
}
可以看到,这是一个泛型化的回调接口,并且从命名上看,该接口使用了观察者模式。提到回调,或者提到观察者模式,首先可以想到,这里面涉及到两个角色:观察者和被观察者。被观察者负责在某一事件发生时调用回调函数通知观察者,观察者负责提供回调函数。对于需要被观察的事件,这里是作为回调接口里的泛型变量存在的。这就是这个接口的使用流程。下面结合Grpc里的几个使用场景来看下。
1.unary模式服务端方法返回值例子:
我们可以看一个简单的unary模式的返回值:
@Override
public void getByKey(MyRequest request, StreamObserver<MyResponse> responseObserver) {
int key = request.getKey();
String value = ""; // 计算逻辑
responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
responseObserver.onCompleted();
}
该实现方法里定义了一个MyResponse类型的返回值,这个返回值是被关注的对象。方法参数里有一个StreamObserver<MyResponse>的变量,该变量就是回调函数。回调函数由谁提供?也就是观察者是谁?答案是Grpc框架。可以理解为Grpc框架层在关注MyResponse类型的返回值的生成,然后使用协议层及io层做数据发送。我们可以大致看一下回调函数的实现,比如onNext方法:
@Override
public void onNext(RespT response) {
if (cancelled) {
if (onCancelHandler == null) {
throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
}
return;
}
checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed");
if (!sentHeaders) {
call.sendHeaders(new Metadata());
sentHeaders = true;
}
call.sendMessage(response);
}
实现类是ServerCallStreamObserverImpl,其中的onNext、onError以及onComplete方法均会调用内部的ServerCall实例发送消息。具体发送逻辑实现这里不做深入,总之可以明确的是,这里的StreamObserver回调接口,其实现逻辑就是将消息发送至客户端,这就是观察者的逻辑。再看下被观察者,也就是StreamObserver回调接口的调用方。其实就是实现类里返回值的生成的逻辑,我们需要根据request取到参数,然后生成返回值,调用StreamObserver回调接口,来通知Grpc框架层发送返回值。至此服务端实现方法里的StreamObserver已经清晰了:被观察的对象就是返回值,Grpc框架层是观察者,提供发送逻辑作为回调函数,实现类是被观察者,每一次返回值的生成都会调用回调函数通知Grpc。还有一点,StreamObserver接口的定义其实和stream息息相关。我们知道stream模式意味着可以在一个连接中发送多条消息,所以该接口提供了onNext回调函数,该函数可以被多次调用,每一次对onNext的调用都代表一条消息的发送。如果全部发送完了或者发送出错,那么就需要调用onError或者onComplete来告知对方本次stream已经结束。所以该接口的设计也与stream的概念也完全契合。
2.流式客户端例子:
客户端流式与双端流式类似,这里只看客户端流式。先看客户端代码:
private static void clientStream() {
StreamObserver<MyResponse> responseData = new StreamObserver<MyResponse>() {
@Override
public void onNext(MyResponse response) {
System.out.println("res: " + response.getValue());
}
@Override
public void one rror(Throwable t) {
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
StreamObserver<MyRequest> requestData = asyncStub.getByKeyClientStream(responseData);
for (int i = 0; i < 10; i++) {
requestData.onNext(MyRequest.newBuilder().setKey(i).build());
}
requestData.onCompleted();
}
可以看到,客户端侧,需要创建两个observer分别用于处理请求与返回值。初次看可能比较懵逼,入参数返回值类型,并且返回了一个请求???接下来具体看下,对于请求observer,回调接口由Grpc框架层提供,实现类见CallToStreamObserverAdapter,也就是观察者,所做的事情就是将请求数据发送到服务端。回调函数的调用方,也即被观察者,是我们提供的业务代码,作用就是生成请求数据,并调用回调函数。这个与unary模式下服务端返回值是类似的。对于返回值observer,回调函数由我们来提供,也即观察者,作用是处理返回值。回调函数调用方是Grpc框架,监听到有返回数据时,就调用回调函数,通知客户端处理返回值。
服务端侧代码:
@Override
public StreamObserver<MyRequest> getByKeyClientStream(StreamObserver<MyResponse> responseObserver) {
return new StreamObserver<MyRequest>() {
int count = 0;
@Override
public void onNext(MyRequest myRequest) {
System.out.println("recv: " + myRequest.getKey());
count++;
}
@Override
public void one rror(Throwable throwable) {
}
@Override
public void onCompleted() {
responseObserver.onNext(MyResponse.newBuilder().setValue("count: " + count).build());
responseObserver.onCompleted();
}
};
}
方法返回值是请求类型的observer。回调函数有我们来提供,作用是处理接收到的请求,也就是观察者逻辑。回调函数的调用方式Grpc框架层,监听到有请求到达时,就会调用我们提供的回调函数,通知处理请求。对于返回值的observer与前边类似。
围绕着观察者与被观察者两个角色看待Grpc里的调用方式,就会清晰很多。
总结:
1.StreamObserver接口使用了观察者模式的概念,与stream流模式完全契合,一次onnext调用代表stream内一次发送;
2.对于需要接受数据的场景,Grpc框架是被观察者;对于需要发送数据的场景,Grpc框架是观察者;
标签:StreamObserver,Java,Grpc,void,观察者,返回值,回调 From: https://blog.51cto.com/u_15873544/5844590