aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--circle.yml3
-rw-r--r--eventmq/client/debounce.py295
-rw-r--r--eventmq/client/messages.py133
-rw-r--r--eventmq/conf.py2
-rw-r--r--eventmq/exceptions.py7
-rw-r--r--eventmq/tests/test_client_messages.py4
-rw-r--r--eventmq/tests/test_debounce.py134
-rw-r--r--eventmq/utils/functions.py151
-rw-r--r--eventmq/worker.py25
9 files changed, 626 insertions, 128 deletions
diff --git a/circle.yml b/circle.yml
index dfdc164..bbbdb47 100644
--- a/circle.yml
+++ b/circle.yml
@@ -1,3 +1,6 @@
1machine:
2 services:
3 - redis
1test: 4test:
2 override: 5 override:
3 - pip install --upgrade -r requirements_.txt 6 - pip install --upgrade -r requirements_.txt
diff --git a/eventmq/client/debounce.py b/eventmq/client/debounce.py
new file mode 100644
index 0000000..e6f462e
--- /dev/null
+++ b/eventmq/client/debounce.py
@@ -0,0 +1,295 @@
1# This file is part of eventmq.
2#
3# eventmq is free software: you can redistribute it and/or modify it under the
4# terms of the GNU Lesser General Public License as published by the Free
5# Software Foundation, either version 2.1 of the License, or (at your option)
6# any later version.
7#
8# eventmq is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU Lesser General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15import logging
16import copy
17
18from .. import conf
19
20logger = logging.getLogger(__name__)
21
22# attempt to import redis
23try:
24 import redis # noqa
25except ImportError:
26 redis = None
27 logger.warning('Redis not installed, debounce support not available.')
28
29
30def _debounce_run_deferred_job(context):
31 """Called by the debounce system to actually run the job.
32
33 This is called by the debounce system at the end of the process, when the
34 job should actually run. It deserializes the callable, the arguments, and
35 tries to execute the callable with the arguments.
36
37 context keys:
38 path (str): The path of the callable function (the module information)
39 callable_name (str): The name of the callable
40 args (list): The arguments to pass when invoking the callable
41 kwargs (dict): The arguments to pass when invoking the callable
42 class_args (list): Arguments to use when initializing the callable
43 class (if it is a class)
44 class_kwargs (dict): The arguments to use when initializing the
45 callable class (if it is a class)
46 cache_key1 (str): The cache key for step 1
47 cache_key2 (str): The cache key for step 2
48 queue (str): The queue name for this job
49 job_id (str): The unique job identifier
50 scheduled (bool): Whether or not this was deferred immediately, or
51 scheduled for the future.
52
53 Args:
54 context (dict): See above for "context keys"
55 """
56 from ..sender import Sender
57 from ..utils.functions import run_function
58 from .messages import schedule
59
60 context = copy.deepcopy(context) # don't modify caller's context
61 queue = context.get('queue')
62 job_id = context.get('job_id')
63 path = context.get('path')
64 callable_name = context.get('callable_name')
65 scheduled = context.get('scheduled')
66 cache_key1 = context.get('cache_key1')
67 cache_key2 = context.get('cache_key2')
68 args = context.get('args')
69 kwargs = context.get('kwargs')
70 class_args = context.get('class_args')
71 class_kwargs = context.get('class_kwargs')
72
73 if queue:
74 queue = str(queue)
75
76 if job_id:
77 job_id = str(job_id)
78
79 logger.debug('DEBOUNCE: {}.{} - running {} job'.format(
80 path, callable_name, 'scheduled' if job_id else 'deferred'))
81
82 if scheduled and job_id:
83 # TODO: here we create a new socket just so we can unschedule the
84 # debunce scheduled job. This can be removed once the `run_x` feature
85 # is implemented that lets you specify how many times a scheduled job
86 # should run before stopping.
87 socket = Sender()
88 socket.connect(addr=conf.FRONTEND_ADDR)
89 schedule(socket, _debounce_run_deferred_job,
90 queue=queue, unschedule=True, class_args=(job_id, ))
91 socket.zsocket.close()
92
93 del socket
94
95 try:
96 redis_connection = redis.StrictRedis(
97 host=conf.RQ_HOST,
98 port=conf.RQ_PORT,
99 db=conf.RQ_DB,
100 password=conf.RQ_PASSWORD)
101 except Exception as e:
102 logger.error('DEBOUNCE: {}.{} - requested, but could not connect '
103 'to redis server'.format(path, callable_name, e))
104 redis_connection.delete(cache_key1)
105 if scheduled:
106 redis_connection.delete(cache_key2)
107 return
108
109 # if the timer countdown caused this function to run, and `cache_key1` is
110 # set, bail out. Otherwise, continue, because the cache is set prior to
111 # deferring the function.
112 if redis_connection.get(cache_key1) and scheduled:
113 logger.debug('DEBOUNCE: {}.{} - cache_key1 was set, not '
114 'running job.'.format(path, callable_name))
115 return
116
117 redis_connection.set(cache_key1, 1)
118 redis_connection.expire(
119 cache_key1, getattr(conf, 'DEBOUNCE_CACHE_KEY1_TIMEOUT', 60))
120
121 try:
122 run_function(
123 path, callable_name, class_args, class_kwargs, *args, **kwargs)
124 except Exception as e:
125 logger.error('DEBOUNCE: {}.{} - exception {}'.format(
126 path, callable_name, e))
127
128 redis_connection.delete(cache_key1)
129 if scheduled:
130 redis_connection.delete(cache_key2)
131
132
133def _debounce_deferred_job(context):
134 """Debounce the job.
135
136 context keys:
137 path (str): The path of the callable function (the module information)
138 callable_name (str): The name of the callable
139 args (list): The arguments to pass when invoking the callable
140 kwargs (dict): The arguments to pass when invoking the callable
141 class_args (list): Arguments to use when initializing the callable
142 class (if it is a class)
143 class_kwargs (dict): The arguments to use when initializing the
144 callable class (if it is a class)
145 queue (str): The queue name for this job
146 debounce_secs (int): Debounce schedule interval, in seconds
147
148 Args:
149 context (dict): See "context keys" above.
150 """
151 from ..sender import Sender
152 from ..utils.functions import arguments_hash
153 from .messages import schedule
154
155 job_id = arguments_hash(locals())
156 context = copy.deepcopy(context) # don't modify caller's context
157 path = context.get('path')
158 callable_name = context.get('callable_name')
159 debounce_secs = context.pop('debounce_secs')
160
161 if not redis:
162 logger.error('DEBOUNCE requested, but redis is not available.')
163 return
164
165 try:
166 redis_connection = redis.StrictRedis(
167 host=conf.RQ_HOST,
168 port=conf.RQ_PORT,
169 db=conf.RQ_DB,
170 password=conf.RQ_PASSWORD)
171 except Exception as e:
172 logger.error('Debounce requested, but could not connect to '
173 'redis server: {}.'.format(e))
174 return
175
176 cache_key1 = '{}_key1'.format(job_id)
177 cache_key2 = '{}_key2'.format(job_id)
178
179 logger.debug('DEBOUNCE: {}.{} - job id: {}'.format(
180 path, callable_name, job_id))
181
182 context.update({
183 'cache_key1': cache_key1,
184 'cache_key2': cache_key2,
185 'scheduled': False,
186 })
187
188 if not redis_connection.get(cache_key1):
189 logger.debug('DEBOUNCE: {}.{} - cache_key1 was not set, '
190 'deferring immediately.'.format(path, callable_name))
191 redis_connection.set(cache_key1, 1)
192 redis_connection.expire(
193 cache_key1, getattr(conf, 'DEBOUNCE_CACHE_KEY1_TIMEOUT', 60))
194
195 _debounce_run_deferred_job(context)
196 return
197 else:
198 logger.debug(
199 'DEBOUNCE: {}.{} - cache_key1 was set, '
200 'trying cache_key2...'.format(path, callable_name))
201
202 if not redis_connection.get(cache_key2):
203 logger.debug('DEBOUNCE: {}.{} - cache_key2 was not set, scheduling '
204 'for {} seconds'.format(path, callable_name,
205 debounce_secs))
206 redis_connection.set(cache_key2, 1)
207 redis_connection.expire(cache_key2, debounce_secs)
208
209 context['job_id'] = job_id
210 context['scheduled'] = True
211
212 # TODO: here we create a new socket just so we can schedule the
213 # debunce scheduled job. This should probably use the socket that was
214 # passed to the original `defer_job` call, however, this is not
215 # currently possible in eventmq.
216 socket = Sender()
217 socket.connect(addr=conf.FRONTEND_ADDR)
218
219 schedule(
220 socket=socket,
221 func=_debounce_run_deferred_job,
222 queue=context.get('queue'),
223 interval_secs=debounce_secs,
224 kwargs={'context': context},
225 headers=('nohaste', 'guarantee'),
226 class_args=(job_id, ),
227 )
228 socket.zsocket.close()
229
230 del socket
231 return
232 else:
233 logger.debug(
234 'DEBOUNCE: {}.{} - cache_key2 was set, dropping message. '.format(
235 path, callable_name))
236
237
238def _debounce_schedule(
239 socket, path, callable_name, debounce_secs, args=(), kwargs=None,
240 class_args=(), class_kwargs=None, reply_requested=False,
241 guarantee=False, retry_count=0, queue=conf.DEFAULT_QUEUE_NAME):
242 """Schedule the initial debounce.
243
244 Debounce works like this:
245
246 1. When the first job comes in, `cache_key1` will not be set, so it
247 will defer the job to run immediately. At the beginning of that job,
248 it will set `cache_key1`, and at the end, it will unset `cache_key1`.
249
250 2. If `cache_key1` is set and a new job comes in, if `cache_key2` is
251 not set, it will set `cache_key2` and schedule the job to run
252 `debounce_secs` in the future.
253
254 3. If a third job comes in while `cache_key1` and `cache_key2` are
255 set, it will be ignored.
256
257 Args:
258 path (str): The path of the callable function (the module information)
259 callable_name (str): The name of the callable
260 debounce_secs (int): Debounce schedule interval, in seconds
261 reply_requested (bool): request the return value of func as a reply
262 retry_count (int): How many times should be retried when encountering
263 an Exception or some other failure before giving up. (default: 0
264 or immediately fail)
265 queue (str): Name of queue to use when executing the job. If this value
266 evaluates to False, the default is used. Default: is configured
267 default queue name
268 """
269
270 from .messages import defer_job
271
272 logger.debug('DEBOUNCE: {}.{} - initial schedule'.format(
273 path, callable_name))
274
275 context = {
276 'path': path,
277 'callable_name': callable_name,
278 'args': args,
279 'kwargs': kwargs,
280 'class_args': class_args,
281 'class_kwargs': class_kwargs,
282 'queue': queue,
283 'scheduled': False,
284 'debounce_secs': debounce_secs,
285 }
286
287 defer_job(
288 socket,
289 _debounce_deferred_job,
290 reply_requested=reply_requested,
291 retry_count=retry_count,
292 queue=queue,
293 guarantee=guarantee,
294 kwargs={'context': context},
295 )
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py
index cd26b3f..e589d4d 100644
--- a/eventmq/client/messages.py
+++ b/eventmq/client/messages.py
@@ -16,21 +16,16 @@
16:mod:`messages` -- Client Messaging 16:mod:`messages` -- Client Messaging
17=================================== 17===================================
18""" 18"""
19import inspect
20import logging 19import logging
21import importlib
22from json import dumps as serialize 20from json import dumps as serialize
23 21
24from .. import conf 22from .. import conf
25from ..utils.messages import send_emqp_message 23from ..utils.messages import send_emqp_message
24from ..utils.functions import path_from_callable
26 25
27logger = logging.getLogger(__name__) 26logger = logging.getLogger(__name__)
28 27
29 28
30class CallableFromPathError(Exception):
31 pass
32
33
34def schedule(socket, func, interval_secs=None, args=(), kwargs=None, 29def schedule(socket, func, interval_secs=None, args=(), kwargs=None,
35 class_args=(), class_kwargs=None, headers=('guarantee',), 30 class_args=(), class_kwargs=None, headers=('guarantee',),
36 queue=conf.DEFAULT_QUEUE_NAME, unschedule=False, cron=None): 31 queue=conf.DEFAULT_QUEUE_NAME, unschedule=False, cron=None):
@@ -111,9 +106,10 @@ def schedule(socket, func, interval_secs=None, args=(), kwargs=None,
111 return msgid 106 return msgid
112 107
113 108
114def defer_job(socket, func, args=(), kwargs=None, class_args=(), 109def defer_job(
115 class_kwargs=None, reply_requested=False, guarantee=False, 110 socket, func, args=(), kwargs=None, class_args=(), class_kwargs=None,
116 retry_count=0, queue=conf.DEFAULT_QUEUE_NAME): 111 reply_requested=False, guarantee=False, retry_count=0,
112 debounce_secs=False, queue=conf.DEFAULT_QUEUE_NAME):
117 """ 113 """
118 Used to send a job to a worker to execute via `socket`. 114 Used to send a job to a worker to execute via `socket`.
119 115
@@ -130,11 +126,11 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(),
130 class_kwargs (dict): dict of **kwargs to pass to the class when 126 class_kwargs (dict): dict of **kwargs to pass to the class when
131 initializing (if applicable). 127 initializing (if applicable).
132 reply_requested (bool): request the return value of func as a reply 128 reply_requested (bool): request the return value of func as a reply
133 guarantee (bool): (Give your best effort) to guarantee that func is
134 executed. Exceptions and things will be logged.
135 retry_count (int): How many times should be retried when encountering 129 retry_count (int): How many times should be retried when encountering
136 an Exception or some other failure before giving up. (default: 0 130 an Exception or some other failure before giving up. (default: 0
137 or immediately fail) 131 or immediately fail)
132 debounce_secs (secs): Number of seconds to debounce the job. See
133 `debounce_deferred_job` for more information.
138 queue (str): Name of queue to use when executing the job. If this value 134 queue (str): Name of queue to use when executing the job. If this value
139 evaluates to False, the default is used. Default: is configured 135 evaluates to False, the default is used. Default: is configured
140 default queue name 136 default queue name
@@ -142,6 +138,8 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(),
142 str: ID for the message/deferred job. This value will be None if there 138 str: ID for the message/deferred job. This value will be None if there
143 was an error. 139 was an error.
144 """ 140 """
141 from eventmq.client import debounce
142
145 callable_name = None 143 callable_name = None
146 path = None 144 path = None
147 145
@@ -172,6 +170,25 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(),
172 format(func.__name__)) 170 format(func.__name__))
173 return 171 return
174 172
173 if debounce_secs:
174 logger.debug('DEBOUNCE: {} - called, debounce_secs: {}...'.format(
175 func.__name__,
176 debounce_secs))
177 debounce._debounce_schedule(
178 socket=socket,
179 path=path,
180 callable_name=callable_name,
181 debounce_secs=debounce_secs,
182 args=args,
183 kwargs=kwargs,
184 class_args=class_args,
185 class_kwargs=class_kwargs,
186 reply_requested=reply_requested,
187 guarantee=guarantee,
188 retry_count=retry_count,
189 queue=queue)
190 return
191
175 msg = ['run', { 192 msg = ['run', {
176 'callable': callable_name, 193 'callable': callable_name,
177 'path': path, 194 'path': path,
@@ -190,100 +207,6 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(),
190 return msgid 207 return msgid
191 208
192 209
193def path_from_callable(func):
194 """
195 Builds the module path in string format for a callable.
196
197 .. note:
198 To use a callable Object, pass Class.__call__ as func and provide any
199 class_args/class_kwargs. This is so side effects from pickling won't
200 occur.
201
202 Args:
203 func (callable): The function or method to build the path for
204
205 Returns:
206 list: (import path (w/ class seperated by a ':'), callable name) or
207 (None, None) on error.
208 """
209 callable_name = None
210
211 path = None
212 # Methods also have the func_name property
213 if inspect.ismethod(func):
214 path = ("{}:{}".format(func.__module__, func.im_class.__name__))
215 callable_name = func.func_name
216 elif inspect.isfunction(func):
217 path = func.__module__
218 callable_name = func.func_name
219 else:
220 # We should account for another callable type so log information
221 # about it
222 if hasattr(func, '__class__') and isinstance(func, func.__class__):
223 func_type = 'instanceobject'
224 else:
225 func_type = type(func)
226
227 logger.error('Encountered unknown callable ({}) type {}'.format(
228 func,
229 func_type
230 ))
231 return None, None
232
233 return path, callable_name
234
235
236def callable_from_path(path, callable_name, *args, **kwargs):
237 """Build a callable from a path and callable_name.
238
239 This function is the opposite of `path_from_callable`. It takes what is
240 returned from `build_module_name` and converts it back to the original
241 caller.
242
243 Args:
244 path (str): The module path of the callable. This is the first
245 position in the tuple returned from `path_from_callable`.
246 callable_name (str): The name of the function. This is the second
247 position of the tuple returned from `path_from_callable`.
248 *args (list): if `callable_name` is a method on a class, these
249 arguments will be passed to the constructor when instantiating the
250 class.
251 *kwargs (dict): if `callable_name` is a method on a class, these
252 arguments will be passed to the constructor when instantiating the
253 class.
254
255 Returns:
256 function: The callable
257 """
258 if ':' in path:
259 _pksplit = path.split(':')
260 s_package = _pksplit[0]
261 s_cls = _pksplit[1]
262 else:
263 s_package = path
264 s_cls = None
265
266 try:
267 package = importlib.import_module(s_package)
268 reload(package)
269 except Exception as e:
270 raise CallableFromPathError(str(e))
271
272 if s_cls:
273 cls = getattr(package, s_cls)
274
275 obj = cls(*args, **kwargs)
276 else:
277 obj = package
278
279 try:
280 callable_ = getattr(obj, callable_name)
281 except AttributeError as e:
282 raise CallableFromPathError(str(e))
283
284 return callable_
285
286
287def send_request(socket, message, reply_requested=False, guarantee=False, 210def send_request(socket, message, reply_requested=False, guarantee=False,
288 retry_count=0, queue=None): 211 retry_count=0, queue=None):
289 """ 212 """
diff --git a/eventmq/conf.py b/eventmq/conf.py
index 0c4b7cc..59c8f90 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -20,7 +20,7 @@
20#: SUPER_DEBUG basically enables more debugging logs. Specifically the messages 20#: SUPER_DEBUG basically enables more debugging logs. Specifically the messages
21#: at different levels in the application. 21#: at different levels in the application.
22#: Default: False 22#: Default: False
23SUPER_DEBUG = False 23SUPER_DEBUG = True
24 24
25#: Don't show HEARTBEAT message when debug logging is enabled 25#: Don't show HEARTBEAT message when debug logging is enabled
26#: Default: True 26#: Default: True
diff --git a/eventmq/exceptions.py b/eventmq/exceptions.py
index 955709f..1277cd2 100644
--- a/eventmq/exceptions.py
+++ b/eventmq/exceptions.py
@@ -55,3 +55,10 @@ class UnknownQueueError(EventMQError):
55 """ 55 """
56 Raised when a queue is not found in the internal list of queues. 56 Raised when a queue is not found in the internal list of queues.
57 """ 57 """
58
59
60class CallableFromPathError(EventMQError):
61 """
62 Raised when construction of a callable from a path and callable_name fails.
63 """
64 pass
diff --git a/eventmq/tests/test_client_messages.py b/eventmq/tests/test_client_messages.py
index 49d7ca2..3337987 100644
--- a/eventmq/tests/test_client_messages.py
+++ b/eventmq/tests/test_client_messages.py
@@ -105,7 +105,7 @@ class TestCase(unittest.TestCase):
105 ('eventmq.client.messages', 105 ('eventmq.client.messages',
106 'ERROR', 106 'ERROR',
107 'Encountered callable with no __module__ path nameless_func'), 107 'Encountered callable with no __module__ path nameless_func'),
108 ('eventmq.client.messages', 108 ('eventmq.utils.functions',
109 'ERROR', 109 'ERROR',
110 'Encountered unknown callable ({}) type instanceobject'. 110 'Encountered unknown callable ({}) type instanceobject'.
111 format(callable_obj)), 111 format(callable_obj)),
@@ -210,7 +210,7 @@ class TestCase(unittest.TestCase):
210 ('eventmq.client.messages', 210 ('eventmq.client.messages',
211 'ERROR', 211 'ERROR',
212 'Encountered callable with no __module__ path nameless_func'), 212 'Encountered callable with no __module__ path nameless_func'),
213 ('eventmq.client.messages', 213 ('eventmq.utils.functions',
214 'ERROR', 214 'ERROR',
215 'Encountered unknown callable ({}) type instanceobject'. 215 'Encountered unknown callable ({}) type instanceobject'.
216 format(callable_obj)), 216 format(callable_obj)),
diff --git a/eventmq/tests/test_debounce.py b/eventmq/tests/test_debounce.py
new file mode 100644
index 0000000..15008ab
--- /dev/null
+++ b/eventmq/tests/test_debounce.py
@@ -0,0 +1,134 @@
1import uuid
2import unittest
3import mock
4import redis
5
6from .. import conf
7
8from eventmq.client import messages
9
10
11def test_job_function():
12 return 'test_job_function'
13
14
15class TestCase(unittest.TestCase):
16 def setUp(self):
17 super(TestCase, self).setUp()
18 self.redis = redis.StrictRedis(
19 host=conf.RQ_HOST,
20 port=conf.RQ_PORT,
21 db=conf.RQ_DB,
22 password=conf.RQ_PASSWORD)
23 self.socket = mock.Mock()
24
25 self.job_id = str(uuid.uuid4())
26 self.arguments_hash_mock = mock.patch(
27 'eventmq.utils.functions.arguments_hash', lambda *x: self.job_id)
28 self.arguments_hash_mock.start()
29
30 def tearDown(self):
31 cache_key1 = '{}_key1'.format(self.job_id)
32 cache_key2 = '{}_key2'.format(self.job_id)
33 self.redis.delete(cache_key1)
34 self.redis.delete(cache_key2)
35
36 self.arguments_hash_mock.stop()
37
38 super(TestCase, self).tearDown()
39
40 @mock.patch('eventmq.client.debounce._debounce_schedule')
41 def test_debounce_initial_call(self, debounce_schedule_mock):
42 messages.defer_job(
43 self.socket,
44 test_job_function,
45 debounce_secs=1)
46
47 debounce_schedule_mock.assert_called_with(
48 path='eventmq.tests.test_debounce',
49 callable_name='test_job_function',
50 args=(),
51 kwargs={},
52 class_args=(),
53 class_kwargs={},
54 queue='default',
55 reply_requested=False,
56 retry_count=0,
57 guarantee=False,
58 debounce_secs=1,
59 socket=self.socket)
60
61 @mock.patch('eventmq.client.debounce._debounce_run_deferred_job')
62 def test_debounce_step1(self, run_deferred_job_mock):
63 """Should call through to _debounce_run_deferred_job."""
64 from eventmq.client import debounce
65 from eventmq.utils.functions import path_from_callable
66
67 path, callable_name = path_from_callable(test_job_function)
68
69 context = {
70 'job_id': self.job_id,
71 'cache_key1': '{}_key1'.format(self.job_id),
72 'cache_key2': '{}_key2'.format(self.job_id),
73 'path': path,
74 'callable_name': callable_name,
75 'queue': 'default',
76 'debounce_secs': 1,
77 'scheduled': False,
78 }
79
80 debounce._debounce_deferred_job(context)
81
82 context.pop('debounce_secs', None)
83
84 run_deferred_job_mock.assert_called_with(context)
85
86 @mock.patch('eventmq.sender.Sender')
87 @mock.patch('eventmq.client.debounce._debounce_run_deferred_job')
88 @mock.patch('eventmq.client.messages.schedule')
89 def test_debounce_step2(self, schedule_mock, run_deferred_job_mock,
90 sender_mock):
91 """Should call through to _debounce_run_deferred_job."""
92 from eventmq.client import debounce
93 from eventmq.utils.functions import path_from_callable
94
95 sender_mock.return_value = self.socket
96
97 path, callable_name = path_from_callable(test_job_function)
98
99 context = {
100 'job_id': self.job_id,
101 'cache_key1': '{}_key1'.format(self.job_id),
102 'cache_key2': '{}_key2'.format(self.job_id),
103 'path': path,
104 'callable_name': callable_name,
105 'queue': 'default',
106 'debounce_secs': 1,
107 'scheduled': False,
108 }
109
110 debounce._debounce_deferred_job(context)
111
112 context.pop('debounce_secs', None)
113
114 run_deferred_job_mock.assert_called_with(context)
115 run_deferred_job_mock.reset_mock()
116
117 # this time, since we already went through step1, it should schedule a
118 # job for step2
119 context['debounce_secs'] = 1
120 debounce._debounce_deferred_job(context)
121
122 run_deferred_job_mock.assert_not_called()
123
124 context['scheduled'] = True
125 context.pop('debounce_secs', None)
126
127 schedule_mock.assert_called_with(
128 socket=self.socket,
129 func=run_deferred_job_mock,
130 headers=('nohaste', 'guarantee'),
131 class_args=(self.job_id,),
132 interval_secs=1,
133 kwargs={'context': context},
134 queue='default')
diff --git a/eventmq/utils/functions.py b/eventmq/utils/functions.py
new file mode 100644
index 0000000..0bd1ba9
--- /dev/null
+++ b/eventmq/utils/functions.py
@@ -0,0 +1,151 @@
1import json
2import hashlib
3import importlib
4import inspect
5
6from .. import log
7from ..exceptions import CallableFromPathError
8
9logger = log.setup_logger(__name__)
10
11
12class IgnoreJSONEncoder(json.JSONEncoder):
13 """JSON Encoder that ignores unknown keys."""
14 def default(self, obj):
15 try:
16 return super(IgnoreJSONEncoder, self).default(obj)
17 except TypeError:
18 return None
19
20
21def run_function(path, callable_name,
22 class_args=(), class_kwargs=None, *args, **kwargs):
23 """Constructs a callable from `path` and `callable_name` and calls it.
24
25 Args:
26 class_args (list): If the callable is a class method, these args will
27 be used to initialize the class.
28 class_kwargs (dict): If the callable is a class method, these kwargs
29 will be used to initialize the class.
30 args (list): These arguments will be passed as parameters to the
31 callable
32 kwargs (dict): These keyword arguments will be passed as parameters to
33 the callable.
34
35 Return:
36 object: Whatever is returned by calling the callable will be returned
37 from this function.
38 """
39 try:
40 callable_ = callable_from_path(
41 path, callable_name, *class_args, **class_kwargs)
42 except CallableFromPathError as e:
43 logger.exception('Error importing callable {}.{}: {}'.format(
44 path, callable_name, str(e)))
45 return
46
47 return callable_(*args, **kwargs)
48
49
50def arguments_hash(*args, **kwargs):
51 """Takes `args` and `kwargs` and creates a unique identifier."""
52 args = {
53 'args': args,
54 'kwargs': kwargs,
55 }
56
57 data = json.dumps(args, cls=IgnoreJSONEncoder)
58 return hashlib.sha1(data).hexdigest()
59
60
61def path_from_callable(func):
62 """
63 Builds the module path in string format for a callable.
64
65 .. note:
66 To use a callable Object, pass Class.__call__ as func and provide any
67 class_args/class_kwargs. This is so side effects from pickling won't
68 occur.
69
70 Args:
71 func (callable): The function or method to build the path for
72
73 Returns:
74 list: (import path (w/ class seperated by a ':'), callable name) or
75 (None, None) on error.
76 """
77 callable_name = None
78
79 path = None
80 # Methods also have the func_name property
81 if inspect.ismethod(func):
82 path = ("{}:{}".format(func.__module__, func.im_class.__name__))
83 callable_name = func.func_name
84 elif inspect.isfunction(func):
85 path = func.__module__
86 callable_name = func.func_name
87 else:
88 # We should account for another callable type so log information
89 # about it
90 if hasattr(func, '__class__') and isinstance(func, func.__class__):
91 func_type = 'instanceobject'
92 else:
93 func_type = type(func)
94
95 logger.error('Encountered unknown callable ({}) type {}'.format(
96 func,
97 func_type
98 ))
99 return None, None
100
101 return path, callable_name
102
103
104def callable_from_path(path, callable_name, *args, **kwargs):
105 """Build a callable from a path and callable_name.
106
107 This function is the opposite of `path_from_callable`. It takes what is
108 returned from `build_module_name` and converts it back to the original
109 caller.
110
111 Args:
112 path (str): The module path of the callable. This is the first
113 position in the tuple returned from `path_from_callable`.
114 callable_name (str): The name of the function. This is the second
115 position of the tuple returned from `path_from_callable`.
116 *args (list): if `callable_name` is a method on a class, these
117 arguments will be passed to the constructor when instantiating the
118 class.
119 *kwargs (dict): if `callable_name` is a method on a class, these
120 arguments will be passed to the constructor when instantiating the
121 class.
122
123 Returns:
124 function: The callable
125 """
126 if ':' in path:
127 _pksplit = path.split(':')
128 s_package = _pksplit[0]
129 s_cls = _pksplit[1]
130 else:
131 s_package = path
132 s_cls = None
133
134 try:
135 package = importlib.import_module(s_package)
136 reload(package)
137 except Exception as e:
138 raise CallableFromPathError(str(e))
139
140 if s_cls:
141 cls = getattr(package, s_cls)
142 obj = cls(*args, **kwargs)
143 else:
144 obj = package
145
146 try:
147 callable_ = getattr(obj, callable_name)
148 except AttributeError as e:
149 raise CallableFromPathError(str(e))
150
151 return callable_
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 852c7da..6b1773b 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -17,7 +17,7 @@
17=============================== 17===============================
18Defines different short-lived workers that execute jobs 18Defines different short-lived workers that execute jobs
19""" 19"""
20from client.messages import callable_from_path, CallableFromPathError 20from .utils.functions import run_function
21 21
22# the run function is executed in a different process, so we need to set the 22# the run function is executed in a different process, so we need to set the
23# logger up. 23# logger up.
@@ -37,27 +37,12 @@ def run(payload, msgid):
37 callable_name = payload.get('callable') 37 callable_name = payload.get('callable')
38 class_args = payload.get('class_args', tuple()) or tuple() 38 class_args = payload.get('class_args', tuple()) or tuple()
39 class_kwargs = payload.get('class_kwargs', dict()) or dict() 39 class_kwargs = payload.get('class_kwargs', dict()) or dict()
40 args = payload.get('args', tuple()) or tuple()
41 kwargs = payload.get('kwargs', dict()) or dict()
40 42
41 try: 43 try:
42 callable_ = callable_from_path( 44 r = run_function(
43 path, callable_name, *class_args, **class_kwargs) 45 path, callable_name, class_args, class_kwargs, *args, **kwargs)
44 except CallableFromPathError as e:
45 logger.exception('Error importing callable {}.{}: {}'.format(
46 path, callable_name, str(e)))
47 return (msgid, str(e))
48
49 if "args" in payload:
50 args = payload["args"]
51 else:
52 args = ()
53
54 if "kwargs" in payload:
55 kwargs = payload["kwargs"]
56 else:
57 kwargs = {}
58
59 try:
60 r = callable_(*args, **kwargs)
61 return (msgid, r) 46 return (msgid, r)
62 except Exception as e: 47 except Exception as e:
63 logger.exception(e) 48 logger.exception(e)