aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eventmq/jobmanager.py12
-rw-r--r--eventmq/sender.py6
-rw-r--r--eventmq/tests/test_sender.py2
-rw-r--r--eventmq/worker.py38
4 files changed, 41 insertions, 17 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 16cb506..ec15b3c 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -79,7 +79,7 @@ class JobManager(HeartbeatMixin, EMQPService):
79 79
80 #: Define the name of this JobManager instance. Useful to know when 80 #: Define the name of this JobManager instance. Useful to know when
81 #: referring to the logs. 81 #: referring to the logs.
82 self.name = kwargs.pop('name') or generate_device_name() 82 self.name = kwargs.pop('name', None) or generate_device_name()
83 logger.info('Initializing JobManager {}...'.format(self.name)) 83 logger.info('Initializing JobManager {}...'.format(self.name))
84 84
85 #: keep track of workers 85 #: keep track of workers
@@ -106,9 +106,12 @@ class JobManager(HeartbeatMixin, EMQPService):
106 106
107 self.poller = Poller() 107 self.poller = Poller()
108 108
109 # Stats and monitoring information
109 self.jobs_in_flight = {} 110 self.jobs_in_flight = {}
110 self.total_requests = 0 111 self.total_requests = 0
111 self.total_ready_sent = 0 112 self.total_ready_sent = 0
113 # Keep track of what pids are servicing our requests
114 self.pid_distribution = {}
112 115
113 #: Setup worker queues 116 #: Setup worker queues
114 self.request_queue = mp_queue() 117 self.request_queue = mp_queue()
@@ -242,6 +245,12 @@ class JobManager(HeartbeatMixin, EMQPService):
242 if msgid in self.jobs_in_flight: 245 if msgid in self.jobs_in_flight:
243 del self.jobs_in_flight[msgid] 246 del self.jobs_in_flight[msgid]
244 247
248 if not death:
249 if pid not in self.pid_distribution:
250 self.pid_distribution[pid] = 1
251 else:
252 self.pid_distribution[pid] += 1
253
245 def on_request(self, msgid, msg): 254 def on_request(self, msgid, msg):
246 """ 255 """
247 Handles a REQUEST command 256 Handles a REQUEST command
@@ -307,7 +316,6 @@ class JobManager(HeartbeatMixin, EMQPService):
307 Worker died of natural causes, ensure its death and 316 Worker died of natural causes, ensure its death and
308 remove from tracking, will be replaced on next heartbeat 317 remove from tracking, will be replaced on next heartbeat
309 """ 318 """
310 self.kill_worker(pid, signal.SIGKILL)
311 if pid in self._workers.keys(): 319 if pid in self._workers.keys():
312 del self._workers[pid] 320 del self._workers[pid]
313 321
diff --git a/eventmq/sender.py b/eventmq/sender.py
index 8452abd..f0cc09c 100644
--- a/eventmq/sender.py
+++ b/eventmq/sender.py
@@ -18,6 +18,7 @@
18The sender is responsible for sending messages 18The sender is responsible for sending messages
19""" 19"""
20import logging 20import logging
21import sys
21import uuid 22import uuid
22 23
23import zmq 24import zmq
@@ -137,7 +138,10 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin):
137 self.zsocket.close() 138 self.zsocket.close()
138 139
139 self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) 140 self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER))
140 self.zsocket.setsockopt(zmq.IDENTITY, self.name) 141 if sys.version[0] == '2':
142 self.zsocket.setsockopt(zmq.IDENTITY, self.name)
143 else:
144 self.zsocket.setsockopt_string(zmq.IDENTITY, str(self.name))
141 145
142 self.status = constants.STATUS.ready 146 self.status = constants.STATUS.ready
143 147
diff --git a/eventmq/tests/test_sender.py b/eventmq/tests/test_sender.py
index b2570a3..f292696 100644
--- a/eventmq/tests/test_sender.py
+++ b/eventmq/tests/test_sender.py
@@ -35,7 +35,7 @@ class TestCase(unittest.TestCase):
35 35
36 self.assert_(socket.poll() != 0) 36 self.assert_(socket.poll() != 0)
37 msg = socket.recv_multipart() 37 msg = socket.recv_multipart()
38 self.assertEqual(msg[0], self.sender.name) 38 self.assertEqual(msg[0].decode('ascii'), self.sender.name)
39 self.assertEqual(msg[1], b'') 39 self.assertEqual(msg[1], b'')
40 self.assertEqual(msg[2], b'1') 40 self.assertEqual(msg[2], b'1')
41 self.assertEqual(msg[3], b'Hello!') 41 self.assertEqual(msg[3], b'Hello!')
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 838be09..69567d5 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -51,10 +51,7 @@ class MultiprocessWorker(Process):
51 51
52 @property 52 @property
53 def logger(self): 53 def logger(self):
54 if not hasattr(self, '_logger'): 54 return logging.getLogger(__name__ + '.' + str(os.getpid()))
55 self._logger = logging.getLogger(__name__ + '.' + str(os.getpid()))
56
57 return self._logger
58 55
59 def run(self): 56 def run(self):
60 """ 57 """
@@ -63,12 +60,14 @@ class MultiprocessWorker(Process):
63 This is designed to run in a seperate process. 60 This is designed to run in a seperate process.
64 """ 61 """
65 # Define the 2 queues for communicating with the worker thread 62 # Define the 2 queues for communicating with the worker thread
66 worker_queue = Queue.Queue() 63 logger = self.logger
67 worker_result_queue = Queue.Queue() 64
65 worker_queue = Queue.Queue(1)
66 worker_result_queue = Queue.Queue(1)
68 worker_thread = Thread(target=_run, 67 worker_thread = Thread(target=_run,
69 args=(worker_queue, 68 args=(worker_queue,
70 worker_result_queue, 69 worker_result_queue,
71 self.logger)) 70 logger))
72 71
73 import zmq 72 import zmq
74 zmq.Context.instance().term() 73 zmq.Context.instance().term()
@@ -107,10 +106,10 @@ class MultiprocessWorker(Process):
107 try: 106 try:
108 return_val = 'None' 107 return_val = 'None'
109 self.job_count += 1 108 self.job_count += 1
110 timeout = payload.get("timeout", conf.GLOBAL_TIMEOUT) 109 timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT
111 msgid = payload.get('msgid', '') 110 msgid = payload.get('msgid', '')
112 callback = payload.get('callback', '') 111 callback = payload.get('callback', '')
113 self.logger.debug("Putting on thread queue msgid: {}".format( 112 logger.debug("Putting on thread queue msgid: {}".format(
114 msgid)) 113 msgid))
115 114
116 worker_queue.put(payload['params']) 115 worker_queue.put(payload['params'])
@@ -118,7 +117,7 @@ class MultiprocessWorker(Process):
118 try: 117 try:
119 return_val = worker_result_queue.get(timeout=timeout) 118 return_val = worker_result_queue.get(timeout=timeout)
120 119
121 self.logger.debug("Got from result queue msgid: {}".format( 120 logger.debug("Got from result queue msgid: {}".format(
122 msgid)) 121 msgid))
123 except Queue.Empty: 122 except Queue.Empty:
124 return_val = 'TimeoutError' 123 return_val = 'TimeoutError'
@@ -143,9 +142,12 @@ class MultiprocessWorker(Process):
143 return_val = str(e) 142 return_val = str(e)
144 143
145 if self.job_count >= conf.MAX_JOB_COUNT: 144 if self.job_count >= conf.MAX_JOB_COUNT:
146 self.logger.debug("Worker reached job limit, exiting") 145 logger.debug("Worker reached job limit, exiting")
147 break 146 break
148 147
148 worker_queue.put('DONE')
149 worker_thread.join(timeout=5)
150
149 self.output_queue.put( 151 self.output_queue.put(
150 {'msgid': None, 152 {'msgid': None,
151 'return': 'DEATH', 153 'return': 'DEATH',
@@ -153,7 +155,11 @@ class MultiprocessWorker(Process):
153 'pid': os.getpid(), 155 'pid': os.getpid(),
154 'callback': 'worker_death'} 156 'callback': 'worker_death'}
155 ) 157 )
156 self.logger.debug("Worker death") 158
159 logger.debug("Worker death")
160
161 if worker_thread.is_alive():
162 logger.debug("Worker thread did not die gracefully")
157 163
158 164
159def _run(queue, result_queue, logger): 165def _run(queue, result_queue, logger):
@@ -187,14 +193,20 @@ def _run(queue, result_queue, logger):
187 # Blocking get so we don't spin cycles reading over and over 193 # Blocking get so we don't spin cycles reading over and over
188 try: 194 try:
189 payload = queue.get() 195 payload = queue.get()
190 except: 196 except Exception as e:
197 logger.exception(e)
191 continue 198 continue
192 199
200 if payload == 'DONE':
201 break
202
193 return_val = _run_job(payload, logger) 203 return_val = _run_job(payload, logger)
194 # Signal that we're done with this job and put its return value on the 204 # Signal that we're done with this job and put its return value on the
195 # result queue 205 # result queue
196 result_queue.put(return_val) 206 result_queue.put(return_val)
197 207
208 logger.debug("Worker thread death")
209
198 210
199def _run_job(payload, logger): 211def _run_job(payload, logger):
200 try: 212 try: