| author | David Hurst | 2017-01-18 17:35:46 -0700 |
|---|---|---|
| committer | GitHub | 2017-01-18 17:35:46 -0700 |
| commit | a87bb8c50b126d4872eb20dbcf9e50f12c3649f9 | |
| tree | 3e1c9af8ec3cecd3c9eb3fbb599e5ad239e5de0c | |
| parent | 65a3bd89c7e214c738a40b42cdc69503a44af3ca | |
| parent | 3720390fa3e9027e545ccd5de2e90c19ec898a6c | |
Merge pull request #70 from sideshowdave7/master
EventMQ 0.3rc5 (We'll get 0.3 someday): Fix critical worker health check bug, simplify scheduler run_count header (remove support for persistence)
| mode | file | lines changed |
|---|---|---|
| -rwxr-xr-x | bin/send_request | 3 |
| -rw-r--r-- | eventmq/__init__.py | 2 |
| -rw-r--r-- | eventmq/conf.py | 2 |
| -rw-r--r-- | eventmq/jobmanager.py | 7 |
| -rw-r--r-- | eventmq/scheduler.py | 53 |
| -rw-r--r-- | eventmq/worker.py | 9 |
| -rw-r--r-- | setup.py | 2 |
7 files changed, 39 insertions, 39 deletions

    diff --git a/bin/send_request b/bin/send_request
    index aa84639..9c8696d 100755
    --- a/bin/send_request
    +++ b/bin/send_request
    @@ -21,4 +21,5 @@ if __name__ == "__main__":
             'kwargs': {}
         }]
     
    -    send_request(s, msg, guarantee=True, reply_requested=True)
    +    while True:
    +        send_request(s, msg, guarantee=True, reply_requested=True)

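The change above turns `bin/send_request` into an endless request generator, which is handy for soak-testing a broker. A bounded variant is sometimes easier to reason about; the cap below is purely illustrative and assumes `s` and `msg` are built exactly as in the script:

```python
# Illustrative only: a bounded version of the loop added above. The range cap
# is a hypothetical value, not something the script defines.
for _ in range(100):
    send_request(s, msg, guarantee=True, reply_requested=True)
```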

    diff --git a/eventmq/__init__.py b/eventmq/__init__.py
    index e1feca9..184ddfe 100644
    --- a/eventmq/__init__.py
    +++ b/eventmq/__init__.py
    @@ -1,5 +1,5 @@
     __author__ = 'EventMQ Contributors'
    -__version__ = '0.3-rc4'
    +__version__ = '0.3-rc5'
     
     PROTOCOL_VERSION = 'eMQP/1.0'
     


    diff --git a/eventmq/conf.py b/eventmq/conf.py
    index e2d2b1c..6a0b3a4 100644
    --- a/eventmq/conf.py
    +++ b/eventmq/conf.py
    @@ -86,4 +86,6 @@ RQ_PORT = 6379
     RQ_DB = 0
     RQ_PASSWORD = ''
     
    +MAX_JOB_COUNT=1024
    +
     # }}}

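`MAX_JOB_COUNT` is a new module-level default consumed by the worker change further down: a worker process recycles itself after roughly this many jobs. A minimal sketch of inspecting or overriding it in-process, assuming `eventmq` is importable (settings-file overrides are not shown here):

```python
from eventmq import conf

print(conf.MAX_JOB_COUNT)  # 1024 by default after this change
conf.MAX_JOB_COUNT = 256   # hypothetical override: recycle workers sooner
```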

    diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
    index e94f16d..b6de15e 100644
    --- a/eventmq/jobmanager.py
    +++ b/eventmq/jobmanager.py
    @@ -244,13 +244,14 @@ class JobManager(HeartbeatMixin, EMQPService):
     
         def check_worker_health(self):
             """
    -        Checks for any dead processes in the pool and recreates them if necessary
    +        Checks for any dead processes in the pool and recreates them if
    +        necessary
             """
             self._workers = [w for w in self._workers if w.is_alive()]
     
             if len(self._workers) < conf.CONCURRENT_JOBS:
    -            logger.warning("{} worker process(es) may have died...recreating")\
    -                .format(conf.CONCURRENT_JOBS - len(self._workers))
    +            logger.warning("{} worker process(es) may have died...recreating"
    +                           .format(conf.CONCURRENT_JOBS - len(self._workers)))
     
                 for i in range(0, conf.CONCURRENT_JOBS - len(self._workers)):
                     w = Worker(self.request_queue, self.finished_queue)

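This hunk is the "critical worker health check bug" from the commit message. In the old code the backslash continued the statement, so `.format()` was chained onto the return value of `logger.warning()`, which is `None` in the standard `logging` module; the first time a dead worker was detected, `check_worker_health` raised `AttributeError` instead of respawning anything. A minimal sketch of the broken and fixed call shapes, where `missing` stands in for `conf.CONCURRENT_JOBS - len(self._workers)`:

```python
import logging

logger = logging.getLogger(__name__)
missing = 2  # stand-in for conf.CONCURRENT_JOBS - len(self._workers)

# Broken shape: logger.warning(...) returns None, so the chained .format()
# raises AttributeError and the health check dies before respawning workers.
# logger.warning("{} worker process(es) may have died...recreating")\
#     .format(missing)

# Fixed shape: format the string first, then log it.
logger.warning("{} worker process(es) may have died...recreating"
               .format(missing))
```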

    diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
    index 9130694..086ba69 100644
    --- a/eventmq/scheduler.py
    +++ b/eventmq/scheduler.py
    @@ -182,49 +182,30 @@ class Scheduler(HeartbeatMixin, EMQPService):
                                  % (ts_now, v[0], msg))
     
                     if v[4] != INFINITE_RUN_COUNT:
    -                    # Decrement run_count
    -                    v[4] -= 1
                         # If run_count was 0, we cancel the job
                         if v[4] <= 0:
                             cancel_jobs.append(k)
    -                    # Otherwise we run the job
                         else:
    -                        # Send job and update next schedule time
    +                        # Decrement run_count
    +                        v[4] -= 1
                             self.send_request(msg, queue=queue)
                             v[0] = next(v[2])
    -                        # Rename redis key and save new run_count counter
    -                        try:
    -                            self.redis_server.rename(k,
    -                                                     self.schedule_hash(v))
    -                            self.redis_server.set(self.schedule_hash(v),
    -                                                  serialize(v))
    -                            self.redis_server.save()
    -                        except redis.ConnectionError:
    -                            logger.warning("Couldn't contact redis server")
    -                        except Exception as e:
    -                            logger.warning(
    -                                'Unable to update key in redis '
    -                                'server: {}'.format(e.message))
                     else:
                         # Scheduled job is in running infinitely
                         # Send job and update next schedule time
                         self.send_request(msg, queue=queue)
                         v[0] = next(v[2])
    -                    # Persist changes to redis
    -                    try:
    -                        self.redis_server.set(
    -                            self.schedule_hash(v), serialize(v))
    -                        self.redis_server.save()
    -                    except redis.ConnectionError:
    -                        logger.warning("Couldn't contact redis server")
    -                    except Exception as e:
    -                        logger.warning(
    -                            'Unable to update key in redis '
    -                            'server: {}'.format(e.message))
     
                 for job in cancel_jobs:
    -                message = self.interval_jobs[k][1]
    -                self.unschedule_job(message)
    +                try:
    +                    logger.debug('Cancelling job due to run_count: {}'
    +                                 .format(k))
    +                    self.redis_server.delete(k)
    +                    self.redis_server.lrem('interval_jobs', 0, k)
    +                except Exception as e:
    +                    logger.warning(
    +                        'Unable to update key in redis '
    +                        'server: {}'.format(e))
                     del self.interval_jobs[k]
     
                 if not self.maybe_send_heartbeat(events):
    @@ -244,7 +225,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
     
             except Exception as e:
                 logger.warning('Unable to connect to redis server: {}'.format(
    -                e.message))
    +                e))
             else:
                 return self._redis_server
     
    @@ -304,6 +285,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
             # in memory
             try:
                 if (self.redis_server.get(schedule_hash)):
    +                self.redis_server.delete(schedule_hash)
                     self.redis_server.lrem('interval_jobs', 0, schedule_hash)
                     self.redis_server.save()
            except redis.ConnectionError:
    @@ -357,6 +339,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
             headers = message[1]
             interval = int(message[2])
             cron = str(message[4])
    +        run_count = self.get_run_count_from_headers(headers)
     
             schedule_hash = self.schedule_hash(message)
     
    @@ -378,7 +361,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
                 message[3],
                 inter_iter,
                 queue,
    -            self.get_run_count_from_headers(headers)
    +            run_count
             ]
     
             if schedule_hash in self.cron_jobs:
    @@ -412,8 +395,12 @@ class Scheduler(HeartbeatMixin, EMQPService):
             except Exception as e:
                 logger.warning(str(e))
     
    +        # Send a request in haste mode, decrement run_count if valid
             if 'nohaste' not in headers:
    -            self.send_request(message[3], queue=queue)
    +            # Don't allow decrement past 0
    +            if run_count > 0:
    +                self.interval_jobs[schedule_hash][4] -= 1
    +            self.send_request(message[3], queue=queue)
     
         def get_run_count_from_headers(self, headers):
             run_count = INFINITE_RUN_COUNT

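The net effect of the scheduler hunks: `run_count` is now tracked only in memory (the per-tick Redis rename/set/save of the decremented counter is gone), and exhausted jobs are cancelled by deleting their key and removing them from the `interval_jobs` list directly. A condensed sketch of the new flow, with the message building, next-run updates, and Redis error handling reduced to stand-ins:

```python
INFINITE_RUN_COUNT = -1  # assumed sentinel; see eventmq.scheduler for the real value


def process_interval_jobs(interval_jobs, build_message, send_request, redis_server):
    """Sketch of one scheduler tick after this change (simplified)."""
    cancel_jobs = []
    for k, v in interval_jobs.items():
        msg, queue = build_message(v)       # stand-in for the code above the hunk
        if v[4] != INFINITE_RUN_COUNT:
            if v[4] <= 0:
                cancel_jobs.append(k)       # run budget exhausted: cancel
            else:
                v[4] -= 1                   # decrement in memory only
                send_request(msg, queue=queue)
        else:
            send_request(msg, queue=queue)  # infinite jobs always run
    for k in cancel_jobs:
        redis_server.delete(k)              # drop the persisted schedule entry
        redis_server.lrem('interval_jobs', 0, k)
        del interval_jobs[k]
```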

    diff --git a/eventmq/worker.py b/eventmq/worker.py
    index 00855e8..29814f4 100644
    --- a/eventmq/worker.py
    +++ b/eventmq/worker.py
    @@ -20,8 +20,10 @@ Defines different short-lived workers that execute jobs
     from importlib import import_module
     from multiprocessing import Process
     from threading import Thread
    +from . import conf
     
     import logging
    +import os
     
     logger = logging.getLogger(__name__)
     
    @@ -35,6 +37,7 @@ class MultiprocessWorker(Process):
             super(MultiprocessWorker, self).__init__()
             self.input_queue = input_queue
             self.output_queue = output_queue
    +        self.job_count = 0
     
         def run(self):
             """
    @@ -48,6 +51,7 @@
             # Pull the payload off the queue and run it
             for payload in iter(self.input_queue.get, 'DONE'):
     
    +            self.job_count += 1
                 timeout = payload.get("timeout", None)
                 msgid = payload.get('msgid', '')
     
    @@ -73,6 +77,11 @@
                     resp['callback'] = payload['callback']
                 self.output_queue.put(resp)
     
    +            if self.job_count > conf.MAX_JOB_COUNT:
    +                break
    +
    +        logger.debug("Worker death, PID: {}".format(os.getpid()))
    +
     
     def _run(payload):
         if ":" in payload["path"]:

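Together with the `jobmanager.py` hunk above, this gives workers a bounded lifetime: each `MultiprocessWorker` counts jobs and leaves its loop after `conf.MAX_JOB_COUNT`, the process exits, and `check_worker_health` prunes it and spawns a replacement. A stripped-down sketch of that recycle loop, with the real job execution replaced by a placeholder:

```python
from multiprocessing import Process
import os

MAX_JOB_COUNT = 1024  # mirrors conf.MAX_JOB_COUNT


class RecyclingWorker(Process):
    """Hypothetical stand-in for MultiprocessWorker showing only the recycle logic."""

    def __init__(self, input_queue, output_queue):
        super(RecyclingWorker, self).__init__()
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.job_count = 0

    def run(self):
        # Pull payloads until told to stop or the job budget is exhausted.
        for payload in iter(self.input_queue.get, 'DONE'):
            self.job_count += 1
            self.output_queue.put(payload)  # placeholder for the real job result
            if self.job_count > MAX_JOB_COUNT:
                break  # let the process exit; the JobManager respawns it
        print("Worker death, PID: {}".format(os.getpid()))
```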

    diff --git a/setup.py b/setup.py
    --- a/setup.py
    +++ b/setup.py
    @@ -7,7 +7,7 @@ from setuptools import setup, find_packages
     
     setup(
         name='eventmq',
    -    version='0.3-rc4',
    +    version='0.3-rc5',
         description='EventMQ messaging system based on ZeroMQ',
         packages=find_packages(),
         install_requires=['pyzmq==15.4.0',
