| from fastapi import FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import Response |
| from typing import List, Dict, Optional |
| import os |
| from datetime import datetime, timedelta |
| from dotenv import load_dotenv |
|
|
| from code_executor import CodeExecutor |
| from database import Database |
| from models import Room, Question, Worksheet, ExecutionResult, TestCase, SubmissionResult |
| from report_generator import generate_exam_report |
|
|
| |
| load_dotenv() |
| MONGO_URI = os.getenv("MONGO_URI") |
|
|
| if not MONGO_URI: |
| print("WARNING: MONGO_URI not found in .env") |
|
|
| |
| |
| |
|
|
| app = FastAPI(title="Online Exam IDE API",docs_url="/docs", redoc_url="/redoc") |
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| db = Database(MONGO_URI) |
| code_executor = CodeExecutor() |
|
|
| |
| MAX_SCORE_PER_QUESTION = 100 |
|
|
| |
| MAX_VIOLATIONS_ALLOWED = 3 |
|
|
|
|
| |
| |
| |
|
|
| async def check_room_active(room_id: str) -> dict: |
| """Check if room exists and is active. Auto-expire if time is up.""" |
| room = await db.get_room(room_id) |
| if not room: |
| raise HTTPException(status_code=404, detail="Room not found") |
| |
| if room.get("status") == "expired": |
| raise HTTPException(status_code=403, detail="This exam has ended. Room is no longer accessible.") |
| |
| if db.is_room_expired(room): |
| await db.expire_room(room_id) |
| raise HTTPException(status_code=403, detail="This exam has ended. Room code has been revoked.") |
| |
| return room |
|
|
|
|
| |
| |
| |
|
|
| @app.get("/health") |
| async def health_check(): |
| return {"status": "healthy", "timestamp": datetime.now().isoformat()} |
|
|
|
|
| |
| |
| |
|
|
| @app.post("/api/rooms/create") |
| async def create_room(data: dict): |
| """Create a new exam room""" |
| room_name = data.get("room_name") |
| teacher_name = data.get("teacher_name") |
| language = data.get("language", "Python") |
| duration_minutes = int(data.get("duration", 30)) |
| start_time = data.get("start_time") |
| |
| room = await db.create_room( |
| room_name=room_name, |
| teacher_name=teacher_name, |
| language=language, |
| duration_minutes=duration_minutes, |
| start_time=start_time |
| ) |
| |
| return { |
| "room_id": room["room_id"], |
| "room_code": room["room_code"], |
| "room_name": room["room_name"], |
| "teacher_name": room["teacher_name"], |
| "duration_minutes": room["duration_minutes"], |
| "start_time": room["start_time"], |
| "end_time": room["end_time"] |
| } |
|
|
| @app.post("/api/rooms/{room_id}/start") |
| async def start_exam(room_id: str, data: dict): |
| """Start an exam by setting the start time explicitly.""" |
| room = await db.get_room(room_id) |
| if not room: |
| raise HTTPException(status_code=404, detail="Room not found") |
| |
| start_time = data.get("start_time") |
| if not start_time: |
| raise HTTPException(status_code=400, detail="start_time is required") |
| |
| duration = room.get("duration_minutes", 60) |
| success = await db.start_exam(room_id, start_time, duration) |
| |
| if not success: |
| raise HTTPException(status_code=500, detail="Failed to start exam") |
| |
| return {"success": True, "message": "Exam started"} |
|
|
|
|
| @app.post("/api/rooms/{room_id}/report_violation") |
| async def report_violation(room_id: str, data: dict): |
| """Report a student violation (tab switch, etc.). Auto-blocks after threshold.""" |
| student_id = data.get("student_id") |
| |
| room = await db.get_room(room_id) |
| |
| if not room: |
| raise HTTPException(status_code=404, detail="Room not found") |
| |
| if student_id: |
| current_flags = room.get("student_red_flags", {}).get(student_id, 0) |
| new_flags = current_flags + 1 |
| await db.update_red_flags(room_id, student_id, new_flags) |
| |
| blocked = new_flags > MAX_VIOLATIONS_ALLOWED |
| |
| return {"success": True, "new_count": new_flags, "blocked": blocked} |
| |
| return {"success": False} |
|
|
|
|
| @app.get("/api/rooms/{room_id}") |
| async def get_room(room_id: str): |
| """Get room details. Auto-detects and marks expired rooms.""" |
| room = await db.get_room(room_id) |
| if not room: |
| raise HTTPException(status_code=404, detail="Room not found") |
|
|
| |
| status = room.get("status", "active") |
| if status != "expired" and db.is_room_expired(room): |
| await db.expire_room(room_id) |
| status = "expired" |
|
|
| |
| student_red_flags = room.get("student_red_flags", {}) |
| blocked_students = [sid for sid, flags in student_red_flags.items() if flags > MAX_VIOLATIONS_ALLOWED] |
|
|
| return { |
| "room_id": room["room_id"], |
| "room_code": room["room_code"], |
| "room_name": room["room_name"], |
| "teacher_name": room["teacher_name"], |
| "students": room.get("students", []), |
| "student_names": room.get("student_names", {}), |
| "questions": room.get("questions", []), |
| "language": room.get("language"), |
| "duration_minutes": room.get("duration_minutes"), |
| "start_time": room.get("start_time"), |
| "end_time": room.get("end_time"), |
| "student_red_flags": student_red_flags, |
| "blocked_students": blocked_students, |
| "status": status |
| } |
|
|
|
|
| @app.post("/api/rooms/{room_code}/join") |
| async def join_room(room_code: str, data: dict): |
| """Join an existing room. BLOCKED if room is expired.""" |
| result = await db.join_room(room_code, data.get("student_name")) |
| if not result: |
| raise HTTPException(status_code=404, detail="Room not found") |
|
|
| |
| if "error" in result and result["error"] == "expired": |
| raise HTTPException(status_code=403, detail=result["detail"]) |
|
|
| return result |
|
|
|
|
| |
| |
| |
|
|
| @app.post("/api/rooms/{room_id}/questions") |
| async def create_question(room_id: str, data: dict): |
| """Create a question in a room, with optional test cases.""" |
| question = await db.create_question( |
| room_id=room_id, |
| question_text=data.get("question_text"), |
| language=data.get("language", "Python"), |
| test_cases=data.get("test_cases", []) |
| ) |
|
|
| if not question: |
| raise HTTPException(status_code=404, detail="Room not found") |
|
|
| return question |
|
|
|
|
| @app.get("/api/rooms/{room_id}/questions") |
| async def get_questions(room_id: str): |
| """ |
| Get all questions in a room. |
| Hidden test cases are filtered out for student consumption. |
| """ |
| questions = await db.get_questions(room_id) |
| |
| filtered_questions = [] |
| for q in questions: |
| filtered_q = { |
| "question_id": q["question_id"], |
| "question_text": q["question_text"], |
| "language": q.get("language", "Python"), |
| "test_cases": [ |
| tc for tc in q.get("test_cases", []) |
| if not tc.get("is_hidden", False) |
| ], |
| "total_test_cases": len(q.get("test_cases", [])), |
| "hidden_test_cases_count": len([ |
| tc for tc in q.get("test_cases", []) |
| if tc.get("is_hidden", False) |
| ]) |
| } |
| filtered_questions.append(filtered_q) |
| |
| return {"questions": filtered_questions} |
|
|
|
|
| @app.get("/api/rooms/{room_id}/questions/full") |
| async def get_questions_full(room_id: str): |
| """Get all questions including hidden test cases (for teacher use).""" |
| questions = await db.get_questions(room_id) |
| return {"questions": questions} |
|
|
|
|
| |
| |
| |
|
|
| @app.get("/api/worksheets/{room_id}/{student_id}/{question_id}") |
| async def get_worksheet(room_id: str, student_id: str, question_id: str): |
| """Get or create a worksheet""" |
| ws = await db.get_worksheet(room_id, student_id, question_id) |
|
|
| if ws: |
| pass |
| else: |
| room = await db.get_room(room_id) |
| language = "Python" |
| if room: |
| for q in room.get("questions", []): |
| if q["question_id"] == question_id: |
| language = q.get("language", "Python") |
| break |
| |
| ws = await db.create_worksheet(room_id, student_id, question_id, language) |
|
|
| return { |
| "worksheet_id": ws["worksheet_id"], |
| "code": ws["code"], |
| "language": ws["language"], |
| "status": ws["status"], |
| "last_updated": ws["last_updated"] |
| } |
|
|
|
|
| @app.post("/api/worksheets/{worksheet_id}/save") |
| async def save_worksheet(worksheet_id: str, data: dict): |
| """Save worksheet code. BLOCKED if room is expired or student is blocked.""" |
| |
| ws = await db.get_worksheet_by_id(worksheet_id) |
| if not ws: |
| raise HTTPException(status_code=404, detail="Worksheet not found") |
| |
| room = await db.get_room(ws["room_id"]) |
| if room and (room.get("status") == "expired" or db.is_room_expired(room)): |
| if room.get("status") != "expired": |
| await db.expire_room(room["room_id"]) |
| raise HTTPException(status_code=403, detail="Exam has ended. Cannot save code.") |
|
|
| |
| if room: |
| student_id = ws.get("student_id") |
| student_flags = room.get("student_red_flags", {}).get(student_id, 0) |
| if student_flags > MAX_VIOLATIONS_ALLOWED: |
| raise HTTPException(status_code=403, detail="You have been blocked from this exam due to repeated violations. Your score is 0.") |
|
|
| success = await db.save_worksheet(worksheet_id, data.get("code", "")) |
|
|
| if not success: |
| return {"success": True, "message": "Code saved (no changes)"} |
|
|
| return {"success": True, "message": "Code saved"} |
|
|
|
|
| |
| |
| |
|
|
| @app.post("/api/execute") |
| async def execute_code(data: dict): |
| """Execute code (freeform run, no test case checking)""" |
| code = data.get("code") |
| language = data.get("language", "python").lower() |
|
|
| result = code_executor.execute(code, language) |
|
|
| return result |
|
|
|
|
| |
| |
| |
|
|
| @app.post("/api/submit") |
| async def submit_solution(data: dict): |
| """ |
| Submit a solution for judging against all test cases. |
| Returns per-case results AND a score. |
| |
| Score = (passed_cases / total_cases) * MAX_SCORE_PER_QUESTION |
| |
| BLOCKED if room is expired. |
| """ |
| code = data.get("code", "") |
| language = data.get("language", "python").lower() |
| room_id = data.get("room_id") |
| question_id = data.get("question_id") |
| student_id = data.get("student_id") |
|
|
| if not all([code, room_id, question_id, student_id]): |
| raise HTTPException(status_code=400, detail="Missing required fields") |
|
|
| |
| room = await db.get_room(room_id) |
| if not room: |
| raise HTTPException(status_code=404, detail="Room not found") |
| |
| if room.get("status") == "expired" or db.is_room_expired(room): |
| if room.get("status") != "expired": |
| await db.expire_room(room_id) |
| raise HTTPException(status_code=403, detail="Exam has ended. Cannot submit solutions.") |
|
|
| |
| student_flags = room.get("student_red_flags", {}).get(student_id, 0) |
| if student_flags > MAX_VIOLATIONS_ALLOWED: |
| raise HTTPException(status_code=403, detail="You have been blocked from this exam due to repeated violations. Your score is 0.") |
|
|
| |
| question = await db.get_question(room_id, question_id) |
| if not question: |
| raise HTTPException(status_code=404, detail="Question not found") |
|
|
| test_cases = question.get("test_cases", []) |
| if not test_cases: |
| raise HTTPException(status_code=400, detail="No test cases defined for this question") |
|
|
| |
| results = [] |
| passed_count = 0 |
| has_error = False |
|
|
| for i, tc in enumerate(test_cases): |
| tc_result = code_executor.execute_with_test_case( |
| code=code, |
| language=language, |
| input_data=tc.get("input_data", ""), |
| expected_output=tc.get("expected_output", "") |
| ) |
|
|
| is_hidden = tc.get("is_hidden", False) |
|
|
| case_result = { |
| "case_number": i + 1, |
| "is_hidden": is_hidden, |
| "passed": tc_result["passed"], |
| "status": tc_result["status"], |
| } |
|
|
| |
| if not is_hidden: |
| case_result["input_data"] = tc.get("input_data", "") |
| case_result["expected_output"] = tc_result["expected_output"] |
| case_result["actual_output"] = tc_result["actual_output"] |
|
|
| if tc_result.get("error"): |
| case_result["error"] = tc_result["error"] if not is_hidden else "Runtime Error" |
| has_error = True |
|
|
| if tc_result["passed"]: |
| passed_count += 1 |
|
|
| results.append(case_result) |
|
|
| |
| total = len(test_cases) |
| if passed_count == total: |
| overall = "Accepted" |
| elif has_error: |
| overall = "Runtime Error" |
| else: |
| overall = "Wrong Answer" |
|
|
| |
| score = (passed_count / total) * MAX_SCORE_PER_QUESTION if total > 0 else 0 |
|
|
| submission_result = { |
| "question_id": question_id, |
| "total_cases": total, |
| "passed_cases": passed_count, |
| "results": results, |
| "overall": overall, |
| "score": round(score, 1), |
| "max_score": MAX_SCORE_PER_QUESTION, |
| "submitted_at": datetime.now().isoformat() |
| } |
|
|
| |
| try: |
| ws = await db.get_worksheet(room_id, student_id, question_id) |
| if ws: |
| await db.save_worksheet(ws["worksheet_id"], code) |
| await db.save_submission_result(ws["worksheet_id"], submission_result) |
| except Exception as e: |
| print(f"Warning: Could not save submission result: {e}") |
|
|
| return submission_result |
|
|
|
|
| |
| |
| |
|
|
| @app.get("/api/rooms/{room_id}/scores") |
| async def get_room_scores(room_id: str): |
| """ |
| Get aggregated scores for all students in a room. |
| Returns per-student, per-question scores based on their best submission. |
| """ |
| room = await db.get_room(room_id) |
| if not room: |
| raise HTTPException(status_code=404, detail="Room not found") |
|
|
| questions = room.get("questions", []) |
| students = room.get("students", []) |
| student_names = room.get("student_names", {}) |
| student_red_flags = room.get("student_red_flags", {}) |
|
|
| |
| all_worksheets = await db.get_all_worksheets_for_room(room_id) |
|
|
| |
| ws_lookup = {} |
| for ws in all_worksheets: |
| key = (ws["student_id"], ws["question_id"]) |
| ws_lookup[key] = ws |
|
|
| scores_data = [] |
| for sid in students: |
| is_blocked = student_red_flags.get(sid, 0) > MAX_VIOLATIONS_ALLOWED |
|
|
| student_entry = { |
| "student_id": sid, |
| "student_name": student_names.get(sid, "Unknown"), |
| "red_flags": student_red_flags.get(sid, 0), |
| "blocked": is_blocked, |
| "questions": [], |
| "total_score": 0, |
| "max_total": len(questions) * MAX_SCORE_PER_QUESTION |
| } |
|
|
| for q in questions: |
| qid = q["question_id"] |
| ws = ws_lookup.get((sid, qid)) |
|
|
| best_score = 0 |
| status = "not_attempted" |
|
|
| if ws and ws.get("submission_results"): |
| |
| for sub in ws["submission_results"]: |
| sub_score = sub.get("score", 0) |
| if sub_score > best_score: |
| best_score = sub_score |
| if sub.get("overall") == "Accepted": |
| status = "accepted" |
| elif status != "accepted": |
| status = "attempted" |
|
|
| |
| if is_blocked: |
| best_score = 0 |
| status = "blocked" |
|
|
| student_entry["questions"].append({ |
| "question_id": qid, |
| "score": best_score, |
| "max_score": MAX_SCORE_PER_QUESTION, |
| "status": status |
| }) |
| student_entry["total_score"] += best_score |
|
|
| scores_data.append(student_entry) |
|
|
| |
| scores_data.sort(key=lambda x: x["total_score"], reverse=True) |
|
|
| return { |
| "room_id": room_id, |
| "room_name": room.get("room_name"), |
| "max_score_per_question": MAX_SCORE_PER_QUESTION, |
| "total_questions": len(questions), |
| "total_students": len(students), |
| "scores": scores_data |
| } |
|
|
|
|
| |
| |
| |
|
|
| @app.get("/api/rooms/{room_id}/report") |
| async def generate_report(room_id: str): |
| """ |
| Generate and download a PDF exam report with scores and violations. |
| Uses ReportLab library. |
| """ |
| room = await db.get_room(room_id) |
| if not room: |
| raise HTTPException(status_code=404, detail="Room not found") |
|
|
| questions = room.get("questions", []) |
| students = room.get("students", []) |
| student_names = room.get("student_names", {}) |
| student_red_flags = room.get("student_red_flags", {}) |
|
|
| |
| all_worksheets = await db.get_all_worksheets_for_room(room_id) |
|
|
| ws_lookup = {} |
| for ws in all_worksheets: |
| key = (ws["student_id"], ws["question_id"]) |
| ws_lookup[key] = ws |
|
|
| |
| scores_data = [] |
| for sid in students: |
| student_entry = { |
| "student_id": sid, |
| "student_name": student_names.get(sid, "Unknown"), |
| "red_flags": student_red_flags.get(sid, 0), |
| "questions": [], |
| "total_score": 0, |
| "max_total": len(questions) * MAX_SCORE_PER_QUESTION |
| } |
|
|
| for q in questions: |
| qid = q["question_id"] |
| ws = ws_lookup.get((sid, qid)) |
|
|
| best_score = 0 |
| status = "not_attempted" |
|
|
| if ws and ws.get("submission_results"): |
| for sub in ws["submission_results"]: |
| sub_score = sub.get("score", 0) |
| if sub_score > best_score: |
| best_score = sub_score |
| if sub.get("overall") == "Accepted": |
| status = "accepted" |
| elif status != "accepted": |
| status = "attempted" |
|
|
| student_entry["questions"].append({ |
| "question_id": qid, |
| "score": best_score, |
| "max_score": MAX_SCORE_PER_QUESTION, |
| "status": status |
| }) |
| student_entry["total_score"] += best_score |
|
|
| scores_data.append(student_entry) |
|
|
| |
| scores_data.sort(key=lambda x: x["total_score"], reverse=True) |
|
|
| |
| pdf_bytes = generate_exam_report( |
| room_data=room, |
| scores_data=scores_data, |
| questions=questions |
| ) |
|
|
| |
| filename = f"exam_report_{room.get('room_name', 'report').replace(' ', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf" |
| |
| return Response( |
| content=pdf_bytes, |
| media_type="application/pdf", |
| headers={ |
| "Content-Disposition": f"attachment; filename={filename}" |
| } |
| ) |
|
|
|
|
| |
| |
| |
|
|
| @app.get("/api/rooms/{room_id}/student-codes") |
| async def get_student_codes(room_id: str): |
| """Get all student codes in a room with student NAMES""" |
| room = await db.get_room(room_id) |
| if not room: |
| raise HTTPException(status_code=404, detail="Room not found") |
|
|
| student_codes = await db.get_student_codes_with_names(room_id) |
|
|
| return { |
| "room_id": room_id, |
| "student_codes": student_codes, |
| "timestamp": datetime.now().isoformat() |
| } |
|
|
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=8000) |