Current Version: Architecture and Runtime Flow
This document describes how the current src/core implementation works.
1. Scope
The current version implements a single-process POC service with:
- universe loading from no-auth sources
- price ingestion via yfinance with process pool
- in-memory pub/sub
- multi-timeframe delta and risk event detection
- market regime state machine
- risk.regime.change and risk.envelope.updated events
- data gap detection
- alert queue with retries
- basic telemetry counters
Entry point:
src/core/main.py
2. Main Runtime Loop
At startup (main.py):
- Build shared components (PubSub, RiskEngine, DataGapDetector, AlertQueue, Telemetry, RiskEnvelopeStore).
- Register topic subscriptions.
- Load universe once and publish market.universe.snapshot.
- Enter infinite loop with POLL_INTERVAL_SEC.
Per polling iteration (run_once):
- Publish market.universe.updated for each venue bucket.
- Fetch prices for that venue through PriceIngestor.
- Publish market.prices.snapshot.
- Process pending alert queue retries.
3. Components and Responsibilities
3.1 Universe Loader
File: src/core/universe_loader.py
Responsibilities:
- load symbol universe for US equities, EU equities, crypto, commodities
- normalize to yfinance-compatible symbols
- build universe snapshot event
Produced events:
- market.universe.snapshot
- market.universe.updated
3.2 Price Ingestor
File: src/core/price_ingestor.py
Responsibilities:
- split ticker list into chunks (BATCH_SIZE)
- fetch each chunk (parallel via ProcessPoolExecutor)
- apply retry/backoff wrapper around yfinance.download
- emit snapshot with monotonic sequence_id
Produced event:
market.prices.snapshot
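The chunking and sequencing steps above can be sketched as follows. This is a minimal illustration, not the contents of price_ingestor.py: chunk_tickers, SnapshotBuilder, and the snapshot field names other than sequence_id are hypothetical, and the parallel yfinance fetch is left out so the example stays self-contained.

```python
import itertools
import time
from typing import Dict, Iterator, List


def chunk_tickers(tickers: List[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most batch_size tickers."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(tickers), batch_size):
        yield tickers[start:start + batch_size]


class SnapshotBuilder:
    """Stamps each snapshot with a monotonically increasing sequence_id."""

    def __init__(self) -> None:
        # One counter per builder instance, starting at 1 (assumption).
        self._seq = itertools.count(1)

    def build(self, venue: str, prices: Dict[str, float]) -> dict:
        return {
            "venue": venue,                     # hypothetical field
            "sequence_id": next(self._seq),
            "ts_ms": int(time.time() * 1000),   # hypothetical field
            "prices": dict(prices),             # hypothetical field
        }
```

Because the counter lives in process memory, it restarts whenever the service restarts.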
3.3 Retry Utility
File: src/core/retry_utils.py
Responsibilities:
- backoff_delay(base_delay_sec, attempt): exponential delay
- call_with_backoff(fn, max_attempts, base_delay_sec): common retry wrapper
Used by:
- PriceIngestor fetch retries
- AlertQueue retry scheduling
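A minimal sketch of the two helpers, using the signatures listed above. The doubling factor, the zero-based attempt index, re-raising on exhaustion, and the extra sleep parameter (added here only so the example can run without waiting) are assumptions, not confirmed details of retry_utils.py.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delay(base_delay_sec: float, attempt: int) -> float:
    """Exponential delay: base, 2*base, 4*base, ... for attempt 0, 1, 2, ..."""
    return base_delay_sec * (2 ** attempt)


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int,
    base_delay_sec: float,
    sleep: Callable[[float], None] = time.sleep,  # hypothetical, injectable for tests
) -> T:
    """Call fn until it succeeds or max_attempts is used up, then re-raise."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # exhausted: let the caller decide how to degrade
            sleep(backoff_delay(base_delay_sec, attempt))
    raise AssertionError("unreachable")
```

Under these assumptions, a caller such as the price fetch would catch the final exception and return an empty chunk.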
3.4 Risk Engine
File: src/core/risk_engine.py
Responsibilities:
- maintain rolling price windows (PriceBuffer)
- compute per-timeframe deltas (Price Delta Calculator)
- detect threshold breaches and severity
- emit per-ticker risk events and alerts
- maintain and update regime state machine
- publish regime transitions separately from envelope updates
Produced events:
- market.price.delta
- risk.event.detected
- risk.regime.change
- risk.envelope.updated
- system.alert.ready (risk alerts)
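To illustrate the rolling window and per-timeframe delta, here is a small sketch. Only the PriceBuffer name comes from this document; the constructor argument, method names, and the choice of the oldest in-window sample as the baseline are assumptions.

```python
from collections import deque
from typing import Deque, Optional, Tuple


class PriceBuffer:
    """Rolling window of (timestamp_ms, price) samples for one ticker."""

    def __init__(self, max_age_ms: int) -> None:
        self._max_age_ms = max_age_ms
        self._samples: Deque[Tuple[int, float]] = deque()

    def add(self, ts_ms: int, price: float) -> None:
        self._samples.append((ts_ms, price))
        # Evict samples that have fallen out of the window.
        cutoff = ts_ms - self._max_age_ms
        while self._samples and self._samples[0][0] < cutoff:
            self._samples.popleft()

    def delta_pct(self, timeframe_ms: int) -> Optional[float]:
        """Percent change from the oldest sample inside the timeframe to the latest."""
        if len(self._samples) < 2:
            return None
        latest_ts, latest_price = self._samples[-1]
        cutoff = latest_ts - timeframe_ms
        for ts, price in self._samples:
            if ts >= cutoff:
                if ts == latest_ts or price == 0:
                    return None  # no usable baseline inside the timeframe
                return (latest_price - price) / price * 100.0
        return None
```

Calling delta_pct once per configured timeframe yields the multi-timeframe deltas that threshold checks would then compare against.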
3.5 Regime State Machine
File: src/core/risk_regime.py
States:
- normal
- tension
- stress
- panic
Inputs per snapshot:
- worst_drop_pct
- high_severity_count
Logic:
- escalates immediately on stronger stress conditions
- downgrades only after calm_snapshots_to_downgrade consecutive calm snapshots, to avoid oscillation
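The escalation and downgrade logic can be sketched as below, using the threshold rules given in section 14. The class and method names are hypothetical. Two behaviors are assumptions rather than confirmed details of risk_regime.py: a "calm" snapshot is taken to mean one whose computed regime is weaker than the current state, and a downgrade jumps straight to the regime computed from the latest snapshot.

```python
from typing import Optional, Tuple

REGIMES = ("normal", "tension", "stress", "panic")  # weakest to strongest


def classify(worst_drop_pct: float, high_severity_count: int) -> str:
    """Map one snapshot's inputs to a target regime (thresholds from section 14)."""
    if worst_drop_pct <= -8.0 or high_severity_count >= 3:
        return "panic"
    if worst_drop_pct <= -5.0 or high_severity_count >= 2:
        return "stress"
    if worst_drop_pct <= -3.0 or high_severity_count >= 1:
        return "tension"
    return "normal"


class RegimeStateMachine:
    def __init__(self, calm_snapshots_to_downgrade: int = 3) -> None:
        self.state = "normal"
        self._calm_needed = calm_snapshots_to_downgrade
        self._calm_streak = 0

    def update(self, worst_drop_pct: float, high_severity_count: int) -> Optional[Tuple[str, str]]:
        """Return (previous, new) on a transition, otherwise None."""
        target = classify(worst_drop_pct, high_severity_count)
        current_rank = REGIMES.index(self.state)
        target_rank = REGIMES.index(target)
        if target_rank > current_rank:
            return self._transition(target)  # escalate immediately
        if target_rank == current_rank:
            self._calm_streak = 0  # still at the current level: streak broken
            return None
        self._calm_streak += 1  # calmer than the current state
        if self._calm_streak >= self._calm_needed:
            return self._transition(target)
        return None

    def _transition(self, target: str) -> Tuple[str, str]:
        previous, self.state = self.state, target
        self._calm_streak = 0
        return previous, target
```

With the default of 3, a single quiet snapshot after a stress reading does not relax the regime; three in a row do.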
3.6 Data Gap Detector
File: src/core/data_gap.py
Responsibilities:
- track last seen sequence_id per ticker
- detect jumps (current - last > 1)
- emit data.gap.detected
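A sketch of the per-ticker jump check described above. The snapshot layout ("prices" keyed by ticker) and the gap event fields are hypothetical. For simplicity the handler returns the gap events instead of publishing them to data.gap.detected.

```python
from typing import Any, Dict, List


class DataGapDetector:
    def __init__(self) -> None:
        self._last_seq: Dict[str, int] = {}

    def on_price_snapshot(self, snapshot: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return one gap event per ticker whose sequence_id jumped by more than 1."""
        seq = snapshot["sequence_id"]
        events: List[Dict[str, Any]] = []
        for ticker in snapshot["prices"]:  # hypothetical snapshot field
            last = self._last_seq.get(ticker)
            if last is not None and seq - last > 1:
                events.append({
                    "ticker": ticker,
                    "last_sequence_id": last,
                    "current_sequence_id": seq,
                    "missed": seq - last - 1,
                })
            self._last_seq[ticker] = seq
        return events
```

The first snapshot for a ticker never reports a gap, since there is no earlier sequence_id to compare against.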
3.7 Alerting
File: src/core/alerting.py
Responsibilities:
- build risk alerts (build_alert)
- build data gap alerts (build_data_gap_alert)
- send alerts via Telegram notifier
- queue alerts and retry failed sends
Queue behavior:
- enqueue on system.alert.ready
- process each loop
- retry with exponential backoff
- drop after ALERT_MAX_RETRIES
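The queue behavior above might look roughly like this. It is an illustrative sketch, not alerting.py itself: the constructor arguments, the injectable clock, the boolean return convention for send, and the reading of ALERT_MAX_RETRIES as the total number of failed attempts before a drop are all assumptions.

```python
import time
from typing import Any, Callable, Dict, List

Alert = Dict[str, Any]


class AlertQueue:
    def __init__(
        self,
        max_retries: int,
        backoff_base_sec: float,
        clock: Callable[[], float] = time.monotonic,  # injectable for tests
    ) -> None:
        self._max_retries = max_retries
        self._backoff_base_sec = backoff_base_sec
        self._clock = clock
        self._items: List[Dict[str, Any]] = []

    def __len__(self) -> int:
        return len(self._items)

    def enqueue(self, alert: Alert) -> None:
        self._items.append({"alert": alert, "attempts": 0, "next_at": self._clock()})

    def process(self, send: Callable[[Alert], bool]) -> None:
        """Try every due alert once; keep failures with a later next_at."""
        now = self._clock()
        remaining: List[Dict[str, Any]] = []
        for item in self._items:
            if item["next_at"] > now:
                remaining.append(item)  # not due yet
                continue
            try:
                delivered = bool(send(item["alert"]))
            except Exception:
                delivered = False
            if delivered:
                continue  # success: remove from queue
            item["attempts"] += 1
            if item["attempts"] >= self._max_retries:
                continue  # retries exhausted: drop
            delay = self._backoff_base_sec * 2 ** (item["attempts"] - 1)
            item["next_at"] = now + delay
            remaining.append(item)
        self._items = remaining
```

Since process runs once per polling iteration, the effective retry time is the backoff delay rounded up to the next loop tick.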
3.8 Telemetry
File: src/core/telemetry.py
Responsibilities:
- count events by class and key dimensions
- periodic stdout report every TELEMETRY_REPORT_INTERVAL_SEC
Current counters include:
- alerts_enqueued
- risk_events
- risk_severity:*
- risk_regime_changes
- risk_regime_to:*
- data_gap_events
- data_gap_ticker:*
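One way to keep such counters is a single collections.Counter keyed by the names above. The handler names match the subscription graph in section 6. The event fields read here (severity, to_regime, ticker) and the report_line helper are hypothetical.

```python
from collections import Counter
from typing import Any, Dict

Event = Dict[str, Any]


class Telemetry:
    def __init__(self) -> None:
        self.counters: Counter = Counter()

    def on_alert_enqueued(self, event: Event) -> None:
        self.counters["alerts_enqueued"] += 1

    def on_risk_event(self, event: Event) -> None:
        self.counters["risk_events"] += 1
        self.counters[f"risk_severity:{event.get('severity', 'unknown')}"] += 1

    def on_regime_change(self, event: Event) -> None:
        self.counters["risk_regime_changes"] += 1
        self.counters[f"risk_regime_to:{event.get('to_regime', 'unknown')}"] += 1

    def on_data_gap(self, event: Event) -> None:
        self.counters["data_gap_events"] += 1
        self.counters[f"data_gap_ticker:{event.get('ticker', 'unknown')}"] += 1

    def report_line(self) -> str:
        """Format all counters as one stdout-friendly line, sorted by name."""
        parts = " ".join(f"{name}={value}" for name, value in sorted(self.counters.items()))
        return f"[TELEMETRY] {parts}"
```

The wildcard counters (risk_severity:*, risk_regime_to:*, data_gap_ticker:*) fall out naturally: each distinct value creates its own key on first use.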
4. Topic Taxonomy
Configured in:
configs/topics.yaml
Current keys:
- universe_snapshot: market.universe.snapshot
- universe_updated: market.universe.updated
- prices_snapshot: market.prices.snapshot
- price_delta: market.price.delta
- risk_event: risk.event.detected
- risk_regime_change: risk.regime.change
- risk_envelope: risk.envelope.updated
- data_gap: data.gap.detected
- alert_ready: system.alert.ready
Loaded by:
src/core/topics.py
Behavior:
- fallback to hardcoded defaults if YAML is missing or malformed
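The fallback behavior can be sketched as follows. The defaults mirror the keys listed above. The use of PyYAML's yaml.safe_load, the flat key-to-topic file layout, and merging file values over the defaults are assumptions about topics.py, not confirmed details.

```python
from typing import Dict

DEFAULT_TOPICS: Dict[str, str] = {
    "universe_snapshot": "market.universe.snapshot",
    "universe_updated": "market.universe.updated",
    "prices_snapshot": "market.prices.snapshot",
    "price_delta": "market.price.delta",
    "risk_event": "risk.event.detected",
    "risk_regime_change": "risk.regime.change",
    "risk_envelope": "risk.envelope.updated",
    "data_gap": "data.gap.detected",
    "alert_ready": "system.alert.ready",
}


def load_topics(path: str) -> Dict[str, str]:
    """Load topic names from YAML, falling back to defaults on any failure."""
    try:
        import yaml  # PyYAML, assumed to be the parser in use

        with open(path, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
    except Exception:
        # Missing file, unreadable file, parse error, or PyYAML not installed.
        return dict(DEFAULT_TOPICS)
    if not isinstance(data, dict):
        return dict(DEFAULT_TOPICS)  # malformed: not a mapping
    topics = dict(DEFAULT_TOPICS)
    # Only accept string-to-string entries; anything else keeps its default.
    topics.update({k: v for k, v in data.items() if isinstance(k, str) and isinstance(v, str)})
    return topics
```

Returning a copy of the defaults keeps callers from mutating the shared fallback map.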
5. Event Contracts
Schemas live in:
src/core/schemas.py
Defined typed contracts:
- InstrumentUniverseSnapshot
- RiskEnvelope
- RiskRegimeChangeEvent
- DataGapEvent
Notes:
- runtime payloads are plain dicts; TypedDict provides development-time contract guidance
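As an illustration of the note above, the sketch below defines two TypedDict contracts and shows that the value built at runtime is an ordinary dict. The class names come from the list above, but every field name and the make_data_gap_event helper are hypothetical. The real definitions live in schemas.py.

```python
from typing import TypedDict


class DataGapEvent(TypedDict):
    # Hypothetical fields, for illustration only.
    ticker: str
    last_sequence_id: int
    current_sequence_id: int


class RiskRegimeChangeEvent(TypedDict):
    # Hypothetical fields, for illustration only.
    from_regime: str
    to_regime: str
    ts_ms: int


def make_data_gap_event(ticker: str, last_seq: int, current_seq: int) -> DataGapEvent:
    """Build a payload; a type checker validates the keys, the runtime does not."""
    return {
        "ticker": ticker,
        "last_sequence_id": last_seq,
        "current_sequence_id": current_seq,
    }
```

A static checker such as mypy would flag a misspelled key in the returned literal, while at runtime nothing is enforced, which matches the "development-time contract guidance" wording.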
6. Subscription Graph
Set in src/core/main.py:
- system.alert.ready -> AlertQueue.enqueue
- system.alert.ready -> Telemetry.on_alert_enqueued
- market.prices.snapshot -> RiskEngine.on_price_snapshot
- market.prices.snapshot -> DataGapDetector.on_price_snapshot
- risk.event.detected -> Telemetry.on_risk_event
- risk.regime.change -> Telemetry.on_regime_change
- data.gap.detected -> Telemetry.on_data_gap
- data.gap.detected -> publish(system.alert.ready, build_data_gap_alert(event))
- market.universe.snapshot -> RiskEngine.on_universe_snapshot
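A minimal in-memory bus is enough to express this graph. The sketch below assumes synchronous dispatch in subscription order, which may differ from the real PubSub. The alert shape returned by build_data_gap_alert is a placeholder. It wires only the data gap to alert path, where one handler republishes on another topic.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Event = Dict[str, Any]
Handler = Callable[[Event], None]


class PubSub:
    """Process-local topic bus: publish() calls each handler in turn."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    def publish(self, topic: str, event: Event) -> None:
        # Iterate over a copy so handlers may subscribe during dispatch.
        for handler in list(self._handlers.get(topic, ())):
            handler(event)


def build_data_gap_alert(event: Event) -> Event:
    # Placeholder alert shape, for illustration only.
    return {"kind": "data_gap", "ticker": event["ticker"]}


bus = PubSub()
queued: List[Event] = []  # stands in for AlertQueue.enqueue
bus.subscribe("system.alert.ready", queued.append)
bus.subscribe(
    "data.gap.detected",
    lambda event: bus.publish("system.alert.ready", build_data_gap_alert(event)),
)
bus.publish("data.gap.detected", {"ticker": "BTC-USD"})
```

After the final publish, queued holds one data gap alert for BTC-USD, showing how a detector event is converted into an alert without the detector knowing about the queue.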
7. Risk Envelope Semantics
Envelope is generated on regime transitions and stored in RiskEnvelopeStore.
Current regime mapping:
- normal: permissive limits
- tension: reduced position/order/leverage
- stress: tighter risk limits
- panic: allowed = False, trading blocked
Envelope TTL:
- represented by valid_from_ms / valid_until_ms
- store returns None when expired
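The TTL semantics can be sketched as below. The field names allowed, valid_from_ms, and valid_until_ms come from this document. The build_envelope helper, the regime field, the ttl_ms argument, and the injectable clock are hypothetical, and the concrete per-regime limits are omitted because they are not specified here.

```python
import time
from typing import Any, Callable, Dict, Optional

Envelope = Dict[str, Any]


def build_envelope(regime: str, now_ms: int, ttl_ms: int) -> Envelope:
    """Build an envelope; only the panic regime blocks trading."""
    return {
        "regime": regime,
        "allowed": regime != "panic",
        "valid_from_ms": now_ms,
        "valid_until_ms": now_ms + ttl_ms,
    }


class RiskEnvelopeStore:
    def __init__(self, clock_ms: Callable[[], int] = lambda: int(time.time() * 1000)) -> None:
        self._clock_ms = clock_ms
        self._envelope: Optional[Envelope] = None

    def set(self, envelope: Envelope) -> None:
        self._envelope = envelope

    def get(self) -> Optional[Envelope]:
        """Return the current envelope, or None if absent or past valid_until_ms."""
        envelope = self._envelope
        if envelope is None or self._clock_ms() >= envelope["valid_until_ms"]:
            return None
        return envelope
```

A consumer that receives None has no valid envelope to act on. How it should behave in that case is a policy decision this document does not specify.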
8. Data Gap Handling
Data gaps are inferred from sequence_id discontinuity in snapshots.
Current assumptions:
- one global sequence per PriceIngestor process
- gap is tracked per ticker
Limitations:
- no persistent sequence state across restarts
- no exchange-native sequence reconciliation
9. Reliability and Retries
Price fetch
- retries around yfinance.download via call_with_backoff
- returns empty chunk on exhaustion
Alert delivery
- failed send remains in queue
- re-attempted with exponential backoff
- dropped after max retries
10. Configuration
Main runtime config:
src/core/config.py
Important values:
- polling: POLL_INTERVAL_SEC
- ingestion parallelism: PROCESS_POOL_WORKERS, BATCH_SIZE
- yfinance retry: YF_MAX_RETRIES, YF_BACKOFF_BASE_SEC
- alert retry: ALERT_MAX_RETRIES, ALERT_BACKOFF_BASE_SEC
- telemetry reporting: TELEMETRY_REPORT_INTERVAL_SEC
- topic config path: TOPICS_CONFIG_PATH
11. Known Limitations
- In-memory pub/sub and stores are process-local only.
- Universe is loaded once at startup (no periodic refresh yet).
- Telemetry outputs to stdout only (no external sink).
- No durable queue for alerts.
- No persistent event/audit storage.
12. Run and Verify
Install dependencies and run:
uv sync
uv run python -m src.core.main
Recommended quick checks:
- Verify topics loaded from configs/topics.yaml.
- Confirm risk.regime.change appears when volatility increases.
- Confirm risk.envelope.updated appears on same transitions.
- Simulate send failures and verify alert queue retries.
- Observe telemetry counters printed periodically.
13. Detailed Sequence Diagram (ASCII)
+====================================================================================================+
| DETAILED EVENT SEQUENCE (1 CYCLE) |
+====================================================================================================+
Actors:
M = main loop
UL = UniverseLoader
PI = PriceIngestor
PS = PubSub
RE = RiskEngine
DG = DataGapDetector
T = Telemetry
AQ = AlertQueue
TN = TelegramNotifier
RS = RiskEnvelopeStore
M UL PI PS RE DG T AQ TN RS
| | | | | | | | | |
|--load()-->| | | | | | | | |
|<--universe| | | | | | | | |
|--build_snapshot-------> | | | | | | |
|------------------------------publish market.universe.snapshot--------------------->| | |
| |--on_universe_snapshot---------->| | |
| | | | |
|== loop ================================================================================================> |
|--for each venue--------------------------------------------------------------------------------------------|
|--build_event------->| | | | | | | |
|------------------------------publish market.universe.updated---------------------------------------------->|
|--fetch_prices------------------>| | | | | | |
| |--download+retry/backoff--> yfinance |
| |<--prices + sequence_id |
|<-------------------------------| | | | | | |
|------------------------------publish market.prices.snapshot------------------------------------------------->|
| |--on_price_snapshot-------------------------------------->|
| | calc deltas/events/regime |
| |--publish market.price.delta----------------------------->|
| |--publish risk.event.detected---------------------------->|
| | |--on_risk_event
| |--publish system.alert.ready----------------------------->|
| | |--enqueue---->|
| | |--on_alert_enqueued
| |--if regime changed: |
| | publish risk.regime.change----------------------------->|
| | |--on_regime_change
| | build envelope |
| |----------------------------------------------set()------->|
| | publish risk.envelope.updated--------------------------->|
| |
| |--DG.on_price_snapshot------------------------------------>|
| | if seq gap:
| |--publish data.gap.detected------------------------------->|
| | |--on_data_gap
| |--publish system.alert.ready(data_gap)-------------------->|
| | |--enqueue---->|
|--AQ.process(TN.send)---------------------------------------------------------------------------------------->|
| |--send alert------------->|
| |<--ok/fail---------------|
| |--retry schedule if fail |
|== sleep POLL_INTERVAL_SEC =================================================================================>|
14. Regime Transition Diagram (ASCII)
+====================================================================================================+
| RISK REGIME TRANSITION LOGIC |
+====================================================================================================+
Inputs per snapshot:
worst_drop_pct
high_severity_count
Threshold rules:
panic if worst_drop_pct <= -8.0 OR high_severity_count >= 3
stress if worst_drop_pct <= -5.0 OR high_severity_count >= 2
tension if worst_drop_pct <= -3.0 OR high_severity_count >= 1
normal otherwise
Transition behavior:
- Escalation (to stronger regime): immediate
- Downgrade (to weaker regime): requires calm_snapshots_to_downgrade (default 3)
State graph:
(escalate)
+--------- normal ---------+
| | |
| v |
| tension |
| | |
| v |
| stress |
| | |
| v |
+---------- panic <--------+
^
| (downgrade after calm streak)
+--------------------------------
On each regime change:
1) publish risk.regime.change
2) build and store RiskEnvelope
3) publish risk.envelope.updated
15. Failure, Retry, and Degradation Flows (ASCII)
+====================================================================================================+
| FAILURE / RETRY / BACKPRESSURE MAP |
+====================================================================================================+
1) Price fetch failures (yfinance)
PI._fetch_chunk()
-> call_with_backoff(max_attempts=YF_MAX_RETRIES, base=YF_BACKOFF_BASE_SEC)
-> if exhausted: return empty chunk
-> cycle continues (degraded data, no crash)
2) Telegram send failures
AQ.process(TN.send)
-> failed send increments attempts
-> next attempt scheduled via exponential backoff
-> dropped after ALERT_MAX_RETRIES
3) Data gaps
DG compares sequence_id per ticker
-> if jump detected: emit data.gap.detected
-> converted into system alert + telemetry counter
4) Topic config failure
topics.py load_topics()
-> if YAML missing/malformed: fallback to hardcoded defaults
16. System Overview Diagram (ASCII)
+====================================================================================================+
| MARKET ANALYZING PLATFORM (CURRENT) |
+====================================================================================================+
STARTUP PHASE
=============
+---------------------+ +---------------------+ +-----------------------+
| configs/topics.yaml| -----> | core/topics.py | -----> | TOPICS map in memory |
+---------------------+ +---------------------+ +-----------------------+
+---------------------+ +---------------------+ +-----------------------+
| UniverseLoader | -----> | universe snapshot | -----> | market.universe.snapshot
| (US/EU/Crypto/Comm) | | (universe_id, etc.) | | -> RiskEngine stores universe_id
+---------------------+ +---------------------+ +-----------------------+
+---------------------+ +---------------------+ +-----------------------+
| Build runtime graph | -----> | PubSub subscriptions| -----> | Service ready |
| (main.py) | | (handlers) | | polling loop starts |
+---------------------+ +---------------------+ +-----------------------+
RUNTIME LOOP
============
every POLL_INTERVAL_SEC
|
v
+---------------------------+
| for each venue universe |
| (us.equities, eu..., etc) |
+---------------------------+
|
v
+---------------------------+ +--------------------------------------------+
| PriceIngestor | | retry_utils.call_with_backoff |
| - chunk tickers | <-----> | exponential backoff for yfinance download |
| - process pool workers | +--------------------------------------------+
| - sequence_id++ |
+---------------------------+
|
v
topic: market.prices.snapshot
|
+------------------------------------------------------------------------------------+
| |
v v
+-------------------------+ +---------------------------+
| RiskEngine | | DataGapDetector |
| - update PriceBuffer | | - per ticker last seq |
| - calc delta per TF | | - detect seq jumps |
| - adaptive threshold | +---------------------------+
| - detect risk events | |
+-------------------------+ v
| topic: data.gap.detected
| |
| +--------------------------+
| | Telemetry.on_data_gap |
| +--------------------------+
| |
| v
| build_data_gap_alert(event)
| |
| v
| topic: system.alert.ready
|
+--> topic: market.price.delta
|
+--> topic: risk.event.detected -------> Telemetry.on_risk_event
|
+--> topic: system.alert.ready --------> AlertQueue.enqueue
|
+--> Regime State Machine (normal/tension/stress/panic)
|
| if regime changed:
v
topic: risk.regime.change -------> Telemetry.on_regime_change
|
v
build RiskEnvelope (limits by regime)
|
v
RiskEnvelopeStore.set(...)
|
v
topic: risk.envelope.updated
ALERT DELIVERY (ASYNC-ish IN LOOP)
================================
topic: system.alert.ready
|
+--> AlertQueue.enqueue
+--> Telemetry.on_alert_enqueued
end of each polling iteration:
|
v
AlertQueue.process(TelegramNotifier.send)
|
+--> success: remove item
|
+--> failure: schedule retry with exponential backoff
until ALERT_MAX_RETRIES exhausted
RISK REGIME -> ENVELOPE MAP
===========================
normal -> allowed=true, higher limits
tension -> allowed=true, reduced limits
stress -> allowed=true, tighter limits
panic -> allowed=false, zero limits (block trading)
TOPIC TAXONOMY
==============
market.universe.snapshot
market.universe.updated
market.prices.snapshot
market.price.delta
risk.event.detected
risk.regime.change
risk.envelope.updated
data.gap.detected
system.alert.ready
TELEMETRY OUTPUT
================
[TELEMETRY] alerts_enqueued=... risk_events=... risk_severity:high=...
risk_regime_changes=... risk_regime_to:stress=...
data_gap_events=... data_gap_ticker:BTC-USD=...