首页 > 编程语言 >源码角度了解Skywalking之建立连接与服务注册

源码角度了解Skywalking之建立连接与服务注册

时间:2022-10-04 19:31:47浏览次数:51  
标签:实例 GRPCChannelManager boot Agent INSTANCE 源码 注册 Skywalking 方法

源码角度了解Skywalking之建立连接与服务注册

这篇文章主要讲一下Agent与OAP建立连接并进行服务注册。

从SkyWalking的启动流程SkyWalkingAgent的premain()中我们了解到调用ServiceManager.INSTANCE.boot()来启动插件服务,并分析了利用SPI机制加载配置的各种插件的BootService,然后依次遍历BootService实例,调用他们的prepare()准备方法、startup();启动方法和onComplete()完成方法。

ServiceManager.INSTANCE.boot()方法:

public void boot() {
        bootedServices = loadAllServices();

        prepare();
        startup();
        onComplete();
    }

我们先看一下prepare()方法,方法中遍历配置的BootService实现类,依次调用他们的prepare()方法,其中就有一个GRPCChannelManager类

GRPCChannelManager

GRPCChannelManager从名字就可以看出来,它是对gprc协议管道的管理类,它只实现了boot()方法

boot()方法

boot()方法的关键代码:

public static long GRPC_CHANNEL_CHECK_INTERVAL = 30;
connectCheckFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override
                public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);

也就是开启了通过线程池开启了开启,每隔30s执行一次,那么这个线程主要是做什么工作的呢?

我们看一下GRPCChannelManager类的run()方法

run()方法

第一步:根据ip和端口与指定OAP实例建立连接

第二步:连接成功后调用notify()方法通知所有的GRPCChannelListener连接成功的状态,GRPCChannelListener会调用statusChanged()方法来改变连接状态的值

第三步:设置重新连接为false

也就是它建立网络连接并且通知其他监听者的类

ServiceAndEndpointRegisterClient

ServiceAndEndpointRegisterClient既实现了BootService接口,又实现了GRPCChannelListener接口,主要的工作就是完成服务和端点的注册

prepare()方法

@Override
    public void prepare() throws Throwable {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);

        INSTANCE_UUID = StringUtil.isEmpty(Config.Agent.INSTANCE_UUID) ? UUID.randomUUID().toString()
            .replaceAll("-", "") : Config.Agent.INSTANCE_UUID;

        SERVICE_INSTANCE_PROPERTIES = new ArrayList<KeyStringValuePair>();

        for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
            SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
                .setKey(key).setValue(Config.Agent.INSTANCE_PROPERTIES.get(key)).build());
        }
    }
  1. 查找GRPCChannelManager实例添加ServiceAndEndpointRegisterClient实例作为监听
  2. 生成实例UUID,先从agent.config文件中读取,如果文件中没有定义生成UUID
  3. 读取agent.config文件中的配置信息添加到SERVICE_INSTANCE_PROPERTIES集合中

boot()方法

public static long APP_AND_SERVICE_REGISTER_CHECK_INTERVAL = 3;
@Override
    public void boot() throws Throwable {
        applicationRegisterFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ServiceAndEndpointRegisterClient"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override
                public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
    }

boot()方法中会每3s执行一次run()

run()方法

run()方法的代码同样比较长,主要分为三个部分,

第一部分:如果Agent还没有完成了注册,也就是服务id为空

通过doServiceRegister()方法进行服务注册,返回服务注册映射关系,遍历关系集合得到服务id

第二部分:已经完成了注册,判断服务实例id是否为空,如果为空

通过ServiceId、实例uuid和时间戳等参数来调用doServiceInstanceRegister()方法,得到ServiceInstanceRegisterMapping映射对象,遍历映射信息得到服务实例id

第三部分:已经完成了注册,并且服务实例id不为空

这种情况下定期发送心跳请求,也就是调用serviceInstancePingStub的doPing()方法,通知Skywalking的OAP服务Agent状态良好。然后同步Endpoint的映射关系和同步网络地址的映射关系。

总结

这篇文章我们详细详解了Skywalking初始化过程中ServiceManager.INSTANCE.boot() 代码的具体逻辑,这里提及到了两个BootService接口,一个是GRPCChannelManager类,这个类每30s调用线程来通过ip和端口与OAP建立连接,连接成功后通知监听者状态已经改变,另一个类是ServiceAndEndpointRegisterClient,这个类在准备方法中建立与GRPCChannelManager的监听类,当GRPCChannelManager建立完连接后,通过这个连接创建两个stub客户端,启动方法中每3s开启线程进行服务注册并获取服务实例的id,然后向OAP发送心跳请求来表示自己的状态良好,同步Endpoint映射关系和网络地址映射关系。

❤️ 感谢大家

如果你觉得这篇内容对你挺有有帮助的话:

  1. 欢迎关注我❤️,点赞

    标签:实例,GRPCChannelManager,boot,Agent,INSTANCE,源码,注册,Skywalking,方法
    From: https://blog.51cto.com/u_15460453/5731518

相关文章

  • 源码角度了解Skywalking之Trace信息的生成
    源码角度了解Skywalking之Trace信息的生成TraceId是分布式链路的一个信息,可以通过它定位一条链路TraceId的生成Skwalking的TraceId的生成是通过GlobalIdGenerator的gener......
  • 从vue源码中学习观察者模式
    摘要:源码解读设计模式系列文章将陆陆续续进行更新中~摘要:源码解读设计模式系列文章将陆陆续续进行更新中~观察者模式首先话题下来,我们得反问一下自己,什么是观察者模式......
  • 118-22-ZooKeeper 基础设施详解 和 服务启动流程源码分析_ev
         ......
  • PCA算法介绍及源码实现
    前言PCA(主成分分析)是十大经典机器学习算法之一。PCA是Pearson在1901年提出的,后来由Hotelling在1933年加以发展提出的一种多变量的统计方法。PCA算法介绍PCA(principalc......
  • skywalking 实现钉钉告警
    一、映射本地告警文件到docker-compose1.1、skywalking告警-指标说明#catconfig/oal/core.oalservice_resp_time#服务的响应时间service_sla#服务的http请求成功......
  • 源码、反码、补码和精度损失
    数据类型转换,转换过程中可能导致溢出或损失精度1.源码:源码就是二进制的数字并且开头的一位代表符号位。例:(+1)的源码:00000001(-1)的源码:100000012.反码:正数......
  • React源码解读之任务调度
    前言简单说下为什么React选择函数式组件,主要是class组件比较冗余、生命周期函数写法不友好,骚写法多,functional组件更符合React编程思想等等等。更具体的可以拜读dan大神的b......
  • react的useState源码分析
    前言简单说下为什么React选择函数式组件,主要是class组件比较冗余、生命周期函数写法不友好,骚写法多,functional组件更符合React编程思想等等等。更具体的可以拜读dan大神的b......
  • skywalking 实现收集基于python的Django项目链路追踪案例
    一、python3环境设置1.1、安装python3apt-getupdateaptinstallpython3-pip-ypipinstall"apache-skywalking"[root@skywalking-agent-07~]#pipinstall"apache-s......
  • Collections之Arraylist源码解读(四)
    ......