From 962903afc7634d52444ec047a4ee59d9e19ec330 Mon Sep 17 00:00:00 2001 From: tpavelchak Date: Fri, 7 Feb 2020 15:43:26 +0200 Subject: Fix Python 2/3 compatibility issues - version bump 0.3.12 - upgrade `six, flake8` libs - fix `flake8` check issues - add import of `from __future__ import print_function` - fix params for `zsocket.setsockopt()` and `hashlib.sha1()` methods - use `range, map` functions from `six` - wrap result of `map, dict.keys(), dict.values()` functions in a list - fix `EMQdeque.__unicode__()` method for py2 support - fix using of exception messages - update integer division for support the same behavior in py3 as is in py2 - refactor using of `filter` function --- bin/logwatcher.py | 4 +++- eventmq/__init__.py | 2 +- eventmq/jobmanager.py | 6 +++--- eventmq/pub.py | 1 - eventmq/publisher.py | 4 +++- eventmq/receiver.py | 4 +++- eventmq/router.py | 12 +++++++----- eventmq/scheduler.py | 3 ++- eventmq/subscriber.py | 5 ++++- eventmq/switch.py | 2 +- eventmq/tests/test_router.py | 3 ++- eventmq/tests/test_utils.py | 1 + eventmq/utils/__init__.py | 1 + eventmq/utils/classes.py | 8 ++++---- eventmq/utils/encoding.py | 1 + eventmq/utils/functions.py | 4 +++- eventmq/utils/messages.py | 4 ++-- eventmq/utils/settings.py | 2 ++ eventmq/worker.py | 2 +- setup.py | 8 ++++---- 20 files changed, 48 insertions(+), 29 deletions(-) diff --git a/bin/logwatcher.py b/bin/logwatcher.py index 1abeb4a..d76b0a1 100644 --- a/bin/logwatcher.py +++ b/bin/logwatcher.py @@ -1,9 +1,11 @@ #!/usr/bin/env python +from __future__ import print_function + import zmq s = zmq.Context.instance().socket(zmq.SUB) -s.setsockopt(zmq.SUBSCRIBE, '') +s.setsockopt(zmq.SUBSCRIBE, b'') s.connect('tcp://127.0.0.1:33445') poller = zmq.Poller() diff --git a/eventmq/__init__.py b/eventmq/__init__.py index 77bbe4a..d9b9226 100644 --- a/eventmq/__init__.py +++ b/eventmq/__init__.py @@ -1,5 +1,5 @@ __author__ = 'EventMQ Contributors' -__version__ = '0.3.11' +__version__ = '0.3.12' PROTOCOL_VERSION = 'eMQP/1.0' diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index f4a928c..e15e49f 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py @@ -18,7 +18,6 @@ Ensures things about jobs and spawns the actual tasks """ from json import dumps as serializer, loads as deserializer - import logging from multiprocessing import Manager as MPManager import os @@ -26,6 +25,7 @@ import signal import sys import time +from six.moves import range import zmq from eventmq.log import setup_logger @@ -142,7 +142,7 @@ class JobManager(HeartbeatMixin, EMQPService): w.start() self._workers[w.pid] = w - return self._workers.values() + return list(self._workers.values()) def _start_event_loop(self): """ @@ -333,7 +333,7 @@ class JobManager(HeartbeatMixin, EMQPService): Worker died of natural causes, ensure its death and remove from tracking, will be replaced on next heartbeat """ - if pid in self._workers.keys(): + if pid in list(self._workers.keys()): del self._workers[pid] def worker_ready(self, reply, msgid, death, pid): diff --git a/eventmq/pub.py b/eventmq/pub.py index 3918580..8575278 100644 --- a/eventmq/pub.py +++ b/eventmq/pub.py @@ -20,7 +20,6 @@ Publishes messages to subscribers import logging from eventmq.log import setup_logger - from . import conf, poller, publisher, receiver from .constants import STATUS from .utils.classes import HeartbeatMixin diff --git a/eventmq/publisher.py b/eventmq/publisher.py index 2e77bcd..cf29a2e 100644 --- a/eventmq/publisher.py +++ b/eventmq/publisher.py @@ -19,6 +19,7 @@ Publishes messages to subscribers """ import logging +import six import zmq from . import constants @@ -36,7 +37,8 @@ class Publisher(): def __init__(self, *args, **kwargs): self.zcontext = kwargs.get('context', zmq.Context.instance()) - self.name = kwargs.get('name', generate_device_name()) + self.name = six.ensure_binary(kwargs.get('name', + generate_device_name())) self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.PUB)) self.zsocket.setsockopt(zmq.IDENTITY, self.name) diff --git a/eventmq/receiver.py b/eventmq/receiver.py index dc0ead8..b901405 100644 --- a/eventmq/receiver.py +++ b/eventmq/receiver.py @@ -19,6 +19,7 @@ The receiver is responsible for receiveing messages """ import logging +import six import zmq from . import conf, constants @@ -59,7 +60,8 @@ class Receiver(ZMQReceiveMixin, ZMQSendMixin): self.zcontext = kwargs.get('context', zmq.Context.instance()) self.zcontext.set(zmq.MAX_SOCKETS, conf.MAX_SOCKETS) - self.name = kwargs.get('name', generate_device_name()) + self.name = six.ensure_binary(kwargs.get('name', + generate_device_name())) self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.ROUTER)) self.zsocket.setsockopt(zmq.IDENTITY, self.name) diff --git a/eventmq/router.py b/eventmq/router.py index 81ca257..3c141c3 100644 --- a/eventmq/router.py +++ b/eventmq/router.py @@ -22,6 +22,8 @@ import json # deserialize queues in on_inform. should be refactored import logging import signal +from six.moves import map + from eventmq.log import setup_logger, setup_wal_logger from . import __version__ from . import conf, constants, exceptions, poller, receiver @@ -407,7 +409,7 @@ class Router(HeartbeatMixin): # Assumes the highest priority queue comes first for queue in queue_names: queue_name = queue[1] - if queue_name in self.waiting_messages.keys(): + if queue_name in list(self.waiting_messages.keys()): logger.debug('Found waiting message in the %s waiting_messages' ' queue' % queue_name) msg = self.waiting_messages[queue_name].peekleft() @@ -476,7 +478,7 @@ class Router(HeartbeatMixin): total_mem = psutil.virtual_memory().total # Set queue limit to be 75% of total memory with ~100 byte # messages - limit = int((total_mem / 100) * 0.75) + limit = int(int(total_mem / 100) * 0.75) self.waiting_messages[queue_name] = EMQdeque( full=limit, on_full=router_on_full) else: @@ -866,7 +868,7 @@ class Router(HeartbeatMixin): for queue in worker['queues']: name = queue[1] workers = self.queues[name] - revised_list = filter(lambda x: x[1] != worker_id, workers) + revised_list = [x for x in workers if x[1] != worker_id] self.queues[name] = revised_list logger.debug('Removed worker - {} from {}'.format(worker_id, name)) @@ -879,8 +881,8 @@ class Router(HeartbeatMixin): """ self.schedulers.pop(scheduler_id) schedulers_to_remove = self.scheduler_queue - self.scheduler_queue = filter(lambda x: x != scheduler_id, - schedulers_to_remove) + self.scheduler_queue = \ + [x for x in schedulers_to_remove if x != scheduler_id] logger.debug('Removed scheduler - {} from known schedulers'.format( scheduler_id)) diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index e7c13ec..afdb116 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py @@ -17,6 +17,8 @@ ============================= Handles cron and other scheduled tasks """ +from __future__ import print_function + from hashlib import sha1 as emq_hash import importlib import json @@ -29,7 +31,6 @@ from croniter import croniter from six import iteritems, next from eventmq.log import setup_logger - from . import __version__ from . import conf, constants from .client.messages import send_request diff --git a/eventmq/subscriber.py b/eventmq/subscriber.py index ef1aa0b..5aef82f 100644 --- a/eventmq/subscriber.py +++ b/eventmq/subscriber.py @@ -1,7 +1,10 @@ """ derp subscriber """ +from __future__ import print_function + from past.builtins import xrange +import six import zmq @@ -11,7 +14,7 @@ if __name__ == "__main__": ctx = zmq.Context() s = ctx.socket(zmq.SUB) s.linger = 0 - s.setsockopt(zmq.SUBSCRIBE, str(i)) + s.setsockopt(zmq.SUBSCRIBE, six.ensure_binary(str(i))) s.connect('tcp://127.0.0.1:47299') sockets.append(s) diff --git a/eventmq/switch.py b/eventmq/switch.py index f6e94b4..5fe6861 100644 --- a/eventmq/switch.py +++ b/eventmq/switch.py @@ -5,5 +5,5 @@ if __name__ == "__main__": # switch.listen('tcp://127.0.0.1:47331', 'tcp://127.0.0.1:47330') try: switch.start() - except: + except Exception: switch.stop() diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py index cf7c9ef..0a315e3 100644 --- a/eventmq/tests/test_router.py +++ b/eventmq/tests/test_router.py @@ -18,6 +18,7 @@ import unittest from freezegun import freeze_time import mock +from six.moves import range from testfixtures import LogCapture import zmq @@ -318,7 +319,7 @@ class TestCase(unittest.TestCase): # There should be no keys because the code checks for their existence # to know if there is a waiting message - self.assertEqual(0, len(self.router.waiting_messages.keys())) + self.assertEqual(0, len(list(self.router.waiting_messages.keys()))) # No waiting messages self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py index 5173832..e71f2e3 100644 --- a/eventmq/tests/test_utils.py +++ b/eventmq/tests/test_utils.py @@ -30,6 +30,7 @@ import sys import unittest import mock +from six.moves import range from .. import conf from .. import constants diff --git a/eventmq/utils/__init__.py b/eventmq/utils/__init__.py index dad8299..1f65d65 100644 --- a/eventmq/utils/__init__.py +++ b/eventmq/utils/__init__.py @@ -27,6 +27,7 @@ like creating message more simple. settings timeutils """ +from six.moves import map def random_characters(): diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index 5788e94..14409ea 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py @@ -422,7 +422,7 @@ class ZMQSendMixin(object): self.zsocket.send_multipart(msg, flags=zmq.NOBLOCK) except zmq.error.ZMQError as e: - if 'No route' in e.message: + if 'No route' in str(e): raise exceptions.PeerGoneAwayError(e) def send(self, message, protocol_version): @@ -469,7 +469,7 @@ class EMQdeque(object): return "{}".format(str(self._queue)) def __unicode__(self): - return "{}".format(six.u(self._queue)) + return "{}".format(six.text_type(self._queue)) def __repr__(self): return "{}".format(repr(self._queue)) @@ -519,7 +519,7 @@ class EMQdeque(object): bool: True if the deque contains at least ``full`` items. False otherwise """ - if self.full and self.full is not 0: + if self.full and self.full != 0: return len(self._queue) >= self.full else: return False @@ -541,7 +541,7 @@ class EMQdeque(object): bool: True if the deque contains at least ``pfull`` items. False otherwise """ - if self.pfull and self.pfull is not 0: + if self.pfull and self.pfull != 0: return len(self._queue) >= self.pfull else: return False diff --git a/eventmq/utils/encoding.py b/eventmq/utils/encoding.py index d45624e..333ff48 100644 --- a/eventmq/utils/encoding.py +++ b/eventmq/utils/encoding.py @@ -13,6 +13,7 @@ # You should have received a copy of the GNU Lesser General Public License # along with eventmq. If not, see . from past.builtins import basestring +from six.moves import range from .. import conf diff --git a/eventmq/utils/functions.py b/eventmq/utils/functions.py index f47526b..556107d 100644 --- a/eventmq/utils/functions.py +++ b/eventmq/utils/functions.py @@ -4,6 +4,8 @@ import inspect import json import sys +import six + from .. import log from ..exceptions import CallableFromPathError @@ -70,7 +72,7 @@ def arguments_hash(*args, **kwargs): } data = json.dumps(args, cls=IgnoreJSONEncoder) - return hashlib.sha1(data).hexdigest() + return hashlib.sha1(six.ensure_binary(data)).hexdigest() def name_from_callable(func): diff --git a/eventmq/utils/messages.py b/eventmq/utils/messages.py index 02ac129..d652c4c 100644 --- a/eventmq/utils/messages.py +++ b/eventmq/utils/messages.py @@ -170,8 +170,8 @@ def fwd_emqp_router_message(socket, recipient_id, payload): flags=zmq.NOBLOCK) except zmq.error.ZMQError as e: if e.errno in errnos: - e.message = e.message + " {}".format(recipient_id) - raise exceptions.PeerGoneAwayError(e) + e_message = str(e) + " {}".format(recipient_id) + raise exceptions.PeerGoneAwayError(e_message) else: raise exceptions.EventMQError("errno {}: {}".format(e.errno, str(e))) diff --git a/eventmq/utils/settings.py b/eventmq/utils/settings.py index ade76c2..700d983 100644 --- a/eventmq/utils/settings.py +++ b/eventmq/utils/settings.py @@ -28,6 +28,8 @@ import json import logging import os +from six.moves import map + from . import tuplify from .. import conf diff --git a/eventmq/worker.py b/eventmq/worker.py index 1f3a6fa..0b22bad 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py @@ -93,7 +93,7 @@ class MultiprocessWorker(Process): if os.getppid() != self.ppid: break continue - except Exception as e: + except Exception: break finally: # If I'm an orphan, die diff --git a/setup.py b/setup.py index b4adf1d..3bcbc4c 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ setup( packages=find_packages(), install_requires=[ 'pyzmq==18.1.0', - 'six==1.10.0', + 'six==1.14.0', 'monotonic==0.4', 'croniter==0.3.10', 'future==0.15.2', @@ -29,9 +29,9 @@ setup( extras_require={ 'docs': ['Sphinx==1.5.2', ], 'testing': [ - 'flake8==3.2.1', - 'flake8-import-order==0.11', - 'flake8-print==2.0.2', + 'flake8==3.7.8', + 'flake8-import-order==0.18.1', + 'flake8-print==3.1.0', 'coverage==4.0.3', 'testfixtures==4.7.0', 'freezegun==0.3.7', -- cgit v1.2.1