aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2015-11-23 20:26:36 -0700
committerjason2015-11-23 20:26:36 -0700
commit5de2b6bc337ac69204f0e709beafb28108e6ca20 (patch)
tree91db39c12f97e12440b52d223f579872e10b8c13
parent61869e4ae2e23a94c9cc92bb625069c50858db80 (diff)
downloadeventmq-5de2b6bc337ac69204f0e709beafb28108e6ca20.tar.gz
eventmq-5de2b6bc337ac69204f0e709beafb28108e6ca20.zip
Many changes
- organizes some code - Router sends acknowledgement after jobmanager INFORMs
-rw-r--r--docs/api.rst3
-rw-r--r--docs/contributing.rst6
-rw-r--r--docs/exceptions.rst2
-rw-r--r--docs/poller.rst3
-rw-r--r--docs/protocol.rst43
-rw-r--r--docs/utils/classes.rst3
-rw-r--r--docs/utils/messages.rst3
-rw-r--r--eventmq/constants.py13
-rw-r--r--eventmq/jobmanager.py82
-rw-r--r--eventmq/log.py8
-rw-r--r--eventmq/poller.py105
-rw-r--r--eventmq/receiver.py19
-rw-r--r--eventmq/router.py59
-rw-r--r--eventmq/sender.py71
-rw-r--r--eventmq/tests/test_utils.py11
-rw-r--r--eventmq/utils/__init__.py (renamed from eventmq/utils.py)40
-rw-r--r--eventmq/utils/classes.py93
-rw-r--r--eventmq/utils/messages.py132
18 files changed, 550 insertions, 146 deletions
diff --git a/docs/api.rst b/docs/api.rst
index b0ecf4e..b3cdc83 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -5,9 +5,10 @@ API Documentation
5.. toctree:: 5.. toctree::
6 :maxdepth: 2 6 :maxdepth: 2
7 7
8 exceptions
8 jobmanager 9 jobmanager
10 poller
9 receiver 11 receiver
10 router 12 router
11 sender 13 sender
12 utils 14 utils
13 exceptions
diff --git a/docs/contributing.rst b/docs/contributing.rst
index c0a03fb..2489abb 100644
--- a/docs/contributing.rst
+++ b/docs/contributing.rst
@@ -2,7 +2,13 @@
2Contributing to EventMQ 2Contributing to EventMQ
3####################### 3#######################
4 4
5A few tips when working on the code
6
7 * Use relative imports. If you use absolute imports then when you `import eventmq.exceptions` it's possible that you receive in return a different version of eventmq. exceptions installed somewhere else on the system.
8
5.. toctree:: 9.. toctree::
6 :maxdepth: 1 10 :maxdepth: 1
7 11
8 protocol 12 protocol
13 Source Code <https://github.com/enderlabs/eventmq>
14 Issues <https://github.com/enderlabs/eventmq/issues>
diff --git a/docs/exceptions.rst b/docs/exceptions.rst
new file mode 100644
index 0000000..04f9c5a
--- /dev/null
+++ b/docs/exceptions.rst
@@ -0,0 +1,2 @@
1.. automodule:: eventmq.exceptions
2 :members:
diff --git a/docs/poller.rst b/docs/poller.rst
new file mode 100644
index 0000000..1aab472
--- /dev/null
+++ b/docs/poller.rst
@@ -0,0 +1,3 @@
1.. automodule:: eventmq.poller
2 :members:
3 :special-members:
diff --git a/docs/protocol.rst b/docs/protocol.rst
index d9bd9e1..c32501c 100644
--- a/docs/protocol.rst
+++ b/docs/protocol.rst
@@ -46,9 +46,22 @@ From the 0MQ manual[[2](http://api.zeromq.org/master:zmq-socket)]
46 46
47This extra frame is not shown in the specifications below. 47This extra frame is not shown in the specifications below.
48 48
49Global Frames
50-------------
51An **ACK** command consists of a 4-frame multipart message, formatted as follows.
52
53====== ============== ===========
54FRAME Value Description
55====== ============== ===========
560 _EMPTY_ leave empty
571 eMQP/1.0 Protocol version
582 ACK command
593 _MSGID_ A unique id for the msg
60====== ============== ===========
61
49eMQP / Client 62eMQP / Client
50------------- 63-------------
51A **REQUEST** command consists of 7-frame multipart message, formatted as follows. 64A **REQUEST** command consists of a 7-frame multipart message, formatted as follows.
52 65
53====== ============== =========== 66====== ============== ===========
54FRAME Value Description 67FRAME Value Description
@@ -62,6 +75,20 @@ FRAME Value Description
626 _MSG_ The message to send 756 _MSG_ The message to send
63====== ============== =========== 76====== ============== ===========
64 77
78A **PUBLISH** command consists of a 7-frame multipart messag, formatted as follows.
79
80====== ============== ===========
81FRAME Value Description
82====== ============== ===========
830 _EMPTY_ leave empty
841 eMQP/1.0 Protocol version
852 PUBLISH command
863 _MSGID_ A unique id for the msg
874 _TOPIC_NAME_ the name of the queue the worker belongs to
885 _HEADERS_ dictionary of headers. can be an empty set
896 _MSG_ The message to send
90====== ============== ===========
91
65eMQP / Worker 92eMQP / Worker
66------------- 93-------------
67An **INFORM** command consists of a 5-frame multipart message, formatted as follows. 94An **INFORM** command consists of a 5-frame multipart message, formatted as follows.
@@ -128,3 +155,17 @@ Heartbeating
128 * Both worker and broker MUST send heartbeats at regular and agreed-upon intervals. 155 * Both worker and broker MUST send heartbeats at regular and agreed-upon intervals.
129 * If the worker detects that the broker disconnected it SHOULD restart the conversation. 156 * If the worker detects that the broker disconnected it SHOULD restart the conversation.
130 * If the broker detects that a worker has disconnected it should stop sending it a message of any type. 157 * If the broker detects that a worker has disconnected it should stop sending it a message of any type.
158
159Request Headers
160---------------
161Headers MUST be 0 to many comma seperated values inserted into the header field. If there are no headers requried, send an empty string MUST be sent where headers are required.
162
163Below is a table which defines and describes the headers.
164
165=============== ======= ======= ======= ===========
166Header REQUEST PUBLISH Default Description
167=============== ======= ======= ======= ===========
168reply-requested X False Once the job is finished, send a reply back with information from the job. If there is no information reply with a True value.
169retry-count:# X 0 Retry a failed job this many times before accepting defeat.
170guarantee X False Ensure the job completes by letting someone else worry about a success reply.
171=============== ======= ======= ======= ===========
diff --git a/docs/utils/classes.rst b/docs/utils/classes.rst
new file mode 100644
index 0000000..ab48e06
--- /dev/null
+++ b/docs/utils/classes.rst
@@ -0,0 +1,3 @@
1.. automodule:: eventmq.utils.classes
2 :members:
3 :special-members:
diff --git a/docs/utils/messages.rst b/docs/utils/messages.rst
new file mode 100644
index 0000000..e49897c
--- /dev/null
+++ b/docs/utils/messages.rst
@@ -0,0 +1,3 @@
1.. automodule:: eventmq.utils.messages
2 :members:
3 :special-members:
diff --git a/eventmq/constants.py b/eventmq/constants.py
index aff5a59..621a6a5 100644
--- a/eventmq/constants.py
+++ b/eventmq/constants.py
@@ -1,9 +1,12 @@
1class STATUS(object): 1class STATUS(object):
2 wtf = -1 2 wtf = -1 # Something went wrong
3 ready = 100 3 ready = 100 # Waiting to connect or listen
4 starting = 101 4 starting = 101 # Starting to bind
5 listening = 201 5 listening = 102 # bound
6 connected = 202 6 connecting = 200
7 connected = 201
7 stopping = 300 8 stopping = 300
9 stopped = 301
8 10
11# See doc/protocol.rst
9PROTOCOL_VERSION = 'eMQP/1.0' 12PROTOCOL_VERSION = 'eMQP/1.0'
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 85a90aa..d00d3e6 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -17,14 +17,17 @@
17================================ 17================================
18Ensures things about jobs and spawns the actual tasks 18Ensures things about jobs and spawns the actual tasks
19""" 19"""
20from time import sleep
20import uuid 21import uuid
21 22
22from zmq.eventloop import ioloop
23
24from . import constants 23from . import constants
24from . import exceptions
25from . import log 25from . import log
26from . import utils 26from . import utils
27from .poller import Poller, POLLIN
27from .sender import Sender 28from .sender import Sender
29from .utils.messages import send_emqp_message as sendmsg
30import utils.messages
28 31
29logger = log.get_logger(__file__) 32logger = log.get_logger(__file__)
30 33
@@ -46,55 +49,80 @@ class JobManager(object):
46 name (str): unique name of this instance. By default a uuid will be 49 name (str): unique name of this instance. By default a uuid will be
47 generated. 50 generated.
48 """ 51 """
49 ioloop.install()
50 self.name = kwargs.get('name', str(uuid.uuid4())) 52 self.name = kwargs.get('name', str(uuid.uuid4()))
51 self.incoming = Sender() 53 self.incoming = Sender()
54 self.poller = Poller()
55
56 # Alert us of both incoming and outgoing events
57 self.poller.register(self.incoming, POLLIN)
52 58
53 self.status = constants.STATUS.ready 59 self.status = constants.STATUS.ready
54 60
61 # Are we waiting for an acknowledgment for something?
62 self.awaiting_ack = False
63
55 def start(self, addr='tcp://127.0.0.1:47291'): 64 def start(self, addr='tcp://127.0.0.1:47291'):
56 """ 65 """
57 Begin listening for job requests 66 Connect to `addr` and begin listening for job requests
58 67
59 Args: 68 Args:
60 args (str): connection string to connect to 69 args (str): connection string to connect to
61 """ 70 """
71 self.status = constants.STATUS.connecting
62 self.incoming.connect(addr) 72 self.incoming.connect(addr)
63 self.status = constants.STATUS.listening
64 73
74 self.awaiting_ack = True
75
76 #while self.awaiting_ack:
65 self.send_inform() 77 self.send_inform()
66 ioloop.IOLoop.instance().start() 78 # sleep(5)
67 79
68 def process_job(self, msg): 80 self.status = constants.STATUS.connected
69 pass
70 81
71 def sync(self): 82 while True:
72 pass 83 events = self.poller.poll(1000)
73 84
74 def send_message(self, command, message): 85 if events.get(self.incoming) == POLLIN:
75 """ 86 msg = self.incoming.recv_multipart()
76 send a message to `self.incoming` 87 self.process_message(msg)
77 Args:
78 message: a msg tuple to send
79 Raises:
80 88
81 Returns 89 def process_message(self, msg):
82 """ 90 """
83 msg = (str(command).upper(), utils.generate_msgid()) 91 Processes a message
84 if isinstance(message, (tuple, list)): 92 """
85 msg += message 93 try:
94 message = utils.messages.parse_message(msg)
95 except exceptions.InvalidMessageError:
96 logger.error('Invalid message: %s' % str(msg))
97 return
98
99 command = message[0]
100 msgid = message[1]
101 message = message[2]
102
103 if hasattr(self, "on_%s" % command.lower()):
104 logger.debug('Calling on_%s' % command.lower())
105 func = getattr(self, "on_%s" % command.lower())
106 func(msgid, message)
86 else: 107 else:
87 msg += (message,) 108 logger.warning('No handler for %s found (tried: %s)' %
109 (command, ('on_%s' % command.lower)))
88 110
89 logger.debug('Sending message: %s' % str(msg)) 111 def process_job(self, msg):
90 self.incoming.send_multipart(msg, constants.PROTOCOL_VERSION) 112 pass
113
114 def sync(self):
115 pass
91 116
92 def send_inform(self): 117 def send_inform(self):
93 """ 118 """
94 Send an INFORM frame 119 Send an INFORM frame
95 """ 120 """
96 self.send_message('INFORM', 'default_queuename') 121 sendmsg(self.incoming, 'INFORM', 'default_queuename')
97
98 122
99 def respond(self): 123 def on_ack(self, msgid, message):
100 pass 124 """
125 Sets :attr:`awaiting_ack` to False
126 """
127 logger.info('Recieved ACK')
128 self.awaiting_ack = False
diff --git a/eventmq/log.py b/eventmq/log.py
index 836126d..487042a 100644
--- a/eventmq/log.py
+++ b/eventmq/log.py
@@ -8,7 +8,9 @@ import zmq.log.handlers
8 8
9 9
10FORMAT_STANDARD = logging.Formatter( 10FORMAT_STANDARD = logging.Formatter(
11 '%(asctime)s - %(name)s %(levelname)s - "%(message)s') 11 '%(asctime)s - %(name)s %(levelname)s - %(message)s')
12FORMAT_NAMELESS = logging.Formatter(
13 '%(asctime)s - %(levelname)s - %(message)s')
12 14
13 15
14class PUBHandler(zmq.log.handlers.PUBHandler): 16class PUBHandler(zmq.log.handlers.PUBHandler):
@@ -28,10 +30,12 @@ class handlers(object):
28 STREAM_HANDLER = logging.StreamHandler 30 STREAM_HANDLER = logging.StreamHandler
29 31
30 32
31def get_logger(name, formatter=FORMAT_STANDARD, 33def get_logger(name, formatter=FORMAT_NAMELESS,
32 handler=handlers.STREAM_HANDLER): 34 handler=handlers.STREAM_HANDLER):
33 logger = logging.getLogger(name) 35 logger = logging.getLogger(name)
34 logger.setLevel(logging.DEBUG) 36 logger.setLevel(logging.DEBUG)
37 for h in logger.handlers:
38 logger.removeHandler(h)
35 39
36 if handler == handlers.PUBLISH_HANDLER: 40 if handler == handlers.PUBLISH_HANDLER:
37 _handler_sock = zmq.Context.instance().socket(zmq.PUB) 41 _handler_sock = zmq.Context.instance().socket(zmq.PUB)
diff --git a/eventmq/poller.py b/eventmq/poller.py
new file mode 100644
index 0000000..82b8c2d
--- /dev/null
+++ b/eventmq/poller.py
@@ -0,0 +1,105 @@
1# This file is part of eventmq.
2#
3# eventmq is free software: you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# eventmq is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15"""
16:mod:`poller` -- Poller
17=======================
18Device for polling sockets
19"""
20import uuid
21
22import zmq
23from zmq import Poller as ZPoller
24
25from . import log
26
27logger = log.get_logger(__package__)
28
29POLLIN = zmq.POLLIN
30POLLOUT = zmq.POLLOUT
31POLLINOUT = zmq.POLLIN | zmq.POLLOUT
32
33
34class Poller(ZPoller):
35 """
36 :class:`zmq.Poller` based class.
37 """
38 def __init__(self, *args, **kwargs):
39 """
40 See :class:`zmq.Poller`
41 """
42 super(Poller, self).__init__(*args, **kwargs)
43 self.name = str(uuid.uuid4())
44
45 self._sockets = []
46
47 def register(self, socket, flag=0):
48 """
49 Register a socket to be polled by this poller
50
51 Args:
52 socket (socket): The socket object to register
53 flags (int): :attr:`POLLIN`, :attr:`POLLOUT`, or
54 :attr:`POLLIN`|:attr:`POLLOUT` for both. If undefined the
55 socket remains unregistered.
56 """
57 if not flag:
58 logger.warning("Leaving socket %s unregistered because no flag "
59 "was defined" % socket.name)
60 return
61 logger.debug('Registering %s with poller. (flags:%s)' % (socket.name,
62 flag))
63 self._sockets.append(socket)
64
65 super(Poller, self).register(socket.zsocket, flag)
66
67 def unregister(self, socket):
68 """
69 Unregister a socket from being polled
70
71 Args:
72 socket (socket): The socket object to registering
73 """
74 logger.debug("Unregistering %s from poller" % socket.name)
75 if socket not in self._sockets:
76 logger.warning("Attempt to unregister unregistered socket from "
77 "poller: socket: %s" % socket.name)
78
79 self._sockets.remove(socket.zsocket)
80 super(Poller, self).unregister(socket.zsocket)
81
82 def poll(self, timeout=1):
83 """
84 Calling :meth:`zmq.Poller.poll` directly returns a tuple set. This
85 method typecasts to a dictionary for convience using the socket object
86 as the key. If a socket doesn't appear in the returned dictionary then
87 no events happened.
88
89 Args:
90 timeout (int): How long should poller wait for before iterating
91 the next loop.
92
93 Returns (dict) Dictionary using the socket as the key and the event the
94 socket generated as the value
95 """
96 events = dict(super(Poller, self).poll(timeout))
97 return_events = {}
98
99 for socket in self._sockets:
100 if socket.zsocket not in events:
101 continue
102
103 return_events[socket] = events[socket.zsocket]
104
105 return return_events
diff --git a/eventmq/receiver.py b/eventmq/receiver.py
index dc1c9ef..5800653 100644
--- a/eventmq/receiver.py
+++ b/eventmq/receiver.py
@@ -22,13 +22,14 @@ import uuid
22import zmq 22import zmq
23from zmq.eventloop import zmqstream 23from zmq.eventloop import zmqstream
24 24
25import eventmq 25from . import constants, log
26import log 26from .utils.classes import ZMQReceiveMixin, ZMQSendMixin
27
27 28
28logger = log.get_logger(__file__) 29logger = log.get_logger(__file__)
29 30
30 31
31class Receiver(object): 32class Receiver(ZMQReceiveMixin, ZMQSendMixin):
32 """ 33 """
33 Receives messages and pass them to a on_recv. 34 Receives messages and pass them to a on_recv.
34 35
@@ -72,6 +73,8 @@ class Receiver(object):
72 self.zsocket.setsockopt(zmq.ROUTER_MANDATORY, 1) 73 self.zsocket.setsockopt(zmq.ROUTER_MANDATORY, 1)
73 74
74 if not self.skip_zmqstream: 75 if not self.skip_zmqstream:
76 # To use the built in ioloop, we have to wrap the socket in this
77 # ZMQStream object
75 if not callable(self.on_recv): 78 if not callable(self.on_recv):
76 raise TypeError('Required argument "on_recv" is not actually ' 79 raise TypeError('Required argument "on_recv" is not actually '
77 'callable') 80 'callable')
@@ -80,7 +83,7 @@ class Receiver(object):
80 self.zsocket = zmqstream.ZMQStream(self.zsocket) 83 self.zsocket = zmqstream.ZMQStream(self.zsocket)
81 self.zsocket.on_recv(self.on_recv) 84 self.zsocket.on_recv(self.on_recv)
82 85
83 self.status = eventmq.STATUS.ready 86 self.status = constants.STATUS.ready
84 87
85 def listen(self, addr=None): 88 def listen(self, addr=None):
86 """ 89 """
@@ -94,7 +97,7 @@ class Receiver(object):
94 """ 97 """
95 if self.ready: 98 if self.ready:
96 self.zsocket.bind(addr) 99 self.zsocket.bind(addr)
97 self.status = eventmq.STATUS.listening 100 self.status = constants.STATUS.listening
98 logger.info('Receiver %s: Listening on %s' % (self.name, addr)) 101 logger.info('Receiver %s: Listening on %s' % (self.name, addr))
99 else: 102 else:
100 raise Exception('Receiver %s not ready. status=%s' % 103 raise Exception('Receiver %s not ready. status=%s' %
@@ -112,8 +115,8 @@ class Receiver(object):
112 """ 115 """
113 if self.ready: 116 if self.ready:
114 self.zsocket.connect(addr) 117 self.zsocket.connect(addr)
115 self.status = eventmq.STATUS.connected 118 self.status = constants.STATUS.connected
116 logger.info('Receiver %s: Connected to %s' % (self.name, addr)) 119 logger.debug('Receiver %s: Connected to %s' % (self.name, addr))
117 else: 120 else:
118 raise Exception('Receiver %s not ready. status=%s' % 121 raise Exception('Receiver %s not ready. status=%s' %
119 (self.name, self.status)) 122 (self.name, self.status))
@@ -127,4 +130,4 @@ class Receiver(object):
127 bool: True if the receiver is ready to connect or listen, otherwise 130 bool: True if the receiver is ready to connect or listen, otherwise
128 False 131 False
129 """ 132 """
130 return self.status == eventmq.STATUS.ready 133 return self.status == constants.STATUS.ready
diff --git a/eventmq/router.py b/eventmq/router.py
index b71c6e8..77b752d 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -22,9 +22,11 @@ import uuid
22from zmq.eventloop import ioloop 22from zmq.eventloop import ioloop
23 23
24from .constants import STATUS 24from .constants import STATUS
25from . import log 25from . import exceptions, log, receiver, utils
26from . import receiver 26from .utils.messages import (
27from . import utils 27 send_emqp_router_message as sendmsg,
28 parse_router_message
29)
28 30
29logger = log.get_logger(__file__) 31logger = log.get_logger(__file__)
30 32
@@ -48,6 +50,7 @@ class Router(object):
48 50
49 self.status = STATUS.ready 51 self.status = STATUS.ready
50 logger.info('Done initializing Router %s' % self.name) 52 logger.info('Done initializing Router %s' % self.name)
53 self.queues = {}
51 54
52 def start(self, 55 def start(self,
53 frontend_addr='tcp://127.0.0.1:47290', 56 frontend_addr='tcp://127.0.0.1:47290',
@@ -70,21 +73,67 @@ class Router(object):
70 73
71 ioloop.IOLoop.instance().start() 74 ioloop.IOLoop.instance().start()
72 75
76 def send_ack(self, socket, recipient):
77 """
78 Sends an ACK response
79 """
80 logger.info('Sending ACK to %s' % recipient)
81 sendmsg(socket, recipient, 'ACK')
82
83 def on_inform(self, sender, msgid, msg):
84 """
85 Handles an INFORM message. Usually when new worker coming online
86 """
87 logger.info('Received INFORM request from %s')
88 queue_name = msg[0]
89
90 if queue_name in self.queues:
91 self.queues[queue_name] += (sender,)
92 else:
93 self.queues[queue_name] = (sender,)
94
95 self.send_ack(self.outgoing, sender)
96
73 def on_receive_request(self, msg): 97 def on_receive_request(self, msg):
74 """ 98 """
75 This function is called when a message comes in from the client socket. 99 This function is called when a message comes in from the client socket.
76 It then calls `on_command`. If `on_command` isn't found, then a 100 It then calls `on_command`. If `on_command` isn't found, then a
77 warning is created. 101 warning is created.
78 """ 102 """
79 logger.info(str(utils.parse_message(msg))) 103 try:
104 message = parse_router_message(msg)
105 except exceptions.InvalidMessageError:
106 logger.exception('Invalid message from clients: %s' % str(msg))
107
108 queue_name = message[3][0]
80 109
81 # do some things and forward it to the workers 110 # do some things and forward it to the workers
82 self.outgoing.send_multipart(msg) 111 self.outgoing.send_multipart(msg)
83 112
113 # If we have no workers for the queue TODO something about it
114 if queue_name not in self.queues:
115 logger.warning("Received REQUEST with a queue I don't recognize")
116
84 def on_receive_reply(self, msg): 117 def on_receive_reply(self, msg):
85 """ 118 """
86 This method is called when a message comes in from the worker socket. 119 This method is called when a message comes in from the worker socket.
87 It then calls `on_command`. If `on_command` isn't found, then a warning 120 It then calls `on_command`. If `on_command` isn't found, then a warning
88 is created. 121 is created.
122
123 def on_inform(msg):
124 pass
89 """ 125 """
90 logger.info(str(utils.parse_message(msg))) 126 try:
127 message = parse_router_message(msg)
128 except exceptions.InvalidMessageError:
129 logger.exception('Invalid message from workers: %s' % str(msg))
130 return
131
132 sender = message[0]
133 command = message[1]
134 msgid = message[2]
135 message = message[3]
136
137 if hasattr(self, "on_%s" % command.lower()):
138 func = getattr(self, "on_%s" % command.lower())
139 func(sender, msgid, message)
diff --git a/eventmq/sender.py b/eventmq/sender.py
index 2c19899..ddada6a 100644
--- a/eventmq/sender.py
+++ b/eventmq/sender.py
@@ -25,11 +25,12 @@ from zmq.eventloop import zmqstream
25from . import constants 25from . import constants
26from . import exceptions 26from . import exceptions
27from . import log 27from . import log
28from .utils.classes import ZMQReceiveMixin, ZMQSendMixin
28 29
29logger = log.get_logger(__file__) 30logger = log.get_logger(__file__)
30 31
31 32
32class Sender(object): 33class Sender(ZMQSendMixin, ZMQReceiveMixin):
33 """ 34 """
34 Sends messages to a particular socket 35 Sends messages to a particular socket
35 36
@@ -39,7 +40,7 @@ class Sender(object):
39 40
40 Attributes: 41 Attributes:
41 name (str): Name of this socket 42 name (str): Name of this socket
42 zcontext (:class`zmq.Context`): socket context 43 zcontext (:class:`zmq.Context`): socket context
43 zsocket (:class:`zmq.Socket`): socket wrapped up in a 44 zsocket (:class:`zmq.Socket`): socket wrapped up in a
44 :class:`zmqstream.ZMQStream` 45 :class:`zmqstream.ZMQStream`
45 """ 46 """
@@ -67,7 +68,7 @@ class Sender(object):
67 self.zsocket.setsockopt(zmq.IDENTITY, self.name) 68 self.zsocket.setsockopt(zmq.IDENTITY, self.name)
68 69
69 if not kwargs.get('skip_zmqstream', True): 70 if not kwargs.get('skip_zmqstream', True):
70 logger.debug('Using ZMQStream') 71 logger.info('Using ZMQStream')
71 self.zsocket = zmqstream.ZMQStream(self.zsocket) 72 self.zsocket = zmqstream.ZMQStream(self.zsocket)
72 self.zsocket.on_recv(kwargs.get('on_recv')) 73 self.zsocket.on_recv(kwargs.get('on_recv'))
73 74
@@ -86,10 +87,10 @@ class Sender(object):
86 if self.ready: 87 if self.ready:
87 self.zsocket.bind(addr) 88 self.zsocket.bind(addr)
88 self.status = constants.STATUS.listening 89 self.status = constants.STATUS.listening
89 logger.info('Receiver %s: Listening on %s' % (self.name, addr)) 90 logger.info('Listening on %s' % (addr))
90 else: 91 else:
91 raise exceptions.EventMQError('Receiver %s not ready. status=%s' % 92 raise exceptions.EventMQError('Not ready. status=%s' %
92 (self.name, self.status)) 93 (self.status))
93 94
94 def connect(self, addr=None): 95 def connect(self, addr=None):
95 """ 96 """
@@ -104,10 +105,10 @@ class Sender(object):
104 if self.ready: 105 if self.ready:
105 self.zsocket.connect(addr) 106 self.zsocket.connect(addr)
106 self.status = constants.STATUS.connected 107 self.status = constants.STATUS.connected
107 logger.info('Receiver %s: Connected to %s' % (self.name, addr)) 108 logger.debug('Connecting to %s' % (addr))
108 else: 109 else:
109 raise exceptions.EventMQError('Receiver %s not ready. status=%s' % 110 raise exceptions.EventMQError('Not ready. status=%s' %
110 (self.name, self.status)) 111 (self.status))
111 112
112 @property 113 @property
113 def ready(self): 114 def ready(self):
@@ -119,55 +120,3 @@ class Sender(object):
119 False 120 False
120 """ 121 """
121 return self.status == constants.STATUS.ready 122 return self.status == constants.STATUS.ready
122
123 def send_multipart(self, message, protocol_version):
124 """
125 Send a message directly to the 0mq socket. Automatically inserts some
126 frames for your convience. The sent frame ends up looking something
127 like identity
128
129 (this, '', protocol_version) + (your, tuple)
130
131 Args:
132 message (tuple): Raw message to send.
133 protocol_version (str): protocol version. it's good practice but
134 you may explicitly specify None to skip adding the version
135 """
136 supported_msg_types = (tuple, list)
137 if not isinstance(message, supported_msg_types):
138 raise exceptions.MessageError(
139 '%s message type not one of %s' %
140 (type(message), str(supported_msg_types)))
141
142 if isinstance(message, list):
143 message = tuple(message)
144
145 headers = ('', protocol_version, )
146
147 message = headers + message
148 print message
149 self.zsocket.send_multipart(message)
150
151 def send(self, message, protocol_version):
152 """
153 Sends a message
154
155 Args:
156 message: message to send to something
157 protocol_version (str): protocol version. it's good practice, but
158 you may explicitly specify None to skip adding the version
159 """
160 logger.debug('Sending message: %s' % str(message))
161 self.send_multipart((message, ), protocol_version)
162
163 def recv(self):
164 """
165 Receive a message
166 """
167 return self.zsocket.recv()
168
169 def recv_multipart(self):
170 """
171 Receive a multipart message
172 """
173 return self.zsocket.recv_multipart
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py
index f32c4af..db3307d 100644
--- a/eventmq/tests/test_utils.py
+++ b/eventmq/tests/test_utils.py
@@ -16,11 +16,12 @@ import unittest
16 16
17from .. import exceptions 17from .. import exceptions
18from .. import utils 18from .. import utils
19import utils.messages
19 20
20 21
21class TestCase(unittest.TestCase): 22class TestCase(unittest.TestCase):
22 def test_generate_msgid(self): 23 def test_generate_msgid(self):
23 msgid = utils.generate_msgid() 24 msgid = utils.messages.generate_msgid()
24 25
25 self.assertEqual(type(msgid), str) 26 self.assertEqual(type(msgid), str)
26 27
@@ -30,9 +31,9 @@ class TestCase(unittest.TestCase):
30 emq_frame_manymsg = emq_headers + ('many', 'parts') 31 emq_frame_manymsg = emq_headers + ('many', 'parts')
31 emq_frame_nomsg = emq_headers 32 emq_frame_nomsg = emq_headers
32 33
33 singlemsg = utils.parse_message(emq_frame_singlemsg) 34 singlemsg = utils.messages.parse_router_message(emq_frame_singlemsg)
34 manymsg = utils.parse_message(emq_frame_manymsg) 35 manymsg = utils.messages.parse_router_message(emq_frame_manymsg)
35 nomsg = utils.parse_message(emq_frame_nomsg) 36 nomsg = utils.messages.parse_router_message(emq_frame_nomsg)
36 37
37 self.assertEqual(singlemsg[0], emq_frame_singlemsg[0]) 38 self.assertEqual(singlemsg[0], emq_frame_singlemsg[0])
38 self.assertEqual(singlemsg[1], emq_frame_singlemsg[3]) 39 self.assertEqual(singlemsg[1], emq_frame_singlemsg[3])
@@ -51,4 +52,4 @@ class TestCase(unittest.TestCase):
51 52
52 broken_message = ('dlkajfs', 'lkasdjf') 53 broken_message = ('dlkajfs', 'lkasdjf')
53 with self.assertRaises(exceptions.InvalidMessageError): 54 with self.assertRaises(exceptions.InvalidMessageError):
54 utils.parse_message(broken_message) 55 utils.messages.parse_router_message(broken_message)
diff --git a/eventmq/utils.py b/eventmq/utils/__init__.py
index 756331f..af8a4dc 100644
--- a/eventmq/utils.py
+++ b/eventmq/utils/__init__.py
@@ -17,40 +17,18 @@
17========================= 17=========================
18This module contains a handful of utility classes to make dealing with things 18This module contains a handful of utility classes to make dealing with things
19like creating message more simple. 19like creating message more simple.
20"""
21import uuid
22 20
23from . import exceptions 21.. toctree ::
22 :maxdepth: 2
24 23
24 utils/classes
25 utils/messages
26"""
27import uuid
25 28
26def generate_msgid(): 29def random_characters():
27 """ 30 """
28 Returns a (universally) unique id to be used for messages 31 Returns some random characters of a specified length
29 """ 32 """
33 # TODO: Pull out the random_chars function from eb.io code
30 return str(uuid.uuid4()) 34 return str(uuid.uuid4())
31
32
33def parse_message(message):
34 """
35 Parses the generic format of an eMQP/1.0 message and returns the
36 parts.
37
38 Args:
39 message: the message you wish to have parsed
40
41 Returns (tuple) (sender_id, command, message_id, (message_body, and_data))
42 """
43 try:
44 sender = message[0]
45 # noop = message[1]
46 # protocol_version = message[2]
47 command = message[3]
48 msgid = message[4]
49 except IndexError:
50 raise exceptions.InvalidMessageError('Invalid Message Encountered: %s'
51 % str(message))
52 if len(message) > 5:
53 msg = message[5:]
54 else:
55 msg = ()
56 return (sender, command, msgid, msg)
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
new file mode 100644
index 0000000..84e88fe
--- /dev/null
+++ b/eventmq/utils/classes.py
@@ -0,0 +1,93 @@
1# This file is part of eventmq.
2#
3# eventmq is free software: you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# eventmq is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15"""
16:mod:`classes` -- Utility Classes
17=================================
18Defines some classes to use when implementing ZMQ devices
19"""
20
21from .. import exceptions
22from .. import log
23
24logger = log.get_logger(__file__)
25
26
27class ZMQReceiveMixin(object):
28 """
29 Defines some methods for receiving messages. This class will not work if
30 used on it's own
31 """
32 def recv(self):
33 """
34 Receive a message
35 """
36 return self.zsocket.recv()
37
38 def recv_multipart(self):
39 """
40 Receive a multipart message
41 """
42 return self.zsocket.recv_multipart()
43
44
45class ZMQSendMixin(object):
46 """
47 Defines some methods for sending messages. This class will not work if used
48 on it's own
49 """
50 def send_multipart(self, message, protocol_version, _recipient_id=None):
51 """
52 Send a message directly to the 0mq socket. Automatically inserts some
53 frames for your convience. The sent frame ends up looking something
54 like identity
55
56 (this, '', protocol_version) + (your, tuple)
57
58 Args:
59 message (tuple): Raw message to send.
60 protocol_version (str): protocol version. it's good practice but
61 you may explicitly specify None to skip adding the version
62 _recipient_id (object): When using a :attr:`zmq.ROUTER` you must
63 specify the the recipient id of the
64 """
65 supported_msg_types = (tuple, list)
66 if not isinstance(message, supported_msg_types):
67 raise exceptions.MessageError(
68 '%s message type not one of %s' %
69 (type(message), str(supported_msg_types)))
70
71 if isinstance(message, list):
72 message = tuple(message)
73
74 if _recipient_id:
75 headers = (_recipient_id, '', protocol_version)
76 else:
77 headers = ('', protocol_version, )
78
79 msg = headers + message
80 logger.debug('Sending message: %s' % str(msg))
81 self.zsocket.send_multipart(msg)
82
83 def send(self, message, protocol_version):
84 """
85 Sends a message
86
87 Args:
88 message: message to send to something
89 protocol_version (str): protocol version. it's good practice, but
90 you may explicitly specify None to skip adding the version
91 """
92 logger.debug('Sending message: %s' % str(message))
93 self.send_multipart((message, ), protocol_version)
diff --git a/eventmq/utils/messages.py b/eventmq/utils/messages.py
new file mode 100644
index 0000000..8abd374
--- /dev/null
+++ b/eventmq/utils/messages.py
@@ -0,0 +1,132 @@
1# This file is part of eventmq.
2#
3# eventmq is free software: you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# eventmq is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15"""
16:mod:`messages` -- Message Utilities
17==========================================
18"""
19from .. import constants, log, exceptions
20from . import random_characters
21
22
23def parse_router_message(message):
24 """
25 Parses the generic format of an eMQP/1.0 message and returns the
26 parts.
27
28 Args:
29 message: the message you wish to have parsed
30
31 Returns (tuple) (sender_id, command, message_id, (message_body, and_data))
32 """
33 try:
34 sender = message[0]
35 # noop = message[1]
36 # protocol_version = message[2]
37 command = message[3]
38 msgid = message[4]
39 except IndexError:
40 raise exceptions.InvalidMessageError('Invalid Message Encountered: %s'
41 % str(message))
42 if len(message) > 5:
43 msg = message[5:]
44 else:
45 msg = ()
46 return (sender, command, msgid, msg)
47
48
49def parse_message(message):
50 """
51 Parses the generic format of an eMQP/1.0 message and returns the
52 parts.
53
54 Args:
55 message: the message you wish to have parsed
56
57 Returns (tuple) (command, message_id, (message_body, and_data))
58 """
59 try:
60 # noop = message[0]
61 # protocol_version = message[1]
62 command = message[2]
63 msgid = message[3]
64 except IndexError:
65 raise exceptions.InvalidMessageError('Invalid Message Encountered: %s'
66 % str(message))
67
68 print len(message)
69 if len(message) > 4:
70 msg = message[4:]
71 else:
72 msg = ()
73 return (command, msgid, msg)
74
75
76def generate_msgid(prefix=None):
77 """
78 Returns a random string to be used for message ids. Optionally the ID can
79 be prefixed with `prefix`.
80
81 Args:
82 prefix (str): Value to prefix on to the random part of the id. Useful
83 for prefixing some meta data to use for things
84 """
85 id = random_characters()
86 return id if not prefix else str(prefix) + id
87
88logger = log.get_logger(__file__)
89
90
91def send_emqp_message(socket, command, message=None):
92 """
93 Formats and sends an eMQP message
94
95 Args:
96
97 Raises:
98
99 Returns
100 """
101 msg = (str(command).upper(), generate_msgid())
102 if message and isinstance(message, (tuple, list)):
103 msg += message
104 elif message:
105 msg += (message,)
106
107 socket.send_multipart(msg, constants.PROTOCOL_VERSION)
108
109
110def send_emqp_router_message(socket, recipient_id, command, message=None):
111 """
112 Formats and sends an eMQP message taking into account the recipient frame
113 used by a :attr:`zmq.ROUTER` device.
114
115 Args:
116 socket: socket to send the message with
117 recipient_id (str): the id of the connected device to reply to
118 command (str): the eMQP command to send
119 message: a msg tuple to send
120
121 Raises:
122
123 Returns
124 """
125 msg = (str(command).upper(), generate_msgid())
126 if message and isinstance(message, (tuple, list)):
127 msg += message
128 elif message:
129 msg += (message,)
130
131 socket.send_multipart(msg, constants.PROTOCOL_VERSION,
132 _recipient_id=recipient_id)