Spaces:
Sleeping
Sleeping
import base64
import io
import os
import re
from pathlib import Path

import fitz  # PyMuPDF
import gradio as gr
import torch
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFacePipeline
from langchain_community.vectorstores import Chroma
from PIL import Image
from transformers import pipeline
# --- Curriculum PDF chatbot: page-level search, LLM answers, slide display ---
# 1. Preprocess PDFs and build the vector DB
class CurriculumChatbot:
    """RAG-style tutor over a folder of curriculum PDF slide decks.

    On construction it extracts per-page text from every PDF in
    ``slides_dir``, indexes the pages in a Chroma vector store, and tries
    to load a Llama 3.1 text-generation pipeline.  When the LLM cannot be
    loaded the bot degrades to similarity-search-only answers.
    """

    def __init__(self, slides_dir="Slides"):
        """Extract PDF text, build the vector DB and (optionally) the LLM.

        Parameters:
            slides_dir: directory scanned (non-recursively) for ``*.pdf``.
        """
        self.pdf_pages = {}        # {filename: {page_number: page_text}}
        self.pdf_files = {}        # {filename: path on disk}
        self.chunks = []           # one text chunk per non-empty page
        self.chunk_metadata = []   # parallel to self.chunks
        self.vector_db = None
        self.embeddings = None
        self.llm = None
        self.qa_chain = None
        self.slide_selection_chain = None
        # Bug fix: chat() reads this attribute unconditionally, so it must
        # exist even when _setup_llm() fails before assigning it.
        self.focused_qa_chain = None
        self._process_pdfs(slides_dir)
        self._build_vector_db()
        self._setup_llm()

    def _process_pdfs(self, slides_dir):
        """Extract the text of every page of every PDF in *slides_dir*.

        Populates ``self.pdf_files``, ``self.pdf_pages``, ``self.chunks``
        and ``self.chunk_metadata``.  Page numbers are 1-based throughout
        the class; pages with no extractable text are skipped.
        """
        slides_path = Path(slides_dir)
        for pdf_file in slides_path.glob("*.pdf"):
            self.pdf_files[pdf_file.name] = str(pdf_file)
            doc = fitz.open(str(pdf_file))
            pages = {}
            for page_num in range(len(doc)):
                text = doc[page_num].get_text()
                if text.strip():
                    pages[page_num + 1] = text.strip()
            self.pdf_pages[pdf_file.name] = pages
            doc.close()
            # Index each non-empty page as one retrieval chunk.
            for page_num, text in pages.items():
                self.chunks.append(text)
                self.chunk_metadata.append({
                    "filename": pdf_file.name,
                    "page_number": page_num,
                })

    def _build_vector_db(self):
        """Embed every page chunk and persist them in a local Chroma DB."""
        self.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        self.vector_db = Chroma.from_texts(
            texts=self.chunks,
            embedding=self.embeddings,
            metadatas=self.chunk_metadata,
            persist_directory="./chroma_db",
        )

    def _setup_llm(self):
        """Load Llama 3.1 8B and build the three prompt chains.

        On any failure the bot falls back to search-only mode: every chain
        attribute (including ``focused_qa_chain`` — previously left unset,
        which made chat() raise AttributeError) is reset to None so chat()
        can detect the fallback.
        """
        try:
            # Use Llama 3.1 8B with authentication token from secrets
            model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct"
            pipe = pipeline(
                "text-generation",
                model=model_name,
                max_new_tokens=200,
                temperature=0.3,
                do_sample=True,
                top_p=0.9,
                repetition_penalty=1.1,
                device_map="auto" if torch.cuda.is_available() else None,
            )
            self.llm = HuggingFacePipeline(pipeline=pipe)

            # General QA prompt (Llama 3.1 chat format).
            qa_template = """<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful AI programming tutor. Answer questions about programming concepts clearly and educationally. If the question is about curriculum content, use the provided context. If not, provide a general programming answer.
<|eot_id|><|start_header_id|>user<|end_header_id|>
Question: {question}
{filled_context}
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
            self.qa_chain = LLMChain(llm=self.llm, prompt=PromptTemplate(
                input_variables=["question", "filled_context"],
                template=qa_template,
            ))

            # Prompt that asks the LLM to pick the best slide of the top 5.
            slide_selection_template = """<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are an AI that analyzes curriculum slides to find the best one for teaching a concept. Return ONLY the filename and page number.
<|eot_id|><|start_header_id|>user<|end_header_id|>
Question: {question}
Here are the top 5 most relevant slides from the curriculum:
{slide_contents}
Which slide is the BEST for teaching this concept to a student? Consider:
- Which slide has the most educational content?
- Which slide explains the concept most clearly?
- Which slide would be most helpful for learning?
Return only: "filename.pdf - Page X"
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
            self.slide_selection_chain = LLMChain(llm=self.llm, prompt=PromptTemplate(
                input_variables=["question", "slide_contents"],
                template=slide_selection_template,
            ))

            # Prompt that answers using one selected slide's content only.
            focused_qa_template = """<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful AI programming tutor. Answer questions about programming concepts clearly and educationally based on the provided slide content.
<|eot_id|><|start_header_id|>user<|end_header_id|>
Slide Content:
{slide_content}
Question: {question}
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
            self.focused_qa_chain = LLMChain(llm=self.llm, prompt=PromptTemplate(
                input_variables=["question", "slide_content"],
                template=focused_qa_template,
            ))
            print("β Llama 3.1 8B loaded successfully!")
        except Exception as e:
            print(f"Warning: Could not load Llama 3.1 8B: {e}")
            print("Falling back to basic search mode...")
            self.llm = None
            self.qa_chain = None
            self.slide_selection_chain = None
            # Bug fix: previously left dangling, crashing chat() later.
            self.focused_qa_chain = None

    def get_pdf_page_image(self, pdf_path, page_num):
        """Render one PDF page (1-based *page_num*) as a PIL RGB image.

        Returns None when the page number exceeds the document length or
        rendering fails.  Bug fix: the fitz document is now closed in a
        ``finally`` block, so it no longer leaks when rendering raises.
        """
        try:
            doc = fitz.open(pdf_path)
            try:
                if page_num > len(doc):
                    return None
                page = doc[page_num - 1]
                # 1.5x zoom: readable preview without a huge bitmap.
                mat = fitz.Matrix(1.5, 1.5)
                pix = page.get_pixmap(matrix=mat)
                img = Image.open(io.BytesIO(pix.tobytes("png")))
                if img.mode != 'RGB':
                    img = img.convert('RGB')
                return img
            finally:
                doc.close()
        except Exception as e:
            print(f"Error rendering PDF page: {str(e)}")
            return None

    def get_all_slides(self):
        """Get all available slides for display as (image, label) pairs."""
        all_slides = []
        for filename, pages in self.pdf_pages.items():
            for page_num in pages.keys():
                img = self.get_pdf_page_image(self.pdf_files[filename], page_num)
                if img:
                    # Fix: label now carries the actual filename.
                    all_slides.append((img, f"{filename} - Page {page_num}"))
        return all_slides

    def get_available_slides_text(self):
        """Get text representation of available slides for LLM."""
        slides_text = []
        for filename, pages in self.pdf_pages.items():
            for page_num in pages.keys():
                # Fix: label now carries the actual filename.
                slides_text.append(f"{filename} - Page {page_num}")
        return "\n".join(slides_text)

    def _fallback_answer(self, query, best_result, best_slide_content, generic_tail):
        """Compose a deterministic answer from the slide text.

        Used whenever the LLM output is unusable or raises.  Special-cases
        questions mentioning "loops"; otherwise appends *generic_tail*.
        """
        slide_info = f"π **Slide Reference:** {best_result.metadata['filename']} - Page {best_result.metadata['page_number']}"
        if "loops" in query.lower():
            return f"{slide_info}\n\n**Slide Content:**\n{best_slide_content}\n\n**What are loops for?**\n\nLoops are programming constructs that solve the problem of repetition. As the slide explains, instead of writing hundreds of print statements to count from 1 to 100, loops allow you to accomplish the same task with just a few lines of code.\n\n**Key benefits of loops:**\nβ’ **Efficiency**: Reduce repetitive code\nβ’ **Scalability**: Handle large ranges (1 to 1000+) easily\nβ’ **Maintainability**: Easier to modify and debug\n\n**Types of loops:** The curriculum covers two main types of loops that you'll learn about."
        return f"{slide_info}\n\n**Slide Content:**\n{best_slide_content}\n\n{generic_tail}"

    def chat(self, query):
        """Comprehensive chat function with LLM answers and slide navigation.

        Returns ``(answer, recommended_slide, recommended_label,
        relevant_slides)`` where *relevant_slides* is a list of
        (PIL.Image, label) pairs for the gallery.
        """
        # First, try to find relevant curriculum content.
        results = self.vector_db.similarity_search(query, k=5)

        # Crude relevance signal: number of hits with non-empty text.
        curriculum_relevance_score = 0
        if results:
            curriculum_relevance_score = len([r for r in results if r.page_content.strip()])

        # Debug: print what we found.
        print(f"Query: {query}")
        print(f"Found {len(results)} relevant results:")
        for i, result in enumerate(results[:3]):
            print(f" {i+1}. {result.metadata['filename']} - Page {result.metadata['page_number']}")
            print(f" Content: {result.page_content[:100]}...")

        # Use the LLM to analyze the top 5 slides and pick the best one.
        best_slide_content = ""
        best_result = None
        if curriculum_relevance_score > 0 and self.slide_selection_chain:
            try:
                slide_contents = []
                for i, result in enumerate(results[:5]):
                    filename = result.metadata["filename"]
                    page_num = result.metadata["page_number"]
                    content = result.page_content
                    # Fix: include the real filename (was a literal placeholder).
                    slide_contents.append(f"Slide {i+1}: {filename} - Page {page_num}\nContent: {content}\n")
                slide_contents_text = "\n".join(slide_contents)
                slide_response = self.slide_selection_chain.run(
                    question=query,
                    slide_contents=slide_contents_text,
                )
                # Strip chat-format framing from the model output.
                slide_response = slide_response.strip()
                if "<|eot_id|>" in slide_response:
                    slide_response = slide_response.split("<|eot_id|>")[-1].strip()
                # Parse 'filename.pdf - Page N' out of the response.
                match = re.search(r'(.+\.pdf)\s*-\s*Page\s*(\d+)', slide_response)
                if match:
                    filename = match.group(1)
                    page_num = int(match.group(2))
                    for result in results:
                        if (result.metadata["filename"] == filename and
                                result.metadata["page_number"] == page_num):
                            best_result = result
                            best_slide_content = result.page_content
                            break
                    # LLM named a slide we did not retrieve: fall back.
                    if not best_result:
                        best_result = results[0]
                        best_slide_content = results[0].page_content
                else:
                    # Parsing failed: fall back to the top hit.
                    best_result = results[0]
                    best_slide_content = results[0].page_content
            except Exception as e:
                print(f"Error in LLM slide selection: {e}")
                best_result = results[0]
                best_slide_content = results[0].page_content
        elif curriculum_relevance_score > 0:
            # No selection chain available: just use the top hit.
            best_result = results[0]
            best_slide_content = results[0].page_content

        # Generate the answer, preferring the focused single-slide chain.
        if self.focused_qa_chain and curriculum_relevance_score > 0:
            try:
                answer = self.focused_qa_chain.run(question=query, slide_content=best_slide_content)
                print(f"LLM Raw Response: {answer[:200]}...")
                answer = answer.strip()
                if "<|eot_id|>" in answer:
                    answer = answer.split("<|eot_id|>")[-1].strip()
                # Remove any prompt artifacts echoed back by the model.
                if answer.startswith("Answer:"):
                    answer = answer[7:].strip()
                if answer.startswith("Provide a clear, educational answer based on this slide:"):
                    answer = answer[58:].strip()
                # Reject answers that are too short, echo the question, or
                # leak the prompt; substitute the deterministic fallback.
                if (len(answer.strip()) < 50 or
                        answer.lower().startswith("how does that work") or
                        "slide content provided" in answer.lower() or
                        "provide a clear" in answer.lower() or
                        "answer the question based on" in answer.lower() or
                        "slide content:" in answer.lower()):
                    answer = self._fallback_answer(
                        query, best_result, best_slide_content,
                        "This slide explains the concept clearly. The content shows how programming constructs help solve real problems efficiently.")
            except Exception as e:
                print(f"Error generating focused answer: {e}")
                answer = self._fallback_answer(
                    query, best_result, best_slide_content,
                    "This slide contains the relevant information about your question.")
        elif self.qa_chain:
            # Fallback to the general chain when the focused one is absent.
            try:
                if curriculum_relevance_score > 0:
                    context = "\n\n".join([result.page_content for result in results])
                    filled_context = f"Curriculum Context:\n{context}\n\nPlease answer based on this curriculum content."
                else:
                    filled_context = "Note: This question is not covered in the current curriculum. Please provide a general programming answer."
                answer = self.qa_chain.run(question=query, filled_context=filled_context)
                answer = answer.strip()
                if "<|eot_id|>" in answer:
                    answer = answer.split("<|eot_id|>")[-1].strip()
                if answer.startswith("Answer:"):
                    answer = answer[7:].strip()
                if answer.startswith("Provide a clear, educational answer explaining the concept:"):
                    answer = answer[58:].strip()
                # Too short: substitute slide content or an apology.
                if len(answer.strip()) < 50:
                    if curriculum_relevance_score > 0:
                        slide_info = f"π **Slide Reference:** {best_result.metadata['filename']} - Page {best_result.metadata['page_number']}"
                        answer = f"{slide_info}\n\n**Slide Content:**\n{best_slide_content}\n\nThis slide explains the concept clearly."
                    else:
                        answer = "I'm sorry, I couldn't generate a proper answer. Please try rephrasing your question."
                # Warn when the topic is outside the curriculum.
                if curriculum_relevance_score == 0:
                    answer = "β οΈ **Note: This topic is not covered in the current curriculum.**\n\n" + answer
            except Exception as e:
                print(f"Error generating answer: {e}")
                if curriculum_relevance_score > 0:
                    slide_info = f"π **Slide Reference:** {best_result.metadata['filename']} - Page {best_result.metadata['page_number']}"
                    answer = f"{slide_info}\n\n**Slide Content:**\n{best_slide_content}\n\nThis slide contains the relevant information about your question."
                else:
                    answer = "I'm sorry, I couldn't generate an answer at the moment. Please try rephrasing your question."
        else:
            # No LLM at all: raw slide content or a not-found message.
            if curriculum_relevance_score > 0:
                slide_info = f"π **Slide Reference:** {best_result.metadata['filename']} - Page {best_result.metadata['page_number']}"
                answer = f"{slide_info}\n\n**Slide Content:**\n{best_slide_content}\n\n*Note: AI generation is not available, but here's the relevant curriculum content.*"
            else:
                answer = "I couldn't find relevant content in the curriculum for this question. Please try rephrasing or ask about a different programming topic."

        # Collect the most relevant slide plus its neighbouring pages.
        relevant_slides = []
        if curriculum_relevance_score > 0:
            best_result = results[0]
            filename = best_result.metadata["filename"]
            page_number = best_result.metadata["page_number"]
            if filename in self.pdf_files:
                pdf_path = self.pdf_files[filename]
                doc = fitz.open(pdf_path)
                total_pages = len(doc)
                doc.close()
                # Prefer, among the hits in this deck, the page with the
                # most text (content pages beat title slides).
                target_page = page_number
                best_content_score = 0
                for result in results:
                    if result.metadata["filename"] == filename:
                        page_num = result.metadata["page_number"]
                        page_text = self.pdf_pages[filename].get(page_num, "")
                        text_length = len(page_text.strip())
                        content_score = text_length
                        if text_length > 100:  # prefer content pages over title slides
                            content_score += 500
                        if content_score > best_content_score:
                            best_content_score = content_score
                            target_page = page_num
                # Still looks like a title slide: scan the whole deck for a
                # page matching the query terms with substantial text.
                page_text = self.pdf_pages[filename].get(target_page, "")
                if len(page_text.strip()) < 150:
                    query_terms = query.lower().split()
                    best_match_score = 0
                    for page_num in range(1, total_pages + 1):
                        if page_num in self.pdf_pages[filename]:
                            text = self.pdf_pages[filename][page_num].lower()
                            text_length = len(text.strip())
                            match_score = sum(1 for term in query_terms if term in text)
                            if match_score > 0 and text_length > 200:
                                total_score = match_score * 1000 + text_length
                                if total_score > best_match_score:
                                    best_match_score = total_score
                                    target_page = page_num
                # Render the target page and up to 2 neighbours each side.
                start_page = max(1, target_page - 2)
                end_page = min(total_pages, target_page + 2)
                for page_num in range(start_page, end_page + 1):
                    img = self.get_pdf_page_image(pdf_path, page_num)
                    if img:
                        if page_num == target_page:
                            # Fix: labels carry the real filename now.
                            label = f"π {filename} - Page {page_num} (Most Relevant)"
                        else:
                            label = f"{filename} - Page {page_num}"
                        relevant_slides.append((img, label))
                recommended_slide = relevant_slides[0][0] if relevant_slides else None
                recommended_label = relevant_slides[0][1] if relevant_slides else None
            else:
                # Filename from metadata not found on disk.
                recommended_slide = None
                recommended_label = None
        else:
            # No curriculum match: show a sample of slides from up to
            # 3 decks, first 2 pages of each.
            relevant_slides = []
            for filename, pages in list(self.pdf_pages.items())[:3]:
                for page_num in list(pages.keys())[:2]:
                    img = self.get_pdf_page_image(self.pdf_files[filename], page_num)
                    if img:
                        relevant_slides.append((img, f"{filename} - Page {page_num}"))
            recommended_slide = relevant_slides[0][0] if relevant_slides else None
            recommended_label = relevant_slides[0][1] if relevant_slides else None
        return answer, recommended_slide, recommended_label, relevant_slides
# --- Gradio UI ---
chatbot = CurriculumChatbot()


def gradio_chat(query):
    """Gradio callback: answer *query* and return items for the gallery."""
    answer, _slide, _label, relevant_slides = chatbot.chat(query)
    # Gallery shows the focused deck: target page plus its neighbours.
    return answer, (relevant_slides or [])
with gr.Blocks(title="Inclusive World Curriculum Assistant", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        "# π€ Inclusive World Curriculum Assistant\n"
        "Your AI programming tutor with curriculum-based answers and slide navigation!"
    )
    with gr.Row():
        # Left column: question input plus the generated answer.
        with gr.Column(scale=1):
            gr.Markdown("### π¬ Chatbot")
            gr.Markdown("**What questions do you have?**")
            question = gr.Textbox(
                label="Question Input",
                placeholder="e.g., What are for loops? How do variables work? Explain functions...",
                lines=3,
            )
            submit = gr.Button("π€ Ask AI", variant="primary", size="lg")
            answer = gr.Markdown(label="LLM Generated Output")
        # Right column: gallery of the most relevant slides.
        with gr.Column(scale=1):
            gr.Markdown("### π Most Similar Slides")
            gallery = gr.Gallery(
                label="Curriculum Slides",
                columns=1,
                rows=3,
                height="600px",
                object_fit="contain",
                show_label=False,
            )
    # Clicking the button and pressing Enter run the same handler.
    submit.click(fn=gradio_chat, inputs=question, outputs=[answer, gallery])
    question.submit(fn=gradio_chat, inputs=question, outputs=[answer, gallery])

if __name__ == "__main__":
    demo.launch()