import os
import json
import re
from ..subprocess import subprocess_run
from ..units import time_to_HMS
from .. import util
from .base import Scheduler
class Slurm(Scheduler):
"""Create Slurm object
Parameters
----------
host: str
username and host for ssh/rsync, e.g. username@machine.fqdn
remsh_cmd: str, default EXPYRE_RSH env var or 'ssh'
remote shell command to use
"""
def __init__(self, host, remsh_cmd=None):
self.host = host
self.hold_command = ['scontrol', 'hold']
self.release_command = ['scontrol', 'release']
self.cancel_command = ['scancel']
self.remsh_cmd = util.remsh_cmd(remsh_cmd)
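# Illustrative usage (not part of this module): constructing the scheduler for a
# hypothetical remote machine; the host string and remsh_cmd value below are assumptions.
#
#     sched = Slurm('jdoe@cluster.example.com')    # remote shell from EXPYRE_RSH, or 'ssh'
#     sched = Slurm('jdoe@cluster.example.com', remsh_cmd='ssh -o ConnectTimeout=10')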
def submit(self, id, remote_dir, partition, commands, max_time, header, node_dict, no_default_header=False,
script_exec="/bin/bash", pre_submit_cmds=[], verbose=False):
"""Submit a job on a remote machine
Parameters
----------
id: str
unique job id (local)
remote_dir: str
remote directory where files have already been prepared and job will run
partition: str
partition (or queue or node type)
commands: list(str)
list of commands to run in script
max_time: int
time in seconds to run
header: list(str)
list of header directives, not including walltime specific directive
node_dict: dict
properties related to node selection.
Fields: num_nodes, num_cores, num_cores_per_node, ppn, id, max_time, partition (and its synonym queue)
no_default_header: bool, default False
do not add the normal header fields; use only what is passed in via "header"
script_exec: str, default '/bin/bash'
executable for first line of job script
pre_submit_cmds: list(str), default []
commands to run in the remote process that does the submission, before the actual submission command,
e.g. to fix the environment
Returns
-------
str remote job id
"""
node_dict = node_dict.copy()
node_dict['id'] = id
node_dict['max_time'] = time_to_HMS(max_time)
node_dict['partition'] = partition
node_dict['queue'] = partition
header = header.copy()
if not no_default_header:
header.append('#SBATCH --job-name={id}')
header.append('#SBATCH --time={max_time}')
header.append('#SBATCH --output=job.{id}.stdout')
header.append('#SBATCH --error=job.{id}.stderr')
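# EXPYRE_HEADER_EXTRA, if set, must be a JSON list of extra directive lines, e.g.
# EXPYRE_HEADER_EXTRA='["#SBATCH --account=myproj"]' (the account name is hypothetical)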
header.extend(json.loads(os.environ.get("EXPYRE_HEADER_EXTRA", "[]")))
# set EXPYRE_NUM_CORES_PER_NODE using scheduler-specific info, to support jobs
# that do not know exact node type at submit time. All related quantities
# in node_dict are set based on this one by superclass Scheduler static method
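# SLURM_TASKS_PER_NODE typically looks like "16", "16(x4)", or "16(x2),12" for a
# heterogeneous allocation; the sed below keeps only the text before the first "(",
# and the grep warns when a comma-separated (heterogeneous) spec is detected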
pre_commands = ['if [ ! -z $SLURM_TASKS_PER_NODE ]; then',
' if echo "${{SLURM_TASKS_PER_NODE}}" | grep -q ","; then',
' echo "Using only first part of heterogeneous tasks per node spec ${{SLURM_TASKS_PER_NODE}}"',
' fi',
' export EXPYRE_NUM_CORES_PER_NODE=$(echo $SLURM_TASKS_PER_NODE | sed "s/(.*//")',
'else',
f' export EXPYRE_NUM_CORES_PER_NODE={node_dict["num_cores_per_node"]}',
'fi'
] + Scheduler.node_dict_env_var_commands(node_dict)
pre_commands = [l.format(**node_dict) for l in pre_commands]
# add "cd remote_dir" before any other command
if remote_dir.startswith('/'):
pre_commands.append(f'cd {remote_dir}')
else:
pre_commands.append(f'cd ${{HOME}}/{remote_dir}')
commands = pre_commands + commands
script = '#!' + script_exec + '\n'
script += '\n'.join([line.rstrip().format(**node_dict) for line in header]) + '\n'
script += '\n' + '\n'.join([line.rstrip() for line in commands]) + '\n'
submit_args = Scheduler.unset_scheduler_env_vars("SLURM")
submit_args += pre_submit_cmds + (['&&'] if len(pre_submit_cmds) > 0 else [])
submit_args += ['cd', remote_dir, '&&', 'cat', '>', 'job.script.slurm',
'&&', 'sbatch', 'job.script.slurm']
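# the job script is passed via the script= argument, written to job.script.slurm by
# 'cat' on the remote side, and then submitted with sbatch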
stdout, stderr = subprocess_run(self.host, args=submit_args, script=script, remsh_cmd=self.remsh_cmd, verbose=verbose)
# parse stdout for remote job id
remote_id = None
for line in stdout.splitlines():
m = re.match(r'^Submitted\s+batch\s+job\s+(\S+)', line.strip())
if m is not None:
if remote_id is not None:
raise RuntimeError('More than one line matches "Submitted batch job "')
remote_id = m.group(1)
if remote_id is None:
raise RuntimeError('No line matches "Submitted batch job "')
return remote_id
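# Illustrative sketch of a submit() call (all values below are made up, not from this module):
#
#     remote_id = sched.submit('my_job_0', remote_dir='runs/my_job_0', partition='standard',
#                              commands=['python run.py'], max_time=3600,
#                              header=['#SBATCH --nodes={num_nodes}'],
#                              node_dict={'num_nodes': 1, 'num_cores': 16,
#                                         'num_cores_per_node': 16, 'ppn': 16})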
def status(self, remote_ids, verbose=False):
"""determine status of remote jobs
Parameters
----------
remote_ids: str, list(str)
list of remote ids to check
Returns
-------
dict { str remote_id: str status }, where status is one of:
"queued", "held", "running", "done", "failed", "timeout", "other"
All remote ids passed in are guaranteed to be keys in the dict; jobs that are
not listed by the queueing system have status "done"
"""
if isinstance(remote_ids, str):
remote_ids = [remote_ids]
stdout, stderr = subprocess_run(self.host,
['squeue', '--user', '$USER', '--noheader', '-O', 'jobid:20,state:30,reason:200'],
remsh_cmd=self.remsh_cmd, verbose=verbose)
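# each stdout line is "<jobid> <state> <reason>" in fixed-width columns, e.g. (made-up id)
#     "1234567             PENDING                       Priority"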
id_status_reasons = [line.strip().split(maxsplit=2) for line in stdout.splitlines()
if line.split()[0] in remote_ids]
out = {}
for id_status_reason in id_status_reasons:
try:
(id, status, reason) = id_status_reason
except Exception:
raise ValueError(f'failed to parse id_status_reason {id_status_reason}')
if status in ['RUNNING', 'COMPLETING']:
status = 'running'
elif status == 'PENDING':
if 'held' in reason.lower():
status = 'held'
else:
status = 'queued'
elif status == 'COMPLETED':
status = 'done'
elif 'fail' in status.lower():
status = 'failed'
elif status == 'TIMEOUT':
status = 'timeout'
else:
status = 'other'
out[id] = status
for id in remote_ids:
if id not in out:
out[id] = 'done'
return out
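# Illustrative sketch of polling job status (remote ids are made-up values):
#
#     statuses = sched.status(['1234567', '1234568'])
#     # e.g. {'1234567': 'running', '1234568': 'done'}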