Source code for mktlib.scheduling.calendar

from __future__ import annotations

from datetime import date, datetime, time, timedelta
from typing import Any, TYPE_CHECKING
from zoneinfo import ZoneInfo

import polars as pl

from mktlib.scheduling._mixins import (
    MinuteQueryMixin,
    SessionNavigationMixin,
    TradingHelperMixin,
)
from mktlib.scheduling._types import MarketDailySchedule, parse_date
from mktlib.scheduling.rules import AdhocClosure, EarlyClose, HolidayRule

from mktlib.scheduling._break_mixin import BreakMixin

if TYPE_CHECKING:
    from collections.abc import Callable


[docs] class ExchangeCalendar( SessionNavigationMixin, MinuteQueryMixin, TradingHelperMixin ): """Polars-native exchange calendar with holiday/early-close support.""" def __init__( self, name: str, *, timezone: str, open_time: time, close_time: time, holidays: list[HolidayRule], adhoc_closures: list[AdhocClosure] | None = None, early_closes: list[EarlyClose] | None = None, special_closures_fn: Callable[[date, date], list[date]] | None = None, special_early_closes_fn: ( Callable[[date, date], dict[date, time]] | None ) = None, exclusions: set[date] | None = None, open_offset: int = 0, ): self.name = name self.timezone = timezone self.tz = ZoneInfo(timezone) self.open_time = open_time self.close_time = close_time self.holidays = holidays self.adhoc_closures = adhoc_closures or [] self.early_closes = early_closes or [] self._special_closures_fn = special_closures_fn self._special_early_closes_fn = special_early_closes_fn self._exclusions = exclusions or set() self.open_offset = open_offset def _closure_dates(self, start: date, end: date) -> set[date]: """Collect all closure dates (holidays + adhoc) within [start, end].""" closures: set[date] = set() for rule in self.holidays: closures.update(rule.dates_in_range(start, end)) for adhoc in self.adhoc_closures: for d in adhoc.dates: if start <= d <= end: closures.add(d) if self._special_closures_fn is not None: closures.update(self._special_closures_fn(start, end)) closures -= self._exclusions return closures def _early_close_map(self, start: date, end: date) -> dict[date, time]: """Map of date -> early close time within [start, end].""" ec_map: dict[date, time] = {} for ec in self.early_closes: for d in ec.dates_in_range(start, end): ec_map[d] = ec.close_time if self._special_early_closes_fn is not None: ec_map.update(self._special_early_closes_fn(start, end)) return ec_map
[docs] def valid_days(self, start: date | str, end: date | str) -> pl.Series: """Return a Polars Series of trading days within [start, end]. Trading dates are weekdays (Mon-Fri) minus closures. For calendars with ``open_offset < 0`` (e.g. Globex, FX), a session's market_open may fall on the prior calendar day (e.g. Sunday 17:00 for Monday's session), but the session is still attributed to the weekday. Minute-level methods like ``is_open_on_minute`` handle the cross-day lookup. """ start_d, end_d = parse_date(start), parse_date(end) closures = self._closure_dates(start_d, end_d) all_days = pl.date_range(start_d, end_d, eager=True) mask = all_days.dt.weekday() <= 5 # Polars ISO: 1=Mon..7=Sun if closures: mask = mask & ~all_days.is_in(list(closures)) return all_days.filter(mask).alias("date")
[docs] def schedule(self, start: date | str, end: date | str) -> pl.DataFrame: """Return a Polars DataFrame with columns: date, market_open, market_close. For calendars with ``open_offset < 0``, market_open falls on the prior calendar day (e.g. Sunday 17:00 for a Monday FX session). """ start_d, end_d = parse_date(start), parse_date(end) days = self.valid_days(start_d, end_d) ec_map = self._early_close_map(start_d, end_d) df = days.to_frame() open_expr = ( pl.col("date") .dt.offset_by(f"{self.open_offset}d") .dt.combine(self.open_time) .dt.replace_time_zone(self.timezone) .alias("market_open") ) if ec_map: ec_df = pl.DataFrame( { "date": pl.Series(list(ec_map.keys()), dtype=pl.Date), "_ec_time": pl.Series( list(ec_map.values()), dtype=pl.Time ), } ) df = df.join(ec_df, on="date", how="left") close_expr = ( pl.col("date") .dt.combine( pl.coalesce(pl.col("_ec_time"), pl.lit(self.close_time)) ) .dt.replace_time_zone(self.timezone) .alias("market_close") ) return df.with_columns(open_expr, close_expr).select( "date", "market_open", "market_close" ) close_expr = ( pl.col("date") .dt.combine(self.close_time) .dt.replace_time_zone(self.timezone) .alias("market_close") ) return df.with_columns(open_expr, close_expr)
[docs] def is_session(self, day: date | str) -> bool: """Check if a given date is a trading day.""" d = parse_date(day) if d.weekday() >= 5: return False closures = self._closure_dates(d, d) return d not in closures
[docs] def get_schedule(self, day: date | str) -> MarketDailySchedule | None: """Get the schedule for a single day, or None if not a trading day.""" d = parse_date(day) if not self.is_session(d): return None ec_map = self._early_close_map(d, d) close_t = ec_map.get(d, self.close_time) open_date = d + timedelta(days=self.open_offset) return MarketDailySchedule( date=d, market_open=datetime.combine( open_date, self.open_time, tzinfo=self.tz ), market_close=datetime.combine(d, close_t, tzinfo=self.tz), )
[docs] class ExchangeCalendarWithBreaks(BreakMixin, ExchangeCalendar): """Exchange calendar with lunch break support (e.g. JPX, HKEX).""" def __init__( self, name: str, *, break_start: time, break_end: time, **kwargs: Any ): self.break_start = break_start self.break_end = break_end super().__init__(name, **kwargs)