Spaces:
Sleeping
Sleeping
| # ClareVoice FastAPI server: React UI + same backend as app.py (Weaviate + FAISS). | |
| # Run: uvicorn server:app --host 0.0.0.0 --port 7860 | |
| import os | |
| import re | |
| import time | |
| import concurrent.futures | |
| from collections import defaultdict | |
| from typing import Dict, List, Any, Optional | |
| from fastapi import FastAPI, UploadFile, File, Form, Request | |
| from fastapi.responses import FileResponse, JSONResponse, Response | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from config import ( | |
| DEFAULT_MODEL, | |
| DEFAULT_COURSE_TOPICS, | |
| USE_WEAVIATE_DIRECT, | |
| GENAI_COURSES_SPACE, | |
| WEAVIATE_URL, | |
| WEAVIATE_API_KEY, | |
| WEAVIATE_COLLECTION, | |
| WEAVIATE_EMBEDDING, | |
| EMBEDDING_MODEL, | |
| ) | |
| from clare_core import ( | |
| detect_language, | |
| chat_with_clare, | |
| update_weaknesses_from_message, | |
| update_cognitive_state_from_message, | |
| generate_hint_for_question, | |
| grade_quiz_answers, | |
| render_session_status, | |
| export_conversation, | |
| summarize_conversation, | |
| generate_quiz_for_external, | |
| ) | |
| from rag_engine import build_rag_chunks_from_file, retrieve_relevant_chunks | |
| from syllabus_utils import extract_course_topics_from_file | |
| from tts_podcast import ( | |
| text_to_speech, | |
| build_podcast_script_from_history, | |
| build_podcast_script_from_summary, | |
| generate_podcast_audio, | |
| ) | |
# Bundled Module 10 reading that sits next to this file, and the document type
# it is parsed as.
MODULE10_PATH = os.path.join(os.path.dirname(__file__), "module10_responsible_ai.pdf")
MODULE10_DOC_TYPE = "Literature Review / Paper"

# Preload Module 10 (same as app.py). Both lists stay empty when the PDF is
# missing or parsing fails; the topics then fall back to the configured default.
preloaded_topics: List[str] = []
preloaded_chunks: List[Dict] = []

if os.path.exists(MODULE10_PATH):
    try:
        class _FileObj:
            # The topic extractor is handed an object exposing ``name``.
            name = MODULE10_PATH

        preloaded_topics = extract_course_topics_from_file(_FileObj(), MODULE10_DOC_TYPE) or []
        preloaded_chunks = build_rag_chunks_from_file(MODULE10_PATH, MODULE10_DOC_TYPE) or []
        print("[server] Module 10 preloaded.")
    except Exception as preload_error:
        # Best effort: a broken PDF must not prevent the server from starting.
        print(f"[server] Module 10 preload failed: {preload_error}")

preloaded_topics = preloaded_topics or list(DEFAULT_COURSE_TOPICS)
# Lazily created embedding model, shared by every Weaviate query.
_WEAVIATE_EMBED_MODEL = None


def _get_weaviate_embed_model():
    """Return the cached embedding model, creating it on first use.

    Mirrors the index build: OpenAI embeddings when ``WEAVIATE_EMBEDDING`` is
    ``"openai"``, otherwise the HuggingFace MiniLM sentence-transformer.
    """
    global _WEAVIATE_EMBED_MODEL
    if _WEAVIATE_EMBED_MODEL is not None:
        return _WEAVIATE_EMBED_MODEL

    # Imports stay local so only the selected backend has to be installed.
    if WEAVIATE_EMBEDDING == "openai":
        from llama_index.embeddings.openai import OpenAIEmbedding

        model = OpenAIEmbedding(model=EMBEDDING_MODEL)
    else:
        from llama_index.embeddings.huggingface import HuggingFaceEmbedding

        model = HuggingFaceEmbedding(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
    _WEAVIATE_EMBED_MODEL = model
    return _WEAVIATE_EMBED_MODEL
| def _retrieve_from_weaviate(question: str, top_k: int = 5, timeout_sec: float = 45.0) -> str: | |
| if not USE_WEAVIATE_DIRECT or len(question.strip()) < 5: | |
| return "" | |
| def _call(): | |
| try: | |
| import weaviate | |
| from weaviate.classes.init import Auth | |
| from llama_index.core import Settings, VectorStoreIndex | |
| from llama_index.vector_stores.weaviate import WeaviateVectorStore | |
| Settings.embed_model = _get_weaviate_embed_model() | |
| client = weaviate.connect_to_weaviate_cloud( | |
| cluster_url=WEAVIATE_URL, | |
| auth_credentials=Auth.api_key(WEAVIATE_API_KEY), | |
| ) | |
| try: | |
| if not client.is_ready(): | |
| return "" | |
| vs = WeaviateVectorStore(weaviate_client=client, index_name=WEAVIATE_COLLECTION) | |
| index = VectorStoreIndex.from_vector_store(vs) | |
| nodes = index.as_retriever(similarity_top_k=top_k).retrieve(question) | |
| return "\n\n---\n\n".join(n.get_content() for n in nodes) if nodes else "" | |
| finally: | |
| client.close() | |
| except Exception as e: | |
| print(f"[weaviate] retrieve failed: {repr(e)}") | |
| return "" | |
| try: | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: | |
| return ex.submit(_call).result(timeout=timeout_sec) | |
| except concurrent.futures.TimeoutError: | |
| print(f"[weaviate] timeout after {timeout_sec}s") | |
| return "" | |
| def _retrieve_from_genai_courses(question: str, top_k: int = 5, timeout_sec: float = 25.0) -> str: | |
| if not GENAI_COURSES_SPACE or len(question.strip()) < 5: | |
| return "" | |
| def _call(): | |
| try: | |
| from gradio_client import Client | |
| c = Client(GENAI_COURSES_SPACE) | |
| return (c.predict(question, api_name="/retrieve") or "").strip() | |
| except Exception as e: | |
| print(f"[genai_courses] failed: {repr(e)}") | |
| return "" | |
| try: | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: | |
| return ex.submit(_call).result(timeout=timeout_sec) | |
| except concurrent.futures.TimeoutError: | |
| return "" | |
def _coerce_rag_score(chunk: Dict) -> Optional[float]:
    """Return the chunk's ``_rag_score`` as a float, or None if absent/invalid."""
    raw = chunk.get("_rag_score")
    if raw is None:
        return None
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def format_references(rag_chunks: List[Dict], max_files: int = 2, max_sections_per_file: int = 3) -> str:
    """Render the RAG chunks used for an answer as a Markdown reference list.

    Chunks are ordered by descending ``_rag_score`` (missing or invalid scores
    count as 0.0 and are not printed), grouped by ``source_file``, and
    de-duplicated per file.

    Args:
        rag_chunks: Chunk dicts with optional ``source_file``, ``section`` and
            ``_rag_score`` keys.
        max_files: Maximum number of files listed.
        max_sections_per_file: Maximum number of sections listed per file.

    Returns:
        A Markdown block; a "no RAG context" notice when *rag_chunks* is empty.
    """
    if not rag_chunks:
        return "\n".join(["**References:**", "- (No RAG context used. Answer is based on the model's general knowledge.)"])

    # A chunk may carry ``_rag_score: None``; coerce once so that neither the
    # sort key nor the label formatting can raise on it.
    scored = [(chunk, _coerce_rag_score(chunk)) for chunk in rag_chunks]
    scored.sort(key=lambda pair: pair[1] if pair[1] is not None else 0.0, reverse=True)

    refs_by_file: Dict[str, List[str]] = defaultdict(list)
    for chunk, score in scored:
        file_name = chunk.get("source_file") or "module10_responsible_ai.pdf"
        section = chunk.get("section") or "Related section"
        score_str = f" (score={score:.2f})" if score is not None else ""
        entry = section + score_str
        if entry not in refs_by_file[file_name]:
            refs_by_file[file_name].append(entry)

    lines = ["**References (RAG context used):**"]
    for i, (file_name, sections) in enumerate(refs_by_file.items()):
        if i >= max_files:
            break
        lines.append(f"- *{file_name}* — {'; '.join(sections[:max_sections_per_file])}")
    return "\n".join(lines)
# Words that, on their own, make a message plain chit-chat.
_SMALLTALK_TOKENS = frozenset(
    {"hi", "hello", "hey", "thanks", "thank", "ok", "okay", "bye", "goodbye", "haha", "lol"}
)
# Questions about the assistant itself rather than about course content.
_META_PHRASES = ("who are you", "what are you", "what is your name", "what can you do", "what is clare")
# Punctuation stripped from token edges before the smalltalk lookup.
_SMALLTALK_EDGE_PUNCT = ".,!;:"


def is_academic_query(message: str) -> bool:
    """Decide whether *message* should trigger RAG retrieval and references.

    Returns False for empty input, messages made only of smalltalk words,
    questions about the assistant itself, and messages of at most two words
    without a question mark. Everything else counts as academic.
    """
    if not message or not message.strip():
        return False

    normalized = " ".join(message.strip().lower().split())
    tokens = normalized.split()
    has_question_mark = "?" in normalized

    # Strip attached punctuation so "thanks, ok, bye!" is still recognised as
    # smalltalk instead of slipping through as an academic query.
    if not has_question_mark and all(
        token.strip(_SMALLTALK_EDGE_PUNCT) in _SMALLTALK_TOKENS for token in tokens
    ):
        return False
    if any(phrase in normalized for phrase in _META_PHRASES):
        return False
    if len(tokens) <= 2 and not has_question_mark:
        return False
    return True
# MODULE10_DOC_TYPE is defined once, next to MODULE10_PATH near the top of this
# module; the identical re-assignment that used to sit here was redundant and
# could silently drift from the original.

# Prompt sent on the user's behalf to start a micro-quiz: the model must first
# ask for the quiz style, then ask and grade one question at a time.
MICRO_QUIZ_INSTRUCTION = (
    "We are running a short micro-quiz session based ONLY on **Module 10 – "
    "Responsible AI (Alto, 2024, Chapter 12)** and the pre-loaded materials.\n\n"
    "Step 1 – Before asking any content question:\n"
    "• First ask me which quiz style I prefer right now:\n"
    " - (1) Multiple-choice questions\n"
    " - (2) Short-answer / open-ended questions\n"
    "• Ask me explicitly: \"Which quiz style do you prefer now: 1) Multiple-choice or 2) Short-answer? "
    "Please reply with 1 or 2.\"\n"
    "• Do NOT start a content question until I have answered 1 or 2.\n\n"
    "Step 2 – After I choose the style:\n"
    "• If I choose 1 (multiple-choice):\n"
    " - Ask ONE multiple-choice question at a time, based on Module 10 concepts.\n"
    " - Provide 3–4 options (A, B, C, D) and make only one option clearly correct.\n"
    "• If I choose 2 (short-answer):\n"
    " - Ask ONE short-answer question at a time, also based on Module 10 concepts.\n"
    " - Do NOT show the answer when you ask the question.\n\n"
    "Step 3 – For each answer I give:\n"
    "• Grade my answer (correct / partially correct / incorrect).\n"
    "• Give a brief explanation and the correct answer.\n"
    "• Then ask if I want another question of the SAME style.\n\n"
    "Please start by asking me which quiz style I prefer (1 = multiple-choice, 2 = short-answer). "
    "Do not ask any content question before I choose."
)
| # ---------------------------- | |
| # Session store (in-memory) | |
| # ---------------------------- | |
# Process-local session state keyed by user id; lost on restart.
SESSIONS: Dict[str, Dict[str, Any]] = {}


def _get_session(user_id: str) -> Dict[str, Any]:
    """Return the session for *user_id*, creating a fresh one on first access.

    A new session starts with copies of the preloaded topics and RAG chunks
    and the default model.
    """
    try:
        return SESSIONS[user_id]
    except KeyError:
        pass

    fresh_session: Dict[str, Any] = {
        "user_id": user_id,
        "name": "",
        "history": [],
        "weaknesses": [],
        "cognitive_state": {"confusion": 0, "mastery": 0},
        # Copy the lists so one user's uploads never leak into another session.
        "course_outline": list(preloaded_topics or DEFAULT_COURSE_TOPICS),
        "rag_chunks": list(preloaded_chunks or []),
        "model_name": DEFAULT_MODEL,
        "uploaded_files": [],
        "profile_bio": "",
        "init_answers": {},
        "init_dismiss_until": 0,
    }
    SESSIONS[user_id] = fresh_session
    return fresh_session
| # ---------------------------- | |
| # App | |
| # ---------------------------- | |
app = FastAPI(title="ClareVoice API")

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm whether the frontend actually needs credentialed CORS.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Location of the built React frontend, relative to this file.
WEB_DIR = os.path.join(os.path.dirname(__file__), "web", "build")
WEB_INDEX = os.path.join(WEB_DIR, "index.html")
WEB_ASSETS = os.path.join(WEB_DIR, "assets")

# Mount static directories only when the frontend has been built.
for _mount_path, _mount_dir, _mount_name in (
    ("/assets", WEB_ASSETS, "assets"),
    ("/static", WEB_DIR, "static"),
):
    if os.path.isdir(_mount_dir):
        app.mount(_mount_path, StaticFiles(directory=_mount_dir), name=_mount_name)
| # ---------------------------- | |
| # Request models | |
| # ---------------------------- | |
class LoginReq(BaseModel):
    """Body of the login request handled by ``login``."""

    name: str  # display name stored on the session
    user_id: str  # key of the in-memory session
class ChatReq(BaseModel):
    """Body of a chat turn handled by ``chat``."""

    user_id: str
    message: str
    learning_mode: str = "Concept Explainer"  # forwarded to chat_with_clare
    language_preference: str = "Auto"  # input to detect_language
    doc_type: str = "Syllabus"  # forwarded to chat_with_clare
class QuizStartReq(BaseModel):
    """Body of the request that starts a micro-quiz (``quiz_start``)."""

    user_id: str
    language_preference: str = "Auto"
    doc_type: str = MODULE10_DOC_TYPE  # defaults to the Module 10 document type
    learning_mode: str = "quiz"
class ExportReq(BaseModel):
    """Body of the conversation export request (``api_export``)."""

    user_id: str
    learning_mode: str  # passed through to export_conversation
class SummaryReq(BaseModel):
    """Body of the conversation summary request (``api_summary``)."""

    user_id: str
    learning_mode: str  # not read by api_summary in the visible code
    language_preference: str = "Auto"
class TtsReq(BaseModel):
    """Body of the text-to-speech request (``api_tts``)."""

    user_id: str
    text: str  # text to synthesise; must be non-blank
    voice: Optional[str] = "nova"  # api_tts falls back to "nova" when empty
class PodcastReq(BaseModel):
    """Body of the podcast generation request (``api_podcast``)."""

    user_id: str
    source: str = "summary"  # "summary" | "conversation"
    voice: Optional[str] = "nova"
    title: Optional[str] = None  # custom podcast title, used for the intro
class QuizGenerateContext(BaseModel):
    """Teaching context for quiz generation (full request format)."""

    courseId: Optional[int] = None
    moduleId: Optional[int] = None
    topics: Optional[List[str]] = None  # joined with ", " to form the quiz topic
class QuizConfigurations(BaseModel):
    """Generation settings for quiz generation (full request format)."""

    questionCount: Optional[int] = None
    questionTypes: Optional[List[str]] = None
    language: Optional[str] = None  # EN/CN or en/zh
class QuizGenerateReq(BaseModel):
    """Quiz generation request for external callers.

    Supports both the simplified format and the full format.
    """

    # Simplified-format fields
    topic: Optional[str] = None
    num_questions: Optional[int] = None
    language: Optional[str] = None  # en | zh
    # Full-format fields (backward compatible)
    requestId: Optional[str] = None
    context: Optional[QuizGenerateContext] = None
    configurations: Optional[QuizConfigurations] = None
class QuestionOption(BaseModel):
    """One answer option of a question."""

    label: Optional[str] = None
    content: Optional[str] = None
    key: Optional[str] = None  # simplified-format alias of ``label``
    text: Optional[str] = None  # simplified-format alias of ``content``
class QuestionContext(BaseModel):
    """Question context used when generating a hint."""

    content: str  # question text; quiz_hint rejects a blank value
    options: Optional[List[QuestionOption]] = None
class HintRequest(BaseModel):
    """Hint request handled by ``quiz_hint``."""

    requestId: Optional[str] = None
    questionContext: QuestionContext
    type: str = "HINT"
    language: Optional[str] = None  # en | zh
class CorePoint(BaseModel):
    """A core point of a rubric.

    NOTE(review): not referenced by the visible code; QuestionRubric types
    ``corePoints`` as plain dicts.
    """

    point: str
    weight: float = 0.0
class CommonError(BaseModel):
    """A common mistake in a rubric and its point deduction.

    NOTE(review): not referenced by the visible code; QuestionRubric types
    ``commonErrors`` as plain dicts.
    """

    description: str
    deduction: float = 0.0
class AnswerScope(BaseModel):
    """Allowed scope of an answer within a rubric."""

    minLength: Optional[int] = None
    maxLength: Optional[int] = None
    language: Optional[str] = None
    requireExample: Optional[bool] = None
class QuestionRubric(BaseModel):
    """Per-question rubric for short-answer questions (supplied by the caller)."""

    corePoints: Optional[List[dict]] = None  # [{ "point": str, "weight": number }]
    acceptableSynonyms: Optional[List[str]] = None
    commonErrors: Optional[List[dict]] = None  # [{ "description": str, "deduction": number }]
    answerScope: Optional[AnswerScope] = None
class UserAnswer(BaseModel):
    """One submitted answer inside a grading request."""

    questionContent: Optional[str] = None
    questionId: Optional[str] = None
    userChoiceLabel: Optional[str] = None
    correctChoiceLabel: Optional[str] = None
    isCorrect: Optional[bool] = None
    userTextAnswer: Optional[str] = None
    referenceAnswer: Optional[str] = None
    rubric: Optional[QuestionRubric] = None  # grading rubric for this short-answer question
class QuizContext(BaseModel):
    """Quiz-level context accompanying a grading request.

    NOTE(review): ``quiz_grade`` does not read this model in the visible code.
    """

    title: Optional[str] = None
    totalScore: Optional[float] = None
    maxScore: Optional[float] = None
class GradeRequest(BaseModel):
    """Grading request handled by ``quiz_grade``."""

    requestId: Optional[str] = None
    quizContext: Optional[QuizContext] = None
    userAnswers: List[UserAnswer]  # quiz_grade rejects an empty list
    language: Optional[str] = None  # en | zh
class FeedbackReq(BaseModel):
    """Feedback on an assistant reply, handled by ``api_feedback``.

    Only ``user_id`` and ``rating`` are inspected by the visible handler.
    """

    user_id: str
    rating: str  # accepted values: "helpful" or "not_helpful" (case-insensitive)
    run_id: Optional[str] = None
    assistant_message_id: Optional[str] = None
    assistant_text: str = ""
    user_text: Optional[str] = None
    comment: Optional[str] = None
    refs: Optional[List] = None
    tags: Optional[List] = None
    timestamp_ms: Optional[int] = None
    learning_mode: Optional[str] = None
    doc_type: Optional[str] = None
| # ---------------------------- | |
| # Routes | |
| # ---------------------------- | |
# NOTE(review): no route decorator is visible in this view; presumably this is
# registered as the root route elsewhere - confirm.
def index():
    """Serve the built frontend's index.html, or a 500 when it is not built."""
    if not os.path.exists(WEB_INDEX):
        return JSONResponse({"detail": "web/build not found. Build frontend first."}, status_code=500)
    return FileResponse(WEB_INDEX)
def login(req: LoginReq):
    """Create or reuse the user's session and store the display name.

    Responds with HTTP 400 when either the name or the user id is blank.
    """
    cleaned_id = (req.user_id or "").strip()
    cleaned_name = (req.name or "").strip()
    if not (cleaned_id and cleaned_name):
        return JSONResponse({"ok": False, "error": "Missing name/user_id"}, status_code=400)

    _get_session(cleaned_id)["name"] = cleaned_name
    return {"ok": True, "user": {"name": cleaned_name, "user_id": cleaned_id}}
def _collect_chat_rag_context(msg: str, sess: Dict[str, Any]):
    """Gather session-level and course-level RAG context for one chat turn."""
    context_text, used_chunks = retrieve_relevant_chunks(msg, sess["rag_chunks"] or [])

    # Exactly one course-level source is queried: Weaviate wins over the Space.
    course_text = ""
    course_label = ""
    if USE_WEAVIATE_DIRECT:
        course_text = _retrieve_from_weaviate(msg)
        course_label = "Weaviate Cloud (GENAI COURSES)"
    elif GENAI_COURSES_SPACE:
        course_text = _retrieve_from_genai_courses(msg)
        course_label = "GenAICoursesDB"

    if course_text and course_label:
        context_text = (context_text or "") + "\n\n[来自 GENAI 课程知识库]\n\n" + course_text
        used_chunks = list(used_chunks or []) + [
            {"source_file": course_label, "section": "retrieve (GENAI COURSES dataset)", "_rag_score": 1.0}
        ]
    return context_text, used_chunks


def chat(req: ChatReq):
    """Handle one chat turn: update learner state, retrieve context, answer.

    Returns the reply (with a references block appended for academic
    questions), the rendered session status, the structured references and
    the measured latency. Responds with HTTP 400 when ``user_id`` is missing
    and HTTP 500 when the model call fails.
    """
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    sess = _get_session(user_id)
    msg = (req.message or "").strip()
    if not msg:
        # Empty message: only report the current session status.
        status_md = render_session_status(
            req.learning_mode, sess["weaknesses"], sess["cognitive_state"]
        )
        return {
            "reply": "",
            "session_status_md": status_md,
            "refs": [],
            "latency_ms": 0.0,
            "run_id": None,
        }

    started = time.time()
    resolved_lang = detect_language(msg, req.language_preference)
    sess["weaknesses"] = update_weaknesses_from_message(msg, sess["weaknesses"])
    sess["cognitive_state"] = update_cognitive_state_from_message(msg, sess["cognitive_state"])

    # The classification is pure, so it is computed once and reused below.
    academic = is_academic_query(msg)
    rag_context_text = ""
    rag_used_chunks: List[Dict] = []
    if academic:
        rag_context_text, rag_used_chunks = _collect_chat_rag_context(msg, sess)

    try:
        answer, new_history = chat_with_clare(
            message=msg,
            history=sess["history"],
            model_name=sess["model_name"],
            language_preference=resolved_lang,
            learning_mode=req.learning_mode,
            doc_type=req.doc_type,
            course_outline=sess["course_outline"],
            weaknesses=sess["weaknesses"],
            cognitive_state=sess["cognitive_state"],
            rag_context=rag_context_text,
        )
    except Exception as e:
        print(f"[chat] error: {repr(e)}")
        return JSONResponse({"error": f"chat failed: {repr(e)}"}, status_code=500)

    sess["history"] = new_history
    total_ms = (time.time() - started) * 1000.0

    ref_text = format_references(rag_used_chunks) if academic else ""
    if ref_text and new_history:
        _, last_answer = new_history[-1]
        # Append the references to the reply only; the stored history is kept as is.
        if "References (RAG context used):" not in (last_answer or ""):
            answer = f"{last_answer or ''}\n\n{ref_text}"

    refs = [
        {"source_file": chunk.get("source_file"), "section": chunk.get("section")}
        for chunk in (rag_used_chunks or [])
    ]
    if not refs:
        refs = [{"source_file": "No RAG", "section": "Answer based on model general knowledge."}]

    return {
        "reply": answer,
        "session_status_md": render_session_status(req.learning_mode, sess["weaknesses"], sess["cognitive_state"]),
        "refs": refs,
        "latency_ms": total_ms,
        "run_id": None,
    }
def quiz_start(req: QuizStartReq):
    """Start a micro-quiz by sending the quiz instruction on the user's behalf.

    Responds with HTTP 400 when ``user_id`` is missing and HTTP 500 when the
    model call fails.
    """
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    sess = _get_session(user_id)
    resolved_lang = detect_language(MICRO_QUIZ_INSTRUCTION, req.language_preference)
    quiz_ctx_text, _unused_chunks = retrieve_relevant_chunks("Module 10 quiz", sess["rag_chunks"] or [])

    call_kwargs = dict(
        message=MICRO_QUIZ_INSTRUCTION,
        history=sess["history"],
        model_name=sess["model_name"],
        language_preference=resolved_lang,
        learning_mode=req.learning_mode,
        doc_type=req.doc_type,
        course_outline=sess["course_outline"],
        weaknesses=sess["weaknesses"],
        cognitive_state=sess["cognitive_state"],
        rag_context=quiz_ctx_text,
    )
    try:
        answer, new_history = chat_with_clare(**call_kwargs)
    except Exception as quiz_error:
        print(f"[quiz] error: {repr(quiz_error)}")
        return JSONResponse({"error": str(quiz_error)}, status_code=500)

    sess["history"] = new_history
    status_md = render_session_status(req.learning_mode, sess["weaknesses"], sess["cognitive_state"])
    return {
        "reply": answer,
        "session_status_md": status_md,
        "refs": [],
        "latency_ms": 0.0,
        "run_id": None,
    }
async def upload(
    user_id: str = Form(...),
    doc_type: str = Form(...),
    file: UploadFile = File(...),
):
    """Ingest an uploaded document into the user's session.

    The file is written to a private temporary directory, parsed into RAG
    chunks (and, for ``doc_type == "Syllabus"``, into a course outline), and
    the temporary copy is removed afterwards.

    Returns:
        ``{"ok": True, "added_chunks": n, "status_md": ...}`` on success, or
        a 400 JSON response when ``user_id`` or the file name is missing.
    """
    import tempfile  # local import: only this handler needs it

    user_id = (user_id or "").strip()
    doc_type = (doc_type or "").strip()
    if not user_id:
        return JSONResponse({"ok": False, "error": "Missing user_id"}, status_code=400)
    if not file or not file.filename:
        return JSONResponse({"ok": False, "error": "Missing file"}, status_code=400)

    # Keep only the final path component; backslashes are normalised so that
    # Windows-style client paths are reduced to their file name as well.
    safe_name = os.path.basename(file.filename.replace("\\", "/")).replace("..", "_")
    if not safe_name:
        # e.g. a filename of "/" - there is nothing usable to store.
        return JSONResponse({"ok": False, "error": "Missing file"}, status_code=400)

    sess = _get_session(user_id)
    content = await file.read()
    new_chunks: List[Dict] = []

    # A per-request directory avoids collisions between users uploading files
    # with the same name, and guarantees cleanup once parsing is done.
    # NOTE(review): assumes the parsers read the file eagerly - confirm.
    with tempfile.TemporaryDirectory(prefix="clare_upload_") as tmp_dir:
        tmp_path = os.path.join(tmp_dir, safe_name)
        with open(tmp_path, "wb") as f:
            f.write(content)

        if doc_type == "Syllabus":
            class _F:
                pass

            fo = _F()
            fo.name = tmp_path  # the extractor is handed an object exposing ``name``
            try:
                topics = extract_course_topics_from_file(fo, doc_type)
                # Keep the existing outline when nothing could be extracted.
                if topics:
                    sess["course_outline"] = topics
            except Exception as e:
                print(f"[upload] syllabus parse error: {repr(e)}")

        try:
            new_chunks = build_rag_chunks_from_file(tmp_path, doc_type) or []
            sess["rag_chunks"] = (sess["rag_chunks"] or []) + new_chunks
        except Exception as e:
            print(f"[upload] rag build error: {repr(e)}")
            new_chunks = []

    sess.setdefault("uploaded_files", []).append({
        "filename": safe_name, "doc_type": doc_type, "added_chunks": len(new_chunks), "ts": int(time.time()),
    })
    return {"ok": True, "added_chunks": len(new_chunks), "status_md": f"✅ Loaded base reading + uploaded {doc_type} file."}
def api_feedback(req: FeedbackReq):
    """Validate a feedback submission.

    Only ``user_id`` and ``rating`` are checked; nothing is persisted here.
    """
    if not (req.user_id or "").strip():
        return JSONResponse({"ok": False, "error": "Missing user_id"}, status_code=400)

    rating = (req.rating or "").strip().lower()
    if rating in ("helpful", "not_helpful"):
        return {"ok": True}
    return JSONResponse({"ok": False, "error": "Invalid rating"}, status_code=400)
def api_export(req: ExportReq):
    """Export the user's conversation as Markdown."""
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    sess = _get_session(user_id)
    markdown = export_conversation(
        sess["history"],
        sess["course_outline"],
        req.learning_mode,
        sess["weaknesses"],
        sess["cognitive_state"],
    )
    return {"markdown": markdown}
def api_summary(req: SummaryReq):
    """Summarise the user's conversation as Markdown.

    Responds with HTTP 400 when ``user_id`` is missing and HTTP 500 with the
    error text when summarisation fails, matching the other model-backed
    endpoints in this module.
    """
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)
    sess = _get_session(user_id)

    lang = (req.language_preference or "Auto").strip()
    if lang == "简体中文":
        # The UI label for Simplified Chinese maps to the generic Chinese option.
        lang = "中文"

    try:
        md = summarize_conversation(
            sess["history"], sess["course_outline"], sess["weaknesses"],
            sess["cognitive_state"], sess["model_name"], lang,
        )
    except Exception as e:
        print(f"[summary] error: {repr(e)}")
        return JSONResponse({"error": str(e)}, status_code=500)
    return {"markdown": md}
def api_tts(req: TtsReq):
    """Convert text to speech and return it as an ``audio/mpeg`` response."""
    if not (req.user_id or "").strip():
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    spoken_text = (req.text or "").strip()
    if not spoken_text:
        return JSONResponse({"error": "Missing text"}, status_code=400)

    try:
        audio = text_to_speech(spoken_text, voice=req.voice or "nova")
    except Exception as tts_error:
        print(f"[tts] error: {repr(tts_error)}")
        return JSONResponse({"error": str(tts_error)}, status_code=500)

    if audio:
        return Response(content=audio, media_type="audio/mpeg")
    return JSONResponse({"error": "No audio generated"}, status_code=500)
def api_podcast(req: PodcastReq):
    """Generate a podcast episode from the conversation or from its summary.

    ``source == "conversation"`` narrates the raw history; any other value
    first summarises the conversation and narrates the summary.
    """
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    sess = _get_session(user_id)
    from_conversation = (req.source or "summary").lower() == "conversation"
    chosen_voice = req.voice or "nova"
    custom_title = (req.title or "").strip() or None

    try:
        if from_conversation:
            script = build_podcast_script_from_history(
                sess["history"],
                intro_title=custom_title or "Clare Learning Summary",
            )
        else:
            summary_md = summarize_conversation(
                sess["history"], sess["course_outline"], sess["weaknesses"],
                sess["cognitive_state"], sess["model_name"], "Auto",
            )
            script = build_podcast_script_from_summary(
                summary_md,
                intro_title=custom_title or "Clare Summary Podcast",
            )
        audio = generate_podcast_audio(script, voice=chosen_voice)
    except Exception as podcast_error:
        print(f"[podcast] error: {repr(podcast_error)}")
        return JSONResponse({"error": str(podcast_error)}, status_code=500)

    if audio:
        return Response(content=audio, media_type="audio/mpeg")
    return JSONResponse({"error": "No audio generated"}, status_code=500)
# Optional: once QUIZ_API_KEY is set, external calls to /api/quiz/generate must
# send the key in the X-API-Key header (or as an Authorization bearer token).
QUIZ_API_KEY = (os.getenv("QUIZ_API_KEY") or "").strip()


def _format_external_question(q: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one generated question to the external response schema."""
    formatted = {
        "question_id": q.get("question_id", ""),
        "type": q.get("type", "SINGLE_CHOICE"),
        "content": q.get("question_text", q.get("content", "")),  # both field names are accepted
        "correct_answers": q.get("correct_answers", []),
    }
    if q.get("options"):
        # Options may arrive as {key, text} or as {label, content}.
        formatted["options"] = [
            {
                "label": opt.get("key", opt.get("label", "")),
                "content": opt.get("text", opt.get("content", "")),
            }
            for opt in q["options"]
        ]
    if "explanation" in q:
        formatted["explanation"] = q["explanation"]
    return formatted


def quiz_generate(req: QuizGenerateReq, request: Request):
    """AI quiz generation endpoint for external sites (stateless, no user_id).

    Two request formats are supported:
    1. Simplified: {"topic": "...", "num_questions": 3, "language": "en"}
    2. Full: {"requestId": "...", "context": {...}, "configurations": {...}}

    The response mirrors the request format: the full format wraps the
    questions in ``data``, the simplified format returns them at top level.
    Error responses use ``{"code": ..., "error": {"type": ..., "reason": ...}}``.
    """
    import hmac  # local import: only needed for the API key comparison

    if QUIZ_API_KEY:
        key = request.headers.get("X-API-Key") or request.headers.get("Authorization", "").replace("Bearer ", "")
        # Constant-time comparison so the key cannot be probed via timing.
        if not hmac.compare_digest((key or "").strip().encode("utf-8"), QUIZ_API_KEY.encode("utf-8")):
            # NOTE(review): 429/RATE_LIMIT is kept for client compatibility,
            # although 401 would describe an authentication failure better.
            return JSONResponse(
                status_code=429,
                content={"code": 429, "error": {"type": "RATE_LIMIT", "reason": "missing_or_invalid_api_key"}},
            )

    # Parse the request parameters (both formats are supported).
    zh_aliases = ("CN", "中文", "ZH")
    topic = None
    num_questions = 3
    language = "en"
    use_full_format = req.context is not None or req.configurations is not None

    if use_full_format:
        if req.configurations:
            if req.configurations.questionCount is not None:
                num_questions = req.configurations.questionCount
            if req.configurations.language:
                lang = req.configurations.language.strip().upper()
                language = "zh" if lang in zh_aliases else "en"
        if req.context and req.context.topics:
            topic = ", ".join(req.context.topics)
        elif req.topic:
            topic = req.topic
    else:
        if req.topic:
            topic = req.topic
        if req.num_questions is not None:
            num_questions = req.num_questions
        if req.language:
            # Accept the same Chinese aliases as the full format; any other
            # value is passed through, only trimmed and lower-cased.
            lang = req.language.strip()
            language = "zh" if lang.upper() in zh_aliases else lang.lower()

    topic = (topic or "").strip()
    if not topic:
        return JSONResponse(
            status_code=422,
            content={"code": 422, "error": {"type": "INVALID_GENERATION", "reason": "topic_required"}},
        )

    t0 = time.time()
    try:
        questions, tokens_used = generate_quiz_for_external(
            topic=topic,
            num_questions=num_questions,
            language=language,
        )
    except ValueError as e:
        return JSONResponse(
            status_code=422,
            content={"code": 422, "error": {"type": "INVALID_GENERATION", "reason": str(e).replace(" ", "_")}},
        )
    except Exception as e:
        print(f"[quiz_generate] error: {repr(e)}")
        return JSONResponse(
            status_code=500,
            content={"code": 500, "error": {"type": "MODEL_ERROR", "reason": "generation_failed"}},
        )
    latency_ms = (time.time() - t0) * 1000.0

    meta = {
        "model": DEFAULT_MODEL,
        "model_version": "",
        "prompt_version": "quiz_generate_v1",
        "temperature": 0.4,
        "tokens_used": tokens_used,
        "latency_ms": round(latency_ms, 2),
    }
    formatted_questions = [_format_external_question(q) for q in questions]

    if use_full_format:
        return {
            "data": {
                "questions": formatted_questions,
            },
            "meta": meta,
        }
    return {
        "questions": formatted_questions,
        "meta": meta,
    }
def quiz_hint(req: HintRequest, request: Request):
    """Generate a hint for a question without revealing the answer.

    Returns ``{"data": {"hint": ...}, "meta": {...}}`` on success. Error
    responses use ``{"code": ..., "error": {"type": ..., "reason": ...}}``.
    """
    import hmac  # local import: only needed for the API key comparison

    if QUIZ_API_KEY:
        key = request.headers.get("X-API-Key") or request.headers.get("Authorization", "").replace("Bearer ", "")
        # Constant-time comparison so the key cannot be probed via timing.
        if not hmac.compare_digest((key or "").strip().encode("utf-8"), QUIZ_API_KEY.encode("utf-8")):
            return JSONResponse(
                status_code=429,
                content={"code": 429, "error": {"type": "RATE_LIMIT", "reason": "missing_or_invalid_api_key"}},
            )

    question_content = (req.questionContext.content or "").strip()
    if not question_content:
        return JSONResponse(
            status_code=422,
            content={"code": 422, "error": {"type": "INVALID_GENERATION", "reason": "question_content_required"}},
        )

    language = (req.language or "en").strip().lower()
    if language not in ("en", "zh", "中文"):
        language = "en"

    # Normalise options: both {label, content} and {key, text} are accepted.
    options = [
        {
            "label": opt.label or opt.key or "",
            "content": opt.content or opt.text or "",
        }
        for opt in (req.questionContext.options or [])
    ]

    t0 = time.time()
    try:
        hint_text, tokens_used = generate_hint_for_question(
            question_content=question_content,
            options=options if options else None,
            language=language,
        )
    except Exception as e:
        print(f"[quiz_hint] error: {repr(e)}")
        return JSONResponse(
            status_code=500,
            content={"code": 500, "error": {"type": "MODEL_ERROR", "reason": "hint_generation_failed"}},
        )
    latency_ms = (time.time() - t0) * 1000.0

    meta = {
        "model": DEFAULT_MODEL,
        "model_version": "",
        "prompt_version": "quiz_hint_v1",
        "temperature": 0.6,
        "tokens_used": tokens_used,
        "latency_ms": round(latency_ms, 2),
    }
    return {
        "data": {
            "hint": hint_text,
        },
        "meta": meta,
    }
def quiz_grade(req: GradeRequest, request: Request):
    """Grade submitted answers and return scores with feedback.

    Returns ``{"data": <grading result>, "meta": {...}}`` on success. Error
    responses use ``{"code": ..., "error": {"type": ..., "reason": ...}}``.
    """
    import hmac  # local import: only needed for the API key comparison

    if QUIZ_API_KEY:
        key = request.headers.get("X-API-Key") or request.headers.get("Authorization", "").replace("Bearer ", "")
        # Constant-time comparison so the key cannot be probed via timing.
        if not hmac.compare_digest((key or "").strip().encode("utf-8"), QUIZ_API_KEY.encode("utf-8")):
            return JSONResponse(
                status_code=429,
                content={"code": 429, "error": {"type": "RATE_LIMIT", "reason": "missing_or_invalid_api_key"}},
            )

    if not req.userAnswers:
        return JSONResponse(
            status_code=422,
            content={"code": 422, "error": {"type": "INVALID_GENERATION", "reason": "user_answers_required"}},
        )

    language = (req.language or "en").strip().lower()
    if language not in ("en", "zh", "中文"):
        language = "en"

    # Convert the answers (including short-answer rubrics) to plain dicts.
    user_answers = []
    for ans in req.userAnswers:
        rubric_dict = None
        if ans.rubric:
            r = ans.rubric
            rubric_dict = {
                "corePoints": r.corePoints,
                "acceptableSynonyms": r.acceptableSynonyms,
                "commonErrors": r.commonErrors,
                "answerScope": r.answerScope.model_dump() if r.answerScope else None,
            }
        user_answers.append({
            "questionContent": ans.questionContent or "",
            "questionId": ans.questionId,
            "userChoiceLabel": ans.userChoiceLabel,
            "correctChoiceLabel": ans.correctChoiceLabel,
            "isCorrect": ans.isCorrect,
            "userTextAnswer": ans.userTextAnswer,
            "referenceAnswer": ans.referenceAnswer,
            "rubric": rubric_dict,
        })

    t0 = time.time()
    try:
        grading_result, tokens_used = grade_quiz_answers(
            user_answers=user_answers,
            language=language,
        )
    except Exception as e:
        print(f"[quiz_grade] error: {repr(e)}")
        return JSONResponse(
            status_code=500,
            content={"code": 500, "error": {"type": "MODEL_ERROR", "reason": "grading_failed"}},
        )
    latency_ms = (time.time() - t0) * 1000.0

    meta = {
        "model": DEFAULT_MODEL,
        "model_version": "",
        "prompt_version": "quiz_grade_v1",
        "temperature": 0.4,
        "tokens_used": tokens_used,
        "latency_ms": round(latency_ms, 2),
    }
    return {
        "data": grading_result,
        "meta": meta,
    }
def memoryline(user_id: str):
    """Return the review schedule for a user (currently fixed values).

    The session is touched, and therefore created if absent, but its
    contents do not influence the response.
    """
    cleaned_id = (user_id or "").strip()
    _get_session(cleaned_id)
    return {"next_review_label": "T+7", "progress_pct": 0.4}
def profile_status(user_id: str):
    """Report whether the user should be prompted to initialise their profile.

    ``need_init`` is true when the stored bio has at most 50 characters and
    the dismissal timestamp (``init_dismiss_until``) has passed.
    """
    cleaned_id = (user_id or "").strip()
    if not cleaned_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    sess = _get_session(cleaned_id)
    bio = (sess.get("profile_bio") or "").strip()
    bio_length = len(bio)
    need_init = bio_length <= 50 and (
        int(time.time()) >= int(sess.get("init_dismiss_until") or 0)
    )
    return {"need_init": need_init, "bio_length": bio_length}
def health():
    """Liveness probe: always reports an ``ok`` status."""
    payload = {"status": "ok"}
    return payload
# NOTE(review): no route decorator is visible in this view; presumably this is
# registered as a catch-all path route - confirm.
def spa_fallback(full_path: str):
    """Serve index.html for client-side routes of the single-page app.

    Paths under ``api/``, ``assets/`` and ``static/`` are never rewritten and
    answer 404; a missing frontend build answers 500.
    """
    reserved_prefixes = ("api/", "assets/", "static/")
    if full_path.startswith(reserved_prefixes):
        return JSONResponse({"detail": "Not Found"}, status_code=404)
    if not os.path.exists(WEB_INDEX):
        return JSONResponse({"detail": "web/build not found"}, status_code=500)
    return FileResponse(WEB_INDEX)