diff options
| author | sideshowdave7 | 2017-03-08 15:21:23 -0700 |
|---|---|---|
| committer | sideshowdave7 | 2017-03-08 15:21:23 -0700 |
| commit | ea01e0e6205cd314657c0ab5851a584685e0ecd9 (patch) | |
| tree | 654e1054bf95396e76b121e8dec36d3515a9e7b9 | |
| parent | 2e67c94849e28927c13a0695457e94f01eba389e (diff) | |
| download | eventmq-ea01e0e6205cd314657c0ab5851a584685e0ecd9.tar.gz eventmq-ea01e0e6205cd314657c0ab5851a584685e0ecd9.zip | |
Implement pre/post hook support
| -rwxr-xr-x | bin/send_msg | 2 | ||||
| -rw-r--r-- | eventmq/client/jobs.py | 3 | ||||
| -rw-r--r-- | eventmq/client/messages.py | 13 | ||||
| -rw-r--r-- | eventmq/conf.py | 4 | ||||
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 5 | ||||
| -rw-r--r-- | eventmq/tests/test_worker.py | 65 | ||||
| -rw-r--r-- | eventmq/worker.py | 36 |
7 files changed, 113 insertions, 15 deletions
diff --git a/bin/send_msg b/bin/send_msg index 19e88de..b0ea160 100755 --- a/bin/send_msg +++ b/bin/send_msg | |||
| @@ -26,7 +26,7 @@ if __name__ == "__main__": | |||
| 26 | 'kwargs': {} | 26 | 'kwargs': {} |
| 27 | }] | 27 | }] |
| 28 | 28 | ||
| 29 | send_request(s, msg, guarantee=True, reply_requested=True, timeout=1) | 29 | send_request(s, msg, guarantee=True, reply_requested=True, timeout=10) |
| 30 | print zmq.POLLOUT | 30 | print zmq.POLLOUT |
| 31 | events = dict(poller.poll(500)) | 31 | events = dict(poller.poll(500)) |
| 32 | print events | 32 | print events |
diff --git a/eventmq/client/jobs.py b/eventmq/client/jobs.py index f82210d..9ec9a07 100644 --- a/eventmq/client/jobs.py +++ b/eventmq/client/jobs.py | |||
| @@ -101,7 +101,8 @@ class Job(object): | |||
| 101 | return f | 101 | return f |
| 102 | 102 | ||
| 103 | 103 | ||
| 104 | def job(func, broker_addr=None, queue=None, async=True, *args, **kwargs): | 104 | def job(func, broker_addr=None, queue=None, async=True, *args, |
| 105 | **kwargs): | ||
| 105 | """ | 106 | """ |
| 106 | Functional decorator helper for creating a deferred eventmq job. See | 107 | Functional decorator helper for creating a deferred eventmq job. See |
| 107 | :class:`Job` for more information. | 108 | :class:`Job` for more information. |
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py index 617720a..9e49ea8 100644 --- a/eventmq/client/messages.py +++ b/eventmq/client/messages.py | |||
| @@ -108,7 +108,7 @@ def schedule(socket, func, interval_secs=None, args=(), kwargs=None, | |||
| 108 | 108 | ||
| 109 | 109 | ||
| 110 | def defer_job( | 110 | def defer_job( |
| 111 | socket, func, wrapper=None, args=(), kwargs=None, class_args=(), | 111 | socket, func, args=(), kwargs=None, class_args=(), |
| 112 | class_kwargs=None, reply_requested=False, guarantee=False, | 112 | class_kwargs=None, reply_requested=False, guarantee=False, |
| 113 | retry_count=0, timeout=0, debounce_secs=False, | 113 | retry_count=0, timeout=0, debounce_secs=False, |
| 114 | queue=conf.DEFAULT_QUEUE_NAME): | 114 | queue=conf.DEFAULT_QUEUE_NAME): |
| @@ -126,8 +126,6 @@ def defer_job( | |||
| 126 | socket (socket): eventmq socket to use for sending the message | 126 | socket (socket): eventmq socket to use for sending the message |
| 127 | func (callable or str): the callable (or string path to callable) to be | 127 | func (callable or str): the callable (or string path to callable) to be |
| 128 | deferred to a worker | 128 | deferred to a worker |
| 129 | wrapper (callable): optional wrapper for the call to func to be | ||
| 130 | wrapped with | ||
| 131 | args (list): list of *args for the callable | 129 | args (list): list of *args for the callable |
| 132 | kwargs (dict): dict of **kwargs for the callable | 130 | kwargs (dict): dict of **kwargs for the callable |
| 133 | class_args (list): list of *args to pass to the the class when | 131 | class_args (list): list of *args to pass to the the class when |
| @@ -177,15 +175,6 @@ def defer_job( | |||
| 177 | logger.error('Encountered non-callable func: {}'.format(func)) | 175 | logger.error('Encountered non-callable func: {}'.format(func)) |
| 178 | return | 176 | return |
| 179 | 177 | ||
| 180 | if wrapper and callable(wrapper): | ||
| 181 | # Prepend the original path and callable name to args | ||
| 182 | args = (path, callable_name, args) | ||
| 183 | callable_name = name_from_callable(wrapper) | ||
| 184 | path, callable_name = split_callable_name(callable_name) | ||
| 185 | elif wrapper: | ||
| 186 | logger.error('Encountered non-callable wrapper: {}'.format(wrapper)) | ||
| 187 | return | ||
| 188 | |||
| 189 | if not callable_name or not path: | 178 | if not callable_name or not path: |
| 190 | logger.error('Encountered invalid callable, will not proceed.') | 179 | logger.error('Encountered invalid callable, will not proceed.') |
| 191 | return | 180 | return |
diff --git a/eventmq/conf.py b/eventmq/conf.py index cec0249..4ad17f7 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -88,4 +88,8 @@ RQ_PASSWORD = '' | |||
| 88 | 88 | ||
| 89 | MAX_JOB_COUNT = 1024 | 89 | MAX_JOB_COUNT = 1024 |
| 90 | 90 | ||
| 91 | # Path/Callable to run on start of a worker process | ||
| 92 | SETUP_PATH = '' | ||
| 93 | SETUP_CALLABLE = '' | ||
| 94 | |||
| 91 | # }}} | 95 | # }}} |
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py index db82843..f7d41c7 100644 --- a/eventmq/tests/test_jobmanager.py +++ b/eventmq/tests/test_jobmanager.py | |||
| @@ -170,3 +170,8 @@ def start_jm(jm, addr): | |||
| 170 | 170 | ||
| 171 | def pretend_job(t): | 171 | def pretend_job(t): |
| 172 | time.sleep(t) | 172 | time.sleep(t) |
| 173 | |||
| 174 | |||
| 175 | def test_setup(): | ||
| 176 | import time | ||
| 177 | assert time | ||
diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py new file mode 100644 index 0000000..2f2c67e --- /dev/null +++ b/eventmq/tests/test_worker.py | |||
| @@ -0,0 +1,65 @@ | |||
| 1 | # This file is part of eventmq. | ||
| 2 | # | ||
| 3 | # eventmq is free software: you can redistribute it and/or modify it under the | ||
| 4 | # terms of the GNU Lesser General Public License as published by the Free | ||
| 5 | # Software Foundation, either version 2.1 of the License, or (at your option) | ||
| 6 | # any later version. | ||
| 7 | # | ||
| 8 | # eventmq is distributed in the hope that it will be useful, | ||
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| 11 | # GNU Lesser General Public License for more details. | ||
| 12 | # | ||
| 13 | # You should have received a copy of the GNU Lesser General Public License | ||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | ||
| 15 | |||
| 16 | from multiprocessing import Pool | ||
| 17 | import time | ||
| 18 | |||
| 19 | from nose import with_setup | ||
| 20 | |||
| 21 | from .. import worker | ||
| 22 | |||
| 23 | ADDR = 'inproc://pour_the_rice_in_the_thing' | ||
| 24 | |||
| 25 | |||
| 26 | def setup_func(): | ||
| 27 | global pool | ||
| 28 | global out | ||
| 29 | pool = Pool() | ||
| 30 | out = pool.map(job, range(1)) | ||
| 31 | |||
| 32 | |||
| 33 | @with_setup(setup_func) | ||
| 34 | def test_run_with_timeout(): | ||
| 35 | payload = { | ||
| 36 | 'path': 'eventmq.tests.test_worker', | ||
| 37 | 'callable': 'job', | ||
| 38 | 'args': [2] | ||
| 39 | } | ||
| 40 | |||
| 41 | msgid = worker._run(payload) | ||
| 42 | |||
| 43 | assert msgid | ||
| 44 | |||
| 45 | |||
| 46 | @with_setup(setup_func) | ||
| 47 | def test_run_setup(): | ||
| 48 | setup_callable = 'pre_hook' | ||
| 49 | setup_path = 'eventmq.tests.test_worker' | ||
| 50 | |||
| 51 | worker.run_setup(setup_path, setup_callable) | ||
| 52 | |||
| 53 | |||
| 54 | def job(sleep_time=0): | ||
| 55 | time.sleep(sleep_time) | ||
| 56 | |||
| 57 | return True | ||
| 58 | |||
| 59 | |||
| 60 | def pre_hook(): | ||
| 61 | return 1 | ||
| 62 | |||
| 63 | |||
| 64 | def post_hook(): | ||
| 65 | return 1 | ||
diff --git a/eventmq/worker.py b/eventmq/worker.py index 8d25c63..1e8e5a4 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -49,11 +49,12 @@ class MultiprocessWorker(Process): | |||
| 49 | Defines a worker that spans the job in a multiprocessing task | 49 | Defines a worker that spans the job in a multiprocessing task |
| 50 | """ | 50 | """ |
| 51 | 51 | ||
| 52 | def __init__(self, input_queue, output_queue): | 52 | def __init__(self, input_queue, output_queue, run_setup=True): |
| 53 | super(MultiprocessWorker, self).__init__() | 53 | super(MultiprocessWorker, self).__init__() |
| 54 | self.input_queue = input_queue | 54 | self.input_queue = input_queue |
| 55 | self.output_queue = output_queue | 55 | self.output_queue = output_queue |
| 56 | self.job_count = 0 | 56 | self.job_count = 0 |
| 57 | self.run_setup = run_setup | ||
| 57 | 58 | ||
| 58 | def run(self): | 59 | def run(self): |
| 59 | """ | 60 | """ |
| @@ -61,6 +62,15 @@ class MultiprocessWorker(Process): | |||
| 61 | 62 | ||
| 62 | This is designed to run in a seperate process. | 63 | This is designed to run in a seperate process. |
| 63 | """ | 64 | """ |
| 65 | |||
| 66 | if self.run_setup: | ||
| 67 | self.run_setup = False | ||
| 68 | if conf.SETUP_CALLABLE and conf.SETUP_PATH: | ||
| 69 | try: | ||
| 70 | run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE) | ||
| 71 | except Exception as e: | ||
| 72 | logger.warning('Unable to complete setup: ' + str(e)) | ||
| 73 | |||
| 64 | import zmq | 74 | import zmq |
| 65 | zmq.Context.instance().term() | 75 | zmq.Context.instance().term() |
| 66 | 76 | ||
| @@ -104,6 +114,9 @@ class MultiprocessWorker(Process): | |||
| 104 | 114 | ||
| 105 | 115 | ||
| 106 | def _run(payload): | 116 | def _run(payload): |
| 117 | """ | ||
| 118 | Takes care of actually executing the code given a message payload | ||
| 119 | """ | ||
| 107 | if ":" in payload["path"]: | 120 | if ":" in payload["path"]: |
| 108 | _pkgsplit = payload["path"].split(':') | 121 | _pkgsplit = payload["path"].split(':') |
| 109 | s_package = _pkgsplit[0] | 122 | s_package = _pkgsplit[0] |
| @@ -148,5 +161,26 @@ def _run(payload): | |||
| 148 | except Exception as e: | 161 | except Exception as e: |
| 149 | logger.exception(e) | 162 | logger.exception(e) |
| 150 | return str(e) | 163 | return str(e) |
| 164 | |||
| 151 | # Signal that we're done with this job | 165 | # Signal that we're done with this job |
| 152 | return 'DONE' | 166 | return 'DONE' |
| 167 | |||
| 168 | |||
| 169 | def run_setup(setup_path, setup_callable): | ||
| 170 | logger.debug("Running setup for worker id {}".format(os.getpid())) | ||
| 171 | if ":" in setup_path: | ||
| 172 | _pkgsplit = setup_path.split(':') | ||
| 173 | s_setup_package = _pkgsplit[0] | ||
| 174 | else: | ||
| 175 | s_setup_package = setup_path | ||
| 176 | |||
| 177 | if setup_callable and s_setup_package: | ||
| 178 | setup_package = import_module(s_setup_package) | ||
| 179 | |||
| 180 | setup_callable_ = getattr(setup_package, setup_callable) | ||
| 181 | |||
| 182 | try: | ||
| 183 | setup_callable_() | ||
| 184 | except Exception as e: | ||
| 185 | logger.exception(e) | ||
| 186 | return str(e) | ||