[文档]defrepublish(producer,message,exchange=None,routing_key=None,remove_props=None):"""Republish message."""ifnotremove_props:remove_props=['application_headers','content_type','content_encoding','headers']body=ensure_bytes(message.body)# use raw message body.info,headers,props=(message.delivery_info,message.headers,message.properties)exchange=info['exchange']ifexchangeisNoneelseexchangerouting_key=info['routing_key']ifrouting_keyisNoneelserouting_keyctype,enc=message.content_type,message.content_encoding# remove compression header, as this will be inserted again# when the message is recompressed.compression=headers.pop('compression',None)expiration=props.pop('expiration',None)# ensure expiration is a floatexpiration=float(expiration)ifexpirationisnotNoneelseNoneforkeyinremove_props:props.pop(key,None)producer.publish(ensure_bytes(body),exchange=exchange,routing_key=routing_key,compression=compression,headers=headers,content_type=ctype,content_encoding=enc,expiration=expiration,**props)
[文档]defmigrate_task(producer,body_,message,queues=None):"""Migrate single task message."""info=message.delivery_infoqueues={}ifqueuesisNoneelsequeuesrepublish(producer,message,exchange=queues.get(info['exchange']),routing_key=queues.get(info['routing_key']))
[文档]defmigrate_tasks(source,dest,migrate=migrate_task,app=None,queues=None,**kwargs):"""Migrate tasks from one broker to another."""app=app_or_default(app)queues=prepare_queues(queues)producer=app.amqp.Producer(dest,auto_declare=False)migrate=partial(migrate,producer,queues=queues)defon_declare_queue(queue):new_queue=queue(producer.channel)new_queue.name=queues.get(queue.name,queue.name)ifnew_queue.routing_key==queue.name:new_queue.routing_key=queues.get(queue.name,new_queue.routing_key)ifnew_queue.exchange.name==queue.name:new_queue.exchange.name=queues.get(queue.name,queue.name)new_queue.declare()returnstart_filter(app,source,migrate,queues=queues,on_declare_queue=on_declare_queue,**kwargs)
[文档]defmove(predicate,connection=None,exchange=None,routing_key=None,source=None,app=None,callback=None,limit=None,transform=None,**kwargs):"""Find tasks by filtering them and move the tasks to a new queue. Arguments: predicate (Callable): Filter function used to decide the messages to move. Must accept the standard signature of ``(body, message)`` used by Kombu consumer callbacks. If the predicate wants the message to be moved it must return either: 1) a tuple of ``(exchange, routing_key)``, or 2) a :class:`~kombu.entity.Queue` instance, or 3) any other true value means the specified ``exchange`` and ``routing_key`` arguments will be used. connection (kombu.Connection): Custom connection to use. source: List[Union[str, kombu.Queue]]: Optional list of source queues to use instead of the default (queues in :setting:`task_queues`). This list can also contain :class:`~kombu.entity.Queue` instances. exchange (str, kombu.Exchange): Default destination exchange. routing_key (str): Default destination routing key. limit (int): Limit number of messages to filter. callback (Callable): Callback called after message moved, with signature ``(state, body, message)``. transform (Callable): Optional function to transform the return value (destination) of the filter function. Also supports the same keyword arguments as :func:`start_filter`. To demonstrate, the :func:`move_task_by_id` operation can be implemented like this: .. code-block:: python def is_wanted_task(body, message): if body['id'] == wanted_id: return Queue('foo', exchange=Exchange('foo'), routing_key='foo') move(is_wanted_task) or with a transform: .. code-block:: python def transform(value): if isinstance(value, str): return Queue(value, Exchange(value), value) return value move(is_wanted_task, transform=transform) Note: The predicate may also return a tuple of ``(exchange, routing_key)`` to specify the destination to where the task should be moved, or a :class:`~kombu.entity.Queue` instance. Any other true value means that the task will be moved to the default exchange/routing_key. """app=app_or_default(app)queues=[_maybe_queue(app,queue)forqueueinsourceor[]]orNonewithapp.connection_or_acquire(connection,pool=False)asconn:producer=app.amqp.Producer(conn)state=State()defon_task(body,message):ret=predicate(body,message)ifret:iftransform:ret=transform(ret)ifisinstance(ret,Queue):maybe_declare(ret,conn.default_channel)ex,rk=ret.exchange.name,ret.routing_keyelse:ex,rk=expand_dest(ret,exchange,routing_key)republish(producer,message,exchange=ex,routing_key=rk)message.ack()state.filtered+=1ifcallback:callback(state,body,message)iflimitandstate.filtered>=limit:raiseStopFiltering()returnstart_filter(app,conn,on_task,consume_from=queues,**kwargs)
[文档]deftask_id_eq(task_id,body,message):"""Return true if task id equals task_id'."""returnbody['id']==task_id
[文档]deftask_id_in(ids,body,message):"""Return true if task id is member of set ids'."""returnbody['id']inids
defprepare_queues(queues):ifisinstance(queues,str):queues=queues.split(',')ifisinstance(queues,list):queues=dict(tuple(islice(cycle(q.split(':')),None,2))forqinqueues)ifqueuesisNone:queues={}returnqueuesclassFilterer:def__init__(self,app,conn,filter,limit=None,timeout=1.0,ack_messages=False,tasks=None,queues=None,callback=None,forever=False,on_declare_queue=None,consume_from=None,state=None,accept=None,**kwargs):self.app=appself.conn=connself.filter=filterself.limit=limitself.timeout=timeoutself.ack_messages=ack_messagesself.tasks=set(str_to_list(tasks)or[])self.queues=prepare_queues(queues)self.callback=callbackself.forever=foreverself.on_declare_queue=on_declare_queueself.consume_from=[_maybe_queue(self.app,q)forqinconsume_fromorlist(self.queues)]self.state=stateorState()self.accept=acceptdefstart(self):# start migrating messages.withself.prepare_consumer(self.create_consumer()):try:for_ineventloop(self.conn,# pragma: no covertimeout=self.timeout,ignore_timeouts=self.forever):passexceptsocket.timeout:passexceptStopFiltering:passreturnself.statedefupdate_state(self,body,message):self.state.count+=1ifself.limitandself.state.count>=self.limit:raiseStopFiltering()defack_message(self,body,message):message.ack()defcreate_consumer(self):returnself.app.amqp.TaskConsumer(self.conn,queues=self.consume_from,accept=self.accept,)defprepare_consumer(self,consumer):filter=self.filterupdate_state=self.update_stateack_message=self.ack_messageifself.tasks:filter=filter_callback(filter,self.tasks)update_state=filter_callback(update_state,self.tasks)ack_message=filter_callback(ack_message,self.tasks)consumer.register_callback(filter)consumer.register_callback(update_state)ifself.ack_messages:consumer.register_callback(self.ack_message)ifself.callbackisnotNone:callback=partial(self.callback,self.state)ifself.tasks:callback=filter_callback(callback,self.tasks)consumer.register_callback(callback)self.declare_queues(consumer)returnconsumerdefdeclare_queues(self,consumer):# declare all queues on the new broker.forqueueinconsumer.queues:ifself.queuesandqueue.namenotinself.queues:continueifself.on_declare_queueisnotNone:self.on_declare_queue(queue)try:_,mcount,_=queue(consumer.channel).queue_declare(passive=True)ifmcount:self.state.total_apx+=mcountexceptself.conn.channel_errors:pass
[文档]defmove_task_by_id(task_id,dest,**kwargs):"""Find a task by id and move it to another queue. Arguments: task_id (str): Id of task to find and move. dest: (str, kombu.Queue): Destination queue. transform (Callable): Optional function to transform the return value (destination) of the filter function. **kwargs (Any): Also supports the same keyword arguments as :func:`move`. """returnmove_by_idmap({task_id:dest},**kwargs)
[文档]defmove_by_idmap(map,**kwargs):"""Move tasks by matching from a ``task_id: queue`` mapping. Where ``queue`` is a queue to move the task to. Example: >>> move_by_idmap({ ... '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue('name'), ... 'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue('name'), ... '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue('name')}, ... queues=['hipri']) """deftask_id_in_map(body,message):returnmap.get(message.properties['correlation_id'])# adding the limit means that we don't have to consume any more# when we've found everything.returnmove(task_id_in_map,limit=len(map),**kwargs)
[文档]defmove_by_taskmap(map,**kwargs):"""Move tasks by matching from a ``task_name: queue`` mapping. ``queue`` is the queue to move the task to. Example: >>> move_by_taskmap({ ... 'tasks.add': Queue('name'), ... 'tasks.mul': Queue('name'), ... }) """deftask_name_in_map(body,message):returnmap.get(body['task'])# <- name of taskreturnmove(task_name_in_map,**kwargs)