diff options
| author | jason | 2020-03-23 12:49:39 -0600 |
|---|---|---|
| committer | GitHub | 2020-03-23 12:49:39 -0600 |
| commit | 56dc571d4c1e60d275cb9da706ae1d1600205c60 (patch) | |
| tree | a0a5b3dd7a406afaae3635ca08b6793fa2014bbd | |
| parent | f504fe1dac22cd35b0eb2b96318ffa22d760e338 (diff) | |
| parent | fa67f83c0e4d1c6d91e87d1c8afc6149430adaf0 (diff) | |
| download | eventmq-56dc571d4c1e60d275cb9da706ae1d1600205c60.tar.gz eventmq-56dc571d4c1e60d275cb9da706ae1d1600205c60.zip | |
Merge branch 'master' into dependabot/pip/psutil-5.6.6
| -rw-r--r-- | eventmq/router.py | 60 | ||||
| -rw-r--r-- | eventmq/tests/test_router.py | 29 |
2 files changed, 64 insertions, 25 deletions
diff --git a/eventmq/router.py b/eventmq/router.py index 3c141c3..49ffb3a 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -51,6 +51,7 @@ class Router(HeartbeatMixin): | |||
| 51 | """ | 51 | """ |
| 52 | A simple router of messages | 52 | A simple router of messages |
| 53 | """ | 53 | """ |
| 54 | |||
| 54 | def __init__(self, *args, **kwargs): | 55 | def __init__(self, *args, **kwargs): |
| 55 | super(Router, self).__init__(*args, **kwargs) # Creates _meta | 56 | super(Router, self).__init__(*args, **kwargs) # Creates _meta |
| 56 | 57 | ||
| @@ -532,7 +533,7 @@ class Router(HeartbeatMixin): | |||
| 532 | # Recursively try again. TODO: are there better options? | 533 | # Recursively try again. TODO: are there better options? |
| 533 | self.process_client_message( | 534 | self.process_client_message( |
| 534 | [sender, '', PROTOCOL_VERSION, 'REQUEST', msgid] + msg, | 535 | [sender, '', PROTOCOL_VERSION, 'REQUEST', msgid] + msg, |
| 535 | depth=depth+1) | 536 | depth=depth + 1) |
| 536 | 537 | ||
| 537 | def clean_up_dead_workers(self): | 538 | def clean_up_dead_workers(self): |
| 538 | """ | 539 | """ |
| @@ -601,7 +602,7 @@ class Router(HeartbeatMixin): | |||
| 601 | self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]]) | 602 | self.queues[q[1]] = self.prioritize_queue_list(self.queues[q[1]]) |
| 602 | 603 | ||
| 603 | logger.debug('Added worker {} to the queues {}'.format( | 604 | logger.debug('Added worker {} to the queues {}'.format( |
| 604 | worker_id, queues)) | 605 | worker_id, queues)) |
| 605 | 606 | ||
| 606 | def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME): | 607 | def get_available_worker(self, queue_name=conf.DEFAULT_QUEUE_NAME): |
| 607 | """ | 608 | """ |
| @@ -799,7 +800,7 @@ class Router(HeartbeatMixin): | |||
| 799 | except exceptions.PeerGoneAwayError: | 800 | except exceptions.PeerGoneAwayError: |
| 800 | logger.debug("Scheduler {} has unexpectedly gone away. Trying " | 801 | logger.debug("Scheduler {} has unexpectedly gone away. Trying " |
| 801 | "another scheduler.".format(scheduler_addr)) | 802 | "another scheduler.".format(scheduler_addr)) |
| 802 | self.process_client_message(original_msg[1:], depth+1) | 803 | self.process_client_message(original_msg[1:], depth + 1) |
| 803 | 804 | ||
| 804 | elif command == "UNSCHEDULE": | 805 | elif command == "UNSCHEDULE": |
| 805 | # Forward the unschedule message to all schedulers | 806 | # Forward the unschedule message to all schedulers |
| @@ -817,7 +818,7 @@ class Router(HeartbeatMixin): | |||
| 817 | logger.debug("Scheduler {} has unexpectedly gone away." | 818 | logger.debug("Scheduler {} has unexpectedly gone away." |
| 818 | " Schedule may still exist.". | 819 | " Schedule may still exist.". |
| 819 | format(scheduler_addr)) | 820 | format(scheduler_addr)) |
| 820 | self.process_client_message(original_msg[1:], depth+1) | 821 | self.process_client_message(original_msg[1:], depth + 1) |
| 821 | 822 | ||
| 822 | elif command == DISCONNECT: | 823 | elif command == DISCONNECT: |
| 823 | self.on_disconnect(msgid, msg) | 824 | self.on_disconnect(msgid, msg) |
| @@ -909,16 +910,49 @@ class Router(HeartbeatMixin): | |||
| 909 | Return | 910 | Return |
| 910 | (str) Serialized information about the current state of the router. | 911 | (str) Serialized information about the current state of the router. |
| 911 | """ | 912 | """ |
| 913 | queue_latency_list = {} | ||
| 914 | queue_latency_count = {} | ||
| 915 | queue_max_latency_list = {} | ||
| 916 | queue_waiting_list = {} | ||
| 917 | |||
| 918 | now = monotonic() | ||
| 919 | |||
| 920 | for job in self.job_latencies: | ||
| 921 | queue = self.job_latencies[job][1] | ||
| 922 | latency = self.job_latencies[job][0] | ||
| 923 | |||
| 924 | if queue not in queue_latency_list: | ||
| 925 | queue_latency_list[queue] = latency | ||
| 926 | queue_latency_count[queue] = 1 | ||
| 927 | else: | ||
| 928 | queue_latency_list[queue] += latency | ||
| 929 | queue_latency_count[queue] += 1 | ||
| 930 | |||
| 931 | if queue not in queue_max_latency_list: | ||
| 932 | queue_max_latency_list[queue] = latency | ||
| 933 | else: | ||
| 934 | if queue_max_latency_list[queue] > latency: | ||
| 935 | queue_max_latency_list[queue] = latency | ||
| 936 | |||
| 937 | for queue in queue_latency_list: | ||
| 938 | queue_latency_list[queue] = int( | ||
| 939 | (now - (queue_latency_list[queue] / | ||
| 940 | max(queue_latency_count[queue], 1))) * 1000) | ||
| 941 | |||
| 942 | for queue in queue_max_latency_list: | ||
| 943 | queue_max_latency_list[queue] = int( | ||
| 944 | (now - queue_max_latency_list[queue]) * 1000) | ||
| 945 | |||
| 946 | for queue in self.waiting_messages: | ||
| 947 | queue_waiting_list[queue] = len(self.waiting_messages[queue]) | ||
| 948 | |||
| 912 | return json.dumps({ | 949 | return json.dumps({ |
| 913 | 'job_latencies_count': len(self.job_latencies), | 950 | 'inflight_messages_by_queue': queue_latency_count, |
| 914 | 'processed_messages': self.processed_message_counts, | 951 | 'latency_messages_by_queue': queue_latency_list, |
| 915 | 'processed_messages_by_worker': | 952 | 'max_latency_messages_by_queue': queue_max_latency_list, |
| 916 | self.processed_message_counts_by_worker, | 953 | 'waiting_messages_by_queue': queue_waiting_list, |
| 917 | 'waiting_message_counts': [ | 954 | 'processed_messages_by_queue': self.processed_message_counts, |
| 918 | '{}: {}'. | 955 | 'processed_messages_by_worker': self.processed_message_counts_by_worker # noqa |
| 919 | format(q, | ||
| 920 | len(self.waiting_messages[q])) | ||
| 921 | for q in self.waiting_messages] | ||
| 922 | }) | 956 | }) |
| 923 | 957 | ||
| 924 | def get_workers_status(self): | 958 | def get_workers_status(self): |
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py index 0a315e3..73612e7 100644 --- a/eventmq/tests/test_router.py +++ b/eventmq/tests/test_router.py | |||
| @@ -464,6 +464,8 @@ class TestCase(unittest.TestCase): | |||
| 464 | queue2_id = 'jimjam' | 464 | queue2_id = 'jimjam' |
| 465 | nonexistent_queue1 = 'nonexistent' | 465 | nonexistent_queue1 = 'nonexistent' |
| 466 | 466 | ||
| 467 | t = monotonic() | ||
| 468 | |||
| 467 | # To ensure the value was changed later because monotonic() is hard to | 469 | # To ensure the value was changed later because monotonic() is hard to |
| 468 | # mock | 470 | # mock |
| 469 | self.assertEqual(self.router._meta['last_worker_cleanup'], 0) | 471 | self.assertEqual(self.router._meta['last_worker_cleanup'], 0) |
| @@ -479,20 +481,20 @@ class TestCase(unittest.TestCase): | |||
| 479 | # 3 in the future | 481 | # 3 in the future |
| 480 | worker1_id: { | 482 | worker1_id: { |
| 481 | 'queues': [(10, queue1_id), ], | 483 | 'queues': [(10, queue1_id), ], |
| 482 | 'hb': monotonic() + 3, | 484 | 'hb': t + 3, |
| 483 | 'available_slots': 0, | 485 | 'available_slots': 0, |
| 484 | }, | 486 | }, |
| 485 | # below the timeout | 487 | # below the timeout |
| 486 | 488 | ||
| 487 | worker2_id: { | 489 | worker2_id: { |
| 488 | 'queues': [(10, queue2_id), (0, queue1_id)], | 490 | 'queues': [(10, queue2_id), (0, queue1_id)], |
| 489 | 'hb': 0, | 491 | 'hb': t - 2, |
| 490 | 'available_slots': 2, | 492 | 'available_slots': 2, |
| 491 | }, | 493 | }, |
| 492 | # below the timeout and a queue missing from self.router.queues | 494 | # below the timeout and a queue missing from self.router.queues |
| 493 | worker3_id: { | 495 | worker3_id: { |
| 494 | 'queues': [(10, queue2_id), (3, nonexistent_queue1)], | 496 | 'queues': [(10, queue2_id), (3, nonexistent_queue1)], |
| 495 | 'hb': 0, | 497 | 'hb': t - 2, |
| 496 | 'available_slots': 0, | 498 | 'available_slots': 0, |
| 497 | }, | 499 | }, |
| 498 | } | 500 | } |
| @@ -778,16 +780,19 @@ class TestCase(unittest.TestCase): | |||
| 778 | 780 | ||
| 779 | # hacky, but the serialize/deserialize converts the keys to unicode | 781 | # hacky, but the serialize/deserialize converts the keys to unicode |
| 780 | # correctly and what not. | 782 | # correctly and what not. |
| 783 | expected_object = { | ||
| 784 | 'inflight_messages_by_queue': {}, | ||
| 785 | 'latency_messages_by_queue': {}, | ||
| 786 | 'max_latency_messages_by_queue': {}, | ||
| 787 | 'processed_messages_by_queue': {}, | ||
| 788 | 'processed_messages_by_worker': {}, | ||
| 789 | 'waiting_messages_by_queue': { | ||
| 790 | q: len(self.router.waiting_messages[q]) | ||
| 791 | for q in self.router.waiting_messages | ||
| 792 | } | ||
| 793 | } | ||
| 781 | self.assertEqual( | 794 | self.assertEqual( |
| 782 | json.loads(json.dumps({ | 795 | json.loads(json.dumps(expected_object)), |
| 783 | 'job_latencies_count': len(self.router.job_latencies), | ||
| 784 | 'processed_messages': {}, | ||
| 785 | 'processed_messages_by_worker': {}, | ||
| 786 | 'waiting_message_counts': [ | ||
| 787 | '{}: {}'.format( | ||
| 788 | q, | ||
| 789 | len(self.router.waiting_messages[q])) for q in self.router.waiting_messages] # noqa | ||
| 790 | })), | ||
| 791 | json.loads(self.router.get_status())) | 796 | json.loads(self.router.get_status())) |
| 792 | 797 | ||
| 793 | self.assertEqual( | 798 | self.assertEqual( |