aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--etc/eventmq.conf-dist15
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/conf.py9
-rw-r--r--eventmq/jobmanager.py2
-rw-r--r--eventmq/router.py7
-rw-r--r--eventmq/scheduler.py2
-rw-r--r--eventmq/tests/test_worker.py139
-rw-r--r--eventmq/worker.py38
-rw-r--r--setup.py2
9 files changed, 199 insertions, 17 deletions
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist
index 43ec6f2..9a59f2a 100644
--- a/etc/eventmq.conf-dist
+++ b/etc/eventmq.conf-dist
@@ -19,9 +19,22 @@ scheduler_addr=tcp://eventmq:47291
19 19
20[jobmanager] 20[jobmanager]
21worker_addr=tcp://127.0.0.1:47290 21worker_addr=tcp://127.0.0.1:47290
22queues=[[50,"google"], [40,"pushes"], [10,"default"]] 22
23# Defines the weight and name of queues this worker deals with.
24queues=[[20,"heavy-cpu"], [30,"low-cpu"], [10,"default"]]
25
26# Specifies the number of of sub-processes to spawn to process jobs concurrently
23concurrent_jobs=2 27concurrent_jobs=2
24 28
29# This function is executed when EventMQ creates a new worker subprocess
30# subprocess_setup_func = path.to.my_setup_function
31
32# This function is executed before every job
33# job_entry_func = path.to.my_job_setup_function
34
35# This function is executed after every job
36# job_exit_func = path.to.my_job_teardown_function
37
25[publisher] 38[publisher]
26publisher_incoming_addr=tcp://0.0.0.0:47298 39publisher_incoming_addr=tcp://0.0.0.0:47298
27publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file 40publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index aefc3a2..2e180ed 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
1__author__ = 'EventMQ Contributors' 1__author__ = 'EventMQ Contributors'
2__version__ = '0.3.5.6' 2__version__ = '0.3.6'
3 3
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
diff --git a/eventmq/conf.py b/eventmq/conf.py
index 8609716..715bbb2 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -89,9 +89,18 @@ RQ_PASSWORD = ''
89MAX_JOB_COUNT = 1024 89MAX_JOB_COUNT = 1024
90 90
91# Path/Callable to run on start of a worker process 91# Path/Callable to run on start of a worker process
92# These options are deprecated for the more user-friendly
93# SUBPROCESS_SETUP_FUNC which can be a full path to a function.
92SETUP_PATH = '' 94SETUP_PATH = ''
93SETUP_CALLABLE = '' 95SETUP_CALLABLE = ''
94 96
97# Function to run on the start of a new worker subprocess
98SUBPROCESS_SETUP_FUNC = ''
99
100# function to be run before the execution of every job
101JOB_ENTRY_FUNC = ''
102# function to be run after the execution of every job
103JOB_EXIT_FUNC = ''
95# Time to wait after receiving SIGTERM to kill the workers in the jobmanager 104# Time to wait after receiving SIGTERM to kill the workers in the jobmanager
96# forecfully 105# forecfully
97KILL_GRACE_PERIOD = 300 106KILL_GRACE_PERIOD = 300
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 78a080b..f4a928c 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -29,6 +29,7 @@ import time
29import zmq 29import zmq
30 30
31from eventmq.log import setup_logger 31from eventmq.log import setup_logger
32from . import __version__
32from . import conf 33from . import conf
33from .constants import KBYE, STATUS 34from .constants import KBYE, STATUS
34from .poller import Poller, POLLIN 35from .poller import Poller, POLLIN
@@ -80,6 +81,7 @@ class JobManager(HeartbeatMixin, EMQPService):
80 #: Define the name of this JobManager instance. Useful to know when 81 #: Define the name of this JobManager instance. Useful to know when
81 #: referring to the logs. 82 #: referring to the logs.
82 self.name = kwargs.pop('name', None) or generate_device_name() 83 self.name = kwargs.pop('name', None) or generate_device_name()
84 logger.info('EventMQ Version {}'.format(__version__))
83 logger.info('Initializing JobManager {}...'.format(self.name)) 85 logger.info('Initializing JobManager {}...'.format(self.name))
84 86
85 #: keep track of workers 87 #: keep track of workers
diff --git a/eventmq/router.py b/eventmq/router.py
index 0b896b3..81ca257 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -23,6 +23,7 @@ import logging
23import signal 23import signal
24 24
25from eventmq.log import setup_logger, setup_wal_logger 25from eventmq.log import setup_logger, setup_wal_logger
26from . import __version__
26from . import conf, constants, exceptions, poller, receiver 27from . import conf, constants, exceptions, poller, receiver
27from .constants import ( 28from .constants import (
28 CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS, 29 CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS,
@@ -51,8 +52,11 @@ class Router(HeartbeatMixin):
51 def __init__(self, *args, **kwargs): 52 def __init__(self, *args, **kwargs):
52 super(Router, self).__init__(*args, **kwargs) # Creates _meta 53 super(Router, self).__init__(*args, **kwargs) # Creates _meta
53 54
55 setup_logger("eventmq")
56
54 self.name = generate_device_name() 57 self.name = generate_device_name()
55 logger.info('Initializing Router %s...' % self.name) 58 logger.info('EventMQ Version {}'.format(__version__))
59 logger.info('Initializing Router {}...'.format(self.name))
56 60
57 self.poller = poller.Poller() 61 self.poller = poller.Poller()
58 62
@@ -943,7 +947,6 @@ class Router(HeartbeatMixin):
943 """ 947 """
944 Kick off router with logging and settings import 948 Kick off router with logging and settings import
945 """ 949 """
946 setup_logger('eventmq')
947 import_settings() 950 import_settings()
948 setup_wal_logger('eventmq-wal', conf.WAL) 951 setup_wal_logger('eventmq-wal', conf.WAL)
949 self.start(frontend_addr=conf.FRONTEND_ADDR, 952 self.start(frontend_addr=conf.FRONTEND_ADDR,
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 32c9b46..2ff726a 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -29,6 +29,7 @@ from six import next
29 29
30from eventmq.log import setup_logger 30from eventmq.log import setup_logger
31 31
32from . import __version__
32from . import conf, constants 33from . import conf, constants
33from .client.messages import send_request 34from .client.messages import send_request
34from .constants import KBYE 35from .constants import KBYE
@@ -53,6 +54,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
53 def __init__(self, *args, **kwargs): 54 def __init__(self, *args, **kwargs):
54 self.name = kwargs.get('name', None) 55 self.name = kwargs.get('name', None)
55 56
57 logger.info('EventMQ Version {}'.format(__version__))
56 logger.info('Initializing Scheduler...') 58 logger.info('Initializing Scheduler...')
57 import_settings() 59 import_settings()
58 super(Scheduler, self).__init__(*args, **kwargs) 60 super(Scheduler, self).__init__(*args, **kwargs)
diff --git a/eventmq/tests/test_worker.py b/eventmq/tests/test_worker.py
index 5e72f39..ec13be6 100644
--- a/eventmq/tests/test_worker.py
+++ b/eventmq/tests/test_worker.py
@@ -14,11 +14,21 @@
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>. 14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15 15
16import logging 16import logging
17import sys
17import time 18import time
18 19
20import mock
21
19from .. import worker 22from .. import worker
20 23
24if sys.version[0] == '2':
25 import Queue
26else:
27 import queue as Queue
28
29
21ADDR = 'inproc://pour_the_rice_in_the_thing' 30ADDR = 'inproc://pour_the_rice_in_the_thing'
31SETUP_SUCCESS_RETVAL = 'job setup success'
22 32
23 33
24def test_run_with_timeout(): 34def test_run_with_timeout():
@@ -33,11 +43,124 @@ def test_run_with_timeout():
33 assert msgid 43 assert msgid
34 44
35 45
46@mock.patch('eventmq.worker.callable_from_name')
47def test_run_job_setup_hook(callable_from_name_mock):
48 from eventmq import conf
49
50 setup_func_str = 'eventmq.tests.test_worker.job_setup_hook'
51 callable_from_name_mock.return_value = mock.Mock()
52
53 payload = {
54 'path': 'eventmq.tests.test_worker',
55 'callable': 'job',
56 'args': [2]
57 }
58
59 q, res_q = Queue.Queue(), Queue.Queue()
60
61 q.put(payload)
62 q.put('DONE')
63
64 try:
65 conf.JOB_ENTRY_FUNC = setup_func_str
66 worker._run(q, res_q, logging.getLogger())
67 finally:
68 conf.JOB_ENTRY_FUNC = ''
69
70 callable_from_name_mock.assert_called_with(setup_func_str)
71 assert callable_from_name_mock.return_value.call_count == 1
72
73
74@mock.patch('eventmq.worker.callable_from_name')
75def test_run_job_teardown_hook(callable_from_name_mock):
76 from eventmq import conf
77
78 teardown_func_str = 'eventmq.tests.test_worker.job_teardown_hook'
79 callable_from_name_mock.return_value = mock.Mock()
80
81 payload = {
82 'path': 'eventmq.tests.test_worker',
83 'callable': 'job',
84 'args': [2]
85 }
86
87 q, res_q = Queue.Queue(), Queue.Queue()
88
89 q.put(payload)
90 q.put('DONE')
91
92 try:
93 conf.JOB_EXIT_FUNC = teardown_func_str
94 worker._run(q, res_q, logging.getLogger())
95 finally:
96 conf.JOB_EXIT_FUNC = ''
97
98 callable_from_name_mock.assert_called_with(teardown_func_str)
99 assert callable_from_name_mock.return_value.call_count == 1
100
101
102@mock.patch('eventmq.worker.callable_from_name')
103def test_run_subprocess_setup_func(callable_from_name_mock):
104 from eventmq import conf
105
106 setup_func_str = 'eventmq.tests.test_worker.process_setup_hook'
107 callable_from_name_mock.return_value = mock.Mock()
108
109 payload = {
110 'path': 'eventmq.tests.test_worker',
111 'callable': 'job',
112 'args': [2]
113 }
114
115 q, res_q = Queue.Queue(), Queue.Queue()
116
117 q.put(payload)
118 q.put('DONE')
119
120 try:
121 conf.SUBPROCESS_SETUP_FUNC = setup_func_str
122 worker._run(q, res_q, logging.getLogger())
123 finally:
124 conf.SUBPROCESS_SETUP_FUNC = ''
125
126 callable_from_name_mock.assert_called_with(setup_func_str)
127 assert callable_from_name_mock.return_value.call_count == 1
128
129
130@mock.patch('eventmq.worker.run_setup')
131def test_run_run_setup_func(run_setup_mock):
132 from eventmq import conf
133
134 setup_func_path = 'eventmq.tests.test_worker'
135 setup_func_callable = 'process_setup_hook'
136
137 payload = {
138 'path': 'eventmq.tests.test_worker',
139 'callable': 'job',
140 'args': [2]
141 }
142
143 q, res_q = Queue.Queue(), Queue.Queue()
144
145 q.put(payload)
146 q.put('DONE')
147
148 try:
149 conf.SETUP_PATH = setup_func_path
150 conf.SETUP_CALLABLE = setup_func_callable
151 worker._run(q, res_q, logging.getLogger())
152 finally:
153 conf.SETUP_PATH = ''
154 conf.SETUP_CALLABLE = ''
155
156 run_setup_mock.assert_called_with(setup_func_path, setup_func_callable)
157
158
36def test_run_setup(): 159def test_run_setup():
37 setup_callable = 'pre_hook' 160 setup_callable = 'process_setup_hook'
38 setup_path = 'eventmq.tests.test_worker' 161 setup_path = 'eventmq.tests.test_worker'
39 162
40 worker.run_setup(setup_path, setup_callable) 163 assert worker.run_setup(setup_path, setup_callable)
41 164
42 165
43def job(sleep_time=0): 166def job(sleep_time=0):
@@ -46,9 +169,13 @@ def job(sleep_time=0):
46 return True 169 return True
47 170
48 171
49def pre_hook(): 172def process_setup_hook():
50 return 1 173 return True
51 174
52 175
53def post_hook(): 176def job_setup_hook():
54 return 1 177 return SETUP_SUCCESS_RETVAL
178
179
180def job_teardown_hook():
181 return True
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 1621578..1f3a6fa 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -18,17 +18,14 @@
18Defines different short-lived workers that execute jobs 18Defines different short-lived workers that execute jobs
19""" 19"""
20from importlib import import_module 20from importlib import import_module
21
22import logging 21import logging
23
24from multiprocessing import Process 22from multiprocessing import Process
25
26import os 23import os
27import sys 24import sys
28
29from threading import Thread 25from threading import Thread
30 26
31from . import conf 27from . import conf
28from .utils.functions import callable_from_name
32 29
33if sys.version[0] == '2': 30if sys.version[0] == '2':
34 import Queue 31 import Queue
@@ -180,7 +177,19 @@ def _run(queue, result_queue, logger):
180 "class_kwargs": {"value": 2} 177 "class_kwargs": {"value": 2}
181 } 178 }
182 """ 179 """
183 if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH): 180 if any(conf.SUBPROCESS_SETUP_FUNC):
181 try:
182 logger.debug("Running setup ({}) for worker id {}".format(
183 conf.SUBPROCESS_SETUP_FUNC, os.getpid()))
184 setup_func = callable_from_name(conf.SUBPROCESS_SETUP_FUNC)
185 setup_func()
186 except Exception as e:
187 logger.warning('Unable to do setup task ({}): {}'
188 .format(conf.SUBPROCESS_SETUP_FUNC, str(e)))
189
190 elif any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH):
191 logger.warning("SETUP_CALLABLE and SETUP_PATH deprecated in favor for "
192 "SUBPROCESS_SETUP_FUNC")
184 try: 193 try:
185 logger.debug("Running setup ({}.{}) for worker id {}" 194 logger.debug("Running setup ({}.{}) for worker id {}"
186 .format( 195 .format(
@@ -193,6 +202,16 @@ def _run(queue, result_queue, logger):
193 .format(conf.SETUP_PATH, 202 .format(conf.SETUP_PATH,
194 conf.SETUP_CALLABLE, str(e))) 203 conf.SETUP_CALLABLE, str(e)))
195 204
205 if conf.JOB_ENTRY_FUNC:
206 job_entry_func = callable_from_name(conf.JOB_ENTRY_FUNC)
207 else:
208 job_entry_func = None
209
210 if conf.JOB_EXIT_FUNC:
211 job_exit_func = callable_from_name(conf.JOB_EXIT_FUNC)
212 else:
213 job_exit_func = None
214
196 while True: 215 while True:
197 # Blocking get so we don't spin cycles reading over and over 216 # Blocking get so we don't spin cycles reading over and over
198 try: 217 try:
@@ -204,7 +223,14 @@ def _run(queue, result_queue, logger):
204 if payload == 'DONE': 223 if payload == 'DONE':
205 break 224 break
206 225
226 if job_entry_func:
227 job_entry_func()
228
207 return_val = _run_job(payload, logger) 229 return_val = _run_job(payload, logger)
230
231 if job_exit_func:
232 job_exit_func()
233
208 # Signal that we're done with this job and put its return value on the 234 # Signal that we're done with this job and put its return value on the
209 # result queue 235 # result queue
210 result_queue.put(return_val) 236 result_queue.put(return_val)
@@ -278,4 +304,4 @@ def run_setup(setup_path, setup_callable):
278 304
279 setup_callable_ = getattr(setup_package, setup_callable) 305 setup_callable_ = getattr(setup_package, setup_callable)
280 306
281 setup_callable_() 307 return setup_callable_()
diff --git a/setup.py b/setup.py
index 64afbd9..9bacfbf 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@ from setuptools import find_packages, setup
7 7
8setup( 8setup(
9 name='eventmq', 9 name='eventmq',
10 version='0.3.5.6', 10 version='0.3.6',
11 description='EventMQ job execution and messaging system based on ZeroMQ', 11 description='EventMQ job execution and messaging system based on ZeroMQ',
12 packages=find_packages(), 12 packages=find_packages(),
13 install_requires=['pyzmq==15.4.0', 13 install_requires=['pyzmq==15.4.0',