diff options
| author | jason | 2017-08-14 11:03:06 -0600 |
|---|---|---|
| committer | jason | 2017-08-14 11:03:06 -0600 |
| commit | 02e0e19aa1f8f745c65c498ba69d8975699f913e (patch) | |
| tree | c5e17a4130a90afa26b204c239020e3842b44b76 | |
| parent | 00059a235bc8fa75fe24e59fc17cc239e41b24c0 (diff) | |
| parent | 9f60b21dd37c35c0b7909b5f1d8e403ef761d513 (diff) | |
| download | eventmq-02e0e19aa1f8f745c65c498ba69d8975699f913e.tar.gz eventmq-02e0e19aa1f8f745c65c498ba69d8975699f913e.zip | |
Merge branch 'master' of github.com:eventmq/eventmq
| -rwxr-xr-x | bin/send_msg | 2 | ||||
| -rw-r--r-- | docs/settings_file.rst | 14 | ||||
| -rw-r--r-- | eventmq/__init__.py | 2 | ||||
| -rw-r--r-- | eventmq/conf.py | 3 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 21 | ||||
| -rw-r--r-- | eventmq/log.py | 31 | ||||
| -rw-r--r-- | eventmq/router.py | 59 | ||||
| -rw-r--r-- | eventmq/tests/test_router.py | 5 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 33 | ||||
| -rw-r--r-- | eventmq/worker.py | 11 | ||||
| -rw-r--r-- | setup.py | 2 |
11 files changed, 134 insertions, 49 deletions
diff --git a/bin/send_msg b/bin/send_msg index c3e88fa..33cfb1a 100755 --- a/bin/send_msg +++ b/bin/send_msg | |||
| @@ -22,7 +22,7 @@ if __name__ == "__main__": | |||
| 22 | 'callable': 'work_job', | 22 | 'callable': 'work_job', |
| 23 | 'class_args': ('blurp',), | 23 | 'class_args': ('blurp',), |
| 24 | 'class_kwargs': {'kwarg1': True}, | 24 | 'class_kwargs': {'kwarg1': True}, |
| 25 | 'args': (10, ), | 25 | 'args': (1, ), |
| 26 | 'kwargs': {} | 26 | 'kwargs': {} |
| 27 | }] | 27 | }] |
| 28 | 28 | ||
diff --git a/docs/settings_file.rst b/docs/settings_file.rst index 43c922c..c758a16 100644 --- a/docs/settings_file.rst +++ b/docs/settings_file.rst | |||
| @@ -30,6 +30,20 @@ Default: 'tcp://127.0.0.1:47291' | |||
| 30 | 30 | ||
| 31 | The address used to listen for connections from workers | 31 | The address used to listen for connections from workers |
| 32 | 32 | ||
| 33 | wal | ||
| 34 | ======= | ||
| 35 | Default: '/var/log/eventmq/wal.log' | ||
| 36 | |||
| 37 | Write-ahead Log for replaying messages received by the Router. Will | ||
| 38 | try to create the directory specified and append to the filename given. | ||
| 39 | Requires correct permissions to write to the given file. | ||
| 40 | |||
| 41 | wal_enabled | ||
| 42 | =============== | ||
| 43 | Default: False | ||
| 44 | |||
| 45 | Enable or disable the Write-ahead Log | ||
| 46 | |||
| 33 | ********* | 47 | ********* |
| 34 | Scheduler | 48 | Scheduler |
| 35 | ********* | 49 | ********* |
diff --git a/eventmq/__init__.py b/eventmq/__init__.py index 60bb61c..a02a091 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py | |||
| @@ -1,5 +1,5 @@ | |||
| 1 | __author__ = 'EventMQ Contributors' | 1 | __author__ = 'EventMQ Contributors' |
| 2 | __version__ = '0.3.4.6' | 2 | __version__ = '0.3.4.9' |
| 3 | 3 | ||
| 4 | PROTOCOL_VERSION = 'eMQP/1.0' | 4 | PROTOCOL_VERSION = 'eMQP/1.0' |
| 5 | 5 | ||
diff --git a/eventmq/conf.py b/eventmq/conf.py index 4749e6b..8609716 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -97,4 +97,7 @@ SETUP_CALLABLE = '' | |||
| 97 | KILL_GRACE_PERIOD = 300 | 97 | KILL_GRACE_PERIOD = 300 |
| 98 | GLOBAL_TIMEOUT = 300 | 98 | GLOBAL_TIMEOUT = 300 |
| 99 | 99 | ||
| 100 | WAL = '/var/log/eventmq/wal.log' | ||
| 101 | WAL_ENABLED = False | ||
| 102 | |||
| 100 | # }}} | 103 | # }}} |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 4840fac..c900495 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -146,8 +146,6 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 146 | Starts the actual event loop. Usually called by :meth:`start` | 146 | Starts the actual event loop. Usually called by :meth:`start` |
| 147 | """ | 147 | """ |
| 148 | # Acknowledgment has come | 148 | # Acknowledgment has come |
| 149 | # Send a READY for each available worker | ||
| 150 | |||
| 151 | self.status = STATUS.running | 149 | self.status = STATUS.running |
| 152 | 150 | ||
| 153 | try: | 151 | try: |
| @@ -186,9 +184,9 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 186 | else: | 184 | else: |
| 187 | try: | 185 | try: |
| 188 | events = self.poller.poll(1000) | 186 | events = self.poller.poll(1000) |
| 189 | except zmq.ZMQError: | 187 | except zmq.ZMQError as e: |
| 190 | logger.debug('Disconnecting due to ZMQError while' | 188 | logger.debug('Disconnecting due to ZMQError while' |
| 191 | ' polling') | 189 | ' polling: {}'.format(e)) |
| 192 | sendmsg(self.outgoing, KBYE) | 190 | sendmsg(self.outgoing, KBYE) |
| 193 | self.received_disconnect = True | 191 | self.received_disconnect = True |
| 194 | continue | 192 | continue |
| @@ -212,6 +210,15 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 212 | except Exception: | 210 | except Exception: |
| 213 | logger.exception("Unhandled exception in main jobmanager loop") | 211 | logger.exception("Unhandled exception in main jobmanager loop") |
| 214 | 212 | ||
| 213 | # Cleanup | ||
| 214 | if hasattr(self, '_workers'): | ||
| 215 | del self._workers | ||
| 216 | |||
| 217 | # Flush the queues with workers | ||
| 218 | self.request_queue = mp_queue() | ||
| 219 | self.finished_queue = mp_queue() | ||
| 220 | logger.info("Reached end of event loop") | ||
| 221 | |||
| 215 | def handle_response(self, resp): | 222 | def handle_response(self, resp): |
| 216 | """ | 223 | """ |
| 217 | Handles a response from a worker process to the jobmanager | 224 | Handles a response from a worker process to the jobmanager |
| @@ -240,7 +247,8 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 240 | 247 | ||
| 241 | """ | 248 | """ |
| 242 | 249 | ||
| 243 | logger.debug(resp) | 250 | if conf.SUPER_DEBUG: |
| 251 | logger.debug(resp) | ||
| 244 | pid = resp['pid'] | 252 | pid = resp['pid'] |
| 245 | msgid = resp['msgid'] | 253 | msgid = resp['msgid'] |
| 246 | callback = resp['callback'] | 254 | callback = resp['callback'] |
| @@ -383,9 +391,6 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 383 | necessary | 391 | necessary |
| 384 | """ | 392 | """ |
| 385 | # Kill workers that aren't alive | 393 | # Kill workers that aren't alive |
| 386 | logger.debug("Jobs in flight: {}".format(len(self.jobs_in_flight))) | ||
| 387 | logger.debug("Total requests: {}".format(self.total_requests)) | ||
| 388 | logger.debug("Total ready sent: {}".format(self.total_ready_sent)) | ||
| 389 | try: | 394 | try: |
| 390 | [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers | 395 | [self.kill_worker(w.pid, signal.SIGKILL) for w in self.workers |
| 391 | if not w.is_alive] | 396 | if not w.is_alive] |
diff --git a/eventmq/log.py b/eventmq/log.py index 5564094..3d62165 100644 --- a/eventmq/log.py +++ b/eventmq/log.py | |||
| @@ -17,7 +17,10 @@ log module for eventmq | |||
| 17 | 17 | ||
| 18 | this needs so much work. | 18 | this needs so much work. |
| 19 | """ | 19 | """ |
| 20 | import errno | ||
| 20 | import logging | 21 | import logging |
| 22 | import os | ||
| 23 | import time | ||
| 21 | 24 | ||
| 22 | import zmq | 25 | import zmq |
| 23 | import zmq.log.handlers | 26 | import zmq.log.handlers |
| @@ -46,6 +49,7 @@ class handlers(object): | |||
| 46 | """ | 49 | """ |
| 47 | PUBLISH_HANDLER = PUBHandler | 50 | PUBLISH_HANDLER = PUBHandler |
| 48 | STREAM_HANDLER = logging.StreamHandler | 51 | STREAM_HANDLER = logging.StreamHandler |
| 52 | FILE_HANDLER = logging.FileHandler | ||
| 49 | 53 | ||
| 50 | 54 | ||
| 51 | def setup_logger(base_name, formatter=FORMAT_STANDARD, | 55 | def setup_logger(base_name, formatter=FORMAT_STANDARD, |
| @@ -75,3 +79,30 @@ def setup_logger(base_name, formatter=FORMAT_STANDARD, | |||
| 75 | logger.addHandler(handler) | 79 | logger.addHandler(handler) |
| 76 | 80 | ||
| 77 | return logger | 81 | return logger |
| 82 | |||
| 83 | |||
| 84 | def setup_wal_logger(base_name, filename, handler=handlers.FILE_HANDLER): | ||
| 85 | """ | ||
| 86 | Write-ahead Log for replaying messages. Should only contain | ||
| 87 | commands on the data path (REQUEST, SCHEDULE, UNSCHEDULE) | ||
| 88 | """ | ||
| 89 | |||
| 90 | if not os.path.exists(os.path.dirname(filename)): | ||
| 91 | try: | ||
| 92 | os.makedirs(os.path.dirname(filename)) | ||
| 93 | except OSError as exc: # Guard against race condition | ||
| 94 | if exc.errno != errno.EEXIST: | ||
| 95 | raise | ||
| 96 | |||
| 97 | with open(filename, "a+") as f: | ||
| 98 | f.close() | ||
| 99 | |||
| 100 | wal = logging.getLogger(base_name) | ||
| 101 | wal_handler = handler(filename) | ||
| 102 | formatter = logging.Formatter('%(asctime)s %(message)s') | ||
| 103 | formatter.converter = time.gmtime | ||
| 104 | wal_handler.setFormatter(formatter) | ||
| 105 | wal.addHandler(wal_handler) | ||
| 106 | wal.setLevel(logging.INFO) | ||
| 107 | |||
| 108 | return wal | ||
diff --git a/eventmq/router.py b/eventmq/router.py index 2943e3c..7c221bc 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -22,7 +22,7 @@ import json # deserialize queues in on_inform. should be refactored | |||
| 22 | import logging | 22 | import logging |
| 23 | import signal | 23 | import signal |
| 24 | 24 | ||
| 25 | from eventmq.log import setup_logger | 25 | from eventmq.log import setup_logger, setup_wal_logger |
| 26 | from . import conf, constants, exceptions, poller, receiver | 26 | from . import conf, constants, exceptions, poller, receiver |
| 27 | from .constants import ( | 27 | from .constants import ( |
| 28 | CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS, | 28 | CLIENT_TYPE, DISCONNECT, KBYE, PROTOCOL_VERSION, ROUTER_SHOW_SCHEDULERS, |
| @@ -41,6 +41,7 @@ from .utils.timeutils import monotonic, timestamp | |||
| 41 | 41 | ||
| 42 | 42 | ||
| 43 | logger = logging.getLogger(__name__) | 43 | logger = logging.getLogger(__name__) |
| 44 | wal_logger = logging.getLogger('eventmq-wal') | ||
| 44 | 45 | ||
| 45 | 46 | ||
| 46 | class Router(HeartbeatMixin): | 47 | class Router(HeartbeatMixin): |
| @@ -94,6 +95,13 @@ class Router(HeartbeatMixin): | |||
| 94 | #: workers available to take the job | 95 | #: workers available to take the job |
| 95 | self.waiting_messages = {} | 96 | self.waiting_messages = {} |
| 96 | 97 | ||
| 98 | # Key: Queue.name, Value: # of messages sent to workers on that queue | ||
| 99 | # Includes REQUESTS in flight but not REQUESTS queued | ||
| 100 | self.processed_message_counts = {} | ||
| 101 | |||
| 102 | # Same as above but Key: Worker.uuid | ||
| 103 | self.processed_message_counts_by_worker = {} | ||
| 104 | |||
| 97 | #: Tracks the last time the scheduler queue was cleaned out of dead | 105 | #: Tracks the last time the scheduler queue was cleaned out of dead |
| 98 | #: schedulers | 106 | #: schedulers |
| 99 | self._meta['last_scheduler_cleanup'] = 0 | 107 | self._meta['last_scheduler_cleanup'] = 0 |
| @@ -118,8 +126,6 @@ class Router(HeartbeatMixin): | |||
| 118 | #: Excecuted function tracking dictionary | 126 | #: Excecuted function tracking dictionary |
| 119 | #: Key: msgid of msg each REQUEST received and forwarded to a worker | 127 | #: Key: msgid of msg each REQUEST received and forwarded to a worker |
| 120 | #: Value: (function_name, queue_name) | 128 | #: Value: (function_name, queue_name) |
| 121 | self.executed_functions = {} | ||
| 122 | |||
| 123 | #: Set to True when the router should die. | 129 | #: Set to True when the router should die. |
| 124 | self.received_disconnect = False | 130 | self.received_disconnect = False |
| 125 | 131 | ||
| @@ -171,6 +177,7 @@ class Router(HeartbeatMixin): | |||
| 171 | 177 | ||
| 172 | if events.get(self.incoming) == poller.POLLIN: | 178 | if events.get(self.incoming) == poller.POLLIN: |
| 173 | msg = self.incoming.recv_multipart() | 179 | msg = self.incoming.recv_multipart() |
| 180 | self.handle_wal_log(msg) | ||
| 174 | self.process_client_message(msg) | 181 | self.process_client_message(msg) |
| 175 | 182 | ||
| 176 | if events.get(self.outgoing) == poller.POLLIN: | 183 | if events.get(self.outgoing) == poller.POLLIN: |
| @@ -179,7 +186,8 @@ class Router(HeartbeatMixin): | |||
| 179 | 186 | ||
| 180 | if events.get(self.administrative_socket) == poller.POLLIN: | 187 | if events.get(self.administrative_socket) == poller.POLLIN: |
| 181 | msg = self.administrative_socket.recv_multipart() | 188 | msg = self.administrative_socket.recv_multipart() |
| 182 | logger.debug('ADMIN: {}'.format(msg)) | 189 | if conf.SUPER_DEBUG: |
| 190 | logger.debug('ADMIN: {}'.format(msg)) | ||
| 183 | # ############## | 191 | # ############## |
| 184 | # Admin Commands | 192 | # Admin Commands |
| 185 | # ############## | 193 | # ############## |
| @@ -331,8 +339,9 @@ class Router(HeartbeatMixin): | |||
| 331 | """ | 339 | """ |
| 332 | 340 | ||
| 333 | orig_msgid = msg[1] | 341 | orig_msgid = msg[1] |
| 334 | logger.info('Received REPLY from {} (msgid: {}, ACK msgid: {})'.format( | 342 | if conf.SUPER_DEBUG: |
| 335 | sender, msgid, orig_msgid)) | 343 | logger.debug('Received REPLY from {} (msgid: {}, ACK msgid: {})'. |
| 344 | format(sender, msgid, orig_msgid)) | ||
| 336 | 345 | ||
| 337 | if orig_msgid in self.job_latencies: | 346 | if orig_msgid in self.job_latencies: |
| 338 | elapsed_secs = (monotonic() | 347 | elapsed_secs = (monotonic() |
| @@ -484,16 +493,20 @@ class Router(HeartbeatMixin): | |||
| 484 | 493 | ||
| 485 | try: | 494 | try: |
| 486 | # Check if msg type is for executing function | 495 | # Check if msg type is for executing function |
| 487 | if 'run' in msg and len(msg) > 2: | ||
| 488 | args_list = json.loads(msg[2]) | ||
| 489 | args_dict = args_list[1] | ||
| 490 | function = args_dict.get('callable') | ||
| 491 | if function: | ||
| 492 | self.executed_functions[msgid] = (function, queue_name) | ||
| 493 | self.job_latencies[msgid] = (monotonic(), queue_name) | 496 | self.job_latencies[msgid] = (monotonic(), queue_name) |
| 494 | 497 | ||
| 495 | # Rebuild the message to be sent to the worker. fwdmsg will | 498 | # Rebuild the message to be sent to the worker. fwdmsg will |
| 496 | # properly address the message. | 499 | # properly address the message. |
| 500 | if queue_name not in self.processed_message_counts: | ||
| 501 | self.processed_message_counts[queue_name] = 1 | ||
| 502 | else: | ||
| 503 | self.processed_message_counts[queue_name] += 1 | ||
| 504 | |||
| 505 | if queue_name not in self.processed_message_counts_by_worker: | ||
| 506 | self.processed_message_counts_by_worker[worker_addr] = 1 | ||
| 507 | else: | ||
| 508 | self.processed_message_counts_by_worker[worker_addr] += 1 | ||
| 509 | |||
| 497 | fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, | 510 | fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, |
| 498 | 'REQUEST', msgid, ] + msg) | 511 | 'REQUEST', msgid, ] + msg) |
| 499 | 512 | ||
| @@ -694,6 +707,21 @@ class Router(HeartbeatMixin): | |||
| 694 | """ | 707 | """ |
| 695 | self.workers[worker_id]['available_slots'] += 1 | 708 | self.workers[worker_id]['available_slots'] += 1 |
| 696 | 709 | ||
| 710 | def handle_wal_log(self, original_msg): | ||
| 711 | |||
| 712 | try: | ||
| 713 | message = parse_router_message(original_msg) | ||
| 714 | except exceptions.InvalidMessageError: | ||
| 715 | logger.exception('Invalid message from clients: {}'.format( | ||
| 716 | str(original_msg))) | ||
| 717 | return | ||
| 718 | |||
| 719 | command = message[1] | ||
| 720 | |||
| 721 | if conf.WAL_ENABLED and \ | ||
| 722 | command in ("REQUEST", "SCHEDULE", "UNSCHEDULE"): | ||
| 723 | wal_logger.info(original_msg) | ||
| 724 | |||
| 697 | def process_client_message(self, original_msg, depth=0): | 725 | def process_client_message(self, original_msg, depth=0): |
| 698 | """ | 726 | """ |
| 699 | Args: | 727 | Args: |
| @@ -876,8 +904,10 @@ class Router(HeartbeatMixin): | |||
| 876 | (str) Serialized information about the current state of the router. | 904 | (str) Serialized information about the current state of the router. |
| 877 | """ | 905 | """ |
| 878 | return json.dumps({ | 906 | return json.dumps({ |
| 879 | 'job_latencies': self.job_latencies, | 907 | 'job_latencies_count': len(self.job_latencies), |
| 880 | 'executed_functions': self.executed_functions, | 908 | 'processed_messages': self.processed_message_counts, |
| 909 | 'processed_messages_by_worker': | ||
| 910 | self.processed_message_counts_by_worker, | ||
| 881 | 'waiting_message_counts': [ | 911 | 'waiting_message_counts': [ |
| 882 | '{}: {}'. | 912 | '{}: {}'. |
| 883 | format(q, | 913 | format(q, |
| @@ -915,6 +945,7 @@ class Router(HeartbeatMixin): | |||
| 915 | """ | 945 | """ |
| 916 | setup_logger('eventmq') | 946 | setup_logger('eventmq') |
| 917 | import_settings() | 947 | import_settings() |
| 948 | setup_wal_logger('eventmq-wal', conf.WAL) | ||
| 918 | self.start(frontend_addr=conf.FRONTEND_ADDR, | 949 | self.start(frontend_addr=conf.FRONTEND_ADDR, |
| 919 | backend_addr=conf.BACKEND_ADDR, | 950 | backend_addr=conf.BACKEND_ADDR, |
| 920 | administrative_addr=conf.ADMINISTRATIVE_ADDR) | 951 | administrative_addr=conf.ADMINISTRATIVE_ADDR) |
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py index fdb02d0..e9efa25 100644 --- a/eventmq/tests/test_router.py +++ b/eventmq/tests/test_router.py | |||
| @@ -776,8 +776,9 @@ class TestCase(unittest.TestCase): | |||
| 776 | # correctly and what not. | 776 | # correctly and what not. |
| 777 | self.assertEqual( | 777 | self.assertEqual( |
| 778 | json.loads(json.dumps({ | 778 | json.loads(json.dumps({ |
| 779 | 'job_latencies': self.router.job_latencies, | 779 | 'job_latencies_count': len(self.router.job_latencies), |
| 780 | 'executed_functions': self.router.executed_functions, | 780 | 'processed_messages': {}, |
| 781 | 'processed_messages_by_worker': {}, | ||
| 781 | 'waiting_message_counts': [ | 782 | 'waiting_message_counts': [ |
| 782 | '{}: {}'.format( | 783 | '{}: {}'.format( |
| 783 | q, | 784 | q, |
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index 39c600b..73c6364 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -345,10 +345,9 @@ class ZMQReceiveMixin(object): | |||
| 345 | Receive a message | 345 | Receive a message |
| 346 | """ | 346 | """ |
| 347 | msg = self.zsocket.recv() | 347 | msg = self.zsocket.recv() |
| 348 | if conf.SUPER_DEBUG: | 348 | if not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ |
| 349 | if not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ | 349 | not conf.HIDE_HEARTBEAT_LOGS: |
| 350 | not conf.HIDE_HEARTBEAT_LOGS: | 350 | logger.debug('Received message: {}'.format(msg)) |
| 351 | logger.debug('Received message: {}'.format(msg)) | ||
| 352 | return msg | 351 | return msg |
| 353 | 352 | ||
| 354 | def recv_multipart(self): | 353 | def recv_multipart(self): |
| @@ -356,13 +355,12 @@ class ZMQReceiveMixin(object): | |||
| 356 | Receive a multipart message | 355 | Receive a multipart message |
| 357 | """ | 356 | """ |
| 358 | msg = self.zsocket.recv_multipart() | 357 | msg = self.zsocket.recv_multipart() |
| 359 | if conf.SUPER_DEBUG: | 358 | # If it's not at least 4 frames long then most likely it isn't an |
| 360 | # If it's not at least 4 frames long then most likely it isn't an | 359 | # eventmq message |
| 361 | # eventmq message | 360 | if len(msg) >= 4 and \ |
| 362 | if len(msg) >= 4 and \ | 361 | not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ |
| 363 | not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ | 362 | not conf.HIDE_HEARTBEAT_LOGS: |
| 364 | not conf.HIDE_HEARTBEAT_LOGS: | 363 | logger.debug('Received message: {}'.format(msg)) |
| 365 | logger.debug('Received message: {}'.format(msg)) | ||
| 366 | return msg | 364 | return msg |
| 367 | 365 | ||
| 368 | 366 | ||
| @@ -402,13 +400,12 @@ class ZMQSendMixin(object): | |||
| 402 | 400 | ||
| 403 | msg = encodify(headers + message) | 401 | msg = encodify(headers + message) |
| 404 | 402 | ||
| 405 | if conf.SUPER_DEBUG: | 403 | # If it's not at least 4 frames long then most likely it isn't an |
| 406 | # If it's not at least 4 frames long then most likely it isn't an | 404 | # eventmq message |
| 407 | # eventmq message | 405 | if len(msg) > 4 and \ |
| 408 | if len(msg) > 4 and \ | 406 | not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ |
| 409 | not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ | 407 | not conf.HIDE_HEARTBEAT_LOGS: |
| 410 | not conf.HIDE_HEARTBEAT_LOGS: | 408 | logger.debug('Sending message: %s' % str(msg)) |
| 411 | logger.debug('Sending message: %s' % str(msg)) | ||
| 412 | 409 | ||
| 413 | try: | 410 | try: |
| 414 | self.zsocket.send_multipart(msg, | 411 | self.zsocket.send_multipart(msg, |
diff --git a/eventmq/worker.py b/eventmq/worker.py index 859b831..1621578 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py | |||
| @@ -109,16 +109,19 @@ class MultiprocessWorker(Process): | |||
| 109 | timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT | 109 | timeout = payload.get("timeout") or conf.GLOBAL_TIMEOUT |
| 110 | msgid = payload.get('msgid', '') | 110 | msgid = payload.get('msgid', '') |
| 111 | callback = payload.get('callback', '') | 111 | callback = payload.get('callback', '') |
| 112 | logger.debug("Putting on thread queue msgid: {}".format( | 112 | |
| 113 | msgid)) | 113 | if conf.SUPER_DEBUG: |
| 114 | logger.debug("Putting on thread queue msgid: {}".format( | ||
| 115 | msgid)) | ||
| 114 | 116 | ||
| 115 | worker_queue.put(payload['params']) | 117 | worker_queue.put(payload['params']) |
| 116 | 118 | ||
| 117 | try: | 119 | try: |
| 118 | return_val = worker_result_queue.get(timeout=timeout) | 120 | return_val = worker_result_queue.get(timeout=timeout) |
| 119 | 121 | ||
| 120 | logger.debug("Got from result queue msgid: {}".format( | 122 | if conf.SUPER_DEBUG: |
| 121 | msgid)) | 123 | logger.debug("Got from result queue msgid: {}".format( |
| 124 | msgid)) | ||
| 122 | except Queue.Empty: | 125 | except Queue.Empty: |
| 123 | return_val = 'TimeoutError' | 126 | return_val = 'TimeoutError' |
| 124 | 127 | ||
| @@ -7,7 +7,7 @@ from setuptools import find_packages, setup | |||
| 7 | 7 | ||
| 8 | setup( | 8 | setup( |
| 9 | name='eventmq', | 9 | name='eventmq', |
| 10 | version='0.3.4.6', | 10 | version='0.3.4.9', |
| 11 | description='EventMQ job execution and messaging system based on ZeroMQ', | 11 | description='EventMQ job execution and messaging system based on ZeroMQ', |
| 12 | packages=find_packages(), | 12 | packages=find_packages(), |
| 13 | install_requires=['pyzmq==15.4.0', | 13 | install_requires=['pyzmq==15.4.0', |