疑问和收获
-
exec
和execFile
到底有什么区别? -
为什么
exec/execFile/fork
都是通过spawn
实现的,spawn
的作用到底是什么? -
为什么
spawn
调用后没有回调,而exec
和execFile
能够回调? -
为什么
spawn
调用后需要手动调用child.stdout.on('data', callback)
,这里的child.stdout/child.stderr
到底是什么? -
为什么有
data/error/exit/close
这么多种回调,它们的执行顺序到底是怎样的?
1 Node 多进程 child_process 库 exec 方法源码执行流程分析
2 child_process 库 exec 源码精读
function validateString(value, name) {
if(typeof value !== 'string') {
throw new ERR_INVALID_ARG_TYPE(name, 'string', value)
}
}
function validateNumber(value, name) {
if(typeof value !== 'number') {
throw new ERR_INVALID_ARG_TYPE(name, 'number', value)
}
}
2.1 normalizeExecArgs 源码解读
exec()
参数解析
function normalizeExecArgs(command, options, callback) {
// 兼容参数个数为2个的情况
if(typeof options === 'funciton') {
callback = options
options = undefined
}
// 浅复制,不去clobber用户的options对象
options = { ...options }
options.shell = typeof options.shell === 'string' ? options.shell : true
return {
file: command,
options,
callback
}
}
2.2 normalizeExecFileArgs 源码解读
execFile()
参数解析
// funciton execFile(file /* , args, options, callback */)
let args = []
let callback
let options
let pos = 1
// 参数解析方法--无需限定参数个数
if(pos < arguments.length && Array.isArray(arguments[pos])) {
args = arguments[pos++]
} else if(pos < arguments.length && arguments[pos] === null) {
pos++
}
if(pos < arguments.length && typeof arguments[pos] === 'object') {
options = arguments[pos++]
} else if(pos < arguments.length && arguments[pos] === null) {
pos++
}
if(pos < arguments.length && typeof arguments[pos] === 'function') {
callback = arguments[pos++]
}
// 存在第四个参数则抛出异常
if(!callback && pos < arguments.length && arguments[pos] !== null) {
throw new ERR_INVALID_ARG_VALUE('args', arguments[pos])
}
options = {
encoding: 'utf8',
timeout: 0,
maxBuffer: MAX_BUFFER,
killSignal: 'SIGTERM',
cwd: null,
env: null,
shell: false,
...options
}
const child = spawn(file, args, { /* ... */ })
2.3 normalizeSpawnArguments 源码解读
spawn()
参数解析
function normalizeSpawnArguments(file, args, options) {
// 检验 file是否为字符串
validateString(file, 'file')
if(file.length == 0) {
throw new ERR_INVALID_ARG_TYPE('file', file, 'cannot be empty')
}
if(Array.isArray(args)) {
args = args.slice(0) // 浅拷贝
} else if (args === null) {
args = []
} else if(typeof args !== 'object') {
throw new ERR_INVALID_ARG_TYPE('args', 'object', args)
} else {
options = args
args = []
}
// 参数检查
if(options === undefined) {
options = {}
} else if(options === null || typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', 'object', options)
}
if(options.cwd !== null && typeof options.cwd !== 'string') {
throw new ERR_INVALID_ARG_TYPE('options.cwd', 'string', options.cwd)
}
if(options.detached !== null && typeof options.detached !== 'boolean') {
throw new ERR_INVALID_ARG_TYPE('options.detached', 'boolean', options.detached)
}
if(options.uid !== null && !isInt32(options.uid)) {
throw new ERR_INVALID_ARG_TYPE('options.uid', 'int32', options.uid)
}
if(options.gid !== null && !isInt32(options.gid)) {
throw new ERR_INVALID_ARG_TYPE('options.gid', 'int32', options.gid)
}
if(options.shell !== null && typeof options.shell !== 'boolean' && typeof options.shell !== 'string') {
throw new ERR_INVALID_ARG_TYPE('options.shell', ['boolean', 'string'], options.shell)
}
options = { ...options } // 浅拷贝
if(options.shell) {
const command = [file].concat(args).join(' ')
if(process.plaform === 'win32') {
if(typeof options.shell === 'string') {
file = options.shell
} else {
file = process.env.comspec || 'cmd.exe'
// 使用 cmd -c 执行命令
if(/^(?:.*\\)?cmd(?:\.exe)?$/i.test(file)) {
args = ['/d', '/s', '/c', `"${command}"`]
options.windowsVerbatimArguments = true
} else {
args = ['-c', command]
}
}
} else { /* ... ios mac android */ }
}
if(typeof options.argv0 === 'string') {
args.unshift(options.argv0)
} else {
args.unshift(file) // 构成终端执行语句 ['cmd.exe', '-c', 'node node_module']
}
const env = options.env || process.env
const envPairs = []
// 处理 env 的内容
for(const key in env) {
const value = env[key]
if(value !== undefined) {
envPairs.push(`${key}=${value}`)
}
}
return {
file,
args,
options,
envPairs
}
}
2.4 ChildProcess 源码解读
function ChildProcess() {
EventEmitter.call(this) // 注册EventEmitter 后续可通过on监听事件 emit触发事件
// ...定义一系列变量
this._handle = new Process() // spawn kill constructor
this._handle[owner_symbol] = this
// 进程执行完之后会执行的回调函数
this._handle.onexit = (exitCode, signalCode) => { /* ... */ }
}
2.5 spawn 流程&源码
funciton spawn(file, args, options) {
const opts = normalizeSpawnArguments(file, args, options)
const child = new ChildProcess()
options = opts.options
child.spawn({ /* ... */ })
}
3 深度分析 child_process 库 spawn 底层实现
ChildProcess.prototype.spawn = function(options) {
if(options === null || typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', 'object', options)
}
let stdio = options.stdio || 'pipe' // 创建管道实现父子进程的通信
stdio = getValidStdio(stdio, false) // 关键步骤1 -- 创建 pipe 管道
stdio = options.stdio = stdio.stdio
this.spawnfile = options.file
if(Array.isArray(options.args)) {
this.spawnargs = options.args
} else if(options.args === undefined) {
this.spawnargs = []
} else {
throw new ERR_INVALID_ARG_TYPE('options.args', 'Array', options.args)
}
const err = this._handle.spawn(options)
// ... 处理 err > 0的情况
this.pid = this._handle.pid
for(let i = 0; i < stdio.length; i++) {
const stream = stdio[i]
if(stream.handle) {
// 关键步骤2 创建 Socket 实例 -> socket 通信已创建
stream.socket = createSocket()
}
if(i > 0 && this.pid !== 0) {
// 绑定 close 监听
stream.socket.on('close', () => {
maybeClose(this)
})
}
}
// 关键步骤3
this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ? stdio[0].socket : null
this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ? stdio[1].socket : null
this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ? stdio[2].socket : null
this.stdio = []
for(i = 0; i < stdio.length; i++) {
this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket)
}
return err
}
3.1 getValidStdio -- 创建 pipe 管道 源码解读
function getValidStdio(stdio, sync) {
if(typeof stdio === 'string') {
stdio = stdioStringToArray(stdio) // 'pipe' -> ['pipe', 'pipe', 'pipe']
} else if(!Array.isArray(stdio)) {
throw new ERR_INVALID_ARG_VALUE('stdio', inspect(stdio))
}
while(stdio.length < 3) stdio.push(undefined)
stdio = stdio.reduce((acc, stdio, i) => {
funciton cleanup() { /* ... */ }
// 设置默认值
if(stdio == null) stdio = i < 3 ? 'pipe' : 'ignore'
if(stdio === 'ignore') { // 静默执行子进程
acc.push({ type: 'ignore' })
} else if(stdio === 'pipe' || typeof stdio === 'number' && stdio < 0) {
var a = {
type: 'pipe',
readable: i === 0,
writable: i !== 0
}
// { bindm listen connet open fchmod constructor}
if(!sync) a.handle = new Pipe(PipeConstants.SOCKET)
acc.push(a)
}
return acc
}, [])
return { stdio }
}
stdio: [
{ type: 'pipe', readable: true, writable: false, handle: Pipe },
{ type: 'pipe', readable: false, writable: true, handle: Pipe },
{ type: 'pipe', readable: false, writable: true, handle: Pipe },
]
3.2 spawn 流程&源码
1. 同步执行流程
funciton spawn(file, args, options) {
const opts = normalizeSpawnArguments(file, args, options)
const child = new ChildProcess()
options = opts.options
child.spawn({ /* ... */ })
/* ... 定义变量 */
// 判断子进程的输出流
if(child.stdout) {
child.stdout.on('data', onChildStdout = chunk => {})
}
// 判断子进程的错误流
if(child.stderr) {
child.stderr.on('data', onChildStderr = error => {})
}
child.addListener('close', exithandler) // 进程结束退出
child.addListener('error', errorhandler) // 进程执行出现错误
return child
}
2. 异步执行流程
监听子进程的输出流
onChildStdout = chunk => {
1. Buffer.byteLength(chunk, 输出流) > options.maxBuffer 时
_stdout.push(输出流.slice(0, 超出长度))
kill()
2. _stdout.push(chunk)
}
进程执行完之后会执行的回调函数
flushStdio
将缓存中还未输出的内容输出
this._handle.onexit = (exitCode, signalCode) => {
this._stdin?.destory() // 销毁输入流
this._handle.close() // 关闭进程
// 没有出现执行异常时,触发 exit 事件
this.emit('exit', exitCode, signalCode)
process.nextTick(flushStdio, this)
maybeClose(this) // ...之后发出 close 事件
}
maybeClose() 触发的 exithandler 执行函数
exithandler = (code, signal) => {
stdout = _stdout.join('')
stderr = _stderr.join('')
callback(null, stdout, stderr) // 传入的回调
}
4 child_process 事件应用方法详解
4.1 命令执行是否异常 -- emit error
const child = cp.exec('node ' + path.resolve(__dirname, 'support.js'), (err, stdout, stderr) => {
console.log('callback start ----------');
console.log('eer', err);
console.log('stdout', stdout);
console.log('stderr', stderr);
console.log('callback end ----------');
})
4.2 onStreamRead -- emit data
child.stdout.on('data', chunk => {
console.log('stdout data', chunk);
})
child.stderr.on('data', chunk => {
console.log('stderr data', chunk);
})
4.3 onReadableStreamEnd -- socket emit close
child.stdout.on('close', () => {
console.log('stdout close');
})
child.stderr.on('close', () => {
console.log('stderr close');
})
4.4 Process.onexit -- emit exit
child.on('exit', exitCode => {
console.log('exit!', exitCode);
})
4.5 Socket all close -- emit close
child.on('close', () => {
console.log('close!');
})
5 深度解析 child_process 库 spawn 方法回调原理
this._handle.onexit = (exitCode, signalCode) => {
this._stdin?.destory() // 销毁输入流
this._handle.close() // 关闭进程
// 没有出现执行异常时,触发 exit 事件
this.emit('exit', exitCode, signalCode)
process.nextTick(flushStdio, this)
stream.socket.on('close', () => {
maybeClose(this) // ...之后发出 close 事件
})
}
进程执行完之后会执行的回调函数, Process.onexit 触发 maybeClose() 事件
5.1 createSocket() 事件
创建 Socket 实例 -> socket 通信已创建
function Socket(options) {
if(!(this instanceof Socket)) return new Socket(options)
/* ... */
// 关键二:onReadableStreamEnd
this.on('end', onReadableStreamEnd)
// 关键一:onStreamRead
initSocketHandle(this)
}
5.2 onStreamRead -- emit data
function initSocketHandle(self) {
/* ... */
self._handle.onread = onStreamRead // 写进数据流
}
// onStreamRead
// buf -- 输出的字符串
result = stream.push(buf)
// stream.push()
Readable.prototype.push = (chunk, encoding) => {
return readableAddChunk(this, chunk, encoding, false)
}
// readableAddChunk 调用 addChunk()
// addChunk
stream.emit('data', chunk)
5.3 onReadableStreamEnd -- socket emit close
触发 stdout stderr
function onReadableStreamEnd() {
this.destroy()
}
// this.destroy() 调用 this._destory()
Socket.prototype._destory = function(exception, cb) {
/* ... */
// this -> Pipe
this._handle.close(() => {
debug('emit close')
this.emit('close', isException) // -> 触发maybeClose
})
}
5.4 Process.onexit -- emit exit
this._stdin?.destory() // 销毁输入流
this._handle.close()
this.emit('exit', this.exitCode, this.signalCode)
6 child_process 库 fork 执行流程分析
6.1 getValidStdio -- 创建 pipe 管道
function getValidStdio(stdio, sync) {
/* 创建子进程*/
if(ipc !=== undefined) setupChannel(this, ipc)
}
6.2 setupChannel -- 创建Socket通信,在父子进程之间启动ipc
function setupChannel(target, channel) {
// control 用于建立ipc通信
const control = new Control()
}
7 Node 多进程源码总结
1. exec/execFile/spawn/fork
的区别
-
exec
: 原理是调用/bin/sh -c
执行我们传入的shell
脚本,底层调用了execFile
-
execFile
: 原理是直接执行我们传入的file
和args
,底层调用spawn
创建和执行子进程,并建立了回调,一次性将所有的stdout
和stderr
结果返回 -
spawn
: 原理是调用了internal/child_process
,实例化了ChildProcess
子进程对象,再调用child.spawn
创建子进程并执行命令, 底层是调用了child._handle.spawn
执行process_wrap
中的spawn
方法,执行过程是异步的,执行完毕后通过PIPE
进行单向数据通信,通信结束后会子进程发起onexit
回调,同时Socket
会执行close
回调 -
fork
: 原理是通过spawn
创建子进程和执行命令,采用node
执行命令,通过setupchannel
创建IPC
用于子进程和父进程之间的双向通信
2. data/error/exit/close
回调的区别
-
data
: 主进程读取数据过程中通过onStreamRead
发起的回调 -
error
: 命令执行失败后发起的回调 -
exit
: 子进程关闭完成后发起的回调 -
close
: 子进程所有Socket
通信端口全部关闭后发起的回调 -
stdout close/stderr close
: 特定的PIPE
读取完成后调用onReadableStreamEnd
关闭Socket
时发起的回调