def get_extension(container, extension_cls, **match_attrs):
    """ Inspect ``container.extensions`` and return the first item that is
    an instance of ``extension_cls``.

    Optionally also require that the instance has an attribute with a
    particular value as given in the ``match_attrs`` kwargs. A candidate
    that does not have one of the requested attributes at all is treated
    as a non-match (rather than raising :class:`AttributeError`), so the
    search continues with the remaining extensions.

    Returns ``None`` if no extension satisfies the criteria.
    """
    # private sentinel so that an attribute legitimately set to ``None``
    # can still be matched with ``name=None``
    missing = object()

    def matches(candidate):
        for name, expected in match_attrs.items():
            actual = getattr(candidate, name, missing)
            if actual is missing or not actual == expected:
                return False
        return True

    for ext in container.extensions:
        if isinstance(ext, extension_cls) and matches(ext):
            return ext
    return None
def get_container(runner, service_cls):
    """ Look through ``runner.containers`` and return the first container
    whose ``service_cls`` equals the given ``service_cls``.

    Returns ``None`` when no container matches.
    """
    hosting = (
        candidate for candidate in runner.containers
        if candidate.service_cls == service_cls
    )
    return next(hosting, None)
@contextmanager
def wait_for_call(timeout, mock_method):
    """ Context manager that blocks on entry until ``mock_method`` has been
    called, then yields the mock.

    The wait is bounded by ``timeout`` seconds; an
    :class:`eventlet.Timeout` is raised if ``mock_method.called`` is still
    false when the time is up.
    """
    deadline = eventlet.Timeout(timeout)
    try:
        # cooperative busy-wait: yield to other greenthreads between checks
        while not mock_method.called:
            eventlet.sleep()
    finally:
        # the timeout only guards the wait above, not the caller's block
        deadline.cancel()
    yield mock_method
def wait_for_worker_idle(container, timeout=10):
    """ Blocks until ``container`` has no running workers.

    Deprecated: emits a :class:`DeprecationWarning` on every call. Use the
    `entrypoint_waiter` to wait for specific entrypoints instead.

    Raises an :class:`eventlet.Timeout` if the container's workers have not
    all finished within ``timeout`` seconds.
    """
    warnings.warn(
        "`wait_for_worker_idle` is deprecated. Use the `entrypoint_waiter` "
        "to wait for specific entrypoints instead.",
        DeprecationWarning,
        # attribute the warning to the caller, not to this helper
        stacklevel=2,
    )
    with eventlet.Timeout(timeout):
        container._worker_pool.waitall()
def assert_stops_raising(fn, exception_type=Exception, timeout=10,
                         interval=0.1):
    """ Assert that ``fn`` returns successfully within ``timeout`` seconds,
    retrying every ``interval`` seconds.

    Exceptions that are instances of ``exception_type`` are swallowed and
    trigger another attempt; anything else raised by ``fn`` propagates
    immediately. If ``exception_type`` is not specified, any
    :class:`Exception` instance is tolerated.

    The retry loop runs under an :class:`eventlet.Timeout` of ``timeout``
    seconds.
    """
    def succeeds():
        try:
            fn()
        except exception_type:
            return False
        return True

    with eventlet.Timeout(timeout):
        while not succeeds():
            eventlet.sleep(interval)
def reset_rabbit_connections(vhost, rabbit_manager):
    """ Delete every connection returned by ``get_rabbit_connections`` for
    ``vhost`` via ``rabbit_manager.delete_connection``.

    An :class:`HTTPError` whose response status is 404 is ignored; any
    other :class:`HTTPError` is re-raised.
    """
    for conn in get_rabbit_connections(vhost, rabbit_manager):
        conn_name = conn['name']
        try:
            rabbit_manager.delete_connection(conn_name)
        except HTTPError as exc:
            # a 404 means the connection was closed in a race
            if exc.response.status_code != 404:
                raise
class ResourcePipeline(object):
    """ Creates and destroys resources in background threads.

    Up to `size` resources are created ahead of time, so that the caller
    does not have to wait for a resource to be created lazily.
    """
def _shutdown(self):
    """ Stop the pipeline: flag it as no longer running, move every unused
    resource from the ready queue to the trash queue, then signal the
    destroy thread to stop and wait for it to exit.
    """
    self.running = False

    # grow the ready queue by one slot and yield, so that a create thread
    # blocked on putting an item can complete the put and exit
    self.ready.resize(self.size + 1)
    eventlet.sleep()

    # keep trashing unused items for as long as the queue is non-empty
    # or the create thread has not finished yet
    while self.ready.qsize() or not self.create_thread.dead:
        self.trash.put(self.ready.get())

    # lastly, tell the destroy thread to stop and wait until it has exited
    self.trash.put(ResourcePipeline.STOP)
    self.destroy_thread.wait()