aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjason2017-03-20 12:31:40 -0600
committerjason2017-03-21 17:01:41 -0600
commit57290ae8e83c07de60cc54f50300109689b9c11a (patch)
tree3aca8be91c45a6d6e903c5409c7bf0dbffc5ef8d
parent93fa6515f216f3ce6f447e54dfcfac47a4b30732 (diff)
downloadeventmq-57290ae8e83c07de60cc54f50300109689b9c11a.tar.gz
eventmq-57290ae8e83c07de60cc54f50300109689b9c11a.zip
Rename incoming and outgoing ports to frontend and backend
- This refactors the `self.incoming` and `self.outgoing` sockets to `self.frontend` (client facing) and `self.backend` (manager facing). This change should make the purpose of these ports more clear for new people working in the code. Resolves: #16
-rw-r--r--etc/eventmq.conf-dist4
-rw-r--r--eventmq/conf.py4
-rw-r--r--eventmq/jobmanager.py16
-rw-r--r--eventmq/pub.py28
-rw-r--r--eventmq/router.py55
-rw-r--r--eventmq/scheduler.py12
-rw-r--r--eventmq/tests/test_jobmanager.py8
-rw-r--r--eventmq/tests/test_receiver.py2
-rw-r--r--eventmq/tests/test_router.py36
-rw-r--r--eventmq/tests/test_utils.py12
-rw-r--r--eventmq/utils/classes.py20
-rw-r--r--setup.py2
12 files changed, 101 insertions, 98 deletions
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist
index 43ec6f2..d492269 100644
--- a/etc/eventmq.conf-dist
+++ b/etc/eventmq.conf-dist
@@ -23,5 +23,5 @@ queues=[[50,"google"], [40,"pushes"], [10,"default"]]
23concurrent_jobs=2 23concurrent_jobs=2
24 24
25[publisher] 25[publisher]
26publisher_incoming_addr=tcp://0.0.0.0:47298 26publisher_frontend_addr=tcp://0.0.0.0:47298
27publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file 27publisher_backend_addr=tcp://0.0.0.0:47299 \ No newline at end of file
diff --git a/eventmq/conf.py b/eventmq/conf.py
index cec0249..16b1f78 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -73,8 +73,8 @@ WORKER_ADDR_FAILOVER = 'tcp://127.0.0.1:47290'
73ADMINISTRATIVE_ADDR = 'tcp://127.0.0.1:47293' 73ADMINISTRATIVE_ADDR = 'tcp://127.0.0.1:47293'
74 74
75# PubSub 75# PubSub
76PUBLISHER_INCOMING_ADDR = 'tcp://127.0.0.1:47298' 76PUBLISHER_FRONTEND_ADDR = 'tcp://127.0.0.1:47298'
77PUBLISHER_OUTGOING_ADDR = 'tcp://127.0.0.1:47299' 77PUBLISHER_BACKEND_ADDR = 'tcp://127.0.0.1:47299'
78 78
79# How many jobs should the job manager concurrently handle? 79# How many jobs should the job manager concurrently handle?
80CONCURRENT_JOBS = 4 80CONCURRENT_JOBS = 4
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index ea5af07..831cee5 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -95,7 +95,7 @@ class JobManager(HeartbeatMixin, EMQPService):
95 #: then telling the router that it is READY. The reply will be the unit 95 #: then telling the router that it is READY. The reply will be the unit
96 #: of work. 96 #: of work.
97 # Despite the name, jobs are received on this socket 97 # Despite the name, jobs are received on this socket
98 self.outgoing = Sender(name=self.name) 98 self.frontend = Sender(name=self.name)
99 99
100 self.poller = Poller() 100 self.poller = Poller()
101 101
@@ -149,8 +149,8 @@ class JobManager(HeartbeatMixin, EMQPService):
149 self.received_disconnect = True 149 self.received_disconnect = True
150 continue 150 continue
151 151
152 if events.get(self.outgoing) == POLLIN: 152 if events.get(self.frontend) == POLLIN:
153 msg = self.outgoing.recv_multipart() 153 msg = self.frontend.recv_multipart()
154 self.process_message(msg) 154 self.process_message(msg)
155 155
156 # Call appropiate callbacks for each finished job 156 # Call appropiate callbacks for each finished job
@@ -238,7 +238,7 @@ class JobManager(HeartbeatMixin, EMQPService):
238 send the READY command upstream to indicate that JobManager is ready 238 send the READY command upstream to indicate that JobManager is ready
239 for another REQUEST message. 239 for another REQUEST message.
240 """ 240 """
241 sendmsg(self.outgoing, 'READY') 241 sendmsg(self.frontend, 'READY')
242 242
243 def send_reply(self, reply, msgid): 243 def send_reply(self, reply, msgid):
244 """ 244 """
@@ -249,7 +249,7 @@ class JobManager(HeartbeatMixin, EMQPService):
249 recipient (str): The recipient id for the ack 249 recipient (str): The recipient id for the ack
250 msgid: The unique id that we are acknowledging 250 msgid: The unique id that we are acknowledging
251 """ 251 """
252 sendmsg(self.outgoing, 'REPLY', [reply, msgid]) 252 sendmsg(self.frontend, 'REPLY', [reply, msgid])
253 253
254 def on_heartbeat(self, msgid, message): 254 def on_heartbeat(self, msgid, message):
255 """ 255 """
@@ -277,8 +277,8 @@ class JobManager(HeartbeatMixin, EMQPService):
277 self._workers.append(w) 277 self._workers.append(w)
278 278
279 def on_disconnect(self, msgid, msg): 279 def on_disconnect(self, msgid, msg):
280 sendmsg(self.outgoing, KBYE) 280 sendmsg(self.frontend, KBYE)
281 self.outgoing.unbind(conf.WORKER_ADDR) 281 self.frontend.unbind(conf.WORKER_ADDR)
282 super(JobManager, self).on_disconnect(msgid, msg) 282 super(JobManager, self).on_disconnect(msgid, msg)
283 283
284 def on_kbye(self, msgid, msg): 284 def on_kbye(self, msgid, msg):
@@ -295,7 +295,7 @@ class JobManager(HeartbeatMixin, EMQPService):
295 295
296 def sigterm_handler(self, signum, frame): 296 def sigterm_handler(self, signum, frame):
297 logger.info('Shutting down..') 297 logger.info('Shutting down..')
298 sendmsg(self.outgoing, KBYE) 298 sendmsg(self.frontend, KBYE)
299 299
300 self.awaiting_startup_ack = False 300 self.awaiting_startup_ack = False
301 self.received_disconnect = True 301 self.received_disconnect = True
diff --git a/eventmq/pub.py b/eventmq/pub.py
index 3918580..487e8c5 100644
--- a/eventmq/pub.py
+++ b/eventmq/pub.py
@@ -32,26 +32,26 @@ logger = logging.getLogger(__name__)
32class Pub(HeartbeatMixin): 32class Pub(HeartbeatMixin):
33 def __init__(self): 33 def __init__(self):
34 self.poller = poller.Poller() 34 self.poller = poller.Poller()
35 self.incoming = receiver.Receiver() 35 self.frontend = receiver.Receiver()
36 self.outgoing = publisher.Publisher() 36 self.backend = publisher.Publisher()
37 37
38 self.received_disconnect = False 38 self.received_disconnect = False
39 39
40 self.poller.register(self.incoming, poller.POLLIN) 40 self.poller.register(self.frontend, poller.POLLIN)
41 return 41 return
42 42
43 def start(self, 43 def start(self,
44 incoming_addr=conf.PUBLISHER_INCOMING_ADDR, 44 frontend_addr=conf.PUBLISHER_FRONTEND_ADDR,
45 outgoing_addr=conf.PUBLISHER_OUTGOING_ADDR): 45 backend_addr=conf.PUBLISHER_BACKEND_ADDR):
46 46
47 self.status = STATUS.starting 47 self.status = STATUS.starting
48 48
49 self.incoming.listen(incoming_addr) 49 self.frontend.listen(frontend_addr)
50 self.outgoing.listen(outgoing_addr) 50 self.backend.listen(backend_addr)
51 51
52 logger.info('Listening for publish requests on {}'.format( 52 logger.info('Listening for publish requests on {}'.format(
53 incoming_addr)) 53 frontend_addr))
54 logger.info('Listening for subscribers on {}'.format(outgoing_addr)) 54 logger.info('Listening for subscribers on {}'.format(backend_addr))
55 55
56 self._start_event_loop() 56 self._start_event_loop()
57 57
@@ -63,8 +63,8 @@ class Pub(HeartbeatMixin):
63 63
64 events = self.poller.poll() 64 events = self.poller.poll()
65 65
66 if events.get(self.incoming) == poller.POLLIN: 66 if events.get(self.frontend) == poller.POLLIN:
67 msg = self.incoming.recv_multipart() 67 msg = self.frontend.recv_multipart()
68 self.process_client_message(msg) 68 self.process_client_message(msg)
69 69
70 def process_client_message(self, msg): 70 def process_client_message(self, msg):
@@ -77,7 +77,7 @@ class Pub(HeartbeatMixin):
77 logger.debug('Got Publish command') 77 logger.debug('Got Publish command')
78 topic = msg[5] 78 topic = msg[5]
79 sub_message = msg[6] 79 sub_message = msg[6]
80 logger.debug(self.outgoing.publish(topic, sub_message)) 80 logger.debug(self.backend.publish(topic, sub_message))
81 81
82 return 82 return
83 83
@@ -87,8 +87,8 @@ class Pub(HeartbeatMixin):
87 """ 87 """
88 setup_logger('eventmq') 88 setup_logger('eventmq')
89 import_settings(section='publisher') 89 import_settings(section='publisher')
90 self.start(incoming_addr=conf.PUBLISHER_INCOMING_ADDR, 90 self.start(frontend_addr=conf.PUBLISHER_FRONTEND_ADDR,
91 outgoing_addr=conf.PUBLISHER_OUTGOING_ADDR) 91 backend_addr=conf.PUBLISHER_BACKEND_ADDR)
92 92
93 93
94# Entry point for pip console scripts 94# Entry point for pip console scripts
diff --git a/eventmq/router.py b/eventmq/router.py
index 2943e3c..4ff3732 100644
--- a/eventmq/router.py
+++ b/eventmq/router.py
@@ -55,12 +55,15 @@ class Router(HeartbeatMixin):
55 55
56 self.poller = poller.Poller() 56 self.poller = poller.Poller()
57 57
58 self.incoming = receiver.Receiver() 58 #: Port clients connect on.
59 self.outgoing = receiver.Receiver() 59 self.frontend = receiver.Receiver()
60 #: Port job managers connect on
61 self.backend = receiver.Receiver()
62 #: Port for administrative commands
60 self.administrative_socket = receiver.Receiver() 63 self.administrative_socket = receiver.Receiver()
61 64
62 self.poller.register(self.incoming, poller.POLLIN) 65 self.poller.register(self.frontend, poller.POLLIN)
63 self.poller.register(self.outgoing, poller.POLLIN) 66 self.poller.register(self.backend, poller.POLLIN)
64 self.poller.register(self.administrative_socket, poller.POLLIN) 67 self.poller.register(self.administrative_socket, poller.POLLIN)
65 68
66 self.status = STATUS.ready 69 self.status = STATUS.ready
@@ -147,8 +150,8 @@ class Router(HeartbeatMixin):
147 """ 150 """
148 self.status = STATUS.starting 151 self.status = STATUS.starting
149 152
150 self.incoming.listen(frontend_addr) 153 self.frontend.listen(frontend_addr)
151 self.outgoing.listen(backend_addr) 154 self.backend.listen(backend_addr)
152 self.administrative_socket.listen(administrative_addr) 155 self.administrative_socket.listen(administrative_addr)
153 156
154 self.status = STATUS.listening 157 self.status = STATUS.listening
@@ -169,12 +172,12 @@ class Router(HeartbeatMixin):
169 now = monotonic() 172 now = monotonic()
170 events = self.poller.poll() 173 events = self.poller.poll()
171 174
172 if events.get(self.incoming) == poller.POLLIN: 175 if events.get(self.frontend) == poller.POLLIN:
173 msg = self.incoming.recv_multipart() 176 msg = self.frontend.recv_multipart()
174 self.process_client_message(msg) 177 self.process_client_message(msg)
175 178
176 if events.get(self.outgoing) == poller.POLLIN: 179 if events.get(self.backend) == poller.POLLIN:
177 msg = self.outgoing.recv_multipart() 180 msg = self.backend.recv_multipart()
178 self.process_worker_message(msg) 181 self.process_worker_message(msg)
179 182
180 if events.get(self.administrative_socket) == poller.POLLIN: 183 if events.get(self.administrative_socket) == poller.POLLIN:
@@ -275,7 +278,7 @@ class Router(HeartbeatMixin):
275 self._meta['last_sent_heartbeat'] = monotonic() 278 self._meta['last_sent_heartbeat'] = monotonic()
276 279
277 for worker_id in self.workers: 280 for worker_id in self.workers:
278 self.send_heartbeat(self.outgoing, worker_id) 281 self.send_heartbeat(self.backend, worker_id)
279 282
280 def send_schedulers_heartbeats(self): 283 def send_schedulers_heartbeats(self):
281 """ 284 """
@@ -284,7 +287,7 @@ class Router(HeartbeatMixin):
284 self._meta['last_sent_scheduler_heartbeat'] = monotonic() 287 self._meta['last_sent_scheduler_heartbeat'] = monotonic()
285 288
286 for scheduler_id in self.schedulers: 289 for scheduler_id in self.schedulers:
287 self.send_heartbeat(self.incoming, scheduler_id) 290 self.send_heartbeat(self.frontend, scheduler_id)
288 291
289 def on_heartbeat(self, sender, msgid, msg): 292 def on_heartbeat(self, sender, msgid, msg):
290 """ 293 """
@@ -319,10 +322,10 @@ class Router(HeartbeatMixin):
319 322
320 if client_type == CLIENT_TYPE.worker: 323 if client_type == CLIENT_TYPE.worker:
321 self.add_worker(sender, queues) 324 self.add_worker(sender, queues)
322 self.send_ack(self.outgoing, sender, msgid) 325 self.send_ack(self.backend, sender, msgid)
323 elif client_type == CLIENT_TYPE.scheduler: 326 elif client_type == CLIENT_TYPE.scheduler:
324 self.add_scheduler(sender) 327 self.add_scheduler(sender)
325 self.send_ack(self.incoming, sender, msgid) 328 self.send_ack(self.frontend, sender, msgid)
326 329
327 def on_reply(self, sender, msgid, msg): 330 def on_reply(self, sender, msgid, msg):
328 """ 331 """
@@ -353,10 +356,10 @@ class Router(HeartbeatMixin):
353 # Remove schedulers and send them a kbye 356 # Remove schedulers and send them a kbye
354 logger.info("Router preparing to disconnect...") 357 logger.info("Router preparing to disconnect...")
355 for scheduler in self.schedulers: 358 for scheduler in self.schedulers:
356 self.send_kbye(self.incoming, scheduler) 359 self.send_kbye(self.frontend, scheduler)
357 360
358 self.schedulers.clear() 361 self.schedulers.clear()
359 self.incoming.unbind(conf.FRONTEND_ADDR) 362 self.frontend.unbind(conf.FRONTEND_ADDR)
360 363
361 if len(self.waiting_messages) > 0: 364 if len(self.waiting_messages) > 0:
362 logger.info("Router processing messages in queue.") 365 logger.info("Router processing messages in queue.")
@@ -366,10 +369,10 @@ class Router(HeartbeatMixin):
366 self.process_worker_message(msg) 369 self.process_worker_message(msg)
367 370
368 for worker in self.workers.keys(): 371 for worker in self.workers.keys():
369 self.send_kbye(self.outgoing, worker) 372 self.send_kbye(self.backend, worker)
370 373
371 self.workers.clear() 374 self.workers.clear()
372 self.outgoing.unbind(conf.BACKEND_ADDR) 375 self.backend.unbind(conf.BACKEND_ADDR)
373 376
374 # Loops event loops should check for this and break out 377 # Loops event loops should check for this and break out
375 self.received_disconnect = True 378 self.received_disconnect = True
@@ -400,7 +403,7 @@ class Router(HeartbeatMixin):
400 msg = self.waiting_messages[queue_name].peekleft() 403 msg = self.waiting_messages[queue_name].peekleft()
401 404
402 try: 405 try:
403 fwdmsg(self.outgoing, sender, msg) 406 fwdmsg(self.backend, sender, msg)
404 self.waiting_messages[queue_name].popleft() 407 self.waiting_messages[queue_name].popleft()
405 except exceptions.PeerGoneAwayError: 408 except exceptions.PeerGoneAwayError:
406 # Cleanup a workerg that cannot be contacted, leaving the 409 # Cleanup a workerg that cannot be contacted, leaving the
@@ -494,12 +497,12 @@ class Router(HeartbeatMixin):
494 497
495 # Rebuild the message to be sent to the worker. fwdmsg will 498 # Rebuild the message to be sent to the worker. fwdmsg will
496 # properly address the message. 499 # properly address the message.
497 fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, 500 fwdmsg(self.backend, worker_addr, ['', constants.PROTOCOL_VERSION,
498 'REQUEST', msgid, ] + msg) 501 'REQUEST', msgid, ] + msg)
499 502
500 self.workers[worker_addr]['available_slots'] -= 1 503 self.workers[worker_addr]['available_slots'] -= 1
501 # Acknowledgment of the request being submitted to the client 504 # Acknowledgment of the request being submitted to the client
502 sendmsg(self.incoming, sender, 'REPLY', 505 sendmsg(self.frontend, sender, 'REPLY',
503 (msgid,)) 506 (msgid,))
504 except exceptions.PeerGoneAwayError: 507 except exceptions.PeerGoneAwayError:
505 logger.debug( 508 logger.debug(
@@ -760,7 +763,7 @@ class Router(HeartbeatMixin):
760 try: 763 try:
761 # Strips off the client id before forwarding because the 764 # Strips off the client id before forwarding because the
762 # scheduler isn't expecting it. 765 # scheduler isn't expecting it.
763 fwdmsg(self.incoming, scheduler_addr, original_msg[1:]) 766 fwdmsg(self.frontend, scheduler_addr, original_msg[1:])
764 767
765 except exceptions.PeerGoneAwayError: 768 except exceptions.PeerGoneAwayError:
766 logger.debug("Scheduler {} has unexpectedly gone away. Trying " 769 logger.debug("Scheduler {} has unexpectedly gone away. Trying "
@@ -777,7 +780,7 @@ class Router(HeartbeatMixin):
777 try: 780 try:
778 # Strips off the client id before forwarding because the 781 # Strips off the client id before forwarding because the
779 # scheduler isn't expecting it. 782 # scheduler isn't expecting it.
780 fwdmsg(self.incoming, scheduler_addr, original_msg[1:]) 783 fwdmsg(self.frontend, scheduler_addr, original_msg[1:])
781 784
782 except exceptions.PeerGoneAwayError: 785 except exceptions.PeerGoneAwayError:
783 logger.debug("Scheduler {} has unexpectedly gone away." 786 logger.debug("Scheduler {} has unexpectedly gone away."
@@ -902,8 +905,8 @@ class Router(HeartbeatMixin):
902 process receives a SIGHUP from the system. 905 process receives a SIGHUP from the system.
903 """ 906 """
904 logger.info('Caught signame %s' % signum) 907 logger.info('Caught signame %s' % signum)
905 self.incoming.unbind(conf.FRONTEND_ADDR) 908 self.frontend.unbind(conf.FRONTEND_ADDR)
906 self.outgoing.unbind(conf.BACKEND_ADDR) 909 self.backend.unbind(conf.BACKEND_ADDR)
907 import_settings() 910 import_settings()
908 self.start(frontend_addr=conf.FRONTEND_ADDR, 911 self.start(frontend_addr=conf.FRONTEND_ADDR,
909 backend_addr=conf.BACKEND_ADDR, 912 backend_addr=conf.BACKEND_ADDR,
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py
index 4bac781..0d953a6 100644
--- a/eventmq/scheduler.py
+++ b/eventmq/scheduler.py
@@ -56,7 +56,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
56 logger.info('Initializing Scheduler...') 56 logger.info('Initializing Scheduler...')
57 import_settings() 57 import_settings()
58 super(Scheduler, self).__init__(*args, **kwargs) 58 super(Scheduler, self).__init__(*args, **kwargs)
59 self.outgoing = Sender() 59 self.frontend = Sender()
60 self._redis_server = None 60 self._redis_server = None
61 61
62 # contains dict of 4-item lists representing cron jobs key of this 62 # contains dict of 4-item lists representing cron jobs key of this
@@ -120,8 +120,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
120 m_now = monotonic() 120 m_now = monotonic()
121 events = self.poller.poll() 121 events = self.poller.poll()
122 122
123 if events.get(self.outgoing) == POLLIN: 123 if events.get(self.frontend) == POLLIN:
124 msg = self.outgoing.recv_multipart() 124 msg = self.frontend.recv_multipart()
125 self.process_message(msg) 125 self.process_message(msg)
126 126
127 # TODO: distribute me! 127 # TODO: distribute me!
@@ -213,7 +213,7 @@ class Scheduler(HeartbeatMixin, EMQPService):
213 str: ID of the message 213 str: ID of the message
214 """ 214 """
215 jobmsg = json.loads(jobmsg) 215 jobmsg = json.loads(jobmsg)
216 msgid = send_request(self.outgoing, jobmsg, queue=queue, 216 msgid = send_request(self.frontend, jobmsg, queue=queue,
217 reply_requested=True) 217 reply_requested=True)
218 218
219 return msgid 219 return msgid
@@ -221,8 +221,8 @@ class Scheduler(HeartbeatMixin, EMQPService):
221 def on_disconnect(self, msgid, message): 221 def on_disconnect(self, msgid, message):
222 logger.info("Received DISCONNECT request: {}".format(message)) 222 logger.info("Received DISCONNECT request: {}".format(message))
223 self._redis_server.connection_pool.disconnect() 223 self._redis_server.connection_pool.disconnect()
224 sendmsg(self.outgoing, KBYE) 224 sendmsg(self.frontend, KBYE)
225 self.outgoing.unbind(conf.SCHEDULER_ADDR) 225 self.frontend.unbind(conf.SCHEDULER_ADDR)
226 super(Scheduler, self).on_disconnect(msgid, message) 226 super(Scheduler, self).on_disconnect(msgid, message)
227 227
228 def on_kbye(self, msgid, msg): 228 def on_kbye(self, msgid, msg):
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index db82843..82fed3f 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -42,7 +42,7 @@ class TestCase(unittest.TestCase):
42 jm = jobmanager.JobManager() 42 jm = jobmanager.JobManager()
43 jm.send_ready() 43 jm.send_ready()
44 44
45 sndmsg_mock.assert_called_with(jm.outgoing, 'READY') 45 sndmsg_mock.assert_called_with(jm.frontend, 'READY')
46 46
47 @mock.patch('multiprocessing.pool.Pool.close') 47 @mock.patch('multiprocessing.pool.Pool.close')
48 @mock.patch('eventmq.jobmanager.JobManager.process_message') 48 @mock.patch('eventmq.jobmanager.JobManager.process_message')
@@ -55,7 +55,7 @@ class TestCase(unittest.TestCase):
55 pool_close_mock): 55 pool_close_mock):
56 jm = jobmanager.JobManager() 56 jm = jobmanager.JobManager()
57 maybe_send_hb_mock.return_value = False 57 maybe_send_hb_mock.return_value = False
58 poll_mock.return_value = {jm.outgoing: jobmanager.POLLIN} 58 poll_mock.return_value = {jm.frontend: jobmanager.POLLIN}
59 sender_mock.return_value = [1, 2, 3] 59 sender_mock.return_value = [1, 2, 3]
60 60
61 jm._start_event_loop() 61 jm._start_event_loop()
@@ -106,7 +106,7 @@ class TestCase(unittest.TestCase):
106 socket_mock.return_value = True 106 socket_mock.return_value = True
107 107
108 jm = jobmanager.JobManager() 108 jm = jobmanager.JobManager()
109 jm.outgoing.status = constants.STATUS.listening 109 jm.frontend.status = constants.STATUS.listening
110 jm.on_disconnect(msgid, msg) 110 jm.on_disconnect(msgid, msg)
111 self.assertTrue(jm.received_disconnect, "Did not receive disconnect.") 111 self.assertTrue(jm.received_disconnect, "Did not receive disconnect.")
112 112
@@ -129,7 +129,7 @@ class TestCase(unittest.TestCase):
129 129
130 jm.sigterm_handler(13231, "FRAMEY the evil frame") 130 jm.sigterm_handler(13231, "FRAMEY the evil frame")
131 131
132 sendmsg_mock.assert_called_with(jm.outgoing, constants.KBYE) 132 sendmsg_mock.assert_called_with(jm.frontend, constants.KBYE)
133 self.assertFalse(jm.awaiting_startup_ack) 133 self.assertFalse(jm.awaiting_startup_ack)
134 self.assertTrue(jm.received_disconnect) 134 self.assertTrue(jm.received_disconnect)
135 135
diff --git a/eventmq/tests/test_receiver.py b/eventmq/tests/test_receiver.py
index 4ee7170..49eef69 100644
--- a/eventmq/tests/test_receiver.py
+++ b/eventmq/tests/test_receiver.py
@@ -24,7 +24,7 @@ class TestCase(unittest.TestCase):
24 self.zcontext = zmq.Context.instance() 24 self.zcontext = zmq.Context.instance()
25 25
26 self.router = router.Router() 26 self.router = router.Router()
27 self.receiver = self.router.incoming 27 self.receiver = self.router.frontend
28 self.sender = sender.Sender() 28 self.sender = sender.Sender()
29 29
30 def test_send_multipart_unicode(self): 30 def test_send_multipart_unicode(self):
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py
index fdb02d0..10441eb 100644
--- a/eventmq/tests/test_router.py
+++ b/eventmq/tests/test_router.py
@@ -29,16 +29,16 @@ class TestCase(unittest.TestCase):
29 def setUp(self): 29 def setUp(self):
30 self.router = router.Router(skip_signal=True) 30 self.router = router.Router(skip_signal=True)
31 self.router.zcontext = mock.Mock(spec=zmq.Context) 31 self.router.zcontext = mock.Mock(spec=zmq.Context)
32 self.router.incoming = mock.Mock(spec=receiver.Receiver) 32 self.router.frontend = mock.Mock(spec=receiver.Receiver)
33 self.router.outgoing = mock.Mock(spec=receiver.Receiver) 33 self.router.backend = mock.Mock(spec=receiver.Receiver)
34 34
35 @mock.patch('eventmq.receiver.zmq.Socket.bind') 35 @mock.patch('eventmq.receiver.zmq.Socket.bind')
36 @mock.patch('eventmq.router.Router._start_event_loop') 36 @mock.patch('eventmq.router.Router._start_event_loop')
37 def test_start(self, event_loop_mock, zsocket_bind_mock): 37 def test_start(self, event_loop_mock, zsocket_bind_mock):
38 # Test default args 38 # Test default args
39 self.router.start() 39 self.router.start()
40 self.router.incoming.listen.assert_called_with(conf.FRONTEND_ADDR) 40 self.router.frontend.listen.assert_called_with(conf.FRONTEND_ADDR)
41 self.router.outgoing.listen.assert_called_with(conf.BACKEND_ADDR) 41 self.router.backend.listen.assert_called_with(conf.BACKEND_ADDR)
42 self.assertEqual(self.router.status, constants.STATUS.listening) 42 self.assertEqual(self.router.status, constants.STATUS.listening)
43 43
44 # Test invalid args 44 # Test invalid args
@@ -55,7 +55,7 @@ class TestCase(unittest.TestCase):
55 sender_id, inform_msgid, [queues, 'worker']) 55 sender_id, inform_msgid, [queues, 'worker'])
56 56
57 self.router.send_ack.assert_called_with( 57 self.router.send_ack.assert_called_with(
58 self.router.outgoing, sender_id, inform_msgid) 58 self.router.backend, sender_id, inform_msgid)
59 59
60 self.router.add_worker.assert_called_with( 60 self.router.add_worker.assert_called_with(
61 sender_id, [(32, 'top'), (23, 'drop'), (12, 'shop')]) 61 sender_id, [(32, 'top'), (23, 'drop'), (12, 'shop')])
@@ -75,7 +75,7 @@ class TestCase(unittest.TestCase):
75 sender_id, inform_msgid, [queues, constants.CLIENT_TYPE.worker]) 75 sender_id, inform_msgid, [queues, constants.CLIENT_TYPE.worker])
76 76
77 self.router.send_ack.assert_called_with( 77 self.router.send_ack.assert_called_with(
78 self.router.outgoing, sender_id, inform_msgid) 78 self.router.backend, sender_id, inform_msgid)
79 self.router.add_worker.assert_called_with( 79 self.router.add_worker.assert_called_with(
80 sender_id, [(10, 'default'), ]) 80 sender_id, [(10, 'default'), ])
81 81
@@ -146,10 +146,10 @@ class TestCase(unittest.TestCase):
146 146
147 generate_msgid_mock.return_value = ack_msgid 147 generate_msgid_mock.return_value = ack_msgid
148 148
149 self.router.send_ack(self.router.outgoing, sender_id, orig_msgid) 149 self.router.send_ack(self.router.backend, sender_id, orig_msgid)
150 150
151 # Verify that an ACK was sent for the INFORM 151 # Verify that an ACK was sent for the INFORM
152 self.router.outgoing.send_multipart.assert_called_with( 152 self.router.backend.send_multipart.assert_called_with(
153 ('ACK', ack_msgid, orig_msgid), 153 ('ACK', ack_msgid, orig_msgid),
154 constants.PROTOCOL_VERSION, _recipient_id=sender_id) 154 constants.PROTOCOL_VERSION, _recipient_id=sender_id)
155 155
@@ -168,9 +168,9 @@ class TestCase(unittest.TestCase):
168 168
169 generate_msgid_mock.return_value = msgid 169 generate_msgid_mock.return_value = msgid
170 170
171 self.router.send_heartbeat(self.router.incoming, recipient_id) 171 self.router.send_heartbeat(self.router.frontend, recipient_id)
172 172
173 self.router.incoming.send_multipart.assert_called_with( 173 self.router.frontend.send_multipart.assert_called_with(
174 ('HEARTBEAT', msgid, str(ts)), constants.PROTOCOL_VERSION, 174 ('HEARTBEAT', msgid, str(ts)), constants.PROTOCOL_VERSION,
175 _recipient_id=recipient_id) 175 _recipient_id=recipient_id)
176 176
@@ -197,8 +197,8 @@ class TestCase(unittest.TestCase):
197 self.assertGreater(self.router._meta['last_sent_heartbeat'], 0) 197 self.assertGreater(self.router._meta['last_sent_heartbeat'], 0)
198 198
199 send_heartbeat_mock.assert_has_calls( 199 send_heartbeat_mock.assert_has_calls(
200 [mock.call(self.router.outgoing, 'w1'), 200 [mock.call(self.router.backend, 'w1'),
201 mock.call(self.router.outgoing, 'w2')], any_order=True) 201 mock.call(self.router.backend, 'w2')], any_order=True)
202 202
203 @mock.patch('eventmq.router.Router.send_heartbeat') 203 @mock.patch('eventmq.router.Router.send_heartbeat')
204 def test_send_schedulers_heartbeats(self, send_hb_mock): 204 def test_send_schedulers_heartbeats(self, send_hb_mock):
@@ -215,7 +215,7 @@ class TestCase(unittest.TestCase):
215 215
216 self.assertGreater( 216 self.assertGreater(
217 self.router._meta['last_sent_scheduler_heartbeat'], 0) 217 self.router._meta['last_sent_scheduler_heartbeat'], 0)
218 send_hb_mock.assert_called_with(self.router.incoming, scheduler_id) 218 send_hb_mock.assert_called_with(self.router.frontend, scheduler_id)
219 219
220 def test_on_disconnect(self): 220 def test_on_disconnect(self):
221 self.assertFalse(self.router.received_disconnect) 221 self.assertFalse(self.router.received_disconnect)
@@ -245,7 +245,7 @@ class TestCase(unittest.TestCase):
245 245
246 self.router.on_ready(worker_id, msgid, msg) 246 self.router.on_ready(worker_id, msgid, msg)
247 247
248 fwdmsg_mock.assert_called_with(self.router.outgoing, worker_id, 248 fwdmsg_mock.assert_called_with(self.router.backend, worker_id,
249 waiting_msg) 249 waiting_msg)
250 250
251 self.router.on_ready(worker_id, msgid + 'a', msg) 251 self.router.on_ready(worker_id, msgid + 'a', msg)
@@ -296,19 +296,19 @@ class TestCase(unittest.TestCase):
296 # Forward waiting_msg1 296 # Forward waiting_msg1
297 ready_msgid1 = 'ready23' 297 ready_msgid1 = 'ready23'
298 self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) 298 self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1])
299 fwdmsg_mock.assert_called_with(self.router.outgoing, worker1_id, 299 fwdmsg_mock.assert_called_with(self.router.backend, worker1_id,
300 waiting_msg1) 300 waiting_msg1)
301 301
302 # Forward waiting_msg3 -- blu is a higher priority for worker2 302 # Forward waiting_msg3 -- blu is a higher priority for worker2
303 ready_msgid3 = 'ready19' 303 ready_msgid3 = 'ready19'
304 self.router.on_ready(worker2_id, ready_msgid3, ['READY', ready_msgid3]) 304 self.router.on_ready(worker2_id, ready_msgid3, ['READY', ready_msgid3])
305 fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, 305 fwdmsg_mock.assert_called_with(self.router.backend, worker2_id,
306 waiting_msg3) 306 waiting_msg3)
307 307
308 # Forward waiting_msg2 308 # Forward waiting_msg2
309 ready_msgid2 = 'ready5' 309 ready_msgid2 = 'ready5'
310 self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2]) 310 self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2])
311 fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, 311 fwdmsg_mock.assert_called_with(self.router.backend, worker2_id,
312 waiting_msg2) 312 waiting_msg2)
313 313
314 # There should be no keys because the code checks for their existence 314 # There should be no keys because the code checks for their existence
@@ -348,7 +348,7 @@ class TestCase(unittest.TestCase):
348 348
349 # Router accepts job for 1 available slot 349 # Router accepts job for 1 available slot
350 self.router.on_request(client_id, msgid, msg) 350 self.router.on_request(client_id, msgid, msg)
351 fwdmsg_mock.assert_called_with(self.router.outgoing, worker_id, 351 fwdmsg_mock.assert_called_with(self.router.backend, worker_id,
352 ['', constants.PROTOCOL_VERSION, 352 ['', constants.PROTOCOL_VERSION,
353 'REQUEST', msgid, ] + msg) 353 'REQUEST', msgid, ] + msg)
354 self.assertEqual(self.router.workers[worker_id]['available_slots'], 0) 354 self.assertEqual(self.router.workers[worker_id]['available_slots'], 0)
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py
index 03da18a..996cddb 100644
--- a/eventmq/tests/test_utils.py
+++ b/eventmq/tests/test_utils.py
@@ -128,13 +128,13 @@ class SettingsTestCase(unittest.TestCase):
128class EMQPServiceTestCase(unittest.TestCase): 128class EMQPServiceTestCase(unittest.TestCase):
129 129
130 # pretend to be an emq socket 130 # pretend to be an emq socket
131 outgoing = 'some-outgoing-socket' 131 frontend = 'some-frontend-socket'
132 132
133 def get_worker(self): 133 def get_worker(self):
134 """return an EMQPService mimicking a worker""" 134 """return an EMQPService mimicking a worker"""
135 obj = classes.EMQPService() 135 obj = classes.EMQPService()
136 obj.SERVICE_TYPE = constants.CLIENT_TYPE.worker 136 obj.SERVICE_TYPE = constants.CLIENT_TYPE.worker
137 obj.outgoing = self.outgoing 137 obj.frontend = self.frontend
138 obj._meta = { 138 obj._meta = {
139 'last_sent_heartbeat': 0 139 'last_sent_heartbeat': 0
140 } 140 }
@@ -145,7 +145,7 @@ class EMQPServiceTestCase(unittest.TestCase):
145 """return an EMQPService mimicking a scheduler""" 145 """return an EMQPService mimicking a scheduler"""
146 obj = classes.EMQPService() 146 obj = classes.EMQPService()
147 obj.SERVICE_TYPE = constants.CLIENT_TYPE.scheduler 147 obj.SERVICE_TYPE = constants.CLIENT_TYPE.scheduler
148 obj.outgoing = self.outgoing 148 obj.frontend = self.frontend
149 obj._meta = { 149 obj._meta = {
150 'last_sent_heartbeat': 0 150 'last_sent_heartbeat': 0
151 } 151 }
@@ -168,7 +168,7 @@ class EMQPServiceTestCase(unittest.TestCase):
168 obj.send_inform() 168 obj.send_inform()
169 169
170 sendmsg_mock.assert_called_with( 170 sendmsg_mock.assert_called_with(
171 self.outgoing, 'INFORM', 171 self.frontend, 'INFORM',
172 ['', constants.CLIENT_TYPE.worker]) 172 ['', constants.CLIENT_TYPE.worker])
173 173
174 @mock.patch('eventmq.utils.classes.sendmsg') 174 @mock.patch('eventmq.utils.classes.sendmsg')
@@ -178,7 +178,7 @@ class EMQPServiceTestCase(unittest.TestCase):
178 obj.send_inform(queues="this shouldn't matter") 178 obj.send_inform(queues="this shouldn't matter")
179 179
180 sendmsg_mock.assert_called_with( 180 sendmsg_mock.assert_called_with(
181 self.outgoing, 'INFORM', 181 self.frontend, 'INFORM',
182 ['', constants.CLIENT_TYPE.scheduler] 182 ['', constants.CLIENT_TYPE.scheduler]
183 ) 183 )
184 184
@@ -189,7 +189,7 @@ class EMQPServiceTestCase(unittest.TestCase):
189 obj.send_inform(queues=([10, 'push'], [7, 'email'], 189 obj.send_inform(queues=([10, 'push'], [7, 'email'],
190 [3, 'default'])) 190 [3, 'default']))
191 sendmsg_mock.asert_called_with( 191 sendmsg_mock.asert_called_with(
192 'some-outgoing-socket', 'INFORM', 192 'some-frontend-socket', 'INFORM',
193 ['[10, "push"],[7, "email"],[3, "default"]', 193 ['[10, "push"],[7, "email"],[3, "default"]',
194 constants.CLIENT_TYPE.worker] 194 constants.CLIENT_TYPE.worker]
195 ) 195 )
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py
index 39c600b..b9e09fb 100644
--- a/eventmq/utils/classes.py
+++ b/eventmq/utils/classes.py
@@ -42,10 +42,10 @@ class EMQPService(object):
42 Also implements utlitiy methods for managing long-running processes. 42 Also implements utlitiy methods for managing long-running processes.
43 43
44 To use you must define: 44 To use you must define:
45 - `self.outgoing` - socket where messages can be sent to the Router 45 - `self.frontend` - socket where messages can be sent to the Router
46 - `self.SERVICE_TYPE` - defines the service type for INFORM. See 46 - `self.SERVICE_TYPE` - defines the service type for INFORM. See
47 :meth:`send_inform` for more information. 47 :meth:`send_inform` for more information.
48 - `self.poller` - the poller that `self.outgoing` will be using. 48 - `self.poller` - the poller that `self.frontend` will be using.
49 Usually: `self.poller = eventmq.poller.Poller()` 49 Usually: `self.poller = eventmq.poller.Poller()`
50 50
51 When messages are received from the router, they are processed in 51 When messages are received from the router, they are processed in
@@ -99,7 +99,7 @@ class EMQPService(object):
99 else: 99 else:
100 queues = '' 100 queues = ''
101 101
102 msgid = sendmsg(self.outgoing, 'INFORM', [ 102 msgid = sendmsg(self.frontend, 'INFORM', [
103 queues, 103 queues,
104 self.SERVICE_TYPE 104 self.SERVICE_TYPE
105 ]) 105 ])
@@ -116,7 +116,7 @@ class EMQPService(object):
116 also run on a reset are here. 116 also run on a reset are here.
117 """ 117 """
118 # Look for incoming events 118 # Look for incoming events
119 self.poller.register(self.outgoing, poller.POLLIN) 119 self.poller.register(self.frontend, poller.POLLIN)
120 self.awaiting_startup_ack = False 120 self.awaiting_startup_ack = False
121 self.received_disconnect = False 121 self.received_disconnect = False
122 self.should_reset = False 122 self.should_reset = False
@@ -132,7 +132,7 @@ class EMQPService(object):
132 """ 132 """
133 while not self.received_disconnect: 133 while not self.received_disconnect:
134 self.status = constants.STATUS.connecting 134 self.status = constants.STATUS.connecting
135 self.outgoing.connect(addr) 135 self.frontend.connect(addr)
136 136
137 # Setting this to false is how the loop is broken and the 137 # Setting this to false is how the loop is broken and the
138 # _event_loop is started. 138 # _event_loop is started.
@@ -149,8 +149,8 @@ class EMQPService(object):
149 # multiplied by 1000 to get seconds 149 # multiplied by 1000 to get seconds
150 events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000) 150 events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000)
151 151
152 if self.outgoing in events: # A message from the Router! 152 if self.frontend in events: # A message from the Router!
153 msg = self.outgoing.recv_multipart() 153 msg = self.frontend.recv_multipart()
154 # TODO This will silently drop messages that aren't 154 # TODO This will silently drop messages that aren't
155 # ACK/DISCONNECT 155 # ACK/DISCONNECT
156 if msg[2] == "ACK" or msg[2] == "DISCONNECT": 156 if msg[2] == "ACK" or msg[2] == "DISCONNECT":
@@ -176,10 +176,10 @@ class EMQPService(object):
176 Resets the current connection by closing and reopening the socket 176 Resets the current connection by closing and reopening the socket
177 """ 177 """
178 # Unregister the old socket from the poller 178 # Unregister the old socket from the poller
179 self.poller.unregister(self.outgoing) 179 self.poller.unregister(self.frontend)
180 180
181 # Polish up a new socket to use 181 # Polish up a new socket to use
182 self.outgoing.rebuild() 182 self.frontend.rebuild()
183 183
184 # Prepare the device to connect again 184 # Prepare the device to connect again
185 self._setup() 185 self._setup()
@@ -323,7 +323,7 @@ class HeartbeatMixin(object):
323 # Send a HEARTBEAT if necessary 323 # Send a HEARTBEAT if necessary
324 if now - self._meta['last_sent_heartbeat'] >= \ 324 if now - self._meta['last_sent_heartbeat'] >= \
325 conf.HEARTBEAT_INTERVAL: 325 conf.HEARTBEAT_INTERVAL:
326 self.send_heartbeat(self.outgoing) 326 self.send_heartbeat(self.frontend)
327 327
328 # Do something about any missed HEARTBEAT, if we have nothing 328 # Do something about any missed HEARTBEAT, if we have nothing
329 # waiting on the socket 329 # waiting on the socket
diff --git a/setup.py b/setup.py
index 067b352..8e6e731 100644
--- a/setup.py
+++ b/setup.py
@@ -61,6 +61,6 @@ setup(
61 'bin/emq-jobmanager', 61 'bin/emq-jobmanager',
62 'bin/emq-router', 62 'bin/emq-router',
63 'bin/emq-scheduler', 63 'bin/emq-scheduler',
64 'bin/emq-pubsub' 64 'bin/emq-publisher'
65 ], 65 ],
66) 66)