以celery worker启动为例: 起celery worker 的指令为 celery -A mysite.celery worker 如果指定-c 2 代表celery fork出两个子进程。 首先从启动worker进程的入口出发: 文件:Celery/bin/celery.py 这个文件是celery命令行的启动命令需要用到的模块引导文件,代码如下: def main(argv=None): # Fix for setuptools generated scripts, so that it will # work with multiprocessing fork emulation. # (see multiprocessing.forking.get_preparation_data()) try: if __name__ != '__main__': # pragma: no cover sys.modules['__main__'] = sys.modules[__name__] cmd = CeleryCommand() cmd.maybe_patch_concurrency() from billiard import freeze_support freeze_support() cmd.execute_from_commandline(argv) # 执行celery指令,argv为celery指令参数 except KeyboardInterrupt: pass execute_from_commandline指令关键步骤: *************************************************************************************************************************** keystep1: argv = self.setup_app_from_commandline(argv) # 获取command的配置参数,尤其是app属性,我们这边指定的就是mysite def setup_app_from_commandline(self, argv): preload_options = self.parse_preload_options(argv) quiet = preload_options.get('quiet') if quiet is not None: self.quiet = quiet try: self.no_color = preload_options['no_color'] except KeyError: pass workdir = preload_options.get('working_directory') #制定了工作目录,首先切换目录 if workdir: os.chdir(workdir) app = (preload_options.get('app') or #从参数中获取app属性 os.environ.get('CELERY_APP') or self.app) preload_loader = preload_options.get('loader') if preload_loader: # Default app takes loader from this env (Issue #1066). os.environ['CELERY_LOADER'] = preload_loader loader = (preload_loader, os.environ.get('CELERY_LOADER') or 'default') broker = preload_options.get('broker', None) if broker: os.environ['CELERY_BROKER_URL'] = broker config = preload_options.get('config') if config: os.environ['CELERY_CONFIG_MODULE'] = config if self.respects_app_option: if app: self.app = self.find_app(app) #这里的find_app其实就是将字符串比如'celery.concurrency.processes.TaskPool'转化成 <class 'celery.concurrency.processes.TaskPool'>,然后我们这边的app,应该就是 Celery类了,我们填参数时,可以填到mysite.celery,代码自动寻找模块的app属性,这也是名字叫做find_app的原因了。 elif self.app is None: self.app = self.get_app(loader=loader) if self.enable_config_from_cmdline: argv = self.process_cmdline_config(argv) else: self.app = Celery(fixups=[]) user_preload = tuple(self.app.user_options['preload'] or ()) if user_preload: user_options = self.preparse_options(argv, user_preload) for user_option in user_preload: user_options.setdefault(user_option.dest, user_option.default) signals.user_preload_options.send( sender=self, app=self.app, options=user_options, ) return argv *************************************************************************************************************************** keystep2: self.handle_argv 获取启动参数并执行指令,这里面需要注意的是我们目前有三个类,CeleryCommand继承自command基类,CeleryCommand包含 属性commands,里面是一个包含指令参数和不同指令类的映射map。然后每个指令类继承自command。这里的self指的是CeleryCommand类。 def handle_argv(self, prog_name, argv): self.prog_name = self.prepare_prog_name(prog_name) argv = self._relocate_args_from_start(argv) _, argv = self.prepare_args(None, argv) try: command = argv[0] except IndexError: #如果指令不含在commands的map中,执行help指令 command, argv = 'help', ['help'] return self.execute(command, argv) #执行对应指令 def execute(self, command, argv=None): try: cls = self.commands[command] #找到对应的指令类 except KeyError: cls, argv = self.commands['help'], ['help'] #如果指令不存在,执行help指令 cls = self.commands.get(command) or self.commands['help'] try: return cls( #关键步骤,初始化worker指令类,然后执行指令的run_from_argv函数 app=self.app, on_error=self.on_error, no_color=self.no_color, quiet=self.quiet, on_usage_error=partial(self.on_usage_error, command=command), ).run_from_argv(self.prog_name, argv[1:], command=argv[0]) except self.UsageError as exc: self.on_usage_error(exc) return exc.status except self.Error as exc: self.on_error(exc) return exc.status *************************************************************************************************************************** keystep3:run_from_argv() 这块其实是执行了worker command的run函数: def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None, loglevel=None, logfile=None, pidfile=None, state_db=None, **kwargs): maybe_drop_privileges(uid=uid, gid=gid) # # Pools like eventlet/gevent needs to patch libs as early # as possible. pool_cls = (concurrency.get_implementation(pool_cls) or self.app.conf.CELERYD_POOL) if self.app.IS_WINDOWS and kwargs.get('beat'): self.die('-B option does not work on Windows. ' 'Please run celery beat as a separate service.') hostname = self.host_format(default_nodename(hostname)) if loglevel: try: loglevel = mlevel(loglevel) except KeyError: # pragma: no cover self.die('Unknown level {0!r}. Please use one of {1}.'.format( loglevel, '|'.join( l for l in LOG_LEVELS if isinstance(l, string_t)))) #这块主要的代码,self.app其实就是之前find_app中找到的app对象,然后调用app的Worker属性的start方法。也就是启动我们的worker了。 return self.app.Worker( hostname=hostname, pool_cls=pool_cls, loglevel=loglevel, logfile=logfile, # node format handled by celery.app.log.setup pidfile=self.node_format(pidfile, hostname), state_db=self.node_format(state_db, hostname), **kwargs ).start() *************************************************************************************************************************** keystep4: celery.app.worker.Worker的启动 下面就是我们的celery对象的worker属性调用了,也就是我们的celery.apps.worker:Worker对象 @cached_property def Worker(self): return self.subclass_with_self('celery.apps.worker:Worker') #这块注意subclass_with_self函数,这个函数将Worker类的 的app属性设置为celery实例 下面看下我们self.app.Worker这个类的真面目,该类继承自WorkController类。在初始化Worker类时,主要初始化过程为父类WorkController的初始化: def __init__(self, app=None, hostname=None, **kwargs): self.app = app or self.app #这个app属性为我们的项目中初始化的celery类 self.hostname = default_nodename(hostname) self.app.loader.init_worker() #self.app.loader属于celery.loaders.app:AppLoader类对象,里面包含的app属性为celery实例 self.on_before_init(**kwargs) self.setup_defaults(**kwargs) self.on_after_init(**kwargs) self.setup_instance(**self.prepare_args(**kwargs)) self._finalize = [ Finalize(self, self._send_worker_shutdown, exitpriority=10), ] 1、self.app.loader.init_worker() 这块是调用app_loader类的父类BaseLoader的init_worker方法,我们可以把loader看成celery的加载类 def init_worker(self): if not self.worker_initialized: self.worker_initialized = True self.import_default_modules() self.on_worker_init() (1)第一个调用方法: import_default_modules-导入模块,这里触发了信号import_modules,那么这边信号的connect过程在哪定义的呢?答案为:在celery类中定义的autodiscover_tasks方法中存在import_default_modules信号的连接,连接的函数其实最终调用的还是self.loader.autodiscover_tasks(packages, related_name),绕了一圈还是回到了loader的autodiscover_tasks方法。这里有两个地方需要注意,第一就是我们在项目中经常会调用celery类的autodiscover_tasks来加载所有注册的应用,比如我自己在项目中的配置为: # 实例化Celery app = Celery(project_name) # 使用django的settings文件配置celery app.config_from_object('django.conf:settings') # Celery加载所有注册的应用 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 最后一个配置项app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)其实就是调用连接我们的self.loader.autodiscover_tasks(packages, related_name)方法和import_modules这个信号。当worker进程起来,worker初始化时,信号import_modules触发,这个时候会开始加载模块。理解了这一点,其实第二点为什么信号连接和信号触发在不同的类中的原因也就知道了,我们配置的时候其实只会使用celery对象的接口,所以自然信号连接的方法放在celery类的接口里面了。 这里还需要理解的一个地方就是,我们这个地方加载的是什么?下面贴self.loader.autodiscover_tasks的源码: def autodiscover_tasks(self, packages, related_name='tasks'): self.task_modules.update( mod.__name__ for mod in autodiscover_tasks(packages or (), related_name) if mod) 简单解释下源码:self.task_modules一开始就是一个空的集合。autodiscover_tasks(packages or (),related_name) 就是一个模块的公共方法,目的就是 查询setting中所有应用的模块下的tasks.py,找到所有应用下的tasks.py后呢,将tasks.py导入,导入模块后,tasks.py中文件装饰器会执行。其实除了导入tasks.py 导入外,还会导入celery内置的一些模块和配置需要导入的模块。 (2)self.on_worker_init() 这个方法源码没写,pass掉 (3)注意到,之前还有个配置项叫做app.config_from_object('django.conf:settings'),这个主要是给app的属性_config_source赋值。换句话说,就是指定app的配置源。 2、on_before_init: def on_before_init(self, **kwargs): trace.setup_worker_optimizations(self.app) # this signal can be used to set up configuration for # workers by name. signals.celeryd_init.send( sender=self.hostname, instance=self, conf=self.app.conf, options=kwargs, ) check_privileges(self.app.conf.CELERY_ACCEPT_CONTENT) (1)setup_worker_optimizations:worker的配置,其实目的是给worker的app属性做配置,那么主要配置了什么呢?这个非常重要: 第一件事:app.set_current():这一步主要是配置一个当前线程的全局变量类threading.local的current_app属性。我们简单说明下threading.local类,这个类实例可用于多线程中的访问,每个线程可以设置类的属性,比如在当前线程A设置了 全局local类变量的current_app属性为app,其他线程B线程无法访问线程A的全局类变量的current_app属性,但是可以设置线程B的current_app属性。这里还要注意下,我们celery中的全局local变量类名字叫做_tls放在模块celery._state中; 第二件事:set_default_app:在模块celery._state中,设置一个全局变量default_app=app; 第三件事:app.finalize():这个函数非常重要,因为里面牵涉到任务的注册 def finalize(self, auto=False): with self._finalize_mutex: if not self.finalized: if auto and not self.autofinalize: raise RuntimeError('Contract breach: app not finalized') self.finalized = True #任务注册标志位,代表app对应的任务已经注册了,这边用了一个线程锁作为上下文管理器,防止多线程对标志位进行修改 _announce_app_finalized(self) #执行回调函数,回调函数什么时候注册呢,这块需要我们好好分析下 pending = self._pending while pending: maybe_evaluate(pending.popleft()) for task in values(self._tasks): task.bind(self) 解析:_announce_app_finalized(self): 我们在定义celery任务的时候,用了一个装饰器函数:@app.task(name='celery_task'),在导入带有装饰器的模块的时候,装饰器会执行,比如: @app.task(name='celery_task') def fun(): pass 此时第一步,由于是带参数的装饰器,所以实际的装饰器是_create_task_cls函数对象 然后@语法糖的作用: fun = _create_task_cls(fun) //即fun函数已经变成了一个task对象,且celery的app对象已经将包含fun的task对象注册到app中 现在我们来探究这个装饰器函数做了什么?下面就是app.task装饰器函数: def task(self, *args, **opts): """Creates new task class from any callable.""" #通过注释我们可以看到,其实就是将我们装饰的函数包装成一个task if _EXECV and not opts.get('_force_evaluate'): from . import shared_task return shared_task(*args, _force_evaluate=True, **opts) def inner_create_task_cls(shared=True, filter=None, **opts): #这块的参数就是装饰器中携带的参数,比如我们的name参数 _filt = filter # stupid 2to3 def _create_task_cls(fun): #这块的fun就是我们的装饰的函数 if shared: def cons(app): return app._task_from_fun(fun, **opts) #在不指定share参数的时候,默认会走到这 cons.__name__ = fun.__name__ connect_on_app_finalize(cons) #要注意这个地方,会将cons函数注册到集合_on_app_finalizers中,_announce_app_finalized调用的回调函数就是cons函数 调用cons函数返回源码如下: def _task_from_fun(self, fun, **options): if not self.finalized and not self.autofinalize: raise RuntimeError('Contract breach: app not finalized') base = options.pop('base', None) or self.Task #这个self.Task类默认值为celery.app.task:Task类 bind = options.pop('bind', False) T = type(fun.__name__, (base, ), dict({ #以self.Task作为基类,创建一个新的Task类 'app': self, 'accept_magic_kwargs': False, 'run': fun if bind else staticmethod(fun), '_decorated': True, '__doc__': fun.__doc__, '__module__': fun.__module__, '__wrapped__': fun}, **options))() task = self._tasks[T.name] # return global instance. #这个地方源码有问题,应该是self._tasks[T.name]=task 也就是创建的任务都将存储(注册)在self._task()里面 return task 也就是说worker启动时,所有调用@app.task装饰器装饰的函数都会用来创建一个任务类,比如TaskA(继承自celery.app.task:Task) 在项目中,我们使用TaskA.delay(fun)方法时,可以做到将fun的逻辑加入到TaskA里面,且将TaskA发布,这块逻辑比较复杂,在接下来我要 讲的AMQP协议中会讲(稍微解释下:TaskA.delay(fun)调用的是celery.app.task:Task基类的delay方法,Task基类调用的是app的 send_task方法) if self.accept_magic_kwargs: # compat mode task = self._task_from_fun(fun, **opts) if filter: task = filter(task) return task if self.finalized or opts.get('_force_evaluate'): ret = self._task_from_fun(fun, **opts) else: # return a proxy object that evaluates on first use ret = PromiseProxy(self._task_from_fun, (fun, ), opts, __doc__=fun.__doc__) self._pending.append(ret) if _filt: return _filt(ret) return ret return _create_task_cls if len(args) == 1: if callable(args[0]): return inner_create_task_cls(**opts)(*args) raise TypeError('argument 1 to @task() must be a callable') if args: raise TypeError( '@task() takes exactly 1 argument ({0} given)'.format( sum([len(args), len(opts)]))) return inner_create_task_cls(**opts) ********************************************************************************************************************** 扩展: 关于AMQP和关键点send_task,这个和worker启动无关,但是也非常重要。 一个关键的点:AMQP协议(高级消息队列协议,进程间传递异步消息的网络协议),celery其实实现了AMQP协议。 AMQP定义了几个角色:发布者(Publisher):发布者发布消息(Message)给交换机(Exchange)。 消息队列(Queue):交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。 消费者(Consumer):AMQP 代理(公司项目中使用的redis)会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。 通道(channel):有些应用需要与 AMQP 代理建立多个连接。无论怎样,同时开启多个 TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。在涉及多线程 / 进程的应用中,为每个线程 / 进程开启一个通道(channel)是很常见的,并且这些通道不能被线程 / 进程共享。一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。 接下来我们分析send_task方法: def send_task(self, name, args=None, kwargs=None, countdown=None, eta=None, task_id=None, producer=None, connection=None, router=None, result_cls=None, expires=None, publisher=None, link=None, link_error=None, add_to_parent=True, reply_to=None, **options): task_id = task_id or uuid() producer = producer or publisher # XXX compat router = router or self.amqp.router conf = self.conf if conf.CELERY_ALWAYS_EAGER: # pragma: no cover warnings.warn(AlwaysEagerIgnored( 'CELERY_ALWAYS_EAGER has no effect on send_task', ), stacklevel=2) options = router.route(options, name, args, kwargs) if connection: producer = self.amqp.TaskProducer(connection) with self.producer_or_acquire(producer) as P: #这段代码是核心,self.producer_or_acquire(producer)对象为amqp.py模块中的TaskProducer类,也是发布者(Publisher)这个角色 self.backend.on_task_call(P, task_id) task_id = P.publish_task( #发布任务为关键的点,接下来重点分析,对于redis来讲,这里面的router-key决定了发布的队列 name, args, kwargs, countdown=countdown, eta=eta, task_id=task_id, expires=expires, callbacks=maybe_list(link), errbacks=maybe_list(link_error), reply_to=reply_to or self.oid, **options ) result = (result_cls or self.AsyncResult)(task_id) if add_to_parent: parent = get_current_worker_task() if parent: parent.add_trail(result) return result task_id = P.publish_task,忽略掉非关键步骤,分为以下小步骤: (1)信号触发:signals.before_task_publish.send触发,看官方文档,指的是任务发布前的派发,发送者是被发送的任务的名称。 (2)任务发布:self.publish(self指的是producer对象),发送消息给交换机。具体过程为:producer建立通道连接,然后组装message,组装的消息是符合AMQP协议的,消息队列可以使用redis(其实只需要使用redis的list类型作为简单的Queue,并没有消息订阅功能),另外我们通过在redis中查看发布的消息结构如下: {"task": "myapp.tasks.celery_test", "id": "633df5d5-95dc-4f57-87ec-e9bc22bcafc3", "args": [], "kwargs": {}, "retries": 0, "eta": null, "expires": null, "utc": true, "callbacks": null, "errbacks": null, "timelimit": [null, null], "taskset": null, "chord": null} 参数各个含义为: task:当前task在你项目中的路径 id:task id args:执行当前task所需要的位置参数 kwargs:执行当前task所需要的命名参数 retries:任务失败后重试次数 eta、expires、callbacks等参数可以参考官方文档 另外,还有一点非常重要:这里redis中已经发布的task消息和worker启动时注册的task类不是同一个东西,属于两个不同过程的产物,两者没有依赖关系(关键易混淆点) (3) 信号触发:signals.after_task_publish.send信号触发,任务发布后的信号触发,任务在被发送到中间后触发,注意这个是在发送任务的进程中执行 *************************************************************************************************************** 好了,言归正传,setup_worker_optimizations的第三件事,也就是任务创建和注册已经分析完了。 (2)signals.celeryd_init.send(sender=self.hostname, instance=self,conf=self.app.conf, options=kwargs,) 这是工作单元启动后发送的第一个信号。sender是工作单元的主机名,所以这个信号可以用来设置工作单元的特殊配置(如果你想给多个工作单元设置配置,你连接该信号的时候可以忽略 sender 参数),比如以下代码, 做了connect操作,进行信号触发后的特殊配置: from celery.signals import celeryd_init @celeryd_init.connect(sender='[email protected]') def configure_worker12(conf=None,**kwargs): conf.task_default_rate_limit= '10/m' 提供的参数: - sender 工作单元的节点名称 - instance 这是要初始化的 celery.apps.worker.Worker 实例。注意,至今为止,只设置了 app 和 hostname 属性,并且 __init__ 函数的余下部分还没有执行。 - conf 当前应用实例的配置。 - options 从命令行传递给工作单元的选项 以上就是on_before_init的全部内容, 3. self.setup_defaults(**kwargs):这个函数也是在初始化worker类的时候调用,这块也没什么特别,在初始化worker时有些参数没有带,在这里会初始化成默认值。 4. self.on_after_init(**kwargs):设置日志输出位置,不需要太过关注 5. 接下来就是重头戏,self.setup_instance(**self.prepare_args(**kwargs)),这里不需要关注prepare_args,这个函数没有做任何操作 def setup_instance(self, queues=None, ready_callback=None, pidfile=None, include=None, use_eventloop=None, exclude_queues=None, **kwargs): self.pidfile = pidfile self.setup_queues(queues, exclude_queues) # 听名字就知道,这个是设置amqp队列的。主要逻辑是self.app.amqp.queues.select(include), self.app.amqp.queues.deselect(exclude),也就是指定相关的消费与不消费队列 self.setup_includes(str_to_list(include)) # 我们之前已经在self._tasks中存放了装饰器@app.task装饰的任务函数生成的任务类。另外在之前在解释 autodiscover函数的时候知道,这个时候已经导入了setting中各个app下的tasks.py文件,这里主要是取任务 函数所在模块和tasks.py的并集,防止有模块没有导入而报错。源码注释如下: # Update celery_include to have all known task modules, so that we # ensure all task modules are imported in case an execv happens. # Set default concurrency if not self.concurrency: #设置并发数,如果没有设置并发数,取系统cpu的核数作为并发数 try: self.concurrency = cpu_count() except NotImplementedError: self.concurrency = 2 # Options self.loglevel = mlevel(self.loglevel) self.ready_callback = ready_callback or self.on_consumer_ready #如果worker设置了回调函数,就使用设置回调函数,否则采用默认回调函数, 也就是signals.worker_ready.send(sender=consumer)触发消费者 已经准备好的信号,注意这里信号还没有触发。只是给worker添加了回调函数 # this connection is not established, only used for params self._conninfo = self.app.connection() #这里并没有建立和代理的连接,只是用于初始化参数,self.app.connection()指的是 对消息队列(MQ)连接的一个抽象类Connection,属于kumbu库的一个类。一个 Connection就对应一个消息队列 的连接,而Transport才是真实的 MQ 连接,也是真正连接到 MQ(redis/rabbitmq) 的实例,这边 再开个小专题,讲下Connection、Transport、Channel之间的关系和区别, 见Kombu源码阅读 self.use_eventloop = ( self.should_use_eventloop() if use_eventloop is None else use_eventloop ) self.options = kwargs signals.worker_init.send(sender=self) # Initialize bootsteps self.pool_cls = _concurrency.get_implementation(self.pool_cls) self.steps = [] self.on_init_blueprint() #和on_before_init做了同样的事情 self.blueprint = self.Blueprint(app=self.app, on_start=self.on_start, on_close=self.on_close, on_stopped=self.on_stopped) self.blueprint.apply(self, **kwargs) #初始化worker依赖组件--蓝图,并进行apply完成蓝图各个step的依赖关系图的构建,并进行各个组件的初始化,调用各个组件的 ``__init__`` and ``include`` 方法 以上就是worker对象的初始化过程。 初始化结束后,由keystep3可知,会调用worker对象的start方法。 *****************************************************self.app.Worker(***).start()******************************************** worker的start方法实际调用的是self.blueprint.start(self)方法,其中self对应的是worker对象。以下是self.blueprint.start(self)方法源码: def start(self, parent): #这里的parent就是worker实例 self.state = RUN if self.on_start: self.on_start() for i, step in enumerate(s for s in parent.steps if s is not None): #这块就是调用各个组建的start方法 self._debug('Starting %s', step.alias) self.started = i + 1 step.start(parent) debug('^-- substep ok') 根据worker启动的日志来看,启动的组件为: [2021-06-07 23:55:28,349: DEBUG/MainProcess] | Worker: Starting Hub #组件1 Hub [2021-06-07 23:55:28,349: DEBUG/MainProcess] ^-- substep ok [2021-06-07 23:55:28,349: DEBUG/MainProcess] | Worker: Starting Pool #组件2 Pool [2021-06-07 23:55:28,390: DEBUG/Worker-1] Using selector: KqueueSelector [2021-06-07 23:55:28,433: DEBUG/Worker-2] Using selector: KqueueSelector [2021-06-07 23:55:28,473: DEBUG/Worker-3] Using selector: KqueueSelector [2021-06-07 23:55:28,507: DEBUG/Worker-4] Using selector: KqueueSelector [2021-06-07 23:55:28,548: DEBUG/Worker-5] Using selector: KqueueSelector [2021-06-07 23:55:28,586: DEBUG/Worker-6] Using selector: KqueueSelector [2021-06-07 23:55:28,621: DEBUG/Worker-7] Using selector: KqueueSelector [2021-06-07 23:55:28,654: DEBUG/MainProcess] ^-- substep ok [2021-06-07 23:55:28,655: DEBUG/MainProcess] | Worker: Starting Consumer #组件3 Consumer [2021-06-07 23:55:28,655: DEBUG/MainProcess] | Consumer: Starting Connection [2021-06-07 23:55:28,660: DEBUG/Worker-8] Using selector: KqueueSelector [2021-06-07 23:55:28,671: INFO/MainProcess] Connected to redis://localhost:6379/0 [2021-06-07 23:55:28,672: DEBUG/MainProcess] ^-- substep ok [2021-06-07 23:55:28,672: DEBUG/MainProcess] | Consumer: Starting Events #组件4 Events [2021-06-07 23:55:28,682: DEBUG/MainProcess] ^-- substep ok [2021-06-07 23:55:28,682: DEBUG/MainProcess] | Consumer: Starting Mingle #组件5 Mingle:不同worker之间同步状态用的 [2021-06-07 23:55:28,682: INFO/MainProcess] mingle: searching for neighbors [2021-06-07 23:55:29,693: INFO/MainProcess] mingle: all alone [2021-06-07 23:55:29,693: DEBUG/MainProcess] ^-- substep ok [2021-06-07 23:55:29,694: DEBUG/MainProcess] | Consumer: Starting Gossip #组件6 Gossip:消费来自其他worker的事件 [2021-06-07 23:55:29,697: DEBUG/MainProcess] ^-- substep ok [2021-06-07 23:55:29,697: DEBUG/MainProcess] | Consumer: Starting Tasks #组件7 Tasks:启动消息Consumer [2021-06-07 23:55:29,702: DEBUG/MainProcess] ^-- substep ok [2021-06-07 23:55:29,703: DEBUG/MainProcess] | Consumer: Starting Control #组件8 Control:远程命令管理服务 [2021-06-07 23:55:29,706: DEBUG/MainProcess] ^-- substep ok [2021-06-07 23:55:29,706: DEBUG/MainProcess] | Consumer: Starting Heart #组件9 Heart:发送心跳事件(consumer的心跳) [2021-06-07 23:55:29,707: DEBUG/MainProcess] ^-- substep ok [2021-06-07 23:55:29,707: DEBUG/MainProcess] | Consumer: Starting event loop #组件10 event loop [2021-06-07 23:55:29,710: DEBUG/MainProcess] | Worker: Hub.register Pool... 接下来就是重点介绍下启动的各个重要组件的作用: 首先,我们的Hub对象,是一个事件循环对象,属于kombu库中的一个类Hub(kombu.async1.hub.Hub),而我们调用hub的start的方法时,目的是给worker对象的hub属性赋值为kombu库Hub类实例,实例 使用timer作为参数进行初始化。也就是说我们worker.componets下面所有的类对象,其实都是作为一个kombu对应对象的一个操作的封装。 再比如说我们在启动消费者这个组件的时候,实际上调用的方法是worker.componets模块下的Consumer.create方法: class Consumer(bootsteps.StartStopStep): last = True def create(self, w): if w.max_concurrency: prefetch_count = max(w.min_concurrency, 1) * w.prefetch_multiplier else: prefetch_count = w.concurrency * w.prefetch_multiplier c = w.consumer = self.instantiate( # 这里的w就是我们的worker实例对象,这里会给worker的consumer属性初始化kombu库的consumer类对象 w.consumer_cls, w.process_task, hostname=w.hostname, send_events=w.send_events, init_callback=w.ready_callback, initial_prefetch_count=prefetch_count, pool=w.pool, timer=w.timer, app=w.app, controller=w, hub=w.hub, worker_options=w.options, disable_rate_limits=w.disable_rate_limits, prefetch_multiplier=w.prefetch_multiplier, ) return c #这里return的值是worker类的consumer_cls实例,在调用Consumer的start方法的时候,会调用 worker类的consumer_cls的start方法,也就是celery.worker.consumer.Consumer类的start方法 ***************************************************************celery.worker.consumer.Consumer类的start方法************************************************************* 和worker的start方法类似,消费者组件的start方法也是启动消费者的组件,从日志打印也可以看到,消费者的组件分别为: 'celery.worker.consumer:Connection', 'celery.worker.consumer:Mingle', 'celery.worker.consumer:Events', 'celery.worker.consumer:Gossip', 'celery.worker.consumer:Heart', 'celery.worker.consumer:Control', 'celery.worker.consumer:Tasks', 'celery.worker.consumer:Evloop', 'celery.worker.consumer:Agent', 下面选取一些重要的组件进行分析: celery.worker.consumer:Evloop:事件循环 class Evloop(bootsteps.StartStopStep): label = 'event loop' last = True def start(self, c): self.patch_all(c) c.loop(*c.loop_args()) #这里的c就是我们Consumer类,c.loop是由Consumer类初始化,也就是以下代码决定的: if not hasattr(self, 'loop'): self.loop = loops.asynloop if hub else loops.synloop 注意,这里的asynloop并不是一个类,而是loops模块的一个函数,只有带hub参数,才是走异步循环asynloop函数 def patch_all(self, c): c.qos._mutex = DummyLock() 接下来我们看下loops.asynloop函数: def asynloop(obj, connection, consumer, blueprint, hub, qos, heartbeat, clock, hbrate=2.0, RUN=RUN): """Non-blocking event loop consuming messages until connection is lost, #由注释可以看出,这块其实是事件循环 or shutdown is requested.""" update_qos = qos.update hbtick = connection.heartbeat_check errors = connection.connection_errors heartbeat = connection.get_heartbeat_interval() # negotiated on_task_received = obj.create_task_handler() #这里很关键,obj这里指的是consumer对象,consumer对象返回的函数如下关键点1 if heartbeat and connection.supports_heartbeats: hub.call_repeatedly(heartbeat / hbrate, hbtick, hbrate) consumer.callbacks = [on_task_received] #这里的consumer参数是worker.consumer.Consumer类的task_consumer属性值,这个属性是在组件celery.worker.consumer:Tasks 调用start方法时赋值的,代码如下: c.task_consumer = c.app.amqp.TaskConsumer(c.connection, on_decode_error=c.on_decode_error,)注意 c.app.amqp.TaskConsumer是继承自consumer的类,所以这段代码里面的consumer也就是kombu里面的consumer类,这里就是将 kombu里面的consumer类的callbacks属性设置为on_task_received,下面会分析什么时候会调用kombu中consumer类的callback属性 里面的函数 consumer.consume() #这里调用了consumer.consume()方法,在Kombu源码分析中可以知道,此时其实只是建立了映射关系,具体的映射关系见关键点2 obj.on_ready() obj.controller.register_with_event_loop(hub) obj.register_with_event_loop(hub) # did_start_ok will verify that pool processes were able to start, # but this will only work the first time we start, as # maxtasksperchild will mess up metrics. if not obj.restart_count and not obj.pool.did_start_ok(): raise WorkerLostError('Could not start worker processes') # consumer.consume() may have prefetched up to our # limit - drain an event so we are in a clean state # prior to starting our event loop. if connection.transport.driver_type == 'amqp': hub.call_soon(connection.drain_events) #这里主要是生成事件promise类,然后放入hub的ready属性中 # FIXME: Use loop.run_forever # Tried and works, but no time to test properly before release. hub.propagate_errors = errors loop = hub.create_loop() #这里的loop是hub创建的生成器,生成器没有返回值 try: while blueprint.state == RUN and obj.connection: # shutdown if signal handlers told us to. if state.should_stop: raise WorkerShutdown() elif state.should_terminate: raise WorkerTerminate() # We only update QoS when there is no more messages to read. # This groups together qos calls, and makes sure that remote # control commands will be prioritized over task messages. if qos.prev != qos.value: update_qos() try: next(loop) #每执行一次next(loop)都会执行connection.drain_events函数,从Kombu源码可知,此时会取出消息,将消息作为参数,执行关键点1的 on_task_received这个回调函数 except StopIteration: loop = hub.create_loop() finally: try: hub.reset() except Exception as exc: error( 'Error cleaning up after event loop: %r', exc, exc_info=1, ) ****关键点1:obj.create_task_handler()**** def create_task_handler(self): strategies = self.strategies on_unknown_message = self.on_unknown_message on_unknown_task = self.on_unknown_task on_invalid_task = self.on_invalid_task callbacks = self.on_task_message def on_task_received(body, message): #获取消息后,在这里执行消费消息 headers = message.headers try: type_, is_proto2 = body['task'], 0 except (KeyError, TypeError): try: type_, is_proto2 = headers['task'], 1 #这里的type_就是当前task在你项目中的路径,比如我们自己建的项目里面的myapp.tasks.celery_test except (KeyError, TypeError): return on_unknown_message(body, message) if is_proto2: body = proto2_to_proto1( #body就是我们函数所携带的参数信息 self.app, type_, body, message, headers) try: strategies[type_](message, body, #这里的strategies是一个字典类型,也就是说我们得找到key为myapp.tasks.celery_test所对应的函数对象来消费消息, message.ack_log_error, 那么问题来了,这个strategies字典是什么写的呢?见关键点3 message.reject_log_error, callbacks) except KeyError as exc: on_unknown_task(body, message, exc) except InvalidTaskError as exc: on_invalid_task(body, message, exc) return on_task_received ****关键点2:consumer.consume() **** 由Kombu的源码解读可以知道,消费者comsumer对象的consumer()方法旨在建立映射关系,具体如下: Channel是由Transport类创建得到,每个channel包含了: class Channel(AbstractChannel, base.StdChannel): def __init__(self, connection, **kwargs): self.connection = connection self._consumers = set() self._cycle = None self._tag_to_queue = {} self._active_queues = [] ... 其中,_consumers是相关联的消费者标签集合,_active_queues是相关联的Queue列表,_tag_to_queue则是消费者标签与Queue的映射 注意,consumer对象初始化的时候有一个很重要的参数:callbacks 我们可以不传,此时callbacks默认是none,比如我们celery在初始化consumer就没有传这个参数,而是在组件consumer启动的时候给这个属性赋值,那么这个参数有什么用呢?建立映射关系: 针对消费者关联的每个队列,分别建立以下映射; (1)self.connection._callbacks[queue] = _callback 这里的connection就是我们的transport对象,所以这里是建立了队列和回调函数的映射,这里要注意的是_callback函数最终调用的其实也就是消费者consumer对象的callbacks里面的回调函数; (2)self._consumers.add(consumer_tag) 这里的self就是channel,也就是我们的轻量化连接,这里就建立了连接被哪些消费者共享关系 在建立好映射后,我们需要一个循环机制将消息取出来进行消费,也就是执行回调函数,在Kombu源码解读里面是通过Transport的drain_events方法实现的,实现逻辑为: 遍历Transport生成的所有channel,对每个channel,如果channel对应的消费者标签(self._consumers.add(consumer_tag)时添加的)存在,就遍历调用每个队列对应的回调函数(也就是在self.connection._callbacks[queue] = _callback时候写入的映射关系) **** 关键点3:写入strategies **** 写入strategies是worker.consumer.Consumer类的update_strategies写入的,在consumer组件celery.worker.consumer:Tasks启动的时候,会调用该方法: def update_strategies(self): loader = self.app.loader for name, task in items(self.app.tasks): #看到这里就很明了了,self.app.tasks属性就是我们之前注册到app的任务类了 self.strategies[name] = task.start_strategy(self.app, self) task.__trace__ = build_tracer(name, task, loader, self.hostname, app=self.app) 在task类中: def start_strategy(self, app, consumer, **kwargs): return instantiate(self.Strategy, self, app, consumer, **kwargs) def instantiate(name, *args, **kwargs): """Instantiate class by name. See :func:`symbol_by_name`. """ return symbol_by_name(name)(*args, **kwargs) 可以看到其实就是以对task函数进行调用 ***************************************************************taskpool.start():起进程执行具体消息************************************************************* 真正消费消息在taskpool里面,在启动pool类的时候,会调用worker.pool_cls(这个属性由启动worker时候的传参决定,类分别对应celery.concurrency下不同模块的TaskPool类)的on_start方法: 以celery/concurrency/prefork.py中的TaskPool类为例, 里面主要是fork出worker的子进程,并且准备好接受任务,具体细节可以参考博客:https://zhuanlan.zhihu.com/p/56416240: def on_start(self): """Run the task pool. Will pre-fork all workers so they're ready to accept tasks. """ forking_enable(self.forking_enable) Pool = (self.BlockingPool if self.options.get('threads', True) else self.Pool) P = self._pool = Pool(processes=self.limit, #重点是初始化pool对象,后面都是创建pool对象方法的代理 initializer=process_initializer, on_process_exit=process_destructor, synack=False, **self.options) # Create proxy methods self.on_apply = P.apply_async self.maintain_pool = P.maintain_pool self.terminate_job = P.terminate_job self.grow = P.grow self.shrink = P.shrink self.flush = getattr(P, 'flush', None) # FIXME add to billiard 接下来我们来看下pool的实例化,在类的初始化过程中,执行了: self._processes = self.cpu_count() if processes is None else processes #如果传入了进程数量,进程属性值为传参,否则以cpu数量值为进程属性值 for i in range(self._processes): #根据进程属性的数值,起指定个数的进程 self._create_worker_process(i) self._create_worker_process(i)代码如下: def _create_worker_process(self, i): sentinel = Event() if self.allow_restart else None inq, outq, synq = self.get_process_queues() w = self.Worker( #这里的w是模块billiard.pool.Worker中的worker类,继承子process类,也就是我们的进程类 inq, outq, synq, self._initializer, self._initargs, self._maxtasksperchild, sentinel, self._on_process_exit, # Need to handle all signals if using the ipc semaphore, # to make sure the semaphore is released. sigprotection=self.threads, ) self._pool.append(w) self._process_register_queues(w, (inq, outq, synq)) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.index = i w.start() #这里就是进程类的初始化,也就是进程的启动,这里调用的是worker父类process的start方法 self._poolctrl[w.pid] = sentinel if self.on_process_up: self.on_process_up(w) return w process类的start方法: def start(self): ''' Start child process ''' assert self._popen is None, 'cannot start a process twice' assert self._parent_pid == os.getpid(), \ 'can only start a process object created by current process' _cleanup() if self._Popen is not None: Popen = self._Popen else: from .forking import Popen self._popen = Popen(self) #这个Popen类的初始化比较重要 self._sentinel = self._popen.sentinel _current_process._children.add(self) Popen类的初始化: class Popen(object): ''' Start a subprocess to run the code of a process object #从注释可以看到,这个类的方法是创建进程的子进程,这里的process object也就是父进程worker类,这里主要就是执行 worker类的workloop方法 ''' _tls = thread._local() def __init__(self, process_obj): _Django_old_layout_hack__save() # create pipe for communication with child rfd, wfd = os.pipe() # get handle for read end of the pipe and make it inheritable rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True) os.close(rfd) # start process cmd = get_command_line() + [rhandle] cmd = ' '.join('"%s"' % x for x in cmd) hp, ht, pid, tid = _subprocess.CreateProcess( _python_exe, cmd, None, None, 1, 0, None, None, None ) close(ht) if isinstance(ht, int_types) else ht.Close() (close(rhandle) if isinstance(rhandle, int_types) else rhandle.Close()) # set attributes of self self.pid = pid self.returncode = None self._handle = hp self.sentinel = int(hp) # send information to child prep_data = get_preparation_data(process_obj._name) to_child = os.fdopen(wfd, 'wb') Popen._tls.process_handle = int(hp) try: dump(prep_data, to_child, HIGHEST_PROTOCOL) dump(process_obj, to_child, HIGHEST_PROTOCOL) finally: del Popen._tls.process_handle to_child.close() ***********************************************************worker类的workloop方法:最终执行任务的地方************************************************** def workloop(self, debug=debug, now=monotonic, pid=None): pid = pid or os.getpid() put = self.outq.put inqW_fd = self.inqW_fd synqW_fd = self.synqW_fd maxtasks = self.maxtasks prepare_result = self.prepare_result wait_for_job = self.wait_for_job _wait_for_syn = self.wait_for_syn def wait_for_syn(jid): i = 0 while 1: if i > 60: error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!', jid, self.synq._reader.fileno(), exc_info=1) req = _wait_for_syn() if req: type_, args = req if type_ == NACK: return False assert type_ == ACK return True i += 1 completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): req = wait_for_job() if req: type_, args_ = req assert type_ == TASK job, i, fun, args, kwargs = args_ put((ACK, (job, i, now(), pid, synqW_fd))) if _wait_for_syn: confirm = wait_for_syn(job) if not confirm: continue # received NACK try: result = (True, prepare_result(fun(*args, **kwargs))) except Exception: result = (False, ExceptionInfo()) try: put((READY, (job, i, result, inqW_fd))) except Exception as exc: _, _, tb = sys.exc_info() try: wrapped = MaybeEncodingError(exc, result[1]) einfo = ExceptionInfo(( MaybeEncodingError, wrapped, tb, )) put((READY, (job, i, (False, einfo), inqW_fd))) finally: del(tb) completed += 1 debug('worker exiting after %d tasks', completed) if maxtasks: return EX_RECYCLE if completed == maxtasks else EX_FAILURE return EX_OK
标签:celery,None,task,self,worker,Celery,源码,app From: https://www.cnblogs.com/kevin-zsq/p/16845362.html