| import gradio as gr |
| import fitz |
| import torch |
| import os |
| import re |
| import numpy as np |
| from collections import Counter |
| import onnxruntime as ort |
| from onnxruntime import SessionOptions, GraphOptimizationLevel |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from langchain_community.vectorstores import FAISS |
| from langchain_core.embeddings import Embeddings |
| from transformers import AutoTokenizer |
| from optimum.onnxruntime import ORTModelForFeatureExtraction, ORTModelForCausalLM |
| from huggingface_hub import snapshot_download |
| from sentence_transformers import SentenceTransformer |
|
|
| PROVIDERS = ["CPUExecutionProvider"] |
|
|
| |
| |
| |
class OnnxBgeEmbeddings(Embeddings):
    """LangChain-compatible embedding wrapper around an ONNX BGE model (CPU-only)."""

    def __init__(self):
        # CLS-pooling BGE model already exported to ONNX; export=False loads it as-is.
        model_name = "Xenova/bge-small-en-v1.5"
        print(f"π Loading Embeddings: {model_name}...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = ORTModelForFeatureExtraction.from_pretrained(
            model_name, export=False, provider=PROVIDERS[0]
        )

    def _process_batch(self, texts):
        """Tokenize *texts*, run the ONNX model, CLS-pool and L2-normalize.

        Returns a list of plain-float lists (one vector per input text).
        """
        encoded = self.tokenizer(
            texts, padding=True, truncation=True, max_length=512, return_tensors="pt"
        )
        with torch.no_grad():
            model_out = self.model(**encoded)
        # Position 0 is the [CLS] token — BGE models use it as the sentence vector.
        vectors = model_out.last_hidden_state[:, 0]
        vectors = torch.nn.functional.normalize(vectors, p=2, dim=1)
        return vectors.numpy().tolist()

    def embed_documents(self, texts):
        """Embed a batch of documents (LangChain Embeddings API)."""
        return self._process_batch(texts)

    def embed_query(self, text):
        """Embed a single query string (LangChain Embeddings API)."""
        return self._process_batch([text])[0]
|
|
| |
| |
| |
class RuleBasedGrader:
    """
    Extracts key concepts from context and checks student answer coverage.
    Works 100% on CPU, deterministic, explainable.
    """

    # Common English function words ignored during concept extraction.
    _STOPWORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might', 'must', 'shall', 'can', 'need', 'dare', 'ought', 'used', 'it', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'we', 'they'}

    # Negation markers used by the naive contradiction heuristic.
    _NEGATIONS = ['not', 'no', 'never', 'none', 'nothing', 'nobody', 'neither', 'nowhere', 'hardly', 'scarcely', 'barely', "doesn't", "isn't", "wasn't", "shouldn't", "wouldn't", "couldn't", "can't", "don't", "didn't", "hasn't", "haven't", "hadn't", "won't"]

    def __init__(self):
        # Stateless grader: all configuration lives in the class constants above.
        pass

    def extract_key_concepts(self, text, top_k=10):
        """
        Extract frequent unigrams and bigrams from *text* as "key concepts".

        Returns at most *top_k* concepts, most-frequent first. Only terms that
        occur more than once qualify.
        """
        # Strip punctuation, lowercase, drop stopwords and very short tokens.
        text = re.sub(r'[^\w\s]', ' ', text.lower())
        words = [w for w in text.split() if w not in self._STOPWORDS and len(w) > 2]

        word_freq = Counter(words)
        bigram_freq = Counter(f"{words[i]} {words[i+1]}" for i in range(len(words) - 1))

        concepts = []
        for word, count in word_freq.most_common(top_k):
            if count > 1:
                concepts.append(word)
        for bigram, count in bigram_freq.most_common(top_k // 2):
            if count > 1:
                concepts.append(bigram)

        # BUG FIX: the original used list(set(concepts)), whose order depends on
        # string-hash randomization — the grader's output (and therefore the
        # score) changed between runs despite the "deterministic" contract.
        # dict.fromkeys de-duplicates while preserving insertion order.
        return list(dict.fromkeys(concepts))[:top_k]

    def check_concept_coverage(self, student_answer, key_concepts):
        """
        Check which key concepts from context appear in student answer.

        Returns (coverage_fraction, found_concepts, missing_concepts).
        NOTE(review): matching is substring-based, so short concepts may match
        inside unrelated words; acceptable for this heuristic grader.
        """
        student_lower = student_answer.lower()
        found_concepts = []
        missing_concepts = []

        for concept in key_concepts:
            if concept in student_lower:
                found_concepts.append(concept)
            else:
                # Crude morphological fallback: accept the concept if every word
                # appears with a common inflection suffix (-s/-es/-ed/-ing).
                concept_words = concept.split()
                if all(any(word in student_lower for word in [cw, cw + 's', cw + 'es', cw + 'ed', cw + 'ing']) for cw in concept_words):
                    found_concepts.append(concept)
                else:
                    missing_concepts.append(concept)

        coverage = len(found_concepts) / len(key_concepts) if key_concepts else 0
        return coverage, found_concepts, missing_concepts

    def detect_contradictions(self, context, student_answer):
        """
        Simple contradiction detection using negation patterns.

        For each context sentence containing a negation word, if the student's
        answer echoes the positive form of that sentence, flag a contradiction.
        Returns a list of human-readable contradiction descriptions.
        """
        answer_lower = student_answer.lower()
        contradictions = []

        # Sentences shorter than ~10 chars are fragments; skip them.
        context_sentences = [s.strip() for s in context.split('.') if len(s.strip()) > 10]

        for sent in context_sentences:
            sent_lower = sent.lower()
            for neg in self._NEGATIONS:
                # BUG FIX: match whole words only. The original substring test
                # fired on e.g. 'no' inside 'know' or 'not' inside 'nothing',
                # and str.replace then mangled those unrelated words.
                pattern = r'\b' + re.escape(neg) + r'\b'
                if re.search(pattern, sent_lower):
                    positive_version = re.sub(pattern, '', sent_lower).strip()
                    if any(word in answer_lower for word in positive_version.split()[:5]):
                        contradictions.append(f"Context says: '{sent}' but student contradicts this")
                        # BUG FIX: flag each sentence at most once; the original
                        # appended once per negation word, multiplying the penalty.
                        break
        return contradictions

    def calculate_semantic_similarity(self, context, student_answer, embeddings_model):
        """
        Cosine similarity between the context and answer embedding vectors.

        *embeddings_model* must expose embed_query(text) -> list[float].
        """
        context_emb = embeddings_model.embed_query(context)
        answer_emb = embeddings_model.embed_query(student_answer)
        similarity = np.dot(context_emb, answer_emb) / (np.linalg.norm(context_emb) * np.linalg.norm(answer_emb))
        return float(similarity)

    def grade(self, context, question, student_answer, max_marks, embeddings_model):
        """
        Main grading function combining multiple signals.

        Blends concept coverage (60%) with semantic similarity (40%), then
        subtracts half of *max_marks* per detected contradiction.
        Returns (final_score, markdown_feedback).
        """
        key_concepts = self.extract_key_concepts(context)
        coverage, found, missing = self.check_concept_coverage(student_answer, key_concepts)
        contradictions = self.detect_contradictions(context, student_answer)
        semantic_sim = self.calculate_semantic_similarity(context, student_answer, embeddings_model)

        # Weighted blend: coverage dominates, similarity refines.
        base_score = (coverage * 0.6 + semantic_sim * 0.4) * max_marks

        # Each detected contradiction costs half of the maximum marks.
        contradiction_penalty = len(contradictions) * (max_marks * 0.5)
        final_score = max(0, base_score - contradiction_penalty)

        feedback = f"""
**Grading Analysis:**

**Key Concepts Found ({len(found)}/{len(key_concepts)}):** {', '.join(found) if found else 'None'}
**Key Concepts Missing:** {', '.join(missing) if missing else 'None'}

**Concept Coverage:** {coverage:.1%}
**Semantic Similarity:** {semantic_sim:.1%}

**Contradictions Detected:** {len(contradictions)}
{chr(10).join(['- ' + c for c in contradictions]) if contradictions else 'None'}

**Calculation:** ({coverage:.1%} Γ 0.6 + {semantic_sim:.1%} Γ 0.4) Γ {max_marks} - {contradiction_penalty:.1f} penalty = **{final_score:.1f}/{max_marks}**
"""

        return final_score, feedback
|
|
| |
| |
| |
class LLMEvaluator:
    """Second-opinion grader backed by a small ONNX Qwen model (CPU)."""

    def __init__(self):
        self.repo_id = "onnx-community/Qwen2.5-0.5B-Instruct"
        self.local_dir = "onnx_qwen_local"

        # Download only the files needed to run the fp16 ONNX export.
        if not os.path.exists(self.local_dir):
            snapshot_download(
                repo_id=self.repo_id,
                local_dir=self.local_dir,
                allow_patterns=["config.json", "generation_config.json", "tokenizer*", "special_tokens_map.json", "*.jinja", "onnx/model_fp16.onnx*"]
            )

        self.tokenizer = AutoTokenizer.from_pretrained(self.local_dir)

        # Graph optimization disabled: presumably to reduce session start-up
        # time/memory on CPU — TODO confirm this trade-off is intended.
        sess_options = SessionOptions()
        sess_options.graph_optimization_level = GraphOptimizationLevel.ORT_DISABLE_ALL

        self.model = ORTModelForCausalLM.from_pretrained(
            self.local_dir,
            subfolder="onnx",
            file_name="model_fp16.onnx",
            use_cache=True,
            use_io_binding=False,
            provider=PROVIDERS[0],
            session_options=sess_options
        )

    def evaluate(self, context, question, student_answer, max_marks, rule_based_score):
        """
        Use LLM only for ambiguous cases or to verify edge cases.
        Simplified prompt for 0.5B model.

        Returns the model's (or a canned) "Score: X/N\\nFeedback: ..." string.
        """
        # Short-circuit unambiguous cases without invoking the model.
        # BUG FIX: these two returns were plain strings missing the f prefix,
        # so the literal text "{max_marks}" was shown instead of the number.
        if rule_based_score == 0:
            return f"Score: 0/{max_marks}\nFeedback: Answer contains significant errors or contradictions with the reference text."
        if rule_based_score == max_marks:
            return f"Score: {max_marks}/{max_marks}\nFeedback: Excellent answer that fully covers the reference material."

        # Context is truncated to keep the 0.5B model's prompt short.
        prompt = f"""Grade this answer based ONLY on the context provided.

Context: {context[:500]}
Question: {question}
Student Answer: {student_answer}

Rules:
1. Give 0 if answer contradicts context or adds outside information
2. Give full marks only if answer matches context exactly
3. Give partial marks for partial matches

Output exactly:
Score: X/{max_marks}
Feedback: One sentence explanation"""

        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512)

        with torch.no_grad():
            # Greedy decoding for reproducible grades. (The original also passed
            # temperature=0.1, which is ignored when do_sample=False and only
            # triggers a warning — removed.)
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=50,
                do_sample=False,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens: token-level slicing is more
        # robust than re-decoding the prompt and slicing the decoded string.
        prompt_len = inputs['input_ids'].shape[1]
        response = self.tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
        return response.strip()
|
|
| |
| |
| |
class VectorSystem:
    """Application facade: embeddings, FAISS retrieval and the graders behind the UI."""

    def __init__(self):
        self.vector_store = None          # FAISS index; built by process_content()
        self.embeddings = OnnxBgeEmbeddings()
        self.rule_grader = RuleBasedGrader()
        self.llm = LLMEvaluator()
        self.all_chunks = []              # raw chunk texts, parallel to index ids
        self.total_chunks = 0

    def process_content(self, file_obj, raw_text):
        """
        Extract text from an uploaded PDF/TXT OR pasted text, chunk it and
        build the FAISS index. Returns a human-readable status string.
        """
        has_file = file_obj is not None
        has_text = raw_text is not None and len(raw_text.strip()) > 0

        if has_file and has_text:
            return "β Error: Provide EITHER file OR text, not both."
        if not has_file and not has_text:
            return "β οΈ No content provided."

        try:
            text = ""
            if has_file:
                if file_obj.name.endswith('.pdf'):
                    # BUG FIX: close the PyMuPDF document; the original leaked
                    # the file handle.
                    doc = fitz.open(file_obj.name)
                    try:
                        for page in doc:
                            text += page.get_text()
                    finally:
                        doc.close()
                elif file_obj.name.endswith('.txt'):
                    with open(file_obj.name, 'r', encoding='utf-8') as f:
                        text = f.read()
                else:
                    return "β Only .pdf and .txt supported."
            else:
                text = raw_text

            # Overlapping chunks so a concept split across a boundary still
            # appears whole in at least one chunk.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200,
                separators=["\n\n", "\n", ". ", " ", ""]
            )
            self.all_chunks = text_splitter.split_text(text)
            self.total_chunks = len(self.all_chunks)

            if not self.all_chunks:
                return "Content empty."

            # Store the chunk index as metadata so retrieval results can be
            # mapped back to self.all_chunks.
            metadatas = [{"id": i} for i in range(self.total_chunks)]
            self.vector_store = FAISS.from_texts(
                self.all_chunks,
                self.embeddings,
                metadatas=metadatas
            )

            # BUG FIX: the original literal was corrupted by a mis-encoded emoji
            # split across two lines (an unterminated string); reconstructed.
            return f"✅ Indexed {self.total_chunks} chunks."
        except Exception as e:
            # UI boundary: surface any failure as a status message.
            return f"Error: {str(e)}"

    def process_query(self, question, student_answer, max_marks):
        """
        Retrieve the chunks most relevant to *question* and grade
        *student_answer* against them. Returns (evidence_md, grade_md).
        """
        if not self.vector_store:
            return "β οΈ Upload content first.", ""
        if not question:
            return "β οΈ Enter a question.", ""
        if not student_answer:
            return "β οΈ Enter a student answer.", ""

        # Top-2 chunks by embedding similarity to the question.
        results = self.vector_store.similarity_search_with_score(question, k=2)

        # Map retrieved docs back to the stored chunk texts via their id.
        context_parts = [self.all_chunks[doc.metadata['id']] for doc, _score in results]
        expanded_context = "\n".join(context_parts)

        score, feedback = self.rule_grader.grade(
            expanded_context,
            question,
            student_answer,
            max_marks,
            self.embeddings
        )

        evidence_display = f"### π Context Used:\n{expanded_context[:800]}..."
        grade_display = f"### π Grade: {score:.1f}/{max_marks}\n\n{feedback}"

        return evidence_display, grade_display
|
|
| |
# Single shared application state. NOTE: constructing VectorSystem loads the
# embedding model and downloads/loads the ONNX LLM, so importing this module
# is slow by design.
system = VectorSystem()


# --- Gradio UI wiring ---
with gr.Blocks(title="EduGenius AI Grader") as demo:
    gr.Markdown("# β‘ EduGenius: CPU Optimized RAG")
    gr.Markdown("Hybrid Rule-Based + LLM Grading (ONNX Optimized)")

    with gr.Row():
        with gr.Column(scale=1):
            # Left column: content ingestion (file upload OR pasted text).
            gr.Markdown("### Source Input")
            pdf_input = gr.File(label="Upload Chapter (PDF/TXT)")
            gr.Markdown("**OR**")
            text_input = gr.Textbox(
                label="Paste Context",
                placeholder="Paste text here...",
                lines=5
            )
            upload_btn = gr.Button("Index Content", variant="primary")
            status_msg = gr.Textbox(label="Status", interactive=False)

        with gr.Column(scale=2):
            # Right column: the question, mark ceiling and student answer.
            q_input = gr.Textbox(label="Question", scale=2)
            max_marks = gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max Marks")
            a_input = gr.TextArea(label="Student Answer", lines=5)
            run_btn = gr.Button("Retrieve & Grade", variant="secondary")

    with gr.Row():
        # Output panes: retrieved evidence (left), grade + feedback (right).
        evidence_box = gr.Markdown()
        grade_box = gr.Markdown()

    # Event handlers: index the uploaded/pasted content, then grade on demand.
    upload_btn.click(
        system.process_content,
        inputs=[pdf_input, text_input],
        outputs=[status_msg]
    )
    run_btn.click(
        system.process_query,
        inputs=[q_input, a_input, max_marks],
        outputs=[evidence_box, grade_box]
    )


if __name__ == "__main__":
    demo.launch()