diff options
| author | sideshowdave7 | 2016-01-15 09:45:16 -0700 |
|---|---|---|
| committer | sideshowdave7 | 2016-01-15 09:45:16 -0700 |
| commit | eaaf2b2daf20f560f351cd18cdfd0621e65d18cc (patch) | |
| tree | 3cedb9f0256ed0f39590ca8e61a0601f76c97657 | |
| parent | 52db7abf98e9cb406e05b31347a1db8f3c1157cc (diff) | |
| download | eventmq-eaaf2b2daf20f560f351cd18cdfd0621e65d18cc.tar.gz eventmq-eaaf2b2daf20f560f351cd18cdfd0621e65d18cc.zip | |
MAN-38 MAN-40 Scheduler deletion fixes from jason's review
| -rw-r--r-- | eventmq/scheduler.py | 52 | ||||
| -rw-r--r-- | requirements.txt | 2 |
2 files changed, 25 insertions, 29 deletions
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 0ee6da6..0743414 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -64,6 +64,9 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 64 | self.cron_jobs = [] | 64 | self.cron_jobs = [] |
| 65 | 65 | ||
| 66 | # contains dict of 4-item lists representing jobs based on an interval | 66 | # contains dict of 4-item lists representing jobs based on an interval |
| 67 | # key of this dictionary is a hash of company_id, path, and callable | ||
| 68 | # from the message of the SCHEDULE command received | ||
| 69 | # values of this list follow this format: | ||
| 67 | # IDX Descriptions | 70 | # IDX Descriptions |
| 68 | # 0 = the next (monotonic) ts that this job should be executed in | 71 | # 0 = the next (monotonic) ts that this job should be executed in |
| 69 | # 1 = the function to be executed | 72 | # 1 = the function to be executed |
| @@ -179,15 +182,7 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 179 | """ | 182 | """ |
| 180 | logger.info("Received new UNSCHEDULE request: {}".format(message)) | 183 | logger.info("Received new UNSCHEDULE request: {}".format(message)) |
| 181 | 184 | ||
| 182 | # Items to use for uniquely identifying this scheduled job | 185 | schedule_hash = self.schedule_hash(message) |
| 183 | # TODO: Pass company_id in a more rigid place | ||
| 184 | schedule_hash_items = {'company_id': message[2]['args'][0], | ||
| 185 | 'path': message[2]['path'], | ||
| 186 | 'callable': message[2]['callable']} | ||
| 187 | |||
| 188 | # Hash the sorted, immutable set of items in our identifying dict | ||
| 189 | schedule_hash = str(hash(tuple(frozenset(sorted( | ||
| 190 | schedule_hash_items.items()))))) | ||
| 191 | 186 | ||
| 192 | if schedule_hash in self.interval_jobs: | 187 | if schedule_hash in self.interval_jobs: |
| 193 | # Remove scheduled job | 188 | # Remove scheduled job |
| @@ -202,7 +197,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 202 | if (self.redis_server): | 197 | if (self.redis_server): |
| 203 | if (self.redis_server.get(schedule_hash)): | 198 | if (self.redis_server.get(schedule_hash)): |
| 204 | self.redis_server.delete(schedule_hash) | 199 | self.redis_server.delete(schedule_hash) |
| 205 | self.redis_server.set('interval_jobs', self.interval_jobs.keys()) | 200 | self.redis_server.set('interval_jobs', |
| 201 | self.interval_jobs.keys()) | ||
| 206 | self.redis_server.save() | 202 | self.redis_server.save() |
| 207 | 203 | ||
| 208 | def on_schedule(self, msgid, message): | 204 | def on_schedule(self, msgid, message): |
| @@ -213,24 +209,7 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 213 | queue = message[0] | 209 | queue = message[0] |
| 214 | interval = int(message[1]) | 210 | interval = int(message[1]) |
| 215 | inter_iter = IntervalIter(monotonic(), interval) | 211 | inter_iter = IntervalIter(monotonic(), interval) |
| 216 | 212 | schedule_hash = self.schedule_hash(message) | |
| 217 | # Items to use for uniquely identifying this scheduled job | ||
| 218 | # TODO: Pass company_id in a more rigid place | ||
| 219 | schedule_hash_items = {'company_id': message[2]['args'][0], | ||
| 220 | 'path': message[2]['path'], | ||
| 221 | 'callable': message[2]['callable']} | ||
| 222 | |||
| 223 | # Hash the sorted, immutable set of items in our identifying dict | ||
| 224 | schedule_hash = str(hash(tuple(frozenset(sorted( | ||
| 225 | schedule_hash_items.items()))))) | ||
| 226 | |||
| 227 | # Notify if this is updating existing, or new | ||
| 228 | if (schedule_hash in self.interval_jobs): | ||
| 229 | logger.debug('Update existing scheduled job with %s' | ||
| 230 | % schedule_hash) | ||
| 231 | else: | ||
| 232 | logger.debug('Creating a new scheduled job with %s' | ||
| 233 | % schedule_hash) | ||
| 234 | 213 | ||
| 235 | self.interval_jobs[schedule_hash] = [ | 214 | self.interval_jobs[schedule_hash] = [ |
| 236 | next(inter_iter), | 215 | next(inter_iter), |
| @@ -252,6 +231,23 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 252 | Noop command. The logic for heartbeating is in the event loop. | 231 | Noop command. The logic for heartbeating is in the event loop. |
| 253 | """ | 232 | """ |
| 254 | 233 | ||
| 234 | def schedule_hash(self, message): | ||
| 235 | """ | ||
| 236 | Create a unique identifier for this message for storing | ||
| 237 | and referencing later | ||
| 238 | """ | ||
| 239 | # Items to use for uniquely identifying this scheduled job | ||
| 240 | # TODO: Pass company_id in a more rigid place | ||
| 241 | schedule_hash_items = {'company_id': message[2]['args'][0], | ||
| 242 | 'path': message[2]['path'], | ||
| 243 | 'callable': message[2]['callable']} | ||
| 244 | |||
| 245 | # Hash the sorted, immutable set of items in our identifying dict | ||
| 246 | schedule_hash = str(hash(tuple(frozenset(sorted( | ||
| 247 | schedule_hash_items.items()))))) | ||
| 248 | |||
| 249 | return schedule_hash | ||
| 250 | |||
| 255 | def scheduler_main(self): | 251 | def scheduler_main(self): |
| 256 | """ | 252 | """ |
| 257 | Kick off scheduler with logging and settings import | 253 | Kick off scheduler with logging and settings import |
diff --git a/requirements.txt b/requirements.txt index 52b5cf7..01a8e09 100644 --- a/requirements.txt +++ b/requirements.txt | |||
| @@ -2,7 +2,7 @@ pyzmq==14.6.0 | |||
| 2 | six==1.10.0 | 2 | six==1.10.0 |
| 3 | monotonic==0.4 # A clock who's time is not changed. used for scheduling | 3 | monotonic==0.4 # A clock who's time is not changed. used for scheduling |
| 4 | croniter==0.3.10 | 4 | croniter==0.3.10 |
| 5 | redis==2.10.5 | 5 | redis==2.10.3 |
| 6 | 6 | ||
| 7 | # Documentation | 7 | # Documentation |
| 8 | sphinxcontrib-napoleon==0.4.3 | 8 | sphinxcontrib-napoleon==0.4.3 |