Supervisor and Jurse ready for production #1

Merged
florent merged 5 commits from develop into master 2024-11-10 15:43:36 +01:00
4 changed files with 455 additions and 0 deletions
Showing only changes of commit 12d0bcd2c8 - Show all commits

61
mockup.yml Normal file
View File

@ -0,0 +1,61 @@
name: Première expérience
date: 9 juillet 2018
priority: 1
detail: |
Maquette pour la création du supervisor de minigrida. Par rapport à la
version legacy du projet LD2DAPs, le choix du protocole de test est
générique. Il faut ajouter le chargement dynamique du protocole puis
réusiner le fonctionnement du supervisor pour respecter l'esprit universel
de minigrida.
protocol: JurseSF
expe:
ground_truth:
raster: ./Data/ground_truth/2018_IEEE_GRSS_DFC_GT_TR.tif
meta_labels: GT/jurse_idx.csv
descriptors_script:
name: Descriptors.dfc_aps
parameters:
areas:
- 100
- 1000
moi:
- 0.5
- 0.9
rasters:
- ./Data/phase1_rasters/DEM+B_C123/UH17_GEM051_TR.tif
- ./Data/phase1_rasters/DEM_C123_3msr/UH17_GEG051_TR.tif
treshold: 1e4
cross_validation:
name: CrossValidationGenerator.APsCVG
parameters:
n_test: 2
classifier:
name: sklearn.ensemble.RandomForestClassifier
parameters:
min_samples_leaf: 10
n_estimators: 50
n_jobs: -1
random_state: 0
expe_hashes:
ground_truth: 2c5ecaddcb8c4a1c8863bc65e7440de4a1b4962c
descriptors_script: cfdcc84d9d9c47177930257f286d850db446812b
cross_validation: 4a61b34fda812fe717890b25d75430023335a7a6
classifier: 40e6741ef8cc4b4fbe188b8ca0563eb5195b88ad
global: b8219fab322bf11ec1aac14a1f51466dd94ddbdd
expe_report:
supervisor: thecomedian
start_date: Le 27/07/2018 à 16:28:52
end_date: Le 27/07/2018 à 16:29:54
ressources:
ram: 42 Go
process_time:
description: 0.6744262149950373
classification: 168.82905034400028
metrics: 1.1557443889978458
expe_results:
classification: test_b8219f.tif
dimensions: 42
scores:
overall_accuracy: 0.5550408093111998
cohen_kappa: 0.41714275852261407

View File

@ -0,0 +1,30 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# \file jurse_default.py
# \brief TODO
# \author Florent Guiotte <florent.guiotte@gmail.com>
# \version 0.1
# \date 07 sept. 2018
#
# TODO details
import hashlib
from protocol import Protocol
class Jurse(Protocol):
def __init__(self, expe):
super().__init__(expe, self.__class__.__name__)
def _get_hashes(self):
hashes = OrderedDict()
hashes['global'] = 'Protocol did not override _get_hashes()'
glob = hashlib.sha1()
for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']:
v = str(expe[k]).encode('utf-8')
hashes[k] = hashlib.sha1(v).hexdigest()
glob.update(v)
hashes['global'] = glob.hexdigest()
return hashes

63
protocols/protocol.py Normal file
View File

@ -0,0 +1,63 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# \file protocol.py
# \brief TODO
# \author Florent Guiotte <florent.guiotte@gmail.com>
# \version 0.1
# \date 07 sept. 2018
#
# TODO details
import logging
import time
from collections import OrderedDict
class Protocol:
def __init__(self, expe, name=None):
self._log = logging.getLogger(name)
self._expe = expe
self._name = name
self._times = OrderedDict()
self._pt = time.process_time()
def get_hashes(self):
self._log.info('Computing hashes')
return(self._get_hashes())
def run(self):
self._kronos = Kronos()
self._run()
def get_results(self):
self._get_results()
def get_process_time(self):
return self._times()
def _time(self, process):
self._times[process] = time.process_time() - self._pt
self._pt = time.process_time()
def _get_hashes(self):
self._log.warning('Protocol did not override _get_hashes()')
hashes = OrderedDict()
hashes['global'] = 'Protocol did not override _get_hashes()'
return hashes
def _run(self):
self._log.error('Protocol did not override _run()')
raise NotImplementedError('Protocol {} did not override _run()'.format(self))
def _get_results(self):
self._log.warning('Protocol did not override _get_results()')
results = OrderedDict()
results['global'] = 'Protocol did not override _get_results()'
return results
def __str__(self):
return('{}'.format(self._name))
class TestError(Exception):
pass

301
supervisor.py Normal file
View File

@ -0,0 +1,301 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# \file supervisor.py
# \brief TODO
# \author Florent Guiotte <florent.guiotte@gmail.com>
# \version 0.1
# \date 07 sept. 2018
#
# TODO details
import yaml
import importlib
import hashlib
from collections import OrderedDict
import time
import os
import datetime
from pathlib import Path
from operator import itemgetter
import traceback
import logging
import logger
log = logging.getLogger('Supervisor [{}]'.format(os.uname()[1]))
### Keep yaml ordered, newline string
def setup_yaml():
""" https://stackoverflow.com/a/8661021 """
represent_dict_order = lambda self, data: self.represent_mapping('tag:yaml.org,2002:map', data.items())
yaml.add_representer(OrderedDict, represent_dict_order)
""" https://stackoverflow.com/a/24291536 """
yaml.Dumper.org_represent_str = yaml.Dumper.represent_str
yaml.add_representer(str, repr_str, Dumper=yaml.Dumper)
def repr_str(dumper, data):
if '\n' in data:
return dumper.represent_scalar(u'tag:yaml.org,2002:str', data, style='|')
return dumper.org_represent_str(data)
ENRICHMENT_DIR = Path('./Enrichment/')
TEST_DIR = ENRICHMENT_DIR / 'Tests'
STAGING_DIR = ENRICHMENT_DIR / 'Staging'
RESULT_DIR = ENRICHMENT_DIR / 'Results'
FAILED_DIR = ENRICHMENT_DIR / 'Failed'
def update_queue():
tmp_queue = list()
for child in TEST_DIR.iterdir():
if child.is_file() and child.suffix == '.yml':
tmp_queue.append({'expe_file': child, 'priority': get_priority(child)})
queue = sorted(tmp_queue, key=itemgetter('priority'))
return queue
def get_priority(yml_file):
with open(yml_file) as f:
expe = OrderedDict(yaml.safe_load(f))
return expe['priority']
def run(expe_file):
log.info('Run test {}'.format(expe_file))
with open(expe_file) as f:
test = OrderedDict(yaml.safe_load(f))
### Stage test
### Load protocol
### Write hahes
### Run test
### Write report
### Write results
### End of test
return
### Keep track of time
kronos = Kronos()
### Compute hashes
log.info('Computing hashes')
expe_hashes = compute_hashes(expe)
### Create output names
oname = '{}_{}'.format(expe_file.stem, expe_hashes['global'][:6])
oname_yml = oname + '.yml'
oname_tif = oname + '.tif'
### Create partial report
expe_report = create_report(kronos)
### Stage expe
log.info('Staging test')
write_expe_file(STAGING_DIR / oname_yml, expe, expe_hashes, expe_report)
expe_file.unlink()
### Compute descriptors
log.info('Compute descriptors')
try:
descriptors = compute_descriptors(expe)
except Exception as e:
kronos.time('description')
expe_report = create_report(kronos)
(STAGING_DIR / oname_yml).unlink()
write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'description', e)
raise TestError('Error occured during description')
kronos.time('description')
### Compute classification
log.info('Classify data')
try:
classification = compute_classification(expe, descriptors)
except Exception as e:
kronos.time('classification')
expe_report = create_report(kronos)
(STAGING_DIR / oname_yml).unlink()
write_error(FAILED_DIR / oname_yml, expe, expe_hashes, expe_report, 'classification', e)
raise TestError('Error occured during classification')
kronos.time('classification')
### Metrics
log.info('Run initial metrics')
metrics = run_metrics(expe, classification, descriptors)
kronos.time('metrics')
### Create complete report
log.info('Write complete report')
expe_report = create_report(kronos)
### Name and write prediction
triskele.write(RESULT_DIR / oname_tif, classification)
### Write report and results
(STAGING_DIR / oname_yml).unlink()
write_expe_file(RESULT_DIR / oname_yml, expe, expe_hashes, expe_report, oname_tif, metrics)
log.info('Test complete')
def write_error(file, expe, hashes=None, report=None, when='', e=Exception):
error = OrderedDict()
error['when'] = when
error['what'] = str(e)
error['where'] = traceback.format_exc()
with open(file, 'w') as of:
yaml.dump(OrderedDict({'expe': expe,
'expe_hashes': hashes,
'expe_report': report,
'expe_error': error}),
of, default_flow_style=False, encoding=None, allow_unicode=True)
def write_expe_file(file, expe, hashes=None, report=None, classification=None, results=None):
with open(file, 'w') as of:
yaml.dump(OrderedDict({'expe': expe,
'expe_hashes': hashes,
'expe_report': report,
'expe_classification': classification,
'expe_results': results}),
of, default_flow_style=False, encoding=None, allow_unicode=True)
def compute_hashes(expe):
glob = hashlib.sha1()
expe_hashes = OrderedDict()
for k in ['ground_truth', 'descriptors_script', 'cross_validation', 'classifier']:
v = str(expe[k]).encode('utf-8')
expe_hashes[k] = hashlib.sha1(v).hexdigest()
glob.update(v)
expe_hashes['global'] = glob.hexdigest()
return expe_hashes
def compute_descriptors(expe):
"""Compute descriptors from a standard expe recipe"""
script = expe['descriptors_script']
desc = importlib.import_module(script['name'])
#importlib.reload(Descriptors)
att = desc.run(**script['parameters'])
return att
def get_ground_truth(expe):
gt = triskele.read(expe['ground_truth'])
# Meta labeling
idx_map = np.arange(gt.max() + 1)
if 'meta_labels' in expe:
meta_idx = pd.read_csv(expe['meta_labels'])
idx = np.array(meta_idx['index'])
midx = np.array(meta_idx['metaclass_index'])
idx_map[idx] = midx
return idx_map[gt]
def compute_classification(expe, descriptors):
"""Read a standard expe recipe and descriptors, return the result classification"""
# Ground truth
gt = get_ground_truth(expe)
# CrossVal and ML
cv = expe['cross_validation']
cl = expe['classifier']
cross_val = getattr(importlib.import_module(cv['package']), cv['name'])
classifier = getattr(importlib.import_module(cl['package']), cl['name'])
prediction = np.zeros_like(gt)
for xt, xv, yt, yv, ti in cross_val(gt, descriptors, **cv['parameters']):
rfc = classifier(**cl['parameters'])
rfc.fit(xt, yt)
ypred = rfc.predict(xv)
prediction[ti] = ypred
return prediction
def compute_metrics(ground_truth, classification, descriptors):
"""Return dict of metrics for ground_truth and classification prediction in parameters"""
f = np.nonzero(classification)
pred = classification[f].ravel()
gt = ground_truth[f].ravel()
results = OrderedDict()
results['dimension'] = descriptors.shape[-1]
results['overall_accuracy'] = float(metrics.accuracy_score(gt, pred))
results['cohen_kappa'] = float(metrics.cohen_kappa_score(gt, pred))
return results
def run_metrics(expe, classification, descriptors):
"""Compute the metrics from a standard expe recipe and an given classification"""
### Extensible: meta-classes
gt = get_ground_truth(expe)
return compute_metrics(gt, classification, descriptors)
def create_report(kronos):
expe_report = OrderedDict()
expe_report['supervisor'] = os.uname()[1]
for timev, datek in zip((kronos.get_start_date(), kronos.get_end_date()), ('start_date', 'end_date')):
expe_report[datek] = datetime.datetime.fromtimestamp(timev).strftime('Le %d/%m/%Y à %H:%M:%S') if timev is not None else None
ressources = kronos.get_times()
ressources['ram'] = None
expe_report['ressources'] = ressources
return expe_report
def watch_folder():
log.info('Waiting for test')
while not list(TEST_DIR.glob('*.yml')):
time.sleep(10)
def main():
while(True):
try:
queue = update_queue()
except Exception:
log.error('Critical exception while updating work queue')
log.error(traceback.format_exc())
log.warning('Resuming')
continue
if not queue:
watch_folder()
continue
try:
run(queue.pop()['expe_file'])
except TestError:
log.warning('Test failed, error logged. Resuming')
except Exception:
log.error('Critical exception while running test. Resuming')
log.error(traceback.format_exc())
log.warning('Resuming')
continue
if __name__ == '__main__':
logger.setup_logging()
log.info('Starting supervisor')
setup_yaml()
main()