diff options
| author | jason | 2017-12-14 17:07:50 -0700 |
|---|---|---|
| committer | jason | 2017-12-15 09:03:24 -0700 |
| commit | 2a1dac6a47553e6522fecd11b184c26e10cb30f4 (patch) | |
| tree | aa6b6e1a2b91ecdb71fb3e8a6de044864844ce07 | |
| parent | 900a1e4b95a19f281cd78421af8b327351eedb56 (diff) | |
| download | eventmq-2a1dac6a47553e6522fecd11b184c26e10cb30f4.tar.gz eventmq-2a1dac6a47553e6522fecd11b184c26e10cb30f4.zip | |
Fix bug in scheduler; Optimize scheduler code a bit
- Fixes a bug when cancelling schedules due to run count. The schedule
cancelled weren't the schedules that were marked for cancellation,
but the schedule that was last checked to see if it needed to be
executed. (use `job_id` from the cancellation loop instead of the job
id from the previous look `k`)
- Consolidate the job cancellation logic. The main loop and
`unschedule` both implemented the same logic. This was consolidated
into a new `cancel_job` method
- Optimize the main loop a bit. If `run_count` is 0 then don't write
to redis and cancel on the next loop. Instead just cancel the
schedule.
| -rw-r--r-- | eventmq/scheduler.py | 99 | ||||
| -rw-r--r-- | eventmq/utils/timeutils.py | 12 |
2 files changed, 60 insertions, 51 deletions
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index c1103b9..32c9b46 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -150,8 +150,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 150 | msg = v[1] | 150 | msg = v[1] |
| 151 | queue = v[3] | 151 | queue = v[3] |
| 152 | 152 | ||
| 153 | logger.debug("Time is: %s; Schedule is: %s - Running %s" | 153 | logger.debug("Time is: {}; Schedule is: {} - Running {} " |
| 154 | % (ts_now, v[0], msg)) | 154 | "({})".format(m_now, v[0], k, msg)) |
| 155 | 155 | ||
| 156 | # v[4] is the current remaining run_count | 156 | # v[4] is the current remaining run_count |
| 157 | if v[4] != INFINITE_RUN_COUNT: | 157 | if v[4] != INFINITE_RUN_COUNT: |
| @@ -161,42 +161,41 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 161 | else: | 161 | else: |
| 162 | # Decrement run_count | 162 | # Decrement run_count |
| 163 | v[4] -= 1 | 163 | v[4] -= 1 |
| 164 | # Persist the change to redis | 164 | if v[4] > 0: |
| 165 | try: | 165 | # Update the next run time |
| 166 | message = deserialize(self.redis_server.get(k)) | 166 | v[0] = next(v[2]) |
| 167 | new_headers = [] | 167 | # Persist the change to redis if there are run |
| 168 | for header in message[1].split(','): | 168 | # counts still left |
| 169 | if 'run_count:' in header: | 169 | try: |
| 170 | new_headers.append( | 170 | message = deserialize( |
| 171 | 'run_count:{}'.format(v[4])) | 171 | self.redis_server.get(k)) |
| 172 | else: | 172 | new_headers = [] |
| 173 | new_headers.append(header) | 173 | for header in message[1].split(','): |
| 174 | message[1] = ",".join(new_headers) | 174 | if 'run_count:' in header: |
| 175 | self.redis_server.set(k, serialize(message)) | 175 | new_headers.append( |
| 176 | except Exception as e: | 176 | 'run_count:{}'.format(v[4])) |
| 177 | logger.warning( | 177 | else: |
| 178 | 'Unable to update key in redis ' | 178 | new_headers.append(header) |
| 179 | 'server: {}'.format(e)) | 179 | message[1] = ",".join(new_headers) |
| 180 | # Perform the request since run_count still > 0 | 180 | self.redis_server.set( |
| 181 | k, serialize(message)) | ||
| 182 | except Exception as e: | ||
| 183 | logger.warning( | ||
| 184 | 'Unable to update key in redis ' | ||
| 185 | 'server: {}'.format(e)) | ||
| 186 | else: | ||
| 187 | cancel_jobs.append(k) | ||
| 188 | |||
| 189 | # Perform the job since run_count was still > 0 | ||
| 181 | self.send_request(msg, queue=queue) | 190 | self.send_request(msg, queue=queue) |
| 182 | v[0] = next(v[2]) | ||
| 183 | else: | 191 | else: |
| 184 | # Scheduled job is in running infinitely | 192 | # Scheduled job is in running infinitely |
| 185 | # Send job and update next schedule time | 193 | # Send job and update next schedule time |
| 186 | self.send_request(msg, queue=queue) | 194 | self.send_request(msg, queue=queue) |
| 187 | v[0] = next(v[2]) | 195 | v[0] = next(v[2]) |
| 188 | 196 | ||
| 189 | for job in cancel_jobs: | 197 | for job_id in cancel_jobs: |
| 190 | try: | 198 | self.cancel_job(job_id) |
| 191 | logger.debug('Cancelling job due to run_count: {}' | ||
| 192 | .format(k)) | ||
| 193 | self.redis_server.delete(k) | ||
| 194 | self.redis_server.lrem('interval_jobs', 0, k) | ||
| 195 | except Exception as e: | ||
| 196 | logger.warning( | ||
| 197 | 'Unable to update key in redis ' | ||
| 198 | 'server: {}'.format(e)) | ||
| 199 | del self.interval_jobs[k] | ||
| 200 | 199 | ||
| 201 | if not self.maybe_send_heartbeat(events): | 200 | if not self.maybe_send_heartbeat(events): |
| 202 | break | 201 | break |
| @@ -253,24 +252,30 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 253 | """ | 252 | """ |
| 254 | logger.info("Received new UNSCHEDULE request: {}".format(message)) | 253 | logger.info("Received new UNSCHEDULE request: {}".format(message)) |
| 255 | 254 | ||
| 255 | schedule_hash = self.schedule_hash(message) | ||
| 256 | # TODO: Notify router whether or not this succeeds | 256 | # TODO: Notify router whether or not this succeeds |
| 257 | self.unschedule_job(message) | 257 | self.cancel_job(schedule_hash) |
| 258 | 258 | ||
| 259 | def unschedule_job(self, message): | 259 | def cancel_job(self, schedule_hash): |
| 260 | """ | 260 | """ |
| 261 | Unschedules a job if it exists based on the message used to generate it | 261 | Cancels a job if it exists |
| 262 | """ | ||
| 263 | schedule_hash = self.schedule_hash(message) | ||
| 264 | 262 | ||
| 263 | Args: | ||
| 264 | schedule_hash (str): The schedule's unique hash. See | ||
| 265 | :meth:`Scheduler.schedule_hash` | ||
| 266 | """ | ||
| 265 | if schedule_hash in self.interval_jobs: | 267 | if schedule_hash in self.interval_jobs: |
| 266 | # Remove scheduled job | 268 | |
| 267 | self.interval_jobs.pop(schedule_hash) | 269 | # If the hash wasn't found in either `cron_jobs` or `interval_jobs` |
| 268 | elif schedule_hash in self.cron_jobs: | 270 | # then it's safe to assume it's already deleted. |
| 269 | # Remove scheduled job | 271 | try: |
| 270 | self.cron_jobs.pop(schedule_hash) | 272 | del self.interval_jobs[schedule_hash] |
| 271 | else: | 273 | except KeyError: |
| 272 | logger.warning("Couldn't find matching schedule for unschedule " + | 274 | pass |
| 273 | "request") | 275 | try: |
| 276 | del self.cron_jobs[schedule_hash] | ||
| 277 | except KeyError: | ||
| 278 | pass | ||
| 274 | 279 | ||
| 275 | # Double check the redis server even if we didn't find the hash | 280 | # Double check the redis server even if we didn't find the hash |
| 276 | # in memory | 281 | # in memory |
| @@ -279,10 +284,10 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 279 | self.redis_server.delete(schedule_hash) | 284 | self.redis_server.delete(schedule_hash) |
| 280 | self.redis_server.lrem('interval_jobs', 0, schedule_hash) | 285 | self.redis_server.lrem('interval_jobs', 0, schedule_hash) |
| 281 | self.redis_server.save() | 286 | self.redis_server.save() |
| 282 | except redis.ConnectionError: | 287 | except redis.ConnectionError as e: |
| 283 | logger.warning('Could not contact redis server') | 288 | logger.warning('Could not contact redis server: {}'.format(e)) |
| 284 | except Exception as e: | 289 | except Exception as e: |
| 285 | logger.warning(str(e)) | 290 | logger.warning(str(e), exc_info=True) |
| 286 | 291 | ||
| 287 | def load_job_from_redis(self, message): | 292 | def load_job_from_redis(self, message): |
| 288 | """ | 293 | """ |
diff --git a/eventmq/utils/timeutils.py b/eventmq/utils/timeutils.py index b33e18d..12cf7cc 100644 --- a/eventmq/utils/timeutils.py +++ b/eventmq/utils/timeutils.py | |||
| @@ -63,19 +63,23 @@ class IntervalIter(object): | |||
| 63 | next(interval) # 300 | 63 | next(interval) # 300 |
| 64 | next(interval) # 600 | 64 | next(interval) # 600 |
| 65 | """ | 65 | """ |
| 66 | def __init__(self, start_value, interval_secs): | 66 | def __init__(self, start_value, interval_secs, iterate_immediately=False): |
| 67 | """ | 67 | """ |
| 68 | Args: | 68 | Args: |
| 69 | start_value (numeric) - the timestamp to begin with. usually gotten | 69 | start_value (numeric): the timestamp to begin with. usually gotten |
| 70 | via :func:`monotonic` or :func:`timestamp` | 70 | via :func:`monotonic` or :func:`timestamp` |
| 71 | interval_secs (int) - the number of seconds between intervals | 71 | interval_secs (int): the number of seconds between intervals |
| 72 | iterate_immediately (bool): When this is ``True`` the value of this | ||
| 73 | iterator will increase ``start_value`` by ``interval_secs`` | ||
| 74 | immediately. *Default is False* | ||
| 72 | """ | 75 | """ |
| 73 | self.current = start_value | 76 | self.current = start_value |
| 74 | self.interval_secs = interval_secs | 77 | self.interval_secs = interval_secs |
| 75 | 78 | ||
| 76 | # iterate the first time so the first call to .next() is interval_secs | 79 | # iterate the first time so the first call to .next() is interval_secs |
| 77 | # + start_value | 80 | # + start_value |
| 78 | self.__next__() | 81 | if iterate_immediately: |
| 82 | self.__next__() | ||
| 79 | 83 | ||
| 80 | def __iter__(self): | 84 | def __iter__(self): |
| 81 | return self | 85 | return self |