| author | David Hurst | 2017-04-20 22:07:13 -0600 |
|---|---|---|
| committer | GitHub | 2017-04-20 22:07:13 -0600 |
| commit | e61a13dd9e340537fc2146c0d7588f78448c000e (patch) | |
| tree | 1666d4184c63e0a9cbed8ca4633202d2490e175b | |
| parent | 7ad9644b64e1cbd2bd0013903fb6d145c6209d58 (diff) | |
| parent | 422ddad2321ef63aed5665702d0c419e4acde545 (diff) | |
Merge pull request #31 from sideshowdave7/0.4
Port master changes to 0.4
| -rw-r--r-- | eventmq/__init__.py | 2 |
| -rw-r--r-- | eventmq/jobmanager.py | 102 |
| -rw-r--r-- | eventmq/sender.py | 7 |
| -rw-r--r-- | eventmq/settings.py | 17 |
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 7 |
| -rw-r--r-- | eventmq/tests/test_sender.py | 2 |
| -rw-r--r-- | eventmq/tests/test_worker.py | 2 |
| -rw-r--r-- | eventmq/worker.py | 148 |
| -rw-r--r-- | setup.py | 2 |
9 files changed, 194 insertions, 95 deletions
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index 43c0365..492f1b9 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
| @@ -1,5 +1,5 @@ | |||
| 1 | __author__ = 'EventMQ Contributors' | 1 | __author__ = 'EventMQ Contributors' |
| 2 | __version__ = '0.3.4' | 2 | __version__ = '0.3.4.4' |
| 3 | 3 | ||
| 4 | PROTOCOL_VERSION = 'eMQP/1.0' | 4 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 5 | 5 | ||
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index d86c738..c490e68 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
| @@ -84,12 +84,13 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 84 | self.name = conf.NAME or generate_device_name() | 84 | self.name = conf.NAME or generate_device_name() |
| 85 | logger.info('Initializing JobManager {}...'.format(self.name)) | 85 | logger.info('Initializing JobManager {}...'.format(self.name)) |
| 86 | 86 | ||
| 87 | if skip_signal: | 87 | if not skip_signal: |
| 88 | # handle any sighups by reloading config | 88 | # handle any sighups by reloading config |
| 89 | signal.signal(signal.SIGHUP, self.sighup_handler) | 89 | signal.signal(signal.SIGHUP, self.sighup_handler) |
| 90 | signal.signal(signal.SIGTERM, self.sigterm_handler) | 90 | signal.signal(signal.SIGTERM, self.sigterm_handler) |
| 91 | signal.signal(signal.SIGINT, self.sigterm_handler) | 91 | signal.signal(signal.SIGINT, self.sigterm_handler) |
| 92 | signal.signal(signal.SIGQUIT, self.sigterm_handler) | 92 | signal.signal(signal.SIGQUIT, self.sigterm_handler) |
| 93 | signal.signal(signal.SIGUSR1, self.handle_pdb) | ||
| 93 | 94 | ||
| 94 | #: JobManager starts out by INFORMing the router of it's existence, | 95 | #: JobManager starts out by INFORMing the router of it's existence, |
| 95 | #: then telling the router that it is READY. The reply will be the unit | 96 | #: then telling the router that it is READY. The reply will be the unit |
| @@ -99,11 +100,29 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 99 | 100 | ||
| 100 | self.poller = Poller() | 101 | self.poller = Poller() |
| 101 | 102 | ||
| 103 | #: Stats and monitoring information | ||
| 104 | |||
| 105 | #: Jobs in flight tracks all jobs currently executing. | ||
| 106 | #: Key: msgid, Value: The message with all the details of the job | ||
| 107 | self.jobs_in_flight = {} | ||
| 108 | |||
| 109 | #: Running total number of REQUEST messages received on the broker | ||
| 110 | self.total_requests = 0 | ||
| 111 | #: Running total number of READY messages sent to the broker | ||
| 112 | self.total_ready_sent = 0 | ||
| 113 | #: Keep track of what pids are servicing our requests | ||
| 114 | #: Key: pid, Value: # of jobs completed on the process with that pid | ||
| 115 | self.pid_distribution = {} | ||
| 116 | |||
| 102 | #: Setup worker queues | 117 | #: Setup worker queues |
| 103 | self.request_queue = mp_queue() | 118 | self.request_queue = mp_queue() |
| 104 | self.finished_queue = mp_queue() | 119 | self.finished_queue = mp_queue() |
| 105 | self._setup() | 120 | self._setup() |
| 106 | 121 | ||
| 122 | def handle_pdb(self, sig, frame): | ||
| 123 | import pdb | ||
| 124 | pdb.Pdb().set_trace(frame) | ||
| 125 | |||
| 107 | @property | 126 | @property |
| 108 | def workers(self): | 127 | def workers(self): |
| 109 | if not hasattr(self, '_workers'): | 128 | if not hasattr(self, '_workers'): |
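The new `SIGUSR1` handler gives a running job manager an on-demand debugger: send the signal and `handle_pdb` drops into `pdb` at the interrupted frame. A minimal standalone sketch of the same pattern (illustrative script, not eventmq code):

```python
import os
import pdb
import signal
import time


def handle_pdb(sig, frame):
    # Open an interactive pdb session at the frame that was interrupted.
    pdb.Pdb().set_trace(frame)


if __name__ == '__main__':
    signal.signal(signal.SIGUSR1, handle_pdb)
    print('PID {}: run `kill -USR1 {}` to attach pdb'.format(
        os.getpid(), os.getpid()))
    while True:
        time.sleep(1)
```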
| @@ -123,9 +142,6 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 123 | # Acknowledgment has come | 142 | # Acknowledgment has come |
| 124 | # Send a READY for each available worker | 143 | # Send a READY for each available worker |
| 125 | 144 | ||
| 126 | for i in range(0, len(self.workers)): | ||
| 127 | self.send_ready() | ||
| 128 | |||
| 129 | self.status = STATUS.running | 145 | self.status = STATUS.running |
| 130 | 146 | ||
| 131 | try: | 147 | try: |
| @@ -163,7 +179,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 163 | sys.exit(0) | 179 | sys.exit(0) |
| 164 | else: | 180 | else: |
| 165 | try: | 181 | try: |
| 166 | events = self.poller.poll(10) | 182 | events = self.poller.poll(1000) |
| 167 | except zmq.ZMQError: | 183 | except zmq.ZMQError: |
| 168 | logger.debug('Disconnecting due to ZMQError while' | 184 | logger.debug('Disconnecting due to ZMQError while' |
| 169 | ' polling') | 185 | ' polling') |
| @@ -214,13 +230,21 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 214 | 230 | ||
| 215 | logger.debug(resp) | 231 | logger.debug(resp) |
| 216 | pid = resp['pid'] | 232 | pid = resp['pid'] |
| 233 | msgid = resp['msgid'] | ||
| 234 | callback = resp['callback'] | ||
| 235 | death = resp['death'] | ||
| 217 | 236 | ||
| 218 | callback = getattr(self, resp['callback']) | 237 | callback = getattr(self, callback) |
| 219 | callback(resp['return'], resp['msgid']) | 238 | callback(resp['return'], msgid, death, pid) |
| 220 | 239 | ||
| 221 | if resp['return'] == 'DEATH': | 240 | if msgid in self.jobs_in_flight: |
| 222 | if pid in self._workers.keys(): | 241 | del self.jobs_in_flight[msgid] |
| 223 | del self._workers[pid] | 242 | |
| 243 | if not death: | ||
| 244 | if pid not in self.pid_distribution: | ||
| 245 | self.pid_distribution[pid] = 1 | ||
| 246 | else: | ||
| 247 | self.pid_distribution[pid] += 1 | ||
| 224 | 248 | ||
| 225 | def on_request(self, msgid, msg): | 249 | def on_request(self, msgid, msg): |
| 226 | """ | 250 | """ |
| @@ -258,6 +282,9 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 258 | # queue_name = msg[0] | 282 | # queue_name = msg[0] |
| 259 | 283 | ||
| 260 | # Parse REQUEST message values | 284 | # Parse REQUEST message values |
| 285 | |||
| 286 | self.total_requests += 1 | ||
| 287 | |||
| 261 | headers = msg[1] | 288 | headers = msg[1] |
| 262 | payload = deserializer(msg[2]) | 289 | payload = deserializer(msg[2]) |
| 263 | params = payload[1] | 290 | params = payload[1] |
| @@ -275,27 +302,54 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 275 | payload['msgid'] = msgid | 302 | payload['msgid'] = msgid |
| 276 | payload['callback'] = callback | 303 | payload['callback'] = callback |
| 277 | 304 | ||
| 305 | self.jobs_in_flight[msgid] = (monotonic(), payload) | ||
| 306 | |||
| 278 | self.request_queue.put(payload) | 307 | self.request_queue.put(payload) |
| 279 | 308 | ||
| 280 | def premature_death(self, reply, msgid): | 309 | def premature_death(self, reply, msgid): |
| 310 | """ | ||
| 311 | Worker died before running any jobs | ||
| 312 | """ | ||
| 281 | return | 313 | return |
| 282 | 314 | ||
| 283 | def worker_death(self, reply, msgid): | 315 | def worker_death(self, reply, msgid, death, pid): |
| 284 | return | 316 | """ |
| 317 | Worker died of natural causes, ensure its death and | ||
| 318 | remove from tracking, will be replaced on next heartbeat | ||
| 319 | """ | ||
| 320 | if pid in self._workers.keys(): | ||
| 321 | del self._workers[pid] | ||
| 285 | 322 | ||
| 286 | def worker_done_with_reply(self, reply, msgid): | 323 | def worker_ready(self, reply, msgid, death, pid): |
| 287 | reply = serializer(reply) | ||
| 288 | self.send_reply(reply, msgid) | ||
| 289 | self.send_ready() | 324 | self.send_ready() |
| 290 | 325 | ||
| 291 | def worker_done(self, msgid): | 326 | def worker_done_with_reply(self, reply, msgid, death, pid): |
| 292 | self.send_ready() | 327 | """ |
| 328 | Worker finished a job and requested the return value | ||
| 329 | """ | ||
| 330 | try: | ||
| 331 | reply = serializer(reply) | ||
| 332 | except TypeError as e: | ||
| 333 | reply = serializer({"value": str(e)}) | ||
| 334 | |||
| 335 | self.send_reply(reply, msgid) | ||
| 336 | |||
| 337 | if self.status != STATUS.stopping and not death: | ||
| 338 | self.send_ready() | ||
| 339 | |||
| 340 | def worker_done(self, reply, msgid, death, pid): | ||
| 341 | """ | ||
| 342 | Worker finished a job, notify broker of an additional slot opening | ||
| 343 | """ | ||
| 344 | if self.status != STATUS.stopping and not death: | ||
| 345 | self.send_ready() | ||
| 293 | 346 | ||
| 294 | def send_ready(self): | 347 | def send_ready(self): |
| 295 | """ | 348 | """ |
| 296 | send the READY command upstream to indicate that JobManager is ready | 349 | send the READY command upstream to indicate that JobManager is ready |
| 297 | for another REQUEST message. | 350 | for another REQUEST message. |
| 298 | """ | 351 | """ |
| 352 | self.total_ready_sent += 1 | ||
| 299 | sendmsg(self.frontend, 'READY') | 353 | sendmsg(self.frontend, 'READY') |
| 300 | 354 | ||
| 301 | def send_reply(self, reply, msgid): | 355 | def send_reply(self, reply, msgid): |
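`worker_done_with_reply` now wraps serialization in a `try/except TypeError`, so a return value the serializer cannot handle is reported back as an error string instead of raising inside the job manager. A sketch of that fallback, assuming a JSON-style serializer (the project's actual `serializer` may differ):

```python
import json


def serialize_or_error(reply, serializer=json.dumps):
    # Mirror the fallback above: if the reply is not serializable, reply with
    # the TypeError text instead of letting the exception escape.
    try:
        return serializer(reply)
    except TypeError as e:
        return serializer({"value": str(e)})


print(serialize_or_error({"value": 42}))        # normal reply
print(serialize_or_error({"value": object()}))  # falls back to the error text
```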
| @@ -323,6 +377,9 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 323 | necessary | 377 | necessary |
| 324 | """ | 378 | """ |
| 325 | # Kill workers that aren't alive | 379 | # Kill workers that aren't alive |
| 380 | logger.debug("Jobs in flight: {}".format(len(self.jobs_in_flight))) | ||
| 381 | logger.debug("Total requests: {}".format(self.total_requests)) | ||
| 382 | logger.debug("Total ready sent: {}".format(self.total_ready_sent)) | ||
| 326 | try: | 383 | try: |
| 327 | [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers | 384 | [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers |
| 328 | if not w.is_alive] | 385 | if not w.is_alive] |
| @@ -345,7 +402,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 345 | try: | 402 | try: |
| 346 | os.kill(pid, signal) | 403 | os.kill(pid, signal) |
| 347 | except Exception as e: | 404 | except Exception as e: |
| 348 | logger.excpetion( | 405 | logger.exception( |
| 349 | "Encountered exception trying to send {} to worker {}: {}" | 406 | "Encountered exception trying to send {} to worker {}: {}" |
| 350 | .format(signal, pid, str(e))) | 407 | .format(signal, pid, str(e))) |
| 351 | 408 | ||
| @@ -366,8 +423,9 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 366 | self.received_disconnect = True | 423 | self.received_disconnect = True |
| 367 | 424 | ||
| 368 | def sigterm_handler(self, signum, frame): | 425 | def sigterm_handler(self, signum, frame): |
| 369 | logger.info('Shutting down..') | 426 | if not self.received_disconnect: |
| 370 | sendmsg(self.frontend, KBYE) | 427 | logger.info('Shutting down..') |
| 428 | sendmsg(self.frontend, KBYE) | ||
| 371 | 429 | ||
| 372 | self.awaiting_startup_ack = False | 430 | self.awaiting_startup_ack = False |
| 373 | self.received_disconnect = True | 431 | self.received_disconnect = True |
diff --git a/eventmq/sender.py b/eventmq/sender.py
index 063cd74..f0cc09c 100644
--- a/eventmq/sender.py
+++ b/eventmq/sender.py
| @@ -18,6 +18,7 @@ | |||
| 18 | The sender is responsible for sending messages | 18 | The sender is responsible for sending messages |
| 19 | """ | 19 | """ |
| 20 | import logging | 20 | import logging |
| 21 | import sys | ||
| 21 | import uuid | 22 | import uuid |
| 22 | 23 | ||
| 23 | import zmq | 24 | import zmq |
| @@ -136,9 +137,11 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin): | |||
| 136 | if self.zsocket: | 137 | if self.zsocket: |
| 137 | self.zsocket.close() | 138 | self.zsocket.close() |
| 138 | 139 | ||
| 139 | self.name = kwargs.pop('name', str(uuid.uuid4()).encode('ascii')) | ||
| 140 | self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) | 140 | self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) |
| 141 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) | 141 | if sys.version[0] == '2': |
| 142 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) | ||
| 143 | else: | ||
| 144 | self.zsocket.setsockopt_string(zmq.IDENTITY, str(self.name)) | ||
| 142 | 145 | ||
| 143 | self.status = constants.STATUS.ready | 146 | self.status = constants.STATUS.ready |
| 144 | 147 | ||
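`Sender` now sets the socket identity with `setsockopt` on Python 2 (where the name is a byte string) and `setsockopt_string` on Python 3. A standalone pyzmq sketch of the same branch (illustrative, not eventmq code):

```python
from __future__ import print_function

import sys
import uuid

import zmq

context = zmq.Context.instance()
socket = context.socket(zmq.DEALER)
name = str(uuid.uuid4())

if sys.version[0] == '2':
    # On Python 2, str is bytes, so the plain byte-oriented call works.
    socket.setsockopt(zmq.IDENTITY, name)
else:
    # On Python 3, the name is text and must go through setsockopt_string.
    socket.setsockopt_string(zmq.IDENTITY, name)

print('identity:', socket.getsockopt(zmq.IDENTITY))
```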
diff --git a/eventmq/settings.py b/eventmq/settings.py
index ee15742..18ec306 100644
--- a/eventmq/settings.py
+++ b/eventmq/settings.py
| @@ -185,6 +185,13 @@ _CONFIG_DEFS = { | |||
| 185 | "weight from the name with a comma. For example " | 185 | "weight from the name with a comma. For example " |
| 186 | "-Q 10,default 20,high" | 186 | "-Q 10,default 20,high" |
| 187 | }, | 187 | }, |
| 188 | 'MAX_JOB_COUNT': { | ||
| 189 | 'default': 1024, | ||
| 190 | 'long-arg': '--max-job-count', | ||
| 191 | 'type': int, | ||
| 192 | 'help': "Maximum number of jobs each worker process executes " | ||
| 193 | "before resetting", | ||
| 194 | }, | ||
| 188 | 'CONCURRENT_JOBS': { | 195 | 'CONCURRENT_JOBS': { |
| 189 | 'default': 4, | 196 | 'default': 4, |
| 190 | 'long-arg': '--concurrent-jobs', | 197 | 'long-arg': '--concurrent-jobs', |
| @@ -209,7 +216,15 @@ _CONFIG_DEFS = { | |||
| 209 | 'long-arg': '--kill-grace-period', | 216 | 'long-arg': '--kill-grace-period', |
| 210 | 'type': int, | 217 | 'type': int, |
| 211 | 'help': 'Seconds to wait before forefully killing worker ' | 218 | 'help': 'Seconds to wait before forefully killing worker ' |
| 212 | 'processes after receiving a SIGTERM' | 219 | 'processes after receiving a SIGTERM', |
| 220 | }, | ||
| 221 | 'GLOBAL_TIMEOUT': { | ||
| 222 | 'default': 300, | ||
| 223 | 'long-arg': '--global-timeout', | ||
| 224 | 'short-arg': '-t', | ||
| 225 | 'type': int, | ||
| 226 | 'help': "Global default timeout for all jobs unless the request " | ||
| 227 | "specifies otherwise", | ||
| 213 | } | 228 | } |
| 214 | }, | 229 | }, |
| 215 | 'scheduler': { | 230 | 'scheduler': { |
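Two new job-manager settings arrive here: `MAX_JOB_COUNT` (`--max-job-count`, default 1024) and `GLOBAL_TIMEOUT` (`--global-timeout`/`-t`, default 300 seconds). The sketch below shows how the worker.py changes later in this diff consume them; `Conf` is only a stand-in for `eventmq.settings.conf`:

```python
class Conf(object):
    # Stand-in for eventmq.settings.conf with the new defaults.
    GLOBAL_TIMEOUT = 300
    MAX_JOB_COUNT = 1024


conf = Conf()


def effective_timeout(payload):
    # A REQUEST may carry its own timeout; otherwise the global default applies.
    return payload.get("timeout") or conf.GLOBAL_TIMEOUT


def should_recycle(job_count):
    # Worker processes exit after this many jobs and are replaced on the
    # next heartbeat.
    return job_count >= conf.MAX_JOB_COUNT


print(effective_timeout({"timeout": 10}))  # 10
print(effective_timeout({}))               # 300
print(should_recycle(1024))                # True
```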
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index 105f76c..1ace379 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
| @@ -18,7 +18,6 @@ import unittest | |||
| 18 | import mock | 18 | import mock |
| 19 | 19 | ||
| 20 | from .. import constants, jobmanager | 20 | from .. import constants, jobmanager |
| 21 | from ..settings import conf | ||
| 22 | 21 | ||
| 23 | ADDR = 'inproc://pour_the_rice_in_the_thing' | 22 | ADDR = 'inproc://pour_the_rice_in_the_thing' |
| 24 | 23 | ||
| @@ -53,8 +52,7 @@ class TestCase(unittest.TestCase): | |||
| 53 | @mock.patch('eventmq.jobmanager.Sender.recv_multipart') | 52 | @mock.patch('eventmq.jobmanager.Sender.recv_multipart') |
| 54 | @mock.patch('eventmq.jobmanager.Poller.poll') | 53 | @mock.patch('eventmq.jobmanager.Poller.poll') |
| 55 | @mock.patch('eventmq.jobmanager.JobManager.maybe_send_heartbeat') | 54 | @mock.patch('eventmq.jobmanager.JobManager.maybe_send_heartbeat') |
| 56 | @mock.patch('eventmq.jobmanager.JobManager.send_ready') | 55 | def test__start_event_loop(self, maybe_send_hb_mock, |
| 57 | def test__start_event_loop(self, send_ready_mock, maybe_send_hb_mock, | ||
| 58 | poll_mock, sender_mock, process_msg_mock, | 56 | poll_mock, sender_mock, process_msg_mock, |
| 59 | pool_close_mock): | 57 | pool_close_mock): |
| 60 | jm = jobmanager.JobManager(override_settings={ | 58 | jm = jobmanager.JobManager(override_settings={ |
| @@ -66,9 +64,6 @@ class TestCase(unittest.TestCase): | |||
| 66 | 64 | ||
| 67 | jm._start_event_loop() | 65 | jm._start_event_loop() |
| 68 | 66 | ||
| 69 | # send int(conf.CONCURRENT_JOBS) ready messages | ||
| 70 | self.assertEqual(conf.CONCURRENT_JOBS, send_ready_mock.call_count) | ||
| 71 | |||
| 72 | process_msg_mock.assert_called_with( | 67 | process_msg_mock.assert_called_with( |
| 73 | sender_mock.return_value) | 68 | sender_mock.return_value) |
| 74 | 69 | ||
diff --git a/eventmq/tests/test_sender.py b/eventmq/tests/test_sender.py
index b2570a3..f292696 100644
--- a/eventmq/tests/test_sender.py
+++ b/eventmq/tests/test_sender.py
| @@ -35,7 +35,7 @@ class TestCase(unittest.TestCase): | |||
| 35 | 35 | ||
| 36 | self.assert_(socket.poll() != 0) | 36 | self.assert_(socket.poll() != 0) |
| 37 | msg = socket.recv_multipart() | 37 | msg = socket.recv_multipart() |
| 38 | self.assertEqual(msg[0], self.sender.name) | 38 | self.assertEqual(msg[0].decode('ascii'), self.sender.name) |
| 39 | self.assertEqual(msg[1], b'') | 39 | self.assertEqual(msg[1], b'') |
| 40 | self.assertEqual(msg[2], b'1') | 40 | self.assertEqual(msg[2], b'1') |
| 41 | self.assertEqual(msg[3], b'Hello!') | 41 | self.assertEqual(msg[3], b'Hello!') |
diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py
index 115aa52..66808fd 100644
--- a/eventmq/tests/test_worker.py
+++ b/eventmq/tests/test_worker.py
| @@ -39,7 +39,7 @@ def test_run_with_timeout(): | |||
| 39 | 'args': [2] | 39 | 'args': [2] |
| 40 | } | 40 | } |
| 41 | 41 | ||
| 42 | msgid = worker._run(payload, logging.getLogger()) | 42 | msgid = worker._run_job(payload, logging.getLogger()) |
| 43 | 43 | ||
| 44 | assert msgid | 44 | assert msgid |
| 45 | 45 | ||
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 04a35eb..14d4dfe 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
| @@ -26,7 +26,7 @@ from multiprocessing import Process | |||
| 26 | import os | 26 | import os |
| 27 | import sys | 27 | import sys |
| 28 | 28 | ||
| 29 | from threading import Event, Thread | 29 | from threading import Thread |
| 30 | 30 | ||
| 31 | from .settings import conf | 31 | from .settings import conf |
| 32 | 32 | ||
| @@ -36,33 +36,6 @@ else: | |||
| 36 | import queue as Queue | 36 | import queue as Queue |
| 37 | 37 | ||
| 38 | 38 | ||
| 39 | class StoppableThread(Thread): | ||
| 40 | """Thread class with a stop() method. The thread itself has to check | ||
| 41 | regularly for the stopped() condition.""" | ||
| 42 | |||
| 43 | def __init__(self, target, name=None, args=()): | ||
| 44 | super(StoppableThread, self).__init__(name=name, target=target, | ||
| 45 | args=args) | ||
| 46 | self._return = None | ||
| 47 | self._stop = Event() | ||
| 48 | |||
| 49 | def stop(self): | ||
| 50 | self._stop.set() | ||
| 51 | |||
| 52 | def stopped(self): | ||
| 53 | return self._stop.isSet() | ||
| 54 | |||
| 55 | def run(self): | ||
| 56 | """ | ||
| 57 | Overrides default run to have a side effect of saving the result | ||
| 58 | in self._return that will be accessible when the job completes, | ||
| 59 | or remain None after a timeout | ||
| 60 | """ | ||
| 61 | if self._Thread__target is not None: | ||
| 62 | self._return = self._Thread__target(*self._Thread__args, | ||
| 63 | **self._Thread__kwargs) | ||
| 64 | |||
| 65 | |||
| 66 | class MultiprocessWorker(Process): | 39 | class MultiprocessWorker(Process): |
| 67 | """ | 40 | """ |
| 68 | Defines a worker that spans the job in a multiprocessing task | 41 | Defines a worker that spans the job in a multiprocessing task |
| @@ -78,10 +51,7 @@ class MultiprocessWorker(Process): | |||
| 78 | 51 | ||
| 79 | @property | 52 | @property |
| 80 | def logger(self): | 53 | def logger(self): |
| 81 | if not hasattr(self, '_logger'): | 54 | return logging.getLogger(__name__ + '.' + str(os.getpid())) |
| 82 | self._logger = logging.getLogger(__name__ + '.' + str(os.getpid())) | ||
| 83 | |||
| 84 | return self._logger | ||
| 85 | 55 | ||
| 86 | def run(self): | 56 | def run(self): |
| 87 | """ | 57 | """ |
| @@ -89,31 +59,39 @@ class MultiprocessWorker(Process): | |||
| 89 | 59 | ||
| 90 | This is designed to run in a seperate process. | 60 | This is designed to run in a seperate process. |
| 91 | """ | 61 | """ |
| 92 | if self.run_setup: | 62 | # Define the 2 queues for communicating with the worker thread |
| 93 | self.run_setup = False | 63 | logger = self.logger |
| 94 | if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): | 64 | |
| 95 | try: | 65 | worker_queue = Queue.Queue(1) |
| 96 | self.logger.debug("Running setup ({}.{}) for worker id {}" | 66 | worker_result_queue = Queue.Queue(1) |
| 97 | .format( | 67 | worker_thread = Thread(target=_run, |
| 98 | conf.SETUP_PATH, | 68 | args=(worker_queue, |
| 99 | conf.SETUP_CALLABLE, | 69 | worker_result_queue, |
| 100 | os.getpid())) | 70 | logger)) |
| 101 | run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE) | ||
| 102 | except Exception as e: | ||
| 103 | self.logger.warning('Unable to do setup task ({}.{}): {}' | ||
| 104 | .format(conf.SETUP_PATH, | ||
| 105 | conf.SETUP_CALLABLE, str(e))) | ||
| 106 | 71 | ||
| 107 | import zmq | 72 | import zmq |
| 108 | zmq.Context.instance().term() | 73 | zmq.Context.instance().term() |
| 109 | 74 | ||
| 75 | self.output_queue.put( | ||
| 76 | {'msgid': None, | ||
| 77 | 'return': None, | ||
| 78 | 'death': False, | ||
| 79 | 'pid': os.getpid(), | ||
| 80 | 'callback': 'worker_ready'} | ||
| 81 | ) | ||
| 82 | |||
| 83 | callback = 'premature_death' | ||
| 84 | |||
| 85 | worker_thread.start() | ||
| 86 | |||
| 110 | # Main execution loop, only break in cases that we can't recover from | 87 | # Main execution loop, only break in cases that we can't recover from |
| 111 | # or we reach job count limit | 88 | # or we reach job count limit |
| 112 | while True: | 89 | while True: |
| 113 | try: | 90 | try: |
| 114 | payload = self.input_queue.get(block=False, timeout=1000) | 91 | payload = self.input_queue.get(timeout=1) |
| 115 | if payload == 'DONE': | 92 | if payload == 'DONE': |
| 116 | break | 93 | break |
| 94 | |||
| 117 | except Queue.Empty: | 95 | except Queue.Empty: |
| 118 | if os.getppid() != self.ppid: | 96 | if os.getppid() != self.ppid: |
| 119 | break | 97 | break |
| @@ -121,53 +99,71 @@ class MultiprocessWorker(Process): | |||
| 121 | except Exception as e: | 99 | except Exception as e: |
| 122 | break | 100 | break |
| 123 | finally: | 101 | finally: |
| 102 | # If I'm an orphan, die | ||
| 124 | if os.getppid() != self.ppid: | 103 | if os.getppid() != self.ppid: |
| 125 | break | 104 | break |
| 126 | 105 | ||
| 127 | try: | 106 | try: |
| 128 | return_val = 'None' | 107 | return_val = 'None' |
| 129 | self.job_count += 1 | 108 | self.job_count += 1 |
| 130 | timeout = payload.get("timeout", None) | 109 | timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT |
| 131 | msgid = payload.get('msgid', '') | 110 | msgid = payload.get('msgid', '') |
| 132 | callback = payload.get('callback', '') | 111 | callback = payload.get('callback', '') |
| 112 | logger.debug("Putting on thread queue msgid: {}".format( | ||
| 113 | msgid)) | ||
| 114 | |||
| 115 | worker_queue.put(payload['params']) | ||
| 133 | 116 | ||
| 134 | worker_thread = StoppableThread(target=_run, | 117 | try: |
| 135 | args=(payload['params'], | 118 | return_val = worker_result_queue.get(timeout=timeout) |
| 136 | self.logger)) | ||
| 137 | worker_thread.start() | ||
| 138 | worker_thread.join(timeout) | ||
| 139 | return_val = {"value": worker_thread._return} | ||
| 140 | 119 | ||
| 141 | if worker_thread.isAlive(): | 120 | logger.debug("Got from result queue msgid: {}".format( |
| 142 | worker_thread.stop() | 121 | msgid)) |
| 122 | except Queue.Empty: | ||
| 143 | return_val = 'TimeoutError' | 123 | return_val = 'TimeoutError' |
| 144 | 124 | ||
| 125 | return_val = {"value": return_val} | ||
| 126 | |||
| 145 | try: | 127 | try: |
| 146 | self.output_queue.put_nowait( | 128 | self.output_queue.put_nowait( |
| 147 | {'msgid': msgid, | 129 | {'msgid': msgid, |
| 148 | 'return': return_val, | 130 | 'return': return_val, |
| 131 | 'death': self.job_count >= conf.MAX_JOB_COUNT or | ||
| 132 | return_val == 'TimeoutError', | ||
| 149 | 'pid': os.getpid(), | 133 | 'pid': os.getpid(), |
| 150 | 'callback': callback} | 134 | 'callback': callback} |
| 151 | ) | 135 | ) |
| 152 | except Exception: | 136 | except Exception: |
| 153 | break | 137 | break |
| 154 | 138 | ||
| 139 | if return_val["value"] == 'TimeoutError': | ||
| 140 | break | ||
| 141 | |||
| 155 | except Exception as e: | 142 | except Exception as e: |
| 156 | return_val = str(e) | 143 | return_val = str(e) |
| 157 | 144 | ||
| 158 | if self.job_count >= conf.MAX_JOB_COUNT: | 145 | if self.job_count >= conf.MAX_JOB_COUNT: |
| 146 | logger.debug("Worker reached job limit, exiting") | ||
| 159 | break | 147 | break |
| 160 | 148 | ||
| 149 | worker_queue.put('DONE') | ||
| 150 | worker_thread.join(timeout=5) | ||
| 151 | |||
| 161 | self.output_queue.put( | 152 | self.output_queue.put( |
| 162 | {'msgid': None, | 153 | {'msgid': None, |
| 163 | 'return': 'DEATH', | 154 | 'return': 'DEATH', |
| 155 | 'death': True, | ||
| 164 | 'pid': os.getpid(), | 156 | 'pid': os.getpid(), |
| 165 | 'callback': 'worker_death'} | 157 | 'callback': 'worker_death'} |
| 166 | ) | 158 | ) |
| 167 | self.logger.debug("Worker death") | 159 | |
| 160 | logger.debug("Worker death") | ||
| 161 | |||
| 162 | if worker_thread.is_alive(): | ||
| 163 | logger.debug("Worker thread did not die gracefully") | ||
| 168 | 164 | ||
| 169 | 165 | ||
| 170 | def _run(payload, logger): | 166 | def _run(queue, result_queue, logger): |
| 171 | """ | 167 | """ |
| 172 | Takes care of actually executing the code given a message payload | 168 | Takes care of actually executing the code given a message payload |
| 173 | 169 | ||
| @@ -181,6 +177,39 @@ def _run(payload, logger): | |||
| 181 | "class_kwargs": {"value": 2} | 177 | "class_kwargs": {"value": 2} |
| 182 | } | 178 | } |
| 183 | """ | 179 | """ |
| 180 | if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): | ||
| 181 | try: | ||
| 182 | logger.debug("Running setup ({}.{}) for worker id {}" | ||
| 183 | .format( | ||
| 184 | conf.SETUP_PATH, | ||
| 185 | conf.SETUP_CALLABLE, | ||
| 186 | os.getpid())) | ||
| 187 | run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE) | ||
| 188 | except Exception as e: | ||
| 189 | logger.warning('Unable to do setup task ({}.{}): {}' | ||
| 190 | .format(conf.SETUP_PATH, | ||
| 191 | conf.SETUP_CALLABLE, str(e))) | ||
| 192 | |||
| 193 | while True: | ||
| 194 | # Blocking get so we don't spin cycles reading over and over | ||
| 195 | try: | ||
| 196 | payload = queue.get() | ||
| 197 | except Exception as e: | ||
| 198 | logger.exception(e) | ||
| 199 | continue | ||
| 200 | |||
| 201 | if payload == 'DONE': | ||
| 202 | break | ||
| 203 | |||
| 204 | return_val = _run_job(payload, logger) | ||
| 205 | # Signal that we're done with this job and put its return value on the | ||
| 206 | # result queue | ||
| 207 | result_queue.put(return_val) | ||
| 208 | |||
| 209 | logger.debug("Worker thread death") | ||
| 210 | |||
| 211 | |||
| 212 | def _run_job(payload, logger): | ||
| 184 | try: | 213 | try: |
| 185 | if ":" in payload["path"]: | 214 | if ":" in payload["path"]: |
| 186 | _pkgsplit = payload["path"].split(':') | 215 | _pkgsplit = payload["path"].split(':') |
| @@ -224,9 +253,8 @@ def _run(payload, logger): | |||
| 224 | return_val = callable_(*args, **kwargs) | 253 | return_val = callable_(*args, **kwargs) |
| 225 | except Exception as e: | 254 | except Exception as e: |
| 226 | logger.exception(e) | 255 | logger.exception(e) |
| 227 | return str(e) | 256 | return_val = str(e) |
| 228 | 257 | ||
| 229 | # Signal that we're done with this job | ||
| 230 | return return_val | 258 | return return_val |
| 231 | 259 | ||
| 232 | 260 | ||
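The `StoppableThread`-per-job approach is gone: each worker process now starts one long-lived thread (`_run`) and talks to it through a pair of single-slot queues, enforcing the per-job timeout by waiting on the result queue and marking itself for death after a timeout or `MAX_JOB_COUNT` jobs. A minimal standalone sketch of that two-queue pattern (illustrative only, not eventmq code):

```python
import queue  # `Queue` on Python 2
import threading
import time


def worker_thread_main(in_q, out_q):
    # Long-lived job thread: block on the input queue, run each job,
    # push its result, and exit on the 'DONE' sentinel (as _run() does).
    while True:
        params = in_q.get()
        if params == 'DONE':
            break
        fn, args = params
        out_q.put(fn(*args))


in_q, out_q = queue.Queue(1), queue.Queue(1)
thread = threading.Thread(target=worker_thread_main, args=(in_q, out_q))
thread.start()

# Dispatch one job and enforce its timeout from the parent side.
in_q.put((time.sleep, (2,)))
try:
    result = out_q.get(timeout=1)
except queue.Empty:
    # The real worker reports death=True after a timeout so the manager
    # replaces the process.
    result = 'TimeoutError'

in_q.put('DONE')
thread.join(timeout=5)
print(result)
```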
diff --git a/setup.py b/setup.py
| @@ -7,7 +7,7 @@ from setuptools import find_packages, setup | |||
| 7 | 7 | ||
| 8 | setup( | 8 | setup( |
| 9 | name='eventmq', | 9 | name='eventmq', |
| 10 | version='0.3.4', | 10 | version='0.3.4.4', |
| 11 | description='EventMQ job execution and messaging system based on ZeroMQ', | 11 | description='EventMQ job execution and messaging system based on ZeroMQ', |
| 12 | packages=find_packages(), | 12 | packages=find_packages(), |
| 13 | install_requires=['pyzmq==16.0.2', | 13 | install_requires=['pyzmq==16.0.2', |