celery.contrib.migrate
Message migration tools (Broker <-> Broker).
- class celery.contrib.migrate.State
- Migration progress state.
  - count = 0
  - filtered = 0
  - property strtotal
  - total_apx = 0
 
- celery.contrib.migrate.migrate_task(producer, body_, message, queues=None)
- Migrate a single task message.
- celery.contrib.migrate.migrate_tasks(source, dest, migrate=<function migrate_task>, app=None, queues=None, **kwargs)
- Migrate tasks from one broker to another.
- celery.contrib.migrate.move(predicate, connection=None, exchange=None, routing_key=None, source=None, app=None, callback=None, limit=None, transform=None, **kwargs)
- Find tasks by filtering them and move the tasks to a new queue.
- Parameters:
  - predicate (Callable) -- Filter function used to decide which messages to move. Must accept the standard signature of (body, message) used by Kombu consumer callbacks. If the predicate wants the message to be moved, it must return either:
    - a tuple of (exchange, routing_key), or
    - a Queue instance, or
    - any other true value, which means the specified exchange and routing_key arguments will be used.
  - connection (kombu.Connection) -- Custom connection to use.
  - source (List[Union[str, kombu.Queue]]) -- Optional list of source queues to use instead of the default (queues in task_queues). This list can also contain Queue instances.
  - exchange (str, kombu.Exchange) -- Default destination exchange.
  - routing_key (str) -- Default destination routing key.
  - limit (int) -- Limit number of messages to filter.
  - callback (Callable) -- Callback called after a message is moved, with signature (state, body, message).
  - transform (Callable) -- Optional function to transform the return value (destination) of the filter function.

  Also supports the same keyword arguments as start_filter().

  To demonstrate, the move_task_by_id() operation can be implemented like this:

      from kombu import Exchange, Queue
      from celery.contrib.migrate import move

      def is_wanted_task(body, message):
          if body['id'] == wanted_id:
              return Queue('foo', exchange=Exchange('foo'), routing_key='foo')

      move(is_wanted_task)

  or with a transform:

      def transform(value):
          if isinstance(value, str):
              return Queue(value, Exchange(value), value)
          return value

      move(is_wanted_task, transform=transform)

  Note: The predicate may also return a tuple of (exchange, routing_key) to specify the destination to which the task should be moved, or a Queue instance. Any other true value means that the task will be moved to the default exchange/routing_key.
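The predicate and callback signatures described for move() can be tried without a broker. The sketch below is illustrative only: make_predicate, make_progress_callback and move_wanted are hypothetical helper names, the 'foo' exchange and routing key are placeholders, and the predicate assumes the message body is a mapping that carries the task id under 'id', as in the example above (depending on the message protocol in use, the id may live elsewhere, for instance in the message headers or properties).

```python
from types import SimpleNamespace


def make_predicate(wanted_ids, exchange, routing_key):
    """Build a filter with the (body, message) signature that move() expects."""
    wanted = set(wanted_ids)

    def predicate(body, message):
        # Assumption: body is a mapping with the task id under 'id'.
        if body.get('id') in wanted:
            # A tuple of (exchange, routing_key) names the destination.
            return (exchange, routing_key)
        return None  # falsy result: the message is not moved

    return predicate


def make_progress_callback(log):
    """Build a callback with the (state, body, message) signature."""

    def on_moved(state, body, message):
        # count and strtotal are attributes of the migration State object.
        log.append(f"moved {body.get('id')} ({state.count}/{state.strtotal})")

    return on_moved


def move_wanted(app, wanted_ids):
    """Hypothetical wrapper: needs Celery installed and a reachable broker."""
    # Imported lazily so the helpers above can be exercised without Celery.
    from celery.contrib.migrate import move

    log = []
    move(make_predicate(wanted_ids, 'foo', 'foo'), app=app,
         callback=make_progress_callback(log))
    return log


# Dry run with stand-in objects instead of real Kombu messages.
predicate = make_predicate(['a1'], 'foo', 'foo')
log = []
on_moved = make_progress_callback(log)
fake_state = SimpleNamespace(count=1, strtotal='?')
on_moved(fake_state, {'id': 'a1'}, None)
```

Returning a tuple keeps the predicate free of Kombu imports; returning a Queue instance, as in the example above, is equally valid.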
- celery.contrib.migrate.move_by_idmap(map, **kwargs)
- Move tasks by matching from a task_id: queue mapping, where queue is the queue to move the task to.

  Example:

      >>> move_by_idmap({
      ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'),
      ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'),
      ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')},
      ...     queues=['hipri'])
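When many task ids share one destination, the task_id: queue mapping can be built programmatically rather than written out by hand. This is a minimal sketch: build_idmap and move_ids_to_queue are hypothetical helpers, and the wrapper assumes Celery and Kombu are installed with a reachable broker and a default app.

```python
def build_idmap(task_ids, destination):
    """Map every task id to the same destination queue."""
    return {task_id: destination for task_id in task_ids}


def move_ids_to_queue(task_ids, queue_name, source_queues):
    """Hypothetical wrapper around move_by_idmap(); needs a live broker."""
    # Imported lazily so build_idmap() can be tried without Celery.
    from kombu import Queue
    from celery.contrib.migrate import move_by_idmap

    idmap = build_idmap(task_ids, Queue(queue_name))
    # queues=... mirrors the keyword used in the example above.
    return move_by_idmap(idmap, queues=source_queues)


# Dry run with a placeholder string standing in for a Queue instance.
example_map = build_idmap(['id-1', 'id-2'], 'placeholder-queue')
```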
- celery.contrib.migrate.move_by_taskmap(map, **kwargs)
- Move tasks by matching from a task_name: queue mapping, where queue is the queue to move the task to.

  Example:

      >>> move_by_taskmap({
      ...     'tasks.add': Queue('name'),
      ...     'tasks.mul': Queue('name'),
      ... })
- celery.contrib.migrate.move_direct(predicate, connection=None, exchange=None, routing_key=None, source=None, app=None, callback=None, limit=None, *, transform=<function worker_direct>, **kwargs)
- Find tasks by filtering them and move the tasks to a new queue.
- Parameters:
  - predicate (Callable) -- Filter function used to decide which messages to move. Must accept the standard signature of (body, message) used by Kombu consumer callbacks. If the predicate wants the message to be moved, it must return either:
    - a tuple of (exchange, routing_key), or
    - a Queue instance, or
    - any other true value, which means the specified exchange and routing_key arguments will be used.
  - connection (kombu.Connection) -- Custom connection to use.
  - source (List[Union[str, kombu.Queue]]) -- Optional list of source queues to use instead of the default (queues in task_queues). This list can also contain Queue instances.
  - exchange (str, kombu.Exchange) -- Default destination exchange.
  - routing_key (str) -- Default destination routing key.
  - limit (int) -- Limit number of messages to filter.
  - callback (Callable) -- Callback called after a message is moved, with signature (state, body, message).
  - transform (Callable) -- Optional function to transform the return value (destination) of the filter function.

  Also supports the same keyword arguments as start_filter().

  To demonstrate, the move_task_by_id() operation can be implemented like this:

      def is_wanted_task(body, message):
          if body['id'] == wanted_id:
              return Queue('foo', exchange=Exchange('foo'), routing_key='foo')

      move(is_wanted_task)

  or with a transform:

      def transform(value):
          if isinstance(value, str):
              return Queue(value, Exchange(value), value)
          return value

      move(is_wanted_task, transform=transform)

  Note: The predicate may also return a tuple of (exchange, routing_key) to specify the destination to which the task should be moved, or a Queue instance. Any other true value means that the task will be moved to the default exchange/routing_key.
- celery.contrib.migrate.move_direct_by_id(task_id, dest, **kwargs)
- Find a task by id and move it to another queue.
- celery.contrib.migrate.move_task_by_id(task_id, dest, **kwargs)
- Find a task by id and move it to another queue.
- celery.contrib.migrate.republish(producer, message, exchange=None, routing_key=None, remove_props=None)
- Republish message.
- celery.contrib.migrate.start_filter(app, conn, filter, limit=None, timeout=1.0, ack_messages=False, tasks=None, queues=None, callback=None, forever=False, on_declare_queue=None, consume_from=None, state=None, accept=None, **kwargs)
- Filter tasks.