| import os |
| import json |
| from langchain_community.document_loaders import PyPDFLoader |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from langchain_chroma import Chroma |
| from langchain_community.llms import Ollama |
| from backend.config import get_llm, get_embeddings |
| from langchain_core.documents import Document |
| import logging |
| import time |
| import re |
| import requests |
| import xml.etree.ElementTree as ET |
|
|
| |
logger = logging.getLogger(__name__)


# Root directory for all ChromaDB persistence; per-student data lives in
# sub-directories derived from this path (see _get_user_chroma_dir below).
CACHE_DIR = "./chroma_db"
|
|
def _get_user_chroma_dir(student_id: str = "anonymous") -> str:
    """Return the ChromaDB persist directory for the given student.

    Anonymous (or empty) ids share the global cache directory; any other
    id gets its own isolated sub-directory beneath it.
    """
    if not student_id or student_id == "anonymous":
        return CACHE_DIR

    # Replace anything that is not alphanumeric, '_' or '-' so the id is
    # always a safe directory-name component.
    sanitized = "".join(ch if ch.isalnum() or ch in "_-" else "_" for ch in student_id)
    return f"{CACHE_DIR}/student_{sanitized}"
|
|
def clear_all_chroma_data():
    """Delete ALL ChromaDB data (all users). Used by admin clear endpoint.

    Returns True on success (including when there was nothing to delete),
    False if the filesystem operations failed.
    """
    import shutil

    try:
        # Guard clause: nothing on disk means nothing to do.
        if not os.path.exists(CACHE_DIR):
            logger.info("ChromaDB directory does not exist, nothing to clear")
            return True

        # Wipe the whole tree, then recreate an empty root directory.
        shutil.rmtree(CACHE_DIR)
        os.makedirs(CACHE_DIR, exist_ok=True)
        logger.info("✅ All ChromaDB data deleted")
        return True
    except Exception as e:
        logger.error(f"❌ ChromaDB clear error: {e}")
        return False
|
|
# Public Invidious mirror instances, tried in order by
# get_youtube_transcript(); the first instance that answers wins.
INVIDIOUS_INSTANCES = [
    "https://inv.nadeko.net",
    "https://invidious.slipfox.xyz",
    "https://invidious.privacydev.net",
    "https://yt.artemislena.eu"
]
|
|
def get_youtube_transcript(video_id: str) -> str:
    """Fetch a YouTube transcript via the Invidious API to bypass network blocks.

    Tries each mirror in ``INVIDIOUS_INSTANCES`` until one returns the caption
    index, prefers manually-authored English tracks over auto-generated ones,
    downloads the selected track, and reduces it (XML or WebVTT/SRT) to plain
    text.

    Args:
        video_id: The YouTube video id (the ``v=`` query parameter).

    Returns:
        The cleaned transcript text.

    Raises:
        ValueError: In cloud mode, when no instance is reachable, when the
            video has no captions, when the track download fails, or when the
            resulting text is shorter than 50 characters.
    """
    import html

    from backend.config import IS_CLOUD
    if IS_CLOUD:
        raise ValueError(
            "YouTube is not available in cloud mode. "
            "Please upload a PDF instead."
        )

    captions_data = None
    last_error = None

    # Try each mirror until one returns the caption index for this video.
    for instance in INVIDIOUS_INSTANCES:
        try:
            url = f"{instance}/api/v1/captions/{video_id}"
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                captions_data = response.json()
                break
            else:
                last_error = f"HTTP {response.status_code}"
        except Exception as e:
            last_error = str(e)
            continue

    if not captions_data:
        raise ValueError(
            "Could not reach any transcript service. "
            f"Last error: {last_error}. "
            "Please try again later or upload a PDF instead."
        )

    captions = captions_data.get("captions", [])

    if not captions:
        raise ValueError(
            "No captions available for this video. "
            "The creator may have disabled captions. "
            "Try a video with the CC button visible, "
            "or upload a PDF instead."
        )

    # Track preference: manual English > any English > first available.
    selected = None
    for cap in captions:
        lang = cap.get("languageCode", "")
        auto = cap.get("autoGenerated", False)
        if lang == "en" and not auto:
            selected = cap
            break
    if not selected:
        for cap in captions:
            lang = cap.get("languageCode", "")
            if lang == "en":
                selected = cap
                break
    if not selected:
        selected = captions[0]

    caption_url = selected.get("url")
    if not caption_url:
        # Some Invidious versions omit the direct URL; build one manually.
        label = selected.get("label", "English")
        lang_code = selected.get("languageCode", "en")
        caption_url = (
            f"{INVIDIOUS_INSTANCES[0]}/api/v1/captions/{video_id}"
            f"?label={label}&lang={lang_code}"
        )

    cap_response = requests.get(caption_url, timeout=15)
    if cap_response.status_code != 200:
        raise ValueError("Failed to download caption track.")

    content = cap_response.text

    # The track may be XML (TTML-style) or plain WebVTT/SRT text.
    try:
        root = ET.fromstring(content)
        text_parts = []
        for elem in root.iter():
            if elem.text and elem.text.strip():
                text_parts.append(elem.text.strip())
        transcript_text = " ".join(text_parts)
    except ET.ParseError:
        # WebVTT/SRT fallback: drop cue numbers, timestamps and headers.
        lines = content.split('\n')
        text_parts = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            if re.match(r'^\d+$', line):
                continue
            if re.match(r'[\d:,]+ --> [\d:,]+', line):
                continue
            if line in ['WEBVTT', 'NOTE']:
                continue
            text_parts.append(line)
        transcript_text = " ".join(text_parts)

    # Strip residual markup from the caption text.
    transcript_text = re.sub(r'<[^>]+>', '', transcript_text)
    transcript_text = re.sub(r'\[.*?\]', '', transcript_text)
    # BUG FIX: the previous entity handling was corrupted — the patterns were
    # already-decoded characters (no-op substitutions and an unterminated raw
    # string). html.unescape correctly decodes &amp;, &quot;, &#39;, etc.
    transcript_text = html.unescape(transcript_text)
    transcript_text = re.sub(r'\s+', ' ', transcript_text).strip()

    if len(transcript_text) < 50:
        raise ValueError("Transcript is too short or empty. Try a different video.")

    return transcript_text
|
|
|
|
def ingest_document(file_path: str, student_id: str = "anonymous") -> None:
    """
    Ingests a PDF document into the vector database.
    Falls back to OCR (pytesseract) if standard text extraction yields little/no text.

    Args:
        file_path: Path of the PDF file on disk.
        student_id: Owner of the document; selects the per-user Chroma directory.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If no readable text can be extracted, even via OCR.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    user_chroma_dir = _get_user_chroma_dir(student_id)

    # Standard text-layer extraction first.
    loader = PyPDFLoader(file_path)
    docs = loader.load()

    # Drop pages that contained no extractable text.
    docs = [d for d in docs if d.page_content.strip()]

    total_text = "".join(d.page_content.strip() for d in docs)

    # Heuristic: fewer than 50 chars suggests a scanned/image-only PDF.
    if len(total_text) < 50:
        logger.info(f"Standard extraction found only {len(total_text)} chars, attempting OCR fallback...")
        try:
            # OCR dependencies are optional — imported lazily on demand.
            from pdf2image import convert_from_path
            import pytesseract

            # 300 DPI gives OCR enough resolution for typical scans.
            images = convert_from_path(file_path, dpi=300)
            ocr_pages = []

            for page_num, image in enumerate(images):
                page_text = pytesseract.image_to_string(image)
                if page_text.strip():
                    ocr_pages.append(Document(
                        page_content=page_text,
                        metadata={"source": file_path, "page": page_num}
                    ))

            if ocr_pages:
                ocr_total = "".join(d.page_content.strip() for d in ocr_pages)
                # Apply the same 50-char threshold to the OCR output.
                if len(ocr_total) < 50:
                    raise ValueError(
                        "Could not extract text even after OCR. "
                        "Please upload a clearer scan."
                    )
                docs = ocr_pages
                logger.info(f"OCR extracted {len(ocr_total)} chars from {len(ocr_pages)} pages")
            else:
                raise ValueError(
                    "Could not extract text even after OCR. "
                    "Please upload a clearer scan."
                )
        except ImportError:
            logger.warning("pytesseract/pdf2image not installed, cannot OCR")
            raise ValueError(
                "No readable text found and OCR libraries are not available. "
                "Please upload a text-based PDF."
            )
        except ValueError:
            # User-facing errors raised above pass through unchanged.
            raise
        except Exception as e:
            # Any other OCR failure is wrapped in a user-facing ValueError.
            logger.error(f"OCR fallback failed: {e}")
            raise ValueError(
                f"OCR processing failed: {str(e)}. "
                "Please try a clearer scan or a text-based PDF."
            )

    # Chunk and embed into the per-user Chroma store.
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    splits = splitter.split_documents(docs)

    if not splits:
        raise ValueError(
            "No readable text found in this PDF. "
            "It may be a scanned/image-only document."
        )

    Chroma.from_documents(
        documents=splits,
        embedding=get_embeddings(),
        persist_directory=user_chroma_dir
    )
| |
|
|
def ingest_url(url: str, student_id: str = "anonymous"):
    """
    Ingest the content of a web page URL into the vector database.

    YouTube transcripts are now handled browser-side via
    frontend/youtube_transcript.html.

    Returns the page title (or the URL when no title is available).
    """
    from langchain_community.document_loaders import WebBaseLoader

    persist_dir = _get_user_chroma_dir(student_id)

    try:
        logger.info(f"Processing web page: {url}")
        web_loader = WebBaseLoader(url)
        web_loader.requests_kwargs = {'timeout': 30}
        documents = web_loader.load()
        logger.info(f"Successfully loaded {len(documents)} documents")

        # Prefer the page's title metadata; fall back to the URL itself.
        page_title = documents[0].metadata.get("title", url) if documents else url

        chunker = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        chunks = chunker.split_documents(documents)

        if not chunks:
            raise ValueError("No content found to ingest")

        logger.info(f"Split into {len(chunks)} chunks, storing in ChromaDB")

        Chroma.from_documents(
            documents=chunks,
            embedding=get_embeddings(),
            persist_directory=persist_dir
        )

        logger.info(f"Successfully ingested: {page_title}")
        return page_title

    except ValueError as e:
        logger.error(f"Validation error: {e}")
        raise
    except Exception as e:
        logger.error(f"Error ingesting URL: {e}")
        raise ValueError(f"Failed to process URL: {str(e)}")
|
|
def ingest_text(text: str, source_name: str, source_type: str = "text", student_id: str = "anonymous"):
    """
    Ingest raw text content into the vector database.

    Used for browser-fetched YouTube transcripts and other text sources;
    shares the chunking/embedding pipeline with PDF ingestion.

    Returns ``source_name`` on success.
    """
    if not text or len(text.strip()) < 50:
        raise ValueError("Text content is too short or empty.")

    persist_dir = _get_user_chroma_dir(student_id)

    # Wrap the raw text in a single Document tagged with its origin.
    source_doc = Document(
        page_content=text,
        metadata={
            "source": source_name,
            "type": source_type
        }
    )

    chunker = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    chunks = chunker.split_documents([source_doc])

    if not chunks:
        raise ValueError("No content found to ingest after splitting.")

    logger.info(f"Split into {len(chunks)} chunks, storing in ChromaDB")

    Chroma.from_documents(
        documents=chunks,
        embedding=get_embeddings(),
        persist_directory=persist_dir
    )

    logger.info(f"Successfully ingested text: {source_name}")
    return source_name
|
|
def delete_document(source_path: str, student_id: str = "anonymous"):
    """
    Removes a document from the vector database by its source path.

    Deletion is best-effort: failures are logged but never raised, so a
    missing or already-deleted document does not break the caller.

    Args:
        source_path: The ``source`` metadata value the document was stored under.
        student_id: Selects the per-user Chroma directory.
    """
    user_chroma_dir = _get_user_chroma_dir(student_id)
    vector_store = Chroma(
        persist_directory=user_chroma_dir,
        embedding_function=get_embeddings()
    )

    try:
        # No public delete-by-metadata API on this wrapper; use the underlying
        # collection's `where` filter on the source path.
        vector_store._collection.delete(where={"source": source_path})
    except Exception as e:
        # BUG FIX: was print(); use the module logger so errors reach the
        # configured logging handlers like the rest of this file.
        logger.error(f"Error deleting from ChromaDB: {e}")
|
|
| |
|
|
|
|
def generate_study_plan(user_request: str, student_id: str = "anonymous"):
    """
    Build a day-by-day study plan from the user's ingested materials.

    Parses the requested number of days from ``user_request`` (defaults to 5),
    samples chunks from the user's vector store, extracts candidate topic
    sentences per source document, and schedules one topic per subject per day.

    Args:
        user_request: Free-text request, e.g. "make me a 7 day plan".
        student_id: Selects the per-user Chroma directory.

    Returns:
        dict: ``{"days": [...]}`` — each entry has day/id/subject/topic/details,
        status ("unlocked" for day 1, "locked" otherwise) and quiz_passed.
    """
    user_chroma_dir = _get_user_chroma_dir(student_id)

    vector_store = Chroma(
        persist_directory=user_chroma_dir,
        embedding_function=get_embeddings()
    )
    # NOTE: the plan is built heuristically from retrieved chunks; the unused
    # `llm = get_llm()` call and the redundant local `import re` (already
    # imported at module level) were removed.

    # Parse "N day(s)" from the request; fall back to a 5-day plan.
    day_match = re.search(r'(\d+)\s*day', user_request.lower())
    num_days = int(day_match.group(1)) if day_match else 5

    # Broad retrieval to sample topics across all ingested sources.
    docs = vector_store.similarity_search("topics subjects syllabus overview", k=20)

    topics_by_source = {}
    for doc in docs:
        source = doc.metadata.get("source", "unknown")
        if source not in topics_by_source:
            topics_by_source[source] = {
                "topics": [],
                "subject_name": None
            }

        content = doc.page_content

        # Infer a human-readable subject name from the first chunk seen
        # for this source.
        if topics_by_source[source]["subject_name"] is None:
            first_part = content[:200].upper()
            if "MANUFACTURING" in first_part:
                topics_by_source[source]["subject_name"] = "Manufacturing Technology"
            elif "OOPS" in first_part or "OBJECT" in first_part:
                topics_by_source[source]["subject_name"] = "Object-Oriented Programming"
            elif "DATA STRUCT" in first_part:
                topics_by_source[source]["subject_name"] = "Data Structures"
            else:
                # Fall back to a prettified filename.
                filename = source.split('/')[-1].replace('.pdf', '').replace('-', ' ').title()
                topics_by_source[source]["subject_name"] = filename

        # Harvest plausible topic sentences: keyword hit, or capitalized
        # and long enough to look like a heading/statement.
        sentences = content.split('.')
        for sentence in sentences:
            sentence = sentence.strip()
            if len(sentence) > 20 and len(sentence) < 150:
                if any(kw in sentence.lower() for kw in ['topic', 'chapter', 'module', 'unit', 'concept', 'introduction', 'process', 'method']):
                    topics_by_source[source]["topics"].append(sentence)
                elif sentence[0].isupper() and len(sentence.split()) > 4:
                    topics_by_source[source]["topics"].append(sentence)

    # Deduplicate (order-preserving) and cap topics per source.
    for source in topics_by_source:
        topics_by_source[source]["topics"] = list(dict.fromkeys(topics_by_source[source]["topics"]))[:num_days * 2]

    all_sources = list(topics_by_source.keys())
    num_subjects = len(all_sources)

    if num_subjects == 0:
        # Nothing ingested yet: emit a generic placeholder plan.
        return {
            "days": [
                {"day": i, "topic": f"Topic {i}", "details": "Study material", "status": "unlocked" if i == 1 else "locked", "subject": "General", "id": i}
                for i in range(1, num_days + 1)
            ]
        }

    plan_days = []
    topic_id = 1

    for day_num in range(1, num_days + 1):
        # One entry per subject per day, cycling through each subject's topics.
        for source in all_sources:
            subject_name = topics_by_source[source]["subject_name"]
            source_topics = topics_by_source[source]["topics"]

            topic_idx = (day_num - 1) % len(source_topics) if source_topics else 0

            if source_topics and topic_idx < len(source_topics):
                # Truncate long sentences so the UI stays readable.
                topic_text = source_topics[topic_idx][:100]
            else:
                topic_text = "Concepts and Principles"

            plan_days.append({
                "day": day_num,
                "id": topic_id,
                "subject": subject_name,
                "topic": f"{subject_name}: {topic_text}",
                "details": f"Study material for {subject_name}",
                "status": "unlocked" if day_num == 1 else "locked",
                "quiz_passed": False
            })
            topic_id += 1

    return {"days": plan_days}
|
|
def generate_lesson_content(topic_title: str, student_id: str = "anonymous") -> str:
    """
    Generate a Markdown study guide for a topic from the user's materials.

    Retrieves relevant chunks from the per-user vector store, builds a
    de-duplicated citation list from their metadata, and prompts the LLM for
    a structured guide. On LLM failure, returns a Markdown error message
    instead of raising.
    """
    user_chroma_dir = _get_user_chroma_dir(student_id)

    vector_store = Chroma(
        persist_directory=user_chroma_dir,
        embedding_function=get_embeddings()
    )
    llm = get_llm()

    # Retrieve context; cap each chunk at 500 chars to bound prompt size.
    docs = vector_store.similarity_search(topic_title, k=8)
    context_text = "\n".join([d.page_content[:500] for d in docs])

    # Build a de-duplicated (filename, page) source list from the top hits.
    sources_list = []
    seen_sources = set()
    for doc in docs[:5]:
        source_file = doc.metadata.get("source", "Unknown")
        source_filename = source_file.split("/")[-1] if "/" in source_file else source_file
        page = doc.metadata.get("page", "N/A")

        # De-dup key combines filename and page number.
        source_key = f"{source_filename}_p{page}"
        if source_key not in seen_sources:
            sources_list.append({
                "filename": source_filename,
                "page": page
            })
            seen_sources.add(source_key)

    sources_text = "\n".join([f"- {src['filename']}, page {src['page']}" for src in sources_list])

    prompt = f"""Create a comprehensive study guide for: {topic_title}

Context from course materials:
{context_text}

Available sources: {sources_text}

Write a DETAILED study guide in Markdown format with these sections:

## Introduction
Explain what this topic is and why it's important (2-3 paragraphs)

## Core Concepts
Break down the main ideas into clear subsections. For each concept:
- Define it clearly
- Explain how it works
- Describe when and why to use it

## Key Points & Rules
List important formulas, rules, syntax, or principles. Include code examples if applicable.

## Practical Examples
Provide 2-3 real-world examples showing:
- The problem scenario
- How the concept solves it
- Step-by-step walkthrough

## Common Mistakes
Highlight typical errors students make and how to avoid them

## Summary
Quick bullet-point recap of key takeaways

IMPORTANT: Add inline citations where appropriate using the format [Source: filename].
Make this comprehensive and educational. Aim for 600-800 words. Use clear explanations a student can understand.

Markdown content:"""

    try:
        response = llm.invoke(prompt)

        # Chat models return a message object; plain LLMs return a string.
        if hasattr(response, 'content'):
            response_text = response.content
        else:
            response_text = str(response)

        # Strip Markdown code fences the model may wrap its answer in.
        clean_text = response_text.replace("```markdown", "").replace("```", "").strip()

        # Pad suspiciously short answers with a pointer to other resources.
        if len(clean_text) < 200:
            clean_text += "\n\n*Note: For more detailed information, please refer to your course materials or ask specific questions in the chat.*"

        # Append a references section built from the retrieved sources.
        if sources_list:
            clean_text += "\n\n---\n\n### 📚 References\n\n"
            for idx, src in enumerate(sources_list, 1):
                clean_text += f"{idx}. **{src['filename']}**, page {src['page']}\n"

        return clean_text
    except Exception as e:
        return f"### Error Generating Lesson\nCould not retrieve content: {e}"
|
|
|
|
def query_knowledge_base(question: str, history: list = None, student_id: str = "anonymous"):
    """
    Answer a question against the user's ingested materials (simple RAG).

    Args:
        question: The user's question.
        history: Prior chat messages, each a dict with "role" and "content".
            Defaults to no history. (BUG FIX: was a mutable default ``[]``.)
        student_id: Selects the per-user Chroma directory.

    Returns:
        dict: {"answer": str, "sources": [{"source": ..., "page": ...}, ...]}
    """
    # Normalize the None sentinel to a fresh list each call.
    if history is None:
        history = []

    user_chroma_dir = _get_user_chroma_dir(student_id)

    vector_store = Chroma(
        persist_directory=user_chroma_dir,
        embedding_function=get_embeddings()
    )
    llm = get_llm()

    # Top-3 chunks, each capped at 500 chars, as context.
    docs = vector_store.similarity_search(question, k=3)
    context = "\n".join([d.page_content[:500] for d in docs])

    # Flatten the chat history into "role: content" lines for the prompt.
    history_text = "\n".join([f"{msg['role']}: {msg['content']}" for msg in history])

    prompt = f"""
Context: {context}
Chat History:
{history_text}

User Question: {question}

TASK: Answer the user's question based on the context.
If you don't know, say "I don't know".
"""

    res = llm.invoke(prompt)

    # Chat models return a message object; plain LLMs return a string.
    if hasattr(res, 'content'):
        answer_text = res.content
    else:
        answer_text = str(res)

    # Surface the retrieved chunks' provenance alongside the answer.
    sources_list = []
    for d in docs:
        meta = d.metadata
        sources_list.append({"source": meta.get("source", "Unknown"), "page": meta.get("page", 1)})

    return {
        "answer": answer_text,
        "sources": sources_list
    }
def generate_quiz_data(topic_title: str, student_id: str = "anonymous"):
    """
    Generate a 3-question multiple-choice quiz for a topic.

    Retrieves context from the user's vector store and asks the LLM for a
    JSON array of questions with plausible distractors. If the LLM output is
    missing, malformed, or short, fills in with heuristic questions derived
    from the retrieved context.

    Returns:
        list: exactly 3 dicts with "question", "options" and "answer".
    """
    user_chroma_dir = _get_user_chroma_dir(student_id)

    vector_store = Chroma(
        persist_directory=user_chroma_dir,
        embedding_function=get_embeddings()
    )
    llm = get_llm()

    # Small context window: 3 chunks, 300 chars each.
    docs = vector_store.similarity_search(topic_title, k=3)
    context_text = "\n".join([d.page_content[:300] for d in docs])

    def create_context_based_fallback():
        """Generate realistic quiz questions from context when LLM fails"""
        # Harvest candidate concept sentences from the context.
        sentences = context_text.split('.')
        key_concepts = []
        for sentence in sentences[:10]:
            words = sentence.strip().split()
            if len(words) > 3:
                key_concepts.append(sentence.strip())

        if not key_concepts or len(key_concepts) < 3:
            # Too little context: fall back to generic topic-level questions.
            return [
                {
                    "question": f"Which statement best describes {topic_title}?",
                    "options": [
                        "A core concept that requires understanding of fundamentals",
                        "An advanced technique used in specialized applications",
                        "A theoretical framework with practical implementations"
                    ],
                    "answer": "A core concept that requires understanding of fundamentals"
                },
                {
                    "question": f"What is the primary purpose of {topic_title}?",
                    "options": [
                        "To optimize performance and efficiency",
                        "To provide structure and organization",
                        "To enable complex problem solving"
                    ],
                    "answer": "To provide structure and organization"
                },
                {
                    "question": f"When should you apply {topic_title}?",
                    "options": [
                        "When dealing with large-scale systems",
                        "During the initial design phase",
                        "When specific requirements are identified"
                    ],
                    "answer": "When specific requirements are identified"
                }
            ]

        # Build questions whose options are slices of real context sentences.
        fallback_quiz = []
        for i, concept in enumerate(key_concepts[:3]):
            words = concept.split()
            if len(words) > 5:
                correct_answer = ' '.join(words[:15])
                distractor1 = ' '.join(words[2:10] + words[:2]) if len(words) > 10 else "Alternative interpretation of the concept"
                distractor2 = ' '.join(words[5:15]) if len(words) > 15 else "Related but distinct concept"

                fallback_quiz.append({
                    "question": f"Regarding {topic_title}, which statement is most accurate?",
                    "options": [correct_answer, distractor1, distractor2],
                    "answer": correct_answer
                })

        # Pad to exactly 3 questions if the context yielded fewer.
        while len(fallback_quiz) < 3:
            fallback_quiz.append({
                "question": f"What is an important aspect of {topic_title}?",
                "options": [
                    "Understanding the underlying principles",
                    "Memorizing specific implementation details",
                    "Following standard industry practices"
                ],
                "answer": "Understanding the underlying principles"
            })

        return fallback_quiz[:3]

    prompt = f"""Create 3 challenging multiple choice questions about: {topic_title}

Context: {context_text}

CRITICAL REQUIREMENTS for answer choices:
1. Make wrong answers (distractors) PLAUSIBLE and REALISTIC
2. Use common misconceptions as wrong answers
3. Make distractors similar enough that students need real understanding to choose correctly
4. Avoid obviously wrong or silly options like "Option A", "Option B"
5. Base all options on the actual context provided

Example of GOOD distractors (realistic and plausible):
Q: "What is encapsulation in OOP?"
- "Hiding implementation details and exposing only necessary interfaces" [CORRECT]
- "Combining data and methods that operate on that data into a single unit" [PLAUSIBLE - related to OOP but describes a class]
- "The ability of objects to take multiple forms through inheritance" [PLAUSIBLE - actually polymorphism]

Example of BAD distractors (too obvious):
- "A type of loop"
- "Option A"
- "None of the above"

Output as JSON array with 3 questions:
[
{{
"question": "Specific question text?",
"options": ["Realistic wrong answer 1", "Correct answer", "Realistic wrong answer 2"],
"answer": "Correct answer"
}},
... (2 more questions)
]

JSON:"""

    try:
        response = llm.invoke(prompt)

        # Chat models return a message object; plain LLMs return a string.
        if hasattr(response, 'content'):
            response_text = response.content
        else:
            response_text = str(response)

        # Strip Markdown code fences, then parse. NOTE: the redundant
        # function-local `import json` was removed; json is imported at
        # module level.
        clean_json = response_text.replace("```json", "").replace("```", "").strip()
        quiz_data = json.loads(clean_json)

        if not isinstance(quiz_data, list):
            raise ValueError("Quiz data must be a list")

        # Normalize to exactly 3 questions.
        if len(quiz_data) < 3:
            context_fallback = create_context_based_fallback()
            questions_needed = 3 - len(quiz_data)
            quiz_data.extend(context_fallback[:questions_needed])
        elif len(quiz_data) > 3:
            quiz_data = quiz_data[:3]

        return quiz_data

    except Exception:
        # Any LLM/parsing failure falls back to context-derived questions.
        return create_context_based_fallback()