"""Diamond-difference transport sweep for 1D and 2D Cartesian meshes.
Two paths, automatically dispatched:
- **1D cumprod**: for ny=1, mu_y=0 (Gauss-Legendre). Solves the
recurrence via cumulative products — O(nc) numpy ops, ~ms.
- **2D wavefront**: for general 2D. Sweeps cells along anti-diagonals
(i+j=const), vectorized within each diagonal.
Both paths use the precomputed streaming stencil from :class:`SNMesh`
to avoid redundant per-ordinate per-cell divisions.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import numpy as np
from .quadrature import AngularQuadrature
if TYPE_CHECKING:
from .geometry import SNMesh
[docs]
def transport_sweep(
Q: np.ndarray,
sig_t: np.ndarray,
sn_mesh: SNMesh,
psi_bc: dict,
Q_aniso: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Perform one full diamond-difference transport sweep.
Parameters
----------
Q : (nx, ny, ng) isotropic source density.
sig_t : (nx, ny, ng) total macroscopic cross section.
sn_mesh : SNMesh — augmented geometry with precomputed stencil.
psi_bc : mutable dict storing persistent boundary fluxes
for reflective BCs between outer iterations.
Q_aniso : (N, nx, ny, ng) per-ordinate anisotropic source (P1+
scattering). None for isotropic-only (P0).
Returns
-------
angular_flux : (N, nx, ny, ng) angular flux per ordinate.
scalar_flux : (nx, ny, ng) = Σ_n w_n ψ_n.
"""
quad = sn_mesh.quad
ny = sn_mesh.ny
if sn_mesh.curvature == "spherical":
return _sweep_1d_spherical(Q, sig_t, sn_mesh, psi_bc)
if sn_mesh.curvature == "cylindrical":
return _sweep_1d_cylindrical(Q, sig_t, sn_mesh, psi_bc)
is_gl_1d = (ny == 1 and np.all(np.abs(quad.mu_y) < 1e-15)
and Q_aniso is None)
if is_gl_1d:
return _sweep_1d_cumprod(Q, sig_t, sn_mesh, psi_bc)
else:
return _sweep_2d_wavefront(Q, sig_t, sn_mesh, psi_bc, Q_aniso)
# ═══════════════════════════════════════════════════════════════════════
# 1D cumprod path (fast, for GL quadrature on slab)
# ═══════════════════════════════════════════════════════════════════════
def _sweep_1d_cumprod(
Q: np.ndarray,
sig_t: np.ndarray,
sn_mesh: SNMesh,
psi_bc: dict,
Q_aniso: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""1D sweep using cumulative products for the DD recurrence.
Uses the precomputed streaming stencil from SNMesh:
streaming_x[n, i] = 2|μ_x[n]| / dx[i].
"""
dx = sn_mesh.dx
nx = len(dx)
ng = Q.shape[2]
quad = sn_mesh.quad
N = quad.N
weights = quad.weights
ref_x = quad.reflection_index("x")
# Squeeze out the ny=1 dimension for 1D arrays
Q_1d = Q[:, 0, :] # (nx, ng)
sig_t_1d = sig_t[:, 0, :] # (nx, ng)
# Precompute DD coefficients for positive directions
n_half = N // 2
mu_pos = np.abs(quad.mu_x[N // 2:]) # positive half
w_pos = weights[N // 2:]
# stream_coeff[n,i,g] = 2μ / (2μ + dx[i]·Σ_t[i,g])
# source_coeff[n,i,g] = 0.5·dx[i] / (2μ + dx[i]·Σ_t[i,g])
denom = 2.0 * mu_pos[:, None, None] + dx[None, :, None] * sig_t_1d[None, :, :]
stream_coeff = 2.0 * mu_pos[:, None, None] / denom
source_coeff = 0.5 * dx[None, :, None] / denom
# Initialize boundary fluxes
if "bc_1d" not in psi_bc:
psi_bc["bc_1d"] = {
"left": np.zeros((n_half, ng)),
"right": np.zeros((n_half, ng)),
}
bc = psi_bc["bc_1d"]
angular_flux = np.zeros((N, nx, 1, ng))
phi = np.zeros((nx, ng))
bQ = source_coeff * Q_1d[None, :, :]
for n in range(n_half):
a = stream_coeff[n] # (nx, ng)
s = bQ[n] # (nx, ng)
# Forward sweep (positive direction)
psi_fwd = _solve_recurrence(a, s, bc["left"][n])
bc["right"][n, :] = _outgoing(a, s, bc["left"][n])
phi += w_pos[n] * psi_fwd
angular_flux[n_half + n, :, 0, :] = psi_fwd # store cell-avg angular flux
# Backward sweep (negative direction via reversal)
psi_bwd = _solve_recurrence(a[::-1], s[::-1], bc["right"][n])
bc["left"][n, :] = _outgoing(a[::-1], s[::-1], bc["right"][n])
phi += w_pos[n] * psi_bwd[::-1]
angular_flux[n_half - 1 - n, :, 0, :] = psi_bwd[::-1]
return angular_flux, phi[:, None, :] # restore ny=1 dim
def _solve_recurrence(
a: np.ndarray, s: np.ndarray, psi0: np.ndarray,
) -> np.ndarray:
"""Solve DD recurrence via cumulative products. Returns cell-average flux."""
nc = a.shape[0]
cp = np.cumprod(a, axis=0)
cs = np.cumsum(s / cp, axis=0)
psi_in = np.empty_like(a)
psi_in[0] = psi0
if nc > 1:
psi_in[1:] = cp[:-1] * (psi0[None, :] + cs[:-1])
psi_out = a * psi_in + s
return 0.5 * (psi_in + psi_out)
def _outgoing(
a: np.ndarray, s: np.ndarray, psi0: np.ndarray,
) -> np.ndarray:
"""Outgoing flux at the end of a forward sweep."""
cp = np.cumprod(a, axis=0)
cs = np.cumsum(s / cp, axis=0)
return cp[-1] * (psi0 + cs[-1])
# ═══════════════════════════════════════════════════════════════════════
# 1D spherical path (cell-by-cell with angular redistribution)
# ═══════════════════════════════════════════════════════════════════════
def _sweep_1d_spherical(
Q: np.ndarray,
sig_t: np.ndarray,
sn_mesh: SNMesh,
psi_bc: dict,
) -> tuple[np.ndarray, np.ndarray]:
r"""Spherical 1-D sweep with geometry-weighted angular redistribution.
Processes ordinates sequentially from most negative :math:`\mu` to
most positive, applying angular redistribution via the :math:`\alpha`
coefficients at each cell.
The balance equation includes a geometry factor
:math:`\Delta A_i / w_n` on the redistribution term
(Bailey et al. 2009), ensuring per-ordinate flat-flux consistency:
.. math::
\psi_{n,i} = \frac{S_i V_i + |\mu_n|(A_{\rm in}+A_{\rm out})
\psi^s_{\rm in} + \frac{\Delta A_i}{w_n}
(\alpha_{n+\frac12}+\alpha_{n-\frac12})\psi_{n-\frac12}}
{2|\mu_n| A^s_{\rm out}
+ 2\frac{\Delta A_i}{w_n}\alpha_{n+\frac12} + \Sigma_t V_i}
The :math:`\alpha` coefficients are computed as
:math:`\alpha_{n+1/2} = \alpha_{n-1/2} - w_n \mu_n` and form a
non-negative dome for μ-sorted ordinates.
"""
nx = sn_mesh.nx
ng = Q.shape[2]
quad = sn_mesh.quad
N = quad.N
mu = quad.mu_x
weights = quad.weights
ref = quad.reflection_index("x")
Q_1d = Q[:, 0, :] # (nx, ng)
sig_t_1d = sig_t[:, 0, :] # (nx, ng)
A = sn_mesh.face_areas # (nx+1,) surface areas at cell faces
V = sn_mesh.volumes[:, 0] # (nx,) cell volumes
alpha = sn_mesh.alpha_half # (N+1,) non-negative dome
dAw = sn_mesh.redist_dAw # (nx, N) precomputed ΔA_i/w_n
tau = sn_mesh.tau_mm # (N,) Morel–Montry angular weights
# Persistent boundary flux at the outer face (per ordinate)
if "bc_sph" not in psi_bc:
psi_bc["bc_sph"] = np.zeros((N, ng))
bc_outer = psi_bc["bc_sph"]
# Angular "face flux" between successive ordinates: ψ_{n-1/2, i}
# Shape (nx, ng). Initialised to zero for the first ordinate (α_{1/2}=0).
psi_angle = np.zeros((nx, ng))
angular_flux = np.zeros((N, nx, 1, ng))
scalar_flux = np.zeros((nx, ng))
# Isotropic source → angular source density by dividing by sum(w)
# Then multiply by cell volume for the balance equation
weight_norm = 1.0 / weights.sum()
QV = Q_1d * V[:, None] * weight_norm # (nx, ng)
for n in range(N):
mu_n = mu[n]
abs_mu = abs(mu_n)
w_n = weights[n]
alpha_in = alpha[n] # α_{n-1/2} ≥ 0 (dome)
alpha_out = alpha[n + 1] # α_{n+1/2} ≥ 0 (dome)
tau_n = tau[n] # M-M angular closure weight
c_out = alpha_out / tau_n # denom coefficient
c_in = (1.0 - tau_n) / tau_n * alpha_out + alpha_in # numer coefficient
if mu_n < 0:
# Inward sweep: outer boundary → centre
psi_spatial_in = bc_outer[ref[n]].copy()
for i in range(nx - 1, -1, -1):
A_in = A[i + 1] # incoming face (outer)
A_out = A[i] # outgoing face (inner)
dA_w = dAw[i, n] # precomputed geometry factor
denom = (2.0 * abs_mu * A_out
+ dA_w * c_out
+ sig_t_1d[i] * V[i])
numer = (QV[i]
+ abs_mu * (A_in + A_out) * psi_spatial_in
+ dA_w * c_in * psi_angle[i])
psi = numer / denom
# WDD closures
psi_spatial_out = 2.0 * psi - psi_spatial_in
psi_angle[i] = (psi - (1.0 - tau_n) * psi_angle[i]) / tau_n
angular_flux[n, i, 0, :] = psi
scalar_flux[i] += w_n * psi
psi_spatial_in = psi_spatial_out
else:
# Outward sweep: centre → outer boundary
# At r=0, A[0] = 4π(0)² = 0, so no spatial incoming flux
psi_spatial_in = np.zeros(ng)
for i in range(nx):
A_in = A[i] # incoming face (inner)
A_out = A[i + 1] # outgoing face (outer)
dA_w = dAw[i, n]
denom = (2.0 * abs_mu * A_out
+ dA_w * c_out
+ sig_t_1d[i] * V[i])
numer = (QV[i]
+ abs_mu * (A_in + A_out) * psi_spatial_in
+ dA_w * c_in * psi_angle[i])
psi = numer / denom
# WDD closures
psi_spatial_out = 2.0 * psi - psi_spatial_in
psi_angle[i] = (psi - (1.0 - tau_n) * psi_angle[i]) / tau_n
angular_flux[n, i, 0, :] = psi
scalar_flux[i] += w_n * psi
psi_spatial_in = psi_spatial_out
# Store outgoing flux at outer boundary for reflective BC
bc_outer[n] = psi_spatial_out
return angular_flux, scalar_flux[:, None, :] # restore ny=1 dim
# ═══════════════════════════════════════════════════════════════════════
# 1D cylindrical path (per-level azimuthal redistribution)
# ═══════════════════════════════════════════════════════════════════════
def _sweep_1d_cylindrical(
Q: np.ndarray,
sig_t: np.ndarray,
sn_mesh: SNMesh,
psi_bc: dict,
) -> tuple[np.ndarray, np.ndarray]:
r"""Cylindrical 1-D sweep with geometry-weighted azimuthal redistribution.
For each μ-level *p*, processes azimuthal ordinates sequentially
from most-inward (:math:`\eta = -\sin\theta`) to most-outward
(:math:`\eta = +\sin\theta`), applying the redistribution
:math:`\alpha_{p,m+1/2}` which couples successive azimuthal
directions on that level.
The balance equation includes a geometry factor
:math:`\Delta A_i / w_m` on the redistribution term
(Bailey et al. 2009), ensuring per-ordinate flat-flux consistency:
.. math::
\psi_{m,i} = \frac{S_i V_i + |\eta_m|(A_{\rm in}+A_{\rm out})
\psi^s_{\rm in} + \frac{\Delta A_i}{w_m}
(\alpha_{m+\frac12}+\alpha_{m-\frac12})\psi_{m-\frac12}}
{2|\eta_m| A^s_{\rm out}
+ 2\frac{\Delta A_i}{w_m}\alpha_{m+\frac12} + \Sigma_t V_i}
The :math:`\alpha` coefficients are computed from the radial
direction cosine :math:`\eta` (Bailey et al. Eq. 50) and form a
non-negative dome, so the denominator is unconditionally positive.
"""
nx = sn_mesh.nx
ng = Q.shape[2]
quad = sn_mesh.quad
N = quad.N
weights = quad.weights
ref = quad.reflection_index("x")
Q_1d = Q[:, 0, :] # (nx, ng)
sig_t_1d = sig_t[:, 0, :] # (nx, ng)
A = sn_mesh.face_areas # (nx+1,) = 2πr at edges
V = sn_mesh.volumes[:, 0] # (nx,) cell volumes
# Persistent boundary flux at the outer face (per ordinate)
if "bc_cyl" not in psi_bc:
psi_bc["bc_cyl"] = np.zeros((N, ng))
bc_outer = psi_bc["bc_cyl"]
angular_flux = np.zeros((N, nx, 1, ng))
scalar_flux = np.zeros((nx, ng))
# Isotropic source → angular source density
weight_norm = 1.0 / weights.sum()
QV = Q_1d * V[:, None] * weight_norm # (nx, ng)
# Process each μ-level independently
for p, level_idx in enumerate(quad.level_indices):
alpha = sn_mesh.alpha_per_level[p] # (M+1,) non-negative dome
dAw = sn_mesh.redist_dAw_per_level[p] # (nx, M) precomputed
tau_level = sn_mesh.tau_mm_per_level[p] # (M,) M-M weights
M = len(level_idx)
# Azimuthal "face flux" between successive ordinates on this level.
# Initialised to zero: α_{1/2} = 0 so the product α·ψ vanishes.
psi_angle = np.zeros((nx, ng))
for m_local in range(M):
n = level_idx[m_local] # global ordinate index
eta_n = quad.mu_x[n] # radial direction cosine
abs_eta = abs(eta_n)
w_n = weights[n]
alpha_in = alpha[m_local] # α_{m-1/2} ≥ 0
alpha_out = alpha[m_local + 1] # α_{m+1/2} ≥ 0
tau_m = tau_level[m_local] # M-M angular closure weight
c_out = alpha_out / tau_m
c_in = (1.0 - tau_m) / tau_m * alpha_out + alpha_in
if eta_n < 0:
# Inward sweep: outer → centre
psi_spatial_in = bc_outer[ref[n]].copy()
for i in range(nx - 1, -1, -1):
A_in = A[i + 1]
A_out = A[i]
dA_w = dAw[i, m_local] # precomputed geometry factor
denom = (2.0 * abs_eta * A_out
+ dA_w * c_out
+ sig_t_1d[i] * V[i])
numer = (QV[i]
+ abs_eta * (A_in + A_out) * psi_spatial_in
+ dA_w * c_in * psi_angle[i])
psi = numer / denom
psi_spatial_out = 2.0 * psi - psi_spatial_in
psi_angle[i] = (psi - (1.0 - tau_m) * psi_angle[i]) / tau_m
angular_flux[n, i, 0, :] = psi
scalar_flux[i] += w_n * psi
psi_spatial_in = psi_spatial_out
elif abs_eta < 1e-15:
# Pure azimuthal ordinate (η≈0): no radial streaming.
for i in range(nx):
dA_w = dAw[i, m_local]
denom = dA_w * c_out + sig_t_1d[i] * V[i]
numer = (QV[i]
+ dA_w * c_in * psi_angle[i])
psi = numer / denom
psi_angle[i] = (psi - (1.0 - tau_m) * psi_angle[i]) / tau_m
angular_flux[n, i, 0, :] = psi
scalar_flux[i] += w_n * psi
else:
# Outward sweep: centre → outer
psi_spatial_in = np.zeros(ng)
for i in range(nx):
A_in = A[i]
A_out = A[i + 1]
dA_w = dAw[i, m_local]
denom = (2.0 * abs_eta * A_out
+ dA_w * c_out
+ sig_t_1d[i] * V[i])
numer = (QV[i]
+ abs_eta * (A_in + A_out) * psi_spatial_in
+ dA_w * c_in * psi_angle[i])
psi = numer / denom
psi_spatial_out = 2.0 * psi - psi_spatial_in
psi_angle[i] = (psi - (1.0 - tau_m) * psi_angle[i]) / tau_m
angular_flux[n, i, 0, :] = psi
scalar_flux[i] += w_n * psi
psi_spatial_in = psi_spatial_out
# Store outgoing at outer boundary for reflective BC
bc_outer[n] = psi_spatial_out
return angular_flux, scalar_flux[:, None, :] # restore ny=1 dim
# ═══════════════════════════════════════════════════════════════════════
# 2D wavefront path (vectorized along anti-diagonals)
# ═══════════════════════════════════════════════════════════════════════
def _sweep_2d_wavefront(
Q: np.ndarray,
sig_t: np.ndarray,
sn_mesh: SNMesh,
psi_bc: dict,
Q_aniso: np.ndarray | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""2D sweep using wavefront parallelism along anti-diagonals.
Uses the precomputed streaming stencil from SNMesh:
streaming_x[n, i] = 2|μ_x[n]| / dx[i],
streaming_y[n, j] = 2|μ_y[n]| / dy[j].
"""
dx = sn_mesh.dx
dy = sn_mesh.dy
nx, ny, ng = Q.shape
quad = sn_mesh.quad
N = quad.N
mu_x = quad.mu_x
mu_y = quad.mu_y
weights = quad.weights
ref_x = quad.reflection_index("x")
ref_y = quad.reflection_index("y")
angular_flux = np.zeros((N, nx, ny, ng))
scalar_flux = np.zeros((nx, ny, ng))
# Persistent boundary flux arrays for reflective BCs
if "bc_2d_x" not in psi_bc:
psi_bc["bc_2d_x"] = np.zeros((N, nx + 1, ny, ng))
psi_bc["bc_2d_y"] = np.zeros((N, nx, ny + 1, ng))
psi_x = psi_bc["bc_2d_x"] # (N, nx+1, ny, ng) face fluxes in x
psi_y = psi_bc["bc_2d_y"] # (N, nx, ny+1, ng) face fluxes in y
weight_norm = 1.0 / weights.sum()
# Precompute diagonal indices per sweep direction (4 directions).
_diag_cache: dict[tuple[int, int], tuple] = {}
for sx in (-1, 1):
for sy in (-1, 1):
ix_arr = np.arange(nx) if sx >= 0 else np.arange(nx - 1, -1, -1)
iy_arr = np.arange(ny) if sy >= 0 else np.arange(ny - 1, -1, -1)
diags = []
for k in range(nx + ny - 1):
i_start = max(0, k - ny + 1)
i_end = min(nx - 1, k)
local_i = np.arange(i_start, i_end + 1)
local_j = k - local_i
diags.append((ix_arr[local_i], iy_arr[local_j]))
_diag_cache[(sx, sy)] = (
0 if sx >= 0 else 1, # ix_in
1 if sx >= 0 else 0, # ix_out
0 if sy >= 0 else 1, # iy_in
1 if sy >= 0 else 0, # iy_out
diags,
)
# Precompute scaled source (avoids recomputing per diagonal)
Q_scaled = Q * weight_norm
has_aniso = Q_aniso is not None
if has_aniso:
Q_aniso_scaled = Q_aniso * weight_norm # (N, nx, ny, ng)
# Precomputed streaming stencil
str_x = sn_mesh.streaming_x # (N_ord, nx)
str_y = sn_mesh.streaming_y # (N_ord, ny)
for n in range(N):
mx = mu_x[n]
my = mu_y[n]
w = weights[n]
# Per-ordinate source: isotropic + anisotropic (if present)
Q_n = Q_scaled
if has_aniso:
Q_n = Q_scaled + Q_aniso_scaled[n] # (nx, ny, ng)
if abs(mx) < 1e-15 and abs(my) < 1e-15:
# Pure z-directed ordinate: no streaming in x or y.
psi_avg = Q_n / sig_t # (nx, ny, ng)
angular_flux[n, :, :, :] = psi_avg
scalar_flux += w * psi_avg
continue
# Look up precomputed diagonal indices for this sweep direction
key = (1 if mx >= 0 else -1, 1 if my >= 0 else -1)
ix_in, ix_out, iy_in, iy_out, diags = _diag_cache[key]
# Reflective BC: incoming boundary flux from reflected partner
if mx >= 0:
psi_x[n, 0, :, :] = psi_x[ref_x[n], 0, :, :]
else:
psi_x[n, nx, :, :] = psi_x[ref_x[n], nx, :, :]
if my >= 0:
psi_y[n, :, 0, :] = psi_y[ref_y[n], :, 0, :]
else:
psi_y[n, :, ny, :] = psi_y[ref_y[n], :, ny, :]
# Precomputed streaming for this ordinate
str_x_n = str_x[n] # (nx,)
str_y_n = str_y[n] # (ny,)
for ii, jj in diags:
# Gather incoming face fluxes
psi_in_x = psi_x[n, ii + ix_in, jj, :] # (n_diag, ng)
psi_in_y = psi_y[n, ii, jj + iy_in, :] # (n_diag, ng)
# Precomputed streaming coefficients for these cells
sx_ii = str_x_n[ii, None] # (n_diag, 1)
sy_jj = str_y_n[jj, None] # (n_diag, 1)
# Diamond-difference equation using precomputed stencil
denom = sig_t[ii, jj, :] + sx_ii + sy_jj
psi_avg = (
Q_n[ii, jj, :]
+ sx_ii * psi_in_x
+ sy_jj * psi_in_y
) / denom
# Store outgoing face fluxes for next diagonal
psi_x[n, ii + ix_out, jj, :] = 2.0 * psi_avg - psi_in_x
psi_y[n, ii, jj + iy_out, :] = 2.0 * psi_avg - psi_in_y
# Accumulate angular and scalar flux
angular_flux[n, ii, jj, :] = psi_avg
scalar_flux[ii, jj, :] += w * psi_avg
return angular_flux, scalar_flux