aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2017-08-11 14:16:10 -0600
committerGitHub2017-08-11 14:16:10 -0600
commit9f60b21dd37c35c0b7909b5f1d8e403ef761d513 (patch)
tree9f676e3fa04b3f17c0840a93b1e9b612eb75a55e
parentc2ce431de1079f1a2a70dd0a08b0282bcbfe389b (diff)
parent56cfa885b92b024dc2f70157a0f0b7826ad23e5b (diff)
downloadeventmq-9f60b21dd37c35c0b7909b5f1d8e403ef761d513.tar.gz
eventmq-9f60b21dd37c35c0b7909b5f1d8e403ef761d513.zip
Merge pull request #53 from sideshowdave7/master
Backport 0.4 handling of workers after death
-rwxr-xr-xbin/send_msg2
-rw-r--r--eventmq/jobmanager.py17
2 files changed, 10 insertions, 9 deletions
diff --git a/bin/send_msg b/bin/send_msg
index c3e88fa..33cfb1a 100755
--- a/bin/send_msg
+++ b/bin/send_msg
@@ -22,7 +22,7 @@ if __name__ == "__main__":
22 'callable': 'work_job', 22 'callable': 'work_job',
23 'class_args': ('blurp',), 23 'class_args': ('blurp',),
24 'class_kwargs': {'kwarg1': True}, 24 'class_kwargs': {'kwarg1': True},
25 'args': (10, ), 25 'args': (1, ),
26 'kwargs': {} 26 'kwargs': {}
27 }] 27 }]
28 28
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index c904d99..c900495 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -146,14 +146,6 @@ class JobManager(HeartbeatMixin, EMQPService):
146 Starts the actual event loop. Usually called by :meth:`start` 146 Starts the actual event loop. Usually called by :meth:`start`
147 """ 147 """
148 # Acknowledgment has come 148 # Acknowledgment has come
149 # When the job manager unexpectedly disconnects from the router and
150 # reconnects it needs to send a ready for each previously available
151 # worker.
152 # Send a READY for each previously available worker
153 if hasattr(self, '_workers'):
154 for _ in self._workers:
155 self.send_ready()
156
157 self.status = STATUS.running 149 self.status = STATUS.running
158 150
159 try: 151 try:
@@ -218,6 +210,15 @@ class JobManager(HeartbeatMixin, EMQPService):
218 except Exception: 210 except Exception:
219 logger.exception("Unhandled exception in main jobmanager loop") 211 logger.exception("Unhandled exception in main jobmanager loop")
220 212
213 # Cleanup
214 if hasattr(self, '_workers'):
215 del self._workers
216
217 # Flush the queues with workers
218 self.request_queue = mp_queue()
219 self.finished_queue = mp_queue()
220 logger.info("Reached end of event loop")
221
221 def handle_response(self, resp): 222 def handle_response(self, resp):
222 """ 223 """
223 Handles a response from a worker process to the jobmanager 224 Handles a response from a worker process to the jobmanager