微服务之链路追踪

基本概念

链路追踪是分布式系统下的一个概念,它的目的就是要解决上面所提出的问题,也就是将一次分布式请求还原成调用链路,将一次分布式请求的调用情况集中展示,比如,各个服务节点上的耗时、请求具体到达哪台机器上、每个服务节点的请求状态等等。

img

链路追踪产品架构

Tracing Analysis Workflow

链路追踪的主要工作流程如下:

  1. 客户端的应用程序通过集成链路追踪的多语言客户端SDK上报服务调用数据。链路追踪支持多种开源社区的SDK,且支持OpenTracing标准。
  2. 数据上报至链路追踪控制台后,链路追踪组件进行实时聚合计算和持久化,形成链路明细、性能总览、实时拓扑等监控数据。您可以据此进行问题排查与诊断。
  3. 调用链数据可对接下游阿里云产品,例如LogSearch、CloudMonitor、MaxCompute等,用于离线分析、报警等场景。

OpenTracing

为了解决不同的分布式追踪系统 API 不兼容的问题,诞生了 OpenTracing 规范。
OpenTracing 是一个轻量级的标准化层,它位于应用程序/类库追踪或日志分析程序之间。

img

OpenTracing 的优势

  • OpenTracing 已进入 CNCF,正在为全球的分布式追踪,提供统一的概念和数据标准。
  • OpenTracing 通过提供平台无关、厂商无关的 API,使得开发人员能够方便的添加(或更换)追踪系统的实现。

OpenTracing 数据模型

OpenTracing 中的 Trace(调用链)通过归属于此调用链的 Span 来隐性的定义。
特别说明,一条 Trace(调用链)可以被认为是一个由多个 Span 组成的有向无环图(DAG图),SpanSpan 的关系被命名为 References

例如:下面的示例 Trace 就是由8个 Span 组成:

img

有些时候,使用下面这种,基于时间轴的时序图可以更好的展现 Trace(调用链):

img

每个 Span 包含以下的状态:(译者注:由于这些状态会反映在 OpenTracing API 中,所以会保留部分英文说明)

  • An operation name,操作名称
  • A start timestamp,起始时间
  • A finish timestamp,结束时间
  • Span Tag,一组键值对构成的 Span 标签集合。键值对中,键必须为 string,值可以是字符串,布尔,或者数字类型。
  • Span Log,一组 span 的日志集合。每次 log 操作包含一个键值对,以及一个时间戳。

键值对中,键必须为 string,值可以是任意类型。
但是需要注意,不是所有的支持 OpenTracing 的 Tracer,都需要支持所有的值类型。

  • SpanContext,Span 上下文对象 (下面会详细说明)
  • References(Span间关系),相关的零个或者多个 Span(Span 间通过 SpanContext 建立这种关系)

每一个 SpanContext 包含以下状态:

  • 任何一个 OpenTracing 的实现,都需要将当前调用链的状态(例如:trace 和 span 的 id),依赖一个独特的 Span 去跨进程边界传输
  • Baggage Items,Trace 的随行数据,是一个键值对集合,它存在于 trace 中,也需要跨进程边界传输

更多关于 OpenTracing 数据模型的知识,请参考 OpenTracing语义标准

概念和术语

Traces

一个trace代表一个潜在的,分布式的,存在并行数据或并行执行轨迹(潜在的分布式、并行)的系统。一个trace可以认为是多个span的有向无环图(DAG)。

Spans

一个span代表系统中具有开始时间和执行时长的逻辑运行单元。span之间通过嵌套或者顺序排列建立逻辑因果关系。

Operation Names

每一个span都有一个操作名称,这个名称简单,并具有可读性高。(例如:一个RPC方法的名称,一个函数名,或者一个大型计算过程中的子任务或阶段)。span的操作名应该是一个抽象、通用的标识,能够明确的、具有统计意义的名称;更具体的子类型的描述,请使用Tags

例如,假设一个获取账户信息的span会有如下可能的名称:

操作名 指导意见
get 太抽象
get_account/792 太明确
get_account 正确的操作名,关于account_id=792的信息应该使用Tag操作
Inter-Span References

一个span可以和一个或者多个span间存在因果关系。OpenTracing定义了两种关系:ChildOfFollowsFrom这两种引用类型代表了子节点和父节点间的直接因果关系。未来,OpenTracing将支持非因果关系的span引用关系。(例如:多个span被批量处理,span在同一个队列中,等等)

ChildOf 引用: 一个span可能是一个父级span的孩子,即”ChildOf”关系。在”ChildOf”引用关系下,父级span某种程度上取决于子span。下面这些情况会构成”ChildOf”关系:

  • 一个RPC调用的服务端的span,和RPC服务客户端的span构成ChildOf关系
  • 一个sql insert操作的span,和ORM的save方法的span构成ChildOf关系
  • 很多span可以并行工作(或者分布式工作)都可能是一个父级的span的子项,他会合并所有子span的执行结果,并在指定期限内返回

下面都是合理的表述一个”ChildOf”关系的父子节点关系的时序图。

1
2
3
4
5
6
7
8
9
[-Parent Span---------]
[-Child Span----]

[-Parent Span--------------]
[-Child Span A----]
[-Child Span B----]
[-Child Span C----]
[-Child Span D---------------]
[-Child Span E----]

FollowsFrom 引用: 一些父级节点不以任何方式依然他们子节点的执行结果,这种情况下,我们说这些子span和父span之间是”FollowsFrom”的因果关系。”FollowsFrom”关系可以被分为很多不同的子类型,未来版本的OpenTracing中将正式的区分这些类型

下面都是合理的表述一个”FollowFrom”关系的父子节点关系的时序图。

1
2
3
4
5
6
7
8
9
[-Parent Span-]  [-Child Span-]


[-Parent Span--]
[-Child Span-]


[-Parent Span-]
[-Child Span-]

SpanContext

每个span必须提供方法访问SpanContext。SpanContext代表跨越进程边界,传递到下级span的状态。(例如,包含<trace_id, span_id, sampled>元组),并用于封装Baggage (关于Baggage的解释,请参考下文)。SpanContext在跨越进程边界,和在追踪图中创建边界的时候会使用。(ChildOf关系或者其他关系,参考Span间关系 )。

主要功能

链路追踪的主要功能如下:

  • 分布式调用链查询和诊断:追踪分布式架构中的所有微服务用户请求,并将它们汇总成分布式调用链。
  • 应用性能实时汇总:通过追踪整个应用程序的用户请求,来实时汇总组成应用程序的单个服务和资源。
  • 分布式拓扑动态发现:用户的所有分布式微服务应用和相关PaaS产品可以通过链路追踪收集到分布式调用信息。
  • 多语言开发程序接入:基于OpenTracing标准,兼容开源社区,例如Jaeger、Zipkin。
  • 丰富的下游对接场景:收集的链路可直接用于日志分析,且可对接到MaxCompute等下游分析平台。

选型

image-20221016002632262

Go技术栈下,推荐使用Go原生的Jaeger

Jaeger

官方文档比较齐全:Introduction

中文文档:https://rocdu.gitbook.io/jaeger-doc-zh/

安装

安装使用Docker安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
docker run -d --name jaeger \
-e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
-e COLLECTOR_OTLP_ENABLED=true \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 4317:4317 \
-p 4318:4318 \
-p 14250:14250 \
-p 14268:14268 \
-p 14269:14269 \
-p 9411:9411 \
jaegertracing/all-in-one:1.38

安装成功后,通过 http://127.0.0.1:16686/search打开管理页面

image-20221016160115057

架构

Architecture

通过Go操作Jagger

https://github.com/jaegertracing/jaeger-client-go

示例:https://github.com/jaegertracing/jaeger-client-go/blob/master/config/example_test.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
"log"
"time"

"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-lib/metrics"

jaegercfg "github.com/uber/jaeger-client-go/config"
jaegerlog "github.com/uber/jaeger-client-go/log"
)

func main() {
cfg := jaegercfg.Configuration{
// 采样配置
Sampler: &jaegercfg.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
// 采样报告服务器地址
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: "172.16.211.17:6831",
},
ServiceName: "mitaka_test",
}

// Example logger and metrics factory. Use github.com/uber/jaeger-client-go/log
// and github.com/uber/jaeger-lib/metrics respectively to bind to real logging and metrics
// frameworks.
jLogger := jaegerlog.StdLogger
jMetricsFactory := metrics.NullFactory

// Initialize tracer with a logger and a metrics factory
// 注册全局tracer
closer, err := cfg.InitGlobalTracer(
"mitaka_test",
jaegercfg.Logger(jLogger),
jaegercfg.Metrics(jMetricsFactory),
)
// 或者注册局部tracer,使用全局tracer时需要注意,一定需要首选注册,然后再使用(放在main中初始化),否则后续使用的tracer都是空tracer,无法与其他进程中的tracer关联起来。
//tracer, closer, err := cfg.NewTracer(jaegercfg.Logger(jLogger),
// jaegercfg.Metrics(jMetricsFactory))
if err != nil {
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
return
}
defer closer.Close()

tracer := opentracing.GlobalTracer()
span := tracer.StartSpan("go-grpc-web")
defer span.Finish()
time.Sleep(time.Second)
}

选择对应servicemitaka_test

image-20221016160207362

点击可查看详情

image-20221016160308755

span嵌套

1
2
3
4
5
6
7
8
9
10
11
12
parentSpan := tracer.StartSpan("main")
defer parentSpan.Finish()

// funcA是main的子span
spanA := tracer.StartSpan("funcA", opentracing.ChildOf(parentSpan.Context()))
time.Sleep(time.Second)
spanA.Finish()

// funcB是main的子span
spanB := tracer.StartSpan("funcB", opentracing.ChildOf(parentSpan.Context()))
time.Sleep(time.Second)
spanB.Finish()

image-20221016162504150

应用到gRPC中

结合midware,这里推荐使用全局tracer

在go grpc midware官方文档中可以找到支持监控的midware

Monitoring

  • grpc_prometheus - Prometheus client-side and server-side monitoring middleware
  • otgrpc - OpenTracing client-side and server-side interceptors
  • grpc_opentracing - OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags
  • otelgrpc - OpenTelemetry client-side and server-side interceptors

其中支持OpenTracing的插件有

1
2
https://github.com/grpc-ecosystem/grpc-opentracing/tree/master/go/otgrpc
https://github.com/grpc-ecosystem/go-grpc-middleware/tree/master/tracing/opentracing

这里使用性能更高的 otgrpc。(官方文旦中使用的是grpc_opentracing

从代码上看,otgrpcgithub.com/opentracing-contrib/go-grpc差不多,但是前者stars更多

安装

1
go get github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
"bytes"
"context"
"fmt"
"io"
"log"
"time"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
jaegerlog "github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-lib/metrics"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"gostudy/920/minegrpc"
)

func main() {
// 1s 超时
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

// 注册全局tracer
closer := initTracer()
defer closer.Close()

parentSpan := opentracing.GlobalTracer().StartSpan("main")
defer parentSpan.Finish()

dial, err := grpc.Dial("127.0.0.1:8080",
// 配置tracer拦截器
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer dial.Close()

client := minegrpc.NewGreeterClient(dial)

ctx = opentracing.ContextWithSpan(ctx, parentSpan)
reply, err := client.SayHello(ctx, &minegrpc.HelloRequest{Name: "mitaka"})
if err != nil {
if e, ok := status.FromError(err); ok {
fmt.Println(e.Code())
fmt.Println(e.Message())
}
}
fmt.Printf("reply: %v\n", reply)
}

func initTracer() io.Closer {
cfg := jaegercfg.Configuration{
// 采样配置
Sampler: &jaegercfg.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
// 采样报告服务器地址
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: "192.168.51.16:6831",
},
ServiceName: "mitaka_client",
}

jLogger := jaegerlog.StdLogger
jMetricsFactory := metrics.NullFactory

// 注册全局tracer
closer, err := cfg.InitGlobalTracer(
"mitaka_client",
jaegercfg.Logger(jLogger),
jaegercfg.Metrics(jMetricsFactory),
)
if err != nil {
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
return io.NopCloser(bytes.NewReader(nil))
}
return closer
}

结合HTTP请求使用tracer:一般而言,前后端分离架构上,前端一般通过HTTP服务请求后端接口,后端微服务之间使用gRPC通信,作为入口是HTTP服务的接口,那么tracer可以在midware时生成:

1
2
3
4
5
6
7
8
9
func Tracer() gin.HandlerFunc {
return func(c *gin.Context) {
span := opentracing.StartSpan(c.Request.URL.String())
defer span.Finish()
c.Set("tracer", opentracing.GlobalTracer())
c.Set("span", span)
c.Next()
}
}

使用时,则需要注意,span可以通过context注入

1
2
3
4
if i, ok := c.Get("span"); ok {
span := i.(opentracing.Span)
ctx = opentracing.ContextWithSpan(ctx, span)
}

从官网的例子看,官网推荐使用http客户端和服务端的用法

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func makeSomeRequest(ctx context.Context) ... {
if span := opentracing.SpanFromContext(ctx); span != nil {
httpClient := &http.Client{}
httpReq, _ := http.NewRequest("GET", "http://myservice/", nil)

// Transmit the span's TraceContext as HTTP headers on our
// outbound request.
// tracer注入到header中
opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(httpReq.Header))

resp, err := httpClient.Do(httpReq)
...
}
...
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
var serverSpan opentracing.Span
appSpecificOperationName := ...
// 将tracer从header中反序列化出来
wireContext, err := opentracing.GlobalTracer().Extract(
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header))
if err != nil {
// Optionally record something about err here
}

// Create the span referring to the RPC client if available.
// If wireContext == nil, a root span will be created.
serverSpan = opentracing.StartSpan(
appSpecificOperationName,
ext.RPCServerOption(wireContext))

defer serverSpan.Finish()

ctx := opentracing.ContextWithSpan(context.Background(), serverSpan)
...
}

通过源码可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func OpenTracingClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryClientInterceptor {
otgrpcOpts := newOptions()
otgrpcOpts.apply(optFuncs...)
return func(
ctx context.Context,
method string,
req, resp interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
var err error
var parentCtx opentracing.SpanContext
// 获取parent Span
if parent := opentracing.SpanFromContext(ctx); parent != nil {
parentCtx = parent.Context()
}
if otgrpcOpts.inclusionFunc != nil &&
!otgrpcOpts.inclusionFunc(parentCtx, method, req, resp) {
return invoker(ctx, method, req, resp, cc, opts...)
}
clientSpan := tracer.StartSpan(
method,
opentracing.ChildOf(parentCtx),
ext.SpanKindRPCClient,
gRPCComponentTag,
)
defer clientSpan.Finish()
// 将tracer和span保存到ctx的元数据中
ctx = injectSpanContext(ctx, tracer, clientSpan)
if otgrpcOpts.logPayloads {
clientSpan.LogFields(log.Object("gRPC request", req))
}
err = invoker(ctx, method, req, resp, cc, opts...)
if err == nil {
if otgrpcOpts.logPayloads {
clientSpan.LogFields(log.Object("gRPC response", resp))
}
} else {
SetSpanTags(clientSpan, err, true)
clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
}
if otgrpcOpts.decorator != nil {
otgrpcOpts.decorator(ctx, clientSpan, method, req, resp, err)
}
return err
}
}

span获取方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if parent := opentracing.SpanFromContext(ctx); parent != nil {
parentCtx = parent.Context()
}

func SpanFromContext(ctx context.Context) Span {
val := ctx.Value(activeSpanKey)
if sp, ok := val.(Span); ok {
return sp
}
return nil
}

// 是通过context获取,则这里可以将span注入
func ContextWithSpan(ctx context.Context, span Span) context.Context {
if span != nil {
if tracerWithHook, ok := span.Tracer().(TracerContextWithSpanExtension); ok {
ctx = tracerWithHook.ContextWithSpanHook(ctx, span)
}
}
return context.WithValue(ctx, activeSpanKey, span)
}

从源码中可以看到,tracer使用的是传入的tracer,这里传入global tracer即可。

作为下游gRPC的服务端,配置tracer的拦截器即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package main

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net"
"time"

"github.com/google/uuid"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
jaegerlog "github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-lib/metrics"
"google.golang.org/grpc"

"gostudy/920/minegrpc"
)

type Service struct {
minegrpc.UnimplementedGreeterServer
}

var serviceID string

func (s *Service) SayHello(ctx context.Context, req *minegrpc.HelloRequest) (*minegrpc.HelloReply, error) {
// 其他业务逻辑使用span
parentSpan := opentracing.SpanFromContext(ctx)

spanA := opentracing.StartSpan("logic do something", opentracing.ChildOf(parentSpan.Context()))
fmt.Println("logic A do something")
time.Sleep(100 * time.Millisecond)
spanA.Finish()

spanB := opentracing.StartSpan("logic do something", opentracing.ChildOf(parentSpan.Context()))
fmt.Println("logic B do something")
time.Sleep(100 * time.Millisecond)
spanB.Finish()

return &minegrpc.HelloReply{
Message: fmt.Sprintf("i am service %s", serviceID),
}, nil
}

func main() {
log.SetFlags(log.Lshortfile)
tracerClose := initTracer()
defer tracerClose.Close()

newUUID, _ := uuid.NewUUID()
serviceID = newUUID.String()

listener, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
defer listener.Close()

opentracingOpts := []otgrpc.Option{
otgrpc.IncludingSpans(func(parentSpanCtx opentracing.SpanContext, method string, req, resp interface{}) bool {
if method == "/grpc.health.v1.Health/Check" {
return false
}
return true
}),
}

s := grpc.NewServer(grpc.UnaryInterceptor(
// 服务端拦截器,需要注意,拦截器要在GlobalTracer注册之后,不然使用的就是一个nil tracer
otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer(), opentracingOpts...)),
)
minegrpc.RegisterGreeterServer(s, new(Service)) // 注册calc服务

log.Fatal(s.Serve(listener))
}

func initTracer() io.Closer {
cfg := jaegercfg.Configuration{
// 采样配置
Sampler: &jaegercfg.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
// 采样报告服务器地址
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: "192.168.51.16:6831",
},
ServiceName: "mitaka_server",
}

jLogger := jaegerlog.StdLogger
jMetricsFactory := metrics.NullFactory

// 注册全局tracer
closer, err := cfg.InitGlobalTracer(
"mitaka_server",
jaegercfg.Logger(jLogger),
jaegercfg.Metrics(jMetricsFactory),
)
if err != nil {
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
return io.NopCloser(bytes.NewReader(nil))
}
return closer
}

客户端执行

1
2
3
4
5
6
2022/10/20 15:46:38 Initializing logging reporter
2022/10/20 15:46:38 Reporting span 70f332d6ce6d5b42:36bfff52065de34e:70f332d6ce6d5b42:1
reply: message:"i am service 52dc416e-504b-11ed-9d12-be5e6078b62c"
2022/10/20 15:46:38 Reporting span 70f332d6ce6d5b42:70f332d6ce6d5b42:0000000000000000:1
2022/10/20 15:46:38 DEBUG: closing tracer
2022/10/20 15:46:38 DEBUG: closing reporter

服务端

1
2
3
4
5
6
logger.go:47: Initializing logging reporter
logic A do something
logger.go:47: Reporting span 70f332d6ce6d5b42:365a31e17a1c09fa:783980e40ebd1267:1
logic B do something
logger.go:47: Reporting span 70f332d6ce6d5b42:1b6b52d51c122cc5:783980e40ebd1267:1
logger.go:47: Reporting span 70f332d6ce6d5b42:783980e40ebd1267:36bfff52065de34e:1

格式为 traceID:spanID:parentID:flags

image-20221020154751742

源码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func OpenTracingServerInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryServerInterceptor {
otgrpcOpts := newOptions()
otgrpcOpts.apply(optFuncs...)
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
// 从元数据中获取span信息
spanContext, err := extractSpanContext(ctx, tracer)
if err != nil && err != opentracing.ErrSpanContextNotFound {
// TODO: establish some sort of error reporting mechanism here. We
// don't know where to put such an error and must rely on Tracer
// implementations to do something appropriate for the time being.
}
if otgrpcOpts.inclusionFunc != nil &&
!otgrpcOpts.inclusionFunc(spanContext, info.FullMethod, req, nil) {
return handler(ctx, req)
}
serverSpan := tracer.StartSpan(
// grpc调用,span名称是grpc全称,后续可以用于过滤一些请求,不记录到trace中
info.FullMethod,
ext.RPCServerOption(spanContext),
gRPCComponentTag,
)
defer serverSpan.Finish()

ctx = opentracing.ContextWithSpan(ctx, serverSpan)
if otgrpcOpts.logPayloads {
serverSpan.LogFields(log.Object("gRPC request", req))
}
// 处理请求
resp, err = handler(ctx, req)
if err == nil {
if otgrpcOpts.logPayloads {
serverSpan.LogFields(log.Object("gRPC response", resp))
}
} else {
SetSpanTags(serverSpan, err, false)
serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
}
if otgrpcOpts.decorator != nil {
otgrpcOpts.decorator(ctx, serverSpan, info.FullMethod, req, resp, err)
}
return resp, err
}
}

所有的span是服务和服务之间传递,实际上,span可以在服务内部使用

1
2
3
parentSpan := opentracing.SpanFromContext(ctx)
span := opentracing.GlobalTracer().StartSpan("do something", opentracing.ChildOf(parentSpan.Context()))
defer span.Finish()

应用到http

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package main

import (
"bytes"
"context"
"io"
"log"
"net/http"
"time"

"github.com/gin-gonic/gin"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
jaegerlog "github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-lib/metrics"
"golang.org/x/sync/errgroup"
)

func main() {
closer := initTracer()
defer closer.Close()

engine := gin.Default()
// 通过midware实现tracer功能
engine.Use(tracerMidware())
engine.GET("/hello/http", func(c *gin.Context) {
var ctx context.Context
spanCtx, ok := c.Get("span_ctx")
if ok {
ctx = spanCtx.(context.Context)
}
parentSpan := opentracing.SpanFromContext(ctx)

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
spanA := opentracing.StartSpan("do something A", opentracing.ChildOf(parentSpan.Context()))
time.Sleep(100 * time.Millisecond)
spanA.Finish()
return nil
})
eg.Go(func() error {
spanA := opentracing.StartSpan("do something A", opentracing.ChildOf(parentSpan.Context()))
time.Sleep(100 * time.Millisecond)
spanA.Finish()
return nil
})
_ = eg.Wait()
c.JSON(http.StatusOK, "I am gin server")
})

engine.Run(":8080")
}

func tracerMidware() gin.HandlerFunc {
return func(c *gin.Context) {
var span opentracing.Span
operationName := c.Request.URL.Path
// 从header中获取tracer信息
wireContext, err := opentracing.GlobalTracer().Extract(
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(c.Request.Header))
if err == nil {
// 获取到,则使用客户端传入的span
span = opentracing.StartSpan(
operationName, ext.RPCServerOption(wireContext))
} else {
// 如果没有获取到,则开启一个root span
span = opentracing.StartSpan(operationName)
}
defer span.Finish()
ctx := opentracing.ContextWithSpan(context.Background(), span)
c.Set("span_ctx", ctx)
c.Next()
}
}

func initTracer() io.Closer {
cfg := jaegercfg.Configuration{
// 采样配置
Sampler: &jaegercfg.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
// 采样报告服务器地址
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: "192.168.51.16:6831",
},
ServiceName: "mitaka_server",
}

jLogger := jaegerlog.StdLogger
jMetricsFactory := metrics.NullFactory

// 注册全局tracer
closer, err := cfg.InitGlobalTracer(
"mitaka_server",
jaegercfg.Logger(jLogger),
jaegercfg.Metrics(jMetricsFactory),
)
if err != nil {
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
return io.NopCloser(bytes.NewReader(nil))
}
return closer
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package main

import (
"bytes"
"fmt"
"io"
"log"
"net/http"

"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
jaegerlog "github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-lib/metrics"
)

func main() {
closer := initTracer()
defer closer.Close()

span := opentracing.StartSpan("client make a requet")
defer span.Finish()

httpClient := &http.Client{}
httpReq, _ := http.NewRequest("GET", "http://127.0.0.1:8080/hello/http?a=v", nil)

// 发送请求时,将tracer注入到header中
opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(httpReq.Header))

resp, err := httpClient.Do(httpReq)
if err != nil {
log.Fatal(err)
}
data, _ := io.ReadAll(resp.Body)
fmt.Println(string(data))
}

func initTracer() io.Closer {
cfg := jaegercfg.Configuration{
// 采样配置
Sampler: &jaegercfg.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
// 采样报告服务器地址
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: "192.168.51.16:6831",
},
ServiceName: "mitaka_client",
}

jLogger := jaegerlog.StdLogger
jMetricsFactory := metrics.NullFactory

// 注册全局tracer
closer, err := cfg.InitGlobalTracer(
"mitaka_client",
jaegercfg.Logger(jLogger),
jaegercfg.Metrics(jMetricsFactory),
)
if err != nil {
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
return io.NopCloser(bytes.NewReader(nil))
}
return closer
}

过滤

当一些探测请求发过来时,默认情况下也会记录到链路追踪,这里就可以使用过滤器过滤掉,例如健康检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 通过option实现
func OpenTracingServerInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryServerInterceptor {}
type Option func(o *options)
// 通过过滤功能
type SpanInclusionFunc func(
parentSpanCtx opentracing.SpanContext,
method string,
req, resp interface{}) bool

// IncludingSpans binds a IncludeSpanFunc to the options
func IncludingSpans(inclusionFunc SpanInclusionFunc) Option {
return func(o *options) {
o.inclusionFunc = inclusionFunc
}
}

具体实现

1
2
3
4
5
6
7
8
9
10
11
12
opentracingOpts := []otgrpc.Option{
otgrpc.IncludingSpans(func(parentSpanCtx opentracing.SpanContext, method string, req, resp interface{}) bool {
if method == "/grpc.health.v1.Health/Check" {
return false
}
return true
}),
}

s := grpc.NewServer(grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer(), opentracingOpts...)),
)

推荐阅读:

什么是链路追踪?分布式系统如何实现链路追踪?

链路追踪Tracing Analysis

链路追踪

链路追踪–选型/对比/工具/方案/分布式

opentracing文档中文版 ( 翻译 ) 吴晟

分布式服务调用链路追踪——方案选型

jaeger

www.jaegertracing.io/