-rw-r--r--   eventmq/scheduler.py   105
1 file changed, 66 insertions(+), 39 deletions(-)
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 3abe58a..ea0fdc3 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -24,6 +24,7 @@ from json import loads as deserialize
 import logging
 
 from croniter import croniter
+from future.utils import iteritems
 import redis
 from six import next
 
@@ -47,6 +48,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
     """
     Keeper of time, master of schedules
     """
+    # TODO: Remove dependency on redis, make the backing store a generic
+    # interface
     SERVICE_TYPE = constants.CLIENT_TYPE.scheduler
 
     def __init__(self, override_settings=None, skip_signal=False, *args,
@@ -126,6 +129,9 @@ class Scheduler(HeartbeatMixin, EMQPService):
     def _start_event_loop(self):
         """
         Starts the actual event loop. Usually called by :meth:`Scheduler.start`
+
+        This loop is responsible for sending REQUESTs for scheduled jobs when
+        their next scheduled time arrives.
         """
         while True:
             if self.received_disconnect:
@@ -139,79 +145,100 @@ class Scheduler(HeartbeatMixin, EMQPService):
                 msg = self.frontend.recv_multipart()
                 self.process_message(msg)
 
-            # TODO: distribute me!
-            for hash_, cron in self.cron_jobs.items():
-                # If the time is now, or passed
-                if cron[0] <= ts_now:
-                    msg = cron[1]
-                    queue = cron[3]
+            for hash_, cron in iteritems(self.cron_jobs):
+                # 0 = the next ts this job should be executed in
+                next_monotonic = cron[0]
+                # 1 = the function to be executed
+                job_message = cron[1]
+                # 2 = the croniter iterator for this job
+                interval_iterator = cron[2]
+                # 3 = the queue to execute the job in
+                queue = cron[3]
 
+                # If the time is now, or passed
+                if next_monotonic <= ts_now:
                     # Run the msg
                     logger.debug("Time is: %s; Schedule is: %s - Running %s"
-                                 % (ts_now, cron[0], msg))
+                                 % (ts_now, next_monotonic, job_message))
 
-                    self.send_request(msg, queue=queue)
+                    self.send_request(job_message, queue=queue)
 
                     # Update the next time to run
-                    cron[0] = next(cron[2])
+                    cron[0] = next_monotonic = next(interval_iterator)
                     logger.debug("Next execution will be in %ss" %
-                                 seconds_until(cron[0]))
+                                 seconds_until(next_monotonic))
 
             cancel_jobs = []
-            for k, v in self.interval_jobs.iteritems():
-                # TODO: Refactor this entire loop to be readable by humankind
-                # The schedule time has elapsed
-                if v[0] <= m_now:
-                    msg = v[1]
-                    queue = v[3]
 
+            # Iterate all interval style jobs, update their state, and
+            # send REQUESTs if necessary
+            for job_hash, job in iteritems(self.interval_jobs):
+                # the next (monotonic) ts that this job should be executed in
+                next_monotonic = job[0]
+                # the function to be executed
+                job_message = job[1]
+                # the interval iterator for this job
+                interval_iterator = job[2]
+                # the queue to execute the job in
+                queue = job[3]
+                # run_count: the number of times left to execute this job
+                run_count = job[4]
+
+                if next_monotonic <= m_now:
+                    # The schedule time has elapsed
+
                     logger.debug("Time is: %s; Schedule is: %s - Running %s"
-                                 % (ts_now, v[0], msg))
+                                 % (ts_now, next_monotonic, job_message))
 
-                    # v[4] is the current remaining run_count
-                    if v[4] != INFINITE_RUN_COUNT:
-                        # If run_count was 0, we cancel the job
-                        if v[4] <= 0:
-                            cancel_jobs.append(k)
+                    # Only do run_count processing if it's set to anything
+                    # besides the default of INFINITE
+                    if run_count != INFINITE_RUN_COUNT:
+                        # If run_count was <= 0, we cancel the job
+                        if run_count <= 0:
+                            cancel_jobs.append(job_hash)
                         else:
                             # Decrement run_count
-                            v[4] -= 1
+                            job[4] = run_count = run_count - 1
                             # Persist the change to redis
                             try:
-                                message = deserialize(self.redis_server.get(k))
+                                message = deserialize(
+                                    self.redis_server.get(job_hash))
                                 new_headers = []
                                 for header in message[1].split(','):
                                     if 'run_count:' in header:
                                         new_headers.append(
-                                            'run_count:{}'.format(v[4]))
+                                            'run_count:{}'.format(run_count))
                                     else:
                                         new_headers.append(header)
                                 message[1] = ",".join(new_headers)
-                                self.redis_server.set(k, serialize(message))
+                                self.redis_server.set(
+                                    job_hash, serialize(message))
                             except Exception as e:
                                 logger.warning(
                                     'Unable to update key in redis '
                                     'server: {}'.format(e))
                             # Perform the request since run_count still > 0
-                            self.send_request(msg, queue=queue)
-                            v[0] = next(v[2])
+                            self.send_request(job_message, queue=queue)
+                            job[0] = next_monotonic = next(interval_iterator)
                     else:
-                        # Scheduled job is in running infinitely
+                        # Scheduled job runs infinitely
                         # Send job and update next schedule time
-                        self.send_request(msg, queue=queue)
-                        v[0] = next(v[2])
+                        self.send_request(job_message, queue=queue)
+                        job[0] = next_monotonic = next(interval_iterator)
 
+            # Cancel and remove jobs whose run_count has reached 0, and
+            # persist that to redis
             for job in cancel_jobs:
                 try:
                     logger.debug('Cancelling job due to run_count: {}'
-                                 .format(k))
-                    self.redis_server.delete(k)
-                    self.redis_server.lrem('interval_jobs', 0, k)
+                                 .format(job))
+                    self.redis_server.delete(job)
+                    self.redis_server.lrem('interval_jobs', 0, job)
                 except Exception as e:
                     logger.warning(
                         'Unable to update key in redis '
                         'server: {}'.format(e))
-                del self.interval_jobs[k]
+                del self.interval_jobs[job]
 
             if not self.maybe_send_heartbeat(events):
                 break
@@ -234,19 +261,19 @@ class Scheduler(HeartbeatMixin, EMQPService):
         else:
             return self._redis_server
 
-    def send_request(self, jobmsg, queue=None):
+    def send_request(self, job_message, queue=None):
         """
         Send a request message to the broker
 
         Args:
-            jobmsg: The message to send to the broker
+            job_message: The message to send to the broker
             queue: The name of the queue to use
 
         Returns:
             str: ID of the message
         """
-        jobmsg = json.loads(jobmsg)
-        msgid = send_request(self.frontend, jobmsg, queue=queue,
+        job_message = json.loads(job_message)
+        msgid = send_request(self.frontend, job_message, queue=queue,
                              reply_requested=True)
 
         return msgid
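
A note on the cron branch above: croniter objects support the iterator protocol (which is why the file keeps `from six import next`), and each `next()` call yields the job's following fire time (a float Unix timestamp by default); that value is what the loop stores back into `cron[0]` and feeds to `seconds_until()`. A minimal sketch of that usage, assuming croniter is installed and using an illustrative every-five-minutes expression:

    import time

    from croniter import croniter

    # Fire times for a job scheduled every five minutes ('*/5 * * * *'),
    # starting from the current wall-clock time.
    schedule = croniter('*/5 * * * *', time.time())

    next_ts = next(schedule)    # float Unix timestamp of the next run
    later_ts = next(schedule)   # ...and the run after that

    print(later_ts - next_ts)   # 300.0 seconds between consecutive runs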
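
The interval branch indexes into each job entry by position, so every value in `self.interval_jobs` is effectively a mutable list of the form `[next_run, message, iterator, queue, run_count]`, keyed by job hash. Below is a self-contained sketch of one tick of that loop with the eventmq internals stubbed out: the `every()` helper stands in for the interval iterator, the `INFINITE_RUN_COUNT` value and message payloads are illustrative assumptions, and `print()` takes the place of `send_request()`.

    import time

    INFINITE_RUN_COUNT = -1  # illustrative sentinel, not eventmq's constant


    def every(seconds, start):
        """Stand-in for the interval iterator: run times `seconds` apart."""
        t = start
        while True:
            t += seconds
            yield t


    # Hypothetical job table mirroring the positional layout commented in
    # the diff: [0] next run (monotonic), [1] job message,
    # [2] interval iterator, [3] queue, [4] remaining run_count.
    now = time.monotonic()
    interval_jobs = {
        'job-hash-1': [now, '{"path": "demo.job"}', every(5.0, now),
                       'default', 3],
        'job-hash-2': [now, '{"path": "other.job"}', every(9.0, now),
                       'default', INFINITE_RUN_COUNT],
    }


    def tick(m_now):
        """One pass of the loop: send due jobs, advance their next-run
        times in place, and drop jobs whose run_count is exhausted."""
        cancel_jobs = []
        for job_hash, job in interval_jobs.items():
            next_run, job_message, interval_iterator, queue, run_count = job
            if next_run > m_now:
                continue  # job not due yet
            if run_count != INFINITE_RUN_COUNT:
                if run_count <= 0:
                    cancel_jobs.append(job_hash)
                    continue
                job[4] = run_count - 1  # write back into the entry
            print('REQUEST {} -> queue {}'.format(job_message, queue))
            job[0] = next(interval_iterator)  # persist the next run time

        # Delete after iterating so the dict is not mutated mid-loop
        for job_hash in cancel_jobs:
            del interval_jobs[job_hash]


    tick(time.monotonic())

Writing the advanced run time and the decremented count back into `job[0]` and `job[4]` (not just into the unpacked locals) is what keeps a due job from firing again on the very next pass; the loop in the diff relies on the same in-place updates.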