from __future__ import annotations

import html
import json
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from fastapi import FastAPI
from fastapi.responses import HTMLResponse, ORJSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

# `settings` and `load_feedback` are not referenced in the visible part of
# this module; the imports are kept because other code may rely on them.
from .config import settings
from .data_loader import load_feedback
from .sql_service import SQLFeedbackService

# FastAPI application for Feedback Analysis using SQL-based approach
app = FastAPI(
    title="Feedback Analysis Agent",
    version="2.0.0",
    description="SQL-based feedback analysis system using LLM-generated queries",
    default_response_class=ORJSONResponse,
)

# Try to build the SQL service at import time, but tolerate failure so the app
# can still start; `query_sql` retries the construction on demand.
sql_svc: Optional[SQLFeedbackService] = None
try:
    sql_svc = SQLFeedbackService()
    print("SQL service initialized successfully", flush=True)
except Exception as e:
    print(f"Warning: Could not initialize SQL service: {e}", flush=True)

# Simple in-memory history persisted best-effort to `.query_history.json`
history_file = Path(".query_history.json")
history: List[Dict[str, Any]] = []
if history_file.exists():
    try:
        with history_file.open("r", encoding="utf-8") as f:
            _loaded_history = json.load(f)
        # Only a JSON array is usable: the endpoints call `history.append`,
        # which would fail on any other top-level JSON value.
        history = _loaded_history if isinstance(_loaded_history, list) else []
    except Exception:
        history = []


def save_history() -> None:
    """Write the in-memory query history to `.query_history.json`.

    Best-effort: failures are printed (with traceback) and never raised.
    If the full dump fails, a reduced version containing only the query,
    summary and timestamp of every entry is written instead.
    """
    try:
        # default=str stringifies any value json cannot serialize natively.
        with history_file.open("w", encoding="utf-8") as f:
            json.dump(history, f, ensure_ascii=False, indent=2, default=str)
        print(
            f"History saved successfully to {history_file.absolute()}: {len(history)} entries",
            flush=True,
        )
    except Exception as e:
        # Log error but don't break main flow
        print(f"CRITICAL ERROR: Could not save history to {history_file}: {e}", flush=True)
        traceback.print_exc()
        # Try to save a simplified version
        try:
            simplified_history = [
                {
                    "query": entry.get("query", ""),
                    "summary": entry.get("response", {}).get("summary", ""),
                    "timestamp": entry.get("timestamp", ""),
                }
                for entry in history
            ]
            with history_file.open("w", encoding="utf-8") as f:
                json.dump(simplified_history, f, ensure_ascii=False, indent=2)
            print(f"Saved simplified history: {len(simplified_history)} entries", flush=True)
        except Exception as e2:
            print(f"Failed to save even simplified history: {e2}", flush=True)


class QueryRequest(BaseModel):
    """Request model for query endpoints.

    Attributes:
        query: The natural language question to analyze.
        top_k: Number of results to return (kept for compatibility, not
            actively used).
    """

    query: str = Field(..., example="תסווג את התלונות 5 סוגים")
    top_k: int = Field(5, example=5)


class QueryResponse(BaseModel):
    """Response model for the legacy query endpoint (deprecated).

    Kept for backward compatibility but not actively used.
    """

    query: str
    summary: Optional[str]
    results: Optional[List[Dict[str, Any]]] = None


class SQLQueryResponse(BaseModel):
    """Response model for the SQL-based query endpoint.

    Attributes:
        query: The original user query.
        summary: Final synthesized answer in natural language.
        sql_queries: SQL queries that were generated and executed.
        query_results: Results of each SQL query, as dictionaries so they can
            be JSON-serialized.
        visualizations: Optional visualization specifications for frontend
            rendering.
    """

    query: str
    summary: str
    sql_queries: List[str]
    query_results: List[Dict[str, Any]]
    visualizations: Optional[List[Dict[str, Any]]] = None


@app.post("/health")
def health() -> Dict[str, str]:
    """Healthcheck endpoint.

    Uses POST so all endpoints consistently use JSON/POST semantics.
    """
    return {"status": "ok"}


def _serialize_query_results(raw_results: Any) -> List[Dict[str, Any]]:
    """Turn per-query results into plain, JSON-serializable dictionaries.

    Each item of *raw_results* is read through its `query`, `error` and
    `result` attributes; `result` is used as a DataFrame (`len()` and
    `to_dict('records')`). Rows are only exported when the query has no error
    and returned at least one row.
    """
    # Imported once per call rather than once per cell.
    import math

    import numpy as np

    def to_native(val: Any) -> Any:
        """Map numpy scalars/arrays and NaN onto native Python values."""
        if val is None or (isinstance(val, float) and math.isnan(val)):
            return None
        if isinstance(val, np.bool_):
            return bool(val)
        if isinstance(val, np.integer):
            return int(val)
        if isinstance(val, np.floating):
            # np.float32 and friends are not `float` subclasses, so NaN has to
            # be checked again here.
            return None if math.isnan(val) else float(val)
        if isinstance(val, np.ndarray):
            return val.tolist()
        return val

    serialized: List[Dict[str, Any]] = []
    for qr in raw_results:
        records: List[Dict[str, Any]] = []
        if not qr.error and len(qr.result) > 0:
            records = [
                {column: to_native(value) for column, value in row.items()}
                for row in qr.result.to_dict("records")
            ]
        serialized.append(
            {
                "query": qr.query,
                "result": records,
                "error": qr.error,
                "row_count": len(qr.result) if not qr.error else 0,
            }
        )
    return serialized


def _record_history(result: Any, query_results: List[Dict[str, Any]]) -> None:
    """Append one answered query to the history and persist it.

    Never raises: any failure is printed so the request itself still succeeds.
    """
    try:
        history.append(
            {
                "query": result.user_query,
                "response": {
                    "summary": result.summary,
                    "sql_queries": result.sql_queries,
                    # Already converted to JSON-serializable form by the caller.
                    "query_results": query_results,
                    "visualizations": result.visualizations,
                },
                "timestamp": datetime.now().isoformat(),
            }
        )
        print(
            f"History entry added to memory: {len(history)} entries. "
            f"Query: {result.user_query[:50]}...",
            flush=True,
        )
        save_history()
        print(f"History saved to disk: {history_file.absolute()}", flush=True)
    except Exception as e:
        print(f"CRITICAL: Error saving history: {e}", flush=True)
        traceback.print_exc()


def _failed_sql_response(query: str, summary: str) -> SQLQueryResponse:
    """Build the empty response returned when a query cannot be answered."""
    return SQLQueryResponse(
        query=query,
        summary=summary,
        sql_queries=[],
        query_results=[],
        visualizations=None,
    )


@app.post("/query-sql", response_model=SQLQueryResponse)
def query_sql(req: QueryRequest) -> SQLQueryResponse:
    """SQL-based question answering over feedback data.

    Flow:
        1. Make sure the SQL service exists (created lazily if startup failed).
        2. Let the service analyze the query.
        3. Convert the per-query results to JSON-serializable dictionaries.
        4. Record the exchange in the history (best-effort).
        5. Return the answer with query results and visualizations.

    Errors are not raised to the client: they are reported through the
    `summary` field of an otherwise empty response.
    """
    global sql_svc
    if sql_svc is None:
        try:
            sql_svc = SQLFeedbackService()
        except Exception as e:
            return _failed_sql_response(
                req.query,
                f"שגיאה באתחול שירות SQL: {str(e)}. אנא ודא שקובץ feedback_transformed_2.csv קיים בתיקיית 0_preprocessing/.",
            )

    try:
        result = sql_svc.analyze_query(req.query)
        query_results = _serialize_query_results(result.query_results)

        # History is saved before the response is returned.
        _record_history(result, query_results)

        return SQLQueryResponse(
            query=result.user_query,
            summary=result.summary,
            sql_queries=result.sql_queries,
            query_results=query_results,
            visualizations=result.visualizations,
        )
    except Exception as e:
        error_details = traceback.format_exc()
        print(f"Error in /query-sql endpoint: {error_details}", flush=True)
        return _failed_sql_response(
            req.query,
            f"שגיאה: {str(e)}. אנא בדוק את הלוגים לפרטים נוספים.",
        )


# Mount static files for a simple frontend if present
# Frontend files are in 1_frontend/ directory
# Calculate path relative to this file: 2_backend_llm/app/api.py -> root/1_frontend
static_dir = Path(__file__).resolve().parent.parent.parent / "1_frontend"
print(f"Looking for frontend at: {static_dir}", flush=True)
print(f"Frontend exists: {static_dir.exists()}", flush=True)
if static_dir.exists():
    # Serve static assets under /static/* (so index.html can reference /static/app.js)
    app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
    print(f"Mounted static files from: {static_dir}", flush=True)
else:
    print(f"WARNING: Frontend directory not found at {static_dir}", flush=True)


@app.get("/")
def root() -> HTMLResponse:
    """Serve the frontend's index.html.

    Returns a 404 HTML page with diagnostic text when the frontend directory
    is missing or when reading index.html fails.
    """
    try:
        if not static_dir.exists():
            # NOTE(review): any markup around this message could not be
            # recovered from the source; confirm the intended wording/HTML.
            details = (
                f"Looking for: {static_dir}\n"
                f"Current working directory: {Path.cwd()}\n"
            )
            return HTMLResponse(f"<pre>{html.escape(details)}</pre>", status_code=404)
        page = (static_dir / "index.html").read_text(encoding="utf-8")
        return HTMLResponse(page)
    except Exception as e:
        error_msg = traceback.format_exc()
        details = f"Error: {str(e)}\n{error_msg}"
        # Escaped because traceback text contains angle brackets
        # (e.g. "<module>") that a browser would otherwise treat as tags.
        return HTMLResponse(f"<pre>{html.escape(details)}</pre>", status_code=404)


@app.get("/history")
def get_history() -> Dict[str, Any]:
    """Return every stored query together with its response.

    The history file is re-read on each call so the reply reflects what is on
    disk. If the file is missing or cannot be parsed, the in-memory history
    is returned unchanged; a file whose top-level JSON value is not a list
    resets the history to empty.
    """
    global history
    if not history_file.exists():
        print(f"History file does not exist: {history_file.absolute()}", flush=True)
    else:
        try:
            on_disk = json.loads(history_file.read_text(encoding="utf-8"))
            if isinstance(on_disk, list):
                history = on_disk
            else:
                history = []
            print(f"History loaded from disk: {len(history)} entries", flush=True)
        except Exception as e:
            # Keep serving whatever is already in memory.
            print(f"Error loading history from disk: {e}", flush=True)
            import traceback
            traceback.print_exc()
    print(f"Returning history: {len(history)} entries", flush=True)
    return {"history": history}


@app.post("/history/clear")
def clear_history() -> Dict[str, Any]:
    """Drop all stored queries, both in memory and on disk.

    Handy for testing or for privacy reasons.
    """
    global history
    # Rebind to a fresh list, then persist the now-empty state.
    history = list()
    save_history()
    outcome: Dict[str, Any] = {"status": "cleared"}
    return outcome