import os
import json
import re
from ..subprocess import subprocess_run
from ..units import time_to_HMS
from .. import util
from .base import Scheduler
class SGE(Scheduler):
"""Create SGE object
Parameters
----------
host: str
username and host for ssh/rsync username@machine.fqdn
remsh_cmd: str, default EXPYRE_RSH env var or 'ssh'
remote shell command to use
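
    Examples
    --------
    A minimal construction sketch; the username and host are placeholders:

    >>> sge = SGE('user@cluster.example.com')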
"""
def __init__(self, host, remsh_cmd=None):
self.host = host
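        # qhold/qrls/qdel are SGE's commands for holding, releasing, and cancelling jobs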
self.hold_command = ['qhold']
self.release_command = ['qrls']
self.cancel_command = ['qdel']
self.remsh_cmd = util.remsh_cmd(remsh_cmd)
    def submit(self, id, remote_dir, partition, commands, max_time, header, node_dict, no_default_header=False,
               script_exec="/bin/bash", pre_submit_cmds=[], verbose=False):
"""Submit a job on a remote machine
Parameters
----------
id: str
unique job id (local)
remote_dir: str
remote directory where files have already been prepared and job will run
partition: str
partition (or queue or node type)
commands: list(str)
list of commands to run in script
max_time: int
            maximum run time in seconds
header: list(str)
list of header directives, not including walltime specific directive
node_dict: dict
properties related to node selection.
            Fields: num_nodes, num_cores, num_cores_per_node, ppn, id, max_time, partition (and its synonym queue)
no_default_header: bool, default False
            do not add the normal header fields, only use what is passed in via "header"
script_exec: str, default '/bin/bash'
executable for first line of job script
pre_submit_cmds: list(str), default []
            commands to run in the remote process that does the submission, before the actual
            submission, e.g. to fix the environment

        Returns
        -------
        str
            remote job id
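
        Examples
        --------
        An illustrative call; the job id, directory, queue name, command, and
        node properties below are assumptions, not defaults:

        >>> remote_id = sge.submit('run_0001', '/scratch/user/run_0001', 'all.q',  # doctest: +SKIP
        ...                        ['python calc.py'], max_time=3600, header=[],
        ...                        node_dict={'num_nodes': 1, 'num_cores': 16,
        ...                                   'num_cores_per_node': 16, 'ppn': 16})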
"""
node_dict = node_dict.copy()
node_dict['id'] = id
node_dict['max_time'] = time_to_HMS(max_time)
node_dict['partition'] = partition
node_dict['queue'] = partition
header = header.copy()
if not no_default_header:
            # Make sure that the first character of the job name is alphabetic
header.append('#$ -N N_{id}')
header.append('#$ -l h_rt={max_time}')
header.append('#$ -o job.{id}.stdout')
header.append('#$ -e job.{id}.stderr')
header.append('#$ -S /bin/bash')
header.append('#$ -r n')
header.append('#$ -cwd')
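            # EXPYRE_HEADER_EXTRA, if set, must hold a JSON list of extra directive
            # strings, e.g. '["#$ -P my_project"]' (the project directive is illustrative)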
header.extend(json.loads(os.environ.get("EXPYRE_HEADER_EXTRA", "[]")))
        # set EXPYRE_NUM_CORES_PER_NODE using scheduler-specific info, to support jobs
        # that do not know the exact node type at submit time. All related quantities
        # in node_dict are set based on this one by a superclass Scheduler static method.
        # For now this assumes jobs can only run on a single node (e.g. on Womble).
pre_commands = ['if [ ! -z $NSLOTS ] && [ ! -z $NHOSTS ]; then',
' export EXPYRE_NUM_CORES_PER_NODE=$(( ${{NSLOTS}} / ${{NHOSTS}} ))',
'else',
f' export EXPYRE_NUM_CORES_PER_NODE={node_dict["num_cores_per_node"]}',
'fi'
] + Scheduler.node_dict_env_var_commands(node_dict)
        pre_commands = [line.format(**node_dict) for line in pre_commands]
# add "cd remote_dir" before any other command
if remote_dir.startswith('/'):
pre_commands.append(f'cd {remote_dir}')
else:
pre_commands.append(f'cd ${{HOME}}/{remote_dir}')
commands = pre_commands + commands
script = '#!' + script_exec + '\n'
script += '\n'.join([line.rstrip().format(**node_dict) for line in header]) + '\n'
script += '\n' + '\n'.join([line.rstrip() for line in commands]) + '\n'
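        # the assembled script is piped to the remote shell's stdin; 'cat > job.script.sge'
        # writes it into remote_dir, and qsub then submits that file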
submit_args = Scheduler.unset_scheduler_env_vars("SGE")
submit_args += pre_submit_cmds + (['&&'] if len(pre_submit_cmds) > 0 else [])
submit_args += ['cd', remote_dir, '&&', 'cat', '>', 'job.script.sge',
'&&', 'qsub', 'job.script.sge']
stdout, stderr = subprocess_run(self.host, args=submit_args, script=script, remsh_cmd=self.remsh_cmd, verbose=verbose)
        # parse stdout for remote job id
        if len(stdout.splitlines()) != 1:
            raise RuntimeError(f'Expected exactly one line of qsub output, got {stdout!r}')
        m = re.match(r'Your\s+job\s+(\d+)\s+\("\S+"\)\s+has\s+been\s+submitted', stdout.strip())
        if m is None:
            raise RuntimeError(f'Failed to parse remote job id from qsub output {stdout!r}')
return m.group(1)
    def status(self, remote_ids, verbose=False):
"""determine status of remote jobs
Parameters
----------
remote_ids: str, list(str)
            single remote id or list of remote ids to check

        Returns
        -------
        dict
            { str remote_id: str status }, where status is one of
            "queued", "held", "running", "done", "failed", "timeout", "other".
            All remote ids passed in are guaranteed to be keys in the dict; jobs
            that are not listed by the queueing system have status "done".
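
        Examples
        --------
        Illustrative only; the remote ids and resulting statuses are placeholders:

        >>> sge.status(['123456', '123457'])  # doctest: +SKIP
        {'123456': 'running', '123457': 'done'}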
"""
if isinstance(remote_ids, str):
remote_ids = [remote_ids]
stdout, stderr = subprocess_run(self.host, ['qstat'],
remsh_cmd=self.remsh_cmd, verbose=verbose)
        # first two lines of qstat output are a header
        # remaining lines have format: (id, _priority, _jobname, _user, status,
        #     _sub_or_start_date, _sub_or_start_time, [_queue], _nprocs)
        id_status = []
        for line in stdout.splitlines()[2:]:
            fields = line.strip().split()
            # skip blank or malformed lines rather than raising IndexError
            if len(fields) >= 5 and fields[0] in remote_ids:
                id_status.append((fields[0], fields[4]))
out = {}
for id, status in id_status:
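            # SGE state codes: 'r' running, 't' transferring to execution host,
            # 'qw' queued/waiting, 'hqw' held in queue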
if status in ['t', 'r']:
status = 'running'
elif status == 'qw':
status = 'queued'
elif status == 'hqw':
status = 'held'
else:
status = 'other'
out[id] = status
for id in remote_ids:
if id not in out:
out[id] = 'done'
return out