diff options
| -rw-r--r-- | etc/eventmq.conf-dist | 15 | ||||
| -rw-r--r-- | eventmq/__init__.py | 2 | ||||
| -rw-r--r-- | eventmq/conf.py | 9 | ||||
| -rw-r--r-- | eventmq/tests/test_worker.py | 142 | ||||
| -rw-r--r-- | eventmq/worker.py | 38 | ||||
| -rw-r--r-- | setup.py | 2 |
6 files changed, 192 insertions, 16 deletions
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist index 43ec6f2..1ff09c4 100644 --- a/etc/eventmq.conf-dist +++ b/etc/eventmq.conf-dist | |||
| @@ -19,9 +19,22 @@ scheduler_addr=tcp://eventmq:47291 | |||
| 19 | 19 | ||
| 20 | [jobmanager] | 20 | [jobmanager] |
| 21 | worker_addr=tcp://127.0.0.1:47290 | 21 | worker_addr=tcp://127.0.0.1:47290 |
| 22 | queues=[[50,"google"], [40,"pushes"], [10,"default"]] | 22 | |
| 23 | # Defines the weight and name of queues this worker deals with. | ||
| 24 | queues=[[20,"heavy-cpu"], [30,"low-cpu"], [10,"default"]] | ||
| 25 | |||
| 26 | # Specifies the number of of sub-processes to spawn to process jobs concurrently | ||
| 23 | concurrent_jobs=2 | 27 | concurrent_jobs=2 |
| 24 | 28 | ||
| 29 | # This function is executed when EventMQ creates a new worker subprocess | ||
| 30 | # subprocess_setup_func = 'path.to.my_setup_function' | ||
| 31 | |||
| 32 | # This function is executed before every job | ||
| 33 | # job_entry_func = 'path.to.my_job_setup_function' | ||
| 34 | |||
| 35 | # This function is executed after every job | ||
| 36 | # job_exit_func = 'path.to.my_job_teardown_function' | ||
| 37 | |||
| 25 | [publisher] | 38 | [publisher] |
| 26 | publisher_incoming_addr=tcp://0.0.0.0:47298 | 39 | publisher_incoming_addr=tcp://0.0.0.0:47298 |
| 27 | publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file | 40 | publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file |
diff --git a/eventmq/__init__.py b/eventmq/__init__.py index aefc3a2..2e180ed 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | __author__ = 'EventMQ Contributors' | 1 | __author__ = 'EventMQ Contributors' |
| 2 | __version__ = '0.3.5.6' | 2 | __version__ = '0.3.6' |
| 3 | 3 | ||
| 4 | PROTOCOL_VERSION = 'eMQP/1.0' | 4 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 5 | 5 | ||
diff --git a/eventmq/conf.py b/eventmq/conf.py index 8609716..715bbb2 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -89,9 +89,18 @@ RQ_PASSWORD = '' | |||
| 89 | MAX_JOB_COUNT = 1024 | 89 | MAX_JOB_COUNT = 1024 |
| 90 | 90 | ||
| 91 | # Path/Callable to run on start of a worker process | 91 | # Path/Callable to run on start of a worker process |
| 92 | # These options are deprecated for the more user-friendly | ||
| 93 | # SUBPROCESS_SETUP_FUNC which can be a full path to a function. | ||
| 92 | SETUP_PATH = '' | 94 | SETUP_PATH = '' |
| 93 | SETUP_CALLABLE = '' | 95 | SETUP_CALLABLE = '' |
| 94 | 96 | ||
| 97 | # Function to run on the start of a new worker subprocess | ||
| 98 | SUBPROCESS_SETUP_FUNC = '' | ||
| 99 | |||
| 100 | # function to be run before the execution of every job | ||
| 101 | JOB_ENTRY_FUNC = '' | ||
| 102 | # function to be run after the execution of every job | ||
| 103 | JOB_EXIT_FUNC = '' | ||
| 95 | # Time to wait after receiving SIGTERM to kill the workers in the jobmanager | 104 | # Time to wait after receiving SIGTERM to kill the workers in the jobmanager |
| 96 | # forecfully | 105 | # forecfully |
| 97 | KILL_GRACE_PERIOD = 300 | 106 | KILL_GRACE_PERIOD = 300 |
diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py index 5e72f39..b8e4f3e 100644 --- a/eventmq/tests/test_worker.py +++ b/eventmq/tests/test_worker.py | |||
| @@ -15,11 +15,19 @@ | |||
| 15 | 15 | ||
| 16 | import logging | 16 | import logging |
| 17 | import time | 17 | import time |
| 18 | import sys | ||
| 18 | 19 | ||
| 20 | import mock | ||
| 19 | from .. import worker | 21 | from .. import worker |
| 20 | 22 | ||
| 21 | ADDR = 'inproc://pour_the_rice_in_the_thing' | 23 | if sys.version[0] == '2': |
| 24 | import Queue | ||
| 25 | else: | ||
| 26 | import queue as Queue | ||
| 27 | |||
| 22 | 28 | ||
| 29 | ADDR = 'inproc://pour_the_rice_in_the_thing' | ||
| 30 | SETUP_SUCCESS_RETVAL = 'job setup success' | ||
| 23 | 31 | ||
| 24 | def test_run_with_timeout(): | 32 | def test_run_with_timeout(): |
| 25 | payload = { | 33 | payload = { |
| @@ -33,11 +41,124 @@ def test_run_with_timeout(): | |||
| 33 | assert msgid | 41 | assert msgid |
| 34 | 42 | ||
| 35 | 43 | ||
| 44 | @mock.patch('eventmq.worker.callable_from_name') | ||
| 45 | def test_run_job_setup_hook(callable_from_name_mock): | ||
| 46 | from eventmq import conf | ||
| 47 | |||
| 48 | setup_func_str = 'eventmq.tests.test_worker.job_setup_hook' | ||
| 49 | callable_from_name_mock.return_value = mock.Mock() | ||
| 50 | |||
| 51 | payload = { | ||
| 52 | 'path': 'eventmq.tests.test_worker', | ||
| 53 | 'callable': 'job', | ||
| 54 | 'args': [2] | ||
| 55 | } | ||
| 56 | |||
| 57 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 58 | |||
| 59 | q.put(payload) | ||
| 60 | q.put('DONE') | ||
| 61 | |||
| 62 | try: | ||
| 63 | conf.JOB_ENTRY_FUNC = setup_func_str | ||
| 64 | worker._run(q, res_q, logging.getLogger()) | ||
| 65 | finally: | ||
| 66 | conf.JOB_ENTRY_FUNC = '' | ||
| 67 | |||
| 68 | callable_from_name_mock.assert_called_with(setup_func_str) | ||
| 69 | assert callable_from_name_mock.return_value.call_count == 1 | ||
| 70 | |||
| 71 | |||
| 72 | @mock.patch('eventmq.worker.callable_from_name') | ||
| 73 | def test_run_job_teardown_hook(callable_from_name_mock): | ||
| 74 | from eventmq import conf | ||
| 75 | |||
| 76 | teardown_func_str = 'eventmq.tests.test_worker.job_teardown_hook' | ||
| 77 | callable_from_name_mock.return_value = mock.Mock() | ||
| 78 | |||
| 79 | payload = { | ||
| 80 | 'path': 'eventmq.tests.test_worker', | ||
| 81 | 'callable': 'job', | ||
| 82 | 'args': [2] | ||
| 83 | } | ||
| 84 | |||
| 85 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 86 | |||
| 87 | q.put(payload) | ||
| 88 | q.put('DONE') | ||
| 89 | |||
| 90 | try: | ||
| 91 | conf.JOB_EXIT_FUNC = teardown_func_str | ||
| 92 | worker._run(q, res_q, logging.getLogger()) | ||
| 93 | finally: | ||
| 94 | conf.JOB_EXIT_FUNC = '' | ||
| 95 | |||
| 96 | callable_from_name_mock.assert_called_with(teardown_func_str) | ||
| 97 | assert callable_from_name_mock.return_value.call_count == 1 | ||
| 98 | |||
| 99 | |||
| 100 | @mock.patch('eventmq.worker.callable_from_name') | ||
| 101 | def test_run_subprocess_setup_func(callable_from_name_mock): | ||
| 102 | from eventmq import conf | ||
| 103 | |||
| 104 | setup_func_str = 'eventmq.tests.test_worker.process_setup_hook' | ||
| 105 | callable_from_name_mock.return_value = mock.Mock() | ||
| 106 | |||
| 107 | payload = { | ||
| 108 | 'path': 'eventmq.tests.test_worker', | ||
| 109 | 'callable': 'job', | ||
| 110 | 'args': [2] | ||
| 111 | } | ||
| 112 | |||
| 113 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 114 | |||
| 115 | q.put(payload) | ||
| 116 | q.put('DONE') | ||
| 117 | |||
| 118 | try: | ||
| 119 | conf.SUBPROCESS_SETUP_FUNC = setup_func_str | ||
| 120 | worker._run(q, res_q, logging.getLogger()) | ||
| 121 | finally: | ||
| 122 | conf.SUBPROCESS_SETUP_FUNC = '' | ||
| 123 | |||
| 124 | callable_from_name_mock.assert_called_with(setup_func_str) | ||
| 125 | assert callable_from_name_mock.return_value.call_count == 1 | ||
| 126 | |||
| 127 | |||
| 128 | @mock.patch('eventmq.worker.run_setup') | ||
| 129 | def test_run_run_setup_func(run_setup_mock): | ||
| 130 | from eventmq import conf | ||
| 131 | |||
| 132 | setup_func_path = 'eventmq.tests.test_worker' | ||
| 133 | setup_func_callable = 'process_setup_hook' | ||
| 134 | |||
| 135 | payload = { | ||
| 136 | 'path': 'eventmq.tests.test_worker', | ||
| 137 | 'callable': 'job', | ||
| 138 | 'args': [2] | ||
| 139 | } | ||
| 140 | |||
| 141 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 142 | |||
| 143 | q.put(payload) | ||
| 144 | q.put('DONE') | ||
| 145 | |||
| 146 | try: | ||
| 147 | conf.SETUP_PATH = setup_func_path | ||
| 148 | conf.SETUP_CALLABLE = setup_func_callable | ||
| 149 | worker._run(q, res_q, logging.getLogger()) | ||
| 150 | finally: | ||
| 151 | conf.SETUP_PATH = '' | ||
| 152 | conf.SETUP_CALLABLE = '' | ||
| 153 | |||
| 154 | run_setup_mock.assert_called_with(setup_func_path, setup_func_callable) | ||
| 155 | |||
| 156 | |||
| 36 | def test_run_setup(): | 157 | def test_run_setup(): |
| 37 | setup_callable = 'pre_hook' | 158 | setup_callable = 'process_setup_hook' |
| 38 | setup_path = 'eventmq.tests.test_worker' | 159 | setup_path = 'eventmq.tests.test_worker' |
| 39 | 160 | ||
| 40 | worker.run_setup(setup_path, setup_callable) | 161 | assert worker.run_setup(setup_path, setup_callable) |
| 41 | 162 | ||
| 42 | 163 | ||
| 43 | def job(sleep_time=0): | 164 | def job(sleep_time=0): |
| @@ -46,9 +167,16 @@ def job(sleep_time=0): | |||
| 46 | return True | 167 | return True |
| 47 | 168 | ||
| 48 | 169 | ||
| 49 | def pre_hook(): | 170 | def process_setup_hook(): |
| 50 | return 1 | 171 | print 'process setup hook executed' |
| 172 | return True | ||
| 51 | 173 | ||
| 52 | 174 | ||
| 53 | def post_hook(): | 175 | def job_setup_hook(): |
| 54 | return 1 | 176 | print 'job setup hook executed' |
| 177 | return SETUP_SUCCESS_RETVAL | ||
| 178 | |||
| 179 | |||
| 180 | def job_teardown_hook(): | ||
| 181 | print 'job teardown hook executed' | ||
| 182 | return True | ||
diff --git a/eventmq/worker.py b/eventmq/worker.py index 1621578..aa03284 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -18,16 +18,13 @@ | |||
| 18 | Defines different short-lived workers that execute jobs | 18 | Defines different short-lived workers that execute jobs |
| 19 | """ | 19 | """ |
| 20 | from importlib import import_module | 20 | from importlib import import_module |
| 21 | |||
| 22 | import logging | 21 | import logging |
| 23 | |||
| 24 | from multiprocessing import Process | 22 | from multiprocessing import Process |
| 25 | |||
| 26 | import os | 23 | import os |
| 27 | import sys | 24 | import sys |
| 28 | |||
| 29 | from threading import Thread | 25 | from threading import Thread |
| 30 | 26 | ||
| 27 | from .utils.functions import callable_from_name | ||
| 31 | from . import conf | 28 | from . import conf |
| 32 | 29 | ||
| 33 | if sys.version[0] == '2': | 30 | if sys.version[0] == '2': |
| @@ -180,7 +177,19 @@ def _run(queue, result_queue, logger): | |||
| 180 | "class_kwargs": {"value": 2} | 177 | "class_kwargs": {"value": 2} |
| 181 | } | 178 | } |
| 182 | """ | 179 | """ |
| 183 | if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): | 180 | if any(conf.SUBPROCESS_SETUP_FUNC): |
| 181 | try: | ||
| 182 | logger.debug("Running setup ({}) for worker id {}".format( | ||
| 183 | conf.SUBPROCESS_SETUP_FUNC, os.getpid())) | ||
| 184 | setup_func = callable_from_name(conf.SUBPROCESS_SETUP_FUNC) | ||
| 185 | setup_func() | ||
| 186 | except Exception as e: | ||
| 187 | logger.warning('Unable to do setup task ({}): {}' | ||
| 188 | .format(conf.SUBPROCESS_SETUP_FUNC, str(e))) | ||
| 189 | |||
| 190 | elif any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): | ||
| 191 | logger.warning("SETUP_CALLABLE and SETUP_PATH deprecated in favor for " | ||
| 192 | "SUBPROCESS_SETUP_FUNC") | ||
| 184 | try: | 193 | try: |
| 185 | logger.debug("Running setup ({}.{}) for worker id {}" | 194 | logger.debug("Running setup ({}.{}) for worker id {}" |
| 186 | .format( | 195 | .format( |
| @@ -193,6 +202,16 @@ def _run(queue, result_queue, logger): | |||
| 193 | .format(conf.SETUP_PATH, | 202 | .format(conf.SETUP_PATH, |
| 194 | conf.SETUP_CALLABLE, str(e))) | 203 | conf.SETUP_CALLABLE, str(e))) |
| 195 | 204 | ||
| 205 | if conf.JOB_ENTRY_FUNC: | ||
| 206 | job_entry_func = callable_from_name(conf.JOB_ENTRY_FUNC) | ||
| 207 | else: | ||
| 208 | job_entry_func = None | ||
| 209 | |||
| 210 | if conf.JOB_EXIT_FUNC: | ||
| 211 | job_exit_func = callable_from_name(conf.JOB_EXIT_FUNC) | ||
| 212 | else: | ||
| 213 | job_exit_func = None | ||
| 214 | |||
| 196 | while True: | 215 | while True: |
| 197 | # Blocking get so we don't spin cycles reading over and over | 216 | # Blocking get so we don't spin cycles reading over and over |
| 198 | try: | 217 | try: |
| @@ -204,7 +223,14 @@ def _run(queue, result_queue, logger): | |||
| 204 | if payload == 'DONE': | 223 | if payload == 'DONE': |
| 205 | break | 224 | break |
| 206 | 225 | ||
| 226 | if job_entry_func: | ||
| 227 | job_entry_func() | ||
| 228 | |||
| 207 | return_val = _run_job(payload, logger) | 229 | return_val = _run_job(payload, logger) |
| 230 | |||
| 231 | if job_exit_func: | ||
| 232 | job_exit_func() | ||
| 233 | |||
| 208 | # Signal that we're done with this job and put its return value on the | 234 | # Signal that we're done with this job and put its return value on the |
| 209 | # result queue | 235 | # result queue |
| 210 | result_queue.put(return_val) | 236 | result_queue.put(return_val) |
| @@ -278,4 +304,4 @@ def run_setup(setup_path, setup_callable): | |||
| 278 | 304 | ||
| 279 | setup_callable_ = getattr(setup_package, setup_callable) | 305 | setup_callable_ = getattr(setup_package, setup_callable) |
| 280 | 306 | ||
| 281 | setup_callable_() | 307 | return setup_callable_() |
| @@ -7,7 +7,7 @@ from setuptools import find_packages, setup | |||
| 7 | 7 | ||
| 8 | setup( | 8 | setup( |
| 9 | name='eventmq', | 9 | name='eventmq', |
| 10 | version='0.3.5.6', | 10 | version='0.3.6', |
| 11 | description='EventMQ job execution and messaging system based on ZeroMQ', | 11 | description='EventMQ job execution and messaging system based on ZeroMQ', |
| 12 | packages=find_packages(), | 12 | packages=find_packages(), |
| 13 | install_requires=['pyzmq==15.4.0', | 13 | install_requires=['pyzmq==15.4.0', |