author    jason 2017-12-15 12:57:58 -0700
committer GitHub 2017-12-15 12:57:58 -0700
commit    ceee04e8afd4c8e960a6620dc2b45f3ba97b74a2 (patch)
tree      aa6b6e1a2b91ecdb71fb3e8a6de044864844ce07
parent    c170ffb61daa34399312762067c379d107918a96 (diff)
parent    2a1dac6a47553e6522fecd11b184c26e10cb30f4 (diff)
Merge pull request #57 from com4/master
Fix bug in scheduler; Optimize scheduler code a bit
 eventmq/router.py          | 18
 eventmq/scheduler.py       | 99
 eventmq/utils/timeutils.py | 12
 3 files changed, 69 insertions(+), 60 deletions(-)
diff --git a/eventmq/router.py b/eventmq/router.py
index c35e88c..0b896b3 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -175,15 +175,6 @@ class Router(HeartbeatMixin):
             now = monotonic()
             events = self.poller.poll()
 
-            if events.get(self.incoming) == poller.POLLIN:
-                msg = self.incoming.recv_multipart()
-                self.handle_wal_log(msg)
-                self.process_client_message(msg)
-
-            if events.get(self.outgoing) == poller.POLLIN:
-                msg = self.outgoing.recv_multipart()
-                self.process_worker_message(msg)
-
             if events.get(self.administrative_socket) == poller.POLLIN:
                 msg = self.administrative_socket.recv_multipart()
                 if conf.SUPER_DEBUG:
@@ -207,6 +198,15 @@ class Router(HeartbeatMixin):
                     sendmsg(self.administrative_socket, msg[0], 'REPLY',
                             (self.get_schedulers_status(),))
 
+            if events.get(self.incoming) == poller.POLLIN:
+                msg = self.incoming.recv_multipart()
+                self.handle_wal_log(msg)
+                self.process_client_message(msg)
+
+            if events.get(self.outgoing) == poller.POLLIN:
+                msg = self.outgoing.recv_multipart()
+                self.process_worker_message(msg)
+
             # TODO: Optimization: the calls to functions could be done in
             # another thread so they don't block the loop. synchronize
             if not conf.DISABLE_HEARTBEATS:
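For context, the loop above follows the usual pyzmq poll-and-dispatch pattern: poll the registered sockets, then drain whichever ones report POLLIN. A minimal standalone sketch of that pattern in plain pyzmq (the endpoint, port, and single socket here are illustrative, not eventmq's actual wiring):

import zmq

context = zmq.Context.instance()
administrative_socket = context.socket(zmq.ROUTER)
administrative_socket.bind('tcp://127.0.0.1:47290')  # illustrative port

poller = zmq.Poller()
poller.register(administrative_socket, zmq.POLLIN)

while True:
    # poll() yields (socket, event) pairs; build a dict and check each
    # socket the same way the router loop checks its sockets above.
    events = dict(poller.poll(timeout=1000))
    if events.get(administrative_socket) == zmq.POLLIN:
        msg = administrative_socket.recv_multipart()
        # ... dispatch msg to the appropriate handler ...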
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index c1103b9..32c9b46 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -150,8 +150,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
                     msg = v[1]
                     queue = v[3]
 
-                    logger.debug("Time is: %s; Schedule is: %s - Running %s"
-                                 % (ts_now, v[0], msg))
+                    logger.debug("Time is: {}; Schedule is: {} - Running {} "
+                                 "({})".format(m_now, v[0], k, msg))
 
                     # v[4] is the current remaining run_count
                     if v[4] != INFINITE_RUN_COUNT:
@@ -161,42 +161,41 @@ class Scheduler(HeartbeatMixin, EMQPService):
                         else:
                             # Decrement run_count
                             v[4] -= 1
-                            # Persist the change to redis
-                            try:
-                                message = deserialize(self.redis_server.get(k))
-                                new_headers = []
-                                for header in message[1].split(','):
-                                    if 'run_count:' in header:
-                                        new_headers.append(
-                                            'run_count:{}'.format(v[4]))
-                                    else:
-                                        new_headers.append(header)
-                                message[1] = ",".join(new_headers)
-                                self.redis_server.set(k, serialize(message))
-                            except Exception as e:
-                                logger.warning(
-                                    'Unable to update key in redis '
-                                    'server: {}'.format(e))
-                            # Perform the request since run_count still > 0
+                            if v[4] > 0:
+                                # Update the next run time
+                                v[0] = next(v[2])
+                                # Persist the change to redis if there are run
+                                # counts still left
+                                try:
+                                    message = deserialize(
+                                        self.redis_server.get(k))
+                                    new_headers = []
+                                    for header in message[1].split(','):
+                                        if 'run_count:' in header:
+                                            new_headers.append(
+                                                'run_count:{}'.format(v[4]))
+                                        else:
+                                            new_headers.append(header)
+                                    message[1] = ",".join(new_headers)
+                                    self.redis_server.set(
+                                        k, serialize(message))
+                                except Exception as e:
+                                    logger.warning(
+                                        'Unable to update key in redis '
+                                        'server: {}'.format(e))
+                            else:
+                                cancel_jobs.append(k)
+
+                            # Perform the job since run_count was still > 0
                             self.send_request(msg, queue=queue)
-                            v[0] = next(v[2])
                     else:
                         # Scheduled job is in running infinitely
                         # Send job and update next schedule time
                         self.send_request(msg, queue=queue)
                         v[0] = next(v[2])
 
-            for job in cancel_jobs:
-                try:
-                    logger.debug('Cancelling job due to run_count: {}'
-                                 .format(k))
-                    self.redis_server.delete(k)
-                    self.redis_server.lrem('interval_jobs', 0, k)
-                except Exception as e:
-                    logger.warning(
-                        'Unable to update key in redis '
-                        'server: {}'.format(e))
-                del self.interval_jobs[k]
+            for job_id in cancel_jobs:
+                self.cancel_job(job_id)
 
             if not self.maybe_send_heartbeat(events):
                 break
@@ -253,24 +252,30 @@ class Scheduler(HeartbeatMixin, EMQPService):
         """
         logger.info("Received new UNSCHEDULE request: {}".format(message))
 
+        schedule_hash = self.schedule_hash(message)
         # TODO: Notify router whether or not this succeeds
-        self.unschedule_job(message)
+        self.cancel_job(schedule_hash)
 
-    def unschedule_job(self, message):
+    def cancel_job(self, schedule_hash):
         """
-        Unschedules a job if it exists based on the message used to generate it
-        """
-        schedule_hash = self.schedule_hash(message)
+        Cancels a job if it exists
 
+        Args:
+            schedule_hash (str): The schedule's unique hash. See
+                :meth:`Scheduler.schedule_hash`
+        """
         if schedule_hash in self.interval_jobs:
-            # Remove scheduled job
-            self.interval_jobs.pop(schedule_hash)
-        elif schedule_hash in self.cron_jobs:
-            # Remove scheduled job
-            self.cron_jobs.pop(schedule_hash)
-        else:
-            logger.warning("Couldn't find matching schedule for unschedule " +
-                           "request")
+
+            # If the hash wasn't found in either `cron_jobs` or `interval_jobs`
+            # then it's safe to assume it's already deleted.
+            try:
+                del self.interval_jobs[schedule_hash]
+            except KeyError:
+                pass
+            try:
+                del self.cron_jobs[schedule_hash]
+            except KeyError:
+                pass
 
         # Double check the redis server even if we didn't find the hash
         # in memory
@@ -279,10 +284,10 @@ class Scheduler(HeartbeatMixin, EMQPService):
             self.redis_server.delete(schedule_hash)
             self.redis_server.lrem('interval_jobs', 0, schedule_hash)
             self.redis_server.save()
-        except redis.ConnectionError:
-            logger.warning('Could not contact redis server')
+        except redis.ConnectionError as e:
+            logger.warning('Could not contact redis server: {}'.format(e))
         except Exception as e:
-            logger.warning(str(e))
+            logger.warning(str(e), exc_info=True)
 
     def load_job_from_redis(self, message):
         """
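In outline, the run_count handling this patch settles on works as follows. The sketch below is a simplified, standalone illustration: `job`, `send`, `cancel`, and the value of INFINITE_RUN_COUNT are stand-ins for the scheduler's internals, and the redis persistence step from the diff is omitted.

INFINITE_RUN_COUNT = -1  # assumed sentinel, standing in for eventmq's constant


def run_scheduled_job(job, send, cancel):
    """One tick for a job whose scheduled time has elapsed."""
    if job['run_count'] == INFINITE_RUN_COUNT:
        send(job['msg'], queue=job['queue'])          # runs forever
        job['next_run'] += job['interval_secs']
        return

    job['run_count'] -= 1
    if job['run_count'] > 0:
        job['next_run'] += job['interval_secs']       # more runs remain
        # (the decremented run_count would also be persisted to redis here)
    else:
        cancel(job['id'])                             # that was the last run
    # Either way the job still executes on this tick, because run_count
    # was greater than zero when the tick started.
    send(job['msg'], queue=job['queue'])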
diff --git a/eventmq/utils/timeutils.py b/eventmq/utils/timeutils.py
index b33e18d..12cf7cc 100644
--- a/eventmq/utils/timeutils.py
+++ b/eventmq/utils/timeutils.py
@@ -63,19 +63,23 @@ class IntervalIter(object):
         next(interval) # 300
         next(interval) # 600
     """
-    def __init__(self, start_value, interval_secs):
+    def __init__(self, start_value, interval_secs, iterate_immediately=False):
         """
         Args:
-            start_value (numeric) - the timestamp to begin with. usually gotten
+            start_value (numeric): the timestamp to begin with. usually gotten
                 via :func:`monotonic` or :func:`timestamp`
-            interval_secs (int) - the number of seconds between intervals
+            interval_secs (int): the number of seconds between intervals
+            iterate_immediately (bool): When this is ``True`` the value of this
+                iterator will increase ``start_value`` by ``interval_secs``
+                immediately. *Default is False*
         """
         self.current = start_value
         self.interval_secs = interval_secs
 
         # iterate the first time so the first call to .next() is interval_secs
         # + start_value
-        self.__next__()
+        if iterate_immediately:
+            self.__next__()
 
     def __iter__(self):
         return self
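A short usage sketch of the new keyword argument. The concrete values in the comments assume the return-then-advance semantics implied by the class docstring; treat them as illustrative rather than guaranteed.

from eventmq.utils.timeutils import IntervalIter

lazy = IntervalIter(0, 300)                             # new default behaviour
eager = IntervalIter(0, 300, iterate_immediately=True)  # old behaviour

next(lazy)   # presumably 0: the start value itself comes out first
next(eager)  # presumably 300: the first interval was consumed in __init__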