aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2017-02-14 16:10:37 -0700
committerGitHub2017-02-14 16:10:37 -0700
commit9d1ef94bd1755cde2459cd1bc8a61ad768f5c987 (patch)
tree45e989748996f2e5e0d85137aa3ac56e9da3c6e3
parent7820331fd493d17513ca412a927e556ec0dd0a05 (diff)
parentbfa8bbe993d603046647384b7448a0fd5fc850c9 (diff)
downloadeventmq-9d1ef94bd1755cde2459cd1bc8a61ad768f5c987.tar.gz
eventmq-9d1ef94bd1755cde2459cd1bc8a61ad768f5c987.zip
Merge pull request #80 from sideshowdave7/master
EventMQ 0.3
-rwxr-xr-xbin/send_msg2
-rwxr-xr-xbin/send_publish12
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/constants.py1
-rw-r--r--eventmq/jobmanager.py14
-rw-r--r--eventmq/tests/test_jobmanager.py11
-rw-r--r--eventmq/utils/classes.py3
-rw-r--r--eventmq/worker.py26
-rw-r--r--setup.py2
9 files changed, 42 insertions, 31 deletions
diff --git a/bin/send_msg b/bin/send_msg
index b0ea160..19e88de 100755
--- a/bin/send_msg
+++ b/bin/send_msg
@@ -26,7 +26,7 @@ if __name__ == "__main__":
26 'kwargs': {} 26 'kwargs': {}
27 }] 27 }]
28 28
29 send_request(s, msg, guarantee=True, reply_requested=True, timeout=10) 29 send_request(s, msg, guarantee=True, reply_requested=True, timeout=1)
30 print zmq.POLLOUT 30 print zmq.POLLOUT
31 events = dict(poller.poll(500)) 31 events = dict(poller.poll(500))
32 print events 32 print events
diff --git a/bin/send_publish b/bin/send_publish
index 140be63..50f430e 100755
--- a/bin/send_publish
+++ b/bin/send_publish
@@ -12,13 +12,5 @@ if __name__ == "__main__":
12 s.connect(sys.argv[1]) 12 s.connect(sys.argv[1])
13 13
14 msg = ['update', '1'] 14 msg = ['update', '1']
15 j = 0 15 topic = str('topic')
16 while True: 16 send_publish_request(s, topic, msg)
17
18 for i in xrange(10):
19 topic = str(i)
20 send_publish_request(s, topic, msg)
21
22
23 print j
24 j += 1
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index 6b5c4b9..5b8029b 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
1__author__ = 'EventMQ Contributors' 1__author__ = 'EventMQ Contributors'
2__version__ = '0.3-rc10' 2__version__ = '0.3'
3 3
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
diff --git a/eventmq/constants.py b/eventmq/constants.py
index 8dbf530..17c6130 100644
--- a/eventmq/constants.py
+++ b/eventmq/constants.py
@@ -7,6 +7,7 @@ class STATUS(object):
7 connected = 201 7 connected = 201
8 stopping = 300 8 stopping = 300
9 stopped = 301 9 stopped = 301
10 running = 400 # Running and accepting jobs actively
10 11
11 12
12class CLIENT_TYPE(object): 13class CLIENT_TYPE(object):
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 12629ee..ea5af07 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -27,7 +27,7 @@ import zmq
27 27
28from eventmq.log import setup_logger 28from eventmq.log import setup_logger
29from . import conf 29from . import conf
30from .constants import KBYE 30from .constants import KBYE, STATUS
31from .poller import Poller, POLLIN 31from .poller import Poller, POLLIN
32from .sender import Sender 32from .sender import Sender
33from .utils.classes import EMQPService, HeartbeatMixin 33from .utils.classes import EMQPService, HeartbeatMixin
@@ -37,6 +37,7 @@ from .utils.messages import send_emqp_message as sendmsg
37from .utils.settings import import_settings 37from .utils.settings import import_settings
38from .worker import MultiprocessWorker as Worker 38from .worker import MultiprocessWorker as Worker
39 39
40
40if sys.version[0] == '2': 41if sys.version[0] == '2':
41 import Queue 42 import Queue
42else: 43else:
@@ -124,9 +125,12 @@ class JobManager(HeartbeatMixin, EMQPService):
124 for i in range(0, len(self.workers)): 125 for i in range(0, len(self.workers)):
125 self.send_ready() 126 self.send_ready()
126 127
128 self.status = STATUS.running
129
127 while True: 130 while True:
128 # Clear any workers if it's time to shut down 131 # Clear any workers if it's time to shut down
129 if self.received_disconnect: 132 if self.received_disconnect:
133 self.status = STATUS.stopping
130 for _ in range(0, len(self.workers)): 134 for _ in range(0, len(self.workers)):
131 logger.debug('Requesting worker death...') 135 logger.debug('Requesting worker death...')
132 self.request_queue.put_nowait('DONE') 136 self.request_queue.put_nowait('DONE')
@@ -253,7 +257,8 @@ class JobManager(HeartbeatMixin, EMQPService):
253 in :meth:`self.process_message` as every message is counted as a 257 in :meth:`self.process_message` as every message is counted as a
254 HEARTBEAT 258 HEARTBEAT
255 """ 259 """
256 self.check_worker_health() 260 if self.status == STATUS.running:
261 self.check_worker_health()
257 262
258 def check_worker_health(self): 263 def check_worker_health(self):
259 """ 264 """
@@ -282,10 +287,11 @@ class JobManager(HeartbeatMixin, EMQPService):
282 287
283 def sighup_handler(self, signum, frame): 288 def sighup_handler(self, signum, frame):
284 logger.info('Caught signal %s' % signum) 289 logger.info('Caught signal %s' % signum)
285 self.outgoing.rebuild()
286 import_settings() 290 import_settings()
287 import_settings(section='jobmanager') 291 import_settings(section='jobmanager')
288 self.start(addr=conf.WORKER_ADDR) 292
293 self.should_reset = True
294 self.received_disconnect = True
289 295
290 def sigterm_handler(self, signum, frame): 296 def sigterm_handler(self, signum, frame):
291 logger.info('Shutting down..') 297 logger.info('Shutting down..')
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index d0497bc..db82843 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -111,27 +111,18 @@ class TestCase(unittest.TestCase):
111 self.assertTrue(jm.received_disconnect, "Did not receive disconnect.") 111 self.assertTrue(jm.received_disconnect, "Did not receive disconnect.")
112 112
113 # Other Tests 113 # Other Tests
114 @mock.patch('eventmq.jobmanager.JobManager.start')
115 @mock.patch('eventmq.jobmanager.import_settings') 114 @mock.patch('eventmq.jobmanager.import_settings')
116 @mock.patch('eventmq.jobmanager.Sender.rebuild') 115 def test_sighup_handler(self, import_settings_mock):
117 def test_sighup_handler(self, rebuild_mock, import_settings_mock,
118 start_mock):
119 jm = jobmanager.JobManager() 116 jm = jobmanager.JobManager()
120 117
121 jm.sighup_handler(982374, "FRAMEY the frame") 118 jm.sighup_handler(982374, "FRAMEY the frame")
122 119
123 self.assertTrue(rebuild_mock.called)
124
125 # called once for the default settings, once for the jobmanager 120 # called once for the default settings, once for the jobmanager
126 # settings 121 # settings
127 self.assertEqual(2, import_settings_mock.call_count) 122 self.assertEqual(2, import_settings_mock.call_count)
128 # check to see if the last call was called with the jobmanager section 123 # check to see if the last call was called with the jobmanager section
129 import_settings_mock.assert_called_with(section='jobmanager') 124 import_settings_mock.assert_called_with(section='jobmanager')
130 125
131 start_mock.assert_called_with(
132 addr=conf.WORKER_ADDR,
133 )
134
135 @mock.patch('eventmq.jobmanager.sendmsg') 126 @mock.patch('eventmq.jobmanager.sendmsg')
136 def test_sigterm_handler(self, sendmsg_mock): 127 def test_sigterm_handler(self, sendmsg_mock):
137 jm = jobmanager.JobManager() 128 jm = jobmanager.JobManager()
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index d1ca275..39c600b 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -119,6 +119,7 @@ class EMQPService(object):
119 self.poller.register(self.outgoing, poller.POLLIN) 119 self.poller.register(self.outgoing, poller.POLLIN)
120 self.awaiting_startup_ack = False 120 self.awaiting_startup_ack = False
121 self.received_disconnect = False 121 self.received_disconnect = False
122 self.should_reset = False
122 123
123 self.status = constants.STATUS.ready 124 self.status = constants.STATUS.ready
124 125
@@ -165,7 +166,7 @@ class EMQPService(object):
165 166
166 # When we return, soemthing has gone wrong and try to reconnect 167 # When we return, soemthing has gone wrong and try to reconnect
167 # unless self.received_disconnect is True 168 # unless self.received_disconnect is True
168 if not self.received_disconnect: 169 if not self.received_disconnect or self.should_reset:
169 self.reset() 170 self.reset()
170 171
171 logger.info('Death.') 172 logger.info('Death.')
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 7729120..8d25c63 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -21,13 +21,29 @@ from importlib import import_module
21import logging 21import logging
22from multiprocessing import Process 22from multiprocessing import Process
23import os 23import os
24from threading import Thread 24from threading import Event, Thread
25 25
26from . import conf 26from . import conf
27 27
28logger = logging.getLogger(__name__) 28logger = logging.getLogger(__name__)
29 29
30 30
31class StoppableThread(Thread):
32 """Thread class with a stop() method. The thread itself has to check
33 regularly for the stopped() condition."""
34
35 def __init__(self, target, name=None, args=()):
36 super(StoppableThread, self).__init__(name=name, target=target,
37 args=args)
38 self._stop = Event()
39
40 def stop(self):
41 self._stop.set()
42
43 def stopped(self):
44 return self._stop.isSet()
45
46
31class MultiprocessWorker(Process): 47class MultiprocessWorker(Process):
32 """ 48 """
33 Defines a worker that spans the job in a multiprocessing task 49 Defines a worker that spans the job in a multiprocessing task
@@ -60,12 +76,13 @@ class MultiprocessWorker(Process):
60 76
61 try: 77 try:
62 if timeout: 78 if timeout:
63 worker_thread = Thread(target=_run, 79 worker_thread = StoppableThread(target=_run,
64 args=(payload['params'], )) 80 args=(payload['params'], ))
65 worker_thread.start() 81 worker_thread.start()
66 worker_thread.join(timeout) 82 worker_thread.join(timeout)
67 83
68 if worker_thread.isAlive(): 84 if worker_thread.isAlive():
85 worker_thread.stop()
69 resp['return'] = 'TimeoutError' 86 resp['return'] = 'TimeoutError'
70 else: 87 else:
71 resp['return'] = 'DONE' 88 resp['return'] = 'DONE'
@@ -80,6 +97,9 @@ class MultiprocessWorker(Process):
80 if self.job_count > conf.MAX_JOB_COUNT: 97 if self.job_count > conf.MAX_JOB_COUNT:
81 break 98 break
82 99
100 if resp['return'] == 'TimeoutError':
101 break
102
83 logger.debug("Worker death, PID: {}".format(os.getpid())) 103 logger.debug("Worker death, PID: {}".format(os.getpid()))
84 104
85 105
diff --git a/setup.py b/setup.py
index 0419294..24a364b 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@ from setuptools import find_packages, setup
7 7
8setup( 8setup(
9 name='eventmq', 9 name='eventmq',
10 version='0.3-rc10', 10 version='0.3',
11 description='EventMQ messaging system based on ZeroMQ', 11 description='EventMQ messaging system based on ZeroMQ',
12 packages=find_packages(), 12 packages=find_packages(),
13 install_requires=['pyzmq==15.4.0', 13 install_requires=['pyzmq==15.4.0',