diff --git a/config.json b/config.json new file mode 100644 index 0000000..8e895e8 --- /dev/null +++ b/config.json @@ -0,0 +1,3 @@ +{ + "process_count": 2 +} diff --git a/minigrida/database/helpers.py b/minigrida/database/helpers.py index 36e6354..7c95ba6 100644 --- a/minigrida/database/helpers.py +++ b/minigrida/database/helpers.py @@ -57,7 +57,8 @@ def create_session(name, desc, project_name, urgency=1): def pending_experiments(): return Experiment.select(lambda x: x.status == 'pending').exists() -@orm.db_session + +@orm.db_session(serializable=True, optimistic=False) def next_experiment(): # TODO: take session urgency into account expe = orm.select(e for e in Experiment @@ -79,9 +80,10 @@ def update_experiment(expe, **params): log.info('Retry update') _update_experiment(expe, **params) + @orm.db_session def _update_experiment(expe, **params): - e = Experiment.select(lambda x: x.id == expe.id).first() + e = Experiment.get_for_update(id=expe.id) for k, v in params.items(): setattr(e, k, v) diff --git a/minigrida/supervisor.py b/minigrida/supervisor.py index 1b9ed3e..e38cb7a 100644 --- a/minigrida/supervisor.py +++ b/minigrida/supervisor.py @@ -17,13 +17,15 @@ import logging import logger from protocols.protocol import TestError import database +from multiprocessing import Process +import json host = os.uname()[1] log = logging.getLogger('Supervisor [{}]'.format(host)) -def run(expe): - database.update_experiment(expe, worker=host, start_date=datetime.now()) +def run(expe, hostpid=host): + database.update_experiment(expe, worker=hostpid, start_date=datetime.now()) # Load protocol log.info('Load protocol {}'.format(expe.protocol)) @@ -57,7 +59,10 @@ def run(expe): log.info('Expe {} complete'.format(expe.expe_hash)) -def main(): +def main(pid=None): + hostpid = host + '_' + str(pid) if pid is not None else host + log.name = 'Supervisor [{}]'.format(hostpid) + log.info('Connecting to database') database.connect('credentials.json') @@ -73,7 +78,7 @@ def main(): continue log.info('Expe {} loaded'.format(expe.expe_hash)) try: - run(expe) + run(expe, hostpid) except Exception as e: log.error(e) log.error('Error occured on expe {}'.format(expe.id)) @@ -83,5 +88,13 @@ def main(): if __name__ == '__main__': logger.setup_logging() log.info('Starting supervisor') + try: + with open('config.json') as f: + config = json.load(f) + process_count = config['process_count'] + except Exception as e: + log.warning(e) + process_count = 1 - main() + for i in range(process_count): + Process(target=main, args=(i,)).start()