# api/clare_core.py
import os
import re
import math
from typing import List, Dict, Tuple, Optional
from docx import Document
from .config import (
client,
DEFAULT_MODEL,
EMBEDDING_MODEL,
DEFAULT_COURSE_TOPICS,
CLARE_SYSTEM_PROMPT,
LEARNING_MODE_INSTRUCTIONS,
)
# ----------------------------
# Tracing toggle (LangSmith)
# ----------------------------
# Default is OFF for speed and stability in HF cold-start environments.
ENABLE_TRACING = os.getenv("CLARE_ENABLE_TRACING", "0").strip() == "1"
if ENABLE_TRACING:
from langsmith import traceable # type: ignore
from langsmith.run_helpers import set_run_metadata # type: ignore
try:
# Available in newer langsmith versions
from langsmith.run_helpers import get_current_run_tree # type: ignore
except Exception:
get_current_run_tree = None # type: ignore
else:
# no-op decorators / funcs
def traceable(*args, **kwargs): # type: ignore
def _decorator(fn):
return fn
return _decorator
def set_run_metadata(**kwargs): # type: ignore
return None
get_current_run_tree = None # type: ignore
# ----------------------------
# Speed knobs (simple + stable)
# ----------------------------
MAX_HISTORY_TURNS = int(os.getenv("CLARE_MAX_HISTORY_TURNS", "10"))
MAX_RAG_CHARS_IN_PROMPT = int(os.getenv("CLARE_MAX_RAG_CHARS", "2000"))
DEFAULT_MAX_OUTPUT_TOKENS = int(os.getenv("CLARE_MAX_OUTPUT_TOKENS", "384"))
# Similarity knobs
ENABLE_EMBEDDING_SIM = os.getenv("CLARE_ENABLE_EMBEDDING_SIMILARITY", "0").strip() == "1"
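# All of the knobs above can be overridden per deployment through environment variables,
# e.g. (illustrative values, not recommendations):
#   CLARE_MAX_HISTORY_TURNS=6  CLARE_MAX_RAG_CHARS=1500  CLARE_MAX_OUTPUT_TOKENS=256
#   CLARE_ENABLE_EMBEDDING_SIMILARITY=1   # re-enable the slower embedding repeat-question check
#   CLARE_ENABLE_TRACING=1                # re-enable LangSmith tracing (see toggle above)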
# ---------- Syllabus parsing ----------
def parse_syllabus_docx(file_path: str, max_lines: int = 15) -> List[str]:
topics: List[str] = []
try:
doc = Document(file_path)
for para in doc.paragraphs:
text = para.text.strip()
if not text:
continue
topics.append(text)
if len(topics) >= max_lines:
break
except Exception as e:
topics = [f"[Error parsing syllabus: {e}]"]
return topics
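# Illustrative usage (hypothetical path): returns the first `max_lines` non-empty paragraph
# texts, or a single "[Error parsing syllabus: ...]" entry if python-docx fails to open the file.
#
#     topics = parse_syllabus_docx("/tmp/syllabus.docx", max_lines=10)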
# ---------- Simple "weakness" detection ----------
WEAKNESS_KEYWORDS = [
"don't understand",
"do not understand",
"not understand",
"not sure",
"confused",
"hard to",
"difficult",
"struggle",
"不会",
"不懂",
"看不懂",
"搞不清",
"很难",
]
# ---------- Simple "mastery" detection ----------
MASTERY_KEYWORDS = [
"got it",
"makes sense",
"now i see",
"i see",
"understand now",
"clear now",
"easy",
"no problem",
"没问题",
"懂了",
"明白了",
"清楚了",
]
def update_weaknesses_from_message(message: str, weaknesses: List[str]) -> List[str]:
lower_msg = (message or "").lower()
if any(k in lower_msg for k in WEAKNESS_KEYWORDS):
weaknesses = weaknesses or []
weaknesses.append(message)
return weaknesses
def update_cognitive_state_from_message(
message: str,
state: Optional[Dict[str, int]],
) -> Dict[str, int]:
if state is None:
state = {"confusion": 0, "mastery": 0}
lower_msg = (message or "").lower()
if any(k in lower_msg for k in WEAKNESS_KEYWORDS):
state["confusion"] = state.get("confusion", 0) + 1
if any(k in lower_msg for k in MASTERY_KEYWORDS):
state["mastery"] = state.get("mastery", 0) + 1
return state
def describe_cognitive_state(state: Optional[Dict[str, int]]) -> str:
if not state:
return "unknown"
confusion = state.get("confusion", 0)
mastery = state.get("mastery", 0)
if confusion >= 2 and confusion >= mastery + 1:
return "student shows signs of HIGH cognitive load (often confused)."
elif mastery >= 2 and mastery >= confusion + 1:
return "student seems COMFORTABLE; material may be slightly easy."
else:
return "mixed or uncertain cognitive state."
# ---------- Session Memory ----------
def build_session_memory_summary(
history: List[Tuple[str, str]],
weaknesses: Optional[List[str]],
cognitive_state: Optional[Dict[str, int]],
max_questions: int = 3,
max_weaknesses: int = 2,
) -> str:
parts: List[str] = []
if history:
recent_qs = [u for (u, _a) in history[-max_questions:]]
trimmed_qs = []
for q in recent_qs:
q = (q or "").strip()
if len(q) > 120:
q = q[:117] + "..."
if q:
trimmed_qs.append(q)
if trimmed_qs:
parts.append("Recent student questions: " + " | ".join(trimmed_qs))
if weaknesses:
recent_weak = weaknesses[-max_weaknesses:]
trimmed_weak = []
for w in recent_weak:
w = (w or "").strip()
if len(w) > 120:
w = w[:117] + "..."
if w:
trimmed_weak.append(w)
if trimmed_weak:
parts.append("Recent difficulties: " + " | ".join(trimmed_weak))
if cognitive_state:
parts.append("Cognitive state: " + describe_cognitive_state(cognitive_state))
if not parts:
return "No prior session memory. Start with a short explanation and ask a quick check-up question."
return " | ".join(parts)
# ---------- Language detection ----------
def detect_language(message: str, preference: str) -> str:
if preference in ("English", "中文"):
return preference
if re.search(r"[\u4e00-\u9fff]", message or ""):
return "中文"
return "English"
def build_error_message(e: Exception, lang: str, op: str = "chat") -> str:
if lang == "中文":
prefix = {
"chat": "抱歉,刚刚在和模型对话时出现了一点问题。",
"quiz": "抱歉,刚刚在生成测验题目时出现了一点问题。",
"summary": "抱歉,刚刚在生成总结时出现了一点问题。",
}.get(op, "抱歉,刚刚出现了一点问题。")
return prefix + " 请稍后再试一次,或者换个问法试试。"
prefix_en = {
"chat": "Sorry, I ran into a problem while talking to the model.",
"quiz": "Sorry, there was a problem while generating the quiz.",
"summary": "Sorry, there was a problem while generating the summary.",
}.get(op, "Sorry, something went wrong just now.")
return prefix_en + " Please try again in a moment or rephrase your request."
# ---------- Session status display ----------
def render_session_status(
learning_mode: str,
weaknesses: Optional[List[str]],
cognitive_state: Optional[Dict[str, int]],
) -> str:
lines: List[str] = []
lines.append("### Session status\n")
lines.append(f"- Learning mode: **{learning_mode}**")
lines.append(f"- Cognitive state: {describe_cognitive_state(cognitive_state)}")
if weaknesses:
lines.append("- Recent difficulties (last 3):")
for w in weaknesses[-3:]:
lines.append(f" - {w}")
else:
lines.append("- Recent difficulties: *(none yet)*")
return "\n".join(lines)
# ---------- Similarity helpers ----------
def _normalize_text(text: str) -> str:
text = (text or "").lower().strip()
text = re.sub(r"[^\w\s]", " ", text)
text = re.sub(r"\s+", " ", text)
return text
def _jaccard_similarity(a: str, b: str) -> float:
tokens_a = set(a.split())
tokens_b = set(b.split())
if not tokens_a or not tokens_b:
return 0.0
return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)
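# Worked example of the fast path: _normalize_text("What's a JOIN?") -> "what s a join" and
# _normalize_text("what is a join") -> "what is a join"; the two token sets share 3 of 5
# distinct tokens, so _jaccard_similarity(...) == 0.6 (just below the 0.65 default threshold).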
def cosine_similarity(a: List[float], b: List[float]) -> float:
if not a or not b or len(a) != len(b):
return 0.0
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(y * y for y in b))
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
@traceable(run_type="embedding", name="get_embedding")
def get_embedding(text: str) -> Optional[List[float]]:
try:
resp = client.embeddings.create(
model=EMBEDDING_MODEL,
input=[text],
)
return resp.data[0].embedding
except Exception as e:
print(f"[Embedding error] {repr(e)}")
return None
def find_similar_past_question(
message: str,
history: List[Tuple[str, str]],
jaccard_threshold: float = 0.65,
embedding_threshold: float = 0.85,
max_turns_to_check: int = 6,
) -> Optional[Tuple[str, str, float]]:
"""
Return (past_question, past_answer, similarity) for the most similar recent turn, or None.
Fast path:
- Jaccard similarity on normalized text over up to max_turns_to_check recent turns.
Optional path (disabled by default for speed/stability):
- Embedding-based similarity, enabled with CLARE_ENABLE_EMBEDDING_SIMILARITY=1.
"""
norm_msg = _normalize_text(message)
if not norm_msg:
return None
best_sim_j = 0.0
best_pair_j: Optional[Tuple[str, str]] = None
checked = 0
for user_q, assistant_a in reversed(history):
checked += 1
if checked > max_turns_to_check:
break
norm_hist_q = _normalize_text(user_q)
if not norm_hist_q:
continue
if norm_msg == norm_hist_q:
return user_q, assistant_a, 1.0
sim_j = _jaccard_similarity(norm_msg, norm_hist_q)
if sim_j > best_sim_j:
best_sim_j = sim_j
best_pair_j = (user_q, assistant_a)
if best_pair_j and best_sim_j >= jaccard_threshold:
return best_pair_j[0], best_pair_j[1], best_sim_j
# Optional: embedding similarity (OFF by default)
if not ENABLE_EMBEDDING_SIM:
return None
if not history:
return None
msg_emb = get_embedding(message)
if msg_emb is None:
return None
best_sim_e = 0.0
best_pair_e: Optional[Tuple[str, str]] = None
checked = 0
for user_q, assistant_a in reversed(history):
checked += 1
if checked > max_turns_to_check:
break
hist_emb = get_embedding(user_q)
if hist_emb is None:
continue
sim_e = cosine_similarity(msg_emb, hist_emb)
if sim_e > best_sim_e:
best_sim_e = sim_e
best_pair_e = (user_q, assistant_a)
if best_pair_e and best_sim_e >= embedding_threshold:
return best_pair_e[0], best_pair_e[1], best_sim_e
return None
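# Illustrative call (hypothetical history): a near-verbatim repeat is caught by the
# exact-match shortcut after normalization, without any embedding calls.
#
#     find_similar_past_question(
#         "what is a foreign key?",
#         [("What is a foreign key?", "A foreign key references a row in another table.")],
#     )
#     # -> ("What is a foreign key?", "A foreign key references ...", 1.0)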
@traceable(run_type="llm", name="safe_chat_completion")
def safe_chat_completion(
model_name: str,
messages: List[Dict[str, str]],
lang: str,
op: str = "chat",
temperature: float = 0.5,
max_tokens: Optional[int] = None,
) -> str:
preferred_model = model_name_or_default(model_name)
last_error: Optional[Exception] = None
max_tokens = int(max_tokens or DEFAULT_MAX_OUTPUT_TOKENS)
for attempt in range(2):
current_model = preferred_model if attempt == 0 else DEFAULT_MODEL
try:
resp = client.chat.completions.create(
model=current_model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
timeout=20,
)
return resp.choices[0].message.content or ""
except Exception as e:
print(
f"[safe_chat_completion][{op}] attempt {attempt+1} failed with model={current_model}: {repr(e)}"
)
last_error = e
if current_model == DEFAULT_MODEL or attempt == 1:
break
return build_error_message(last_error or Exception("unknown error"), lang, op)
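# Minimal illustrative call (the model name is hypothetical): the first attempt uses the
# preferred model, the single retry falls back to DEFAULT_MODEL, and if both fail the caller
# receives a localized apology string from build_error_message instead of an exception.
#
#     reply = safe_chat_completion(
#         model_name="gpt-4o-mini",
#         messages=[{"role": "user", "content": "Explain normalization briefly."}],
#         lang="English",
#         op="chat",
#     )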
def build_messages(
user_message: str,
history: List[Tuple[str, str]],
language_preference: str,
learning_mode: str,
doc_type: str,
course_outline: Optional[List[str]],
weaknesses: Optional[List[str]],
cognitive_state: Optional[Dict[str, int]],
rag_context: Optional[str] = None,
) -> List[Dict[str, str]]:
messages: List[Dict[str, str]] = [{"role": "system", "content": CLARE_SYSTEM_PROMPT}]
if learning_mode in LEARNING_MODE_INSTRUCTIONS:
mode_instruction = LEARNING_MODE_INSTRUCTIONS[learning_mode]
messages.append(
{
"role": "system",
"content": f"Current learning mode: {learning_mode}. {mode_instruction}",
}
)
topics = course_outline if course_outline else DEFAULT_COURSE_TOPICS
topics_text = " | ".join(topics)
messages.append(
{
"role": "system",
"content": (
"Here is the course syllabus context. Use this to stay aligned "
"with the course topics when answering: " + topics_text
),
}
)
if doc_type and doc_type != "Syllabus":
messages.append(
{
"role": "system",
"content": f"The student also uploaded a {doc_type} document as supporting material.",
}
)
if weaknesses:
weak_text = " | ".join((weaknesses or [])[-4:])
messages.append(
{
"role": "system",
"content": "Student struggles (recent). Be extra clear on these: " + weak_text,
}
)
if cognitive_state:
confusion = cognitive_state.get("confusion", 0)
mastery = cognitive_state.get("mastery", 0)
if confusion >= 2 and confusion >= mastery + 1:
messages.append(
{
"role": "system",
"content": "Student under HIGH cognitive load. Use simpler language and shorter steps.",
}
)
elif mastery >= 2 and mastery >= confusion + 1:
messages.append(
{
"role": "system",
"content": "Student comfortable. You may go slightly deeper and add a follow-up question.",
}
)
if language_preference == "English":
messages.append({"role": "system", "content": "Please answer in English."})
elif language_preference == "中文":
messages.append({"role": "system", "content": "请用中文回答学生的问题。"})
session_memory_text = build_session_memory_summary(
history=history,
weaknesses=weaknesses,
cognitive_state=cognitive_state,
)
messages.append({"role": "system", "content": "Session memory: " + session_memory_text})
if rag_context:
rc = (rag_context or "")[:MAX_RAG_CHARS_IN_PROMPT]
messages.append(
{
"role": "system",
"content": "Relevant excerpts (use as primary grounding):\n\n" + rc,
}
)
trimmed_history = history[-MAX_HISTORY_TURNS:] if history else []
for user, assistant in trimmed_history:
messages.append({"role": "user", "content": user})
if assistant is not None:
messages.append({"role": "assistant", "content": assistant})
messages.append({"role": "user", "content": user_message})
return messages
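# Resulting prompt layout, in order: CLARE_SYSTEM_PROMPT, optional learning-mode instruction,
# syllabus topics, optional doc-type / struggles / cognitive-load notes, language directive,
# session memory, optional RAG excerpts (capped at MAX_RAG_CHARS_IN_PROMPT), the last
# MAX_HISTORY_TURNS turns of history, and finally the new user message.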
def model_name_or_default(x: str) -> str:
return (x or "").strip() or DEFAULT_MODEL
def get_langsmith_run_id() -> Optional[str]:
"""
Fetch the current run_id from the traceable context (used to attach UI feedback to the same run).
Return None if tracing is disabled or the environment does not support it.
"""
if not ENABLE_TRACING:
return None
try:
if get_current_run_tree is None:
return None
rt = get_current_run_tree()
if not rt:
return None
rid = getattr(rt, "id", None)
if not rid:
return None
return str(rid)
except Exception as e:
print(f"[LangSmith get run id error] {repr(e)}")
return None
@traceable(run_type="chain", name="chat_with_clare")
def chat_with_clare(
message: str,
history: List[Tuple[str, str]],
model_name: str,
language_preference: str,
learning_mode: str,
doc_type: str,
course_outline: Optional[List[str]],
weaknesses: Optional[List[str]],
cognitive_state: Optional[Dict[str, int]],
rag_context: Optional[str] = None,
) -> Tuple[str, List[Tuple[str, str]], Optional[str]]:
# Avoid any tracing overhead when disabled (set_run_metadata is a no-op in that case).
try:
set_run_metadata(
learning_mode=learning_mode,
language_preference=language_preference,
doc_type=doc_type,
)
except Exception as e:
# Safe even if tracing is enabled but misconfigured.
print(f"[LangSmith metadata error in chat_with_clare] {repr(e)}")
messages = build_messages(
user_message=message,
history=history,
language_preference=language_preference,
learning_mode=learning_mode,
doc_type=doc_type,
course_outline=course_outline,
weaknesses=weaknesses,
cognitive_state=cognitive_state,
rag_context=rag_context,
)
answer = safe_chat_completion(
model_name=model_name,
messages=messages,
lang=language_preference,
op="chat",
temperature=0.5,
max_tokens=DEFAULT_MAX_OUTPUT_TOKENS,
)
# Get the run_id AFTER the run exists (works when tracing is enabled; otherwise None).
run_id = get_langsmith_run_id()
history = history + [(message, answer)]
return answer, history, run_id
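# Minimal illustrative call (all values hypothetical; learning_mode must match a key in
# LEARNING_MODE_INSTRUCTIONS for a mode instruction to be injected):
#
#     answer, history, run_id = chat_with_clare(
#         message="What is third normal form?",
#         history=[],
#         model_name="",                 # empty -> DEFAULT_MODEL
#         language_preference="English",
#         learning_mode="Guided",
#         doc_type="Syllabus",
#         course_outline=None,           # falls back to DEFAULT_COURSE_TOPICS
#         weaknesses=None,
#         cognitive_state=None,
#     )
#     # run_id is None unless CLARE_ENABLE_TRACING=1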
def export_conversation(
history: List[Tuple[str, str]],
course_outline: List[str],
learning_mode_val: str,
weaknesses: List[str],
cognitive_state: Optional[Dict[str, int]],
) -> str:
lines: List[str] = []
lines.append("# Clare – Conversation Export\n")
lines.append(f"- Learning mode: **{learning_mode_val}**\n")
lines.append("- Course topics (short): " + "; ".join(course_outline[:5]) + "\n")
lines.append(f"- Cognitive state snapshot: {describe_cognitive_state(cognitive_state)}\n")
if weaknesses:
lines.append("- Observed student difficulties:\n")
for w in weaknesses[-5:]:
lines.append(f" - {w}\n")
lines.append("\n---\n\n")
for user, assistant in history:
lines.append(f"**Student:** {user}\n\n")
lines.append(f"**Clare:** {assistant}\n\n")
lines.append("---\n\n")
return "".join(lines)
@traceable(run_type="chain", name="summarize_conversation")
def summarize_conversation(
history: List[Tuple[str, str]],
course_outline: List[str],
weaknesses: List[str],
cognitive_state: Optional[Dict[str, int]],
model_name: str,
language_preference: str,
) -> str:
conversation_text = ""
for user, assistant in history[-10:]:
conversation_text += f"Student: {user}\nClare: {assistant}\n"
topics_text = "; ".join(course_outline[:8])
weakness_text = "; ".join(weaknesses[-5:]) if weaknesses else "N/A"
cog_text = describe_cognitive_state(cognitive_state)
messages = [
{"role": "system", "content": CLARE_SYSTEM_PROMPT},
{"role": "system", "content": "Produce a concept-only summary. Use bullet points. No off-topic text."},
{"role": "system", "content": f"Course topics: {topics_text}"},
{"role": "system", "content": f"Student difficulties: {weakness_text}"},
{"role": "system", "content": f"Cognitive state: {cog_text}"},
{"role": "user", "content": "Conversation:\n\n" + conversation_text},
]
if language_preference == "中文":
messages.append({"role": "system", "content": "请用中文输出要点总结(bullet points)。"})
summary_text = safe_chat_completion(
model_name=model_name,
messages=messages,
lang=language_preference,
op="summary",
temperature=0.4,
max_tokens=DEFAULT_MAX_OUTPUT_TOKENS,
)
return summary_text
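# Note: the summary prompt only sees the last 10 turns plus the tracked difficulties and
# cognitive state, so it stays cheap for long sessions; the output language follows
# language_preference (中文 adds a Chinese bullet-point directive).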