Source code for expyre.func

"""Interface for remotely running python functions.  It pickles the function and
    its arguments, stages files (pickles as well as additional requested input files)
    to a local directory, submits to remote System (which stages out inputs, submits
    on remote queuing system, and stages outputs back in), and then unpickles results
    to return function result.  It also creates/updates the JobsDB entry for the job
    as it does these steps.

    Possible status (see JobsDB): created, submitted, started, succeeded/failed/died, processed, cleaned
"""
import sys
import os
import time
import itertools
import re
import warnings

import shutil
import tempfile
try:
    # use dill if available so that things like lambdas can be pickled
    import dill as pickle
except ImportError:
    import pickle
import hashlib
import base64

from pathlib import Path

from . import config
from .subprocess import subprocess_run
from .resources import Resources
from .jobsdb import JobsDB
from .units import time_to_sec

class ExPyReJobDiedError(Exception):
    """Exception raised when an ExPyRe remote job appears to have been killed for reasons
    other than the python process raising an exception, e.g. if it ran out of time in the
    queuing system.
    """
    pass

class ExPyReTimeoutError(TimeoutError):
    """Exception raised when ExPyRe gave up waiting for a job to finish because it exceeded
    the timeout value.
    """
    pass

class ExPyRe:
    """Create a queued remote function object: pickle the function and its inputs, store the
    files in a local stage dir, and add the job to the job database.

    Parameters
    ----------
    name: str
        name of job
    input_files: list(str | Path), optional
        input files that the function will need.  Relative paths without '..' path components
        will be copied to the same path relative to the remote rundir on the remote machine.
        Absolute paths will result in files copied into the top level remote rundir
    env_vars: list(str), optional
        list of env vars to set in the remote queuing script
    pre_run_commands: list(str), optional
        list of commands, one per line, to run at the start of the remote queuing script,
        after the commands in config.json that are used for the system
    post_run_commands: list(str), optional
        list of commands, one per line, to run at the end of the remote queuing script,
        after the actual task
    output_files: list(str), optional
        output files to be copied back after evaluation is done
    try_restart_from_prev: bool, default True
        try to restart from a previous call, based on a hash of the function, arguments,
        and input files
    hash_ignore: list(int or str), optional
        args elements (int) or kwargs items (str) to ignore when making the hash used to
        determine whether a run is identical to some previous one
    function: Callable
        function to call
    args: list
        positional arguments to function
    kwargs: dict
        keyword arguments to function
    _from_db_info: dict, optional (intended for internal use)
        restart is from the db, and dict contains special arguments:
        remote_id, system_name, status, stage_dir

    Possible status (see JobsDB): created, submitted, started, succeeded/failed/died, processed, cleaned
    """

    def __init__(self, name, *, input_files=[], env_vars=[], pre_run_commands=[], post_run_commands=[],
                 output_files=[], try_restart_from_prev=True, hash_ignore=[],
                 function=None, args=[], kwargs={}, _from_db_info=None):
        if len(config.systems) == 0 or config.db is None:
            raise RuntimeError('Configuration file was not found, ExPyRe object cannot be created')

        # name will be used as part of a path, so it can't contain a few special characters
        assert ('/' not in name and '[' not in name and ']' not in name and
                '{' not in name and '}' not in name and '*' not in name and '\\' not in name)

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {name} constructor start {time.time()}\n')

        # arguments that are used when creating from a database entry
        if _from_db_info is not None:
            self.remote_id = _from_db_info['remote_id']
            self.system_name = _from_db_info['system_name']
            self.status = _from_db_info['status']
            self.stage_dir = Path(_from_db_info['stage_dir'])
            return

        assert function is not None

        self.remote_id = None
        self.system_name = None
        self.status = 'created'
        # self.stage_dir is set below

        kwargs = kwargs.copy()
        input_files = [Path(f) for f in input_files]
        env_vars = env_vars.copy()
        pre_run_commands = pre_run_commands.copy()
        post_run_commands = post_run_commands.copy()

        # check for valid input/output filenames
        for f in input_files:
            if not f.is_absolute() and any([p == '..' for p in f.parts]):
                raise ValueError(f'Input path with ".." in input file "{f}" not supported')
        for f in output_files:
            if f.startswith('/'):
                raise ValueError(f'Absolute output path "{f}" not supported')

        # pickle function and arguments
        pickled_func = pickle.dumps((function, args, kwargs))

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {name} constructor starting hash {time.time()}\n')

        # hash for unique identifier
        h = hashlib.sha256()
        # hash on function
        h.update((function.__module__ + function.__name__).encode())
        # hash on args, possibly ignoring some
        assert all([isinstance(arg, (int, str)) for arg in hash_ignore])
        for arg_i, arg in enumerate(args):
            if arg_i not in hash_ignore:
                h.update(pickle.dumps(arg))
        for arg_key in sorted(list(kwargs)):
            if arg_key not in hash_ignore:
                h.update(pickle.dumps((arg_key, kwargs[arg_key])))
        # hash on input filenames and content
        for infile in input_files:
            subfiles = [infile]
            if infile.is_dir():
                subfiles += sorted(infile.rglob('*'))
            for subfile in subfiles:
                # filename
                h.update(str(subfile).encode())
                if subfile.is_file():
                    # file contents
                    with open(subfile, 'rb') as fin:
                        h.update(fin.read())

        # create deterministic unique identifier that can be part of a filename
        arghash = base64.urlsafe_b64encode(h.digest()).decode()

        self.recreated = False
        # check if this task was successfully run before with a matching id (name + arghash)
        if try_restart_from_prev:
            # NOTE: the following loop will not match status == 'processed' because such jobs are
            # not guaranteed to have results available.  Is it a good idea to try to use those,
            # if results actually seem to be available?
            for job in config.db.jobs(status='can_produce_results', id=re.escape(f'{name}_{arghash}') + '_.*'):
                old_stage_dir = Path(job['from_dir'])

                # this should also never happen
                if job['status'] == 'succeeded' and not (old_stage_dir / '_expyre_job_succeeded').exists():
                    raise RuntimeError(f'Found previously run job with matching id "{job["id"]}" '
                                       f'and status {job["status"]} but _succeeded file does not exist')

                # confirm that name is consistent, which it has to be since it's part of id
                assert name == job['name']

                # reconstruct job
                self.stage_dir = old_stage_dir
                # confirm that JobsDB id and self.id match, as they must be since self.id
                # is derived from self.stage_dir (that's why self.stage_dir has to be set
                # first) but they are stored separately in JobsDB
                assert self.id == job['id']

                # save remaining attributes
                self.status = job['status']
                self.remote_id = job['remote_id']
                self.system_name = job['system']

                # set flag in case later routines need to treat it specially
                self.recreated = True

                return

        # didn't find an old run, need to create a new and unique stage dir
        self.stage_dir = Path(tempfile.mkdtemp(prefix=f'run_{name}_{arghash}_', dir=config.local_stage_dir))
        # id is a property derived from stage dir, so it is also unique

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {name} constructor starting pickle {time.time()}\n')

        # write pickled function and arguments
        with open(self.stage_dir / '_expyre_task_in.pckl', 'wb') as fout:
            fout.write(pickled_func)

        if len(output_files) > 0:
            with open(self.stage_dir / '_expyre_output_files', 'w') as fout:
                fout.write('\n'.join(output_files) + '\n')

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {name} constructor starting stage in files {time.time()}\n')

        # stage in input files
        # NOTE: does this require more thought?
        for f in input_files:
            if f.is_absolute():
                # copy absolute path into stage_dir stripping all leading components
                ExPyRe._copy(None, self.stage_dir, f)
            else:
                ExPyRe._copy(Path.cwd(), self.stage_dir, f)

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {name} constructor done stage in files {time.time()}\n')

        # script commands to touch file indicating job has started
        pre_run_commands = ['touch _expyre_job_started', '('] + pre_run_commands
        # commands to set extra remote env vars
        for env_var in env_vars:
            if '=' in env_var:
                # form is already var=value
                pre_run_commands.append(f'export {env_var}')
            else:
                # get value from current environment
                pre_run_commands.append(f'export {env_var}={os.environ[env_var]}')
        # save to file in stage dir
        with open(self.stage_dir / '_expyre_pre_run_commands', 'w') as fout:
            fout.write('\n'.join(pre_run_commands) + '\n')

        # Below, always write to a temporary file and then mv to try to make creation of final files more atomic

        # create core of remote job script
        with open(self.stage_dir / '_expyre_script_core.py', 'w') as fout:
            fout.write('try:\n'
                       '    import pickle, traceback, sys\n'
                       '    with open("_expyre_task_in.pckl", "rb") as fin:\n'
                       '        (function, args, kwargs) = pickle.load(fin)\n'
                       '    stdout_orig = sys.stdout\n'
                       '    stderr_orig = sys.stderr\n'
                       '    sys.stdout = open("_expyre_stdout", "w")\n'
                       '    sys.stderr = open("_expyre_stderr", "w")\n'
                       '    results = function(*args, **kwargs)\n'
                       '    sys.stdout = stdout_orig\n'
                       '    sys.stderr = stderr_orig\n'
                       '    with open("_tmp_expyre_job_succeeded", "wb") as fout:\n'
                       '        pickle.dump(results, fout)\n'
                       'except Exception as exc:\n'
                       '    with open("_expyre_exception", "wb") as fout:\n'
                       '        pickle.dump(exc, fout)\n'
                       '    with open("_expyre_error", "w") as fout:\n'
                       '        fout.write(f"Exception: {exc}\\n")\n'
                       '        traceback.print_exc(file=fout)\n'
                       '    raise\n')

        # commands to check status and create final _succeeded or _error files
        post_run_commands = (['error_stat=$?'] +
                             post_run_commands +
                             ['exit $error_stat',
                              ')',
                              'error_stat=$?',
                              'if [ $error_stat == 0 ]; then',
                              '    if [ -e _tmp_expyre_job_succeeded ]; then',
                              '        mv _tmp_expyre_job_succeeded _expyre_job_succeeded',
                              '    else',
                              '        echo "No error code but _tmp_expyre_job_succeeded does not exist" > _tmp_expyre_job_error',
                              '        if [ -f _expyre_error ]; then',
                              '            cat _expyre_error >> _tmp_expyre_job_error',
                              '        fi',
                              '        mv _tmp_expyre_job_error _expyre_job_error',
                              '    fi',
                              'else',
                              '    if [ -e _expyre_exception ]; then',
                              '        mv _expyre_exception _expyre_job_exception',
                              '    fi',
                              '    if [ -e _expyre_error ]; then',
                              '        mv _expyre_error _expyre_job_error',
                              '    else',
                              '        echo "ERROR STATUS FROM python $error_stat" > _tmp_expyre_job_error',
                              '        mv _tmp_expyre_job_error _expyre_job_error',
                              '    fi',
                              'fi',
                              ''])
        # save to file in stage dir
        with open(self.stage_dir / '_expyre_post_run_commands', 'w') as fout:
            fout.write('\n'.join(post_run_commands) + '\n')

        config.db.add(self.id, name=name, from_dir=str(self.stage_dir), status=self.status)

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {name} constructor end {time.time()}\n')

    @staticmethod
    def _copy(in_dir, out_dir, file_glob):
        """Copy files from in_dir to out_dir, including globs in filenames.

        If file_glob is absolute, the file or directory is copied into out_dir with all of the
        file's leading path components removed, i.e.
            file_glob -> out_dir / file_glob.name
        Otherwise, file_glob is copied into out_dir with all its (relative) components preserved, i.e.
            in_dir / file_glob -> out_dir / file_glob

        Parameters
        ----------
        in_dir: str or Path
            input directory, None if file_glob is absolute
        out_dir: str or Path
            output directory
        file_glob: str or Path
            glob of file or directory to copy (recursively), absolute iff in_dir is None
        """
        out_dir = Path(out_dir)
        file_glob = Path(file_glob)

        exclude_glob = None
        if file_glob.is_absolute():
            strip_leading = True
            assert in_dir is None
            # make in_dir root and file_glob relative to that, so that in_dir.glob(file_glob) works
            in_dir = Path(file_glob.root)
            file_glob = str(file_glob).replace(str(file_glob.root), '', 1)
        else:
            strip_leading = False
            assert in_dir is not None
            in_dir = Path(in_dir)
            assert in_dir.is_dir()
            if str(file_glob) == '.':
                # don't stage back internal expyre files
                exclude_glob = '_expyre*'
                # for some reason Path.glob('.') gives an error
                file_glob = '*'

        in_files = list(in_dir.glob(str(file_glob)))
        if exclude_glob is not None:
            excluded_files = list(in_dir.glob(str(exclude_glob)))
        else:
            excluded_files = []
        in_files = [f for f in in_files if f not in excluded_files]

        if len(in_files) == 0:
            raise RuntimeError(f'File glob "{file_glob}" (excluding {exclude_glob}) in input_files does not match any files')

        for in_file in in_files:
            if strip_leading:
                rel_out_file = in_file.name
            else:
                rel_out_file = in_file.relative_to(in_dir)
            out_file = out_dir / rel_out_file
            out_file.parent.mkdir(parents=True, exist_ok=True)
            try:
                shutil.copytree(in_file, out_file, dirs_exist_ok=True)
            except NotADirectoryError:
                shutil.copy(in_file, out_file)

    @property
    def id(self):
        return self.stage_dir.name.replace('run_', '', 1)
    @staticmethod
    def from_jobsdb(db_jobs):
        """Create a list of ExPyRe objects from JobsDB records

        Parameters
        ----------
        db_jobs: dict or list(dict)
            one or more job records returned from ``JobsDB.jobs()``

        Returns
        -------
        list(ExPyRe) created objects
        """
        if isinstance(db_jobs, dict):
            db_jobs = [db_jobs]

        expyres = []
        for j in db_jobs:
            assert isinstance(j, dict)
            expyres.append(ExPyRe(name=j['name'], _from_db_info={'remote_id': j['remote_id'],
                                                                 'system_name': j['system'],
                                                                 'stage_dir': j['from_dir'],
                                                                 'status': j['status']}))
        return expyres
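
    # Illustrative sketch (not part of the class API): from_jobsdb() is typically used to pick
    # up previously submitted jobs from the database, e.g. after the driving script has been
    # restarted.  The 'ongoing' status filter mirrors the one used by
    # sync_remote_results_status() below.
    #
    #     for xpr in ExPyRe.from_jobsdb(list(config.db.jobs(status='ongoing'))):
    #         results, stdout, stderr = xpr.get_results()
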
    def cancel(self, verbose=False):
        """Cancel the remote job via the system's scheduler, if it is still ongoing

        Parameters
        ----------
        verbose: bool, default False
            verbose output
        """
        if self.remote_id is None:
            return

        if self.status in JobsDB.status_group['ongoing']:
            try:
                config.systems[self.system_name].scheduler.cancel(self.remote_id, verbose=verbose)
            except Exception:
                pass
    def start(self, resources, system_name=os.environ.get('EXPYRE_SYS', None), header_extra=[],
              exact_fit=True, partial_node=False, python_cmd='python3', force_rerun=False):
        """Start a job on a remote machine

        Parameters
        ----------
        resources: dict or Resources
            resources to use for job, either Resources or dict of Resources constructor kwargs
        system_name: str
            name of system in config.systems
        header_extra: list(str), optional
            list of lines to add to queuing system header, appended to System.header[]
        exact_fit: bool, default True
            use only nodes that exactly match number of tasks
        partial_node: bool, default False
            allow jobs that take less than an entire node
        python_cmd: str, default python3
            name of python interpreter to use on remote machine
        force_rerun: bool, default False
            force a rerun even if self.status is not "created"
        """
        if not force_rerun and self.status != 'created':
            # If job is not newly created, return instead of resubmitting
            # First check that it was recreated from a previous run, otherwise this shouldn't be happening
            assert self.recreated
            return

        if force_rerun:
            # make sure remote dir is gone, otherwise submission below will fail
            self.clean(wipe=True, remote_only=True)

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {self.id} start() start {time.time()}\n')

        if isinstance(resources, dict):
            resources = Resources(**resources)

        self.system_name = system_name
        system = config.systems[self.system_name]

        with open(self.stage_dir / '_expyre_pre_run_commands') as fin:
            pre_run_commands = fin.readlines()
        with open(self.stage_dir / '_expyre_post_run_commands') as fin:
            post_run_commands = fin.readlines()

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {self.id} start() calling system.submit {time.time()}\n')

        self.remote_id = system.submit(self.id, self.stage_dir, resources=resources, header_extra=header_extra,
                                       commands=(pre_run_commands +
                                                 [f'{python_cmd} _expyre_script_core.py'] +
                                                 post_run_commands),
                                       exact_fit=exact_fit, partial_node=partial_node)

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {self.id} start() done system.submit {time.time()}\n')

        self.status = 'submitted'
        config.db.update(self.id, status=self.status, system=self.system_name, remote_id=self.remote_id)
        # make sure remote status is not done, so get_results() actually syncs new status before giving up
        if force_rerun:
            config.db.update(self.id, remote_status=None)

        if 'EXPYRE_TIMING_VERBOSE' in os.environ:
            sys.stderr.write(f'ExPyRe {self.id} start() end {time.time()}\n')
    def sync_remote_results_status(self, sync_all=True, force_sync=False, verbose=False):
        """Sync files associated with results from the remote machine to the local stage dirs
        and update 'remote_status' in jobsdb.  Note that both have to happen because other
        functions assume that if the remote status has been updated, files have been staged
        back as well.

        Parameters
        ----------
        sync_all: bool, default True
            sync files for all jobs on the same system, to minimize the number of calls to remote copy
        force_sync: bool, default False
            sync files even if this job's status indicates that this has already been done
        verbose: bool, default False
            verbose output
        """
        # sync all jobs
        if sync_all:
            job_id = None
        else:
            job_id = re.escape(self.id)

        # sync even if already done
        if force_sync:
            status = None
        else:
            status = 'ongoing'

        jobs_to_sync = list(config.db.jobs(system=self.system_name, id=job_id, status=status))
        ExPyRe._sync_remote_results_status_ll(jobs_to_sync, verbose=verbose)
    @classmethod
    def _sync_remote_results_status_ll(cls, jobs_to_sync, n_group=250, cli=False, delete=False, verbose=False):
        """Low level part of syncing jobs.  Gets remote files _and_ updates the 'remote_status'
        field in jobsdb.  Note that both have to happen because other functions assume that if
        the remote status has been updated, files have been staged back as well.

        Parameters
        ----------
        cls: class
            class for classmethod (unused)
        jobs_to_sync: list(dict)
            list of job dicts returned by jobsdb.jobs()
        n_group: int, default 250
            number of jobs to do in a group with each rsync call
        cli: bool, default False
            command is being run from the cli 'xpr sync'
        delete: bool, default False
            delete local files that are not in the remote dir
        verbose: bool, default False
            verbose output
        """
        if len(jobs_to_sync) == 0:
            return

        def _grouper(n, iterable):
            it = iter(iterable)
            while True:
                chunk = tuple(itertools.islice(it, n))
                if not chunk:
                    return
                yield chunk

        for system_name in set([j['system'] for j in jobs_to_sync]):
            system = config.systems[system_name]
            # assume all jobs are staged from the same place
            stage_root = Path(jobs_to_sync[0]['from_dir']).parent

            # get remote statuses and update in JobsDB
            status_of_remote_id = system.scheduler.status([j['remote_id'] for j in jobs_to_sync], verbose=verbose)
            for j in jobs_to_sync:
                old_remote_status = list(config.db.jobs(id=j['id']))[0]['remote_status']
                new_remote_status = status_of_remote_id[j['remote_id']]
                if old_remote_status != new_remote_status:
                    if cli:
                        sys.stderr.write(f'Update remote status of {j["id"]} to {status_of_remote_id[j["remote_id"]]}\n')
                    config.db.update(j['id'], remote_status=status_of_remote_id[j['remote_id']])

            # get remote files only _AFTER_ getting remote status, since the other order might
            # result in a race condition:
            #   copy files while job is running, so not all files are ready
            #   while files are being copied, job finishes (but some files were not copied)
            #   update status, showing job as done (despite missing files)
            for job_group in _grouper(n_group, jobs_to_sync):
                system.get_remotes(stage_root, subdir_glob=[Path(j['from_dir']).name for j in job_group],
                                   delete=delete, verbose=verbose)
    def clean(self, wipe=False, dry_run=False, remote_only=False, verbose=False):
        """Clean the local and remote stage directories

        Parameters
        ----------
        wipe: bool, default False
            wipe the directory completely, as opposed to just writing CLEANED into files that
            could be large, like the python function input and output (NOTE: other staged-in
            files, or files that are created by the job, are not cleaned if wipe=False)
        dry_run: bool, default False
            dry run only, print what would happen but do not actually delete or overwrite anything
        remote_only: bool, default False
            wipe only the remote dir (ignored when wipe is False)
        verbose: bool, default False
            verbose output
        """
        if self.system_name is not None:
            system = config.systems[self.system_name]
        else:
            system = None

        if wipe:
            if system is not None:
                # delete remote stage dir
                system.clean_rundir(self.stage_dir, None, dry_run=dry_run, verbose=verbose or dry_run)

            # delete local stage dir
            if not remote_only:
                subprocess_run(None, ['find', str(self.stage_dir), '-type', 'd', '-exec', 'chmod', 'u+rwx', '{}', '\\;'],
                               dry_run=dry_run, verbose=verbose or dry_run)
                subprocess_run(None, ['rm', '-rf', str(self.stage_dir)], dry_run=dry_run, verbose=verbose or dry_run)
        else:
            if system is not None:
                # clean remote stage dir
                system.clean_rundir(self.stage_dir, ['_expyre_task_in.pckl', '_expyre_job_succeeded'],
                                    dry_run=dry_run, verbose=verbose or dry_run)

            if dry_run:
                print(f"dry-run overwrite local dirs {self.stage_dir / '_expyre_task_in.pckl'} and "
                      f"{self.stage_dir / '_expyre_job_succeeded'}, and create "
                      f"{self.stage_dir / '_expyre_job_cleaned'}")
            else:
                # clean local stage dir
                with open(self.stage_dir / '_expyre_task_in.pckl', 'w') as fout:
                    fout.write('CLEANED\n')
                f = self.stage_dir / '_expyre_job_succeeded'
                if f.exists():
                    with open(f, 'w') as fout:
                        fout.write('CLEANED\n')
                with open(self.stage_dir / '_expyre_job_cleaned', 'w') as fout:
                    fout.write('CLEANED\n')

        if not dry_run:
            self.status = 'cleaned'
            config.db.update(self.id, status=self.status)
    def _read_stdout_err(self):
        """Read all stdout and stderr files, from the python run and from the submitted job

        Returns
        -------
        stdout, stderr, job_stdout, job_stderr: str
        """
        try:
            with open(self.stage_dir / '_expyre_stdout') as fin:
                stdout = fin.read()
        except FileNotFoundError:
            stdout = None
        try:
            with open(self.stage_dir / '_expyre_stderr') as fin:
                stderr = fin.read()
        except FileNotFoundError:
            stderr = None
        try:
            with open(self.stage_dir / f'job.{self.id}.stdout') as fin:
                job_stdout = fin.read()
        except FileNotFoundError:
            job_stdout = None
        try:
            with open(self.stage_dir / f'job.{self.id}.stderr') as fin:
                job_stderr = fin.read()
        except FileNotFoundError:
            job_stderr = None

        return stdout, stderr, job_stdout, job_stderr
    def get_results(self, timeout=3600, check_interval=30, sync=True, sync_all=True, force_sync=False,
                    quiet=False, verbose=False):
        """Get results from a remote job

        Parameters
        ----------
        timeout: int or str, default 3600
            Max time (in sec if int, time spec if str) to wait for the job to complete,
            None or int <= 0 to wait forever
        check_interval: int, default 30
            Time to wait (in sec) between checks of job completion
        sync: bool, default True
            Synchronize remote files before checking for results.
            Note that if this is False and the job is finished on the remote system but output
            files haven't previously been synchronized, this will wait one check_interval and
            then raise an error
        sync_all: bool, default True
            Sync files from all jobs (not just this one), to reduce the number of separate remote copy calls
        force_sync: bool, default False
            Sync remote files even if the job's DB status indicates that this was already done.
            Note: together with sync_all this can lead to rsync having to compare many files.
        quiet: bool, default False
            No progress info
        verbose: bool, default False
            Verbose output (from remote system/scheduler commands)

        Returns
        -------
        results, stdout, stderr:
            * value returned by the function
            * string containing stdout during the function
            * string containing stderr during the function
        """
        if self.status == 'processed' or self.status == 'cleaned':
            raise RuntimeError(f'Job {self.id} has status {self.status}, results are no longer available')

        timeout = time_to_sec(timeout)

        system = config.systems[self.system_name]

        start_time = time.time()
        problem_last_chance = False
        out_of_time = False
        n_iter = 0
        # this is a messy state machine - only fairly sure that there are no deadlocks or infinite loops
        while True:
            # Sync remote results and status.  This used to be inside a test for status and/or
            # succeeded/failed file existence, but that's probably not useful.  If we're in this
            # loop we have to sync, since if a sync weren't needed the loop should have been
            # exited on the previous iteration.

            # get remote status of job from db (either unset or set by the call to
            # self.sync_remote_results_status() in a previous iteration)
            # remote_status values: queued, held, running, done, failed, timeout, other
            remote_status = list(config.db.jobs(id=re.escape(self.id)))[0]['remote_status']
            if remote_status != 'done':
                # If the previous status was not 'done', need to sync remote status.
                # If it was pre-done, we obviously need the current state and results.
                # If it was something else (even 'failed'), try again just in case it needed
                # more time or was fixed manually.
                self.sync_remote_results_status(sync_all, force_sync, verbose=verbose)
                remote_status = list(config.db.jobs(id=re.escape(self.id)))[0]['remote_status']

            # poke the filesystem, since on some machines Path.exists() fails even if the file
            # appears to be there when doing ls
            _ = list(self.stage_dir.glob('_expyre_job_*'))

            # read all text output
            stdout, stderr, job_stdout, job_stderr = self._read_stdout_err()

            # update state depending on presence of various progress files and remote status
            if (self.stage_dir / '_expyre_job_succeeded').exists():
                # job created final succeeded file
                assert remote_status not in ['queued', 'held']
                try:
                    with open(self.stage_dir / '_expyre_job_succeeded', 'rb') as fin:
                        results = pickle.load(fin)
                except Exception as exc:
                    raise RuntimeError(f'Job {self.id} got "_succeeded" file, but failed to parse it with error {exc}\n'
                                       f'stdout: {stdout}\nstderr: {stderr}\njob stdout: {job_stdout}\njob stderr: {job_stderr}')
                self.status = 'succeeded'
            elif (self.stage_dir / '_expyre_job_error').exists():
                # job created final failed file
                assert remote_status not in ['queued', 'held']
                with open(self.stage_dir / '_expyre_job_error') as fin:
                    error_msg = fin.read()
                self.status = 'failed'
            else:
                if (self.stage_dir / '_expyre_job_started').exists():
                    self.status = 'started'

                # job does not _appear_ to have finished
                if remote_status not in ['queued', 'held', 'running']:
                    # problem - job does not seem to be queued (even held) or running
                    if problem_last_chance:
                        # already on last chance, giving up
                        self.status = 'died'
                        config.db.update(self.id, status=self.status)
                        raise ExPyReJobDiedError(f'Job {self.id} has remote status {remote_status} but no _succeeded or _error\n'
                                                 f'stdout: {stdout}\nstderr: {stderr}\n'
                                                 f'job stdout: {job_stdout}\njob stderr: {job_stderr}\n')

                    # give it one more chance, perhaps queuing system status and file are slow to sync to head node
                    warnings.warn(f'Job {self.id} has no _succeeded or _error file, but remote status {remote_status} is '
                                  'not "queued", "held", or "running". Giving it one more chance.')
                    problem_last_chance = True
                else:
                    # No apparent problem, just not done yet, leave status as is, but check for timeout
                    if out_of_time:
                        if not quiet and n_iter > 0:
                            sys.stderr.write('\n')
                            sys.stderr.flush()
                        raise ExPyReTimeoutError

            # update status in database
            config.db.update(self.id, status=self.status)

            # return if succeeded or failed
            if self.status == 'succeeded':
                # stage out remotely created files
                # should we do this for failed calls?
                if (self.stage_dir / '_expyre_output_files').exists():
                    with open(self.stage_dir / '_expyre_output_files') as fin:
                        for in_file in [f.replace('\n', '') for f in fin.readlines()]:
                            ExPyRe._copy(self.stage_dir, Path.cwd(), in_file)

                if not quiet and n_iter > 0:
                    # newline after one or more 'q|r' progress characters
                    sys.stderr.write('\n')
                    sys.stderr.flush()
                return results, stdout, stderr
            elif self.status == 'failed':
                if not quiet and n_iter > 0:
                    # newline after one or more 'q|r' progress characters
                    sys.stderr.write('\n')
                    sys.stderr.flush()
                if (self.stage_dir / "_expyre_job_exception").is_file():
                    # reraise python exception that caused job to fail
                    with open(self.stage_dir / "_expyre_job_exception", "rb") as fin:
                        exc = pickle.load(fin)
                    sys.stderr.write(f'Remote job {self.id} failed with exception '
                                     f'error_msg {error_msg}\n'
                                     f'stdout: {stdout}\nstderr: {stderr}\n'
                                     f'job stdout: {job_stdout}\njob stderr: {job_stderr}')
                    raise exc
                else:
                    raise ExPyReJobDiedError(f'Remote job {self.id} failed with no exception but remote status {remote_status} '
                                             f'error_msg {error_msg}\n'
                                             f'stdout: {stdout}\nstderr: {stderr}\n'
                                             f'job stdout: {job_stdout}\njob stderr: {job_stderr}')

            out_of_time = (timeout is not None) and (timeout >= 0) and (time.time() - start_time > timeout)

            if not quiet:
                if n_iter == 0:
                    sys.stderr.write(f'Waiting for job {self.id} up to {timeout} s: \n')
                    sys.stderr.flush()

                # progress info to stderr
                if n_iter % 10 == 10 - 1:
                    sys.stderr.write(f'{(n_iter // 10) % 10}')
                else:
                    sys.stderr.write(remote_status[0])
                if n_iter % 100 == 100 - 1:
                    sys.stderr.write('\n')
                sys.stderr.flush()

            # wait for next check
            time.sleep(check_interval)
            n_iter += 1
    def mark_processed(self):
        """Mark job as processed (usually after results have been stored someplace)
        """
        self.status = 'processed'
        config.db.update(self.id, status=self.status)
    def __str__(self):
        return f'{self.id} system={self.system_name} remote_id={self.remote_id} status={self.status} stage_dir={self.stage_dir}'
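
# Reading aid only (not used by the code): summary of the stage-dir marker files that the
# generated job script and the get_results() state machine above rely on, derived from the
# pre/post run commands written in __init__.
#
#   _expyre_job_started        touched when the remote queuing script starts
#   _tmp_expyre_job_succeeded  pickled results written by _expyre_script_core.py
#   _expyre_job_succeeded      final success marker, mv'ed from the _tmp_ file by the post-run commands
#   _expyre_exception          pickled exception, renamed to _expyre_job_exception on failure
#   _expyre_error              error text, renamed (or written) as _expyre_job_error on failure
#   _expyre_job_cleaned        written locally by clean(wipe=False)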