wfl.autoparallelize package#
Submodules#
wfl.autoparallelize.autoparainfo module#
- class wfl.autoparallelize.autoparainfo.AutoparaInfo(**kwargs)#
Bases:
object
Object containing information required to autoparallelize a function
- Parameters
num_inputs_per_python_subprocess (int, default 1) – number of inputs passed to each call of the low level operation.
iterable_arg (int / str, default 0) – index (int positional, str keyword) of input iterable argument in low level function
skip_failed (bool, default True) – skip output for failed low level function calls
initializer ((func, func_args), default (None, [])) – initializer function and its positional args, called when each python subprocess is started
num_python_subprocesses (int, default None) – number of python subprocesses
remote_info (RemoteInfo, default None) – information for running remotely
remote_label (str, default None) – string label to match to keys in remote_info dict
- update_defaults(default_kwargs)#
Starting from the object passed by the user at runtime, update all unspecified fields to the defaults specified when wrapping the function, otherwise to the class-wide defaults
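The layering of defaults described above can be sketched with a stand-in class (AutoparaInfoSketch is hypothetical, not wfl's implementation): fields the user did not set fall back to the wrapper-time defaults, and any still-unset fields fall back to the class-wide defaults.

```python
# Sketch of update_defaults semantics; names and stored-value layout
# are illustrative, not wfl's actual implementation.
class AutoparaInfoSketch:
    _class_defaults = {
        "num_inputs_per_python_subprocess": 1,
        "iterable_arg": 0,
        "skip_failed": True,
        "num_python_subprocesses": None,
    }

    def __init__(self, **kwargs):
        # record only what the user explicitly set
        self.values = dict(kwargs)

    def update_defaults(self, default_kwargs):
        # wrapper-time defaults take precedence over class-wide defaults,
        # but never over fields the user set at runtime
        for key, val in {**self._class_defaults, **default_kwargs}.items():
            self.values.setdefault(key, val)

info = AutoparaInfoSketch(skip_failed=False)
info.update_defaults({"num_inputs_per_python_subprocess": 10})
```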
wfl.autoparallelize.base module#
- wfl.autoparallelize.base.autoparallelize(func, *args, default_autopara_info={}, **kwargs)#
autoparallelize a function
Use by defining a function "op" which takes an input iterable and returns a list of configs, and _after_ do

def autoparallelized_op(*args, **kwargs):
    return autoparallelize(op, *args,
                           default_autopara_info={"autoparallelize_keyword_param_1": val,
                                                  "autoparallelize_keyword_param_2": val,
                                                  ...},
                           **kwargs)
autoparallelized_op.__doc__ = autopara_docstring(op.__doc__, "iterable_contents")
The autoparallelized function can then be called with
autoparallelized_op(inputs, outputs, [args of op], autopara_info=AutoparaInfo(arg1=val1, ...), [kwargs of op])
If the op takes the argument _autopara_per_item_info, a list of dicts with info for each item will be passed, always including the key item_i. If op takes the argument rng, each per-item dict will also include the key rng, with a numpy.random.Generator that has a unique state for each item.
- Parameters
func (function) – function to wrap in _autoparallelize_ll()
*args (list) – positional arguments to func, optionally preceded by the wrapped function's inputs (iterable), or inputs and outputs (OutputSpec), as its first one or two arguments
default_autopara_info (dict, default {}) – dict with default values for AutoparaInfo constructor keywords setting default autoparallelization info
**kwargs (dict) – keyword arguments to func, plus optional inputs (iterable), outputs (OutputSpec), and autopara_info (AutoparaInfo)
- Returns
wrapped_func_out
- Return type
results of calling the function wrapped in autoparallelize via _autoparallelize_ll
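The call shape described above can be illustrated with a minimal stand-in (a sketch only: the real wfl.autoparallelize.base.autoparallelize also handles OutputSpec outputs, subprocess pools, and remote jobs, while this version applies the low-level op serially).

```python
# Simplified stand-in showing the wrapping pattern; not wfl's implementation.
def autoparallelize_sketch(func, inputs, *args, default_autopara_info={}, **kwargs):
    # the real implementation chunks inputs and may run func in parallel
    return func(inputs, *args, **kwargs)

def op(items, scale=1.0):
    # low-level operation: takes an input iterable, returns a list of results
    return [x * scale for x in items]

def autoparallelized_op(*args, **kwargs):
    return autoparallelize_sketch(
        op, *args,
        default_autopara_info={"num_inputs_per_python_subprocess": 4},
        **kwargs)

result = autoparallelized_op([1, 2, 3], scale=2.0)  # [2.0, 4.0, 6.0]
```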
- wfl.autoparallelize.base.autoparallelize_docstring(wrapped_func, wrappable_func, input_iterable_type, input_arg=0)#
wfl.autoparallelize.mpipool_support module#
- wfl.autoparallelize.mpipool_support.init(verbose=1)#
Startup code for when mpipool will be used. Only the master MPI task exits this function; all other tasks wait to do mpipool work. Initialises mpipool_support.wfl_mpipool with the created MPIExecutor object.
- Parameters
verbose (int, default 1) –
>= 1 : minimal start/end messages
> 1 : print stack trace at startup, to tell where it was called from
- wfl.autoparallelize.mpipool_support.shutdown_and_barrier(pool, comm)#
wfl.autoparallelize.pool module#
- wfl.autoparallelize.pool.do_in_pool(num_python_subprocesses=None, num_inputs_per_python_subprocess=1, iterable=None, outputspec=None, op=None, iterable_arg=0, skip_failed=True, initializer=(None, []), rng=None, args=[], kwargs={})#
parallelize some operation over an iterable
- Parameters
num_python_subprocesses (int, default os.environ['WFL_NUM_PYTHON_SUBPROCESSES']) – number of processes to parallelize over, 0 for running in serial
num_inputs_per_python_subprocess (int, default 1) – number of items from iterable to pass to each invocation of operation
iterable (iterable, default None) – iterable to loop over, often ConfigSet but could also be other things like range()
outputspec (OutputSpec, default None) – object in which returned Atoms objects are stored
op (callable) – function to call with each chunk
iterable_arg (int or str, default 0) – positional argument or keyword argument to place iterable items in when calling op
skip_failed (bool, default True) – skip function calls that return None
initializer ((callable, list), default (None, [])) – function to call at the start of each python subprocess, and its positional args
args (list) – positional arguments to op
kwargs (dict) – keyword arguments to op
- Return type
ConfigSet containing returned configs if outputspec is not None, otherwise None
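Conceptually, do_in_pool splits the iterable into groups of num_inputs_per_python_subprocess items, applies op to each group, and flattens the per-group results. The sketch below shows only that chunking logic (serially, as in the num_python_subprocesses=0 case); it is not wfl's implementation.

```python
# Illustrative chunk-and-apply logic, not wfl's do_in_pool itself.
def do_in_pool_sketch(num_inputs_per_python_subprocess, iterable, op):
    items = list(iterable)
    # split into groups of the requested size (last group may be smaller)
    groups = [items[i:i + num_inputs_per_python_subprocess]
              for i in range(0, len(items), num_inputs_per_python_subprocess)]
    results = []
    for group in groups:
        out = op(group)
        if out is not None:  # mirrors skip_failed=True dropping failed calls
            results.extend(out)
    return results

squares = do_in_pool_sketch(2, range(5), lambda group: [x * x for x in group])
```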
wfl.autoparallelize.remote module#
- wfl.autoparallelize.remote.do_remotely(autopara_info, iterable=None, outputspec=None, op=None, rng=None, args=[], kwargs={}, quiet=False, wait_for_results=True)#
run tasks as series of remote jobs
- Parameters
autopara_info (AutoparaInfo) – object with all information on autoparallelizing the remote job; its autopara_info.remote_info field contains a RemoteInfo (or a dict of kwargs for its constructor) with information on the remote job, including system, resources, job num_inputs_per_python_subprocess, etc.
quiet (bool, default False) – do not output (to stderr) progress info
args / kwargs – see autoparallelize.autoparallelize() for the remaining arguments
wfl.autoparallelize.remoteinfo module#
- class wfl.autoparallelize.remoteinfo.RemoteInfo(sys_name, job_name, resources, num_inputs_per_queued_job=-100, pre_cmds=[], post_cmds=[], env_vars=[], input_files=[], output_files=[], header_extra=[], exact_fit=True, partial_node=False, timeout=3600, check_interval=30, ignore_failed_jobs=False, resubmit_killed_jobs=False, hash_ignore=[])#
Bases:
object
Create a RemoteInfo object
- Parameters
sys_name (str) – name of system to run on
job_name (str) – name for job (unique within this project)
resources (dict or Resources) – expyre.resources.Resources or kwargs for its constructor
num_inputs_per_queued_job (int, default -100) – number of inputs to process in each queued job. If negative, its absolute value is multiplied by the autoparallelized operation's num_inputs_per_python_subprocess
pre_cmds (list(str)) – commands to run before starting job
post_cmds (list(str)) – commands to run after finishing job
env_vars (list(str)) – environment variables to set before starting job
input_files (list(str)) – input files to stage in before starting the job
output_files (list(str)) – output files to stage out when the job is done
header_extra (list(str), optional) – extra lines to add to queuing system header
exact_fit (bool, default True) – require exact fit to node size
partial_node (bool, default False) – allow jobs that take less than a whole node; overrides exact_fit
timeout (int) – time to wait in get_results before giving up
check_interval (int) – check_interval arg to pass to get_results
ignore_failed_jobs (bool, default False) – skip failures in remote jobs
resubmit_killed_jobs (bool, default False) – resubmit jobs that were killed without an exit status (e.g. out of walltime or crashed), in the hope that other parameters such as walltime or memory have been changed so the run completes this time
hash_ignore (list(str), default []) – list of arguments to ignore when doing hash of remote function arguments to determine if it’s already been done
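To make the constructor arguments above concrete, here is a hypothetical kwargs dict for a RemoteInfo. The system name, job name, and the field names inside resources are placeholders/assumptions, not values from this documentation; consult the expyre.resources.Resources docs for the actual resource keys.

```python
# hypothetical RemoteInfo constructor kwargs; "local_slurm" and the
# resources keys are illustrative placeholders
remote_info_kwargs = {
    "sys_name": "local_slurm",
    "job_name": "gap_fit_run",
    "resources": {"num_nodes": 1, "max_time": "4h"},  # assumed Resources kwargs
    "num_inputs_per_queued_job": -10,   # negative: scaled by the op's
                                        # num_inputs_per_python_subprocess
    "env_vars": ["WFL_NUM_PYTHON_SUBPROCESSES=8"],
    "timeout": 7200,
}
```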
wfl.autoparallelize.utils module#
- wfl.autoparallelize.utils.get_remote_info(remote_info, remote_label, env_var='WFL_EXPYRE_INFO')#
get remote_info dict from passed-in dict, label, and/or env var
- Parameters
remote_info (RemoteInfo, default content of env var WFL_EXPYRE_INFO) – information for running on remote machine. If None, use the WFL_EXPYRE_INFO env var: as a json/yaml file if it is a string, as a RemoteInfo kwargs dict if its keys include sys_name, or as a dict of RemoteInfo kwargs whose keys match the end of the stack trace, with function names separated by '.'
remote_label (str, default None) – remote_label to use for operation, to match to remote_info dict keys. If None, use the calling routine's filename '::' calling function name
env_var (str, default "WFL_EXPYRE_INFO") – environment var to get information from if not present in remote_info argument
- Returns
remote_info
- Return type
RemoteInfo or None
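The env-var forms described above might be set like this (the file path and all values are hypothetical placeholders):

```shell
# point WFL_EXPYRE_INFO at a json/yaml file of RemoteInfo kwargs
# (the path is a placeholder)
export WFL_EXPYRE_INFO="${HOME}/remote_info.json"

# or embed a RemoteInfo kwargs dict directly; because its keys include
# sys_name, it is treated as constructor kwargs
export WFL_EXPYRE_INFO='{"sys_name": "local_slurm", "job_name": "fit", "resources": {"max_time": "1h"}}'
```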
- wfl.autoparallelize.utils.grouper(n, iterable)#
iterator that goes over iterable in specified size groups
- Parameters
iterable (any iterable) – iterable to loop over
n (int) – size of group in each returned tuple
- Return type
sequence of tuples, with items from iterable, each of size n (or smaller if n items are not available)
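Equivalent logic to the documented grouper behavior can be written with itertools.islice (a sketch, not wfl's implementation):

```python
from itertools import islice

def grouper_sketch(n, iterable):
    # yield successive tuples of n items from iterable;
    # the last tuple is smaller if fewer than n items remain
    it = iter(iterable)
    while True:
        group = tuple(islice(it, n))
        if not group:
            return
        yield group

groups = list(grouper_sketch(3, range(7)))  # [(0, 1, 2), (3, 4, 5), (6,)]
```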
- wfl.autoparallelize.utils.items_inputs_generator(iterable, num_inputs_per_group, rng)#
Returns a generator that yields tuples consisting of items and associated data
- Parameters
iterable (iterable) – input quantities (often of type ase.atoms.Atoms)
num_inputs_per_group (int) – number of inputs that will be included in each group
rng (numpy.random.Generator or None) – rng to generate rngs for each item
- Returns
(NOTE: _ConfigSet_loc is None unless item is ase.atoms.Atoms, rng is None unless rng is provided)
- Return type
generator that yields a sequence of tuples, each (item, item_i, item's _ConfigSet_loc, unique rng)
- wfl.autoparallelize.utils.set_autopara_per_item_info(kwargs, op, inherited_per_item_info, rng_list, item_i_list)#
Set some per-config information
- Parameters
kwargs (dict) – keyword args of op
op (callable) – operation function
inherited_per_item_info (list(dict)) – list of per-item info dicts that needs to be split up to these particular items
rng_list (list(numpy.random.Generator) or list(None)) – rng (unique) for each item
item_i_list (list(int)) – list of sequence numbers for the items that these per-item info entries correspond to
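The per-item info dicts that an op receives via its _autopara_per_item_info argument can be sketched as follows (an illustration of the documented shape, not wfl's implementation): one dict per item, always containing "item_i", plus "rng" only when rngs were provided.

```python
# Illustrative construction of per-item info dicts; make_per_item_info
# is a hypothetical helper, not part of wfl's API.
def make_per_item_info(item_i_list, rng_list):
    info = []
    for item_i, rng in zip(item_i_list, rng_list):
        d = {"item_i": item_i}
        if rng is not None:
            d["rng"] = rng  # a numpy.random.Generator in real use
        info.append(d)
    return info

per_item = make_per_item_info([3, 4], [None, None])  # [{"item_i": 3}, {"item_i": 4}]
```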
Module contents#
- class wfl.autoparallelize.AutoparaInfo(**kwargs)#
Bases:
object
Object containing information required to autoparallelize a function
- Parameters
num_inputs_per_python_subprocess (int, default 1) – number of inputs passed to each call of the low level operation.
iterable_arg (int / str, default 0) – index (int positional, str keyword) of input iterable argument in low level function
skip_failed (bool, default True) – skip output for failed low level function calls
initializer ((func, func_args), default (None, [])) – initializer function and its positional args, called when each python subprocess is started
num_python_subprocesses (int, default None) – number of python subprocesses
remote_info (RemoteInfo, default None) – information for running remotely
remote_label (str, default None) – string label to match to keys in remote_info dict
- update_defaults(default_kwargs)#
Starting from the object passed by the user at runtime, update all unspecified fields to the defaults specified when wrapping the function, otherwise to the class-wide defaults
- class wfl.autoparallelize.RemoteInfo(sys_name, job_name, resources, num_inputs_per_queued_job=-100, pre_cmds=[], post_cmds=[], env_vars=[], input_files=[], output_files=[], header_extra=[], exact_fit=True, partial_node=False, timeout=3600, check_interval=30, ignore_failed_jobs=False, resubmit_killed_jobs=False, hash_ignore=[])#
Bases:
object
Create a RemoteInfo object
- Parameters
sys_name (str) – name of system to run on
job_name (str) – name for job (unique within this project)
resources (dict or Resources) – expyre.resources.Resources or kwargs for its constructor
num_inputs_per_queued_job (int, default -100) – number of inputs to process in each queued job. If negative, its absolute value is multiplied by the autoparallelized operation's num_inputs_per_python_subprocess
pre_cmds (list(str)) – commands to run before starting job
post_cmds (list(str)) – commands to run after finishing job
env_vars (list(str)) – environment variables to set before starting job
input_files (list(str)) – input files to stage in before starting the job
output_files (list(str)) – output files to stage out when the job is done
header_extra (list(str), optional) – extra lines to add to queuing system header
exact_fit (bool, default True) – require exact fit to node size
partial_node (bool, default False) – allow jobs that take less than a whole node; overrides exact_fit
timeout (int) – time to wait in get_results before giving up
check_interval (int) – check_interval arg to pass to get_results
ignore_failed_jobs (bool, default False) – skip failures in remote jobs
resubmit_killed_jobs (bool, default False) – resubmit jobs that were killed without an exit status (e.g. out of walltime or crashed), in the hope that other parameters such as walltime or memory have been changed so the run completes this time
hash_ignore (list(str), default []) – list of arguments to ignore when doing hash of remote function arguments to determine if it’s already been done
- wfl.autoparallelize.autoparallelize(func, *args, default_autopara_info={}, **kwargs)#
autoparallelize a function
Use by defining a function "op" which takes an input iterable and returns a list of configs, and _after_ do

def autoparallelized_op(*args, **kwargs):
    return autoparallelize(op, *args,
                           default_autopara_info={"autoparallelize_keyword_param_1": val,
                                                  "autoparallelize_keyword_param_2": val,
                                                  ...},
                           **kwargs)
autoparallelized_op.__doc__ = autopara_docstring(op.__doc__, "iterable_contents")
The autoparallelized function can then be called with
autoparallelized_op(inputs, outputs, [args of op], autopara_info=AutoparaInfo(arg1=val1, ...), [kwargs of op])
If the op takes the argument _autopara_per_item_info, a list of dicts with info for each item will be passed, always including the key item_i. If op takes the argument rng, each per-item dict will also include the key rng, with a numpy.random.Generator that has a unique state for each item.
- Parameters
func (function) – function to wrap in _autoparallelize_ll()
*args (list) – positional arguments to func, optionally preceded by the wrapped function's inputs (iterable), or inputs and outputs (OutputSpec), as its first one or two arguments
default_autopara_info (dict, default {}) – dict with default values for AutoparaInfo constructor keywords setting default autoparallelization info
**kwargs (dict) – keyword arguments to func, plus optional inputs (iterable), outputs (OutputSpec), and autopara_info (AutoparaInfo)
- Returns
wrapped_func_out
- Return type
results of calling the function wrapped in autoparallelize via _autoparallelize_ll
- wfl.autoparallelize.autoparallelize_docstring(wrapped_func, wrappable_func, input_iterable_type, input_arg=0)#