代码fork 自awesomeoxc/xxl-job-executor-nodejs,进行了一些以来包的升级,同时发布npm包到npm 仓库中,方便使用
npm 包名称
npm 包我已经发布npm 仓库中了,可以直接使用@dalongrong/xxl-job-executor
参考使用
- 安装
npm install @dalongrong/xxl-job-executor --save
or
yarn add @dalongrong/xxl-job-executor
- handler 定义
jobHandlers.js
/**
* demo任务
* @param {any} jobLogger 由xxl-job组件定义的任务logger,会将日志内容输出到文件,可在调度中心查看执行日志
* @param {{ jobParam1: any, jobParam2: any }} jobParams 任务参数
* @param {Object} context 任务上下文
* @return {Promise<void>} 函数必须返回一个 promise
*/
const demoJobHandler = async (jobLogger, jobParams, context) => {
jobLogger.debug('params: %o, context: %o', jobParams, context)
const sleep = async (millis) => new Promise((resolve) => setTimeout(resolve, millis))
for (let i = 1; i < 10; i++) {
await sleep(1000)
jobLogger.debug(`${i}s passed`)
}
//
// do error handler with throw exception, default is success
throw new Error('exector jobHandler is error')
// return { code: 500, msg: `exector jobHandler is error)` }
}
module.exports = {
jobHandlers: new Map([
['demoJobHandler', demoJobHandler],
]),
}
一些说明: 默认xxljob 的job 之行状态是正常的,但是如果我们的业务需要自定义异常(方便报警通知的场景),可以自己抛出一个exception
同时返回对应的信息,方便报警集成
- 集成
因为handler 对应的executor 需要是一个long running 的任务,同时需要支持api 操作(rpc 接口)具体示例在example 中有
web 服务代码
require("dotenv").config();
// 实例化 XxlJobExecutor 组件
const XxlJobExecutor = require('@dalongrong/xxl-job-executor')
const { jobHandlers } = require('./jobHandlers')
const context = { /* anything*/ }
const xxlJobExecutor = new XxlJobExecutor(jobHandlers, context)
const address =require('address')
// 实例化 express app
const app = require('express')()
app.use(require('body-parser').json())
console.log(address.ip())
// 应用 XxlJobExecutor 组件,注意因为注册的执行器是需要在管理中可以接口调用的,所以注册的地址需要是一个可达地址(对于nodejs 可以通过address 模块获取ip 地址)
xxlJobExecutor.applyMiddleware({ app, appType: 'EXPRESS', appDomain: `http://${address.ip()}:3000`, path: '/job' })
app.listen(3000, '0.0.0.0', () => console.log('server startup'))
// 应用退出前从注册中心摘除执行器
// const { addShutdownHandlers } = require('./shutdown')
// addShutdownHandlers(xxlJobExecutor.close.bind(xxlJobExecutor))
process.on('exit', () => {
console.log('************ The process exits completely. ************')
xxlJobExecutor.close()
})
process.on('SIGINT', async () => {
console.log('************ The process SIGINT completely. ************')
await xxlJobExecutor.close()
process.exit()
})
process.on('SIGHUP', async () => {
console.log('************ The process SIGHUP completely. ************')
await xxlJobExecutor.close()
process.exit()
})
process.on('SIGTERM', async () => {
console.log('************ The process SIGTERM completely. ************')
await xxlJobExecutor.close()
process.exit()
})
说明
代码主要是fork 自社区提供的,自己调整了包名,详细的可以参考源码, xxl-job-executor-nodejs 客户端可以方便
业务集成,实际上可以参考java 版本的进行一些调整,更方便业务使用
参考资料
https://www.npmjs.com/package/@dalongrong/xxl-job-executor
https://www.xuxueli.com/xxl-job/
https://github.com/awesomeoxc/xxl-job-executor-nodejs
https://github.com/rongfengliang/xxl-job-executor-nodejs/tree/main