# Hugging Face Space (status: Sleeping)
| from typing import Optional, Dict | |
| import streamlit as st | |
| import requests | |
| import json | |
| import fitz # PyMuPDF | |
| from fpdf import FPDF | |
| import os | |
| import tempfile | |
| from dotenv import load_dotenv | |
| import torch | |
| from transformers import DistilBertForSequenceClassification, DistilBertTokenizer | |
| from torch.nn.functional import softmax | |
| from doctr.models import ocr_predictor | |
| from doctr.io import DocumentFile | |
| import tempfile | |
# Load environment variables (e.g. GEMINI_API_KEY, read later by sendtogemini)
# from a local .env file.
load_dotenv()

# Fine-tuned DistilBERT classifier for Bloom's-taxonomy levels, loaded from a
# local checkpoint directory next to the app.
model = DistilBertForSequenceClassification.from_pretrained('./fine_tuned_distilbert')
tokenizer = DistilBertTokenizer.from_pretrained('./fine_tuned_distilbert')

# Run classifier inference on GPU when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Label <-> class-index mapping used to decode classifier outputs
# (reverse_mapping is consumed by predict_with_loaded_model).
mapping = {"Remembering": 0, "Understanding": 1, "Applying": 2, "Analyzing": 3, "Evaluating": 4, "Creating": 5}
reverse_mapping = {v: k for k, v in mapping.items()}

# docTR OCR pipeline (DB-ResNet50 detection + CRNN-VGG16 recognition),
# pretrained weights; used by process_document.
modelocr = ocr_predictor(det_arch='db_resnet50', reco_arch='crnn_vgg16_bn', pretrained=True)
def save_uploaded_file(uploaded_file):
    """Persist a Streamlit upload to a temp file; return its path, or None if no file."""
    if uploaded_file is None:
        return None
    # Keep the original extension so downstream type checks still work.
    extension = uploaded_file.name.split('.')[-1].lower()
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=f'.{extension}')
    tmp.write(uploaded_file.getvalue())
    tmp.close()
    return tmp.name
| # Previous functions from Question Generator | |
def get_pdf_path(pdf_source=None, uploaded_file=None):
    """
    Materialize a PDF on disk and return its path, or None on failure.

    Prefers a locally uploaded file; otherwise downloads from the given URL.
    Errors are reported via st.error rather than raised.
    """
    try:
        if uploaded_file is not None:
            # Local upload: copy its bytes into a fresh temp directory.
            target = os.path.join(tempfile.mkdtemp(), uploaded_file.name)
            with open(target, "wb") as fh:
                fh.write(uploaded_file.getvalue())
            return target
        if pdf_source:
            # Remote URL: download, then persist under a fixed name.
            resp = requests.get(pdf_source, timeout=30)
            resp.raise_for_status()
            target = os.path.join(tempfile.mkdtemp(), "downloaded.pdf")
            with open(target, "wb") as fh:
                fh.write(resp.content)
            return target
        st.error("No PDF source provided.")
        return None
    except Exception as e:
        st.error(f"Error getting PDF: {e}")
        return None
def extract_text_pymupdf(pdf_path):
    """Extract text from every page of a PDF and join pages with spaces ("" on error)."""
    try:
        document = fitz.open(pdf_path)
        # One string per page, concatenated into a single context string.
        page_texts = [document[index].get_text() for index in range(len(document))]
        document.close()
        return " ".join(page_texts)
    except Exception as e:
        st.error(f"Error extracting text from PDF: {e}")
        return ""
def get_bloom_taxonomy_scores(question: str) -> Dict[str, float]:
    """
    Score a question across Bloom's-taxonomy levels using the local classifier.

    Falls back to a fixed default distribution when prediction fails or any
    probability falls outside [0, 1].
    """
    # Default scores in case of model/validation failure.
    fallback = {
        "Remembering": 0.2,
        "Understanding": 0.2,
        "Applying": 0.15,
        "Analyzing": 0.15,
        "Evaluating": 0.15,
        "Creating": 0.15
    }
    try:
        predicted = predict_with_loaded_model(question)
        for level, probability in predicted.items():
            if not (0 <= probability <= 1):
                st.warning(f"Invalid score value for {level}. Using default scores.")
                return fallback
        return predicted
    except Exception as e:
        st.warning(f"Unexpected error: {e}. Using default scores.")
        return fallback
def generate_ai_response(api_key, assistant_context, user_query, role_description, response_instructions, bloom_taxonomy_weights, num_questions, question_length, include_numericals, user_input):
    """
    Generate exam questions via the Gemini API, then score each question with
    the local Bloom's-taxonomy classifier.

    Returns the list of generated question strings ([] on any error).
    Side effects: renders a st.progress bar and stores per-question score
    dicts in st.session_state.question_scores.
    """
    try:
        url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={api_key}"
        # Define length guidelines: maps the UI's length choice to wording
        # injected into the prompt (an unknown choice raises KeyError, which
        # the outer except turns into an error message + empty result).
        length_guidelines = {
            "Short": "Keep questions concise, around 10-15 words each.",
            "Medium": "Create moderately detailed questions, around 20-25 words each.",
            "Long": "Generate detailed, comprehensive questions, around 30-40 words each that may include multiple parts."
        }
        # NOTE(review): these weight keys (Knowledge/Comprehension/...) differ
        # from the classifier's labels (Remembering/Understanding/...) used
        # elsewhere in this file — callers must pass exactly this key set;
        # verify against the UI that builds bloom_taxonomy_weights.
        prompt = f"""
        You are a highly knowledgeable assistant. Your task is to assist the user with the following context from an academic paper.
        **Role**: {role_description}
        **Context**: {assistant_context}
        **User Query**: {user_input}
        **Instructions**: {response_instructions}
        Question Length Requirement: {length_guidelines[question_length]}
        **Bloom's Taxonomy Weights**:
        Knowledge: {bloom_taxonomy_weights['Knowledge']}%
        Comprehension: {bloom_taxonomy_weights['Comprehension']}%
        Application: {bloom_taxonomy_weights['Application']}%
        Analysis: {bloom_taxonomy_weights['Analysis']}%
        Synthesis: {bloom_taxonomy_weights['Synthesis']}%
        Evaluation: {bloom_taxonomy_weights['Evaluation']}%
        **Query**: {user_query}
        **Number of Questions**: {num_questions}
        **Include Numericals**: {include_numericals}
        """
        payload = {
            "contents": [
                {
                    "parts": [
                        {"text": prompt}
                    ]
                }
            ]
        }
        headers = {"Content-Type": "application/json"}
        response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=60)
        response.raise_for_status()
        result = response.json()
        # Gemini nests the generated text under candidates -> content -> parts;
        # the chained .get defaults avoid KeyError on malformed responses.
        questions = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "")
        questions_list = [question.strip() for question in questions.split("\n") if question.strip()]
        # Get Bloom's taxonomy scores for each question with progress bar
        questions_with_scores = []
        progress_bar = st.progress(0)
        for idx, question in enumerate(questions_list):
            scores = get_bloom_taxonomy_scores(question)
            if scores:  # Only add questions that got valid scores
                questions_with_scores.append((question, scores))
            progress_bar.progress((idx + 1) / len(questions_list))
        if not questions_with_scores:
            st.warning("Could not get Bloom's Taxonomy scores for any questions. Using default scores.")
            # Use default scores if no scores were obtained
            questions_with_scores = [(q, get_bloom_taxonomy_scores("")) for q in questions_list]
        # Update session state with scores
        st.session_state.question_scores = {q: s for q, s in questions_with_scores}
        # Return just the questions
        return [q for q, _ in questions_with_scores]
    except requests.RequestException as e:
        st.error(f"API request error: {e}")
        return []
    except Exception as e:
        st.error(f"Error generating questions: {e}")
        return []
def normalize_bloom_weights(bloom_weights):
    """
    Rescale a weight dict so its values sum to 100.

    Returns the input unchanged when it already sums to 100; otherwise a new
    dict with each weight scaled proportionally and rounded to 2 decimals.
    """
    total = sum(bloom_weights.values())
    if total == 100:
        return bloom_weights
    factor = 100 / total
    return {name: round(weight * factor, 2) for name, weight in bloom_weights.items()}
def generate_pdf(questions, filename="questions.pdf"):
    """Render numbered questions into a PDF; return the filename, or None on failure."""
    try:
        doc = FPDF()
        doc.set_auto_page_break(auto=True, margin=15)
        doc.add_page()
        # Unicode-capable font; requires ArialUnicodeMS.ttf next to the app.
        doc.add_font("ArialUnicode", "", "ArialUnicodeMS.ttf", uni=True)
        doc.set_font("ArialUnicode", size=12)
        # Centered title, followed by a gap before the questions.
        doc.cell(200, 10, txt="Generated Questions", ln=True, align="C")
        doc.ln(10)
        for number, text in enumerate(questions, 1):
            # multi_cell wraps long questions across lines.
            doc.multi_cell(0, 10, f"Q{number}: {text}")
        doc.output(filename)
        return filename
    except Exception as e:
        st.error(f"Error generating PDF: {e}")
        return None
def process_pdf_and_generate_questions(pdf_source, uploaded_file, api_key, role_description, response_instructions, bloom_taxonomy_weights, num_questions, question_length, include_numericals, user_input):
    """
    End-to-end pipeline: obtain the PDF, extract its text, and generate
    questions from it. Returns the question list ([] on any failure).
    """
    try:
        pdf_path = get_pdf_path(pdf_source, uploaded_file)
        if not pdf_path:
            return []
        extracted_text = extract_text_pymupdf(pdf_path)
        if not extracted_text:
            return []
        questions = generate_ai_response(
            api_key,
            extracted_text,
            "Generate questions based on the above context.",
            role_description,
            response_instructions,
            normalize_bloom_weights(bloom_taxonomy_weights),
            num_questions,
            question_length,
            include_numericals,
            user_input
        )
        # Best-effort cleanup of the temp PDF and its containing directory.
        try:
            os.remove(pdf_path)
            os.rmdir(os.path.dirname(pdf_path))
        except Exception as e:
            st.warning(f"Could not delete temporary PDF file: {e}")
        return questions
    except Exception as e:
        st.error(f"Error processing PDF and generating questions: {e}")
        return []
def get_bloom_taxonomy_details(question_scores: Optional[Dict[str, float]] = None) -> str:
    """
    Render Bloom's-taxonomy scores as a human-readable report string.

    Returns an explanatory message instead when scores are missing, have
    non-numeric values, or use unknown category names.
    """
    try:
        if question_scores is None or not isinstance(question_scores, dict):
            return "Bloom's Taxonomy scores not available"
        allowed = {"Remembering", "Understanding", "Applying",
                   "Analyzing", "Evaluating", "Creating"}
        # Value check runs before the category check, matching caller expectations.
        if any(not isinstance(value, (int, float)) for value in question_scores.values()):
            return "Invalid score values detected"
        if any(name not in allowed for name in question_scores.keys()):
            return "Invalid score categories detected"
        report = "Bloom's Taxonomy Analysis:\n\n"
        try:
            # Highest-scoring level first; percentages clamped to [0, 100].
            ranked = sorted(question_scores.items(), key=lambda item: item[1], reverse=True)
            for level, value in ranked:
                percentage = min(max(value * 100, 0), 100)
                report += f"{level}: {percentage:.1f}%\n"
            top_level = max(question_scores.items(), key=lambda item: item[1])[0]
            report += f"\nPredicted Level: {top_level}"
            return report.strip()
        except Exception as e:
            return f"Error processing scores: {str(e)}"
    except Exception as e:
        return f"Error generating taxonomy details: {str(e)}"
def predict_with_loaded_model(text):
    """
    Classify `text` into Bloom's-taxonomy levels with the fine-tuned DistilBERT.

    Returns a dict mapping each level name (via module-level reverse_mapping)
    to its softmax probability, rounded to 3 decimal places.
    """
    inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True, max_length=512)
    # BUG FIX: forward every tokenizer tensor (input_ids AND attention_mask)
    # to the model. The original passed only input_ids, silently dropping the
    # attention mask the tokenizer produced alongside it.
    inputs = {name: tensor.to(device) for name, tensor in inputs.items()}
    model.eval()
    with torch.no_grad():
        outputs = model(**inputs)
    probabilities = softmax(outputs.logits, dim=-1).squeeze().cpu().numpy()
    # Convert to plain floats rounded to 3 decimal places.
    return {reverse_mapping[i]: float(f"{prob:.3f}") for i, prob in enumerate(probabilities)}
def process_document(input_path):
    """
    OCR a PDF or image file with the module-level docTR pipeline.

    Returns {'Avg_Confidence': float, 'String': list[str]} where 'String'
    holds the recognized text split into lines.

    Raises:
        ValueError: for unsupported file extensions.
    """
    lowered = input_path.lower()
    if lowered.endswith(".pdf"):
        document = DocumentFile.from_pdf(input_path)
    elif lowered.endswith((".jpg", ".jpeg", ".png", ".bmp", ".tiff")):
        document = DocumentFile.from_images(input_path)
    else:
        raise ValueError("Unsupported file type. Please provide a PDF or an image file.")
    result = modelocr(document)

    def calculate_average_confidence(ocr_result):
        # Mean word-level confidence over the whole document (0 if no words).
        confidences = [
            word.confidence
            for page in ocr_result.pages
            for block in page.blocks
            for line in block.lines
            for word in line.words
        ]
        return sum(confidences) / len(confidences) if confidences else 0

    return {'Avg_Confidence': calculate_average_confidence(result),
            'String': result.render().split('\n')}
def sendtogemini(inputpath, question):
    """
    Reassemble question fragments into full questions via Gemini, then score
    each with the local Bloom's-taxonomy classifier.

    Fragments come from OCR when `inputpath` is a supported file, otherwise
    from the raw `question` string. Requires the GEMINI_API_KEY env var.

    Returns a list of {'question': str, 'score': dict} entries.

    Raises:
        ValueError: when neither a supported file nor a question is supplied.
        requests.HTTPError: when the Gemini API returns an error status.
    """
    if inputpath and inputpath.lower().endswith((".pdf", ".jpg", ".jpeg", ".png")):
        ocr_output = process_document(inputpath)
    elif question:
        ocr_output = {'String': [question]}
    else:
        raise ValueError("Unsupported file type. Please provide a PDF or an image file.")
    questionset = str(ocr_output['String'])
    # Prompt instructs Gemini to merge fragments and prefix each merged
    # question with '@' so the response can be split below.
    questionset += """You are given a list of text fragments containing questions fragments extracted by an ocr model. Your task is to:
# only Merge the question fragments into complete and coherent questions.Don't answer then.
# Separate each question , start a new question with @ to make them easily distinguishable for further processing."""
    url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent?key={os.getenv('GEMINI_API_KEY')}"
    payload = {
        "contents": [
            {
                "parts": [
                    {"text": questionset}
                ]
            }
        ]
    }
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=60)
    # BUG FIX: surface HTTP errors instead of parsing an error body as if it
    # were a normal generation response.
    response.raise_for_status()
    result = response.json()
    text = result.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "")
    parsed_questions = []  # renamed: the original reused the `question` parameter
    for raw_line in text.split('\n'):
        fragment = raw_line.strip()
        if not fragment:
            continue
        if fragment[0] == '@':
            fragment = fragment[1:].strip().lower()
            # BUG FIX: a line consisting of just '@' left an empty string and
            # the subsequent fragment[0] check raised IndexError.
            if not fragment:
                continue
        # Drop an optional leading 'q' marker (e.g. "Q1: ...").
        if fragment[0] == 'q':
            parsed_questions.append(fragment[1:].strip())
        else:
            parsed_questions.append(fragment)
    return [{'question': q, 'score': predict_with_loaded_model(q)} for q in parsed_questions]