"""Functions dealing with running subprocesses, optionally on remote machines, handling
quoting/shell escaping, encoding/decoding, environment, and making sure that optional
arguments needed for things like kerberized-ssh are set correctly. """
import sys
import os
import subprocess
import re
import warnings
import time
import shlex
from pathlib import Path
class FailedSubprocessWarning(RuntimeWarning):
    """Warning category for messages about subprocess attempts that failed or had to be retried."""


# 'always' action: report every matching warning, not only the first one per location
warnings.filterwarnings('always', category=FailedSubprocessWarning)
def _my_warn(msg):
    """Issue ``msg`` as a FailedSubprocessWarning, echoing it to stderr first when pytest is loaded.

    Parameters
    ----------
    msg: str
        text of the warning
    """
    running_under_pytest = 'pytest' in sys.modules
    if running_under_pytest:
        # pytest collects warnings and shows them separately from the rest of
        # the output, which makes them harder to use for debugging, so echo
        # the message on stderr as well
        err_stream = sys.stderr
        err_stream.write(msg + '\n')
        err_stream.flush()

    warnings.warn(msg, category=FailedSubprocessWarning)
def _optionally_remote_args(args, shell, host, remsh_cmd, in_dir='_HOME_'):
    """Convert args array into a shell call, optionally preceded by an ssh command

    The arguments are joined into a single command string (optionally prefixed by a
    ``cd``), which is handed to ``shell`` as its last argument.  For a remote host the
    whole thing is prefixed by the remote shell command and the host, and the command
    string is single-quoted so the remote login shell passes it to ``shell`` as one word.

    Parameters
    ----------
    args: list(str)
        arguments to run
    shell: str
        shell to use, including flags (split on whitespace)
    host: str
        [username@]host to pass to ssh, None to run locally but act like ssh, i.e. start in $HOME,
    remsh_cmd: str, list(str)
        remote shell command (usually ssh), only used if host is not None
    in_dir: str, default _HOME_
        directory to cd into before running args, _HOME_ for home dir, _PWD_ for python current
        working directory (only for host == None).  Any other value is inserted after ``cd``
        verbatim, without escaping.

    Returns
    -------
    args list(str), suitable for passing to subprocess.Popen
    """
    if isinstance(remsh_cmd, str):
        remsh_cmd = remsh_cmd.split()

    if in_dir == '_PWD_':
        assert host is None, "in_dir='_PWD_' is only supported for local runs (host=None)"

    # NOTE: quoting below may be bash specific

    # Build the single command string for the shell: an optional "cd" (so that, with or
    # without ssh, operations start relative to the requested dir) followed by the args,
    # with ', ", space, ( and ) backslash escaped
    cmd_parts = []
    if in_dir == '_HOME_':
        cmd_parts.append('cd $HOME')
    elif in_dir != '_PWD_':
        cmd_parts.append('cd ' + in_dir)
    if len(args) > 0:
        cmd_parts.append(' '.join(re.sub(r'([\'" \(\)])', r'\\\1', arg) for arg in args))
    cmd_str = ' && '.join(cmd_parts)

    shell_args = shell.split()
    if host is None:
        return shell_args + [cmd_str]

    # The remote login shell re-parses the command line, so the command string must be
    # single quoted to arrive at `shell` as one word.  A single quote cannot be escaped
    # inside single quotes, so each embedded ' closes the quote, is emitted inside
    # double quotes, and reopens the quote.  Without embedded quotes this is just
    # "'" + cmd_str + "'".
    quoted_cmd = "'" + cmd_str.replace("'", "'\"'\"'") + "'"
    return list(remsh_cmd) + [host] + shell_args + [quoted_cmd]
def subprocess_run(host, args, script=None, shell='bash -c', remsh_cmd=None, retry=None, in_dir='_HOME_', dry_run=False, verbose=False):
    """run a subprocess, optionally via ssh on a remote machine.  Raises RuntimeError for non-zero
    return status.

    Parameters
    ----------
    host: str
        [username]@machine.domain, or None for a locally run process
    args: list(str)
        arguments to run, starting with command and followed by its command line args
    script: str, default None
        text to write to process's standard input
    shell: str, default 'bash -c'
        shell to use, including any flags necessary for it to interpret the next argument
        as the commands to run (-c for bash)
    remsh_cmd: str | list(str), default env var EXPYRE_RSH or 'ssh'
        command to start on remote host, usually ssh
    retry: (int, int), default env var EXPYRE_RETRY.split() or (3, 5)
        number of times to try and number of seconds to wait between each trial.  The number
        of tries is raised to at least 1 and the wait to at least 0.
    in_dir: str, default _HOME_
        directory to cd into before running args, _HOME_ for home dir, _PWD_ for python current working directory (only for host == None)
    dry_run: bool, default False
        do not run anything, just return the command and the encoded script
    verbose: bool, default False
        verbose output

    Returns
    -------
    stdout, stderr: output and error of subprocess, as strings (bytes.decode()).
    If dry_run is set, instead returns (args, script): the full argument list that would be
    passed to subprocess.Popen and the script encoded to bytes (or None).

    Raises
    ------
    RuntimeError
        if the command still returns a non-zero status on the last try.  Any other exception
        raised while starting or communicating with the process on the last try is re-raised as is.
    """
    if remsh_cmd is None:
        remsh_cmd = os.environ.get('EXPYRE_RSH', 'ssh')
    if retry is None:
        if 'EXPYRE_RETRY' in os.environ:
            retry = tuple(int(_ii) for _ii in os.environ['EXPYRE_RETRY'].strip().split())
        else:
            retry = (3, 5)
    # always run at least once, and wait a valid (>= 0) amount of time
    n_tries, wait_time = max(retry[0], 1), max(retry[1], 0)

    args = _optionally_remote_args(args, shell, host, remsh_cmd, in_dir)
    if verbose:
        if dry_run:
            print('DRY-RUN COMMAND:')
        else:
            print('RUNNING COMMAND:')
        print(' '.join([shlex.quote(arg) for arg in args]))
        if script is not None:
            print('SCRIPT:')
            print(script.rstrip())

    if script is not None:
        script = script.encode()

    if dry_run:
        return args, script

    cmd_text = ' '.join(args)
    for i_try in range(n_tries):
        # Reset on every attempt: if Popen/communicate itself raises, the failure report
        # below must neither hit an unbound name nor show output of an earlier attempt
        stdout, stderr = b'', b''
        try:
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE,
                                 close_fds=False, env=os.environ)
            stdout, stderr = p.communicate(script)
            if p.returncode != 0:
                # errors='replace' so undecodable output cannot hide the real failure
                raise RuntimeError(f'Failed to run command "{cmd_text}" with err {stderr.decode(errors="replace")}')
        except Exception:
            stderr_text = stderr.decode(errors='replace')
            if i_try == n_tries - 1:
                # failed last chance
                _my_warn(f'Failed to run "{cmd_text}" on attempt {i_try} for the last time, giving up.\nSTDERR\n{stderr_text}')
                raise
            _my_warn(f'Failed to run "{cmd_text}" on attempt {i_try}, trying again.\nSTDERR\n{stderr_text}')
            time.sleep(wait_time)
        else:
            # success
            if i_try > 0:
                _my_warn(f'Succeeded to run "{cmd_text}" on attempt {i_try} after failure(s)')
            break

    if verbose:
        print('GOT STDOUT:')
        print(stdout.decode())
        print('GOT STDERR:')
        print(stderr.decode())

    return stdout.decode(), stderr.decode()
def subprocess_copy(from_files, to_file, from_host='_LOCAL_', to_host='_LOCAL_',
                    rcp_args='-a', rcp_cmd='rsync', remsh_cmd=None, retry=None, remsh_flags='-e',
                    delete=False, verbose=False, dry_run=False):
    """Run a remote copy (e.g. rsync) in a subprocess, optionally to/from remote machine.  At most one
    of from_host and to_host may be specified (i.e. differ from '_LOCAL_'), and relative paths on that
    machine are relative to its home dir, like rsync.  If the specified machine is None the copy is local,
    but relative paths are still relative to home dir.

    Raises RuntimeError for non-zero return status.

    Parameters
    ----------
    from_files: str, Path, list(str), list(Path)
        one or more files/directories to copy from (not including 'user@host:' part)
    to_file: str, Path
        _one_ file/directory to copy to (not including 'user@host:' part)
    from_host: str, optional
        [username@]host.domain to copy from (no ":"), mutually exclusive with to_host.
        If None, use a local dir, but make relative paths relative to $HOME instead of to $PWD (like rsync with a host)
    to_host: str, optional
        [username@]host.domain to copy to (no ":"), mutually exclusive with from_host.
        If None, use a local dir, but make relative paths relative to $HOME instead of to $PWD (like rsync with a host)
    rcp_args: str, default '-a'
        non-filename arguments to remote copy command (split on whitespace)
    rcp_cmd: str, default 'rsync'
        command to do copy
    remsh_cmd: str, default EXPYRE_RSH env var or ssh
        remote shell command for rcp_cmd to use; may include options (e.g. 'ssh -o Opt=val'),
        it is passed to rcp_cmd as a single argument following remsh_flags
    retry: optional
        passed as retry argument to subprocess_run
    remsh_flags: str, default '-e'
        flag to prefix to remsh_cmd when calling rcp_cmd
    delete: bool, default False
        delete target files that aren't in source with --delete option
    verbose: bool, default False
        verbose output
    dry_run: bool, default False
        dry run, don't actually copy

    Returns
    -------
    The return value of subprocess_run if dry_run is set, otherwise None.
    """
    # at most one of from_host, to_host may be something other than the '_LOCAL_' placeholder
    if from_host != '_LOCAL_' and to_host != '_LOCAL_':
        raise RuntimeError(f'Cannot have remote machine for both of source host "{from_host}" and "{to_host}"')

    if remsh_cmd is None:
        remsh_cmd = os.environ.get('EXPYRE_RSH', 'ssh')

    # Non-filename part of the copy command.  The remote shell command must reach
    # rcp_cmd as ONE argument after remsh_flags, even when it contains options, so it
    # is kept as a single list entry (whitespace normalized to single spaces) instead
    # of being whitespace-split along with the other flags.
    copy_cmd = [rcp_cmd] + remsh_flags.split()
    remsh_words = remsh_cmd.split()
    if remsh_words:
        copy_cmd.append(' '.join(remsh_words))
    copy_cmd += rcp_args.split()
    if delete:
        copy_cmd.append('--delete')

    # make from_files plain str or Path into list
    if isinstance(from_files, (str, Path)):
        from_files = [from_files]

    if from_host is None:
        # local dir, but relative is relative to home dir rather than current dir, like rsync
        abs_from_files = [str(f if Path(f).is_absolute() else Path.home() / f) for f in from_files]
    else:
        abs_from_files = from_files

    if to_host is None and not Path(to_file).is_absolute():
        # local dir, but relative is relative to home dir rather than current dir, like rsync
        abs_to_file = Path.home() / to_file
    else:
        abs_to_file = to_file

    # turn hosts into prefixes: '' for local (None or '_LOCAL_'), otherwise 'host:'
    from_prefix = '' if from_host is None or from_host == '_LOCAL_' else from_host
    to_prefix = '' if to_host is None or to_host == '_LOCAL_' else to_host
    if len(from_prefix) > 0:
        from_prefix += ':'
    if len(to_prefix) > 0:
        to_prefix += ':'

    # prepend host parts to from_files and to_file
    abs_from_files = [from_prefix + str(f) for f in abs_from_files]
    abs_to_file = to_prefix + str(abs_to_file)

    # do copy (or dry run); the copy command itself always runs locally, in the current dir
    retval = subprocess_run(None, copy_cmd + abs_from_files + [abs_to_file], retry=retry,
                            in_dir='_PWD_', dry_run=dry_run, verbose=verbose)
    if dry_run:
        return retval