diff options
| -rw-r--r-- | eventmq/scheduler.py | 99 | ||||
| -rw-r--r-- | eventmq/utils/timeutils.py | 12 |
2 files changed, 60 insertions, 51 deletions
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index c1103b9..32c9b46 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -150,8 +150,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 150 | msg = v[1] | 150 | msg = v[1] |
| 151 | queue = v[3] | 151 | queue = v[3] |
| 152 | 152 | ||
| 153 | logger.debug("Time is: %s; Schedule is: %s - Running %s" | 153 | logger.debug("Time is: {}; Schedule is: {} - Running {} " |
| 154 | % (ts_now, v[0], msg)) | 154 | "({})".format(m_now, v[0], k, msg)) |
| 155 | 155 | ||
| 156 | # v[4] is the current remaining run_count | 156 | # v[4] is the current remaining run_count |
| 157 | if v[4] != INFINITE_RUN_COUNT: | 157 | if v[4] != INFINITE_RUN_COUNT: |
| @@ -161,42 +161,41 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 161 | else: | 161 | else: |
| 162 | # Decrement run_count | 162 | # Decrement run_count |
| 163 | v[4] -= 1 | 163 | v[4] -= 1 |
| 164 | # Persist the change to redis | 164 | if v[4] > 0: |
| 165 | try: | 165 | # Update the next run time |
| 166 | message = deserialize(self.redis_server.get(k)) | 166 | v[0] = next(v[2]) |
| 167 | new_headers = [] | 167 | # Persist the change to redis if there are run |
| 168 | for header in message[1].split(','): | 168 | # counts still left |
| 169 | if 'run_count:' in header: | 169 | try: |
| 170 | new_headers.append( | 170 | message = deserialize( |
| 171 | 'run_count:{}'.format(v[4])) | 171 | self.redis_server.get(k)) |
| 172 | else: | 172 | new_headers = [] |
| 173 | new_headers.append(header) | 173 | for header in message[1].split(','): |
| 174 | message[1] = ",".join(new_headers) | 174 | if 'run_count:' in header: |
| 175 | self.redis_server.set(k, serialize(message)) | 175 | new_headers.append( |
| 176 | except Exception as e: | 176 | 'run_count:{}'.format(v[4])) |
| 177 | logger.warning( | 177 | else: |
| 178 | 'Unable to update key in redis ' | 178 | new_headers.append(header) |
| 179 | 'server: {}'.format(e)) | 179 | message[1] = ",".join(new_headers) |
| 180 | # Perform the request since run_count still > 0 | 180 | self.redis_server.set( |
| 181 | k, serialize(message)) | ||
| 182 | except Exception as e: | ||
| 183 | logger.warning( | ||
| 184 | 'Unable to update key in redis ' | ||
| 185 | 'server: {}'.format(e)) | ||
| 186 | else: | ||
| 187 | cancel_jobs.append(k) | ||
| 188 | |||
| 189 | # Perform the job since run_count was still > 0 | ||
| 181 | self.send_request(msg, queue=queue) | 190 | self.send_request(msg, queue=queue) |
| 182 | v[0] = next(v[2]) | ||
| 183 | else: | 191 | else: |
| 184 | # Scheduled job is in running infinitely | 192 | # Scheduled job is in running infinitely |
| 185 | # Send job and update next schedule time | 193 | # Send job and update next schedule time |
| 186 | self.send_request(msg, queue=queue) | 194 | self.send_request(msg, queue=queue) |
| 187 | v[0] = next(v[2]) | 195 | v[0] = next(v[2]) |
| 188 | 196 | ||
| 189 | for job in cancel_jobs: | 197 | for job_id in cancel_jobs: |
| 190 | try: | 198 | self.cancel_job(job_id) |
| 191 | logger.debug('Cancelling job due to run_count: {}' | ||
| 192 | .format(k)) | ||
| 193 | self.redis_server.delete(k) | ||
| 194 | self.redis_server.lrem('interval_jobs', 0, k) | ||
| 195 | except Exception as e: | ||
| 196 | logger.warning( | ||
| 197 | 'Unable to update key in redis ' | ||
| 198 | 'server: {}'.format(e)) | ||
| 199 | del self.interval_jobs[k] | ||
| 200 | 199 | ||
| 201 | if not self.maybe_send_heartbeat(events): | 200 | if not self.maybe_send_heartbeat(events): |
| 202 | break | 201 | break |
| @@ -253,24 +252,30 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 253 | """ | 252 | """ |
| 254 | logger.info("Received new UNSCHEDULE request: {}".format(message)) | 253 | logger.info("Received new UNSCHEDULE request: {}".format(message)) |
| 255 | 254 | ||
| 255 | schedule_hash = self.schedule_hash(message) | ||
| 256 | # TODO: Notify router whether or not this succeeds | 256 | # TODO: Notify router whether or not this succeeds |
| 257 | self.unschedule_job(message) | 257 | self.cancel_job(schedule_hash) |
| 258 | 258 | ||
| 259 | def unschedule_job(self, message): | 259 | def cancel_job(self, schedule_hash): |
| 260 | """ | 260 | """ |
| 261 | Unschedules a job if it exists based on the message used to generate it | 261 | Cancels a job if it exists |
| 262 | """ | ||
| 263 | schedule_hash = self.schedule_hash(message) | ||
| 264 | 262 | ||
| 263 | Args: | ||
| 264 | schedule_hash (str): The schedule's unique hash. See | ||
| 265 | :meth:`Scheduler.schedule_hash` | ||
| 266 | """ | ||
| 265 | if schedule_hash in self.interval_jobs: | 267 | if schedule_hash in self.interval_jobs: |
| 266 | # Remove scheduled job | 268 | |
| 267 | self.interval_jobs.pop(schedule_hash) | 269 | # If the hash wasn't found in either `cron_jobs` or `interval_jobs` |
| 268 | elif schedule_hash in self.cron_jobs: | 270 | # then it's safe to assume it's already deleted. |
| 269 | # Remove scheduled job | 271 | try: |
| 270 | self.cron_jobs.pop(schedule_hash) | 272 | del self.interval_jobs[schedule_hash] |
| 271 | else: | 273 | except KeyError: |
| 272 | logger.warning("Couldn't find matching schedule for unschedule " + | 274 | pass |
| 273 | "request") | 275 | try: |
| 276 | del self.cron_jobs[schedule_hash] | ||
| 277 | except KeyError: | ||
| 278 | pass | ||
| 274 | 279 | ||
| 275 | # Double check the redis server even if we didn't find the hash | 280 | # Double check the redis server even if we didn't find the hash |
| 276 | # in memory | 281 | # in memory |
| @@ -279,10 +284,10 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 279 | self.redis_server.delete(schedule_hash) | 284 | self.redis_server.delete(schedule_hash) |
| 280 | self.redis_server.lrem('interval_jobs', 0, schedule_hash) | 285 | self.redis_server.lrem('interval_jobs', 0, schedule_hash) |
| 281 | self.redis_server.save() | 286 | self.redis_server.save() |
| 282 | except redis.ConnectionError: | 287 | except redis.ConnectionError as e: |
| 283 | logger.warning('Could not contact redis server') | 288 | logger.warning('Could not contact redis server: {}'.format(e)) |
| 284 | except Exception as e: | 289 | except Exception as e: |
| 285 | logger.warning(str(e)) | 290 | logger.warning(str(e), exc_info=True) |
| 286 | 291 | ||
| 287 | def load_job_from_redis(self, message): | 292 | def load_job_from_redis(self, message): |
| 288 | """ | 293 | """ |
diff --git a/eventmq/utils/timeutils.py b/eventmq/utils/timeutils.py index b33e18d..12cf7cc 100644 --- a/eventmq/utils/timeutils.py +++ b/eventmq/utils/timeutils.py | |||
| @@ -63,19 +63,23 @@ class IntervalIter(object): | |||
| 63 | next(interval) # 300 | 63 | next(interval) # 300 |
| 64 | next(interval) # 600 | 64 | next(interval) # 600 |
| 65 | """ | 65 | """ |
| 66 | def __init__(self, start_value, interval_secs): | 66 | def __init__(self, start_value, interval_secs, iterate_immediately=False): |
| 67 | """ | 67 | """ |
| 68 | Args: | 68 | Args: |
| 69 | start_value (numeric) - the timestamp to begin with. usually gotten | 69 | start_value (numeric): the timestamp to begin with. usually gotten |
| 70 | via :func:`monotonic` or :func:`timestamp` | 70 | via :func:`monotonic` or :func:`timestamp` |
| 71 | interval_secs (int) - the number of seconds between intervals | 71 | interval_secs (int): the number of seconds between intervals |
| 72 | iterate_immediately (bool): When this is ``True`` the value of this | ||
| 73 | iterator will increase ``start_value`` by ``interval_secs`` | ||
| 74 | immediately. *Default is False* | ||
| 72 | """ | 75 | """ |
| 73 | self.current = start_value | 76 | self.current = start_value |
| 74 | self.interval_secs = interval_secs | 77 | self.interval_secs = interval_secs |
| 75 | 78 | ||
| 76 | # iterate the first time so the first call to .next() is interval_secs | 79 | # iterate the first time so the first call to .next() is interval_secs |
| 77 | # + start_value | 80 | # + start_value |
| 78 | self.__next__() | 81 | if iterate_immediately: |
| 82 | self.__next__() | ||
| 79 | 83 | ||
| 80 | def __iter__(self): | 84 | def __iter__(self): |
| 81 | return self | 85 | return self |