Source code for cobi.simulation.cmb

"""
CMB Simulation Module
=====================

This module provides comprehensive functionality for simulating Cosmic Microwave 
Background (CMB) temperature and polarization maps with support for:

- Lensed and unlensed CMB realizations
- Cosmic birefringence effects (isotropic and anisotropic)
- Custom power spectra from CAMB
- Auxiliary scalar fields (for patchy τ models)
- Beam convolution and coordinate transformations

Classes
-------
CMB
    Main class for generating CMB maps with optional birefringence rotation.

Functions
---------
synfast_pol
    Generate polarized CMB maps with optional auxiliary fields.
hp_alm2map_spin
    Fast spin-weighted spherical harmonic transforms.
get_camb_cls
    Compute CMB power spectra using CAMB.

Example
-------
Generate a CMB realization with isotropic birefringence::

    from cobi.simulation import CMB
    
    cmb = CMB(
        nside=512,
        lmax=1500,
        beta=0.35,  # isotropic rotation angle in degrees
        lensing=True,
        libdir='./cmb_sims'
    )
    
    # Get rotated CMB map
    cmb_map = cmb.get_map(idx=0, apply_beta=True)

Notes
-----
The module uses lenspyx for efficient lensing and supports MPI parallelization
for generating large simulation sets.
"""

# General imports
import os
import camb
import numpy as np
import pickle as pl
import healpy as hp
from typing import Dict, Optional, Any, Union, List
import lenspyx
# Local imports
from cobi import mpi
from cobi.utils import Logger, inrad
from cobi.data import CAMB_INI, SPECTRA, ISO_TD_SPECTRA

[docs] def synfast_pol(nside, spectra): """ Generate polarized CMB maps (Q, U) with optional auxiliary field A. Parameters ---------- nside : int HEALPix resolution parameter. spectra : dict Power spectra dictionary with required 'EE' (and optional 'BB', 'AA', 'AE'). Returns ------- (Q_map, U_map) or (Q_map, U_map, A_map) if 'AA' in spectra """ assert 'EE' in spectra, "The 'EE' spectrum must be provided." assert 'AA' in spectra, "The 'AA' spectrum must be provided." lmax = min(len(spectra['EE']) - 1, 3*nside -1) epsilon = 1e-20 EE = spectra['EE'][:lmax+1] BB = spectra.get('BB', np.zeros(lmax+1))[:lmax+1] AE = spectra.get('AE', np.zeros(lmax+1))[:lmax+1] AA = spectra['AA'][:lmax+1] cov = np.empty((lmax+1, 2, 2)) cov[:, 0, 0] = EE cov[:, 0, 1] = AE cov[:, 1, 0] = AE cov[:, 1, 1] = AA nfields = 2 cov += epsilon * np.eye(nfields)[None, :, :] # Cholesky decomposition L_chol = np.empty_like(cov) for l in range(lmax+1): extra_eps = 0.0 while True: try: L_chol[l] = np.linalg.cholesky(cov[l] + extra_eps * np.eye(nfields)) if extra_eps > 0: print(f"Warning: Increased epsilon for l={l} to {extra_eps}") break except np.linalg.LinAlgError: extra_eps = extra_eps * 10 if extra_eps > 0 else epsilon # Allocate alm arrays num_alm = hp.sphtfunc.Alm.getsize(lmax) alm_E = np.zeros(num_alm, dtype=complex) alm_B = np.zeros(num_alm, dtype=complex) alm_A = np.zeros(num_alm, dtype=complex) l_arr, m_arr = hp.Alm.getlm(lmax) # m = 0 m0 = (m_arr == 0) l0 = l_arr[m0] z0 = np.random.normal(0.0, 1.0, size=(m0.sum(), 2)) L0 = L_chol[l0, :, :] a_EA0 = np.einsum('ijk,ik->ij', L0, z0) alm_E[m0] = a_EA0[:, 0] alm_A[m0] = a_EA0[:, 1] # m > 0 mpos = (m_arr > 0) lpos = l_arr[mpos] Npos = mpos.sum() z_real = np.random.normal(0.0, 1.0, size=(Npos, 2)) / np.sqrt(2) z_imag = np.random.normal(0.0, 1.0, size=(Npos, 2)) / np.sqrt(2) L_pos = L_chol[lpos, :, :] a_EA_real = np.einsum('ijk,ik->ij', L_pos, z_real) a_EA_imag = np.einsum('ijk,ik->ij', L_pos, z_imag) alm_E[mpos] = a_EA_real[:, 0] + 1j * a_EA_imag[:, 0] alm_A[mpos] = a_EA_real[:, 1] + 1j * a_EA_imag[:, 1] # B-modes b0 = np.random.normal(0.0, 1.0, size=m0.sum()) alm_B[m0] = b0 * np.sqrt(BB[l_arr[m0]]) b_real = np.random.normal(0.0, 1.0, size=Npos) / np.sqrt(2) b_imag = np.random.normal(0.0, 1.0, size=Npos) / np.sqrt(2) alm_B[mpos] = (b_real + 1j * b_imag) * np.sqrt(BB[l_arr[mpos]]) # Maps Q_map, U_map = hp.alm2map_spin([alm_E, alm_B], nside, spin=2, lmax=lmax) A_map = hp.alm2map(alm_A, nside, lmax=lmax) # Always apply birefringence-like rotation Q_rot = Q_map * np.cos(2 * A_map) - U_map * np.sin(2 * A_map) U_rot = Q_map * np.sin(2 * A_map) + U_map * np.cos(2 * A_map) Q_map, U_map = Q_rot, U_rot return Q_map, U_map, A_map
[docs] class CMB: """ CMB map generator with cosmic birefringence support. This class handles the generation of CMB temperature and polarization maps with optional cosmic birefringence rotation. Supports three birefringence models: - **iso**: Isotropic (constant) rotation angle β - **iso_td**: Time-dependent isotropic model with axion mass parameter - **aniso**: Anisotropic (spatially-varying) rotation from patchy reionization Parameters ---------- libdir : str Directory for caching simulation products and power spectra. nside : int HEALPix resolution parameter (nside = 2^n). model : {'iso', 'iso_td', 'aniso'}, default='iso' Birefringence model type. beta : float, optional Isotropic rotation angle in degrees (required for model='iso'). mass : float, optional Axion mass parameter in units of 10^-22 eV. Must be 1, 1.5, or 6.4 (required for model='iso_td'). Acb : float, optional Amplitude of anisotropic birefringence power spectrum (required for model='aniso'). lensing : bool, default=False Whether to include CMB lensing effects. Acb_sim_config : dict, optional Configuration for anisotropic simulation batches with keys: - 'alpha_vary_index': [start, end] indices for varying α - 'alpha_cons_index': [start, end] indices for constant α - 'null_alpha_index': [start, end] indices for null test (α=0) verbose : bool, default=True Enable logging output. Attributes ---------- nside : int HEALPix resolution. lmax : int Maximum multipole (3*nside - 1). model : str Birefringence model type. beta : float or None Isotropic rotation angle in degrees. lensing : bool Whether lensing is enabled. cls : dict CMB power spectra (TT, EE, BB, TE). Examples -------- Generate isotropic birefringence simulation:: cmb = CMB( libdir='./cmb_sims', nside=512, model='iso', beta=0.35, lensing=True ) # Get rotated CMB map cmb_map = cmb.get_map(idx=0, apply_beta=True) Generate anisotropic birefringence:: cmb = CMB( libdir='./cmb_sims', nside=512, model='aniso', Acb=1e-6, lensing=True ) # Get map with spatially-varying rotation cmb_map = cmb.get_map(idx=0, apply_beta=True) Notes ----- - Uses CAMB for power spectrum generation - Supports efficient caching of realizations - MPI-aware for parallel simulation generation - Lensing implemented via lenspyx """
[docs] def __init__( self, libdir: str, nside: int, model: str = "iso", beta: Optional[float]=None, mass: Optional[float]=None, Acb: Optional[float]=None, lensing: Optional[bool] = False, Acb_sim_config: Optional[Dict[str, List[int]]] = None, verbose: Optional[bool] = True, ): self.logger = Logger(self.__class__.__name__, verbose=verbose if verbose is not None else False) self.basedir = libdir self.nside = nside self.lmax = 3 * nside - 1 self.__set_power__() self.beta = beta self.mass = mass self.Acb = Acb assert model in ["iso", "iso_td","aniso"], "model should be 'iso' or 'aniso'" self.model = model if self.model == "aniso": self.logger.log("Anisotropic cosmic birefringence model selected", level="info") assert self.Acb is not None, "Acb should be provided for anisotropic model" if self.model == "iso": self.logger.log("Isotropic(constant) cosmic birefringence model selected", level="info") assert self.beta is not None, "beta should be provided for isotropic model" if self.model == "iso_td": assert self.mass in [1,1.5,6.4], "mass should be 1, 1.59 or 6.36" self.logger.log("Isotropic(time dep.) cosmic birefringence model selected", level="info") self.lensing = lensing self.Acb_sim_config = Acb_sim_config self.__validate_Acb_sim_config__() self.__set_workspace__() self.__set_seeds__() self.verbose = verbose if verbose is not None else False
[docs] def scale_invariant(self, Acb): ells = np.arange(self.lmax + 1) cl = Acb * 2 * np.pi / ( ells**2 + ells + 1e-30) cl[0], cl[1] = 0, 0 return cl
[docs] def __validate_Acb_sim_config__(self) -> None: """ Validate the Acb_sim_config dictionary structure. """ if self.Acb_sim_config is None: return if self.model != "aniso": self.logger.log("Acb_sim_config is only applicable for anisotropic model. Ignoring.", level="warning") self.Acb_sim_config = None return valid_keys = ['alpha_vary_index', 'alpha_cons_index', 'null_alpha_index'] for key in self.Acb_sim_config.keys(): if key not in valid_keys: raise ValueError(f"Invalid key '{key}' in Acb_sim_config. Valid keys are: {valid_keys}") if not isinstance(self.Acb_sim_config[key], (list, tuple)) or len(self.Acb_sim_config[key]) != 2: raise ValueError(f"Value for '{key}' must be a list/tuple of two integers [start, end]") if self.Acb_sim_config[key][0] >= self.Acb_sim_config[key][1]: raise ValueError(f"Invalid range for '{key}': start must be less than end") self.logger.log(f"Acb simulation configuration validated: {self.Acb_sim_config}", level="info")
[docs] def __get_alpha_mode__(self, idx: int) -> str: """ Determine which alpha mode to use for a given index. Parameters: idx (int): The realization index. Returns: str: One of 'vary', 'constant', or 'null' """ if self.Acb_sim_config is None: return 'vary' # Default behavior for key, (start, end) in self.Acb_sim_config.items(): if start <= idx < end: if key == 'alpha_vary_index': return 'vary' elif key == 'alpha_cons_index': return 'constant' elif key == 'null_alpha_index': return 'null' # If idx is not in any configured range, use default return 'vary'
[docs] def __set_seeds__(self) -> None: """ Sets the seeds for the simulation. """ nos = 700 self.__cseeds__ = np.arange(11111,11111+nos, dtype=int) self.__aseeds__ = np.arange(22222,22222+nos, dtype=int) self.__pseeds__ = np.arange(33333,33333+nos, dtype=int)
[docs] def __set_workspace__(self) -> None: """ Set the workspace for the CMB simulations. """ lens = "lensed" if self.lensing else "gaussian" if self.model == "iso": model = f"iso_beta_{str(self.beta).replace('.','p')}" elif self.model == "iso_td": model = f"iso_mass_{str(self.mass).replace('.','p')}" elif self.model == "aniso": model = f"aniso_alpha_{str(self.Acb)}" else: raise ValueError("Model not implemented yet, only 'iso', 'iso_td', and 'aniso' are supported") self.cmbdir = os.path.join(self.basedir, 'CMB', lens, model, 'cmb') os.makedirs(self.cmbdir, exist_ok=True) if self.lensing: self.phidir = os.path.join(self.basedir, 'CMB', lens, model, 'phi') os.makedirs(self.phidir, exist_ok=True) if self.model == "aniso": self.alphadir = os.path.join(self.basedir, 'CMB', lens, model, 'alpha') os.makedirs(self.alphadir, exist_ok=True)
[docs] def __dl2cl__(self, arr: np.ndarray,unit_only=False) -> np.ndarray: """ Convert Dl to Cl. """ tcmb = 2.7255e6 l = np.arange(len(arr)) dl = l * (l + 1) / (2 * np.pi) if unit_only: return arr * tcmb ** 2 else: arr = arr * tcmb ** 2 / (dl + 1e-30) arr[0] = 0 arr[1] = 0 return arr
[docs] def __td_eb__(self,dl=True) -> np.ndarray: """ Read the E and B mode power spectra from the CMB power spectra data. """ ISO_TD_SPECTRA.directory = self.basedir m = str(self.mass).replace('.','p') spectra = ISO_TD_SPECTRA.data[m] eb = np.zeros(self.lmax + 1) eb[2:] = spectra[:self.lmax + 1 - 2, 5] return self.__dl2cl__(eb,unit_only= dl) # type: ignore
def __td_tb__(self,dl=True) -> np.ndarray: ISO_TD_SPECTRA.directory = self.basedir m = str(self.mass).replace('.', 'p') spectra = ISO_TD_SPECTRA.data[m] tb = np.zeros(self.lmax + 1) tb[2:] = spectra[:self.lmax + 1 - 2, 6] return self.__dl2cl__(tb,unit_only= dl)
[docs] def compute_powers(self) -> Dict[str, Any]: """ compute the CMB power spectra using CAMB. """ CAMB_INI.directory = self.basedir params = CAMB_INI.data results = camb.get_results(params) powers = {} powers["cls"] = results.get_cmb_power_spectra( params, CMB_unit="muK", raw_cl=True ) powers["dls"] = results.get_cmb_power_spectra( params, CMB_unit="muK", raw_cl=False ) if mpi.rank == 0: pl.dump(powers, open(self.__spectra_file__, "wb")) mpi.barrier() return powers
[docs] def get_power(self, dl: bool = True) -> Dict[str, np.ndarray]: """ Get the CMB power spectra. Parameters: dl (bool): If True, return the power spectra with dl factor else without dl factor. Returns: Dict[str, np.ndarray]: A dictionary containing the CMB power spectra. """ return self.powers["dls"] if dl else self.powers["cls"]
def __set_power__(self) -> None: SPECTRA.directory = self.basedir self.__spectra_file__ = SPECTRA.fname if os.path.isfile(self.__spectra_file__): self.logger.log("Loading CMB power spectra from file", level="info") self.powers = pl.load(open(self.__spectra_file__, "rb")) else: self.powers = SPECTRA.data lmax_infile = len(self.powers['cls']['lensed_scalar'][:, 0]) if lmax_infile < self.lmax: self.logger.log("CMB power spectra file does not contain enough data", level="warning") self.logger.log("Computing CMB power spectra", level="info") self.powers = self.compute_powers() #TODO: feed the lmax to the compute_powers method self.logger.log("CMB power spectra computed doesn't guarantee the lmax", level="critical") else: self.logger.log("CMB power spectra file is up-to-date", level="info") ################ Power spectra retrieval methods ###############
[docs] def get_unlensed_spectra( self, dl: bool = True, dtype: str = "d" ) -> Union[Dict[str, Any], np.ndarray]: """ Retrieve the unlensed scalar spectra from the power spectrum data. Parameters: dl (bool, optional): If True, returns Dl (C_l * l * (l + 1) / 2π). Defaults to True. dtype (str, optional): Specifies the format of the returned spectra. - 'd' returns a dictionary with keys 'tt', 'ee', 'bb', 'te'. - 'a' returns the full array of power spectra. Defaults to 'd'. Returns: Union[Dict[str, Any], np.ndarray]: - A dictionary containing individual power spectra for 'tt', 'ee', 'bb', 'te' if dtype is 'd'. - The full array of unlensed scalar power spectra if dtype is 'a'. Raises: ValueError: If `dtype` is not 'd' or 'a'. """ powers = self.get_power(dl)["unlensed_scalar"] if dtype == "d": pow = {} pow["tt"] = powers[:, 0] pow["ee"] = powers[:, 1] pow["bb"] = powers[:, 2] pow["te"] = powers[:, 3] return pow elif dtype == "a": return powers else: raise ValueError("dtype should be 'd' or 'a'")
[docs] def get_lensed_spectra( self, dl: bool = True, dtype: str = "d" ) -> Union[Dict[str, Any], np.ndarray]: """ Retrieve the lensed scalar spectra from the power spectrum data. Parameters: dl (bool, optional): If True, returns Dl (C_l * l * (l + 1) / 2π). Defaults to True. dtype (str, optional): Specifies the format of the returned spectra. - 'd' returns a dictionary with keys 'tt', 'ee', 'bb', 'te'. - 'a' returns the full array of power spectra. Defaults to 'd'. Returns: Union[Dict[str, Any], np.ndarray]: - A dictionary containing individual power spectra for 'tt', 'ee', 'bb', 'te' if dtype is 'd'. - The full array of lensed scalar power spectra if dtype is 'a'. Raises: ValueError: If `dtype` is not 'd' or 'a'. """ powers = self.get_power(dl)["lensed_scalar"] if dtype == "d": pow = {} pow["tt"] = powers[:, 0] pow["ee"] = powers[:, 1] pow["bb"] = powers[:, 2] pow["te"] = powers[:, 3] return pow elif dtype == "a": return powers else: raise ValueError("dtype should be 'd' or 'a'")
[docs] def get_cb_unlensed_spectra( self, beta: float = 0.0, dl: bool = True, dtype: str = "d", new: bool = False ) -> Union[Dict[str, np.ndarray], np.ndarray]: """ Calculate the cosmic birefringence (CB) unlensed spectra with a given rotation angle `beta`. Parameters: beta (float, optional): The rotation angle in degrees for the cosmic birefringence effect. Defaults to 0.3 degrees. dl (bool, optional): If True, returns Dl (C_l * l * (l + 1) / 2π). Defaults to True. dtype (str, optional): Specifies the format of the returned spectra. - 'd' returns a dictionary with keys 'tt', 'ee', 'bb', 'te', 'eb', 'tb'. - 'a' returns an array of power spectra. Defaults to 'd'. new (bool, optional): Determines the ordering of the spectra in the array if dtype is 'a'. If True, returns [TT, EE, BB, TE, EB, TB]. If False, returns [TT, TE, TB, EE, EB, BB]. Defaults to False. Returns: Union[Dict[str, np.ndarray], np.ndarray]: - A dictionary containing individual CB unlensed power spectra for 'tt', 'ee', 'bb', 'te', 'eb', 'tb' if dtype is 'd'. - An array of CB unlensed power spectra with ordering determined by the `new` parameter if dtype is 'a'. Raises: ValueError: If `dtype` is not 'd' or 'a'. Notes: The method applies a rotation by `alpha` degrees to the E and B mode spectra to account for cosmic birefringence. """ powers = self.get_unlensed_spectra(dl=dl) pow = {} pow["tt"] = powers["tt"] pow["te"] = powers["te"] * np.cos(2 * inrad(beta)) pow["ee"] = (powers["ee"] * np.cos(inrad(2 * beta)) ** 2) + (powers["bb"] * np.sin(inrad(2 * beta)) ** 2) pow["bb"] = (powers["ee"] * np.sin(inrad(2 * beta)) ** 2) + (powers["bb"] * np.cos(inrad(2 * beta)) ** 2) pow["eb"] = 0.5 * (powers["ee"] - powers["bb"]) * np.sin(inrad(4 * beta)) pow["tb"] = powers["te"] * np.sin(2 * inrad(beta)) if dtype == "d": return pow elif dtype == "a": if new: # TT, EE, BB, TE, EB, TB return np.array( [pow["tt"], pow["ee"], pow["bb"], pow["te"], pow["eb"], pow["tb"]] ) else: # TT, TE, TB, EE, EB, BB return np.array( [pow["tt"], pow["te"], pow["tb"], pow["ee"], pow["eb"], pow["bb"]] ) else: raise ValueError("dtype should be 'd' or 'a'")
[docs] def get_cb_unlensed_mass_spectra( self, dl: bool = True, dtype: str = "d", new: bool = False ) -> Union[Dict[str, np.ndarray], np.ndarray]: powers = self.get_unlensed_spectra(dl=dl) pow = {} pow["tt"] = powers["tt"] pow["te"] = powers["te"] pow["ee"] = powers["ee"] pow["bb"] = powers["bb"] internal_len = len(powers["tt"]) _eb_ = self.__td_eb__(dl=dl) _tb_ = self.__td_tb__(dl=dl) eb = np.zeros(len(powers["tt"])) tb = np.zeros(len(powers["tt"])) eb[2:len(_eb_)] = _eb_[2:] tb[2:len(_eb_)] = _tb_[2:] pow["eb"] = eb pow["tb"] = tb if dtype == "d": return pow elif dtype == "a": if new: # TT, EE, BB, TE, EB, TB return np.array( [pow["tt"], pow["ee"], pow["bb"], pow["te"], pow["eb"], pow["tb"]] ) else: # TT, TE, TB, EE, EB, BB return np.array( [pow["tt"], pow["te"], pow["tb"], pow["ee"], pow["eb"], pow["bb"]] ) else: raise ValueError("dtype should be 'd' or 'a'")
[docs] def get_cb_lensed_spectra( self, beta: float = 0.0, dl: bool = True, dtype: str = "d", new: bool = False ) -> Union[Dict[str, np.ndarray], np.ndarray]: """ Calculate the cosmic birefringence (CB) lensed spectra with a given rotation angle `beta`. Parameters: beta (float, optional): The rotation angle in degrees for the cosmic birefringence effect. Defaults to 0.3 degrees. dl (bool, optional): If True, returns Dl (C_l * l * (l + 1) / 2π). Defaults to True. dtype (str, optional): Specifies the format of the returned spectra. - 'd' returns a dictionary with keys 'tt', 'ee', 'bb', 'te', 'eb', 'tb'. - 'a' returns an array of power spectra. Defaults to 'd'. new (bool, optional): Determines the ordering of the spectra in the array if dtype is 'a'. If True, returns [TT, EE, BB, TE, EB, TB]. If False, returns [TT, TE, TB, EE, EB, BB]. Defaults to False. Returns: Union[Dict[str, np.ndarray], np.ndarray]: - A dictionary containing individual CB lensed power spectra for 'tt', 'ee', 'bb', 'te', 'eb', 'tb' if dtype is 'd'. - An array of CB lensed power spectra with ordering determined by the `new` parameter if dtype is 'a'. Raises: ValueError: If `dtype` is not 'd' or 'a'. Notes: The method applies a rotation by `alpha` degrees to the E and B mode spectra to account for cosmic birefringence. """ powers = self.get_lensed_spectra(dl=dl) if beta == 0.0: if self.beta is None: pass else: beta = self.beta pow = {} pow["tt"] = powers["tt"] pow["te"] = powers["te"] * np.cos(2 * inrad(beta)) # type: ignore pow["ee"] = (powers["ee"] * np.cos(inrad(2 * beta)) ** 2) + (powers["bb"] * np.sin(inrad(2 * beta)) ** 2) # type: ignore pow["bb"] = (powers["ee"] * np.sin(inrad(2 * beta)) ** 2) + (powers["bb"] * np.cos(inrad(2 * beta)) ** 2) # type: ignore pow["eb"] = 0.5 * (powers["ee"] - powers["bb"]) * np.sin(inrad(4 * beta)) # type: ignore pow["tb"] = powers["te"] * np.sin(2 * inrad(beta)) # type: ignore if dtype == "d": return pow elif dtype == "a": if new: # TT, EE, BB, TE, EB, TB return np.array( [pow["tt"], pow["ee"], pow["bb"], pow["te"], pow["eb"], pow["tb"]] ) else: # TT, TE, TB, EE, EB, BB return np.array( [pow["tt"], pow["te"], pow["tb"], pow["ee"], pow["eb"], pow["bb"]] ) else: raise ValueError("dtype should be 'd' or 'a'")
[docs] def get_cb_lensed_mass_spectra( self, dl: bool = True, dtype: str = "d", new: bool = False ) -> Union[Dict[str, np.ndarray], np.ndarray]: powers = self.get_lensed_spectra(dl=dl) pow = {} pow["tt"] = powers["tt"] pow["te"] = powers["te"] pow["ee"] = powers["ee"] pow["bb"] = powers["bb"] internal_len = len(powers["tt"]) _eb_ = self.__td_eb__(dl=dl) _tb_ = self.__td_tb__(dl=dl) eb = np.zeros(len(powers["tt"])) tb = np.zeros(len(powers["tt"])) eb[2:len(_eb_)] = _eb_[2:] tb[2:len(_eb_)] = _tb_[2:] pow["eb"] = eb pow["tb"] = tb if dtype == "d": return pow elif dtype == "a": if new: # TT, EE, BB, TE, EB, TB return np.array( [pow["tt"], pow["ee"], pow["bb"], pow["te"], pow["eb"], pow["tb"]] ) else: # TT, TE, TB, EE, EB, BB return np.array( [pow["tt"], pow["te"], pow["tb"], pow["ee"], pow["eb"], pow["bb"]] ) else: raise ValueError("dtype should be 'd' or 'a'")
[docs] def cl_aa(self): """ Compute the Cl_AA power spectrum for the anisotropic model. """ assert self.Acb is not None, "Acb should be provided for anisotropic model" return self.scale_invariant(self.Acb)
[docs] def cl_pp(self): powers = self.get_power(dl=False)['lens_potential'] return powers[:, 0]
################ CMB map generation methods ############### ###### Isotropic models ###### ###### Real space lensed ######
[docs] def get_iso_const_cb_real_lensed_QU(self, idx: int) -> List[np.ndarray]: fname = os.path.join( self.cmbdir, f"sims_nside{self.nside}_{idx:03d}.fits", ) if os.path.isfile(fname): return hp.read_map(fname, field=[0, 1]) else: spectra = self.get_cb_unlensed_spectra( beta=self.beta if self.beta is not None else 0.0, dl=False, ) alms = hp.synalm( [spectra["tt"], spectra["ee"], spectra["bb"], spectra["te"], spectra["eb"], spectra["tb"]], lmax=self.lmax, new=True, ) defl = self.grad_phi_alm(idx) geom_info = ('healpix', {'nside':self.nside}) Qlen, Ulen = lenspyx.alm2lenmap_spin([alms[1],alms[2]], defl, 2, geometry=geom_info, verbose=int(self.verbose)) hp.write_map(fname, [Qlen, Ulen], dtype=np.float64) return [Qlen, Ulen]
[docs] def get_iso_td_cb_real_lensed_QU(self, idx: int) -> List[np.ndarray]: fname = os.path.join( self.cmbdir, f"sims_nside{self.nside}_{idx:03d}.fits", ) if os.path.isfile(fname): return hp.read_map(fname, field=[0, 1]) else: spectra = self.get_cb_unlensed_mass_spectra( dl=False, ) alms = hp.synalm( [spectra["tt"], spectra["ee"], spectra["bb"], spectra["te"], spectra["eb"], spectra["tb"]], lmax=self.lmax, new=True, ) defl = self.grad_phi_alm(idx) geom_info = ('healpix', {'nside':self.nside}) Qlen, Ulen = lenspyx.alm2lenmap_spin([alms[1],alms[2]], defl, 2, geometry=geom_info, verbose=int(self.verbose)) hp.write_map(fname, [Qlen, Ulen], dtype=np.float64) return [Qlen, Ulen]
[docs] def get_iso_model_cb_real_lensed_QU(self,idx: int) -> List[np.ndarray]: if self.model == "iso": return self.get_iso_const_cb_real_lensed_QU(idx) elif self.model == "iso_td": return self.get_iso_td_cb_real_lensed_QU(idx) else: raise NotImplementedError("Model not implemented yet")
################ CMB map generation methods ############### ###### Isotropic models ###### ###### Gaussian lensed ######
[docs] def get_iso_const_cb_gaussian_lensed_QU(self, idx: int) -> List[np.ndarray]: """ Generate or retrieve the Q and U Stokes parameters after applying cosmic birefringence. Parameters: idx (int): Index for the realization of the CMB map. Returns: List[np.ndarray]: A list containing the Q and U Stokes parameter maps as NumPy arrays. Notes: The method applies a rotation to the E and B mode spherical harmonics to simulate the effect of cosmic birefringence. If the map for the given `idx` exists in the specified directory, it reads the map from the file. Otherwise, it generates the Q and U maps, applies the birefringence, and saves the resulting map to a FITS file. """ fname = os.path.join( self.cmbdir, f"sims_nside{self.nside}_{idx:03d}.fits", ) if os.path.isfile(fname): return hp.read_map(fname, field=[0, 1]) # type: ignore else: spectra = self.get_cb_lensed_spectra( beta=self.beta if self.beta is not None else 0.0, dl=False, ) # PDP: spectra start at ell=0, we are fine np.random.seed(self.__cseeds__[idx]) T, E, B = hp.synalm( [spectra["tt"], spectra["ee"], spectra["bb"], spectra["te"], spectra["eb"], spectra["tb"]], lmax=self.lmax, new=True, ) del T QU = hp.alm2map_spin([E, B], self.nside, 2, lmax=self.lmax) hp.write_map(fname, QU, dtype=np.float32) return QU
[docs] def get_iso_td_cb_gaussian_lensed_QU(self, idx: int) -> List[np.ndarray]: raise NotImplementedError("Model not implemented yet")
[docs] def get_iso_model_cb_gaussian_lensed_QU(self,idx: int) -> List[np.ndarray]: if self.model == "iso": return self.get_iso_const_cb_gaussian_lensed_QU(idx) elif self.model == "iso_td": return self.get_iso_td_cb_gaussian_lensed_QU(idx) else: raise NotImplementedError("Model not implemented yet")
################ CMB map generation methods ############### ###### Isotropic models ######
[docs] def get_iso_model_cb_lensed_QU(self,idx: int) -> List[np.ndarray]: if self.lensing: return self.get_iso_model_cb_real_lensed_QU(idx) else: return self.get_iso_model_cb_gaussian_lensed_QU(idx)
################ CMB map generation methods ############### ###### Secondary Source Fields(alms) ######
[docs] def alpha_alm(self, idx: int) -> np.ndarray: """ Generate the alpha alm for the anisotropic model. Parameters: idx (int): Index for the realization of the CMB map. Returns: np.ndarray: The alpha alm as a NumPy array. Notes: The method generates the alpha alm for the anisotropic model. The alpha alm is generated as a random realization of the Cl_AA power spectrum. The behavior depends on Acb_sim_config: - 'vary': Each idx gets a unique seed (default) - 'constant': All indices in this range use the same fixed seed - 'null': Returns zeros (no rotation) """ mode = self.__get_alpha_mode__(idx) if mode == 'null': # Return zero alm (no rotation) return np.zeros(hp.Alm.getsize(self.lmax), dtype=complex) cl_aa = self.cl_aa() cl_aa[0] = 0 if mode == 'constant': # Use a fixed seed for constant alpha across this range # Use the first seed as the constant seed np.random.seed(44444) else: # mode == 'vary' # Use index-specific seed (default behavior) np.random.seed(self.__aseeds__[idx]) alm = hp.synalm(cl_aa, lmax=self.lmax, new=True) return alm
[docs] def phi_alm(self, idx: int) -> np.ndarray: fname = os.path.join(self.phidir, f"phi_Lmax{self.lmax}_{idx:03d}.fits") if os.path.isfile(fname): return hp.read_alm(fname) else: cl_pp = self.cl_pp() np.random.seed(self.__pseeds__[idx]) alm = hp.synalm(cl_pp, lmax=self.lmax, new=True) hp.write_alm(fname, alm) return alm
[docs] def grad_phi_alm(self, idx: int) -> np.ndarray: phi_alm = self.phi_alm(idx) return hp.almxfl(phi_alm, np.sqrt(np.arange(self.lmax + 1, dtype=float) * np.arange(1, self.lmax + 2)), None, False)
################ CMB map generation methods ############### ###### Anisotropic models ###### ###### Source Fields(map) ######
[docs] def alpha_map(self, idx: int) -> np.ndarray: """ Generate a map of the rotation angle alpha for the anisotropic model. Parameters: idx (int): Index for the realization of the CMB map. Returns: np.ndarray: A map of the rotation angle alpha as a NumPy array. Notes: The method generates a map of the rotation angle alpha for the anisotropic model. The map is generated as a random realization of the Cl_AA power spectrum. """ fname = os.path.join( self.alphadir, f"alpha_nside{self.nside}_{idx:03d}.fits", ) if os.path.isfile(fname): return hp.read_map(fname) else: alm = self.alpha_alm(idx) alpha = hp.alm2map(alm, self.nside) hp.write_map(fname, alpha, dtype=np.float64) return alpha # type: ignore
################ CMB map generation methods ############### ###### Anisotropic models ###### ###### Real space lensed ######
[docs] def get_aniso_real_lensed_QU(self, idx: int) -> List[np.ndarray]: fname = os.path.join( self.cmbdir, f"sims_nside{self.nside}_{idx:03d}.fits", ) if os.path.isfile(fname): return hp.read_map(fname, field=[0, 1]) else: spectra = self.get_unlensed_spectra(dl=False) np.random.seed(self.__cseeds__[idx]) Q,U = hp.synfast( [spectra["tt"], spectra["ee"], spectra["bb"], spectra["te"]], nside=self.nside, new=True, )[1:] # Check if rotation should be applied based on configuration mode = self.__get_alpha_mode__(idx) if self.Acb != 0 and mode != 'null': alpha = self.alpha_map(idx) rQ = Q * np.cos(2 * alpha) - U * np.sin(2 * alpha) rU = Q * np.sin(2 * alpha) + U * np.cos(2 * alpha) del (Q, U, alpha) elm, blm = hp.map2alm_spin([rQ, rU], 2) else: elm, blm = hp.map2alm_spin([Q, U], 2) defl = self.grad_phi_alm(idx) geom_info = ('healpix', {'nside':self.nside}) Q, U = lenspyx.alm2lenmap_spin([elm,blm], defl, 2, geometry=geom_info, verbose=int(self.verbose)) del (elm, blm, defl) hp.write_map(fname, [Q, U], dtype=np.float64) return [Q, U]
################ CMB map generation methods ############### ###### Anisotropic models ###### ###### Gaussian lensed ######
[docs] def get_aniso_gauss_lensed_QU(self, idx: int) -> List[np.ndarray]: fname = os.path.join( self.cmbdir, f"sims_g_nside{self.nside}_{idx:03d}.fits", ) if os.path.isfile(fname): return hp.read_map(fname, field=[0, 1]) else: spectra = self.get_lensed_spectra(dl=False) np.random.seed(self.__cseeds__[idx]) Q, U = hp.synfast( [spectra["tt"], spectra["ee"], spectra["bb"], spectra["te"]], nside=self.nside, new=True, )[1:] # Check if rotation should be applied based on configuration mode = self.__get_alpha_mode__(idx) if self.Acb != 0 and mode != 'null': alpha = self.alpha_map(idx) rQ = Q * np.cos(2 * alpha) - U * np.sin(2 * alpha) rU = Q * np.sin(2 * alpha) + U * np.cos(2 * alpha) del (Q, U, alpha) else: if mode == 'null': self.logger.log(f"Index {idx} in null_alpha range, no rotation applied", level="info") else: self.logger.log("Acb is zero, no rotation applied", level="info") rQ, rU = Q, U hp.write_map(fname, [rQ, rU], dtype=np.float64) return [rQ, rU]
################ CMB map generation methods ############### ###### Anisotropic models ######
[docs] def get_aniso_model_cb_lensed_QU(self,idx: int) -> List[np.ndarray]: if self.lensing: return self.get_aniso_real_lensed_QU(idx) else: return self.get_aniso_gauss_lensed_QU(idx)
################ CMB map generation methods ############### ###### General models ######
[docs] def get_cb_lensed_QU(self, idx: int) -> List[np.ndarray]: if self.model == "iso" or self.model == "iso_td": return self.get_iso_model_cb_lensed_QU(idx) elif self.model == "aniso": return self.get_aniso_model_cb_lensed_QU(idx) else: raise NotImplementedError("Model not implemented yet")