Source code for orpheus.thermal_hydraulics.solver

"""Coupled thermal-hydraulics and fuel behaviour under LOCA conditions.

Simulates a Loss-of-Coolant Accident in a PWR for 600 s, coupling:
  - 1-D axial thermal-hydraulics (2 nodes, two-phase flow with regime selection)
  - 1-D radial fuel/clad heat conduction
  - Cladding thermo-mechanical deformation (elastic + plastic)
  - Cladding failure detection (burst stress criterion)

Port of MATLAB Module 09 (``thermalHydraulics.m``, ``initializeFuelRod.m``,
``initializeCoolant.m``, ``funRHS.m``, ``cladFailureEvent.m``).

Key restructuring vs. MATLAB: pressures and cladding stress components are
computed algebraically at each RHS evaluation (not carried as DAE state
variables).  The ODE state vector contains only genuinely time-evolving
quantities: temperatures, coolant enthalpy, plastic strains.

.. seealso:: :ref:`theory-thermal-hydraulics` — Key Facts, LOCA model, wall heat transfer.
"""

from __future__ import annotations

from dataclasses import dataclass, field

import numpy as np
from scipy.integrate import solve_ivp

from orpheus.data.materials import matpro
from orpheus.data.materials.h2o_properties import h2o_density, h2o_enthalpy, h2o_properties


# ============================================================================
# Data classes
# ============================================================================


[docs] @dataclass class THParams: """Geometry, boundary conditions and time parameters for the LOCA simulation.""" # --- axial mesh --- nz: int = 2 dz0: float = 1.5 # initial node height (m) # --- fuel --- fuel_r_in: float = 0.0 # inner fuel radius (m) fuel_r_out: float = 4.12e-3 # outer fuel radius (m) fuel_nr: int = 20 # radial nodes in fuel porosity: float = 0.05 # fuel porosity (-) burnup: float = 0.0 # fuel burnup (MWd/kgHM) fgr: float = 0.06 # fission gas release fraction (-) # --- cladding --- clad_r_in: float = 4.22e-3 # inner clad radius (m) clad_r_out: float = 4.75e-3 # outer clad radius (m) clad_nr: int = 5 # radial nodes in clad # --- gap and inner gas --- roughness: float = 6e-6 # effective roughness (m) p0_ingas: float = 1.2 # fabrication He pressure (MPa) v_gas_plenum: float = 10e-6 # gas plenum volume (m3) # --- coolant --- a_flow0: float = 8.914e-5 # channel flow area (m2) # --- time --- time_step: float = 1.0 # output time step (s) time_end: float = 600.0 # end time (s) # --- boundary condition tables (time, value) --- # Inlet temperature (K) vs. time (s) T0_table: np.ndarray = field(default_factory=lambda: np.array([ [0.0, 200.0, 210.0, 5000.0], [553.0, 553.0, 380.0, 380.0], ])) # Outlet pressure (MPa) vs. time (s) p0_table: np.ndarray = field(default_factory=lambda: np.array([ [0.0, 200.0, 200.3, 200.5, 201.4, 201.9, 202.2, 203.0, 203.8, 206.3, 207.9, 210.4, 213.1, 216.6, 218.8, 221.0, 224.8, 229.5, 235.8, 243.9, 250.2, 256.8, 264.1, 273.7, 291.7, 313.5, 334.0, 5000.0], [15.5, 15.5, 14.65727, 13.69425, 12.27957, 11.13598, 10.41352, 9.63108, 8.54763, 7.52432, 7.52432, 6.56115, 5.35727, 3.79223, 2.70878, 1.80575, 1.20389, 0.72230, 0.54173, 0.39122, 0.36115, 0.33108, 0.39122, 0.39122, 0.33108, 0.33108, 0.33108, 0.33108], ])) # Inlet velocity (m/s) vs. time (s) vel0_table: np.ndarray = field(default_factory=lambda: np.array([ [0.0, 200.0, 201.0, 202.0, 203.0, 440.0, 450.0, 5000.0], [4.8, 4.8, 2.0, 1.0, 0.002, 0.002, 0.1, 0.1], ])) # Power (W) vs. time (s) power_table: np.ndarray = field(default_factory=lambda: np.array([ [0.0, 200.0, 201.1, 211.1, 221.1, 231.1, 241.1, 251.1, 261.1, 271.1, 281.1, 291.1, 301.1, 311.1, 321.1, 331.1, 341.1, 351.1, 361.1, 371.1, 381.1, 391.1, 401.1, 411.1, 421.1, 431.1, 441.1, 451.1, 461.1, 471.1, 481.1, 491.1, 5000.0], [69140.625, 69140.625, 6659.609, 2551.766, 2203.547, 2020.457, 1899.090, 1809.574, 1739.305, 1681.844, 1633.465, 1591.848, 1555.434, 1523.145, 1494.195, 1468.004, 1444.125, 1422.207, 1401.977, 1383.207, 1365.719, 1349.359, 1334.000, 1319.535, 1305.875, 1292.938, 1280.660, 1268.980, 1257.848, 1247.219, 1237.051, 1227.309, 1217.961], ])) # Axial power peaking factors (per node) kz: np.ndarray = field(default_factory=lambda: np.ones(2))
[docs] @dataclass class THResult: """Complete time-history of the thermal-hydraulics LOCA calculation.""" time: np.ndarray # (nt,) # Fuel temperature (K) -- (nz, nf, nt) fuel_T: np.ndarray # Clad temperature (K) -- (nz, nc, nt) clad_T: np.ndarray # Coolant temperature (K) -- (nz, nt) cool_T: np.ndarray # Saturation temperature (K) -- (nz, nt) cool_Tsat: np.ndarray # CHF temperature (K) -- (nz, nt) cool_TCHF: np.ndarray # Coolant pressure (MPa) -- (nz, nt) cool_p: np.ndarray # Coolant enthalpy (J/kg) -- (nz, nt) cool_h: np.ndarray # Coolant velocity (m/s) -- (nz, nt) cool_vel: np.ndarray # Void fraction (-) -- (nz, nt) cool_void: np.ndarray # Equilibrium quality (-) -- (nz, nt) cool_x: np.ndarray # Flow regime (1/2/3) -- (nz, nt) cool_regime: np.ndarray # Inner gas pressure (MPa) -- (nt,) ingas_p: np.ndarray # Fuel outer radius (m) -- (nz, nt) fuel_r: np.ndarray # Clad inner/outer radius (m) -- (nz, nt) clad_r_in: np.ndarray clad_r_out: np.ndarray # Cladding stresses (MPa) -- (nz, nc, nt) clad_sig_r: np.ndarray clad_sig_h: np.ndarray clad_sig_z: np.ndarray # Engineering & burst stress (MPa) -- (nz, nt) clad_sig_I: np.ndarray clad_sig_B: np.ndarray # Effective plastic strain (fraction) -- (nz, nc, nt) clad_epsPeff: np.ndarray # Clad failure time (s), NaN if no failure clad_fail_time: float params: THParams = field(repr=False)
# ============================================================================ # Helpers # ============================================================================ def _interp_between(y: np.ndarray) -> np.ndarray: """Midpoint interpolation between adjacent nodes (last axis).""" return 0.5 * (y[..., :-1] + y[..., 1:]) def _von_mises(sig_r: np.ndarray, sig_h: np.ndarray, sig_z: np.ndarray) -> np.ndarray: return np.sqrt( 0.5 * (sig_r - sig_h) ** 2 + 0.5 * (sig_r - sig_z) ** 2 + 0.5 * (sig_h - sig_z) ** 2 ) def _interp_bc(table: np.ndarray, t: float) -> float: """Interpolate a boundary condition table at time *t*.""" return float(np.interp(t, table[0], table[1])) # ============================================================================ # Initialization # ============================================================================ def _initialize_th( params: THParams, ) -> tuple[dict, np.ndarray]: """Build internal parameter dict and initial ODE state vector. Returns ------- p : dict -- precomputed constants carried through the integration. y0 : 1-D array -- initial ODE state vector. """ nz = params.nz nf = params.fuel_nr nc = params.clad_nr dz0 = params.dz0 # --- fuel mesh --- fuel_dr0 = (params.fuel_r_out - params.fuel_r_in) / (nf - 1) fuel_r0 = np.linspace(params.fuel_r_in, params.fuel_r_out, nf) fuel_r0_bnd = np.concatenate( ([params.fuel_r_in], _interp_between(fuel_r0), [params.fuel_r_out]) ) # volume per node, per axial level (nz, nf) fuel_v0_1d = np.pi * np.diff(fuel_r0_bnd ** 2) * dz0 # (nf,) fuel_v0 = np.tile(fuel_v0_1d, (nz, 1)) # (nz, nf) fuel_mass = matpro.UO2_RHO * (1.0 - params.porosity) * fuel_v0 # initial power pow0 = params.power_table[1, 0] lhgr0 = pow0 / (dz0 * nz) fuel_qV = np.outer( lhgr0 * dz0 * params.kz / fuel_v0.sum(axis=1), np.ones(nf), ) # (nz, nf) # radial heat transfer areas between fuel nodes (nf-1,) fuel_a_bnd = 2 * np.pi * _interp_between(fuel_r0) * dz0 # --- clad mesh --- clad_dr0 = (params.clad_r_out - params.clad_r_in) / (nc - 1) clad_r0 = np.linspace(params.clad_r_in, params.clad_r_out, nc) clad_r0_bnd = np.concatenate( ([params.clad_r_in], _interp_between(clad_r0), [params.clad_r_out]) ) clad_v0_1d = np.pi * np.diff(clad_r0_bnd ** 2) * dz0 # (nc,) clad_v0 = np.tile(clad_v0_1d, (nz, 1)) # (nz, nc) # radial heat transfer areas between clad nodes (nc-1,) clad_a_bnd = 2 * np.pi * _interp_between(clad_r0) * dz0 # --- gap --- gap_dr0 = params.clad_r_in - params.fuel_r_out gap_r0_mid = 0.5 * (params.clad_r_in + params.fuel_r_out) gap_a_bnd = 2 * np.pi * gap_r0_mid * dz0 # scalar # --- coolant geometry --- a_flow0 = params.a_flow0 a_flow = np.full(nz, a_flow0) vol_flow = a_flow * dz0 area_HX = 2 * np.pi * params.clad_r_out * dz0 d_hyd = 4 * vol_flow / area_HX # --- inner gas initial moles --- v_gap0 = dz0 * nz * np.pi * (params.clad_r_in ** 2 - params.fuel_r_out ** 2) v_void0 = dz0 * nz * np.pi * params.fuel_r_in ** 2 mu_He0 = (params.p0_ingas * 1e6) * (params.v_gas_plenum + v_gap0 + v_void0) / (8.31 * 293.0) # --- initial coolant state --- p0_cool = params.p0_table[1, 0] T0_cool = params.T0_table[1, 0] h0_cool = h2o_enthalpy(p0_cool, T0_cool) # J/kg # --- pack internal params --- p = dict( nz=nz, nf=nf, nc=nc, dz0=dz0, fuel_r0=fuel_r0, fuel_dr0=fuel_dr0, fuel_r0_bnd=fuel_r0_bnd, fuel_v0=fuel_v0, fuel_mass=fuel_mass, fuel_a_bnd=fuel_a_bnd, clad_r0=clad_r0, clad_dr0=clad_dr0, clad_r0_bnd=clad_r0_bnd, clad_v0=clad_v0, clad_a_bnd=clad_a_bnd, gap_dr0=gap_dr0, gap_r0_mid=gap_r0_mid, gap_a_bnd=gap_a_bnd, roughness=params.roughness, a_flow0=a_flow0, a_flow=a_flow, vol_flow=vol_flow, area_HX=area_HX, d_hyd=d_hyd, mu_He0=mu_He0, v_gas_plenum=params.v_gas_plenum, porosity=params.porosity, burnup=params.burnup, fuel_r_in=params.fuel_r_in, fuel_r_out=params.fuel_r_out, clad_r_in_fab=params.clad_r_in, clad_r_out_fab=params.clad_r_out, T0_table=params.T0_table, p0_table=params.p0_table, vel0_table=params.vel0_table, power_table=params.power_table, kz=params.kz, clad_fail=False, ) # --- initial state vector --- # Layout: fuel_T (nz*nf), clad_T (nz*nc), cool_h (nz), # clad_epsPeff (nz*nc), clad_epsP (nz*nc*3) n_ode = nz * nf + nz * nc + nz + nz * nc + nz * nc * 3 y0 = np.zeros(n_ode) y0[: nz * nf] = T0_cool # fuel temperatures y0[nz * nf : nz * nf + nz * nc] = T0_cool # clad temperatures y0[nz * nf + nz * nc : nz * nf + nz * nc + nz] = h0_cool # coolant enthalpy # plastic strains start at zero return p, y0 # ============================================================================ # State vector packing / unpacking # ============================================================================ def _unpack(y: np.ndarray, p: dict) -> dict: """Unpack flat ODE state vector into named 2-D arrays.""" nz, nf, nc = p["nz"], p["nf"], p["nc"] idx = 0 fuel_T = y[idx : idx + nz * nf].reshape(nz, nf) idx += nz * nf clad_T = y[idx : idx + nz * nc].reshape(nz, nc) idx += nz * nc cool_h = y[idx : idx + nz].copy() idx += nz clad_epsPeff = y[idx : idx + nz * nc].reshape(nz, nc) idx += nz * nc clad_epsP = np.empty((3, nz, nc)) for i in range(3): clad_epsP[i] = y[idx : idx + nz * nc].reshape(nz, nc) idx += nz * nc return dict( fuel_T=fuel_T, clad_T=clad_T, cool_h=cool_h, clad_epsPeff=clad_epsPeff, clad_epsP=clad_epsP, ) def _pack_rates( rate_fuel_T: np.ndarray, # (nz, nf) rate_clad_T: np.ndarray, # (nz, nc) rate_cool_h: np.ndarray, # (nz,) rate_epsPeff: np.ndarray, # (nz, nc) rate_epsP: list, # [3 x (nz, nc)] ) -> np.ndarray: """Pack rates into a flat vector matching the state layout.""" parts = [ rate_fuel_T.ravel(), rate_clad_T.ravel(), rate_cool_h, rate_epsPeff.ravel(), ] for i in range(3): parts.append(rate_epsP[i].ravel()) return np.concatenate(parts) # ============================================================================ # Algebraic pressure computation # ============================================================================ def _compute_pressure(cool_h, cool_props, vel0, T0, p0, p): """Compute coolant pressure at each axial node algebraically. Uses the outlet pressure boundary condition and cumulative pressure drops from gravity, wall friction, and acceleration. Returns (pressure, velocity, mdot, Re) arrays. """ nz = p["nz"] dz0 = p["dz0"] a_flow = p["a_flow"] a_flow0 = p["a_flow0"] d_hyd = p["d_hyd"] rho_inlet = h2o_density(p0, T0) # mass flow rate at inlet mdot0 = vel0 * a_flow0 * rho_inlet # mass flow rate at junctions (nz+1,) -- uniform mdot_jnc = np.full(nz + 1, mdot0) # mass flow rate in nodes (nz,) -- midpoint interpolation mdot = 0.5 * (mdot_jnc[:-1] + mdot_jnc[1:]) # extract per-node density and kinematic viscosity rho = np.array([pro.rho for pro in cool_props]) nu = np.array([pro.nu for pro in cool_props]) # node velocities vel = mdot / rho / a_flow # Reynolds number Re = d_hyd * np.abs(vel) / nu # Churchill friction factor theta1 = -2.457 * np.log((7.0 / Re) ** 0.9) theta2 = 37530.0 / Re f_wall = 8 * ((8.0 / Re) ** 12 + 1.0 / (theta1 ** 16 + theta2 ** 16) ** 1.5) ** (1.0 / 12.0) # wall friction pressure drop (MPa) dp_fric = f_wall * dz0 / (2 * d_hyd) * rho * vel ** 2 / 2 * 1e-6 # gravity pressure drop (MPa) dp_grav = rho * 9.81e-6 * dz0 # acceleration pressure drop at junctions (nz-1,) dp_accel_jnc = -np.diff(rho * vel ** 2) * 1e-6 # junction pressures from outlet boundary upward: # p_jnc[nz] = p0, p_jnc[i] = p_jnc[i+1] + dp_grav[i] + dp_fric[i] p_jnc = np.empty(nz + 1) p_jnc[nz] = p0 for i in range(nz - 1, -1, -1): p_jnc[i] = p_jnc[i + 1] + dp_grav[i] + dp_fric[i] # node pressures (midpoint interpolation + acceleration correction) p_node = 0.5 * (p_jnc[:-1] + p_jnc[1:]) # acceleration correction: cumsum from top of dp_accel if nz > 1: accel_corr = np.zeros(nz) accel_corr[:-1] = np.cumsum(dp_accel_jnc[::-1])[::-1] p_node += accel_corr return p_node, vel, mdot, Re # ============================================================================ # Algebraic cladding stress solver # ============================================================================ def _solve_clad_stress(clad_T, clad_epsP, p_cool, p_ingas, p): """Solve for cladding stress components at each axial level. For each axial node independently, solve a linear system of 3*(nc-1) + 3 = 3*nc equations for sig_r, sig_h, sig_z. Returns dict with sig_r, sig_h, sig_z arrays (nz, nc). """ nz, nc = p["nz"], p["nc"] clad_r0 = p["clad_r0"] clad_dr0 = p["clad_dr0"] clad_r_mid = _interp_between(clad_r0) sig_r = np.zeros((nz, nc)) sig_h = np.zeros((nz, nc)) sig_z = np.zeros((nz, nc)) for iz in range(nz): T = clad_T[iz] # (nc,) epsP = clad_epsP[:, iz, :] # (3, nc) # material properties E = matpro.zry_E(T) # (nc,) MPa nu_c = matpro.ZRY_NU epsT = matpro.zry_thexp(T) # (nc,) # non-elastic strain epsNE = np.empty((3, nc)) for i in range(3): epsNE[i] = epsT + epsP[i] # compliance Cs = 1.0 / E Cc = -nu_c / E n_unk = 3 * nc A = np.zeros((n_unk, n_unk)) b = np.zeros(n_unk) # column offsets ir = 0 ih = nc izz = 2 * nc row = 0 # --- stress equilibrium: nc-1 equations --- for j in range(nc - 1): c_diff = 1.0 / clad_dr0 c_avg = 0.5 / clad_r_mid[j] A[row, ir + j] = -c_diff + c_avg A[row, ir + j + 1] = c_diff + c_avg A[row, ih + j] = -c_avg A[row, ih + j + 1] = -c_avg row += 1 # --- strain compatibility: nc-1 equations --- for j in range(nc - 1): jp = j + 1 Cs_j, Cs_jp = Cs[j], Cs[jp] Cc_j, Cc_jp = Cc[j], Cc[jp] r_ = clad_r_mid[j] dr = clad_dr0 # diff(eps_h)/dr A[row, ih + j] += -Cs_j / dr A[row, ih + jp] += Cs_jp / dr A[row, ir + j] += -Cc_j / dr A[row, ir + jp] += Cc_jp / dr A[row, izz + j] += -Cc_j / dr A[row, izz + jp] += Cc_jp / dr # -(eps_r_ - eps_h_)/r_ dC_j = Cs_j - Cc_j dC_jp = Cs_jp - Cc_jp A[row, ir + j] += -0.5 * dC_j / r_ A[row, ir + jp] += -0.5 * dC_jp / r_ A[row, ih + j] += 0.5 * dC_j / r_ A[row, ih + jp] += 0.5 * dC_jp / r_ dNE_h = epsNE[1, jp] - epsNE[1, j] rhs_ne = epsNE[0, j] - epsNE[1, j] + epsNE[0, jp] - epsNE[1, jp] b[row] = -dNE_h / dr + 0.5 * rhs_ne / r_ row += 1 # --- axial strain constant: nc-1 equations --- for j in range(nc - 1): jp = j + 1 A[row, izz + j] += Cs[j] A[row, ir + j] += Cc[j] A[row, ih + j] += Cc[j] A[row, izz + jp] += -Cs[jp] A[row, ir + jp] += -Cc[jp] A[row, ih + jp] += -Cc[jp] b[row] = -(epsNE[2, j] - epsNE[2, jp]) row += 1 # --- BC1: sig_r(outer) = -p_coolant --- A[row, ir + nc - 1] = 1.0 b[row] = -p_cool[iz] row += 1 # --- BC2: sig_r(inner) = -p_ingas --- A[row, ir + 0] = 1.0 b[row] = -p_ingas row += 1 # --- BC3: integral of sig_z = p_inner*r_inner^2 - p_outer*r_outer^2 --- clad_dr2 = np.diff(clad_r0 ** 2) for j in range(nc - 1): A[row, izz + j] += 0.5 * clad_dr2[j] A[row, izz + j + 1] += 0.5 * clad_dr2[j] b[row] = p_ingas * clad_r0[0] ** 2 - p_cool[iz] * clad_r0[-1] ** 2 row += 1 assert row == n_unk sigma = np.linalg.solve(A, b) sig_r[iz] = sigma[ir : ir + nc] sig_h[iz] = sigma[ih : ih + nc] sig_z[iz] = sigma[izz : izz + nc] return dict(sig_r=sig_r, sig_h=sig_h, sig_z=sig_z) # ============================================================================ # Wall heat transfer with regime selection # ============================================================================ def _wall_heat_transfer(Twall, cool_props, Lsat_arr, Vsat_arr, cool_p, cool_h, Re, d_hyd): """Compute wall heat flux (W/m2) with flow regime selection. Returns (qw, regime, TCHF) arrays of shape (nz,). """ nz = len(Twall) T_mix = np.array([pro.T for pro in cool_props]) Tsat = np.array([pro.Tsat for pro in cool_props]) void = np.array([pro.void for pro in cool_props]) k_mix = np.array([pro.k for pro in cool_props]) cp_mix = np.array([pro.c_p for pro in cool_props]) mu_mix = np.array([pro.mu for pro in cool_props]) rhoL = np.array([L.rho for L in Lsat_arr]) rhoV = np.array([V.rho for V in Vsat_arr]) hL = np.array([L.h for L in Lsat_arr]) hV = np.array([V.h for V in Vsat_arr]) kV = np.array([V.k for V in Vsat_arr]) cpV = np.array([V.c_p for V in Vsat_arr]) nuV = np.array([V.nu for V in Vsat_arr]) sigL = np.array([L.sig for L in Lsat_arr]) dTw = Twall - T_mix dTwSat = Twall - Tsat drhoSat = rhoL - rhoV dhLat = hV - hL dhSub = np.maximum(hL - cool_h, 0.0) # Prandtl number Pr = cp_mix * mu_mix / k_mix # subcooling and void correction multipliers kSub = 1.0 + 0.1 * (rhoL / rhoV) ** 0.75 * dhSub / dhLat kVoid = 1.0 - void # Single-phase forced convection (Dittus-Boelter) qw_1phase = np.maximum(4.36, 0.023 * Re ** 0.8 * Pr ** 0.4) * (k_mix / d_hyd) * dTw # Nucleate boiling (Thom) qw_NB = 2000 * np.abs(dTwSat) * dTwSat * np.exp(cool_p / 4.34) # Critical heat flux qw_CHF = ( 0.14 * dhLat * (9.81 * sigL * rhoV ** 2 * drhoSat) ** 0.25 * kSub * kVoid ) # Wall temperature at CHF TCHF = Tsat + np.sqrt(np.maximum(qw_CHF * np.exp(-cool_p / 4.34) / 2000, 0.0)) # Film boiling qw_FB = ( 0.25 * (9.81 * kV ** 2 * cpV * drhoSat / nuV) ** 0.333 * kSub * dTwSat ) # Regime selection qw = np.zeros(nz) regime = np.zeros(nz, dtype=int) Tsat_L = np.array([L.T for L in Lsat_arr]) for iz in range(nz): if Twall[iz] < Tsat_L[iz]: # Single-phase liquid forced convection regime[iz] = 1 qw[iz] = qw_1phase[iz] elif Twall[iz] < TCHF[iz]: # Pre-CHF boiling regime[iz] = 2 qw[iz] = max(qw_1phase[iz], qw_NB[iz]) else: # Post-CHF film boiling or single-phase steam regime[iz] = 3 qw[iz] = max(qw_1phase[iz] * (1 - kVoid[iz]), qw_FB[iz] * kVoid[iz]) return qw, regime, TCHF # ============================================================================ # RHS function # ============================================================================ def _rhs(t: float, y: np.ndarray, p: dict) -> np.ndarray: """Right-hand side of the ODE system.""" nz, nf, nc = p["nz"], p["nf"], p["nc"] dz0 = p["dz0"] state = _unpack(y, p) fuel_T = state["fuel_T"] # (nz, nf) clad_T = state["clad_T"] # (nz, nc) cool_h = state["cool_h"] # (nz,) clad_epsPeff = state["clad_epsPeff"] # (nz, nc) clad_epsP = state["clad_epsP"] # (3, nz, nc) # ====================================================================== # THERMAL HYDRAULICS # ====================================================================== # Interpolate boundary conditions vel0 = _interp_bc(p["vel0_table"], t) T0 = _interp_bc(p["T0_table"], t) p0 = _interp_bc(p["p0_table"], t) p0 = max(p0, 0.05) # floor to avoid sub-atmospheric singularities # Water properties at each node -- use previous-step pressure estimate # For the first call, use outlet pressure as estimate cool_props = [] Lsat_arr = [] Vsat_arr = [] # First pass: estimate pressure from outlet BC p_est = np.full(nz, p0) for iz in range(nz): pro, Lsat, Vsat = h2o_properties(max(p_est[iz], 0.05), cool_h[iz]) cool_props.append(pro) Lsat_arr.append(Lsat) Vsat_arr.append(Vsat) # Compute pressure algebraically cool_p, cool_vel, cool_mdot, cool_Re = _compute_pressure( cool_h, cool_props, vel0, T0, p0, p ) # Recompute water properties with corrected pressure cool_props.clear() Lsat_arr.clear() Vsat_arr.clear() for iz in range(nz): pro, Lsat, Vsat = h2o_properties(max(cool_p[iz], 0.05), cool_h[iz]) cool_props.append(pro) Lsat_arr.append(Lsat) Vsat_arr.append(Vsat) # Recompute velocity and Re with corrected density rho_corr = np.array([pro.rho for pro in cool_props]) nu_corr = np.array([pro.nu for pro in cool_props]) cool_vel = cool_mdot / rho_corr / p["a_flow"] cool_Re = p["d_hyd"] * np.abs(cool_vel) / nu_corr # ====================================================================== # FUEL STRAIN (simplified -- only thermal expansion) # ====================================================================== fuel_Tavg = np.sum(p["fuel_v0"] * fuel_T, axis=1) / np.sum(p["fuel_v0"], axis=1) fuel_epsT = matpro.uo2_thexp(fuel_Tavg) # (nz,) fuel_r = p["fuel_r_out"] * (1.0 + fuel_epsT) # (nz,) deformed outer radius fuel_dz = dz0 * (1.0 + fuel_epsT) # (nz,) deformed node height # ====================================================================== # CLAD STRAINS AND STRAIN RATES # ====================================================================== clad_Tavg = np.sum(p["clad_v0"] * clad_T, axis=1) / np.sum(p["clad_v0"], axis=1) # Thermal strain clad_epsT = matpro.zry_thexp(clad_T) # (nz, nc) # We need to solve for stresses to compute elastic strains and geometry. # First, compute inner gas pressure. # ====================================================================== # GAP AND INNER GAS # ====================================================================== gap_T = 0.5 * (fuel_T[:, -1] + clad_T[:, 0]) # (nz,) # Gas gap volume using fabricated clad inner radius (deformation is small) v_gas_gap = fuel_dz * np.pi * (p["clad_r_in_fab"] ** 2 - fuel_r ** 2) v_gas_gap = np.maximum(v_gas_gap, 1e-15) # prevent negative volumes ingas_Tplenum = np.array([pro.T for pro in cool_props])[-1] ingas_p = ( p["mu_He0"] * 8.31e-6 / (p["v_gas_plenum"] / ingas_Tplenum + np.sum(v_gas_gap / gap_T)) ) # Clad failure: inner gas pressure = coolant pressure if p["clad_fail"]: ingas_p = cool_p[-1] # ====================================================================== # CLAD STRESS (algebraic) # ====================================================================== stress = _solve_clad_stress(clad_T, clad_epsP, cool_p, ingas_p, p) sig_r = stress["sig_r"] # (nz, nc) sig_h = stress["sig_h"] sig_z = stress["sig_z"] # Store for event function access p["_last_stress"] = stress p["_last_ingas_p"] = ingas_p p["_last_cool_p"] = cool_p.copy() p["_last_clad_Tavg"] = clad_Tavg.copy() # ====================================================================== # CLAD STRAIN COMPUTATION AND GEOMETRY UPDATE # ====================================================================== sig_sum = sig_r + sig_h + sig_z clad_epsE = np.empty((3, nz, nc)) clad_eps = np.empty((3, nz, nc)) sigs = [sig_r, sig_h, sig_z] E_clad = matpro.zry_E(clad_T) # (nz, nc) for i in range(3): clad_epsE[i] = (sigs[i] - matpro.ZRY_NU * (sig_sum - sigs[i])) / E_clad clad_eps[i] = clad_epsT + clad_epsE[i] + clad_epsP[i] # Von Mises stress sigVM = _von_mises(sig_r, sig_h, sig_z) + 1e-6 # Effective plastic strain rate K_zry = matpro.zry_K(clad_T) m_zry = matpro.zry_m(clad_T) n_zry = matpro.zry_n(clad_T) fail_mult = 0.0 if p["clad_fail"] else 1.0 rate_epsPeff = fail_mult * np.minimum( 0.1, 1e-3 * (sigVM * 1e6 / K_zry / np.abs(clad_epsPeff + 1e-6) ** n_zry) ** (1.0 / m_zry), ) rate_epsP = [] for i in range(3): rate_epsP.append( rate_epsPeff * 1.5 * (sigs[i] - sig_sum / 3.0) / sigVM ) # Update clad geometry clad_eps_bnd = [_interp_between(clad_eps[i]) for i in range(3)] # (nz, nc-1) each clad_r = np.tile(p["clad_r0"], (nz, 1)) * (1.0 + clad_eps[1]) # hoop -> radial position clad_r_bnd = _interp_between(clad_r) # (nz, nc-1) clad_r_plus = np.column_stack([clad_r[:, 0], clad_r_bnd, clad_r[:, -1]]) # (nz, nc+1) clad_dz = np.tile(np.full(nc, dz0), (nz, 1)) * (1.0 + clad_eps[2]) # (nz, nc) clad_a_bnd_def = 2 * np.pi * clad_r_bnd * _interp_between(clad_dz) # (nz, nc-1) clad_v = np.pi * np.diff(clad_r_plus ** 2, axis=1) * clad_dz # (nz, nc) # Deformed clad heat transfer areas (nz, nc-1) — used for heat conduction clad_a_bnd = clad_a_bnd_def # computed on the line above # Store clad geometry for result extraction p["_last_clad_r"] = clad_r p["_last_fuel_r"] = fuel_r # ====================================================================== # GAP GEOMETRY UPDATE # ====================================================================== gap_dr = clad_r[:, 0] - fuel_r # (nz,) gap_open = np.all(gap_dr > 0) # Deformed gap geometry gap_r_mid = 0.5 * (clad_r[:, 0] + fuel_r) # (nz,) gap_a_bnd = 2 * np.pi * gap_r_mid * 0.5 * (clad_dz[:, 0] + fuel_dz) # (nz,) gap_kGasMix = matpro.k_He(gap_T) # (nz,) if gap_open: gap_h = gap_kGasMix / np.maximum(gap_dr, 1e-9) else: gap_h = gap_kGasMix / p["roughness"] # ====================================================================== # ENGINEERING STRESS (for failure detection) # ====================================================================== clad_sigI = ( (ingas_p - cool_p) * (clad_r[:, -1] + clad_r[:, 0]) / 2.0 / (clad_r[:, -1] - clad_r[:, 0]) ) p["_last_sigI"] = clad_sigI.copy() # ====================================================================== # WALL HEAT EXCHANGE # ====================================================================== Twall = clad_T[:, -1] # outer clad surface temperature qw, regime, TCHF = _wall_heat_transfer( Twall, cool_props, Lsat_arr, Vsat_arr, cool_p, cool_h, cool_Re, p["d_hyd"], ) # Store for result extraction p["_last_cool_props"] = cool_props p["_last_Lsat"] = Lsat_arr p["_last_Vsat"] = Vsat_arr p["_last_regime"] = regime.copy() p["_last_TCHF"] = TCHF.copy() p["_last_cool_vel"] = cool_vel.copy() # ====================================================================== # COOLANT ENTHALPY RATE # ====================================================================== h0_cool = h2o_enthalpy(max(p0, 0.05), T0) mdot_jnc = np.full(nz + 1, cool_mdot[0]) # uniform mass flow h_jnc = np.concatenate(([h0_cool], cool_h)) # upwind scheme area_HX = p["area_HX"] vol_flow = p["vol_flow"] rho = np.array([pro.rho for pro in cool_props]) rate_cool_h = ( -np.diff(mdot_jnc * h_jnc) + qw * area_HX ) / (rho * vol_flow) # ====================================================================== # FUEL ROD POWER # ====================================================================== pow_t = _interp_bc(p["power_table"], t) lhgr = pow_t / (dz0 * nz) fuel_qV = np.outer( lhgr * dz0 * p["kz"] / p["fuel_v0"].sum(axis=1), np.ones(nf), ) # (nz, nf) # ====================================================================== # FUEL ROD THERMAL CALCULATIONS # ====================================================================== # Temperature between fuel nodes (nz, nf-1) Tf_bnd = _interp_between(fuel_T) # Temperature between clad nodes (nz, nc-1) Tc_bnd = _interp_between(clad_T) # Gap heat flux (W/m2) qGap = gap_h * (fuel_T[:, -1] - clad_T[:, 0]) # Fuel radial heat flux between nodes (nz, nf-1) (W/m2) fuel_dr0 = p["fuel_dr0"] qFuel_bnd = -matpro.uo2_k(Tf_bnd, p["burnup"], p["porosity"]) * np.diff(fuel_T, axis=1) / fuel_dr0 # Fuel heat transfer with BCs: inner=0 (adiabatic), outer=gap # (nz, nf+1) array of heat flows at boundaries QFuel_bnd = np.column_stack([ np.zeros(nz), qFuel_bnd * p["fuel_a_bnd"], # (nz, nf-1) — fuel area stays initial (MATLAB convention) qGap * gap_a_bnd, # (nz,) — deformed gap area ]) # (nz, nf+1) # Clad radial heat flux between nodes (nz, nc-1) (W/m2) clad_dr0_val = p["clad_dr0"] qClad_bnd = -matpro.zry_k(Tc_bnd) * np.diff(clad_T, axis=1) / clad_dr0_val # Clad heat transfer with BCs: inner=gap, outer=wall QClad_bnd = np.column_stack([ qGap * gap_a_bnd, # (nz,) — deformed gap area qClad_bnd * clad_a_bnd, # (nz, nc-1) — deformed clad area qw * area_HX, # (nz,) ]) # (nz, nc+1) # Rate of fuel temperature rate_fuel_T = (-np.diff(QFuel_bnd, axis=1) + fuel_qV * p["fuel_v0"]) / ( p["fuel_mass"] * matpro.uo2_cp(fuel_T) ) # Rate of clad temperature rate_clad_T = -np.diff(QClad_bnd, axis=1) / ( matpro.ZRY_RHO * matpro.zry_cp(clad_T) * p["clad_v0"] ) return _pack_rates(rate_fuel_T, rate_clad_T, rate_cool_h, rate_epsPeff, rate_epsP) # ============================================================================ # Cladding failure event # ============================================================================ def _clad_failure_event(t: float, y: np.ndarray, p: dict) -> float: """Event function: returns sigB(Tavg) - sigI. When this crosses zero from above, cladding has failed (burst stress exceeded by engineering stress). """ # Use cached values from last RHS evaluation clad_Tavg = p.get("_last_clad_Tavg") sigI = p.get("_last_sigI") if clad_Tavg is None or sigI is None: # Not yet computed -- return large positive value return 1e6 sigB = matpro.zry_burst_stress(clad_Tavg) # Use outlet node (last axial level), matching MATLAB cladFailureEvent.m return float(sigB[-1] - sigI[-1]) _clad_failure_event.terminal = True _clad_failure_event.direction = -1 # trigger when burst stress is reached (positive -> negative) # ============================================================================ # Main solver # ============================================================================
[docs] def solve_thermal_hydraulics(params: THParams | None = None) -> THResult: """Run the LOCA thermal-hydraulics simulation. Parameters ---------- params : THParams, optional Simulation parameters. Uses defaults (PWR LOCA) if not provided. Returns ------- THResult with time histories of all key variables. """ if params is None: params = THParams() p, y0 = _initialize_th(params) time_end = params.time_end dt = params.time_step t_eval = np.arange(0, time_end + dt, dt) # Storage for results at each output step records = { "time": [], "fuel_T": [], "clad_T": [], "cool_T": [], "cool_Tsat": [], "cool_TCHF": [], "cool_p": [], "cool_h": [], "cool_vel": [], "cool_void": [], "cool_x": [], "cool_regime": [], "ingas_p": [], "fuel_r": [], "clad_r_in": [], "clad_r_out": [], "clad_sig_r": [], "clad_sig_h": [], "clad_sig_z": [], "clad_sig_I": [], "clad_sig_B": [], "clad_epsPeff": [], } def _record_snapshot(t_val, y_val): """Evaluate the RHS once to populate cached fields, then record.""" _rhs(t_val, y_val, p) state = _unpack(y_val, p) records["time"].append(t_val) records["fuel_T"].append(state["fuel_T"].copy()) records["clad_T"].append(state["clad_T"].copy()) records["cool_h"].append(state["cool_h"].copy()) records["clad_epsPeff"].append(state["clad_epsPeff"].copy()) cool_props = p.get("_last_cool_props", []) records["cool_T"].append(np.array([pro.T for pro in cool_props])) records["cool_Tsat"].append(np.array([pro.Tsat for pro in cool_props])) records["cool_p"].append(p.get("_last_cool_p", np.zeros(params.nz))) records["cool_vel"].append(p.get("_last_cool_vel", np.zeros(params.nz))) records["cool_void"].append(np.array([pro.void for pro in cool_props])) records["cool_x"].append(np.array([pro.x for pro in cool_props])) records["cool_regime"].append(p.get("_last_regime", np.zeros(params.nz))) records["cool_TCHF"].append(p.get("_last_TCHF", np.zeros(params.nz))) records["ingas_p"].append(p.get("_last_ingas_p", 0.0)) records["fuel_r"].append(p.get("_last_fuel_r", np.zeros(params.nz))) clad_r = p.get("_last_clad_r", np.zeros((params.nz, params.clad_nr))) records["clad_r_in"].append(clad_r[:, 0].copy()) records["clad_r_out"].append(clad_r[:, -1].copy()) stress = p.get("_last_stress", {}) records["clad_sig_r"].append(stress.get("sig_r", np.zeros((params.nz, params.clad_nr)))) records["clad_sig_h"].append(stress.get("sig_h", np.zeros((params.nz, params.clad_nr)))) records["clad_sig_z"].append(stress.get("sig_z", np.zeros((params.nz, params.clad_nr)))) records["clad_sig_I"].append(p.get("_last_sigI", np.zeros(params.nz))) clad_Tavg = p.get("_last_clad_Tavg", np.full(params.nz, 553.0)) records["clad_sig_B"].append(matpro.zry_burst_stress(clad_Tavg)) # --- Phase 1: integrate until clad failure or end --- clad_fail_time = np.nan t_eval_1 = t_eval[t_eval <= time_end] # Chunked integration with manual event detection (avoids scipy brentq # sign errors). Each chunk spans one output interval (time_step). t_cur = 0.0 y_cur = y0.copy() # Evaluate event at t=0 _rhs(t_cur, y_cur, p) ev_prev = _clad_failure_event(t_cur, y_cur, p) for t_next in t_eval_1: if t_next <= t_cur: continue sol_chunk = solve_ivp( fun=lambda t, y: _rhs(t, y, p), t_span=(t_cur, t_next), y0=y_cur, method="BDF", rtol=1e-6, atol=1e-4, max_step=10.0, ) if not sol_chunk.success: print(f"Warning: solver failed at t={t_cur:.2f} s: {sol_chunk.message}") break y_end = sol_chunk.y[:, -1] _rhs(t_next, y_end, p) ev_now = _clad_failure_event(t_next, y_end, p) if ev_prev > 0 and ev_now <= 0: # Clad failure detected in this chunk. Use the chunk endpoint # (1 s resolution) as the failure state — it is a fully converged # solver state, unlike dense-output or bisection interpolants # which can produce unphysical intermediate values. clad_fail_time = t_next y_fail = y_end.copy() print(f"Clad failure at time {clad_fail_time:.2f} s") # Record snapshot at failure time _record_snapshot(clad_fail_time, y_fail) break # No event — record snapshot at this output time _record_snapshot(t_next, y_end) y_cur = y_end.copy() t_cur = t_next ev_prev = ev_now # --- Phase 2: continue from failure to end (failed-clad mode) --- if not np.isnan(clad_fail_time): p["clad_fail"] = True t_eval_2 = np.arange( clad_fail_time + dt, time_end + dt, dt, ) t_eval_2 = t_eval_2[t_eval_2 <= time_end] # Chunked integration for post-failure: the Jacobian numerical # differencing can push water properties out of valid range, causing # NaN. Integrate chunk-by-chunk and stop gracefully if a chunk fails. y_cur2 = y_fail.copy() t_cur2 = clad_fail_time for t_next2 in t_eval_2: try: sol2 = solve_ivp( fun=lambda t, y: _rhs(t, y, p), t_span=(t_cur2, t_next2), y0=y_cur2, method="BDF", rtol=1e-6, atol=1e-4, max_step=1.0, ) if not sol2.success: print(f"Warning: post-failure solver stopped at t={t_cur2:.1f} s") break _record_snapshot(sol2.t[-1], sol2.y[:, -1]) y_cur2 = sol2.y[:, -1].copy() t_cur2 = sol2.t[-1] except ValueError: print(f"Warning: post-failure solver hit NaN at t={t_cur2:.1f} s, " "stopping Phase 2") break # --- Build result dataclass --- nt = len(records["time"]) nz = params.nz nf = params.fuel_nr nc = params.clad_nr result = THResult( time=np.array(records["time"]), fuel_T=np.array(records["fuel_T"]).transpose(1, 2, 0), # (nz, nf, nt) clad_T=np.array(records["clad_T"]).transpose(1, 2, 0), # (nz, nc, nt) cool_T=np.array(records["cool_T"]).T, # (nz, nt) cool_Tsat=np.array(records["cool_Tsat"]).T, # (nz, nt) cool_TCHF=np.array(records["cool_TCHF"]).T, # (nz, nt) cool_p=np.array(records["cool_p"]).T, # (nz, nt) cool_h=np.array(records["cool_h"]).T, # (nz, nt) cool_vel=np.array(records["cool_vel"]).T, # (nz, nt) cool_void=np.array(records["cool_void"]).T, # (nz, nt) cool_x=np.array(records["cool_x"]).T, # (nz, nt) cool_regime=np.array(records["cool_regime"]).T, # (nz, nt) ingas_p=np.array(records["ingas_p"]), # (nt,) fuel_r=np.array(records["fuel_r"]).T, # (nz, nt) clad_r_in=np.array(records["clad_r_in"]).T, # (nz, nt) clad_r_out=np.array(records["clad_r_out"]).T, # (nz, nt) clad_sig_r=np.array(records["clad_sig_r"]).transpose(1, 2, 0), # (nz, nc, nt) clad_sig_h=np.array(records["clad_sig_h"]).transpose(1, 2, 0), clad_sig_z=np.array(records["clad_sig_z"]).transpose(1, 2, 0), clad_sig_I=np.array(records["clad_sig_I"]).T, # (nz, nt) clad_sig_B=np.array(records["clad_sig_B"]).T, # (nz, nt) clad_epsPeff=np.array(records["clad_epsPeff"]).transpose(1, 2, 0), # (nz, nc, nt) clad_fail_time=clad_fail_time, params=params, ) return result