File size: 2,488 Bytes
12c8085 1aeb36c 12c8085 1aeb36c 12c8085 1aeb36c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | """Admin endpoints — live data refresh, agent replay triggers."""
from __future__ import annotations
import logging
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.core.config import get_settings
from app.core.db import get_db
log = logging.getLogger(__name__)
router = APIRouter()
@router.post("/refresh-live")
def refresh_live(db: Session = Depends(get_db)) -> dict:
"""Re-pull curve / filings / news from live sources on demand.
Only does work when USE_LIVE_DATA=true and the relevant keys/UA are set.
Returns a summary describing what was refreshed.
"""
from app.services.ingestion.live_refresh import refresh_live_data
settings = get_settings()
if not settings.use_live_data:
return {
"enabled": False,
"message": "USE_LIVE_DATA is false — set it to true and restart, or set it as a HF Space Variable.",
"curve_source": "seeded",
"news_source": "seeded",
"filings_source": "seeded",
}
summary = refresh_live_data(db)
summary["enabled"] = True
summary["curve_source"] = "FRED" if settings.live_curve_enabled else "seeded"
summary["news_source"] = (
settings.news_provider.upper() if settings.live_news_enabled else "seeded"
)
summary["filings_source"] = "SEC EDGAR" if settings.live_filings_enabled else "seeded"
summary["llm_provider"] = settings.llm_provider if settings.llm_enabled else "none"
return summary
@router.get("/status")
def live_status() -> dict:
"""Inspect the live-data configuration without revealing secrets."""
from app.services.scheduler import scheduler_status
s = get_settings()
return {
"use_live_data": s.use_live_data,
"llm_provider": s.llm_provider if s.llm_enabled else "none",
"llm_model": s.llm_model if s.llm_enabled else None,
"fred_configured": bool(s.fred_api_key),
"sec_configured": bool(s.sec_user_agent),
"news_provider": s.news_provider,
"news_configured": s.live_news_enabled,
"live_curve": s.live_curve_enabled,
"live_filings": s.live_filings_enabled,
"live_news": s.live_news_enabled,
"scheduler": scheduler_status(),
}
@router.get("/scheduler")
def scheduler_info() -> dict:
"""Detailed scheduler state — running, cron, last run, next run."""
from app.services.scheduler import scheduler_status
return scheduler_status()
|