aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Hurst2017-05-11 19:19:03 -0600
committerDavid Hurst2017-05-11 19:34:57 -0600
commit00d8b53931a6e106575f96da397b4c66afb8338e (patch)
tree78143e566838ad3eb1d5786bc69cb5d31f3b7cb5
parent73c9e66b3f8070838246c758d30a51a31a129313 (diff)
downloadeventmq-00d8b53931a6e106575f96da397b4c66afb8338e.tar.gz
eventmq-00d8b53931a6e106575f96da397b4c66afb8338e.zip
generate_device_name now does the work it should
-rw-r--r--eventmq/jobmanager.py7
-rw-r--r--eventmq/publisher.py10
-rw-r--r--eventmq/receiver.py2
-rw-r--r--eventmq/router.py5
-rw-r--r--eventmq/scheduler.py7
-rw-r--r--eventmq/sender.py9
-rw-r--r--eventmq/tests/test_jobmanager.py4
-rw-r--r--eventmq/tests/test_scheduler.py2
-rw-r--r--eventmq/tests/test_sender.py3
-rw-r--r--eventmq/tests/utils.py2
-rw-r--r--eventmq/utils/devices.py14
11 files changed, 28 insertions, 37 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 41f30d3..dbf930c 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -81,10 +81,7 @@ class JobManager(HeartbeatMixin, EMQPService):
81 81
82 #: Define the name of this JobManager instance. Useful to know when 82 #: Define the name of this JobManager instance. Useful to know when
83 #: referring to the logs. 83 #: referring to the logs.
84 if conf.NAME: 84 self.name = generate_device_name(conf.NAME)
85 self.name = "{}:{}".format(conf.NAME, generate_device_name())
86 else:
87 self.name = generate_device_name()
88 85
89 logger.info('Initializing JobManager {}...'.format(self.name)) 86 logger.info('Initializing JobManager {}...'.format(self.name))
90 87
@@ -100,7 +97,7 @@ class JobManager(HeartbeatMixin, EMQPService):
100 #: then telling the router that it is READY. The reply will be the unit 97 #: then telling the router that it is READY. The reply will be the unit
101 #: of work. 98 #: of work.
102 # Despite the name, jobs are received on this socket 99 # Despite the name, jobs are received on this socket
103 self.frontend = Sender(name=self.name) 100 self.frontend = Sender(name=conf.NAME)
104 101
105 self.poller = Poller() 102 self.poller = Poller()
106 103
diff --git a/eventmq/publisher.py b/eventmq/publisher.py
index aac7b79..7b5c1fb 100644
--- a/eventmq/publisher.py
+++ b/eventmq/publisher.py
@@ -23,14 +23,13 @@ import zmq
23 23
24from . import constants 24from . import constants
25from .utils.devices import generate_device_name 25from .utils.devices import generate_device_name
26from .settings import conf
27 26
28logger = logging.getLogger(__name__) 27logger = logging.getLogger(__name__)
29 28
30 29
31class Publisher(object): 30class Publisher(object):
32 """ 31 """
33 name (str): Name of this socket 32 name (str): Named prefix of this socket
34 zcontext (:class:`zmq.Context`): socket context 33 zcontext (:class:`zmq.Context`): socket context
35 zsocket (:class:`zmq.Socket`): 34 zsocket (:class:`zmq.Socket`):
36 """ 35 """
@@ -38,13 +37,10 @@ class Publisher(object):
38 def __init__(self, *args, **kwargs): 37 def __init__(self, *args, **kwargs):
39 self.zcontext = kwargs.get('context', zmq.Context.instance()) 38 self.zcontext = kwargs.get('context', zmq.Context.instance())
40 39
41 if conf.NAME: 40 self.name = generate_device_name(kwargs.pop('name', None))
42 self.name = "{}:{}".format(conf.NAME, generate_device_name())
43 else:
44 self.name = generate_device_name()
45 41
46 self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.PUB)) 42 self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.PUB))
47 self.zsocket.setsockopt_string(zmq.IDENTITY, self.name) 43 self.zsocket.setsockopt(zmq.IDENTITY, self.name)
48 44
49 self.status = constants.STATUS.ready 45 self.status = constants.STATUS.ready
50 46
diff --git a/eventmq/receiver.py b/eventmq/receiver.py
index 6c9060f..4450e2e 100644
--- a/eventmq/receiver.py
+++ b/eventmq/receiver.py
@@ -57,7 +57,7 @@ class Receiver(ZMQReceiveMixin, ZMQSendMixin):
57 self.name = kwargs.get('name', generate_device_name()) 57 self.name = kwargs.get('name', generate_device_name())
58 58
59 self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.ROUTER)) 59 self.zsocket = kwargs.get('socket', self.zcontext.socket(zmq.ROUTER))
60 self.zsocket.setsockopt_string(zmq.IDENTITY, self.name) 60 self.zsocket.setsockopt(zmq.IDENTITY, self.name)
61 self.zsocket.setsockopt(zmq.ROUTER_MANDATORY, 1) 61 self.zsocket.setsockopt(zmq.ROUTER_MANDATORY, 1)
62 62
63 self.status = constants.STATUS.ready 63 self.status = constants.STATUS.ready
diff --git a/eventmq/router.py b/eventmq/router.py
index 3911997..eceb080 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -64,10 +64,7 @@ class Router(HeartbeatMixin):
64 64
65 super(Router, self).__init__(*args, **kwargs) # Creates _meta 65 super(Router, self).__init__(*args, **kwargs) # Creates _meta
66 66
67 if conf.NAME: 67 self.name = generate_device_name(conf.NAME)
68 self.name = "{}:{}".format(conf.NAME, generate_device_name())
69 else:
70 self.name = generate_device_name()
71 68
72 logger.info('Initializing Router %s...' % self.name) 69 logger.info('Initializing Router %s...' % self.name)
73 70
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 686b29c..799ae20 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -75,12 +75,9 @@ class Scheduler(HeartbeatMixin, EMQPService):
75 75
76 super(Scheduler, self).__init__(*args, **kwargs) 76 super(Scheduler, self).__init__(*args, **kwargs)
77 77
78 if conf.NAME: 78 self.name = generate_device_name(conf.NAME)
79 self.name = "{}:{}".format(conf.NAME, generate_device_name())
80 else:
81 self.name = generate_device_name()
82 79
83 self.frontend = Sender() 80 self.frontend = Sender(conf.NAME)
84 self._redis_server = None 81 self._redis_server = None
85 82
86 # contains dict of 4-item lists representing cron jobs key of this 83 # contains dict of 4-item lists representing cron jobs key of this
diff --git a/eventmq/sender.py b/eventmq/sender.py
index 315e04e..4396e7f 100644
--- a/eventmq/sender.py
+++ b/eventmq/sender.py
@@ -18,13 +18,12 @@
18The sender is responsible for sending messages 18The sender is responsible for sending messages
19""" 19"""
20import logging 20import logging
21import sys
22import uuid
23 21
24import zmq 22import zmq
25 23
26from . import constants, exceptions 24from . import constants, exceptions
27from .utils.classes import ZMQReceiveMixin, ZMQSendMixin 25from .utils.classes import ZMQReceiveMixin, ZMQSendMixin
26from .utils.devices import generate_device_name
28 27
29logger = logging.getLogger(__name__) 28logger = logging.getLogger(__name__)
30 29
@@ -61,7 +60,8 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin):
61 # rebuilding it later. 60 # rebuilding it later.
62 self.zsocket = None 61 self.zsocket = None
63 62
64 self.name = kwargs.pop('name', str(uuid.uuid4())) 63 # Immutable name that was specified at construction time
64 self.conf_name = kwargs.pop('name', None)
65 65
66 self.rebuild(*args, **kwargs) 66 self.rebuild(*args, **kwargs)
67 67
@@ -138,7 +138,8 @@ class Sender(ZMQSendMixin, ZMQReceiveMixin):
138 self.zsocket.close() 138 self.zsocket.close()
139 139
140 self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER)) 140 self.zsocket = kwargs.pop('socket', self.zcontext.socket(zmq.DEALER))
141 self.zsocket.setsockopt_string(zmq.IDENTITY, str(self.name)) 141 self.name = generate_device_name(self.conf_name)
142 self.zsocket.setsockopt(zmq.IDENTITY, self.name)
142 143
143 self.status = constants.STATUS.ready 144 self.status = constants.STATUS.ready
144 145
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index 2f9218f..a19dfd7 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -30,7 +30,7 @@ class TestCase(unittest.TestCase):
30 'NAME': 'RuckusBringer' 30 'NAME': 'RuckusBringer'
31 } 31 }
32 jm = jobmanager.JobManager(override_settings=override_settings) 32 jm = jobmanager.JobManager(override_settings=override_settings)
33 self.assertEqual(jm.name, 'RuckusBringer:some_uuid') 33 self.assertEqual(jm.name.decode('ascii'), 'RuckusBringer:some_uuid')
34 34
35 self.assertFalse(jm.awaiting_startup_ack) 35 self.assertFalse(jm.awaiting_startup_ack)
36 self.assertEqual(jm.status, constants.STATUS.ready) 36 self.assertEqual(jm.status, constants.STATUS.ready)
@@ -41,7 +41,7 @@ class TestCase(unittest.TestCase):
41 name_mock.return_value = 'some_uuid' 41 name_mock.return_value = 'some_uuid'
42 jm = jobmanager.JobManager() 42 jm = jobmanager.JobManager()
43 43
44 self.assertEqual(jm.name, 'some_uuid') 44 self.assertEqual(jm.name.decode('ascii'), 'some_uuid')
45 45
46 self.assertFalse(jm.awaiting_startup_ack) 46 self.assertFalse(jm.awaiting_startup_ack)
47 self.assertEqual(jm.status, constants.STATUS.ready) 47 self.assertEqual(jm.status, constants.STATUS.ready)
diff --git a/eventmq/tests/test_scheduler.py b/eventmq/tests/test_scheduler.py
index a673561..dc96b09 100644
--- a/eventmq/tests/test_scheduler.py
+++ b/eventmq/tests/test_scheduler.py
@@ -29,7 +29,7 @@ class TestCase(unittest.TestCase):
29 'NAME': 'RuckasBringer' 29 'NAME': 'RuckasBringer'
30 } 30 }
31 sched = scheduler.Scheduler(override_settings=override_settings) 31 sched = scheduler.Scheduler(override_settings=override_settings)
32 self.assertEqual(sched.name, 'RuckasBringer:some_uuid') 32 self.assertEqual(sched.name.decode('ascii'), 'RuckasBringer:some_uuid')
33 33
34 self.assertFalse(sched.awaiting_startup_ack) 34 self.assertFalse(sched.awaiting_startup_ack)
35 self.assertEqual(sched.status, constants.STATUS.ready) 35 self.assertEqual(sched.status, constants.STATUS.ready)
diff --git a/eventmq/tests/test_sender.py b/eventmq/tests/test_sender.py
index f292696..f1a9984 100644
--- a/eventmq/tests/test_sender.py
+++ b/eventmq/tests/test_sender.py
@@ -35,7 +35,8 @@ class TestCase(unittest.TestCase):
35 35
36 self.assert_(socket.poll() != 0) 36 self.assert_(socket.poll() != 0)
37 msg = socket.recv_multipart() 37 msg = socket.recv_multipart()
38 self.assertEqual(msg[0].decode('ascii'), self.sender.name) 38 self.assertEqual(msg[0].decode('ascii'),
39 self.sender.name.decode('ascii'))
39 self.assertEqual(msg[1], b'') 40 self.assertEqual(msg[1], b'')
40 self.assertEqual(msg[2], b'1') 41 self.assertEqual(msg[2], b'1')
41 self.assertEqual(msg[3], b'Hello!') 42 self.assertEqual(msg[3], b'Hello!')
diff --git a/eventmq/tests/utils.py b/eventmq/tests/utils.py
index 4ea8c6d..c93d488 100644
--- a/eventmq/tests/utils.py
+++ b/eventmq/tests/utils.py
@@ -37,7 +37,7 @@ class FakeDevice(ZMQReceiveMixin, ZMQSendMixin):
37 37
38 self.name = generate_device_name() 38 self.name = generate_device_name()
39 self.zsocket = zmq.Context.instance().socket(type_) 39 self.zsocket = zmq.Context.instance().socket(type_)
40 self.zsocket.setsockopt_string(zmq.IDENTITY, self.name) 40 self.zsocket.setsockopt(zmq.IDENTITY, self.name)
41 41
42 42
43def send_raw_INFORM(sock, type_, queues=(conf.DEFAULT_QUEUE_NAME,)): 43def send_raw_INFORM(sock, type_, queues=(conf.DEFAULT_QUEUE_NAME,)):
diff --git a/eventmq/utils/devices.py b/eventmq/utils/devices.py
index d00815c..7fbb028 100644
--- a/eventmq/utils/devices.py
+++ b/eventmq/utils/devices.py
@@ -20,17 +20,19 @@
20 20
21def generate_device_name(prefix=None): 21def generate_device_name(prefix=None):
22 """ 22 """
23 This generates a uuid for use in setsockopt_string(zmq.IDENTITY, x) 23 This takes care of the python3 'everything is unicode' feature which
24 Note: This will fail if used with python3 and setsockopt(zmq.IDENTIOTY, x) 24 causes errors when setting the IDENTITY of the ZMQ socket.
25 25
26 Args: 26 Args:
27 prefix (str): Prefix the id with this string. 27 prefix (str): Prefix the id with this string.
28 28
29 Returns (str) An ascii encoded string that can be used as an IDENTITY for a 29 Returns (str/bytes) An ascii encoded string that can be used as an IDENTITY
30 ZMQ socket. 30 for a ZMQ socket. In python3 this returns a `bytes`, python2 a 'str
31 """ 31 """
32 import uuid 32 import uuid
33 ret = str(uuid.uuid4()) 33 ret = str(uuid.uuid4())
34
34 if prefix: 35 if prefix:
35 ret = prefix + ret 36 ret = "{}:{}".format(prefix, ret)
36 return ret 37
38 return ret.encode('ascii')