Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import hashlib | |
| import unicodedata | |
| import re | |
| from typing import List | |
| from langchain_community.document_loaders import PDFPlumberLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_chroma import Chroma | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_core.messages import AIMessage | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| # Configuration | |
| DATA_DIR = "data" | |
| CHROMA_PATH = "chroma_db" | |
| TRACKING_FILE = "ingested_files.json" | |
| def clean_text(text): | |
| text = unicodedata.normalize("NFKC", text) | |
| text = text.replace("\ufffd", "'").replace("\u2019", "'").replace("\u2018", "'") | |
| text = "".join(c for c in text if not unicodedata.category(c).startswith("C")) | |
| text = re.sub(r'\s+', ' ', text) | |
| return text.strip() | |
| def get_file_hash(file_path): | |
| hasher = hashlib.md5() | |
| with open(file_path, "rb") as f: | |
| buf = f.read() | |
| hasher.update(buf) | |
| return hasher.hexdigest() | |
| def load_tracking(): | |
| if os.path.exists(TRACKING_FILE): | |
| try: | |
| with open(TRACKING_FILE, "r") as f: return json.load(f) | |
| except: return {} | |
| return {} | |
| def save_tracking(tracking_data): | |
| with open(TRACKING_FILE, "w") as f: json.dump(tracking_data, f, indent=4) | |
| def get_text_content(content): | |
| if isinstance(content, str): return content | |
| elif isinstance(content, list): | |
| return "".join([part.get("text", "") for part in content if isinstance(part, dict) and "text" in part]) | |
| return str(content) | |
| def extract_consolidated_topics(texts: List[str], grade: str, subject: str): | |
| """Use Gemini to extract a unified set of topics for a whole subject.""" | |
| if not texts: return "General" | |
| llm = ChatGoogleGenerativeAI(model="gemini-3.1-flash-lite", google_api_key=os.getenv("GOOGLE_API_KEY"), temperature=0.2) | |
| combined_text = "\n---\n".join([t[:1000] for t in texts]) # Sample from each PDF | |
| prompt = f"""You are a curriculum expert. Analyze these samples from {grade} {subject} educational materials. | |
| Identify the distinct, major educational topics covered across these documents. | |
| IMPORTANT: Do not over-summarize. If there are diverse topics like 'Circuits', 'Solar System', and 'Photosynthesis', you MUST list each one separately. | |
| Acknowledge the variety in the curriculum. | |
| Format: Comma-separated list (12-18 topics). | |
| Keep them concise (1-3 words each). Return only the keywords. | |
| Texts: {combined_text[:7000]} | |
| Unique Topics:""" | |
| try: | |
| response = llm.invoke(prompt) | |
| content = get_text_content(response.content) | |
| # Clean up common AI conversational prefix | |
| if ":" in content and len(content.split(":")[0]) < 20: | |
| content = content.split(":", 1)[1] | |
| return content.strip() | |
| except Exception as e: | |
| print(f" Error extracting topics: {e}") | |
| return "General" | |
| def process_new_files(): | |
| embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") | |
| vector_store = Chroma(collection_name="socratic_knowledge", embedding_function=embeddings, persist_directory=CHROMA_PATH) | |
| tracking_data = load_tracking() | |
| print(f"Scanning {DATA_DIR} for new educational content...") | |
| groups = {} | |
| for root, dirs, files in os.walk(DATA_DIR): | |
| rel_dir_path = os.path.relpath(root, DATA_DIR) | |
| if rel_dir_path == ".": continue | |
| for file in files: | |
| if file.endswith(".pdf"): | |
| full_path = os.path.join(root, file) | |
| rel_path = os.path.relpath(full_path, DATA_DIR) | |
| if rel_path in tracking_data and tracking_data[rel_path] == get_file_hash(full_path): | |
| continue | |
| parts = rel_path.split(os.sep) | |
| if len(parts) >= 2: | |
| grade, subject = parts[0], parts[1] | |
| key = (grade, subject) | |
| if key not in groups: groups[key] = [] | |
| groups[key].append(full_path) | |
| if not groups: | |
| return "No new files to process." | |
| for (grade, subject), file_paths in groups.items(): | |
| print(f"\nProcessing {grade} - {subject}...") | |
| all_subject_pages = [] | |
| samples = [] | |
| for fp in file_paths: | |
| print(f" Loading {os.path.basename(fp)}...") | |
| try: | |
| loader = PDFPlumberLoader(fp) | |
| pages = loader.load() | |
| for i, p in enumerate(pages): | |
| p.page_content = clean_text(p.page_content) | |
| p.metadata["grade"] = grade | |
| p.metadata["subject"] = subject | |
| p.metadata["source"] = os.path.basename(fp) | |
| # Sample more broadly (every 5th page) to catch topics like Solar System | |
| if i % 5 == 0: | |
| samples.append(p.page_content[:1500]) | |
| all_subject_pages.extend(pages) | |
| except Exception as e: | |
| print(f" Error loading {fp}: {e}") | |
| consolidated_topics = extract_consolidated_topics(samples, grade, subject) | |
| for page in all_subject_pages: | |
| page.metadata["topics"] = consolidated_topics | |
| if all_subject_pages: | |
| # Normal fast ingestion (no rate limits with local embeddings) | |
| text_splitter = RecursiveCharacterTextSplitter(chunk_size=750, chunk_overlap=75) | |
| chunks = text_splitter.split_documents(all_subject_pages) | |
| vector_store.add_documents(chunks) | |
| for fp in file_paths: | |
| tracking_data[os.path.relpath(fp, DATA_DIR)] = get_file_hash(fp) | |
| save_tracking(tracking_data) | |
| return "Ingestion complete." | |
| if __name__ == "__main__": | |
| process_new_files() | |