# app.py — EduGenius AI grader (Hugging Face Space "example", revision 95abb5a)
import gradio as gr
import fitz
import torch
import os
import re
import numpy as np
from collections import Counter
import onnxruntime as ort
from onnxruntime import SessionOptions, GraphOptimizationLevel
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.embeddings import Embeddings
from transformers import AutoTokenizer
from optimum.onnxruntime import ORTModelForFeatureExtraction, ORTModelForCausalLM
from huggingface_hub import snapshot_download
from sentence_transformers import SentenceTransformer # Add this for cross-encoder
# Execution providers shared by every ONNX Runtime session below;
# this deployment targets CPU-only inference.
PROVIDERS = ["CPUExecutionProvider"]
# ---------------------------------------------------------
# 1. EMBEDDINGS (Your existing code - good)
# ---------------------------------------------------------
class OnnxBgeEmbeddings(Embeddings):
    """LangChain-compatible embeddings backed by an ONNX export of BGE-small.

    Texts are encoded with the CLS-token hidden state of
    ``Xenova/bge-small-en-v1.5`` and L2-normalised, so dot products between
    returned vectors equal cosine similarities.
    """

    def __init__(self):
        model_name = "Xenova/bge-small-en-v1.5"
        print(f"πŸ”„ Loading Embeddings: {model_name}...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = ORTModelForFeatureExtraction.from_pretrained(
            model_name, export=False, provider=PROVIDERS[0]
        )

    def _process_batch(self, texts):
        """Embed a batch of strings; returns a list of plain-float vectors."""
        encoded = self.tokenizer(
            texts, padding=True, truncation=True, max_length=512, return_tensors="pt"
        )
        with torch.no_grad():
            model_out = self.model(**encoded)
        # CLS pooling: the first token's hidden state represents the sequence.
        cls_vectors = model_out.last_hidden_state[:, 0]
        unit_vectors = torch.nn.functional.normalize(cls_vectors, p=2, dim=1)
        return unit_vectors.numpy().tolist()

    def embed_documents(self, texts):
        """Embed a list of documents (LangChain Embeddings interface)."""
        return self._process_batch(texts)

    def embed_query(self, text):
        """Embed a single query string (LangChain Embeddings interface)."""
        return self._process_batch([text])[0]
# ---------------------------------------------------------
# 2. RULE-BASED GRADING ENGINE (NEW - No LLM needed)
# ---------------------------------------------------------
class RuleBasedGrader:
    """
    Extracts key concepts from context and checks student answer coverage.
    Works 100% on CPU, deterministic, explainable.
    """

    # Function words ignored during concept extraction.
    _STOPWORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might', 'must', 'shall', 'can', 'need', 'dare', 'ought', 'used', 'it', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'we', 'they'}

    # Tokens that negate a sentence; matched as whole words, never substrings.
    _NEGATIONS = {'not', 'no', 'never', 'none', 'nothing', 'nobody', 'neither', 'nowhere', 'hardly', 'scarcely', 'barely', "doesn't", "isn't", "wasn't", "shouldn't", "wouldn't", "couldn't", "can't", "don't", "didn't", "hasn't", "haven't", "hadn't", "won't"}

    def __init__(self):
        # Stateless: every method is a pure function of its arguments.
        pass

    def extract_key_concepts(self, text, top_k=10):
        """
        Extract repeated content words and adjacent two-word phrases.

        Fixes vs. the previous version:
        - bigrams are built from the ORIGINAL token order (filtering
          stopwords first glued together words that were never adjacent);
        - de-duplication preserves insertion order, so output is stable
          across runs (``set()`` order depends on the hash seed).
        """
        cleaned = re.sub(r'[^\w\s]', ' ', text.lower())
        tokens = cleaned.split()

        def is_content(word):
            return word not in self._STOPWORDS and len(word) > 2

        content_words = [w for w in tokens if is_content(w)]
        word_freq = Counter(content_words)
        # Bigrams from genuinely adjacent tokens, both of which are content words.
        bigrams = [
            f"{tokens[i]} {tokens[i + 1]}"
            for i in range(len(tokens) - 1)
            if is_content(tokens[i]) and is_content(tokens[i + 1])
        ]
        bigram_freq = Counter(bigrams)
        concepts = []
        for word, count in word_freq.most_common(top_k):
            if count > 1:  # only recurring terms count as "key" concepts
                concepts.append(word)
        for bigram, count in bigram_freq.most_common(top_k // 2):
            if count > 1:
                concepts.append(bigram)
        # dict.fromkeys = order-preserving dedupe (deterministic output).
        return list(dict.fromkeys(concepts))[:top_k]

    def check_concept_coverage(self, student_answer, key_concepts):
        """
        Check which key concepts appear in the student answer, either as
        substrings or as simple inflected variants (s/es/ed/ing).

        Returns:
            (coverage_ratio, found_concepts, missing_concepts)
        """
        student_lower = student_answer.lower()
        found_concepts = []
        missing_concepts = []
        for concept in key_concepts:
            if concept in student_lower:
                found_concepts.append(concept)
            else:
                # Crude stemming: accept common suffixed forms of every word.
                concept_words = concept.split()
                if all(any(word in student_lower for word in [cw, cw + 's', cw + 'es', cw + 'ed', cw + 'ing']) for cw in concept_words):
                    found_concepts.append(concept)
                else:
                    missing_concepts.append(concept)
        coverage = len(found_concepts) / len(key_concepts) if key_concepts else 0
        return coverage, found_concepts, missing_concepts

    def detect_contradictions(self, context, student_answer):
        """
        Flag context sentences that contain a negation while the student
        answer appears to assert the positive form.

        Fix: negations are now matched as whole words — the old substring
        test fired on e.g. 'no' inside 'know'. Still a heuristic: a positive
        context contradicted by a negated answer is not detected.
        """
        answer_lower = student_answer.lower()
        contradictions = []
        context_sentences = [s.strip() for s in context.split('.') if len(s.strip()) > 10]
        for sent in context_sentences:
            # Keep apostrophes so contractions like "doesn't" stay intact.
            sent_words = re.findall(r"[\w']+", sent.lower())
            if not any(w in self._NEGATIONS for w in sent_words):
                continue
            # Context is negated: does the answer echo the positive version?
            positive_words = [w for w in sent_words if w not in self._NEGATIONS]
            if any(word in answer_lower for word in positive_words[:5]):
                contradictions.append(f"Context says: '{sent}' but student contradicts this")
        return contradictions

    def calculate_semantic_similarity(self, context, student_answer, embeddings_model):
        """Cosine similarity between the context and answer embeddings."""
        context_emb = embeddings_model.embed_query(context)
        answer_emb = embeddings_model.embed_query(student_answer)
        similarity = np.dot(context_emb, answer_emb) / (np.linalg.norm(context_emb) * np.linalg.norm(answer_emb))
        return float(similarity)

    def grade(self, context, question, student_answer, max_marks, embeddings_model):
        """
        Combine concept coverage (60%), semantic similarity (40%) and a
        contradiction penalty (50% of max_marks per contradiction) into a
        final score.

        Returns:
            (final_score, markdown_feedback)
        """
        key_concepts = self.extract_key_concepts(context)
        coverage, found, missing = self.check_concept_coverage(student_answer, key_concepts)
        contradictions = self.detect_contradictions(context, student_answer)
        semantic_sim = self.calculate_semantic_similarity(context, student_answer, embeddings_model)
        base_score = (coverage * 0.6 + semantic_sim * 0.4) * max_marks
        contradiction_penalty = len(contradictions) * (max_marks * 0.5)
        final_score = max(0, base_score - contradiction_penalty)
        feedback = f"""
**Grading Analysis:**
**Key Concepts Found ({len(found)}/{len(key_concepts)}):** {', '.join(found) if found else 'None'}
**Key Concepts Missing:** {', '.join(missing) if missing else 'None'}
**Concept Coverage:** {coverage:.1%}
**Semantic Similarity:** {semantic_sim:.1%}
**Contradictions Detected:** {len(contradictions)}
{chr(10).join(['- ' + c for c in contradictions]) if contradictions else 'None'}
**Calculation:** ({coverage:.1%} Γ— 0.6 + {semantic_sim:.1%} Γ— 0.4) Γ— {max_marks} - {contradiction_penalty:.1f} penalty = **{final_score:.1f}/{max_marks}**
"""
        return final_score, feedback
# ---------------------------------------------------------
# 3. LLM EVALUATOR (Fallback for edge cases)
# ---------------------------------------------------------
class LLMEvaluator:
    """
    ONNX-quantised Qwen2.5-0.5B fallback grader for nuanced cases.

    Clear rule-based results (0 or full marks) are answered from fixed
    templates without touching the model; the LLM is only used in between.
    """

    def __init__(self):
        self.repo_id = "onnx-community/Qwen2.5-0.5B-Instruct"
        self.local_dir = "onnx_qwen_local"
        # Download only the files needed to run the fp16 ONNX decoder.
        if not os.path.exists(self.local_dir):
            snapshot_download(
                repo_id=self.repo_id,
                local_dir=self.local_dir,
                allow_patterns=["config.json", "generation_config.json", "tokenizer*", "special_tokens_map.json", "*.jinja", "onnx/model_fp16.onnx*"]
            )
        self.tokenizer = AutoTokenizer.from_pretrained(self.local_dir)
        sess_options = SessionOptions()
        # Graph optimisation disabled — presumably to keep CPU session
        # start-up fast; TODO confirm this trade-off is intentional.
        sess_options.graph_optimization_level = GraphOptimizationLevel.ORT_DISABLE_ALL
        self.model = ORTModelForCausalLM.from_pretrained(
            self.local_dir,
            subfolder="onnx",
            file_name="model_fp16.onnx",
            use_cache=True,
            use_io_binding=False,
            provider=PROVIDERS[0],
            session_options=sess_options
        )

    def evaluate(self, context, question, student_answer, max_marks, rule_based_score):
        """
        Grade with the LLM, short-circuiting unambiguous rule-based results.

        Bug fix: the 0-score / full-score templates were plain strings, so
        the literal text "{max_marks}" leaked into the UI; they are f-strings
        now. Also dropped the inert ``temperature`` argument — it is ignored
        when ``do_sample=False`` (greedy decoding).
        """
        if rule_based_score == 0:
            return f"Score: 0/{max_marks}\nFeedback: Answer contains significant errors or contradictions with the reference text."
        if rule_based_score == max_marks:
            return f"Score: {max_marks}/{max_marks}\nFeedback: Excellent answer that fully covers the reference material."
        # Nuanced case: ask the model, with a prompt kept short for a 0.5B model.
        prompt = f"""Grade this answer based ONLY on the context provided.
Context: {context[:500]}
Question: {question}
Student Answer: {student_answer}
Rules:
1. Give 0 if answer contradicts context or adds outside information
2. Give full marks only if answer matches context exactly
3. Give partial marks for partial matches
Output exactly:
Score: X/{max_marks}
Feedback: One sentence explanation"""
        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=50,
                do_sample=False,  # deterministic greedy decoding
                pad_token_id=self.tokenizer.eos_token_id
            )
        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        # Strip the echoed prompt, keeping only the newly generated tokens.
        response = response[len(self.tokenizer.decode(inputs['input_ids'][0], skip_special_tokens=True)):]
        return response.strip()
# ---------------------------------------------------------
# 4. MAIN APPLICATION
# ---------------------------------------------------------
class VectorSystem:
    """
    End-to-end pipeline: index source material into a FAISS store, then
    retrieve context for a question and grade a student answer against it.
    """

    def __init__(self):
        self.vector_store = None           # FAISS index, built by process_content
        self.embeddings = OnnxBgeEmbeddings()
        self.rule_grader = RuleBasedGrader()
        self.llm = LLMEvaluator()
        self.all_chunks = []               # raw chunk texts, addressed by metadata id
        self.total_chunks = 0

    def process_content(self, file_obj, raw_text):
        """
        Index exactly one of: an uploaded PDF/TXT file, or pasted raw text.

        Returns a status string for the UI. Fixes: the PDF document is now
        always closed (previously leaked a file handle, including on a
        page-read error), and the extension check is case-insensitive so
        .PDF / .TXT uploads are accepted.
        """
        has_file = file_obj is not None
        has_text = raw_text is not None and len(raw_text.strip()) > 0
        if has_file and has_text:
            return "❌ Error: Provide EITHER file OR text, not both."
        if not has_file and not has_text:
            return "⚠️ No content provided."
        try:
            text = ""
            if has_file:
                lower_name = file_obj.name.lower()
                if lower_name.endswith('.pdf'):
                    doc = fitz.open(file_obj.name)
                    try:
                        for page in doc:
                            text += page.get_text()
                    finally:
                        doc.close()  # release the handle even if a page fails
                elif lower_name.endswith('.txt'):
                    with open(file_obj.name, 'r', encoding='utf-8') as f:
                        text = f.read()
                else:
                    return "❌ Only .pdf and .txt supported."
            else:
                text = raw_text
            # Larger chunks keep more surrounding context with each hit.
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200,
                separators=["\n\n", "\n", ". ", " ", ""]
            )
            self.all_chunks = text_splitter.split_text(text)
            self.total_chunks = len(self.all_chunks)
            if not self.all_chunks:
                return "Content empty."
            # Store each chunk's position so retrieval can map back to full text.
            metadatas = [{"id": i} for i in range(self.total_chunks)]
            self.vector_store = FAISS.from_texts(
                self.all_chunks,
                self.embeddings,
                metadatas=metadatas
            )
            return f"βœ… Indexed {self.total_chunks} chunks."
        except Exception as e:
            # Surface the failure to the UI instead of crashing the app.
            return f"Error: {str(e)}"

    def process_query(self, question, student_answer, max_marks):
        """
        Retrieve the top-2 chunks for *question* and grade *student_answer*
        against them.

        Returns:
            (evidence_markdown, grade_markdown)
        """
        if not self.vector_store:
            return "⚠️ Upload content first.", ""
        if not question:
            return "⚠️ Enter a question.", ""
        if not student_answer:
            return "⚠️ Enter a student answer.", ""
        # Combine the two best-matching chunks for broader context.
        results = self.vector_store.similarity_search_with_score(question, k=2)
        context_parts = [self.all_chunks[doc.metadata['id']] for doc, _score in results]
        expanded_context = "\n".join(context_parts)
        # Rule-based grading: fast and deterministic.
        score, feedback = self.rule_grader.grade(
            expanded_context,
            question,
            student_answer,
            max_marks,
            self.embeddings
        )
        # Optional LLM verification for mid-range scores (disabled by default):
        # if 0.2 < (score / max_marks) < 0.8:
        #     llm_feedback = self.llm.evaluate(expanded_context, question, student_answer, max_marks, score)
        #     feedback += f"\n\n**LLM Verification:**\n{llm_feedback}"
        evidence_display = f"### πŸ“š Context Used:\n{expanded_context[:800]}..."
        grade_display = f"### πŸ“ Grade: {score:.1f}/{max_marks}\n\n{feedback}"
        return evidence_display, grade_display
# Initialize and launch the Gradio UI.
# Left column: index a source document; right column: grade a student answer.
system = VectorSystem()  # loads embedding + LLM models once at startup

with gr.Blocks(title="EduGenius AI Grader") as demo:
    gr.Markdown("# ⚑ EduGenius: CPU Optimized RAG")
    gr.Markdown("Hybrid Rule-Based + LLM Grading (ONNX Optimized)")
    with gr.Row():
        with gr.Column(scale=1):
            # Source input: file upload OR pasted text (mutually exclusive,
            # enforced by VectorSystem.process_content).
            gr.Markdown("### Source Input")
            pdf_input = gr.File(label="Upload Chapter (PDF/TXT)")
            gr.Markdown("**OR**")
            text_input = gr.Textbox(
                label="Paste Context",
                placeholder="Paste text here...",
                lines=5
            )
            upload_btn = gr.Button("Index Content", variant="primary")
            status_msg = gr.Textbox(label="Status", interactive=False)
        with gr.Column(scale=2):
            # Grading inputs.
            q_input = gr.Textbox(label="Question", scale=2)
            max_marks = gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max Marks")
            a_input = gr.TextArea(label="Student Answer", lines=5)
            run_btn = gr.Button("Retrieve & Grade", variant="secondary")
    with gr.Row():
        evidence_box = gr.Markdown()  # retrieved context shown to the grader
        grade_box = gr.Markdown()     # score plus per-concept feedback
    # Wire the buttons to the pipeline.
    upload_btn.click(
        system.process_content,
        inputs=[pdf_input, text_input],
        outputs=[status_msg]
    )
    run_btn.click(
        system.process_query,
        inputs=[q_input, a_input, max_marks],
        outputs=[evidence_box, grade_box]
    )

if __name__ == "__main__":
    demo.launch()