diff options
| author | David Hurst | 2017-05-12 12:14:55 -0600 |
|---|---|---|
| committer | David Hurst | 2017-05-12 12:14:55 -0600 |
| commit | 874793e39ffe8ea0846ca686ca7524f2336e748f (patch) | |
| tree | ede71736de51d108ef3739f48dcdd3a4aed862d2 | |
| parent | c5dbd3b914bfefd343bbff1eb95d0d6accac1499 (diff) | |
| parent | b4190739f651401b28324cb14cd1a92cceb4128b (diff) | |
| download | eventmq-874793e39ffe8ea0846ca686ca7524f2336e748f.tar.gz eventmq-874793e39ffe8ea0846ca686ca7524f2336e748f.zip | |
Merge 0.4 into feature/scheduler_status
| -rw-r--r-- | eventmq/jobmanager.py | 5 | ||||
| -rw-r--r-- | eventmq/publisher.py | 5 | ||||
| -rw-r--r-- | eventmq/router.py | 3 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 6 | ||||
| -rw-r--r-- | eventmq/sender.py | 12 | ||||
| -rw-r--r-- | eventmq/settings.py | 4 | ||||
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 12 | ||||
| -rw-r--r-- | eventmq/tests/test_scheduler.py | 6 | ||||
| -rw-r--r-- | eventmq/tests/test_sender.py | 3 | ||||
| -rw-r--r-- | eventmq/tests/test_utils.py | 2 | ||||
| -rw-r--r-- | eventmq/utils/devices.py | 12 |
11 files changed, 42 insertions, 28 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 78a89e1..dbf930c 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -81,7 +81,8 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 81 | 81 | ||
| 82 | #: Define the name of this JobManager instance. Useful to know when | 82 | #: Define the name of this JobManager instance. Useful to know when |
| 83 | #: referring to the logs. | 83 | #: referring to the logs. |
| 84 | self.name = conf.NAME or generate_device_name() | 84 | self.name = generate_device_name(conf.NAME) |
| 85 | |||
| 85 | logger.info('Initializing JobManager {}...'.format(self.name)) | 86 | logger.info('Initializing JobManager {}...'.format(self.name)) |
| 86 | 87 | ||
| 87 | if not skip_signal: | 88 | if not skip_signal: |
| @@ -96,7 +97,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 96 | #: then telling the router that it is READY. The reply will be the unit | 97 | #: then telling the router that it is READY. The reply will be the unit |
| 97 | #: of work. | 98 | #: of work. |
| 98 | # Despite the name, jobs are received on this socket | 99 | # Despite the name, jobs are received on this socket |
| 99 | self.frontend = Sender(name=self.name) | 100 | self.frontend = Sender(name=conf.NAME) |
| 100 | 101 | ||
| 101 | self.poller = Poller() | 102 | self.poller = Poller() |
| 102 | 103 | ||
diff --git a/eventmq/publisher.py b/eventmq/publisher.py index 0e8f9b3..7b5c1fb 100644 --- a/eventmq/publisher.py +++ b/eventmq/publisher.py | |||
| @@ -29,14 +29,15 @@ logger = logging.getLogger(__name__) | |||
| 29 | 29 | ||
| 30 | class Publisher(object): | 30 | class Publisher(object): |
| 31 | """ | 31 | """ |
| 32 | name (str): Name of this socket | 32 | name (str): Named prefix of this socket |
| 33 | zcontext (:class:`zmq.Context`): socket context | 33 | zcontext (:class:`zmq.Context`): socket context |
| 34 | zsocket (:class:`zmq.Socket`): | 34 | zsocket (:class:`zmq.Socket`): |
| 35 | """ | 35 | """ |
| 36 | 36 | ||
| 37 | def __init__(self, *args, **kwargs): | 37 | def __init__(self, *args, **kwargs): |
| 38 | self.zcontext = kwargs.get('context', zmq.Context.instance()) | 38 | self.zcontext = kwargs.get('context', zmq.Context.instance()) |
| 39 | self.name = kwargs.get('name', generate_device_name()) | 39 | |
| 40 | self.name = generate_device_name(kwargs.pop('name', None)) | ||
| 40 | 41 | ||
| 41 | self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.PUB)) | 42 | self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.PUB)) |
| 42 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) | 43 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) |
diff --git a/eventmq/router.py b/eventmq/router.py index bf9b392..89f92e8 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -65,7 +65,8 @@ class Router(HeartbeatMixin): | |||
| 65 | 65 | ||
| 66 | super(Router, self).__init__(*args, **kwargs) # Creates _meta | 66 | super(Router, self).__init__(*args, **kwargs) # Creates _meta |
| 67 | 67 | ||
| 68 | self.name = conf.NAME or generate_device_name() | 68 | self.name = generate_device_name(conf.NAME) |
| 69 | |||
| 69 | logger.info('Initializing Router %s...' % self.name) | 70 | logger.info('Initializing Router %s...' % self.name) |
| 70 | 71 | ||
| 71 | self.poller = poller.Poller() | 72 | self.poller = poller.Poller() |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index e04add6..db72d82 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -77,8 +77,10 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 77 | logger.info('Initializing Scheduler...') | 77 | logger.info('Initializing Scheduler...') |
| 78 | 78 | ||
| 79 | super(Scheduler, self).__init__(*args, **kwargs) | 79 | super(Scheduler, self).__init__(*args, **kwargs) |
| 80 | self.name = conf.NAME or generate_device_name() | 80 | |
| 81 | self.frontend = Sender() | 81 | self.name = generate_device_name(conf.NAME) |
| 82 | |||
| 83 | self.frontend = Sender(conf.NAME) | ||
| 82 | self._redis_server = None | 84 | self._redis_server = None |
| 83 | 85 | ||
| 84 | admin_addr = conf.SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR | 86 | admin_addr = conf.SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR |
diff --git a/eventmq/sender.py b/eventmq/sender.py index f0cc09c..4396e7f 100644 --- a/eventmq/sender.py +++ b/eventmq/sender.py | |||
| @@ -18,13 +18,12 @@ | |||
| 18 | The sender is responsible for sending messages | 18 | The sender is responsible for sending messages |
| 19 | """ | 19 | """ |
| 20 | import logging | 20 | import logging |
| 21 | import sys | ||
| 22 | import uuid | ||
| 23 | 21 | ||
| 24 | import zmq | 22 | import zmq |
| 25 | 23 | ||
| 26 | from . import constants, exceptions | 24 | from . import constants, exceptions |
| 27 | from .utils.classes import ZMQReceiveMixin, ZMQSendMixin | 25 | from .utils.classes import ZMQReceiveMixin, ZMQSendMixin |
| 26 | from .utils.devices import generate_device_name | ||
| 28 | 27 | ||
| 29 | logger = logging.getLogger(__name__) | 28 | logger = logging.getLogger(__name__) |
| 30 | 29 | ||
| @@ -61,7 +60,8 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin): | |||
| 61 | # rebuilding it later. | 60 | # rebuilding it later. |
| 62 | self.zsocket = None | 61 | self.zsocket = None |
| 63 | 62 | ||
| 64 | self.name = kwargs.pop('name', str(uuid.uuid4())) | 63 | # Immutable name that was specified at construction time |
| 64 | self.conf_name = kwargs.pop('name', None) | ||
| 65 | 65 | ||
| 66 | self.rebuild(*args, **kwargs) | 66 | self.rebuild(*args, **kwargs) |
| 67 | 67 | ||
| @@ -138,10 +138,8 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin): | |||
| 138 | self.zsocket.close() | 138 | self.zsocket.close() |
| 139 | 139 | ||
| 140 | self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) | 140 | self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) |
| 141 | if sys.version[0] == '2': | 141 | self.name = generate_device_name(self.conf_name) |
| 142 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) | 142 | self.zsocket.setsockopt(zmq.IDENTITY, self.name) |
| 143 | else: | ||
| 144 | self.zsocket.setsockopt_string(zmq.IDENTITY, str(self.name)) | ||
| 145 | 143 | ||
| 146 | self.status = constants.STATUS.ready | 144 | self.status = constants.STATUS.ready |
| 147 | 145 | ||
diff --git a/eventmq/settings.py b/eventmq/settings.py index 0d225b5..826b3a1 100644 --- a/eventmq/settings.py +++ b/eventmq/settings.py | |||
| @@ -132,8 +132,8 @@ _CONFIG_DEFS = { | |||
| 132 | 'long-arg': '--name', | 132 | 'long-arg': '--name', |
| 133 | 'short-arg': '-n', | 133 | 'short-arg': '-n', |
| 134 | 'type': str, | 134 | 'type': str, |
| 135 | 'help': "A unique ame to give this node. If one isn't provided a " | 135 | 'help': "A unique name to give this node. If one isn't provided a " |
| 136 | "random uuid will be generated", | 136 | "random uuid will be generated in its place", |
| 137 | }, | 137 | }, |
| 138 | }, | 138 | }, |
| 139 | 'router': { | 139 | 'router': { |
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py index 4da2a6e..a19dfd7 100644 --- a/eventmq/tests/test_jobmanager.py +++ b/eventmq/tests/test_jobmanager.py | |||
| @@ -23,20 +23,26 @@ ADDR = 'inproc://pour_the_rice_in_the_thing' | |||
| 23 | 23 | ||
| 24 | 24 | ||
| 25 | class TestCase(unittest.TestCase): | 25 | class TestCase(unittest.TestCase): |
| 26 | def test__setup(self): | 26 | @mock.patch('uuid.uuid4') |
| 27 | def test__setup(self, name_mock): | ||
| 28 | name_mock.return_value = 'some_uuid' | ||
| 27 | override_settings = { | 29 | override_settings = { |
| 28 | 'NAME': 'RuckusBringer' | 30 | 'NAME': 'RuckusBringer' |
| 29 | } | 31 | } |
| 30 | jm = jobmanager.JobManager(override_settings=override_settings) | 32 | jm = jobmanager.JobManager(override_settings=override_settings) |
| 31 | self.assertEqual(jm.name, 'RuckusBringer') | 33 | self.assertEqual(jm.name.decode('ascii'), 'RuckusBringer:some_uuid') |
| 32 | 34 | ||
| 33 | self.assertFalse(jm.awaiting_startup_ack) | 35 | self.assertFalse(jm.awaiting_startup_ack) |
| 34 | self.assertEqual(jm.status, constants.STATUS.ready) | 36 | self.assertEqual(jm.status, constants.STATUS.ready) |
| 35 | 37 | ||
| 36 | # EMQP Tests | 38 | # EMQP Tests |
| 37 | def test_reset(self): | 39 | @mock.patch('uuid.uuid4') |
| 40 | def test_reset(self, name_mock): | ||
| 41 | name_mock.return_value = 'some_uuid' | ||
| 38 | jm = jobmanager.JobManager() | 42 | jm = jobmanager.JobManager() |
| 39 | 43 | ||
| 44 | self.assertEqual(jm.name.decode('ascii'), 'some_uuid') | ||
| 45 | |||
| 40 | self.assertFalse(jm.awaiting_startup_ack) | 46 | self.assertFalse(jm.awaiting_startup_ack) |
| 41 | self.assertEqual(jm.status, constants.STATUS.ready) | 47 | self.assertEqual(jm.status, constants.STATUS.ready) |
| 42 | 48 | ||
diff --git a/eventmq/tests/test_scheduler.py b/eventmq/tests/test_scheduler.py index cf5733a..82b459d 100644 --- a/eventmq/tests/test_scheduler.py +++ b/eventmq/tests/test_scheduler.py | |||
| @@ -22,12 +22,14 @@ from .. import constants, scheduler, utils | |||
| 22 | 22 | ||
| 23 | 23 | ||
| 24 | class TestCase(unittest.TestCase): | 24 | class TestCase(unittest.TestCase): |
| 25 | def test__setup(self): | 25 | @mock.patch('uuid.uuid4') |
| 26 | def test__setup(self, name_mock): | ||
| 27 | name_mock.return_value = 'some_uuid' | ||
| 26 | override_settings = { | 28 | override_settings = { |
| 27 | 'NAME': 'RuckasBringer' | 29 | 'NAME': 'RuckasBringer' |
| 28 | } | 30 | } |
| 29 | sched = scheduler.Scheduler(override_settings=override_settings) | 31 | sched = scheduler.Scheduler(override_settings=override_settings) |
| 30 | self.assertEqual(sched.name, 'RuckasBringer') | 32 | self.assertEqual(sched.name.decode('ascii'), 'RuckasBringer:some_uuid') |
| 31 | 33 | ||
| 32 | self.assertFalse(sched.awaiting_startup_ack) | 34 | self.assertFalse(sched.awaiting_startup_ack) |
| 33 | self.assertEqual(sched.status, constants.STATUS.ready) | 35 | self.assertEqual(sched.status, constants.STATUS.ready) |
diff --git a/eventmq/tests/test_sender.py b/eventmq/tests/test_sender.py index f292696..f1a9984 100644 --- a/eventmq/tests/test_sender.py +++ b/eventmq/tests/test_sender.py | |||
| @@ -35,7 +35,8 @@ class TestCase(unittest.TestCase): | |||
| 35 | 35 | ||
| 36 | self.assert_(socket.poll() != 0) | 36 | self.assert_(socket.poll() != 0) |
| 37 | msg = socket.recv_multipart() | 37 | msg = socket.recv_multipart() |
| 38 | self.assertEqual(msg[0].decode('ascii'), self.sender.name) | 38 | self.assertEqual(msg[0].decode('ascii'), |
| 39 | self.sender.name.decode('ascii')) | ||
| 39 | self.assertEqual(msg[1], b'') | 40 | self.assertEqual(msg[1], b'') |
| 40 | self.assertEqual(msg[2], b'1') | 41 | self.assertEqual(msg[2], b'1') |
| 41 | self.assertEqual(msg[3], b'Hello!') | 42 | self.assertEqual(msg[3], b'Hello!') |
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py index 96d8c0f..45eba50 100644 --- a/eventmq/tests/test_utils.py +++ b/eventmq/tests/test_utils.py | |||
| @@ -142,7 +142,7 @@ class TestCase(unittest.TestCase): | |||
| 142 | 142 | ||
| 143 | def test_emqDeque(self): | 143 | def test_emqDeque(self): |
| 144 | 144 | ||
| 145 | full = random.randint(1, 100) | 145 | full = random.randint(2, 100) |
| 146 | pfull = random.randint(1, full-1) | 146 | pfull = random.randint(1, full-1) |
| 147 | 147 | ||
| 148 | q = classes.EMQdeque(full=full, | 148 | q = classes.EMQdeque(full=full, |
diff --git a/eventmq/utils/devices.py b/eventmq/utils/devices.py index db7a655..7fbb028 100644 --- a/eventmq/utils/devices.py +++ b/eventmq/utils/devices.py | |||
| @@ -26,11 +26,13 @@ def generate_device_name(prefix=None): | |||
| 26 | Args: | 26 | Args: |
| 27 | prefix (str): Prefix the id with this string. | 27 | prefix (str): Prefix the id with this string. |
| 28 | 28 | ||
| 29 | Returns (str) An ascii encoded string that can be used as an IDENTITY for a | 29 | Returns (str/bytes) An ascii encoded string that can be used as an IDENTITY |
| 30 | ZMQ socket. | 30 | for a ZMQ socket. In python3 this returns a `bytes`, python2 a 'str |
| 31 | """ | 31 | """ |
| 32 | import uuid | 32 | import uuid |
| 33 | ret = str(uuid.uuid4()).encode('ascii') | 33 | ret = str(uuid.uuid4()) |
| 34 | |||
| 34 | if prefix: | 35 | if prefix: |
| 35 | ret = prefix + ret | 36 | ret = "{}:{}".format(prefix, ret) |
| 36 | return ret | 37 | |
| 38 | return ret.encode('ascii') | ||