Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| from pathlib import Path | |
| import fitz # PyMuPDF | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_huggingface import HuggingFacePipeline | |
| from langchain.prompts import PromptTemplate | |
| from transformers import pipeline | |
| import torch | |
| import base64 | |
| from PIL import Image | |
| import io | |
| import re | |
| import time | |
| # --- Optimized Curriculum Assistant with Full LLM Features --- | |
class OptimizedCurriculumChatbot:
    """Answer programming questions from a folder of curriculum slide PDFs.

    On construction the assistant extracts the text of every PDF page, indexes
    the pages in a Chroma vector store and tries to load a small
    text-generation model.  When the model cannot be loaded, ``chat`` still
    works and returns the text of the best matching slide instead of a
    generated answer.
    """

    # Upper bound on the number of cached ``chat`` responses.
    MAX_CACHE_ENTRIES = 50
    # Generated answers shorter than this are padded with the slide text.
    MIN_ANSWER_CHARS = 50
    # Pages shown on each side of the most relevant slide in the gallery.
    NEIGHBOR_PAGES = 2
    # Number of pages requested from the vector store per query.
    SEARCH_RESULTS = 3

    def __init__(self, slides_dir="Slides"):
        """Load the PDFs in *slides_dir*, build the index and set up the LLM."""
        self.pdf_pages = {}  # {filename: {page_num: text}}
        self.pdf_files = {}  # {filename: path}
        self.chunks = []
        self.chunk_metadata = []
        self.vector_db = None
        self.embeddings = None
        self.llm = None
        self.qa_chain = None
        self.slide_selection_chain = None
        self.focused_qa_chain = None
        self.response_cache = {}  # {query: (answer, slide, label, slides)}
        self._process_pdfs(slides_dir)
        self._build_vector_db()
        self._setup_optimized_llm()

    def _process_pdfs(self, slides_dir):
        """Extract the text of every non-empty page of each PDF in *slides_dir*.

        Fills ``pdf_files``, ``pdf_pages``, ``chunks`` and ``chunk_metadata``.
        Page numbers are 1-based.  Files are visited in sorted order so the
        chunk order does not depend on directory listing order.
        """
        for pdf_file in sorted(Path(slides_dir).glob("*.pdf")):
            self.pdf_files[pdf_file.name] = str(pdf_file)
            pages = {}
            # The context manager closes the document even if a page fails.
            with fitz.open(str(pdf_file)) as doc:
                for page_num, page in enumerate(doc, start=1):
                    text = page.get_text().strip()
                    if text:
                        pages[page_num] = text
            self.pdf_pages[pdf_file.name] = pages
            # Each page is indexed as one chunk.
            for page_num, text in pages.items():
                self.chunks.append(text)
                self.chunk_metadata.append({
                    "filename": pdf_file.name,
                    "page_number": page_num,
                })

    def _build_vector_db(self):
        """Create the Chroma index over the extracted pages."""
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        if not self.chunks:
            # Nothing to add: open the store without inserting, because
            # inserting an empty batch may be rejected by the backend.
            self.vector_db = Chroma(
                embedding_function=self.embeddings,
                persist_directory="./chroma_db",
            )
            return
        # Stable ids make a restart overwrite the persisted pages instead of
        # appending a second copy of every page to ./chroma_db.
        # NOTE(review): pages of PDFs that were removed since the last run
        # stay in the persisted store — confirm whether that matters.
        chunk_ids = [
            f"{meta['filename']}::page-{meta['page_number']}"
            for meta in self.chunk_metadata
        ]
        self.vector_db = Chroma.from_texts(
            texts=self.chunks,
            embedding=self.embeddings,
            metadatas=self.chunk_metadata,
            ids=chunk_ids,
            persist_directory="./chroma_db",
        )

    def _setup_optimized_llm(self):
        """Load the text-generation pipeline and build the three prompt chains.

        Any failure (including a missing ``IW_Token`` environment variable) is
        reported on stdout and leaves every chain set to ``None`` so ``chat``
        falls back to plain slide lookup.
        """
        try:
            # Use a much faster but still capable model
            # Microsoft/DialoGPT-medium is ~345M parameters vs 8B for Llama
            model_name = "microsoft/DialoGPT-medium"
            # Get token from secrets
            token = os.environ.get("IW_Token")
            if not token:
                raise ValueError("IW_Token not found in environment variables")
            use_gpu = torch.cuda.is_available()
            pipe = pipeline(
                "text-generation",
                model=model_name,
                max_new_tokens=150,  # Optimized for speed
                temperature=0.3,
                do_sample=True,
                top_p=0.9,
                repetition_penalty=1.1,
                device_map="auto" if use_gpu else None,
                token=token,
                # Performance optimizations
                torch_dtype=torch.float16 if use_gpu else torch.float32,
                low_cpu_mem_usage=True,
            )
            self.llm = HuggingFacePipeline(pipeline=pipe)

            # General question answering with optional curriculum context.
            qa_template = (
                "You are a helpful AI programming tutor. Answer questions about programming concepts clearly and educationally.\n"
                "Question: {question}\n"
                "Context: {filled_context}\n"
                "Answer:"
            )
            self.qa_prompt = PromptTemplate(
                input_variables=["question", "filled_context"],
                template=qa_template,
            )
            self.qa_chain = self.qa_prompt | self.llm

            # Picks one slide out of the retrieved candidates.
            slide_selection_template = (
                "You are an AI that analyzes curriculum slides to find the best one for teaching a concept.\n"
                "Question: {question}\n"
                "Available slides:\n"
                "{slide_contents}\n"
                "Select the best slide (filename.pdf - Page X):"
            )
            self.slide_selection_prompt = PromptTemplate(
                input_variables=["question", "slide_contents"],
                template=slide_selection_template,
            )
            self.slide_selection_chain = self.slide_selection_prompt | self.llm

            # Answers from the content of a single slide.
            focused_qa_template = (
                "You are a helpful AI programming tutor. Answer questions based on the provided slide content.\n"
                "Slide Content: {slide_content}\n"
                "Question: {question}\n"
                "Answer:"
            )
            self.focused_qa_prompt = PromptTemplate(
                input_variables=["question", "slide_content"],
                template=focused_qa_template,
            )
            self.focused_qa_chain = self.focused_qa_prompt | self.llm
            print("β Optimized LLM loaded successfully!")
        except Exception as e:
            print(f"Warning: Could not load optimized LLM: {e}")
            print("Falling back to basic search mode...")
            # Reset every chain so a partial setup cannot be used by chat().
            self.llm = None
            self.qa_chain = None
            self.slide_selection_chain = None
            self.focused_qa_chain = None

    def get_pdf_page_image(self, pdf_path, page_num):
        """Render one PDF page as an RGB PIL image.

        Args:
            pdf_path: Path of the PDF file.
            page_num: 1-based page number.

        Returns:
            The rendered image, or ``None`` when the page number is out of
            range or rendering fails (the error is printed).
        """
        try:
            # Reject 0 and negatives up front: they would otherwise index the
            # document from the end and render the wrong page.
            if page_num < 1:
                return None
            with fitz.open(pdf_path) as doc:
                if page_num > len(doc):
                    return None
                page = doc[page_num - 1]
                pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
                img_data = pix.tobytes("png")
            img = Image.open(io.BytesIO(img_data))
            if img.mode != 'RGB':
                img = img.convert('RGB')
            return img
        except Exception as e:
            print(f"Error rendering PDF page: {str(e)}")
            return None

    @staticmethod
    def _pick_slide_index(response, candidates):
        """Return the index of the candidate slide named in *response*.

        Args:
            response: Text produced by the slide-selection chain.
            candidates: Sequence of ``(filename, page_number)`` pairs.

        Returns:
            Index of the candidate whose ``"<filename> - Page <n>"`` reference
            appears earliest in *response*, or ``None`` if none is mentioned.
        """
        best_index = None
        best_position = None
        for index, (filename, page_number) in enumerate(candidates):
            # Anchor on the known filename so prefixes such as "Slide 2: " do
            # not end up inside the captured name; the look-arounds keep
            # "a.pdf"/"Page 1" from matching inside "aa.pdf"/"Page 12".
            pattern = (
                r"(?<!\w)" + re.escape(filename)
                + r"\s*-\s*Page\s*0*" + str(int(page_number)) + r"(?!\d)"
            )
            match = re.search(pattern, response)
            if match and (best_position is None or match.start() < best_position):
                best_index, best_position = index, match.start()
        return best_index

    @staticmethod
    def _score_page_text(page_text):
        """Score a page by text length, with a bonus for pages over 100 chars."""
        text_length = len(page_text.strip())
        # Prefer content pages over title slides.
        return text_length + 500 if text_length > 100 else text_length

    @staticmethod
    def _with_slide_reference(result, closing_text):
        """Format the slide reference and slide text followed by *closing_text*."""
        slide_info = f"π **Slide Reference:** {result.metadata['filename']} - Page {result.metadata['page_number']}"
        return f"{slide_info}\n\n**Slide Content:**\n{result.page_content}\n\n{closing_text}"

    def _cache_response(self, query, response):
        """Store *response* for *query*, evicting the oldest entry when full."""
        self.response_cache[query] = response
        if len(self.response_cache) > self.MAX_CACHE_ENTRIES:
            # Dicts keep insertion order, so the first key is the oldest.
            oldest_key = next(iter(self.response_cache))
            del self.response_cache[oldest_key]

    def _select_best_slide(self, query, results):
        """Pick the search result to answer from, asking the LLM when available.

        Falls back to the first result when there is no selection chain, the
        chain fails, or its reply names none of the candidates.
        """
        if not self.slide_selection_chain:
            return results[0]
        candidates = results[:3]  # Top 3 results for speed
        try:
            slide_contents_text = "\n".join(
                f"Slide {i + 1}: {r.metadata['filename']} - Page {r.metadata['page_number']}\nContent: {r.page_content}\n"
                for i, r in enumerate(candidates)
            )
            slide_response = self.slide_selection_chain.invoke({
                "question": query,
                "slide_contents": slide_contents_text,
            }).strip()
            chosen = self._pick_slide_index(
                slide_response,
                [(r.metadata["filename"], r.metadata["page_number"]) for r in candidates],
            )
        except Exception as e:
            print(f"Error in LLM slide selection: {e}")
            return results[0]
        return results[0] if chosen is None else candidates[chosen]

    def _generate_answer(self, query, results, best_result, in_curriculum):
        """Produce the answer text for *query*.

        Uses the focused chain on *best_result* when the query matched the
        curriculum, otherwise the general chain, otherwise the plain slide
        text.  LLM errors are printed and replaced by a fallback message.
        """
        if self.focused_qa_chain and in_curriculum:
            try:
                answer = self.focused_qa_chain.invoke({
                    "question": query,
                    "slide_content": best_result.page_content,
                }).strip()
            except Exception as e:
                print(f"Error generating focused answer: {e}")
                return self._with_slide_reference(
                    best_result,
                    "This slide contains relevant information about your question.",
                )
            if len(answer) < self.MIN_ANSWER_CHARS:
                # Too short to stand alone: show the slide text alongside it.
                return self._with_slide_reference(
                    best_result, f"**AI Explanation:**\n{answer}"
                )
            return answer

        if self.qa_chain:
            try:
                if in_curriculum:
                    context = "\n\n".join(result.page_content for result in results)
                    filled_context = f"Curriculum Context:\n{context}\n\nPlease answer based on this curriculum content."
                else:
                    filled_context = "Note: This question is not covered in the current curriculum. Please provide a general programming answer."
                answer = self.qa_chain.invoke({
                    "question": query,
                    "filled_context": filled_context,
                }).strip()
            except Exception as e:
                print(f"Error generating answer: {e}")
                if in_curriculum:
                    return self._with_slide_reference(
                        best_result,
                        "This slide contains the relevant information about your question.",
                    )
                return "I'm sorry, I couldn't generate an answer at the moment. Please try rephrasing your question."
            if len(answer) < self.MIN_ANSWER_CHARS:
                if in_curriculum:
                    answer = self._with_slide_reference(
                        best_result, f"**AI Explanation:**\n{answer}"
                    )
                else:
                    answer = "I'm sorry, I couldn't generate a proper answer. Please try rephrasing your question."
            # Add warning if not in curriculum
            if not in_curriculum:
                answer = "β οΈ **Note: This topic is not covered in the current curriculum.**\n\n" + answer
            return answer

        # No LLM available.
        if in_curriculum:
            return self._with_slide_reference(
                best_result,
                "*Note: AI generation is not available, but here's the relevant curriculum content.*",
            )
        return "I couldn't find relevant content in the curriculum for this question. Please try rephrasing or ask about a different programming topic."

    def _collect_slides(self, results, in_curriculum):
        """Render the slide images shown next to the answer.

        Returns:
            ``(relevant_slides, recommended_slide, recommended_label)`` where
            ``relevant_slides`` is a list of ``(image, label)`` pairs.
        """
        if not in_curriculum:
            # No curriculum match: show the first two pages of up to three PDFs.
            relevant_slides = []
            for filename, pages in list(self.pdf_pages.items())[:3]:
                for page_num in list(pages)[:2]:
                    img = self.get_pdf_page_image(self.pdf_files[filename], page_num)
                    if img:
                        relevant_slides.append((img, f"{filename} - Page {page_num}"))
            if relevant_slides:
                return relevant_slides, relevant_slides[0][0], relevant_slides[0][1]
            return relevant_slides, None, None

        top_result = results[0]
        filename = top_result.metadata["filename"]
        pdf_path = self.pdf_files.get(filename)
        if pdf_path is None:
            # The index returned a file that is not loaded in this process.
            return [], None, None

        with fitz.open(pdf_path) as doc:
            total_pages = len(doc)

        # Among the hits in the same PDF, prefer the page with the most text.
        target_page = top_result.metadata["page_number"]
        best_content_score = 0
        file_pages = self.pdf_pages.get(filename, {})
        for result in results:
            if result.metadata["filename"] != filename:
                continue
            page_num = result.metadata["page_number"]
            content_score = self._score_page_text(file_pages.get(page_num, ""))
            if content_score > best_content_score:
                best_content_score = content_score
                target_page = page_num

        relevant_slides = []
        recommended = None
        start_page = max(1, target_page - self.NEIGHBOR_PAGES)
        end_page = min(total_pages, target_page + self.NEIGHBOR_PAGES)
        for page_num in range(start_page, end_page + 1):
            img = self.get_pdf_page_image(pdf_path, page_num)
            if not img:
                continue
            if page_num == target_page:
                # Highlight the most relevant page
                label = f"π {filename} - Page {page_num} (Most Relevant)"
                recommended = (img, label)
            else:
                label = f"{filename} - Page {page_num}"
            relevant_slides.append((img, label))

        # Recommend the target page itself; fall back to the first rendered
        # neighbour only if the target page could not be rendered.
        if recommended is None and relevant_slides:
            recommended = relevant_slides[0]
        if recommended is None:
            return relevant_slides, None, None
        return relevant_slides, recommended[0], recommended[1]

    def chat(self, query):
        """Answer *query* and pick the slides to display with the answer.

        Args:
            query: The user's question.

        Returns:
            A tuple ``(answer, recommended_slide, recommended_label,
            relevant_slides)``; ``relevant_slides`` is a list of
            ``(image, label)`` pairs and the recommended values are ``None``
            when no slide could be rendered.
        """
        start_time = time.time()
        # A blank question has nothing to search for.
        if not query or not query.strip():
            return "Please enter a question.", None, None, []

        # Check cache first for faster responses
        cached = self.response_cache.get(query)
        if cached is not None:
            print(f"β Using cached response (took {time.time() - start_time:.2f}s)")
            return cached

        results = self.vector_db.similarity_search(query, k=self.SEARCH_RESULTS)
        # The query counts as curriculum-related if any hit has text.
        in_curriculum = any(r.page_content.strip() for r in results)
        print(f"Query: {query}")
        print(f"Found {len(results)} relevant results in {time.time() - start_time:.2f}s")

        best_result = self._select_best_slide(query, results) if in_curriculum else None
        answer = self._generate_answer(query, results, best_result, in_curriculum)
        relevant_slides, recommended_slide, recommended_label = self._collect_slides(
            results, in_curriculum
        )

        response = (answer, recommended_slide, recommended_label, relevant_slides)
        self._cache_response(query, response)
        total_time = time.time() - start_time
        print(f"β Full LLM response generated in {total_time:.2f} seconds")
        return response
# --- Gradio UI ---
# Built once at import time; loading the PDFs, the index and the model all
# happen inside the constructor.
chatbot = OptimizedCurriculumChatbot()


def gradio_chat(query):
    """Run one chat turn and return ``(answer, gallery_items)`` for the UI.

    The recommended slide and its label returned by ``chatbot.chat`` are not
    displayed; only the answer text and the list of ``(image, label)`` slides
    are passed on.  A falsy slide list becomes an empty list.
    """
    reply, _recommended_slide, _recommended_label, slides = chatbot.chat(query)
    # Use the relevant slides (specific PDF with neighboring pages)
    return reply, (slides or [])
# Two-column page: question box and answer on the left, slide gallery on the
# right.  Both the button and the Enter key in the textbox run gradio_chat.
with gr.Blocks(title="Optimized Curriculum Assistant", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        "# π€ Optimized Curriculum Assistant\n"
        "Your AI programming tutor with full LLM features and fast responses!"
    )

    with gr.Row():
        # Left Column - Chatbot Interface
        with gr.Column(scale=1):
            gr.Markdown("### π¬ Smart AI Chatbot")
            gr.Markdown("**Ask questions about programming concepts!**")
            question = gr.Textbox(
                label="Question Input",
                placeholder="e.g., What are for loops? How do variables work? Explain functions...",
                lines=3,
            )
            submit = gr.Button("π€ Ask AI", variant="primary", size="lg")
            answer = gr.Markdown(label="AI Generated Response")

        # Right Column - Slides Display
        with gr.Column(scale=1):
            gr.Markdown("### π Smart Slide Navigation")
            gallery = gr.Gallery(
                label="Curriculum Slides",
                columns=1,
                rows=3,
                height="600px",
                object_fit="contain",
                show_label=False,
            )

    # Event handlers: the button click is registered first, then the textbox
    # submit, both with the same inputs and outputs.
    for register in (submit.click, question.submit):
        register(fn=gradio_chat, inputs=question, outputs=[answer, gallery])

if __name__ == "__main__":
    demo.launch()