diff options
| -rw-r--r-- | eventmq/router.py | 2 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 12 |
2 files changed, 12 insertions, 2 deletions
diff --git a/eventmq/router.py b/eventmq/router.py index 7c221bc..c35e88c 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -502,7 +502,7 @@ class Router(HeartbeatMixin): | |||
| 502 | else: | 502 | else: |
| 503 | self.processed_message_counts[queue_name] += 1 | 503 | self.processed_message_counts[queue_name] += 1 |
| 504 | 504 | ||
| 505 | if queue_name not in self.processed_message_counts_by_worker: | 505 | if worker_addr not in self.processed_message_counts_by_worker: |
| 506 | self.processed_message_counts_by_worker[worker_addr] = 1 | 506 | self.processed_message_counts_by_worker[worker_addr] = 1 |
| 507 | else: | 507 | else: |
| 508 | self.processed_message_counts_by_worker[worker_addr] += 1 | 508 | self.processed_message_counts_by_worker[worker_addr] += 1 |
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index 73c6364..1148da3 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -20,6 +20,7 @@ Defines some classes to use when implementing ZMQ devices | |||
| 20 | from collections import deque | 20 | from collections import deque |
| 21 | import json | 21 | import json |
| 22 | import logging | 22 | import logging |
| 23 | import sys | ||
| 23 | 24 | ||
| 24 | import zmq.error | 25 | import zmq.error |
| 25 | 26 | ||
| @@ -344,7 +345,11 @@ class ZMQReceiveMixin(object): | |||
| 344 | """ | 345 | """ |
| 345 | Receive a message | 346 | Receive a message |
| 346 | """ | 347 | """ |
| 347 | msg = self.zsocket.recv() | 348 | if sys.version[0] == '2': |
| 349 | msg = self.zsocket.recv() | ||
| 350 | else: | ||
| 351 | msg = self.zsocket.recv_string() | ||
| 352 | |||
| 348 | if not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ | 353 | if not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ |
| 349 | not conf.HIDE_HEARTBEAT_LOGS: | 354 | not conf.HIDE_HEARTBEAT_LOGS: |
| 350 | logger.debug('Received message: {}'.format(msg)) | 355 | logger.debug('Received message: {}'.format(msg)) |
| @@ -355,6 +360,11 @@ class ZMQReceiveMixin(object): | |||
| 355 | Receive a multipart message | 360 | Receive a multipart message |
| 356 | """ | 361 | """ |
| 357 | msg = self.zsocket.recv_multipart() | 362 | msg = self.zsocket.recv_multipart() |
| 363 | |||
| 364 | # Decode bytes to strings in python3 | ||
| 365 | if type(msg[0] in (bytes,)): | ||
| 366 | msg = [m.decode() for m in msg] | ||
| 367 | |||
| 358 | # If it's not at least 4 frames long then most likely it isn't an | 368 | # If it's not at least 4 frames long then most likely it isn't an |
| 359 | # eventmq message | 369 | # eventmq message |
| 360 | if len(msg) >= 4 and \ | 370 | if len(msg) >= 4 and \ |