minigrida/supervisor.py

333 lines
9.8 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# \file supervisor.py
# \brief Supervisor that watches the test queue and runs the queued experiments.
# \author Florent Guiotte <florent.guiotte@gmail.com>
# \version 0.1
# \date 07 sept. 2018
#
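# Test descriptions (*.yml) are taken from Enrichment/Tests by priority, staged
# in Enrichment/Staging while they run, then written to Enrichment/Results on
# success or to Enrichment/Failed on error.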
import yaml
import importlib
import hashlib
from collections import OrderedDict
import time
import os
import datetime
from pathlib import Path
from operator import itemgetter
import traceback
import logging
import logger
from protocols.protocol import TestError
# Root of the on-disk work area; every directory below is a child of it.
ENRICHMENT_DIR = Path('./Enrichment/')
# Incoming test descriptions (*.yml) waiting to be run.
TEST_DIR = ENRICHMENT_DIR.joinpath('Tests')
# Tests currently being processed.
STAGING_DIR = ENRICHMENT_DIR.joinpath('Staging')
# Completed tests, with their report and results.
RESULT_DIR = ENRICHMENT_DIR.joinpath('Results')
# Tests that raised an error, with the error description attached.
FAILED_DIR = ENRICHMENT_DIR.joinpath('Failed')

# Module logger, tagged with the node name of the machine running the supervisor.
log = logging.getLogger('Supervisor [{}]'.format(os.uname().nodename))
def setup_yaml():
    """Register the custom YAML representers used when dumping test files.

    Two representers are installed on PyYAML:

    * ``OrderedDict`` is dumped as a plain YAML mapping built from its
      ``items()``, to keep the YAML output ordered
      (from https://stackoverflow.com/a/8661021);
    * ``str`` is routed through ``repr_str``. The stock string representer
      is saved as ``yaml.Dumper.org_represent_str`` beforehand so that
      ``repr_str`` can delegate to it
      (from https://stackoverflow.com/a/24291536).
    """
    def _ordered_mapping(dumper, mapping):
        return dumper.represent_mapping('tag:yaml.org,2002:map', mapping.items())

    yaml.add_representer(OrderedDict, _ordered_mapping)

    # Keep a handle on the default representer before overriding ``str``.
    yaml.Dumper.org_represent_str = yaml.Dumper.represent_str
    yaml.add_representer(str, repr_str, Dumper=yaml.Dumper)
def repr_str(dumper, data):
    """YAML representer for ``str`` that keeps multi-line text readable.

    Args:
        dumper: the dumper doing the serialisation; it must provide
            ``represent_scalar`` and ``org_represent_str``.
        data: the string to represent.

    Returns:
        A literal block scalar (style ``|``) when ``data`` contains a
        newline, otherwise whatever ``dumper.org_represent_str`` returns.
    """
    if '\n' not in data:
        return dumper.org_represent_str(data)
    return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|')
def update_queue():
    """Build the work queue from the test directory.

    Every regular file of ``TEST_DIR`` whose suffix is ``.yml`` yields one
    entry ``{'expe_file': ExpePath, 'priority': <value read from the file>}``.

    Returns:
        The list of entries sorted by ascending ``'priority'``.
    """
    pending = [
        {'expe_file': ExpePath(entry), 'priority': get_priority(entry)}
        for entry in TEST_DIR.iterdir()
        if entry.is_file() and entry.suffix == '.yml'
    ]
    pending.sort(key=itemgetter('priority'))
    return pending
def get_priority(yml_file):
    """Return the ``priority`` entry of a YAML test description.

    Args:
        yml_file: path of the YAML file to read.

    Returns:
        The value stored under the top-level ``'priority'`` key.
    """
    with open(yml_file) as stream:
        content = yaml.safe_load(stream)
    return OrderedDict(content)['priority']
def run(expe_file):
    """Run one queued test, from staging to the final report.

    The test description is read and staged, then the protocol class named
    by ``test['protocol']`` (``'package'`` and ``'name'`` entries) is imported
    and instantiated with ``test['expe']``. Its hashes and a partial report
    are written to the staged file, the experience is run, and the complete
    report and the results are finally written as the test result.

    Args:
        expe_file: wrapper around the test file; it must provide ``read``,
            ``stage``, ``error``, ``result`` and ``get_result_path``
            (see ``ExpePath``).

    Raises:
        TestError: if the protocol cannot be loaded or if the experience
            raises while running. The error is written through
            ``expe_file.error`` before being raised, and the original
            exception is chained as the cause.
    """
    start_time = time.time()
    log.info('Run test {}'.format(expe_file))
    test = expe_file.read()

    ### Stage test
    expe_file.stage(test)

    ### Load protocol
    try:
        protocol_module = importlib.import_module(test['protocol']['package'])
        # The module is reloaded on every run (presumably so that protocol
        # edits are picked up without restarting the supervisor).
        importlib.reload(protocol_module)
        protocol = getattr(protocol_module, test['protocol']['name'])
        experience = protocol(test['expe'])
    except Exception as e:
        err = 'Could not load protocol from test {}'.format(expe_file)
        log.warning(err)
        expe_file.error(test, 'loading protocol', e)
        raise TestError(err) from e
    log.info('{} test protocol loaded'.format(experience))

    ### Write hashes
    test['hashes'] = experience.get_hashes()
    test['report'] = create_report(start_time)
    # Staging again now that the hashes are known lets the file name carry
    # the hash suffix.
    expe_file.stage(test)
    experience.set_results_base_name(expe_file.get_result_path())

    ### Run test
    try:
        experience.run()
    except Exception as e:
        err = 'Experience error'
        log.warning(err)
        expe_file.error(test, 'testing', e)
        raise TestError(err) from e

    end_time = time.time()

    ### Write complete report
    report = create_report(start_time, end_time)
    ressources = OrderedDict()
    ressources['ram'] = None
    ressources['proccess_time'] = experience.get_process_time()
    report['ressources'] = ressources
    test['report'] = report

    ### Write results
    test['results'] = experience.get_results()
    expe_file.result(test)

    ### End of test
    log.info('Test complete')
class ExpePath:
    """Utility wrapper for expe files.

    Wrap a pathlib Path with staging, result and error functions to move the
    test report through the Enrichment center. Each write goes to a new
    directory, removes the previous copy of the file and makes the new copy
    the current one.
    """

    def __init__(self, path, hash_length=6):
        """Wrap the test file located at ``path``.

        Args:
            path: location of the test file.
            hash_length: number of leading hash characters appended to the
                file name once the hash is known.
        """
        self._actual = Path(path)
        self._base_name = self._actual.stem
        self._hash_length = hash_length
        self._hash = None

    def __str__(self):
        return self._get_complete_name()

    def read(self):
        """Load the current file and return its content as an OrderedDict."""
        with open(self._actual) as f:
            return OrderedDict(yaml.safe_load(f))

    def _get_hash_name(self):
        """Return the base name, suffixed with ``_<hash prefix>`` when known."""
        if self._hash is None:
            return self._base_name
        return '{}_{}'.format(self._base_name, self._hash[:self._hash_length])

    def _get_complete_name(self):
        """Return the file name (hash name plus the ``.yml`` extension)."""
        return self._get_hash_name() + '.yml'

    def stage(self, expe):
        """Write ``expe`` to the staging directory."""
        log.info('Staging {}'.format(self._base_name))
        self._check_hash(expe)
        self._write(STAGING_DIR, expe)

    def result(self, expe):
        """Write ``expe`` to the result directory."""
        log.info('Write results for test {}'.format(self._base_name))
        self._check_hash(expe)
        self._write(RESULT_DIR, expe)

    def error(self, expe, when='', e=Exception):
        """Attach an error description to ``expe`` and write it as failed.

        ``expe['error']`` receives ``when``, ``str(e)`` as ``'what'`` and the
        output of ``traceback.format_exc()`` as ``'where'``.

        Args:
            expe: test content; it is modified in place.
            when: label of the step that failed.
            e: the exception describing the failure.
        """
        error = OrderedDict()
        error['when'] = when
        error['what'] = str(e)
        error['where'] = traceback.format_exc()
        expe['error'] = error
        self._write(FAILED_DIR, expe)

    def get_result_path(self):
        """Return the result directory joined with the hash name (no extension)."""
        return Path(RESULT_DIR) / self._get_hash_name()

    def _check_hash(self, expe):
        """Record ``expe['hashes']['global']`` the first time it is available."""
        if self._hash is None:
            if 'hashes' in expe:
                self._hash = expe['hashes']['global']

    def _write(self, path, expe):
        """Dump ``expe`` as YAML into directory ``path`` and move there."""
        new_path = Path(path) / self._get_complete_name()
        with open(new_path, 'w') as of:
            yaml.dump(expe, of,
                      default_flow_style=False,
                      encoding=None,
                      allow_unicode=True)
        self._replace_actual(new_path)

    def _replace_actual(self, new_path):
        """Make ``new_path`` the current file and remove the previous one.

        The previous file is left alone when it is the very file that was
        just written (same directory and same name): unlinking it would
        delete the fresh copy.
        """
        if self._actual.resolve() != new_path.resolve():
            self._actual.unlink()
        self._actual = new_path
def write_expe_file(file, expe, hashes=None, report=None, classification=None, results=None):
    """Dump a test and its companion sections to a YAML file.

    The document holds, in this order, the keys ``expe``, ``expe_hashes``,
    ``expe_report``, ``expe_classification`` and ``expe_results``.

    Args:
        file: path of the file to write.
        expe: test description.
        hashes: stored under ``expe_hashes``.
        report: stored under ``expe_report``.
        classification: stored under ``expe_classification``.
        results: stored under ``expe_results``.
    """
    # Built from ordered pairs: an OrderedDict created from a dict literal
    # only inherits the literal's order, which is not guaranteed before
    # Python 3.7.
    document = OrderedDict([
        ('expe', expe),
        ('expe_hashes', hashes),
        ('expe_report', report),
        ('expe_classification', classification),
        ('expe_results', results),
    ])
    with open(file, 'w') as of:
        yaml.dump(document, of,
                  default_flow_style=False, encoding=None, allow_unicode=True)
def compute_hashes(expe):
    """Compute the SHA-1 hashes of the main sections of a test.

    Each of the ``ground_truth``, ``descriptors_script``,
    ``cross_validation`` and ``classifier`` entries is converted with
    ``str`` and encoded as UTF-8 before hashing.

    Args:
        expe: mapping holding the four entries above.

    Returns:
        An OrderedDict with one hex digest per entry, followed by
        ``'global'``: the digest of the four encoded entries fed in order
        into a single SHA-1.
    """
    sections = ('ground_truth', 'descriptors_script', 'cross_validation', 'classifier')
    digests = OrderedDict()
    overall = hashlib.sha1()
    for section in sections:
        payload = str(expe[section]).encode('utf-8')
        overall.update(payload)
        digests[section] = hashlib.sha1(payload).hexdigest()
    digests['global'] = overall.hexdigest()
    return digests
def create_report(stime=None, etime=None):
    """Create the report header of a test.

    Args:
        stime: start timestamp (seconds since the epoch), or None.
        etime: end timestamp (seconds since the epoch), or None.

    Returns:
        An OrderedDict with the keys ``supervisor`` (node name of the
        machine), ``start_date`` and ``end_date``. Each date is the
        timestamp formatted in local time, or None when the timestamp is
        None.
    """
    def _format_date(timestamp):
        if timestamp is None:
            return None
        moment = datetime.datetime.fromtimestamp(timestamp)
        return moment.strftime('Le %d/%m/%Y à %H:%M:%S')

    report = OrderedDict()
    report['supervisor'] = os.uname()[1]
    report['start_date'] = _format_date(stime)
    report['end_date'] = _format_date(etime)
    return report
def watch_folder():
    """Block until the test directory contains at least one ``*.yml`` entry.

    The directory is polled every 3 seconds.
    """
    log.info('Waiting for test')
    while True:
        if any(TEST_DIR.glob('*.yml')):
            return
        time.sleep(3)
class Kronos(object):
    """Stopwatch recording the process time spent in successive named steps."""

    def __init__(self):
        self._pt = time.process_time()
        self._stime = time.time()
        self._etime = None
        # Step durations, keyed by '<name>_process_time'.
        self._times = OrderedDict()

    def time(self, name):
        """Record the process time elapsed since the previous mark.

        The duration is stored under ``name + '_process_time'``, and the
        end date is updated to the current wall-clock time.

        Args:
            name: label of the step that just finished.
        """
        now = time.process_time()
        self._times[name + '_process_time'] = now - self._pt
        self._pt = now
        self._etime = time.time()

    def get_times(self):
        """Return the recorded step durations."""
        return self._times

    def get_start_date(self):
        """Return the wall-clock timestamp taken at construction."""
        return self._stime

    def get_end_date(self):
        """Return the timestamp of the last mark, or None if there is none."""
        return self._etime
def main():
    """Supervisor loop: run the queued tests, or wait when there are none.

    On each turn the work queue is rebuilt and the last entry of the sorted
    queue is run. A ``TestError`` is logged and the loop goes on. Any other
    exception, while updating the queue or while running a test, is logged
    with its traceback and ends the loop (despite the 'Resuming' messages).
    """
    while True:
        try:
            pending = update_queue()
        except Exception:
            log.error('Critical exception while updating work queue')
            log.error(traceback.format_exc())
            log.warning('Resuming')
            break  # continue

        if pending:
            try:
                run(pending.pop()['expe_file'])
            except TestError:
                log.warning('Test failed, error logged. Resuming')
            except Exception:
                log.error('Critical exception while running test. Resuming')
                log.error(traceback.format_exc())
                log.warning('Resuming')
                break  # continue
        else:
            watch_folder()
if __name__ == '__main__':
    # Logging is configured first so that the start-up message is emitted
    # through the configured handlers.
    logger.setup_logging()
    log.info('Starting supervisor')
    # The YAML representers must be registered before any test file is dumped.
    setup_yaml()
    main()