"""Source code for expyre.schedulers.base: base ``Scheduler`` class for remote job schedulers."""
import os
from ..subprocess import subprocess_run
from .. import util
class Scheduler:
    """Base class for remote job schedulers.

    Subclasses implement :meth:`submit` and :meth:`status`, and set
    ``hold_command``, ``release_command`` and ``cancel_command`` to the
    argument-list prefixes that :meth:`hold`, :meth:`release` and
    :meth:`cancel` run on ``host`` with the job ids appended.
    """

    def __init__(self, host, remsh_cmd=None):
        """Create Scheduler object.

        TODO: document how job scripts will be set up, their environment, etc.

        Parameters
        ----------
        host: str
            username and host for ssh/rsync username@machine.fqdn
        remsh_cmd: optional
            remote shell command specification, normalized by ``util.remsh_cmd``
            before being stored (presumably a default is chosen when None -- TODO confirm)
        """
        self.host = host
        # Argument-list prefixes for hold/release/cancel; None means "not supported".
        # Subclasses must set these to lists, because job ids are appended with ``+``.
        self.hold_command = None
        self.release_command = None
        self.cancel_command = None
        self.remsh_cmd = util.remsh_cmd(remsh_cmd)

    def submit(self, id, remote_dir, partition, commands, max_time, header, node_dict, no_default_header=False,
               script_exec="/bin/bash", pre_submit_cmds=None, verbose=False):
        """Submit a job to the remote scheduler; must be implemented by subclasses.

        ``pre_submit_cmds`` defaults to None rather than a shared mutable ``[]``;
        implementations should treat None as "no pre-submit commands".

        Raises
        ------
        NotImplementedError
            always, in the base class (a subclass of ``RuntimeError``, so existing
            ``except RuntimeError`` handlers still catch it)
        """
        raise NotImplementedError('Not implemented')

    def status(self, remote_ids, verbose=False):
        """Query the status of remote jobs; must be implemented by subclasses.

        Raises
        ------
        NotImplementedError
            always, in the base class (a subclass of ``RuntimeError``)
        """
        raise NotImplementedError('Not implemented')

    def hold(self, remote_ids, verbose=False):
        """Hold remote jobs by running ``self.hold_command`` with their ids on ``self.host``.

        Parameters
        ----------
        remote_ids: str, iterable(str)
            remote ids of jobs to hold; an empty collection is a no-op
        verbose: bool, default False
            passed through to ``subprocess_run``

        Raises
        ------
        NotImplementedError
            if this scheduler does not define ``hold_command``
        """
        self._run_job_command(self.hold_command, 'hold', remote_ids, verbose)

    def release(self, remote_ids, verbose=False):
        """Release held remote jobs by running ``self.release_command`` with their ids on ``self.host``.

        Parameters
        ----------
        remote_ids: str, iterable(str)
            remote ids of jobs to release; an empty collection is a no-op
        verbose: bool, default False
            passed through to ``subprocess_run``

        Raises
        ------
        NotImplementedError
            if this scheduler does not define ``release_command``
        """
        self._run_job_command(self.release_command, 'release', remote_ids, verbose)

    def cancel(self, remote_ids, verbose=False):
        """Cancel remote jobs by running ``self.cancel_command`` with their ids on ``self.host``.

        Parameters
        ----------
        remote_ids: str, iterable(str)
            remote ids of jobs to cancel; an empty collection is a no-op
        verbose: bool, default False
            passed through to ``subprocess_run``

        Raises
        ------
        NotImplementedError
            if this scheduler does not define ``cancel_command``
        """
        self._run_job_command(self.cancel_command, 'cancel', remote_ids, verbose)

    @staticmethod
    def _remote_id_list(remote_ids):
        """Return ``remote_ids`` as a new list; a single ``str`` id becomes a one-element list."""
        # a str is itself iterable, so it must be special-cased before list()
        if isinstance(remote_ids, str):
            return [remote_ids]
        return list(remote_ids)

    def _run_job_command(self, command, action, remote_ids, verbose):
        """Run ``command`` followed by the job ids on ``self.host`` (shared by hold/release/cancel)."""
        if command is None:
            raise NotImplementedError(f'{type(self).__name__} does not support {action}')
        ids = self._remote_id_list(remote_ids)
        if not ids:
            # nothing to act on; queue tools typically reject an empty id list,
            # so skip the remote round trip entirely
            return
        subprocess_run(self.host, args=command + ids, remsh_cmd=self.remsh_cmd, verbose=verbose)

    @staticmethod
    def unset_scheduler_env_vars(prefix):
        """Return shell tokens that unset local env vars named ``prefix + '_' + ...``.

        Only active when ``WFL_SCHEDULER_IGNORE_ENV`` is present in ``os.environ``
        (its value is not inspected); otherwise returns an empty list.

        Parameters
        ----------
        prefix: str
            variable-name prefix; variables starting with ``prefix + '_'`` are selected

        Returns
        -------
        unset_cmds: list(str)
            ``['unset', NAME, '&&']`` repeated for each matching variable, in
            ``os.environ`` iteration order (presumably prepended to a command line)
        """
        if 'WFL_SCHEDULER_IGNORE_ENV' not in os.environ:
            return []
        var_prefix = prefix + '_'
        unset_cmds = []
        for var in os.environ:
            if var.startswith(var_prefix):
                unset_cmds += ['unset', var, '&&']
        return unset_cmds

    @staticmethod
    def node_dict_env_var_commands(node_dict):
        """Return shell command templates exporting EXPYRE_NUM_NODES and EXPYRE_NUM_CORES.

        The returned strings contain ``{num_nodes}`` / ``{num_cores}`` placeholders,
        presumably filled in by the caller with ``str.format(**node_dict)`` -- TODO confirm.

        Parameters
        ----------
        node_dict: dict
            may contain ``num_nodes`` and/or ``num_cores``; a missing key or a None
            value means "not known at submit time"

        Returns
        -------
        pre_commands: list(str)
            two ``export`` command templates, nodes first then cores

        Raises
        ------
        ValueError
            if neither ``num_nodes`` nor ``num_cores`` is known, since each unknown
            value is computed from the other
        """
        # set env vars for node_dict, with max flexibility in case only some are known at submit time
        # EXPYRE_NUM_CORES_PER_NODE is defined by scheduler before these commands are run
        num_nodes_known = node_dict.get('num_nodes', None) is not None
        num_cores_known = node_dict.get('num_cores', None) is not None
        # either num_nodes or num_cores must be known at submit time, so compute each in terms of the other
        if not (num_nodes_known or num_cores_known):
            raise ValueError("node_dict must specify at least one of 'num_nodes' or 'num_cores'")

        pre_commands = []
        if num_nodes_known:
            pre_commands.append('export EXPYRE_NUM_NODES={num_nodes}')
        else:
            # ceiling division: a partial node still needs a whole node
            # (plain $(( c / p )) would give 0 nodes for c < p)
            pre_commands.append('export EXPYRE_NUM_NODES='
                                '$(( ({num_cores} + $EXPYRE_NUM_CORES_PER_NODE - 1) / $EXPYRE_NUM_CORES_PER_NODE ))')
        if num_cores_known:
            pre_commands.append('export EXPYRE_NUM_CORES={num_cores}')
        else:
            pre_commands.append('export EXPYRE_NUM_CORES=$(( {num_nodes} * $EXPYRE_NUM_CORES_PER_NODE ))')
        return pre_commands