首页 > 系统相关 >Cluster机制剖析1——进程复制

Cluster机制剖析1——进程复制

时间:2023-08-01 15:38:00浏览次数:45  
标签:fork cluster process worker 剖析 Cluster 复制 env 进程


Fork

fork()是类UNIX系统父进程复制子进程的系统调用,在Node里通过libuv实现了对不同平台(unix,linux,windows)的封装。引用百度百科的一段话来描述fork的特性:


fork之后的子进程是父进程的副本,它将获得父进程数据空间、堆、栈等资源的副本。注意,子进程持有的是上述存储空间的“副本”,这意味着父子进程间不共享这些存储空间。


其实,在node的cluster模式里,worker进程的产生也是调用了require('child_process').fork。


那直接这样不就行了?


var fork =require('child_process').fork;
var cpuNums =require('os').cpus().length;
var workerPath =require('path').join(__dirname,'worker.js');
for(var i =0; i < cpuNums; i++){
fork(workerPath,{ env: process.env });
}

这样的方式仅仅实现了多进程。多进程运行还涉及父子进程通信,子进程管理,以及负载均衡等问题,这些特性cluster帮你实现了,在后面的章节会一一道来。


两种逻辑

先看一个官网的例子:


var cluster =require('cluster');
var http =require('http');
var numCPUs =require('os').cpus().length;
if(cluster.isMaster){
// Fork workers.
for(var i =0; i < numCPUs; i++){
cluster.fork();
}
cluster.on('exit',function(worker, code, signal){
console.log('worker '+ worker.process.pid +' died');
});
}else{
// Workers can share any TCP connection
// In this case its a HTTP server
http.createServer(function(req, res){
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}

以上代码的意思是:如果是父进程(Master),就复制多个子进程,子进程的数目等于CPU核心数。如果是子进程(Worker),就创建一个http服务器并监听8000端口。


我们在前面提到,子进程是父进程的“副本”,将得到父进程的数据(代码)空间及堆栈,因此,父子进程会执行同一段代码逻辑。这样就需要一种机制,能够在代码里区分当前进程是父进程还是子进程。首先来看看神秘的cluster模块究竟是什么:


// lib/cluster.js
functionCluster(){
EventEmitter.call(this);
}
util.inherits(Cluster,EventEmitter);
var cluster =module.exports =newCluster();

它是一个Object,通过继承EventEmitter,使它有了事件驱动机制。


// Define isWorker and isMaster
cluster.isWorker ='NODE_UNIQUE_ID'in process.env;
cluster.isMaster =! cluster.isWorker;


可以看到cluster下有两个flag来标识区分当前进程,依据是当前进程的环境变量(env)里是否包含NODE_UNIQUE_ID这个字段。为什么子进程的环境变量里有这个字段而父进程没有?


我们知道,在复制子进程的时候实际上是调用了require('child_process').fork,看一下这个方法的用法:


child_process.fork(modulePath,[args],[options])
modulePath StringThemodule to run in the child
args ArrayList of string arguments
options Object
cwd StringCurrent working directory of the child process
env ObjectEnvironment key-value pairs
encoding String(Default:'utf8')
execPath StringExecutable used to create the child process
execArgv ArrayList of string arguments passed to the executable (Default: process.execArgv)
silent BooleanIftrue, stdin, stdout,and stderr of the child will be piped to the parent, otherwise they will be inherited from the parent, see the "pipe"and"inherit" options for spawn()'s stdio for more details (default is false)
Return: ChildProcess object

可以看到方法的第三个参数options是个对象,其中options.env可以设置子进程的环境变量,即process.env。因此,可以推测,应该是这里调用的时候,在env里面添加了NODE_UNIQUE_ID这个标识。接下来就来看下代码:


functionWorker(customEnv){
...
// Create or get process
if(cluster.isMaster){
// Create env object
// first: copy and add id property
var envCopy = util._extend({}, env);
envCopy['NODE_UNIQUE_ID']=this.id;
// second: extend envCopy with the env argument
if(isObject(customEnv)){
envCopy = util._extend(envCopy, customEnv);
}
// fork worker
this.process = fork(settings.exec, settings.args,{
'env': envCopy,
'silent': settings.silent,
'execArgv': settings.execArgv
});
}else{
this.process = process;
}
...
}

以上代码段印证了之前的推测,就不再赘述了。


还有个细节需要注意:


// src/node.js
if(process.env.NODE_UNIQUE_ID){
var cluster =NativeModule.require('cluster');
cluster._setupWorker();
// Make sure it's not accidentally inherited by child processes.
delete process.env.NODE_UNIQUE_ID;
}


在Node进程启动的时候,子进程会执行以上代码,我们先不追究_setupWorker的细节。可以看到NODE_UNIQUE_ID从环境变量里剔除了,这使得子进程可以作为Master继续复制子进程。


进程复制

继续官网的示例,我们先看父进程的执行逻辑:


if(cluster.isMaster){
// Fork workers.
for(var i =0; i < numCPUs; i++){
cluster.fork();
}
cluster.on('exit',function(worker, code, signal){
console.log('worker '+ worker.process.pid +' died');
});
}


很明显,主要就是执行fork调用。那我们就来看看cluster.fork做了什么?


// Fork a new worker
cluster.fork =function(env){
// This can only be called from the master.
assert(cluster.isMaster);
// Make sure that the master has been initialized
cluster.setupMaster();
return(new cluster.Worker(env));
};


先setupMaster,再返回一个cluster.Worker实例。其实workder实例化的代码在前面已经贴过了,我们来个完整版:


// Create a worker object, that works both for master and worker
functionWorker(customEnv){
if(!(thisinstanceofWorker))returnnewWorker();
EventEmitter.call(this);
varself=this;
var env = process.env;
// Assign a unique id, default null
this.id = cluster.isMaster ?++ids : toDecInt(env.NODE_UNIQUE_ID);
// XXX: Legacy. Remove in 0.9
this.workerID =this.uniqueID =this.id;
// Assign state
this.state ='none';
// Create or get process
if(cluster.isMaster){
// Create env object
// first: copy and add id property
var envCopy = util._extend({}, env);
envCopy['NODE_UNIQUE_ID']=this.id;
// second: extend envCopy with the env argument
if(isObject(customEnv)){
envCopy = util._extend(envCopy, customEnv);
}
// fork worker
this.process = fork(settings.exec, settings.args,{
'env': envCopy,
'silent': settings.silent,
'execArgv': settings.execArgv
});
}else{
this.process = process;
}
if(cluster.isMaster){
// Save worker in the cluster.workers array
cluster.workers[this.id]=this;
// Emit a fork event, on next tick
// There is no worker.fork event since this has no real purpose
process.nextTick(function(){
cluster.emit('fork',self);
});
}
// handle internalMessage, exit and disconnect event
this.process.on('internalMessage', handleMessage.bind(null,this));
this.process.once('exit',function(exitCode, signalCode){
prepareExit(self,'dead');
self.emit('exit', exitCode, signalCode);
cluster.emit('exit',self, exitCode, signalCode);
});
this.process.once('disconnect',function(){
prepareExit(self,'disconnected');
self.emit('disconnect');
cluster.emit('disconnect',self);
});
// relay message and error
this.process.on('message',this.emit.bind(this,'message'));
this.process.on('error',this.emit.bind(this,'error'));
}

每个worker分配了一个id,注册在cluster.workers里。父进程和子进程注册了一堆事件,这些事件涉及父子进程通讯,我们在下一篇文章里详细讨论。接下来,我们再看看setupMaster做了什么:


cluster.setupMaster =function(options){
// This can only be called from the master.
assert(cluster.isMaster);
// Don't allow this function to run more than once
if(masterStarted)return;
masterStarted =true;
// Get filename and arguments
options = options ||{};
// By default, V8 writes the profile data of all processes to a single
// v8.log.
//
// Running that log file through a tick processor produces bogus numbers
// because many events won't match up with the recorded memory mappings
// and you end up with graphs where 80+% of ticks is unaccounted for.
//
// Fixing the tick processor to deal with multi-process output is not very
// useful because the processes may be running wildly disparate workloads.
//
// That's why we fix up the command line arguments to include
// a "--logfile=v8-%p.log" argument (where %p is expanded to the PID)
// unless it already contains a --logfile argument.
var execArgv = options.execArgv || process.execArgv;
if(execArgv.some(function(s){return/^--prof/.test(s);})&&
!execArgv.some(function(s){return/^--logfile=/.test(s);}))
{
execArgv = execArgv.slice();
execArgv.push('--logfile=v8-%p.log');
}
// Set settings object
settings = cluster.settings ={
exec: options.exec|| process.argv[1],
execArgv: execArgv,
args: options.args || process.argv.slice(2),
silent: options.silent ||false
};
// emit setup event
cluster.emit('setup');
};


这里是复制子进程之前的一些准备工作。你可以显示调用这个方法并传入一些配置,主要就是指定子进程的执行入口options.exec,如果像官网例子那样不显式调用,则默认把当前文件作为入口。


标签:fork,cluster,process,worker,剖析,Cluster,复制,env,进程
From: https://blog.51cto.com/u_2700990/6922584

相关文章

  • Spring Boot Starter 剖析与实践
    引言对于Java开发人员来说,Spring框架几乎是必不可少的。它是一个广泛用于开发企业应用程序的开源轻量级框架。近几年,SpringBoot在传统Spring框架的基础上应运而生,不仅提供了Spring的全部功能,还使开发人员更加便捷地使用。在使用SpringBoot时,我们经常会接触到各种Spr......
  • redis主从复制
    1.概念指将一台Redis服务器的数据,复制到其它的Redis服务器。前者称为主节点(master),后者称为从节点(slave);数据的复制是单向的,只能由主节点到从节点。2.配置步骤1、在/etc/redis下面,将6379.conf拷贝两份,分别称为6380.conf与6381.conf2、修改配置文件6380.conf与6381.conf中......
  • Inofity-tools+Rsync实施复制实战
    Inofity-tools+Rsync实施复制实战一、先准备rsyncd服务环境1.快速的部署rsyncd服务端#!/bin/bashyuminstallrsync-ycat>/etc/rsyncd.conf<<'EOF'uid=wwwgid=wwwport=873fakesuper=yesusechroot=nomaxconnections=200timeout=600ignoree......
  • 什么东西别人难以复制?
    作为一名创业家,对在牙医诊所的漫长等待经历感到厌烦的他判定,牙医诊所的管理非常糟糕。因此他设计开发了一款全面的牙医诊所管理系统并推向市场。软件很贵,每诊所每年要25000美元,但仍深受诊所欢迎,因为那些牙医意识到部署系统后显著节省了成本。而Dentasoft公司也迅速发展成为一......
  • 微信 H5 页面兼容性——复制到剪贴板
    在开发微信H5页面时,时常会遇到难以解决的兼容性问题,现收集问题和解决方案,以备后用。在PC浏览器和手机移动端浏览器中,WebAPI提供了两种方式:1.1.Document.execCommand()方法Document.execCommand()是操作剪贴板的传统方法,各种浏览器都支持。它支持复制、剪切和粘贴这三......
  • 运行 docker-compose -f common.yml -f kafka_cluster.yml up 命令之后,其中一个broker
    今天在运行docker-compose-fcommon.yml-fkafka_cluster.ymlup,这条命令的时候,原来应该启动的broker-3容器没有启动,然后允许dockerps-a查看存在但是没有up的容器。 找到broker-3的容器id:,dockerlogs a6488cb653a1 。找打容器启动日志,发现关键错误信息。En......
  • xtrabackup 重建 Slave 复制
    一:物理全备优先选择基于slave节点全备;当不存在可用的slave节点时,选择master节点备份基于slave节点:innobackupex--defaults-file=/etc/my.cnf-S/tmp/mysql.sock--ftwrl-wait-threshold=2--ftwrl-wait-timeout=3--slave-info--host=127.0.0.1--user=xxx--pass......
  • mysql 延迟复制 second bechind
    实现MySQL延迟复制secondbehind1.简介在MySQL复制过程中,主服务器(Master)将二进制日志(BinaryLog)中的事件复制到从服务器(Slave)上。而"mysql延迟复制secondbehind"的目标是在从服务器上设置延迟时间来实现数据同步的延迟。本文将介绍如何通过更改MySQL的复制参数来实......
  • python 矩阵自我复制
    Python矩阵自我复制实现指南作为一名经验丰富的开发者,我将带领你学习如何实现"Python矩阵自我复制"。在本文中,我们将使用Python编程语言来实现这个功能。首先,让我们来看一下整个实现的流程。实现流程下面是实现"Python矩阵自我复制"的步骤:步骤描述1创建一个矩阵2......
  • 3-2 编写一个函数 escape(s, t),将字符串 t 复制到字符串 s 中,并在复制 过程中将换行
    ArchlinuxGCC13.1.1 202304292023-07-3012:57:46星期日 点击查看代码#include<stdio.h>voidescape(chars[],chart[]){inti,j;i=j=0;while(t[i]!='\0'){switch(t[i]){case�......