author     David Hurst                2017-04-20 22:07:13 -0600
committer  GitHub                     2017-04-20 22:07:13 -0600
commit     e61a13dd9e340537fc2146c0d7588f78448c000e (patch)
tree       1666d4184c63e0a9cbed8ca4633202d2490e175b
parent     7ad9644b64e1cbd2bd0013903fb6d145c6209d58 (diff)
parent     422ddad2321ef63aed5665702d0c419e4acde545 (diff)
download   eventmq-e61a13dd9e340537fc2146c0d7588f78448c000e.tar.gz
           eventmq-e61a13dd9e340537fc2146c0d7588f78448c000e.zip
Merge pull request #31 from sideshowdave7/0.4
Port master changes to 0.4
-rw-r--r--  eventmq/__init__.py                   2
-rw-r--r--  eventmq/jobmanager.py               102
-rw-r--r--  eventmq/sender.py                     7
-rw-r--r--  eventmq/settings.py                  17
-rw-r--r--  eventmq/tests/test_jobmanager.py      7
-rw-r--r--  eventmq/tests/test_sender.py          2
-rw-r--r--  eventmq/tests/test_worker.py          2
-rw-r--r--  eventmq/worker.py                   148
-rw-r--r--  setup.py                              2
9 files changed, 194 insertions, 95 deletions
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index 43c0365..492f1b9 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
 __author__ = 'EventMQ Contributors'
-__version__ = '0.3.4'
+__version__ = '0.3.4.4'
 
 PROTOCOL_VERSION = 'eMQP/1.0'
 
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index d86c738..c490e68 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -84,12 +84,13 @@ class JobManager(HeartbeatMixin, EMQPService):
         self.name = conf.NAME or generate_device_name()
         logger.info('Initializing JobManager {}...'.format(self.name))
 
-        if skip_signal:
+        if not skip_signal:
             # handle any sighups by reloading config
             signal.signal(signal.SIGHUP, self.sighup_handler)
             signal.signal(signal.SIGTERM, self.sigterm_handler)
             signal.signal(signal.SIGINT, self.sigterm_handler)
             signal.signal(signal.SIGQUIT, self.sigterm_handler)
+            signal.signal(signal.SIGUSR1, self.handle_pdb)
 
         #: JobManager starts out by INFORMing the router of it's existence,
         #: then telling the router that it is READY. The reply will be the unit
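
The new SIGUSR1 handler registered above gives a running JobManager an on-demand debugger hook. A minimal illustration of how it could be triggered from another process is sketched below; the pid is a hypothetical stand-in for a live JobManager process.

    # Hypothetical: drop a running JobManager into pdb via the SIGUSR1 hook.
    import os
    import signal

    jobmanager_pid = 12345  # assumed pid of a running JobManager process
    os.kill(jobmanager_pid, signal.SIGUSR1)  # handle_pdb() then calls pdb.Pdb().set_trace(frame)
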
@@ -99,11 +100,29 @@ class JobManager(HeartbeatMixin, EMQPService):
 
         self.poller = Poller()
 
+        #: Stats and monitoring information
+
+        #: Jobs in flight tracks all jobs currently executing.
+        #: Key: msgid, Value: The message with all the details of the job
+        self.jobs_in_flight = {}
+
+        #: Running total number of REQUEST messages received on the broker
+        self.total_requests = 0
+        #: Running total number of READY messages sent to the broker
+        self.total_ready_sent = 0
+        #: Keep track of what pids are servicing our requests
+        #: Key: pid, Value: # of jobs completed on the process with that pid
+        self.pid_distribution = {}
+
         #: Setup worker queues
         self.request_queue = mp_queue()
         self.finished_queue = mp_queue()
         self._setup()
 
+    def handle_pdb(self, sig, frame):
+        import pdb
+        pdb.Pdb().set_trace(frame)
+
     @property
     def workers(self):
         if not hasattr(self, '_workers'):
@@ -123,9 +142,6 @@ class JobManager(HeartbeatMixin, EMQPService):
         # Acknowledgment has come
         # Send a READY for each available worker
 
-        for i in range(0, len(self.workers)):
-            self.send_ready()
-
         self.status = STATUS.running
 
         try:
@@ -163,7 +179,7 @@ class JobManager(HeartbeatMixin, EMQPService):
                 sys.exit(0)
             else:
                 try:
-                    events = self.poller.poll(10)
+                    events = self.poller.poll(1000)
                 except zmq.ZMQError:
                     logger.debug('Disconnecting due to ZMQError while'
                                  ' polling')
@@ -214,13 +230,21 @@ class JobManager(HeartbeatMixin, EMQPService):
 
         logger.debug(resp)
         pid = resp['pid']
+        msgid = resp['msgid']
+        callback = resp['callback']
+        death = resp['death']
 
-        callback = getattr(self, resp['callback'])
-        callback(resp['return'], resp['msgid'])
+        callback = getattr(self, callback)
+        callback(resp['return'], msgid, death, pid)
 
-        if resp['return'] == 'DEATH':
-            if pid in self._workers.keys():
-                del self._workers[pid]
+        if msgid in self.jobs_in_flight:
+            del self.jobs_in_flight[msgid]
+
+        if not death:
+            if pid not in self.pid_distribution:
+                self.pid_distribution[pid] = 1
+            else:
+                self.pid_distribution[pid] += 1
 
     def on_request(self, msgid, msg):
         """
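
For context, the dispatch above assumes each entry on the finished queue is a dict carrying the worker's pid, the originating msgid, a death flag, and the name of a JobManager callback; the field names come from the worker.py changes later in this diff, while the concrete values and the stand-in class below are illustrative only.

    class FakeManager(object):
        """Stand-in for JobManager, just to show the getattr dispatch."""
        def worker_done_with_reply(self, reply, msgid, death, pid):
            print(reply, msgid, death, pid)

    resp = {
        'msgid': 'abc123',                 # hypothetical message id
        'return': {'value': 42},
        'death': False,
        'pid': 31337,
        'callback': 'worker_done_with_reply',
    }
    jm = FakeManager()
    # The callback is resolved by name and now receives four arguments:
    getattr(jm, resp['callback'])(resp['return'], resp['msgid'],
                                  resp['death'], resp['pid'])
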
@@ -258,6 +282,9 @@ class JobManager(HeartbeatMixin, EMQPService):
         # queue_name = msg[0]
 
         # Parse REQUEST message values
+
+        self.total_requests += 1
+
         headers = msg[1]
         payload = deserializer(msg[2])
         params = payload[1]
@@ -275,27 +302,54 @@ class JobManager(HeartbeatMixin, EMQPService):
         payload['msgid'] = msgid
         payload['callback'] = callback
 
+        self.jobs_in_flight[msgid] = (monotonic(), payload)
+
         self.request_queue.put(payload)
 
     def premature_death(self, reply, msgid):
+        """
+        Worker died before running any jobs
+        """
         return
 
-    def worker_death(self, reply, msgid):
-        return
+    def worker_death(self, reply, msgid, death, pid):
+        """
+        Worker died of natural causes, ensure its death and
+        remove from tracking, will be replaced on next heartbeat
+        """
+        if pid in self._workers.keys():
+            del self._workers[pid]
 
-    def worker_done_with_reply(self, reply, msgid):
-        reply = serializer(reply)
-        self.send_reply(reply, msgid)
+    def worker_ready(self, reply, msgid, death, pid):
         self.send_ready()
 
-    def worker_done(self, msgid):
-        self.send_ready()
+    def worker_done_with_reply(self, reply, msgid, death, pid):
+        """
+        Worker finished a job and requested the return value
+        """
+        try:
+            reply = serializer(reply)
+        except TypeError as e:
+            reply = serializer({"value": str(e)})
+
+        self.send_reply(reply, msgid)
+
+        if self.status != STATUS.stopping and not death:
+            self.send_ready()
+
+    def worker_done(self, reply, msgid, death, pid):
+        """
+        Worker finished a job, notify broker of an additional slot opening
+        """
+        if self.status != STATUS.stopping and not death:
+            self.send_ready()
 
     def send_ready(self):
         """
         send the READY command upstream to indicate that JobManager is ready
         for another REQUEST message.
         """
+        self.total_ready_sent += 1
         sendmsg(self.frontend, 'READY')
 
     def send_reply(self, reply, msgid):
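
The try/except added around serializer() above guards against return values that cannot be serialized. A minimal sketch of that fallback, assuming a json.dumps-style serializer rather than whatever serializer eventmq actually wires in:

    import json

    def serialize_reply(reply, serializer=json.dumps):
        try:
            return serializer(reply)
        except TypeError as e:
            # Non-serializable return value: reply with the error text instead.
            return serializer({"value": str(e)})

    print(serialize_reply({"value": 42}))        # normal case
    print(serialize_reply({"value": object()}))  # falls back to the TypeError text
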
@@ -323,6 +377,9 @@ class JobManager(HeartbeatMixin, EMQPService):
         necessary
         """
         # Kill workers that aren't alive
+        logger.debug("Jobs in flight: {}".format(len(self.jobs_in_flight)))
+        logger.debug("Total requests: {}".format(self.total_requests))
+        logger.debug("Total ready sent: {}".format(self.total_ready_sent))
         try:
             [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers
              if not w.is_alive]
@@ -345,7 +402,7 @@ class JobManager(HeartbeatMixin, EMQPService):
         try:
             os.kill(pid, signal)
         except Exception as e:
-            logger.excpetion(
+            logger.exception(
                 "Encountered exception trying to send {} to worker {}: {}"
                 .format(signal, pid, str(e)))
 
@@ -366,8 +423,9 @@ class JobManager(HeartbeatMixin, EMQPService):
         self.received_disconnect = True
 
     def sigterm_handler(self, signum, frame):
-        logger.info('Shutting down..')
-        sendmsg(self.frontend, KBYE)
+        if not self.received_disconnect:
+            logger.info('Shutting down..')
+            sendmsg(self.frontend, KBYE)
 
         self.awaiting_startup_ack = False
         self.received_disconnect = True
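
The guarded sigterm_handler above makes shutdown idempotent: once a disconnect has been observed, a further SIGTERM no longer re-sends KBYE. A simplified, self-contained sketch of that behaviour (the class below is a stand-in, not the real JobManager):

    class ShutdownSketch(object):
        def __init__(self):
            self.received_disconnect = False
            self.sent = []

        def sigterm_handler(self, signum, frame):
            if not self.received_disconnect:
                self.sent.append('KBYE')   # stands in for sendmsg(self.frontend, KBYE)
            self.received_disconnect = True

    s = ShutdownSketch()
    s.sigterm_handler(15, None)
    s.sigterm_handler(15, None)
    assert s.sent == ['KBYE']  # KBYE goes out exactly once
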
diff --git a/eventmq/sender.py b/eventmq/sender.py
index 063cd74..f0cc09c 100644
--- a/eventmq/sender.py
+++ b/eventmq/sender.py
@@ -18,6 +18,7 @@
 The sender is responsible for sending messages
 """
 import logging
+import sys
 import uuid
 
 import zmq
@@ -136,9 +137,11 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin):
         if self.zsocket:
             self.zsocket.close()
 
-        self.name = kwargs.pop('name', str(uuid.uuid4()).encode('ascii'))
         self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER))
-        self.zsocket.setsockopt(zmq.IDENTITY, self.name)
+        if sys.version[0] == '2':
+            self.zsocket.setsockopt(zmq.IDENTITY, self.name)
+        else:
+            self.zsocket.setsockopt_string(zmq.IDENTITY, str(self.name))
 
         self.status = constants.STATUS.ready
 
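
The version branch above exists because zmq.IDENTITY must ultimately be set as bytes; on Python 3 a str name goes through setsockopt_string(), which encodes it. A hedged sketch of the same idea (the patch itself checks sys.version[0], the sketch uses sys.version_info, and the socket name is made up):

    import sys
    import zmq

    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.DEALER)
    name = 'sender-1'  # hypothetical identity

    if sys.version_info[0] == 2:
        sock.setsockopt(zmq.IDENTITY, name)         # str is already bytes on Python 2
    else:
        sock.setsockopt_string(zmq.IDENTITY, name)  # encodes the str to bytes
    sock.close()
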
diff --git a/eventmq/settings.py b/eventmq/settings.py
index ee15742..18ec306 100644
--- a/eventmq/settings.py
+++ b/eventmq/settings.py
@@ -185,6 +185,13 @@ _CONFIG_DEFS = {
                     "weight from the name with a comma. For example "
                     "-Q 10,default 20,high"
         },
+        'MAX_JOB_COUNT': {
+            'default': 1024,
+            'long-arg': '--max-job-count',
+            'type': int,
+            'help': "Maximum number of jobs each worker process executes "
+                    "before resetting",
+        },
         'CONCURRENT_JOBS': {
             'default': 4,
             'long-arg': '--concurrent-jobs',
@@ -209,7 +216,15 @@ _CONFIG_DEFS = {
             'long-arg': '--kill-grace-period',
             'type': int,
             'help': 'Seconds to wait before forefully killing worker '
-                    'processes after receiving a SIGTERM'
+                    'processes after receiving a SIGTERM',
+        },
+        'GLOBAL_TIMEOUT': {
+            'default': 300,
+            'long-arg': '--global-timeout',
+            'short-arg': '-t',
+            'type': int,
+            'help': "Global default timeout for all jobs unless the request "
+                    "specifies otherwise",
         }
     },
     'scheduler': {
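
The new GLOBAL_TIMEOUT setting acts as a fallback when a request carries no timeout of its own; the worker.py change later in this diff reads it as payload.get("timeout") or conf.GLOBAL_TIMEOUT. A tiny illustration of that precedence (the payloads are hypothetical):

    GLOBAL_TIMEOUT = 300  # seconds, the default added above

    def effective_timeout(payload):
        # Per-request timeout wins; otherwise fall back to the global default.
        return payload.get('timeout') or GLOBAL_TIMEOUT

    print(effective_timeout({'msgid': 'a', 'timeout': 10}))    # -> 10
    print(effective_timeout({'msgid': 'b'}))                   # -> 300
    print(effective_timeout({'msgid': 'c', 'timeout': None}))  # -> 300
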
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index 105f76c..1ace379 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -18,7 +18,6 @@ import unittest
 import mock
 
 from .. import constants, jobmanager
-from ..settings import conf
 
 ADDR = 'inproc://pour_the_rice_in_the_thing'
 
@@ -53,8 +52,7 @@ class TestCase(unittest.TestCase):
     @mock.patch('eventmq.jobmanager.Sender.recv_multipart')
     @mock.patch('eventmq.jobmanager.Poller.poll')
     @mock.patch('eventmq.jobmanager.JobManager.maybe_send_heartbeat')
-    @mock.patch('eventmq.jobmanager.JobManager.send_ready')
-    def test__start_event_loop(self, send_ready_mock, maybe_send_hb_mock,
+    def test__start_event_loop(self, maybe_send_hb_mock,
                                poll_mock, sender_mock, process_msg_mock,
                                pool_close_mock):
         jm = jobmanager.JobManager(override_settings={
@@ -66,9 +64,6 @@ class TestCase(unittest.TestCase):
 
         jm._start_event_loop()
 
-        # send int(conf.CONCURRENT_JOBS) ready messages
-        self.assertEqual(conf.CONCURRENT_JOBS, send_ready_mock.call_count)
-
         process_msg_mock.assert_called_with(
             sender_mock.return_value)
 
diff --git a/eventmq/tests/test_sender.py b/eventmq/tests/test_sender.py
index b2570a3..f292696 100644
--- a/eventmq/tests/test_sender.py
+++ b/eventmq/tests/test_sender.py
@@ -35,7 +35,7 @@ class TestCase(unittest.TestCase):
 
         self.assert_(socket.poll() != 0)
         msg = socket.recv_multipart()
-        self.assertEqual(msg[0], self.sender.name)
+        self.assertEqual(msg[0].decode('ascii'), self.sender.name)
         self.assertEqual(msg[1], b'')
         self.assertEqual(msg[2], b'1')
         self.assertEqual(msg[3], b'Hello!')
diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py
index 115aa52..66808fd 100644
--- a/eventmq/tests/test_worker.py
+++ b/eventmq/tests/test_worker.py
@@ -39,7 +39,7 @@ def test_run_with_timeout():
         'args': [2]
     }
 
-    msgid = worker._run(payload, logging.getLogger())
+    msgid = worker._run_job(payload, logging.getLogger())
 
     assert msgid
 
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 04a35eb..14d4dfe 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -26,7 +26,7 @@ from multiprocessing import Process
 import os
 import sys
 
-from threading import Event, Thread
+from threading import Thread
 
 from .settings import conf
 
@@ -36,33 +36,6 @@ else:
     import queue as Queue
 
 
-class StoppableThread(Thread):
-    """Thread class with a stop() method. The thread itself has to check
-    regularly for the stopped() condition."""
-
-    def __init__(self, target, name=None, args=()):
-        super(StoppableThread, self).__init__(name=name, target=target,
-                                              args=args)
-        self._return = None
-        self._stop = Event()
-
-    def stop(self):
-        self._stop.set()
-
-    def stopped(self):
-        return self._stop.isSet()
-
-    def run(self):
-        """
-        Overrides default run to have a side effect of saving the result
-        in self._return that will be accessible when the job completes,
-        or remain None after a timeout
-        """
-        if self._Thread__target is not None:
-            self._return = self._Thread__target(*self._Thread__args,
-                                                **self._Thread__kwargs)
-
-
 class MultiprocessWorker(Process):
     """
     Defines a worker that spans the job in a multiprocessing task
@@ -78,10 +51,7 @@ class MultiprocessWorker(Process):
 
     @property
     def logger(self):
-        if not hasattr(self, '_logger'):
-            self._logger = logging.getLogger(__name__ + '.' + str(os.getpid()))
-
-        return self._logger
+        return logging.getLogger(__name__ + '.' + str(os.getpid()))
 
     def run(self):
         """
@@ -89,31 +59,39 @@ class MultiprocessWorker(Process):
 
         This is designed to run in a seperate process.
         """
-        if self.run_setup:
-            self.run_setup = False
-            if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH):
-                try:
-                    self.logger.debug("Running setup ({}.{}) for worker id {}"
-                                      .format(
-                                          conf.SETUP_PATH,
-                                          conf.SETUP_CALLABLE,
-                                          os.getpid()))
-                    run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE)
-                except Exception as e:
-                    self.logger.warning('Unable to do setup task ({}.{}): {}'
-                                        .format(conf.SETUP_PATH,
-                                                conf.SETUP_CALLABLE, str(e)))
+        # Define the 2 queues for communicating with the worker thread
+        logger = self.logger
+
+        worker_queue = Queue.Queue(1)
+        worker_result_queue = Queue.Queue(1)
+        worker_thread = Thread(target=_run,
+                               args=(worker_queue,
+                                     worker_result_queue,
+                                     logger))
 
         import zmq
         zmq.Context.instance().term()
 
+        self.output_queue.put(
+            {'msgid': None,
+             'return': None,
+             'death': False,
+             'pid': os.getpid(),
+             'callback': 'worker_ready'}
+        )
+
+        callback = 'premature_death'
+
+        worker_thread.start()
+
         # Main execution loop, only break in cases that we can't recover from
         # or we reach job count limit
         while True:
             try:
-                payload = self.input_queue.get(block=False, timeout=1000)
+                payload = self.input_queue.get(timeout=1)
                 if payload == 'DONE':
                     break
+
             except Queue.Empty:
                 if os.getppid() != self.ppid:
                     break
@@ -121,53 +99,71 @@ class MultiprocessWorker(Process):
             except Exception as e:
                 break
             finally:
+                # If I'm an orphan, die
                 if os.getppid() != self.ppid:
                     break
 
             try:
                 return_val = 'None'
                 self.job_count += 1
-                timeout = payload.get("timeout", None)
+                timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT
                 msgid = payload.get('msgid', '')
                 callback = payload.get('callback', '')
+                logger.debug("Putting on thread queue msgid: {}".format(
+                    msgid))
+
+                worker_queue.put(payload['params'])
 
-                worker_thread = StoppableThread(target=_run,
-                                                args=(payload['params'],
-                                                      self.logger))
-                worker_thread.start()
-                worker_thread.join(timeout)
-                return_val = {"value": worker_thread._return}
+                try:
+                    return_val = worker_result_queue.get(timeout=timeout)
 
-                if worker_thread.isAlive():
-                    worker_thread.stop()
+                    logger.debug("Got from result queue msgid: {}".format(
+                        msgid))
+                except Queue.Empty:
                     return_val = 'TimeoutError'
 
+                return_val = {"value": return_val}
+
                 try:
                     self.output_queue.put_nowait(
                         {'msgid': msgid,
                          'return': return_val,
+                         'death': self.job_count >= conf.MAX_JOB_COUNT or
+                         return_val == 'TimeoutError',
                          'pid': os.getpid(),
                          'callback': callback}
                     )
                 except Exception:
                     break
 
+                if return_val["value"] == 'TimeoutError':
+                    break
+
             except Exception as e:
                 return_val = str(e)
 
             if self.job_count >= conf.MAX_JOB_COUNT:
+                logger.debug("Worker reached job limit, exiting")
                 break
 
+        worker_queue.put('DONE')
+        worker_thread.join(timeout=5)
+
         self.output_queue.put(
             {'msgid': None,
              'return': 'DEATH',
+             'death': True,
              'pid': os.getpid(),
              'callback': 'worker_death'}
         )
-        self.logger.debug("Worker death")
+
+        logger.debug("Worker death")
+
+        if worker_thread.is_alive():
+            logger.debug("Worker thread did not die gracefully")
 
 
-def _run(payload, logger):
+def _run(queue, result_queue, logger):
     """
     Takes care of actually executing the code given a message payload
 
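
The loop above replaces the old StoppableThread-per-job approach with one long-lived worker thread and a pair of size-1 queues; a timeout now surfaces as Queue.Empty on the result queue instead of a join()+stop(). A self-contained sketch of that pattern, independent of eventmq's own classes and using Python 3 module names:

    import queue
    import threading
    import time

    def _worker(in_q, out_q):
        # Long-lived thread: pull callables until a 'DONE' sentinel arrives.
        while True:
            job = in_q.get()
            if job == 'DONE':
                break
            out_q.put(job())

    in_q, out_q = queue.Queue(1), queue.Queue(1)
    threading.Thread(target=_worker, args=(in_q, out_q), daemon=True).start()

    in_q.put(lambda: 42)
    print(out_q.get(timeout=1))      # -> 42

    in_q.put(lambda: time.sleep(5))  # a job that overruns its budget
    try:
        out_q.get(timeout=0.1)
    except queue.Empty:
        print('TimeoutError')        # mirrors the patch's TimeoutError path
    in_q.put('DONE')
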
@@ -181,6 +177,39 @@ def _run(payload, logger):
         "class_kwargs": {"value": 2}
     }
     """
+    if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH):
+        try:
+            logger.debug("Running setup ({}.{}) for worker id {}"
+                         .format(
+                             conf.SETUP_PATH,
+                             conf.SETUP_CALLABLE,
+                             os.getpid()))
+            run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE)
+        except Exception as e:
+            logger.warning('Unable to do setup task ({}.{}): {}'
+                           .format(conf.SETUP_PATH,
+                                   conf.SETUP_CALLABLE, str(e)))
+
+    while True:
+        # Blocking get so we don't spin cycles reading over and over
+        try:
+            payload = queue.get()
+        except Exception as e:
+            logger.exception(e)
+            continue
+
+        if payload == 'DONE':
+            break
+
+        return_val = _run_job(payload, logger)
+        # Signal that we're done with this job and put its return value on the
+        # result queue
+        result_queue.put(return_val)
+
+    logger.debug("Worker thread death")
+
+
+def _run_job(payload, logger):
     try:
         if ":" in payload["path"]:
             _pkgsplit = payload["path"].split(':')
@@ -224,9 +253,8 @@ def _run(payload, logger):
             return_val = callable_(*args, **kwargs)
         except Exception as e:
             logger.exception(e)
-            return str(e)
+            return_val = str(e)
 
-    # Signal that we're done with this job
     return return_val
 
 
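
Taken together, the intent of the worker.py changes above is that a worker process is recycled once it has executed MAX_JOB_COUNT jobs or a job timed out: it reports death and the JobManager stops issuing READY slots for it. A small, self-contained restatement of that recycling condition (the constant mirrors the new setting's default):

    MAX_JOB_COUNT = 1024  # default of the new MAX_JOB_COUNT setting

    def should_die(job_count, return_val):
        return job_count >= MAX_JOB_COUNT or return_val == 'TimeoutError'

    assert should_die(1024, {'value': 1})   # hit the job limit
    assert should_die(3, 'TimeoutError')    # job timed out
    assert not should_die(3, {'value': 1})  # keep going
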
diff --git a/setup.py b/setup.py
index 694c6cc..72e8e35 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@ from setuptools import find_packages, setup
7 7
8setup( 8setup(
9 name='eventmq', 9 name='eventmq',
10 version='0.3.4', 10 version='0.3.4.4',
11 description='EventMQ job execution and messaging system based on ZeroMQ', 11 description='EventMQ job execution and messaging system based on ZeroMQ',
12 packages=find_packages(), 12 packages=find_packages(),
13 install_requires=['pyzmq==16.0.2', 13 install_requires=['pyzmq==16.0.2',