# Source: minigrida/supervisor.py (236 lines, 6.9 KiB, Python)
#!/usr/bin/python
# -*- coding: utf-8 -*-
# \file supervisor.py
# \brief TODO
# \author Florent Guiotte <florent.guiotte@gmail.com>
# \version 0.1
# \date 07 sept. 2018
#
# TODO details
import yaml
import importlib
import hashlib
from collections import OrderedDict
import time
import os
import datetime
from pathlib import Path
from operator import itemgetter
import traceback
import logging
import logger
from protocols.protocol import TestError
# Directory layout of the enrichment pipeline, relative to the working
# directory: incoming test descriptions, in-progress (staged) tests,
# successful results, and failed runs each live in their own folder.
ENRICHMENT_DIR = Path('./Enrichment/')
TEST_DIR = ENRICHMENT_DIR / 'Tests'
STAGING_DIR = ENRICHMENT_DIR / 'Staging'
RESULT_DIR = ENRICHMENT_DIR / 'Results'
FAILED_DIR = ENRICHMENT_DIR / 'Failed'
# Logger name embeds the hostname (os.uname()[1]) so logs from several
# supervisor hosts can be told apart.
log = logging.getLogger('Supervisor [{}]'.format(os.uname()[1]))
def setup_yaml():
    """Configure the global yaml module for this script's dumps.

    Registers two representers: OrderedDict is emitted as a plain mapping
    in insertion order (https://stackoverflow.com/a/8661021), and strings
    containing newlines are emitted in literal block style via repr_str
    (https://stackoverflow.com/a/24291536).
    """
    def _ordered_mapping(dumper, data):
        return dumper.represent_mapping('tag:yaml.org,2002:map', data.items())

    yaml.add_representer(OrderedDict, _ordered_mapping)
    # Keep a handle on the stock str representer so repr_str can delegate
    # to it for single-line strings.
    yaml.Dumper.org_represent_str = yaml.Dumper.represent_str
    yaml.add_representer(str, repr_str, Dumper=yaml.Dumper)
def repr_str(dumper, data):
    """YAML representer for str: literal block style ('|') when the string
    spans several lines, otherwise the dumper's original str representer."""
    if '\n' not in data:
        return dumper.org_represent_str(data)
    return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|')
def update_queue():
    """Scan TEST_DIR for '*.yml' test files and return a work queue.

    Each entry is a dict {'expe_file': Path, 'priority': value}; the list
    is sorted by ascending 'priority'.
    """
    pending = [
        {'expe_file': entry, 'priority': get_priority(entry)}
        for entry in TEST_DIR.iterdir()
        if entry.is_file() and entry.suffix == '.yml'
    ]
    pending.sort(key=itemgetter('priority'))
    return pending
def get_priority(yml_file):
    """Parse *yml_file* and return its top-level 'priority' value.

    Raises KeyError if the file has no 'priority' key.
    """
    with open(yml_file) as stream:
        description = OrderedDict(yaml.safe_load(stream))
    return description['priority']
def run(expe_file):
    """Run the test described by the YAML file *expe_file*.

    Currently this only loads the test description, instantiates the
    protocol class named in it, logs the protocol's hashes, and returns.

    NOTE(review): everything after the bare ``return`` below is
    unreachable leftover code.  It references names that are not defined
    in this function or visible at module level (``expe_hashes``,
    ``kronos``, ``expe``, ``compute_descriptors``,
    ``compute_classification``, ``run_metrics``, ``triskele``) —
    presumably remnants of an earlier revision; confirm before
    re-enabling any of it.

    Raises TestError (in the dead section) when description or
    classification fails.
    """
    log.info('Run test {}'.format(expe_file))
    with open(expe_file) as f:
        test = OrderedDict(yaml.safe_load(f))
    ### Stage test
    ### Load protocol
    # The protocol class is looked up by name inside protocols.jurse and
    # instantiated with the 'expe' section of the YAML description.
    protocol = getattr(importlib.import_module('protocols.jurse'), test['protocol'])
    experience = protocol(test['expe'])
    log.info('{} test protocol loaded'.format(experience))
    ### Write hahes
    hashes = experience.get_hashes()
    log.info(hashes)
    ### Run test
    ### Write report
    ### Write results
    ### End of test
    return
    # ---- UNREACHABLE from here on (see docstring) --------------------
    ### Create output names
    # Output basename combines the test stem and the first 6 hex chars of
    # the global hash, so re-runs with identical settings collide on name.
    oname = '{}_{}'.format(expe_file.stem, expe_hashes['global'][:6])
    oname_yml = oname + '.yml'
    oname_tif = oname + '.tif'
    ### Create partial report
    expe_report = create_report(kronos)
    ### Stage expe
    log.info('Staging test')
    # Move the description from the Tests queue into Staging.
    write_expe_file(STAGING_DIR / oname_yml, expe, expe_hashes, expe_report)
    expe_file.unlink()
    ### Compute descriptors
    log.info('Compute descriptors')
    try:
        descriptors = compute_descriptors(expe)
    except Exception as e:
        # On failure: stop the timer, drop the staged copy, persist the
        # error into FAILED_DIR, and signal the caller with TestError.
        kronos.time('description')
        expe_report = create_report(kronos)
        (STAGING_DIR / oname_yml).unlink()
        write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'description', e)
        raise TestError('Error occured during description')
    kronos.time('description')
    ### Compute classification
    log.info('Classify data')
    try:
        classification = compute_classification(expe, descriptors)
    except Exception as e:
        # Same failure handling as above, tagged 'classification'.
        kronos.time('classification')
        expe_report = create_report(kronos)
        (STAGING_DIR / oname_yml).unlink()
        write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'classification', e)
        raise TestError('Error occured during classification')
    kronos.time('classification')
    ### Metrics
    log.info('Run initial metrics')
    metrics = run_metrics(expe, classification, descriptors)
    kronos.time('metrics')
    ### Create complete report
    log.info('Write complete report')
    expe_report = create_report(kronos)
    ### Name and write prediction
    # NOTE(review): 'triskele' looks like a raster I/O helper — confirm.
    triskele.write(RESULT_DIR / oname_tif, classification)
    ### Write report and results
    # Success: remove the staged copy and write the final result file.
    (STAGING_DIR / oname_yml).unlink()
    write_expe_file(RESULT_DIR / oname_yml, expe, expe_hashes, expe_report, oname_tif, metrics)
    log.info('Test complete')
def write_error(file, expe, hashes=None, report=None, when='', e=Exception):
    """Dump a failed experiment plus error details to *file* as YAML.

    The 'expe_error' section records the failing stage (*when*), the
    exception text (*e*) and the traceback active at call time.
    """
    details = OrderedDict()
    details['when'] = when
    details['what'] = str(e)
    details['where'] = traceback.format_exc()
    payload = OrderedDict({'expe': expe,
                           'expe_hashes': hashes,
                           'expe_report': report,
                           'expe_error': details})
    with open(file, 'w') as of:
        yaml.dump(payload, of, default_flow_style=False,
                  encoding=None, allow_unicode=True)
def write_expe_file(file, expe, hashes=None, report=None, classification=None, results=None):
    """Dump an experiment description and its artefacts to *file* as YAML.

    Optional sections (*hashes*, *report*, *classification*, *results*)
    are written as None when not provided.
    """
    payload = OrderedDict({'expe': expe,
                           'expe_hashes': hashes,
                           'expe_report': report,
                           'expe_classification': classification,
                           'expe_results': results})
    with open(file, 'w') as of:
        yaml.dump(payload, of, default_flow_style=False,
                  encoding=None, allow_unicode=True)
def compute_hashes(expe):
    """Return SHA-1 digests identifying the experiment *expe*.

    One digest per tracked component key, plus a 'global' digest fed with
    all component values in order; keys are preserved in insertion order.
    """
    overall = hashlib.sha1()
    digests = OrderedDict()
    tracked = ('ground_truth', 'descriptors_script', 'cross_validation', 'classifier')
    for key in tracked:
        payload = str(expe[key]).encode('utf-8')
        digests[key] = hashlib.sha1(payload).hexdigest()
        overall.update(payload)
    digests['global'] = overall.hexdigest()
    return digests
def create_report(kronos):
    """Build an execution report from a *kronos* timer object.

    Contains the supervisor hostname, start/end dates formatted as French
    strings (or None when the timer has no date), and the timing dict
    from kronos with a placeholder 'ram' entry.
    """
    report = OrderedDict()
    report['supervisor'] = os.uname()[1]
    stamps = (('start_date', kronos.get_start_date()),
              ('end_date', kronos.get_end_date()))
    for key, stamp in stamps:
        if stamp is None:
            report[key] = None
        else:
            report[key] = datetime.datetime.fromtimestamp(stamp).strftime('Le %d/%m/%Y à %H:%M:%S')
    ressources = kronos.get_times()
    ressources['ram'] = None  # RAM usage not tracked (yet)
    report['ressources'] = ressources
    return report
def watch_folder():
    """Block until at least one '*.yml' file appears in TEST_DIR,
    polling every 10 seconds."""
    log.info('Waiting for test')
    while True:
        if any(TEST_DIR.glob('*.yml')):
            return
        time.sleep(10)
def main():
    """Supervisor main loop: refresh the work queue and run tests.

    NOTE(review): both ``break # continue`` lines abort the loop on any
    unexpected exception even though the preceding log says 'Resuming';
    the trailing comment suggests the intended long-running behaviour is
    ``continue`` — confirm before deploying.
    """
    while(True):
        try:
            queue = update_queue()
        except Exception:
            log.error('Critical exception while updating work queue')
            log.error(traceback.format_exc())
            log.warning('Resuming')
            break # continue
        if not queue:
            # No pending test: block until a new .yml appears, then rescan.
            watch_folder()
            continue
        try:
            # Queue is sorted ascending by 'priority', so pop() takes the
            # entry with the largest priority value.
            run(queue.pop()['expe_file'])
        except TestError:
            # Test-level failure was already persisted by run(); keep going.
            log.warning('Test failed, error logged. Resuming')
        except Exception:
            log.error('Critical exception while running test. Resuming')
            log.error(traceback.format_exc())
            log.warning('Resuming')
            break # continue
if __name__ == '__main__':
    # Script entry point: set up logging and the YAML representers before
    # entering the supervision loop.
    logger.setup_logging()
    log.info('Starting supervisor')
    setup_yaml()
    main()