Task Cookbook

确保每次只执行一个任务

Ensuring a task is only executed one at a time

您可以通过使用锁来实现这一点。

在这个例子中,我们将使用缓存框架来设置一个对所有工作者可访问的锁。

它是一个虚构的 RSS 源导入器的一部分,名为 djangofeeds。 该任务将一个源 URL 作为唯一参数,并将该源导入到 一个名为 Feed 的 Django 模型中。我们通过设置一个缓存键 (由源 URL 的 MD5 校验和组成)来确保不可能有两个或更多的工作者 同时导入相同的源。

缓存键会在一定时间后过期,以防发生意外情况, 而意外总是会发生……

因此,您的任务运行时间不应超过超时。

备注

为了使其正确工作,您需要使用一个缓存 后端,其中 .add 操作是原子的。已知 memcached 在此目的下 能很好地工作。

import time
from celery import task
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # 锁定在10分钟后过期

@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add 如果键已存在则失败
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache 删除操作非常慢,但我们必须使用它以利用
        # 使用 add() 实现原子锁定
        if time.monotonic() < timeout_at and status:
            # 如果超过超时则不释放锁
            # 以减少释放已过期锁的机会
            # 该锁可能已被其他人拥有
            # 如果我们没有成功获取锁,也不释放锁
            cache.delete(lock_id)

@task(bind=True)
def import_feed(self, feed_url):
    # 缓存键由任务名称和源 URL 的 MD5 摘要组成。
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('正在导入源: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug(
        '源 %s 正在被另一个工作者导入', feed_url)

You can accomplish this by using a lock.

In this example we'll be using the cache framework to set a lock that's accessible for all workers.

It's part of an imaginary RSS feed importer called djangofeeds. The task takes a feed URL as a single argument, and imports that feed into a Django model called Feed. We ensure that it's not possible for two or more workers to import the same feed at the same time by setting a cache key consisting of the MD5 check-sum of the feed URL.

The cache key expires after some time in case something unexpected happens, and something always will...

For this reason your tasks run-time shouldn't exceed the timeout.

备注

In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

import time
from celery import task
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes

@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if time.monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('Importing feed: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)