diff options
| author | jason | 2017-02-17 16:46:12 -0700 |
|---|---|---|
| committer | jason | 2017-02-17 16:46:12 -0700 |
| commit | da3538dcb8d189905116709b0d1bdb36910e36c6 (patch) | |
| tree | db94ade7bf967ff5fcd6b3e42a4e03361505629c | |
| parent | 6237f10994f940bb83a276a4ea41f11e5daf7af7 (diff) | |
| parent | 35101b668191aa084d1d115d4590e7d412e5282a (diff) | |
| download | eventmq-da3538dcb8d189905116709b0d1bdb36910e36c6.tar.gz eventmq-da3538dcb8d189905116709b0d1bdb36910e36c6.zip | |
Merge branch 'feature/schedule_helpers' into 0.4
| -rw-r--r-- | README.md | 12 | ||||
| -rwxr-xr-x | bin/send_msg | 2 | ||||
| -rwxr-xr-x | bin/send_publish | 12 | ||||
| -rw-r--r-- | docs/settings_file.rst | 11 | ||||
| -rw-r--r-- | eventmq/__init__.py | 2 | ||||
| -rw-r--r-- | eventmq/constants.py | 1 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 14 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 1 | ||||
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 11 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 3 | ||||
| -rw-r--r-- | eventmq/worker.py | 26 | ||||
| -rw-r--r-- | setup.py | 8 |
12 files changed, 60 insertions, 43 deletions
| @@ -1,7 +1,7 @@ | |||
| 1 | EventMQ | 1 | EventMQ |
| 2 | ======= | 2 | ======= |
| 3 | [](https://circleci.com/gh/enderlabs/eventmq) | 3 | [](https://circleci.com/gh/eventmq/eventmq) |
| 4 | [](https://coveralls.io/github/enderlabs/eventmq?branch=master) | 4 | [](https://coveralls.io/github/eventmq/eventmq) |
| 5 | 5 | ||
| 6 | # Overview | 6 | # Overview |
| 7 | EventMQ is a message passing system built on [ZeroMQ](https://zeromq.org) | 7 | EventMQ is a message passing system built on [ZeroMQ](https://zeromq.org) |
| @@ -17,14 +17,18 @@ pip install eventmq | |||
| 17 | # Support | 17 | # Support |
| 18 | ## Documenation | 18 | ## Documenation |
| 19 | 19 | ||
| 20 | [Documentation](https://enderlabs.github.io/eventmq/) | 20 | [Documentation](https://eventmq.github.io/eventmq/) |
| 21 | 21 | ||
| 22 | ## Mailing Lists | 22 | ## Mailing Lists |
| 23 | User Support: http://lists.eventmq.io/listinfo.cgi/eventmq-users-eventmq.io | 23 | User Support: http://lists.eventmq.io/listinfo.cgi/eventmq-users-eventmq.io |
| 24 | 24 | ||
| 25 | Development Discussion: http://lists.eventmq.io/listinfo.cgi/eventmq-devel-eventmq.io | 25 | Development Discussion: http://lists.eventmq.io/listinfo.cgi/eventmq-devel-eventmq.io |
| 26 | 26 | ||
| 27 | ## Quick Start | 27 | ## IRC |
| 28 | |||
| 29 | #eventmq on [irc.freenode.net](https://webchat.freenode.net/?channels=#eventmq) | ||
| 30 | |||
| 31 | # Quick Start | ||
| 28 | 32 | ||
| 29 | my_jerbs.py | 33 | my_jerbs.py |
| 30 | ``` python | 34 | ``` python |
diff --git a/bin/send_msg b/bin/send_msg index b0ea160..19e88de 100755 --- a/bin/send_msg +++ b/bin/send_msg | |||
| @@ -26,7 +26,7 @@ if __name__ == "__main__": | |||
| 26 | 'kwargs': {} | 26 | 'kwargs': {} |
| 27 | }] | 27 | }] |
| 28 | 28 | ||
| 29 | send_request(s, msg, guarantee=True, reply_requested=True, timeout=10) | 29 | send_request(s, msg, guarantee=True, reply_requested=True, timeout=1) |
| 30 | print zmq.POLLOUT | 30 | print zmq.POLLOUT |
| 31 | events = dict(poller.poll(500)) | 31 | events = dict(poller.poll(500)) |
| 32 | print events | 32 | print events |
diff --git a/bin/send_publish b/bin/send_publish index 140be63..50f430e 100755 --- a/bin/send_publish +++ b/bin/send_publish | |||
| @@ -12,13 +12,5 @@ if __name__ == "__main__": | |||
| 12 | s.connect(sys.argv[1]) | 12 | s.connect(sys.argv[1]) |
| 13 | 13 | ||
| 14 | msg = ['update', '1'] | 14 | msg = ['update', '1'] |
| 15 | j = 0 | 15 | topic = str('topic') |
| 16 | while True: | 16 | send_publish_request(s, topic, msg) |
| 17 | |||
| 18 | for i in xrange(10): | ||
| 19 | topic = str(i) | ||
| 20 | send_publish_request(s, topic, msg) | ||
| 21 | |||
| 22 | |||
| 23 | print j | ||
| 24 | j += 1 | ||
diff --git a/docs/settings_file.rst b/docs/settings_file.rst index cd37b6a..17d3b67 100644 --- a/docs/settings_file.rst +++ b/docs/settings_file.rst | |||
| @@ -33,13 +33,14 @@ running. Grouping similar jobs in named queues will help you tune this number. | |||
| 33 | 33 | ||
| 34 | queues | 34 | queues |
| 35 | ====== | 35 | ====== |
| 36 | Default: (10, default) | 36 | Default: [[10, "default"]] |
| 37 | 37 | ||
| 38 | Semi-colon seperated list of queues to process jobs for with thier | 38 | Comma seperated list of queues to process jobs for with thier weights. This list |
| 39 | weights. Example: ``queues=(10, data_process); (15, email)``. With these | 39 | must be valid JSON otherwise an error will be thrown. |
| 40 | Example: ``queues=[[10, "data_process"], [15, "email"]]``. With these | ||
| 40 | weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers | 41 | weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers |
| 41 | running jobs locally pretty efficiently. If you have a larger box with a weight | 42 | running jobs locally efficiently. If you have a larger server with a weight of |
| 42 | of 50 on q1 and 8 concurrent jobs and a smaller box with a weight 30 and 4 | 43 | 50 on q1 and 8 concurrent jobs and a smaller server with a weight 30 and 4 |
| 43 | concurrent jobs, the q1 jobs will be sent to the large box until it is no longer | 44 | concurrent jobs, the q1 jobs will be sent to the large box until it is no longer |
| 44 | accepting jobs. At this point jobs will start to be sent to the next highest | 45 | accepting jobs. At this point jobs will start to be sent to the next highest |
| 45 | number until the large box is ready to accept another q1 job. | 46 | number until the large box is ready to accept another q1 job. |
diff --git a/eventmq/__init__.py b/eventmq/__init__.py index 8ec8043..ca9b517 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | __author__ = 'EventMQ Contributors' | 1 | __author__ = 'EventMQ Contributors' |
| 2 | __version__ = '0.3-rc10' | 2 | __version__ = '0.3.1' |
| 3 | 3 | ||
| 4 | PROTOCOL_VERSION = 'eMQP/1.0' | 4 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 5 | 5 | ||
diff --git a/eventmq/constants.py b/eventmq/constants.py index 8dbf530..17c6130 100644 --- a/eventmq/constants.py +++ b/eventmq/constants.py | |||
| @@ -7,6 +7,7 @@ class STATUS(object): | |||
| 7 | connected = 201 | 7 | connected = 201 |
| 8 | stopping = 300 | 8 | stopping = 300 |
| 9 | stopped = 301 | 9 | stopped = 301 |
| 10 | running = 400 # Running and accepting jobs actively | ||
| 10 | 11 | ||
| 11 | 12 | ||
| 12 | class CLIENT_TYPE(object): | 13 | class CLIENT_TYPE(object): |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 12629ee..ea5af07 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -27,7 +27,7 @@ import zmq | |||
| 27 | 27 | ||
| 28 | from eventmq.log import setup_logger | 28 | from eventmq.log import setup_logger |
| 29 | from . import conf | 29 | from . import conf |
| 30 | from .constants import KBYE | 30 | from .constants import KBYE, STATUS |
| 31 | from .poller import Poller, POLLIN | 31 | from .poller import Poller, POLLIN |
| 32 | from .sender import Sender | 32 | from .sender import Sender |
| 33 | from .utils.classes import EMQPService, HeartbeatMixin | 33 | from .utils.classes import EMQPService, HeartbeatMixin |
| @@ -37,6 +37,7 @@ from .utils.messages import send_emqp_message as sendmsg | |||
| 37 | from .utils.settings import import_settings | 37 | from .utils.settings import import_settings |
| 38 | from .worker import MultiprocessWorker as Worker | 38 | from .worker import MultiprocessWorker as Worker |
| 39 | 39 | ||
| 40 | |||
| 40 | if sys.version[0] == '2': | 41 | if sys.version[0] == '2': |
| 41 | import Queue | 42 | import Queue |
| 42 | else: | 43 | else: |
| @@ -124,9 +125,12 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 124 | for i in range(0, len(self.workers)): | 125 | for i in range(0, len(self.workers)): |
| 125 | self.send_ready() | 126 | self.send_ready() |
| 126 | 127 | ||
| 128 | self.status = STATUS.running | ||
| 129 | |||
| 127 | while True: | 130 | while True: |
| 128 | # Clear any workers if it's time to shut down | 131 | # Clear any workers if it's time to shut down |
| 129 | if self.received_disconnect: | 132 | if self.received_disconnect: |
| 133 | self.status = STATUS.stopping | ||
| 130 | for _ in range(0, len(self.workers)): | 134 | for _ in range(0, len(self.workers)): |
| 131 | logger.debug('Requesting worker death...') | 135 | logger.debug('Requesting worker death...') |
| 132 | self.request_queue.put_nowait('DONE') | 136 | self.request_queue.put_nowait('DONE') |
| @@ -253,7 +257,8 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 253 | in :meth:`self.process_message` as every message is counted as a | 257 | in :meth:`self.process_message` as every message is counted as a |
| 254 | HEARTBEAT | 258 | HEARTBEAT |
| 255 | """ | 259 | """ |
| 256 | self.check_worker_health() | 260 | if self.status == STATUS.running: |
| 261 | self.check_worker_health() | ||
| 257 | 262 | ||
| 258 | def check_worker_health(self): | 263 | def check_worker_health(self): |
| 259 | """ | 264 | """ |
| @@ -282,10 +287,11 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 282 | 287 | ||
| 283 | def sighup_handler(self, signum, frame): | 288 | def sighup_handler(self, signum, frame): |
| 284 | logger.info('Caught signal %s' % signum) | 289 | logger.info('Caught signal %s' % signum) |
| 285 | self.outgoing.rebuild() | ||
| 286 | import_settings() | 290 | import_settings() |
| 287 | import_settings(section='jobmanager') | 291 | import_settings(section='jobmanager') |
| 288 | self.start(addr=conf.WORKER_ADDR) | 292 | |
| 293 | self.should_reset = True | ||
| 294 | self.received_disconnect = True | ||
| 289 | 295 | ||
| 290 | def sigterm_handler(self, signum, frame): | 296 | def sigterm_handler(self, signum, frame): |
| 291 | logger.info('Shutting down..') | 297 | logger.info('Shutting down..') |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 8e0f5ac..4bac781 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -401,7 +401,6 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 401 | Returns: | 401 | Returns: |
| 402 | int: unique hash for the job | 402 | int: unique hash for the job |
| 403 | """ | 403 | """ |
| 404 | |||
| 405 | # Get the job portion of the message | 404 | # Get the job portion of the message |
| 406 | msg = deserialize(message[3])[1] | 405 | msg = deserialize(message[3])[1] |
| 407 | 406 | ||
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py index d0497bc..db82843 100644 --- a/eventmq/tests/test_jobmanager.py +++ b/eventmq/tests/test_jobmanager.py | |||
| @@ -111,27 +111,18 @@ class TestCase(unittest.TestCase): | |||
| 111 | self.assertTrue(jm.received_disconnect, "Did not receive disconnect.") | 111 | self.assertTrue(jm.received_disconnect, "Did not receive disconnect.") |
| 112 | 112 | ||
| 113 | # Other Tests | 113 | # Other Tests |
| 114 | @mock.patch('eventmq.jobmanager.JobManager.start') | ||
| 115 | @mock.patch('eventmq.jobmanager.import_settings') | 114 | @mock.patch('eventmq.jobmanager.import_settings') |
| 116 | @mock.patch('eventmq.jobmanager.Sender.rebuild') | 115 | def test_sighup_handler(self, import_settings_mock): |
| 117 | def test_sighup_handler(self, rebuild_mock, import_settings_mock, | ||
| 118 | start_mock): | ||
| 119 | jm = jobmanager.JobManager() | 116 | jm = jobmanager.JobManager() |
| 120 | 117 | ||
| 121 | jm.sighup_handler(982374, "FRAMEY the frame") | 118 | jm.sighup_handler(982374, "FRAMEY the frame") |
| 122 | 119 | ||
| 123 | self.assertTrue(rebuild_mock.called) | ||
| 124 | |||
| 125 | # called once for the default settings, once for the jobmanager | 120 | # called once for the default settings, once for the jobmanager |
| 126 | # settings | 121 | # settings |
| 127 | self.assertEqual(2, import_settings_mock.call_count) | 122 | self.assertEqual(2, import_settings_mock.call_count) |
| 128 | # check to see if the last call was called with the jobmanager section | 123 | # check to see if the last call was called with the jobmanager section |
| 129 | import_settings_mock.assert_called_with(section='jobmanager') | 124 | import_settings_mock.assert_called_with(section='jobmanager') |
| 130 | 125 | ||
| 131 | start_mock.assert_called_with( | ||
| 132 | addr=conf.WORKER_ADDR, | ||
| 133 | ) | ||
| 134 | |||
| 135 | @mock.patch('eventmq.jobmanager.sendmsg') | 126 | @mock.patch('eventmq.jobmanager.sendmsg') |
| 136 | def test_sigterm_handler(self, sendmsg_mock): | 127 | def test_sigterm_handler(self, sendmsg_mock): |
| 137 | jm = jobmanager.JobManager() | 128 | jm = jobmanager.JobManager() |
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index d1ca275..39c600b 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -119,6 +119,7 @@ class EMQPService(object): | |||
| 119 | self.poller.register(self.outgoing, poller.POLLIN) | 119 | self.poller.register(self.outgoing, poller.POLLIN) |
| 120 | self.awaiting_startup_ack = False | 120 | self.awaiting_startup_ack = False |
| 121 | self.received_disconnect = False | 121 | self.received_disconnect = False |
| 122 | self.should_reset = False | ||
| 122 | 123 | ||
| 123 | self.status = constants.STATUS.ready | 124 | self.status = constants.STATUS.ready |
| 124 | 125 | ||
| @@ -165,7 +166,7 @@ class EMQPService(object): | |||
| 165 | 166 | ||
| 166 | # When we return, soemthing has gone wrong and try to reconnect | 167 | # When we return, soemthing has gone wrong and try to reconnect |
| 167 | # unless self.received_disconnect is True | 168 | # unless self.received_disconnect is True |
| 168 | if not self.received_disconnect: | 169 | if not self.received_disconnect or self.should_reset: |
| 169 | self.reset() | 170 | self.reset() |
| 170 | 171 | ||
| 171 | logger.info('Death.') | 172 | logger.info('Death.') |
diff --git a/eventmq/worker.py b/eventmq/worker.py index 7729120..8d25c63 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -21,13 +21,29 @@ from importlib import import_module | |||
| 21 | import logging | 21 | import logging |
| 22 | from multiprocessing import Process | 22 | from multiprocessing import Process |
| 23 | import os | 23 | import os |
| 24 | from threading import Thread | 24 | from threading import Event, Thread |
| 25 | 25 | ||
| 26 | from . import conf | 26 | from . import conf |
| 27 | 27 | ||
| 28 | logger = logging.getLogger(__name__) | 28 | logger = logging.getLogger(__name__) |
| 29 | 29 | ||
| 30 | 30 | ||
| 31 | class StoppableThread(Thread): | ||
| 32 | """Thread class with a stop() method. The thread itself has to check | ||
| 33 | regularly for the stopped() condition.""" | ||
| 34 | |||
| 35 | def __init__(self, target, name=None, args=()): | ||
| 36 | super(StoppableThread, self).__init__(name=name, target=target, | ||
| 37 | args=args) | ||
| 38 | self._stop = Event() | ||
| 39 | |||
| 40 | def stop(self): | ||
| 41 | self._stop.set() | ||
| 42 | |||
| 43 | def stopped(self): | ||
| 44 | return self._stop.isSet() | ||
| 45 | |||
| 46 | |||
| 31 | class MultiprocessWorker(Process): | 47 | class MultiprocessWorker(Process): |
| 32 | """ | 48 | """ |
| 33 | Defines a worker that spans the job in a multiprocessing task | 49 | Defines a worker that spans the job in a multiprocessing task |
| @@ -60,12 +76,13 @@ class MultiprocessWorker(Process): | |||
| 60 | 76 | ||
| 61 | try: | 77 | try: |
| 62 | if timeout: | 78 | if timeout: |
| 63 | worker_thread = Thread(target=_run, | 79 | worker_thread = StoppableThread(target=_run, |
| 64 | args=(payload['params'], )) | 80 | args=(payload['params'], )) |
| 65 | worker_thread.start() | 81 | worker_thread.start() |
| 66 | worker_thread.join(timeout) | 82 | worker_thread.join(timeout) |
| 67 | 83 | ||
| 68 | if worker_thread.isAlive(): | 84 | if worker_thread.isAlive(): |
| 85 | worker_thread.stop() | ||
| 69 | resp['return'] = 'TimeoutError' | 86 | resp['return'] = 'TimeoutError' |
| 70 | else: | 87 | else: |
| 71 | resp['return'] = 'DONE' | 88 | resp['return'] = 'DONE' |
| @@ -80,6 +97,9 @@ class MultiprocessWorker(Process): | |||
| 80 | if self.job_count > conf.MAX_JOB_COUNT: | 97 | if self.job_count > conf.MAX_JOB_COUNT: |
| 81 | break | 98 | break |
| 82 | 99 | ||
| 100 | if resp['return'] == 'TimeoutError': | ||
| 101 | break | ||
| 102 | |||
| 83 | logger.debug("Worker death, PID: {}".format(os.getpid())) | 103 | logger.debug("Worker death, PID: {}".format(os.getpid())) |
| 84 | 104 | ||
| 85 | 105 | ||
| @@ -7,8 +7,8 @@ from setuptools import find_packages, setup | |||
| 7 | 7 | ||
| 8 | setup( | 8 | setup( |
| 9 | name='eventmq', | 9 | name='eventmq', |
| 10 | version='0.3-rc10', | 10 | version='0.3.1', |
| 11 | description='EventMQ messaging system based on ZeroMQ', | 11 | description='EventMQ job execution and messaging system based on ZeroMQ', |
| 12 | packages=find_packages(), | 12 | packages=find_packages(), |
| 13 | install_requires=['pyzmq==16.0.2', | 13 | install_requires=['pyzmq==16.0.2', |
| 14 | 'six==1.10.0', | 14 | 'six==1.10.0', |
| @@ -32,7 +32,7 @@ setup( | |||
| 32 | 'mock==1.3.0'], | 32 | 'mock==1.3.0'], |
| 33 | }, | 33 | }, |
| 34 | author='EventMQ Contributors', | 34 | author='EventMQ Contributors', |
| 35 | url='https://github.com/enderlabs/eventmq/', | 35 | url='https://github.com/eventmq/eventmq/', |
| 36 | 36 | ||
| 37 | # See https://pypi.python.org/pypi?%3Aaction=list_classifiers | 37 | # See https://pypi.python.org/pypi?%3Aaction=list_classifiers |
| 38 | classifiers=[ | 38 | classifiers=[ |
| @@ -53,6 +53,8 @@ setup( | |||
| 53 | # that you indicate whether you support Python 2, Python 3 or both. | 53 | # that you indicate whether you support Python 2, Python 3 or both. |
| 54 | 'Programming Language :: Python :: 2.6', | 54 | 'Programming Language :: Python :: 2.6', |
| 55 | 'Programming Language :: Python :: 2.7', | 55 | 'Programming Language :: Python :: 2.7', |
| 56 | 'Programming Language :: Python :: 3.4', | ||
| 57 | 'Programming Language :: Python :: 3.5', | ||
| 56 | ], | 58 | ], |
| 57 | scripts=[ | 59 | scripts=[ |
| 58 | 'bin/emq-cli', | 60 | 'bin/emq-cli', |