首页 > 其他分享 >【Celery】异步任务框架入门使用

【Celery】异步任务框架入门使用

时间:2024-01-29 16:24:40浏览次数:28  
标签:异步 task 入门 celery py worker Celery 任务 tasks

背景. 项目中需要用到后台数据爬取更新的功能,同步做起来web页面毫无用户体验可言。使用celery异步任务框架来解决这个问题简单、高效。用了一段时间比较稳定,现在有空梳理下文档。

简介

Celery 是一个强大的分布式任务队列系统,它允许你将工作以异步的方式排队执行,这对于执行耗时的后台作业是非常有用的。它使用分布的消息传递来处理分布式系统之间的通信,并支持多种消息中间件作为传输,比如 RabbitMQ, Redis, Amazon SQS, Kafka 等等。

Celery 泛用性很强,它不仅可以用于Web应用程序,也适用于其他各种系统。在Web开发中,Celery 常与Django这样的Web框架一起使用,用于处理后台任务,如发送电子邮件、处理大量数据、定时任务等。

Celery的基本概念:

  1. 任务(Tasks):表示异步执行的工作单元,是你定义的函数。
  2. 任务队列(Queue):任务被发到队列中,等待被Worker执行。
  3. Worker:是运行在服务器上的进程,负责执行队列中的任务。
  4. Broker:消息代理,是一个中间件,用于接收发送给它的消息并将它们转发给相应的Worker。
  5. Backend:存储任务执行结果的地方,可以是数据库、缓存系统等。

Celery的使用流程一般为:

  1. 定义任务:这通常是定义一个Python函数,并将其装饰为Celery 提供的任务。
  2. 设置Broker:选择并配置一种消息中间件,如 RabbitMQ 或 Redis。
  3. 启动Worker:运行Celery Worker进程,这些进程监听任务消息并运行它们。
  4. 调用任务:从你的应用程序中异步调用任务。
  5. 获取结果:(可选)如果任务产生结果且需要访问,可从Backend中获取。

Celery配置和管理相对简单,它提供了命令行工具和某些设施来帮助你监控和管理任务。

官方文档https://docs.celeryq.dev/en/stable/getting-started/index.html

入门使用

安装

pip install celery[redis]  # 后面的例子均以redis作为存储任务信息的中间件

创建celery实例

# 文件名称: celery_tasks.py
app = Celery("my_celery", backend="redis://172.xx.xx.x1:6379/9", broker="redis://172.xx.xx.x1:6379/8")

worker:字符串,自定义的Celery应用/实例的名称,便于后续设置和引用中标识它。

backend:结果后端(result backend)数据库信息,用于储存任务执行的状态和结果。这里指定使用Redis的9号数据库做为后端存储。如果只指定broker,任务会正常执行,但是没有地方记录执行结果。所以这时候就需要backend来记录执行结果。

broker:消息代理(message broker)数据库信息,用于发送和接收任务消息的中间件。

使用不同的数据库编号(9和8)可以保证任务的存储与消息传递相互隔离,防止潜在的键值冲突。

定义celery任务

# 文件名称: celery_tasks.py
# 只需要使用实例的task方法装饰需要异步执行的方法即可,例如
@app.task
def add(a: int, b: int):
    return a + b

启动celery实例

# 在终端中执行
celery -A celery_tasks worker --loglevel=INFO

启动命令中celery_tasks为文件名也就是实例所在的文件名称,worker表示启动·worker实例,即上述文件中的app。

执行后,控制台可以看到如下输出信息:

(test_celery) PS E:\work\test_celery> celery -A celery_tasks worker --loglevel=INFO
 
 -------------- celery@no v5.3.6 (emerald-rush)
--- ***** -----
-- ******* ---- Windows-10-10.0.19041-SP0 2024-01-28 17:29:52
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         my_celery:0x4bd2250
- ** ---------- .> transport:   redis://172.xx.xx.x1:6379/8
- ** ---------- .> results:     redis://172.xx.xx.x1:6379/9
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery_tasks.add

[2024-01-28 17:29:52,666: WARNING/MainProcess] E:\virtualenv\test_celery\lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: Th
e broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-01-28 17:29:52,682: INFO/MainProcess] Connected to redis://172.xx.xx.x1:6379/8
[2024-01-28 17:29:52,683: WARNING/MainProcess] E:\virtualenv\test_celery\lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: Th
e broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-01-28 17:29:52,690: INFO/MainProcess] mingle: searching for neighbors
[2024-01-28 17:29:53,032: INFO/SpawnPoolWorker-1] child process 15508 calling self.run()
[2024-01-28 17:29:53,048: INFO/SpawnPoolWorker-4] child process 18784 calling self.run()
[2024-01-28 17:29:53,051: INFO/SpawnPoolWorker-2] child process 23484 calling self.run()
[2024-01-28 17:29:53,053: INFO/SpawnPoolWorker-3] child process 19992 calling self.run()
[2024-01-28 17:29:53,054: INFO/SpawnPoolWorker-7] child process 25344 calling self.run()
[2024-01-28 17:29:53,064: INFO/SpawnPoolWorker-6] child process 4208 calling self.run()
[2024-01-28 17:29:53,064: INFO/SpawnPoolWorker-5] child process 21420 calling self.run()
[2024-01-28 17:29:53,072: INFO/SpawnPoolWorker-8] child process 22316 calling self.run()
[2024-01-28 17:29:53,726: INFO/MainProcess] mingle: all alone
[2024-01-28 17:29:53,774: INFO/MainProcess] celery@no ready.


可以看到执行机信息(操作系统、启动时间)、配置信息(app名称等)、已注册到worker的任务信息(add)、

、worker启动的8个子进程、当前worker的只有一个实例。

多文件定义任务

c_wk.py引入app后通过app.task定义任务:

# 文件名称: c_wk.py
from celery_tasks import app


@app.task
def sum(a: int, b: int):
    return a + b

这时候如果通过celery_tasks启动实例,只能发现一个任务,例如:

# 执行 celery -A celery_tasks worker --loglevel=INFO

[tasks]
  . celery_tasks.add

但是通过c_wk启动就能看到两个任务:

# 执行 celery -A c_wk worker --loglevel=INFO

[tasks]
  . c_wk.sum
  . celery_tasks.add

调用任务

在use_c_task.py中引入异步方法后通过delay传参调用即可。

# 文件名称:use_c_task.py

from celery_tasks import add

if __name__ == '__main__':
    add.delay(333, 222)

运行use_c_task.py后可以在启动的celery_tasks日志中看到任务接收和执行信息。

[2024-01-28 18:03:04,274: INFO/MainProcess] Task celery_tasks.add[211efed1-a126-46f5-b622-4bd93bd904b7] received
[2024-01-28 18:03:04,277: INFO/MainProcess] Task celery_tasks.add[620e1b0e-4969-4ace-943f-1b41192b2764] received

很奇怪,只看到接收了任务,但是没有执行?

原因是当前测试的worker是在windows上执行的,由于 Celery 4.x 及其以上版本在 Windows 上的兼容性问题,以及 Windows 不支持进程的fork模式。所以无法执行。此时,我们可以在执行命令后加-P solo 或者 --pool=solo使用单线程的方式启动worker。

-P或者--pool参数用于指定任务执行的并发方法(即并发池的实现方式),Celery支持几种类型的执行池,这些执行池允许你在运行时根据你的需求调整并发策略。

solo是一种执行池的类型,具有如下特点

  • solo模式会在一个单线程的进程中顺序执行任务。这意味着它一次只能处理一个任务。
  • 对于开发阶段或者调试任务来说,solo模式因其简单性而非常实用,它简化了调试过程,因为所有的操作都在一个线程中顺序发生,没有并发相关的复杂性。
  • Celery的官方文档曾经指出,由于Windows不支持fork(), prefork这类基于fork的并发执行策略在Windows平台上可能会遇到问题。因此,在Windows上运行Celery Worker时推荐使用solo模式,以保证兼容性。

启动后会发现,看不到了前文中提到的worker启动的多个子进程。

(test_celery) PS E:\work\test_celery> celery -A celery_tasks worker --loglevel=INFO --pool=solo
[2024-01-29 11:09:52,295: INFO/MainProcess] mingle: searching for neighbors
[2024-01-29 11:09:53,339: INFO/MainProcess] mingle: all alone
[2024-01-29 11:09:53,379: INFO/MainProcess] celery@no ready.

这时再运行use_c_task.py后在启动的celery_tasks日志中查看执行信息:

[2024-01-29 11:26:36,045: INFO/MainProcess] Task celery_tasks.add[d3adf45e-393a-4548-b070-c9266ca692b4] received
[2024-01-29 11:26:36,052: INFO/MainProcess] Task celery_tasks.add[d3adf45e-393a-4548-b070-c9266ca692b4] succeeded in 0.0s: 335

获取执行结果

修改任务:

# 文件名称: celery_tasks.py
from celery import Celery
import time


app = Celery("my_celery", backend="redis://172.xx.xx.x1:6379/9", broker="redis://172.xx.xx.x1:6379/8")

@app.task
def add(a: int, b: int):
    res = a + b
    time.sleep(10)  # 增加10秒延迟
    return res

调用入口:

# 文件名称:use_c_task.py

from celery_tasks import *

if __name__ == '__main__':
    st = time.time()
    task = add.delay(333, 22)
    res = task.get()
    # 计算耗时多少秒
    print(time.time() - st)
    print(res)

执行use_c_tasks之后控制台输出结果:

10.207865476608276
355

可以看到获取到了任务执行的返回值,同时也发现这里一直在等待任务执行完成。

如果我们的任务执行耗时是不确定的,但是又不想一直等待,这时候可以通过get()传入timeout参数来控制最长的等待时间,当到达了等待时间还没有返回值,则会抛出TimeoutError错误。

修改调用入口:

# 文件名称:use_c_task.py

from celery_tasks import *

if __name__ == '__main__':
    st = time.time()
    task = add.delay(333, 22)
    res = task.get(timeout=2)  # 此处加入等待超时2秒
    # 计算耗时多少秒
    print(time.time() - st)
    print(res)

执行任务后控制台抛出异常:

E:\virtualenv\test_celery\Scripts\python.exe E:\work\test_celery\use_c_task.py 
Traceback (most recent call last):
  File "E:\virtualenv\test_celery\lib\site-packages\celery\backends\asynchronous.py", line 287, in _wait_for_pending
    for _ in self.drain_events_until(
  File "E:\virtualenv\test_celery\lib\site-packages\celery\backends\asynchronous.py", line 52, in drain_events_until
    raise socket.timeout()
socket.timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "E:\work\test_celery\use_c_task.py", line 15, in <module>
    res = task.get(timeout=2)
  File "E:\virtualenv\test_celery\lib\site-packages\celery\result.py", line 251, in get
    return self.backend.wait_for_pending(
  File "E:\virtualenv\test_celery\lib\site-packages\celery\backends\asynchronous.py", line 221, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "E:\virtualenv\test_celery\lib\site-packages\celery\backends\asynchronous.py", line 293, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

任务超时

如何在等待超时后不抛出异常,程序继续往下执行呢?

解决办法:通过获取任务执行状态来获取执行结果,此处引入ready()方法

ready:获取任务的执行状态,返回False代表未执行完成;返回True代表执行结束了

修改调用入口:

# 文件名称:use_c_task.py

from celery_tasks import *

if __name__ == '__main__':
    st = time.time()
    task = add.delay(333, 22)
    res = 0
    wait_time = 15
    for n in range(wait_time):
        status = task.ready()
        if not status:
            time.sleep(1)
            print(f'任务未完成,继续等待...{n + 1}s')
        else:
            res = task.get(timeout=1)
            break
    else:
        print(f'等待超时:{wait_time}s')
    # 计算耗时多少秒
    print(time.time() - st)
    print(res)

执行上述代码,查看执行结果日志信息:

E:\virtualenv\test_celery\Scripts\python.exe E:\work\test_celery\use_c_task.py 
任务未完成,继续等待...1s
任务未完成,继续等待...2s
任务未完成,继续等待...3s
任务未完成,继续等待...4s
任务未完成,继续等待...5s
任务未完成,继续等待...6s
任务未完成,继续等待...7s
任务未完成,继续等待...8s
任务未完成,继续等待...9s
任务未完成,继续等待...10s
10.31168532371521
355

可以看到任务未执行完成时,会继续等待,任务执行完成后才会获取任务的返回值。

如果任务超时了呢?修改等待时间:

# 文件名称:use_c_task.py

from celery_tasks import *

if __name__ == '__main__':
    st = time.time()
    task = add.delay(333, 22)
    res = 0
    wait_time = 5    # 此处修改超时时间为5秒
    for n in range(wait_time):
        status = task.ready()
        if not status:
            time.sleep(1)
            print(f'任务未完成,继续等待...{n + 1}s')
        else:
            res = task.get(timeout=1)
            break
    else:
        print(f'等待超时:{wait_time}s')
    # 计算耗时多少秒
    print(time.time() -  st)
    print(res)

执行结果日志:

E:\virtualenv\test_celery\Scripts\python.exe E:\work\test_celery\use_c_task.py 
任务未完成,继续等待...1s
任务未完成,继续等待...2s
任务未完成,继续等待...3s
任务未完成,继续等待...4s
任务未完成,继续等待...5s
等待超时:5s
5.25053858757019
0

通过ready()获取了任务的执行状态,动态获取执行结果,是可行的。

任务异常

如果由于传参问题或者定时任务本身的问题导致任务异常终止了,如何处理?

解决办法:通过get传入propagate获取执行的异常信息

我们先来看下没有传入此参数时程序传入了错误的参数,worker和任务分别有什么现象:

# 文件名称:use_c_task.py

from celery_tasks import *

if __name__ == '__main__':
    st = time.time()
    task = add.delay(333, ‘aa’)  # 修改此处第二个参数为字符串类型
    res = 0
    wait_time = 5    # 此处修改超时时间为5秒
    for n in range(wait_time):
        status = task.ready()
        if not status:
            time.sleep(1)
            print(f'任务未完成,继续等待...{n + 1}s')
        else:
            res = task.get(timeout=1)
            break
    else:
        print(f'等待超时:{wait_time}s')
    # 计算耗时多少秒
    print(time.time() -  st)
    print(res)

查看任务执行的结果:

E:\virtualenv\test_celery\Scripts\python.exe E:\work\test_celery\use_c_task.py 
Traceback (most recent call last):
  File "E:\work\test_celery\use_c_task.py", line 23, in <module>
    res = task.get(timeout=1)
  File "E:\virtualenv\test_celery\lib\site-packages\celery\result.py", line 247, in get
    self.maybe_throw(callback=callback)
  File "E:\virtualenv\test_celery\lib\site-packages\celery\result.py", line 365, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "E:\virtualenv\test_celery\lib\site-packages\celery\result.py", line 358, in throw
    self.on_ready.throw(*args, **kwargs)
  File "E:\virtualenv\test_celery\lib\site-packages\vine\promises.py", line 235, in throw
    reraise(type(exc), exc, tb)
  File "E:\virtualenv\test_celery\lib\site-packages\vine\utils.py", line 27, in reraise
    raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'

可以看到任务这里直接报错了,再看下worker的执行日志信息:

[2024-01-29 14:35:05,150: INFO/MainProcess] Task celery_tasks.add[8048ca7e-8af3-4cbf-a32c-a8c90cbfb5d8] received
[2024-01-29 14:35:05,158: ERROR/MainProcess] Task celery_tasks.add[8048ca7e-8af3-4cbf-a32c-a8c90cbfb5d8] raised unexpected: TypeError("unsupported operand type(s
) for +: 'int' and 'str'")
Traceback (most recent call last):
  File "E:\virtualenv\test_celery\lib\site-packages\celery\app\trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
  File "E:\virtualenv\test_celery\lib\site-packages\celery\app\trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
  File "E:\work\test_celery\celery_tasks.py", line 18, in add
    res = a + b
TypeError: unsupported operand type(s) for +: 'int' and 'str'

可以看到worker这里也报错了。当任务报错后程序直接停止,无法继续往下执行了,我们可以通过使用propagate来捕获异常信息。

propagate:布尔值,默认为True。当propagate为True时,如果任务抛出了异常,那么在调用.get()方法时,这个异常会被再次抛出。这意味着原始任务的异常会传播(或者说抛给)给调用它的代码。当你将propagate设置为False时,如果任务失败了(即抛出了异常),调用.get()方法将不会抛出异常,而是正常返回。这在某些场景下非常有用,比如当你想要在任务失败后执行某些特定的错误处理逻辑时。

修改调用入口:

# 文件名称:use_c_task.py

from celery_tasks import *

if __name__ == '__main__':
    st = time.time()
    task = add.delay(333, 'aa')
    res = 0
    wait_time = 15
    for n in range(wait_time):
        status = task.ready()
        if not status:
            time.sleep(1)
            print(f'任务未完成,继续等待...{n + 1}s')
        else:
            res = task.get(timeout=1, propagate=False)
            break
    else:
        print(f'等待超时:{wait_time}s')
    if task.failed():
        print(f"任务执行出现异常,异常概览:{res}")
        print(f"任务执行失败,异常没有往外抛出,异常信息为:\n{task.traceback}")
    else:
        print("任务正常执行完成")
        # 计算耗时多少秒
        print(time.time() - st)
        print(res)

查看执行日志:

E:\virtualenv\test_celery\Scripts\python.exe E:\work\test_celery\use_c_task.py 
任务执行出现异常,异常概览:unsupported operand type(s) for +: 'int' and 'str'
任务执行失败,异常没有往外抛出,异常信息为:
Traceback (most recent call last):
  File "E:\virtualenv\test_celery\lib\site-packages\celery\app\trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
  File "E:\virtualenv\test_celery\lib\site-packages\celery\app\trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
  File "E:\work\test_celery\celery_tasks.py", line 18, in add
    res = a + b
TypeError: unsupported operand type(s) for +: 'int' and 'str'


进程已结束,退出代码0

可以看到当执行出现异常时,由于传参propagate=False所以异常被捕获了,没有导致任务异常退出。同时,也能看到当出现异常时,任务的执行结果为异常的概览信息,此处为unsupported operand type(s) for +: 'int' and 'str',也可以通过打印任务的堆栈task.traceback信息获取详细错误信息,此处为:

Traceback (most recent call last):
  File "E:\virtualenv\test_celery\lib\site-packages\celery\app\trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
  File "E:\virtualenv\test_celery\lib\site-packages\celery\app\trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
  File "E:\work\test_celery\celery_tasks.py", line 18, in add
    res = a + b
TypeError: unsupported operand type(s) for +: 'int' and 'str'

定时任务

我们知道调用任务时可以导入任务方法名称后通过delay传入参数即可通过brokerworker发送执行请求。

如果我们想要定时执行某个任务该如何处理呢?

在Celery中定义定时任务时,需要创建一个beat_schedule字典,这个字典定义了任务的执行计划。这个字典的键是你给任务定义的名字,值是一个字典,用于描述任务执行的调度和参数。

这里有一些常用的schedule类:

  • celery.schedules.crontab(): 使用crontab格式定义任务的执行时间,例如,每天早上5点半运行,或每星期一的下午3:15运行。
  • celery.schedules.schedule(): 定义任务的执行频率,例如,每5分钟执行一次。这个是默认的调度类。

每一项的具体配置项包括:

  • task: 要执行的任务(函数)的名称。
  • schedule: 任务执行的时间间隔或条件,使用上述的schedule类之一来定义。
  • args: (Optional) 任务需要的位置参数。
  • kwargs: (Optional) 任务需要的关键字参数。
  • options: (Optional) 传递给任务的额外参数,例如队列名。

示例

这里是一个使用Celery Beat 定时任务的示例:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'add-every-30-seconds': {  # 任务名
        'task': 'tasks.add',  # 任务函数路径
        'schedule': 30.0,  # 每30秒执行一次
        'args': (16, 16),  # 任务函数的参数
    },
    'multiply-at-midnight': {
        'task': 'tasks.multiply',
        'schedule': crontab(hour=0, minute=0),  # 每天午夜执行
        'args': (4, 4),
    },
}

注意事项

  1. 当调整了定时任务的配置后,记得重启Celery Beat服务,以便更改生效。
  2. Celery执行定时任务时依赖于其内部的定时器,因此要注意系统时间的准确性和时区的配置,特别是在分布式场景下。
  3. 定时任务创建时就会把参数被序列化(保存)为特定的值,如果我们任务的传参是datetime.now(),定时任务执行后会发现参数不会随着时间的推移而更新。如果想要实时计算,建议把参数放到定义的task中。

补充

启动workerbeat

前面说了通过命令行启动实例

# 仅启动worker时
celery -A celery_tasks worker --loglevel=INFO

windows上不支持多进程,指定执行池

celery -A celery_tasks worker --loglevel=INFO --pool=solo

如果定时任务beat和执行器worker写在同一个文件中且在windows

# 文件名称: celery_tasks.py

import time
from celery import Celery
from celery.schedules import crontab

# 定义一个Celery实例
app = Celery("my_celery", backend="redis://172.xx.xx.x1:6379/9", broker="redis://172.xx.xx.x1:6379/8")


@app.task
def add(a, b):
    res = a + b
    time.sleep(10)
    return res


app.conf.beat_schedule = {
    # 每分钟执行一次add任务
    'auto_run_add': {
        'task': 'celery_tasks.add',
        'schedule': 20,
        'args': (1, 2),
    },
}

启动

# 先启动worker
celery -A celery_tasks worker --loglevel=INFO --pool=solo

# 再启动beat
celery -A celery_tasks beat  --loglevel=INFO

beat启动后的日志如下:

(test_celery) PS E:\work\test_celery> celery -A celery_tasks beat --loglevel=INFO
celery beat v5.3.6 (emerald-rush) is starting.
__    -    ... __   -        _                    
LocalTime -> 2024-01-29 15:48:23                  
Configuration ->                                  
    . broker -> redis://172.xx.xx.xx1:6379/8     
    . loader -> celery.loaders.app.AppLoader      
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule                   
    . logfile -> [stderr]@%INFO                   
    . maxinterval -> 5.00 minutes (300s)          
[2024-01-29 15:48:23,845: INFO/MainProcess] beat: Starting...

如果是linux的话,可以带参数-B或者--beat直接执行,会同时启动workerbeat

celery -A celery_tasks worker -B --loglevel=INFO
# 或者 
celery -A celery_tasks worker --beat --loglevel=INFO

前面的启动是在系统中启动celery,如何像执行py模块一样动态的启动呢?

动态启动

可以通过celery.Celery.start或者celery.Celery.worker_main动态启动celery。

worker_main 是一个方便的方法,用于模拟命令行启动 worker 的行为。你可以通过传递一个参数列表(类似于命令行参数)给这个方法来启动 worker。这种方法允许程序在不离开当前 Python 运行环境的情况下启动 worker,非常适合在代码中集成 Celery worker 的启动过程。

from celery import Celery
 
app = Celery('myapp', broker='redis://localhost')

app.worker_main(['worker', '--loglevel=info'])

上述例子中,worker_main(['worker', '--loglevel=info']) 会像是在命令行中执行 celery -A myapp worker --loglevel=info 一样启动一个 worker,其中 -A myapp 是通过 Celery('myapp', ...) 指定的。

start 方法较为底层,它接受一个参数列表,通常用于启动 Celery 应用的命令行接口。这个方法可以用来启动 worker,但它也可以用来执行 Celery 支持的任何命令,比如 beatflower

from celery import Celery

app = Celery('myapp', broker='redis://localhost')

app.start(['celery', 'worker', '--loglevel=info'])

这种方式相比 worker_main 更通用,但在使用上需要更明确地指定是启动 celery 命令。start 方法的用途不限于启动 worker,因此在某些更复杂的场景下可能更适合使用。

本质区别

  • worker_main 是一个更高级别的封装,专门设计用于在代码中启动 worker。它提供了一个简化的接口来启动 worker,更接近于命令行的使用方式。

  • start 方法更通用,它可以启动任意 Celery 支持的命令。这使得它在需要动态执行不同类型的 Celery 命令时非常有用。

使用场景

  • 如果你的目标仅仅是在 Python 代码中启动一个 Celery worker,并希望这个过程尽可能简单,那么 worker_main 是一个更好的选择。

  • 如果你需要在代码中执行 Celery 的其他命令,或者对启动过程有更多的自定义需求,start 方法会是一个更灵活的选项。

标签:异步,task,入门,celery,py,worker,Celery,任务,tasks
From: https://www.cnblogs.com/wjlv/p/17994768

相关文章

  • 异步 ApllicationContext pushlishEvent @EventLisener来处理
    importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationEvent;importorg.springframework.context.event.ApplicationEventMulticaster;importorg.springframework.core.task.AsyncTaskExecutor;@Component publi......
  • 5分钟快速入门GIS(GIS基础)(GIS当中常见数据)
    5分钟快速入门GIS(GIS基础)(GIS当中常见数据):https://blog.csdn.net/xiaopeixiaojun/article/details/117574745?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522170651063316800227413988%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=170......
  • Docker 入门系列(4)- Docker 数据管理(挂载目录、挂载文件、数据卷挂载、数据卷共享、数
    Docker入门系列(4)-Docker数据管理(挂载目录、挂载文件、数据卷挂载、数据卷共享、数据卷删除、数据卷容器备份和恢复):https://blog.csdn.net/wohu1104/article/details/84962695?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522170649547416800213023691%2522%252C%252......
  • 自动化测试神器:Python之Pytest库入门使用
    (自动化测试神器:Python之Pytest库入门使用)本文写作思路和建议:1、本文从Python的Pytest库基本安装使用开始讲起,详细说明pytest如何操作,如何快速入门?2、同时说明了pytest的常用特性,如何在自动化测试中进行应用,达到应有的测试效率?3、建议:阅读本文基本可以掌握Pytest的用法,另外学......
  • 给你一颗“定心丸”——记一次由线上事故引发的Log4j2日志异步打印优化分析
    一、内容提要自知是人外有人,天外有天,相信对于Log4j2的异步日志打印早有老师或者同学已是熟稔于心,优化配置更是信手拈来,为了防止我在这里啰里八嗦的班门弄斧,我先将谜底在此公布:log4j2.asyncQueueFullPolicy=Discard&log4j2.discardThreshold=ERROR,这两个Log4j2配置在强依赖的RPC......
  • [Git]入门及其常用命令
    【版权声明】未经博主同意,谢绝转载!(请尊重原创,博主保留追究权)https://www.cnblogs.com/cnb-yuchen/p/17993832出自【进步*于辰的博客】Git的难点在于细节很多,一般很难一次性掌握。若想快速学会乃至掌握,就需要查阅博文、视频等资料,手打测试一些常用命令。参考笔记二,P79~81;笔......
  • 共性化异步任务处理方案
    背景考虑到现有业务很多依赖于MQ的方式进行,这种方式需要依赖于MQ,发送消息到mq和消费mq消息时需要了解mq消息结构进行相应处理;对于后续对同样的事件做其他处理的人如不能提前了解到已有相应消息发到了mq就得再发一次消息到mq等。图1......
  • WebAssembly入门笔记[3]:利用Table传递引用
    在《WebAssembly入门笔记[2]》中,我们介绍了如何利用Memory在作为宿主的JavaScript应用和wasm模块之间传递数据,但是Memory面向单纯二进制字节的读写在使用起来还是不太方便,此时我们会更多地用到另一个重要的对象Table。Table利用用来存储一组指定类型的对象,说得准确一点是对象的引......
  • 数学建模入门笔记(3) 插值与拟合
    插值与拟合插值和拟合的区别:拟合不要求过每一个已知点,而插值要求过每一个已知点,因而插值可以看作过每一个点的拟合。插值适用于补全缺失值,因为使用一般拟合就有可能使已知值偏移,不符合需求。据说PS用某种样条插值,放大的时候最大程度的保留连续性,因此显得不是那么模糊.数学建模......
  • 【ElasticSearch】入门-基础概念
    什么是ES?是一个高可用分布式的搜索引擎。可以用于实时存储、检索数据。底层是使用Lucene全文检索框架。基本概念存储结构:由_index_type和_id标识唯一的一个文档_index:指向一个或多个物理分片的逻辑命名空间_type:用于区分同一个集合中的不同的细分(ES6.X中只允许一个i......