diff options
| author | Adam Olsen | 2016-08-25 15:25:38 -0600 |
|---|---|---|
| committer | Adam Olsen | 2016-08-25 17:01:50 -0600 |
| commit | 6bcbc67e6e52e49a942a6fe0933511838ee23efd (patch) | |
| tree | 3b672b1f2b688388270dce3c803533ad7e8bd50c | |
| parent | 5f6d2b301b320b8e690fe9bfae7727238991551f (diff) | |
| download | eventmq-6bcbc67e6e52e49a942a6fe0933511838ee23efd.tar.gz eventmq-6bcbc67e6e52e49a942a6fe0933511838ee23efd.zip | |
Add "nohaste" header for scheduled jobs
What
===
1. Add the `haste` option (header) for scheduling jobs.
This will allow you to schedule a job, and either have it run as it's
scheduled, or wait until the interval has elapsed.
Why
===
1. We need it for the debounce decorator, among other things
Tests
=====
I tried it manually
| -rw-r--r-- | eventmq/jobmanager.py | 16 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 5 |
2 files changed, 10 insertions, 11 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index f25f98c..1a6d7c6 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -161,9 +161,9 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 161 | params = payload[1] | 161 | params = payload[1] |
| 162 | 162 | ||
| 163 | if 'reply-requested' in headers: | 163 | if 'reply-requested' in headers: |
| 164 | callback = self.worker_done_with_reply | 164 | callback = self.worker_done_with_reply |
| 165 | else: | 165 | else: |
| 166 | callback = self.worker_done | 166 | callback = self.worker_done |
| 167 | 167 | ||
| 168 | # kick off the job asynchronously with an appropiate callback | 168 | # kick off the job asynchronously with an appropiate callback |
| 169 | self.workers.apply_async(func=worker.run, | 169 | self.workers.apply_async(func=worker.run, |
| @@ -185,7 +185,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 185 | sendmsg(self.outgoing, 'READY') | 185 | sendmsg(self.outgoing, 'READY') |
| 186 | 186 | ||
| 187 | def send_reply(self, res): | 187 | def send_reply(self, res): |
| 188 | """ | 188 | """ |
| 189 | Sends an REPLY response | 189 | Sends an REPLY response |
| 190 | 190 | ||
| 191 | Args: | 191 | Args: |
| @@ -193,12 +193,10 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 193 | recipient (str): The recipient id for the ack | 193 | recipient (str): The recipient id for the ack |
| 194 | msgid: The unique id that we are acknowledging | 194 | msgid: The unique id that we are acknowledging |
| 195 | """ | 195 | """ |
| 196 | msgid = res[0] | 196 | msgid = res[0] |
| 197 | 197 | reply = res[1] | |
| 198 | reply = res[1] | 198 | reply = serializer(reply) |
| 199 | 199 | sendmsg(self.outgoing, 'REPLY', [reply, msgid]) | |
| 200 | reply = serializer(reply) | ||
| 201 | sendmsg(self.outgoing, 'REPLY', [reply, msgid]) | ||
| 202 | 200 | ||
| 203 | def on_heartbeat(self, msgid, message): | 201 | def on_heartbeat(self, msgid, message): |
| 204 | """ | 202 | """ |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 7538fdd..7bf2dba 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -17,7 +17,6 @@ | |||
| 17 | ============================= | 17 | ============================= |
| 18 | Handles cron and other scheduled tasks | 18 | Handles cron and other scheduled tasks |
| 19 | """ | 19 | """ |
| 20 | import sys | ||
| 21 | import json | 20 | import json |
| 22 | import logging | 21 | import logging |
| 23 | import redis | 22 | import redis |
| @@ -298,6 +297,7 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 298 | logger.info("Received new SCHEDULE request: {}".format(message)) | 297 | logger.info("Received new SCHEDULE request: {}".format(message)) |
| 299 | 298 | ||
| 300 | queue = message[0] | 299 | queue = message[0] |
| 300 | headers = message[1] | ||
| 301 | interval = int(message[2]) | 301 | interval = int(message[2]) |
| 302 | cron = str(message[4]) | 302 | cron = str(message[4]) |
| 303 | 303 | ||
| @@ -354,7 +354,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 354 | except Exception as e: | 354 | except Exception as e: |
| 355 | logger.warning(str(e)) | 355 | logger.warning(str(e)) |
| 356 | 356 | ||
| 357 | self.send_request(message[3], queue=queue) | 357 | if 'dont_start_immediately' not in headers: |
| 358 | self.send_request(message[3], queue=queue) | ||
| 358 | 359 | ||
| 359 | def on_heartbeat(self, msgid, message): | 360 | def on_heartbeat(self, msgid, message): |
| 360 | """ | 361 | """ |