Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| """ | |
| FastAPI application for the Trace Environment. | |
| This module creates an HTTP server that exposes the TraceEnvironment | |
| over HTTP and WebSocket endpoints, compatible with EnvClient. | |
| Endpoints: | |
| - POST /reset: Reset the environment | |
| - POST /step: Execute an action | |
| - GET /state: Get current environment state | |
| - GET /schema: Get action/observation schemas | |
| - WS /ws: WebSocket endpoint for persistent sessions | |
| Usage: | |
| # Development (with auto-reload): | |
| uvicorn server.app:app --reload --host 0.0.0.0 --port 8000 | |
| # Production: | |
| uvicorn server.app:app --host 0.0.0.0 --port 8000 --workers 4 | |
| # Or run directly: | |
| python -m server.app | |
| """ | |
import html
import json
import logging
import os
import re
import sys
import time
from datetime import datetime, timedelta
from typing import Any, Optional
# The project root is the parent of the directory holding this file. It is put
# first on sys.path so internal imports such as `models` and `server.*` work no
# matter which working directory the server was launched from.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from openenv.core.env_server.http_server import create_app | |
| from models import TraceAction, TraceObservation | |
| from server.trace_environment import TraceEnvironment | |
# Build the FastAPI application around TraceEnvironment and its action /
# observation models (create_app is given max_concurrent_envs=1).
app = create_app(
    TraceEnvironment,
    TraceAction,
    TraceObservation,
    env_name="trace",
    max_concurrent_envs=1,
)

# Static assets live in <project root>/static; they are mounted under /static
# only when that directory actually exists.
_STATIC_DIR = os.path.join(_PROJECT_ROOT, "static")
if os.path.isdir(_STATIC_DIR):
    app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
async def root():
    """Serve ``static/index.html`` when present, else a small JSON status payload."""
    index_path = os.path.join(_STATIC_DIR, "index.html")
    if not os.path.exists(index_path):
        return JSONResponse({"status": "ok", "name": "Trace Environment", "version": "1.0"})
    return FileResponse(index_path)
async def dashboard(refresh: bool = False, prompt: Optional[str] = None):
    """
    Fetch financial transactions and render the live HTML dashboard.

    On a cache miss (first visit, expired entry, or ``?refresh=true``) this:

    1. Searches Gmail for financial emails (text-first pass, no image
       analysis) and parses them via ``transaction_parser``.
    2. Merges Google Sheets rows whose ``id`` is not already in the Gmail set
       (best-effort: a Sheets failure degrades to Gmail-only mode).
    3. If *prompt* is given, filters transactions by the date range, category
       and keywords extracted from it (Groq LLM when ``GROQ_API_KEY`` is set,
       otherwise a regex fallback parser).
    4. Summarizes spend and renders HTML via ``dashboard_renderer``.

    Rendered pages are cached per normalized prompt for
    ``_DASHBOARD_CACHE_TTL`` seconds (10 minutes) to avoid hammering the Gmail
    API. Expired entries are evicted on every request and at most
    ``_DASHBOARD_CACHE_MAX_ENTRIES`` prompts are kept, so arbitrary
    user-supplied prompts cannot grow the cache without bound.

    Args:
        refresh: Ignore any cached page and rebuild it.
        prompt: Optional free-text filter, e.g. "uber last 30 days". Blank or
            whitespace-only prompts are treated as no prompt.

    Returns:
        An ``HTMLResponse`` with the dashboard. If live generation fails, the
        last persisted unfiltered dashboard file is served when it exists,
        otherwise an HTML error page with status 500.
    """
    import anyio
    from fastapi.responses import HTMLResponse

    logger = _dashboard_logger

    # None, "" and "   " all mean "unfiltered".
    prompt = (prompt or "").strip() or None
    # "" can never collide with a real (non-blank) prompt, unlike "default".
    cache_key = prompt.lower() if prompt else ""
    cache_label = cache_key or "default"

    # -- In-memory cache, shared across requests via app.state -------------
    if not hasattr(app.state, "_dashboard_cache_dict"):
        app.state._dashboard_cache_dict = {}
    cache = app.state._dashboard_cache_dict
    _evict_expired_dashboards(cache, time.time())

    cached = cache.get(cache_key)
    if not refresh and cached and cached.get("html"):
        logger.info(f"[DASHBOARD] Serving cached dashboard for '{cache_label}'")
        return HTMLResponse(content=cached["html"])

    # -- Fetch and render live; blocking I/O runs off the event loop -------
    try:
        page = await anyio.to_thread.run_sync(_build_dashboard_html, prompt)
    except Exception as exc:
        logger.error(f"[DASHBOARD] Live generation failed: {exc}", exc_info=True)
        # Fallback: serve the last persisted (unfiltered) dashboard if present.
        dashboard_path = os.path.join(_PROJECT_ROOT, _PERSISTED_DASHBOARD_NAME)
        if os.path.exists(dashboard_path):
            return FileResponse(dashboard_path)
        return HTMLResponse(content=_render_error_page(exc), status_code=500)

    # -- Cache the result ---------------------------------------------------
    cache[cache_key] = {"html": page, "timestamp": time.time()}
    _trim_dashboard_cache(cache)
    return HTMLResponse(content=page)


# ---------------------------------------------------------------------------
# Private helpers for ``dashboard``. They are resolved at call time, so
# defining them after the handler is fine.
# ---------------------------------------------------------------------------

_dashboard_logger = logging.getLogger("dashboard")

_DASHBOARD_CACHE_TTL = 600  # seconds (10 minutes)
_DASHBOARD_CACHE_MAX_ENTRIES = 32  # distinct prompts kept in memory
_PERSISTED_DASHBOARD_NAME = "all_financial_dashboard.html"

_GMAIL_FINANCIAL_QUERY = (
    "newer_than:180d "
    "(receipt OR invoice OR payment OR transaction OR booking OR order "
    "OR trip OR ride OR bill OR statement OR subscription OR recharge "
    "OR GST OR tax invoice) "
    "-category:promotions -in:chats"
)

_GROQ_CHAT_URL = "https://api.groq.com/openai/v1/chat/completions"
# NOTE(review): confirm this model id is still served by Groq; the GROQ_MODEL
# environment variable overrides it without a code change.
_GROQ_DEFAULT_MODEL = "llama3-70b-8192"

_ISO_DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")
_BETWEEN_DATES_RE = re.compile(r"between\s+(\d{4}-\d{2}-\d{2})\s+and\s+(\d{4}-\d{2}-\d{2})")
_LAST_N_DAYS_RE = re.compile(r"last\s+(\d+)\s+days?")
# First number in a string, allowing thousands separators: "Rs. 1,234.50".
_AMOUNT_RE = re.compile(r"\d[\d,]*(?:\.\d+)?")

_PROMPT_STOP_WORDS = frozenset({
    "show", "me", "the", "financial", "records", "from", "dates", "kind",
    "of", "for", "in", "and", "provide", "record",
})
# Stripped from both ends of fallback keywords so "uber," matches "Uber".
_KEYWORD_EDGE_PUNCTUATION = ".,;:!?\"'()[]{}"


def _evict_expired_dashboards(cache: dict, now: float) -> None:
    """Drop cache entries that are ``_DASHBOARD_CACHE_TTL`` seconds old or older."""
    expired = [
        key for key, entry in cache.items()
        if now - entry.get("timestamp", 0) >= _DASHBOARD_CACHE_TTL
    ]
    for key in expired:
        cache.pop(key, None)


def _trim_dashboard_cache(cache: dict) -> None:
    """Evict the oldest entries until at most ``_DASHBOARD_CACHE_MAX_ENTRIES`` remain."""
    while len(cache) > _DASHBOARD_CACHE_MAX_ENTRIES:
        oldest = min(cache, key=lambda key: cache[key].get("timestamp", 0))
        cache.pop(oldest, None)


def _build_dashboard_html(prompt: Optional[str]) -> str:
    """
    Fetch, merge, filter, summarize and render the dashboard (blocking).

    Called from a worker thread by ``dashboard``. Only the unfiltered
    dashboard (no *prompt*) is persisted to disk, as the failure fallback.
    """
    from environments.trace_env.tools.dashboard_renderer import render_dashboard

    logger = _dashboard_logger
    logger.info(f"[DASHBOARD] Running _build_dashboard (Prompt: {prompt})")

    transactions = _fetch_gmail_transactions()
    transactions, sheet_url = _merge_sheets_transactions(transactions)
    if prompt:
        transactions = _filter_transactions_by_prompt(transactions, prompt)

    summary = _summarize_transactions(transactions)
    final_data = {
        "transactions": transactions,
        "summary": summary,
        "prompt": prompt or "",
    }
    if sheet_url:
        final_data["sheet_url"] = sheet_url
    # "\u20b9" is the Indian rupee sign, escaped so the source stays ASCII.
    logger.info(
        f"[DASHBOARD] Final: {summary['count']} transactions, "
        f"\u20b9{summary['total_spend']:,.2f} total spend"
    )

    page = render_dashboard(final_data)
    if not prompt:
        _persist_dashboard(page)
    return page


def _fetch_gmail_transactions() -> list:
    """Search Gmail for financial emails and return the parsed transactions."""
    from environments.trace_env.tools.gmail_tool import search_gmail_with_attachments
    from environments.trace_env.tools.transaction_parser import parse_transactions_bulk

    logger = _dashboard_logger
    query = _GMAIL_FINANCIAL_QUERY
    logger.info(f"[DASHBOARD] Fetching Gmail: {query[:80]}...")
    emails = search_gmail_with_attachments(
        query=query,
        max_results=50,
        analyse_images=False,  # skip slow VLM - text parsing is enough
    )
    logger.info(f"[DASHBOARD] Retrieved {len(emails)} emails from Gmail")

    transactions = parse_transactions_bulk(emails).get("transactions", [])
    logger.info(f"[DASHBOARD] Parsed {len(transactions)} Gmail transactions")
    return transactions


def _merge_sheets_transactions(gmail_transactions: list) -> tuple:
    """
    Append Google Sheets rows that are not already in *gmail_transactions*.

    Rows are deduplicated by ``id``; rows without an ``id`` are always kept.
    Best-effort: any Sheets failure is logged and whatever was merged so far
    is returned (Gmail-only mode when nothing was).

    Returns:
        ``(transactions, sheet_url)``; *sheet_url* is None when unavailable.
    """
    logger = _dashboard_logger
    merged = list(gmail_transactions)
    sheet_url = None
    try:
        from environments.trace_env.tools.sheets_tool import fetch_and_summarize

        logger.info("[DASHBOARD] Fetching Google Sheets historical data...")
        sheets_summary = fetch_and_summarize()
        if not sheets_summary or sheets_summary.get("count", 0) <= 0:
            logger.info("[DASHBOARD] No data from Google Sheets (empty or unavailable)")
            return merged, sheet_url

        sheets_txs = sheets_summary.get("transactions", [])
        sheet_url = sheets_summary.get("sheet_url")
        gmail_ids = {tx.get("id") for tx in gmail_transactions if tx.get("id")}
        sheets_only = [
            tx for tx in sheets_txs
            if not tx.get("id") or tx["id"] not in gmail_ids
        ]
        if sheets_only:
            from environments.trace_env.tools.transaction_parser import CATEGORY_CONFIG

            for tx in sheets_only:
                _fill_sheets_defaults(tx, CATEGORY_CONFIG)
            merged.extend(sheets_only)
        logger.info(
            f"[DASHBOARD] Sheets: {len(sheets_txs)} total, "
            f"{len(sheets_only)} new (not in Gmail). "
            f"Combined: {len(merged)} transactions"
        )
    except Exception as exc:  # deliberate best-effort: keep the Gmail data
        logger.warning(f"[DASHBOARD] Sheets fetch failed (Gmail-only mode): {exc}")
    return merged, sheet_url


def _fill_sheets_defaults(tx: dict, category_config: dict) -> None:
    """Give a Sheets row the fields the renderer expects, in place, never overwriting."""
    if "category_config" not in tx:
        category = tx.get("category", "unknown")
        tx["category_config"] = category_config.get(category, category_config["unknown"])
    tx.setdefault("subject", tx.get("notes", ""))
    tx.setdefault("from_email", "")
    tx.setdefault("vendor", "Unknown")
    tx.setdefault("amounts", [])
    tx.setdefault("dates", [])
    tx.setdefault("order_id", "")
    tx.setdefault("details", {})
    tx.setdefault("snippet", "")
    tx.setdefault("body_preview", "")
    tx.setdefault("image_analyses", [])
    tx.setdefault("doc_analyses", [])
    tx.setdefault("attachment_count", 0)
    tx.setdefault("reimbursable", False)
    tx.setdefault("payment_method", "Unknown")


def _filter_transactions_by_prompt(transactions: list, prompt: str) -> list:
    """Keep only transactions matching the constraints extracted from *prompt*."""
    logger = _dashboard_logger
    groq_parsed = _parse_prompt_with_groq(prompt)
    # Non-dict or empty JSON from the LLM is unusable: fall back to regex.
    if isinstance(groq_parsed, dict) and groq_parsed:
        logger.info(f"[GROQ] Parsed prompt: {groq_parsed}")
        start_date, end_date, category, keywords = _constraints_from_groq(groq_parsed)
    else:
        logger.info("[DASHBOARD] Using fallback regex parser")
        start_date, end_date, category, keywords = _constraints_from_text(prompt)

    filtered = [
        tx for tx in transactions
        if _transaction_matches(tx, start_date, end_date, category, keywords)
    ]
    logger.info(
        f"[DASHBOARD] Filtered from {len(transactions)} down to {len(filtered)} using prompt."
    )
    return filtered


def _parse_prompt_with_groq(prompt: str) -> Any:
    """
    Ask Groq to extract filter constraints from *prompt* as JSON.

    Returns the decoded JSON, which is expected (not guaranteed) to be a dict
    with ``start_date``, ``end_date``, ``category`` and ``keywords``. Returns
    None when ``GROQ_API_KEY`` is unset, ``requests`` is not installed, or the
    request/decoding fails. ``GROQ_MODEL`` overrides the model id.
    """
    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        return None

    system_prompt = (
        f"You are a data parsing assistant. The current date is {datetime.now().strftime('%Y-%m-%d')}. "
        "Extract the filtering constraints from the user prompt and return ONLY valid JSON with exactly "
        "these keys: 'start_date' (YYYY-MM-DD or null), 'end_date' (YYYY-MM-DD or null), "
        "'category' (string matching the intent or null), 'keywords' (list of strings for arbitrary text matching). "
        "Calculate relative dates correctly based on the current date."
    )
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    payload = {
        "model": os.environ.get("GROQ_MODEL", _GROQ_DEFAULT_MODEL),
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt},
        ],
        "response_format": {"type": "json_object"},
        "temperature": 0.0,
    }
    try:
        # Imported lazily inside the try: only needed when Groq is configured,
        # and a missing package must degrade to the regex parser, not crash.
        import requests

        response = requests.post(_GROQ_CHAT_URL, headers=headers, json=payload, timeout=10)
        response.raise_for_status()
        return json.loads(response.json()["choices"][0]["message"]["content"])
    except Exception as exc:
        _dashboard_logger.error(f"[GROQ] Failed to parse prompt: {exc}")
        return None


def _constraints_from_groq(parsed: dict) -> tuple:
    """
    Sanitize Groq's JSON into ``(start_date, end_date, category, keywords)``.

    LLM output is untrusted. Dates that do not start with YYYY-MM-DD (e.g. the
    string "null") are dropped, because they would otherwise compare
    lexicographically and filter out everything. A non-string or "null"
    category is ignored. A bare-string ``keywords`` becomes a single keyword
    instead of being iterated character by character.
    """
    def _iso_date(value: Any) -> Optional[str]:
        if isinstance(value, str) and _ISO_DATE_RE.match(value.strip()):
            return value.strip()[:10]
        return None

    category = parsed.get("category")
    if isinstance(category, str) and category.strip().lower() not in ("", "null", "none"):
        category = category.strip().lower()
    else:
        category = None

    raw_keywords = parsed.get("keywords")
    if isinstance(raw_keywords, str):
        raw_keywords = [raw_keywords]
    elif not isinstance(raw_keywords, (list, tuple)):
        raw_keywords = []
    keywords = [str(k).strip().lower() for k in raw_keywords if k and str(k).strip()]

    return _iso_date(parsed.get("start_date")), _iso_date(parsed.get("end_date")), category, keywords


def _constraints_from_text(prompt: str) -> tuple:
    """
    Regex fallback parser used when Groq is unavailable.

    Recognizes "between YYYY-MM-DD and YYYY-MM-DD", "last N days",
    "this month" and "last month", applied in that order so later matches
    override earlier ones. The remaining words, minus stop words and edge
    punctuation, become keywords. Never extracts a category.

    Returns:
        ``(start_date, end_date, None, keywords)`` with dates as YYYY-MM-DD.
    """
    text = prompt.lower()
    start_date = end_date = None
    now = datetime.now()

    between = _BETWEEN_DATES_RE.search(text)
    if between:
        start_date, end_date = between.group(1), between.group(2)
        text = text.replace(between.group(0), "")

    last_days = _LAST_N_DAYS_RE.search(text)
    if last_days:
        end_date = now.strftime("%Y-%m-%d")
        try:
            start_date = (now - timedelta(days=int(last_days.group(1)))).strftime("%Y-%m-%d")
        except (OverflowError, ValueError):
            # Absurd spans (e.g. "last 99999999 days") reach before year 1:
            # apply no lower bound instead of failing the whole dashboard.
            start_date = None
        text = text.replace(last_days.group(0), "")

    if "this month" in text:
        start_date = now.replace(day=1).strftime("%Y-%m-%d")
        end_date = now.strftime("%Y-%m-%d")
        text = text.replace("this month", "")

    if "last month" in text:
        last_day_prev_month = now.replace(day=1) - timedelta(days=1)
        start_date = last_day_prev_month.replace(day=1).strftime("%Y-%m-%d")
        end_date = last_day_prev_month.strftime("%Y-%m-%d")
        text = text.replace("last month", "")

    keywords = []
    for word in text.split():
        word = word.strip(_KEYWORD_EDGE_PUNCTUATION)
        if word and word not in _PROMPT_STOP_WORDS:
            keywords.append(word)
    return start_date, end_date, None, keywords


def _transaction_matches(tx: dict, start_date, end_date, category, keywords) -> bool:
    """
    Return True if *tx* satisfies every given (truthy) constraint.

    Date bounds compare the first 10 characters of ``tx["date"]`` as strings
    (assumes ISO-style YYYY-MM-DD dates). Transactions without a usable date
    are never excluded by date. *category* is a substring match on the
    lower-cased ``tx["category"]``. *keywords* match if ANY of them appears in
    the vendor / category / subject / notes / payment-method text.
    """
    date_text = str(tx.get("date") or "")
    ymd = date_text[:10] if len(date_text) >= 10 else ""
    if ymd and start_date and ymd < start_date:
        return False
    if ymd and end_date and ymd > end_date:
        return False
    if category and category not in str(tx.get("category", "")).lower():
        return False
    if keywords:
        blob = (
            f"{tx.get('vendor','')} {tx.get('category','')} {tx.get('subject','')} "
            f"{tx.get('notes','')} {tx.get('payment_method','')}"
        ).lower()
        if not any(k in blob for k in keywords):
            return False
    return True


def _parse_amount(value: Any) -> Optional[float]:
    """
    Extract the first number from a transaction ``total``; None if there is none.

    Thousands separators are allowed ("1,23,456.00" -> 123456.0), and so is a
    currency prefix such as "Rs. 500" -> 500.0. Stripping every character
    except digits and dots instead would keep the dot of "Rs." and yield 0.5.
    Any sign is ignored.
    """
    if isinstance(value, bool):
        return None
    match = _AMOUNT_RE.search(str(value))
    if match is None:
        return None
    return float(match.group().replace(",", ""))


def _summarize_transactions(transactions: list) -> dict:
    """Total positive spend overall, per category and per vendor (largest first)."""
    total_spend = 0.0
    by_category: dict = {}
    by_vendor: dict = {}
    for tx in transactions:
        raw_total = tx.get("total")
        amount = _parse_amount(raw_total) if raw_total else None
        if not amount or amount <= 0:
            continue
        total_spend += amount
        category = tx.get("category", "unknown")
        vendor = tx.get("vendor", "Unknown")
        by_category[category] = by_category.get(category, 0) + amount
        by_vendor[vendor] = by_vendor.get(vendor, 0) + amount

    def _ranked(totals: dict) -> dict:
        return {k: round(v, 2) for k, v in sorted(totals.items(), key=lambda kv: -kv[1])}

    return {
        "total_spend": round(total_spend, 2),
        "count": len(transactions),
        "by_category": _ranked(by_category),
        "by_vendor": _ranked(by_vendor),
    }


def _persist_dashboard(page: str) -> None:
    """
    Best-effort, atomic write of the unfiltered dashboard to the project root.

    ``dashboard`` serves this file when live generation fails. Writing to a
    temp file and renaming it prevents a crash mid-write from leaving a
    truncated fallback page behind.
    """
    path = os.path.join(_PROJECT_ROOT, _PERSISTED_DASHBOARD_NAME)
    tmp_path = path + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as fh:
            fh.write(page)
        os.replace(tmp_path, path)
    except Exception as exc:  # deliberate best-effort: never fail the request
        _dashboard_logger.warning(f"[DASHBOARD] Could not persist {path}: {exc}")


def _render_error_page(exc: BaseException) -> str:
    """HTML body for the 500 response; the exception text is HTML-escaped."""
    return (
        "<html><body style='font-family:monospace;padding:40px'>"
        "<h2>\u26a0\ufe0f Dashboard generation failed</h2>"
        f"<p>Error: {html.escape(str(exc))}</p>"
        "<p>Check that Gmail credentials (GMAIL_TOKEN_B64, GCP_CREDENTIALS_B64) "
        "are configured in Hugging Face Secrets.</p>"
        "</body></html>"
    )
async def health():
    """Liveness probe that always answers ``{"status": "ok"}``."""
    payload = {"status": "ok"}
    return JSONResponse(payload)
def main(host: str = "0.0.0.0", port: int = 8000):
    """
    Entry point for direct execution via ``uv run`` or ``python -m``.

    Runs the server without Docker::

        uv run --project . server
        uv run --project . server --port 8001
        python -m server.app --host 127.0.0.1 --port 8001

    ``--host`` / ``--port`` given on the command line take precedence over
    the *host* / *port* arguments. Unrecognized command-line arguments are
    ignored.

    Args:
        host: Default host address to bind to (default: "0.0.0.0").
        port: Default port number to listen on (default: 8000).

    For production deployments, consider using uvicorn directly with
    multiple workers::

        uvicorn server.app:app --workers 4
    """
    import argparse

    import uvicorn

    parser = argparse.ArgumentParser(description="Run the Trace Environment server.")
    parser.add_argument("--host", default=host, help="Host address to bind to.")
    parser.add_argument("--port", type=int, default=port, help="Port number to listen on.")
    # parse_known_args: tolerate extra arguments a launcher may pass through.
    args, _unknown = parser.parse_known_args()
    uvicorn.run(app, host=args.host, port=args.port)


if __name__ == "__main__":
    main()