aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbin/router3
-rwxr-xr-xbin/scheduler3
-rw-r--r--docs/protocol.rst6
-rw-r--r--eventmq/client/__init__.py0
-rw-r--r--eventmq/client/messages.py61
-rw-r--r--eventmq/conf.py5
-rw-r--r--eventmq/jobmanager.py34
-rw-r--r--eventmq/log.py18
-rw-r--r--eventmq/poller.py4
-rw-r--r--eventmq/receiver.py5
-rw-r--r--eventmq/router.py81
-rw-r--r--eventmq/scheduler.py10
-rw-r--r--eventmq/sender.py5
-rw-r--r--eventmq/utils/classes.py28
-rw-r--r--eventmq/utils/messages.py29
15 files changed, 220 insertions, 72 deletions
diff --git a/bin/router b/bin/router
index 356ac4b..0081e97 100755
--- a/bin/router
+++ b/bin/router
@@ -1,8 +1,9 @@
1#!/usr/bin/env python 1#!/usr/bin/env python
2# -*- mode: python -*- 2# -*- mode: python -*-
3 3from eventmq.log import setup_logger
4from eventmq.router import Router 4from eventmq.router import Router
5 5
6if __name__ == "__main__": 6if __name__ == "__main__":
7 setup_logger('eventmq')
7 r = Router() 8 r = Router()
8 r.start() 9 r.start()
diff --git a/bin/scheduler b/bin/scheduler
index b591ce9..33f164e 100755
--- a/bin/scheduler
+++ b/bin/scheduler
@@ -1,8 +1,9 @@
1#!/usr/bin/env python 1#!/usr/bin/env python
2# -*- mode: python -*- 2# -*- mode: python -*-
3 3from eventmq.log import setup_logger
4from eventmq.scheduler import Scheduler 4from eventmq.scheduler import Scheduler
5 5
6if __name__ == "__main__": 6if __name__ == "__main__":
7 setup_logger("eventmq")
7 s = Scheduler() 8 s = Scheduler()
8 s.start() 9 s.start()
diff --git a/docs/protocol.rst b/docs/protocol.rst
index 86491dc..e468b06 100644
--- a/docs/protocol.rst
+++ b/docs/protocol.rst
@@ -5,7 +5,7 @@ EventMQ Protocol Specification
5 5
6Goals 6Goals
7===== 7=====
8The EventMQ Protocol (eMQP) defines a reliable service-oriented request-reply and pub-sub dialog between a set of clients, a broker, and a set of workers. This goal is to 8The EventMQ Protocol (eMQP) defines a reliable service-oriented request-reply and pub-sub dialog between a set of clients, a broker, and a set of workers. This goal is to
9 9
10The goals are to: 10The goals are to:
11 11
@@ -13,7 +13,7 @@ The goals are to:
13 * Allow requests to be routed to workers by an abstracted service name. 13 * Allow requests to be routed to workers by an abstracted service name.
14 * Detect disconnected peers through heartbeating. 14 * Detect disconnected peers through heartbeating.
15 * Allow for message tracing and debugging. 15 * Allow for message tracing and debugging.
16 16
17 17
18License 18License
19======= 19=======
@@ -69,7 +69,7 @@ FRAME Value Description
69====== ============== =========== 69====== ============== ===========
700 _EMPTY_ leave empty 700 _EMPTY_ leave empty
711 eMQP/1.0 Protocol version 711 eMQP/1.0 Protocol version
722 READY command 722 REQUEST command
733 _MSGID_ A unique id for the msg 733 _MSGID_ A unique id for the msg
744 _QUEUE_NAME_ the name of the queue the worker belongs to 744 _QUEUE_NAME_ the name of the queue the worker belongs to
755 _HEADERS_ dictionary of headers. can be an empty set 755 _HEADERS_ dictionary of headers. can be an empty set
diff --git a/eventmq/client/__init__.py b/eventmq/client/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/eventmq/client/__init__.py
diff --git a/eventmq/client/messages.py b/eventmq/client/messages.py
new file mode 100644
index 0000000..091b36f
--- /dev/null
+++ b/eventmq/client/messages.py
@@ -0,0 +1,61 @@
1# This file is part of eventmq.
2#
3# eventmq is free software: you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# eventmq is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15"""
16:mod:`messages` -- Client Messaging
17===================================
18"""
19from json import dumps as serialize
20
21from .. import conf
22from ..utils.messages import send_emqp_message
23
24
25def send_request(socket, message, reply_requested=False, guarantee=False,
26 retry_count=0, queue=None):
27 """
28 Send a REQUEST command.
29
30 Default headers are always all disabled by default. If they are included in
31 the headers then they have been enabled.
32 """
33 headers = []
34
35 if reply_requested:
36 headers.append('reply-requested')
37
38 if guarantee:
39 headers.append('guarantee')
40
41 if retry_count > 0:
42 headers.append('retry-count:%d' % retry_count)
43
44 send_emqp_message(socket, 'REQUEST',
45 (queue or conf.DEFAULT_QUEUE_NAME,
46 ",".join(headers),
47 serialize(message))
48 )
49
50
51def job(block=False): # Move to decorators.py
52 """
53 run the decorated function on a worker
54
55 Args:
56 block (bool): Set to True if you wish to block and wait for the
57 response. This may be useful for running quick but cpu intesive
58 that would otherwise overwhelm a box that has to do it all alone.
59 (decryption?)
60 """
61 pass
diff --git a/eventmq/conf.py b/eventmq/conf.py
index ff4fe0f..6e48080 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -2,6 +2,11 @@
2# at different levels in the application 2# at different levels in the application
3SUPER_DEBUG = True 3SUPER_DEBUG = True
4 4
5# When a queue name isn't specified use this queue name for the default. It
6# would be a good idea to have a handful of workers listening on this queue
7# unless you're positive that everything specifies a queue with workers.
8DEFAULT_QUEUE_NAME = 'default'
9
5# {{{Job Manager 10# {{{Job Manager
6# How long should we wait before retrying to connect to a broker? 11# How long should we wait before retrying to connect to a broker?
7RECONNECT_TIMEOUT = 5 # in seconds 12RECONNECT_TIMEOUT = 5 # in seconds
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 17b0bbd..10a4a34 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -17,18 +17,19 @@
17================================ 17================================
18Ensures things about jobs and spawns the actual tasks 18Ensures things about jobs and spawns the actual tasks
19""" 19"""
20import logging
20import uuid 21import uuid
21 22
22from . import conf, constants, exceptions, log, utils 23from . import conf, constants, exceptions, utils
23from .poller import Poller, POLLIN 24from .poller import Poller, POLLIN
24from .sender import Sender 25from .sender import Sender
25from .utils.classes import HeartbeatMixin 26from .utils.classes import HeartbeatMixin
26from .utils.messages import send_emqp_message as sendmsg 27from .utils.messages import send_emqp_message as sendmsg
27import utils.messages 28import utils.messages
28from .utils.timeutils import monotonic, timestamp 29from .utils.timeutils import monotonic
29 30
30 31
31logger = log.get_logger(__file__) 32logger = logging.getLogger(__name__)
32 33
33 34
34class JobManager(HeartbeatMixin): 35class JobManager(HeartbeatMixin):
@@ -49,7 +50,6 @@ class JobManager(HeartbeatMixin):
49 generated. 50 generated.
50 """ 51 """
51 super(JobManager, self).__init__(*args, **kwargs) 52 super(JobManager, self).__init__(*args, **kwargs)
52 print self._meta
53 self.name = kwargs.get('name', str(uuid.uuid4())) 53 self.name = kwargs.get('name', str(uuid.uuid4()))
54 logger.info('Initializing JobManager %s...' % self.name) 54 logger.info('Initializing JobManager %s...' % self.name)
55 self.incoming = Sender() 55 self.incoming = Sender()
@@ -141,6 +141,13 @@ class JobManager(HeartbeatMixin):
141 'Reconnecting...') 141 'Reconnecting...')
142 break 142 break
143 143
144 def on_request(self, msgid, msg):
145 """
146 Handles a REQUEST command
147 """
148 logger.debug("WHAT")
149 self.send_ready()
150
144 def process_message(self, msg): 151 def process_message(self, msg):
145 """ 152 """
146 Processes a message 153 Processes a message
@@ -160,26 +167,31 @@ class JobManager(HeartbeatMixin):
160 logger.error('Invalid message: %s' % str(msg)) 167 logger.error('Invalid message: %s' % str(msg))
161 return 168 return
162 169
163 if conf.SUPER_DEBUG:
164 logger.debug("Received Message: %s" % msg)
165
166 command = message[0] 170 command = message[0]
167 msgid = message[1] 171 msgid = message[1]
168 message = message[2] 172 message = message[2]
169 173
170 if hasattr(self, "on_%s" % command.lower()): 174 if hasattr(self, "on_%s" % command.lower()):
171 logger.debug('Calling on_%s' % command.lower()) 175 if conf.SUPER_DEBUG:
176 logger.debug('Calling on_%s' % command.lower())
172 func = getattr(self, "on_%s" % command.lower()) 177 func = getattr(self, "on_%s" % command.lower())
173 func(msgid, message) 178 func(msgid, message)
174 else: 179 else:
175 logger.warning('No handler for %s found (tried: %s)' % 180 logger.warning('No handler for %s found (tried: %s)' %
176 (command, ('on_%s' % command.lower()))) 181 (command, ('on_%s' % command.lower())))
177 182
178 def send_inform(self): 183 def send_ready(self):
184 """
185 send the READY command upstream to indicate that JobManager is ready
186 for another REQUEST message.
187 """
188 sendmsg(self.incoming, 'READY')
189
190 def send_inform(self, queue=None):
179 """ 191 """
180 Send an INFORM command 192 Send an INFORM command
181 """ 193 """
182 sendmsg(self.incoming, 'INFORM', 'default_queuename') 194 sendmsg(self.incoming, 'INFORM', queue or conf.DEFAULT_QUEUE_NAME)
183 self._meta['last_sent_heartbeat'] = monotonic() 195 self._meta['last_sent_heartbeat'] = monotonic()
184 196
185 def on_ack(self, msgid, ackd_msgid): 197 def on_ack(self, msgid, ackd_msgid):
@@ -188,7 +200,7 @@ class JobManager(HeartbeatMixin):
188 """ 200 """
189 # The msgid is the only frame in the message 201 # The msgid is the only frame in the message
190 ackd_msgid = ackd_msgid[0] 202 ackd_msgid = ackd_msgid[0]
191 logger.info('Received ACK for %s' % ackd_msgid) 203 logger.info('Received ACK for router (or client) %s' % ackd_msgid)
192 self.awaiting_startup_ack = False 204 self.awaiting_startup_ack = False
193 205
194 def on_heartbeat(self, msgid, message): 206 def on_heartbeat(self, msgid, message):
diff --git a/eventmq/log.py b/eventmq/log.py
index 487042a..072e1f3 100644
--- a/eventmq/log.py
+++ b/eventmq/log.py
@@ -1,11 +1,15 @@
1""" 1"""
2log module for eventmq 2log module for eventmq
3
4this needs so much work.
3""" 5"""
4import logging 6import logging
5 7
6import zmq 8import zmq
7import zmq.log.handlers 9import zmq.log.handlers
8 10
11import watchtower
12
9 13
10FORMAT_STANDARD = logging.Formatter( 14FORMAT_STANDARD = logging.Formatter(
11 '%(asctime)s - %(name)s %(levelname)s - %(message)s') 15 '%(asctime)s - %(name)s %(levelname)s - %(message)s')
@@ -28,12 +32,16 @@ class handlers(object):
28 """ 32 """
29 PUBLISH_HANDLER = PUBHandler 33 PUBLISH_HANDLER = PUBHandler
30 STREAM_HANDLER = logging.StreamHandler 34 STREAM_HANDLER = logging.StreamHandler
35 CLOUDWATCH_HANDLER = watchtower.CloudWatchLogHandler
31 36
32 37
33def get_logger(name, formatter=FORMAT_NAMELESS, 38def setup_logger(base_name, formatter=FORMAT_STANDARD,
34 handler=handlers.STREAM_HANDLER): 39 handler=handlers.STREAM_HANDLER):
35 logger = logging.getLogger(name) 40
41 logger = logging.getLogger(base_name)
36 logger.setLevel(logging.DEBUG) 42 logger.setLevel(logging.DEBUG)
43
44 # remove handlers we don't want
37 for h in logger.handlers: 45 for h in logger.handlers:
38 logger.removeHandler(h) 46 logger.removeHandler(h)
39 47
@@ -45,7 +53,9 @@ def get_logger(name, formatter=FORMAT_NAMELESS,
45 time.sleep(1) 53 time.sleep(1)
46 54
47 handler = handler(_handler_sock) 55 handler = handler(_handler_sock)
48 handler.root_topic = name 56 handler.root_topic = base_name
57 elif handler == handlers.CLOUDWATCH_HANDLER:
58 handler = handler(log_group='eventmq-dev')
49 else: 59 else:
50 handler = handler() 60 handler = handler()
51 61
diff --git a/eventmq/poller.py b/eventmq/poller.py
index 9c05b2d..73a00a4 100644
--- a/eventmq/poller.py
+++ b/eventmq/poller.py
@@ -17,14 +17,14 @@
17======================= 17=======================
18Device for polling sockets 18Device for polling sockets
19""" 19"""
20import logging
20import uuid 21import uuid
21 22
22import zmq 23import zmq
23from zmq import Poller as ZPoller 24from zmq import Poller as ZPoller
24 25
25from . import log
26 26
27logger = log.get_logger(__package__) 27logger = logging.getLogger(__name__)
28 28
29POLLIN = zmq.POLLIN 29POLLIN = zmq.POLLIN
30POLLOUT = zmq.POLLOUT 30POLLOUT = zmq.POLLOUT
diff --git a/eventmq/receiver.py b/eventmq/receiver.py
index 70b1279..e1ec062 100644
--- a/eventmq/receiver.py
+++ b/eventmq/receiver.py
@@ -17,15 +17,16 @@
17=========================== 17===========================
18The receiver is responsible for receiveing messages 18The receiver is responsible for receiveing messages
19""" 19"""
20import logging
20import uuid 21import uuid
21 22
22import zmq 23import zmq
23 24
24from . import constants, log 25from . import constants
25from .utils.classes import ZMQReceiveMixin, ZMQSendMixin 26from .utils.classes import ZMQReceiveMixin, ZMQSendMixin
26 27
27 28
28logger = log.get_logger(__file__) 29logger = logging.getLogger(__name__)
29 30
30 31
31class Receiver(ZMQReceiveMixin, ZMQSendMixin): 32class Receiver(ZMQReceiveMixin, ZMQSendMixin):
diff --git a/eventmq/router.py b/eventmq/router.py
index 821f422..145918b 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -18,19 +18,21 @@
18Routes messages to workers (that are in named queues). 18Routes messages to workers (that are in named queues).
19""" 19"""
20from copy import copy 20from copy import copy
21import uuid 21import logging
22 22
23from . import conf, exceptions, log, poller, receiver 23from . import conf, exceptions, poller, receiver
24from .constants import STATUS 24from .constants import STATUS
25from .utils.classes import HeartbeatMixin 25from .utils.classes import HeartbeatMixin
26from .utils.messages import ( 26from .utils.messages import (
27 send_emqp_router_message as sendmsg, 27 send_emqp_router_message as sendmsg,
28 fwd_emqp_router_message as fwdmsg,
28 parse_router_message 29 parse_router_message
29) 30)
31from .utils.devices import generate_device_name
30from .utils.timeutils import monotonic, timestamp 32from .utils.timeutils import monotonic, timestamp
31 33
32 34
33logger = log.get_logger(__file__) 35logger = logging.getLogger(__name__)
34 36
35 37
36class Router(HeartbeatMixin): 38class Router(HeartbeatMixin):
@@ -40,7 +42,7 @@ class Router(HeartbeatMixin):
40 def __init__(self, *args, **kwargs): 42 def __init__(self, *args, **kwargs):
41 super(Router, self).__init__(*args, **kwargs) # Creates _meta 43 super(Router, self).__init__(*args, **kwargs) # Creates _meta
42 44
43 self.name = str(uuid.uuid4()) 45 self.name = generate_device_name()
44 logger.info('Initializing Router %s...' % self.name) 46 logger.info('Initializing Router %s...' % self.name)
45 47
46 self.poller = poller.Poller() 48 self.poller = poller.Poller()
@@ -141,14 +143,14 @@ class Router(HeartbeatMixin):
141 """ 143 """
142 self._meta['last_sent_heartbeat'] = monotonic() 144 self._meta['last_sent_heartbeat'] = monotonic()
143 145
144 for k in self.workers: 146 for worker_id in self.workers:
145 self.send_heartbeat(self.outgoing, k) 147 self.send_heartbeat(self.outgoing, worker_id)
146 148
147 def on_heartbeat(self, sender, msgid, msg): 149 def on_heartbeat(self, sender, msgid, msg):
148 """ 150 """
149 a placeholder for a noop command. The actual 'logic' for HEARTBEAT is 151 a placeholder for a noop command. The actual 'logic' for HEARTBEAT is
150 in :meth:`self.process_worker_message` because any message from a worker 152 in :meth:`self.process_worker_message` because any message from a
151 counts as a HEARTBEAT 153 worker counts as a HEARTBEAT
152 """ 154 """
153 155
154 def on_inform(self, sender, msgid, msg): 156 def on_inform(self, sender, msgid, msg):
@@ -158,20 +160,16 @@ class Router(HeartbeatMixin):
158 logger.info('Received INFORM request from %s' % sender) 160 logger.info('Received INFORM request from %s' % sender)
159 queue_name = msg[0] 161 queue_name = msg[0]
160 162
161 # Add the worker to our worker dict 163 self.add_worker(sender, queue_name)
162 self.workers[sender] = {}
163 self.workers[sender]['queues'] = queue_name
164
165 # Add the worker to the queues it supports
166 if queue_name in self.queues:
167 self.queues[queue_name] += (sender,)
168 else:
169 self.queues[queue_name] = (sender,)
170 logger.debug('Adding %s to the worker pool for %s' %
171 (sender, queue_name))
172 164
173 self.send_ack(self.outgoing, sender, msgid) 165 self.send_ack(self.outgoing, sender, msgid)
174 166
167 def on_ready(self, sender, msgid, msg):
168 """
169 A worker that we should already know about is ready for another job
170 """
171 self.requeue_worker(sender)
172
175 def clean_up_dead_workers(self): 173 def clean_up_dead_workers(self):
176 """ 174 """
177 Loops through the worker queues and removes any workers who haven't 175 Loops through the worker queues and removes any workers who haven't
@@ -189,6 +187,7 @@ class Router(HeartbeatMixin):
189 187
190 # If a worker started, then immediatly died then no hb dictionary 188 # If a worker started, then immediatly died then no hb dictionary
191 # was created so we should just remove that worker. 189 # was created so we should just remove that worker.
190 # hb stands for heartbeat
192 if 'hb' not in self.workers[worker_id]: 191 if 'hb' not in self.workers[worker_id]:
193 logger.info('Removing worker %s from the queue due to no ' 192 logger.info('Removing worker %s from the queue due to no '
194 'heartbeat' % (worker_id)) 193 'heartbeat' % (worker_id))
@@ -202,7 +201,7 @@ class Router(HeartbeatMixin):
202 # Remove the worker from the actual worker queues 201 # Remove the worker from the actual worker queues
203 del self.workers[worker_id] 202 del self.workers[worker_id]
204 203
205 def add_worker(self, id, queues=None): 204 def add_worker(self, worker_id, queues=None):
206 """ 205 """
207 Adds a worker to worker queues 206 Adds a worker to worker queues
208 207
@@ -210,6 +209,32 @@ class Router(HeartbeatMixin):
210 worker_id: unique id of the worker to add 209 worker_id: unique id of the worker to add
211 queues: queue or queues this worker should be a member of 210 queues: queue or queues this worker should be a member of
212 """ 211 """
212 # Add the worker to our worker dict
213 self.workers[worker_id] = {}
214 self.workers[worker_id]['queues'] = queues
215
216 # Add the worker to the queues it supports
217 if queues in self.queues:
218 self.queues[queues] += [worker_id, ]
219 else:
220 self.queues[queues] = [worker_id, ]
221 logger.debug('Adding %s to the worker pool for %s' %
222 (worker_id, str(queues)))
223
224 def requeue_worker(self, worker_id):
225 """
226 Add a worker back to the queue pool
227 """
228 if worker_id in self.workers:
229 queues = self.workers[worker_id].get('queues', None)
230 else:
231 queues = None
232
233 if queues:
234 logger.debug('Readding worker {} to queues {}'.
235 format(worker_id, queues))
236
237 self.queues[queues].append(worker_id)
213 238
214 def on_receive_request(self, msg): 239 def on_receive_request(self, msg):
215 """ 240 """
@@ -224,18 +249,24 @@ class Router(HeartbeatMixin):
224 249
225 queue_name = message[3][0] 250 queue_name = message[3][0]
226 251
227 # cheat here and forward the message to the workers
228 self.outgoing.zsocket.send_multipart(msg)
229
230 # If we have no workers for the queue TODO something about it 252 # If we have no workers for the queue TODO something about it
231 if queue_name not in self.queues: 253 if queue_name not in self.queues:
232 logger.warning("Received REQUEST with a queue I don't recognize") 254 logger.warning("Received REQUEST with a queue I don't recognize")
233 255
256 try:
257 worker_addr = self.queues[queue_name].pop()
258 except IndexError:
259 # There were no workers in the queue
260 raise NotImplementedError("TODO: buffer when there are no workers "
261 "waiting in the queue")
262
263 fwdmsg(self.outgoing, worker_addr, msg[1:])
264
234 def process_worker_message(self, msg): 265 def process_worker_message(self, msg):
235 """ 266 """
236 This method is called when a message comes in from the worker socket. 267 This method is called when a message comes in from the worker socket.
237 It then calls `on_command`. If `on_command` isn't found, then a warning 268 It then calls `on_COMMAND.lower()`. If `on_command` isn't found, then
238 is created. 269 a warning is created.
239 270
240 def on_inform(msg): 271 def on_inform(msg):
241 pass 272 pass
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index a9a823b..a234c34 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -17,18 +17,17 @@
17============================= 17=============================
18Handles cron and other scheduled tasks 18Handles cron and other scheduled tasks
19""" 19"""
20import logging
20import time 21import time
21 22
22from croniter import croniter 23from croniter import croniter
23from six import next 24from six import next
24 25
25from . import log
26from .sender import Sender 26from .sender import Sender
27from .utils.classes import HeartbeatMixin 27from .utils.classes import HeartbeatMixin
28from .utils.devices import generate_device_name
29from .utils.timeutils import monotonic, seconds_until, timestamp 28from .utils.timeutils import monotonic, seconds_until, timestamp
30 29
31logger = log.get_logger(__file__) 30logger = logging.getLogger(__name__)
32 31
33 32
34class Scheduler(HeartbeatMixin): 33class Scheduler(HeartbeatMixin):
@@ -48,11 +47,12 @@ class Scheduler(HeartbeatMixin):
48 47
49 self.load_jobs() 48 self.load_jobs()
50 49
51 def connect(self, addr=''): 50 def connect(self, addr='tcp://127.0.0.1:47290'):
52 """ 51 """
53 Connect the scheduler to worker/router at `addr` 52 Connect the scheduler to worker/router at `addr`
54 """ 53 """
55 54
55
56 def load_jobs(self): 56 def load_jobs(self):
57 """ 57 """
58 Loads the jobs that need to be scheduled 58 Loads the jobs that need to be scheduled
@@ -117,4 +117,4 @@ class Scheduler(HeartbeatMixin):
117 logger.debug("Next execution will be in %ss" % 117 logger.debug("Next execution will be in %ss" %
118 seconds_until(self.jobs[i][0])) 118 seconds_until(self.jobs[i][0]))
119 119
120 time.sleep(1) 120 time.sleep(0.1)
diff --git a/eventmq/sender.py b/eventmq/sender.py
index e9eb655..7a780c4 100644
--- a/eventmq/sender.py
+++ b/eventmq/sender.py
@@ -17,14 +17,15 @@
17======================= 17=======================
18The sender is responsible for sending messages 18The sender is responsible for sending messages
19""" 19"""
20import logging
20import uuid 21import uuid
21 22
22import zmq 23import zmq
23 24
24from . import constants, exceptions, log 25from . import constants, exceptions
25from .utils.classes import ZMQReceiveMixin, ZMQSendMixin 26from .utils.classes import ZMQReceiveMixin, ZMQSendMixin
26 27
27logger = log.get_logger(__file__) 28logger = logging.getLogger(__name__)
28 29
29 30
30class Sender(ZMQSendMixin, ZMQReceiveMixin): 31class Sender(ZMQSendMixin, ZMQReceiveMixin):
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index 5a82b97..8b34431 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -17,13 +17,15 @@
17================================= 17=================================
18Defines some classes to use when implementing ZMQ devices 18Defines some classes to use when implementing ZMQ devices
19""" 19"""
20import logging
21
20import zmq.error 22import zmq.error
21 23
22from .. import conf, exceptions, log 24from .. import conf, exceptions
23from ..utils.messages import send_emqp_message as sendmsg 25from ..utils.messages import send_emqp_message as sendmsg
24from ..utils.timeutils import monotonic, timestamp 26from ..utils.timeutils import monotonic, timestamp
25 27
26logger = log.get_logger(__file__) 28logger = logging.getLogger(__name__)
27 29
28 30
29class HeartbeatMixin(object): 31class HeartbeatMixin(object):
@@ -99,13 +101,19 @@ class ZMQReceiveMixin(object):
99 """ 101 """
100 Receive a message 102 Receive a message
101 """ 103 """
102 return self.zsocket.recv() 104 msg = self.zsocket.recv()
105 if conf.SUPER_DEBUG:
106 logger.debug('Received message: {}'.format(msg))
107 return msg
103 108
104 def recv_multipart(self): 109 def recv_multipart(self):
105 """ 110 """
106 Receive a multipart message 111 Receive a multipart message
107 """ 112 """
108 return self.zsocket.recv_multipart() 113 msg = self.zsocket.recv_multipart()
114 if conf.SUPER_DEBUG:
115 logger.debug('Received message: {}'.format(msg))
116 return msg
109 117
110 118
111class ZMQSendMixin(object): 119class ZMQSendMixin(object):
@@ -117,16 +125,16 @@ class ZMQSendMixin(object):
117 """ 125 """
118 Send a message directly to the 0mq socket. Automatically inserts some 126 Send a message directly to the 0mq socket. Automatically inserts some
119 frames for your convience. The sent frame ends up looking something 127 frames for your convience. The sent frame ends up looking something
120 like identity 128 like this
121 129
122 (this, '', protocol_version) + (your, tuple) 130 (_recipient_id, '', protocol_version) + (your, tuple)
123 131
124 Args: 132 Args:
125 message (tuple): Raw message to send. 133 message (tuple): Raw message to send.
126 protocol_version (str): protocol version. it's good practice but 134 protocol_version (str): protocol version. it's good practice but
127 you may explicitly specify None to skip adding the version 135 you may explicitly specify None to skip adding the version
128 _recipient_id (object): When using a :attr:`zmq.ROUTER` you must 136 _recipient_id (object): When using a :attr:`zmq.ROUTER` you must
129 specify the the recipient id of the 137 specify the the recipient id of the remote socket
130 """ 138 """
131 supported_msg_types = (tuple, list) 139 supported_msg_types = (tuple, list)
132 if not isinstance(message, supported_msg_types): 140 if not isinstance(message, supported_msg_types):
@@ -143,7 +151,10 @@ class ZMQSendMixin(object):
143 headers = ('', protocol_version, ) 151 headers = ('', protocol_version, )
144 152
145 msg = headers + message 153 msg = headers + message
146 logger.debug('Sending message: %s' % str(msg)) 154
155 if conf.SUPER_DEBUG:
156 logger.debug('Sending message: %s' % str(msg))
157
147 try: 158 try:
148 self.zsocket.send_multipart(msg) 159 self.zsocket.send_multipart(msg)
149 except zmq.error.ZMQError as e: 160 except zmq.error.ZMQError as e:
@@ -159,5 +170,4 @@ class ZMQSendMixin(object):
159 protocol_version (str): protocol version. it's good practice, but 170 protocol_version (str): protocol version. it's good practice, but
160 you may explicitly specify None to skip adding the version 171 you may explicitly specify None to skip adding the version
161 """ 172 """
162 logger.debug('Sending message: %s' % str(message))
163 self.send_multipart((message, ), protocol_version) 173 self.send_multipart((message, ), protocol_version)
diff --git a/eventmq/utils/messages.py b/eventmq/utils/messages.py
index 643712c..3b26a1f 100644
--- a/eventmq/utils/messages.py
+++ b/eventmq/utils/messages.py
@@ -16,9 +16,13 @@
16:mod:`messages` -- Message Utilities 16:mod:`messages` -- Message Utilities
17========================================== 17==========================================
18""" 18"""
19from .. import constants, log, exceptions 19import logging
20
21from .. import constants, exceptions
20from . import random_characters 22from . import random_characters
21 23
24logger = logging.getLogger(__name__)
25
22 26
23def parse_router_message(message): 27def parse_router_message(message):
24 """ 28 """
@@ -84,22 +88,20 @@ def generate_msgid(prefix=None):
84 id = random_characters() 88 id = random_characters()
85 return id if not prefix else str(prefix) + id 89 return id if not prefix else str(prefix) + id
86 90
87logger = log.get_logger(__file__)
88
89 91
90def send_emqp_message(socket, command, message=None): 92def send_emqp_message(socket, command, message=None):
91 """ 93 """
92 Formats and sends an eMQP message 94 Formats and sends an eMQP message
93 95
94 Args: 96 Args:
95 97 socket
98 command
99 message
96 Raises: 100 Raises:
97
98 Returns
99 """ 101 """
100 msg = (str(command).upper(), generate_msgid()) 102 msg = (str(command).upper(), generate_msgid())
101 if message and isinstance(message, (tuple, list)): 103 if message and isinstance(message, (tuple, list)):
102 msg += message 104 msg += tuple(message)
103 elif message: 105 elif message:
104 msg += (message,) 106 msg += (message,)
105 107
@@ -129,3 +131,16 @@ def send_emqp_router_message(socket, recipient_id, command, message=None):
129 131
130 socket.send_multipart(msg, constants.PROTOCOL_VERSION, 132 socket.send_multipart(msg, constants.PROTOCOL_VERSION,
131 _recipient_id=recipient_id) 133 _recipient_id=recipient_id)
134
135
136def fwd_emqp_router_message(socket, recipient_id, payload):
137 """
138 Forwards `payload` to socket untouched.
139
140 .. note:
141 Because it's untouched, and because this function targets
142 :prop:`zmq.ROUTER`, it may be a good idea to first strip off the
143 leading sender id before forwarding it. If you dont you will need to
144 account for that on the recipient side.
145 """
146 socket.zsocket.send_multipart([recipient_id, ] + payload)