| | import logging |
| | from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks |
| | from sqlalchemy.orm import Session |
| | from typing import List, Dict |
| | import asyncio |
| | from datetime import datetime |
| |
|
| | from api.auth import get_current_user |
| | from models import db_models |
| | from models.schemas import QuizGenerateRequest, QuizSetResponse |
| | from core.database import get_db, SessionLocal |
| | from api.websocket_routes import manager |
| | from services.quiz_service import quiz_service |
| | from core import constants |
| |
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router collecting every quiz endpoint under the /api/quizzes prefix.
router = APIRouter(
    prefix="/api/quizzes",
    tags=["quizzes"],
)
| |
|
# Strong references to in-flight progress notifications. The event loop only
# keeps weak references to tasks, so a fire-and-forget task that nothing else
# references may be garbage-collected before it finishes.
_pending_progress_tasks = set()


async def run_quiz_generation(set_id: int, request: QuizGenerateRequest, user_id: int):
    """Generate and store the questions of an existing quiz set (background task).

    Loads the quiz set ``set_id``, asks ``quiz_service`` to generate questions
    from ``request``, stores one ``QuizQuestion`` row per generated item, marks
    the set ``"completed"`` and sends the result through ``manager`` on the
    connection ``user_<user_id>``. If generation or persistence fails, the set
    is marked ``"failed"`` with the error text and an error is sent instead.

    Args:
        set_id: Primary key of the ``QuizSet`` row to populate.
        request: The generation parameters supplied by the client.
        user_id: ID of the requesting user; used to build the connection id.
    """
    db = SessionLocal()
    connection_id = f"user_{user_id}"

    def report_progress(percent, message):
        """Schedule a progress notification and keep it alive until it is done."""
        task = asyncio.create_task(
            manager.send_progress(connection_id, percent, "processing", message)
        )
        _pending_progress_tasks.add(task)
        task.add_done_callback(_pending_progress_tasks.discard)
        return task

    try:
        db_set = db.query(db_models.QuizSet).filter(db_models.QuizSet.id == set_id).first()
        if not db_set:
            # The set no longer exists, so there is nothing to populate.
            return

        quizzes_data = await quiz_service.generate_quiz(
            file_key=request.file_key,
            text_input=request.text_input,
            difficulty=request.difficulty,
            topic=request.topic,
            language=request.language,
            count_mode=request.count,
            progress_callback=report_progress,
        )

        if not quizzes_data:
            raise Exception("AI failed to generate quiz questions")

        for item in quizzes_data:
            db.add(
                db_models.QuizQuestion(
                    quiz_set_id=db_set.id,
                    question=item.get("question", ""),
                    hint=item.get("hint", ""),
                    choices=item.get("choices", {}),
                    answer=str(item.get("answer", "1")),
                    explanation=item.get("explanation", ""),
                )
            )

        db_set.status = "completed"
        db.commit()

        result_payload = {
            "type": "quiz",
            "id": db_set.id,
            "status": "completed",
            "title": db_set.title,
        }
    except Exception as e:
        logger.exception("Background quiz generation failed: %s", e)
        try:
            # Reset the session first: it may be in a failed transaction, and any
            # questions added before the error must not be committed together
            # with the "failed" status.
            db.rollback()
            db_set = db.query(db_models.QuizSet).filter(db_models.QuizSet.id == set_id).first()
            if db_set:
                db_set.status = "failed"
                db_set.error_message = str(e)
                db.commit()
        except Exception:
            # Recording the failure is best-effort; the client is still notified below.
            logger.exception("Could not record failure for quiz set %s", set_id)
            db.rollback()
        await manager.send_error(connection_id, f"Quiz generation failed: {str(e)}")
    else:
        # The quiz is already persisted as completed at this point, so a failed
        # notification must not flip its status to "failed".
        try:
            await manager.send_result(connection_id, result_payload)
        except Exception:
            logger.exception("Could not send quiz result for quiz set %s", set_id)
    finally:
        db.close()
| |
|
@router.get("/config")
async def get_quiz_config():
    """Expose the difficulty, question-count and language options defined in ``constants``."""
    config = dict(
        difficulties=constants.DIFFICULTIES,
        counts=constants.QUIZ_COUNTS,
        languages=constants.LANGUAGES,
    )
    return config
| |
|
def _build_quiz_title(file_key, topic):
    """Pick a title for a new quiz set from the file name, the topic, or the current UTC time."""
    # Local import: the file only imports ``datetime`` at module level.
    from datetime import timezone

    if file_key:
        # Last path segment of the key, without its final extension.
        file_base = file_key.split('/')[-1].rsplit('.', 1)[0]
        if file_base:
            return f"Quiz-{file_base}"
    # NOTE(review): "string" is presumably the placeholder an API explorer
    # pre-fills for string fields; it is treated as "no topic given".
    if topic and topic != "string":
        return topic
    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the rendered text is unchanged.
    return f"Quiz {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}"


@router.post("/generate", response_model=QuizSetResponse)
async def generate_quiz(
    request: QuizGenerateRequest,
    background_tasks: BackgroundTasks,
    current_user: db_models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Initiates quiz generation in the background.

    Creates a ``QuizSet`` row with status ``"processing"``, schedules
    ``run_quiz_generation`` for it and returns the new row immediately.

    Raises:
        HTTPException: 403 when ``request.file_key`` does not match a source
            owned by the current user.
    """
    source_id = None
    if request.file_key:
        # Only files registered as a source of the current user may be used.
        source = db.query(db_models.Source).filter(
            db_models.Source.s3_key == request.file_key,
            db_models.Source.user_id == current_user.id
        ).first()
        if not source:
            raise HTTPException(status_code=403, detail="Not authorized to access this file")
        source_id = source.id

    db_set = db_models.QuizSet(
        title=_build_quiz_title(request.file_key, request.topic),
        difficulty=request.difficulty,
        user_id=current_user.id,
        source_id=source_id,
        status="processing"
    )
    db.add(db_set)
    db.commit()
    db.refresh(db_set)

    # The generation itself runs after the response has been sent.
    background_tasks.add_task(run_quiz_generation, db_set.id, request, current_user.id)

    return db_set
| |
|
@router.get("/sets", response_model=List[QuizSetResponse])
async def list_quiz_sets(
    current_user: db_models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Lists all quiz sets for the current user, newest first.

    Raises:
        HTTPException: 500 carrying the error text when the query or the
            response validation fails.
    """
    try:
        sets = (
            db.query(db_models.QuizSet)
            .filter(db_models.QuizSet.user_id == current_user.id)
            .order_by(db_models.QuizSet.created_at.desc())
            .all()
        )
        return [QuizSetResponse.model_validate(s) for s in sets]
    except Exception as e:
        # Keep the traceback in the server log; previously it was lost when the
        # error was converted into an HTTP response.
        logger.exception("Failed to list quiz sets")
        # NOTE(review): detail=str(e) exposes internal error text to the client;
        # kept for compatibility, consider a generic message instead.
        raise HTTPException(status_code=500, detail=str(e)) from e
| |
|
@router.get("/set/{set_id}", response_model=QuizSetResponse)
async def get_quiz_set(
    set_id: int,
    current_user: db_models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Fetches one quiz set by id, restricted to sets owned by the current user.

    Responds with 404 when no set with that id belongs to the current user.
    """
    QuizSet = db_models.QuizSet
    owned_by_caller = db.query(QuizSet).filter(
        QuizSet.id == set_id,
        QuizSet.user_id == current_user.id,
    )
    quiz_set = owned_by_caller.first()

    if quiz_set:
        return QuizSetResponse.model_validate(quiz_set)

    raise HTTPException(status_code=404, detail="Quiz set not found")
| |
|
@router.delete("/set/{set_id}")
async def delete_quiz_set(
    set_id: int,
    current_user: db_models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Deletes one quiz set owned by the current user.

    Responds with 404 when no set with that id belongs to the current user.
    NOTE(review): the success message states that the questions are removed as
    well; that presumably relies on a cascade configured on the model - confirm.
    """
    QuizSet = db_models.QuizSet
    owned_by_caller = db.query(QuizSet).filter(
        QuizSet.id == set_id,
        QuizSet.user_id == current_user.id,
    )
    quiz_set = owned_by_caller.first()

    if not quiz_set:
        raise HTTPException(status_code=404, detail="Quiz set not found")

    db.delete(quiz_set)
    db.commit()

    confirmation = {"message": "Quiz set and all associated questions deleted successfully"}
    return confirmation
| |
|