首页 > 其他分享 >go语言使用GRPC流处理模式

go语言使用GRPC流处理模式

时间:2023-06-07 16:38:06浏览次数:68  
标签:pb kinds 语言 err GRPC four go method

go语言使用GRPC流处理模式

标签(空格分隔): go,grpc

proto文件

syntax = "proto3";
package four_kinds_method.v1;
option go_package="go-example/grpc/four_kinds_method/proto;four_kinds_method_pb";


// gRPC 允许您定义四种服务方法
// 1. 一元 RPC,其中客户端向服务器发送单个请求并返回单个响应,就像普通函数调用一样
// rpc SayHello(HelloRequest) returns (HelloResponse);
// 2. 服务器流式处理 RPC,其中客户端向服务器发送请求并获取流以读回消息序列。客户端从返回的流中读取,直到没有更多消息。gRPC 保证单个 RPC 调用中的消息排序
// rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
// 3. 客户端流式处理 RPC,其中客户端写入一系列消息并将其发送到服务器,再次使用提供的流。客户端完成消息写入后,它将等待服务器读取消息并返回其响应。同样,gRPC 保证单个 RPC 调用中的消息排序
// rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
// 4. 双向流式处理 RPC,其中双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照它们喜欢的任何顺序进行读取和写入:例如,服务器可以等待接收所有客户端消息,然后再写入响应,或者它可以交替读取消息然后写入消息,或者读取和写入的某种其他组合。保留每个流中消息的顺序
// rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);


// 生成pb文件: protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative ./four_kinds_method.proto

message StreamRequest {
  string data = 1;
}

message StreamResponse {
  string data = 1;
}

service Greeter {
  // 服务端流
  rpc GetStream(StreamRequest) returns (stream StreamResponse);
  // 客户端流
  rpc PutStream(stream StreamRequest) returns (StreamResponse);
  // 双向流
  rpc AllStream(stream StreamRequest) returns (stream StreamResponse);
}

server实现

package api

import (
	"fmt"
	four_kinds_method_pb "go-example/grpc/four_kinds_service_method/proto"
	"sync"
	"time"
)

type Service struct {
}

// GetStream 服务端流模式
func (s *Service) GetStream(req *four_kinds_method_pb.StreamRequest, res four_kinds_method_pb.Greeter_GetStreamServer) error {
	i := 0
	for true {
		i++
		res.Send(&four_kinds_method_pb.StreamResponse{
			Data: fmt.Sprintf("%s, %+v", req.Data, time.Now().Unix()),
		})
		if i >= 10 {
			break
		}
		time.Sleep(time.Second * 2)
	}
	return nil
}

// PutStream 客户端流模式
func (s *Service) PutStream(clientStream four_kinds_method_pb.Greeter_PutStreamServer) error {
	for {
		recv, err := clientStream.Recv()
		if err != nil {
			fmt.Println("PutStreamErr: ", err)
			break
		} else {
			fmt.Println("PutStreamRecv: ", recv.Data)
		}
	}
	return nil
}

// AllStream 双向流模式
func (s *Service) AllStream(allStream four_kinds_method_pb.Greeter_AllStreamServer) error {

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			recv, err := allStream.Recv()
			if err != nil {
				fmt.Println("服务端接受数据错误: ", err)
				break
			} else {
				fmt.Println("服务端接受数据成功: ", recv.Data)
			}

			// break
		}
	}()

	go func() {
		defer wg.Done()
		for {
			allStream.Send(&four_kinds_method_pb.StreamResponse{
				Data: "我是服务端:hello world",
			})
			// break
		}
	}()

	wg.Wait()
	return nil
}

server-main启动服务

package main

import (
	"go-example/grpc/four_kinds_service_method/api"
	four_kinds_method_pb "go-example/grpc/four_kinds_service_method/proto"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"net"
)

func main() {

	logger, _ := zap.NewDevelopment()

	server := grpc.NewServer()

	four_kinds_method_pb.RegisterGreeterServer(server, &api.Service{})

	listen, err := net.Listen("tcp", ":6662")
	if err != nil {
		logger.Fatal("net.Listen error", zap.Error(err))
	}
	logger.Info("grpc four_kinds_method service started [127.0.0.1:6662] ")
	err = server.Serve(listen)
	if err != nil {
		logger.Fatal("grpc serve error", zap.Error(err))
	}
}

client

package main

import (
	"context"
	"fmt"
	four_kinds_method_pb "go-example/grpc/four_kinds_service_method/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {

	conn, err := grpc.Dial("127.0.0.1:6662", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}

	client := four_kinds_method_pb.NewGreeterClient(conn)

	//stream, _ := client.GetStream(context.Background(), &four_kinds_method_pb.StreamRequest{Data: "Hello World"})
	//for {
	//	recv, err := stream.Recv()
	//	if err != nil {
	//		fmt.Println("err:", err)
	//		break
	//	}
	//	fmt.Println("recv: ", recv)
	//}

	//putStream, err := client.PutStream(context.Background())
	//if err != nil {
	//	panic(err)
	//}
	//
	//i := 0
	//for {
	//	i++
	//	putStream.Send(&four_kinds_method_pb.StreamRequest{
	//		Data: "Hello World" + strconv.Itoa(i),
	//	})
	//	time.Sleep(time.Second * 2)
	//	if i > 10 {
	//		break
	//	}
	//}

	stream, err := client.AllStream(context.TODO())
	if err != nil {
		fmt.Println("err1:", err)
	}
	//for {

	err = stream.Send(&four_kinds_method_pb.StreamRequest{
		Data: "hello php",
	})
	fmt.Println("err:", err)
	stream.CloseSend()
	//}

	//go func() {
	//	for {
	//		recv, _ := stream.Recv()
	//		fmt.Println(recv)
	//	}
	//}()
}

标签:pb,kinds,语言,err,GRPC,four,go,method
From: https://www.cnblogs.com/yanweifeng/p/17463744.html

相关文章

  • 使用gorm进行数量统计【limit、offset对count的统计的影响】
    limit、offset对count的统计的影响错误示例1:请注意,如下例子中,Count放在了最后面,查询时,count方法也会加上Limit和offset这两个语句:global.DB.Limit(10).Offset(2).Find(&users).Count(&total)错误示例2:下面这种方法,看似没啥问题,实际上count的时候也会带上分页。varorm=glob......
  • Go内存逃逸
    前言很久以前就听过过内存逃逸这个词,最近了解了一下,才发现是个很简单的概念.只要把前言部分看完,就已经了解了.来吧…在介绍内存逃逸之前,我们先用C语言来引出这个概念.我们的进程在内存中有栈内存和堆内存的概念,栈内存是函数执行的局部内存,会随着函数的结束而全部......
  • Golang中如何控制goroutine的执行顺序?
    首先说明一下原理:前后协程之间通过通道去相互限制,后一个线程尝试去获取一个channel的值,当channel中没有值时,就会一直阻塞,而前一个协程则负责关闭channel,当前一个协程完成了这个操作,后一个协程才可以结束阻塞,继续执行。示例代码:packagemainimport( "fmt" "time")funcma......
  • 深度学习应用篇-计算机视觉-图像分类[2]:LeNet、AlexNet、VGG、GoogleNet、DarkNet模型
    深度学习应用篇-计算机视觉-图像分类[2]:LeNet、AlexNet、VGG、GoogleNet、DarkNet模型结构、实现、模型特点详细介绍1.LeNet(1998)LeNet是最早的卷积神经网络之一<sup>[1]</sup>,其被提出用于识别手写数字和机器印刷字符。1998年,YannLeCun第一次将LeNet卷积神经网络应用到图像分类......
  • 深度学习应用篇-计算机视觉-图像分类[2]:LeNet、AlexNet、VGG、GoogleNet、DarkNet模型
    深度学习应用篇-计算机视觉-图像分类[2]:LeNet、AlexNet、VGG、GoogleNet、DarkNet模型结构、实现、模型特点详细介绍1.LeNet(1998)LeNet是最早的卷积神经网络之一[1],其被提出用于识别手写数字和机器印刷字符。1998年,YannLeCun第一次将LeNet卷积神经网络应用到图像分类上,在手写数......
  • 传奇GOM引擎补丁安装教程图解,传奇pak补丁介绍
    gameofmir引擎的加密补丁格式是PAK格式,该补丁放入客户端的位置也和传统的补丁不一样,导致很多玩家补丁都打错了,最后游戏不显示和黑屏常规的补丁文件夹就这几个datamapwav自定义补丁文件夹名字如名字就是自定义正确的自定义补丁使用方法如下:先带大家认识一下自定义补丁目录,每个版本作......
  • C#语言LIS系统如何接收和解析仪器数据HL7协议
    以下是使用C#实现HL7接口协议的接收和解析的简单示例:1.使用TcpListener类创建一个TCP服务器,用于监听指定端口上的连接请求:```TcpListenerserver=newTcpListener(IPAddress.Any,8888);server.Start();```2.等待客户端连接,并使用TcpClient类创建一个TCP连接:```TcpClie......
  • C语言判断大小端的几种方法
    在操作系统中,经常会用到判断大小端,很多面试题中也会经常遇到,以前的时候没有总结过,这里总结一下。以后用到了就直接可以用了。所谓的大小端,大致的解释意思就是:【大端模式】CPU对操作数的存放方式是高地址存放低位,低地址存放高位。【小端模式】CPU对操作数的存放方式是高地址存......
  • 通过redis学网络(1)-用go基于epoll实现最简单网络通信框架
    本系列主要是为了对redis的网络模型进行学习,我会用golang实现一个reactor网络模型,并实现对redis协议的解析。系列源码已经上传githubhttps://github.com/HobbyBear/tinyredis/tree/chapter1redis的网络模型是基于epoll实现的,所以这一节让我们先基于epoll,实现一个最简单的服......
  • gRPC 简介
    gRPC简介标签(空格分隔):go,grpc概述在gRPC中,客户端应用程序可以直接调用不同计算机上的服务器应用程序上的方法,就像它是本地对象一样,从而使您更轻松地创建分布式应用程序和服务。与许多RPC系统一样,gRPC基于定义服务的思想,指定可以使用其参数和返回类型远程调用的方法。在......