Source code for orpheus.cp.solver

"""Collision probability (CP) method for neutron transport.

Solves the multi-group neutron transport equation using the collision
probability method with white boundary condition for an infinite lattice.

The geometry-specific kernel is encapsulated in :class:`CPMesh`, an
augmented geometry that wraps a :class:`~geometry.mesh.Mesh1D` and
provides :meth:`~CPMesh.compute_pinf_group`.  Three coordinate systems
are supported:

* **Cartesian** (slab) — E₃ exponential-integral kernel.
* **Cylindrical** (Wigner-Seitz pin) — Ki₃/Ki₄ Bickley-Naylor kernel.
* **Spherical** (shell) — exponential kernel with y-weighted quadrature.

All share the same power iteration via :class:`CPSolver`.

Two solver modes are available:

* **Jacobi** (default) — computes the scattering source from the
  previous iteration's flux for all groups simultaneously.  Simple,
  no inner iterations.
* **Gauss-Seidel** — sweeps group-by-group from fast to thermal,
  using the updated flux from already-processed groups.  Within each
  group, inner iterations converge the within-group scattering.
  Converges faster for problems with strong thermal self-scatter.

.. seealso:: :ref:`theory-collision-probability` — Key Facts, equations, gotchas.
"""

from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Callable

import numpy as np
from scipy.integrate import quad
from scipy.special import expn

from orpheus.data.macro_xs.mixture import Mixture
from orpheus.data.macro_xs.cell_xs import CellXS, assemble_cell_xs
from orpheus.geometry import CoordSystem, Mesh1D
from orpheus.numerics.eigenvalue import power_iteration


# ═══════════════════════════════════════════════════════════════════════
# Parameters and result container
# ═══════════════════════════════════════════════════════════════════════

[docs] @dataclass class CPParams: """Solver parameters for the collision probability method.""" max_outer: int = 500 keff_tol: float = 1e-6 flux_tol: float = 1e-5 n_ki_table: int = 20000 ki_max: float = 50.0 n_quad_y: int = 64 solver_mode: str = "jacobi" # "jacobi" or "gauss_seidel" inner_tol: float = 1e-8 # convergence for inner iterations (GS only) max_inner: int = 100 # max inner iterations per group (GS only)
[docs] @dataclass class CPResult: """Results of a collision probability calculation.""" keff: float keff_history: list[float] flux: np.ndarray flux_fuel: np.ndarray flux_clad: np.ndarray flux_cool: np.ndarray geometry: Mesh1D eg: np.ndarray elapsed_seconds: float residual_history: list[float] = field(default_factory=list) n_inner: np.ndarray | None = None # (n_outer, ng) inner iteration counts
# ═══════════════════════════════════════════════════════════════════════ # Helper functions (module-level, used by CPMesh) # ═══════════════════════════════════════════════════════════════════════ def _e3(x): """Vectorised E_3(x) = integral_0^1 mu exp(-x/mu) dmu.""" return expn(3, np.maximum(x, 0.0)) def _build_ki_tables( n_pts: int = 20000, x_max: float = 50.0, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Tabulate Ki_3 and Ki_4 on a uniform grid. Ki_3(x) = int_0^{pi/2} exp(-x/sin t) sin t dt Ki_4(x) = int_x^inf Ki_3(t) dt Returns (x_grid, ki3_vals, ki4_vals). """ x_grid = np.linspace(0, x_max, n_pts) ki3_vals = np.empty(n_pts) ki3_vals[0] = 1.0 for i in range(1, n_pts): ki3_vals[i], _ = quad( lambda t, xx=x_grid[i]: np.exp(-xx / np.sin(t)) * np.sin(t), 0, np.pi / 2, ) dx = x_grid[1] - x_grid[0] ki4_vals = np.cumsum(ki3_vals[::-1])[::-1] * dx ki4_vals[-1] = 0.0 return x_grid, ki3_vals, ki4_vals def _ki4_lookup(x, x_grid, ki4_vals): """Vectorised Ki_4 lookup.""" return np.interp(x, x_grid, ki4_vals, right=0.0) def _chord_half_lengths(radii, y_pts): """Half-chord lengths l_k(y) for each annular region. Shape (N, n_y).""" N = len(radii) chords = np.zeros((N, len(y_pts))) r_inner = np.zeros(N) r_inner[1:] = radii[:-1] y2 = y_pts**2 for k in range(N): r_out, r_in = radii[k], r_inner[k] outer = np.sqrt(np.maximum(r_out**2 - y2, 0.0)) if r_in > 0: inner = np.sqrt(np.maximum(r_in**2 - y2, 0.0)) mask = y_pts < r_in chords[k, mask] = outer[mask] - inner[mask] mask_p = (y_pts >= r_in) & (y_pts < r_out) chords[k, mask_p] = outer[mask_p] return chords def _composite_gauss_legendre(breakpoints, n_quad): """Composite Gauss-Legendre quadrature over breakpoint intervals. Returns (pts, wts) concatenated across all intervals. """ gl_pts, gl_wts = np.polynomial.legendre.leggauss(n_quad) y_all, w_all = [], [] for seg in range(len(breakpoints) - 1): a, b = breakpoints[seg], breakpoints[seg + 1] y_all.append(0.5 * (b - a) * gl_pts + 0.5 * (b + a)) w_all.append(0.5 * (b - a) * gl_wts) return np.concatenate(y_all), np.concatenate(w_all) # ═══════════════════════════════════════════════════════════════════════ # CPMesh — augmented geometry for the collision probability method # ═══════════════════════════════════════════════════════════════════════
[docs] class CPMesh: """Augmented geometry for the collision probability method. Wraps a :class:`~geometry.mesh.Mesh1D` and adds the CP-specific kernel, quadrature, and :meth:`compute_pinf_group` method. The kernel is selected automatically based on the mesh coordinate system: * **Cartesian** — E₃ kernel (slab), scalar computation. * **Cylindrical** — Ki₄ kernel, y-quadrature with chord half-lengths. * **Spherical** — exp(-τ) kernel, y-quadrature with y-weighted chords. Parameters ---------- mesh : Mesh1D Base geometry. params : CPParams, optional Solver parameters (Ki table size, quadrature order, etc.). """ def __init__(self, mesh: Mesh1D, params: CPParams | None = None) -> None: self.mesh = mesh self.params = params or CPParams() match mesh.coord: case CoordSystem.CARTESIAN: self._setup_slab() case CoordSystem.CYLINDRICAL: self._setup_cylindrical() case CoordSystem.SPHERICAL: self._setup_spherical() case _: raise ValueError(f"CP not implemented for {mesh.coord}") # ── Setup methods ───────────────────────────────────────────────── def _setup_slab(self) -> None: """Slab: E₃ kernel, no y-quadrature.""" self._chords = None self._y_wts = None self._kernel: Callable | None = None self._kernel_zero = None def _setup_radial_quadrature(self) -> None: """Shared: build y-quadrature and chord half-lengths for cylindrical or spherical meshes.""" p = self.params radii = self.mesh.edges[1:] self._y_pts, self._y_wts = _composite_gauss_legendre( self.mesh.edges, p.n_quad_y, ) self._chords = _chord_half_lengths(radii, self._y_pts) def _setup_cylindrical(self) -> None: """Cylindrical: Ki₄ kernel + y-quadrature.""" p = self.params print(" Building Ki3/Ki4 lookup tables ...") ki_x, _, ki4_v = _build_ki_tables(p.n_ki_table, p.ki_max) self._setup_radial_quadrature() n_y = len(self._y_pts) self._kernel = lambda tau: _ki4_lookup(tau, ki_x, ki4_v) self._kernel_zero = _ki4_lookup(np.zeros(n_y), ki_x, ki4_v) def _setup_spherical(self) -> None: """Spherical: exp(-τ) kernel + y-weighted quadrature.""" self._setup_radial_quadrature() # Kernel F(τ) = exp(-τ) self._kernel = lambda tau: np.exp(-tau) self._kernel_zero = np.ones(len(self._y_pts)) # Spherical weight: extra factor of y in the quadrature self._y_wts = self._y_wts * self._y_pts # ── P_inf computation ─────────────────────────────────────────────
[docs] def compute_pinf_group(self, sig_t_g: np.ndarray) -> np.ndarray: """Compute the infinite-lattice P_inf matrix for one energy group. Parameters ---------- sig_t_g : ndarray, shape (N,) Total macroscopic cross section per cell for this group. Returns ------- ndarray, shape (N, N) Infinite-lattice collision probability matrix. """ match self.mesh.coord: case CoordSystem.CARTESIAN: rcp = self._compute_slab_rcp(sig_t_g) case _: rcp = self._compute_radial_rcp(sig_t_g) P_cell = self._normalize_rcp(rcp, sig_t_g) return self._apply_white_bc(P_cell, sig_t_g)
def _compute_slab_rcp(self, sig_t_g: np.ndarray) -> np.ndarray: """Reduced collision probabilities for slab geometry (E₃ kernel). Returns rcp[i,j] = Σ_t(i) · V(i) · P_cell(i,j) (un-normalized). """ N = self.mesh.N t = self.mesh.widths tau = sig_t_g * t # Optical boundary positions bnd_pos = np.concatenate(([0.0], np.cumsum(tau))) rcp = np.zeros((N, N)) for i in range(N): sti = sig_t_g[i] tau_i = tau[i] if sti <= 0: continue # Self-same collision: Σ_t·V - (F(0) - F(τ_i)) rcp[i, i] += sti * t[i] - (0.5 - _e3(tau_i)) for j in range(N): tau_j = tau[j] # Direct path gap (optical distance between regions) if j > i: gap_d = bnd_pos[j] - bnd_pos[i + 1] elif j < i: gap_d = bnd_pos[i] - bnd_pos[j + 1] else: gap_d = None if gap_d is not None: gap_d = max(gap_d, 0.0) dd = (_e3(gap_d) - _e3(gap_d + tau_i) - _e3(gap_d + tau_j) + _e3(gap_d + tau_i + tau_j)) else: dd = 0.0 # Reflected path (via centre symmetry plane) gap_c = bnd_pos[i] + bnd_pos[j] dc = (_e3(gap_c) - _e3(gap_c + tau_i) - _e3(gap_c + tau_j) + _e3(gap_c + tau_i + tau_j)) rcp[i, j] += 0.5 * (dd + dc) return rcp def _compute_radial_rcp(self, sig_t_g: np.ndarray) -> np.ndarray: """Reduced collision probabilities for radial geometries. Shared by cylindrical (Ki₄ kernel) and spherical (exp kernel). The kernel and y-quadrature weights are set during setup. Returns rcp[i,j] = Σ_t(i) · V(i) · P_cell(i,j) (un-normalized). """ N = self.mesh.N chords = self._chords y_wts = self._y_wts kernel = self._kernel kernel_zero = self._kernel_zero n_y = len(y_wts) tau = sig_t_g[:, None] * chords # (N, n_y) # Optical boundary positions at each y-line bnd_pos = np.zeros((N + 1, n_y)) for k in range(N): bnd_pos[k + 1, :] = bnd_pos[k, :] + tau[k, :] rcp = np.zeros((N, N)) for i in range(N): tau_i = tau[i, :] sti = sig_t_g[i] if sti == 0: continue # Self-same collision self_same = 2.0 * chords[i, :] - (2.0 / sti) * ( kernel_zero - kernel(tau_i) ) rcp[i, i] += 2.0 * sti * np.dot(y_wts, self_same) for j in range(N): tau_j = tau[j, :] # Direct path gap if j > i: gap_d = bnd_pos[j, :] - bnd_pos[i + 1, :] elif j < i: gap_d = bnd_pos[i, :] - bnd_pos[j + 1, :] else: gap_d = None if gap_d is not None: gap_d = np.maximum(gap_d, 0.0) dd = (kernel(gap_d) - kernel(gap_d + tau_i) - kernel(gap_d + tau_j) + kernel(gap_d + tau_i + tau_j)) else: dd = np.zeros(n_y) # Reflected path gap_c = bnd_pos[i, :] + bnd_pos[j, :] dc = (kernel(gap_c) - kernel(gap_c + tau_i) - kernel(gap_c + tau_j) + kernel(gap_c + tau_i + tau_j)) rcp[i, j] += 2.0 * np.dot(y_wts, dd + dc) return rcp def _normalize_rcp( self, rcp: np.ndarray, sig_t_g: np.ndarray, ) -> np.ndarray: """Normalize rcp to get P_cell: P_cell[i,:] = rcp[i,:] / (Σ_t·V).""" N = self.mesh.N V = self.mesh.volumes P_cell = np.zeros((N, N)) for i in range(N): denom = sig_t_g[i] * V[i] if denom > 0: P_cell[i, :] = rcp[i, :] / denom return P_cell def _apply_white_bc( self, P_cell: np.ndarray, sig_t_g: np.ndarray, ) -> np.ndarray: """Apply white boundary condition to get P_inf. P_inf = P_cell + P_out ⊗ P_in / (1 - P_inout) Works for all coordinate systems because mesh.volumes and mesh.surfaces[-1] encode the geometry correctly. """ V = self.mesh.volumes S_cell = self.mesh.surfaces[-1] P_out = np.maximum(1.0 - P_cell.sum(axis=1), 0.0) P_in = sig_t_g * V * P_out / S_cell P_inout = max(1.0 - P_in.sum(), 0.0) P_inf = P_cell.copy() if P_inout < 1.0: P_inf += np.outer(P_out, P_in) / (1.0 - P_inout) return P_inf
# ═══════════════════════════════════════════════════════════════════════ # CP solver class (satisfies EigenvalueSolver protocol) # ═══════════════════════════════════════════════════════════════════════
[docs] class CPSolver: """Geometry-independent CP eigenvalue solver. Once the infinite-lattice CP matrices P_inf are built (by :class:`CPMesh`), the eigenvalue iteration is identical for all geometries: φ_g = P_inf_g^T · (V · Q_g) / (Σ_t · V) where V is the cell volume array and Q is the total source (fission + scattering + n,2n). Two solver modes are supported: * **Jacobi** (default) — all groups updated from previous iteration's flux. Simple, no inner iterations. * **Gauss-Seidel** — groups swept from fast to thermal; each group uses the latest flux from already-processed groups. Inner iterations within each group converge the within-group scattering source. Converges faster for thermal problems with strong self-scatter. """ def __init__( self, P_inf: np.ndarray, xs: CellXS, volumes: np.ndarray, mat_ids: np.ndarray, materials: dict[int, Mixture], keff_tol: float = 1e-6, flux_tol: float = 1e-5, solver_mode: str = "jacobi", inner_tol: float = 1e-8, max_inner: int = 100, ) -> None: self.P_inf = P_inf # (N, N, ng) self.xs = xs self.volumes = volumes # (N,) self.mat_ids = mat_ids self.ng = xs.sig_t.shape[1] self.N = xs.sig_t.shape[0] self.keff_tol = keff_tol self.flux_tol = flux_tol self.solver_mode = solver_mode self.inner_tol = inner_tol self.max_inner = max_inner # Cache scattering and (n,2n) matrices per material self._scat_mats = {mid: materials[mid].SigS[0] for mid in materials} self._n2n_mats = {mid: materials[mid].Sig2 for mid in materials} # Convergence diagnostics (populated during iteration) self.residual_history: list[float] = [] self.n_inner_history: list[np.ndarray] = [] # list of (ng,) arrays
[docs] def initial_flux_distribution(self) -> np.ndarray: return np.ones((self.N, self.ng))
[docs] def compute_fission_source( self, flux_distribution: np.ndarray, keff: float, ) -> np.ndarray: fission_rate = np.sum(self.xs.sig_p * flux_distribution, axis=1) return self.xs.chi * fission_rate[:, np.newaxis] / keff
[docs] def solve_fixed_source( self, fission_source: np.ndarray, flux_distribution: np.ndarray, ) -> np.ndarray: if self.solver_mode == "gauss_seidel": return self._solve_fixed_source_gs(fission_source, flux_distribution) return self._solve_fixed_source_jacobi(fission_source, flux_distribution)
def _solve_fixed_source_jacobi( self, fission_source: np.ndarray, flux_distribution: np.ndarray, ) -> np.ndarray: """Jacobi solver: all groups updated from previous iteration's flux.""" N, ng = self.N, self.ng # Total source = fission + scattering + (n,2n) Q = fission_source.copy() for k in range(N): mid = self.mat_ids[k] Q[k, :] += self._scat_mats[mid].T @ flux_distribution[k, :] Q[k, :] += 2.0 * (self._n2n_mats[mid].T @ flux_distribution[k, :]) # Apply CP matrices: φ_g = P_inf_g^T · (V · Q_g) / (Σ_t · V) phi = np.empty((N, ng)) for g in range(ng): source = self.volumes * Q[:, g] phi[:, g] = self.P_inf[:, :, g].T @ source denom = self.xs.sig_t * self.volumes[:, np.newaxis] pos = denom > 0 phi[pos] = phi[pos] / denom[pos] phi[~pos] = 0.0 # Numerical conditioning (prevent overflow in subsequent iterations) phi *= 1.0 / np.max(phi) return phi def _solve_fixed_source_gs( self, fission_source: np.ndarray, flux_distribution: np.ndarray, ) -> np.ndarray: """Gauss-Seidel solver with inner iterations per group. Sweeps groups from fast (g=0) to thermal (g=ng-1). For each group, inner iterations converge the within-group scattering source by recomputing the scattering from the latest flux (which includes already-updated groups < g). The inner iteration count per group reveals which groups have strong self-scatter (typically thermal groups). """ N, ng = self.N, self.ng phi = flux_distribution.copy() # start from previous estimate n_inner_this_outer = np.zeros(ng, dtype=int) for g in range(ng): for n_in in range(1, self.max_inner + 1): phi_g_old = phi[:, g].copy() # Build total source for group g using latest phi Q_g = fission_source[:, g].copy() for k in range(N): mid = self.mat_ids[k] # Scattering into group g from all groups (using latest phi) scat_row = np.asarray( self._scat_mats[mid].T[g, :].todense() ).ravel() Q_g[k] += scat_row @ phi[k, :] # (n,2n) into group g n2n_row = np.asarray( self._n2n_mats[mid].T[g, :].todense() ).ravel() Q_g[k] += 2.0 * (n2n_row @ phi[k, :]) # Apply CP matrix for group g source_g = self.volumes * Q_g transported_g = self.P_inf[:, :, g].T @ source_g # New flux for group g denom_g = self.xs.sig_t[:, g] * self.volumes phi_g_new = np.zeros(N) pos = denom_g > 0 phi_g_new[pos] = transported_g[pos] / denom_g[pos] phi[:, g] = phi_g_new # Inner convergence: relative change in group flux # Nonzero when within-group self-scatter Σ_s(g→g) causes # the source Q_g to depend on φ_g itself. phi_g_norm = np.linalg.norm(phi_g_new) res_in = (np.linalg.norm(phi_g_new - phi_g_old) / max(phi_g_norm, 1e-30)) if res_in < self.inner_tol: break n_inner_this_outer[g] = n_in self.n_inner_history.append(n_inner_this_outer.copy()) # Numerical conditioning phi *= 1.0 / np.max(phi) return phi def _compute_balance_residual( self, flux_distribution: np.ndarray, keff: float, ) -> float: """Compute the neutron balance residual ||Aφ - B1φ - (1/k)B2φ||₂. This is the L2 norm of the difference between the collision rate (Σ_t V φ) and the transported total source (P^T V Q_total) across all regions and groups. At convergence, this should be zero. """ N, ng = self.N, self.ng V = self.volumes # Collision rate: Σ_t(i,g) · V(i) · φ(i,g) collision = self.xs.sig_t * V[:, np.newaxis] * flux_distribution # Build total source Q = fission + scattering + (n,2n) fission_rate = np.sum(self.xs.sig_p * flux_distribution, axis=1) Q_fis = self.xs.chi * fission_rate[:, np.newaxis] / keff Q = Q_fis.copy() for k in range(N): mid = self.mat_ids[k] Q[k, :] += self._scat_mats[mid].T @ flux_distribution[k, :] Q[k, :] += 2.0 * (self._n2n_mats[mid].T @ flux_distribution[k, :]) # Transported source: P^T · (V · Q) for each group transported = np.empty((N, ng)) for g in range(ng): transported[:, g] = self.P_inf[:, :, g].T @ (V * Q[:, g]) return float(np.linalg.norm(collision - transported))
[docs] def compute_keff(self, flux_distribution: np.ndarray) -> float: v = self.volumes[:, np.newaxis] production = np.sum(self.xs.sig_p * flux_distribution * v) # Net removal = total - scattering - 2*(n,2n) # This is the denominator of the eigenvalue balance: # (Σt - Σs - 2Σ₂)φV = (1/k) χ(νΣf·φ)V total = np.sum(self.xs.sig_t * flux_distribution * v) scatter = 0.0 n2n = 0.0 for k in range(self.N): mid = self.mat_ids[k] scatter += np.sum( (self._scat_mats[mid].T @ flux_distribution[k, :]) * self.volumes[k] ) n2n += np.sum( (self._n2n_mats[mid].T @ flux_distribution[k, :]) * self.volumes[k] ) net_removal = total - scatter - 2.0 * n2n return float(production / net_removal)
[docs] def converged( self, keff: float, keff_old: float, flux_distribution: np.ndarray, flux_old: np.ndarray, iteration: int, ) -> bool: if iteration <= 2: return False delta_k = abs(keff - keff_old) delta_phi = np.max(np.abs(flux_distribution - flux_old)) / \ max(np.max(np.abs(flux_distribution)), 1e-30) # Compute and store the neutron balance residual residual = self._compute_balance_residual(flux_distribution, keff) self.residual_history.append(residual) # Print diagnostics if iteration <= 5 or iteration % 10 == 0: msg = (f" iter {iteration:4d} keff = {keff:.6f} " f"dk = {delta_k:.2e} dphi = {delta_phi:.2e} " f"res = {residual:.2e}") if (self.solver_mode == "gauss_seidel" and self.n_inner_history): n_in = self.n_inner_history[-1] msg += f" inner(max/mean) = {n_in.max()}/{n_in.mean():.1f}" print(msg) if delta_k < self.keff_tol and delta_phi < self.flux_tol: print(f" iter {iteration:4d} keff = {keff:.6f} " f"res = {residual:.2e} Converged.") return True return False
# ═══════════════════════════════════════════════════════════════════════ # Post-processing # ═══════════════════════════════════════════════════════════════════════ def _volume_averaged_fluxes( phi: np.ndarray, volumes: np.ndarray, mat_ids: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Volume-averaged flux per material (fuel=2, clad=1, cool=0).""" ng = phi.shape[1] vol_fuel = volumes[mat_ids == 2].sum() vol_clad = volumes[mat_ids == 1].sum() vol_cool = volumes[mat_ids == 0].sum() flux_fuel = np.zeros(ng) flux_clad = np.zeros(ng) flux_cool = np.zeros(ng) for k in range(len(mat_ids)): v = volumes[k] mid = mat_ids[k] if mid == 2: flux_fuel += phi[k, :] * v / vol_fuel elif mid == 1: flux_clad += phi[k, :] * v / vol_clad else: flux_cool += phi[k, :] * v / vol_cool return flux_fuel, flux_clad, flux_cool # ═══════════════════════════════════════════════════════════════════════ # Public API # ═══════════════════════════════════════════════════════════════════════
[docs] def solve_cp( materials: dict[int, Mixture], mesh: Mesh1D | None = None, params: CPParams | None = None, ) -> CPResult: """Solve the CP eigenvalue problem for any supported geometry. The kernel is selected automatically based on ``mesh.coord``: Cartesian (slab E₃), Cylindrical (Ki₄), or Spherical (exp). Parameters ---------- materials : dict[int, Mixture] Macroscopic cross sections keyed by material ID. mesh : Mesh1D, optional 1-D mesh. Defaults to a cylindrical PWR pin cell via :func:`geometry.factories.pwr_pin_equivalent`. params : CPParams, optional Solver parameters (tolerances, Ki table size, quadrature order). Returns ------- CPResult """ t_start = time.perf_counter() if mesh is None: from orpheus.geometry import pwr_pin_equivalent mesh = pwr_pin_equivalent() if params is None: params = CPParams() _any_mat = next(iter(materials.values())) eg = _any_mat.eg ng = _any_mat.ng N = mesh.N # Build augmented geometry (sets up kernel + quadrature) cp_mesh = CPMesh(mesh, params) xs = assemble_cell_xs(materials, mesh.mat_ids) # Build CP matrices for all energy groups print(f" Computing CP matrices for {ng} groups, {N} regions " f"({mesh.coord.value}) ...") P_inf = np.empty((N, N, ng)) for g in range(ng): P_inf[:, :, g] = cp_mesh.compute_pinf_group(xs.sig_t[:, g]) if (g + 1) % 100 == 0: print(f" group {g + 1}/{ng}") print(" CP matrices done.") # Eigenvalue solve print(f" Starting power iteration (mode: {params.solver_mode}) ...") solver = CPSolver( P_inf, xs, mesh.volumes, mesh.mat_ids, materials, keff_tol=params.keff_tol, flux_tol=params.flux_tol, solver_mode=params.solver_mode, inner_tol=params.inner_tol, max_inner=params.max_inner, ) keff, keff_history, phi = power_iteration(solver, max_iter=params.max_outer) flux_fuel, flux_clad, flux_cool = _volume_averaged_fluxes( phi, mesh.volumes, mesh.mat_ids) # Collect inner iteration diagnostics (GS mode only) n_inner = None if solver.n_inner_history: n_inner = np.array(solver.n_inner_history) # (n_outer, ng) elapsed = time.perf_counter() - t_start print(f" Elapsed: {elapsed:.1f}s") return CPResult( keff=keff, keff_history=keff_history, flux=phi, flux_fuel=flux_fuel, flux_clad=flux_clad, flux_cool=flux_cool, geometry=mesh, eg=eg, elapsed_seconds=elapsed, residual_history=solver.residual_history, n_inner=n_inner, )