From bbbb6d1de4f9424040b416aefe9737f6cdcd044d Mon Sep 17 00:00:00 2001 From: Karamaz0V1 Date: Tue, 11 Sep 2018 17:42:14 +0200 Subject: [PATCH] Refactor Supervisor --- supervisor.py | 145 +++++++++----------------------------------------- 1 file changed, 24 insertions(+), 121 deletions(-) diff --git a/supervisor.py b/supervisor.py index 07f1fd8..bada05e 100644 --- a/supervisor.py +++ b/supervisor.py @@ -71,7 +71,7 @@ def run(expe_file): log.info('Run test {}'.format(expe_file)) test = expe_file.read() - ### Stage test + ### Stage experience expe_file.stage(test) ### Load protocol @@ -88,9 +88,11 @@ def run(expe_file): raise TestError(err) log.info('{} test protocol loaded'.format(experience)) - ### Write hahes + ### Get hashes test['hashes'] = experience.get_hashes() - test['report'] = create_report(start_time) + test['report'] = create_report(experience, start_time) + + ### Stage experience expe_file.stage(test) experience.set_results_base_name(expe_file.get_result_path()) @@ -106,78 +108,33 @@ def run(expe_file): end_time = time.time() - ### Write complete report - report = create_report(start_time, end_time) - ressources = OrderedDict() - ressources['ram'] = None - ressources['proccess_time'] = experience.get_process_time() - report['ressources'] = ressources - test['report'] = report + ### Get complete report + test['report'] = create_report(experience, start_time, end_time) - ### Write results + ### Get results test['results'] = experience.get_results() + + ### Write experience expe_file.result(test) ### End of test log.info('Test complete') - return - - - ### Create output names - oname = '{}_{}'.format(expe_file.stem, expe_hashes['global'][:6]) - oname_yml = oname + '.yml' - oname_tif = oname + '.tif' - - ### Create partial report - expe_report = create_report(kronos) - - ### Stage expe - log.info('Staging test') - write_expe_file(STAGING_DIR / oname_yml, expe, expe_hashes, expe_report) - expe_file.unlink() - ### Compute descriptors - log.info('Compute descriptors') - try: - descriptors = compute_descriptors(expe) - except Exception as e: - kronos.time('description') - expe_report = create_report(kronos) - (STAGING_DIR / oname_yml).unlink() - write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'description', e) - raise TestError('Error occured during description') +def create_report(experience, stime=None, etime=None): + expe_report = OrderedDict() + expe_report['supervisor'] = os.uname()[1] - kronos.time('description') + # Dates + for datek, timev in zip(('start_date', 'end_date'), (stime, etime)): + expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None - ### Compute classification - log.info('Classify data') - try: - classification = compute_classification(expe, descriptors) - except Exception as e: - kronos.time('classification') - expe_report = create_report(kronos) - (STAGING_DIR / oname_yml).unlink() - write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'classification', e) - raise TestError('Error occured during classification') - - kronos.time('classification') - - ### Metrics - log.info('Run initial metrics') - metrics = run_metrics(expe, classification, descriptors) - kronos.time('metrics') - - ### Create complete report - log.info('Write complete report') - expe_report = create_report(kronos) - - ### Name and write prediction - triskele.write(RESULT_DIR / oname_tif, classification) - - ### Write report and results - (STAGING_DIR / oname_yml).unlink() - write_expe_file(RESULT_DIR / oname_yml, expe, expe_hashes, expe_report, oname_tif, metrics) + # Ressources + ressources = OrderedDict() + ressources['ram'] = None + ressources['proccess_time'] = experience.get_process_time() + expe_report['ressources'] = ressources + return expe_report class ExpePath: """Utility wrapper for expe files. @@ -242,66 +199,12 @@ class ExpePath: self._actual.unlink() self._actual = new_path - - -def write_expe_file(file, expe, hashes=None, report=None, classification=None, results=None): - with open(file, 'w') as of: - yaml.dump(OrderedDict({'expe': expe, - 'expe_hashes': hashes, - 'expe_report': report, - 'expe_classification': classification, - 'expe_results': results}), - of, default_flow_style=False, encoding=None, allow_unicode=True) - - -def compute_hashes(expe): - glob = hashlib.sha1() - - expe_hashes = OrderedDict() - - for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']: - v = str(expe[k]).encode('utf-8') - expe_hashes[k] = hashlib.sha1(v).hexdigest() - glob.update(v) - expe_hashes['global'] = glob.hexdigest() - return expe_hashes - -def create_report(stime=None, etime=None): - expe_report = OrderedDict() - expe_report['supervisor'] = os.uname()[1] - for datek, timev in zip(('start_date', 'end_date'), (stime, etime)): - expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None - - return expe_report def watch_folder(): log.info('Waiting for test') while not list(TEST_DIR.glob('*.yml')): time.sleep(3) -class Kronos(object): - def __init__(self): - self._pt = time.process_time() - self._stime = time.time() - self._etime = None - - def time(self, name): - self._times[name + '_process_time'] = time.process_time() - self._pt - self._pt = time.process_time() - self._etime = time.time() - - def get_times(self): - return self._times - - def get_start_date(self): - return self._stime - - def get_end_date(self): - return self._etime - - - - def main(): while(True): try: @@ -310,7 +213,7 @@ def main(): log.error('Critical exception while updating work queue') log.error(traceback.format_exc()) log.warning('Resuming') - break # continue + continue if not queue: watch_folder() continue @@ -322,7 +225,7 @@ def main(): log.error('Critical exception while running test. Resuming') log.error(traceback.format_exc()) log.warning('Resuming') - break # continue + continue if __name__ == '__main__': logger.setup_logging()