| | from __future__ import annotations |
| |
|
| | from typing import List, Optional, Dict, Any |
| | from pathlib import Path |
| | import json |
| |
|
| | from fastapi import FastAPI |
| | from fastapi.responses import ORJSONResponse, HTMLResponse |
| | from fastapi.staticfiles import StaticFiles |
| | from pydantic import BaseModel, Field |
| |
|
| | from .config import settings |
| | from .data_loader import load_feedback |
| | from .sql_service import SQLFeedbackService |
| |
|
| | |
| | app = FastAPI( |
| | title="Feedback Analysis Agent", |
| | version="2.0.0", |
| | description="SQL-based feedback analysis system using LLM-generated queries", |
| | default_response_class=ORJSONResponse |
| | ) |
| |
|
| | |
| | |
| | sql_svc: Optional[SQLFeedbackService] = None |
| | try: |
| | sql_svc = SQLFeedbackService() |
| | print("SQL service initialized successfully", flush=True) |
| | except Exception as e: |
| | print(f"Warning: Could not initialize SQL service: {e}", flush=True) |
| |
|
| | |
| | history_file = Path(".query_history.json") |
| | history = [] |
| | if history_file.exists(): |
| | try: |
| | with history_file.open("r", encoding="utf-8") as f: |
| | history = json.load(f) |
| | except Exception: |
| | history = [] |
| |
|
| |
|
| | def save_history() -> None: |
| | """ |
| | Save query history to disk. |
| | |
| | This is a best-effort operation - if saving fails (e.g., disk full, |
| | permissions issue), the error is logged but doesn't break the main flow. |
| | History is stored in `.query_history.json`. |
| | """ |
| | global history |
| | try: |
| | |
| | |
| | with history_file.open("w", encoding="utf-8") as f: |
| | json.dump(history, f, ensure_ascii=False, indent=2, default=str) |
| | print(f"History saved successfully to {history_file.absolute()}: {len(history)} entries", flush=True) |
| | except Exception as e: |
| | |
| | import traceback |
| | print(f"CRITICAL ERROR: Could not save history to {history_file}: {e}", flush=True) |
| | traceback.print_exc() |
| | |
| | try: |
| | simplified_history = [] |
| | for entry in history: |
| | simplified_entry = { |
| | "query": entry.get("query", ""), |
| | "summary": entry.get("response", {}).get("summary", ""), |
| | "timestamp": entry.get("timestamp", "") |
| | } |
| | simplified_history.append(simplified_entry) |
| | with history_file.open("w", encoding="utf-8") as f: |
| | json.dump(simplified_history, f, ensure_ascii=False, indent=2) |
| | print(f"Saved simplified history: {len(simplified_history)} entries", flush=True) |
| | except Exception as e2: |
| | print(f"Failed to save even simplified history: {e2}", flush=True) |
| |
|
| |
|
| | class QueryRequest(BaseModel): |
| | """ |
| | Request model for query endpoints. |
| | |
| | Attributes: |
| | query: The natural language question to analyze |
| | top_k: Number of results to return (kept for compatibility, not actively used) |
| | """ |
| | query: str = Field(..., example="转住讜讜讙 讗转 讛转诇讜谞讜转 5 住讜讙讬诐") |
| | top_k: int = Field(5, example=5) |
| |
|
| |
|
| | class QueryResponse(BaseModel): |
| | """ |
| | Response model for legacy query endpoint (deprecated). |
| | |
| | Kept for backward compatibility but not actively used. |
| | """ |
| | query: str |
| | summary: Optional[str] |
| | results: Optional[List[Dict[str, Any]]] = None |
| |
|
| |
|
| | class SQLQueryResponse(BaseModel): |
| | """ |
| | Response model for SQL-based query endpoint. |
| | |
| | Attributes: |
| | query: The original user query |
| | summary: Final synthesized answer in natural language |
| | sql_queries: List of SQL queries that were generated and executed |
| | query_results: Results from each SQL query (as dictionaries for JSON serialization) |
| | visualizations: Optional list of visualization specifications for frontend rendering |
| | """ |
| | query: str |
| | summary: str |
| | sql_queries: List[str] |
| | query_results: List[Dict[str, Any]] |
| | visualizations: Optional[List[Dict[str, Any]]] = None |
| |
|
| |
|
| | @app.post("/health") |
| | def health() -> Dict[str, str]: |
| | """Healthcheck endpoint. |
| | |
| | Converted to POST so all endpoints consistently use JSON/POST semantics. |
| | """ |
| | return {"status": "ok"} |
| |
|
| |
|
| | @app.post("/query-sql", response_model=SQLQueryResponse) |
| | def query_sql(req: QueryRequest) -> SQLQueryResponse: |
| | """SQL-based question answering over feedback data. |
| | |
| | This endpoint uses a SQL-based approach: |
| | 1. LLM generates 1-5 SQL queries |
| | 2. Executes queries on feedback data |
| | 3. LLM synthesizes comprehensive answer |
| | 4. Returns answer with query results and visualizations |
| | """ |
| | global sql_svc |
| | if sql_svc is None: |
| | try: |
| | sql_svc = SQLFeedbackService() |
| | except Exception as e: |
| | return SQLQueryResponse( |
| | query=req.query, |
| | summary=f"砖讙讬讗讛 讘讗转讞讜诇 砖讬专讜转 SQL: {str(e)}. 讗谞讗 讜讚讗 砖拽讜讘抓 feedback_transformed_2.csv 拽讬讬诐 讘转讬拽讬讬转 0_preprocessing/.", |
| | sql_queries=[], |
| | query_results=[], |
| | visualizations=None |
| | ) |
| | |
| | try: |
| | result = sql_svc.analyze_query(req.query) |
| | |
| | |
| | |
| | |
| | def convert_to_python_type(val): |
| | """ |
| | Convert numpy types to native Python types for JSON serialization. |
| | |
| | FastAPI/Pydantic can't serialize numpy types directly, so we need |
| | to convert them. This function handles integers, floats, arrays, and NaN. |
| | """ |
| | import numpy as np |
| | import math |
| | |
| | if val is None or (isinstance(val, float) and math.isnan(val)): |
| | return None |
| | if isinstance(val, (np.integer, np.int64, np.int32)): |
| | return int(val) |
| | elif isinstance(val, (np.floating, np.float64, np.float32)): |
| | if math.isnan(val): |
| | return None |
| | return float(val) |
| | elif isinstance(val, np.ndarray): |
| | return val.tolist() |
| | return val |
| | |
| | query_results = [] |
| | for qr in result.query_results: |
| | |
| | records = [] |
| | if not qr.error and len(qr.result) > 0: |
| | for record in qr.result.to_dict('records'): |
| | cleaned_record = {k: convert_to_python_type(v) for k, v in record.items()} |
| | records.append(cleaned_record) |
| | |
| | query_results.append({ |
| | "query": qr.query, |
| | "result": records, |
| | "error": qr.error, |
| | "row_count": len(qr.result) if not qr.error else 0 |
| | }) |
| | |
| | |
| | |
| | global history |
| | try: |
| | history_entry = { |
| | "query": result.user_query, |
| | "response": { |
| | "summary": result.summary, |
| | "sql_queries": result.sql_queries, |
| | "query_results": query_results, |
| | "visualizations": result.visualizations |
| | }, |
| | "timestamp": __import__("datetime").datetime.now().isoformat() |
| | } |
| | history.append(history_entry) |
| | print(f"History entry added to memory: {len(history)} entries. Query: {result.user_query[:50]}...", flush=True) |
| | |
| | |
| | save_history() |
| | print(f"History saved to disk: {history_file.absolute()}", flush=True) |
| | except Exception as e: |
| | print(f"CRITICAL: Error saving history: {e}", flush=True) |
| | import traceback |
| | traceback.print_exc() |
| | |
| | |
| | response = SQLQueryResponse( |
| | query=result.user_query, |
| | summary=result.summary, |
| | sql_queries=result.sql_queries, |
| | query_results=query_results, |
| | visualizations=result.visualizations |
| | ) |
| | |
| | return response |
| | except Exception as e: |
| | import traceback |
| | error_details = traceback.format_exc() |
| | print(f"Error in /query-sql endpoint: {error_details}", flush=True) |
| | return SQLQueryResponse( |
| | query=req.query, |
| | summary=f"砖讙讬讗讛: {str(e)}. 讗谞讗 讘讚讜拽 讗转 讛诇讜讙讬诐 诇驻专讟讬诐 谞讜住驻讬诐.", |
| | sql_queries=[], |
| | query_results=[], |
| | visualizations=None |
| | ) |
| |
|
| |
|
| |
|
| |
|
| | |
| | |
| | |
| | static_dir = Path(__file__).resolve().parent.parent.parent / "1_frontend" |
| | print(f"Looking for frontend at: {static_dir}", flush=True) |
| | print(f"Frontend exists: {static_dir.exists()}", flush=True) |
| | if static_dir.exists(): |
| | |
| | app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") |
| | print(f"Mounted static files from: {static_dir}", flush=True) |
| | else: |
| | print(f"WARNING: Frontend directory not found at {static_dir}", flush=True) |
| |
|
| |
|
| | @app.get("/") |
| | def root() -> HTMLResponse: |
| | """Serve the main index.html for the frontend.""" |
| | try: |
| | if not static_dir.exists(): |
| | return HTMLResponse( |
| | f"<html><body><h1>Frontend not available</h1><p>Looking for: {static_dir}</p><p>Current working directory: {Path.cwd()}</p></body></html>", |
| | status_code=404 |
| | ) |
| | html = (static_dir / "index.html").read_text(encoding="utf-8") |
| | return HTMLResponse(html) |
| | except Exception as e: |
| | import traceback |
| | error_msg = traceback.format_exc() |
| | return HTMLResponse( |
| | f"<html><body><h1>Frontend not available</h1><p>Error: {str(e)}</p><pre>{error_msg}</pre></body></html>", |
| | status_code=404 |
| | ) |
| |
|
| |
|
| | @app.get("/history") |
| | def get_history() -> Dict[str, Any]: |
| | """ |
| | Get query history. |
| | |
| | Returns all previously asked questions and their responses. |
| | History is persisted to `.query_history.json` and loaded on startup. |
| | """ |
| | global history |
| | |
| | if history_file.exists(): |
| | try: |
| | with history_file.open("r", encoding="utf-8") as f: |
| | loaded_history = json.load(f) |
| | history = loaded_history if isinstance(loaded_history, list) else [] |
| | print(f"History loaded from disk: {len(history)} entries", flush=True) |
| | except Exception as e: |
| | print(f"Error loading history from disk: {e}", flush=True) |
| | import traceback |
| | traceback.print_exc() |
| | else: |
| | print(f"History file does not exist: {history_file.absolute()}", flush=True) |
| | |
| | print(f"Returning history: {len(history)} entries", flush=True) |
| | return {"history": history} |
| |
|
| |
|
| | @app.post("/history/clear") |
| | def clear_history() -> Dict[str, Any]: |
| | """ |
| | Clear query history. |
| | |
| | Removes all stored queries from memory and disk. |
| | Useful for testing or privacy purposes. |
| | """ |
| | global history |
| | history = [] |
| | save_history() |
| | return {"status": "cleared"} |
| |
|
| |
|