aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Hurst2017-06-27 12:08:11 -0600
committerGitHub2017-06-27 12:08:11 -0600
commitaf24bcedc49a59e673c4898e28c5e86e9d070de3 (patch)
treeae8a204f85ac0727153abb97612400998560849e
parentf3da7ca5ca56d2ab6ee25ea616cde75084e05158 (diff)
parent1ba238e51f1c523af47e9f6aaeacf3d8f8e83279 (diff)
downloadeventmq-af24bcedc49a59e673c4898e28c5e86e9d070de3.tar.gz
eventmq-af24bcedc49a59e673c4898e28c5e86e9d070de3.zip
Merge pull request #48 from sideshowdave7/feature/backport_processed_message_counts
Feature/backport processed message counts
-rw-r--r--eventmq/router.py31
-rw-r--r--eventmq/tests/test_router.py5
2 files changed, 24 insertions, 12 deletions
diff --git a/eventmq/router.py b/eventmq/router.py
index 2943e3c..0d05468 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -94,6 +94,13 @@ class Router(HeartbeatMixin):
94 #: workers available to take the job 94 #: workers available to take the job
95 self.waiting_messages = {} 95 self.waiting_messages = {}
96 96
97 # Key: Queue.name, Value: # of messages sent to workers on that queue
98 # Includes REQUESTS in flight but not REQUESTS queued
99 self.processed_message_counts = {}
100
101 # Same as above but Key: Worker.uuid
102 self.processed_message_counts_by_worker = {}
103
97 #: Tracks the last time the scheduler queue was cleaned out of dead 104 #: Tracks the last time the scheduler queue was cleaned out of dead
98 #: schedulers 105 #: schedulers
99 self._meta['last_scheduler_cleanup'] = 0 106 self._meta['last_scheduler_cleanup'] = 0
@@ -118,8 +125,6 @@ class Router(HeartbeatMixin):
118 #: Excecuted function tracking dictionary 125 #: Excecuted function tracking dictionary
119 #: Key: msgid of msg each REQUEST received and forwarded to a worker 126 #: Key: msgid of msg each REQUEST received and forwarded to a worker
120 #: Value: (function_name, queue_name) 127 #: Value: (function_name, queue_name)
121 self.executed_functions = {}
122
123 #: Set to True when the router should die. 128 #: Set to True when the router should die.
124 self.received_disconnect = False 129 self.received_disconnect = False
125 130
@@ -484,16 +489,20 @@ class Router(HeartbeatMixin):
484 489
485 try: 490 try:
486 # Check if msg type is for executing function 491 # Check if msg type is for executing function
487 if 'run' in msg and len(msg) > 2:
488 args_list = json.loads(msg[2])
489 args_dict = args_list[1]
490 function = args_dict.get('callable')
491 if function:
492 self.executed_functions[msgid] = (function, queue_name)
493 self.job_latencies[msgid] = (monotonic(), queue_name) 492 self.job_latencies[msgid] = (monotonic(), queue_name)
494 493
495 # Rebuild the message to be sent to the worker. fwdmsg will 494 # Rebuild the message to be sent to the worker. fwdmsg will
496 # properly address the message. 495 # properly address the message.
496 if queue_name not in self.processed_message_counts:
497 self.processed_message_counts[queue_name] = 1
498 else:
499 self.processed_message_counts[queue_name] += 1
500
501 if queue_name not in self.processed_message_counts_by_worker:
502 self.processed_message_counts_by_worker[worker_addr] = 1
503 else:
504 self.processed_message_counts_by_worker[worker_addr] += 1
505
497 fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, 506 fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION,
498 'REQUEST', msgid, ] + msg) 507 'REQUEST', msgid, ] + msg)
499 508
@@ -876,8 +885,10 @@ class Router(HeartbeatMixin):
876 (str) Serialized information about the current state of the router. 885 (str) Serialized information about the current state of the router.
877 """ 886 """
878 return json.dumps({ 887 return json.dumps({
879 'job_latencies': self.job_latencies, 888 'job_latencies_count': len(self.job_latencies),
880 'executed_functions': self.executed_functions, 889 'processed_messages': self.processed_message_counts,
890 'processed_messages_by_worker':
891 self.processed_message_counts_by_worker,
881 'waiting_message_counts': [ 892 'waiting_message_counts': [
882 '{}: {}'. 893 '{}: {}'.
883 format(q, 894 format(q,
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py
index fdb02d0..e9efa25 100644
--- a/eventmq/tests/test_router.py
+++ b/eventmq/tests/test_router.py
@@ -776,8 +776,9 @@ class TestCase(unittest.TestCase):
776 # correctly and what not. 776 # correctly and what not.
777 self.assertEqual( 777 self.assertEqual(
778 json.loads(json.dumps({ 778 json.loads(json.dumps({
779 'job_latencies': self.router.job_latencies, 779 'job_latencies_count': len(self.router.job_latencies),
780 'executed_functions': self.router.executed_functions, 780 'processed_messages': {},
781 'processed_messages_by_worker': {},
781 'waiting_message_counts': [ 782 'waiting_message_counts': [
782 '{}: {}'.format( 783 '{}: {}'.format(
783 q, 784 q,