diff options
| author | jason | 2017-12-15 12:57:58 -0700 |
|---|---|---|
| committer | GitHub | 2017-12-15 12:57:58 -0700 |
| commit | ceee04e8afd4c8e960a6620dc2b45f3ba97b74a2 (patch) | |
| tree | aa6b6e1a2b91ecdb71fb3e8a6de044864844ce07 | |
| parent | c170ffb61daa34399312762067c379d107918a96 (diff) | |
| parent | 2a1dac6a47553e6522fecd11b184c26e10cb30f4 (diff) | |
| download | eventmq-ceee04e8afd4c8e960a6620dc2b45f3ba97b74a2.tar.gz eventmq-ceee04e8afd4c8e960a6620dc2b45f3ba97b74a2.zip | |
Merge pull request #57 from com4/master
Fix bug in scheduler; Optimize scheduler code a bit
| -rw-r--r-- | eventmq/router.py | 18 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 99 | ||||
| -rw-r--r-- | eventmq/utils/timeutils.py | 12 |
3 files changed, 69 insertions, 60 deletions
diff --git a/eventmq/router.py b/eventmq/router.py index c35e88c..0b896b3 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -175,15 +175,6 @@ class Router(HeartbeatMixin): | |||
| 175 | now = monotonic() | 175 | now = monotonic() |
| 176 | events = self.poller.poll() | 176 | events = self.poller.poll() |
| 177 | 177 | ||
| 178 | if events.get(self.incoming) == poller.POLLIN: | ||
| 179 | msg = self.incoming.recv_multipart() | ||
| 180 | self.handle_wal_log(msg) | ||
| 181 | self.process_client_message(msg) | ||
| 182 | |||
| 183 | if events.get(self.outgoing) == poller.POLLIN: | ||
| 184 | msg = self.outgoing.recv_multipart() | ||
| 185 | self.process_worker_message(msg) | ||
| 186 | |||
| 187 | if events.get(self.administrative_socket) == poller.POLLIN: | 178 | if events.get(self.administrative_socket) == poller.POLLIN: |
| 188 | msg = self.administrative_socket.recv_multipart() | 179 | msg = self.administrative_socket.recv_multipart() |
| 189 | if conf.SUPER_DEBUG: | 180 | if conf.SUPER_DEBUG: |
| @@ -207,6 +198,15 @@ class Router(HeartbeatMixin): | |||
| 207 | sendmsg(self.administrative_socket, msg[0], 'REPLY', | 198 | sendmsg(self.administrative_socket, msg[0], 'REPLY', |
| 208 | (self.get_schedulers_status(),)) | 199 | (self.get_schedulers_status(),)) |
| 209 | 200 | ||
| 201 | if events.get(self.incoming) == poller.POLLIN: | ||
| 202 | msg = self.incoming.recv_multipart() | ||
| 203 | self.handle_wal_log(msg) | ||
| 204 | self.process_client_message(msg) | ||
| 205 | |||
| 206 | if events.get(self.outgoing) == poller.POLLIN: | ||
| 207 | msg = self.outgoing.recv_multipart() | ||
| 208 | self.process_worker_message(msg) | ||
| 209 | |||
| 210 | # TODO: Optimization: the calls to functions could be done in | 210 | # TODO: Optimization: the calls to functions could be done in |
| 211 | # another thread so they don't block the loop. synchronize | 211 | # another thread so they don't block the loop. synchronize |
| 212 | if not conf.DISABLE_HEARTBEATS: | 212 | if not conf.DISABLE_HEARTBEATS: |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index c1103b9..32c9b46 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -150,8 +150,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 150 | msg = v[1] | 150 | msg = v[1] |
| 151 | queue = v[3] | 151 | queue = v[3] |
| 152 | 152 | ||
| 153 | logger.debug("Time is: %s; Schedule is: %s - Running %s" | 153 | logger.debug("Time is: {}; Schedule is: {} - Running {} " |
| 154 | % (ts_now, v[0], msg)) | 154 | "({})".format(m_now, v[0], k, msg)) |
| 155 | 155 | ||
| 156 | # v[4] is the current remaining run_count | 156 | # v[4] is the current remaining run_count |
| 157 | if v[4] != INFINITE_RUN_COUNT: | 157 | if v[4] != INFINITE_RUN_COUNT: |
| @@ -161,42 +161,41 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 161 | else: | 161 | else: |
| 162 | # Decrement run_count | 162 | # Decrement run_count |
| 163 | v[4] -= 1 | 163 | v[4] -= 1 |
| 164 | # Persist the change to redis | 164 | if v[4] > 0: |
| 165 | try: | 165 | # Update the next run time |
| 166 | message = deserialize(self.redis_server.get(k)) | 166 | v[0] = next(v[2]) |
| 167 | new_headers = [] | 167 | # Persist the change to redis if there are run |
| 168 | for header in message[1].split(','): | 168 | # counts still left |
| 169 | if 'run_count:' in header: | 169 | try: |
| 170 | new_headers.append( | 170 | message = deserialize( |
| 171 | 'run_count:{}'.format(v[4])) | 171 | self.redis_server.get(k)) |
| 172 | else: | 172 | new_headers = [] |
| 173 | new_headers.append(header) | 173 | for header in message[1].split(','): |
| 174 | message[1] = ",".join(new_headers) | 174 | if 'run_count:' in header: |
| 175 | self.redis_server.set(k, serialize(message)) | 175 | new_headers.append( |
| 176 | except Exception as e: | 176 | 'run_count:{}'.format(v[4])) |
| 177 | logger.warning( | 177 | else: |
| 178 | 'Unable to update key in redis ' | 178 | new_headers.append(header) |
| 179 | 'server: {}'.format(e)) | 179 | message[1] = ",".join(new_headers) |
| 180 | # Perform the request since run_count still > 0 | 180 | self.redis_server.set( |
| 181 | k, serialize(message)) | ||
| 182 | except Exception as e: | ||
| 183 | logger.warning( | ||
| 184 | 'Unable to update key in redis ' | ||
| 185 | 'server: {}'.format(e)) | ||
| 186 | else: | ||
| 187 | cancel_jobs.append(k) | ||
| 188 | |||
| 189 | # Perform the job since run_count was still > 0 | ||
| 181 | self.send_request(msg, queue=queue) | 190 | self.send_request(msg, queue=queue) |
| 182 | v[0] = next(v[2]) | ||
| 183 | else: | 191 | else: |
| 184 | # Scheduled job is in running infinitely | 192 | # Scheduled job is in running infinitely |
| 185 | # Send job and update next schedule time | 193 | # Send job and update next schedule time |
| 186 | self.send_request(msg, queue=queue) | 194 | self.send_request(msg, queue=queue) |
| 187 | v[0] = next(v[2]) | 195 | v[0] = next(v[2]) |
| 188 | 196 | ||
| 189 | for job in cancel_jobs: | 197 | for job_id in cancel_jobs: |
| 190 | try: | 198 | self.cancel_job(job_id) |
| 191 | logger.debug('Cancelling job due to run_count: {}' | ||
| 192 | .format(k)) | ||
| 193 | self.redis_server.delete(k) | ||
| 194 | self.redis_server.lrem('interval_jobs', 0, k) | ||
| 195 | except Exception as e: | ||
| 196 | logger.warning( | ||
| 197 | 'Unable to update key in redis ' | ||
| 198 | 'server: {}'.format(e)) | ||
| 199 | del self.interval_jobs[k] | ||
| 200 | 199 | ||
| 201 | if not self.maybe_send_heartbeat(events): | 200 | if not self.maybe_send_heartbeat(events): |
| 202 | break | 201 | break |
| @@ -253,24 +252,30 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 253 | """ | 252 | """ |
| 254 | logger.info("Received new UNSCHEDULE request: {}".format(message)) | 253 | logger.info("Received new UNSCHEDULE request: {}".format(message)) |
| 255 | 254 | ||
| 255 | schedule_hash = self.schedule_hash(message) | ||
| 256 | # TODO: Notify router whether or not this succeeds | 256 | # TODO: Notify router whether or not this succeeds |
| 257 | self.unschedule_job(message) | 257 | self.cancel_job(schedule_hash) |
| 258 | 258 | ||
| 259 | def unschedule_job(self, message): | 259 | def cancel_job(self, schedule_hash): |
| 260 | """ | 260 | """ |
| 261 | Unschedules a job if it exists based on the message used to generate it | 261 | Cancels a job if it exists |
| 262 | """ | ||
| 263 | schedule_hash = self.schedule_hash(message) | ||
| 264 | 262 | ||
| 263 | Args: | ||
| 264 | schedule_hash (str): The schedule's unique hash. See | ||
| 265 | :meth:`Scheduler.schedule_hash` | ||
| 266 | """ | ||
| 265 | if schedule_hash in self.interval_jobs: | 267 | if schedule_hash in self.interval_jobs: |
| 266 | # Remove scheduled job | 268 | |
| 267 | self.interval_jobs.pop(schedule_hash) | 269 | # If the hash wasn't found in either `cron_jobs` or `interval_jobs` |
| 268 | elif schedule_hash in self.cron_jobs: | 270 | # then it's safe to assume it's already deleted. |
| 269 | # Remove scheduled job | 271 | try: |
| 270 | self.cron_jobs.pop(schedule_hash) | 272 | del self.interval_jobs[schedule_hash] |
| 271 | else: | 273 | except KeyError: |
| 272 | logger.warning("Couldn't find matching schedule for unschedule " + | 274 | pass |
| 273 | "request") | 275 | try: |
| 276 | del self.cron_jobs[schedule_hash] | ||
| 277 | except KeyError: | ||
| 278 | pass | ||
| 274 | 279 | ||
| 275 | # Double check the redis server even if we didn't find the hash | 280 | # Double check the redis server even if we didn't find the hash |
| 276 | # in memory | 281 | # in memory |
| @@ -279,10 +284,10 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 279 | self.redis_server.delete(schedule_hash) | 284 | self.redis_server.delete(schedule_hash) |
| 280 | self.redis_server.lrem('interval_jobs', 0, schedule_hash) | 285 | self.redis_server.lrem('interval_jobs', 0, schedule_hash) |
| 281 | self.redis_server.save() | 286 | self.redis_server.save() |
| 282 | except redis.ConnectionError: | 287 | except redis.ConnectionError as e: |
| 283 | logger.warning('Could not contact redis server') | 288 | logger.warning('Could not contact redis server: {}'.format(e)) |
| 284 | except Exception as e: | 289 | except Exception as e: |
| 285 | logger.warning(str(e)) | 290 | logger.warning(str(e), exc_info=True) |
| 286 | 291 | ||
| 287 | def load_job_from_redis(self, message): | 292 | def load_job_from_redis(self, message): |
| 288 | """ | 293 | """ |
diff --git a/eventmq/utils/timeutils.py b/eventmq/utils/timeutils.py index b33e18d..12cf7cc 100644 --- a/eventmq/utils/timeutils.py +++ b/eventmq/utils/timeutils.py | |||
| @@ -63,19 +63,23 @@ class IntervalIter(object): | |||
| 63 | next(interval) # 300 | 63 | next(interval) # 300 |
| 64 | next(interval) # 600 | 64 | next(interval) # 600 |
| 65 | """ | 65 | """ |
| 66 | def __init__(self, start_value, interval_secs): | 66 | def __init__(self, start_value, interval_secs, iterate_immediately=False): |
| 67 | """ | 67 | """ |
| 68 | Args: | 68 | Args: |
| 69 | start_value (numeric) - the timestamp to begin with. usually gotten | 69 | start_value (numeric): the timestamp to begin with. usually gotten |
| 70 | via :func:`monotonic` or :func:`timestamp` | 70 | via :func:`monotonic` or :func:`timestamp` |
| 71 | interval_secs (int) - the number of seconds between intervals | 71 | interval_secs (int): the number of seconds between intervals |
| 72 | iterate_immediately (bool): When this is ``True`` the value of this | ||
| 73 | iterator will increase ``start_value`` by ``interval_secs`` | ||
| 74 | immediately. *Default is False* | ||
| 72 | """ | 75 | """ |
| 73 | self.current = start_value | 76 | self.current = start_value |
| 74 | self.interval_secs = interval_secs | 77 | self.interval_secs = interval_secs |
| 75 | 78 | ||
| 76 | # iterate the first time so the first call to .next() is interval_secs | 79 | # iterate the first time so the first call to .next() is interval_secs |
| 77 | # + start_value | 80 | # + start_value |
| 78 | self.__next__() | 81 | if iterate_immediately: |
| 82 | self.__next__() | ||
| 79 | 83 | ||
| 80 | def __iter__(self): | 84 | def __iter__(self): |
| 81 | return self | 85 | return self |