diff options
| author | jason | 2019-01-16 16:39:06 -0700 |
|---|---|---|
| committer | GitHub | 2019-01-16 16:39:06 -0700 |
| commit | e2c46a83ef783a68ea1ef559b78f8df29d50e719 (patch) | |
| tree | cbb01780f216ea9544a2452d7984da89b611b6d1 | |
| parent | 643f051a32316e47f911c3ceaaad0fff305b3e5c (diff) | |
| parent | 0bed1a072d15ec01822b421bee51965a2d6910e7 (diff) | |
| download | eventmq-e2c46a83ef783a68ea1ef559b78f8df29d50e719.tar.gz eventmq-e2c46a83ef783a68ea1ef559b78f8df29d50e719.zip | |
Merge pull request #68 from eventmq/feature/scheduler_redis_cluster_support0.3.10-rc1
Scheduler: adds optional support for redis and specifying redis client
| -rw-r--r-- | docs/settings_file.rst | 70 | ||||
| -rw-r--r-- | etc/eventmq.conf-dist | 1 | ||||
| -rw-r--r-- | eventmq/__init__.py | 2 | ||||
| -rw-r--r-- | eventmq/client/messages.py | 4 | ||||
| -rw-r--r-- | eventmq/conf.py | 3 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 211 | ||||
| -rw-r--r-- | eventmq/tests/test_utils.py | 151 | ||||
| -rw-r--r-- | eventmq/utils/settings.py | 12 | ||||
| -rw-r--r-- | setup.py | 1 |
9 files changed, 296 insertions, 159 deletions
diff --git a/docs/settings_file.rst b/docs/settings_file.rst index 04fe451..e9f4b04 100644 --- a/docs/settings_file.rst +++ b/docs/settings_file.rst | |||
| @@ -19,6 +19,13 @@ Default: False | |||
| 19 | 19 | ||
| 20 | Enable most verbose level of debug statements | 20 | Enable most verbose level of debug statements |
| 21 | 21 | ||
| 22 | hide_heartbeat_logs | ||
| 23 | =================== | ||
| 24 | Default: True | ||
| 25 | |||
| 26 | This hides heart beat messages from the logs. Disabling this will result in very | ||
| 27 | noisy log output. | ||
| 28 | |||
| 22 | max_sockets | 29 | max_sockets |
| 23 | =========== | 30 | =========== |
| 24 | Default: 1024 | 31 | Default: 1024 |
| @@ -91,15 +98,55 @@ Default: '' | |||
| 91 | 98 | ||
| 92 | Password to use when connecting to redis | 99 | Password to use when connecting to redis |
| 93 | 100 | ||
| 101 | redis_client_class | ||
| 102 | ================== | ||
| 103 | Default: ``redis.StrictRedis`` | ||
| 104 | |||
| 105 | The class to use as the redis client. This can be overridden if you want to use | ||
| 106 | a different module to connect to redis. For example | ||
| 107 | ``rediscluster.StrictRedisCluster``. Note: You may get errors if you don't use | ||
| 108 | a strict mode class. | ||
| 109 | |||
| 110 | redis_client_class_kwargs | ||
| 111 | ========================= | ||
| 112 | Default: {} | ||
| 113 | |||
| 114 | This is a JSON hash map of keyword arguments to pass to the Python class | ||
| 115 | constructor. This is useful for using ``redis-cluster-py`` on AWS Elasticache. | ||
| 116 | When using Elasticache this value should be set to | ||
| 117 | ``{"skip_full_coverage_check": true}`` to prevent startup errors. | ||
| 118 | |||
| 119 | redis_startup_error_hard_kill | ||
| 120 | ============================= | ||
| 121 | Default: True | ||
| 122 | |||
| 123 | If there is an error connecting to the Redis server for persistent schedule | ||
| 124 | storage on startup then kill the app. This is useful if you want to prevent | ||
| 125 | accidentally accepting schedules that can't be saved to a persistent store. If | ||
| 126 | you would like to use redis you will need to ``pip install redis`` or | ||
| 127 | ``pip install redis-py-cluster`` and define the necessary options. | ||
| 128 | |||
| 94 | *********** | 129 | *********** |
| 95 | Job Manager | 130 | Job Manager |
| 96 | *********** | 131 | *********** |
| 97 | 132 | ||
| 133 | default_queue_name | ||
| 134 | ================== | ||
| 135 | Default: default | ||
| 136 | |||
| 137 | This is the default queue a job manager will listen on if nothing is specified. | ||
| 138 | |||
| 139 | default_queue_weight | ||
| 140 | ==================== | ||
| 141 | Default: 10 | ||
| 142 | |||
| 143 | This is the default weight for the default queue if it is not explicitly set. | ||
| 144 | |||
| 98 | concurrent_jobs | 145 | concurrent_jobs |
| 99 | =============== | 146 | =============== |
| 100 | Default: 4 | 147 | Default: 4 |
| 101 | 148 | ||
| 102 | This is the number of concurrent jobs the indiviudal job manager should execute | 149 | This is the number of concurrent jobs the individual job manager should execute |
| 103 | at a time. If you are using the multiprocess or threading model this number | 150 | at a time. If you are using the multiprocess or threading model this number |
| 104 | becomes important as you will want to control the load on your server. If the | 151 | becomes important as you will want to control the load on your server. If the |
| 105 | load equals the number of cores on the server, processes will begin waiting for | 152 | load equals the number of cores on the server, processes will begin waiting for |
| @@ -115,7 +162,7 @@ queues | |||
| 115 | ====== | 162 | ====== |
| 116 | Default: [[10, "default"]] | 163 | Default: [[10, "default"]] |
| 117 | 164 | ||
| 118 | Comma seperated list of queues to process jobs for with their weights. This list | 165 | Comma separated list of queues to process jobs with their weights. This list |
| 119 | must be valid JSON otherwise an error will be thrown. | 166 | must be valid JSON otherwise an error will be thrown. |
| 120 | Example: ``queues=[[10, "data_process"], [15, "email"]]``. With these | 167 | Example: ``queues=[[10, "data_process"], [15, "email"]]``. With these |
| 121 | weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers | 168 | weights and the ``CONCURRENT_JOBS`` setting, you should be able to tune managers |
| @@ -131,14 +178,29 @@ number until the large box is ready to accept another q1 job. | |||
| 131 | default queue so that anything that is not explicitly assigned will still be | 178 | default queue so that anything that is not explicitly assigned will still be |
| 132 | run. | 179 | run. |
| 133 | 180 | ||
| 134 | setup_callabe/setup_path | 181 | setup_callable/setup_path |
| 135 | ======================== | 182 | ========================= |
| 136 | Default: '' (Signifies no task will be attempted) | 183 | Default: '' (Signifies no task will be attempted) |
| 137 | 184 | ||
| 138 | Strings containing path and callable to be run when a worker is spawned | 185 | Strings containing path and callable to be run when a worker is spawned |
| 139 | if applicable to that type of worker. Currently the only supported worker is a | 186 | if applicable to that type of worker. Currently the only supported worker is a |
| 140 | MultiProcessWorker, and is useful for pulling any global state into memory. | 187 | MultiProcessWorker, and is useful for pulling any global state into memory. |
| 141 | 188 | ||
| 189 | job_entry_func | ||
| 190 | ============== | ||
| 191 | Default: '' (Signifies no function will be executed) | ||
| 192 | |||
| 193 | The function to execute before **every** job a worker thread executes. For | ||
| 194 | example: cleaning up stale database connections. (Django's | ||
| 195 | ``django.db.connections[].close_if_unusable_or_obsolete()``) | ||
| 196 | |||
| 197 | job_exit_func | ||
| 198 | ============= | ||
| 199 | Default: '' (Signifies no function will be executed) | ||
| 200 | |||
| 201 | The function to execute **after** every job a worker thread executes. For | ||
| 202 | example: closing any database handles that were left open. | ||
| 203 | |||
| 142 | max_job_count | 204 | max_job_count |
| 143 | ============= | 205 | ============= |
| 144 | Default: 1024 | 206 | Default: 1024 |
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist index 96dbad4..7882e0c 100644 --- a/etc/eventmq.conf-dist +++ b/etc/eventmq.conf-dist | |||
| @@ -19,6 +19,7 @@ scheduler_addr=tcp://eventmq:47291 | |||
| 19 | [router] | 19 | [router] |
| 20 | 20 | ||
| 21 | [scheduler] | 21 | [scheduler] |
| 22 | redis_client_class = redis.StrictRedis | ||
| 22 | 23 | ||
| 23 | [jobmanager] | 24 | [jobmanager] |
| 24 | worker_addr=tcp://127.0.0.1:47290 | 25 | worker_addr=tcp://127.0.0.1:47290 |
diff --git a/eventmq/__init__.py b/eventmq/__init__.py index 4143854..018a540 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | __author__ = 'EventMQ Contributors' | 1 | __author__ = 'EventMQ Contributors' |
| 2 | __version__ = '0.3.9' | 2 | __version__ = '0.3.10-rc1' |
| 3 | 3 | ||
| 4 | PROTOCOL_VERSION = 'eMQP/1.0' | 4 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 5 | 5 | ||
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py index 9e49ea8..c178c30 100644 --- a/eventmq/client/messages.py +++ b/eventmq/client/messages.py | |||
| @@ -226,14 +226,14 @@ def send_request(socket, message, reply_requested=False, guarantee=False, | |||
| 226 | } | 226 | } |
| 227 | } | 227 | } |
| 228 | Args: | 228 | Args: |
| 229 | socket (socket): Socket to use when sending `message` | 229 | socket: Socket (Sender or Receiver) to use when sending `message` |
| 230 | message: message to send to `socket` | 230 | message: message to send to `socket` |
| 231 | reply_requested (bool): request the return value of func as a reply | 231 | reply_requested (bool): request the return value of func as a reply |
| 232 | guarantee (bool): (Give your best effort) to guarantee that func is | 232 | guarantee (bool): (Give your best effort) to guarantee that func is |
| 233 | executed. Exceptions and things will be logged. | 233 | executed. Exceptions and things will be logged. |
| 234 | retry_count (int): How many times should be retried when encountering | 234 | retry_count (int): How many times should be retried when encountering |
| 235 | an Exception or some other failure before giving up. (default: 0 | 235 | an Exception or some other failure before giving up. (default: 0 |
| 236 | or immediatly fail) | 236 | or immediately fail) |
| 237 | timeout (int): How many seconds should we wait before killing the job | 237 | timeout (int): How many seconds should we wait before killing the job |
| 238 | default: 0 which means infinite timeout | 238 | default: 0 which means infinite timeout |
| 239 | queue (str): Name of queue to use when executing the job. Default: is | 239 | queue (str): Name of queue to use when executing the job. Default: is |
diff --git a/eventmq/conf.py b/eventmq/conf.py index 87aa675..8634ebb 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -88,6 +88,9 @@ RQ_HOST = 'localhost' | |||
| 88 | RQ_PORT = 6379 | 88 | RQ_PORT = 6379 |
| 89 | RQ_DB = 0 | 89 | RQ_DB = 0 |
| 90 | RQ_PASSWORD = '' | 90 | RQ_PASSWORD = '' |
| 91 | REDIS_CLIENT_CLASS = 'redis.StrictRedis' | ||
| 92 | REDIS_CLIENT_CLASS_KWARGS = {} | ||
| 93 | REDIS_STARTUP_ERROR_HARD_KILL = True | ||
| 91 | 94 | ||
| 92 | MAX_JOB_COUNT = 1024 | 95 | MAX_JOB_COUNT = 1024 |
| 93 | 96 | ||
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 2ff726a..e7c13ec 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -18,14 +18,15 @@ | |||
| 18 | Handles cron and other scheduled tasks | 18 | Handles cron and other scheduled tasks |
| 19 | """ | 19 | """ |
| 20 | from hashlib import sha1 as emq_hash | 20 | from hashlib import sha1 as emq_hash |
| 21 | import importlib | ||
| 21 | import json | 22 | import json |
| 22 | from json import dumps as serialize | 23 | from json import dumps as serialize |
| 23 | from json import loads as deserialize | 24 | from json import loads as deserialize |
| 24 | import logging | 25 | import logging |
| 26 | import sys | ||
| 25 | 27 | ||
| 26 | from croniter import croniter | 28 | from croniter import croniter |
| 27 | import redis | 29 | from six import iteritems, next |
| 28 | from six import next | ||
| 29 | 30 | ||
| 30 | from eventmq.log import setup_logger | 31 | from eventmq.log import setup_logger |
| 31 | 32 | ||
| @@ -56,7 +57,6 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 56 | 57 | ||
| 57 | logger.info('EventMQ Version {}'.format(__version__)) | 58 | logger.info('EventMQ Version {}'.format(__version__)) |
| 58 | logger.info('Initializing Scheduler...') | 59 | logger.info('Initializing Scheduler...') |
| 59 | import_settings() | ||
| 60 | super(Scheduler, self).__init__(*args, **kwargs) | 60 | super(Scheduler, self).__init__(*args, **kwargs) |
| 61 | self.outgoing = Sender() | 61 | self.outgoing = Sender() |
| 62 | self._redis_server = None | 62 | self._redis_server = None |
| @@ -85,30 +85,28 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 85 | 85 | ||
| 86 | self.poller = Poller() | 86 | self.poller = Poller() |
| 87 | 87 | ||
| 88 | self.load_jobs() | ||
| 89 | |||
| 90 | self._setup() | 88 | self._setup() |
| 91 | 89 | ||
| 92 | def load_jobs(self): | 90 | def load_jobs(self): |
| 93 | """ | 91 | """ |
| 94 | Loads the jobs that need to be scheduled | 92 | Loads the jobs from redis that need to be scheduled |
| 95 | """ | 93 | """ |
| 96 | try: | 94 | if self.redis_server: |
| 97 | interval_job_list = self.redis_server.lrange( | 95 | try: |
| 98 | 'interval_jobs', 0, -1) | 96 | interval_job_list = self.redis_server.lrange( |
| 99 | if interval_job_list is not None: | 97 | 'interval_jobs', 0, -1) |
| 100 | for i in interval_job_list: | 98 | if interval_job_list is not None: |
| 101 | logger.debug('Restoring job with hash %s' % i) | 99 | for i in interval_job_list: |
| 102 | if (self.redis_server.get(i)): | 100 | logger.debug('Restoring job with hash %s' % i) |
| 103 | self.load_job_from_redis( | 101 | if self.redis_server.get(i): |
| 104 | message=deserialize(self.redis_server.get(i))) | 102 | self.load_job_from_redis( |
| 105 | else: | 103 | message=deserialize(self.redis_server.get(i))) |
| 106 | logger.warning('Expected scheduled job in redis,' + | 104 | else: |
| 107 | 'but none was found with hash %s' % i) | 105 | logger.warning( |
| 108 | except redis.ConnectionError: | 106 | 'Expected scheduled job in redis, but none ' |
| 109 | logger.warning('Could not contact redis server') | 107 | 'was found with hash {}'.format(i)) |
| 110 | except Exception as e: | 108 | except Exception as e: |
| 111 | logger.warning(str(e)) | 109 | logger.warning(str(e), exc_info=True) |
| 112 | 110 | ||
| 113 | def _start_event_loop(self): | 111 | def _start_event_loop(self): |
| 114 | """ | 112 | """ |
| @@ -126,7 +124,6 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 126 | msg = self.outgoing.recv_multipart() | 124 | msg = self.outgoing.recv_multipart() |
| 127 | self.process_message(msg) | 125 | self.process_message(msg) |
| 128 | 126 | ||
| 129 | # TODO: distribute me! | ||
| 130 | for hash_, cron in self.cron_jobs.items(): | 127 | for hash_, cron in self.cron_jobs.items(): |
| 131 | # If the time is now, or passed | 128 | # If the time is now, or passed |
| 132 | if cron[0] <= ts_now: | 129 | if cron[0] <= ts_now: |
| @@ -145,10 +142,9 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 145 | seconds_until(cron[0])) | 142 | seconds_until(cron[0])) |
| 146 | 143 | ||
| 147 | cancel_jobs = [] | 144 | cancel_jobs = [] |
| 148 | for k, v in self.interval_jobs.iteritems(): | 145 | for k, v in iteritems(self.interval_jobs): |
| 149 | # TODO: Refactor this entire loop to be readable by humankind | ||
| 150 | # The schedule time has elapsed | ||
| 151 | if v[0] <= m_now: | 146 | if v[0] <= m_now: |
| 147 | # The schedule time has elapsed | ||
| 152 | msg = v[1] | 148 | msg = v[1] |
| 153 | queue = v[3] | 149 | queue = v[3] |
| 154 | 150 | ||
| @@ -168,23 +164,25 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 168 | v[0] = next(v[2]) | 164 | v[0] = next(v[2]) |
| 169 | # Persist the change to redis if there are run | 165 | # Persist the change to redis if there are run |
| 170 | # counts still left | 166 | # counts still left |
| 171 | try: | 167 | if self.redis_server: |
| 172 | message = deserialize( | 168 | try: |
| 173 | self.redis_server.get(k)) | 169 | message = deserialize( |
| 174 | new_headers = [] | 170 | self.redis_server.get(k)) |
| 175 | for header in message[1].split(','): | 171 | new_headers = [] |
| 176 | if 'run_count:' in header: | 172 | for header in message[1].split(','): |
| 177 | new_headers.append( | 173 | if 'run_count:' in header: |
| 178 | 'run_count:{}'.format(v[4])) | 174 | new_headers.append( |
| 179 | else: | 175 | 'run_count:{}'.format( |
| 180 | new_headers.append(header) | 176 | v[4])) |
| 181 | message[1] = ",".join(new_headers) | 177 | else: |
| 182 | self.redis_server.set( | 178 | new_headers.append(header) |
| 183 | k, serialize(message)) | 179 | message[1] = ",".join(new_headers) |
| 184 | except Exception as e: | 180 | self.redis_server.set( |
| 185 | logger.warning( | 181 | k, serialize(message)) |
| 186 | 'Unable to update key in redis ' | 182 | except Exception as e: |
| 187 | 'server: {}'.format(e)) | 183 | logger.warning( |
| 184 | 'Unable to update key in redis ' | ||
| 185 | 'server: {}'.format(e)) | ||
| 188 | else: | 186 | else: |
| 189 | cancel_jobs.append(k) | 187 | cancel_jobs.append(k) |
| 190 | 188 | ||
| @@ -204,21 +202,45 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 204 | 202 | ||
| 205 | @property | 203 | @property |
| 206 | def redis_server(self): | 204 | def redis_server(self): |
| 207 | # Open connection to redis server for persistance | ||
| 208 | if self._redis_server is None: | 205 | if self._redis_server is None: |
| 206 | # Open connection to redis server for persistence | ||
| 207 | cls_split = conf.REDIS_CLIENT_CLASS.split('.') | ||
| 208 | cls_path, cls_name = '.'.join(cls_split[:-1]), cls_split[-1] | ||
| 209 | try: | 209 | try: |
| 210 | self._redis_server = \ | 210 | mod = importlib.import_module(cls_path) |
| 211 | redis.StrictRedis(host=conf.RQ_HOST, | 211 | redis_cls = getattr(mod, cls_name) |
| 212 | port=conf.RQ_PORT, | 212 | except (ImportError, AttributeError) as e: |
| 213 | db=conf.RQ_DB, | 213 | errmsg = 'Unable to import redis_client_class {} ({})'.format( |
| 214 | password=conf.RQ_PASSWORD) | 214 | conf.REDIS_CLIENT_CLASS, e) |
| 215 | return self._redis_server | 215 | logger.warning(errmsg) |
| 216 | |||
| 217 | if conf.REDIS_STARTUP_ERROR_HARD_KILL: | ||
| 218 | sys.exit(1) | ||
| 219 | return None | ||
| 220 | |||
| 221 | url = 'redis://{}:{}/{}'.format( | ||
| 222 | conf.RQ_HOST, conf.RQ_PORT, conf.RQ_DB) | ||
| 216 | 223 | ||
| 224 | logger.info('Connecting to redis: {}{}'.format( | ||
| 225 | url, ' with password' if conf.RQ_PASSWORD else '')) | ||
| 226 | |||
| 227 | if conf.RQ_PASSWORD: | ||
| 228 | url = 'redis://:{}@{}:{}/{}'.format( | ||
| 229 | conf.RQ_PASSWORD, conf.RQ_HOST, conf.RQ_PORT, | ||
| 230 | conf.RQ_DB) | ||
| 231 | |||
| 232 | try: | ||
| 233 | self._redis_server = \ | ||
| 234 | redis_cls.from_url(url, **conf.REDIS_CLIENT_CLASS_KWARGS) | ||
| 217 | except Exception as e: | 235 | except Exception as e: |
| 218 | logger.warning('Unable to connect to redis server: {}'.format( | 236 | logger.warning('Unable to connect to redis server: {}'.format( |
| 219 | e)) | 237 | e)) |
| 220 | else: | 238 | if conf.REDIS_STARTUP_ERROR_HARD_KILL: |
| 221 | return self._redis_server | 239 | sys.exit(1) |
| 240 | |||
| 241 | return None | ||
| 242 | |||
| 243 | return self._redis_server | ||
| 222 | 244 | ||
| 223 | def send_request(self, jobmsg, queue=None): | 245 | def send_request(self, jobmsg, queue=None): |
| 224 | """ | 246 | """ |
| @@ -238,21 +260,28 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 238 | return msgid | 260 | return msgid |
| 239 | 261 | ||
| 240 | def on_disconnect(self, msgid, message): | 262 | def on_disconnect(self, msgid, message): |
| 263 | """Process request to shut down.""" | ||
| 241 | logger.info("Received DISCONNECT request: {}".format(message)) | 264 | logger.info("Received DISCONNECT request: {}".format(message)) |
| 242 | self._redis_server.connection_pool.disconnect() | ||
| 243 | sendmsg(self.outgoing, KBYE) | 265 | sendmsg(self.outgoing, KBYE) |
| 244 | self.outgoing.unbind(conf.SCHEDULER_ADDR) | 266 | self.outgoing.unbind(conf.SCHEDULER_ADDR) |
| 267 | |||
| 268 | if self._redis_server: | ||
| 269 | # Check the internal var. No need to connect if we haven't already | ||
| 270 | # connected by this point | ||
| 271 | try: | ||
| 272 | self._redis_server.connection_pool.disconnect() | ||
| 273 | except Exception: | ||
| 274 | pass | ||
| 245 | super(Scheduler, self).on_disconnect(msgid, message) | 275 | super(Scheduler, self).on_disconnect(msgid, message) |
| 246 | 276 | ||
| 247 | def on_kbye(self, msgid, msg): | 277 | def on_kbye(self, msgid, msg): |
| 278 | """Process router going offline""" | ||
| 248 | if not self.is_heartbeat_enabled: | 279 | if not self.is_heartbeat_enabled: |
| 249 | self.reset() | 280 | self.reset() |
| 250 | 281 | ||
| 251 | def on_unschedule(self, msgid, message): | 282 | def on_unschedule(self, msgid, message): |
| 252 | """ | 283 | """Unschedule an existing schedule job, if it exists.""" |
| 253 | Unschedule an existing schedule job, if it exists | 284 | logger.debug("Received new UNSCHEDULE request: {}".format(message)) |
| 254 | """ | ||
| 255 | logger.info("Received new UNSCHEDULE request: {}".format(message)) | ||
| 256 | 285 | ||
| 257 | schedule_hash = self.schedule_hash(message) | 286 | schedule_hash = self.schedule_hash(message) |
| 258 | # TODO: Notify router whether or not this succeeds | 287 | # TODO: Notify router whether or not this succeeds |
| @@ -263,11 +292,10 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 263 | Cancels a job if it exists | 292 | Cancels a job if it exists |
| 264 | 293 | ||
| 265 | Args: | 294 | Args: |
| 266 | schedule_hash (str): The schedule's unique hash. See | 295 | schedule_hash (str): The schedule's unique hash. |
| 267 | :meth:`Scheduler.schedule_hash` | 296 | See :meth:`Scheduler.schedule_hash` |
| 268 | """ | 297 | """ |
| 269 | if schedule_hash in self.interval_jobs: | 298 | if schedule_hash in self.interval_jobs: |
| 270 | |||
| 271 | # If the hash wasn't found in either `cron_jobs` or `interval_jobs` | 299 | # If the hash wasn't found in either `cron_jobs` or `interval_jobs` |
| 272 | # then it's safe to assume it's already deleted. | 300 | # then it's safe to assume it's already deleted. |
| 273 | try: | 301 | try: |
| @@ -282,18 +310,18 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 282 | # Double check the redis server even if we didn't find the hash | 310 | # Double check the redis server even if we didn't find the hash |
| 283 | # in memory | 311 | # in memory |
| 284 | try: | 312 | try: |
| 285 | if (self.redis_server.get(schedule_hash)): | 313 | if self.redis_server: |
| 286 | self.redis_server.delete(schedule_hash) | ||
| 287 | self.redis_server.lrem('interval_jobs', 0, schedule_hash) | 314 | self.redis_server.lrem('interval_jobs', 0, schedule_hash) |
| 288 | self.redis_server.save() | 315 | except Exception as e: |
| 289 | except redis.ConnectionError as e: | 316 | logger.warning(str(e), exc_info=True) |
| 290 | logger.warning('Could not contact redis server: {}'.format(e)) | 317 | try: |
| 318 | if self.redis_server and self.redis_server.get(schedule_hash): | ||
| 319 | self.redis_server.delete(schedule_hash) | ||
| 291 | except Exception as e: | 320 | except Exception as e: |
| 292 | logger.warning(str(e), exc_info=True) | 321 | logger.warning(str(e), exc_info=True) |
| 293 | 322 | ||
| 294 | def load_job_from_redis(self, message): | 323 | def load_job_from_redis(self, message): |
| 295 | """ | 324 | """Parses and loads a message from redis as a scheduler job.""" |
| 296 | """ | ||
| 297 | from .utils.timeutils import IntervalIter | 325 | from .utils.timeutils import IntervalIter |
| 298 | 326 | ||
| 299 | queue = message[0].encode('utf-8') | 327 | queue = message[0].encode('utf-8') |
| @@ -329,8 +357,7 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 329 | self.cron_jobs[schedule_hash] = [c_next, message[3], c, queue] | 357 | self.cron_jobs[schedule_hash] = [c_next, message[3], c, queue] |
| 330 | 358 | ||
| 331 | def on_schedule(self, msgid, message): | 359 | def on_schedule(self, msgid, message): |
| 332 | """ | 360 | """Create a new scheduled job.""" |
| 333 | """ | ||
| 334 | logger.info("Received new SCHEDULE request: {}".format(message)) | 361 | logger.info("Received new SCHEDULE request: {}".format(message)) |
| 335 | 362 | ||
| 336 | queue = message[0] | 363 | queue = message[0] |
| @@ -380,19 +407,16 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 380 | self.interval_jobs.pop(schedule_hash) | 407 | self.interval_jobs.pop(schedule_hash) |
| 381 | 408 | ||
| 382 | # Persist the scheduled job | 409 | # Persist the scheduled job |
| 383 | try: | 410 | if self.redis_server: |
| 384 | if schedule_hash not in self.redis_server.lrange( | 411 | try: |
| 385 | 'interval_jobs', 0, -1): | 412 | if schedule_hash not in self.redis_server.lrange( |
| 386 | self.redis_server.lpush('interval_jobs', schedule_hash) | 413 | 'interval_jobs', 0, -1): |
| 387 | self.redis_server.set(schedule_hash, serialize(message)) | 414 | self.redis_server.lpush('interval_jobs', schedule_hash) |
| 388 | self.redis_server.save() | 415 | self.redis_server.set(schedule_hash, serialize(message)) |
| 389 | logger.debug('Saved job {} with hash {} to redis'.format( | 416 | logger.debug('Saved job {} with hash {} to redis'.format( |
| 390 | message, schedule_hash)) | 417 | message, schedule_hash)) |
| 391 | except redis.ConnectionError: | 418 | except Exception as e: |
| 392 | logger.warning('Could not contact redis server. Unable to ' | 419 | logger.warning(str(e)) |
| 393 | 'guarantee persistence.') | ||
| 394 | except Exception as e: | ||
| 395 | logger.warning(str(e)) | ||
| 396 | 420 | ||
| 397 | # Send a request in haste mode, decrement run_count if needed | 421 | # Send a request in haste mode, decrement run_count if needed |
| 398 | if 'nohaste' not in headers: | 422 | if 'nohaste' not in headers: |
| @@ -410,21 +434,17 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 410 | return run_count | 434 | return run_count |
| 411 | 435 | ||
| 412 | def on_heartbeat(self, msgid, message): | 436 | def on_heartbeat(self, msgid, message): |
| 413 | """ | 437 | """Noop command. The logic for heart beating is in the event loop.""" |
| 414 | Noop command. The logic for heartbeating is in the event loop. | ||
| 415 | """ | ||
| 416 | 438 | ||
| 417 | @classmethod | 439 | @classmethod |
| 418 | def schedule_hash(cls, message): | 440 | def schedule_hash(cls, message): |
| 419 | """ | 441 | """Create a unique identifier to store and reference a message later. |
| 420 | Create a unique identifier for this message for storing | ||
| 421 | and referencing later | ||
| 422 | 442 | ||
| 423 | Args: | 443 | Args: |
| 424 | message (str): The serialized message passed to the scheduler | 444 | message (str): The serialized message passed to the scheduler |
| 425 | 445 | ||
| 426 | Returns: | 446 | Returns: |
| 427 | int: unique hash for the job | 447 | str: unique hash for the job |
| 428 | """ | 448 | """ |
| 429 | 449 | ||
| 430 | # Get the job portion of the message | 450 | # Get the job portion of the message |
| @@ -452,18 +472,17 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 452 | """ | 472 | """ |
| 453 | setup_logger("eventmq") | 473 | setup_logger("eventmq") |
| 454 | import_settings() | 474 | import_settings() |
| 455 | self.__init__() | 475 | import_settings('scheduler') |
| 456 | self.start(addr=conf.SCHEDULER_ADDR) | ||
| 457 | |||
| 458 | 476 | ||
| 459 | # Entry point for pip console scripts | 477 | self.load_jobs() |
| 460 | def scheduler_main(): | 478 | self.start(addr=conf.SCHEDULER_ADDR) |
| 461 | s = Scheduler() | ||
| 462 | s.scheduler_main() | ||
| 463 | 479 | ||
| 464 | 480 | ||
| 465 | def test_job(*args, **kwargs): | 481 | def test_job(*args, **kwargs): |
| 466 | """ | 482 | """ |
| 467 | Simple test job for use with the scheduler | 483 | Simple test job for use with the scheduler |
| 468 | """ | 484 | """ |
| 485 | from pprint import pprint | ||
| 469 | print("hello!") # noqa | 486 | print("hello!") # noqa |
| 487 | pprint(args) # noqa | ||
| 488 | pprint(kwargs) # noqa | ||
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py index 03da18a..2937085 100644 --- a/eventmq/tests/test_utils.py +++ b/eventmq/tests/test_utils.py | |||
| @@ -12,52 +12,62 @@ | |||
| 12 | # | 12 | # |
| 13 | # You should have received a copy of the GNU Lesser General Public License | 13 | # You should have received a copy of the GNU Lesser General Public License |
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. |
| 15 | from configparser import ConfigParser | ||
| 15 | from imp import reload | 16 | from imp import reload |
| 16 | import io | 17 | import io |
| 18 | import os | ||
| 17 | import random | 19 | import random |
| 18 | import sys | 20 | import sys |
| 19 | import unittest | 21 | import unittest |
| 20 | 22 | ||
| 21 | import mock | 23 | import mock |
| 22 | 24 | ||
| 25 | from .. import conf | ||
| 23 | from .. import constants | 26 | from .. import constants |
| 24 | from .. import exceptions | 27 | from .. import exceptions |
| 25 | from ..utils import classes, messages, settings | 28 | from ..utils import classes, messages, settings |
| 26 | 29 | ||
| 27 | 30 | ||
| 28 | class SettingsTestCase(unittest.TestCase): | 31 | class SettingsTestCase(unittest.TestCase): |
| 29 | settings_ini = "\n".join( | 32 | settings_ini = '\n'.join( |
| 30 | ("[global]", | 33 | ('[global]', |
| 31 | "super_debug=TRuE", | 34 | 'super_debug=TRuE', |
| 32 | "frontend_addr=tcp://0.0.0.0:47291", | 35 | 'frontend_addr=tcp://0.0.0.0:47291', |
| 33 | "", | 36 | '', |
| 34 | "[jobmanager]", | 37 | '[jobmanager]', |
| 35 | "super_debug=FalSe", | 38 | 'super_debug=FalSe', |
| 36 | 'queues=[[50,"google"], [40,"pushes"], [10,"default"]]', | 39 | 'queues=[[50,"google"], [40,"pushes"], [10,"default"]]', |
| 37 | "worker_addr=tcp://160.254.23.88:47290", | 40 | 'worker_addr=tcp://160.254.23.88:47290', |
| 38 | "concurrent_jobs=9283",)) | 41 | 'concurrent_jobs=9283', |
| 42 | '', | ||
| 43 | '[scheduler]', | ||
| 44 | 'redis_client_class_kwargs={"test_kwarg": true}', | ||
| 45 | '', | ||
| 46 | '[section_with_bad_list]', | ||
| 47 | 'queues=[[10,asdf],]', | ||
| 48 | '', | ||
| 49 | '[section_with_bad_dict]', | ||
| 50 | 'redis_client_class_kwargs={asdf, 39}')) | ||
| 51 | |||
| 52 | def setUp(self): | ||
| 53 | self._config = ConfigParser() | ||
| 54 | |||
| 55 | if sys.version_info[0] == 3: | ||
| 56 | self._config.read_string(self.settings_ini) | ||
| 57 | else: | ||
| 58 | self._config.readfp(io.BytesIO(self.settings_ini)) | ||
| 39 | 59 | ||
| 40 | @mock.patch('eventmq.utils.settings.os.path.exists') | ||
| 41 | def test_import_settings_default(self, pathexists_mock): | ||
| 42 | from configparser import ConfigParser | ||
| 43 | from .. import conf | ||
| 44 | # sometimes the tests step on each other with this module. reloading | 60 | # sometimes the tests step on each other with this module. reloading |
| 45 | # ensures fresh test data | 61 | # ensures fresh test data |
| 46 | reload(conf) | 62 | reload(conf) |
| 47 | pathexists_mock.return_value = True | ||
| 48 | 63 | ||
| 49 | _config = ConfigParser() | 64 | @mock.patch('eventmq.utils.settings.os.path.exists') |
| 50 | 65 | def test_import_settings_default(self, pathexists_mock): | |
| 51 | if sys.version_info[0] == 3: | 66 | pathexists_mock.return_value = True |
| 52 | _config.read_string(self.settings_ini) | ||
| 53 | else: | ||
| 54 | _config.readfp(io.BytesIO(self.settings_ini)) | ||
| 55 | 67 | ||
| 56 | # Global section | ||
| 57 | # -------------- | ||
| 58 | with mock.patch('eventmq.utils.settings.ConfigParser', | 68 | with mock.patch('eventmq.utils.settings.ConfigParser', |
| 59 | return_value=_config): | 69 | return_value=self._config): |
| 60 | with mock.patch.object(_config, 'read'): | 70 | with mock.patch.object(self._config, 'read'): |
| 61 | settings.import_settings() | 71 | settings.import_settings() |
| 62 | 72 | ||
| 63 | # Changed. Default is false | 73 | # Changed. Default is false |
| @@ -75,25 +85,19 @@ class SettingsTestCase(unittest.TestCase): | |||
| 75 | # Default is (10, 'default') | 85 | # Default is (10, 'default') |
| 76 | self.assertEqual(conf.QUEUES, [(10, conf.DEFAULT_QUEUE_NAME), ]) | 86 | self.assertEqual(conf.QUEUES, [(10, conf.DEFAULT_QUEUE_NAME), ]) |
| 77 | 87 | ||
| 78 | # Job Manager Section | 88 | @mock.patch('eventmq.utils.settings.os.path.exists') |
| 79 | # ------------------- | 89 | def test_import_settings_jobmanager(self, pathexists_mock): |
| 80 | from configparser import ConfigParser | 90 | pathexists_mock.return_value = True |
| 81 | _config = ConfigParser() | ||
| 82 | if sys.version_info[0] == 3: | ||
| 83 | _config.read_string(self.settings_ini) | ||
| 84 | else: | ||
| 85 | _config.readfp(io.BytesIO(self.settings_ini)) | ||
| 86 | 91 | ||
| 87 | # Global section | ||
| 88 | # -------------- | ||
| 89 | with mock.patch('eventmq.utils.settings.ConfigParser', | 92 | with mock.patch('eventmq.utils.settings.ConfigParser', |
| 90 | return_value=_config): | 93 | return_value=self._config): |
| 91 | with mock.patch.object(ConfigParser, 'read'): | 94 | with mock.patch.object(self._config, 'read'): |
| 95 | settings.import_settings() | ||
| 92 | settings.import_settings('jobmanager') | 96 | settings.import_settings('jobmanager') |
| 93 | 97 | ||
| 94 | # Changed | 98 | # Changed from True (in global) to False |
| 95 | self.assertFalse(conf.SUPER_DEBUG) | 99 | self.assertFalse(conf.SUPER_DEBUG) |
| 96 | # Changed | 100 | # Override default value |
| 97 | self.assertEqual(conf.CONCURRENT_JOBS, 9283) | 101 | self.assertEqual(conf.CONCURRENT_JOBS, 9283) |
| 98 | 102 | ||
| 99 | # Changed | 103 | # Changed |
| @@ -102,21 +106,14 @@ class SettingsTestCase(unittest.TestCase): | |||
| 102 | 106 | ||
| 103 | self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') | 107 | self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') |
| 104 | 108 | ||
| 105 | # Invalid section | 109 | @mock.patch('eventmq.utils.settings.os.path.exists') |
| 106 | # --------------- | 110 | def test_load_invalid_section_uses_defaults(self, pathexists_mock): |
| 107 | # This shouldn't fail, and nothing should change | 111 | pathexists_mock.return_value = True |
| 108 | _config = ConfigParser() | ||
| 109 | |||
| 110 | if sys.version_info[0] == 3: | ||
| 111 | _config.read_string(self.settings_ini) | ||
| 112 | else: | ||
| 113 | _config.readfp(io.BytesIO(self.settings_ini)) | ||
| 114 | 112 | ||
| 115 | # Global section | ||
| 116 | # -------------- | ||
| 117 | with mock.patch('eventmq.utils.settings.ConfigParser', | 113 | with mock.patch('eventmq.utils.settings.ConfigParser', |
| 118 | return_value=_config): | 114 | return_value=self._config): |
| 119 | with mock.patch.object(ConfigParser, 'read'): | 115 | with mock.patch.object(self._config, 'read'): |
| 116 | settings.import_settings('jobmanager') | ||
| 120 | settings.import_settings('nonexistent_section') | 117 | settings.import_settings('nonexistent_section') |
| 121 | 118 | ||
| 122 | self.assertEqual(conf.CONCURRENT_JOBS, 9283) | 119 | self.assertEqual(conf.CONCURRENT_JOBS, 9283) |
| @@ -124,6 +121,58 @@ class SettingsTestCase(unittest.TestCase): | |||
| 124 | [(50, 'google'), (40, 'pushes'), (10, 'default')]) | 121 | [(50, 'google'), (40, 'pushes'), (10, 'default')]) |
| 125 | self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') | 122 | self.assertEqual(conf.WORKER_ADDR, 'tcp://160.254.23.88:47290') |
| 126 | 123 | ||
| 124 | @mock.patch('eventmq.utils.settings.os.path.exists') | ||
| 125 | def test_dictionary(self, pathexists_mock): | ||
| 126 | pathexists_mock.return_value = True | ||
| 127 | |||
| 128 | with mock.patch('eventmq.utils.settings.ConfigParser', | ||
| 129 | return_value=self._config): | ||
| 130 | with mock.patch.object(self._config, 'read'): | ||
| 131 | settings.import_settings('scheduler') | ||
| 132 | |||
| 133 | # Dictionary should be dictionary | ||
| 134 | self.assertTrue(isinstance(conf.REDIS_CLIENT_CLASS_KWARGS, dict)) | ||
| 135 | # Dictionary should be dictionary | ||
| 136 | self.assertTrue(conf.REDIS_CLIENT_CLASS_KWARGS['test_kwarg']) | ||
| 137 | |||
| 138 | @mock.patch('eventmq.utils.settings.os.path.exists') | ||
| 139 | def test_favor_environment_variables(self, pathexists_mock): | ||
| 140 | pathexists_mock.return_value = True | ||
| 141 | |||
| 142 | # frontend_addr has been defined in global, but should be the following | ||
| 143 | # value | ||
| 144 | value = 'from environment variable' | ||
| 145 | os.environ['EVENTMQ_FRONTEND_ADDR'] = value | ||
| 146 | try: | ||
| 147 | with mock.patch('eventmq.utils.settings.ConfigParser', | ||
| 148 | return_value=self._config): | ||
| 149 | with mock.patch.object(self._config, 'read'): | ||
| 150 | settings.import_settings() | ||
| 151 | finally: | ||
| 152 | del os.environ['EVENTMQ_FRONTEND_ADDR'] | ||
| 153 | |||
| 154 | self.assertEqual(conf.FRONTEND_ADDR, value) | ||
| 155 | |||
| 156 | @mock.patch('eventmq.utils.settings.os.path.exists') | ||
| 157 | def test_valueerror_on_invalid_json_list(self, pathexists_mock): | ||
| 158 | pathexists_mock.return_value = True | ||
| 159 | |||
| 160 | with mock.patch('eventmq.utils.settings.ConfigParser', | ||
| 161 | return_value=self._config): | ||
| 162 | with mock.patch.object(self._config, 'read'): | ||
| 163 | with self.assertRaises(ValueError): | ||
| 164 | settings.import_settings('section_with_bad_list') | ||
| 165 | |||
| 166 | @mock.patch('eventmq.utils.settings.os.path.exists') | ||
| 167 | def test_valueerror_on_invalid_json_dict(self, pathexists_mock): | ||
| 168 | pathexists_mock.return_value = True | ||
| 169 | |||
| 170 | with mock.patch('eventmq.utils.settings.ConfigParser', | ||
| 171 | return_value=self._config): | ||
| 172 | with mock.patch.object(self._config, 'read'): | ||
| 173 | with self.assertRaises(ValueError): | ||
| 174 | settings.import_settings('section_with_bad_dict') | ||
| 175 | |||
| 127 | 176 | ||
| 128 | class EMQPServiceTestCase(unittest.TestCase): | 177 | class EMQPServiceTestCase(unittest.TestCase): |
| 129 | 178 | ||
diff --git a/eventmq/utils/settings.py b/eventmq/utils/settings.py index 3999e39..7a91858 100644 --- a/eventmq/utils/settings.py +++ b/eventmq/utils/settings.py | |||
| @@ -79,7 +79,7 @@ def import_settings(section='global'): | |||
| 79 | value = t(json.loads(value)) | 79 | value = t(json.loads(value)) |
| 80 | except ValueError: | 80 | except ValueError: |
| 81 | raise ValueError( | 81 | raise ValueError( |
| 82 | "Invalid JSON syntax for {} setting".format(name)) | 82 | 'Invalid JSON syntax for {} setting'.format(name)) |
| 83 | # json.loads coverts all arrays to lists, but if the first | 83 | # json.loads coverts all arrays to lists, but if the first |
| 84 | # element in the default is a tuple (like in QUEUES) then | 84 | # element in the default is a tuple (like in QUEUES) then |
| 85 | # convert those elements, otherwise whatever it's type is | 85 | # convert those elements, otherwise whatever it's type is |
| @@ -91,8 +91,12 @@ def import_settings(section='global'): | |||
| 91 | elif isinstance(default_value, bool): | 91 | elif isinstance(default_value, bool): |
| 92 | setattr(conf, name, | 92 | setattr(conf, name, |
| 93 | True if 't' in value.lower() else False) | 93 | True if 't' in value.lower() else False) |
| 94 | elif t == dict: | ||
| 95 | try: | ||
| 96 | value = json.loads(value) | ||
| 97 | except ValueError: | ||
| 98 | raise ValueError( | ||
| 99 | 'Invalid JSON syntax for {} setting'.format(name)) | ||
| 100 | setattr(conf, name, value) | ||
| 94 | else: | 101 | else: |
| 95 | setattr(conf, name, t(value)) | 102 | setattr(conf, name, t(value)) |
| 96 | |||
| 97 | logger.debug("Setting conf.{} to {}".format( | ||
| 98 | name, getattr(conf, name))) | ||
| @@ -22,7 +22,6 @@ setup( | |||
| 22 | 'six==1.10.0', | 22 | 'six==1.10.0', |
| 23 | 'monotonic==0.4', | 23 | 'monotonic==0.4', |
| 24 | 'croniter==0.3.10', | 24 | 'croniter==0.3.10', |
| 25 | 'redis==2.10.3', | ||
| 26 | 'future==0.15.2', | 25 | 'future==0.15.2', |
| 27 | 'psutil==5.0.0', | 26 | 'psutil==5.0.0', |
| 28 | 'python-dateutil>=2.1,<3.0.0'], | 27 | 'python-dateutil>=2.1,<3.0.0'], |