from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
import time
import os
import re
import asyncio
import base64
from datetime import datetime
from typing import List, Optional, Any
from pydantic import BaseModel
from dotenv import load_dotenv
# LangChain / Google GenAI imports
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
# Load settings from a local .env file (if any) before the environment is read.
load_dotenv()
# The ASGI application object; every route below is registered on it.
app = FastAPI(title="Socratic Sentiment Chatbot API")
# Enable CORS for frontend integration
# NOTE(review): every origin, method and header is allowed while credentials
# are enabled. Presumably a development convenience; confirm the intended
# origin list before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Pydantic Schemas
class ChatMessage(BaseModel):
    """One earlier turn of the conversation, as supplied by the client."""

    # Speaker of the turn: "user" or "assistant". The value is not validated
    # here; the prompt builder in this file treats anything other than "user"
    # as an assistant turn.
    role: str
    # Text of the turn.
    content: str
class ChatRequest(BaseModel):
    """Request body accepted by ``POST /api/chat``."""

    # The learner's latest message.
    message: str
    # Optional per-request Gemini key; when omitted the endpoint falls back to
    # the GEMINI_API_KEY environment variable.
    gemini_api_key: Optional[str] = None
    # Optional earlier turns. Only the last few entries are used, so the list
    # is presumably ordered oldest-first; confirm with the frontend.
    history: Optional[List[ChatMessage]] = None
class ChatResponse(BaseModel):
    """Response body returned by ``POST /api/chat``."""

    # Sentiment label reported by the model for the user's message.
    sentiment: str
    # The tutor's reply text.
    response: str
    # Seconds spent handling the request, rounded to three decimals.
    latency: float
    # The system prompt plus the (scrubbed) user query used for the call.
    prompt_context: str
    # Estimated input + output tokens (character-count heuristic, not billed usage).
    tokens: int
    # Estimated cost in US dollars derived from the token estimates.
    cost: float
# Token estimation helper (roughly four characters per token for English text)
def estimate_tokens(text: str) -> int:
    """Return a rough token count for *text*: a quarter of its length, never below 1."""
    quarter_length = len(text) // 4
    return quarter_length if quarter_length > 1 else 1
# Cost calculation helper
def calculate_cost(
    input_tokens: int,
    output_tokens: int,
    *,
    input_rate_per_million: float = 0.075,
    output_rate_per_million: float = 0.30,
) -> float:
    """Return the estimated dollar cost of a model call.

    Args:
        input_tokens: Number of prompt tokens.
        output_tokens: Number of generated tokens.
        input_rate_per_million: Dollar price per one million input tokens.
        output_rate_per_million: Dollar price per one million output tokens.

    Returns:
        The sum of the input and output cost in dollars.

    The defaults are the Gemini 3.1 Flash Lite prices this file was written
    against ($0.075/1M input tokens, $0.30/1M output tokens); pass other rates
    to price a different model without editing this function.
    """
    input_cost = (input_tokens / 1_000_000.0) * input_rate_per_million
    output_cost = (output_tokens / 1_000_000.0) * output_rate_per_million
    return input_cost + output_cost
# Helper to extract text from LangChain message content
def get_text_content(content: Any) -> str:
    """Flatten a message ``content`` value into plain text.

    A string is returned unchanged. For a list, plain-string items and the
    ``"text"`` value of dict items whose ``"type"`` is ``"text"`` are
    concatenated in order; every other item is skipped. Any other value is
    converted with ``str``.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    def _fragments():
        # Yield only the textual pieces, preserving their original order.
        for item in content:
            if isinstance(item, str):
                yield item
            elif isinstance(item, dict) and item.get("type") == "text":
                yield item.get("text", "")

    return "".join(_fragments())
# Regex PII scrubbing helper
# Domain labels are matched one at a time so a sentence-ending period after an
# address is left in the text instead of being swallowed by the placeholder.
_EMAIL_RE = re.compile(r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+')
# The leading guard only forbids starting in the middle of a word. A plain
# ``\b`` cannot match before a leading "+" or "(", which left those characters
# behind in the scrubbed text.
_PHONE_RE = re.compile(
    r'(?:\b|(?<!\w))(?:\+?\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'
)
_IP_RE = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')
_SSN_RE = re.compile(r'\b\d{3}-\d{2}-\d{4}\b')


def scrub_pii(text: str) -> str:
    """Replace common personal identifiers in *text* with placeholders.

    Email addresses become ``[EMAIL]``, phone numbers ``[PHONE]``, dotted-quad
    IP addresses ``[IP_ADDRESS]`` and US social security numbers ``[SSN]``.
    The substitutions run in that order.

    Args:
        text: The text to scrub. Falsy values (``""`` or ``None``) are
            returned unchanged.

    Returns:
        The scrubbed text.
    """
    if not text:
        return text
    # Email addresses
    text = _EMAIL_RE.sub('[EMAIL]', text)
    # Phone numbers
    text = _PHONE_RE.sub('[PHONE]', text)
    # IP Addresses
    text = _IP_RE.sub('[IP_ADDRESS]', text)
    # SSNs
    text = _SSN_RE.sub('[SSN]', text)
    return text
# Markdown Logging helper
# The log file sits in the same directory as this module.
MD_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "sentiment_log.md")


def log_to_md(question: str, sentiment: str, latency: float, cost: float, tokens_in: int, tokens_out: int, reply: str):
    """Append one chat exchange to the Markdown log at ``MD_FILE``.

    A title and short description are written first when the file does not
    exist yet. Each entry has a timestamped heading quoting the question, an
    HTML table of metrics (sentiment, latency, cost, tokens) and the reply.
    Any error is printed rather than raised, so logging never fails a request.
    """
    is_new_log = not os.path.exists(MD_FILE)
    try:
        with open(MD_FILE, mode="a", encoding="utf-8") as log:
            entry = []
            if is_new_log:
                entry.append("# Socratic Chatbot Sentiment & Response Log\n\n")
                entry.append("This file tracks detected user sentiments, response latencies, costs, and Socratic replies.\n\n")
            stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            entry.extend([
                f"## [{stamp}] Query: \"{question}\"\n\n",
                "<table>\n",
                " <thead>\n",
                " <tr><th align=\"left\">Metric</th><th align=\"left\">Value</th></tr>\n",
                " </thead>\n",
                " <tbody>\n",
                f" <tr><td><strong>Detected Sentiment</strong></td><td><code>{sentiment}</code></td></tr>\n",
                f" <tr><td><strong>Latency</strong></td><td>{round(latency, 3)}s</td></tr>\n",
                f" <tr><td><strong>Estimated Cost</strong></td><td><code>${cost:.7f}</code></td></tr>\n",
                f" <tr><td><strong>Tokens</strong></td><td>{tokens_in + tokens_out} ({tokens_in} in / {tokens_out} out)</td></tr>\n",
                " </tbody>\n",
                "</table>\n\n",
                f"### Socratic Tutor Reply\n{reply}\n\n",
                "---\n\n",
            ])
            log.writelines(entry)
    except Exception as e:
        print(f"Error writing to MD log: {e}")
# Option B response helper doing both sentiment detection and response generation in one pass
_FALLBACK_SENTIMENT = "neutral"
_FALLBACK_REPLY = "Let's take a look at this concept step by step. What do you think is the first part?"


def _parse_tutor_json(raw_response: str):
    """Extract ``(sentiment, reply)`` from the model's JSON text, with safe fallbacks."""
    import json

    cleaned_json = raw_response.strip()
    # Tolerate a Markdown code fence (```json ... ```) around the payload.
    if cleaned_json.startswith("```"):
        cleaned_json = cleaned_json.strip("`").strip()
        if cleaned_json[:4].lower() == "json":
            cleaned_json = cleaned_json[4:].strip()
    try:
        parsed = json.loads(cleaned_json)
    except ValueError as e:
        print(f"Failed to parse LLM JSON response: {e}. Raw response: {raw_response}")
        return _FALLBACK_SENTIMENT, _FALLBACK_REPLY
    if not isinstance(parsed, dict):
        print(f"LLM JSON response is not an object. Raw response: {raw_response}")
        return _FALLBACK_SENTIMENT, _FALLBACK_REPLY

    # A missing, null, empty or non-string field must not reach the caller:
    # both values end up in string-typed response fields.
    sentiment = parsed.get("s")
    if not isinstance(sentiment, str) or not sentiment.strip():
        sentiment = _FALLBACK_SENTIMENT
    reply = parsed.get("r")
    if not isinstance(reply, str) or not reply.strip():
        reply = _FALLBACK_REPLY
    return sentiment.strip(), reply


def run_flow_b(message: str, api_key: str, history: Optional[List[ChatMessage]] = None):
    """Detect the user's sentiment and produce the tutor reply in one model call.

    Args:
        message: The user's current message.
        api_key: Gemini API key used for the call.
        history: Optional earlier turns. Only the last four are sent, each cut
            to 60 characters (plus ``"..."``) to keep the prompt small.

    Returns:
        A tuple ``(sentiment, reply, prompt_context, est_in, est_out)``.
        ``prompt_context`` is the system prompt plus the user query.
        ``est_in`` and ``est_out`` are character-based token estimates for
        the text sent (prompt, history and query) and the raw text received.

    When the model output is not a JSON object with usable string fields, the
    sentiment falls back to ``"neutral"`` and the reply to a generic prompt.
    Errors raised by the model call itself propagate to the caller.
    """
    # Enforce structural JSON natively.
    llm = ChatGoogleGenerativeAI(
        model="gemini-3.1-flash-lite",
        google_api_key=api_key,
        temperature=0.5,
        max_tokens=450,
        generation_config={"response_mime_type": "application/json"}
    )
    custom_system = (
        "Socratic tutor: guide with clear, substantial hints. Continue unless close. "
        "If close: give answer & ask: 'Do you want to learn something else?'"
    )
    tone_instruction = (
        "JSON: {\"s\":\"sentiment\",\"r\":\"reply\"}\n"
        "s values: confusion|frustration|confused_but_engaged|confused_and_frustrated|starting_to_get_bored|confident_and_engaged|neutral\n"
        "Rules:\n"
        "- Sympathize with s implicitly (tone/style); never name or mention the sentiment/emotion itself.\n"
        "- NEVER use 'if you' (use direct phrasing: 'think about', 'imagine').\n"
        "- Ask 1 question max.\n"
        "Responses:\n"
        "- frustration: acknowledge sentiment indirectly + simplify + question.\n"
        "- starting_to_get_bored: acknowledge sentiment indirectly + puzzle/analogy + question.\n"
        "- other: hint + question."
    )
    messages = [SystemMessage(content=f"{custom_system}\n\n{tone_instruction}")]
    # Minimize tokens: slice history to last 4 messages and truncate to 60 characters
    sent_history = []
    if history:
        for msg in history[-4:]:
            content = msg.content
            if len(content) > 60:
                content = content[:60] + "..."
            sent_history.append(content)
            if msg.role == "user":
                messages.append(HumanMessage(content=content))
            else:
                messages.append(AIMessage(content=content))
    messages.append(HumanMessage(content=message))

    res = llm.invoke(messages)
    raw_response = get_text_content(res.content)
    state_val, reply_val = _parse_tutor_json(raw_response)

    prompt_context = f"{custom_system}\n{tone_instruction}\nUser Query: {message}"
    # The history turns are sent to the model too, so they count as input.
    est_in = estimate_tokens(prompt_context + "".join(sent_history))
    est_out = estimate_tokens(raw_response)
    return state_val, reply_val, prompt_context, est_in, est_out
# API Routes
@app.get("/api/status")
def get_status():
    """Report readiness and whether GEMINI_API_KEY is set in the environment."""
    has_server_key = bool(os.environ.get("GEMINI_API_KEY"))
    return {"status": "ready", "gemini_api_key_configured": has_server_key}
@app.post("/api/chat", response_model=ChatResponse)
def chat_endpoint(request: ChatRequest):
    """Answer one chat message with a detected sentiment and a tutor reply.

    The message and any history turns are scrubbed of PII before they are sent
    to the model, and the scrubbed message is what gets written to the
    Markdown log.

    Args:
        request: The chat request; its ``gemini_api_key`` overrides the
            ``GEMINI_API_KEY`` environment variable when present.

    Returns:
        A ``ChatResponse`` with the sentiment, reply, latency and the
        estimated token count and cost.

    Raises:
        HTTPException: 400 when no API key is available; 500 when the model
            call or response handling fails.
    """
    # Retrieve Gemini API Key
    api_key = request.gemini_api_key or os.environ.get("GEMINI_API_KEY")
    if not api_key:
        raise HTTPException(
            status_code=400,
            detail="Gemini API Key is missing. Please provide it in the Settings panel or environment."
        )
    # Monotonic clock: the latency cannot be skewed by system clock changes.
    start_time = time.perf_counter()
    # Scrub PII
    scrubbed_message = scrub_pii(request.message)
    # Earlier turns are user-supplied text as well, so scrub them the same way.
    scrubbed_history = request.history
    if scrubbed_history:
        scrubbed_history = [
            ChatMessage(role=turn.role, content=scrub_pii(turn.content))
            for turn in scrubbed_history
        ]
    try:
        sentiment, reply, prompt_context, est_in, est_out = run_flow_b(
            message=scrubbed_message,
            api_key=api_key,
            history=scrubbed_history
        )
        latency = time.perf_counter() - start_time
        cost = calculate_cost(est_in, est_out)
        tokens = est_in + est_out
        # Log to Markdown (scrubbed text only, so no raw PII is stored on disk)
        log_to_md(
            question=scrubbed_message,
            sentiment=sentiment,
            latency=latency,
            cost=cost,
            tokens_in=est_in,
            tokens_out=est_out,
            reply=reply
        )
        return ChatResponse(
            sentiment=sentiment,
            response=reply,
            latency=round(latency, 3),
            prompt_context=prompt_context,
            tokens=tokens,
            cost=cost
        )
    except Exception as e:
        print(f"Chat endpoint error: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred: {str(e)}"
        ) from e
# WebSocket Endpoint for Socratic voice dialogue via Gemini Multimodal Live API
@app.websocket("/api/live-ws")
async def websocket_live_endpoint(websocket: WebSocket):
    """Proxy a browser WebSocket to a Gemini Live session.

    Client -> server messages are JSON objects with a ``type`` of ``"audio"``
    (``data`` is base64 PCM, forwarded as ``audio/pcm;rate=16000``) or
    ``"text"`` (``data`` is forwarded as text input).

    Server -> client messages are JSON objects with ``type`` ``"audio"``
    (base64 audio), ``"text"`` or ``"turn_complete"``.

    The socket is closed with code 4000 when no API key is available and with
    4001 when the ``google-genai`` SDK cannot be imported. Both relay
    directions stop as soon as either one ends, and the socket is then closed.
    """
    await websocket.accept()
    # Retrieve Gemini API Key from query params or environment
    api_key = websocket.query_params.get("api_key") or os.environ.get("GEMINI_API_KEY")
    if not api_key:
        await websocket.close(code=4000, reason="GEMINI_API_KEY is missing.")
        return
    try:
        from google import genai
        from google.genai import types
    except ImportError:
        await websocket.close(code=4001, reason="google-genai SDK not installed.")
        return
    client = genai.Client(api_key=api_key)
    # Configure Socratic Tutor instruction for Gemini Live API
    config = types.LiveConnectConfig(
        response_modalities=["AUDIO"],  # Audio modality
        system_instruction=types.Content(
            parts=[types.Part.from_text(
                text="Socratic tutor: guide with clear, substantial hints (no tiny nudges) to solve faster. "
                "Confidence is not mastery—continue Socratic hints unless they are close. "
                "Only when close to the solution, give the final answer & ask: 'Do you want to learn something else?' "
                "NEVER use the phrase 'if you' anywhere in your response (e.g. do not say 'if you think', 'if you were', etc.). Instead, frame instructions or scenarios directly (e.g., say 'think about', 'imagine', 'when looking at', or 'sometimes'). "
                "Only ask one question at a time to avoid overwhelming the user. "
                "Keep replies extremely concise (maximum 3 brief sentences) and conversational."
            )]
        )
    )
    try:
        # Establish async WebSocket connection to Gemini Live using the Gemini 3.1 Flash Live model
        async with client.aio.live.connect(model="gemini-3.1-flash-live-preview", config=config) as session:

            async def receive_from_client():
                """Forward browser audio/text messages to the Gemini session."""
                try:
                    while True:
                        # Receive JSON from browser client
                        message = await websocket.receive_json()
                        msg_type = message.get("type")
                        if msg_type == "audio":
                            # Decode base64 PCM audio chunk sent from frontend
                            audio_bytes = base64.b64decode(message["data"])
                            # Stream real-time audio (using 'audio' instead of deprecated 'media')
                            await session.send_realtime_input(
                                audio=types.Blob(data=audio_bytes, mime_type="audio/pcm;rate=16000")
                            )
                        elif msg_type == "text":
                            # Send real-time text input
                            await session.send_realtime_input(text=message["data"])
                except WebSocketDisconnect:
                    pass
                except Exception as e:
                    print(f"[WebSocket Proxy Client -> Gemini] Error: {e}")

            async def send_to_client():
                """Forward Gemini output (audio, text, turn markers) to the browser."""
                try:
                    # NOTE(review): session.receive() is understood to stop
                    # after each completed model turn, so it is re-entered
                    # for every turn. Confirm against the installed
                    # google-genai version. A pass that yields nothing is
                    # treated as end of stream, so the loop cannot spin.
                    while True:
                        received_any = False
                        async for response in session.receive():
                            received_any = True
                            server_content = response.server_content
                            if server_content is not None:
                                model_turn = server_content.model_turn
                                if model_turn is not None:
                                    for part in model_turn.parts:
                                        if part.inline_data is not None:
                                            # Stream PCM audio output back to client as Base64
                                            audio_b64 = base64.b64encode(part.inline_data.data).decode('utf-8')
                                            await websocket.send_json({
                                                "type": "audio",
                                                "data": audio_b64
                                            })
                                        elif part.text is not None:
                                            # Stream text transcription back to client
                                            await websocket.send_json({
                                                "type": "text",
                                                "data": part.text
                                            })
                                # Handle turn completion (model finished speaking)
                                if server_content.turn_complete:
                                    await websocket.send_json({"type": "turn_complete"})
                        if not received_any:
                            break
                except Exception as e:
                    print(f"[WebSocket Proxy Gemini -> Client] Error: {e}")

            # Run both directions concurrently and stop when EITHER ends.
            # Waiting for both would keep this handler and the upstream
            # session open after the browser has disconnected.
            relay_tasks = [
                asyncio.create_task(receive_from_client()),
                asyncio.create_task(send_to_client()),
            ]
            try:
                await asyncio.wait(relay_tasks, return_when=asyncio.FIRST_COMPLETED)
            finally:
                for task in relay_tasks:
                    task.cancel()
                # Let the cancelled task unwind before the session closes.
                await asyncio.gather(*relay_tasks, return_exceptions=True)
    except Exception as e:
        print(f"WebSocket Gemini Live connection failed: {e}")
    finally:
        try:
            await websocket.close()
        except Exception:
            # Best effort: the socket may already be closed by the client.
            pass
# Mount frontend static files in production if dist folder is built
# The build output is expected at <parent of this module's directory>/frontend/dist.
frontend_dist_path = os.path.normpath(
    os.path.join(os.path.abspath(__file__), os.pardir, os.pardir, "frontend", "dist")
)
if os.path.exists(frontend_dist_path):
    # Registered after the API routes above, so "/" serves the frontend
    # without shadowing them.
    static_frontend = StaticFiles(directory=frontend_dist_path, html=True)
    app.mount("/", static_frontend, name="frontend")
|