This commit is contained in:
Florent Guiotte 2020-05-27 09:36:37 +02:00
parent d0ceae758a
commit c967d76516
3 changed files with 25 additions and 7 deletions

3
config.json Normal file
View File

@ -0,0 +1,3 @@
{
"process_count": 2
}

View File

@ -57,7 +57,8 @@ def create_session(name, desc, project_name, urgency=1):
def pending_experiments(): def pending_experiments():
return Experiment.select(lambda x: x.status == 'pending').exists() return Experiment.select(lambda x: x.status == 'pending').exists()
@orm.db_session
@orm.db_session(serializable=True, optimistic=False)
def next_experiment(): def next_experiment():
# TODO: take session urgency into account # TODO: take session urgency into account
expe = orm.select(e for e in Experiment expe = orm.select(e for e in Experiment
@ -79,9 +80,10 @@ def update_experiment(expe, **params):
log.info('Retry update') log.info('Retry update')
_update_experiment(expe, **params) _update_experiment(expe, **params)
@orm.db_session @orm.db_session
def _update_experiment(expe, **params): def _update_experiment(expe, **params):
e = Experiment.select(lambda x: x.id == expe.id).first() e = Experiment.get_for_update(id=expe.id)
for k, v in params.items(): for k, v in params.items():
setattr(e, k, v) setattr(e, k, v)

View File

@ -17,13 +17,15 @@ import logging
import logger import logger
from protocols.protocol import TestError from protocols.protocol import TestError
import database import database
from multiprocessing import Process
import json
host = os.uname()[1] host = os.uname()[1]
log = logging.getLogger('Supervisor [{}]'.format(host)) log = logging.getLogger('Supervisor [{}]'.format(host))
def run(expe): def run(expe, hostpid=host):
database.update_experiment(expe, worker=host, start_date=datetime.now()) database.update_experiment(expe, worker=hostpid, start_date=datetime.now())
# Load protocol # Load protocol
log.info('Load protocol {}'.format(expe.protocol)) log.info('Load protocol {}'.format(expe.protocol))
@ -57,7 +59,10 @@ def run(expe):
log.info('Expe {} complete'.format(expe.expe_hash)) log.info('Expe {} complete'.format(expe.expe_hash))
def main(): def main(pid=None):
hostpid = host + '_' + str(pid) if pid is not None else host
log.name = 'Supervisor [{}]'.format(hostpid)
log.info('Connecting to database') log.info('Connecting to database')
database.connect('credentials.json') database.connect('credentials.json')
@ -73,7 +78,7 @@ def main():
continue continue
log.info('Expe {} loaded'.format(expe.expe_hash)) log.info('Expe {} loaded'.format(expe.expe_hash))
try: try:
run(expe) run(expe, hostpid)
except Exception as e: except Exception as e:
log.error(e) log.error(e)
log.error('Error occured on expe {}'.format(expe.id)) log.error('Error occured on expe {}'.format(expe.id))
@ -83,5 +88,13 @@ def main():
if __name__ == '__main__': if __name__ == '__main__':
logger.setup_logging() logger.setup_logging()
log.info('Starting supervisor') log.info('Starting supervisor')
try:
with open('config.json') as f:
config = json.load(f)
process_count = config['process_count']
except Exception as e:
log.warning(e)
process_count = 1
main() for i in range(process_count):
Process(target=main, args=(i,)).start()