aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2017-08-18 14:39:46 -0600
committerjason2017-08-18 14:39:46 -0600
commite945513dc1251e07a93c13ebebf92518251a9dff (patch)
tree752f323a7f56edbed2be27d129ab6764357c1baf
parent02e0e19aa1f8f745c65c498ba69d8975699f913e (diff)
downloadeventmq-e945513dc1251e07a93c13ebebf92518251a9dff.tar.gz
eventmq-e945513dc1251e07a93c13ebebf92518251a9dff.zip
Revert "Backport 0.4 handling of workers after death"
- This was a stab at a fix which didn't fix the issue we were seeing. It's possible that it caused a memory issue so it is being reverted. This reverts commit 56cfa885b92b024dc2f70157a0f0b7826ad23e5b.
-rwxr-xr-xbin/send_msg2
-rw-r--r--eventmq/jobmanager.py17
2 files changed, 9 insertions, 10 deletions
diff --git a/bin/send_msg b/bin/send_msg
index 33cfb1a..c3e88fa 100755
--- a/bin/send_msg
+++ b/bin/send_msg
@@ -22,7 +22,7 @@ if __name__ == "__main__":
22 'callable': 'work_job', 22 'callable': 'work_job',
23 'class_args': ('blurp',), 23 'class_args': ('blurp',),
24 'class_kwargs': {'kwarg1': True}, 24 'class_kwargs': {'kwarg1': True},
25 'args': (1, ), 25 'args': (10, ),
26 'kwargs': {} 26 'kwargs': {}
27 }] 27 }]
28 28
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index c900495..c904d99 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -146,6 +146,14 @@ class JobManager(HeartbeatMixin, EMQPService):
146 Starts the actual event loop. Usually called by :meth:`start` 146 Starts the actual event loop. Usually called by :meth:`start`
147 """ 147 """
148 # Acknowledgment has come 148 # Acknowledgment has come
149 # When the job manager unexpectedly disconnects from the router and
150 # reconnects it needs to send a ready for each previously available
151 # worker.
152 # Send a READY for each previously available worker
153 if hasattr(self, '_workers'):
154 for _ in self._workers:
155 self.send_ready()
156
149 self.status = STATUS.running 157 self.status = STATUS.running
150 158
151 try: 159 try:
@@ -210,15 +218,6 @@ class JobManager(HeartbeatMixin, EMQPService):
210 except Exception: 218 except Exception:
211 logger.exception("Unhandled exception in main jobmanager loop") 219 logger.exception("Unhandled exception in main jobmanager loop")
212 220
213 # Cleanup
214 if hasattr(self, '_workers'):
215 del self._workers
216
217 # Flush the queues with workers
218 self.request_queue = mp_queue()
219 self.finished_queue = mp_queue()
220 logger.info("Reached end of event loop")
221
222 def handle_response(self, resp): 221 def handle_response(self, resp):
223 """ 222 """
224 Handles a response from a worker process to the jobmanager 223 Handles a response from a worker process to the jobmanager