# tai-JY — backend/main.py
# (HF Spaces header: youngtsai, commit "api debug", 418bc87)
from fastapi import HTTPException, Response, APIRouter, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
from typing import Dict, Optional
from pydantic import BaseModel, VERSION as PYDANTIC_VERSION
import json
import os
from .services.openai_service import assistant_manager
from .exercises.exercise_service import load_exercises, exercises
from .exercises.models import Exercise
# Define the router and its endpoints first; every route below is served under the /api prefix.
router = APIRouter(prefix="/api")
class IntentRequest(BaseModel):
    """Request body for POST /api/intent."""
    # Raw user text whose intent should be analyzed.
    text: str
class ContextRequest(BaseModel):
    """Request body for POST /api/context."""
    # Conversation history; element schema is defined by the caller (not validated here).
    history: list
class AssistantRequest(BaseModel):
    """Request body for POST /api/assistant."""
    # Arbitrary context payload; only user_input is actually forwarded to the assistant.
    context: dict
    # The user's question/message.
    user_input: str
    # Existing assistant thread to continue; None starts without one.
    thread_id: Optional[str] = None
@router.post("/intent")
async def analyze_intent(req: IntentRequest):
    """Analyze the intent of a piece of user text.

    NOTE(review): the original body called `your_intent_analyze_function`,
    which is not defined anywhere in this module, so every request crashed
    with a NameError surfaced as an opaque 500. Until the real analyzer is
    wired in, fail explicitly with 501 Not Implemented.
    """
    # TODO: plug in the real intent-analysis implementation, then return
    #   {"intent": your_intent_analyze_function(req.text)}
    raise HTTPException(status_code=501, detail="Intent analysis is not implemented yet")
@router.post("/context")
async def build_context(req: ContextRequest):
    """Build an assistant context from a conversation history.

    NOTE(review): the original body called `your_context_builder_function`,
    which is not defined anywhere in this module, so every request crashed
    with a NameError surfaced as an opaque 500. Until the real builder is
    wired in, fail explicitly with 501 Not Implemented.
    """
    # TODO: plug in the real context-builder implementation, then return
    #   {"context": your_context_builder_function(req.history)}
    raise HTTPException(status_code=501, detail="Context building is not implemented yet")
@router.post("/assistant")
async def assistant_reply(req: AssistantRequest):
    """Run the assistant and return the whole reply as one string.

    Consumes assistant_manager.get_response_stream and joins the chunks
    before responding. Only user_input (and thread_id) are forwarded;
    req.context is currently not passed through.
    """
    pieces = [
        chunk
        async for chunk in assistant_manager.get_response_stream(
            thread_id=req.thread_id,
            user_question=req.user_input,
        )
    ]
    return {"reply": "".join(pieces)}
# Load the knowledge graphs from disk
def load_knowledge_graphs():
    """Load every knowledge graph from the JSON files in knowledge_graphs/graphs.

    Returns:
        dict: mapping graph id -> {"id", "title", "graph": {"title", "nodes",
        "edges"}}. Empty if the directory holds no JSON files.
    """
    graphs = {}
    current_dir = os.path.dirname(os.path.abspath(__file__))
    graphs_dir = os.path.join(current_dir, 'knowledge_graphs', 'graphs')
    # exist_ok avoids the check-then-create race of an os.path.exists guard.
    os.makedirs(graphs_dir, exist_ok=True)
    # sorted() makes load order (and any duplicate-id overwrite) deterministic.
    for filename in sorted(os.listdir(graphs_dir)):
        if not filename.endswith('.json'):
            continue
        file_path = os.path.join(graphs_dir, filename)
        with open(file_path, 'r', encoding='utf-8') as f:
            graph_data = json.load(f)
        graph_id = graph_data['id']
        graphs[graph_id] = {
            "id": graph_id,
            "title": graph_data["title"],
            "graph": {
                # Title is duplicated inside "graph" so consumers that only
                # receive the "graph" sub-dict still see it.
                "title": graph_data["title"],
                "nodes": graph_data["nodes"],
                "edges": graph_data["edges"],
            },
        }
    return graphs
# Load all graphs once at import time; the endpoints below read this module-level cache.
knowledge_graphs = load_knowledge_graphs()
@router.get("/graphs")
def list_graphs():
    """List the id and title of every loaded knowledge graph."""
    available = []
    for gid, data in knowledge_graphs.items():
        available.append({"id": gid, "title": data["title"]})
    return {"available_graphs": available}
@router.get("/graph/{graph_id}")
def get_graph(graph_id: str):
    """Return a full graph and register it as context for the assistant."""
    graph_data = knowledge_graphs.get(graph_id)
    if graph_data is None:
        raise HTTPException(status_code=404, detail="Graph not found")
    # Hand the assistant a shallow copy that also carries the title at top level.
    graph_for_context = dict(graph_data["graph"])
    graph_for_context["title"] = graph_data["title"]
    assistant_manager.set_graph_context(graph_id, graph_for_context)
    return graph_data
@router.get("/graph/{graph_id}/node/{node_id}")
def get_node(graph_id: str, node_id: str):
    """Return a single node's data from a loaded graph."""
    try:
        graph = knowledge_graphs[graph_id]["graph"]
    except KeyError:
        raise HTTPException(status_code=404, detail="Graph not found")
    node_data = graph.get("nodes", {}).get(node_id)
    if not node_data:
        raise HTTPException(status_code=404, detail="Node not found")
    return node_data
@router.post("/chat/thread")
async def create_thread():
    """Create a new assistant conversation thread and return its id."""
    thread = await assistant_manager.create_thread()
    return {"thread_id": thread.id}
# Request body model for the chat endpoint.
class ChatRequest(BaseModel):
    """Payload for POST /api/chat/{thread_id}."""
    # The user's chat message.
    message: str
    # Optional ids tying the message to a knowledge graph / node.
    graph_id: Optional[str] = None
    node_id: Optional[str] = None
    # Optional snapshot of what the client currently displays
    # (schema is defined by the frontend — not validated here).
    current_content_status: Optional[dict] = None
@router.post("/chat/{thread_id}")
async def chat(thread_id: str, request: ChatRequest):
    """Stream an assistant reply to `request.message` on an existing thread.

    Ensures the referenced graph's context is registered with
    assistant_manager before streaming, then returns the token stream as a
    text/event-stream response. Unexpected errors are wrapped in a 500.
    """
    try:
        # Pull fields out of the request model.
        message = request.message
        graph_id = request.graph_id
        node_id = request.node_id
        current_content_status = request.current_content_status
        # Log the received ids server-side for verification/debugging.
        print(f"[Chat Endpoint] Received Thread ID: {thread_id}")
        print(f"[Chat Endpoint] Received Graph ID: {graph_id}")
        print(f"[Chat Endpoint] Received Node ID: {node_id}")
        print(f"[Chat Endpoint] Received Message: {message}")
        # Make sure the graph context is loaded (when a graph_id was provided).
        if graph_id and graph_id in knowledge_graphs:
            # Register the context only if assistant_manager doesn't have it yet.
            if graph_id not in assistant_manager.graph_contexts:
                print(f"[Chat Endpoint] Setting graph context for {graph_id} in chat endpoint.")
                assistant_manager.set_graph_context(graph_id, knowledge_graphs[graph_id]["graph"])
            else:
                print(f"[Chat Endpoint] Graph context for {graph_id} already exists.")
        elif graph_id:
            # Caller referenced a graph we never loaded — proceed without context.
            print(f"[Chat Endpoint] Warning: Received graph_id '{graph_id}' not found in loaded graphs.")
        else:
            print("[Chat Endpoint] No graph_id provided in the request.")
        # Stream the assistant's reply as server-sent events.
        return StreamingResponse(
            assistant_manager.get_response_stream(
                thread_id=thread_id,
                user_question=message,
                graph_id=graph_id,
                node_id=node_id,
                current_content_status=current_content_status
            ),
            media_type="text/event-stream"
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"An internal error occurred: {str(e)}")
@router.get("/exercises/{node_id}")
def get_node_exercises(node_id: str):
    """Return the exercises attached to a knowledge-graph node.

    Exercises are reloaded from disk on every request so edits show up
    without a server restart. Each exercise model is serialized to a plain
    dict; objects that support neither pydantic v2 (`model_dump`) nor
    pydantic v1 (`dict`) are skipped with a log line rather than failing
    the whole request.
    """
    print(f"[Get Exercises] Pydantic version being used: {PYDANTIC_VERSION}")
    # Reload exercises on every request so file edits take effect immediately.
    print("[Get Exercises] Reloading exercises for this request...")
    try:
        # load_exercises returns the full exercise list and the node -> exercises map.
        exercises_local, node_exercises_local = load_exercises()
        print(f"[Get Exercises] Reloaded {len(exercises_local)} total exercises.")
    except Exception as e:
        print(f"[Get Exercises] Error reloading exercises: {e}")
        raise HTTPException(status_code=500, detail="Failed to reload exercises")
    print(f"[Get Exercises] Request for node_id: {node_id}")
    # Use the freshly loaded mapping, not the stale module-level one.
    node_related_exercises = node_exercises_local.get(node_id, [])
    print(f"[Get Exercises] Found {len(node_related_exercises)} exercises for node {node_id}.")
    if node_related_exercises:
        print(f"[Get Exercises] Type of first exercise object: {type(node_related_exercises[0])}")
        print(f"[Get Exercises] Attributes of first exercise object: {dir(node_related_exercises[0])}")
    try:
        exercises_dict_list = []
        for i, ex in enumerate(node_related_exercises):
            print(f"[Get Exercises] Processing index {i}, object type: {type(ex)}")
            if hasattr(ex, 'model_dump'):
                # pydantic v2 serialization.
                exercises_dict_list.append(ex.model_dump())
            elif hasattr(ex, 'dict'):
                # FIX: pydantic v1 fallback — the original only handled
                # model_dump and silently dropped every v1 model.
                exercises_dict_list.append(ex.dict())
            else:
                print(f"[Get Exercises] ERROR: Object at index {i} (type: {type(ex)}) lacks 'model_dump' method!")
                obj_id = getattr(ex, 'id', 'N/A')
                print(f"[Get Exercises] Object ID (if available): {obj_id}")
                print(f"[Get Exercises] Skipping object at index {i} due to missing method.")
                continue
        print(f"[Get Exercises] Successfully serialized {len(exercises_dict_list)} exercises.")
        return {
            "exercises": exercises_dict_list
        }
    except AttributeError as e:
        print(f"[Get Exercises] Caught AttributeError during serialization: {e}")
        raise HTTPException(status_code=500, detail=f"Internal error processing exercises: {str(e)}")
    except Exception as e:
        print(f"[Get Exercises] Caught other Exception: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred processing exercises: {str(e)}")
# Request body model for the answer-checking endpoint.
class CheckAnswerRequest(BaseModel):
    """Payload for POST /api/exercises/check/{exercise_id}."""
    # The user's submitted answer.
    answer: str
    # When True, return the explanation even for a wrong answer.
    show_explanation: Optional[bool] = False
@router.post("/exercises/check/{exercise_id}")
def check_exercise(exercise_id: str, request: CheckAnswerRequest):
    """Check a submitted answer against an exercise's stored answer.

    Comparison is whitespace-trimmed and case-insensitive. The explanation
    is returned when the answer is correct or when the caller explicitly
    asked for it via show_explanation.
    """
    user_answer = request.answer
    show_explanation = request.show_explanation
    # Re-import locally so we see the current module-level `exercises` list
    # even if the service module rebound it since startup. FIX: use a
    # relative import for consistency with the package-relative imports at
    # the top of this file (the absolute `backend.` form breaks if the
    # package is mounted under a different root).
    from .exercises.exercise_service import exercises
    exercise = next((ex for ex in exercises if ex.id == exercise_id), None)
    if not exercise:
        raise HTTPException(status_code=404, detail="Exercise not found")
    # Whitespace-trimmed, case-insensitive comparison.
    is_correct = user_answer.strip().lower() == exercise.answer.strip().lower()
    explanation = None
    if is_correct or show_explanation:
        explanation = exercise.explanation
    return {
        "correct": is_correct,
        "explanation": explanation
    }
# NOTE: dead variant of the /assistant endpoint — superseded by the live
# assistant_reply() defined earlier in this file.
# @router.post("/assistant")
# async def assistant_reply(req: AssistantRequest):
#     # call your assistant reply logic here
#     reply = your_assistant_function(req.context, req.user_input)
#     return {"reply": reply}