# Source code for psimpy.simulator.run_simulator

import os
import numpy as np
from concurrent.futures import ProcessPoolExecutor
from beartype.typing import Callable
from beartype import beartype
from typing import Union

class RunSimulator:

    @beartype
    def __init__(
            self,
            simulator: Callable,
            var_inp_parameter: list[str],
            fix_inp: Union[dict, None] = None,
            o_parameter: Union[str, None] = None,
            dir_out: Union[str, None] = None,
            save_out: bool = False) -> None:
        """
        Serial or parallel execution of a given simulator.

        Parameters
        ----------
        simulator : Callable
            A Python function defining the simulator. Its parameters fall
            into three groups: ``var_inp_parameter`` names the variable
            parameters; ``fix_inp`` supplies fixed parameters as key-value
            pairs; ``o_parameter`` names an extra keyword used inside
            ``simulator`` to label files it saves to disk. The simulator
            should return outputs of interest as :class:`numpy.ndarray`.
        var_inp_parameter : list of str
            All variable input parameters of ``simulator``.
        fix_inp : dict, optional
            Key-value pairs for all fixed inputs of ``simulator``.
        o_parameter : str, optional
            Keyword parameter of ``simulator`` used to name files saved
            internally by ``simulator`` (only relevant if ``simulator``
            writes data to disk itself).
        dir_out : str, optional
            Directory in which outputs returned by ``simulator`` are saved.
        save_out : bool, optional
            Whether to save returned values of ``simulator``. If `True`,
            ``dir_out`` must be given.

        Raises
        ------
        ValueError
            If ``save_out`` is `True` but ``dir_out`` is missing or is not
            an existing directory.
        """
        # Validate up front: saving outputs requires a usable directory.
        if save_out:
            if dir_out is None:
                raise ValueError(
                    "dir_out must be defined if save_out is True")
            if not os.path.isdir(dir_out):
                raise ValueError(
                    f"{dir_out} does not exist or is not a directory")

        self.simulator = simulator
        self.var_inp_parameter = var_inp_parameter
        self.fix_inp = fix_inp if fix_inp is not None else {}
        self.o_parameter = o_parameter
        self.dir_out = dir_out
        self.save_out = save_out

        self.var_samples: np.ndarray = np.array([])
        """Samples of variable input parameters."""
        self.outputs: list = []
        """Simulation outputs."""
[docs] @beartype def serial_run( self, var_samples: np.ndarray, prefixes: Union[list[str], None] = None, append: bool = False) -> None: """ Perform serial execution of ``simulator`` at a set of ``var_samples``. Parameters ---------- var_samples : numpy array Samples of variable input parameters. The first axis (:code:`var_samples.shape[0]`) corresponds to the number of samples. The second axis (:code:`var_samples.shape[1]`) corresponds to the number of variable input parameters. prefixes : list of str, optional Contains :code:`len(var_samples)` prefixes. Each of them is used to name corresponding simulation output file. append : bool, optional If `True`, append ``var_samples`` to existing samples and correspondingly append simulation outputs. """ num_pre_run, num_new_run, kwargs_num_new_run, prefixes = \ self._preprocess(var_samples, prefixes, append) for i in range(num_new_run): kwargs = kwargs_num_new_run[i] try: output = self.simulator(**kwargs) self.outputs.append(output) if self.save_out: fname = f"{prefixes[i]}_output.npy" np.save(os.path.join(self.dir_out, fname), output) except Exception as e: print(f"Exception occurred in simulation {prefixes[i]}." f"The error message is: {str(e)}") self.outputs.append(str(e))
[docs] @beartype def parallel_run( self, var_samples: np.ndarray, prefixes: Union[list[str], None] = None, append: bool = False, max_workers: Union[int, None] = None) -> None: """ Perform parallel execution of ``simulator`` at a set of ``var_samples``. Parameters ---------- var_samples : numpy array Samples of variable input parameters. The first axis (:code:`var_samples.shape[0]`) corresponds to the number of samples. The second axis (:code:`var_samples.shape[1]`) corresponds to the number of variable input parameters. prefixes : list of str, optional Contains :code:`len(var_samples)` prefixes. Each of them is used to name corresponding simulation output file. append : bool, optional If `True`, append ``var_samples`` to existing samples and correspondingly append simulation outputs. max_workers : int, optional Controls the maximum number of tasks running in parallel. Default is the number of CPUs on the host. """ num_pre_run, num_new_run, kwargs_num_new_run, prefixes = \ self._preprocess(var_samples, prefixes, append) with ProcessPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(self.simulator, **kwargs) for kwargs in kwargs_num_new_run] for i in range(num_new_run): try: output = futures[i].result() self.outputs.append(output) if self.save_out: fname = f"{prefixes[i]}_output.npy" np.save(os.path.join(self.dir_out, fname), output) except Exception as e: print(f"Exception occurred in simulation {prefixes[i]}." f"The error message is: {str(e)}") self.outputs.append(str(e))
def _preprocess(self, var_samples: np.ndarray, prefixes: list[str], append: bool) -> tuple[int, int, list[dict], list[str]]: """ Preprocess inputs for :meth:`.serial_run` and :meth:`parallel_run`. Parameters ---------- var_samples : numpy array Samples of variable input parameters. The first axis (:code:`var_samples.shape[0]`) corresponds to the number of samples. The second axis (:code:`var_samples.shape[1]`) corresponds to the number of variable input parameters. prefixes : list of str, optional Contains :code:`len(var_samples)` prefixes. Each of them is used to name corresponding simulation output file. append : bool If `True`, append ``var_samples`` to existing samples and correspondingly append simulation outputs. Returns ------- num_pre_run : int Number of existing runs. num_new_run : int Number of new runs. kwargs_num_new_run: list Contains ``num_new_run`` dictionaries, each of which corresponds to ``kwargs`` of one simulation. prefixes : list of str """ if var_samples.ndim == 0 or var_samples.ndim == 1: var_samples = np.reshape(var_samples, (1, -1)) if var_samples.shape[1] != len(self.var_inp_parameter): raise ValueError( "var_samples should have the same number of" " variable inputs as defined by var_input_parameter") num_new_run = len(var_samples) if not append: num_pre_run = 0 self.outputs = [] self.var_samples = var_samples else: num_pre_run = len(self.var_samples) if num_pre_run == 0: raise ValueError( "append is True but no previous var_samples exist") else: self.var_samples = np.vstack((self.var_samples, var_samples)) if prefixes is None: prefixes = [f'sim{i+num_pre_run}' for i in range(num_new_run)] elif len(prefixes) != num_new_run: raise ValueError( "prefixes should have same number of items as var_samples") elif len(set(prefixes)) != num_new_run: raise ValueError("Each element of prefixes should be unique") if self.save_out: new_filenames = \ [f'{prefixes[i]}_output.npy' for i in range(num_new_run)] exist_filenames = os.listdir(self.dir_out) 
common_filenames = set(new_filenames).intersection(exist_filenames) if len(common_filenames) != 0: raise ValueError( f"{self.dir_out} contains files named with prefixes. " f"Please move or delete the files before starting runs, " f"or provide different prefixes.") kwargs_num_new_run = [] for i in range(num_new_run): var_sample = self.var_samples[i+num_pre_run] kwargs = {self.var_inp_parameter[j]: var_sample[j] for j in range(len(var_sample))} kwargs.update(self.fix_inp) if self.o_parameter is not None: kwargs.update({f'{self.o_parameter}': f'{prefixes[i]}'}) kwargs_num_new_run.append(kwargs) return num_pre_run, num_new_run, kwargs_num_new_run, prefixes