[文档]defwait_for(self,fun,# type: Callablecatch,# type: Sequence[Any]desc="thing",# type: strargs=(),# type: Tuplekwargs=None,# type: Dicterrback=None,# type: Callablemax_retries=10,# type: intinterval_start=0.1,# type: floatinterval_step=0.5,# type: floatinterval_max=5.0,# type: floatemit_warning=False,# type: bool**options# type: Any):# type: (...) -> Any"""Wait for event to happen. The `catch` argument specifies the exception that means the event has not happened yet. """kwargs={}ifnotkwargselsekwargsdefon_error(exc,intervals,retries):interval=next(intervals)ifemit_warning:self.warn(E_STILL_WAITING.format(desc,when=humanize_seconds(interval,'in',' '),exc=exc,))iferrback:errback(exc,interval,retries)returnintervalreturnself.retry_over_time(fun,catch,args=args,kwargs=kwargs,errback=on_error,max_retries=max_retries,interval_start=interval_start,interval_step=interval_step,**options)
[文档]defensure_not_for_a_while(self,fun,catch,desc='thing',max_retries=20,interval_start=0.1,interval_step=0.02,interval_max=1.0,emit_warning=False,**options):"""Make sure something does not happen (at least for a while)."""try:returnself.wait_for(fun,catch,desc=desc,max_retries=max_retries,interval_start=interval_start,interval_step=interval_step,interval_max=interval_max,emit_warning=emit_warning,)exceptcatch:passelse:raiseAssertionError(f'Should not have happened: {desc}')
[文档]defassert_accepted(self,ids,interval=0.5,desc='waiting for tasks to be accepted',**policy):returnself.assert_task_worker_state(self.is_accepted,ids,interval=interval,desc=desc,**policy)
[文档]defassert_received(self,ids,interval=0.5,desc='waiting for tasks to be received',**policy):returnself.assert_task_worker_state(self.is_received,ids,interval=interval,desc=desc,**policy)
[文档]defassert_result_tasks_in_progress_or_completed(self,async_results,interval=0.5,desc='waiting for tasks to be started or completed',**policy):returnself.assert_task_state_from_result(self.is_result_task_in_progress,async_results,interval=interval,desc=desc,**policy)
[文档]defwait_until_idle(self):control=self.app.controlwithself.app.connection()asconnection:# Try to purge the queue before we start# to attempt to avoid interference from other testswhileTrue:count=control.purge(connection=connection)ifcount==0:break# Wait until worker is idleinspect=control.inspect()inspect.connection=connectionwhileTrue:try:count=sum(len(t)fortininspect.active().values())exceptContentDisallowed:# test_security_task_done may trigger this exceptionbreakifcount==0:break
[文档]classManager(ManagerMixin):"""Test helpers for task integration tests."""def__init__(self,app,**kwargs):self.app=appself._init_manager(**kwargs)