首页 > 编程语言 >SeaTunnel V2.3.1源码分析--zeta引擎启动过程分析

SeaTunnel V2.3.1源码分析--zeta引擎启动过程分析

时间:2023-06-02 12:33:50浏览次数:47  
标签:SeaTunnel task -- seaTunnelConfig taskGroupExecutionTracker taskTracker 源码 new 

今天主要看SeaTunnel自研的数据同步引擎,叫Zeta。

首先,如果使用的是zeta引擎,那么第一步一定是运行bin/seatunnel-cluster.sh脚本,这个脚本就是启动zeta的服务端的。

打开seatunnel-cluster.sh看看,可以看到其实是去启动seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java中的main()方法

这个就是zeta的核心启动方法了。

大家可以先打开自己看看,看了之后可能会一头雾水,怎么就2句代码

ServerCommandArgs serverCommandArgs = CommandLineUtils.parse( args, new ServerCommandArgs(), EngineType.SEATUNNEL.getStarterShellName(), true);
SeaTunnel.run(serverCommandArgs.buildCommand());


就完事了?

其实应该先看看new ServerCommandArgs()这部分,这里返回了一个ServerCommandArgs类,
然后接着应该看看serverCommandArgs.buildCommand(),这里buildCommand()方法是关键。我们把代码贴上来

    @Override
    public Command<?> buildCommand() {
        return new ServerExecuteCommand(this);
    }

接着贴new  ServerExecuteCommand(this)的代码:

public class ServerExecuteCommand implements Command<ServerCommandArgs> {

    private final ServerCommandArgs serverCommandArgs;

    public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
        this.serverCommandArgs = serverCommandArgs;
    }

    ······
}

这里ServerExecuteCommand()方法,返回了一个ServerExecuteCommand类。

回到最初的2句代码,可以看到返回的ServerExecuteCommand类会被传递给SeaTunnel.run()方法,这个方法其实最终调用的就是ServerExecuteCommand类的execute()方法。

ServerCommandArgs serverCommandArgs = CommandLineUtils.parse( args, new ServerCommandArgs(), EngineType.SEATUNNEL.getStarterShellName(), true);
SeaTunnel.run(serverCommandArgs.buildCommand());

所以接下来我们就继续分析ServerExecuteCommand类的execute()方法。

    @Override
    public void execute() {
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        if (StringUtils.isNotEmpty(serverCommandArgs.getClusterName())) {
            seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());
        }
        HazelcastInstanceFactory.newHazelcastInstance(
                seaTunnelConfig.getHazelcastConfig(),
                Thread.currentThread().getName(),
                new SeaTunnelNodeContext(seaTunnelConfig));
    }

这个方法最重要的是执行了 HazelcastInstanceFactory.newHazelcastInstance( seaTunnelConfig.getHazelcastConfig(), Thread.currentThread().getName(), new SeaTunnelNodeContext(seaTunnelConfig));

这行代码比较复杂,最重要的就是这句代码中的new SeaTunnelNodeContext(seaTunnelConfig),这里会返回一个SeaTunnelNodeContext类,这个类是继承自Hazelcast这个组件的DefaultNodeContext类。在Hazelcast启动的过程中,会去调用DefaultNodeContext类的实现类的createNodeExtension()方法,在这里其实也就是SeaTunnelNodeContext类的createNodeExtension()方法。这里不具体展开讲解Hazelcast类,大家可以去查一下其他Hazelcast资料。

然后我们接着分析createNodeExtension()方法,贴一下代码:

@Override
public NodeExtension createNodeExtension(@NonNull Node node) {
    return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
}

这里跟踪进去:

public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
    super(node);
    extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(seaTunnelConfig));
}

跟踪后发现进到了NodeExtension类的NodeExtension()方法,在NodeExtension()方法中,可以看到调用了new  SeaTunnelServer()方法,好了,这里其实就是SeaTunnelServer,也就是Zeta引擎的启动点了。

 

我们接着分析new SeaTunnelServer()方法:

public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
    this.liveOperationRegistry = new LiveOperationRegistry();
    this.seaTunnelConfig = seaTunnelConfig;
    LOGGER.info("SeaTunnel server start...");
}

可以看到这里打印了 SeaTunnel  server  start...

SeaTunnelServer也是继承了Hazelcast的一些方法,比如init()方法,这个方法不用多想,肯定是会被Hazelcast在启动的过程中调用到的。我们接着看看init()方法

@Override
public void init(NodeEngine engine, Properties hzProperties) {
    this.nodeEngine = (NodeEngineImpl) engine;
    // TODO Determine whether to execute there method on the master node according to the deploy
    // type
    taskExecutionService = new TaskExecutionService(nodeEngine, nodeEngine.getProperties());
    nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
    taskExecutionService.start();
    getSlotService();
    coordinatorService =
            new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
    monitorService = Executors.newSingleThreadScheduledExecutor();
    monitorService.scheduleAtFixedRate(
            this::printExecutionInfo,
            0,
            seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
            TimeUnit.SECONDS);

    seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());
}

这里就看到TaskExecutionService类了,大家应该舒了一口气,因为我们看一下下面这个图片:

可以看到其实SeaTunnelServer最核心的是调用了TaskExecutionService类的start()方法。

紧接着,又会调用getSlotService(),这个方法用于获取哪些槽位可以执行任务。

这是最关键的2行代码,大家需要自己跟踪研究一下。

这里引用来自一篇官方文章的介绍文字:

 TaskExecutionService 

TaskExecutionService 是一个执行任务的服务,将在每个节点上运行一个实例。它从 JobMaster 接收 TaskGroup 并在其中运行 Task。并维护TaskID->TaskContext,对Task的具体操作都封装在TaskContext中。而Task内部持有OperationService,也就是说Task可以通过OperationService远程调用其他Task或JobMaster进行通信。

CoordinatorService 

CoordinatorService是一个充当协调器的服务,它主要负责处理客户端提交的命令以及切换master后任务的恢复。客户端在提交任务时会找到master节点并将任务提交到CoordinatorService服务上,CoordinatorService会缓存任务信息并等待任务执行结束。当任务结束后再对任务进行归档处理。

SlotService

SlotService是slot管理服务,用于管理集群的可用Slot资源。SlotService运行在所有节点上并定期向master上报资源信息。

 

 

接着分析taskExecutionService.start()方法

public void start() {
    runBusWorkSupplier.runNewBusWork(false);
}

该方法又调用了runNewBusWork(false)方法:

public boolean runNewBusWork(boolean checkTaskQueue) {
    if (!checkTaskQueue || taskQueue.size() > 0) {
        BlockingQueue<Future<?>> futureBlockingQueue = new LinkedBlockingQueue<>();
        CooperativeTaskWorker cooperativeTaskWorker =
                new CooperativeTaskWorker(taskQueue, this, futureBlockingQueue);
        Future<?> submit = executorService.submit(cooperativeTaskWorker);
        futureBlockingQueue.add(submit);
        return true;
    }
    return false;
}

接口看到上面代码中的new cooperativeTaskWorker(taskQueue,this,futureBlockingQueue)方法,

这里新建了一个cooperativeTaskWorker类,这个类对象会被提交到executorService去执行,代码就是下一行的executorService.submit(cooperativeTaskWorker)这个代码。

然后当cooperativeTaskWorker被提交到executorService上的时候,其实是会运行cooperativeTaskWorker这个类的run方法的。所以接下来我们要看cooperativeTaskWorker类中的run方法具体做了什么事情。

public void run() {
    thisTaskFuture = futureBlockingQueue.take();
    futureBlockingQueue = null;
    myThread = currentThread();
    while (keep.get() && isRunning) {
        TaskTracker taskTracker =
                null != exclusiveTaskTracker.get()
                        ? exclusiveTaskTracker.get()
                        : taskqueue.takeFirst();
        TaskGroupExecutionTracker taskGroupExecutionTracker =
                taskTracker.taskGroupExecutionTracker;
        if (taskGroupExecutionTracker.executionCompletedExceptionally()) {
            taskGroupExecutionTracker.taskDone(taskTracker.task);
            if (null != exclusiveTaskTracker.get()) {
                // If it's exclusive need to end the work
                break;
            } else {
                // No action required and don't put back
                continue;
            }
        }
        taskGroupExecutionTracker.currRunningTaskFuture.put(
                taskTracker.task.getTaskID(), thisTaskFuture);
        // start timer, if it's exclusive, don't need to start
        if (null == exclusiveTaskTracker.get()) {
            timer.timerStart(taskTracker);
        }
        ProgressState call = null;
        try {
            // run task
            myThread.setContextClassLoader(
                    executionContexts
                            .get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
                            .getClassLoader());
            call = taskTracker.task.call();
            synchronized (timer) {
                timer.timerStop();
            }
        } catch (InterruptedException e) {
            if (taskGroupExecutionTracker.executionException.get() == null
                    && !taskGroupExecutionTracker.isCancel.get()) {
                taskGroupExecutionTracker.exception(e);
            }
            taskGroupExecutionTracker.taskDone(taskTracker.task);
            logger.warning("Exception in " + taskTracker.task, e);
            if (null != exclusiveTaskTracker.get()) {
                break;
            }
        } catch (Throwable e) {
            // task Failure and complete
            taskGroupExecutionTracker.exception(e);
            taskGroupExecutionTracker.taskDone(taskTracker.task);
            // If it's exclusive need to end the work
            logger.warning("Exception in " + taskTracker.task, e);
            if (null != exclusiveTaskTracker.get()) {
                break;
            }
        } finally {
            // stop timer
            timer.timerStop();
            taskGroupExecutionTracker.currRunningTaskFuture.remove(
                    taskTracker.task.getTaskID());
        }
        // task call finished
        if (null != call) {
            if (call.isDone()) {
                // If it's exclusive, you need to end the work
                taskGroupExecutionTracker.taskDone(taskTracker.task);
                if (null != exclusiveTaskTracker.get()) {
                    break;
                }
            } else {
                // Task is not completed. Put task to the end of the queue
                // If the current work has an exclusive tracker, it will not be put back
                if (null == exclusiveTaskTracker.get()) {
                    taskqueue.offer(taskTracker);
                }
            }
        }
    }
}

这里的代码其实最核心的就是会启动一个while循环,不断去取future队列中是否已经有完成的task,所以刚开始的时候,一般不会进入到while循环,因为还没有启动完成,肯定是去不到future对象的。这里我们会后续再分析,只需要知道在此处启动了一个线程,会不断地去判断是否有task已经做完了,做完了就进入while循环进行后续一系列的处理。我们先回到启动过程的分析上来,这个方法我们后续分析任务执行的时候再深入分析把。

接着就是需要看

getSlotService();

这行代码了。

/** Lazy load for Slot Service */
public SlotService getSlotService() {
    if (slotService == null) {
        synchronized (this) {
            if (slotService == null) {
                SlotService service =
                        new DefaultSlotService(
                                nodeEngine,
                                taskExecutionService,
                                seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
                service.init();
                slotService = service;
            }
        }
    }
    return slotService;
}

首先把getSlotService()方法全部贴上来。

这里会新建一个SlotService 类对象 service,然后会调用该对象的init()方法。

SlotService是一个接口类,它只有一个实现类叫DefaultSlotService类,所以调用SlotService的init()方法就是调用了DefaultSlotService类的init()方法。我们接着看看DefaultSlotService类的init()方法代码。

@Override
public void init() {
    initStatus = true;
    slotServiceSequence = UUID.randomUUID().toString();
    contexts = new ConcurrentHashMap<>();
    assignedSlots = new ConcurrentHashMap<>();
    unassignedSlots = new ConcurrentHashMap<>();
    unassignedResource = new AtomicReference<>(new ResourceProfile());
    assignedResource = new AtomicReference<>(new ResourceProfile());
    scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(
                    r ->
                            new Thread(
                                    r,
                                    String.format(
                                            "hz.%s.seaTunnel.slotService.thread",
                                            nodeEngine.getHazelcastInstance().getName())));
    if (!config.isDynamicSlot()) {
        initFixedSlots();
    }
    unassignedResource.set(getNodeResource());
    scheduledExecutorService.scheduleAtFixedRate(
            () -> {
                try {
                    LOGGER.fine(
                            "start send heartbeat to resource manager, this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                    sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();
                } catch (Exception e) {
                    LOGGER.warning(
                            "failed send heartbeat to resource manager, will retry later. this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                }
            },
            0,
            DEFAULT_HEARTBEAT_TIMEOUT,
            TimeUnit.MILLISECONDS);
}

该方法先创建了scheduledExecutorService对象线程池,具体线程池的数量要看配置文件中hz.xxxxxx.seaTunnel.slotService.thread的配置文件中的数量,xxxxxx是Hazelcast实例的名称。

然后创建一个定时器,每隔5秒运行sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();方法。其实就是心跳。心跳是发给Master的,Master在SeaTunnel中也叫做Coordinater

 

标签:SeaTunnel,task,--,seaTunnelConfig,taskGroupExecutionTracker,taskTracker,源码,new,
From: https://www.cnblogs.com/lukairui/p/17449822.html

相关文章

  • Hazelcast的ManagedService接口类执行顺序
    在Hazelcast中,ManagedService接口中定义的方法的执行顺序如下:init(NodeEnginenodeEngine,Propertiesproperties):此方法在服务初始化时调用,允许你执行一些初始化逻辑或设置。reset():此方法在服务重置时调用,允许你重置或清理服务的状态。partitionLost(intpartitio......
  • 《深度剖析CPython解释器》16. Python函数机制的深度解析(第三部分): 闭包的底层实现
    https://www.cnblogs.com/traditional/p/13580694.html楔子上一篇我们看了函数是如何调用的,这一次我们看一下函数中局部变量的访问、以及闭包相关的知识。函数中局部变量的访问我们说过函数的参数和函数内部定义的变量都属于局部变量,所以它也一样是通过静态的方式进行访问。......
  • C#.NET大文件分片上传/多线程上传
    ​IE的自带下载功能中没有断点续传功能,要实现断点续传功能,需要用到HTTP协议中鲜为人知的几个响应头和请求头。 一. 两个必要响应头Accept-Ranges、ETag        客户端每次提交下载请求时,服务端都要添加这两个响应头,以保证客户端和服务端将此下载识别为可以断点续传......
  • ChatGPT + Flutter快速开发多端聊天机器人App
    ChatGPT+Flutter快速开发多端聊天机器人Appdownload:3w51xuebccom剖析ChatGPT的应用场景和案例ChatGPT是一种基于人工智能技术的自然语言处理模型,它可以通过对话的方式与用户进行交互。在本篇文章中,我们将介绍ChatGPT的应用场景和相关案例。ChatGPT的应用场景ChatGPT主要应用于......
  • Docker网络详解
    文章目录一、理解docker0网桥二、Docker网络模式三、Docker容器互联四、自定义网络一、理解docker0网桥安装docker的时候,会生成一个docker0的虚拟网桥。每运行一个docker容器都会生成一个veth设备对,这个veth一个接口在容器里,一个接口在物理机上安装网桥管理工具:yuminstallbridg......
  • Docker资源配额详解
    文章目录一、针对CPU限制二、针对内存限制三、针对磁盘限制四、stress压测工具Docker通过cgroup来控制容器使用的资源限制,可以对docker限制的资源包括CPU、内存、磁盘一、针对CPU限制Docker容器针对CPU限制包括--cpu-shares、--cpuset-cpus参数。–cpu-shares:CPU使用份额控制......
  • SpringBoot Vue3 Element Plus 打造分布式存储系统
    SpringBoot+Vue3+ElementPlus打造分布式存储系统download:3w51xuebccom配置IDEA热部署-devtools开发过程中频繁修改代码,每次都需要重新编译,部署,重启服务器,这无疑极大浪费了我们的时间。解决这个问题的方法就是使用热部署技术。本篇文章将介绍如何在IDEA中使用devtools实现热部署......
  • 黑马Vue3 + ElementPlus + Pinia 小兔鲜电商项目2023版
    黑马Vue3+ElementPlus+Pinia小兔鲜电商项目2023版download:3w51xuebccom合式API-watch-基本使用和立即执行合式API是一个用于构建可靠、模块化、灵活的RESTfulAPI的框架。它提供了许多实用的功能,其中包括watch机制。在本篇文章中,我们将介绍合式API的watch机制的基本使用和立......
  • MySQL日志
    文章目录一、理论二、错误日志(ErrorLog)三、二进制日志四、事务日志(redo日志)五、慢查询日志(slowquerylog)一、理论数据库日常操作和错误信息。MySQL有不同类型的日志文件(各自存储了不同类型的日志),从日志当中可以查询到MySQL数据库的运行情况、用户的操作、错误的信息等。MySQ......
  • 浪潮4U服务器 raid5 &&直通(JBOD)
    浪潮服务器微信公众号寻求帮助浪潮专家服务raid5等配置直通参考1.8JBOD章节R卡在浪潮八路机器UEFI模式下配置步骤(例如310893619460):http://www.4008600011.com/archives/16861PM8204在浪潮八路机器UEFI模式下配置步骤:http://www.4008600011.com/archives/16967官网-支持下载-......