Source code for expyre.schedulers.slurm

import os
import json
import re

from ..subprocess import subprocess_run
from ..units import time_to_HMS
from .. import util

from .base import Scheduler


class Slurm(Scheduler):
    """Create Slurm object

    Parameters
    ----------
    host: str
        username and host for ssh/rsync username@machine.fqdn
    remsh_cmd: str, default EXPYRE_RSH env var or 'ssh'
        remote shell command to use
    """
    def __init__(self, host, remsh_cmd=None):
        self.host = host
        self.hold_command = ['scontrol', 'hold']
        self.release_command = ['scontrol', 'release']
        self.cancel_command = ['scancel']
        self.remsh_cmd = util.remsh_cmd(remsh_cmd)
    def submit(self, id, remote_dir, partition, commands, max_time, header, node_dict,
               no_default_header=False, script_exec="/bin/bash", pre_submit_cmds=[], verbose=False):
        """Submit a job on a remote machine

        Parameters
        ----------
        id: str
            unique job id (local)
        remote_dir: str
            remote directory where files have already been prepared and job will run
        partition: str
            partition (or queue or node type)
        commands: list(str)
            list of commands to run in script
        max_time: int
            time in seconds to run
        header: list(str)
            list of header directives, not including walltime specific directive
        node_dict: dict
            properties related to node selection. Fields: num_nodes, num_cores,
            num_cores_per_node, ppn, id, max_time, partition (and its synonym queue)
        no_default_header: bool, default False
            do not add normal header fields, only use what is passed in via "header"
        script_exec: str, default '/bin/bash'
            executable for first line of job script
        pre_submit_cmds: list(str), default []
            commands to run in the remote process that does the submission before the
            actual submission, e.g. to fix the environment

        Returns
        -------
        str remote job id
        """
        node_dict = node_dict.copy()
        node_dict['id'] = id
        node_dict['max_time'] = time_to_HMS(max_time)
        node_dict['partition'] = partition
        node_dict['queue'] = partition

        header = header.copy()
        if not no_default_header:
            header.append('#SBATCH --job-name={id}')
            header.append('#SBATCH --time={max_time}')
            header.append('#SBATCH --output=job.{id}.stdout')
            header.append('#SBATCH --error=job.{id}.stderr')
        header.extend(json.loads(os.environ.get("EXPYRE_HEADER_EXTRA", "[]")))

        # set EXPYRE_NUM_CORES_PER_NODE using scheduler-specific info, to support jobs
        # that do not know exact node type at submit time.  All related quantities
        # in node_dict are set based on this one by superclass Scheduler static method
        pre_commands = ['if [ ! -z $SLURM_TASKS_PER_NODE ]; then',
                        '    if echo "${{SLURM_TASKS_PER_NODE}}" | grep -q ","; then',
                        '        echo "Using only first part of heterogeneous tasks per node spec ${{SLURM_TASKS_PER_NODE}}"',
                        '    fi',
                        '    export EXPYRE_NUM_CORES_PER_NODE=$(echo $SLURM_TASKS_PER_NODE | sed "s/(.*//")',
                        'else',
                        f'    export EXPYRE_NUM_CORES_PER_NODE={node_dict["num_cores_per_node"]}',
                        'fi'] + Scheduler.node_dict_env_var_commands(node_dict)
        pre_commands = [l.format(**node_dict) for l in pre_commands]

        # add "cd remote_dir" before any other command
        if remote_dir.startswith('/'):
            pre_commands.append(f'cd {remote_dir}')
        else:
            pre_commands.append(f'cd ${{HOME}}/{remote_dir}')
        commands = pre_commands + commands

        script = '#!' + script_exec + '\n'
        script += '\n'.join([line.rstrip().format(**node_dict) for line in header]) + '\n'
        script += '\n' + '\n'.join([line.rstrip() for line in commands]) + '\n'

        submit_args = Scheduler.unset_scheduler_env_vars("SLURM")
        submit_args += pre_submit_cmds + (['&&'] if len(pre_submit_cmds) > 0 else [])
        submit_args += ['cd', remote_dir, '&&', 'cat', '>', 'job.script.slurm', '&&',
                        'sbatch', 'job.script.slurm']
        stdout, stderr = subprocess_run(self.host, args=submit_args, script=script,
                                        remsh_cmd=self.remsh_cmd, verbose=verbose)

        # parse stdout for remote job id
        remote_id = None
        for line in stdout.splitlines():
            m = re.match(r'^Submitted\s+batch\s+job\s+(\S+)', line.strip())
            if m is not None:
                if remote_id is not None:
                    raise RuntimeError('More than one line matches "Submitted batch job "')
                remote_id = m.group(1)

        if remote_id is None:
            raise RuntimeError('No line matches "Submitted batch job "')

        return remote_id
    def status(self, remote_ids, verbose=False):
        """determine status of remote jobs

        Parameters
        ----------
        remote_ids: str, list(str)
            list of remote ids to check

        Returns
        -------
        dict { str remote_id: str status }, status is one of:
            "queued", "held", "running", "done", "failed", "timeout", "other"
            all remote ids passed in are guaranteed to be keys in dict, specific jobs
            that are not listed by queueing system have status "done"
        """
        if isinstance(remote_ids, str):
            remote_ids = [remote_ids]

        stdout, stderr = subprocess_run(self.host,
                                        ['squeue', '--user', '$USER', '--noheader',
                                         '-O', 'jobid:20,state:30,reason:200'],
                                        remsh_cmd=self.remsh_cmd, verbose=verbose)

        id_status_reasons = [line.strip().split(maxsplit=2) for line in stdout.splitlines()
                             if line.split()[0] in remote_ids]

        out = {}
        for id_status_reason in id_status_reasons:
            try:
                (id, status, reason) = id_status_reason
            except Exception:
                raise ValueError(f'failed to parse id_status_reason {id_status_reason}')
            if status in ['RUNNING', 'COMPLETING']:
                status = 'running'
            elif status == 'PENDING':
                if 'held' in reason.lower():
                    status = 'held'
                else:
                    status = 'queued'
            elif status == 'COMPLETED':
                status = 'done'
            elif 'fail' in status.lower():
                status = 'failed'
            elif status == 'TIMEOUT':
                status = 'timeout'
            else:
                status = 'other'
            out[id] = status

        for id in remote_ids:
            if id not in out:
                out[id] = 'done'

        return out
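
The default header directives appended in submit() are plain str.format templates that get filled from node_dict. Below is a minimal, self-contained sketch of that rendering step; the id and max_time values are assumptions for illustration (in the real method max_time comes from time_to_HMS).

    # stand-alone sketch of how submit() fills header placeholders from node_dict
    node_dict = {'id': 'job_0001', 'max_time': '01:00:00'}
    header = ['#SBATCH --job-name={id}', '#SBATCH --time={max_time}']
    print('\n'.join(line.rstrip().format(**node_dict) for line in header))
    # -> #SBATCH --job-name=job_0001
    # -> #SBATCH --time=01:00:00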
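
status() splits each squeue output line into (jobid, state, reason) and maps Slurm states onto expyre status strings. A small sketch of that mapping for one assumed PENDING/held line (the sample line and its column widths are illustrative, not captured output):

    # assumed example of one 'squeue --noheader -O jobid:20,state:30,reason:200' line
    line = '123456              PENDING                       JobHeldUser'
    jobid, state, reason = line.strip().split(maxsplit=2)
    if state == 'PENDING':
        status = 'held' if 'held' in reason.lower() else 'queued'
    print(jobid, status)    # -> 123456 held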
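
Finally, a hypothetical end-to-end usage sketch, not part of the module: it assumes a reachable cluster and already-prepared remote_dir. The host, partition, commands, and node_dict values are made up for illustration, and the node_dict fields actually required also depend on what Scheduler.node_dict_env_var_commands expects.

    from expyre.schedulers.slurm import Slurm

    # hypothetical host and job parameters, for illustration only
    sched = Slurm('user@cluster.example.org')
    remote_id = sched.submit(id='job_0001',
                             remote_dir='runs/job_0001',
                             partition='standard',
                             commands=['python run_calc.py > run_calc.out'],
                             max_time=3600,    # seconds
                             header=['#SBATCH --nodes={num_nodes}'],
                             node_dict={'num_nodes': 1, 'num_cores': 16,
                                        'num_cores_per_node': 16, 'ppn': 16})
    print(sched.status(remote_id))    # e.g. {'123456': 'queued'}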