aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Hurst2017-01-18 17:35:46 -0700
committerGitHub2017-01-18 17:35:46 -0700
commita87bb8c50b126d4872eb20dbcf9e50f12c3649f9 (patch)
tree3e1c9af8ec3cecd3c9eb3fbb599e5ad239e5de0c
parent65a3bd89c7e214c738a40b42cdc69503a44af3ca (diff)
parent3720390fa3e9027e545ccd5de2e90c19ec898a6c (diff)
downloadeventmq-a87bb8c50b126d4872eb20dbcf9e50f12c3649f9.tar.gz
eventmq-a87bb8c50b126d4872eb20dbcf9e50f12c3649f9.zip
Merge pull request #70 from sideshowdave7/master
EventMQ 0.3rc5 (We'll get 0.3 someday) Fix critical worker health check bug, Simplify scheduler run_count header (remove support for persistance)
-rwxr-xr-xbin/send_request3
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/conf.py2
-rw-r--r--eventmq/jobmanager.py7
-rw-r--r--eventmq/scheduler.py53
-rw-r--r--eventmq/worker.py9
-rw-r--r--setup.py2
7 files changed, 39 insertions, 39 deletions
diff --git a/bin/send_request b/bin/send_request
index aa84639..9c8696d 100755
--- a/bin/send_request
+++ b/bin/send_request
@@ -21,4 +21,5 @@ if __name__ == "__main__":
21 'kwargs': {} 21 'kwargs': {}
22 }] 22 }]
23 23
24 send_request(s, msg, guarantee=True, reply_requested=True) 24 while True:
25 send_request(s, msg, guarantee=True, reply_requested=True)
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index e1feca9..184ddfe 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
1__author__ = 'EventMQ Contributors' 1__author__ = 'EventMQ Contributors'
2__version__ = '0.3-rc4' 2__version__ = '0.3-rc5'
3 3
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
diff --git a/eventmq/conf.py b/eventmq/conf.py
index e2d2b1c..6a0b3a4 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -86,4 +86,6 @@ RQ_PORT = 6379
86RQ_DB = 0 86RQ_DB = 0
87RQ_PASSWORD = '' 87RQ_PASSWORD = ''
88 88
89MAX_JOB_COUNT=1024
90
89# }}} 91# }}}
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index e94f16d..b6de15e 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -244,13 +244,14 @@ class JobManager(HeartbeatMixin, EMQPService):
244 244
245 def check_worker_health(self): 245 def check_worker_health(self):
246 """ 246 """
247 Checks for any dead processes in the pool and recreates them if necessary 247 Checks for any dead processes in the pool and recreates them if
248 necessary
248 """ 249 """
249 self._workers = [w for w in self._workers if w.is_alive()] 250 self._workers = [w for w in self._workers if w.is_alive()]
250 251
251 if len(self._workers) < conf.CONCURRENT_JOBS: 252 if len(self._workers) < conf.CONCURRENT_JOBS:
252 logger.warning("{} worker process(es) may have died...recreating")\ 253 logger.warning("{} worker process(es) may have died...recreating"
253 .format(conf.CONCURRENT_JOBS - len(self._workers)) 254 .format(conf.CONCURRENT_JOBS - len(self._workers)))
254 255
255 for i in range(0, conf.CONCURRENT_JOBS - len(self._workers)): 256 for i in range(0, conf.CONCURRENT_JOBS - len(self._workers)):
256 w = Worker(self.request_queue, self.finished_queue) 257 w = Worker(self.request_queue, self.finished_queue)
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 9130694..086ba69 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -182,49 +182,30 @@ class Scheduler(HeartbeatMixin, EMQPService):
182 % (ts_now, v[0], msg)) 182 % (ts_now, v[0], msg))
183 183
184 if v[4] != INFINITE_RUN_COUNT: 184 if v[4] != INFINITE_RUN_COUNT:
185 # Decrement run_count
186 v[4] -= 1
187 # If run_count was 0, we cancel the job 185 # If run_count was 0, we cancel the job
188 if v[4] <= 0: 186 if v[4] <= 0:
189 cancel_jobs.append(k) 187 cancel_jobs.append(k)
190 # Otherwise we run the job
191 else: 188 else:
192 # Send job and update next schedule time 189 # Decrement run_count
190 v[4] -= 1
193 self.send_request(msg, queue=queue) 191 self.send_request(msg, queue=queue)
194 v[0] = next(v[2]) 192 v[0] = next(v[2])
195 # Rename redis key and save new run_count counter
196 try:
197 self.redis_server.rename(k,
198 self.schedule_hash(v))
199 self.redis_server.set(self.schedule_hash(v),
200 serialize(v))
201 self.redis_server.save()
202 except redis.ConnectionError:
203 logger.warning("Couldn't contact redis server")
204 except Exception as e:
205 logger.warning(
206 'Unable to update key in redis '
207 'server: {}'.format(e.message))
208 else: 193 else:
209 # Scheduled job is in running infinitely 194 # Scheduled job is in running infinitely
210 # Send job and update next schedule time 195 # Send job and update next schedule time
211 self.send_request(msg, queue=queue) 196 self.send_request(msg, queue=queue)
212 v[0] = next(v[2]) 197 v[0] = next(v[2])
213 # Persist changes to redis
214 try:
215 self.redis_server.set(
216 self.schedule_hash(v), serialize(v))
217 self.redis_server.save()
218 except redis.ConnectionError:
219 logger.warning("Couldn't contact redis server")
220 except Exception as e:
221 logger.warning(
222 'Unable to update key in redis '
223 'server: {}'.format(e.message))
224 198
225 for job in cancel_jobs: 199 for job in cancel_jobs:
226 message = self.interval_jobs[k][1] 200 try:
227 self.unschedule_job(message) 201 logger.debug('Cancelling job due to run_count: {}'
202 .format(k))
203 self.redis_server.delete(k)
204 self.redis_server.lrem('interval_jobs', 0, k)
205 except Exception as e:
206 logger.warning(
207 'Unable to update key in redis '
208 'server: {}'.format(e))
228 del self.interval_jobs[k] 209 del self.interval_jobs[k]
229 210
230 if not self.maybe_send_heartbeat(events): 211 if not self.maybe_send_heartbeat(events):
@@ -244,7 +225,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
244 225
245 except Exception as e: 226 except Exception as e:
246 logger.warning('Unable to connect to redis server: {}'.format( 227 logger.warning('Unable to connect to redis server: {}'.format(
247 e.message)) 228 e))
248 else: 229 else:
249 return self._redis_server 230 return self._redis_server
250 231
@@ -304,6 +285,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
304 # in memory 285 # in memory
305 try: 286 try:
306 if (self.redis_server.get(schedule_hash)): 287 if (self.redis_server.get(schedule_hash)):
288 self.redis_server.delete(schedule_hash)
307 self.redis_server.lrem('interval_jobs', 0, schedule_hash) 289 self.redis_server.lrem('interval_jobs', 0, schedule_hash)
308 self.redis_server.save() 290 self.redis_server.save()
309 except redis.ConnectionError: 291 except redis.ConnectionError:
@@ -357,6 +339,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
357 headers = message[1] 339 headers = message[1]
358 interval = int(message[2]) 340 interval = int(message[2])
359 cron = str(message[4]) 341 cron = str(message[4])
342 run_count = self.get_run_count_from_headers(headers)
360 343
361 schedule_hash = self.schedule_hash(message) 344 schedule_hash = self.schedule_hash(message)
362 345
@@ -378,7 +361,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
378 message[3], 361 message[3],
379 inter_iter, 362 inter_iter,
380 queue, 363 queue,
381 self.get_run_count_from_headers(headers) 364 run_count
382 ] 365 ]
383 366
384 if schedule_hash in self.cron_jobs: 367 if schedule_hash in self.cron_jobs:
@@ -412,8 +395,12 @@ class Scheduler(HeartbeatMixin, EMQPService):
412 except Exception as e: 395 except Exception as e:
413 logger.warning(str(e)) 396 logger.warning(str(e))
414 397
398 # Send a request in haste mode, decrement run_count if valid
415 if 'nohaste' not in headers: 399 if 'nohaste' not in headers:
416 self.send_request(message[3], queue=queue) 400 # Don't allow decrement past 0
401 if run_count > 0:
402 self.interval_jobs[schedule_hash][4] -= 1
403 self.send_request(message[3], queue=queue)
417 404
418 def get_run_count_from_headers(self, headers): 405 def get_run_count_from_headers(self, headers):
419 run_count = INFINITE_RUN_COUNT 406 run_count = INFINITE_RUN_COUNT
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 00855e8..29814f4 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -20,8 +20,10 @@ Defines different short-lived workers that execute jobs
20from importlib import import_module 20from importlib import import_module
21from multiprocessing import Process 21from multiprocessing import Process
22from threading import Thread 22from threading import Thread
23from . import conf
23 24
24import logging 25import logging
26import os
25 27
26logger = logging.getLogger(__name__) 28logger = logging.getLogger(__name__)
27 29
@@ -35,6 +37,7 @@ class MultiprocessWorker(Process):
35 super(MultiprocessWorker, self).__init__() 37 super(MultiprocessWorker, self).__init__()
36 self.input_queue = input_queue 38 self.input_queue = input_queue
37 self.output_queue = output_queue 39 self.output_queue = output_queue
40 self.job_count = 0
38 41
39 def run(self): 42 def run(self):
40 """ 43 """
@@ -48,6 +51,7 @@ class MultiprocessWorker(Process):
48 # Pull the payload off the queue and run it 51 # Pull the payload off the queue and run it
49 for payload in iter(self.input_queue.get, 'DONE'): 52 for payload in iter(self.input_queue.get, 'DONE'):
50 53
54 self.job_count += 1
51 timeout = payload.get("timeout", None) 55 timeout = payload.get("timeout", None)
52 msgid = payload.get('msgid', '') 56 msgid = payload.get('msgid', '')
53 57
@@ -73,6 +77,11 @@ class MultiprocessWorker(Process):
73 resp['callback'] = payload['callback'] 77 resp['callback'] = payload['callback']
74 self.output_queue.put(resp) 78 self.output_queue.put(resp)
75 79
80 if self.job_count > conf.MAX_JOB_COUNT:
81 break
82
83 logger.debug("Worker death, PID: {}".format(os.getpid()))
84
76 85
77def _run(payload): 86def _run(payload):
78 if ":" in payload["path"]: 87 if ":" in payload["path"]:
diff --git a/setup.py b/setup.py
index 09e3abd..276732e 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@ from setuptools import setup, find_packages
7 7
8setup( 8setup(
9 name='eventmq', 9 name='eventmq',
10 version='0.3-rc4', 10 version='0.3-rc5',
11 description='EventMQ messaging system based on ZeroMQ', 11 description='EventMQ messaging system based on ZeroMQ',
12 packages=find_packages(), 12 packages=find_packages(),
13 install_requires=['pyzmq==15.4.0', 13 install_requires=['pyzmq==15.4.0',