aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsideshowdave72016-01-15 09:45:16 -0700
committersideshowdave72016-01-15 09:45:16 -0700
commiteaaf2b2daf20f560f351cd18cdfd0621e65d18cc (patch)
tree3cedb9f0256ed0f39590ca8e61a0601f76c97657
parent52db7abf98e9cb406e05b31347a1db8f3c1157cc (diff)
downloadeventmq-eaaf2b2daf20f560f351cd18cdfd0621e65d18cc.tar.gz
eventmq-eaaf2b2daf20f560f351cd18cdfd0621e65d18cc.zip
MAN-38 MAN-40 Scheduler deletion fixes from jason's review
-rw-r--r--eventmq/scheduler.py52
-rw-r--r--requirements.txt2
2 files changed, 25 insertions, 29 deletions
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 0ee6da6..0743414 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -64,6 +64,9 @@ class Scheduler(HeartbeatMixin, EMQPService):
64 self.cron_jobs = [] 64 self.cron_jobs = []
65 65
66 # contains dict of 4-item lists representing jobs based on an interval 66 # contains dict of 4-item lists representing jobs based on an interval
67 # key of this dictionary is a hash of company_id, path, and callable
68 # from the message of the SCHEDULE command received
69 # values of this list follow this format:
67 # IDX Descriptions 70 # IDX Descriptions
68 # 0 = the next (monotonic) ts that this job should be executed in 71 # 0 = the next (monotonic) ts that this job should be executed in
69 # 1 = the function to be executed 72 # 1 = the function to be executed
@@ -179,15 +182,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
179 """ 182 """
180 logger.info("Received new UNSCHEDULE request: {}".format(message)) 183 logger.info("Received new UNSCHEDULE request: {}".format(message))
181 184
182 # Items to use for uniquely identifying this scheduled job 185 schedule_hash = self.schedule_hash(message)
183 # TODO: Pass company_id in a more rigid place
184 schedule_hash_items = {'company_id': message[2]['args'][0],
185 'path': message[2]['path'],
186 'callable': message[2]['callable']}
187
188 # Hash the sorted, immutable set of items in our identifying dict
189 schedule_hash = str(hash(tuple(frozenset(sorted(
190 schedule_hash_items.items())))))
191 186
192 if schedule_hash in self.interval_jobs: 187 if schedule_hash in self.interval_jobs:
193 # Remove scheduled job 188 # Remove scheduled job
@@ -202,7 +197,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
202 if (self.redis_server): 197 if (self.redis_server):
203 if (self.redis_server.get(schedule_hash)): 198 if (self.redis_server.get(schedule_hash)):
204 self.redis_server.delete(schedule_hash) 199 self.redis_server.delete(schedule_hash)
205 self.redis_server.set('interval_jobs', self.interval_jobs.keys()) 200 self.redis_server.set('interval_jobs',
201 self.interval_jobs.keys())
206 self.redis_server.save() 202 self.redis_server.save()
207 203
208 def on_schedule(self, msgid, message): 204 def on_schedule(self, msgid, message):
@@ -213,24 +209,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
213 queue = message[0] 209 queue = message[0]
214 interval = int(message[1]) 210 interval = int(message[1])
215 inter_iter = IntervalIter(monotonic(), interval) 211 inter_iter = IntervalIter(monotonic(), interval)
216 212 schedule_hash = self.schedule_hash(message)
217 # Items to use for uniquely identifying this scheduled job
218 # TODO: Pass company_id in a more rigid place
219 schedule_hash_items = {'company_id': message[2]['args'][0],
220 'path': message[2]['path'],
221 'callable': message[2]['callable']}
222
223 # Hash the sorted, immutable set of items in our identifying dict
224 schedule_hash = str(hash(tuple(frozenset(sorted(
225 schedule_hash_items.items())))))
226
227 # Notify if this is updating existing, or new
228 if (schedule_hash in self.interval_jobs):
229 logger.debug('Update existing scheduled job with %s'
230 % schedule_hash)
231 else:
232 logger.debug('Creating a new scheduled job with %s'
233 % schedule_hash)
234 213
235 self.interval_jobs[schedule_hash] = [ 214 self.interval_jobs[schedule_hash] = [
236 next(inter_iter), 215 next(inter_iter),
@@ -252,6 +231,23 @@ class Scheduler(HeartbeatMixin, EMQPService):
252 Noop command. The logic for heartbeating is in the event loop. 231 Noop command. The logic for heartbeating is in the event loop.
253 """ 232 """
254 233
234 def schedule_hash(self, message):
235 """
236 Create a unique identifier for this message for storing
237 and referencing later
238 """
239 # Items to use for uniquely identifying this scheduled job
240 # TODO: Pass company_id in a more rigid place
241 schedule_hash_items = {'company_id': message[2]['args'][0],
242 'path': message[2]['path'],
243 'callable': message[2]['callable']}
244
245 # Hash the sorted, immutable set of items in our identifying dict
246 schedule_hash = str(hash(tuple(frozenset(sorted(
247 schedule_hash_items.items())))))
248
249 return schedule_hash
250
255 def scheduler_main(self): 251 def scheduler_main(self):
256 """ 252 """
257 Kick off scheduler with logging and settings import 253 Kick off scheduler with logging and settings import
diff --git a/requirements.txt b/requirements.txt
index 52b5cf7..01a8e09 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,7 +2,7 @@ pyzmq==14.6.0
2six==1.10.0 2six==1.10.0
3monotonic==0.4 # A clock who's time is not changed. used for scheduling 3monotonic==0.4 # A clock who's time is not changed. used for scheduling
4croniter==0.3.10 4croniter==0.3.10
5redis==2.10.5 5redis==2.10.3
6 6
7# Documentation 7# Documentation
8sphinxcontrib-napoleon==0.4.3 8sphinxcontrib-napoleon==0.4.3