aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2017-01-25 16:55:30 -0700
committerjason2017-01-31 15:54:49 -0700
commitc50a83c63e313f0b3e48947f6ef39a297cf8b14f (patch)
treed23cfd55d7e5f6eda29bbdf316ad8bdc346c8d4e
parent7820331fd493d17513ca412a927e556ec0dd0a05 (diff)
downloadeventmq-c50a83c63e313f0b3e48947f6ef39a297cf8b14f.tar.gz
eventmq-c50a83c63e313f0b3e48947f6ef39a297cf8b14f.zip
Add schedule and unschedule helper functions
- These functions will create the socket for the user
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/client/jobs.py108
-rw-r--r--eventmq/client/messages.py3
-rw-r--r--eventmq/exceptions.py6
4 files changed, 116 insertions, 3 deletions
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index 6b5c4b9..8ec8043 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -4,4 +4,4 @@ __version__ = '0.3-rc10'
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
6from .client.messages import defer_job # noqa 6from .client.messages import defer_job # noqa
7from .client.jobs import job # noqa 7from .client.jobs import job, schedule, unschedule # noqa
diff --git a/eventmq/client/jobs.py b/eventmq/client/jobs.py
index 2f4f784..6b53ab1 100644
--- a/eventmq/client/jobs.py
+++ b/eventmq/client/jobs.py
@@ -20,7 +20,9 @@ import logging
20import os 20import os
21 21
22from . import messages 22from . import messages
23from .. import conf
23from ..constants import ENV_BROKER_ADDR 24from ..constants import ENV_BROKER_ADDR
25from ..exceptions import ConnectionError
24from ..sender import Sender 26from ..sender import Sender
25 27
26logger = logging.getLogger(__name__) 28logger = logging.getLogger(__name__)
@@ -30,6 +32,10 @@ class Job(object):
30 """ 32 """
31 Defines a deferred EventMQ job. 33 Defines a deferred EventMQ job.
32 34
35 .. note::
36
37 All passed class & function kwargs/args MUST be json serializable.
38
33 Usage: 39 Usage:
34 40
35 .. code-block:: python 41 .. code-block:: python
@@ -68,6 +74,8 @@ class Job(object):
68 can set this to False. This is useful for unit tests. 74 can set this to False. This is useful for unit tests.
69 75
70 """ 76 """
77 # conf.BROKER_ADDR isn't used because /etc/eventmq.conf is for the
78 # daemons.
71 self.broker_addr = broker_addr or os.environ.get(ENV_BROKER_ADDR) 79 self.broker_addr = broker_addr or os.environ.get(ENV_BROKER_ADDR)
72 self.queue = queue 80 self.queue = queue
73 self.async = async 81 self.async = async
@@ -104,3 +112,103 @@ def job(func, broker_addr=None, queue=None, async=True, *args, **kwargs):
104 return decorator(func) 112 return decorator(func)
105 else: 113 else:
106 return decorator 114 return decorator
115
116
117def schedule(func, broker_addr=None, interval_secs=None, args=(), kwargs=None,
118 class_args=(), class_kwargs=None, headers=('guarantee',),
119 queue=conf.DEFAULT_QUEUE_NAME, cron=None):
120 """
121 Execute a task on a defined interval.
122
123 .. note::
124
125 All passed class & function kwargs/args MUST be json serializable.
126
127 Args:
128 func (callable): the callable (or string path to calable) to be
129 scheduled on a worker
130 broker_addr (str): Address of the broker to send the job to. If no
131 address is given then the value of the environment variable
132 ``EMQ_BROKER_ADDR`` will be used.
133 interval_secs (int): Run job every interval_secs or None if using cron
134 args (list): list of *args to pass to the callable
135 kwargs (dict): dict of **kwargs to pass to the callable
136 class_args (list): list of *args to pass to the class (if applicable)
137 class_kwargs (dict): dict of **kwargs to pass to the class (if
138 applicable)
139 headers (list): list of strings denoting enabled headers. Default:
140 guarantee is enabled to ensure the scheduler schedules the job.
141 queue (str): name of the queue to use when executing the job. The
142 default value is the default queue.
143 cron (string): cron formatted string used for job schedule if
144 interval_secs is None, i.e. '* * * * *' (every minute)
145 Raises:
146 TypeError: When one or more parameters are not JSON serializable.
147 Returns:
148 str: ID of the schedule message that was sent. None if there was an
149 error
150 """
151 socket = Sender()
152 # conf.BROKER_ADDR isn't used because /etc/eventmq.conf is for the daemons.
153 broker_addr = broker_addr or os.environ.get(ENV_BROKER_ADDR)
154
155 if not broker_addr:
156 raise ConnectionError('unknown broker address: {}'.format(broker_addr))
157
158 socket.connect(addr=broker_addr)
159
160 return messages.schedule(
161 socket, func, interval_secs=interval_secs, args=args,
162 kwargs=kwargs, class_args=class_args, class_kwargs=class_kwargs,
163 headers=headers, queue=conf.DEFAULT_QUEUE_NAME, cron=cron)
164
165
166def unschedule(func, broker_addr=None, interval_secs=None, args=(),
167 kwargs=None, class_args=(), class_kwargs=None,
168 headers=('guarantee',), queue=conf.DEFAULT_QUEUE_NAME,
169 cron=None):
170 """
171 Stop periodically executing a task
172
173 .. note::
174
175 All passed class & function kwargs/args MUST be json serializable.
176
177 Args:
178 func (callable): the callable (or string path to calable) to be
179 scheduled on a worker
180 broker_addr (str): Address of the broker to send the job to. If no
181 address is given then the value of the environment variable
182 ``EMQ_BROKER_ADDR`` will be used.
183 interval_secs (int): Run job every interval_secs or None if using cron
184 args (list): list of *args to pass to the callable
185 kwargs (dict): dict of **kwargs to pass to the callable
186 class_args (list): list of *args to pass to the class (if applicable)
187 class_kwargs (dict): dict of **kwargs to pass to the class (if
188 applicable)
189 headers (list): list of strings denoting enabled headers. Default:
190 guarantee is enabled to ensure the scheduler schedules the job.
191 queue (str): name of the queue to use when executing the job. The
192 default value is the default queue.
193 cron (string): cron formatted string used for job schedule if
194 interval_secs is None, i.e. '* * * * *' (every minute)
195 Raises:
196 TypeError: When one or more parameters are not JSON serializable.
197 Returns:
198 str: ID of the schedule message that was sent. None if there was an
199 error
200 """
201 socket = Sender()
202 # conf.BROKER_ADDR isn't used because /etc/eventmq.conf is for the daemons.
203 broker_addr = broker_addr or os.environ.get(ENV_BROKER_ADDR)
204
205 if not broker_addr:
206 raise ConnectionError('unknown broker address: {}'.format(broker_addr))
207
208 socket.connect(addr=broker_addr)
209
210 return messages.schedule(
211 socket, func, interval_secs=interval_secs, args=args,
212 kwargs=kwargs, class_args=class_args, class_kwargs=class_kwargs,
213 headers=headers, queue=conf.DEFAULT_QUEUE_NAME, cron=cron,
214 unschedule=True)
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py
index 1963e6e..f7e9420 100644
--- a/eventmq/client/messages.py
+++ b/eventmq/client/messages.py
@@ -42,9 +42,8 @@ def schedule(socket, func, interval_secs=None, args=(), kwargs=None,
42 socket (socket): eventmq socket to use for sending the message 42 socket (socket): eventmq socket to use for sending the message
43 func (callable): the callable (or string path to calable) to be 43 func (callable): the callable (or string path to calable) to be
44 scheduled on a worker 44 scheduled on a worker
45 minutes (int): minutes to wait in between executions
46 args (list): list of *args to pass to the callable
47 interval_secs (int): Run job every interval_secs or None if using cron 45 interval_secs (int): Run job every interval_secs or None if using cron
46 args (list): list of *args to pass to the callable
48 cron (string): cron formatted string used for job schedule if 47 cron (string): cron formatted string used for job schedule if
49 interval_secs is None, i.e. '* * * * *' (every minute) 48 interval_secs is None, i.e. '* * * * *' (every minute)
50 kwargs (dict): dict of **kwargs to pass to the callable 49 kwargs (dict): dict of **kwargs to pass to the callable
diff --git a/eventmq/exceptions.py b/eventmq/exceptions.py
index 1277cd2..e376de0 100644
--- a/eventmq/exceptions.py
+++ b/eventmq/exceptions.py
@@ -62,3 +62,9 @@ class CallableFromPathError(EventMQError):
62 Raised when construction of a callable from a path and callable_name fails. 62 Raised when construction of a callable from a path and callable_name fails.
63 """ 63 """
64 pass 64 pass
65
66
67class ConnectionError(EventMQError):
68 """
69 Raised when there is an error connecting to a network service.
70 """