| """ |
| Shared singleton store that bridges pipeline output to API responses. |
| |
| After a pipeline run completes, call ``store.update(run_result)`` to populate |
| the store with real data. The API checks ``store.has_real_data`` and serves |
| from the store when True, falling back to synthetic demo data when False. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import threading |
| from typing import Any |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class PipelineStore: |
| """Thread-safe singleton that holds the latest pipeline run results.""" |
|
|
| def __init__(self): |
| self.has_real_data = False |
| self.mandis: list[dict] = [] |
| self.market_prices: list[dict] = [] |
| self.price_forecasts: list[dict] = [] |
| self.sell_recommendations: list[dict] = [] |
| self.price_conflicts: list[dict] = [] |
| self.pipeline_runs: list[dict] = [] |
| self.stats: dict[str, Any] = {} |
|
|
| |
| self.raw_inputs: dict = {} |
| self.extracted_data: dict = {} |
| self.reconciliation_results: dict = {} |
| self.model_metrics: dict = {} |
| self.recommendation_reasoning: list[dict] = [] |
| self.rag_retrievals: list[dict] = [] |
|
|
| self._lock = threading.Lock() |
|
|
| def update(self, run_result: dict): |
| """Update the store with results from a pipeline run. |
| |
| Args: |
| run_result: dict with keys matching the store attributes. |
| """ |
| with self._lock: |
| try: |
| self.mandis = run_result.get("mandis", []) |
| self.market_prices = run_result.get("market_prices", []) |
| self.price_forecasts = run_result.get("price_forecasts", []) |
| sell_recs = run_result.get("sell_recommendations", []) |
| if isinstance(sell_recs, dict): |
| sell_recs = [sell_recs] |
| self.sell_recommendations = sell_recs |
| self.price_conflicts = run_result.get("price_conflicts", []) |
|
|
| |
| self.raw_inputs = run_result.get("raw_inputs", {}) |
| self.extracted_data = run_result.get("extracted_data", {}) |
| self.reconciliation_results = run_result.get("reconciliation_results", {}) |
| self.model_metrics = run_result.get("model_metrics", {}) |
| self.recommendation_reasoning = run_result.get("recommendation_reasoning", []) |
| self.rag_retrievals = run_result.get("rag_retrievals", []) |
|
|
| run_info = run_result.get("run_info", {}) |
| if run_info: |
| self.pipeline_runs.insert(0, run_info) |
| self.pipeline_runs = self.pipeline_runs[:50] |
|
|
| self.stats = self._build_stats(run_result) |
| self.has_real_data = True |
|
|
| logger.info( |
| "Store updated: %d mandis, %d prices, %d forecasts", |
| len(self.mandis), |
| len(self.market_prices), |
| len(self.price_forecasts), |
| ) |
| except Exception: |
| logger.exception("Failed to update store from pipeline") |
|
|
| def _build_stats(self, run_result: dict) -> dict: |
| """Build aggregate stats from run result.""" |
| runs = self.pipeline_runs |
| total_runs = len(runs) |
| successful = sum(1 for r in runs if r.get("status") == "ok") |
| total_cost = sum(r.get("total_cost_usd", 0) for r in runs) |
|
|
| price_conflicts = run_result.get("price_conflicts", []) |
| unresolved = sum( |
| 1 for c in price_conflicts |
| if c.get("resolution") == "unresolved" |
| ) |
|
|
| return { |
| "total_runs": total_runs, |
| "successful_runs": successful, |
| "success_rate": round(successful / max(1, total_runs), 2), |
| "mandis_monitored": len(run_result.get("mandis", [])), |
| "commodities_tracked": len(set( |
| p.get("commodity_id") for p in run_result.get("market_prices", []) |
| )), |
| "price_conflicts_found": len(price_conflicts), |
| "unresolved_conflicts": unresolved, |
| "total_cost_usd": round(total_cost, 2), |
| "avg_cost_per_run_usd": round(total_cost / max(1, total_runs), 4), |
| "last_run": runs[0].get("started_at") if runs else None, |
| "data_sources": ["Agmarknet (data.gov.in)", "eNAM", "NASA POWER"], |
| } |
|
|
|
|
| |
| store = PipelineStore() |
|
|