diff options
| author | jason | 2016-06-30 15:24:57 -0600 |
|---|---|---|
| committer | GitHub | 2016-06-30 15:24:57 -0600 |
| commit | d687fab75c8ae61a9e4d602f1fbdf9003100cb22 (patch) | |
| tree | 977e2febfc35172d418bf287d3937dedd44d5b21 | |
| parent | c7acf27259f5145722c3d9f29d70e8e6af48b959 (diff) | |
| parent | 6463dbf471e4806a1a0fb9e46b465cfa1bb6b22a (diff) | |
| download | eventmq-0.2.3.tar.gz eventmq-0.2.3.zip | |
Merge pull request #35 from sideshowdave7/feature/replies_and_latency0.2.3
Feature: job latencies
| -rwxr-xr-x | bin/send_request | 24 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 60 | ||||
| -rw-r--r-- | eventmq/router.py | 26 | ||||
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 2 | ||||
| -rw-r--r-- | eventmq/worker.py | 13 |
5 files changed, 107 insertions, 18 deletions
diff --git a/bin/send_request b/bin/send_request new file mode 100755 index 0000000..aa84639 --- /dev/null +++ b/bin/send_request | |||
| @@ -0,0 +1,24 @@ | |||
| 1 | #!/usr/bin/env python | ||
| 2 | """ | ||
| 3 | Usage: ./send_msg <ipaddresss> <command> <frame w/ values space separated> | ||
| 4 | """ | ||
| 5 | import sys | ||
| 6 | |||
| 7 | from eventmq.sender import Sender | ||
| 8 | from eventmq.client.messages import send_request | ||
| 9 | |||
| 10 | |||
| 11 | if __name__ == "__main__": | ||
| 12 | s = Sender() | ||
| 13 | s.connect(sys.argv[1]) | ||
| 14 | |||
| 15 | msg = ['run', { | ||
| 16 | 'path': 'eventmq.scheduler', | ||
| 17 | 'callable': 'test_job', | ||
| 18 | 'class_args': (), | ||
| 19 | 'class_kwargs': {}, | ||
| 20 | 'args': (), | ||
| 21 | 'kwargs': {} | ||
| 22 | }] | ||
| 23 | |||
| 24 | send_request(s, msg, guarantee=True, reply_requested=True) | ||
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 52b3c7e..6e4cd5e 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -17,7 +17,8 @@ | |||
| 17 | ================================ | 17 | ================================ |
| 18 | Ensures things about jobs and spawns the actual tasks | 18 | Ensures things about jobs and spawns the actual tasks |
| 19 | """ | 19 | """ |
| 20 | from json import loads as serializer | 20 | from json import loads as deserializer |
| 21 | from json import dumps as serializer | ||
| 21 | import logging | 22 | import logging |
| 22 | import signal | 23 | import signal |
| 23 | 24 | ||
| @@ -67,14 +68,13 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 67 | 68 | ||
| 68 | #: keep track of workers | 69 | #: keep track of workers |
| 69 | concurrent_jobs = kwargs.pop('concurrent_jobs', None) | 70 | concurrent_jobs = kwargs.pop('concurrent_jobs', None) |
| 70 | if concurrent_jobs is None: | 71 | if concurrent_jobs is not None: |
| 71 | concurrent_jobs = conf.CONCURRENT_JOBS | 72 | conf.CONCURRENT_JOBS = concurrent_jobs |
| 72 | self.workers = Pool(processes=concurrent_jobs) | ||
| 73 | 73 | ||
| 74 | #: List of queues that this job manager is listening on | 74 | #: List of queues that this job manager is listening on |
| 75 | self.queues = kwargs.pop('queues', None) | 75 | self.queues = kwargs.pop('queues', None) |
| 76 | if self.queues is None: | 76 | if self.queues is None: |
| 77 | self.quques = conf.QUEUES | 77 | self.queues = conf.QUEUES |
| 78 | 78 | ||
| 79 | if not kwargs.pop('skip_signal', False): | 79 | if not kwargs.pop('skip_signal', False): |
| 80 | # handle any sighups by reloading config | 80 | # handle any sighups by reloading config |
| @@ -90,12 +90,23 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 90 | 90 | ||
| 91 | self._setup() | 91 | self._setup() |
| 92 | 92 | ||
| 93 | @property | ||
| 94 | def workers(self): | ||
| 95 | if not hasattr(self, '_workers'): | ||
| 96 | self._workers = Pool(processes=conf.CONCURRENT_JOBS) | ||
| 97 | elif self._workers.processes != conf.CONCURRENT_JOBS: | ||
| 98 | self._workers.close() | ||
| 99 | self._workers = Pool(processes=conf.CONCURRENT_JOBS) | ||
| 100 | |||
| 101 | return self._workers | ||
| 102 | |||
| 93 | def _start_event_loop(self): | 103 | def _start_event_loop(self): |
| 94 | """ | 104 | """ |
| 95 | Starts the actual event loop. Usually called by :meth:`start` | 105 | Starts the actual event loop. Usually called by :meth:`start` |
| 96 | """ | 106 | """ |
| 97 | # Acknowledgment has come | 107 | # Acknowledgment has come |
| 98 | # Send a READY for each available worker | 108 | # Send a READY for each available worker |
| 109 | |||
| 99 | for i in range(0, conf.CONCURRENT_JOBS): | 110 | for i in range(0, conf.CONCURRENT_JOBS): |
| 100 | self.send_ready() | 111 | self.send_ready() |
| 101 | 112 | ||
| @@ -145,16 +156,26 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 145 | # s_ indicates the string path vs the actual module and class | 156 | # s_ indicates the string path vs the actual module and class |
| 146 | # queue_name = msg[0] | 157 | # queue_name = msg[0] |
| 147 | 158 | ||
| 148 | # run callable | 159 | # Parse REQUEST message values |
| 149 | payload = serializer(msg[2]) | 160 | headers = msg[1] |
| 150 | # subcmd = payload[0] | 161 | payload = deserializer(msg[2]) |
| 151 | params = payload[1] | 162 | params = payload[1] |
| 152 | 163 | ||
| 164 | if 'reply-requested' in headers: | ||
| 165 | callback = self.worker_done_with_reply | ||
| 166 | else: | ||
| 167 | callback = self.worker_done | ||
| 168 | |||
| 169 | # kick off the job asynchronously with an appropiate callback | ||
| 153 | self.workers.apply_async(func=worker.run, | 170 | self.workers.apply_async(func=worker.run, |
| 154 | args=(params,), | 171 | args=(params, msgid), |
| 155 | callback=self.worker_done) | 172 | callback=callback) |
| 173 | |||
| 174 | def worker_done_with_reply(self, msgid): | ||
| 175 | self.send_reply(msgid) | ||
| 176 | self.send_ready() | ||
| 156 | 177 | ||
| 157 | def worker_done(self, result): | 178 | def worker_done(self, msgid): |
| 158 | self.send_ready() | 179 | self.send_ready() |
| 159 | 180 | ||
| 160 | def send_ready(self): | 181 | def send_ready(self): |
| @@ -164,6 +185,23 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 164 | """ | 185 | """ |
| 165 | sendmsg(self.outgoing, 'READY') | 186 | sendmsg(self.outgoing, 'READY') |
| 166 | 187 | ||
| 188 | def send_reply(self, res): | ||
| 189 | """ | ||
| 190 | Sends an REPLY response | ||
| 191 | |||
| 192 | Args: | ||
| 193 | socket (socket): The socket to use for this ack | ||
| 194 | recipient (str): The recipient id for the ack | ||
| 195 | msgid: The unique id that we are acknowledging | ||
| 196 | """ | ||
| 197 | msgid = res[0] | ||
| 198 | |||
| 199 | reply = res[1] | ||
| 200 | |||
| 201 | reply = serializer(reply) | ||
| 202 | |||
| 203 | sendmsg(self.outgoing, 'REPLY', [reply, msgid]) | ||
| 204 | |||
| 167 | def on_heartbeat(self, msgid, message): | 205 | def on_heartbeat(self, msgid, message): |
| 168 | """ | 206 | """ |
| 169 | a placeholder for a noop command. The actual 'logic' for HEARTBEAT is | 207 | a placeholder for a noop command. The actual 'logic' for HEARTBEAT is |
diff --git a/eventmq/router.py b/eventmq/router.py index fbd8d80..7dd2471 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -105,6 +105,12 @@ class Router(HeartbeatMixin): | |||
| 105 | #: } | 105 | #: } |
| 106 | self.schedulers = {} | 106 | self.schedulers = {} |
| 107 | 107 | ||
| 108 | |||
| 109 | #: Latency tracking dictionary | ||
| 110 | #: Key: msgid of message each REQUEST received and forwarded to a worker | ||
| 111 | #: Value: (timestamp, queue_name) | ||
| 112 | self.job_latencies = {} | ||
| 113 | |||
| 108 | #: Set to True when the router should die. | 114 | #: Set to True when the router should die. |
| 109 | self.received_disconnect = False | 115 | self.received_disconnect = False |
| 110 | 116 | ||
| @@ -270,6 +276,25 @@ class Router(HeartbeatMixin): | |||
| 270 | self.add_scheduler(sender) | 276 | self.add_scheduler(sender) |
| 271 | self.send_ack(self.incoming, sender, msgid) | 277 | self.send_ack(self.incoming, sender, msgid) |
| 272 | 278 | ||
| 279 | def on_reply(self, sender, msgid, msg): | ||
| 280 | """ | ||
| 281 | Handles an REPLY message. Replies are sent by the worker for latanecy | ||
| 282 | measurements | ||
| 283 | """ | ||
| 284 | |||
| 285 | orig_msgid = msg[1] | ||
| 286 | logger.info('Received REPLY from {} (msgid: {}, ACK msgid: {})'.format( | ||
| 287 | sender, msgid, orig_msgid)) | ||
| 288 | |||
| 289 | if orig_msgid in self.job_latencies: | ||
| 290 | logger.info("Completed {queue} job with msgid: {msgid} in " | ||
| 291 | "{time:.2f}ms".\ | ||
| 292 | format(queue=self.job_latencies[orig_msgid][1], | ||
| 293 | msgid=orig_msgid, | ||
| 294 | time=(monotonic()-self.job_latencies[orig_msgid][0])*1000.0)) | ||
| 295 | del self.job_latencies[orig_msgid] | ||
| 296 | |||
| 297 | |||
| 273 | def on_disconnect(self, msgid, msg): | 298 | def on_disconnect(self, msgid, msg): |
| 274 | # Loops event loops should check for this and break out | 299 | # Loops event loops should check for this and break out |
| 275 | self.received_disconnect = True | 300 | self.received_disconnect = True |
| @@ -364,6 +389,7 @@ class Router(HeartbeatMixin): | |||
| 364 | try: | 389 | try: |
| 365 | # Rebuild the message to be sent to the worker. fwdmsg will | 390 | # Rebuild the message to be sent to the worker. fwdmsg will |
| 366 | # properly address the message. | 391 | # properly address the message. |
| 392 | self.job_latencies[msgid] = (monotonic(), queue_name) | ||
| 367 | fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, | 393 | fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, |
| 368 | 'REQUEST', msgid, ] + msg) | 394 | 'REQUEST', msgid, ] + msg) |
| 369 | self.workers[worker_addr]['available_slots'] -= 1 | 395 | self.workers[worker_addr]['available_slots'] -= 1 |
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py index 6d3c663..483dde1 100644 --- a/eventmq/tests/test_jobmanager.py +++ b/eventmq/tests/test_jobmanager.py | |||
| @@ -80,7 +80,7 @@ class TestCase(unittest.TestCase): | |||
| 80 | 80 | ||
| 81 | jm.on_request(_msgid, _msg) | 81 | jm.on_request(_msgid, _msg) |
| 82 | apply_async_mock.assert_called_with( | 82 | apply_async_mock.assert_called_with( |
| 83 | args=({'a': 1},), | 83 | args=({'a': 1}, _msgid), |
| 84 | callback=jm.worker_done, | 84 | callback=jm.worker_done, |
| 85 | func=run_mock) | 85 | func=run_mock) |
| 86 | 86 | ||
diff --git a/eventmq/worker.py b/eventmq/worker.py index 78ea97a..284f478 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -26,7 +26,7 @@ from . import log | |||
| 26 | logger = log.setup_logger(__name__) | 26 | logger = log.setup_logger(__name__) |
| 27 | 27 | ||
| 28 | 28 | ||
| 29 | def run(payload): | 29 | def run(payload, msgid): |
| 30 | """ | 30 | """ |
| 31 | process a run message and execute a job | 31 | process a run message and execute a job |
| 32 | 32 | ||
| @@ -47,7 +47,7 @@ def run(payload): | |||
| 47 | reload(package) | 47 | reload(package) |
| 48 | except Exception as e: | 48 | except Exception as e: |
| 49 | logger.exception('Error importing module: {}'.format(str(e))) | 49 | logger.exception('Error importing module: {}'.format(str(e))) |
| 50 | return 'DONE' | 50 | return (msgid, str(e)) |
| 51 | 51 | ||
| 52 | if s_cls: | 52 | if s_cls: |
| 53 | cls = getattr(package, s_cls) | 53 | cls = getattr(package, s_cls) |
| @@ -70,7 +70,7 @@ def run(payload): | |||
| 70 | callable_ = getattr(obj, s_callable) | 70 | callable_ = getattr(obj, s_callable) |
| 71 | except AttributeError as e: | 71 | except AttributeError as e: |
| 72 | logger.exception('Error getting callable: {}'.format(str(e))) | 72 | logger.exception('Error getting callable: {}'.format(str(e))) |
| 73 | return 'DONE' | 73 | return (msgid, str(e)) |
| 74 | 74 | ||
| 75 | if "args" in payload: | 75 | if "args" in payload: |
| 76 | args = payload["args"] | 76 | args = payload["args"] |
| @@ -83,10 +83,11 @@ def run(payload): | |||
| 83 | kwargs = {} | 83 | kwargs = {} |
| 84 | 84 | ||
| 85 | try: | 85 | try: |
| 86 | callable_(*args, **kwargs) | 86 | r = callable_(*args, **kwargs) |
| 87 | return (msgid, r) | ||
| 87 | except Exception as e: | 88 | except Exception as e: |
| 88 | logger.exception(e) | 89 | logger.exception(e) |
| 89 | return 'DONE' | 90 | return (msgid, str(e)) |
| 90 | 91 | ||
| 91 | # Signal that we're done with this job | 92 | # Signal that we're done with this job |
| 92 | return 'DONE' | 93 | return (msgid, '') |