expyre package#
Subpackages#
Submodules#
expyre.config module#
Configuration for expyre from config.json. Uses the env var EXPYRE_ROOT if set. Otherwise (or if the env var is set to @), searches from HOME (or /, if the current dir is not below HOME) down to the current directory for .expyre or _expyre.
Configuration files are parsed in the same order, with deeper directories modifying previous ones. local_stage_dir is set to the deepest directory unless it is set explicitly in one of the found config.json files.
Global variables#
- local_stage_dir: str
path of directory to stage jobs into
- systems: dict
dict of expyre.system.System that jobs can run on
- db: JobsDB
expyre.jobsdb.JobsDB database of jobs
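For orientation, a hypothetical .expyre/config.json is sketched below. The top-level systems key and the host/scheduler/partitions fields mirror the expyre.system.System parameters documented later on this page, but the partition property names and the overall schema here are illustrative assumptions, not an authoritative format.

```json
{
    "systems": {
        "mycluster": {
            "host": "user@cluster.example.com",
            "scheduler": "slurm",
            "rundir": "run_expyre",
            "partitions": {
                "standard": {"ncores_per_node": 16, "max_nnodes": 4,
                             "max_time": "24:00:00", "max_mem": "64GB"}
            }
        }
    }
}
```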
expyre.func module#
Interface for remotely running python functions. It pickles the function and its arguments, stages files (pickles as well as additional requested input files) to a local directory, submits to remote System (which stages out inputs, submits on remote queuing system, and stages outputs back in), and then unpickles results to return function result. It also creates/updates the JobsDB entry for the job as it does these steps.
Possible status (see JobsDB): created, submitted, started, succeeded/failed, processed, cleaned
- class expyre.func.ExPyRe(name, *, input_files=[], env_vars=[], pre_run_commands=[], post_run_commands=[], output_files=[], try_restart_from_prev=True, hash_ignore=[], function=None, args=[], kwargs={}, _from_db_info=None)[source]#
Bases:
object
Create Queued Remote Function object, pickles function and inputs, stores files in local stage dir, and adds to job database.
- Parameters
name (str) – name of job
input_files (list(str | Path), optional) – input files that the function will need. Files given as relative paths without ‘..’ path components will be copied to the same path relative to the remote rundir on the remote machine. Files given as absolute paths will be copied into the top-level remote rundir
env_vars (list(str), optional) – list of env vars to set in remote queuing script
pre_run_commands (list(str), optional) – list of commands, one per line, to run at start of remote queuing script, after commands in config.json that are used for system
post_run_commands (list(str), optional) – list of commands, one per line, to run at end of remote queuing script, after actual task
output_files (list(str), optional) – output files to be copied back after evaluation is done
try_restart_from_prev (bool, default True) – try to restart from previous call, based on hash of function, arguments, and input files
hash_ignore (list(int or str), optional) – args elements (int) or kwargs items (str) to ignore when making hash to determine if run is identical to some previous one
function (Callable) – function to call
args (list) – positional arguments to function
kwargs (dict) – keyword arguments to function
_from_db_info (dict, optional (intended for internal use)) – restart is from db, and dict contains special arguments: remote_id, system_name, status, stage_dir
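The staging mechanism described above (pickle the function and its arguments, stage files to a directory, run remotely, unpickle the result) can be sketched in pure Python. This is only an illustration of the round trip, using invented file names; it is not expyre's actual implementation.

```python
import pickle
import tempfile
from pathlib import Path

def add(a, b):  # module-level so it is picklable by reference
    return a + b

stage_dir = Path(tempfile.mkdtemp())

# "stage in": pickle the function and its arguments, as ExPyRe does on creation
# (the file names here are hypothetical, not expyre's actual layout)
(stage_dir / "task_in.pckl").write_bytes(
    pickle.dumps({"function": add, "args": [2, 3], "kwargs": {}})
)

# "remote run": unpickle and call, as the remote job script would
task = pickle.loads((stage_dir / "task_in.pckl").read_bytes())
result = task["function"](*task["args"], **task["kwargs"])

# "stage out": pickle the result for get_results() to retrieve later
(stage_dir / "task_out.pckl").write_bytes(pickle.dumps(result))

print(pickle.loads((stage_dir / "task_out.pckl").read_bytes()))  # 5
```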
- clean(wipe=False, dry_run=False, remote_only=False, verbose=False)[source]#
clean the local and remote stage directories
- Parameters
wipe (bool, default False) – wipe the directory completely, as opposed to just writing ‘CLEANED’ into files that could be large, such as the python function input and output (NOTE: other staged-in files, or files that are created, are not cleaned if wipe=False)
dry_run (bool, default False) – dry run only, print what will happen but do not actually delete or overwrite anything
remote_only (bool, default False) – wipe only remote dir (ignored when wipe is False)
verbose (bool, default False) – verbose output
- static from_jobsdb(db_jobs)[source]#
Create a list of ExPyRe objects from JobsDB records
- Parameters
- db_jobs: dict or list(dict)
one or more jobs records returned from
JobsDB.jobs()
- Returns
created objects
- Return type
list(ExPyRe)
- get_results(timeout=3600, check_interval=30, sync=True, sync_all=True, force_sync=False, quiet=False, verbose=False)[source]#
Get results from a remote job
- Parameters
timeout (int or str, default 3600) – Max time (in sec if int, time spec if str) to wait for job to complete, None or int <= 0 to wait forever
check_interval (int, default 30) – Time to wait (in sec) between checks of job completion
sync (bool, default True) – Synchronize remote files before checking for results. Note that if this is False and the job has finished on the remote system but the output files have not previously been synchronized, this call will wait one check_interval and then raise an error
sync_all (bool, default True) – Sync files from all jobs (not just this one), to reduce number of separate remote copy calls
force_sync (bool, default False) – Sync remote files even if job’s DB status indicates that this was already done. Note: together with sync_all this can lead to rsync having to compare many files.
quiet (bool, default False) – No progress info
verbose (bool, default False) – Verbose output (from remote system/scheduler commands)
- Returns
value of function
string containing stdout during function
string containing stderr during function
- Return type
return, stdout, stderr
- property id#
- start(resources, system_name=None, header_extra=[], exact_fit=True, partial_node=False, python_cmd='python3', force_rerun=False)[source]#
Start a job on a remote machine
- Parameters
resources (dict or Resources) – resources to use for the job, either a Resources object or a dict of Resources constructor kwargs
system_name (str) – name of system in config.systems
header_extra (list(str), optional) – list of lines to add to queuing system header, appended to System.header[]
exact_fit (bool, default True) – use only nodes that exactly match number of tasks
partial_node (bool, default False) – allow jobs that take less than an entire node
python_cmd (str, default python3) – name of python interpreter to use on remote machine
force_rerun (bool, default False) – force a rerun even if self.status is not “created”
- sync_remote_results_status(sync_all=True, force_sync=False, verbose=False)[source]#
Sync files associated with results from the remote machine to the local stage dirs, and update ‘remote_status’ in the jobs DB. Note that both have to happen together, because other functions assume that if the remote status has been updated, the files have been staged back as well.
- Parameters
sync_all (bool, default True) – sync files for all jobs on same system, to minimize number of calls to remote copy
force_sync (bool, default False) – sync files even if this job’s status indicates that this has already been done
verbose (bool, default False) – verbose output
expyre.jobsdb module#
- class expyre.jobsdb.JobsDB(db_filename)[source]#
Bases:
object
Database of jobs, currently implemented with sqlite. Saves essential information on jobs, including local and remote id, the local directory the job is staged to, the system it has been submitted to, and its status.
- Status:
created - job has been created, but not yet submitted to run
submitted - job has been submitted to some queuing system
started - job has started running
succeeded - job has finished successfully, and results are available
failed - job has failed
died - job died without producing expected low level output
processed - job has been processed, and results no longer need to be saved
cleaned - job stage dir (local and remote) has been cleaned (large files overwritten or all files wiped)
- add(id, name, from_dir, status='created', system=None, remote_id=None, remote_status=None)[source]#
Add a job to the DB
- Parameters
id (str) – unique id for job (fails if id already exists)
name (str) – name for job
from_dir (str/Path) – path to directory job is to run from
status (str, default 'created') – status of job
system (str, optional) – system job is running on
remote_id (str, optional) – remote id on system
remote_status (str, optional) – remote status on system
- jobs(status=None, id=None, name=None, system=None, readable=True)[source]#
Iterate through jobs
- Parameters
status (str or list(str), default None) – if not None, only report on jobs that match any status
id (str or list(str), default None) – if present, include only jobs with id that match regexps in this list
name (str or list(str), default None) – if present, include only jobs with name that matches regexps in this list
system (str or list(str), default None) – if present, include only jobs with system in this list
- Returns
Iterator of dicts
- Return type
Iterator of dicts with fields for all DB columns for each job that matches selection criteria.
- possible_status = ['created', 'submitted', 'started', 'succeeded', 'failed', 'died', 'processed', 'cleaned']#
- status_group = {'can_produce_results': ['created', 'submitted', 'started', 'succeeded', 'died'], 'ongoing': ['created', 'submitted', 'started']}#
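The status groups above are plain subsets of possible_status and can be used to filter job records, e.g. when deciding which jobs are still worth syncing. A self-contained sketch using the values listed above:

```python
# values copied from the JobsDB constants documented above
possible_status = ['created', 'submitted', 'started', 'succeeded',
                   'failed', 'died', 'processed', 'cleaned']
status_group = {
    'can_produce_results': ['created', 'submitted', 'started', 'succeeded', 'died'],
    'ongoing': ['created', 'submitted', 'started'],
}

# every group is a subset of the full status list
for group, statuses in status_group.items():
    assert set(statuses) <= set(possible_status)

# e.g. select jobs that may still produce results
# (job dicts shaped loosely like those returned by JobsDB.jobs())
jobs = [{'id': 'j1', 'status': 'started'}, {'id': 'j2', 'status': 'cleaned'}]
worth_syncing = [j for j in jobs if j['status'] in status_group['can_produce_results']]
print([j['id'] for j in worth_syncing])  # ['j1']
```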
expyre.resources module#
- class expyre.resources.Resources(max_time, num_nodes=None, num_cores=None, max_mem_tot=None, max_mem_per_core=None, partitions=None, queues=None)[source]#
Bases:
object
Resources required for a task, including time, memory, cores/nodes, and particular partitions/queues. Mainly consists of code that selects appropriate partition/queue from the list associated with each System.
- Parameters
max_time (int, str) – max time for job in sec (int) or time spec (str)
num_nodes (int) – number of nodes to use, mutually exclusive with num_cores, one is required
num_cores (int) – number of cores to use, mutually exclusive with num_nodes, one is required
max_mem_tot (int/str, default None) – total max mem in kB (int) or memory spec (str), mutually exclusive with max_mem_per_core
max_mem_per_core (int/str, default None) – per-core max mem in kB (int) or memory spec (str), mutually exclusive with max_mem_tot
partitions/queues (list(str), default None) – regexps for types of node that can be used
- find_nodes(partitions, exact_fit=True, partial_node=False)[source]#
find a node type that accommodates requested resources
- Parameters
partitions (dict) – properties of available partitions (only used internally by system.py, so the “queues” synonym is not implemented here).
exact_fit (bool, default True) – only return nodes that exactly satisfy the number of cores
partial_node (bool, default False) – allow jobs that take less than one entire node, overrides exact_fit
- Returns
partition (str) – name of partition selected
node_dict (dict) – various quantities of node
num_nodes: int, total number of nodes needed
num_cores: int, total number of cores needed
num_cores_per_node: int, number of cores per node for selected nodes
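The selection logic can be illustrated with a pure-Python sketch. The partition property names used here (ncores_per_node, max_nnodes) are assumptions for illustration only; expyre's actual partition schema and selection code may differ.

```python
import math

def find_nodes_sketch(partitions, num_cores, exact_fit=True):
    """Pick a partition for num_cores cores (illustrative, not expyre's code).

    partitions: dict of name -> {'ncores_per_node': int, 'max_nnodes': int}
    (hypothetical property names)
    """
    for name, props in partitions.items():
        cores_per_node = props['ncores_per_node']
        if exact_fit and num_cores % cores_per_node != 0:
            continue  # exact_fit: request must fill whole nodes exactly
        num_nodes = math.ceil(num_cores / cores_per_node)
        if num_nodes <= props['max_nnodes']:
            return name, {'num_nodes': num_nodes,
                          'num_cores': num_nodes * cores_per_node,
                          'num_cores_per_node': cores_per_node}
    raise RuntimeError(f'no partition accommodates {num_cores} cores')

partitions = {'small': {'ncores_per_node': 16, 'max_nnodes': 4},
              'big': {'ncores_per_node': 128, 'max_nnodes': 8}}

part, node_dict = find_nodes_sketch(partitions, 32)
print(part, node_dict['num_nodes'])  # small 2
```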
expyre.subprocess module#
Functions for running subprocesses, optionally on remote machines, handling quoting/shell escaping, encoding/decoding, environment, and making sure that optional arguments needed for things like kerberized ssh are set correctly.
- expyre.subprocess.subprocess_copy(from_files, to_file, from_host='_LOCAL_', to_host='_LOCAL_', rcp_args='-a', rcp_cmd='rsync', remsh_cmd=None, retry=None, remsh_flags='-e', delete=False, verbose=False, dry_run=False)[source]#
Run a remote copy (e.g. rsync) in a subprocess, optionally to/from remote machine. Exactly one machine has to be specified, and relative paths on that machine are relative to its home dir, like rsync. If the specified machine is None the copy is local, but relative paths are still relative to home dir.
Raises RuntimeError for non-zero return status.
- Parameters
from_files (str, Path, list(str), list(Path)) – one or more files/directories to copy from (not including ‘user@host:’ part)
to_file (str, Path) – _one_ file/directory to copy to (not including ‘user@host:’ part)
from_host (str, optional) – [username@]host.domain to copy from (no “:”), mutually exclusive with to_host, one is required. If None, use a local dir, but make relative paths relative to $HOME instead of to $PWD (like rsync with a host)
to_host (str, optional) – [username@]host.domain to copy to (no “:”), mutually exclusive with from_host, one is required. If None, use a local dir, but make relative paths relative to $HOME instead of to $PWD (like rsync with a host)
rcp_args (str, default '-a') – non-filename arguments to remote copy command
rcp_cmd (str, default 'rsync') – command to do copy
remsh_cmd (str, default EXPYRE_RSH env var or ssh) – remote shell command for rcp_cmd to use (prefixed with remsh_flags)
retry (optional) – passed as retry argument to subprocess_run
remsh_flags (str, default '-e') – flag to prefix to remsh_cmd when calling rcp_cmd
delete (bool, default False) – delete target files that aren’t in the source, with the --delete option
verbose (bool, default False) – verbose output
dry_run (bool, default False) – dry run, don’t actually copy
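How the documented arguments map onto an rsync-style command line can be sketched as follows. The exact argv that expyre builds is an assumption; this only illustrates the ‘user@host:’ prefixing and the remsh_flags/remsh_cmd plumbing described above.

```python
def build_rcp_argv(from_files, to_file, from_host=None, to_host=None,
                   rcp_cmd='rsync', rcp_args='-a', remsh_flags='-e',
                   remsh_cmd='ssh', delete=False):
    """Assemble an rsync-style argv (illustrative, not expyre's actual code)."""
    argv = [rcp_cmd] + rcp_args.split() + [remsh_flags, remsh_cmd]
    if delete:
        argv.append('--delete')
    # exactly one side gets the '[user@]host:' prefix, as with plain rsync
    if (from_host is None) == (to_host is None):
        raise ValueError('exactly one of from_host/to_host must be given')
    if isinstance(from_files, str):
        from_files = [from_files]
    if from_host is not None:
        argv += [f'{from_host}:{f}' for f in from_files] + [str(to_file)]
    else:
        argv += list(from_files) + [f'{to_host}:{to_file}']
    return argv

print(build_rcp_argv(['a.txt'], 'dest/', to_host='user@example.com'))
# ['rsync', '-a', '-e', 'ssh', 'a.txt', 'user@example.com:dest/']
```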
- expyre.subprocess.subprocess_run(host, args, script=None, shell='bash -c', remsh_cmd=None, retry=None, in_dir='_HOME_', dry_run=False, verbose=False)[source]#
run a subprocess, optionally via ssh on a remote machine. Raises RuntimeError for non-zero return status.
- Parameters
host (str) – [username]@machine.domain, or None for a locally run process
args (list(str)) – arguments to run, starting with command and followed by its command line args
script (str, default None) – text to write to process’s standard input
shell (str, default 'bash -c') – shell to use, including any flags necessary for it to interpret the next argument as the commands to run (-c for bash)
remsh_cmd (str | list(str), default env var EXPYRE_RSH or 'ssh') – command to start on remote host, usually ssh
retry ((int, int), default env var EXPYRE_RETRY.split() or (2, 5)) – number of times to retry and number of seconds to wait between each trial
in_dir (str, default _HOME_) – directory to cd into before running args, _HOME_ for home dir, _PWD_ for python current working directory (only for host == None)
verbose (bool, default False) – verbose output
- Returns
stdout, stderr
- Return type
output and error of subprocess, as strings (bytes.decode())
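The behaviour described here (run, decode output, raise RuntimeError on non-zero status, retry a few times) can be sketched with the standard library. This illustrates the documented contract for a local process, not expyre's actual implementation.

```python
import subprocess
import time

def subprocess_run_sketch(args, script=None, retry=(2, 0)):
    """Run args locally; return (stdout, stderr) as str, RuntimeError on failure.

    retry = (number of tries, seconds between tries), loosely following
    the retry argument documented above.
    """
    n_tries, wait = retry
    for attempt in range(n_tries):
        proc = subprocess.run(args, input=script, capture_output=True, text=True)
        if proc.returncode == 0:
            return proc.stdout, proc.stderr
        if attempt < n_tries - 1:
            time.sleep(wait)
    raise RuntimeError(f'command {args} failed with status {proc.returncode}')

stdout, stderr = subprocess_run_sketch(['bash', '-c', 'echo hello'])
print(stdout.strip())  # hello
```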
expyre.system module#
- class expyre.system.System(host, partitions, scheduler, header=[], no_default_header=False, script_exec='/bin/bash', pre_submit_cmds=[], commands=[], rundir=None, rundir_extra=None, remsh_cmd=None)[source]#
Bases:
object
Interface for a System that can run jobs remotely, including staging files from a local directory to a (config-specified) remote directory, submitting it with the correct kind of Scheduler, and staging back the results from the remote directory. Does not report any other status directly.
- Parameters
host (str) – [username@]machine.fqdn
partitions (dict) – dictionary describing partition types
scheduler (str, Scheduler) – type of scheduler
header (list(str), optional) – list of batch system header lines to use in every job, typically for system-specific things like selecting nodes
no_default_header (bool, default False) – do not automatically add default header fields, namely job name, partition/queue, max runtime, and stdout/stderr files
script_exec (str, default '/bin/bash') – executable for 1st line of script
pre_submit_cmds (list(str), optional) – commands to run, in the process that does the submission, before the actual job submission
commands (list(str), optional) – list of commands to run at start of every job on machine
rundir (str / None, default 'run_expyre' if host is not None, else None) – path on remote machine to run in. If absolute, it is used as is; if relative, it is relative to the (remote) home directory. If host is None, a rundir of None means run directly in the stage directory
rundir_extra (str, default None) – extra string to add to remote_rundir, e.g. per-project part of path
remsh_cmd (str, default EXPYRE_RSH or 'ssh') – remote shell command to use with this system
- clean_rundir(stage_dir, filenames, dry_run=False, verbose=False)[source]#
clean a remote stage directory
- Parameters
stage_dir (str | Path) – local stage directory path
filenames (list(str) or None) – list of files to be replaced with ‘CLEANED’, or wipe the entire directory if None
dry_run (bool, default False) – dry run only, do not actually clean anything
verbose (bool, default False) – verbose output
- get_remotes(local_dir, subdir_glob=None, delete=False, verbose=False)[source]#
get data from directories of remotely running jobs
- Parameters
local_dir (str) – local directory to stage to
subdir_glob (str, list(str), default None) – only get subdirectories that match one or more globs
delete (bool, default False) – delete local files that aren’t in remote dir
verbose (bool, default False) – verbose output
- run(args, script=None, shell='bash -c', retry=None, in_dir='_HOME_', dry_run=False, verbose=False)[source]#
- submit(id, stage_dir, resources, commands, header_extra=[], exact_fit=True, partial_node=False, verbose=False)[source]#
Submit a job on a remote machine, including staging out files
- Parameters
id (str) – unique id for job
stage_dir (str, Path) – directory in which files have been prepared
resources (Resources) – resources to use for job
commands (list(str)) – commands to run in job script after per-machine commands
header_extra (list(str), optional) – list of lines to append to system header for this job
exact_fit (bool, default True) – only match partitions that have nodes with exact match to number of cores
partial_node (bool, default False) – allow jobs that take less than an entire node
- Return type
id of job on remote machine
expyre.units module#
Units-related utilities for converting between various time and memory formats
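As an illustration of this kind of conversion, here is a sketch that turns a ‘[days-]hh:mm:ss’-style time spec into seconds. Whether this matches the formats expyre.units actually accepts is an assumption; it is a slurm-like convention chosen purely for illustration.

```python
def time_to_sec_sketch(t):
    """Convert an int (already seconds) or a '[days-]hh:mm:ss' spec to seconds.

    The accepted format here is an assumption (slurm-like); it only
    illustrates the kind of conversion expyre.units performs.
    """
    if isinstance(t, int):
        return t
    days = 0
    if '-' in t:
        d, t = t.split('-', 1)
        days = int(d)
    parts = [int(p) for p in t.split(':')]
    while len(parts) < 3:
        parts.insert(0, 0)  # pad missing leading fields: 'mm:ss' -> '0:mm:ss'
    hours, minutes, seconds = parts
    return ((days * 24 + hours) * 60 + minutes) * 60 + seconds

print(time_to_sec_sketch('1:30:00'))  # 5400
```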