aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbin/emq-jobmanager9
-rwxr-xr-xbin/send_msg4
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/conf.py1
-rw-r--r--eventmq/jobmanager.py79
-rw-r--r--eventmq/sender.py10
-rw-r--r--eventmq/tests/test_jobmanager.py6
-rw-r--r--eventmq/tests/test_sender.py2
-rw-r--r--eventmq/worker.py55
-rw-r--r--setup.py2
10 files changed, 124 insertions, 46 deletions
diff --git a/bin/emq-jobmanager b/bin/emq-jobmanager
index 4071ada..69c21ed 100755
--- a/bin/emq-jobmanager
+++ b/bin/emq-jobmanager
@@ -34,7 +34,10 @@ if __name__ == "__main__":
34 parser.add_argument('--jobs', '-J', type=int, nargs='?', 34 parser.add_argument('--jobs', '-J', type=int, nargs='?',
35 help='the max number of concurrent jobs to manage at ' 35 help='the max number of concurrent jobs to manage at '
36 'a time') 36 'a time')
37 37 parser.add_argument('--name', '-n', type=str, default=None,
38 help="A unique ame to give this node. If one "
39 "isn't provided a random uuid will be "
40 "generated")
38 args = parser.parse_args() 41 args = parser.parse_args()
39 42
40 # Overwrite the default config location with the one passed to the app 43 # Overwrite the default config location with the one passed to the app
@@ -48,6 +51,8 @@ if __name__ == "__main__":
48 51
49 broker_addr = args.broker_addr 52 broker_addr = args.broker_addr
50 concurrent_jobs = args.jobs 53 concurrent_jobs = args.jobs
54 name = args.name
51 55
52 j = JobManager(queues=queues, concurrent_jobs=concurrent_jobs) 56 j = JobManager(queues=queues, concurrent_jobs=concurrent_jobs,
57 name=name)
53 j.jobmanager_main(broker_addr=broker_addr) 58 j.jobmanager_main(broker_addr=broker_addr)
diff --git a/bin/send_msg b/bin/send_msg
index 47de45f..c3e88fa 100755
--- a/bin/send_msg
+++ b/bin/send_msg
@@ -22,11 +22,11 @@ if __name__ == "__main__":
22 'callable': 'work_job', 22 'callable': 'work_job',
23 'class_args': ('blurp',), 23 'class_args': ('blurp',),
24 'class_kwargs': {'kwarg1': True}, 24 'class_kwargs': {'kwarg1': True},
25 'args': (50, ), 25 'args': (10, ),
26 'kwargs': {} 26 'kwargs': {}
27 }] 27 }]
28 28
29 msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=10) 29 msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=10, queue='other')
30 msgid = send_request(s, msg, guarantee=True, reply_requested=True) 30 msgid = send_request(s, msg, guarantee=True, reply_requested=True)
31 # print 'Sent message, use msgid={} to track responses'.format(msgid) 31 # print 'Sent message, use msgid={} to track responses'.format(msgid)
32 # events = dict(poller.poll(500)) 32 # events = dict(poller.poll(500))
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index 30572f3..60bb61c 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
1__author__ = 'EventMQ Contributors' 1__author__ = 'EventMQ Contributors'
2__version__ = '0.3.4.3' 2__version__ = '0.3.4.6'
3 3
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
diff --git a/eventmq/conf.py b/eventmq/conf.py
index 5710933..4749e6b 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -95,5 +95,6 @@ SETUP_CALLABLE = ''
95# Time to wait after receiving SIGTERM to kill the workers in the jobmanager 95# Time to wait after receiving SIGTERM to kill the workers in the jobmanager
96# forecfully 96# forecfully
97KILL_GRACE_PERIOD = 300 97KILL_GRACE_PERIOD = 300
98GLOBAL_TIMEOUT = 300
98 99
99# }}} 100# }}}
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index d4a0ddc..4840fac 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -75,9 +75,11 @@ class JobManager(HeartbeatMixin, EMQPService):
75 """ 75 """
76 super(JobManager, self).__init__(*args, **kwargs) 76 super(JobManager, self).__init__(*args, **kwargs)
77 77
78 setup_logger("eventmq")
79
78 #: Define the name of this JobManager instance. Useful to know when 80 #: Define the name of this JobManager instance. Useful to know when
79 #: referring to the logs. 81 #: referring to the logs.
80 self.name = kwargs.pop('name', generate_device_name()) 82 self.name = kwargs.pop('name', None) or generate_device_name()
81 logger.info('Initializing JobManager {}...'.format(self.name)) 83 logger.info('Initializing JobManager {}...'.format(self.name))
82 84
83 #: keep track of workers 85 #: keep track of workers
@@ -94,6 +96,7 @@ class JobManager(HeartbeatMixin, EMQPService):
94 signal.signal(signal.SIGTERM, self.sigterm_handler) 96 signal.signal(signal.SIGTERM, self.sigterm_handler)
95 signal.signal(signal.SIGINT, self.sigterm_handler) 97 signal.signal(signal.SIGINT, self.sigterm_handler)
96 signal.signal(signal.SIGQUIT, self.sigterm_handler) 98 signal.signal(signal.SIGQUIT, self.sigterm_handler)
99 signal.signal(signal.SIGUSR1, self.handle_pdb)
97 100
98 #: JobManager starts out by INFORMing the router of it's existence, 101 #: JobManager starts out by INFORMing the router of it's existence,
99 #: then telling the router that it is READY. The reply will be the unit 102 #: then telling the router that it is READY. The reply will be the unit
@@ -103,11 +106,29 @@ class JobManager(HeartbeatMixin, EMQPService):
103 106
104 self.poller = Poller() 107 self.poller = Poller()
105 108
109 #: Stats and monitoring information
110
111 #: Jobs in flight tracks all jobs currently executing.
112 #: Key: msgid, Value: The message with all the details of the job
113 self.jobs_in_flight = {}
114
115 #: Running total number of REQUEST messages received on the broker
116 self.total_requests = 0
117 #: Running total number of READY messages sent to the broker
118 self.total_ready_sent = 0
119 #: Keep track of what pids are servicing our requests
120 #: Key: pid, Value: # of jobs completed on the process with that pid
121 self.pid_distribution = {}
122
106 #: Setup worker queues 123 #: Setup worker queues
107 self.request_queue = mp_queue() 124 self.request_queue = mp_queue()
108 self.finished_queue = mp_queue() 125 self.finished_queue = mp_queue()
109 self._setup() 126 self._setup()
110 127
128 def handle_pdb(self, sig, frame):
129 import pdb
130 pdb.Pdb().set_trace(frame)
131
111 @property 132 @property
112 def workers(self): 133 def workers(self):
113 if not hasattr(self, '_workers'): 134 if not hasattr(self, '_workers'):
@@ -127,9 +148,6 @@ class JobManager(HeartbeatMixin, EMQPService):
127 # Acknowledgment has come 148 # Acknowledgment has come
128 # Send a READY for each available worker 149 # Send a READY for each available worker
129 150
130 for i in range(0, len(self.workers)):
131 self.send_ready()
132
133 self.status = STATUS.running 151 self.status = STATUS.running
134 152
135 try: 153 try:
@@ -167,7 +185,7 @@ class JobManager(HeartbeatMixin, EMQPService):
167 sys.exit(0) 185 sys.exit(0)
168 else: 186 else:
169 try: 187 try:
170 events = self.poller.poll(10) 188 events = self.poller.poll(1000)
171 except zmq.ZMQError: 189 except zmq.ZMQError:
172 logger.debug('Disconnecting due to ZMQError while' 190 logger.debug('Disconnecting due to ZMQError while'
173 ' polling') 191 ' polling')
@@ -224,13 +242,21 @@ class JobManager(HeartbeatMixin, EMQPService):
224 242
225 logger.debug(resp) 243 logger.debug(resp)
226 pid = resp['pid'] 244 pid = resp['pid']
245 msgid = resp['msgid']
246 callback = resp['callback']
247 death = resp['death']
227 248
228 callback = getattr(self, resp['callback']) 249 callback = getattr(self, callback)
229 callback(resp['return'], resp['msgid']) 250 callback(resp['return'], msgid, death, pid)
230 251
231 if resp['return'] == 'DEATH': 252 if msgid in self.jobs_in_flight:
232 if pid in self._workers.keys(): 253 del self.jobs_in_flight[msgid]
233 del self._workers[pid] 254
255 if not death:
256 if pid not in self.pid_distribution:
257 self.pid_distribution[pid] = 1
258 else:
259 self.pid_distribution[pid] += 1
234 260
235 def on_request(self, msgid, msg): 261 def on_request(self, msgid, msg):
236 """ 262 """
@@ -262,6 +288,9 @@ class JobManager(HeartbeatMixin, EMQPService):
262 # queue_name = msg[0] 288 # queue_name = msg[0]
263 289
264 # Parse REQUEST message values 290 # Parse REQUEST message values
291
292 self.total_requests += 1
293
265 headers = msg[1] 294 headers = msg[1]
266 payload = deserializer(msg[2]) 295 payload = deserializer(msg[2])
267 params = payload[1] 296 params = payload[1]
@@ -279,6 +308,8 @@ class JobManager(HeartbeatMixin, EMQPService):
279 payload['msgid'] = msgid 308 payload['msgid'] = msgid
280 payload['callback'] = callback 309 payload['callback'] = callback
281 310
311 self.jobs_in_flight[msgid] = (monotonic(), payload)
312
282 self.request_queue.put(payload) 313 self.request_queue.put(payload)
283 314
284 def premature_death(self, reply, msgid): 315 def premature_death(self, reply, msgid):
@@ -287,31 +318,36 @@ class JobManager(HeartbeatMixin, EMQPService):
287 """ 318 """
288 return 319 return
289 320
290 def worker_death(self, reply, msgid): 321 def worker_death(self, reply, msgid, death, pid):
291 """ 322 """
292 Worker died of natural causes 323 Worker died of natural causes, ensure its death and
324 remove from tracking, will be replaced on next heartbeat
293 """ 325 """
294 return 326 if pid in self._workers.keys():
327 del self._workers[pid]
328
329 def worker_ready(self, reply, msgid, death, pid):
330 self.send_ready()
295 331
296 def worker_done_with_reply(self, reply, msgid): 332 def worker_done_with_reply(self, reply, msgid, death, pid):
297 """ 333 """
298 Worker finished a job and requested the return value 334 Worker finished a job and requested the return value
299 """ 335 """
300 try: 336 try:
301 reply = serializer(reply) 337 reply = serializer(reply)
302 except TypeError as e: 338 except TypeError as e:
303 reply = {"value": str(e)} 339 reply = serializer({"value": str(e)})
304 340
305 self.send_reply(reply, msgid) 341 self.send_reply(reply, msgid)
306 342
307 if self.status != STATUS.stopping: 343 if self.status != STATUS.stopping and not death:
308 self.send_ready() 344 self.send_ready()
309 345
310 def worker_done(self, reply, msgid): 346 def worker_done(self, reply, msgid, death, pid):
311 """ 347 """
312 Worker finished a job, notify broker of an additional slot opening 348 Worker finished a job, notify broker of an additional slot opening
313 """ 349 """
314 if self.status != STATUS.stopping: 350 if self.status != STATUS.stopping and not death:
315 self.send_ready() 351 self.send_ready()
316 352
317 def send_ready(self): 353 def send_ready(self):
@@ -319,6 +355,7 @@ class JobManager(HeartbeatMixin, EMQPService):
319 send the READY command upstream to indicate that JobManager is ready 355 send the READY command upstream to indicate that JobManager is ready
320 for another REQUEST message. 356 for another REQUEST message.
321 """ 357 """
358 self.total_ready_sent += 1
322 sendmsg(self.outgoing, 'READY') 359 sendmsg(self.outgoing, 'READY')
323 360
324 def send_reply(self, reply, msgid): 361 def send_reply(self, reply, msgid):
@@ -346,6 +383,9 @@ class JobManager(HeartbeatMixin, EMQPService):
346 necessary 383 necessary
347 """ 384 """
348 # Kill workers that aren't alive 385 # Kill workers that aren't alive
386 logger.debug("Jobs in flight: {}".format(len(self.jobs_in_flight)))
387 logger.debug("Total requests: {}".format(self.total_requests))
388 logger.debug("Total ready sent: {}".format(self.total_ready_sent))
349 try: 389 try:
350 [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers 390 [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers
351 if not w.is_alive] 391 if not w.is_alive]
@@ -368,7 +408,7 @@ class JobManager(HeartbeatMixin, EMQPService):
368 try: 408 try:
369 os.kill(pid, signal) 409 os.kill(pid, signal)
370 except Exception as e: 410 except Exception as e:
371 logger.excpetion( 411 logger.exception(
372 "Encountered exception trying to send {} to worker {}: {}" 412 "Encountered exception trying to send {} to worker {}: {}"
373 .format(signal, pid, str(e))) 413 .format(signal, pid, str(e)))
374 414
@@ -404,7 +444,6 @@ class JobManager(HeartbeatMixin, EMQPService):
404 Args: 444 Args:
405 broker_addr (str): The address of the broker to connect to. 445 broker_addr (str): The address of the broker to connect to.
406 """ 446 """
407 setup_logger('')
408 import_settings() 447 import_settings()
409 import_settings(section='jobmanager') 448 import_settings(section='jobmanager')
410 449
diff --git a/eventmq/sender.py b/eventmq/sender.py
index 063cd74..12dec05 100644
--- a/eventmq/sender.py
+++ b/eventmq/sender.py
@@ -18,6 +18,7 @@
18The sender is responsible for sending messages 18The sender is responsible for sending messages
19""" 19"""
20import logging 20import logging
21import sys
21import uuid 22import uuid
22 23
23import zmq 24import zmq
@@ -136,9 +137,14 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin):
136 if self.zsocket: 137 if self.zsocket:
137 self.zsocket.close() 138 self.zsocket.close()
138 139
139 self.name = kwargs.pop('name', str(uuid.uuid4()).encode('ascii'))
140 self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) 140 self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER))
141 self.zsocket.setsockopt(zmq.IDENTITY, self.name) 141
142 self.name = kwargs.pop('name', str(uuid.uuid4()))
143
144 if sys.version[0] == '2':
145 self.zsocket.setsockopt(zmq.IDENTITY, self.name)
146 else:
147 self.zsocket.setsockopt_string(zmq.IDENTITY, str(self.name))
142 148
143 self.status = constants.STATUS.ready 149 self.status = constants.STATUS.ready
144 150
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index 927e5f6..566c583 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -49,8 +49,7 @@ class TestCase(unittest.TestCase):
49 @mock.patch('eventmq.jobmanager.Sender.recv_multipart') 49 @mock.patch('eventmq.jobmanager.Sender.recv_multipart')
50 @mock.patch('eventmq.jobmanager.Poller.poll') 50 @mock.patch('eventmq.jobmanager.Poller.poll')
51 @mock.patch('eventmq.jobmanager.JobManager.maybe_send_heartbeat') 51 @mock.patch('eventmq.jobmanager.JobManager.maybe_send_heartbeat')
52 @mock.patch('eventmq.jobmanager.JobManager.send_ready') 52 def test__start_event_loop(self, maybe_send_hb_mock,
53 def test__start_event_loop(self, send_ready_mock, maybe_send_hb_mock,
54 poll_mock, sender_mock, process_msg_mock, 53 poll_mock, sender_mock, process_msg_mock,
55 pool_close_mock): 54 pool_close_mock):
56 jm = jobmanager.JobManager() 55 jm = jobmanager.JobManager()
@@ -60,9 +59,6 @@ class TestCase(unittest.TestCase):
60 59
61 jm._start_event_loop() 60 jm._start_event_loop()
62 61
63 # send int(conf.CONCURRENT_JOBS) ready messages
64 self.assertEqual(conf.CONCURRENT_JOBS, send_ready_mock.call_count)
65
66 process_msg_mock.assert_called_with( 62 process_msg_mock.assert_called_with(
67 sender_mock.return_value) 63 sender_mock.return_value)
68 64
diff --git a/eventmq/tests/test_sender.py b/eventmq/tests/test_sender.py
index b2570a3..f292696 100644
--- a/eventmq/tests/test_sender.py
+++ b/eventmq/tests/test_sender.py
@@ -35,7 +35,7 @@ class TestCase(unittest.TestCase):
35 35
36 self.assert_(socket.poll() != 0) 36 self.assert_(socket.poll() != 0)
37 msg = socket.recv_multipart() 37 msg = socket.recv_multipart()
38 self.assertEqual(msg[0], self.sender.name) 38 self.assertEqual(msg[0].decode('ascii'), self.sender.name)
39 self.assertEqual(msg[1], b'') 39 self.assertEqual(msg[1], b'')
40 self.assertEqual(msg[2], b'1') 40 self.assertEqual(msg[2], b'1')
41 self.assertEqual(msg[3], b'Hello!') 41 self.assertEqual(msg[3], b'Hello!')
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 4f42a2c..859b831 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -51,10 +51,7 @@ class MultiprocessWorker(Process):
51 51
52 @property 52 @property
53 def logger(self): 53 def logger(self):
54 if not hasattr(self, '_logger'): 54 return logging.getLogger(__name__ + '.' + str(os.getpid()))
55 self._logger = logging.getLogger(__name__ + '.' + str(os.getpid()))
56
57 return self._logger
58 55
59 def run(self): 56 def run(self):
60 """ 57 """
@@ -63,16 +60,26 @@ class MultiprocessWorker(Process):
63 This is designed to run in a seperate process. 60 This is designed to run in a seperate process.
64 """ 61 """
65 # Define the 2 queues for communicating with the worker thread 62 # Define the 2 queues for communicating with the worker thread
66 worker_queue = Queue.Queue() 63 logger = self.logger
67 worker_result_queue = Queue.Queue() 64
65 worker_queue = Queue.Queue(1)
66 worker_result_queue = Queue.Queue(1)
68 worker_thread = Thread(target=_run, 67 worker_thread = Thread(target=_run,
69 args=(worker_queue, 68 args=(worker_queue,
70 worker_result_queue, 69 worker_result_queue,
71 self.logger)) 70 logger))
72 71
73 import zmq 72 import zmq
74 zmq.Context.instance().term() 73 zmq.Context.instance().term()
75 74
75 self.output_queue.put(
76 {'msgid': None,
77 'return': None,
78 'death': False,
79 'pid': os.getpid(),
80 'callback': 'worker_ready'}
81 )
82
76 callback = 'premature_death' 83 callback = 'premature_death'
77 84
78 worker_thread.start() 85 worker_thread.start()
@@ -99,14 +106,19 @@ class MultiprocessWorker(Process):
99 try: 106 try:
100 return_val = 'None' 107 return_val = 'None'
101 self.job_count += 1 108 self.job_count += 1
102 timeout = payload.get("timeout", None) 109 timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT
103 msgid = payload.get('msgid', '') 110 msgid = payload.get('msgid', '')
104 callback = payload.get('callback', '') 111 callback = payload.get('callback', '')
112 logger.debug("Putting on thread queue msgid: {}".format(
113 msgid))
105 114
106 worker_queue.put(payload['params']) 115 worker_queue.put(payload['params'])
107 116
108 try: 117 try:
109 return_val = worker_result_queue.get(timeout=timeout) 118 return_val = worker_result_queue.get(timeout=timeout)
119
120 logger.debug("Got from result queue msgid: {}".format(
121 msgid))
110 except Queue.Empty: 122 except Queue.Empty:
111 return_val = 'TimeoutError' 123 return_val = 'TimeoutError'
112 124
@@ -116,6 +128,8 @@ class MultiprocessWorker(Process):
116 self.output_queue.put_nowait( 128 self.output_queue.put_nowait(
117 {'msgid': msgid, 129 {'msgid': msgid,
118 'return': return_val, 130 'return': return_val,
131 'death': self.job_count >= conf.MAX_JOB_COUNT or
132 return_val["value"] == 'TimeoutError',
119 'pid': os.getpid(), 133 'pid': os.getpid(),
120 'callback': callback} 134 'callback': callback}
121 ) 135 )
@@ -129,15 +143,24 @@ class MultiprocessWorker(Process):
129 return_val = str(e) 143 return_val = str(e)
130 144
131 if self.job_count >= conf.MAX_JOB_COUNT: 145 if self.job_count >= conf.MAX_JOB_COUNT:
146 logger.debug("Worker reached job limit, exiting")
132 break 147 break
133 148
149 worker_queue.put('DONE')
150 worker_thread.join(timeout=5)
151
134 self.output_queue.put( 152 self.output_queue.put(
135 {'msgid': None, 153 {'msgid': None,
136 'return': 'DEATH', 154 'return': 'DEATH',
155 'death': True,
137 'pid': os.getpid(), 156 'pid': os.getpid(),
138 'callback': 'worker_death'} 157 'callback': 'worker_death'}
139 ) 158 )
140 self.logger.debug("Worker death") 159
160 logger.debug("Worker death")
161
162 if worker_thread.is_alive():
163 logger.debug("Worker thread did not die gracefully")
141 164
142 165
143def _run(queue, result_queue, logger): 166def _run(queue, result_queue, logger):
@@ -169,14 +192,22 @@ def _run(queue, result_queue, logger):
169 192
170 while True: 193 while True:
171 # Blocking get so we don't spin cycles reading over and over 194 # Blocking get so we don't spin cycles reading over and over
172 payload = queue.get() 195 try:
173 return_val = _run_job(payload, logger) 196 payload = queue.get()
197 except Exception as e:
198 logger.exception(e)
199 continue
200
201 if payload == 'DONE':
202 break
174 203
204 return_val = _run_job(payload, logger)
175 # Signal that we're done with this job and put its return value on the 205 # Signal that we're done with this job and put its return value on the
176 # result queue 206 # result queue
177 queue.task_done()
178 result_queue.put(return_val) 207 result_queue.put(return_val)
179 208
209 logger.debug("Worker thread death")
210
180 211
181def _run_job(payload, logger): 212def _run_job(payload, logger):
182 try: 213 try:
diff --git a/setup.py b/setup.py
index ee81dae..9ac6089 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@ from setuptools import find_packages, setup
7 7
8setup( 8setup(
9 name='eventmq', 9 name='eventmq',
10 version='0.3.4.3', 10 version='0.3.4.6',
11 description='EventMQ job execution and messaging system based on ZeroMQ', 11 description='EventMQ job execution and messaging system based on ZeroMQ',
12 packages=find_packages(), 12 packages=find_packages(),
13 install_requires=['pyzmq==15.4.0', 13 install_requires=['pyzmq==15.4.0',