aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2020-03-23 12:49:20 -0600
committerGitHub2020-03-23 12:49:20 -0600
commitfa67f83c0e4d1c6d91e87d1c8afc6149430adaf0 (patch)
treed488aaea37defaa72f5e4ea218fb63a3b2adf733
parent71daa0f1aab54bf24ab8cb97fb999d2874e9c5ee (diff)
parente85723a8891e3cd32eb295f773e32162caae97c7 (diff)
downloadeventmq-fa67f83c0e4d1c6d91e87d1c8afc6149430adaf0.tar.gz
eventmq-fa67f83c0e4d1c6d91e87d1c8afc6149430adaf0.zip
Merge pull request #78 from undertakingyou/feature/metrics-updates
Feature/metrics updates
-rw-r--r--eventmq/router.py60
-rw-r--r--eventmq/tests/test_router.py29
2 files changed, 64 insertions, 25 deletions
diff --git a/eventmq/router.py b/eventmq/router.py
index 3c141c3..49ffb3a 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -51,6 +51,7 @@ class Router(HeartbeatMixin):
51 """ 51 """
52 A simple router of messages 52 A simple router of messages
53 """ 53 """
54
54 def __init__(self, *args, **kwargs): 55 def __init__(self, *args, **kwargs):
55 super(Router, self).__init__(*args, **kwargs) # Creates _meta 56 super(Router, self).__init__(*args, **kwargs) # Creates _meta
56 57
@@ -532,7 +533,7 @@ class Router(HeartbeatMixin):
532 # Recursively try again. TODO: are there better options? 533 # Recursively try again. TODO: are there better options?
533 self.process_client_message( 534 self.process_client_message(
534 [sender, '', PROTOCOL_VERSION, 'REQUEST', msgid] + msg, 535 [sender, '', PROTOCOL_VERSION, 'REQUEST', msgid] + msg,
535 depth=depth+1) 536 depth=depth + 1)
536 537
537 def clean_up_dead_workers(self): 538 def clean_up_dead_workers(self):
538 """ 539 """
@@ -601,7 +602,7 @@ class Router(HeartbeatMixin):
601 self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]]) 602 self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]])
602 603
603 logger.debug('Added worker {} to the queues {}'.format( 604 logger.debug('Added worker {} to the queues {}'.format(
604 worker_id, queues)) 605 worker_id, queues))
605 606
606 def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME): 607 def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME):
607 """ 608 """
@@ -799,7 +800,7 @@ class Router(HeartbeatMixin):
799 except exceptions.PeerGoneAwayError: 800 except exceptions.PeerGoneAwayError:
800 logger.debug("Scheduler {} has unexpectedly gone away. Trying " 801 logger.debug("Scheduler {} has unexpectedly gone away. Trying "
801 "another scheduler.".format(scheduler_addr)) 802 "another scheduler.".format(scheduler_addr))
802 self.process_client_message(original_msg[1:], depth+1) 803 self.process_client_message(original_msg[1:], depth + 1)
803 804
804 elif command == "UNSCHEDULE": 805 elif command == "UNSCHEDULE":
805 # Forward the unschedule message to all schedulers 806 # Forward the unschedule message to all schedulers
@@ -817,7 +818,7 @@ class Router(HeartbeatMixin):
817 logger.debug("Scheduler {} has unexpectedly gone away." 818 logger.debug("Scheduler {} has unexpectedly gone away."
818 " Schedule may still exist.". 819 " Schedule may still exist.".
819 format(scheduler_addr)) 820 format(scheduler_addr))
820 self.process_client_message(original_msg[1:], depth+1) 821 self.process_client_message(original_msg[1:], depth + 1)
821 822
822 elif command == DISCONNECT: 823 elif command == DISCONNECT:
823 self.on_disconnect(msgid, msg) 824 self.on_disconnect(msgid, msg)
@@ -909,16 +910,49 @@ class Router(HeartbeatMixin):
909 Return 910 Return
910 (str) Serialized information about the current state of the router. 911 (str) Serialized information about the current state of the router.
911 """ 912 """
913 queue_latency_list = {}
914 queue_latency_count = {}
915 queue_max_latency_list = {}
916 queue_waiting_list = {}
917
918 now = monotonic()
919
920 for job in self.job_latencies:
921 queue = self.job_latencies[job][1]
922 latency = self.job_latencies[job][0]
923
924 if queue not in queue_latency_list:
925 queue_latency_list[queue] = latency
926 queue_latency_count[queue] = 1
927 else:
928 queue_latency_list[queue] += latency
929 queue_latency_count[queue] += 1
930
931 if queue not in queue_max_latency_list:
932 queue_max_latency_list[queue] = latency
933 else:
934 if queue_max_latency_list[queue] > latency:
935 queue_max_latency_list[queue] = latency
936
937 for queue in queue_latency_list:
938 queue_latency_list[queue] = int(
939 (now - (queue_latency_list[queue] /
940 max(queue_latency_count[queue], 1))) * 1000)
941
942 for queue in queue_max_latency_list:
943 queue_max_latency_list[queue] = int(
944 (now - queue_max_latency_list[queue]) * 1000)
945
946 for queue in self.waiting_messages:
947 queue_waiting_list[queue] = len(self.waiting_messages[queue])
948
912 return json.dumps({ 949 return json.dumps({
913 'job_latencies_count': len(self.job_latencies), 950 'inflight_messages_by_queue': queue_latency_count,
914 'processed_messages': self.processed_message_counts, 951 'latency_messages_by_queue': queue_latency_list,
915 'processed_messages_by_worker': 952 'max_latency_messages_by_queue': queue_max_latency_list,
916 self.processed_message_counts_by_worker, 953 'waiting_messages_by_queue': queue_waiting_list,
917 'waiting_message_counts': [ 954 'processed_messages_by_queue': self.processed_message_counts,
918 '{}: {}'. 955 'processed_messages_by_worker': self.processed_message_counts_by_worker # noqa
919 format(q,
920 len(self.waiting_messages[q]))
921 for q in self.waiting_messages]
922 }) 956 })
923 957
924 def get_workers_status(self): 958 def get_workers_status(self):
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py
index 0a315e3..73612e7 100644
--- a/eventmq/tests/test_router.py
+++ b/eventmq/tests/test_router.py
@@ -464,6 +464,8 @@ class TestCase(unittest.TestCase):
464 queue2_id = 'jimjam' 464 queue2_id = 'jimjam'
465 nonexistent_queue1 = 'nonexistent' 465 nonexistent_queue1 = 'nonexistent'
466 466
467 t = monotonic()
468
467 # To ensure the value was changed later because monotonic() is hard to 469 # To ensure the value was changed later because monotonic() is hard to
468 # mock 470 # mock
469 self.assertEqual(self.router._meta['last_worker_cleanup'], 0) 471 self.assertEqual(self.router._meta['last_worker_cleanup'], 0)
@@ -479,20 +481,20 @@ class TestCase(unittest.TestCase):
479 # 3 in the future 481 # 3 in the future
480 worker1_id: { 482 worker1_id: {
481 'queues': [(10, queue1_id), ], 483 'queues': [(10, queue1_id), ],
482 'hb': monotonic() + 3, 484 'hb': t + 3,
483 'available_slots': 0, 485 'available_slots': 0,
484 }, 486 },
485 # below the timeout 487 # below the timeout
486 488
487 worker2_id: { 489 worker2_id: {
488 'queues': [(10, queue2_id), (0, queue1_id)], 490 'queues': [(10, queue2_id), (0, queue1_id)],
489 'hb': 0, 491 'hb': t - 2,
490 'available_slots': 2, 492 'available_slots': 2,
491 }, 493 },
492 # below the timeout and a queue missing from self.router.queues 494 # below the timeout and a queue missing from self.router.queues
493 worker3_id: { 495 worker3_id: {
494 'queues': [(10, queue2_id), (3, nonexistent_queue1)], 496 'queues': [(10, queue2_id), (3, nonexistent_queue1)],
495 'hb': 0, 497 'hb': t - 2,
496 'available_slots': 0, 498 'available_slots': 0,
497 }, 499 },
498 } 500 }
@@ -778,16 +780,19 @@ class TestCase(unittest.TestCase):
778 780
779 # hacky, but the serialize/deserialize converts the keys to unicode 781 # hacky, but the serialize/deserialize converts the keys to unicode
780 # correctly and what not. 782 # correctly and what not.
783 expected_object = {
784 'inflight_messages_by_queue': {},
785 'latency_messages_by_queue': {},
786 'max_latency_messages_by_queue': {},
787 'processed_messages_by_queue': {},
788 'processed_messages_by_worker': {},
789 'waiting_messages_by_queue': {
790 q: len(self.router.waiting_messages[q])
791 for q in self.router.waiting_messages
792 }
793 }
781 self.assertEqual( 794 self.assertEqual(
782 json.loads(json.dumps({ 795 json.loads(json.dumps(expected_object)),
783 'job_latencies_count': len(self.router.job_latencies),
784 'processed_messages': {},
785 'processed_messages_by_worker': {},
786 'waiting_message_counts': [
787 '{}: {}'.format(
788 q,
789 len(self.router.waiting_messages[q])) for q in self.router.waiting_messages] # noqa
790 })),
791 json.loads(self.router.get_status())) 796 json.loads(self.router.get_status()))
792 797
793 self.assertEqual( 798 self.assertEqual(