aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2017-12-14 17:07:50 -0700
committerjason2017-12-15 09:03:24 -0700
commit2a1dac6a47553e6522fecd11b184c26e10cb30f4 (patch)
treeaa6b6e1a2b91ecdb71fb3e8a6de044864844ce07
parent900a1e4b95a19f281cd78421af8b327351eedb56 (diff)
downloadeventmq-2a1dac6a47553e6522fecd11b184c26e10cb30f4.tar.gz
eventmq-2a1dac6a47553e6522fecd11b184c26e10cb30f4.zip
Fix bug in scheduler; Optimize scheduler code a bit
- Fixes a bug when cancelling schedules due to run count. The schedule cancelled weren't the schedules that were marked for cancellation, but the schedule that was last checked to see if it needed to be executed. (use `job_id` from the cancellation loop instead of the job id from the previous look `k`) - Consolidate the job cancellation logic. The main loop and `unschedule` both implemented the same logic. This was consolidated into a new `cancel_job` method - Optimize the main loop a bit. If `run_count` is 0 then don't write to redis and cancel on the next loop. Instead just cancel the schedule.
-rw-r--r--eventmq/scheduler.py99
-rw-r--r--eventmq/utils/timeutils.py12
2 files changed, 60 insertions, 51 deletions
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index c1103b9..32c9b46 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -150,8 +150,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
150 msg = v[1] 150 msg = v[1]
151 queue = v[3] 151 queue = v[3]
152 152
153 logger.debug("Time is: %s; Schedule is: %s - Running %s" 153 logger.debug("Time is: {}; Schedule is: {} - Running {} "
154 % (ts_now, v[0], msg)) 154 "({})".format(m_now, v[0], k, msg))
155 155
156 # v[4] is the current remaining run_count 156 # v[4] is the current remaining run_count
157 if v[4] != INFINITE_RUN_COUNT: 157 if v[4] != INFINITE_RUN_COUNT:
@@ -161,42 +161,41 @@ class Scheduler(HeartbeatMixin, EMQPService):
161 else: 161 else:
162 # Decrement run_count 162 # Decrement run_count
163 v[4] -= 1 163 v[4] -= 1
164 # Persist the change to redis 164 if v[4] > 0:
165 try: 165 # Update the next run time
166 message = deserialize(self.redis_server.get(k)) 166 v[0] = next(v[2])
167 new_headers = [] 167 # Persist the change to redis if there are run
168 for header in message[1].split(','): 168 # counts still left
169 if 'run_count:' in header: 169 try:
170 new_headers.append( 170 message = deserialize(
171 'run_count:{}'.format(v[4])) 171 self.redis_server.get(k))
172 else: 172 new_headers = []
173 new_headers.append(header) 173 for header in message[1].split(','):
174 message[1] = ",".join(new_headers) 174 if 'run_count:' in header:
175 self.redis_server.set(k, serialize(message)) 175 new_headers.append(
176 except Exception as e: 176 'run_count:{}'.format(v[4]))
177 logger.warning( 177 else:
178 'Unable to update key in redis ' 178 new_headers.append(header)
179 'server: {}'.format(e)) 179 message[1] = ",".join(new_headers)
180 # Perform the request since run_count still > 0 180 self.redis_server.set(
181 k, serialize(message))
182 except Exception as e:
183 logger.warning(
184 'Unable to update key in redis '
185 'server: {}'.format(e))
186 else:
187 cancel_jobs.append(k)
188
189 # Perform the job since run_count was still > 0
181 self.send_request(msg, queue=queue) 190 self.send_request(msg, queue=queue)
182 v[0] = next(v[2])
183 else: 191 else:
184 # Scheduled job is in running infinitely 192 # Scheduled job is in running infinitely
185 # Send job and update next schedule time 193 # Send job and update next schedule time
186 self.send_request(msg, queue=queue) 194 self.send_request(msg, queue=queue)
187 v[0] = next(v[2]) 195 v[0] = next(v[2])
188 196
189 for job in cancel_jobs: 197 for job_id in cancel_jobs:
190 try: 198 self.cancel_job(job_id)
191 logger.debug('Cancelling job due to run_count: {}'
192 .format(k))
193 self.redis_server.delete(k)
194 self.redis_server.lrem('interval_jobs', 0, k)
195 except Exception as e:
196 logger.warning(
197 'Unable to update key in redis '
198 'server: {}'.format(e))
199 del self.interval_jobs[k]
200 199
201 if not self.maybe_send_heartbeat(events): 200 if not self.maybe_send_heartbeat(events):
202 break 201 break
@@ -253,24 +252,30 @@ class Scheduler(HeartbeatMixin, EMQPService):
253 """ 252 """
254 logger.info("Received new UNSCHEDULE request: {}".format(message)) 253 logger.info("Received new UNSCHEDULE request: {}".format(message))
255 254
255 schedule_hash = self.schedule_hash(message)
256 # TODO: Notify router whether or not this succeeds 256 # TODO: Notify router whether or not this succeeds
257 self.unschedule_job(message) 257 self.cancel_job(schedule_hash)
258 258
259 def unschedule_job(self, message): 259 def cancel_job(self, schedule_hash):
260 """ 260 """
261 Unschedules a job if it exists based on the message used to generate it 261 Cancels a job if it exists
262 """
263 schedule_hash = self.schedule_hash(message)
264 262
263 Args:
264 schedule_hash (str): The schedule's unique hash. See
265 :meth:`Scheduler.schedule_hash`
266 """
265 if schedule_hash in self.interval_jobs: 267 if schedule_hash in self.interval_jobs:
266 # Remove scheduled job 268
267 self.interval_jobs.pop(schedule_hash) 269 # If the hash wasn't found in either `cron_jobs` or `interval_jobs`
268 elif schedule_hash in self.cron_jobs: 270 # then it's safe to assume it's already deleted.
269 # Remove scheduled job 271 try:
270 self.cron_jobs.pop(schedule_hash) 272 del self.interval_jobs[schedule_hash]
271 else: 273 except KeyError:
272 logger.warning("Couldn't find matching schedule for unschedule " + 274 pass
273 "request") 275 try:
276 del self.cron_jobs[schedule_hash]
277 except KeyError:
278 pass
274 279
275 # Double check the redis server even if we didn't find the hash 280 # Double check the redis server even if we didn't find the hash
276 # in memory 281 # in memory
@@ -279,10 +284,10 @@ class Scheduler(HeartbeatMixin, EMQPService):
279 self.redis_server.delete(schedule_hash) 284 self.redis_server.delete(schedule_hash)
280 self.redis_server.lrem('interval_jobs', 0, schedule_hash) 285 self.redis_server.lrem('interval_jobs', 0, schedule_hash)
281 self.redis_server.save() 286 self.redis_server.save()
282 except redis.ConnectionError: 287 except redis.ConnectionError as e:
283 logger.warning('Could not contact redis server') 288 logger.warning('Could not contact redis server: {}'.format(e))
284 except Exception as e: 289 except Exception as e:
285 logger.warning(str(e)) 290 logger.warning(str(e), exc_info=True)
286 291
287 def load_job_from_redis(self, message): 292 def load_job_from_redis(self, message):
288 """ 293 """
diff --git a/eventmq/utils/timeutils.py b/eventmq/utils/timeutils.py
index b33e18d..12cf7cc 100644
--- a/eventmq/utils/timeutils.py
+++ b/eventmq/utils/timeutils.py
@@ -63,19 +63,23 @@ class IntervalIter(object):
63 next(interval) # 300 63 next(interval) # 300
64 next(interval) # 600 64 next(interval) # 600
65 """ 65 """
66 def __init__(self, start_value, interval_secs): 66 def __init__(self, start_value, interval_secs, iterate_immediately=False):
67 """ 67 """
68 Args: 68 Args:
69 start_value (numeric) - the timestamp to begin with. usually gotten 69 start_value (numeric): the timestamp to begin with. usually gotten
70 via :func:`monotonic` or :func:`timestamp` 70 via :func:`monotonic` or :func:`timestamp`
71 interval_secs (int) - the number of seconds between intervals 71 interval_secs (int): the number of seconds between intervals
72 iterate_immediately (bool): When this is ``True`` the value of this
73 iterator will increase ``start_value`` by ``interval_secs``
74 immediately. *Default is False*
72 """ 75 """
73 self.current = start_value 76 self.current = start_value
74 self.interval_secs = interval_secs 77 self.interval_secs = interval_secs
75 78
76 # iterate the first time so the first call to .next() is interval_secs 79 # iterate the first time so the first call to .next() is interval_secs
77 # + start_value 80 # + start_value
78 self.__next__() 81 if iterate_immediately:
82 self.__next__()
79 83
80 def __iter__(self): 84 def __iter__(self):
81 return self 85 return self