首页 > 编程语言 >【sentinel】sentinel客户端与dashboard通讯源码分析

【sentinel】sentinel客户端与dashboard通讯源码分析

时间:2023-06-17 21:05:39浏览次数:298  
标签:return 源码 command dashboard CommandResponse sentinel new type


Sentinel客户端的规则大部分都是在Dashboard上完成配置,那么Sentinel客户端与Dashboard之间是如何进行通讯的呢?

客户端定时向dashboard发送心跳

要想实现Sentinel客户端与Dashboard进行通讯,我们需要引入下面的依赖:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-transport-common</artifactId>
    <version>${project.version}</version>
</dependency>

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-transport-simple-http</artifactId>
    <version>${project.version}</version>
</dependency>

当然只需要引入sentinel-transport-simple-http即可,他里面已经引入了sentinel-transport-common,如果使用了spring-cloud-starter-alibaba-sentinel也会引入上述的包。

sentinel-transport-common中通过spi机制引入了两个实现了InitFunc接口的实现类:

  • HeartbeatSenderInitFunc:负责客户端发送心跳至dashboard
  • CommandCenterInitFunc:负责各种HTTP接口请求

HeartbeatSenderInitFunc#init主要就是负责构建一个定时线程池,然后定时发送心跳给dashboard,默认间隔时间为10s。

com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc#init

private void initSchedulerIfNeeded() {
    if (pool == null) {
        pool = new ScheduledThreadPoolExecutor(2,
            new NamedThreadFactory("sentinel-heartbeat-send-task", true),
            new DiscardOldestPolicy());
    }
}

@Override
public void init() {
    HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
    if (sender == null) {
        RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
        return;
    }

    // 初始化ScheduledThreadPoolExecutor
    initSchedulerIfNeeded();

    // 定时发送间隔 1000 * 10
    long interval = retrieveInterval(sender);
    setIntervalIfNotExists(interval);

    // 定时发送心跳任务
    scheduleHeartbeatTask(sender, interval);
}

private boolean isValidHeartbeatInterval(Long interval) {
    return interval != null && interval > 0;
}

private void setIntervalIfNotExists(long interval) {
    SentinelConfig.setConfig(TransportConfig.HEARTBEAT_INTERVAL_MS, String.valueOf(interval));
}

long retrieveInterval(/*@NonNull*/ HeartbeatSender sender) {
    Long intervalInConfig = TransportConfig.getHeartbeatIntervalMs();
    if (isValidHeartbeatInterval(intervalInConfig)) {
        RecordLog.info("[HeartbeatSenderInitFunc] Using heartbeat interval "
            + "in Sentinel config property: " + intervalInConfig);
        return intervalInConfig;
    } else {
        long senderInterval = sender.intervalMs();
        RecordLog.info("[HeartbeatSenderInit] Heartbeat interval not configured in "
            + "config property or invalid, using sender default: " + senderInterval);
        return senderInterval;
    }
}

private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
    pool.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // 定时发送心跳任务
                sender.sendHeartbeat();
            } catch (Throwable e) {
                RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
            }
        }
    }, 5000, interval, TimeUnit.MILLISECONDS);
    RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
        + sender.getClass().getCanonicalName());
}

sendHeartbeat()负责具体的HTTP调用。

com.alibaba.csp.sentinel.transport.heartbeat.SimpleHttpHeartbeatSender#sendHeartbeat

public boolean sendHeartbeat() throws Exception {
    if (TransportConfig.getRuntimePort() <= 0) {
        RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat");
        return false;
    }
    // 获取dashboard的地址。在配置文件中配置的地址
    Endpoint addrInfo = getAvailableAddress();
    if (addrInfo == null) {
        return false;
    }

    SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
    // 请求的参数,主要是当前节点的端口,默认为8719
    request.setParams(heartBeat.generateCurrentMessage());
    try {
        // 请求dashboard
        SimpleHttpResponse response = httpClient.post(request);
        if (response.getStatusCode() == OK_STATUS) {
            return true;
        } else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
            RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo
                + ", http status code: " + response.getStatusCode());
        }
    } catch (Exception e) {
        RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo, e);
    }
    return false;
}

注册完成后,就可以在dashboard的机器列表菜单看到了。

客户端启动HTTP服务

客户端会通过CommandCenterInitFunc类来启动一个HTTP服务。

com.alibaba.csp.sentinel.transport.init.CommandCenterInitFunc#init

public void init() throws Exception {
    CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();

    if (commandCenter == null) {
        RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
        return;
    }

    /**
     * 注册CommandHandler
     * @see com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter#beforeStart()
     */
    commandCenter.beforeStart();
    /**
     * 启动ServerSocket,监听http请求
     * @see com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter#start()
     */
    commandCenter.start();
    RecordLog.info("[CommandCenterInit] Starting command center: "
            + commandCenter.getClass().getCanonicalName());
}

SimpleHttpCommandCenter会使用最原始的BIO(ServerSocket)来处理请求。

public void beforeStart() throws Exception {
    // Register handlers
    // 通过SPI加载所有的CommandHandler
    Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
    // 将CommandHandler放入handlerMap
    registerCommands(handlers);
}

public void start() throws Exception {
  int nThreads = Runtime.getRuntime().availableProcessors();
  // bizExecutor负责处理业务请求
  this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<Runnable>(10),
      new NamedThreadFactory("sentinel-command-center-service-executor"),
      new RejectedExecutionHandler() {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
              CommandCenterLog.info("EventTask rejected");
              throw new RejectedExecutionException();
          }
      });

  Runnable serverInitTask = new Runnable() {
      int port;

      {
          try {
              port = Integer.parseInt(TransportConfig.getPort());
          } catch (Exception e) {
              port = DEFAULT_PORT;
          }
      }

      @Override
      public void run() {
          boolean success = false;
          // 启动ServerSocket
          ServerSocket serverSocket = getServerSocketFromBasePort(port);

          if (serverSocket != null) {
              CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
              socketReference = serverSocket;
              // ServerThread负责接收请求
              executor.submit(new ServerThread(serverSocket));
              success = true;
              port = serverSocket.getLocalPort();
          } else {
              CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
          }

          if (!success) {
              port = PORT_UNINITIALIZED;
          }

          TransportConfig.setRuntimePort(port);
          executor.shutdown();
      }

  };

  // 初始化ServerSocket
  new Thread(serverInitTask).start();
}

dashboard拉取客户端的规则

Dashboard会通过接口http://localhost:8719/getRules?type=flow来拉取客户端的规则。

ServerThread主要负责接收请求,具体请求的处理交给bizExecutor线程池。

com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter.ServerThread#run

public void run() {
    while (true) {
        Socket socket = null;
        try {
            // 接收请求
            socket = this.serverSocket.accept();
            setSocketSoTimeout(socket);
            HttpEventTask eventTask = new HttpEventTask(socket);
            // 异步处理请求
            bizExecutor.submit(eventTask);
        } catch (Exception e) {
            CommandCenterLog.info("Server error", e);
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception e1) {
                    CommandCenterLog.info("Error when closing an opened socket", e1);
                }
            }
            try {
                // In case of infinite log.
                Thread.sleep(10);
            } catch (InterruptedException e1) {
                // Indicates the task should stop.
                break;
            }
        }
    }
}

HttpEventTask主要负责解析请求的url,根据url找到CommandHandler,具体请求的响应式交给CommandHandler来处理的,例如这里的url是/getRules,找到对应的CommandHandler为FetchActiveRuleCommandHandler。

com.alibaba.csp.sentinel.transport.command.http.HttpEventTask#run

public void run() {
    if (socket == null) {
        return;
    }

    PrintWriter printWriter = null;
    InputStream inputStream = null;
    try {
        long start = System.currentTimeMillis();
        inputStream = new BufferedInputStream(socket.getInputStream());
        OutputStream outputStream = socket.getOutputStream();

        printWriter = new PrintWriter(
            new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

        // GET /getRules?type=flow HTTP/1.1
        String firstLine = readLine(inputStream);
        CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
            + ", addr: " + socket.getInetAddress());
        CommandRequest request = processQueryString(firstLine);

        if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
            // Deal with post method
            processPostRequest(inputStream, request);
        }

        // Validate the target command.
        // getRules
        String commandName = HttpCommandUtils.getTarget(request);
        if (StringUtil.isBlank(commandName)) {
            writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
            return;
        }

        // Find the matching command handler.
        CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
        if (commandHandler != null) {
            /**
             * @see FetchActiveRuleCommandHandler#handle(com.alibaba.csp.sentinel.command.CommandRequest)
             */
            CommandResponse<?> response = commandHandler.handle(request);
            handleResponse(response, printWriter);
        } else {
            // No matching command handler.
            writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
        }

        long cost = System.currentTimeMillis() - start;
        CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
            + ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
    } catch (RequestException e) {
        writeResponse(printWriter, e.getStatusCode(), e.getMessage());
    } catch (Throwable e) {
        CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
        try {
            if (printWriter != null) {
                String errorMessage = SERVER_ERROR_MESSAGE;
                e.printStackTrace();
                if (!writtenHead) {
                    writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
                } else {
                    printWriter.println(errorMessage);
                }
                printWriter.flush();
            }
        } catch (Exception e1) {
            CommandCenterLog.warn("Failed to write error response", e1);
        }
    } finally {
        closeResource(inputStream);
        closeResource(printWriter);
        closeResource(socket);
    }
}

FetchActiveRuleCommandHandler主要是根据不同的规则类型从RuleManager中读取规则。
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler#handle

public CommandResponse<String> handle(CommandRequest request) {
    String type = request.getParam("type");
    if ("flow".equalsIgnoreCase(type)) {
        return CommandResponse.ofSuccess(JSON.toJSONString(FlowRuleManager.getRules()));
    } else if ("degrade".equalsIgnoreCase(type)) {
        return CommandResponse.ofSuccess(JSON.toJSONString(DegradeRuleManager.getRules()));
    } else if ("authority".equalsIgnoreCase(type)) {
        return CommandResponse.ofSuccess(JSON.toJSONString(AuthorityRuleManager.getRules()));
    } else if ("system".equalsIgnoreCase(type)) {
        return CommandResponse.ofSuccess(JSON.toJSONString(SystemRuleManager.getRules()));
    } else {
        return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
    }
}

dashboard修改配置推送客户端

Dashboard会通过接口http://localhost:8719/setRules来修改客户端的规则。

根据urlsetRules找到对应的CommandHandler为ModifyRulesCommandHandler。

ModifyRulesCommandHandler根据type找到RuleManager重新加载所有的规则,注意是全量更新而不是增加,所以如果想删除所有的规则,也需要传入一个空的集合。

com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler#handle

public CommandResponse<String> handle(CommandRequest request) {
    // XXX from 1.7.2, force to fail when fastjson is older than 1.2.12
    // We may need a better solution on this.
    if (VersionUtil.fromVersionString(JSON.VERSION) < FASTJSON_MINIMAL_VER) {
        // fastjson too old
        return CommandResponse.ofFailure(new RuntimeException("The \"fastjson-" + JSON.VERSION
                + "\" introduced in application is too old, you need fastjson-1.2.12 at least."));
    }
    String type = request.getParam("type");
    // rule data in get parameter
    String data = request.getParam("data");
    if (StringUtil.isNotEmpty(data)) {
        try {
            data = URLDecoder.decode(data, "utf-8");
        } catch (Exception e) {
            RecordLog.info("Decode rule data error", e);
            return CommandResponse.ofFailure(e, "decode rule data error");
        }
    }

    RecordLog.info("Receiving rule change (type: {}): {}", type, data);

    String result = "success";

    if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {
        List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);
        // 重新加载所有的规则
        FlowRuleManager.loadRules(flowRules);
        if (!writeToDataSource(getFlowDataSource(), flowRules)) {
            result = WRITE_DS_FAILURE_MSG;
        }
        return CommandResponse.ofSuccess(result);
    } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {
        List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);
        AuthorityRuleManager.loadRules(rules);
        if (!writeToDataSource(getAuthorityDataSource(), rules)) {
            result = WRITE_DS_FAILURE_MSG;
        }
        return CommandResponse.ofSuccess(result);
    } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {
        List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);
        DegradeRuleManager.loadRules(rules);
        if (!writeToDataSource(getDegradeDataSource(), rules)) {
            result = WRITE_DS_FAILURE_MSG;
        }
        return CommandResponse.ofSuccess(result);
    } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {
        List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);
        SystemRuleManager.loadRules(rules);
        if (!writeToDataSource(getSystemSource(), rules)) {
            result = WRITE_DS_FAILURE_MSG;
        }
        return CommandResponse.ofSuccess(result);
    }
    return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));
}

至此我们可以发现Dashboard只是提供一个页面操作规则数据,本身并不存储任何的数据,所以Dashboard的重启并不会导致规则数据的丢失,但是应用的重启会导致规则数据的丢失。

上面的代码我们发现在重新加载完规则后,都会调用一个writeToDataSource()方法,这个方法可能为我们持久化规则数据提供了扩展点。


标签:return,源码,command,dashboard,CommandResponse,sentinel,new,type
From: https://blog.51cto.com/morris131/6506197

相关文章

  • 【框架源码】Spring源码解析之Bean创建源码流程
    问题:Spring中是如何初始化单例bean的?我们都知道Spring解析xml文件描述成BeanDefinition,解析BeanDefinition最后创建Bean将Bean放入单例池中,那么Spring在创建Bean的这个过程都做了什么。Spring核心方法refresh()中最最重要的一个方法finishBeanFactoryInitialization()方法,该方法......
  • 五、Sentinel介绍
    随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。一、Sentinel......
  • 源码泄露+bak备份泄露+vim泄露+.DS_Store(mas迁移泄露)
    源码泄露+bak备份泄露+vim泄露+.DS_Store(mas迁移泄露)1.源码泄露web网站源码打包在web目录下造成泄露,通常以压缩包方式存在,如.zip、.rar、.tar、.tar.gz等,常见命名方式为网站名,www.网站名,backup+网站名等简单入门题目扫描到压缩包文件进行下载,找到对应文件,查看是否有flag,如果没......
  • 开源数字药店系统源码:打造高效的医药销售平台
    作为医药销售的全新解决方案,数字药店系统源码能够为医药企业提供更高效的销售解决方案,提高企业的竞争力。本文将详细介绍开源数字药店系统源码的特点和优势,以及如何打造高效的医药销售平台。一、开源数字药店系统源码的特点1. 功能丰富具有完善的功能,包括商品管理、订单管理、客户......
  • .Net Core医学检验LIS系统源码
    .NetCoreLIS系统源码,在第三方快检实验室、二级医院检验科应用5年以上,系统运行稳定、功能齐全,界面布局合理、操作简便。系统采用B/S架构SaaS模式,可扩展性强。LIS系统为实验室服务对象提供检验申请、采集标本、结果查询等功能;为实验室工作人员的核收标本、分送标本、传送资料、分析......
  • 互联网医院成品|互联网医院源码|线上医疗所含功能
    作为医疗服务领域的新生力量,互联网医院系统逐渐成为了医疗行业内时常交流和沟通的名词和产品,互联网医院系统就是用信息化、互联网化的手段去降低成本、优化服务,提升医疗服务诊前、诊中、诊后的全流程质量。那么互联网医院成品包含哪些功能呢?接下来啊小编就给大家介绍下。1、在线预......
  • 语音社交源码知识语音房间功能的实现
    当今,快节奏的社会为现在的很多年轻人带来了压力,每到深夜或是压力大的时候,很多人都想找人倾诉一下自己心里的悲伤与痛苦,以此来释放一下自己的压力,然而,这是一个独生子女的社会,大部分家庭都只有一个孩子,并且每天工作繁忙又加班到很晚的人根本没有时间或是精力去交朋友,所以这些人没有人......
  • 语音社交源码知识语音房间功能的实现
       当今,快节奏的社会为现在的很多年轻人带来了压力,每到深夜或是压力大的时候,很多人都想找人倾诉一下自己心里的悲伤与痛苦,以此来释放一下自己的压力,然而,这是一个独生子女的社会,大部分家庭都只有一个孩子,并且每天工作繁忙又加班到很晚的人根本没有时间或是精力去交朋友,所以这......
  • java基于springboot+vue的网吧管理系统,附源码+数据库+论文+PPT,适合课程设计、毕业设计
    1、项目介绍随着信息技术和网络技术的飞速发展,人类已进入全新信息化时代,传统管理技术已无法高效,便捷地管理信息。为了迎合时代需求,优化管理效率,各种各样的管理系统应运而生,各行各业相继进入信息管理时代,网吧管理系统就是信息时代变革中的产物之一。任何系统都要遵循系统设计的基......
  • **使用源码部署Nginx 1.23.3的详细步骤和性能优化**
    简介:在本篇博客文章中,我们将详细介绍如何使用源码部署Nginx1.23.3,并提供一些优化措施以提升性能和安全性。将按照以下步骤进行操作:目录准备工作下载和编译Nginx源码安装Nginx配置Nginx优化Nginx性能和安全性启动Nginx服务结论1.准备工作在开始部署Nginx之前,确保你的......