go语言使用GRPC流处理模式
标签(空格分隔): go,grpc
proto文件
syntax = "proto3";
package four_kinds_method.v1;
option go_package="go-example/grpc/four_kinds_method/proto;four_kinds_method_pb";
// gRPC 允许您定义四种服务方法
// 1. 一元 RPC,其中客户端向服务器发送单个请求并返回单个响应,就像普通函数调用一样
// rpc SayHello(HelloRequest) returns (HelloResponse);
// 2. 服务器流式处理 RPC,其中客户端向服务器发送请求并获取流以读回消息序列。客户端从返回的流中读取,直到没有更多消息。gRPC 保证单个 RPC 调用中的消息排序
// rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
// 3. 客户端流式处理 RPC,其中客户端写入一系列消息并将其发送到服务器,再次使用提供的流。客户端完成消息写入后,它将等待服务器读取消息并返回其响应。同样,gRPC 保证单个 RPC 调用中的消息排序
// rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
// 4. 双向流式处理 RPC,其中双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照它们喜欢的任何顺序进行读取和写入:例如,服务器可以等待接收所有客户端消息,然后再写入响应,或者它可以交替读取消息然后写入消息,或者读取和写入的某种其他组合。保留每个流中消息的顺序
// rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
// 生成pb文件: protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative ./four_kinds_method.proto
message StreamRequest {
string data = 1;
}
message StreamResponse {
string data = 1;
}
service Greeter {
// 服务端流
rpc GetStream(StreamRequest) returns (stream StreamResponse);
// 客户端流
rpc PutStream(stream StreamRequest) returns (StreamResponse);
// 双向流
rpc AllStream(stream StreamRequest) returns (stream StreamResponse);
}
server实现
package api
import (
"fmt"
four_kinds_method_pb "go-example/grpc/four_kinds_service_method/proto"
"sync"
"time"
)
type Service struct {
}
// GetStream 服务端流模式
func (s *Service) GetStream(req *four_kinds_method_pb.StreamRequest, res four_kinds_method_pb.Greeter_GetStreamServer) error {
i := 0
for true {
i++
res.Send(&four_kinds_method_pb.StreamResponse{
Data: fmt.Sprintf("%s, %+v", req.Data, time.Now().Unix()),
})
if i >= 10 {
break
}
time.Sleep(time.Second * 2)
}
return nil
}
// PutStream 客户端流模式
func (s *Service) PutStream(clientStream four_kinds_method_pb.Greeter_PutStreamServer) error {
for {
recv, err := clientStream.Recv()
if err != nil {
fmt.Println("PutStreamErr: ", err)
break
} else {
fmt.Println("PutStreamRecv: ", recv.Data)
}
}
return nil
}
// AllStream 双向流模式
func (s *Service) AllStream(allStream four_kinds_method_pb.Greeter_AllStreamServer) error {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for {
recv, err := allStream.Recv()
if err != nil {
fmt.Println("服务端接受数据错误: ", err)
break
} else {
fmt.Println("服务端接受数据成功: ", recv.Data)
}
// break
}
}()
go func() {
defer wg.Done()
for {
allStream.Send(&four_kinds_method_pb.StreamResponse{
Data: "我是服务端:hello world",
})
// break
}
}()
wg.Wait()
return nil
}
server-main启动服务
package main
import (
"go-example/grpc/four_kinds_service_method/api"
four_kinds_method_pb "go-example/grpc/four_kinds_service_method/proto"
"go.uber.org/zap"
"google.golang.org/grpc"
"net"
)
func main() {
logger, _ := zap.NewDevelopment()
server := grpc.NewServer()
four_kinds_method_pb.RegisterGreeterServer(server, &api.Service{})
listen, err := net.Listen("tcp", ":6662")
if err != nil {
logger.Fatal("net.Listen error", zap.Error(err))
}
logger.Info("grpc four_kinds_method service started [127.0.0.1:6662] ")
err = server.Serve(listen)
if err != nil {
logger.Fatal("grpc serve error", zap.Error(err))
}
}
client
package main
import (
"context"
"fmt"
four_kinds_method_pb "go-example/grpc/four_kinds_service_method/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:6662", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
client := four_kinds_method_pb.NewGreeterClient(conn)
//stream, _ := client.GetStream(context.Background(), &four_kinds_method_pb.StreamRequest{Data: "Hello World"})
//for {
// recv, err := stream.Recv()
// if err != nil {
// fmt.Println("err:", err)
// break
// }
// fmt.Println("recv: ", recv)
//}
//putStream, err := client.PutStream(context.Background())
//if err != nil {
// panic(err)
//}
//
//i := 0
//for {
// i++
// putStream.Send(&four_kinds_method_pb.StreamRequest{
// Data: "Hello World" + strconv.Itoa(i),
// })
// time.Sleep(time.Second * 2)
// if i > 10 {
// break
// }
//}
stream, err := client.AllStream(context.TODO())
if err != nil {
fmt.Println("err1:", err)
}
//for {
err = stream.Send(&four_kinds_method_pb.StreamRequest{
Data: "hello php",
})
fmt.Println("err:", err)
stream.CloseSend()
//}
//go func() {
// for {
// recv, _ := stream.Recv()
// fmt.Println(recv)
// }
//}()
}
标签:pb,kinds,语言,err,GRPC,four,go,method
From: https://www.cnblogs.com/yanweifeng/p/17463744.html