#!/usr/bin/python
# -*- coding: utf-8 -*-
# \file supervisor.py
# \brief Poll the test folder and run queued experiment files.
# \author Florent Guiotte
# \version 0.1
# \date 07 sept. 2018
#
# The supervisor scans TEST_DIR for `.yml` experiment descriptions, orders
# them by their `priority` entry and runs them one at a time. While a test
# is processed its file is moved to STAGING_DIR, then to RESULT_DIR on
# success or FAILED_DIR on error.

import datetime
import hashlib
import importlib
import logging
import os
import time
import traceback
from collections import OrderedDict
from operator import itemgetter
from pathlib import Path

import yaml

import logger
from protocols.protocol import TestError

ENRICHMENT_DIR = Path('./Enrichment/')
TEST_DIR = ENRICHMENT_DIR / 'Tests'
STAGING_DIR = ENRICHMENT_DIR / 'Staging'
RESULT_DIR = ENRICHMENT_DIR / 'Results'
FAILED_DIR = ENRICHMENT_DIR / 'Failed'

# Seconds to wait between two polls of the test folder, and before retrying
# after a failed queue update.
POLL_INTERVAL = 3

log = logging.getLogger('Supervisor [{}]'.format(os.uname()[1]))


def setup_yaml():
    """Configure the PyYAML dumper used to write experiment files.

    Two representers are registered:

    * ``OrderedDict`` is dumped as a plain YAML mapping, keeping key order
      (from https://stackoverflow.com/a/8661021).
    * ``str`` is routed through :func:`repr_str` so multi-line strings are
      dumped in literal block style
      (from https://stackoverflow.com/a/24291536).
    """
    def represent_dict_order(dumper, data):
        return dumper.represent_mapping('tag:yaml.org,2002:map', data.items())

    yaml.add_representer(OrderedDict, represent_dict_order)

    # Keep a handle on the stock str representer so repr_str can delegate to
    # it for single-line strings.
    yaml.Dumper.org_represent_str = yaml.Dumper.represent_str
    yaml.add_representer(str, repr_str, Dumper=yaml.Dumper)


def repr_str(dumper, data):
    """Represent ``data`` with the ``|`` block style when it has a newline.

    Strings without a newline are delegated to the representer saved as
    ``org_represent_str`` by :func:`setup_yaml`.
    """
    if '\n' in data:
        return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|')
    return dumper.org_represent_str(data)


def update_queue():
    """Return the `.yml` files of TEST_DIR sorted by ascending priority.

    Each entry is a dict with the keys ``'expe_file'`` (an :class:`ExpePath`)
    and ``'priority'`` (the value read by :func:`get_priority`).
    """
    pending = [
        {'expe_file': ExpePath(child), 'priority': get_priority(child)}
        for child in TEST_DIR.iterdir()
        if child.is_file() and child.suffix == '.yml'
    ]
    return sorted(pending, key=itemgetter('priority'))


def get_priority(yml_file):
    """Return the ``priority`` entry of the YAML file ``yml_file``.

    Raises:
        KeyError: if the file has no ``priority`` entry.
    """
    with open(yml_file, encoding='utf-8') as f:
        expe = OrderedDict(yaml.safe_load(f))
    return expe['priority']


def run(expe_file):
    """Run the test described by ``expe_file`` and write its outcome.

    The test file is staged, the protocol class named by
    ``test['protocol']`` is (re)loaded and instantiated with
    ``test['expe']``, then executed. Hashes, report and results are added to
    the test description, which is finally written as a result file.

    Args:
        expe_file: the :class:`ExpePath` of the test to run.

    Raises:
        TestError: if the protocol cannot be loaded or if the experience
            raises while running. The failure is recorded through
            ``expe_file.error`` beforehand, and the original exception is
            chained as the cause.
    """
    start_time = time.time()
    log.info('Run test {}'.format(expe_file))
    test = expe_file.read()

    # Stage experience
    expe_file.stage(test)

    # Load protocol. The module is reloaded so that a protocol edited since
    # the previous test is picked up.
    try:
        protocol_module = importlib.import_module(test['protocol']['package'])
        importlib.reload(protocol_module)
        protocol = getattr(protocol_module, test['protocol']['name'])
        experience = protocol(test['expe'])
    except Exception as e:
        err = 'Could not load protocol from test {}'.format(expe_file)
        log.warning(err)
        expe_file.error(test, 'loading protocol', e)
        raise TestError(err) from e
    log.info('{} test protocol loaded'.format(experience))

    # Get hashes
    test['hashes'] = experience.get_hashes()
    test['report'] = create_report(experience, start_time)

    # Stage experience again, now that the hashes are known
    expe_file.stage(test)
    experience.set_results_base_name(expe_file.get_result_path())

    # Run test
    try:
        experience.run()
    except Exception as e:
        err = 'Experience error'
        log.warning(err)
        expe_file.error(test, 'testing', e)
        raise TestError(err) from e

    end_time = time.time()

    # Get complete report
    test['report'] = create_report(experience, start_time, end_time)

    # Get results
    test['results'] = experience.get_results()

    # Write experience
    expe_file.result(test)

    # End of test
    log.info('Test complete')


def create_report(experience, stime=None, etime=None):
    """Build the report section of a test.

    Args:
        experience: object providing ``get_process_time()``.
        stime: start timestamp (seconds since the epoch) or None.
        etime: end timestamp (seconds since the epoch) or None.

    Returns:
        An OrderedDict with the keys ``supervisor`` (the ``HOST`` environment
        variable, or the node name when it is unset), ``start_date``,
        ``end_date`` (formatted dates, or None when the timestamp is None)
        and ``ressources``.
    """
    expe_report = OrderedDict()

    host = os.getenv("HOST")
    expe_report['supervisor'] = host if host is not None else os.uname()[1]

    # Dates
    for datek, timev in zip(('start_date', 'end_date'), (stime, etime)):
        if timev is None:
            expe_report[datek] = None
        else:
            date = datetime.datetime.fromtimestamp(timev)
            expe_report[datek] = date.strftime('Le %d/%m/%Y à %H:%M:%S')

    # Resources (the key spelling is part of the report format)
    ressources = OrderedDict()
    ressources['ram'] = None
    ressources['proccess_time'] = experience.get_process_time()
    expe_report['ressources'] = ressources

    return expe_report


class ExpePath:
    """Utility wrapper for expe files.

    Extend pathlib Path with staging, result and errors function to move the
    test report through the Enrichment center.

    The file name is the stem of the original file, suffixed with the first
    ``hash_length`` characters of the test's global hash once it is known.
    """

    def __init__(self, path, hash_length=6):
        self._actual = Path(path)
        self._base_name = self._actual.stem
        self._hash_length = hash_length
        self._hash = None

    def __str__(self):
        return self._get_complete_name()

    def read(self):
        """Return the content of the current file as an OrderedDict."""
        with open(self._actual, encoding='utf-8') as f:
            return OrderedDict(yaml.safe_load(f))

    def _get_hash_name(self):
        """Return the base name, followed by ``_<hash prefix>`` if known."""
        if self._hash is None:
            suffix = ''
        else:
            suffix = '_' + self._hash[:self._hash_length]
        return '{}{}'.format(self._base_name, suffix)

    def _get_complete_name(self):
        """Return the file name, `.yml` extension included."""
        return self._get_hash_name() + '.yml'

    def stage(self, expe):
        """Write ``expe`` to STAGING_DIR and move the file there."""
        log.info('Staging {}'.format(self._base_name))
        self._check_hash(expe)
        self._write(STAGING_DIR, expe)

    def result(self, expe):
        """Write ``expe`` to RESULT_DIR and move the file there."""
        log.info('Write results for test {}'.format(self._base_name))
        self._check_hash(expe)
        self._write(RESULT_DIR, expe)

    def error(self, expe, when='', e=Exception):
        """Record an error in ``expe`` and move the file to FAILED_DIR.

        Must be called from an ``except`` block: the ``where`` entry is the
        traceback of the exception currently being handled.

        Args:
            expe: the test description, updated in place with an ``error``
                entry.
            when: short description of the step that failed.
            e: the exception, stored as its string representation.
        """
        error = OrderedDict()
        error['when'] = when
        error['what'] = str(e)
        error['where'] = traceback.format_exc()
        expe['error'] = error
        self._write(FAILED_DIR, expe)

    def get_result_path(self):
        """Return the extension-less path of the result in RESULT_DIR."""
        return Path(RESULT_DIR) / self._get_hash_name()

    def _check_hash(self, expe):
        """Remember the global hash of ``expe`` the first time it appears."""
        if self._hash is None and 'hashes' in expe:
            self._hash = expe['hashes']['global']

    def _write(self, path, expe):
        """Dump ``expe`` into the directory ``path`` and drop the old file."""
        new_path = Path(path) / self._get_complete_name()
        with open(new_path, 'w', encoding='utf-8') as of:
            yaml.dump(expe, of, default_flow_style=False, encoding=None,
                      allow_unicode=True)
        # When the destination is the current file (same folder, same name),
        # it was overwritten in place: unlinking would delete the new content.
        if new_path != self._actual:
            self._actual.unlink()
        self._actual = new_path


def watch_folder():
    """Block until at least one `.yml` file is present in TEST_DIR."""
    log.info('Waiting for test')
    while not list(TEST_DIR.glob('*.yml')):
        time.sleep(POLL_INTERVAL)


def main():
    """Run queued tests forever, logging failures without stopping."""
    while True:
        try:
            queue = update_queue()
        except Exception:
            log.error('Critical exception while updating work queue')
            log.error(traceback.format_exc())
            log.warning('Resuming')
            # Pause before retrying: the cause (missing folder, malformed
            # file...) will most likely still be there on the next pass.
            time.sleep(POLL_INTERVAL)
            continue

        if not queue:
            watch_folder()
            continue

        # The queue is sorted by ascending priority and pop() takes the last
        # entry, so the test with the highest priority value runs first.
        try:
            run(queue.pop()['expe_file'])
        except TestError:
            log.warning('Test failed, error logged. Resuming')
        except Exception:
            log.error('Critical exception while running test. Resuming')
            log.error(traceback.format_exc())
            log.warning('Resuming')


if __name__ == '__main__':
    logger.setup_logging()
    log.info('Starting supervisor')
    setup_yaml()
    main()