diff options
| author | jason | 2017-08-11 14:16:10 -0600 |
|---|---|---|
| committer | GitHub | 2017-08-11 14:16:10 -0600 |
| commit | 9f60b21dd37c35c0b7909b5f1d8e403ef761d513 (patch) | |
| tree | 9f676e3fa04b3f17c0840a93b1e9b612eb75a55e | |
| parent | c2ce431de1079f1a2a70dd0a08b0282bcbfe389b (diff) | |
| parent | 56cfa885b92b024dc2f70157a0f0b7826ad23e5b (diff) | |
| download | eventmq-9f60b21dd37c35c0b7909b5f1d8e403ef761d513.tar.gz eventmq-9f60b21dd37c35c0b7909b5f1d8e403ef761d513.zip | |
Merge pull request #53 from sideshowdave7/master
Backport 0.4 handling of workers after death
| -rwxr-xr-x | bin/send_msg | 2 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 17 |
2 files changed, 10 insertions, 9 deletions
diff --git a/bin/send_msg b/bin/send_msg index c3e88fa..33cfb1a 100755 --- a/bin/send_msg +++ b/bin/send_msg | |||
| @@ -22,7 +22,7 @@ if __name__ == "__main__": | |||
| 22 | 'callable': 'work_job', | 22 | 'callable': 'work_job', |
| 23 | 'class_args': ('blurp',), | 23 | 'class_args': ('blurp',), |
| 24 | 'class_kwargs': {'kwarg1': True}, | 24 | 'class_kwargs': {'kwarg1': True}, |
| 25 | 'args': (10, ), | 25 | 'args': (1, ), |
| 26 | 'kwargs': {} | 26 | 'kwargs': {} |
| 27 | }] | 27 | }] |
| 28 | 28 | ||
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index c904d99..c900495 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -146,14 +146,6 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 146 | Starts the actual event loop. Usually called by :meth:`start` | 146 | Starts the actual event loop. Usually called by :meth:`start` |
| 147 | """ | 147 | """ |
| 148 | # Acknowledgment has come | 148 | # Acknowledgment has come |
| 149 | # When the job manager unexpectedly disconnects from the router and | ||
| 150 | # reconnects it needs to send a ready for each previously available | ||
| 151 | # worker. | ||
| 152 | # Send a READY for each previously available worker | ||
| 153 | if hasattr(self, '_workers'): | ||
| 154 | for _ in self._workers: | ||
| 155 | self.send_ready() | ||
| 156 | |||
| 157 | self.status = STATUS.running | 149 | self.status = STATUS.running |
| 158 | 150 | ||
| 159 | try: | 151 | try: |
| @@ -218,6 +210,15 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 218 | except Exception: | 210 | except Exception: |
| 219 | logger.exception("Unhandled exception in main jobmanager loop") | 211 | logger.exception("Unhandled exception in main jobmanager loop") |
| 220 | 212 | ||
| 213 | # Cleanup | ||
| 214 | if hasattr(self, '_workers'): | ||
| 215 | del self._workers | ||
| 216 | |||
| 217 | # Flush the queues with workers | ||
| 218 | self.request_queue = mp_queue() | ||
| 219 | self.finished_queue = mp_queue() | ||
| 220 | logger.info("Reached end of event loop") | ||
| 221 | |||
| 221 | def handle_response(self, resp): | 222 | def handle_response(self, resp): |
| 222 | """ | 223 | """ |
| 223 | Handles a response from a worker process to the jobmanager | 224 | Handles a response from a worker process to the jobmanager |