aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Hurst2017-10-14 20:50:27 -0600
committerDavid Hurst2017-10-14 20:50:27 -0600
commit235f125c3ca68f9835936dcde40eff2fb275dcd0 (patch)
tree4e9b28eb92ce54b9cf201c93111b1094cdcdbe3c
parent55963796809ce5aacf47c24d8295ab5b1a500ad1 (diff)
parenta6cab42591944d2515f96a5e1975dd8224972dda (diff)
downloadeventmq-235f125c3ca68f9835936dcde40eff2fb275dcd0.tar.gz
eventmq-235f125c3ca68f9835936dcde40eff2fb275dcd0.zip
Merge remote-tracking branch 'com4/master'
-rwxr-xr-xbin/send_msg2
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/jobmanager.py24
-rw-r--r--eventmq/log.py6
-rw-r--r--setup.py2
5 files changed, 19 insertions, 17 deletions
diff --git a/bin/send_msg b/bin/send_msg
index 33cfb1a..c3e88fa 100755
--- a/bin/send_msg
+++ b/bin/send_msg
@@ -22,7 +22,7 @@ if __name__ == "__main__":
22 'callable': 'work_job', 22 'callable': 'work_job',
23 'class_args': ('blurp',), 23 'class_args': ('blurp',),
24 'class_kwargs': {'kwarg1': True}, 24 'class_kwargs': {'kwarg1': True},
25 'args': (1, ), 25 'args': (10, ),
26 'kwargs': {} 26 'kwargs': {}
27 }] 27 }]
28 28
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index a02a091..ef87218 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
1__author__ = 'EventMQ Contributors' 1__author__ = 'EventMQ Contributors'
2__version__ = '0.3.4.9' 2__version__ = '0.3.4.11'
3 3
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index c900495..78a080b 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -20,7 +20,7 @@ Ensures things about jobs and spawns the actual tasks
20from json import dumps as serializer, loads as deserializer 20from json import dumps as serializer, loads as deserializer
21 21
22import logging 22import logging
23from multiprocessing import Queue as mp_queue 23from multiprocessing import Manager as MPManager
24import os 24import os
25import signal 25import signal
26import sys 26import sys
@@ -121,8 +121,9 @@ class JobManager(HeartbeatMixin, EMQPService):
121 self.pid_distribution = {} 121 self.pid_distribution = {}
122 122
123 #: Setup worker queues 123 #: Setup worker queues
124 self.request_queue = mp_queue() 124 self._mp_manager = MPManager()
125 self.finished_queue = mp_queue() 125 self.request_queue = self._mp_manager.Queue()
126 self.finished_queue = self._mp_manager.Queue()
126 self._setup() 127 self._setup()
127 128
128 def handle_pdb(self, sig, frame): 129 def handle_pdb(self, sig, frame):
@@ -146,6 +147,14 @@ class JobManager(HeartbeatMixin, EMQPService):
146 Starts the actual event loop. Usually called by :meth:`start` 147 Starts the actual event loop. Usually called by :meth:`start`
147 """ 148 """
148 # Acknowledgment has come 149 # Acknowledgment has come
150 # When the job manager unexpectedly disconnects from the router and
151 # reconnects it needs to send a ready for each previously available
152 # worker.
153 # Send a READY for each previously available worker
154 if hasattr(self, '_workers'):
155 for _ in self._workers:
156 self.send_ready()
157
149 self.status = STATUS.running 158 self.status = STATUS.running
150 159
151 try: 160 try:
@@ -210,15 +219,6 @@ class JobManager(HeartbeatMixin, EMQPService):
210 except Exception: 219 except Exception:
211 logger.exception("Unhandled exception in main jobmanager loop") 220 logger.exception("Unhandled exception in main jobmanager loop")
212 221
213 # Cleanup
214 if hasattr(self, '_workers'):
215 del self._workers
216
217 # Flush the queues with workers
218 self.request_queue = mp_queue()
219 self.finished_queue = mp_queue()
220 logger.info("Reached end of event loop")
221
222 def handle_response(self, resp): 222 def handle_response(self, resp):
223 """ 223 """
224 Handles a response from a worker process to the jobmanager 224 Handles a response from a worker process to the jobmanager
diff --git a/eventmq/log.py b/eventmq/log.py
index 1864c8d..3d62165 100644
--- a/eventmq/log.py
+++ b/eventmq/log.py
@@ -27,9 +27,11 @@ import zmq.log.handlers
27 27
28 28
29FORMAT_STANDARD = logging.Formatter( 29FORMAT_STANDARD = logging.Formatter(
30 '%(asctime)s - %(name)s %(levelname)s - %(message)s') 30 '%(asctime)s - %(name)s %(levelname)s - %(message)s',
31 datefmt='%Y-%m-%dT%H:%M:%S%z')
31FORMAT_NAMELESS = logging.Formatter( 32FORMAT_NAMELESS = logging.Formatter(
32 '%(asctime)s - %(levelname)s - %(message)s') 33 '%(asctime)s - %(levelname)s - %(message)s',
34 datefmt='%Y-%m-%dT%H:%M:%S%z')
33 35
34 36
35class PUBHandler(zmq.log.handlers.PUBHandler): 37class PUBHandler(zmq.log.handlers.PUBHandler):
diff --git a/setup.py b/setup.py
index 553c86b..6faa4c1 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@ from setuptools import find_packages, setup
7 7
8setup( 8setup(
9 name='eventmq', 9 name='eventmq',
10 version='0.3.4.9', 10 version='0.3.4.11',
11 description='EventMQ job execution and messaging system based on ZeroMQ', 11 description='EventMQ job execution and messaging system based on ZeroMQ',
12 packages=find_packages(), 12 packages=find_packages(),
13 install_requires=['pyzmq==15.4.0', 13 install_requires=['pyzmq==15.4.0',