# ruff: noqa: E501
"""
Dynamic transport equation for turbulent kinetic energy :math:`k`.
Implements GOTM Section 4.7.23 (tkeeq.F90) — solves the :math:`k`-equation for
a Boussinesq fluid in horizontally homogeneous flow (Eq. 150):
.. math::
\\dot{k} = \\mathcal{D}_k + P + G + P_x + P_s - \\varepsilon \\comma
where :math:`P` is shear production, :math:`G` is buoyancy production
(negative in stable stratification), :math:`P_s` is Stokes shear production,
:math:`P_x` is extra production, and :math:`\\varepsilon` is the dissipation
rate. The diffusive transport is (Eq. 151):
.. math::
\\mathcal{D}_k = \\frac{\\partial}{\\partial z}\\left(
\\frac{\\nu_t}{\\sigma_k} \\frac{\\partial k}{\\partial z}\\right) \\comma
with Schmidt number :math:`\\sigma_k`. The dissipation rate
:math:`\\varepsilon` is supplied either from the dynamic
:mod:`~pygotm.turbulence.dissipationeq` or :mod:`~pygotm.turbulence.omegaeq`,
or from a prescribed length-scale closure (Eq. 153):
.. math::
\\varepsilon = (c_\\mu^0)^3 \\frac{k^{3/2}}{l} \\point
Boundary conditions
-------------------
At solid boundaries (surface and bottom), boundary conditions follow
logarithmic-layer theory (Dirichlet):
.. math::
k = \\frac{u_\\tau^2}{(c_\\mu^0)^{1/2}} \\comma
or wave-breaking injection following Craig and Banner (1994):
.. math::
F_k = \\eta u_{\\tau s}^3 \\comma
where :math:`\\eta \\approx 100` is the Craig–Banner coefficient. The
injection boundary condition results in a non-trivial k-profile near the
surface (Dirichlet or Neumann depending on ``k_ubc``/``k_lbc``).
Authors (original Fortran): Lars Umlauf (rewrite), Hans Burchard, Karsten Bolding.
"""
import numba
import numpy as np
from pygotm.arrays import ColumnWorkspace, make_column_array
from pygotm.turbulence.turbulence import (
Dirichlet as _DIRICHLET,
)
from pygotm.turbulence.turbulence import (
Neumann as _NEUMANN,
)
from pygotm.turbulence.turbulence import (
injection as _INJECTION,
)
from pygotm.turbulence.turbulence import (
logarithmic as _LOGARITHMIC,
)
from pygotm.util.diff_face import diff_face
__all__ = [
"TKEEquationWorkspace",
"step_tkeeq",
"step_tkeeq_single",
]
_CNPAR: float = 1.0
[docs]
class TKEEquationWorkspace(ColumnWorkspace):
"""Workspace arrays for the translated TKE equation."""
tke: np.ndarray
tkeo: np.ndarray
h: np.ndarray
P: np.ndarray
B: np.ndarray
Px: np.ndarray
PSTK: np.ndarray
num: np.ndarray
eps: np.ndarray
avh: np.ndarray
l_sour: np.ndarray
q_sour: np.ndarray
u_taus: np.ndarray
u_taub: np.ndarray
z0s: np.ndarray
z0b: np.ndarray
au: np.ndarray
bu: np.ndarray
cu: np.ndarray
du: np.ndarray
ru: np.ndarray
qu: np.ndarray
def __init__(self, nlev: int, *, n_cols: int | None = None) -> None:
super().__init__(nlev, n_cols=n_cols)
self.tke = make_column_array(nlev, n_cols=n_cols)
self.tkeo = make_column_array(nlev, n_cols=n_cols)
self.h = make_column_array(nlev, n_cols=n_cols)
self.P = make_column_array(nlev, n_cols=n_cols)
self.B = make_column_array(nlev, n_cols=n_cols)
self.Px = make_column_array(nlev, n_cols=n_cols)
self.PSTK = make_column_array(nlev, n_cols=n_cols)
self.num = make_column_array(nlev, n_cols=n_cols)
self.eps = make_column_array(nlev, n_cols=n_cols)
self.avh = make_column_array(nlev, n_cols=n_cols)
self.l_sour = make_column_array(nlev, n_cols=n_cols)
self.q_sour = make_column_array(nlev, n_cols=n_cols)
self.u_taus = make_column_array(nlev, n_cols=n_cols)
self.u_taub = make_column_array(nlev, n_cols=n_cols)
self.z0s = make_column_array(nlev, n_cols=n_cols)
self.z0b = make_column_array(nlev, n_cols=n_cols)
self.au = make_column_array(nlev, n_cols=n_cols)
self.bu = make_column_array(nlev, n_cols=n_cols)
self.cu = make_column_array(nlev, n_cols=n_cols)
self.du = make_column_array(nlev, n_cols=n_cols)
self.ru = make_column_array(nlev, n_cols=n_cols)
self.qu = make_column_array(nlev, n_cols=n_cols)
@numba.njit(cache=True)
def _fk_craig(u_tau: float, eta: float) -> float:
return eta * u_tau**3
@numba.njit(cache=True)
def _k_bc_value(
bc: int,
type_: int,
zi: float,
z0: float,
u_tau: float,
cm0: float,
sig_k: float,
cmsf: float,
cw: float,
gen_alpha: float,
gen_l: float,
) -> float:
value = 0.0
if type_ == _LOGARITHMIC:
if bc == _DIRICHLET:
value = u_tau**2 / cm0**2
else:
value = 0.0
if type_ == _INJECTION:
f_k = _fk_craig(u_tau, cw)
capital_k = ((-sig_k * f_k / (cmsf * gen_alpha * gen_l)) ** (2.0 / 3.0)) / (
z0**gen_alpha
)
if bc == _DIRICHLET:
value = capital_k * (zi + z0) ** gen_alpha
else:
value = (
-cmsf
/ sig_k
* capital_k**1.5
* gen_alpha
* gen_l
* (zi + z0) ** (1.5 * gen_alpha)
)
return value
@numba.njit(cache=True)
def _step_tkeeq(
nlev: int,
dt: float,
sig_k: float,
k_min: float,
k_ubc: int,
k_lbc: int,
ubc_type: int,
lbc_type: int,
cm0: float,
cmsf: float,
cw: float,
gen_alpha: float,
gen_l: float,
tke: np.ndarray,
tkeo: np.ndarray,
h: np.ndarray,
P: np.ndarray,
B: np.ndarray,
Px: np.ndarray,
PSTK: np.ndarray,
num: np.ndarray,
eps: np.ndarray,
avh: np.ndarray,
l_sour: np.ndarray,
q_sour: np.ndarray,
u_taus: float,
u_taub: float,
z0s: float,
z0b: float,
au: np.ndarray,
bu: np.ndarray,
cu: np.ndarray,
du: np.ndarray,
ru: np.ndarray,
qu: np.ndarray,
) -> None:
r"""Advance the dynamic k-equation for a single column."""
for i in range(nlev + 1):
tkeo[i] = tke[i]
avh[i] = 0.0
l_sour[i] = 0.0
q_sour[i] = 0.0
for i in range(1, nlev):
avh[i] = num[i] / sig_k
prod = P[i] + Px[i] + PSTK[i]
buoyan = B[i]
diss = eps[i]
if prod + buoyan > 0.0:
q_sour[i] = prod + buoyan
l_sour[i] = -diss / tke[i]
else:
q_sour[i] = prod
l_sour[i] = -(diss - buoyan) / tke[i]
pos_bc = h[nlev]
if k_ubc == _NEUMANN:
pos_bc = 0.5 * h[nlev]
diff_k_up = _k_bc_value(
k_ubc,
ubc_type,
pos_bc,
z0s,
u_taus,
cm0,
sig_k,
cmsf,
cw,
gen_alpha,
gen_l,
)
pos_bc = h[1]
if k_lbc == _NEUMANN:
pos_bc = 0.5 * h[1]
diff_k_down = _k_bc_value(
k_lbc,
lbc_type,
pos_bc,
z0b,
u_taub,
cm0,
sig_k,
cmsf,
cw,
gen_alpha,
gen_l,
)
diff_face(
nlev,
dt,
_CNPAR,
h,
k_ubc,
k_lbc,
diff_k_up,
diff_k_down,
avh,
l_sour,
q_sour,
tke,
au,
bu,
cu,
du,
ru,
qu,
)
tke[nlev] = _k_bc_value(
_DIRICHLET,
ubc_type,
z0s,
z0s,
u_taus,
cm0,
sig_k,
cmsf,
cw,
gen_alpha,
gen_l,
)
tke[0] = _k_bc_value(
_DIRICHLET,
lbc_type,
z0b,
z0b,
u_taub,
cm0,
sig_k,
cmsf,
cw,
gen_alpha,
gen_l,
)
for i in range(nlev + 1):
if tke[i] < k_min:
tke[i] = k_min
[docs]
@numba.njit(parallel=True, cache=True)
def step_tkeeq(
batch_size: int,
nlev: int,
dt: float,
sig_k: float,
k_min: float,
k_ubc: int,
k_lbc: int,
ubc_type: int,
lbc_type: int,
cm0: float,
cmsf: float,
cw: float,
gen_alpha: float,
gen_l: float,
tke: np.ndarray,
tkeo: np.ndarray,
h: np.ndarray,
P: np.ndarray,
B: np.ndarray,
Px: np.ndarray,
PSTK: np.ndarray,
num: np.ndarray,
eps: np.ndarray,
avh: np.ndarray,
l_sour: np.ndarray,
q_sour: np.ndarray,
u_taus: np.ndarray,
u_taub: np.ndarray,
z0s: np.ndarray,
z0b: np.ndarray,
au: np.ndarray,
bu: np.ndarray,
cu: np.ndarray,
du: np.ndarray,
ru: np.ndarray,
qu: np.ndarray,
) -> None:
r"""Advance the dynamic k-equation for one or more columns."""
for col in numba.prange(batch_size):
_step_tkeeq(
nlev,
dt,
sig_k,
k_min,
k_ubc,
k_lbc,
ubc_type,
lbc_type,
cm0,
cmsf,
cw,
gen_alpha,
gen_l,
tke[col],
tkeo[col],
h[col],
P[col],
B[col],
Px[col],
PSTK[col],
num[col],
eps[col],
avh[col],
l_sour[col],
q_sour[col],
u_taus[col, 0],
u_taub[col, 0],
z0s[col, 0],
z0b[col, 0],
au[col],
bu[col],
cu[col],
du[col],
ru[col],
qu[col],
)
step_tkeeq_single = _step_tkeeq