diff options
| author | jason | 2018-04-10 16:25:27 -0600 |
|---|---|---|
| committer | jason | 2018-04-10 16:25:27 -0600 |
| commit | 57329aba1e888b64d56ac2386466791304cdf86a (patch) | |
| tree | 2390baadba00d8a6051fbf9311fb9862bbdbdcea | |
| parent | 41d02b6ac7b3d1c79d39039204b9238c8997f33f (diff) | |
| download | eventmq-57329aba1e888b64d56ac2386466791304cdf86a.tar.gz eventmq-57329aba1e888b64d56ac2386466791304cdf86a.zip | |
Add job setup and cleanup functions
This adds 2 new config options: `job_entry_func` and `job_exit_func`.
These functions are executed before and after every single job
execution. The rationale behind this is before and after each request
Django cleans up stale database connections, so Django jobs need some
way of running this same setup/cleanup functions.
https://github.com/django/django/blob/master/django/db/__init__.py#L57
```
# Register an event to reset transaction state and close connections past
# their lifetime.
def close_old_connections(**kwargs):
for conn in connections.all():
conn.close_if_unusable_or_obsolete()
signals.request_started.connect(close_old_connections)
signals.request_finished.connect(close_old_connections)
```
fixes: #41
| -rw-r--r-- | etc/eventmq.conf-dist | 15 | ||||
| -rw-r--r-- | eventmq/__init__.py | 2 | ||||
| -rw-r--r-- | eventmq/conf.py | 9 | ||||
| -rw-r--r-- | eventmq/tests/test_worker.py | 142 | ||||
| -rw-r--r-- | eventmq/worker.py | 38 | ||||
| -rw-r--r-- | setup.py | 2 |
6 files changed, 192 insertions, 16 deletions
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist index 43ec6f2..1ff09c4 100644 --- a/etc/eventmq.conf-dist +++ b/etc/eventmq.conf-dist | |||
| @@ -19,9 +19,22 @@ scheduler_addr=tcp://eventmq:47291 | |||
| 19 | 19 | ||
| 20 | [jobmanager] | 20 | [jobmanager] |
| 21 | worker_addr=tcp://127.0.0.1:47290 | 21 | worker_addr=tcp://127.0.0.1:47290 |
| 22 | queues=[[50,"google"], [40,"pushes"], [10,"default"]] | 22 | |
| 23 | # Defines the weight and name of queues this worker deals with. | ||
| 24 | queues=[[20,"heavy-cpu"], [30,"low-cpu"], [10,"default"]] | ||
| 25 | |||
| 26 | # Specifies the number of of sub-processes to spawn to process jobs concurrently | ||
| 23 | concurrent_jobs=2 | 27 | concurrent_jobs=2 |
| 24 | 28 | ||
| 29 | # This function is executed when EventMQ creates a new worker subprocess | ||
| 30 | # subprocess_setup_func = 'path.to.my_setup_function' | ||
| 31 | |||
| 32 | # This function is executed before every job | ||
| 33 | # job_entry_func = 'path.to.my_job_setup_function' | ||
| 34 | |||
| 35 | # This function is executed after every job | ||
| 36 | # job_exit_func = 'path.to.my_job_teardown_function' | ||
| 37 | |||
| 25 | [publisher] | 38 | [publisher] |
| 26 | publisher_incoming_addr=tcp://0.0.0.0:47298 | 39 | publisher_incoming_addr=tcp://0.0.0.0:47298 |
| 27 | publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file | 40 | publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file |
diff --git a/eventmq/__init__.py b/eventmq/__init__.py index aefc3a2..2e180ed 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | __author__ = 'EventMQ Contributors' | 1 | __author__ = 'EventMQ Contributors' |
| 2 | __version__ = '0.3.5.6' | 2 | __version__ = '0.3.6' |
| 3 | 3 | ||
| 4 | PROTOCOL_VERSION = 'eMQP/1.0' | 4 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 5 | 5 | ||
diff --git a/eventmq/conf.py b/eventmq/conf.py index 8609716..715bbb2 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -89,9 +89,18 @@ RQ_PASSWORD = '' | |||
| 89 | MAX_JOB_COUNT = 1024 | 89 | MAX_JOB_COUNT = 1024 |
| 90 | 90 | ||
| 91 | # Path/Callable to run on start of a worker process | 91 | # Path/Callable to run on start of a worker process |
| 92 | # These options are deprecated for the more user-friendly | ||
| 93 | # SUBPROCESS_SETUP_FUNC which can be a full path to a function. | ||
| 92 | SETUP_PATH = '' | 94 | SETUP_PATH = '' |
| 93 | SETUP_CALLABLE = '' | 95 | SETUP_CALLABLE = '' |
| 94 | 96 | ||
| 97 | # Function to run on the start of a new worker subprocess | ||
| 98 | SUBPROCESS_SETUP_FUNC = '' | ||
| 99 | |||
| 100 | # function to be run before the execution of every job | ||
| 101 | JOB_ENTRY_FUNC = '' | ||
| 102 | # function to be run after the execution of every job | ||
| 103 | JOB_EXIT_FUNC = '' | ||
| 95 | # Time to wait after receiving SIGTERM to kill the workers in the jobmanager | 104 | # Time to wait after receiving SIGTERM to kill the workers in the jobmanager |
| 96 | # forecfully | 105 | # forecfully |
| 97 | KILL_GRACE_PERIOD = 300 | 106 | KILL_GRACE_PERIOD = 300 |
diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py index 5e72f39..b8e4f3e 100644 --- a/eventmq/tests/test_worker.py +++ b/eventmq/tests/test_worker.py | |||
| @@ -15,11 +15,19 @@ | |||
| 15 | 15 | ||
| 16 | import logging | 16 | import logging |
| 17 | import time | 17 | import time |
| 18 | import sys | ||
| 18 | 19 | ||
| 20 | import mock | ||
| 19 | from .. import worker | 21 | from .. import worker |
| 20 | 22 | ||
| 21 | ADDR = 'inproc://pour_the_rice_in_the_thing' | 23 | if sys.version[0] == '2': |
| 24 | import Queue | ||
| 25 | else: | ||
| 26 | import queue as Queue | ||
| 27 | |||
| 22 | 28 | ||
| 29 | ADDR = 'inproc://pour_the_rice_in_the_thing' | ||
| 30 | SETUP_SUCCESS_RETVAL = 'job setup success' | ||
| 23 | 31 | ||
| 24 | def test_run_with_timeout(): | 32 | def test_run_with_timeout(): |
| 25 | payload = { | 33 | payload = { |
| @@ -33,11 +41,124 @@ def test_run_with_timeout(): | |||
| 33 | assert msgid | 41 | assert msgid |
| 34 | 42 | ||
| 35 | 43 | ||
| 44 | @mock.patch('eventmq.worker.callable_from_name') | ||
| 45 | def test_run_job_setup_hook(callable_from_name_mock): | ||
| 46 | from eventmq import conf | ||
| 47 | |||
| 48 | setup_func_str = 'eventmq.tests.test_worker.job_setup_hook' | ||
| 49 | callable_from_name_mock.return_value = mock.Mock() | ||
| 50 | |||
| 51 | payload = { | ||
| 52 | 'path': 'eventmq.tests.test_worker', | ||
| 53 | 'callable': 'job', | ||
| 54 | 'args': [2] | ||
| 55 | } | ||
| 56 | |||
| 57 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 58 | |||
| 59 | q.put(payload) | ||
| 60 | q.put('DONE') | ||
| 61 | |||
| 62 | try: | ||
| 63 | conf.JOB_ENTRY_FUNC = setup_func_str | ||
| 64 | worker._run(q, res_q, logging.getLogger()) | ||
| 65 | finally: | ||
| 66 | conf.JOB_ENTRY_FUNC = '' | ||
| 67 | |||
| 68 | callable_from_name_mock.assert_called_with(setup_func_str) | ||
| 69 | assert callable_from_name_mock.return_value.call_count == 1 | ||
| 70 | |||
| 71 | |||
| 72 | @mock.patch('eventmq.worker.callable_from_name') | ||
| 73 | def test_run_job_teardown_hook(callable_from_name_mock): | ||
| 74 | from eventmq import conf | ||
| 75 | |||
| 76 | teardown_func_str = 'eventmq.tests.test_worker.job_teardown_hook' | ||
| 77 | callable_from_name_mock.return_value = mock.Mock() | ||
| 78 | |||
| 79 | payload = { | ||
| 80 | 'path': 'eventmq.tests.test_worker', | ||
| 81 | 'callable': 'job', | ||
| 82 | 'args': [2] | ||
| 83 | } | ||
| 84 | |||
| 85 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 86 | |||
| 87 | q.put(payload) | ||
| 88 | q.put('DONE') | ||
| 89 | |||
| 90 | try: | ||
| 91 | conf.JOB_EXIT_FUNC = teardown_func_str | ||
| 92 | worker._run(q, res_q, logging.getLogger()) | ||
| 93 | finally: | ||
| 94 | conf.JOB_EXIT_FUNC = '' | ||
| 95 | |||
| 96 | callable_from_name_mock.assert_called_with(teardown_func_str) | ||
| 97 | assert callable_from_name_mock.return_value.call_count == 1 | ||
| 98 | |||
| 99 | |||
| 100 | @mock.patch('eventmq.worker.callable_from_name') | ||
| 101 | def test_run_subprocess_setup_func(callable_from_name_mock): | ||
| 102 | from eventmq import conf | ||
| 103 | |||
| 104 | setup_func_str = 'eventmq.tests.test_worker.process_setup_hook' | ||
| 105 | callable_from_name_mock.return_value = mock.Mock() | ||
| 106 | |||
| 107 | payload = { | ||
| 108 | 'path': 'eventmq.tests.test_worker', | ||
| 109 | 'callable': 'job', | ||
| 110 | 'args': [2] | ||
| 111 | } | ||
| 112 | |||
| 113 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 114 | |||
| 115 | q.put(payload) | ||
| 116 | q.put('DONE') | ||
| 117 | |||
| 118 | try: | ||
| 119 | conf.SUBPROCESS_SETUP_FUNC = setup_func_str | ||
| 120 | worker._run(q, res_q, logging.getLogger()) | ||
| 121 | finally: | ||
| 122 | conf.SUBPROCESS_SETUP_FUNC = '' | ||
| 123 | |||
| 124 | callable_from_name_mock.assert_called_with(setup_func_str) | ||
| 125 | assert callable_from_name_mock.return_value.call_count == 1 | ||
| 126 | |||
| 127 | |||
| 128 | @mock.patch('eventmq.worker.run_setup') | ||
| 129 | def test_run_run_setup_func(run_setup_mock): | ||
| 130 | from eventmq import conf | ||
| 131 | |||
| 132 | setup_func_path = 'eventmq.tests.test_worker' | ||
| 133 | setup_func_callable = 'process_setup_hook' | ||
| 134 | |||
| 135 | payload = { | ||
| 136 | 'path': 'eventmq.tests.test_worker', | ||
| 137 | 'callable': 'job', | ||
| 138 | 'args': [2] | ||
| 139 | } | ||
| 140 | |||
| 141 | q, res_q = Queue.Queue(), Queue.Queue() | ||
| 142 | |||
| 143 | q.put(payload) | ||
| 144 | q.put('DONE') | ||
| 145 | |||
| 146 | try: | ||
| 147 | conf.SETUP_PATH = setup_func_path | ||
| 148 | conf.SETUP_CALLABLE = setup_func_callable | ||
| 149 | worker._run(q, res_q, logging.getLogger()) | ||
| 150 | finally: | ||
| 151 | conf.SETUP_PATH = '' | ||
| 152 | conf.SETUP_CALLABLE = '' | ||
| 153 | |||
| 154 | run_setup_mock.assert_called_with(setup_func_path, setup_func_callable) | ||
| 155 | |||
| 156 | |||
| 36 | def test_run_setup(): | 157 | def test_run_setup(): |
| 37 | setup_callable = 'pre_hook' | 158 | setup_callable = 'process_setup_hook' |
| 38 | setup_path = 'eventmq.tests.test_worker' | 159 | setup_path = 'eventmq.tests.test_worker' |
| 39 | 160 | ||
| 40 | worker.run_setup(setup_path, setup_callable) | 161 | assert worker.run_setup(setup_path, setup_callable) |
| 41 | 162 | ||
| 42 | 163 | ||
| 43 | def job(sleep_time=0): | 164 | def job(sleep_time=0): |
| @@ -46,9 +167,16 @@ def job(sleep_time=0): | |||
| 46 | return True | 167 | return True |
| 47 | 168 | ||
| 48 | 169 | ||
| 49 | def pre_hook(): | 170 | def process_setup_hook(): |
| 50 | return 1 | 171 | print 'process setup hook executed' |
| 172 | return True | ||
| 51 | 173 | ||
| 52 | 174 | ||
| 53 | def post_hook(): | 175 | def job_setup_hook(): |
| 54 | return 1 | 176 | print 'job setup hook executed' |
| 177 | return SETUP_SUCCESS_RETVAL | ||
| 178 | |||
| 179 | |||
| 180 | def job_teardown_hook(): | ||
| 181 | print 'job teardown hook executed' | ||
| 182 | return True | ||
diff --git a/eventmq/worker.py b/eventmq/worker.py index 1621578..aa03284 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -18,16 +18,13 @@ | |||
| 18 | Defines different short-lived workers that execute jobs | 18 | Defines different short-lived workers that execute jobs |
| 19 | """ | 19 | """ |
| 20 | from importlib import import_module | 20 | from importlib import import_module |
| 21 | |||
| 22 | import logging | 21 | import logging |
| 23 | |||
| 24 | from multiprocessing import Process | 22 | from multiprocessing import Process |
| 25 | |||
| 26 | import os | 23 | import os |
| 27 | import sys | 24 | import sys |
| 28 | |||
| 29 | from threading import Thread | 25 | from threading import Thread |
| 30 | 26 | ||
| 27 | from .utils.functions import callable_from_name | ||
| 31 | from . import conf | 28 | from . import conf |
| 32 | 29 | ||
| 33 | if sys.version[0] == '2': | 30 | if sys.version[0] == '2': |
| @@ -180,7 +177,19 @@ def _run(queue, result_queue, logger): | |||
| 180 | "class_kwargs": {"value": 2} | 177 | "class_kwargs": {"value": 2} |
| 181 | } | 178 | } |
| 182 | """ | 179 | """ |
| 183 | if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): | 180 | if any(conf.SUBPROCESS_SETUP_FUNC): |
| 181 | try: | ||
| 182 | logger.debug("Running setup ({}) for worker id {}".format( | ||
| 183 | conf.SUBPROCESS_SETUP_FUNC, os.getpid())) | ||
| 184 | setup_func = callable_from_name(conf.SUBPROCESS_SETUP_FUNC) | ||
| 185 | setup_func() | ||
| 186 | except Exception as e: | ||
| 187 | logger.warning('Unable to do setup task ({}): {}' | ||
| 188 | .format(conf.SUBPROCESS_SETUP_FUNC, str(e))) | ||
| 189 | |||
| 190 | elif any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): | ||
| 191 | logger.warning("SETUP_CALLABLE and SETUP_PATH deprecated in favor for " | ||
| 192 | "SUBPROCESS_SETUP_FUNC") | ||
| 184 | try: | 193 | try: |
| 185 | logger.debug("Running setup ({}.{}) for worker id {}" | 194 | logger.debug("Running setup ({}.{}) for worker id {}" |
| 186 | .format( | 195 | .format( |
| @@ -193,6 +202,16 @@ def _run(queue, result_queue, logger): | |||
| 193 | .format(conf.SETUP_PATH, | 202 | .format(conf.SETUP_PATH, |
| 194 | conf.SETUP_CALLABLE, str(e))) | 203 | conf.SETUP_CALLABLE, str(e))) |
| 195 | 204 | ||
| 205 | if conf.JOB_ENTRY_FUNC: | ||
| 206 | job_entry_func = callable_from_name(conf.JOB_ENTRY_FUNC) | ||
| 207 | else: | ||
| 208 | job_entry_func = None | ||
| 209 | |||
| 210 | if conf.JOB_EXIT_FUNC: | ||
| 211 | job_exit_func = callable_from_name(conf.JOB_EXIT_FUNC) | ||
| 212 | else: | ||
| 213 | job_exit_func = None | ||
| 214 | |||
| 196 | while True: | 215 | while True: |
| 197 | # Blocking get so we don't spin cycles reading over and over | 216 | # Blocking get so we don't spin cycles reading over and over |
| 198 | try: | 217 | try: |
| @@ -204,7 +223,14 @@ def _run(queue, result_queue, logger): | |||
| 204 | if payload == 'DONE': | 223 | if payload == 'DONE': |
| 205 | break | 224 | break |
| 206 | 225 | ||
| 226 | if job_entry_func: | ||
| 227 | job_entry_func() | ||
| 228 | |||
| 207 | return_val = _run_job(payload, logger) | 229 | return_val = _run_job(payload, logger) |
| 230 | |||
| 231 | if job_exit_func: | ||
| 232 | job_exit_func() | ||
| 233 | |||
| 208 | # Signal that we're done with this job and put its return value on the | 234 | # Signal that we're done with this job and put its return value on the |
| 209 | # result queue | 235 | # result queue |
| 210 | result_queue.put(return_val) | 236 | result_queue.put(return_val) |
| @@ -278,4 +304,4 @@ def run_setup(setup_path, setup_callable): | |||
| 278 | 304 | ||
| 279 | setup_callable_ = getattr(setup_package, setup_callable) | 305 | setup_callable_ = getattr(setup_package, setup_callable) |
| 280 | 306 | ||
| 281 | setup_callable_() | 307 | return setup_callable_() |
| @@ -7,7 +7,7 @@ from setuptools import find_packages, setup | |||
| 7 | 7 | ||
| 8 | setup( | 8 | setup( |
| 9 | name='eventmq', | 9 | name='eventmq', |
| 10 | version='0.3.5.6', | 10 | version='0.3.6', |
| 11 | description='EventMQ job execution and messaging system based on ZeroMQ', | 11 | description='EventMQ job execution and messaging system based on ZeroMQ', |
| 12 | packages=find_packages(), | 12 | packages=find_packages(), |
| 13 | install_requires=['pyzmq==15.4.0', | 13 | install_requires=['pyzmq==15.4.0', |