diff options
| author | jason | 2015-11-23 20:26:36 -0700 |
|---|---|---|
| committer | jason | 2015-11-23 20:26:36 -0700 |
| commit | 5de2b6bc337ac69204f0e709beafb28108e6ca20 (patch) | |
| tree | 91db39c12f97e12440b52d223f579872e10b8c13 | |
| parent | 61869e4ae2e23a94c9cc92bb625069c50858db80 (diff) | |
| download | eventmq-5de2b6bc337ac69204f0e709beafb28108e6ca20.tar.gz eventmq-5de2b6bc337ac69204f0e709beafb28108e6ca20.zip | |
Many changes
- organizes some code
- Router sends acknowledgement after jobmanager INFORMs
| -rw-r--r-- | docs/api.rst | 3 | ||||
| -rw-r--r-- | docs/contributing.rst | 6 | ||||
| -rw-r--r-- | docs/exceptions.rst | 2 | ||||
| -rw-r--r-- | docs/poller.rst | 3 | ||||
| -rw-r--r-- | docs/protocol.rst | 43 | ||||
| -rw-r--r-- | docs/utils/classes.rst | 3 | ||||
| -rw-r--r-- | docs/utils/messages.rst | 3 | ||||
| -rw-r--r-- | eventmq/constants.py | 13 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 82 | ||||
| -rw-r--r-- | eventmq/log.py | 8 | ||||
| -rw-r--r-- | eventmq/poller.py | 105 | ||||
| -rw-r--r-- | eventmq/receiver.py | 19 | ||||
| -rw-r--r-- | eventmq/router.py | 59 | ||||
| -rw-r--r-- | eventmq/sender.py | 71 | ||||
| -rw-r--r-- | eventmq/tests/test_utils.py | 11 | ||||
| -rw-r--r-- | eventmq/utils/__init__.py (renamed from eventmq/utils.py) | 40 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 93 | ||||
| -rw-r--r-- | eventmq/utils/messages.py | 132 |
18 files changed, 550 insertions, 146 deletions
diff --git a/docs/api.rst b/docs/api.rst index b0ecf4e..b3cdc83 100644 --- a/docs/api.rst +++ b/docs/api.rst | |||
| @@ -5,9 +5,10 @@ API Documentation | |||
| 5 | .. toctree:: | 5 | .. toctree:: |
| 6 | :maxdepth: 2 | 6 | :maxdepth: 2 |
| 7 | 7 | ||
| 8 | exceptions | ||
| 8 | jobmanager | 9 | jobmanager |
| 10 | poller | ||
| 9 | receiver | 11 | receiver |
| 10 | router | 12 | router |
| 11 | sender | 13 | sender |
| 12 | utils | 14 | utils |
| 13 | exceptions | ||
diff --git a/docs/contributing.rst b/docs/contributing.rst index c0a03fb..2489abb 100644 --- a/docs/contributing.rst +++ b/docs/contributing.rst | |||
| @@ -2,7 +2,13 @@ | |||
| 2 | Contributing to EventMQ | 2 | Contributing to EventMQ |
| 3 | ####################### | 3 | ####################### |
| 4 | 4 | ||
| 5 | A few tips when working on the code | ||
| 6 | |||
| 7 | * Use relative imports. If you use absolute imports then when you `import eventmq.exceptions` it's possible that you receive in return a different version of eventmq. exceptions installed somewhere else on the system. | ||
| 8 | |||
| 5 | .. toctree:: | 9 | .. toctree:: |
| 6 | :maxdepth: 1 | 10 | :maxdepth: 1 |
| 7 | 11 | ||
| 8 | protocol | 12 | protocol |
| 13 | Source Code <https://github.com/enderlabs/eventmq> | ||
| 14 | Issues <https://github.com/enderlabs/eventmq/issues> | ||
diff --git a/docs/exceptions.rst b/docs/exceptions.rst new file mode 100644 index 0000000..04f9c5a --- /dev/null +++ b/docs/exceptions.rst | |||
| @@ -0,0 +1,2 @@ | |||
| 1 | .. automodule:: eventmq.exceptions | ||
| 2 | :members: | ||
diff --git a/docs/poller.rst b/docs/poller.rst new file mode 100644 index 0000000..1aab472 --- /dev/null +++ b/docs/poller.rst | |||
| @@ -0,0 +1,3 @@ | |||
| 1 | .. automodule:: eventmq.poller | ||
| 2 | :members: | ||
| 3 | :special-members: | ||
diff --git a/docs/protocol.rst b/docs/protocol.rst index d9bd9e1..c32501c 100644 --- a/docs/protocol.rst +++ b/docs/protocol.rst | |||
| @@ -46,9 +46,22 @@ From the 0MQ manual[[2](http://api.zeromq.org/master:zmq-socket)] | |||
| 46 | 46 | ||
| 47 | This extra frame is not shown in the specifications below. | 47 | This extra frame is not shown in the specifications below. |
| 48 | 48 | ||
| 49 | Global Frames | ||
| 50 | ------------- | ||
| 51 | An **ACK** command consists of a 4-frame multipart message, formatted as follows. | ||
| 52 | |||
| 53 | ====== ============== =========== | ||
| 54 | FRAME Value Description | ||
| 55 | ====== ============== =========== | ||
| 56 | 0 _EMPTY_ leave empty | ||
| 57 | 1 eMQP/1.0 Protocol version | ||
| 58 | 2 ACK command | ||
| 59 | 3 _MSGID_ A unique id for the msg | ||
| 60 | ====== ============== =========== | ||
| 61 | |||
| 49 | eMQP / Client | 62 | eMQP / Client |
| 50 | ------------- | 63 | ------------- |
| 51 | A **REQUEST** command consists of 7-frame multipart message, formatted as follows. | 64 | A **REQUEST** command consists of a 7-frame multipart message, formatted as follows. |
| 52 | 65 | ||
| 53 | ====== ============== =========== | 66 | ====== ============== =========== |
| 54 | FRAME Value Description | 67 | FRAME Value Description |
| @@ -62,6 +75,20 @@ FRAME Value Description | |||
| 62 | 6 _MSG_ The message to send | 75 | 6 _MSG_ The message to send |
| 63 | ====== ============== =========== | 76 | ====== ============== =========== |
| 64 | 77 | ||
| 78 | A **PUBLISH** command consists of a 7-frame multipart messag, formatted as follows. | ||
| 79 | |||
| 80 | ====== ============== =========== | ||
| 81 | FRAME Value Description | ||
| 82 | ====== ============== =========== | ||
| 83 | 0 _EMPTY_ leave empty | ||
| 84 | 1 eMQP/1.0 Protocol version | ||
| 85 | 2 PUBLISH command | ||
| 86 | 3 _MSGID_ A unique id for the msg | ||
| 87 | 4 _TOPIC_NAME_ the name of the queue the worker belongs to | ||
| 88 | 5 _HEADERS_ dictionary of headers. can be an empty set | ||
| 89 | 6 _MSG_ The message to send | ||
| 90 | ====== ============== =========== | ||
| 91 | |||
| 65 | eMQP / Worker | 92 | eMQP / Worker |
| 66 | ------------- | 93 | ------------- |
| 67 | An **INFORM** command consists of a 5-frame multipart message, formatted as follows. | 94 | An **INFORM** command consists of a 5-frame multipart message, formatted as follows. |
| @@ -128,3 +155,17 @@ Heartbeating | |||
| 128 | * Both worker and broker MUST send heartbeats at regular and agreed-upon intervals. | 155 | * Both worker and broker MUST send heartbeats at regular and agreed-upon intervals. |
| 129 | * If the worker detects that the broker disconnected it SHOULD restart the conversation. | 156 | * If the worker detects that the broker disconnected it SHOULD restart the conversation. |
| 130 | * If the broker detects that a worker has disconnected it should stop sending it a message of any type. | 157 | * If the broker detects that a worker has disconnected it should stop sending it a message of any type. |
| 158 | |||
| 159 | Request Headers | ||
| 160 | --------------- | ||
| 161 | Headers MUST be 0 to many comma seperated values inserted into the header field. If there are no headers requried, send an empty string MUST be sent where headers are required. | ||
| 162 | |||
| 163 | Below is a table which defines and describes the headers. | ||
| 164 | |||
| 165 | =============== ======= ======= ======= =========== | ||
| 166 | Header REQUEST PUBLISH Default Description | ||
| 167 | =============== ======= ======= ======= =========== | ||
| 168 | reply-requested X False Once the job is finished, send a reply back with information from the job. If there is no information reply with a True value. | ||
| 169 | retry-count:# X 0 Retry a failed job this many times before accepting defeat. | ||
| 170 | guarantee X False Ensure the job completes by letting someone else worry about a success reply. | ||
| 171 | =============== ======= ======= ======= =========== | ||
diff --git a/docs/utils/classes.rst b/docs/utils/classes.rst new file mode 100644 index 0000000..ab48e06 --- /dev/null +++ b/docs/utils/classes.rst | |||
| @@ -0,0 +1,3 @@ | |||
| 1 | .. automodule:: eventmq.utils.classes | ||
| 2 | :members: | ||
| 3 | :special-members: | ||
diff --git a/docs/utils/messages.rst b/docs/utils/messages.rst new file mode 100644 index 0000000..e49897c --- /dev/null +++ b/docs/utils/messages.rst | |||
| @@ -0,0 +1,3 @@ | |||
| 1 | .. automodule:: eventmq.utils.messages | ||
| 2 | :members: | ||
| 3 | :special-members: | ||
diff --git a/eventmq/constants.py b/eventmq/constants.py index aff5a59..621a6a5 100644 --- a/eventmq/constants.py +++ b/eventmq/constants.py | |||
| @@ -1,9 +1,12 @@ | |||
| 1 | class STATUS(object): | 1 | class STATUS(object): |
| 2 | wtf = -1 | 2 | wtf = -1 # Something went wrong |
| 3 | ready = 100 | 3 | ready = 100 # Waiting to connect or listen |
| 4 | starting = 101 | 4 | starting = 101 # Starting to bind |
| 5 | listening = 201 | 5 | listening = 102 # bound |
| 6 | connected = 202 | 6 | connecting = 200 |
| 7 | connected = 201 | ||
| 7 | stopping = 300 | 8 | stopping = 300 |
| 9 | stopped = 301 | ||
| 8 | 10 | ||
| 11 | # See doc/protocol.rst | ||
| 9 | PROTOCOL_VERSION = 'eMQP/1.0' | 12 | PROTOCOL_VERSION = 'eMQP/1.0' |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 85a90aa..d00d3e6 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -17,14 +17,17 @@ | |||
| 17 | ================================ | 17 | ================================ |
| 18 | Ensures things about jobs and spawns the actual tasks | 18 | Ensures things about jobs and spawns the actual tasks |
| 19 | """ | 19 | """ |
| 20 | from time import sleep | ||
| 20 | import uuid | 21 | import uuid |
| 21 | 22 | ||
| 22 | from zmq.eventloop import ioloop | ||
| 23 | |||
| 24 | from . import constants | 23 | from . import constants |
| 24 | from . import exceptions | ||
| 25 | from . import log | 25 | from . import log |
| 26 | from . import utils | 26 | from . import utils |
| 27 | from .poller import Poller, POLLIN | ||
| 27 | from .sender import Sender | 28 | from .sender import Sender |
| 29 | from .utils.messages import send_emqp_message as sendmsg | ||
| 30 | import utils.messages | ||
| 28 | 31 | ||
| 29 | logger = log.get_logger(__file__) | 32 | logger = log.get_logger(__file__) |
| 30 | 33 | ||
| @@ -46,55 +49,80 @@ class JobManager(object): | |||
| 46 | name (str): unique name of this instance. By default a uuid will be | 49 | name (str): unique name of this instance. By default a uuid will be |
| 47 | generated. | 50 | generated. |
| 48 | """ | 51 | """ |
| 49 | ioloop.install() | ||
| 50 | self.name = kwargs.get('name', str(uuid.uuid4())) | 52 | self.name = kwargs.get('name', str(uuid.uuid4())) |
| 51 | self.incoming = Sender() | 53 | self.incoming = Sender() |
| 54 | self.poller = Poller() | ||
| 55 | |||
| 56 | # Alert us of both incoming and outgoing events | ||
| 57 | self.poller.register(self.incoming, POLLIN) | ||
| 52 | 58 | ||
| 53 | self.status = constants.STATUS.ready | 59 | self.status = constants.STATUS.ready |
| 54 | 60 | ||
| 61 | # Are we waiting for an acknowledgment for something? | ||
| 62 | self.awaiting_ack = False | ||
| 63 | |||
| 55 | def start(self, addr='tcp://127.0.0.1:47291'): | 64 | def start(self, addr='tcp://127.0.0.1:47291'): |
| 56 | """ | 65 | """ |
| 57 | Begin listening for job requests | 66 | Connect to `addr` and begin listening for job requests |
| 58 | 67 | ||
| 59 | Args: | 68 | Args: |
| 60 | args (str): connection string to connect to | 69 | args (str): connection string to connect to |
| 61 | """ | 70 | """ |
| 71 | self.status = constants.STATUS.connecting | ||
| 62 | self.incoming.connect(addr) | 72 | self.incoming.connect(addr) |
| 63 | self.status = constants.STATUS.listening | ||
| 64 | 73 | ||
| 74 | self.awaiting_ack = True | ||
| 75 | |||
| 76 | #while self.awaiting_ack: | ||
| 65 | self.send_inform() | 77 | self.send_inform() |
| 66 | ioloop.IOLoop.instance().start() | 78 | # sleep(5) |
| 67 | 79 | ||
| 68 | def process_job(self, msg): | 80 | self.status = constants.STATUS.connected |
| 69 | pass | ||
| 70 | 81 | ||
| 71 | def sync(self): | 82 | while True: |
| 72 | pass | 83 | events = self.poller.poll(1000) |
| 73 | 84 | ||
| 74 | def send_message(self, command, message): | 85 | if events.get(self.incoming) == POLLIN: |
| 75 | """ | 86 | msg = self.incoming.recv_multipart() |
| 76 | send a message to `self.incoming` | 87 | self.process_message(msg) |
| 77 | Args: | ||
| 78 | message: a msg tuple to send | ||
| 79 | Raises: | ||
| 80 | 88 | ||
| 81 | Returns | 89 | def process_message(self, msg): |
| 82 | """ | 90 | """ |
| 83 | msg = (str(command).upper(), utils.generate_msgid()) | 91 | Processes a message |
| 84 | if isinstance(message, (tuple, list)): | 92 | """ |
| 85 | msg += message | 93 | try: |
| 94 | message = utils.messages.parse_message(msg) | ||
| 95 | except exceptions.InvalidMessageError: | ||
| 96 | logger.error('Invalid message: %s' % str(msg)) | ||
| 97 | return | ||
| 98 | |||
| 99 | command = message[0] | ||
| 100 | msgid = message[1] | ||
| 101 | message = message[2] | ||
| 102 | |||
| 103 | if hasattr(self, "on_%s" % command.lower()): | ||
| 104 | logger.debug('Calling on_%s' % command.lower()) | ||
| 105 | func = getattr(self, "on_%s" % command.lower()) | ||
| 106 | func(msgid, message) | ||
| 86 | else: | 107 | else: |
| 87 | msg += (message,) | 108 | logger.warning('No handler for %s found (tried: %s)' % |
| 109 | (command, ('on_%s' % command.lower))) | ||
| 88 | 110 | ||
| 89 | logger.debug('Sending message: %s' % str(msg)) | 111 | def process_job(self, msg): |
| 90 | self.incoming.send_multipart(msg, constants.PROTOCOL_VERSION) | 112 | pass |
| 113 | |||
| 114 | def sync(self): | ||
| 115 | pass | ||
| 91 | 116 | ||
| 92 | def send_inform(self): | 117 | def send_inform(self): |
| 93 | """ | 118 | """ |
| 94 | Send an INFORM frame | 119 | Send an INFORM frame |
| 95 | """ | 120 | """ |
| 96 | self.send_message('INFORM', 'default_queuename') | 121 | sendmsg(self.incoming, 'INFORM', 'default_queuename') |
| 97 | |||
| 98 | 122 | ||
| 99 | def respond(self): | 123 | def on_ack(self, msgid, message): |
| 100 | pass | 124 | """ |
| 125 | Sets :attr:`awaiting_ack` to False | ||
| 126 | """ | ||
| 127 | logger.info('Recieved ACK') | ||
| 128 | self.awaiting_ack = False | ||
diff --git a/eventmq/log.py b/eventmq/log.py index 836126d..487042a 100644 --- a/eventmq/log.py +++ b/eventmq/log.py | |||
| @@ -8,7 +8,9 @@ import zmq.log.handlers | |||
| 8 | 8 | ||
| 9 | 9 | ||
| 10 | FORMAT_STANDARD = logging.Formatter( | 10 | FORMAT_STANDARD = logging.Formatter( |
| 11 | '%(asctime)s - %(name)s %(levelname)s - "%(message)s') | 11 | '%(asctime)s - %(name)s %(levelname)s - %(message)s') |
| 12 | FORMAT_NAMELESS = logging.Formatter( | ||
| 13 | '%(asctime)s - %(levelname)s - %(message)s') | ||
| 12 | 14 | ||
| 13 | 15 | ||
| 14 | class PUBHandler(zmq.log.handlers.PUBHandler): | 16 | class PUBHandler(zmq.log.handlers.PUBHandler): |
| @@ -28,10 +30,12 @@ class handlers(object): | |||
| 28 | STREAM_HANDLER = logging.StreamHandler | 30 | STREAM_HANDLER = logging.StreamHandler |
| 29 | 31 | ||
| 30 | 32 | ||
| 31 | def get_logger(name, formatter=FORMAT_STANDARD, | 33 | def get_logger(name, formatter=FORMAT_NAMELESS, |
| 32 | handler=handlers.STREAM_HANDLER): | 34 | handler=handlers.STREAM_HANDLER): |
| 33 | logger = logging.getLogger(name) | 35 | logger = logging.getLogger(name) |
| 34 | logger.setLevel(logging.DEBUG) | 36 | logger.setLevel(logging.DEBUG) |
| 37 | for h in logger.handlers: | ||
| 38 | logger.removeHandler(h) | ||
| 35 | 39 | ||
| 36 | if handler == handlers.PUBLISH_HANDLER: | 40 | if handler == handlers.PUBLISH_HANDLER: |
| 37 | _handler_sock = zmq.Context.instance().socket(zmq.PUB) | 41 | _handler_sock = zmq.Context.instance().socket(zmq.PUB) |
diff --git a/eventmq/poller.py b/eventmq/poller.py new file mode 100644 index 0000000..82b8c2d --- /dev/null +++ b/eventmq/poller.py | |||
| @@ -0,0 +1,105 @@ | |||
| 1 | # This file is part of eventmq. | ||
| 2 | # | ||
| 3 | # eventmq is free software: you can redistribute it and/or modify | ||
| 4 | # it under the terms of the GNU General Public License as published by | ||
| 5 | # the Free Software Foundation, either version 3 of the License, or | ||
| 6 | # (at your option) any later version. | ||
| 7 | # | ||
| 8 | # eventmq is distributed in the hope that it will be useful, | ||
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| 11 | # GNU General Public License for more details. | ||
| 12 | # | ||
| 13 | # You should have received a copy of the GNU General Public License | ||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | ||
| 15 | """ | ||
| 16 | :mod:`poller` -- Poller | ||
| 17 | ======================= | ||
| 18 | Device for polling sockets | ||
| 19 | """ | ||
| 20 | import uuid | ||
| 21 | |||
| 22 | import zmq | ||
| 23 | from zmq import Poller as ZPoller | ||
| 24 | |||
| 25 | from . import log | ||
| 26 | |||
| 27 | logger = log.get_logger(__package__) | ||
| 28 | |||
| 29 | POLLIN = zmq.POLLIN | ||
| 30 | POLLOUT = zmq.POLLOUT | ||
| 31 | POLLINOUT = zmq.POLLIN | zmq.POLLOUT | ||
| 32 | |||
| 33 | |||
| 34 | class Poller(ZPoller): | ||
| 35 | """ | ||
| 36 | :class:`zmq.Poller` based class. | ||
| 37 | """ | ||
| 38 | def __init__(self, *args, **kwargs): | ||
| 39 | """ | ||
| 40 | See :class:`zmq.Poller` | ||
| 41 | """ | ||
| 42 | super(Poller, self).__init__(*args, **kwargs) | ||
| 43 | self.name = str(uuid.uuid4()) | ||
| 44 | |||
| 45 | self._sockets = [] | ||
| 46 | |||
| 47 | def register(self, socket, flag=0): | ||
| 48 | """ | ||
| 49 | Register a socket to be polled by this poller | ||
| 50 | |||
| 51 | Args: | ||
| 52 | socket (socket): The socket object to register | ||
| 53 | flags (int): :attr:`POLLIN`, :attr:`POLLOUT`, or | ||
| 54 | :attr:`POLLIN`|:attr:`POLLOUT` for both. If undefined the | ||
| 55 | socket remains unregistered. | ||
| 56 | """ | ||
| 57 | if not flag: | ||
| 58 | logger.warning("Leaving socket %s unregistered because no flag " | ||
| 59 | "was defined" % socket.name) | ||
| 60 | return | ||
| 61 | logger.debug('Registering %s with poller. (flags:%s)' % (socket.name, | ||
| 62 | flag)) | ||
| 63 | self._sockets.append(socket) | ||
| 64 | |||
| 65 | super(Poller, self).register(socket.zsocket, flag) | ||
| 66 | |||
| 67 | def unregister(self, socket): | ||
| 68 | """ | ||
| 69 | Unregister a socket from being polled | ||
| 70 | |||
| 71 | Args: | ||
| 72 | socket (socket): The socket object to registering | ||
| 73 | """ | ||
| 74 | logger.debug("Unregistering %s from poller" % socket.name) | ||
| 75 | if socket not in self._sockets: | ||
| 76 | logger.warning("Attempt to unregister unregistered socket from " | ||
| 77 | "poller: socket: %s" % socket.name) | ||
| 78 | |||
| 79 | self._sockets.remove(socket.zsocket) | ||
| 80 | super(Poller, self).unregister(socket.zsocket) | ||
| 81 | |||
| 82 | def poll(self, timeout=1): | ||
| 83 | """ | ||
| 84 | Calling :meth:`zmq.Poller.poll` directly returns a tuple set. This | ||
| 85 | method typecasts to a dictionary for convience using the socket object | ||
| 86 | as the key. If a socket doesn't appear in the returned dictionary then | ||
| 87 | no events happened. | ||
| 88 | |||
| 89 | Args: | ||
| 90 | timeout (int): How long should poller wait for before iterating | ||
| 91 | the next loop. | ||
| 92 | |||
| 93 | Returns (dict) Dictionary using the socket as the key and the event the | ||
| 94 | socket generated as the value | ||
| 95 | """ | ||
| 96 | events = dict(super(Poller, self).poll(timeout)) | ||
| 97 | return_events = {} | ||
| 98 | |||
| 99 | for socket in self._sockets: | ||
| 100 | if socket.zsocket not in events: | ||
| 101 | continue | ||
| 102 | |||
| 103 | return_events[socket] = events[socket.zsocket] | ||
| 104 | |||
| 105 | return return_events | ||
diff --git a/eventmq/receiver.py b/eventmq/receiver.py index dc1c9ef..5800653 100644 --- a/eventmq/receiver.py +++ b/eventmq/receiver.py | |||
| @@ -22,13 +22,14 @@ import uuid | |||
| 22 | import zmq | 22 | import zmq |
| 23 | from zmq.eventloop import zmqstream | 23 | from zmq.eventloop import zmqstream |
| 24 | 24 | ||
| 25 | import eventmq | 25 | from . import constants, log |
| 26 | import log | 26 | from .utils.classes import ZMQReceiveMixin, ZMQSendMixin |
| 27 | |||
| 27 | 28 | ||
| 28 | logger = log.get_logger(__file__) | 29 | logger = log.get_logger(__file__) |
| 29 | 30 | ||
| 30 | 31 | ||
| 31 | class Receiver(object): | 32 | class Receiver(ZMQReceiveMixin, ZMQSendMixin): |
| 32 | """ | 33 | """ |
| 33 | Receives messages and pass them to a on_recv. | 34 | Receives messages and pass them to a on_recv. |
| 34 | 35 | ||
| @@ -72,6 +73,8 @@ class Receiver(object): | |||
| 72 | self.zsocket.setsockopt(zmq.ROUTER_MANDATORY, 1) | 73 | self.zsocket.setsockopt(zmq.ROUTER_MANDATORY, 1) |
| 73 | 74 | ||
| 74 | if not self.skip_zmqstream: | 75 | if not self.skip_zmqstream: |
| 76 | # To use the built in ioloop, we have to wrap the socket in this | ||
| 77 | # ZMQStream object | ||
| 75 | if not callable(self.on_recv): | 78 | if not callable(self.on_recv): |
| 76 | raise TypeError('Required argument "on_recv" is not actually ' | 79 | raise TypeError('Required argument "on_recv" is not actually ' |
| 77 | 'callable') | 80 | 'callable') |
| @@ -80,7 +83,7 @@ class Receiver(object): | |||
| 80 | self.zsocket = zmqstream.ZMQStream(self.zsocket) | 83 | self.zsocket = zmqstream.ZMQStream(self.zsocket) |
| 81 | self.zsocket.on_recv(self.on_recv) | 84 | self.zsocket.on_recv(self.on_recv) |
| 82 | 85 | ||
| 83 | self.status = eventmq.STATUS.ready | 86 | self.status = constants.STATUS.ready |
| 84 | 87 | ||
| 85 | def listen(self, addr=None): | 88 | def listen(self, addr=None): |
| 86 | """ | 89 | """ |
| @@ -94,7 +97,7 @@ class Receiver(object): | |||
| 94 | """ | 97 | """ |
| 95 | if self.ready: | 98 | if self.ready: |
| 96 | self.zsocket.bind(addr) | 99 | self.zsocket.bind(addr) |
| 97 | self.status = eventmq.STATUS.listening | 100 | self.status = constants.STATUS.listening |
| 98 | logger.info('Receiver %s: Listening on %s' % (self.name, addr)) | 101 | logger.info('Receiver %s: Listening on %s' % (self.name, addr)) |
| 99 | else: | 102 | else: |
| 100 | raise Exception('Receiver %s not ready. status=%s' % | 103 | raise Exception('Receiver %s not ready. status=%s' % |
| @@ -112,8 +115,8 @@ class Receiver(object): | |||
| 112 | """ | 115 | """ |
| 113 | if self.ready: | 116 | if self.ready: |
| 114 | self.zsocket.connect(addr) | 117 | self.zsocket.connect(addr) |
| 115 | self.status = eventmq.STATUS.connected | 118 | self.status = constants.STATUS.connected |
| 116 | logger.info('Receiver %s: Connected to %s' % (self.name, addr)) | 119 | logger.debug('Receiver %s: Connected to %s' % (self.name, addr)) |
| 117 | else: | 120 | else: |
| 118 | raise Exception('Receiver %s not ready. status=%s' % | 121 | raise Exception('Receiver %s not ready. status=%s' % |
| 119 | (self.name, self.status)) | 122 | (self.name, self.status)) |
| @@ -127,4 +130,4 @@ class Receiver(object): | |||
| 127 | bool: True if the receiver is ready to connect or listen, otherwise | 130 | bool: True if the receiver is ready to connect or listen, otherwise |
| 128 | False | 131 | False |
| 129 | """ | 132 | """ |
| 130 | return self.status == eventmq.STATUS.ready | 133 | return self.status == constants.STATUS.ready |
diff --git a/eventmq/router.py b/eventmq/router.py index b71c6e8..77b752d 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -22,9 +22,11 @@ import uuid | |||
| 22 | from zmq.eventloop import ioloop | 22 | from zmq.eventloop import ioloop |
| 23 | 23 | ||
| 24 | from .constants import STATUS | 24 | from .constants import STATUS |
| 25 | from . import log | 25 | from . import exceptions, log, receiver, utils |
| 26 | from . import receiver | 26 | from .utils.messages import ( |
| 27 | from . import utils | 27 | send_emqp_router_message as sendmsg, |
| 28 | parse_router_message | ||
| 29 | ) | ||
| 28 | 30 | ||
| 29 | logger = log.get_logger(__file__) | 31 | logger = log.get_logger(__file__) |
| 30 | 32 | ||
| @@ -48,6 +50,7 @@ class Router(object): | |||
| 48 | 50 | ||
| 49 | self.status = STATUS.ready | 51 | self.status = STATUS.ready |
| 50 | logger.info('Done initializing Router %s' % self.name) | 52 | logger.info('Done initializing Router %s' % self.name) |
| 53 | self.queues = {} | ||
| 51 | 54 | ||
| 52 | def start(self, | 55 | def start(self, |
| 53 | frontend_addr='tcp://127.0.0.1:47290', | 56 | frontend_addr='tcp://127.0.0.1:47290', |
| @@ -70,21 +73,67 @@ class Router(object): | |||
| 70 | 73 | ||
| 71 | ioloop.IOLoop.instance().start() | 74 | ioloop.IOLoop.instance().start() |
| 72 | 75 | ||
| 76 | def send_ack(self, socket, recipient): | ||
| 77 | """ | ||
| 78 | Sends an ACK response | ||
| 79 | """ | ||
| 80 | logger.info('Sending ACK to %s' % recipient) | ||
| 81 | sendmsg(socket, recipient, 'ACK') | ||
| 82 | |||
| 83 | def on_inform(self, sender, msgid, msg): | ||
| 84 | """ | ||
| 85 | Handles an INFORM message. Usually when new worker coming online | ||
| 86 | """ | ||
| 87 | logger.info('Received INFORM request from %s') | ||
| 88 | queue_name = msg[0] | ||
| 89 | |||
| 90 | if queue_name in self.queues: | ||
| 91 | self.queues[queue_name] += (sender,) | ||
| 92 | else: | ||
| 93 | self.queues[queue_name] = (sender,) | ||
| 94 | |||
| 95 | self.send_ack(self.outgoing, sender) | ||
| 96 | |||
| 73 | def on_receive_request(self, msg): | 97 | def on_receive_request(self, msg): |
| 74 | """ | 98 | """ |
| 75 | This function is called when a message comes in from the client socket. | 99 | This function is called when a message comes in from the client socket. |
| 76 | It then calls `on_command`. If `on_command` isn't found, then a | 100 | It then calls `on_command`. If `on_command` isn't found, then a |
| 77 | warning is created. | 101 | warning is created. |
| 78 | """ | 102 | """ |
| 79 | logger.info(str(utils.parse_message(msg))) | 103 | try: |
| 104 | message = parse_router_message(msg) | ||
| 105 | except exceptions.InvalidMessageError: | ||
| 106 | logger.exception('Invalid message from clients: %s' % str(msg)) | ||
| 107 | |||
| 108 | queue_name = message[3][0] | ||
| 80 | 109 | ||
| 81 | # do some things and forward it to the workers | 110 | # do some things and forward it to the workers |
| 82 | self.outgoing.send_multipart(msg) | 111 | self.outgoing.send_multipart(msg) |
| 83 | 112 | ||
| 113 | # If we have no workers for the queue TODO something about it | ||
| 114 | if queue_name not in self.queues: | ||
| 115 | logger.warning("Received REQUEST with a queue I don't recognize") | ||
| 116 | |||
| 84 | def on_receive_reply(self, msg): | 117 | def on_receive_reply(self, msg): |
| 85 | """ | 118 | """ |
| 86 | This method is called when a message comes in from the worker socket. | 119 | This method is called when a message comes in from the worker socket. |
| 87 | It then calls `on_command`. If `on_command` isn't found, then a warning | 120 | It then calls `on_command`. If `on_command` isn't found, then a warning |
| 88 | is created. | 121 | is created. |
| 122 | |||
| 123 | def on_inform(msg): | ||
| 124 | pass | ||
| 89 | """ | 125 | """ |
| 90 | logger.info(str(utils.parse_message(msg))) | 126 | try: |
| 127 | message = parse_router_message(msg) | ||
| 128 | except exceptions.InvalidMessageError: | ||
| 129 | logger.exception('Invalid message from workers: %s' % str(msg)) | ||
| 130 | return | ||
| 131 | |||
| 132 | sender = message[0] | ||
| 133 | command = message[1] | ||
| 134 | msgid = message[2] | ||
| 135 | message = message[3] | ||
| 136 | |||
| 137 | if hasattr(self, "on_%s" % command.lower()): | ||
| 138 | func = getattr(self, "on_%s" % command.lower()) | ||
| 139 | func(sender, msgid, message) | ||
diff --git a/eventmq/sender.py b/eventmq/sender.py index 2c19899..ddada6a 100644 --- a/eventmq/sender.py +++ b/eventmq/sender.py | |||
| @@ -25,11 +25,12 @@ from zmq.eventloop import zmqstream | |||
| 25 | from . import constants | 25 | from . import constants |
| 26 | from . import exceptions | 26 | from . import exceptions |
| 27 | from . import log | 27 | from . import log |
| 28 | from .utils.classes import ZMQReceiveMixin, ZMQSendMixin | ||
| 28 | 29 | ||
| 29 | logger = log.get_logger(__file__) | 30 | logger = log.get_logger(__file__) |
| 30 | 31 | ||
| 31 | 32 | ||
| 32 | class Sender(object): | 33 | class Sender(ZMQSendMixin, ZMQReceiveMixin): |
| 33 | """ | 34 | """ |
| 34 | Sends messages to a particular socket | 35 | Sends messages to a particular socket |
| 35 | 36 | ||
| @@ -39,7 +40,7 @@ class Sender(object): | |||
| 39 | 40 | ||
| 40 | Attributes: | 41 | Attributes: |
| 41 | name (str): Name of this socket | 42 | name (str): Name of this socket |
| 42 | zcontext (:class`zmq.Context`): socket context | 43 | zcontext (:class:`zmq.Context`): socket context |
| 43 | zsocket (:class:`zmq.Socket`): socket wrapped up in a | 44 | zsocket (:class:`zmq.Socket`): socket wrapped up in a |
| 44 | :class:`zmqstream.ZMQStream` | 45 | :class:`zmqstream.ZMQStream` |
| 45 | """ | 46 | """ |
| @@ -67,7 +68,7 @@ class Sender(object): | |||
| 67 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) | 68 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) |
| 68 | 69 | ||
| 69 | if not kwargs.get('skip_zmqstream', True): | 70 | if not kwargs.get('skip_zmqstream', True): |
| 70 | logger.debug('Using ZMQStream') | 71 | logger.info('Using ZMQStream') |
| 71 | self.zsocket = zmqstream.ZMQStream(self.zsocket) | 72 | self.zsocket = zmqstream.ZMQStream(self.zsocket) |
| 72 | self.zsocket.on_recv(kwargs.get('on_recv')) | 73 | self.zsocket.on_recv(kwargs.get('on_recv')) |
| 73 | 74 | ||
| @@ -86,10 +87,10 @@ class Sender(object): | |||
| 86 | if self.ready: | 87 | if self.ready: |
| 87 | self.zsocket.bind(addr) | 88 | self.zsocket.bind(addr) |
| 88 | self.status = constants.STATUS.listening | 89 | self.status = constants.STATUS.listening |
| 89 | logger.info('Receiver %s: Listening on %s' % (self.name, addr)) | 90 | logger.info('Listening on %s' % (addr)) |
| 90 | else: | 91 | else: |
| 91 | raise exceptions.EventMQError('Receiver %s not ready. status=%s' % | 92 | raise exceptions.EventMQError('Not ready. status=%s' % |
| 92 | (self.name, self.status)) | 93 | (self.status)) |
| 93 | 94 | ||
| 94 | def connect(self, addr=None): | 95 | def connect(self, addr=None): |
| 95 | """ | 96 | """ |
| @@ -104,10 +105,10 @@ class Sender(object): | |||
| 104 | if self.ready: | 105 | if self.ready: |
| 105 | self.zsocket.connect(addr) | 106 | self.zsocket.connect(addr) |
| 106 | self.status = constants.STATUS.connected | 107 | self.status = constants.STATUS.connected |
| 107 | logger.info('Receiver %s: Connected to %s' % (self.name, addr)) | 108 | logger.debug('Connecting to %s' % (addr)) |
| 108 | else: | 109 | else: |
| 109 | raise exceptions.EventMQError('Receiver %s not ready. status=%s' % | 110 | raise exceptions.EventMQError('Not ready. status=%s' % |
| 110 | (self.name, self.status)) | 111 | (self.status)) |
| 111 | 112 | ||
| 112 | @property | 113 | @property |
| 113 | def ready(self): | 114 | def ready(self): |
| @@ -119,55 +120,3 @@ class Sender(object): | |||
| 119 | False | 120 | False |
| 120 | """ | 121 | """ |
| 121 | return self.status == constants.STATUS.ready | 122 | return self.status == constants.STATUS.ready |
| 122 | |||
| 123 | def send_multipart(self, message, protocol_version): | ||
| 124 | """ | ||
| 125 | Send a message directly to the 0mq socket. Automatically inserts some | ||
| 126 | frames for your convience. The sent frame ends up looking something | ||
| 127 | like identity | ||
| 128 | |||
| 129 | (this, '', protocol_version) + (your, tuple) | ||
| 130 | |||
| 131 | Args: | ||
| 132 | message (tuple): Raw message to send. | ||
| 133 | protocol_version (str): protocol version. it's good practice but | ||
| 134 | you may explicitly specify None to skip adding the version | ||
| 135 | """ | ||
| 136 | supported_msg_types = (tuple, list) | ||
| 137 | if not isinstance(message, supported_msg_types): | ||
| 138 | raise exceptions.MessageError( | ||
| 139 | '%s message type not one of %s' % | ||
| 140 | (type(message), str(supported_msg_types))) | ||
| 141 | |||
| 142 | if isinstance(message, list): | ||
| 143 | message = tuple(message) | ||
| 144 | |||
| 145 | headers = ('', protocol_version, ) | ||
| 146 | |||
| 147 | message = headers + message | ||
| 148 | print message | ||
| 149 | self.zsocket.send_multipart(message) | ||
| 150 | |||
| 151 | def send(self, message, protocol_version): | ||
| 152 | """ | ||
| 153 | Sends a message | ||
| 154 | |||
| 155 | Args: | ||
| 156 | message: message to send to something | ||
| 157 | protocol_version (str): protocol version. it's good practice, but | ||
| 158 | you may explicitly specify None to skip adding the version | ||
| 159 | """ | ||
| 160 | logger.debug('Sending message: %s' % str(message)) | ||
| 161 | self.send_multipart((message, ), protocol_version) | ||
| 162 | |||
| 163 | def recv(self): | ||
| 164 | """ | ||
| 165 | Receive a message | ||
| 166 | """ | ||
| 167 | return self.zsocket.recv() | ||
| 168 | |||
| 169 | def recv_multipart(self): | ||
| 170 | """ | ||
| 171 | Receive a multipart message | ||
| 172 | """ | ||
| 173 | return self.zsocket.recv_multipart | ||
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py index f32c4af..db3307d 100644 --- a/eventmq/tests/test_utils.py +++ b/eventmq/tests/test_utils.py | |||
| @@ -16,11 +16,12 @@ import unittest | |||
| 16 | 16 | ||
| 17 | from .. import exceptions | 17 | from .. import exceptions |
| 18 | from .. import utils | 18 | from .. import utils |
| 19 | import utils.messages | ||
| 19 | 20 | ||
| 20 | 21 | ||
| 21 | class TestCase(unittest.TestCase): | 22 | class TestCase(unittest.TestCase): |
| 22 | def test_generate_msgid(self): | 23 | def test_generate_msgid(self): |
| 23 | msgid = utils.generate_msgid() | 24 | msgid = utils.messages.generate_msgid() |
| 24 | 25 | ||
| 25 | self.assertEqual(type(msgid), str) | 26 | self.assertEqual(type(msgid), str) |
| 26 | 27 | ||
| @@ -30,9 +31,9 @@ class TestCase(unittest.TestCase): | |||
| 30 | emq_frame_manymsg = emq_headers + ('many', 'parts') | 31 | emq_frame_manymsg = emq_headers + ('many', 'parts') |
| 31 | emq_frame_nomsg = emq_headers | 32 | emq_frame_nomsg = emq_headers |
| 32 | 33 | ||
| 33 | singlemsg = utils.parse_message(emq_frame_singlemsg) | 34 | singlemsg = utils.messages.parse_router_message(emq_frame_singlemsg) |
| 34 | manymsg = utils.parse_message(emq_frame_manymsg) | 35 | manymsg = utils.messages.parse_router_message(emq_frame_manymsg) |
| 35 | nomsg = utils.parse_message(emq_frame_nomsg) | 36 | nomsg = utils.messages.parse_router_message(emq_frame_nomsg) |
| 36 | 37 | ||
| 37 | self.assertEqual(singlemsg[0], emq_frame_singlemsg[0]) | 38 | self.assertEqual(singlemsg[0], emq_frame_singlemsg[0]) |
| 38 | self.assertEqual(singlemsg[1], emq_frame_singlemsg[3]) | 39 | self.assertEqual(singlemsg[1], emq_frame_singlemsg[3]) |
| @@ -51,4 +52,4 @@ class TestCase(unittest.TestCase): | |||
| 51 | 52 | ||
| 52 | broken_message = ('dlkajfs', 'lkasdjf') | 53 | broken_message = ('dlkajfs', 'lkasdjf') |
| 53 | with self.assertRaises(exceptions.InvalidMessageError): | 54 | with self.assertRaises(exceptions.InvalidMessageError): |
| 54 | utils.parse_message(broken_message) | 55 | utils.messages.parse_router_message(broken_message) |
diff --git a/eventmq/utils.py b/eventmq/utils/__init__.py index 756331f..af8a4dc 100644 --- a/eventmq/utils.py +++ b/eventmq/utils/__init__.py | |||
| @@ -17,40 +17,18 @@ | |||
| 17 | ========================= | 17 | ========================= |
| 18 | This module contains a handful of utility classes to make dealing with things | 18 | This module contains a handful of utility classes to make dealing with things |
| 19 | like creating message more simple. | 19 | like creating message more simple. |
| 20 | """ | ||
| 21 | import uuid | ||
| 22 | 20 | ||
| 23 | from . import exceptions | 21 | .. toctree :: |
| 22 | :maxdepth: 2 | ||
| 24 | 23 | ||
| 24 | utils/classes | ||
| 25 | utils/messages | ||
| 26 | """ | ||
| 27 | import uuid | ||
| 25 | 28 | ||
| 26 | def generate_msgid(): | 29 | def random_characters(): |
| 27 | """ | 30 | """ |
| 28 | Returns a (universally) unique id to be used for messages | 31 | Returns some random characters of a specified length |
| 29 | """ | 32 | """ |
| 33 | # TODO: Pull out the random_chars function from eb.io code | ||
| 30 | return str(uuid.uuid4()) | 34 | return str(uuid.uuid4()) |
| 31 | |||
| 32 | |||
| 33 | def parse_message(message): | ||
| 34 | """ | ||
| 35 | Parses the generic format of an eMQP/1.0 message and returns the | ||
| 36 | parts. | ||
| 37 | |||
| 38 | Args: | ||
| 39 | message: the message you wish to have parsed | ||
| 40 | |||
| 41 | Returns (tuple) (sender_id, command, message_id, (message_body, and_data)) | ||
| 42 | """ | ||
| 43 | try: | ||
| 44 | sender = message[0] | ||
| 45 | # noop = message[1] | ||
| 46 | # protocol_version = message[2] | ||
| 47 | command = message[3] | ||
| 48 | msgid = message[4] | ||
| 49 | except IndexError: | ||
| 50 | raise exceptions.InvalidMessageError('Invalid Message Encountered: %s' | ||
| 51 | % str(message)) | ||
| 52 | if len(message) > 5: | ||
| 53 | msg = message[5:] | ||
| 54 | else: | ||
| 55 | msg = () | ||
| 56 | return (sender, command, msgid, msg) | ||
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py new file mode 100644 index 0000000..84e88fe --- /dev/null +++ b/eventmq/utils/classes.py | |||
| @@ -0,0 +1,93 @@ | |||
| 1 | # This file is part of eventmq. | ||
| 2 | # | ||
| 3 | # eventmq is free software: you can redistribute it and/or modify | ||
| 4 | # it under the terms of the GNU General Public License as published by | ||
| 5 | # the Free Software Foundation, either version 3 of the License, or | ||
| 6 | # (at your option) any later version. | ||
| 7 | # | ||
| 8 | # eventmq is distributed in the hope that it will be useful, | ||
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| 11 | # GNU General Public License for more details. | ||
| 12 | # | ||
| 13 | # You should have received a copy of the GNU General Public License | ||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | ||
| 15 | """ | ||
| 16 | :mod:`classes` -- Utility Classes | ||
| 17 | ================================= | ||
| 18 | Defines some classes to use when implementing ZMQ devices | ||
| 19 | """ | ||
| 20 | |||
| 21 | from .. import exceptions | ||
| 22 | from .. import log | ||
| 23 | |||
| 24 | logger = log.get_logger(__file__) | ||
| 25 | |||
| 26 | |||
| 27 | class ZMQReceiveMixin(object): | ||
| 28 | """ | ||
| 29 | Defines some methods for receiving messages. This class will not work if | ||
| 30 | used on it's own | ||
| 31 | """ | ||
| 32 | def recv(self): | ||
| 33 | """ | ||
| 34 | Receive a message | ||
| 35 | """ | ||
| 36 | return self.zsocket.recv() | ||
| 37 | |||
| 38 | def recv_multipart(self): | ||
| 39 | """ | ||
| 40 | Receive a multipart message | ||
| 41 | """ | ||
| 42 | return self.zsocket.recv_multipart() | ||
| 43 | |||
| 44 | |||
| 45 | class ZMQSendMixin(object): | ||
| 46 | """ | ||
| 47 | Defines some methods for sending messages. This class will not work if used | ||
| 48 | on it's own | ||
| 49 | """ | ||
| 50 | def send_multipart(self, message, protocol_version, _recipient_id=None): | ||
| 51 | """ | ||
| 52 | Send a message directly to the 0mq socket. Automatically inserts some | ||
| 53 | frames for your convience. The sent frame ends up looking something | ||
| 54 | like identity | ||
| 55 | |||
| 56 | (this, '', protocol_version) + (your, tuple) | ||
| 57 | |||
| 58 | Args: | ||
| 59 | message (tuple): Raw message to send. | ||
| 60 | protocol_version (str): protocol version. it's good practice but | ||
| 61 | you may explicitly specify None to skip adding the version | ||
| 62 | _recipient_id (object): When using a :attr:`zmq.ROUTER` you must | ||
| 63 | specify the the recipient id of the | ||
| 64 | """ | ||
| 65 | supported_msg_types = (tuple, list) | ||
| 66 | if not isinstance(message, supported_msg_types): | ||
| 67 | raise exceptions.MessageError( | ||
| 68 | '%s message type not one of %s' % | ||
| 69 | (type(message), str(supported_msg_types))) | ||
| 70 | |||
| 71 | if isinstance(message, list): | ||
| 72 | message = tuple(message) | ||
| 73 | |||
| 74 | if _recipient_id: | ||
| 75 | headers = (_recipient_id, '', protocol_version) | ||
| 76 | else: | ||
| 77 | headers = ('', protocol_version, ) | ||
| 78 | |||
| 79 | msg = headers + message | ||
| 80 | logger.debug('Sending message: %s' % str(msg)) | ||
| 81 | self.zsocket.send_multipart(msg) | ||
| 82 | |||
| 83 | def send(self, message, protocol_version): | ||
| 84 | """ | ||
| 85 | Sends a message | ||
| 86 | |||
| 87 | Args: | ||
| 88 | message: message to send to something | ||
| 89 | protocol_version (str): protocol version. it's good practice, but | ||
| 90 | you may explicitly specify None to skip adding the version | ||
| 91 | """ | ||
| 92 | logger.debug('Sending message: %s' % str(message)) | ||
| 93 | self.send_multipart((message, ), protocol_version) | ||
diff --git a/eventmq/utils/messages.py b/eventmq/utils/messages.py new file mode 100644 index 0000000..8abd374 --- /dev/null +++ b/eventmq/utils/messages.py | |||
| @@ -0,0 +1,132 @@ | |||
| 1 | # This file is part of eventmq. | ||
| 2 | # | ||
| 3 | # eventmq is free software: you can redistribute it and/or modify | ||
| 4 | # it under the terms of the GNU General Public License as published by | ||
| 5 | # the Free Software Foundation, either version 3 of the License, or | ||
| 6 | # (at your option) any later version. | ||
| 7 | # | ||
| 8 | # eventmq is distributed in the hope that it will be useful, | ||
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| 11 | # GNU General Public License for more details. | ||
| 12 | # | ||
| 13 | # You should have received a copy of the GNU General Public License | ||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | ||
| 15 | """ | ||
| 16 | :mod:`messages` -- Message Utilities | ||
| 17 | ========================================== | ||
| 18 | """ | ||
| 19 | from .. import constants, log, exceptions | ||
| 20 | from . import random_characters | ||
| 21 | |||
| 22 | |||
| 23 | def parse_router_message(message): | ||
| 24 | """ | ||
| 25 | Parses the generic format of an eMQP/1.0 message and returns the | ||
| 26 | parts. | ||
| 27 | |||
| 28 | Args: | ||
| 29 | message: the message you wish to have parsed | ||
| 30 | |||
| 31 | Returns (tuple) (sender_id, command, message_id, (message_body, and_data)) | ||
| 32 | """ | ||
| 33 | try: | ||
| 34 | sender = message[0] | ||
| 35 | # noop = message[1] | ||
| 36 | # protocol_version = message[2] | ||
| 37 | command = message[3] | ||
| 38 | msgid = message[4] | ||
| 39 | except IndexError: | ||
| 40 | raise exceptions.InvalidMessageError('Invalid Message Encountered: %s' | ||
| 41 | % str(message)) | ||
| 42 | if len(message) > 5: | ||
| 43 | msg = message[5:] | ||
| 44 | else: | ||
| 45 | msg = () | ||
| 46 | return (sender, command, msgid, msg) | ||
| 47 | |||
| 48 | |||
| 49 | def parse_message(message): | ||
| 50 | """ | ||
| 51 | Parses the generic format of an eMQP/1.0 message and returns the | ||
| 52 | parts. | ||
| 53 | |||
| 54 | Args: | ||
| 55 | message: the message you wish to have parsed | ||
| 56 | |||
| 57 | Returns (tuple) (command, message_id, (message_body, and_data)) | ||
| 58 | """ | ||
| 59 | try: | ||
| 60 | # noop = message[0] | ||
| 61 | # protocol_version = message[1] | ||
| 62 | command = message[2] | ||
| 63 | msgid = message[3] | ||
| 64 | except IndexError: | ||
| 65 | raise exceptions.InvalidMessageError('Invalid Message Encountered: %s' | ||
| 66 | % str(message)) | ||
| 67 | |||
| 68 | print len(message) | ||
| 69 | if len(message) > 4: | ||
| 70 | msg = message[4:] | ||
| 71 | else: | ||
| 72 | msg = () | ||
| 73 | return (command, msgid, msg) | ||
| 74 | |||
| 75 | |||
| 76 | def generate_msgid(prefix=None): | ||
| 77 | """ | ||
| 78 | Returns a random string to be used for message ids. Optionally the ID can | ||
| 79 | be prefixed with `prefix`. | ||
| 80 | |||
| 81 | Args: | ||
| 82 | prefix (str): Value to prefix on to the random part of the id. Useful | ||
| 83 | for prefixing some meta data to use for things | ||
| 84 | """ | ||
| 85 | id = random_characters() | ||
| 86 | return id if not prefix else str(prefix) + id | ||
| 87 | |||
| 88 | logger = log.get_logger(__file__) | ||
| 89 | |||
| 90 | |||
| 91 | def send_emqp_message(socket, command, message=None): | ||
| 92 | """ | ||
| 93 | Formats and sends an eMQP message | ||
| 94 | |||
| 95 | Args: | ||
| 96 | |||
| 97 | Raises: | ||
| 98 | |||
| 99 | Returns | ||
| 100 | """ | ||
| 101 | msg = (str(command).upper(), generate_msgid()) | ||
| 102 | if message and isinstance(message, (tuple, list)): | ||
| 103 | msg += message | ||
| 104 | elif message: | ||
| 105 | msg += (message,) | ||
| 106 | |||
| 107 | socket.send_multipart(msg, constants.PROTOCOL_VERSION) | ||
| 108 | |||
| 109 | |||
| 110 | def send_emqp_router_message(socket, recipient_id, command, message=None): | ||
| 111 | """ | ||
| 112 | Formats and sends an eMQP message taking into account the recipient frame | ||
| 113 | used by a :attr:`zmq.ROUTER` device. | ||
| 114 | |||
| 115 | Args: | ||
| 116 | socket: socket to send the message with | ||
| 117 | recipient_id (str): the id of the connected device to reply to | ||
| 118 | command (str): the eMQP command to send | ||
| 119 | message: a msg tuple to send | ||
| 120 | |||
| 121 | Raises: | ||
| 122 | |||
| 123 | Returns | ||
| 124 | """ | ||
| 125 | msg = (str(command).upper(), generate_msgid()) | ||
| 126 | if message and isinstance(message, (tuple, list)): | ||
| 127 | msg += message | ||
| 128 | elif message: | ||
| 129 | msg += (message,) | ||
| 130 | |||
| 131 | socket.send_multipart(msg, constants.PROTOCOL_VERSION, | ||
| 132 | _recipient_id=recipient_id) | ||