diff options
| author | lwhite | 2020-03-06 09:50:07 -0700 |
|---|---|---|
| committer | Will Smith | 2020-03-21 17:03:47 -0600 |
| commit | 132fb46f7ee28e83eeb1ec08ee873b04029d102f (patch) | |
| tree | ed5fd151cde5e3a6fdf7047e5af12145e1652f03 | |
| parent | 71daa0f1aab54bf24ab8cb97fb999d2874e9c5ee (diff) | |
| download | eventmq-132fb46f7ee28e83eeb1ec08ee873b04029d102f.tar.gz eventmq-132fb46f7ee28e83eeb1ec08ee873b04029d102f.zip | |
update get_status() to return a list of job latencies and queue stats
| -rw-r--r-- | eventmq/router.py | 59 |
1 files changed, 46 insertions, 13 deletions
diff --git a/eventmq/router.py b/eventmq/router.py index 3c141c3..e1967bf 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -51,6 +51,7 @@ class Router(HeartbeatMixin): | |||
| 51 | """ | 51 | """ |
| 52 | A simple router of messages | 52 | A simple router of messages |
| 53 | """ | 53 | """ |
| 54 | |||
| 54 | def __init__(self, *args, **kwargs): | 55 | def __init__(self, *args, **kwargs): |
| 55 | super(Router, self).__init__(*args, **kwargs) # Creates _meta | 56 | super(Router, self).__init__(*args, **kwargs) # Creates _meta |
| 56 | 57 | ||
| @@ -532,7 +533,7 @@ class Router(HeartbeatMixin): | |||
| 532 | # Recursively try again. TODO: are there better options? | 533 | # Recursively try again. TODO: are there better options? |
| 533 | self.process_client_message( | 534 | self.process_client_message( |
| 534 | [sender, '', PROTOCOL_VERSION, 'REQUEST', msgid] + msg, | 535 | [sender, '', PROTOCOL_VERSION, 'REQUEST', msgid] + msg, |
| 535 | depth=depth+1) | 536 | depth=depth + 1) |
| 536 | 537 | ||
| 537 | def clean_up_dead_workers(self): | 538 | def clean_up_dead_workers(self): |
| 538 | """ | 539 | """ |
| @@ -601,7 +602,7 @@ class Router(HeartbeatMixin): | |||
| 601 | self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]]) | 602 | self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]]) |
| 602 | 603 | ||
| 603 | logger.debug('Added worker {} to the queues {}'.format( | 604 | logger.debug('Added worker {} to the queues {}'.format( |
| 604 | worker_id, queues)) | 605 | worker_id, queues)) |
| 605 | 606 | ||
| 606 | def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME): | 607 | def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME): |
| 607 | """ | 608 | """ |
| @@ -799,7 +800,7 @@ class Router(HeartbeatMixin): | |||
| 799 | except exceptions.PeerGoneAwayError: | 800 | except exceptions.PeerGoneAwayError: |
| 800 | logger.debug("Scheduler {} has unexpectedly gone away. Trying " | 801 | logger.debug("Scheduler {} has unexpectedly gone away. Trying " |
| 801 | "another scheduler.".format(scheduler_addr)) | 802 | "another scheduler.".format(scheduler_addr)) |
| 802 | self.process_client_message(original_msg[1:], depth+1) | 803 | self.process_client_message(original_msg[1:], depth + 1) |
| 803 | 804 | ||
| 804 | elif command == "UNSCHEDULE": | 805 | elif command == "UNSCHEDULE": |
| 805 | # Forward the unschedule message to all schedulers | 806 | # Forward the unschedule message to all schedulers |
| @@ -817,7 +818,7 @@ class Router(HeartbeatMixin): | |||
| 817 | logger.debug("Scheduler {} has unexpectedly gone away." | 818 | logger.debug("Scheduler {} has unexpectedly gone away." |
| 818 | " Schedule may still exist.". | 819 | " Schedule may still exist.". |
| 819 | format(scheduler_addr)) | 820 | format(scheduler_addr)) |
| 820 | self.process_client_message(original_msg[1:], depth+1) | 821 | self.process_client_message(original_msg[1:], depth + 1) |
| 821 | 822 | ||
| 822 | elif command == DISCONNECT: | 823 | elif command == DISCONNECT: |
| 823 | self.on_disconnect(msgid, msg) | 824 | self.on_disconnect(msgid, msg) |
| @@ -909,16 +910,48 @@ class Router(HeartbeatMixin): | |||
| 909 | Return | 910 | Return |
| 910 | (str) Serialized information about the current state of the router. | 911 | (str) Serialized information about the current state of the router. |
| 911 | """ | 912 | """ |
| 913 | queue_latency_list = {} | ||
| 914 | queue_latency_count = {} | ||
| 915 | queue_max_latency_list = {} | ||
| 916 | queue_waiting_list = {} | ||
| 917 | |||
| 918 | now = monotonic() | ||
| 919 | |||
| 920 | for job in self.job_latencies: | ||
| 921 | queue = self.job_latencies[job][1] | ||
| 922 | latency = self.job_latencies[job][0] | ||
| 923 | |||
| 924 | if queue not in queue_latency_list: | ||
| 925 | queue_latency_list[queue] = latency | ||
| 926 | queue_latency_count[queue] = 1 | ||
| 927 | else: | ||
| 928 | queue_latency_list[queue] += latency | ||
| 929 | queue_latency_count[queue] += 1 | ||
| 930 | |||
| 931 | if queue not in queue_max_latency_list: | ||
| 932 | queue_max_latency_list[queue] = latency | ||
| 933 | else: | ||
| 934 | if queue_max_latency_list[queue] > latency: | ||
| 935 | queue_max_latency_list[queue] = latency | ||
| 936 | |||
| 937 | for queue in queue_latency_list: | ||
| 938 | queue_latency_list[queue] = int( | ||
| 939 | (now - (queue_latency_list[queue] / max(queue_latency_count[queue], 1))) * 1000) | ||
| 940 | |||
| 941 | for queue in queue_max_latency_list: | ||
| 942 | queue_max_latency_list[queue] = int( | ||
| 943 | (now - queue_max_latency_list[queue]) * 1000) | ||
| 944 | |||
| 945 | for queue in self.waiting_messages: | ||
| 946 | queue_waiting_list[queue] = len(self.waiting_messages[queue]) | ||
| 947 | |||
| 912 | return json.dumps({ | 948 | return json.dumps({ |
| 913 | 'job_latencies_count': len(self.job_latencies), | 949 | 'inflight_messages_by_queue': queue_latency_count, |
| 914 | 'processed_messages': self.processed_message_counts, | 950 | 'latency_messages_by_queue': queue_latency_list, |
| 915 | 'processed_messages_by_worker': | 951 | 'max_latency_messages_by_queue': queue_max_latency_list, |
| 916 | self.processed_message_counts_by_worker, | 952 | 'waiting_messages_by_queue': queue_waiting_list, |
| 917 | 'waiting_message_counts': [ | 953 | 'processed_messages_by_queue': self.processed_message_counts, |
| 918 | '{}: {}'. | 954 | 'processed_messages_by_worker': self.processed_message_counts_by_worker |
| 919 | format(q, | ||
| 920 | len(self.waiting_messages[q])) | ||
| 921 | for q in self.waiting_messages] | ||
| 922 | }) | 955 | }) |
| 923 | 956 | ||
| 924 | def get_workers_status(self): | 957 | def get_workers_status(self): |