Spaces:
Sleeping
Sleeping
| # essay_assessor_gradio.py | |
| """ | |
| Essay Assessment Integrating Large Language Model (Gradio app) | |
| Requirements (install with pip): | |
| pip install gradio pymupdf groq python-docx python-pptx beautifulsoup4 pandas | |
| Environment: | |
| export GROQ_API_KEY="your_groq_api_key_here" | |
| Run: | |
| python essay_assessor_gradio.py | |
| """ | |
| import os | |
| import json | |
| import csv | |
| import io | |
| from typing import List, Dict, Any, Tuple | |
| from datetime import datetime | |
| # extra imports (add these near the top of your file) | |
| from docx import Document as DocxDocument # python-docx | |
| from pptx import Presentation as PptxPresentation # python-pptx | |
| from bs4 import BeautifulSoup | |
| import re | |
| import string | |
| import fitz # PyMuPDF for PDF text extraction | |
| import gradio as gr | |
| from groq import Groq | |
| from docx import Document | |
| import pandas as pd | |
| # --- Config --- | |
# Groq model identifier sent with every chat-completion request.
GROQ_MODEL = "meta-llama/llama-4-maverick-17b-128e-instruct"

# The API key is taken from the GROQ_API_KEY environment variable rather than
# being embedded in the source: a key committed to a repository or a public
# Space is readable by anyone and must be treated as compromised.
# NOTE(review): the key that used to be hard-coded on this line should be
# revoked in the Groq console and replaced with a fresh one.
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
| # --------- LLM helpers ---------- | |
def call_groq(system_prompt: str, user_prompt: str, temperature: float = 0.7, max_tokens: int = 2048) -> str:
    """
    Send one system + user exchange to Groq and return the reply as a string.

    The completion is requested in streaming mode; the content of every
    streamed delta is collected and the pieces are concatenated.
    """
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    stream = client.chat.completions.create(
        model=GROQ_MODEL,
        messages=conversation,
        temperature=temperature,
        max_completion_tokens=max_tokens,
        top_p=1,
        stream=True,
        stop=None,
    )
    # A delta whose content is None contributes an empty piece.
    return "".join(event.choices[0].delta.content or "" for event in stream)
| # ---------- PDF / file utils ---------- | |
def extract_text_from_pdf_bytes(pdf_bytes: bytes, max_pages: int = 200) -> str:
    """
    Extract plain text from an in-memory PDF with PyMuPDF.

    At most ``max_pages`` pages are read. Every line is stripped and blank
    lines are dropped. Any failure while opening or reading the document
    yields an empty string.
    """
    try:
        with fitz.open(stream=pdf_bytes, filetype="pdf") as pdf:
            limit = min(len(pdf), max_pages)
            pages = [pdf.load_page(page_no).get_text("text") for page_no in range(limit)]
    except Exception:
        # Best effort: an unreadable PDF is reported as "no text".
        return ""
    stripped_lines = (line.strip() for line in "\n".join(pages).splitlines())
    return "\n".join(line for line in stripped_lines if line)
def read_text_file(filepath: str) -> str:
    """
    Read a file as text, tolerating invalid UTF-8.

    The file is first read as strict UTF-8 text. If that fails for any
    reason, the raw bytes are read and decoded as UTF-8 with undecodable
    bytes dropped. If the file cannot be read at all, "" is returned.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as handle:
            content = handle.read()
    except Exception:
        pass
    else:
        return content

    # Fallback: binary read + lenient decode.
    try:
        with open(filepath, "rb") as handle:
            payload = handle.read()
    except Exception:
        return ""
    return payload.decode("utf-8", errors="ignore")
def sanitize_text_for_preview(text: str, max_len: int = 4000) -> str:
    """
    Make extracted text safe to show in a preview box.

    Printable characters plus newline, carriage return and tab are kept;
    every other character (NUL bytes included) becomes a single space so
    that neighbouring words do not merge. Runs of two or more spaces/tabs
    are collapsed to one space and the result is cut to ``max_len``
    characters. Falsy input yields "".
    """
    if not text:
        return ""
    keep_as_is = "\n\r\t"
    # NUL is non-printable, so it is replaced by a space here like any other
    # control character.
    scrubbed = "".join(
        ch if (ch.isprintable() or ch in keep_as_is) else " "
        for ch in text
    )
    collapsed = re.sub(r"[ \t]{2,}", " ", scrubbed)
    return collapsed[:max_len]
def read_text_file(path: str) -> str:
    """
    Read a text file (txt, md) robustly.

    Tries strict UTF-8 text mode first; on any failure falls back to reading
    the raw bytes and decoding them as UTF-8 with invalid bytes ignored.
    Returns "" when the file cannot be read at all.
    """
    # NOTE: this module defines `read_text_file` twice; being the later
    # definition, this is the one in effect at runtime.
    try:
        with open(path, "r", encoding="utf-8") as text_handle:
            return text_handle.read()
    except Exception:
        pass
    try:
        with open(path, "rb") as binary_handle:
            return binary_handle.read().decode("utf-8", errors="ignore")
    except Exception:
        return ""
def extract_text_from_docx(path: str) -> str:
    """
    Extract text from a .docx file.

    Non-empty paragraph texts come first, followed by the non-empty texts of
    every table cell; all pieces are stripped and joined with newlines.
    Returns "" if the document cannot be opened or read.
    """
    try:
        document = DocxDocument(path)
        pieces = [
            para.text.strip()
            for para in document.paragraphs
            if para.text and para.text.strip()
        ]
        # Table content is appended after the body paragraphs.
        pieces.extend(
            cell.text.strip()
            for table in document.tables
            for row in table.rows
            for cell in row.cells
            if cell.text and cell.text.strip()
        )
        return "\n".join(pieces)
    except Exception:
        return ""
def extract_text_from_pptx(path: str) -> str:
    """
    Extract text from the shapes on every slide of a .pptx file.

    Shapes whose ``text`` cannot be read (e.g. pictures) are skipped, as are
    shapes with empty or whitespace-only text. Returns "" if the
    presentation cannot be opened or read.
    """
    try:
        deck = PptxPresentation(path)
        collected = []
        for slide in deck.slides:
            for shape in slide.shapes:
                try:
                    shape_text = shape.text
                except Exception:
                    # Not a text-bearing shape.
                    continue
                stripped = shape_text.strip() if shape_text else ""
                if stripped:
                    collected.append(stripped)
        return "\n".join(collected)
    except Exception:
        return ""
def extract_text_from_html(path: str) -> str:
    """
    Return the visible text of an HTML file, one text node per line.

    The file is read as UTF-8 with undecodable bytes ignored and parsed with
    BeautifulSoup's "html.parser". Returns "" on any failure.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as source:
            markup = source.read()
        return BeautifulSoup(markup, "html.parser").get_text(separator="\n")
    except Exception:
        return ""
def extract_text_from_path(path: str) -> str:
    """
    Extract plain text from an uploaded file, choosing the reader by extension.

    Handles .pdf, .docx, .pptx, .html/.htm and .txt/.md/.csv. For any other
    extension the file is tried as DOCX, then as PPTX, and finally read as
    plain text. Returns "" for a falsy path or on any failure.
    """
    if not path:
        return ""
    path = str(path)
    suffix = os.path.splitext(path.lower())[1]
    try:
        if suffix == ".pdf":
            with open(path, "rb") as stream:
                pdf_bytes = stream.read()
            return extract_text_from_pdf_bytes(pdf_bytes)
        if suffix == ".docx":
            return extract_text_from_docx(path)
        if suffix == ".pptx":
            return extract_text_from_pptx(path)
        if suffix in (".html", ".htm"):
            return extract_text_from_html(path)
        if suffix in (".txt", ".md", ".csv"):
            return read_text_file(path)

        # Unknown extension: uploads may be mislabelled, so probe the office
        # formats before falling back to a plain-text read.
        guessed = extract_text_from_docx(path)
        if not guessed:
            guessed = extract_text_from_pptx(path)
        if guessed:
            return guessed
        return read_text_file(path)
    except Exception:
        return ""
| # ---------- Question generation ---------- | |
# System message used for question generation (passed to call_groq by
# generate_questions). It asks the model for a JSON array of essay /
# short-answer question objects and nothing else.
SYSTEM_PROMPT_GEN = """
You are an experienced educational content creator. Given course material and a marking guide,
produce a structured list of essay and short-answer questions (no MCQs). Output ONLY valid JSON.
The output MUST be a JSON array of question objects. Each question object must include:
- id: integer
- type: "essay" or "short_answer"
- question: string (the prompt shown to students)
- rubric: string (brief marking guide / key points to look for)
- max_score: integer (e.g., 10 for essay; 3-5 for short-answer)
You MUST ensure questions are faithful to the provided course material and that rubrics reference key points.
Do NOT include any additional commentary or text outside the JSON array.
"""
def build_gen_prompt(course_text: str, marking_guide: str, n_essay: int, n_short: int) -> str:
    """
    Build the user prompt for question generation.

    Embeds the first 20,000 characters of ``course_text`` and the first
    5,000 characters of ``marking_guide`` (each wrapped in triple quotes),
    followed by the requested number of essay and short-answer questions.
    """
    return f"""
Course material:
\"\"\"{course_text[:20000]}\"\"\"
Marking guide (instructor-provided). Use this to shape rubrics:
\"\"\"{marking_guide[:5000]}\"\"\"
Generate {n_essay} essay question(s) and {n_short} short-answer question(s).
Return a JSON array of question objects (see schema in system message).
"""
def parse_json_from_model_output(raw: str) -> Any:
    """
    Parse the JSON payload (normally an array) out of a model reply.

    Attempts, in order:
      1. strip surrounding code fences and an optional leading "json" tag,
         then parse the text between the first "[" and the last "]" (or the
         whole text when no such pair exists);
      2. decode a single JSON value starting at the first "[" and ignore
         whatever follows it - this rescues replies where extra text
         containing "]" comes after the array;
      3. parse the untouched ``raw`` string.

    Returns the parsed value, or None when nothing can be parsed or ``raw``
    is not a string.
    """
    if not isinstance(raw, str):
        return None

    s = raw.strip()
    # strip fences if present
    if s.startswith("```"):
        s = s.strip("`")
        if s.startswith("json"):
            s = s[len("json"):].strip()

    start = s.find("[")
    end = s.rfind("]")
    candidate = s[start:end + 1] if start != -1 and end > start else s
    try:
        return json.loads(candidate)
    except (ValueError, RecursionError):
        pass

    if start != -1:
        # raw_decode stops at the end of the first complete value, so trailing
        # commentary after the array no longer breaks parsing.
        try:
            value, _ = json.JSONDecoder().raw_decode(s, start)
            return value
        except (ValueError, RecursionError):
            pass

    try:
        return json.loads(raw)
    except (ValueError, RecursionError):
        return None
def generate_questions(course_text: str, marking_guide: str, n_essay: int, n_short: int, temperature: float = 0.6) -> Tuple[List[Dict[str, Any]], str]:
    """
    Ask the LLM for questions and normalise its reply.

    Returns ``(questions, raw)`` where ``raw`` is the unmodified model output
    (kept for debugging) and ``questions`` is a list of dicts with the keys
    ``id``, ``type`` ("essay" or "short_answer"), ``question``, ``rubric``
    and ``max_score``. Ids are reassigned sequentially from 1. Items that are
    not dicts or whose type is not recognised are skipped. When the reply
    cannot be parsed into a list, ``questions`` is empty.
    """
    prompt = build_gen_prompt(course_text, marking_guide, n_essay, n_short)
    raw = call_groq(SYSTEM_PROMPT_GEN, prompt, temperature=temperature, max_tokens=2048)
    parsed = parse_json_from_model_output(raw)

    questions: List[Dict[str, Any]] = []
    if not isinstance(parsed, list):
        return questions, raw

    for item in parsed:
        if not isinstance(item, dict):
            continue
        # The model may emit null or a non-string here; coerce instead of
        # letting .lower() raise and abort the whole generation.
        qtype = str(item.get("type") or "").strip().lower()
        if qtype not in ("essay", "short_answer", "short-answer", "short"):
            continue
        qtype = "essay" if qtype == "essay" else "short_answer"
        question_text = item.get("question", "") or item.get("prompt", "")
        rubric = item.get("rubric", "") or item.get("marking_points", "")
        # A malformed score (null, "ten", ...) falls back to the default of 10
        # rather than discarding every question.
        try:
            max_score = int(item.get("max_score", item.get("max", 10)))
        except (TypeError, ValueError):
            max_score = 10
        questions.append({
            "id": len(questions) + 1,
            "type": qtype,
            "question": question_text,
            "rubric": rubric,
            "max_score": max_score,
        })
    return questions, raw
| # ---------- Grading ---------- | |
# System message used for grading (passed to call_groq by
# grade_answer_with_llm). It asks the model for a single JSON object with
# `score`, `feedback` and an optional `annotated_answer`.
SYSTEM_PROMPT_GRADE = """
You are a strict but fair grader. You will receive:
- Question text
- The rubric (key points, marking guide)
- Maximum score
- A student's answer
Return ONLY a JSON object with keys:
- score: integer (0 .. max_score)
- feedback: string (concise feedback and suggestions to improve; include inline markup suggestions if useful)
- annotated_answer: optional string (student's answer with short inline edits or markup for improvement)
Be specific and consistent with the rubric. Do not add any explanation outside the JSON.
"""
def build_grade_prompt(question: str, rubric: str, max_score: int, student_answer: str) -> str:
    """
    Build the user prompt for grading one answer.

    The prompt lists the question, the rubric, the maximum score and the
    student's answer (wrapped in triple quotes), then asks for the grade JSON.
    """
    return f"""
Question:
{question}
Rubric / Marking guide:
{rubric}
Max score: {max_score}
Student answer:
\"\"\"{student_answer}\"\"\"
Return the grade JSON as specified.
"""
def parse_json_object(raw: str) -> Dict[str, Any]:
    """
    Parse the grader's JSON object out of a model reply.

    Code fences and an optional leading "json" tag are stripped, then the
    text between the first "{" and the last "}" is parsed. If that fails, a
    single JSON value is decoded starting at the first "{" so that trailing
    text containing "}" does not break parsing.

    Always returns a dict: when nothing parses, or the parsed value is not a
    JSON object (e.g. a bare number or a list), the failure dict
    ``{"score": 0, "feedback": "Failed to parse grader output."}`` is
    returned instead.
    """
    failure = {"score": 0, "feedback": "Failed to parse grader output."}
    if not isinstance(raw, str):
        return failure

    s = raw.strip()
    if s.startswith("```"):
        s = s.strip("`")
        if s.startswith("json"):
            s = s[len("json"):].strip()

    start = s.find("{")
    end = s.rfind("}")
    candidate = s[start:end + 1] if start != -1 and end > start else s

    parsed = None
    try:
        parsed = json.loads(candidate)
    except (ValueError, RecursionError):
        if start != -1:
            # Decode only the first complete value; ignore what follows it.
            try:
                parsed, _ = json.JSONDecoder().raw_decode(s, start)
            except (ValueError, RecursionError):
                parsed = None

    # Callers index the result like a dict, so never hand back anything else.
    return parsed if isinstance(parsed, dict) else failure
def grade_answer_with_llm(question: str, rubric: str, max_score: int, student_answer: str, temperature: float=0.2) -> Dict[str, Any]:
    """
    Grade one student answer with the LLM.

    Returns the parsed grader dict with two guarantees:
      - ``score`` is an int clamped to ``0 .. max_score`` (never below 0);
        numeric strings such as "7.5" are truncated to an int, anything
        unparseable becomes 0;
      - ``feedback`` is always a string ("" when missing or null).
    Other keys returned by the model (e.g. ``annotated_answer``) are passed
    through unchanged.
    """
    prompt = build_grade_prompt(question, rubric, max_score, student_answer)
    raw = call_groq(SYSTEM_PROMPT_GRADE, prompt, temperature=temperature, max_tokens=1024)
    parsed = parse_json_object(raw)
    if not isinstance(parsed, dict):
        # Defensive: a non-object reply cannot carry a score or feedback.
        parsed = {"score": 0, "feedback": "Failed to parse grader output."}

    # Go through float() so "7.5" or 7.5 yields 7 instead of being zeroed.
    try:
        score = int(float(parsed.get("score", 0)))
    except (TypeError, ValueError, OverflowError):
        score = 0
    parsed["score"] = max(0, min(score, max_score))

    # Downstream code slices and prints the feedback, so it must be a string.
    feedback = parsed.get("feedback")
    parsed["feedback"] = "" if feedback is None else str(feedback)
    return parsed
| # ---------- Export helpers ---------- | |
def export_results_csv(questions: List[Dict[str,Any]], student_answers: List[Dict[str,Any]], results: List[Dict[str,Any]]) -> str:
    """
    Write graded results to a timestamped CSV in the working directory.

    Each result is matched to its question by ``id`` to fill in the type and
    question text; ``max_score`` comes from the result's ``max`` and falls
    back to the question's ``max_score``. ``student_answers`` is accepted
    but not used. Returns the name of the file written.
    """
    columns = ["id","type","question","student_answer","score","max_score","feedback"]
    by_id = {question["id"]: question for question in questions}

    records = []
    for graded in results:
        question_id = graded.get("id")
        source = by_id.get(question_id, {})
        records.append({
            "id": question_id,
            "type": source.get("type",""),
            "question": source.get("question",""),
            "student_answer": graded.get("student_answer",""),
            "score": graded.get("score", 0),
            "max_score": graded.get("max", source.get("max_score", None)),
            "feedback": graded.get("feedback",""),
        })

    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"essay_assessment_{stamp}.csv"
    with open(filename, "w", encoding="utf-8", newline="") as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        writer.writerows(records)
    return filename
def export_results_docx(questions: List[Dict[str,Any]], results: List[Dict[str,Any]]) -> str:
    """
    Write graded results to a timestamped .docx in the working directory.

    The report starts with a title, the generation time and the overall
    score, followed by one section per result (question, student answer,
    optional annotated answer, feedback). ``questions`` is accepted but not
    used; all text comes from ``results``. Returns the file name written.
    """
    report = Document()
    report.add_heading("Essay Assessment Results", level=1)
    report.add_paragraph(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    earned = sum(entry.get("score",0) for entry in results)
    possible = sum(entry.get("max",0) for entry in results)
    report.add_paragraph(f"Total score: {earned} / {possible}")

    for entry in results:
        report.add_heading(f"Q{entry.get('id')} ({entry.get('score',0)}/{entry.get('max',0)})", level=2)
        report.add_paragraph("Question:")
        report.add_paragraph(entry.get("question",""))
        report.add_paragraph("Student answer:")
        report.add_paragraph(entry.get("student_answer",""))
        suggested = entry.get("annotated_answer","")
        if suggested:
            report.add_paragraph("Annotated answer (suggested edits):")
            report.add_paragraph(suggested)
        report.add_paragraph("Feedback:")
        report.add_paragraph(entry.get("feedback",""))
        report.add_paragraph("-----")

    filename = f"essay_assessment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
    report.save(filename)
    return filename
| # ---------- Gradio app ---------- | |
with gr.Blocks(title="Essay Assessment with LLM") as demo:
    gr.Markdown("# Essay Assessment Integrating Large Language Model")
    gr.Markdown(
        "Lecturers upload course material + marking guide to auto-create essay & short-answer questions. "
        "Students answer questions one-by-one. The AI grades answers against the marking guide and returns scores, feedback and annotated suggestions."
    )

    # ----- States (values handed from one tab's callbacks to another's) -----
    state_course_text = gr.State("")
    state_marking_text = gr.State("")
    state_questions = gr.State([])  # list of question dicts
    state_student_answers = gr.State([])  # list of {"id", "student_answer", "timestamp"}
    state_current_idx = gr.State(0)
    state_results = gr.State([])  # list of graded results per question

    with gr.Tabs():
        # ---------------- Lecturer tab ----------------
        with gr.Tab("1) Lecturer — Upload & Generate"):
            # NOTE(review): which widgets share a row is assumed here; confirm
            # against the intended layout.
            with gr.Row():
                course_file = gr.File(label="Course Material (PDF or TXT)", type="filepath")
                marking_file = gr.File(label="Marking Guide (PDF or TXT, optional)", type="filepath")
            manual_marking = gr.Textbox(label="Or paste Marking Guide (optional)", lines=6, placeholder="Paste marking guide or rubric here (optional)")
            with gr.Row():
                n_essay = gr.Number(label="Number of essay questions", value=2, precision=0)
                n_short = gr.Number(label="Number of short-answer questions", value=2, precision=0)
                temp_slider = gr.Slider(0.0, 1.0, value=0.5, step=0.1, label="LLM creativity (temperature)")
            generate_btn = gr.Button("Generate Questions (LLM)")
            course_preview = gr.Textbox(label="Course preview (first 4000 chars)", lines=10)
            marking_preview = gr.Textbox(label="Marking guide preview (first 2000 chars)", lines=6)
            questions_json = gr.JSON(label="Generated Questions (JSON)")
            gen_debug = gr.Textbox(label="Raw model output (debug)", visible=False)

            def do_generate(course_path, marking_path, manual_marking_text, ne, ns, temp):
                """
                Extract the uploaded texts, ask the LLM for questions and
                return the seven values wired to ``generate_btn.click``:
                course preview, marking preview, questions JSON, full course
                text, full marking text, raw model output, questions list.
                """
                course_text = (extract_text_from_path(course_path) or "") if course_path else ""
                marking_text = (extract_text_from_path(marking_path) or "") if marking_path else ""
                # The pasted guide is only used when no guide text came from a file.
                if not marking_text and manual_marking_text:
                    marking_text = manual_marking_text

                # Nothing to work with: reset every output to an empty value of
                # the right type ({} for the JSON view, [] for the state list).
                if not course_text.strip():
                    return (
                        gr.update(value=""),
                        gr.update(value=""),
                        {},
                        "",
                        "",
                        "",
                        [],
                    )

                # Previews are sanitized so binary residue is never displayed.
                course_preview_text = sanitize_text_for_preview(course_text, max_len=4000)
                marking_preview_text = sanitize_text_for_preview(marking_text, max_len=2000)

                try:
                    questions, raw = generate_questions(
                        course_text,
                        marking_text,
                        n_essay=int(ne or 0),
                        n_short=int(ns or 0),
                        # `temp or 0.6` would turn a slider value of 0.0 into
                        # 0.6; only a missing value falls back to the default.
                        temperature=0.6 if temp is None else float(temp),
                    )
                except Exception as e:
                    return (
                        gr.update(value=course_preview_text),
                        gr.update(value=marking_preview_text),
                        {},
                        course_text,
                        marking_text,
                        f"Generation failed: {e}",
                        [],
                    )

                questions_for_state = questions if isinstance(questions, list) else []
                return (
                    gr.update(value=course_preview_text),
                    gr.update(value=marking_preview_text),
                    questions if questions else {},
                    course_text,
                    marking_text,
                    raw if raw is not None else "",
                    questions_for_state,
                )

            generate_btn.click(
                do_generate,
                inputs=[course_file, marking_file, manual_marking, n_essay, n_short, temp_slider],
                outputs=[
                    course_preview,
                    marking_preview,
                    questions_json,
                    state_course_text,
                    state_marking_text,
                    gen_debug,
                    state_questions,
                ],
            )

        # ---------------- Student tab ----------------
        with gr.Tab("2) Student — Take Test"):
            gr.Markdown("Students answer questions one at a time. Click **Start Test** to begin. Use **Submit Answer** to save each answer; when finished click **Finish & Grade** to have the AI assess everything.")
            start_btn = gr.Button("Start Test")
            progress_md = gr.Markdown("", visible=True)
            q_text = gr.Textbox(label="Question", interactive=False, lines=6)
            answer_tb = gr.Textbox(label="Your answer", lines=8, placeholder="Type your essay or short answer here...")
            submit_btn = gr.Button("Submit Answer (Save & Next)")
            finish_btn = gr.Button("Finish & Grade (AI)")
            student_status = gr.Textbox(label="Status", interactive=False, visible=False)

            def start_test(questions):
                """Reset the index and saved answers and show the first question."""
                qs = questions or []
                if not qs:
                    return (
                        gr.update(value="No questions generated yet. Please ask your lecturer to create questions."),
                        gr.update(value=""),
                        [],
                        0,
                        [],
                        gr.update(value="No questions.", visible=True),
                    )
                first = qs[0]
                return (
                    gr.update(value=f"Question 1 of {len(qs)}"),
                    gr.update(value=first.get("question", "")),
                    "",
                    0,
                    [],
                    gr.update(value="Test started.", visible=True),
                )

            start_btn.click(
                start_test,
                inputs=[state_questions],
                outputs=[progress_md, q_text, answer_tb, state_current_idx, state_student_answers, student_status],
            )

            def submit_answer(answer, questions, idx, student_answers):
                """Save the answer for the current question and advance to the next one."""
                qs = questions or []
                if not qs:
                    return gr.update(value="No questions."), gr.update(value=""), "", idx, student_answers, gr.update(value="No questions.", visible=True)

                # Work on a copy so the list held in state is not mutated in place.
                student_answers = list(student_answers or [])
                try:
                    q = qs[idx]
                except IndexError:
                    # Reached when Submit is clicked after the last question.
                    return gr.update(value=""), gr.update(value=""), "", idx, student_answers, gr.update(value="Index out of range.", visible=True)

                student_answers.append({
                    "id": q.get("id"),
                    "type": q.get("type"),
                    "question": q.get("question", ""),
                    "student_answer": answer or "",
                    "timestamp": datetime.now().isoformat(),
                })

                idx_next = idx + 1
                total = len(qs)
                if idx_next >= total:
                    # All questions answered; grading is a separate step.
                    return (
                        gr.update(value=f"Completed all {total} questions. Click 'Finish & Grade' to assess."),
                        gr.update(value=""),
                        "",
                        idx_next,
                        student_answers,
                        gr.update(value="Answers saved. Ready to grade.", visible=True),
                    )

                qnext = qs[idx_next]
                return (
                    gr.update(value=f"Question {idx_next+1} of {total}"),
                    gr.update(value=qnext.get("question", "")),
                    "",
                    idx_next,
                    student_answers,
                    gr.update(value="Answer saved.", visible=True),
                )

            submit_btn.click(
                submit_answer,
                inputs=[answer_tb, state_questions, state_current_idx, state_student_answers],
                outputs=[progress_md, q_text, answer_tb, state_current_idx, state_student_answers, student_status],
            )

            def finish_and_grade(questions, student_answers):
                """
                Grade every question with the LLM.

                Returns the graded list (for display), a JSON summary string
                and the graded list again (stored in ``state_results``).
                Questions without a saved answer are graded with "".
                """
                qs = questions or []
                answers = student_answers or []
                if not qs:
                    return {}, "No questions to grade.", []
                if not answers:
                    return {}, "No answers submitted.", []

                # Later submissions for the same id overwrite earlier ones.
                ans_map = {a["id"]: a for a in answers}
                graded = []
                total_score = 0
                max_total = 0
                for q in qs:
                    qid = q.get("id")
                    qtext = q.get("question", "")
                    max_score = int(q.get("max_score", 10))
                    student_text = ans_map.get(qid, {}).get("student_answer", "")
                    grade_obj = grade_answer_with_llm(qtext, q.get("rubric", ""), max_score, student_text)
                    graded_item = {
                        "id": qid,
                        "type": q.get("type"),
                        "question": qtext,
                        "student_answer": student_text,
                        "score": grade_obj.get("score", 0),
                        "max": max_score,
                        "feedback": grade_obj.get("feedback", ""),
                        "annotated_answer": grade_obj.get("annotated_answer", ""),
                    }
                    graded.append(graded_item)
                    total_score += graded_item["score"]
                    max_total += max_score

                # max(1, ...) avoids dividing by zero when every max score is 0.
                percentage = round(100.0 * total_score / max(1, max_total), 2)
                summary = {
                    "total_score": total_score,
                    "max_total": max_total,
                    "percentage": percentage,
                    "graded_count": len(graded),
                }
                return graded, json.dumps({"summary": summary, "details_count": len(graded)}), graded

            # Created here so they render below the status box in this tab.
            graded_output = gr.JSON(label="Graded results (JSON)")
            summary_output = gr.Textbox(label="Summary (JSON string)")
            finish_btn.click(
                finish_and_grade,
                inputs=[state_questions, state_student_answers],
                outputs=[graded_output, summary_output, state_results],
            )

        # ---------------- Assessment tab ----------------
        with gr.Tab("3) Assessment — Review Feedback"):
            gr.Markdown("View per-question AI assessment, feedback, annotated answer suggestions and plain text summary.")
            graded_json = gr.JSON(label="Graded results (list)")
            summary_text = gr.Textbox(label="Plaintext summary", lines=6, interactive=False)
            per_question_feedback = gr.Dataframe(headers=["id","type","score","max","feedback"], row_count=(1, "dynamic"))
            refresh_btn = gr.Button("Load latest assessment")

            def load_assessment(results):
                """Return the graded list, a plain-text summary and a per-question table."""
                columns = ["id","type","score","max","feedback"]
                if not results:
                    return {}, "", pd.DataFrame([], columns=columns)

                total = sum(r.get("score", 0) for r in results)
                max_total = sum(r.get("max", 0) for r in results)
                pct = round(100.0 * total / max(1, max_total), 2)
                lines = [f"Total: {total} / {max_total} ({pct}%)", ""]
                rows = []
                for r in results:
                    # Feedback may be missing or null; never slice None.
                    feedback = r.get("feedback")
                    feedback = "" if feedback is None else str(feedback)
                    lines.append(f"Q{r.get('id')}: {r.get('score')} / {r.get('max')} — {feedback[:200]}")
                    rows.append([r.get("id"), r.get("type"), r.get("score"), r.get("max"), feedback])
                return results, "\n".join(lines), pd.DataFrame(rows, columns=columns)

            refresh_btn.click(load_assessment, inputs=[state_results], outputs=[graded_json, summary_text, per_question_feedback])

        # ---------------- Export tab ----------------
        with gr.Tab("4) Export"):
            gr.Markdown("Export graded assessment (CSV / DOCX).")
            export_csv_btn = gr.Button("Export results CSV")
            export_docx_btn = gr.Button("Export results DOCX")
            exported_csv = gr.File(label="Downloaded CSV")
            exported_docx = gr.File(label="Downloaded DOCX")

            def do_export_csv(questions, results):
                """Write the CSV and return its path, or None when nothing is graded yet."""
                if not results:
                    # None clears the File output; "" is not a valid file path.
                    return None
                return export_results_csv(questions or [], [], results)

            def do_export_docx(questions, results):
                """Write the DOCX and return its path, or None when nothing is graded yet."""
                if not results:
                    return None
                return export_results_docx(questions or [], results)

            export_csv_btn.click(do_export_csv, inputs=[state_questions, state_results], outputs=[exported_csv])
            export_docx_btn.click(do_export_docx, inputs=[state_questions, state_results], outputs=[exported_docx])

    # Footer/help
    with gr.Accordion("Notes & Setup", open=False):
        gr.Markdown("""
**Setup**
- Install: `pip install gradio pymupdf groq python-docx python-pptx beautifulsoup4 pandas`
- Set `GROQ_API_KEY` in your environment.
- Run: `python essay_assessor_gradio.py`

**How it works**
1. Lecturer uploads course material & marking guide (or pastes the guide). Click *Generate Questions*.
2. Student clicks *Start Test*, answers each question one-by-one, and clicks *Finish & Grade*.
3. AI grades answers against provided marking guide; results appear in Assessment tab.
4. Export CSV/DOCX from Export tab.

**Notes**
- This MVP expects text-based PDFs (not scanned images). For OCR, add `pytesseract` + `pdf2image`.
- The LLM is asked to return strict JSON. If generation/parsing fails, check the debug output on the Lecturer tab.
""")
if __name__ == "__main__":
    # Warn (but still start) when the Groq key is absent or empty.
    api_key_present = bool(os.getenv("GROQ_API_KEY"))
    if not api_key_present:
        print("WARNING: GROQ_API_KEY not set. Set it before running the app.")
    demo.launch(debug=True)