about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- eventmq/jobmanager.py 30
-rw-r--r-- eventmq/utils/classes.py 4
-rw-r--r-- eventmq/worker.py 6
3 files changed, 29 insertions, 11 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index c490e68..78a89e1 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -142,6 +142,9 @@ class JobManager(HeartbeatMixin, EMQPService):
142 # Acknowledgment has come 142 # Acknowledgment has come
143 # Send a READY for each available worker 143 # Send a READY for each available worker
144 144
145 # Instantiate workers
146 self.workers
147
145 self.status = STATUS.running 148 self.status = STATUS.running
146 149
147 try: 150 try:
@@ -164,19 +167,24 @@ class JobManager(HeartbeatMixin, EMQPService):
164 else: 167 else:
165 self.handle_response(resp) 168 self.handle_response(resp)
166 169
167 if self.status == STATUS.stopping and \ 170 if self.status == STATUS.stopping:
168 not self.should_reset:
169 if len(self._workers) > 0: 171 if len(self._workers) > 0:
170 time.sleep(0.1) 172 time.sleep(0.1)
171 else: 173 elif not self.should_reset:
172 sys.exit(0) 174 sys.exit(0)
175 else:
176 break
173 177
174 if monotonic() > self.disconnect_time + \ 178 if monotonic() > self.disconnect_time + \
175 conf.KILL_GRACE_PERIOD: 179 conf.KILL_GRACE_PERIOD:
176 logger.debug("Killing unresponsive workers") 180 logger.debug("Killing unresponsive workers")
177 for pid in self._workers.keys(): 181 for pid in self._workers.keys():
178 self.kill_worker(pid, signal.SIGKILL) 182 self.kill_worker(pid, signal.SIGKILL)
179 sys.exit(0) 183
184 if not self.should_reset:
185 sys.exit(0)
186 else:
187 break
180 else: 188 else:
181 try: 189 try:
182 events = self.poller.poll(1000) 190 events = self.poller.poll(1000)
@@ -200,6 +208,14 @@ class JobManager(HeartbeatMixin, EMQPService):
200 except Exception: 208 except Exception:
201 logger.exception("Unhandled exception in main jobmanager loop") 209 logger.exception("Unhandled exception in main jobmanager loop")
202 210
211 # Cleanup
212 del self._workers
213
214 # Flush the queues with workers
215 self.request_queue = mp_queue()
216 self.finished_queue = mp_queue()
217 logger.info("Reached end of event loop")
218
203 def handle_response(self, resp): 219 def handle_response(self, resp):
204 """ 220 """
205 Handles a response from a worker process to the jobmanager 221 Handles a response from a worker process to the jobmanager
@@ -321,7 +337,8 @@ class JobManager(HeartbeatMixin, EMQPService):
321 del self._workers[pid] 337 del self._workers[pid]
322 338
323 def worker_ready(self, reply, msgid, death, pid): 339 def worker_ready(self, reply, msgid, death, pid):
324 self.send_ready() 340 if self.status != STATUS.stopping:
341 self.send_ready()
325 342
326 def worker_done_with_reply(self, reply, msgid, death, pid): 343 def worker_done_with_reply(self, reply, msgid, death, pid):
327 """ 344 """
@@ -350,6 +367,7 @@ class JobManager(HeartbeatMixin, EMQPService):
350 for another REQUEST message. 367 for another REQUEST message.
351 """ 368 """
352 self.total_ready_sent += 1 369 self.total_ready_sent += 1
370 logger.debug("Sending READY")
353 sendmsg(self.frontend, 'READY') 371 sendmsg(self.frontend, 'READY')
354 372
355 def send_reply(self, reply, msgid): 373 def send_reply(self, reply, msgid):
@@ -416,7 +434,7 @@ class JobManager(HeartbeatMixin, EMQPService):
416 self.reset() 434 self.reset()
417 435
418 def sighup_handler(self, signum, frame): 436 def sighup_handler(self, signum, frame):
419 logger.info('Caught signal %s' % signum) 437 logger.info('Caught SIGHUP')
420 reload_settings('jobmanager', self.override_settings) 438 reload_settings('jobmanager', self.override_settings)
421 439
422 self.should_reset = True 440 self.should_reset = True
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index 4dc3f0d..39ec852 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -182,6 +182,7 @@ class EMQPService(object):
182 Resets the current connection by closing and reopening the socket 182 Resets the current connection by closing and reopening the socket
183 """ 183 """
184 # Unregister the old socket from the poller 184 # Unregister the old socket from the poller
185 logger.debug("Resetting Jobmanager")
185 self.poller.unregister(self.frontend) 186 self.poller.unregister(self.frontend)
186 187
187 # Polish up a new socket to use 188 # Polish up a new socket to use
@@ -190,6 +191,9 @@ class EMQPService(object):
190 # Prepare the device to connect again 191 # Prepare the device to connect again
191 self._setup() 192 self._setup()
192 193
194 # Start the jobmanager again
195 self.start()
196
193 def process_message(self, msg): 197 def process_message(self, msg):
194 """ 198 """
195 Processes a message. Processing takes form of calling an 199 Processes a message. Processing takes form of calling an
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 14d4dfe..76fb16f 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -109,16 +109,12 @@ class MultiprocessWorker(Process):
109 timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT 109 timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT
110 msgid = payload.get('msgid', '') 110 msgid = payload.get('msgid', '')
111 callback = payload.get('callback', '') 111 callback = payload.get('callback', '')
112 logger.debug("Putting on thread queue msgid: {}".format(
113 msgid))
114 112
115 worker_queue.put(payload['params']) 113 worker_queue.put(payload['params'])
116 114
117 try: 115 try:
118 return_val = worker_result_queue.get(timeout=timeout) 116 return_val = worker_result_queue.get(timeout=timeout)
119 117
120 logger.debug("Got from result queue msgid: {}".format(
121 msgid))
122 except Queue.Empty: 118 except Queue.Empty:
123 return_val = 'TimeoutError' 119 return_val = 'TimeoutError'
124 120
@@ -129,7 +125,7 @@ class MultiprocessWorker(Process):
129 {'msgid': msgid, 125 {'msgid': msgid,
130 'return': return_val, 126 'return': return_val,
131 'death': self.job_count >= conf.MAX_JOB_COUNT or 127 'death': self.job_count >= conf.MAX_JOB_COUNT or
132 return_val == 'TimeoutError', 128 return_val["value"] == 'TimeoutError',
133 'pid': os.getpid(), 129 'pid': os.getpid(),
134 'callback': callback} 130 'callback': callback}
135 ) 131 )