"""Ingest curriculum PDFs into a Chroma vector store.

The script walks ``DATA_DIR``, treats the first two path components below it
as ``grade`` and ``subject``, extracts text from every new or changed PDF,
tags the pages with LLM-derived topic keywords, and stores the resulting
chunks in the ``socratic_knowledge`` Chroma collection. A JSON file
(``TRACKING_FILE``) maps each ingested relative path to its content hash so
unchanged files are skipped on later runs.
"""

import os
import json
import hashlib
import unicodedata
import re
from typing import List

from langchain_community.document_loaders import PDFPlumberLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import AIMessage
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configuration
DATA_DIR = "data"
CHROMA_PATH = "chroma_db"
TRACKING_FILE = "ingested_files.json"

# Read size used when hashing files, so large PDFs are never fully in memory.
_HASH_CHUNK_SIZE = 1 << 20


def clean_text(text):
    """Normalize extracted PDF text into a single-spaced, printable string.

    Steps: NFKC normalization, mapping of curly quotes and the Unicode
    replacement character to a plain apostrophe, removal of non-whitespace
    control/format characters, and collapsing of whitespace runs.

    Args:
        text: Raw text (may be empty or None).

    Returns:
        The cleaned text, or "" for empty input.
    """
    if not text:
        return ""
    text = unicodedata.normalize("NFKC", text)
    # NOTE(review): U+FFFD is mapped to an apostrophe, presumably because the
    # PDFs mis-decode apostrophes -- confirm against the source material.
    text = text.replace("\ufffd", "'").replace("\u2019", "'").replace("\u2018", "'")
    # Newlines and tabs are category "Cc"; they must survive this filter so the
    # whitespace collapse below can turn them into word separators. Dropping
    # them here would fuse the last word of one line with the first of the next.
    text = "".join(
        c for c in text
        if c.isspace() or not unicodedata.category(c).startswith("C")
    )
    text = re.sub(r"\s+", " ", text)
    return text.strip()


def get_file_hash(file_path):
    """Return the hex MD5 digest of a file's contents.

    MD5 is used only for change detection, and keeps digests comparable with
    those already stored in the tracking file.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hexadecimal digest string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    hasher = hashlib.md5()
    with open(file_path, "rb") as f:
        for block in iter(lambda: f.read(_HASH_CHUNK_SIZE), b""):
            hasher.update(block)
    return hasher.hexdigest()


def load_tracking():
    """Load the mapping of ingested relative paths to content hashes.

    Returns:
        The dict stored in ``TRACKING_FILE``, or an empty dict when the file
        is missing, unreadable, not valid JSON, or not a JSON object.
    """
    if not os.path.exists(TRACKING_FILE):
        return {}
    try:
        with open(TRACKING_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
        print(f"  Warning: could not read {TRACKING_FILE} ({e}); treating all files as new.")
        return {}
    if not isinstance(data, dict):
        print(f"  Warning: {TRACKING_FILE} does not contain a JSON object; treating all files as new.")
        return {}
    return data


def save_tracking(tracking_data):
    """Persist the tracking mapping to ``TRACKING_FILE``.

    The data is written to a temporary sibling file and then moved into place,
    so an interrupted write cannot leave a truncated tracking file behind.

    Args:
        tracking_data: Dict of relative path -> content hash.
    """
    tmp_path = f"{TRACKING_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(tracking_data, f, indent=4)
    os.replace(tmp_path, TRACKING_FILE)


def get_text_content(content):
    """Flatten a chat-model response payload into plain text.

    Args:
        content: A string, a list of parts (dict parts carrying a "text" key
            are concatenated, everything else is ignored), or any other object.

    Returns:
        The string itself, the joined part texts, or ``str(content)``.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return "".join(
            part.get("text", "")
            for part in content
            if isinstance(part, dict) and "text" in part
        )
    return str(content)


def extract_consolidated_topics(texts: List[str], grade: str, subject: str):
    """Use Gemini to extract a unified set of topics for a whole subject.

    Args:
        texts: Text samples taken from the subject's documents.
        grade: Grade label inserted into the prompt.
        subject: Subject label inserted into the prompt.

    Returns:
        The model's comma-separated topic list, or "General" when there are no
        samples, the model call fails, or the answer is empty.
    """
    if not texts:
        return "General"

    llm = ChatGoogleGenerativeAI(
        model="gemini-3.1-flash-lite",
        google_api_key=os.getenv("GOOGLE_API_KEY"),
        temperature=0.2,
    )

    combined_text = "\n---\n".join([t[:1000] for t in texts])  # Sample from each PDF

    prompt = f"""You are a curriculum expert. Analyze these samples from {grade} {subject} educational materials.
Identify the distinct, major educational topics covered across these documents.
IMPORTANT: Do not over-summarize. If there are diverse topics like 'Circuits', 'Solar System', and 'Photosynthesis', you MUST list each one separately.
Acknowledge the variety in the curriculum.

Format: Comma-separated list (12-18 topics). Keep them concise (1-3 words each).
Return only the keywords.

Texts:
{combined_text[:7000]}

Unique Topics:"""

    try:
        response = llm.invoke(prompt)
        content = get_text_content(response.content)
        # Clean up common AI conversational prefix (e.g. "Topics: ...")
        if ":" in content and len(content.split(":")[0]) < 20:
            content = content.split(":", 1)[1]
        # An empty answer would otherwise be stored as the topics metadata.
        return content.strip() or "General"
    except Exception as e:
        print(f"  Error extracting topics: {e}")
        return "General"


def _find_pending_files(tracking_data):
    """Group new or changed PDFs under DATA_DIR by (grade, subject).

    Returns:
        Dict mapping (grade, subject) to a list of
        (full_path, rel_path, file_hash) tuples.
    """
    groups = {}
    for root, _dirs, files in os.walk(DATA_DIR):
        # Files directly inside DATA_DIR have no grade folder; skip them.
        if os.path.relpath(root, DATA_DIR) == ".":
            continue
        for file in files:
            if not file.lower().endswith(".pdf"):
                continue
            full_path = os.path.join(root, file)
            rel_path = os.path.relpath(full_path, DATA_DIR)
            try:
                file_hash = get_file_hash(full_path)
            except OSError as e:
                print(f"  Error hashing {full_path}: {e}")
                continue
            if tracking_data.get(rel_path) == file_hash:
                continue
            parts = rel_path.split(os.sep)
            # NOTE(review): a PDF placed directly in a grade folder yields
            # parts == [grade, filename], so its subject becomes the file
            # name -- confirm whether a <grade>/<subject>/ layout is required.
            if len(parts) >= 2:
                key = (parts[0], parts[1])
                groups.setdefault(key, []).append((full_path, rel_path, file_hash))
    return groups


def _load_pdf_pages(file_path, grade, subject):
    """Load one PDF, clean and tag its pages, and return (pages, samples)."""
    pages = PDFPlumberLoader(file_path).load()
    samples = []
    for i, page in enumerate(pages):
        page.page_content = clean_text(page.page_content)
        page.metadata["grade"] = grade
        page.metadata["subject"] = subject
        page.metadata["source"] = os.path.basename(file_path)
        # Sample more broadly (every 5th page) to catch topics like Solar System
        if i % 5 == 0:
            samples.append(page.page_content[:1500])
    return pages, samples


def process_new_files():
    """Ingest every new or changed PDF under DATA_DIR into the vector store.

    Files are processed per (grade, subject) group. Only files that loaded
    successfully are recorded in the tracking file, so a failed file is
    retried on the next run.

    Returns:
        "No new files to process." when nothing is pending, otherwise
        "Ingestion complete.".
    """
    tracking_data = load_tracking()

    print(f"Scanning {DATA_DIR} for new educational content...")
    groups = _find_pending_files(tracking_data)
    if not groups:
        return "No new files to process."

    # Created only once there is work to do: loading the embedding model is
    # the expensive part of start-up.
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    vector_store = Chroma(
        collection_name="socratic_knowledge",
        embedding_function=embeddings,
        persist_directory=CHROMA_PATH,
    )
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=750, chunk_overlap=75)

    for (grade, subject), entries in groups.items():
        print(f"\nProcessing {grade} - {subject}...")
        all_subject_pages = []
        samples = []
        loaded = []  # (rel_path, file_hash) of files that loaded without error

        for full_path, rel_path, file_hash in entries:
            print(f"  Loading {os.path.basename(full_path)}...")
            try:
                pages, page_samples = _load_pdf_pages(full_path, grade, subject)
            except Exception as e:
                print(f"  Error loading {full_path}: {e}")
                continue
            all_subject_pages.extend(pages)
            samples.extend(page_samples)
            loaded.append((rel_path, file_hash))

        if not all_subject_pages:
            continue

        # Topics are derived from the files ingested in this run only.
        consolidated_topics = extract_consolidated_topics(samples, grade, subject)
        for page in all_subject_pages:
            page.metadata["topics"] = consolidated_topics

        # Normal fast ingestion (no rate limits with local embeddings)
        chunks = text_splitter.split_documents(all_subject_pages)
        # NOTE(review): chunks from an earlier version of a changed file are
        # not removed here, so re-ingestion may leave stale duplicates in the
        # collection.
        if chunks:
            vector_store.add_documents(chunks)

        # Record the hash computed during the scan, i.e. the content that was
        # actually considered new, and save after each group so progress
        # survives a later failure.
        for rel_path, file_hash in loaded:
            tracking_data[rel_path] = file_hash
        save_tracking(tracking_data)

    return "Ingestion complete."


if __name__ == "__main__":
    print(process_new_files())