aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorlwhite2020-03-06 09:50:07 -0700
committerWill Smith2020-03-21 17:03:47 -0600
commit132fb46f7ee28e83eeb1ec08ee873b04029d102f (patch)
treeed5fd151cde5e3a6fdf7047e5af12145e1652f03
parent71daa0f1aab54bf24ab8cb97fb999d2874e9c5ee (diff)
downloadeventmq-132fb46f7ee28e83eeb1ec08ee873b04029d102f.tar.gz
eventmq-132fb46f7ee28e83eeb1ec08ee873b04029d102f.zip
update get_status() to return a list of job latencies and queue stats
-rw-r--r--eventmq/router.py59
1 files changed, 46 insertions, 13 deletions
diff --git a/eventmq/router.py b/eventmq/router.py
index 3c141c3..e1967bf 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -51,6 +51,7 @@ class Router(HeartbeatMixin):
51 """ 51 """
52 A simple router of messages 52 A simple router of messages
53 """ 53 """
54
54 def __init__(self, *args, **kwargs): 55 def __init__(self, *args, **kwargs):
55 super(Router, self).__init__(*args, **kwargs) # Creates _meta 56 super(Router, self).__init__(*args, **kwargs) # Creates _meta
56 57
@@ -532,7 +533,7 @@ class Router(HeartbeatMixin):
532 # Recursively try again. TODO: are there better options? 533 # Recursively try again. TODO: are there better options?
533 self.process_client_message( 534 self.process_client_message(
534 [sender, '', PROTOCOL_VERSION, 'REQUEST', msgid] + msg, 535 [sender, '', PROTOCOL_VERSION, 'REQUEST', msgid] + msg,
535 depth=depth+1) 536 depth=depth + 1)
536 537
537 def clean_up_dead_workers(self): 538 def clean_up_dead_workers(self):
538 """ 539 """
@@ -601,7 +602,7 @@ class Router(HeartbeatMixin):
601 self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]]) 602 self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]])
602 603
603 logger.debug('Added worker {} to the queues {}'.format( 604 logger.debug('Added worker {} to the queues {}'.format(
604 worker_id, queues)) 605 worker_id, queues))
605 606
606 def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME): 607 def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME):
607 """ 608 """
@@ -799,7 +800,7 @@ class Router(HeartbeatMixin):
799 except exceptions.PeerGoneAwayError: 800 except exceptions.PeerGoneAwayError:
800 logger.debug("Scheduler {} has unexpectedly gone away. Trying " 801 logger.debug("Scheduler {} has unexpectedly gone away. Trying "
801 "another scheduler.".format(scheduler_addr)) 802 "another scheduler.".format(scheduler_addr))
802 self.process_client_message(original_msg[1:], depth+1) 803 self.process_client_message(original_msg[1:], depth + 1)
803 804
804 elif command == "UNSCHEDULE": 805 elif command == "UNSCHEDULE":
805 # Forward the unschedule message to all schedulers 806 # Forward the unschedule message to all schedulers
@@ -817,7 +818,7 @@ class Router(HeartbeatMixin):
817 logger.debug("Scheduler {} has unexpectedly gone away." 818 logger.debug("Scheduler {} has unexpectedly gone away."
818 " Schedule may still exist.". 819 " Schedule may still exist.".
819 format(scheduler_addr)) 820 format(scheduler_addr))
820 self.process_client_message(original_msg[1:], depth+1) 821 self.process_client_message(original_msg[1:], depth + 1)
821 822
822 elif command == DISCONNECT: 823 elif command == DISCONNECT:
823 self.on_disconnect(msgid, msg) 824 self.on_disconnect(msgid, msg)
@@ -909,16 +910,48 @@ class Router(HeartbeatMixin):
909 Return 910 Return
910 (str) Serialized information about the current state of the router. 911 (str) Serialized information about the current state of the router.
911 """ 912 """
913 queue_latency_list = {}
914 queue_latency_count = {}
915 queue_max_latency_list = {}
916 queue_waiting_list = {}
917
918 now = monotonic()
919
920 for job in self.job_latencies:
921 queue = self.job_latencies[job][1]
922 latency = self.job_latencies[job][0]
923
924 if queue not in queue_latency_list:
925 queue_latency_list[queue] = latency
926 queue_latency_count[queue] = 1
927 else:
928 queue_latency_list[queue] += latency
929 queue_latency_count[queue] += 1
930
931 if queue not in queue_max_latency_list:
932 queue_max_latency_list[queue] = latency
933 else:
934 if queue_max_latency_list[queue] > latency:
935 queue_max_latency_list[queue] = latency
936
937 for queue in queue_latency_list:
938 queue_latency_list[queue] = int(
939 (now - (queue_latency_list[queue] / max(queue_latency_count[queue], 1))) * 1000)
940
941 for queue in queue_max_latency_list:
942 queue_max_latency_list[queue] = int(
943 (now - queue_max_latency_list[queue]) * 1000)
944
945 for queue in self.waiting_messages:
946 queue_waiting_list[queue] = len(self.waiting_messages[queue])
947
912 return json.dumps({ 948 return json.dumps({
913 'job_latencies_count': len(self.job_latencies), 949 'inflight_messages_by_queue': queue_latency_count,
914 'processed_messages': self.processed_message_counts, 950 'latency_messages_by_queue': queue_latency_list,
915 'processed_messages_by_worker': 951 'max_latency_messages_by_queue': queue_max_latency_list,
916 self.processed_message_counts_by_worker, 952 'waiting_messages_by_queue': queue_waiting_list,
917 'waiting_message_counts': [ 953 'processed_messages_by_queue': self.processed_message_counts,
918 '{}: {}'. 954 'processed_messages_by_worker': self.processed_message_counts_by_worker
919 format(q,
920 len(self.waiting_messages[q]))
921 for q in self.waiting_messages]
922 }) 955 })
923 956
924 def get_workers_status(self): 957 def get_workers_status(self):