# Page header captured along with the file (not part of the program):
#   Spaces: Sleeping | File size: 5,923 Bytes | a10a6c0
import os
import json
import hashlib
import unicodedata
import re
from typing import List
from langchain_community.document_loaders import PDFPlumberLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import AIMessage
from dotenv import load_dotenv
# Load environment variables before anything below reads them
# (GOOGLE_API_KEY is looked up with os.getenv when topics are extracted).
# NOTE(review): load_dotenv presumably reads a local .env file - confirm.
load_dotenv()
# Configuration
# Root folder walked for PDFs. The first two path components below it are
# used as the grade and the subject of each file.
DATA_DIR = "data"
# Directory handed to Chroma as its persist_directory.
CHROMA_PATH = "chroma_db"
# JSON file mapping each ingested PDF (path relative to DATA_DIR) to the MD5
# hex digest it had when it was ingested.
TRACKING_FILE = "ingested_files.json"
def clean_text(text):
    """Normalize text extracted from a PDF page into one clean line.

    Steps, in order:
      1. Unicode NFKC normalization.
      2. U+FFFD (replacement character) and curly single quotes become "'".
      3. Every run of whitespace becomes a single space.
      4. Characters whose Unicode category starts with "C" are removed.
      5. Whitespace is collapsed once more and the ends are stripped.

    Args:
        text: Raw text. ``None`` or an empty string yields ``""``.

    Returns:
        The cleaned text, with words separated by single spaces.
    """
    if not text:
        return ""
    text = unicodedata.normalize("NFKC", text)
    text = text.replace("\ufffd", "'").replace("\u2019", "'").replace("\u2018", "'")
    # Whitespace has to be turned into spaces *before* category "C" characters
    # are dropped: "\n", "\t" and "\r" are category Cc, so removing them first
    # would glue the last word of a line onto the first word of the next one.
    text = re.sub(r"\s+", " ", text)
    text = "".join(c for c in text if not unicodedata.category(c).startswith("C"))
    # Dropping a character that sat between two spaces can leave a double space.
    text = re.sub(r"\s+", " ", text)
    return text.strip()
def get_file_hash(file_path, chunk_size=1024 * 1024):
    """Return the hexadecimal MD5 digest of a file's contents.

    The digest is only used to detect whether a file changed since it was last
    ingested. The file is read in chunks so that a large PDF never has to be
    held in memory as a whole.

    Args:
        file_path: Path of the file to hash.
        chunk_size: Number of bytes read per iteration.

    Returns:
        The digest as a lowercase hex string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    hasher = hashlib.md5()
    with open(file_path, "rb") as f:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
def load_tracking():
    """Load the ingestion tracking map from ``TRACKING_FILE``.

    Returns:
        A dict mapping a PDF path (relative to ``DATA_DIR``) to the hash it had
        when it was ingested. An empty dict is returned when the file does not
        exist, cannot be read, is not valid JSON, or does not hold a JSON
        object; in the last three cases a warning is printed, because every
        PDF will then be treated as new.
    """
    try:
        with open(TRACKING_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been ingested yet.
        return {}
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f" Warning: could not read {TRACKING_FILE} ({e}); treating all files as new.")
        return {}
    if not isinstance(data, dict):
        # Callers index the result by path, so any other JSON value is unusable.
        print(f" Warning: {TRACKING_FILE} does not contain a JSON object; treating all files as new.")
        return {}
    return data
def save_tracking(tracking_data):
    """Write the ingestion tracking map to ``TRACKING_FILE`` as indented JSON.

    The data is written to a temporary sibling file first and then moved over
    the real file, so an interrupted run cannot leave a half-written tracking
    file behind.

    Args:
        tracking_data: JSON-serializable mapping of relative PDF path to hash.

    Raises:
        OSError: If the file cannot be written or replaced.
        TypeError: If ``tracking_data`` is not JSON-serializable.
    """
    tmp_path = f"{TRACKING_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(tracking_data, f, indent=4)
        os.replace(tmp_path, TRACKING_FILE)
    finally:
        # Only still present when writing or replacing failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def get_text_content(content):
    """Flatten a chat-model message ``content`` value into plain text.

    Args:
        content: Either a string, a list whose items are strings and/or dicts
            (only dicts carrying a string under the ``"text"`` key contribute),
            ``None``, or any other object.

    Returns:
        The string itself, the concatenation of all text parts of a list,
        ``""`` for ``None``, or ``str(content)`` for anything else.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces = []
        for part in content:
            if isinstance(part, str):
                # Content lists may mix bare strings with typed dict blocks.
                pieces.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                pieces.append(part["text"])
        return "".join(pieces)
    return str(content)
def _build_topic_sample(texts, max_chars=7000, max_per_sample=1000, min_per_sample=300):
    """Join non-blank samples into one string of at most max_chars that spans the whole input."""
    usable = [t for t in texts if t and t.strip()]
    if not usable:
        return ""
    separator = "\n---\n"
    max_count = max(1, max_chars // min_per_sample)
    if len(usable) > max_count:
        # Too many samples to give each a useful share: keep an evenly spaced
        # subset so that late documents are represented as well as early ones.
        step = len(usable) / max_count
        usable = [usable[int(i * step)] for i in range(max_count)]
    budget = (max_chars - len(separator) * (len(usable) - 1)) // len(usable)
    per_sample = max(1, min(max_per_sample, budget))
    return separator.join(t[:per_sample] for t in usable)


def extract_consolidated_topics(texts: List[str], grade: str, subject: str):
    """Use Gemini to extract a unified set of topics for a whole subject.

    Args:
        texts: Text samples taken from the subject's PDFs.
        grade: Grade label, inserted into the prompt.
        subject: Subject label, inserted into the prompt.

    Returns:
        The model's comma-separated topic list with a short leading
        "Label:" prefix removed, or ``"General"`` when there is no usable
        sample text, the model call fails, or the reply is empty.
    """
    # Every sample gets a share of the prompt budget. Truncating the joined
    # text instead would silently discard everything after the first few samples.
    combined_text = _build_topic_sample(texts or [])
    if not combined_text:
        return "General"
    llm = ChatGoogleGenerativeAI(model="gemini-3.1-flash-lite", google_api_key=os.getenv("GOOGLE_API_KEY"), temperature=0.2)
    prompt = (
        f"You are a curriculum expert. Analyze these samples from {grade} {subject} educational materials.\n"
        "Identify the distinct, major educational topics covered across these documents.\n"
        "IMPORTANT: Do not over-summarize. If there are diverse topics like 'Circuits', 'Solar System', and 'Photosynthesis', you MUST list each one separately.\n"
        "Acknowledge the variety in the curriculum.\n"
        "Format: Comma-separated list (12-18 topics).\n"
        "Keep them concise (1-3 words each). Return only the keywords.\n"
        f"Texts: {combined_text}\n"
        "Unique Topics:"
    )
    try:
        response = llm.invoke(prompt)
        content = get_text_content(response.content)
    except Exception as e:
        # Best effort: a failed topic extraction must not abort the ingestion.
        print(f" Error extracting topics: {e}")
        return "General"
    # Clean up common AI conversational prefix (e.g. "Topics: ...").
    prefix, colon, rest = content.partition(":")
    if colon and len(prefix) < 20:
        content = rest
    # An empty reply would otherwise be stored as an empty topic list.
    return content.strip() or "General"
def _find_pending_pdfs(tracking_data):
    """Group new or changed PDFs under DATA_DIR by (grade, subject)."""
    groups = {}
    for root, _dirs, files in os.walk(DATA_DIR):
        if os.path.relpath(root, DATA_DIR) == ".":
            # Files lying directly in DATA_DIR have no grade folder.
            continue
        for file_name in files:
            if not file_name.lower().endswith(".pdf"):
                continue
            full_path = os.path.join(root, file_name)
            rel_path = os.path.relpath(full_path, DATA_DIR)
            try:
                # Hashed once here and reused when the file is recorded.
                file_hash = get_file_hash(full_path)
            except OSError as e:
                print(f" Error reading {full_path}: {e}")
                continue
            if tracking_data.get(rel_path) == file_hash:
                continue
            parts = rel_path.split(os.sep)
            # NOTE(review): for a PDF placed directly in a grade folder,
            # parts[1] is the file name, so the file name becomes the subject.
            # Confirm whether such files should be skipped instead.
            if len(parts) >= 2:
                key = (parts[0], parts[1])
                groups.setdefault(key, []).append((full_path, rel_path, file_hash))
    return groups


def _load_subject_pages(file_entries, grade, subject):
    """Load and clean the PDFs of one subject; return (pages, samples, loaded files)."""
    subject_pages = []
    samples = []
    loaded = []
    for full_path, rel_path, file_hash in file_entries:
        print(f" Loading {os.path.basename(full_path)}...")
        try:
            pages = PDFPlumberLoader(full_path).load()
            file_samples = []
            for i, page in enumerate(pages):
                page.page_content = clean_text(page.page_content)
                page.metadata["grade"] = grade
                page.metadata["subject"] = subject
                page.metadata["source"] = os.path.basename(full_path)
                # Sample more broadly (every 5th page) to catch topics like Solar System
                if i % 5 == 0:
                    file_samples.append(page.page_content[:1500])
        except Exception as e:
            print(f" Error loading {full_path}: {e}")
            continue
        subject_pages.extend(pages)
        samples.extend(file_samples)
        loaded.append((rel_path, file_hash))
    return subject_pages, samples, loaded


def process_new_files():
    """Ingest every new or changed PDF under ``DATA_DIR`` into the Chroma store.

    PDFs are grouped by the first two path components below ``DATA_DIR``
    (grade and subject). For each group the pages are loaded and cleaned, one
    consolidated topic list is extracted and attached to every page, the pages
    are split into chunks, and the chunks are added to the vector store. The
    tracking file is saved after each group.

    Only PDFs that loaded successfully are recorded as ingested, so a file
    that failed to load is picked up again on the next run.

    NOTE(review): a changed PDF is added again without removing the chunks
    stored for its previous version.

    Returns:
        ``"No new files to process."`` when nothing is pending, otherwise
        ``"Ingestion complete."``.
    """
    tracking_data = load_tracking()
    print(f"Scanning {DATA_DIR} for new educational content...")
    groups = _find_pending_pdfs(tracking_data)
    if not groups:
        return "No new files to process."

    # Built only once there is work to do: loading the embedding model is the
    # expensive part of start-up.
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    vector_store = Chroma(collection_name="socratic_knowledge", embedding_function=embeddings, persist_directory=CHROMA_PATH)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=750, chunk_overlap=75)

    for (grade, subject), file_entries in groups.items():
        print(f"\nProcessing {grade} - {subject}...")
        pages, samples, loaded = _load_subject_pages(file_entries, grade, subject)
        if pages:
            consolidated_topics = extract_consolidated_topics(samples, grade, subject)
            for page in pages:
                page.metadata["topics"] = consolidated_topics
            # Normal fast ingestion (no rate limits with local embeddings)
            chunks = text_splitter.split_documents(pages)
            if chunks:
                vector_store.add_documents(chunks)
        for rel_path, file_hash in loaded:
            tracking_data[rel_path] = file_hash
        if loaded:
            save_tracking(tracking_data)
    return "Ingestion complete."
if __name__ == "__main__":
    # process_new_files() reports its outcome through its return value only,
    # so print it when the module is run as a script.
    print(process_new_files())