galbendavids's picture
转讬拽讜谉: 注讚讻讜谉 谞转讬讘讬 拽讘爪讬诐 - feedback_transformed_2.csv, 转讬拽讜谉 谞转讬讘 frontend, 讛讜住驻转 诇讜讙讬诐
53469df
from __future__ import annotations
from typing import List, Optional, Dict, Any
from pathlib import Path
import json
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from .config import settings
from .data_loader import load_feedback
from .sql_service import SQLFeedbackService
# FastAPI application for Feedback Analysis using SQL-based approach
app = FastAPI(
title="Feedback Analysis Agent",
version="2.0.0",
description="SQL-based feedback analysis system using LLM-generated queries",
default_response_class=ORJSONResponse
)
# Initialize SQL service lazily to avoid errors on startup if data is missing
# This service handles all query processing using SQL-based approach
sql_svc: Optional[SQLFeedbackService] = None
try:
sql_svc = SQLFeedbackService()
print("SQL service initialized successfully", flush=True)
except Exception as e:
print(f"Warning: Could not initialize SQL service: {e}", flush=True)
# Simple in-memory history persisted best-effort to `.query_history.json`
history_file = Path(".query_history.json")
history = []
if history_file.exists():
try:
with history_file.open("r", encoding="utf-8") as f:
history = json.load(f)
except Exception:
history = []
def save_history() -> None:
"""
Save query history to disk.
This is a best-effort operation - if saving fails (e.g., disk full,
permissions issue), the error is logged but doesn't break the main flow.
History is stored in `.query_history.json`.
"""
global history
try:
# Ensure we're saving the current history
# Use default=str to handle any non-serializable types
with history_file.open("w", encoding="utf-8") as f:
json.dump(history, f, ensure_ascii=False, indent=2, default=str)
print(f"History saved successfully to {history_file.absolute()}: {len(history)} entries", flush=True)
except Exception as e:
# Log error but don't break main flow
import traceback
print(f"CRITICAL ERROR: Could not save history to {history_file}: {e}", flush=True)
traceback.print_exc()
# Try to save a simplified version
try:
simplified_history = []
for entry in history:
simplified_entry = {
"query": entry.get("query", ""),
"summary": entry.get("response", {}).get("summary", ""),
"timestamp": entry.get("timestamp", "")
}
simplified_history.append(simplified_entry)
with history_file.open("w", encoding="utf-8") as f:
json.dump(simplified_history, f, ensure_ascii=False, indent=2)
print(f"Saved simplified history: {len(simplified_history)} entries", flush=True)
except Exception as e2:
print(f"Failed to save even simplified history: {e2}", flush=True)
class QueryRequest(BaseModel):
"""
Request model for query endpoints.
Attributes:
query: The natural language question to analyze
top_k: Number of results to return (kept for compatibility, not actively used)
"""
query: str = Field(..., example="转住讜讜讙 讗转 讛转诇讜谞讜转 5 住讜讙讬诐")
top_k: int = Field(5, example=5)
class QueryResponse(BaseModel):
"""
Response model for legacy query endpoint (deprecated).
Kept for backward compatibility but not actively used.
"""
query: str
summary: Optional[str]
results: Optional[List[Dict[str, Any]]] = None
class SQLQueryResponse(BaseModel):
"""
Response model for SQL-based query endpoint.
Attributes:
query: The original user query
summary: Final synthesized answer in natural language
sql_queries: List of SQL queries that were generated and executed
query_results: Results from each SQL query (as dictionaries for JSON serialization)
visualizations: Optional list of visualization specifications for frontend rendering
"""
query: str
summary: str
sql_queries: List[str]
query_results: List[Dict[str, Any]]
visualizations: Optional[List[Dict[str, Any]]] = None
@app.post("/health")
def health() -> Dict[str, str]:
"""Healthcheck endpoint.
Converted to POST so all endpoints consistently use JSON/POST semantics.
"""
return {"status": "ok"}
@app.post("/query-sql", response_model=SQLQueryResponse)
def query_sql(req: QueryRequest) -> SQLQueryResponse:
"""SQL-based question answering over feedback data.
This endpoint uses a SQL-based approach:
1. LLM generates 1-5 SQL queries
2. Executes queries on feedback data
3. LLM synthesizes comprehensive answer
4. Returns answer with query results and visualizations
"""
global sql_svc
if sql_svc is None:
try:
sql_svc = SQLFeedbackService()
except Exception as e:
return SQLQueryResponse(
query=req.query,
summary=f"砖讙讬讗讛 讘讗转讞讜诇 砖讬专讜转 SQL: {str(e)}. 讗谞讗 讜讚讗 砖拽讜讘抓 feedback_transformed_2.csv 拽讬讬诐 讘转讬拽讬讬转 0_preprocessing/.",
sql_queries=[],
query_results=[],
visualizations=None
)
try:
result = sql_svc.analyze_query(req.query)
# Convert query results to JSON-serializable format
# Pandas DataFrames may contain numpy types that aren't JSON-serializable
# This helper function converts them to native Python types
def convert_to_python_type(val):
"""
Convert numpy types to native Python types for JSON serialization.
FastAPI/Pydantic can't serialize numpy types directly, so we need
to convert them. This function handles integers, floats, arrays, and NaN.
"""
import numpy as np
import math
# Handle NaN and None
if val is None or (isinstance(val, float) and math.isnan(val)):
return None
if isinstance(val, (np.integer, np.int64, np.int32)):
return int(val)
elif isinstance(val, (np.floating, np.float64, np.float32)):
if math.isnan(val):
return None
return float(val)
elif isinstance(val, np.ndarray):
return val.tolist()
return val
query_results = []
for qr in result.query_results:
# Convert DataFrame to dict and clean numpy types
records = []
if not qr.error and len(qr.result) > 0:
for record in qr.result.to_dict('records'):
cleaned_record = {k: convert_to_python_type(v) for k, v in record.items()}
records.append(cleaned_record)
query_results.append({
"query": qr.query,
"result": records,
"error": qr.error,
"row_count": len(qr.result) if not qr.error else 0
})
# Save to history - ensure it's always saved BEFORE returning response
# Use the already-converted query_results (which are JSON-serializable)
global history
try:
history_entry = {
"query": result.user_query,
"response": {
"summary": result.summary,
"sql_queries": result.sql_queries,
"query_results": query_results, # Already converted to JSON-serializable format above
"visualizations": result.visualizations
},
"timestamp": __import__("datetime").datetime.now().isoformat()
}
history.append(history_entry)
print(f"History entry added to memory: {len(history)} entries. Query: {result.user_query[:50]}...", flush=True)
# Save to disk immediately - use force save
save_history()
print(f"History saved to disk: {history_file.absolute()}", flush=True)
except Exception as e:
print(f"CRITICAL: Error saving history: {e}", flush=True)
import traceback
traceback.print_exc()
# Don't fail the request, but log the error
response = SQLQueryResponse(
query=result.user_query,
summary=result.summary,
sql_queries=result.sql_queries,
query_results=query_results,
visualizations=result.visualizations
)
return response
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"Error in /query-sql endpoint: {error_details}", flush=True)
return SQLQueryResponse(
query=req.query,
summary=f"砖讙讬讗讛: {str(e)}. 讗谞讗 讘讚讜拽 讗转 讛诇讜讙讬诐 诇驻专讟讬诐 谞讜住驻讬诐.",
sql_queries=[],
query_results=[],
visualizations=None
)
# Mount static files for a simple frontend if present
# Frontend files are in 1_frontend/ directory
# Calculate path relative to this file: 2_backend_llm/app/api.py -> root/1_frontend
static_dir = Path(__file__).resolve().parent.parent.parent / "1_frontend"
print(f"Looking for frontend at: {static_dir}", flush=True)
print(f"Frontend exists: {static_dir.exists()}", flush=True)
if static_dir.exists():
# Serve static assets under /static/* (so index.html can reference /static/app.js)
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
print(f"Mounted static files from: {static_dir}", flush=True)
else:
print(f"WARNING: Frontend directory not found at {static_dir}", flush=True)
@app.get("/")
def root() -> HTMLResponse:
"""Serve the main index.html for the frontend."""
try:
if not static_dir.exists():
return HTMLResponse(
f"<html><body><h1>Frontend not available</h1><p>Looking for: {static_dir}</p><p>Current working directory: {Path.cwd()}</p></body></html>",
status_code=404
)
html = (static_dir / "index.html").read_text(encoding="utf-8")
return HTMLResponse(html)
except Exception as e:
import traceback
error_msg = traceback.format_exc()
return HTMLResponse(
f"<html><body><h1>Frontend not available</h1><p>Error: {str(e)}</p><pre>{error_msg}</pre></body></html>",
status_code=404
)
@app.get("/history")
def get_history() -> Dict[str, Any]:
"""
Get query history.
Returns all previously asked questions and their responses.
History is persisted to `.query_history.json` and loaded on startup.
"""
global history
# Reload history from disk to ensure we have the latest
if history_file.exists():
try:
with history_file.open("r", encoding="utf-8") as f:
loaded_history = json.load(f)
history = loaded_history if isinstance(loaded_history, list) else []
print(f"History loaded from disk: {len(history)} entries", flush=True)
except Exception as e:
print(f"Error loading history from disk: {e}", flush=True)
import traceback
traceback.print_exc()
else:
print(f"History file does not exist: {history_file.absolute()}", flush=True)
print(f"Returning history: {len(history)} entries", flush=True)
return {"history": history}
@app.post("/history/clear")
def clear_history() -> Dict[str, Any]:
"""
Clear query history.
Removes all stored queries from memory and disk.
Useful for testing or privacy purposes.
"""
global history
history = []
save_history() # Persist the cleared state to disk
return {"status": "cleared"}