diff options
| -rw-r--r-- | etc/eventmq.conf-dist | 15 | ||||
| -rw-r--r-- | eventmq/__init__.py | 2 | ||||
| -rw-r--r-- | eventmq/conf.py | 9 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 2 | ||||
| -rw-r--r-- | eventmq/router.py | 7 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 2 | ||||
| -rw-r--r-- | eventmq/tests/test_worker.py | 139 | ||||
| -rw-r--r-- | eventmq/worker.py | 38 | ||||
| -rw-r--r-- | setup.py | 2 |
9 files changed, 199 insertions, 17 deletions
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist index 43ec6f2..9a59f2a 100644 --- a/etc/eventmq.conf-dist +++ b/etc/eventmq.conf-dist | |||
| @@ -19,9 +19,22 @@ scheduler_addr=tcp://eventmq:47291 | |||
| 19 | 19 | ||
| 20 | [jobmanager] | 20 | [jobmanager] |
| 21 | worker_addr=tcp://127.0.0.1:47290 | 21 | worker_addr=tcp://127.0.0.1:47290 |
| 22 | queues=[[50,"google"], [40,"pushes"], [10,"default"]] | 22 | |
| 23 | # Defines the weight and name of queues this worker deals with. | ||
| 24 | queues=[[20,"heavy-cpu"], [30,"low-cpu"], [10,"default"]] | ||
| 25 | |||
| 26 | # Specifies the number of of sub-processes to spawn to process jobs concurrently | ||
| 23 | concurrent_jobs=2 | 27 | concurrent_jobs=2 |
| 24 | 28 | ||
| 29 | # This function is executed when EventMQ creates a new worker subprocess | ||
| 30 | # subprocess_setup_func = path.to.my_setup_function | ||
| 31 | |||
| 32 | # This function is executed before every job | ||
| 33 | # job_entry_func = path.to.my_job_setup_function | ||
| 34 | |||
| 35 | # This function is executed after every job | ||
| 36 | # job_exit_func = path.to.my_job_teardown_function | ||
| 37 | |||
| 25 | [publisher] | 38 | [publisher] |
| 26 | publisher_incoming_addr=tcp://0.0.0.0:47298 | 39 | publisher_incoming_addr=tcp://0.0.0.0:47298 |
| 27 | publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file | 40 | publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file |
diff --git a/eventmq/__init__.py b/eventmq/__init__.py index aefc3a2..2e180ed 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | __author__ = 'EventMQ Contributors' | 1 | __author__ = 'EventMQ Contributors' |
| 2 | __version__ = '0.3.5.6' | 2 | __version__ = '0.3.6' |
| 3 | 3 | ||
| 4 | PROTOCOL_VERSION = 'eMQP/1.0' | 4 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 5 | 5 | ||
diff --git a/eventmq/conf.py b/eventmq/conf.py index 8609716..715bbb2 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -89,9 +89,18 @@ RQ_PASSWORD = '' | |||
| 89 | MAX_JOB_COUNT = 1024 | 89 | MAX_JOB_COUNT = 1024 |
| 90 | 90 | ||
| 91 | # Path/Callable to run on start of a worker process | 91 | # Path/Callable to run on start of a worker process |
| 92 | # These options are deprecated for the more user-friendly | ||
| 93 | # SUBPROCESS_SETUP_FUNC which can be a full path to a function. | ||
| 92 | SETUP_PATH = '' | 94 | SETUP_PATH = '' |
| 93 | SETUP_CALLABLE = '' | 95 | SETUP_CALLABLE = '' |
| 94 | 96 | ||
| 97 | # Function to run on the start of a new worker subprocess | ||
| 98 | SUBPROCESS_SETUP_FUNC = '' | ||
| 99 | |||
| 100 | # function to be run before the execution of every job | ||
| 101 | JOB_ENTRY_FUNC = '' | ||
| 102 | # function to be run after the execution of every job | ||
| 103 | JOB_EXIT_FUNC = '' | ||
| 95 | # Time to wait after receiving SIGTERM to kill the workers in the jobmanager | 104 | # Time to wait after receiving SIGTERM to kill the workers in the jobmanager |
| 96 | # forecfully | 105 | # forecfully |
| 97 | KILL_GRACE_PERIOD = 300 | 106 | KILL_GRACE_PERIOD = 300 |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 78a080b..f4a928c 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -29,6 +29,7 @@ import time | |||
| 29 | import zmq | 29 | import zmq |
| 30 | 30 | ||
| 31 | from eventmq.log import setup_logger | 31 | from eventmq.log import setup_logger |
| 32 | from . import __version__ | ||
| 32 | from . import conf | 33 | from . import conf |
| 33 | from .constants import KBYE, STATUS | 34 | from .constants import KBYE, STATUS |
| 34 | from .poller import Poller, POLLIN | 35 | from .poller import Poller, POLLIN |
| @@ -80,6 +81,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 80 | #: Define the name of this JobManager instance. Useful to know when | 81 | #: Define the name of this JobManager instance. Useful to know when |
| 81 | #: referring to the logs. | 82 | #: referring to the logs. |
| 82 | self.name = kwargs.pop('name', None) or generate_device_name() | 83 | self.name = kwargs.pop('name', None) or generate_device_name() |
| 84 | logger.info('EventMQ Version {}'.format(__version__)) | ||
| 83 | logger.info('Initializing JobManager {}...'.format(self.name)) | 85 | logger.info('Initializing JobManager {}...'.format(self.name)) |
| 84 | 86 | ||
| 85 | #: keep track of workers | 87 | #: keep track of workers |
diff --git a/eventmq/router.py b/eventmq/router.py index 0b896b3..81ca257 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -23,6 +23,7 @@ import logging | |||
| 23 | import signal | 23 | import signal |
| 24 | 24 | ||
| 25 | from eventmq.log import setup_logger, setup_wal_logger | 25 | from eventmq.log import setup_logger, setup_wal_logger |
| 26 | from . import __version__ | ||
| 26 | from . import conf, constants, exceptions, poller, receiver | 27 | from . import conf, constants, exceptions, poller, receiver |
| 27 | from .constants import ( | 28 | from .constants import ( |
| 28 | CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS, | 29 | CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS, |
| @@ -51,8 +52,11 @@ class Router(HeartbeatMixin): | |||
| 51 | def __init__(self, *args, **kwargs): | 52 | def __init__(self, *args, **kwargs): |
| 52 | super(Router, self).__init__(*args, **kwargs) # Creates _meta | 53 | super(Router, self).__init__(*args, **kwargs) # Creates _meta |
| 53 | 54 | ||
| 55 | setup_logger("eventmq") | ||
| 56 | |||
| 54 | self.name = generate_device_name() | 57 | self.name = generate_device_name() |
| 55 | logger.info('Initializing Router %s...' % self.name) | 58 | logger.info('EventMQ Version {}'.format(__version__)) |
| 59 | logger.info('Initializing Router {}...'.format(self.name)) | ||
| 56 | 60 | ||
| 57 | self.poller = poller.Poller() | 61 | self.poller = poller.Poller() |
| 58 | 62 | ||
| @@ -943,7 +947,6 @@ class Router(HeartbeatMixin): | |||
| 943 | """ | 947 | """ |
| 944 | Kick off router with logging and settings import | 948 | Kick off router with logging and settings import |
| 945 | """ | 949 | """ |
| 946 | setup_logger('eventmq') | ||
| 947 | import_settings() | 950 | import_settings() |
| 948 | setup_wal_logger('eventmq-wal', conf.WAL) | 951 | setup_wal_logger('eventmq-wal', conf.WAL) |
| 949 | self.start(frontend_addr=conf.FRONTEND_ADDR, | 952 | self.start(frontend_addr=conf.FRONTEND_ADDR, |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 32c9b46..2ff726a 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -29,6 +29,7 @@ from six import next | |||
| 29 | 29 | ||
| 30 | from eventmq.log import setup_logger | 30 | from eventmq.log import setup_logger |
| 31 | 31 | ||
| 32 | from . import __version__ | ||
| 32 | from . import conf, constants | 33 | from . import conf, constants |
| 33 | from .client.messages import send_request | 34 | from .client.messages import send_request |
| 34 | from .constants import KBYE | 35 | from .constants import KBYE |
| @@ -53,6 +54,7 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 53 | def __init__(self, *args, **kwargs): | 54 | def __init__(self, *args, **kwargs): |
| 54 | self.name = kwargs.get('name', None) | 55 | self.name = kwargs.get('name', None) |
| 55 | 56 | ||
| 57 | logger.info('EventMQ Version {}'.format(__version__)) | ||
| 56 | logger.info('Initializing Scheduler...') | 58 | logger.info('Initializing Scheduler...') |
| 57 | import_settings() | 59 | import_settings() |
| 58 | super(Scheduler, self).__init__(*args, **kwargs) | 60 | super(Scheduler, self).__init__(*args, **kwargs) |
diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py index 5e72f39..ec13be6 100644 --- a/eventmq/tests/test_worker.py +++ b/eventmq/tests/test_worker.py | |||
| @@ -14,11 +14,21 @@ | |||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. |
| 15 | 15 | ||
| 16 | import logging | 16 | import logging |
| 17 | import sys | ||
| 17 | import time | 18 | import time |
| 18 | 19 | ||
| 20 | import mock | ||
| 21 | |||
| 19 | from .. import worker | 22 | from .. import worker |
| 20 | 23 | ||
| 24 | if sys.version[0] == '2': | ||
| 25 | import Queue | ||
| 26 | else: | ||
| 27 | import queue as Queue | ||
| 28 | |||
| 29 | |||
| 21 | ADDR = 'inproc://pour_the_rice_in_the_thing' | 30 | ADDR = 'inproc://pour_the_rice_in_the_thing' |
| 31 | SETUP_SUCCESS_RETVAL = 'job setup success' | ||
| 22 | 32 | ||
| 23 | 33 | ||
| 24 | def test_run_with_timeout(): | 34 | def test_run_with_timeout(): |
| @@ -33,11 +43,124 @@ def test_run_with_timeout(): | |||
| 33 | assert msgid | 43 | assert msgid |
| 34 | 44 | ||
| 35 | 45 | ||
| 46 | @mock.patch('eventmq.worker.callable_from_name') | ||
| 47 | def test_run_job_setup_hook(callable_from_name_mock): | ||
| 48 | from eventmq import conf | ||
| 49 | |||
| 50 | setup_func_str = 'eventmq.tests.test_worker.job_setup_hook' | ||
| 51 | callable_from_name_mock.return_value = mock.Mock() | ||
| 52 | |||
| 53 | payload = { | ||
| 54 | 'path': 'eventmq.tests.test_worker', | ||
| 55 | 'callable': 'job', | ||
| 56 | 'args': [2] | ||
| 57 | } | ||
| 58 | |||
| 59 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 60 | |||
| 61 | q.put(payload) | ||
| 62 | q.put('DONE') | ||
| 63 | |||
| 64 | try: | ||
| 65 | conf.JOB_ENTRY_FUNC = setup_func_str | ||
| 66 | worker._run(q, res_q, logging.getLogger()) | ||
| 67 | finally: | ||
| 68 | conf.JOB_ENTRY_FUNC = '' | ||
| 69 | |||
| 70 | callable_from_name_mock.assert_called_with(setup_func_str) | ||
| 71 | assert callable_from_name_mock.return_value.call_count == 1 | ||
| 72 | |||
| 73 | |||
| 74 | @mock.patch('eventmq.worker.callable_from_name') | ||
| 75 | def test_run_job_teardown_hook(callable_from_name_mock): | ||
| 76 | from eventmq import conf | ||
| 77 | |||
| 78 | teardown_func_str = 'eventmq.tests.test_worker.job_teardown_hook' | ||
| 79 | callable_from_name_mock.return_value = mock.Mock() | ||
| 80 | |||
| 81 | payload = { | ||
| 82 | 'path': 'eventmq.tests.test_worker', | ||
| 83 | 'callable': 'job', | ||
| 84 | 'args': [2] | ||
| 85 | } | ||
| 86 | |||
| 87 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 88 | |||
| 89 | q.put(payload) | ||
| 90 | q.put('DONE') | ||
| 91 | |||
| 92 | try: | ||
| 93 | conf.JOB_EXIT_FUNC = teardown_func_str | ||
| 94 | worker._run(q, res_q, logging.getLogger()) | ||
| 95 | finally: | ||
| 96 | conf.JOB_EXIT_FUNC = '' | ||
| 97 | |||
| 98 | callable_from_name_mock.assert_called_with(teardown_func_str) | ||
| 99 | assert callable_from_name_mock.return_value.call_count == 1 | ||
| 100 | |||
| 101 | |||
| 102 | @mock.patch('eventmq.worker.callable_from_name') | ||
| 103 | def test_run_subprocess_setup_func(callable_from_name_mock): | ||
| 104 | from eventmq import conf | ||
| 105 | |||
| 106 | setup_func_str = 'eventmq.tests.test_worker.process_setup_hook' | ||
| 107 | callable_from_name_mock.return_value = mock.Mock() | ||
| 108 | |||
| 109 | payload = { | ||
| 110 | 'path': 'eventmq.tests.test_worker', | ||
| 111 | 'callable': 'job', | ||
| 112 | 'args': [2] | ||
| 113 | } | ||
| 114 | |||
| 115 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 116 | |||
| 117 | q.put(payload) | ||
| 118 | q.put('DONE') | ||
| 119 | |||
| 120 | try: | ||
| 121 | conf.SUBPROCESS_SETUP_FUNC = setup_func_str | ||
| 122 | worker._run(q, res_q, logging.getLogger()) | ||
| 123 | finally: | ||
| 124 | conf.SUBPROCESS_SETUP_FUNC = '' | ||
| 125 | |||
| 126 | callable_from_name_mock.assert_called_with(setup_func_str) | ||
| 127 | assert callable_from_name_mock.return_value.call_count == 1 | ||
| 128 | |||
| 129 | |||
| 130 | @mock.patch('eventmq.worker.run_setup') | ||
| 131 | def test_run_run_setup_func(run_setup_mock): | ||
| 132 | from eventmq import conf | ||
| 133 | |||
| 134 | setup_func_path = 'eventmq.tests.test_worker' | ||
| 135 | setup_func_callable = 'process_setup_hook' | ||
| 136 | |||
| 137 | payload = { | ||
| 138 | 'path': 'eventmq.tests.test_worker', | ||
| 139 | 'callable': 'job', | ||
| 140 | 'args': [2] | ||
| 141 | } | ||
| 142 | |||
| 143 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 144 | |||
| 145 | q.put(payload) | ||
| 146 | q.put('DONE') | ||
| 147 | |||
| 148 | try: | ||
| 149 | conf.SETUP_PATH = setup_func_path | ||
| 150 | conf.SETUP_CALLABLE = setup_func_callable | ||
| 151 | worker._run(q, res_q, logging.getLogger()) | ||
| 152 | finally: | ||
| 153 | conf.SETUP_PATH = '' | ||
| 154 | conf.SETUP_CALLABLE = '' | ||
| 155 | |||
| 156 | run_setup_mock.assert_called_with(setup_func_path, setup_func_callable) | ||
| 157 | |||
| 158 | |||
| 36 | def test_run_setup(): | 159 | def test_run_setup(): |
| 37 | setup_callable = 'pre_hook' | 160 | setup_callable = 'process_setup_hook' |
| 38 | setup_path = 'eventmq.tests.test_worker' | 161 | setup_path = 'eventmq.tests.test_worker' |
| 39 | 162 | ||
| 40 | worker.run_setup(setup_path, setup_callable) | 163 | assert worker.run_setup(setup_path, setup_callable) |
| 41 | 164 | ||
| 42 | 165 | ||
| 43 | def job(sleep_time=0): | 166 | def job(sleep_time=0): |
| @@ -46,9 +169,13 @@ def job(sleep_time=0): | |||
| 46 | return True | 169 | return True |
| 47 | 170 | ||
| 48 | 171 | ||
| 49 | def pre_hook(): | 172 | def process_setup_hook(): |
| 50 | return 1 | 173 | return True |
| 51 | 174 | ||
| 52 | 175 | ||
| 53 | def post_hook(): | 176 | def job_setup_hook(): |
| 54 | return 1 | 177 | return SETUP_SUCCESS_RETVAL |
| 178 | |||
| 179 | |||
| 180 | def job_teardown_hook(): | ||
| 181 | return True | ||
diff --git a/eventmq/worker.py b/eventmq/worker.py index 1621578..1f3a6fa 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -18,17 +18,14 @@ | |||
| 18 | Defines different short-lived workers that execute jobs | 18 | Defines different short-lived workers that execute jobs |
| 19 | """ | 19 | """ |
| 20 | from importlib import import_module | 20 | from importlib import import_module |
| 21 | |||
| 22 | import logging | 21 | import logging |
| 23 | |||
| 24 | from multiprocessing import Process | 22 | from multiprocessing import Process |
| 25 | |||
| 26 | import os | 23 | import os |
| 27 | import sys | 24 | import sys |
| 28 | |||
| 29 | from threading import Thread | 25 | from threading import Thread |
| 30 | 26 | ||
| 31 | from . import conf | 27 | from . import conf |
| 28 | from .utils.functions import callable_from_name | ||
| 32 | 29 | ||
| 33 | if sys.version[0] == '2': | 30 | if sys.version[0] == '2': |
| 34 | import Queue | 31 | import Queue |
| @@ -180,7 +177,19 @@ def _run(queue, result_queue, logger): | |||
| 180 | "class_kwargs": {"value": 2} | 177 | "class_kwargs": {"value": 2} |
| 181 | } | 178 | } |
| 182 | """ | 179 | """ |
| 183 | if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): | 180 | if any(conf.SUBPROCESS_SETUP_FUNC): |
| 181 | try: | ||
| 182 | logger.debug("Running setup ({}) for worker id {}".format( | ||
| 183 | conf.SUBPROCESS_SETUP_FUNC, os.getpid())) | ||
| 184 | setup_func = callable_from_name(conf.SUBPROCESS_SETUP_FUNC) | ||
| 185 | setup_func() | ||
| 186 | except Exception as e: | ||
| 187 | logger.warning('Unable to do setup task ({}): {}' | ||
| 188 | .format(conf.SUBPROCESS_SETUP_FUNC, str(e))) | ||
| 189 | |||
| 190 | elif any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): | ||
| 191 | logger.warning("SETUP_CALLABLE and SETUP_PATH deprecated in favor for " | ||
| 192 | "SUBPROCESS_SETUP_FUNC") | ||
| 184 | try: | 193 | try: |
| 185 | logger.debug("Running setup ({}.{}) for worker id {}" | 194 | logger.debug("Running setup ({}.{}) for worker id {}" |
| 186 | .format( | 195 | .format( |
| @@ -193,6 +202,16 @@ def _run(queue, result_queue, logger): | |||
| 193 | .format(conf.SETUP_PATH, | 202 | .format(conf.SETUP_PATH, |
| 194 | conf.SETUP_CALLABLE, str(e))) | 203 | conf.SETUP_CALLABLE, str(e))) |
| 195 | 204 | ||
| 205 | if conf.JOB_ENTRY_FUNC: | ||
| 206 | job_entry_func = callable_from_name(conf.JOB_ENTRY_FUNC) | ||
| 207 | else: | ||
| 208 | job_entry_func = None | ||
| 209 | |||
| 210 | if conf.JOB_EXIT_FUNC: | ||
| 211 | job_exit_func = callable_from_name(conf.JOB_EXIT_FUNC) | ||
| 212 | else: | ||
| 213 | job_exit_func = None | ||
| 214 | |||
| 196 | while True: | 215 | while True: |
| 197 | # Blocking get so we don't spin cycles reading over and over | 216 | # Blocking get so we don't spin cycles reading over and over |
| 198 | try: | 217 | try: |
| @@ -204,7 +223,14 @@ def _run(queue, result_queue, logger): | |||
| 204 | if payload == 'DONE': | 223 | if payload == 'DONE': |
| 205 | break | 224 | break |
| 206 | 225 | ||
| 226 | if job_entry_func: | ||
| 227 | job_entry_func() | ||
| 228 | |||
| 207 | return_val = _run_job(payload, logger) | 229 | return_val = _run_job(payload, logger) |
| 230 | |||
| 231 | if job_exit_func: | ||
| 232 | job_exit_func() | ||
| 233 | |||
| 208 | # Signal that we're done with this job and put its return value on the | 234 | # Signal that we're done with this job and put its return value on the |
| 209 | # result queue | 235 | # result queue |
| 210 | result_queue.put(return_val) | 236 | result_queue.put(return_val) |
| @@ -278,4 +304,4 @@ def run_setup(setup_path, setup_callable): | |||
| 278 | 304 | ||
| 279 | setup_callable_ = getattr(setup_package, setup_callable) | 305 | setup_callable_ = getattr(setup_package, setup_callable) |
| 280 | 306 | ||
| 281 | setup_callable_() | 307 | return setup_callable_() |
| @@ -7,7 +7,7 @@ from setuptools import find_packages, setup | |||
| 7 | 7 | ||
| 8 | setup( | 8 | setup( |
| 9 | name='eventmq', | 9 | name='eventmq', |
| 10 | version='0.3.5.6', | 10 | version='0.3.6', |
| 11 | description='EventMQ job execution and messaging system based on ZeroMQ', | 11 | description='EventMQ job execution and messaging system based on ZeroMQ', |
| 12 | packages=find_packages(), | 12 | packages=find_packages(), |
| 13 | install_requires=['pyzmq==15.4.0', | 13 | install_requires=['pyzmq==15.4.0', |