# chatbot / server.py
# Pravin30994's picture
# Upload server.py
# e2727f6 verified
"""
Career Discovery AI — Production Backend
=========================================

Models
    STT : UsefulSensors/moonshine-base
    LLM : microsoft/Phi-3.5-mini-instruct
    TTS : hexgrad/Kokoro-82M

Run
    pip install -r requirements.txt
    python server.py

Endpoints
    GET  /api/health
    GET  /api/models/status
    POST /api/stt         — audio file → { text, duration_ms }
    POST /api/llm         — { student, messages, phase } → { reply, insights, phase, duration_ms }
    POST /api/tts         — { text } → audio/wav stream
    POST /api/report      — { student, history } → { report, student }
    POST /api/report/pdf  — { student, history } → application/pdf
"""
import io, json, re, time, logging, wave, struct
from contextlib import asynccontextmanager
from typing import List, Optional, Dict, Any
import numpy as np
import torch
import torchaudio
from fastapi import FastAPI, UploadFile, File, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel
# Configure logging once at import time; every record carries a timestamp and level.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Named logger used by the loaders and route handlers in this module.
log = logging.getLogger("career-ai")
# ─────────────────────────────────────────
# Global model holders
# ─────────────────────────────────────────
# Registry populated by `lifespan` at startup with the keys "stt", "tok",
# "llm" and "tts". Route handlers test key membership before using a model.
MODELS: Dict[str, Any] = {}
def load_stt():
    """Create the speech-to-text pipeline.

    Builds a Hugging Face ``automatic-speech-recognition`` pipeline for
    ``UsefulSensors/moonshine-base``. The pipeline is placed on CUDA device 0
    when ``torch.cuda.is_available()`` is true and on the CPU (device ``-1``)
    otherwise.

    Returns:
        The constructed pipeline object.
    """
    # Imported here rather than at module level, as in the other loaders.
    from transformers import pipeline as hf_pipeline

    log.info("Loading STT (moonshine-base)…")
    if torch.cuda.is_available():
        target_device = 0
    else:
        target_device = -1
    stt_pipeline = hf_pipeline(
        "automatic-speech-recognition",
        model="UsefulSensors/moonshine-base",
        device=target_device,
    )
    log.info("STT ready βœ“")
    return stt_pipeline
def load_llm():
    """Load the chat LLM and its tokenizer.

    Both come from ``microsoft/Phi-3.5-mini-instruct`` with
    ``trust_remote_code=True``. Weights are requested as float16 when CUDA is
    available and float32 otherwise, and placement is delegated to
    ``device_map="auto"``.

    Returns:
        A ``(tokenizer, model)`` tuple.
    """
    from transformers import AutoModelForCausalLM, AutoTokenizer

    checkpoint = "microsoft/Phi-3.5-mini-instruct"
    log.info("Loading LLM (Phi-3.5-mini-instruct)…")
    tokenizer = AutoTokenizer.from_pretrained(checkpoint, trust_remote_code=True)

    # Half precision only when a GPU is present.
    if torch.cuda.is_available():
        weight_dtype = torch.float16
    else:
        weight_dtype = torch.float32
    model = AutoModelForCausalLM.from_pretrained(
        checkpoint,
        torch_dtype=weight_dtype,
        device_map="auto",
        trust_remote_code=True,
    )
    log.info("LLM ready βœ“")
    return tokenizer, model
def load_tts():
    """Create the text-to-speech pipeline.

    Builds a Hugging Face ``text-to-speech`` pipeline for
    ``hexgrad/Kokoro-82M``, on CUDA device 0 when available and on the CPU
    (device ``-1``) otherwise.

    Returns:
        The constructed pipeline object.
    """
    # NOTE(review): confirm that this checkpoint can be loaded through the
    # transformers text-to-speech pipeline in the pinned library version.
    from transformers import pipeline as hf_pipeline

    log.info("Loading TTS (Kokoro-82M)…")
    if torch.cuda.is_available():
        target_device = 0
    else:
        target_device = -1
    tts_pipeline = hf_pipeline(
        "text-to-speech",
        model="hexgrad/Kokoro-82M",
        device=target_device,
    )
    log.info("TTS ready βœ“")
    return tts_pipeline
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: load every model, serve, then log shutdown.

    Before yielding, the STT pipeline, the LLM tokenizer and model, and the
    TTS pipeline are loaded in that order and stored in ``MODELS`` under the
    keys ``"stt"``, ``"tok"``, ``"llm"`` and ``"tts"``. The ``app`` argument
    is not read by the body.
    """
    # ── startup ──
    MODELS["stt"] = load_stt()
    # load_llm() returns (tokenizer, model); store each under its own key.
    MODELS["tok"], MODELS["llm"] = load_llm()
    MODELS["tts"] = load_tts()
    log.info("All models loaded β€” server ready πŸš€")

    yield

    # ── shutdown ──
    log.info("Shutting down…")
# ─────────────────────────────────────────
# App
# ─────────────────────────────────────────
# FastAPI application; the `lifespan` hook loads the models into MODELS at startup.
app = FastAPI(title="Career Discovery AI", version="2.0", lifespan=lifespan)
# CORS is left fully open here: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # tighten in production
    allow_methods=["*"],
    allow_headers=["*"],
)
# ─────────────────────────────────────────
# Pydantic schemas
# ─────────────────────────────────────────
class StudentInfo(BaseModel):
    # Student profile sent by the client. The fields are interpolated into the
    # LLM prompts and printed in the PDF snapshot table.
    name: str
    grade: str
    curriculum: str
    # Optional free-text fields; both default to an empty string.
    subjects: Optional[str] = ""
    interests: Optional[str] = ""
class ChatMessage(BaseModel):
    # One conversation turn. `role` is a plain string and is not validated
    # here; generate_response forwards it unchanged to the chat template.
    role: str  # "user" | "assistant"
    content: str
class LLMRequest(BaseModel):
    # Request body for POST /api/llm.
    student: StudentInfo
    messages: List[ChatMessage]  # full history from frontend
    # Counselling phase; selects the instruction used by build_system_prompt.
    phase: str = "interests"
class TTSRequest(BaseModel):
    # Request body for POST /api/tts.
    text: str
    # Accepted in the payload, but the text_to_speech handler never reads it.
    speed: Optional[float] = 1.0
class ReportRequest(BaseModel):
    # Request body shared by POST /api/report and POST /api/report/pdf.
    student: StudentInfo
    # Conversation turns; "user" turns are labelled STUDENT and all others IVY
    # when the report prompt is built.
    history: List[ChatMessage]
# ─────────────────────────────────────────
# Helper — numpy/bytes → WAV bytes
# ─────────────────────────────────────────
def audio_to_wav_bytes(audio_data, sample_rate: int) -> bytes:
    """Encode audio samples as a mono 16-bit PCM WAV file held in memory.

    Args:
        audio_data: Samples as a ``torch.Tensor``, a NumPy array, or any
            sequence NumPy can convert. Input of any shape is flattened into
            a single channel.
        sample_rate: Frame rate written to the WAV header, in Hz.

    Returns:
        The complete WAV file (header plus frames) as ``bytes``. Empty input
        produces a valid WAV file with zero frames.
    """
    if isinstance(audio_data, torch.Tensor):
        # detach() permits tensors that still track gradients; converting to
        # float32 first also covers dtypes NumPy cannot hold (e.g. bfloat16).
        audio_data = audio_data.detach().to(device="cpu", dtype=torch.float32).numpy()

    # Always scale in float32: in float16 the factor 32767 rounds to 32768,
    # which overflows int16 for full-scale samples.
    samples = np.asarray(audio_data, dtype=np.float32).reshape(-1)

    # Peak-normalise to [-1, 1]; empty or all-zero input is left untouched.
    peak = float(np.abs(samples).max()) if samples.size else 0.0
    if peak > 0:
        samples = samples / peak

    # WAV stores 16-bit samples little-endian regardless of host byte order.
    pcm = (samples * 32767).astype("<i2")

    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 16-bit
        wf.setframerate(int(sample_rate))
        wf.writeframes(pcm.tobytes())
    return buf.getvalue()
# ─────────────────────────────────────────
# Helper — bytes → numpy array for STT
# ─────────────────────────────────────────
def wav_bytes_to_numpy(raw: bytes) -> np.ndarray:
    """Decode uploaded audio bytes into a 1-D float32 sample array.

    The payload is first handed to ``torchaudio.load``. On success the audio
    is down-mixed to one channel and resampled to 16 kHz, the sampling rate
    the STT route passes to the pipeline.

    If decoding raises, the bytes are interpreted as headerless little-endian
    16-bit PCM and scaled to [-1, 1). That fallback performs no resampling or
    format check, so it is only meaningful when the payload really is 16 kHz
    mono PCM.

    Args:
        raw: The uploaded file contents.

    Returns:
        A one-dimensional ``float32`` NumPy array of samples.
    """
    target_rate = 16000
    try:
        waveform, sr = torchaudio.load(io.BytesIO(raw))
    except Exception:
        # Best-effort fallback. int16 frames are two bytes wide, so a
        # dangling odd byte is dropped instead of making frombuffer raise.
        usable = len(raw) - (len(raw) % 2)
        pcm = np.frombuffer(raw[:usable], dtype="<i2")
        return pcm.astype(np.float32) / 32768.0

    # Down-mix first so only a single channel has to be resampled.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)
    if sr != target_rate:
        waveform = torchaudio.functional.resample(waveform, sr, target_rate)
    # reshape(-1) keeps the result one-dimensional even for a single sample.
    return waveform.reshape(-1).numpy()
# ─────────────────────────────────────────
# Helper — build system prompt
# ─────────────────────────────────────────
# Maps each counselling phase name to the instruction inserted under
# "CURRENT FOCUS" in the system prompt. build_system_prompt falls back to the
# "interests" entry for phase names that are not listed here.
CAREER_PHASES = {
    "welcome": "Warmly welcome the student and set expectations for this session.",
    "interests": "Ask about favourite subjects, activities, hobbies and what excites them.",
    "strengths": "Probe how they approach problems, work with people, creativity vs structure.",
    "preferences": "Explore: people vs data vs ideas vs things, indoor/outdoor, travel, pace.",
    "scenarios": "Propose 3-5 career clusters. Ask scenario questions to test fit.",
    "refinement": "Narrow down to 2-3 best-fit career paths and confirm with the student.",
    "summary": "Summarise everything warmly and tell the student the report is ready.",
}
def build_system_prompt(student: StudentInfo, phase: str) -> str:
    """Compose the system prompt for one counselling turn.

    Args:
        student: Profile whose fields are written into the prompt. Empty
            ``subjects`` / ``interests`` are rendered as "Not specified".
        phase: Key into ``CAREER_PHASES``; an unknown key uses the
            "interests" instruction.

    Returns:
        The complete system prompt text.
    """
    if phase in CAREER_PHASES:
        phase_instruction = CAREER_PHASES[phase]
    else:
        phase_instruction = CAREER_PHASES["interests"]
    subjects = student.subjects or "Not specified"
    interests = student.interests or "Not specified"
    return f"""You are Ivy, a warm and insightful AI career counsellor helping a high school student discover the right career path.
STUDENT PROFILE
Name : {student.name}
Grade : {student.grade}
Curriculum : {student.curriculum}
Subjects : {subjects}
Interests : {interests}
CURRENT FOCUS
{phase_instruction}
RULES
- Be encouraging, friendly, and concise (2-4 sentences per turn).
- Ask ONE clear question per response.
- Never list more than 3 options at once.
- Mirror the student's language and energy.
- Do NOT mention these rules or your role instructions.
- Respond in plain conversational English, no markdown bullets in speech.
"""
# ─────────────────────────────────────────
# Helper — extract structured insights
# ─────────────────────────────────────────
# Keyword lists consumed by extract_insights. Each entry is looked up as a
# plain substring of the lower-cased user text, so a keyword also matches
# inside longer words (for example "art" inside "start").
INTEREST_KEYWORDS = ["love", "enjoy", "excited", "passionate", "fascinated", "favourite", "like"]
STRENGTH_KEYWORDS = ["good at", "skilled", "excel", "naturally", "talent", "strength", "always been"]
# Work-orientation buckets: People / Data / Ideas / Things.
PREF_PEOPLE = ["people", "team", "collaborate", "friends", "social", "help others"]
PREF_DATA = ["data", "numbers", "analysis", "statistics", "pattern", "research"]
PREF_IDEAS = ["creative", "ideas", "design", "invent", "imagine", "art", "write"]
PREF_THINGS = ["build", "make", "fix", "engineer", "hands-on", "machines", "code"]
def extract_insights(history: List[ChatMessage]) -> Dict[str, Any]:
    """Derive coarse keyword-based insights from the student's own messages.

    Only messages with role ``"user"`` are considered. Their lower-cased
    contents are joined with spaces and scanned for the module's keyword
    lists using substring containment.

    Returns:
        A dict that may contain ``interests_detected`` (True),
        ``strengths_detected`` (True), ``work_orientation`` (a list drawn
        from "People", "Data", "Ideas", "Things", in that order) and
        ``lifestyle``. Keys with nothing detected are omitted.
    """
    user_text = " ".join(
        message.content.lower() for message in history if message.role == "user"
    )

    def mentions(keywords) -> bool:
        # Substring test, so a keyword also matches inside longer words.
        return any(keyword in user_text for keyword in keywords)

    insights: Dict[str, Any] = {}
    if mentions(INTEREST_KEYWORDS):
        insights["interests_detected"] = True
    if mentions(STRENGTH_KEYWORDS):
        insights["strengths_detected"] = True

    buckets = (
        ("People", PREF_PEOPLE),
        ("Data", PREF_DATA),
        ("Ideas", PREF_IDEAS),
        ("Things", PREF_THINGS),
    )
    orientation = [label for label, keywords in buckets if mentions(keywords)]
    if orientation:
        insights["work_orientation"] = orientation

    # Travel/outdoor wording takes precedence over stability wording.
    if mentions(("travel", "outdoor")):
        insights["lifestyle"] = "open to travel / outdoors"
    elif mentions(("stable", "home", "remote")):
        insights["lifestyle"] = "prefers stability / remote"
    return insights
# ─────────────────────────────────────────
# ROUTES
# ─────────────────────────────────────────
@app.get("/api/health")
async def health():
    """Report liveness, the keys currently present in MODELS, and the compute device."""
    has_cuda = torch.cuda.is_available()
    # Only ask for a device name when CUDA is actually usable.
    device_name = torch.cuda.get_device_name(0) if has_cuda else "CPU"
    return {
        "status": "ok",
        "models_loaded": list(MODELS.keys()),
        "cuda": has_cuda,
        "device": device_name,
    }
@app.get("/api/models/status")
async def models_status():
    """Return the model name for each of stt / llm / tts, or None when that model is not loaded."""
    model_names = (
        ("stt", "UsefulSensors/moonshine-base"),
        ("llm", "Phi-3.5-mini-instruct"),
        ("tts", "hexgrad/Kokoro-82M"),
    )
    return {
        key: (display_name if key in MODELS else None)
        for key, display_name in model_names
    }
# ── STT ────────────────────────────────
@app.post("/api/stt")
async def speech_to_text(audio: UploadFile = File(...)):
    """Transcribe an uploaded audio file.

    Accepts: a multipart audio file.
    Returns: ``{ text, duration_ms }`` where ``duration_ms`` covers decoding
    plus inference.

    Raises:
        HTTPException: 503 if the STT model is not loaded, 400 if the upload
            is empty, 500 if decoding or transcription raises.
    """
    if "stt" not in MODELS:
        raise HTTPException(503, "STT model not loaded")

    payload = await audio.read()
    if not payload:
        raise HTTPException(400, "Empty audio file")

    started = time.perf_counter()
    try:
        samples = wav_bytes_to_numpy(payload)
        stt_input = {"array": samples, "sampling_rate": 16000}
        transcription = MODELS["stt"](stt_input)
        text = transcription.get("text", "").strip()
    except Exception as e:
        log.exception("STT error")
        raise HTTPException(500, f"STT failed: {e}")

    elapsed_ms = int((time.perf_counter() - started) * 1000)
    log.info(f"STT [{elapsed_ms}ms]: '{text[:80]}'")
    return {"text": text, "duration_ms": elapsed_ms}
# ── LLM ────────────────────────────────
@app.post("/api/llm")
async def generate_response(req: LLMRequest):
    """Generate the counsellor's next reply for a conversation.

    Accepts: ``{ student, messages: [{role, content}], phase }``.
    Returns: ``{ reply, insights, phase, duration_ms }``. ``insights`` is
    computed from the full ``messages`` list, not just the turns sent to the
    model.

    Raises:
        HTTPException: 503 if the LLM is not loaded, 500 if templating,
            generation or decoding raises.
    """
    if "llm" not in MODELS:
        raise HTTPException(503, "LLM model not loaded")
    tokenizer = MODELS["tok"]
    model = MODELS["llm"]

    # System prompt first, followed by only the ten most recent turns.
    system_prompt = build_system_prompt(req.student, req.phase)
    hf_messages = [{"role": "system", "content": system_prompt}]
    hf_messages.extend(
        {"role": turn.role, "content": turn.content} for turn in req.messages[-10:]
    )

    started = time.perf_counter()
    try:
        prompt_ids = tokenizer.apply_chat_template(
            hf_messages,
            return_tensors="pt",
            add_generation_prompt=True,
        )
        prompt_ids = prompt_ids.to(model.device)
        with torch.no_grad():
            generated = model.generate(
                prompt_ids,
                max_new_tokens=180,
                temperature=0.75,
                top_p=0.9,
                do_sample=True,
                repetition_penalty=1.1,
                pad_token_id=tokenizer.eos_token_id,
            )
        # Drop the echoed prompt; keep only the newly generated tokens.
        prompt_length = prompt_ids.shape[1]
        completion = generated[0][prompt_length:]
        reply = tokenizer.decode(completion, skip_special_tokens=True).strip()
        # Strip any <|...|> chat-template markers left in the decoded text.
        reply = re.sub(r"<\|.*?\|>", "", reply).strip()
    except Exception as e:
        log.exception("LLM error")
        raise HTTPException(500, f"LLM failed: {e}")

    elapsed_ms = int((time.perf_counter() - started) * 1000)
    log.info(f"LLM [{elapsed_ms}ms] phase={req.phase}: '{reply[:80]}'")
    return {
        "reply": reply,
        "insights": extract_insights(req.messages),
        "phase": req.phase,
        "duration_ms": elapsed_ms,
    }
# ── TTS ────────────────────────────────
@app.post("/api/tts")
async def text_to_speech(req: TTSRequest):
    """Synthesise speech for a piece of text.

    Accepts: ``{ text, speed? }``. ``speed`` is not read by this handler.
    Returns: an ``audio/wav`` response carrying ``X-Duration-Ms`` and
    ``X-Sample-Rate`` headers.

    Text longer than 400 characters is cut at 400 characters and an ellipsis
    is appended; the remainder is not synthesised.

    Raises:
        HTTPException: 503 if the TTS model is not loaded, 400 if the text is
            blank, 500 if synthesis or WAV encoding raises.
    """
    if "tts" not in MODELS:
        raise HTTPException(503, "TTS model not loaded")

    text = req.text.strip()
    if not text:
        raise HTTPException(400, "Empty text")

    # Keep each synthesis call short: anything past MAX_CHARS is dropped.
    MAX_CHARS = 400
    if len(text) > MAX_CHARS:
        text = f"{text[:MAX_CHARS]}…"

    started = time.perf_counter()
    try:
        synthesis = MODELS["tts"](text)
        sample_rate = synthesis["sampling_rate"]
        wav_bytes = audio_to_wav_bytes(synthesis["audio"], sample_rate)
    except Exception as e:
        log.exception("TTS error")
        raise HTTPException(500, f"TTS failed: {e}")

    elapsed_ms = int((time.perf_counter() - started) * 1000)
    log.info(f"TTS [{elapsed_ms}ms] {len(text)} chars β†’ {len(wav_bytes)//1024}KB wav")
    response_headers = {
        "X-Duration-Ms": str(elapsed_ms),
        "X-Sample-Rate": str(sample_rate),
    }
    return StreamingResponse(
        io.BytesIO(wav_bytes),
        media_type="audio/wav",
        headers=response_headers,
    )
# ── REPORT (JSON) ──────────────────────
# System prompt for report generation. It instructs the model to emit a single
# JSON object with the keys topInterests, keyStrengths, careerPaths and
# nextSteps; generate_report parses that object out of the model output.
REPORT_SYSTEM = """You are a career analysis expert.
Given a conversation between a career counsellor (assistant) and a student (user),
output ONLY a valid JSON object (no markdown, no explanation) in this exact schema:
{
"topInterests": ["<string>", "<string>", "<string>"],
"keyStrengths": ["<string>", "<string>", "<string>", "<string>", "<string>"],
"careerPaths": [
{
"name": "<Career Name>",
"cluster": "<Cluster>",
"fitReasons": ["<reason1>", "<reason2>", "<reason3>"],
"skills": ["<skill1>", "<skill2>", "<skill3>", "<skill4>"],
"applicationHints": ["<hint1>", "<hint2>", "<hint3>"]
}
],
"nextSteps": ["<step1>", "<step2>", "<step3>", "<step4>"]
}
Return exactly 3 careerPaths. Use evidence from the conversation.
"""
@app.post("/api/report")
async def generate_report(req: ReportRequest):
    """Analyse the whole conversation and return a structured career report.

    The conversation is flattened into a transcript ("STUDENT:" for user
    turns, "IVY:" for all others), sent to the LLM together with
    ``REPORT_SYSTEM``, and the first JSON object in the model output is
    parsed as the report.

    Returns:
        ``{ "report": <dict>, "student": <student fields as a dict> }``.
        When generation or parsing fails for any reason, the static
        ``_fallback_report`` is returned instead of an error.

    Raises:
        HTTPException: 503 if the LLM is not loaded.
    """
    if "llm" not in MODELS:
        raise HTTPException(503, "LLM model not loaded")
    tokenizer = MODELS["tok"]
    model = MODELS["llm"]

    # Format conversation for the prompt
    convo_text = "\n".join(
        f"{'STUDENT' if m.role=='user' else 'IVY'}: {m.content}"
        for m in req.history
    )
    user_prompt = (
        f"Student: {req.student.name}, Grade {req.student.grade}, "
        f"{req.student.curriculum}. "
        f"Subjects: {req.student.subjects}. "
        f"Interests: {req.student.interests}.\n\n"
        f"CONVERSATION:\n{convo_text}\n\n"
        "Now output the JSON report."
    )
    hf_messages = [
        {"role": "system", "content": REPORT_SYSTEM},
        {"role": "user", "content": user_prompt},
    ]
    try:
        inputs = tokenizer.apply_chat_template(
            hf_messages,
            return_tensors="pt",
            add_generation_prompt=True,
        ).to(model.device)
        with torch.no_grad():
            outputs = model.generate(
                inputs,
                max_new_tokens=900,
                # Greedy decoding for structured output. No temperature is
                # passed because sampling parameters are ignored (and warned
                # about) when do_sample is False.
                do_sample=False,
                pad_token_id=tokenizer.eos_token_id,
            )
        raw = tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True).strip()

        # Decode the JSON object that starts at the first "{". raw_decode
        # stops at the end of that object, so trailing commentary (even if it
        # contains braces) no longer breaks parsing.
        start = raw.find("{")
        if start == -1:
            raise ValueError("No JSON found in LLM output")
        report, _ = json.JSONDecoder().raw_decode(raw, start)
    except Exception as e:
        # Deliberate best-effort: any failure yields the canned report.
        log.warning(f"Report LLM parse failed ({e}), using fallback")
        report = _fallback_report(req)

    # Prefer Pydantic v2's model_dump(); fall back to v1's dict().
    dump_student = getattr(req.student, "model_dump", None) or req.student.dict
    return {"report": report, "student": dump_student()}
def _fallback_report(req: ReportRequest) -> dict:
    """Return the canned report used when LLM report generation fails.

    The content is static and follows the same schema as REPORT_SYSTEM
    (topInterests, keyStrengths, three careerPaths, nextSteps). ``req`` is
    accepted but not read, so the result does not depend on the student.
    """
    return {
        "topInterests": ["Technology", "Problem Solving", "Creative Thinking"],
        "keyStrengths": ["Analytical", "Curious learner", "Communicator", "Collaborative", "Adaptable"],
        "careerPaths": [
            {
                "name": "Software Engineering",
                "cluster": "Technology",
                "fitReasons": [
                    "Showed enthusiasm for building things",
                    "Strong logical reasoning across the conversation",
                    "Interest in technology and problem solving",
                ],
                "skills": ["Programming", "System Design", "Problem Solving", "Teamwork"],
                "applicationHints": [
                    "Highlight personal coding projects",
                    "Describe a bug you fixed and how you approached it",
                    "Mention open-source or hackathon experience",
                ],
            },
            {
                "name": "Data Science",
                "cluster": "Technology & Analytics",
                "fitReasons": [
                    "Affinity for numbers and pattern finding",
                    "Interest in research and evidence-based decisions",
                    "Comfort with both technical and narrative thinking",
                ],
                "skills": ["Statistics", "Python/R", "Visualisation", "ML basics"],
                "applicationHints": [
                    "Discuss a data-driven project or analysis",
                    "Show mathematical curiosity in your personal statement",
                    "Mention interest in AI or predictive modelling",
                ],
            },
            {
                "name": "Product Management",
                "cluster": "Business & Technology",
                "fitReasons": [
                    "Strong communication and collaborative skills",
                    "Interest in understanding user needs",
                    "Ability to balance technical and business perspectives",
                ],
                "skills": ["Strategic Thinking", "Communication", "UX Basics", "Leadership"],
                "applicationHints": [
                    "Highlight leadership roles or initiatives",
                    "Discuss a problem you solved for others",
                    "Show cross-disciplinary thinking",
                ],
            },
        ],
        "nextSteps": [
            "Explore Coursera / edX courses in your top 2 paths",
            "Start a small personal project combining your interests",
            "Connect with professionals via LinkedIn for informational interviews",
            "Research universities with strong programmes in these areas",
        ],
    }
# ── PDF REPORT ─────────────────────────
@app.post("/api/report/pdf")
async def generate_pdf(req: ReportRequest):
    """Render the student's career report as a downloadable PDF.

    The report data is obtained by awaiting ``generate_report(req)`` directly
    (no HTTP round trip) and laid out with ReportLab: title, student
    snapshot table, interests, strengths, career paths and next steps.

    All student- and model-supplied text is XML-escaped before it is placed
    in a ``Paragraph``, and missing or malformed report fields are skipped
    rather than raising.

    Returns:
        A ``StreamingResponse`` with media type ``application/pdf`` and an
        attachment ``Content-Disposition`` header.
    """
    from xml.sax.saxutils import escape

    from reportlab.lib import colors
    from reportlab.lib.enums import TA_CENTER
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.units import cm
    from reportlab.platypus import (
        HRFlowable, Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle,
    )

    def esc(value) -> str:
        # Paragraph parses its text as XML-like markup, so "<" or "&" in a
        # student name or in model output must be escaped.
        return escape(str(value))

    def as_list(value) -> list:
        # The report comes from an LLM; tolerate missing or non-list fields.
        return value if isinstance(value, list) else []

    # Get report data
    report_resp = await generate_report(req)
    rpt = report_resp["report"]
    stu = req.student

    buf = io.BytesIO()
    doc = SimpleDocTemplate(
        buf, pagesize=A4,
        topMargin=1.8*cm, bottomMargin=1.8*cm,
        leftMargin=2*cm, rightMargin=2*cm,
    )
    styles = getSampleStyleSheet()

    # ── Custom styles ──
    H1 = ParagraphStyle("H1", parent=styles["Heading1"],
                        fontSize=22, textColor=colors.HexColor("#3730a3"),
                        alignment=TA_CENTER, spaceAfter=6)
    H2 = ParagraphStyle("H2", parent=styles["Heading2"],
                        fontSize=13, textColor=colors.HexColor("#1e40af"),
                        spaceBefore=14, spaceAfter=6)
    H3 = ParagraphStyle("H3", parent=styles["Heading3"],
                        fontSize=11, textColor=colors.HexColor("#374151"),
                        spaceBefore=10, spaceAfter=4)
    BODY = ParagraphStyle("BODY", parent=styles["Normal"],
                          fontSize=10, textColor=colors.HexColor("#374151"),
                          spaceAfter=3, leading=15)
    SMALL = ParagraphStyle("SMALL", parent=BODY, fontSize=9,
                           textColor=colors.HexColor("#6b7280"))
    CENTER = ParagraphStyle("CENTER", parent=BODY, alignment=TA_CENTER)
    TAG = ParagraphStyle("TAG", parent=BODY, fontSize=9,
                         textColor=colors.HexColor("#3730a3"))
    FOOTER = ParagraphStyle("FOOTER", parent=SMALL, alignment=TA_CENTER,
                            textColor=colors.HexColor("#9ca3af"))

    story = []

    # ── Title page header ──
    story.append(Spacer(1, 0.4*cm))
    story.append(Paragraph("Career Discovery Report", H1))
    story.append(Paragraph(f"Prepared for <b>{esc(stu.name)}</b>", CENTER))
    story.append(Spacer(1, 0.3*cm))
    story.append(HRFlowable(width="100%", thickness=1,
                            color=colors.HexColor("#c7d2fe")))
    story.append(Spacer(1, 0.5*cm))

    # ── Snapshot table ──
    # Table cells are plain strings, which are drawn as-is (no markup
    # parsing), so they are intentionally not escaped.
    story.append(Paragraph("Student Snapshot", H2))
    snap_data = [
        ["Name", stu.name],
        ["Grade", stu.grade],
        ["Curriculum", stu.curriculum],
        ["Subjects", stu.subjects or "—"],
        ["Interests", stu.interests or "—"],
    ]
    snap_table = Table(snap_data, colWidths=[4*cm, 12*cm])
    snap_table.setStyle(TableStyle([
        ("FONTNAME", (0, 0), (-1, -1), "Helvetica"),
        ("FONTSIZE", (0, 0), (-1, -1), 10),
        ("FONTNAME", (0, 0), (0, -1), "Helvetica-Bold"),
        ("TEXTCOLOR", (0, 0), (0, -1), colors.HexColor("#3730a3")),
        ("TEXTCOLOR", (1, 0), (1, -1), colors.HexColor("#374151")),
        ("ROWBACKGROUNDS", (0, 0), (-1, -1), [colors.HexColor("#eef2ff"), colors.white]),
        ("TOPPADDING", (0, 0), (-1, -1), 6),
        ("BOTTOMPADDING", (0, 0), (-1, -1), 6),
        ("LEFTPADDING", (0, 0), (-1, -1), 8),
    ]))
    story.append(snap_table)
    story.append(Spacer(1, 0.5*cm))

    # NOTE(review): the ◆, ▸ and circled-digit glyphs used below may not be
    # covered by the default Helvetica font — confirm they render as intended.

    # ── Interests ──
    story.append(Paragraph("Top Interests", H2))
    for interest in as_list(rpt.get("topInterests")):
        story.append(Paragraph(f"◆ {esc(interest)}", BODY))
    story.append(Spacer(1, 0.4*cm))

    # ── Strengths ──
    story.append(Paragraph("Key Strengths", H2))
    for strength in as_list(rpt.get("keyStrengths")):
        story.append(Paragraph(f"▸ {esc(strength)}", BODY))
    story.append(Spacer(1, 0.4*cm))

    # ── Career Paths ──
    story.append(Paragraph("Recommended Career Paths", H2))
    career_paths = [cp for cp in as_list(rpt.get("careerPaths")) if isinstance(cp, dict)]
    for idx, cp in enumerate(career_paths, 1):
        story.append(Paragraph(f"{idx}. {esc(cp.get('name', ''))}", H3))
        story.append(Paragraph(f"<i>Cluster: {esc(cp.get('cluster', ''))}</i>", SMALL))
        story.append(Spacer(1, 0.15*cm))
        story.append(Paragraph("<b>Why this fits you</b>", SMALL))
        for reason in as_list(cp.get("fitReasons")):
            story.append(Paragraph(f" • {esc(reason)}", BODY))
        story.append(Paragraph("<b>Key skills</b>", SMALL))
        story.append(Paragraph(", ".join(esc(s) for s in as_list(cp.get("skills"))), TAG))
        story.append(Paragraph("<b>Application hints</b>", SMALL))
        for hint_no, hint in enumerate(as_list(cp.get("applicationHints")), 1):
            # Number hints per hint (①, ②, …), not per career path. The
            # circled digits cover 1-20; beyond that use a plain number.
            marker = chr(9311 + hint_no) if hint_no <= 20 else f"{hint_no}."
            story.append(Paragraph(f" {marker} {esc(hint)}", BODY))
        if idx < len(career_paths):
            story.append(HRFlowable(width="100%", thickness=0.5,
                                    color=colors.HexColor("#e5e7eb")))
            story.append(Spacer(1, 0.2*cm))
    story.append(Spacer(1, 0.5*cm))

    # ── Next Steps ──
    story.append(HRFlowable(width="100%", thickness=1,
                            color=colors.HexColor("#c7d2fe")))
    story.append(Spacer(1, 0.3*cm))
    story.append(Paragraph("Suggested Next Steps", H2))
    for step_no, step in enumerate(as_list(rpt.get("nextSteps")), 1):
        story.append(Paragraph(f"{step_no}. {esc(step)}", BODY))

    # ── Footer ──
    story.append(Spacer(1, 0.8*cm))
    story.append(Paragraph(
        f"Generated by Career Discovery AI • HelloIvy • {esc(stu.name)}",
        FOOTER,
    ))

    doc.build(story)
    buf.seek(0)

    # HTTP header values must be latin-1 encodable, so restrict the file name
    # to ASCII word characters (a Unicode-aware \w would let other scripts in).
    safe_name = re.sub(r"\W", "_", stu.name, flags=re.ASCII) or "student"
    return StreamingResponse(
        buf,
        media_type="application/pdf",
        headers={"Content-Disposition": f'attachment; filename="career_report_{safe_name}.pdf"'},
    )
# ─────────────────────────────────────────
# Run
# ─────────────────────────────────────────
# Script entry point: serve the app with uvicorn on all interfaces, port 8000,
# with auto-reload disabled.
if __name__ == "__main__":
    import uvicorn
    # "server:app" is an import string, so this file must be importable as
    # the module `server`.
    uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=False)