aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2015-11-24 17:38:24 -0700
committerjason2015-11-24 17:38:24 -0700
commita214dbf065fbea2f5e2706aa5e9f0a872b73536b (patch)
tree9c143a172d92cda78e5ee2e24bfe8b84092dc261
parent5de2b6bc337ac69204f0e709beafb28108e6ca20 (diff)
downloadeventmq-a214dbf065fbea2f5e2706aa5e9f0a872b73536b.tar.gz
eventmq-a214dbf065fbea2f5e2706aa5e9f0a872b73536b.zip
JobWorker gets clocks and heartbeats
Added a monotonic to JobWorker to schedule heartbeast
-rw-r--r--docs/protocol.rst5
-rw-r--r--eventmq/jobmanager.py103
-rw-r--r--eventmq/router.py11
-rw-r--r--eventmq/tests/test_utils.py15
-rw-r--r--eventmq/utils/__init__.py1
-rw-r--r--requirements.txt2
6 files changed, 98 insertions, 39 deletions
diff --git a/docs/protocol.rst b/docs/protocol.rst
index c32501c..72ab361 100644
--- a/docs/protocol.rst
+++ b/docs/protocol.rst
@@ -57,6 +57,7 @@ FRAME Value Description
571 eMQP/1.0 Protocol version 571 eMQP/1.0 Protocol version
582 ACK command 582 ACK command
593 _MSGID_ A unique id for the msg 593 _MSGID_ A unique id for the msg
604 _MSGID_ The message id of the message this ACK is acknowledging
60====== ============== =========== 61====== ============== ===========
61 62
62eMQP / Client 63eMQP / Client
@@ -165,7 +166,7 @@ Below is a table which defines and describes the headers.
165=============== ======= ======= ======= =========== 166=============== ======= ======= ======= ===========
166Header REQUEST PUBLISH Default Description 167Header REQUEST PUBLISH Default Description
167=============== ======= ======= ======= =========== 168=============== ======= ======= ======= ===========
168reply-requested X False Once the job is finished, send a reply back with information from the job. If there is no information reply with a True value. 169reply-requested X False Once the job is finished, send a reply back with information from the job. If there is no information reply with a True value.
169retry-count:# X 0 Retry a failed job this many times before accepting defeat. 170retry-count:# X 0 Retry a failed job this many times before accepting defeat.
170guarantee X False Ensure the job completes by letting someone else worry about a success reply. 171guarantee X False Ensure the job completes by letting someone else worry about a success reply.
171=============== ======= ======= ======= =========== 172=============== ======= ======= ======= ===========
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index d00d3e6..9b57285 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -17,17 +17,14 @@
17================================ 17================================
18Ensures things about jobs and spawns the actual tasks 18Ensures things about jobs and spawns the actual tasks
19""" 19"""
20from time import sleep
21import uuid 20import uuid
22 21
23from . import constants 22from . import conf, constants, exceptions, log, utils
24from . import exceptions
25from . import log
26from . import utils
27from .poller import Poller, POLLIN 23from .poller import Poller, POLLIN
28from .sender import Sender 24from .sender import Sender
29from .utils.messages import send_emqp_message as sendmsg 25from .utils.messages import send_emqp_message as sendmsg
30import utils.messages 26import utils.messages
27from .utils.timeutils import monotonic, timestamp
31 28
32logger = log.get_logger(__file__) 29logger = log.get_logger(__file__)
33 30
@@ -58,44 +55,101 @@ class JobManager(object):
58 55
59 self.status = constants.STATUS.ready 56 self.status = constants.STATUS.ready
60 57
61 # Are we waiting for an acknowledgment for something? 58 # Are we waiting for acknowledgement from someone that we've connected?
62 self.awaiting_ack = False 59 self.awaiting_startup_ack = False
60
61 # Meta data such as times, and counters are stored here
62 self._meta = {
63 'last_sent_heartbeat': 0,
64 'last_received_heartbeat': 0,
65 'heartbeat_miss_count': 0,
66 }
63 67
64 def start(self, addr='tcp://127.0.0.1:47291'): 68 def start(self, addr='tcp://127.0.0.1:47291'):
65 """ 69 """
66 Connect to `addr` and begin listening for job requests 70 Connect to `addr` and begin listening for job requests
67 71
68 Args: 72 Args:
69 args (str): connection string to connect to 73 addr (str): connection string to connect to
70 """ 74 """
71 self.status = constants.STATUS.connecting 75 self.status = constants.STATUS.connecting
72 self.incoming.connect(addr) 76 self.incoming.connect(addr)
73 77
74 self.awaiting_ack = True 78 self.awaiting_startup_ack = True
79
80 while self.awaiting_startup_ack:
81 self.send_inform()
82 # Poller timeout is in ms so we multiply it to get seconds
83 events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000)
84 if self.incoming in events:
85 msg = self.incoming.recv_multipart()
86 # We don't want to accidentally start processing jobs before
87 # our conenction has been setup completely and acknowledged.
88 if msg[2] != "ACK":
89 continue
90 self.process_message(msg)
75 91
76 #while self.awaiting_ack: 92 if not self.awaiting_startup_ack:
77 self.send_inform() 93 logger.info('Starting to listen for jobs')
78 # sleep(5) 94 self._start_event_loop()
79 95
96 def _start_event_loop(self):
97 """
98 Starts the actual eventloop. Usually called by :meth:`JobManager.start`
99 """
80 self.status = constants.STATUS.connected 100 self.status = constants.STATUS.connected
81 101
82 while True: 102 while True:
83 events = self.poller.poll(1000) 103 now = monotonic()
104 events = self.poller.poll()
84 105
85 if events.get(self.incoming) == POLLIN: 106 if events.get(self.incoming) == POLLIN:
86 msg = self.incoming.recv_multipart() 107 msg = self.incoming.recv_multipart()
87 self.process_message(msg) 108 self.process_message(msg)
88 109
110 # Send a HEARTBEAT if necessary
111 if now - self._meta['last_sent_heartbeat'] >= \
112 conf.HEARTBEAT_INTERVAL:
113 if conf.SUPER_DEBUG:
114 logger.debug(now - self._meta['last_sent_heartbeat'])
115 self.send_heartbeat()
116
117 # Do something about any missed HEARTBEAT
118 if now - self._meta['last_received_heartbeat'] >= \
119 conf.HEARTBEAT_TIMEOUT:
120 # Update as if we got the last heartbeat so we can check in
121 # interval again
122 self._meta['heartbeat_miss_count'] += 1
123 self._meta['last_received_heartbeat'] = monotonic()
124 if self._meta['heartbeat_miss_count'] >= \
125 conf.HEARTBEAT_LIVENESS:
126 logger.critical('The broker appears to have gone away. '
127 'Reconnecting...')
128 break
129 print self._meta['heartbeat_miss_count']
130
89 def process_message(self, msg): 131 def process_message(self, msg):
90 """ 132 """
91 Processes a message 133 Processes a message
134
135 Args:
136 msg: The message received from the socket to parse and process.
137 Processing takes form of calling an `on_COMMAND` method.
92 """ 138 """
139 # Any received message should count as a heartbeat
140 self._meta['last_received_heartbeat'] = monotonic()
141 if self._meta['heartbeat_miss_count']:
142 self._meta['heartbeat_miss_count'] = 0 # Reset the miss count too
143
93 try: 144 try:
94 message = utils.messages.parse_message(msg) 145 message = utils.messages.parse_message(msg)
95 except exceptions.InvalidMessageError: 146 except exceptions.InvalidMessageError:
96 logger.error('Invalid message: %s' % str(msg)) 147 logger.error('Invalid message: %s' % str(msg))
97 return 148 return
98 149
150 if conf.SUPER_DEBUG:
151 logger.debug("Received Message: %s" % msg)
152
99 command = message[0] 153 command = message[0]
100 msgid = message[1] 154 msgid = message[1]
101 message = message[2] 155 message = message[2]
@@ -108,21 +162,24 @@ class JobManager(object):
108 logger.warning('No handler for %s found (tried: %s)' % 162 logger.warning('No handler for %s found (tried: %s)' %
109 (command, ('on_%s' % command.lower))) 163 (command, ('on_%s' % command.lower)))
110 164
111 def process_job(self, msg):
112 pass
113
114 def sync(self):
115 pass
116
117 def send_inform(self): 165 def send_inform(self):
118 """ 166 """
119 Send an INFORM frame 167 Send an INFORM command
120 """ 168 """
121 sendmsg(self.incoming, 'INFORM', 'default_queuename') 169 sendmsg(self.incoming, 'INFORM', 'default_queuename')
122 170
123 def on_ack(self, msgid, message): 171 def send_heartbeat(self):
172 """
173 Send a HEARTBEAT command to the connected broker
174 """
175 sendmsg(self.incoming, 'HEARTBEAT', str(timestamp()))
176 self._meta['last_sent_heartbeat'] = monotonic()
177
178 def on_ack(self, msgid, ackd_msgid):
124 """ 179 """
125 Sets :attr:`awaiting_ack` to False 180 Sets :attr:`awaiting_ack` to False
126 """ 181 """
127 logger.info('Recieved ACK') 182 # The msgid is the only frame in the message
128 self.awaiting_ack = False 183 ackd_msgid = ackd_msgid[0]
184 logger.info('Received ACK for %s' % ackd_msgid)
185 self.awaiting_startup_ack = False
diff --git a/eventmq/router.py b/eventmq/router.py
index 77b752d..b6a4cdf 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -18,15 +18,16 @@
18Routes messages to workers (that are in named queues). 18Routes messages to workers (that are in named queues).
19""" 19"""
20import uuid 20import uuid
21
22from zmq.eventloop import ioloop 21from zmq.eventloop import ioloop
23 22
24from .constants import STATUS 23from .constants import STATUS
25from . import exceptions, log, receiver, utils 24from . import exceptions, log, receiver
26from .utils.messages import ( 25from .utils.messages import (
27 send_emqp_router_message as sendmsg, 26 send_emqp_router_message as sendmsg,
28 parse_router_message 27 parse_router_message
29) 28)
29from .utils.time import monotonic
30
30 31
31logger = log.get_logger(__file__) 32logger = log.get_logger(__file__)
32 33
@@ -73,12 +74,12 @@ class Router(object):
73 74
74 ioloop.IOLoop.instance().start() 75 ioloop.IOLoop.instance().start()
75 76
76 def send_ack(self, socket, recipient): 77 def send_ack(self, socket, recipient, msgid):
77 """ 78 """
78 Sends an ACK response 79 Sends an ACK response
79 """ 80 """
80 logger.info('Sending ACK to %s' % recipient) 81 logger.info('Sending ACK to %s' % recipient)
81 sendmsg(socket, recipient, 'ACK') 82 sendmsg(socket, recipient, 'ACK', msgid)
82 83
83 def on_inform(self, sender, msgid, msg): 84 def on_inform(self, sender, msgid, msg):
84 """ 85 """
@@ -92,7 +93,7 @@ class Router(object):
92 else: 93 else:
93 self.queues[queue_name] = (sender,) 94 self.queues[queue_name] = (sender,)
94 95
95 self.send_ack(self.outgoing, sender) 96 self.send_ack(self.outgoing, sender, msgid)
96 97
97 def on_receive_request(self, msg): 98 def on_receive_request(self, msg):
98 """ 99 """
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py
index db3307d..8ce7f6e 100644
--- a/eventmq/tests/test_utils.py
+++ b/eventmq/tests/test_utils.py
@@ -14,14 +14,13 @@
14# along with eventmq. If not, see <http://www.gnu.org/licenses/>. 14# along with eventmq. If not, see <http://www.gnu.org/licenses/>.
15import unittest 15import unittest
16 16
17from .. import exceptions 17from .. import exceptions, utils
18from .. import utils 18from ..utils import messages, classes
19import utils.messages
20 19
21 20
22class TestCase(unittest.TestCase): 21class TestCase(unittest.TestCase):
23 def test_generate_msgid(self): 22 def test_generate_msgid(self):
24 msgid = utils.messages.generate_msgid() 23 msgid = messages.generate_msgid()
25 24
26 self.assertEqual(type(msgid), str) 25 self.assertEqual(type(msgid), str)
27 26
@@ -31,9 +30,9 @@ class TestCase(unittest.TestCase):
31 emq_frame_manymsg = emq_headers + ('many', 'parts') 30 emq_frame_manymsg = emq_headers + ('many', 'parts')
32 emq_frame_nomsg = emq_headers 31 emq_frame_nomsg = emq_headers
33 32
34 singlemsg = utils.messages.parse_router_message(emq_frame_singlemsg) 33 singlemsg = messages.parse_router_message(emq_frame_singlemsg)
35 manymsg = utils.messages.parse_router_message(emq_frame_manymsg) 34 manymsg = messages.parse_router_message(emq_frame_manymsg)
36 nomsg = utils.messages.parse_router_message(emq_frame_nomsg) 35 nomsg = messages.parse_router_message(emq_frame_nomsg)
37 36
38 self.assertEqual(singlemsg[0], emq_frame_singlemsg[0]) 37 self.assertEqual(singlemsg[0], emq_frame_singlemsg[0])
39 self.assertEqual(singlemsg[1], emq_frame_singlemsg[3]) 38 self.assertEqual(singlemsg[1], emq_frame_singlemsg[3])
@@ -52,4 +51,4 @@ class TestCase(unittest.TestCase):
52 51
53 broken_message = ('dlkajfs', 'lkasdjf') 52 broken_message = ('dlkajfs', 'lkasdjf')
54 with self.assertRaises(exceptions.InvalidMessageError): 53 with self.assertRaises(exceptions.InvalidMessageError):
55 utils.messages.parse_router_message(broken_message) 54 messages.parse_router_message(broken_message)
diff --git a/eventmq/utils/__init__.py b/eventmq/utils/__init__.py
index af8a4dc..4fd9a89 100644
--- a/eventmq/utils/__init__.py
+++ b/eventmq/utils/__init__.py
@@ -26,6 +26,7 @@ like creating message more simple.
26""" 26"""
27import uuid 27import uuid
28 28
29
29def random_characters(): 30def random_characters():
30 """ 31 """
31 Returns some random characters of a specified length 32 Returns some random characters of a specified length
diff --git a/requirements.txt b/requirements.txt
index c4848d0..9724011 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,7 @@
1pyzmq 1pyzmq
2six 2six
3tornado 3tornado
4 4monotonic==0.4 # A clock who's time is not changed. used for scheduling
5 5
6# Documentation 6# Documentation
7sphinxcontrib-napoleon 7sphinxcontrib-napoleon