Dmitry Beresnev committed
Commit 7b8151d · 1 Parent(s): abd065f

add core logic for macroeconomic analysis

README.md CHANGED
@@ -19,15 +19,19 @@ This POC implements a market-wide risk radar using:
 - Process pool for price loading
 
 ## Topics
-- `universe.updated`
-- `prices.snapshot`
-- `price.delta.calculated`
-- `event.detected`
-- `alert.ready`
+Defined in `configs/topics.yaml`:
+- `market.universe.snapshot`
+- `market.universe.updated`
+- `market.prices.snapshot`
+- `market.price.delta`
+- `risk.event.detected`
+- `risk.envelope.updated`
+- `data.gap.detected`
+- `system.alert.ready`
 
 ## Run
 ```bash
-python -m src.main
+python -m src.core.main
 ```
 
 ## Notes
@@ -38,6 +42,8 @@ python -m src.main
 - EU equities from Wikipedia index constituents (FTSE 100, DAX, CAC 40)
 - Crypto from Binance `exchangeInfo`
 - Commodities from a curated Yahoo Finance futures list
+- Universe schema lives in `src/core/schemas.py`
+- Topic taxonomy is loaded from `configs/topics.yaml`
 
 ## Telegram Bot
 Set environment variables:
@@ -45,4 +51,7 @@ Set environment variables:
 export TELEGRAM_BOT_TOKEN=...
 export TELEGRAM_CHAT_ID=...
 ```
+
+## Detailed Architecture Doc
+- `docs/CURRENT_VERSION.md`
 ```
configs/topics.yaml ADDED
@@ -0,0 +1,10 @@
+topics:
+  universe_snapshot: market.universe.snapshot
+  universe_updated: market.universe.updated
+  prices_snapshot: market.prices.snapshot
+  price_delta: market.price.delta
+  risk_event: risk.event.detected
+  risk_regime_change: risk.regime.change
+  risk_envelope: risk.envelope.updated
+  data_gap: data.gap.detected
+  alert_ready: system.alert.ready
docs/CURRENT_VERSION.md ADDED
@@ -0,0 +1,545 @@
+# Current Version: Architecture and Runtime Flow
+
+This document describes how the current `src/core` implementation works.
+
+## 1. Scope
+
+The current version implements a single-process POC service with:
+- universe loading from no-auth sources
+- price ingestion via `yfinance` with a process pool
+- in-memory pub/sub
+- multi-timeframe delta and risk event detection
+- a market regime state machine
+- `risk.regime.change` and `risk.envelope.updated` events
+- data gap detection
+- an alert queue with retries
+- basic telemetry counters
+
+Entry point:
+- `src/core/main.py`
+
+## 2. Main Runtime Loop
+
+At startup (`main.py`):
+1. Build shared components (`PubSub`, `RiskEngine`, `DataGapDetector`, `AlertQueue`, `Telemetry`, `RiskEnvelopeStore`).
+2. Register topic subscriptions.
+3. Load the universe once and publish `market.universe.snapshot`.
+4. Enter an infinite loop paced by `POLL_INTERVAL_SEC`.
+
+Per polling iteration (`run_once`):
+1. Publish `market.universe.updated` for each venue bucket.
+2. Fetch prices for that venue through `PriceIngestor`.
+3. Publish `market.prices.snapshot`.
+4. Process pending alert queue retries.
+
+## 3. Components and Responsibilities
+
+### 3.1 Universe Loader
+File: `src/core/universe_loader.py`
+
+Responsibilities:
+- load the symbol universe for US equities, EU equities, crypto, and commodities
+- normalize to `yfinance`-compatible symbols
+- build the universe snapshot event
+
+Produced events:
+- `market.universe.snapshot`
+- `market.universe.updated`
+
+### 3.2 Price Ingestor
+File: `src/core/price_ingestor.py`
+
+Responsibilities:
+- split the ticker list into chunks (`BATCH_SIZE`)
+- fetch chunks in parallel via `ProcessPoolExecutor`
+- apply a retry/backoff wrapper around `yfinance.download`
+- emit snapshots with a monotonic `sequence_id`
+
+Produced event:
+- `market.prices.snapshot`
+
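The chunk-and-fetch pattern above can be sketched as follows. This is a minimal illustration, not the real `PriceIngestor`: the actual code wraps `yfinance.download` in a `ProcessPoolExecutor`, while here a `ThreadPoolExecutor` and a stubbed `fetch_chunk` keep the demo self-contained; `BATCH_SIZE` and all function names are assumptions.

```python
# Sketch of chunked, parallel price fetching with a monotonic sequence_id.
# The real ingestor uses a ProcessPoolExecutor around yfinance; fetch_chunk
# here is a stand-in so the example runs without network access.
from concurrent.futures import ThreadPoolExecutor
from itertools import count

BATCH_SIZE = 2          # assumption; the real value lives in src/core/config.py
_sequence = count(1)    # one monotonic counter per ingestor process

def chunked(items, size):
    """Split a ticker list into fixed-size chunks."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def fetch_chunk(tickers):
    """Stand-in for the retry-wrapped yfinance download of one chunk."""
    return [{"ticker": t, "price": 100.0} for t in tickers]

def fetch_prices(tickers, workers=2):
    chunks = list(chunked(tickers, BATCH_SIZE))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(fetch_chunk, chunks))   # order-preserving
    prices = [p for chunk in results for p in chunk]
    return {"sequence_id": next(_sequence), "prices": prices}
```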
+### 3.3 Retry Utility
+File: `src/core/retry_utils.py`
+
+Responsibilities:
+- `backoff_delay(base_delay_sec, attempt)`: exponential delay for a given attempt
+- `call_with_backoff(fn, max_attempts, base_delay_sec)`: shared retry wrapper
+
+Used by:
+- `PriceIngestor` fetch retries
+- `AlertQueue` retry scheduling
+
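A sketch matching the signatures listed above; the real module may add jitter or a delay cap, so treat this as illustrative only.

```python
# Exponential backoff helpers in the shape described above.
import time

def backoff_delay(base_delay_sec: float, attempt: int) -> float:
    # attempt 1 -> base, attempt 2 -> 2*base, attempt 3 -> 4*base, ...
    return base_delay_sec * (2 ** (attempt - 1))

def call_with_backoff(fn, max_attempts: int, base_delay_sec: float):
    """Call fn, retrying with exponential backoff; re-raise on exhaustion."""
    last_exc = None
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            last_exc = exc
            if attempt < max_attempts:
                time.sleep(backoff_delay(base_delay_sec, attempt))
    raise last_exc
```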
+### 3.4 Risk Engine
+File: `src/core/risk_engine.py`
+
+Responsibilities:
+- maintain rolling price windows (`PriceBuffer`)
+- compute per-timeframe deltas (price delta calculator)
+- detect threshold breaches and severity
+- emit per-ticker risk events and alerts
+- maintain and update the regime state machine
+- publish regime transitions separately from envelope updates
+
+Produced events:
+- `market.price.delta`
+- `risk.event.detected`
+- `risk.regime.change`
+- `risk.envelope.updated`
+- `system.alert.ready` (risk alerts)
+
+### 3.5 Regime State Machine
+File: `src/core/risk_regime.py`
+
+States:
+- `normal`
+- `tension`
+- `stress`
+- `panic`
+
+Inputs per snapshot:
+- `worst_drop_pct`
+- `high_severity_count`
+
+Logic:
+- escalates immediately on stronger stress conditions
+- downgrades only after `calm_snapshots_to_downgrade` calm snapshots, to avoid oscillation
+
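The escalate-immediately, downgrade-slowly rule can be sketched as below, using the threshold values from the transition diagram in section 14. This is an illustrative model, not the real `risk_regime.py`; in particular, how the real code counts the calm streak is an assumption.

```python
# Sketch of the regime state machine: immediate escalation, downgrade only
# after a streak of calm snapshots. Thresholds follow section 14.
REGIMES = ["normal", "tension", "stress", "panic"]

def target_regime(worst_drop_pct, high_severity_count):
    if worst_drop_pct <= -8.0 or high_severity_count >= 3:
        return "panic"
    if worst_drop_pct <= -5.0 or high_severity_count >= 2:
        return "stress"
    if worst_drop_pct <= -3.0 or high_severity_count >= 1:
        return "tension"
    return "normal"

class RegimeStateMachine:
    def __init__(self, calm_snapshots_to_downgrade=3):
        self.state = "normal"
        self._calm_needed = calm_snapshots_to_downgrade
        self._calm_streak = 0

    def on_snapshot(self, worst_drop_pct, high_severity_count):
        target = target_regime(worst_drop_pct, high_severity_count)
        cur, tgt = REGIMES.index(self.state), REGIMES.index(target)
        if tgt > cur:                      # escalation: immediate
            self.state, self._calm_streak = target, 0
        elif tgt < cur:                    # downgrade: wait for calm streak
            self._calm_streak += 1
            if self._calm_streak >= self._calm_needed:
                self.state, self._calm_streak = target, 0
        else:
            self._calm_streak = 0
        return self.state
```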
+### 3.6 Data Gap Detector
+File: `src/core/data_gap.py`
+
+Responsibilities:
+- track the last seen `sequence_id` per ticker
+- detect jumps (`current - last > 1`)
+- emit `data.gap.detected`
+
+### 3.7 Alerting
+File: `src/core/alerting.py`
+
+Responsibilities:
+- build risk alerts (`build_alert`)
+- build data gap alerts (`build_data_gap_alert`)
+- send alerts via the Telegram notifier
+- queue alerts and retry failed sends
+
+Queue behavior:
+- enqueue on `system.alert.ready`
+- process the queue once per loop iteration
+- retry with exponential backoff
+- drop after `ALERT_MAX_RETRIES`
+
+### 3.8 Telemetry
+File: `src/core/telemetry.py`
+
+Responsibilities:
+- count events by class and key dimensions
+- periodic stdout report every `TELEMETRY_REPORT_INTERVAL_SEC`
+
+Current counters include:
+- `alerts_enqueued`
+- `risk_events`
+- `risk_severity:*`
+- `risk_regime_changes`
+- `risk_regime_to:*`
+- `data_gap_events`
+- `data_gap_ticker:*`
+
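A minimal counter sketch in the spirit of the telemetry described above. The handler names come from the subscription graph in section 6; the internal use of `Counter` and the `report()` shape are assumptions, and the real class additionally reports on a timer.

```python
# Counting events by class plus a keyed dimension, as listed above.
from collections import Counter

class Telemetry:
    def __init__(self):
        self.counters = Counter()

    def on_alert_enqueued(self, alert):
        self.counters["alerts_enqueued"] += 1

    def on_risk_event(self, event):
        self.counters["risk_events"] += 1
        self.counters[f"risk_severity:{event['severity']}"] += 1

    def report(self):
        # one-line "key=value" summary, like the TELEMETRY OUTPUT example
        return " ".join(f"{k}={v}" for k, v in sorted(self.counters.items()))
```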
+## 4. Topic Taxonomy
+
+Configured in:
+- `configs/topics.yaml`
+
+Current keys:
+- `universe_snapshot`: `market.universe.snapshot`
+- `universe_updated`: `market.universe.updated`
+- `prices_snapshot`: `market.prices.snapshot`
+- `price_delta`: `market.price.delta`
+- `risk_event`: `risk.event.detected`
+- `risk_regime_change`: `risk.regime.change`
+- `risk_envelope`: `risk.envelope.updated`
+- `data_gap`: `data.gap.detected`
+- `alert_ready`: `system.alert.ready`
+
+Loaded by:
+- `src/core/topics.py`
+
+Behavior:
+- falls back to hardcoded defaults if the YAML is missing or malformed
+
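The load-with-fallback behavior can be sketched as follows. The key names match `configs/topics.yaml`, but the function shape is an assumption about `src/core/topics.py`, not its real code.

```python
# Load the topic map from YAML, falling back to defaults on any failure.
DEFAULT_TOPICS = {
    "universe_snapshot": "market.universe.snapshot",
    "universe_updated": "market.universe.updated",
    "prices_snapshot": "market.prices.snapshot",
    "price_delta": "market.price.delta",
    "risk_event": "risk.event.detected",
    "risk_regime_change": "risk.regime.change",
    "risk_envelope": "risk.envelope.updated",
    "data_gap": "data.gap.detected",
    "alert_ready": "system.alert.ready",
}

def load_topics(path="configs/topics.yaml"):
    """Return the topic map from YAML, or the defaults on any failure."""
    try:
        import yaml  # pyyaml, listed in requirements.txt
        with open(path, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        topics = data["topics"]
        if not isinstance(topics, dict) or not topics:
            raise ValueError("malformed topics mapping")
        return topics
    except Exception:
        return dict(DEFAULT_TOPICS)   # missing/malformed YAML -> defaults
```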
+## 5. Event Contracts
+
+Schemas live in:
+- `src/core/schemas.py`
+
+Defined typed contracts:
+- `Instrument`
+- `UniverseSnapshot`
+- `RiskEnvelope`
+- `RiskRegimeChangeEvent`
+- `DataGapEvent`
+
+Notes:
+- runtime payloads are plain dicts; `TypedDict` provides development-time contract guidance
+
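Two illustrative contracts in the spirit of `src/core/schemas.py`. The `DataGapEvent` fields mirror the event built in `src/core/data_gap.py`; the `Instrument` fields shown here are assumptions.

```python
# TypedDict contracts: plain dicts at runtime, typed guidance at dev time.
from typing import TypedDict

class Instrument(TypedDict):
    ticker: str
    asset_class: str
    venue: str

class DataGapEvent(TypedDict):
    event_id: str
    ts: int
    ticker: str
    gap_size: int
    sequence_from: int
    sequence_to: int
```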
+## 6. Subscription Graph
+
+Set in `src/core/main.py`:
+- `system.alert.ready -> AlertQueue.enqueue`
+- `system.alert.ready -> Telemetry.on_alert_enqueued`
+- `market.prices.snapshot -> RiskEngine.on_price_snapshot`
+- `market.prices.snapshot -> DataGapDetector.on_price_snapshot`
+- `risk.event.detected -> Telemetry.on_risk_event`
+- `risk.regime.change -> Telemetry.on_regime_change`
+- `data.gap.detected -> Telemetry.on_data_gap`
+- `data.gap.detected -> publish(system.alert.ready, build_data_gap_alert(event))`
+- `market.universe.snapshot -> RiskEngine.on_universe_snapshot`
+
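A graph like the one above only needs a minimal in-process pub/sub. The sketch below assumes synchronous, in-subscription-order fan-out, which is consistent with a single-process POC but is an assumption about the real `src/core/pubsub.py`.

```python
# Minimal in-memory pub/sub: multiple handlers per topic, synchronous fan-out.
from collections import defaultdict

class PubSub:
    def __init__(self):
        self._subs = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subs[topic].append(handler)

    def publish(self, topic, event):
        for handler in self._subs[topic]:
            handler(event)   # called in subscription order
```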
+## 7. Risk Envelope Semantics
+
+The envelope is generated on regime transitions and stored in `RiskEnvelopeStore`.
+
+Current regime mapping:
+- `normal`: permissive limits
+- `tension`: reduced position/order/leverage limits
+- `stress`: tighter risk limits
+- `panic`: `allowed = False`, trading blocked
+
+Envelope TTL:
+- represented by `valid_from_ms` / `valid_until_ms`
+- the store returns `None` when expired
+
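The TTL semantics can be sketched like this; a minimal model of the behavior described above, not the real `src/core/risk_store.py` (the `now_ms` parameter is added here for testability).

```python
# Envelope store: get() returns None once valid_until_ms has passed.
import time

class RiskEnvelopeStore:
    def __init__(self):
        self._envelope = None

    def set(self, envelope):
        self._envelope = envelope

    def get(self, now_ms=None):
        if self._envelope is None:
            return None
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        valid = (self._envelope["valid_from_ms"] <= now_ms
                 < self._envelope["valid_until_ms"])
        return self._envelope if valid else None
```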
+## 8. Data Gap Handling
+
+Data gaps are inferred from `sequence_id` discontinuities in snapshots.
+
+Current assumptions:
+- one global sequence per `PriceIngestor` process
+- gaps are tracked per ticker
+
+Limitations:
+- no persistent sequence state across restarts
+- no exchange-native sequence reconciliation
+
+## 9. Reliability and Retries
+
+### Price fetch
+- retries around `yfinance.download` via `call_with_backoff`
+- returns an empty chunk on exhaustion
+
+### Alert delivery
+- a failed send remains in the queue
+- re-attempted with exponential backoff
+- dropped after max retries
+
+## 10. Configuration
+
+Main runtime config:
+- `src/core/config.py`
+
+Important values:
+- polling: `POLL_INTERVAL_SEC`
+- ingestion parallelism: `PROCESS_POOL_WORKERS`, `BATCH_SIZE`
+- yfinance retry: `YF_MAX_RETRIES`, `YF_BACKOFF_BASE_SEC`
+- alert retry: `ALERT_MAX_RETRIES`, `ALERT_BACKOFF_BASE_SEC`
+- telemetry reporting: `TELEMETRY_REPORT_INTERVAL_SEC`
+- topic config path: `TOPICS_CONFIG_PATH`
+
+## 11. Known Limitations
+
+- In-memory pub/sub and stores are process-local only.
+- Universe is loaded once at startup (no periodic refresh yet).
+- Telemetry outputs to stdout only (no external sink).
+- No durable queue for alerts.
+- No persistent event/audit storage.
+
+## 12. Run and Verify
+
+Install dependencies and run:
+```bash
+pip install -r requirements.txt
+python -m src.core.main
+```
+
+Recommended quick checks:
+1. Verify topics loaded from `configs/topics.yaml`.
+2. Confirm `risk.regime.change` appears when volatility increases.
+3. Confirm `risk.envelope.updated` appears on the same transitions.
+4. Simulate send failures and verify alert queue retries.
+5. Observe telemetry counters printed periodically.
+
+## 13. Detailed Sequence Diagram (ASCII)
+
+```text
++====================================================================================================+
+|                                DETAILED EVENT SEQUENCE (1 CYCLE)                                   |
++====================================================================================================+
+
+Actors:
+  M  = main loop
+  UL = UniverseLoader
+  PI = PriceIngestor
+  PS = PubSub
+  RE = RiskEngine
+  DG = DataGapDetector
+  T  = Telemetry
+  AQ = AlertQueue
+  TN = TelegramNotifier
+  RS = RiskEnvelopeStore
+
+M          UL         PI         PS         RE         DG         T          AQ         TN         RS
+|          |          |          |          |          |          |          |          |          |
+|--load()-->|         |          |          |          |          |          |          |          |
+|<--universe|         |          |          |          |          |          |          |          |
+|--build_snapshot-------->       |          |          |          |          |          |
+|------------------------------publish market.universe.snapshot--------------------->|  |          |
+|          |          |          |--on_universe_snapshot---------->|          |          |
+|          |          |          |          |          |
+|== loop ================================================================================================> |
+|--for each venue--------------------------------------------------------------------------------------------|
+|--build_event------->|          |          |          |          |          |          |
+|------------------------------publish market.universe.updated---------------------------------------------->|
+|--fetch_prices------------------>|         |          |          |          |          |
+|          |          |--download+retry/backoff--> yfinance       |
+|          |          |<--prices + sequence_id                    |
+|<-------------------------------|          |          |          |          |          |
+|------------------------------publish market.prices.snapshot------------------------------------------------->|
+|          |          |          |--on_price_snapshot-------------------------------------->|
+|          |          |          |          | calc deltas/events/regime                     |
+|          |          |          |--publish market.price.delta----------------------------->|
+|          |          |          |--publish risk.event.detected---------------------------->|
+|          |          |          |          |          |          |--on_risk_event
+|          |          |          |--publish system.alert.ready----------------------------->|
+|          |          |          |          |          |          |--enqueue---->|
+|          |          |          |          |          |          |--on_alert_enqueued
+|          |          |          |--if regime changed:             |
+|          |          |          |  publish risk.regime.change----------------------------->|
+|          |          |          |          |          |          |--on_regime_change
+|          |          |          |          | build envelope       |
+|          |          |          |----------------------------------------------set()------->|
+|          |          |          |  publish risk.envelope.updated--------------------------->|
+|          |          |          |
+|          |          |          |--DG.on_price_snapshot------------------------------------>|
+|          |          |          |          |          | if seq gap:
+|          |          |          |--publish data.gap.detected------------------------------->|
+|          |          |          |          |          |          |--on_data_gap
+|          |          |          |--publish system.alert.ready(data_gap)-------------------->|
+|          |          |          |          |          |          |--enqueue---->|
+|--AQ.process(TN.send)---------------------------------------------------------------------------------------->|
+|          |          |          |          |          |          |--send alert------------->|
+|          |          |          |          |          |          |<--ok/fail---------------|
+|          |          |          |          |          |          |--retry schedule if fail |
+|== sleep POLL_INTERVAL_SEC =================================================================================>|
+```
+
+## 14. Regime Transition Diagram (ASCII)
+
+```text
++====================================================================================================+
+|                                  RISK REGIME TRANSITION LOGIC                                      |
++====================================================================================================+
+
+Inputs per snapshot:
+  worst_drop_pct
+  high_severity_count
+
+Threshold rules:
+  panic   if worst_drop_pct <= -8.0 OR high_severity_count >= 3
+  stress  if worst_drop_pct <= -5.0 OR high_severity_count >= 2
+  tension if worst_drop_pct <= -3.0 OR high_severity_count >= 1
+  normal  otherwise
+
+Transition behavior:
+  - Escalation (to stronger regime): immediate
+  - Downgrade (to weaker regime): requires calm_snapshots_to_downgrade (default 3)
+
+State graph:
+
+            (escalate)
+  +--------- normal ---------+
+  |            |             |
+  |            v             |
+  |         tension          |
+  |            |             |
+  |            v             |
+  |          stress          |
+  |            |             |
+  |            v             |
+  +---------- panic <--------+
+       ^
+       | (downgrade after calm streak)
+       +--------------------------------
+
+On each regime change:
+  1) publish risk.regime.change
+  2) build and store RiskEnvelope
+  3) publish risk.envelope.updated
+```
+
+## 15. Failure, Retry, and Degradation Flows (ASCII)
+
+```text
++====================================================================================================+
+|                                FAILURE / RETRY / BACKPRESSURE MAP                                  |
++====================================================================================================+
+
+1) Price fetch failures (yfinance)
+   PI._fetch_chunk()
+     -> call_with_backoff(max_attempts=YF_MAX_RETRIES, base=YF_BACKOFF_BASE_SEC)
+     -> if exhausted: return empty chunk
+     -> cycle continues (degraded data, no crash)
+
+2) Telegram send failures
+   AQ.process(TN.send)
+     -> failed send increments attempts
+     -> next attempt scheduled via exponential backoff
+     -> dropped after ALERT_MAX_RETRIES
+
+3) Data gaps
+   DG compares sequence_id per ticker
+     -> if jump detected: emit data.gap.detected
+     -> converted into system alert + telemetry counter
+
+4) Topic config failure
+   topics.py load_topics()
+     -> if YAML missing/malformed: fallback to hardcoded defaults
+```
+
+## 16. System Overview Diagram (ASCII)
+
+```text
++====================================================================================================+
+|                              MARKET ANALYZING PLATFORM (CURRENT)                                   |
++====================================================================================================+
+
+STARTUP PHASE
+=============
+
++---------------------+        +---------------------+        +-----------------------+
+| configs/topics.yaml | -----> | core/topics.py      | -----> | TOPICS map in memory  |
++---------------------+        +---------------------+        +-----------------------+
+
++---------------------+        +---------------------+        +-----------------------+
+| UniverseLoader      | -----> | universe snapshot   | -----> | market.universe.snapshot
+| (US/EU/Crypto/Comm) |        | (universe_id, etc.) |        |  -> RiskEngine stores universe_id
++---------------------+        +---------------------+        +-----------------------+
+
++---------------------+        +---------------------+        +-----------------------+
+| Build runtime graph | -----> | PubSub subscriptions| -----> | Service ready         |
+| (main.py)           |        | (handlers)          |        | polling loop starts   |
++---------------------+        +---------------------+        +-----------------------+
+
+
+RUNTIME LOOP
+============
+
+every POLL_INTERVAL_SEC
+  |
+  v
++---------------------------+
+| for each venue universe   |
+| (us.equities, eu..., etc) |
++---------------------------+
+  |
+  v
++---------------------------+     +--------------------------------------------+
+| PriceIngestor             |     | retry_utils.call_with_backoff              |
+| - chunk tickers           | <-----> | exponential backoff for yfinance download |
+| - process pool workers    |     +--------------------------------------------+
+| - sequence_id++           |
++---------------------------+
+  |
+  v
+topic: market.prices.snapshot
+  |
+  +------------------------------------------------------------------------------------+
+  |                                                                                    |
+  v                                                                                    v
++-------------------------+                                          +---------------------------+
+| RiskEngine              |                                          | DataGapDetector           |
+| - update PriceBuffer    |                                          | - per ticker last seq     |
+| - calc delta per TF     |                                          | - detect seq jumps        |
+| - adaptive threshold    |                                          +---------------------------+
+| - detect risk events    |                                            |
++-------------------------+                                            v
+  |                                                        topic: data.gap.detected
+  |                                                                    |
+  |                                                        +--------------------------+
+  |                                                        | Telemetry.on_data_gap    |
+  |                                                        +--------------------------+
+  |                                                                    |
+  |                                                                    v
+  |                                                        build_data_gap_alert(event)
+  |                                                                    |
+  |                                                                    v
+  |                                                        topic: system.alert.ready
+  |
+  +--> topic: market.price.delta
+  |
+  +--> topic: risk.event.detected -------> Telemetry.on_risk_event
+  |
+  +--> topic: system.alert.ready --------> AlertQueue.enqueue
+  |
+  +--> Regime State Machine (normal/tension/stress/panic)
+         |
+         | if regime changed:
+         v
+       topic: risk.regime.change -------> Telemetry.on_regime_change
+         |
+         v
+       build RiskEnvelope (limits by regime)
+         |
+         v
+       RiskEnvelopeStore.set(...)
+         |
+         v
+       topic: risk.envelope.updated
+
+
+ALERT DELIVERY (ASYNC-ish IN LOOP)
+==================================
+
+topic: system.alert.ready
+  |
+  +--> AlertQueue.enqueue
+  +--> Telemetry.on_alert_enqueued
+
+end of each polling iteration:
+  |
+  v
+AlertQueue.process(TelegramNotifier.send)
+  |
+  +--> success: remove item
+  |
+  +--> failure: schedule retry with exponential backoff
+       until ALERT_MAX_RETRIES exhausted
+
+
+RISK REGIME -> ENVELOPE MAP
+===========================
+
+normal  -> allowed=true,  higher limits
+tension -> allowed=true,  reduced limits
+stress  -> allowed=true,  tighter limits
+panic   -> allowed=false, zero limits (block trading)
+
+
+TOPIC TAXONOMY
+==============
+
+market.universe.snapshot
+market.universe.updated
+market.prices.snapshot
+market.price.delta
+risk.event.detected
+risk.regime.change
+risk.envelope.updated
+data.gap.detected
+system.alert.ready
+
+
+TELEMETRY OUTPUT
+================
+
+[TELEMETRY] alerts_enqueued=... risk_events=... risk_severity:high=...
+            risk_regime_changes=... risk_regime_to:stress=...
+            data_gap_events=... data_gap_ticker:BTC-USD=...
+```
requirements.txt CHANGED
@@ -3,3 +3,4 @@ pandas
 numpy
 requests
 lxml
+pyyaml
src/core/__init__.py CHANGED
@@ -0,0 +1,20 @@
+from .schemas import (
+    Instrument,
+    UniverseSnapshot,
+    RiskEnvelope,
+    RiskRegimeChangeEvent,
+    DataGapEvent,
+)
+from .topics import TOPICS
+from .retry_utils import backoff_delay, call_with_backoff
+
+__all__ = [
+    "Instrument",
+    "UniverseSnapshot",
+    "RiskEnvelope",
+    "RiskRegimeChangeEvent",
+    "DataGapEvent",
+    "TOPICS",
+    "backoff_delay",
+    "call_with_backoff",
+]
src/core/alerting.py CHANGED
@@ -1,5 +1,5 @@
 from collections import defaultdict
-from typing import Dict
+from typing import Dict, List
 import time
 import uuid
 import os
@@ -7,6 +7,7 @@ import os
 import requests
 
 from .config import MIN_ALERT_GAP_MIN, TELEGRAM_BOT_TOKEN_ENV, TELEGRAM_CHAT_ID_ENV
+from .retry_utils import backoff_delay
 
 
 class AlertDeduplicator:
@@ -54,18 +55,79 @@ def build_alert(
     }
 
 
+def build_data_gap_alert(gap_event: Dict) -> Dict:
+    ticker = gap_event["ticker"]
+    gap_size = gap_event["gap_size"]
+    seq_from = gap_event["sequence_from"]
+    seq_to = gap_event["sequence_to"]
+    return {
+        "event_id": str(uuid.uuid4()),
+        "ts": int(time.time() * 1000),
+        "ticker": ticker,
+        "asset_class": "system",
+        "venue": "system",
+        "timeframe": "n/a",
+        "alert_type": "data_gap",
+        "delta_pct": 0.0,
+        "threshold": 0.0,
+        "cooldown_sec": 0,
+        "destination": "telegram",
+        "message": (
+            f"[DATA_GAP] {ticker} missing={gap_size} "
+            f"seq:{seq_from}->{seq_to}"
+        ),
+    }
+
+
 class TelegramNotifier:
     def __init__(self) -> None:
         self._token = os.getenv(TELEGRAM_BOT_TOKEN_ENV)
         self._chat_id = os.getenv(TELEGRAM_CHAT_ID_ENV)
 
-    def send(self, alert: Dict) -> None:
+    def send(self, alert: Dict) -> bool:
         if not self._token or not self._chat_id:
             print(alert["message"])
-            return
+            return True
         url = f"https://api.telegram.org/bot{self._token}/sendMessage"
         payload = {"chat_id": self._chat_id, "text": alert["message"]}
         try:
-            requests.post(url, json=payload, timeout=10)
+            resp = requests.post(url, json=payload, timeout=10)
+            return resp.status_code == 200
         except Exception:
             print(alert["message"])
+            return False
+
+
+class AlertQueue:
+    def __init__(self, max_retries: int = 3, backoff_base_sec: int = 5) -> None:
+        self._max_retries = max_retries
+        self._backoff_base_sec = backoff_base_sec
+        self._items: List[Dict] = []
+
+    def enqueue(self, alert: Dict) -> None:
+        alert = dict(alert)
+        alert.setdefault("_attempts", 0)
+        alert.setdefault("_next_attempt_ts", 0.0)
+        self._items.append(alert)
+
+    def process(self, sender) -> None:
+        now = time.time()
+        remaining: List[Dict] = []
+        for alert in self._items:
+            if alert.get("_next_attempt_ts", 0.0) > now:
+                remaining.append(alert)
+                continue
+
+            ok = sender(alert)
+            if ok:
+                continue
+
+            attempts = int(alert.get("_attempts", 0)) + 1
+            if attempts > self._max_retries:
+                continue
+
+            alert["_attempts"] = attempts
+            alert["_next_attempt_ts"] = now + backoff_delay(self._backoff_base_sec, attempts)
+            remaining.append(alert)
+
+        self._items = remaining
src/core/config.py CHANGED
@@ -23,6 +23,9 @@ ASSET_CLASSES = ["equity", "crypto", "commodity"]
 # Adaptive threshold parameters
 MIN_WINDOW_POINTS = 5
 
+# Topic taxonomy config
+TOPICS_CONFIG_PATH = "configs/topics.yaml"
+
 # Universe sources (no-auth)
 US_TICKER_SOURCES = {
     "us.nasdaq": "https://raw.githubusercontent.com/rreichel3/US-Stock-Symbols/main/nasdaq/nasdaq_tickers.txt",
@@ -65,3 +68,14 @@ COMMODITY_TICKERS = [
 # Telegram bot (no auth on load; uses env vars at runtime)
 TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN"
 TELEGRAM_CHAT_ID_ENV = "TELEGRAM_CHAT_ID"
+
+# yfinance backoff
+YF_MAX_RETRIES = 3
+YF_BACKOFF_BASE_SEC = 2
+
+# Alert queue retries
+ALERT_MAX_RETRIES = 3
+ALERT_BACKOFF_BASE_SEC = 5
+
+# Telemetry
+TELEMETRY_REPORT_INTERVAL_SEC = 120
src/core/data_gap.py ADDED
@@ -0,0 +1,32 @@
+import time
+import uuid
+from typing import Dict
+
+
+class DataGapDetector:
+    def __init__(self, pubsub, topic: str) -> None:
+        self._pubsub = pubsub
+        self._topic = topic
+        self._last_seq = {}
+
+    def on_price_snapshot(self, price_event: Dict) -> None:
+        seq = price_event.get("sequence_id")
+        if seq is None:
+            return
+
+        for p in price_event.get("prices", []):
+            ticker = p.get("ticker")
+            if not ticker:
+                continue
+            last = self._last_seq.get(ticker)
+            if last is not None and seq - last > 1:
+                gap_event = {
+                    "event_id": str(uuid.uuid4()),
+                    "ts": int(time.time() * 1000),
+                    "ticker": ticker,
+                    "gap_size": int(seq - last - 1),
+                    "sequence_from": int(last),
+                    "sequence_to": int(seq),
+                }
+                self._pubsub.publish(self._topic, gap_event)
+            self._last_seq[ticker] = seq
src/core/main.py CHANGED
@@ -1,103 +1,35 @@
 import time
-import uuid
 
-from .config import TIMEFRAMES_MIN
+from .config import (
+    POLL_INTERVAL_SEC,
+    ALERT_MAX_RETRIES,
+    ALERT_BACKOFF_BASE_SEC,
+    TELEMETRY_REPORT_INTERVAL_SEC,
+)
 from .pubsub import PubSub
 from .universe_loader import UniverseLoader
 from .price_ingestor import PriceIngestor
-from .price_buffer import PriceBuffer
-from .price_delta import price_delta, velocity_pct_per_min
-from .event_detector import adaptive_threshold, detect_event, severity
-from .alerting import AlertDeduplicator, classify, build_alert, TelegramNotifier
+from .alerting import TelegramNotifier, AlertQueue, build_data_gap_alert
+from .risk_engine import RiskEngine
+from .risk_store import RiskEnvelopeStore
+from .data_gap import DataGapDetector
+from .topics import TOPICS
+from .telemetry import Telemetry
 
 
-TOPICS = {
-    "universe": "universe.updated",
-    "prices": "prices.snapshot",
-    "delta": "price.delta.calculated",
-    "event": "event.detected",
-    "alert": "alert.ready",
-}
-
-
-def run_once():
-    pubsub = PubSub()
-    buffer = PriceBuffer()
-    dedupe = AlertDeduplicator()
-    notifier = TelegramNotifier()
-
-    pubsub.subscribe(TOPICS["alert"], notifier.send)
-
-    loader = UniverseLoader()
-    universe = loader.load()
-
+def run_once(pubsub, loader, universe, queue, notifier):
     for venue_key, tickers in universe.items():
         if not tickers:
             continue
         asset_class = _infer_asset_class(venue_key)
         venue = venue_key
-        pubsub.publish(TOPICS["universe"], loader.build_event(asset_class, venue, tickers))
+        pubsub.publish(TOPICS["universe_updated"], loader.build_event(asset_class, venue, tickers))
 
         ingestor = PriceIngestor(venue=venue, asset_class=asset_class)
         price_event = ingestor.fetch_prices(tickers)
-        pubsub.publish(TOPICS["prices"], price_event)
-
-        for p in price_event["prices"]:
-            buffer.update(p["ticker"], p["price"])
-
-        for ticker in tickers:
-            for tf, minutes in TIMEFRAMES_MIN.items():
-                window = buffer.window(ticker, tf)
-                delta = price_delta(window)
-                if not delta:
-                    continue
-                delta_pct = delta["delta_pct"]
-                vel = velocity_pct_per_min(delta_pct, minutes)
-
-                delta_event = {
-                    "event_id": str(uuid.uuid4()),
-                    "ts": int(time.time() * 1000),
-                    "ticker": ticker,
-                    "asset_class": asset_class,
-                    "venue": venue,
-                    "timeframe": tf,
-                    **delta,
-                    "velocity_pct_per_min": vel,
-                    "window_points": len(window),
-                }
-                pubsub.publish(TOPICS["delta"], delta_event)
-
-                thr = adaptive_threshold(window)
-                if thr is None:
-                    continue
-                if detect_event(delta_pct, thr):
-                    if not dedupe.allow(ticker, tf):
-                        continue
-                    event = {
-                        "event_id": str(uuid.uuid4()),
-                        "ts": int(time.time() * 1000),
-                        "ticker": ticker,
-                        "asset_class": asset_class,
-                        "venue": venue,
-                        "timeframe": tf,
-                        "delta_pct": delta_pct,
-                        "threshold": thr,
-                        "threshold_type": "adaptive",
-                        "severity": severity(delta_pct, thr),
-                    }
-                    pubsub.publish(TOPICS["event"], event)
-
-                    alert_type = classify(minutes)
-                    alert = build_alert(
-                        ticker,
-                        asset_class,
-                        venue,
-                        tf,
-                        alert_type,
-                        delta_pct,
-                        thr,
-                    )
-                    pubsub.publish(TOPICS["alert"], alert)
+        pubsub.publish(TOPICS["prices_snapshot"], price_event)
+
+    queue.process(notifier.send)
 
 
@@ -110,6 +42,30 @@ def _infer_asset_class(venue_key: str) -> str:
 
 
 if __name__ == "__main__":
+    pubsub = PubSub()
+    notifier = TelegramNotifier()
+    queue = AlertQueue(max_retries=ALERT_MAX_RETRIES, backoff_base_sec=ALERT_BACKOFF_BASE_SEC)
+    telemetry = Telemetry(report_interval_sec=TELEMETRY_REPORT_INTERVAL_SEC)
+
+    envelope_store = RiskEnvelopeStore()
+    risk = RiskEngine(pubsub, envelope_store)
+    gap_detector = DataGapDetector(pubsub, TOPICS["data_gap"])
+
+    pubsub.subscribe(TOPICS["alert_ready"], queue.enqueue)
+    pubsub.subscribe(TOPICS["alert_ready"], telemetry.on_alert_enqueued)
+    pubsub.subscribe(TOPICS["prices_snapshot"], risk.on_price_snapshot)
+    pubsub.subscribe(TOPICS["prices_snapshot"], gap_detector.on_price_snapshot)
+    pubsub.subscribe(TOPICS["risk_event"], telemetry.on_risk_event)
+    pubsub.subscribe(TOPICS["risk_regime_change"], telemetry.on_regime_change)
+    pubsub.subscribe(TOPICS["data_gap"], telemetry.on_data_gap)
     while True:
-        run_once()
-        time.sleep(60)
61
+ pubsub.subscribe(TOPICS["data_gap"], lambda event: pubsub.publish(TOPICS["alert_ready"], build_data_gap_alert(event)))
62
+ pubsub.subscribe(TOPICS["universe_snapshot"], risk.on_universe_snapshot)
63
+
64
+ loader = UniverseLoader()
65
+ universe = loader.load()
66
+ snapshot = loader.build_snapshot(universe)
67
+ pubsub.publish(TOPICS["universe_snapshot"], snapshot)
68
+
69
  while True:
70
+ run_once(pubsub, loader, universe, queue, notifier)
71
+ time.sleep(POLL_INTERVAL_SEC)
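The entrypoint wires everything through a synchronous `PubSub`. The repo's `src/core/pubsub.py` is not shown in this diff, so as a hedged sketch only, the interface the wiring above relies on (topic string mapped to a list of callbacks, with `publish` fanning out to every subscriber) could look like:

```python
from collections import defaultdict
from typing import Callable, Dict, List


class PubSub:
    """Minimal synchronous pub/sub sketch: publish() calls subscribers in order."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event) -> None:
        # Copy the list so handlers that subscribe during dispatch don't mutate it.
        for handler in list(self._subscribers[topic]):
            handler(event)


# One topic fanning out to two handlers, as in the entrypoint wiring.
bus = PubSub()
seen = []
bus.subscribe("system.alert.ready", seen.append)
bus.subscribe("system.alert.ready", lambda e: seen.append({"copy": e["id"]}))
bus.publish("system.alert.ready", {"id": 1})
```

This mirrors the fan-out pattern used above (e.g. `alert_ready` feeding both the queue and telemetry), not the actual implementation.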
src/core/price_ingestor.py CHANGED
@@ -2,8 +2,15 @@ from concurrent.futures import ProcessPoolExecutor, as_completed
 from typing import Dict, List
 import time
 import uuid
-
-from .config import BATCH_SIZE, PROCESS_POOL_WORKERS


 def _fetch_chunk(tickers: List[str]) -> Dict[str, Dict]:
@@ -13,14 +20,24 @@ def _fetch_chunk(tickers: List[str]) -> Dict[str, Dict]:
     if not tickers:
         return {}

-    data = yf.download(
-        tickers=" ".join(tickers),
-        period="1d",
-        interval="1m",
-        group_by="ticker",
-        progress=False,
-        threads=False,
-    )

     result: Dict[str, Dict] = {}

@@ -64,11 +81,17 @@ class PriceIngestor:
     with ProcessPoolExecutor(max_workers=PROCESS_POOL_WORKERS) as pool:
         futures = [pool.submit(_fetch_chunk, chunk) for chunk in chunks]
         for f in as_completed(futures):
-            prices.update(f.result())

     return {
         "event_id": str(uuid.uuid4()),
         "ts": int(time.time() * 1000),
         "asset_class": self.asset_class,
         "venue": self.venue,
         "is_delayed": False,
@@ -81,3 +104,6 @@ class PriceIngestor:
             for t, p in prices.items()
         ],
     }

 from typing import Dict, List
 import time
 import uuid
+import itertools

+from .config import (
+    BATCH_SIZE,
+    PROCESS_POOL_WORKERS,
+    YF_MAX_RETRIES,
+    YF_BACKOFF_BASE_SEC,
+)
+from .retry_utils import call_with_backoff


 def _fetch_chunk(tickers: List[str]) -> Dict[str, Dict]:

     if not tickers:
         return {}

+    def _download():
+        return yf.download(
+            tickers=" ".join(tickers),
+            period="1d",
+            interval="1m",
+            group_by="ticker",
+            progress=False,
+            threads=False,
+        )
+
+    try:
+        data = call_with_backoff(
+            _download,
+            max_attempts=YF_MAX_RETRIES,
+            base_delay_sec=YF_BACKOFF_BASE_SEC,
+        )
+    except Exception:
+        return {}

     result: Dict[str, Dict] = {}

     with ProcessPoolExecutor(max_workers=PROCESS_POOL_WORKERS) as pool:
         futures = [pool.submit(_fetch_chunk, chunk) for chunk in chunks]
         for f in as_completed(futures):
+            try:
+                prices.update(f.result())
+            except Exception:
+                continue
+
+    sequence_id = next(_SEQ)

     return {
         "event_id": str(uuid.uuid4()),
         "ts": int(time.time() * 1000),
+        "sequence_id": sequence_id,
         "asset_class": self.asset_class,
         "venue": self.venue,
         "is_delayed": False,

             for t, p in prices.items()
         ],
     }
+
+
+_SEQ = itertools.count(1)
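Each `_fetch_chunk` call handles one batch of tickers in the process pool, and every assembled snapshot takes a `sequence_id` from a module-level `itertools.count` so downstream gap detection can spot missing snapshots. The batching helper itself is not part of this hunk; a sketch under the assumption of simple fixed-size slicing (the hypothetical `chunked` below stands in for whatever `BATCH_SIZE`-based splitting the repo actually uses):

```python
import itertools
from typing import Iterable, List

# Module-level counter: each snapshot gets the next monotonically increasing id.
_SEQ = itertools.count(1)


def chunked(items: List[str], size: int) -> Iterable[List[str]]:
    # Split the ticker universe into fixed-size batches for the process pool.
    for i in range(0, len(items), size):
        yield items[i:i + size]


tickers = ["AAPL", "MSFT", "NVDA", "TSLA", "AMZN"]
batches = list(chunked(tickers, 2))   # three batches: 2 + 2 + 1
first_id = next(_SEQ)
second_id = next(_SEQ)
```

Because `_SEQ` lives at module scope in the parent process (the workers only fetch), ids stay strictly increasing across snapshots, which is what makes `sequence_from`/`sequence_to` in `DataGapEvent` meaningful.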
src/core/retry_utils.py ADDED
@@ -0,0 +1,28 @@
+import time
+from typing import Callable, TypeVar
+
+
+T = TypeVar("T")
+
+
+def backoff_delay(base_delay_sec: float, attempt: int) -> float:
+    if attempt <= 0:
+        return 0.0
+    return base_delay_sec * (2 ** (attempt - 1))
+
+
+def call_with_backoff(fn: Callable[[], T], max_attempts: int, base_delay_sec: float) -> T:
+    if max_attempts < 1:
+        raise ValueError("max_attempts must be >= 1")
+
+    last_error = None
+    for attempt in range(1, max_attempts + 1):
+        try:
+            return fn()
+        except Exception as exc:  # noqa: BLE001
+            last_error = exc
+            if attempt >= max_attempts:
+                break
+            time.sleep(backoff_delay(base_delay_sec, attempt))
+
+    raise RuntimeError("retry attempts exhausted") from last_error
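The schedule above doubles the delay after each failed attempt: attempt 1 waits `base`, attempt 2 waits `2*base`, and so on. A quick check of the delay curve and the retry loop, with the same logic copied here and a zero base delay so nothing actually sleeps:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delay(base_delay_sec: float, attempt: int) -> float:
    # Doubling schedule: base * 2**(attempt - 1) for attempt >= 1.
    if attempt <= 0:
        return 0.0
    return base_delay_sec * (2 ** (attempt - 1))


def call_with_backoff(fn: Callable[[], T], max_attempts: int, base_delay_sec: float) -> T:
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            last_error = exc
            if attempt >= max_attempts:
                break
            time.sleep(backoff_delay(base_delay_sec, attempt))
    raise RuntimeError("retry attempts exhausted") from last_error


delays = [backoff_delay(0.5, a) for a in range(1, 5)]  # 0.5, 1.0, 2.0, 4.0

calls = {"n": 0}

def flaky() -> str:
    # Fails twice, then succeeds — the retry loop should absorb both failures.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient")
    return "ok"

result = call_with_backoff(flaky, max_attempts=5, base_delay_sec=0.0)
```

Note the last delay before giving up is skipped (`attempt >= max_attempts` breaks first), so a 3-attempt call sleeps at most twice.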
src/core/risk_engine.py ADDED
@@ -0,0 +1,203 @@
+import time
+import uuid
+from typing import Dict
+
+from .config import TIMEFRAMES_MIN
+from .price_buffer import PriceBuffer
+from .price_delta import price_delta, velocity_pct_per_min
+from .event_detector import adaptive_threshold, detect_event, severity
+from .alerting import AlertDeduplicator, classify, build_alert
+from .topics import TOPICS
+from .schemas import RiskEnvelope, RiskRegimeChangeEvent
+from .risk_store import RiskEnvelopeStore
+from .risk_regime import RiskRegimeStateMachine
+
+
+class RiskEngine:
+    def __init__(self, pubsub, envelope_store: RiskEnvelopeStore) -> None:
+        self._pubsub = pubsub
+        self._buffer = PriceBuffer()
+        self._dedupe = AlertDeduplicator()
+        self._envelope_store = envelope_store
+        self._regime = RiskRegimeStateMachine()
+        self._universe_id = "unknown"
+        self._last_regime = self._regime.state
+
+    def on_universe_snapshot(self, universe_snapshot: Dict) -> None:
+        self._universe_id = universe_snapshot.get("universe_id", "unknown")
+
+    def on_price_snapshot(self, price_event: Dict) -> None:
+        asset_class = price_event.get("asset_class", "equity")
+        venue = price_event.get("venue", "unknown")
+
+        worst_drop = 0.0
+        high_severity_count = 0
+
+        for price_point in price_event.get("prices", []):
+            self._buffer.update(price_point["ticker"], price_point["price"])
+
+        for price_point in price_event.get("prices", []):
+            ticker = price_point["ticker"]
+            for timeframe, minutes in TIMEFRAMES_MIN.items():
+                window = self._buffer.window(ticker, timeframe)
+                delta = price_delta(window)
+                if not delta:
+                    continue
+
+                delta_pct = delta["delta_pct"]
+                worst_drop = min(worst_drop, delta_pct)
+
+                delta_event = {
+                    "event_id": str(uuid.uuid4()),
+                    "ts": int(time.time() * 1000),
+                    "ticker": ticker,
+                    "asset_class": asset_class,
+                    "venue": venue,
+                    "timeframe": timeframe,
+                    **delta,
+                    "velocity_pct_per_min": velocity_pct_per_min(delta_pct, minutes),
+                    "window_points": len(window),
+                }
+                self._pubsub.publish(TOPICS["price_delta"], delta_event)
+
+                threshold = adaptive_threshold(window)
+                if threshold is None:
+                    continue
+
+                if not detect_event(delta_pct, threshold):
+                    continue
+
+                if not self._dedupe.allow(ticker, timeframe):
+                    continue
+
+                event_severity = severity(delta_pct, threshold)
+                if event_severity == "high":
+                    high_severity_count += 1
+
+                risk_event = {
+                    "event_id": str(uuid.uuid4()),
+                    "ts": int(time.time() * 1000),
+                    "ticker": ticker,
+                    "asset_class": asset_class,
+                    "venue": venue,
+                    "timeframe": timeframe,
+                    "delta_pct": delta_pct,
+                    "threshold": threshold,
+                    "threshold_type": "adaptive",
+                    "severity": event_severity,
+                }
+                self._pubsub.publish(TOPICS["risk_event"], risk_event)
+
+                alert = build_alert(
+                    ticker=ticker,
+                    asset_class=asset_class,
+                    venue=venue,
+                    timeframe=timeframe,
+                    alert_type=classify(minutes),
+                    delta_pct=delta_pct,
+                    threshold=threshold,
+                )
+                self._pubsub.publish(TOPICS["alert_ready"], alert)
+
+        regime, changed = self._regime.update(
+            worst_drop_pct=worst_drop,
+            high_severity_count=high_severity_count,
+        )
+        if changed:
+            regime_event = self._build_regime_change_event(
+                from_regime=self._last_regime,
+                to_regime=regime,
+                worst_drop_pct=worst_drop,
+                high_severity_count=high_severity_count,
+            )
+            self._pubsub.publish(TOPICS["risk_regime_change"], regime_event)
+            self._last_regime = regime
+
+        envelope = self._build_envelope(regime=regime)
+        self._envelope_store.set(envelope)
+        self._pubsub.publish(TOPICS["risk_envelope"], envelope)
+
+    def _build_regime_change_event(
+        self,
+        from_regime: str,
+        to_regime: str,
+        worst_drop_pct: float,
+        high_severity_count: int,
+    ) -> RiskRegimeChangeEvent:
+        return {
+            "event_id": str(uuid.uuid4()),
+            "ts": int(time.time() * 1000),
+            "universe_id": self._universe_id,
+            "from_regime": from_regime,
+            "to_regime": to_regime,
+            "reason": (
+                f"worst_drop_pct={worst_drop_pct:.4f};"
+                f"high_severity_count={high_severity_count}"
+            ),
+            "confidence": self._regime_confidence(to_regime),
+        }
+
+    @staticmethod
+    def _regime_confidence(regime: str) -> float:
+        if regime == "panic":
+            return 0.95
+        if regime == "stress":
+            return 0.85
+        if regime == "tension":
+            return 0.75
+        return 0.6
+
+    def _build_envelope(self, regime: str) -> RiskEnvelope:
+        now_ms = int(time.time() * 1000)
+        if regime == "panic":
+            allowed = False
+            max_position = 0.0
+            max_order = 0.0
+            max_leverage = 0.0
+            max_turnover = 0.0
+            cooldown = 300
+            confidence = 0.95
+            reason = "PANIC_REGIME"
+        elif regime == "stress":
+            allowed = True
+            max_position = 250_000.0
+            max_order = 25_000.0
+            max_leverage = 0.5
+            max_turnover = 500_000.0
+            cooldown = 120
+            confidence = 0.85
+            reason = "STRESS_REGIME"
+        elif regime == "tension":
+            allowed = True
+            max_position = 500_000.0
+            max_order = 50_000.0
+            max_leverage = 0.8
+            max_turnover = 1_000_000.0
+            cooldown = 60
+            confidence = 0.75
+            reason = "TENSION_REGIME"
+        else:
+            allowed = True
+            max_position = 1_000_000.0
+            max_order = 100_000.0
+            max_leverage = 1.0
+            max_turnover = 5_000_000.0
+            cooldown = 30
+            confidence = 0.6
+            reason = "NORMAL_REGIME"
+
+        return {
+            "envelope_id": str(uuid.uuid4()),
+            "universe_id": self._universe_id,
+            "allowed": allowed,
+            "max_position_usd": max_position,
+            "max_order_usd": max_order,
+            "max_leverage": max_leverage,
+            "max_turnover_per_hour": max_turnover,
+            "cooldown_seconds": cooldown,
+            "regime": regime,
+            "confidence": confidence,
+            "reason": reason,
+            "valid_from_ms": now_ms,
+            "valid_until_ms": now_ms + (cooldown * 1000),
+        }
src/core/risk_regime.py ADDED
@@ -0,0 +1,57 @@
+from dataclasses import dataclass
+from typing import Tuple
+
+
+@dataclass(frozen=True)
+class RegimeConfig:
+    tension_drop_pct: float = -3.0
+    stress_drop_pct: float = -5.0
+    panic_drop_pct: float = -8.0
+    calm_snapshots_to_downgrade: int = 3
+
+
+class RiskRegimeStateMachine:
+    def __init__(self, config: RegimeConfig | None = None) -> None:
+        self._cfg = config or RegimeConfig()
+        self._state = "normal"
+        self._calm_streak = 0
+
+    @property
+    def state(self) -> str:
+        return self._state
+
+    def update(self, worst_drop_pct: float, high_severity_count: int) -> Tuple[str, bool]:
+        prev = self._state
+        target = self._state
+
+        if worst_drop_pct <= self._cfg.panic_drop_pct or high_severity_count >= 3:
+            target = "panic"
+        elif worst_drop_pct <= self._cfg.stress_drop_pct or high_severity_count >= 2:
+            target = "stress"
+        elif worst_drop_pct <= self._cfg.tension_drop_pct or high_severity_count >= 1:
+            target = "tension"
+        else:
+            target = "normal"
+
+        if target == self._state:
+            if target == "normal":
+                self._calm_streak = 0
+            return self._state, False
+
+        if self._rank(target) > self._rank(self._state):
+            self._state = target
+            self._calm_streak = 0
+            return self._state, True
+
+        self._calm_streak += 1
+        if self._calm_streak >= self._cfg.calm_snapshots_to_downgrade:
+            self._state = target
+            self._calm_streak = 0
+            return self._state, True
+
+        return prev, False
+
+    @staticmethod
+    def _rank(state: str) -> int:
+        order = {"normal": 0, "tension": 1, "stress": 2, "panic": 3}
+        return order.get(state, 0)
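The state machine is asymmetric: upgrades apply immediately, while downgrades require `calm_snapshots_to_downgrade` consecutive calmer snapshots (hysteresis, so one quiet tick doesn't relax limits mid-selloff). A condensed copy of the logic above, driven through an upgrade-then-downgrade sequence:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RegimeConfig:
    tension_drop_pct: float = -3.0
    stress_drop_pct: float = -5.0
    panic_drop_pct: float = -8.0
    calm_snapshots_to_downgrade: int = 3


class RiskRegimeStateMachine:
    _RANK = {"normal": 0, "tension": 1, "stress": 2, "panic": 3}

    def __init__(self, config=None) -> None:
        self._cfg = config or RegimeConfig()
        self._state = "normal"
        self._calm_streak = 0

    def update(self, worst_drop_pct, high_severity_count):
        prev = self._state
        if worst_drop_pct <= self._cfg.panic_drop_pct or high_severity_count >= 3:
            target = "panic"
        elif worst_drop_pct <= self._cfg.stress_drop_pct or high_severity_count >= 2:
            target = "stress"
        elif worst_drop_pct <= self._cfg.tension_drop_pct or high_severity_count >= 1:
            target = "tension"
        else:
            target = "normal"

        if target == self._state:
            if target == "normal":
                self._calm_streak = 0
            return self._state, False

        if self._RANK[target] > self._RANK[self._state]:
            # Upgrades apply immediately.
            self._state = target
            self._calm_streak = 0
            return self._state, True

        # Downgrades only after a streak of calmer snapshots.
        self._calm_streak += 1
        if self._calm_streak >= self._cfg.calm_snapshots_to_downgrade:
            self._state = target
            self._calm_streak = 0
            return self._state, True
        return prev, False


sm = RiskRegimeStateMachine()
upgraded = sm.update(worst_drop_pct=-6.0, high_severity_count=0)    # -6% breaches stress
held_1 = sm.update(worst_drop_pct=-1.0, high_severity_count=0)      # calm streak 1
held_2 = sm.update(worst_drop_pct=-1.0, high_severity_count=0)      # calm streak 2
downgraded = sm.update(worst_drop_pct=-1.0, high_severity_count=0)  # streak 3 -> normal
```

One consequence of the streak rule: a multi-level downgrade (e.g. panic straight to normal) happens in a single transition once the streak is reached, not one level at a time.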
src/core/risk_store.py ADDED
@@ -0,0 +1,20 @@
+import time
+from typing import Optional
+
+from .schemas import RiskEnvelope
+
+
+class RiskEnvelopeStore:
+    def __init__(self) -> None:
+        self._envelope: Optional[RiskEnvelope] = None
+
+    def set(self, envelope: RiskEnvelope) -> None:
+        self._envelope = envelope
+
+    def get(self) -> Optional[RiskEnvelope]:
+        if not self._envelope:
+            return None
+        now_ms = int(time.time() * 1000)
+        if now_ms > self._envelope["valid_until_ms"]:
+            return None
+        return self._envelope
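`get` treats an envelope past its `valid_until_ms` as absent, so a stalled pipeline fails closed rather than serving stale limits. The same TTL check with an injected clock (an addition for testability here; the store above reads `time.time()` directly):

```python
import time
from typing import Optional


class RiskEnvelopeStore:
    # Same expiry rule as above, but the clock is injectable for deterministic tests.
    def __init__(self, now_ms=lambda: int(time.time() * 1000)) -> None:
        self._envelope: Optional[dict] = None
        self._now_ms = now_ms

    def set(self, envelope: dict) -> None:
        self._envelope = envelope

    def get(self) -> Optional[dict]:
        if not self._envelope:
            return None
        if self._now_ms() > self._envelope["valid_until_ms"]:
            return None  # expired envelopes are treated as missing
        return self._envelope


clock = {"ms": 1_000}
store = RiskEnvelopeStore(now_ms=lambda: clock["ms"])
store.set({"envelope_id": "e1", "valid_until_ms": 1_500})

fresh = store.get()    # clock inside the validity window
clock["ms"] = 2_000
expired = store.get()  # past valid_until_ms -> None
```
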
src/core/schemas.py ADDED
@@ -0,0 +1,53 @@
+from typing import TypedDict, List, Dict, Optional
+
+
+class Instrument(TypedDict):
+    symbol: str
+    exchange: str
+    asset_class: str
+    base_asset: Optional[str]
+    quote_asset: Optional[str]
+    metadata: Dict[str, str]
+
+
+class UniverseSnapshot(TypedDict):
+    universe_id: str
+    version: str
+    generated_at_ms: int
+    source: str
+    instruments: List[Instrument]
+
+
+class RiskEnvelope(TypedDict):
+    envelope_id: str
+    universe_id: str
+    allowed: bool
+    max_position_usd: float
+    max_order_usd: float
+    max_leverage: float
+    max_turnover_per_hour: float
+    cooldown_seconds: int
+    regime: str
+    confidence: float
+    reason: str
+    valid_from_ms: int
+    valid_until_ms: int
+
+
+class RiskRegimeChangeEvent(TypedDict):
+    event_id: str
+    ts: int
+    universe_id: str
+    from_regime: str
+    to_regime: str
+    reason: str
+    confidence: float
+
+
+class DataGapEvent(TypedDict):
+    event_id: str
+    ts: int
+    ticker: str
+    gap_size: int
+    sequence_from: int
+    sequence_to: int
src/core/telemetry.py ADDED
@@ -0,0 +1,44 @@
+import time
+from collections import defaultdict
+from typing import Dict
+
+
+class Telemetry:
+    def __init__(self, report_interval_sec: int = 120) -> None:
+        self._counters = defaultdict(int)
+        self._report_interval_sec = report_interval_sec
+        self._last_report_ts = time.time()
+
+    def on_data_gap(self, gap_event: Dict) -> None:
+        self._counters["data_gap_events"] += 1
+        self._counters[f"data_gap_ticker:{gap_event.get('ticker', 'unknown')}"] += 1
+        self._maybe_report()
+
+    def on_risk_event(self, risk_event: Dict) -> None:
+        self._counters["risk_events"] += 1
+        sev = risk_event.get("severity", "unknown")
+        self._counters[f"risk_severity:{sev}"] += 1
+        self._maybe_report()
+
+    def on_alert_enqueued(self, _alert: Dict) -> None:
+        self._counters["alerts_enqueued"] += 1
+        self._maybe_report()
+
+    def on_regime_change(self, event: Dict) -> None:
+        self._counters["risk_regime_changes"] += 1
+        to_regime = event.get("to_regime", "unknown")
+        self._counters[f"risk_regime_to:{to_regime}"] += 1
+        self._maybe_report()
+
+    def report(self) -> None:
+        if not self._counters:
+            return
+        summary = " ".join(f"{k}={v}" for k, v in sorted(self._counters.items()))
+        print(f"[TELEMETRY] {summary}")
+
+    def _maybe_report(self) -> None:
+        now = time.time()
+        if now - self._last_report_ts < self._report_interval_sec:
+            return
+        self._last_report_ts = now
+        self.report()
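Telemetry is plain in-memory counters with a minimum interval between reports, so bursty event streams don't flood stdout. The same pattern with reports captured in a list instead of printed, and a zero interval so every event triggers a report:

```python
import time
from collections import defaultdict


class Telemetry:
    # Counter-based telemetry; a report fires at most once per interval.
    def __init__(self, report_interval_sec: float = 120) -> None:
        self._counters = defaultdict(int)
        self._report_interval_sec = report_interval_sec
        self._last_report_ts = time.time()
        self.reports = []  # captured here for the sketch, printed in the module above

    def on_risk_event(self, risk_event: dict) -> None:
        self._counters["risk_events"] += 1
        self._counters[f"risk_severity:{risk_event.get('severity', 'unknown')}"] += 1
        self._maybe_report()

    def _maybe_report(self) -> None:
        now = time.time()
        if now - self._last_report_ts < self._report_interval_sec:
            return
        self._last_report_ts = now
        self.reports.append(dict(self._counters))


t = Telemetry(report_interval_sec=0)  # zero interval -> report on every event
t.on_risk_event({"severity": "high"})
t.on_risk_event({"severity": "low"})
```

Counters accumulate forever (each report is a running total, not a delta), which is the simplest choice for a POC but worth noting before graphing the numbers.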
src/core/topics.py ADDED
@@ -0,0 +1,44 @@
+import os
+from typing import Dict
+
+import yaml
+
+from .config import TOPICS_CONFIG_PATH
+
+
+_DEFAULT_TOPICS: Dict[str, str] = {
+    "universe_snapshot": "market.universe.snapshot",
+    "universe_updated": "market.universe.updated",
+    "prices_snapshot": "market.prices.snapshot",
+    "price_delta": "market.price.delta",
+    "risk_event": "risk.event.detected",
+    "risk_regime_change": "risk.regime.change",
+    "risk_envelope": "risk.envelope.updated",
+    "data_gap": "data.gap.detected",
+    "alert_ready": "system.alert.ready",
+}
+
+
+def _resolve_path(path: str) -> str:
+    if os.path.isabs(path):
+        return path
+    base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+    return os.path.join(base_dir, path)
+
+
+def load_topics(path: str) -> Dict[str, str]:
+    path = _resolve_path(path)
+    if not os.path.exists(path):
+        return dict(_DEFAULT_TOPICS)
+    try:
+        with open(path, "r", encoding="utf-8") as f:
+            data = yaml.safe_load(f) or {}
+        topics = data.get("topics", {})
+        merged = dict(_DEFAULT_TOPICS)
+        merged.update({k: v for k, v in topics.items() if isinstance(v, str)})
+        return merged
+    except Exception:
+        return dict(_DEFAULT_TOPICS)
+
+
+TOPICS = load_topics(TOPICS_CONFIG_PATH)
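`load_topics` merges the overrides from `configs/topics.yaml` into `_DEFAULT_TOPICS`, keeping only string values, so a missing or partial file never drops a topic key. The merge rule in isolation, using a trimmed copy of the default map and a plain dict in place of the parsed YAML:

```python
from typing import Dict

# Trimmed subset of the module's default topic map, for illustration only.
_DEFAULT_TOPICS: Dict[str, str] = {
    "universe_snapshot": "market.universe.snapshot",
    "universe_updated": "market.universe.updated",
    "alert_ready": "system.alert.ready",
}


def merge_topics(overrides: Dict[str, object]) -> Dict[str, str]:
    # Same rule as load_topics: defaults win unless the override is a string.
    merged = dict(_DEFAULT_TOPICS)
    merged.update({k: v for k, v in overrides.items() if isinstance(v, str)})
    return merged


# A string override replaces the default; a non-string one is silently ignored.
topics = merge_topics({"alert_ready": "ops.alert.ready", "universe_updated": 42})
```

This is why the committed `configs/topics.yaml` can list only a couple of entries: every key absent from the file still resolves through the defaults.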
src/core/universe_loader.py CHANGED
@@ -11,6 +11,7 @@ from .config import (
     BINANCE_EXCHANGE_INFO,
     COMMODITY_TICKERS,
 )


 class UniverseLoader:
@@ -42,6 +43,30 @@ class UniverseLoader:
             "changes": {"added": tickers, "removed": []},
         }

     def _load_us_equities(self) -> Set[str]:
         tickers: Set[str] = set()
         for _, url in US_TICKER_SOURCES.items():
@@ -99,3 +124,23 @@ class UniverseLoader:
         except Exception:
             return tickers
         return tickers

     BINANCE_EXCHANGE_INFO,
     COMMODITY_TICKERS,
 )
+from .schemas import UniverseSnapshot, Instrument


 class UniverseLoader:

             "changes": {"added": tickers, "removed": []},
         }

+    def build_snapshot(self, universe: Dict[str, List[str]]) -> UniverseSnapshot:
+        instruments: List[Instrument] = []
+        for venue_key, tickers in universe.items():
+            asset_class = self._infer_asset_class(venue_key)
+            for ticker in tickers:
+                instruments.append(
+                    {
+                        "symbol": ticker,
+                        "exchange": venue_key,
+                        "asset_class": asset_class,
+                        "base_asset": self._base_asset(ticker, asset_class),
+                        "quote_asset": self._quote_asset(ticker, asset_class),
+                        "metadata": {},
+                    }
+                )
+
+        return {
+            "universe_id": str(uuid.uuid4()),
+            "version": self._version,
+            "generated_at_ms": int(time.time() * 1000),
+            "source": "universe_loader",
+            "instruments": instruments,
+        }
+
     def _load_us_equities(self) -> Set[str]:
         tickers: Set[str] = set()
         for _, url in US_TICKER_SOURCES.items():

         except Exception:
             return tickers
         return tickers
+
+    @staticmethod
+    def _infer_asset_class(venue_key: str) -> str:
+        if "crypto" in venue_key:
+            return "crypto"
+        if "commodities" in venue_key:
+            return "commodity"
+        return "equity"
+
+    @staticmethod
+    def _base_asset(symbol: str, asset_class: str) -> str:
+        if asset_class == "crypto" and "-" in symbol:
+            return symbol.split("-")[0]
+        return symbol
+
+    @staticmethod
+    def _quote_asset(symbol: str, asset_class: str) -> str:
+        if asset_class == "crypto" and "-" in symbol:
+            return symbol.split("-")[1]
+        return "USD"
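The `_base_asset`/`_quote_asset` helpers assume dash-delimited crypto symbols (Yahoo-style `BTC-USD`); whether the loader normalizes Binance `exchangeInfo` symbols like `BTCUSDT` into that form is not shown in this diff. The split rule in isolation:

```python
def base_asset(symbol: str, asset_class: str) -> str:
    # Crypto pairs split on the first dash; everything else is its own base.
    if asset_class == "crypto" and "-" in symbol:
        return symbol.split("-")[0]
    return symbol


def quote_asset(symbol: str, asset_class: str) -> str:
    # Non-crypto instruments default to USD quoting.
    if asset_class == "crypto" and "-" in symbol:
        return symbol.split("-")[1]
    return "USD"


pair = (base_asset("BTC-USD", "crypto"), quote_asset("BTC-USD", "crypto"))
equity = (base_asset("AAPL", "equity"), quote_asset("AAPL", "equity"))
```

Note that a dash-less crypto symbol falls through to `("BTCUSDT", "USD")`, so the split only works if upstream symbols are normalized first.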