aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eventmq/scheduler.py99
-rw-r--r--eventmq/utils/timeutils.py12
2 files changed, 60 insertions, 51 deletions
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index c1103b9..32c9b46 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -150,8 +150,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
150 msg = v[1] 150 msg = v[1]
151 queue = v[3] 151 queue = v[3]
152 152
153 logger.debug("Time is: %s; Schedule is: %s - Running %s" 153 logger.debug("Time is: {}; Schedule is: {} - Running {} "
154 % (ts_now, v[0], msg)) 154 "({})".format(m_now, v[0], k, msg))
155 155
156 # v[4] is the current remaining run_count 156 # v[4] is the current remaining run_count
157 if v[4] != INFINITE_RUN_COUNT: 157 if v[4] != INFINITE_RUN_COUNT:
@@ -161,42 +161,41 @@ class Scheduler(HeartbeatMixin, EMQPService):
161 else: 161 else:
162 # Decrement run_count 162 # Decrement run_count
163 v[4] -= 1 163 v[4] -= 1
164 # Persist the change to redis 164 if v[4] > 0:
165 try: 165 # Update the next run time
166 message = deserialize(self.redis_server.get(k)) 166 v[0] = next(v[2])
167 new_headers = [] 167 # Persist the change to redis if there are run
168 for header in message[1].split(','): 168 # counts still left
169 if 'run_count:' in header: 169 try:
170 new_headers.append( 170 message = deserialize(
171 'run_count:{}'.format(v[4])) 171 self.redis_server.get(k))
172 else: 172 new_headers = []
173 new_headers.append(header) 173 for header in message[1].split(','):
174 message[1] = ",".join(new_headers) 174 if 'run_count:' in header:
175 self.redis_server.set(k, serialize(message)) 175 new_headers.append(
176 except Exception as e: 176 'run_count:{}'.format(v[4]))
177 logger.warning( 177 else:
178 'Unable to update key in redis ' 178 new_headers.append(header)
179 'server: {}'.format(e)) 179 message[1] = ",".join(new_headers)
180 # Perform the request since run_count still > 0 180 self.redis_server.set(
181 k, serialize(message))
182 except Exception as e:
183 logger.warning(
184 'Unable to update key in redis '
185 'server: {}'.format(e))
186 else:
187 cancel_jobs.append(k)
188
189 # Perform the job since run_count was still > 0
181 self.send_request(msg, queue=queue) 190 self.send_request(msg, queue=queue)
182 v[0] = next(v[2])
183 else: 191 else:
184 # Scheduled job is in running infinitely 192 # Scheduled job is in running infinitely
185 # Send job and update next schedule time 193 # Send job and update next schedule time
186 self.send_request(msg, queue=queue) 194 self.send_request(msg, queue=queue)
187 v[0] = next(v[2]) 195 v[0] = next(v[2])
188 196
189 for job in cancel_jobs: 197 for job_id in cancel_jobs:
190 try: 198 self.cancel_job(job_id)
191 logger.debug('Cancelling job due to run_count: {}'
192 .format(k))
193 self.redis_server.delete(k)
194 self.redis_server.lrem('interval_jobs', 0, k)
195 except Exception as e:
196 logger.warning(
197 'Unable to update key in redis '
198 'server: {}'.format(e))
199 del self.interval_jobs[k]
200 199
201 if not self.maybe_send_heartbeat(events): 200 if not self.maybe_send_heartbeat(events):
202 break 201 break
@@ -253,24 +252,30 @@ class Scheduler(HeartbeatMixin, EMQPService):
253 """ 252 """
254 logger.info("Received new UNSCHEDULE request: {}".format(message)) 253 logger.info("Received new UNSCHEDULE request: {}".format(message))
255 254
255 schedule_hash = self.schedule_hash(message)
256 # TODO: Notify router whether or not this succeeds 256 # TODO: Notify router whether or not this succeeds
257 self.unschedule_job(message) 257 self.cancel_job(schedule_hash)
258 258
259 def unschedule_job(self, message): 259 def cancel_job(self, schedule_hash):
260 """ 260 """
261 Unschedules a job if it exists based on the message used to generate it 261 Cancels a job if it exists
262 """
263 schedule_hash = self.schedule_hash(message)
264 262
263 Args:
264 schedule_hash (str): The schedule's unique hash. See
265 :meth:`Scheduler.schedule_hash`
266 """
265 if schedule_hash in self.interval_jobs: 267 if schedule_hash in self.interval_jobs:
266 # Remove scheduled job 268
267 self.interval_jobs.pop(schedule_hash) 269 # If the hash wasn't found in either `cron_jobs` or `interval_jobs`
268 elif schedule_hash in self.cron_jobs: 270 # then it's safe to assume it's already deleted.
269 # Remove scheduled job 271 try:
270 self.cron_jobs.pop(schedule_hash) 272 del self.interval_jobs[schedule_hash]
271 else: 273 except KeyError:
272 logger.warning("Couldn't find matching schedule for unschedule " + 274 pass
273 "request") 275 try:
276 del self.cron_jobs[schedule_hash]
277 except KeyError:
278 pass
274 279
275 # Double check the redis server even if we didn't find the hash 280 # Double check the redis server even if we didn't find the hash
276 # in memory 281 # in memory
@@ -279,10 +284,10 @@ class Scheduler(HeartbeatMixin, EMQPService):
279 self.redis_server.delete(schedule_hash) 284 self.redis_server.delete(schedule_hash)
280 self.redis_server.lrem('interval_jobs', 0, schedule_hash) 285 self.redis_server.lrem('interval_jobs', 0, schedule_hash)
281 self.redis_server.save() 286 self.redis_server.save()
282 except redis.ConnectionError: 287 except redis.ConnectionError as e:
283 logger.warning('Could not contact redis server') 288 logger.warning('Could not contact redis server: {}'.format(e))
284 except Exception as e: 289 except Exception as e:
285 logger.warning(str(e)) 290 logger.warning(str(e), exc_info=True)
286 291
287 def load_job_from_redis(self, message): 292 def load_job_from_redis(self, message):
288 """ 293 """
diff --git a/eventmq/utils/timeutils.py b/eventmq/utils/timeutils.py
index b33e18d..12cf7cc 100644
--- a/eventmq/utils/timeutils.py
+++ b/eventmq/utils/timeutils.py
@@ -63,19 +63,23 @@ class IntervalIter(object):
63 next(interval) # 300 63 next(interval) # 300
64 next(interval) # 600 64 next(interval) # 600
65 """ 65 """
66 def __init__(self, start_value, interval_secs): 66 def __init__(self, start_value, interval_secs, iterate_immediately=False):
67 """ 67 """
68 Args: 68 Args:
69 start_value (numeric) - the timestamp to begin with. usually gotten 69 start_value (numeric): the timestamp to begin with. usually gotten
70 via :func:`monotonic` or :func:`timestamp` 70 via :func:`monotonic` or :func:`timestamp`
71 interval_secs (int) - the number of seconds between intervals 71 interval_secs (int): the number of seconds between intervals
72 iterate_immediately (bool): When this is ``True`` the value of this
73 iterator will increase ``start_value`` by ``interval_secs``
74 immediately. *Default is False*
72 """ 75 """
73 self.current = start_value 76 self.current = start_value
74 self.interval_secs = interval_secs 77 self.interval_secs = interval_secs
75 78
76 # iterate the first time so the first call to .next() is interval_secs 79 # iterate the first time so the first call to .next() is interval_secs
77 # + start_value 80 # + start_value
78 self.__next__() 81 if iterate_immediately:
82 self.__next__()
79 83
80 def __iter__(self): 84 def __iter__(self):
81 return self 85 return self