[文档]classSchedulingError(Exception):"""An error occurred while scheduling a task."""
classBeatLazyFunc:"""A lazy function declared in 'beat_schedule' and called before sending to worker. Example: beat_schedule = { 'test-every-5-minutes': { 'task': 'test', 'schedule': 300, 'kwargs': { "current": BeatCallBack(datetime.datetime.now) } } } """def__init__(self,func,*args,**kwargs):self._func=funcself._func_params={"args":args,"kwargs":kwargs}def__call__(self):returnself.delay()defdelay(self):returnself._func(*self._func_params["args"],**self._func_params["kwargs"])
[文档]@total_orderingclassScheduleEntry:"""An entry in the scheduler. Arguments: name (str): see :attr:`name`. schedule (~celery.schedules.schedule): see :attr:`schedule`. args (Tuple): see :attr:`args`. kwargs (Dict): see :attr:`kwargs`. options (Dict): see :attr:`options`. last_run_at (~datetime.datetime): see :attr:`last_run_at`. total_run_count (int): see :attr:`total_run_count`. relative (bool): Is the time relative to when the server starts? """#: The task namename=None#: The schedule (:class:`~celery.schedules.schedule`)schedule=None#: Positional arguments to apply.args=None#: Keyword arguments to apply.kwargs=None#: Task execution options.options=None#: The time and date of when this task was last scheduled.last_run_at=None#: Total number of times this task has been scheduled.total_run_count=0def__init__(self,name=None,task=None,last_run_at=None,total_run_count=None,schedule=None,args=(),kwargs=None,options=None,relative=False,app=None):self.app=appself.name=nameself.task=taskself.args=argsself.kwargs=kwargsifkwargselse{}self.options=optionsifoptionselse{}self.schedule=maybe_schedule(schedule,relative,app=self.app)self.last_run_at=last_run_atorself.default_now()self.total_run_count=total_run_countor0
_default_now=default_now# compatdef_next_instance(self,last_run_at=None):"""Return new instance, with date and count fields updated."""returnself.__class__(**dict(self,last_run_at=last_run_atorself.default_now(),total_run_count=self.total_run_count+1,))__next__=next=_next_instance# for 2to3def__reduce__(self):returnself.__class__,(self.name,self.task,self.last_run_at,self.total_run_count,self.schedule,self.args,self.kwargs,self.options,)
[文档]defupdate(self,other):"""Update values from another entry. Will only update "editable" fields: ``task``, ``schedule``, ``args``, ``kwargs``, ``options``. """self.__dict__.update({'task':other.task,'schedule':other.schedule,'args':other.args,'kwargs':other.kwargs,'options':other.options,})
def__iter__(self):returniter(vars(self).items())def__repr__(self):return'<{name}: {0.name}{call}{0.schedule}'.format(self,call=reprcall(self.task,self.argsor(),self.kwargsor{}),name=type(self).__name__,)def__lt__(self,other):ifisinstance(other,ScheduleEntry):# How the object is ordered doesn't really matter, as# in the scheduler heap, the order is decided by the# preceding members of the tuple ``(time, priority, entry)``.## If all that's left to order on is the entry then it can# just as well be random.returnid(self)<id(other)returnNotImplemented
[文档]classScheduler:"""Scheduler for periodic tasks. The :program:`celery beat` program may instantiate this class multiple times for introspection purposes, but then with the ``lazy`` argument set. It's important for subclasses to be idempotent when this argument is set. Arguments: schedule (~celery.schedules.schedule): see :attr:`schedule`. max_interval (int): see :attr:`max_interval`. lazy (bool): Don't set up the schedule. """Entry=ScheduleEntry#: The schedule dict/shelve.schedule=None#: Maximum time to sleep between re-checking the schedule.max_interval=DEFAULT_MAX_INTERVAL#: How often to sync the schedule (3 minutes by default)sync_every=3*60#: How many tasks can be called before a sync is forced.sync_every_tasks=None_last_sync=None_tasks_since_sync=0logger=logger# compatdef__init__(self,app,schedule=None,max_interval=None,Producer=None,lazy=False,sync_every_tasks=None,**kwargs):self.app=appself.data=maybe_evaluate({}ifscheduleisNoneelseschedule)self.max_interval=(max_intervalorapp.conf.beat_max_loop_intervalorself.max_interval)self.Producer=Producerorapp.amqp.Producerself._heap=Noneself.old_schedulers=Noneself.sync_every_tasks=(app.conf.beat_sync_everyifsync_every_tasksisNoneelsesync_every_tasks)ifnotlazy:self.setup_schedule()
def_when(self,entry,next_time_to_run,mktime=timegm):"""Return a utc timestamp, make sure heapq in correct order."""adjust=self.adjustas_now=maybe_make_aware(entry.default_now())return(mktime(as_now.utctimetuple())+as_now.microsecond/1e6+(adjust(next_time_to_run)or0))
[文档]defpopulate_heap(self,event_t=event_t,heapify=heapq.heapify):"""Populate the heap with the data contained in the schedule."""priority=5self._heap=[]forentryinself.schedule.values():is_due,next_call_delay=entry.is_due()self._heap.append(event_t(self._when(entry,0ifis_dueelsenext_call_delay)or0,priority,entry))heapify(self._heap)
# pylint disable=redefined-outer-name
[文档]deftick(self,event_t=event_t,min=min,heappop=heapq.heappop,heappush=heapq.heappush):"""Run a tick - one iteration of the scheduler. Executes one due task per call. Returns: float: preferred delay in seconds for next call. """adjust=self.adjustmax_interval=self.max_intervalif(self._heapisNoneornotself.schedules_equal(self.old_schedulers,self.schedule)):self.old_schedulers=copy.copy(self.schedule)self.populate_heap()H=self._heapifnotH:returnmax_intervalevent=H[0]entry=event[2]is_due,next_time_to_run=self.is_due(entry)ifis_due:verify=heappop(H)ifverifyisevent:next_entry=self.reserve(entry)self.apply_entry(entry,producer=self.producer)heappush(H,event_t(self._when(next_entry,next_time_to_run),event[1],next_entry))return0else:heappush(H,verify)returnmin(verify[0],max_interval)adjusted_next_time_to_run=adjust(next_time_to_run)returnmin(adjusted_next_time_to_runifis_numeric_value(adjusted_next_time_to_run)elsemax_interval,max_interval)
[文档]defapply_async(self,entry,producer=None,advance=True,**kwargs):# Update time-stamps and run counts before we actually execute,# so we have that done if an exception is raised (doesn't schedule# forever.)entry=self.reserve(entry)ifadvanceelseentrytask=self.app.tasks.get(entry.task)try:entry_args=_evaluate_entry_args(entry.args)entry_kwargs=_evaluate_entry_kwargs(entry.kwargs)iftask:returntask.apply_async(entry_args,entry_kwargs,producer=producer,**entry.options)else:returnself.send_task(entry.task,entry_args,entry_kwargs,producer=producer,**entry.options)exceptExceptionasexc:# pylint: disable=broad-exceptreraise(SchedulingError,SchedulingError("Couldn't apply scheduled task {0.name}: {exc}".format(entry,exc=exc)),sys.exc_info()[2])finally:self._tasks_since_sync+=1ifself.should_sync():self._do_sync()
[文档]defmerge_inplace(self,b):schedule=self.scheduleA,B=set(schedule),set(b)# Remove items from disk not in the schedule anymore.forkeyinA^B:schedule.pop(key,None)# Update and add new items in the scheduleforkeyinB:entry=self.Entry(**dict(b[key],name=key,app=self.app))ifschedule.get(key):schedule[key].update(entry)else:schedule[key]=entry
def_ensure_connected(self):# callback called for each retry while the connection# can't be established.def_error_handler(exc,interval):error('beat: Connection error: %s. ''Trying again in %s seconds...',exc,interval)returnself.connection.ensure_connection(_error_handler,self.app.conf.broker_connection_max_retries)
[文档]classPersistentScheduler(Scheduler):"""Scheduler backed by :mod:`shelve` database."""persistence=shelveknown_suffixes=('','.db','.dat','.bak','.dir')_store=Nonedef__init__(self,*args,**kwargs):self.schedule_filename=kwargs.get('schedule_filename')super().__init__(*args,**kwargs)def_remove_db(self):forsuffixinself.known_suffixes:withplatforms.ignore_errno(errno.ENOENT):os.remove(self.schedule_filename+suffix)def_open_schedule(self):returnself.persistence.open(self.schedule_filename,writeback=True)def_destroy_open_corrupted_schedule(self,exc):error('Removing corrupted schedule file %r: %r',self.schedule_filename,exc,exc_info=True)self._remove_db()returnself._open_schedule()
[文档]defsetup_schedule(self):try:self._store=self._open_schedule()# In some cases there may be different errors from a storage# backend for corrupted files. Example - DBPageNotFoundError# exception from bsddb. In such case the file will be# successfully opened but the error will be raised on first key# retrieving.self._store.keys()exceptExceptionasexc:# pylint: disable=broad-exceptself._store=self._destroy_open_corrupted_schedule(exc)self._create_schedule()tz=self.app.conf.timezonestored_tz=self._store.get('tz')ifstored_tzisnotNoneandstored_tz!=tz:warning('Reset: Timezone changed from %r to %r',stored_tz,tz)self._store.clear()# Timezone changed, reset db!utc=self.app.conf.enable_utcstored_utc=self._store.get('utc_enabled')ifstored_utcisnotNoneandstored_utc!=utc:choices={True:'enabled',False:'disabled'}warning('Reset: UTC changed from %s to %s',choices[stored_utc],choices[utc])self._store.clear()# UTC setting changed, reset db!entries=self._store.setdefault('entries',{})self.merge_inplace(self.app.conf.beat_schedule)self.install_default_entries(self.schedule)self._store.update({'__version__':__version__,'tz':tz,'utc_enabled':utc,})self.sync()debug('Current schedule:\n'+'\n'.join(repr(entry)forentryinentries.values()))
def_create_schedule(self):for_in(1,2):try:self._store['entries']except(KeyError,UnicodeDecodeError,TypeError):# new schedule dbtry:self._store['entries']={}except(KeyError,UnicodeDecodeError,TypeError)+dbm.errorasexc:self._store=self._destroy_open_corrupted_schedule(exc)continueelse:if'__version__'notinself._store:warning('DB Reset: Account for new __version__ field')self._store.clear()# remove schedule at 2.2.2 upgrade.elif'tz'notinself._store:warning('DB Reset: Account for new tz field')self._store.clear()# remove schedule at 3.0.8 upgradeelif'utc_enabled'notinself._store:warning('DB Reset: Account for new utc_enabled field')self._store.clear()# remove schedule at 3.0.9 upgradebreak
[文档]defstart(self,embedded_process=False):info('beat: Starting...')debug('beat: Ticking with max interval->%s',humanize_seconds(self.scheduler.max_interval))signals.beat_init.send(sender=self)ifembedded_process:signals.beat_embedded_init.send(sender=self)platforms.set_process_title('celery beat')try:whilenotself._is_shutdown.is_set():interval=self.scheduler.tick()ifintervalandinterval>0.0:debug('beat: Waking up %s.',humanize_seconds(interval,prefix='in '))time.sleep(interval)ifself.scheduler.should_sync():self.scheduler._do_sync()except(KeyboardInterrupt,SystemExit):self._is_shutdown.set()finally:self.sync()
class_Threaded(Thread):"""Embedded task scheduler using threading."""def__init__(self,app,**kwargs):super().__init__()self.app=appself.service=Service(app,**kwargs)self.daemon=Trueself.name='Beat'defrun(self):self.app.set_current()self.service.start()defstop(self):self.service.stop(wait=True)try:ensure_multiprocessing()exceptNotImplementedError:# pragma: no cover_Process=Noneelse:class_Process(Process):def__init__(self,app,**kwargs):super().__init__()self.app=appself.service=Service(app,**kwargs)self.name='Beat'defrun(self):reset_signals(full=False)platforms.close_open_fds([sys.__stdin__,sys.__stdout__,sys.__stderr__,]+list(iter_open_logger_fds()))self.app.set_default()self.app.set_current()self.service.start(embedded_process=True)defstop(self):self.service.stop()self.terminate()
[文档]defEmbeddedService(app,max_interval=None,**kwargs):"""Return embedded clock service. Arguments: thread (bool): Run threaded instead of as a separate process. Uses :mod:`multiprocessing` by default, if available. """ifkwargs.pop('thread',False)or_ProcessisNone:# Need short max interval to be able to stop thread# in reasonable time.return_Threaded(app,max_interval=1,**kwargs)return_Process(app,max_interval=max_interval,**kwargs)