Refactor Supervisor

This commit is contained in:
Florent Guiotte 2018-09-11 17:42:14 +02:00
parent 314afce292
commit bbbb6d1de4

View File

@ -71,7 +71,7 @@ def run(expe_file):
log.info('Run test {}'.format(expe_file))
test = expe_file.read()
### Stage test
### Stage experience
expe_file.stage(test)
### Load protocol
@ -88,9 +88,11 @@ def run(expe_file):
raise TestError(err)
log.info('{} test protocol loaded'.format(experience))
### Write hahes
### Get hashes
test['hashes'] = experience.get_hashes()
test['report'] = create_report(start_time)
test['report'] = create_report(experience, start_time)
### Stage experience
expe_file.stage(test)
experience.set_results_base_name(expe_file.get_result_path())
@ -106,78 +108,33 @@ def run(expe_file):
end_time = time.time()
### Write complete report
report = create_report(start_time, end_time)
ressources = OrderedDict()
ressources['ram'] = None
ressources['proccess_time'] = experience.get_process_time()
report['ressources'] = ressources
test['report'] = report
### Get complete report
test['report'] = create_report(experience, start_time, end_time)
### Write results
### Get results
test['results'] = experience.get_results()
### Write experience
expe_file.result(test)
### End of test
log.info('Test complete')
return
def create_report(experience, stime=None, etime=None):
expe_report = OrderedDict()
expe_report['supervisor'] = os.uname()[1]
### Create output names
oname = '{}_{}'.format(expe_file.stem, expe_hashes['global'][:6])
oname_yml = oname + '.yml'
oname_tif = oname + '.tif'
# Dates
for datek, timev in zip(('start_date', 'end_date'), (stime, etime)):
expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None
### Create partial report
expe_report = create_report(kronos)
### Stage expe
log.info('Staging test')
write_expe_file(STAGING_DIR / oname_yml, expe, expe_hashes, expe_report)
expe_file.unlink()
### Compute descriptors
log.info('Compute descriptors')
try:
descriptors = compute_descriptors(expe)
except Exception as e:
kronos.time('description')
expe_report = create_report(kronos)
(STAGING_DIR / oname_yml).unlink()
write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'description', e)
raise TestError('Error occured during description')
kronos.time('description')
### Compute classification
log.info('Classify data')
try:
classification = compute_classification(expe, descriptors)
except Exception as e:
kronos.time('classification')
expe_report = create_report(kronos)
(STAGING_DIR / oname_yml).unlink()
write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'classification', e)
raise TestError('Error occured during classification')
kronos.time('classification')
### Metrics
log.info('Run initial metrics')
metrics = run_metrics(expe, classification, descriptors)
kronos.time('metrics')
### Create complete report
log.info('Write complete report')
expe_report = create_report(kronos)
### Name and write prediction
triskele.write(RESULT_DIR / oname_tif, classification)
### Write report and results
(STAGING_DIR / oname_yml).unlink()
write_expe_file(RESULT_DIR / oname_yml, expe, expe_hashes, expe_report, oname_tif, metrics)
# Ressources
ressources = OrderedDict()
ressources['ram'] = None
ressources['proccess_time'] = experience.get_process_time()
expe_report['ressources'] = ressources
return expe_report
class ExpePath:
"""Utility wrapper for expe files.
@ -243,65 +200,11 @@ class ExpePath:
self._actual = new_path
def write_expe_file(file, expe, hashes=None, report=None, classification=None, results=None):
with open(file, 'w') as of:
yaml.dump(OrderedDict({'expe': expe,
'expe_hashes': hashes,
'expe_report': report,
'expe_classification': classification,
'expe_results': results}),
of, default_flow_style=False, encoding=None, allow_unicode=True)
def compute_hashes(expe):
glob = hashlib.sha1()
expe_hashes = OrderedDict()
for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']:
v = str(expe[k]).encode('utf-8')
expe_hashes[k] = hashlib.sha1(v).hexdigest()
glob.update(v)
expe_hashes['global'] = glob.hexdigest()
return expe_hashes
def create_report(stime=None, etime=None):
expe_report = OrderedDict()
expe_report['supervisor'] = os.uname()[1]
for datek, timev in zip(('start_date', 'end_date'), (stime, etime)):
expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None
return expe_report
def watch_folder():
log.info('Waiting for test')
while not list(TEST_DIR.glob('*.yml')):
time.sleep(3)
class Kronos(object):
def __init__(self):
self._pt = time.process_time()
self._stime = time.time()
self._etime = None
def time(self, name):
self._times[name + '_process_time'] = time.process_time() - self._pt
self._pt = time.process_time()
self._etime = time.time()
def get_times(self):
return self._times
def get_start_date(self):
return self._stime
def get_end_date(self):
return self._etime
def main():
while(True):
try:
@ -310,7 +213,7 @@ def main():
log.error('Critical exception while updating work queue')
log.error(traceback.format_exc())
log.warning('Resuming')
break # continue
continue
if not queue:
watch_folder()
continue
@ -322,7 +225,7 @@ def main():
log.error('Critical exception while running test. Resuming')
log.error(traceback.format_exc())
log.warning('Resuming')
break # continue
continue
if __name__ == '__main__':
logger.setup_logging()