| from fastapi import FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| import time |
| import os |
| import threading |
| import csv |
| import re |
| from datetime import datetime |
| from typing import List, Optional, Dict, Any, TypedDict |
| from pydantic import BaseModel, Field |
| from dotenv import load_dotenv |
|
|
| |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage |
| from langchain_core.prompts import ChatPromptTemplate |
| from langgraph.graph import StateGraph, END |
|
|
# Load environment variables (e.g. GEMINI_API_KEY, read further down via
# os.environ) before the application object is created.
load_dotenv()


app = FastAPI(title="Educational Sentiment Chatbot API")


# Accept cross-origin requests from any origin, with any method and any header.
# NOTE(review): FastAPI's CORS documentation states that wildcard origins,
# methods and headers must not be combined with allow_credentials=True.
# Confirm whether the frontend really sends credentialed requests; if it does,
# list the allowed origins explicitly, otherwise set allow_credentials=False.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# Emotion-classifier state, filled in by load_distilroberta().
# model_status starts as "loading" and becomes "ready" or "failed";
# model_error holds the failure message when loading fails.
classifier = None
model_status = "loading"
model_error = None


# NER-pipeline state, filled in by load_ner_model(); ner_status uses the same
# "loading" / "ready" / "failed" values and ner_error holds the failure message.
ner_classifier = None
ner_status = "loading"
ner_error = None
|
|
def load_distilroberta():
    """Load the emotion text-classification pipeline into module globals.

    On success ``classifier`` holds the pipeline and ``model_status`` becomes
    ``"ready"``. On any exception ``model_error`` receives the message and
    ``model_status`` becomes ``"failed"``; the exception is not re-raised.
    """
    global classifier, model_status, model_error
    hub_model_id = "j-hartmann/emotion-english-distilroberta-base"
    try:
        print("Loading j-hartmann/emotion-english-distilroberta-base model...")

        # Imported here rather than at module import time.
        from transformers import pipeline

        # top_k=None is passed through unchanged; the endpoint later reads a
        # list of label/score items from the pipeline's first result.
        classifier = pipeline("text-classification", model=hub_model_id, top_k=None)
        model_status = "ready"
        print("DistilRoBERTa model loaded successfully!")
    except Exception as exc:
        model_error = str(exc)
        model_status = "failed"
        print(f"Error loading DistilRoBERTa model: {model_error}")
|
|
def load_ner_model():
    """Load the NER pipeline used for PII scrubbing into module globals.

    On success ``ner_classifier`` holds the pipeline and ``ner_status`` becomes
    ``"ready"``. On any exception ``ner_error`` receives the message and
    ``ner_status`` becomes ``"failed"``; the exception is not re-raised.
    """
    global ner_classifier, ner_status, ner_error
    hub_model_id = "dslim/distilbert-NER"
    try:
        print("Loading NER model (dslim/distilbert-NER)...")

        # Imported here rather than at module import time.
        from transformers import pipeline

        ner_classifier = pipeline("ner", model=hub_model_id, aggregation_strategy="simple")
        ner_status = "ready"
        print("NER model loaded successfully!")
    except Exception as exc:
        ner_error = str(exc)
        ner_status = "failed"
        print(f"Error loading NER model: {ner_error}")
|
|
# Ordered (compiled pattern, placeholder) pairs applied by scrub_pii before the
# NER pass. Compiled once at import time instead of on every call.
_PII_PATTERNS = (
    # Email: the domain must end on a non-dot character so that a
    # sentence-ending period is left in the text instead of being swallowed.
    (re.compile(r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9.-]*[a-zA-Z0-9-]'), '[EMAIL]'),
    # Phone: a lookbehind replaces the leading \b because \b cannot anchor in
    # front of "+" or "(", which left those characters behind after scrubbing.
    (re.compile(r'(?<!\w)(?:\+?\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'), '[PHONE]'),
    (re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'), '[IP_ADDRESS]'),
    (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '[SSN]'),
)


def scrub_pii(text: str) -> str:
    """Replace personally identifying substrings in *text* with placeholders.

    Regex passes replace emails, phone numbers, IPv4-style addresses and
    SSN-style numbers. If the NER pipeline has been loaded, person, location
    and organisation entities it reports are replaced with ``[PER]``,
    ``[LOC]`` and ``[ORG]``.

    Args:
        text: The text to scrub. Falsy values are returned unchanged.

    Returns:
        The scrubbed text. A failure in the NER pass is printed and the
        regex-scrubbed text is returned as-is.
    """
    if not text:
        return text

    for pattern, placeholder in _PII_PATTERNS:
        text = pattern.sub(placeholder, text)

    tagger = ner_classifier
    if tagger is None:
        # NER model not loaded (yet, or at all): regex scrubbing only.
        return text

    try:
        # Replace right-to-left so earlier character offsets stay valid.
        entities = sorted(tagger(text), key=lambda ent: ent["start"], reverse=True)
        for ent in entities:
            label = ent["entity_group"]
            if label in ("PER", "LOC", "ORG"):
                text = text[:ent["start"]] + f"[{label}]" + text[ent["end"]:]
    except Exception as e:
        # Best effort: a failing NER pass must not block the request.
        print(f"NER PII scrub error: {e}")

    return text
|
|
@app.on_event("startup")
def startup_event():
    """Start both model loaders on daemon threads at application startup."""
    for loader in (load_distilroberta, load_ner_model):
        threading.Thread(target=loader, daemon=True).start()
|
|
| |
# One prior conversation turn supplied by the client.
class ChatMessage(BaseModel):
    # The flows treat "user" as a human turn and any other value as an AI turn.
    role: str
    content: str
|
|
# Request body for POST /api/chat.
class ChatRequest(BaseModel):
    # The new user message; the endpoint PII-scrubs it before use.
    message: str
    # Optional key; the endpoint falls back to the GEMINI_API_KEY environment variable.
    gemini_api_key: Optional[str] = None
    # Optional replacement for each flow's built-in system prompt.
    system_prompt: Optional[str] = None
    # Prior turns, kept separately per flow (A, B, C, D).
    history_a: Optional[List[ChatMessage]] = None
    history_b: Optional[List[ChatMessage]] = None
    history_c: Optional[List[ChatMessage]] = None
    history_d: Optional[List[ChatMessage]] = None
    # "all" or one of "a", "b", "c", "d"; the endpoint lower-cases it.
    selected_option: Optional[str] = "all"
|
|
# A single label/score pair taken from the emotion classifier's output.
class EmotionScore(BaseModel):
    label: str
    score: float
|
|
# Sentiment result reported for flow A: the detected label plus its explanation.
class SentimentDetailsA(BaseModel):
    detected_sentiment: str
    explanation: str
|
|
# Sentiment result reported for flows B and D.
class SentimentDetailsB(BaseModel):
    mapped_sentiment: str
    # Flow B always sends an empty list here; flow D sends the classifier scores.
    raw_emotions: List[EmotionScore]
|
|
# Response body for POST /api/chat. Every field is optional: the fields of a
# flow that was not selected stay None.
class ChatResponse(BaseModel):
    # Flow A (LangGraph: detect sentiment, then respond).
    sentiment_a: Optional[SentimentDetailsA] = None
    response_a: Optional[str] = None
    # Latencies are in seconds; the endpoint rounds them to 3 decimals.
    latency_a: Optional[float] = None
    prompt_context_a: Optional[str] = None


    # Flow B (single Gemini call returning sentiment and reply together).
    sentiment_b: Optional[SentimentDetailsB] = None
    response_b: Optional[str] = None
    latency_b: Optional[float] = None
    prompt_context_b: Optional[str] = None

    # Flow C (classifier score distribution passed to Gemini); no sentiment field.
    response_c: Optional[str] = None
    latency_c: Optional[float] = None
    prompt_context_c: Optional[str] = None


    # Flow D (classifier scores mapped to one sentiment, then Gemini).
    sentiment_d: Optional[SentimentDetailsB] = None
    response_d: Optional[str] = None
    latency_d: Optional[float] = None
    prompt_context_d: Optional[str] = None


    # Estimated input + output tokens per flow.
    tokens_a: Optional[int] = None
    tokens_b: Optional[int] = None
    tokens_c: Optional[int] = None
    tokens_d: Optional[int] = None
|
|
| |
class AgentState(TypedDict):
    """State passed between the LangGraph nodes of flow A."""

    message: str
    system_prompt: str
    # Filled in by the sentiment-detection node.
    sentiment: str
    explanation: str
    # Filled in by the response-generation node.
    response: str
    # Estimated token counts, accumulated across both nodes.
    input_tokens: int
    output_tokens: int
    history: List[ChatMessage]
|
|
| |
# Structured-output schema for flow A's sentiment-detection call.
class SentimentAnalysis(BaseModel):
    detected_sentiment: str = Field(description="Must be strictly one of: 'confusion', 'frustration', 'boredom', 'confidence', 'sadness', or 'neutral'.")
    explanation: str = Field(description="An extremely concise, single-sentence explanation of why this sentiment was chosen to minimize tokens.")
|
|
# Combined sentiment + reply schema.
# NOTE(review): not referenced anywhere else in the visible code (flow B parses
# raw JSON instead) — confirm whether it is still needed.
class SentimentAndResponseB(BaseModel):
    detected_sentiment: str = Field(description="Must be strictly one of: 'confusion', 'frustration', 'boredom', 'confidence', 'sadness', or 'neutral'.")
    response: str = Field(description="Your Socratic tutor response. Adjust tone based on the detected sentiment. Keep it under 2 brief paragraphs.")
|
|
| |
def estimate_tokens(text: str) -> int:
    """Estimate tokens as one per four characters, with a minimum of 1."""
    quarter_length = len(text) // 4
    return quarter_length if quarter_length > 1 else 1
|
|
| |
def calculate_cost(input_tokens: int, output_tokens: int) -> float:
    """Return the estimated cost of a call from its token counts.

    Uses a rate of 0.075 per million input tokens and 0.30 per million
    output tokens.
    """
    per_million = 1_000_000.0
    input_rate, output_rate = 0.075, 0.30
    return (input_tokens / per_million) * input_rate + (output_tokens / per_million) * output_rate
|
|
| |
# Both Markdown logs live in the same directory as this module.
_LOG_DIR = os.path.dirname(os.path.abspath(__file__))
MD_FILE = os.path.join(_LOG_DIR, "sentiment_log.md")
MD_FILE_B = os.path.join(_LOG_DIR, "sentiment_log_b.md")
|
|
def log_to_md(question, sentiment_a, sentiment_b, sentiment_d, latency_a, latency_b, latency_c, latency_d, cost_a, cost_b, cost_c, cost_d, tokens_in_a, tokens_out_a, tokens_in_b, tokens_out_b, tokens_in_c, tokens_out_c, tokens_in_d, tokens_out_d, answer_a, answer_b, answer_c, answer_d, selected_option="all"):
    """Append one query's metrics table and responses to a Markdown log file.

    When ``selected_option`` is ``"b"`` the entry goes to ``MD_FILE_B`` and
    contains only the Option B column; otherwise it goes to ``MD_FILE`` and
    contains all four options. A title and description are written first when
    the target file does not exist yet.

    The whole entry is rendered before the file is opened, so a formatting
    error cannot leave a half-written entry behind. Sentiment values are
    HTML-escaped because they are placed inside raw HTML table cells, and
    whitespace in the question is collapsed so the ``##`` heading stays on a
    single line.

    Any exception is printed and swallowed: logging is best effort and must
    not fail the request.
    """
    import html  # function-scope import: html is not among the module-level imports

    only_b = selected_option == "b"
    target_file = MD_FILE_B if only_b else MD_FILE

    def sentiment_cell(value):
        # Sentiments can originate from model output; escape before embedding in HTML.
        return f"<code>{html.escape(str(value))}</code>"

    def table_row(label, cells):
        return [
            " <tr>\n",
            f" <td><strong>{label}</strong></td>\n",
            *(f" <td>{cell}</td>\n" for cell in cells),
            " </tr>\n",
        ]

    # (column title, sentiment cell, latency, cost, tokens in, tokens out, section name, answer)
    columns = [
        ("Option A (Gemini 3.1 Flash Lite Double-Pass)", sentiment_cell(sentiment_a), latency_a, cost_a, tokens_in_a, tokens_out_a, "Option A", answer_a),
        ("Option B (Gemini Single-Pass)", sentiment_cell(sentiment_b), latency_b, cost_b, tokens_in_b, tokens_out_b, "Option B", answer_b),
        ("Option C (DistilRoBERTa Distribution + Gemini)", "<code>Distribution Context</code>", latency_c, cost_c, tokens_in_c, tokens_out_c, "Option C", answer_c),
        ("Option D (DistilRoBERTa Classifier + Gemini)", sentiment_cell(sentiment_d), latency_d, cost_d, tokens_in_d, tokens_out_d, "Option D", answer_d),
    ]
    if only_b:
        columns = [columns[1]]
        preamble = (
            "# Sentiment Analysis Option B Log\n\n"
            "This file tracks Option B (Gemini Single-Pass) user queries, detected sentiments, latencies, estimated costs, and responses.\n\n"
        )
    else:
        preamble = (
            "# Sentiment Analysis & Response Comparison Log\n\n"
            "This file tracks and compares user queries, detected sentiments, latencies, estimated costs, and responses across all options.\n\n"
        )

    try:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # A newline inside the question would end the Markdown heading early.
        heading_query = " ".join(str(question).split())

        parts = [
            f"## [{timestamp}] Query: \"{heading_query}\"\n\n",
            "<table>\n",
            " <thead>\n",
            " <tr>\n",
            " <th align=\"left\">Metric</th>\n",
        ]
        parts.extend(f" <th align=\"left\">{col[0]}</th>\n" for col in columns)
        parts += [" </tr>\n", " </thead>\n", " <tbody>\n"]
        parts += table_row("Detected Sentiment", [col[1] for col in columns])
        parts += table_row("Latency", [f"{round(col[2], 3)}s" for col in columns])
        parts += table_row("Estimated Cost", [f"<code>${col[3]:.7f}</code>" for col in columns])
        parts += table_row(
            "Tokens Used",
            [f"{col[4] + col[5]} ({col[4]} in / {col[5]} out)" for col in columns],
        )
        parts += [" </tbody>\n", "</table>\n\n"]
        for col in columns:
            parts += [f"### {col[6]} Response\n", f"{col[7]}\n\n"]
        parts.append("---\n\n")
        entry = "".join(parts)

        is_new_file = not os.path.exists(target_file)
        with open(target_file, mode="a", encoding="utf-8") as f:
            if is_new_file:
                f.write(preamble)
            f.write(entry)
    except Exception as e:
        print(f"Error writing to MD log: {e}")
|
|
| |
def get_text_content(content: Any) -> str:
    """Flatten a message ``content`` value into plain text.

    Strings are returned unchanged. For a list, string parts and the ``text``
    of dict parts whose ``type`` is ``"text"`` are concatenated; other parts
    are ignored. Any other value is converted with ``str()``.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    def text_pieces():
        for chunk in content:
            if isinstance(chunk, str):
                yield chunk
            elif isinstance(chunk, dict) and chunk.get("type") == "text":
                yield chunk.get("text", "")

    return "".join(text_pieces())
|
|
| |
def map_distilroberta_emotions(raw_emotions: List[Any]) -> str:
    """Map classifier emotion scores to one tutoring sentiment.

    Each sentiment is scored as a weighted sum of the emotion scores and the
    highest-scoring sentiment is returned.

    Args:
        raw_emotions: Items with a ``label`` and a ``score``, either as dicts
            or as objects with those attributes. Labels are lower-cased.

    Returns:
        One of ``"confusion"``, ``"frustration"``, ``"boredom"``,
        ``"confidence"`` or ``"sadness"``; ``"neutral"`` when no sentiment has
        a positive score (for example on empty input), instead of whichever
        key happens to win the tie.
    """
    emo_dict = {}
    for item in raw_emotions:
        if isinstance(item, dict):
            label = item.get("label", "").lower()
            score = float(item.get("score", 0.0))
        else:
            label = getattr(item, "label", "").lower()
            score = float(getattr(item, "score", 0.0))
        emo_dict[label] = score

    # sentiment -> ((emotion label, weight), ...)
    weights = {
        "confusion": (("surprise", 1.2), ("fear", 0.8)),
        "frustration": (("anger", 1.2), ("disgust", 0.8)),
        "boredom": (("neutral", 1.3), ("sadness", 0.2)),
        "confidence": (("joy", 1.2),),
        "sadness": (("sadness", 1.2),),
    }
    scores = {
        sentiment: sum(emo_dict.get(emotion, 0.0) * weight for emotion, weight in terms)
        for sentiment, terms in weights.items()
    }

    best = max(scores, key=scores.get)
    if scores[best] <= 0.0:
        # No signal at all: do not let dict ordering pick a sentiment.
        return "neutral"
    return best
|
|
| |
def run_flow_a_langgraph(message: str, system_prompt: Optional[str], api_key: str, history: Optional[List[ChatMessage]] = None):
    """Run flow A: a two-node LangGraph that detects sentiment, then replies.

    The first node asks the model for a structured ``SentimentAnalysis``; the
    second node builds a tone instruction from the detected sentiment and
    generates the tutor reply, replaying ``history`` before the new message.

    Returns:
        The final graph state, with ``sentiment``, ``explanation``,
        ``response`` and the estimated ``input_tokens`` / ``output_tokens``
        accumulated over both nodes.
    """
    llm = ChatGoogleGenerativeAI(
        model="gemini-3.1-flash-lite",
        google_api_key=api_key,
        temperature=0.0,
        max_tokens=300,
    )
    structured_llm = llm.with_structured_output(SentimentAnalysis)

    default_system = (
        "You are a concise, Socratic educational tutor. Your focus is strictly to teach. "
        "NEVER give the user the direct answer or solution. Instead, guide them, nudge them, and ask leading questions to help them figure it out. "
        "Adjust your behavior and tone based on the user's sentiment. Keep responses brief (max 5 sentences)."
    )
    tone_preamble = (
        "IMPORTANT: You are a Socratic tutor. NEVER directly state the answer, definition, or solution. "
        "Instead, nudge the user and guide them to find the answer themselves through questions. "
        "Be extremely concise and direct (strictly limit your response to max 5 sentences).\n"
    )
    # Sentiment-specific guidance appended to the preamble.
    tone_by_sentiment = {
        "confusion": "The user is confused. Give them a stronger, clearer hint to guide them, and ask a direct question to help them take the next step towards the answer without telling it to them.",
        "sadness": "The user is sad. Give them brief, warm, empathetic encouragement and practical tips to overcome it (like taking a micro-break or focusing on progress), and ask a gentle guiding question to continue.",
        "frustration": "The user is frustrated. Empathetically acknowledge their frustration, give them a helpful hint or alternative perspective, and ask a guiding question to help them work through it.",
        "boredom": "The user is bored. Suggest a completely different way to learn this concept (e.g., through a hands-on project, analogy, or challenge) to spark interest, and ask a guiding question to get them started.",
        "confidence": "The user is confident. Celebrate their success briefly, and offer a quick challenge or question to test their understanding.",
    }
    fallback_tone = "Ask a guiding question to nudge them towards the answer."

    def detect_sentiment_node(state: AgentState) -> dict:
        # Node 1: structured sentiment classification of the user message.
        prompt = ChatPromptTemplate.from_messages([
            ("system", "Analyze the user's educational query. Determine their emotional state. Classify it strictly as one of: 'confusion', 'frustration', 'boredom', 'confidence', 'sadness', or 'neutral'. Keep the explanation extremely short and concise (under 10 words)."),
            ("human", "{message}"),
        ])
        analysis = (prompt | structured_llm).invoke({"message": state["message"]})

        # Token usage is estimated from the prompt text; output is a flat 40.
        input_prompt = f"Analyze the user's educational query. Determine their emotional state. Classify it strictly as one of: 'confusion', 'frustration', 'boredom', 'confidence', 'sadness', or 'neutral'. {state['message']}"

        return {
            "sentiment": analysis.detected_sentiment.lower(),
            "explanation": analysis.explanation,
            "input_tokens": estimate_tokens(input_prompt),
            "output_tokens": 40,
        }

    def generate_response_node(state: AgentState) -> dict:
        # Node 2: generate the tutor reply with a sentiment-specific tone.
        custom_system = state.get("system_prompt") or default_system
        tone_instruction = tone_preamble + tone_by_sentiment.get(state["sentiment"], fallback_tone)
        prompt_context = f"{tone_instruction}\n\nUser Query: {state['message']}"

        messages = [SystemMessage(content=custom_system)]
        messages.extend(
            HumanMessage(content=turn.content) if turn.role == "user" else AIMessage(content=turn.content)
            for turn in (state.get("history") or [])
        )
        messages.append(HumanMessage(content=prompt_context))

        response_text = get_text_content(llm.invoke(messages).content)

        est_input = estimate_tokens(custom_system) + estimate_tokens(prompt_context)
        est_output = estimate_tokens(response_text)

        return {
            "response": response_text,
            "input_tokens": state.get("input_tokens", 0) + est_input,
            "output_tokens": state.get("output_tokens", 0) + est_output,
        }

    builder = StateGraph(AgentState)
    builder.add_node("detect_sentiment", detect_sentiment_node)
    builder.add_node("generate_response", generate_response_node)
    builder.set_entry_point("detect_sentiment")
    builder.add_edge("detect_sentiment", "generate_response")
    builder.add_edge("generate_response", END)

    initial_state = {
        "message": message,
        "system_prompt": system_prompt or "",
        "sentiment": "",
        "explanation": "",
        "response": "",
        "input_tokens": 0,
        "output_tokens": 0,
        "history": history or [],
    }

    return builder.compile().invoke(initial_state)
|
|
| |
def run_flow_b(message: str, system_prompt: Optional[str], api_key: str, history: Optional[List[ChatMessage]] = None):
    """Run flow B: one Gemini call that returns sentiment and reply as JSON.

    The model is asked for ``{"state": ..., "reply": ...}``. The raw text is
    stripped of a surrounding Markdown code fence if present, parsed, and
    validated: the payload must be a JSON object with a non-empty string
    ``reply``; ``state`` is normalised to a lower-cased string and defaults to
    ``"confusion"``. If parsing or validation fails, a canned guiding question
    is returned with state ``"confusion"``.

    Returns:
        ``(state, reply, prompt_context, estimated_input_tokens,
        estimated_output_tokens)``.
    """
    import json

    llm = ChatGoogleGenerativeAI(
        model="gemini-3.1-flash-lite",
        google_api_key=api_key,
        temperature=0.0,
        max_tokens=350,
        generation_config={"response_mime_type": "application/json"}
    )

    custom_system = system_prompt or "Socratic tutor. Never give direct answers. Guide using leading questions."

    tone_instruction = (
        "JSON: {\"state\": \"confusion|frustration|boredom|confidence\", \"reply\": \"string\"}\n"
        "Rules: Socratic reply (max 5 sentences, no direct solutions). Acknowledge sentiment (confusion/frustration/boredom/confidence) with natural, varied phrasing. NEVER repeat the same acknowledgment templates (e.g., 'I understand', 'It's normal', 'Understandable').\n"
        "- confusion: Acknowledge confusion + hint + guiding question.\n"
        "- frustration: Validate frustration + alternative view + guiding question.\n"
        "- boredom: Acknowledge boredom + analogy/challenge + guiding question.\n"
        "- confidence: Praise + challenge question."
    )

    messages = [SystemMessage(content=f"{custom_system}\n{tone_instruction}")]

    if history:
        for msg in history:
            if msg.role == "user":
                messages.append(HumanMessage(content=msg.content))
            else:
                messages.append(AIMessage(content=msg.content))

    messages.append(HumanMessage(content=message))

    res = llm.invoke(messages)
    raw_response = get_text_content(res.content)

    cleaned_json = raw_response.strip()
    # The model may wrap its JSON in a Markdown code fence (```json ... ```);
    # remove the fence and the optional language tag before parsing.
    if cleaned_json.startswith("```"):
        cleaned_json = cleaned_json.strip("`").strip()
        if cleaned_json[:4].lower() == "json":
            cleaned_json = cleaned_json[4:].strip()

    try:
        parsed = json.loads(cleaned_json)
        if not isinstance(parsed, dict):
            raise ValueError("expected a JSON object")

        reply_val = parsed.get("reply")
        if not isinstance(reply_val, str) or not reply_val.strip():
            raise ValueError("missing or empty 'reply'")

        # A null or non-string state would fail response-model validation later.
        state_raw = parsed.get("state")
        state_val = state_raw.strip().lower() if isinstance(state_raw, str) else ""
        if not state_val:
            state_val = "confusion"

        supabase_payload = {
            "state": state_val,
            "reply": reply_val,
            "query": message,
            "timestamp": datetime.now().isoformat()
        }
        print(f"[Supabase Prototype] Directly writing payload to tracking table: {json.dumps(supabase_payload)}")
    except Exception as e:
        print(f"Failed to parse LLM JSON response: {e}. Raw response: {raw_response}")
        state_val = "confusion"
        reply_val = "Let's take a look at this concept step by step. What do you think is the first part?"

    # Token counts are estimates from text length; history is not included.
    prompt_context = f"{custom_system}\n{tone_instruction}\nUser Query: {message}"
    est_in = estimate_tokens(prompt_context)
    est_out = estimate_tokens(raw_response)

    return state_val, reply_val, prompt_context, est_in, est_out
|
|
|
|
| |
def run_flow_c(message: str, system_prompt: Optional[str], api_key: str, raw_emotions: List[EmotionScore], history: Optional[List[ChatMessage]] = None):
    """Run flow C: pass the classifier's full score distribution to Gemini.

    The emotion scores are formatted as ``label: score`` pairs (3 decimals)
    inside the tone instruction, and the model is left to interpret them.

    Returns:
        ``(response_text, prompt_context, estimated_input_tokens,
        estimated_output_tokens)``.
    """
    llm = ChatGoogleGenerativeAI(
        model="gemini-3.1-flash-lite",
        google_api_key=api_key,
        temperature=0.0,
        max_tokens=300,
    )

    default_system = (
        "You are a concise, Socratic educational tutor. Your focus is strictly to teach. "
        "NEVER give the user the direct answer or solution. Instead, guide them, nudge them, and ask leading questions to help them figure it out. "
        "Adjust your behavior and tone based on the user's emotional state. Keep responses brief (max 5 sentences)."
    )
    custom_system = system_prompt or default_system

    emotion_context_str = ", ".join(f"{emotion.label}: {emotion.score:.3f}" for emotion in raw_emotions)

    tone_instruction = (
        "IMPORTANT: Socratic tutor. NEVER state answer/definition/solution. Nudge/guide using questions. "
        "Max 5 sentences.\n"
        f"User emotions: {emotion_context_str}.\n"
        "Synthesize: If confusion (surprise/fear), give a stronger hint. If frustration (anger/disgust), be empathetic. "
        "If boredom (neutral), suggest alternative hands-on/analogy path. If sadness, offer quick warm tips. If confidence (joy), challenge them."
    )
    prompt_context = f"{tone_instruction}\n\nUser Query: {message}"

    # System prompt, then the replayed history, then the new instruction + query.
    messages = [SystemMessage(content=custom_system)]
    messages.extend(
        HumanMessage(content=turn.content) if turn.role == "user" else AIMessage(content=turn.content)
        for turn in (history or [])
    )
    messages.append(HumanMessage(content=prompt_context))

    response_text = get_text_content(llm.invoke(messages).content)

    est_input = estimate_tokens(custom_system) + estimate_tokens(prompt_context)
    est_output = estimate_tokens(response_text)

    return response_text, prompt_context, est_input, est_output
|
|
| |
def run_flow_d(message: str, system_prompt: Optional[str], api_key: str, mapped_sentiment: str, history: Optional[List[ChatMessage]] = None):
    """Run flow D: reply with a tone chosen from the classifier's mapped sentiment.

    ``mapped_sentiment`` selects one sentiment-specific instruction; any value
    without a specific instruction gets a generic guiding-question instruction.

    Returns:
        ``(response_text, prompt_context, estimated_input_tokens,
        estimated_output_tokens)``.
    """
    llm = ChatGoogleGenerativeAI(
        model="gemini-3.1-flash-lite",
        google_api_key=api_key,
        temperature=0.0,
        max_tokens=300,
    )

    default_system = (
        "You are a concise, Socratic educational tutor. Your focus is strictly to teach. "
        "NEVER give the user the direct answer or solution. Instead, guide them, nudge them, and ask leading questions to help them figure it out. "
        "Adjust your behavior and tone based on the user's sentiment. Keep responses brief (max 5 sentences)."
    )
    custom_system = system_prompt or default_system

    tone_preamble = (
        "IMPORTANT: You are a Socratic tutor. NEVER directly state the answer, definition, or solution. "
        "Instead, nudge the user and guide them to find the answer themselves through questions. "
        "Be extremely concise and direct (strictly limit your response to max 5 sentences).\n"
    )
    # Sentiment-specific guidance appended to the preamble.
    tone_by_sentiment = {
        "confusion": "The user is confused. Give them a stronger, clearer hint to guide them, and ask a direct question to help them take the next step towards the answer without telling it to them.",
        "sadness": "The user is sad. Give them brief, warm, empathetic encouragement and practical tips to overcome it (like taking a micro-break or focusing on progress), and ask a gentle guiding question to continue.",
        "frustration": "The user is frustrated. Empathetically acknowledge their frustration, give them a helpful hint or alternative perspective, and ask a guiding question to help them work through it.",
        "boredom": "The user is bored. Suggest a completely different way to learn this concept (e.g., through a hands-on project, analogy, or challenge) to spark interest, and ask a guiding question to get them started.",
        "confidence": "The user is confident. Celebrate their success briefly, and offer a quick challenge or question to test their understanding.",
    }
    tone_instruction = tone_preamble + tone_by_sentiment.get(
        mapped_sentiment, "Ask a guiding question to nudge them towards the answer."
    )

    prompt_context = f"{tone_instruction}\n\nUser Query: {message}"

    # System prompt, then the replayed history, then the new instruction + query.
    messages = [SystemMessage(content=custom_system)]
    messages.extend(
        HumanMessage(content=turn.content) if turn.role == "user" else AIMessage(content=turn.content)
        for turn in (history or [])
    )
    messages.append(HumanMessage(content=prompt_context))

    response_text = get_text_content(llm.invoke(messages).content)

    est_input = estimate_tokens(custom_system) + estimate_tokens(prompt_context)
    est_output = estimate_tokens(response_text)

    return response_text, prompt_context, est_input, est_output
|
|
|
|
| |
@app.get("/api/status")
def get_status():
    """Report model loading state and whether a server-side Gemini key is set."""
    has_env_key = bool(os.environ.get("GEMINI_API_KEY"))
    return dict(
        roberta_status=model_status,
        roberta_error=model_error,
        ner_status=ner_status,
        ner_error=ner_error,
        gemini_api_key_configured=has_env_key,
    )
|
|
@app.post("/api/chat", response_model=ChatResponse)
def chat_endpoint(request: ChatRequest):
    """Run the selected tutoring flows for one message and return their results.

    The message is PII-scrubbed, each selected flow (``a``, ``b``, ``c``,
    ``d`` or ``all``) is run in turn with its own error fallback, the outcome
    is appended to the Markdown log, and the per-flow responses, latencies and
    token estimates are returned. Fields of flows that were not selected stay
    ``None``.

    Classifier readiness for flows C/D is checked before any flow runs, so a
    request that is going to be rejected does not first spend Gemini calls on
    flows A and B.

    Raises:
        HTTPException: 400 when no Gemini API key is available; 503 while the
            emotion classifier needed by flows C/D is still loading; 500 when
            that classifier failed to load.
    """
    api_key = request.gemini_api_key or os.environ.get("GEMINI_API_KEY")
    if not api_key:
        raise HTTPException(
            status_code=400,
            detail="Gemini API Key is missing. Please provide it in the Settings panel."
        )

    selected = request.selected_option.lower() if request.selected_option else "all"
    run_a = selected in ("all", "a")
    run_b = selected in ("all", "b")
    run_c = selected in ("all", "c")
    run_d = selected in ("all", "d")

    # Fail fast: flows C and D depend on the local emotion classifier.
    if run_c or run_d:
        if model_status == "loading":
            raise HTTPException(
                status_code=503,
                detail="DistilRoBERTa model is still downloading/loading. Please wait a few seconds and try again."
            )
        if model_status == "failed" or classifier is None:
            raise HTTPException(
                status_code=500,
                detail=f"DistilRoBERTa model is unavailable. Load error: {model_error}"
            )

    # NOTE(review): only the new message is scrubbed; history_* and
    # system_prompt are forwarded to the flows as received — confirm whether
    # the client already sends scrubbed history.
    request.message = scrub_pii(request.message)

    # Per-flow response fields; they stay None for flows that are not run.
    sentiment_details_a = None
    response_a = None
    latency_a = None
    prompt_context_a = None
    tokens_a = None

    sentiment_details_b = None
    response_b = None
    latency_b = None
    prompt_context_b = None
    tokens_b = None

    response_c = None
    latency_c = None
    prompt_context_c = None
    tokens_c = None

    sentiment_details_d = None
    response_d = None
    latency_d = None
    prompt_context_d = None
    tokens_d = None

    # Values used only by the Markdown log.
    detected_sentiment_a = "N/A"
    mapped_sentiment_b = "N/A"
    mapped_sentiment_d = "N/A"
    cost_a = 0.0
    cost_b = 0.0
    cost_c = 0.0
    cost_d = 0.0
    tokens_in_a = 0
    tokens_out_a = 0
    est_in_b = 0
    est_out_b = 0
    est_in_c = 0
    est_out_c = 0
    est_in_d = 0
    est_out_d = 0

    # Flow A: LangGraph sentiment detection followed by response generation.
    if run_a:
        start_a = time.time()
        try:
            final_state_a = run_flow_a_langgraph(
                message=request.message,
                system_prompt=request.system_prompt,
                api_key=api_key,
                history=request.history_a
            )

            detected_sentiment_a = final_state_a["sentiment"]
            explanation_a = final_state_a["explanation"]
            response_a = final_state_a["response"]

            prompt_context_a = f"Detected Sentiment (LangGraph): {detected_sentiment_a}\nExplanation: {explanation_a}"
            tokens_in_a = final_state_a.get("input_tokens", 0)
            tokens_out_a = final_state_a.get("output_tokens", 0)
            tokens_a = tokens_in_a + tokens_out_a
            cost_a = calculate_cost(tokens_in_a, tokens_out_a)

            sentiment_details_a = SentimentDetailsA(
                detected_sentiment=detected_sentiment_a,
                explanation=explanation_a
            )
        except Exception as e:
            print(f"Error in Flow A (LangGraph): {e}")
            detected_sentiment_a = "neutral"
            explanation_a = f"Error: {str(e)}"
            response_a = "An error occurred during Flow A generation."
            prompt_context_a = "N/A"
            cost_a = 0.0
            tokens_in_a = 0
            tokens_out_a = 0
            sentiment_details_a = SentimentDetailsA(
                detected_sentiment="neutral",
                explanation=explanation_a
            )
        latency_a = time.time() - start_a

    # Flow B: single Gemini call returning sentiment and reply together.
    if run_b:
        start_b = time.time()
        try:
            mapped_sentiment_b, response_b, prompt_context_b, est_in_b, est_out_b = run_flow_b(
                message=request.message,
                system_prompt=request.system_prompt,
                api_key=api_key,
                history=request.history_b
            )
            cost_b = calculate_cost(est_in_b, est_out_b)
            tokens_b = est_in_b + est_out_b
            sentiment_details_b = SentimentDetailsB(
                mapped_sentiment=mapped_sentiment_b,
                raw_emotions=[]
            )
        except Exception as e:
            print(f"Flow B single-pass error: {e}")
            mapped_sentiment_b = "neutral"
            response_b = "An error occurred during Flow B generation."
            prompt_context_b = "N/A"
            cost_b = 0.0
            est_in_b = 0
            est_out_b = 0
            sentiment_details_b = SentimentDetailsB(
                mapped_sentiment="neutral",
                raw_emotions=[]
            )
        latency_b = time.time() - start_b

    # Shared classifier pass for flows C and D.
    raw_emotions = []
    classifier_ran = False

    if run_c or run_d:
        try:
            classifier_results = classifier(request.message)[0]
            raw_emotions = [
                EmotionScore(label=item["label"], score=float(item["score"]))
                for item in classifier_results
            ]
            mapped_sentiment_d = map_distilroberta_emotions(classifier_results)
            classifier_ran = True
        except Exception as e:
            # Flows C/D report their own error below when the classifier did not run.
            print(f"DistilRoBERTa classification error: {e}")

    # Flow C: classifier score distribution handed to Gemini.
    if run_c:
        start_c = time.time()
        try:
            if not classifier_ran:
                raise Exception("Classifier did not run successfully.")
            response_c, prompt_context_c, est_in_c, est_out_c = run_flow_c(
                message=request.message,
                system_prompt=request.system_prompt,
                api_key=api_key,
                raw_emotions=raw_emotions,
                history=request.history_c
            )
            cost_c = calculate_cost(est_in_c, est_out_c)
            tokens_c = est_in_c + est_out_c
        except Exception as e:
            print(f"Flow C error: {e}")
            response_c = "An error occurred during Flow C generation."
            prompt_context_c = "N/A"
            cost_c = 0.0
            est_in_c = 0
            est_out_c = 0
        latency_c = time.time() - start_c

    # Flow D: classifier scores mapped to one sentiment, then Gemini.
    if run_d:
        start_d = time.time()
        try:
            if not classifier_ran:
                raise Exception("Classifier did not run successfully.")
            response_d, prompt_context_d, est_in_d, est_out_d = run_flow_d(
                message=request.message,
                system_prompt=request.system_prompt,
                api_key=api_key,
                mapped_sentiment=mapped_sentiment_d,
                history=request.history_d
            )
            cost_d = calculate_cost(est_in_d, est_out_d)
            tokens_d = est_in_d + est_out_d
            sentiment_details_d = SentimentDetailsB(
                mapped_sentiment=mapped_sentiment_d,
                raw_emotions=raw_emotions
            )
        except Exception as e:
            print(f"Flow D error: {e}")
            response_d = "An error occurred during Flow D generation."
            prompt_context_d = "N/A"
            cost_d = 0.0
            est_in_d = 0
            est_out_d = 0
            sentiment_details_d = SentimentDetailsB(
                mapped_sentiment="neutral",
                raw_emotions=[]
            )
        latency_d = time.time() - start_d

    log_to_md(
        question=request.message,
        sentiment_a=detected_sentiment_a,
        sentiment_b=mapped_sentiment_b,
        sentiment_d=mapped_sentiment_d,
        latency_a=latency_a or 0.0,
        latency_b=latency_b or 0.0,
        latency_c=latency_c or 0.0,
        latency_d=latency_d or 0.0,
        cost_a=cost_a,
        cost_b=cost_b,
        cost_c=cost_c,
        cost_d=cost_d,
        tokens_in_a=tokens_in_a,
        tokens_out_a=tokens_out_a,
        tokens_in_b=est_in_b,
        tokens_out_b=est_out_b,
        tokens_in_c=est_in_c,
        tokens_out_c=est_out_c,
        tokens_in_d=est_in_d,
        tokens_out_d=est_out_d,
        answer_a=response_a or "Skipped",
        answer_b=response_b or "Skipped",
        answer_c=response_c or "Skipped",
        answer_d=response_d or "Skipped",
        selected_option=selected
    )

    return ChatResponse(
        sentiment_a=sentiment_details_a,
        response_a=response_a,
        latency_a=round(latency_a, 3) if latency_a is not None else None,
        prompt_context_a=prompt_context_a,

        sentiment_b=sentiment_details_b,
        response_b=response_b,
        latency_b=round(latency_b, 3) if latency_b is not None else None,
        prompt_context_b=prompt_context_b,

        response_c=response_c,
        latency_c=round(latency_c, 3) if latency_c is not None else None,
        prompt_context_c=prompt_context_c,

        sentiment_d=sentiment_details_d,
        response_d=response_d,
        latency_d=round(latency_d, 3) if latency_d is not None else None,
        prompt_context_d=prompt_context_d,

        tokens_a=tokens_a,
        tokens_b=tokens_b,
        tokens_c=tokens_c,
        tokens_d=tokens_d
    )
|
|
| |
# Serve the built frontend from <parent of this module's directory>/frontend/dist
# at "/" when that path exists.
_module_dir = os.path.dirname(os.path.abspath(__file__))
frontend_dist_path = os.path.join(os.path.dirname(_module_dir), "frontend", "dist")
if os.path.exists(frontend_dist_path):
    app.mount("/", StaticFiles(directory=frontend_dist_path, html=True), name="frontend")
|
|