diff options
| author | jason | 2017-03-20 12:31:40 -0600 |
|---|---|---|
| committer | jason | 2017-03-21 17:01:41 -0600 |
| commit | 57290ae8e83c07de60cc54f50300109689b9c11a (patch) | |
| tree | 3aca8be91c45a6d6e903c5409c7bf0dbffc5ef8d | |
| parent | 93fa6515f216f3ce6f447e54dfcfac47a4b30732 (diff) | |
| download | eventmq-57290ae8e83c07de60cc54f50300109689b9c11a.tar.gz eventmq-57290ae8e83c07de60cc54f50300109689b9c11a.zip | |
Rename incoming and outgoing ports to frontend and backend
- This refactors the `self.incoming` and `self.outgoing` sockets to
`self.frontend` (client facing) and `self.backend` (manager facing). This
change should make the purpose of these ports more clear for new people
working in the code.
Resolves: #16
| -rw-r--r-- | etc/eventmq.conf-dist | 4 | ||||
| -rw-r--r-- | eventmq/conf.py | 4 | ||||
| -rw-r--r-- | eventmq/jobmanager.py | 16 | ||||
| -rw-r--r-- | eventmq/pub.py | 28 | ||||
| -rw-r--r-- | eventmq/router.py | 55 | ||||
| -rw-r--r-- | eventmq/scheduler.py | 12 | ||||
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 8 | ||||
| -rw-r--r-- | eventmq/tests/test_receiver.py | 2 | ||||
| -rw-r--r-- | eventmq/tests/test_router.py | 36 | ||||
| -rw-r--r-- | eventmq/tests/test_utils.py | 12 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 20 | ||||
| -rw-r--r-- | setup.py | 2 |
12 files changed, 101 insertions, 98 deletions
diff --git a/etc/eventmq.conf-dist b/etc/eventmq.conf-dist index 43ec6f2..d492269 100644 --- a/etc/eventmq.conf-dist +++ b/etc/eventmq.conf-dist | |||
| @@ -23,5 +23,5 @@ queues=[[50,"google"], [40,"pushes"], [10,"default"]] | |||
| 23 | concurrent_jobs=2 | 23 | concurrent_jobs=2 |
| 24 | 24 | ||
| 25 | [publisher] | 25 | [publisher] |
| 26 | publisher_incoming_addr=tcp://0.0.0.0:47298 | 26 | publisher_frontend_addr=tcp://0.0.0.0:47298 |
| 27 | publisher_outgoing_addr=tcp://0.0.0.0:47299 \ No newline at end of file | 27 | publisher_backend_addr=tcp://0.0.0.0:47299 \ No newline at end of file |
diff --git a/eventmq/conf.py b/eventmq/conf.py index cec0249..16b1f78 100644 --- a/eventmq/conf.py +++ b/eventmq/conf.py | |||
| @@ -73,8 +73,8 @@ WORKER_ADDR_FAILOVER = 'tcp://127.0.0.1:47290' | |||
| 73 | ADMINISTRATIVE_ADDR = 'tcp://127.0.0.1:47293' | 73 | ADMINISTRATIVE_ADDR = 'tcp://127.0.0.1:47293' |
| 74 | 74 | ||
| 75 | # PubSub | 75 | # PubSub |
| 76 | PUBLISHER_INCOMING_ADDR = 'tcp://127.0.0.1:47298' | 76 | PUBLISHER_FRONTEND_ADDR = 'tcp://127.0.0.1:47298' |
| 77 | PUBLISHER_OUTGOING_ADDR = 'tcp://127.0.0.1:47299' | 77 | PUBLISHER_BACKEND_ADDR = 'tcp://127.0.0.1:47299' |
| 78 | 78 | ||
| 79 | # How many jobs should the job manager concurrently handle? | 79 | # How many jobs should the job manager concurrently handle? |
| 80 | CONCURRENT_JOBS = 4 | 80 | CONCURRENT_JOBS = 4 |
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index ea5af07..831cee5 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -95,7 +95,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 95 | #: then telling the router that it is READY. The reply will be the unit | 95 | #: then telling the router that it is READY. The reply will be the unit |
| 96 | #: of work. | 96 | #: of work. |
| 97 | # Despite the name, jobs are received on this socket | 97 | # Despite the name, jobs are received on this socket |
| 98 | self.outgoing = Sender(name=self.name) | 98 | self.frontend = Sender(name=self.name) |
| 99 | 99 | ||
| 100 | self.poller = Poller() | 100 | self.poller = Poller() |
| 101 | 101 | ||
| @@ -149,8 +149,8 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 149 | self.received_disconnect = True | 149 | self.received_disconnect = True |
| 150 | continue | 150 | continue |
| 151 | 151 | ||
| 152 | if events.get(self.outgoing) == POLLIN: | 152 | if events.get(self.frontend) == POLLIN: |
| 153 | msg = self.outgoing.recv_multipart() | 153 | msg = self.frontend.recv_multipart() |
| 154 | self.process_message(msg) | 154 | self.process_message(msg) |
| 155 | 155 | ||
| 156 | # Call appropiate callbacks for each finished job | 156 | # Call appropiate callbacks for each finished job |
| @@ -238,7 +238,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 238 | send the READY command upstream to indicate that JobManager is ready | 238 | send the READY command upstream to indicate that JobManager is ready |
| 239 | for another REQUEST message. | 239 | for another REQUEST message. |
| 240 | """ | 240 | """ |
| 241 | sendmsg(self.outgoing, 'READY') | 241 | sendmsg(self.frontend, 'READY') |
| 242 | 242 | ||
| 243 | def send_reply(self, reply, msgid): | 243 | def send_reply(self, reply, msgid): |
| 244 | """ | 244 | """ |
| @@ -249,7 +249,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 249 | recipient (str): The recipient id for the ack | 249 | recipient (str): The recipient id for the ack |
| 250 | msgid: The unique id that we are acknowledging | 250 | msgid: The unique id that we are acknowledging |
| 251 | """ | 251 | """ |
| 252 | sendmsg(self.outgoing, 'REPLY', [reply, msgid]) | 252 | sendmsg(self.frontend, 'REPLY', [reply, msgid]) |
| 253 | 253 | ||
| 254 | def on_heartbeat(self, msgid, message): | 254 | def on_heartbeat(self, msgid, message): |
| 255 | """ | 255 | """ |
| @@ -277,8 +277,8 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 277 | self._workers.append(w) | 277 | self._workers.append(w) |
| 278 | 278 | ||
| 279 | def on_disconnect(self, msgid, msg): | 279 | def on_disconnect(self, msgid, msg): |
| 280 | sendmsg(self.outgoing, KBYE) | 280 | sendmsg(self.frontend, KBYE) |
| 281 | self.outgoing.unbind(conf.WORKER_ADDR) | 281 | self.frontend.unbind(conf.WORKER_ADDR) |
| 282 | super(JobManager, self).on_disconnect(msgid, msg) | 282 | super(JobManager, self).on_disconnect(msgid, msg) |
| 283 | 283 | ||
| 284 | def on_kbye(self, msgid, msg): | 284 | def on_kbye(self, msgid, msg): |
| @@ -295,7 +295,7 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 295 | 295 | ||
| 296 | def sigterm_handler(self, signum, frame): | 296 | def sigterm_handler(self, signum, frame): |
| 297 | logger.info('Shutting down..') | 297 | logger.info('Shutting down..') |
| 298 | sendmsg(self.outgoing, KBYE) | 298 | sendmsg(self.frontend, KBYE) |
| 299 | 299 | ||
| 300 | self.awaiting_startup_ack = False | 300 | self.awaiting_startup_ack = False |
| 301 | self.received_disconnect = True | 301 | self.received_disconnect = True |
diff --git a/eventmq/pub.py b/eventmq/pub.py index 3918580..487e8c5 100644 --- a/eventmq/pub.py +++ b/eventmq/pub.py | |||
| @@ -32,26 +32,26 @@ logger = logging.getLogger(__name__) | |||
| 32 | class Pub(HeartbeatMixin): | 32 | class Pub(HeartbeatMixin): |
| 33 | def __init__(self): | 33 | def __init__(self): |
| 34 | self.poller = poller.Poller() | 34 | self.poller = poller.Poller() |
| 35 | self.incoming = receiver.Receiver() | 35 | self.frontend = receiver.Receiver() |
| 36 | self.outgoing = publisher.Publisher() | 36 | self.backend = publisher.Publisher() |
| 37 | 37 | ||
| 38 | self.received_disconnect = False | 38 | self.received_disconnect = False |
| 39 | 39 | ||
| 40 | self.poller.register(self.incoming, poller.POLLIN) | 40 | self.poller.register(self.frontend, poller.POLLIN) |
| 41 | return | 41 | return |
| 42 | 42 | ||
| 43 | def start(self, | 43 | def start(self, |
| 44 | incoming_addr=conf.PUBLISHER_INCOMING_ADDR, | 44 | frontend_addr=conf.PUBLISHER_FRONTEND_ADDR, |
| 45 | outgoing_addr=conf.PUBLISHER_OUTGOING_ADDR): | 45 | backend_addr=conf.PUBLISHER_BACKEND_ADDR): |
| 46 | 46 | ||
| 47 | self.status = STATUS.starting | 47 | self.status = STATUS.starting |
| 48 | 48 | ||
| 49 | self.incoming.listen(incoming_addr) | 49 | self.frontend.listen(frontend_addr) |
| 50 | self.outgoing.listen(outgoing_addr) | 50 | self.backend.listen(backend_addr) |
| 51 | 51 | ||
| 52 | logger.info('Listening for publish requests on {}'.format( | 52 | logger.info('Listening for publish requests on {}'.format( |
| 53 | incoming_addr)) | 53 | frontend_addr)) |
| 54 | logger.info('Listening for subscribers on {}'.format(outgoing_addr)) | 54 | logger.info('Listening for subscribers on {}'.format(backend_addr)) |
| 55 | 55 | ||
| 56 | self._start_event_loop() | 56 | self._start_event_loop() |
| 57 | 57 | ||
| @@ -63,8 +63,8 @@ class Pub(HeartbeatMixin): | |||
| 63 | 63 | ||
| 64 | events = self.poller.poll() | 64 | events = self.poller.poll() |
| 65 | 65 | ||
| 66 | if events.get(self.incoming) == poller.POLLIN: | 66 | if events.get(self.frontend) == poller.POLLIN: |
| 67 | msg = self.incoming.recv_multipart() | 67 | msg = self.frontend.recv_multipart() |
| 68 | self.process_client_message(msg) | 68 | self.process_client_message(msg) |
| 69 | 69 | ||
| 70 | def process_client_message(self, msg): | 70 | def process_client_message(self, msg): |
| @@ -77,7 +77,7 @@ class Pub(HeartbeatMixin): | |||
| 77 | logger.debug('Got Publish command') | 77 | logger.debug('Got Publish command') |
| 78 | topic = msg[5] | 78 | topic = msg[5] |
| 79 | sub_message = msg[6] | 79 | sub_message = msg[6] |
| 80 | logger.debug(self.outgoing.publish(topic, sub_message)) | 80 | logger.debug(self.backend.publish(topic, sub_message)) |
| 81 | 81 | ||
| 82 | return | 82 | return |
| 83 | 83 | ||
| @@ -87,8 +87,8 @@ class Pub(HeartbeatMixin): | |||
| 87 | """ | 87 | """ |
| 88 | setup_logger('eventmq') | 88 | setup_logger('eventmq') |
| 89 | import_settings(section='publisher') | 89 | import_settings(section='publisher') |
| 90 | self.start(incoming_addr=conf.PUBLISHER_INCOMING_ADDR, | 90 | self.start(frontend_addr=conf.PUBLISHER_FRONTEND_ADDR, |
| 91 | outgoing_addr=conf.PUBLISHER_OUTGOING_ADDR) | 91 | backend_addr=conf.PUBLISHER_BACKEND_ADDR) |
| 92 | 92 | ||
| 93 | 93 | ||
| 94 | # Entry point for pip console scripts | 94 | # Entry point for pip console scripts |
diff --git a/eventmq/router.py b/eventmq/router.py index 2943e3c..4ff3732 100644 --- a/eventmq/router.py +++ b/eventmq/router.py | |||
| @@ -55,12 +55,15 @@ class Router(HeartbeatMixin): | |||
| 55 | 55 | ||
| 56 | self.poller = poller.Poller() | 56 | self.poller = poller.Poller() |
| 57 | 57 | ||
| 58 | self.incoming = receiver.Receiver() | 58 | #: Port clients connect on. |
| 59 | self.outgoing = receiver.Receiver() | 59 | self.frontend = receiver.Receiver() |
| 60 | #: Port job managers connect on | ||
| 61 | self.backend = receiver.Receiver() | ||
| 62 | #: Port for administrative commands | ||
| 60 | self.administrative_socket = receiver.Receiver() | 63 | self.administrative_socket = receiver.Receiver() |
| 61 | 64 | ||
| 62 | self.poller.register(self.incoming, poller.POLLIN) | 65 | self.poller.register(self.frontend, poller.POLLIN) |
| 63 | self.poller.register(self.outgoing, poller.POLLIN) | 66 | self.poller.register(self.backend, poller.POLLIN) |
| 64 | self.poller.register(self.administrative_socket, poller.POLLIN) | 67 | self.poller.register(self.administrative_socket, poller.POLLIN) |
| 65 | 68 | ||
| 66 | self.status = STATUS.ready | 69 | self.status = STATUS.ready |
| @@ -147,8 +150,8 @@ class Router(HeartbeatMixin): | |||
| 147 | """ | 150 | """ |
| 148 | self.status = STATUS.starting | 151 | self.status = STATUS.starting |
| 149 | 152 | ||
| 150 | self.incoming.listen(frontend_addr) | 153 | self.frontend.listen(frontend_addr) |
| 151 | self.outgoing.listen(backend_addr) | 154 | self.backend.listen(backend_addr) |
| 152 | self.administrative_socket.listen(administrative_addr) | 155 | self.administrative_socket.listen(administrative_addr) |
| 153 | 156 | ||
| 154 | self.status = STATUS.listening | 157 | self.status = STATUS.listening |
| @@ -169,12 +172,12 @@ class Router(HeartbeatMixin): | |||
| 169 | now = monotonic() | 172 | now = monotonic() |
| 170 | events = self.poller.poll() | 173 | events = self.poller.poll() |
| 171 | 174 | ||
| 172 | if events.get(self.incoming) == poller.POLLIN: | 175 | if events.get(self.frontend) == poller.POLLIN: |
| 173 | msg = self.incoming.recv_multipart() | 176 | msg = self.frontend.recv_multipart() |
| 174 | self.process_client_message(msg) | 177 | self.process_client_message(msg) |
| 175 | 178 | ||
| 176 | if events.get(self.outgoing) == poller.POLLIN: | 179 | if events.get(self.backend) == poller.POLLIN: |
| 177 | msg = self.outgoing.recv_multipart() | 180 | msg = self.backend.recv_multipart() |
| 178 | self.process_worker_message(msg) | 181 | self.process_worker_message(msg) |
| 179 | 182 | ||
| 180 | if events.get(self.administrative_socket) == poller.POLLIN: | 183 | if events.get(self.administrative_socket) == poller.POLLIN: |
| @@ -275,7 +278,7 @@ class Router(HeartbeatMixin): | |||
| 275 | self._meta['last_sent_heartbeat'] = monotonic() | 278 | self._meta['last_sent_heartbeat'] = monotonic() |
| 276 | 279 | ||
| 277 | for worker_id in self.workers: | 280 | for worker_id in self.workers: |
| 278 | self.send_heartbeat(self.outgoing, worker_id) | 281 | self.send_heartbeat(self.backend, worker_id) |
| 279 | 282 | ||
| 280 | def send_schedulers_heartbeats(self): | 283 | def send_schedulers_heartbeats(self): |
| 281 | """ | 284 | """ |
| @@ -284,7 +287,7 @@ class Router(HeartbeatMixin): | |||
| 284 | self._meta['last_sent_scheduler_heartbeat'] = monotonic() | 287 | self._meta['last_sent_scheduler_heartbeat'] = monotonic() |
| 285 | 288 | ||
| 286 | for scheduler_id in self.schedulers: | 289 | for scheduler_id in self.schedulers: |
| 287 | self.send_heartbeat(self.incoming, scheduler_id) | 290 | self.send_heartbeat(self.frontend, scheduler_id) |
| 288 | 291 | ||
| 289 | def on_heartbeat(self, sender, msgid, msg): | 292 | def on_heartbeat(self, sender, msgid, msg): |
| 290 | """ | 293 | """ |
| @@ -319,10 +322,10 @@ class Router(HeartbeatMixin): | |||
| 319 | 322 | ||
| 320 | if client_type == CLIENT_TYPE.worker: | 323 | if client_type == CLIENT_TYPE.worker: |
| 321 | self.add_worker(sender, queues) | 324 | self.add_worker(sender, queues) |
| 322 | self.send_ack(self.outgoing, sender, msgid) | 325 | self.send_ack(self.backend, sender, msgid) |
| 323 | elif client_type == CLIENT_TYPE.scheduler: | 326 | elif client_type == CLIENT_TYPE.scheduler: |
| 324 | self.add_scheduler(sender) | 327 | self.add_scheduler(sender) |
| 325 | self.send_ack(self.incoming, sender, msgid) | 328 | self.send_ack(self.frontend, sender, msgid) |
| 326 | 329 | ||
| 327 | def on_reply(self, sender, msgid, msg): | 330 | def on_reply(self, sender, msgid, msg): |
| 328 | """ | 331 | """ |
| @@ -353,10 +356,10 @@ class Router(HeartbeatMixin): | |||
| 353 | # Remove schedulers and send them a kbye | 356 | # Remove schedulers and send them a kbye |
| 354 | logger.info("Router preparing to disconnect...") | 357 | logger.info("Router preparing to disconnect...") |
| 355 | for scheduler in self.schedulers: | 358 | for scheduler in self.schedulers: |
| 356 | self.send_kbye(self.incoming, scheduler) | 359 | self.send_kbye(self.frontend, scheduler) |
| 357 | 360 | ||
| 358 | self.schedulers.clear() | 361 | self.schedulers.clear() |
| 359 | self.incoming.unbind(conf.FRONTEND_ADDR) | 362 | self.frontend.unbind(conf.FRONTEND_ADDR) |
| 360 | 363 | ||
| 361 | if len(self.waiting_messages) > 0: | 364 | if len(self.waiting_messages) > 0: |
| 362 | logger.info("Router processing messages in queue.") | 365 | logger.info("Router processing messages in queue.") |
| @@ -366,10 +369,10 @@ class Router(HeartbeatMixin): | |||
| 366 | self.process_worker_message(msg) | 369 | self.process_worker_message(msg) |
| 367 | 370 | ||
| 368 | for worker in self.workers.keys(): | 371 | for worker in self.workers.keys(): |
| 369 | self.send_kbye(self.outgoing, worker) | 372 | self.send_kbye(self.backend, worker) |
| 370 | 373 | ||
| 371 | self.workers.clear() | 374 | self.workers.clear() |
| 372 | self.outgoing.unbind(conf.BACKEND_ADDR) | 375 | self.backend.unbind(conf.BACKEND_ADDR) |
| 373 | 376 | ||
| 374 | # Loops event loops should check for this and break out | 377 | # Loops event loops should check for this and break out |
| 375 | self.received_disconnect = True | 378 | self.received_disconnect = True |
| @@ -400,7 +403,7 @@ class Router(HeartbeatMixin): | |||
| 400 | msg = self.waiting_messages[queue_name].peekleft() | 403 | msg = self.waiting_messages[queue_name].peekleft() |
| 401 | 404 | ||
| 402 | try: | 405 | try: |
| 403 | fwdmsg(self.outgoing, sender, msg) | 406 | fwdmsg(self.backend, sender, msg) |
| 404 | self.waiting_messages[queue_name].popleft() | 407 | self.waiting_messages[queue_name].popleft() |
| 405 | except exceptions.PeerGoneAwayError: | 408 | except exceptions.PeerGoneAwayError: |
| 406 | # Cleanup a workerg that cannot be contacted, leaving the | 409 | # Cleanup a workerg that cannot be contacted, leaving the |
| @@ -494,12 +497,12 @@ class Router(HeartbeatMixin): | |||
| 494 | 497 | ||
| 495 | # Rebuild the message to be sent to the worker. fwdmsg will | 498 | # Rebuild the message to be sent to the worker. fwdmsg will |
| 496 | # properly address the message. | 499 | # properly address the message. |
| 497 | fwdmsg(self.outgoing, worker_addr, ['', constants.PROTOCOL_VERSION, | 500 | fwdmsg(self.backend, worker_addr, ['', constants.PROTOCOL_VERSION, |
| 498 | 'REQUEST', msgid, ] + msg) | 501 | 'REQUEST', msgid, ] + msg) |
| 499 | 502 | ||
| 500 | self.workers[worker_addr]['available_slots'] -= 1 | 503 | self.workers[worker_addr]['available_slots'] -= 1 |
| 501 | # Acknowledgment of the request being submitted to the client | 504 | # Acknowledgment of the request being submitted to the client |
| 502 | sendmsg(self.incoming, sender, 'REPLY', | 505 | sendmsg(self.frontend, sender, 'REPLY', |
| 503 | (msgid,)) | 506 | (msgid,)) |
| 504 | except exceptions.PeerGoneAwayError: | 507 | except exceptions.PeerGoneAwayError: |
| 505 | logger.debug( | 508 | logger.debug( |
| @@ -760,7 +763,7 @@ class Router(HeartbeatMixin): | |||
| 760 | try: | 763 | try: |
| 761 | # Strips off the client id before forwarding because the | 764 | # Strips off the client id before forwarding because the |
| 762 | # scheduler isn't expecting it. | 765 | # scheduler isn't expecting it. |
| 763 | fwdmsg(self.incoming, scheduler_addr, original_msg[1:]) | 766 | fwdmsg(self.frontend, scheduler_addr, original_msg[1:]) |
| 764 | 767 | ||
| 765 | except exceptions.PeerGoneAwayError: | 768 | except exceptions.PeerGoneAwayError: |
| 766 | logger.debug("Scheduler {} has unexpectedly gone away. Trying " | 769 | logger.debug("Scheduler {} has unexpectedly gone away. Trying " |
| @@ -777,7 +780,7 @@ class Router(HeartbeatMixin): | |||
| 777 | try: | 780 | try: |
| 778 | # Strips off the client id before forwarding because the | 781 | # Strips off the client id before forwarding because the |
| 779 | # scheduler isn't expecting it. | 782 | # scheduler isn't expecting it. |
| 780 | fwdmsg(self.incoming, scheduler_addr, original_msg[1:]) | 783 | fwdmsg(self.frontend, scheduler_addr, original_msg[1:]) |
| 781 | 784 | ||
| 782 | except exceptions.PeerGoneAwayError: | 785 | except exceptions.PeerGoneAwayError: |
| 783 | logger.debug("Scheduler {} has unexpectedly gone away." | 786 | logger.debug("Scheduler {} has unexpectedly gone away." |
| @@ -902,8 +905,8 @@ class Router(HeartbeatMixin): | |||
| 902 | process receives a SIGHUP from the system. | 905 | process receives a SIGHUP from the system. |
| 903 | """ | 906 | """ |
| 904 | logger.info('Caught signame %s' % signum) | 907 | logger.info('Caught signame %s' % signum) |
| 905 | self.incoming.unbind(conf.FRONTEND_ADDR) | 908 | self.frontend.unbind(conf.FRONTEND_ADDR) |
| 906 | self.outgoing.unbind(conf.BACKEND_ADDR) | 909 | self.backend.unbind(conf.BACKEND_ADDR) |
| 907 | import_settings() | 910 | import_settings() |
| 908 | self.start(frontend_addr=conf.FRONTEND_ADDR, | 911 | self.start(frontend_addr=conf.FRONTEND_ADDR, |
| 909 | backend_addr=conf.BACKEND_ADDR, | 912 | backend_addr=conf.BACKEND_ADDR, |
diff --git a/eventmq/scheduler.py b/eventmq/scheduler.py index 4bac781..0d953a6 100644 --- a/eventmq/scheduler.py +++ b/eventmq/scheduler.py | |||
| @@ -56,7 +56,7 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 56 | logger.info('Initializing Scheduler...') | 56 | logger.info('Initializing Scheduler...') |
| 57 | import_settings() | 57 | import_settings() |
| 58 | super(Scheduler, self).__init__(*args, **kwargs) | 58 | super(Scheduler, self).__init__(*args, **kwargs) |
| 59 | self.outgoing = Sender() | 59 | self.frontend = Sender() |
| 60 | self._redis_server = None | 60 | self._redis_server = None |
| 61 | 61 | ||
| 62 | # contains dict of 4-item lists representing cron jobs key of this | 62 | # contains dict of 4-item lists representing cron jobs key of this |
| @@ -120,8 +120,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 120 | m_now = monotonic() | 120 | m_now = monotonic() |
| 121 | events = self.poller.poll() | 121 | events = self.poller.poll() |
| 122 | 122 | ||
| 123 | if events.get(self.outgoing) == POLLIN: | 123 | if events.get(self.frontend) == POLLIN: |
| 124 | msg = self.outgoing.recv_multipart() | 124 | msg = self.frontend.recv_multipart() |
| 125 | self.process_message(msg) | 125 | self.process_message(msg) |
| 126 | 126 | ||
| 127 | # TODO: distribute me! | 127 | # TODO: distribute me! |
| @@ -213,7 +213,7 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 213 | str: ID of the message | 213 | str: ID of the message |
| 214 | """ | 214 | """ |
| 215 | jobmsg = json.loads(jobmsg) | 215 | jobmsg = json.loads(jobmsg) |
| 216 | msgid = send_request(self.outgoing, jobmsg, queue=queue, | 216 | msgid = send_request(self.frontend, jobmsg, queue=queue, |
| 217 | reply_requested=True) | 217 | reply_requested=True) |
| 218 | 218 | ||
| 219 | return msgid | 219 | return msgid |
| @@ -221,8 +221,8 @@ class Scheduler(HeartbeatMixin, EMQPService): | |||
| 221 | def on_disconnect(self, msgid, message): | 221 | def on_disconnect(self, msgid, message): |
| 222 | logger.info("Received DISCONNECT request: {}".format(message)) | 222 | logger.info("Received DISCONNECT request: {}".format(message)) |
| 223 | self._redis_server.connection_pool.disconnect() | 223 | self._redis_server.connection_pool.disconnect() |
| 224 | sendmsg(self.outgoing, KBYE) | 224 | sendmsg(self.frontend, KBYE) |
| 225 | self.outgoing.unbind(conf.SCHEDULER_ADDR) | 225 | self.frontend.unbind(conf.SCHEDULER_ADDR) |
| 226 | super(Scheduler, self).on_disconnect(msgid, message) | 226 | super(Scheduler, self).on_disconnect(msgid, message) |
| 227 | 227 | ||
| 228 | def on_kbye(self, msgid, msg): | 228 | def on_kbye(self, msgid, msg): |
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py index db82843..82fed3f 100644 --- a/eventmq/tests/test_jobmanager.py +++ b/eventmq/tests/test_jobmanager.py | |||
| @@ -42,7 +42,7 @@ class TestCase(unittest.TestCase): | |||
| 42 | jm = jobmanager.JobManager() | 42 | jm = jobmanager.JobManager() |
| 43 | jm.send_ready() | 43 | jm.send_ready() |
| 44 | 44 | ||
| 45 | sndmsg_mock.assert_called_with(jm.outgoing, 'READY') | 45 | sndmsg_mock.assert_called_with(jm.frontend, 'READY') |
| 46 | 46 | ||
| 47 | @mock.patch('multiprocessing.pool.Pool.close') | 47 | @mock.patch('multiprocessing.pool.Pool.close') |
| 48 | @mock.patch('eventmq.jobmanager.JobManager.process_message') | 48 | @mock.patch('eventmq.jobmanager.JobManager.process_message') |
| @@ -55,7 +55,7 @@ class TestCase(unittest.TestCase): | |||
| 55 | pool_close_mock): | 55 | pool_close_mock): |
| 56 | jm = jobmanager.JobManager() | 56 | jm = jobmanager.JobManager() |
| 57 | maybe_send_hb_mock.return_value = False | 57 | maybe_send_hb_mock.return_value = False |
| 58 | poll_mock.return_value = {jm.outgoing: jobmanager.POLLIN} | 58 | poll_mock.return_value = {jm.frontend: jobmanager.POLLIN} |
| 59 | sender_mock.return_value = [1, 2, 3] | 59 | sender_mock.return_value = [1, 2, 3] |
| 60 | 60 | ||
| 61 | jm._start_event_loop() | 61 | jm._start_event_loop() |
| @@ -106,7 +106,7 @@ class TestCase(unittest.TestCase): | |||
| 106 | socket_mock.return_value = True | 106 | socket_mock.return_value = True |
| 107 | 107 | ||
| 108 | jm = jobmanager.JobManager() | 108 | jm = jobmanager.JobManager() |
| 109 | jm.outgoing.status = constants.STATUS.listening | 109 | jm.frontend.status = constants.STATUS.listening |
| 110 | jm.on_disconnect(msgid, msg) | 110 | jm.on_disconnect(msgid, msg) |
| 111 | self.assertTrue(jm.received_disconnect, "Did not receive disconnect.") | 111 | self.assertTrue(jm.received_disconnect, "Did not receive disconnect.") |
| 112 | 112 | ||
| @@ -129,7 +129,7 @@ class TestCase(unittest.TestCase): | |||
| 129 | 129 | ||
| 130 | jm.sigterm_handler(13231, "FRAMEY the evil frame") | 130 | jm.sigterm_handler(13231, "FRAMEY the evil frame") |
| 131 | 131 | ||
| 132 | sendmsg_mock.assert_called_with(jm.outgoing, constants.KBYE) | 132 | sendmsg_mock.assert_called_with(jm.frontend, constants.KBYE) |
| 133 | self.assertFalse(jm.awaiting_startup_ack) | 133 | self.assertFalse(jm.awaiting_startup_ack) |
| 134 | self.assertTrue(jm.received_disconnect) | 134 | self.assertTrue(jm.received_disconnect) |
| 135 | 135 | ||
diff --git a/eventmq/tests/test_receiver.py b/eventmq/tests/test_receiver.py index 4ee7170..49eef69 100644 --- a/eventmq/tests/test_receiver.py +++ b/eventmq/tests/test_receiver.py | |||
| @@ -24,7 +24,7 @@ class TestCase(unittest.TestCase): | |||
| 24 | self.zcontext = zmq.Context.instance() | 24 | self.zcontext = zmq.Context.instance() |
| 25 | 25 | ||
| 26 | self.router = router.Router() | 26 | self.router = router.Router() |
| 27 | self.receiver = self.router.incoming | 27 | self.receiver = self.router.frontend |
| 28 | self.sender = sender.Sender() | 28 | self.sender = sender.Sender() |
| 29 | 29 | ||
| 30 | def test_send_multipart_unicode(self): | 30 | def test_send_multipart_unicode(self): |
diff --git a/eventmq/tests/test_router.py b/eventmq/tests/test_router.py index fdb02d0..10441eb 100644 --- a/eventmq/tests/test_router.py +++ b/eventmq/tests/test_router.py | |||
| @@ -29,16 +29,16 @@ class TestCase(unittest.TestCase): | |||
| 29 | def setUp(self): | 29 | def setUp(self): |
| 30 | self.router = router.Router(skip_signal=True) | 30 | self.router = router.Router(skip_signal=True) |
| 31 | self.router.zcontext = mock.Mock(spec=zmq.Context) | 31 | self.router.zcontext = mock.Mock(spec=zmq.Context) |
| 32 | self.router.incoming = mock.Mock(spec=receiver.Receiver) | 32 | self.router.frontend = mock.Mock(spec=receiver.Receiver) |
| 33 | self.router.outgoing = mock.Mock(spec=receiver.Receiver) | 33 | self.router.backend = mock.Mock(spec=receiver.Receiver) |
| 34 | 34 | ||
| 35 | @mock.patch('eventmq.receiver.zmq.Socket.bind') | 35 | @mock.patch('eventmq.receiver.zmq.Socket.bind') |
| 36 | @mock.patch('eventmq.router.Router._start_event_loop') | 36 | @mock.patch('eventmq.router.Router._start_event_loop') |
| 37 | def test_start(self, event_loop_mock, zsocket_bind_mock): | 37 | def test_start(self, event_loop_mock, zsocket_bind_mock): |
| 38 | # Test default args | 38 | # Test default args |
| 39 | self.router.start() | 39 | self.router.start() |
| 40 | self.router.incoming.listen.assert_called_with(conf.FRONTEND_ADDR) | 40 | self.router.frontend.listen.assert_called_with(conf.FRONTEND_ADDR) |
| 41 | self.router.outgoing.listen.assert_called_with(conf.BACKEND_ADDR) | 41 | self.router.backend.listen.assert_called_with(conf.BACKEND_ADDR) |
| 42 | self.assertEqual(self.router.status, constants.STATUS.listening) | 42 | self.assertEqual(self.router.status, constants.STATUS.listening) |
| 43 | 43 | ||
| 44 | # Test invalid args | 44 | # Test invalid args |
| @@ -55,7 +55,7 @@ class TestCase(unittest.TestCase): | |||
| 55 | sender_id, inform_msgid, [queues, 'worker']) | 55 | sender_id, inform_msgid, [queues, 'worker']) |
| 56 | 56 | ||
| 57 | self.router.send_ack.assert_called_with( | 57 | self.router.send_ack.assert_called_with( |
| 58 | self.router.outgoing, sender_id, inform_msgid) | 58 | self.router.backend, sender_id, inform_msgid) |
| 59 | 59 | ||
| 60 | self.router.add_worker.assert_called_with( | 60 | self.router.add_worker.assert_called_with( |
| 61 | sender_id, [(32, 'top'), (23, 'drop'), (12, 'shop')]) | 61 | sender_id, [(32, 'top'), (23, 'drop'), (12, 'shop')]) |
| @@ -75,7 +75,7 @@ class TestCase(unittest.TestCase): | |||
| 75 | sender_id, inform_msgid, [queues, constants.CLIENT_TYPE.worker]) | 75 | sender_id, inform_msgid, [queues, constants.CLIENT_TYPE.worker]) |
| 76 | 76 | ||
| 77 | self.router.send_ack.assert_called_with( | 77 | self.router.send_ack.assert_called_with( |
| 78 | self.router.outgoing, sender_id, inform_msgid) | 78 | self.router.backend, sender_id, inform_msgid) |
| 79 | self.router.add_worker.assert_called_with( | 79 | self.router.add_worker.assert_called_with( |
| 80 | sender_id, [(10, 'default'), ]) | 80 | sender_id, [(10, 'default'), ]) |
| 81 | 81 | ||
| @@ -146,10 +146,10 @@ class TestCase(unittest.TestCase): | |||
| 146 | 146 | ||
| 147 | generate_msgid_mock.return_value = ack_msgid | 147 | generate_msgid_mock.return_value = ack_msgid |
| 148 | 148 | ||
| 149 | self.router.send_ack(self.router.outgoing, sender_id, orig_msgid) | 149 | self.router.send_ack(self.router.backend, sender_id, orig_msgid) |
| 150 | 150 | ||
| 151 | # Verify that an ACK was sent for the INFORM | 151 | # Verify that an ACK was sent for the INFORM |
| 152 | self.router.outgoing.send_multipart.assert_called_with( | 152 | self.router.backend.send_multipart.assert_called_with( |
| 153 | ('ACK', ack_msgid, orig_msgid), | 153 | ('ACK', ack_msgid, orig_msgid), |
| 154 | constants.PROTOCOL_VERSION, _recipient_id=sender_id) | 154 | constants.PROTOCOL_VERSION, _recipient_id=sender_id) |
| 155 | 155 | ||
| @@ -168,9 +168,9 @@ class TestCase(unittest.TestCase): | |||
| 168 | 168 | ||
| 169 | generate_msgid_mock.return_value = msgid | 169 | generate_msgid_mock.return_value = msgid |
| 170 | 170 | ||
| 171 | self.router.send_heartbeat(self.router.incoming, recipient_id) | 171 | self.router.send_heartbeat(self.router.frontend, recipient_id) |
| 172 | 172 | ||
| 173 | self.router.incoming.send_multipart.assert_called_with( | 173 | self.router.frontend.send_multipart.assert_called_with( |
| 174 | ('HEARTBEAT', msgid, str(ts)), constants.PROTOCOL_VERSION, | 174 | ('HEARTBEAT', msgid, str(ts)), constants.PROTOCOL_VERSION, |
| 175 | _recipient_id=recipient_id) | 175 | _recipient_id=recipient_id) |
| 176 | 176 | ||
| @@ -197,8 +197,8 @@ class TestCase(unittest.TestCase): | |||
| 197 | self.assertGreater(self.router._meta['last_sent_heartbeat'], 0) | 197 | self.assertGreater(self.router._meta['last_sent_heartbeat'], 0) |
| 198 | 198 | ||
| 199 | send_heartbeat_mock.assert_has_calls( | 199 | send_heartbeat_mock.assert_has_calls( |
| 200 | [mock.call(self.router.outgoing, 'w1'), | 200 | [mock.call(self.router.backend, 'w1'), |
| 201 | mock.call(self.router.outgoing, 'w2')], any_order=True) | 201 | mock.call(self.router.backend, 'w2')], any_order=True) |
| 202 | 202 | ||
| 203 | @mock.patch('eventmq.router.Router.send_heartbeat') | 203 | @mock.patch('eventmq.router.Router.send_heartbeat') |
| 204 | def test_send_schedulers_heartbeats(self, send_hb_mock): | 204 | def test_send_schedulers_heartbeats(self, send_hb_mock): |
| @@ -215,7 +215,7 @@ class TestCase(unittest.TestCase): | |||
| 215 | 215 | ||
| 216 | self.assertGreater( | 216 | self.assertGreater( |
| 217 | self.router._meta['last_sent_scheduler_heartbeat'], 0) | 217 | self.router._meta['last_sent_scheduler_heartbeat'], 0) |
| 218 | send_hb_mock.assert_called_with(self.router.incoming, scheduler_id) | 218 | send_hb_mock.assert_called_with(self.router.frontend, scheduler_id) |
| 219 | 219 | ||
| 220 | def test_on_disconnect(self): | 220 | def test_on_disconnect(self): |
| 221 | self.assertFalse(self.router.received_disconnect) | 221 | self.assertFalse(self.router.received_disconnect) |
| @@ -245,7 +245,7 @@ class TestCase(unittest.TestCase): | |||
| 245 | 245 | ||
| 246 | self.router.on_ready(worker_id, msgid, msg) | 246 | self.router.on_ready(worker_id, msgid, msg) |
| 247 | 247 | ||
| 248 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker_id, | 248 | fwdmsg_mock.assert_called_with(self.router.backend, worker_id, |
| 249 | waiting_msg) | 249 | waiting_msg) |
| 250 | 250 | ||
| 251 | self.router.on_ready(worker_id, msgid + 'a', msg) | 251 | self.router.on_ready(worker_id, msgid + 'a', msg) |
| @@ -296,19 +296,19 @@ class TestCase(unittest.TestCase): | |||
| 296 | # Forward waiting_msg1 | 296 | # Forward waiting_msg1 |
| 297 | ready_msgid1 = 'ready23' | 297 | ready_msgid1 = 'ready23' |
| 298 | self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) | 298 | self.router.on_ready(worker1_id, ready_msgid1, ['READY', ready_msgid1]) |
| 299 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker1_id, | 299 | fwdmsg_mock.assert_called_with(self.router.backend, worker1_id, |
| 300 | waiting_msg1) | 300 | waiting_msg1) |
| 301 | 301 | ||
| 302 | # Forward waiting_msg3 -- blu is a higher priority for worker2 | 302 | # Forward waiting_msg3 -- blu is a higher priority for worker2 |
| 303 | ready_msgid3 = 'ready19' | 303 | ready_msgid3 = 'ready19' |
| 304 | self.router.on_ready(worker2_id, ready_msgid3, ['READY', ready_msgid3]) | 304 | self.router.on_ready(worker2_id, ready_msgid3, ['READY', ready_msgid3]) |
| 305 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, | 305 | fwdmsg_mock.assert_called_with(self.router.backend, worker2_id, |
| 306 | waiting_msg3) | 306 | waiting_msg3) |
| 307 | 307 | ||
| 308 | # Forward waiting_msg2 | 308 | # Forward waiting_msg2 |
| 309 | ready_msgid2 = 'ready5' | 309 | ready_msgid2 = 'ready5' |
| 310 | self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2]) | 310 | self.router.on_ready(worker2_id, ready_msgid2, ['READY', ready_msgid2]) |
| 311 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker2_id, | 311 | fwdmsg_mock.assert_called_with(self.router.backend, worker2_id, |
| 312 | waiting_msg2) | 312 | waiting_msg2) |
| 313 | 313 | ||
| 314 | # There should be no keys because the code checks for their existence | 314 | # There should be no keys because the code checks for their existence |
| @@ -348,7 +348,7 @@ class TestCase(unittest.TestCase): | |||
| 348 | 348 | ||
| 349 | # Router accepts job for 1 available slot | 349 | # Router accepts job for 1 available slot |
| 350 | self.router.on_request(client_id, msgid, msg) | 350 | self.router.on_request(client_id, msgid, msg) |
| 351 | fwdmsg_mock.assert_called_with(self.router.outgoing, worker_id, | 351 | fwdmsg_mock.assert_called_with(self.router.backend, worker_id, |
| 352 | ['', constants.PROTOCOL_VERSION, | 352 | ['', constants.PROTOCOL_VERSION, |
| 353 | 'REQUEST', msgid, ] + msg) | 353 | 'REQUEST', msgid, ] + msg) |
| 354 | self.assertEqual(self.router.workers[worker_id]['available_slots'], 0) | 354 | self.assertEqual(self.router.workers[worker_id]['available_slots'], 0) |
diff --git a/eventmq/tests/test_utils.py b/eventmq/tests/test_utils.py index 03da18a..996cddb 100644 --- a/eventmq/tests/test_utils.py +++ b/eventmq/tests/test_utils.py | |||
| @@ -128,13 +128,13 @@ class SettingsTestCase(unittest.TestCase): | |||
| 128 | class EMQPServiceTestCase(unittest.TestCase): | 128 | class EMQPServiceTestCase(unittest.TestCase): |
| 129 | 129 | ||
| 130 | # pretend to be an emq socket | 130 | # pretend to be an emq socket |
| 131 | outgoing = 'some-outgoing-socket' | 131 | frontend = 'some-frontend-socket' |
| 132 | 132 | ||
| 133 | def get_worker(self): | 133 | def get_worker(self): |
| 134 | """return an EMQPService mimicking a worker""" | 134 | """return an EMQPService mimicking a worker""" |
| 135 | obj = classes.EMQPService() | 135 | obj = classes.EMQPService() |
| 136 | obj.SERVICE_TYPE = constants.CLIENT_TYPE.worker | 136 | obj.SERVICE_TYPE = constants.CLIENT_TYPE.worker |
| 137 | obj.outgoing = self.outgoing | 137 | obj.frontend = self.frontend |
| 138 | obj._meta = { | 138 | obj._meta = { |
| 139 | 'last_sent_heartbeat': 0 | 139 | 'last_sent_heartbeat': 0 |
| 140 | } | 140 | } |
| @@ -145,7 +145,7 @@ class EMQPServiceTestCase(unittest.TestCase): | |||
| 145 | """return an EMQPService mimicking a scheduler""" | 145 | """return an EMQPService mimicking a scheduler""" |
| 146 | obj = classes.EMQPService() | 146 | obj = classes.EMQPService() |
| 147 | obj.SERVICE_TYPE = constants.CLIENT_TYPE.scheduler | 147 | obj.SERVICE_TYPE = constants.CLIENT_TYPE.scheduler |
| 148 | obj.outgoing = self.outgoing | 148 | obj.frontend = self.frontend |
| 149 | obj._meta = { | 149 | obj._meta = { |
| 150 | 'last_sent_heartbeat': 0 | 150 | 'last_sent_heartbeat': 0 |
| 151 | } | 151 | } |
| @@ -168,7 +168,7 @@ class EMQPServiceTestCase(unittest.TestCase): | |||
| 168 | obj.send_inform() | 168 | obj.send_inform() |
| 169 | 169 | ||
| 170 | sendmsg_mock.assert_called_with( | 170 | sendmsg_mock.assert_called_with( |
| 171 | self.outgoing, 'INFORM', | 171 | self.frontend, 'INFORM', |
| 172 | ['', constants.CLIENT_TYPE.worker]) | 172 | ['', constants.CLIENT_TYPE.worker]) |
| 173 | 173 | ||
| 174 | @mock.patch('eventmq.utils.classes.sendmsg') | 174 | @mock.patch('eventmq.utils.classes.sendmsg') |
| @@ -178,7 +178,7 @@ class EMQPServiceTestCase(unittest.TestCase): | |||
| 178 | obj.send_inform(queues="this shouldn't matter") | 178 | obj.send_inform(queues="this shouldn't matter") |
| 179 | 179 | ||
| 180 | sendmsg_mock.assert_called_with( | 180 | sendmsg_mock.assert_called_with( |
| 181 | self.outgoing, 'INFORM', | 181 | self.frontend, 'INFORM', |
| 182 | ['', constants.CLIENT_TYPE.scheduler] | 182 | ['', constants.CLIENT_TYPE.scheduler] |
| 183 | ) | 183 | ) |
| 184 | 184 | ||
| @@ -189,7 +189,7 @@ class EMQPServiceTestCase(unittest.TestCase): | |||
| 189 | obj.send_inform(queues=([10, 'push'], [7, 'email'], | 189 | obj.send_inform(queues=([10, 'push'], [7, 'email'], |
| 190 | [3, 'default'])) | 190 | [3, 'default'])) |
| 191 | sendmsg_mock.asert_called_with( | 191 | sendmsg_mock.asert_called_with( |
| 192 | 'some-outgoing-socket', 'INFORM', | 192 | 'some-frontend-socket', 'INFORM', |
| 193 | ['[10, "push"],[7, "email"],[3, "default"]', | 193 | ['[10, "push"],[7, "email"],[3, "default"]', |
| 194 | constants.CLIENT_TYPE.worker] | 194 | constants.CLIENT_TYPE.worker] |
| 195 | ) | 195 | ) |
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index 39c600b..b9e09fb 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -42,10 +42,10 @@ class EMQPService(object): | |||
| 42 | Also implements utlitiy methods for managing long-running processes. | 42 | Also implements utlitiy methods for managing long-running processes. |
| 43 | 43 | ||
| 44 | To use you must define: | 44 | To use you must define: |
| 45 | - `self.outgoing` - socket where messages can be sent to the Router | 45 | - `self.frontend` - socket where messages can be sent to the Router |
| 46 | - `self.SERVICE_TYPE` - defines the service type for INFORM. See | 46 | - `self.SERVICE_TYPE` - defines the service type for INFORM. See |
| 47 | :meth:`send_inform` for more information. | 47 | :meth:`send_inform` for more information. |
| 48 | - `self.poller` - the poller that `self.outgoing` will be using. | 48 | - `self.poller` - the poller that `self.frontend` will be using. |
| 49 | Usually: `self.poller = eventmq.poller.Poller()` | 49 | Usually: `self.poller = eventmq.poller.Poller()` |
| 50 | 50 | ||
| 51 | When messages are received from the router, they are processed in | 51 | When messages are received from the router, they are processed in |
| @@ -99,7 +99,7 @@ class EMQPService(object): | |||
| 99 | else: | 99 | else: |
| 100 | queues = '' | 100 | queues = '' |
| 101 | 101 | ||
| 102 | msgid = sendmsg(self.outgoing, 'INFORM', [ | 102 | msgid = sendmsg(self.frontend, 'INFORM', [ |
| 103 | queues, | 103 | queues, |
| 104 | self.SERVICE_TYPE | 104 | self.SERVICE_TYPE |
| 105 | ]) | 105 | ]) |
| @@ -116,7 +116,7 @@ class EMQPService(object): | |||
| 116 | also run on a reset are here. | 116 | also run on a reset are here. |
| 117 | """ | 117 | """ |
| 118 | # Look for incoming events | 118 | # Look for incoming events |
| 119 | self.poller.register(self.outgoing, poller.POLLIN) | 119 | self.poller.register(self.frontend, poller.POLLIN) |
| 120 | self.awaiting_startup_ack = False | 120 | self.awaiting_startup_ack = False |
| 121 | self.received_disconnect = False | 121 | self.received_disconnect = False |
| 122 | self.should_reset = False | 122 | self.should_reset = False |
| @@ -132,7 +132,7 @@ class EMQPService(object): | |||
| 132 | """ | 132 | """ |
| 133 | while not self.received_disconnect: | 133 | while not self.received_disconnect: |
| 134 | self.status = constants.STATUS.connecting | 134 | self.status = constants.STATUS.connecting |
| 135 | self.outgoing.connect(addr) | 135 | self.frontend.connect(addr) |
| 136 | 136 | ||
| 137 | # Setting this to false is how the loop is broken and the | 137 | # Setting this to false is how the loop is broken and the |
| 138 | # _event_loop is started. | 138 | # _event_loop is started. |
| @@ -149,8 +149,8 @@ class EMQPService(object): | |||
| 149 | # multiplied by 1000 to get seconds | 149 | # multiplied by 1000 to get seconds |
| 150 | events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000) | 150 | events = self.poller.poll(conf.RECONNECT_TIMEOUT * 1000) |
| 151 | 151 | ||
| 152 | if self.outgoing in events: # A message from the Router! | 152 | if self.frontend in events: # A message from the Router! |
| 153 | msg = self.outgoing.recv_multipart() | 153 | msg = self.frontend.recv_multipart() |
| 154 | # TODO This will silently drop messages that aren't | 154 | # TODO This will silently drop messages that aren't |
| 155 | # ACK/DISCONNECT | 155 | # ACK/DISCONNECT |
| 156 | if msg[2] == "ACK" or msg[2] == "DISCONNECT": | 156 | if msg[2] == "ACK" or msg[2] == "DISCONNECT": |
| @@ -176,10 +176,10 @@ class EMQPService(object): | |||
| 176 | Resets the current connection by closing and reopening the socket | 176 | Resets the current connection by closing and reopening the socket |
| 177 | """ | 177 | """ |
| 178 | # Unregister the old socket from the poller | 178 | # Unregister the old socket from the poller |
| 179 | self.poller.unregister(self.outgoing) | 179 | self.poller.unregister(self.frontend) |
| 180 | 180 | ||
| 181 | # Polish up a new socket to use | 181 | # Polish up a new socket to use |
| 182 | self.outgoing.rebuild() | 182 | self.frontend.rebuild() |
| 183 | 183 | ||
| 184 | # Prepare the device to connect again | 184 | # Prepare the device to connect again |
| 185 | self._setup() | 185 | self._setup() |
| @@ -323,7 +323,7 @@ class HeartbeatMixin(object): | |||
| 323 | # Send a HEARTBEAT if necessary | 323 | # Send a HEARTBEAT if necessary |
| 324 | if now - self._meta['last_sent_heartbeat'] >= \ | 324 | if now - self._meta['last_sent_heartbeat'] >= \ |
| 325 | conf.HEARTBEAT_INTERVAL: | 325 | conf.HEARTBEAT_INTERVAL: |
| 326 | self.send_heartbeat(self.outgoing) | 326 | self.send_heartbeat(self.frontend) |
| 327 | 327 | ||
| 328 | # Do something about any missed HEARTBEAT, if we have nothing | 328 | # Do something about any missed HEARTBEAT, if we have nothing |
| 329 | # waiting on the socket | 329 | # waiting on the socket |
| @@ -61,6 +61,6 @@ setup( | |||
| 61 | 'bin/emq-jobmanager', | 61 | 'bin/emq-jobmanager', |
| 62 | 'bin/emq-router', | 62 | 'bin/emq-router', |
| 63 | 'bin/emq-scheduler', | 63 | 'bin/emq-scheduler', |
| 64 | 'bin/emq-pubsub' | 64 | 'bin/emq-publisher' |
| 65 | ], | 65 | ], |
| 66 | ) | 66 | ) |