Source code for expyre.schedulers.pbs

import os
import json
import re

from ..subprocess import subprocess_run
from ..units import time_to_HMS
from .. import util

from .base import Scheduler


class PBS(Scheduler):
    """Create PBS object

    Parameters
    ----------
    host: str
        username and host for ssh/rsync, username@machine.fqdn
    remsh_cmd: str, default EXPYRE_RSH env var or 'ssh'
        remote shell command to use
    """
    def __init__(self, host, remsh_cmd=None):
        self.host = host
        self.hold_command = ['qhold']
        self.release_command = ['qrls']
        self.cancel_command = ['qdel']
        self.remsh_cmd = util.remsh_cmd(remsh_cmd)
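
    # Usage sketch (illustration only, not part of the library; the username/hostname and
    # remsh_cmd value are hypothetical):
    #     pbs = PBS('user@cluster.example.com')                     # remsh_cmd from EXPYRE_RSH or 'ssh'
    #     pbs = PBS('user@cluster.example.com', remsh_cmd='ssh -q')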

    def submit(self, id, remote_dir, partition, commands, max_time, header, node_dict,
               no_default_header=False, script_exec="/bin/bash", pre_submit_cmds=[], verbose=False):
        """Submit a job on a remote machine

        Parameters
        ----------
        id: str
            unique job id (local)
        remote_dir: str
            remote directory where files have already been prepared and job will run
        partition: str
            partition (or queue or node type)
        commands: list(str)
            list of commands to run in script
        max_time: int
            time in seconds to run
        header: list(str)
            list of header directives, not including walltime-specific directive
        node_dict: dict
            properties related to node selection. Fields: num_nodes, num_cores,
            num_cores_per_node, ppn, id, max_time, partition (and its synonym queue)
        no_default_header: bool, default False
            do not add normal header fields, only use what's passed in via "header"
        script_exec: str, default '/bin/bash'
            executable for first line of job script
        pre_submit_cmds: list(str), default []
            commands to run in the remote process that does the submission, before the actual
            submission, e.g. to fix the environment

        Returns
        -------
        str
            remote job id
        """
        node_dict = node_dict.copy()

        # Make sure that there are no '=' in id
        # (e.g. produced by base64.urlsafe encoded argument hashes),
        # since those are rejected for "#PBS -N" despite the claim in the docs that
        # any printable non-whitespace character is OK:
        # http://docs.adaptivecomputing.com/torque/4-0-2/Content/topics/commands/qsub.htm
        #
        # WARNING: since this is happening in scheduler-specific code, it will make stdout/stderr
        # filenames dependent on scheduler, so they cannot be relied on. Would need to move
        # sanitization outside to prevent this, but it'd have to be a superset of everything needed
        # for every scheduler.
        node_dict['id'] = id.replace('=', 'EQ')
        node_dict['max_time'] = time_to_HMS(max_time)
        node_dict['partition'] = partition
        node_dict['queue'] = partition

        header = header.copy()
        if not no_default_header:
            # Make sure that first character is alphabetic
            # Let's hope there aren't length limitations anymore
            header.append('#PBS -N N_{id}')
            header.append('#PBS -l walltime={max_time}')
            header.append('#PBS -o job.{id}.stdout')
            header.append('#PBS -e job.{id}.stderr')
            header.append('#PBS -S /bin/bash')
            header.append('#PBS -r n')
        header.extend(json.loads(os.environ.get("EXPYRE_HEADER_EXTRA", "[]")))

        # set EXPYRE_NUM_CORES_PER_NODE using scheduler-specific info, to support jobs
        # that do not know exact node type at submit time. All related quantities
        # in node_dict are set based on this one by superclass Scheduler static method
        pre_commands = ['if [ ! -z $PBS_NUM_PPN ]; then',
                        '    export EXPYRE_NUM_CORES_PER_NODE=$PBS_NUM_PPN',
                        'elif [ ! -z $PBS_NODEFILE ]; then',
                        '    export EXPYRE_NUM_CORES_PER_NODE=$(sort -k1 $PBS_NODEFILE | uniq -c | head -1 | awk \'{{print $1}}\')',
                        'else',
                        f'    export EXPYRE_NUM_CORES_PER_NODE={node_dict["num_cores_per_node"]}',
                        'fi'] + Scheduler.node_dict_env_var_commands(node_dict)

        pre_commands = [l.format(**node_dict) for l in pre_commands]

        # add "cd remote_dir" before any other command
        if remote_dir.startswith('/'):
            pre_commands.append(f'cd {remote_dir}')
        else:
            pre_commands.append(f'cd ${{HOME}}/{remote_dir}')

        commands = pre_commands + commands

        script = '#!' + script_exec + '\n'
        script += '\n'.join([line.rstrip().format(**node_dict) for line in header]) + '\n'
        script += '\n' + '\n'.join([line.rstrip() for line in commands]) + '\n'

        submit_args = Scheduler.unset_scheduler_env_vars("PBS")
        submit_args += pre_submit_cmds + (['&&'] if len(pre_submit_cmds) > 0 else [])
        submit_args += ['cd', remote_dir, '&&', 'cat', '>', 'job.script.pbs', '&&', 'qsub', 'job.script.pbs']
        stdout, stderr = subprocess_run(self.host, args=submit_args, script=script,
                                        remsh_cmd=self.remsh_cmd, verbose=verbose)

        # parse stdout for remote job id
        if len(stdout.splitlines()) != 1:
            raise RuntimeError('More than one line in qsub output')
        remote_id = stdout.strip()
        if len(remote_id) == 0:
            raise RuntimeError('Empty output from qsub')

        return remote_id
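
    # Usage sketch (illustration only; the job id, directories, partition name, header line,
    # and node_dict values are hypothetical, with node_dict fields as in the docstring above):
    #     remote_id = pbs.submit('run_abc', 'runs/run_abc', 'standard',
    #                            commands=['python run.py'], max_time=3600,
    #                            header=['#PBS -l select={num_nodes}:ncpus={num_cores_per_node}'],
    #                            node_dict={'num_nodes': 1, 'num_cores': 16,
    #                                       'num_cores_per_node': 16, 'ppn': 16})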

    def status(self, remote_ids, verbose=False):
        """determine status of remote jobs

        Parameters
        ----------
        remote_ids: str, list(str)
            list of remote ids to check

        Returns
        -------
        dict {str remote_id: str status}
            status is one of: "queued", "held", "running", "done", "failed", "timeout", "other"

            all remote ids passed in are guaranteed to be keys in the dict; specific jobs that
            are not listed by the queueing system have status "done"
        """
        if isinstance(remote_ids, str):
            remote_ids = [remote_ids]

        # -w to make fields wide and less likely to truncate jobid
        # -x for historical data, so it never says "Job has finished" but rather uses the same
        #    format for all jobs
        # -a for all jobs, most useful version that works with -w, hence need for grep USER
        stdout, stderr = subprocess_run(self.host, ['qstat', '-w', '-x', '-a', '|', 'grep', ' $USER '],
                                        remsh_cmd=self.remsh_cmd, verbose=verbose)

        lines = stdout.splitlines()

        # parse id and status from format
        # (id, _user, _queue, _jobname, _sessid, _nds, _tsk, _mem, _timereq, status, _timeelap) = l.strip().split()
        id_status = [(line.strip().split()[0], line.strip().split()[9]) for line in lines
                     if line.strip().split()[0] in remote_ids]

        out = {}
        for id, status in id_status:
            if status in ['R', 'E']:
                status = 'running'
            elif status == 'Q':
                status = 'queued'
            elif status == 'H':
                status = 'held'
            elif status == 'F':
                status = 'done'
            else:
                status = 'other'
            out[id] = status

        for id in remote_ids:
            if id not in out:
                out[id] = 'done'

        return out
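
# Usage sketch (illustration only; the remote job ids and resulting statuses are hypothetical):
#     statuses = pbs.status(['12345.pbsserver', '12346.pbsserver'])
#     # e.g. {'12345.pbsserver': 'running', '12346.pbsserver': 'done'}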