File size: 12,080 Bytes
1c23b7c 9c30c74 1c23b7c 9c30c74 b4d57a6 1c23b7c 85b82f6 1c23b7c 9c30c74 1c23b7c c503371 9c30c74 c503371 9c30c74 c503371 1c23b7c b4d57a6 9c30c74 e405ff6 9c30c74 f77d065 b4d57a6 f77d065 0ca72c8 b4d57a6 0ca72c8 f77d065 e405ff6 f77d065 0ca72c8 f77d065 0ca72c8 b4d57a6 1c23b7c 9c30c74 b4d57a6 1c23b7c 9c30c74 1c23b7c 9c30c74 1c23b7c 85b82f6 9c30c74 85b82f6 9c30c74 85b82f6 e161246 1c23b7c e161246 1c23b7c 85b82f6 c503371 53469df c503371 85b82f6 9c30c74 0ca72c8 9c30c74 0ca72c8 9c30c74 0ca72c8 9c30c74 85b82f6 9c30c74 85b82f6 9c30c74 85b82f6 cd41eff b987a23 f77d065 9c30c74 0ca72c8 9c30c74 cd41eff e405ff6 0ca72c8 f77d065 0ca72c8 9c30c74 cd41eff 85b82f6 cd41eff 85b82f6 1c23b7c b4d57a6 9a7cb3e 53469df b4d57a6 53469df b4d57a6 53469df b4d57a6 53469df b4d57a6 9c30c74 f77d065 e8f1ece b4d57a6 9c30c74 b4d57a6 9c30c74 b4d57a6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 | 
from __future__ import annotations
from typing import List, Optional, Dict, Any
from pathlib import Path
import json
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from .config import settings
from .data_loader import load_feedback
from .sql_service import SQLFeedbackService
# ASGI application object for the feedback-analysis backend. Every route in
# this module is registered on it, and ORJSONResponse is the response class
# used when an endpoint does not pick one explicitly.
app = FastAPI(
    title="Feedback Analysis Agent",
    description="SQL-based feedback analysis system using LLM-generated queries",
    version="2.0.0",
    default_response_class=ORJSONResponse,
)
# Attempt to build the SQL service once at import time. A failure here must
# not stop the application from starting, so it is only reported and
# `sql_svc` is left as None; the /query-sql endpoint retries construction on
# demand when it finds the service missing.
sql_svc: Optional[SQLFeedbackService] = None
try:
    sql_svc = SQLFeedbackService()
except Exception as init_error:
    print(f"Warning: Could not initialize SQL service: {init_error}", flush=True)
else:
    print("SQL service initialized successfully", flush=True)
# Query history is kept in memory and mirrored, best-effort, to a JSON file
# named `.query_history.json` (relative to the current working directory).
history_file = Path(".query_history.json")


def _load_history_file(path: Path) -> List[Dict[str, Any]]:
    """Read the persisted query history from ``path``.

    Loading is best-effort: a missing file, an unreadable file, invalid JSON
    or a top-level JSON value that is not a list all yield an empty history
    instead of raising.

    Args:
        path: Location of the JSON history file.

    Returns:
        The list stored in the file, or ``[]`` when nothing usable is found.
    """
    if not path.exists():
        return []
    try:
        with path.open("r", encoding="utf-8") as f:
            loaded = json.load(f)
    except Exception as e:
        # Keep startup alive on a corrupt/unreadable file, but say so instead
        # of silently discarding it.
        print(f"Warning: Could not load history from {path}: {e}", flush=True)
        return []
    # The rest of the module appends to `history`, so anything that is not a
    # list (e.g. a JSON object or a bare string) is treated as "no history".
    return loaded if isinstance(loaded, list) else []


history: List[Dict[str, Any]] = _load_history_file(history_file)
def _simplify_history_entry(entry: Any) -> Dict[str, Any]:
    """Reduce one history entry to its query, summary and timestamp."""
    if not isinstance(entry, dict):
        # Entries reloaded from disk are not guaranteed to be dicts.
        return {"query": str(entry), "summary": "", "timestamp": ""}
    response = entry.get("response")
    if isinstance(response, dict) and "summary" in response:
        summary = response["summary"]
    else:
        # Entries that were already simplified keep the summary at top level.
        summary = entry.get("summary", "")
    return {
        "query": entry.get("query", ""),
        "summary": summary,
        "timestamp": entry.get("timestamp", ""),
    }


def _write_history_payload(entries: Any) -> None:
    """Serialize ``entries`` to JSON and write them to ``history_file``.

    Serialization happens before the file is opened, so a serialization
    error never truncates the history that is already on disk.
    """
    # default=str stringifies values json cannot encode; it does not apply to
    # dictionary keys, which can still make this raise.
    payload = json.dumps(entries, ensure_ascii=False, indent=2, default=str)
    with history_file.open("w", encoding="utf-8") as f:
        f.write(payload)


def save_history() -> None:
    """
    Save query history to disk.

    This is a best-effort operation: if saving fails (e.g. disk full,
    permissions issue, unserializable data), the error is logged and a
    simplified version (query, summary, timestamp per entry) is attempted
    instead. Neither failure is propagated to the caller.
    History is stored in `.query_history.json`.
    """
    try:
        _write_history_payload(history)
        print(f"History saved successfully to {history_file.absolute()}: {len(history)} entries", flush=True)
    except Exception as e:
        # Log error but don't break main flow
        import traceback
        print(f"CRITICAL ERROR: Could not save history to {history_file}: {e}", flush=True)
        traceback.print_exc()
        # Try to save a simplified version
        try:
            simplified_history = [_simplify_history_entry(entry) for entry in history]
            _write_history_payload(simplified_history)
            print(f"Saved simplified history: {len(simplified_history)} entries", flush=True)
        except Exception as e2:
            print(f"Failed to save even simplified history: {e2}", flush=True)
class QueryRequest(BaseModel):
    """
    Body accepted by the query endpoints.

    Attributes:
        query: Natural-language question to analyze (required).
        top_k: Number of results to return; defaults to 5. Kept for
            compatibility and not actively used.
    """

    query: str = Field(..., example="转住讜讜讙 讗转 讛转诇讜谞讜转 5 住讜讙讬诐")
    top_k: int = Field(default=5, example=5)
class QueryResponse(BaseModel):
    """
    Response shape of the legacy query endpoint (deprecated).

    Retained only for backward compatibility; it is not actively used.

    Attributes:
        query: The question that was asked.
        summary: Answer text, which may be null.
        results: Optional list of result rows as dictionaries.
    """

    query: str
    summary: Optional[str]
    results: Optional[List[Dict[str, Any]]] = None
class SQLQueryResponse(BaseModel):
    """
    Payload returned by the SQL-based query endpoint.

    Attributes:
        query: The user's original question.
        summary: Final natural-language answer (or an error message).
        sql_queries: SQL statements that were generated and executed.
        query_results: One dictionary per executed statement, already
            converted to JSON-serializable values.
        visualizations: Optional visualization specifications for the
            frontend to render.
    """

    query: str
    summary: str
    sql_queries: List[str]
    query_results: List[Dict[str, Any]]
    visualizations: Optional[List[Dict[str, Any]]] = None
@app.post("/health")
def health() -> Dict[str, str]:
    """Liveness probe.

    Exposed as POST so that all endpoints consistently use JSON/POST
    semantics. Always answers with a fixed ``{"status": "ok"}`` body.
    """
    status_payload: Dict[str, str] = {"status": "ok"}
    return status_payload
def _to_native(val: Any) -> Any:
    """Convert a numpy scalar/array to a native Python value for JSON output.

    NaN (Python or numpy float) and None become None; numpy booleans,
    integers and floats become bool/int/float; arrays become lists. Any
    other value is returned unchanged.
    """
    import math

    import numpy as np

    if val is None:
        return None
    # np.bool_ is neither a Python bool nor an np.integer, so it needs its
    # own branch to avoid leaking a numpy type into the response.
    if isinstance(val, np.bool_):
        return bool(val)
    if isinstance(val, np.integer):
        return int(val)
    if isinstance(val, (float, np.floating)):
        return None if math.isnan(val) else float(val)
    if isinstance(val, np.ndarray):
        return val.tolist()
    return val


def _serialize_query_results(raw_results: Any) -> List[Dict[str, Any]]:
    """Turn the service's per-query results into JSON-serializable dicts.

    Each item is expected to expose `query`, `error` and `result`, where
    `result` supports `len()` and `to_dict('records')` (presumably a pandas
    DataFrame; the type is defined outside this module).
    """
    serialized: List[Dict[str, Any]] = []
    for qr in raw_results:
        records: List[Dict[str, Any]] = []
        if not qr.error and len(qr.result) > 0:
            records = [
                {k: _to_native(v) for k, v in record.items()}
                for record in qr.result.to_dict("records")
            ]
        serialized.append({
            "query": qr.query,
            "result": records,
            "error": qr.error,
            "row_count": len(qr.result) if not qr.error else 0,
        })
    return serialized


def _record_history(result: Any, query_results: List[Dict[str, Any]]) -> None:
    """Append one answered query to the history and persist it, best-effort."""
    try:
        from datetime import datetime

        history_entry = {
            "query": result.user_query,
            "response": {
                "summary": result.summary,
                "sql_queries": result.sql_queries,
                "query_results": query_results,  # already JSON-serializable
                "visualizations": result.visualizations,
            },
            "timestamp": datetime.now().isoformat(),
        }
        # `history` is only mutated here, never rebound, so no `global` is needed.
        history.append(history_entry)
        print(f"History entry added to memory: {len(history)} entries. Query: {result.user_query[:50]}...", flush=True)
        # Save to disk immediately - use force save
        save_history()
        print(f"History saved to disk: {history_file.absolute()}", flush=True)
    except Exception as e:
        # Don't fail the request, but log the error
        print(f"CRITICAL: Error saving history: {e}", flush=True)
        import traceback
        traceback.print_exc()


def _failed_sql_response(query: str, summary: str) -> SQLQueryResponse:
    """Build the empty-result response used to report an error to the client."""
    return SQLQueryResponse(
        query=query,
        summary=summary,
        sql_queries=[],
        query_results=[],
        visualizations=None,
    )


@app.post("/query-sql", response_model=SQLQueryResponse)
def query_sql(req: QueryRequest) -> SQLQueryResponse:
    """SQL-based question answering over feedback data.

    Flow:
    1. Ensure the SQL service exists, creating it on demand if startup
       initialization failed.
    2. Delegate the question to ``sql_svc.analyze_query``. Per the original
       notes, the service generates 1-5 SQL queries with an LLM, runs them
       on the feedback data and synthesizes an answer; that logic lives
       outside this module.
    3. Convert the per-query results to JSON-serializable values.
    4. Record the exchange in the query history (best-effort).

    Args:
        req: The request body; only ``req.query`` is used.

    Returns:
        The answer with executed queries, their results and visualizations.
        Errors are never raised to the client: they are reported through the
        ``summary`` field of an otherwise empty response.
    """
    # NOTE(review): the user-facing message literals below look mis-encoded
    # (text rendered through the wrong codec) - confirm against the original
    # file before changing them.
    global sql_svc
    if sql_svc is None:
        try:
            sql_svc = SQLFeedbackService()
        except Exception as e:
            return _failed_sql_response(
                req.query,
                f"砖讙讬讗讛 讘讗转讞讜诇 砖讬专讜转 SQL: {str(e)}. 讗谞讗 讜讚讗 砖拽讜讘抓 feedback_transformed_2.csv 拽讬讬诐 讘转讬拽讬讬转 0_preprocessing/.",
            )
    try:
        result = sql_svc.analyze_query(req.query)
        query_results = _serialize_query_results(result.query_results)
        # Save to history BEFORE building the response; failures are logged only.
        _record_history(result, query_results)
        return SQLQueryResponse(
            query=result.user_query,
            summary=result.summary,
            sql_queries=result.sql_queries,
            query_results=query_results,
            visualizations=result.visualizations,
        )
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        print(f"Error in /query-sql endpoint: {error_details}", flush=True)
        return _failed_sql_response(
            req.query,
            f"砖讙讬讗讛: {str(e)}. 讗谞讗 讘讚讜拽 讗转 讛诇讜讙讬诐 诇驻专讟讬诐 谞讜住驻讬诐.",
        )
# Optional static frontend. The directory is resolved relative to this file,
# three levels up and then into `1_frontend` (per the original note:
# 2_backend_llm/app/api.py -> root/1_frontend).
static_dir = Path(__file__).resolve().parent.parent.parent / "1_frontend"
print(f"Looking for frontend at: {static_dir}", flush=True)
print(f"Frontend exists: {static_dir.exists()}", flush=True)
if not static_dir.exists():
    print(f"WARNING: Frontend directory not found at {static_dir}", flush=True)
else:
    # Assets are served under /static/* so index.html can reference
    # paths such as /static/app.js.
    app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
    print(f"Mounted static files from: {static_dir}", flush=True)
@app.get("/")
def root() -> HTMLResponse:
    """Serve the frontend's ``index.html``.

    Returns:
        The contents of ``index.html`` from the frontend directory, or a
        small diagnostic HTML page with status 404 when the directory is
        missing or the file cannot be read.
    """
    # Diagnostic values (paths, exception text, traceback) are escaped before
    # being placed in HTML: tracebacks contain fragments such as "<module>"
    # that a browser would otherwise parse as tags and hide.
    from html import escape

    try:
        if not static_dir.exists():
            return HTMLResponse(
                "<html><body><h1>Frontend not available</h1>"
                f"<p>Looking for: {escape(str(static_dir))}</p>"
                f"<p>Current working directory: {escape(str(Path.cwd()))}</p></body></html>",
                status_code=404,
            )
        page = (static_dir / "index.html").read_text(encoding="utf-8")
        return HTMLResponse(page)
    except Exception as e:
        import traceback
        error_msg = traceback.format_exc()
        return HTMLResponse(
            "<html><body><h1>Frontend not available</h1>"
            f"<p>Error: {escape(str(e))}</p><pre>{escape(error_msg)}</pre></body></html>",
            status_code=404,
        )
@app.get("/history")
def get_history() -> Dict[str, Any]:
    """
    Return the query history.

    The history file is re-read on every call so the in-memory list matches
    what is on disk. If the file is missing or cannot be read/parsed, the
    current in-memory history is returned unchanged. A file whose top-level
    JSON value is not a list resets the history to empty.

    Returns:
        ``{"history": [...]}`` with all stored questions and responses.
    """
    global history
    if not history_file.exists():
        print(f"History file does not exist: {history_file.absolute()}", flush=True)
    else:
        try:
            with history_file.open("r", encoding="utf-8") as fh:
                on_disk = json.load(fh)
            if isinstance(on_disk, list):
                history = on_disk
            else:
                history = []
            print(f"History loaded from disk: {len(history)} entries", flush=True)
        except Exception as load_error:
            print(f"Error loading history from disk: {load_error}", flush=True)
            import traceback
            traceback.print_exc()
    print(f"Returning history: {len(history)} entries", flush=True)
    return {"history": history}
@app.post("/history/clear")
def clear_history() -> Dict[str, Any]:
    """
    Wipe the query history.

    Rebinds the module-level history to a new empty list and then calls
    ``save_history()`` so the empty state is written to disk as well.
    Useful for testing or privacy purposes.

    Returns:
        ``{"status": "cleared"}``.
    """
    global history
    history = list()
    # Persist the now-empty history so a later reload does not restore old entries.
    save_history()
    return dict(status="cleared")
|