From 131d687d42dfa8ab8b5bdbd68f3fee5abb3050a4 Mon Sep 17 00:00:00 2001
From: Karamaz0V1
Date: Fri, 22 May 2020 16:19:07 +0200
Subject: [PATCH 1/5] Create root package and database subpackage

---
 .gitignore | 1 +
 minigrida/__init__.py | 10 +++
 .../cvgenerators}/__init__.py | 0
 .../cvgenerators}/jurse.py | 0
 minigrida/database/__init__.py | 12 ++++
 minigrida/database/design.py | 46 ++++++++++++++
 minigrida/database/helpers.py | 62 +++++++++++++++++++
 .../descriptors}/__init__.py | 0
 .../descriptors}/dfc_aps.py | 0
 .../descriptors}/dfc_base.py | 0
 .../descriptors}/dfc_daps.py | 0
 .../descriptors}/dfc_dsdaps.py | 0
 .../descriptors}/dfc_lfaps.py | 0
 .../descriptors}/dfc_lfsdaps.py | 0
 .../descriptors}/dfc_sdaps.py | 0
 .../protocols}/__init__.py | 0
 {protocols => minigrida/protocols}/jurse.py | 0
 .../protocols}/protocol.py | 0
 setup.py | 4 +-
 19 files changed, 133 insertions(+), 2 deletions(-)
 create mode 100644 minigrida/__init__.py
 rename {cvgenerators => minigrida/cvgenerators}/__init__.py (100%)
 rename {cvgenerators => minigrida/cvgenerators}/jurse.py (100%)
 create mode 100644 minigrida/database/__init__.py
 create mode 100644 minigrida/database/design.py
 create mode 100644 minigrida/database/helpers.py
 rename {descriptors => minigrida/descriptors}/__init__.py (100%)
 rename {descriptors => minigrida/descriptors}/dfc_aps.py (100%)
 rename {descriptors => minigrida/descriptors}/dfc_base.py (100%)
 rename {descriptors => minigrida/descriptors}/dfc_daps.py (100%)
 rename {descriptors => minigrida/descriptors}/dfc_dsdaps.py (100%)
 rename {descriptors => minigrida/descriptors}/dfc_lfaps.py (100%)
 rename {descriptors => minigrida/descriptors}/dfc_lfsdaps.py (100%)
 rename {descriptors => minigrida/descriptors}/dfc_sdaps.py (100%)
 rename {protocols => minigrida/protocols}/__init__.py (100%)
 rename {protocols => minigrida/protocols}/jurse.py (100%)
 rename {protocols => minigrida/protocols}/protocol.py (100%)

diff --git a/.gitignore b/.gitignore
index 42120ce..fe1e3ea 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@ Enrichment/
 __pycache__/
 Logs/
 [Dd]ata/
+*.egg-info/
diff --git a/minigrida/__init__.py b/minigrida/__init__.py
new file mode 100644
index 0000000..04906e8
--- /dev/null
+++ b/minigrida/__init__.py
@@ -0,0 +1,10 @@
+#!/usr/bin/env python
+# file __init__.py
+# author Florent Guiotte
+# version 0.0
+# date 22 May 2020
+"""Abstract
+
+doc.
+"""
+
diff --git a/cvgenerators/__init__.py b/minigrida/cvgenerators/__init__.py
similarity index 100%
rename from cvgenerators/__init__.py
rename to minigrida/cvgenerators/__init__.py
diff --git a/cvgenerators/jurse.py b/minigrida/cvgenerators/jurse.py
similarity index 100%
rename from cvgenerators/jurse.py
rename to minigrida/cvgenerators/jurse.py
diff --git a/minigrida/database/__init__.py b/minigrida/database/__init__.py
new file mode 100644
index 0000000..6cba338
--- /dev/null
+++ b/minigrida/database/__init__.py
@@ -0,0 +1,12 @@
+#!/usr/bin/env python
+# file __init__.py
+# author Florent Guiotte
+# version 0.0
+# date 22 May 2020
+"""Abstract
+
+doc.
+"""
+
+from .design import *
+from .helpers import *
diff --git a/minigrida/database/design.py b/minigrida/database/design.py
new file mode 100644
index 0000000..ad553e8
--- /dev/null
+++ b/minigrida/database/design.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+# file design.py
+# author Florent Guiotte
+# version 0.0
+# date 22 May 2020
+"""Abstract
+
+doc.
+"""
+
+from datetime import date
+from pony import orm
+
+db = orm.Database()
+
+class Experiment(db.Entity):
+    sessions = orm.Set('Session')
+    urgency = orm.Required(int, default=1)
+    status = orm.Required(str, default='pending')
+
+    protocol = orm.Required(str)
+    expe = orm.Required(orm.Json)
+    expe_hash = orm.Required(str, 32, unique=True)
+
+    start_date = orm.Optional(date)
+    end_date = orm.Optional(date)
+    worker = orm.Optional(str)
+    ressources = orm.Optional(orm.Json)
+
+    report = orm.Optional(orm.Json)
+    oa = orm.Optional(float)
+    aa = orm.Optional(float)
+    k = orm.Optional(float)
+
+class Session(db.Entity):
+    project = orm.Required('Project')
+    date = orm.Required(date)
+    name = orm.PrimaryKey(str)
+    desc = orm.Optional(str)
+    urgency = orm.Required(int, default=1)
+    experiments = orm.Set('Experiment')
+
+class Project(db.Entity):
+    name = orm.PrimaryKey(str)
+    sessions = orm.Set('Session')
+
diff --git a/minigrida/database/helpers.py b/minigrida/database/helpers.py
new file mode 100644
index 0000000..6413c34
--- /dev/null
+++ b/minigrida/database/helpers.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+# file helpers.py
+# author Florent Guiotte
+# version 0.0
+# date 22 May 2020
+"""Abstract
+
+doc.
+"""
+
+from pony import orm
+from datetime import datetime, date
+import json
+import hashlib
+
+from .design import *
+
+def compute_expe_hash(experiment):
+    return hashlib.md5(json.dumps(experiment, sort_keys=True).encode('utf-8')).hexdigest()
+
+@orm.db_session
+def create_experiment(session_name, protocol, expe, urgency=1):
+    session = Session.select(lambda x: x.name == session_name)
+    if not session.exists():
+        raise ValueError('Session "{}" does not exist'.format(session_name))
+
+    expe_hash = compute_expe_hash(expe)
+    q = Experiment.select(lambda x: x.expe_hash == expe_hash)
+
+    if q.exists():
+        e = q.first()
+        e.sessions.add(session)
+    else:
+        Experiment(sessions=session, protocol=protocol, expe=experiment, expe_hash=expe_hash)
+
+@orm.db_session
+def create_project(name):
+    if not Project.select(lambda x: x.name == name).exists():
+        Project(name=name)
+    else:
+        print('Project "{}" already exists.'.format(name))
+
+@orm.db_session
+def create_session(name, desc, project_name, urgency=1):
+    project = Project[project_name]
+
+    if not Session.select(lambda x: x.name == name).exists():
+        Session(project=project, date=datetime.now(),
+                name=name, desc=desc, urgency=urgency)
+    else:
+        print('Session "{}" already exists.'.format(name))
+
+def bind_testing():
+    db.bind('sqlite', ':memory:')
+    db.generate_mapping(create_tables=True)
+
+def bind(credentials_file):
+    with open(credentials_file) as f:
+        credentials = json.load(f)
+    db.bind(**credentials)
+    db.generate_mapping(create_tables=True)
+
diff --git a/descriptors/__init__.py b/minigrida/descriptors/__init__.py
similarity index 100%
rename from descriptors/__init__.py
rename to minigrida/descriptors/__init__.py
diff --git a/descriptors/dfc_aps.py b/minigrida/descriptors/dfc_aps.py
similarity index 100%
rename from descriptors/dfc_aps.py
rename to minigrida/descriptors/dfc_aps.py
diff --git a/descriptors/dfc_base.py b/minigrida/descriptors/dfc_base.py
similarity index 100%
rename from descriptors/dfc_base.py
rename to minigrida/descriptors/dfc_base.py
diff --git a/descriptors/dfc_daps.py b/minigrida/descriptors/dfc_daps.py
similarity index 100%
rename from descriptors/dfc_daps.py
rename to minigrida/descriptors/dfc_daps.py
diff --git a/descriptors/dfc_dsdaps.py b/minigrida/descriptors/dfc_dsdaps.py
similarity index 100%
rename from
descriptors/dfc_dsdaps.py rename to minigrida/descriptors/dfc_dsdaps.py diff --git a/descriptors/dfc_lfaps.py b/minigrida/descriptors/dfc_lfaps.py similarity index 100% rename from descriptors/dfc_lfaps.py rename to minigrida/descriptors/dfc_lfaps.py diff --git a/descriptors/dfc_lfsdaps.py b/minigrida/descriptors/dfc_lfsdaps.py similarity index 100% rename from descriptors/dfc_lfsdaps.py rename to minigrida/descriptors/dfc_lfsdaps.py diff --git a/descriptors/dfc_sdaps.py b/minigrida/descriptors/dfc_sdaps.py similarity index 100% rename from descriptors/dfc_sdaps.py rename to minigrida/descriptors/dfc_sdaps.py diff --git a/protocols/__init__.py b/minigrida/protocols/__init__.py similarity index 100% rename from protocols/__init__.py rename to minigrida/protocols/__init__.py diff --git a/protocols/jurse.py b/minigrida/protocols/jurse.py similarity index 100% rename from protocols/jurse.py rename to minigrida/protocols/jurse.py diff --git a/protocols/protocol.py b/minigrida/protocols/protocol.py similarity index 100% rename from protocols/protocol.py rename to minigrida/protocols/protocol.py diff --git a/setup.py b/setup.py index 9dbb74b..08e970f 100644 --- a/setup.py +++ b/setup.py @@ -11,10 +11,10 @@ from distutils.core import setup setup(name='minigrida', - version='1.11', + version='2.0', description='Simple and decentralized computing grid', author='Florent Guiotte', author_email='florent.guiotte@uhb.fr', url='https://git.guiotte.fr/Florent/minigrida', - packages=['cvgenerators', 'descriptors', 'protocols'], + packages=['minigrida'],#'cvgenerators', 'descriptors', 'protocols', 'database'], ) From d356ff1dd5e6465d18cba63a540620f6180917ce Mon Sep 17 00:00:00 2001 From: Karamaz0V1 Date: Tue, 26 May 2020 10:02:32 +0200 Subject: [PATCH 2/5] WIP on refactor --- minigrida/database/helpers.py | 4 +- logger.py => minigrida/logger.py | 0 minigrida/logging.yaml | 40 +++++++ minigrida/protocols/jurse2.py | 128 +++++++++++++++++++++++ minigrida/protocols/protocol.py | 4 - supervisor.py => minigrida/supervisor.py | 57 ++-------- 6 files changed, 177 insertions(+), 56 deletions(-) rename logger.py => minigrida/logger.py (100%) create mode 100644 minigrida/logging.yaml create mode 100644 minigrida/protocols/jurse2.py rename supervisor.py => minigrida/supervisor.py (77%) diff --git a/minigrida/database/helpers.py b/minigrida/database/helpers.py index 6413c34..bfe382b 100644 --- a/minigrida/database/helpers.py +++ b/minigrida/database/helpers.py @@ -50,11 +50,11 @@ def create_session(name, desc, project_name, urgency=1): else: print('Session "{}" already exists.'.format(name)) -def bind_testing(): +def connect_testing(): db.bind('sqlite', ':memory:') db.generate_mapping(create_tables=True) -def bind(credentials_file): +def connect(credentials_file): with open(credentials_file) as f: credentials = json.load(f) db.bind(**credentials) diff --git a/logger.py b/minigrida/logger.py similarity index 100% rename from logger.py rename to minigrida/logger.py diff --git a/minigrida/logging.yaml b/minigrida/logging.yaml new file mode 100644 index 0000000..b18e6ed --- /dev/null +++ b/minigrida/logging.yaml @@ -0,0 +1,40 @@ +version: 1 +disable_existing_loggers: False +formatters: + simple: + format: "%(asctime)s %(levelname)s %(name)s: %(message)s" + +handlers: + console: + class: logging.StreamHandler + level: INFO + formatter: simple + stream: ext://sys.stdout + + info_file_handler: + class: logging.handlers.RotatingFileHandler + level: INFO + formatter: simple + filename: Logs/info.log + maxBytes: 10485760 # 10MB + 
    backupCount: 20
+    encoding: utf8
+
+  error_file_handler:
+    class: logging.handlers.RotatingFileHandler
+    level: ERROR
+    formatter: simple
+    filename: Logs/errors.log
+    maxBytes: 10485760 # 10MB
+    backupCount: 20
+    encoding: utf8
+
+loggers:
+  my_module:
+    level: DEBUG
+    handlers: [console]
+    propagate: no
+
+root:
+  level: DEBUG
+  handlers: [console, info_file_handler, error_file_handler]
diff --git a/minigrida/protocols/jurse2.py b/minigrida/protocols/jurse2.py
new file mode 100644
index 0000000..aabf715
--- /dev/null
+++ b/minigrida/protocols/jurse2.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python
+# file jurse2.py
+# author Florent Guiotte
+# version 0.0
+# date 26 May 2020
+"""Abstract
+
+doc.
+"""
+
+import importlib
+import numpy as np
+from sklearn import metrics
+import rasterio
+from .protocol import Protocol, TestError
+
+
+class Jurse2(Protocol):
+    """Second JURSE test protocol for LiDAR classification with 2D maps.
+
+    """
+
+    def __init__(self, expe):
+        super().__init__(expe, self.__class__.__name__)
+
+    def _get_hashes(self):
+        hashes = OrderedDict()
+        glob = hashlib.sha1()
+
+        for k in ['ground_truth', 'descriptors_script',
+                  'cross_validation', 'classifier']:
+            val = str(self._expe[k]).encode('utf-8')
+            hashes[k] = hashlib.sha1(val).hexdigest()
+            glob.update(val)
+        hashes['global'] = glob.hexdigest()
+
+        return hashes
+
+    def _run(self):
+        self._log.info('Compute descriptors')
+        try:
+            descriptors = self._compute_descriptors()
+        except Exception:
+            raise TestError('Error occurred during description')
+        self._time('description')
+
+        self._log.info('Classify data')
+        try:
+            classification = self._compute_classification(descriptors)
+        except Exception:
+            raise TestError('Error occurred during classification')
+        self._time('classification')
+
+        self._log.info('Run metrics')
+        metrics = self._run_metrics(classification, descriptors)
+        self._time('metrics')
+
+        cmap = str(self._results_base_name) + '.tif'
+        self._log.info('Saving classification map {}'.format(cmap))
+        triskele.write(cmap, classification)
+
+        results = OrderedDict()
+        results['classification'] = cmap
+        results['metrics'] = metrics
+        self._results = results
+
+    def _compute_descriptors(self):
+        script = self._expe['descriptors_script']
+
+        desc = importlib.import_module(script['name'])
+        att = desc.run(**script['parameters'])
+
+        return att
+
+    def _compute_classification(self, descriptors):
+        # Ground truth
+        gt = self._get_ground_truth()
+
+        # CrossVal and ML
+        cv = self._expe['cross_validation']
+        cl = self._expe['classifier']
+
+        cross_val = getattr(importlib.import_module(cv['package']), cv['name'])
+        classifier = getattr(importlib.import_module(cl['package']), cl['name'])
+
+        prediction = np.zeros_like(gt, dtype=np.uint8)
+
+        for xt, xv, yt, yv, ti in cross_val(gt, descriptors, **cv['parameters']):
+            rfc = classifier(**cl['parameters'])
+            rfc.fit(xt, yt)
+
+            ypred = rfc.predict(xv)
+
+            prediction[ti] = ypred
+
+        return prediction
+
+    def _get_ground_truth(self):
+        gt_expe = self._expe['ground_truth']
+        gt = triskele.read(gt_expe['raster'])
+
+        # Meta labeling
+        idx_map = np.arange(gt.max() + 1)
+
+        if 'meta_labels' in gt_expe:
+            meta_idx = pd.read_csv(gt_expe['meta_labels'])
+            idx = np.array(meta_idx['index'])
+            midx = np.array(meta_idx['metaclass_index'])
+            idx_map[idx] = midx
+
+        return idx_map[gt]
+
+    def _get_results(self):
+        return self._results
+
+    def _run_metrics(self, classification, descriptors):
+        gt = self._get_ground_truth()
+
+        f = np.nonzero(classification)
+        pred = classification[f].ravel()
+        gt =
gt[f].ravel() + + results = OrderedDict() + results['dimensions'] = descriptors.shape[-1] + results['overall_accuracy'] = float(metrics.accuracy_score(gt, pred)) + results['cohen_kappa'] = float(metrics.cohen_kappa_score(gt, pred)) + + return results diff --git a/minigrida/protocols/protocol.py b/minigrida/protocols/protocol.py index b53efc8..73cbbab 100644 --- a/minigrida/protocols/protocol.py +++ b/minigrida/protocols/protocol.py @@ -10,16 +10,12 @@ import logging import time -from collections import OrderedDict - class Protocol: def __init__(self, expe, name=None): self._log = logging.getLogger(name) self._expe = expe self._name = name - self._times = OrderedDict() - self._results_base_name = None self._log.debug('expe loaded: {}'.format(self._expe)) def get_hashes(self): diff --git a/supervisor.py b/minigrida/supervisor.py similarity index 77% rename from supervisor.py rename to minigrida/supervisor.py index 63cacb8..61c7907 100644 --- a/supervisor.py +++ b/minigrida/supervisor.py @@ -3,15 +3,13 @@ # \file supervisor.py # \brief TODO # \author Florent Guiotte -# \version 0.1 +# \version 2.1 # \date 07 sept. 2018 # # TODO details -import yaml import importlib -import hashlib -from collections import OrderedDict +import json import time import os import datetime @@ -21,34 +19,10 @@ import traceback import logging import logger from protocols.protocol import TestError - -ENRICHMENT_DIR = Path('./Enrichment/') -TEST_DIR = ENRICHMENT_DIR / 'Tests' -STAGING_DIR = ENRICHMENT_DIR / 'Staging' -RESULT_DIR = ENRICHMENT_DIR / 'Results' -FAILED_DIR = ENRICHMENT_DIR / 'Failed' +import database log = logging.getLogger('Supervisor [{}]'.format(os.uname()[1])) -def setup_yaml(): - """ Keep yaml ordered, newline string - from https://stackoverflow.com/a/8661021 - """ - represent_dict_order = lambda self, data: self.represent_mapping('tag:yaml.org,2002:map', data.items()) - yaml.add_representer(OrderedDict, represent_dict_order) - - """ https://stackoverflow.com/a/24291536 """ - yaml.Dumper.org_represent_str = yaml.Dumper.represent_str - yaml.add_representer(str, repr_str, Dumper=yaml.Dumper) - - -def repr_str(dumper, data): - if '\n' in data: - return dumper.represent_scalar(u'tag:yaml.org,2002:str', - data, style='|') - return dumper.org_represent_str(data) - - def update_queue(): tmp_queue = list() for child in TEST_DIR.iterdir(): @@ -188,28 +162,12 @@ class ExpePath: def get_result_path(self): return Path(RESULT_DIR) / self._get_hash_name() - def _check_hash(self, expe): - if self._hash is None: - if 'hashes' in expe: - self._hash = expe['hashes']['global'] - - def _write(self, path, expe): - new_path = Path(path) / self._get_complete_name() - with open(new_path, 'w') as of: - yaml.dump(expe, of, - default_flow_style=False, - encoding=None, - allow_unicode=True) - self._actual.unlink() - self._actual = new_path - - -def watch_folder(): - log.info('Waiting for test') - while not list(TEST_DIR.glob('*.yml')): - time.sleep(3) +def get_database(): def main(): + print('Hello again') + database + return while(True): try: queue = update_queue() @@ -239,5 +197,4 @@ if __name__ == '__main__': logger.setup_logging() log.info('Starting supervisor') - setup_yaml() main() From bbd62654f82eec9d7ee55be2ee23e68e0bf189a5 Mon Sep 17 00:00:00 2001 From: Karamaz0V1 Date: Wed, 27 May 2020 08:50:54 +0200 Subject: [PATCH 3/5] Working --- .gitignore | 7 +- logging.yaml | 4 +- minigrida/database/helpers.py | 42 +++++- minigrida/descriptors/dfc_base.py | 2 +- minigrida/descriptors/pixel.py | 37 ++++++ 
 minigrida/loader/__init__.py | 9 ++
 minigrida/loader/tiles.py | 32 +++++
 minigrida/protocols/__init__.py | 3 +
 minigrida/protocols/jurse2.py | 103 +++++++--------
 minigrida/protocols/protocol.py | 33 ++---
 minigrida/supervisor.py | 210 +++++++-----------------------
 11 files changed, 231 insertions(+), 251 deletions(-)
 create mode 100644 minigrida/descriptors/pixel.py
 create mode 100644 minigrida/loader/__init__.py
 create mode 100644 minigrida/loader/tiles.py

diff --git a/.gitignore b/.gitignore
index fe1e3ea..aef6076 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,8 @@
 Enrichment/
 __pycache__/
-Logs/
-[Dd]ata/
+[Ll]ogs/
+[Dd]ata
+[Cc]ache
 *.egg-info/
+credentials*
+
diff --git a/logging.yaml b/logging.yaml
index b18e6ed..25024b3 100644
--- a/logging.yaml
+++ b/logging.yaml
@@ -15,7 +15,7 @@ handlers:
     class: logging.handlers.RotatingFileHandler
     level: INFO
     formatter: simple
-    filename: Logs/info.log
+    filename: logs/info.log
     maxBytes: 10485760 # 10MB
     backupCount: 20
     encoding: utf8
@@ -24,7 +24,7 @@ handlers:
     class: logging.handlers.RotatingFileHandler
     level: ERROR
     formatter: simple
-    filename: Logs/errors.log
+    filename: logs/errors.log
     maxBytes: 10485760 # 10MB
     backupCount: 20
     encoding: utf8
diff --git a/minigrida/database/helpers.py b/minigrida/database/helpers.py
index bfe382b..36e6354 100644
--- a/minigrida/database/helpers.py
+++ b/minigrida/database/helpers.py
@@ -12,8 +12,11 @@ from pony import orm
 from datetime import datetime, date
 import json
 import hashlib
+import logging
 
-from .design import *
+from .design import Session, Experiment, Project, db
+
+log = logging.getLogger()
 
 def compute_expe_hash(experiment):
     return hashlib.md5(json.dumps(experiment, sort_keys=True).encode('utf-8')).hexdigest()
@@ -31,7 +34,7 @@ def create_experiment(session_name, protocol, expe, urgency=1):
         e = q.first()
         e.sessions.add(session)
     else:
-        Experiment(sessions=session, protocol=protocol, expe=experiment, expe_hash=expe_hash)
+        Experiment(sessions=session, protocol=protocol, expe=expe, expe_hash=expe_hash)
 
 @orm.db_session
 def create_project(name):
@@ -50,13 +53,46 @@ def create_session(name, desc, project_name, urgency=1):
     else:
         print('Session "{}" already exists.'.format(name))
 
+@orm.db_session
+def pending_experiments():
+    return Experiment.select(lambda x: x.status == 'pending').exists()
+
+@orm.db_session
+def next_experiment():
+    # TODO: take session urgency into account
+    expe = orm.select(e for e in Experiment
+                      if e.status == 'pending'
+                      and e.urgency == orm.max(e.urgency for e in Experiment
+                                               if e.status == 'pending')
+                      ).random(1)
+    if expe:
+        expe = expe[0]
+        expe.status = 'staging'
+        return expe
+
+
+def update_experiment(expe, **params):
+    try:
+        _update_experiment(expe, **params)
+    except orm.DatabaseError as e:
+        log.error(e)
+        log.info('Retry update')
+        _update_experiment(expe, **params)
+
+@orm.db_session
+def _update_experiment(expe, **params):
+    e = Experiment.select(lambda x: x.id == expe.id).first()
+    for k, v in params.items():
+        setattr(e, k, v)
+
+
 def connect_testing():
     db.bind('sqlite', ':memory:')
     db.generate_mapping(create_tables=True)
 
+
 def connect(credentials_file):
     with open(credentials_file) as f:
         credentials = json.load(f)
     db.bind(**credentials)
     db.generate_mapping(create_tables=True)
-
diff --git a/minigrida/descriptors/dfc_base.py b/minigrida/descriptors/dfc_base.py
index da83012..bc8bfd3 100644
--- a/minigrida/descriptors/dfc_base.py
+++ b/minigrida/descriptors/dfc_base.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python
 # -*- coding: utf-8 -*-
 # \file dfc_base.py
 # \brief TODO
diff --git a/minigrida/descriptors/pixel.py b/minigrida/descriptors/pixel.py
new file mode 100644
index 0000000..cf329fa
--- /dev/null
+++ b/minigrida/descriptors/pixel.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+# file pixel.py
+# author Florent Guiotte
+# version 0.0
+# date 26 May 2020
+"""Abstract
+
+doc.
+"""
+
+import numpy as np
+
+
+def run(gt, rasters, remove=None):
+    X = []
+    y = []
+    groups = []
+
+    for i, (gti, rastersi) in enumerate(zip(gt, rasters)):
+        # Create vectors
+        X_raw = np.moveaxis(np.array(list(rastersi.values())), 0, -1)
+        y_raw = gti
+
+        # Remove unwanted label X, y
+        lbl = np.ones_like(y_raw, dtype=np.bool)
+        for l in remove if remove else []:
+            lbl &= y_raw != l
+
+        X += [X_raw[lbl]]
+        y += [y_raw[lbl]]
+        groups += [np.repeat(i, lbl.sum())]
+
+    X = np.concatenate(X)
+    y = np.concatenate(y)
+    groups = np.concatenate(groups)
+
+    return X, y, groups
diff --git a/minigrida/loader/__init__.py b/minigrida/loader/__init__.py
new file mode 100644
index 0000000..174a390
--- /dev/null
+++ b/minigrida/loader/__init__.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+# file __init__.py
+# author Florent Guiotte
+# version 0.0
+# date 26 May 2020
+"""Abstract
+
+doc.
+"""
diff --git a/minigrida/loader/tiles.py b/minigrida/loader/tiles.py
new file mode 100644
index 0000000..e86d3e9
--- /dev/null
+++ b/minigrida/loader/tiles.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+# file tiles.py
+# author Florent Guiotte
+# version 0.0
+# date 26 May 2020
+"""Abstract
+
+doc.
+"""
+
+from pathlib import Path
+import rasterio as rio
+
+
+def run(tiles_count, gt_path, gt_name, rasters_path, rasters_name):
+    gt = []
+    rasters = []
+
+    for i in range(tiles_count):
+        gt += [load_tif(raster_path(gt_path, gt_name, i))]
+        rasters += [{Path(n).stem: load_tif(raster_path(rasters_path, n, i))
+                     for n in rasters_name}]
+
+    return gt, rasters
+
+
+def load_tif(path):
+    return rio.open(str(path)).read()[0]
+
+
+def raster_path(path, name, i):
+    return Path(path) / '{}_{}_{}.tif'.format(Path(name).stem, 0, i)
diff --git a/minigrida/protocols/__init__.py b/minigrida/protocols/__init__.py
index e17274e..d9b5df2 100644
--- a/minigrida/protocols/__init__.py
+++ b/minigrida/protocols/__init__.py
@@ -7,3 +7,6 @@
 # \date 09 sept. 2018
 #
 # TODO details
+
+#from .jurse import Jurse
+from .jurse2 import Jurse2
diff --git a/minigrida/protocols/jurse2.py b/minigrida/protocols/jurse2.py
index aabf715..8238b16 100644
--- a/minigrida/protocols/jurse2.py
+++ b/minigrida/protocols/jurse2.py
@@ -11,7 +11,6 @@ doc.
 import importlib
 import numpy as np
 from sklearn import metrics
-import rasterio
 from .protocol import Protocol, TestError
 
 
@@ -23,58 +22,50 @@ class Jurse2(Protocol):
     def __init__(self, expe):
         super().__init__(expe, self.__class__.__name__)
 
-    def _get_hashes(self):
-        hashes = OrderedDict()
-        glob = hashlib.sha1()
-
-        for k in ['ground_truth', 'descriptors_script',
-                  'cross_validation', 'classifier']:
-            val = str(self._expe[k]).encode('utf-8')
-            hashes[k] = hashlib.sha1(val).hexdigest()
-            glob.update(val)
-        hashes['global'] = glob.hexdigest()
-
-        return hashes
-
     def _run(self):
+        self._log.info('Load data')
+        try:
+            data = self._load_data()
+        except Exception:
+            raise TestError('Error occurred during data loading')
+
         self._log.info('Compute descriptors')
         try:
-            descriptors = self._compute_descriptors()
+            descriptors = self._compute_descriptors(data)
         except Exception:
             raise TestError('Error occurred during description')
-        self._time('description')
 
-        self._log.info('Classify data')
+        self._log.info('Classify descriptors')
         try:
             classification = self._compute_classification(descriptors)
         except Exception:
             raise TestError('Error occurred during classification')
-        self._time('classification')
 
         self._log.info('Run metrics')
         metrics = self._run_metrics(classification, descriptors)
-        self._time('metrics')
 
-        cmap = str(self._results_base_name) + '.tif'
-        self._log.info('Saving classification map {}'.format(cmap))
-        triskele.write(cmap, classification)
-
-        results = OrderedDict()
-        results['classification'] = cmap
+        results = {}
         results['metrics'] = metrics
         self._results = results
 
-    def _compute_descriptors(self):
+    def _load_data(self):
+        data_loader = self._expe['data_loader']
+
+        loader = importlib.import_module(data_loader['name'])
+        data = loader.run(**data_loader['parameters'])
+
+        return data
+
+    def _compute_descriptors(self, data):
         script = self._expe['descriptors_script']
 
         desc = importlib.import_module(script['name'])
-        att = desc.run(**script['parameters'])
+        att = desc.run(*data, **script['parameters'])
 
         return att
 
     def _compute_classification(self, descriptors):
-        # Ground truth
-        gt = self._get_ground_truth()
+        X, y, groups = descriptors
 
         # CrossVal and ML
         cv = self._expe['cross_validation']
         cl = self._expe['classifier']
 
         cross_val = getattr(importlib.import_module(cv['package']), cv['name'])
         classifier = getattr(importlib.import_module(cl['package']), cl['name'])
 
-        prediction = np.zeros_like(gt, dtype=np.uint8)
+        y_pred = np.zeros_like(y)
 
-        for xt, xv, yt, yv, ti in cross_val(gt, descriptors, **cv['parameters']):
-            rfc = classifier(**cl['parameters'])
-            rfc.fit(xt, yt)
+        cvi = cross_val(**cv['parameters'])
+        for train_index, test_index in cvi.split(X, y, groups):
+            cli = classifier(**cl['parameters'])
 
-            ypred = rfc.predict(xv)
+            self._log.info(' - fit')
+            cli.fit(X[train_index], y[train_index])
 
-            prediction[ti] = ypred
+            self._log.info(' - predict')
+            y_pred[test_index] = cli.predict(X[test_index])
 
-        return prediction
-
-    def _get_ground_truth(self):
-        gt_expe = self._expe['ground_truth']
-        gt = triskele.read(gt_expe['raster'])
-
-        # Meta labeling
-        idx_map = np.arange(gt.max() + 1)
-
-        if 'meta_labels' in gt_expe:
-            meta_idx = pd.read_csv(gt_expe['meta_labels'])
-            idx = np.array(meta_idx['index'])
-            midx = np.array(meta_idx['metaclass_index'])
-            idx_map[idx] = midx
-
-        return idx_map[gt]
+        return y_pred
 
     def _get_results(self):
         return self._results
 
     def _run_metrics(self, classification, descriptors):
-        gt = self._get_ground_truth()
+        X, y_true, groups = descriptors
+        y_pred =
classification - f = np.nonzero(classification) - pred = classification[f].ravel() - gt = gt[f].ravel() + self._log.info(' - Scores') + self.oa = metrics.accuracy_score(y_true, y_pred) + self.aa = metrics.balanced_accuracy_score(y_true, y_pred) + self.k = metrics.cohen_kappa_score(y_true, y_pred) - results = OrderedDict() - results['dimensions'] = descriptors.shape[-1] - results['overall_accuracy'] = float(metrics.accuracy_score(gt, pred)) - results['cohen_kappa'] = float(metrics.cohen_kappa_score(gt, pred)) + self._log.info(' - Additional results') + p, r, f, s = metrics.precision_recall_fscore_support(y_true, y_pred) + cm = metrics.confusion_matrix(y_true, y_pred) + results = {'dimensions': X.shape[-1], + 'precision': p.tolist(), + 'recall': r.tolist(), + 'f1score': f.tolist(), + 'support': s.tolist(), + 'confusion': cm.tolist()} return results diff --git a/minigrida/protocols/protocol.py b/minigrida/protocols/protocol.py index 73cbbab..68e3428 100644 --- a/minigrida/protocols/protocol.py +++ b/minigrida/protocols/protocol.py @@ -11,52 +11,41 @@ import logging import time + class Protocol: def __init__(self, expe, name=None): self._log = logging.getLogger(name) self._expe = expe self._name = name self._log.debug('expe loaded: {}'.format(self._expe)) - - def get_hashes(self): - self._log.info('Computing hashes') - return(self._get_hashes()) - - def set_results_base_name(self, base_name): - self._results_base_name = base_name + self.k = None + self.oa = None + self.aa = None def run(self): - self._pt = time.process_time() + spt = time.process_time() self._run() + self._pt = time.process_time() - spt def get_results(self): return self._get_results() def get_process_time(self): - return self._times - - def _time(self, process): - self._times[process] = time.process_time() - self._pt - self._pt = time.process_time() - - def _get_hashes(self): - self._log.warning('Protocol did not override _get_hashes()') - hashes = OrderedDict() - hashes['global'] = 'Protocol did not override _get_hashes()' - return hashes + return self._pt def _run(self): self._log.error('Protocol did not override _run()') - raise NotImplementedError('Protocol {} did not override _run()'.format(self)) + raise NotImplementedError('Protocol {} did not override _run()' + .format(self)) def _get_results(self): self._log.warning('Protocol did not override _get_results()') - results = OrderedDict() + results = {} results['global'] = 'Protocol did not override _get_results()' return results def __str__(self): - return('{}'.format(self._name)) + return '{}'.format(self._name) class TestError(Exception): diff --git a/minigrida/supervisor.py b/minigrida/supervisor.py index 61c7907..f5b42d0 100644 --- a/minigrida/supervisor.py +++ b/minigrida/supervisor.py @@ -9,190 +9,76 @@ # TODO details import importlib -import json import time import os -import datetime -from pathlib import Path -from operator import itemgetter +from datetime import datetime import traceback import logging import logger from protocols.protocol import TestError import database -log = logging.getLogger('Supervisor [{}]'.format(os.uname()[1])) - -def update_queue(): - tmp_queue = list() - for child in TEST_DIR.iterdir(): - if child.is_file() and child.suffix == '.yml': - tmp_queue.append({'expe_file': ExpePath(child), - 'priority': get_priority(child)}) - - queue = sorted(tmp_queue, key=itemgetter('priority')) - return queue +host = os.uname()[1] +log = logging.getLogger('Supervisor [{}]'.format(host)) -def get_priority(yml_file): - with open(yml_file) as f: - expe = 
OrderedDict(yaml.safe_load(f)) - return expe['priority'] +def run(expe): + database.update_experiment(expe, worker=host, start_date=datetime.now()) + # Load protocol + log.info('Load protocol {}'.format(expe.protocol)) + protocol_module = importlib.import_module('protocols') + importlib.reload(protocol_module) + protocol = getattr(protocol_module, expe.protocol) + test = protocol(expe.expe) -def run(expe_file): - start_time = time.time() - log.info('Run test {}'.format(expe_file)) - test = expe_file.read() - - ### Stage experience - expe_file.stage(test) - - ### Load protocol + # Run test try: - #protocol = getattr(importlib.import_module(test['protocol']['package']), test['protocol']['name']) - protocol_module = importlib.import_module(test['protocol']['package']) - importlib.reload(protocol_module) - protocol = getattr(protocol_module, test['protocol']['name']) - experience = protocol(test['expe']) - except Exception as e: - err = 'Could not load protocol from test {}'.format(expe_file) - log.warning(err) - expe_file.error(test, 'loading protocol', e) - raise TestError(err) - log.info('{} test protocol loaded'.format(experience)) - - ### Get hashes - test['hashes'] = experience.get_hashes() - test['report'] = create_report(experience, start_time) - - ### Stage experience - expe_file.stage(test) - - experience.set_results_base_name(expe_file.get_result_path()) - - ### Run test - try: - experience.run() + test.run() except Exception as e: err = 'Experience error' log.warning(err) - expe_file.error(test, 'testing', e) - raise TestError(err) + report = {'error': {'name': str(err), + 'trace': traceback.format_exc()}} + database.update_experiment(expe, report=report, status='error') + raise TestError(err) - end_time = time.time() + # Write report + log.info('Write report') + database.update_experiment(expe, + end_date=datetime.now(), + oa=test.oa, + aa=test.aa, + k=test.k, + report=test.get_results(), + status='complete') - ### Get complete report - test['report'] = create_report(experience, start_time, end_time) + # End of test + log.info('Expe {} complete'.format(expe.expe_hash)) - ### Get results - test['results'] = experience.get_results() - - ### Write experience - expe_file.result(test) - - ### End of test - log.info('Test complete') - -def create_report(experience, stime=None, etime=None): - expe_report = OrderedDict() - host = os.getenv("HOST") - expe_report['supervisor'] = host if host is not None else os.uname()[1] - - # Dates - for datek, timev in zip(('start_date', 'end_date'), (stime, etime)): - expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None - - # Ressources - ressources = OrderedDict() - ressources['ram'] = None - ressources['proccess_time'] = experience.get_process_time() - expe_report['ressources'] = ressources - - return expe_report - -class ExpePath: - """Utility wrapper for expe files. - - Extend pathlib Path with staging, result and errors function to move the - test report through the Enrichment center. 
- - """ - def __init__(self, path, hash_length=6): - self._actual = Path(path) - self._base_name = self._actual.stem - self._hash_length = hash_length - self._hash = None - - def __str__(self): - return self._get_complete_name() - - def read(self): - with open(self._actual) as f: - return OrderedDict(yaml.safe_load(f)) - - def _get_hash_name(self): - return '{}{}'.format(self._base_name, - '_' + self._hash[:self._hash_length] if self._hash is not None else '') - - def _get_complete_name(self): - return self._get_hash_name() + '.yml' - - def exists(self): - return self._actual.exists() - - def stage(self, expe): - log.info('Staging {}'.format(self._base_name)) - self._check_hash(expe) - self._write(STAGING_DIR, expe) - - def result(self, expe): - log.info('Write results for test {}'.format(self._base_name)) - self._check_hash(expe) - self._write(RESULT_DIR, expe) - - def error(self, expe, when='', e=Exception): - error = OrderedDict() - error['when'] = when - error['what'] = str(e) - error['where'] = traceback.format_exc() - expe['error'] = error - self._write(FAILED_DIR, expe) - - def get_result_path(self): - return Path(RESULT_DIR) / self._get_hash_name() - -def get_database(): def main(): - print('Hello again') - database - return + log.info('Connecting to database') + database.connect('credentials.json') + while(True): - try: - queue = update_queue() - except Exception: - log.error('Critical exception while updating work queue') - log.error(traceback.format_exc()) - log.warning('Resuming') - continue - if not queue: - watch_folder() - continue - try: - expe_file = queue.pop()['expe_file'] - while(not expe_file.exists() and queue): - expe_file = queue.pop()['expe_file'] - if expe_file.exists(): - run(expe_file) - except TestError: - log.warning('Test failed, error logged. Resuming') - except Exception: - log.error('Critical exception while running test. 
Resuming')
-            log.error(traceback.format_exc())
-            log.warning('Resuming')
-            continue
-
+        if not database.pending_experiments():
+            log.info('No pending experiments, waiting...')
+            time.sleep(30)
+
+        else:
+            log.info('Loading next experiment')
+            expe = database.next_experiment()
+            if not expe:
+                continue
+            log.info('Expe {} loaded'.format(expe.expe_hash))
+            try:
+                run(expe)
+            except Exception as e:
+                log.error(e)
+                log.error('Error occurred on expe {}'.format(expe.id))
+
+
 if __name__ == '__main__':
     logger.setup_logging()
     log.info('Starting supervisor')

From d0ceae758a9377efc322d9983a37ab26d6cc7966 Mon Sep 17 00:00:00 2001
From: Karamaz0V1
Date: Wed, 27 May 2020 08:52:09 +0200
Subject: [PATCH 4/5] Delay when failing

---
 minigrida/supervisor.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/minigrida/supervisor.py b/minigrida/supervisor.py
index f5b42d0..1b9ed3e 100644
--- a/minigrida/supervisor.py
+++ b/minigrida/supervisor.py
@@ -77,6 +77,7 @@ def main():
             except Exception as e:
                 log.error(e)
                 log.error('Error occurred on expe {}'.format(expe.id))
+                time.sleep(60)
 
 
 if __name__ == '__main__':

From c967d76516eb20bec82ad796111c50dd83fdae18 Mon Sep 17 00:00:00 2001
From: Karamaz0V1
Date: Wed, 27 May 2020 09:36:37 +0200
Subject: [PATCH 5/5] Process

---
 config.json | 3 +++
 minigrida/database/helpers.py | 6 ++++--
 minigrida/supervisor.py | 23 ++++++++++++++++-----
 3 files changed, 25 insertions(+), 7 deletions(-)
 create mode 100644 config.json

diff --git a/config.json b/config.json
new file mode 100644
index 0000000..8e895e8
--- /dev/null
+++ b/config.json
@@ -0,0 +1,3 @@
+{
+    "process_count": 2
+}
diff --git a/minigrida/database/helpers.py b/minigrida/database/helpers.py
index 36e6354..7c95ba6 100644
--- a/minigrida/database/helpers.py
+++ b/minigrida/database/helpers.py
@@ -57,7 +57,8 @@ def create_session(name, desc, project_name, urgency=1):
 def pending_experiments():
     return Experiment.select(lambda x: x.status == 'pending').exists()
 
-@orm.db_session
+
+@orm.db_session(serializable=True, optimistic=False)
 def next_experiment():
     # TODO: take session urgency into account
     expe = orm.select(e for e in Experiment
@@ -79,9 +80,10 @@ def update_experiment(expe, **params):
         log.info('Retry update')
         _update_experiment(expe, **params)
 
+
 @orm.db_session
 def _update_experiment(expe, **params):
-    e = Experiment.select(lambda x: x.id == expe.id).first()
+    e = Experiment.get_for_update(id=expe.id)
     for k, v in params.items():
         setattr(e, k, v)
 
diff --git a/minigrida/supervisor.py b/minigrida/supervisor.py
index 1b9ed3e..e38cb7a 100644
--- a/minigrida/supervisor.py
+++ b/minigrida/supervisor.py
@@ -17,13 +17,15 @@ import logging
 import logger
 from protocols.protocol import TestError
 import database
+from multiprocessing import Process
+import json
 
 host = os.uname()[1]
 log = logging.getLogger('Supervisor [{}]'.format(host))
 
 
-def run(expe):
-    database.update_experiment(expe, worker=host, start_date=datetime.now())
+def run(expe, hostpid=host):
+    database.update_experiment(expe, worker=hostpid, start_date=datetime.now())
 
     # Load protocol
     log.info('Load protocol {}'.format(expe.protocol))
@@ -57,7 +59,10 @@ def run(expe):
     log.info('Expe {} complete'.format(expe.expe_hash))
 
 
-def main():
+def main(pid=None):
+    hostpid = host + '_' + str(pid) if pid is not None else host
+    log.name = 'Supervisor [{}]'.format(hostpid)
+
     log.info('Connecting to database')
     database.connect('credentials.json')
 
@@ -73,7 +78,7 @@ def main():
             continue
         log.info('Expe {} loaded'.format(expe.expe_hash))
         try:
-            run(expe)
+            run(expe, hostpid)
         except Exception as e:
             log.error(e)
             log.error('Error occurred on expe {}'.format(expe.id))
@@ -83,5 +88,13 @@ def main():
 if __name__ == '__main__':
     logger.setup_logging()
     log.info('Starting supervisor')
+    try:
+        with open('config.json') as f:
+            config = json.load(f)
+        process_count = config['process_count']
+    except Exception as e:
+        log.warning(e)
+        process_count = 1
 
-    main()
+    for i in range(process_count):
+        Process(target=main, args=(i,)).start()
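
Usage note. With the five patches applied, an experiment is enqueued through
the database helpers and picked up by whichever supervisor process claims it
first. The snippet below is a minimal, hypothetical sketch of that flow: the
module paths ('loader.tiles', 'descriptors.pixel'), the project and session
names, the file paths and every parameter value are illustrative assumptions,
not part of the patches. Only the helper signatures and the expe keys
('data_loader', 'descriptors_script', 'cross_validation', 'classifier', each
carrying 'name'/'package'/'parameters') come from the code above.

    # enqueue.py -- hypothetical sketch, not part of the patch series.
    # Assumed to run from the minigrida/ directory so that 'database',
    # 'loader' and 'descriptors' resolve as flat imports, exactly as
    # supervisor.py imports them.
    import database

    # credentials.json is passed straight to pony's db.bind(), e.g.
    # {"provider": "postgres", "user": "...", "password": "...",
    #  "host": "...", "database": "..."}
    database.connect('credentials.json')

    database.create_project('dfc')                        # assumed name
    database.create_session('rf-baseline',
                            'Pixel-wise random forest baseline', 'dfc')

    expe = {
        'data_loader': {
            'name': 'loader.tiles',            # module exposing run(...)
            'parameters': {'tiles_count': 2,   # placeholder values
                           'gt_path': 'data/gt',
                           'gt_name': 'gt.tif',
                           'rasters_path': 'data/rasters',
                           'rasters_name': ['dsm.tif', 'intensity.tif']},
        },
        'descriptors_script': {
            'name': 'descriptors.pixel',
            'parameters': {'remove': [0]},     # drop an unwanted label
        },
        'cross_validation': {
            'package': 'sklearn.model_selection',
            'name': 'GroupKFold',              # Jurse2 calls split(X, y, groups)
            'parameters': {'n_splits': 2},
        },
        'classifier': {
            'package': 'sklearn.ensemble',
            'name': 'RandomForestClassifier',
            'parameters': {'n_estimators': 100, 'n_jobs': -1},
        },
    }

    database.create_experiment('rf-baseline', 'Jurse2', expe)

A supervisor started with 'python supervisor.py' on any worker then claims the
pending experiment via next_experiment(), runs the Jurse2 protocol, and writes
oa, aa, k and the detailed report back through update_experiment().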