minigrida/minigrida/supervisor.py
2020-05-30 18:39:46 +02:00

120 lines
3.5 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# \file supervisor.py
# \brief Supervisor polling the database for pending experiments and running them.
# \author Florent Guiotte <florent.guiotte@gmail.com>
# \version 2.1
# \date 07 sept. 2018
#
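# Spawns `process_count` worker processes (read from the config file); each
# worker loads the next pending experiment, runs its protocol and writes the
# report back to the database.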
import importlib
import time
import os
from datetime import datetime
import traceback
import logging
import logger
from protocols.protocol import TestError
import database
from multiprocessing import Process
import json
import argparse
# Node name of the machine; used to label the logger and, by default, the
# worker recorded with each experiment.
host = os.uname().nodename
log = logging.getLogger('Supervisor [{}]'.format(host))

# Command line interface: both options are paths to JSON files and fall back
# to files in the current working directory.
parser = argparse.ArgumentParser(description='Run minigrida supervisor')
parser.add_argument(
    '--conf',
    metavar='config',
    default='config.json',
    type=str,
    help="the path to the supervisor's config file.",
)
parser.add_argument(
    '--cred',
    metavar='credentials',
    default='credentials.json',
    type=str,
    help='the path to the DB credentials file.',
)

# Parsed at import time so that `main` can read `args.cred` in every worker.
args = parser.parse_args()
def run(expe, hostpid=host):
    """Run one experiment and record its outcome in the database.

    The experiment is first flagged as ``running`` for the worker
    `hostpid`. The protocol named by ``expe.protocol`` is then looked up in
    the reloaded ``protocols`` package, instantiated with ``expe.expe`` and
    run. On success the scores, the results and the process time are
    written back with the status ``complete``.

    :param expe: experiment record handled by the ``database`` module; its
        ``protocol``, ``expe`` and ``expe_hash`` attributes are read here.
    :param hostpid: worker identifier stored with the experiment (defaults
        to the host name).
    :raises TestError: if the protocol's ``run`` raises. A report holding
        the traceback is stored with the status ``error`` beforehand, and
        the original exception is chained as the cause.

    Exceptions raised while loading or instantiating the protocol are not
    intercepted here and propagate unchanged.
    """
    database.update_experiment(expe,
                               worker=hostpid,
                               status='running',
                               start_date=datetime.now())

    # Load protocol
    log.info('Load protocol {}'.format(expe.protocol))
    protocol_module = importlib.import_module('protocols')
    # Re-execute the package module before resolving the protocol name.
    importlib.reload(protocol_module)
    protocol = getattr(protocol_module, expe.protocol)
    test = protocol(expe.expe)

    # Run test
    try:
        test.run()
    except Exception as e:
        err = 'Experience error'
        log.warning(err)
        # format_exc() captures the traceback of the exception being handled.
        report = {'error': {'name': str(err),
                            'trace': traceback.format_exc()}}
        database.update_experiment(expe, report=report, status='error')
        # Chain explicitly so the root cause stays attached to the TestError.
        raise TestError(err) from e

    # Write report
    log.info('Write report')
    database.update_experiment(expe,
                               end_date=datetime.now(),
                               oa=test.oa,
                               aa=test.aa,
                               k=test.k,
                               report=test.get_results(),
                               ressources={
                                   'process_time': test.get_process_time()
                               },
                               status='complete')

    # End of test
    log.info('Expe {} complete'.format(expe.expe_hash))
def main(pid=None):
    """Worker loop: connect to the database and run experiments forever.

    :param pid: optional worker index; when given it is appended to the host
        name (``<host>_<pid>``) to build the worker identifier used for the
        logger name and passed to ``run``.

    The loop never returns. It sleeps 30 seconds when no experiment is
    pending and 60 seconds after an experiment raised.
    """
    if pid is None:
        hostpid = host
    else:
        hostpid = host + '_' + str(pid)
    log.name = 'Supervisor [{}]'.format(hostpid)

    log.info('Connecting to database')
    database.connect(args.cred)

    while True:
        # Nothing queued: wait before polling again.
        if not database.pending_experiments():
            log.info('No pending experiments, waiting...')
            time.sleep(30)
            continue

        log.info('Loading next experiment')
        expe = database.next_experiment()
        if not expe:
            # No experiment was returned despite the pending check; poll again.
            continue

        log.info('Expe {} loaded'.format(expe.expe_hash))
        try:
            run(expe, hostpid)
        except Exception:
            # Keep the worker alive: log the traceback and back off.
            log.exception('Error occured on expe {}'.format(expe.id))
            time.sleep(60)
if __name__ == '__main__':
    # Script entry point: set up logging, read the number of workers from
    # the config file and start one `main` process per worker.
    logger.setup_logging()
    log.info('Starting supervisor')

    try:
        with open(args.conf) as config_file:
            config = json.load(config_file)
        process_count = config['process_count']
    except Exception as error:
        # Unreadable or incomplete config: warn and run a single worker.
        log.warning(error)
        process_count = 1

    for worker_index in range(process_count):
        worker = Process(target=main, args=(worker_index,))
        worker.start()
        # Stagger the worker start-ups by one second.
        time.sleep(1)