| -rw-r--r-- | docs/protocol.rst | 53 | ||||
| -rw-r--r-- | eventmq/client/messages.py | 91 | ||||
| -rw-r--r-- | eventmq/constants.py | 6 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 179 | ||||
| -rw-r--r-- | eventmq/router.py | 217 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 228 | ||||
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 148 | ||||
| -rw-r--r-- | eventmq/tests/test_router.py | 42 | ||||
| -rw-r--r-- | eventmq/tests/test_utils.py | 3 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 209 | ||||
| -rw-r--r-- | eventmq/utils/timeutils.py | 42 | ||||
| -rw-r--r-- | requirements.txt | 4 |
12 files changed, 990 insertions, 232 deletions
diff --git a/docs/protocol.rst b/docs/protocol.rst index e468b06..f7089ad 100644 --- a/docs/protocol.rst +++ b/docs/protocol.rst | |||
| @@ -86,10 +86,53 @@ FRAME Value Description | |||
| 86 | 2 PUBLISH command | 86 | 2 PUBLISH command |
| 87 | 3 _MSGID_ A unique id for the msg | 87 | 3 _MSGID_ A unique id for the msg |
| 88 | 4 _TOPIC_NAME_ the name of the queue the worker belongs to | 88 | 4 _TOPIC_NAME_ the name of the queue the worker belongs to |
| 89 | 5 _HEADERS_ dictionary of headers. can be an empty set | 89 | 5 _HEADERS_ csv list of headers |
| 90 | 6 _MSG_ The message to send | ||
| 91 | ====== ============== =========== | ||
| 92 | |||
| 93 | A **SCHEDULE** command consists of a 7-frame multipart message, formatted as follows. | ||
| 94 | |||
| 95 | ====== ============== =========== | ||
| 96 | FRAME Value Description | ||
| 97 | ====== ============== =========== | ||
| 98 | 0 _EMPTY_ leave empty | ||
| 99 | 1 eMQP/1.0 Protocol version | ||
| 100 | 2 SCHEDULE command | ||
| 101 | 3 _MSGID_ A unique id for the msg | ||
| 102 | 4 _TOPIC_NAME_ name of queue that the job should run in | ||
| 103 | 5 _HEADERS_ csv list of headers for this message | ||
| 104 | 6 _MSG_ The message to send | ||
| 105 | ====== ============== =========== | ||
| 106 | |||
| 107 | An **UNSCHEDULE** command consists of a 7-frame multipart message, formatted as follows. | ||
| 108 | |||
| 109 | ====== ============== =========== | ||
| 110 | FRAME Value Description | ||
| 111 | ====== ============== =========== | ||
| 112 | 0 _EMPTY_ leave empty | ||
| 113 | 1 eMQP/1.0 Protocol version | ||
| 114 | 2 UNSCHEDULE command | ||
| 115 | 3 _MSGID_ A unique id for the msg | ||
| 116 | 4 _TOPIC_NAME_ ignored for this command; broadcast to all queues | ||
| 117 | 5 _HEADERS_ csv list of headers for this message | ||
| 90 | 6 _MSG_ The message to send | 118 | 6 _MSG_ The message to send |
| 91 | ====== ============== =========== | 119 | ====== ============== =========== |
| 92 | 120 | ||
| 121 | eMQP / Scheduler | ||
| 122 | ---------------- | ||
| 123 | An **INFORM** command consists of a 6-frame multipart message, formatted as follows. | ||
| 124 | |||
| 125 | ====== ============== =========== | ||
| 126 | FRAME Value Description | ||
| 127 | ====== ============== =========== | ||
| 128 | 0 _EMPTY_ leave empty | ||
| 129 | 1 eMQP/1.0 Protocol version | ||
| 130 | 2 INFORM command | ||
| 131 | 3 _MSGID_ A unique id for the msg | ||
| 132 | 4 _QUEUE_NAME_ csv separated names of the queues this peer belongs to | ||
| 133 | 5 scheduler type of peer connecting | ||
| 134 | ====== ============== =========== | ||
| 135 | |||
| 93 | eMQP / Worker | 136 | eMQP / Worker |
| 94 | ------------- | 137 | ------------- |
| 95 | An **INFORM** command consists of a 5-frame multipart message, formatted as follows. | 138 | An **INFORM** command consists of a 6-frame multipart message, formatted as follows.
| @@ -102,6 +145,7 @@ FRAME Value Description | |||
| 102 | 2 INFORM command | 145 | 2 INFORM command |
| 103 | 3 _MSGID_ A unique id for the msg | 146 | 3 _MSGID_ A unique id for the msg |
| 104 | 4 _QUEUE_NAME_ csv separated names of the queues the worker belongs to | 147 | 4 _QUEUE_NAME_ csv separated names of the queues the worker belongs to
| 148 | 5 worker type of peer connecting | ||
| 105 | ====== ============== =========== | 149 | ====== ============== =========== |
| 106 | 150 | ||
| 107 | A **READY** frame consists of a 4-frame multipart message, formatted as follows. | 151 | A **READY** frame consists of a 4-frame multipart message, formatted as follows. |
| @@ -154,11 +198,14 @@ Heartbeating | |||
| 154 | ------------ | 198 | ------------ |
| 155 | * HEARTBEAT commands are valid at any time after an INFORM command | 199 | * HEARTBEAT commands are valid at any time after an INFORM command |
| 156 | * Any command except DISCONNECT acts as a heartbeat. Peers SHOULD NOT send HEARTBEAT commands while sending other commands. | 200 | * Any command except DISCONNECT acts as a heartbeat. Peers SHOULD NOT send HEARTBEAT commands while sending other commands.
| 157 | * Both worker and broker MUST send heartbeats at regular and agreed-upon intervals. | 201 | * Worker and broker MUST send heartbeats at regular and agreed-upon intervals. |
| 202 | * Scheduler and broker MUST send heartbeats at regular and agreed-upon intervals. | ||
| 158 | * If the worker detects that the broker disconnected it SHOULD restart the conversation. | 203 | * If the worker detects that the broker disconnected it SHOULD restart the conversation. |
| 159 | * If the broker detects that a worker has disconnected it should stop sending it a message of any type. | 204 | * If the broker detects that a worker has disconnected it should stop sending it a message of any type. |
| 205 | * If the scheduler detects that the broker has disconnected it SHOULD restart the conversation. | ||
| 206 | * If the broker detects that a scheduler has disconnected it should ??????????. | ||
| 160 | 207 | ||
| 161 | Request Headers | 208 | REQUEST Headers |
| 162 | --------------- | 209 | --------------- |
| 163 | Headers MUST be 0 to many comma separated values inserted into the header field. If no headers are required, an empty string MUST be sent in the header frame. | 210 | Headers MUST be 0 to many comma separated values inserted into the header field. If no headers are required, an empty string MUST be sent in the header frame.
| 164 | 211 | ||
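Taken together with the frame tables above, a SCHEDULE message could be assembled by hand with pyzmq roughly as follows. This is a minimal sketch based strictly on the documented layout: the msgid generation, broker address, and JSON payload are illustrative, and real clients should use the helpers in eventmq.client.messages rather than building frames directly::

    import json
    import uuid

    import zmq

    def build_schedule_frames(queue_name, headers, message):
        # Frames 0-6 exactly as listed in the SCHEDULE table above.
        return [
            b'',                                  # 0: _EMPTY_
            b'eMQP/1.0',                          # 1: protocol version
            b'SCHEDULE',                          # 2: command
            str(uuid.uuid4()).encode('utf-8'),    # 3: _MSGID_ (illustrative)
            queue_name.encode('utf-8'),           # 4: _TOPIC_NAME_
            ','.join(headers).encode('utf-8'),    # 5: csv list of headers
            json.dumps(message).encode('utf-8'),  # 6: _MSG_
        ]

    socket = zmq.Context.instance().socket(zmq.DEALER)
    socket.connect('tcp://127.0.0.1:47291')       # assumed broker address
    socket.send_multipart(build_schedule_frames(
        'default', ['guarantee'],
        ['run', {'path': 'myapp.tasks', 'callable': 'do_work'}]))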
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py index 66af8a9..03732f9 100644 --- a/eventmq/client/messages.py +++ b/eventmq/client/messages.py | |||
| @@ -26,6 +26,67 @@ from ..utils.messages import send_emqp_message | |||
| 26 | logger = logging.getLogger(__name__) | 26 | logger = logging.getLogger(__name__) |
| 27 | 27 | ||
| 28 | 28 | ||
| 29 | def schedule(socket, func, interval_secs, args=(), kwargs=None, class_args=(), | ||
| 30 | class_kwargs=None, headers=('guarantee',), | ||
| 31 | queue=conf.DEFAULT_QUEUE_NAME, unschedule=False): | ||
| 32 | """ | ||
| 33 | Execute a task on a defined interval. | ||
| 34 | |||
| 35 | Args: | ||
| 36 | socket (socket): eventmq socket to use for sending the message | ||
| 37 | func (callable): the callable to be scheduled on a worker | ||
| 38 | interval_secs (int): number of seconds to wait between executions | ||
| 39 | args (list): list of *args to pass to the callable | ||
| 40 | kwargs (dict): dict of **kwargs to pass to the callable | ||
| 41 | class_args (list): list of *args to pass to the class (if applicable) | ||
| 42 | class_kwargs (dict): dict of **kwargs to pass to the class (if | ||
| 43 | applicable) | ||
| 44 | headers (list): list of strings denoting enabled headers. Default: | ||
| 45 | guarantee is enabled to ensure the scheduler schedules the job. | ||
| 46 | queue (str): name of the queue to use when executing the job. The | ||
| 47 | default value is the default queue. | ||
| 48 | """ | ||
| 49 | if not class_kwargs: | ||
| 50 | class_kwargs = {} | ||
| 51 | if not kwargs: | ||
| 52 | kwargs = {} | ||
| 53 | |||
| 54 | if callable(func): | ||
| 55 | path, callable_name = build_module_path(func) | ||
| 56 | else: | ||
| 57 | logger.error('Encountered non-callable func: {}'.format(func)) | ||
| 58 | return False | ||
| 59 | |||
| 60 | if not callable_name: | ||
| 61 | logger.error('Encountered callable with no name in {}'.format( | ||
| 62 | func.__module__ | ||
| 63 | )) | ||
| 64 | return False | ||
| 65 | |||
| 66 | if not path: | ||
| 67 | logger.error('Encountered callable with no __module__ path {}'.format( | ||
| 68 | func.__name__ | ||
| 69 | )) | ||
| 70 | return False | ||
| 71 | |||
| 72 | # TODO: convert all the times to seconds for the clock | ||
| 73 | |||
| 74 | # TODO: send the schedule request | ||
| 75 | |||
| 76 | msg = ['run', { | ||
| 77 | 'callable': callable_name, | ||
| 78 | 'path': path, | ||
| 79 | 'args': args, | ||
| 80 | 'kwargs': kwargs, | ||
| 81 | 'class_args': class_args, | ||
| 82 | 'class_kwargs': class_kwargs, | ||
| 83 | }] | ||
| 84 | |||
| 85 | send_schedule_request(socket, interval_secs=interval_secs, | ||
| 86 | message=msg, headers=headers, queue=queue, | ||
| 87 | unschedule=unschedule) | ||
| 88 | |||
| 89 | |||
| 29 | def defer_job(socket, func, args=(), kwargs=None, class_args=(), | 90 | def defer_job(socket, func, args=(), kwargs=None, class_args=(), |
| 30 | class_kwargs=None, reply_requested=False, guarantee=False, | 91 | class_kwargs=None, reply_requested=False, guarantee=False, |
| 31 | retry_count=0, queue=conf.DEFAULT_QUEUE_NAME): | 92 | retry_count=0, queue=conf.DEFAULT_QUEUE_NAME): |
| @@ -94,7 +155,7 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(), | |||
| 94 | send_request(socket, msg, reply_requested=reply_requested, | 155 | send_request(socket, msg, reply_requested=reply_requested, |
| 95 | guarantee=guarantee, retry_count=retry_count, queue=queue) | 156 | guarantee=guarantee, retry_count=retry_count, queue=queue) |
| 96 | 157 | ||
| 97 | return True | 158 | return True # The message has successfully been queued for delivery |
| 98 | 159 | ||
| 99 | 160 | ||
| 100 | def build_module_path(func): | 161 | def build_module_path(func): |
| @@ -196,6 +257,34 @@ def send_request(socket, message, reply_requested=False, guarantee=False, | |||
| 196 | ) | 257 | ) |
| 197 | 258 | ||
| 198 | 259 | ||
| 260 | def send_schedule_request(socket, interval_secs, message, headers=(), | ||
| 261 | queue=None, unschedule=False): | ||
| 262 | """ | ||
| 263 | Send a SCHEDULE or UNSCHEDULE command. | ||
| 264 | |||
| 265 | Queues a message requesting that something happens on an | ||
| 266 | interval for the scheduler. | ||
| 267 | |||
| 268 | Args: | ||
| 269 | socket (socket): eventmq socket to use for sending the message | ||
| 270 | interval_secs (int): seconds to wait between executions of the job | ||
| 271 | message: Message to send to the socket. | ||
| 272 | headers (list): List of headers for the message | ||
| 273 | queue (str): name of queue the job should be executed in | ||
| 274 | """ | ||
| 275 | |||
| 276 | if unschedule: | ||
| 277 | command = 'UNSCHEDULE' | ||
| 278 | else: | ||
| 279 | command = 'SCHEDULE' | ||
| 280 | |||
| 281 | send_emqp_message(socket, command, | ||
| 282 | (queue or conf.DEFAULT_QUEUE_NAME, | ||
| 283 | ','.join(headers), | ||
| 284 | str(interval_secs), | ||
| 285 | serialize(message))) | ||
| 286 | |||
| 287 | |||
| 199 | def job(block=False): # Move to decorators.py | 288 | def job(block=False): # Move to decorators.py |
| 200 | """ | 289 | """ |
| 201 | run the decorated function on a worker | 290 | run the decorated function on a worker |
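From client code, the new schedule() helper would be used roughly like the sketch below. The Sender/connect setup and the example task are assumptions (they are not part of this diff), and class_args is populated because the scheduler in this changeset keys its schedule hash on class_args[0], treated as a company id:

    from eventmq import conf
    from eventmq.client.messages import schedule
    from eventmq.sender import Sender

    def nightly_cleanup(batch_size=100):
        # The task that should run on a worker every interval.
        pass

    # Assumed connection setup: a Sender socket connected to the broker frontend.
    socket = Sender()
    socket.connect(conf.FRONTEND_ADDR)

    # Ask the scheduler to run nightly_cleanup every 15 minutes on the default queue.
    schedule(socket, nightly_cleanup, interval_secs=900,
             kwargs={'batch_size': 500},
             class_args=('acme-co',),      # read by schedule_hash as company_id
             queue=conf.DEFAULT_QUEUE_NAME)

    # The same identifying values with unschedule=True remove the entry again.
    schedule(socket, nightly_cleanup, interval_secs=900,
             class_args=('acme-co',), unschedule=True)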
diff --git a/eventmq/constants.py b/eventmq/constants.py index 621a6a5..c379ef5 100644 --- a/eventmq/constants.py +++ b/eventmq/constants.py | |||
| @@ -8,5 +8,11 @@ class STATUS(object): | |||
| 8 | stopping = 300 | 8 | stopping = 300 |
| 9 | stopped = 301 | 9 | stopped = 301 |
| 10 | 10 | ||
| 11 | |||
| 12 | class CLIENT_TYPE(object): | ||
| 13 | worker = 'worker' | ||
| 14 | scheduler = 'scheduler' | ||
| 15 | |||
| 16 | |||
| 11 | # See doc/protocol.rst | 17 | # See doc/protocol.rst |
| 12 | PROTOCOL_VERSION = 'eMQP/1.0' | 18 | PROTOCOL_VERSION = 'eMQP/1.0' |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index b52eebd..9499146 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -21,14 +21,13 @@ import json | |||
| 21 | import logging | 21 | import logging |
| 22 | import signal | 22 | import signal |
| 23 | 23 | ||
| 24 | from . import conf, constants, exceptions, utils | 24 | from . import conf |
| 25 | from .poller import Poller, POLLIN | 25 | from .poller import Poller, POLLIN |
| 26 | from .sender import Sender | 26 | from .sender import Sender |
| 27 | from .utils.classes import HeartbeatMixin | 27 | from .utils.classes import EMQPService, HeartbeatMixin |
| 28 | from .utils.settings import import_settings | 28 | from .utils.settings import import_settings |
| 29 | from .utils.devices import generate_device_name | 29 | from .utils.devices import generate_device_name |
| 30 | from .utils.messages import send_emqp_message as sendmsg | 30 | from .utils.messages import send_emqp_message as sendmsg |
| 31 | import utils.messages | ||
| 32 | from .utils.timeutils import monotonic | 31 | from .utils.timeutils import monotonic |
| 33 | from .worker import MultiprocessWorker as Worker | 32 | from .worker import MultiprocessWorker as Worker |
| 34 | from eventmq.log import setup_logger | 33 | from eventmq.log import setup_logger |
| @@ -36,13 +35,14 @@ from eventmq.log import setup_logger | |||
| 36 | logger = logging.getLogger(__name__) | 35 | logger = logging.getLogger(__name__) |
| 37 | 36 | ||
| 38 | 37 | ||
| 39 | class JobManager(HeartbeatMixin): | 38 | class JobManager(HeartbeatMixin, EMQPService): |
| 40 | """ | 39 | """ |
| 41 | The exposed portion of the worker. The job manager's main responsibility is | 40 | The exposed portion of the worker. The job manager's main responsibility is |
| 42 | to manage the resources on the server it's running on. | 41 | to manage the resources on the server it's running on.
| 43 | 42 | ||
| 44 | This job manager uses tornado's eventloop. | 43 | This job manager uses tornado's eventloop. |
| 45 | """ | 44 | """ |
| 45 | SERVICE_TYPE = 'worker' | ||
| 46 | 46 | ||
| 47 | def __init__(self, *args, **kwargs): | 47 | def __init__(self, *args, **kwargs): |
| 48 | """ | 48 | """ |
| @@ -58,7 +58,7 @@ class JobManager(HeartbeatMixin): | |||
| 58 | #: Define the name of this JobManager instance. Useful to know when | 58 | #: Define the name of this JobManager instance. Useful to know when |
| 59 | #: referring to the logs. | 59 | #: referring to the logs. |
| 60 | self.name = kwargs.pop('name', generate_device_name()) | 60 | self.name = kwargs.pop('name', generate_device_name()) |
| 61 | logger.info('Initializing JobManager %s...'.format(self.name)) | 61 | logger.info('Initializing JobManager {}...'.format(self.name)) |
| 62 | 62 | ||
| 63 | #: Number of workers that are available to have a job executed. This | 63 | #: Number of workers that are available to have a job executed. This |
| 64 | #: number changes as workers become busy with jobs | 64 | #: number changes as workers become busy with jobs |
| @@ -67,7 +67,8 @@ class JobManager(HeartbeatMixin): | |||
| 67 | #: JobManager starts out by INFORMing the router of its existence, | 67 | #: JobManager starts out by INFORMing the router of its existence,
| 68 | #: then telling the router that it is READY. The reply will be the unit | 68 | #: then telling the router that it is READY. The reply will be the unit |
| 69 | #: of work. | 69 | #: of work. |
| 70 | self.incoming = Sender() | 70 | # Despite the name, jobs are received on this socket |
| 71 | self.outgoing = Sender(name=self.name) | ||
| 71 | 72 | ||
| 72 | #: Jobs that are running should be stored in `active_jobs`. There | 73 | #: Jobs that are running should be stored in `active_jobs`. There |
| 73 | #: should always be at most `available_workers` count of active jobs. | 74 | #: should always be at most `available_workers` count of active jobs. |
| @@ -78,109 +79,39 @@ class JobManager(HeartbeatMixin): | |||
| 78 | 79 | ||
| 79 | self._setup() | 80 | self._setup() |
| 80 | 81 | ||
| 81 | def _setup(self): | 82 | def _start_event_loop(self): |
| 82 | """ | 83 | """ |
| 83 | Prepares JobManager ready to connect to a broker. Actions that must | 84 | Starts the actual eventloop. Usually called by :meth:`start` |
| 84 | also run on a reset are here. | ||
| 85 | """ | 85 | """ |
| 86 | # Look for incoming events | 86 | # Acknowledgment has come |
| 87 | self.poller.register(self.incoming, POLLIN) | 87 | # Send a READY for each available worker |
| 88 | self.awaiting_startup_ack = False | 88 | for i in range(0, self.available_workers): |
| 89 | 89 | self.send_ready() | |
| 90 | self.status = constants.STATUS.ready | ||
| 91 | 90 | ||
| 92 | def start(self, addr='tcp://127.0.0.1:47291'): | ||
| 93 | """ | ||
| 94 | Connect to `addr` and begin listening for job requests | ||
| 95 | |||
| 96 | Args: | ||
| 97 | addr (str): connection string to connect to | ||
| 98 | """ | ||
| 99 | while True: | ||
| 100 | self.status = constants.STATUS.connecting | ||
| 101 | self.incoming.connect(addr) | ||
| 102 | |||
| 103 | self.awaiting_startup_ack = True | ||
| 104 | self.send_inform() | ||
| 105 | |||
| 106 | # We don't want to accidentally start processing jobs before our | ||
| 107 | # connection has been setup completely and acknowledged. | ||
| 108 | while self.awaiting_startup_ack: | ||
| 109 | # Poller timeout is in ms so the reconnect timeout is | ||
| 110 | # multiplied by 1000 to get seconds | ||
| 111 | events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000) | ||
| 112 | |||
| 113 | if self.incoming in events: # A message from the Router! | ||
| 114 | msg = self.incoming.recv_multipart() | ||
| 115 | # TODO This will silently drop messages that aren't ACK | ||
| 116 | if msg[2] == "ACK": | ||
| 117 | # :meth:`on_ack` will set self.awaiting_startup_ack to | ||
| 118 | # False | ||
| 119 | self.process_message(msg) | ||
| 120 | |||
| 121 | # Acknowledgment has come | ||
| 122 | # Send a READY for each available worker | ||
| 123 | for i in range(0, self.available_workers): | ||
| 124 | self.send_ready() | ||
| 125 | |||
| 126 | self.status = constants.STATUS.connected | ||
| 127 | logger.info('Starting to listen for jobs') | ||
| 128 | 91 | ||
| 129 | # handle any sighups by reloading config | 92 | # handle any sighups by reloading config |
| 130 | signal.signal(signal.SIGHUP, self.sighup_handler) | 93 | signal.signal(signal.SIGHUP, self.sighup_handler) |
| 131 | 94 | ||
| 132 | self._start_event_loop() | ||
| 133 | # When we return, soemthing has gone wrong and we should try to | ||
| 134 | # reconnect | ||
| 135 | self.reset() | ||
| 136 | |||
| 137 | def reset(self): | ||
| 138 | """ | ||
| 139 | Resets the current connection by closing and reopening the socket | ||
| 140 | """ | ||
| 141 | # Unregister the old socket from the poller | ||
| 142 | self.poller.unregister(self.incoming) | ||
| 143 | |||
| 144 | # Polish up a new socket to use | ||
| 145 | self.incoming.rebuild() | ||
| 146 | |||
| 147 | # Prepare the device to connect again | ||
| 148 | self._setup() | ||
| 149 | |||
| 150 | def _start_event_loop(self): | ||
| 151 | """ | ||
| 152 | Starts the actual eventloop. Usually called by :meth:`JobManager.start` | ||
| 153 | """ | ||
| 154 | while True: | 95 | while True: |
| 96 | if self.received_disconnect: | ||
| 97 | # self.reset() | ||
| 98 | # Shut down if there are no active jobs waiting | ||
| 99 | if len(self.active_jobs) > 0: | ||
| 100 | self.prune_active_jobs() | ||
| 101 | continue | ||
| 102 | break | ||
| 103 | |||
| 155 | now = monotonic() | 104 | now = monotonic() |
| 156 | events = self.poller.poll() | 105 | events = self.poller.poll() |
| 157 | 106 | ||
| 158 | if events.get(self.incoming) == POLLIN: | 107 | if events.get(self.outgoing) == POLLIN: |
| 159 | msg = self.incoming.recv_multipart() | 108 | msg = self.outgoing.recv_multipart() |
| 160 | self.process_message(msg) | 109 | self.process_message(msg) |
| 161 | 110 | ||
| 162 | # Maintain the list of active jobs | 111 | self.prune_active_jobs() |
| 163 | for job in self.active_jobs: | ||
| 164 | if not job.is_alive(): | ||
| 165 | self.active_jobs.remove(job) | ||
| 166 | self.available_workers += 1 | ||
| 167 | self.send_ready() | ||
| 168 | 112 | ||
| 169 | # TODO: Optimization: Move the method calls into another thread so | 113 | if not self.maybe_send_heartbeat(events): |
| 170 | # they don't block the event loop | 114 | break |
| 171 | if not conf.DISABLE_HEARTBEATS: | ||
| 172 | # Send a HEARTBEAT if necessary | ||
| 173 | if now - self._meta['last_sent_heartbeat'] >= \ | ||
| 174 | conf.HEARTBEAT_INTERVAL: | ||
| 175 | self.send_heartbeat(self.incoming) | ||
| 176 | |||
| 177 | # Do something about any missed HEARTBEAT, if we have nothing | ||
| 178 | # waiting on the socket | ||
| 179 | if self.is_dead() and not events: | ||
| 180 | logger.critical( | ||
| 181 | 'The broker appears to have gone away. ' | ||
| 182 | 'Reconnecting...') | ||
| 183 | break | ||
| 184 | 115 | ||
| 185 | def on_request(self, msgid, msg): | 116 | def on_request(self, msgid, msg): |
| 186 | """ | 117 | """ |
| @@ -222,58 +153,12 @@ class JobManager(HeartbeatMixin): | |||
| 222 | 153 | ||
| 223 | self.available_workers -= 1 | 154 | self.available_workers -= 1 |
| 224 | 155 | ||
| 225 | def process_message(self, msg): | ||
| 226 | """ | ||
| 227 | Processes a message | ||
| 228 | |||
| 229 | Args: | ||
| 230 | msg: The message received from the socket to parse and process. | ||
| 231 | Processing takes form of calling an `on_COMMAND` method. | ||
| 232 | """ | ||
| 233 | # Any received message should count as a heartbeat | ||
| 234 | self._meta['last_received_heartbeat'] = monotonic() | ||
| 235 | if self._meta['heartbeat_miss_count']: | ||
| 236 | self._meta['heartbeat_miss_count'] = 0 # Reset the miss count too | ||
| 237 | |||
| 238 | try: | ||
| 239 | message = utils.messages.parse_message(msg) | ||
| 240 | except exceptions.InvalidMessageError: | ||
| 241 | logger.error('Invalid message: %s' % str(msg)) | ||
| 242 | return | ||
| 243 | |||
| 244 | command = message[0] | ||
| 245 | msgid = message[1] | ||
| 246 | message = message[2] | ||
| 247 | |||
| 248 | if hasattr(self, "on_%s" % command.lower()): | ||
| 249 | func = getattr(self, "on_%s" % command.lower()) | ||
| 250 | func(msgid, message) | ||
| 251 | else: | ||
| 252 | logger.warning('No handler for %s found (tried: %s)' % | ||
| 253 | (command, ('on_%s' % command.lower()))) | ||
| 254 | |||
| 255 | def send_ready(self): | 156 | def send_ready(self): |
| 256 | """ | 157 | """ |
| 257 | send the READY command upstream to indicate that JobManager is ready | 158 | send the READY command upstream to indicate that JobManager is ready |
| 258 | for another REQUEST message. | 159 | for another REQUEST message. |
| 259 | """ | 160 | """ |
| 260 | sendmsg(self.incoming, 'READY') | 161 | sendmsg(self.outgoing, 'READY') |
| 261 | |||
| 262 | def send_inform(self, queue=None): | ||
| 263 | """ | ||
| 264 | Send an INFORM command | ||
| 265 | """ | ||
| 266 | sendmsg(self.incoming, 'INFORM', queue or conf.DEFAULT_QUEUE_NAME) | ||
| 267 | self._meta['last_sent_heartbeat'] = monotonic() | ||
| 268 | |||
| 269 | def on_ack(self, msgid, ackd_msgid): | ||
| 270 | """ | ||
| 271 | Sets :attr:`awaiting_ack` to False | ||
| 272 | """ | ||
| 273 | # The msgid is the only frame in the message | ||
| 274 | ackd_msgid = ackd_msgid[0] | ||
| 275 | logger.info('Received ACK for router (or client) %s' % ackd_msgid) | ||
| 276 | self.awaiting_startup_ack = False | ||
| 277 | 162 | ||
| 278 | def on_heartbeat(self, msgid, message): | 163 | def on_heartbeat(self, msgid, message): |
| 279 | """ | 164 | """ |
| @@ -282,6 +167,16 @@ class JobManager(HeartbeatMixin): | |||
| 282 | HEARTBEAT | 167 | HEARTBEAT |
| 283 | """ | 168 | """ |
| 284 | 169 | ||
| 170 | def prune_active_jobs(self): | ||
| 171 | # Maintain the list of active jobs | ||
| 172 | for job in self.active_jobs: | ||
| 173 | if not job.is_alive(): | ||
| 174 | self.active_jobs.remove(job) | ||
| 175 | self.available_workers += 1 | ||
| 176 | |||
| 177 | if not self.received_disconnect: | ||
| 178 | self.send_ready() | ||
| 179 | |||
| 285 | def sighup_handler(self, signum, frame): | 180 | def sighup_handler(self, signum, frame): |
| 286 | logger.info('Caught signal %s' % signum) | 181 | logger.info('Caught signal %s' % signum) |
| 287 | self.incoming.unbind(conf.FRONTEND_ADDR) | 182 | self.incoming.unbind(conf.FRONTEND_ADDR) |
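The connection bootstrap removed from JobManager.start() above now presumably lives in the shared EMQPService base class; eventmq/utils/classes.py changes are counted in the diffstat but not shown in this excerpt. A rough sketch of that handshake, reconstructed from the removed JobManager code (the method and attribute names mirror the old code and may not match EMQPService exactly):

    # Sketch only; the real logic is in eventmq.utils.classes.EMQPService.
    def start(self, addr='tcp://127.0.0.1:47291'):
        while True:
            self.status = constants.STATUS.connecting
            self.outgoing.connect(addr)

            # Announce ourselves. The INFORM now carries the queue list and
            # the SERVICE_TYPE ('worker' or 'scheduler') so the router knows
            # which side of the broker this peer belongs on.
            self.awaiting_startup_ack = True
            self.send_inform()

            # Don't process jobs until the broker has ACKed the INFORM.
            while self.awaiting_startup_ack:
                events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000)
                if self.outgoing in events:
                    msg = self.outgoing.recv_multipart()
                    if msg[2] == 'ACK':
                        self.process_message(msg)  # on_ack clears the flag

            self.status = constants.STATUS.connected
            self._start_event_loop()  # returns when the broker goes away
            self.reset()              # rebuild the socket and reconnect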
diff --git a/eventmq/router.py b/eventmq/router.py index ac230ff..ecbe46c 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -19,12 +19,11 @@ Routes messages to workers (that are in named queues). | |||
| 19 | """ | 19 | """ |
| 20 | from copy import copy | 20 | from copy import copy |
| 21 | import logging | 21 | import logging |
| 22 | import threading | ||
| 23 | import warnings | 22 | import warnings |
| 24 | import signal | 23 | import signal |
| 25 | 24 | ||
| 26 | from . import conf, exceptions, poller, receiver | 25 | from . import conf, exceptions, poller, receiver |
| 27 | from .constants import STATUS | 26 | from .constants import STATUS, CLIENT_TYPE |
| 28 | from .utils.classes import HeartbeatMixin | 27 | from .utils.classes import HeartbeatMixin |
| 29 | from .utils.messages import ( | 28 | from .utils.messages import ( |
| 30 | send_emqp_router_message as sendmsg, | 29 | send_emqp_router_message as sendmsg, |
| @@ -85,6 +84,25 @@ class Router(HeartbeatMixin): | |||
| 85 | #: workers available to take the job | 84 | #: workers available to take the job |
| 86 | self.waiting_messages = {} | 85 | self.waiting_messages = {} |
| 87 | 86 | ||
| 87 | #: Tracks the last time the scheduler queue was cleaned out of dead | ||
| 88 | #: schedulers | ||
| 89 | self._meta['last_scheduler_cleanup'] = 0 | ||
| 90 | |||
| 91 | #: Queue for schedulers to use: | ||
| 92 | self.scheduler_queue = [] | ||
| 93 | |||
| 94 | #: Scheduler clients. Clients are able to send SCHEDULE commands that | ||
| 95 | #: need to be routed to a scheduler, which will keep track of time and | ||
| 96 | #: run the job. | ||
| 97 | #: Contains dictionaries: | ||
| 98 | #: self.schedulers[<scheduler_zmq_id>] = { | ||
| 99 | #: 'hb': <last_recv_heartbeat>, | ||
| 100 | #: } | ||
| 101 | self.schedulers = {} | ||
| 102 | |||
| 103 | #: Set to True when the router should die. | ||
| 104 | self.received_disconnect = False | ||
| 105 | |||
| 88 | def start(self, | 106 | def start(self, |
| 89 | frontend_addr=conf.FRONTEND_ADDR, | 107 | frontend_addr=conf.FRONTEND_ADDR, |
| 90 | backend_addr=conf.BACKEND_ADDR): | 108 | backend_addr=conf.BACKEND_ADDR): |
| @@ -113,6 +131,10 @@ class Router(HeartbeatMixin): | |||
| 113 | Starts the actual eventloop. Usually called by :meth:`Router.start` | 131 | Starts the actual eventloop. Usually called by :meth:`Router.start` |
| 114 | """ | 132 | """ |
| 115 | while True: | 133 | while True: |
| 134 | |||
| 135 | if self.received_disconnect: | ||
| 136 | break | ||
| 137 | |||
| 116 | now = monotonic() | 138 | now = monotonic() |
| 117 | events = self.poller.poll() | 139 | events = self.poller.poll() |
| 118 | 140 | ||
| @@ -137,6 +159,22 @@ class Router(HeartbeatMixin): | |||
| 137 | # ones so the next one is alive | 159 | # ones so the next one is alive |
| 138 | self.clean_up_dead_workers() | 160 | self.clean_up_dead_workers() |
| 139 | 161 | ||
| 162 | if now - self._meta['last_sent_scheduler_heartbeat'] >= \ | ||
| 163 | conf.HEARTBEAT_INTERVAL: | ||
| 164 | self.send_schedulers_heartbeats() | ||
| 165 | |||
| 166 | if now - self._meta['last_scheduler_cleanup'] >= 10: | ||
| 167 | self.clean_up_dead_schedulers() | ||
| 168 | |||
| 169 | def reset_heartbeat_counters(self): | ||
| 170 | """ | ||
| 171 | Reset all the counters for heartbeats back to 0 | ||
| 172 | """ | ||
| 173 | super(Router, self).reset_heartbeat_counters() | ||
| 174 | |||
| 175 | # track the last time the router sent a heartbeat to the schedulers | ||
| 176 | self._meta['last_sent_scheduler_heartbeat'] = 0 | ||
| 177 | |||
| 140 | def send_ack(self, socket, recipient, msgid): | 178 | def send_ack(self, socket, recipient, msgid): |
| 141 | """ | 179 | """ |
| 142 | Sends an ACK response | 180 | Sends an ACK response |
| @@ -162,13 +200,22 @@ class Router(HeartbeatMixin): | |||
| 162 | 200 | ||
| 163 | def send_workers_heartbeats(self): | 201 | def send_workers_heartbeats(self): |
| 164 | """ | 202 | """ |
| 165 | Send heartbeats to all registered workers. | 203 | Send HEARTBEATs to all registered workers. |
| 166 | """ | 204 | """ |
| 167 | self._meta['last_sent_heartbeat'] = monotonic() | 205 | self._meta['last_sent_heartbeat'] = monotonic() |
| 168 | 206 | ||
| 169 | for worker_id in self.workers: | 207 | for worker_id in self.workers: |
| 170 | self.send_heartbeat(self.outgoing, worker_id) | 208 | self.send_heartbeat(self.outgoing, worker_id) |
| 171 | 209 | ||
| 210 | def send_schedulers_heartbeats(self): | ||
| 211 | """ | ||
| 212 | Send HEARTBEATs to all registered schedulers | ||
| 213 | """ | ||
| 214 | self._meta['last_sent_scheduler_heartbeat'] = monotonic() | ||
| 215 | |||
| 216 | for scheduler_id in self.schedulers: | ||
| 217 | self.send_heartbeat(self.incoming, scheduler_id) | ||
| 218 | |||
| 172 | def on_heartbeat(self, sender, msgid, msg): | 219 | def on_heartbeat(self, sender, msgid, msg): |
| 173 | """ | 220 | """ |
| 174 | a placeholder for a no-op command. The actual 'logic' for HEARTBEAT is | 221 | a placeholder for a no-op command. The actual 'logic' for HEARTBEAT is |
| @@ -181,12 +228,22 @@ class Router(HeartbeatMixin): | |||
| 181 | Handles an INFORM message. This happens when a new worker comes online | 228 | Handles an INFORM message. This happens when a new worker comes online
| 182 | and announces itself. | 229 | and announces itself. |
| 183 | """ | 230 | """ |
| 184 | logger.info('Received INFORM request from %s' % sender) | ||
| 185 | queue_name = msg[0] | 231 | queue_name = msg[0] |
| 232 | client_type = msg[1] | ||
| 233 | |||
| 234 | logger.info('Received INFORM request from {} (type: {})'.format( | ||
| 235 | sender, client_type)) | ||
| 186 | 236 | ||
| 187 | self.add_worker(sender, queue_name) | 237 | if client_type == CLIENT_TYPE.worker: |
| 238 | self.add_worker(sender, queue_name) | ||
| 239 | self.send_ack(self.outgoing, sender, msgid) | ||
| 240 | elif client_type == CLIENT_TYPE.scheduler: | ||
| 241 | self.add_scheduler(sender) | ||
| 242 | self.send_ack(self.incoming, sender, msgid) | ||
| 188 | 243 | ||
| 189 | self.send_ack(self.outgoing, sender, msgid) | 244 | def on_disconnect(self, msgid, msg): |
| 245 | # Event loops should check for this flag and break out | ||
| 246 | self.received_disconnect = True | ||
| 190 | 247 | ||
| 191 | def on_ready(self, sender, msgid, msg): | 248 | def on_ready(self, sender, msgid, msg): |
| 192 | """ | 249 | """ |
| @@ -252,7 +309,7 @@ class Router(HeartbeatMixin): | |||
| 252 | Adds a worker to worker queues | 309 | Adds a worker to worker queues |
| 253 | 310 | ||
| 254 | Args: | 311 | Args: |
| 255 | worker_id: unique id of the worker to add | 312 | worker_id (str): unique id of the worker to add |
| 256 | queues: queue or queues this worker should be a member of | 313 | queues: queue or queues this worker should be a member of |
| 257 | """ | 314 | """ |
| 258 | # Add the worker to our worker dict | 315 | # Add the worker to our worker dict |
| @@ -263,6 +320,36 @@ class Router(HeartbeatMixin): | |||
| 263 | logger.debug('Adding {} to the self.workers for queues:{}'.format( | 320 | logger.debug('Adding {} to the self.workers for queues:{}'.format( |
| 264 | worker_id, str(queues))) | 321 | worker_id, str(queues))) |
| 265 | 322 | ||
| 323 | def clean_up_dead_schedulers(self): | ||
| 324 | """ | ||
| 325 | Loops through the list of schedulers and removes any scheduler from | ||
| 326 | which the router hasn't received a heartbeat within HEARTBEAT_TIMEOUT | ||
| 327 | """ | ||
| 328 | now = monotonic() | ||
| 329 | self._meta['last_scheduler_cleanup'] = now | ||
| 330 | schedulers = copy(self.scheduler_queue) | ||
| 331 | |||
| 332 | for scheduler_id in schedulers: | ||
| 333 | last_hb_seconds = now - self.schedulers[scheduler_id]['hb'] | ||
| 334 | if last_hb_seconds >= conf.HEARTBEAT_TIMEOUT: | ||
| 335 | logger.info("No HEARTBEAT from scheduler {} in {}s. Removing " | ||
| 336 | "from the queue".format(scheduler_id, | ||
| 337 | last_hb_seconds)) | ||
| 338 | del self.schedulers[scheduler_id] | ||
| 339 | self.scheduler_queue.remove(scheduler_id) | ||
| 340 | |||
| 341 | def add_scheduler(self, scheduler_id): | ||
| 342 | """ | ||
| 343 | Adds a scheduler to the queue to receive SCHEDULE commands | ||
| 344 | |||
| 345 | Args: | ||
| 346 | scheduler_id (str): unique id of the scheduler to add | ||
| 347 | """ | ||
| 348 | self.scheduler_queue.append(scheduler_id) | ||
| 349 | self.schedulers[scheduler_id] = {} | ||
| 350 | self.schedulers[scheduler_id]['hb'] = monotonic() | ||
| 351 | logger.debug('Adding {} to self.schedulers'.format(scheduler_id)) | ||
| 352 | |||
| 266 | def requeue_worker(self, worker_id): | 353 | def requeue_worker(self, worker_id): |
| 267 | """ | 354 | """ |
| 268 | Add a worker back to the pools for which it is a member of. | 355 | Add a worker back to the pools for which it is a member of. |
| @@ -295,10 +382,6 @@ class Router(HeartbeatMixin): | |||
| 295 | 382 | ||
| 296 | def on_receive_request(self, msg): | 383 | def on_receive_request(self, msg): |
| 297 | """ | 384 | """ |
| 298 | This function is called when a message comes in from the client socket. | ||
| 299 | It then calls `on_command`. If `on_command` isn't found, then a | ||
| 300 | warning is created. | ||
| 301 | |||
| 302 | Args: | 385 | Args: |
| 303 | msg: The untouched message from zmq | 386 | msg: The untouched message from zmq |
| 304 | """ | 387 | """ |
| @@ -307,43 +390,83 @@ class Router(HeartbeatMixin): | |||
| 307 | except exceptions.InvalidMessageError: | 390 | except exceptions.InvalidMessageError: |
| 308 | logger.exception('Invalid message from clients: %s' % str(msg)) | 391 | logger.exception('Invalid message from clients: %s' % str(msg)) |
| 309 | 392 | ||
| 310 | queue_name = message[3][0] | 393 | sender = message[0] |
| 311 | 394 | command = message[1] | |
| 312 | # If we have no workers for the queue TODO something about it | ||
| 313 | if queue_name not in self.queues: | ||
| 314 | logger.warning("Received REQUEST with a queue I don't recognize: " | ||
| 315 | "%s" % queue_name) | ||
| 316 | logger.critical("Discarding message") | ||
| 317 | # TODO: Don't discard the message | ||
| 318 | return | ||
| 319 | |||
| 320 | try: | ||
| 321 | worker_addr = self.queues[queue_name].pop() | ||
| 322 | except KeyError: | ||
| 323 | logger.critical("REQUEST for an unknown queue caught in exception") | ||
| 324 | logger.critical("Discarding message") | ||
| 325 | return | ||
| 326 | except IndexError: | ||
| 327 | logger.warning('No available workers for queue "%s". Buffering ' | ||
| 328 | 'message to send later.' % queue_name) | ||
| 329 | if queue_name not in self.waiting_messages: | ||
| 330 | self.waiting_messages[queue_name] = [] | ||
| 331 | self.waiting_messages[queue_name].append(msg) | ||
| 332 | logger.debug('%d waiting messages in queue "%s"' % | ||
| 333 | (len(self.waiting_messages[queue_name]), queue_name)) | ||
| 334 | return | ||
| 335 | 395 | ||
| 336 | try: | 396 | # Count this message as a heartbeat if it came from a scheduler that
| 337 | # strip off the client id before forwarding because the worker | 397 | # the router is aware of. |
| 338 | # isn't expecting it, and the zmq socket is going to put our | 398 | if sender in self.schedulers and sender in self.scheduler_queue: |
| 339 | # id on it. | 399 | self.schedulers[sender]['hb'] = monotonic() |
| 340 | fwdmsg(self.outgoing, worker_addr, msg[1:]) | 400 | |
| 341 | except exceptions.PeerGoneAwayError: | 401 | # If it is a heartbeat then there is nothing left to do |
| 342 | logger.debug("Worker {} has unexpectedly gone away. Trying " | 402 | if command == "HEARTBEAT": |
| 343 | "another worker".format(worker_addr)) | 403 | return |
| 344 | 404 | ||
| 345 | # TODO: Rewrite this logic as a loop | 405 | # REQUEST is the most common message so it goes at the top |
| 346 | self.on_receive_request(msg) | 406 | if command == "REQUEST": |
| 407 | queue_name = message[3][0] | ||
| 408 | # If we have no workers for the queue TODO something about it | ||
| 409 | if queue_name not in self.queues: | ||
| 410 | logger.warning("Received %s with a queue I don't recognize: " | ||
| 411 | "%s" % (msg[3], queue_name)) | ||
| 412 | logger.critical("Discarding message") | ||
| 413 | # TODO: Don't discard the message | ||
| 414 | return | ||
| 415 | |||
| 416 | try: | ||
| 417 | worker_addr = self.queues[queue_name].pop() | ||
| 418 | except KeyError: | ||
| 419 | logger.critical("REQUEST for an unknown queue caught in " | ||
| 420 | "exception") | ||
| 421 | logger.critical("Discarding message") | ||
| 422 | return | ||
| 423 | except IndexError: | ||
| 424 | logger.warning('No available workers for queue "%s". ' | ||
| 425 | 'Buffering message to send later.' % queue_name) | ||
| 426 | if queue_name not in self.waiting_messages: | ||
| 427 | self.waiting_messages[queue_name] = [] | ||
| 428 | self.waiting_messages[queue_name].append(msg) | ||
| 429 | logger.debug('%d waiting messages in queue "%s"' % | ||
| 430 | (len(self.waiting_messages[queue_name]), | ||
| 431 | queue_name)) | ||
| 432 | return | ||
| 433 | |||
| 434 | try: | ||
| 435 | # strip off the client id before forwarding because the | ||
| 436 | # worker isn't expecting it, and the zmq socket is going | ||
| 437 | # to put this router's id on it. | ||
| 438 | fwdmsg(self.outgoing, worker_addr, msg[1:]) | ||
| 439 | except exceptions.PeerGoneAwayError: | ||
| 440 | logger.debug("Worker {} has unexpectedly gone away. " | ||
| 441 | "Trying another worker".format(worker_addr)) | ||
| 442 | |||
| 443 | # TODO: Rewrite this logic as a loop, so it can't recurse | ||
| 444 | # into oblivion | ||
| 445 | self.on_receive_request(msg) | ||
| 446 | # elif command == "HEARTBEAT": | ||
| 447 | # # The scheduler is heartbeating | ||
| 448 | |||
| 449 | elif command == "INFORM": | ||
| 450 | # This is a scheduler trying to join | ||
| 451 | self.on_inform(message[0], message[2], message[3]) | ||
| 452 | |||
| 453 | elif command == "SCHEDULE" or command == "UNSCHEDULE": | ||
| 454 | # Forward the schedule message to the schedulers | ||
| 455 | scheduler_addr = self.scheduler_queue.pop() | ||
| 456 | self.scheduler_queue.append(scheduler_addr) | ||
| 457 | self.schedulers[scheduler_addr] = { | ||
| 458 | 'hb': monotonic(), | ||
| 459 | } | ||
| 460 | |||
| 461 | try: | ||
| 462 | # Strips off the client id before forwarding because the | ||
| 463 | # scheduler isn't expecting it. | ||
| 464 | fwdmsg(self.incoming, scheduler_addr, msg[1:]) | ||
| 465 | except exceptions.PeerGoneAwayError: | ||
| 466 | logger.debug("Scheduler {} has unexpectedly gone away. Trying " | ||
| 467 | "another scheduler.".format(scheduler_addr)) | ||
| 468 | # TODO: rewrite this in a loop | ||
| 469 | self.on_receive_request(msg) | ||
| 347 | 470 | ||
| 348 | def process_worker_message(self, msg): | 471 | def process_worker_message(self, msg): |
| 349 | """ | 472 | """ |
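For the SCHEDULE/UNSCHEDULE branch above, the scheduler selection can be read as the following standalone sketch (the helper name is illustrative; the router does this inline):

    from eventmq.utils.timeutils import monotonic

    def pick_scheduler(scheduler_queue, schedulers):
        """Select a scheduler to receive a SCHEDULE/UNSCHEDULE message.

        list.pop() with no index takes the entry from the tail of the list and
        append() puts it straight back, so as written the same scheduler is
        picked until membership changes; pop(0) would rotate through all of
        them.  The heartbeat timestamp is refreshed so the chosen scheduler is
        not reaped by clean_up_dead_schedulers.
        """
        scheduler_addr = scheduler_queue.pop()
        scheduler_queue.append(scheduler_addr)
        schedulers[scheduler_addr] = {'hb': monotonic()}
        return scheduler_addr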
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index c5c5754..16f6095 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -17,17 +17,23 @@ | |||
| 17 | ============================= | 17 | ============================= |
| 18 | Handles cron and other scheduled tasks | 18 | Handles cron and other scheduled tasks |
| 19 | """ | 19 | """ |
| 20 | import json | ||
| 20 | import logging | 21 | import logging |
| 21 | import time | 22 | import time |
| 23 | import redis | ||
| 22 | 24 | ||
| 23 | from croniter import croniter | 25 | from croniter import croniter |
| 24 | from six import next | 26 | from six import next |
| 25 | 27 | ||
| 26 | from . import conf | 28 | from . import conf |
| 27 | from .sender import Sender | 29 | from .sender import Sender |
| 28 | from .utils.classes import HeartbeatMixin | 30 | from .poller import Poller, POLLIN |
| 31 | from .utils.classes import EMQPService, HeartbeatMixin | ||
| 32 | from json import loads as deserialize | ||
| 33 | from json import dumps as serialize | ||
| 29 | from .utils.settings import import_settings | 34 | from .utils.settings import import_settings |
| 30 | from .utils.timeutils import seconds_until, timestamp | 35 | from .utils.timeutils import IntervalIter |
| 36 | from .utils.timeutils import seconds_until, timestamp, monotonic | ||
| 31 | from .client.messages import send_request | 37 | from .client.messages import send_request |
| 32 | 38 | ||
| 33 | from eventmq.log import setup_logger | 39 | from eventmq.log import setup_logger |
| @@ -35,40 +41,65 @@ from eventmq.log import setup_logger | |||
| 35 | logger = logging.getLogger(__name__) | 41 | logger = logging.getLogger(__name__) |
| 36 | 42 | ||
| 37 | 43 | ||
| 38 | class Scheduler(HeartbeatMixin): | 44 | class Scheduler(HeartbeatMixin, EMQPService): |
| 39 | """ | 45 | """ |
| 40 | Keeper of time, master of schedules | 46 | Keeper of time, master of schedules |
| 41 | """ | 47 | """ |
| 48 | SERVICE_TYPE = 'scheduler' | ||
| 42 | 49 | ||
| 43 | def __init__(self, *args, **kwargs): | 50 | def __init__(self, *args, **kwargs): |
| 44 | logger.info('Initializing Scheduler...') | 51 | logger.info('Initializing Scheduler...') |
| 45 | super(Scheduler, self).__init__(*args, **kwargs) | 52 | super(Scheduler, self).__init__(*args, **kwargs) |
| 46 | self.outgoing = Sender() | 53 | self.outgoing = Sender() |
| 47 | 54 | ||
| 48 | # 0 = the next ts this job should be executed | 55 | # Open connection to redis server for persistence |
| 56 | self.redis_server = redis.StrictRedis(host='localhost', | ||
| 57 | port=6379, | ||
| 58 | db=0) | ||
| 59 | |||
| 60 | # contains 4-item lists representing cron jobs | ||
| 61 | # IDX Description | ||
| 62 | # 0 = the next ts this job should be executed in | ||
| 49 | # 1 = the function to be executed | 63 | # 1 = the function to be executed |
| 50 | # 2 = the croniter iterator for this job | 64 | # 2 = the croniter iterator for this job |
| 51 | self.jobs = [] | 65 | # 3 = the queue to execute the job in |
| 66 | self.cron_jobs = [] | ||
| 67 | |||
| 68 | # contains dict of 4-item lists representing jobs based on an interval | ||
| 69 | # key of this dictionary is a hash of company_id, path, and callable | ||
| 70 | # from the message of the SCHEDULE command received | ||
| 71 | # values of this list follow this format: | ||
| 72 | # IDX Descriptions | ||
| 73 | # 0 = the next (monotonic) ts that this job should be executed in | ||
| 74 | # 1 = the function to be executed | ||
| 75 | # 2 = the interval iter for this job | ||
| 76 | # 3 = the queue to execute the job in | ||
| 77 | self.interval_jobs = {} | ||
| 78 | |||
| 79 | self.poller = Poller() | ||
| 52 | 80 | ||
| 53 | self.load_jobs() | 81 | self.load_jobs() |
| 54 | 82 | ||
| 55 | def connect(self, addr='tcp://127.0.0.1:47290'): | 83 | self._setup() |
| 56 | """ | ||
| 57 | Connect the scheduler to worker/router at `addr` | ||
| 58 | """ | ||
| 59 | self.outgoing.connect(addr) | ||
| 60 | 84 | ||
| 61 | def load_jobs(self): | 85 | def load_jobs(self): |
| 62 | """ | 86 | """ |
| 63 | Loads the jobs that need to be scheduled | 87 | Loads the jobs that need to be scheduled |
| 64 | """ | 88 | """ |
| 65 | raw_jobs = ( | 89 | raw_jobs = ( |
| 66 | ('* * * * *', 'eventmq.scheduler.test_job'), | 90 | # ('* * * * *', 'eventmq.scheduler.test_job'), |
| 67 | ) | 91 | ) |
| 68 | ts = int(timestamp()) | 92 | ts = int(timestamp()) |
| 69 | for job in raw_jobs: | 93 | for job in raw_jobs: |
| 70 | # Create the croniter iterator | 94 | # Create the croniter iterator |
| 71 | c = croniter(job[0]) | 95 | c = croniter(job[0]) |
| 96 | path = '.'.join(job[1].split('.')[:-1]) | ||
| 97 | callable_ = job[1].split('.')[-1] | ||
| 98 | |||
| 99 | msg = ['run', { | ||
| 100 | 'path': path, | ||
| 101 | 'callable': callable_ | ||
| 102 | }] | ||
| 72 | 103 | ||
| 73 | # Get the next time this job should be run | 104 | # Get the next time this job should be run |
| 74 | c_next = next(c) | 105 | c_next = next(c) |
| @@ -76,15 +107,24 @@ class Scheduler(HeartbeatMixin): | |||
| 76 | # If the next execution time has passed move the iterator to | 107 | # If the next execution time has passed move the iterator to |
| 77 | # the following time | 108 | # the following time |
| 78 | c_next = next(c) | 109 | c_next = next(c) |
| 79 | self.jobs.append([c_next, job[1], c]) | 110 | self.cron_jobs.append([c_next, msg, c, None]) |
| 80 | |||
| 81 | def start(self, addr='tcp://127.0.0.1:47290'): | ||
| 82 | """ | ||
| 83 | Begin sending messages to execute scheduled jobs | ||
| 84 | """ | ||
| 85 | self.connect(addr) | ||
| 86 | 111 | ||
| 87 | self._start_event_loop() | 112 | # Restore persisted data if redis connection is alive and has jobs |
| 113 | if (self.redis_server): | ||
| 114 | interval_job_list = self.redis_server.lrange('interval_jobs', | ||
| 115 | 0, | ||
| 116 | -1) | ||
| 117 | if interval_job_list is not None: | ||
| 118 | for i in interval_job_list: | ||
| 119 | logger.debug('Restoring job with hash %s' % i) | ||
| 120 | if (self.redis_server.get(i)): | ||
| 121 | self.load_job_from_redis( | ||
| 122 | message=deserialize(self.redis_server.get(i))) | ||
| 123 | else: | ||
| 124 | logger.warning('Expected scheduled job in redis, ' + | ||
| 125 | 'but none was found with hash %s' % i) | ||
| 126 | else: | ||
| 127 | logger.warning('Unable to talk to the redis server') | ||
| 88 | 128 | ||
| 89 | def _start_event_loop(self): | 129 | def _start_event_loop(self): |
| 90 | """ | 130 | """ |
| @@ -92,29 +132,146 @@ class Scheduler(HeartbeatMixin): | |||
| 92 | """ | 132 | """ |
| 93 | while True: | 133 | while True: |
| 94 | ts_now = int(timestamp()) | 134 | ts_now = int(timestamp()) |
| 135 | m_now = monotonic() | ||
| 136 | events = self.poller.poll() | ||
| 137 | |||
| 138 | if events.get(self.outgoing) == POLLIN: | ||
| 139 | msg = self.outgoing.recv_multipart() | ||
| 140 | self.process_message(msg) | ||
| 95 | 141 | ||
| 96 | for i in range(0, len(self.jobs)): | 142 | # TODO: distribute me! |
| 97 | if self.jobs[i][0] <= ts_now: # If the time is now, or passed | 143 | for i in range(0, len(self.cron_jobs)): |
| 98 | job = self.jobs[i][1] | 144 | # If the time is now, or passed |
| 99 | path = '.'.join(job.split('.')[:-1]) | 145 | if self.cron_jobs[i][0] <= ts_now: |
| 100 | callable_ = job.split('.')[-1] | 146 | msg = self.cron_jobs[i][1] |
| 147 | queue = self.cron_jobs[i][3] | ||
| 101 | 148 | ||
| 102 | # Run the job | 149 | # Run the msg |
| 103 | logger.debug("Time is: %s; Schedule is: %s - Running %s" | 150 | logger.debug("Time is: %s; Schedule is: %s - Running %s" |
| 104 | % (ts_now, self.jobs[i][0], job)) | 151 | % (ts_now, self.cron_jobs[i][0], msg)) |
| 105 | 152 | ||
| 106 | msg = ['run', { | 153 | self.send_request(msg, queue=queue) |
| 107 | 'path': path, | ||
| 108 | 'callable': callable_ | ||
| 109 | }] | ||
| 110 | send_request(self.outgoing, msg) | ||
| 111 | 154 | ||
| 112 | # Update the next time to run | 155 | # Update the next time to run |
| 113 | self.jobs[i][0] = next(self.jobs[i][2]) | 156 | self.cron_jobs[i][0] = next(self.cron_jobs[i][2]) |
| 114 | logger.debug("Next execution will be in %ss" % | 157 | logger.debug("Next execution will be in %ss" % |
| 115 | seconds_until(self.jobs[i][0])) | 158 | seconds_until(self.cron_jobs[i][0])) |
| 116 | 159 | ||
| 117 | time.sleep(0.1) | 160 | for k, v in self.interval_jobs.iteritems(): |
| 161 | if v[0] <= m_now: | ||
| 162 | msg = v[1] | ||
| 163 | queue = v[3] | ||
| 164 | |||
| 165 | logger.debug("Time is: %s; Schedule is: %s - Running %s" | ||
| 166 | % (ts_now, v[0], msg)) | ||
| 167 | |||
| 168 | self.send_request(msg, queue=queue) | ||
| 169 | v[0] = next(v[2]) | ||
| 170 | |||
| 171 | if not self.maybe_send_heartbeat(events): | ||
| 172 | break | ||
| 173 | |||
| 174 | def send_request(self, jobmsg, queue=None): | ||
| 175 | jobmsg = json.loads(jobmsg) | ||
| 176 | send_request(self.outgoing, jobmsg, queue=queue) | ||
| 177 | |||
| 178 | def on_unschedule(self, msgid, message): | ||
| 179 | """ | ||
| 180 | Unschedule an existing schedule job, if it exists | ||
| 181 | """ | ||
| 182 | logger.info("Received new UNSCHEDULE request: {}".format(message)) | ||
| 183 | |||
| 184 | schedule_hash = self.schedule_hash(message) | ||
| 185 | |||
| 186 | if schedule_hash in self.interval_jobs: | ||
| 187 | # Remove scheduled job | ||
| 188 | self.interval_jobs.pop(schedule_hash) | ||
| 189 | else: | ||
| 190 | logger.debug("Couldn't find matching schedule for unschedule " + | ||
| 191 | "request") | ||
| 192 | |||
| 193 | # Double check the redis server even if we didn't find the hash | ||
| 194 | # in memory | ||
| 195 | if (self.redis_server): | ||
| 196 | if (self.redis_server.get(schedule_hash)): | ||
| 197 | self.redis_server.lrem('interval_jobs', 0, schedule_hash) | ||
| 198 | self.redis_server.save() | ||
| 199 | |||
| 200 | def load_job_from_redis(self, message): | ||
| 201 | """ | ||
| 202 | """ | ||
| 203 | from .utils.timeutils import IntervalIter | ||
| 204 | |||
| 205 | queue = message[0].encode('utf-8') | ||
| 206 | interval = int(message[2]) | ||
| 207 | inter_iter = IntervalIter(monotonic(), interval) | ||
| 208 | schedule_hash = self.schedule_hash(message) | ||
| 209 | |||
| 210 | self.interval_jobs[schedule_hash] = [ | ||
| 211 | next(inter_iter), | ||
| 212 | message[3], | ||
| 213 | inter_iter, | ||
| 214 | queue | ||
| 215 | ] | ||
| 216 | |||
| 217 | def on_schedule(self, msgid, message): | ||
| 218 | """ | ||
| 219 | """ | ||
| 220 | logger.info("Received new SCHEDULE request: {}".format(message)) | ||
| 221 | |||
| 222 | queue = message[0] | ||
| 223 | interval = int(message[2]) | ||
| 224 | inter_iter = IntervalIter(monotonic(), interval) | ||
| 225 | schedule_hash = self.schedule_hash(message) | ||
| 226 | |||
| 227 | # Notify if this is updating existing, or new | ||
| 228 | if (schedule_hash in self.interval_jobs): | ||
| 229 | logger.debug('Update existing scheduled job with %s' | ||
| 230 | % schedule_hash) | ||
| 231 | else: | ||
| 232 | logger.debug('Creating a new scheduled job with %s' | ||
| 233 | % schedule_hash) | ||
| 234 | |||
| 235 | self.interval_jobs[schedule_hash] = [ | ||
| 236 | next(inter_iter), | ||
| 237 | message[3], | ||
| 238 | inter_iter, | ||
| 239 | queue | ||
| 240 | ] | ||
| 241 | |||
| 242 | # Persist the scheduled job | ||
| 243 | if (self.redis_server): | ||
| 244 | if schedule_hash not in self.redis_server.lrange('interval_jobs', | ||
| 245 | 0, | ||
| 246 | -1): | ||
| 247 | self.redis_server.lpush('interval_jobs', schedule_hash) | ||
| 248 | self.redis_server.set(schedule_hash, serialize(message)) | ||
| 249 | self.redis_server.save() | ||
| 250 | |||
| 251 | self.send_request(message[3], queue=queue) | ||
| 252 | |||
| 253 | def on_heartbeat(self, msgid, message): | ||
| 254 | """ | ||
| 255 | Noop command. The logic for heartbeating is in the event loop. | ||
| 256 | """ | ||
| 257 | |||
| 258 | def schedule_hash(self, message): | ||
| 259 | """ | ||
| 260 | Create a unique identifier for this message for storing | ||
| 261 | and referencing later | ||
| 262 | """ | ||
| 263 | # Items to use for uniquely identifying this scheduled job | ||
| 264 | # TODO: Pass company_id in a more rigid place | ||
| 265 | msg = deserialize(message[3])[1] | ||
| 266 | schedule_hash_items = {'company_id': msg['class_args'][0], | ||
| 267 | 'path': msg['path'], | ||
| 268 | 'callable': msg['callable']} | ||
| 269 | |||
| 270 | # Hash the sorted, immutable set of items in our identifying dict | ||
| 271 | schedule_hash = str(hash(tuple(frozenset(sorted( | ||
| 272 | schedule_hash_items.items()))))) | ||
| 273 | |||
| 274 | return schedule_hash | ||
| 118 | 275 | ||
| 119 | def scheduler_main(self): | 276 | def scheduler_main(self): |
| 120 | """ | 277 | """ |
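To make the identity rule in schedule_hash() concrete, here is a worked example with made-up values. Frame 3 of the SCHEDULE body is the serialized ['run', {...}] payload built by the client's schedule() helper; only company_id (class_args[0]), path, and callable participate in the hash, so the queue and interval can change without creating a new entry:

    import json

    # A SCHEDULE body as the scheduler receives it:
    # [queue, headers_csv, interval_secs, serialized_job]
    message = [
        'default',
        'guarantee',
        '900',
        json.dumps(['run', {'callable': 'nightly_cleanup',
                            'path': 'myapp.tasks',
                            'args': [], 'kwargs': {'batch_size': 500},
                            'class_args': ['acme-co'], 'class_kwargs': {}}]),
    ]

    msg = json.loads(message[3])[1]
    schedule_hash_items = {'company_id': msg['class_args'][0],
                           'path': msg['path'],
                           'callable': msg['callable']}

    # Same company_id + path + callable => same key, so a later SCHEDULE
    # updates the existing entry and an UNSCHEDULE can find it to remove it.
    schedule_hash = str(hash(tuple(frozenset(sorted(
        schedule_hash_items.items())))))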
| @@ -122,6 +279,7 @@ class Scheduler(HeartbeatMixin): | |||
| 122 | """ | 279 | """ |
| 123 | setup_logger("eventmq") | 280 | setup_logger("eventmq") |
| 124 | import_settings() | 281 | import_settings() |
| 282 | self.__init__() | ||
| 125 | self.start(addr=conf.SCHEDULER_ADDR) | 283 | self.start(addr=conf.SCHEDULER_ADDR) |
| 126 | 284 | ||
| 127 | 285 | ||
| @@ -134,6 +292,4 @@ def scheduler_main(): | |||
| 134 | def test_job(): | 292 | def test_job(): |
| 135 | print "hello!" | 293 | print "hello!" |
| 136 | print "hello!" | 294 | print "hello!" |
| 137 | print "hello!" | ||
| 138 | print "hello!" | ||
| 139 | time.sleep(4) | 295 | time.sleep(4) |
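eventmq/utils/timeutils.py also changed in this diff but is not shown in this excerpt. Based on how the scheduler constructs and advances it, IntervalIter presumably behaves like the sketch below; this is an assumption, not the actual implementation, and whether the first value is start or start + interval is a guess (on_schedule also fires the job once immediately):

    class IntervalIter(object):
        """Assumed shape of eventmq.utils.timeutils.IntervalIter.

        Constructed with monotonic() and an interval in seconds; each next()
        returns the next execution timestamp, which the scheduler stores in
        interval_jobs[hash][0] and compares against monotonic() in its loop.
        """
        def __init__(self, start, interval_secs):
            self.current = start
            self.interval_secs = interval_secs

        def __iter__(self):
            return self

        def next(self):  # Python 2 iterator protocol, matching the code above
            self.current += self.interval_secs  # assumed: first yield is start + interval
            return self.current

        __next__ = next  # keep the sketch usable on Python 3 as well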
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py new file mode 100644 index 0000000..3c3941d --- /dev/null +++ b/eventmq/tests/test_jobmanager.py | |||
| @@ -0,0 +1,148 @@ | |||
| 1 | # This file is part of eventmq. | ||
| 2 | # | ||
| 3 | # eventmq is free software: you can redistribute it and/or modify | ||
| 4 | # it under the terms of the GNU General Public License as published by | ||
| 5 | # the Free Software Foundation, either version 3 of the License, or | ||
| 6 | # (at your option) any later version. | ||
| 7 | # | ||
| 8 | # eventmq is distributed in the hope that it will be useful, | ||
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| 11 | # GNU General Public License for more details. | ||
| 12 | # | ||
| 13 | # You should have received a copy of the GNU General Public License | ||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | ||
| 15 | import json | ||
| 16 | import threading | ||
| 17 | import time | ||
| 18 | import unittest | ||
| 19 | |||
| 20 | import mock | ||
| 21 | import zmq | ||
| 22 | |||
| 23 | from .. import conf, constants, jobmanager | ||
| 24 | from ..utils.classes import ZMQSendMixin, ZMQReceiveMixin | ||
| 25 | from ..utils.messages import send_emqp_router_message | ||
| 26 | |||
| 27 | ADDR = 'inproc://pour_the_rice_in_the_thing' | ||
| 28 | |||
| 29 | |||
| 30 | class FakeDevice(ZMQReceiveMixin, ZMQSendMixin): | ||
| 31 | """ | ||
| 32 | A fake router device so we can test with some of the nice utilities, but | ||
| 33 | still allowing manual control | ||
| 34 | """ | ||
| 35 | def __init__(self, addr=ADDR): | ||
| 36 | super(FakeDevice, self).__init__() | ||
| 37 | |||
| 38 | self.zsocket = zmq.Context.instance().socket(zmq.ROUTER) | ||
| 39 | |||
| 40 | |||
| 41 | class TestCase(unittest.TestCase): | ||
| 42 | def setUp(self): | ||
| 43 | self.jm = jobmanager.JobManager() | ||
| 44 | |||
| 45 | # Since JobManager runs as a process a thread is used to allow the loop | ||
| 46 | # to run | ||
| 47 | self.jm_thread = threading.Thread(target=start_jm, | ||
| 48 | args=(self.jm,)) | ||
| 49 | |||
| 50 | self.addCleanup(self.cleanup) | ||
| 51 | |||
| 52 | def test__setup(self): | ||
| 53 | jm = jobmanager.JobManager(name='RuckusBringer') | ||
| 54 | self.assertEqual(jm.name, 'RuckusBringer') | ||
| 55 | |||
| 56 | self.assertFalse(jm.awaiting_startup_ack) | ||
| 57 | self.assertEqual(jm.status, constants.STATUS.ready) | ||
| 58 | |||
| 59 | # EMQP Tests | ||
| 60 | def test_reset(self): | ||
| 61 | self.jm.reset() | ||
| 62 | |||
| 63 | self.assertFalse(self.jm.awaiting_startup_ack) | ||
| 64 | self.assertEqual(self.jm.status, constants.STATUS.ready) | ||
| 65 | |||
| 66 | @mock.patch('signal.signal') | ||
| 67 | def test_start(self, mock_signal_signal): | ||
| 68 | sock = FakeDevice() | ||
| 69 | |||
| 70 | self.jm_thread.start() | ||
| 71 | time.sleep(.1) # wait for the manager to warm up | ||
| 72 | |||
| 73 | self.assertTrue(self.jm.awaiting_startup_ack) | ||
| 74 | self.assertEqual(self.jm.status, constants.STATUS.connecting) | ||
| 75 | |||
| 76 | # Give JM something to connect to. | ||
| 77 | sock.zsocket.bind(ADDR) | ||
| 78 | |||
| 79 | jm_addr, _, _, cmd, msgid, queues, type_ = sock.recv_multipart() | ||
| 80 | self.assertEqual(self.jm.name, jm_addr) | ||
| 81 | self.assertEqual(cmd, "INFORM") | ||
| 82 | self.assertEqual(type_, constants.CLIENT_TYPE.worker) | ||
| 83 | |||
| 84 | self.send_ack(sock, jm_addr, msgid) | ||
| 85 | |||
| 86 | time.sleep(.1) | ||
| 87 | self.assertEqual(self.jm.status, constants.STATUS.connected) | ||
| 88 | |||
| 89 | def send_ack(self, sock, jm_addr, msgid): | ||
| 90 | send_emqp_router_message(sock, jm_addr, "ACK", msgid) | ||
| 91 | |||
| 92 | @mock.patch('signal.signal') | ||
| 93 | def test__start_event_loop(self, mock_signal_signal): | ||
| 94 | # Tests the first part of the event loop | ||
| 95 | sock = FakeDevice() | ||
| 96 | sock.zsocket.bind(ADDR) | ||
| 97 | |||
| 98 | self.jm_thread.start() | ||
| 99 | |||
| 100 | # Consume the INFORM command | ||
| 101 | jm_addr, _, _, cmd, msgid, queues, type_ = sock.recv_multipart() | ||
| 102 | self.send_ack(sock, jm_addr, msgid) | ||
| 103 | |||
| 104 | # Test that the correct number of READY messages is sent so the | ||
| 105 | # broker knows how many jobs the JM can handle | ||
| 106 | ready_msg_count = 0 | ||
| 107 | for i in range(0, self.jm.available_workers): | ||
| 108 | msg = sock.recv_multipart() | ||
| 109 | if len(msg) > 4 and msg[3] == "READY": | ||
| 110 | ready_msg_count += 1 | ||
| 111 | # If this fails, fewer READY messages were sent than were | ||
| 112 | # supposed to be sent. | ||
| 113 | self.assertEqual(ready_msg_count, self.jm.available_workers) | ||
| 114 | |||
| 115 | @mock.patch('signal.signal') | ||
| 116 | def test_on_request(self, mock_signal_signal): | ||
| 117 | from ..client.messages import build_module_path | ||
| 118 | sock = FakeDevice() | ||
| 119 | sock.zsocket.bind(ADDR) | ||
| 120 | self.jm_thread.start() | ||
| 121 | |||
| 122 | jm_addr, _, _, _, msgid, _, _ = sock.recv_multipart() | ||
| 123 | self.send_ack(sock, jm_addr, msgid) | ||
| 124 | time.sleep(.1) # give time for the JM to process | ||
| 125 | |||
| 126 | path, callable_name = build_module_path(pretend_job) | ||
| 127 | |||
| 128 | run_msg = ['run', { | ||
| 129 | 'path': path, | ||
| 130 | 'callable': callable_name, | ||
| 131 | }] | ||
| 132 | |||
| 133 | msg = (conf.DEFAULT_QUEUE_NAME, '', json.dumps(run_msg)) | ||
| 134 | |||
| 135 | send_emqp_router_message(sock, jm_addr, 'REQUEST', msg) | ||
| 136 | time.sleep(.1) # give time for the job to start up. | ||
| 137 | self.assertEqual(len(self.jm.active_jobs), 1) | ||
| 138 | |||
| 139 | def cleanup(self): | ||
| 140 | self.jm.on_disconnect(None, None) | ||
| 141 | |||
| 142 | |||
| 143 | def start_jm(jm): | ||
| 144 | jm.start(ADDR) | ||
| 145 | |||
| 146 | |||
| 147 | def pretend_job(): | ||
| 148 | time.sleep(1) | ||
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py new file mode 100644 index 0000000..11ca9ec --- /dev/null +++ b/eventmq/tests/test_router.py | |||
| @@ -0,0 +1,42 @@ | |||
| 1 | # This file is part of eventmq. | ||
| 2 | # | ||
| 3 | # eventmq is free software: you can redistribute it and/or modify | ||
| 4 | # it under the terms of the GNU General Public License as published by | ||
| 5 | # the Free Software Foundation, either version 3 of the License, or | ||
| 6 | # (at your option) any later version. | ||
| 7 | # | ||
| 8 | # eventmq is distributed in the hope that it will be useful, | ||
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| 11 | # GNU General Public License for more details. | ||
| 12 | # | ||
| 13 | # You should have received a copy of the GNU General Public License | ||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | ||
| 15 | import threading | ||
| 16 | import unittest | ||
| 17 | |||
| 18 | import mock | ||
| 19 | |||
| 20 | from .. import router | ||
| 21 | |||
| 22 | ADDR = 'inproc://kodak_film_festival' | ||
| 23 | |||
| 24 | |||
| 25 | class TestCase(unittest.TestCase): | ||
| 26 | def setUp(self): | ||
| 27 | self.router = router.Router() | ||
| 28 | |||
| 29 | self.thread = threading.Thread(target=start_router, | ||
| 30 | args=(self.router,)) | ||
| 31 | |||
| 32 | self.addCleanup(self.cleanup) | ||
| 33 | |||
| 34 | @mock.patch('signal.signal') | ||
| 35 | def test_start(self, mock_signal_signal): | ||
| 36 | self.thread.start() | ||
| 37 | |||
| 38 | def cleanup(self): | ||
| 39 | self.router.on_disconnect(None, None) | ||
| 40 | |||
| 41 | def start_router(router): | ||
| 42 | router.start(ADDR) | ||
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py index 0fc8042..7ed36d7 100644 --- a/eventmq/tests/test_utils.py +++ b/eventmq/tests/test_utils.py | |||
| @@ -52,3 +52,6 @@ class TestCase(unittest.TestCase): | |||
| 52 | broken_message = ('dlkajfs', 'lkasdjf') | 52 | broken_message = ('dlkajfs', 'lkasdjf') |
| 53 | with self.assertRaises(exceptions.InvalidMessageError): | 53 | with self.assertRaises(exceptions.InvalidMessageError): |
| 54 | messages.parse_router_message(broken_message) | 54 | messages.parse_router_message(broken_message) |
| 55 | |||
| 56 | def test_parse_router_message(self): | ||
| 57 | # A well-formed router INFORM should parse without raising | ||
| 58 | message = ['aef451a0-5cef-4f03-818a-221061c8ab68', '', 'eMQP/1.0', 'INFORM', | ||
| 59 | '5caeb5fd-15d4-4b08-89e8-4e536672eef3', 'default', 'worker'] | ||
| 60 | messages.parse_router_message(message) | ||
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index 3459c65..cfd06d5 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -21,13 +21,198 @@ import logging | |||
| 21 | 21 | ||
| 22 | import zmq.error | 22 | import zmq.error |
| 23 | 23 | ||
| 24 | from .. import conf, exceptions | 24 | from .. import conf, constants, exceptions, poller, utils |
| 25 | from ..utils.messages import send_emqp_message as sendmsg | 25 | from ..utils.messages import send_emqp_message as sendmsg |
| 26 | from ..utils.timeutils import monotonic, timestamp | 26 | from ..utils.timeutils import monotonic, timestamp |
| 27 | 27 | ||
| 28 | logger = logging.getLogger(__name__) | 28 | logger = logging.getLogger(__name__) |
| 29 | 29 | ||
| 30 | 30 | ||
| 31 | class EMQPService(object): | ||
| 32 | """ | ||
| 33 | Helper for devices that connect to brokers. | ||
| 34 | |||
| 35 | Implements utility methods for sending EMQP messages for the following | ||
| 36 | EMQP commands. | ||
| 37 | - INFORM | ||
| 38 | |||
| 39 | Also implements utility methods for managing long-running processes. | ||
| 40 | |||
| 41 | To use you must define: | ||
| 42 | - `self.outgoing` - socket where messages can be sent to the Router | ||
| 43 | - `self.SERVICE_TYPE` - defines the service type for INFORM. See | ||
| 44 | :meth:`send_inform` for more information. | ||
| 45 | - `self.poller` - the poller that `self.outgoing` will be using. | ||
| 46 | Usually: `self.poller = eventmq.poller.Poller()` | ||
| 47 | |||
| 48 | When messages are received from the router, they are processed in | ||
| 49 | :meth:`process_message` which then calls `on_COMMAND`. This should be used | ||
| 50 | in the event loop so if you want to respond to the SCHEDULE command, you | ||
| 51 | would define the method `on_schedule` in your service class. | ||
| 52 | |||
| 53 | See the code for :class:`Scheduler` and :class:`JobManager` for examples. | ||
| 54 | """ | ||
| 55 | def send_inform(self, queue=None): | ||
| 56 | """ | ||
| 57 | Queues an INFORM command to `self.outgoing`. | ||
| 58 | |||
| 59 | Args: | ||
| 60 | queue (list): | ||
| 61 | - For the 'worker' service type, the queues the worker | ||
| 62 | is listening on | ||
| 63 | - Ignored for the 'scheduler' service type | ||
| 64 | ||| 
| 65 | Raises: | ||
| 66 | ValueError: When `self.SERVICE_TYPE` is not a valid client type | ||
| 67 | """ | ||
| 68 | valid_types = (constants.CLIENT_TYPE.worker, | ||
| 69 | constants.CLIENT_TYPE.scheduler) | ||
| 70 | |||
| 71 | if self.SERVICE_TYPE not in valid_types: | ||
| 72 | raise ValueError('{} not one of {}'.format(self.SERVICE_TYPE, | ||
| 73 | valid_types)) | ||
| 74 | |||
| 75 | sendmsg(self.outgoing, 'INFORM', [ | ||
| 76 | queue or conf.DEFAULT_QUEUE_NAME, | ||
| 77 | self.SERVICE_TYPE | ||
| 78 | ]) | ||
| 79 | |||
| 80 | # If heartbeating is active, update the last heartbeat time | ||
| 81 | if hasattr(self, '_meta') and 'last_sent_heartbeat' in self._meta: | ||
| 82 | self._meta['last_sent_heartbeat'] = monotonic() | ||
| 83 | |||
| 84 | def _setup(self): | ||
| 85 | """ | ||
| 86 | Prepares the service to connect to a broker. Actions that must | ||
| 87 | also run on a reset are here. | ||
| 88 | """ | ||
| 89 | # Look for incoming events | ||
| 90 | self.poller.register(self.outgoing, poller.POLLIN) | ||
| 91 | self.awaiting_startup_ack = False | ||
| 92 | self.received_disconnect = False | ||
| 93 | |||
| 94 | self.status = constants.STATUS.ready | ||
| 95 | |||
| 96 | def start(self, addr, queues=conf.DEFAULT_QUEUE_NAME): | ||
| 97 | """ | ||
| 98 | Connect to `addr` and begin listening for job requests | ||
| 99 | |||
| 100 | Args: | ||
| 101 | addr (str): connection string to connect to | ||
| 102 | """ | ||
| 103 | while not self.received_disconnect: | ||
| 104 | self.status = constants.STATUS.connecting | ||
| 105 | self.outgoing.connect(addr) | ||
| 106 | |||
| 107 | # on_ack() sets this back to False, which breaks the inner loop | ||
| 108 | # below and lets the event loop start. | ||
| 109 | self.awaiting_startup_ack = True | ||
| 110 | |||
| 111 | # Send INFORM once per connection attempt; inside the inner loop | ||
| 112 | # below it would pile up on the send buffer until a broker connects. | ||
| 113 | self.send_inform(queues) | ||
| 114 | |||
| 115 | # We don't want to accidentally start processing jobs before our | ||
| 116 | # connection has been setup completely and acknowledged. | ||
| 117 | while self.awaiting_startup_ack: | ||
| 118 | # The poller timeout is in milliseconds, so the reconnect | ||
| 119 | # timeout (in seconds) is multiplied by 1000. | ||
| 120 | events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000) | ||
| 121 | |||
| 122 | if self.outgoing in events: # A message from the Router! | ||
| 123 | msg = self.outgoing.recv_multipart() | ||
| 124 | # TODO This will silently drop messages that aren't | ||
| 125 | # ACK/DISCONNECT | ||
| 126 | if msg[2] == "ACK" or msg[2] == "DISCONNECT": | ||
| 127 | # :meth:`on_ack` will set self.awaiting_startup_ack to | ||
| 128 | # False | ||
| 129 | self.process_message(msg) | ||
| 130 | |||
| 131 | self.status = constants.STATUS.connected | ||
| 132 | logger.info('Starting event loop...') | ||
| 133 | self._start_event_loop() | ||
| 134 | # If the event loop returns, something has gone wrong; reset and | ||
| 135 | # try to reconnect unless self.received_disconnect is True. | ||
| 136 | if not self.received_disconnect: | ||
| 137 | self.reset() | ||
| 138 | |||
| 139 | logger.info('Death.') | ||
| 140 | |||
| 141 | def reset(self): | ||
| 142 | """ | ||
| 143 | Resets the current connection by closing and reopening the socket | ||
| 144 | """ | ||
| 145 | # Unregister the old socket from the poller | ||
| 146 | self.poller.unregister(self.outgoing) | ||
| 147 | |||
| 148 | # Polish up a new socket to use | ||
| 149 | self.outgoing.rebuild() | ||
| 150 | |||
| 151 | # Prepare the device to connect again | ||
| 152 | self._setup() | ||
| 153 | |||
| 154 | def process_message(self, msg): | ||
| 155 | """ | ||
| 156 | Processes a message. Processing takes form of calling an | ||
| 157 | `on_EMQP_COMMAND` method. The method must accept `msgid` and `message` | ||
| 158 | as the first arguments. | ||
| 159 | |||
| 160 | Args: | ||
| 161 | msg: The message received from the socket to parse and process. | ||
| 162 | """ | ||
| 163 | if self.is_heartbeat_enabled: | ||
| 164 | # Any received message should count as a heartbeat | ||
| 165 | self._meta['last_received_heartbeat'] = monotonic() | ||
| 166 | if self._meta['heartbeat_miss_count']: | ||
| 167 | # Reset the miss count too | ||
| 168 | self._meta['heartbeat_miss_count'] = 0 | ||
| 169 | |||
| 170 | try: | ||
| 171 | message = utils.messages.parse_message(msg) | ||
| 172 | except exceptions.InvalidMessageError: | ||
| 173 | logger.exception('Invalid message: %s' % str(msg)) | ||
| 174 | return | ||
| 175 | |||
| 176 | command = message[0].lower() | ||
| 177 | msgid = message[1] | ||
| 178 | message = message[2] | ||
| 179 | |||
| 180 | if hasattr(self, "on_%s" % command): | ||
| 181 | func = getattr(self, "on_%s" % command) | ||
| 182 | func(msgid, message) | ||
| 183 | else: | ||
| 184 | logger.warning('No handler for %s found (tried: %s)' % | ||
| 185 | (command.upper(), ('on_%s' % command))) | ||
| 186 | |||
| 187 | def on_ack(self, msgid, ackd_msgid): | ||
| 188 | """ | ||
| 189 | Sets :attr:`awaiting_startup_ack` to False | ||
| 190 | """ | ||
| 191 | # The msgid is the only frame in the message | ||
| 192 | ackd_msgid = ackd_msgid[0] | ||
| 193 | logger.info('Received ACK for router (or client) %s' % ackd_msgid) | ||
| 194 | self.awaiting_startup_ack = False | ||
| 195 | |||
| 196 | def on_disconnect(self, msgid, msg): | ||
| 197 | # To break out of the connecting loop if necessary | ||
| 198 | self.awaiting_startup_ack = False | ||
| 199 | |||
| 200 | # Event loops should check for this flag and break out | ||
| 201 | self.received_disconnect = True | ||
| 202 | |||
| 203 | @property | ||
| 204 | def is_heartbeat_enabled(self): | ||
| 205 | """ | ||
| 206 | Property to check if heartbeating is enabled, i.e. whether this | ||
| 207 | service is tracking heartbeat metadata in `_meta`. | ||
| 208 | Returns: | ||
| 209 | bool - True if heartbeating is enabled, False if it isn't | ||
| 210 | """ | ||
| 211 | if hasattr(self, '_meta') and 'last_sent_heartbeat' in self._meta: | ||
| 212 | return True | ||
| 213 | return False | ||
| 214 | |||
| 215 | |||
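The EMQPService docstring above spells out the contract: provide `outgoing`, `poller`, and `SERVICE_TYPE`, and `process_message()` dispatches each incoming command to an `on_<command>` handler by naming convention. A minimal sketch of a subclass follows; the `EchoWorker` and `DealerSocket` names, the DEALER wrapper itself, and the handler body are hypothetical and written only to illustrate the pattern (the real services use the project's own socket classes):

    import zmq

    from eventmq import constants, poller
    from eventmq.utils.classes import (EMQPService, ZMQReceiveMixin,
                                       ZMQSendMixin)


    class DealerSocket(ZMQReceiveMixin, ZMQSendMixin):
        # Hypothetical stand-in for the project's outgoing socket wrapper;
        # EMQPService only needs connect(), rebuild() and recv_multipart().
        def __init__(self):
            super(DealerSocket, self).__init__()
            self.zsocket = zmq.Context.instance().socket(zmq.DEALER)

        def connect(self, addr):
            self.zsocket.connect(addr)

        def rebuild(self):
            self.zsocket.close()
            self.zsocket = zmq.Context.instance().socket(zmq.DEALER)


    class EchoWorker(EMQPService):
        SERVICE_TYPE = constants.CLIENT_TYPE.worker

        def __init__(self):
            self.outgoing = DealerSocket()
            self.poller = poller.Poller()
            self._setup()

        def on_request(self, msgid, message):
            # process_message() routes REQUEST commands here by convention
            print('REQUEST %s: %s' % (msgid, message))

        def _start_event_loop(self):
            # See the loop sketched after maybe_send_heartbeat() below
            pass

With the above, `EchoWorker().start('tcp://127.0.0.1:47290')` (an illustrative address) would connect, INFORM the broker as a worker, wait for the ACK, and then hand every subsequent command to the matching `on_<command>` method.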
| 31 | class HeartbeatMixin(object): | 216 | class HeartbeatMixin(object): |
| 32 | """ | 217 | """ |
| 33 | Provides methods for implementing heartbeats | 218 | Provides methods for implementing heartbeats |
| @@ -36,6 +221,7 @@ class HeartbeatMixin(object): | |||
| 36 | """ | 221 | """ |
| 37 | Sets up some variables to track the state of heartbeaty things | 222 | Sets up some variables to track the state of heartbeaty things |
| 38 | """ | 223 | """ |
| 224 | super(HeartbeatMixin, self).__init__() | ||
| 39 | if not hasattr(self, '_meta'): | 225 | if not hasattr(self, '_meta'): |
| 40 | self._meta = {} | 226 | self._meta = {} |
| 41 | 227 | ||
| @@ -91,6 +277,25 @@ class HeartbeatMixin(object): | |||
| 91 | 277 | ||
| 92 | return False | 278 | return False |
| 93 | 279 | ||
| 280 | def maybe_send_heartbeat(self, events): | ||
| 281 | # TODO: Optimization: Move the method calls into another thread so | ||
| 282 | # they don't block the event loop | ||
| 283 | if not conf.DISABLE_HEARTBEATS: | ||
| 284 | now = monotonic() | ||
| 285 | # Send a HEARTBEAT if necessary | ||
| 286 | if now - self._meta['last_sent_heartbeat'] >= \ | ||
| 287 | conf.HEARTBEAT_INTERVAL: | ||
| 288 | self.send_heartbeat(self.outgoing) | ||
| 289 | |||
| 290 | # If heartbeats have been missed and nothing is waiting on the | ||
| 291 | # socket, treat the broker as dead | ||
| 292 | if self.is_dead() and not events: | ||
| 293 | logger.critical( | ||
| 294 | 'The broker appears to have gone away. ' | ||
| 295 | 'Reconnecting...') | ||
| 296 | return False | ||
| 297 | return True | ||
| 298 | |||
| 94 | 299 | ||
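`maybe_send_heartbeat()` is meant to be called once per pass through a service's event loop when `HeartbeatMixin` is in use: it re-sends a HEARTBEAT whenever `conf.HEARTBEAT_INTERVAL` has elapsed and returns False once the broker looks dead, so the loop can bail out and let `start()` reconnect. A rough, hypothetical loop body illustrating that call pattern (not taken from the patch):

    def _start_event_loop(self):
        # Hypothetical skeleton of a service event loop using the mixin
        while True:
            if self.received_disconnect:
                break

            # Poller timeout is in milliseconds
            events = self.poller.poll(1000)

            # Returns False when the broker has gone away; break so that
            # start() can reset the socket and reconnect
            if not self.maybe_send_heartbeat(events):
                break

            if self.outgoing in events:
                self.process_message(self.outgoing.recv_multipart())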
| 95 | class ZMQReceiveMixin(object): | 300 | class ZMQReceiveMixin(object): |
| 96 | """ | 301 | """ |
| @@ -162,7 +367,7 @@ class ZMQSendMixin(object): | |||
| 162 | if conf.SUPER_DEBUG: | 367 | if conf.SUPER_DEBUG: |
| 163 | # If it's not at least 4 frames long then most likely it isn't an | 368 | # If it's not at least 4 frames long then most likely it isn't an |
| 164 | # eventmq message | 369 | # eventmq message |
| 165 | if len(msg) == 4 and \ | 370 | if len(msg) > 4 and \ |
| 166 | not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ | 371 | not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ |
| 167 | not conf.HIDE_HEARTBEAT_LOGS: | 372 | not conf.HIDE_HEARTBEAT_LOGS: |
| 168 | logger.debug('Sending message: %s' % str(msg)) | 373 | logger.debug('Sending message: %s' % str(msg)) |
diff --git a/eventmq/utils/timeutils.py b/eventmq/utils/timeutils.py index 9d494eb..70fd6ce 100644 --- a/eventmq/utils/timeutils.py +++ b/eventmq/utils/timeutils.py | |||
| @@ -44,3 +44,45 @@ def seconds_until(ts): | |||
| 44 | time.time() | 44 | time.time() |
| 45 | """ | 45 | """ |
| 46 | return ts - timestamp() | 46 | return ts - timestamp() |
| 47 | |||
| 48 | |||
| 49 | class IntervalIter(object): | ||
| 50 | """ | ||
| 51 | Represents an interval (in seconds) and its next execution time | ||
| 52 | |||
| 53 | Usage: | ||
| 54 | # interval of 5min using monotonic clock (assume it starts at 0 for the | ||
| 55 | # sake of the example) | ||
| 56 | interval = IntervalIter(monotonic, 300) | ||
| 57 | # Py2 | ||
| 58 | |||
| 59 | interval.next() # 300 | ||
| 60 | interval.next() # 600 | ||
| 61 | |||
| 62 | # Py3 | ||
| 63 | next(interval) # 300 | ||
| 64 | next(interval) # 600 | ||
| 65 | """ | ||
| 66 | def __init__(self, start_value, interval_secs): | ||
| 67 | """ | ||
| 68 | Args: | ||
| 69 | start_value (numeric) - the timestamp to begin with, usually | ||
| 70 | obtained via :func:`monotonic` or :func:`timestamp` | ||
| 71 | interval_secs (int) - the number of seconds between intervals | ||
| 72 | """ | ||
| 73 | self.current = start_value | ||
| 74 | self.interval_secs = interval_secs | ||
| 75 | |||
| 76 | # Advance once so the first external call to .next() returns | ||
| 77 | # start_value + interval_secs | ||
| 78 | self.__next__() | ||
| 79 | |||
| 80 | def __iter__(self): | ||
| 81 | return self | ||
| 82 | |||
| 83 | def __next__(self): # Py3 | ||
| 84 | self.current += self.interval_secs | ||
| 85 | return self.current - self.interval_secs | ||
| 86 | |||
| 87 | def next(self): | ||
| 88 | return self.__next__() | ||
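A small, self-contained sketch of how `IntervalIter` might drive a recurring task from a polling loop; the 10-second interval, the loop bound, and the job body are made up for illustration:

    import time

    from eventmq.utils.timeutils import IntervalIter, monotonic

    # Fire roughly every 10 seconds, starting 10 seconds from now
    interval = IntervalIter(monotonic(), 10)
    next_run = next(interval)

    for _ in range(100):                # bounded only to keep the example finite
        if monotonic() >= next_run:
            print('running the scheduled job')
            next_run = next(interval)   # advance to the next due time
        time.sleep(0.5)                 # poll twice a second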
diff --git a/requirements.txt b/requirements.txt index ada0351..8872619 100644 --- a/requirements.txt +++ b/requirements.txt | |||
| @@ -1,7 +1,8 @@ | |||
| 1 | pyzmq==14.6.0 | 1 | pyzmq==15.1.0 |
| 2 | six==1.10.0 | 2 | six==1.10.0 |
| 3 | monotonic==0.4 # A clock whose time is not changed. Used for scheduling | 3 | monotonic==0.4 # A clock whose time is not changed. Used for scheduling |
| 4 | croniter==0.3.10 | 4 | croniter==0.3.10 |
| 5 | redis==2.10.3 | ||
| 5 | 6 | ||
| 6 | # Documentation | 7 | # Documentation |
| 7 | sphinxcontrib-napoleon==0.4.3 | 8 | sphinxcontrib-napoleon==0.4.3 |
| @@ -11,3 +12,4 @@ Sphinx==1.3.1 # must come after napoleon to get the latest version | |||
| 11 | nose==1.3.6 | 12 | nose==1.3.6 |
| 12 | coverage==4.0.3 | 13 | coverage==4.0.3 |
| 13 | testfixtures==4.7.0 # To test that logging exists | 14 | testfixtures==4.7.0 # To test that logging exists |
| 15 | mock==1.3.0 | ||