Source code for mktlib.data._random_walk

from __future__ import annotations

import random

import polars as pl
from polars_sdist import sample_normal


def _ensure_rfft() -> None:
    """Import polars_rfft to register the .rfft namespace on pl.Expr."""
    import polars_rfft  # type: ignore[import-untyped]  # noqa: F401


def _build_covariance_row(n: int, hurst: float) -> pl.Series:
    """Toeplitz coefficients + circulant embedding (length 2n)."""
    h2 = 2 * hurst
    k = pl.arange(0, n, eager=True).alias("k")
    c = 0.5 * (
        (k + 1).cast(pl.Float64).pow(h2)
        - 2 * k.cast(pl.Float64).pow(h2)
        + (k - 1).cast(pl.Float64).abs().pow(h2)
    )
    # Embed in circulant matrix: [c_0, ..., c_{n-1}, 0, c_{n-1}, ..., c_1]
    padding = pl.Series("pad", [0.0])
    return pl.concat([c, padding, c[1:].reverse()], rechunk=True).rename("cov_row")


def _sqrt_eigenvalue_expr(col: str = "cov_row") -> pl.Expr:
    """FFT of circulant row → real part → clamp ≥ 0 → sqrt."""
    _ensure_rfft()
    return (
        pl.col(col)
        .rfft.fft()  # type: ignore[attr-defined]
        .struct.field("re")
        .clip(lower_bound=0.0)
        .sqrt()
        .alias("sqrt_eig")
    )


def _frw_increments_expr() -> pl.Expr:
    """Build complex product and IFFT, assuming sqrt_eig/z_re/z_im columns.

    Returns a real-valued expression of the IFFT result.
    """
    return (
        pl.struct(
            re=pl.col("sqrt_eig") * pl.col("z_re"),
            im=pl.col("sqrt_eig") * pl.col("z_im"),
        )
        .rfft.ifft_real()  # type: ignore[attr-defined]
        .alias("increment")
    )


def _derive_seeds(seed: int | None) -> tuple[int, int]:
    """Derive two child seeds from a parent seed."""
    rng = random.Random(seed)
    return rng.randrange(2**63), rng.randrange(2**63)


[docs] def fractional_random_walk( n: int, hurst: float = 0.5, base_price: float = 100.0, step_size: float = 1.0, seed: int | None = None, geometric: bool = False, ) -> pl.DataFrame: r"""Discrete-time fractional random walk using Davies-Harte circulant embedding. The Hurst exponent *H* controls the autocorrelation of increments: - **H = 0.5** — independent increments (standard random walk). - **H > 0.5** — positively correlated increments (trending / persistent). - **H < 0.5** — negatively correlated increments (mean-reverting / anti-persistent). Parameters ---------- n Number of time steps to generate. hurst Hurst exponent, must be in ``(0, 1)``. base_price Initial price :math:`S_0`. step_size Scale factor applied to each increment. seed RNG seed for reproducibility. geometric If ``True``, exponentiate the cumulative sum to produce geometric (lognormal) prices: ``base_price * exp(cumsum(increments))``. Prices are always positive. Default ``False`` preserves the additive construction. Returns ------- pl.DataFrame Columns ``step`` (int) and ``price`` (float). Notes ----- **H = 0.5 fast path.** Increments are i.i.d. normal — no FFT is needed, just ``sample_normal`` → scale → ``cum_sum``. **H ≠ 0.5 (Davies-Harte spectral method).** Fractional Brownian motion increments have autocovariance:: γ(k) = ½(|k−1|^{2H} − 2|k|^{2H} + |k+1|^{2H}) The algorithm embeds this Toeplitz sequence in a :math:`2n`-length circulant vector, then: 1. Compute eigenvalues via FFT: :math:`\Lambda = \text{FFT}(\gamma_{\text{circ}})` 2. Draw two independent standard normal vectors :math:`Z_{re}, Z_{im}` 3. Form the product :math:`\sqrt{\Lambda} \cdot (Z_{re} + i \, Z_{im})` 4. Apply IFFT and take the first *n* real values as increments This is O(n log n) via FFT, compared to O(n²) for Cholesky factorisation of the full covariance matrix. FFT and normal sampling are delegated to the ``polars-rfft`` and ``polars-sdist`` Rust plugins respectively — no NumPy dependency. """ if n < 1: raise ValueError("n must be >= 1") if not 0 < hurst < 1: raise ValueError("hurst must be in (0, 1)") if hurst == 0.5: z = sample_normal(n, seed=seed) increments = z * step_size cumulative = increments.cum_sum() if geometric: prices = pl.Series("price", [0.0]).append(cumulative)[:n].exp() * base_price else: prices = pl.Series("price", [base_price]).append(cumulative + base_price)[:n] return pl.DataFrame({"step": range(n), "price": prices}) # Davies-Harte / circulant embedding — O(n log n) cov_row = _build_covariance_row(n, hurst) m = len(cov_row) seed_re, seed_im = _derive_seeds(seed) z_re = sample_normal(m, seed=seed_re) z_im = sample_normal(m, seed=seed_im) scale = m**0.5 * step_size return ( pl.DataFrame({"cov_row": cov_row, "z_re": z_re, "z_im": z_im}) .with_columns(_sqrt_eigenvalue_expr()) .select(_frw_increments_expr()) .head(n) .with_row_index("step") .with_columns( (pl.col("increment") * scale).cum_sum().shift(1).fill_null(0.0).alias("_cumulative") ) .with_columns( ( base_price * pl.col("_cumulative").exp() if geometric else base_price + pl.col("_cumulative") ).alias("price") ) .cast({"step": pl.Int64}) .select("step", "price") )