源码角度了解Skywalking之服务端OAP对Trace的处理
从前几篇的文章我们知道Skywalking对Trace信息进行生成收集后,将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送给OAP服务端,服务端处理对应的模块是skywalking-trace-receiver-plugin模块
TraceModuleProvider向GRPCHandlerRegister中添加处理器TraceSegmentReportServiceHandler
接收Agent数据
TraceSegmentReportServiceHandler的collect()方法接收Agent的数据,调用SegmentParseV2.Producer的send()方法发送
SegmentParseV2.Producer的send()方法:
public void send(UpstreamSegment segment, SegmentSource source) {
SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
segmentParse.parse(new BufferData<>(segment), source);
}
- 创建SegmentParseV2解析器
- 调用SegmentParseV2的parse()方法
SegmentParseV2的parse()方法:
- 创建SpanListener集合
- 获取UpstreamSegment对象
- 获取UpstreamSegment对象中关联的所有的TraceID
- 如果UpstreamSegment中的SegmentObject实例为空,就解析UpstreamSegment实例得到SegmentObject对象进行填充
- 重新检查段信息是否来自文件缓冲区,如果缓存中不存这个段信息对应的服务实例Id,然后返回true
- 把SegmentObject对象封装成SegmentDecorator对象,这里是装饰者模式的体现
- 调用preBuild()方法进行预构建操作,预构建不成功写入缓存文件中,构建成功会通知具体的监听器来进行构建
预构建
SegmentParseV2的preBuild()方法:
- 构建SegmentCoreInfo对象中的segmentId
- 调用notifyGlobalsListener()方法通知这个TraceSegment所关联的TraceId对应的监听器进行解析TraceId,需要采样的进行采样
- 将Segment信息填充到SegmentCoreInfo对象中
- 遍历TraceSegment的所有span,如果是TraceSegment的第一个span,调用notifyFirstListener()方法解析第一个span,将SegmentCoreInfo对象的属性添加到Segment对象中,记录firstEndpointId和firstEndpointName,其实就是对应的请求URL,根据Span类型通知不同的监听类
通知监听器者构建
这个方法遍历所有的span调用SegmentSpanListener的build()方法,设置Segment信息的端点id和端点名后调用SourceReceiverImpl的receive()方法,最终调用SegmentDispatcher的dispatch()方法
SegmentDispatcher的dispatch()方法:
@Override public void dispatch(Segment source) {
SegmentRecord segment = new SegmentRecord();
segment.setSegmentId(source.getSegmentId());
segment.setTraceId(source.getTraceId());
segment.setServiceId(source.getServiceId());
segment.setServiceInstanceId(source.getServiceInstanceId());
segment.setEndpointName(source.getEndpointName());
segment.setEndpointId(source.getEndpointId());
segment.setStartTime(source.getStartTime());
segment.setEndTime(source.getEndTime());
segment.setLatency(source.getLatency());
segment.setIsError(source.getIsError());
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setVersion(source.getVersion());
RecordStreamProcessor.getInstance().in(segment);
}
组装SegmentRecord对象,通过RecordStreamProcessor创建实例,in()方法中调用RecordPersistentWorker来批量异步插入ES数据库中。
总结
这篇文章主要讲解了Skywalking的OAP接收到Agent发来的Trace信息的处理逻辑,入口是TraceSegmentReportServiceHandler的collect()方法,会对Agent封装的UpstreamSegment对象进行反序列化,构建Segment、Span等信息,最终由RecordStreamProcessor来批量异步把SegmentRecord写入ES数据库中
❤️ 感谢大家
如果你觉得这篇内容对你挺有有帮助的话:
- 欢迎关注我❤️,点赞 标签:Trace,对象,方法,source,源码,UpstreamSegment,SegmentParseV2,OAP,segment From: https://blog.51cto.com/u_15460453/5747555