client
package main
import (
"context"
"fmt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv/v1.17.0"
"go_otel/2go_grpc/grpc_proto/hello_grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"os"
"time"
)
const (
service = "grpc_client"
environment = "dev"
id = 2
)
func tracerProvider() (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://192.168.252.128:14268/api/traces")))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithBatcher(exp),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
),
),
)
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
func main() {
//创建traceProvider
tp, err := tracerProvider()
if err != nil {
fmt.Println("NewTracerProvider err:", err)
os.Exit(1)
}
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()),
//Unary拦截器
grpc.WithChainUnaryInterceptor(
//openTelemetry 链路追踪
otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tp)),
),
//Stream拦截器
grpc.WithChainStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// Pre-processing logic
s := time.Now()
cs, err := streamer(ctx, desc, cc, method, opts...)
// Post-processing logic
log.Printf("method: %s, latency: %s\n", method, time.Now().Sub(s))
return cs, err
}),
)
defer func() {
println("关闭TracerProvider。所有注册的跨度处理器都会按照它们注册的顺序关闭,并释放所有持有的计算资源。")
if err := tp.Shutdown(context.Background()); err != nil {
panic(err)
}
}()
if err != nil {
log.Fatalf("connection failed,err:%s", err)
}
// 初始化客户端
client := hello_grpc.NewHelloServiceClient(conn)
result, err := client.SayHello(context.Background(), &hello_grpc.HelloRequest{
Name: "client",
Message: "hello",
})
fmt.Println(result, err)
}
server
package main
import (
"context"
"fmt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go_otel/2go_grpc/grpc_proto/hello_grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"log"
"net"
)
const (
service = "grpc_server"
environment = "dev"
id = 2
)
var tracer = otel.Tracer(service)
// HelloServer 得有一个结构体,需要实现这个服务的全部方法,叫什么名字不重要
type HelloServer struct {
}
func (HelloServer) SayHello(ctx context.Context, request *hello_grpc.HelloRequest) (*hello_grpc.HelloResponse, error) {
fmt.Println("入参:", request.Name, request.Message)
_, span := tracer.Start(ctx, "grpc")
span.SetAttributes(attribute.Key("request.Name").String(request.Name))
span.SetAttributes(attribute.Key("request.Message").String(request.Message))
defer span.End()
return &hello_grpc.HelloResponse{
Name: "server",
Message: "hello " + request.Name,
}, nil
}
func tracerProvider() (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://192.168.252.128:14268/api/traces")))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithBatcher(exp),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
),
),
)
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
func main() {
tp, err := tracerProvider()
if err != nil {
log.Fatal(err)
}
// Cleanly shutdown and flush telemetry when the application exits.
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}()
// grpc
listen, err := net.Listen("tcp", ":8080")
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
// 创建一个gRPC服务器实例。
s := grpc.NewServer(
//引入grpc-middleware定义的拦截器
grpc.ChainUnaryInterceptor(
//openTelemetry 链路追踪
otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tp)),
),
)
server := HelloServer{}
// 将server结构体注册为gRPC服务。
hello_grpc.RegisterHelloServiceServer(s, &server)
fmt.Println("grpc server running :8080")
// 开始处理客户端请求。
err = s.Serve(listen)
}
hello.proto
syntax = "proto3"; // 指定proto版本
package hello_grpc; // 指定默认包名
// 指定golang包名
option go_package = "/hello_grpc";
//定义rpc服务
service HelloService {
// 定义函数
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
// HelloRequest 请求内容
message HelloRequest {
string name = 1;
string message = 2;
}
// HelloResponse 响应内容
message HelloResponse{
string name = 1;
string message = 2;
}