首页 > 其他分享 >sentinel-tansport-SPI-CommandSPI

sentinel-tansport-SPI-CommandSPI

时间:2024-09-29 10:27:06浏览次数:1  
标签:socket tansport alibaba command SimpleHttpCommandCenter CommandSPI sentinel new

说明

我们引入以下

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-transport-simple-http</artifactId>
     <version>1.8.6</version>
</dependency>

通过初始化com.alibaba.csp.sentinel.init.InitExecutor#doInit 之后就可以通过sentinel暴露的端口调用相关api

如果使用了alibaba-starter-sentinel则不需要手动调用因为com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration#init在这里面执行了自动调用

他是如何实现呢

<1>

com.alibaba.csp.sentinel.init.InitExecutor#doInit

  public static void doInit() {
        //原子性set和判断是否已经初始化
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
        try {
            //SPI获取InitFunc
            ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);
            List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
            for (InitFunc initFunc : loader) {
                RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
                insertSorted(initList, initFunc);
            }
            for (OrderWrapper w : initList) {
                //<2>执行初始化 针对commnad是com.alibaba.csp.sentinel.transport.init.CommandCenterInitFunc#init
                w.func.init();
                RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
                    w.func.getClass().getCanonicalName(), w.order));
            }
        } catch (Exception ex) {
            RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
            ex.printStackTrace();
        } catch (Error error) {
            RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
            error.printStackTrace();
        }
    }

<2>

@InitOrder(-1)
public class CommandCenterInitFunc implements InitFunc {

    @Override
    public void init() throws Exception {
        //这里也是SPI的方式获取CommandCenter 我们可以自定义,默认是 com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter
        CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();

        if (commandCenter == null) {
            RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
            return;
        }
        //<3>执行command的初始化内部也是SPI的方式,我们可以动态新增
        commandCenter.beforeStart();
//<6>开启http service commandCenter.start(); RecordLog.info("[CommandCenterInit] Starting command center: " + commandCenter.getClass().getCanonicalName()); } }

<3>

com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter#beforeStart

    public void beforeStart() throws Exception {
        //<4>SPI的方式获取
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        //<5>注册command 
        registerCommands(handlers);
    }

<4>

com.alibaba.csp.sentinel.command.CommandHandlerProvider#namedHandlers

public class CommandHandlerProvider implements Iterable<CommandHandler> {

    //SPI的方式获取CommandHandler
    private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoaderUtil.getServiceLoader(
        CommandHandler.class);

    /**
     * Get all command handlers annotated with {@link CommandMapping} with command name.
     *
     * @return list of all named command handlers
     */
    public Map<String, CommandHandler> namedHandlers() {
        Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
        for (CommandHandler handler : serviceLoader) {
            String name = parseCommandName(handler);
            if (!StringUtil.isEmpty(name)) {
                map.put(name, handler);
            }
        }
        return map;
    }

    private String parseCommandName(CommandHandler handler) {
        //需要打了CommandMapping注解 name为url映射
        CommandMapping commandMapping = handler.getClass().getAnnotation(CommandMapping.class);
        if (commandMapping != null) {
            return commandMapping.name();
        } else {
            return null;
        }
    }
  ......

}

<5>

注册到static变量

com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter#registerCommands

  private static final Map<String, CommandHandler> handlerMap = new ConcurrentHashMap();
 public static void registerCommand(String commandName, CommandHandler handler) {
        if (!StringUtil.isEmpty(commandName)) {
            if (handlerMap.containsKey(commandName)) {
                CommandCenterLog.warn("Register failed (duplicate command): " + commandName, new Object[0]);
            } else {
                handlerMap.put(commandName, handler);
            }
        }
    }

<6>

com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter#start

  public void start() throws Exception {
         //获取cpu核数 作为请求处理线程数
        int nThreads = Runtime.getRuntime().availableProcessors();
        //初始化http请求处理的线程池
        this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10), new NamedThreadFactory("sentinel-command-center-service-executor"), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                CommandCenterLog.info("EventTask rejected", new Object[0]);
                throw new RejectedExecutionException();
            }
        });
        //处理认为
        Runnable serverInitTask = new Runnable() {
            int port;

            {
                try {
                    //获取配置的端口信息
                    this.port = Integer.parseInt(TransportConfig.getPort());
                } catch (Exception var3) {
                    //出现异常8719
                    this.port = 8719;
                }

            }

            public void run() {
                boolean success = false;
                //初始化ServerSocket 内部会通过port进行监听,如果冲突则自动+1往上找
                ServerSocket serverSocket = SimpleHttpCommandCenter.getServerSocketFromBasePort(this.port);
                if (serverSocket != null) {
                    CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort(), new Object[0]);
                    SimpleHttpCommandCenter.this.socketReference = serverSocket;
                    //<7>开启监听http线程处理器 监听器为ServerThread 
                    SimpleHttpCommandCenter.this.executor.submit(SimpleHttpCommandCenter.this.new ServerThread(serverSocket));
                    success = true;
                    this.port = serverSocket.getLocalPort();
                } else {
                    CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work", new Object[0]);
                }

                if (!success) {
                    this.port = -1;
                }

                TransportConfig.setRuntimePort(this.port);
                SimpleHttpCommandCenter.this.executor.shutdown();
            }
        };
        (new Thread(serverInitTask)).start();

<7>

com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter.ServerThread#run

 public void run() {
            while(true) {
                Socket socket = null;

                try {
                    //监听请求
                    socket = this.serverSocket.accept();
                    SimpleHttpCommandCenter.this.setSocketSoTimeout(socket);
                    HttpEventTask eventTask = new HttpEventTask(socket);
                    //<8>监听到请求交给HttpEventTask 处理
                    SimpleHttpCommandCenter.this.bizExecutor.submit(eventTask);
                } catch (Exception var6) {
                    CommandCenterLog.info("Server error", var6);
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (Exception var5) {
                            CommandCenterLog.info("Error when closing an opened socket", var5);
                        }
                    }

                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException var4) {
                        return;
                    }
                }
            }
        }

com.alibaba.csp.sentinel.transport.command.http.HttpEventTask#run

    public void run() {
        if (this.socket != null) {
            PrintWriter printWriter = null;
            InputStream inputStream = null;

            try {
                long start = System.currentTimeMillis();
                inputStream = new BufferedInputStream(this.socket.getInputStream());
                OutputStream outputStream = this.socket.getOutputStream();
                printWriter = new PrintWriter(new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
                String firstLine = readLine(inputStream);
                CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine + ", addr: " + this.socket.getInetAddress(), new Object[0]);
                CommandRequest request = processQueryString(firstLine);
                if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
                    processPostRequest(inputStream, request);
                }

                String commandName = HttpCommandUtils.getTarget(request);
                if (StringUtil.isBlank(commandName)) {
                    this.writeResponse(printWriter, StatusCode.BAD_REQUEST, "Invalid command");
                    return;
                }

                //核心代码 获取对应的Handler就是前面SPI初始化的
                CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
                if (commandHandler != null) {
                    CommandResponse<?> response = commandHandler.handle(request);
                    this.handleResponse(response, printWriter);
                } else {
                    this.writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
                }

                long cost = System.currentTimeMillis() - start;
                CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine + ", address: " + this.socket.getInetAddress() + ", time cost: " + cost + " ms", new Object[0]);
            } catch (RequestException var18) {
                this.writeResponse(printWriter, var18.getStatusCode(), var18.getMessage());
            } catch (Throwable var19) {
                Throwable e = var19;
                CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", var19);

                try {
                    if (printWriter != null) {
                        String errorMessage = "Command server error";
                        e.printStackTrace();
                        if (!this.writtenHead) {
                            this.writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
                        } else {
                            printWriter.println(errorMessage);
                        }

                        printWriter.flush();
                    }
                } catch (Exception var17) {
                    CommandCenterLog.warn("Failed to write error response", var17);
                }
            } finally {
                this.closeResource(inputStream);
                this.closeResource(printWriter);
                this.closeResource(this.socket);
            }

        }
    }

 

 

标签:socket,tansport,alibaba,command,SimpleHttpCommandCenter,CommandSPI,sentinel,new
From: https://www.cnblogs.com/LQBlog/p/18439026

相关文章

  • 基于Sentinel自研组件的系统限流、降级、负载保护最佳实践探索
    一、Sentinel简介Sentinel以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。Sentinel具有以下特征:•丰富的应用场景:Sentinel承接了阿里巴巴近10年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷......
  • 基于Sentinel自研组件的系统限流、降级、负载保护最佳实践探索
    一、Sentinel简介Sentinel以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。Sentinel具有以下特征:•丰富的应用场景:Sentinel承接了阿里巴巴近10年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集......
  • sentinel常见运维
    查看规则是否加载成功规则类型URL1系统规则http://{serverip}:{port}/getRules?getParamFlowRules2获取授权规则http://{serverip}:{port}/getRules?type=authority3获取熔断规则http://{serverip}:{port}/getRules?type=degrade4获取限流规......
  • springcloud整合sentinel
    此处只做个简单的springcloud中添加sentinel的demo1、下载sentinel的jar,访问网址:https://github.com/alibaba/Sentinel/releases 2、找到本地jar文件夹,使用cmd或者终端中打开,执行java-jar sentinel-dashboard-1.8.0.jar 启动完成 访问可视化页面,默认账号密码都是sent......
  • Redis Sentinel:秒杀系统背后的可靠性保障神器!
    哈喽,大家好呀!我是小米,今天我想和大家聊聊如何在个人项目中保证系统的可靠性,尤其是用Redis哨兵模式来保障高可用性。相信很多小伙伴在开发中遇到过Redis挂掉的情况,特别是在高并发场景下,一旦主服务器下线,整个系统可能会因此瘫痪。那我们该如何应对这个问题呢?今天就带大家深入了解......
  • GEE教程:利用sentinel-2数据进行ndwi和ndci指数的计算和下载
    目录简介函数normalizedDifference(bandNames)Arguments:Returns: ImageExport.image.toDrive(image, description, folder, fileNamePrefix, dimensions, region, scale, crs, crsTransform, maxPixels, shardSize, fileDimensions, skipEmptyTiles, file......
  • Redis哨兵机制sentinel集群配置
    一、安装redis1主2从集群略二、复制sentinel.conf文件到指定目录修改sentinel.conf配置port26379dir"/tmp"logfile"/usr/local/redis/logs/sentinel-26379.log"daemonizeyessentinelmonitormymaster10.211.55.763792sentinelauth-passmymasterlinlinsen......
  • 入门sentinel
    Sentinel是阿里巴巴开源的一款面向分布式服务架构的轻量级流量控制组件,主要用于保护微服务和分布式系统,防止因流量过大或服务故障导致系统崩溃。以下是对Sentinel入门的详细介绍:一、Sentinel的主要功能Sentinel是阿里巴巴开源的一款面向分布式服务架构的高可用流量防护......
  • Sentinel在边缘服务(如API网关)中的应用和策略是什么?
    在微服务架构中,API网关(也称为边缘服务)通常作为所有客户端请求的单一入口点。API网关不仅可以提供路由功能,还将安全性、监控、缓存、限流、动态数据转换等功能集中在一起。Sentinel在API网关中的应用主要是为了保护后端服务免受过载风险,确保系统的稳定性和响应时间。以下是......
  • 保姆级,手把手教你物理机搭建Redis-sentinel(哨兵)集群
    集群介绍        Redis,作为一种开源的、基于内存的数据结构存储系统,被广泛应用于各种场景,包括缓存、消息队列、短期存储等。单一实例的工作模式通常无法保证Redis的可用性和拓展性,Redis提供了三种分布式方案:主从模式哨兵模式集群模式      主从模式    ......