分布式追踪 SkyWalking 源码分析四 Agent 收集 && 发送 trace数据
分布式链路追踪系统,链路的追踪大体流程如下:
- Agent 收集 Trace 数据。
- Agent 发送 Trace 数据给 Collector 。
- Collector 接收 Trace 数据。
- Collector 存储 Trace 数据到存储器,例如,数据库。
org.skywalking.apm.agent.core.context.trace.TraceSegment
,是一次分布式链路追踪( Distributed Trace ) 的一段。
- 一条 TraceSegment ,用于记录所在线程( Thread )的链路。
- 一次分布式链路追踪,可以包含多条 TraceSegment ,因为存在跨进程( 例如,RPC 、MQ 等等),或者垮线程( 例如,并发执行、异步回调等等 )。
traceSegmentId
属性,TraceSegment 的编号,全局唯一
spans
属性,包含的 Span 数组。这是 TraceSegment 的主体,总的来说,TraceSegment 是 Span 数组的封装。
我们先来看看一个爸爸的情况,常见于 RPC 调用。例如,【服务 A】调用【服务 B】时,【服务 B】新建一个 TraceSegment 对象:
- 将自己的
refs
指向【服务 A】的 TraceSegment 。 - 将自己的
relatedGlobalTraces
设置为 【服务 A】的 DistributedTraceId 对象。
2.1 ID
org.skywalking.apm.agent.core.context.ids.ID
,编号。从类的定义上,这是一个通用的编号,由三段整数组成。
目前使用 GlobalIdGenerator 生成,作为全局唯一编号。属性如下:
-
part1
属性,应用实例编号。 -
part2
属性,线程编号。 -
part3
属性,时间戳串,生成方式为${时间戳} * 10000 + 线程自增序列([0, 9999])
。例如:15127007074950012 。具体生成方法的代码,在 GlobalIdGenerator 中详细解析。 -
encoding
属性,编码后的字符串。格式为"${part1}.${part2}.${part3}"
。例如,"12.35.15127007074950000"
。- 使用
#encode()
方法,编码编号。
- 使用
-
isValid
属性,编号是否合法。- 使用
ID(encodingString)
构造方法,解析字符串,生成 ID 。
- 使用
org.skywalking.apm.agent.core.context.ids.NewDistributedTraceId ,新建的分布式链路追踪编号。当全局链路追踪开始,创建 TraceSegment 对象的过程中,会调用 DistributedTraceId()
构造方法,创建 DistributedTraceId 对象。该构造方法内部会调用 GlobalIdGenerator#generate()
方法,创建 ID 对象。
#setOperationId(operationId)
方法,设置操作编号。考虑到操作名是字符串,Agent 发送给 Collector 占用流量较大。因此,Agent 会将操作注册到 Collector ,生成操作编号。
2.2.1 Tag
2.2.1.1 AbstractTag
org.skywalking.apm.agent.core.context.tag.AbstractTag<T>
,标签抽象类。注意,这个类的用途是将标签属性设置到 Span 上,或者说,它是设置 Span 的标签的工具类。代码如下:
-
key
属性,标签的键。 -
#set(AbstractSpan span, T tagValue)
抽象方法,设置 Span 的标签键key
的值为tagValue
关于span的类继承图
Span 只有三种实现类:
- EntrySpan :入口 Span
- LocalSpan :本地 Span
- ExitSpan :出口 Span
2.2.2.2.1 EntrySpan
org.skywalking.apm.agent.core.context.trace.EntrySpan
,实现 StackBasedTracingSpan 抽象类,入口 Span ,用于服务提供者( Service Provider ) ,例如 Tomcat 。
那么为什么 EntrySpan 继承 StackBasedTracingSpan ?
例如,我们常用的 SprintBoot 场景下,Agent 会在 SkyWalking 插件在 Tomcat 定义的方法切面,创建 EntrySpan 对象,也会在 SkyWalking 插件在 SpringMVC 定义的方法切面,创建 EntrySpan 对象。那岂不是出现两个 EntrySpan ,一个 TraceSegment 出现了两个入口 Span ?
答案是当然不会!Agent 只会在第一个方法切面,生成 EntrySpan 对象,第二个方法切面,栈深度 + 1。这也是上面我们看到的 #finish(TraceSegment)
方法,只在栈深度为零时,出栈成功。通过这样的方式,保持一个 TraceSegment 有且仅有一个 EntrySpan 对象。
对新进入的方法切面,就把栈深度+1
而对于StackBasedTracingSpan的finish方法,把栈深度减少
2.2.2.2.2 ExitSpan
org.skywalking.apm.agent.core.context.trace.ExitSpan
,继承 StackBasedTracingSpan 抽象类,出口 Span ,用于服务消费者( Service Consumer ),例如 HttpClient 、MongoDBClient 。
那么为什么 ExitSpan 继承 StackBasedTracingSpan ?
例如,我们可能在使用的 Dubbox 场景下,【Dubbox 服务 A】使用 HTTP 调用【Dubbox 服务 B】时,实际过程是,【Dubbox 服务 A】=》【HttpClient】=》【Dubbox 服务 B】。Agent 会在【Dubbox 服务 A】创建 ExitSpan 对象,也会在 【HttpClient】创建 ExitSpan 对象。那岂不是一次出口,出现两个 ExitSpan ?
答案是当然不会!Agent 只会在【Dubbox 服务 A】,生成 EntrySpan 对象,第二个方法切面,栈深度 + 1。这也是上面我们看到的 #finish(TraceSegment)
方法,只在栈深度为零时,出栈成功。通过这样的方式,保持一次出口有且仅有一个 ExitSpan 对象。
当然,一个 TraceSegment 会有多个 ExitSpan 对象 ,例如【服务 A】远程调用【服务 B】,然后【服务 A】再次远程调用【服务 B】,或者然后【服务 A】远程调用【服务 C】。
2.3 TraceSegmentRef
org.skywalking.apm.agent.core.context.trace.TraceSegmentRef
,TraceSegment 指向,通过 traceSegmentId
和 spanId
属性,指向父级 TraceSegment 的指定 Span 。
3. Context
在 「2. Trace」 中,我们看了 Trace 的数据结构,本小节,我们一起来看看 Context 是怎么收集 Trace 数据的。
3.1 ContextManager
org.skywalking.apm.agent.core.context.ContextManager
,实现了 BootService 、TracingContextListener 、IgnoreTracerContextListener 接口,链路追踪上下文管理器。
CONTEXT
静态属性,线程变量,存储 AbstractTracerContext 对象。为什么是线程变量呢?
一个 TraceSegment 对象,关联到一个线程,负责收集该线程的链路追踪数据,因此使用线程变量。
而一个 AbstractTracerContext 会关联一个 TraceSegment 对象,ContextManager 负责获取、创建、销毁 AbstractTracerContext 对象。
#getOrCreate(operationName, forceSampling)
静态方法,获取 AbstractTracerContext 对象。若不存在,进行创建。
- 要需要收集 Trace 数据的情况下,创建 TracingContext 对象。
- 不需要收集 Trace 数据的情况下,创建 IgnoredTracerContext 对象。
在下面的 #createEntrySpan(...)
、#createLocalSpan(...)
、#createExitSpan(...)
等等方法中,都会调用 AbstractTracerContext 提供的方法。这些方法的代码,我们放在 「3.2 AbstractTracerContext」 一起解析,保证流程的整体性。
另外,ContextManager 封装了所有 AbstractTracerContext 提供的方法,从而实现,外部调用者,例如 SkyWalking 的插件,只调用 ContextManager 的方法,而不调用 AbstractTracerContext 的方法。
创建出traceContext
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
核心类实现TracingContext
创建EntrySpan
父span存在,就直接start;父span不存在,就新建一个EntrySpan
创建exitSpan,原理类似
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
结束span
调用pop弹栈,然后调用finish,结束本线程的traceSegment
3.2.3.3 传输
org.skywalking.apm.agent.core.context.CarrierItem
,传输载体项。代码如:
-
headKey
属性,Header 键。 -
headValue
属性,Header 值。 -
next
属性,下一个项。
CarrierItem 有两个子类:
- CarrierItemHead :Carrier 项的头( Head ),即首个元素。
-
SW3CarrierItem :
header = sw3
,用于传输 ContextCarrier 。
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Agent发送trace数据
Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent 写入内存消息队列,后台线程【异步】发送给 Collector 。
核心类为TraceSegmentServiceClient,负责将 TraceSegment 异步发送到 Collector
核心方法consume
1.判断状态是Connected
2.开启一个观察器 upstreamSegmentStreamObserver
3.循环data,然后转换并且把这个upstreamSegment,发送到collector