diff options
| -rw-r--r-- | eventmq/jobmanager.py | 12 | ||||
| -rw-r--r-- | eventmq/sender.py | 6 | ||||
| -rw-r--r-- | eventmq/tests/test_sender.py | 2 | ||||
| -rw-r--r-- | eventmq/worker.py | 38 |
4 files changed, 41 insertions, 17 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 16cb506..ec15b3c 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -79,7 +79,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 79 | 79 | ||
| 80 | #: Define the name of this JobManager instance. Useful to know when | 80 | #: Define the name of this JobManager instance. Useful to know when |
| 81 | #: referring to the logs. | 81 | #: referring to the logs. |
| 82 | self.name = kwargs.pop('name') or generate_device_name() | 82 | self.name = kwargs.pop('name', None) or generate_device_name() |
| 83 | logger.info('Initializing JobManager {}...'.format(self.name)) | 83 | logger.info('Initializing JobManager {}...'.format(self.name)) |
| 84 | 84 | ||
| 85 | #: keep track of workers | 85 | #: keep track of workers |
| @@ -106,9 +106,12 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 106 | 106 | ||
| 107 | self.poller = Poller() | 107 | self.poller = Poller() |
| 108 | 108 | ||
| 109 | # Stats and monitoring information | ||
| 109 | self.jobs_in_flight = {} | 110 | self.jobs_in_flight = {} |
| 110 | self.total_requests = 0 | 111 | self.total_requests = 0 |
| 111 | self.total_ready_sent = 0 | 112 | self.total_ready_sent = 0 |
| 113 | # Keep track of what pids are servicing our requests | ||
| 114 | self.pid_distribution = {} | ||
| 112 | 115 | ||
| 113 | #: Setup worker queues | 116 | #: Setup worker queues |
| 114 | self.request_queue = mp_queue() | 117 | self.request_queue = mp_queue() |
| @@ -242,6 +245,12 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 242 | if msgid in self.jobs_in_flight: | 245 | if msgid in self.jobs_in_flight: |
| 243 | del self.jobs_in_flight[msgid] | 246 | del self.jobs_in_flight[msgid] |
| 244 | 247 | ||
| 248 | if not death: | ||
| 249 | if pid not in self.pid_distribution: | ||
| 250 | self.pid_distribution[pid] = 1 | ||
| 251 | else: | ||
| 252 | self.pid_distribution[pid] += 1 | ||
| 253 | |||
| 245 | def on_request(self, msgid, msg): | 254 | def on_request(self, msgid, msg): |
| 246 | """ | 255 | """ |
| 247 | Handles a REQUEST command | 256 | Handles a REQUEST command |
| @@ -307,7 +316,6 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 307 | Worker died of natural causes, ensure its death and | 316 | Worker died of natural causes, ensure its death and |
| 308 | remove from tracking, will be replaced on next heartbeat | 317 | remove from tracking, will be replaced on next heartbeat |
| 309 | """ | 318 | """ |
| 310 | self.kill_worker(pid, signal.SIGKILL) | ||
| 311 | if pid in self._workers.keys(): | 319 | if pid in self._workers.keys(): |
| 312 | del self._workers[pid] | 320 | del self._workers[pid] |
| 313 | 321 | ||
diff --git a/eventmq/sender.py b/eventmq/sender.py index 8452abd..f0cc09c 100644 --- a/eventmq/sender.py +++ b/eventmq/sender.py | |||
| @@ -18,6 +18,7 @@ | |||
| 18 | The sender is responsible for sending messages | 18 | The sender is responsible for sending messages |
| 19 | """ | 19 | """ |
| 20 | import logging | 20 | import logging |
| 21 | import sys | ||
| 21 | import uuid | 22 | import uuid |
| 22 | 23 | ||
| 23 | import zmq | 24 | import zmq |
| @@ -137,7 +138,10 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin): | |||
| 137 | self.zsocket.close() | 138 | self.zsocket.close() |
| 138 | 139 | ||
| 139 | self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) | 140 | self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) |
| 140 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) | 141 | if sys.version[0] == '2': |
| 142 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) | ||
| 143 | else: | ||
| 144 | self.zsocket.setsockopt_string(zmq.IDENTITY, str(self.name)) | ||
| 141 | 145 | ||
| 142 | self.status = constants.STATUS.ready | 146 | self.status = constants.STATUS.ready |
| 143 | 147 | ||
diff --git a/eventmq/tests/test_sender.py b/eventmq/tests/test_sender.py index b2570a3..f292696 100644 --- a/eventmq/tests/test_sender.py +++ b/eventmq/tests/test_sender.py | |||
| @@ -35,7 +35,7 @@ class TestCase(unittest.TestCase): | |||
| 35 | 35 | ||
| 36 | self.assert_(socket.poll() != 0) | 36 | self.assert_(socket.poll() != 0) |
| 37 | msg = socket.recv_multipart() | 37 | msg = socket.recv_multipart() |
| 38 | self.assertEqual(msg[0], self.sender.name) | 38 | self.assertEqual(msg[0].decode('ascii'), self.sender.name) |
| 39 | self.assertEqual(msg[1], b'') | 39 | self.assertEqual(msg[1], b'') |
| 40 | self.assertEqual(msg[2], b'1') | 40 | self.assertEqual(msg[2], b'1') |
| 41 | self.assertEqual(msg[3], b'Hello!') | 41 | self.assertEqual(msg[3], b'Hello!') |
diff --git a/eventmq/worker.py b/eventmq/worker.py index 838be09..69567d5 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -51,10 +51,7 @@ class MultiprocessWorker(Process): | |||
| 51 | 51 | ||
| 52 | @property | 52 | @property |
| 53 | def logger(self): | 53 | def logger(self): |
| 54 | if not hasattr(self, '_logger'): | 54 | return logging.getLogger(__name__ + '.' + str(os.getpid())) |
| 55 | self._logger = logging.getLogger(__name__ + '.' + str(os.getpid())) | ||
| 56 | |||
| 57 | return self._logger | ||
| 58 | 55 | ||
| 59 | def run(self): | 56 | def run(self): |
| 60 | """ | 57 | """ |
| @@ -63,12 +60,14 @@ class MultiprocessWorker(Process): | |||
| 63 | This is designed to run in a seperate process. | 60 | This is designed to run in a seperate process. |
| 64 | """ | 61 | """ |
| 65 | # Define the 2 queues for communicating with the worker thread | 62 | # Define the 2 queues for communicating with the worker thread |
| 66 | worker_queue = Queue.Queue() | 63 | logger = self.logger |
| 67 | worker_result_queue = Queue.Queue() | 64 | |
| 65 | worker_queue = Queue.Queue(1) | ||
| 66 | worker_result_queue = Queue.Queue(1) | ||
| 68 | worker_thread = Thread(target=_run, | 67 | worker_thread = Thread(target=_run, |
| 69 | args=(worker_queue, | 68 | args=(worker_queue, |
| 70 | worker_result_queue, | 69 | worker_result_queue, |
| 71 | self.logger)) | 70 | logger)) |
| 72 | 71 | ||
| 73 | import zmq | 72 | import zmq |
| 74 | zmq.Context.instance().term() | 73 | zmq.Context.instance().term() |
| @@ -107,10 +106,10 @@ class MultiprocessWorker(Process): | |||
| 107 | try: | 106 | try: |
| 108 | return_val = 'None' | 107 | return_val = 'None' |
| 109 | self.job_count += 1 | 108 | self.job_count += 1 |
| 110 | timeout = payload.get("timeout", conf.GLOBAL_TIMEOUT) | 109 | timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT |
| 111 | msgid = payload.get('msgid', '') | 110 | msgid = payload.get('msgid', '') |
| 112 | callback = payload.get('callback', '') | 111 | callback = payload.get('callback', '') |
| 113 | self.logger.debug("Putting on thread queue msgid: {}".format( | 112 | logger.debug("Putting on thread queue msgid: {}".format( |
| 114 | msgid)) | 113 | msgid)) |
| 115 | 114 | ||
| 116 | worker_queue.put(payload['params']) | 115 | worker_queue.put(payload['params']) |
| @@ -118,7 +117,7 @@ class MultiprocessWorker(Process): | |||
| 118 | try: | 117 | try: |
| 119 | return_val = worker_result_queue.get(timeout=timeout) | 118 | return_val = worker_result_queue.get(timeout=timeout) |
| 120 | 119 | ||
| 121 | self.logger.debug("Got from result queue msgid: {}".format( | 120 | logger.debug("Got from result queue msgid: {}".format( |
| 122 | msgid)) | 121 | msgid)) |
| 123 | except Queue.Empty: | 122 | except Queue.Empty: |
| 124 | return_val = 'TimeoutError' | 123 | return_val = 'TimeoutError' |
| @@ -143,9 +142,12 @@ class MultiprocessWorker(Process): | |||
| 143 | return_val = str(e) | 142 | return_val = str(e) |
| 144 | 143 | ||
| 145 | if self.job_count >= conf.MAX_JOB_COUNT: | 144 | if self.job_count >= conf.MAX_JOB_COUNT: |
| 146 | self.logger.debug("Worker reached job limit, exiting") | 145 | logger.debug("Worker reached job limit, exiting") |
| 147 | break | 146 | break |
| 148 | 147 | ||
| 148 | worker_queue.put('DONE') | ||
| 149 | worker_thread.join(timeout=5) | ||
| 150 | |||
| 149 | self.output_queue.put( | 151 | self.output_queue.put( |
| 150 | {'msgid': None, | 152 | {'msgid': None, |
| 151 | 'return': 'DEATH', | 153 | 'return': 'DEATH', |
| @@ -153,7 +155,11 @@ class MultiprocessWorker(Process): | |||
| 153 | 'pid': os.getpid(), | 155 | 'pid': os.getpid(), |
| 154 | 'callback': 'worker_death'} | 156 | 'callback': 'worker_death'} |
| 155 | ) | 157 | ) |
| 156 | self.logger.debug("Worker death") | 158 | |
| 159 | logger.debug("Worker death") | ||
| 160 | |||
| 161 | if worker_thread.is_alive(): | ||
| 162 | logger.debug("Worker thread did not die gracefully") | ||
| 157 | 163 | ||
| 158 | 164 | ||
| 159 | def _run(queue, result_queue, logger): | 165 | def _run(queue, result_queue, logger): |
| @@ -187,14 +193,20 @@ def _run(queue, result_queue, logger): | |||
| 187 | # Blocking get so we don't spin cycles reading over and over | 193 | # Blocking get so we don't spin cycles reading over and over |
| 188 | try: | 194 | try: |
| 189 | payload = queue.get() | 195 | payload = queue.get() |
| 190 | except: | 196 | except Exception as e: |
| 197 | logger.exception(e) | ||
| 191 | continue | 198 | continue |
| 192 | 199 | ||
| 200 | if payload == 'DONE': | ||
| 201 | break | ||
| 202 | |||
| 193 | return_val = _run_job(payload, logger) | 203 | return_val = _run_job(payload, logger) |
| 194 | # Signal that we're done with this job and put its return value on the | 204 | # Signal that we're done with this job and put its return value on the |
| 195 | # result queue | 205 | # result queue |
| 196 | result_queue.put(return_val) | 206 | result_queue.put(return_val) |
| 197 | 207 | ||
| 208 | logger.debug("Worker thread death") | ||
| 209 | |||
| 198 | 210 | ||
| 199 | def _run_job(payload, logger): | 211 | def _run_job(payload, logger): |
| 200 | try: | 212 | try: |