trace / server /app.py
Ayush
Update
4156f51
Raw
History Blame Contribute Delete
17.7 kB
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
"""
FastAPI application for the Trace Environment.
This module creates an HTTP server that exposes the TraceEnvironment
over HTTP and WebSocket endpoints, compatible with EnvClient.
Endpoints:
- POST /reset: Reset the environment
- POST /step: Execute an action
- GET /state: Get current environment state
- GET /schema: Get action/observation schemas
- WS /ws: WebSocket endpoint for persistent sessions
Usage:
# Development (with auto-reload):
uvicorn server.app:app --reload --host 0.0.0.0 --port 8000
# Production:
uvicorn server.app:app --host 0.0.0.0 --port 8000 --workers 4
# Or run directly:
python -m server.app
"""
import os
import sys
# Put the directory above this file's folder at the front of sys.path so
# that the absolute imports further down resolve regardless of the CWD.
_THIS_DIR = os.path.dirname(__file__)
_PROJECT_ROOT = os.path.abspath(os.path.join(_THIS_DIR, os.pardir))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from openenv.core.env_server.http_server import create_app
from models import TraceAction, TraceObservation
from server.trace_environment import TraceEnvironment
# Build the FastAPI application around the Trace environment class and its
# action/observation models; a single concurrent environment is allowed.
app = create_app(
    TraceEnvironment, TraceAction, TraceObservation,
    env_name="trace",
    max_concurrent_envs=1,
)
# Expose <project root>/static under /static, but only when that directory
# actually exists on disk.
_STATIC_DIR = os.path.join(_PROJECT_ROOT, "static")
if os.path.isdir(_STATIC_DIR):
    app.mount(
        "/static",
        StaticFiles(directory=_STATIC_DIR),
        name="static",
    )
@app.get("/", include_in_schema=False)
async def root():
    """Serve ``static/index.html`` when present, otherwise a JSON status payload."""
    landing_page = os.path.join(_STATIC_DIR, "index.html")
    if not os.path.exists(landing_page):
        # No landing page on disk: answer with a small status document instead
        return JSONResponse({"status": "ok", "name": "Trace Environment", "version": "1.0"})
    return FileResponse(landing_page)
@app.get("/dashboard", include_in_schema=False)
async def dashboard(refresh: bool = False, prompt: str = None):
    """
    Build (or serve from cache) the live financial dashboard as HTML.

    On a cache miss, an expired cache entry, or ``?refresh=true`` this endpoint:
    1. Searches Gmail for financial emails (text-only pass, image analysis off)
    2. Parses the emails via ``parse_transactions_bulk``
    3. Merges in Google Sheets rows whose ``id`` is not already in the Gmail set
       (a Sheets failure is logged and the build continues Gmail-only)
    4. Filters by ``prompt`` when given (Groq LLM parse, regex fallback)
    5. Recomputes the spend summary and renders HTML via ``render_dashboard``

    Rendered pages are cached in ``app.state`` for 10 minutes, one entry per
    normalised prompt; the cache is pruned and size-capped on every write.

    Args:
        refresh: When true, bypass the cache and rebuild.
        prompt: Optional free-text filter (e.g. "last 30 days uber"). A blank
            or whitespace-only value is treated as no prompt.

    Returns:
        ``HTMLResponse`` with the dashboard. If the build raises, the last
        persisted ``all_financial_dashboard.html`` is served when it exists,
        otherwise an HTML error page with status 500.
    """
    import time as _time
    import logging as _logging
    from html import escape as _escape_html

    import anyio
    from fastapi.responses import HTMLResponse

    _logger = _logging.getLogger("dashboard")

    # A whitespace-only prompt carries no filter; normalise it to "no prompt"
    # so it shares the default cache entry and skips the prompt-parsing step.
    prompt = (prompt or "").strip() or None

    # ── In-memory cache (module-level via app.state) ─────────────────────
    if not hasattr(app.state, "_dashboard_cache_dict"):
        app.state._dashboard_cache_dict = {}
    cache = app.state._dashboard_cache_dict
    cache_ttl = 600  # 10 minutes
    cache_max_entries = 32  # each entry holds a full HTML page; keep it bounded
    # Prefix prompt keys so a prompt that literally says "default" cannot
    # collide with the no-prompt entry.
    cache_key = f"prompt:{prompt.lower()}" if prompt else "default"

    cached = cache.get(cache_key)
    if (
        not refresh
        and cached
        and cached.get("html")
        and (_time.time() - cached.get("timestamp", 0)) < cache_ttl
    ):
        _logger.info(f"[DASHBOARD] Serving cached dashboard for '{cache_key}'")
        return HTMLResponse(content=cached["html"])

    # ── Fetch and render live ────────────────────────────────────────────
    def _build_dashboard() -> str:
        """Fetch, merge, filter and render; runs in a worker thread (blocking I/O)."""
        import re
        from datetime import datetime, timedelta

        from environments.trace_env.tools.gmail_tool import search_gmail_with_attachments
        from environments.trace_env.tools.transaction_parser import parse_transactions_bulk
        from environments.trace_env.tools.dashboard_renderer import render_dashboard

        _logger.info(f"[DASHBOARD] Running _build_dashboard (Prompt: {prompt})")

        # Step 1: Search Gmail for financial emails
        query = (
            "newer_than:180d "
            "(receipt OR invoice OR payment OR transaction OR booking OR order "
            "OR trip OR ride OR bill OR statement OR subscription OR recharge "
            "OR GST OR tax invoice) "
            "-category:promotions -in:chats"
        )
        _logger.info(f"[DASHBOARD] Fetching Gmail: {query[:80]}...")
        emails = search_gmail_with_attachments(
            query=query,
            max_results=50,
            analyse_images=False,  # skip slow VLM — text parsing is enough
        )
        _logger.info(f"[DASHBOARD] Retrieved {len(emails)} emails from Gmail")

        # Step 2: Parse Gmail transactions
        parsed = parse_transactions_bulk(emails)
        gmail_transactions = parsed.get("transactions", [])
        gmail_count = len(gmail_transactions)
        _logger.info(f"[DASHBOARD] Parsed {gmail_count} Gmail transactions")

        # Step 3: Fetch Google Sheets historical data and merge
        sheet_url = None
        all_transactions = list(gmail_transactions)  # start with Gmail
        try:
            from environments.trace_env.tools.sheets_tool import fetch_and_summarize

            _logger.info("[DASHBOARD] Fetching Google Sheets historical data...")
            sheets_summary = fetch_and_summarize()
            if sheets_summary and sheets_summary.get("count", 0) > 0:
                sheets_txs = sheets_summary.get("transactions", [])
                sheet_url = sheets_summary.get("sheet_url")
                # Deduplicate: only add Sheets rows not already in Gmail set
                gmail_ids = {tx.get("id") for tx in gmail_transactions if tx.get("id")}
                sheets_only = [
                    tx for tx in sheets_txs
                    if not tx.get("id") or tx["id"] not in gmail_ids
                ]
                if sheets_only:
                    # Ensure Sheets transactions have category_config for renderer
                    from environments.trace_env.tools.transaction_parser import CATEGORY_CONFIG

                    for tx in sheets_only:
                        if "category_config" not in tx:
                            cat = tx.get("category", "unknown")
                            tx["category_config"] = CATEGORY_CONFIG.get(cat, CATEGORY_CONFIG["unknown"])
                        # Fill in fields the renderer expects
                        tx.setdefault("subject", tx.get("notes", ""))
                        tx.setdefault("from_email", "")
                        tx.setdefault("vendor", "Unknown")
                        tx.setdefault("amounts", [])
                        tx.setdefault("dates", [])
                        tx.setdefault("order_id", "")
                        tx.setdefault("details", {})
                        tx.setdefault("snippet", "")
                        tx.setdefault("body_preview", "")
                        tx.setdefault("image_analyses", [])
                        tx.setdefault("doc_analyses", [])
                        tx.setdefault("attachment_count", 0)
                        tx.setdefault("reimbursable", False)
                        tx.setdefault("payment_method", "Unknown")
                    all_transactions.extend(sheets_only)
                _logger.info(
                    f"[DASHBOARD] Sheets: {len(sheets_txs)} total, "
                    f"{len(sheets_only)} new (not in Gmail). "
                    f"Combined: {len(all_transactions)} transactions"
                )
            else:
                _logger.info("[DASHBOARD] No data from Google Sheets (empty or unavailable)")
        except Exception as e:
            # Sheets is an optional enrichment; never let it fail the dashboard
            _logger.warning(f"[DASHBOARD] Sheets fetch failed (Gmail-only mode): {e}")

        # Step 4: Apply prompt filter if any
        if prompt:
            def parse_prompt_with_groq(p: str):
                """Ask Groq to turn the prompt into filter constraints; None on any failure."""
                import json
                import requests

                api_key = os.environ.get("GROQ_API_KEY")
                if not api_key:
                    return None
                headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
                sys_prompt = (
                    f"You are a data parsing assistant. The current date is {datetime.now().strftime('%Y-%m-%d')}. "
                    "Extract the filtering constraints from the user prompt and return ONLY valid JSON with exactly "
                    "these keys: 'start_date' (YYYY-MM-DD or null), 'end_date' (YYYY-MM-DD or null), "
                    "'category' (string matching the intent or null), 'keywords' (list of strings for arbitrary text matching). "
                    "Calculate relative dates correctly based on the current date."
                )
                try:
                    res = requests.post("https://api.groq.com/openai/v1/chat/completions", headers=headers, json={
                        # NOTE(review): confirm this model id is still served by Groq
                        "model": "llama3-70b-8192",
                        "messages": [
                            {"role": "system", "content": sys_prompt},
                            {"role": "user", "content": p}
                        ],
                        "response_format": {"type": "json_object"},
                        "temperature": 0.0
                    }, timeout=10)
                    res.raise_for_status()
                    result = json.loads(res.json()["choices"][0]["message"]["content"])
                except Exception as e:
                    _logger.error(f"[GROQ] Failed to parse prompt: {e}")
                    return None
                # The model is asked for a JSON object; anything else is unusable
                if not isinstance(result, dict):
                    _logger.error(
                        f"[GROQ] Failed to parse prompt: expected a JSON object, got {type(result).__name__}"
                    )
                    return None
                return result

            groq_parsed = parse_prompt_with_groq(prompt)
            start_date = None
            end_date = None
            kws = []
            req_category = None

            if groq_parsed:
                _logger.info(f"[GROQ] Parsed prompt: {groq_parsed}")
                # LLM output is untrusted: accept only non-empty strings for
                # the scalar fields and only string items for the keyword list.
                raw_start = groq_parsed.get("start_date")
                raw_end = groq_parsed.get("end_date")
                start_date = raw_start if isinstance(raw_start, str) and raw_start else None
                end_date = raw_end if isinstance(raw_end, str) and raw_end else None

                raw_category = groq_parsed.get("category")
                if isinstance(raw_category, str) and raw_category.strip():
                    req_category = raw_category.strip().lower()

                raw_keywords = groq_parsed.get("keywords") or []
                if isinstance(raw_keywords, str):
                    raw_keywords = [raw_keywords]
                elif not isinstance(raw_keywords, (list, tuple)):
                    raw_keywords = []
                kws = [k.strip().lower() for k in raw_keywords if isinstance(k, str) and k.strip()]
            else:
                _logger.info("[DASHBOARD] Using fallback regex parser")
                p = prompt.lower()
                now = datetime.now()

                # Date range: "between 2023-01-01 and 2023-01-31"
                between_match = re.search(r'between\s+(\d{4}-\d{2}-\d{2})\s+and\s+(\d{4}-\d{2}-\d{2})', p)
                if between_match:
                    start_date = between_match.group(1)
                    end_date = between_match.group(2)
                    p = p.replace(between_match.group(0), "")

                # "last X days"
                last_days_match = re.search(r'last\s+(\d+)\s+days?', p)
                if last_days_match:
                    days = int(last_days_match.group(1))
                    end_date = now.strftime("%Y-%m-%d")
                    try:
                        start_date = (now - timedelta(days=days)).strftime("%Y-%m-%d")
                    except OverflowError:
                        # Absurdly large N: there is effectively no lower bound
                        start_date = None
                    p = p.replace(last_days_match.group(0), "")

                # "this month"
                if "this month" in p:
                    start_date = now.replace(day=1).strftime("%Y-%m-%d")
                    end_date = now.strftime("%Y-%m-%d")
                    p = p.replace("this month", "")

                # "last month"
                if "last month" in p:
                    first_of_this_month = now.replace(day=1)
                    last_month = first_of_this_month - timedelta(days=1)
                    start_date = last_month.replace(day=1).strftime("%Y-%m-%d")
                    end_date = last_month.strftime("%Y-%m-%d")
                    p = p.replace("last month", "")

                # Extract remaining keywords
                stop_words = {'show', 'me', 'the', 'financial', 'records', 'from', 'dates', 'kind', 'of', 'for', 'in', 'and', 'provide', 'record'}
                kws = [w for w in p.split() if w not in stop_words]

            def _matches(t) -> bool:
                """Return True when transaction ``t`` satisfies every active constraint."""
                t_date = str(t.get('date') or "")
                t_date_ymd = t_date[:10] if len(t_date) >= 10 else ""
                # Date bounds apply only to rows that carry a date; undated
                # rows are kept. Comparison is lexicographic on YYYY-MM-DD.
                if t_date_ymd:
                    if start_date and t_date_ymd < start_date:
                        return False
                    if end_date and t_date_ymd > end_date:
                        return False
                # Category is a substring match against the row's category
                if req_category and req_category not in str(t.get('category', '')).lower():
                    return False
                # Keywords: any one keyword appearing in the text blob is enough
                if kws:
                    text_blob = f"{t.get('vendor','')} {t.get('category','')} {t.get('subject','')} {t.get('notes','')} {t.get('payment_method','')}".lower()
                    if not any(k in text_blob for k in kws):
                        return False
                return True

            filtered = [t for t in all_transactions if _matches(t)]
            _logger.info(f"[DASHBOARD] Filtered from {len(all_transactions)} down to {len(filtered)} using prompt.")
            all_transactions = filtered

        # Step 5: Rebuild combined summary
        total_spend = 0.0
        by_category = {}
        by_vendor = {}
        for t in all_transactions:
            total_str = t.get("total")
            if not total_str:
                continue
            # Keep digits and dots only (drops currency symbols, commas, signs)
            amount_str = re.sub(r'[^\d.]', '', str(total_str))
            try:
                amount = float(amount_str) if amount_str else 0.0
            except ValueError:
                # e.g. "1.2.3" after stripping; such rows do not count toward totals
                continue
            if amount <= 0:
                continue
            total_spend += amount
            cat = t.get("category", "unknown")
            vendor = t.get("vendor", "Unknown")
            by_category[cat] = by_category.get(cat, 0) + amount
            by_vendor[vendor] = by_vendor.get(vendor, 0) + amount

        combined_summary = {
            "total_spend": round(total_spend, 2),
            "count": len(all_transactions),
            "by_category": {k: round(v, 2) for k, v in sorted(by_category.items(), key=lambda x: -x[1])},
            "by_vendor": {k: round(v, 2) for k, v in sorted(by_vendor.items(), key=lambda x: -x[1])},
        }
        final_data = {
            "transactions": all_transactions,
            "summary": combined_summary,
            "prompt": prompt or ""
        }
        if sheet_url:
            final_data["sheet_url"] = sheet_url
        _logger.info(
            f"[DASHBOARD] Final: {combined_summary['count']} transactions, "
            f"₹{combined_summary['total_spend']:,.2f} total spend"
        )

        # Step 6: Render HTML dashboard
        html = render_dashboard(final_data)

        # Step 7: Also persist the file for offline use (only if no prompt)
        if not prompt:
            try:
                from pathlib import Path

                Path(os.path.join(_PROJECT_ROOT, "all_financial_dashboard.html")).write_text(
                    html, encoding="utf-8"
                )
            except Exception as e:
                # Best-effort snapshot: report the failure but still serve the page
                _logger.warning(f"[DASHBOARD] Could not persist dashboard snapshot: {e}")
        return html

    try:
        # The build does blocking network/file I/O, so keep it off the event loop
        html = await anyio.to_thread.run_sync(_build_dashboard)
    except Exception as e:
        _logger.error(f"[DASHBOARD] Live generation failed: {e}", exc_info=True)
        # Fallback: serve the static file if it exists
        dashboard_path = os.path.join(_PROJECT_ROOT, "all_financial_dashboard.html")
        if os.path.exists(dashboard_path):
            return FileResponse(dashboard_path)
        return HTMLResponse(
            content=(
                f"<html><body style='font-family:monospace;padding:40px'>"
                f"<h2>⚠️ Dashboard generation failed</h2>"
                # Exception text may contain markup-significant characters; escape it
                f"<p>Error: {_escape_html(str(e))}</p>"
                f"<p>Check that Gmail credentials (GMAIL_TOKEN_B64, GCP_CREDENTIALS_B64) "
                f"are configured in Hugging Face Secrets.</p>"
                f"</body></html>"
            ),
            status_code=500,
        )

    # ── Cache the result ─────────────────────────────────────────────────
    cache = app.state._dashboard_cache_dict
    stored_at = _time.time()
    # Drop expired pages, then evict oldest-first until there is room, so
    # arbitrary user prompts cannot grow the cache without bound.
    expired_keys = [
        key for key, entry in cache.items()
        if stored_at - entry.get("timestamp", 0) >= cache_ttl
    ]
    for key in expired_keys:
        cache.pop(key, None)
    cache.pop(cache_key, None)
    while len(cache) >= cache_max_entries:
        oldest_key = min(cache, key=lambda key: cache[key].get("timestamp", 0))
        cache.pop(oldest_key, None)
    cache[cache_key] = {
        "html": html,
        "timestamp": stored_at,
    }
    return HTMLResponse(content=html)
@app.get("/health", include_in_schema=False)
async def health():
    """Return a fixed JSON body showing that the server is answering requests."""
    payload = {"status": "ok"}
    return JSONResponse(payload)
def main(host: str = "0.0.0.0", port: int = 8000):
    """
    Start the application under uvicorn in the current process.

    Lets the server run without Docker, for example:
        uv run --project . server
        python -m server.app

    Args:
        host: Host address to bind to (default: "0.0.0.0")
        port: Port number to listen on (default: 8000)

    The bind address comes only from these arguments; this function does not
    read command-line flags itself. For production deployments with multiple
    workers, invoke uvicorn directly:
        uvicorn server.app:app --workers 4
    """
    from uvicorn import run as serve

    serve(app, host=host, port=port)


if __name__ == "__main__":
    main()