aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2019-01-16 16:39:06 -0700
committerGitHub2019-01-16 16:39:06 -0700
commite2c46a83ef783a68ea1ef559b78f8df29d50e719 (patch)
treecbb01780f216ea9544a2452d7984da89b611b6d1
parent643f051a32316e47f911c3ceaaad0fff305b3e5c (diff)
parent0bed1a072d15ec01822b421bee51965a2d6910e7 (diff)
downloadeventmq-0.3.10-rc1.tar.gz
eventmq-0.3.10-rc1.zip
Merge pull request #68 from eventmq/feature/scheduler_redis_cluster_support0.3.10-rc1
Scheduler: adds optional support for redis and specifying redis client
-rw-r--r--docs/settings_file.rst70
-rw-r--r--etc/eventmq.conf-dist1
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/client/messages.py4
-rw-r--r--eventmq/conf.py3
-rw-r--r--eventmq/scheduler.py211
-rw-r--r--eventmq/tests/test_utils.py151
-rw-r--r--eventmq/utils/settings.py12
-rw-r--r--setup.py1
9 files changed, 296 insertions, 159 deletions
diff --git a/docs/settings_file.rst b/docs/settings_file.rst
index 04fe451..e9f4b04 100644
--- a/docs/settings_file.rst
+++ b/docs/settings_file.rst
@@ -19,6 +19,13 @@ Default: False
19 19
20Enable most verbose level of debug statements 20Enable most verbose level of debug statements
21 21
22hide_heartbeat_logs
23===================
24Default: True
25
26This hides heart beat messages from the logs. Disabling this will result in very
27noisy log output.
28
22max_sockets 29max_sockets
23=========== 30===========
24Default: 1024 31Default: 1024
@@ -91,15 +98,55 @@ Default: ''
91 98
92Password to use when connecting to redis 99Password to use when connecting to redis
93 100
101redis_client_class
102==================
103Default: ``redis.StrictRedis``
104
105The class to use as the redis client. This can be overridden if you want to use
106a different module to connect to redis. For example
107``rediscluster.StrictRedisCluster``. Note: You make get errors if you don't use
108a strict mode class.
109
110redis_client_class_kwargs
111=========================
112Default: {}
113
114This is a JSON hash map of keyword arguments to pass to the Python class
115constructor. This is useful for using ``redis-cluster-py`` on AWS Elasticache.
116When using Elasticache this value should be set to
117``{"skip_full_coverage_check": true}`` to prevent startup errors.
118
119redis_startup_error_hard_kill
120=============================
121Default: True
122
123If there is an error connecting to the Redis server for persistent schedule
124storage on startup then kill the app. This is useful if you want to prevent
125accidentally accepting schedules that can't be saved to a persistent store. If
126you would like to use redis you will need to ``pip install redis`` or
127``pip install redis-py-cluster`` and define the necessary options.
128
94*********** 129***********
95Job Manager 130Job Manager
96*********** 131***********
97 132
133default_queue_name
134==================
135Default: default
136
137This is the default queue a job manager will listen on if nothing is specified.
138
139default_queue_weight
140====================
141Default: 10
142
143This is the default weight for the default queue is it is not explicitly set.
144
98concurrent_jobs 145concurrent_jobs
99=============== 146===============
100Default: 4 147Default: 4
101 148
102This is the number of concurrent jobs the indiviudal job manager should execute 149This is the number of concurrent jobs the individual job manager should execute
103at a time. If you are using the multiprocess or threading model this number 150at a time. If you are using the multiprocess or threading model this number
104becomes important as you will want to control the load on your server. If the 151becomes important as you will want to control the load on your server. If the
105load equals the number of cores on the server, processes will begin waiting for 152load equals the number of cores on the server, processes will begin waiting for
@@ -115,7 +162,7 @@ queues
115====== 162======
116Default: [[10, "default"]] 163Default: [[10, "default"]]
117 164
118Comma seperated list of queues to process jobs for with their weights. This list 165Comma separated list of queues to process jobs with their weights. This list
119must be valid JSON otherwise an error will be thrown. 166must be valid JSON otherwise an error will be thrown.
120Example: ``queues=[[10, "data_process"], [15, "email"]]``. With these 167Example: ``queues=[[10, "data_process"], [15, "email"]]``. With these
121weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers 168weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers
@@ -131,14 +178,29 @@ number until the large box is ready to accept another q1 job.
131 default queue so that anything that is not explicitly assigned will still be 178 default queue so that anything that is not explicitly assigned will still be
132 run. 179 run.
133 180
134setup_callabe/setup_path 181setup_callable/setup_path
135======================== 182=========================
136Default: '' (Signifies no task will be attempted) 183Default: '' (Signifies no task will be attempted)
137 184
138Strings containing path and callable to be run when a worker is spawned 185Strings containing path and callable to be run when a worker is spawned
139if applicable to that type of worker. Currently the only supported worker is a 186if applicable to that type of worker. Currently the only supported worker is a
140MultiProcessWorker, and is useful for pulling any global state into memory. 187MultiProcessWorker, and is useful for pulling any global state into memory.
141 188
189job_entry_func
190==============
191Default: '' (Signifies no function will be executed)
192
193The function to execute before **every** job a worker thread executes. For
194example: cleaning up stale database connections. (Django's
195``django.db.connections[].close_if_unusable_or_obsolete()``)
196
197job_exit_func
198=============
199Default: '' (Signifies no function will be executed)
200
201The function to execute **after** every job a worker thread executes. For
202example: closing any database handles that were left open.
203
142max_job_count 204max_job_count
143============= 205=============
144Default: 1024 206Default: 1024
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist
index 96dbad4..7882e0c 100644
--- a/etc/eventmq.conf-dist
+++ b/etc/eventmq.conf-dist
@@ -19,6 +19,7 @@ scheduler_addr=tcp://eventmq:47291
19[router] 19[router]
20 20
21[scheduler] 21[scheduler]
22redis_client_class = redis.StrictRedis
22 23
23[jobmanager] 24[jobmanager]
24worker_addr=tcp://127.0.0.1:47290 25worker_addr=tcp://127.0.0.1:47290
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index 4143854..018a540 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
1__author__ = 'EventMQ Contributors' 1__author__ = 'EventMQ Contributors'
2__version__ = '0.3.9' 2__version__ = '0.3.10-rc1'
3 3
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py
index 9e49ea8..c178c30 100644
--- a/eventmq/client/messages.py
+++ b/eventmq/client/messages.py
@@ -226,14 +226,14 @@ def send_request(socket, message, reply_requested=False, guarantee=False,
226 } 226 }
227 } 227 }
228 Args: 228 Args:
229 socket (socket): Socket to use when sending `message` 229 socket: Socket (Sender or Receiver) to use when sending `message`
230 message: message to send to `socket` 230 message: message to send to `socket`
231 reply_requested (bool): request the return value of func as a reply 231 reply_requested (bool): request the return value of func as a reply
232 guarantee (bool): (Give your best effort) to guarantee that func is 232 guarantee (bool): (Give your best effort) to guarantee that func is
233 executed. Exceptions and things will be logged. 233 executed. Exceptions and things will be logged.
234 retry_count (int): How many times should be retried when encountering 234 retry_count (int): How many times should be retried when encountering
235 an Exception or some other failure before giving up. (default: 0 235 an Exception or some other failure before giving up. (default: 0
236 or immediatly fail) 236 or immediately fail)
237 timeout (int): How many seconds should we wait before killing the job 237 timeout (int): How many seconds should we wait before killing the job
238 default: 0 which means infinite timeout 238 default: 0 which means infinite timeout
239 queue (str): Name of queue to use when executing the job. Default: is 239 queue (str): Name of queue to use when executing the job. Default: is
diff --git a/eventmq/conf.py b/eventmq/conf.py
index 87aa675..8634ebb 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -88,6 +88,9 @@ RQ_HOST = 'localhost'
88RQ_PORT = 6379 88RQ_PORT = 6379
89RQ_DB = 0 89RQ_DB = 0
90RQ_PASSWORD = '' 90RQ_PASSWORD = ''
91REDIS_CLIENT_CLASS = 'redis.StrictRedis'
92REDIS_CLIENT_CLASS_KWARGS = {}
93REDIS_STARTUP_ERROR_HARD_KILL = True
91 94
92MAX_JOB_COUNT = 1024 95MAX_JOB_COUNT = 1024
93 96
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 2ff726a..e7c13ec 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -18,14 +18,15 @@
18Handles cron and other scheduled tasks 18Handles cron and other scheduled tasks
19""" 19"""
20from hashlib import sha1 as emq_hash 20from hashlib import sha1 as emq_hash
21import importlib
21import json 22import json
22from json import dumps as serialize 23from json import dumps as serialize
23from json import loads as deserialize 24from json import loads as deserialize
24import logging 25import logging
26import sys
25 27
26from croniter import croniter 28from croniter import croniter
27import redis 29from six import iteritems, next
28from six import next
29 30
30from eventmq.log import setup_logger 31from eventmq.log import setup_logger
31 32
@@ -56,7 +57,6 @@ class Scheduler(HeartbeatMixin, EMQPService):
56 57
57 logger.info('EventMQ Version {}'.format(__version__)) 58 logger.info('EventMQ Version {}'.format(__version__))
58 logger.info('Initializing Scheduler...') 59 logger.info('Initializing Scheduler...')
59 import_settings()
60 super(Scheduler, self).__init__(*args, **kwargs) 60 super(Scheduler, self).__init__(*args, **kwargs)
61 self.outgoing = Sender() 61 self.outgoing = Sender()
62 self._redis_server = None 62 self._redis_server = None
@@ -85,30 +85,28 @@ class Scheduler(HeartbeatMixin, EMQPService):
85 85
86 self.poller = Poller() 86 self.poller = Poller()
87 87
88 self.load_jobs()
89
90 self._setup() 88 self._setup()
91 89
92 def load_jobs(self): 90 def load_jobs(self):
93 """ 91 """
94 Loads the jobs that need to be scheduled 92 Loads the jobs from redis that need to be scheduled
95 """ 93 """
96 try: 94 if self.redis_server:
97 interval_job_list = self.redis_server.lrange( 95 try:
98 'interval_jobs', 0, -1) 96 interval_job_list = self.redis_server.lrange(
99 if interval_job_list is not None: 97 'interval_jobs', 0, -1)
100 for i in interval_job_list: 98 if interval_job_list is not None:
101 logger.debug('Restoring job with hash %s' % i) 99 for i in interval_job_list:
102 if (self.redis_server.get(i)): 100 logger.debug('Restoring job with hash %s' % i)
103 self.load_job_from_redis( 101 if self.redis_server.get(i):
104 message=deserialize(self.redis_server.get(i))) 102 self.load_job_from_redis(
105 else: 103 message=deserialize(self.redis_server.get(i)))
106 logger.warning('Expected scheduled job in redis,' + 104 else:
107 'but none was found with hash %s' % i) 105 logger.warning(
108 except redis.ConnectionError: 106 'Expected scheduled job in redis, but none '
109 logger.warning('Could not contact redis server') 107 'was found with hash {}'.format(i))
110 except Exception as e: 108 except Exception as e:
111 logger.warning(str(e)) 109 logger.warning(str(e), exc_info=True)
112 110
113 def _start_event_loop(self): 111 def _start_event_loop(self):
114 """ 112 """
@@ -126,7 +124,6 @@ class Scheduler(HeartbeatMixin, EMQPService):
126 msg = self.outgoing.recv_multipart() 124 msg = self.outgoing.recv_multipart()
127 self.process_message(msg) 125 self.process_message(msg)
128 126
129 # TODO: distribute me!
130 for hash_, cron in self.cron_jobs.items(): 127 for hash_, cron in self.cron_jobs.items():
131 # If the time is now, or passed 128 # If the time is now, or passed
132 if cron[0] <= ts_now: 129 if cron[0] <= ts_now:
@@ -145,10 +142,9 @@ class Scheduler(HeartbeatMixin, EMQPService):
145 seconds_until(cron[0])) 142 seconds_until(cron[0]))
146 143
147 cancel_jobs = [] 144 cancel_jobs = []
148 for k, v in self.interval_jobs.iteritems(): 145 for k, v in iteritems(self.interval_jobs):
149 # TODO: Refactor this entire loop to be readable by humankind
150 # The schedule time has elapsed
151 if v[0] <= m_now: 146 if v[0] <= m_now:
147 # The schedule time has elapsed
152 msg = v[1] 148 msg = v[1]
153 queue = v[3] 149 queue = v[3]
154 150
@@ -168,23 +164,25 @@ class Scheduler(HeartbeatMixin, EMQPService):
168 v[0] = next(v[2]) 164 v[0] = next(v[2])
169 # Persist the change to redis if there are run 165 # Persist the change to redis if there are run
170 # counts still left 166 # counts still left
171 try: 167 if self.redis_server:
172 message = deserialize( 168 try:
173 self.redis_server.get(k)) 169 message = deserialize(
174 new_headers = [] 170 self.redis_server.get(k))
175 for header in message[1].split(','): 171 new_headers = []
176 if 'run_count:' in header: 172 for header in message[1].split(','):
177 new_headers.append( 173 if 'run_count:' in header:
178 'run_count:{}'.format(v[4])) 174 new_headers.append(
179 else: 175 'run_count:{}'.format(
180 new_headers.append(header) 176 v[4]))
181 message[1] = ",".join(new_headers) 177 else:
182 self.redis_server.set( 178 new_headers.append(header)
183 k, serialize(message)) 179 message[1] = ",".join(new_headers)
184 except Exception as e: 180 self.redis_server.set(
185 logger.warning( 181 k, serialize(message))
186 'Unable to update key in redis ' 182 except Exception as e:
187 'server: {}'.format(e)) 183 logger.warning(
184 'Unable to update key in redis '
185 'server: {}'.format(e))
188 else: 186 else:
189 cancel_jobs.append(k) 187 cancel_jobs.append(k)
190 188
@@ -204,21 +202,45 @@ class Scheduler(HeartbeatMixin, EMQPService):
204 202
205 @property 203 @property
206 def redis_server(self): 204 def redis_server(self):
207 # Open connection to redis server for persistance
208 if self._redis_server is None: 205 if self._redis_server is None:
206 # Open connection to redis server for persistence
207 cls_split = conf.REDIS_CLIENT_CLASS.split('.')
208 cls_path, cls_name = '.'.join(cls_split[:-1]), cls_split[-1]
209 try: 209 try:
210 self._redis_server = \ 210 mod = importlib.import_module(cls_path)
211 redis.StrictRedis(host=conf.RQ_HOST, 211 redis_cls = getattr(mod, cls_name)
212 port=conf.RQ_PORT, 212 except (ImportError, AttributeError) as e:
213 db=conf.RQ_DB, 213 errmsg = 'Unable to import redis_client_class {} ({})'.format(
214 password=conf.RQ_PASSWORD) 214 conf.REDIS_CLIENT_CLASS, e)
215 return self._redis_server 215 logger.warning(errmsg)
216
217 if conf.REDIS_STARTUP_ERROR_HARD_KILL:
218 sys.exit(1)
219 return None
220
221 url = 'redis://{}:{}/{}'.format(
222 conf.RQ_HOST, conf.RQ_PORT, conf.RQ_DB)
216 223
224 logger.info('Connecting to redis: {}{}'.format(
225 url, ' with password' if conf.RQ_PASSWORD else ''))
226
227 if conf.RQ_PASSWORD:
228 url = 'redis://:{}@{}:{}/{}'.format(
229 conf.RQ_PASSWORD, conf.RQ_HOST, conf.RQ_PORT,
230 conf.RQ_DB)
231
232 try:
233 self._redis_server = \
234 redis_cls.from_url(url, **conf.REDIS_CLIENT_CLASS_KWARGS)
217 except Exception as e: 235 except Exception as e:
218 logger.warning('Unable to connect to redis server: {}'.format( 236 logger.warning('Unable to connect to redis server: {}'.format(
219 e)) 237 e))
220 else: 238 if conf.REDIS_STARTUP_ERROR_HARD_KILL:
221 return self._redis_server 239 sys.exit(1)
240
241 return None
242
243 return self._redis_server
222 244
223 def send_request(self, jobmsg, queue=None): 245 def send_request(self, jobmsg, queue=None):
224 """ 246 """
@@ -238,21 +260,28 @@ class Scheduler(HeartbeatMixin, EMQPService):
238 return msgid 260 return msgid
239 261
240 def on_disconnect(self, msgid, message): 262 def on_disconnect(self, msgid, message):
263 """Process request to shut down."""
241 logger.info("Received DISCONNECT request: {}".format(message)) 264 logger.info("Received DISCONNECT request: {}".format(message))
242 self._redis_server.connection_pool.disconnect()
243 sendmsg(self.outgoing, KBYE) 265 sendmsg(self.outgoing, KBYE)
244 self.outgoing.unbind(conf.SCHEDULER_ADDR) 266 self.outgoing.unbind(conf.SCHEDULER_ADDR)
267
268 if self._redis_server:
269 # Check the internal var. No need to connect if we haven't already
270 # connected by this point
271 try:
272 self._redis_server.connection_pool.disconnect()
273 except Exception:
274 pass
245 super(Scheduler, self).on_disconnect(msgid, message) 275 super(Scheduler, self).on_disconnect(msgid, message)
246 276
247 def on_kbye(self, msgid, msg): 277 def on_kbye(self, msgid, msg):
278 """Process router going offline"""
248 if not self.is_heartbeat_enabled: 279 if not self.is_heartbeat_enabled:
249 self.reset() 280 self.reset()
250 281
251 def on_unschedule(self, msgid, message): 282 def on_unschedule(self, msgid, message):
252 """ 283 """Unschedule an existing schedule job, if it exists."""
253 Unschedule an existing schedule job, if it exists 284 logger.debug("Received new UNSCHEDULE request: {}".format(message))
254 """
255 logger.info("Received new UNSCHEDULE request: {}".format(message))
256 285
257 schedule_hash = self.schedule_hash(message) 286 schedule_hash = self.schedule_hash(message)
258 # TODO: Notify router whether or not this succeeds 287 # TODO: Notify router whether or not this succeeds
@@ -263,11 +292,10 @@ class Scheduler(HeartbeatMixin, EMQPService):
263 Cancels a job if it exists 292 Cancels a job if it exists
264 293
265 Args: 294 Args:
266 schedule_hash (str): The schedule's unique hash. See 295 schedule_hash (str): The schedule's unique hash.
267 :meth:`Scheduler.schedule_hash` 296 See :meth:`Scheduler.schedule_hash`
268 """ 297 """
269 if schedule_hash in self.interval_jobs: 298 if schedule_hash in self.interval_jobs:
270
271 # If the hash wasn't found in either `cron_jobs` or `interval_jobs` 299 # If the hash wasn't found in either `cron_jobs` or `interval_jobs`
272 # then it's safe to assume it's already deleted. 300 # then it's safe to assume it's already deleted.
273 try: 301 try:
@@ -282,18 +310,18 @@ class Scheduler(HeartbeatMixin, EMQPService):
282 # Double check the redis server even if we didn't find the hash 310 # Double check the redis server even if we didn't find the hash
283 # in memory 311 # in memory
284 try: 312 try:
285 if (self.redis_server.get(schedule_hash)): 313 if self.redis_server:
286 self.redis_server.delete(schedule_hash)
287 self.redis_server.lrem('interval_jobs', 0, schedule_hash) 314 self.redis_server.lrem('interval_jobs', 0, schedule_hash)
288 self.redis_server.save() 315 except Exception as e:
289 except redis.ConnectionError as e: 316 logger.warning(str(e), exc_info=True)
290 logger.warning('Could not contact redis server: {}'.format(e)) 317 try:
318 if self.redis_server and self.redis_server.get(schedule_hash):
319 self.redis_server.delete(schedule_hash)
291 except Exception as e: 320 except Exception as e:
292 logger.warning(str(e), exc_info=True) 321 logger.warning(str(e), exc_info=True)
293 322
294 def load_job_from_redis(self, message): 323 def load_job_from_redis(self, message):
295 """ 324 """Parses and loads a message from redis as a scheduler job."""
296 """
297 from .utils.timeutils import IntervalIter 325 from .utils.timeutils import IntervalIter
298 326
299 queue = message[0].encode('utf-8') 327 queue = message[0].encode('utf-8')
@@ -329,8 +357,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
329 self.cron_jobs[schedule_hash] = [c_next, message[3], c, queue] 357 self.cron_jobs[schedule_hash] = [c_next, message[3], c, queue]
330 358
331 def on_schedule(self, msgid, message): 359 def on_schedule(self, msgid, message):
332 """ 360 """Create a new scheduled job."""
333 """
334 logger.info("Received new SCHEDULE request: {}".format(message)) 361 logger.info("Received new SCHEDULE request: {}".format(message))
335 362
336 queue = message[0] 363 queue = message[0]
@@ -380,19 +407,16 @@ class Scheduler(HeartbeatMixin, EMQPService):
380 self.interval_jobs.pop(schedule_hash) 407 self.interval_jobs.pop(schedule_hash)
381 408
382 # Persist the scheduled job 409 # Persist the scheduled job
383 try: 410 if self.redis_server:
384 if schedule_hash not in self.redis_server.lrange( 411 try:
385 'interval_jobs', 0, -1): 412 if schedule_hash not in self.redis_server.lrange(
386 self.redis_server.lpush('interval_jobs', schedule_hash) 413 'interval_jobs', 0, -1):
387 self.redis_server.set(schedule_hash, serialize(message)) 414 self.redis_server.lpush('interval_jobs', schedule_hash)
388 self.redis_server.save() 415 self.redis_server.set(schedule_hash, serialize(message))
389 logger.debug('Saved job {} with hash {} to redis'.format( 416 logger.debug('Saved job {} with hash {} to redis'.format(
390 message, schedule_hash)) 417 message, schedule_hash))
391 except redis.ConnectionError: 418 except Exception as e:
392 logger.warning('Could not contact redis server. Unable to ' 419 logger.warning(str(e))
393 'guarantee persistence.')
394 except Exception as e:
395 logger.warning(str(e))
396 420
397 # Send a request in haste mode, decrement run_count if needed 421 # Send a request in haste mode, decrement run_count if needed
398 if 'nohaste' not in headers: 422 if 'nohaste' not in headers:
@@ -410,21 +434,17 @@ class Scheduler(HeartbeatMixin, EMQPService):
410 return run_count 434 return run_count
411 435
412 def on_heartbeat(self, msgid, message): 436 def on_heartbeat(self, msgid, message):
413 """ 437 """Noop command. The logic for heart beating is in the event loop."""
414 Noop command. The logic for heartbeating is in the event loop.
415 """
416 438
417 @classmethod 439 @classmethod
418 def schedule_hash(cls, message): 440 def schedule_hash(cls, message):
419 """ 441 """Create a unique identifier to store and reference a message later.
420 Create a unique identifier for this message for storing
421 and referencing later
422 442
423 Args: 443 Args:
424 message (str): The serialized message passed to the scheduler 444 message (str): The serialized message passed to the scheduler
425 445
426 Returns: 446 Returns:
427 int: unique hash for the job 447 str: unique hash for the job
428 """ 448 """
429 449
430 # Get the job portion of the message 450 # Get the job portion of the message
@@ -452,18 +472,17 @@ class Scheduler(HeartbeatMixin, EMQPService):
452 """ 472 """
453 setup_logger("eventmq") 473 setup_logger("eventmq")
454 import_settings() 474 import_settings()
455 self.__init__() 475 import_settings('scheduler')
456 self.start(addr=conf.SCHEDULER_ADDR)
457
458 476
459# Entry point for pip console scripts 477 self.load_jobs()
460def scheduler_main(): 478 self.start(addr=conf.SCHEDULER_ADDR)
461 s = Scheduler()
462 s.scheduler_main()
463 479
464 480
465def test_job(*args, **kwargs): 481def test_job(*args, **kwargs):
466 """ 482 """
467 Simple test job for use with the scheduler 483 Simple test job for use with the scheduler
468 """ 484 """
485 from pprint import pprint
469 print("hello!") # noqa 486 print("hello!") # noqa
487 pprint(args) # noqa
488 pprint(kwargs) # noqa
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py
index 03da18a..2937085 100644
--- a/eventmq/tests/test_utils.py
+++ b/eventmq/tests/test_utils.py
@@ -12,52 +12,62 @@
12# 12#
13# You should have received a copy of the GNU Lesser General Public License 13# You should have received a copy of the GNU Lesser General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>. 14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15from configparser import ConfigParser
15from imp import reload 16from imp import reload
16import io 17import io
18import os
17import random 19import random
18import sys 20import sys
19import unittest 21import unittest
20 22
21import mock 23import mock
22 24
25from .. import conf
23from .. import constants 26from .. import constants
24from .. import exceptions 27from .. import exceptions
25from ..utils import classes, messages, settings 28from ..utils import classes, messages, settings
26 29
27 30
28class SettingsTestCase(unittest.TestCase): 31class SettingsTestCase(unittest.TestCase):
29 settings_ini = "\n".join( 32 settings_ini = '\n'.join(
30 ("[global]", 33 ('[global]',
31 "super_debug=TRuE", 34 'super_debug=TRuE',
32 "frontend_addr=tcp://0.0.0.0:47291", 35 'frontend_addr=tcp://0.0.0.0:47291',
33 "", 36 '',
34 "[jobmanager]", 37 '[jobmanager]',
35 "super_debug=FalSe", 38 'super_debug=FalSe',
36 'queues=[[50,"google"], [40,"pushes"], [10,"default"]]', 39 'queues=[[50,"google"], [40,"pushes"], [10,"default"]]',
37 "worker_addr=tcp://160.254.23.88:47290", 40 'worker_addr=tcp://160.254.23.88:47290',
38 "concurrent_jobs=9283",)) 41 'concurrent_jobs=9283',
42 '',
43 '[scheduler]',
44 'redis_client_class_kwargs={"test_kwarg": true}',
45 '',
46 '[section_with_bad_list]',
47 'queues=[[10,asdf],]',
48 '',
49 '[section_with_bad_dict]',
50 'redis_client_class_kwargs={asdf, 39}'))
51
52 def setUp(self):
53 self._config = ConfigParser()
54
55 if sys.version_info[0] == 3:
56 self._config.read_string(self.settings_ini)
57 else:
58 self._config.readfp(io.BytesIO(self.settings_ini))
39 59
40 @mock.patch('eventmq.utils.settings.os.path.exists')
41 def test_import_settings_default(self, pathexists_mock):
42 from configparser import ConfigParser
43 from .. import conf
44 # sometimes the tests step on each other with this module. reloading 60 # sometimes the tests step on each other with this module. reloading
45 # ensures fresh test data 61 # ensures fresh test data
46 reload(conf) 62 reload(conf)
47 pathexists_mock.return_value = True
48 63
49 _config = ConfigParser() 64 @mock.patch('eventmq.utils.settings.os.path.exists')
50 65 def test_import_settings_default(self, pathexists_mock):
51 if sys.version_info[0] == 3: 66 pathexists_mock.return_value = True
52 _config.read_string(self.settings_ini)
53 else:
54 _config.readfp(io.BytesIO(self.settings_ini))
55 67
56 # Global section
57 # --------------
58 with mock.patch('eventmq.utils.settings.ConfigParser', 68 with mock.patch('eventmq.utils.settings.ConfigParser',
59 return_value=_config): 69 return_value=self._config):
60 with mock.patch.object(_config, 'read'): 70 with mock.patch.object(self._config, 'read'):
61 settings.import_settings() 71 settings.import_settings()
62 72
63 # Changed. Default is false 73 # Changed. Default is false
@@ -75,25 +85,19 @@ class SettingsTestCase(unittest.TestCase):
75 # Default is (10, 'default') 85 # Default is (10, 'default')
76 self.assertEqual(conf.QUEUES, [(10, conf.DEFAULT_QUEUE_NAME), ]) 86 self.assertEqual(conf.QUEUES, [(10, conf.DEFAULT_QUEUE_NAME), ])
77 87
78 # Job Manager Section 88 @mock.patch('eventmq.utils.settings.os.path.exists')
79 # ------------------- 89 def test_import_settings_jobmanager(self, pathexists_mock):
80 from configparser import ConfigParser 90 pathexists_mock.return_value = True
81 _config = ConfigParser()
82 if sys.version_info[0] == 3:
83 _config.read_string(self.settings_ini)
84 else:
85 _config.readfp(io.BytesIO(self.settings_ini))
86 91
87 # Global section
88 # --------------
89 with mock.patch('eventmq.utils.settings.ConfigParser', 92 with mock.patch('eventmq.utils.settings.ConfigParser',
90 return_value=_config): 93 return_value=self._config):
91 with mock.patch.object(ConfigParser, 'read'): 94 with mock.patch.object(self._config, 'read'):
95 settings.import_settings()
92 settings.import_settings('jobmanager') 96 settings.import_settings('jobmanager')
93 97
94 # Changed 98 # Changed from True (in global) to False
95 self.assertFalse(conf.SUPER_DEBUG) 99 self.assertFalse(conf.SUPER_DEBUG)
96 # Changed 100 # Override default value
97 self.assertEqual(conf.CONCURRENT_JOBS, 9283) 101 self.assertEqual(conf.CONCURRENT_JOBS, 9283)
98 102
99 # Changed 103 # Changed
@@ -102,21 +106,14 @@ class SettingsTestCase(unittest.TestCase):
102 106
103 self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') 107 self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290')
104 108
105 # Invalid section 109 @mock.patch('eventmq.utils.settings.os.path.exists')
106 # --------------- 110 def test_load_invalid_section_uses_defaults(self, pathexists_mock):
107 # This shouldn't fail, and nothing should change 111 pathexists_mock.return_value = True
108 _config = ConfigParser()
109
110 if sys.version_info[0] == 3:
111 _config.read_string(self.settings_ini)
112 else:
113 _config.readfp(io.BytesIO(self.settings_ini))
114 112
115 # Global section
116 # --------------
117 with mock.patch('eventmq.utils.settings.ConfigParser', 113 with mock.patch('eventmq.utils.settings.ConfigParser',
118 return_value=_config): 114 return_value=self._config):
119 with mock.patch.object(ConfigParser, 'read'): 115 with mock.patch.object(self._config, 'read'):
116 settings.import_settings('jobmanager')
120 settings.import_settings('nonexistent_section') 117 settings.import_settings('nonexistent_section')
121 118
122 self.assertEqual(conf.CONCURRENT_JOBS, 9283) 119 self.assertEqual(conf.CONCURRENT_JOBS, 9283)
@@ -124,6 +121,58 @@ class SettingsTestCase(unittest.TestCase):
124 [(50, 'google'), (40, 'pushes'), (10, 'default')]) 121 [(50, 'google'), (40, 'pushes'), (10, 'default')])
125 self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') 122 self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290')
126 123
124 @mock.patch('eventmq.utils.settings.os.path.exists')
125 def test_dictionary(self, pathexists_mock):
126 pathexists_mock.return_value = True
127
128 with mock.patch('eventmq.utils.settings.ConfigParser',
129 return_value=self._config):
130 with mock.patch.object(self._config, 'read'):
131 settings.import_settings('scheduler')
132
133 # Dictionary should be dictionary
134 self.assertTrue(isinstance(conf.REDIS_CLIENT_CLASS_KWARGS, dict))
135 # Dictionary should be dictionary
136 self.assertTrue(conf.REDIS_CLIENT_CLASS_KWARGS['test_kwarg'])
137
138 @mock.patch('eventmq.utils.settings.os.path.exists')
139 def test_favor_environment_variables(self, pathexists_mock):
140 pathexists_mock.return_value = True
141
142 # frontend_addr has been defined in global, but should be the following
143 # value
144 value = 'from environment variable'
145 os.environ['EVENTMQ_FRONTEND_ADDR'] = value
146 try:
147 with mock.patch('eventmq.utils.settings.ConfigParser',
148 return_value=self._config):
149 with mock.patch.object(self._config, 'read'):
150 settings.import_settings()
151 finally:
152 del os.environ['EVENTMQ_FRONTEND_ADDR']
153
154 self.assertEqual(conf.FRONTEND_ADDR, value)
155
156 @mock.patch('eventmq.utils.settings.os.path.exists')
157 def test_valueerror_on_invalid_json_list(self, pathexists_mock):
158 pathexists_mock.return_value = True
159
160 with mock.patch('eventmq.utils.settings.ConfigParser',
161 return_value=self._config):
162 with mock.patch.object(self._config, 'read'):
163 with self.assertRaises(ValueError):
164 settings.import_settings('section_with_bad_list')
165
166 @mock.patch('eventmq.utils.settings.os.path.exists')
167 def test_valueerror_on_invalid_json_dict(self, pathexists_mock):
168 pathexists_mock.return_value = True
169
170 with mock.patch('eventmq.utils.settings.ConfigParser',
171 return_value=self._config):
172 with mock.patch.object(self._config, 'read'):
173 with self.assertRaises(ValueError):
174 settings.import_settings('section_with_bad_dict')
175
127 176
128class EMQPServiceTestCase(unittest.TestCase): 177class EMQPServiceTestCase(unittest.TestCase):
129 178
diff --git a/eventmq/utils/settings.py b/eventmq/utils/settings.py
index 3999e39..7a91858 100644
--- a/eventmq/utils/settings.py
+++ b/eventmq/utils/settings.py
@@ -79,7 +79,7 @@ def import_settings(section='global'):
79 value = t(json.loads(value)) 79 value = t(json.loads(value))
80 except ValueError: 80 except ValueError:
81 raise ValueError( 81 raise ValueError(
82 "Invalid JSON syntax for {} setting".format(name)) 82 'Invalid JSON syntax for {} setting'.format(name))
83 # json.loads coverts all arrays to lists, but if the first 83 # json.loads coverts all arrays to lists, but if the first
84 # element in the default is a tuple (like in QUEUES) then 84 # element in the default is a tuple (like in QUEUES) then
85 # convert those elements, otherwise whatever it's type is 85 # convert those elements, otherwise whatever it's type is
@@ -91,8 +91,12 @@ def import_settings(section='global'):
91 elif isinstance(default_value, bool): 91 elif isinstance(default_value, bool):
92 setattr(conf, name, 92 setattr(conf, name,
93 True if 't' in value.lower() else False) 93 True if 't' in value.lower() else False)
94 elif t == dict:
95 try:
96 value = json.loads(value)
97 except ValueError:
98 raise ValueError(
99 'Invalid JSON syntax for {} setting'.format(name))
100 setattr(conf, name, value)
94 else: 101 else:
95 setattr(conf, name, t(value)) 102 setattr(conf, name, t(value))
96
97 logger.debug("Setting conf.{} to {}".format(
98 name, getattr(conf, name)))
diff --git a/setup.py b/setup.py
index 1427d67..dceac7f 100644
--- a/setup.py
+++ b/setup.py
@@ -22,7 +22,6 @@ setup(
22 'six==1.10.0', 22 'six==1.10.0',
23 'monotonic==0.4', 23 'monotonic==0.4',
24 'croniter==0.3.10', 24 'croniter==0.3.10',
25 'redis==2.10.3',
26 'future==0.15.2', 25 'future==0.15.2',
27 'psutil==5.0.0', 26 'psutil==5.0.0',
28 'python-dateutil>=2.1,<3.0.0'], 27 'python-dateutil>=2.1,<3.0.0'],