Source code for expyre.system

import sys
import os
from pathlib import Path
import time

from .subprocess import subprocess_run, subprocess_copy
from .schedulers import schedulers
from . import util


[docs]class System: """Interface for a System that can run jobs remotely, including staging files from a local directory to a (config-specified) remote directory, submitting it with the correct kind of Scheduler, and staging back the results from the remote directory. Does not report any other status directly. Parameters ---------- host: str [username@]machine.fqdn partitions: dict dictionary describing partitions types scheduler: str, Scheduler type of scheduler header: list(str), optional list of batch system header to use in every job, typically for system-specific things like selecting nodes no_default_header: bool, default False do not automatically add default header fields, namely job name, partition/queue, max runtime, and stdout/stderr files script_exec: str, default '/bin/bash' executable for 1st line of script pre_submit_cmds: list(str), optional command to run in process that does submission before actual job submission commands: list(str), optional list of commands to run at start of every job on machine rundir: str / None, default 'run_expyre' if host is not None, else None path on remote machine to run in. If absolute, used as is, and if relative, relative to (remote) home directory. If host is None, rundir is None means run directly in stage directory rundir_extra: str, default None extra string to add to remote_rundir, e.g. per-project part of path remsh_cmd: str, default EXPYRE_RSH or 'ssh' remote shell command to use with this system """ def __init__(self, host, partitions, scheduler, header=[], no_default_header=False, script_exec='/bin/bash', pre_submit_cmds=[], commands=[], rundir=None, rundir_extra=None, remsh_cmd=None): self.host = host self.remote_rundir = rundir if self.remote_rundir is None and self.host is not None: # set default for remote runs self.remote_rundir = 'run_expyre' if self.host is None and self.remote_rundir is not None and not Path(self.remote_rundir).is_absolute(): # make local runs with non-absolute rundir relative to $HOME, mimicking behavior # of rsync with remote directory specifications self.remote_rundir = str(Path.home() / self.remote_rundir) if self.remote_rundir is not None: while self.remote_rundir.endswith('/'): self.remote_rundir = self.remote_rundir[:-1] if rundir_extra is not None and self.remote_rundir is not None: self.remote_rundir += '/' + rundir_extra self.partitions = partitions.copy() if partitions is not None else partitions self.queuing_sys_header = header.copy() self.no_default_header = no_default_header self.script_exec = script_exec self.pre_submit_cmds = pre_submit_cmds self.commands = commands.copy() self.remsh_cmd = util.remsh_cmd(remsh_cmd) self.initialized = False if isinstance(scheduler, str): self.scheduler = schedulers[scheduler](host, self.remsh_cmd) else: self.scheduler = scheduler(host)
[docs] def run(self, args, script=None, shell='bash -c', retry=None, in_dir='_HOME_', dry_run=False, verbose=False): # like subprocess_run, but filling in host and remsh command from self return subprocess_run(self.host, args, script=script, shell=shell, remsh_cmd=self.remsh_cmd, retry=retry, in_dir=in_dir, dry_run=dry_run, verbose=verbose)
[docs] def initialize_remote_rundir(self, verbose=False): if self.initialized or self.remote_rundir is None: return self.run(['mkdir', '-p', str(self.remote_rundir)], verbose=verbose) self.initialized = True
def _job_remote_rundir(self, stage_dir): return f'{self.remote_rundir}/{stage_dir.name}'
[docs] def submit(self, id, stage_dir, resources, commands, header_extra=[], exact_fit=True, partial_node=False, verbose=False): """Submit a job on a remote machine, including staging out files Parameters ---------- id: str unique id for job stage_dir: str, Path directory in which files have been prepared resources: Resources resources to use for job commands: list(str) commands to run in job script after per-machine commands header_extra: list(str), optional list of lines to append to system header for this job exact_fit: bool, default True only match partitions that have nodes with exact match to number of cores partial_node: bool, default False allow jobs that take less than an entire node Returns ------- id of job on remote machine """ if 'EXPYRE_TIMING_VERBOSE' in os.environ: sys.stderr.write(f'system {self.id} submit start {time.time()}\n') self.initialize_remote_rundir() partition, node_dict = resources.find_nodes(self.partitions, exact_fit=exact_fit, partial_node=partial_node) # add partition-specific header after per-system header but before header specific # to this submission header_extra = self.partitions[partition].get("header", []) + header_extra # override default partition name from dict key (but after using dict key to look up # other things like header above) actual_partition = self.partitions[partition].get("partition", partition) commands = self.commands + commands stage_dir = Path(stage_dir) if self.remote_rundir is None: # no host, so run in stage dir to avoid needless copying job_remote_rundir = str(stage_dir) else: job_remote_rundir = self._job_remote_rundir(stage_dir) # make remote rundir, but fail if job-specific remote dir already exists self.run(['bash'], script=f'if [ ! -d "{self.remote_rundir}" ]; then ' f' echo "remote rundir \'{self.remote_rundir}\' does not exist" 1>&2; ' ' exit 1; ' f'elif [ -e "{job_remote_rundir}" ]; then ' f' echo "remote job rundir \'{job_remote_rundir}\' already exists" 1>&2; ' ' exit 2; ' 'else ' f' mkdir -p "{job_remote_rundir}"; ' 'fi', verbose=verbose) # stage out files # strip out final / from source path so that rsync creates stage_dir.name remotely under self.remote_rundir stage_dir_src = str(stage_dir) while stage_dir_src.endswith('/'): stage_dir_src = stage_dir_src[:-1] if 'EXPYRE_TIMING_VERBOSE' in os.environ: sys.stderr.write(f'system {self.id} submit start stage in {time.time()}\n') subprocess_copy(stage_dir_src, self.remote_rundir, to_host=self.host, remsh_cmd=self.remsh_cmd, verbose=verbose) # submit job if 'EXPYRE_TIMING_VERBOSE' in os.environ: sys.stderr.write(f'system {self.id} submit start scheduler submit {time.time()}\n') try: r = self.scheduler.submit(id, str(job_remote_rundir), actual_partition, commands, resources.max_time, self.queuing_sys_header + header_extra, node_dict, no_default_header=self.no_default_header, script_exec=self.script_exec, pre_submit_cmds=self.pre_submit_cmds, verbose=verbose) except Exception: if self.remote_rundir is not None: sys.stderr.write(f'System.submit call to Scheduler.submit failed for job id {id}, ' f'cleaning up remote dir {str(self.remote_rundir)}\n') self.run(['rm', '-r', str(job_remote_rundir)], verbose=verbose) raise if 'EXPYRE_TIMING_VERBOSE' in os.environ: sys.stderr.write(f'system {self.id} submit end {time.time()}\n') return r
[docs] def get_remotes(self, local_dir, subdir_glob=None, delete=False, verbose=False): """get data from directories of remotely running jobs Parameters ---------- local_dir: str local directory to stage to subdir_glob: str, list(str), default None only get subdirectories that much one or more globs delete: bool, default False delete local files that aren't in remote dir verbose: bool, default False verbose output """ if self.remote_rundir is None: # nothing to "get" since this ran in stage dir return if subdir_glob is None: subdir_glob = '/*' elif isinstance(subdir_glob, str): subdir_glob = '/' + subdir_glob elif len(subdir_glob) == 1: subdir_glob = '/' + subdir_glob[0] else: subdir_glob = '/{' + ','.join(subdir_glob) + '}' subprocess_copy(self.remote_rundir + subdir_glob, local_dir, from_host=self.host, remsh_cmd=self.remsh_cmd, delete=delete, verbose=verbose)
[docs] def clean_rundir(self, stage_dir, filenames, dry_run=False, verbose=False): """clean a remote stage directory Parameters ---------- stage_dir: str | Path local stage directory path files: list(str) or None list of files to replaced with 'CLEANED', or wipe entire directory if None verbose: bool, default False verbose output """ if self.remote_rundir is None: # the job remote rundir _is_ the stage dir, so do not delete here return job_remote_rundir = self._job_remote_rundir(Path(stage_dir)) if filenames is not None: filenames = ['"' + filename + '"' for filename in filenames] self.run(['bash'], script=(f'for f in {" ".join(filenames)}; do\n' f' ff={job_remote_rundir}/$f\n' f' if [ -f $ff ]; then\n' f' echo "CLEANED" > $ff\n' f' fi\n' f'done\n'), dry_run=dry_run, verbose=verbose) else: self.run(['bash'], script="find " + str(job_remote_rundir) + " -type d -exec chmod u+rwx {} \\; ; rm -rf " + str(job_remote_rundir), dry_run=dry_run, verbose=verbose)
def __str__(self): s = f'System: host {self.host} rundir {self.remote_rundir} scheduler {type(self.scheduler).__name__}\n' if self.partitions is not None: s += ' ' + ' '.join(self.partitions.keys()) return s