expyre package#

Submodules#

expyre.config module#

Configuration for expyre, read from config.json. If the env var EXPYRE_ROOT is set, it is used directly. Otherwise (or if the env var is set to @), the path from HOME (or from /, if the current dir is not below HOME) down to the current directory is searched for .expyre or _expyre directories. Configuration files are parsed in the same order, with deeper directories overriding settings from shallower ones. local_stage_dir is set to the deepest directory found unless it is set explicitly in one of the discovered config.json files.
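
A hypothetical `.expyre/config.json` sketch may make the layout concrete. All keys below other than `systems` and `local_stage_dir` are inferred from the `expyre.system.System` constructor documented further down; the exact schema, partition property names, and values here are assumptions, not a verified example:

```json
{
  "local_stage_dir": "/home/user/expyre_stage",
  "systems": {
    "my_cluster": {
      "host": "user@cluster.example.com",
      "scheduler": "slurm",
      "remsh_cmd": "ssh",
      "rundir": "run_expyre",
      "partitions": {
        "standard": {"num_cores": 32, "max_time": "24h", "max_mem": "64gb"}
      }
    }
  }
}
```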

Global variables#

local_stage_dir: str

path of directory to stage jobs into

systems: dict

dict of expyre.system.System that jobs can run on

db: JobsDB

expyre.jobsdb.JobsDB database of jobs

expyre.config.init(root_dir, verbose=False)[source]#

Initializes root, systems, db

expyre.config.update_dict_leaves(d, d_loc)[source]#

expyre.func module#

Interface for remotely running python functions. It pickles the function and its arguments, stages files (pickles as well as additional requested input files) to a local directory, submits to remote System (which stages out inputs, submits on remote queuing system, and stages outputs back in), and then unpickles results to return function result. It also creates/updates the JobsDB entry for the job as it does these steps.

Possible status (see JobsDB): created, submitted, started, succeeded/failed, processed, cleaned

class expyre.func.ExPyRe(name, *, input_files=[], env_vars=[], pre_run_commands=[], post_run_commands=[], output_files=[], try_restart_from_prev=True, hash_ignore=[], function=None, args=[], kwargs={}, _from_db_info=None)[source]#

Bases: object

Create Queued Remote Function object, pickles function and inputs, stores files in local stage dir, and adds to job database.

Parameters
  • name (str) – name of job

  • input_files (list(str | Path), optional) – input files that the function will need. Relative paths without ‘..’ components will be copied to the same path relative to the remote rundir on the remote machine. Absolute paths will result in files copied into the top level of the remote rundir

  • env_vars (list(str), optional) – list of env vars to set in remote queuing script

  • pre_run_commands (list(str), optional) – list of commands, one per line, to run at start of remote queuing script, after commands in config.json that are used for system

  • post_run_commands (list(str), optional) – list of commands, one per line, to run at end of remote queuing script, after actual task

  • output_files (list(str), optional) – output files to be copied back after evaluation is done

  • try_restart_from_prev (bool, default True) – try to restart from previous call, based on hash of function, arguments, and input files

  • hash_ignore (list(int or str), optional) – args elements (int) or kwargs items (str) to ignore when making hash to determine if run is identical to some previous one

  • function (Callable) – function to call

  • args (list) – positional arguments to function

  • kwargs (dict) – keyword arguments to function

  • _from_db_info (dict, optional (intended for internal use)) – restart is from db, and dict contains special arguments: remote_id, system_name, status, stage_dir


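The typical lifecycle (create, start, get_results, mark_processed) can be sketched as below. The system name "my_cluster" and the resources dict are hypothetical, and the remote calls are guarded so the sketch only attempts them where expyre is installed and configured:

```python
import importlib.util
import os

def add(a, b):
    return a + b

# only attempt a remote run where expyre is installed and configured
if os.environ.get("EXPYRE_ROOT") and importlib.util.find_spec("expyre"):
    from expyre.func import ExPyRe

    # pickle the function and args, stage them locally, record in JobsDB
    xpr = ExPyRe("add_job", function=add, args=[1, 2])
    # submit to a system defined in config.json (name is hypothetical)
    xpr.start(resources={"max_time": "1h", "num_cores": 1},
              system_name="my_cluster", partial_node=True)
    # wait for the job, stage results back, and unpickle the return value
    result, stdout, stderr = xpr.get_results()
    xpr.mark_processed()
```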
cancel(verbose=False)[source]#
clean(wipe=False, dry_run=False, remote_only=False, verbose=False)[source]#

clean the local and remote stage directories

Parameters
  • wipe (bool, default False) – wipe the directory completely, as opposed to just writing CLEANED into files that could be large, such as the python function input and output (NOTE: other staged-in files or files that are created are not cleaned when wipe=False)

  • dry_run (bool, default False) – dry run only, print what will happen but do not actually delete or overwrite anything

  • remote_only (bool, default False) – wipe only remote dir (ignored when wipe is False)

  • verbose (bool, default False) – verbose output

static from_jobsdb(db_jobs)[source]#

Create a list of ExPyRe objects from JobsDB records

Parameters
db_jobs: dict or list(dict)

one or more jobs records returned from JobsDB.jobs()

Returns

created objects

Return type

list(ExPyRe)

get_results(timeout=3600, check_interval=30, sync=True, sync_all=True, force_sync=False, quiet=False, verbose=False)[source]#

Get results from a remote job

Parameters
  • timeout (int or str, default 3600) – Max time (in sec if int, time spec if str) to wait for job to complete, None or int <= 0 to wait forever

  • check_interval (int, default 30) – Time to wait (in sec) between checks of job completion

  • sync (bool, default True) – Synchronize remote files before checking for results. Note that if this is False and the job has finished on the remote system but output files have not previously been synchronized, this will wait one check_interval and then raise an error

  • sync_all (bool, default True) – Sync files from all jobs (not just this one), to reduce number of separate remote copy calls

  • force_sync (bool, default False) – Sync remote files even if job’s DB status indicates that this was already done. Note: together with sync_all this can lead to rsync having to compare many files.

  • quiet (bool, default False) – No progress info

  • verbose (bool, default False) – Verbose output (from remote system/scheduler commands)

Returns

  • value of function

  • string containing stdout during function

  • string containing stderr during function

Return type

return, stdout, stderr

property id#
mark_processed()[source]#

Mark job as processed (usually after results have been stored someplace)

start(resources, system_name=None, header_extra=[], exact_fit=True, partial_node=False, python_cmd='python3', force_rerun=False)[source]#

Start a job on a remote machine

Parameters
  • resources (dict or Resources) – resources to use for the job, either a Resources object or a dict of Resources constructor kwargs

  • system_name (str) – name of system in config.systems

  • header_extra (list(str), optional) – list of lines to add to queuing system header, appended to System.header[]

  • exact_fit (bool, default True) – use only nodes that exactly match number of tasks

  • partial_node (bool, default False) – allow jobs that take less than an entire node

  • python_cmd (str, default python3) – name of python interpreter to use on remote machine

  • force_rerun (bool, default False) – force a rerun even if self.status is not “created”

sync_remote_results_status(sync_all=True, force_sync=False, verbose=False)[source]#

Syncs files associated with results from the remote machine to the local stage dirs and updates ‘remote_status’ in the jobs DB. Note that both have to happen together because other functions assume that if the remote status has been updated, files have been staged back as well.

Parameters
  • sync_all (bool, default True) – sync files for all jobs on same system, to minimize number of calls to remote copy

  • force_sync (bool, default False) – sync files even if this job’s status indicates that this has already been done

  • verbose (bool, default False) – verbose output

exception expyre.func.ExPyReJobDiedError[source]#

Bases: Exception

Exception that is raised when ExPyRe remote job appears to have been killed for reasons other than the python process raising an exception, e.g. if it was out of time in the queuing system.

exception expyre.func.ExPyReTimeoutError[source]#

Bases: TimeoutError

Exception raised when ExPyRe gave up waiting for a job to finish because it exceeded the timeout value

expyre.jobsdb module#

class expyre.jobsdb.JobsDB(db_filename)[source]#

Bases: object

Database of jobs, currently implemented with sqlite. Saves essential information on jobs, including local and remote id, the local directory it’s staged to, the system it’s been submitted to, and status.

Status:

  • created - job has been created, but not yet submitted to run

  • submitted - job has been submitted to some queuing system

  • started - job has started running

  • succeeded - job has finished successfully, and results are available

  • failed - job has failed

  • died - job died without producing expected low level output

  • processed - job has been processed, and results no longer need to be saved

  • cleaned - job stage dir (local and remote) has been cleaned (large files overwritten or all files wiped)

add(id, name, from_dir, status='created', system=None, remote_id=None, remote_status=None)[source]#

Add a job to the DB

Parameters
  • id (str) – unique id for job (fails if id already exists)

  • name (str) – name for job

  • from_dir (str/Path) – path to directory job is to run from

  • status (str, default 'created') – status of job

  • system (str, optional) – system job is running on

  • remote_id (str, optional) – remote id on system

  • remote_status (str, optional) – remote status on system

jobs(status=None, id=None, name=None, system=None, readable=True)[source]#

Iterate through jobs

Parameters
  • status (str or list(str), default None) – if not None, only report on jobs that match any status

  • id (str or list(str), default None) – if present, include only jobs with id that match regexps in this list

  • name (str or list(str), default None) – if present, include only jobs with name that matches regexps in this list

  • system (str or list(str), default None) – if present, include only jobs with system in this list

Returns

Iterator of dicts

Return type

Iterator of dicts with fields for all DB columns for each job that matches selection criteria.

possible_status = ['created', 'submitted', 'started', 'succeeded', 'failed', 'died', 'processed', 'cleaned']#
remove(id)[source]#

Remove a job from the DB

Parameters

id (str) – unique id of job to remove

status_group = {'can_produce_results': ['created', 'submitted', 'started', 'succeeded', 'died'], 'ongoing': ['created', 'submitted', 'started']}#
unlock()[source]#

unlocks the sqlite database

update(id, /, **kwargs)[source]#

Update some field of job

Parameters
  • id (str) – unique id of job to update

  • from_dir (str) – field(s) to update

  • status (str) – field(s) to update

  • system (str) – field(s) to update

  • remote_id (str) – field(s) to update

  • remote_status (str) – field(s) to update
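
Putting the JobsDB methods together, a minimal round trip might look like the following sketch. The id, path, and system name are hypothetical, and the example is guarded so it only runs where expyre is installed:

```python
import importlib.util
import os
import tempfile

if importlib.util.find_spec("expyre"):
    from expyre.jobsdb import JobsDB

    # create a fresh database in a temporary directory
    db = JobsDB(os.path.join(tempfile.mkdtemp(), "jobs.db"))
    # status defaults to 'created'
    db.add("job_0001", "test_job", "/tmp/stage/job_0001")
    db.update("job_0001", status="submitted", system="my_cluster")
    # jobs() yields one dict of DB columns per matching job
    jobs = list(db.jobs(status="submitted"))
```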

expyre.resources module#

class expyre.resources.Resources(max_time, num_nodes=None, num_cores=None, max_mem_tot=None, max_mem_per_core=None, partitions=None, queues=None)[source]#

Bases: object

Resources required for a task, including time, memory, cores/nodes, and particular partitions/queues. Mainly consists of code that selects appropriate partition/queue from the list associated with each System.

Parameters
  • max_time (int, str) – max time for job in sec (int) or time spec (str)

  • num_nodes (int) – number of nodes to use, mutually exclusive with num_cores, one is required

  • num_cores (int) – number of cores to use, mutually exclusive with num_nodes, one is required

  • max_mem_tot (int/str, default None) – total max mem in kB (int) or memory spec (str), mutually exclusive with max_mem_per_core

  • max_mem_per_core (int/str, default None) – per-core max mem in kB (int) or memory spec (str), mutually exclusive with max_mem_tot

  • partitions/queues (list(str), default None) – regexps for types of node that can be used
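
Since ExPyRe.start() also accepts a plain dict of these constructor kwargs, a resources spec can be written directly as a dict (all values here are hypothetical):

```python
# dict form of a Resources spec, as accepted by ExPyRe.start()
resources = {
    "max_time": "12h",            # int seconds or time spec string
    "num_cores": 32,              # mutually exclusive with num_nodes
    "max_mem_per_core": "2gb",    # mutually exclusive with max_mem_tot
    "partitions": ["compute.*"],  # regexps for usable partitions
}
```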

find_nodes(partitions, exact_fit=True, partial_node=False)[source]#

find a node type that accommodates requested resources

Parameters
  • partitions (dict) – properties of available partitions (only used internally by system.py, so the “queues” synonym is not implemented here).

  • exact_fit (bool, default True) – only return nodes that exactly satisfy the number of cores

  • partial_node (bool, default False) – allow jobs that take less than one entire node, overrides exact_fit

Returns

  • partition (str) – name of partition selected

  • node_dict (dict) – various quantities of node

    • num_nodes: int, total number of nodes needed

    • num_cores: int, total number of cores needed

    • num_cores_per_node: int, number of cores per node for selected nodes

expyre.subprocess module#

Functions for running subprocesses, optionally on remote machines, handling quoting/shell escaping, encoding/decoding, and the environment, and making sure that optional arguments needed for things like kerberized ssh are set correctly.

exception expyre.subprocess.FailedSubprocessWarning[source]#

Bases: RuntimeWarning

expyre.subprocess.subprocess_copy(from_files, to_file, from_host='_LOCAL_', to_host='_LOCAL_', rcp_args='-a', rcp_cmd='rsync', remsh_cmd=None, retry=None, remsh_flags='-e', delete=False, verbose=False, dry_run=False)[source]#

Run a remote copy command (e.g. rsync) in a subprocess, optionally to/from a remote machine. Exactly one host has to be specified, and relative paths on that host are relative to its home dir, as with rsync. If the specified host is None the copy is local, but relative paths are still relative to the home dir.

Raises RuntimeError for non-zero return status.

Parameters
  • from_files (str, Path, list(str), list(Path)) – one or more files/directories to copy from (not including ‘user@host:’ part)

  • to_file (str, Path) – _one_ file/directory to copy to (not including ‘user@host:’ part)

  • from_host (str, optional) – [username@]host.domain to copy from (no “:”), mutually exclusive with to_host, one is required. If None, use a local dir, but make relative paths relative to $HOME instead of to $PWD (like rsync with a host)

  • to_host (str, optional) – [username@]host.domain to copy to (no “:”), mutually exclusive with from_host, one is required. If None, use a local dir, but make relative paths relative to $HOME instead of to $PWD (like rsync with a host)

  • rcp_args (str, default '-a') – non-filename arguments to remote copy command

  • rcp_cmd (str, default 'rsync') – command to do copy

  • remsh_cmd (str, default EXPYRE_RSH env var or 'ssh') – remote shell command for rcp_cmd to use

  • retry (optional) – passed as retry argument to subprocess_run

  • remsh_flags (str, default '-e') – flag to prefix to remsh_cmd when calling rcp_cmd

  • delete (bool, default False) – delete target files that aren’t present in the source, using the --delete option

  • verbose (bool, default False) – verbose output

  • dry_run (bool, default False) – dry run, don’t actually copy
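
The command line that subprocess_copy assembles can be approximated as follows. This is a hypothetical sketch of the internals; the real implementation also handles retries, verbosity, and the delete/dry-run flags:

```python
def copy_cmd(from_files, to_file, from_host=None, to_host=None,
             rcp_cmd="rsync", rcp_args="-a", remsh_cmd="ssh", remsh_flags="-e"):
    """Build an rsync-style command list (sketch of subprocess_copy's internals)."""
    # e.g. ['rsync', '-a', '-e', 'ssh']
    cmd = [rcp_cmd] + rcp_args.split() + [remsh_flags, remsh_cmd]
    # exactly one side gets a 'user@host:' prefix
    src_prefix = f"{from_host}:" if from_host else ""
    dst_prefix = f"{to_host}:" if to_host else ""
    cmd += [src_prefix + str(f) for f in from_files]
    cmd.append(dst_prefix + str(to_file))
    return cmd
```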

expyre.subprocess.subprocess_run(host, args, script=None, shell='bash -c', remsh_cmd=None, retry=None, in_dir='_HOME_', dry_run=False, verbose=False)[source]#

Run a subprocess, optionally via ssh on a remote machine. Raises RuntimeError for non-zero return status.

Parameters
  • host (str) – [username]@machine.domain, or None for a locally run process

  • args (list(str)) – arguments to run, starting with command and followed by its command line args

  • script (str, default None) – text to write to process’s standard input

  • shell (str, default 'bash -c') – shell to use, including any flags necessary for it to interpret the next argument as the commands to run (-c for bash)

  • remsh_cmd (str | list(str), default env var EXPYRE_RSH or 'ssh') – command used to start a shell on the remote host, usually ssh

  • retry ((int, int), default env var EXPYRE_RETRY.split() or (2, 5)) – number of times to retry and number of seconds to wait between tries

  • in_dir (str, default _HOME_) – directory to cd into before running args, _HOME_ for home dir, _PWD_ for python current working directory (only for host == None)

  • verbose (bool, default False) – verbose output

Returns

stdout, stderr

Return type

output and error of subprocess, as strings (bytes.decode())
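
A minimal sketch of the local/remote dispatch that subprocess_run performs is below. This is hypothetical; retries, in_dir handling, and script input via stdin are omitted:

```python
import shlex
import subprocess

def run_local_or_ssh(host, args, remsh_cmd="ssh"):
    """Run args locally, or via ssh when host is given; raise on failure."""
    if host is None:
        cmd = list(args)
    else:
        # quote each arg so the remote shell sees them as one command line
        cmd = [remsh_cmd, host, " ".join(shlex.quote(a) for a in args)]
    res = subprocess.run(cmd, capture_output=True, text=True)
    if res.returncode != 0:
        raise RuntimeError(res.stderr)
    return res.stdout, res.stderr
```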

expyre.system module#

class expyre.system.System(host, partitions, scheduler, header=[], no_default_header=False, script_exec='/bin/bash', pre_submit_cmds=[], commands=[], rundir=None, rundir_extra=None, remsh_cmd=None)[source]#

Bases: object

Interface for a System that can run jobs remotely, including staging files from a local directory to a (config-specified) remote directory, submitting it with the correct kind of Scheduler, and staging back the results from the remote directory. Does not report any other status directly.

Parameters
  • host (str) – [username@]machine.fqdn

  • partitions (dict) – dictionary describing partitions types

  • scheduler (str, Scheduler) – type of scheduler

  • header (list(str), optional) – list of batch system header to use in every job, typically for system-specific things like selecting nodes

  • no_default_header (bool, default False) – do not automatically add default header fields, namely job name, partition/queue, max runtime, and stdout/stderr files

  • script_exec (str, default '/bin/bash') – executable for 1st line of script

  • pre_submit_cmds (list(str), optional) – commands to run in the submitting process before the actual job submission

  • commands (list(str), optional) – list of commands to run at start of every job on machine

  • rundir (str / None, default 'run_expyre' if host is not None, else None) – path on the remote machine to run in. If absolute, it is used as is; if relative, it is relative to the (remote) home directory. If host is None, rundir=None means run directly in the stage directory

  • rundir_extra (str, default None) – extra string to add to remote_rundir, e.g. per-project part of path

  • remsh_cmd (str, default EXPYRE_RSH or 'ssh') – remote shell command to use with this system

clean_rundir(stage_dir, filenames, dry_run=False, verbose=False)[source]#

clean a remote stage directory

Parameters
  • stage_dir (str | Path) – local stage directory path

  • filenames (list(str) or None) – list of files to be replaced with ‘CLEANED’, or wipe the entire directory if None

  • verbose (bool, default False) – verbose output

get_remotes(local_dir, subdir_glob=None, delete=False, verbose=False)[source]#

get data from directories of remotely running jobs

Parameters
  • local_dir (str) – local directory to stage to

  • subdir_glob (str, list(str), default None) – only get subdirectories that match one or more globs

  • delete (bool, default False) – delete local files that aren’t in remote dir

  • verbose (bool, default False) – verbose output

initialize_remote_rundir(verbose=False)[source]#
run(args, script=None, shell='bash -c', retry=None, in_dir='_HOME_', dry_run=False, verbose=False)[source]#
submit(id, stage_dir, resources, commands, header_extra=[], exact_fit=True, partial_node=False, verbose=False)[source]#

Submit a job on a remote machine, including staging out files

Parameters
  • id (str) – unique id for job

  • stage_dir (str, Path) – directory in which files have been prepared

  • resources (Resources) – resources to use for job

  • commands (list(str)) – commands to run in job script after per-machine commands

  • header_extra (list(str), optional) – list of lines to append to system header for this job

  • exact_fit (bool, default True) – only match partitions that have nodes with exact match to number of cores

  • partial_node (bool, default False) – allow jobs that take less than an entire node

Return type

id of job on remote machine

expyre.units module#

Units-related utilities for converting between various time and memory formats

expyre.units.mem_to_kB(mem)[source]#

convert memory to kB

Parameters

mem (str | int) –

  • int: mem in kB

  • str: format “float[kKmMgGtT]b?”: memory in kB, MB, GB, or TB; the float cannot have an exponent

Return type

int memory in kB
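
The accepted formats can be illustrated with a rough re-implementation. This is an illustrative sketch only, assuming binary multipliers of 1024; the real function may differ:

```python
import re

def mem_to_kb(mem):
    """Convert an int (kB) or a "float[kKmMgGtT]b?" spec to kB (sketch)."""
    if isinstance(mem, int):
        return mem
    m = re.fullmatch(r"(\d*\.?\d+)([kKmMgGtT])[bB]?", mem)
    # assumed binary multipliers: 1 MB = 1024 kB, etc.
    mult = {"k": 1, "m": 1024, "g": 1024**2, "t": 1024**3}[m.group(2).lower()]
    return int(float(m.group(1)) * mult)
```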

expyre.units.time_to_HMS(t)[source]#
expyre.units.time_to_sec(t)[source]#

convert time to seconds

Parameters

t (str, int) –

  • int: time in sec

  • str: format “float[sSmMhHdD]” number of seconds, minutes, hours, days, or “days(int)-HMS” or “HMS”, with HMS being (HHHH:MM:SS | MM:SS | SS)

Return type

int time in seconds
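
A rough re-implementation of the parsing rules above may help; this is an illustrative sketch, not the library's actual code:

```python
import re

def time_to_sec(t):
    """Convert an int (sec), "float[sSmMhHdD]", or "[days-]HMS" spec to seconds (sketch)."""
    if isinstance(t, int):
        return t
    m = re.fullmatch(r"(\d*\.?\d+)([sSmMhHdD])", t)
    if m:
        mult = {"s": 1, "m": 60, "h": 3600, "d": 86400}[m.group(2).lower()]
        return int(float(m.group(1)) * mult)
    # "days-HMS" or plain HMS, with HMS being HHHH:MM:SS | MM:SS | SS
    days = 0
    if "-" in t:
        days_str, t = t.split("-")
        days = int(days_str)
    sec = 0
    for part in t.split(":"):
        sec = sec * 60 + int(part)
    return days * 86400 + sec
```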

expyre.util module#

expyre.util.remsh_cmd(cmd)[source]#

Get remote command

Parameters

cmd (str) –

Returns

cmd – the command set in the EXPYRE_RSH environment variable, or "ssh" otherwise.

Return type

str
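
The fallback behaviour can be sketched as below; this is a hypothetical re-implementation of the documented return value, and the precedence between the cmd argument and the env var is an assumption:

```python
import os

def remsh_cmd(cmd=None):
    """Sketch: EXPYRE_RSH env var if set, else cmd, else "ssh" (assumed order)."""
    return os.environ.get("EXPYRE_RSH") or cmd or "ssh"
```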

Module contents#