Spaces:
Sleeping
Sleeping
| from fastapi import HTTPException, Response, APIRouter, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse, StreamingResponse | |
| from typing import Dict, Optional | |
| from pydantic import BaseModel, VERSION as PYDANTIC_VERSION | |
| import json | |
| import os | |
| from .services.openai_service import assistant_manager | |
| from .exercises.exercise_service import load_exercises, exercises | |
| from .exercises.models import Exercise | |
| # 先定義 router 及相關 endpoint | |
| router = APIRouter(prefix="/api") | |
| class IntentRequest(BaseModel): | |
| text: str | |
| class ContextRequest(BaseModel): | |
| history: list | |
| class AssistantRequest(BaseModel): | |
| context: dict | |
| user_input: str | |
| thread_id: Optional[str] = None | |
| async def analyze_intent(req: IntentRequest): | |
| # 這裡呼叫你的 intent 分析邏輯 | |
| result = your_intent_analyze_function(req.text) | |
| return {"intent": result} | |
| async def build_context(req: ContextRequest): | |
| # 這裡呼叫你的 context builder 邏輯 | |
| context = your_context_builder_function(req.history) | |
| return {"context": context} | |
| async def assistant_reply(req: AssistantRequest): | |
| # 呼叫 assistant_manager.get_response_stream 並合併 stream 內容 | |
| reply_chunks = [] | |
| async for chunk in assistant_manager.get_response_stream( | |
| thread_id=req.thread_id, | |
| user_question=req.user_input, | |
| # 你可以根據需要傳 context, 這裡只傳 user_input | |
| ): | |
| reply_chunks.append(chunk) | |
| reply = "".join(reply_chunks) | |
| return {"reply": reply} | |
| # 讀取知識圖譜 | |
| def load_knowledge_graphs(): | |
| """從多個 JSON 文件加載知識圖譜""" | |
| graphs = {} | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| graphs_dir = os.path.join(current_dir, 'knowledge_graphs', 'graphs') | |
| # 確保目錄存在 | |
| if not os.path.exists(graphs_dir): | |
| os.makedirs(graphs_dir) | |
| # 讀取所有 JSON 文件 | |
| for filename in os.listdir(graphs_dir): | |
| if filename.endswith('.json'): | |
| file_path = os.path.join(graphs_dir, filename) | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| graph_data = json.load(f) | |
| graph_id = graph_data['id'] | |
| graphs[graph_id] = { | |
| "id": graph_id, | |
| "title": graph_data["title"], | |
| "graph": { | |
| "title": graph_data["title"], # 修改:在 graph 中也包含 title | |
| "nodes": graph_data["nodes"], | |
| "edges": graph_data["edges"] | |
| } | |
| } | |
| return graphs | |
| # 載入知識圖譜 | |
| knowledge_graphs = load_knowledge_graphs() | |
| def list_graphs(): | |
| return { | |
| "available_graphs": [ | |
| { | |
| "id": graph_id, | |
| "title": graph_data["title"] | |
| } | |
| for graph_id, graph_data in knowledge_graphs.items() | |
| ] | |
| } | |
| def get_graph(graph_id: str): | |
| if graph_id not in knowledge_graphs: | |
| raise HTTPException(status_code=404, detail="Graph not found") | |
| graph_data = knowledge_graphs[graph_id] | |
| # 修改:傳遞完整的圖譜資料,包含 title | |
| graph_for_context = graph_data["graph"].copy() | |
| graph_for_context["title"] = graph_data["title"] | |
| assistant_manager.set_graph_context(graph_id, graph_for_context) | |
| return graph_data | |
| def get_node(graph_id: str, node_id: str): | |
| if graph_id not in knowledge_graphs: | |
| raise HTTPException(status_code=404, detail="Graph not found") | |
| nodes = knowledge_graphs[graph_id]["graph"].get("nodes", {}) | |
| node_data = nodes.get(node_id) | |
| if not node_data: | |
| raise HTTPException(status_code=404, detail="Node not found") | |
| return node_data | |
| async def create_thread(): | |
| thread = await assistant_manager.create_thread() | |
| return {"thread_id": thread.id} | |
| # 定義請求 Body 的模型 | |
| class ChatRequest(BaseModel): | |
| message: str | |
| graph_id: Optional[str] = None | |
| node_id: Optional[str] = None | |
| current_content_status: Optional[dict] = None | |
| async def chat(thread_id: str, request: ChatRequest): | |
| try: | |
| # 從請求模型中獲取數據 | |
| message = request.message | |
| graph_id = request.graph_id | |
| node_id = request.node_id | |
| current_content_status = request.current_content_status | |
| # 在後端打印接收到的 ID,進行驗證 | |
| print(f"[Chat Endpoint] Received Thread ID: {thread_id}") | |
| print(f"[Chat Endpoint] Received Graph ID: {graph_id}") | |
| print(f"[Chat Endpoint] Received Node ID: {node_id}") | |
| print(f"[Chat Endpoint] Received Message: {message}") | |
| # 確保圖譜上下文已加載 (如果提供了 graph_id) | |
| if graph_id and graph_id in knowledge_graphs: | |
| # 檢查 assistant_manager 是否有這個圖譜的上下文 | |
| if graph_id not in assistant_manager.graph_contexts: | |
| print(f"[Chat Endpoint] Setting graph context for {graph_id} in chat endpoint.") | |
| assistant_manager.set_graph_context(graph_id, knowledge_graphs[graph_id]["graph"]) | |
| else: | |
| print(f"[Chat Endpoint] Graph context for {graph_id} already exists.") | |
| elif graph_id: | |
| print(f"[Chat Endpoint] Warning: Received graph_id '{graph_id}' not found in loaded graphs.") | |
| else: | |
| print("[Chat Endpoint] No graph_id provided in the request.") | |
| return StreamingResponse( | |
| assistant_manager.get_response_stream( | |
| thread_id=thread_id, | |
| user_question=message, | |
| graph_id=graph_id, | |
| node_id=node_id, | |
| current_content_status=current_content_status | |
| ), | |
| media_type="text/event-stream" | |
| ) | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=f"An internal error occurred: {str(e)}") | |
| def get_node_exercises(node_id: str): | |
| """獲取特定節點的練習題""" | |
| print(f"[Get Exercises] Pydantic version being used: {PYDANTIC_VERSION}") | |
| # --- 新增:在每次請求時重新加載練習題 --- | |
| print("[Get Exercises] Reloading exercises for this request...") | |
| try: | |
| # 調用 load_exercises 獲取最新的練習題列表和節點映射 | |
| exercises_local, node_exercises_local = load_exercises() | |
| print(f"[Get Exercises] Reloaded {len(exercises_local)} total exercises.") | |
| except Exception as e: | |
| print(f"[Get Exercises] Error reloading exercises: {e}") | |
| raise HTTPException(status_code=500, detail="Failed to reload exercises") | |
| # --- 結束新增 --- | |
| print(f"[Get Exercises] Request for node_id: {node_id}") | |
| # --- 修改:使用本地加載的 node_exercises_local --- | |
| node_related_exercises = node_exercises_local.get(node_id, []) | |
| # --- 結束修改 --- | |
| print(f"[Get Exercises] Found {len(node_related_exercises)} exercises for node {node_id}.") | |
| if node_related_exercises: | |
| print(f"[Get Exercises] Type of first exercise object: {type(node_related_exercises[0])}") | |
| print(f"[Get Exercises] Attributes of first exercise object: {dir(node_related_exercises[0])}") | |
| try: | |
| exercises_dict_list = [] | |
| for i, ex in enumerate(node_related_exercises): | |
| print(f"[Get Exercises] Processing index {i}, object type: {type(ex)}") | |
| if hasattr(ex, 'model_dump'): | |
| exercises_dict_list.append(ex.model_dump()) | |
| else: | |
| print(f"[Get Exercises] ERROR: Object at index {i} (type: {type(ex)}) lacks 'model_dump' method!") | |
| obj_id = getattr(ex, 'id', 'N/A') | |
| print(f"[Get Exercises] Object ID (if available): {obj_id}") | |
| print(f"[Get Exercises] Skipping object at index {i} due to missing method.") | |
| continue | |
| print(f"[Get Exercises] Successfully serialized {len(exercises_dict_list)} exercises.") | |
| return { | |
| "exercises": exercises_dict_list | |
| } | |
| except AttributeError as e: | |
| print(f"[Get Exercises] Caught AttributeError during serialization: {e}") | |
| raise HTTPException(status_code=500, detail=f"Internal error processing exercises: {str(e)}") | |
| except Exception as e: | |
| print(f"[Get Exercises] Caught other Exception: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=f"An unexpected error occurred processing exercises: {str(e)}") | |
| # 定義檢查答案的請求 Body 模型 | |
| class CheckAnswerRequest(BaseModel): | |
| answer: str | |
| show_explanation: Optional[bool] = False | |
| def check_exercise(exercise_id: str, request: CheckAnswerRequest): | |
| """檢查答案""" | |
| user_answer = request.answer | |
| show_explanation = request.show_explanation | |
| # 從 exercise_service 模組導入 exercises | |
| from backend.exercises.exercise_service import exercises | |
| exercise = next((ex for ex in exercises if ex.id == exercise_id), None) | |
| if not exercise: | |
| raise HTTPException(status_code=404, detail="Exercise not found") | |
| is_correct = user_answer.strip().lower() == exercise.answer.strip().lower() | |
| explanation = None | |
| if is_correct or show_explanation: | |
| explanation = exercise.explanation | |
| return { | |
| "correct": is_correct, | |
| "explanation": explanation | |
| } | |
| # @router.post("/assistant") | |
| # async def assistant_reply(req: AssistantRequest): | |
| # # 這裡呼叫你的 assistant 回覆邏輯 | |
| # reply = your_assistant_function(req.context, req.user_input) | |
| # return {"reply": reply} |