diff options
| author | jason | 2017-01-25 16:55:30 -0700 |
|---|---|---|
| committer | jason | 2017-02-17 15:52:01 -0700 |
| commit | 0315ed45f74821de29d713d975a3ee3e7c86998c (patch) | |
| tree | fa0221d49e4672db1a681742d365237d815ea346 | |
| parent | 690edcf803703e23ae98c3f1d95771ac318e129e (diff) | |
| download | eventmq-0315ed45f74821de29d713d975a3ee3e7c86998c.tar.gz eventmq-0315ed45f74821de29d713d975a3ee3e7c86998c.zip | |
Add schedule and unschedule helper functions
- These functions will create the socket for the user
| -rw-r--r-- | eventmq/__init__.py | 2 | ||||
| -rw-r--r-- | eventmq/client/jobs.py | 108 | ||||
| -rw-r--r-- | eventmq/client/messages.py | 3 | ||||
| -rw-r--r-- | eventmq/exceptions.py | 6 |
4 files changed, 116 insertions, 3 deletions
diff --git a/eventmq/__init__.py b/eventmq/__init__.py index 5b8029b..091aeab 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py | |||
| @@ -4,4 +4,4 @@ __version__ = '0.3' | |||
| 4 | PROTOCOL_VERSION = 'eMQP/1.0' | 4 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 5 | 5 | ||
| 6 | from .client.messages import defer_job # noqa | 6 | from .client.messages import defer_job # noqa |
| 7 | from .client.jobs import job # noqa | 7 | from .client.jobs import job, schedule, unschedule # noqa |
diff --git a/eventmq/client/jobs.py b/eventmq/client/jobs.py index 2f4f784..6b53ab1 100644 --- a/eventmq/client/jobs.py +++ b/eventmq/client/jobs.py | |||
| @@ -20,7 +20,9 @@ import logging | |||
| 20 | import os | 20 | import os |
| 21 | 21 | ||
| 22 | from . import messages | 22 | from . import messages |
| 23 | from .. import conf | ||
| 23 | from ..constants import ENV_BROKER_ADDR | 24 | from ..constants import ENV_BROKER_ADDR |
| 25 | from ..exceptions import ConnectionError | ||
| 24 | from ..sender import Sender | 26 | from ..sender import Sender |
| 25 | 27 | ||
| 26 | logger = logging.getLogger(__name__) | 28 | logger = logging.getLogger(__name__) |
| @@ -30,6 +32,10 @@ class Job(object): | |||
| 30 | """ | 32 | """ |
| 31 | Defines a deferred EventMQ job. | 33 | Defines a deferred EventMQ job. |
| 32 | 34 | ||
| 35 | .. note:: | ||
| 36 | |||
| 37 | All passed class & function kwargs/args MUST be json serializable. | ||
| 38 | |||
| 33 | Usage: | 39 | Usage: |
| 34 | 40 | ||
| 35 | .. code-block:: python | 41 | .. code-block:: python |
| @@ -68,6 +74,8 @@ class Job(object): | |||
| 68 | can set this to False. This is useful for unit tests. | 74 | can set this to False. This is useful for unit tests. |
| 69 | 75 | ||
| 70 | """ | 76 | """ |
| 77 | # conf.BROKER_ADDR isn't used because /etc/eventmq.conf is for the | ||
| 78 | # daemons. | ||
| 71 | self.broker_addr = broker_addr or os.environ.get(ENV_BROKER_ADDR) | 79 | self.broker_addr = broker_addr or os.environ.get(ENV_BROKER_ADDR) |
| 72 | self.queue = queue | 80 | self.queue = queue |
| 73 | self.async = async | 81 | self.async = async |
| @@ -104,3 +112,103 @@ def job(func, broker_addr=None, queue=None, async=True, *args, **kwargs): | |||
| 104 | return decorator(func) | 112 | return decorator(func) |
| 105 | else: | 113 | else: |
| 106 | return decorator | 114 | return decorator |
| 115 | |||
| 116 | |||
| 117 | def schedule(func, broker_addr=None, interval_secs=None, args=(), kwargs=None, | ||
| 118 | class_args=(), class_kwargs=None, headers=('guarantee',), | ||
| 119 | queue=conf.DEFAULT_QUEUE_NAME, cron=None): | ||
| 120 | """ | ||
| 121 | Execute a task on a defined interval. | ||
| 122 | |||
| 123 | .. note:: | ||
| 124 | |||
| 125 | All passed class & function kwargs/args MUST be json serializable. | ||
| 126 | |||
| 127 | Args: | ||
| 128 | func (callable): the callable (or string path to calable) to be | ||
| 129 | scheduled on a worker | ||
| 130 | broker_addr (str): Address of the broker to send the job to. If no | ||
| 131 | address is given then the value of the environment variable | ||
| 132 | ``EMQ_BROKER_ADDR`` will be used. | ||
| 133 | interval_secs (int): Run job every interval_secs or None if using cron | ||
| 134 | args (list): list of *args to pass to the callable | ||
| 135 | kwargs (dict): dict of **kwargs to pass to the callable | ||
| 136 | class_args (list): list of *args to pass to the class (if applicable) | ||
| 137 | class_kwargs (dict): dict of **kwargs to pass to the class (if | ||
| 138 | applicable) | ||
| 139 | headers (list): list of strings denoting enabled headers. Default: | ||
| 140 | guarantee is enabled to ensure the scheduler schedules the job. | ||
| 141 | queue (str): name of the queue to use when executing the job. The | ||
| 142 | default value is the default queue. | ||
| 143 | cron (string): cron formatted string used for job schedule if | ||
| 144 | interval_secs is None, i.e. '* * * * *' (every minute) | ||
| 145 | Raises: | ||
| 146 | TypeError: When one or more parameters are not JSON serializable. | ||
| 147 | Returns: | ||
| 148 | str: ID of the schedule message that was sent. None if there was an | ||
| 149 | error | ||
| 150 | """ | ||
| 151 | socket = Sender() | ||
| 152 | # conf.BROKER_ADDR isn't used because /etc/eventmq.conf is for the daemons. | ||
| 153 | broker_addr = broker_addr or os.environ.get(ENV_BROKER_ADDR) | ||
| 154 | |||
| 155 | if not broker_addr: | ||
| 156 | raise ConnectionError('unknown broker address: {}'.format(broker_addr)) | ||
| 157 | |||
| 158 | socket.connect(addr=broker_addr) | ||
| 159 | |||
| 160 | return messages.schedule( | ||
| 161 | socket, func, interval_secs=interval_secs, args=args, | ||
| 162 | kwargs=kwargs, class_args=class_args, class_kwargs=class_kwargs, | ||
| 163 | headers=headers, queue=conf.DEFAULT_QUEUE_NAME, cron=cron) | ||
| 164 | |||
| 165 | |||
| 166 | def unschedule(func, broker_addr=None, interval_secs=None, args=(), | ||
| 167 | kwargs=None, class_args=(), class_kwargs=None, | ||
| 168 | headers=('guarantee',), queue=conf.DEFAULT_QUEUE_NAME, | ||
| 169 | cron=None): | ||
| 170 | """ | ||
| 171 | Stop periodically executing a task | ||
| 172 | |||
| 173 | .. note:: | ||
| 174 | |||
| 175 | All passed class & function kwargs/args MUST be json serializable. | ||
| 176 | |||
| 177 | Args: | ||
| 178 | func (callable): the callable (or string path to calable) to be | ||
| 179 | scheduled on a worker | ||
| 180 | broker_addr (str): Address of the broker to send the job to. If no | ||
| 181 | address is given then the value of the environment variable | ||
| 182 | ``EMQ_BROKER_ADDR`` will be used. | ||
| 183 | interval_secs (int): Run job every interval_secs or None if using cron | ||
| 184 | args (list): list of *args to pass to the callable | ||
| 185 | kwargs (dict): dict of **kwargs to pass to the callable | ||
| 186 | class_args (list): list of *args to pass to the class (if applicable) | ||
| 187 | class_kwargs (dict): dict of **kwargs to pass to the class (if | ||
| 188 | applicable) | ||
| 189 | headers (list): list of strings denoting enabled headers. Default: | ||
| 190 | guarantee is enabled to ensure the scheduler schedules the job. | ||
| 191 | queue (str): name of the queue to use when executing the job. The | ||
| 192 | default value is the default queue. | ||
| 193 | cron (string): cron formatted string used for job schedule if | ||
| 194 | interval_secs is None, i.e. '* * * * *' (every minute) | ||
| 195 | Raises: | ||
| 196 | TypeError: When one or more parameters are not JSON serializable. | ||
| 197 | Returns: | ||
| 198 | str: ID of the schedule message that was sent. None if there was an | ||
| 199 | error | ||
| 200 | """ | ||
| 201 | socket = Sender() | ||
| 202 | # conf.BROKER_ADDR isn't used because /etc/eventmq.conf is for the daemons. | ||
| 203 | broker_addr = broker_addr or os.environ.get(ENV_BROKER_ADDR) | ||
| 204 | |||
| 205 | if not broker_addr: | ||
| 206 | raise ConnectionError('unknown broker address: {}'.format(broker_addr)) | ||
| 207 | |||
| 208 | socket.connect(addr=broker_addr) | ||
| 209 | |||
| 210 | return messages.schedule( | ||
| 211 | socket, func, interval_secs=interval_secs, args=args, | ||
| 212 | kwargs=kwargs, class_args=class_args, class_kwargs=class_kwargs, | ||
| 213 | headers=headers, queue=conf.DEFAULT_QUEUE_NAME, cron=cron, | ||
| 214 | unschedule=True) | ||
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py index 1963e6e..f7e9420 100644 --- a/eventmq/client/messages.py +++ b/eventmq/client/messages.py | |||
| @@ -42,9 +42,8 @@ def schedule(socket, func, interval_secs=None, args=(), kwargs=None, | |||
| 42 | socket (socket): eventmq socket to use for sending the message | 42 | socket (socket): eventmq socket to use for sending the message |
| 43 | func (callable): the callable (or string path to calable) to be | 43 | func (callable): the callable (or string path to calable) to be |
| 44 | scheduled on a worker | 44 | scheduled on a worker |
| 45 | minutes (int): minutes to wait in between executions | ||
| 46 | args (list): list of *args to pass to the callable | ||
| 47 | interval_secs (int): Run job every interval_secs or None if using cron | 45 | interval_secs (int): Run job every interval_secs or None if using cron |
| 46 | args (list): list of *args to pass to the callable | ||
| 48 | cron (string): cron formatted string used for job schedule if | 47 | cron (string): cron formatted string used for job schedule if |
| 49 | interval_secs is None, i.e. '* * * * *' (every minute) | 48 | interval_secs is None, i.e. '* * * * *' (every minute) |
| 50 | kwargs (dict): dict of **kwargs to pass to the callable | 49 | kwargs (dict): dict of **kwargs to pass to the callable |
diff --git a/eventmq/exceptions.py b/eventmq/exceptions.py index 1277cd2..e376de0 100644 --- a/eventmq/exceptions.py +++ b/eventmq/exceptions.py | |||
| @@ -62,3 +62,9 @@ class CallableFromPathError(EventMQError): | |||
| 62 | Raised when construction of a callable from a path and callable_name fails. | 62 | Raised when construction of a callable from a path and callable_name fails. |
| 63 | """ | 63 | """ |
| 64 | pass | 64 | pass |
| 65 | |||
| 66 | |||
| 67 | class ConnectionError(EventMQError): | ||
| 68 | """ | ||
| 69 | Raised when there is an error connecting to a network service. | ||
| 70 | """ | ||