Source code for psimpy.inference.bayes_inference

import sys
import numpy as np
from abc import ABC
from abc import abstractmethod
from typing import Union
from beartype.typing import Callable
from beartype import beartype
from psimpy.sampler import MetropolisHastings
from psimpy.utility import check_bounds

_min_float = 10**(sys.float_info.min_10_exp)
_max_float = 10**(sys.float_info.max_10_exp)
_max_e = np.log(np.array(_max_float, dtype=float))

class BayesInferenceBase(ABC):

    @beartype
    def __init__(
        self,
        ndim: int,
        bounds: Union[np.ndarray, None] = None,
        prior: Union[Callable, None] = None,
        likelihood: Union[Callable, None] = None,
        ln_prior: Union[Callable, None] = None,
        ln_likelihood: Union[Callable, None] = None,
        ln_pxl: Union[Callable, None] = None,
        args_prior: Union[list, None] = None,
        kwgs_prior: Union[dict, None] = None,
        args_likelihood: Union[list, None] = None,
        kwgs_likelihood: Union[dict, None]=None,
        args_ln_pxl: Union[list, None] = None,
        kwgs_ln_pxl: Union[dict, None]=None) -> None:
        """
        Set up basic input for Bayesian inference.
    
        Parameters
        ----------
        ndim : int
            Dimension of parameter ``x``.
        bounds : numpy array
            Upper and lower boundaries of each parameter.
            Shape :code:`(ndim, 2)`. 
            `bounds[:, 0]` corresponds to lower boundaries of each parameter and
            `bounds[:, 1]` to upper boundaries of each parameter. 
        prior : Callable
            Prior probability density function.
            Call with :code:`prior(x, *args_prior, **kwgs_prior)`.
            Return the prior probability density value at ``x``, where ``x`` is
            a one dimension :class:`numpy.ndarray` of shape :code:`(ndim,)`.
        likelihood : Callable
            Likelihood function.
            Call with :code:`likelihood(x, *args_likelihood, **kwgs_likelihood)`.
            Return the likelihood value at ``x``.
        ln_prior : Callable
            Natural logarithm of prior probability density function.
            Call with :code:`ln_prior(x, *args_prior, **kwgs_prior)`.
            Return the value of natural logarithm of prior probability density
            at ``x``.
        ln_likelihood : Callable
            Natural logarithm of likelihood function.
            Call with :code:`ln_likelihood(x, *args_likelihood, **kwgs_likelihood)`.
            Return the value of natural logarithm of likelihood at ``x``.
        ln_pxl : Callable
            Natural logarithm of the product of prior and likelihood.
            Call with :code:`ln_pxl(x, *args_ln_pxl, **kwgs_ln_pxl)`.
            Return the value of natural logarithm of the product of prior and
            likelihood at ``x``.    
        args_prior : list, optional
            Positional arguments for ``prior`` or ``ln_prior``.
        kwgs_prior: dict, optional
            Keyword arguments for ``prior`` or ``ln_prior``.
        args_likelihood : list, optional
            Positional arguments for ``likelihood`` or ``ln_likelihood``.
        kwgs_likelihood : dict, optional
            Keyword arguments for ``likelihood`` or ``ln_likelihood``.
        args_ln_pxl : list, optional
            Positional arguments for ``ln_pxl``.
        kwgs_ln_pxl : dict, optional
            Keyword arguments for ``ln_pxl``.
        """
        self.ndim = ndim            
        self.bounds = bounds

        self.args_prior = () if args_prior is None else args_prior
        self.kwgs_prior = {} if kwgs_prior is None else kwgs_prior

        self.args_likelihood = () if args_likelihood is None else args_likelihood
        self.kwgs_likelihood = {} if kwgs_likelihood is None else kwgs_likelihood

        self.args_ln_pxl = () if args_ln_pxl is None else args_ln_pxl
        self.kwgs_ln_pxl = {} if kwgs_ln_pxl is None else kwgs_ln_pxl

        if ln_pxl is not None:
            self.ln_pxl = ln_pxl
        elif (ln_prior is not None) and (ln_likelihood is not None):
            self.ln_prior = ln_prior
            self.ln_likelihood = ln_likelihood
            self.ln_pxl = self._ln_pxl_1
            self.args_ln_pxl = ()
            self.kwgs_ln_pxl = {}
        elif (prior is not None) and (likelihood is not None):
            self.prior = prior
            self.likelihood = likelihood
            self.ln_pxl = self._ln_pxl_2
            self.args_ln_pxl = ()
            self.kwgs_ln_pxl = {}
        else:
            raise ValueError((
                "One of the following inputs is necessary:"
                " (1) `ln_pxl`"
                " (2) `ln_prior` and `ln_likelihood`"
                " (3) `prior` and `likelihood`"))
    
    def _ln_pxl_1(self, x: np.ndarray) -> float:
        """
        Construct ``ln_pxl``, given ``ln_prior`` and ``ln_likelihood``.
        """
        ln_pxl = self.ln_prior(x, *self.args_prior, **self.kwgs_prior) + \
            self.ln_likelihood(x, *self.args_likelihood, **self.kwgs_likelihood)

        return float(ln_pxl)
    
    def _ln_pxl_2(self, x: np.ndarray) -> float:
        """
        Construct ``ln_pxl``, given ``prior`` and ``likelihood``.
        """
        pxl = self.prior(x, *self.args_prior, **self.kwgs_prior) * \
            self.likelihood(x, *self.args_likelihood, **self.kwgs_likelihood)
        ln_pxl = np.log(np.maximum(pxl, _min_float))
        
        return float(ln_pxl)

    @abstractmethod
    def run(self, *args, **kwgs):
        pass


[docs]class GridEstimation(BayesInferenceBase):
[docs] @beartype def run(self, nbins: Union[int, list[int]] ) -> tuple[np.ndarray, list[np.ndarray]]: """ Use Grid approximation to estimate the posterior. Parameters ---------- nbins : int or list of ints Number of bins for each parameter. If `int`, the same value is used for each parameter. If `list of int`, it should be of length ``ndim``. Returns ------- posterior : numpy array Estimated posterior probability density values at grid points. Shape of :code:`(nbins[0], nbins[1], ..., nbins[ndim])`. x_ndim : list of numpy array Contain ``ndim`` 1d :class:`numpy.ndarray` `x1`, `x2`, ... Each `xi` is a 1d array of length ``nbins[i]``, representing the 1d coordinate array along the i-th axis. """ if isinstance(nbins, int): nbins = [nbins] * self.ndim elif len(nbins) != self.ndim: raise ValueError( "nbins must be an integer or a list of ndim integers") if self.bounds is None: raise ValueError("bounds must be provided for grid estimation") else: check_bounds(self.ndim, self.bounds) steps = (self.bounds[:,1] - self.bounds[:,0]) / np.array(nbins) starts = steps/2 + self.bounds[:,0] stops = self.bounds[:,1] x_ndim = [np.arange(starts[i], stops[i], steps[i]) for i in range(self.ndim)] xx_matrices = np.meshgrid(*x_ndim, indexing='ij') grid_point_coords = np.hstack( tuple(matrix.reshape((-1,1)) for matrix in xx_matrices) ) n = len(grid_point_coords) ln_unnorm_posterior = np.zeros(n) for i in range(n): point_i = grid_point_coords[i,:] ln_unnorm_posterior[i] = \ self.ln_pxl(point_i, *self.args_ln_pxl, **self.kwgs_ln_pxl) ln_unnorm_posterior = ln_unnorm_posterior.reshape(xx_matrices[0].shape) ln_unnorm_posterior = np.where( ln_unnorm_posterior >= _max_e, _max_e, ln_unnorm_posterior) unnorm_posterior = np.exp(ln_unnorm_posterior) posterior = unnorm_posterior / np.sum(unnorm_posterior) / np.prod(steps) return posterior, x_ndim
[docs]class MetropolisHastingsEstimation(BayesInferenceBase):
[docs] def run(self, nsamples: int, mh_sampler: MetropolisHastings) -> tuple[ np.ndarray, np.ndarray]: """ Use Metropolis Hastings sampling to draw samples from the posterior. Parameters ---------- nsamples : int Number of samples to be drawn. mh_sampler : instance of :class:`.MetropolisHastings` Metropolis Hastings sampler. Returns ------- mh_samples : numpy array Samples drawn from the posterior. Shape of :code:`(nsamples, ndim)`. mh_accept : numpy array Shape of :code:`(nsamples,)`. Each element indicates whether the corresponding sample is the proposed new state (value 1) or the old state (value 0). :code:`np.mean(mh_accept)` thus gives the overall acceptance ratio. """ if mh_sampler.ndim != self.ndim: raise RuntimeError( "ndim of mh_sampler and ndim defined in this class must be" " consistent") if type(self.bounds) != type(mh_sampler.bounds): raise RuntimeError( "bounds of mh_sampler and bounds defined in this class must be" " consistent") elif isinstance(self.bounds, np.ndarray) and \ not np.all(np.equal(self.bounds, mh_sampler.bounds)): raise RuntimeError( "bounds of mh_sampler and bounds defined in this class must be" " consistent") mh_sampler.ln_target = self.ln_pxl mh_sampler.args_target = self.args_ln_pxl mh_sampler.kwgs_target = self.kwgs_ln_pxl mh_samples, mh_accept = mh_sampler.sample(nsamples) return mh_samples, mh_accept