# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.

"""
FastAPI application for the Trace Environment.

This module creates an HTTP server that exposes the TraceEnvironment
over HTTP and WebSocket endpoints, compatible with EnvClient.

Endpoints:
    - POST /reset: Reset the environment
    - POST /step: Execute an action
    - GET /state: Get current environment state
    - GET /schema: Get action/observation schemas
    - WS /ws: WebSocket endpoint for persistent sessions

Extra routes registered in this module:
    - GET /: static/index.html if present, otherwise a JSON status payload
    - GET /dashboard: live financial dashboard built from Gmail (+ Google Sheets)
    - GET /health: liveness check
    - /static: static assets (mounted only when the static/ directory exists)

Usage:
    # Development (with auto-reload):
    uvicorn server.app:app --reload --host 0.0.0.0 --port 8000

    # Production:
    uvicorn server.app:app --host 0.0.0.0 --port 8000 --workers 4

    # Or run directly:
    python -m server.app

Note: the /dashboard cache lives in process memory (``app.state``), so each
uvicorn worker process keeps its own copy.
"""

import html as html_lib
import json
import logging
import os
import re
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

# Ensure project root is on sys.path for internal imports
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)

import anyio.to_thread
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from openenv.core.env_server.http_server import create_app

from models import TraceAction, TraceObservation
from server.trace_environment import TraceEnvironment

_logger = logging.getLogger("dashboard")

# Create the app with web interface and README integration
app = create_app(
    TraceEnvironment,
    TraceAction,
    TraceObservation,
    env_name="trace",
    max_concurrent_envs=1,
)

# Serve static assets
_STATIC_DIR = os.path.join(_PROJECT_ROOT, "static")
if os.path.isdir(_STATIC_DIR):
    app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")

# Last rendered *unfiltered* dashboard. Written after each successful
# unfiltered build and served as a fallback when live generation fails.
_DASHBOARD_FILE = os.path.join(_PROJECT_ROOT, "all_financial_dashboard.html")
_DASHBOARD_CACHE_TTL = 600  # seconds (10 minutes)

# Gmail search used to find financial emails from the last 180 days,
# excluding promotions and chats.
_GMAIL_FINANCE_QUERY = (
    "newer_than:180d "
    "(receipt OR invoice OR payment OR transaction OR booking OR order "
    "OR trip OR ride OR bill OR statement OR subscription OR recharge "
    "OR GST OR tax invoice) "
    "-category:promotions -in:chats"
)

_GROQ_URL = "https://api.groq.com/openai/v1/chat/completions"
_GROQ_MODEL = "llama3-70b-8192"

# Words ignored when the fallback parser turns a prompt into keywords.
_PROMPT_STOP_WORDS = frozenset(
    {
        "show", "me", "the", "financial", "records", "from", "dates",
        "kind", "of", "for", "in", "and", "provide", "record",
    }
)

# First numeric token in an amount string, e.g. "Rs. 1,234.50" -> "1,234.50".
# A bare ".50" is accepted unless the dot directly follows a letter, so the
# "Rs.500" currency prefix yields 500 rather than 0.5.
_AMOUNT_RE = re.compile(r"\d[\d,]*(?:\.\d+)?|(?<![A-Za-z])\.\d+")


@app.get("/", include_in_schema=False)
async def root():
    """Serve static/index.html when present, else a small JSON status payload."""
    index = os.path.join(_STATIC_DIR, "index.html")
    if os.path.exists(index):
        return FileResponse(index)
    return JSONResponse({"status": "ok", "name": "Trace Environment", "version": "1.0"})


# ── Dashboard helpers ────────────────────────────────────────────────────────


def _fetch_gmail_transactions() -> list:
    """Search Gmail for financial emails and parse them into transaction dicts."""
    from environments.trace_env.tools.gmail_tool import search_gmail_with_attachments
    from environments.trace_env.tools.transaction_parser import parse_transactions_bulk

    _logger.info(f"[DASHBOARD] Fetching Gmail: {_GMAIL_FINANCE_QUERY[:80]}...")
    emails = search_gmail_with_attachments(
        query=_GMAIL_FINANCE_QUERY,
        max_results=50,
        analyse_images=False,  # skip slow VLM — text parsing is enough
    )
    _logger.info(f"[DASHBOARD] Retrieved {len(emails)} emails from Gmail")

    parsed = parse_transactions_bulk(emails)
    gmail_transactions = parsed.get("transactions", [])
    _logger.info(f"[DASHBOARD] Parsed {len(gmail_transactions)} Gmail transactions")
    return gmail_transactions


def _normalize_sheets_transaction(tx: dict, category_config: dict) -> None:
    """Fill in, in place, the fields the dashboard renderer reads on a Sheets row.

    Existing values are never overwritten.
    """
    if "category_config" not in tx:
        cat = tx.get("category", "unknown")
        tx["category_config"] = category_config.get(cat, category_config["unknown"])
    tx.setdefault("subject", tx.get("notes", ""))
    tx.setdefault("from_email", "")
    tx.setdefault("vendor", "Unknown")
    tx.setdefault("amounts", [])
    tx.setdefault("dates", [])
    tx.setdefault("order_id", "")
    tx.setdefault("details", {})
    tx.setdefault("snippet", "")
    tx.setdefault("body_preview", "")
    tx.setdefault("image_analyses", [])
    tx.setdefault("doc_analyses", [])
    tx.setdefault("attachment_count", 0)
    tx.setdefault("reimbursable", False)
    tx.setdefault("payment_method", "Unknown")


def _merge_sheets_transactions(gmail_transactions: list) -> tuple:
    """Append Google Sheets rows whose ``id`` is not already in the Gmail set.

    Best-effort: any failure is logged and the Gmail-only list is returned.

    Returns:
        ``(all_transactions, sheet_url)``. ``sheet_url`` is None unless
        Sheets reported a non-zero count.
    """
    all_transactions = list(gmail_transactions)  # start with Gmail
    sheet_url = None
    try:
        from environments.trace_env.tools.sheets_tool import fetch_and_summarize

        _logger.info("[DASHBOARD] Fetching Google Sheets historical data...")
        sheets_summary = fetch_and_summarize()
        if not (sheets_summary and sheets_summary.get("count", 0) > 0):
            _logger.info("[DASHBOARD] No data from Google Sheets (empty or unavailable)")
            return all_transactions, sheet_url

        sheets_txs = sheets_summary.get("transactions", [])
        sheet_url = sheets_summary.get("sheet_url")

        # Deduplicate: rows without an id are always kept.
        gmail_ids = {tx.get("id") for tx in gmail_transactions if tx.get("id")}
        sheets_only = [
            tx for tx in sheets_txs if not tx.get("id") or tx["id"] not in gmail_ids
        ]
        if sheets_only:
            from environments.trace_env.tools.transaction_parser import CATEGORY_CONFIG

            for tx in sheets_only:
                _normalize_sheets_transaction(tx, CATEGORY_CONFIG)
            all_transactions.extend(sheets_only)
            _logger.info(
                f"[DASHBOARD] Sheets: {len(sheets_txs)} total, "
                f"{len(sheets_only)} new (not in Gmail). "
                f"Combined: {len(all_transactions)} transactions"
            )
    except Exception as e:
        _logger.warning(f"[DASHBOARD] Sheets fetch failed (Gmail-only mode): {e}")
    return all_transactions, sheet_url


def _parse_prompt_with_groq(prompt: str) -> Optional[dict]:
    """Ask Groq to turn a free-text prompt into filter constraints.

    Returns:
        The model's JSON object, or None in any of these cases: GROQ_API_KEY
        is unset, ``requests`` is unavailable, the request or JSON decoding
        fails, or the model returns something other than a JSON object.
    """
    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        return None

    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    sys_prompt = (
        f"You are a data parsing assistant. The current date is {datetime.now().strftime('%Y-%m-%d')}. "
        "Extract the filtering constraints from the user prompt and return ONLY valid JSON with exactly "
        "these keys: 'start_date' (YYYY-MM-DD or null), 'end_date' (YYYY-MM-DD or null), "
        "'category' (string matching the intent or null), 'keywords' (list of strings for arbitrary text matching). "
        "Calculate relative dates correctly based on the current date."
    )
    try:
        # Imported lazily, and inside the try, so a missing package falls back
        # to the regex parser instead of failing the whole dashboard build.
        import requests

        res = requests.post(
            _GROQ_URL,
            headers=headers,
            json={
                "model": _GROQ_MODEL,
                "messages": [
                    {"role": "system", "content": sys_prompt},
                    {"role": "user", "content": prompt},
                ],
                "response_format": {"type": "json_object"},
                "temperature": 0.0,
            },
            timeout=10,
        )
        res.raise_for_status()
        parsed = json.loads(res.json()["choices"][0]["message"]["content"])
    except Exception as e:
        _logger.error(f"[GROQ] Failed to parse prompt: {e}")
        return None

    if not isinstance(parsed, dict):
        _logger.error(
            f"[GROQ] Failed to parse prompt: expected a JSON object, got {type(parsed).__name__}"
        )
        return None
    return parsed


def _filters_from_groq(parsed: dict) -> tuple:
    """Coerce Groq JSON into ``(start_date, end_date, category, keywords)``.

    Model output is untrusted. Non-string dates are dropped, ``null``
    keywords become an empty list, and a bare-string ``keywords`` value is
    treated as a single keyword rather than iterated character by character.
    """
    start_date = parsed.get("start_date")
    end_date = parsed.get("end_date")
    category = parsed.get("category")

    raw_keywords = parsed.get("keywords") or []
    if isinstance(raw_keywords, str):
        raw_keywords = [raw_keywords]
    elif not isinstance(raw_keywords, (list, tuple)):
        raw_keywords = []

    return (
        start_date if isinstance(start_date, str) and start_date else None,
        end_date if isinstance(end_date, str) and end_date else None,
        str(category).lower() if category else None,
        [str(k).lower() for k in raw_keywords if k],
    )


def _parse_prompt_fallback(prompt: str) -> tuple:
    """Regex prompt parser used when Groq is not available.

    Recognises "between YYYY-MM-DD and YYYY-MM-DD", "last N day(s)",
    "this month" and "last month". Each matched phrase is removed from the
    text. When several phrases match, the one checked last (in that order)
    sets the dates. The remaining words, minus stop words, become keywords.

    Returns:
        ``(start_date, end_date, None, keywords)``. Dates are "YYYY-MM-DD"
        strings or None; category is never inferred here.
    """
    text = prompt.lower()
    now = datetime.now()
    start_date = end_date = None

    between = re.search(r"between\s+(\d{4}-\d{2}-\d{2})\s+and\s+(\d{4}-\d{2}-\d{2})", text)
    if between:
        start_date, end_date = between.group(1), between.group(2)
        text = text.replace(between.group(0), "")

    last_days = re.search(r"last\s+(\d+)\s+days?", text)
    if last_days:
        try:
            start = now - timedelta(days=int(last_days.group(1)))
        except OverflowError:  # absurdly large N: clamp to the earliest date
            start = datetime.min
        end_date = now.strftime("%Y-%m-%d")
        start_date = start.strftime("%Y-%m-%d")
        text = text.replace(last_days.group(0), "")

    if "this month" in text:
        start_date = now.replace(day=1).strftime("%Y-%m-%d")
        end_date = now.strftime("%Y-%m-%d")
        text = text.replace("this month", "")

    if "last month" in text:
        last_of_prev_month = now.replace(day=1) - timedelta(days=1)
        start_date = last_of_prev_month.replace(day=1).strftime("%Y-%m-%d")
        end_date = last_of_prev_month.strftime("%Y-%m-%d")
        text = text.replace("last month", "")

    keywords = [w for w in text.split() if w not in _PROMPT_STOP_WORDS]
    return start_date, end_date, None, keywords


def _filter_transactions(transactions, start_date, end_date, category, keywords) -> list:
    """Keep transactions matching the date range, category substring and keywords.

    Transactions without a usable date (fewer than 10 characters) are never
    excluded by the date bounds. Dates are compared as "YYYY-MM-DD" strings.
    A transaction passes the keyword check if any keyword appears in its
    vendor/category/subject/notes/payment_method text.
    """
    kept = []
    for t in transactions:
        t_date = str(t.get("date") or "")
        t_date_ymd = t_date[:10] if len(t_date) >= 10 else ""
        if t_date_ymd and start_date and t_date_ymd < start_date:
            continue
        if t_date_ymd and end_date and t_date_ymd > end_date:
            continue
        if category and category not in str(t.get("category", "")).lower():
            continue
        if keywords:
            text_blob = (
                f"{t.get('vendor','')} {t.get('category','')} {t.get('subject','')} "
                f"{t.get('notes','')} {t.get('payment_method','')}"
            ).lower()
            if not any(k in text_blob for k in keywords):
                continue
        kept.append(t)
    return kept


def _apply_prompt_filter(transactions: list, prompt: str) -> list:
    """Parse *prompt* (Groq first, regex fallback) and filter *transactions*."""
    groq_parsed = _parse_prompt_with_groq(prompt)
    if groq_parsed:
        _logger.info(f"[GROQ] Parsed prompt: {groq_parsed}")
        filters = _filters_from_groq(groq_parsed)
    else:
        _logger.info("[DASHBOARD] Using fallback regex parser")
        filters = _parse_prompt_fallback(prompt)

    filtered = _filter_transactions(transactions, *filters)
    _logger.info(
        f"[DASHBOARD] Filtered from {len(transactions)} down to {len(filtered)} using prompt."
    )
    return filtered


def _parse_amount(value) -> float:
    """Return the first number in *value* with thousands separators removed, else 0.0.

    A leading minus sign is not captured, so "-500" parses as 500.0.
    """
    match = _AMOUNT_RE.search(str(value))
    return float(match.group(0).replace(",", "")) if match else 0.0


def _summarize_transactions(transactions: list) -> dict:
    """Total spend, plus spend per category and per vendor (largest first).

    ``count`` counts every transaction. Totals include only transactions
    whose ``total`` parses to a positive amount.
    """
    total_spend = 0.0
    by_category: dict = {}
    by_vendor: dict = {}
    for t in transactions:
        total = t.get("total")
        if not total:
            continue
        amount = _parse_amount(total)
        if amount <= 0:
            continue
        total_spend += amount
        cat = t.get("category", "unknown")
        vendor = t.get("vendor", "Unknown")
        by_category[cat] = by_category.get(cat, 0) + amount
        by_vendor[vendor] = by_vendor.get(vendor, 0) + amount

    return {
        "total_spend": round(total_spend, 2),
        "count": len(transactions),
        "by_category": {
            k: round(v, 2) for k, v in sorted(by_category.items(), key=lambda kv: -kv[1])
        },
        "by_vendor": {
            k: round(v, 2) for k, v in sorted(by_vendor.items(), key=lambda kv: -kv[1])
        },
    }


def _build_dashboard(prompt: Optional[str]) -> str:
    """Fetch, merge, optionally filter, and render the dashboard HTML.

    Blocking; the /dashboard handler runs it in a worker thread. Without a
    prompt, the rendered page is also saved to ``_DASHBOARD_FILE``.
    """
    from environments.trace_env.tools.dashboard_renderer import render_dashboard

    _logger.info(f"[DASHBOARD] Running _build_dashboard (Prompt: {prompt})")

    gmail_transactions = _fetch_gmail_transactions()
    all_transactions, sheet_url = _merge_sheets_transactions(gmail_transactions)
    if prompt:
        all_transactions = _apply_prompt_filter(all_transactions, prompt)

    summary = _summarize_transactions(all_transactions)
    final_data = {
        "transactions": all_transactions,
        "summary": summary,
        "prompt": prompt or "",
    }
    if sheet_url:
        final_data["sheet_url"] = sheet_url

    _logger.info(
        f"[DASHBOARD] Final: {summary['count']} transactions, "
        f"₹{summary['total_spend']:,.2f} total spend"
    )

    page_html = render_dashboard(final_data)

    # Persist the unfiltered dashboard for offline use / as the failure fallback.
    if not prompt:
        try:
            Path(_DASHBOARD_FILE).write_text(page_html, encoding="utf-8")
        except Exception as e:  # best-effort: a failed save must not fail the request
            _logger.warning(f"[DASHBOARD] Could not save {_DASHBOARD_FILE}: {e}")
    return page_html


def _render_error_page(error: Exception) -> str:
    """Build the minimal HTML page returned when generation fails and no saved file exists.

    The exception text is HTML-escaped because it may contain arbitrary text.
    """
    return (
        "<html><body style='font-family:sans-serif;padding:2rem'>"
        "<h2>⚠️ Dashboard generation failed</h2>"
        f"<p><code>Error: {html_lib.escape(str(error))}</code></p>"
        "<p>Check that Gmail credentials (GMAIL_TOKEN_B64, GCP_CREDENTIALS_B64) "
        "are configured in Hugging Face Secrets.</p>"
        "</body></html>"
    )


def _dashboard_cache() -> dict:
    """Return the per-process dashboard cache stored on ``app.state``."""
    if not hasattr(app.state, "_dashboard_cache_dict"):
        app.state._dashboard_cache_dict = {}
    return app.state._dashboard_cache_dict


def _dashboard_cache_key(prompt: Optional[str]) -> str:
    """Return the cache key for *prompt*.

    Prompts are stripped, lower-cased and prefixed with "prompt:", so a
    literal prompt of "default" cannot collide with the unfiltered dashboard.
    """
    return f"prompt:{prompt.strip().lower()}" if prompt else "default"


@app.get("/dashboard", include_in_schema=False)
async def dashboard(refresh: bool = False, prompt: Optional[str] = None):
    """
    Dynamically fetch Gmail transactions and render the live dashboard.

    On first visit (or when cache expires / ?refresh=true), this endpoint:
      1. Searches Gmail for financial emails (text-first pass)
      2. Parses all transactions via transaction_parser
      3. Merges Google Sheets rows not already seen in Gmail (best-effort)
      4. If ``prompt`` is given, filters by it (Groq, else a regex fallback)
      5. Renders a full HTML dashboard via dashboard_renderer

    Results are cached per prompt for 10 minutes to avoid hammering the Gmail
    API. If generation fails, the last saved unfiltered dashboard file is
    served when it exists; otherwise an HTML error page with status 500.
    """
    cache = _dashboard_cache()
    cache_key = _dashboard_cache_key(prompt)
    entry = cache.get(cache_key)
    if (
        not refresh
        and entry
        and entry.get("html")
        and time.time() - entry.get("timestamp", 0) < _DASHBOARD_CACHE_TTL
    ):
        _logger.info(f"[DASHBOARD] Serving cached dashboard for '{cache_key}'")
        return HTMLResponse(content=entry["html"])

    try:
        # The build runs synchronous helpers (including a blocking `requests`
        # call), so keep it off the event loop.
        page_html = await anyio.to_thread.run_sync(_build_dashboard, prompt)
    except Exception as e:
        _logger.error(f"[DASHBOARD] Live generation failed: {e}", exc_info=True)
        if os.path.exists(_DASHBOARD_FILE):
            return FileResponse(_DASHBOARD_FILE)
        return HTMLResponse(content=_render_error_page(e), status_code=500)

    # Evict expired entries so one-off prompts don't accumulate indefinitely.
    now = time.time()
    expired = [k for k, v in cache.items() if now - v.get("timestamp", 0) >= _DASHBOARD_CACHE_TTL]
    for key in expired:
        del cache[key]
    cache[cache_key] = {"html": page_html, "timestamp": now}
    return HTMLResponse(content=page_html)


@app.get("/health", include_in_schema=False)
async def health():
    """Liveness check."""
    return JSONResponse({"status": "ok"})


def main(host: str = "0.0.0.0", port: int = 8000):
    """
    Entry point for direct execution via uv run or python -m.

    This function enables running the server without Docker:
        uv run --project . server
        uv run --project . server --port 8001
        python -m server.app

    Args:
        host: Host address to bind to (default: "0.0.0.0")
        port: Port number to listen on (default: 8000)

    For production deployments, consider using uvicorn directly with
    multiple workers:
        uvicorn server.app:app --workers 4
    """
    import uvicorn

    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    main()