diff options
| author | David Hurst | 2017-06-27 12:08:11 -0600 |
|---|---|---|
| committer | GitHub | 2017-06-27 12:08:11 -0600 |
| commit | af24bcedc49a59e673c4898e28c5e86e9d070de3 (patch) | |
| tree | ae8a204f85ac0727153abb97612400998560849e | |
| parent | f3da7ca5ca56d2ab6ee25ea616cde75084e05158 (diff) | |
| parent | 1ba238e51f1c523af47e9f6aaeacf3d8f8e83279 (diff) | |
| download | eventmq-af24bcedc49a59e673c4898e28c5e86e9d070de3.tar.gz eventmq-af24bcedc49a59e673c4898e28c5e86e9d070de3.zip | |
Merge pull request #48 from sideshowdave7/feature/backport_processed_message_counts
Feature/backport processed message counts
| -rw-r--r-- | eventmq/router.py | 31 | ||||
| -rw-r--r-- | eventmq/tests/test_router.py | 5 |
2 files changed, 24 insertions, 12 deletions
diff --git a/eventmq/router.py b/eventmq/router.py index 2943e3c..0d05468 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -94,6 +94,13 @@ class Router(HeartbeatMixin): | |||
| 94 | #: workers available to take the job | 94 | #: workers available to take the job |
| 95 | self.waiting_messages = {} | 95 | self.waiting_messages = {} |
| 96 | 96 | ||
| 97 | # Key: Queue.name, Value: # of messages sent to workers on that queue | ||
| 98 | # Includes REQUESTS in flight but not REQUESTS queued | ||
| 99 | self.processed_message_counts = {} | ||
| 100 | |||
| 101 | # Same as above but Key: Worker.uuid | ||
| 102 | self.processed_message_counts_by_worker = {} | ||
| 103 | |||
| 97 | #: Tracks the last time the scheduler queue was cleaned out of dead | 104 | #: Tracks the last time the scheduler queue was cleaned out of dead |
| 98 | #: schedulers | 105 | #: schedulers |
| 99 | self._meta['last_scheduler_cleanup'] = 0 | 106 | self._meta['last_scheduler_cleanup'] = 0 |
| @@ -118,8 +125,6 @@ class Router(HeartbeatMixin): | |||
| 118 | #: Excecuted function tracking dictionary | 125 | #: Excecuted function tracking dictionary |
| 119 | #: Key: msgid of msg each REQUEST received and forwarded to a worker | 126 | #: Key: msgid of msg each REQUEST received and forwarded to a worker |
| 120 | #: Value: (function_name, queue_name) | 127 | #: Value: (function_name, queue_name) |
| 121 | self.executed_functions = {} | ||
| 122 | |||
| 123 | #: Set to True when the router should die. | 128 | #: Set to True when the router should die. |
| 124 | self.received_disconnect = False | 129 | self.received_disconnect = False |
| 125 | 130 | ||
| @@ -484,16 +489,20 @@ class Router(HeartbeatMixin): | |||
| 484 | 489 | ||
| 485 | try: | 490 | try: |
| 486 | # Check if msg type is for executing function | 491 | # Check if msg type is for executing function |
| 487 | if 'run' in msg and len(msg) > 2: | ||
| 488 | args_list = json.loads(msg[2]) | ||
| 489 | args_dict = args_list[1] | ||
| 490 | function = args_dict.get('callable') | ||
| 491 | if function: | ||
| 492 | self.executed_functions[msgid] = (function, queue_name) | ||
| 493 | self.job_latencies[msgid] = (monotonic(), queue_name) | 492 | self.job_latencies[msgid] = (monotonic(), queue_name) |
| 494 | 493 | ||
| 495 | # Rebuild the message to be sent to the worker. fwdmsg will | 494 | # Rebuild the message to be sent to the worker. fwdmsg will |
| 496 | # properly address the message. | 495 | # properly address the message. |
| 496 | if queue_name not in self.processed_message_counts: | ||
| 497 | self.processed_message_counts[queue_name] = 1 | ||
| 498 | else: | ||
| 499 | self.processed_message_counts[queue_name] += 1 | ||
| 500 | |||
| 501 | if queue_name not in self.processed_message_counts_by_worker: | ||
| 502 | self.processed_message_counts_by_worker[worker_addr] = 1 | ||
| 503 | else: | ||
| 504 | self.processed_message_counts_by_worker[worker_addr] += 1 | ||
| 505 | |||
| 497 | fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, | 506 | fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, |
| 498 | 'REQUEST', msgid, ] + msg) | 507 | 'REQUEST', msgid, ] + msg) |
| 499 | 508 | ||
| @@ -876,8 +885,10 @@ class Router(HeartbeatMixin): | |||
| 876 | (str) Serialized information about the current state of the router. | 885 | (str) Serialized information about the current state of the router. |
| 877 | """ | 886 | """ |
| 878 | return json.dumps({ | 887 | return json.dumps({ |
| 879 | 'job_latencies': self.job_latencies, | 888 | 'job_latencies_count': len(self.job_latencies), |
| 880 | 'executed_functions': self.executed_functions, | 889 | 'processed_messages': self.processed_message_counts, |
| 890 | 'processed_messages_by_worker': | ||
| 891 | self.processed_message_counts_by_worker, | ||
| 881 | 'waiting_message_counts': [ | 892 | 'waiting_message_counts': [ |
| 882 | '{}: {}'. | 893 | '{}: {}'. |
| 883 | format(q, | 894 | format(q, |
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py index fdb02d0..e9efa25 100644 --- a/eventmq/tests/test_router.py +++ b/eventmq/tests/test_router.py | |||
| @@ -776,8 +776,9 @@ class TestCase(unittest.TestCase): | |||
| 776 | # correctly and what not. | 776 | # correctly and what not. |
| 777 | self.assertEqual( | 777 | self.assertEqual( |
| 778 | json.loads(json.dumps({ | 778 | json.loads(json.dumps({ |
| 779 | 'job_latencies': self.router.job_latencies, | 779 | 'job_latencies_count': len(self.router.job_latencies), |
| 780 | 'executed_functions': self.router.executed_functions, | 780 | 'processed_messages': {}, |
| 781 | 'processed_messages_by_worker': {}, | ||
| 781 | 'waiting_message_counts': [ | 782 | 'waiting_message_counts': [ |
| 782 | '{}: {}'.format( | 783 | '{}: {}'.format( |
| 783 | q, | 784 | q, |