diff options
| -rw-r--r-- | eventmq/jobmanager.py | 30 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 4 | ||||
| -rw-r--r-- | eventmq/worker.py | 6 |
3 files changed, 29 insertions, 11 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index c490e68..78a89e1 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -142,6 +142,9 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 142 | # Acknowledgment has come | 142 | # Acknowledgment has come |
| 143 | # Send a READY for each available worker | 143 | # Send a READY for each available worker |
| 144 | 144 | ||
| 145 | # Instatiate workers | ||
| 146 | self.workers | ||
| 147 | |||
| 145 | self.status = STATUS.running | 148 | self.status = STATUS.running |
| 146 | 149 | ||
| 147 | try: | 150 | try: |
| @@ -164,19 +167,24 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 164 | else: | 167 | else: |
| 165 | self.handle_response(resp) | 168 | self.handle_response(resp) |
| 166 | 169 | ||
| 167 | if self.status == STATUS.stopping and \ | 170 | if self.status == STATUS.stopping: |
| 168 | not self.should_reset: | ||
| 169 | if len(self._workers) > 0: | 171 | if len(self._workers) > 0: |
| 170 | time.sleep(0.1) | 172 | time.sleep(0.1) |
| 171 | else: | 173 | elif not self.should_reset: |
| 172 | sys.exit(0) | 174 | sys.exit(0) |
| 175 | else: | ||
| 176 | break | ||
| 173 | 177 | ||
| 174 | if monotonic() > self.disconnect_time + \ | 178 | if monotonic() > self.disconnect_time + \ |
| 175 | conf.KILL_GRACE_PERIOD: | 179 | conf.KILL_GRACE_PERIOD: |
| 176 | logger.debug("Killing unresponsive workers") | 180 | logger.debug("Killing unresponsive workers") |
| 177 | for pid in self._workers.keys(): | 181 | for pid in self._workers.keys(): |
| 178 | self.kill_worker(pid, signal.SIGKILL) | 182 | self.kill_worker(pid, signal.SIGKILL) |
| 179 | sys.exit(0) | 183 | |
| 184 | if not self.should_reset: | ||
| 185 | sys.exit(0) | ||
| 186 | else: | ||
| 187 | break | ||
| 180 | else: | 188 | else: |
| 181 | try: | 189 | try: |
| 182 | events = self.poller.poll(1000) | 190 | events = self.poller.poll(1000) |
| @@ -200,6 +208,14 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 200 | except Exception: | 208 | except Exception: |
| 201 | logger.exception("Unhandled exception in main jobmanager loop") | 209 | logger.exception("Unhandled exception in main jobmanager loop") |
| 202 | 210 | ||
| 211 | # Cleanup | ||
| 212 | del self._workers | ||
| 213 | |||
| 214 | # Flush the queues with workers | ||
| 215 | self.request_queue = mp_queue() | ||
| 216 | self.finished_queue = mp_queue() | ||
| 217 | logger.info("Reached end of event loop") | ||
| 218 | |||
| 203 | def handle_response(self, resp): | 219 | def handle_response(self, resp): |
| 204 | """ | 220 | """ |
| 205 | Handles a response from a worker process to the jobmanager | 221 | Handles a response from a worker process to the jobmanager |
| @@ -321,7 +337,8 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 321 | del self._workers[pid] | 337 | del self._workers[pid] |
| 322 | 338 | ||
| 323 | def worker_ready(self, reply, msgid, death, pid): | 339 | def worker_ready(self, reply, msgid, death, pid): |
| 324 | self.send_ready() | 340 | if self.status != STATUS.stopping: |
| 341 | self.send_ready() | ||
| 325 | 342 | ||
| 326 | def worker_done_with_reply(self, reply, msgid, death, pid): | 343 | def worker_done_with_reply(self, reply, msgid, death, pid): |
| 327 | """ | 344 | """ |
| @@ -350,6 +367,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 350 | for another REQUEST message. | 367 | for another REQUEST message. |
| 351 | """ | 368 | """ |
| 352 | self.total_ready_sent += 1 | 369 | self.total_ready_sent += 1 |
| 370 | logger.debug("Sending READY") | ||
| 353 | sendmsg(self.frontend, 'READY') | 371 | sendmsg(self.frontend, 'READY') |
| 354 | 372 | ||
| 355 | def send_reply(self, reply, msgid): | 373 | def send_reply(self, reply, msgid): |
| @@ -416,7 +434,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 416 | self.reset() | 434 | self.reset() |
| 417 | 435 | ||
| 418 | def sighup_handler(self, signum, frame): | 436 | def sighup_handler(self, signum, frame): |
| 419 | logger.info('Caught signal %s' % signum) | 437 | logger.info('Caught SIGHUP') |
| 420 | reload_settings('jobmanager', self.override_settings) | 438 | reload_settings('jobmanager', self.override_settings) |
| 421 | 439 | ||
| 422 | self.should_reset = True | 440 | self.should_reset = True |
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index 4dc3f0d..39ec852 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -182,6 +182,7 @@ class EMQPService(object): | |||
| 182 | Resets the current connection by closing and reopening the socket | 182 | Resets the current connection by closing and reopening the socket |
| 183 | """ | 183 | """ |
| 184 | # Unregister the old socket from the poller | 184 | # Unregister the old socket from the poller |
| 185 | logger.debug("Resetting Jobmanager") | ||
| 185 | self.poller.unregister(self.frontend) | 186 | self.poller.unregister(self.frontend) |
| 186 | 187 | ||
| 187 | # Polish up a new socket to use | 188 | # Polish up a new socket to use |
| @@ -190,6 +191,9 @@ class EMQPService(object): | |||
| 190 | # Prepare the device to connect again | 191 | # Prepare the device to connect again |
| 191 | self._setup() | 192 | self._setup() |
| 192 | 193 | ||
| 194 | # Start the jobmanager again | ||
| 195 | self.start() | ||
| 196 | |||
| 193 | def process_message(self, msg): | 197 | def process_message(self, msg): |
| 194 | """ | 198 | """ |
| 195 | Processes a message. Processing takes form of calling an | 199 | Processes a message. Processing takes form of calling an |
diff --git a/eventmq/worker.py b/eventmq/worker.py index 14d4dfe..76fb16f 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -109,16 +109,12 @@ class MultiprocessWorker(Process): | |||
| 109 | timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT | 109 | timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT |
| 110 | msgid = payload.get('msgid', '') | 110 | msgid = payload.get('msgid', '') |
| 111 | callback = payload.get('callback', '') | 111 | callback = payload.get('callback', '') |
| 112 | logger.debug("Putting on thread queue msgid: {}".format( | ||
| 113 | msgid)) | ||
| 114 | 112 | ||
| 115 | worker_queue.put(payload['params']) | 113 | worker_queue.put(payload['params']) |
| 116 | 114 | ||
| 117 | try: | 115 | try: |
| 118 | return_val = worker_result_queue.get(timeout=timeout) | 116 | return_val = worker_result_queue.get(timeout=timeout) |
| 119 | 117 | ||
| 120 | logger.debug("Got from result queue msgid: {}".format( | ||
| 121 | msgid)) | ||
| 122 | except Queue.Empty: | 118 | except Queue.Empty: |
| 123 | return_val = 'TimeoutError' | 119 | return_val = 'TimeoutError' |
| 124 | 120 | ||
| @@ -129,7 +125,7 @@ class MultiprocessWorker(Process): | |||
| 129 | {'msgid': msgid, | 125 | {'msgid': msgid, |
| 130 | 'return': return_val, | 126 | 'return': return_val, |
| 131 | 'death': self.job_count >= conf.MAX_JOB_COUNT or | 127 | 'death': self.job_count >= conf.MAX_JOB_COUNT or |
| 132 | return_val == 'TimeoutError', | 128 | return_val["value"] == 'TimeoutError', |
| 133 | 'pid': os.getpid(), | 129 | 'pid': os.getpid(), |
| 134 | 'callback': callback} | 130 | 'callback': callback} |
| 135 | ) | 131 | ) |