diff options
| author | David Hurst | 2016-12-05 16:52:36 -0700 |
|---|---|---|
| committer | GitHub | 2016-12-05 16:52:36 -0700 |
| commit | 24026ef566372ffbe55360a214f270a18dd5d4da (patch) | |
| tree | ba57954ce3d50dde3703f630f708f94e06e03d47 | |
| parent | 27f071b7e470bc5b31193e4887e84c63bddd5d43 (diff) | |
| parent | 664094d1eb15cccc3ac16a5f7cf5640cfea39ff5 (diff) | |
| download | eventmq-0.2.7.tar.gz eventmq-0.2.7.zip | |
Merge pull request #56 from com4/jobmanager-graceful-shutdown0.2.7
Implement cleanup for SIGTERM, SIGQUIT, & SIGINT
| -rw-r--r-- | eventmq/jobmanager.py | 10 | ||||
| -rw-r--r-- | eventmq/tests/test_jobmanager.py | 10 | ||||
| -rw-r--r-- | eventmq/utils/classes.py | 7 |
3 files changed, 25 insertions, 2 deletions
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index f3ce5e9..9c696ff 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py | |||
| @@ -79,6 +79,9 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 79 | if not kwargs.pop('skip_signal', False): | 79 | if not kwargs.pop('skip_signal', False): |
| 80 | # handle any sighups by reloading config | 80 | # handle any sighups by reloading config |
| 81 | signal.signal(signal.SIGHUP, self.sighup_handler) | 81 | signal.signal(signal.SIGHUP, self.sighup_handler) |
| 82 | signal.signal(signal.SIGTERM, self.sigterm_handler) | ||
| 83 | signal.signal(signal.SIGINT, self.sigterm_handler) | ||
| 84 | signal.signal(signal.SIGQUIT, self.sigterm_handler) | ||
| 82 | 85 | ||
| 83 | #: JobManager starts out by INFORMing the router of it's existence, | 86 | #: JobManager starts out by INFORMing the router of it's existence, |
| 84 | #: then telling the router that it is READY. The reply will be the unit | 87 | #: then telling the router that it is READY. The reply will be the unit |
| @@ -232,6 +235,13 @@ class JobManager(HeartbeatMixin, EMQPService): | |||
| 232 | import_settings(section='jobmanager') | 235 | import_settings(section='jobmanager') |
| 233 | self.start(addr=conf.WORKER_ADDR) | 236 | self.start(addr=conf.WORKER_ADDR) |
| 234 | 237 | ||
| 238 | def sigterm_handler(self, signum, frame): | ||
| 239 | logger.info('Shutting down..') | ||
| 240 | sendmsg(self.outgoing, KBYE) | ||
| 241 | |||
| 242 | self.awaiting_startup_ack = False | ||
| 243 | self.received_disconnect = True | ||
| 244 | |||
| 235 | def jobmanager_main(self, broker_addr=None): | 245 | def jobmanager_main(self, broker_addr=None): |
| 236 | """ | 246 | """ |
| 237 | Kick off jobmanager with logging and settings import | 247 | Kick off jobmanager with logging and settings import |
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py index 370eb88..4c0d7ae 100644 --- a/eventmq/tests/test_jobmanager.py +++ b/eventmq/tests/test_jobmanager.py | |||
| @@ -134,6 +134,16 @@ class TestCase(unittest.TestCase): | |||
| 134 | addr=conf.WORKER_ADDR, | 134 | addr=conf.WORKER_ADDR, |
| 135 | ) | 135 | ) |
| 136 | 136 | ||
| 137 | @mock.patch('eventmq.jobmanager.sendmsg') | ||
| 138 | def test_sigterm_handler(self, sendmsg_mock): | ||
| 139 | jm = jobmanager.JobManager() | ||
| 140 | |||
| 141 | jm.sigterm_handler(13231, "FRAMEY the evil frame") | ||
| 142 | |||
| 143 | sendmsg_mock.assert_called_with(jm.outgoing, constants.KBYE) | ||
| 144 | self.assertFalse(jm.awaiting_startup_ack) | ||
| 145 | self.assertTrue(jm.received_disconnect) | ||
| 146 | |||
| 137 | @mock.patch('eventmq.jobmanager.JobManager.start') | 147 | @mock.patch('eventmq.jobmanager.JobManager.start') |
| 138 | @mock.patch('eventmq.jobmanager.import_settings') | 148 | @mock.patch('eventmq.jobmanager.import_settings') |
| 139 | def test_jobmanager_main(self, import_settings_mock, start_mock): | 149 | def test_jobmanager_main(self, import_settings_mock, start_mock): |
diff --git a/eventmq/utils/classes.py b/eventmq/utils/classes.py index be38e80..3cc3b49 100644 --- a/eventmq/utils/classes.py +++ b/eventmq/utils/classes.py | |||
| @@ -157,8 +157,11 @@ class EMQPService(object): | |||
| 157 | self.process_message(msg) | 157 | self.process_message(msg) |
| 158 | 158 | ||
| 159 | self.status = constants.STATUS.connected | 159 | self.status = constants.STATUS.connected |
| 160 | logger.info('Starting event loop...') | 160 | |
| 161 | self._start_event_loop() | 161 | if not self.received_disconnect: |
| 162 | logger.info('Starting event loop...') | ||
| 163 | self._start_event_loop() | ||
| 164 | |||
| 162 | # When we return, soemthing has gone wrong and try to reconnect | 165 | # When we return, soemthing has gone wrong and try to reconnect |
| 163 | # unless self.received_disconnect is True | 166 | # unless self.received_disconnect is True |
| 164 | if not self.received_disconnect: | 167 | if not self.received_disconnect: |