aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eventmq/router.py2
-rw-r--r--eventmq/utils/classes.py12
2 files changed, 12 insertions, 2 deletions
diff --git a/eventmq/router.py b/eventmq/router.py
index 7c221bc..c35e88c 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -502,7 +502,7 @@ class Router(HeartbeatMixin):
502 else: 502 else:
503 self.processed_message_counts[queue_name] += 1 503 self.processed_message_counts[queue_name] += 1
504 504
505 if queue_name not in self.processed_message_counts_by_worker: 505 if worker_addr not in self.processed_message_counts_by_worker:
506 self.processed_message_counts_by_worker[worker_addr] = 1 506 self.processed_message_counts_by_worker[worker_addr] = 1
507 else: 507 else:
508 self.processed_message_counts_by_worker[worker_addr] += 1 508 self.processed_message_counts_by_worker[worker_addr] += 1
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index 73c6364..1148da3 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -20,6 +20,7 @@ Defines some classes to use when implementing ZMQ devices
20from collections import deque 20from collections import deque
21import json 21import json
22import logging 22import logging
23import sys
23 24
24import zmq.error 25import zmq.error
25 26
@@ -344,7 +345,11 @@ class ZMQReceiveMixin(object):
344 """ 345 """
345 Receive a message 346 Receive a message
346 """ 347 """
347 msg = self.zsocket.recv() 348 if sys.version[0] == '2':
349 msg = self.zsocket.recv()
350 else:
351 msg = self.zsocket.recv_string()
352
348 if not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \ 353 if not ("HEARTBEAT" == msg[2] or "HEARTBEAT" == msg[3]) or \
349 not conf.HIDE_HEARTBEAT_LOGS: 354 not conf.HIDE_HEARTBEAT_LOGS:
350 logger.debug('Received message: {}'.format(msg)) 355 logger.debug('Received message: {}'.format(msg))
@@ -355,6 +360,11 @@ class ZMQReceiveMixin(object):
355 Receive a multipart message 360 Receive a multipart message
356 """ 361 """
357 msg = self.zsocket.recv_multipart() 362 msg = self.zsocket.recv_multipart()
363
364 # Decode bytes to strings in python3
365 if type(msg[0] in (bytes,)):
366 msg = [m.decode() for m in msg]
367
358 # If it's not at least 4 frames long then most likely it isn't an 368 # If it's not at least 4 frames long then most likely it isn't an
359 # eventmq message 369 # eventmq message
360 if len(msg) >= 4 and \ 370 if len(msg) >= 4 and \