首页 > 其他分享 >go-zero开发入门之网关往rpc服务传递数据

go-zero开发入门之网关往rpc服务传递数据

时间:2023-12-12 19:00:12浏览次数:33  
标签:网关 return err ctx headers rpc go 传递数据 mtd

go-zero 的网关往 rpc 服务传递数据时,可以使用 headers,但需要注意前缀规则,否则会发现数据传递不过去,或者对方取不到数据。

go-zero 的网关对服务的调用使用了第三方库 grpcurl,入口函数为 InvokeRPC:

grpcurl.InvokeRPC(r.Context(), source, cli.Conn(), rpcPath, s.prepareMetadata(r.Header), handler, parser.Next)

调用在 https://github.com/zeromicro/go-zero/blob/master/gateway/server.go 中进行的,上述调用会处理 HTTP 的 headers 数据,对于不是以字符串“Grpc-Metadata-”打头的会过滤掉,对于以字符串“Grpc-Metadata-”打头的会将“Grpc-Metadata-”转为“gateway-”。

// go-zero/gateway/internal/headerprocessor.go
// ProcessHeaders builds the headers for the gateway from HTTP headers.
func ProcessHeaders(header http.Header) []string {
	var headers []string

	for k, v := range header {
		if !strings.HasPrefix(k, metadataHeaderPrefix) { // 判断是否包含了前缀“Grpc-Metadata-”
			continue // 如果没有前缀“Grpc-Metadata-”则直接过滤丢弃掉
		}

		// 将前缀“Grpc-Metadata-”替换为前缀“gateway-”
		key := fmt.Sprintf("%s%s", metadataPrefix, strings.TrimPrefix(k, metadataHeaderPrefix))
		for _, vv := range v {
			headers = append(headers, key+":"+vv)
		}
	}

	return headers
}

函数 MetadataFromHeaders 负责从 headers 解码数据:

// https://github.com/fullstorydev/grpcurl/blob/master/grpcurl.go
func MetadataFromHeaders(headers []string) metadata.MD {
	md := make(metadata.MD)
	for _, part := range headers {
		if part != "" {
			pieces := strings.SplitN(part, ":", 2)
			if len(pieces) == 1 {
				pieces = append(pieces, "") // if no value was specified, just make it "" (maybe the header value doesn't matter)
			}
			headerName := strings.ToLower(strings.TrimSpace(pieces[0]))
			val := strings.TrimSpace(pieces[1])
			if strings.HasSuffix(headerName, "-bin") {
				if v, err := decode(val); err == nil {
					val = v
				}
			}
			md[headerName] = append(md[headerName], val)
		}
	}
	return md
}
// https://github.com/fullstorydev/grpcurl/blob/master/invoke.go
func InvokeRPC(ctx context.Context, source DescriptorSource, ch grpcdynamic.Channel, methodName string,
	headers []string, handler InvocationEventHandler, requestData RequestSupplier) error {

	md := MetadataFromHeaders(headers)

	svc, mth := parseSymbol(methodName)
	if svc == "" || mth == "" {
		return fmt.Errorf("given method name %q is not in expected format: 'service/method' or 'service.method'", methodName)
	}

	dsc, err := source.FindSymbol(svc)
	if err != nil {
		// return a gRPC status error if hasStatus is true
		errStatus, hasStatus := status.FromError(err)
		switch {
		case hasStatus && isNotFoundError(err):
			return status.Errorf(errStatus.Code(), "target server does not expose service %q: %s", svc, errStatus.Message())
		case hasStatus:
			return status.Errorf(errStatus.Code(), "failed to query for service descriptor %q: %s", svc, errStatus.Message())
		case isNotFoundError(err):
			return fmt.Errorf("target server does not expose service %q", svc)
		}
		return fmt.Errorf("failed to query for service descriptor %q: %v", svc, err)
	}
	sd, ok := dsc.(*desc.ServiceDescriptor)
	if !ok {
		return fmt.Errorf("target server does not expose service %q", svc)
	}
	mtd := sd.FindMethodByName(mth)
	if mtd == nil {
		return fmt.Errorf("service %q does not include a method named %q", svc, mth)
	}

	handler.OnResolveMethod(mtd)

	// we also download any applicable extensions so we can provide full support for parsing user-provided data
	var ext dynamic.ExtensionRegistry
	alreadyFetched := map[string]bool{}
	if err = fetchAllExtensions(source, &ext, mtd.GetInputType(), alreadyFetched); err != nil {
		return fmt.Errorf("error resolving server extensions for message %s: %v", mtd.GetInputType().GetFullyQualifiedName(), err)
	}
	if err = fetchAllExtensions(source, &ext, mtd.GetOutputType(), alreadyFetched); err != nil {
		return fmt.Errorf("error resolving server extensions for message %s: %v", mtd.GetOutputType().GetFullyQualifiedName(), err)
	}

	msgFactory := dynamic.NewMessageFactoryWithExtensionRegistry(&ext)
	req := msgFactory.NewMessage(mtd.GetInputType())

	handler.OnSendHeaders(md)
	ctx = metadata.NewOutgoingContext(ctx, md)

	stub := grpcdynamic.NewStubWithMessageFactory(ch, msgFactory)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if mtd.IsClientStreaming() && mtd.IsServerStreaming() {
		return invokeBidi(ctx, stub, mtd, handler, requestData, req)
	} else if mtd.IsClientStreaming() {
		return invokeClientStream(ctx, stub, mtd, handler, requestData, req)
	} else if mtd.IsServerStreaming() {
		return invokeServerStream(ctx, stub, mtd, handler, requestData, req)
	} else {
		return invokeUnary(ctx, stub, mtd, handler, requestData, req)
	}
}

网关可如下实现:

newReq := r.WithContext(r.Context())
newReq.Header.Set("Grpc-Metadata-myuid", userId)
next.ServeHTTP(w, newReq)

服务端的实现:

vals := metadata.ValueFromIncomingContext(l.ctx, "gateway-myuid")
userId := vals[0]

标签:网关,return,err,ctx,headers,rpc,go,传递数据,mtd
From: https://www.cnblogs.com/aquester/p/17897616.html

相关文章

  • 基于5G智能网关的河道清洁机器人应用
    5G技术的普及也带动了机器人应用的快速发展,得益于5G通信技术的高带宽、低时延和高可靠性特性,显著提升了机器人的感知能力、控制精度以及远程操作能力,让机器人变得更加智能化、自主化。 5G智能网关赋能河道清洁机器人 河道清洁机器人就是一种具有代表性的5G赋能智能机器人应......
  • 工业生产中Profibus主站转Profinet网关优势所在
    应用案例分享:某制造企业拥有一条生产线,生产线上的多个设备之间需要进行数据通讯和相互控制。这些设备原本使用Profibus-DP总线进行通讯,但企业希望将这些设备集成到Profinet网络中,以便能够更好地实现设备监控和管理。我们为此提供了一款Profibus-DP主站转Profinet网关,将多个Profibu......
  • 是谁的简历上全是秒杀商城和RPC啊?
    是不是还在苦于自己简历上的项目离不开商城、RPC、秒杀、论坛、外卖、点评等等烂大街的项目?是不是翻遍全网再很难找到一个既有含金量又能看得懂的项目?那么现在就不用找了,下面这个项目一定适合你!高性能短链系统EZLink!教程地址:https://itmtx.cn/column/17(或者小......
  • 边缘智能网关如何应对环境污染难题
    随着我国工业化、城镇化的深入推进,包括大气污染在内的环境污染防治压力继续加大。为应对环境污染防治难题,佰马综合边缘计算、物联网、智能感知等技术,基于边缘智能网关打造环境污染实时监测、预警及智能干预方案,可应用于大气保护、水体保护、土壤保护、植被保护、生物保护等丰富场......
  • RPC
    RPC是什么?RPC(RemoteProcedureCall)即远程过程调用。为什么要RPC?因为,两个不同的服务器上的服务提供的方法不在一个内存空间,所以,需要通过网络编程才能传递方法调用所需要的参数。并且,方法调用的结果也需要通过网络编程来接收。但是,如果我们自己手动网络编程来实现这个调用......
  • go-zero开发入门-API网关鉴权开发示例
    本文是go-zero开发入门-API网关开发示例一文的延伸,继续之前请先阅读此文。在项目根目录下创建子目录middleware,在此目录下创建文件auth.go,内容如下://鉴权中间件packagemiddlewareimport("context""errors""net/http")varErrInvalidToken=errors.Ne......
  • API网关
    1.API网关:(1)什么是网关?微服务背景下,一个系统被拆分为多个服务,但是像安全认证,流量控制,日志,监控等功能是每个服务都需要的,没有网关的话,我们就需要在每个服务中单独实现,重复且零散。实际上,网关主要做了两件事情:请求转发+请求过滤。由于引入网关之后,会多一步网络转......
  • A sample of JSON RPC service
    ThisisasampleserviceprogramwhichshowhowtoimplementaJSONRPC.TheRPCserviceincludedtwofunctionswhichusedforRSAsignandverify.Ifyouwanttobuildthesourcecode,youneedinstallorbuildthreeopensorucelibraries:Libevent,cJSON......
  • Nameko,gRPC,Spring Cloud区别?
    他们都是基于RPC这个思想理念,弄出来的具体的框架。我咨询chatGPT4,他们在下面11个不同维度的区别是什么?Certainly,here'sacomparisonintheformofaMarkdowntable:特性NamekogRPCSpringCloud开发语言Python支持多种编程语言(如Python、Java、Go、C#)Jav......
  • VUE框架CLI组件化组件的自定义事件和子组件向父组件传递数据的实现------VUE框架
    <template> <div> <!--内置函数的实现步骤--> <!--提供事件源,给事件源绑定事件,编写回调函数,将回调函数和事件进行绑定--> <!--等待事件的触发,事件触发执行回调函数--> <!--组件的自定义事件实现步骤--> <button@click="Hello()">你好</button> <!--给Us......