aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsideshowdave72017-03-08 15:21:23 -0700
committersideshowdave72017-03-08 15:21:23 -0700
commitea01e0e6205cd314657c0ab5851a584685e0ecd9 (patch)
tree654e1054bf95396e76b121e8dec36d3515a9e7b9
parent2e67c94849e28927c13a0695457e94f01eba389e (diff)
downloadeventmq-ea01e0e6205cd314657c0ab5851a584685e0ecd9.tar.gz
eventmq-ea01e0e6205cd314657c0ab5851a584685e0ecd9.zip
Implement pre/post hook support
-rwxr-xr-xbin/send_msg2
-rw-r--r--eventmq/client/jobs.py3
-rw-r--r--eventmq/client/messages.py13
-rw-r--r--eventmq/conf.py4
-rw-r--r--eventmq/tests/test_jobmanager.py5
-rw-r--r--eventmq/tests/test_worker.py65
-rw-r--r--eventmq/worker.py36
7 files changed, 113 insertions, 15 deletions
diff --git a/bin/send_msg b/bin/send_msg
index 19e88de..b0ea160 100755
--- a/bin/send_msg
+++ b/bin/send_msg
@@ -26,7 +26,7 @@ if __name__ == "__main__":
26 'kwargs': {} 26 'kwargs': {}
27 }] 27 }]
28 28
29 send_request(s, msg, guarantee=True, reply_requested=True, timeout=1) 29 send_request(s, msg, guarantee=True, reply_requested=True, timeout=10)
30 print zmq.POLLOUT 30 print zmq.POLLOUT
31 events = dict(poller.poll(500)) 31 events = dict(poller.poll(500))
32 print events 32 print events
diff --git a/eventmq/client/jobs.py b/eventmq/client/jobs.py
index f82210d..9ec9a07 100644
--- a/eventmq/client/jobs.py
+++ b/eventmq/client/jobs.py
@@ -101,7 +101,8 @@ class Job(object):
101 return f 101 return f
102 102
103 103
104def job(func, broker_addr=None, queue=None, async=True, *args, **kwargs): 104def job(func, broker_addr=None, queue=None, async=True, *args,
105 **kwargs):
105 """ 106 """
106 Functional decorator helper for creating a deferred eventmq job. See 107 Functional decorator helper for creating a deferred eventmq job. See
107 :class:`Job` for more information. 108 :class:`Job` for more information.
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py
index 617720a..9e49ea8 100644
--- a/eventmq/client/messages.py
+++ b/eventmq/client/messages.py
@@ -108,7 +108,7 @@ def schedule(socket, func, interval_secs=None, args=(), kwargs=None,
108 108
109 109
110def defer_job( 110def defer_job(
111 socket, func, wrapper=None, args=(), kwargs=None, class_args=(), 111 socket, func, args=(), kwargs=None, class_args=(),
112 class_kwargs=None, reply_requested=False, guarantee=False, 112 class_kwargs=None, reply_requested=False, guarantee=False,
113 retry_count=0, timeout=0, debounce_secs=False, 113 retry_count=0, timeout=0, debounce_secs=False,
114 queue=conf.DEFAULT_QUEUE_NAME): 114 queue=conf.DEFAULT_QUEUE_NAME):
@@ -126,8 +126,6 @@ def defer_job(
126 socket (socket): eventmq socket to use for sending the message 126 socket (socket): eventmq socket to use for sending the message
127 func (callable or str): the callable (or string path to callable) to be 127 func (callable or str): the callable (or string path to callable) to be
128 deferred to a worker 128 deferred to a worker
129 wrapper (callable): optional wrapper for the call to func to be
130 wrapped with
131 args (list): list of *args for the callable 129 args (list): list of *args for the callable
132 kwargs (dict): dict of **kwargs for the callable 130 kwargs (dict): dict of **kwargs for the callable
133 class_args (list): list of *args to pass to the the class when 131 class_args (list): list of *args to pass to the the class when
@@ -177,15 +175,6 @@ def defer_job(
177 logger.error('Encountered non-callable func: {}'.format(func)) 175 logger.error('Encountered non-callable func: {}'.format(func))
178 return 176 return
179 177
180 if wrapper and callable(wrapper):
181 # Prepend the original path and callable name to args
182 args = (path, callable_name, args)
183 callable_name = name_from_callable(wrapper)
184 path, callable_name = split_callable_name(callable_name)
185 elif wrapper:
186 logger.error('Encountered non-callable wrapper: {}'.format(wrapper))
187 return
188
189 if not callable_name or not path: 178 if not callable_name or not path:
190 logger.error('Encountered invalid callable, will not proceed.') 179 logger.error('Encountered invalid callable, will not proceed.')
191 return 180 return
diff --git a/eventmq/conf.py b/eventmq/conf.py
index cec0249..4ad17f7 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -88,4 +88,8 @@ RQ_PASSWORD = ''
88 88
89MAX_JOB_COUNT = 1024 89MAX_JOB_COUNT = 1024
90 90
91# Path/Callable to run on start of a worker process
92SETUP_PATH = ''
93SETUP_CALLABLE = ''
94
91# }}} 95# }}}
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index db82843..f7d41c7 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -170,3 +170,8 @@ def start_jm(jm, addr):
170 170
171def pretend_job(t): 171def pretend_job(t):
172 time.sleep(t) 172 time.sleep(t)
173
174
175def test_setup():
176 import time
177 assert time
diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py
new file mode 100644
index 0000000..2f2c67e
--- /dev/null
+++ b/eventmq/tests/test_worker.py
@@ -0,0 +1,65 @@
1# This file is part of eventmq.
2#
3# eventmq is free software: you can redistribute it and/or modify it under the
4# terms of the GNU Lesser General Public License as published by the Free
5# Software Foundation, either version 2.1 of the License, or (at your option)
6# any later version.
7#
8# eventmq is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU Lesser General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15
16from multiprocessing import Pool
17import time
18
19from nose import with_setup
20
21from .. import worker
22
23ADDR = 'inproc://pour_the_rice_in_the_thing'
24
25
26def setup_func():
27 global pool
28 global out
29 pool = Pool()
30 out = pool.map(job, range(1))
31
32
33@with_setup(setup_func)
34def test_run_with_timeout():
35 payload = {
36 'path': 'eventmq.tests.test_worker',
37 'callable': 'job',
38 'args': [2]
39 }
40
41 msgid = worker._run(payload)
42
43 assert msgid
44
45
46@with_setup(setup_func)
47def test_run_setup():
48 setup_callable = 'pre_hook'
49 setup_path = 'eventmq.tests.test_worker'
50
51 worker.run_setup(setup_path, setup_callable)
52
53
54def job(sleep_time=0):
55 time.sleep(sleep_time)
56
57 return True
58
59
60def pre_hook():
61 return 1
62
63
64def post_hook():
65 return 1
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 8d25c63..1e8e5a4 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -49,11 +49,12 @@ class MultiprocessWorker(Process):
49 Defines a worker that spans the job in a multiprocessing task 49 Defines a worker that spans the job in a multiprocessing task
50 """ 50 """
51 51
52 def __init__(self, input_queue, output_queue): 52 def __init__(self, input_queue, output_queue, run_setup=True):
53 super(MultiprocessWorker, self).__init__() 53 super(MultiprocessWorker, self).__init__()
54 self.input_queue = input_queue 54 self.input_queue = input_queue
55 self.output_queue = output_queue 55 self.output_queue = output_queue
56 self.job_count = 0 56 self.job_count = 0
57 self.run_setup = run_setup
57 58
58 def run(self): 59 def run(self):
59 """ 60 """
@@ -61,6 +62,15 @@ class MultiprocessWorker(Process):
61 62
62 This is designed to run in a seperate process. 63 This is designed to run in a seperate process.
63 """ 64 """
65
66 if self.run_setup:
67 self.run_setup = False
68 if conf.SETUP_CALLABLE and conf.SETUP_PATH:
69 try:
70 run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE)
71 except Exception as e:
72 logger.warning('Unable to complete setup: ' + str(e))
73
64 import zmq 74 import zmq
65 zmq.Context.instance().term() 75 zmq.Context.instance().term()
66 76
@@ -104,6 +114,9 @@ class MultiprocessWorker(Process):
104 114
105 115
106def _run(payload): 116def _run(payload):
117 """
118 Takes care of actually executing the code given a message payload
119 """
107 if ":" in payload["path"]: 120 if ":" in payload["path"]:
108 _pkgsplit = payload["path"].split(':') 121 _pkgsplit = payload["path"].split(':')
109 s_package = _pkgsplit[0] 122 s_package = _pkgsplit[0]
@@ -148,5 +161,26 @@ def _run(payload):
148 except Exception as e: 161 except Exception as e:
149 logger.exception(e) 162 logger.exception(e)
150 return str(e) 163 return str(e)
164
151 # Signal that we're done with this job 165 # Signal that we're done with this job
152 return 'DONE' 166 return 'DONE'
167
168
169def run_setup(setup_path, setup_callable):
170 logger.debug("Running setup for worker id {}".format(os.getpid()))
171 if ":" in setup_path:
172 _pkgsplit = setup_path.split(':')
173 s_setup_package = _pkgsplit[0]
174 else:
175 s_setup_package = setup_path
176
177 if setup_callable and s_setup_package:
178 setup_package = import_module(s_setup_package)
179
180 setup_callable_ = getattr(setup_package, setup_callable)
181
182 try:
183 setup_callable_()
184 except Exception as e:
185 logger.exception(e)
186 return str(e)