aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAdam Olsen2016-08-25 15:25:38 -0600
committerAdam Olsen2016-08-25 17:01:50 -0600
commit6bcbc67e6e52e49a942a6fe0933511838ee23efd (patch)
tree3b672b1f2b688388270dce3c803533ad7e8bd50c
parent5f6d2b301b320b8e690fe9bfae7727238991551f (diff)
downloadeventmq-6bcbc67e6e52e49a942a6fe0933511838ee23efd.tar.gz
eventmq-6bcbc67e6e52e49a942a6fe0933511838ee23efd.zip
Add "nohaste" header for scheduled jobs
What === 1. Add the `haste` option (header) for scheduling jobs. This will allow you to schedule a job, and either have it run as it's scheduled, or wait until the interval has elapsed. Why === 1. We need it for the debounce decorator, among other things Tests ===== I tried it manually
-rw-r--r--eventmq/jobmanager.py16
-rw-r--r--eventmq/scheduler.py5
2 files changed, 10 insertions, 11 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index f25f98c..1a6d7c6 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -161,9 +161,9 @@ class JobManager(HeartbeatMixin, EMQPService):
161 params = payload[1] 161 params = payload[1]
162 162
163 if 'reply-requested' in headers: 163 if 'reply-requested' in headers:
164 callback = self.worker_done_with_reply 164 callback = self.worker_done_with_reply
165 else: 165 else:
166 callback = self.worker_done 166 callback = self.worker_done
167 167
168 # kick off the job asynchronously with an appropiate callback 168 # kick off the job asynchronously with an appropiate callback
169 self.workers.apply_async(func=worker.run, 169 self.workers.apply_async(func=worker.run,
@@ -185,7 +185,7 @@ class JobManager(HeartbeatMixin, EMQPService):
185 sendmsg(self.outgoing, 'READY') 185 sendmsg(self.outgoing, 'READY')
186 186
187 def send_reply(self, res): 187 def send_reply(self, res):
188 """ 188 """
189 Sends an REPLY response 189 Sends an REPLY response
190 190
191 Args: 191 Args:
@@ -193,12 +193,10 @@ class JobManager(HeartbeatMixin, EMQPService):
193 recipient (str): The recipient id for the ack 193 recipient (str): The recipient id for the ack
194 msgid: The unique id that we are acknowledging 194 msgid: The unique id that we are acknowledging
195 """ 195 """
196 msgid = res[0] 196 msgid = res[0]
197 197 reply = res[1]
198 reply = res[1] 198 reply = serializer(reply)
199 199 sendmsg(self.outgoing, 'REPLY', [reply, msgid])
200 reply = serializer(reply)
201 sendmsg(self.outgoing, 'REPLY', [reply, msgid])
202 200
203 def on_heartbeat(self, msgid, message): 201 def on_heartbeat(self, msgid, message):
204 """ 202 """
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 7538fdd..7bf2dba 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -17,7 +17,6 @@
17============================= 17=============================
18Handles cron and other scheduled tasks 18Handles cron and other scheduled tasks
19""" 19"""
20import sys
21import json 20import json
22import logging 21import logging
23import redis 22import redis
@@ -298,6 +297,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
298 logger.info("Received new SCHEDULE request: {}".format(message)) 297 logger.info("Received new SCHEDULE request: {}".format(message))
299 298
300 queue = message[0] 299 queue = message[0]
300 headers = message[1]
301 interval = int(message[2]) 301 interval = int(message[2])
302 cron = str(message[4]) 302 cron = str(message[4])
303 303
@@ -354,7 +354,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
354 except Exception as e: 354 except Exception as e:
355 logger.warning(str(e)) 355 logger.warning(str(e))
356 356
357 self.send_request(message[3], queue=queue) 357 if 'dont_start_immediately' not in headers:
358 self.send_request(message[3], queue=queue)
358 359
359 def on_heartbeat(self, msgid, message): 360 def on_heartbeat(self, msgid, message):
360 """ 361 """