"""The consumers highly-optimized inner loop."""importerrnoimportsocketfromceleryimportbootstepsfromcelery.exceptionsimportWorkerLostErrorfromcelery.utils.logimportget_loggerfrom.importstate__all__=('asynloop','synloop')# pylint: disable=redefined-outer-name# We cache globals and attribute lookups, so disable this warning.logger=get_logger(__name__)def_quick_drain(connection,timeout=0.1):try:connection.drain_events(timeout=timeout)exceptExceptionasexc:# pylint: disable=broad-exceptexc_errno=getattr(exc,'errno',None)ifexc_errnoisnotNoneandexc_errno!=errno.EAGAIN:raisedef_enable_amqheartbeats(timer,connection,rate=2.0):heartbeat_error=[None]ifnotconnection:returnheartbeat_errorheartbeat=connection.get_heartbeat_interval()# negotiatedifnot(heartbeatandconnection.supports_heartbeats):returnheartbeat_errordeftick(rate):try:connection.heartbeat_check(rate)exceptExceptionase:# heartbeat_error is passed by reference can be updated# no append here list should be fixed size=1heartbeat_error[0]=etimer.call_repeatedly(heartbeat/rate,tick,(rate,))returnheartbeat_error
def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop.

    Wires the consumer into *hub*, starts consuming, and then steps the
    hub's loop generator for as long as ``blueprint.state`` equals
    ``bootsteps.RUN`` and ``obj.connection`` is truthy.

    *heartbeat* and *clock* are accepted as part of the loop signature but
    are not used by this function; *hbrate* is forwarded to the heartbeat
    scheduler as its ``rate``.

    Raises:
        WorkerLostError: on the first start (``obj.restart_count`` falsy)
            when ``obj.pool.did_start_ok()`` reports failure.
        Exception: any error captured by a scheduled heartbeat check is
            re-raised from the loop.

    ``hub.reset()`` is always attempted on the way out; a failure there is
    logged rather than propagated.
    """
    RUN = bootsteps.RUN
    sync_qos = qos.update
    conn_errors = connection.connection_errors

    task_handler = obj.create_task_handler()

    hb_error = _enable_amqheartbeats(hub.timer, connection, rate=hbrate)

    consumer.on_message = task_handler
    obj.controller.register_with_event_loop(hub)
    obj.register_with_event_loop(hub)
    consumer.consume()
    obj.on_ready()

    # did_start_ok will verify that pool processes were able to start,
    # but this will only work the first time we start, as
    # maxtasksperchild will mess up metrics.
    if not obj.restart_count and not obj.pool.did_start_ok():
        raise WorkerLostError('Could not start worker processes')

    # consumer.consume() may have prefetched up to our
    # limit - drain an event so we're in a clean state
    # prior to starting our event loop.
    if connection.transport.driver_type == 'amqp':
        hub.call_soon(_quick_drain, connection)

    # FIXME: Use loop.run_forever
    # Tried and works, but no time to test properly before release.
    hub.propagate_errors = conn_errors
    stepper = hub.create_loop()

    try:
        while blueprint.state == RUN and obj.connection:
            state.maybe_shutdown()

            failure = hb_error[0]
            if failure is not None:
                raise failure

            # We only update QoS when there's no more messages to read.
            # This groups together qos calls, and makes sure that remote
            # control commands will be prioritized over task messages.
            if qos.prev != qos.value:
                sync_qos()

            try:
                next(stepper)
            except StopIteration:
                # The generator finished; start a fresh one and carry on.
                stepper = hub.create_loop()
    finally:
        try:
            hub.reset()
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(
                'Error cleaning up after event loop: %r', exc)
def synloop(obj, connection, consumer, blueprint, hub, qos,
            heartbeat, clock, hbrate=2.0, **kwargs):
    """Fallback blocking event loop for transports that don't support AIO.

    Starts consuming and then repeatedly blocks in
    ``connection.drain_events(timeout=2.0)`` for as long as
    ``blueprint.state`` equals ``bootsteps.RUN`` and ``obj.connection`` is
    truthy.

    Heartbeat checks are only scheduled (on ``obj.timer``) when
    ``obj.pool.is_green`` is truthy; otherwise the heartbeat error slot
    stays ``None`` for the lifetime of the loop.

    *hub*, *heartbeat*, *clock* and any extra keyword arguments are
    accepted but not used by this function.
    """
    RUN = bootsteps.RUN
    task_handler = obj.create_task_handler()
    run_pending = obj.perform_pending_operations

    hb_error = [None]
    if getattr(obj.pool, 'is_green', False):
        hb_error = _enable_amqheartbeats(obj.timer, connection, rate=hbrate)

    consumer.on_message = task_handler
    consumer.consume()

    obj.on_ready()

    def _run_one_cycle():
        """Perform one iteration of the blocking event loop."""
        failure = hb_error[0]
        if failure is not None:
            raise failure

        if qos.prev != qos.value:
            qos.update()

        try:
            run_pending()
            connection.drain_events(timeout=2.0)
        except socket.timeout:
            # No event arrived within the timeout; just go around again.
            pass
        except OSError:
            # Only propagate while still running; once the blueprint has
            # left the RUN state the error is dropped.
            if blueprint.state == RUN:
                raise

    while blueprint.state == RUN and obj.connection:
        # The cycle lives in ``finally`` so it still runs when
        # ``state.maybe_shutdown()`` raises; that exception then
        # propagates after the cycle, unless the cycle itself raises.
        try:
            state.maybe_shutdown()
        finally:
            _run_one_cycle()