"""EduScanner AI - University Edition - FastAPI Backend"""
from fastapi import FastAPI, APIRouter, HTTPException, Depends, Request, Response, UploadFile, File, Form, Cookie
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
from starlette.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient
import os
import re
import json
import logging
import uuid
import shutil
import httpx
from pathlib import Path
from pydantic import BaseModel, Field, ConfigDict
from typing import List, Optional, Any
from datetime import datetime, timezone, timedelta
from bson import Binary
import asyncio
from typing import Dict
import random
import tempfile
import time
import google.generativeai as genai
from openai import OpenAI as GroqClient
from pypdf import PdfReader, PdfWriter
try:
from PIL import Image
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
Image = None
import io
# ---------------------------------------------------------------------------
# Module configuration: paths, database handle, AI provider settings, app.
# Everything here runs once at import time.
# ---------------------------------------------------------------------------
ROOT_DIR = Path(__file__).parent
# A local .env (if present) overrides variables already set in the process.
if (ROOT_DIR / '.env').exists():
    load_dotenv(ROOT_DIR / '.env', override=True)
UPLOAD_DIR = Path(os.environ.get('UPLOAD_DIR', str(ROOT_DIR / 'uploads')))
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
AUDIO_DIR = ROOT_DIR / "audio"
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
MONGO_URL = os.environ.get('MONGO_URL', '')
DB_NAME = os.environ.get('DB_NAME', 'eduscanner_ai')
# Without MONGO_URL both handles are None; code that touches `db` will fail
# with AttributeError in that configuration.
if MONGO_URL:
    client = AsyncIOMotorClient(MONGO_URL)
    db = client[DB_NAME]
else:
    client = None
    db = None
# SECURITY: never ship a real API key as the fallback value. The key must come
# from the environment (.env or deployment secrets). Any key that was ever
# committed to source control should be treated as compromised and rotated.
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY', "")
GEMINI_BASE_URL = os.environ.get('GEMINI_BASE_URL', "https://generativelanguage.googleapis.com/v1beta")
GEMINI_MODEL = os.environ.get('GEMINI_MODEL', "gemini-2.5-flash")
GEMINI_ANALYSIS_MODEL = os.environ.get('GEMINI_ANALYSIS_MODEL', "gemini-2.5-flash")
GROQ_API_KEY = os.environ.get('GROQ_API_KEY', "")
GROQ_MODEL = os.environ.get('GROQ_MODEL', "llama-3.3-70b-versatile")
SUPABASE_URL = os.environ.get("SUPABASE_URL", "").rstrip("/")
SUPABASE_ANON_KEY = os.environ.get("SUPABASE_ANON_KEY", "")
app = fastapi_app = FastAPI(title="EduScanner AI")
# Routes are mounted without a prefix when VERCEL=1, under /api otherwise.
API_PREFIX = "" if os.environ.get("VERCEL") == "1" else "/api"
api_router = APIRouter(prefix=API_PREFIX)
logger = logging.getLogger(__name__)
HF_DOCUMENT_SEMAPHORE = asyncio.Semaphore(1)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
if not GEMINI_API_KEY:
    logger.warning("GEMINI_API_KEY is not set; Gemini requests will fail until it is configured")
@fastapi_app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log every request/response pair and turn unhandled errors into a 500.

    Logs the method and path before dispatching, the status code after, and
    returns a generic JSON 500 body if the downstream handler raises.
    """
    logger.info("Incoming request: %s %s", request.method, request.url.path)
    try:
        downstream = await call_next(request)
        logger.info("Response status: %s", downstream.status_code)
        return downstream
    except Exception as exc:
        # Full traceback goes to the log; the client only sees a generic body.
        logger.exception("Request failed: %s", exc)
        error_body = {"detail": "Internal server error"}
        return JSONResponse(status_code=500, content=error_body)
# ============== Models ==============
class User(BaseModel):
    """Local user profile as stored in the `users` collection."""
    user_id: str
    email: str
    name: str
    picture: Optional[str] = None
    friend_code: Optional[str] = None
    education_level: Optional[str] = None  # SD, SMP, SMA, SMK, MA, Universitas
    major: Optional[str] = None  # major / study programme (empty for SD/SMP)
    institution: Optional[str] = None  # school / university name
    current_semester: Optional[int] = None  # grade/year for schools, semester for university
    subjects: Optional[list] = None  # [{id, name, folder_id}]
    schedule: Optional[list] = None  # [{day, start_time, end_time, subject_id}]
    teaching_methods: Optional[list] = None  # ["real_world","imagination","independence","confidence"]
    onboarded: bool = False
    clone_voice_enabled: Optional[bool] = None
    clone_voice_url: Optional[str] = None
    created_at: datetime
class ProfileUpdate(BaseModel):
    """Request body for PUT /profile."""
    education_level: str
    major: Optional[str] = None
    institution: str
    current_semester: int
    # The optional fields below are only written when they are not None.
    teaching_methods: Optional[List[str]] = None
    clone_voice_enabled: Optional[bool] = None
    clone_voice_url: Optional[str] = None
class TeachingMethodsUpdate(BaseModel):
    """Request body for PUT /profile/teaching-methods."""
    teaching_methods: List[str]
class FriendCodeUpdate(BaseModel):
    """Request body for PUT /profile/friend-code."""
    friend_code: str  # trimmed, lower-cased and validated by the endpoint
class SubjectItem(BaseModel):
    """One subject entry; mirrors the {id, name, folder_id} shape noted on User.subjects."""
    id: str
    name: str
    folder_id: Optional[str] = None
class ScheduleItem(BaseModel):
    """One schedule slot; mirrors the {day, start_time, end_time, subject_id} shape noted on User.schedule."""
    day: str
    start_time: str  # NOTE(review): time format is not enforced here - confirm with callers
    end_time: str
    subject_id: str
class EducationSettingsPayload(BaseModel):
    """Payload carrying a full list of subjects together with the schedule."""
    subjects: list[SubjectItem]
    schedule: list[ScheduleItem]
class MaterialGeneratePayload(BaseModel):
    """Payload identifying a subject (and optionally a topic) for material generation."""
    subject_id: str
    subject_name: str
    topic: Optional[str] = None
class DocumentMeta(BaseModel):
    """Metadata and analysis results for a document.

    The analysis fields use the same keys the PDF analysis helpers in this
    file return (title, summary, key_concepts, diagrams, learning_objectives).
    """
    document_id: str
    user_id: str
    filename: str
    title: Optional[str] = None
    summary: Optional[str] = None
    key_concepts: List[dict] = []
    diagrams: List[dict] = []
    learning_objectives: List[str] = []
    status: str = "processing"  # initial state; other values are set elsewhere
    created_at: datetime
class QuizQuestion(BaseModel):
    """A single multiple-choice quiz question."""
    id: str
    question: str
    options: List[str]
    correct_index: int  # index into `options` of the correct answer
    skill_type: str  # analisis_kode, troubleshooting, perancangan_db, konsep
class Quiz(BaseModel):
    """A generated quiz: its owner, source document and questions."""
    quiz_id: str
    user_id: str
    document_id: str
    questions: List[QuizQuestion]
    created_at: datetime
class QuizGenerateRequest(BaseModel):
    """Request body for quiz generation.

    All source selectors are optional; how they are combined is decided by
    the consuming endpoint, which is not visible in this part of the file.
    """
    document_id: Optional[str] = None
    document_ids: Optional[List[str]] = None
    folder_id: Optional[str] = None
    recap_id: Optional[str] = None
    question_count: int = 5
class QuizSubmission(BaseModel):
    """Answers submitted for a quiz."""
    quiz_id: str
    answers: List[int]  # selected option indexes
class FolderCreate(BaseModel):
    """Payload carrying the name for a new folder."""
    name: str
class FolderUpdate(BaseModel):
    """Payload carrying the new name for an existing folder."""
    name: str
class DocumentMove(BaseModel):
    """Payload for moving documents into, or out of, a folder."""
    document_ids: List[str]
    folder_id: Optional[str] = None  # null to remove from folder
class RecapRequest(BaseModel):
    """Request body selecting the source material for a recap.

    Both selectors are optional; precedence between them is decided by the
    consuming endpoint, which is not visible in this part of the file.
    """
    document_ids: Optional[List[str]] = None
    folder_id: Optional[str] = None
class FeedbackItem(BaseModel):
    """Feedback for one answered quiz question."""
    question: str
    selected: str  # presumably the text of the chosen option - TODO confirm
    correct: str  # presumably the text of the correct option - TODO confirm
    is_correct: bool
    explanation: str
    references: List[str]
# ============== Auth helpers ==============
async def fetch_supabase_user(access_token: str) -> Optional[dict]:
    """Resolve a Supabase access token to its user payload.

    Calls ``GET {SUPABASE_URL}/auth/v1/user`` with the token as bearer
    credential. Returns the decoded JSON body on HTTP 200, otherwise ``None``
    (missing configuration, non-200 reply, or any exception, all logged).
    """
    if not (SUPABASE_URL and SUPABASE_ANON_KEY):
        logger.error("SUPABASE_URL / SUPABASE_ANON_KEY belum diset")
        return None

    endpoint = f"{SUPABASE_URL}/auth/v1/user"
    request_headers = {
        "Authorization": f"Bearer {access_token}",
        "apikey": SUPABASE_ANON_KEY,
        "X-Supabase-Api-Version": "2025-04-01",
    }
    try:
        async with httpx.AsyncClient(timeout=15.0) as http:
            reply = await http.get(endpoint, headers=request_headers)
            if reply.status_code == 200:
                return reply.json()
            logger.warning(f"Supabase /auth/v1/user responded {reply.status_code}: {reply.text[:200]}")
            return None
    except Exception as exc:
        logger.exception(f"Gagal validasi token Supabase: {exc}")
        return None
async def get_or_create_local_user(email: str, name: str, picture: Optional[str] = None) -> dict:
    """Return the local user document for ``email``, creating it if needed.

    When several documents share the e-mail, the onboarded one wins, then the
    oldest. An existing user gets ``name``/``picture`` refreshed and a friend
    code back-filled if missing. The returned document excludes ``_id``.
    """
    no_id = {"_id": 0}
    candidates = await (
        db.users.find({"email": email}, no_id)
        .sort([("onboarded", -1), ("created_at", 1)])
        .to_list(length=10)
    )
    existing = candidates[0] if candidates else None

    if not existing:
        # First login: create a fresh, not-yet-onboarded profile.
        new_user_id = f"user_{uuid.uuid4().hex[:12]}"
        new_friend_code = await _generate_unique_friend_code(name)
        await db.users.insert_one({
            "user_id": new_user_id,
            "email": email,
            "name": name,
            "picture": picture,
            "friend_code": new_friend_code,
            "onboarded": False,
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
        return await db.users.find_one({"user_id": new_user_id}, no_id)

    # Returning user: sync display data from the identity provider.
    selector = {"user_id": existing["user_id"]}
    changes = {"name": name, "picture": picture}
    if not existing.get("friend_code"):
        changes["friend_code"] = await _generate_unique_friend_code(name)
    await db.users.update_one(selector, {"$set": changes})
    return await db.users.find_one(selector, no_id)
async def get_current_user(request: Request) -> User:
    """FastAPI dependency resolving the authenticated user.

    Tries the legacy ``session_token`` cookie first; if that yields no valid,
    unexpired session with an existing user, falls back to a Supabase bearer
    token from the ``Authorization`` header.

    Raises:
        HTTPException: 401 when no bearer token is supplied, the token is
            rejected by Supabase, or the Supabase user has no e-mail.
    """

    def _to_user(doc: dict) -> User:
        # created_at is stored as an ISO string; the model wants a datetime.
        created = doc.get("created_at")
        if isinstance(created, str):
            doc["created_at"] = datetime.fromisoformat(created)
        return User(**doc)

    # --- Legacy session cookie (emergent auth) ---
    legacy_token = request.cookies.get("session_token")
    session = None
    if legacy_token:
        session = await db.user_sessions.find_one({"session_token": legacy_token}, {"_id": 0})
    if session:
        expiry = session["expires_at"]
        if isinstance(expiry, str):
            expiry = datetime.fromisoformat(expiry)
        if expiry.tzinfo is None:
            # Naive timestamps are interpreted as UTC.
            expiry = expiry.replace(tzinfo=timezone.utc)
        if expiry >= datetime.now(timezone.utc):
            session_user = await db.users.find_one({"user_id": session["user_id"]}, {"_id": 0})
            if session_user:
                return _to_user(session_user)

    # --- Supabase bearer token (primary auth) ---
    header = request.headers.get("Authorization", "")
    if not header.startswith("Bearer "):
        raise HTTPException(401, "Tidak terautentikasi")
    bearer = header.split(" ", 1)[1]

    supa_user = await fetch_supabase_user(bearer)
    if not supa_user:
        raise HTTPException(401, "Token Supabase tidak valid")

    email = supa_user.get("email")
    if not email:
        raise HTTPException(401, "Email user tidak ditemukan")

    meta = supa_user.get("user_metadata") or {}
    display_name = meta.get("full_name") or meta.get("name") or email.split("@")[0]
    local_user = await get_or_create_local_user(
        email=email,
        name=display_name,
        picture=meta.get("avatar_url"),
    )
    return _to_user(local_user)
async def write_audit(user_id: str, action: str, details: dict = None, ip: str = ""):
    """Insert an audit-log entry and return its id.

    Ids look like ``AUD-YYYYMMDD-NNNN`` where ``NNNN`` comes from a per-day
    counter document (``audit_<date>``) that is incremented with upsert.
    """
    day_stamp = datetime.now(timezone.utc).strftime("%Y%m%d")
    counter_doc = await db.counters.find_one_and_update(
        {"_id": f"audit_{day_stamp}"},
        {"$inc": {"seq": 1}},
        upsert=True,
        return_document=True,
    )
    sequence = counter_doc["seq"] if counter_doc else 1
    log_id = f"AUD-{day_stamp}-{sequence:04d}"

    entry = {
        "log_id": log_id,
        "user_id": user_id,
        "action": action,
        "details": details or {},
        "ip_address": ip,
        "audit_date": day_stamp,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    await db.audit_logs.insert_one(entry)
    return log_id
# ============== Auth Endpoints ==============
@api_router.post("/auth/session")
async def auth_session(request: Request, response: Response):
    """
    Supabase-based session endpoint (replaces emergent oauth session exchange).
    Accepts access_token in JSON body and returns local user profile.

    Raises:
        HTTPException: 400 when the body is not a JSON object or lacks
            ``access_token``; 401 when Supabase rejects the token or the
            Supabase user has no e-mail.
    """
    # A malformed body is a client error, not an internal server error.
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(400, "Body harus berupa JSON yang valid") from None
    if not isinstance(body, dict):
        raise HTTPException(400, "access_token wajib ada")

    access_token = body.get("access_token")
    if not access_token:
        raise HTTPException(400, "access_token wajib ada")
    supa_user = await fetch_supabase_user(access_token)
    if not supa_user:
        raise HTTPException(401, "Token Supabase tidak valid")
    email = supa_user.get("email")
    if not email:
        raise HTTPException(401, "Email user tidak ditemukan")

    # Display name preference: full_name, then name, then the e-mail local part.
    metadata = supa_user.get("user_metadata") or {}
    name = metadata.get("full_name") or metadata.get("name") or email.split("@")[0]
    picture = metadata.get("avatar_url")
    user_doc = await get_or_create_local_user(email=email, name=name, picture=picture)
    await write_audit(user_doc["user_id"], "LOGIN_SUCCESS", {"email": email}, request.client.host if request.client else "")
    if isinstance(user_doc.get("created_at"), str):
        user_doc["created_at"] = datetime.fromisoformat(user_doc["created_at"])
    return {"user": user_doc}
@api_router.get("/auth/me")
async def auth_me(user: User = Depends(get_current_user)):
    """Return the authenticated user's profile as a plain dict."""
    profile = user.model_dump()
    return profile
@api_router.post("/auth/logout")
async def logout(request: Request, response: Response):
    """Delete the legacy session (if a cookie is present) and clear the cookie."""
    session_token = request.cookies.get("session_token")
    if session_token:
        await db.user_sessions.delete_one({"session_token": session_token})
    # The cookie is cleared even when no session existed.
    response.delete_cookie("session_token", path="/")
    return {"ok": True}
# ============== Profile ==============
@api_router.put("/profile")
async def update_profile(payload: ProfileUpdate, request: Request, user: User = Depends(get_current_user)):
    """Save the onboarding/profile data and mark the user as onboarded.

    ``major`` is forced to ``None`` for SD and SMP. Optional payload fields
    are only written when provided. Returns the refreshed user document.
    """
    LEVELS_NO_MAJOR = {"SD", "SMP"}
    changes = {
        "education_level": payload.education_level,
        "major": None if payload.education_level in LEVELS_NO_MAJOR else payload.major,
        "institution": payload.institution,
        "current_semester": payload.current_semester,
        "onboarded": True,
    }
    # Leave stored values untouched for optional fields the client omitted.
    for optional_field in ("teaching_methods", "clone_voice_enabled", "clone_voice_url"):
        supplied = getattr(payload, optional_field)
        if supplied is not None:
            changes[optional_field] = supplied

    selector = {"user_id": user.user_id}
    await db.users.update_one(selector, {"$set": changes})

    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "PROFILE_UPDATE", payload.model_dump(), client_ip)
    return await db.users.find_one(selector, {"_id": 0})
@api_router.put("/profile/friend-code")
async def update_friend_code(payload: FriendCodeUpdate, request: Request, user: User = Depends(get_current_user)):
    """Change the caller's friend code.

    The code is trimmed and lower-cased, must match ``[a-z0-9_]{3,20}`` and
    must not belong to another user. Submitting the current code is a no-op.

    Raises:
        HTTPException: 400 for an empty or malformed code, 409 when taken.
    """
    requested = payload.friend_code.strip().lower()
    if not requested:
        raise HTTPException(400, "Friend code tidak boleh kosong")
    if re.match(r'^[a-z0-9_]{3,20}$', requested) is None:
        raise HTTPException(400, "Friend code hanya boleh huruf, angka, underscore (3-20 karakter)")

    result = {"friend_code": requested}
    if requested == user.friend_code:
        return result

    owner = await db.users.find_one({"friend_code": requested}, {"_id": 0, "user_id": 1})
    if owner:
        raise HTTPException(409, "Friend code sudah dipakai user lain")

    await db.users.update_one({"user_id": user.user_id}, {"$set": {"friend_code": requested}})
    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "FRIEND_CODE_UPDATE", {"friend_code": requested}, client_ip)
    return result
@api_router.put("/profile/teaching-methods")
async def update_teaching_methods(payload: TeachingMethodsUpdate, request: Request, user: User = Depends(get_current_user)):
    """Replace the caller's list of preferred teaching methods.

    Raises:
        HTTPException: 400 when any entry is not one of the accepted methods.
    """
    # NOTE(review): "anand_kumar" has a prompt in TEACHING_METHOD_PROMPTS but is
    # not accepted here - confirm whether that is intentional.
    valid = {"real_world", "imagination", "independence", "confidence"}
    for m in payload.teaching_methods:
        if m not in valid:
            # Sorted so the error message is stable (set order is arbitrary).
            raise HTTPException(400, f"Metode '{m}' tidak valid. Pilihan: {', '.join(sorted(valid))}")
    await db.users.update_one(
        {"user_id": user.user_id},
        {"$set": {"teaching_methods": payload.teaching_methods}},
    )
    await write_audit(user.user_id, "TEACHING_METHODS_UPDATE", {"teaching_methods": payload.teaching_methods}, request.client.host if request.client else "")
    return {"teaching_methods": payload.teaching_methods}
# ============== PDF Analysis (Adaptive Chunked) ==============
def _determine_chunk_size(total_pages: int) -> int:
    """Map a page count to the base chunk size used for chunked analysis.

    Up to 15 pages -> 2, up to 30 -> 3, up to 60 -> 4, anything larger -> 5.
    """
    tiers = ((15, 2), (30, 3), (60, 4))
    for page_limit, size in tiers:
        if total_pages <= page_limit:
            return size
    return 5
def _extract_pages_text(reader, start_page: int, end_page: int) -> str:
    """Extract the text of a 1-indexed, inclusive page range.

    Each page is rendered as a ``[PAGE X]`` header line followed by its text;
    pages are separated by one blank line. ``end_page`` is clamped to the
    document length and a start below 1 is treated as page 1.

    Args:
        reader: object exposing ``pages``, each with ``extract_text()``.
        start_page: first page to include (1-indexed).
        end_page: last page to include (1-indexed, inclusive).
    """
    first = max(start_page - 1, 0)
    last = min(end_page, len(reader.pages))
    # Real newlines here: the separator must be a blank line, not the
    # two-character sequence backslash + "n".
    return "\n\n".join(
        f"[PAGE {idx + 1}]\n{reader.pages[idx].extract_text() or ''}"
        for idx in range(first, last)
    )
def _normalize_concept_name(name: str) -> str:
    """Build the comparison key used for de-duplicating names.

    Lower-cases and trims the input, drops one leading English article
    (the/a/an), removes every character that is not a word character,
    whitespace or hyphen, and collapses whitespace runs to single spaces.
    """
    key = name.lower().strip()
    rewrite_steps = (
        (r'^(?:the|a|an)\s+', ''),
        (r'[^\w\s-]', ''),
        (r'\s+', ' '),
    )
    for pattern, replacement in rewrite_steps:
        key = re.sub(pattern, replacement, key)
    return key.strip()
def _deduplicate_concepts(concepts: list) -> list:
    """Drop duplicate concepts, comparing by normalized concept name.

    For duplicates, the entry whose ``explanation`` + ``code_example`` text is
    longest wins while keeping the position of the first occurrence. Entries
    that are not dicts, or whose name normalizes to an empty string, are
    skipped. Missing or ``None`` fields count as empty text, so model output
    such as ``"code_example": null`` does not raise.
    """

    def _detail_length(entry: dict) -> int:
        # str() guards against non-string values (numbers, lists) from the model.
        return len(str(entry.get("explanation") or "")) + len(str(entry.get("code_example") or ""))

    best_by_name = {}
    for entry in concepts or []:
        if not isinstance(entry, dict):
            continue
        norm = _normalize_concept_name(str(entry.get("concept") or ""))
        if not norm:
            continue
        if norm not in best_by_name or _detail_length(entry) > _detail_length(best_by_name[norm]):
            best_by_name[norm] = entry
    return list(best_by_name.values())
def _merge_diagrams(diagrams: list) -> list:
    """Merge duplicate diagrams, identified by (normalized name, type).

    For duplicates, the entry with the longer ``explanation`` wins while
    keeping the position of the first occurrence. Non-dict entries are
    skipped; missing or ``None`` fields count as empty text.
    """

    def _explanation_length(entry: dict) -> int:
        return len(str(entry.get("explanation") or ""))

    merged = {}
    for entry in diagrams or []:
        if not isinstance(entry, dict):
            continue
        # str() keeps the key hashable even if the model returns a list/number.
        key = (
            _normalize_concept_name(str(entry.get("name") or "")),
            str(entry.get("type") or ""),
        )
        if key not in merged or _explanation_length(entry) > _explanation_length(merged[key]):
            merged[key] = entry
    return list(merged.values())
async def _synthesize_summary_from_chunks(summaries: list, user: User) -> str:
    """Merge per-batch summaries into one document summary.

    With at most two summaries, or fewer than 300 words in total, the
    non-empty summaries are joined with blank lines. Otherwise Gemini is asked
    to condense them; if that call fails or returns nothing, the joined text
    is used instead.
    """
    non_empty = [s for s in summaries if s]
    # Paragraph breaks must be real newlines, not the literal text "\n\n".
    joined = "\n\n".join(non_empty).strip()

    if len(summaries) <= 2:
        return joined
    total_words = sum(len(s.split()) for s in non_empty)
    if total_words < 300:
        return joined

    # Ask the general-purpose model for 3-4 paragraphs (150-200 words).
    audience = _audience(user)
    system = (
        f"Kamu adalah EduScanner AI untuk {audience}. "
        f"Gabungkan ringkasan-ringkasan berikut menjadi 3-4 paragraf padat (150-200 kata). "
        f"Jangan ulangi pengantar. Hasilkan ringkasan yang koheren dan fokus pada isi teknis."
    )
    prompt = "RINGKASAN-BATCH:\n\n" + "\n\n---\n\n".join(non_empty)
    try:
        merged = await _call_gemini(system, prompt, model=GEMINI_MODEL)
        return merged.strip() or joined
    except Exception as e:
        logger.warning(f"Summary merge failed: {e}, using concat fallback")
        return joined
async def _analyze_batch(reader, start_page: int, end_page: int, user: User, total_batches: int, batch_idx: int) -> dict:
    """Analyze one batch of pages with Gemini.

    The text of pages ``start_page``..``end_page`` is extracted and sent to
    the model together with the extraction instructions. Returns a dict with
    ``summary``, ``key_concepts``, ``diagrams``, ``learning_objectives`` and
    ``_batch_meta``. A batch with under 50 characters of text, or one whose
    model call or parsing fails, yields an empty result flagged as skipped
    (with the error message, truncated to 200 characters, when applicable).
    """

    def _result(summary="", concepts=None, diagrams=None, objectives=None, skipped=False, error=None) -> dict:
        # Single place that defines the batch result shape.
        concepts = concepts or []
        return {
            "summary": summary,
            "key_concepts": concepts,
            "diagrams": diagrams or [],
            "learning_objectives": objectives or [],
            "_batch_meta": {
                "start_page": start_page,
                "end_page": end_page,
                "concept_count": len(concepts),
                "skipped": skipped,
                "error": error,
            },
        }

    text = _extract_pages_text(reader, start_page, end_page)
    if len(text) < 50:
        return _result(skipped=True)

    audience = _audience(user)
    system = (
        f"Kamu adalah EduScanner AI, asisten akademik elit untuk {audience}. "
        f"Tugasmu melakukan 'Deep Technical Extraction'. "
        f"Analisis HANYA halaman {start_page} sampai {end_page} dari dokumen ini. "
        f"Jangan berikan ringkasan umum. Bahasa Indonesia. Akademik, padat, fakta-oriented."
    )
    prompt = (
        "Ekstrak informasi dari halaman tertentu ini. Output JSON:\n"
        "{\n"
        '  "summary": "ringkasan batch maks 50 kata",\n'
        '  "key_concepts": [{"concept": "nama", "explanation": "penjelasan", "code_example": "kode?"}],\n'
        '  "diagrams": [{"name": "...", "type": "flowchart|diagram|chart|graph", "explanation": "logika alur"}],\n'
        '  "learning_objectives": ["objektif 1", "objektif 2"]\n'
        "}\n\n"
        "Kewajiban:\n"
        "- Key concepts: 2-4 item (jika ada).\n"
        "- Diagrams: max 1 (jika ada).\n"
        "- Rangkuman: padat, teknis, menyertakan data/angka jika ada.\n\n"
        # The page text must be part of the request: no file is attached to
        # this call, so without it the model has nothing to analyze.
        f"ISI HALAMAN {start_page}-{end_page}:\n{text}"
    )
    try:
        resp = await _call_gemini(system, prompt, model=GEMINI_ANALYSIS_MODEL)
        data = _parse_json_block(resp)
        if not isinstance(data, dict):
            raise ValueError("Response AI bukan objek JSON")
        return _result(
            summary=data.get("summary") or "",
            concepts=data.get("key_concepts"),
            diagrams=data.get("diagrams"),
            objectives=data.get("learning_objectives"),
        )
    except Exception as e:
        logger.warning(f"Batch {batch_idx+1}/{total_batches} (halaman {start_page}-{end_page}) gagal: {e}")
        return _result(skipped=True, error=str(e)[:200])
async def _analyze_pdf_legacy(file_path: str, user: User, total_pages: int = 0) -> dict:
    """Analyze a whole PDF in a single model call.

    For documents under 30 pages the analysis model (GEMINI_ANALYSIS_MODEL) is
    tried first. A second attempt with the default model always follows when
    the first is skipped or fails. If everything fails an empty analysis is
    returned.
    """
    audience = _audience(user)
    system = (
        f"Kamu adalah EduScanner AI, asisten akademik elit untuk {audience}. "
        f"Tugasmu melakukan 'Deep Technical Extraction'. Jangan berikan rangkuman umum yang dangkal. "
        f"Gaya bahasa: Akademik, Padat, Fakta-Oriented. Bahasa Indonesia."
    )
    prompt = (
        "Analisis PDF ini dengan kedalaman tinggi. Ekstrak informasi menggunakan struktur JSON berikut:\n"
        "{\n"
        '  "title": "judul dokumen yang paling tepat",\n'
        '  "summary": "Rangkuman densitas tinggi (4-6 paragraf). Fokus pada: 1. Masalah utama, 2. Metodologi/Logika detail, 3. Temuan/Data spesifik, 4. Kesimpulan teknis.",\n'
        '  "key_concepts": [\n'
        '    {"concept": "nama konsep", "explanation": "Penjelasan mendalam mencakup cara kerja, rumus, atau logika internalnya.", "code_example": "kode implementasi jika relevan"}\n'
        '  ],\n'
        '  "diagrams": [{"name": "...", "type": "...", "explanation": "Logika alur data secara step-by-step"}],\n'
        '  "learning_objectives": ["Mampu menganalisis X...", "Mampu mengimplementasi Y...", ...]\n'
        "}\n\n"
        "Kewajiban:\n"
        "- Jika ada angka, data statistik, atau terminologi sulit, WAJIB disertakan.\n"
        "- key_concepts minimal 7-10 poin agar sangat detail.\n"
        "- Hindari kalimat pembuka seperti 'Dokumen ini membahas tentang...'. Langsung ke substansi teknis."
    )

    # First attempt (short documents only): the dedicated analysis model.
    if total_pages < 30:
        try:
            first_reply = await _call_gemini(system, prompt, file_path=file_path, model=GEMINI_ANALYSIS_MODEL)
            return _parse_json_block(first_reply)
        except Exception as e:
            logger.warning(f"Gemini-3-flash gagal ({e}), fallback ke gemini-2.5-flash")

    # Second attempt: the default model.
    try:
        second_reply = await _call_gemini(system, prompt, file_path=file_path)
        return _parse_json_block(second_reply)
    except Exception as e:
        logger.error(f"Semua model gagal: {e}")
        return _empty_analysis()
async def analyze_pdf(file_path: str, user: User) -> dict:
    """Analyze a PDF and return title, summary, concepts, diagrams, objectives.

    Documents of up to 200 pages go through the single-pass analysis. Larger
    ones are processed in overlapping page batches whose results are
    de-duplicated and merged.
    """
    # Initial pause before starting (requested behaviour).
    await asyncio.sleep(2)

    reader = PdfReader(file_path)
    total_pages = len(reader.pages)

    # Single pass for documents of up to 200 pages.
    if total_pages <= 200:
        return await _analyze_pdf_legacy(file_path, user, total_pages)

    # Chunking is reserved for extreme files (> 200 pages).
    chunk_size = _determine_chunk_size(total_pages) * 10
    overlap = 5
    step = chunk_size - overlap

    # Exact number of batches needed to cover every page.
    remaining = total_pages - chunk_size
    total_batches = 1 if remaining <= 0 else (remaining + step - 1) // step + 1

    summaries, concepts, diagrams, objectives = [], [], [], []
    for batch_idx in range(total_batches):
        if batch_idx > 0:
            await asyncio.sleep(5)  # longer pause between batches of huge files
        start = 1 + batch_idx * step
        if start > total_pages:
            break
        end = min(start + chunk_size - 1, total_pages)

        batch = await _analyze_batch(reader, start, end, user, total_batches, batch_idx)
        batch_summary = batch.get("summary")
        if batch_summary:
            summaries.append(batch_summary)
        concepts += batch.get("key_concepts", [])
        diagrams += batch.get("diagrams", [])
        objectives += batch.get("learning_objectives", [])

    # Final merge: de-duplicate and cap each list.
    first_by_norm = {}
    for objective in objectives:
        norm = _normalize_concept_name(objective)
        if norm:
            # setdefault keeps the first wording seen for each normalized key.
            first_by_norm.setdefault(norm, objective)
    unique_objectives = list(first_by_norm.values())

    return {
        "title": f"Analisis Dokumen ({total_pages} hal)",
        "summary": await _synthesize_summary_from_chunks(summaries, user),
        "key_concepts": _deduplicate_concepts(concepts)[:25],
        "diagrams": _merge_diagrams(diagrams)[:12],
        "learning_objectives": unique_objectives[:12],
    }
# ============== AI Helpers ==============
async def _generate_unique_friend_code(name: str) -> str:
    """Generate a friend code that no user currently holds.

    The base is derived from ``name``: lower-cased, spaces turned into
    underscores, every other character outside ``[a-z0-9_]`` removed, at most
    12 characters, "user" when nothing remains. Up to 20 random 4-digit
    suffixes are tried; after that a uuid-based code is returned.
    """
    # Underscores must survive the filter, otherwise the space -> "_" mapping
    # has no effect. Stray leading/trailing underscores are trimmed.
    slug = re.sub(r'[^a-z0-9_]', '', name.lower().replace(' ', '_'))
    base = slug.strip('_')[:12].rstrip('_')
    if not base:
        base = "user"
    for _ in range(20):
        code = f"{base}_{random.randint(1000, 9999)}"
        existing = await db.users.find_one({"friend_code": code}, {"_id": 0, "user_id": 1})
        if not existing:
            return code
    return f"user_{uuid.uuid4().hex[:8]}"
async def _call_gemini(
    system_message: str,
    prompt: str,
    file_path: Optional[str] = None,
    model: Optional[str] = None,
) -> str:
    """Send a prompt (optionally with an uploaded file) to Gemini.

    Args:
        system_message: system instruction for the model.
        prompt: user prompt text.
        file_path: optional local file to upload and place before the prompt;
            the uploaded file is deleted afterwards (best effort).
        model: model name; defaults to GEMINI_MODEL.

    Returns:
        The response text.

    Raises:
        Exception: errors that do not look transient are re-raised at once;
            transient ones (503/429/UNAVAILABLE/RESOURCE_EXHAUSTED/"high
            demand") are retried up to 5 attempts, then re-raised.
    """
    model_name = model or GEMINI_MODEL
    genai.configure(api_key=GEMINI_API_KEY)
    transient_markers = ("503", "UNAVAILABLE", "429", "RESOURCE_EXHAUSTED", "high demand")

    def _parse_retry_delay(err_msg: str) -> Optional[float]:
        # Pull a server-suggested wait (in seconds) out of the error text.
        m = re.search(r'retry_delay\s*\{(?:[^}]*seconds:\s*)?(\d+(?:\.\d+)?)', err_msg)
        if m:
            return float(m.group(1))
        m = re.search(r'Please retry in\s*(\d+(?:\.\d+)?)s', err_msg)
        if m:
            return float(m.group(1))
        return None

    async def _gen_content_async(model_name: str, system_message: str, prompt: str, uploaded_file=None) -> str:
        max_retries = 5
        base_delay = 1.0
        for attempt in range(max_retries):
            try:
                gen_model = genai.GenerativeModel(
                    model_name=model_name,
                    system_instruction=system_message,
                    generation_config=genai.types.GenerationConfig(
                        temperature=0.7, max_output_tokens=8192,
                    ),
                )
                parts = [prompt]
                if uploaded_file:
                    parts.insert(0, uploaded_file)
                resp = await gen_model.generate_content_async(parts)
                return resp.text
            except Exception as e:
                err_msg = str(e)
                is_transient = any(marker in err_msg for marker in transient_markers)
                # Give up immediately on non-transient errors, and on the last
                # attempt without first sleeping for a retry that never comes.
                if not is_transient or attempt == max_retries - 1:
                    raise
                suggested = _parse_retry_delay(err_msg)
                if suggested:
                    delay = suggested + random.uniform(0.5, 2.0)
                else:
                    # Exponential backoff with jitter.
                    delay = base_delay * (2 ** attempt) + random.uniform(0.1, 1.0)
                logger.warning(f"Gemini API rate-limit. Retrying in {delay:.1f}s (Attempt {attempt + 1}/{max_retries})... Error: {e}")
                await asyncio.sleep(delay)
        # Not reachable: the final attempt either returns or raises.
        raise RuntimeError("Gemini retry loop exited unexpectedly")

    if not file_path:
        return await _gen_content_async(model_name, system_message, prompt)

    # The SDK upload/delete calls are blocking, so run them in a worker thread.
    uploaded = await asyncio.to_thread(genai.upload_file, file_path)
    try:
        return await _gen_content_async(model_name, system_message, prompt, uploaded)
    finally:
        try:
            await asyncio.to_thread(genai.delete_file, uploaded.name)
        except Exception as cleanup_error:
            # Best effort: a failed cleanup must not mask the real result.
            logger.debug(f"Failed to delete uploaded Gemini file: {cleanup_error}")
async def _call_groq(system_message: str, prompt: str) -> str:
    """Send a chat completion request to Groq and return the reply text.

    Token safety: the Groq limit is 6000 TPM. When system message plus prompt
    exceed 8,000 characters, the prompt is cut so that the combined length
    (including the truncation marker) stays within 8,000 characters, and never
    above 7,000 characters of prompt text. ``max_tokens`` is capped at 3000 so
    that input plus output stays near the limit.
    """
    max_total_chars = 8000
    max_prompt_chars = 7000
    marker = "... [truncated for token safety]"
    if len(system_message) + len(prompt) > max_total_chars:
        # The budget accounts for the system message; slicing to a fixed 7000
        # could leave the prompt unshortened and only append the marker.
        budget = min(max_prompt_chars, max_total_chars - len(system_message) - len(marker))
        prompt = prompt[:max(budget, 0)] + marker

    def _gen() -> str:
        # Named groq_client so it does not shadow the module-level Mongo client.
        groq_client = GroqClient(api_key=GROQ_API_KEY, base_url="https://api.groq.com/openai/v1")
        resp = groq_client.chat.completions.create(
            model=GROQ_MODEL,
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt},
            ],
            temperature=0.7,
            max_tokens=3000,
        )
        return resp.choices[0].message.content

    # The client is synchronous; run it off the event loop.
    return await asyncio.to_thread(_gen)
# Prompt fragments (Indonesian) describing each teaching method, keyed by
# method id. _audience() appends the fragments for a user's selected methods
# to the audience description used in system prompts.
TEACHING_METHOD_PROMPTS = {
    "real_world": (
        "Pemanfaatan Lingkungan Sekitar: AI mengaitkan setiap konsep dengan fenomena sehari-hari, "
        "menggunakan analogi kehidupan nyata (seperti kecepatan bola, laju kereta, atau transaksi jual-beli). "
        "Tujuannya agar siswa melihat bahwa ilmu pengetahuan ada di sekeliling mereka."
    ),
    "imagination": (
        "Membangkitkan Imajinasi & Kreativitas: AI mendorong visualisasi konsep, memberikan pertanyaan terbuka, "
        "dan membuat skenario 'bagaimana jika' — fokus pada pemahaman logika dan metode, bukan sekadar menghafal rumus."
    ),
    "independence": (
        "Kemandirian dalam Keterbatasan: AI memberi tantangan yang memaksa siswa berpikir kreatif dan "
        "menemukan jawaban sendiri menggunakan sumber daya yang ada — menanamkan mentalitas problem-solver."
    ),
    "confidence": (
        "Peningkatan Kepercayaan Diri: AI mengapresiasi proses berpikir siswa, menggunakan bahasa yang membangun, "
        "dan memberikan tantangan di 'zona nyaman atas' untuk meningkatkan motivasi secara bertahap."
    ),
    "anand_kumar": (
        "Metode Pengajaran Anand Kumar: Mengubah cara AI mengajar dari sekadar memberi materi menjadi "
        "pengalaman belajar yang hidup dan bermakna. Gunakan pendekatan yang sangat motivasional, "
        "fokus pada pemecahan masalah yang menantang, dan buat siswa merasa mampu menaklukkan materi sesulit apa pun."
    ),
}
def _audience(user: User) -> str:
    """Describe the learner (level, grade/semester, major) for system prompts.

    School levels (SD/SMP/SMA/SMK/MA) are phrased as "siswa ... kelas N",
    everything else as "mahasiswa ... semester N". The teaching-method
    instructions for the user's selected methods are appended; when the user
    has none selected, all five known methods are used.
    """
    school_levels = {"SD", "SMP", "SMA", "SMK", "MA"}
    level = user.education_level or "Umum"
    major = user.major or "Umum"
    semester = user.current_semester

    if level in school_levels:
        grade_label = f"kelas {semester}" if semester else ""
        base = f"siswa {level} {grade_label} jurusan/peminatan {major}".strip()
    else:
        base = f"mahasiswa {level} semester {semester or '-'} prodi {major}"

    # Default to every method, including anand_kumar, when none is specified.
    selected = user.teaching_methods or ["anand_kumar", "real_world", "imagination", "independence", "confidence"]
    instructions = [TEACHING_METHOD_PROMPTS[m] for m in selected if m in TEACHING_METHOD_PROMPTS]
    if not instructions:
        return base

    bullet_list = "\n".join(f"- {text}" for text in instructions)
    return (
        f"{base}\n\n"
        "VISI PENGAJARAN:\n"
        "Tujuanmu adalah membuat orang yang malas belajar menjadi mau, yang sudah mau menjadi rajin, "
        "dan yang sudah rajin menjadi semakin pintar.\n\n"
        "GAYA MENGAJAR & METODE:\n" + bullet_list
    )
def _parse_json_block(text: str) -> Any:
    """Extract and parse the JSON payload from a model response.

    Steps, in order:
      1. drop ``<think>...</think>`` reasoning blocks;
      2. if a Markdown code fence is present, use its content;
      3. parse the text as JSON;
      4. otherwise parse the span from the first ``{``/``[`` to the last
         ``}``/``]``;
      5. otherwise retry that span after ``_clean_json`` repairs.

    Raises:
        ValueError: when no JSON-looking span exists or it cannot be parsed.
    """
    text = text.strip()
    # Reasoning blocks may contain braces that would confuse span detection.
    text = re.sub(r'<think>[\s\S]*?</think>', '', text, flags=re.IGNORECASE).strip()

    fenced = re.search(r"```(?:json)?\s*([\s\S]+?)```", text)
    if fenced:
        text = fenced.group(1).strip()

    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Fall back to the outermost bracketed span.
    opening_positions = [pos for pos in (text.find("{"), text.find("[")) if pos != -1]
    start = min(opening_positions, default=-1)
    end = max(text.rfind("}"), text.rfind("]"))
    if start == -1 or end <= start:
        raise ValueError("Response AI tidak mengandung JSON")

    candidate = text[start:end + 1]
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        pass

    try:
        return json.loads(_clean_json(candidate))
    except json.JSONDecodeError:
        raise ValueError("Gagal parse JSON dari response AI meski sudah dibersihkan") from None
def _empty_analysis() -> dict:
    """Return a fresh, empty analysis result (blank strings and empty lists)."""
    blank = dict.fromkeys(("title", "summary"), "")
    for list_field in ("key_concepts", "diagrams", "learning_objectives"):
        # A new list per field and per call, so callers never share state.
        blank[list_field] = []
    return blank
def _clean_json(s: str) -> str:
    """Apply last-resort textual repairs to almost-JSON text.

    Repairs, in order: remove trailing commas before ``}``/``]``; turn single
    quotes into double quotes; map Python literals True/False/None to JSON;
    strip ``//`` line comments; double any backslash that does not start a
    valid JSON escape.

    NOTE: these are blunt heuristics meant to run only after strict parsing
    has failed. They can alter string contents (apostrophes, the words
    True/False/None, and anything after ``//`` such as URLs).
    """
    if not s or not s.strip():
        return s
    s = re.sub(r",\s*}", "}", s)
    s = re.sub(r",\s*\]", "]", s)
    s = s.replace("'", "\"")
    s = s.replace("True", "true").replace("False", "false").replace("None", "null")
    s = re.sub(r"//[^\n]*", "", s)

    def _escape_invalid(match):
        # Walk escape pairs left to right: keep valid ones such as \n or \\,
        # and double the backslash of anything else (\d -> \\d).
        follower = match.group(1)
        if follower in '"\\/bfnrtu':
            return match.group(0)
        return "\\\\" + follower

    s = re.sub(r'\\(.)', _escape_invalid, s, flags=re.DOTALL)
    return s.strip()
async def _call_groq_qwen(system_message: str, prompt: str) -> str:
    """Forward the request to Gemini with the default model.

    Despite its name, this helper does not call Groq; it delegates to
    ``_call_gemini``.
    """
    reply = await _call_gemini(system_message, prompt)
    return reply
async def generate_quiz_questions(documents: List[dict], user: User, n: int = 5, recap_text: str = "") -> List[dict]:
    """Generate up to ``n`` multiple-choice HOTS questions.

    Questions are requested in concurrent batches of at most 10. The context
    is ``recap_text`` when given, otherwise a compact digest of ``documents``.
    Each batch gets one retry with a reduced prompt when the first call or its
    parsing fails. Malformed items in the model output (missing question or
    options, non-numeric or out-of-range ``correct_index``) are skipped rather
    than failing the whole generation.

    Returns:
        A list of dicts with id, question, options (max 4), correct_index,
        skill_type and source_title.

    Raises:
        Exception: when a batch fails on both attempts.
        ValueError: when the model output is not a list of questions.
    """
    batch_size = 10
    batches = [min(batch_size, n - offset) for offset in range(0, n, batch_size)]
    audience = _audience(user)
    single_document = len(documents) == 1
    # Single-document requests go through _call_groq_qwen, the rest through
    # _call_gemini (as before).
    call_model = _call_groq_qwen if single_document else _call_gemini

    system_base = (
        f"Kamu adalah EduScanner AI, generator soal kuis HOTS bahasa Indonesia "
        f"untuk {audience}. Soal harus menguji analisis, evaluasi, dan kreativitas — bukan hafalan. "
        f"Sesuaikan tingkat kesulitan dengan jenjang."
    )

    def _clean_question(raw) -> Optional[dict]:
        # Validate one model-produced item; None means "skip it".
        if not isinstance(raw, dict):
            return None
        question = raw.get("question")
        options = raw.get("options")
        if not question or not isinstance(options, list) or not options:
            return None
        options = options[:4]
        try:
            correct_index = int(raw.get("correct_index"))
        except (TypeError, ValueError):
            return None
        if not 0 <= correct_index < len(options):
            return None
        return {
            "id": uuid.uuid4().hex[:8],
            "question": question,
            "options": options,
            "correct_index": correct_index,
            "skill_type": raw.get("skill_type", "konsep"),
            "source_title": raw.get("source_title", ""),
        }

    async def _generate_batch(batch_count: int) -> List[dict]:
        sources = []
        if recap_text:
            context = recap_text[:5000]
            system = system_base + " Buat soal berdasarkan rangkuman materi berikut."
            prompt = (
                f"Berdasarkan rangkuman materi berikut, buat {batch_count} soal pilihan ganda HOTS. "
                f"Setiap soal punya 4 opsi (A-D), satu jawaban benar.\n\nRANGKUMAN:\n{context}\n\n"
                "Kembalikan JSON array saja, tanpa markdown:\n"
                '[{"question": "...", "options": ["...","...","...","..."], "correct_index": 0, "skill_type": "konsep", "source_title": ""}]'
            )
        else:
            multi = len(documents) > 1
            system = system_base + (
                f" Soal harus mencakup keseluruhan {len(documents)} materi yang diberikan, distribusikan secara merata."
                if multi else ""
            )
            # Character budget per document; max() avoids dividing by zero
            # when no documents are supplied.
            per_doc_budget = 1500 if single_document else (3000 // max(len(documents), 1))
            for d in documents:
                sources.append({
                    "source": d.get("title") or d.get("filename") or "Dokumen",
                    "summary": (d.get("summary") or "")[:per_doc_budget],
                    "key_concepts": [c.get("concept", "") for c in d.get("key_concepts", [])[:3]],
                    "learning_objectives": d.get("learning_objectives", [])[:2],
                })
            context = json.dumps(sources, ensure_ascii=False)
            if len(context) > 4000:
                context = context[:4000] + "..."
            prompt = (
                f"Berdasarkan materi berikut ({len(documents)} sumber), buat {batch_count} soal pilihan ganda HOTS. "
                f"Setiap soal punya 4 opsi (A-D), satu jawaban benar.\n\nMATERI:\n{context}\n\n"
                "Kembalikan JSON array saja, tanpa markdown. Tiap soal sertakan source_title (nama dokumen sumber):\n"
                '[{"question": "...", "options": ["...","...","...","..."], "correct_index": 0, "skill_type": "analisis_kode|troubleshooting|perancangan_db|konsep", "source_title": "judul dokumen"}]'
            )

        try:
            if single_document:
                logger.info(f"Generating batch of {batch_count} quiz questions for 1 document...")
            else:
                logger.info(f"Generating batch of {batch_count} quiz questions for {len(documents)} documents...")
            resp = await call_model(system, prompt)
            data = _parse_json_block(resp)
        except Exception as e:
            # One retry with a much smaller prompt.
            logger.warning(f"Quiz gen batch failed, retry: {e}")
            if recap_text:
                retry_prompt = f"Buat {batch_count} soal HOTS dari rangkuman ini: {recap_text[:3000]}"
            else:
                minimal_sources = [{"source": s["source"], "summary": s["summary"][:500]} for s in sources]
                minimal_context = json.dumps(minimal_sources, ensure_ascii=False)
                retry_prompt = f"Buat {batch_count} soal HOTS dari materi ini: {minimal_context}"
            resp = await call_model(system, retry_prompt)
            data = _parse_json_block(resp)

        # Some replies wrap the array in an object; accept {"questions": [...]}.
        if isinstance(data, dict):
            data = data.get("questions")
        if not isinstance(data, list):
            raise ValueError("Response AI tidak berisi daftar soal")

        cleaned = (_clean_question(item) for item in data[:batch_count])
        return [question for question in cleaned if question is not None]

    results = await asyncio.gather(*(_generate_batch(count) for count in batches))
    all_questions = [question for batch in results for question in batch]
    return all_questions[:n]
async def generate_recap(documents: List[dict], user: User) -> dict:
    """Build one combined recap for several analysed documents via Gemini.

    Each document contributes its title, a truncated summary, up to four
    key-concept names and up to three learning objectives to the prompt.

    Args:
        documents: Document dicts. ``title``/``filename``, ``summary``,
            ``key_concepts`` and ``learning_objectives`` are read if present.
        user: Current user; passed to ``_audience`` to phrase the system prompt.

    Returns:
        The JSON object parsed from the model response. If that attempt
        fails, a minimal dict whose ``unified_summary`` is the raw text of a
        second, much smaller request; if that fails too, the same shape with
        an empty summary.
    """
    audience = _audience(user)
    system = (
        f"Kamu EduScanner AI yang menggabungkan materi belajar untuk {audience}. "
        f"Bahasa Indonesia, jelas, sistematis. Output JSON saja tanpa markdown."
    )

    # Token optimisation: very tight budget (original note: Qwen 6000 limit).
    # max(..., 1) keeps an empty document list from raising ZeroDivisionError.
    doc_count = len(documents)
    per_budget = 2500 if doc_count == 1 else 4000 // max(doc_count, 1)

    sources = []
    for d in documents:
        # `or []` guards against fields stored as None rather than missing.
        concepts = (d.get("key_concepts") or [])[:4]
        sources.append({
            "title": d.get("title") or d.get("filename") or "Dokumen",
            "summary": (d.get("summary") or "")[:per_budget],
            # Concept names only, no explanations; tolerate plain-string entries.
            "key_concepts": [
                c.get("concept", "") if isinstance(c, dict) else str(c)
                for c in concepts
            ],
            "learning_objectives": (d.get("learning_objectives") or [])[:3],
        })

    context = json.dumps(sources, ensure_ascii=False)
    # Emergency trim: the context is prompt text only, so cutting mid-JSON is acceptable.
    if len(context) > 4500:
        context = context[:4500] + "..."

    prompt = (
        f"Buat RANGKUMAN GABUNGAN dari {len(documents)} materi berikut:\n{context}\n\n"
        "Output JSON:\n"
        '{\n'
        ' "title": "judul rangkuman gabungan",\n'
        ' "unified_summary": "ringkasan terpadu 3-5 paragraf",\n'
        ' "per_document": [{"source_title":"...", "highlight":"poin penting"}],\n'
        ' "shared_concepts": [{"concept":"...","explanation":"..."}],\n'
        ' "study_path": ["langkah 1","..."]\n'
        '}'
    )
    try:
        resp = await _call_gemini(system, prompt)
        return _parse_json_block(resp)
    except Exception as e:
        logger.warning(f"Recap generation failed, retrying with minimal context: {e}")

    # Retry with a super-minimal context (titles only), using Gemini again.
    mini_context = json.dumps([{"t": s["title"]} for s in sources])
    try:
        resp = await _call_gemini(
            system,
            f"Buat ringkasan sangat singkat dari daftar materi ini: {mini_context}",
        )
        return {"title": "Ringkasan Minimal", "unified_summary": resp, "per_document": [], "shared_concepts": [], "study_path": []}
    except Exception as e:
        # Last resort: return an empty recap so callers still get the expected shape.
        logger.warning(f"Minimal recap retry failed, returning empty recap: {e}")
        return {"title": "Ringkasan", "unified_summary": "", "per_document": [], "shared_concepts": [], "study_path": []}
async def generate_deep_feedback(quiz: dict, answers: List[int], user: User) -> dict:
    """Ask Groq for per-question feedback on a submitted quiz, in batches.

    Questions are sent ``BATCH_SIZE`` at a time to keep each request small.
    Every batch is attempted twice; a second failure propagates to the caller.

    Args:
        quiz: Quiz dict; ``quiz["questions"]`` entries must have ``question``,
            ``options`` and ``correct_index``.
        answers: Selected option index per question. Missing trailing answers
            are sent to the model as ``-1``.
        user: Current user; passed to ``_audience`` to phrase the system prompt.

    Returns:
        ``{"score", "summary", "items"}``. ``score`` is the percentage of
        returned items flagged ``is_correct`` relative to the number of
        questions; the per-batch ``score`` reported by the model is not used.
    """
    audience = _audience(user)
    questions = quiz["questions"]
    total_q = len(questions)
    # Limit the batch size to avoid 413 (Request too large).
    BATCH_SIZE = 5
    all_items = []
    summaries = []

    for i in range(0, total_q, BATCH_SIZE):
        batch_qs = questions[i:i + BATCH_SIZE]
        batch_ans = answers[i:i + BATCH_SIZE]
        items = [
            {
                "question": q["question"],
                "options": q["options"],
                "correct_index": q["correct_index"],
                "selected_index": batch_ans[j] if j < len(batch_ans) else -1,
            }
            for j, q in enumerate(batch_qs)
        ]
        system = (
            f"Kamu EduScanner AI memberi feedback akademik mendalam bahasa Indonesia untuk {audience}. "
            f"Batch {i//BATCH_SIZE + 1}. Selalu sertakan minimal satu referensi akademik atau buku pelajaran."
        )
        prompt = (
            "Berikan feedback per soal. Kembalikan JSON saja tanpa markdown.\n\n"
            f"SOAL+JAWABAN (Batch): {json.dumps(items, ensure_ascii=False)}\n\n"
            "Format:\n"
            '{\n'
            ' "score": 0-100 (untuk batch ini saja),\n'
            ' "summary": "ringkasan performa batch ini",\n'
            ' "items": [{"question":"...","selected":"...","correct":"...","is_correct":true,"explanation":"...","references":["...","..."]}]\n'
            '}'
        )
        try:
            resp = await _call_groq(system, prompt)
            batch_feedback = _parse_json_block(resp)
        except Exception as e:
            # Retry once; a second failure is left to propagate.
            logger.warning(f"Deep feedback batch {i//BATCH_SIZE + 1} failed, retrying once: {e}")
            resp = await _call_groq(system, prompt)
            batch_feedback = _parse_json_block(resp)

        all_items.extend(batch_feedback.get("items", []))
        summaries.append(batch_feedback.get("summary", ""))

    # Derive the score from the is_correct flags so it matches the items shown
    # in the deep feedback.
    correct_count = sum(1 for it in all_items if it.get("is_correct"))
    actual_score = round((correct_count / total_q) * 100) if total_q > 0 else 0

    # Merge the batch summaries into one short paragraph with Groq.
    final_summary = " ".join(summaries)
    if len(summaries) > 1:
        try:
            sum_system = f"Kamu EduScanner AI. Gabungkan {len(summaries)} ringkasan performa kuis menjadi 1 paragraf padat bahasa Indonesia."
            final_summary = await _call_groq(sum_system, f"RINGKASAN-RINGKASAN:\n{final_summary}")
        except Exception as e:
            # Best effort: keep the concatenated summaries. `except Exception`
            # (not a bare except) lets task cancellation propagate.
            logger.warning(f"Could not merge batch summaries, using concatenation: {e}")

    return {
        "score": actual_score,
        "summary": final_summary,
        "items": all_items
    }
def _sync_run(async_fn, *args):
return asyncio.run(async_fn(*args))
async def _bg_analyze_document(doc_id: str, file_path: str, user: User, ip: str):
    """Analyse an uploaded PDF in the background and store the outcome.

    On success the document record receives the analysis fields and status
    ``ready``; on failure it receives status ``failed`` plus a truncated
    error message. In both cases nothing is written when the record no
    longer exists or its status is ``cancelled``/``deleted``.
    """
    abandoned_states = ("cancelled", "deleted")
    doc_filter = {"document_id": doc_id}
    status_projection = {"_id": 0, "status": 1}

    try:
        analysis = await analyze_pdf(file_path, user)

        # Re-read the status: the user may have cancelled or deleted meanwhile.
        snapshot = await db.documents.find_one(doc_filter, status_projection)
        if not snapshot:
            return
        if snapshot.get("status") in abandoned_states:
            return

        fields = {"title": analysis.get("title") or ""}
        fields["summary"] = analysis.get("summary", "")
        for list_key in ("key_concepts", "diagrams", "learning_objectives"):
            fields[list_key] = analysis.get(list_key, [])
        fields["status"] = "ready"

        await db.documents.update_one(doc_filter, {"$set": fields})
        await write_audit(user.user_id, "DOCUMENT_ANALYZED", {"document_id": doc_id}, ip)
    except Exception as e:
        logger.exception("Background analyze gagal")
        snapshot = await db.documents.find_one(doc_filter, status_projection)
        still_active = bool(snapshot) and snapshot.get("status") not in abandoned_states
        if still_active:
            # Keep the stored/audited error message bounded in length.
            err_msg = str(e)[:300]
            await db.documents.update_one(
                doc_filter,
                {"$set": {"status": "failed", "error": err_msg}},
            )
            await write_audit(
                user.user_id,
                "DOCUMENT_ANALYSIS_FAILED",
                {"document_id": doc_id, "error": err_msg},
                ip,
            )
async def run_analysis_queued(doc_id: str, file_path: str, user: User, ip: str):
    """Run the document analysis through the global queue.

    The work is done while holding ``HF_DOCUMENT_SEMAPHORE``; per the
    original note this is meant to process one file at a time on the
    Hugging Face CPU (the actual limit is set where the semaphore is defined).
    """
    async with HF_DOCUMENT_SEMAPHORE:
        await _bg_analyze_document(
            doc_id=doc_id,
            file_path=file_path,
            user=user,
            ip=ip,
        )
# ============== TTS (Text-to-Speech) ==============
_TTS_LOCK = asyncio.Lock()
_TTS_VOICE = "id-ID-GadisNeural"
_SYMBOLS_RE = re.compile(r'[*#_~`^\\\[\]<>{}|@$%&+=/]')
def _clean_tts_text(text: str) -> str:
return _SYMBOLS_RE.sub("", text).strip()
async def _generate_tts(text: str, output_path: str, user: Optional[User] = None):
# Synthesise speech for `text` and write the audio to `output_path`.
#
# Flow, as far as the visible code shows:
#   1. Strip symbols with _clean_tts_text; raise HTTP 400 if nothing is left.
#   2. Unless the user has clone_voice_enabled set to False, try voice cloning
#      through the Chatterbox Gradio space (gradio_client first, then a raw
#      HTTP fallback) and write the combined WAV bytes to output_path.
#   3. Otherwise fall back to edge-tts with _TTS_VOICE.
#
# NOTE(review): this file has lost its indentation, so the exact nesting of
# the steps above cannot be confirmed from here.
# NOTE(review): the line starting `header[4:8] = struct.pack(` below is
# corrupted: the struct.pack call is fused with a later function signature.
# The rest of _concatenate_wavs and the definitions of _try_chatterbox,
# _gen_chunk_client, chunks, local_default_path, target_voice_url and Client
# are not visible. Restore them from version control before editing.
cleaned = _clean_tts_text(text)
if not cleaned:
raise HTTPException(400, "Teks kosong setelah dibersihkan")
# Cloning is on by default; only an explicit False on the user turns it off.
use_cloning = True
voice_url = None
if user:
if getattr(user, "clone_voice_enabled", None) is False:
use_cloning = False
voice_url = getattr(user, "clone_voice_url", None)
if use_cloning:
# Greedy word packing: chunks of at most max_chars characters; a single word
# longer than max_chars becomes a chunk of its own.
def _chunk_text(text_to_chunk, max_chars=120):
words = text_to_chunk.split()
if not words:
return []
chunks_list = []
current_chunk = ""
for word in words:
candidate = (current_chunk + " " + word).strip() if current_chunk else word
if len(candidate) > max_chars:
if current_chunk.strip():
chunks_list.append(current_chunk.strip())
current_chunk = word
else:
current_chunk = candidate
if current_chunk.strip():
chunks_list.append(current_chunk.strip())
return chunks_list
# Joins several WAV byte strings: the first 44 bytes of the first input are
# reused as the header and bytes 44+ of every input are treated as PCM data.
# assumes every input has exactly a 44-byte header — TODO confirm
def _concatenate_wavs(wav_contents):
if not wav_contents:
return b""
if len(wav_contents) == 1:
return wav_contents[0]
first_wav = wav_contents[0]
pcm_datas = [wav[44:] for wav in wav_contents]
total_pcm_length = sum(len(pcm) for pcm in pcm_datas)
import struct
header = bytearray(first_wav[:44])
# NOTE(review): corrupted line — see the note at the top of this function.
header[4:8] = struct.pack(" Optional[bytes]:
try:
logger.info(f"Generating chunk {idx + 1}/{len(chunks)} with gradio_client: '{chunk_text}'")
client = Client("https://alstears-chatterbox-id-clone-api.hf.space")
# Decide whether to use audio_file or audio_url
audio_file_param = None
audio_url_param = ""
if voice_url:
if voice_url.startswith("http://") or voice_url.startswith("https://"):
audio_url_param = voice_url
elif os.path.exists(voice_url):
from gradio_client import handle_file
audio_file_param = handle_file(voice_url)
else:
audio_url_param = voice_url
else:
# No user voice configured: prefer the local default sample, else a hosted one.
if os.path.exists(local_default_path):
from gradio_client import handle_file
audio_file_param = handle_file(local_default_path)
else:
audio_url_param = "https://eduai-deploy.vercel.app/suara/cara-membedakan-voice-changer-atau-murni-ala-miti-mythia-batford-aesood.wav"
result_path = client.predict(
text=chunk_text,
audio_file=audio_file_param,
audio_url=audio_url_param,
api_name="/clone_voice"
)
logger.info(f"gradio_client generated chunk {idx + 1} at: {result_path}")
# The result is read either as a local file path or downloaded as a URL.
if result_path and os.path.exists(result_path):
with open(result_path, "rb") as f:
return f.read()
elif result_path and result_path.startswith("http"):
import requests
r = requests.get(result_path)
r.raise_for_status()
return r.content
except Exception as e:
logger.error(f"gradio_client failed on chunk {idx + 1}: {e}")
return None
# The blocking gradio_client calls run in the default executor, one per chunk.
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(None, _gen_chunk_client, chunk, idx)
for idx, chunk in enumerate(chunks)
]
results = await asyncio.gather(*tasks)
wav_results = [r for r in results if r is not None]
# All-or-nothing: a partial result is discarded in favour of the HTTP fallback.
if len(wav_results) == len(chunks):
logger.info(f"gradio_client successfully generated all {len(chunks)} chunks. Combining WAVs...")
return _concatenate_wavs(wav_results)
logger.error(f"gradio_client generated only {len(wav_results)}/{len(chunks)} chunks. Falling back to HTTP client...")
except ImportError:
logger.info("gradio_client not installed. Falling back to HTTP client.")
except Exception as e:
logger.error(f"gradio_client setup failed: {e}. Falling back to HTTP client.")
# Fallback raw HTTP client logic
# Submits one chunk to the clone_voice endpoint, reads the event stream for
# the returned event_id, and downloads the audio referenced by the last
# parseable `data:` line. Returns None on any failure.
async def _gen_chunk(chunk: str, idx: int, client: httpx.AsyncClient) -> Optional[bytes]:
try:
logger.info(f"Generating chunk {idx + 1}/{len(chunks)}: '{chunk}'")
submit_resp = await client.post(
"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/call/clone_voice",
json={"data": [chunk, None, target_voice_url]}
)
submit_resp.raise_for_status()
submit_data = submit_resp.json()
event_id = submit_data.get("event_id")
if not event_id:
logger.error(f"Gradio 6 submission failed to return event_id for chunk {idx + 1}: {submit_data}")
return None
logger.info(f"Job submitted for chunk {idx + 1}. Event ID: {event_id}. Fetching stream...")
stream_resp = await client.get(
f"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/call/clone_voice/{event_id}"
)
stream_resp.raise_for_status()
stream_text = stream_resp.text
# An `event: error` entry is only logged; parsing of data lines continues below.
error_match = re.search(r"event:\s*error\s*[\r\n]+data:\s*([^\r\n]+)", stream_text)
if error_match:
try:
import json
err_obj = json.loads(error_match.group(1))
if err_obj and "error" in err_obj:
logger.error(f"Gradio returned error for chunk {idx + 1}: {err_obj['error']}")
except Exception:
pass
matches = re.findall(r"data:\s*([^\r\n]+)", stream_text)
if not matches:
logger.error(f"No data events received in Gradio 6 stream for chunk {idx + 1}: {stream_text}")
return None
# Walk the data lines from last to first and use the first one that yields audio.
for match in reversed(matches):
try:
import json
parsed = json.loads(match.strip())
if isinstance(parsed, list) and len(parsed) > 0:
audio_info = parsed[0]
audio_url = None
if isinstance(audio_info, dict):
if "url" in audio_info:
audio_url = audio_info["url"]
elif "path" in audio_info:
audio_url = f"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/file={audio_info['path']}"
elif isinstance(audio_info, str):
audio_url = audio_info
if audio_url:
# Turn server-relative paths and bare file paths into absolute URLs.
if audio_url.startswith("/"):
audio_url = f"https://alstears-chatterbox-id-clone-api.hf.space{audio_url}"
elif not audio_url.startswith("http"):
audio_url = f"https://alstears-chatterbox-id-clone-api.hf.space/gradio_api/file={audio_url}"
logger.info(f"Downloading audio for chunk {idx + 1} from: {audio_url}")
audio_resp = await client.get(audio_url)
audio_resp.raise_for_status()
return audio_resp.content
except Exception as parse_err:
logger.debug(f"Skipping unparseable stream data block: {parse_err}")
except Exception as e:
logger.exception(f"Failed to generate chunk {idx + 1}: {e}")
return None
# All chunks are requested concurrently over one shared client (180 s timeout).
async with httpx.AsyncClient(timeout=180.0) as client:
tasks = [_gen_chunk(chunk, idx, client) for idx, chunk in enumerate(chunks)]
results = await asyncio.gather(*tasks)
wav_results = [r for r in results if r is not None]
if len(wav_results) == len(chunks):
logger.info(f"Successfully generated all {len(chunks)} chunks. Combining WAVs...")
return _concatenate_wavs(wav_results)
logger.error(f"Generated only {len(wav_results)}/{len(chunks)} chunks successfully.")
return None
# Cloned audio, if any, is written to output_path and the function returns.
audio_content = await _try_chatterbox()
if audio_content:
with open(output_path, "wb") as f:
f.write(audio_content)
return
else:
logger.error("Chatterbox Gradio 6 failed to generate audio. Falling back to default TTS.")
# Default edge-tts neural voice fallback (fast and instant)
import edge_tts
communicate = edge_tts.Communicate(cleaned, _TTS_VOICE)
await communicate.save(output_path)
# ============== Education Settings ==============
@api_router.put("/user/education")
async def save_education_settings(
    payload: EducationSettingsPayload,
    request: Request,
    user: User = Depends(get_current_user),
):
    """Store the user's subjects and schedule, creating folders where needed.

    A subject without a ``folder_id`` is linked to the user's existing folder
    of the same name, or to a newly created one. Responds 404 when the user
    record does not exist. Returns the saved ``subjects`` and ``schedule``.
    """
    owner_id = user.user_id
    if not await db.users.find_one({"user_id": owner_id}, {"_id": 0}):
        raise HTTPException(404, "User tidak ditemukan")

    client_ip = request.client.host if request.client else ""
    subjects_out = []
    for subject in payload.subjects:
        subject_id = subject.id or f"subj_{uuid.uuid4().hex[:12]}"
        folder_id = subject.folder_id
        if not folder_id:
            found = await db.folders.find_one(
                {"user_id": owner_id, "name": subject.name},
                {"_id": 0, "folder_id": 1},
            )
            if found:
                folder_id = found["folder_id"]
            else:
                folder_id = uuid.uuid4().hex
                await db.folders.insert_one({
                    "folder_id": folder_id,
                    "user_id": owner_id,
                    "name": subject.name,
                    "created_at": datetime.now(timezone.utc).isoformat(),
                })
                await write_audit(
                    owner_id,
                    "FOLDER_CREATED",
                    {"folder_id": folder_id, "name": subject.name},
                    client_ip,
                )
        subjects_out.append({"id": subject_id, "name": subject.name, "folder_id": folder_id})

    schedule_out = []
    for entry in payload.schedule:
        schedule_out.append(entry.model_dump())

    await db.users.update_one(
        {"user_id": owner_id},
        {"$set": {"subjects": subjects_out, "schedule": schedule_out}},
    )
    saved_names = [item["name"] for item in subjects_out]
    await write_audit(owner_id, "EDUCATION_SETTINGS_UPDATED", {"subjects": saved_names}, client_ip)
    return {"subjects": subjects_out, "schedule": schedule_out}
@api_router.get("/user/education")
async def get_education_settings(user: User = Depends(get_current_user)):
    """Return the user's subjects, schedule and education profile fields.

    Subjects stored without an ``id`` get one generated here, and the
    subjects list is then written back to the user record. Responds 404 when
    the lookup returns nothing.
    """
    profile_fields = ("education_level", "major", "current_semester", "institution")
    projection = {"_id": 0, "subjects": 1, "schedule": 1}
    for field in profile_fields:
        projection[field] = 1

    doc = await db.users.find_one({"user_id": user.user_id}, projection)
    if not doc:
        raise HTTPException(404, "User tidak ditemukan")

    subjects = doc.get("subjects") or []
    without_id = [entry for entry in subjects if not entry.get("id")]
    for entry in without_id:
        entry["id"] = f"subj_{uuid.uuid4().hex[:12]}"
    if without_id:
        # Persist the newly generated ids.
        await db.users.update_one(
            {"user_id": user.user_id},
            {"$set": {"subjects": subjects}},
        )

    response = {"subjects": subjects, "schedule": doc.get("schedule") or []}
    for field in profile_fields:
        response[field] = doc.get(field)
    return response
@api_router.post("/user/education/generate")
async def generate_material(payload: MaterialGeneratePayload, request: Request, user: User = Depends(get_current_user)):
    """Generate AI study material for one of the user's subjects and store it.

    The material is written to ``study_materials`` and mirrored as a
    ``documents`` entry so it shows up in the folder and document lists.

    Raises:
        HTTPException 400: ``payload.subject_name`` is blank.
        HTTPException 404: the user or ``payload.subject_id`` is not found.
        HTTPException 500: the Groq call fails.

    Returns:
        The stored material dict.
    """
    if not payload.subject_name.strip():
        raise HTTPException(400, "Nama mapel wajib")
    user_doc = await db.users.find_one({"user_id": user.user_id}, {"_id": 0})
    if not user_doc:
        raise HTTPException(404, "User tidak ditemukan")

    # `or` (rather than a .get default) also covers fields stored as None,
    # which would otherwise appear in the prompt as the text "None".
    level = user_doc.get("education_level") or "Umum"
    grade = user_doc.get("current_semester") or ""
    major = user_doc.get("major") or ""
    institution = user_doc.get("institution") or ""

    subj_data = next(
        (s for s in (user_doc.get("subjects") or []) if s.get("id") == payload.subject_id),
        None,
    )
    if not subj_data:
        raise HTTPException(404, "Mapel tidak ditemukan di profil kamu")
    folder_id = subj_data.get("folder_id")
    topic = payload.topic or payload.subject_name
    audience = _audience(user)
    system = (
        f"Kamu adalah asisten pembelajaran untuk {audience}. "
        f"Buat materi belajar tentang {topic} untuk {level}, kelas {grade}"
        + (f", jurusan {major}" if major else "")
        + (f", {institution}" if institution else "") + ". "
        f"Gunakan bahasa Indonesia. Format output sebagai JSON dengan keys: "
        f"title (string), summary (string, 2-3 paragraf), key_concepts (array of {{concept, explanation}}), "
        f"study_notes (string, penjelasan detail poin-poin penting), "
        f"practice_questions (array of {{question, options (array of 4), correct_index, explanation}})."
    )
    prompt = (
        f"Buat materi belajar tentang {topic} untuk {level} kelas {grade}"
        + (f" jurusan {major}" if major else "") + ". "
        f"Topik ini adalah bagian dari mata pelajaran {payload.subject_name}. "
        f"Buat materi yang sesuai dengan kurikulum Indonesia. "
        f"Sertakan ringkasan, konsep kunci, catatan belajar, dan 3 soal latihan."
    )
    try:
        resp = await _call_groq(system, prompt)
    except Exception as e:
        raise HTTPException(500, f"Gagal generate materi: {str(e)[:200]}") from e

    # Take the outermost {...} span from the response. If it is missing or is
    # not a valid JSON object, use the raw response text as the summary.
    data = {"title": f"Materi {topic}", "summary": resp, "key_concepts": [], "study_notes": "", "practice_questions": []}
    match = re.search(r'\{.*\}', resp, re.DOTALL)
    if match:
        try:
            parsed = json.loads(match.group())
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            parsed = None
        if isinstance(parsed, dict):
            data = parsed

    material_id = uuid.uuid4().hex
    title = data.get("title", f"Materi {topic}")
    # Create study_material record
    material = {
        "material_id": material_id,
        "user_id": user.user_id,
        "subject_id": payload.subject_id,
        "subject_name": payload.subject_name,
        "folder_id": folder_id,
        "topic": topic,
        "title": title,
        "summary": data.get("summary", ""),
        "key_concepts": data.get("key_concepts", []),
        "study_notes": data.get("study_notes", ""),
        "practice_questions": data.get("practice_questions", []),
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    # A copy is inserted, so `material` never receives an `_id` key and can be
    # returned as-is.
    await db.study_materials.insert_one(material.copy())
    # Also create a document entry so it appears in folder & documents list
    await db.documents.insert_one({
        "document_id": material_id,
        "user_id": user.user_id,
        "filename": f"Materi - {title}.md",
        "title": title,
        "folder_id": folder_id,
        "subject_id": payload.subject_id,
        "subject_name": payload.subject_name,
        "ai_generated": True,
        "ai_content": data,
        "summary": data.get("summary", ""),
        "key_concepts": data.get("key_concepts", []),
        "status": "ready",
        "created_at": datetime.now(timezone.utc).isoformat(),
    })
    await write_audit(user.user_id, "MATERIAL_GENERATED", {"material_id": material_id, "subject": payload.subject_name, "topic": topic}, request.client.host if request.client else "")
    return material
@api_router.get("/user/education/materials")
async def list_materials(user: User = Depends(get_current_user)):
    """Return up to 200 of the user's study materials, newest first."""
    cursor = db.study_materials.find({"user_id": user.user_id}, {"_id": 0})
    newest_first = cursor.sort("created_at", -1)
    return {"materials": await newest_first.to_list(200)}
@api_router.get("/user/education/materials/{material_id}")
async def get_material(material_id: str, user: User = Depends(get_current_user)):
    """Return one of the user's study materials, or respond 404 if not found."""
    query = {"material_id": material_id, "user_id": user.user_id}
    material = await db.study_materials.find_one(query, {"_id": 0})
    if material:
        return material
    raise HTTPException(404, "Materi tidak ditemukan")
@api_router.delete("/user/education/materials/{material_id}")
async def delete_material(material_id: str, request: Request, user: User = Depends(get_current_user)):
    """Delete a study material and its mirrored AI-generated document entry.

    Responds 404 when the material does not belong to the user.
    """
    owner_id = user.user_id
    existing = await db.study_materials.find_one(
        {"material_id": material_id, "user_id": owner_id},
        {"_id": 0},
    )
    if not existing:
        raise HTTPException(404, "Materi tidak ditemukan")

    await db.study_materials.delete_one({"material_id": material_id, "user_id": owner_id})
    # Only the mirrored entry flagged ai_generated is removed from documents.
    await db.documents.delete_one(
        {"document_id": material_id, "user_id": owner_id, "ai_generated": True}
    )
    client_ip = request.client.host if request.client else ""
    await write_audit(owner_id, "MATERIAL_DELETED", {"material_id": material_id}, client_ip)
    return {"deleted": True}
# ============== Documents ==============
# _bg_analyze_document is defined in AI Helpers section above
@api_router.post("/documents/upload")
async def upload_document(request: Request, file: UploadFile = File(...), user: User = Depends(get_current_user)):
    """Accept a PDF upload, persist it and queue it for background analysis.

    The file is saved under ``UPLOAD_DIR`` and, best effort, also stored in
    the ``pdf_files`` collection and uploaded to Supabase. A ``processing``
    document record is created and analysis is scheduled.

    Raises:
        HTTPException 400: the upload has no filename or it does not end
            with ``.pdf``.

    Returns:
        ``JSONResponse`` with status 202 carrying the new document record
        (without ``file_path``).
    """
    # UploadFile.filename may be None; treat that as an unsupported format
    # instead of failing with AttributeError on .lower().
    filename = file.filename or ""
    if not filename.lower().endswith(".pdf"):
        raise HTTPException(400, "Format tidak didukung. Hanya PDF yang diterima.")
    doc_id = uuid.uuid4().hex
    saved_path = UPLOAD_DIR / f"{doc_id}.pdf"
    with saved_path.open("wb") as f:
        shutil.copyfileobj(file.file, f)
    # Store PDF bytes in MongoDB for persistent serving (best effort).
    try:
        pdf_bytes = saved_path.read_bytes()
        await db.pdf_files.insert_one({
            "document_id": doc_id,
            "user_id": user.user_id,
            "data": Binary(pdf_bytes),
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
    except Exception as e:
        logger.warning(f"Gagal simpan PDF ke MongoDB: {e}")
    # Also try Supabase Storage (optional, non-blocking).
    # NOTE(review): the asyncio docs advise keeping a reference to tasks from
    # create_task so they are not garbage-collected mid-run; none is kept here.
    asyncio.create_task(_try_upload_supabase(user.user_id, doc_id, str(saved_path)))
    base = {
        "document_id": doc_id,
        "user_id": user.user_id,
        "filename": filename,
        "title": filename,
        "file_path": str(saved_path),
        "summary": "",
        "key_concepts": [],
        "diagrams": [],
        "learning_objectives": [],
        "status": "processing",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    await db.documents.insert_one(base.copy())
    ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "DOCUMENT_UPLOAD", {"document_id": doc_id, "filename": filename}, ip)
    # Run the analysis through the global queue so the response returns immediately.
    asyncio.create_task(run_analysis_queued(doc_id, str(saved_path), user, ip))
    doc = await db.documents.find_one({"document_id": doc_id}, {"_id": 0, "file_path": 0})
    return JSONResponse(status_code=202, content=doc)
# MIME types that upload_subject_material treats as images (converted to PDF before storage).
ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/webp", "image/bmp"}
@api_router.post("/documents/upload-subject-material/{subject_id}")
async def upload_subject_material(
    request: Request,
    subject_id: str,
    files: List[UploadFile] = File(...),
    user: User = Depends(get_current_user),
):
    """Upload PDFs or images for a subject and queue each for analysis.

    Images are converted to single PDFs with Pillow. Every accepted file is
    saved under ``UPLOAD_DIR``, stored best effort in ``pdf_files`` and
    Supabase, recorded as a ``processing`` document in the subject's folder,
    and scheduled for background analysis. Files without a filename are
    skipped.

    Raises:
        HTTPException 400: no files given, unsupported format, image
            conversion unavailable, or image conversion failed.
        HTTPException 404: user or subject not found.

    Returns:
        ``JSONResponse`` with status 202 listing the created document records
        (without ``file_path``).

    NOTE(review): files are processed one by one, so a 400 raised for a later
    file leaves earlier files in the same request already stored and queued.
    """
    if not files:
        raise HTTPException(400, "Pilih minimal 1 file")
    user_doc = await db.users.find_one({"user_id": user.user_id}, {"_id": 0, "subjects": 1})
    if not user_doc:
        raise HTTPException(404, "User tidak ditemukan")
    subj = next(
        (s for s in (user_doc.get("subjects") or []) if s.get("id") == subject_id),
        None,
    )
    if not subj:
        raise HTTPException(404, "Mapel tidak ditemukan di profil kamu")
    folder_id = subj.get("folder_id")
    subject_name = subj.get("name", "")
    created = []
    ip = request.client.host if request.client else ""
    for file in files:
        if not file.filename:
            continue
        ext = os.path.splitext(file.filename)[1].lower()
        is_image = file.content_type in ALLOWED_IMAGE_TYPES or ext in (".jpg", ".jpeg", ".png", ".webp", ".bmp")
        is_pdf = ext == ".pdf" or file.content_type == "application/pdf"
        if not is_image and not is_pdf:
            raise HTTPException(400, f"Format {file.filename} tidak didukung. Gunakan PDF atau gambar (JPG/PNG).")
        doc_id = uuid.uuid4().hex
        if is_image:
            if not PIL_AVAILABLE:
                raise HTTPException(400, "Konversi gambar ke PDF tidak tersedia di server ini.")
            try:
                contents = await file.read()
                img = Image.open(io.BytesIO(contents))
                if img.mode == "RGBA":
                    img = img.convert("RGB")
                pdf_bytes_io = io.BytesIO()
                img.save(pdf_bytes_io, format="PDF")
                pdf_bytes = pdf_bytes_io.getvalue()
            except Exception as e:
                raise HTTPException(400, f"Gagal konversi gambar {file.filename} ke PDF: {str(e)}")
            finally:
                file.file.close()
        else:
            pdf_bytes = await file.read()
        saved_path = UPLOAD_DIR / f"{doc_id}.pdf"
        with saved_path.open("wb") as f:
            f.write(pdf_bytes)
        # Best-effort MongoDB copy; failures are logged like in upload_document
        # instead of being silently dropped.
        try:
            await db.pdf_files.insert_one({
                "document_id": doc_id,
                "user_id": user.user_id,
                "data": Binary(pdf_bytes),
                "created_at": datetime.now(timezone.utc).isoformat(),
            })
        except Exception as e:
            logger.warning(f"Gagal simpan PDF ke MongoDB: {e}")
        asyncio.create_task(_try_upload_supabase(user.user_id, doc_id, str(saved_path)))
        # Files with an empty filename were skipped above, so it is always set here.
        source_label = file.filename
        base = {
            "document_id": doc_id,
            "user_id": user.user_id,
            "filename": source_label,
            "title": source_label,
            "file_path": str(saved_path),
            "folder_id": folder_id,
            "summary": "",
            "key_concepts": [],
            "diagrams": [],
            "learning_objectives": [],
            "subject_id": subject_id,
            "subject_name": subject_name,
            "status": "processing",
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        await db.documents.insert_one(base.copy())
        await write_audit(user.user_id, "SUBJECT_MATERIAL_UPLOAD", {
            "document_id": doc_id,
            "filename": source_label,
            "subject_id": subject_id,
            "subject_name": subject_name,
        }, ip)
        # Run through the global analysis queue (original note: one file at a
        # time to keep the CPU stable).
        asyncio.create_task(run_analysis_queued(doc_id, str(saved_path), user, ip))
        doc_view = base.copy()
        doc_view.pop("file_path", None)
        created.append(doc_view)
    return JSONResponse(status_code=202, content={"documents": created, "subject_id": subject_id, "subject_name": subject_name})
@api_router.get("/documents")
async def list_documents(user: User = Depends(get_current_user)):
    """Return up to 100 of the user's documents, newest first, without ``file_path``."""
    hidden_fields = {"_id": 0, "file_path": 0}
    cursor = db.documents.find({"user_id": user.user_id}, hidden_fields)
    return await cursor.sort("created_at", -1).to_list(100)
@api_router.get("/documents/{doc_id}")
async def get_document(doc_id: str, user: User = Depends(get_current_user)):
    """Return one of the user's documents (without ``file_path``), or respond 404."""
    query = {"document_id": doc_id, "user_id": user.user_id}
    found = await db.documents.find_one(query, {"_id": 0, "file_path": 0})
    if found:
        return found
    raise HTTPException(404, "Dokumen tidak ditemukan")
@api_router.post("/documents/{doc_id}/cancel")
async def cancel_document(request: Request, doc_id: str, user: User = Depends(get_current_user)):
    """Mark a document that is still processing as ``cancelled``.

    Responds 404 when the document is not the user's and 400 when its status
    is not ``processing``.
    """
    record = await db.documents.find_one(
        {"document_id": doc_id, "user_id": user.user_id},
        {"_id": 0},
    )
    if not record:
        raise HTTPException(404, "Dokumen tidak ditemukan")
    in_progress = record.get("status") == "processing"
    if not in_progress:
        raise HTTPException(400, "Dokumen tidak sedang diproses")

    await db.documents.update_one({"document_id": doc_id}, {"$set": {"status": "cancelled"}})
    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "DOCUMENT_CANCELLED", {"document_id": doc_id}, client_ip)
    return {"document_id": doc_id, "status": "cancelled"}
@api_router.delete("/documents/{doc_id}")
async def delete_document(request: Request, doc_id: str, user: User = Depends(get_current_user)):
    """Delete a document together with its quizzes, quiz results and stored files.

    Responds 404 when the document does not belong to the user.
    """
    owner_id = user.user_id
    record = await db.documents.find_one({"document_id": doc_id, "user_id": owner_id}, {"_id": 0})
    if not record:
        raise HTTPException(404, "Dokumen tidak ditemukan")

    # Flag the record first so an in-flight background task notices on completion.
    await db.documents.update_one({"document_id": doc_id}, {"$set": {"status": "deleted"}})

    # Cascade: gather the quiz ids for this document, then drop results and quizzes.
    quiz_ids = []
    quiz_cursor = db.quizzes.find(
        {"document_id": doc_id, "user_id": owner_id},
        {"quiz_id": 1, "_id": 0},
    )
    async for quiz in quiz_cursor:
        quiz_ids.append(quiz["quiz_id"])
    if quiz_ids:
        in_quiz_ids = {"$in": quiz_ids}
        await db.quiz_results.delete_many({"quiz_id": in_quiz_ids, "user_id": owner_id})
        await db.quizzes.delete_many({"quiz_id": in_quiz_ids, "user_id": owner_id})

    # Remove the local file; a failure here is only logged.
    fp = record.get("file_path")
    if fp:
        try:
            Path(fp).unlink(missing_ok=True)
        except Exception:
            logger.warning(f"Gagal hapus file {fp}")

    await _delete_from_supabase_storage(owner_id, doc_id)
    await db.pdf_files.delete_one({"document_id": doc_id})
    await db.documents.delete_one({"document_id": doc_id, "user_id": owner_id})

    client_ip = request.client.host if request.client else ""
    audit_payload = {"document_id": doc_id, "filename": record.get("filename")}
    await write_audit(owner_id, "DOCUMENT_DELETED", audit_payload, client_ip)
    return {"document_id": doc_id, "deleted": True}
# ============== Quiz ==============
def _public_quiz(quiz_doc: dict) -> dict:
out = {k: v for k, v in quiz_doc.items() if k != "_id"}
if isinstance(out.get("questions"), list):
out["questions"] = [{k: v for k, v in q.items() if k != "correct_index"} for q in out["questions"]]
return out
async def _bg_generate_quiz(quiz_id: str, documents: List[dict], user: User, n: int, ip: str, recap_text: str = ""):
    """Generate quiz questions in the background and store them on the quiz.

    On success the quiz receives the questions and status ``ready``; on
    failure it receives status ``failed`` with a truncated error. Nothing is
    written when the quiz no longer exists or is ``cancelled``/``deleted``.
    """
    abandoned_states = ("cancelled", "deleted")
    quiz_filter = {"quiz_id": quiz_id}
    status_projection = {"_id": 0, "status": 1}

    try:
        questions = await generate_quiz_questions(documents, user, n, recap_text=recap_text)

        snapshot = await db.quizzes.find_one(quiz_filter, status_projection)
        if not snapshot:
            return
        if snapshot.get("status") in abandoned_states:
            return

        await db.quizzes.update_one(
            quiz_filter,
            {"$set": {"questions": questions, "status": "ready"}},
        )
        source_ids = [doc.get("document_id") for doc in documents]
        await write_audit(
            user.user_id,
            "QUIZ_GENERATED",
            {"quiz_id": quiz_id, "document_ids": source_ids},
            ip,
        )
    except Exception as e:
        logger.exception("Background quiz gen gagal")
        snapshot = await db.quizzes.find_one(quiz_filter, status_projection)
        still_active = bool(snapshot) and snapshot.get("status") not in abandoned_states
        if still_active:
            failure = {"status": "failed", "error": str(e)[:300]}
            await db.quizzes.update_one(quiz_filter, {"$set": failure})
async def _resolve_documents(payload_document_id, payload_document_ids, payload_folder_id, user) -> List[dict]:
    """Resolve a list of ready documents from any of the input shapes.

    Precedence: an explicit id list, then a single id, then every ``ready``
    document in the given folder. Only documents owned by ``user`` are
    returned, without their ``file_path``.

    Raises:
        HTTPException 400: nothing was selected, or a selected document is
            not ``ready`` yet.
        HTTPException 404: none of the selected ids belong to the user.
    """
    ids: List[str] = []
    if payload_document_ids:
        ids = list(payload_document_ids)
    elif payload_document_id:
        ids = [payload_document_id]
    elif payload_folder_id:
        async for d in db.documents.find(
            {"user_id": user.user_id, "folder_id": payload_folder_id, "status": "ready"},
            {"_id": 0},
        ):
            ids.append(d["document_id"])
    if not ids:
        raise HTTPException(400, "Pilih minimal satu dokumen atau folder")

    docs = [
        d
        async for d in db.documents.find(
            {"document_id": {"$in": ids}, "user_id": user.user_id},
            {"_id": 0, "file_path": 0},
        )
    ]
    if not docs:
        raise HTTPException(404, "Dokumen tidak ditemukan")

    # Fall back to title / id so a record without a filename cannot make
    # str.join raise TypeError and turn this 400 into a 500.
    not_ready = [
        str(d.get("filename") or d.get("title") or d.get("document_id"))
        for d in docs
        if d.get("status") != "ready"
    ]
    if not_ready:
        raise HTTPException(400, f"Dokumen belum siap: {', '.join(not_ready)}")
    return docs
@api_router.post("/quiz/generate")
async def quiz_generate(request: Request, payload: QuizGenerateRequest, user: User = Depends(get_current_user)):
    """Create a ``processing`` quiz record and schedule question generation.

    When ``payload.recap_id`` names one of the user's recaps, its unified
    summary (or, if that is empty, its per-document summaries joined by
    newlines) is passed to the generator as extra context. Returns the public
    view of the new quiz.
    """
    documents = await _resolve_documents(payload.document_id, payload.document_ids, payload.folder_id, user)

    recap_text = ""
    recap = None
    if payload.recap_id:
        recap = await db.recaps.find_one({"recap_id": payload.recap_id, "user_id": user.user_id})
    if recap:
        recap_text = recap.get("unified_summary", "") or ""
        if not recap_text and recap.get("per_document_summaries"):
            recap_text = "\n".join(recap["per_document_summaries"].values())

    quiz_id = uuid.uuid4().hex
    document_ids = []
    source_titles = []
    for doc in documents:
        document_ids.append(doc["document_id"])
        source_titles.append(doc.get("title") or doc.get("filename"))

    quiz_doc = {
        "quiz_id": quiz_id,
        "user_id": user.user_id,
        # The first document doubles as the primary one (original note: "BC").
        "document_id": documents[0]["document_id"],
        "document_ids": document_ids,
        "source_titles": source_titles,
        "recap_id": payload.recap_id,
        "folder_id": payload.folder_id,
        "questions": [],
        "status": "processing",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    await db.quizzes.insert_one(quiz_doc.copy())

    client_ip = request.client.host if request.client else ""
    background_job = _bg_generate_quiz(quiz_id, documents, user, payload.question_count, client_ip, recap_text)
    asyncio.create_task(background_job)
    return _public_quiz(quiz_doc)
@api_router.get("/quiz/results")
async def quiz_results_list(user: User = Depends(get_current_user), limit: int = 50):
    """List the user's quiz results, newest first, with quiz metadata attached.

    Results of every status are included so the client can also show ones
    that are still processing or failed. Each result is enriched with the
    ``source_titles`` and ``folder_id`` of its quiz; a result whose lookup
    fails is still returned, unenriched. Any other error becomes a 500.
    """
    try:
        cursor = db.quiz_results.find(
            {"user_id": user.user_id},
            {"_id": 0, "items": 0},
        )
        results = await cursor.sort("created_at", -1).to_list(limit)
        logger.info(f"User {user.user_id} requested quiz results. Found: {len(results)}")

        enriched = []
        for result in results:
            try:
                source_titles = []
                folder_id = None
                quiz_id = result.get("quiz_id")
                if quiz_id:
                    quiz = await db.quizzes.find_one(
                        {"quiz_id": quiz_id},
                        {"_id": 0, "source_titles": 1, "folder_id": 1},
                    )
                    quiz_meta = quiz or {}
                    source_titles = quiz_meta.get("source_titles", [])
                    folder_id = quiz_meta.get("folder_id")
                result["source_titles"] = source_titles
                result["folder_id"] = folder_id
                enriched.append(result)
            except Exception as e:
                logger.warning(f"Gagal memproses satu item kuis: {e}")
                # Keep the result even though its quiz details could not be loaded.
                enriched.append(result)
        return {"results": enriched}
    except Exception as e:
        logger.exception("Gagal mengambil riwayat kuis")
        raise HTTPException(500, f"Gagal mengambil riwayat: {str(e)}")
@api_router.get("/quiz/{quiz_id}")
async def quiz_get(quiz_id: str, user: User = Depends(get_current_user)):
    """Return the public view of one of the user's quizzes, or respond 404."""
    query = {"quiz_id": quiz_id, "user_id": user.user_id}
    found = await db.quizzes.find_one(query, {"_id": 0})
    if found:
        return _public_quiz(found)
    raise HTTPException(404, "Kuis tidak ditemukan")
@api_router.post("/quiz/{quiz_id}/cancel")
async def cancel_quiz(request: Request, quiz_id: str, user: User = Depends(get_current_user)):
    """Mark a quiz that is still being generated as ``cancelled``.

    Responds 404 when the quiz is not the user's and 400 when its status is
    not ``processing``.
    """
    record = await db.quizzes.find_one(
        {"quiz_id": quiz_id, "user_id": user.user_id},
        {"_id": 0},
    )
    if not record:
        raise HTTPException(404, "Kuis tidak ditemukan")
    in_progress = record.get("status") == "processing"
    if not in_progress:
        raise HTTPException(400, "Kuis tidak sedang diproses")

    await db.quizzes.update_one({"quiz_id": quiz_id}, {"$set": {"status": "cancelled"}})
    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "QUIZ_CANCELLED", {"quiz_id": quiz_id}, client_ip)
    return {"quiz_id": quiz_id, "status": "cancelled"}
@api_router.delete("/quiz/{quiz_id}")
async def delete_quiz(request: Request, quiz_id: str, user: User = Depends(get_current_user)):
    """Delete one of the caller's quizzes together with all of its results.

    Raises:
        HTTPException: 404 when the caller owns no quiz with this id.
    """
    owned_quiz = {"quiz_id": quiz_id, "user_id": user.user_id}
    if not await db.quizzes.find_one(owned_quiz, {"_id": 0}):
        raise HTTPException(404, "Kuis tidak ditemukan")

    # Results are removed before the quiz document itself.
    await db.quiz_results.delete_many(dict(owned_quiz))
    await db.quizzes.delete_one(dict(owned_quiz))

    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "QUIZ_DELETED", {"quiz_id": quiz_id}, client_ip)
    return {"quiz_id": quiz_id, "deleted": True}
async def _bg_grade_quiz(result_id: str, quiz: dict, answers: List[int], user: User, ip: str):
    """Grade a submitted quiz in the background and store the outcome.

    On success the result document receives score, summary, per-question items
    and status ``ready``, and a ``QUIZ_SUBMITTED`` audit entry is written. On
    any failure the result is marked ``failed`` with a truncated error text.
    A result that was cancelled or deleted in the meantime is left untouched.

    Args:
        result_id: Id of the ``quiz_results`` document to fill in.
        quiz: Quiz document the answers belong to.
        answers: The user's submitted answers.
        user: User who submitted the quiz.
        ip: Client address recorded in the audit entry.
    """
    # The status guard is part of the update filter, so a cancel that happens
    # while grading runs can never be overwritten by this task.
    still_active = {"result_id": result_id, "status": {"$nin": ["cancelled", "deleted"]}}
    try:
        feedback = await generate_deep_feedback(quiz, answers, user)
        score = int(feedback.get("score", 0))
        outcome = await db.quiz_results.update_one(
            still_active,
            {"$set": {
                "score": score,
                "summary": feedback.get("summary", ""),
                "items": feedback.get("items", []),
                "status": "ready",
            }},
        )
        if outcome.matched_count == 0:
            # Result vanished or was cancelled: nothing to record.
            return
        await write_audit(user.user_id, "QUIZ_SUBMITTED", {"quiz_id": quiz["quiz_id"], "score": score}, ip)
    except Exception as e:
        logger.exception("Background grading gagal")
        try:
            await db.quiz_results.update_one(
                still_active,
                {"$set": {"status": "failed", "error": str(e)[:300]}},
            )
        except Exception:
            # Nobody awaits this task, so the failure is logged instead of raised.
            logger.exception("Gagal menandai hasil kuis sebagai gagal")
@api_router.post("/quiz/submit")
async def quiz_submit(request: Request, payload: QuizSubmission, user: User = Depends(get_current_user)):
    """Accept quiz answers, store a pending result and start background grading.

    Returns the freshly created result document (status ``processing``).

    Raises:
        HTTPException: 404 when the caller owns no such quiz; 400 when the quiz
            is not in the ``ready`` state.
    """
    quiz = await db.quizzes.find_one(
        {"quiz_id": payload.quiz_id, "user_id": user.user_id},
        {"_id": 0},
    )
    if not quiz:
        raise HTTPException(404, "Kuis tidak ditemukan")
    if quiz.get("status") != "ready":
        raise HTTPException(400, "Kuis belum siap dinilai")

    result_id = uuid.uuid4().hex
    pending_result = {
        "result_id": result_id,
        "quiz_id": payload.quiz_id,
        "document_id": quiz["document_id"],
        "user_id": user.user_id,
        "answers": payload.answers,
        "score": 0,
        "summary": "",
        "items": [],
        "status": "processing",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    # A copy is inserted so the "_id" added by the driver never reaches the response.
    await db.quiz_results.insert_one(dict(pending_result))

    client_ip = request.client.host if request.client else ""
    asyncio.create_task(_bg_grade_quiz(result_id, quiz, payload.answers, user, client_ip))
    return pending_result
@api_router.get("/documents/{doc_id}/latest-result")
async def get_latest_doc_result(doc_id: str, user: User = Depends(get_current_user)):
    """Return the caller's newest ``ready`` quiz result for a document, or ``None``."""
    criteria = {"document_id": doc_id, "user_id": user.user_id, "status": "ready"}
    newest_first = [("created_at", -1)]
    return await db.quiz_results.find_one(criteria, {"_id": 0}, sort=newest_first)
@api_router.get("/folders/{folder_id}/latest-result")
async def get_latest_folder_result(folder_id: str, user: User = Depends(get_current_user)):
    """Return the caller's newest ``ready`` result among a folder's quizzes, or ``None``.

    At most 100 quizzes of the folder are considered.
    """
    # Collect the ids of the quizzes attached to this folder.
    quiz_cursor = db.quizzes.find({"folder_id": folder_id, "user_id": user.user_id}, {"quiz_id": 1})
    folder_quiz_ids = [entry["quiz_id"] for entry in await quiz_cursor.to_list(100)]
    if not folder_quiz_ids:
        return None

    criteria = {"quiz_id": {"$in": folder_quiz_ids}, "user_id": user.user_id, "status": "ready"}
    return await db.quiz_results.find_one(criteria, {"_id": 0}, sort=[("created_at", -1)])
@api_router.get("/quiz/result/{result_id}")
async def quiz_result_get(result_id: str, user: User = Depends(get_current_user)):
    """Return one quiz result of the caller, merged with its quiz's questions.

    When the parent quiz still exists, ``source_titles`` and a ``questions``
    list (question, options, correct index, the user's answer and the stored
    feedback) are added to the result.

    Raises:
        HTTPException: 404 when the caller owns no result with this id.
    """
    result = await db.quiz_results.find_one({"result_id": result_id, "user_id": user.user_id}, {"_id": 0})
    if not result:
        raise HTTPException(404, "Hasil kuis tidak ditemukan")

    quiz = await db.quizzes.find_one({"quiz_id": result.get("quiz_id")}, {"_id": 0})
    if not quiz:
        return result

    result["source_titles"] = quiz.get("source_titles", [])
    result["questions"] = []
    for idx, question in enumerate(quiz.get("questions", [])):
        items = result.get("items")
        # Feedback exists only for positions the grader produced an item for.
        feedback = items[idx] if items and idx < len(items) else None
        answers = result.get("answers", [])
        merged = {
            "question": question.get("question", ""),
            "options": question.get("options", []),
            "correct_index": question.get("correct_index", 0),
            "user_answer_index": answers[idx] if idx < len(answers) else -1,
            "is_correct": None,
            "explanation": "",
        }
        if feedback:
            merged["is_correct"] = feedback.get("is_correct")
            merged["explanation"] = feedback.get("explanation", "")
        result["questions"].append(merged)
    return result
class QuizChatPayload(BaseModel):
    """Request body for asking a follow-up question about a quiz result."""

    # Free-text question from the user.
    question: str
@api_router.post("/quiz/result/{result_id}/chat")
async def quiz_result_chat(result_id: str, payload: QuizChatPayload, user: User = Depends(get_current_user)):
    """Answer a user's question about one of their graded quiz results.

    The result (score, summary, per-question feedback) is serialised as JSON
    and sent to the Groq model together with the question.

    Raises:
        HTTPException: 404 when the result is not found; 400 when it is not
            ``ready`` or the question is blank; 500 when the model call fails.
    """
    result = await db.quiz_results.find_one({"result_id": result_id, "user_id": user.user_id}, {"_id": 0})
    if not result:
        raise HTTPException(404, "Hasil tidak ditemukan")
    if result.get("status") != "ready":
        raise HTTPException(400, "Hasil belum siap")
    if not payload.question.strip():
        raise HTTPException(400, "Pertanyaan tidak boleh kosong")

    quiz = await db.quizzes.find_one({"quiz_id": result.get("quiz_id")}, {"_id": 0})
    audience = _audience(user)

    question_entries = []
    if quiz:
        for idx, question in enumerate(quiz.get("questions", [])):
            items = result.get("items")
            feedback = items[idx] if items and idx < len(items) else None
            answered_correctly = feedback.get("is_correct") if feedback else False
            entry = {
                "question": question.get("question", ""),
                "is_correct": answered_correctly,
                "explanation": feedback.get("explanation", "") if feedback else "",
            }
            # Token saving: options are only sent for questions answered wrongly.
            if not answered_correctly:
                answers = result.get("answers", [])
                entry.update(
                    options=question.get("options", []),
                    correct_index=question.get("correct_index", 0),
                    user_answer_index=answers[idx] if idx < len(answers) else -1,
                )
            question_entries.append(entry)

    context = {
        "score": result.get("score", 0),
        "summary": result.get("summary", ""),
        "questions": question_entries,
    }
    system = (
        f"Kamu adalah EduScanner AI, tutor akademik untuk {audience}. "
        "Kamu membantu user memahami hasil kuis mereka. Bahasa Indonesia. "
        "Gunakan data kuis (detail hanya ada untuk soal salah) untuk menjawab."
    )
    prompt = (
        f"DATA KUIS:\n{json.dumps(context, ensure_ascii=False)}\n\n"
        f"PERTANYAAN USER: {payload.question}"
    )
    try:
        answer = await _call_groq(system, prompt)
    except Exception as e:
        raise HTTPException(500, f"Gagal menjawab: {str(e)[:200]}")
    return {"answer": answer}
@api_router.post("/quiz/result/{result_id}/cancel")
async def cancel_result(request: Request, result_id: str, user: User = Depends(get_current_user)):
    """Mark one of the caller's quiz results as cancelled while it is being graded.

    Raises:
        HTTPException: 404 when the caller owns no such result; 400 when the
            result is not (or is no longer) in the ``processing`` state.
    """
    owned = {"result_id": result_id, "user_id": user.user_id}
    r = await db.quiz_results.find_one(owned, {"_id": 0})
    if not r:
        raise HTTPException(404, "Hasil tidak ditemukan")
    if r.get("status") != "processing":
        raise HTTPException(400, "Hasil tidak sedang diproses")
    # The status is re-checked inside the update itself, so a result that was
    # graded after the read above is not overwritten with "cancelled".
    outcome = await db.quiz_results.update_one(
        {**owned, "status": "processing"},
        {"$set": {"status": "cancelled"}},
    )
    if outcome.matched_count == 0:
        raise HTTPException(400, "Hasil tidak sedang diproses")
    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "RESULT_CANCELLED", {"result_id": result_id}, client_ip)
    return {"result_id": result_id, "status": "cancelled"}
@api_router.delete("/quiz/result/{result_id}")
async def delete_result(request: Request, result_id: str, user: User = Depends(get_current_user)):
    """Delete one quiz result owned by the caller.

    Raises:
        HTTPException: 404 when the caller owns no result with this id.
    """
    owned_result = {"result_id": result_id, "user_id": user.user_id}
    if not await db.quiz_results.find_one(owned_result, {"_id": 0}):
        raise HTTPException(404, "Hasil tidak ditemukan")
    await db.quiz_results.delete_one(dict(owned_result))
    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "RESULT_DELETED", {"result_id": result_id}, client_ip)
    return {"result_id": result_id, "deleted": True}
# ============== Folders ==============
@api_router.post("/folders")
async def folder_create(request: Request, payload: FolderCreate, user: User = Depends(get_current_user)):
    """Create a folder for the caller and return it with ``document_count`` 0.

    Raises:
        HTTPException: 400 when the name is empty after trimming.
    """
    folder_name = payload.name.strip()
    if not folder_name:
        raise HTTPException(400, "Nama folder wajib")

    folder_id = uuid.uuid4().hex
    record = {
        "folder_id": folder_id,
        "user_id": user.user_id,
        "name": folder_name,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    # A copy is inserted so the "_id" added by the driver stays out of the response.
    await db.folders.insert_one(dict(record))

    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "FOLDER_CREATED", {"folder_id": folder_id, "name": folder_name}, client_ip)
    return {**record, "document_count": 0}
@api_router.get("/folders")
async def folder_list(user: User = Depends(get_current_user)):
    """List up to 200 of the caller's folders, newest first, each with its document count."""
    folder_cursor = db.folders.find({"user_id": user.user_id}, {"_id": 0}).sort("created_at", -1)
    folders = await folder_cursor.to_list(200)
    for folder in folders:
        count_filter = {"user_id": user.user_id, "folder_id": folder["folder_id"]}
        folder["document_count"] = await db.documents.count_documents(count_filter)
    return folders
@api_router.get("/folders/{folder_id}")
async def folder_get(folder_id: str, user: User = Depends(get_current_user)):
    """Return one folder of the caller with its documents and related recaps.

    Recaps are included when they reference the folder itself or any of its
    documents; their large content fields are left out of the response.

    Raises:
        HTTPException: 404 when the caller owns no folder with this id.
    """
    folder = await db.folders.find_one({"folder_id": folder_id, "user_id": user.user_id}, {"_id": 0})
    if not folder:
        raise HTTPException(404, "Folder tidak ditemukan")

    documents = await db.documents.find(
        {"user_id": user.user_id, "folder_id": folder_id},
        {"_id": 0, "file_path": 0},
    ).sort("created_at", -1).to_list(500)

    recap_filter = {
        "user_id": user.user_id,
        "$or": [
            {"folder_id": folder_id},
            {"document_ids": {"$in": [entry["document_id"] for entry in documents]}},
        ],
    }
    recap_projection = {
        "_id": 0,
        "unified_summary": 0,
        "per_document": 0,
        "shared_concepts": 0,
        "study_path": 0,
    }
    recaps = await db.recaps.find(recap_filter, recap_projection).sort("created_at", -1).to_list(50)

    folder.update(documents=documents, recaps=recaps, document_count=len(documents))
    return folder
@api_router.put("/folders/{folder_id}")
async def folder_update(request: Request, folder_id: str, payload: FolderUpdate, user: User = Depends(get_current_user)):
    """Rename one of the caller's folders and return the updated folder.

    Raises:
        HTTPException: 404 when the caller owns no such folder; 400 when the
            new name is empty after trimming.
    """
    current = await db.folders.find_one({"folder_id": folder_id, "user_id": user.user_id}, {"_id": 0})
    if not current:
        raise HTTPException(404, "Folder tidak ditemukan")

    new_name = payload.name.strip()
    if not new_name:
        raise HTTPException(400, "Nama folder wajib")

    await db.folders.update_one({"folder_id": folder_id}, {"$set": {"name": new_name}})
    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "FOLDER_RENAMED", {"folder_id": folder_id, "name": new_name}, client_ip)
    return {**current, "name": new_name}
@api_router.delete("/folders/{folder_id}")
async def folder_delete(request: Request, folder_id: str, user: User = Depends(get_current_user)):
    """Delete a folder AND cascade-delete its documents (per user choice).

    Removes, in this order: the documents' local files, the quizzes that
    reference those documents (with their results), the stored PDF blobs, the
    document records and finally the folder.

    Raises:
        HTTPException: 404 when the caller owns no folder with this id.
    """
    folder = await db.folders.find_one({"folder_id": folder_id, "user_id": user.user_id}, {"_id": 0})
    if not folder:
        raise HTTPException(404, "Folder tidak ditemukan")

    doc_ids = []
    async for d in db.documents.find({"user_id": user.user_id, "folder_id": folder_id}, {"_id": 0}):
        doc_ids.append(d["document_id"])
        fp = d.get("file_path")
        if fp:
            try:
                Path(fp).unlink(missing_ok=True)
            except Exception as e:
                # Best effort: a leftover file must not block the deletion.
                logger.warning(f"Gagal menghapus file dokumen {fp}: {e}")

    if doc_ids:
        # One query covers both the multi-document field and the legacy
        # single ``document_id`` field; the set removes duplicates.
        quiz_filter = {
            "user_id": user.user_id,
            "$or": [
                {"document_ids": {"$in": doc_ids}},
                {"document_id": {"$in": doc_ids}},
            ],
        }
        quiz_ids = list({q["quiz_id"] async for q in db.quizzes.find(quiz_filter, {"quiz_id": 1, "_id": 0})})
        if quiz_ids:
            await db.quiz_results.delete_many({"user_id": user.user_id, "quiz_id": {"$in": quiz_ids}})
            await db.quizzes.delete_many({"user_id": user.user_id, "quiz_id": {"$in": quiz_ids}})
        await db.pdf_files.delete_many({"document_id": {"$in": doc_ids}})
        await db.documents.delete_many({"user_id": user.user_id, "document_id": {"$in": doc_ids}})

    await db.folders.delete_one({"folder_id": folder_id, "user_id": user.user_id})
    client_ip = request.client.host if request.client else ""
    await write_audit(
        user.user_id,
        "FOLDER_DELETED",
        {"folder_id": folder_id, "name": folder.get("name"), "documents_deleted": len(doc_ids)},
        client_ip,
    )
    return {"folder_id": folder_id, "deleted": True, "documents_deleted": len(doc_ids)}
@api_router.post("/documents/move")
async def documents_move(request: Request, payload: DocumentMove, user: User = Depends(get_current_user)):
    """Move the caller's documents into a folder (or out of any folder).

    A falsy ``payload.folder_id`` skips the folder check and stores that value
    as the documents' ``folder_id``.

    Raises:
        HTTPException: 404 when a target folder is given but not owned by the caller.
    """
    target_folder_id = payload.folder_id
    if target_folder_id:
        target = await db.folders.find_one({"folder_id": target_folder_id, "user_id": user.user_id}, {"_id": 0})
        if not target:
            raise HTTPException(404, "Folder tidak ditemukan")

    outcome = await db.documents.update_many(
        {"user_id": user.user_id, "document_id": {"$in": payload.document_ids}},
        {"$set": {"folder_id": payload.folder_id}},
    )
    moved = outcome.modified_count
    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "DOCUMENTS_MOVED", {"folder_id": payload.folder_id, "count": moved}, client_ip)
    return {"moved": moved, "folder_id": payload.folder_id}
# ============== Recap (Multi-doc Summary) ==============
async def _bg_generate_recap(recap_id: str, documents: List[dict], user: User, ip: str):
    """Generate a multi-document recap in the background and store it.

    Nothing is written when the recap was cancelled, deleted or removed while
    generation ran. Otherwise the owning folder's recap snapshot (if any) is
    updated first, then the recap itself is filled in and marked ``ready``.
    Any failure marks the recap ``failed`` unless it was cancelled or deleted.
    """
    try:
        data = await generate_recap(documents, user)

        current = await db.recaps.find_one(
            {"recap_id": recap_id},
            {"_id": 0, "status": 1, "folder_id": 1, "document_ids": 1},
        )
        if not current:
            return
        if current.get("status") in ("cancelled", "deleted"):
            return

        # The folder snapshot is written BEFORE the recap is marked ready.
        folder_id = current.get("folder_id")
        if folder_id:
            snapshot = {
                "recap_id": recap_id,
                "recap_title": data.get("title", ""),
                "recap_summary": data.get("unified_summary", ""),
                "recap_document_ids": current.get("document_ids", []),
                "recap_generated_at": datetime.now(timezone.utc).isoformat(),
            }
            await db.folders.update_one({"folder_id": folder_id}, {"$set": snapshot})

        recap_fields = {
            "title": data.get("title", ""),
            "unified_summary": data.get("unified_summary", ""),
            "per_document": data.get("per_document", []),
            "shared_concepts": data.get("shared_concepts", []),
            "study_path": data.get("study_path", []),
            "status": "ready",
        }
        await db.recaps.update_one({"recap_id": recap_id}, {"$set": recap_fields})
        await write_audit(user.user_id, "RECAP_GENERATED", {"recap_id": recap_id, "count": len(documents)}, ip)
    except Exception as e:
        logger.exception("Background recap gagal")
        latest = await db.recaps.find_one({"recap_id": recap_id}, {"_id": 0, "status": 1})
        if latest and latest.get("status") not in ("cancelled", "deleted"):
            failure = {"status": "failed", "error": str(e)[:300]}
            await db.recaps.update_one({"recap_id": recap_id}, {"$set": failure})
@api_router.post("/recap")
async def recap_generate(request: Request, payload: RecapRequest, user: User = Depends(get_current_user)):
    """Create a pending recap for the requested documents and start generation.

    Returns the new recap document (status ``processing``); the content fields
    are filled in later by the background task.
    """
    documents = await _resolve_documents(None, payload.document_ids, payload.folder_id, user)
    recap_id = uuid.uuid4().hex
    document_ids = [entry["document_id"] for entry in documents]
    source_titles = [entry.get("title") or entry.get("filename") for entry in documents]
    pending_recap = {
        "recap_id": recap_id,
        "user_id": user.user_id,
        "document_ids": document_ids,
        "source_titles": source_titles,
        "folder_id": payload.folder_id,
        "title": "",
        "unified_summary": "",
        "per_document": [],
        "shared_concepts": [],
        "study_path": [],
        "status": "processing",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    # A copy is inserted so the "_id" added by the driver stays out of the response.
    await db.recaps.insert_one(dict(pending_recap))

    client_ip = request.client.host if request.client else ""
    asyncio.create_task(_bg_generate_recap(recap_id, documents, user, client_ip))
    return pending_recap
@api_router.get("/recap/{recap_id}")
async def recap_get(recap_id: str, user: User = Depends(get_current_user)):
    """Return one recap owned by the caller.

    Raises:
        HTTPException: 404 when the caller owns no recap with this id.
    """
    owned_filter = {"recap_id": recap_id, "user_id": user.user_id}
    recap = await db.recaps.find_one(owned_filter, {"_id": 0})
    if recap:
        return recap
    raise HTTPException(404, "Recap tidak ditemukan")
@api_router.post("/recap/{recap_id}/cancel")
async def recap_cancel(request: Request, recap_id: str, user: User = Depends(get_current_user)):
    """Mark one of the caller's recaps as cancelled while it is being generated.

    Raises:
        HTTPException: 404 when the caller owns no such recap; 400 when the
            recap is not (or is no longer) in the ``processing`` state.
    """
    owned = {"recap_id": recap_id, "user_id": user.user_id}
    r = await db.recaps.find_one(owned, {"_id": 0})
    if not r:
        raise HTTPException(404, "Recap tidak ditemukan")
    if r.get("status") != "processing":
        raise HTTPException(400, "Recap tidak sedang diproses")
    # The status is re-checked inside the update itself, so a recap that became
    # ready after the read above is not overwritten with "cancelled".
    outcome = await db.recaps.update_one(
        {**owned, "status": "processing"},
        {"$set": {"status": "cancelled"}},
    )
    if outcome.matched_count == 0:
        raise HTTPException(400, "Recap tidak sedang diproses")
    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "RECAP_CANCELLED", {"recap_id": recap_id}, client_ip)
    return {"recap_id": recap_id, "status": "cancelled"}
@api_router.delete("/recap/{recap_id}")
async def recap_delete(request: Request, recap_id: str, user: User = Depends(get_current_user)):
    """Delete one of the caller's recaps and drop the folder snapshot that points to it.

    Raises:
        HTTPException: 404 when the caller owns no recap with this id.
    """
    r = await db.recaps.find_one({"recap_id": recap_id, "user_id": user.user_id}, {"_id": 0})
    if not r:
        raise HTTPException(404, "Recap tidak ditemukan")
    await db.recaps.delete_one({"recap_id": recap_id, "user_id": user.user_id})

    if r.get("folder_id"):
        # The snapshot is cleared only when it belongs to THIS recap; a folder
        # whose snapshot was produced by another recap keeps it.
        await db.folders.update_one(
            {"folder_id": r["folder_id"], "user_id": user.user_id, "recap_id": recap_id},
            {"$unset": {
                "recap_id": "",
                "recap_title": "",
                "recap_summary": "",
                "recap_document_ids": "",
                "recap_generated_at": "",
            }},
        )

    client_ip = request.client.host if request.client else ""
    await write_audit(user.user_id, "RECAP_DELETED", {"recap_id": recap_id}, client_ip)
    return {"recap_id": recap_id, "deleted": True}
@api_router.get("/recaps")
async def recap_list(user: User = Depends(get_current_user)):
    """List up to 100 of the caller's recaps, newest first."""
    newest_first = db.recaps.find({"user_id": user.user_id}, {"_id": 0}).sort("created_at", -1)
    return await newest_first.to_list(100)
@api_router.post("/recap/{recap_id}/tts")
async def recap_tts(recap_id: str, user: User = Depends(get_current_user)):
    """Produce (or reuse) the spoken version of a recap summary and return its URL.

    The WAV file is generated only when it is not yet on disk. Its bytes are
    also stored in ``audio_files`` when no record for the file name exists,
    and the recap is updated with the resulting ``audio_url``.

    Raises:
        HTTPException: 404 when the caller owns no such recap; 400 when the
            recap has no summary text.
    """
    recap = await db.recaps.find_one({"recap_id": recap_id, "user_id": user.user_id}, {"_id": 0})
    if not recap:
        raise HTTPException(404, "Recap tidak ditemukan")
    summary_text = (recap.get("unified_summary") or "").strip()
    if not summary_text:
        raise HTTPException(400, "Recap belum memiliki konten ringkasan")

    audio_filename = f"recap_{recap_id}.wav"
    audio_path = AUDIO_DIR / audio_filename
    if not audio_path.exists():
        await _generate_tts(summary_text, str(audio_path), user)

    stored = await db.audio_files.find_one({"filename": audio_filename}, {"_id": 1})
    if not stored and audio_path.exists():
        wav_bytes = audio_path.read_bytes()
        record = {
            "filename": audio_filename,
            "data": Binary(wav_bytes),
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        await db.audio_files.update_one({"filename": audio_filename}, {"$set": record}, upsert=True)

    audio_url = f"{API_PREFIX}/audio/{audio_filename}"
    await db.recaps.update_one({"recap_id": recap_id}, {"$set": {"audio_url": audio_url}})
    return {"audio_url": audio_url, "status": "ready"}
@api_router.get("/audio/{filename}")
async def serve_audio(filename: str):
    """Serve a generated audio file from ``AUDIO_DIR`` as ``audio/wav``.

    Raises:
        HTTPException: 404 when ``filename`` is not a plain file name or no
            such regular file exists in ``AUDIO_DIR``.
    """
    from fastapi.responses import FileResponse

    # Only bare file names are accepted: a separator or a dot entry would make
    # the joined path point outside AUDIO_DIR or at a directory.
    if filename in ("", ".", "..") or "/" in filename or "\\" in filename:
        raise HTTPException(404, "Audio tidak ditemukan")
    file_path = AUDIO_DIR / filename
    if not file_path.is_file():
        raise HTTPException(404, "Audio tidak ditemukan")
    return FileResponse(str(file_path), media_type="audio/wav")
@api_router.post("/documents/{document_id}/tts")
async def document_tts(document_id: str, user: User = Depends(get_current_user)):
    """Produce (or reuse) the spoken version of a document summary and return its URL.

    The WAV file is generated only when it is not yet on disk. Its bytes are
    also stored in ``audio_files`` when no record for the file name exists.

    Raises:
        HTTPException: 404 when the caller owns no such document; 400 when the
            document has no summary text.
    """
    document = await db.documents.find_one({"document_id": document_id, "user_id": user.user_id}, {"_id": 0})
    if not document:
        raise HTTPException(404, "Dokumen tidak ditemukan")
    summary_text = (document.get("summary") or "").strip()
    if not summary_text:
        raise HTTPException(400, "Dokumen belum memiliki ringkasan")

    audio_filename = f"doc_{document_id}.wav"
    audio_path = AUDIO_DIR / audio_filename
    if not audio_path.exists():
        await _generate_tts(summary_text, str(audio_path), user)

    stored = await db.audio_files.find_one({"filename": audio_filename}, {"_id": 1})
    if not stored and audio_path.exists():
        wav_bytes = audio_path.read_bytes()
        record = {
            "filename": audio_filename,
            "data": Binary(wav_bytes),
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        await db.audio_files.update_one({"filename": audio_filename}, {"$set": record}, upsert=True)

    return {"audio_url": f"{API_PREFIX}/audio/{audio_filename}", "status": "ready"}
# ============== Audit & Progress ==============
@api_router.get("/audit-logs")
async def audit_logs(user: User = Depends(get_current_user), limit: int = 100):
    """Return the caller's audit log entries, newest first, up to ``limit`` entries."""
    newest_first = db.audit_logs.find({"user_id": user.user_id}, {"_id": 0}).sort("timestamp", -1)
    return await newest_first.to_list(limit)
@api_router.get("/progress")
async def progress(user: User = Depends(get_current_user)):
    """Summarise the caller's learning progress.

    Returns the number of ready documents, the number of quizzes, the average
    score over the graded results among the 50 most recent ones, and the 10
    most recent results.
    """
    docs_count = await db.documents.count_documents({"user_id": user.user_id, "status": "ready"})
    quizzes_count = await db.quizzes.count_documents({"user_id": user.user_id})
    results = await db.quiz_results.find({"user_id": user.user_id}, {"_id": 0}).sort("created_at", -1).to_list(50)

    # Results that are still processing, failed or cancelled carry a placeholder
    # score of 0 and would drag the average down, so only graded ones count.
    # Results without a status field are treated as graded.
    graded = [r for r in results if r.get("status", "ready") == "ready"]
    avg_score = round(sum(r.get("score", 0) for r in graded) / len(graded), 1) if graded else 0

    last_results = [
        {"result_id": r["result_id"], "score": r.get("score", 0), "created_at": r["created_at"]}
        for r in results[:10]
    ]
    return {
        "documents": docs_count,
        "quizzes": quizzes_count,
        "average_score": avg_score,
        "recent_results": last_results,
    }
@api_router.get("/diag/gemini")
async def diag_gemini():
    """Probe the Gemini API and the Supabase auth endpoint and report reachability.

    Gemini counts as OK on HTTP 200. Supabase counts as OK on 200 or 401,
    i.e. whenever the auth endpoint answered at all. Failures are reported in
    the ``detail`` fields instead of being raised.
    """
    gemini_ok = False
    gemini_detail = ""
    try:
        # The key travels in a header rather than the query string so that it
        # does not show up in request URLs written to logs.
        url = f"{GEMINI_BASE_URL}/models/{GEMINI_MODEL}:generateContent"
        body = {"contents": [{"role": "user", "parts": [{"text": "Balas: OK"}]}]}
        async with httpx.AsyncClient(timeout=15.0) as hc:
            r = await hc.post(url, json=body, headers={"x-goog-api-key": GEMINI_API_KEY})
        gemini_ok = r.status_code == 200
        gemini_detail = f"status={r.status_code}"
    except Exception as e:
        gemini_detail = str(e)[:200]

    supa_ok = False
    supa_detail = ""
    if SUPABASE_URL and SUPABASE_ANON_KEY:
        try:
            async with httpx.AsyncClient(timeout=10.0) as hc:
                r = await hc.get(
                    f"{SUPABASE_URL}/auth/v1/user",
                    headers={
                        "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
                        "apikey": SUPABASE_ANON_KEY,
                        "X-Supabase-Api-Version": "2025-04-01",
                    },
                )
            supa_ok = r.status_code in (200, 401)
            supa_detail = f"status={r.status_code}"
        except Exception as e:
            supa_detail = str(e)[:200]
    else:
        supa_detail = "SUPABASE_URL / ANON_KEY not set"

    return {
        "gemini": {"ok": gemini_ok, "detail": gemini_detail, "model": GEMINI_MODEL, "analysis_model": GEMINI_ANALYSIS_MODEL},
        "supabase": {"ok": supa_ok, "detail": supa_detail, "url": SUPABASE_URL[:30] + "..." if SUPABASE_URL else None},
    }
# ============== Supabase Storage helpers ==============
# Base URL of the Supabase Storage REST API; empty string when SUPABASE_URL is not set.
SUPABASE_STORAGE_URL = f"{SUPABASE_URL}/storage/v1" if SUPABASE_URL else ""
async def _ensure_pdfs_bucket():
    """Ask Supabase Storage to create the public ``pdfs`` bucket.

    Does nothing when Supabase is not configured. Every outcome, including
    errors, is only logged; nothing is raised to the caller.
    """
    if not (SUPABASE_STORAGE_URL and SUPABASE_URL):
        logger.warning("Supabase not configured, skipping bucket creation")
        return

    request_headers = {
        "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
        "Content-Type": "application/json",
    }
    bucket_spec = {"id": "pdfs", "name": "pdfs", "public": True}
    try:
        async with httpx.AsyncClient() as hc:
            resp = await hc.post(f"{SUPABASE_STORAGE_URL}/bucket", headers=request_headers, json=bucket_spec)
            status = resp.status_code
            if status == 409:
                logger.info("Supabase bucket 'pdfs' already exists")
            elif status in (200, 201):
                logger.info("Supabase bucket 'pdfs' created")
            else:
                logger.warning(f"Supabase bucket creation: {resp.status_code} {resp.text[:200]}")
    except Exception as e:
        logger.warning(f"Could not create Supabase bucket (may already exist): {e}")
async def _upload_to_supabase_storage(user_id: str, document_id: str, file_path: str) -> Optional[str]:
    """Upload a PDF to the Supabase ``pdfs`` bucket and return its public URL.

    The object is stored as ``<user_id>/<document_id>.pdf``. Returns ``None``
    when Supabase is not configured, the upload is rejected or any error
    occurs (errors are logged, not raised).
    """
    if not SUPABASE_STORAGE_URL:
        return None

    object_key = f"{user_id}/{document_id}.pdf"
    upload_headers = {
        "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
        "Content-Type": "application/pdf",
    }
    try:
        pdf_bytes = Path(file_path).read_bytes()
        async with httpx.AsyncClient(timeout=30.0) as hc:
            resp = await hc.post(
                f"{SUPABASE_STORAGE_URL}/object/pdfs/{object_key}",
                headers=upload_headers,
                content=pdf_bytes,
            )
            if resp.status_code in (200, 201):
                return f"{SUPABASE_STORAGE_URL}/object/public/pdfs/{object_key}"
            logger.warning(f"Supabase storage upload failed: {resp.status_code} {resp.text[:200]}")
            return None
    except Exception as e:
        logger.warning(f"Supabase storage upload error: {e}")
        return None
async def _delete_from_supabase_storage(user_id: str, document_id: str):
    """Delete ``<user_id>/<document_id>.pdf`` from the Supabase ``pdfs`` bucket.

    Best effort: does nothing when Supabase is not configured, and both
    transport errors and error responses are only logged.
    """
    if not SUPABASE_STORAGE_URL:
        return
    storage_path = f"{user_id}/{document_id}.pdf"
    try:
        async with httpx.AsyncClient(timeout=15.0) as hc:
            r = await hc.delete(
                f"{SUPABASE_STORAGE_URL}/object/pdfs/{storage_path}",
                headers={"Authorization": f"Bearer {SUPABASE_ANON_KEY}"},
            )
        # An object that is already gone is fine; any other error response is
        # worth a log line because the object is probably still stored.
        if r.status_code >= 400 and r.status_code != 404:
            logger.warning(f"Supabase storage delete failed: {r.status_code} {r.text[:200]}")
    except Exception as e:
        logger.warning(f"Supabase storage delete error: {e}")
async def _try_upload_supabase(user_id: str, doc_id: str, file_path: str):
    """Upload a document's PDF to Supabase Storage and store the URL on the document.

    Failures are logged only; nothing is raised.
    """
    try:
        public_url = await _upload_to_supabase_storage(user_id, doc_id, file_path)
        if not public_url:
            return
        await db.documents.update_one({"document_id": doc_id}, {"$set": {"pdf_url": public_url}})
    except Exception as e:
        logger.warning(f"Supabase background upload skipped: {e}")
# ============== Friend System ==============
class FriendRequestPayload(BaseModel):
    """Request body for sending a friend request."""

    # Id of the user who should receive the request.
    target_user_id: str
class FriendRequestAction(BaseModel):
    """Request body that names a single friend request."""

    # Id of the friend request.
    request_id: str
@api_router.get("/users/search")
async def search_users(q: str = "", user: User = Depends(get_current_user)):
    """Search other users by name, e-mail or friend code (case-insensitive substring).

    Returns at most 20 matches; the caller is never included. A blank query
    returns an empty list.
    """
    term = q.strip()
    if not term:
        return {"users": []}
    # The input is escaped so it is matched literally: raw user text used as a
    # regex fails on invalid patterns and allows expensive patterns.
    pattern = {"$regex": re.escape(term), "$options": "i"}
    found = await db.users.find(
        {
            "$and": [
                {"user_id": {"$ne": user.user_id}},
                {"$or": [
                    {"name": pattern},
                    {"email": pattern},
                    {"friend_code": pattern},
                ]},
            ]
        },
        {"_id": 0, "user_id": 1, "name": 1, "email": 1, "picture": 1, "friend_code": 1, "education_level": 1, "institution": 1},
    ).limit(20).to_list(20)
    return {"users": found}
@api_router.post("/friends/request")
async def send_friend_request(payload: FriendRequestPayload, user: User = Depends(get_current_user)):
    """Send a friend request, or accept the pending one the target already sent.

    Returns ``{"status": "accepted", ...}`` when an incoming pending request
    was accepted, otherwise the id of the new pending request.

    Raises:
        HTTPException: 404 when the target user does not exist; 400 when the
            target is the caller, the two are already friends, or the caller
            already has a pending request to the target.
    """
    target = await db.users.find_one({"user_id": payload.target_user_id}, {"_id": 0})
    if not target:
        raise HTTPException(404, "User tidak ditemukan")
    if target["user_id"] == user.user_id:
        raise HTTPException(400, "Tidak bisa menambahkan diri sendiri")

    # Only live requests matter here. Without the status filter an old rejected
    # request could be returned and hide a pending or accepted one, which would
    # let duplicate requests through.
    existing = await db.friend_requests.find_one({
        "status": {"$in": ["pending", "accepted"]},
        "$or": [
            {"from_user_id": user.user_id, "to_user_id": payload.target_user_id},
            {"from_user_id": payload.target_user_id, "to_user_id": user.user_id},
        ],
    }, {"_id": 0})
    if existing:
        if existing["status"] == "accepted":
            raise HTTPException(400, "Sudah menjadi teman")
        if existing["from_user_id"] == user.user_id:
            raise HTTPException(400, "Permintaan pertemanan sudah dikirim")
        # Auto-accept if they already sent us one
        await db.friend_requests.update_one(
            {"friend_request_id": existing["friend_request_id"]},
            {"$set": {"status": "accepted"}},
        )
        await _create_notification(
            payload.target_user_id,
            "friend_accepted",
            f"{user.name} menerima permintaan pertemananmu",
            {"user_id": user.user_id},
        )
        return {"status": "accepted", "message": "Sekarang kalian berteman"}

    fr_id = uuid.uuid4().hex
    doc = {
        "friend_request_id": fr_id,
        "from_user_id": user.user_id,
        "from_user_name": user.name,
        "to_user_id": payload.target_user_id,
        "status": "pending",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    await db.friend_requests.insert_one(doc)
    # notify target
    await _create_notification(
        payload.target_user_id,
        "friend_request",
        f"{user.name} ingin menjadi temanmu",
        {"from_user_id": user.user_id, "friend_request_id": fr_id},
    )
    return {"friend_request_id": fr_id, "status": "pending"}
@api_router.get("/friends/requests")
async def list_friend_requests(user: User = Depends(get_current_user)):
    """List up to 50 pending friend requests addressed to the caller, newest first."""
    pending_for_me = {"to_user_id": user.user_id, "status": "pending"}
    cursor = db.friend_requests.find(pending_for_me, {"_id": 0}).sort("created_at", -1)
    return {"requests": await cursor.to_list(50)}
@api_router.post("/friends/requests/{request_id}/accept")
async def accept_friend_request(request_id: str, user: User = Depends(get_current_user)):
    """Accept a pending friend request addressed to the caller and notify its sender.

    Raises:
        HTTPException: 404 when the request does not exist; 403 when it is
            addressed to someone else; 400 when it is no longer pending.
    """
    by_id = {"friend_request_id": request_id}
    friend_request = await db.friend_requests.find_one(by_id, {"_id": 0})
    if not friend_request:
        raise HTTPException(404, "Permintaan tidak ditemukan")
    if friend_request["to_user_id"] != user.user_id:
        raise HTTPException(403, "Bukan permintaan untukmu")
    if friend_request["status"] != "pending":
        raise HTTPException(400, f"Permintaan sudah {friend_request['status']}")

    await db.friend_requests.update_one(dict(by_id), {"$set": {"status": "accepted"}})

    # Tell the original sender that the request was accepted.
    message = f"{user.name} menerima permintaan pertemananmu"
    await _create_notification(friend_request["from_user_id"], "friend_accepted", message, {"user_id": user.user_id})
    return {"status": "accepted"}
@api_router.post("/friends/requests/{request_id}/reject")
async def reject_friend_request(request_id: str, user: User = Depends(get_current_user)):
    """Reject a pending friend request addressed to the caller.

    Raises:
        HTTPException: 404 when the request does not exist; 403 when it is
            addressed to someone else; 400 when it is no longer pending.
    """
    by_id = {"friend_request_id": request_id}
    friend_request = await db.friend_requests.find_one(by_id, {"_id": 0})
    if not friend_request:
        raise HTTPException(404, "Permintaan tidak ditemukan")
    if friend_request["to_user_id"] != user.user_id:
        raise HTTPException(403, "Bukan permintaan untukmu")
    if friend_request["status"] != "pending":
        raise HTTPException(400, f"Permintaan sudah {friend_request['status']}")

    await db.friend_requests.update_one(dict(by_id), {"$set": {"status": "rejected"}})
    return {"status": "rejected"}
@api_router.get("/friends")
async def list_friends(user: User = Depends(get_current_user)):
    """Return the public profiles of the caller's friends.

    Friends are the other party of every accepted friend request (at most 200
    requests are considered). Friends whose user record no longer exists are
    left out.
    """
    accepted = await db.friend_requests.find(
        {
            "$or": [
                {"from_user_id": user.user_id, "status": "accepted"},
                {"to_user_id": user.user_id, "status": "accepted"},
            ]
        },
        {"_id": 0},
    ).to_list(200)

    # The friend is whichever side of the request is not the caller.
    friend_ids = {
        fr["from_user_id"] if fr["to_user_id"] == user.user_id else fr["to_user_id"]
        for fr in accepted
    }
    if not friend_ids:
        return {"friends": []}

    # One query for all profiles instead of one query per friend.
    friends = await db.users.find(
        {"user_id": {"$in": list(friend_ids)}},
        {"_id": 0, "user_id": 1, "name": 1, "picture": 1, "education_level": 1, "institution": 1},
    ).to_list(len(friend_ids))
    return {"friends": friends}
# ============== Notifications ==============
async def _create_notification(user_id: str, notif_type: str, message: str, data: Optional[dict] = None):
    """Store an unread notification for ``user_id`` and return its new id.

    Args:
        user_id: Recipient of the notification.
        notif_type: Stored as the notification's ``type``.
        message: Text of the notification.
        data: Optional extra payload; stored as ``{}`` when omitted or empty.
    """
    notification_id = uuid.uuid4().hex
    await db.notifications.insert_one({
        "notification_id": notification_id,
        "user_id": user_id,
        "type": notif_type,
        "message": message,
        "data": data or {},
        "read": False,
        "created_at": datetime.now(timezone.utc).isoformat(),
    })
    return notification_id
@api_router.get("/notifications")
async def list_notifications(user: User = Depends(get_current_user), limit: int = 50):
    """Return the caller's notifications, newest first, up to ``limit`` entries."""
    newest_first = db.notifications.find({"user_id": user.user_id}, {"_id": 0}).sort("created_at", -1)
    return {"notifications": await newest_first.to_list(limit)}
@api_router.get("/notifications/unread-count")
async def unread_notification_count(user: User = Depends(get_current_user)):
    """Return how many of the caller's notifications are still unread."""
    unread_filter = {"user_id": user.user_id, "read": False}
    return {"count": await db.notifications.count_documents(unread_filter)}
@api_router.post("/notifications/{notif_id}/read")
async def mark_notification_read(notif_id: str, user: User = Depends(get_current_user)):
    """Mark one of the caller's notifications as read.

    Marking a notification that is already read succeeds as well.

    Raises:
        HTTPException: 404 when the caller has no notification with this id.
    """
    result = await db.notifications.update_one(
        {"notification_id": notif_id, "user_id": user.user_id},
        {"$set": {"read": True}},
    )
    # matched_count tells whether the notification exists; modified_count is
    # also 0 for a notification that was already read.
    if result.matched_count == 0:
        raise HTTPException(404, "Notifikasi tidak ditemukan")
    return {"ok": True}
@api_router.post("/notifications/read-all")
async def mark_all_notifications_read(user: User = Depends(get_current_user)):
    """Mark every unread notification of the caller as read."""
    unread_filter = {"user_id": user.user_id, "read": False}
    mark_read = {"$set": {"read": True}}
    await db.notifications.update_many(unread_filter, mark_read)
    return {"ok": True}
# ============== Document Discussion ==============
class SendMessagePayload(BaseModel):
    """Request body for posting a message to a document discussion."""

    # Text of the message.
    content: str
class DiscussionInvitePayload(BaseModel):
    """Request body that lists users for a discussion invitation."""

    # Ids of the users named in the request.
    user_ids: List[str]
class DiscussionKickPayload(BaseModel):
    """Request body that names one user of a discussion."""

    # Id of the user named in the request.
    user_id: str
async def _can_access_discussion(doc_id: str, user_id: str) -> Optional[dict]:
    """Return the document when the user may access its discussion, else ``None``.

    Access is granted to the document's owner and to users recorded in
    ``discussion_participants`` for the document. For participants the
    document is returned without its ``file_path``.
    """
    owned = await db.documents.find_one({"document_id": doc_id, "user_id": user_id}, {"_id": 0})
    if owned:
        return owned

    membership = await db.discussion_participants.find_one(
        {"document_id": doc_id, "user_id": user_id},
        {"_id": 0},
    )
    if not membership:
        return None
    return await db.documents.find_one({"document_id": doc_id}, {"_id": 0, "file_path": 0})
def _build_doc_context(doc: dict) -> str:
return json.dumps({
"title": doc.get("title", ""),
"summary": (doc.get("summary") or "")[:5000],
"key_concepts": doc.get("key_concepts", [])[:10],
"learning_objectives": doc.get("learning_objectives", [])[:8],
}, ensure_ascii=False)
async def _bg_respond_bot(doc_id: str, question: str, doc: dict, audience: str, owner_id: str):
    """Generate an EduBot answer and store it as a discussion message.

    Builds a prompt from the document context and ``question``, calls
    ``_call_groq`` and inserts the reply into ``discussion_messages`` under
    the synthetic user id ``"bot"``. Failures of the model call or the
    insert are logged as warnings and not re-raised. ``owner_id`` is
    accepted but not used by this function.
    """
    context = _build_doc_context(doc)
    system = "".join((
        f"Kamu adalah EduBot, asisten belajar untuk {audience}. ",
        "Jawab pertanyaan berdasarkan konten dokumen. Bahasa Indonesia. ",
        "Jika di luar konteks, beri tahu dengan sopan.",
    ))
    prompt = "".join((
        f"KONTEN DOKUMEN:\n{context}\n\n",
        f"PERTANYAAN: {question}\n\n",
        "Jawab dengan jelas. Berikan contoh bila memungkinkan.",
    ))
    try:
        answer = await _call_groq(system, prompt)
        await db.discussion_messages.insert_one({
            "message_id": uuid.uuid4().hex,
            "document_id": doc_id,
            "user_id": "bot",
            "user_name": "EduBot",
            "user_picture": None,
            "content": answer,
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
    except Exception as e:
        logger.warning(f"Bot respond gagal: {e}")
@api_router.get("/documents/{doc_id}/messages")
async def list_discussion_messages(doc_id: str, user: User = Depends(get_current_user), limit: int = 50, before: Optional[str] = None):
    """Return one page of a document's discussion messages, oldest first.

    The newest ``limit`` messages are fetched (optionally only those whose
    ``created_at`` is lower than ``before``) and then put in chronological
    order. ``has_more`` is True when the page is full.

    Raises:
        HTTPException: 404 when the caller has no access to the discussion.
    """
    doc = await _can_access_discussion(doc_id, user.user_id)
    if not doc:
        raise HTTPException(404, "Dokumen tidak ditemukan")

    criteria = {"document_id": doc_id}
    if before:
        criteria["created_at"] = {"$lt": before}

    cursor = db.discussion_messages.find(criteria, {"_id": 0}).sort("created_at", -1)
    newest_first = await cursor.to_list(limit)
    page = newest_first[::-1]
    doc_info = {"title": doc.get("title", ""), "filename": doc.get("filename", "")}
    return {"messages": page, "document": doc_info, "has_more": len(page) >= limit}
# Strong references to in-flight EduBot reply tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_discussion_bot_tasks: set = set()


@api_router.post("/documents/{doc_id}/messages")
async def send_discussion_message(doc_id: str, payload: SendMessagePayload, user: User = Depends(get_current_user)):
    """Post a message to a document discussion.

    Stores the message, notifies the owner and every other participant, and
    schedules an EduBot reply in the background when the message mentions
    ``@bot`` (any casing) and the document status is ``"ready"``.

    Returns:
        The stored message as a dict (without MongoDB's ``_id``).

    Raises:
        HTTPException: 404 when the caller has no access to the discussion,
            400 when the message is empty after stripping whitespace.
    """
    doc = await _can_access_discussion(doc_id, user.user_id)
    if not doc:
        raise HTTPException(404, "Dokumen tidak ditemukan")
    content = payload.content.strip()
    if not content:
        raise HTTPException(400, "Pesan tidak boleh kosong")

    msg = {
        "message_id": uuid.uuid4().hex,
        "document_id": doc_id,
        "user_id": user.user_id,
        "user_name": user.name,
        "user_picture": user.picture,
        "content": content,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    # insert_one adds an ObjectId "_id" to the dict it receives; insert a copy
    # so the dict returned to the client never carries that non-JSON value.
    await db.discussion_messages.insert_one(dict(msg))

    # Notify the owner and all participants except the sender and the bot.
    participant_ids = {doc.get("user_id", user.user_id)}
    async for p in db.discussion_participants.find({"document_id": doc_id}, {"_id": 0, "user_id": 1}):
        participant_ids.add(p["user_id"])
    doc_title = doc.get("title") or doc.get("filename") or "dokumen"
    for pid in participant_ids:
        if pid == user.user_id or pid == "bot":
            continue
        await _create_notification(
            pid, "discussion_message",
            f"{user.name} mengirim pesan di diskusi {doc_title}",
            {"document_id": doc_id, "document_title": doc_title},
        )

    # @bot trigger (wrapped so it never crashes the send)
    try:
        if "@bot" in content.lower() and doc.get("status") == "ready":
            audience = _audience(user)
            # Remove the mention case-insensitively but keep the question's
            # original casing (names, acronyms) for the model.
            question = re.sub(r"@bot", "", content, flags=re.IGNORECASE).strip()
            if not question:
                question = "Jelaskan materi ini secara singkat"
            task = asyncio.create_task(
                _bg_respond_bot(doc_id, question, doc, audience, doc.get("user_id", ""))
            )
            _discussion_bot_tasks.add(task)
            task.add_done_callback(_discussion_bot_tasks.discard)
    except Exception as e:
        logger.exception(f"Bot trigger gagal: {e}")
    return msg
@api_router.post("/documents/{doc_id}/discussion/invite")
async def invite_to_discussion(doc_id: str, payload: DiscussionInvitePayload, user: User = Depends(get_current_user)):
    """Invite users to the discussion of a document owned by the caller.

    Targets are skipped silently when they are the caller, are already
    participants, or do not exist in ``users``. Every newly added
    participant receives a ``discussion_invite`` notification.

    Returns:
        ``{"invited": [...], "count": n}`` listing the user ids actually added.

    Raises:
        HTTPException: 404 when the caller does not own the document,
            400 when ``user_ids`` is empty.
    """
    doc = await db.documents.find_one({"document_id": doc_id, "user_id": user.user_id}, {"_id": 0})
    if not doc:
        raise HTTPException(404, "Dokumen tidak ditemukan")
    if not payload.user_ids:
        raise HTTPException(400, "Pilih minimal satu teman")

    doc_title = doc.get("title") or doc.get("filename") or "dokumen"
    invited = []
    for target_id in payload.user_ids:
        # The owner already has access; inviting themselves would add them
        # as a participant of their own document and notify them.
        if target_id == user.user_id:
            continue
        existing = await db.discussion_participants.find_one(
            {"document_id": doc_id, "user_id": target_id}, {"_id": 0}
        )
        if existing:
            continue
        friend = await db.users.find_one({"user_id": target_id}, {"_id": 0, "name": 1})
        if not friend:
            continue
        await db.discussion_participants.insert_one({
            "document_id": doc_id,
            "user_id": target_id,
            "invited_by": user.user_id,
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
        invited.append(target_id)
        await _create_notification(
            target_id, "discussion_invite",
            f"{user.name} mengundangmu ke diskusi {doc_title}",
            {"document_id": doc_id, "document_title": doc_title, "invited_by": user.user_id},
        )
    return {"invited": invited, "count": len(invited)}
@api_router.get("/documents/{doc_id}/discussion/participants")
async def list_discussion_participants(doc_id: str, user: User = Depends(get_current_user)):
    """List the owner and the members of a document discussion.

    Each entry holds ``user_id``, ``name``, ``picture`` and ``friend_code``
    plus a ``role`` of ``"owner"`` or ``"member"``. The owner, when found,
    comes first. Participants without a matching ``users`` record are left out.

    Raises:
        HTTPException: 404 when the caller has no access to the discussion.
    """
    doc = await _can_access_discussion(doc_id, user.user_id)
    if not doc:
        raise HTTPException(404, "Dokumen tidak ditemukan")

    public_fields = {"_id": 0, "user_id": 1, "name": 1, "picture": 1, "friend_code": 1}
    owner = await db.users.find_one({"user_id": doc.get("user_id")}, public_fields)
    participants = [{**owner, "role": "owner"}] if owner else []

    async for p in db.discussion_participants.find({"document_id": doc_id}, {"_id": 0}):
        member = await db.users.find_one({"user_id": p["user_id"]}, public_fields)
        if member:
            participants.append({**member, "role": "member"})
    return {"participants": participants}
@api_router.post("/documents/{doc_id}/discussion/kick")
async def kick_from_discussion(doc_id: str, payload: DiscussionKickPayload, user: User = Depends(get_current_user)):
    """Remove a participant from the discussion of a document the caller owns.

    The removed user gets a ``discussion_kicked`` notification when their
    ``users`` record exists.

    Raises:
        HTTPException: 404 when the caller does not own the document or the
            target is not a participant; 400 when the caller targets themselves.
    """
    doc = await db.documents.find_one({"document_id": doc_id, "user_id": user.user_id}, {"_id": 0})
    if not doc:
        raise HTTPException(404, "Hanya pemilik dokumen yang bisa mengeluarkan peserta")
    if payload.user_id == user.user_id:
        raise HTTPException(400, "Tidak bisa mengeluarkan diri sendiri")

    removal = await db.discussion_participants.delete_one(
        {"document_id": doc_id, "user_id": payload.user_id}
    )
    if removal.deleted_count == 0:
        raise HTTPException(404, "Peserta tidak ditemukan")

    target = await db.users.find_one({"user_id": payload.user_id}, {"_id": 0, "name": 1})
    if not target:
        return {"ok": True}

    doc_title = doc.get("title") or doc.get("filename") or "dokumen"
    await _create_notification(
        payload.user_id, "discussion_kicked",
        f"Kamu dikeluarkan dari diskusi {doc_title} oleh {user.name}",
        {"document_id": doc_id, "document_title": doc_title},
    )
    return {"ok": True}
@api_router.post("/documents/{doc_id}/discussion/leave")
async def leave_discussion(doc_id: str, user: User = Depends(get_current_user)):
    """Remove the caller's own participant row for this document.

    Raises:
        HTTPException: 404 when the caller has no participant row to delete.
    """
    own_membership = {"document_id": doc_id, "user_id": user.user_id}
    outcome = await db.discussion_participants.delete_one(own_membership)
    if outcome.deleted_count == 0:
        raise HTTPException(404, "Kamu bukan peserta diskusi ini")
    return {"ok": True}
# ============== AI Chat per Document ==============
# Request body for asking the AI a question about a single document.
class ChatQuestion(BaseModel):
    # The user's question text.
    question: str
@api_router.post("/documents/{doc_id}/chat")
async def chat_with_document(doc_id: str, payload: ChatQuestion, user: User = Depends(get_current_user)):
    """Answer a question about one of the caller's documents via ``_call_groq``.

    A compact JSON context (title, truncated summary, up to 5 key concepts,
    up to 4 learning objectives) is sent with the question. If that call
    fails, a second attempt is made with a minimal prompt that contains
    only the title and the question.

    Returns:
        ``{"answer": <model reply>}``.

    Raises:
        HTTPException: 404 when the document is not found for this user,
            400 when it is not ready or the question is blank, 500 when
            both model calls fail.
    """
    doc = await db.documents.find_one({"document_id": doc_id, "user_id": user.user_id}, {"_id": 0})
    if not doc:
        raise HTTPException(404, "Dokumen tidak ditemukan")
    if doc.get("status") != "ready":
        raise HTTPException(400, "Dokumen belum siap dianalisis")
    if not payload.question.strip():
        raise HTTPException(400, "Pertanyaan tidak boleh kosong")
    audience = _audience(user)
    # Token optimization: keep the context budget for chat very tight.
    # `or []` covers fields stored as None; entries that are not dicts are
    # stringified instead of raising AttributeError on .get().
    concepts = [
        c.get("concept", "") if isinstance(c, dict) else str(c)
        for c in (doc.get("key_concepts") or [])[:5]
    ]
    context = json.dumps({
        "title": doc.get("title", ""),
        "summary": (doc.get("summary") or "")[:2500],
        "key_concepts": concepts,
        "learning_objectives": (doc.get("learning_objectives") or [])[:4],
    }, ensure_ascii=False)
    system = (
        f"Kamu EduScanner AI, asisten belajar untuk {audience}. "
        f"Jawab pertanyaan berdasarkan dokumen. Bahasa Indonesia."
    )
    prompt = (
        f"DOKUMEN:\n{context}\n\n"
        f"PERTANYAAN: {payload.question}"
    )
    try:
        resp = await _call_groq(system, prompt)
        return {"answer": resp}
    except Exception as e:
        # Fallback: retry with a minimal prompt (e.g. when the full one was
        # rejected as too large).
        try:
            resp = await _call_groq(system, f"Materi: {doc.get('title')}. Tanya: {payload.question}")
            return {"answer": resp}
        except Exception:
            # `except Exception` rather than a bare `except`, so task
            # cancellation is not converted into an HTTP 500. The detail
            # reports the first failure, as before.
            raise HTTPException(500, f"Gagal menjawab: {str(e)[:100]}") from e
# ============== PDF serving ==============
@api_router.get("/documents/{doc_id}/pdf")
async def get_document_pdf(doc_id: str, user: User = Depends(get_current_user)):
    """Serve the PDF of one of the caller's documents.

    Sources are tried in order: bytes stored in ``pdf_files``, then the
    document's ``pdf_url`` (returned as JSON ``{"pdf_url": ...}``), then the
    local ``file_path``.

    Raises:
        HTTPException: 404 when the document is not found for this user or
            no PDF source is available.
    """
    doc = await db.documents.find_one({"document_id": doc_id, "user_id": user.user_id}, {"_id": 0})
    if not doc:
        raise HTTPException(404, "Dokumen tidak ditemukan")
    # `or` also covers a filename stored as None.
    filename = str(doc.get("filename") or "document.pdf")

    # 1. Try MongoDB storage (primary)
    stored = await db.pdf_files.find_one({"document_id": doc_id}, {"_id": 0, "data": 1})
    if stored:
        from urllib.parse import quote

        # Header values must be latin-1 encodable and the quoted form breaks
        # on embedded quotes; fall back to the RFC 5987 percent-encoded
        # `filename*` parameter whenever the name needs escaping. Plain
        # names produce exactly the same header as before.
        quoted = quote(filename)
        if quoted != filename:
            disposition = f"inline; filename*=utf-8''{quoted}"
        else:
            disposition = f'inline; filename="{filename}"'
        return Response(
            content=stored["data"],
            media_type="application/pdf",
            headers={"Content-Disposition": disposition},
        )

    # 2. Try Supabase Storage URL
    pdf_url = doc.get("pdf_url")
    if pdf_url:
        return {"pdf_url": pdf_url}

    # 3. Fallback: try local file
    fp = doc.get("file_path")
    if fp and Path(fp).exists():
        from fastapi.responses import FileResponse
        return FileResponse(fp, media_type="application/pdf", filename=filename)
    raise HTTPException(404, "File PDF tidak ditemukan")
# ============== Mount ==============
fastapi_app.include_router(api_router)
# CORS_ORIGINS is a comma-separated list. Entries are trimmed and empty ones
# dropped, so a value such as "https://a.example, https://b.example" yields
# origins that can actually match (a leading space never would).
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=[
        origin.strip()
        for origin in os.environ.get('CORS_ORIGINS', '*').split(',')
        if origin.strip()
    ],
    allow_methods=["*"],
    allow_headers=["*"],
)
async def _sync_local_audios_to_mongodb():
    """Copy local ``*.wav`` files from AUDIO_DIR into the ``audio_files`` collection.

    Files larger than 15 MiB are skipped with a warning, and files whose
    name already exists in the collection are left untouched. Any exception
    stops the sync and is logged as a warning instead of being raised.
    """
    logger.info("Memulai sinkronisasi file audio lokal ke MongoDB...")
    try:
        if not AUDIO_DIR.exists():
            return
        import glob

        size_cap = 15 * 1024 * 1024
        for file_path_str in glob.glob(str(AUDIO_DIR / "*.wav")):
            wav = Path(file_path_str)
            filename = wav.name
            if wav.stat().st_size > size_cap:
                logger.warning(f"File audio {filename} terlalu besar untuk MongoDB (>15MB), dilewati.")
                continue
            already_stored = await db.audio_files.find_one({"filename": filename}, {"_id": 1})
            if already_stored:
                continue
            logger.info(f"Mengunggah file audio baru ke MongoDB: {filename}")
            with open(file_path_str, "rb") as fh:
                raw = fh.read()
            record = {
                "filename": filename,
                "data": Binary(raw),
                "created_at": datetime.now(timezone.utc).isoformat()
            }
            await db.audio_files.update_one(
                {"filename": filename},
                {"$set": record},
                upsert=True
            )
        logger.info("Sinkronisasi audio selesai.")
    except Exception as e:
        logger.warning(f"Gagal melakukan sinkronisasi audio ke MongoDB: {e}")
@fastapi_app.on_event("startup")
async def startup():
    """Run the startup steps in order: ensure the PDF bucket, then sync local audio."""
    for step in (_ensure_pdfs_bucket, _sync_local_audios_to_mongodb):
        await step()
@fastapi_app.on_event("shutdown")
async def shutdown_db_client():
    """Close the MongoDB client on application shutdown, if one was created."""
    # `client` is None when MONGO_URL is not configured; calling close() on
    # it would raise AttributeError during shutdown.
    if client is not None:
        client.close()
# ============== Supabase Storage helpers ==============
# Base URL of the Supabase Storage API; empty when SUPABASE_URL is not set.
SUPABASE_STORAGE_URL = "" if not SUPABASE_URL else f"{SUPABASE_URL}/storage/v1"
async def _ensure_pdfs_bucket():
    """Best-effort request to create the public ``pdfs`` bucket in Supabase Storage.

    Does nothing when Supabase is not configured. The HTTP response status
    is not inspected. Exceptions (network errors, timeouts, ...) are logged
    as warnings and never raised, so application startup is not blocked.
    """
    if not SUPABASE_STORAGE_URL or not SUPABASE_URL:
        return
    try:
        async with httpx.AsyncClient() as hc:
            await hc.post(
                f"{SUPABASE_STORAGE_URL}/bucket",
                headers={
                    "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
                    "Content-Type": "application/json",
                },
                json={"id": "pdfs", "name": "pdfs", "public": True},
            )
    except Exception as e:
        # Still best-effort, but leave a trace instead of failing silently.
        logger.warning(f"Gagal memastikan bucket pdfs di Supabase: {e}")