Source code for mktlib.data._monte_carlo

from __future__ import annotations

import enum
import math
import random
from typing import Any, Callable, overload

import polars as pl
from polars_sdist import sample_normal, sample_students_t

from mktlib.data._gbm import _gbm_price_expr
from mktlib.data._ornstein_uhlenbeck import _ou_value_expr


[docs] class Innovations(enum.Enum): """Noise distribution for stochastic-process simulations. All variants produce **unit-variance** i.i.d. samples — the ``volatility`` (or analogous) parameter of the host process remains the controlling scale. Switching innovations changes only the *shape* of the tails, not the second moment of bar-level log-returns. Currently honoured by :class:`Process.GBM`; using a non-Gaussian member with :class:`Process.OU` or :class:`Process.FRW` raises :class:`NotImplementedError` (FRW's Davies–Harte construction is only meaningful under Gaussian noise; OU's direct-``sigma`` parameterisation tangles with the unit-variance contract). """ GAUSSIAN = "gaussian" """Standard normal :math:`Z \\sim N(0, 1)` via ``polars_sdist.sample_normal``.""" STUDENT_T = "students_t" """Student-t with degrees-of-freedom *df* (>2), rescaled to unit variance.""" BOOTSTRAP = "bootstrap" """Resample with replacement from a caller-supplied unit-variance residual series."""
def _child_seeds(n_simulations: int, seed: int | None) -> list[int]: rng = random.Random(seed) return [rng.randrange(2**63) for _ in range(n_simulations)] def _draw_unit_variance( n: int, seed: int | None, *, innovations: Innovations | Callable[[int, int | None], pl.Series], df: float | None, residuals: pl.Series | None, ) -> pl.Series: """Single-simulation unit-variance noise draw. Dispatches on *innovations*; called once per child seed and concatenated upstream. All return paths produce a Series of length *n* with (asymptotic) unit variance. """ if callable(innovations): return innovations(n, seed) match innovations: case Innovations.GAUSSIAN: return sample_normal(n, seed=seed) case Innovations.STUDENT_T: if df is None: msg = "innovations=STUDENT_T requires df= (degrees of freedom)" raise ValueError(msg) if df <= 2: msg = f"Student-t df must be > 2 for finite variance (got {df})" raise ValueError(msg) scale = math.sqrt(df / (df - 2.0)) return sample_students_t(n, df=df, seed=seed) / scale case Innovations.BOOTSTRAP: if residuals is None: msg = "innovations=BOOTSTRAP requires residuals= (unit-variance series)" raise ValueError(msg) return residuals.sample(n=n, with_replacement=True, seed=seed) def _noise_frame( n_simulations: int, n: int, *, seed: int | None = None, innovations: Innovations | Callable[[int, int | None], pl.Series] = Innovations.GAUSSIAN, df: float | None = None, residuals: pl.Series | None = None, independent_streams: bool = True, ) -> pl.DataFrame: """Build ``simulation | seed | step | z`` frame with per-sim noise draws. The *z* column is unit-variance regardless of *innovations* — the distributional choice only changes tail shape. See :class:`Innovations` for the contract. Parameters ---------- independent_streams When ``True`` (default), each simulation draws from its own deterministically-derived child RNG and the *seed* column reports that per-stream seed. Preserves byte-for-byte equivalence with pre-v0.11.x behaviour. When ``False``, draws ``n_simulations * n`` unit-variance samples in a single batched call to the underlying sampler. The result is statistically identical (i.i.d. samples by construction) but runs **5–7× faster for Gaussian / Student-t and ~200× faster for bootstrap** because per-call RNG construction overhead dominates the wall-clock at typical (10k sims) scales. The *seed* column is filled with one parent-derived seed shared by every row — callers that introspect per-simulation seeds must opt out. Notes ----- The ``seed`` column is **never null**, even when ``seed=None`` is passed. Under ``seed=None`` we materialize a fresh OS-time-derived integer per call and write it to every row, so the column always holds a concrete value. Reproducibility-via-seed-column introspection is therefore unsafe — use the *parent* seed argument you passed in to detect deterministic vs. non-deterministic runs, not the column. """ total = n_simulations * n if independent_streams: child_seeds = _child_seeds(n_simulations, seed) z = pl.concat( [ _draw_unit_variance( n, s, innovations=innovations, df=df, residuals=residuals, ) for s in child_seeds ], rechunk=True, ) seeds_per_sim = pl.Series("seed", child_seeds) else: # One concrete parent-derived seed drives both the batched draw # and the seed column, so the column stays meaningful even when # the caller passes seed=None (we materialize an OS-time-based # int rather than leaving it null). parent_seed = _child_seeds(1, seed)[0] z = _draw_unit_variance( total, parent_seed, innovations=innovations, df=df, residuals=residuals, ) seeds_per_sim = pl.Series("seed", [parent_seed] * n_simulations) idx = pl.arange(0, total, eager=True) sim = idx // n return pl.DataFrame( { "simulation": sim, "seed": seeds_per_sim.gather(sim), "step": idx % n, "z": z, } )
[docs] class Process(enum.Enum): """Built-in stochastic processes for :func:`monte_carlo`. Each variant maps to a vectorized implementation that draws all normal samples in a single ``polars-sdist`` call and partitions them across simulations with ``.over("simulation")`` — no Python loop. Pass the enum member as the first argument to :func:`monte_carlo`, along with any keyword arguments accepted by the underlying single-path generator (e.g. ``n``, ``drift``, ``volatility`` for GBM). """ GBM = "gbm" """Geometric Brownian motion — lognormal price paths.""" OU = "ou" """Ornstein-Uhlenbeck — mean-reverting process.""" FRW = "frw" """Fractional random walk — fBm increments via Davies-Harte."""
@overload def monte_carlo( process: Process, n_simulations: int = 1000, *, seed: int | None = None, innovations: Innovations | Callable[[int, int | None], pl.Series] = Innovations.GAUSSIAN, df: float | None = None, residuals: pl.Series | None = None, independent_streams: bool = True, **process_kwargs: Any, ) -> pl.DataFrame: ... @overload def monte_carlo( process: Callable[..., pl.DataFrame], n_simulations: int = 1000, *, seed: int | None = None, innovations: Innovations | Callable[[int, int | None], pl.Series] = Innovations.GAUSSIAN, df: float | None = None, residuals: pl.Series | None = None, independent_streams: bool = True, **process_kwargs: Any, ) -> pl.DataFrame: ...
[docs] def monte_carlo( process: Process | Callable[..., pl.DataFrame], n_simulations: int = 1000, *, seed: int | None = None, innovations: Innovations | Callable[[int, int | None], pl.Series] = Innovations.GAUSSIAN, df: float | None = None, residuals: pl.Series | None = None, independent_streams: bool = True, **process_kwargs: Any, ) -> pl.DataFrame: r"""Run multiple simulations of a stochastic process. Generates *n_simulations* independent paths from the same process and returns them stacked in a single DataFrame with a ``simulation`` index column prepended. Parameters ---------- process Which process to simulate. A :class:`Process` enum member selects the vectorized fast path; a callable with signature ``(*, seed: int, **kwargs) -> pl.DataFrame`` uses a serial loop fallback. n_simulations Number of independent paths to generate. seed Parent RNG seed. Deterministic child seeds are derived so that each simulation is reproducible and independent. innovations Noise distribution feeding the SDE. Currently honoured by :class:`Process.GBM`; passing a non-Gaussian member with :class:`Process.OU`, :class:`Process.FRW`, or a callable *process* raises :class:`NotImplementedError`. All variants emit unit-variance i.i.d. samples — the host process's ``volatility`` parameter remains the controlling scale. See :class:`Innovations`. df Required when ``innovations=Innovations.STUDENT_T``; degrees of freedom (must be > 2 for finite variance). residuals Required when ``innovations=Innovations.BOOTSTRAP``; a unit-variance ``pl.Series`` of empirical residuals to resample with replacement. independent_streams Default ``True`` — preserves the per-simulation child-seed contract. Set to ``False`` to draw all noise in one batched sampler call (statistically identical i.i.d. samples, **5×–200× faster** at typical scales). Trade-off: the *seed* column reports a single parent-derived seed instead of one per simulation. GBM only — passing ``False`` with OU / FRW raises :class:`NotImplementedError`. **process_kwargs Forwarded to the underlying generator. For :class:`Process` members these match the single-path function signatures (e.g. ``n``, ``drift``, ``volatility`` for GBM). Returns ------- pl.DataFrame Columns ``simulation`` (int), ``seed`` (int), ``step`` (int), and a value column whose name depends on the process — ``price`` for GBM / FRW, ``value`` for OU. The ``seed`` column contains the child seed used for that simulation — constant within each simulation group. Notes ----- All built-in processes are discretizations of the general SDE:: dX(t) = a(X, t) dt + b(X, t) dW(t) where :math:`dW(t) = Z \sqrt{dt}`, :math:`Z \sim N(0,1)`. The drift and diffusion coefficients for each :class:`Process` variant are: ===== ======================== =================== Enum Drift *a(X, t)* Diffusion *b(X, t)* ===== ======================== =================== GBM :math:`\mu X` :math:`\sigma X` OU :math:`\theta(\mu - X)` :math:`\sigma` FRW 0 step_size ===== ======================== =================== **Vectorized path (Process enum).** A single ``sample_normal(n_simulations * n)`` call draws all noise upfront. Per-simulation recurrences are computed via Polars ``.over("simulation")`` expressions, keeping the entire operation in Rust with no Python loop. For FRW with :math:`H \neq 0.5`, the sqrt-eigenvalues of the circulant covariance are computed once and tiled across simulations. **Serial fallback (callable).** The callable is invoked once per simulation with a deterministic child seed derived from *seed*. Results are concatenated with :func:`polars.concat`. This path is slower but supports arbitrary user-defined generators. **Seed derivation.** Child seeds are produced by a ``random.Random(seed)`` instance, so results are fully reproducible for a given *seed* / *n_simulations* pair. """ if n_simulations < 1: raise ValueError("n_simulations must be >= 1") non_gaussian = ( callable(innovations) or innovations is not Innovations.GAUSSIAN ) if isinstance(process, Process): if process is Process.GBM: return _vectorized_gbm( n_simulations, seed=seed, innovations=innovations, df=df, residuals=residuals, independent_streams=independent_streams, **process_kwargs, ) if non_gaussian: msg = ( f"innovations={innovations!r} is only supported for " "Process.GBM in v0.11.0; OU and FRW require Gaussian noise" ) raise NotImplementedError(msg) if process is Process.OU: return _vectorized_ou( n_simulations, seed=seed, independent_streams=independent_streams, **process_kwargs, ) if process is Process.FRW: return _vectorized_frw( n_simulations, seed=seed, independent_streams=independent_streams, **process_kwargs, ) if non_gaussian: msg = ( "custom innovations are not supported for callable processes; " "the user-supplied generator owns its own noise source" ) raise NotImplementedError(msg) if not independent_streams: # Callable processes own their own noise source; we have no way to # batch a user-defined generator, so the flag has no effect there. msg = ( "independent_streams=False is not supported for callable processes; " "the user-supplied generator owns its own noise source" ) raise NotImplementedError(msg) return _loop(process, n_simulations, seed=seed, **process_kwargs)
def _vectorized_gbm( n_simulations: int, *, seed: int | None = None, n: int = 100, base_price: float = 100.0, drift: float = 0.0, volatility: float = 1.0, dt: float = 1 / 252, innovations: Innovations | Callable[[int, int | None], pl.Series] = Innovations.GAUSSIAN, df: float | None = None, residuals: pl.Series | None = None, independent_streams: bool = True, ) -> pl.DataFrame: if n < 1: raise ValueError("n must be >= 1") log_base = math.log(base_price) mu_dt = (drift - 0.5 * volatility**2) * dt sigma_sqrt_dt = volatility * math.sqrt(dt) return ( _noise_frame( n_simulations, n, seed=seed, innovations=innovations, df=df, residuals=residuals, independent_streams=independent_streams, ) .with_columns( _gbm_price_expr(log_base, mu_dt, sigma_sqrt_dt).over("simulation") ) .select("simulation", "seed", "step", "price") ) def _vectorized_ou( n_simulations: int, *, seed: int | None = None, n: int = 100, theta: float = 0.7, mu: float = 100.0, sigma: float = 1.0, x0: float | None = None, dt: float = 1 / 252, geometric: bool = False, independent_streams: bool = True, ) -> pl.DataFrame: if n < 1: raise ValueError("n must be >= 1") start = mu if x0 is None else x0 alpha = 1.0 - theta * dt beta = theta * mu * dt noise_scale = sigma * math.sqrt(dt) df = ( _noise_frame( n_simulations, n, seed=seed, independent_streams=independent_streams, ) .with_columns( _ou_value_expr(start, alpha, beta, noise_scale).over("simulation") ) ) if geometric: return df.with_columns(pl.col("value").exp().alias("price")).select("simulation", "seed", "step", "price") return df.select("simulation", "seed", "step", "value") def _vectorized_frw( n_simulations: int, *, seed: int | None = None, n: int = 100, hurst: float = 0.5, base_price: float = 100.0, step_size: float = 1.0, geometric: bool = False, independent_streams: bool = True, ) -> pl.DataFrame: if n < 1: raise ValueError("n must be >= 1") if hurst == 0.5: # Simple cumsum path — no FFT needed cumulative_expr = ( (pl.col("z") * step_size) .cum_sum() .shift(1) .fill_null(0.0) .over("simulation") ) if geometric: price_expr = (base_price * cumulative_expr.exp()).alias("price") else: price_expr = (base_price + cumulative_expr).alias("price") return ( _noise_frame( n_simulations, n, seed=seed, independent_streams=independent_streams, ) .with_columns(price_expr) .select("simulation", "seed", "step", "price") ) from mktlib.data._random_walk import ( _build_covariance_row, _derive_seeds, _frw_increments_expr, _sqrt_eigenvalue_expr, ) cov_row = _build_covariance_row(n, hurst) m = len(cov_row) # 2n if independent_streams: # Per-simulation child seeds → per-sim re/im seeds via _derive_seeds child_seeds = _child_seeds(n_simulations, seed) z_re_parts: list[pl.Series] = [] z_im_parts: list[pl.Series] = [] for cs in child_seeds: s_re, s_im = _derive_seeds(cs) z_re_parts.append(sample_normal(m, seed=s_re)) z_im_parts.append(sample_normal(m, seed=s_im)) z_re_all = pl.concat(z_re_parts, rechunk=True) z_im_all = pl.concat(z_im_parts, rechunk=True) seeds_per_sim = pl.Series("seed", child_seeds) else: # Single-batch: derive one parent-derived seed pair (re, im) and # draw all 2*n_simulations*m unit normals in two sampler calls. # Statistically identical (i.i.d. unit-normal pairs across sims) # because the Davies-Harte construction only requires per-sim # independence of the real/imaginary parts, which holds trivially # under i.i.d. draws. parent_seed = _child_seeds(1, seed)[0] parent_re, parent_im = _derive_seeds(parent_seed) z_re_all = sample_normal(n_simulations * m, seed=parent_re) z_im_all = sample_normal(n_simulations * m, seed=parent_im) seeds_per_sim = pl.Series("seed", [parent_seed] * n_simulations) # Compute sqrt_eig once, then tile across simulations sqrt_eig = ( pl.DataFrame({"cov_row": cov_row}) .select(_sqrt_eigenvalue_expr()) .to_series() ) sqrt_eig_tiled = pl.Series("sqrt_eig", sqrt_eig.to_list() * n_simulations) total_noise = n_simulations * m idx = pl.arange(0, total_noise, eager=True) seed_col = seeds_per_sim.gather(idx // m) df = pl.DataFrame( { "simulation": idx // m, "seed": seed_col, "embed_idx": idx % m, "sqrt_eig": sqrt_eig_tiled, "z_re": z_re_all, "z_im": z_im_all, } ) # Apply IFFT per simulation group df = df.with_columns(_frw_increments_expr().over("simulation")) # Filter to first n rows per simulation (discard circulant padding) df = df.filter(pl.col("embed_idx") < n) # Scale and cumsum → prices cumulative_expr = ( (pl.col("increment") * (m**0.5 * step_size)) .cum_sum() .shift(1) .fill_null(0.0) .over("simulation") ) if geometric: price_expr = (base_price * cumulative_expr.exp()).alias("price") else: price_expr = (base_price + cumulative_expr).alias("price") return ( df.with_columns(price_expr) .rename({"embed_idx": "step"}) .select("simulation", "seed", "step", "price") ) def _loop( process: Process | Callable[..., pl.DataFrame], n_simulations: int, *, seed: int | None = None, **process_kwargs: Any, ) -> pl.DataFrame: child_seeds = _child_seeds(n_simulations, seed) if isinstance(process, Process): match process: case Process.GBM: from mktlib.data._gbm import geometric_brownian_motion func = geometric_brownian_motion case Process.OU: from mktlib.data._ornstein_uhlenbeck import ornstein_uhlenbeck func = ornstein_uhlenbeck case Process.FRW: from mktlib.data._random_walk import fractional_random_walk func = fractional_random_walk else: func = process frames: list[pl.DataFrame] = [] for i, s in enumerate(child_seeds): df = func(**process_kwargs, seed=s) frames.append( df.with_columns( pl.lit(i).alias("simulation"), pl.lit(s).alias("seed"), ) ) result = pl.concat(frames) cols = result.columns return result.select( ["simulation", "seed"] + [c for c in cols if c not in ("simulation", "seed")] )