跳转至

分发简单任务

在之前,我们已经建立好环境。下面测试一下环境,发送一个计算平方根的任务。

定义任务模块tasks.py。在开始,导入必须的模块。

from math import sqrt
from celery import Celery

然后,创建Celery实例,代表客户端应用:

app = Celery('tasks', broker='redis://192.168.25.21:6379/0')

在初始化时我们传入了模块的名称和broker的地址。

然后,启动result backend,如下:

app.config.CELERY_RESULT_BACKEND = 'redis://192.168.25.21:6379/0'

# 较新的版本(v5.2.7)直接填充在celery app的初始化参数中.
app = Celery('tasks', broker='redis://localhost/0', backend='redis://localhost/0')

@app.tack装饰器定义任务:

@app.task
def sqrt_task(value):
    return sqrt(value)

到此,我们完成了tasks.py模块的定义,我们需要初始化服务端的workers。我们创建了一个单独的目录叫做8397_07_broker。拷贝tasks.py模块到这个目录,运行如下命令:

$celery –A tasks worker –-loglevel=INFO

上述命令初始化了Clery Server—A代表Celery应用。下图是初始化的部分截图

$# celery -A tasks worker  --loglevel=INFO
/opt/celery_env/lib/python3.9/site-packages/celery/platforms.py:840: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(

 -------------- celery@ch1.nauu.com v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-3.10.0-957.el7.x86_64-x86_64-with-glibc2.17 2023-03-06 16:12:10
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fe5cbea9b80
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.square_root

[2023-03-06 16:12:10,866: INFO/MainProcess] Connected to redis://localhost:6379/0
[2023-03-06 16:12:10,871: INFO/MainProcess] mingle: searching for neighbors
[2023-03-06 16:12:11,897: INFO/MainProcess] mingle: all alone
[2023-03-06 16:12:11,929: INFO/MainProcess] celery@ch1.nauu.com ready.

现在,Celery Server等待接收任务并且发送给workers

下一步就是在客户端创建应用调用tasks

上述步骤不能忽略,因为下面会用在之前创建的东西。

在客户端机器,我们有celery_env虚拟环境,现在创建一个task_dispatcher.py模块很简单,如下步骤;

  1. 导入logging模块来显示程序执行信息,导入Celery模块:

    import logging
    from celery import Celery
    
  2. 下一步是创建Celery实例,和服务端一样:

    #logger configuration...
    app = Celery('tasks', broker='redis://192.168.25.21:6379/0')
    app.conf.CELERY_RESULT_BACKEND = 'redis://192.168.25.21:6397/0'
    

由于我们在接下的内容中要复用这个模块来实现任务的调用,下面我们创建一个方法来封装sqrt_task(value)的发送,我们将创建manage_sqrt_task(value)方法:

def manage_sqrt_task(value):
    result = app.send_task('tasks.sqrt_task', args=(value,))
    logging.info(result.get())

从上述代码我们发现客户端应用不需要知道服务端的实现。通过Celery类中的send_task方法,我们传入module.task格式的字符串和以元组的方式传入参数就可以调用一个任务。最后,我们看一看log中的结果。 在__main__中,我们调用了manage_sqrt_task(value)方法:

if __name__ == '__main__':
    manage_sqrt_task(4)

下面的截图是执行task_dispatcher.py文件的结果:

[2023-03-06 16:18:45,481: INFO/MainProcess] Task tasks.sqrt_task[3ecab729-f1cb-4f29-bb47-b713b2e563ed] received
[2023-03-06 16:18:45,500: INFO/ForkPoolWorker-2] Task tasks.sqrt_task[3ecab729-f1cb-4f29-bb47-b713b2e563ed] succeeded in 0.015412827953696251s: 2.0

在客户端,通过get()方法得到结果,这是通过send_task()返回的AsyncResult实例中的重要特征。结果如下图:

$# python task_dispatcher.py
2023-03-06 16:26:05,841 - 2.0

完整案例

tasks.py

from math import sqrt
from celery import Celery

app = Celery('tasks', broker='redis://localhost/0', backend='redis://localhost/0')


@app.task
def sqrt_task(value):
    return sqrt(value)

task_dispatcher.py

import logging
from celery import Celery

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(message)s')

ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)

app = Celery('tasks', broker='redis://localhost/0', backend='redis://localhost/0')

def manage_sqrt_task(value):
    result = app.send_task('tasks.sqrt_task', args=(value,))
    logger.info(result.get())


if __name__ == '__main__':
    print(manage_sqrt_task(4))

最后更新: 2023年3月6日
创建日期: 2023年3月6日