From 12d0bcd2c825a272f1fc543786bd6847be3eb3c1 Mon Sep 17 00:00:00 2001 From: Karamaz0V1 Date: Fri, 7 Sep 2018 20:06:28 +0200 Subject: [PATCH 1/5] Mockup, protocols and WIP on supervisor --- mockup.yml | 61 ++++++++ protocols/jurse_default.py | 30 ++++ protocols/protocol.py | 63 ++++++++ supervisor.py | 301 +++++++++++++++++++++++++++++++++++++ 4 files changed, 455 insertions(+) create mode 100644 mockup.yml create mode 100644 protocols/jurse_default.py create mode 100644 protocols/protocol.py create mode 100644 supervisor.py diff --git a/mockup.yml b/mockup.yml new file mode 100644 index 0000000..d55b970 --- /dev/null +++ b/mockup.yml @@ -0,0 +1,61 @@ +name: Première expérience +date: 9 juillet 2018 +priority: 1 +detail: | + Maquette pour la création du supervisor de minigrida. Par rapport à la + version legacy du projet LD2DAPs, le choix du protocole de test est + générique. Il faut ajouter le chargement dynamique du protocole puis + réusiner le fonctionnement du supervisor pour respecter l'esprit universel + de minigrida. +protocol: JurseSF +expe: + ground_truth: + raster: ./Data/ground_truth/2018_IEEE_GRSS_DFC_GT_TR.tif + meta_labels: GT/jurse_idx.csv + descriptors_script: + name: Descriptors.dfc_aps + parameters: + areas: + - 100 + - 1000 + moi: + - 0.5 + - 0.9 + rasters: + - ./Data/phase1_rasters/DEM+B_C123/UH17_GEM051_TR.tif + - ./Data/phase1_rasters/DEM_C123_3msr/UH17_GEG051_TR.tif + treshold: 1e4 + cross_validation: + name: CrossValidationGenerator.APsCVG + parameters: + n_test: 2 + classifier: + name: sklearn.ensemble.RandomForestClassifier + parameters: + min_samples_leaf: 10 + n_estimators: 50 + n_jobs: -1 + random_state: 0 +expe_hashes: + ground_truth: 2c5ecaddcb8c4a1c8863bc65e7440de4a1b4962c + descriptors_script: cfdcc84d9d9c47177930257f286d850db446812b + cross_validation: 4a61b34fda812fe717890b25d75430023335a7a6 + classifier: 40e6741ef8cc4b4fbe188b8ca0563eb5195b88ad + global: b8219fab322bf11ec1aac14a1f51466dd94ddbdd +expe_report: + supervisor: thecomedian + start_date: Le 27/07/2018 à 16:28:52 + end_date: Le 27/07/2018 à 16:29:54 + ressources: + ram: 42 Go + process_time: + description: 0.6744262149950373 + classification: 168.82905034400028 + metrics: 1.1557443889978458 +expe_results: + classification: test_b8219f.tif + dimensions: 42 + scores: + overall_accuracy: 0.5550408093111998 + cohen_kappa: 0.41714275852261407 + diff --git a/protocols/jurse_default.py b/protocols/jurse_default.py new file mode 100644 index 0000000..1575bb4 --- /dev/null +++ b/protocols/jurse_default.py @@ -0,0 +1,30 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file jurse_default.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 07 sept. 
2018 +# +# TODO details + +import hashlib +from protocol import Protocol + +class Jurse(Protocol): + def __init__(self, expe): + super().__init__(expe, self.__class__.__name__) + + def _get_hashes(self): + hashes = OrderedDict() + hashes['global'] = 'Protocol did not override _get_hashes()' + + glob = hashlib.sha1() + + for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']: + v = str(expe[k]).encode('utf-8') + hashes[k] = hashlib.sha1(v).hexdigest() + glob.update(v) + hashes['global'] = glob.hexdigest() + + return hashes diff --git a/protocols/protocol.py b/protocols/protocol.py new file mode 100644 index 0000000..d55019e --- /dev/null +++ b/protocols/protocol.py @@ -0,0 +1,63 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file protocol.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 07 sept. 2018 +# +# TODO details + +import logging +import time +from collections import OrderedDict + +class Protocol: + def __init__(self, expe, name=None): + self._log = logging.getLogger(name) + self._expe = expe + self._name = name + self._times = OrderedDict() + self._pt = time.process_time() + + def get_hashes(self): + self._log.info('Computing hashes') + return(self._get_hashes()) + + def run(self): + self._kronos = Kronos() + self._run() + + def get_results(self): + self._get_results() + + def get_process_time(self): + return self._times() + + def _time(self, process): + self._times[process] = time.process_time() - self._pt + self._pt = time.process_time() + + def _get_hashes(self): + self._log.warning('Protocol did not override _get_hashes()') + hashes = OrderedDict() + hashes['global'] = 'Protocol did not override _get_hashes()' + return hashes + + def _run(self): + self._log.error('Protocol did not override _run()') + raise NotImplementedError('Protocol {} did not override _run()'.format(self)) + + def _get_results(self): + self._log.warning('Protocol did not override _get_results()') + results = OrderedDict() + results['global'] = 'Protocol did not override _get_results()' + return results + + def __str__(self): + return('{}'.format(self._name)) + + +class TestError(Exception): + pass + diff --git a/supervisor.py b/supervisor.py new file mode 100644 index 0000000..f34a35b --- /dev/null +++ b/supervisor.py @@ -0,0 +1,301 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file supervisor.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 07 sept. 
2018 +# +# TODO details + +import yaml +import importlib +import hashlib +from collections import OrderedDict +import time +import os +import datetime +from pathlib import Path +from operator import itemgetter +import traceback +import logging +import logger + +log = logging.getLogger('Supervisor [{}]'.format(os.uname()[1])) + +### Keep yaml ordered, newline string +def setup_yaml(): + """ https://stackoverflow.com/a/8661021 """ + represent_dict_order = lambda self, data: self.represent_mapping('tag:yaml.org,2002:map', data.items()) + yaml.add_representer(OrderedDict, represent_dict_order) + + """ https://stackoverflow.com/a/24291536 """ + yaml.Dumper.org_represent_str = yaml.Dumper.represent_str + yaml.add_representer(str, repr_str, Dumper=yaml.Dumper) + +def repr_str(dumper, data): + if '\n' in data: + return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|') + return dumper.org_represent_str(data) + + + +ENRICHMENT_DIR = Path('./Enrichment/') +TEST_DIR = ENRICHMENT_DIR / 'Tests' +STAGING_DIR = ENRICHMENT_DIR / 'Staging' +RESULT_DIR = ENRICHMENT_DIR / 'Results' +FAILED_DIR = ENRICHMENT_DIR / 'Failed' + +def update_queue(): + tmp_queue = list() + for child in TEST_DIR.iterdir(): + if child.is_file() and child.suffix == '.yml': + tmp_queue.append({'expe_file': child, 'priority': get_priority(child)}) + + queue = sorted(tmp_queue, key=itemgetter('priority')) + return queue + +def get_priority(yml_file): + with open(yml_file) as f: + expe = OrderedDict(yaml.safe_load(f)) + return expe['priority'] + +def run(expe_file): + log.info('Run test {}'.format(expe_file)) + with open(expe_file) as f: + test = OrderedDict(yaml.safe_load(f)) + + ### Stage test + + ### Load protocol + + ### Write hahes + + ### Run test + + ### Write report + + ### Write results + + ### End of test + return + + ### Keep track of time + kronos = Kronos() + + ### Compute hashes + log.info('Computing hashes') + expe_hashes = compute_hashes(expe) + + ### Create output names + oname = '{}_{}'.format(expe_file.stem, expe_hashes['global'][:6]) + oname_yml = oname + '.yml' + oname_tif = oname + '.tif' + + ### Create partial report + expe_report = create_report(kronos) + + ### Stage expe + log.info('Staging test') + write_expe_file(STAGING_DIR / oname_yml, expe, expe_hashes, expe_report) + expe_file.unlink() + + ### Compute descriptors + log.info('Compute descriptors') + try: + descriptors = compute_descriptors(expe) + except Exception as e: + kronos.time('description') + expe_report = create_report(kronos) + (STAGING_DIR / oname_yml).unlink() + write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'description', e) + raise TestError('Error occured during description') + + kronos.time('description') + + ### Compute classification + log.info('Classify data') + try: + classification = compute_classification(expe, descriptors) + except Exception as e: + kronos.time('classification') + expe_report = create_report(kronos) + (STAGING_DIR / oname_yml).unlink() + write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'classification', e) + raise TestError('Error occured during classification') + + kronos.time('classification') + + ### Metrics + log.info('Run initial metrics') + metrics = run_metrics(expe, classification, descriptors) + kronos.time('metrics') + + ### Create complete report + log.info('Write complete report') + expe_report = create_report(kronos) + + ### Name and write prediction + triskele.write(RESULT_DIR / oname_tif, classification) + + ### Write report and results + 
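+    # drop the staged copy; the final report, metrics and classification land in RESULT_DIR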
(STAGING_DIR / oname_yml).unlink() + write_expe_file(RESULT_DIR / oname_yml, expe, expe_hashes, expe_report, oname_tif, metrics) + + log.info('Test complete') + + +def write_error(file, expe, hashes=None, report=None, when='', e=Exception): + error = OrderedDict() + error['when'] = when + error['what'] = str(e) + error['where'] = traceback.format_exc() + with open(file, 'w') as of: + yaml.dump(OrderedDict({'expe': expe, + 'expe_hashes': hashes, + 'expe_report': report, + 'expe_error': error}), + of, default_flow_style=False, encoding=None, allow_unicode=True) + +def write_expe_file(file, expe, hashes=None, report=None, classification=None, results=None): + with open(file, 'w') as of: + yaml.dump(OrderedDict({'expe': expe, + 'expe_hashes': hashes, + 'expe_report': report, + 'expe_classification': classification, + 'expe_results': results}), + of, default_flow_style=False, encoding=None, allow_unicode=True) + + +def compute_hashes(expe): + glob = hashlib.sha1() + + expe_hashes = OrderedDict() + + for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']: + v = str(expe[k]).encode('utf-8') + expe_hashes[k] = hashlib.sha1(v).hexdigest() + glob.update(v) + expe_hashes['global'] = glob.hexdigest() + return expe_hashes + + +def compute_descriptors(expe): + """Compute descriptors from a standard expe recipe""" + script = expe['descriptors_script'] + desc = importlib.import_module(script['name']) + #importlib.reload(Descriptors) + att = desc.run(**script['parameters']) + + return att + + +def get_ground_truth(expe): + gt = triskele.read(expe['ground_truth']) + + # Meta labeling + idx_map = np.arange(gt.max() + 1) + + if 'meta_labels' in expe: + meta_idx = pd.read_csv(expe['meta_labels']) + idx = np.array(meta_idx['index']) + midx = np.array(meta_idx['metaclass_index']) + idx_map[idx] = midx + + return idx_map[gt] + + +def compute_classification(expe, descriptors): + """Read a standard expe recipe and descriptors, return the result classification""" + # Ground truth + gt = get_ground_truth(expe) + + # CrossVal and ML + cv = expe['cross_validation'] + cl = expe['classifier'] + + cross_val = getattr(importlib.import_module(cv['package']), cv['name']) + classifier = getattr(importlib.import_module(cl['package']), cl['name']) + + prediction = np.zeros_like(gt) + + for xt, xv, yt, yv, ti in cross_val(gt, descriptors, **cv['parameters']): + rfc = classifier(**cl['parameters']) + rfc.fit(xt, yt) + + ypred = rfc.predict(xv) + + prediction[ti] = ypred + + return prediction + + +def compute_metrics(ground_truth, classification, descriptors): + """Return dict of metrics for ground_truth and classification prediction in parameters""" + f = np.nonzero(classification) + pred = classification[f].ravel() + gt = ground_truth[f].ravel() + + results = OrderedDict() + results['dimension'] = descriptors.shape[-1] + results['overall_accuracy'] = float(metrics.accuracy_score(gt, pred)) + results['cohen_kappa'] = float(metrics.cohen_kappa_score(gt, pred)) + + return results + + +def run_metrics(expe, classification, descriptors): + """Compute the metrics from a standard expe recipe and an given classification""" + + ### Extensible: meta-classes + gt = get_ground_truth(expe) + return compute_metrics(gt, classification, descriptors) + + +def create_report(kronos): + expe_report = OrderedDict() + + expe_report['supervisor'] = os.uname()[1] + + for timev, datek in zip((kronos.get_start_date(), kronos.get_end_date()), ('start_date', 'end_date')): + expe_report[datek] = 
datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None + + ressources = kronos.get_times() + ressources['ram'] = None + + expe_report['ressources'] = ressources + return expe_report + +def watch_folder(): + log.info('Waiting for test') + while not list(TEST_DIR.glob('*.yml')): + time.sleep(10) + +def main(): + while(True): + try: + queue = update_queue() + except Exception: + log.error('Critical exception while updating work queue') + log.error(traceback.format_exc()) + log.warning('Resuming') + continue + if not queue: + watch_folder() + continue + try: + run(queue.pop()['expe_file']) + except TestError: + log.warning('Test failed, error logged. Resuming') + except Exception: + log.error('Critical exception while running test. Resuming') + log.error(traceback.format_exc()) + log.warning('Resuming') + continue + +if __name__ == '__main__': + logger.setup_logging() + log.info('Starting supervisor') + + setup_yaml() + main() -- 2.45.2 From 5eb4180bae2c3b7c68335b7e204bac58074f6344 Mon Sep 17 00:00:00 2001 From: Karamaz0V1 Date: Sun, 9 Sep 2018 16:37:02 +0200 Subject: [PATCH 2/5] Refactoring in progress on Supervisor and Jurse --- .gitignore | 4 +- logger.py | 51 ++++++++++++++ logging.yaml | 40 +++++++++++ mockup.yml | 2 +- protocols/__init__.py | 9 +++ protocols/jurse.py | 122 ++++++++++++++++++++++++++++++++ protocols/jurse_default.py | 30 -------- protocols/protocol.py | 7 +- supervisor.py | 140 ++++++++++--------------------------- 9 files changed, 267 insertions(+), 138 deletions(-) create mode 100644 logger.py create mode 100644 logging.yaml create mode 100644 protocols/__init__.py create mode 100644 protocols/jurse.py delete mode 100644 protocols/jurse_default.py diff --git a/.gitignore b/.gitignore index ced047a..ae7c1b2 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -Enrichment +Enrichment/ +__pycache__/ +Logs/ diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..14441fa --- /dev/null +++ b/logger.py @@ -0,0 +1,51 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file %filename%.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 24 avril 2018 +# +# from https://fangpenlin.com/posts/2012/08/26/good-logging-practice-in-python/ + +import os +import logging.config +from pathlib import Path + +import yaml + +def setup_logging( + default_path='logging.yaml', + default_level=logging.WARN, + env_key='LOG_CFG' +): + """Setup logging configuration + + """ + path = default_path + value = os.getenv(env_key, None) + if value: + path = value + if os.path.exists(path): + with open(path, 'rt') as f: + config = yaml.safe_load(f.read()) + makedirs(config) + logging.config.dictConfig(config) + else: + logging.basicConfig(level=default_level) + + +def makedirs(dic): + files = finddirs(dic) + for f in files: + d = Path(*f.parts[:-1]) + d.mkdir(parents=True, exist_ok=True) + +def finddirs(dic, key='filename'): + r = list() + value = dic.get(key) + if value : r.append(Path(value)) + for k, v in dic.items(): + if isinstance(v, dict): + r.extend(finddirs(v)) + return r diff --git a/logging.yaml b/logging.yaml new file mode 100644 index 0000000..b18e6ed --- /dev/null +++ b/logging.yaml @@ -0,0 +1,40 @@ +version: 1 +disable_existing_loggers: False +formatters: + simple: + format: "%(asctime)s %(levelname)s %(name)s: %(message)s" + +handlers: + console: + class: logging.StreamHandler + level: INFO + formatter: simple + stream: ext://sys.stdout + + info_file_handler: + class: logging.handlers.RotatingFileHandler + 
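+        # INFO and above also go to a size-rotated file, so long supervisor runs stay bounded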
level: INFO + formatter: simple + filename: Logs/info.log + maxBytes: 10485760 # 10MB + backupCount: 20 + encoding: utf8 + + error_file_handler: + class: logging.handlers.RotatingFileHandler + level: ERROR + formatter: simple + filename: Logs/errors.log + maxBytes: 10485760 # 10MB + backupCount: 20 + encoding: utf8 + +loggers: + my_module: + level: DEBUG + handlers: [console] + propagate: no + +root: + level: DEBUG + handlers: [console, info_file_handler, error_file_handler] diff --git a/mockup.yml b/mockup.yml index d55b970..c678907 100644 --- a/mockup.yml +++ b/mockup.yml @@ -7,7 +7,7 @@ detail: | générique. Il faut ajouter le chargement dynamique du protocole puis réusiner le fonctionnement du supervisor pour respecter l'esprit universel de minigrida. -protocol: JurseSF +protocol: Jurse expe: ground_truth: raster: ./Data/ground_truth/2018_IEEE_GRSS_DFC_GT_TR.tif diff --git a/protocols/__init__.py b/protocols/__init__.py new file mode 100644 index 0000000..e17274e --- /dev/null +++ b/protocols/__init__.py @@ -0,0 +1,9 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file __init__.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 09 sept. 2018 +# +# TODO details diff --git a/protocols/jurse.py b/protocols/jurse.py new file mode 100644 index 0000000..ea3a378 --- /dev/null +++ b/protocols/jurse.py @@ -0,0 +1,122 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file jurse.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 07 sept. 2018 +# +# TODO details + +import hashlib +import importlib +from collections import OrderedDict +import numpy as np +import pandas as pd +from sklearn import metrics +# TODO: create package, use dev +import sys +sys.path.append('../triskele/python') +import triskele +from .protocol import Protocol, TestError + + +class Jurse(Protocol): + """First JURSE test protocol for LiDAR classification with 2D maps. + + This first protocol compute attribute profiles on the whole scene then + split in train and test for a random forest classifier. 
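+
+    Usage sketch (expe is the 'expe' mapping of a recipe such as
+    test_mockup.yml; the supervisor drives this same sequence):
+
+        proto = Jurse(expe)
+        hashes = proto.get_hashes()
+        proto.run()
+        results = proto.get_results()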
+ + """ + + def __init__(self, expe): + super().__init__(expe, self.__class__.__name__) + + def _get_hashes(self): + hashes = OrderedDict() + glob = hashlib.sha1() + + for k in ['ground_truth', 'descriptors_script', + 'cross_validation', 'classifier']: + val = str(self._expe[k]).encode('utf-8') + hashes[k] = hashlib.sha1(val).hexdigest() + glob.update(val) + hashes['global'] = glob.hexdigest() + + return hashes + + def _run(self): + self._log.info('Compute descriptors') + try: + descriptors = self._compute_descriptors() + except Exception: + raise TestError('Error occured during description') + + self._log.info('Classify data') + try: + classification = self._compute_classificatin(descriptors) + except Exception: + raise TestError('Error occured during classification') + + self._log.info('Run metrics') + self._metrics = self._run_metrics(classification, descriptors) + + def _compute_descriptors(self): + script = self._expe['descriptors_script'] + + desc = importlib.import_module(script['name']) + att = desc.run(**script['parameters']) + + return att + + def _compute_classification(self, descriptors): + # Ground truth + gt = self._get_ground_truth() + + # CrossVal and ML + cv = self._expe['cross_validation'] + cl = self._expe['classifier'] + + cross_val = getattr(importlib.import_module(cv['package']), cv['name']) + classifier = getattr(importlib.import_module(cl['package']), cl['name']) + + prediction = np.zeros_like(gt) + + for xt, xv, yt, yv, ti in cross_val(gt, descriptors, **cv['parameters']): + rfc = classifier(**cl['parameters']) + rfc.fit(xt, yt) + + ypred = rfc.predict(xv) + + prediction[ti] = ypred + + return prediction + + def _get_ground_truth(self): + gt_expe = self._expe['ground_truth'] + gt = triskele.read(gt_expe['raster']) + + # Meta labeling + idx_map = np.arange(gt.max() + 1) + + if 'meta_labels' in self._expe: + meta_idx = pd.read_csv(gt_expe['meta_labels']) + idx = np.array(meta_idx['index']) + midx = np.array(meta_idx['metaclass_index']) + idx_map[idx] = midx + + return idx_map[gt] + + def _run_metrics(self, classification, descriptors): + gt = self._get_ground_truth() + + f = np.nonzero(classification) + pred = classification[f].ravel() + gt = gt[f].ravel() + + results = OrderedDict() + results['dimension'] = descriptors.shape[-1] + results['overall_accuracy'] = float(metrics.accuracy_score(gt, pred)) + results['cohen_kappa'] = float(metrics.cohen_kappa_score(gt, pred)) + + return results diff --git a/protocols/jurse_default.py b/protocols/jurse_default.py deleted file mode 100644 index 1575bb4..0000000 --- a/protocols/jurse_default.py +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -# \file jurse_default.py -# \brief TODO -# \author Florent Guiotte -# \version 0.1 -# \date 07 sept. 
2018 -# -# TODO details - -import hashlib -from protocol import Protocol - -class Jurse(Protocol): - def __init__(self, expe): - super().__init__(expe, self.__class__.__name__) - - def _get_hashes(self): - hashes = OrderedDict() - hashes['global'] = 'Protocol did not override _get_hashes()' - - glob = hashlib.sha1() - - for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']: - v = str(expe[k]).encode('utf-8') - hashes[k] = hashlib.sha1(v).hexdigest() - glob.update(v) - hashes['global'] = glob.hexdigest() - - return hashes diff --git a/protocols/protocol.py b/protocols/protocol.py index d55019e..60824c6 100644 --- a/protocols/protocol.py +++ b/protocols/protocol.py @@ -12,21 +12,23 @@ import logging import time from collections import OrderedDict + class Protocol: def __init__(self, expe, name=None): self._log = logging.getLogger(name) self._expe = expe self._name = name self._times = OrderedDict() - self._pt = time.process_time() + self._log.debug('expe loaded: {}'.format(self._expe)) def get_hashes(self): self._log.info('Computing hashes') return(self._get_hashes()) def run(self): - self._kronos = Kronos() + self._pt = time.process_time() self._run() +# TODO: Strop process timer def get_results(self): self._get_results() @@ -60,4 +62,3 @@ class Protocol: class TestError(Exception): pass - diff --git a/supervisor.py b/supervisor.py index f34a35b..a41573a 100644 --- a/supervisor.py +++ b/supervisor.py @@ -20,25 +20,7 @@ from operator import itemgetter import traceback import logging import logger - -log = logging.getLogger('Supervisor [{}]'.format(os.uname()[1])) - -### Keep yaml ordered, newline string -def setup_yaml(): - """ https://stackoverflow.com/a/8661021 """ - represent_dict_order = lambda self, data: self.represent_mapping('tag:yaml.org,2002:map', data.items()) - yaml.add_representer(OrderedDict, represent_dict_order) - - """ https://stackoverflow.com/a/24291536 """ - yaml.Dumper.org_represent_str = yaml.Dumper.represent_str - yaml.add_representer(str, repr_str, Dumper=yaml.Dumper) - -def repr_str(dumper, data): - if '\n' in data: - return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|') - return dumper.org_represent_str(data) - - +from protocols.protocol import TestError ENRICHMENT_DIR = Path('./Enrichment/') TEST_DIR = ENRICHMENT_DIR / 'Tests' @@ -46,30 +28,61 @@ STAGING_DIR = ENRICHMENT_DIR / 'Staging' RESULT_DIR = ENRICHMENT_DIR / 'Results' FAILED_DIR = ENRICHMENT_DIR / 'Failed' +log = logging.getLogger('Supervisor [{}]'.format(os.uname()[1])) + +def setup_yaml(): + """ Keep yaml ordered, newline string + from https://stackoverflow.com/a/8661021 + """ + represent_dict_order = lambda self, data: self.represent_mapping('tag:yaml.org,2002:map', data.items()) + yaml.add_representer(OrderedDict, represent_dict_order) + + """ https://stackoverflow.com/a/24291536 """ + yaml.Dumper.org_represent_str = yaml.Dumper.represent_str + yaml.add_representer(str, repr_str, Dumper=yaml.Dumper) + + +def repr_str(dumper, data): + if '\n' in data: + return dumper.represent_scalar(u'tag:yaml.org,2002:str', + data, style='|') + return dumper.org_represent_str(data) + + def update_queue(): tmp_queue = list() for child in TEST_DIR.iterdir(): if child.is_file() and child.suffix == '.yml': - tmp_queue.append({'expe_file': child, 'priority': get_priority(child)}) - + tmp_queue.append({'expe_file': child, + 'priority': get_priority(child)}) + queue = sorted(tmp_queue, key=itemgetter('priority')) return queue + def get_priority(yml_file): with open(yml_file) as 
f: expe = OrderedDict(yaml.safe_load(f)) return expe['priority'] - + + def run(expe_file): log.info('Run test {}'.format(expe_file)) with open(expe_file) as f: test = OrderedDict(yaml.safe_load(f)) + + ### Stage test ### Load protocol + protocol = getattr(importlib.import_module('protocols.jurse'), test['protocol']) + experience = protocol(test['expe']) + log.info('{} test protocol loaded'.format(experience)) ### Write hahes + hashes = experience.get_hashes() + log.info(hashes) ### Run test @@ -80,12 +93,6 @@ def run(expe_file): ### End of test return - ### Keep track of time - kronos = Kronos() - - ### Compute hashes - log.info('Computing hashes') - expe_hashes = compute_hashes(expe) ### Create output names oname = '{}_{}'.format(expe_file.stem, expe_hashes['global'][:6]) @@ -179,79 +186,6 @@ def compute_hashes(expe): expe_hashes['global'] = glob.hexdigest() return expe_hashes - -def compute_descriptors(expe): - """Compute descriptors from a standard expe recipe""" - script = expe['descriptors_script'] - desc = importlib.import_module(script['name']) - #importlib.reload(Descriptors) - att = desc.run(**script['parameters']) - - return att - - -def get_ground_truth(expe): - gt = triskele.read(expe['ground_truth']) - - # Meta labeling - idx_map = np.arange(gt.max() + 1) - - if 'meta_labels' in expe: - meta_idx = pd.read_csv(expe['meta_labels']) - idx = np.array(meta_idx['index']) - midx = np.array(meta_idx['metaclass_index']) - idx_map[idx] = midx - - return idx_map[gt] - - -def compute_classification(expe, descriptors): - """Read a standard expe recipe and descriptors, return the result classification""" - # Ground truth - gt = get_ground_truth(expe) - - # CrossVal and ML - cv = expe['cross_validation'] - cl = expe['classifier'] - - cross_val = getattr(importlib.import_module(cv['package']), cv['name']) - classifier = getattr(importlib.import_module(cl['package']), cl['name']) - - prediction = np.zeros_like(gt) - - for xt, xv, yt, yv, ti in cross_val(gt, descriptors, **cv['parameters']): - rfc = classifier(**cl['parameters']) - rfc.fit(xt, yt) - - ypred = rfc.predict(xv) - - prediction[ti] = ypred - - return prediction - - -def compute_metrics(ground_truth, classification, descriptors): - """Return dict of metrics for ground_truth and classification prediction in parameters""" - f = np.nonzero(classification) - pred = classification[f].ravel() - gt = ground_truth[f].ravel() - - results = OrderedDict() - results['dimension'] = descriptors.shape[-1] - results['overall_accuracy'] = float(metrics.accuracy_score(gt, pred)) - results['cohen_kappa'] = float(metrics.cohen_kappa_score(gt, pred)) - - return results - - -def run_metrics(expe, classification, descriptors): - """Compute the metrics from a standard expe recipe and an given classification""" - - ### Extensible: meta-classes - gt = get_ground_truth(expe) - return compute_metrics(gt, classification, descriptors) - - def create_report(kronos): expe_report = OrderedDict() @@ -279,7 +213,7 @@ def main(): log.error('Critical exception while updating work queue') log.error(traceback.format_exc()) log.warning('Resuming') - continue + break # continue if not queue: watch_folder() continue @@ -291,7 +225,7 @@ def main(): log.error('Critical exception while running test. 
Resuming') log.error(traceback.format_exc()) log.warning('Resuming') - continue + break # continue if __name__ == '__main__': logger.setup_logging() -- 2.45.2 From 2943cc4a2205a80235b7702564af5681dbf91f80 Mon Sep 17 00:00:00 2001 From: Karamaz0V1 Date: Mon, 10 Sep 2018 17:30:26 +0200 Subject: [PATCH 3/5] Rewrite complete, need packages import before test --- mockup.yml => result_mockup.yml | 27 +++--- supervisor.py | 154 +++++++++++++++++++++++++------- test_mockup.yml | 43 +++++++++ 3 files changed, 183 insertions(+), 41 deletions(-) rename mockup.yml => result_mockup.yml (80%) create mode 100644 test_mockup.yml diff --git a/mockup.yml b/result_mockup.yml similarity index 80% rename from mockup.yml rename to result_mockup.yml index c678907..ee15209 100644 --- a/mockup.yml +++ b/result_mockup.yml @@ -7,13 +7,16 @@ detail: | générique. Il faut ajouter le chargement dynamique du protocole puis réusiner le fonctionnement du supervisor pour respecter l'esprit universel de minigrida. -protocol: Jurse +protocol: + name: Jurse + package: protocols.jurse expe: ground_truth: raster: ./Data/ground_truth/2018_IEEE_GRSS_DFC_GT_TR.tif - meta_labels: GT/jurse_idx.csv + meta_labels: ./Data/ground_truth/jurse_idx.csv descriptors_script: - name: Descriptors.dfc_aps + name: dfc_aps + package: descriptors parameters: areas: - 100 @@ -26,23 +29,25 @@ expe: - ./Data/phase1_rasters/DEM_C123_3msr/UH17_GEG051_TR.tif treshold: 1e4 cross_validation: - name: CrossValidationGenerator.APsCVG + name: APsCVG + package: CVGenerators parameters: n_test: 2 classifier: - name: sklearn.ensemble.RandomForestClassifier + name: RandomForestClassifier + package: sklearn.ensemble parameters: min_samples_leaf: 10 n_estimators: 50 n_jobs: -1 random_state: 0 -expe_hashes: +hashes: ground_truth: 2c5ecaddcb8c4a1c8863bc65e7440de4a1b4962c descriptors_script: cfdcc84d9d9c47177930257f286d850db446812b cross_validation: 4a61b34fda812fe717890b25d75430023335a7a6 classifier: 40e6741ef8cc4b4fbe188b8ca0563eb5195b88ad global: b8219fab322bf11ec1aac14a1f51466dd94ddbdd -expe_report: +report: supervisor: thecomedian start_date: Le 27/07/2018 à 16:28:52 end_date: Le 27/07/2018 à 16:29:54 @@ -52,10 +57,10 @@ expe_report: description: 0.6744262149950373 classification: 168.82905034400028 metrics: 1.1557443889978458 -expe_results: - classification: test_b8219f.tif - dimensions: 42 - scores: +results: + classification: ./Enrichment/Results/test_b8219f.tif + metrics: + dimensions: 42 overall_accuracy: 0.5550408093111998 cohen_kappa: 0.41714275852261407 diff --git a/supervisor.py b/supervisor.py index a41573a..2488805 100644 --- a/supervisor.py +++ b/supervisor.py @@ -53,7 +53,7 @@ def update_queue(): tmp_queue = list() for child in TEST_DIR.iterdir(): if child.is_file() and child.suffix == '.yml': - tmp_queue.append({'expe_file': child, + tmp_queue.append({'expe_file': ExpePath(child), 'priority': get_priority(child)}) queue = sorted(tmp_queue, key=itemgetter('priority')) @@ -67,30 +67,56 @@ def get_priority(yml_file): def run(expe_file): + start_time = time.time() log.info('Run test {}'.format(expe_file)) - with open(expe_file) as f: - test = OrderedDict(yaml.safe_load(f)) - - + test = expe_file.read() ### Stage test + expe_file.stage(test) ### Load protocol - protocol = getattr(importlib.import_module('protocols.jurse'), test['protocol']) - experience = protocol(test['expe']) + try: + protocol = getattr(importlib.import_module(test['protocol']['package']), + test['protocol']['name']) + experience = protocol(test['expe']) + except Exception as e: + err = 
'Could not load protocol from test {}'.format(expe_file) + log.warning(err) + expe_file.error(test, 'loading protocol', e) + raise TestError(err) log.info('{} test protocol loaded'.format(experience)) ### Write hahes - hashes = experience.get_hashes() - log.info(hashes) + test['hashes'] = experience.get_hashes() + test['report'] = create_report(start_time) + expe_file.stage(test) ### Run test + try: + experience.run() + except Exception as e: + err = 'Experience error' + log.warning(err) + expe_file.error(test, 'testing', e) + raise TestError(err) - ### Write report + end_time = time.time() + + ### Write complete report + report = create_report(start_time, end_time) + ressources = OrderedDict() + ressouces['ram'] = None + ressouces['proccess_time'] = experience.get_process_time() + report['ressources'] = ressouces + test['report'] = report ### Write results + test['results'] = experience.get_results() + expe_file.result(test) + log.info('Additional results in {}'.format(expe_file.get_result_path())) ### End of test + log.info('Test complete') return @@ -149,20 +175,71 @@ def run(expe_file): (STAGING_DIR / oname_yml).unlink() write_expe_file(RESULT_DIR / oname_yml, expe, expe_hashes, expe_report, oname_tif, metrics) - log.info('Test complete') + +class ExpePath: + """Utility wrapper for expe files. + + Extend pathlib Path with staging, result and errors function to move the + test report through the Enrichment center. + + """ + def __init__(self, path, hash_length=6): + self._actual = Path(path) + self._base_name = self._actual.stem + self._hash_length = hash_length + self._hash = None + + def __str__(self): + return self._get_complete_name() + + def read(self): + with open(self._actual) as f: + return OrderedDict(yaml.safe_load(f)) + + def _get_hash_name(self): + return '{}{}'.format(self._base_name, + '_' + self._hash[:self._hash_length] if self._hash is not None else '') + + def _get_complete_name(self): + return self._get_hash_name() + '.yml' + + def stage(self, expe): + log.info('Staging {}'.format(self._base_name)) + self._check_hash(expe) + self._write(STAGING_DIR, expe) + + def result(self, expe): + log.info('Write results for test {}'.format(self._base_name)) + self._check_hash(expe) + self._write(RESULT_DIR, expe) + + def error(self, expe, when='', e=Exception): + error = OrderedDict() + error['when'] = when + error['what'] = str(e) + error['where'] = traceback.format_exc() + expe['error'] = error + self._write(FAILED_DIR, expe) + + def get_result_path(self): + return Path(RESULT_DIR) / self._get_hash_name() + + def _check_hash(self, expe): + if self._hash is None: + if 'hashes' in expe: + self._hash = expe['hashes']['global'] + + def _write(self, path, expe): + new_path = Path(path) / self._get_complete_name() + with open(new_path, 'w') as of: + yaml.dump(expe, of, + default_flow_style=False, + encoding=None, + allow_unicode=True) + self._actual.unlink() + self._actual = new_path -def write_error(file, expe, hashes=None, report=None, when='', e=Exception): - error = OrderedDict() - error['when'] = when - error['what'] = str(e) - error['where'] = traceback.format_exc() - with open(file, 'w') as of: - yaml.dump(OrderedDict({'expe': expe, - 'expe_hashes': hashes, - 'expe_report': report, - 'expe_error': error}), - of, default_flow_style=False, encoding=None, allow_unicode=True) def write_expe_file(file, expe, hashes=None, report=None, classification=None, results=None): with open(file, 'w') as of: @@ -186,24 +263,41 @@ def compute_hashes(expe): expe_hashes['global'] = 
glob.hexdigest() return expe_hashes -def create_report(kronos): +def create_report(stime=None, etime=None): expe_report = OrderedDict() - expe_report['supervisor'] = os.uname()[1] - - for timev, datek in zip((kronos.get_start_date(), kronos.get_end_date()), ('start_date', 'end_date')): + for datek, timev in zip(('start_date', 'end_date'), (stime, etime)): expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None - ressources = kronos.get_times() - ressources['ram'] = None - - expe_report['ressources'] = ressources return expe_report def watch_folder(): log.info('Waiting for test') while not list(TEST_DIR.glob('*.yml')): time.sleep(10) + +class Kronos(object): + def __init__(self): + self._pt = time.process_time() + self._stime = time.time() + self._etime = None + + def time(self, name): + self._times[name + '_process_time'] = time.process_time() - self._pt + self._pt = time.process_time() + self._etime = time.time() + + def get_times(self): + return self._times + + def get_start_date(self): + return self._stime + + def get_end_date(self): + return self._etime + + + def main(): while(True): diff --git a/test_mockup.yml b/test_mockup.yml new file mode 100644 index 0000000..32c0f8e --- /dev/null +++ b/test_mockup.yml @@ -0,0 +1,43 @@ +name: Première expérience +date: 9 juillet 2018 +priority: 1 +detail: | + Maquette pour la création du supervisor de minigrida. Par rapport à la + version legacy du projet LD2DAPs, le choix du protocole de test est + générique. Il faut ajouter le chargement dynamique du protocole puis + réusiner le fonctionnement du supervisor pour respecter l'esprit universel + de minigrida. +protocol: + name: Jurse + package: protocols.jurse +expe: + ground_truth: + raster: ./Data/ground_truth/2018_IEEE_GRSS_DFC_GT_TR.tif + meta_labels: ./Data/ground_truth/jurse_idx.csv + descriptors_script: + name: dfc_aps + package: descriptors + parameters: + areas: + - 100 + - 1000 + moi: + - 0.5 + - 0.9 + rasters: + - ./Data/phase1_rasters/DEM+B_C123/UH17_GEM051_TR.tif + - ./Data/phase1_rasters/DEM_C123_3msr/UH17_GEG051_TR.tif + treshold: 1e4 + cross_validation: + name: APsCVG + package: CVGenerators + parameters: + n_test: 2 + classifier: + name: RandomForestClassifier + package: sklearn.ensemble + parameters: + min_samples_leaf: 10 + n_estimators: 50 + n_jobs: -1 + random_state: 0 -- 2.45.2 From 314afce292ba402f65a08700e7e1a6e503ecc4c1 Mon Sep 17 00:00:00 2001 From: Karamaz0V1 Date: Tue, 11 Sep 2018 17:23:00 +0200 Subject: [PATCH 4/5] Supervisor and Jurse passing tests --- .gitignore | 1 + descriptors/__init__.py | 0 descriptors/dfc_aps.py | 30 ++++++++++++++++++++ descriptors/dfc_base.py | 34 +++++++++++++++++++++++ descriptors/dfc_daps.py | 41 +++++++++++++++++++++++++++ descriptors/dfc_dsdaps.py | 41 +++++++++++++++++++++++++++ descriptors/dfc_lfaps.py | 57 ++++++++++++++++++++++++++++++++++++++ descriptors/dfc_lfsdaps.py | 57 ++++++++++++++++++++++++++++++++++++++ descriptors/dfc_sdaps.py | 40 ++++++++++++++++++++++++++ protocols/jurse.py | 28 +++++++++++++------ protocols/protocol.py | 9 ++++-- result_mockup.yml | 9 +++--- supervisor.py | 17 +++++++----- test_mockup.yml | 11 ++++---- 14 files changed, 346 insertions(+), 29 deletions(-) create mode 100644 descriptors/__init__.py create mode 100644 descriptors/dfc_aps.py create mode 100644 descriptors/dfc_base.py create mode 100644 descriptors/dfc_daps.py create mode 100644 descriptors/dfc_dsdaps.py create mode 100644 descriptors/dfc_lfaps.py create mode 100644 
descriptors/dfc_lfsdaps.py create mode 100644 descriptors/dfc_sdaps.py diff --git a/.gitignore b/.gitignore index ae7c1b2..42120ce 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ Enrichment/ __pycache__/ Logs/ +[Dd]ata/ diff --git a/descriptors/__init__.py b/descriptors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/descriptors/dfc_aps.py b/descriptors/dfc_aps.py new file mode 100644 index 0000000..c632f9a --- /dev/null +++ b/descriptors/dfc_aps.py @@ -0,0 +1,30 @@ +import numpy as np +import yaml + +import sys +sys.path.append('..') +import ld2dap + +def run(rasters, treshold=1e4, areas=None, sd=None, moi=None): + # Parse attribute type + treshold = float(treshold) + areas = None if areas is None else np.array(areas).astype(np.float).astype(np.int) + sd = None if sd is None else np.array(sd).astype(np.float) + moi = None if moi is None else np.array(moi).astype(np.float) + + # APs Pipelines + loader = ld2dap.LoadTIFF(rasters) + dfc_filter = ld2dap.Treshold(treshold) + dfc_filter.input = loader + aps = ld2dap.AttributeProfiles(area=areas, sd=sd, moi=moi) + aps.input = dfc_filter + out_vectors = ld2dap.RawOutput() + out_vectors.input = aps + + # Compute vectors + out_vectors.run() + + return out_vectors.data + +def version(): + return 'v0.0' \ No newline at end of file diff --git a/descriptors/dfc_base.py b/descriptors/dfc_base.py new file mode 100644 index 0000000..dede241 --- /dev/null +++ b/descriptors/dfc_base.py @@ -0,0 +1,34 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file dfc_base.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 27 août 2018 +# +# TODO details + +import numpy as np + +import sys +sys.path.append('..') +import ld2dap + +def run(rasters, treshold=1e4): + # Parse parameters type + treshold = float(treshold) + + # Pipelines + loader = ld2dap.LoadTIFF(rasters) + dfc_filter = ld2dap.Treshold(treshold) + dfc_filter.input = loader + out_vectors = ld2dap.RawOutput() + out_vectors.input = dfc_filter + + # Compute vectors + out_vectors.run() + + return out_vectors.data + +def version(): + return 'v0.0' diff --git a/descriptors/dfc_daps.py b/descriptors/dfc_daps.py new file mode 100644 index 0000000..12c59fb --- /dev/null +++ b/descriptors/dfc_daps.py @@ -0,0 +1,41 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file dfc_daps.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 27 août 2018 +# +# TODO details + +import numpy as np + +import sys +sys.path.append('..') +import ld2dap + +def run(rasters, treshold=1e4, areas=None, sd=None, moi=None): + # Parse parameters type + treshold = float(treshold) + areas = None if areas is None else np.array(areas).astype(np.float).astype(np.int) + sd = None if sd is None else np.array(sd).astype(np.float) + moi = None if moi is None else np.array(moi).astype(np.float) + + # Pipelines + loader = ld2dap.LoadTIFF(rasters) + dfc_filter = ld2dap.Treshold(treshold) + dfc_filter.input = loader + aps = ld2dap.AttributeProfiles(area=areas, sd=sd, moi=moi) + aps.input = dfc_filter + differential = ld2dap.Differential() + differential.input = aps + out_vectors = ld2dap.RawOutput() + out_vectors.input = differential + + # Compute vectors + out_vectors.run() + + return out_vectors.data + +def version(): + return 'v0.0' diff --git a/descriptors/dfc_dsdaps.py b/descriptors/dfc_dsdaps.py new file mode 100644 index 0000000..dca083d --- /dev/null +++ b/descriptors/dfc_dsdaps.py @@ -0,0 +1,41 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file dfc_dsdaps.py +# \brief TODO +# 
\author Florent Guiotte +# \version 0.1 +# \date 28 août 2018 +# +# TODO details + +import numpy as np + +import sys +sys.path.append('..') +import ld2dap + +def run(rasters, treshold=1e4, areas=None, sd=None, moi=None): + # Parse parameters type + treshold = float(treshold) + areas = None if areas is None else np.array(areas).astype(np.float).astype(np.int) + sd = None if sd is None else np.array(sd).astype(np.float) + moi = None if moi is None else np.array(moi).astype(np.float) + + # Pipelines + loader = ld2dap.LoadTIFF(rasters) + dfc_filter = ld2dap.Treshold(treshold) + dfc_filter.input = loader + sdaps = ld2dap.SelfDualAttributeProfiles(area=areas, sd=sd, moi=moi) + sdaps.input = dfc_filter + differential = ld2dap.Differential() + differential.input = sdaps + out_vectors = ld2dap.RawOutput() + out_vectors.input = differential + + # Compute vectors + out_vectors.run() + + return out_vectors.data + +def version(): + return 'v0.0' diff --git a/descriptors/dfc_lfaps.py b/descriptors/dfc_lfaps.py new file mode 100644 index 0000000..50853eb --- /dev/null +++ b/descriptors/dfc_lfaps.py @@ -0,0 +1,57 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file dfc_lfaps.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 27 août 2018 +# +# TODO details + +import numpy as np + +import sys +sys.path.append('..') +import ld2dap + +# TODO: Add param percentile? + +dispatcher = { + 'mean': np.mean, # Arithmetic mean + 'median': np.median, # Median + 'average': np.average, # Weighted average (=mean ?) + 'std': np.std, # Standard deviation + 'var': np.var, # Variance + 'amax': np.amax, # Maximum + 'amin': np.amin, # Minimum + 'ptp': np.ptp, # Range of values (max - min) + } + +def run(rasters, treshold=1e4, areas=None, sd=None, moi=None, features=['mean'], patch_size=3): + # Parse parameters type + treshold = float(treshold) + areas = None if areas is None else np.array(areas).astype(np.float).astype(np.int) + sd = None if sd is None else np.array(sd).astype(np.float) + moi = None if moi is None else np.array(moi).astype(np.float) + patch_size = int(patch_size) + features = [dispatcher[x] for x in features] + + # Pipelines + loader = ld2dap.LoadTIFF(rasters) + dfc_filter = ld2dap.Treshold(treshold) + dfc_filter.input = loader + aps = ld2dap.AttributeProfiles(area=areas, sd=sd, moi=moi) + aps.input = dfc_filter + local_features = ld2dap.LocalFeatures(features, patch_size) + local_features.input = aps + out_vectors = ld2dap.RawOutput() + out_vectors.input = local_features + + # Compute vectors + out_vectors.run() + + return out_vectors.data + +def version(): + return 'v0.0' + diff --git a/descriptors/dfc_lfsdaps.py b/descriptors/dfc_lfsdaps.py new file mode 100644 index 0000000..ab1556b --- /dev/null +++ b/descriptors/dfc_lfsdaps.py @@ -0,0 +1,57 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file dfc_lfsdaps.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 28 août 2018 +# +# TODO details + +import numpy as np + +import sys +sys.path.append('..') +import ld2dap + +# TODO: Add param percentile? + +dispatcher = { + 'mean': np.mean, # Arithmetic mean + 'median': np.median, # Median + 'average': np.average, # Weighted average (=mean ?) 
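+    # NB: np.average without a weights argument is equivalent to np.mean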
+ 'std': np.std, # Standard deviation + 'var': np.var, # Variance + 'amax': np.amax, # Maximum + 'amin': np.amin, # Minimum + 'ptp': np.ptp, # Range of values (max - min) + } + +def run(rasters, treshold=1e4, areas=None, sd=None, moi=None, features=['mean'], patch_size=3): + # Parse parameters type + treshold = float(treshold) + areas = None if areas is None else np.array(areas).astype(np.float).astype(np.int) + sd = None if sd is None else np.array(sd).astype(np.float) + moi = None if moi is None else np.array(moi).astype(np.float) + patch_size = int(patch_size) + features = [dispatcher[x] for x in features] + + # Pipelines + loader = ld2dap.LoadTIFF(rasters) + dfc_filter = ld2dap.Treshold(treshold) + dfc_filter.input = loader + sdaps = ld2dap.SelfDualAttributeProfiles(area=areas, sd=sd, moi=moi) + sdaps.input = dfc_filter + local_features = ld2dap.LocalFeatures(features, patch_size) + local_features.input = sdaps + out_vectors = ld2dap.RawOutput() + out_vectors.input = local_features + + # Compute vectors + out_vectors.run() + + return out_vectors.data + +def version(): + return 'v0.0' + diff --git a/descriptors/dfc_sdaps.py b/descriptors/dfc_sdaps.py new file mode 100644 index 0000000..ab67dba --- /dev/null +++ b/descriptors/dfc_sdaps.py @@ -0,0 +1,40 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# \file dfc_sdaps.py +# \brief TODO +# \author Florent Guiotte +# \version 0.1 +# \date 27 août 2018 +# +# TODO details + +import numpy as np + +import sys +sys.path.append('..') +import ld2dap + +def run(rasters, treshold=1e4, areas=None, sd=None, moi=None): + # Parse parameters type + treshold = float(treshold) + areas = None if areas is None else np.array(areas).astype(np.float).astype(np.int) + sd = None if sd is None else np.array(sd).astype(np.float) + moi = None if moi is None else np.array(moi).astype(np.float) + + # Pipelines + loader = ld2dap.LoadTIFF(rasters) + dfc_filter = ld2dap.Treshold(treshold) + dfc_filter.input = loader + sdaps = ld2dap.SelfDualAttributeProfiles(area=areas, sd=sd, moi=moi) + sdaps.input = dfc_filter + out_vectors = ld2dap.RawOutput() + out_vectors.input = sdaps + + # Compute vectors + out_vectors.run() + + return out_vectors.data + +def version(): + return 'v0.0' + diff --git a/protocols/jurse.py b/protocols/jurse.py index ea3a378..1f41485 100644 --- a/protocols/jurse.py +++ b/protocols/jurse.py @@ -14,9 +14,6 @@ from collections import OrderedDict import numpy as np import pandas as pd from sklearn import metrics -# TODO: create package, use dev -import sys -sys.path.append('../triskele/python') import triskele from .protocol import Protocol, TestError @@ -51,15 +48,27 @@ class Jurse(Protocol): descriptors = self._compute_descriptors() except Exception: raise TestError('Error occured during description') + self._time('description') self._log.info('Classify data') try: - classification = self._compute_classificatin(descriptors) + classification = self._compute_classification(descriptors) except Exception: raise TestError('Error occured during classification') + self._time('classification') self._log.info('Run metrics') - self._metrics = self._run_metrics(classification, descriptors) + metrics = self._run_metrics(classification, descriptors) + self._time('metrics') + + cmap = str(self._results_base_name) + '.tif' + self._log.info('Saving classification map {}'.format(cmap)) + triskele.write(cmap, classification) + + results = OrderedDict() + results['classification'] = cmap + results['metrics'] = metrics + self._results = results def 
_compute_descriptors(self): script = self._expe['descriptors_script'] @@ -80,7 +89,7 @@ class Jurse(Protocol): cross_val = getattr(importlib.import_module(cv['package']), cv['name']) classifier = getattr(importlib.import_module(cl['package']), cl['name']) - prediction = np.zeros_like(gt) + prediction = np.zeros_like(gt, dtype=np.uint8) for xt, xv, yt, yv, ti in cross_val(gt, descriptors, **cv['parameters']): rfc = classifier(**cl['parameters']) @@ -99,7 +108,7 @@ class Jurse(Protocol): # Meta labeling idx_map = np.arange(gt.max() + 1) - if 'meta_labels' in self._expe: + if 'meta_labels' in gt_expe: meta_idx = pd.read_csv(gt_expe['meta_labels']) idx = np.array(meta_idx['index']) midx = np.array(meta_idx['metaclass_index']) @@ -107,6 +116,9 @@ class Jurse(Protocol): return idx_map[gt] + def _get_results(self): + return self._results + def _run_metrics(self, classification, descriptors): gt = self._get_ground_truth() @@ -115,7 +127,7 @@ class Jurse(Protocol): gt = gt[f].ravel() results = OrderedDict() - results['dimension'] = descriptors.shape[-1] + results['dimensions'] = descriptors.shape[-1] results['overall_accuracy'] = float(metrics.accuracy_score(gt, pred)) results['cohen_kappa'] = float(metrics.cohen_kappa_score(gt, pred)) diff --git a/protocols/protocol.py b/protocols/protocol.py index 60824c6..b53efc8 100644 --- a/protocols/protocol.py +++ b/protocols/protocol.py @@ -19,22 +19,25 @@ class Protocol: self._expe = expe self._name = name self._times = OrderedDict() + self._results_base_name = None self._log.debug('expe loaded: {}'.format(self._expe)) def get_hashes(self): self._log.info('Computing hashes') return(self._get_hashes()) + def set_results_base_name(self, base_name): + self._results_base_name = base_name + def run(self): self._pt = time.process_time() self._run() -# TODO: Strop process timer def get_results(self): - self._get_results() + return self._get_results() def get_process_time(self): - return self._times() + return self._times def _time(self, process): self._times[process] = time.process_time() - self._pt diff --git a/result_mockup.yml b/result_mockup.yml index ee15209..f2e8a6d 100644 --- a/result_mockup.yml +++ b/result_mockup.yml @@ -13,10 +13,9 @@ protocol: expe: ground_truth: raster: ./Data/ground_truth/2018_IEEE_GRSS_DFC_GT_TR.tif - meta_labels: ./Data/ground_truth/jurse_idx.csv + meta_labels: ./Data/ground_truth/jurse_meta_idx.csv descriptors_script: - name: dfc_aps - package: descriptors + name: descriptors.dfc_aps parameters: areas: - 100 @@ -25,8 +24,8 @@ expe: - 0.5 - 0.9 rasters: - - ./Data/phase1_rasters/DEM+B_C123/UH17_GEM051_TR.tif - - ./Data/phase1_rasters/DEM_C123_3msr/UH17_GEG051_TR.tif + - ./Data/dfc_rasters/DEM+B_C123/UH17_GEM051_TR.tif + - ./Data/dfc_rasters/DEM_C123_3msr/UH17_GEG051_TR.tif treshold: 1e4 cross_validation: name: APsCVG diff --git a/supervisor.py b/supervisor.py index 2488805..07f1fd8 100644 --- a/supervisor.py +++ b/supervisor.py @@ -76,8 +76,10 @@ def run(expe_file): ### Load protocol try: - protocol = getattr(importlib.import_module(test['protocol']['package']), - test['protocol']['name']) + #protocol = getattr(importlib.import_module(test['protocol']['package']), test['protocol']['name']) + protocol_module = importlib.import_module(test['protocol']['package']) + importlib.reload(protocol_module) + protocol = getattr(protocol_module, test['protocol']['name']) experience = protocol(test['expe']) except Exception as e: err = 'Could not load protocol from test {}'.format(expe_file) @@ -91,6 +93,8 @@ def run(expe_file): test['report'] 
= create_report(start_time) expe_file.stage(test) + experience.set_results_base_name(expe_file.get_result_path()) + ### Run test try: experience.run() @@ -105,15 +109,14 @@ def run(expe_file): ### Write complete report report = create_report(start_time, end_time) ressources = OrderedDict() - ressouces['ram'] = None - ressouces['proccess_time'] = experience.get_process_time() - report['ressources'] = ressouces + ressources['ram'] = None + ressources['proccess_time'] = experience.get_process_time() + report['ressources'] = ressources test['report'] = report ### Write results test['results'] = experience.get_results() expe_file.result(test) - log.info('Additional results in {}'.format(expe_file.get_result_path())) ### End of test log.info('Test complete') @@ -274,7 +277,7 @@ def create_report(stime=None, etime=None): def watch_folder(): log.info('Waiting for test') while not list(TEST_DIR.glob('*.yml')): - time.sleep(10) + time.sleep(3) class Kronos(object): def __init__(self): diff --git a/test_mockup.yml b/test_mockup.yml index 32c0f8e..503caee 100644 --- a/test_mockup.yml +++ b/test_mockup.yml @@ -13,10 +13,9 @@ protocol: expe: ground_truth: raster: ./Data/ground_truth/2018_IEEE_GRSS_DFC_GT_TR.tif - meta_labels: ./Data/ground_truth/jurse_idx.csv + meta_labels: ./Data/ground_truth/jurse_meta_idx.csv descriptors_script: - name: dfc_aps - package: descriptors + name: descriptors.dfc_aps parameters: areas: - 100 @@ -25,8 +24,8 @@ expe: - 0.5 - 0.9 rasters: - - ./Data/phase1_rasters/DEM+B_C123/UH17_GEM051_TR.tif - - ./Data/phase1_rasters/DEM_C123_3msr/UH17_GEG051_TR.tif + - ./Data/dfc_rasters/DEM+B_C123/UH17_GEM051_TR.tif + - ./Data/dfc_rasters/DEM_C123_3msr/UH17_GEG051_TR.tif treshold: 1e4 cross_validation: name: APsCVG @@ -38,6 +37,6 @@ expe: package: sklearn.ensemble parameters: min_samples_leaf: 10 - n_estimators: 50 + n_estimators: 10 n_jobs: -1 random_state: 0 -- 2.45.2 From bbbb6d1de4f9424040b416aefe9737f6cdcd044d Mon Sep 17 00:00:00 2001 From: Karamaz0V1 Date: Tue, 11 Sep 2018 17:42:14 +0200 Subject: [PATCH 5/5] Refactor Supervisor --- supervisor.py | 145 +++++++++----------------------------------------- 1 file changed, 24 insertions(+), 121 deletions(-) diff --git a/supervisor.py b/supervisor.py index 07f1fd8..bada05e 100644 --- a/supervisor.py +++ b/supervisor.py @@ -71,7 +71,7 @@ def run(expe_file): log.info('Run test {}'.format(expe_file)) test = expe_file.read() - ### Stage test + ### Stage experience expe_file.stage(test) ### Load protocol @@ -88,9 +88,11 @@ def run(expe_file): raise TestError(err) log.info('{} test protocol loaded'.format(experience)) - ### Write hahes + ### Get hashes test['hashes'] = experience.get_hashes() - test['report'] = create_report(start_time) + test['report'] = create_report(experience, start_time) + + ### Stage experience expe_file.stage(test) experience.set_results_base_name(expe_file.get_result_path()) @@ -106,78 +108,33 @@ def run(expe_file): end_time = time.time() - ### Write complete report - report = create_report(start_time, end_time) - ressources = OrderedDict() - ressources['ram'] = None - ressources['proccess_time'] = experience.get_process_time() - report['ressources'] = ressources - test['report'] = report + ### Get complete report + test['report'] = create_report(experience, start_time, end_time) - ### Write results + ### Get results test['results'] = experience.get_results() + + ### Write experience expe_file.result(test) ### End of test log.info('Test complete') - return - - - ### Create output names - oname = 
'{}_{}'.format(expe_file.stem, expe_hashes['global'][:6]) - oname_yml = oname + '.yml' - oname_tif = oname + '.tif' - - ### Create partial report - expe_report = create_report(kronos) - - ### Stage expe - log.info('Staging test') - write_expe_file(STAGING_DIR / oname_yml, expe, expe_hashes, expe_report) - expe_file.unlink() - ### Compute descriptors - log.info('Compute descriptors') - try: - descriptors = compute_descriptors(expe) - except Exception as e: - kronos.time('description') - expe_report = create_report(kronos) - (STAGING_DIR / oname_yml).unlink() - write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'description', e) - raise TestError('Error occured during description') +def create_report(experience, stime=None, etime=None): + expe_report = OrderedDict() + expe_report['supervisor'] = os.uname()[1] - kronos.time('description') + # Dates + for datek, timev in zip(('start_date', 'end_date'), (stime, etime)): + expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None - ### Compute classification - log.info('Classify data') - try: - classification = compute_classification(expe, descriptors) - except Exception as e: - kronos.time('classification') - expe_report = create_report(kronos) - (STAGING_DIR / oname_yml).unlink() - write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'classification', e) - raise TestError('Error occured during classification') - - kronos.time('classification') - - ### Metrics - log.info('Run initial metrics') - metrics = run_metrics(expe, classification, descriptors) - kronos.time('metrics') - - ### Create complete report - log.info('Write complete report') - expe_report = create_report(kronos) - - ### Name and write prediction - triskele.write(RESULT_DIR / oname_tif, classification) - - ### Write report and results - (STAGING_DIR / oname_yml).unlink() - write_expe_file(RESULT_DIR / oname_yml, expe, expe_hashes, expe_report, oname_tif, metrics) + # Ressources + ressources = OrderedDict() + ressources['ram'] = None + ressources['proccess_time'] = experience.get_process_time() + expe_report['ressources'] = ressources + return expe_report class ExpePath: """Utility wrapper for expe files. 
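For orientation, the life cycle ExpePath gives a test file across the Enrichment tree (a sketch; the methods are those of the class above, the file name is the mockup's):

    expe_file = ExpePath(TEST_DIR / 'test_mockup.yml')
    test = expe_file.read()
    expe_file.stage(test)    # Tests/ -> Staging/; the name gains a hash suffix once hashes exist
    expe_file.result(test)   # Staging/ -> Results/, next to the classification map
    # on failure, expe_file.error(test, when='testing', e=exc) writes to Failed/
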
@@ -242,66 +199,12 @@ class ExpePath: self._actual.unlink() self._actual = new_path - - -def write_expe_file(file, expe, hashes=None, report=None, classification=None, results=None): - with open(file, 'w') as of: - yaml.dump(OrderedDict({'expe': expe, - 'expe_hashes': hashes, - 'expe_report': report, - 'expe_classification': classification, - 'expe_results': results}), - of, default_flow_style=False, encoding=None, allow_unicode=True) - - -def compute_hashes(expe): - glob = hashlib.sha1() - - expe_hashes = OrderedDict() - - for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']: - v = str(expe[k]).encode('utf-8') - expe_hashes[k] = hashlib.sha1(v).hexdigest() - glob.update(v) - expe_hashes['global'] = glob.hexdigest() - return expe_hashes - -def create_report(stime=None, etime=None): - expe_report = OrderedDict() - expe_report['supervisor'] = os.uname()[1] - for datek, timev in zip(('start_date', 'end_date'), (stime, etime)): - expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None - - return expe_report def watch_folder(): log.info('Waiting for test') while not list(TEST_DIR.glob('*.yml')): time.sleep(3) -class Kronos(object): - def __init__(self): - self._pt = time.process_time() - self._stime = time.time() - self._etime = None - - def time(self, name): - self._times[name + '_process_time'] = time.process_time() - self._pt - self._pt = time.process_time() - self._etime = time.time() - - def get_times(self): - return self._times - - def get_start_date(self): - return self._stime - - def get_end_date(self): - return self._etime - - - - def main(): while(True): try: @@ -310,7 +213,7 @@ def main(): log.error('Critical exception while updating work queue') log.error(traceback.format_exc()) log.warning('Resuming') - break # continue + continue if not queue: watch_folder() continue @@ -322,7 +225,7 @@ def main(): log.error('Critical exception while running test. Resuming') log.error(traceback.format_exc()) log.warning('Resuming') - break # continue + continue if __name__ == '__main__': logger.setup_logging() -- 2.45.2
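
After patch 3 the supervisor no longer hard-codes a protocol module: it imports whatever the recipe names under protocol.package and protocol.name, so adding a test protocol means subclassing Protocol and overriding the private hooks. A minimal compliant protocol might look like this (protocols/dummy.py is a hypothetical example, not part of the series):

    # protocols/dummy.py -- illustration of the plugin contract only
    from collections import OrderedDict
    from .protocol import Protocol

    class Dummy(Protocol):
        def __init__(self, expe):
            super().__init__(expe, self.__class__.__name__)

        def _run(self):
            # real protocols do their work here, calling self._time('step')
            # after each timed stage, as Jurse does
            self._time('noop')

        def _get_results(self):
            results = OrderedDict()
            results['global'] = 'nothing computed'
            return results

The matching recipe header would read protocol: {name: Dummy, package: protocols.dummy}. Leaving _get_hashes() unimplemented falls back to the base-class warning, so any protocol meant for real runs should override it the way Jurse does.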
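The CVGenerators package named in the recipes is not part of this series. From the call site in protocols/jurse.py (for xt, xv, yt, yv, ti in cross_val(gt, descriptors, **parameters)), a compatible generator must yield train/test descriptor matrices, label vectors, and the raster indices used to write predictions back. A sketch of that contract, assuming label 0 marks unlabelled pixels; the real APsCVG may split differently:

    import numpy as np

    def APsCVG(gt, descriptors, n_test=2):
        """Yield (x_train, x_test, y_train, y_test, test_index) folds."""
        labelled = np.nonzero(gt)                    # gt == 0 means unlabelled
        x = descriptors[labelled]                    # (n_pixels, n_features)
        y = gt[labelled]
        fold = np.random.RandomState(0).randint(n_test, size=y.size)
        for k in range(n_test):
            test = fold == k
            ti = tuple(ix[test] for ix in labelled)  # indices back into the raster
            yield x[~test], x[test], y[~test], y[test], ti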
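Both the legacy compute_hashes() and Jurse._get_hashes() implement the same scheme: SHA-1 of str() of each recipe section, chained into a global digest whose first six characters suffix the output names (test_b8219f.tif). The digests can be reproduced outside the supervisor; note they depend on key order, so the recipe has to be loaded the same way the supervisor loads it:

    import hashlib
    import yaml

    with open('test_mockup.yml') as f:
        expe = yaml.safe_load(f)['expe']

    glob = hashlib.sha1()
    for k in ['ground_truth', 'descriptors_script',
              'cross_validation', 'classifier']:
        v = str(expe[k]).encode('utf-8')
        print(k, hashlib.sha1(v).hexdigest())
        glob.update(v)
    print('global', glob.hexdigest())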