diff options
| -rwxr-xr-x | bin/scheduler | 2 | ||||
| -rwxr-xr-x | bin/worker | 2 | ||||
| -rw-r--r-- | docs/protocol.rst | 36 | ||||
| -rw-r--r-- | eventmq/client/messages.py | 76 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 128 | ||||
| -rw-r--r-- | eventmq/router.py | 13 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 26 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 168 |
8 files changed, 310 insertions, 141 deletions
diff --git a/bin/scheduler b/bin/scheduler index 33f164e..6be810c 100755 --- a/bin/scheduler +++ b/bin/scheduler | |||
| @@ -6,4 +6,4 @@ from eventmq.scheduler import Scheduler | |||
| 6 | if __name__ == "__main__": | 6 | if __name__ == "__main__": |
| 7 | setup_logger("eventmq") | 7 | setup_logger("eventmq") |
| 8 | s = Scheduler() | 8 | s = Scheduler() |
| 9 | s.start() | 9 | s.start(addr='tcp://127.0.0.1:47290') |
| @@ -6,4 +6,4 @@ from eventmq.log import setup_logger | |||
| 6 | if __name__ == "__main__": | 6 | if __name__ == "__main__": |
| 7 | setup_logger('') | 7 | setup_logger('') |
| 8 | j = JobManager() | 8 | j = JobManager() |
| 9 | j.start() | 9 | j.start(addr='tcp://127.0.0.1:47291') |
diff --git a/docs/protocol.rst b/docs/protocol.rst index e468b06..a0ab6be 100644 --- a/docs/protocol.rst +++ b/docs/protocol.rst | |||
| @@ -90,6 +90,34 @@ FRAME Value Description | |||
| 90 | 6 _MSG_ The message to send | 90 | 6 _MSG_ The message to send |
| 91 | ====== ============== =========== | 91 | ====== ============== =========== |
| 92 | 92 | ||
| 93 | A **SCHEDULE** command consists of a 7-frame multipart message, formatted as follows. | ||
| 94 | |||
| 95 | ====== ============== =========== | ||
| 96 | FRAME Value Description | ||
| 97 | ====== ============== =========== | ||
| 98 | 0 _EMPTY_ leave empty | ||
| 99 | 1 eMQP/1.0 Protocol version | ||
| 100 | 2 SCHEDULE command | ||
| 101 | 3 _MSGID_ A unique id for the msg | ||
| 102 | 4 _QUEUE_NAME_ csv seperated names of queue the worker belongs to | ||
| 103 | 5 _MSG_ The message to send | ||
| 104 | ====== ============== =========== | ||
| 105 | |||
| 106 | eMQP / Scheduler | ||
| 107 | ---------------- | ||
| 108 | An **INFORM** command consists of a 6-frame multipart message, formatted as follows. | ||
| 109 | |||
| 110 | ====== ============== =========== | ||
| 111 | FRAME Value Description | ||
| 112 | ====== ============== =========== | ||
| 113 | 0 _EMPTY_ leave empty | ||
| 114 | 1 eMQP/1.0 Protocol version | ||
| 115 | 2 INFORM command | ||
| 116 | 3 _MSGID_ A unique id for the msg | ||
| 117 | 4 _QUEUE_NAME_ csv seperated names of queue the worker belongs to | ||
| 118 | 5 scheduler type of peer connecting | ||
| 119 | ====== ============== =========== | ||
| 120 | |||
| 93 | eMQP / Worker | 121 | eMQP / Worker |
| 94 | ------------- | 122 | ------------- |
| 95 | An **INFORM** command consists of a 5-frame multipart message, formatted as follows. | 123 | An **INFORM** command consists of a 5-frame multipart message, formatted as follows. |
| @@ -102,6 +130,7 @@ FRAME Value Description | |||
| 102 | 2 INFORM command | 130 | 2 INFORM command |
| 103 | 3 _MSGID_ A unique id for the msg | 131 | 3 _MSGID_ A unique id for the msg |
| 104 | 4 _QUEUE_NAME_ csv seperated names of queue the worker belongs to | 132 | 4 _QUEUE_NAME_ csv seperated names of queue the worker belongs to |
| 133 | 5 worker type of peer connecting | ||
| 105 | ====== ============== =========== | 134 | ====== ============== =========== |
| 106 | 135 | ||
| 107 | A **READY** frame consists of a 4-frame multipart message, formatted as follows. | 136 | A **READY** frame consists of a 4-frame multipart message, formatted as follows. |
| @@ -154,11 +183,14 @@ Heartbeating | |||
| 154 | ------------ | 183 | ------------ |
| 155 | * HEARTBEAT commands are valid at any time after an INFORM command | 184 | * HEARTBEAT commands are valid at any time after an INFORM command |
| 156 | * Any command except DISCONNECT act as a heartbeat. Peers SHOULD NOT send HEARTBEAT commands while sending other commands. | 185 | * Any command except DISCONNECT act as a heartbeat. Peers SHOULD NOT send HEARTBEAT commands while sending other commands. |
| 157 | * Both worker and broker MUST send heartbeats at regular and agreed-upon intervals. | 186 | * Worker and broker MUST send heartbeats at regular and agreed-upon intervals. |
| 187 | * Scheduler and broker MUST send heartbeats at regular and agreed-upon intervals. | ||
| 158 | * If the worker detects that the broker disconnected it SHOULD restart the conversation. | 188 | * If the worker detects that the broker disconnected it SHOULD restart the conversation. |
| 159 | * If the broker detects that a worker has disconnected it should stop sending it a message of any type. | 189 | * If the broker detects that a worker has disconnected it should stop sending it a message of any type. |
| 190 | * If the scheduler detects that the broker disconnects it SHOULD restart the conversation. | ||
| 191 | * If the broker detects that a scheduler has disconnected it should ??????????. | ||
| 160 | 192 | ||
| 161 | Request Headers | 193 | REQUEST Headers |
| 162 | --------------- | 194 | --------------- |
| 163 | Headers MUST be 0 to many comma seperated values inserted into the header field. If there are no headers requried, send an empty string MUST be sent where headers are required. | 195 | Headers MUST be 0 to many comma seperated values inserted into the header field. If there are no headers requried, send an empty string MUST be sent where headers are required. |
| 164 | 196 | ||
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py index 66af8a9..801e861 100644 --- a/eventmq/client/messages.py +++ b/eventmq/client/messages.py | |||
| @@ -26,6 +26,62 @@ from ..utils.messages import send_emqp_message | |||
| 26 | logger = logging.getLogger(__name__) | 26 | logger = logging.getLogger(__name__) |
| 27 | 27 | ||
| 28 | 28 | ||
| 29 | def schedule(socket, func, interval_mins, args=(), kwargs=None, class_args=(), | ||
| 30 | class_kwargs=None, queue=conf.DEFAULT_QUEUE_NAME): | ||
| 31 | """ | ||
| 32 | Execute a task on a defined interval. | ||
| 33 | |||
| 34 | Args: | ||
| 35 | socket (socket): eventmq socket to use for sending the message | ||
| 36 | func (callable): the callable to be scheduled on a worker | ||
| 37 | minutes (int): minutes to wait in between executions | ||
| 38 | args (list): list of *args to pass to the callable | ||
| 39 | kwargs (dict): dict of **kwargs to pass to the callable | ||
| 40 | class_args (list): list of *args to pass to the class (if applicable) | ||
| 41 | class_kwargs (dict): dict of **kwargs to pass to the class (if | ||
| 42 | applicable) | ||
| 43 | queue (str): name of the queue to use when executing the job. The | ||
| 44 | default value is the default queue. | ||
| 45 | """ | ||
| 46 | if not class_kwargs: | ||
| 47 | class_kwargs = {} | ||
| 48 | if not kwargs: | ||
| 49 | kwargs = {} | ||
| 50 | |||
| 51 | if callable(func): | ||
| 52 | path, callable_name = build_module_path(func) | ||
| 53 | else: | ||
| 54 | logger.error('Encountered non-callable func: {}'.format(func)) | ||
| 55 | return False | ||
| 56 | |||
| 57 | if not callable_name: | ||
| 58 | logger.error('Encountered callable with no name in {}'.format( | ||
| 59 | func.__module__ | ||
| 60 | )) | ||
| 61 | return False | ||
| 62 | |||
| 63 | if not path: | ||
| 64 | logger.error('Encountered callable with no __module__ path {}'.format( | ||
| 65 | func.__name__ | ||
| 66 | )) | ||
| 67 | return False | ||
| 68 | |||
| 69 | # TODO: convert all the times to seconds for the clock | ||
| 70 | |||
| 71 | # TODO: send the schedule request | ||
| 72 | |||
| 73 | msg = ['run', { | ||
| 74 | 'callable': callable_name, | ||
| 75 | 'path': path, | ||
| 76 | 'args': args, | ||
| 77 | 'kwargs': kwargs, | ||
| 78 | 'class_args': class_args, | ||
| 79 | 'class_kwargs': class_kwargs, | ||
| 80 | }] | ||
| 81 | |||
| 82 | send_schedule_request(socket, 300, msg, queue) | ||
| 83 | |||
| 84 | |||
| 29 | def defer_job(socket, func, args=(), kwargs=None, class_args=(), | 85 | def defer_job(socket, func, args=(), kwargs=None, class_args=(), |
| 30 | class_kwargs=None, reply_requested=False, guarantee=False, | 86 | class_kwargs=None, reply_requested=False, guarantee=False, |
| 31 | retry_count=0, queue=conf.DEFAULT_QUEUE_NAME): | 87 | retry_count=0, queue=conf.DEFAULT_QUEUE_NAME): |
| @@ -94,7 +150,7 @@ def defer_job(socket, func, args=(), kwargs=None, class_args=(), | |||
| 94 | send_request(socket, msg, reply_requested=reply_requested, | 150 | send_request(socket, msg, reply_requested=reply_requested, |
| 95 | guarantee=guarantee, retry_count=retry_count, queue=queue) | 151 | guarantee=guarantee, retry_count=retry_count, queue=queue) |
| 96 | 152 | ||
| 97 | return True | 153 | return True # The message has successfully been queued for delivery |
| 98 | 154 | ||
| 99 | 155 | ||
| 100 | def build_module_path(func): | 156 | def build_module_path(func): |
| @@ -196,6 +252,24 @@ def send_request(socket, message, reply_requested=False, guarantee=False, | |||
| 196 | ) | 252 | ) |
| 197 | 253 | ||
| 198 | 254 | ||
| 255 | def send_schedule_request(socket, interval_secs, message, queue=None): | ||
| 256 | """ | ||
| 257 | Send a SCHEDULE command. | ||
| 258 | |||
| 259 | Queues a message requesting that something happens on an | ||
| 260 | interval for the scheduler. | ||
| 261 | |||
| 262 | Args: | ||
| 263 | socket (socket): | ||
| 264 | interval_secs (int): | ||
| 265 | message: Message to send socket. | ||
| 266 | """ | ||
| 267 | send_emqp_message(socket, 'SCHEDULE', | ||
| 268 | (queue or conf.DEFAULT_QUEUE_NAME, | ||
| 269 | str(interval_secs), | ||
| 270 | serialize(message))) | ||
| 271 | |||
| 272 | |||
| 199 | def job(block=False): # Move to decorators.py | 273 | def job(block=False): # Move to decorators.py |
| 200 | """ | 274 | """ |
| 201 | run the decorated function on a worker | 275 | run the decorated function on a worker |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 9ea6800..6244c02 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -20,26 +20,26 @@ Ensures things about jobs and spawns the actual tasks | |||
| 20 | import json | 20 | import json |
| 21 | import logging | 21 | import logging |
| 22 | 22 | ||
| 23 | from . import conf, constants, exceptions, utils | 23 | from . import conf |
| 24 | from .poller import Poller, POLLIN | 24 | from .poller import Poller, POLLIN |
| 25 | from .sender import Sender | 25 | from .sender import Sender |
| 26 | from .utils.classes import HeartbeatMixin | 26 | from .utils.classes import EMQPService, HeartbeatMixin |
| 27 | from .utils.devices import generate_device_name | 27 | from .utils.devices import generate_device_name |
| 28 | from .utils.messages import send_emqp_message as sendmsg | 28 | from .utils.messages import send_emqp_message as sendmsg |
| 29 | import utils.messages | ||
| 30 | from .utils.timeutils import monotonic | 29 | from .utils.timeutils import monotonic |
| 31 | from .worker import MultiprocessWorker as Worker | 30 | from .worker import MultiprocessWorker as Worker |
| 32 | 31 | ||
| 33 | logger = logging.getLogger(__name__) | 32 | logger = logging.getLogger(__name__) |
| 34 | 33 | ||
| 35 | 34 | ||
| 36 | class JobManager(HeartbeatMixin): | 35 | class JobManager(HeartbeatMixin, EMQPService): |
| 37 | """ | 36 | """ |
| 38 | The exposed portion of the worker. The job manager's main responsibility is | 37 | The exposed portion of the worker. The job manager's main responsibility is |
| 39 | to manage the resources on the server it's running. | 38 | to manage the resources on the server it's running. |
| 40 | 39 | ||
| 41 | This job manager uses tornado's eventloop. | 40 | This job manager uses tornado's eventloop. |
| 42 | """ | 41 | """ |
| 42 | SERVICE_TYPE = 'worker' | ||
| 43 | 43 | ||
| 44 | def __init__(self, *args, **kwargs): | 44 | def __init__(self, *args, **kwargs): |
| 45 | """ | 45 | """ |
| @@ -64,7 +64,8 @@ class JobManager(HeartbeatMixin): | |||
| 64 | #: JobManager starts out by INFORMing the router of it's existance, | 64 | #: JobManager starts out by INFORMing the router of it's existance, |
| 65 | #: then telling the router that it is READY. The reply will be the unit | 65 | #: then telling the router that it is READY. The reply will be the unit |
| 66 | #: of work. | 66 | #: of work. |
| 67 | self.incoming = Sender() | 67 | # Despite the name, jobs are received on this socket |
| 68 | self.outgoing = Sender() | ||
| 68 | 69 | ||
| 69 | #: Jobs that are running should be stored in `active_jobs`. There | 70 | #: Jobs that are running should be stored in `active_jobs`. There |
| 70 | #: should always be at most `available_workers` count of active jobs. | 71 | #: should always be at most `available_workers` count of active jobs. |
| @@ -75,81 +76,21 @@ class JobManager(HeartbeatMixin): | |||
| 75 | 76 | ||
| 76 | self._setup() | 77 | self._setup() |
| 77 | 78 | ||
| 78 | def _setup(self): | ||
| 79 | """ | ||
| 80 | Prepares JobManager ready to connect to a broker. Actions that must | ||
| 81 | also run on a reset are here. | ||
| 82 | """ | ||
| 83 | # Look for incoming events | ||
| 84 | self.poller.register(self.incoming, POLLIN) | ||
| 85 | self.awaiting_startup_ack = False | ||
| 86 | |||
| 87 | self.status = constants.STATUS.ready | ||
| 88 | |||
| 89 | def start(self, addr='tcp://127.0.0.1:47291'): | ||
| 90 | """ | ||
| 91 | Connect to `addr` and begin listening for job requests | ||
| 92 | |||
| 93 | Args: | ||
| 94 | addr (str): connection string to connect to | ||
| 95 | """ | ||
| 96 | while True: | ||
| 97 | self.status = constants.STATUS.connecting | ||
| 98 | self.incoming.connect(addr) | ||
| 99 | |||
| 100 | self.awaiting_startup_ack = True | ||
| 101 | self.send_inform() | ||
| 102 | |||
| 103 | # We don't want to accidentally start processing jobs before our | ||
| 104 | # connection has been setup completely and acknowledged. | ||
| 105 | while self.awaiting_startup_ack: | ||
| 106 | # Poller timeout is in ms so the reconnect timeout is | ||
| 107 | # multiplied by 1000 to get seconds | ||
| 108 | events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000) | ||
| 109 | |||
| 110 | if self.incoming in events: # A message from the Router! | ||
| 111 | msg = self.incoming.recv_multipart() | ||
| 112 | # TODO This will silently drop messages that aren't ACK | ||
| 113 | if msg[2] == "ACK": | ||
| 114 | # :meth:`on_ack` will set self.awaiting_startup_ack to | ||
| 115 | # False | ||
| 116 | self.process_message(msg) | ||
| 117 | |||
| 118 | # Acknowledgment has come | ||
| 119 | # Send a READY for each available worker | ||
| 120 | for i in range(0, self.available_workers): | ||
| 121 | self.send_ready() | ||
| 122 | |||
| 123 | self.status = constants.STATUS.connected | ||
| 124 | logger.info('Starting to listen for jobs') | ||
| 125 | self._start_event_loop() | ||
| 126 | # When we return, soemthing has gone wrong and we should try to | ||
| 127 | # reconnect | ||
| 128 | self.reset() | ||
| 129 | |||
| 130 | def reset(self): | ||
| 131 | """ | ||
| 132 | Resets the current connection by closing and reopening the socket | ||
| 133 | """ | ||
| 134 | # Unregister the old socket from the poller | ||
| 135 | self.poller.unregister(self.incoming) | ||
| 136 | |||
| 137 | # Polish up a new socket to use | ||
| 138 | self.incoming.rebuild() | ||
| 139 | |||
| 140 | # Prepare the device to connect again | ||
| 141 | self._setup() | ||
| 142 | |||
| 143 | def _start_event_loop(self): | 79 | def _start_event_loop(self): |
| 144 | """ | 80 | """ |
| 145 | Starts the actual eventloop. Usually called by :meth:`JobManager.start` | 81 | Starts the actual eventloop. Usually called by :meth:`start` |
| 146 | """ | 82 | """ |
| 83 | # Acknowledgment has come | ||
| 84 | # Send a READY for each available worker | ||
| 85 | for i in range(0, self.available_workers): | ||
| 86 | self.send_ready() | ||
| 87 | |||
| 147 | while True: | 88 | while True: |
| 148 | now = monotonic() | 89 | now = monotonic() |
| 149 | events = self.poller.poll() | 90 | events = self.poller.poll() |
| 150 | 91 | ||
| 151 | if events.get(self.incoming) == POLLIN: | 92 | if events.get(self.outgoing) == POLLIN: |
| 152 | msg = self.incoming.recv_multipart() | 93 | msg = self.outgoing.recv_multipart() |
| 153 | self.process_message(msg) | 94 | self.process_message(msg) |
| 154 | 95 | ||
| 155 | # Maintain the list of active jobs | 96 | # Maintain the list of active jobs |
| @@ -165,7 +106,7 @@ class JobManager(HeartbeatMixin): | |||
| 165 | # Send a HEARTBEAT if necessary | 106 | # Send a HEARTBEAT if necessary |
| 166 | if now - self._meta['last_sent_heartbeat'] >= \ | 107 | if now - self._meta['last_sent_heartbeat'] >= \ |
| 167 | conf.HEARTBEAT_INTERVAL: | 108 | conf.HEARTBEAT_INTERVAL: |
| 168 | self.send_heartbeat(self.incoming) | 109 | self.send_heartbeat(self.outgoing) |
| 169 | 110 | ||
| 170 | # Do something about any missed HEARTBEAT, if we have nothing | 111 | # Do something about any missed HEARTBEAT, if we have nothing |
| 171 | # waiting on the socket | 112 | # waiting on the socket |
| @@ -215,49 +156,12 @@ class JobManager(HeartbeatMixin): | |||
| 215 | 156 | ||
| 216 | self.available_workers -= 1 | 157 | self.available_workers -= 1 |
| 217 | 158 | ||
| 218 | def process_message(self, msg): | ||
| 219 | """ | ||
| 220 | Processes a message | ||
| 221 | |||
| 222 | Args: | ||
| 223 | msg: The message received from the socket to parse and process. | ||
| 224 | Processing takes form of calling an `on_COMMAND` method. | ||
| 225 | """ | ||
| 226 | # Any received message should count as a heartbeat | ||
| 227 | self._meta['last_received_heartbeat'] = monotonic() | ||
| 228 | if self._meta['heartbeat_miss_count']: | ||
| 229 | self._meta['heartbeat_miss_count'] = 0 # Reset the miss count too | ||
| 230 | |||
| 231 | try: | ||
| 232 | message = utils.messages.parse_message(msg) | ||
| 233 | except exceptions.InvalidMessageError: | ||
| 234 | logger.error('Invalid message: %s' % str(msg)) | ||
| 235 | return | ||
| 236 | |||
| 237 | command = message[0] | ||
| 238 | msgid = message[1] | ||
| 239 | message = message[2] | ||
| 240 | |||
| 241 | if hasattr(self, "on_%s" % command.lower()): | ||
| 242 | func = getattr(self, "on_%s" % command.lower()) | ||
| 243 | func(msgid, message) | ||
| 244 | else: | ||
| 245 | logger.warning('No handler for %s found (tried: %s)' % | ||
| 246 | (command, ('on_%s' % command.lower()))) | ||
| 247 | |||
| 248 | def send_ready(self): | 159 | def send_ready(self): |
| 249 | """ | 160 | """ |
| 250 | send the READY command upstream to indicate that JobManager is ready | 161 | send the READY command upstream to indicate that JobManager is ready |
| 251 | for another REQUEST message. | 162 | for another REQUEST message. |
| 252 | """ | 163 | """ |
| 253 | sendmsg(self.incoming, 'READY') | 164 | sendmsg(self.outgoing, 'READY') |
| 254 | |||
| 255 | def send_inform(self, queue=None): | ||
| 256 | """ | ||
| 257 | Send an INFORM command | ||
| 258 | """ | ||
| 259 | sendmsg(self.incoming, 'INFORM', queue or conf.DEFAULT_QUEUE_NAME) | ||
| 260 | self._meta['last_sent_heartbeat'] = monotonic() | ||
| 261 | 165 | ||
| 262 | def on_ack(self, msgid, ackd_msgid): | 166 | def on_ack(self, msgid, ackd_msgid): |
| 263 | """ | 167 | """ |
diff --git a/eventmq/router.py b/eventmq/router.py index 23700bf..d896853 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -82,6 +82,11 @@ class Router(HeartbeatMixin): | |||
| 82 | #: workers available to take the job | 82 | #: workers available to take the job |
| 83 | self.waiting_messages = {} | 83 | self.waiting_messages = {} |
| 84 | 84 | ||
| 85 | #: Scheduler clients. Clients are able to send SCHEDULE commands that | ||
| 86 | #: need to be routed to a scheduler, which will keep track of time and | ||
| 87 | #: run the job. | ||
| 88 | self.schedulers = [] | ||
| 89 | |||
| 85 | def start(self, | 90 | def start(self, |
| 86 | frontend_addr='tcp://127.0.0.1:47290', | 91 | frontend_addr='tcp://127.0.0.1:47290', |
| 87 | backend_addr='tcp://127.0.0.1:47291'): | 92 | backend_addr='tcp://127.0.0.1:47291'): |
| @@ -176,9 +181,9 @@ class Router(HeartbeatMixin): | |||
| 176 | Handles an INFORM message. This happens when new worker coming online | 181 | Handles an INFORM message. This happens when new worker coming online |
| 177 | and announces itself. | 182 | and announces itself. |
| 178 | """ | 183 | """ |
| 179 | logger.info('Received INFORM request from %s' % sender) | ||
| 180 | queue_name = msg[0] | 184 | queue_name = msg[0] |
| 181 | 185 | client_type = msg[1] | |
| 186 | logger.info('Received INFORM request from {} (type: {})'.format(sender, client_type)) | ||
| 182 | self.add_worker(sender, queue_name) | 187 | self.add_worker(sender, queue_name) |
| 183 | 188 | ||
| 184 | self.send_ack(self.outgoing, sender, msgid) | 189 | self.send_ack(self.outgoing, sender, msgid) |
| @@ -305,8 +310,8 @@ class Router(HeartbeatMixin): | |||
| 305 | 310 | ||
| 306 | # If we have no workers for the queue TODO something about it | 311 | # If we have no workers for the queue TODO something about it |
| 307 | if queue_name not in self.queues: | 312 | if queue_name not in self.queues: |
| 308 | logger.warning("Received REQUEST with a queue I don't recognize: " | 313 | logger.warning("Received %s with a queue I don't recognize: " |
| 309 | "%s" % queue_name) | 314 | "%s" % (msg[3], queue_name)) |
| 310 | logger.critical("Discarding message") | 315 | logger.critical("Discarding message") |
| 311 | # TODO: Don't discard the message | 316 | # TODO: Don't discard the message |
| 312 | return | 317 | return |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index b1b3fb6..24e95f5 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -24,7 +24,8 @@ from croniter import croniter | |||
| 24 | from six import next | 24 | from six import next |
| 25 | 25 | ||
| 26 | from .sender import Sender | 26 | from .sender import Sender |
| 27 | from .utils.classes import HeartbeatMixin | 27 | from .poller import Poller |
| 28 | from .utils.classes import EMQPService, HeartbeatMixin | ||
| 28 | from .utils.timeutils import seconds_until, timestamp | 29 | from .utils.timeutils import seconds_until, timestamp |
| 29 | from .client.messages import send_request | 30 | from .client.messages import send_request |
| 30 | 31 | ||
| @@ -32,28 +33,27 @@ from .client.messages import send_request | |||
| 32 | logger = logging.getLogger(__name__) | 33 | logger = logging.getLogger(__name__) |
| 33 | 34 | ||
| 34 | 35 | ||
| 35 | class Scheduler(HeartbeatMixin): | 36 | class Scheduler(HeartbeatMixin, EMQPService): |
| 36 | """ | 37 | """ |
| 37 | Keeper of time, master of schedules | 38 | Keeper of time, master of schedules |
| 38 | """ | 39 | """ |
| 39 | 40 | SERVICE_TYPE = 'scheduler' | |
| 40 | def __init__(self, *args, **kwargs): | 41 | def __init__(self, *args, **kwargs): |
| 41 | logger.info('Initializing Scheduler...') | 42 | logger.info('Initializing Scheduler...') |
| 42 | super(Scheduler, self).__init__(*args, **kwargs) | 43 | super(Scheduler, self).__init__(*args, **kwargs) |
| 43 | self.outgoing = Sender() | 44 | self.outgoing = Sender() |
| 44 | 45 | ||
| 46 | # IDX Description | ||
| 45 | # 0 = the next ts this job should be executed | 47 | # 0 = the next ts this job should be executed |
| 46 | # 1 = the function to be executed | 48 | # 1 = the function to be executed |
| 47 | # 2 = the croniter iterator for this job | 49 | # 2 = the croniter iterator for this job |
| 48 | self.jobs = [] | 50 | self.jobs = [] |
| 49 | 51 | ||
| 52 | self.poller = Poller() | ||
| 53 | |||
| 50 | self.load_jobs() | 54 | self.load_jobs() |
| 51 | 55 | ||
| 52 | def connect(self, addr='tcp://127.0.0.1:47290'): | 56 | self._setup() |
| 53 | """ | ||
| 54 | Connect the scheduler to worker/router at `addr` | ||
| 55 | """ | ||
| 56 | self.outgoing.connect(addr) | ||
| 57 | 57 | ||
| 58 | def load_jobs(self): | 58 | def load_jobs(self): |
| 59 | """ | 59 | """ |
| @@ -75,14 +75,6 @@ class Scheduler(HeartbeatMixin): | |||
| 75 | c_next = next(c) | 75 | c_next = next(c) |
| 76 | self.jobs.append([c_next, job[1], c]) | 76 | self.jobs.append([c_next, job[1], c]) |
| 77 | 77 | ||
| 78 | def start(self, addr='tcp://127.0.0.1:47290'): | ||
| 79 | """ | ||
| 80 | Begin sending messages to execute scheduled jobs | ||
| 81 | """ | ||
| 82 | self.connect(addr) | ||
| 83 | |||
| 84 | self._start_event_loop() | ||
| 85 | |||
| 86 | def _start_event_loop(self): | 78 | def _start_event_loop(self): |
| 87 | """ | 79 | """ |
| 88 | Starts the actual event loop. Usually called by :meth:`Scheduler.start` | 80 | Starts the actual event loop. Usually called by :meth:`Scheduler.start` |
| @@ -117,6 +109,4 @@ class Scheduler(HeartbeatMixin): | |||
| 117 | def test_job(): | 109 | def test_job(): |
| 118 | print "hello!" | 110 | print "hello!" |
| 119 | print "hello!" | 111 | print "hello!" |
| 120 | print "hello!" | ||
| 121 | print "hello!" | ||
| 122 | time.sleep(4) | 112 | time.sleep(4) |
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index e98b9b7..b6ae03c 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -21,13 +21,176 @@ import logging | |||
| 21 | 21 | ||
| 22 | import zmq.error | 22 | import zmq.error |
| 23 | 23 | ||
| 24 | from .. import conf, exceptions | 24 | from .. import conf, constants, exceptions, poller, utils |
| 25 | from ..utils.messages import send_emqp_message as sendmsg | 25 | from ..utils.messages import send_emqp_message as sendmsg |
| 26 | from ..utils.timeutils import monotonic, timestamp | 26 | from ..utils.timeutils import monotonic, timestamp |
| 27 | 27 | ||
| 28 | logger = logging.getLogger(__name__) | 28 | logger = logging.getLogger(__name__) |
| 29 | 29 | ||
| 30 | 30 | ||
| 31 | class EMQPService(object): | ||
| 32 | """ | ||
| 33 | Helper for devices that connect to brokers. | ||
| 34 | |||
| 35 | Implements utility methods for sending EMQP messages for the following | ||
| 36 | EMQP commands. | ||
| 37 | - INFORM | ||
| 38 | |||
| 39 | Also implements utlitiy methods for managing long-running processes. | ||
| 40 | |||
| 41 | To use you must define: | ||
| 42 | - `self.outgoing` - socket where messages can be sent to the Router | ||
| 43 | - `self.SERVICE_TYPE` - defines the service type for INFORM. See | ||
| 44 | :meth:`send_inform` for more information. | ||
| 45 | - `self.poller` - the poller that `self.outgoing` will be using. | ||
| 46 | Usually: `self.poller = eventmq.poller.Poller()` | ||
| 47 | |||
| 48 | When messages are received from the router, they are processed in | ||
| 49 | :meth:`process_message` which then calls `on_COMMAND`, so if you want to | ||
| 50 | respond to the SCHEDULE command, you would define the method `on_schedule` | ||
| 51 | in your service class. | ||
| 52 | |||
| 53 | See the code for :class:`Scheduler` and :class:`JobManager` for examples. | ||
| 54 | """ | ||
| 55 | def send_inform(self, queue=None): | ||
| 56 | """ | ||
| 57 | Queues an INFORM command to `self.outgoing`. | ||
| 58 | |||
| 59 | Args: | ||
| 60 | type_ (str): Either 'worker' or 'scheduler' | ||
| 61 | queue (list): | ||
| 62 | - For 'worker' type, the queues the worker is listening on | ||
| 63 | - Ignored for 'scheduler' type | ||
| 64 | |||
| 65 | Raises: | ||
| 66 | ValueError: When `type_` does not match a specified type | ||
| 67 | """ | ||
| 68 | valid_types = ('worker', 'scheduler') | ||
| 69 | |||
| 70 | if self.SERVICE_TYPE not in valid_types: | ||
| 71 | raise ValueError('{} not one of {}'.format(self.SERVICE_TYPE, | ||
| 72 | valid_types)) | ||
| 73 | |||
| 74 | sendmsg(self.outgoing, 'INFORM', [ | ||
| 75 | queue or conf.DEFAULT_QUEUE_NAME, | ||
| 76 | self.SERVICE_TYPE | ||
| 77 | ]) | ||
| 78 | |||
| 79 | # If heartbeating is active, update the last heartbeat time | ||
| 80 | if hasattr(self, '_meta') and 'last_sent_heartbeat' in self._meta: | ||
| 81 | self._meta['last_sent_heartbeat'] = monotonic() | ||
| 82 | |||
| 83 | def _setup(self): | ||
| 84 | """ | ||
| 85 | Prepares the service to connect to a broker. Actions that must | ||
| 86 | also run on a reset are here. | ||
| 87 | """ | ||
| 88 | # Look for incoming events | ||
| 89 | self.poller.register(self.outgoing, poller.POLLIN) | ||
| 90 | self.awaiting_startup_ack = False | ||
| 91 | |||
| 92 | self.status = constants.STATUS.ready | ||
| 93 | |||
| 94 | def start(self, addr, queues=conf.DEFAULT_QUEUE_NAME): | ||
| 95 | """ | ||
| 96 | Connect to `addr` and begin listening for job requests | ||
| 97 | |||
| 98 | Args: | ||
| 99 | addr (str): connection string to connect to | ||
| 100 | """ | ||
| 101 | while True: | ||
| 102 | self.status = constants.STATUS.connecting | ||
| 103 | self.outgoing.connect(addr) | ||
| 104 | |||
| 105 | # Setting this to false is how the loop is broken and the | ||
| 106 | # _event_loop is started. | ||
| 107 | self.awaiting_startup_ack = True | ||
| 108 | |||
| 109 | # If this is inside the loop, then many inform messages will stack | ||
| 110 | # up on the buffer until something is actually connected to. | ||
| 111 | self.send_inform(queues) | ||
| 112 | |||
| 113 | # We don't want to accidentally start processing jobs before our | ||
| 114 | # connection has been setup completely and acknowledged. | ||
| 115 | while self.awaiting_startup_ack: | ||
| 116 | # Poller timeout is in ms so the reconnect timeout is | ||
| 117 | # multiplied by 1000 to get seconds | ||
| 118 | events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000) | ||
| 119 | |||
| 120 | if self.outgoing in events: # A message from the Router! | ||
| 121 | msg = self.outgoing.recv_multipart() | ||
| 122 | # TODO This will silently drop messages that aren't ACK | ||
| 123 | if msg[2] == "ACK": | ||
| 124 | # :meth:`on_ack` will set self.awaiting_startup_ack to | ||
| 125 | # False | ||
| 126 | self.process_message(msg) | ||
| 127 | |||
| 128 | self.status = constants.STATUS.connected | ||
| 129 | logger.info('Starting event loop...') | ||
| 130 | self._start_event_loop() | ||
| 131 | # When we return, soemthing has gone wrong and we should try to | ||
| 132 | # reconnect | ||
| 133 | self.reset() | ||
| 134 | |||
| 135 | def reset(self): | ||
| 136 | """ | ||
| 137 | Resets the current connection by closing and reopening the socket | ||
| 138 | """ | ||
| 139 | # Unregister the old socket from the poller | ||
| 140 | self.poller.unregister(self.incoming) | ||
| 141 | |||
| 142 | # Polish up a new socket to use | ||
| 143 | self.outgoing.rebuild() | ||
| 144 | |||
| 145 | # Prepare the device to connect again | ||
| 146 | self._setup() | ||
| 147 | |||
| 148 | def process_message(self, msg): | ||
| 149 | """ | ||
| 150 | Processes a message. Processing takes form of calling an | ||
| 151 | `on_EMQP_COMMAND` method. The method must accept `msgid` and `message` | ||
| 152 | as the first arguments. | ||
| 153 | |||
| 154 | Args: | ||
| 155 | msg: The message received from the socket to parse and process. | ||
| 156 | """ | ||
| 157 | if self.is_heartbeat_enabled: | ||
| 158 | # Any received message should count as a heartbeat | ||
| 159 | self._meta['last_received_heartbeat'] = monotonic() | ||
| 160 | if self._meta['heartbeat_miss_count']: | ||
| 161 | # Reset the miss count too | ||
| 162 | self._meta['heartbeat_miss_count'] = 0 | ||
| 163 | |||
| 164 | try: | ||
| 165 | message = utils.messages.parse_message(msg) | ||
| 166 | except exceptions.InvalidMessageError: | ||
| 167 | logger.exception('Invalid message: %s' % str(msg)) | ||
| 168 | return | ||
| 169 | |||
| 170 | command = message[0].lower() | ||
| 171 | msgid = message[1] | ||
| 172 | message = message[2] | ||
| 173 | |||
| 174 | if hasattr(self, "on_%s" % command): | ||
| 175 | func = getattr(self, "on_%s" % command) | ||
| 176 | func(msgid, message) | ||
| 177 | else: | ||
| 178 | logger.warning('No handler for %s found (tried: %s)' % | ||
| 179 | (command.upper(), ('on_%s' % command))) | ||
| 180 | |||
| 181 | @property | ||
| 182 | def is_heartbeat_enabled(self): | ||
| 183 | """ | ||
| 184 | Property to check if heartbeating is enabled. Useful when certain | ||
| 185 | properties must be updated for heartbeating | ||
| 186 | Returns: | ||
| 187 | bool - True if heartbeating is enabled, False if it isn't | ||
| 188 | """ | ||
| 189 | if hasattr(self, '_meta') and 'last_sent_heartbeat' in self._meta: | ||
| 190 | return True | ||
| 191 | return False | ||
| 192 | |||
| 193 | |||
| 31 | class HeartbeatMixin(object): | 194 | class HeartbeatMixin(object): |
| 32 | """ | 195 | """ |
| 33 | Provides methods for implementing heartbeats | 196 | Provides methods for implementing heartbeats |
| @@ -36,6 +199,7 @@ class HeartbeatMixin(object): | |||
| 36 | """ | 199 | """ |
| 37 | Sets up some variables to track the state of heartbeaty things | 200 | Sets up some variables to track the state of heartbeaty things |
| 38 | """ | 201 | """ |
| 202 | super(HeartbeatMixin, self).__init__() | ||
| 39 | if not hasattr(self, '_meta'): | 203 | if not hasattr(self, '_meta'): |
| 40 | self._meta = {} | 204 | self._meta = {} |
| 41 | 205 | ||
| @@ -162,7 +326,7 @@ class ZMQSendMixin(object): | |||
| 162 | if conf.SUPER_DEBUG: | 326 | if conf.SUPER_DEBUG: |
| 163 | # If it's not at least 4 frames long then most likely it isn't an | 327 | # If it's not at least 4 frames long then most likely it isn't an |
| 164 | # eventmq message | 328 | # eventmq message |
| 165 | if len(msg) == 4 and \ | 329 | if len(msg) > 4 and \ |
| 166 | not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ | 330 | not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ |
| 167 | not conf.HIDE_HEARTBEAT_LOGS: | 331 | not conf.HIDE_HEARTBEAT_LOGS: |
| 168 | logger.debug('Sending message: %s' % str(msg)) | 332 | logger.debug('Sending message: %s' % str(msg)) |