"""Graphical monitor of Celery events using curses."""importcursesimportsysimportthreadingfromdatetimeimportdatetimefromitertoolsimportcountfrommathimportceilfromtextwrapimportwrapfromtimeimporttimefromceleryimportVERSION_BANNER,statesfromcelery.appimportapp_or_defaultfromcelery.utils.textimportabbr,abbrtask__all__=('CursesMonitor','evtop')BORDER_SPACING=4LEFT_BORDER_OFFSET=3UUID_WIDTH=36STATE_WIDTH=8TIMESTAMP_WIDTH=8MIN_WORKER_WIDTH=15MIN_TASK_WIDTH=16# this module is considered experimental# we don't care about coverage.STATUS_SCREEN="""\events: {s.event_count} tasks:{s.task_count} workers:{w_alive}/{w_all}"""
def format_row(self, uuid, task, worker, timestamp, state):
    """Build one fixed-width table row for the task list.

    The row has five columns -- uuid, worker, task, timestamp and state --
    each left-justified to its column width and separated by one space.
    The uuid column shrinks below ``UUID_WIDTH`` when the display is too
    narrow to also fit the minimum task and worker columns; whatever
    width is left over is split between the task and worker columns.

    Arguments:
        uuid (str): Task id.
        task (str): Task name (shortened with ``abbrtask``).
        worker (str): Worker hostname.
        timestamp (str): Already formatted timestamp.
        state (str): Task state name.

    Returns:
        str: The row, cut to at most ``self.display_width`` characters.

    Side effects:
        Sets ``self.screen_width`` to the length of the returned row the
        first time it is called while ``self.screen_width`` is ``None``.
    """
    mx = self.display_width

    # include spacing: every "- 1" below reserves the one-character
    # separator that the row template puts between two columns.
    detail_width = mx - 1 - STATE_WIDTH - 1 - TIMESTAMP_WIDTH
    uuid_space = detail_width - 1 - MIN_TASK_WIDTH - 1 - MIN_WORKER_WIDTH
    uuid_width = min(uuid_space, UUID_WIDTH)

    detail_width = detail_width - uuid_width - 1
    task_width = int(ceil(detail_width / 2.0))
    worker_width = detail_width - task_width - 1

    uuid = abbr(uuid, uuid_width).ljust(uuid_width)
    worker = abbr(worker, worker_width).ljust(worker_width)
    task = abbrtask(task, task_width).ljust(task_width)
    state = abbr(state, STATE_WIDTH).ljust(STATE_WIDTH)
    timestamp = timestamp.ljust(TIMESTAMP_WIDTH)

    # The separators must match the widths reserved above; without them
    # the columns run together and the row no longer spans the display.
    row = f'{uuid} {worker} {task} {timestamp} {state} '
    if self.screen_width is None:
        self.screen_width = len(row[:mx])
    return row[:mx]
def alert(self, callback, title=None):
    """Clear the window, render a modal screen and wait for a key press.

    Arguments:
        callback (Callable): Called as ``callback(my, mx, row)`` to draw
            the body, where ``my, mx`` is the window size reported by
            ``getmaxyx()`` and ``row`` is the first row it may draw on.
        title (str): Optional heading drawn bold and underlined on row 2;
            when given, the body starts two rows further down so that a
            blank row separates it from the heading.

    Returns:
        str: The pressed key as returned by ``getkey()``, upper-cased.
    """
    self.win.erase()
    height, width = self.win.getmaxyx()

    body_row = 2
    if title:
        heading_attr = curses.A_BOLD | curses.A_UNDERLINE
        self.win.addstr(body_row, 3, title, heading_attr)
        # One row for the heading, one blank row below it.
        body_row += 2

    callback(height, width, body_row)
    self.win.addstr(
        height - 1, 0, 'Press any key to continue...', curses.A_BOLD,
    )
    self.win.refresh()

    # Keep polling until a key is actually read; any error raised while
    # reading is ignored and the read is retried.
    while True:
        try:
            return self.win.getkey().upper()
        except Exception:  # pylint: disable=broad-except
            continue
def alert_remote_control_reply(self, reply):
    """Show the replies to a remote control command in an alert screen.

    Arguments:
        reply (Sequence[Mapping]): One mapping per reply; the first
            ``(host, response)`` item of each mapping is displayed.
            ``response`` is looked up for an ``'error'`` key first and
            an ``'ok'`` key second.  A falsy ``reply`` renders a
            "no replies" notice instead.

    Returns:
        str: The key pressed to dismiss the alert (see ``alert``).
    """

    def callback(my, mx, xs):
        y = count(xs)
        if not reply:
            self.win.addstr(
                next(y), 3, 'No replies received in 1s deadline.',
                curses.A_BOLD | curses.color_pair(2),
            )
            return

        for subreply in reply:
            curline = next(y)

            # ``dict.items()`` returns a view, which is iterable but is
            # not an iterator: ``next()`` needs an explicit ``iter()``.
            host, response = next(iter(subreply.items()))
            host = f'{host}: '
            self.win.addstr(curline, 3, host, curses.A_BOLD)

            attr = curses.A_NORMAL
            text = ''
            if 'error' in response:
                text = response['error']
                attr |= curses.color_pair(2)
            elif 'ok' in response:
                text = response['ok']
                attr |= curses.color_pair(3)
            # The reply text goes right after the "host: " label.
            self.win.addstr(curline, 3 + len(host), text, attr)

    return self.alert(callback, 'Remote Control Command Replies')
def selection_info(self):
    """Show the fields of the selected task in an alert screen.

    Does nothing when no task is selected.  Otherwise the mapping
    returned by ``task.info(extra=['state'])`` is listed one field per
    entry, with ``args`` and ``kwargs`` first.  A value that fits on one
    wrapped line is drawn next to its key; a longer value is drawn on
    the following rows, indented by four spaces.  Once the bottom of the
    window is reached a ``[...]`` marker replaces the current line and
    drawing stops, so nothing is written outside the window.

    Returns:
        str: The key pressed to dismiss the alert (see ``alert``), or
        ``None`` when no task is selected.
    """
    if not self.selected_task:
        return

    def alert_callback(my, mx, xs):
        # The size is re-read from the window instead of relying on the
        # values handed in by ``alert``.
        my, mx = self.win.getmaxyx()
        y = count(xs)
        task = self.state.tasks[self.selected_task]
        info = task.info(extra=['state'])
        infoitems = [
            ('args', info.pop('args', None)),
            ('kwargs', info.pop('kwargs', None)),
        ] + list(info.items())

        for key, value in infoitems:
            if key is None:
                continue
            curline = next(y)
            if curline >= my:
                # Past the last row of the window: nothing more fits.
                return
            value = str(value)
            keys = key + ': '
            self.win.addstr(curline, 3, keys, curses.A_BOLD)
            wrapped = wrap(value, mx - 2)
            if len(wrapped) == 1:
                self.win.addstr(
                    curline, len(keys) + 3,
                    abbr(wrapped[0],
                         self.screen_width - (len(keys) + 3)))
                continue

            for subline in wrapped:
                nexty = next(y)
                if nexty >= my:
                    return
                out_of_room = nexty >= my - 1
                if out_of_room:
                    subline = ' ' * 4 + '[...]'
                self.win.addstr(
                    nexty, 3,
                    abbr(' ' * 4 + subline, self.screen_width - 4),
                    curses.A_NORMAL,
                )
                if out_of_room:
                    # The marker was the last thing that fits; every
                    # further row would lie outside the window.
                    return

    return self.alert(
        alert_callback, f'Task details for {self.selected_task}',
    )
def selection_traceback(self):
    """Show the traceback of the selected task in an alert screen.

    Beeps and returns without opening the alert when no task is selected
    or when the selected task's state is not one of
    ``states.EXCEPTION_STATES``.  The traceback is drawn one line per
    row and is cut off above the last window row (where ``alert`` puts
    its prompt), so a traceback taller than the window is truncated
    instead of being written past the bottom.

    Returns:
        str: The key pressed to dismiss the alert (see ``alert``), or
        the result of ``curses.beep()`` when nothing is shown.
    """
    if not self.selected_task:
        return curses.beep()
    task = self.state.tasks[self.selected_task]
    if task.state not in states.EXCEPTION_STATES:
        return curses.beep()

    def alert_callback(my, mx, xs):
        prompt_row = my - 1
        # A missing (falsy) traceback is rendered as a single empty line.
        lines = (task.traceback or '').split('\n')
        for row, line in zip(count(xs), lines):
            if row >= prompt_row:
                break
            self.win.addstr(row, 3, line)

    return self.alert(
        alert_callback,
        f'Task Exception Traceback for {self.selected_task}',
    )
def selection_result(self):
    """Show the result of the selected task in an alert screen.

    Does nothing when no task is selected.  The text shown is the task's
    ``result`` attribute or, when that is missing or falsy, its
    ``exception`` attribute; it is wrapped to the window width minus two
    and drawn one wrapped line per row.

    Returns:
        str: The key pressed to dismiss the alert (see ``alert``), or
        ``None`` when no task is selected.
    """
    if not self.selected_task:
        return

    def alert_callback(my, mx, xs):
        task = self.state.tasks[self.selected_task]
        result = getattr(task, 'result', None)
        if not result:
            # Fall back to the exception when there is no result.
            result = getattr(task, 'exception', None)
        text = result if result else ''
        for row, line in enumerate(wrap(text, mx - 2), xs):
            self.win.addstr(row, 3, line)

    return self.alert(
        alert_callback, f'Task Result for {self.selected_task}',
    )