SocraticAI / ingest.py
Deployer
Initial deployment commit with Git LFS tracking
a10a6c0
Raw
History Blame Contribute Delete
5.92 kB
import os
import json
import hashlib
import unicodedata
import re
from typing import List
from langchain_community.document_loaders import PDFPlumberLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import AIMessage
from dotenv import load_dotenv
# Load variables from a local .env file into the process environment at import
# time (GOOGLE_API_KEY is later read via os.getenv when the Gemini client is built).
load_dotenv()
# Configuration — all paths are relative, so they resolve against the current
# working directory of the process.
# Root folder that is walked for PDFs; the first two path components below it
# are used as (grade, subject).
DATA_DIR = "data"
# Directory where the Chroma vector store persists its collection.
CHROMA_PATH = "chroma_db"
# JSON file mapping each ingested PDF (path relative to DATA_DIR) to its MD5 hash.
TRACKING_FILE = "ingested_files.json"
def clean_text(text):
    """Normalize text extracted from a PDF page into one clean line.

    Steps, in order:
      1. NFKC-normalize (folds ligatures, full-width forms, non-breaking
         spaces, etc. into their compatibility equivalents).
      2. Map U+FFFD (replacement character) and curly single quotes to a
         plain ASCII apostrophe.
      3. Drop characters in the Unicode "C*" (control/format/unassigned)
         categories, but keep whitespace so that line breaks and tabs still
         separate words.
      4. Collapse every whitespace run into a single space and trim the ends.

    Args:
        text: Raw text. ``None`` or an empty string yields ``""``.

    Returns:
        The cleaned, single-line string.
    """
    if not text:
        return ""
    text = unicodedata.normalize("NFKC", text)
    text = text.replace("\ufffd", "'").replace("\u2019", "'").replace("\u2018", "'")
    # "\n", "\t" and "\r" are category Cc. They must survive this filter so the
    # whitespace collapse below can turn them into spaces; removing them here
    # would glue the last word of a line onto the first word of the next one.
    text = "".join(
        c for c in text
        if c.isspace() or not unicodedata.category(c).startswith("C")
    )
    return re.sub(r"\s+", " ", text).strip()
def get_file_hash(file_path):
    """Return the hexadecimal MD5 digest of a file's contents.

    The digest is used purely as a change-detection fingerprint, not for
    security. The file is read in fixed-size chunks so that large PDFs are
    never held in memory in full.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The 32-character lowercase hex digest.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    hasher = hashlib.md5()
    with open(file_path, "rb") as f:
        # iter() with a sentinel keeps calling f.read until it returns b"" (EOF).
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
def load_tracking():
    """Load the ingestion tracking map from TRACKING_FILE.

    Returns:
        The dict stored in the tracking file. An empty dict is returned when
        the file does not exist, cannot be read, does not contain valid JSON,
        or contains JSON that is not an object.
    """
    try:
        with open(TRACKING_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been ingested yet.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Best effort: fall back to an empty map, but say so instead of hiding it.
        print(f" Warning: could not read {TRACKING_FILE}: {e}")
        return {}
    # Callers index the result by path, so anything but a dict is unusable.
    return data if isinstance(data, dict) else {}
def save_tracking(tracking_data):
    """Persist the tracking map to TRACKING_FILE as indented JSON.

    The data is written to a sibling temporary file first and then moved over
    the real file with os.replace, so an interruption mid-write cannot leave a
    truncated (unparseable) tracking file behind.

    Args:
        tracking_data: JSON-serializable mapping to store.

    Raises:
        OSError: If the file cannot be written or replaced.
        TypeError: If ``tracking_data`` is not JSON-serializable.
    """
    tmp_path = f"{TRACKING_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(tracking_data, f, indent=4)
        os.replace(tmp_path, TRACKING_FILE)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up the leftover from a failed write.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def get_text_content(content):
    """Flatten a chat-message ``content`` payload into plain text.

    Args:
        content: Either a string, a list of parts (plain strings and/or dicts
            carrying a ``"text"`` key), or any other object.

    Returns:
        - the string itself when ``content`` is a string;
        - for a list, the concatenation of every string part and every dict
          part's ``"text"`` value (parts without text, e.g. images, are skipped);
        - ``str(content)`` for anything else.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces = []
        for part in content:
            if isinstance(part, str):
                # List payloads may mix bare strings with typed dict blocks.
                pieces.append(part)
            elif isinstance(part, dict):
                text = part.get("text")
                if isinstance(text, str):
                    pieces.append(text)
        return "".join(pieces)
    return str(content)
def _build_topic_sample(texts, per_sample=1000, budget=7000, separator="\n---\n"):
    """Join truncated, non-empty samples, thinning them evenly to fit ``budget`` characters."""
    snippets = [t[:per_sample] for t in texts if t]
    selected = snippets
    stride = 1
    # When everything does not fit, keep every 2nd, 3rd, ... sample instead of
    # only the leading ones, so late documents are still represented.
    while len(selected) > 1 and len(separator.join(selected)) > budget:
        stride += 1
        selected = snippets[::stride]
    return separator.join(selected)[:budget]


def extract_consolidated_topics(texts: List[str], grade: str, subject: str):
    """Use Gemini to extract a unified set of topics for a whole subject.

    Args:
        texts: Text samples taken from the subject's documents.
        grade: Grade label, interpolated into the prompt.
        subject: Subject label, interpolated into the prompt.

    Returns:
        The model's comma-separated topic list with a short conversational
        prefix (e.g. "Topics:") removed, or ``"General"`` when there is no
        text to analyze, the model call fails, or the reply is empty.
    """
    if not texts:
        return "General"
    combined_text = _build_topic_sample(texts)
    if not combined_text:
        # Every sample was empty (e.g. image-only pages): nothing to ask about.
        return "General"
    prompt = f"""You are a curriculum expert. Analyze these samples from {grade} {subject} educational materials.
Identify the distinct, major educational topics covered across these documents.
IMPORTANT: Do not over-summarize. If there are diverse topics like 'Circuits', 'Solar System', and 'Photosynthesis', you MUST list each one separately.
Acknowledge the variety in the curriculum.
Format: Comma-separated list (12-18 topics).
Keep them concise (1-3 words each). Return only the keywords.
Texts: {combined_text}
Unique Topics:"""
    try:
        # Built inside the try so that a missing API key or bad configuration
        # degrades to the "General" fallback instead of aborting ingestion.
        llm = ChatGoogleGenerativeAI(model="gemini-3.1-flash-lite", google_api_key=os.getenv("GOOGLE_API_KEY"), temperature=0.2)
        response = llm.invoke(prompt)
        content = get_text_content(response.content)
    except Exception as e:
        print(f" Error extracting topics: {e}")
        return "General"
    # Clean up a common AI conversational prefix such as "Topics:". A prefix
    # that contains a comma is already part of the topic list, so keep it.
    prefix, colon, rest = content.partition(":")
    if colon and len(prefix) < 20 and "," not in prefix:
        content = rest
    return content.strip() or "General"
def _collect_pending_pdfs(tracking_data):
    """Group new or changed PDFs under DATA_DIR by (grade, subject) as (full_path, rel_path, hash) tuples."""
    groups = {}
    for root, _dirs, files in os.walk(DATA_DIR):
        if os.path.relpath(root, DATA_DIR) == ".":
            # Files directly inside DATA_DIR have no grade folder; skip them.
            continue
        for file in files:
            if not file.lower().endswith(".pdf"):
                continue
            full_path = os.path.join(root, file)
            rel_path = os.path.relpath(full_path, DATA_DIR)
            try:
                file_hash = get_file_hash(full_path)
            except OSError as e:
                print(f" Error reading {full_path}: {e}")
                continue
            if tracking_data.get(rel_path) == file_hash:
                continue  # unchanged since the last ingestion
            parts = rel_path.split(os.sep)
            # NOTE(review): for a PDF sitting directly in a grade folder,
            # parts[1] is the file name itself — confirm that is intended.
            if len(parts) >= 2:
                groups.setdefault((parts[0], parts[1]), []).append((full_path, rel_path, file_hash))
    return groups


def process_new_files():
    """Ingest new or modified PDFs from DATA_DIR into the Chroma vector store.

    For every (grade, subject) group of pending PDFs: load and clean each
    page, tag it with grade/subject/source metadata, derive one shared topic
    list for the group, split the pages into chunks, add them to the
    "socratic_knowledge" collection, and record the file hashes in the
    tracking file.

    Only files that loaded successfully are recorded, so a PDF that failed to
    load is retried on the next run. Tracking is saved after each group so
    that an interruption does not cause already-embedded groups to be
    ingested again.

    Returns:
        "No new files to process." when nothing is pending, otherwise
        "Ingestion complete.".
    """
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    vector_store = Chroma(collection_name="socratic_knowledge", embedding_function=embeddings, persist_directory=CHROMA_PATH)
    tracking_data = load_tracking()
    print(f"Scanning {DATA_DIR} for new educational content...")
    groups = _collect_pending_pdfs(tracking_data)
    if not groups:
        return "No new files to process."
    # Normal fast ingestion (no rate limits with local embeddings)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=750, chunk_overlap=75)
    for (grade, subject), pending in groups.items():
        print(f"\nProcessing {grade} - {subject}...")
        all_subject_pages = []
        samples = []
        loaded = {}  # rel_path -> hash, for files that loaded without error
        for full_path, rel_path, file_hash in pending:
            source_name = os.path.basename(full_path)
            print(f" Loading {source_name}...")
            file_samples = []
            try:
                pages = PDFPlumberLoader(full_path).load()
                for i, p in enumerate(pages):
                    p.page_content = clean_text(p.page_content)
                    p.metadata["grade"] = grade
                    p.metadata["subject"] = subject
                    p.metadata["source"] = source_name
                    # Sample more broadly (every 5th page) to catch topics like Solar System
                    if i % 5 == 0:
                        file_samples.append(p.page_content[:1500])
            except Exception as e:
                print(f" Error loading {full_path}: {e}")
                continue
            # Commit this file's pages and samples only once it loaded fully.
            all_subject_pages.extend(pages)
            samples.extend(file_samples)
            loaded[rel_path] = file_hash
        if all_subject_pages:
            consolidated_topics = extract_consolidated_topics(samples, grade, subject)
            for page in all_subject_pages:
                page.metadata["topics"] = consolidated_topics
            chunks = text_splitter.split_documents(all_subject_pages)
            # NOTE(review): chunks from an earlier version of a changed file
            # are not removed from the collection before re-adding.
            if chunks:
                vector_store.add_documents(chunks)
        if loaded:
            tracking_data.update(loaded)
            save_tracking(tracking_data)
    return "Ingestion complete."
# Script entry point: run one ingestion pass when the file is executed
# directly (importing the module does not trigger ingestion here).
# The status string returned by process_new_files() is not used.
if __name__ == "__main__":
    process_new_files()