Spaces:
Running
Running
| # import gradio as gr | |
| # import fitz # PyMuPDF | |
| # import torch | |
| # import os | |
| # import onnxruntime as ort | |
| # # --- IMPORT SESSION OPTIONS --- | |
| # from onnxruntime import SessionOptions, GraphOptimizationLevel | |
| # # --- LANGCHAIN & RAG IMPORTS --- | |
| # from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| # from langchain_community.vectorstores import FAISS | |
| # from langchain_core.embeddings import Embeddings | |
| # # --- ONNX & MODEL IMPORTS --- | |
| # from transformers import AutoTokenizer | |
| # from optimum.onnxruntime import ORTModelForFeatureExtraction, ORTModelForCausalLM | |
| # from huggingface_hub import snapshot_download | |
| # # Force CPU Provider | |
| # PROVIDERS = ["CPUExecutionProvider"] | |
| # print(f"β‘ Running on: {PROVIDERS}") | |
| # # --------------------------------------------------------- | |
| # # 1. OPTIMIZED EMBEDDINGS (BGE-SMALL) | |
| # # --------------------------------------------------------- | |
| # class OnnxBgeEmbeddings(Embeddings): | |
| # def __init__(self): | |
| # model_name = "Xenova/bge-small-en-v1.5" | |
| # print(f"π Loading Embeddings: {model_name}...") | |
| # self.tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| # self.model = ORTModelForFeatureExtraction.from_pretrained( | |
| # model_name, | |
| # export=False, | |
| # provider=PROVIDERS[0] | |
| # ) | |
| # def _process_batch(self, texts): | |
| # inputs = self.tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors="pt") | |
| # with torch.no_grad(): | |
| # outputs = self.model(**inputs) | |
| # embeddings = outputs.last_hidden_state[:, 0] | |
| # embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1) | |
| # return embeddings.numpy().tolist() | |
| # def embed_documents(self, texts): | |
| # return self._process_batch(texts) | |
| # def embed_query(self, text): | |
| # return self._process_batch(["Represent this sentence for searching relevant passages: " + text])[0] | |
| # # --------------------------------------------------------- | |
| # # 2. OPTIMIZED LLM (Qwen 2.5 - 1.5B) - STRICT GRADING | |
| # # --------------------------------------------------------- | |
| # class LLMEvaluator: | |
| # def __init__(self): | |
| # # Qwen 2.5 1.5B needs "Few-Shot" examples to be strict. | |
| # self.repo_id = "onnx-community/Qwen2.5-1.5B-Instruct" | |
| # self.local_dir = "onnx_qwen_local" | |
| # print(f"π Preparing CPU LLM: {self.repo_id}...") | |
| # if not os.path.exists(self.local_dir): | |
| # print(f"π₯ Downloading FP16 model to {self.local_dir}...") | |
| # snapshot_download( | |
| # repo_id=self.repo_id, | |
| # local_dir=self.local_dir, | |
| # allow_patterns=["config.json", "generation_config.json", "tokenizer*", "special_tokens_map.json", "*.jinja", "onnx/model_fp16.onnx*"] | |
| # ) | |
| # print("β Download complete.") | |
| # self.tokenizer = AutoTokenizer.from_pretrained(self.local_dir) | |
| # sess_options = SessionOptions() | |
| # sess_options.graph_optimization_level = GraphOptimizationLevel.ORT_DISABLE_ALL | |
| # self.model = ORTModelForCausalLM.from_pretrained( | |
| # self.local_dir, | |
| # subfolder="onnx", | |
| # file_name="model_fp16.onnx", | |
| # use_cache=True, | |
| # use_io_binding=False, | |
| # provider=PROVIDERS[0], | |
| # session_options=sess_options | |
| # ) | |
| # def evaluate(self, context, question, student_answer, max_marks): | |
| # # --- IMPROVED PROMPT STRATEGY --- | |
| # # 1. Role: We set the persona to a "Strict Logical Validator" not a "Teacher". | |
| # # 2. Few-Shot: We give examples of HALLUCINATIONS getting 0 marks. | |
| # system_prompt = f"""You are a strict Logic Validator. You are NOT a helpful assistant. | |
| # Your job is to check if the Student Answer is FACTUALLY present in the Context. | |
| # GRADING ALGORITHM: | |
| # 1. IF the Student Answer mentions things NOT in the Context -> PENALTY (-50% of the marks). | |
| # 2. IF the Student Answer interprets the text opposite to its meaning -> PENALTY (-100% of the marks). | |
| # 3. IF the Student Answer is generic fluff -> SCORE: 0. | |
| # --- EXAMPLE 1 (HALLUCINATION) --- | |
| # Context: The sky is blue due to Rayleigh scattering. | |
| # Question: Why is the sky blue? | |
| # Student Answer: Because the ocean reflects the water into the sky. | |
| # Analysis: The Context mentions 'Rayleigh scattering'. The student mentions 'ocean reflection'. These are different. The student is hallucinating outside facts. | |
| # Score: 0/{max_marks} | |
| # --- EXAMPLE 2 (CONTRADICTION) --- | |
| # Context: One must efface one's own personality. Good prose is like a windowpane. | |
| # Question: What does the author mean? | |
| # Student Answer: It means we should see the author's personality clearly. | |
| # Analysis: The text says 'efface' (remove) personality. The student says 'see' personality. This is a direct contradiction. | |
| # Score: 0/{max_marks} | |
| # --- EXAMPLE 3 (CORRECT) --- | |
| # Context: Mitochondria is the powerhouse of the cell. | |
| # Question: What is mitochondria? | |
| # Student Answer: It is the cell's powerhouse. | |
| # Analysis: Matches the text meaning exactly. | |
| # Score: {max_marks}/{max_marks} | |
| # """ | |
| # user_prompt = f""" | |
| # --- YOUR TASK --- | |
| # Context: | |
| # {context} | |
| # Question: | |
| # {question} | |
| # Student Answer: | |
| # {student_answer} | |
| # OUTPUT FORMAT: | |
| # Analysis: [Compare Student Answer vs Context. List any hallucinations or contradictions.] | |
| # Score: [X]/{max_marks} | |
| # """ | |
| # messages = [ | |
| # {"role": "system", "content": system_prompt}, | |
| # {"role": "user", "content": user_prompt} | |
| # ] | |
| # input_text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
| # inputs = self.tokenizer(input_text, return_tensors="pt") | |
| # # Lower temperature for strictness | |
| # with torch.no_grad(): | |
| # outputs = self.model.generate( | |
| # **inputs, | |
| # max_new_tokens=150, | |
| # temperature=0.1, # Strict logic, no creativity | |
| # top_p=0.2, # Cut off unlikely tokens | |
| # do_sample=True, | |
| # repetition_penalty=1.2 # Penalize repetition | |
| # ) | |
| # input_length = inputs['input_ids'].shape[1] | |
| # response = self.tokenizer.decode(outputs[0][input_length:], skip_special_tokens=True) | |
| # return response | |
| # # --------------------------------------------------------- | |
| # # 3. Main Application Logic | |
| # # --------------------------------------------------------- | |
| # class VectorSystem: | |
| # def __init__(self): | |
| # self.vector_store = None | |
| # self.embeddings = OnnxBgeEmbeddings() | |
| # self.llm = LLMEvaluator() | |
| # self.all_chunks = [] | |
| # self.total_chunks = 0 | |
| # def process_content(self, file_obj, raw_text): | |
| # # LOGIC: Check for exclusivity (Cannot have both file and text) | |
| # has_file = file_obj is not None | |
| # has_text = raw_text is not None and len(raw_text.strip()) > 0 | |
| # if has_file and has_text: | |
| # return "β Error: Please provide EITHER a file OR paste text, not both at the same time." | |
| # if not has_file and not has_text: | |
| # return "β οΈ No content provided. Please upload a file or paste text." | |
| # try: | |
| # text = "" | |
| # # Case 1: Process File | |
| # if has_file: | |
| # if file_obj.name.endswith('.pdf'): | |
| # doc = fitz.open(file_obj.name) | |
| # for page in doc: text += page.get_text() | |
| # elif file_obj.name.endswith('.txt'): | |
| # with open(file_obj.name, 'r', encoding='utf-8') as f: text = f.read() | |
| # else: | |
| # return "β Error: Only .pdf and .txt supported." | |
| # # Case 2: Process Raw Text | |
| # else: | |
| # text = raw_text | |
| # text_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=100) | |
| # self.all_chunks = text_splitter.split_text(text) | |
| # self.total_chunks = len(self.all_chunks) | |
| # if not self.all_chunks: return "Content empty." | |
| # metadatas = [{"id": i} for i in range(self.total_chunks)] | |
| # self.vector_store = FAISS.from_texts(self.all_chunks, self.embeddings, metadatas=metadatas) | |
| # return f"β Indexed {self.total_chunks} chunks." | |
| # except Exception as e: | |
| # return f"Error: {str(e)}" | |
| # def process_query(self, question, student_answer, max_marks): | |
| # if not self.vector_store: return "β οΈ Please upload a file or paste text first.", "" | |
| # if not question: return "β οΈ Enter a question.", "" | |
| # results = self.vector_store.similarity_search_with_score(question, k=1) | |
| # top_doc, score = results[0] | |
| # center_id = top_doc.metadata['id'] | |
| # start_id = max(0, center_id - 1) | |
| # end_id = min(self.total_chunks - 1, center_id + 1) | |
| # expanded_context = "" | |
| # for i in range(start_id, end_id + 1): | |
| # expanded_context += self.all_chunks[i] + "\n" | |
| # evidence_display = f"### π Expanded Context (Chunks {start_id} to {end_id}):\n" | |
| # evidence_display += f"> ... {expanded_context} ..." | |
| # llm_feedback = "Please enter a student answer to grade." | |
| # if student_answer: | |
| # llm_feedback = self.llm.evaluate(expanded_context, question, student_answer, max_marks) | |
| # return evidence_display, llm_feedback | |
| # system = VectorSystem() | |
| # with gr.Blocks(title="EduGenius AI Grader") as demo: | |
| # gr.Markdown("# β‘ EduGenius: CPU Optimized RAG") | |
| # gr.Markdown("Powered by **Qwen-2.5-1.5B** and **BGE-Small** (ONNX Optimized)") | |
| # with gr.Row(): | |
| # with gr.Column(scale=1): | |
| # gr.Markdown("### Source Input (Choose One)") | |
| # pdf_input = gr.File(label="Option A: Upload Chapter (PDF/TXT)") | |
| # gr.Markdown("**OR**") | |
| # text_input = gr.Textbox(label="Option B: Paste Context", placeholder="Paste text here if you don't have a file...", lines=5) | |
| # upload_btn = gr.Button("Index Content", variant="primary") | |
| # status_msg = gr.Textbox(label="Status", interactive=False) | |
| # with gr.Column(scale=2): | |
| # with gr.Row(): | |
| # q_input = gr.Textbox(label="Question", scale=2) | |
| # max_marks = gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max Marks") | |
| # a_input = gr.TextArea(label="Student Answer") | |
| # run_btn = gr.Button("Retrieve & Grade", variant="secondary") | |
| # with gr.Row(): | |
| # evidence_box = gr.Markdown(label="Context Used") | |
| # grade_box = gr.Markdown(label="Grading Result") | |
| # # Pass both inputs to the process_content function | |
| # upload_btn.click(system.process_content, inputs=[pdf_input, text_input], outputs=[status_msg]) | |
| # run_btn.click(system.process_query, inputs=[q_input, a_input, max_marks], outputs=[evidence_box, grade_box]) | |
| # if __name__ == "__main__": | |
| # demo.launch() | |
| # import gradio as gr | |
| # import fitz # PyMuPDF | |
| # import torch | |
| # import os | |
| # import numpy as np | |
| # # --- IMPORT SESSION OPTIONS --- | |
| # from onnxruntime import SessionOptions, GraphOptimizationLevel | |
| # # --- LANGCHAIN & RAG IMPORTS --- | |
| # from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| # from langchain_community.vectorstores import FAISS | |
| # from langchain_core.embeddings import Embeddings | |
| # from langchain_core.documents import Document | |
| # # --- ONNX & MODEL IMPORTS --- | |
| # from transformers import AutoTokenizer | |
| # from optimum.onnxruntime import ORTModelForFeatureExtraction, ORTModelForCausalLM, ORTModelForSequenceClassification | |
| # from huggingface_hub import snapshot_download | |
| # # Force CPU Provider | |
| # PROVIDERS = ["CPUExecutionProvider"] | |
| # print(f"β‘ Running on: {PROVIDERS}") | |
| # # --------------------------------------------------------- | |
| # # 1. OPTIMIZED EMBEDDINGS (BGE-SMALL) | |
| # # --------------------------------------------------------- | |
| # class OnnxBgeEmbeddings(Embeddings): | |
| # def __init__(self): | |
| # model_name = "Xenova/bge-small-en-v1.5" | |
| # print(f"π Loading Embeddings: {model_name}...") | |
| # self.tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| # self.model = ORTModelForFeatureExtraction.from_pretrained( | |
| # model_name, | |
| # export=False, | |
| # provider=PROVIDERS[0] | |
| # ) | |
| # def _process_batch(self, texts): | |
| # inputs = self.tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors="pt") | |
| # with torch.no_grad(): | |
| # outputs = self.model(**inputs) | |
| # embeddings = outputs.last_hidden_state[:, 0] | |
| # embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1) | |
| # return embeddings.numpy().tolist() | |
| # def embed_documents(self, texts): | |
| # return self._process_batch(texts) | |
| # def embed_query(self, text): | |
| # return self._process_batch(["Represent this sentence for searching relevant passages: " + text])[0] | |
| # # --------------------------------------------------------- | |
| # # 2. OPTIMIZED LLM (Qwen 2.5 - 1.5B) - STRICT GRADING | |
| # # --------------------------------------------------------- | |
| # class LLMEvaluator: | |
| # def __init__(self): | |
| # # Qwen 2.5 1.5B needs "Few-Shot" examples to be strict. | |
| # self.repo_id = "onnx-community/Qwen2.5-1.5B-Instruct" | |
| # self.local_dir = "onnx_qwen_local" | |
| # print(f"π Preparing CPU LLM: {self.repo_id}...") | |
| # if not os.path.exists(self.local_dir): | |
| # print(f"π₯ Downloading FP16 model to {self.local_dir}...") | |
| # snapshot_download( | |
| # repo_id=self.repo_id, | |
| # local_dir=self.local_dir, | |
| # allow_patterns=["config.json", "generation_config.json", "tokenizer*", "special_tokens_map.json", "*.jinja", "onnx/model_fp16.onnx*"] | |
| # ) | |
| # print("β Download complete.") | |
| # self.tokenizer = AutoTokenizer.from_pretrained(self.local_dir) | |
| # sess_options = SessionOptions() | |
| # sess_options.graph_optimization_level = GraphOptimizationLevel.ORT_DISABLE_ALL | |
| # self.model = ORTModelForCausalLM.from_pretrained( | |
| # self.local_dir, | |
| # subfolder="onnx", | |
| # file_name="model_fp16.onnx", | |
| # use_cache=True, | |
| # use_io_binding=False, | |
| # provider=PROVIDERS[0], | |
| # session_options=sess_options | |
| # ) | |
| # def evaluate(self, context, question, student_answer, max_marks): | |
| # # --- IMPROVED PROMPT STRATEGY --- | |
| # system_prompt = f"""You are a strict Logic Validator. You are NOT a helpful assistant. | |
| # Your job is to check if the Student Answer is FACTUALLY present in the Context. | |
| # GRADING ALGORITHM: | |
| # 1. IF the Student Answer mentions things NOT in the Context -> PENALTY (-50% of the marks). | |
| # 2. IF the Student Answer interprets the text opposite to its meaning -> PENALTY (-100% of the marks). | |
| # 3. IF the Student Answer is generic fluff -> SCORE: 0. | |
| # --- EXAMPLE 1 (HALLUCINATION) --- | |
| # Context: The sky is blue due to Rayleigh scattering. | |
| # Question: Why is the sky blue? | |
| # Student Answer: Because the ocean reflects the water into the sky. | |
| # Analysis: The Context mentions 'Rayleigh scattering'. The student mentions 'ocean reflection'. These are different. The student is hallucinating outside facts. | |
| # Score: 0/{max_marks} | |
| # --- EXAMPLE 2 (CONTRADICTION) --- | |
| # Context: One must efface one's own personality. Good prose is like a windowpane. | |
| # Question: What does the author mean? | |
| # Student Answer: It means we should see the author's personality clearly. | |
| # Analysis: The text says 'efface' (remove) personality. The student says 'see' personality. This is a direct contradiction. | |
| # Score: 0/{max_marks} | |
| # --- EXAMPLE 3 (CORRECT) --- | |
| # Context: Mitochondria is the powerhouse of the cell. | |
| # Question: What is mitochondria? | |
| # Student Answer: It is the cell's powerhouse. | |
| # Analysis: Matches the text meaning exactly. | |
| # Score: {max_marks}/{max_marks} | |
| # """ | |
| # user_prompt = f""" | |
| # --- YOUR TASK --- | |
| # Context: | |
| # {context} | |
| # Question: | |
| # {question} | |
| # Student Answer: | |
| # {student_answer} | |
| # OUTPUT FORMAT: | |
| # Analysis: [Compare Student Answer vs Context. List any hallucinations or contradictions.] | |
| # Score: [X]/{max_marks} | |
| # """ | |
| # messages = [ | |
| # {"role": "system", "content": system_prompt}, | |
| # {"role": "user", "content": user_prompt} | |
| # ] | |
| # input_text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
| # inputs = self.tokenizer(input_text, return_tensors="pt") | |
| # # Lower temperature for strictness | |
| # with torch.no_grad(): | |
| # outputs = self.model.generate( | |
| # **inputs, | |
| # max_new_tokens=150, | |
| # temperature=0.1, # Strict logic, no creativity | |
| # top_p=0.2, # Cut off unlikely tokens | |
| # do_sample=True, | |
| # repetition_penalty=1.2 # Penalize repetition | |
| # ) | |
| # input_length = inputs['input_ids'].shape[1] | |
| # response = self.tokenizer.decode(outputs[0][input_length:], skip_special_tokens=True) | |
| # return response | |
| # # --------------------------------------------------------- | |
| # # 3. NEW: ONNX RERANKER (Cross-Encoder) | |
| # # Uses existing 'optimum' & 'transformers' libs (No new deps) | |
| # # --------------------------------------------------------- | |
| # class OnnxReranker: | |
| # def __init__(self): | |
| # # TinyBERT is ~17MB and very fast on CPU | |
| # self.model_name = "Xenova/ms-marco-TinyBERT-L-2-v2" | |
| # print(f"π Loading Reranker: {self.model_name}...") | |
| # self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) | |
| # self.model = ORTModelForSequenceClassification.from_pretrained( | |
| # self.model_name, | |
| # export=False, | |
| # provider=PROVIDERS[0] | |
| # ) | |
| # def rank(self, query, docs, top_k=3): | |
| # if not docs: | |
| # return [] | |
| # # Prepare pairs: [query, doc_text] | |
| # pairs = [[query, doc.page_content] for doc in docs] | |
| # inputs = self.tokenizer( | |
| # pairs, | |
| # padding=True, | |
| # truncation=True, | |
| # max_length=512, | |
| # return_tensors="pt" | |
| # ) | |
| # with torch.no_grad(): | |
| # outputs = self.model(**inputs) | |
| # # Get logits (Relevance scores) | |
| # # MS-Marco models typically output a single logit or [irrelevant, relevant] | |
| # logits = outputs.logits | |
| # if logits.shape[1] == 2: | |
| # scores = logits[:, 1] # Take the "relevant" class score | |
| # else: | |
| # scores = logits.flatten() | |
| # # Sort docs by score (descending) | |
| # scores = scores.numpy().tolist() | |
| # doc_score_pairs = list(zip(docs, scores)) | |
| # doc_score_pairs.sort(key=lambda x: x[1], reverse=True) | |
| # # Return top K docs | |
| # return [doc for doc, score in doc_score_pairs[:top_k]] | |
| # # --------------------------------------------------------- | |
| # # 4. Main Application Logic | |
| # # --------------------------------------------------------- | |
| # class VectorSystem: | |
| # def __init__(self): | |
| # self.vector_store = None | |
| # self.embeddings = OnnxBgeEmbeddings() | |
| # self.llm = LLMEvaluator() | |
| # self.reranker = OnnxReranker() # Initialize Reranker | |
| # self.all_chunks = [] | |
| # self.total_chunks = 0 | |
| # def process_content(self, file_obj, raw_text): | |
| # has_file = file_obj is not None | |
| # has_text = raw_text is not None and len(raw_text.strip()) > 0 | |
| # if has_file and has_text: | |
| # return "β Error: Please provide EITHER a file OR paste text, not both at the same time." | |
| # if not has_file and not has_text: | |
| # return "β οΈ No content provided. Please upload a file or paste text." | |
| # try: | |
| # text = "" | |
| # if has_file: | |
| # if file_obj.name.endswith('.pdf'): | |
| # doc = fitz.open(file_obj.name) | |
| # for page in doc: text += page.get_text() | |
| # elif file_obj.name.endswith('.txt'): | |
| # with open(file_obj.name, 'r', encoding='utf-8') as f: text = f.read() | |
| # else: | |
| # return "β Error: Only .pdf and .txt supported." | |
| # else: | |
| # text = raw_text | |
| # # Smaller chunks for Reranking precision (500 chars) | |
| # text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100) | |
| # texts = text_splitter.split_text(text) | |
| # self.all_chunks = texts # Keep plain text list for reference | |
| # # Create Document objects with metadata | |
| # docs = [Document(page_content=t, metadata={"id": i}) for i, t in enumerate(texts)] | |
| # self.total_chunks = len(docs) | |
| # if not docs: return "Content empty." | |
| # self.vector_store = FAISS.from_documents(docs, self.embeddings) | |
| # return f"β Indexed {self.total_chunks} chunks." | |
| # except Exception as e: | |
| # return f"Error: {str(e)}" | |
| # def process_query(self, question, student_answer, max_marks): | |
| # if not self.vector_store: return "β οΈ Please upload a file or paste text first.", "" | |
| # if not question: return "β οΈ Enter a question.", "" | |
| # # Step A: Wide Net Retrieval (Get top 15 candidates) | |
| # # We fetch more than we need to ensure the answer is in the candidate pool | |
| # initial_docs = self.vector_store.similarity_search(question, k=15) | |
| # # Step B: Rerank (Get top 3 best matches) | |
| # # The Cross-Encoder strictly judges relevance | |
| # top_docs = self.reranker.rank(question, initial_docs, top_k=3) | |
| # # Step C: Construct Context | |
| # # We merge the top 3 specific chunks | |
| # expanded_context = "\n\n---\n\n".join([d.page_content for d in top_docs]) | |
| # evidence_display = f"### π Optimized Context (Top {len(top_docs)} chunks after Reranking):\n" | |
| # evidence_display += f"> {expanded_context} ..." | |
| # llm_feedback = "Please enter a student answer to grade." | |
| # if student_answer: | |
| # llm_feedback = self.llm.evaluate(expanded_context, question, student_answer, max_marks) | |
| # return evidence_display, llm_feedback | |
| # system = VectorSystem() | |
| # with gr.Blocks(title="EduGenius AI Grader") as demo: | |
| # gr.Markdown("# β‘ EduGenius: CPU Optimized RAG") | |
| # gr.Markdown("Powered by **Qwen-2.5-1.5B**, **BGE-Small** & **TinyBERT Reranker**") | |
| # with gr.Row(): | |
| # with gr.Column(scale=1): | |
| # gr.Markdown("### Source Input (Choose One)") | |
| # pdf_input = gr.File(label="Option A: Upload Chapter (PDF/TXT)") | |
| # gr.Markdown("**OR**") | |
| # text_input = gr.Textbox(label="Option B: Paste Context", placeholder="Paste text here if you don't have a file...", lines=5) | |
| # upload_btn = gr.Button("Index Content", variant="primary") | |
| # status_msg = gr.Textbox(label="Status", interactive=False) | |
| # with gr.Column(scale=2): | |
| # with gr.Row(): | |
| # q_input = gr.Textbox(label="Question", scale=2) | |
| # max_marks = gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max Marks") | |
| # a_input = gr.TextArea(label="Student Answer") | |
| # run_btn = gr.Button("Retrieve & Grade", variant="secondary") | |
| # with gr.Row(): | |
| # evidence_box = gr.Markdown(label="Context Used") | |
| # grade_box = gr.Markdown(label="Grading Result") | |
| # # Pass both inputs to the process_content function | |
| # upload_btn.click(system.process_content, inputs=[pdf_input, text_input], outputs=[status_msg]) | |
| # run_btn.click(system.process_query, inputs=[q_input, a_input, max_marks], outputs=[evidence_box, grade_box]) | |
| # if __name__ == "__main__": | |
| # demo.launch() | |
| # import gradio as gr | |
| # import fitz # PyMuPDF | |
| # import torch | |
| # import os | |
| # import numpy as np | |
| # # --- IMPORT SESSION OPTIONS --- | |
| # from onnxruntime import SessionOptions, GraphOptimizationLevel | |
| # # --- LANGCHAIN & RAG IMPORTS --- | |
| # from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| # from langchain_community.vectorstores import FAISS | |
| # from langchain_core.embeddings import Embeddings | |
| # from langchain_core.documents import Document | |
| # # --- ONNX & MODEL IMPORTS --- | |
| # from transformers import AutoTokenizer | |
| # from optimum.onnxruntime import ORTModelForFeatureExtraction, ORTModelForCausalLM, ORTModelForSequenceClassification | |
| # from huggingface_hub import snapshot_download | |
| # # Force CPU Provider | |
| # PROVIDERS = ["CPUExecutionProvider"] | |
| # print(f"β‘ Running on: {PROVIDERS}") | |
| # # --------------------------------------------------------- | |
| # # 1. OPTIMIZED EMBEDDINGS (BGE-SMALL) | |
| # # --------------------------------------------------------- | |
| # class OnnxBgeEmbeddings(Embeddings): | |
| # def __init__(self): | |
| # model_name = "Xenova/bge-small-en-v1.5" | |
| # print(f"π Loading Embeddings: {model_name}...") | |
| # self.tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| # self.model = ORTModelForFeatureExtraction.from_pretrained( | |
| # model_name, | |
| # export=False, | |
| # provider=PROVIDERS[0] | |
| # ) | |
| # def _process_batch(self, texts): | |
| # inputs = self.tokenizer(texts, padding=True, truncation=True, max_length=512, return_tensors="pt") | |
| # with torch.no_grad(): | |
| # outputs = self.model(**inputs) | |
| # embeddings = outputs.last_hidden_state[:, 0] | |
| # embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1) | |
| # return embeddings.numpy().tolist() | |
| # def embed_documents(self, texts): | |
| # return self._process_batch(texts) | |
| # def embed_query(self, text): | |
| # return self._process_batch(["Represent this sentence for searching relevant passages: " + text])[0] | |
| # # --------------------------------------------------------- | |
| # # 2. OPTIMIZED LLM (Qwen 2.5 - 0.5B) - STRICT GRADING | |
| # # --------------------------------------------------------- | |
| # class LLMEvaluator: | |
| # def __init__(self): | |
| # # Qwen 2.5 0.5B is fast but needs "Few-Shot" examples to be strict. | |
| # self.repo_id = "onnx-community/Qwen2.5-0.5B-Instruct" | |
| # self.local_dir = "onnx_qwen_local" | |
| # print(f"π Preparing CPU LLM: {self.repo_id}...") | |
| # if not os.path.exists(self.local_dir): | |
| # print(f"π₯ Downloading FP16 model to {self.local_dir}...") | |
| # snapshot_download( | |
| # repo_id=self.repo_id, | |
| # local_dir=self.local_dir, | |
| # allow_patterns=["config.json", "generation_config.json", "tokenizer*", "special_tokens_map.json", "*.jinja", "onnx/model_fp16.onnx*"] | |
| # ) | |
| # print("β Download complete.") | |
| # self.tokenizer = AutoTokenizer.from_pretrained(self.local_dir) | |
| # sess_options = SessionOptions() | |
| # sess_options.graph_optimization_level = GraphOptimizationLevel.ORT_DISABLE_ALL | |
| # self.model = ORTModelForCausalLM.from_pretrained( | |
| # self.local_dir, | |
| # subfolder="onnx", | |
| # file_name="model_fp16.onnx", | |
| # use_cache=True, | |
| # use_io_binding=False, | |
| # provider=PROVIDERS[0], | |
| # session_options=sess_options | |
| # ) | |
| # def evaluate(self, context, question, student_answer, max_marks): | |
| # # --- IMPROVED PROMPT STRATEGY --- | |
| # system_prompt = f"""You are a strict Logic Validator. You are NOT a helpful assistant. | |
| # Your job is to check if the Student Answer is FACTUALLY present in the Context. | |
| # GRADING ALGORITHM: | |
| # 1. IF the Student Answer mentions things NOT in the Context -> PENALTY (-50% of the marks). | |
| # 2. IF the Student Answer interprets the text opposite to its meaning -> PENALTY (-100% of the marks). | |
| # 3. IF the Student Answer is generic fluff -> SCORE: 0. | |
| # --- EXAMPLE 1 (HALLUCINATION) --- | |
| # Context: The sky is blue due to Rayleigh scattering. | |
| # Question: Why is the sky blue? | |
| # Student Answer: Because the ocean reflects the water into the sky. | |
| # Analysis: The Context mentions 'Rayleigh scattering'. The student mentions 'ocean reflection'. These are different. The student is hallucinating outside facts. | |
| # Score: 0/{max_marks} | |
| # --- EXAMPLE 2 (CONTRADICTION) --- | |
| # Context: One must efface one's own personality. Good prose is like a windowpane. | |
| # Question: What does the author mean? | |
| # Student Answer: It means we should see the author's personality clearly. | |
| # Analysis: The text says 'efface' (remove) personality. The student says 'see' personality. This is a direct contradiction. | |
| # Score: 0/{max_marks} | |
| # --- EXAMPLE 3 (CORRECT) --- | |
| # Context: Mitochondria is the powerhouse of the cell. | |
| # Question: What is mitochondria? | |
| # Student Answer: It is the cell's powerhouse. | |
| # Analysis: Matches the text meaning exactly. | |
| # Score: {max_marks}/{max_marks} | |
| # """ | |
| # user_prompt = f""" | |
| # --- YOUR TASK --- | |
| # Context: | |
| # {context} | |
| # Question: | |
| # {question} | |
| # Student Answer: | |
| # {student_answer} | |
| # OUTPUT FORMAT: | |
| # Analysis: [Compare Student Answer vs Context. List any hallucinations or contradictions.] | |
| # Score: [X]/{max_marks} | |
| # """ | |
| # messages = [ | |
| # {"role": "system", "content": system_prompt}, | |
| # {"role": "user", "content": user_prompt} | |
| # ] | |
| # input_text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
| # inputs = self.tokenizer(input_text, return_tensors="pt") | |
| # # Lower temperature for strictness | |
| # with torch.no_grad(): | |
| # outputs = self.model.generate( | |
| # **inputs, | |
| # max_new_tokens=150, | |
| # temperature=0.1, # Strict logic, no creativity | |
| # top_p=0.2, # Cut off unlikely tokens | |
| # do_sample=True, | |
| # repetition_penalty=1.2 # Penalize repetition | |
| # ) | |
| # input_length = inputs['input_ids'].shape[1] | |
| # response = self.tokenizer.decode(outputs[0][input_length:], skip_special_tokens=True) | |
| # return response | |
| # # --------------------------------------------------------- | |
| # # 3. NEW: ONNX RERANKER (Cross-Encoder) | |
| # # Uses existing 'optimum' & 'transformers' libs (No new deps) | |
| # # --------------------------------------------------------- | |
| # class OnnxReranker: | |
| # def __init__(self): | |
| # # TinyBERT is ~17MB and very fast on CPU | |
| # self.model_name = "Xenova/ms-marco-TinyBERT-L-2-v2" | |
| # print(f"π Loading Reranker: {self.model_name}...") | |
| # self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) | |
| # self.model = ORTModelForSequenceClassification.from_pretrained( | |
| # self.model_name, | |
| # export=False, | |
| # provider=PROVIDERS[0] | |
| # ) | |
| # def rank(self, query, docs, top_k=3): | |
| # if not docs: | |
| # return [] | |
| # # Prepare pairs: [query, doc_text] | |
| # pairs = [[query, doc.page_content] for doc in docs] | |
| # inputs = self.tokenizer( | |
| # pairs, | |
| # padding=True, | |
| # truncation=True, | |
| # max_length=512, | |
| # return_tensors="pt" | |
| # ) | |
| # with torch.no_grad(): | |
| # outputs = self.model(**inputs) | |
| # # Get logits (Relevance scores) | |
| # # MS-Marco models typically output a single logit or [irrelevant, relevant] | |
| # logits = outputs.logits | |
| # if logits.shape[1] == 2: | |
| # scores = logits[:, 1] # Take the "relevant" class score | |
| # else: | |
| # scores = logits.flatten() | |
| # # Sort docs by score (descending) | |
| # scores = scores.numpy().tolist() | |
| # doc_score_pairs = list(zip(docs, scores)) | |
| # doc_score_pairs.sort(key=lambda x: x[1], reverse=True) | |
| # # Return top K docs | |
| # return [doc for doc, score in doc_score_pairs[:top_k]] | |
| # # --------------------------------------------------------- | |
| # # 4. Main Application Logic | |
| # # --------------------------------------------------------- | |
| # class VectorSystem: | |
| # def __init__(self): | |
| # self.vector_store = None | |
| # self.embeddings = OnnxBgeEmbeddings() | |
| # self.llm = LLMEvaluator() | |
| # self.reranker = OnnxReranker() # Initialize Reranker | |
| # self.all_chunks = [] | |
| # self.total_chunks = 0 | |
| # def process_content(self, file_obj, raw_text): | |
| # has_file = file_obj is not None | |
| # has_text = raw_text is not None and len(raw_text.strip()) > 0 | |
| # if has_file and has_text: | |
| # return "β Error: Please provide EITHER a file OR paste text, not both at the same time." | |
| # if not has_file and not has_text: | |
| # return "β οΈ No content provided. Please upload a file or paste text." | |
| # try: | |
| # text = "" | |
| # if has_file: | |
| # if file_obj.name.endswith('.pdf'): | |
| # doc = fitz.open(file_obj.name) | |
| # for page in doc: text += page.get_text() | |
| # elif file_obj.name.endswith('.txt'): | |
| # with open(file_obj.name, 'r', encoding='utf-8') as f: text = f.read() | |
| # else: | |
| # return "β Error: Only .pdf and .txt supported." | |
| # else: | |
| # text = raw_text | |
| # # Smaller chunks for Reranking precision (500 chars) | |
| # text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100) | |
| # texts = text_splitter.split_text(text) | |
| # self.all_chunks = texts # Keep plain text list for reference | |
| # # Create Document objects with metadata | |
| # docs = [Document(page_content=t, metadata={"id": i}) for i, t in enumerate(texts)] | |
| # self.total_chunks = len(docs) | |
| # if not docs: return "Content empty." | |
| # self.vector_store = FAISS.from_documents(docs, self.embeddings) | |
| # return f"β Indexed {self.total_chunks} chunks." | |
| # except Exception as e: | |
| # return f"Error: {str(e)}" | |
| # def process_query(self, question, student_answer, max_marks): | |
| # if not self.vector_store: return "β οΈ Please upload a file or paste text first.", "" | |
| # if not question: return "β οΈ Enter a question.", "" | |
| # # Step A: Wide Net Retrieval (Get top 15 candidates) | |
| # # We fetch more than we need to ensure the answer is in the candidate pool | |
| # initial_docs = self.vector_store.similarity_search(question, k=15) | |
| # # Step B: Rerank (Get top 3 best matches) | |
| # # The Cross-Encoder strictly judges relevance | |
| # top_docs = self.reranker.rank(question, initial_docs, top_k=3) | |
| # # Step C: Construct Context | |
| # # We merge the top 3 specific chunks | |
| # expanded_context = "\n\n---\n\n".join([d.page_content for d in top_docs]) | |
| # evidence_display = f"### π Optimized Context (Top {len(top_docs)} chunks after Reranking):\n" | |
| # evidence_display += f"> {expanded_context} ..." | |
| # llm_feedback = "Please enter a student answer to grade." | |
| # if student_answer: | |
| # llm_feedback = self.llm.evaluate(expanded_context, question, student_answer, max_marks) | |
| # return evidence_display, llm_feedback | |
| # system = VectorSystem() | |
| # with gr.Blocks(title="EduGenius AI Grader") as demo: | |
| # gr.Markdown("# β‘ EduGenius: CPU Optimized RAG") | |
| # gr.Markdown("Powered by **Qwen-2.5-0.5B**, **BGE-Small** & **TinyBERT Reranker**") | |
| # with gr.Row(): | |
| # with gr.Column(scale=1): | |
| # gr.Markdown("### Source Input (Choose One)") | |
| # pdf_input = gr.File(label="Option A: Upload Chapter (PDF/TXT)") | |
| # gr.Markdown("**OR**") | |
| # text_input = gr.Textbox(label="Option B: Paste Context", placeholder="Paste text here if you don't have a file...", lines=5) | |
| # upload_btn = gr.Button("Index Content", variant="primary") | |
| # status_msg = gr.Textbox(label="Status", interactive=False) | |
| # with gr.Column(scale=2): | |
| # with gr.Row(): | |
| # q_input = gr.Textbox(label="Question", scale=2) | |
| # max_marks = gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max Marks") | |
| # a_input = gr.TextArea(label="Student Answer") | |
| # run_btn = gr.Button("Retrieve & Grade", variant="secondary") | |
| # with gr.Row(): | |
| # evidence_box = gr.Markdown(label="Context Used") | |
| # grade_box = gr.Markdown(label="Grading Result") | |
| # # Pass both inputs to the process_content function | |
| # upload_btn.click(system.process_content, inputs=[pdf_input, text_input], outputs=[status_msg]) | |
| # run_btn.click(system.process_query, inputs=[q_input, a_input, max_marks], outputs=[evidence_box, grade_box]) | |
| # if __name__ == "__main__": | |
| # demo.launch() | |
| import gradio as gr | |
| import fitz # PyMuPDF | |
| import torch | |
| import os | |
| import numpy as np | |
| import re | |
| from typing import List, Dict, Tuple, Optional | |
| # --- IMPORT SESSION OPTIONS --- | |
| from onnxruntime import SessionOptions, GraphOptimizationLevel | |
| # --- LANGCHAIN & RAG IMPORTS --- | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_core.embeddings import Embeddings | |
| from langchain_core.documents import Document | |
| # --- ONNX & MODEL IMPORTS --- | |
| from transformers import AutoTokenizer | |
| from optimum.onnxruntime import ORTModelForFeatureExtraction, ORTModelForCausalLM, ORTModelForSequenceClassification | |
| from huggingface_hub import snapshot_download | |
# Force CPU Provider
# ONNX Runtime execution-provider list; CPU-only so the app runs anywhere
# (no CUDA assumed). PROVIDERS[0] is passed to every model load below.
PROVIDERS = ["CPUExecutionProvider"]
print(f"β‘ Running on: {PROVIDERS}")
| # --------------------------------------------------------- | |
| # 1. OPTIMIZED EMBEDDINGS (BGE-SMALL) - UNCHANGED | |
| # --------------------------------------------------------- | |
class OnnxBgeEmbeddings(Embeddings):
    """LangChain-compatible embeddings backed by an ONNX BGE-small model.

    Vectors are the (L2-normalized) first-token hidden state; queries get the
    BGE retrieval instruction prepended before encoding.
    """
    def __init__(self):
        model_name = "Xenova/bge-small-en-v1.5"
        print(f"π Loading Embeddings: {model_name}...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = ORTModelForFeatureExtraction.from_pretrained(
            model_name, export=False, provider=PROVIDERS[0]
        )
    def _process_batch(self, texts):
        # Encode the whole batch in one pass; 512 tokens is the model's limit.
        encoded = self.tokenizer(
            texts, padding=True, truncation=True, max_length=512, return_tensors="pt"
        )
        with torch.no_grad():
            model_out = self.model(**encoded)
        # Pool by taking the first (CLS) token, then L2-normalize each vector.
        vectors = model_out.last_hidden_state[:, 0]
        vectors = torch.nn.functional.normalize(vectors, p=2, dim=1)
        return vectors.numpy().tolist()
    def embed_documents(self, texts):
        """Embed a list of document chunks."""
        return self._process_batch(texts)
    def embed_query(self, text):
        """Embed a search query, prefixed with the BGE query instruction."""
        prefixed = "Represent this sentence for searching relevant passages: " + text
        return self._process_batch([prefixed])[0]
| # --------------------------------------------------------- | |
| # 2. NEW: ANSWER PRESENCE CHECKER | |
| # Paper insight: Prevent grading blank/missing answers | |
| # --------------------------------------------------------- | |
class AnswerPresenceChecker:
    """Checks if a student answer actually exists and contains substance."""
    def __init__(self):
        # Thresholds below which an answer is treated as absent.
        self.min_length = 10  # Minimum characters for valid answer
        self.min_words = 3    # Minimum words for valid answer
    def check_presence(self, student_answer: str) -> Tuple[bool, str]:
        """Return ``(is_present, reason)`` for the given raw answer text."""
        # Missing or whitespace-only input.
        if not student_answer or not student_answer.strip():
            return False, "Answer is empty"
        answer = student_answer.strip()
        # Length gate.
        if len(answer) < self.min_length:
            return False, f"Answer too short ({len(answer)} chars, need {self.min_length})"
        # Word-count gate.
        word_count = len(answer.split())
        if word_count < self.min_words:
            return False, f"Answer too brief ({word_count} words, need {self.min_words})"
        # Reject common placeholder "answers": runs of dots/spaces, bare
        # question marks, or stock non-answers like "n/a" / "idk".
        for pattern in (r'^[.\s]*$', r'^[?]+$', r'^(n/?a|na|idk|dunno)\s*$'):
            if re.match(pattern, answer.lower()):
                return False, "Answer appears to be placeholder text"
        return True, "Answer present and valid"
| # --------------------------------------------------------- | |
| # 3. ENHANCED LLM EVALUATOR WITH ENSEMBLE SUPPORT | |
| # Paper insights: Structured prompting, reference grounding, ensemble grading | |
| # --------------------------------------------------------- | |
class LLMEvaluator:
    """CPU-only answer grader built on an ONNX export of Qwen2.5-0.5B-Instruct.

    Downloads the FP16 ONNX snapshot on first use, then grades a student
    answer against retrieved context using a rigid prompt template whose
    output is parsed deterministically by `_parse_response`.
    """
    def __init__(self):
        self.repo_id = "onnx-community/Qwen2.5-0.5B-Instruct"
        self.local_dir = "onnx_qwen_local"
        print(f"π Preparing CPU LLM: {self.repo_id}...")
        # Download once; later runs reuse the local snapshot directory.
        if not os.path.exists(self.local_dir):
            print(f"π₯ Downloading FP16 model to {self.local_dir}...")
            snapshot_download(
                repo_id=self.repo_id,
                local_dir=self.local_dir,
                allow_patterns=["config.json", "generation_config.json", "tokenizer*", "special_tokens_map.json", "*.jinja", "onnx/model_fp16.onnx*"]
            )
            print("β Download complete.")
        self.tokenizer = AutoTokenizer.from_pretrained(self.local_dir)
        sess_options = SessionOptions()
        # NOTE(review): graph optimizations are deliberately disabled here —
        # presumably to work around an ORT optimizer issue with this FP16
        # export; confirm before re-enabling.
        sess_options.graph_optimization_level = GraphOptimizationLevel.ORT_DISABLE_ALL
        self.model = ORTModelForCausalLM.from_pretrained(
            self.local_dir,
            subfolder="onnx",
            file_name="model_fp16.onnx",
            use_cache=True,
            use_io_binding=False,
            provider=PROVIDERS[0],
            session_options=sess_options
        )
    def evaluate_single(self, context: str, question: str, student_answer: str,
                        max_marks: int, grader_id: int = 1,
                        reference_summary: Optional[str] = None) -> Dict:
        """Run one grader pass and return a structured result.

        Args:
            context: Retrieved source text the answer is judged against.
            question: The question being graded.
            student_answer: The answer to grade.
            max_marks: Maximum achievable score.
            grader_id: Label used in the prompt (supports ensemble setups).
            reference_summary: Optional "perfect answer" used for calibration.

        Returns:
            Dict with keys ``grader_id``, ``analysis``, ``score`` (int,
            clamped to [0, max_marks]) and ``raw_response``.
        """
        # Rigid template: the model is forced into "## Analysis / ## Score"
        # sections so the output can be parsed deterministically.
        system_prompt = f"""You are Grader #{grader_id}, a strict Logic Validator for educational assessment.
YOUR GRADING ALGORITHM:
1. Compare Student Answer ONLY against the provided Context
2. IF Student Answer mentions facts NOT in Context β PENALTY (-50% of marks)
3. IF Student Answer contradicts the Context β PENALTY (-100% of marks)
4. IF Student Answer is vague/generic without specific facts β SCORE: 0-20%
5. IF Student Answer accurately reflects Context β SCORE: 80-100%
CRITICAL RULES:
[R1] Grade ONLY based on Context provided, not general knowledge
[R2] Penalize hallucinations (facts not in Context) heavily
[R3] Penalize contradictions (opposite meaning) completely
[R4] Reward specific, accurate paraphrasing from Context
[R5] Partial credit for partially correct answers
OUTPUT FORMAT (MANDATORY):
You MUST output in this exact format:
## Analysis
[Your detailed comparison of Student Answer vs Context]
## Score
[X]/{max_marks}
Do NOT deviate from this format."""
        # Optional reference grounding: show the model what full marks look like.
        reference_section = ""
        if reference_summary:
            reference_section = f"""
### REFERENCE SOLUTION (Perfect Answer Example):
{reference_summary}
Use this as calibration for what a 100% answer looks like."""
        user_prompt = f"""
### Context (Retrieved from Source):
{context}
{reference_section}
### Question:
{question}
### Student Answer:
{student_answer}
### Maximum Marks: {max_marks}
Provide your grading following the mandatory output format.
"""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]
        input_text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = self.tokenizer(input_text, return_tensors="pt")
        # Near-greedy sampling keeps the structured output consistent.
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=200,
                temperature=0.1,
                top_p=0.2,
                do_sample=True,
                repetition_penalty=1.2
            )
        # Decode only the newly generated tokens, not the echoed prompt.
        input_length = inputs['input_ids'].shape[1]
        response = self.tokenizer.decode(outputs[0][input_length:], skip_special_tokens=True)
        analysis, score = self._parse_response(response, max_marks)
        return {
            "grader_id": grader_id,
            "analysis": analysis,
            "score": score,
            "raw_response": response
        }
    def _parse_response(self, response: str, max_marks: int) -> Tuple[str, int]:
        """Extract ``(analysis, score)`` from the grader's templated response.

        The score is capped at ``max_marks``; if nothing parseable is found,
        0 is returned so a malformed generation never awards marks.
        """
        # Tolerant primary pattern: accepts "## Score\n[3]/5", "## Score: 3/5",
        # and whitespace around the slash. (The previous pattern demanded an
        # exact newline and bare "/", so those variants fell through to the
        # loose fallback below, which could latch onto an unrelated fraction
        # mentioned in the analysis text, e.g. "covers 2/3 of the points".)
        score_pattern = r'##\s*Score\s*:?\s*\n?\s*\[?(\d+)\]?\s*/\s*\d+'
        score_match = re.search(score_pattern, response, re.IGNORECASE)
        if score_match:
            score = min(int(score_match.group(1)), max_marks)
        else:
            # Last resort: any "number/number" anywhere in the response.
            fallback_match = re.search(r'(\d+)\s*/\s*\d+', response)
            if fallback_match:
                score = min(int(fallback_match.group(1)), max_marks)
            else:
                score = 0  # Default if parsing fails entirely
        # Analysis = everything between "## Analysis" and "## Score" (or EOF).
        analysis_pattern = r'##\s*Analysis\s*\n(.*?)(?=##\s*Score|$)'
        analysis_match = re.search(analysis_pattern, response, re.DOTALL | re.IGNORECASE)
        if analysis_match:
            analysis = analysis_match.group(1).strip()
        else:
            # Fallback: take whatever precedes the first section marker.
            analysis = response.split('##')[0].strip() if '##' in response else response
        return analysis, score
| # --------------------------------------------------------- | |
| # 4. NEW: SUPERVISOR AGGREGATOR | |
| # Paper insight: Merge ensemble outputs into final decision | |
| # --------------------------------------------------------- | |
| # class SupervisorAggregator: | |
| # """ | |
| # Aggregates multiple grader outputs into a final consensus grade. | |
| # Paper uses another LLM call; we use statistical aggregation for CPU efficiency. | |
| # """ | |
| # def aggregate(self, grader_results: List[Dict], max_marks: int) -> Dict: | |
| # """ | |
| # Aggregate K=3 grader results into final score. | |
| # Returns: | |
| # - final_score: int (median of ensemble) | |
| # - disagreement: int (max - min score) | |
| # - needs_review: bool (high disagreement flag) | |
| # - consensus_analysis: str | |
| # """ | |
| # scores = [r['score'] for r in grader_results] | |
| # # Use median for robustness (paper uses supervisor LLM call) | |
| # final_score = int(np.median(scores)) | |
| # # Calculate disagreement | |
| # disagreement = max(scores) - min(scores) | |
| # # Flag for manual review if disagreement too high | |
| # # Paper uses Dmax thresholds; we use 40% of max marks | |
| # needs_review = disagreement >= (0.4 * max_marks) | |
| # # Merge analyses | |
| # consensus_analysis = self._merge_analyses(grader_results, final_score, disagreement) | |
| # return { | |
| # "final_score": final_score, | |
| # "individual_scores": scores, | |
| # "disagreement": disagreement, | |
| # "needs_review": needs_review, | |
| # "consensus_analysis": consensus_analysis, | |
| # "grader_details": grader_results | |
| # } | |
| # def _merge_analyses(self, results: List[Dict], final_score: int, disagreement: int) -> str: | |
| # """Create consensus analysis from multiple graders.""" | |
| # output = f"**Ensemble Grading Results** (Final: {final_score}, Disagreement: Β±{disagreement})\n\n" | |
| # for i, result in enumerate(results, 1): | |
| # output += f"**Grader {i} ({result['score']} points):**\n{result['analysis']}\n\n" | |
| # if disagreement > 0: | |
| # output += f"\nβ οΈ **Note:** Graders disagreed by {disagreement} points. " | |
| # if disagreement >= 5: | |
| # output += "Consider manual review." | |
| # return output | |
| # --------------------------------------------------------- | |
| # 5. ONNX RERANKER - UNCHANGED | |
| # --------------------------------------------------------- | |
class OnnxReranker:
    """Cross-encoder reranker (TinyBERT) running on ONNX Runtime CPU."""
    def __init__(self):
        # TinyBERT is tiny (~17MB) and fast enough for CPU inference.
        self.model_name = "Xenova/ms-marco-TinyBERT-L-2-v2"
        print(f"π Loading Reranker: {self.model_name}...")
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = ORTModelForSequenceClassification.from_pretrained(
            self.model_name, export=False, provider=PROVIDERS[0]
        )
    def rank(self, query, docs, top_k=3):
        """Return the `top_k` documents most relevant to `query`, best first."""
        if not docs:
            return []
        # Score every (query, passage) pair in a single batch.
        batch = self.tokenizer(
            [[query, d.page_content] for d in docs],
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        with torch.no_grad():
            logits = self.model(**batch).logits
        # Two-class heads emit [irrelevant, relevant]; single-logit heads
        # emit the relevance score directly.
        relevance = logits[:, 1] if logits.shape[1] == 2 else logits.flatten()
        scored = sorted(
            zip(docs, relevance.numpy().tolist()),
            key=lambda pair: pair[1],
            reverse=True,
        )
        return [doc for doc, _ in scored[:top_k]]
| # --------------------------------------------------------- | |
| # 6. ENHANCED MAIN SYSTEM WITH MULTI-STAGE PIPELINE | |
| # --------------------------------------------------------- | |
class EnhancedVectorSystem:
    """End-to-end grading pipeline.

    Indexes source content into FAISS, retrieves + reranks context for a
    question, rejects absent/placeholder answers, and grades the rest with
    the LLM evaluator (optionally calibrated by a reference answer).
    """
    def __init__(self):
        self.vector_store = None
        self.embeddings = OnnxBgeEmbeddings()
        self.llm = LLMEvaluator()
        self.reranker = OnnxReranker()
        self.presence_checker = AnswerPresenceChecker()
        self.all_chunks = []           # plain-text chunks of the indexed source
        self.total_chunks = 0
        self.reference_summary = None  # optional "perfect answer" for calibration
    def process_content(self, file_obj, raw_text):
        """Index either an uploaded PDF/TXT file or pasted text into FAISS.

        Exactly one of the two inputs must be provided. Returns a status
        string for the UI; errors are reported as strings, never raised.
        """
        has_file = file_obj is not None
        has_text = raw_text is not None and len(raw_text.strip()) > 0
        if has_file and has_text:
            return "β Error: Please provide EITHER a file OR paste text, not both at the same time."
        if not has_file and not has_text:
            return "β οΈ No content provided. Please upload a file or paste text."
        try:
            text = ""
            if has_file:
                # Case-insensitive extension check so ".PDF"/".TXT" also work.
                lowered = file_obj.name.lower()
                if lowered.endswith('.pdf'):
                    # Context manager closes the PyMuPDF document; the
                    # previous version leaked the file handle.
                    with fitz.open(file_obj.name) as doc:
                        for page in doc:
                            text += page.get_text()
                elif lowered.endswith('.txt'):
                    with open(file_obj.name, 'r', encoding='utf-8') as f:
                        text = f.read()
                else:
                    return "β Error: Only .pdf and .txt supported."
            else:
                text = raw_text
            # Small chunks (500 chars) keep the reranker precise.
            text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
            texts = text_splitter.split_text(text)
            self.all_chunks = texts
            docs = [Document(page_content=t, metadata={"id": i}) for i, t in enumerate(texts)]
            self.total_chunks = len(docs)
            if not docs:
                return "Content empty."
            self.vector_store = FAISS.from_documents(docs, self.embeddings)
            return f"β Indexed {self.total_chunks} chunks. Ready for grading."
        except Exception as e:
            return f"Error: {str(e)}"
    def set_reference_answer(self, reference_text: str) -> str:
        """Store (or clear, when empty) a reference answer for calibration."""
        if not reference_text or len(reference_text.strip()) == 0:
            self.reference_summary = None
            return "βΉοΈ Reference answer cleared."
        self.reference_summary = reference_text.strip()
        return f"β Reference answer set ({len(self.reference_summary)} chars). Will be used to calibrate grading."
    def process_query(self, question, student_answer, max_marks):
        """Grade `student_answer` for `question` against the indexed content.

        Returns a `(evidence_markdown, feedback_markdown)` pair for the UI.
        """
        if not self.vector_store:
            return "β οΈ Please upload a file or paste text first.", ""
        if not question:
            return "β οΈ Enter a question.", ""
        # Stage 1: refuse to grade blank/placeholder answers.
        is_present, presence_reason = self.presence_checker.check_presence(student_answer)
        if not is_present:
            return f"β οΈ **No valid answer detected:** {presence_reason}", f"**Score: 0/{max_marks}**\n\nNo answer to grade."
        # Stage 2: wide-net retrieval (k=15), then rerank down to the top 3.
        initial_docs = self.vector_store.similarity_search(question, k=15)
        top_docs = self.reranker.rank(question, initial_docs, top_k=3)
        expanded_context = "\n\n---\n\n".join([d.page_content for d in top_docs])
        evidence_display = f"### π Retrieved Context (Top {len(top_docs)} chunks):\n"
        evidence_display += f"> {expanded_context[:500]}..."
        # Defensive guard (presence check above already rejects empty answers).
        if not student_answer:
            return evidence_display, "Please enter a student answer to grade."
        # Stage 3: single-grader LLM evaluation (reference-grounded if set).
        result = self.llm.evaluate_single(
            context=expanded_context,
            question=question,
            student_answer=student_answer,
            max_marks=max_marks,
            grader_id=1,
            reference_summary=self.reference_summary
        )
        llm_feedback = f"# π Grade: {result['score']}/{max_marks}\n\n{result['analysis']}"
        return evidence_display, llm_feedback
| # --------------------------------------------------------- | |
| # 7. GRADIO INTERFACE | |
| # --------------------------------------------------------- | |
# Singleton pipeline instance; loads all ONNX models once at startup and is
# shared by every Gradio callback below.
system = EnhancedVectorSystem()
with gr.Blocks(title="EduGenius AI Grader - Enhanced", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# β‘ EduGenius: Enhanced RAG-Based Grader")
    gr.Markdown("Powered by **Ensemble Grading**, **Reference Grounding** & **Presence Checking**")
    gr.Markdown("*Implements multi-stage pipeline from research: arXiv:2601.00730*")
    with gr.Row():
        # Left column: content indexing plus the optional reference answer.
        with gr.Column(scale=1):
            gr.Markdown("### π Source Content")
            pdf_input = gr.File(label="Option A: Upload Document (PDF/TXT)")
            gr.Markdown("**OR**")
            text_input = gr.Textbox(label="Option B: Paste Text", placeholder="Paste context here...", lines=5)
            upload_btn = gr.Button("π₯ Index Content", variant="primary")
            status_msg = gr.Textbox(label="Status", interactive=False)
            gr.Markdown("---")
            gr.Markdown("### π― Reference Answer (Optional)")
            gr.Markdown("*Providing a reference answer improves grading accuracy*")
            reference_input = gr.Textbox(
                label="Perfect Answer Example",
                placeholder="What would a 100% answer look like?",
                lines=3
            )
            ref_btn = gr.Button("Set Reference", variant="secondary")
            ref_status = gr.Textbox(label="Reference Status", interactive=False)
        # Right column: question/answer entry and grading controls.
        with gr.Column(scale=2):
            gr.Markdown("### β Grading Interface")
            with gr.Row():
                q_input = gr.Textbox(label="Question", scale=2)
                max_marks = gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max Marks")
            a_input = gr.TextArea(label="Student Answer", lines=4)
            with gr.Row():
                # NOTE(review): this checkbox is NOT wired to process_query —
                # ensemble grading is currently disabled in the backend (see
                # the commented-out click handler below). Confirm intent.
                ensemble_check = gr.Checkbox(label="Enable Ensemble Grading (K=3)", value=True)
                run_btn = gr.Button("π Grade Answer", variant="primary", scale=2)
    gr.Markdown("---")
    with gr.Row():
        with gr.Column():
            evidence_box = gr.Markdown(label="π Retrieved Context")
        with gr.Column():
            grade_box = gr.Markdown(label="π Grading Result")
    # Event handlers
    upload_btn.click(
        system.process_content,
        inputs=[pdf_input, text_input],
        outputs=[status_msg]
    )
    ref_btn.click(
        system.set_reference_answer,
        inputs=[reference_input],
        outputs=[ref_status]
    )
    # run_btn.click(
    #     system.process_query,
    #     inputs=[q_input, a_input, max_marks, ensemble_check],
    #     outputs=[evidence_box, grade_box]
    # )
    run_btn.click(
        system.process_query,
        inputs=[q_input, a_input, max_marks],  # Removed ensemble_check
        outputs=[evidence_box, grade_box]
    )
# Launch the web UI only when run as a script (not when imported).
if __name__ == "__main__":
    demo.launch()