diff options
| author | David Hurst | 2017-04-19 10:51:48 -0600 |
|---|---|---|
| committer | GitHub | 2017-04-19 10:51:48 -0600 |
| commit | dbbf0fadaadb9867caf6689a9bb7ac1d514e31a5 (patch) | |
| tree | 2beff9ac5866b9c6c1e172228d1070c514b66757 | |
| parent | fbeb9bf5e0593a2367ad48325c6269f4e7e6a9d4 (diff) | |
| parent | 05e9aef96205e1eac40ac91a722c533f03e8ebb3 (diff) | |
| download | eventmq-dbbf0fadaadb9867caf6689a9bb7ac1d514e31a5.tar.gz eventmq-dbbf0fadaadb9867caf6689a9bb7ac1d514e31a5.zip | |
Merge pull request #30 from sideshowdave7/feature-slot-leak-investigation (tag: 0.3.4.4)
Feature slot leak investigation
| -rwxr-xr-x | bin/emq-jobmanager | 9 | ||||
| -rwxr-xr-x | bin/send_msg | 4 | ||||
| -rw-r--r-- | eventmq/__init__.py | 2 | ||||
| -rw-r--r-- | eventmq/conf.py | 1 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 79 | ||||
| -rw-r--r-- | eventmq/sender.py | 7 | ||||
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 6 | ||||
| -rw-r--r-- | eventmq/tests/test_sender.py | 2 | ||||
| -rw-r--r-- | eventmq/worker.py | 55 | ||||
| -rw-r--r-- | setup.py | 2 |
10 files changed, 121 insertions, 46 deletions
diff --git a/bin/emq-jobmanager b/bin/emq-jobmanager index 4071ada..69c21ed 100755 --- a/bin/emq-jobmanager +++ b/bin/emq-jobmanager | |||
| @@ -34,7 +34,10 @@ if __name__ == "__main__": | |||
| 34 | parser.add_argument('--jobs', '-J', type=int, nargs='?', | 34 | parser.add_argument('--jobs', '-J', type=int, nargs='?', |
| 35 | help='the max number of concurrent jobs to manage at ' | 35 | help='the max number of concurrent jobs to manage at ' |
| 36 | 'a time') | 36 | 'a time') |
| 37 | 37 | parser.add_argument('--name', '-n', type=str, default=None, | |
| 38 | help="A unique ame to give this node. If one " | ||
| 39 | "isn't provided a random uuid will be " | ||
| 40 | "generated") | ||
| 38 | args = parser.parse_args() | 41 | args = parser.parse_args() |
| 39 | 42 | ||
| 40 | # Overwrite the default config location with the one passed to the app | 43 | # Overwrite the default config location with the one passed to the app |
| @@ -48,6 +51,8 @@ if __name__ == "__main__": | |||
| 48 | 51 | ||
| 49 | broker_addr = args.broker_addr | 52 | broker_addr = args.broker_addr |
| 50 | concurrent_jobs = args.jobs | 53 | concurrent_jobs = args.jobs |
| 54 | name = args.name | ||
| 51 | 55 | ||
| 52 | j = JobManager(queues=queues, concurrent_jobs=concurrent_jobs) | 56 | j = JobManager(queues=queues, concurrent_jobs=concurrent_jobs, |
| 57 | name=name) | ||
| 53 | j.jobmanager_main(broker_addr=broker_addr) | 58 | j.jobmanager_main(broker_addr=broker_addr) |
diff --git a/bin/send_msg b/bin/send_msg index 47de45f..c3e88fa 100755 --- a/bin/send_msg +++ b/bin/send_msg | |||
| @@ -22,11 +22,11 @@ if __name__ == "__main__": | |||
| 22 | 'callable': 'work_job', | 22 | 'callable': 'work_job', |
| 23 | 'class_args': ('blurp',), | 23 | 'class_args': ('blurp',), |
| 24 | 'class_kwargs': {'kwarg1': True}, | 24 | 'class_kwargs': {'kwarg1': True}, |
| 25 | 'args': (50, ), | 25 | 'args': (10, ), |
| 26 | 'kwargs': {} | 26 | 'kwargs': {} |
| 27 | }] | 27 | }] |
| 28 | 28 | ||
| 29 | msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=10) | 29 | msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=10, queue='other') |
| 30 | msgid = send_request(s, msg, guarantee=True, reply_requested=True) | 30 | msgid = send_request(s, msg, guarantee=True, reply_requested=True) |
| 31 | # print 'Sent message, use msgid={} to track responses'.format(msgid) | 31 | # print 'Sent message, use msgid={} to track responses'.format(msgid) |
| 32 | # events = dict(poller.poll(500)) | 32 | # events = dict(poller.poll(500)) |
diff --git a/eventmq/__init__.py b/eventmq/__init__.py index 30572f3..492f1b9 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | __author__ = 'EventMQ Contributors' | 1 | __author__ = 'EventMQ Contributors' |
| 2 | __version__ = '0.3.4.3' | 2 | __version__ = '0.3.4.4' |
| 3 | 3 | ||
| 4 | PROTOCOL_VERSION = 'eMQP/1.0' | 4 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 5 | 5 | ||
diff --git a/eventmq/conf.py b/eventmq/conf.py index 5710933..4749e6b 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -95,5 +95,6 @@ SETUP_CALLABLE = '' | |||
| 95 | # Time to wait after receiving SIGTERM to kill the workers in the jobmanager | 95 | # Time to wait after receiving SIGTERM to kill the workers in the jobmanager |
| 96 | # forecfully | 96 | # forecfully |
| 97 | KILL_GRACE_PERIOD = 300 | 97 | KILL_GRACE_PERIOD = 300 |
| 98 | GLOBAL_TIMEOUT = 300 | ||
| 98 | 99 | ||
| 99 | # }}} | 100 | # }}} |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index d4a0ddc..4840fac 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -75,9 +75,11 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 75 | """ | 75 | """ |
| 76 | super(JobManager, self).__init__(*args, **kwargs) | 76 | super(JobManager, self).__init__(*args, **kwargs) |
| 77 | 77 | ||
| 78 | setup_logger("eventmq") | ||
| 79 | |||
| 78 | #: Define the name of this JobManager instance. Useful to know when | 80 | #: Define the name of this JobManager instance. Useful to know when |
| 79 | #: referring to the logs. | 81 | #: referring to the logs. |
| 80 | self.name = kwargs.pop('name', generate_device_name()) | 82 | self.name = kwargs.pop('name', None) or generate_device_name() |
| 81 | logger.info('Initializing JobManager {}...'.format(self.name)) | 83 | logger.info('Initializing JobManager {}...'.format(self.name)) |
| 82 | 84 | ||
| 83 | #: keep track of workers | 85 | #: keep track of workers |
| @@ -94,6 +96,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 94 | signal.signal(signal.SIGTERM, self.sigterm_handler) | 96 | signal.signal(signal.SIGTERM, self.sigterm_handler) |
| 95 | signal.signal(signal.SIGINT, self.sigterm_handler) | 97 | signal.signal(signal.SIGINT, self.sigterm_handler) |
| 96 | signal.signal(signal.SIGQUIT, self.sigterm_handler) | 98 | signal.signal(signal.SIGQUIT, self.sigterm_handler) |
| 99 | signal.signal(signal.SIGUSR1, self.handle_pdb) | ||
| 97 | 100 | ||
| 98 | #: JobManager starts out by INFORMing the router of it's existence, | 101 | #: JobManager starts out by INFORMing the router of it's existence, |
| 99 | #: then telling the router that it is READY. The reply will be the unit | 102 | #: then telling the router that it is READY. The reply will be the unit |
| @@ -103,11 +106,29 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 103 | 106 | ||
| 104 | self.poller = Poller() | 107 | self.poller = Poller() |
| 105 | 108 | ||
| 109 | #: Stats and monitoring information | ||
| 110 | |||
| 111 | #: Jobs in flight tracks all jobs currently executing. | ||
| 112 | #: Key: msgid, Value: The message with all the details of the job | ||
| 113 | self.jobs_in_flight = {} | ||
| 114 | |||
| 115 | #: Running total number of REQUEST messages received on the broker | ||
| 116 | self.total_requests = 0 | ||
| 117 | #: Running total number of READY messages sent to the broker | ||
| 118 | self.total_ready_sent = 0 | ||
| 119 | #: Keep track of what pids are servicing our requests | ||
| 120 | #: Key: pid, Value: # of jobs completed on the process with that pid | ||
| 121 | self.pid_distribution = {} | ||
| 122 | |||
| 106 | #: Setup worker queues | 123 | #: Setup worker queues |
| 107 | self.request_queue = mp_queue() | 124 | self.request_queue = mp_queue() |
| 108 | self.finished_queue = mp_queue() | 125 | self.finished_queue = mp_queue() |
| 109 | self._setup() | 126 | self._setup() |
| 110 | 127 | ||
| 128 | def handle_pdb(self, sig, frame): | ||
| 129 | import pdb | ||
| 130 | pdb.Pdb().set_trace(frame) | ||
| 131 | |||
| 111 | @property | 132 | @property |
| 112 | def workers(self): | 133 | def workers(self): |
| 113 | if not hasattr(self, '_workers'): | 134 | if not hasattr(self, '_workers'): |
| @@ -127,9 +148,6 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 127 | # Acknowledgment has come | 148 | # Acknowledgment has come |
| 128 | # Send a READY for each available worker | 149 | # Send a READY for each available worker |
| 129 | 150 | ||
| 130 | for i in range(0, len(self.workers)): | ||
| 131 | self.send_ready() | ||
| 132 | |||
| 133 | self.status = STATUS.running | 151 | self.status = STATUS.running |
| 134 | 152 | ||
| 135 | try: | 153 | try: |
| @@ -167,7 +185,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 167 | sys.exit(0) | 185 | sys.exit(0) |
| 168 | else: | 186 | else: |
| 169 | try: | 187 | try: |
| 170 | events = self.poller.poll(10) | 188 | events = self.poller.poll(1000) |
| 171 | except zmq.ZMQError: | 189 | except zmq.ZMQError: |
| 172 | logger.debug('Disconnecting due to ZMQError while' | 190 | logger.debug('Disconnecting due to ZMQError while' |
| 173 | ' polling') | 191 | ' polling') |
| @@ -224,13 +242,21 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 224 | 242 | ||
| 225 | logger.debug(resp) | 243 | logger.debug(resp) |
| 226 | pid = resp['pid'] | 244 | pid = resp['pid'] |
| 245 | msgid = resp['msgid'] | ||
| 246 | callback = resp['callback'] | ||
| 247 | death = resp['death'] | ||
| 227 | 248 | ||
| 228 | callback = getattr(self, resp['callback']) | 249 | callback = getattr(self, callback) |
| 229 | callback(resp['return'], resp['msgid']) | 250 | callback(resp['return'], msgid, death, pid) |
| 230 | 251 | ||
| 231 | if resp['return'] == 'DEATH': | 252 | if msgid in self.jobs_in_flight: |
| 232 | if pid in self._workers.keys(): | 253 | del self.jobs_in_flight[msgid] |
| 233 | del self._workers[pid] | 254 | |
| 255 | if not death: | ||
| 256 | if pid not in self.pid_distribution: | ||
| 257 | self.pid_distribution[pid] = 1 | ||
| 258 | else: | ||
| 259 | self.pid_distribution[pid] += 1 | ||
| 234 | 260 | ||
| 235 | def on_request(self, msgid, msg): | 261 | def on_request(self, msgid, msg): |
| 236 | """ | 262 | """ |
| @@ -262,6 +288,9 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 262 | # queue_name = msg[0] | 288 | # queue_name = msg[0] |
| 263 | 289 | ||
| 264 | # Parse REQUEST message values | 290 | # Parse REQUEST message values |
| 291 | |||
| 292 | self.total_requests += 1 | ||
| 293 | |||
| 265 | headers = msg[1] | 294 | headers = msg[1] |
| 266 | payload = deserializer(msg[2]) | 295 | payload = deserializer(msg[2]) |
| 267 | params = payload[1] | 296 | params = payload[1] |
| @@ -279,6 +308,8 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 279 | payload['msgid'] = msgid | 308 | payload['msgid'] = msgid |
| 280 | payload['callback'] = callback | 309 | payload['callback'] = callback |
| 281 | 310 | ||
| 311 | self.jobs_in_flight[msgid] = (monotonic(), payload) | ||
| 312 | |||
| 282 | self.request_queue.put(payload) | 313 | self.request_queue.put(payload) |
| 283 | 314 | ||
| 284 | def premature_death(self, reply, msgid): | 315 | def premature_death(self, reply, msgid): |
| @@ -287,31 +318,36 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 287 | """ | 318 | """ |
| 288 | return | 319 | return |
| 289 | 320 | ||
| 290 | def worker_death(self, reply, msgid): | 321 | def worker_death(self, reply, msgid, death, pid): |
| 291 | """ | 322 | """ |
| 292 | Worker died of natural causes | 323 | Worker died of natural causes, ensure its death and |
| 324 | remove from tracking, will be replaced on next heartbeat | ||
| 293 | """ | 325 | """ |
| 294 | return | 326 | if pid in self._workers.keys(): |
| 327 | del self._workers[pid] | ||
| 328 | |||
| 329 | def worker_ready(self, reply, msgid, death, pid): | ||
| 330 | self.send_ready() | ||
| 295 | 331 | ||
| 296 | def worker_done_with_reply(self, reply, msgid): | 332 | def worker_done_with_reply(self, reply, msgid, death, pid): |
| 297 | """ | 333 | """ |
| 298 | Worker finished a job and requested the return value | 334 | Worker finished a job and requested the return value |
| 299 | """ | 335 | """ |
| 300 | try: | 336 | try: |
| 301 | reply = serializer(reply) | 337 | reply = serializer(reply) |
| 302 | except TypeError as e: | 338 | except TypeError as e: |
| 303 | reply = {"value": str(e)} | 339 | reply = serializer({"value": str(e)}) |
| 304 | 340 | ||
| 305 | self.send_reply(reply, msgid) | 341 | self.send_reply(reply, msgid) |
| 306 | 342 | ||
| 307 | if self.status != STATUS.stopping: | 343 | if self.status != STATUS.stopping and not death: |
| 308 | self.send_ready() | 344 | self.send_ready() |
| 309 | 345 | ||
| 310 | def worker_done(self, reply, msgid): | 346 | def worker_done(self, reply, msgid, death, pid): |
| 311 | """ | 347 | """ |
| 312 | Worker finished a job, notify broker of an additional slot opening | 348 | Worker finished a job, notify broker of an additional slot opening |
| 313 | """ | 349 | """ |
| 314 | if self.status != STATUS.stopping: | 350 | if self.status != STATUS.stopping and not death: |
| 315 | self.send_ready() | 351 | self.send_ready() |
| 316 | 352 | ||
| 317 | def send_ready(self): | 353 | def send_ready(self): |
| @@ -319,6 +355,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 319 | send the READY command upstream to indicate that JobManager is ready | 355 | send the READY command upstream to indicate that JobManager is ready |
| 320 | for another REQUEST message. | 356 | for another REQUEST message. |
| 321 | """ | 357 | """ |
| 358 | self.total_ready_sent += 1 | ||
| 322 | sendmsg(self.outgoing, 'READY') | 359 | sendmsg(self.outgoing, 'READY') |
| 323 | 360 | ||
| 324 | def send_reply(self, reply, msgid): | 361 | def send_reply(self, reply, msgid): |
| @@ -346,6 +383,9 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 346 | necessary | 383 | necessary |
| 347 | """ | 384 | """ |
| 348 | # Kill workers that aren't alive | 385 | # Kill workers that aren't alive |
| 386 | logger.debug("Jobs in flight: {}".format(len(self.jobs_in_flight))) | ||
| 387 | logger.debug("Total requests: {}".format(self.total_requests)) | ||
| 388 | logger.debug("Total ready sent: {}".format(self.total_ready_sent)) | ||
| 349 | try: | 389 | try: |
| 350 | [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers | 390 | [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers |
| 351 | if not w.is_alive] | 391 | if not w.is_alive] |
| @@ -368,7 +408,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 368 | try: | 408 | try: |
| 369 | os.kill(pid, signal) | 409 | os.kill(pid, signal) |
| 370 | except Exception as e: | 410 | except Exception as e: |
| 371 | logger.excpetion( | 411 | logger.exception( |
| 372 | "Encountered exception trying to send {} to worker {}: {}" | 412 | "Encountered exception trying to send {} to worker {}: {}" |
| 373 | .format(signal, pid, str(e))) | 413 | .format(signal, pid, str(e))) |
| 374 | 414 | ||
| @@ -404,7 +444,6 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 404 | Args: | 444 | Args: |
| 405 | broker_addr (str): The address of the broker to connect to. | 445 | broker_addr (str): The address of the broker to connect to. |
| 406 | """ | 446 | """ |
| 407 | setup_logger('') | ||
| 408 | import_settings() | 447 | import_settings() |
| 409 | import_settings(section='jobmanager') | 448 | import_settings(section='jobmanager') |
| 410 | 449 | ||
diff --git a/eventmq/sender.py b/eventmq/sender.py index 063cd74..f0cc09c 100644 --- a/eventmq/sender.py +++ b/eventmq/sender.py | |||
| @@ -18,6 +18,7 @@ | |||
| 18 | The sender is responsible for sending messages | 18 | The sender is responsible for sending messages |
| 19 | """ | 19 | """ |
| 20 | import logging | 20 | import logging |
| 21 | import sys | ||
| 21 | import uuid | 22 | import uuid |
| 22 | 23 | ||
| 23 | import zmq | 24 | import zmq |
| @@ -136,9 +137,11 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin): | |||
| 136 | if self.zsocket: | 137 | if self.zsocket: |
| 137 | self.zsocket.close() | 138 | self.zsocket.close() |
| 138 | 139 | ||
| 139 | self.name = kwargs.pop('name', str(uuid.uuid4()).encode('ascii')) | ||
| 140 | self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) | 140 | self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) |
| 141 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) | 141 | if sys.version[0] == '2': |
| 142 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) | ||
| 143 | else: | ||
| 144 | self.zsocket.setsockopt_string(zmq.IDENTITY, str(self.name)) | ||
| 142 | 145 | ||
| 143 | self.status = constants.STATUS.ready | 146 | self.status = constants.STATUS.ready |
| 144 | 147 | ||
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py index 927e5f6..566c583 100644 --- a/eventmq/tests/test_jobmanager.py +++ b/eventmq/tests/test_jobmanager.py | |||
| @@ -49,8 +49,7 @@ class TestCase(unittest.TestCase): | |||
| 49 | @mock.patch('eventmq.jobmanager.Sender.recv_multipart') | 49 | @mock.patch('eventmq.jobmanager.Sender.recv_multipart') |
| 50 | @mock.patch('eventmq.jobmanager.Poller.poll') | 50 | @mock.patch('eventmq.jobmanager.Poller.poll') |
| 51 | @mock.patch('eventmq.jobmanager.JobManager.maybe_send_heartbeat') | 51 | @mock.patch('eventmq.jobmanager.JobManager.maybe_send_heartbeat') |
| 52 | @mock.patch('eventmq.jobmanager.JobManager.send_ready') | 52 | def test__start_event_loop(self, maybe_send_hb_mock, |
| 53 | def test__start_event_loop(self, send_ready_mock, maybe_send_hb_mock, | ||
| 54 | poll_mock, sender_mock, process_msg_mock, | 53 | poll_mock, sender_mock, process_msg_mock, |
| 55 | pool_close_mock): | 54 | pool_close_mock): |
| 56 | jm = jobmanager.JobManager() | 55 | jm = jobmanager.JobManager() |
| @@ -60,9 +59,6 @@ class TestCase(unittest.TestCase): | |||
| 60 | 59 | ||
| 61 | jm._start_event_loop() | 60 | jm._start_event_loop() |
| 62 | 61 | ||
| 63 | # send int(conf.CONCURRENT_JOBS) ready messages | ||
| 64 | self.assertEqual(conf.CONCURRENT_JOBS, send_ready_mock.call_count) | ||
| 65 | |||
| 66 | process_msg_mock.assert_called_with( | 62 | process_msg_mock.assert_called_with( |
| 67 | sender_mock.return_value) | 63 | sender_mock.return_value) |
| 68 | 64 | ||
diff --git a/eventmq/tests/test_sender.py b/eventmq/tests/test_sender.py index b2570a3..f292696 100644 --- a/eventmq/tests/test_sender.py +++ b/eventmq/tests/test_sender.py | |||
| @@ -35,7 +35,7 @@ class TestCase(unittest.TestCase): | |||
| 35 | 35 | ||
| 36 | self.assert_(socket.poll() != 0) | 36 | self.assert_(socket.poll() != 0) |
| 37 | msg = socket.recv_multipart() | 37 | msg = socket.recv_multipart() |
| 38 | self.assertEqual(msg[0], self.sender.name) | 38 | self.assertEqual(msg[0].decode('ascii'), self.sender.name) |
| 39 | self.assertEqual(msg[1], b'') | 39 | self.assertEqual(msg[1], b'') |
| 40 | self.assertEqual(msg[2], b'1') | 40 | self.assertEqual(msg[2], b'1') |
| 41 | self.assertEqual(msg[3], b'Hello!') | 41 | self.assertEqual(msg[3], b'Hello!') |
diff --git a/eventmq/worker.py b/eventmq/worker.py index 4f42a2c..1af433e 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -51,10 +51,7 @@ class MultiprocessWorker(Process): | |||
| 51 | 51 | ||
| 52 | @property | 52 | @property |
| 53 | def logger(self): | 53 | def logger(self): |
| 54 | if not hasattr(self, '_logger'): | 54 | return logging.getLogger(__name__ + '.' + str(os.getpid())) |
| 55 | self._logger = logging.getLogger(__name__ + '.' + str(os.getpid())) | ||
| 56 | |||
| 57 | return self._logger | ||
| 58 | 55 | ||
| 59 | def run(self): | 56 | def run(self): |
| 60 | """ | 57 | """ |
| @@ -63,16 +60,26 @@ class MultiprocessWorker(Process): | |||
| 63 | This is designed to run in a seperate process. | 60 | This is designed to run in a seperate process. |
| 64 | """ | 61 | """ |
| 65 | # Define the 2 queues for communicating with the worker thread | 62 | # Define the 2 queues for communicating with the worker thread |
| 66 | worker_queue = Queue.Queue() | 63 | logger = self.logger |
| 67 | worker_result_queue = Queue.Queue() | 64 | |
| 65 | worker_queue = Queue.Queue(1) | ||
| 66 | worker_result_queue = Queue.Queue(1) | ||
| 68 | worker_thread = Thread(target=_run, | 67 | worker_thread = Thread(target=_run, |
| 69 | args=(worker_queue, | 68 | args=(worker_queue, |
| 70 | worker_result_queue, | 69 | worker_result_queue, |
| 71 | self.logger)) | 70 | logger)) |
| 72 | 71 | ||
| 73 | import zmq | 72 | import zmq |
| 74 | zmq.Context.instance().term() | 73 | zmq.Context.instance().term() |
| 75 | 74 | ||
| 75 | self.output_queue.put( | ||
| 76 | {'msgid': None, | ||
| 77 | 'return': None, | ||
| 78 | 'death': False, | ||
| 79 | 'pid': os.getpid(), | ||
| 80 | 'callback': 'worker_ready'} | ||
| 81 | ) | ||
| 82 | |||
| 76 | callback = 'premature_death' | 83 | callback = 'premature_death' |
| 77 | 84 | ||
| 78 | worker_thread.start() | 85 | worker_thread.start() |
| @@ -99,14 +106,19 @@ class MultiprocessWorker(Process): | |||
| 99 | try: | 106 | try: |
| 100 | return_val = 'None' | 107 | return_val = 'None' |
| 101 | self.job_count += 1 | 108 | self.job_count += 1 |
| 102 | timeout = payload.get("timeout", None) | 109 | timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT |
| 103 | msgid = payload.get('msgid', '') | 110 | msgid = payload.get('msgid', '') |
| 104 | callback = payload.get('callback', '') | 111 | callback = payload.get('callback', '') |
| 112 | logger.debug("Putting on thread queue msgid: {}".format( | ||
| 113 | msgid)) | ||
| 105 | 114 | ||
| 106 | worker_queue.put(payload['params']) | 115 | worker_queue.put(payload['params']) |
| 107 | 116 | ||
| 108 | try: | 117 | try: |
| 109 | return_val = worker_result_queue.get(timeout=timeout) | 118 | return_val = worker_result_queue.get(timeout=timeout) |
| 119 | |||
| 120 | logger.debug("Got from result queue msgid: {}".format( | ||
| 121 | msgid)) | ||
| 110 | except Queue.Empty: | 122 | except Queue.Empty: |
| 111 | return_val = 'TimeoutError' | 123 | return_val = 'TimeoutError' |
| 112 | 124 | ||
| @@ -116,6 +128,8 @@ class MultiprocessWorker(Process): | |||
| 116 | self.output_queue.put_nowait( | 128 | self.output_queue.put_nowait( |
| 117 | {'msgid': msgid, | 129 | {'msgid': msgid, |
| 118 | 'return': return_val, | 130 | 'return': return_val, |
| 131 | 'death': self.job_count >= conf.MAX_JOB_COUNT or | ||
| 132 | return_val == 'TimeoutError', | ||
| 119 | 'pid': os.getpid(), | 133 | 'pid': os.getpid(), |
| 120 | 'callback': callback} | 134 | 'callback': callback} |
| 121 | ) | 135 | ) |
| @@ -129,15 +143,24 @@ class MultiprocessWorker(Process): | |||
| 129 | return_val = str(e) | 143 | return_val = str(e) |
| 130 | 144 | ||
| 131 | if self.job_count >= conf.MAX_JOB_COUNT: | 145 | if self.job_count >= conf.MAX_JOB_COUNT: |
| 146 | logger.debug("Worker reached job limit, exiting") | ||
| 132 | break | 147 | break |
| 133 | 148 | ||
| 149 | worker_queue.put('DONE') | ||
| 150 | worker_thread.join(timeout=5) | ||
| 151 | |||
| 134 | self.output_queue.put( | 152 | self.output_queue.put( |
| 135 | {'msgid': None, | 153 | {'msgid': None, |
| 136 | 'return': 'DEATH', | 154 | 'return': 'DEATH', |
| 155 | 'death': True, | ||
| 137 | 'pid': os.getpid(), | 156 | 'pid': os.getpid(), |
| 138 | 'callback': 'worker_death'} | 157 | 'callback': 'worker_death'} |
| 139 | ) | 158 | ) |
| 140 | self.logger.debug("Worker death") | 159 | |
| 160 | logger.debug("Worker death") | ||
| 161 | |||
| 162 | if worker_thread.is_alive(): | ||
| 163 | logger.debug("Worker thread did not die gracefully") | ||
| 141 | 164 | ||
| 142 | 165 | ||
| 143 | def _run(queue, result_queue, logger): | 166 | def _run(queue, result_queue, logger): |
| @@ -169,14 +192,22 @@ def _run(queue, result_queue, logger): | |||
| 169 | 192 | ||
| 170 | while True: | 193 | while True: |
| 171 | # Blocking get so we don't spin cycles reading over and over | 194 | # Blocking get so we don't spin cycles reading over and over |
| 172 | payload = queue.get() | 195 | try: |
| 173 | return_val = _run_job(payload, logger) | 196 | payload = queue.get() |
| 197 | except Exception as e: | ||
| 198 | logger.exception(e) | ||
| 199 | continue | ||
| 200 | |||
| 201 | if payload == 'DONE': | ||
| 202 | break | ||
| 174 | 203 | ||
| 204 | return_val = _run_job(payload, logger) | ||
| 175 | # Signal that we're done with this job and put its return value on the | 205 | # Signal that we're done with this job and put its return value on the |
| 176 | # result queue | 206 | # result queue |
| 177 | queue.task_done() | ||
| 178 | result_queue.put(return_val) | 207 | result_queue.put(return_val) |
| 179 | 208 | ||
| 209 | logger.debug("Worker thread death") | ||
| 210 | |||
| 180 | 211 | ||
| 181 | def _run_job(payload, logger): | 212 | def _run_job(payload, logger): |
| 182 | try: | 213 | try: |
| @@ -7,7 +7,7 @@ from setuptools import find_packages, setup | |||
| 7 | 7 | ||
| 8 | setup( | 8 | setup( |
| 9 | name='eventmq', | 9 | name='eventmq', |
| 10 | version='0.3.4.3', | 10 | version='0.3.4.4', |
| 11 | description='EventMQ job execution and messaging system based on ZeroMQ', | 11 | description='EventMQ job execution and messaging system based on ZeroMQ', |
| 12 | packages=find_packages(), | 12 | packages=find_packages(), |
| 13 | install_requires=['pyzmq==15.4.0', | 13 | install_requires=['pyzmq==15.4.0', |