# Source code for besta.pipeline

"""
This module contains the pipeline manager to concatenate multiple modules.
"""

import os
import shlex
import subprocess

import numpy as np
from cosmosis import DataBlock
from matplotlib import pyplot as plt

from besta import io
from besta import pipeline_modules
from besta.logging import get_logger, setup_logging

logger = get_logger(__name__)

class MainPipeline(object):
    """BESTA Pipeline manager.

    Runs a sequence of cosmosis sub-pipelines. After each run, the
    maximum-likelihood solution is read back and used to update the matching
    entries of the next sub-pipeline's module configuration.

    Attributes
    ----------
    pipelines_config : list
        List of dictionaries containing the configuration parameters for each
        subpipeline.
    n_cores_list : list, optional, default=None
        List containing the number of cores to be used on each run. If None,
        every subpipeline will use one single core during runtime. A value of
        -1 requests all cores reported by ``os.cpu_count()``.
    ini_files : list
        List of .ini filenames. A ``None`` entry means the .ini file is
        generated automatically from the corresponding configuration.
    ini_values_files : list
        List of files containing the priors associated to ``ini_files``. A
        ``None`` entry means the values file is generated automatically.
    """

    def __init__(
        self,
        pipeline_configuration_list,
        n_cores_list=None,
        ini_files=None,
        ini_values_files=None,
    ):
        # Configure logging first so that everything afterwards is captured.
        self._parse_logger(pipeline_configuration_list)
        self.pipelines_config = pipeline_configuration_list
        n_pipelines = len(pipeline_configuration_list)
        if n_cores_list is None:
            self.n_cores_list = [1] * n_pipelines
        else:
            self.n_cores_list = n_cores_list
        if ini_files is None:
            self.ini_files = [None] * n_pipelines
        else:
            self.ini_files = ini_files
        if ini_values_files is None:
            self.ini_values_files = [None] * n_pipelines
        else:
            self.ini_values_files = ini_values_files

    @staticmethod
    def _split_modules(modules):
        """Return the module names listed in a ``[pipeline] modules`` entry.

        Names may be separated by commas and/or any amount of whitespace.
        """
        return modules.replace(",", " ").split()

    @staticmethod
    def _build_command(ini_filename, n_cores):
        """Return the shell command that runs cosmosis on ``ini_filename``.

        The filename is shell-quoted so that paths containing spaces or shell
        metacharacters reach cosmosis verbatim. MPI is used when more than one
        core is requested.
        """
        quoted = shlex.quote(ini_filename)
        if n_cores > 1:
            return f"mpiexec -n {n_cores} cosmosis --mpi {quoted}"
        return f"cosmosis {quoted}"

    def _parse_logger(self, pipeline_configuration_list):
        """Configure logging from the first module of the first sub-pipeline.

        The options ``logging_console`` (default False), ``logging_level``
        (default "INFO"), ``logging_overwrite`` (default False) and
        ``logging_file`` (default None) are read from that module's
        configuration section and forwarded to ``setup_logging``. Nothing is
        configured when there are no sub-pipelines or no modules.
        """
        config = next(iter(pipeline_configuration_list), None)
        if config is None:
            return
        modules = self._split_modules(config["pipeline"]["modules"])
        if not modules:
            return
        section = config[modules[0]]
        setup_logging(
            level=section.get("logging_level", "INFO").upper(),
            log_file=section.get("logging_file", None),
            overwrite=section.get("logging_overwrite", False),
            console=section.get("logging_console", False),
        )

    def run_command(self, command):
        """Execute a shell command and return its process exit code.

        Parameters
        ----------
        command : str
            Command line interpreted by the system shell (``shell=True``).
            Any interpolated values must already be quoted by the caller.

        Returns
        -------
        int
            Exit code of the process.
        """
        logger.info(f"Running command >> {command} <<")
        return subprocess.call(command, shell=True)

    def execute_pipeline(
        self, config, n_cores, ini_filename=None, ini_values_filename=None
    ):
        """Execute a sub-pipeline.

        Parameters
        ----------
        config : dict
            Dictionary containing the configuration parameters for setting up
            the subpipeline. It may be modified in place (the ``values`` entry
            of the ``pipeline`` section is set when ``ini_values_filename`` is
            given).
        n_cores : int
            Number of cores to use during runtime; -1 uses all available cores.
        ini_filename : str, optional, default=None
            If provided, this file is used to run cosmosis. Otherwise an .ini
            file is generated next to ``config["output"]["filename"]``.
        ini_values_filename : str, optional, default=None
            If provided, use this file to set the prior values. Otherwise a
            values file is generated from ``config``.

        Returns
        -------
        str or None
            The .ini filename used for the run if cosmosis exited with code 0,
            otherwise None.
        """
        if ini_filename is not None:
            assert os.path.isfile(
                os.path.expandvars(ini_filename)
            ), f"{os.path.expandvars(ini_filename)} not found"

        # The user-provided values file must be registered *before* the .ini
        # file is generated, otherwise the generated file cannot reference it.
        if ini_values_filename is not None:
            assert os.path.isfile(
                ini_values_filename
            ), f"{ini_values_filename} not found"
            config["pipeline"]["values"] = ini_values_filename

        if ini_filename is None:
            ini_filename = os.path.join(
                os.path.dirname(config["output"]["filename"]),
                config["pipeline"]["modules"].replace(" ", "_") + "_auto.ini",
            )
            io.make_ini_file(ini_filename, config)

        if ini_values_filename is None:
            io.make_values_file(config)

        if n_cores == -1:
            # os.cpu_count() returns None when the count cannot be determined.
            n_cores = os.cpu_count() or 1
            logger.info(f"Using all available cores: {n_cores}")

        # Environment variables are expanded here because the path is quoted
        # for the shell, which would otherwise no longer expand them.
        command = self._build_command(os.path.expandvars(ini_filename), n_cores)
        return_code = self.run_command(command)
        if return_code == 0:
            logger.info("Successful run, return code: %s", return_code)
            return ini_filename
        logger.error("Unsuccessful run, return code: %s", return_code)
        return None

    def execute_all(self, plot_result=False):
        """Execute all sub-pipelines sequentially.

        After each successful run the maximum-likelihood solution is extracted
        and, for the next sub-pipeline, every key of each module section that
        also appears in that solution is overwritten with the solution value.

        Parameters
        ----------
        plot_result : bool, optional, default=False
            If True, plot the best-fit solution of every module after each run.

        Returns
        -------
        int
            0 if every sub-pipeline succeeded; 1 as soon as one fails (the
            remaining sub-pipelines are not executed).
        """
        logger.info("Executing all pipelines")
        prev_solution = None
        for subpipe_config, n_cores, ini_filename, ini_values_filename in zip(
            self.pipelines_config,
            self.n_cores_list,
            self.ini_files,
            self.ini_values_files,
        ):
            if prev_solution is not None:
                logger.info("Updating configuration file with previous run results")
                # A sub-pipeline may chain several modules: update each of
                # their sections with the keys it already defines.
                for module in self._split_modules(
                    subpipe_config["pipeline"]["modules"]
                ):
                    section = subpipe_config[module]
                    section.update(
                        (k, v) for k, v in prev_solution.items() if k in section
                    )
            # Execute sub-pipeline
            ini_filename = self.execute_pipeline(
                subpipe_config,
                n_cores,
                ini_filename=ini_filename,
                ini_values_filename=ini_values_filename,
            )
            if ini_filename is None:
                logger.error("Pipeline execution failed, stopping.")
                return 1

            # Extract best solution
            logger.info("Extracting results from the run")
            reader = io.Reader(ini_filename)
            reader.load_results()
            solution = reader.get_maxlike_solution()
            prev_solution = solution.copy()
            logger.info("MaxLike solution: %s", solution)

            if plot_result:
                solution_datablock = reader.solution_to_datablock(prev_solution)
                # Initialise each module to reconstruct and plot the solution
                for par_module in reader.modules:
                    logger.info("Plotting results for module: %s", par_module)
                    pipeline_module = reader.get_module(par_module)
                    figname = subpipe_config["output"].get(
                        "figurename",
                        subpipe_config["output"]["filename"].replace(".txt", "")
                        + f"_{par_module}_best_fit_solution.png",
                    )
                    pipeline_module.plot_solution(
                        solution_datablock, figname=figname
                    )
        return 0