diff --git a/mockup.yml b/mockup.yml
new file mode 100644
index 0000000..d55b970
--- /dev/null
+++ b/mockup.yml
@@ -0,0 +1,61 @@
+name: First experiment
+date: 9 July 2018
+priority: 1
+detail: |
+  Mockup for the creation of the minigrida supervisor. Compared to the
+  legacy version of the LD2DAPs project, the choice of the test protocol is
+  generic. Dynamic loading of the protocol still has to be added, then the
+  supervisor logic refactored to respect the universal spirit of minigrida.
+protocol: JurseSF
+expe:
+  ground_truth:
+    raster: ./Data/ground_truth/2018_IEEE_GRSS_DFC_GT_TR.tif
+    meta_labels: GT/jurse_idx.csv
+  descriptors_script:
+    name: Descriptors.dfc_aps
+    parameters:
+      areas:
+        - 100
+        - 1000
+      moi:
+        - 0.5
+        - 0.9
+      rasters:
+        - ./Data/phase1_rasters/DEM+B_C123/UH17_GEM051_TR.tif
+        - ./Data/phase1_rasters/DEM_C123_3msr/UH17_GEG051_TR.tif
+      threshold: 1e4
+  cross_validation:
+    name: CrossValidationGenerator.APsCVG
+    parameters:
+      n_test: 2
+  classifier:
+    name: sklearn.ensemble.RandomForestClassifier
+    parameters:
+      min_samples_leaf: 10
+      n_estimators: 50
+      n_jobs: -1
+      random_state: 0
+expe_hashes:
+  ground_truth: 2c5ecaddcb8c4a1c8863bc65e7440de4a1b4962c
+  descriptors_script: cfdcc84d9d9c47177930257f286d850db446812b
+  cross_validation: 4a61b34fda812fe717890b25d75430023335a7a6
+  classifier: 40e6741ef8cc4b4fbe188b8ca0563eb5195b88ad
+  global: b8219fab322bf11ec1aac14a1f51466dd94ddbdd
+expe_report:
+  supervisor: thecomedian
+  start_date: Le 27/07/2018 à 16:28:52
+  end_date: Le 27/07/2018 à 16:29:54
+  resources:
+    ram: 42 GB
+    process_time:
+      description: 0.6744262149950373
+      classification: 168.82905034400028
+      metrics: 1.1557443889978458
+expe_results:
+  classification: test_b8219f.tif
+  dimension: 42
+  scores:
+    overall_accuracy: 0.5550408093111998
+    cohen_kappa: 0.41714275852261407

diff --git a/protocols/jurse_default.py b/protocols/jurse_default.py
new file mode 100644
index 0000000..1575bb4
--- /dev/null
+++ b/protocols/jurse_default.py
@@ -0,0 +1,30 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+# \file jurse_default.py
+# \brief Default JURSE test protocol
+# \author Florent Guiotte
+# \version 0.1
+# \date 07 sept. 2018
+#
+# TODO details
+
+import hashlib
+from collections import OrderedDict
+from protocols.protocol import Protocol
+
+class Jurse(Protocol):
+    def __init__(self, expe):
+        super().__init__(expe, self.__class__.__name__)
+
+    def _get_hashes(self):
+        # Hash each reproducibility-relevant section of the expe recipe,
+        # plus a global hash covering all of them.
+        hashes = OrderedDict()
+        glob = hashlib.sha1()
+
+        for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']:
+            v = str(self._expe[k]).encode('utf-8')
+            hashes[k] = hashlib.sha1(v).hexdigest()
+            glob.update(v)
+        hashes['global'] = glob.hexdigest()
+
+        return hashes
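The detail field of mockup.yml calls for dynamic protocol loading, which supervisor.py below still only stubs out (### Load protocol). A minimal sketch of what that step could look like, assuming the recipe's protocol name resolves to a module under protocols/; the mapping used here (JurseSF to protocols.jurse_default, class Jurse) is an assumption, not something this diff fixes:

import importlib

def load_protocol(expe):
    # Hypothetical resolution of 'protocol: JurseSF'; the actual
    # name-to-module convention is still to be decided.
    mod = importlib.import_module('protocols.jurse_default')
    cls = getattr(mod, 'Jurse')
    return cls(expe)

The supervisor would then call get_hashes(), run() and get_results() on the returned instance, as defined by the Protocol base class in the next hunk.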
diff --git a/protocols/protocol.py b/protocols/protocol.py
new file mode 100644
index 0000000..d55019e
--- /dev/null
+++ b/protocols/protocol.py
@@ -0,0 +1,63 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+# \file protocol.py
+# \brief Base class for test protocols
+# \author Florent Guiotte
+# \version 0.1
+# \date 07 sept. 2018
+#
+# TODO details
+
+import logging
+import time
+from collections import OrderedDict
+
+class Protocol:
+    def __init__(self, expe, name=None):
+        self._log = logging.getLogger(name)
+        self._expe = expe
+        self._name = name
+        self._times = OrderedDict()
+        self._pt = time.process_time()
+
+    def get_hashes(self):
+        self._log.info('Computing hashes')
+        return self._get_hashes()
+
+    def run(self):
+        # Reset the process-time checkpoint so _time() measures the run only
+        self._pt = time.process_time()
+        self._run()
+
+    def get_results(self):
+        return self._get_results()
+
+    def get_process_time(self):
+        return self._times
+
+    def _time(self, process):
+        # Record the process time elapsed since the last checkpoint
+        self._times[process] = time.process_time() - self._pt
+        self._pt = time.process_time()
+
+    def _get_hashes(self):
+        self._log.warning('Protocol did not override _get_hashes()')
+        hashes = OrderedDict()
+        hashes['global'] = 'Protocol did not override _get_hashes()'
+        return hashes
+
+    def _run(self):
+        self._log.error('Protocol did not override _run()')
+        raise NotImplementedError('Protocol {} did not override _run()'.format(self))
+
+    def _get_results(self):
+        self._log.warning('Protocol did not override _get_results()')
+        results = OrderedDict()
+        results['global'] = 'Protocol did not override _get_results()'
+        return results
+
+    def __str__(self):
+        return '{}'.format(self._name)
+
+
+class TestError(Exception):
+    pass
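supervisor.py in the next hunk instantiates a Kronos timing helper that is not part of this diff. Below is a minimal sketch of the assumed interface, inferred from its call sites (kronos.time(label), get_start_date(), get_end_date(), get_times()) and from the resources block of mockup.yml; the file name kronos.py and every implementation detail are assumptions, not the project's actual helper:

# kronos.py -- hypothetical sketch, not included in this diff
import time
from collections import OrderedDict

class Kronos:
    def __init__(self):
        self._start = time.time()        # wall-clock start, for the report dates
        self._end = None
        self._pt = time.process_time()   # CPU-time checkpoint of the current step
        self._steps = OrderedDict()

    def time(self, label):
        # Close the current step under `label` and start the next one
        now = time.process_time()
        self._steps[label] = now - self._pt
        self._pt = now
        self._end = time.time()

    def get_start_date(self):
        return self._start               # Unix timestamp

    def get_end_date(self):
        return self._end                 # None until the first step is recorded

    def get_times(self):
        # Shaped to match the resources block of mockup.yml
        times = OrderedDict()
        times['process_time'] = OrderedDict(self._steps)
        return times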
diff --git a/supervisor.py b/supervisor.py
new file mode 100644
index 0000000..f34a35b
--- /dev/null
+++ b/supervisor.py
@@ -0,0 +1,301 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+# \file supervisor.py
+# \brief Experiment supervisor: watches a folder for test recipes and runs them
+# \author Florent Guiotte
+# \version 0.1
+# \date 07 sept. 2018
+#
+# TODO details
+
+import yaml
+import importlib
+import hashlib
+from collections import OrderedDict
+import time
+import os
+import datetime
+from pathlib import Path
+from operator import itemgetter
+import traceback
+import logging
+import logger  # project-specific logging setup, not part of this diff
+import numpy as np
+import pandas as pd
+import triskele
+from sklearn import metrics
+from kronos import Kronos  # timing helper, assumed module (see sketch above)
+from protocols.protocol import TestError
+
+log = logging.getLogger('Supervisor [{}]'.format(os.uname()[1]))
+
+### Keep yaml ordered, dump multiline strings in block style
+def setup_yaml():
+    """ https://stackoverflow.com/a/8661021 """
+    represent_dict_order = lambda self, data: self.represent_mapping('tag:yaml.org,2002:map', data.items())
+    yaml.add_representer(OrderedDict, represent_dict_order)
+
+    """ https://stackoverflow.com/a/24291536 """
+    yaml.Dumper.org_represent_str = yaml.Dumper.represent_str
+    yaml.add_representer(str, repr_str, Dumper=yaml.Dumper)
+
+def repr_str(dumper, data):
+    # Multiline strings get the literal block style ('|'), as in mockup.yml's
+    # detail field; everything else keeps the default representer
+    if '\n' in data:
+        return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|')
+    return dumper.org_represent_str(data)
+
+
+ENRICHMENT_DIR = Path('./Enrichment/')
+TEST_DIR = ENRICHMENT_DIR / 'Tests'
+STAGING_DIR = ENRICHMENT_DIR / 'Staging'
+RESULT_DIR = ENRICHMENT_DIR / 'Results'
+FAILED_DIR = ENRICHMENT_DIR / 'Failed'
+
+def update_queue():
+    tmp_queue = list()
+    for child in TEST_DIR.iterdir():
+        if child.is_file() and child.suffix == '.yml':
+            tmp_queue.append({'expe_file': child, 'priority': get_priority(child)})
+
+    # Sorted by ascending priority; main() pops from the end, so the highest
+    # priority value runs first
+    queue = sorted(tmp_queue, key=itemgetter('priority'))
+    return queue
+
+def get_priority(yml_file):
+    with open(yml_file) as f:
+        expe = OrderedDict(yaml.safe_load(f))
+    return expe['priority']
+
+def run(expe_file):
+    log.info('Run test {}'.format(expe_file))
+    with open(expe_file) as f:
+        expe = OrderedDict(yaml.safe_load(f))
+
+    ### Stage test
+
+    ### Load protocol
+
+    ### Write hashes
+
+    ### Run test
+
+    ### Write report
+
+    ### Write results
+
+    ### End of test
+    # The protocol-based flow sketched above is still to be implemented; the
+    # legacy flow below is kept for reference and skipped by this return
+    return
+
+    ### Keep track of time
+    kronos = Kronos()
+
+    ### Compute hashes
+    log.info('Computing hashes')
+    expe_hashes = compute_hashes(expe)
+
+    ### Create output names
+    oname = '{}_{}'.format(expe_file.stem, expe_hashes['global'][:6])
+    oname_yml = oname + '.yml'
+    oname_tif = oname + '.tif'
+
+    ### Create partial report
+    expe_report = create_report(kronos)
+
+    ### Stage expe
+    log.info('Staging test')
+    write_expe_file(STAGING_DIR / oname_yml, expe, expe_hashes, expe_report)
+    expe_file.unlink()
+
+    ### Compute descriptors
+    log.info('Compute descriptors')
+    try:
+        descriptors = compute_descriptors(expe)
+    except Exception as e:
+        kronos.time('description')
+        expe_report = create_report(kronos)
+        (STAGING_DIR / oname_yml).unlink()
+        write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'description', e)
+        raise TestError('Error occurred during description')
+
+    kronos.time('description')
+
+    ### Compute classification
+    log.info('Classify data')
+    try:
+        classification = compute_classification(expe, descriptors)
+    except Exception as e:
+        kronos.time('classification')
+        expe_report = create_report(kronos)
+        (STAGING_DIR / oname_yml).unlink()
+        write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'classification', e)
+        raise TestError('Error occurred during classification')
+
+    kronos.time('classification')
+
+    ### Metrics
+    log.info('Run initial metrics')
+    metrics_results = run_metrics(expe, classification, descriptors)
+    kronos.time('metrics')
+
+    ### Create complete report
+    log.info('Write complete report')
+    expe_report = create_report(kronos)
+
+    ### Name and write prediction
+    triskele.write(RESULT_DIR / oname_tif, classification)
+
+    ### Write report and results
+    (STAGING_DIR / oname_yml).unlink()
+    write_expe_file(RESULT_DIR / oname_yml, expe, expe_hashes, expe_report, oname_tif, metrics_results)
+
+    log.info('Test complete')
+
+
+def write_error(file, expe, hashes=None, report=None, when='', e=None):
+    error = OrderedDict()
+    error['when'] = when
+    error['what'] = str(e)
+    error['where'] = traceback.format_exc()
+    with open(file, 'w') as of:
+        yaml.dump(OrderedDict({'expe': expe,
+                               'expe_hashes': hashes,
+                               'expe_report': report,
+                               'expe_error': error}),
+                  of, default_flow_style=False, encoding=None, allow_unicode=True)
+
+def write_expe_file(file, expe, hashes=None, report=None, classification=None, results=None):
+    with open(file, 'w') as of:
+        yaml.dump(OrderedDict({'expe': expe,
+                               'expe_hashes': hashes,
+                               'expe_report': report,
+                               'expe_classification': classification,
+                               'expe_results': results}),
+                  of, default_flow_style=False, encoding=None, allow_unicode=True)
+
+
+def compute_hashes(expe):
+    # Duplicates Jurse._get_hashes(); to be dropped once protocols are
+    # loaded dynamically
+    glob = hashlib.sha1()
+
+    expe_hashes = OrderedDict()
+
+    for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']:
+        v = str(expe[k]).encode('utf-8')
+        expe_hashes[k] = hashlib.sha1(v).hexdigest()
+        glob.update(v)
+    expe_hashes['global'] = glob.hexdigest()
+    return expe_hashes
+
+
+def compute_descriptors(expe):
+    """Compute descriptors from a standard expe recipe."""
+    script = expe['descriptors_script']
+    desc = importlib.import_module(script['name'])
+    #importlib.reload(Descriptors)
+    att = desc.run(**script['parameters'])
+
+    return att
+
+
+def get_ground_truth(expe):
+    # The ground_truth section is a mapping with 'raster' and optional
+    # 'meta_labels' keys, as in mockup.yml
+    gt_recipe = expe['ground_truth']
+    gt = triskele.read(gt_recipe['raster'])
+
+    # Meta labeling: remap raw class indices onto meta-class indices
+    idx_map = np.arange(gt.max() + 1)
+
+    if 'meta_labels' in gt_recipe:
+        meta_idx = pd.read_csv(gt_recipe['meta_labels'])
+        idx = np.array(meta_idx['index'])
+        midx = np.array(meta_idx['metaclass_index'])
+        idx_map[idx] = midx
+
+    return idx_map[gt]
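+# For reference, get_ground_truth() reads two columns, `index` and
+# `metaclass_index`, from the optional meta_labels CSV to remap raw
+# ground-truth labels onto meta-classes. A hypothetical GT/jurse_idx.csv
+# (values made up, only the column names come from the code above):
+#
+#     index,metaclass_index
+#     1,1
+#     2,1
+#     3,2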
+def compute_classification(expe, descriptors):
+    """Read a standard expe recipe and descriptors, return the resulting classification."""
+    # Ground truth
+    gt = get_ground_truth(expe)
+
+    # CrossVal and ML: resolve dotted names such as
+    # 'sklearn.ensemble.RandomForestClassifier', matching the recipe format
+    # of mockup.yml
+    cv = expe['cross_validation']
+    cl = expe['classifier']
+
+    cv_module, _, cv_name = cv['name'].rpartition('.')
+    cl_module, _, cl_name = cl['name'].rpartition('.')
+    cross_val = getattr(importlib.import_module(cv_module), cv_name)
+    classifier = getattr(importlib.import_module(cl_module), cl_name)
+
+    prediction = np.zeros_like(gt)
+
+    # The generator is expected to yield training/validation descriptors and
+    # labels plus the validation indices (xt, xv, yt, yv, ti)
+    for xt, xv, yt, yv, ti in cross_val(gt, descriptors, **cv['parameters']):
+        clf = classifier(**cl['parameters'])
+        clf.fit(xt, yt)
+
+        ypred = clf.predict(xv)
+
+        prediction[ti] = ypred
+
+    return prediction
+
+
+def compute_metrics(ground_truth, classification, descriptors):
+    """Return a dict of metrics for the given ground truth and classification prediction."""
+    # Only score the pixels that actually received a prediction
+    f = np.nonzero(classification)
+    pred = classification[f].ravel()
+    gt = ground_truth[f].ravel()
+
+    results = OrderedDict()
+    results['dimension'] = descriptors.shape[-1]
+    results['overall_accuracy'] = float(metrics.accuracy_score(gt, pred))
+    results['cohen_kappa'] = float(metrics.cohen_kappa_score(gt, pred))
+
+    return results
+
+
+def run_metrics(expe, classification, descriptors):
+    """Compute the metrics from a standard expe recipe and a given classification."""
+
+    ### Extensible: meta-classes
+    gt = get_ground_truth(expe)
+    return compute_metrics(gt, classification, descriptors)
+
+
+def create_report(kronos):
+    expe_report = OrderedDict()
+
+    expe_report['supervisor'] = os.uname()[1]
+
+    for timev, datek in zip((kronos.get_start_date(), kronos.get_end_date()), ('start_date', 'end_date')):
+        expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None
+
+    resources = kronos.get_times()
+    resources['ram'] = None  # RAM tracking not implemented yet
+
+    expe_report['resources'] = resources
+    return expe_report
+
+def watch_folder():
+    log.info('Waiting for test')
+    while not list(TEST_DIR.glob('*.yml')):
+        time.sleep(10)
+
+def main():
+    while True:
+        try:
+            queue = update_queue()
+        except Exception:
+            log.error('Critical exception while updating work queue')
+            log.error(traceback.format_exc())
+            log.warning('Resuming')
+            continue
+        if not queue:
+            watch_folder()
+            continue
+        try:
+            run(queue.pop()['expe_file'])
+        except TestError:
+            log.warning('Test failed, error logged. Resuming')
+        except Exception:
+            log.error('Critical exception while running test')
+            log.error(traceback.format_exc())
+            log.warning('Resuming')
+            continue
+
+if __name__ == '__main__':
+    logger.setup_logging()
+    log.info('Starting supervisor')
+
+    setup_yaml()
+    main()
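A minimal end-to-end driver for the loop above, assuming the Enrichment tree and a recipe file named expe.yml (both the file name and the pre-created tree are assumptions):

from pathlib import Path
import shutil

# Create the folder layout supervisor.py expects
for d in ('Tests', 'Staging', 'Results', 'Failed'):
    Path('./Enrichment', d).mkdir(parents=True, exist_ok=True)

# Queue a test, then start the loop with: python supervisor.py
shutil.copy('expe.yml', './Enrichment/Tests/')
# Success: Enrichment/Results/expe_<hash6>.yml plus the .tif classification;
# failure: Enrichment/Failed/expe_<hash6>.yml with an expe_error section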