from __future__ import annotations
import math
import polars as pl
from polars_sdist import sample_normal
[docs]
def ornstein_uhlenbeck(
n: int,
theta: float = 0.7,
mu: float = 100.0,
sigma: float = 1.0,
x0: float | None = None,
dt: float = 1 / 252,
seed: int | None = None,
geometric: bool = False,
) -> pl.DataFrame:
r"""Discrete Ornstein-Uhlenbeck process via vectorized Euler-Maruyama.
Simulates the mean-reverting SDE
:math:`dx = \theta(\mu - x) \, dt + \sigma \, dW`, discretised as an
AR(1) recurrence and unrolled into a prefix sum.
Parameters
----------
n
Number of time steps to generate.
theta
Mean-reversion speed. Larger values pull *x* back to *mu* faster.
mu
Long-run mean (equilibrium level).
sigma
Volatility of the noise term.
x0
Starting value. Defaults to *mu* (start at equilibrium).
dt
Time step size in annualised units. Defaults to ``1/252`` (one
trading day). For sub-daily granularity, divide further — e.g.
``1/(252*6.5*3600)`` for one-second ticks during US equity hours.
seed
RNG seed for reproducibility.
geometric
If ``True``, exponentiate the OU process to produce geometric
(lognormal) prices: ``exp(OU)``. Parameters *mu* and *x0* are
interpreted in log-space (standard convention, Schwartz 1997).
Output column is ``"price"`` instead of ``"value"``.
Returns
-------
pl.DataFrame
Columns ``step`` (int) and ``value`` (float), or ``"price"``
when ``geometric=True``.
Notes
-----
The Euler-Maruyama recurrence is::
x[i] = x[i-1] + θ(μ - x[i-1])dt + σ√dt·z[i]
= α·x[i-1] + β + σ√dt·z[i]
where ``α = 1 - θ·dt`` and ``β = θ·μ·dt``. This is an AR(1) process
whose closed-form unrolling is::
x[i] = αⁱ·x₀ + Σⱼ₌₁ⁱ α^(i-j)·(β + σ√dt·z[j])
Factoring out ``αⁱ`` gives::
x[i] = αⁱ · (x₀ + Σⱼ₌₁ⁱ (β + σ√dt·z[j]) / αʲ)
The inner sum is a prefix sum of ``innovation[j] / αʲ``, which is
computed with :meth:`polars.Expr.cum_sum` — no Python loop required.
**Stability.** The scheme requires ``θ·dt < 1`` (i.e. ``α > 0``).
When this is violated the recurrence oscillates around *mu* instead of
reverting smoothly. For large *theta*, reduce *dt* accordingly.
"""
if n < 1:
raise ValueError("n must be >= 1")
start = mu if x0 is None else x0
alpha = 1.0 - theta * dt
beta = theta * mu * dt
noise_scale = sigma * math.sqrt(dt)
z = sample_normal(n, seed=seed)
step = pl.arange(0, n, eager=True).alias("step")
df = (
pl.DataFrame({"step": step, "z": z})
.with_columns(_ou_value_expr(start, alpha, beta, noise_scale))
)
if geometric:
return df.with_columns(pl.col("value").exp().alias("price")).select("step", "price")
return df.select("step", "value")
def _ou_value_expr(
start: float, alpha: float, beta: float, noise_scale: float
) -> pl.Expr:
"""OU value expression. Assumes columns ``step`` and ``z`` exist."""
innov = (
pl.when(pl.col("step") == 0)
.then(0.0)
.otherwise(beta + noise_scale * pl.col("z"))
)
alpha_pow = pl.lit(alpha).pow(pl.col("step"))
return (
(innov / alpha_pow).cum_sum() * alpha_pow + start * alpha_pow
).alias("value")