| import os |
| import streamlit as st |
| import pdfplumber |
| import pickle |
| import faiss |
| import numpy as np |
| import re |
| from langchain_community.vectorstores import FAISS |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings |
|
|
| |
# Google Generative AI API key. Read from the environment instead of being
# hardcoded in source (a hardcoded secret — even an empty placeholder — invites
# committing real keys); falls back to the original default of "".
API_KEY = os.environ.get("GOOGLE_API_KEY", "")

# On-disk names under which the precomputed FAISS indexes are saved.
INDEX_COMPLETE = "index_complete"
INDEX_INDIVIDUAL = "index_individual"
|
|
| |
# --- Streamlit page setup ---
# Page title (the leading "π" is a mis-encoded emoji from the original file;
# it is part of the displayed string, so it is left untouched here).
st.title("π Automated Answer Evaluation System")

# Sidebar controls: which evaluation mode to run, and a button that (re)builds
# the FAISS indexes from the template PDFs in the working directory.
feature = st.sidebar.radio("Select Feature", ["Complete Template Answer Sheet", "Individual Question Answer PDFs"])
precompute_button = st.sidebar.button("π Precompute Indexes")
|
|
| |
def extract_text_from_pdf(pdf_file):
    """Extracts text from a PDF file."""
    page_texts = []
    try:
        with pdfplumber.open(pdf_file) as pdf_reader:
            for page in pdf_reader.pages:
                extracted = page.extract_text()
                if extracted:
                    page_texts.append(extracted)
                else:
                    # A page with no extractable text is usually a scanned image.
                    st.warning(f"No text extracted from page in {pdf_file}. It might be an image-based PDF.")
    except Exception as e:
        st.error(f"Error extracting text from {pdf_file}: {e}")
    # Join with no separator, exactly as the original += concatenation did.
    return "".join(page_texts).strip()
|
|
| |
def generate_index_complete(api_key):
    """Builds and saves the FAISS index for the complete template answer sheet.

    Scans the working directory for PDFs named ``complete_template*.pdf``,
    extracts and concatenates their text, splits it into overlapping chunks,
    embeds the chunks with Google Generative AI embeddings, and persists both
    the FAISS vector store (``INDEX_COMPLETE``) and the raw chunk list
    (``INDEX_COMPLETE.pkl``) for later reloading.

    Args:
        api_key: Google API key used by the embedding model.
    """
    st.info("π Extracting text from complete template answer sheet...")
    found_files = [
        file for file in os.listdir(".")
        if file.startswith("complete_template") and file.endswith(".pdf")
    ]

    if not found_files:
        st.error("β No PDF files starting with 'complete_template' found in the root directory.")
        return

    st.info(f"Found files: {found_files}")

    # Join per-file texts with a newline so words at file boundaries don't
    # fuse together (the original concatenated with no separator). filter(None)
    # keeps an all-empty extraction result falsy ("").
    template_text = "\n".join(
        filter(None, (extract_text_from_pdf(file) for file in found_files))
    )

    if not template_text:
        st.error("β No valid text extracted from the complete template PDF(s). Ensure the PDF contains selectable text, not just images.")
        return

    st.info("π Splitting text into smaller chunks...")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    text_chunks = text_splitter.split_text(template_text)

    st.info("π Generating embeddings using Google AI...")
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)

    st.info("π Creating FAISS vector store...")
    vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)

    st.info("πΎ Saving FAISS index...")
    vector_store.save_local(INDEX_COMPLETE)

    # Persist the raw chunks alongside the index so they can be reloaded
    # without re-parsing the source PDFs.
    with open(f"{INDEX_COMPLETE}.pkl", "wb") as f:
        pickle.dump(text_chunks, f)

    # Original success string was broken across two source lines (mis-encoded
    # emoji + embedded line break inside the literal); repaired into one literal.
    st.success("β FAISS index and metadata saved successfully for complete template!")
|
|
def generate_index_individual(api_key):
    """Builds and saves a FAISS index over individual question-answer PDFs.

    Treats every ``*.pdf`` in the working directory that does NOT start with
    ``complete_template`` as the template answer for one question, keyed by
    its upper-cased filename stem. Each answer is embedded once; the vectors
    go into a flat L2 FAISS index (``INDEX_INDIVIDUAL.faiss``) and the
    matching question ids, in the same order, into ``INDEX_INDIVIDUAL.pkl``.

    Args:
        api_key: Google API key used by the embedding model.
    """
    st.info("π Extracting template answers...")
    template_answers = {}
    for file in os.listdir("."):
        if file.endswith(".pdf") and not file.startswith("complete_template"):
            question_number = file.replace(".pdf", "").upper()
            extracted_text = extract_text_from_pdf(file)
            if extracted_text:
                template_answers[question_number] = extracted_text

    if not template_answers:
        st.error("β No valid individual question answer PDFs found or no text extracted.")
        return

    st.info("π Generating embeddings...")
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)

    texts = list(template_answers.values())
    question_numbers = list(template_answers.keys())

    # One embedding row per template answer, in the same order as the ids.
    text_embeddings = np.array([embeddings.embed_query(text) for text in texts]).astype('float32')

    st.info("π Creating FAISS index...")
    dimension = text_embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(text_embeddings)

    st.info("πΎ Saving FAISS index...")
    faiss.write_index(index, f"{INDEX_INDIVIDUAL}.faiss")

    # Persist the id order so search hits can be mapped back to questions.
    with open(f"{INDEX_INDIVIDUAL}.pkl", "wb") as f:
        pickle.dump(question_numbers, f)

    # Original success string was broken across two source lines (mis-encoded
    # emoji + embedded line break inside the literal); repaired into one literal.
    st.success("β Indexing complete for individual question answers!")
|
|
| |
def load_faiss_index(feature, api_key):
    """Loads FAISS index and metadata based on the selected feature."""
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)

    if feature == "Complete Template Answer Sheet":
        # LangChain-managed store plus the pickled raw text chunks.
        try:
            store = FAISS.load_local(INDEX_COMPLETE, embeddings, allow_dangerous_deserialization=True)
            with open(f"{INDEX_COMPLETE}.pkl", "rb") as fh:
                chunks = pickle.load(fh)
        except Exception as e:
            st.warning(f"FAISS index not found for complete template: {e}. Please precompute.")
            return None, None, None
        return store, chunks, embeddings

    # Individual-question feature: raw faiss index plus the pickled id list.
    try:
        raw_index = faiss.read_index(f"{INDEX_INDIVIDUAL}.faiss")
        with open(f"{INDEX_INDIVIDUAL}.pkl", "rb") as fh:
            question_ids = pickle.load(fh)
    except Exception as e:
        st.warning(f"FAISS index not found for individual answers: {e}. Please precompute.")
        return None, None, None
    return raw_index, question_ids, embeddings
|
|
| |
def extract_student_answers(pdf_file):
    """Extracts question-wise answers from student PDF."""
    full_text = extract_text_from_pdf(pdf_file)
    # Each answer begins with a "###<digits><LETTER>" header and runs until
    # the next "###" header or the end of the text.
    header_and_body = re.compile(r"(###\d+[A-Z])\s*(.+?)(?=###|\Z)", re.DOTALL)
    return {
        question.upper(): body.strip()
        for question, body in header_and_body.findall(full_text)
    }
|
|
| |
def extract_attempted_questions(student_answer):
    """Extracts and counts the number of questions attempted by the student."""
    # Question headers look like "###12A": hashes, digits, one uppercase letter.
    return sum(1 for _ in re.finditer(r"###\d+[A-Z]", student_answer))
|
|
| |
def compute_similarity_complete(student_answer, vector_store):
    """Finds the most similar template answer and computes relevance score."""
    if not student_answer:
        return "No answer provided.", 0.0

    # Single nearest chunk from the vector store, with its distance score.
    hits = vector_store.similarity_search_with_score(student_answer, k=1)
    if not hits:
        return "No relevant match found.", 0.0

    best_doc, distance = hits[0]
    # Map the distance into (0, 100]: distance 0 -> 100%, larger -> smaller.
    percent = round((1 / (1 + distance)) * 100, 2)
    return best_doc.page_content, percent
|
|
| |
def evaluate_answers_complete(student_answer, vector_store, max_marks=5):
    """Evaluates the student's answer and calculates marks."""
    if not student_answer:
        return {"error": "No answer provided."}, 0.0

    _, similarity_score = compute_similarity_complete(student_answer, vector_store)
    raw_marks = (similarity_score * max_marks) / 100

    # Snap to the nearest half mark: <.25 rounds down, [.25,.75) to .5, >=.75 up.
    base = int(raw_marks)
    fraction = raw_marks - base
    if fraction >= 0.75:
        marks_obtained = base + 1
    elif fraction >= 0.25:
        marks_obtained = base + 0.5
    else:
        marks_obtained = base

    return {
        "similarity": f"{similarity_score}%",
        "marks_obtained": marks_obtained,
        "max_marks": max_marks
    }, marks_obtained
|
|
| |
def compute_similarity_individual(student_answer, index, question_numbers, embeddings):
    """Finds most similar template answer and calculates similarity.

    Args:
        student_answer: Raw text of the student's answer ("" means unanswered).
        index: FAISS index over the template-answer embeddings.
        question_numbers: Question ids, ordered to match the index rows.
        embeddings: Embedding model exposing ``embed_query``.

    Returns:
        Tuple of (matched question id, similarity percentage in (0, 100]).
        For an empty answer, returns a message string and 0.0 instead.
    """
    if not student_answer:
        return "No answer provided.", 0.0

    query = np.array(embeddings.embed_query(student_answer)).astype('float32').reshape(1, -1)
    # faiss .search returns (distances, indices). The original bound the
    # distances to the conventionally-discarded name `_` and then read it back
    # via `_[0][0]` — use explicit names instead.
    distances, indices = index.search(query, 1)
    matched_question = question_numbers[indices[0][0]]
    similarity = (1 / (1 + distances[0][0])) * 100
    return matched_question, similarity
|
|
| |
def evaluate_answers(student_answers, index, question_numbers, embeddings, max_marks=5):
    """Score each student answer against its closest template answer."""
    results = {}
    for question, answer_text in student_answers.items():
        _, similarity = compute_similarity_individual(answer_text, index, question_numbers, embeddings)
        raw_marks = (similarity * max_marks) / 100

        # Snap to the nearest half mark: <.25 down, [.25,.75) to .5, >=.75 up.
        base = int(raw_marks)
        fraction = raw_marks - base
        if fraction >= 0.75:
            awarded = base + 1
        elif fraction >= 0.25:
            awarded = base + 0.5
        else:
            awarded = base

        results[question] = {
            "similarity": f"{round(similarity, 2)}%",
            "marks_obtained": awarded,
            "max_marks": max_marks
        }
    return results
|
|
| |
# --- Script entry flow (Streamlit re-runs this top-to-bottom on each event) ---

# Rebuild both indexes on demand from the PDFs in the working directory.
if precompute_button:
    with st.spinner("Precomputing indexes..."):
        generate_index_complete(API_KEY)
        generate_index_individual(API_KEY)

# Load whichever index the selected feature needs. For the complete-template
# feature the tuple is (vector_store, text_chunks, embeddings); for the
# individual feature it is (faiss index, question_numbers, embeddings).
index, metadata, embeddings = load_faiss_index(feature, API_KEY)

uploaded_file = st.file_uploader("π Upload Student Answer Sheet (PDF)", type="pdf")

if uploaded_file:
    if index is None:
        st.error("Indexes not precomputed. Please click 'Precompute Indexes' first.")
    else:
        if feature == "Complete Template Answer Sheet":
            with st.spinner("Extracting text from student answer..."):
                student_answer = extract_text_from_pdf(uploaded_file)

            if student_answer:
                st.text_area("π Extracted Student Answer:", student_answer, height=150)
                num_attempted = extract_attempted_questions(student_answer)
                st.write(f"**Number of questions attempted:** {num_attempted}")

                if st.button("π Check Similarity"):
                    with st.spinner("Comparing with template answer..."):
                        # `index` here is the LangChain FAISS vector store.
                        result, marks_obtained = evaluate_answers_complete(student_answer, index)
                        st.subheader("π Similarity Score:")
                        st.write(f"**{result['similarity']} relevant to the template answer.**")
                        st.subheader("π Student's Score:")
                        # NOTE(review): the whole sheet is scored as ONE answer and the
                        # resulting marks are multiplied by the attempted-question count —
                        # confirm this scaling is the intended grading rule.
                        st.write(f"**{marks_obtained*num_attempted} Marks** (Out of {num_attempted * 5} Marks)")
        else:
            with st.spinner("Extracting text from student answer sheet..."):
                student_answers = extract_student_answers(uploaded_file)

            if student_answers:
                st.text_area("π Extracted Student Answers:", "\n".join(f"{q}: {a}" for q, a in student_answers.items()), height=150)

                if st.button("π Evaluate Answers"):
                    with st.spinner("Comparing answers with templates..."):
                        # `metadata` is the ordered question-id list for this feature.
                        results = evaluate_answers(student_answers, index, metadata, embeddings)
                        st.subheader("π Score Breakdown:")
                        st.json(results)
                        total_marks = sum(v["marks_obtained"] for v in results.values())
                        total_questions = len(results)
                        marks_per_question = 5
                        total_marks_obtainable = total_questions * marks_per_question
                        st.subheader(f"π Total Score: {total_marks} Marks (Out of {total_marks_obtainable} Marks)")