diff options
| author | jason | 2015-12-04 17:15:05 -0700 |
|---|---|---|
| committer | jason | 2015-12-04 17:15:05 -0700 |
| commit | e59910fd96b38ec930128220fbdd67af4586e0d8 (patch) | |
| tree | 32e5de7fe64794b142de6f02b67d1ea2571d781d | |
| parent | fff3c0c9337fefac954614855c11087626ae9677 (diff) | |
| download | eventmq-e59910fd96b38ec930128220fbdd67af4586e0d8.tar.gz eventmq-e59910fd96b38ec930128220fbdd67af4586e0d8.zip | |
fix to logging, send multiple messages to workers
- JobManager now replies with READY when it's 'finished' with the job to
requeue the worker in the broker.
- some improvements to logging that prevents duplicate messages
| -rwxr-xr-x | bin/router | 3 | ||||
| -rwxr-xr-x | bin/scheduler | 3 | ||||
| -rw-r--r-- | docs/protocol.rst | 6 | ||||
| -rw-r--r-- | eventmq/client/__init__.py | 0 | ||||
| -rw-r--r-- | eventmq/client/messages.py | 61 | ||||
| -rw-r--r-- | eventmq/conf.py | 5 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 34 | ||||
| -rw-r--r-- | eventmq/log.py | 18 | ||||
| -rw-r--r-- | eventmq/poller.py | 4 | ||||
| -rw-r--r-- | eventmq/receiver.py | 5 | ||||
| -rw-r--r-- | eventmq/router.py | 81 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 10 | ||||
| -rw-r--r-- | eventmq/sender.py | 5 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 28 | ||||
| -rw-r--r-- | eventmq/utils/messages.py | 29 |
15 files changed, 220 insertions, 72 deletions
| @@ -1,8 +1,9 @@ | |||
| 1 | #!/usr/bin/env python | 1 | #!/usr/bin/env python |
| 2 | # -*- mode: python -*- | 2 | # -*- mode: python -*- |
| 3 | 3 | from eventmq.log import setup_logger | |
| 4 | from eventmq.router import Router | 4 | from eventmq.router import Router |
| 5 | 5 | ||
| 6 | if __name__ == "__main__": | 6 | if __name__ == "__main__": |
| 7 | setup_logger('eventmq') | ||
| 7 | r = Router() | 8 | r = Router() |
| 8 | r.start() | 9 | r.start() |
diff --git a/bin/scheduler b/bin/scheduler index b591ce9..33f164e 100755 --- a/bin/scheduler +++ b/bin/scheduler | |||
| @@ -1,8 +1,9 @@ | |||
| 1 | #!/usr/bin/env python | 1 | #!/usr/bin/env python |
| 2 | # -*- mode: python -*- | 2 | # -*- mode: python -*- |
| 3 | 3 | from eventmq.log import setup_logger | |
| 4 | from eventmq.scheduler import Scheduler | 4 | from eventmq.scheduler import Scheduler |
| 5 | 5 | ||
| 6 | if __name__ == "__main__": | 6 | if __name__ == "__main__": |
| 7 | setup_logger("eventmq") | ||
| 7 | s = Scheduler() | 8 | s = Scheduler() |
| 8 | s.start() | 9 | s.start() |
diff --git a/docs/protocol.rst b/docs/protocol.rst index 86491dc..e468b06 100644 --- a/docs/protocol.rst +++ b/docs/protocol.rst | |||
| @@ -5,7 +5,7 @@ EventMQ Protocol Specification | |||
| 5 | 5 | ||
| 6 | Goals | 6 | Goals |
| 7 | ===== | 7 | ===== |
| 8 | The EventMQ Protocol (eMQP) defines a reliable service-oriented request-reply and pub-sub dialog between a set of clients, a broker, and a set of workers. This goal is to | 8 | The EventMQ Protocol (eMQP) defines a reliable service-oriented request-reply and pub-sub dialog between a set of clients, a broker, and a set of workers. This goal is to |
| 9 | 9 | ||
| 10 | The goals are to: | 10 | The goals are to: |
| 11 | 11 | ||
| @@ -13,7 +13,7 @@ The goals are to: | |||
| 13 | * Allow requests to be routed to workers by an abstracted service name. | 13 | * Allow requests to be routed to workers by an abstracted service name. |
| 14 | * Detect disconnected peers through heartbeating. | 14 | * Detect disconnected peers through heartbeating. |
| 15 | * Allow for message tracing and debugging. | 15 | * Allow for message tracing and debugging. |
| 16 | 16 | ||
| 17 | 17 | ||
| 18 | License | 18 | License |
| 19 | ======= | 19 | ======= |
| @@ -69,7 +69,7 @@ FRAME Value Description | |||
| 69 | ====== ============== =========== | 69 | ====== ============== =========== |
| 70 | 0 _EMPTY_ leave empty | 70 | 0 _EMPTY_ leave empty |
| 71 | 1 eMQP/1.0 Protocol version | 71 | 1 eMQP/1.0 Protocol version |
| 72 | 2 READY command | 72 | 2 REQUEST command |
| 73 | 3 _MSGID_ A unique id for the msg | 73 | 3 _MSGID_ A unique id for the msg |
| 74 | 4 _QUEUE_NAME_ the name of the queue the worker belongs to | 74 | 4 _QUEUE_NAME_ the name of the queue the worker belongs to |
| 75 | 5 _HEADERS_ dictionary of headers. can be an empty set | 75 | 5 _HEADERS_ dictionary of headers. can be an empty set |
diff --git a/eventmq/client/__init__.py b/eventmq/client/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/eventmq/client/__init__.py | |||
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py new file mode 100644 index 0000000..091b36f --- /dev/null +++ b/eventmq/client/messages.py | |||
| @@ -0,0 +1,61 @@ | |||
| 1 | # This file is part of eventmq. | ||
| 2 | # | ||
| 3 | # eventmq is free software: you can redistribute it and/or modify | ||
| 4 | # it under the terms of the GNU General Public License as published by | ||
| 5 | # the Free Software Foundation, either version 3 of the License, or | ||
| 6 | # (at your option) any later version. | ||
| 7 | # | ||
| 8 | # eventmq is distributed in the hope that it will be useful, | ||
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| 11 | # GNU General Public License for more details. | ||
| 12 | # | ||
| 13 | # You should have received a copy of the GNU General Public License | ||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | ||
| 15 | """ | ||
| 16 | :mod:`messages` -- Client Messaging | ||
| 17 | =================================== | ||
| 18 | """ | ||
| 19 | from json import dumps as serialize | ||
| 20 | |||
| 21 | from .. import conf | ||
| 22 | from ..utils.messages import send_emqp_message | ||
| 23 | |||
| 24 | |||
| 25 | def send_request(socket, message, reply_requested=False, guarantee=False, | ||
| 26 | retry_count=0, queue=None): | ||
| 27 | """ | ||
| 28 | Send a REQUEST command. | ||
| 29 | |||
| 30 | Default headers are always all disabled by default. If they are included in | ||
| 31 | the headers then they have been enabled. | ||
| 32 | """ | ||
| 33 | headers = [] | ||
| 34 | |||
| 35 | if reply_requested: | ||
| 36 | headers.append('reply-requested') | ||
| 37 | |||
| 38 | if guarantee: | ||
| 39 | headers.append('guarantee') | ||
| 40 | |||
| 41 | if retry_count > 0: | ||
| 42 | headers.append('retry-count:%d' % retry_count) | ||
| 43 | |||
| 44 | send_emqp_message(socket, 'REQUEST', | ||
| 45 | (queue or conf.DEFAULT_QUEUE_NAME, | ||
| 46 | ",".join(headers), | ||
| 47 | serialize(message)) | ||
| 48 | ) | ||
| 49 | |||
| 50 | |||
| 51 | def job(block=False): # Move to decorators.py | ||
| 52 | """ | ||
| 53 | run the decorated function on a worker | ||
| 54 | |||
| 55 | Args: | ||
| 56 | block (bool): Set to True if you wish to block and wait for the | ||
| 57 | response. This may be useful for running quick but cpu intesive | ||
| 58 | that would otherwise overwhelm a box that has to do it all alone. | ||
| 59 | (decryption?) | ||
| 60 | """ | ||
| 61 | pass | ||
diff --git a/eventmq/conf.py b/eventmq/conf.py index ff4fe0f..6e48080 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -2,6 +2,11 @@ | |||
| 2 | # at different levels in the application | 2 | # at different levels in the application |
| 3 | SUPER_DEBUG = True | 3 | SUPER_DEBUG = True |
| 4 | 4 | ||
| 5 | # When a queue name isn't specified use this queue name for the default. It | ||
| 6 | # would be a good idea to have a handful of workers listening on this queue | ||
| 7 | # unless you're positive that everything specifies a queue with workers. | ||
| 8 | DEFAULT_QUEUE_NAME = 'default' | ||
| 9 | |||
| 5 | # {{{Job Manager | 10 | # {{{Job Manager |
| 6 | # How long should we wait before retrying to connect to a broker? | 11 | # How long should we wait before retrying to connect to a broker? |
| 7 | RECONNECT_TIMEOUT = 5 # in seconds | 12 | RECONNECT_TIMEOUT = 5 # in seconds |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 17b0bbd..10a4a34 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -17,18 +17,19 @@ | |||
| 17 | ================================ | 17 | ================================ |
| 18 | Ensures things about jobs and spawns the actual tasks | 18 | Ensures things about jobs and spawns the actual tasks |
| 19 | """ | 19 | """ |
| 20 | import logging | ||
| 20 | import uuid | 21 | import uuid |
| 21 | 22 | ||
| 22 | from . import conf, constants, exceptions, log, utils | 23 | from . import conf, constants, exceptions, utils |
| 23 | from .poller import Poller, POLLIN | 24 | from .poller import Poller, POLLIN |
| 24 | from .sender import Sender | 25 | from .sender import Sender |
| 25 | from .utils.classes import HeartbeatMixin | 26 | from .utils.classes import HeartbeatMixin |
| 26 | from .utils.messages import send_emqp_message as sendmsg | 27 | from .utils.messages import send_emqp_message as sendmsg |
| 27 | import utils.messages | 28 | import utils.messages |
| 28 | from .utils.timeutils import monotonic, timestamp | 29 | from .utils.timeutils import monotonic |
| 29 | 30 | ||
| 30 | 31 | ||
| 31 | logger = log.get_logger(__file__) | 32 | logger = logging.getLogger(__name__) |
| 32 | 33 | ||
| 33 | 34 | ||
| 34 | class JobManager(HeartbeatMixin): | 35 | class JobManager(HeartbeatMixin): |
| @@ -49,7 +50,6 @@ class JobManager(HeartbeatMixin): | |||
| 49 | generated. | 50 | generated. |
| 50 | """ | 51 | """ |
| 51 | super(JobManager, self).__init__(*args, **kwargs) | 52 | super(JobManager, self).__init__(*args, **kwargs) |
| 52 | print self._meta | ||
| 53 | self.name = kwargs.get('name', str(uuid.uuid4())) | 53 | self.name = kwargs.get('name', str(uuid.uuid4())) |
| 54 | logger.info('Initializing JobManager %s...' % self.name) | 54 | logger.info('Initializing JobManager %s...' % self.name) |
| 55 | self.incoming = Sender() | 55 | self.incoming = Sender() |
| @@ -141,6 +141,13 @@ class JobManager(HeartbeatMixin): | |||
| 141 | 'Reconnecting...') | 141 | 'Reconnecting...') |
| 142 | break | 142 | break |
| 143 | 143 | ||
| 144 | def on_request(self, msgid, msg): | ||
| 145 | """ | ||
| 146 | Handles a REQUEST command | ||
| 147 | """ | ||
| 148 | logger.debug("WHAT") | ||
| 149 | self.send_ready() | ||
| 150 | |||
| 144 | def process_message(self, msg): | 151 | def process_message(self, msg): |
| 145 | """ | 152 | """ |
| 146 | Processes a message | 153 | Processes a message |
| @@ -160,26 +167,31 @@ class JobManager(HeartbeatMixin): | |||
| 160 | logger.error('Invalid message: %s' % str(msg)) | 167 | logger.error('Invalid message: %s' % str(msg)) |
| 161 | return | 168 | return |
| 162 | 169 | ||
| 163 | if conf.SUPER_DEBUG: | ||
| 164 | logger.debug("Received Message: %s" % msg) | ||
| 165 | |||
| 166 | command = message[0] | 170 | command = message[0] |
| 167 | msgid = message[1] | 171 | msgid = message[1] |
| 168 | message = message[2] | 172 | message = message[2] |
| 169 | 173 | ||
| 170 | if hasattr(self, "on_%s" % command.lower()): | 174 | if hasattr(self, "on_%s" % command.lower()): |
| 171 | logger.debug('Calling on_%s' % command.lower()) | 175 | if conf.SUPER_DEBUG: |
| 176 | logger.debug('Calling on_%s' % command.lower()) | ||
| 172 | func = getattr(self, "on_%s" % command.lower()) | 177 | func = getattr(self, "on_%s" % command.lower()) |
| 173 | func(msgid, message) | 178 | func(msgid, message) |
| 174 | else: | 179 | else: |
| 175 | logger.warning('No handler for %s found (tried: %s)' % | 180 | logger.warning('No handler for %s found (tried: %s)' % |
| 176 | (command, ('on_%s' % command.lower()))) | 181 | (command, ('on_%s' % command.lower()))) |
| 177 | 182 | ||
| 178 | def send_inform(self): | 183 | def send_ready(self): |
| 184 | """ | ||
| 185 | send the READY command upstream to indicate that JobManager is ready | ||
| 186 | for another REQUEST message. | ||
| 187 | """ | ||
| 188 | sendmsg(self.incoming, 'READY') | ||
| 189 | |||
| 190 | def send_inform(self, queue=None): | ||
| 179 | """ | 191 | """ |
| 180 | Send an INFORM command | 192 | Send an INFORM command |
| 181 | """ | 193 | """ |
| 182 | sendmsg(self.incoming, 'INFORM', 'default_queuename') | 194 | sendmsg(self.incoming, 'INFORM', queue or conf.DEFAULT_QUEUE_NAME) |
| 183 | self._meta['last_sent_heartbeat'] = monotonic() | 195 | self._meta['last_sent_heartbeat'] = monotonic() |
| 184 | 196 | ||
| 185 | def on_ack(self, msgid, ackd_msgid): | 197 | def on_ack(self, msgid, ackd_msgid): |
| @@ -188,7 +200,7 @@ class JobManager(HeartbeatMixin): | |||
| 188 | """ | 200 | """ |
| 189 | # The msgid is the only frame in the message | 201 | # The msgid is the only frame in the message |
| 190 | ackd_msgid = ackd_msgid[0] | 202 | ackd_msgid = ackd_msgid[0] |
| 191 | logger.info('Received ACK for %s' % ackd_msgid) | 203 | logger.info('Received ACK for router (or client) %s' % ackd_msgid) |
| 192 | self.awaiting_startup_ack = False | 204 | self.awaiting_startup_ack = False |
| 193 | 205 | ||
| 194 | def on_heartbeat(self, msgid, message): | 206 | def on_heartbeat(self, msgid, message): |
diff --git a/eventmq/log.py b/eventmq/log.py index 487042a..072e1f3 100644 --- a/eventmq/log.py +++ b/eventmq/log.py | |||
| @@ -1,11 +1,15 @@ | |||
| 1 | """ | 1 | """ |
| 2 | log module for eventmq | 2 | log module for eventmq |
| 3 | |||
| 4 | this needs so much work. | ||
| 3 | """ | 5 | """ |
| 4 | import logging | 6 | import logging |
| 5 | 7 | ||
| 6 | import zmq | 8 | import zmq |
| 7 | import zmq.log.handlers | 9 | import zmq.log.handlers |
| 8 | 10 | ||
| 11 | import watchtower | ||
| 12 | |||
| 9 | 13 | ||
| 10 | FORMAT_STANDARD = logging.Formatter( | 14 | FORMAT_STANDARD = logging.Formatter( |
| 11 | '%(asctime)s - %(name)s %(levelname)s - %(message)s') | 15 | '%(asctime)s - %(name)s %(levelname)s - %(message)s') |
| @@ -28,12 +32,16 @@ class handlers(object): | |||
| 28 | """ | 32 | """ |
| 29 | PUBLISH_HANDLER = PUBHandler | 33 | PUBLISH_HANDLER = PUBHandler |
| 30 | STREAM_HANDLER = logging.StreamHandler | 34 | STREAM_HANDLER = logging.StreamHandler |
| 35 | CLOUDWATCH_HANDLER = watchtower.CloudWatchLogHandler | ||
| 31 | 36 | ||
| 32 | 37 | ||
| 33 | def get_logger(name, formatter=FORMAT_NAMELESS, | 38 | def setup_logger(base_name, formatter=FORMAT_STANDARD, |
| 34 | handler=handlers.STREAM_HANDLER): | 39 | handler=handlers.STREAM_HANDLER): |
| 35 | logger = logging.getLogger(name) | 40 | |
| 41 | logger = logging.getLogger(base_name) | ||
| 36 | logger.setLevel(logging.DEBUG) | 42 | logger.setLevel(logging.DEBUG) |
| 43 | |||
| 44 | # remove handlers we don't want | ||
| 37 | for h in logger.handlers: | 45 | for h in logger.handlers: |
| 38 | logger.removeHandler(h) | 46 | logger.removeHandler(h) |
| 39 | 47 | ||
| @@ -45,7 +53,9 @@ def get_logger(name, formatter=FORMAT_NAMELESS, | |||
| 45 | time.sleep(1) | 53 | time.sleep(1) |
| 46 | 54 | ||
| 47 | handler = handler(_handler_sock) | 55 | handler = handler(_handler_sock) |
| 48 | handler.root_topic = name | 56 | handler.root_topic = base_name |
| 57 | elif handler == handlers.CLOUDWATCH_HANDLER: | ||
| 58 | handler = handler(log_group='eventmq-dev') | ||
| 49 | else: | 59 | else: |
| 50 | handler = handler() | 60 | handler = handler() |
| 51 | 61 | ||
diff --git a/eventmq/poller.py b/eventmq/poller.py index 9c05b2d..73a00a4 100644 --- a/eventmq/poller.py +++ b/eventmq/poller.py | |||
| @@ -17,14 +17,14 @@ | |||
| 17 | ======================= | 17 | ======================= |
| 18 | Device for polling sockets | 18 | Device for polling sockets |
| 19 | """ | 19 | """ |
| 20 | import logging | ||
| 20 | import uuid | 21 | import uuid |
| 21 | 22 | ||
| 22 | import zmq | 23 | import zmq |
| 23 | from zmq import Poller as ZPoller | 24 | from zmq import Poller as ZPoller |
| 24 | 25 | ||
| 25 | from . import log | ||
| 26 | 26 | ||
| 27 | logger = log.get_logger(__package__) | 27 | logger = logging.getLogger(__name__) |
| 28 | 28 | ||
| 29 | POLLIN = zmq.POLLIN | 29 | POLLIN = zmq.POLLIN |
| 30 | POLLOUT = zmq.POLLOUT | 30 | POLLOUT = zmq.POLLOUT |
diff --git a/eventmq/receiver.py b/eventmq/receiver.py index 70b1279..e1ec062 100644 --- a/eventmq/receiver.py +++ b/eventmq/receiver.py | |||
| @@ -17,15 +17,16 @@ | |||
| 17 | =========================== | 17 | =========================== |
| 18 | The receiver is responsible for receiveing messages | 18 | The receiver is responsible for receiveing messages |
| 19 | """ | 19 | """ |
| 20 | import logging | ||
| 20 | import uuid | 21 | import uuid |
| 21 | 22 | ||
| 22 | import zmq | 23 | import zmq |
| 23 | 24 | ||
| 24 | from . import constants, log | 25 | from . import constants |
| 25 | from .utils.classes import ZMQReceiveMixin, ZMQSendMixin | 26 | from .utils.classes import ZMQReceiveMixin, ZMQSendMixin |
| 26 | 27 | ||
| 27 | 28 | ||
| 28 | logger = log.get_logger(__file__) | 29 | logger = logging.getLogger(__name__) |
| 29 | 30 | ||
| 30 | 31 | ||
| 31 | class Receiver(ZMQReceiveMixin, ZMQSendMixin): | 32 | class Receiver(ZMQReceiveMixin, ZMQSendMixin): |
diff --git a/eventmq/router.py b/eventmq/router.py index 821f422..145918b 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -18,19 +18,21 @@ | |||
| 18 | Routes messages to workers (that are in named queues). | 18 | Routes messages to workers (that are in named queues). |
| 19 | """ | 19 | """ |
| 20 | from copy import copy | 20 | from copy import copy |
| 21 | import uuid | 21 | import logging |
| 22 | 22 | ||
| 23 | from . import conf, exceptions, log, poller, receiver | 23 | from . import conf, exceptions, poller, receiver |
| 24 | from .constants import STATUS | 24 | from .constants import STATUS |
| 25 | from .utils.classes import HeartbeatMixin | 25 | from .utils.classes import HeartbeatMixin |
| 26 | from .utils.messages import ( | 26 | from .utils.messages import ( |
| 27 | send_emqp_router_message as sendmsg, | 27 | send_emqp_router_message as sendmsg, |
| 28 | fwd_emqp_router_message as fwdmsg, | ||
| 28 | parse_router_message | 29 | parse_router_message |
| 29 | ) | 30 | ) |
| 31 | from .utils.devices import generate_device_name | ||
| 30 | from .utils.timeutils import monotonic, timestamp | 32 | from .utils.timeutils import monotonic, timestamp |
| 31 | 33 | ||
| 32 | 34 | ||
| 33 | logger = log.get_logger(__file__) | 35 | logger = logging.getLogger(__name__) |
| 34 | 36 | ||
| 35 | 37 | ||
| 36 | class Router(HeartbeatMixin): | 38 | class Router(HeartbeatMixin): |
| @@ -40,7 +42,7 @@ class Router(HeartbeatMixin): | |||
| 40 | def __init__(self, *args, **kwargs): | 42 | def __init__(self, *args, **kwargs): |
| 41 | super(Router, self).__init__(*args, **kwargs) # Creates _meta | 43 | super(Router, self).__init__(*args, **kwargs) # Creates _meta |
| 42 | 44 | ||
| 43 | self.name = str(uuid.uuid4()) | 45 | self.name = generate_device_name() |
| 44 | logger.info('Initializing Router %s...' % self.name) | 46 | logger.info('Initializing Router %s...' % self.name) |
| 45 | 47 | ||
| 46 | self.poller = poller.Poller() | 48 | self.poller = poller.Poller() |
| @@ -141,14 +143,14 @@ class Router(HeartbeatMixin): | |||
| 141 | """ | 143 | """ |
| 142 | self._meta['last_sent_heartbeat'] = monotonic() | 144 | self._meta['last_sent_heartbeat'] = monotonic() |
| 143 | 145 | ||
| 144 | for k in self.workers: | 146 | for worker_id in self.workers: |
| 145 | self.send_heartbeat(self.outgoing, k) | 147 | self.send_heartbeat(self.outgoing, worker_id) |
| 146 | 148 | ||
| 147 | def on_heartbeat(self, sender, msgid, msg): | 149 | def on_heartbeat(self, sender, msgid, msg): |
| 148 | """ | 150 | """ |
| 149 | a placeholder for a noop command. The actual 'logic' for HEARTBEAT is | 151 | a placeholder for a noop command. The actual 'logic' for HEARTBEAT is |
| 150 | in :meth:`self.process_worker_message` because any message from a worker | 152 | in :meth:`self.process_worker_message` because any message from a |
| 151 | counts as a HEARTBEAT | 153 | worker counts as a HEARTBEAT |
| 152 | """ | 154 | """ |
| 153 | 155 | ||
| 154 | def on_inform(self, sender, msgid, msg): | 156 | def on_inform(self, sender, msgid, msg): |
| @@ -158,20 +160,16 @@ class Router(HeartbeatMixin): | |||
| 158 | logger.info('Received INFORM request from %s' % sender) | 160 | logger.info('Received INFORM request from %s' % sender) |
| 159 | queue_name = msg[0] | 161 | queue_name = msg[0] |
| 160 | 162 | ||
| 161 | # Add the worker to our worker dict | 163 | self.add_worker(sender, queue_name) |
| 162 | self.workers[sender] = {} | ||
| 163 | self.workers[sender]['queues'] = queue_name | ||
| 164 | |||
| 165 | # Add the worker to the queues it supports | ||
| 166 | if queue_name in self.queues: | ||
| 167 | self.queues[queue_name] += (sender,) | ||
| 168 | else: | ||
| 169 | self.queues[queue_name] = (sender,) | ||
| 170 | logger.debug('Adding %s to the worker pool for %s' % | ||
| 171 | (sender, queue_name)) | ||
| 172 | 164 | ||
| 173 | self.send_ack(self.outgoing, sender, msgid) | 165 | self.send_ack(self.outgoing, sender, msgid) |
| 174 | 166 | ||
| 167 | def on_ready(self, sender, msgid, msg): | ||
| 168 | """ | ||
| 169 | A worker that we should already know about is ready for another job | ||
| 170 | """ | ||
| 171 | self.requeue_worker(sender) | ||
| 172 | |||
| 175 | def clean_up_dead_workers(self): | 173 | def clean_up_dead_workers(self): |
| 176 | """ | 174 | """ |
| 177 | Loops through the worker queues and removes any workers who haven't | 175 | Loops through the worker queues and removes any workers who haven't |
| @@ -189,6 +187,7 @@ class Router(HeartbeatMixin): | |||
| 189 | 187 | ||
| 190 | # If a worker started, then immediatly died then no hb dictionary | 188 | # If a worker started, then immediatly died then no hb dictionary |
| 191 | # was created so we should just remove that worker. | 189 | # was created so we should just remove that worker. |
| 190 | # hb stands for heartbeat | ||
| 192 | if 'hb' not in self.workers[worker_id]: | 191 | if 'hb' not in self.workers[worker_id]: |
| 193 | logger.info('Removing worker %s from the queue due to no ' | 192 | logger.info('Removing worker %s from the queue due to no ' |
| 194 | 'heartbeat' % (worker_id)) | 193 | 'heartbeat' % (worker_id)) |
| @@ -202,7 +201,7 @@ class Router(HeartbeatMixin): | |||
| 202 | # Remove the worker from the actual worker queues | 201 | # Remove the worker from the actual worker queues |
| 203 | del self.workers[worker_id] | 202 | del self.workers[worker_id] |
| 204 | 203 | ||
| 205 | def add_worker(self, id, queues=None): | 204 | def add_worker(self, worker_id, queues=None): |
| 206 | """ | 205 | """ |
| 207 | Adds a worker to worker queues | 206 | Adds a worker to worker queues |
| 208 | 207 | ||
| @@ -210,6 +209,32 @@ class Router(HeartbeatMixin): | |||
| 210 | worker_id: unique id of the worker to add | 209 | worker_id: unique id of the worker to add |
| 211 | queues: queue or queues this worker should be a member of | 210 | queues: queue or queues this worker should be a member of |
| 212 | """ | 211 | """ |
| 212 | # Add the worker to our worker dict | ||
| 213 | self.workers[worker_id] = {} | ||
| 214 | self.workers[worker_id]['queues'] = queues | ||
| 215 | |||
| 216 | # Add the worker to the queues it supports | ||
| 217 | if queues in self.queues: | ||
| 218 | self.queues[queues] += [worker_id, ] | ||
| 219 | else: | ||
| 220 | self.queues[queues] = [worker_id, ] | ||
| 221 | logger.debug('Adding %s to the worker pool for %s' % | ||
| 222 | (worker_id, str(queues))) | ||
| 223 | |||
| 224 | def requeue_worker(self, worker_id): | ||
| 225 | """ | ||
| 226 | Add a worker back to the queue pool | ||
| 227 | """ | ||
| 228 | if worker_id in self.workers: | ||
| 229 | queues = self.workers[worker_id].get('queues', None) | ||
| 230 | else: | ||
| 231 | queues = None | ||
| 232 | |||
| 233 | if queues: | ||
| 234 | logger.debug('Readding worker {} to queues {}'. | ||
| 235 | format(worker_id, queues)) | ||
| 236 | |||
| 237 | self.queues[queues].append(worker_id) | ||
| 213 | 238 | ||
| 214 | def on_receive_request(self, msg): | 239 | def on_receive_request(self, msg): |
| 215 | """ | 240 | """ |
| @@ -224,18 +249,24 @@ class Router(HeartbeatMixin): | |||
| 224 | 249 | ||
| 225 | queue_name = message[3][0] | 250 | queue_name = message[3][0] |
| 226 | 251 | ||
| 227 | # cheat here and forward the message to the workers | ||
| 228 | self.outgoing.zsocket.send_multipart(msg) | ||
| 229 | |||
| 230 | # If we have no workers for the queue TODO something about it | 252 | # If we have no workers for the queue TODO something about it |
| 231 | if queue_name not in self.queues: | 253 | if queue_name not in self.queues: |
| 232 | logger.warning("Received REQUEST with a queue I don't recognize") | 254 | logger.warning("Received REQUEST with a queue I don't recognize") |
| 233 | 255 | ||
| 256 | try: | ||
| 257 | worker_addr = self.queues[queue_name].pop() | ||
| 258 | except IndexError: | ||
| 259 | # There were no workers in the queue | ||
| 260 | raise NotImplementedError("TODO: buffer when there are no workers " | ||
| 261 | "waiting in the queue") | ||
| 262 | |||
| 263 | fwdmsg(self.outgoing, worker_addr, msg[1:]) | ||
| 264 | |||
| 234 | def process_worker_message(self, msg): | 265 | def process_worker_message(self, msg): |
| 235 | """ | 266 | """ |
| 236 | This method is called when a message comes in from the worker socket. | 267 | This method is called when a message comes in from the worker socket. |
| 237 | It then calls `on_command`. If `on_command` isn't found, then a warning | 268 | It then calls `on_COMMAND.lower()`. If `on_command` isn't found, then |
| 238 | is created. | 269 | a warning is created. |
| 239 | 270 | ||
| 240 | def on_inform(msg): | 271 | def on_inform(msg): |
| 241 | pass | 272 | pass |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index a9a823b..a234c34 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -17,18 +17,17 @@ | |||
| 17 | ============================= | 17 | ============================= |
| 18 | Handles cron and other scheduled tasks | 18 | Handles cron and other scheduled tasks |
| 19 | """ | 19 | """ |
| 20 | import logging | ||
| 20 | import time | 21 | import time |
| 21 | 22 | ||
| 22 | from croniter import croniter | 23 | from croniter import croniter |
| 23 | from six import next | 24 | from six import next |
| 24 | 25 | ||
| 25 | from . import log | ||
| 26 | from .sender import Sender | 26 | from .sender import Sender |
| 27 | from .utils.classes import HeartbeatMixin | 27 | from .utils.classes import HeartbeatMixin |
| 28 | from .utils.devices import generate_device_name | ||
| 29 | from .utils.timeutils import monotonic, seconds_until, timestamp | 28 | from .utils.timeutils import monotonic, seconds_until, timestamp |
| 30 | 29 | ||
| 31 | logger = log.get_logger(__file__) | 30 | logger = logging.getLogger(__name__) |
| 32 | 31 | ||
| 33 | 32 | ||
| 34 | class Scheduler(HeartbeatMixin): | 33 | class Scheduler(HeartbeatMixin): |
| @@ -48,11 +47,12 @@ class Scheduler(HeartbeatMixin): | |||
| 48 | 47 | ||
| 49 | self.load_jobs() | 48 | self.load_jobs() |
| 50 | 49 | ||
| 51 | def connect(self, addr=''): | 50 | def connect(self, addr='tcp://127.0.0.1:47290'): |
| 52 | """ | 51 | """ |
| 53 | Connect the scheduler to worker/router at `addr` | 52 | Connect the scheduler to worker/router at `addr` |
| 54 | """ | 53 | """ |
| 55 | 54 | ||
| 55 | |||
| 56 | def load_jobs(self): | 56 | def load_jobs(self): |
| 57 | """ | 57 | """ |
| 58 | Loads the jobs that need to be scheduled | 58 | Loads the jobs that need to be scheduled |
| @@ -117,4 +117,4 @@ class Scheduler(HeartbeatMixin): | |||
| 117 | logger.debug("Next execution will be in %ss" % | 117 | logger.debug("Next execution will be in %ss" % |
| 118 | seconds_until(self.jobs[i][0])) | 118 | seconds_until(self.jobs[i][0])) |
| 119 | 119 | ||
| 120 | time.sleep(1) | 120 | time.sleep(0.1) |
diff --git a/eventmq/sender.py b/eventmq/sender.py index e9eb655..7a780c4 100644 --- a/eventmq/sender.py +++ b/eventmq/sender.py | |||
| @@ -17,14 +17,15 @@ | |||
| 17 | ======================= | 17 | ======================= |
| 18 | The sender is responsible for sending messages | 18 | The sender is responsible for sending messages |
| 19 | """ | 19 | """ |
| 20 | import logging | ||
| 20 | import uuid | 21 | import uuid |
| 21 | 22 | ||
| 22 | import zmq | 23 | import zmq |
| 23 | 24 | ||
| 24 | from . import constants, exceptions, log | 25 | from . import constants, exceptions |
| 25 | from .utils.classes import ZMQReceiveMixin, ZMQSendMixin | 26 | from .utils.classes import ZMQReceiveMixin, ZMQSendMixin |
| 26 | 27 | ||
| 27 | logger = log.get_logger(__file__) | 28 | logger = logging.getLogger(__name__) |
| 28 | 29 | ||
| 29 | 30 | ||
| 30 | class Sender(ZMQSendMixin, ZMQReceiveMixin): | 31 | class Sender(ZMQSendMixin, ZMQReceiveMixin): |
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index 5a82b97..8b34431 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -17,13 +17,15 @@ | |||
| 17 | ================================= | 17 | ================================= |
| 18 | Defines some classes to use when implementing ZMQ devices | 18 | Defines some classes to use when implementing ZMQ devices |
| 19 | """ | 19 | """ |
| 20 | import logging | ||
| 21 | |||
| 20 | import zmq.error | 22 | import zmq.error |
| 21 | 23 | ||
| 22 | from .. import conf, exceptions, log | 24 | from .. import conf, exceptions |
| 23 | from ..utils.messages import send_emqp_message as sendmsg | 25 | from ..utils.messages import send_emqp_message as sendmsg |
| 24 | from ..utils.timeutils import monotonic, timestamp | 26 | from ..utils.timeutils import monotonic, timestamp |
| 25 | 27 | ||
| 26 | logger = log.get_logger(__file__) | 28 | logger = logging.getLogger(__name__) |
| 27 | 29 | ||
| 28 | 30 | ||
| 29 | class HeartbeatMixin(object): | 31 | class HeartbeatMixin(object): |
| @@ -99,13 +101,19 @@ class ZMQReceiveMixin(object): | |||
| 99 | """ | 101 | """ |
| 100 | Receive a message | 102 | Receive a message |
| 101 | """ | 103 | """ |
| 102 | return self.zsocket.recv() | 104 | msg = self.zsocket.recv() |
| 105 | if conf.SUPER_DEBUG: | ||
| 106 | logger.debug('Received message: {}'.format(msg)) | ||
| 107 | return msg | ||
| 103 | 108 | ||
| 104 | def recv_multipart(self): | 109 | def recv_multipart(self): |
| 105 | """ | 110 | """ |
| 106 | Receive a multipart message | 111 | Receive a multipart message |
| 107 | """ | 112 | """ |
| 108 | return self.zsocket.recv_multipart() | 113 | msg = self.zsocket.recv_multipart() |
| 114 | if conf.SUPER_DEBUG: | ||
| 115 | logger.debug('Received message: {}'.format(msg)) | ||
| 116 | return msg | ||
| 109 | 117 | ||
| 110 | 118 | ||
| 111 | class ZMQSendMixin(object): | 119 | class ZMQSendMixin(object): |
| @@ -117,16 +125,16 @@ class ZMQSendMixin(object): | |||
| 117 | """ | 125 | """ |
| 118 | Send a message directly to the 0mq socket. Automatically inserts some | 126 | Send a message directly to the 0mq socket. Automatically inserts some |
| 119 | frames for your convience. The sent frame ends up looking something | 127 | frames for your convience. The sent frame ends up looking something |
| 120 | like identity | 128 | like this |
| 121 | 129 | ||
| 122 | (this, '', protocol_version) + (your, tuple) | 130 | (_recipient_id, '', protocol_version) + (your, tuple) |
| 123 | 131 | ||
| 124 | Args: | 132 | Args: |
| 125 | message (tuple): Raw message to send. | 133 | message (tuple): Raw message to send. |
| 126 | protocol_version (str): protocol version. it's good practice but | 134 | protocol_version (str): protocol version. it's good practice but |
| 127 | you may explicitly specify None to skip adding the version | 135 | you may explicitly specify None to skip adding the version |
| 128 | _recipient_id (object): When using a :attr:`zmq.ROUTER` you must | 136 | _recipient_id (object): When using a :attr:`zmq.ROUTER` you must |
| 129 | specify the the recipient id of the | 137 | specify the the recipient id of the remote socket |
| 130 | """ | 138 | """ |
| 131 | supported_msg_types = (tuple, list) | 139 | supported_msg_types = (tuple, list) |
| 132 | if not isinstance(message, supported_msg_types): | 140 | if not isinstance(message, supported_msg_types): |
| @@ -143,7 +151,10 @@ class ZMQSendMixin(object): | |||
| 143 | headers = ('', protocol_version, ) | 151 | headers = ('', protocol_version, ) |
| 144 | 152 | ||
| 145 | msg = headers + message | 153 | msg = headers + message |
| 146 | logger.debug('Sending message: %s' % str(msg)) | 154 | |
| 155 | if conf.SUPER_DEBUG: | ||
| 156 | logger.debug('Sending message: %s' % str(msg)) | ||
| 157 | |||
| 147 | try: | 158 | try: |
| 148 | self.zsocket.send_multipart(msg) | 159 | self.zsocket.send_multipart(msg) |
| 149 | except zmq.error.ZMQError as e: | 160 | except zmq.error.ZMQError as e: |
| @@ -159,5 +170,4 @@ class ZMQSendMixin(object): | |||
| 159 | protocol_version (str): protocol version. it's good practice, but | 170 | protocol_version (str): protocol version. it's good practice, but |
| 160 | you may explicitly specify None to skip adding the version | 171 | you may explicitly specify None to skip adding the version |
| 161 | """ | 172 | """ |
| 162 | logger.debug('Sending message: %s' % str(message)) | ||
| 163 | self.send_multipart((message, ), protocol_version) | 173 | self.send_multipart((message, ), protocol_version) |
diff --git a/eventmq/utils/messages.py b/eventmq/utils/messages.py index 643712c..3b26a1f 100644 --- a/eventmq/utils/messages.py +++ b/eventmq/utils/messages.py | |||
| @@ -16,9 +16,13 @@ | |||
| 16 | :mod:`messages` -- Message Utilities | 16 | :mod:`messages` -- Message Utilities |
| 17 | ========================================== | 17 | ========================================== |
| 18 | """ | 18 | """ |
| 19 | from .. import constants, log, exceptions | 19 | import logging |
| 20 | |||
| 21 | from .. import constants, exceptions | ||
| 20 | from . import random_characters | 22 | from . import random_characters |
| 21 | 23 | ||
| 24 | logger = logging.getLogger(__name__) | ||
| 25 | |||
| 22 | 26 | ||
| 23 | def parse_router_message(message): | 27 | def parse_router_message(message): |
| 24 | """ | 28 | """ |
| @@ -84,22 +88,20 @@ def generate_msgid(prefix=None): | |||
| 84 | id = random_characters() | 88 | id = random_characters() |
| 85 | return id if not prefix else str(prefix) + id | 89 | return id if not prefix else str(prefix) + id |
| 86 | 90 | ||
| 87 | logger = log.get_logger(__file__) | ||
| 88 | |||
| 89 | 91 | ||
| 90 | def send_emqp_message(socket, command, message=None): | 92 | def send_emqp_message(socket, command, message=None): |
| 91 | """ | 93 | """ |
| 92 | Formats and sends an eMQP message | 94 | Formats and sends an eMQP message |
| 93 | 95 | ||
| 94 | Args: | 96 | Args: |
| 95 | 97 | socket | |
| 98 | command | ||
| 99 | message | ||
| 96 | Raises: | 100 | Raises: |
| 97 | |||
| 98 | Returns | ||
| 99 | """ | 101 | """ |
| 100 | msg = (str(command).upper(), generate_msgid()) | 102 | msg = (str(command).upper(), generate_msgid()) |
| 101 | if message and isinstance(message, (tuple, list)): | 103 | if message and isinstance(message, (tuple, list)): |
| 102 | msg += message | 104 | msg += tuple(message) |
| 103 | elif message: | 105 | elif message: |
| 104 | msg += (message,) | 106 | msg += (message,) |
| 105 | 107 | ||
| @@ -129,3 +131,16 @@ def send_emqp_router_message(socket, recipient_id, command, message=None): | |||
| 129 | 131 | ||
| 130 | socket.send_multipart(msg, constants.PROTOCOL_VERSION, | 132 | socket.send_multipart(msg, constants.PROTOCOL_VERSION, |
| 131 | _recipient_id=recipient_id) | 133 | _recipient_id=recipient_id) |
| 134 | |||
| 135 | |||
| 136 | def fwd_emqp_router_message(socket, recipient_id, payload): | ||
| 137 | """ | ||
| 138 | Forwards `payload` to socket untouched. | ||
| 139 | |||
| 140 | .. note: | ||
| 141 | Because it's untouched, and because this function targets | ||
| 142 | :prop:`zmq.ROUTER`, it may be a good idea to first strip off the | ||
| 143 | leading sender id before forwarding it. If you dont you will need to | ||
| 144 | account for that on the recipient side. | ||
| 145 | """ | ||
| 146 | socket.zsocket.send_multipart([recipient_id, ] + payload) | ||