Spaces:
Sleeping
Sleeping
| """ | |
| 🇬🇧 Module: metrics.py | |
| Purpose: Provides utilities to fetch and display portfolio metrics as a DataFrame. | |
| 🇷🇺 Модуль: metrics.py | |
| Назначение: предоставляет функции для получения и отображения метрик портфеля в виде DataFrame. | |
| """ | |
| import pandas as pd | |
| from infrastructure.cache import CacheUnavailableError | |
| from infrastructure.output_api import extract_portfolio_id, fetch_metrics_cached | |
def show_metrics_table(portfolio_input: str) -> pd.DataFrame:
    """Fetch portfolio metrics and return them as a DataFrame for Gradio.

    This function never raises for the failures handled below; each one is
    converted into a single-cell "Message" DataFrame so the UI always has
    something to render.

    Args:
        portfolio_input: Raw user input that is passed to
            ``extract_portfolio_id`` to obtain the portfolio id.

    Returns:
        The metrics table produced by ``_get_metrics_df`` on success.
        Otherwise a one-row message DataFrame describing the problem:
        an invalid id, a cache cool-down, or any other fetch error.
    """
    # Imported locally so the function stays self-contained.
    import logging

    pid = extract_portfolio_id(portfolio_input)
    if not pid:
        return _message_df("❌ Invalid portfolioId format.")

    try:
        return _get_metrics_df(pid)
    except CacheUnavailableError as e:
        # Truncate, then add one, so the displayed wait is never shorter
        # than the value carried by the exception.
        wait = int(e.retry_in) + 1
        return _message_df(f"⚠️ Metrics API cooling down. Retry in ~{wait} seconds.")
    except Exception:
        # UI boundary: keep the user-facing message generic, but record the
        # traceback so the failure can be diagnosed instead of being lost.
        logging.getLogger(__name__).exception(
            "Failed to fetch metrics for portfolio %s", pid
        )
        return _message_df("❌ Error fetching metrics. Please try again later.")
def _get_metrics_df(portfolio_id: str) -> pd.DataFrame:
    """Build a two-column metrics table for one portfolio.

    Calls ``fetch_metrics_cached`` with the given id and expects a
    mapping-like result (it is read through ``.items()``).

    Args:
        portfolio_id: Identifier forwarded unchanged to
            ``fetch_metrics_cached``.

    Returns:
        A DataFrame with columns ``Metric`` and ``Value``, one row per
        item of the fetched mapping.

    Raises:
        ValueError: If the fetch returns a falsy result (e.g. ``None`` or
            an empty mapping).
    """
    payload = fetch_metrics_cached(portfolio_id)
    if payload:
        rows = list(payload.items())
        return pd.DataFrame(rows, columns=["Metric", "Value"])
    raise ValueError("No metrics found for given portfolio.")
| def _message_df(message: str) -> pd.DataFrame: | |
| return pd.DataFrame({"Message": [message]}) | |