Source code for expyre.resources

import re

from .units import time_to_sec, mem_to_kB


class Resources:
    """Resources required for a task, including time, memory, cores/nodes, and
    particular partitions/queues.

    Mainly consists of code that selects an appropriate partition/queue from
    the list associated with each System.

    Parameters
    ----------
    max_time: int, str
        max time for job in sec (int) or time spec (str)
    num_nodes: int
        number of nodes to use, mutually exclusive with num_cores, one is required
    num_cores: int
        number of cores to use, mutually exclusive with num_nodes, one is required
    max_mem_tot: int/str, default None
        total max mem in kB (int) or memory spec (str), mutually exclusive
        with max_mem_per_core
    max_mem_per_core: int/str, default None
        per-core max mem in kB (int) or memory spec (str), mutually exclusive
        with max_mem_tot
    partitions/queues: list(str) or str, default None
        regexps for types of node that can be used; the two names are
        synonyms and at most one may be given.  Each regexp must match the
        entire partition name.

    Raises
    ------
    ValueError
        if the mutually exclusive arguments are combined incorrectly, or if
        num_nodes / num_cores is not an int
    """

    def __init__(self, max_time, num_nodes=None, num_cores=None, max_mem_tot=None,
                 max_mem_per_core=None, partitions=None, queues=None):
        # exactly one of num_nodes / num_cores must be supplied
        if (num_nodes is None) == (num_cores is None):
            raise ValueError(f"exactly one of num_nodes {num_nodes} and num_cores {num_cores} is required")
        if max_mem_tot is not None and max_mem_per_core is not None:
            raise ValueError(f"at most one of max_mem_tot {max_mem_tot} and max_mem_per_core {max_mem_per_core} is required")
        if partitions is not None and queues is not None:
            raise ValueError(f"at most one of partitions {partitions} and queues {queues} is required")

        if num_nodes is not None and not isinstance(num_nodes, int):
            raise ValueError(f"got num_nodes {num_nodes} not int")
        if num_cores is not None and not isinstance(num_cores, int):
            raise ValueError(f"got num_cores {num_cores} not int")

        self.max_time = time_to_sec(max_time)

        # (count, unit) where unit is 'nodes' or 'cores'
        self.n = (num_nodes, 'nodes') if num_nodes is not None else (num_cores, 'cores')

        # (amount in kB, scope) where scope is 'tot' or 'per_core', or None if unconstrained
        if max_mem_tot is not None:
            self.max_mem = (mem_to_kB(max_mem_tot), 'tot')
        elif max_mem_per_core is not None:
            self.max_mem = (mem_to_kB(max_mem_per_core), 'per_core')
        else:
            self.max_mem = None

        # "queues" is a synonym for "partitions": honor whichever one was passed
        # (previously a value passed as queues was silently discarded)
        requested = partitions if partitions is not None else queues
        if isinstance(requested, str):
            requested = [requested]
        self.partitions = requested

    def find_nodes(self, partitions, exact_fit=True, partial_node=False):
        """find a node type that accommodates requested resources

        Parameters
        ----------
        partitions: dict
            properties of available partitions (only used internally by
            system.py, so "queues" synonym is not implemented here).  Each
            value is a dict read via the keys 'num_cores', 'max_time' and
            'max_mem'.
        exact_fit: bool, default True
            only return nodes that exactly satisfy the number of cores
        partial_node: bool, default False
            allow jobs that take less than one entire node, overrides exact_fit

        Returns
        -------
        partition: str
            name of partition selected
        node_dict: dict
            various quantities of node

            * num_nodes: int, total number of nodes needed
            * num_cores: int, total number of cores needed
            * num_cores_per_node: int, number of cores per node for selected nodes

        Raises
        ------
        RuntimeError
            if no partition satisfies the request
        ValueError
            if partial_node is set but the request needs more than one node
        """
        if partial_node:
            exact_fit = False

        selected_partitions = [name for name, node_spec in partitions.items()
                               if self._partition_is_acceptable(name, node_spec, exact_fit)]

        if not selected_partitions:
            raise RuntimeError(f'Failed to find acceptable node type '
                               f'for {self} with exact_fit={exact_fit}')

        def excess_cores(name):
            # cores left idle on the last node if this partition is used
            cores_per_node = partitions[name]['num_cores']
            _, needed_cores = self._get_num_nodes_num_cores(partitions[name])
            return (cores_per_node - needed_cores % cores_per_node) % cores_per_node

        # min() returns the first minimal entry, so an exact match (0 excess)
        # wins, and ties are resolved in favor of the earliest listed partition
        partition = min(selected_partitions, key=excess_cores)

        num_nodes, num_cores = self._get_num_nodes_num_cores(partitions[partition])
        cores_in_node = partitions[partition]['num_cores']

        if partial_node:
            if num_cores > cores_in_node:
                raise ValueError('partial_node only supported when it can be satisfied by 1 node')
            # partial node
            num_cores_per_node = num_cores
        else:
            # entire nodes: round the core count up to fill every node
            num_cores_per_node = cores_in_node
            num_cores = num_nodes * num_cores_per_node

        return partition, {'num_nodes': num_nodes, 'num_cores': num_cores,
                           'num_cores_per_node': num_cores_per_node}

    def _partition_is_acceptable(self, partition, node_spec, exact_fit):
        """check whether a single partition can accommodate this request

        Parameters
        ----------
        partition: str
            name of partition
        node_spec: dict
            node type, from partitions dict
        exact_fit: bool
            require requested number of cores to be a multiple of cores per node

        Returns
        -------
        acceptable: bool
        """
        # wrong node type: each regexp must match the *whole* partition name;
        # fullmatch keeps alternations such as "a|b" correctly anchored
        if (self.partitions is not None and
                not any(re.fullmatch(nt_re, partition) for nt_re in self.partitions)):
            return False

        # too much time
        if node_spec['max_time'] is not None and self.max_time > node_spec['max_time']:
            return False

        # wrong number of cores
        if exact_fit and self.n[1] == 'cores' and self.n[0] % node_spec['num_cores'] != 0:
            return False

        # too much memory
        if self.max_mem is not None and node_spec['max_mem'] is not None:
            mem, scope = self.max_mem
            if scope == 'per_core' and mem > node_spec['max_mem'] / node_spec['num_cores']:
                return False
            if scope == 'tot':
                num_nodes, _ = self._get_num_nodes_num_cores(node_spec)
                if mem > node_spec['max_mem'] * num_nodes:
                    return False

        return True

    def _get_num_nodes_num_cores(self, node_spec):
        """get total numbers of nodes and cores for this task

        Parameters
        ----------
        node_spec: dict
            node type, from partitions dict

        Returns
        -------
        num_nodes, num_cores: total number of sufficient nodes and cores

        Raises
        ------
        ValueError
            if the stored unit is neither "nodes" nor "cores"
        """
        count, unit = self.n
        cores_per_node = node_spec['num_cores']

        if unit == 'nodes':
            # fill up requested # of nodes
            return count, count * cores_per_node
        if unit == 'cores':
            # determine how many nodes are necessary (integer ceiling division)
            return -(-count // cores_per_node), count

        raise ValueError(f'number of unknown quantity {self.n[1]}, not "nodes" or "cores"')

    def __repr__(self):
        return (f'time={self.max_time} n={self.n} mem={self.max_mem} partitions={self.partitions}')