diff options
| author | jason | 2015-11-24 17:38:24 -0700 |
|---|---|---|
| committer | jason | 2015-11-24 17:38:24 -0700 |
| commit | a214dbf065fbea2f5e2706aa5e9f0a872b73536b (patch) | |
| tree | 9c143a172d92cda78e5ee2e24bfe8b84092dc261 | |
| parent | 5de2b6bc337ac69204f0e709beafb28108e6ca20 (diff) | |
| download | eventmq-a214dbf065fbea2f5e2706aa5e9f0a872b73536b.tar.gz eventmq-a214dbf065fbea2f5e2706aa5e9f0a872b73536b.zip | |
JobWorker gets clocks and heartbeats
Added a monotonic to JobWorker to schedule heartbeast
| -rw-r--r-- | docs/protocol.rst | 5 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 103 | ||||
| -rw-r--r-- | eventmq/router.py | 11 | ||||
| -rw-r--r-- | eventmq/tests/test_utils.py | 15 | ||||
| -rw-r--r-- | eventmq/utils/__init__.py | 1 | ||||
| -rw-r--r-- | requirements.txt | 2 |
6 files changed, 98 insertions, 39 deletions
diff --git a/docs/protocol.rst b/docs/protocol.rst index c32501c..72ab361 100644 --- a/docs/protocol.rst +++ b/docs/protocol.rst | |||
| @@ -57,6 +57,7 @@ FRAME Value Description | |||
| 57 | 1 eMQP/1.0 Protocol version | 57 | 1 eMQP/1.0 Protocol version |
| 58 | 2 ACK command | 58 | 2 ACK command |
| 59 | 3 _MSGID_ A unique id for the msg | 59 | 3 _MSGID_ A unique id for the msg |
| 60 | 4 _MSGID_ The message id of the message this ACK is acknowledging | ||
| 60 | ====== ============== =========== | 61 | ====== ============== =========== |
| 61 | 62 | ||
| 62 | eMQP / Client | 63 | eMQP / Client |
| @@ -165,7 +166,7 @@ Below is a table which defines and describes the headers. | |||
| 165 | =============== ======= ======= ======= =========== | 166 | =============== ======= ======= ======= =========== |
| 166 | Header REQUEST PUBLISH Default Description | 167 | Header REQUEST PUBLISH Default Description |
| 167 | =============== ======= ======= ======= =========== | 168 | =============== ======= ======= ======= =========== |
| 168 | reply-requested X False Once the job is finished, send a reply back with information from the job. If there is no information reply with a True value. | 169 | reply-requested X False Once the job is finished, send a reply back with information from the job. If there is no information reply with a True value. |
| 169 | retry-count:# X 0 Retry a failed job this many times before accepting defeat. | 170 | retry-count:# X 0 Retry a failed job this many times before accepting defeat. |
| 170 | guarantee X False Ensure the job completes by letting someone else worry about a success reply. | 171 | guarantee X False Ensure the job completes by letting someone else worry about a success reply. |
| 171 | =============== ======= ======= ======= =========== | 172 | =============== ======= ======= ======= =========== |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index d00d3e6..9b57285 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -17,17 +17,14 @@ | |||
| 17 | ================================ | 17 | ================================ |
| 18 | Ensures things about jobs and spawns the actual tasks | 18 | Ensures things about jobs and spawns the actual tasks |
| 19 | """ | 19 | """ |
| 20 | from time import sleep | ||
| 21 | import uuid | 20 | import uuid |
| 22 | 21 | ||
| 23 | from . import constants | 22 | from . import conf, constants, exceptions, log, utils |
| 24 | from . import exceptions | ||
| 25 | from . import log | ||
| 26 | from . import utils | ||
| 27 | from .poller import Poller, POLLIN | 23 | from .poller import Poller, POLLIN |
| 28 | from .sender import Sender | 24 | from .sender import Sender |
| 29 | from .utils.messages import send_emqp_message as sendmsg | 25 | from .utils.messages import send_emqp_message as sendmsg |
| 30 | import utils.messages | 26 | import utils.messages |
| 27 | from .utils.timeutils import monotonic, timestamp | ||
| 31 | 28 | ||
| 32 | logger = log.get_logger(__file__) | 29 | logger = log.get_logger(__file__) |
| 33 | 30 | ||
| @@ -58,44 +55,101 @@ class JobManager(object): | |||
| 58 | 55 | ||
| 59 | self.status = constants.STATUS.ready | 56 | self.status = constants.STATUS.ready |
| 60 | 57 | ||
| 61 | # Are we waiting for an acknowledgment for something? | 58 | # Are we waiting for acknowledgement from someone that we've connected? |
| 62 | self.awaiting_ack = False | 59 | self.awaiting_startup_ack = False |
| 60 | |||
| 61 | # Meta data such as times, and counters are stored here | ||
| 62 | self._meta = { | ||
| 63 | 'last_sent_heartbeat': 0, | ||
| 64 | 'last_received_heartbeat': 0, | ||
| 65 | 'heartbeat_miss_count': 0, | ||
| 66 | } | ||
| 63 | 67 | ||
| 64 | def start(self, addr='tcp://127.0.0.1:47291'): | 68 | def start(self, addr='tcp://127.0.0.1:47291'): |
| 65 | """ | 69 | """ |
| 66 | Connect to `addr` and begin listening for job requests | 70 | Connect to `addr` and begin listening for job requests |
| 67 | 71 | ||
| 68 | Args: | 72 | Args: |
| 69 | args (str): connection string to connect to | 73 | addr (str): connection string to connect to |
| 70 | """ | 74 | """ |
| 71 | self.status = constants.STATUS.connecting | 75 | self.status = constants.STATUS.connecting |
| 72 | self.incoming.connect(addr) | 76 | self.incoming.connect(addr) |
| 73 | 77 | ||
| 74 | self.awaiting_ack = True | 78 | self.awaiting_startup_ack = True |
| 79 | |||
| 80 | while self.awaiting_startup_ack: | ||
| 81 | self.send_inform() | ||
| 82 | # Poller timeout is in ms so we multiply it to get seconds | ||
| 83 | events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000) | ||
| 84 | if self.incoming in events: | ||
| 85 | msg = self.incoming.recv_multipart() | ||
| 86 | # We don't want to accidentally start processing jobs before | ||
| 87 | # our conenction has been setup completely and acknowledged. | ||
| 88 | if msg[2] != "ACK": | ||
| 89 | continue | ||
| 90 | self.process_message(msg) | ||
| 75 | 91 | ||
| 76 | #while self.awaiting_ack: | 92 | if not self.awaiting_startup_ack: |
| 77 | self.send_inform() | 93 | logger.info('Starting to listen for jobs') |
| 78 | # sleep(5) | 94 | self._start_event_loop() |
| 79 | 95 | ||
| 96 | def _start_event_loop(self): | ||
| 97 | """ | ||
| 98 | Starts the actual eventloop. Usually called by :meth:`JobManager.start` | ||
| 99 | """ | ||
| 80 | self.status = constants.STATUS.connected | 100 | self.status = constants.STATUS.connected |
| 81 | 101 | ||
| 82 | while True: | 102 | while True: |
| 83 | events = self.poller.poll(1000) | 103 | now = monotonic() |
| 104 | events = self.poller.poll() | ||
| 84 | 105 | ||
| 85 | if events.get(self.incoming) == POLLIN: | 106 | if events.get(self.incoming) == POLLIN: |
| 86 | msg = self.incoming.recv_multipart() | 107 | msg = self.incoming.recv_multipart() |
| 87 | self.process_message(msg) | 108 | self.process_message(msg) |
| 88 | 109 | ||
| 110 | # Send a HEARTBEAT if necessary | ||
| 111 | if now - self._meta['last_sent_heartbeat'] >= \ | ||
| 112 | conf.HEARTBEAT_INTERVAL: | ||
| 113 | if conf.SUPER_DEBUG: | ||
| 114 | logger.debug(now - self._meta['last_sent_heartbeat']) | ||
| 115 | self.send_heartbeat() | ||
| 116 | |||
| 117 | # Do something about any missed HEARTBEAT | ||
| 118 | if now - self._meta['last_received_heartbeat'] >= \ | ||
| 119 | conf.HEARTBEAT_TIMEOUT: | ||
| 120 | # Update as if we got the last heartbeat so we can check in | ||
| 121 | # interval again | ||
| 122 | self._meta['heartbeat_miss_count'] += 1 | ||
| 123 | self._meta['last_received_heartbeat'] = monotonic() | ||
| 124 | if self._meta['heartbeat_miss_count'] >= \ | ||
| 125 | conf.HEARTBEAT_LIVENESS: | ||
| 126 | logger.critical('The broker appears to have gone away. ' | ||
| 127 | 'Reconnecting...') | ||
| 128 | break | ||
| 129 | print self._meta['heartbeat_miss_count'] | ||
| 130 | |||
| 89 | def process_message(self, msg): | 131 | def process_message(self, msg): |
| 90 | """ | 132 | """ |
| 91 | Processes a message | 133 | Processes a message |
| 134 | |||
| 135 | Args: | ||
| 136 | msg: The message received from the socket to parse and process. | ||
| 137 | Processing takes form of calling an `on_COMMAND` method. | ||
| 92 | """ | 138 | """ |
| 139 | # Any received message should count as a heartbeat | ||
| 140 | self._meta['last_received_heartbeat'] = monotonic() | ||
| 141 | if self._meta['heartbeat_miss_count']: | ||
| 142 | self._meta['heartbeat_miss_count'] = 0 # Reset the miss count too | ||
| 143 | |||
| 93 | try: | 144 | try: |
| 94 | message = utils.messages.parse_message(msg) | 145 | message = utils.messages.parse_message(msg) |
| 95 | except exceptions.InvalidMessageError: | 146 | except exceptions.InvalidMessageError: |
| 96 | logger.error('Invalid message: %s' % str(msg)) | 147 | logger.error('Invalid message: %s' % str(msg)) |
| 97 | return | 148 | return |
| 98 | 149 | ||
| 150 | if conf.SUPER_DEBUG: | ||
| 151 | logger.debug("Received Message: %s" % msg) | ||
| 152 | |||
| 99 | command = message[0] | 153 | command = message[0] |
| 100 | msgid = message[1] | 154 | msgid = message[1] |
| 101 | message = message[2] | 155 | message = message[2] |
| @@ -108,21 +162,24 @@ class JobManager(object): | |||
| 108 | logger.warning('No handler for %s found (tried: %s)' % | 162 | logger.warning('No handler for %s found (tried: %s)' % |
| 109 | (command, ('on_%s' % command.lower))) | 163 | (command, ('on_%s' % command.lower))) |
| 110 | 164 | ||
| 111 | def process_job(self, msg): | ||
| 112 | pass | ||
| 113 | |||
| 114 | def sync(self): | ||
| 115 | pass | ||
| 116 | |||
| 117 | def send_inform(self): | 165 | def send_inform(self): |
| 118 | """ | 166 | """ |
| 119 | Send an INFORM frame | 167 | Send an INFORM command |
| 120 | """ | 168 | """ |
| 121 | sendmsg(self.incoming, 'INFORM', 'default_queuename') | 169 | sendmsg(self.incoming, 'INFORM', 'default_queuename') |
| 122 | 170 | ||
| 123 | def on_ack(self, msgid, message): | 171 | def send_heartbeat(self): |
| 172 | """ | ||
| 173 | Send a HEARTBEAT command to the connected broker | ||
| 174 | """ | ||
| 175 | sendmsg(self.incoming, 'HEARTBEAT', str(timestamp())) | ||
| 176 | self._meta['last_sent_heartbeat'] = monotonic() | ||
| 177 | |||
| 178 | def on_ack(self, msgid, ackd_msgid): | ||
| 124 | """ | 179 | """ |
| 125 | Sets :attr:`awaiting_ack` to False | 180 | Sets :attr:`awaiting_ack` to False |
| 126 | """ | 181 | """ |
| 127 | logger.info('Recieved ACK') | 182 | # The msgid is the only frame in the message |
| 128 | self.awaiting_ack = False | 183 | ackd_msgid = ackd_msgid[0] |
| 184 | logger.info('Received ACK for %s' % ackd_msgid) | ||
| 185 | self.awaiting_startup_ack = False | ||
diff --git a/eventmq/router.py b/eventmq/router.py index 77b752d..b6a4cdf 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -18,15 +18,16 @@ | |||
| 18 | Routes messages to workers (that are in named queues). | 18 | Routes messages to workers (that are in named queues). |
| 19 | """ | 19 | """ |
| 20 | import uuid | 20 | import uuid |
| 21 | |||
| 22 | from zmq.eventloop import ioloop | 21 | from zmq.eventloop import ioloop |
| 23 | 22 | ||
| 24 | from .constants import STATUS | 23 | from .constants import STATUS |
| 25 | from . import exceptions, log, receiver, utils | 24 | from . import exceptions, log, receiver |
| 26 | from .utils.messages import ( | 25 | from .utils.messages import ( |
| 27 | send_emqp_router_message as sendmsg, | 26 | send_emqp_router_message as sendmsg, |
| 28 | parse_router_message | 27 | parse_router_message |
| 29 | ) | 28 | ) |
| 29 | from .utils.time import monotonic | ||
| 30 | |||
| 30 | 31 | ||
| 31 | logger = log.get_logger(__file__) | 32 | logger = log.get_logger(__file__) |
| 32 | 33 | ||
| @@ -73,12 +74,12 @@ class Router(object): | |||
| 73 | 74 | ||
| 74 | ioloop.IOLoop.instance().start() | 75 | ioloop.IOLoop.instance().start() |
| 75 | 76 | ||
| 76 | def send_ack(self, socket, recipient): | 77 | def send_ack(self, socket, recipient, msgid): |
| 77 | """ | 78 | """ |
| 78 | Sends an ACK response | 79 | Sends an ACK response |
| 79 | """ | 80 | """ |
| 80 | logger.info('Sending ACK to %s' % recipient) | 81 | logger.info('Sending ACK to %s' % recipient) |
| 81 | sendmsg(socket, recipient, 'ACK') | 82 | sendmsg(socket, recipient, 'ACK', msgid) |
| 82 | 83 | ||
| 83 | def on_inform(self, sender, msgid, msg): | 84 | def on_inform(self, sender, msgid, msg): |
| 84 | """ | 85 | """ |
| @@ -92,7 +93,7 @@ class Router(object): | |||
| 92 | else: | 93 | else: |
| 93 | self.queues[queue_name] = (sender,) | 94 | self.queues[queue_name] = (sender,) |
| 94 | 95 | ||
| 95 | self.send_ack(self.outgoing, sender) | 96 | self.send_ack(self.outgoing, sender, msgid) |
| 96 | 97 | ||
| 97 | def on_receive_request(self, msg): | 98 | def on_receive_request(self, msg): |
| 98 | """ | 99 | """ |
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py index db3307d..8ce7f6e 100644 --- a/eventmq/tests/test_utils.py +++ b/eventmq/tests/test_utils.py | |||
| @@ -14,14 +14,13 @@ | |||
| 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. | 14 | # along with eventmq. If not, see <http://www.gnu.org/licenses/>. |
| 15 | import unittest | 15 | import unittest |
| 16 | 16 | ||
| 17 | from .. import exceptions | 17 | from .. import exceptions, utils |
| 18 | from .. import utils | 18 | from ..utils import messages, classes |
| 19 | import utils.messages | ||
| 20 | 19 | ||
| 21 | 20 | ||
| 22 | class TestCase(unittest.TestCase): | 21 | class TestCase(unittest.TestCase): |
| 23 | def test_generate_msgid(self): | 22 | def test_generate_msgid(self): |
| 24 | msgid = utils.messages.generate_msgid() | 23 | msgid = messages.generate_msgid() |
| 25 | 24 | ||
| 26 | self.assertEqual(type(msgid), str) | 25 | self.assertEqual(type(msgid), str) |
| 27 | 26 | ||
| @@ -31,9 +30,9 @@ class TestCase(unittest.TestCase): | |||
| 31 | emq_frame_manymsg = emq_headers + ('many', 'parts') | 30 | emq_frame_manymsg = emq_headers + ('many', 'parts') |
| 32 | emq_frame_nomsg = emq_headers | 31 | emq_frame_nomsg = emq_headers |
| 33 | 32 | ||
| 34 | singlemsg = utils.messages.parse_router_message(emq_frame_singlemsg) | 33 | singlemsg = messages.parse_router_message(emq_frame_singlemsg) |
| 35 | manymsg = utils.messages.parse_router_message(emq_frame_manymsg) | 34 | manymsg = messages.parse_router_message(emq_frame_manymsg) |
| 36 | nomsg = utils.messages.parse_router_message(emq_frame_nomsg) | 35 | nomsg = messages.parse_router_message(emq_frame_nomsg) |
| 37 | 36 | ||
| 38 | self.assertEqual(singlemsg[0], emq_frame_singlemsg[0]) | 37 | self.assertEqual(singlemsg[0], emq_frame_singlemsg[0]) |
| 39 | self.assertEqual(singlemsg[1], emq_frame_singlemsg[3]) | 38 | self.assertEqual(singlemsg[1], emq_frame_singlemsg[3]) |
| @@ -52,4 +51,4 @@ class TestCase(unittest.TestCase): | |||
| 52 | 51 | ||
| 53 | broken_message = ('dlkajfs', 'lkasdjf') | 52 | broken_message = ('dlkajfs', 'lkasdjf') |
| 54 | with self.assertRaises(exceptions.InvalidMessageError): | 53 | with self.assertRaises(exceptions.InvalidMessageError): |
| 55 | utils.messages.parse_router_message(broken_message) | 54 | messages.parse_router_message(broken_message) |
diff --git a/eventmq/utils/__init__.py b/eventmq/utils/__init__.py index af8a4dc..4fd9a89 100644 --- a/eventmq/utils/__init__.py +++ b/eventmq/utils/__init__.py | |||
| @@ -26,6 +26,7 @@ like creating message more simple. | |||
| 26 | """ | 26 | """ |
| 27 | import uuid | 27 | import uuid |
| 28 | 28 | ||
| 29 | |||
| 29 | def random_characters(): | 30 | def random_characters(): |
| 30 | """ | 31 | """ |
| 31 | Returns some random characters of a specified length | 32 | Returns some random characters of a specified length |
diff --git a/requirements.txt b/requirements.txt index c4848d0..9724011 100644 --- a/requirements.txt +++ b/requirements.txt | |||
| @@ -1,7 +1,7 @@ | |||
| 1 | pyzmq | 1 | pyzmq |
| 2 | six | 2 | six |
| 3 | tornado | 3 | tornado |
| 4 | 4 | monotonic==0.4 # A clock who's time is not changed. used for scheduling | |
| 5 | 5 | ||
| 6 | # Documentation | 6 | # Documentation |
| 7 | sphinxcontrib-napoleon | 7 | sphinxcontrib-napoleon |