aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2017-08-14 11:03:06 -0600
committerjason2017-08-14 11:03:06 -0600
commit02e0e19aa1f8f745c65c498ba69d8975699f913e (patch)
treec5e17a4130a90afa26b204c239020e3842b44b76
parent00059a235bc8fa75fe24e59fc17cc239e41b24c0 (diff)
parent9f60b21dd37c35c0b7909b5f1d8e403ef761d513 (diff)
downloadeventmq-02e0e19aa1f8f745c65c498ba69d8975699f913e.tar.gz
eventmq-02e0e19aa1f8f745c65c498ba69d8975699f913e.zip
Merge branch 'master' of github.com:eventmq/eventmq
-rwxr-xr-xbin/send_msg2
-rw-r--r--docs/settings_file.rst14
-rw-r--r--eventmq/__init__.py2
-rw-r--r--eventmq/conf.py3
-rw-r--r--eventmq/jobmanager.py21
-rw-r--r--eventmq/log.py31
-rw-r--r--eventmq/router.py59
-rw-r--r--eventmq/tests/test_router.py5
-rw-r--r--eventmq/utils/classes.py33
-rw-r--r--eventmq/worker.py11
-rw-r--r--setup.py2
11 files changed, 134 insertions, 49 deletions
diff --git a/bin/send_msg b/bin/send_msg
index c3e88fa..33cfb1a 100755
--- a/bin/send_msg
+++ b/bin/send_msg
@@ -22,7 +22,7 @@ if __name__ == "__main__":
22 'callable': 'work_job', 22 'callable': 'work_job',
23 'class_args': ('blurp',), 23 'class_args': ('blurp',),
24 'class_kwargs': {'kwarg1': True}, 24 'class_kwargs': {'kwarg1': True},
25 'args': (10, ), 25 'args': (1, ),
26 'kwargs': {} 26 'kwargs': {}
27 }] 27 }]
28 28
diff --git a/docs/settings_file.rst b/docs/settings_file.rst
index 43c922c..c758a16 100644
--- a/docs/settings_file.rst
+++ b/docs/settings_file.rst
@@ -30,6 +30,20 @@ Default: 'tcp://127.0.0.1:47291'
30 30
31The address used to listen for connections from workers 31The address used to listen for connections from workers
32 32
33wal
34=======
35Default: '/var/log/eventmq/wal.log'
36
37Write-ahead Log for replaying messages received by the Router. Will
38try to create the directory specified and append to the filename given.
39Requires correct permissions to write to the given file.
40
41wal_enabled
42===============
43Default: False
44
45Enable or disable the Write-ahead Log
46
33********* 47*********
34Scheduler 48Scheduler
35********* 49*********
diff --git a/eventmq/__init__.py b/eventmq/__init__.py
index 60bb61c..a02a091 100644
--- a/eventmq/__init__.py
+++ b/eventmq/__init__.py
@@ -1,5 +1,5 @@
1__author__ = 'EventMQ Contributors' 1__author__ = 'EventMQ Contributors'
2__version__ = '0.3.4.6' 2__version__ = '0.3.4.9'
3 3
4PROTOCOL_VERSION = 'eMQP/1.0' 4PROTOCOL_VERSION = 'eMQP/1.0'
5 5
diff --git a/eventmq/conf.py b/eventmq/conf.py
index 4749e6b..8609716 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -97,4 +97,7 @@ SETUP_CALLABLE = ''
97KILL_GRACE_PERIOD = 300 97KILL_GRACE_PERIOD = 300
98GLOBAL_TIMEOUT = 300 98GLOBAL_TIMEOUT = 300
99 99
100WAL = '/var/log/eventmq/wal.log'
101WAL_ENABLED = False
102
100# }}} 103# }}}
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 4840fac..c900495 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -146,8 +146,6 @@ class JobManager(HeartbeatMixin, EMQPService):
146 Starts the actual event loop. Usually called by :meth:`start` 146 Starts the actual event loop. Usually called by :meth:`start`
147 """ 147 """
148 # Acknowledgment has come 148 # Acknowledgment has come
149 # Send a READY for each available worker
150
151 self.status = STATUS.running 149 self.status = STATUS.running
152 150
153 try: 151 try:
@@ -186,9 +184,9 @@ class JobManager(HeartbeatMixin, EMQPService):
186 else: 184 else:
187 try: 185 try:
188 events = self.poller.poll(1000) 186 events = self.poller.poll(1000)
189 except zmq.ZMQError: 187 except zmq.ZMQError as e:
190 logger.debug('Disconnecting due to ZMQError while' 188 logger.debug('Disconnecting due to ZMQError while'
191 ' polling') 189 ' polling: {}'.format(e))
192 sendmsg(self.outgoing, KBYE) 190 sendmsg(self.outgoing, KBYE)
193 self.received_disconnect = True 191 self.received_disconnect = True
194 continue 192 continue
@@ -212,6 +210,15 @@ class JobManager(HeartbeatMixin, EMQPService):
212 except Exception: 210 except Exception:
213 logger.exception("Unhandled exception in main jobmanager loop") 211 logger.exception("Unhandled exception in main jobmanager loop")
214 212
213 # Cleanup
214 if hasattr(self, '_workers'):
215 del self._workers
216
217 # Flush the queues with workers
218 self.request_queue = mp_queue()
219 self.finished_queue = mp_queue()
220 logger.info("Reached end of event loop")
221
215 def handle_response(self, resp): 222 def handle_response(self, resp):
216 """ 223 """
217 Handles a response from a worker process to the jobmanager 224 Handles a response from a worker process to the jobmanager
@@ -240,7 +247,8 @@ class JobManager(HeartbeatMixin, EMQPService):
240 247
241 """ 248 """
242 249
243 logger.debug(resp) 250 if conf.SUPER_DEBUG:
251 logger.debug(resp)
244 pid = resp['pid'] 252 pid = resp['pid']
245 msgid = resp['msgid'] 253 msgid = resp['msgid']
246 callback = resp['callback'] 254 callback = resp['callback']
@@ -383,9 +391,6 @@ class JobManager(HeartbeatMixin, EMQPService):
383 necessary 391 necessary
384 """ 392 """
385 # Kill workers that aren't alive 393 # Kill workers that aren't alive
386 logger.debug("Jobs in flight: {}".format(len(self.jobs_in_flight)))
387 logger.debug("Total requests: {}".format(self.total_requests))
388 logger.debug("Total ready sent: {}".format(self.total_ready_sent))
389 try: 394 try:
390 [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers 395 [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers
391 if not w.is_alive] 396 if not w.is_alive]
diff --git a/eventmq/log.py b/eventmq/log.py
index 5564094..3d62165 100644
--- a/eventmq/log.py
+++ b/eventmq/log.py
@@ -17,7 +17,10 @@ log module for eventmq
17 17
18this needs so much work. 18this needs so much work.
19""" 19"""
20import errno
20import logging 21import logging
22import os
23import time
21 24
22import zmq 25import zmq
23import zmq.log.handlers 26import zmq.log.handlers
@@ -46,6 +49,7 @@ class handlers(object):
46 """ 49 """
47 PUBLISH_HANDLER = PUBHandler 50 PUBLISH_HANDLER = PUBHandler
48 STREAM_HANDLER = logging.StreamHandler 51 STREAM_HANDLER = logging.StreamHandler
52 FILE_HANDLER = logging.FileHandler
49 53
50 54
51def setup_logger(base_name, formatter=FORMAT_STANDARD, 55def setup_logger(base_name, formatter=FORMAT_STANDARD,
@@ -75,3 +79,30 @@ def setup_logger(base_name, formatter=FORMAT_STANDARD,
75 logger.addHandler(handler) 79 logger.addHandler(handler)
76 80
77 return logger 81 return logger
82
83
84def setup_wal_logger(base_name, filename, handler=handlers.FILE_HANDLER):
85 """
86 Write-ahead Log for replaying messages. Should only contain
87 commands on the data path (REQUEST, SCHEDULE, UNSCHEDULE)
88 """
89
90 if not os.path.exists(os.path.dirname(filename)):
91 try:
92 os.makedirs(os.path.dirname(filename))
93 except OSError as exc: # Guard against race condition
94 if exc.errno != errno.EEXIST:
95 raise
96
97 with open(filename, "a+") as f:
98 f.close()
99
100 wal = logging.getLogger(base_name)
101 wal_handler = handler(filename)
102 formatter = logging.Formatter('%(asctime)s %(message)s')
103 formatter.converter = time.gmtime
104 wal_handler.setFormatter(formatter)
105 wal.addHandler(wal_handler)
106 wal.setLevel(logging.INFO)
107
108 return wal
diff --git a/eventmq/router.py b/eventmq/router.py
index 2943e3c..7c221bc 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -22,7 +22,7 @@ import json # deserialize queues in on_inform. should be refactored
22import logging 22import logging
23import signal 23import signal
24 24
25from eventmq.log import setup_logger 25from eventmq.log import setup_logger, setup_wal_logger
26from . import conf, constants, exceptions, poller, receiver 26from . import conf, constants, exceptions, poller, receiver
27from .constants import ( 27from .constants import (
28 CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS, 28 CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS,
@@ -41,6 +41,7 @@ from .utils.timeutils import monotonic, timestamp
41 41
42 42
43logger = logging.getLogger(__name__) 43logger = logging.getLogger(__name__)
44wal_logger = logging.getLogger('eventmq-wal')
44 45
45 46
46class Router(HeartbeatMixin): 47class Router(HeartbeatMixin):
@@ -94,6 +95,13 @@ class Router(HeartbeatMixin):
94 #: workers available to take the job 95 #: workers available to take the job
95 self.waiting_messages = {} 96 self.waiting_messages = {}
96 97
98 # Key: Queue.name, Value: # of messages sent to workers on that queue
99 # Includes REQUESTS in flight but not REQUESTS queued
100 self.processed_message_counts = {}
101
102 # Same as above but Key: Worker.uuid
103 self.processed_message_counts_by_worker = {}
104
97 #: Tracks the last time the scheduler queue was cleaned out of dead 105 #: Tracks the last time the scheduler queue was cleaned out of dead
98 #: schedulers 106 #: schedulers
99 self._meta['last_scheduler_cleanup'] = 0 107 self._meta['last_scheduler_cleanup'] = 0
@@ -118,8 +126,6 @@ class Router(HeartbeatMixin):
118 #: Excecuted function tracking dictionary 126 #: Excecuted function tracking dictionary
119 #: Key: msgid of msg each REQUEST received and forwarded to a worker 127 #: Key: msgid of msg each REQUEST received and forwarded to a worker
120 #: Value: (function_name, queue_name) 128 #: Value: (function_name, queue_name)
121 self.executed_functions = {}
122
123 #: Set to True when the router should die. 129 #: Set to True when the router should die.
124 self.received_disconnect = False 130 self.received_disconnect = False
125 131
@@ -171,6 +177,7 @@ class Router(HeartbeatMixin):
171 177
172 if events.get(self.incoming) == poller.POLLIN: 178 if events.get(self.incoming) == poller.POLLIN:
173 msg = self.incoming.recv_multipart() 179 msg = self.incoming.recv_multipart()
180 self.handle_wal_log(msg)
174 self.process_client_message(msg) 181 self.process_client_message(msg)
175 182
176 if events.get(self.outgoing) == poller.POLLIN: 183 if events.get(self.outgoing) == poller.POLLIN:
@@ -179,7 +186,8 @@ class Router(HeartbeatMixin):
179 186
180 if events.get(self.administrative_socket) == poller.POLLIN: 187 if events.get(self.administrative_socket) == poller.POLLIN:
181 msg = self.administrative_socket.recv_multipart() 188 msg = self.administrative_socket.recv_multipart()
182 logger.debug('ADMIN: {}'.format(msg)) 189 if conf.SUPER_DEBUG:
190 logger.debug('ADMIN: {}'.format(msg))
183 # ############## 191 # ##############
184 # Admin Commands 192 # Admin Commands
185 # ############## 193 # ##############
@@ -331,8 +339,9 @@ class Router(HeartbeatMixin):
331 """ 339 """
332 340
333 orig_msgid = msg[1] 341 orig_msgid = msg[1]
334 logger.info('Received REPLY from {} (msgid: {}, ACK msgid: {})'.format( 342 if conf.SUPER_DEBUG:
335 sender, msgid, orig_msgid)) 343 logger.debug('Received REPLY from {} (msgid: {}, ACK msgid: {})'.
344 format(sender, msgid, orig_msgid))
336 345
337 if orig_msgid in self.job_latencies: 346 if orig_msgid in self.job_latencies:
338 elapsed_secs = (monotonic() 347 elapsed_secs = (monotonic()
@@ -484,16 +493,20 @@ class Router(HeartbeatMixin):
484 493
485 try: 494 try:
486 # Check if msg type is for executing function 495 # Check if msg type is for executing function
487 if 'run' in msg and len(msg) > 2:
488 args_list = json.loads(msg[2])
489 args_dict = args_list[1]
490 function = args_dict.get('callable')
491 if function:
492 self.executed_functions[msgid] = (function, queue_name)
493 self.job_latencies[msgid] = (monotonic(), queue_name) 496 self.job_latencies[msgid] = (monotonic(), queue_name)
494 497
495 # Rebuild the message to be sent to the worker. fwdmsg will 498 # Rebuild the message to be sent to the worker. fwdmsg will
496 # properly address the message. 499 # properly address the message.
500 if queue_name not in self.processed_message_counts:
501 self.processed_message_counts[queue_name] = 1
502 else:
503 self.processed_message_counts[queue_name] += 1
504
505 if queue_name not in self.processed_message_counts_by_worker:
506 self.processed_message_counts_by_worker[worker_addr] = 1
507 else:
508 self.processed_message_counts_by_worker[worker_addr] += 1
509
497 fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, 510 fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION,
498 'REQUEST', msgid, ] + msg) 511 'REQUEST', msgid, ] + msg)
499 512
@@ -694,6 +707,21 @@ class Router(HeartbeatMixin):
694 """ 707 """
695 self.workers[worker_id]['available_slots'] += 1 708 self.workers[worker_id]['available_slots'] += 1
696 709
710 def handle_wal_log(self, original_msg):
711
712 try:
713 message = parse_router_message(original_msg)
714 except exceptions.InvalidMessageError:
715 logger.exception('Invalid message from clients: {}'.format(
716 str(original_msg)))
717 return
718
719 command = message[1]
720
721 if conf.WAL_ENABLED and \
722 command in ("REQUEST", "SCHEDULE", "UNSCHEDULE"):
723 wal_logger.info(original_msg)
724
697 def process_client_message(self, original_msg, depth=0): 725 def process_client_message(self, original_msg, depth=0):
698 """ 726 """
699 Args: 727 Args:
@@ -876,8 +904,10 @@ class Router(HeartbeatMixin):
876 (str) Serialized information about the current state of the router. 904 (str) Serialized information about the current state of the router.
877 """ 905 """
878 return json.dumps({ 906 return json.dumps({
879 'job_latencies': self.job_latencies, 907 'job_latencies_count': len(self.job_latencies),
880 'executed_functions': self.executed_functions, 908 'processed_messages': self.processed_message_counts,
909 'processed_messages_by_worker':
910 self.processed_message_counts_by_worker,
881 'waiting_message_counts': [ 911 'waiting_message_counts': [
882 '{}: {}'. 912 '{}: {}'.
883 format(q, 913 format(q,
@@ -915,6 +945,7 @@ class Router(HeartbeatMixin):
915 """ 945 """
916 setup_logger('eventmq') 946 setup_logger('eventmq')
917 import_settings() 947 import_settings()
948 setup_wal_logger('eventmq-wal', conf.WAL)
918 self.start(frontend_addr=conf.FRONTEND_ADDR, 949 self.start(frontend_addr=conf.FRONTEND_ADDR,
919 backend_addr=conf.BACKEND_ADDR, 950 backend_addr=conf.BACKEND_ADDR,
920 administrative_addr=conf.ADMINISTRATIVE_ADDR) 951 administrative_addr=conf.ADMINISTRATIVE_ADDR)
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py
index fdb02d0..e9efa25 100644
--- a/eventmq/tests/test_router.py
+++ b/eventmq/tests/test_router.py
@@ -776,8 +776,9 @@ class TestCase(unittest.TestCase):
776 # correctly and what not. 776 # correctly and what not.
777 self.assertEqual( 777 self.assertEqual(
778 json.loads(json.dumps({ 778 json.loads(json.dumps({
779 'job_latencies': self.router.job_latencies, 779 'job_latencies_count': len(self.router.job_latencies),
780 'executed_functions': self.router.executed_functions, 780 'processed_messages': {},
781 'processed_messages_by_worker': {},
781 'waiting_message_counts': [ 782 'waiting_message_counts': [
782 '{}: {}'.format( 783 '{}: {}'.format(
783 q, 784 q,
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index 39c600b..73c6364 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -345,10 +345,9 @@ class ZMQReceiveMixin(object):
345 Receive a message 345 Receive a message
346 """ 346 """
347 msg = self.zsocket.recv() 347 msg = self.zsocket.recv()
348 if conf.SUPER_DEBUG: 348 if not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \
349 if not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ 349 not conf.HIDE_HEARTBEAT_LOGS:
350 not conf.HIDE_HEARTBEAT_LOGS: 350 logger.debug('Received message: {}'.format(msg))
351 logger.debug('Received message: {}'.format(msg))
352 return msg 351 return msg
353 352
354 def recv_multipart(self): 353 def recv_multipart(self):
@@ -356,13 +355,12 @@ class ZMQReceiveMixin(object):
356 Receive a multipart message 355 Receive a multipart message
357 """ 356 """
358 msg = self.zsocket.recv_multipart() 357 msg = self.zsocket.recv_multipart()
359 if conf.SUPER_DEBUG: 358 # If it's not at least 4 frames long then most likely it isn't an
360 # If it's not at least 4 frames long then most likely it isn't an 359 # eventmq message
361 # eventmq message 360 if len(msg) >= 4 and \
362 if len(msg) >= 4 and \ 361 not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \
363 not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ 362 not conf.HIDE_HEARTBEAT_LOGS:
364 not conf.HIDE_HEARTBEAT_LOGS: 363 logger.debug('Received message: {}'.format(msg))
365 logger.debug('Received message: {}'.format(msg))
366 return msg 364 return msg
367 365
368 366
@@ -402,13 +400,12 @@ class ZMQSendMixin(object):
402 400
403 msg = encodify(headers + message) 401 msg = encodify(headers + message)
404 402
405 if conf.SUPER_DEBUG: 403 # If it's not at least 4 frames long then most likely it isn't an
406 # If it's not at least 4 frames long then most likely it isn't an 404 # eventmq message
407 # eventmq message 405 if len(msg) > 4 and \
408 if len(msg) > 4 and \ 406 not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \
409 not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ 407 not conf.HIDE_HEARTBEAT_LOGS:
410 not conf.HIDE_HEARTBEAT_LOGS: 408 logger.debug('Sending message: %s' % str(msg))
411 logger.debug('Sending message: %s' % str(msg))
412 409
413 try: 410 try:
414 self.zsocket.send_multipart(msg, 411 self.zsocket.send_multipart(msg,
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 859b831..1621578 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -109,16 +109,19 @@ class MultiprocessWorker(Process):
109 timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT 109 timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT
110 msgid = payload.get('msgid', '') 110 msgid = payload.get('msgid', '')
111 callback = payload.get('callback', '') 111 callback = payload.get('callback', '')
112 logger.debug("Putting on thread queue msgid: {}".format( 112
113 msgid)) 113 if conf.SUPER_DEBUG:
114 logger.debug("Putting on thread queue msgid: {}".format(
115 msgid))
114 116
115 worker_queue.put(payload['params']) 117 worker_queue.put(payload['params'])
116 118
117 try: 119 try:
118 return_val = worker_result_queue.get(timeout=timeout) 120 return_val = worker_result_queue.get(timeout=timeout)
119 121
120 logger.debug("Got from result queue msgid: {}".format( 122 if conf.SUPER_DEBUG:
121 msgid)) 123 logger.debug("Got from result queue msgid: {}".format(
124 msgid))
122 except Queue.Empty: 125 except Queue.Empty:
123 return_val = 'TimeoutError' 126 return_val = 'TimeoutError'
124 127
diff --git a/setup.py b/setup.py
index 9ac6089..553c86b 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@ from setuptools import find_packages, setup
7 7
8setup( 8setup(
9 name='eventmq', 9 name='eventmq',
10 version='0.3.4.6', 10 version='0.3.4.9',
11 description='EventMQ job execution and messaging system based on ZeroMQ', 11 description='EventMQ job execution and messaging system based on ZeroMQ',
12 packages=find_packages(), 12 packages=find_packages(),
13 install_requires=['pyzmq==15.4.0', 13 install_requires=['pyzmq==15.4.0',