# Current Version: Architecture and Runtime Flow

This document describes how the current `src/core` implementation works.

## 1. Scope

The current version implements a single-process POC service with:

- universe loading from no-auth sources
- price ingestion via `yfinance` with process pool
- in-memory pub/sub
- multi-timeframe delta and risk event detection
- market regime state machine
- `risk.regime.change` and `risk.envelope.updated` events
- data gap detection
- alert queue with retries
- basic telemetry counters

Entry point:

- `src/core/main.py`

## 2. Main Runtime Loop

At startup (`main.py`):

1. Build shared components (`PubSub`, `RiskEngine`, `DataGapDetector`, `AlertQueue`, `Telemetry`, `RiskEnvelopeStore`).
2. Register topic subscriptions.
3. Load universe once and publish `market.universe.snapshot`.
4. Enter infinite loop with `POLL_INTERVAL_SEC`.

Per polling iteration (`run_once`):

1. Publish `market.universe.updated` for each venue bucket.
2. Fetch prices for that venue through `PriceIngestor`.
3. Publish `market.prices.snapshot`.
4. Process pending alert queue retries.

## 3. Components and Responsibilities

### 3.1 Universe Loader

File: `src/core/universe_loader.py`

Responsibilities:

- load symbol universe for US equities, EU equities, crypto, commodities
- normalize to `yfinance`-compatible symbols
- build universe snapshot event

Produced events:

- `market.universe.snapshot`
- `market.universe.updated`

### 3.2 Price Ingestor

File: `src/core/price_ingestor.py`

Responsibilities:

- split ticker list into chunks (`BATCH_SIZE`)
- fetch each chunk (parallel via `ProcessPoolExecutor`)
- apply retry/backoff wrapper around `yfinance.download`
- emit snapshot with monotonic `sequence_id`

Produced event:

- `market.prices.snapshot`

### 3.3 Retry Utility

File: `src/core/retry_utils.py`

Responsibilities:

- `backoff_delay(base_delay_sec, attempt)` exponential delay
- `call_with_backoff(fn, max_attempts, base_delay_sec)` common retry wrapper

Used by:

- `PriceIngestor` fetch retries
- `AlertQueue` retry scheduling

### 3.4 Risk Engine

File: `src/core/risk_engine.py`

Responsibilities:

- maintain rolling price windows (`PriceBuffer`)
- compute per-timeframe deltas (`Price Delta Calculator`)
- detect threshold breaches and severity
- emit per-ticker risk events and alerts
- maintain and update regime state machine
- publish regime transitions separately from envelope updates

Produced events:

- `market.price.delta`
- `risk.event.detected`
- `risk.regime.change`
- `risk.envelope.updated`
- `system.alert.ready` (risk alerts)

### 3.5 Regime State Machine

File: `src/core/risk_regime.py`

States:

- `normal`
- `tension`
- `stress`
- `panic`

Inputs per snapshot:

- `worst_drop_pct`
- `high_severity_count`

Logic:

- escalates immediately on stronger stress conditions
- downgrades only after `calm_snapshots_to_downgrade` to avoid oscillation

### 3.6 Data Gap Detector

File: `src/core/data_gap.py`

Responsibilities:

- track last seen `sequence_id` per ticker
- detect jumps (`current - last > 1`)
- emit `data.gap.detected`

### 3.7 Alerting

File: `src/core/alerting.py`

Responsibilities:

- build risk alerts (`build_alert`)
- build data gap alerts (`build_data_gap_alert`)
- send alerts via Telegram notifier
- queue alerts and retry failed sends

Queue behavior:

- enqueue on `system.alert.ready`
- process each loop
- retry with exponential backoff
- drop after `ALERT_MAX_RETRIES`

### 3.8 Telemetry

File: `src/core/telemetry.py`

Responsibilities:

- count events by class and key dimensions
- periodic stdout report every `TELEMETRY_REPORT_INTERVAL_SEC`

Current counters include:

- `alerts_enqueued`
- `risk_events`
- `risk_severity:*`
- `risk_regime_changes`
- `risk_regime_to:*`
- `data_gap_events`
- `data_gap_ticker:*`

## 4. Topic Taxonomy

Configured in:

- `configs/topics.yaml`

Current keys:

- `universe_snapshot`: `market.universe.snapshot`
- `universe_updated`: `market.universe.updated`
- `prices_snapshot`: `market.prices.snapshot`
- `price_delta`: `market.price.delta`
- `risk_event`: `risk.event.detected`
- `risk_regime_change`: `risk.regime.change`
- `risk_envelope`: `risk.envelope.updated`
- `data_gap`: `data.gap.detected`
- `alert_ready`: `system.alert.ready`

Loaded by:

- `src/core/topics.py`

Behavior:

- fallback to hardcoded defaults if YAML is missing or malformed

## 5. Event Contracts

Schemas live in:

- `src/core/schemas.py`

Defined typed contracts:

- `Instrument`
- `UniverseSnapshot`
- `RiskEnvelope`
- `RiskRegimeChangeEvent`
- `DataGapEvent`

Notes:

- runtime payloads are plain dicts; TypedDict provides development-time contract guidance

## 6. Subscription Graph

Set in `src/core/main.py`:

- `system.alert.ready -> AlertQueue.enqueue`
- `system.alert.ready -> Telemetry.on_alert_enqueued`
- `market.prices.snapshot -> RiskEngine.on_price_snapshot`
- `market.prices.snapshot -> DataGapDetector.on_price_snapshot`
- `risk.event.detected -> Telemetry.on_risk_event`
- `risk.regime.change -> Telemetry.on_regime_change`
- `data.gap.detected -> Telemetry.on_data_gap`
- `data.gap.detected -> publish(system.alert.ready, build_data_gap_alert(event))`
- `market.universe.snapshot -> RiskEngine.on_universe_snapshot`

## 7. Risk Envelope Semantics

Envelope is generated on regime transitions and stored in `RiskEnvelopeStore`.

Current regime mapping:

- `normal`: permissive limits
- `tension`: reduced position/order/leverage
- `stress`: tighter risk limits
- `panic`: `allowed = False`, trading blocked

Envelope TTL:

- represented by `valid_from_ms` / `valid_until_ms`
- store returns `None` when expired

## 8. Data Gap Handling

Data gaps are inferred from `sequence_id` discontinuity in snapshots.

Current assumptions:

- one global sequence per `PriceIngestor` process
- gap is tracked per ticker

Limitations:

- no persistent sequence state across restarts
- no exchange-native sequence reconciliation

## 9. Reliability and Retries

### Price fetch

- retries around `yfinance.download` via `call_with_backoff`
- returns empty chunk on exhaustion

### Alert delivery

- failed send remains in queue
- re-attempted with exponential backoff
- dropped after max retries

## 10. Configuration

Main runtime config:

- `src/core/config.py`

Important values:

- polling: `POLL_INTERVAL_SEC`
- ingestion parallelism: `PROCESS_POOL_WORKERS`, `BATCH_SIZE`
- yfinance retry: `YF_MAX_RETRIES`, `YF_BACKOFF_BASE_SEC`
- alert retry: `ALERT_MAX_RETRIES`, `ALERT_BACKOFF_BASE_SEC`
- telemetry reporting: `TELEMETRY_REPORT_INTERVAL_SEC`
- topic config path: `TOPICS_CONFIG_PATH`

## 11. Known Limitations

- In-memory pub/sub and stores are process-local only.
- Universe is loaded once at startup (no periodic refresh yet).
- Telemetry outputs to stdout only (no external sink).
- No durable queue for alerts.
- No persistent event/audit storage.

## 12. Run and Verify

Install dependencies and run:

```bash
uv sync
uv run python -m src.core.main
```

Recommended quick checks:

1. Verify topics loaded from `configs/topics.yaml`.
2. Confirm `risk.regime.change` appears when volatility increases.
3. Confirm `risk.envelope.updated` appears on same transitions.
4. Simulate send failures and verify alert queue retries.
5. Observe telemetry counters printed periodically.

## 13. Detailed Sequence Diagram (ASCII)

```text
+============================================================================+
|                     DETAILED EVENT SEQUENCE (1 CYCLE)                      |
+============================================================================+

Actors:
  M  = main loop
  UL = UniverseLoader
  PI = PriceIngestor
  PS = PubSub
  RE = RiskEngine
  DG = DataGapDetector
  T  = Telemetry
  AQ = AlertQueue
  TN = TelegramNotifier
  RS = RiskEnvelopeStore

Startup:
  M  -> UL : load()
  UL -> M  : universe
  M  -> UL : build_snapshot
  M  -> PS : publish market.universe.snapshot
  PS -> RE : on_universe_snapshot

== loop ======================================================================>

  for each venue:
    M  -> UL : build_event
    M  -> PS : publish market.universe.updated
    M  -> PI : fetch_prices
    PI -> yfinance : download + retry/backoff
    PI -> M  : prices + sequence_id
    M  -> PS : publish market.prices.snapshot

    PS -> RE : on_price_snapshot
    RE       : calc deltas/events/regime
    RE -> PS : publish market.price.delta
    RE -> PS : publish risk.event.detected
    PS -> T  : on_risk_event
    RE -> PS : publish system.alert.ready
    PS -> AQ : enqueue
    PS -> T  : on_alert_enqueued
    if regime changed:
      RE -> PS : publish risk.regime.change
      PS -> T  : on_regime_change
      RE       : build envelope
      RE -> RS : set()
      RE -> PS : publish risk.envelope.updated

    PS -> DG : on_price_snapshot
    if seq gap:
      DG -> PS : publish data.gap.detected
      PS -> T  : on_data_gap
      PS       : publish system.alert.ready(data_gap)
      PS -> AQ : enqueue

  M  -> AQ : process(TN.send)
  AQ -> TN : send alert
  TN -> AQ : ok/fail
  AQ       : retry schedule if fail

== sleep POLL_INTERVAL_SEC ===================================================>
```

## 14. Regime Transition Diagram (ASCII)

```text
+============================================================================+
|                        RISK REGIME TRANSITION LOGIC                        |
+============================================================================+

Inputs per snapshot:
  worst_drop_pct
  high_severity_count

Threshold rules:
  panic    if worst_drop_pct <= -8.0 OR high_severity_count >= 3
  stress   if worst_drop_pct <= -5.0 OR high_severity_count >= 2
  tension  if worst_drop_pct <= -3.0 OR high_severity_count >= 1
  normal   otherwise

Transition behavior:
  - Escalation (to stronger regime): immediate
  - Downgrade (to weaker regime): requires calm_snapshots_to_downgrade (default 3)

State graph:

              (escalate)
    +--------- normal ---------+
    |             |            |
    |             v            |
    |          tension         |
    |             |            |
    |             v            |
    |           stress         |
    |             |            |
    |             v            |
    +---------- panic <--------+
    ^
    |  (downgrade after calm streak)
    +--------------------------------

On each regime change:
  1) publish risk.regime.change
  2) build and store RiskEnvelope
  3) publish risk.envelope.updated
```

## 15. Failure, Retry, and Degradation Flows (ASCII)

```text
+============================================================================+
|                     FAILURE / RETRY / BACKPRESSURE MAP                     |
+============================================================================+

1) Price fetch failures (yfinance)
   PI._fetch_chunk()
     -> call_with_backoff(max_attempts=YF_MAX_RETRIES, base=YF_BACKOFF_BASE_SEC)
     -> if exhausted: return empty chunk
     -> cycle continues (degraded data, no crash)

2) Telegram send failures
   AQ.process(TN.send)
     -> failed send increments attempts
     -> next attempt scheduled via exponential backoff
     -> dropped after ALERT_MAX_RETRIES

3) Data gaps
   DG compares sequence_id per ticker
     -> if jump detected: emit data.gap.detected
     -> converted into system alert + telemetry counter

4) Topic config failure
   topics.py load_topics()
     -> if YAML missing/malformed: fallback to hardcoded defaults
```

## 16. System Overview Diagram (ASCII)

```text
+============================================================================+
|                    MARKET ANALYZING PLATFORM (CURRENT)                     |
+============================================================================+

STARTUP PHASE
=============

+---------------------+        +----------------------+        +----------------------------------+
| configs/topics.yaml | -----> | core/topics.py       | -----> | TOPICS map in memory             |
+---------------------+        +----------------------+        +----------------------------------+

+---------------------+        +----------------------+        +----------------------------------+
| UniverseLoader      | -----> | universe snapshot    | -----> | market.universe.snapshot         |
| (US/EU/Crypto/Comm) |        | (universe_id, etc.)  |        | -> RiskEngine stores universe_id |
+---------------------+        +----------------------+        +----------------------------------+

+---------------------+        +----------------------+        +----------------------------------+
| Build runtime graph | -----> | PubSub subscriptions | -----> | Service ready                    |
| (main.py)           |        | (handlers)           |        | polling loop starts              |
+---------------------+        +----------------------+        +----------------------------------+

RUNTIME LOOP
============

every POLL_INTERVAL_SEC
            |
            v
+---------------------------+
| for each venue universe   |
| (us.equities, eu..., etc) |
+---------------------------+
            |
            v
+---------------------------+         +--------------------------------------------+
| PriceIngestor             |         | retry_utils.call_with_backoff              |
| - chunk tickers           | <-----> | exponential backoff for yfinance download  |
| - process pool workers    |         +--------------------------------------------+
| - sequence_id++           |
+---------------------------+
            |
            v
topic: market.prices.snapshot
            |
            +----------------------------------------+
            |                                        |
            v                                        v
+-------------------------+            +---------------------------+
| RiskEngine              |            | DataGapDetector           |
| - update PriceBuffer    |            | - per ticker last seq     |
| - calc delta per TF     |            | - detect seq jumps        |
| - adaptive threshold    |            +---------------------------+
| - detect risk events    |                          |
+-------------------------+                          v
            |                          topic: data.gap.detected
            |                                        |
            |                          +--------------------------+
            |                          | Telemetry.on_data_gap    |
            |                          +--------------------------+
            |                                        |
            |                                        v
            |                          build_data_gap_alert(event)
            |                                        |
            |                                        v
            |                          topic: system.alert.ready
            |
            +--> topic: market.price.delta
            |
            +--> topic: risk.event.detected -------> Telemetry.on_risk_event
            |
            +--> topic: system.alert.ready --------> AlertQueue.enqueue
            |
            +--> Regime State Machine (normal/tension/stress/panic)
                      |
                      | if regime changed:
                      v
                 topic: risk.regime.change -------> Telemetry.on_regime_change
                      |
                      v
                 build RiskEnvelope (limits by regime)
                      |
                      v
                 RiskEnvelopeStore.set(...)
                      |
                      v
                 topic: risk.envelope.updated

ALERT DELIVERY (ASYNC-ish IN LOOP)
================================

topic: system.alert.ready
            |
            +--> AlertQueue.enqueue
            +--> Telemetry.on_alert_enqueued

end of each polling iteration:
            |
            v
AlertQueue.process(TelegramNotifier.send)
            |
            +--> success: remove item
            |
            +--> failure: schedule retry with exponential backoff
                          until ALERT_MAX_RETRIES exhausted

RISK REGIME -> ENVELOPE MAP
===========================

normal  -> allowed=true,  higher limits
tension -> allowed=true,  reduced limits
stress  -> allowed=true,  tighter limits
panic   -> allowed=false, zero limits (block trading)

TOPIC TAXONOMY
==============

market.universe.snapshot
market.universe.updated
market.prices.snapshot
market.price.delta
risk.event.detected
risk.regime.change
risk.envelope.updated
data.gap.detected
system.alert.ready

TELEMETRY OUTPUT
================

[TELEMETRY] alerts_enqueued=... risk_events=... risk_severity:high=... risk_regime_changes=... risk_regime_to:stress=... data_gap_events=... data_gap_ticker:BTC-USD=...
```
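
The threshold rules and transition behavior from Section 14 can be illustrated with a minimal sketch. This is not the code in `src/core/risk_regime.py`; the class name `RegimeMachine`, the helper `target_regime`, and the `update` method are hypothetical. Two points are assumptions, because the document does not specify them: a "calm" snapshot is taken to mean one whose target regime is weaker than the current regime, and a downgrade is taken to jump straight to the latest snapshot's target regime rather than stepping down one level.

```python
from dataclasses import dataclass

# Regimes ordered from weakest to strongest, as listed in Section 3.5.
REGIMES = ("normal", "tension", "stress", "panic")


def target_regime(worst_drop_pct: float, high_severity_count: int) -> str:
    """Map one snapshot's inputs to a regime using the Section 14 thresholds."""
    if worst_drop_pct <= -8.0 or high_severity_count >= 3:
        return "panic"
    if worst_drop_pct <= -5.0 or high_severity_count >= 2:
        return "stress"
    if worst_drop_pct <= -3.0 or high_severity_count >= 1:
        return "tension"
    return "normal"


@dataclass
class RegimeMachine:
    """Hypothetical stand-in for the regime state machine."""

    calm_snapshots_to_downgrade: int = 3  # default stated in Section 14
    current: str = "normal"
    calm_streak: int = 0

    def update(self, worst_drop_pct: float, high_severity_count: int) -> bool:
        """Feed one snapshot; return True if the regime changed."""
        target = target_regime(worst_drop_pct, high_severity_count)
        cur_rank = REGIMES.index(self.current)
        tgt_rank = REGIMES.index(target)

        if tgt_rank > cur_rank:
            # Escalation is immediate.
            self.current, self.calm_streak = target, 0
            return True
        if tgt_rank == cur_rank:
            # Conditions still justify the current regime: reset the streak.
            self.calm_streak = 0
            return False

        # Assumption: a calm snapshot is one whose target is weaker than current.
        self.calm_streak += 1
        if self.calm_streak >= self.calm_snapshots_to_downgrade:
            # Assumption: downgrade directly to the latest target regime.
            self.current, self.calm_streak = target, 0
            return True
        return False
```

Under these assumptions, a single snapshot with `worst_drop_pct = -6.0` moves the machine from `normal` to `stress` at once, while returning to `normal` takes three consecutive calm snapshots. The boolean result is where a caller would publish `risk.regime.change` and then build and store the envelope.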
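
The retry path used for price fetches and alert scheduling (Sections 3.3 and 9) can be sketched as follows. The signatures `backoff_delay(base_delay_sec, attempt)` and `call_with_backoff(fn, max_attempts, base_delay_sec)` come from this document; the bodies are assumptions. In particular, the doubling formula, the zero-based `attempt` index, re-raising the last exception on exhaustion, and the keyword-only `sleep` parameter (added here so the example can run without waiting) are all hypothetical.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def backoff_delay(base_delay_sec: float, attempt: int) -> float:
    """Exponential delay. Assumption: attempt is zero-based and the delay doubles."""
    return base_delay_sec * (2 ** attempt)


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int,
    base_delay_sec: float,
    *,
    sleep: Callable[[float], None] = time.sleep,  # hypothetical test hook
) -> T:
    """Call fn until it succeeds or max_attempts is reached."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")

    last_exc: Optional[Exception] = None
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            last_exc = exc
            # Wait only if another attempt will follow.
            if attempt < max_attempts - 1:
                sleep(backoff_delay(base_delay_sec, attempt))

    # Assumption: exhaustion re-raises the last error and the caller decides
    # how to degrade (for example, by returning an empty chunk).
    assert last_exc is not None
    raise last_exc
```

With `base_delay_sec=0.5`, a function that fails twice and then succeeds would wait 0.5 s and then 1.0 s before returning. A caller such as the price fetch path could wrap the call in `try`/`except` and return an empty chunk on exhaustion, which matches the degraded-data behavior described in Section 15.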
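
The per-ticker gap rule from Sections 3.6 and 8 (`current - last > 1`) can be sketched in a few lines. The class name `GapTracker`, the snapshot shape (a dict with `sequence_id` and a `prices` mapping keyed by ticker), and the fields of the returned event dicts are assumptions made for illustration; the actual `DataGapEvent` contract lives in `src/core/schemas.py` and is not reproduced here.

```python
from typing import Dict, List


class GapTracker:
    """Hypothetical stand-in for DataGapDetector: tracks last seen sequence per ticker."""

    def __init__(self) -> None:
        self._last_seq: Dict[str, int] = {}

    def on_price_snapshot(self, snapshot: dict) -> List[dict]:
        """Return one gap event per ticker whose sequence jumped by more than 1."""
        # Assumed snapshot shape: {"sequence_id": int, "prices": {ticker: price}}
        seq = snapshot["sequence_id"]
        events: List[dict] = []
        for ticker in snapshot["prices"]:
            last = self._last_seq.get(ticker)
            if last is not None and seq - last > 1:
                events.append(
                    {
                        "ticker": ticker,
                        "last_sequence_id": last,
                        "sequence_id": seq,
                        "missed": seq - last - 1,
                    }
                )
            # Record the sequence for every ticker present in this snapshot.
            self._last_seq[ticker] = seq
        return events
```

Under these assumptions, a ticker that is absent from one snapshot (for example, because its chunk came back empty after retries were exhausted) produces a gap event on the next snapshot that contains it. State is held in memory only, which mirrors the limitation that sequence state does not survive restarts.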