import time
import sqlite3
import re
from pathlib import Path
class _SQLite3Val():
def __init__(self, v):
self.v = v
def __str__(self):
if self.v is None:
return 'NULL'
elif isinstance(self.v, (int, float)):
return f"{self.v}"
else:
return f"'{self.v}'"
class JobsDB:
    """Database of jobs, currently implemented with sqlite. Saves essential information
    on jobs, including local and remote id, local directory it's staged to, system
    it has been submitted to, and status.

    Status: created - job has been created, but not yet submitted to run
            submitted - job has been submitted to some queuing system
            started - job has started running
            succeeded - job has finished successfully, and results are available
            failed - job has failed
            died - job died without producing expected low level output
            processed - job has been processed, and results no longer need to be saved
            cleaned - job stage dir (local and remote) has been cleaned (large files overwritten or all files wiped)

    Status groups (accepted wherever `jobs()` takes a status filter):
            ongoing - created, submitted, started
            can_produce_results - created, submitted, started, succeeded, died
    """
    possible_status = ['created', 'submitted', 'started', 'succeeded', 'failed', 'died', 'processed', 'cleaned']
    status_group = {'ongoing': ['created', 'submitted', 'started'],
                    'can_produce_results': ['created', 'submitted', 'started', 'succeeded', 'died']}

    def _execute(self, cmd, retry_n=3, retry_wait=1, params=()):
        """Execute one SQL statement in its own transaction, retrying while the database is locked

        Parameters
        ----------
        cmd: str
            SQL statement, optionally containing ``?`` placeholders
        retry_n: int, default 3
            maximum number of attempts
        retry_wait: float, default 1
            seconds to sleep between attempts
        params: sequence, default ()
            values bound to the ``?`` placeholders of `cmd` (never interpolated into the SQL text)

        Returns
        -------
        list of tuples: every row produced by the statement (empty for statements that produce none).
            Rows are fetched inside the retry/transaction, so lock errors while fetching are retried
            and callers may modify the database while iterating over the result.

        Raises
        ------
        RuntimeError
            if every attempt failed with 'database is locked' (or `retry_n` < 1)
        """
        last_error = None
        for attempt in range(retry_n):
            try:
                with self.db:
                    return self.db.execute(cmd, params).fetchall()
            except sqlite3.OperationalError as exc:
                if 'database is locked' not in str(exc):
                    raise
                last_error = f'{type(exc)} {exc}'
                # no point waiting after the final attempt
                if attempt < retry_n - 1:
                    time.sleep(retry_wait)
        raise RuntimeError(f'Repeatedly got {last_error} from cmd {cmd}')

    @staticmethod
    def _to_sql_param(v):
        """Convert a value to an sqlite-bindable one: None and numbers pass through, anything
        else (e.g. a Path) is stored as its str()"""
        if v is None or isinstance(v, (int, float)):
            return v
        return str(v)

    def __init__(self, db_filename):
        """Create JobsDB object, opening an existing database file or creating a new one

        Parameters
        ----------
        db_filename: str / Path
            database file

        Raises
        ------
        RuntimeError
            if `db_filename` exists but its jobs table cannot be read
        """
        self.db_filename = db_filename
        self.columns = ['id', 'name', 'from_dir', 'status', 'system', 'remote_id', 'remote_status', 'creation_time', 'status_time']
        self.column_types = ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'DATE', 'DATE']
        # expose each column's index as an attribute (e.g. self.status_col) for indexing raw row tuples
        for col_i, col in enumerate(self.columns):
            setattr(self, col + '_col', col_i)

        if Path(self.db_filename).exists():
            # just connect to existing database
            self.db = sqlite3.connect(db_filename)
            # make sure database can be minimally accessed (LIMIT 1: don't read the whole table)
            try:
                self._execute("SELECT * FROM jobs LIMIT 1")
            except Exception as exc:
                raise RuntimeError(f"Failed to read list of jobs from existing JobsDB file {self.db_filename}") from exc
        else:
            # connect should create the file here
            self.db = sqlite3.connect(db_filename)
            # IF NOT EXISTS tolerates another process creating the table between the exists() check and here
            columns_sql = ', '.join(f'{c} {t}' for c, t in zip(self.columns, self.column_types))
            self._execute(f"CREATE TABLE IF NOT EXISTS jobs ({columns_sql})")

    def add(self, id, name, from_dir, status='created', system=None, remote_id=None, remote_status=None):
        """Add a job to the DB

        Parameters
        ----------
        id: str
            unique id for job (fails if id already exists)
        name: str
            name for job
        from_dir: str/Path
            path to directory job is to run from (stored as str)
        status: str, default 'created'
            status of job
        system: str, optional
            system job is running on
        remote_id: str, optional
            remote id on system
        remote_status: str, optional
            remote status on system

        Raises
        ------
        ValueError
            if a job with this id already exists
        """
        assert status in JobsDB.possible_status, f"Unknown status {status}"
        if self._execute("SELECT id FROM jobs WHERE id = ?", params=(str(id),)):
            raise ValueError(f"JobsDB trying to add job {id} which already exists")
        # values in self.columns order; creation_time is seconds since epoch, status_time starts NULL
        values = (id, name, from_dir, status, system, remote_id, remote_status, int(time.time()), None)
        placeholders = ', '.join(['?'] * len(self.columns))
        self._execute(f"INSERT INTO jobs({', '.join(self.columns)}) VALUES ({placeholders})",
                      params=tuple(self._to_sql_param(v) for v in values))

    def remove(self, id):
        """Remove a job from the DB

        Parameters
        ----------
        id: str
            unique id of job to remove

        Raises
        ------
        ValueError
            if the id does not match exactly one job
        """
        rows = self._execute("SELECT id FROM jobs WHERE id = ?", params=(str(id),))
        if len(rows) != 1:
            raise ValueError(f"JobsDB trying to remove job {id}, found {len(rows)} such entries")
        self._execute("DELETE FROM jobs WHERE id = ?", params=(str(id),))

    def update(self, id, /, **kwargs):
        """Update some field of job

        Parameters
        ----------
        id: str
            unique id of job to update
        from_dir, status, system, remote_id, remote_status: str
            field(s) to update; updating status also sets status_time to the current time.
            Passing no fields is a no-op (after checking that the job exists).

        Raises
        ------
        ValueError
            if a field name is not a DB column, or the id does not match exactly one job
        """
        if 'status' in kwargs:
            assert kwargs['status'] in JobsDB.possible_status, f"Unknown status {kwargs['status']}"
        # column names cannot be bound as parameters, so only allow known columns into the SQL text
        unknown = [k for k in kwargs if k not in self.columns]
        if unknown:
            raise ValueError(f"JobsDB trying to update unknown field(s) {unknown}")
        rows = self._execute("SELECT id FROM jobs WHERE id = ?", params=(str(id),))
        if len(rows) != 1:
            raise ValueError(f"JobsDB trying to update job {id}, found {len(rows)} such entries")
        if 'status' in kwargs:
            kwargs['status_time'] = int(time.time())
        if not kwargs:
            # "UPDATE jobs SET WHERE ..." would be an SQL syntax error
            return
        assignments = ', '.join(f'{col} = ?' for col in kwargs)
        params = tuple(self._to_sql_param(v) for v in kwargs.values()) + (str(id),)
        self._execute(f"UPDATE jobs SET {assignments} WHERE id = ?", params=params)

    def jobs(self, status=None, id=None, name=None, system=None, readable=True):
        """Iterate through jobs

        Parameters
        ----------
        status: str or list(str), default None
            if not None, only report on jobs that match any status or status group (see `status_group`)
        id: str or list(str), default None
            if present, include only jobs with id that fully matches any regexp in this list
        name: str or list(str), default None
            if present, include only jobs with name that fully matches any regexp in this list
        system: str or list(str), default None
            if present, include only jobs with system that fully matches any regexp in this list
        readable: bool, default True
            convert creation_time and status_time from seconds since the epoch to local-time strings

        Returns
        -------
        Iterator of dicts: Iterator of dicts with fields for all DB columns for each job that matches selection criteria.

        Raises
        ------
        ValueError
            if an id/name/system pattern is not a valid regexp
        """
        if isinstance(status, str):
            status = [status]
        if isinstance(id, str):
            id = [id]
        if isinstance(name, str):
            name = [name]
        if isinstance(system, str):
            system = [system]

        allowed_status = None
        if status is not None:
            assert all(stat in JobsDB.possible_status or stat in JobsDB.status_group for stat in status)
            # expand group names into their member statuses so each row needs a single lookup
            allowed_status = set(status)
            for stat in status:
                allowed_status.update(JobsDB.status_group.get(stat, []))

        def _col_match(patterns, value):
            """True if no filter was given, or value fully matches any of the regexps"""
            if patterns is None:
                return True
            if value is None:
                return False
            try:
                # fullmatch anchors the whole pattern, so an alternation like 'a|b' cannot
                # match a mere prefix or suffix (as '^a|b$' would)
                return any(re.fullmatch(pattern, value) for pattern in patterns)
            except re.error as exc:
                raise ValueError(f"Bad regexp in {patterns}") from exc

        # do selection in python, not SQL query; _execute returns all rows up front,
        # so callers may update/remove jobs while iterating
        for row in self._execute('SELECT * FROM jobs'):
            if not (_col_match(id, row[self.id_col]) and
                    _col_match(system, row[self.system_col]) and
                    _col_match(name, row[self.name_col]) and
                    (allowed_status is None or row[self.status_col] in allowed_status)):
                continue
            job = dict(zip(self.columns, row))
            if readable:
                for time_col in ('creation_time', 'status_time'):
                    if job[time_col] is not None:
                        job[time_col] = time.strftime('%Y-%m-%d %X', time.localtime(job[time_col]))
            yield job

    def __str__(self):
        s = f'JobsDB {self.db_filename}\n' + ' '.join(self.columns)
        jobs = list(self.jobs())
        if len(jobs) > 0:
            s += ('\n--------------------\n' +
                  '\n'.join([str(j) for j in jobs]))
            s += '\n--------------------'
        return s

    def unlock(self):
        """Attempt to unlock the sqlite database by replacing its file with a fresh copy

        The database is copied to ``<db_filename>.tmp`` with the sqlite backup API. The original
        file is then moved to ``<db_filename>.old`` (replacing any previous one), the copy is
        moved into place, and this object reconnects to it.
        """
        # db_filename may be a str (as documented in __init__), so wrap it before using Path methods
        db_path = Path(self.db_filename)
        tmp_db_file = db_path.with_name(db_path.name + '.tmp')
        old_db_file = db_path.with_name(db_path.name + '.old')
        # do backup; close the copy's connection so its file can be renamed on every platform
        new_db = sqlite3.connect(tmp_db_file)
        try:
            with self.db:
                self.db.backup(new_db)
        finally:
            new_db.close()
        # swap files; Path.replace overwrites an existing target everywhere (rename fails on Windows)
        self.db.close()
        db_path.replace(old_db_file)
        tmp_db_file.replace(db_path)
        # reinit saved pointers
        self.db = sqlite3.connect(db_path)