首页 > 其他分享 >gRPC 实现服务端消息推送

gRPC 实现服务端消息推送

时间:2023-05-12 11:55:05浏览次数:48  
标签:gRPC public RPC message 推送 Message 服务端 客户端

1. gRPC 简介

gRPC 是一种高性能、开源和通用的 RPC 框架,支持多种编程语言。在 gRPC 中,有四种类型的 RPC,分别是 Unary RPC、Server Streaming RPC、Client Streaming RPC 和 Bidirectional Streaming RPC。

  1. Unary RPC:一元 RPC
    一元 RPC 是最简单的 RPC 类型,它是一种单向的请求-响应模式。客户端向服务端发送一个请求,并等待服务端响应。

  2. Server Streaming RPC:服务器流式 RPC
    服务器流式 RPC 是一种服务端主动向客户端发送流式数据的 RPC 类型。在这种 RPC 类型中,客户端向服务端发送一条请求消息,并等待服务端的响应。与 Unary RPC 不同的是,服务端在响应客户端的请求消息后,会向客户端发送一系列的响应消息,客户端在接收到消息后可以进行相应的处理。

  3. Client Streaming RPC:客户端流式 RPC
    客户端流式 RPC 是一种客户端主动向服务端发送流式数据的 RPC 类型。在这种 RPC 类型中,客户端可以向服务端发送多条请求消息,服务端在接收到请求消息后进行相应的处理,并最终向客户端发送一条响应消息。

  4. Bidirectional Streaming RPC:双向流式 RPC
    双向流式 RPC 是一种双向流式数据传输的 RPC 类型。在这种 RPC 类型中,客户端和服务端都可以主动向对方发送数据,并可以同时接收对方发送的数据。

若要实现服务端的消息推送,应该使用 Server Streaming RPC。

2. 代码示例

(1) 编写 proto 文件

syntax = "proto3";

package com.example.grpcdemo;
option java_multiple_files = true;
// 定义传输的消息格式
message Message {
  string content = 1;
}

// 定义服务接口
service MessageService {
  // 定义服务方法,客户端向服务端发送一个请求,并返回一个流式响应
  rpc streamMessages (Message) returns (stream Message);
}

(2) 实现服务端

public class MessageServiceImpl extends MessageServiceGrpc.MessageServiceImplBase {
    @Override
    public void streamMessages(Message request, final StreamObserver<Message> responseObserver) {
        System.out.println("Received message: " + request.getContent());
        // 启动一个定时任务,每秒钟向客户端发送一条消息
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            int count = 0;

            @Override
            public void run() {
                String message = "Message " + count++;
                System.out.println("Sending message: " + message);

                // 构造消息对象并发送给客户端
                Message response = Message.newBuilder().setContent(message).build();
                responseObserver.onNext(response);
            }
        }, 0, 1000);
    }
public class MessageServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        int port = 8080;
        // 创建 gRPC 服务器
        Server server = ServerBuilder.forPort(port)
                .addService(new MessageServiceImpl())
                .build();

        // 启动 gRPC 服务器
        server.start();

        // 阻塞线程以等待关闭信号
        System.out.println("Server started at port " + port);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Server stopped.");
            server.shutdown();
        }));
        server.awaitTermination();
    }
}

(3) 实现客户端

public class MessageClient {
    public static void main(String[] args) throws InterruptedException {
        // 创建 gRPC 通道和存根
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
                .usePlaintext()
                .build();
        MessageServiceGrpc.MessageServiceStub stub = MessageServiceGrpc.newStub(channel);

        // 创建请求对象
        Message request = Message.newBuilder().setContent("Hello, server!").build();

        // 调用服务方法,并接收来自服务端推送的消息
        stub.streamMessages(request, new StreamObserver<Message>() {
            @Override
            public void onNext(Message response) {
                System.out.println("Received message: " + response.getContent());
            }

            @Override
            public void one rror(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("Server closed the stream.");
            }
        });

        // 阻塞线程以等待来自服务端推送的消息
        Thread.sleep(10000);

        // 关闭gRPC通道
        channel.shutdown();
    }
}

服务端输出如下:

Server started at port 8080
Received message: Hello, server!
Sending message: Message 0
Sending message: Message 1
Sending message: Message 2
Sending message: Message 3
Sending message: Message 4
Sending message: Message 5

客户端输出如下:

Received message: Message 0
Received message: Message 1
Received message: Message 2
Received message: Message 3
Received message: Message 4
Received message: Message 5

标签:gRPC,public,RPC,message,推送,Message,服务端,客户端
From: https://www.cnblogs.com/dalelee/p/17393615.html

相关文章

  • 后端消息推送-SSE协议
    介绍HTTP服务器推送也称HTTP流,是一种客户端-服务器通信模式,它将信息从HTTP服务器异步推送到客户端,而无需客户端请求。现在的web和app中,越来越多的场景使用这种通信模式,比如实时的消息提醒,IM在线聊天,多人文档协作等。以前实现这种类似的功能一般都是用ajax长轮询,而现......
  • 消息推送平台的实时数仓?!flink消费kafka消息入到hive
    大家好,3y啊。好些天没更新了,并没有偷懒,只不过一直在安装环境,差点都想放弃了。上一次比较大的更新是做了austin的预览地址,把企业微信的应用和机器人消息各种的消息类型和功能给完善了。上一篇文章也提到了,austin常规的功能已经更新得差不多了,剩下的就是各种细节的完善。不知道大......
  • 音频 caf转MP3 到上传服务端
    今天一个录制音频到服务端的功能音频录制 导入头文件#import<CoreMedia/CoreMedia.h>#import<AVFoundation/AVFoundation.h>两个对象@property(nonatomic,strong)AVAudioRecorder*audioRecorder;// 录音对象@property(nonatomic,strong)AVAudioPlayer*audioP......
  • Android实现推送方式解决方案 (转1)
    Android实现推送方式解决方案本文介绍在Android中实现推送方式的基础知识及相关解决方案。 1.推送方式基础知识: 当我们开发需要和服务器交互的应用程序时,基本上都需要获取服务器端的数据,比如《地震应急通》就需要及时获取服务器上最新的地震信息。要获取服务器上不定时更新......
  • git仓库过渡,同时向两个仓库推送代码
    公司部门被大佬收购,产品项目迁移新公司仓库,过渡期间产品上线流程继续使用原公司的,新公司部署新系统后通过域名重定向逐渐将用户引流到新系统上完成切换,最后关闭原公司系统及上线流程。过渡期间新功能代码需要保证两边git仓库里一致,即执行gitpush命令时同时往两个仓库里推送代码......
  • websocket多实例推送解决方案-数据实时展示
    需求需要前端展示实时的订单数据信息。如下图所示,实时下单实时页面统计更新展示 思路方案前端使用websocket建立通信  后端监听数据库的binglog变更,实时得到最新数据,推送到前端 现状及问题客户端想实现实时获取数据的变更,使用了websocket+kafkaMq,当......
  • socket服务端
    Socket实现在接收到不同的消息时启动PeriodicCallback返回不同的数据,并且关闭时只关闭其中某一个定时器首先,你需要对MainHandler进行修改,添加一个字典callbacks,用于保存每个消息对应的回调函数和其它相关信息,例如:importtornado.ioloopimporttornado.webclassMainHand......
  • SVN服务端使用说明(二)
    服务端使用说明服务端安装完成后,进行创建用户,新建Repositiories等 项目描述Repositories仓库,可分不同文件夹进行存储项目。Users用户,可新增,删除,设置密码等Groups用户组,对不同用户可进行组划分Jobs计划任务,属于企业版高级功能(本文档暂不涉及),可定时备份仓库文件。1)创建用......
  • 浅谈Protocol Buffers、GRPC、Buf、GRPC-Gateway
    1.ProtocolBuffers什么是proto?ProtocolBuffers如何理解ProtocolBuffers?协议缓冲区非proto协议如何订立、传播以及维护?如何理解协议缓冲区?Protocolbuffers提供了一种语言中立、平台中立、可扩展的机制,用于以向前兼容和向后兼容的方式序列化结构化数据。它......
  • 开源项目消息推送平台Austin
    开源项目消息推送平台Austin终于要上线了,迎来在线演示的第一版!......