aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--etc/eventmq.conf-dist15
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/conf.py9
-rw-r--r--eventmq/tests/test_worker.py142
-rw-r--r--eventmq/worker.py38
-rw-r--r--setup.py2
6 files changed, 192 insertions, 16 deletions
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist
index 43ec6f2..1ff09c4 100644
--- a/etc/eventmq.conf-dist
+++ b/etc/eventmq.conf-dist
@@ -19,9 +19,22 @@ scheduler_addr=tcp://eventmq:47291
19 19
20[jobmanager] 20[jobmanager]
21worker_addr=tcp://127.0.0.1:47290 21worker_addr=tcp://127.0.0.1:47290
22queues=[[50,"google"], [40,"pushes"], [10,"default"]] 22
23# Defines the weight and name of queues this worker deals with.
24queues=[[20,"heavy-cpu"], [30,"low-cpu"], [10,"default"]]
25
26# Specifies the number of of sub-processes to spawn to process jobs concurrently
23concurrent_jobs=2 27concurrent_jobs=2
24 28
29# This function is executed when EventMQ creates a new worker subprocess
30# subprocess_setup_func = 'path.to.my_setup_function'
31
32# This function is executed before every job
33# job_entry_func = 'path.to.my_job_setup_function'
34
35# This function is executed after every job
36# job_exit_func = 'path.to.my_job_teardown_function'
37
25[publisher] 38[publisher]
26publisher_incoming_addr=tcp://0.0.0.0:47298 39publisher_incoming_addr=tcp://0.0.0.0:47298
27publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file 40publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index aefc3a2..2e180ed 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
1__author__ = 'EventMQ Contributors' 1__author__ = 'EventMQ Contributors'
2__version__ = '0.3.5.6' 2__version__ = '0.3.6'
3 3
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
diff --git a/eventmq/conf.py b/eventmq/conf.py
index 8609716..715bbb2 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -89,9 +89,18 @@ RQ_PASSWORD = ''
89MAX_JOB_COUNT = 1024 89MAX_JOB_COUNT = 1024
90 90
91# Path/Callable to run on start of a worker process 91# Path/Callable to run on start of a worker process
92# These options are deprecated for the more user-friendly
93# SUBPROCESS_SETUP_FUNC which can be a full path to a function.
92SETUP_PATH = '' 94SETUP_PATH = ''
93SETUP_CALLABLE = '' 95SETUP_CALLABLE = ''
94 96
97# Function to run on the start of a new worker subprocess
98SUBPROCESS_SETUP_FUNC = ''
99
100# function to be run before the execution of every job
101JOB_ENTRY_FUNC = ''
102# function to be run after the execution of every job
103JOB_EXIT_FUNC = ''
95# Time to wait after receiving SIGTERM to kill the workers in the jobmanager 104# Time to wait after receiving SIGTERM to kill the workers in the jobmanager
96# forecfully 105# forecfully
97KILL_GRACE_PERIOD = 300 106KILL_GRACE_PERIOD = 300
diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py
index 5e72f39..b8e4f3e 100644
--- a/eventmq/tests/test_worker.py
+++ b/eventmq/tests/test_worker.py
@@ -15,11 +15,19 @@
15 15
16import logging 16import logging
17import time 17import time
18import sys
18 19
20import mock
19from .. import worker 21from .. import worker
20 22
21ADDR = 'inproc://pour_the_rice_in_the_thing' 23if sys.version[0] == '2':
24 import Queue
25else:
26 import queue as Queue
27
22 28
29ADDR = 'inproc://pour_the_rice_in_the_thing'
30SETUP_SUCCESS_RETVAL = 'job setup success'
23 31
24def test_run_with_timeout(): 32def test_run_with_timeout():
25 payload = { 33 payload = {
@@ -33,11 +41,124 @@ def test_run_with_timeout():
33 assert msgid 41 assert msgid
34 42
35 43
44@mock.patch('eventmq.worker.callable_from_name')
45def test_run_job_setup_hook(callable_from_name_mock):
46 from eventmq import conf
47
48 setup_func_str = 'eventmq.tests.test_worker.job_setup_hook'
49 callable_from_name_mock.return_value = mock.Mock()
50
51 payload = {
52 'path': 'eventmq.tests.test_worker',
53 'callable': 'job',
54 'args': [2]
55 }
56
57 q, res_q = Queue.Queue(), Queue.Queue()
58
59 q.put(payload)
60 q.put('DONE')
61
62 try:
63 conf.JOB_ENTRY_FUNC = setup_func_str
64 worker._run(q, res_q, logging.getLogger())
65 finally:
66 conf.JOB_ENTRY_FUNC = ''
67
68 callable_from_name_mock.assert_called_with(setup_func_str)
69 assert callable_from_name_mock.return_value.call_count == 1
70
71
72@mock.patch('eventmq.worker.callable_from_name')
73def test_run_job_teardown_hook(callable_from_name_mock):
74 from eventmq import conf
75
76 teardown_func_str = 'eventmq.tests.test_worker.job_teardown_hook'
77 callable_from_name_mock.return_value = mock.Mock()
78
79 payload = {
80 'path': 'eventmq.tests.test_worker',
81 'callable': 'job',
82 'args': [2]
83 }
84
85 q, res_q = Queue.Queue(), Queue.Queue()
86
87 q.put(payload)
88 q.put('DONE')
89
90 try:
91 conf.JOB_EXIT_FUNC = teardown_func_str
92 worker._run(q, res_q, logging.getLogger())
93 finally:
94 conf.JOB_EXIT_FUNC = ''
95
96 callable_from_name_mock.assert_called_with(teardown_func_str)
97 assert callable_from_name_mock.return_value.call_count == 1
98
99
100@mock.patch('eventmq.worker.callable_from_name')
101def test_run_subprocess_setup_func(callable_from_name_mock):
102 from eventmq import conf
103
104 setup_func_str = 'eventmq.tests.test_worker.process_setup_hook'
105 callable_from_name_mock.return_value = mock.Mock()
106
107 payload = {
108 'path': 'eventmq.tests.test_worker',
109 'callable': 'job',
110 'args': [2]
111 }
112
113 q, res_q = Queue.Queue(), Queue.Queue()
114
115 q.put(payload)
116 q.put('DONE')
117
118 try:
119 conf.SUBPROCESS_SETUP_FUNC = setup_func_str
120 worker._run(q, res_q, logging.getLogger())
121 finally:
122 conf.SUBPROCESS_SETUP_FUNC = ''
123
124 callable_from_name_mock.assert_called_with(setup_func_str)
125 assert callable_from_name_mock.return_value.call_count == 1
126
127
128@mock.patch('eventmq.worker.run_setup')
129def test_run_run_setup_func(run_setup_mock):
130 from eventmq import conf
131
132 setup_func_path = 'eventmq.tests.test_worker'
133 setup_func_callable = 'process_setup_hook'
134
135 payload = {
136 'path': 'eventmq.tests.test_worker',
137 'callable': 'job',
138 'args': [2]
139 }
140
141 q, res_q = Queue.Queue(), Queue.Queue()
142
143 q.put(payload)
144 q.put('DONE')
145
146 try:
147 conf.SETUP_PATH = setup_func_path
148 conf.SETUP_CALLABLE = setup_func_callable
149 worker._run(q, res_q, logging.getLogger())
150 finally:
151 conf.SETUP_PATH = ''
152 conf.SETUP_CALLABLE = ''
153
154 run_setup_mock.assert_called_with(setup_func_path, setup_func_callable)
155
156
36def test_run_setup(): 157def test_run_setup():
37 setup_callable = 'pre_hook' 158 setup_callable = 'process_setup_hook'
38 setup_path = 'eventmq.tests.test_worker' 159 setup_path = 'eventmq.tests.test_worker'
39 160
40 worker.run_setup(setup_path, setup_callable) 161 assert worker.run_setup(setup_path, setup_callable)
41 162
42 163
43def job(sleep_time=0): 164def job(sleep_time=0):
@@ -46,9 +167,16 @@ def job(sleep_time=0):
46 return True 167 return True
47 168
48 169
49def pre_hook(): 170def process_setup_hook():
50 return 1 171 print 'process setup hook executed'
172 return True
51 173
52 174
53def post_hook(): 175def job_setup_hook():
54 return 1 176 print 'job setup hook executed'
177 return SETUP_SUCCESS_RETVAL
178
179
180def job_teardown_hook():
181 print 'job teardown hook executed'
182 return True
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 1621578..aa03284 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -18,16 +18,13 @@
18Defines different short-lived workers that execute jobs 18Defines different short-lived workers that execute jobs
19""" 19"""
20from importlib import import_module 20from importlib import import_module
21
22import logging 21import logging
23
24from multiprocessing import Process 22from multiprocessing import Process
25
26import os 23import os
27import sys 24import sys
28
29from threading import Thread 25from threading import Thread
30 26
27from .utils.functions import callable_from_name
31from . import conf 28from . import conf
32 29
33if sys.version[0] == '2': 30if sys.version[0] == '2':
@@ -180,7 +177,19 @@ def _run(queue, result_queue, logger):
180 "class_kwargs": {"value": 2} 177 "class_kwargs": {"value": 2}
181 } 178 }
182 """ 179 """
183 if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): 180 if any(conf.SUBPROCESS_SETUP_FUNC):
181 try:
182 logger.debug("Running setup ({}) for worker id {}".format(
183 conf.SUBPROCESS_SETUP_FUNC, os.getpid()))
184 setup_func = callable_from_name(conf.SUBPROCESS_SETUP_FUNC)
185 setup_func()
186 except Exception as e:
187 logger.warning('Unable to do setup task ({}): {}'
188 .format(conf.SUBPROCESS_SETUP_FUNC, str(e)))
189
190 elif any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH):
191 logger.warning("SETUP_CALLABLE and SETUP_PATH deprecated in favor for "
192 "SUBPROCESS_SETUP_FUNC")
184 try: 193 try:
185 logger.debug("Running setup ({}.{}) for worker id {}" 194 logger.debug("Running setup ({}.{}) for worker id {}"
186 .format( 195 .format(
@@ -193,6 +202,16 @@ def _run(queue, result_queue, logger):
193 .format(conf.SETUP_PATH, 202 .format(conf.SETUP_PATH,
194 conf.SETUP_CALLABLE, str(e))) 203 conf.SETUP_CALLABLE, str(e)))
195 204
205 if conf.JOB_ENTRY_FUNC:
206 job_entry_func = callable_from_name(conf.JOB_ENTRY_FUNC)
207 else:
208 job_entry_func = None
209
210 if conf.JOB_EXIT_FUNC:
211 job_exit_func = callable_from_name(conf.JOB_EXIT_FUNC)
212 else:
213 job_exit_func = None
214
196 while True: 215 while True:
197 # Blocking get so we don't spin cycles reading over and over 216 # Blocking get so we don't spin cycles reading over and over
198 try: 217 try:
@@ -204,7 +223,14 @@ def _run(queue, result_queue, logger):
204 if payload == 'DONE': 223 if payload == 'DONE':
205 break 224 break
206 225
226 if job_entry_func:
227 job_entry_func()
228
207 return_val = _run_job(payload, logger) 229 return_val = _run_job(payload, logger)
230
231 if job_exit_func:
232 job_exit_func()
233
208 # Signal that we're done with this job and put its return value on the 234 # Signal that we're done with this job and put its return value on the
209 # result queue 235 # result queue
210 result_queue.put(return_val) 236 result_queue.put(return_val)
@@ -278,4 +304,4 @@ def run_setup(setup_path, setup_callable):
278 304
279 setup_callable_ = getattr(setup_package, setup_callable) 305 setup_callable_ = getattr(setup_package, setup_callable)
280 306
281 setup_callable_() 307 return setup_callable_()
diff --git a/setup.py b/setup.py
index 64afbd9..9bacfbf 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@ from setuptools import find_packages, setup
7 7
8setup( 8setup(
9 name='eventmq', 9 name='eventmq',
10 version='0.3.5.6', 10 version='0.3.6',
11 description='EventMQ job execution and messaging system based on ZeroMQ', 11 description='EventMQ job execution and messaging system based on ZeroMQ',
12 packages=find_packages(), 12 packages=find_packages(),
13 install_requires=['pyzmq==15.4.0', 13 install_requires=['pyzmq==15.4.0',