aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eventmq/scheduler.py105
1 files changed, 66 insertions, 39 deletions
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 3abe58a..ea0fdc3 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -24,6 +24,7 @@ from json import loads as deserialize
24import logging 24import logging
25 25
26from croniter import croniter 26from croniter import croniter
27from future.utils import iteritems
27import redis 28import redis
28from six import next 29from six import next
29 30
@@ -47,6 +48,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
47 """ 48 """
48 Keeper of time, master of schedules 49 Keeper of time, master of schedules
49 """ 50 """
51 # TODO: Remove dependency on redis, make the backing store a generic
52 # interface
50 SERVICE_TYPE = constants.CLIENT_TYPE.scheduler 53 SERVICE_TYPE = constants.CLIENT_TYPE.scheduler
51 54
52 def __init__(self, override_settings=None, skip_signal=False, *args, 55 def __init__(self, override_settings=None, skip_signal=False, *args,
@@ -126,6 +129,9 @@ class Scheduler(HeartbeatMixin, EMQPService):
126 def _start_event_loop(self): 129 def _start_event_loop(self):
127 """ 130 """
128 Starts the actual event loop. Usually called by :meth:`Scheduler.start` 131 Starts the actual event loop. Usually called by :meth:`Scheduler.start`
132
133 This loop is responsible for sending REQUESTs for scheduled jobs when
134 their next scheduled time has occurred
129 """ 135 """
130 while True: 136 while True:
131 if self.received_disconnect: 137 if self.received_disconnect:
@@ -139,79 +145,100 @@ class Scheduler(HeartbeatMixin, EMQPService):
139 msg = self.frontend.recv_multipart() 145 msg = self.frontend.recv_multipart()
140 self.process_message(msg) 146 self.process_message(msg)
141 147
142 # TODO: distribute me! 148 for hash_, cron in iteritems(self.cron_jobs):
143 for hash_, cron in self.cron_jobs.items(): 149 # the next ts this job should be executed in
144 # If the time is now, or passed 150 next_monotonic = cron[0]
145 if cron[0] <= ts_now: 151 # 1 = the function to be executed
146 msg = cron[1] 152 job_message = cron[1]
147 queue = cron[3] 153 # 2 = the croniter iterator for this job
154 interval_iterator = cron[2]
155 # 3 = the queue to execute the job in
156 queue = cron[3]
148 157
158 # If the time is now, or passed
159 if next_monotonic <= ts_now:
149 # Run the msg 160 # Run the msg
150 logger.debug("Time is: %s; Schedule is: %s - Running %s" 161 logger.debug("Time is: %s; Schedule is: %s - Running %s"
151 % (ts_now, cron[0], msg)) 162 % (ts_now, next_monotonic, job_message))
152 163
153 self.send_request(msg, queue=queue) 164 self.send_request(job_message, queue=queue)
154 165
155 # Update the next time to run 166 # Update the next time to run
156 cron[0] = next(cron[2]) 167 next_monotonic = next(interval_iterator)
157 logger.debug("Next execution will be in %ss" % 168 logger.debug("Next execution will be in %ss" %
158 seconds_until(cron[0])) 169 seconds_until(next_monotonic))
159 170
160 cancel_jobs = [] 171 cancel_jobs = []
161 for k, v in self.interval_jobs.iteritems():
162 # TODO: Refactor this entire loop to be readable by humankind
163 # The schedule time has elapsed
164 if v[0] <= m_now:
165 msg = v[1]
166 queue = v[3]
167 172
168 logger.debug("Time is: %s; Schedule is: %s - Running %s" 173 # Iterate all interval style jobs and update their state,
169 % (ts_now, v[0], msg)) 174 # send REQUESTs if necessary
175 for job_hash, job in iteritems(self.interval_jobs):
176 # the next (monotonic) ts that this job should be executed in
177 next_monotonic = job[0]
178 # the function to be executed
179 job_message = job[1]
180 # the interval iter for this job
181 interval_iterator = job[2]
182 # the queue to execute the job in
183 queue = job[3]
184 # run_count: # of times to execute this job
185 run_count = job[4]
186
187 if next_monotonic <= m_now:
188 # The schedule time has elapsed
170 189
171 # v[4] is the current remaining run_count 190 logger.debug("Time is: %s; Schedule is: %s - Running %s"
172 if v[4] != INFINITE_RUN_COUNT: 191 % (ts_now, next_monotonic, job_message))
173 # If run_count was 0, we cancel the job 192
174 if v[4] <= 0: 193 # Only do run_count processing if its set to anything
175 cancel_jobs.append(k) 194 # besides the default of INFINITE
195 if run_count != INFINITE_RUN_COUNT:
196 # If run_count was <= 0, we cancel the job
197 if run_count <= 0:
198 cancel_jobs.append(job_hash)
176 else: 199 else:
177 # Decrement run_count 200 # Decrement run_count
178 v[4] -= 1 201 run_count -= 1
179 # Persist the change to redis 202 # Persist the change to redis
180 try: 203 try:
181 message = deserialize(self.redis_server.get(k)) 204 message = deserialize(
205 self.redis_server.get(job_hash))
182 new_headers = [] 206 new_headers = []
183 for header in message[1].split(','): 207 for header in message[1].split(','):
184 if 'run_count:' in header: 208 if 'run_count:' in header:
185 new_headers.append( 209 new_headers.append(
186 'run_count:{}'.format(v[4])) 210 'run_count:{}'.format(run_count))
187 else: 211 else:
188 new_headers.append(header) 212 new_headers.append(header)
189 message[1] = ",".join(new_headers) 213 message[1] = ",".join(new_headers)
190 self.redis_server.set(k, serialize(message)) 214 self.redis_server.set(
215 job_hash, serialize(message))
191 except Exception as e: 216 except Exception as e:
192 logger.warning( 217 logger.warning(
193 'Unable to update key in redis ' 218 'Unable to update key in redis '
194 'server: {}'.format(e)) 219 'server: {}'.format(e))
195 # Perform the request since run_count still > 0 220 # Perform the request since run_count still > 0
196 self.send_request(msg, queue=queue) 221 self.send_request(job_message, queue=queue)
197 v[0] = next(v[2]) 222 next_monotonic = next(interval_iterator)
198 else: 223 else:
199 # Scheduled job is in running infinitely 224 # Scheduled job is in running infinitely
200 # Send job and update next schedule time 225 # Send job and update next schedule time
201 self.send_request(msg, queue=queue) 226 self.send_request(job_message, queue=queue)
202 v[0] = next(v[2]) 227 next_monotonic = next(interval_iterator)
203 228
229 # Cancel and remove jobs where run_count has reached 0,
230 # and persist that to redis
204 for job in cancel_jobs: 231 for job in cancel_jobs:
205 try: 232 try:
206 logger.debug('Cancelling job due to run_count: {}' 233 logger.debug('Cancelling job due to run_count: {}'
207 .format(k)) 234 .format(job_hash))
208 self.redis_server.delete(k) 235 self.redis_server.delete(job_hash)
209 self.redis_server.lrem('interval_jobs', 0, k) 236 self.redis_server.lrem('interval_jobs', 0, job_hash)
210 except Exception as e: 237 except Exception as e:
211 logger.warning( 238 logger.warning(
212 'Unable to update key in redis ' 239 'Unable to update key in redis '
213 'server: {}'.format(e)) 240 'server: {}'.format(e))
214 del self.interval_jobs[k] 241 del self.interval_jobs[job_hash]
215 242
216 if not self.maybe_send_heartbeat(events): 243 if not self.maybe_send_heartbeat(events):
217 break 244 break
@@ -234,19 +261,19 @@ class Scheduler(HeartbeatMixin, EMQPService):
234 else: 261 else:
235 return self._redis_server 262 return self._redis_server
236 263
237 def send_request(self, jobmsg, queue=None): 264 def send_request(self, job_message, queue=None):
238 """ 265 """
239 Send a request message to the broker 266 Send a request message to the broker
240 267
241 Args: 268 Args:
242 jobmsg: The message to send to the broker 269 job_message: The message to send to the broker
243 queue: The name of the queue to use_impersonation 270 queue: The name of the queue to use_impersonation
244 271
245 Returns: 272 Returns:
246 str: ID of the message 273 str: ID of the message
247 """ 274 """
248 jobmsg = json.loads(jobmsg) 275 job_message = json.loads(job_message)
249 msgid = send_request(self.frontend, jobmsg, queue=queue, 276 msgid = send_request(self.frontend, job_message, queue=queue,
250 reply_requested=True) 277 reply_requested=True)
251 278
252 return msgid 279 return msgid