diff options
| author | jason | 2016-08-31 14:29:04 -0600 |
|---|---|---|
| committer | GitHub | 2016-08-31 14:29:04 -0600 |
| commit | 3b04545442b63eeee8e60e43c2e690849e0751f5 (patch) | |
| tree | 080d99426447f5e93b952ffba1b56aba76309843 | |
| parent | cd4d1eac53881e55fdb635d76276be8721cd6ad2 (diff) | |
| parent | 222c7492d0473f67ed2a9bb536bd41b1ea8eafe7 (diff) | |
| download | eventmq-0.2.5.tar.gz eventmq-0.2.5.zip | |
Merge pull request #41 from synic/debounce0.2.5
Add debounce function
| -rw-r--r-- | circle.yml | 3 | ||||
| -rw-r--r-- | eventmq/client/debounce.py | 295 | ||||
| -rw-r--r-- | eventmq/client/messages.py | 133 | ||||
| -rw-r--r-- | eventmq/conf.py | 2 | ||||
| -rw-r--r-- | eventmq/exceptions.py | 7 | ||||
| -rw-r--r-- | eventmq/tests/test_client_messages.py | 4 | ||||
| -rw-r--r-- | eventmq/tests/test_debounce.py | 134 | ||||
| -rw-r--r-- | eventmq/utils/functions.py | 151 | ||||
| -rw-r--r-- | eventmq/worker.py | 25 |
9 files changed, 626 insertions, 128 deletions
| @@ -1,3 +1,6 @@ | |||
| 1 | machine: | ||
| 2 | services: | ||
| 3 | - redis | ||
| 1 | test: | 4 | test: |
| 2 | override: | 5 | override: |
| 3 | - pip install --upgrade -r requirements_.txt | 6 | - pip install --upgrade -r requirements_.txt |
diff --git a/eventmq/client/debounce.py b/eventmq/client/debounce.py new file mode 100644 index 0000000..e6f462e --- /dev/null +++ b/eventmq/client/debounce.py | |||
| @@ -0,0 +1,295 @@ | |||
| 1 | # This file is part of eventmq. | ||
| 2 | # | ||
| 3 | # eventmq is free software: you can redistribute it and/or modify it under the | ||
| 4 | # terms of the GNU Lesser General Public License as published by the Free | ||
| 5 | # Software Foundation, either version 2.1 of the License, or (at your option) | ||
| 6 | # any later version. | ||
| 7 | # | ||
| 8 | # eventmq is distributed in the hope that it will be useful, | ||
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| 11 | # GNU Lesser General Public License for more details. | ||
| 12 | # | ||
| 13 | # You should have received a copy of the GNU Lesser General Public License | ||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | ||
| 15 | import logging | ||
| 16 | import copy | ||
| 17 | |||
| 18 | from .. import conf | ||
| 19 | |||
| 20 | logger = logging.getLogger(__name__) | ||
| 21 | |||
| 22 | # attempt to import redis | ||
| 23 | try: | ||
| 24 | import redis # noqa | ||
| 25 | except ImportError: | ||
| 26 | redis = None | ||
| 27 | logger.warning('Redis not installed, debounce support not available.') | ||
| 28 | |||
| 29 | |||
| 30 | def _debounce_run_deferred_job(context): | ||
| 31 | """Called by the debounce system to actually run the job. | ||
| 32 | |||
| 33 | This is called by the debounce system at the end of the process, when the | ||
| 34 | job should actually run. It deserializes the callable, the arguments, and | ||
| 35 | tries to execute the callable with the arguments. | ||
| 36 | |||
| 37 | context keys: | ||
| 38 | path (str): The path of the callable function (the module information) | ||
| 39 | callable_name (str): The name of the callable | ||
| 40 | args (list): The arguments to pass when invoking the callable | ||
| 41 | kwargs (dict): The arguments to pass when invoking the callable | ||
| 42 | class_args (list): Arguments to use when initializing the callable | ||
| 43 | class (if it is a class) | ||
| 44 | class_kwargs (dict): The arguments to use when initializing the | ||
| 45 | callable class (if it is a class) | ||
| 46 | cache_key1 (str): The cache key for step 1 | ||
| 47 | cache_key2 (str): The cache key for step 2 | ||
| 48 | queue (str): The queue name for this job | ||
| 49 | job_id (str): The unique job identifier | ||
| 50 | scheduled (bool): Whether or not this was deferred immediately, or | ||
| 51 | scheduled for the future. | ||
| 52 | |||
| 53 | Args: | ||
| 54 | context (dict): See above for "context keys" | ||
| 55 | """ | ||
| 56 | from ..sender import Sender | ||
| 57 | from ..utils.functions import run_function | ||
| 58 | from .messages import schedule | ||
| 59 | |||
| 60 | context = copy.deepcopy(context) # don't modify caller's context | ||
| 61 | queue = context.get('queue') | ||
| 62 | job_id = context.get('job_id') | ||
| 63 | path = context.get('path') | ||
| 64 | callable_name = context.get('callable_name') | ||
| 65 | scheduled = context.get('scheduled') | ||
| 66 | cache_key1 = context.get('cache_key1') | ||
| 67 | cache_key2 = context.get('cache_key2') | ||
| 68 | args = context.get('args') | ||
| 69 | kwargs = context.get('kwargs') | ||
| 70 | class_args = context.get('class_args') | ||
| 71 | class_kwargs = context.get('class_kwargs') | ||
| 72 | |||
| 73 | if queue: | ||
| 74 | queue = str(queue) | ||
| 75 | |||
| 76 | if job_id: | ||
| 77 | job_id = str(job_id) | ||
| 78 | |||
| 79 | logger.debug('DEBOUNCE: {}.{} - running {} job'.format( | ||
| 80 | path, callable_name, 'scheduled' if job_id else 'deferred')) | ||
| 81 | |||
| 82 | if scheduled and job_id: | ||
| 83 | # TODO: here we create a new socket just so we can unschedule the | ||
| 84 | # debunce scheduled job. This can be removed once the `run_x` feature | ||
| 85 | # is implemented that lets you specify how many times a scheduled job | ||
| 86 | # should run before stopping. | ||
| 87 | socket = Sender() | ||
| 88 | socket.connect(addr=conf.FRONTEND_ADDR) | ||
| 89 | schedule(socket, _debounce_run_deferred_job, | ||
| 90 | queue=queue, unschedule=True, class_args=(job_id, )) | ||
| 91 | socket.zsocket.close() | ||
| 92 | |||
| 93 | del socket | ||
| 94 | |||
| 95 | try: | ||
| 96 | redis_connection = redis.StrictRedis( | ||
| 97 | host=conf.RQ_HOST, | ||
| 98 | port=conf.RQ_PORT, | ||
| 99 | db=conf.RQ_DB, | ||
| 100 | password=conf.RQ_PASSWORD) | ||
| 101 | except Exception as e: | ||
| 102 | logger.error('DEBOUNCE: {}.{} - requested, but could not connect ' | ||
| 103 | 'to redis server'.format(path, callable_name, e)) | ||
| 104 | redis_connection.delete(cache_key1) | ||
| 105 | if scheduled: | ||
| 106 | redis_connection.delete(cache_key2) | ||
| 107 | return | ||
| 108 | |||
| 109 | # if the timer countdown caused this function to run, and `cache_key1` is | ||
| 110 | # set, bail out. Otherwise, continue, because the cache is set prior to | ||
| 111 | # deferring the function. | ||
| 112 | if redis_connection.get(cache_key1) and scheduled: | ||
| 113 | logger.debug('DEBOUNCE: {}.{} - cache_key1 was set, not ' | ||
| 114 | 'running job.'.format(path, callable_name)) | ||
| 115 | return | ||
| 116 | |||
| 117 | redis_connection.set(cache_key1, 1) | ||
| 118 | redis_connection.expire( | ||
| 119 | cache_key1, getattr(conf, 'DEBOUNCE_CACHE_KEY1_TIMEOUT', 60)) | ||
| 120 | |||
| 121 | try: | ||
| 122 | run_function( | ||
| 123 | path, callable_name, class_args, class_kwargs, *args, **kwargs) | ||
| 124 | except Exception as e: | ||
| 125 | logger.error('DEBOUNCE: {}.{} - exception {}'.format( | ||
| 126 | path, callable_name, e)) | ||
| 127 | |||
| 128 | redis_connection.delete(cache_key1) | ||
| 129 | if scheduled: | ||
| 130 | redis_connection.delete(cache_key2) | ||
| 131 | |||
| 132 | |||
| 133 | def _debounce_deferred_job(context): | ||
| 134 | """Debounce the job. | ||
| 135 | |||
| 136 | context keys: | ||
| 137 | path (str): The path of the callable function (the module information) | ||
| 138 | callable_name (str): The name of the callable | ||
| 139 | args (list): The arguments to pass when invoking the callable | ||
| 140 | kwargs (dict): The arguments to pass when invoking the callable | ||
| 141 | class_args (list): Arguments to use when initializing the callable | ||
| 142 | class (if it is a class) | ||
| 143 | class_kwargs (dict): The arguments to use when initializing the | ||
| 144 | callable class (if it is a class) | ||
| 145 | queue (str): The queue name for this job | ||
| 146 | debounce_secs (int): Debounce schedule interval, in seconds | ||
| 147 | |||
| 148 | Args: | ||
| 149 | context (dict): See "context keys" above. | ||
| 150 | """ | ||
| 151 | from ..sender import Sender | ||
| 152 | from ..utils.functions import arguments_hash | ||
| 153 | from .messages import schedule | ||
| 154 | |||
| 155 | job_id = arguments_hash(locals()) | ||
| 156 | context = copy.deepcopy(context) # don't modify caller's context | ||
| 157 | path = context.get('path') | ||
| 158 | callable_name = context.get('callable_name') | ||
| 159 | debounce_secs = context.pop('debounce_secs') | ||
| 160 | |||
| 161 | if not redis: | ||
| 162 | logger.error('DEBOUNCE requested, but redis is not available.') | ||
| 163 | return | ||
| 164 | |||
| 165 | try: | ||
| 166 | redis_connection = redis.StrictRedis( | ||
| 167 | host=conf.RQ_HOST, | ||
| 168 | port=conf.RQ_PORT, | ||
| 169 | db=conf.RQ_DB, | ||
| 170 | password=conf.RQ_PASSWORD) | ||
| 171 | except Exception as e: | ||
| 172 | logger.error('Debounce requested, but could not connect to ' | ||
| 173 | 'redis server: {}.'.format(e)) | ||
| 174 | return | ||
| 175 | |||
| 176 | cache_key1 = '{}_key1'.format(job_id) | ||
| 177 | cache_key2 = '{}_key2'.format(job_id) | ||
| 178 | |||
| 179 | logger.debug('DEBOUNCE: {}.{} - job id: {}'.format( | ||
| 180 | path, callable_name, job_id)) | ||
| 181 | |||
| 182 | context.update({ | ||
| 183 | 'cache_key1': cache_key1, | ||
| 184 | 'cache_key2': cache_key2, | ||
| 185 | 'scheduled': False, | ||
| 186 | }) | ||
| 187 | |||
| 188 | if not redis_connection.get(cache_key1): | ||
| 189 | logger.debug('DEBOUNCE: {}.{} - cache_key1 was not set, ' | ||
| 190 | 'deferring immediately.'.format(path, callable_name)) | ||
| 191 | redis_connection.set(cache_key1, 1) | ||
| 192 | redis_connection.expire( | ||
| 193 | cache_key1, getattr(conf, 'DEBOUNCE_CACHE_KEY1_TIMEOUT', 60)) | ||
| 194 | |||
| 195 | _debounce_run_deferred_job(context) | ||
| 196 | return | ||
| 197 | else: | ||
| 198 | logger.debug( | ||
| 199 | 'DEBOUNCE: {}.{} - cache_key1 was set, ' | ||
| 200 | 'trying cache_key2...'.format(path, callable_name)) | ||
| 201 | |||
| 202 | if not redis_connection.get(cache_key2): | ||
| 203 | logger.debug('DEBOUNCE: {}.{} - cache_key2 was not set, scheduling ' | ||
| 204 | 'for {} seconds'.format(path, callable_name, | ||
| 205 | debounce_secs)) | ||
| 206 | redis_connection.set(cache_key2, 1) | ||
| 207 | redis_connection.expire(cache_key2, debounce_secs) | ||
| 208 | |||
| 209 | context['job_id'] = job_id | ||
| 210 | context['scheduled'] = True | ||
| 211 | |||
| 212 | # TODO: here we create a new socket just so we can schedule the | ||
| 213 | # debunce scheduled job. This should probably use the socket that was | ||
| 214 | # passed to the original `defer_job` call, however, this is not | ||
| 215 | # currently possible in eventmq. | ||
| 216 | socket = Sender() | ||
| 217 | socket.connect(addr=conf.FRONTEND_ADDR) | ||
| 218 | |||
| 219 | schedule( | ||
| 220 | socket=socket, | ||
| 221 | func=_debounce_run_deferred_job, | ||
| 222 | queue=context.get('queue'), | ||
| 223 | interval_secs=debounce_secs, | ||
| 224 | kwargs={'context': context}, | ||
| 225 | headers=('nohaste', 'guarantee'), | ||
| 226 | class_args=(job_id, ), | ||
| 227 | ) | ||
| 228 | socket.zsocket.close() | ||
| 229 | |||
| 230 | del socket | ||
| 231 | return | ||
| 232 | else: | ||
| 233 | logger.debug( | ||
| 234 | 'DEBOUNCE: {}.{} - cache_key2 was set, dropping message. '.format( | ||
| 235 | path, callable_name)) | ||
| 236 | |||
| 237 | |||
| 238 | def _debounce_schedule( | ||
| 239 | socket, path, callable_name, debounce_secs, args=(), kwargs=None, | ||
| 240 | class_args=(), class_kwargs=None, reply_requested=False, | ||
| 241 | guarantee=False, retry_count=0, queue=conf.DEFAULT_QUEUE_NAME): | ||
| 242 | """Schedule the initial debounce. | ||
| 243 | |||
| 244 | Debounce works like this: | ||
| 245 | |||
| 246 | 1. When the first job comes in, `cache_key1` will not be set, so it | ||
| 247 | will defer the job to run immediately. At the beginning of that job, | ||
| 248 | it will set `cache_key1`, and at the end, it will unset `cache_key1`. | ||
| 249 | |||
| 250 | 2. If `cache_key1` is set and a new job comes in, if `cache_key2` is | ||
| 251 | not set, it will set `cache_key2` and schedule the job to run | ||
| 252 | `debounce_secs` in the future. | ||
| 253 | |||
| 254 | 3. If a third job comes in while `cache_key1` and `cache_key2` are | ||
| 255 | set, it will be ignored. | ||
| 256 | |||
| 257 | Args: | ||
| 258 | path (str): The path of the callable function (the module information) | ||
| 259 | callable_name (str): The name of the callable | ||
| 260 | debounce_secs (int): Debounce schedule interval, in seconds | ||
| 261 | reply_requested (bool): request the return value of func as a reply | ||
| 262 | retry_count (int): How many times should be retried when encountering | ||
| 263 | an Exception or some other failure before giving up. (default: 0 | ||
| 264 | or immediately fail) | ||
| 265 | queue (str): Name of queue to use when executing the job. If this value | ||
| 266 | evaluates to False, the default is used. Default: is configured | ||
| 267 | default queue name | ||
| 268 | """ | ||
| 269 | |||
| 270 | from .messages import defer_job | ||
| 271 | |||
| 272 | logger.debug('DEBOUNCE: {}.{} - initial schedule'.format( | ||
| 273 | path, callable_name)) | ||
| 274 | |||
| 275 | context = { | ||
| 276 | 'path': path, | ||
| 277 | 'callable_name': callable_name, | ||
| 278 | 'args': args, | ||
| 279 | 'kwargs': kwargs, | ||
| 280 | 'class_args': class_args, | ||
| 281 | 'class_kwargs': class_kwargs, | ||
| 282 | 'queue': queue, | ||
| 283 | 'scheduled': False, | ||
| 284 | 'debounce_secs': debounce_secs, | ||
| 285 | } | ||
| 286 | |||
| 287 | defer_job( | ||
| 288 | socket, | ||
| 289 | _debounce_deferred_job, | ||
| 290 | reply_requested=reply_requested, | ||
| 291 | retry_count=retry_count, | ||
| 292 | queue=queue, | ||
| 293 | guarantee=guarantee, | ||
| 294 | kwargs={'context': context}, | ||
| 295 | ) | ||
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py index cd26b3f..e589d4d 100644 --- a/eventmq/client/messages.py +++ b/eventmq/client/messages.py | |||
| @@ -16,21 +16,16 @@ | |||
| 16 | :mod:`messages` -- Client Messaging | 16 | :mod:`messages` -- Client Messaging |
| 17 | =================================== | 17 | =================================== |
| 18 | """ | 18 | """ |
| 19 | import inspect | ||
| 20 | import logging | 19 | import logging |
| 21 | import importlib | ||
| 22 | from json import dumps as serialize | 20 | from json import dumps as serialize |
| 23 | 21 | ||
| 24 | from .. import conf | 22 | from .. import conf |
| 25 | from ..utils.messages import send_emqp_message | 23 | from ..utils.messages import send_emqp_message |
| 24 | from ..utils.functions import path_from_callable | ||
| 26 | 25 | ||
| 27 | logger = logging.getLogger(__name__) | 26 | logger = logging.getLogger(__name__) |
| 28 | 27 | ||
| 29 | 28 | ||
| 30 | class CallableFromPathError(Exception): | ||
| 31 | pass | ||
| 32 | |||
| 33 | |||
| 34 | def schedule(socket, func, interval_secs=None, args=(), kwargs=None, | 29 | def schedule(socket, func, interval_secs=None, args=(), kwargs=None, |
| 35 | class_args=(), class_kwargs=None, headers=('guarantee',), | 30 | class_args=(), class_kwargs=None, headers=('guarantee',), |
| 36 | queue=conf.DEFAULT_QUEUE_NAME, unschedule=False, cron=None): | 31 | queue=conf.DEFAULT_QUEUE_NAME, unschedule=False, cron=None): |
| @@ -111,9 +106,10 @@ def schedule(socket, func, interval_secs=None, args=(), kwargs=None, | |||
| 111 | return msgid | 106 | return msgid |
| 112 | 107 | ||
| 113 | 108 | ||
| 114 | def defer_job(socket, func, args=(), kwargs=None, class_args=(), | 109 | def defer_job( |
| 115 | class_kwargs=None, reply_requested=False, guarantee=False, | 110 | socket, func, args=(), kwargs=None, class_args=(), class_kwargs=None, |
| 116 | retry_count=0, queue=conf.DEFAULT_QUEUE_NAME): | 111 | reply_requested=False, guarantee=False, retry_count=0, |
| 112 | debounce_secs=False, queue=conf.DEFAULT_QUEUE_NAME): | ||
| 117 | """ | 113 | """ |
| 118 | Used to send a job to a worker to execute via `socket`. | 114 | Used to send a job to a worker to execute via `socket`. |
| 119 | 115 | ||
| @@ -130,11 +126,11 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(), | |||
| 130 | class_kwargs (dict): dict of **kwargs to pass to the class when | 126 | class_kwargs (dict): dict of **kwargs to pass to the class when |
| 131 | initializing (if applicable). | 127 | initializing (if applicable). |
| 132 | reply_requested (bool): request the return value of func as a reply | 128 | reply_requested (bool): request the return value of func as a reply |
| 133 | guarantee (bool): (Give your best effort) to guarantee that func is | ||
| 134 | executed. Exceptions and things will be logged. | ||
| 135 | retry_count (int): How many times should be retried when encountering | 129 | retry_count (int): How many times should be retried when encountering |
| 136 | an Exception or some other failure before giving up. (default: 0 | 130 | an Exception or some other failure before giving up. (default: 0 |
| 137 | or immediately fail) | 131 | or immediately fail) |
| 132 | debounce_secs (secs): Number of seconds to debounce the job. See | ||
| 133 | `debounce_deferred_job` for more information. | ||
| 138 | queue (str): Name of queue to use when executing the job. If this value | 134 | queue (str): Name of queue to use when executing the job. If this value |
| 139 | evaluates to False, the default is used. Default: is configured | 135 | evaluates to False, the default is used. Default: is configured |
| 140 | default queue name | 136 | default queue name |
| @@ -142,6 +138,8 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(), | |||
| 142 | str: ID for the message/deferred job. This value will be None if there | 138 | str: ID for the message/deferred job. This value will be None if there |
| 143 | was an error. | 139 | was an error. |
| 144 | """ | 140 | """ |
| 141 | from eventmq.client import debounce | ||
| 142 | |||
| 145 | callable_name = None | 143 | callable_name = None |
| 146 | path = None | 144 | path = None |
| 147 | 145 | ||
| @@ -172,6 +170,25 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(), | |||
| 172 | format(func.__name__)) | 170 | format(func.__name__)) |
| 173 | return | 171 | return |
| 174 | 172 | ||
| 173 | if debounce_secs: | ||
| 174 | logger.debug('DEBOUNCE: {} - called, debounce_secs: {}...'.format( | ||
| 175 | func.__name__, | ||
| 176 | debounce_secs)) | ||
| 177 | debounce._debounce_schedule( | ||
| 178 | socket=socket, | ||
| 179 | path=path, | ||
| 180 | callable_name=callable_name, | ||
| 181 | debounce_secs=debounce_secs, | ||
| 182 | args=args, | ||
| 183 | kwargs=kwargs, | ||
| 184 | class_args=class_args, | ||
| 185 | class_kwargs=class_kwargs, | ||
| 186 | reply_requested=reply_requested, | ||
| 187 | guarantee=guarantee, | ||
| 188 | retry_count=retry_count, | ||
| 189 | queue=queue) | ||
| 190 | return | ||
| 191 | |||
| 175 | msg = ['run', { | 192 | msg = ['run', { |
| 176 | 'callable': callable_name, | 193 | 'callable': callable_name, |
| 177 | 'path': path, | 194 | 'path': path, |
| @@ -190,100 +207,6 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(), | |||
| 190 | return msgid | 207 | return msgid |
| 191 | 208 | ||
| 192 | 209 | ||
| 193 | def path_from_callable(func): | ||
| 194 | """ | ||
| 195 | Builds the module path in string format for a callable. | ||
| 196 | |||
| 197 | .. note: | ||
| 198 | To use a callable Object, pass Class.__call__ as func and provide any | ||
| 199 | class_args/class_kwargs. This is so side effects from pickling won't | ||
| 200 | occur. | ||
| 201 | |||
| 202 | Args: | ||
| 203 | func (callable): The function or method to build the path for | ||
| 204 | |||
| 205 | Returns: | ||
| 206 | list: (import path (w/ class seperated by a ':'), callable name) or | ||
| 207 | (None, None) on error. | ||
| 208 | """ | ||
| 209 | callable_name = None | ||
| 210 | |||
| 211 | path = None | ||
| 212 | # Methods also have the func_name property | ||
| 213 | if inspect.ismethod(func): | ||
| 214 | path = ("{}:{}".format(func.__module__, func.im_class.__name__)) | ||
| 215 | callable_name = func.func_name | ||
| 216 | elif inspect.isfunction(func): | ||
| 217 | path = func.__module__ | ||
| 218 | callable_name = func.func_name | ||
| 219 | else: | ||
| 220 | # We should account for another callable type so log information | ||
| 221 | # about it | ||
| 222 | if hasattr(func, '__class__') and isinstance(func, func.__class__): | ||
| 223 | func_type = 'instanceobject' | ||
| 224 | else: | ||
| 225 | func_type = type(func) | ||
| 226 | |||
| 227 | logger.error('Encountered unknown callable ({}) type {}'.format( | ||
| 228 | func, | ||
| 229 | func_type | ||
| 230 | )) | ||
| 231 | return None, None | ||
| 232 | |||
| 233 | return path, callable_name | ||
| 234 | |||
| 235 | |||
| 236 | def callable_from_path(path, callable_name, *args, **kwargs): | ||
| 237 | """Build a callable from a path and callable_name. | ||
| 238 | |||
| 239 | This function is the opposite of `path_from_callable`. It takes what is | ||
| 240 | returned from `build_module_name` and converts it back to the original | ||
| 241 | caller. | ||
| 242 | |||
| 243 | Args: | ||
| 244 | path (str): The module path of the callable. This is the first | ||
| 245 | position in the tuple returned from `path_from_callable`. | ||
| 246 | callable_name (str): The name of the function. This is the second | ||
| 247 | position of the tuple returned from `path_from_callable`. | ||
| 248 | *args (list): if `callable_name` is a method on a class, these | ||
| 249 | arguments will be passed to the constructor when instantiating the | ||
| 250 | class. | ||
| 251 | *kwargs (dict): if `callable_name` is a method on a class, these | ||
| 252 | arguments will be passed to the constructor when instantiating the | ||
| 253 | class. | ||
| 254 | |||
| 255 | Returns: | ||
| 256 | function: The callable | ||
| 257 | """ | ||
| 258 | if ':' in path: | ||
| 259 | _pksplit = path.split(':') | ||
| 260 | s_package = _pksplit[0] | ||
| 261 | s_cls = _pksplit[1] | ||
| 262 | else: | ||
| 263 | s_package = path | ||
| 264 | s_cls = None | ||
| 265 | |||
| 266 | try: | ||
| 267 | package = importlib.import_module(s_package) | ||
| 268 | reload(package) | ||
| 269 | except Exception as e: | ||
| 270 | raise CallableFromPathError(str(e)) | ||
| 271 | |||
| 272 | if s_cls: | ||
| 273 | cls = getattr(package, s_cls) | ||
| 274 | |||
| 275 | obj = cls(*args, **kwargs) | ||
| 276 | else: | ||
| 277 | obj = package | ||
| 278 | |||
| 279 | try: | ||
| 280 | callable_ = getattr(obj, callable_name) | ||
| 281 | except AttributeError as e: | ||
| 282 | raise CallableFromPathError(str(e)) | ||
| 283 | |||
| 284 | return callable_ | ||
| 285 | |||
| 286 | |||
| 287 | def send_request(socket, message, reply_requested=False, guarantee=False, | 210 | def send_request(socket, message, reply_requested=False, guarantee=False, |
| 288 | retry_count=0, queue=None): | 211 | retry_count=0, queue=None): |
| 289 | """ | 212 | """ |
diff --git a/eventmq/conf.py b/eventmq/conf.py index 0c4b7cc..59c8f90 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -20,7 +20,7 @@ | |||
| 20 | #: SUPER_DEBUG basically enables more debugging logs. Specifically the messages | 20 | #: SUPER_DEBUG basically enables more debugging logs. Specifically the messages |
| 21 | #: at different levels in the application. | 21 | #: at different levels in the application. |
| 22 | #: Default: False | 22 | #: Default: False |
| 23 | SUPER_DEBUG = False | 23 | SUPER_DEBUG = True |
| 24 | 24 | ||
| 25 | #: Don't show HEARTBEAT message when debug logging is enabled | 25 | #: Don't show HEARTBEAT message when debug logging is enabled |
| 26 | #: Default: True | 26 | #: Default: True |
diff --git a/eventmq/exceptions.py b/eventmq/exceptions.py index 955709f..1277cd2 100644 --- a/eventmq/exceptions.py +++ b/eventmq/exceptions.py | |||
| @@ -55,3 +55,10 @@ class UnknownQueueError(EventMQError): | |||
| 55 | """ | 55 | """ |
| 56 | Raised when a queue is not found in the internal list of queues. | 56 | Raised when a queue is not found in the internal list of queues. |
| 57 | """ | 57 | """ |
| 58 | |||
| 59 | |||
| 60 | class CallableFromPathError(EventMQError): | ||
| 61 | """ | ||
| 62 | Raised when construction of a callable from a path and callable_name fails. | ||
| 63 | """ | ||
| 64 | pass | ||
diff --git a/eventmq/tests/test_client_messages.py b/eventmq/tests/test_client_messages.py index 49d7ca2..3337987 100644 --- a/eventmq/tests/test_client_messages.py +++ b/eventmq/tests/test_client_messages.py | |||
| @@ -105,7 +105,7 @@ class TestCase(unittest.TestCase): | |||
| 105 | ('eventmq.client.messages', | 105 | ('eventmq.client.messages', |
| 106 | 'ERROR', | 106 | 'ERROR', |
| 107 | 'Encountered callable with no __module__ path nameless_func'), | 107 | 'Encountered callable with no __module__ path nameless_func'), |
| 108 | ('eventmq.client.messages', | 108 | ('eventmq.utils.functions', |
| 109 | 'ERROR', | 109 | 'ERROR', |
| 110 | 'Encountered unknown callable ({}) type instanceobject'. | 110 | 'Encountered unknown callable ({}) type instanceobject'. |
| 111 | format(callable_obj)), | 111 | format(callable_obj)), |
| @@ -210,7 +210,7 @@ class TestCase(unittest.TestCase): | |||
| 210 | ('eventmq.client.messages', | 210 | ('eventmq.client.messages', |
| 211 | 'ERROR', | 211 | 'ERROR', |
| 212 | 'Encountered callable with no __module__ path nameless_func'), | 212 | 'Encountered callable with no __module__ path nameless_func'), |
| 213 | ('eventmq.client.messages', | 213 | ('eventmq.utils.functions', |
| 214 | 'ERROR', | 214 | 'ERROR', |
| 215 | 'Encountered unknown callable ({}) type instanceobject'. | 215 | 'Encountered unknown callable ({}) type instanceobject'. |
| 216 | format(callable_obj)), | 216 | format(callable_obj)), |
diff --git a/eventmq/tests/test_debounce.py b/eventmq/tests/test_debounce.py new file mode 100644 index 0000000..15008ab --- /dev/null +++ b/eventmq/tests/test_debounce.py | |||
| @@ -0,0 +1,134 @@ | |||
| 1 | import uuid | ||
| 2 | import unittest | ||
| 3 | import mock | ||
| 4 | import redis | ||
| 5 | |||
| 6 | from .. import conf | ||
| 7 | |||
| 8 | from eventmq.client import messages | ||
| 9 | |||
| 10 | |||
| 11 | def test_job_function(): | ||
| 12 | return 'test_job_function' | ||
| 13 | |||
| 14 | |||
| 15 | class TestCase(unittest.TestCase): | ||
| 16 | def setUp(self): | ||
| 17 | super(TestCase, self).setUp() | ||
| 18 | self.redis = redis.StrictRedis( | ||
| 19 | host=conf.RQ_HOST, | ||
| 20 | port=conf.RQ_PORT, | ||
| 21 | db=conf.RQ_DB, | ||
| 22 | password=conf.RQ_PASSWORD) | ||
| 23 | self.socket = mock.Mock() | ||
| 24 | |||
| 25 | self.job_id = str(uuid.uuid4()) | ||
| 26 | self.arguments_hash_mock = mock.patch( | ||
| 27 | 'eventmq.utils.functions.arguments_hash', lambda *x: self.job_id) | ||
| 28 | self.arguments_hash_mock.start() | ||
| 29 | |||
| 30 | def tearDown(self): | ||
| 31 | cache_key1 = '{}_key1'.format(self.job_id) | ||
| 32 | cache_key2 = '{}_key2'.format(self.job_id) | ||
| 33 | self.redis.delete(cache_key1) | ||
| 34 | self.redis.delete(cache_key2) | ||
| 35 | |||
| 36 | self.arguments_hash_mock.stop() | ||
| 37 | |||
| 38 | super(TestCase, self).tearDown() | ||
| 39 | |||
| 40 | @mock.patch('eventmq.client.debounce._debounce_schedule') | ||
| 41 | def test_debounce_initial_call(self, debounce_schedule_mock): | ||
| 42 | messages.defer_job( | ||
| 43 | self.socket, | ||
| 44 | test_job_function, | ||
| 45 | debounce_secs=1) | ||
| 46 | |||
| 47 | debounce_schedule_mock.assert_called_with( | ||
| 48 | path='eventmq.tests.test_debounce', | ||
| 49 | callable_name='test_job_function', | ||
| 50 | args=(), | ||
| 51 | kwargs={}, | ||
| 52 | class_args=(), | ||
| 53 | class_kwargs={}, | ||
| 54 | queue='default', | ||
| 55 | reply_requested=False, | ||
| 56 | retry_count=0, | ||
| 57 | guarantee=False, | ||
| 58 | debounce_secs=1, | ||
| 59 | socket=self.socket) | ||
| 60 | |||
| 61 | @mock.patch('eventmq.client.debounce._debounce_run_deferred_job') | ||
| 62 | def test_debounce_step1(self, run_deferred_job_mock): | ||
| 63 | """Should call through to _debounce_run_deferred_job.""" | ||
| 64 | from eventmq.client import debounce | ||
| 65 | from eventmq.utils.functions import path_from_callable | ||
| 66 | |||
| 67 | path, callable_name = path_from_callable(test_job_function) | ||
| 68 | |||
| 69 | context = { | ||
| 70 | 'job_id': self.job_id, | ||
| 71 | 'cache_key1': '{}_key1'.format(self.job_id), | ||
| 72 | 'cache_key2': '{}_key2'.format(self.job_id), | ||
| 73 | 'path': path, | ||
| 74 | 'callable_name': callable_name, | ||
| 75 | 'queue': 'default', | ||
| 76 | 'debounce_secs': 1, | ||
| 77 | 'scheduled': False, | ||
| 78 | } | ||
| 79 | |||
| 80 | debounce._debounce_deferred_job(context) | ||
| 81 | |||
| 82 | context.pop('debounce_secs', None) | ||
| 83 | |||
| 84 | run_deferred_job_mock.assert_called_with(context) | ||
| 85 | |||
| 86 | @mock.patch('eventmq.sender.Sender') | ||
| 87 | @mock.patch('eventmq.client.debounce._debounce_run_deferred_job') | ||
| 88 | @mock.patch('eventmq.client.messages.schedule') | ||
| 89 | def test_debounce_step2(self, schedule_mock, run_deferred_job_mock, | ||
| 90 | sender_mock): | ||
| 91 | """Should call through to _debounce_run_deferred_job.""" | ||
| 92 | from eventmq.client import debounce | ||
| 93 | from eventmq.utils.functions import path_from_callable | ||
| 94 | |||
| 95 | sender_mock.return_value = self.socket | ||
| 96 | |||
| 97 | path, callable_name = path_from_callable(test_job_function) | ||
| 98 | |||
| 99 | context = { | ||
| 100 | 'job_id': self.job_id, | ||
| 101 | 'cache_key1': '{}_key1'.format(self.job_id), | ||
| 102 | 'cache_key2': '{}_key2'.format(self.job_id), | ||
| 103 | 'path': path, | ||
| 104 | 'callable_name': callable_name, | ||
| 105 | 'queue': 'default', | ||
| 106 | 'debounce_secs': 1, | ||
| 107 | 'scheduled': False, | ||
| 108 | } | ||
| 109 | |||
| 110 | debounce._debounce_deferred_job(context) | ||
| 111 | |||
| 112 | context.pop('debounce_secs', None) | ||
| 113 | |||
| 114 | run_deferred_job_mock.assert_called_with(context) | ||
| 115 | run_deferred_job_mock.reset_mock() | ||
| 116 | |||
| 117 | # this time, since we already went through step1, it should schedule a | ||
| 118 | # job for step2 | ||
| 119 | context['debounce_secs'] = 1 | ||
| 120 | debounce._debounce_deferred_job(context) | ||
| 121 | |||
| 122 | run_deferred_job_mock.assert_not_called() | ||
| 123 | |||
| 124 | context['scheduled'] = True | ||
| 125 | context.pop('debounce_secs', None) | ||
| 126 | |||
| 127 | schedule_mock.assert_called_with( | ||
| 128 | socket=self.socket, | ||
| 129 | func=run_deferred_job_mock, | ||
| 130 | headers=('nohaste', 'guarantee'), | ||
| 131 | class_args=(self.job_id,), | ||
| 132 | interval_secs=1, | ||
| 133 | kwargs={'context': context}, | ||
| 134 | queue='default') | ||
diff --git a/eventmq/utils/functions.py b/eventmq/utils/functions.py new file mode 100644 index 0000000..0bd1ba9 --- /dev/null +++ b/eventmq/utils/functions.py | |||
| @@ -0,0 +1,151 @@ | |||
| 1 | import json | ||
| 2 | import hashlib | ||
| 3 | import importlib | ||
| 4 | import inspect | ||
| 5 | |||
| 6 | from .. import log | ||
| 7 | from ..exceptions import CallableFromPathError | ||
| 8 | |||
| 9 | logger = log.setup_logger(__name__) | ||
| 10 | |||
| 11 | |||
| 12 | class IgnoreJSONEncoder(json.JSONEncoder): | ||
| 13 | """JSON Encoder that ignores unknown keys.""" | ||
| 14 | def default(self, obj): | ||
| 15 | try: | ||
| 16 | return super(IgnoreJSONEncoder, self).default(obj) | ||
| 17 | except TypeError: | ||
| 18 | return None | ||
| 19 | |||
| 20 | |||
| 21 | def run_function(path, callable_name, | ||
| 22 | class_args=(), class_kwargs=None, *args, **kwargs): | ||
| 23 | """Constructs a callable from `path` and `callable_name` and calls it. | ||
| 24 | |||
| 25 | Args: | ||
| 26 | class_args (list): If the callable is a class method, these args will | ||
| 27 | be used to initialize the class. | ||
| 28 | class_kwargs (dict): If the callable is a class method, these kwargs | ||
| 29 | will be used to initialize the class. | ||
| 30 | args (list): These arguments will be passed as parameters to the | ||
| 31 | callable | ||
| 32 | kwargs (dict): These keyword arguments will be passed as parameters to | ||
| 33 | the callable. | ||
| 34 | |||
| 35 | Return: | ||
| 36 | object: Whatever is returned by calling the callable will be returned | ||
| 37 | from this function. | ||
| 38 | """ | ||
| 39 | try: | ||
| 40 | callable_ = callable_from_path( | ||
| 41 | path, callable_name, *class_args, **class_kwargs) | ||
| 42 | except CallableFromPathError as e: | ||
| 43 | logger.exception('Error importing callable {}.{}: {}'.format( | ||
| 44 | path, callable_name, str(e))) | ||
| 45 | return | ||
| 46 | |||
| 47 | return callable_(*args, **kwargs) | ||
| 48 | |||
| 49 | |||
| 50 | def arguments_hash(*args, **kwargs): | ||
| 51 | """Takes `args` and `kwargs` and creates a unique identifier.""" | ||
| 52 | args = { | ||
| 53 | 'args': args, | ||
| 54 | 'kwargs': kwargs, | ||
| 55 | } | ||
| 56 | |||
| 57 | data = json.dumps(args, cls=IgnoreJSONEncoder) | ||
| 58 | return hashlib.sha1(data).hexdigest() | ||
| 59 | |||
| 60 | |||
| 61 | def path_from_callable(func): | ||
| 62 | """ | ||
| 63 | Builds the module path in string format for a callable. | ||
| 64 | |||
| 65 | .. note: | ||
| 66 | To use a callable Object, pass Class.__call__ as func and provide any | ||
| 67 | class_args/class_kwargs. This is so side effects from pickling won't | ||
| 68 | occur. | ||
| 69 | |||
| 70 | Args: | ||
| 71 | func (callable): The function or method to build the path for | ||
| 72 | |||
| 73 | Returns: | ||
| 74 | list: (import path (w/ class seperated by a ':'), callable name) or | ||
| 75 | (None, None) on error. | ||
| 76 | """ | ||
| 77 | callable_name = None | ||
| 78 | |||
| 79 | path = None | ||
| 80 | # Methods also have the func_name property | ||
| 81 | if inspect.ismethod(func): | ||
| 82 | path = ("{}:{}".format(func.__module__, func.im_class.__name__)) | ||
| 83 | callable_name = func.func_name | ||
| 84 | elif inspect.isfunction(func): | ||
| 85 | path = func.__module__ | ||
| 86 | callable_name = func.func_name | ||
| 87 | else: | ||
| 88 | # We should account for another callable type so log information | ||
| 89 | # about it | ||
| 90 | if hasattr(func, '__class__') and isinstance(func, func.__class__): | ||
| 91 | func_type = 'instanceobject' | ||
| 92 | else: | ||
| 93 | func_type = type(func) | ||
| 94 | |||
| 95 | logger.error('Encountered unknown callable ({}) type {}'.format( | ||
| 96 | func, | ||
| 97 | func_type | ||
| 98 | )) | ||
| 99 | return None, None | ||
| 100 | |||
| 101 | return path, callable_name | ||
| 102 | |||
| 103 | |||
| 104 | def callable_from_path(path, callable_name, *args, **kwargs): | ||
| 105 | """Build a callable from a path and callable_name. | ||
| 106 | |||
| 107 | This function is the opposite of `path_from_callable`. It takes what is | ||
| 108 | returned from `build_module_name` and converts it back to the original | ||
| 109 | caller. | ||
| 110 | |||
| 111 | Args: | ||
| 112 | path (str): The module path of the callable. This is the first | ||
| 113 | position in the tuple returned from `path_from_callable`. | ||
| 114 | callable_name (str): The name of the function. This is the second | ||
| 115 | position of the tuple returned from `path_from_callable`. | ||
| 116 | *args (list): if `callable_name` is a method on a class, these | ||
| 117 | arguments will be passed to the constructor when instantiating the | ||
| 118 | class. | ||
| 119 | *kwargs (dict): if `callable_name` is a method on a class, these | ||
| 120 | arguments will be passed to the constructor when instantiating the | ||
| 121 | class. | ||
| 122 | |||
| 123 | Returns: | ||
| 124 | function: The callable | ||
| 125 | """ | ||
| 126 | if ':' in path: | ||
| 127 | _pksplit = path.split(':') | ||
| 128 | s_package = _pksplit[0] | ||
| 129 | s_cls = _pksplit[1] | ||
| 130 | else: | ||
| 131 | s_package = path | ||
| 132 | s_cls = None | ||
| 133 | |||
| 134 | try: | ||
| 135 | package = importlib.import_module(s_package) | ||
| 136 | reload(package) | ||
| 137 | except Exception as e: | ||
| 138 | raise CallableFromPathError(str(e)) | ||
| 139 | |||
| 140 | if s_cls: | ||
| 141 | cls = getattr(package, s_cls) | ||
| 142 | obj = cls(*args, **kwargs) | ||
| 143 | else: | ||
| 144 | obj = package | ||
| 145 | |||
| 146 | try: | ||
| 147 | callable_ = getattr(obj, callable_name) | ||
| 148 | except AttributeError as e: | ||
| 149 | raise CallableFromPathError(str(e)) | ||
| 150 | |||
| 151 | return callable_ | ||
diff --git a/eventmq/worker.py b/eventmq/worker.py index 852c7da..6b1773b 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -17,7 +17,7 @@ | |||
| 17 | =============================== | 17 | =============================== |
| 18 | Defines different short-lived workers that execute jobs | 18 | Defines different short-lived workers that execute jobs |
| 19 | """ | 19 | """ |
| 20 | from client.messages import callable_from_path, CallableFromPathError | 20 | from .utils.functions import run_function |
| 21 | 21 | ||
| 22 | # the run function is executed in a different process, so we need to set the | 22 | # the run function is executed in a different process, so we need to set the |
| 23 | # logger up. | 23 | # logger up. |
| @@ -37,27 +37,12 @@ def run(payload, msgid): | |||
| 37 | callable_name = payload.get('callable') | 37 | callable_name = payload.get('callable') |
| 38 | class_args = payload.get('class_args', tuple()) or tuple() | 38 | class_args = payload.get('class_args', tuple()) or tuple() |
| 39 | class_kwargs = payload.get('class_kwargs', dict()) or dict() | 39 | class_kwargs = payload.get('class_kwargs', dict()) or dict() |
| 40 | args = payload.get('args', tuple()) or tuple() | ||
| 41 | kwargs = payload.get('kwargs', dict()) or dict() | ||
| 40 | 42 | ||
| 41 | try: | 43 | try: |
| 42 | callable_ = callable_from_path( | 44 | r = run_function( |
| 43 | path, callable_name, *class_args, **class_kwargs) | 45 | path, callable_name, class_args, class_kwargs, *args, **kwargs) |
| 44 | except CallableFromPathError as e: | ||
| 45 | logger.exception('Error importing callable {}.{}: {}'.format( | ||
| 46 | path, callable_name, str(e))) | ||
| 47 | return (msgid, str(e)) | ||
| 48 | |||
| 49 | if "args" in payload: | ||
| 50 | args = payload["args"] | ||
| 51 | else: | ||
| 52 | args = () | ||
| 53 | |||
| 54 | if "kwargs" in payload: | ||
| 55 | kwargs = payload["kwargs"] | ||
| 56 | else: | ||
| 57 | kwargs = {} | ||
| 58 | |||
| 59 | try: | ||
| 60 | r = callable_(*args, **kwargs) | ||
| 61 | return (msgid, r) | 46 | return (msgid, r) |
| 62 | except Exception as e: | 47 | except Exception as e: |
| 63 | logger.exception(e) | 48 | logger.exception(e) |