From eaaf2b2daf20f560f351cd18cdfd0621e65d18cc Mon Sep 17 00:00:00 2001 From: sideshowdave7 Date: Fri, 15 Jan 2016 09:45:16 -0700 Subject: MAN-38 MAN-40 Scheduler deletion fixes from jason's review --- eventmq/scheduler.py | 52 ++++++++++++++++++++++++---------------------------- requirements.txt | 2 +- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 0ee6da6..0743414 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py @@ -64,6 +64,9 @@ class Scheduler(HeartbeatMixin, EMQPService): self.cron_jobs = [] # contains dict of 4-item lists representing jobs based on an interval + # key of this dictionary is a hash of company_id, path, and callable + # from the message of the SCHEDULE command received + # values of this list follow this format: # IDX Descriptions # 0 = the next (monotonic) ts that this job should be executed in # 1 = the function to be executed @@ -179,15 +182,7 @@ class Scheduler(HeartbeatMixin, EMQPService): """ logger.info("Received new UNSCHEDULE request: {}".format(message)) - # Items to use for uniquely identifying this scheduled job - # TODO: Pass company_id in a more rigid place - schedule_hash_items = {'company_id': message[2]['args'][0], - 'path': message[2]['path'], - 'callable': message[2]['callable']} - - # Hash the sorted, immutable set of items in our identifying dict - schedule_hash = str(hash(tuple(frozenset(sorted( - schedule_hash_items.items()))))) + schedule_hash = self.schedule_hash(message) if schedule_hash in self.interval_jobs: # Remove scheduled job @@ -202,7 +197,8 @@ class Scheduler(HeartbeatMixin, EMQPService): if (self.redis_server): if (self.redis_server.get(schedule_hash)): self.redis_server.delete(schedule_hash) - self.redis_server.set('interval_jobs', self.interval_jobs.keys()) + self.redis_server.set('interval_jobs', + self.interval_jobs.keys()) self.redis_server.save() def on_schedule(self, msgid, message): @@ -213,24 +209,7 @@ class Scheduler(HeartbeatMixin, EMQPService): queue = message[0] interval = int(message[1]) inter_iter = IntervalIter(monotonic(), interval) - - # Items to use for uniquely identifying this scheduled job - # TODO: Pass company_id in a more rigid place - schedule_hash_items = {'company_id': message[2]['args'][0], - 'path': message[2]['path'], - 'callable': message[2]['callable']} - - # Hash the sorted, immutable set of items in our identifying dict - schedule_hash = str(hash(tuple(frozenset(sorted( - schedule_hash_items.items()))))) - - # Notify if this is updating existing, or new - if (schedule_hash in self.interval_jobs): - logger.debug('Update existing scheduled job with %s' - % schedule_hash) - else: - logger.debug('Creating a new scheduled job with %s' - % schedule_hash) + schedule_hash = self.schedule_hash(message) self.interval_jobs[schedule_hash] = [ next(inter_iter), @@ -252,6 +231,23 @@ class Scheduler(HeartbeatMixin, EMQPService): Noop command. The logic for heartbeating is in the event loop. """ + def schedule_hash(self, message): + """ + Create a unique identifier for this message for storing + and referencing later + """ + # Items to use for uniquely identifying this scheduled job + # TODO: Pass company_id in a more rigid place + schedule_hash_items = {'company_id': message[2]['args'][0], + 'path': message[2]['path'], + 'callable': message[2]['callable']} + + # Hash the sorted, immutable set of items in our identifying dict + schedule_hash = str(hash(tuple(frozenset(sorted( + schedule_hash_items.items()))))) + + return schedule_hash + def scheduler_main(self): """ Kick off scheduler with logging and settings import diff --git a/requirements.txt b/requirements.txt index 52b5cf7..01a8e09 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ pyzmq==14.6.0 six==1.10.0 monotonic==0.4 # A clock who's time is not changed. used for scheduling croniter==0.3.10 -redis==2.10.5 +redis==2.10.3 # Documentation sphinxcontrib-napoleon==0.4.3 -- cgit v1.2.1