Source code for besta.io

"""Module dedicated to read and manipulate the output products of BESTA."""
import os
import functools
import re
import importlib.util
import sys
from pathlib import Path

import numpy as np

import psutil
import cosmosis
from cosmosis import Inifile
from cosmosis.datablock import DataBlock, SectionOptions
from astropy.table import Table

from besta import pipeline_modules
from besta.logging import get_logger, setup_logging
from besta.utils import expand_env_vars

logger = get_logger(__name__)


_NUM_RE = re.compile(r"""
    ^[+-]?
    (?:
        (?:\d+(?:\.\d*)?) | (?:\.\d+)
    )
    (?:[eE][+-]?\d+)?$
""", re.VERBOSE)

[docs] def _split_top_level(s: str, *, split_on_whitespace=True) -> list[str]: """Split a string into top-level tokens, respecting quotes and nested parentheses/brackets.""" s = s.strip() if not s: return [] tokens = [] current = [] quote = None escape = False paren_depth = 0 bracket_depth = 0 def flush_current(): token = "".join(current).strip() if token: tokens.append(token) current.clear() for char in s: if quote is not None: current.append(char) if escape: escape = False elif char == "\\": escape = True elif char == quote: quote = None continue if char in {"'", '"'}: quote = char current.append(char) elif char == "(": paren_depth += 1 current.append(char) elif char == ")": paren_depth = max(0, paren_depth - 1) current.append(char) elif char == "[": bracket_depth += 1 current.append(char) elif char == "]": bracket_depth = max(0, bracket_depth - 1) current.append(char) elif paren_depth == 0 and bracket_depth == 0 and ( char == "," or (split_on_whitespace and char.isspace()) ): flush_current() else: current.append(char) flush_current() return tokens
[docs] def _split_tokens(s: str) -> list[str]: """Split a string into tokens, respecting quotes and nested parentheses/brackets.""" return _split_top_level(s, split_on_whitespace=True)
[docs] def _is_wrapped_group(token: str) -> bool: """Check if a token is a wrapped group like (1, 2, 3) or [1, 2, 3].""" token = token.strip() if len(token) < 2: return False pairs = {"(": ")", "[": "]"} opening = token[0] closing = pairs.get(opening) if closing is None or token[-1] != closing: return False quote = None escape = False depth = 0 for index, char in enumerate(token): if quote is not None: if escape: escape = False elif char == "\\": escape = True elif char == quote: quote = None continue if char in {"'", '"'}: quote = char elif char == opening: depth += 1 elif char == closing: depth -= 1 if depth == 0 and index != len(token) - 1: return False return depth == 0 and quote is None
[docs] def _sequence_to_numpy(values, *, force_sequence=False): """Coerce a list of values into a numpy array if all elements are numeric, otherwise keep as list.""" if not values: return [] if len(values) == 1 and not force_sequence: return values[0] if all(isinstance(x, (int, float)) for x in values): dtype = float if any(isinstance(x, float) for x in values) else int return np.array(values, dtype=dtype) return values
def _parse_group(token: str): inner = token[1:-1].strip() if not inner: return [] values = [_parse_token(part) for part in _split_tokens(inner)] return _sequence_to_numpy(values, force_sequence=True) def _parse_scalar(token: str): low = token.lower() if low in {"true", "yes", "on"}: return True if low in {"false", "no", "off"}: return False if low in {"none", "null"}: return "none" if _NUM_RE.match(token): # choose int vs float if any(c in token for c in ".eE"): return float(token) return int(token) # keep as string (unquote if it looks quoted) if (len(token) >= 2) and ((token[0] == token[-1] == "'") or (token[0] == token[-1] == '"')): return token[1:-1] return token def _parse_token(token: str): token = token.strip() if _is_wrapped_group(token): return _parse_group(token) return _parse_scalar(token) def _split_keyword_arg(token: str): quote = None escape = False paren_depth = 0 bracket_depth = 0 for index, char in enumerate(token): if quote is not None: if escape: escape = False elif char == "\\": escape = True elif char == quote: quote = None continue if char in {"'", '"'}: quote = char elif char == "(": paren_depth += 1 elif char == ")": paren_depth = max(0, paren_depth - 1) elif char == "[": bracket_depth += 1 elif char == "]": bracket_depth = max(0, bracket_depth - 1) elif char == "=" and paren_depth == 0 and bracket_depth == 0: key = token[:index].strip() value = token[index + 1 :].strip() if not key: raise ValueError(f"Invalid keyword argument: {token!r}") return key, value return None def _parse_value(value: str): if value == "": return "" tokens = _split_tokens(value) parsed = [_parse_token(token) for token in tokens] return _sequence_to_numpy(parsed)
[docs] def string_to_func_args(text: str): """Parse a string of the form 'arg1, arg2, key=value' into separate args and kwargs. Parameters ---------- text : str Input string containing positional and keyword arguments. Returns ------- args : list List of positional arguments. kwargs : dict Dictionary of keyword arguments. """ args = [] kwargs = {} seen_keyword = False for token in _split_top_level(text, split_on_whitespace=False): key_value = _split_keyword_arg(token) if key_value is None: if seen_keyword: raise ValueError( "Positional arguments cannot appear after keyword arguments." ) args.append(_parse_value(token)) continue seen_keyword = True key, value = key_value kwargs[key] = _parse_value(value) return args, kwargs
def _ini_file_to_dict(path): ini = Inifile(path) ini_dict = {} values = [ini.items(s) for s in ini.sections()] for sec, params in zip(ini.sections(), values): ini_dict[sec] = {} for k, v in params: ini_dict[sec][k] = _parse_value(v) return ini_dict def _ini_string_to_dict(text): ini = Inifile(None) ini.read_string(text) ini_dict = {} values = [ini.items(s) for s in ini.sections()] for sec, params in zip(ini.sections(), values): ini_dict[sec] = {} for k, v in params: ini_dict[sec][k] = _parse_value(v) return ini_dict
[docs] @expand_env_vars() def make_ini_file(filename, config, ignore_sec="values"): """Create a .ini file from an input configuration. Parameters ---------- filename : str Output file name. config : dict Dictionary containing the configuration parameters. """ logger.info("Writing .ini file: %s", filename) with open(filename, "w") as f: f.write(f"; File generated automatically by BESTA\n") for section in config.keys(): # Ignore the Values section if section.lower() == ignore_sec.lower(): continue f.write(f"[{section}]\n") for key, value in config[section].items(): content = f"{key} = " if type(value) is str: content += " " + value elif type(value) is list: content += " ".join([str(v) for v in value]) # elif (type(value) is float) or (type(value) is int): # content += str(value) elif value is None: content += "None" else: content += str(value) f.write(f"{content}\n") f.write(r"; \(゚▽゚)/")
[docs] def make_values_file(config, overwrite=True, values_sec="values"): """Make a values.ini file from the configuration. Parameters ---------- config : dict Configuration parameters """ values_filename = os.path.expandvars(config["pipeline"]["values"]) if os.path.isfile(values_filename): logger.warning( "File containing the .ini priors already exists at %s", values_filename ) if not overwrite: return else: logger.info("Overwriting file") if values_sec in config: logger.info("Writing values file to: %s", values_filename) make_ini_file(values_filename, config[values_sec], ignore_sec=None)
[docs] @expand_env_vars() def read_results_file(path): """Read the results produced during a CosmoSIS run. Parameters ---------- path : str Path to the file containing the cosmosis results Returns ------- table : :class:`astropy.table.Table` Table containing the results. """ with open(path, "r", encoding="utf-8") as f: header = f.readline().strip("#") columns = header.replace("\n", "").split("\t") matrix = np.atleast_2d(np.loadtxt(path)) table = Table() if matrix.size > 1: for ith, c in enumerate(columns): table.add_column(matrix.T[ith], name=c.lower()) return table
[docs] def load_class_from_path(file_path, class_name): """Load a class object from a Python file path at runtime.""" file_path = Path(file_path) # Create module spec spec = importlib.util.spec_from_file_location( file_path.stem, # module name file_path ) module = importlib.util.module_from_spec(spec) sys.modules[file_path.stem] = module spec.loader.exec_module(module) return getattr(module, class_name)
[docs] class Reader(object): r"""CosmoSIS run results reader. This class is meant to improve the accessibility to the results from a CosmoSIS run. The Reader can be initialised either from a .ini conguration file or from the file containing the results of the run (which implicitly includes the configuration information in the header). Currently, there are two factory methods to initialise the ``Reader``. >>> from besta.io import Reader >>> reader = Reader.from_ini_file(path_to_ini_config_file) >>> reader = Reader.from_results_file(path_to_results_file) From here, users may want to load the results of the run into a table: >>> reader.load_results() >>> reader.results_table.info The ``Reader`` also provides tools for extracting some solutions among the full run. For instance, users may be interested on using the solution that maximizes the likelihood (i.e., the best-fit solution): >>> best_fit = reader.get_maxlike_solution(log_prob="post", as_datablock=True) The first argument tells the function to use the row with the highest value of ``"post"`` (the column naming convention depends on the CosmoSIS sampler used). Alternatively, users might be interested in selecting a set of solutions based on their posterior probability or likelihood. For that purpose, the ``Reader`` includes two methods that allow to pull a subset of the solutions: - Users can retrieve a fraction of the solutions with the highest posterior by calling: >>> reader.get_top_frac_solutions(frac=1) which will return the first top percent of all the solutions. - Users can retrieve a fraction of the solutions that accounts for a given fraction of the cumulative posterior. >>> reader.get_pct_solutions(pct=99) which will return the solutions that account for 99% of the total posterior. Besides selecting solutions, the ``Reader`` also provides access to the modules used during the sampling, allowing to recompute the observable quantities or to evaluate the fit: >>> module = reader.last_module >>> flux_model = module.make_observable(best_fit, parse=True) """ @property def ini(self) -> dict: """Cosmosis .ini configuration file""" return self._ini @ini.setter def ini(self, value): """Set the parsed CosmoSIS configuration dictionary.""" self._ini = value @property def ini_file(self) -> str: """Path to the CosmoSIS configuration file.""" return getattr(self, "_ini_file", None) @ini_file.setter def ini_file(self, value): """Set the path to the CosmoSIS configuration file.""" self._ini_file = value @property def ini_values(self) -> dict: """Cosmosis configuration values.""" return self._ini_values @ini_values.setter def ini_values(self, value): """Set the parsed CosmoSIS values dictionary.""" self._ini_values = value @property def values_file(self) -> str: """Path to the CosmoSIS (prior) values configuration file.""" return getattr(self, "_ini_file", None) @values_file.setter def values_file(self, value): """Set the path to the CosmoSIS values file.""" self._values_file = value @property def modules(self) -> list: """List of modules used in the pipeline as specified in the ini file.""" m = self.ini["pipeline"]["modules"] if isinstance(m, str): return [m] return m @property def module_names(self) -> list: """List of module names used in the pipeline.""" return [mod.replace(" ", "").split("_")[0] + "Module" for mod in self.modules]
[docs] def get_module(self, module_name): """Get a pipeline module by name. Parameters ---------- module_name : str Name of the module to retrieve. Returns ------- module : instance An instance of the requested pipeline module. """ if module_name not in self.modules: raise ValueError(f"Module {module_name} not found in the pipeline.") module = load_class_from_path(self.ini[module_name]["file"], "module") logger.debug(f"Loaded module {module_name} from {self.ini[module_name]['file']}") return module(self.ini, alias=module_name)
# if not hasattr(pipeline_modules, module_class): # raise ValueError( # f"Module class {module_class} not found in besta.pipeline_modules.") # return getattr(pipeline_modules, module_class)(options) #TODO: deprecate @property def last_module(self): """An instance of the last pipeline module used in the run.""" return self.get_module(self.modules[-1]) @property def config(self) -> dict: """Pipeline module configuration.""" return self._config @config.setter def config(self, value): """Set the cached pipeline-module configuration.""" self._config = value @property def results_table(self) -> Table: """Table containing the CosmoSIS run results.""" return self._results_table @results_table.setter def results_table(self, value): """Set the table containing CosmoSIS run results.""" self._results_table = value @property def results_file(self) -> str: """Path to the file containing the CosmoSIS run results.""" return self._results_file @results_file.setter def results_file(self, value): """Set the path to the CosmoSIS results file.""" self._results_file = value def __init__(self, ini_file=None, results_file=None): if ini_file is not None: self.ini_file = ini_file self.ini = self.read_ini_file(self.ini_file) elif results_file is not None: self.results_file = results_file self.ini = self.read_ini_file_from_results(self.results_file) else: raise ValueError("Must provide either ini or results file") self.ini_values = self.read_ini_file(self.ini["pipeline"]["values"]) self.ini_values_free = { (sect, key): val for sect, params in self.ini_values.items() for key, val in params.items() if not isinstance(val, (int, float)) } self.ini_values_fixed = { (sect, key): val for sect, params in self.ini_values.items() for key, val in params.items() if (sect, key) not in self.ini_values_free } self.config = {} # setup logging based on the first module in the pipeline (if any) if self.modules: logging_console = self.ini[self.modules[0]].get("logging_console") logging_level = self.ini[self.modules[0]].get("logging_level", "INFO").upper() logging_overwrite = self.ini[self.modules[0]].get("logging_overwrite", False) logging_file = self.ini[self.modules[0]].get("logging_file", None) setup_logging(level=logging_level, log_file=logging_file, overwrite=logging_overwrite, console=logging_console)
[docs] def load_results(self): """Load the cosmosis run results associated to the ``ini`` file.""" path = self.ini["output"]["filename"] if ".txt" not in path: path += ".txt" self.results_table = read_results_file(path)
[docs] def get_maxlike_solution(self, log_prob="post", as_datablock=False, **kwargs): """Get the maximum likelihood solution. Obtain the maximum likelihood solution from the ``results_table`` Parameters ---------- log_prob : str, optional Column name to use for computing the maximum likelihood. Default is ``post``. as_datablock : bool, optional If ``True``, the solution is converted to a :class:`DataBlock`. **kwargs : Additional arguments for converting the solution into a DataBlock. Returns ------- solution : dict Dictionary containing the solution with the maximum likelihood. See also -------- :func:`solution_to_datablock` """ good_sample = np.isfinite(self.results_table[log_prob]) tab = self.results_table[good_sample] maxlike_pos = np.nanargmax(tab[log_prob].value) if as_datablock: return self.solution_to_datablock(tab[maxlike_pos], **kwargs) else: return self.solution_to_dict(tab[maxlike_pos], **kwargs)
[docs] def get_top_frac_solutions(self, frac=1, log_prob="post", as_datablock=False, **kwargs): """Return a fraction of the solutions sorted by their posterior probability. This method provides a given fraction of the solutions with the highest posterior probability. Parameters ---------- frac : float, optional Percentage of the solutions to return, e.g. ``frac=10`` will return the top 10 per cent with the highest probability. log_prob : str, optional Column name to use for computing the maximum likelihood. Default is ``post``. as_datablock : bool, optional If ``True``, the solution is converted to a :class:`DataBlock`. **kwargs : Additional arguments for converting the solution into a DataBlock. Returns ------- all_solutions : list A list containing the solutions. See also -------- :func:`solution_to_datablock` """ assert frac > 0 and frac <= 100, "Fraction must be in (0, 100]" good_sample = np.isfinite(self.results_table[log_prob]) tab = self.results_table[good_sample] post_sort = np.argsort(tab[log_prob]) # Select the top frac per cent first_row = max(1, np.ceil(post_sort.size / 100 * frac)) solutions = tab[post_sort][-first_row:] if as_datablock: all_solutions = [self.solution_to_datablock(sol, **kwargs) for sol in solutions] else: all_solutions = [self.solution_to_dict(sol, **kwargs) for sol in solutions] return all_solutions
[docs] def get_pct_solutions(self, pct=99, log_prob="post", as_datablock=False, **kwargs): """Return the top percentile solutions. This method sorts all solutions based on their posterior probablity and returns a given fraction. Parameters ---------- pct : float, optional Fraction of the solutions to return, e.g. ``pct=99`` will return the top 1 per cent with the highest probability. log_prob : str, optional Column name to use for computing the maximum likelihood. Default is ``post``. as_datablock : bool, optional If ``True``, the solution is converted to a :class:`DataBlock`. **kwargs : Additional arguments for converting the solution into a DataBlock. Returns ------- all_solutions : list A list containing the solutions. See also -------- :func:`solution_to_datablock` """ if not (0 < pct <= 100): raise ValueError("pct must be in (0, 100].") lp = np.asarray(self.results_table[log_prob]) good = np.isfinite(lp) tab = self.results_table[good] if len(tab) == 0: return [] lp = np.asarray(tab[log_prob], dtype=float) # normalized weights lp_shift = lp - np.max(lp) w = np.exp(lp_shift) w_sum = np.sum(w) if not np.isfinite(w_sum) or w_sum <= 0: # fallback: pick the max only idx_sorted = np.array([np.argmax(lp)]) selected = tab[idx_sorted] else: w /= w_sum # Sort by decreasing weight idx_sorted = np.argsort(w)[::-1] w_sorted = w[idx_sorted] cum = np.cumsum(w_sorted) target = pct / 100.0 # first index where cum >= target, inclusive k = int(np.searchsorted(cum, target, side="left")) + 1 selected = tab[idx_sorted[:k]] if as_datablock: return [self.solution_to_datablock(dict(row), **kwargs) if not isinstance(row, dict) else self.solution_to_datablock(row, **kwargs) for row in selected] return [self.solution_to_dict(row) for row in selected]
[docs] def solution_to_dict(self, sample, *, add_fixed=True, extra_params=None): """TODO""" solution = {} for (sect, name) in self.ini_values_free.keys(): solution[f"{sect}--{name}"] = sample[f"{sect}--{name}"] if add_fixed: for (sect, name), v in self.ini_values_fixed.items(): solution[f"{sect}--{name}"] = v if extra_params is not None: for sect, name in extra_params: solution[f"{sect}--{name}"] = sample[f'{sect}--{name}'] return solution
[docs] def solution_to_datablock(self, solution: dict, add_fixed=True, extra_params=None): """Convert a solution into a DataBlock. Parameters ---------- solution : dict-like A dictionary-like containing the parameter values. add_fixed : bool, optional If True, add the default values of the fixed parameters from the ini values config file. Default is True. extra_params : iterable, optional An iterable containing pairs (section, name) of additional parameters contained in ``solution`` to be included in the datablock. Returns ------- datablock : DataBlock The DataBlock containing the input solution. """ datablock = cosmosis.DataBlock() for (sect, name) in self.ini_values_free.keys(): datablock[sect, name] = solution[f'{sect}--{name}'] if add_fixed: for (sect, name), v in self.ini_values_fixed.items(): datablock[sect, name] = v if extra_params is not None: for sect, name in extra_params: datablock[sect, name] = solution[f'{sect}--{name}'] return datablock
[docs] @classmethod @expand_env_vars(1) def read_ini_file(cls, path): """Read the cosmosis configuration .ini file. Parameters ---------- path : str Path to the file. Returns ------- ini : dict Dictionary containing the information from the ini file. """ logger.info("Reading ini file: %s", path) return _ini_file_to_dict(path)
[docs] @classmethod @expand_env_vars(1) def read_ini_file_from_results(cls, path): """Read the embedded CosmoSIS ini block from a results file. Parameters ---------- path : str Path to the results file containing the ini block. Returns ------- dict Parsed ini configuration stored in the results header. """ with open(path, "r") as file: file_lines = file.readlines() line_start, line_end = [ith for ith, f in enumerate(file_lines) if ( "START_OF_PARAMS_INI" in f) or ("END_OF_PARAMS_INI" in f)] content = "".join([l.replace("## ", "") for l in file_lines[line_start + 1:line_end]]) return _ini_string_to_dict(content)
[docs] @classmethod def from_ini_file(cls, path_to_ini): """Create a reader from a CosmoSIS ini file.""" return cls(ini_file=path_to_ini)
[docs] @classmethod def from_results_file(cls, path_to_results): """Create a reader from a CosmoSIS results file.""" return cls(results_file=path_to_results)