236 lines
6.7 KiB
Python
236 lines
6.7 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
# \file supervisor.py
|
|
# \brief Supervisor that watches the Enrichment test folder and runs the queued test protocols
|
|
# \author Florent Guiotte <florent.guiotte@gmail.com>
|
|
# \version 0.1
|
|
# \date 07 sept. 2018
|
|
#
|
|
# TODO details
|
|
|
|
import yaml
|
|
import importlib
|
|
import hashlib
|
|
from collections import OrderedDict
|
|
import time
|
|
import os
|
|
import datetime
|
|
from pathlib import Path
|
|
from operator import itemgetter
|
|
import traceback
|
|
import logging
|
|
import logger
|
|
from protocols.protocol import TestError
|
|
|
|
# Root of the working tree; every folder used by the supervisor lives below it.
ENRICHMENT_DIR = Path('./Enrichment/')
# Scanned for pending ``.yml`` test descriptions.
TEST_DIR = ENRICHMENT_DIR.joinpath('Tests')
# Receives the description while a test is being processed.
STAGING_DIR = ENRICHMENT_DIR.joinpath('Staging')
# Receives the final report of a completed test.
RESULT_DIR = ENRICHMENT_DIR.joinpath('Results')
# Receives the report of a test that raised an error.
FAILED_DIR = ENRICHMENT_DIR.joinpath('Failed')

# Module logger, tagged with the node name of the machine running the script.
log = logging.getLogger('Supervisor [{}]'.format(os.uname().nodename))
|
|
|
|
def setup_yaml():
    """Install the YAML representers used when dumping test reports.

    Two representers are registered:

    * ``OrderedDict`` values are emitted as regular YAML mappings built from
      their items, so insertion order is kept
      (from https://stackoverflow.com/a/8661021).
    * ``str`` values are routed through ``repr_str``. The stock string
      representer is first stored as ``yaml.Dumper.org_represent_str`` so
      that ``repr_str`` can delegate to it
      (from https://stackoverflow.com/a/24291536).
    """
    def ordered_mapping(dumper, mapping):
        return dumper.represent_mapping('tag:yaml.org,2002:map', mapping.items())

    yaml.add_representer(OrderedDict, ordered_mapping)

    # Keep a handle on the default string representer before overriding it.
    yaml.Dumper.org_represent_str = yaml.Dumper.represent_str
    yaml.add_representer(str, repr_str, Dumper=yaml.Dumper)
|
|
|
|
|
|
def repr_str(dumper, data):
    """Represent a string for YAML, using block style for multi-line text.

    :param dumper: dumper object; must provide ``represent_scalar`` and
        ``org_represent_str``.
    :param data: the string to represent.
    :return: whatever the selected dumper method returns.
    """
    # Single-line strings go through the saved default representer.
    if '\n' not in data:
        return dumper.org_represent_str(data)
    # Strings holding a newline are emitted as literal ('|') block scalars.
    return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|')
|
|
|
|
|
|
def update_queue():
    """Scan ``TEST_DIR`` and return the pending tests by ascending priority.

    Every regular file with a ``.yml`` suffix found directly in ``TEST_DIR``
    yields one dict with two keys: ``'expe_file'`` (an ``ExpePath`` wrapping
    the file) and ``'priority'`` (the value returned by ``get_priority``).

    :return: list of those dicts, sorted on ``'priority'``.
    """
    pending = [
        {'expe_file': ExpePath(entry), 'priority': get_priority(entry)}
        for entry in TEST_DIR.iterdir()
        if entry.is_file() and entry.suffix == '.yml'
    ]
    pending.sort(key=itemgetter('priority'))
    return pending
|
|
|
|
|
|
def get_priority(yml_file):
    """Return the ``priority`` entry of a YAML test description.

    :param yml_file: path of the YAML file to open.
    :return: the value stored under the top-level ``'priority'`` key.
    :raises KeyError: if the document has no ``'priority'`` key.
    """
    with open(yml_file) as stream:
        document = yaml.safe_load(stream)
    description = OrderedDict(document)
    return description['priority']
|
|
|
|
|
|
def run(expe_file):
    """Run a single test from its description file to its result report.

    The description is read, staged, and used to import and instantiate the
    protocol named by ``test['protocol']['package']`` and
    ``test['protocol']['name']`` with ``test['expe']`` as argument. The hashes
    and a start-only report are then added and the description is staged
    again before the protocol is run. On success the full report and the
    results are written with ``expe_file.result``.

    The protocol instance must provide ``get_hashes``,
    ``set_results_base_name``, ``run``, ``get_results`` and
    ``get_process_time`` (the last one is used by ``create_report``).

    :param expe_file: object providing ``read``, ``stage``, ``error``,
        ``result`` and ``get_result_path`` (``main`` passes an ``ExpePath``).
    :raises TestError: if the protocol cannot be loaded or if it raises
        while running; the failure is recorded with ``expe_file.error``
        first and the original exception is chained as the cause.
    """
    start_time = time.time()
    log.info('Run test {}'.format(expe_file))
    test = expe_file.read()

    ### Stage experience
    expe_file.stage(test)

    ### Load protocol
    try:
        protocol_module = importlib.import_module(test['protocol']['package'])
        # Re-execute the module even if it was imported by a previous test
        # (presumably so edits to a protocol are picked up without a restart).
        importlib.reload(protocol_module)
        protocol = getattr(protocol_module, test['protocol']['name'])
        experience = protocol(test['expe'])
    except Exception as e:
        err = 'Could not load protocol from test {}'.format(expe_file)
        log.warning(err)
        expe_file.error(test, 'loading protocol', e)
        # Chain explicitly so the root cause stays attached to the TestError.
        raise TestError(err) from e
    log.info('{} test protocol loaded'.format(experience))

    ### Get hashes
    test['hashes'] = experience.get_hashes()
    # Partial report: the end date is not known yet.
    test['report'] = create_report(experience, start_time)

    ### Stage experience
    # Second staging: 'hashes' is now present in the description.
    expe_file.stage(test)

    experience.set_results_base_name(expe_file.get_result_path())

    ### Run test
    try:
        experience.run()
    except Exception as e:
        err = 'Experience error'
        log.warning(err)
        expe_file.error(test, 'testing', e)
        raise TestError(err) from e

    end_time = time.time()

    ### Get complete report
    test['report'] = create_report(experience, start_time, end_time)

    ### Get results
    test['results'] = experience.get_results()

    ### Write experience
    expe_file.result(test)

    ### End of test
    log.info('Test complete')
|
|
|
|
def create_report(experience, stime=None, etime=None):
    """Build the report section stored alongside a test.

    :param experience: object providing ``get_process_time()``.
    :param stime: start time as a POSIX timestamp, or ``None`` if unknown.
    :param etime: end time as a POSIX timestamp, or ``None`` if unknown.
    :return: ``OrderedDict`` with the keys ``'supervisor'``, ``'start_date'``,
        ``'end_date'`` and ``'ressources'``, in that order. Dates are
        formatted strings, or ``None`` when the timestamp is ``None``.
    """
    expe_report = OrderedDict()
    # HOST is not exported by every shell; fall back on the node name (the
    # same value the module logger is named after) instead of raising
    # KeyError in the middle of a test.
    expe_report['supervisor'] = os.environ.get('HOST', os.uname()[1])

    # Dates
    date_format = 'Le %d/%m/%Y à %H:%M:%S'
    for date_key, stamp in (('start_date', stime), ('end_date', etime)):
        if stamp is None:
            expe_report[date_key] = None
        else:
            moment = datetime.datetime.fromtimestamp(stamp)
            expe_report[date_key] = moment.strftime(date_format)

    # Resources (the key spellings are part of the report format)
    ressources = OrderedDict()
    ressources['ram'] = None  # not measured
    ressources['proccess_time'] = experience.get_process_time()
    expe_report['ressources'] = ressources

    return expe_report
|
|
|
|
class ExpePath:
    """Utility wrapper for expe files.

    Wraps a pathlib ``Path`` and adds staging, result and error functions
    that move the test report through the Enrichment center folders
    (``STAGING_DIR``, ``RESULT_DIR``, ``FAILED_DIR``).

    The file name is the stem of the original file, suffixed with the first
    ``hash_length`` characters of the test's global hash once it is known.
    """

    def __init__(self, path, hash_length=6):
        """Wrap ``path``.

        :param path: location of the test description file.
        :param hash_length: number of hash characters appended to the name.
        """
        self._actual = Path(path)  # where the file currently lives
        self._base_name = self._actual.stem
        self._hash_length = hash_length
        self._hash = None  # set by _check_hash once 'hashes' is available

    def __str__(self):
        """Return the current file name (not the full path)."""
        return self._get_complete_name()

    def read(self):
        """Load the file at its current location as an ``OrderedDict``."""
        with open(self._actual) as f:
            return OrderedDict(yaml.safe_load(f))

    def _get_hash_name(self):
        """Return the base name, plus ``_<hash prefix>`` when a hash is set."""
        if self._hash is None:
            return self._base_name
        return '{}_{}'.format(self._base_name, self._hash[:self._hash_length])

    def _get_complete_name(self):
        """Return the hash name with the ``.yml`` extension."""
        return self._get_hash_name() + '.yml'

    def stage(self, expe):
        """Write ``expe`` into ``STAGING_DIR`` and move the wrapper there."""
        log.info('Staging {}'.format(self._base_name))
        self._check_hash(expe)
        self._write(STAGING_DIR, expe)

    def result(self, expe):
        """Write ``expe`` into ``RESULT_DIR`` and move the wrapper there."""
        log.info('Write results for test {}'.format(self._base_name))
        self._check_hash(expe)
        self._write(RESULT_DIR, expe)

    def error(self, expe, when='', e=Exception):
        """Record a failure in ``expe`` and write it into ``FAILED_DIR``.

        Meant to be called from an ``except`` block: the ``'where'`` field is
        taken from ``traceback.format_exc()``.

        :param expe: test description; gains an ``'error'`` entry.
        :param when: short label of the step that failed.
        :param e: the exception; its ``str()`` is stored under ``'what'``.
        """
        error = OrderedDict()
        error['when'] = when
        error['what'] = str(e)
        error['where'] = traceback.format_exc()
        expe['error'] = error
        self._write(FAILED_DIR, expe)

    def get_result_path(self):
        """Return ``RESULT_DIR / <hash name>`` (no extension)."""
        return Path(RESULT_DIR) / self._get_hash_name()

    def _check_hash(self, expe):
        """Adopt ``expe['hashes']['global']`` as the hash if none is set yet."""
        if self._hash is None and 'hashes' in expe:
            self._hash = expe['hashes']['global']

    def _write(self, path, expe):
        """Dump ``expe`` as YAML into folder ``path`` and drop the old file.

        :param path: destination folder.
        :param expe: mapping to dump.
        """
        new_path = Path(path) / self._get_complete_name()
        with open(new_path, 'w') as of:
            yaml.dump(expe, of,
                      default_flow_style=False,
                      encoding=None,
                      allow_unicode=True)
        # Only remove the previous file when it is a different file. When the
        # destination equals the current location (e.g. staging twice with an
        # unchanged name) the unlink would delete what was just written.
        if new_path.resolve() != self._actual.resolve():
            self._actual.unlink()
        self._actual = new_path
|
|
|
|
|
|
def watch_folder():
    """Block until ``TEST_DIR`` holds at least one ``*.yml`` entry.

    The folder is polled every three seconds.
    """
    log.info('Waiting for test')
    while True:
        candidates = list(TEST_DIR.glob('*.yml'))
        if candidates:
            return
        time.sleep(3)
|
|
|
|
def main(retry_delay=3):
    """Run the supervisor loop forever.

    Each iteration rebuilds the queue with ``update_queue``, waits in
    ``watch_folder`` when it is empty, and otherwise runs one test: the last
    entry of the queue, i.e. the one with the largest ``priority`` value.

    A ``TestError`` is logged as a normal test failure. Any other exception
    is logged with its traceback and the loop resumes after ``retry_delay``
    seconds.

    :param retry_delay: seconds to wait after a critical exception before
        resuming. Without a pause, a persistent failure (for example a
        malformed file left in the test folder) makes the loop spin and
        flood the log.
    """
    while True:
        try:
            queue = update_queue()
        except Exception:
            log.error('Critical exception while updating work queue')
            log.error(traceback.format_exc())
            log.warning('Resuming')
            time.sleep(retry_delay)
            continue

        if not queue:
            watch_folder()
            continue

        try:
            run(queue.pop()['expe_file'])
        except TestError:
            log.warning('Test failed, error logged. Resuming')
        except Exception:
            log.error('Critical exception while running test. Resuming')
            log.error(traceback.format_exc())
            log.warning('Resuming')
            time.sleep(retry_delay)
|
|
|
|
def _bootstrap():
    """Start-up sequence used when the file is executed as a script."""
    # Logging is configured first so the start-up message is handled.
    logger.setup_logging()
    log.info('Starting supervisor')

    # Representers must be registered before any report is dumped.
    setup_yaml()
    main()


if __name__ == '__main__':
    _bootstrap()
|