aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eventmq/jobmanager.py5
-rw-r--r--eventmq/publisher.py5
-rw-r--r--eventmq/router.py3
-rw-r--r--eventmq/scheduler.py6
-rw-r--r--eventmq/sender.py12
-rw-r--r--eventmq/settings.py4
-rw-r--r--eventmq/tests/test_jobmanager.py12
-rw-r--r--eventmq/tests/test_scheduler.py6
-rw-r--r--eventmq/tests/test_sender.py3
-rw-r--r--eventmq/tests/test_utils.py2
-rw-r--r--eventmq/utils/devices.py12
11 files changed, 42 insertions, 28 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 78a89e1..dbf930c 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -81,7 +81,8 @@ class JobManager(HeartbeatMixin, EMQPService):
81 81
82 #: Define the name of this JobManager instance. Useful to know when 82 #: Define the name of this JobManager instance. Useful to know when
83 #: referring to the logs. 83 #: referring to the logs.
84 self.name = conf.NAME or generate_device_name() 84 self.name = generate_device_name(conf.NAME)
85
85 logger.info('Initializing JobManager {}...'.format(self.name)) 86 logger.info('Initializing JobManager {}...'.format(self.name))
86 87
87 if not skip_signal: 88 if not skip_signal:
@@ -96,7 +97,7 @@ class JobManager(HeartbeatMixin, EMQPService):
96 #: then telling the router that it is READY. The reply will be the unit 97 #: then telling the router that it is READY. The reply will be the unit
97 #: of work. 98 #: of work.
98 # Despite the name, jobs are received on this socket 99 # Despite the name, jobs are received on this socket
99 self.frontend = Sender(name=self.name) 100 self.frontend = Sender(name=conf.NAME)
100 101
101 self.poller = Poller() 102 self.poller = Poller()
102 103
diff --git a/eventmq/publisher.py b/eventmq/publisher.py
index 0e8f9b3..7b5c1fb 100644
--- a/eventmq/publisher.py
+++ b/eventmq/publisher.py
@@ -29,14 +29,15 @@ logger = logging.getLogger(__name__)
29 29
30class Publisher(object): 30class Publisher(object):
31 """ 31 """
32 name (str): Name of this socket 32 name (str): Named prefix of this socket
33 zcontext (:class:`zmq.Context`): socket context 33 zcontext (:class:`zmq.Context`): socket context
34 zsocket (:class:`zmq.Socket`): 34 zsocket (:class:`zmq.Socket`):
35 """ 35 """
36 36
37 def __init__(self, *args, **kwargs): 37 def __init__(self, *args, **kwargs):
38 self.zcontext = kwargs.get('context', zmq.Context.instance()) 38 self.zcontext = kwargs.get('context', zmq.Context.instance())
39 self.name = kwargs.get('name', generate_device_name()) 39
40 self.name = generate_device_name(kwargs.pop('name', None))
40 41
41 self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.PUB)) 42 self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.PUB))
42 self.zsocket.setsockopt(zmq.IDENTITY, self.name) 43 self.zsocket.setsockopt(zmq.IDENTITY, self.name)
diff --git a/eventmq/router.py b/eventmq/router.py
index bf9b392..89f92e8 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -65,7 +65,8 @@ class Router(HeartbeatMixin):
65 65
66 super(Router, self).__init__(*args, **kwargs) # Creates _meta 66 super(Router, self).__init__(*args, **kwargs) # Creates _meta
67 67
68 self.name = conf.NAME or generate_device_name() 68 self.name = generate_device_name(conf.NAME)
69
69 logger.info('Initializing Router %s...' % self.name) 70 logger.info('Initializing Router %s...' % self.name)
70 71
71 self.poller = poller.Poller() 72 self.poller = poller.Poller()
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index e04add6..db72d82 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -77,8 +77,10 @@ class Scheduler(HeartbeatMixin, EMQPService):
77 logger.info('Initializing Scheduler...') 77 logger.info('Initializing Scheduler...')
78 78
79 super(Scheduler, self).__init__(*args, **kwargs) 79 super(Scheduler, self).__init__(*args, **kwargs)
80 self.name = conf.NAME or generate_device_name() 80
81 self.frontend = Sender() 81 self.name = generate_device_name(conf.NAME)
82
83 self.frontend = Sender(conf.NAME)
82 self._redis_server = None 84 self._redis_server = None
83 85
84 admin_addr = conf.SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR 86 admin_addr = conf.SCHEDULER_ADMINISTRATIVE_LISTEN_ADDR
diff --git a/eventmq/sender.py b/eventmq/sender.py
index f0cc09c..4396e7f 100644
--- a/eventmq/sender.py
+++ b/eventmq/sender.py
@@ -18,13 +18,12 @@
18The sender is responsible for sending messages 18The sender is responsible for sending messages
19""" 19"""
20import logging 20import logging
21import sys
22import uuid
23 21
24import zmq 22import zmq
25 23
26from . import constants, exceptions 24from . import constants, exceptions
27from .utils.classes import ZMQReceiveMixin, ZMQSendMixin 25from .utils.classes import ZMQReceiveMixin, ZMQSendMixin
26from .utils.devices import generate_device_name
28 27
29logger = logging.getLogger(__name__) 28logger = logging.getLogger(__name__)
30 29
@@ -61,7 +60,8 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin):
61 # rebuilding it later. 60 # rebuilding it later.
62 self.zsocket = None 61 self.zsocket = None
63 62
64 self.name = kwargs.pop('name', str(uuid.uuid4())) 63 # Immutable name that was specified at construction time
64 self.conf_name = kwargs.pop('name', None)
65 65
66 self.rebuild(*args, **kwargs) 66 self.rebuild(*args, **kwargs)
67 67
@@ -138,10 +138,8 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin):
138 self.zsocket.close() 138 self.zsocket.close()
139 139
140 self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) 140 self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER))
141 if sys.version[0] == '2': 141 self.name = generate_device_name(self.conf_name)
142 self.zsocket.setsockopt(zmq.IDENTITY, self.name) 142 self.zsocket.setsockopt(zmq.IDENTITY, self.name)
143 else:
144 self.zsocket.setsockopt_string(zmq.IDENTITY, str(self.name))
145 143
146 self.status = constants.STATUS.ready 144 self.status = constants.STATUS.ready
147 145
diff --git a/eventmq/settings.py b/eventmq/settings.py
index 0d225b5..826b3a1 100644
--- a/eventmq/settings.py
+++ b/eventmq/settings.py
@@ -132,8 +132,8 @@ _CONFIG_DEFS = {
132 'long-arg': '--name', 132 'long-arg': '--name',
133 'short-arg': '-n', 133 'short-arg': '-n',
134 'type': str, 134 'type': str,
135 'help': "A unique ame to give this node. If one isn't provided a " 135 'help': "A unique name to give this node. If one isn't provided a "
136 "random uuid will be generated", 136 "random uuid will be generated in its place",
137 }, 137 },
138 }, 138 },
139 'router': { 139 'router': {
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index 4da2a6e..a19dfd7 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -23,20 +23,26 @@ ADDR = 'inproc://pour_the_rice_in_the_thing'
23 23
24 24
25class TestCase(unittest.TestCase): 25class TestCase(unittest.TestCase):
26 def test__setup(self): 26 @mock.patch('uuid.uuid4')
27 def test__setup(self, name_mock):
28 name_mock.return_value = 'some_uuid'
27 override_settings = { 29 override_settings = {
28 'NAME': 'RuckusBringer' 30 'NAME': 'RuckusBringer'
29 } 31 }
30 jm = jobmanager.JobManager(override_settings=override_settings) 32 jm = jobmanager.JobManager(override_settings=override_settings)
31 self.assertEqual(jm.name, 'RuckusBringer') 33 self.assertEqual(jm.name.decode('ascii'), 'RuckusBringer:some_uuid')
32 34
33 self.assertFalse(jm.awaiting_startup_ack) 35 self.assertFalse(jm.awaiting_startup_ack)
34 self.assertEqual(jm.status, constants.STATUS.ready) 36 self.assertEqual(jm.status, constants.STATUS.ready)
35 37
36# EMQP Tests 38# EMQP Tests
37 def test_reset(self): 39 @mock.patch('uuid.uuid4')
40 def test_reset(self, name_mock):
41 name_mock.return_value = 'some_uuid'
38 jm = jobmanager.JobManager() 42 jm = jobmanager.JobManager()
39 43
44 self.assertEqual(jm.name.decode('ascii'), 'some_uuid')
45
40 self.assertFalse(jm.awaiting_startup_ack) 46 self.assertFalse(jm.awaiting_startup_ack)
41 self.assertEqual(jm.status, constants.STATUS.ready) 47 self.assertEqual(jm.status, constants.STATUS.ready)
42 48
diff --git a/eventmq/tests/test_scheduler.py b/eventmq/tests/test_scheduler.py
index cf5733a..82b459d 100644
--- a/eventmq/tests/test_scheduler.py
+++ b/eventmq/tests/test_scheduler.py
@@ -22,12 +22,14 @@ from .. import constants, scheduler, utils
22 22
23 23
24class TestCase(unittest.TestCase): 24class TestCase(unittest.TestCase):
25 def test__setup(self): 25 @mock.patch('uuid.uuid4')
26 def test__setup(self, name_mock):
27 name_mock.return_value = 'some_uuid'
26 override_settings = { 28 override_settings = {
27 'NAME': 'RuckasBringer' 29 'NAME': 'RuckasBringer'
28 } 30 }
29 sched = scheduler.Scheduler(override_settings=override_settings) 31 sched = scheduler.Scheduler(override_settings=override_settings)
30 self.assertEqual(sched.name, 'RuckasBringer') 32 self.assertEqual(sched.name.decode('ascii'), 'RuckasBringer:some_uuid')
31 33
32 self.assertFalse(sched.awaiting_startup_ack) 34 self.assertFalse(sched.awaiting_startup_ack)
33 self.assertEqual(sched.status, constants.STATUS.ready) 35 self.assertEqual(sched.status, constants.STATUS.ready)
diff --git a/eventmq/tests/test_sender.py b/eventmq/tests/test_sender.py
index f292696..f1a9984 100644
--- a/eventmq/tests/test_sender.py
+++ b/eventmq/tests/test_sender.py
@@ -35,7 +35,8 @@ class TestCase(unittest.TestCase):
35 35
36 self.assert_(socket.poll() != 0) 36 self.assert_(socket.poll() != 0)
37 msg = socket.recv_multipart() 37 msg = socket.recv_multipart()
38 self.assertEqual(msg[0].decode('ascii'), self.sender.name) 38 self.assertEqual(msg[0].decode('ascii'),
39 self.sender.name.decode('ascii'))
39 self.assertEqual(msg[1], b'') 40 self.assertEqual(msg[1], b'')
40 self.assertEqual(msg[2], b'1') 41 self.assertEqual(msg[2], b'1')
41 self.assertEqual(msg[3], b'Hello!') 42 self.assertEqual(msg[3], b'Hello!')
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py
index 96d8c0f..45eba50 100644
--- a/eventmq/tests/test_utils.py
+++ b/eventmq/tests/test_utils.py
@@ -142,7 +142,7 @@ class TestCase(unittest.TestCase):
142 142
143 def test_emqDeque(self): 143 def test_emqDeque(self):
144 144
145 full = random.randint(1, 100) 145 full = random.randint(2, 100)
146 pfull = random.randint(1, full-1) 146 pfull = random.randint(1, full-1)
147 147
148 q = classes.EMQdeque(full=full, 148 q = classes.EMQdeque(full=full,
diff --git a/eventmq/utils/devices.py b/eventmq/utils/devices.py
index db7a655..7fbb028 100644
--- a/eventmq/utils/devices.py
+++ b/eventmq/utils/devices.py
@@ -26,11 +26,13 @@ def generate_device_name(prefix=None):
26 Args: 26 Args:
27 prefix (str): Prefix the id with this string. 27 prefix (str): Prefix the id with this string.
28 28
29 Returns (str) An ascii encoded string that can be used as an IDENTITY for a 29 Returns (str/bytes) An ascii encoded string that can be used as an IDENTITY
30 ZMQ socket. 30 for a ZMQ socket. In python3 this returns a `bytes`, python2 a 'str
31 """ 31 """
32 import uuid 32 import uuid
33 ret = str(uuid.uuid4()).encode('ascii') 33 ret = str(uuid.uuid4())
34
34 if prefix: 35 if prefix:
35 ret = prefix + ret 36 ret = "{}:{}".format(prefix, ret)
36 return ret 37
38 return ret.encode('ascii')