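Today we look at Zeta, SeaTunnel's self-developed data synchronization engine.
If you use the Zeta engine, the first step is always to run the bin/seatunnel-cluster.sh script, which starts the Zeta server.
Open seatunnel-cluster.sh and you will see that it launches the main() method in seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java.
This is Zeta's core startup method.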
Open it and take a look yourself. You may well be puzzled to find only two statements:
ServerCommandArgs serverCommandArgs = CommandLineUtils.parse(args, new ServerCommandArgs(), EngineType.SEATUNNEL.getStarterShellName(), true);
SeaTunnel.run(serverCommandArgs.buildCommand());
Is that really all there is?
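Start with new ServerCommandArgs(). It creates a ServerCommandArgs object, which CommandLineUtils.parse() fills in from the command-line arguments and returns.
Next, look at serverCommandArgs.buildCommand(). The buildCommand() method is the key. Here is its code: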
@Override
public Command<?> buildCommand() {
    return new ServerExecuteCommand(this);
}
Next, here is the code behind new ServerExecuteCommand(this):
public class ServerExecuteCommand implements Command<ServerCommandArgs> {
    private final ServerCommandArgs serverCommandArgs;

    public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
        this.serverCommandArgs = serverCommandArgs;
    }
    ······
}
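ServerExecuteCommand(this) is a constructor call, so it simply returns a ServerExecuteCommand object that holds the parsed arguments.
Back to the two original statements. The returned ServerExecuteCommand is passed to SeaTunnel.run(), and that method ultimately calls ServerExecuteCommand's execute() method.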
ServerCommandArgs serverCommandArgs = CommandLineUtils.parse(args, new ServerCommandArgs(), EngineType.SEATUNNEL.getStarterShellName(), true);
SeaTunnel.run(serverCommandArgs.buildCommand());
So next we analyze ServerExecuteCommand's execute() method.
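The control flow so far follows the command pattern. The args object decides which command to build, and run() only calls execute(). The sketch below reproduces that shape. Every class in it (DemoCommand, DemoCommandArgs, DemoServerArgs, DemoServerCommand, DemoSeaTunnel) is a hypothetical, heavily simplified stand-in and not SeaTunnel's real API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that mimic the shape of Command / CommandArgs / SeaTunnel.run().
interface DemoCommand<T> {
    void execute();
}

abstract class DemoCommandArgs {
    // Each args type knows which command it turns into.
    abstract DemoCommand<?> buildCommand();
}

class DemoServerArgs extends DemoCommandArgs {
    String clusterName = "seatunnel"; // would normally be filled in by the CLI parser

    @Override
    DemoCommand<?> buildCommand() {
        return new DemoServerCommand(this);
    }
}

class DemoServerCommand implements DemoCommand<DemoServerArgs> {
    static final List<String> LOG = new ArrayList<>();
    private final DemoServerArgs args;

    DemoServerCommand(DemoServerArgs args) {
        this.args = args;
    }

    @Override
    public void execute() {
        // The real ServerExecuteCommand starts a Hazelcast instance at this point.
        LOG.add("starting server for cluster " + args.clusterName);
    }
}

final class DemoSeaTunnel {
    // run() knows nothing about servers or clients: it just executes the command it is given.
    static void run(DemoCommand<?> command) {
        command.execute();
    }
}
```

Because of this design, the same run() entry point can serve the server, the client and other commands. Only buildCommand() differs per args type.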
@Override
public void execute() {
    SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
    if (StringUtils.isNotEmpty(serverCommandArgs.getClusterName())) {
        seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());
    }
    HazelcastInstanceFactory.newHazelcastInstance(
            seaTunnelConfig.getHazelcastConfig(),
            Thread.currentThread().getName(),
            new SeaTunnelNodeContext(seaTunnelConfig));
}
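The method first loads the SeaTunnelConfig. If a cluster name was given on the command line, it uses that name to override the Hazelcast cluster name. The most important step, though, is the call HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(), Thread.currentThread().getName(), new SeaTunnelNodeContext(seaTunnelConfig)).
That call is fairly involved. Its key part is new SeaTunnelNodeContext(seaTunnelConfig), which creates a SeaTunnelNodeContext, a subclass of Hazelcast's DefaultNodeContext. While Hazelcast starts up, it calls createNodeExtension() on the node context it was given. Here that means SeaTunnelNodeContext.createNodeExtension(). I won't go into Hazelcast's internals here; please consult the Hazelcast documentation or other material for that.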
Now on to createNodeExtension(). Here is the code:
@Override
public NodeExtension createNodeExtension(@NonNull Node node) {
    return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
}
Stepping into it:
public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
    super(node);
    extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(seaTunnelConfig));
}
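This lands in the constructor of SeaTunnel's own NodeExtension class, which calls new SeaTunnelServer(seaTunnelConfig). This is the real starting point of SeaTunnelServer, that is, the Zeta engine. Note that this SeaTunnelServer is a different class from the starter's SeaTunnelServer whose main() we began with: this one is constructed with a SeaTunnelConfig.
Next, let's analyze the SeaTunnelServer constructor: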
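The path from Hazelcast to SeaTunnel is a plain factory-method hook. The framework calls an overridable method on the context object it was handed, and it never refers to SeaTunnel classes directly. The sketch below reproduces only that shape with hypothetical Demo* classes. The real Hazelcast Node, DefaultNodeContext and NodeExtension types have much richer signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical engine-side server: its constructor is where "SeaTunnel server start..." is logged.
class DemoServer {
    static final List<String> EVENTS = new ArrayList<>();

    DemoServer(String config) {
        EVENTS.add("SeaTunnel server start... (" + config + ")");
    }
}

interface DemoNodeExtension { }

// Plays the role of org.apache.seatunnel.engine.server.NodeExtension.
class DemoSeaTunnelNodeExtension implements DemoNodeExtension {
    final DemoServer server;

    DemoSeaTunnelNodeExtension(String config) {
        this.server = new DemoServer(config);
    }
}

// Plays the role of Hazelcast's DefaultNodeContext: a default hook implementation.
class DemoDefaultNodeContext {
    DemoNodeExtension createNodeExtension() {
        return new DemoNodeExtension() { };
    }
}

// Plays the role of SeaTunnelNodeContext: overrides the hook to plug in SeaTunnel.
class DemoSeaTunnelNodeContext extends DemoDefaultNodeContext {
    private final String config;

    DemoSeaTunnelNodeContext(String config) {
        this.config = config;
    }

    @Override
    DemoNodeExtension createNodeExtension() {
        return new DemoSeaTunnelNodeExtension(config);
    }
}

// Plays the role of the framework's node: it calls the hook while it is being constructed.
final class DemoNode {
    final DemoNodeExtension extension;

    DemoNode(DemoDefaultNodeContext context) {
        this.extension = context.createNodeExtension();
    }
}
```

Creating a DemoNode with a DemoSeaTunnelNodeContext is enough to start a DemoServer. This mirrors how new SeaTunnelNodeContext(...) in execute() ends up constructing SeaTunnelServer.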
public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
    this.liveOperationRegistry = new LiveOperationRegistry();
    this.seaTunnelConfig = seaTunnelConfig;
    LOGGER.info("SeaTunnel server start...");
}
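Here it logs "SeaTunnel server start...".
SeaTunnelServer also implements Hazelcast service interfaces. For example, init(NodeEngine, Properties) is the lifecycle hook of Hazelcast's ManagedService, so Hazelcast naturally calls it during startup. Let's look at init():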
@Override
public void init(NodeEngine engine, Properties hzProperties) {
    this.nodeEngine = (NodeEngineImpl) engine;
    // TODO Determine whether to execute there method on the master node according to the deploy
    // type
    taskExecutionService = new TaskExecutionService(nodeEngine, nodeEngine.getProperties());
    nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
    taskExecutionService.start();
    getSlotService();
    coordinatorService =
            new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
    monitorService = Executors.newSingleThreadScheduledExecutor();
    monitorService.scheduleAtFixedRate(
            this::printExecutionInfo,
            0,
            seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
            TimeUnit.SECONDS);
    seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());
}
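Here we finally meet the TaskExecutionService class, which should come as a relief given the diagram below:
As the diagram shows, the core of SeaTunnelServer's startup is the call to TaskExecutionService.start().
Right after that comes getSlotService(), which lazily creates the SlotService that keeps track of which slots are available to run tasks.
These two lines, taskExecutionService.start() and getSlotService(), are the crucial ones, and they are worth tracing yourself.
The following descriptions are quoted from an official article: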
TaskExecutionService
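TaskExecutionService is the service that executes tasks, and one instance runs on every node. It receives TaskGroups from the JobMaster and runs the Tasks inside them. It maintains a TaskID->TaskContext mapping, and all concrete operations on a Task are encapsulated in its TaskContext. Each Task also holds an OperationService, which lets it make remote calls to, and communicate with, other Tasks or the JobMaster.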
CoordinatorService
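CoordinatorService acts as the coordinator. It is mainly responsible for handling commands submitted by clients and for recovering jobs after a master switchover. When a client submits a job, it locates the master node and submits the job to the CoordinatorService there. The CoordinatorService caches the job information and waits for the job to finish, then archives it.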
SlotService
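SlotService is the slot management service, which manages the cluster's available slot resources. It runs on every node and periodically reports resource information to the master.
Next, let's analyze taskExecutionService.start():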
public void start() {
    runBusWorkSupplier.runNewBusWork(false);
}
This method in turn calls runNewBusWork(false):
public boolean runNewBusWork(boolean checkTaskQueue) {
    if (!checkTaskQueue || taskQueue.size() > 0) {
        BlockingQueue<Future<?>> futureBlockingQueue = new LinkedBlockingQueue<>();
        CooperativeTaskWorker cooperativeTaskWorker =
                new CooperativeTaskWorker(taskQueue, this, futureBlockingQueue);
        Future<?> submit = executorService.submit(cooperativeTaskWorker);
        futureBlockingQueue.add(submit);
        return true;
    }
    return false;
}
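Next, look at new CooperativeTaskWorker(taskQueue, this, futureBlockingQueue) in the code above. Because start() passes checkTaskQueue = false, the if condition is always true, so one worker is created at startup even though the task queue is still empty.
This creates a CooperativeTaskWorker object, and the next line, executorService.submit(cooperativeTaskWorker), submits it to executorService. The Future returned by submit() is then added to futureBlockingQueue.
Once the worker has been submitted to executorService, the executor runs its run() method. So next we look at what CooperativeTaskWorker.run() actually does.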
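Why does the worker read its own Future from a queue instead of receiving it in the constructor? The Future only exists once submit() returns, and by then run() may already be executing on a pool thread. runNewBusWork therefore adds the Future to futureBlockingQueue right after submit(), and run() blocks on take() until the Future arrives. (The worker later stores it in currRunningTaskFuture.) Below is a minimal sketch of this handoff. Worker and FutureHandoffDemo are hypothetical names, not SeaTunnel classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

final class FutureHandoffDemo {
    // Hypothetical worker: like CooperativeTaskWorker, it learns its own Future through a queue.
    static final class Worker implements Runnable {
        private final BlockingQueue<Future<?>> futureQueue;
        final AtomicReference<Future<?>> ownFuture = new AtomicReference<>();

        Worker(BlockingQueue<Future<?>> futureQueue) {
            this.futureQueue = futureQueue;
        }

        @Override
        public void run() {
            try {
                // Blocks until the submitter publishes the Future that submit() returned.
                ownFuture.set(futureQueue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static boolean workerSeesItsOwnFuture() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            BlockingQueue<Future<?>> queue = new LinkedBlockingQueue<>();
            Worker worker = new Worker(queue);
            Future<?> submitted = pool.submit(worker);
            queue.add(submitted); // same order as runNewBusWork: submit first, then publish
            submitted.get();      // wait until run() has finished
            return worker.ownFuture.get() == submitted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```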
public void run() {
    thisTaskFuture = futureBlockingQueue.take();
    futureBlockingQueue = null;
    myThread = currentThread();
    while (keep.get() && isRunning) {
        TaskTracker taskTracker =
                null != exclusiveTaskTracker.get()
                        ? exclusiveTaskTracker.get()
                        : taskqueue.takeFirst();
        TaskGroupExecutionTracker taskGroupExecutionTracker =
                taskTracker.taskGroupExecutionTracker;
        if (taskGroupExecutionTracker.executionCompletedExceptionally()) {
            taskGroupExecutionTracker.taskDone(taskTracker.task);
            if (null != exclusiveTaskTracker.get()) {
                // If it's exclusive need to end the work
                break;
            } else {
                // No action required and don't put back
                continue;
            }
        }
        taskGroupExecutionTracker.currRunningTaskFuture.put(
                taskTracker.task.getTaskID(), thisTaskFuture);
        // start timer, if it's exclusive, don't need to start
        if (null == exclusiveTaskTracker.get()) {
            timer.timerStart(taskTracker);
        }
        ProgressState call = null;
        try {
            // run task
            myThread.setContextClassLoader(
                    executionContexts
                            .get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
                            .getClassLoader());
            call = taskTracker.task.call();
            synchronized (timer) {
                timer.timerStop();
            }
        } catch (InterruptedException e) {
            if (taskGroupExecutionTracker.executionException.get() == null
                    && !taskGroupExecutionTracker.isCancel.get()) {
                taskGroupExecutionTracker.exception(e);
            }
            taskGroupExecutionTracker.taskDone(taskTracker.task);
            logger.warning("Exception in " + taskTracker.task, e);
            if (null != exclusiveTaskTracker.get()) {
                break;
            }
        } catch (Throwable e) {
            // task Failure and complete
            taskGroupExecutionTracker.exception(e);
            taskGroupExecutionTracker.taskDone(taskTracker.task);
            // If it's exclusive need to end the work
            logger.warning("Exception in " + taskTracker.task, e);
            if (null != exclusiveTaskTracker.get()) {
                break;
            }
        } finally {
            // stop timer
            timer.timerStop();
            taskGroupExecutionTracker.currRunningTaskFuture.remove(
                    taskTracker.task.getTaskID());
        }
        // task call finished
        if (null != call) {
            if (call.isDone()) {
                // If it's exclusive, you need to end the work
                taskGroupExecutionTracker.taskDone(taskTracker.task);
                if (null != exclusiveTaskTracker.get()) {
                    break;
                }
            } else {
                // Task is not completed. Put task to the end of the queue
                // If the current work has an exclusive tracker, it will not be put back
                if (null == exclusiveTaskTracker.get()) {
                    taskqueue.offer(taskTracker);
                }
            }
        }
    }
}
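The heart of this method is the while loop. Before entering it, the worker takes its own Future from futureBlockingQueue, where runNewBusWork placed it right after submit(). Inside the loop, unless the worker holds an exclusive tracker, it calls taskqueue.takeFirst(), which blocks until a TaskTracker is available. It then runs task.call(). If the returned ProgressState is not done, the worker puts the tracker back at the tail of the queue. At startup no task has been submitted yet, so the worker thread simply sits blocked in takeFirst(). For now it is enough to know that a thread has been started here that keeps waiting for tasks and runs them as they arrive. We will dig deeper into this method when we analyze task execution. Let's return to the startup process.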
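The scheduling idea in this loop is cooperative round-robin. Each call() performs one slice of work, and an unfinished task goes back to the tail of the deque, so a single thread can interleave many tasks. The sketch below models this in a single thread with a hypothetical SliceTask whose call() returns true once the task is done, standing in for ProgressState.isDone(). To let the demo terminate, it drains the queue with pollFirst(); the real worker blocks on takeFirst() instead. It also leaves out exclusive trackers, timers and error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

final class CooperativeLoopDemo {
    // Hypothetical task: each call() does one slice of work and reports whether it is finished.
    static final class SliceTask {
        final String name;
        int remaining;

        SliceTask(String name, int slices) {
            this.name = name;
            this.remaining = slices;
        }

        boolean call() {
            return --remaining == 0; // true once the last slice has run
        }
    }

    // Returns the order in which slices ran.
    static List<String> runAll(List<SliceTask> tasks) {
        LinkedBlockingDeque<SliceTask> queue = new LinkedBlockingDeque<>(tasks);
        List<String> trace = new ArrayList<>();
        SliceTask task;
        while ((task = queue.pollFirst()) != null) {
            boolean done = task.call();
            trace.add(task.name);
            if (!done) {
                queue.offer(task); // not finished: back to the tail, like taskqueue.offer(taskTracker)
            }
        }
        return trace;
    }
}
```

For tasks a (2 slices), b (1 slice) and c (3 slices), the trace is [a, b, c, a, c, c]. Task b finishes on its first turn, while a and c keep rotating until they are done.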
Next up is the getSlotService(); line.
/** Lazy load for Slot Service */
public SlotService getSlotService() {
    if (slotService == null) {
        synchronized (this) {
            if (slotService == null) {
                SlotService service =
                        new DefaultSlotService(
                                nodeEngine,
                                taskExecutionService,
                                seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
                service.init();
                slotService = service;
            }
        }
    }
    return slotService;
}
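That is the complete getSlotService() method.
It initializes the service lazily with double-checked locking. On the first call, it creates a SlotService object named service, calls its init() method, and only then assigns it to slotService.
SlotService is an interface, and its only implementation is DefaultSlotService. Since the object here is constructed as a DefaultSlotService, service.init() runs DefaultSlotService.init(). Let's look at that code.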
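The pattern in getSlotService() is double-checked locking. Below is a self-contained sketch with a hypothetical DemoSlotService. Under the Java memory model, this idiom is only safe if the field is declared volatile, and the excerpt above does not show how SeaTunnel declares slotService. Like the original, the sketch calls init() before publishing the instance, so other threads never see a half-initialized service.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyHolderDemo {
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical service standing in for DefaultSlotService.
    static final class DemoSlotService {
        boolean initialized;

        void init() {
            initialized = true;
        }
    }

    // volatile is what makes double-checked locking safe under the Java memory model.
    private volatile DemoSlotService slotService;

    DemoSlotService getSlotService() {
        DemoSlotService local = slotService; // first check, without the lock
        if (local == null) {
            synchronized (this) {
                local = slotService; // second check, under the lock
                if (local == null) {
                    DemoSlotService service = new DemoSlotService();
                    CREATED.incrementAndGet();
                    service.init(); // fully initialize before publishing
                    slotService = local = service;
                }
            }
        }
        return local;
    }
}
```

Reading the volatile field into a local variable means the fast path, once the service exists, performs only one volatile read.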
@Override
public void init() {
    initStatus = true;
    slotServiceSequence = UUID.randomUUID().toString();
    contexts = new ConcurrentHashMap<>();
    assignedSlots = new ConcurrentHashMap<>();
    unassignedSlots = new ConcurrentHashMap<>();
    unassignedResource = new AtomicReference<>(new ResourceProfile());
    assignedResource = new AtomicReference<>(new ResourceProfile());
    scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(
                    r ->
                            new Thread(
                                    r,
                                    String.format(
                                            "hz.%s.seaTunnel.slotService.thread",
                                            nodeEngine.getHazelcastInstance().getName())));
    if (!config.isDynamicSlot()) {
        initFixedSlots();
    }
    unassignedResource.set(getNodeResource());
    scheduledExecutorService.scheduleAtFixedRate(
            () -> {
                try {
                    LOGGER.fine(
                            "start send heartbeat to resource manager, this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                    sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();
                } catch (Exception e) {
                    LOGGER.warning(
                            "failed send heartbeat to resource manager, will retry later. this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                }
            },
            0,
            DEFAULT_HEARTBEAT_TIMEOUT,
            TimeUnit.MILLISECONDS);
}
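The method first sets up its bookkeeping: the contexts map, the assigned and unassigned slot maps, and the assigned and unassigned resource profiles. It then creates scheduledExecutorService with Executors.newSingleThreadScheduledExecutor, so the pool has exactly one thread. hz.xxxxxx.seaTunnel.slotService.thread is not a configuration setting. It is the format of that thread's name, where xxxxxx (the %s) is the name of the Hazelcast instance. If dynamic slots are disabled (config.isDynamicSlot() is false), init() pre-creates a fixed set of slots with initFixedSlots(). It then records the node's resources as unassigned.
Finally, it schedules a periodic job that runs sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join(); immediately (the initial delay is 0) and then every DEFAULT_HEARTBEAT_TIMEOUT milliseconds, i.e. every 5 seconds. This is the heartbeat, which reports this worker's profile to the master. In SeaTunnel, the master is also called the Coordinator. A failed send is only logged as a warning and retried on the next tick.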
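Two details of this setup are easy to miss. First, the thread name comes from the ThreadFactory lambda, not from configuration. Second, the try/catch inside the scheduled lambda is essential. Under the ScheduledExecutorService.scheduleAtFixedRate contract, if any execution throws, subsequent executions are suppressed, so a single failed heartbeat would otherwise stop all later ones. The sketch below shows both. HeartbeatDemo is hypothetical, it uses a millisecond period in place of DEFAULT_HEARTBEAT_TIMEOUT, and a simulated failure replaces sendToMaster(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class HeartbeatDemo {
    // Fails the first `failures` heartbeats on purpose, waits for `wantedSuccesses` successful ones,
    // and returns the name of the thread that sent them (null on timeout or interruption).
    static String run(String instanceName, int failures, int wantedSuccesses, long periodMillis) {
        AtomicInteger attempts = new AtomicInteger();
        AtomicReference<String> threadName = new AtomicReference<>();
        CountDownLatch successes = new CountDownLatch(wantedSuccesses);
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(
                        r -> new Thread(r, String.format("hz.%s.seaTunnel.slotService.thread", instanceName)));
        try {
            scheduler.scheduleAtFixedRate(
                    () -> {
                        try {
                            threadName.set(Thread.currentThread().getName());
                            if (attempts.incrementAndGet() <= failures) {
                                // Simulated failure of the heartbeat send.
                                throw new IllegalStateException("master unreachable");
                            }
                            successes.countDown(); // a successful heartbeat
                        } catch (Exception e) {
                            // Log and retry on the next tick; letting it escape would cancel the schedule.
                            System.err.println("heartbeat failed, will retry: " + e.getMessage());
                        }
                    },
                    0,
                    periodMillis,
                    TimeUnit.MILLISECONDS);
            return successes.await(5, TimeUnit.SECONDS) ? threadName.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

With HeartbeatDemo.run("demo", 2, 2, 10), the first two sends fail, yet heartbeats keep going and two later ones succeed on the single thread named hz.demo.seaTunnel.slotService.thread.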
From: https://www.cnblogs.com/lukairui/p/17449822.html