# optima/src/vectorization.py
# [30.06.25] wicaksono-tmr | feat (commit 8b763c3)
import os
import json
from typing import List, Dict, Any, Optional
from datetime import datetime
from dotenv import load_dotenv
from pathlib import Path
# LangChain imports
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.schema import Document
load_dotenv()
class LangChainMultimodalVectorizer:
    """Embed multimodal OPTIMA content into per-year Chroma vectorstores.

    Handles five content types (``text_chunk``, ``image``, ``table``,
    ``curriculum``, ``silabus``), building a type-specific embedding text and
    metadata record for each item. Every academic year persists into its own
    Chroma collection named ``optima_multimodal_<year>`` under a per-year
    subdirectory of ``self.persist_dir``.
    """

    def __init__(self):
        # OpenAIEmbeddings reads OPENAI_API_KEY from the environment
        # (load_dotenv() at module import populates it from .env).
        self.embeddings = OpenAIEmbeddings(
            # openai_api_key=os.getenv("OPENAI_API_KEY"),
            # model=os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-ada-002")
        )
        # FIX: this assignment was commented out, but get_or_create_vectorstore()
        # reads self.persist_dir -- every call raised AttributeError.
        self.persist_dir = os.getenv("CHROMA_PERSIST_DIR", "./chroma_persist")

    @staticmethod
    def _clip(value: Any, limit: int) -> str:
        """Coerce *value* to str and truncate to *limit* chars (None-safe).

        Protects metadata slicing from non-string / None values coming out of
        loosely-typed JSON.
        """
        return str(value or "")[:limit]

    def get_or_create_vectorstore(self, year: int) -> "Chroma":
        """Return the Chroma vectorstore for *year*, creating it if needed.

        Args:
            year: Academic year; selects collection ``optima_multimodal_<year>``
                persisted under ``<persist_dir>/year_<year>``.

        Returns:
            A Chroma vectorstore bound to this instance's embedding function.
        """
        collection_name = f"optima_multimodal_{year}"
        # Each year persists into its own subdirectory so collections stay isolated.
        year_persist_dir = os.path.join(self.persist_dir, f"year_{year}")
        os.makedirs(year_persist_dir, exist_ok=True)
        try:
            # Try to load existing vectorstore
            vectorstore = Chroma(
                collection_name=collection_name,
                embedding_function=self.embeddings,
                persist_directory=year_persist_dir
            )
            # Count once (the original called _collection.count() twice).
            doc_count = vectorstore._collection.count()
            if doc_count > 0:
                print(f"πŸ“š Using existing vectorstore: {collection_name} ({doc_count} docs)")
            else:
                print(f"πŸ†• Created new vectorstore: {collection_name}")
        except Exception:
            # Loading failed (corrupt/missing store): fall back to a fresh one.
            print(f"πŸ†• Creating new vectorstore: {collection_name}")
            vectorstore = Chroma(
                collection_name=collection_name,
                embedding_function=self.embeddings,
                persist_directory=year_persist_dir
            )
        return vectorstore

    def create_embedding_text(self, item: Dict[str, Any]) -> str:
        """Build the text that gets embedded, tailored per ``content_type``.

        Prefixes the raw content with type-specific context (program, course,
        table shape, chapter, ...) so semantically similar items of the same
        kind cluster together in embedding space.
        """
        content_type = item.get("content_type", "")
        content = item.get("content", "")
        context_text = item.get("context_text", "")
        if content_type == "silabus":
            mata_kuliah = item.get("mata_kuliah", "")
            course_code = item.get("course_code", "")
            silabus_type = item.get("silabus_type", "")
            program = item.get("program", "")
            semester = item.get("semester", "")
            embedding_text = f"Silabus {program} semester {semester} {mata_kuliah} {course_code} {silabus_type}: {content} {context_text}"
        elif content_type == "curriculum":
            program = item.get("program", "")
            semester = item.get("semester", "")
            table_type = item.get("table_type", "")
            embedding_text = f"Kurikulum {program} semester {semester} {table_type}: {content} {context_text}"
        elif content_type == "image":
            title = item.get("title", "")
            caption = item.get("caption", "")
            embedding_text = f"Gambar: {title} {caption} {content} {context_text}"
        elif content_type == "table":
            title = item.get("title", "")
            caption = item.get("caption", "")
            rows = item.get("rows", 0)
            cols = item.get("cols", 0)
            embedding_text = f"Tabel {rows}x{cols}: {title} {caption} {content} {context_text}"
        else:  # text_chunk (and any unknown type)
            chapter = item.get("chapter", "")
            section = item.get("section", "")
            embedding_text = f"Teks {chapter} {section}: {content} {context_text}"
        return embedding_text

    def prepare_document_metadata(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Build the metadata dict stored alongside each LangChain Document.

        Common fields are always present; type-specific fields are merged in
        based on ``content_type``. Long free-text fields are truncated
        (None-safe via :meth:`_clip`) to keep metadata small.
        """
        content_type = item.get("content_type", "")
        # Base metadata (common for all types)
        metadata = {
            "id": item.get("id", ""),
            "content_type": content_type,
            "year": item.get("year", 0),
            "page": item.get("page", 0),
            "filename": self._clip(item.get("filename", ""), 200),
            "filepath": self._clip(item.get("filepath", ""), 300),
            "extracted_at": item.get("extracted_at", "")
        }
        if content_type == "silabus":
            metadata.update({
                "mata_kuliah": self._clip(item.get("mata_kuliah", ""), 200),
                "course_code": item.get("course_code", ""),
                "sks": item.get("sks", ""),
                "program": item.get("program", ""),
                "semester": item.get("semester", ""),
                "silabus_type": item.get("silabus_type", "")
            })
        elif content_type == "curriculum":
            metadata.update({
                "program": item.get("program", ""),
                "semester": item.get("semester", ""),
                "table_type": item.get("table_type", ""),
                "content_type_detail": item.get("content_type_detail", ""),
                "rows_count": item.get("rows_count", 0)
            })
        elif content_type == "image":
            metadata.update({
                "title": self._clip(item.get("title", ""), 200),
                "caption": self._clip(item.get("caption", ""), 300),
                "image_index": item.get("image_index", 0),
                # filepath doubles as the retrievable image location
                "image_path": item.get("filepath", "")
            })
        elif content_type == "table":
            metadata.update({
                "title": self._clip(item.get("title", ""), 200),
                "caption": self._clip(item.get("caption", ""), 300),
                "table_index": item.get("table_index", 0),
                "rows": item.get("rows", 0),
                "cols": item.get("cols", 0),
                "table_path": item.get("filepath", "")
            })
        else:  # text_chunk
            metadata.update({
                "chapter": self._clip(item.get("chapter", ""), 200),
                "section": self._clip(item.get("section", ""), 200),
                "subsection": self._clip(item.get("subsection", ""), 200),
                "chunk_type": item.get("chunk_type", ""),
                "quality_score": item.get("quality_score", 0.0)
            })
        return metadata

    def process_unified_json(self, json_file_path: str, year: int) -> Dict[str, int]:
        """Vectorize one unified multimodal JSON file into *year*'s store.

        Accepts either a top-level list of items or a dict with a ``content``
        key holding that list. Items are embedded in batches of 50.

        Returns:
            Per-content-type counters plus ``total``/``errors``/``skipped``;
            empty dict when the file is missing or malformed.
        """
        if not os.path.exists(json_file_path):
            print(f"❌ File not found: {json_file_path}")
            return {}
        print(f"πŸ”„ Processing: {json_file_path}")
        with open(json_file_path, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
        # Normalize the two supported JSON shapes to a plain list.
        if isinstance(raw_data, dict):
            if 'content' in raw_data:
                data = raw_data['content']
                print(f"πŸ“¦ Detected structured JSON with 'content' key")
            else:
                print(f"❌ Unexpected JSON structure: {list(raw_data.keys())}")
                return {}
        elif isinstance(raw_data, list):
            data = raw_data
            print(f"πŸ“¦ Detected direct array JSON")
        else:
            print(f"❌ Unexpected JSON type: {type(raw_data)}")
            return {}
        vectorstore = self.get_or_create_vectorstore(year)
        stats = {
            "text_chunk": 0,
            "image": 0,
            "table": 0,
            "curriculum": 0,
            "silabus": 0,
            "total": 0,
            "errors": 0,
            "skipped": 0
        }
        print(f"πŸ“Š Found {len(data)} items for year {year}")
        # Accumulate Documents and flush to Chroma in batches.
        documents = []
        batch_size = 50
        for idx, item in enumerate(data):
            try:
                if not isinstance(item, dict):
                    print(f"⚠️ Skipping non-dict item at index {idx}: {type(item)}")
                    stats["skipped"] += 1
                    continue
                content_type = item.get("content_type", "unknown")
                content = item.get("content", "")
                context_text = item.get("context_text", "")
                # Skip items with no meaningful content to embed.
                if not content and not context_text:
                    stats["skipped"] += 1
                    continue
                if len(str(content).strip()) < 3 and len(str(context_text).strip()) < 10:
                    stats["skipped"] += 1
                    continue
                embedding_text = self.create_embedding_text(item)
                metadata = self.prepare_document_metadata(item)
                doc = Document(
                    page_content=embedding_text,
                    metadata=metadata
                )
                documents.append(doc)
                if content_type in stats:
                    stats[content_type] += 1
                else:
                    stats["unknown"] = stats.get("unknown", 0) + 1
                stats["total"] += 1
                # Flush a full batch to the vectorstore.
                if len(documents) >= batch_size:
                    self.add_documents_to_vectorstore(vectorstore, documents)
                    print(f" βœ… Processed batch {stats['total']//batch_size} ({stats['total']} items)")
                    documents = []
            except Exception as e:
                print(f"❌ Error processing item {idx}: {e}")
                print(f" Item type: {type(item)}")
                if isinstance(item, dict):
                    print(f" Item keys: {list(item.keys())[:5]}...")
                else:
                    print(f" Item content preview: {str(item)[:100]}...")
                stats["errors"] += 1
        # Flush the final partial batch.
        if documents:
            self.add_documents_to_vectorstore(vectorstore, documents)
        # Persist embeddings to disk.
        vectorstore.persist()
        print(f"πŸ“Š Processing complete for year {year}:")
        for key, value in stats.items():
            if value > 0:
                print(f" πŸ“ {key}: {value}")
        return stats

    def add_documents_to_vectorstore(self, vectorstore: "Chroma", documents: List["Document"]):
        """Add a batch of documents; log (best-effort) rather than raise."""
        try:
            vectorstore.add_documents(documents)
        except Exception as e:
            print(f"❌ Error adding documents to vectorstore: {e}")

    def query_multimodal(self, query_text: str, year: Optional[int] = None,
                         content_types: Optional[List[str]] = None,
                         n_results: int = 10) -> List[Dict]:
        """Similarity-search across one year (or all years 2022-2024).

        Args:
            query_text: Free-text query to embed and match.
            year: Restrict the search to this year; all years when None.
            content_types: Optional whitelist of ``content_type`` values.
            n_results: Max results per year and in the final merged list.

        Returns:
            Result dicts sorted ascending by score (distance: lower is better),
            with ``image_path``/``table_path`` + ``retrievable`` for media hits.
        """
        results = []
        years_to_search = [year] if year else [2022, 2023, 2024]
        # Chroma metadata filter restricting content_type (None = no filter).
        type_filter = {"content_type": {"$in": content_types}} if content_types else None
        for search_year in years_to_search:
            try:
                vectorstore = self.get_or_create_vectorstore(search_year)
                docs = vectorstore.similarity_search_with_score(
                    query_text,
                    k=n_results,
                    filter=type_filter
                )
                for doc, score in docs:
                    result = {
                        "content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": score,
                        "year": search_year
                    }
                    content_type = result["metadata"].get("content_type")
                    # Media results carry a filesystem path; report whether it
                    # still exists so callers can decide to render it.
                    if content_type == "image":
                        result["image_path"] = result["metadata"].get("image_path", "")
                        result["retrievable"] = os.path.exists(result["image_path"]) if result["image_path"] else False
                    elif content_type == "table":
                        result["table_path"] = result["metadata"].get("table_path", "")
                        result["retrievable"] = os.path.exists(result["table_path"]) if result["table_path"] else False
                    results.append(result)
            except Exception as e:
                print(f"❌ Error querying year {search_year}: {e}")
        # Lower score = smaller distance = better match, hence ascending sort.
        results.sort(key=lambda x: x["score"])
        return results[:n_results]

    def get_vectorstore_stats(self, year: int) -> Dict:
        """Return document count for *year*'s collection (0 + error on failure)."""
        try:
            vectorstore = self.get_or_create_vectorstore(year)
            count = vectorstore._collection.count()
            return {
                "year": year,
                "total_documents": count,
                "collection_name": f"optima_multimodal_{year}"
            }
        except Exception as e:
            print(f"❌ Error getting stats for year {year}: {e}")
            return {"year": year, "total_documents": 0, "error": str(e)}
def process_all_unified_files(data_dir: str = "./chunked"):
    """Vectorize every ``multimodal_unified_<year>.json`` found in *data_dir*.

    Iterates years 2022-2024, delegating each file to
    :class:`LangChainMultimodalVectorizer`, then prints per-year statistics,
    an aggregate summary, and the final document count of each vectorstore.
    """
    vectorizer = LangChainMultimodalVectorizer()
    years = [2022, 2023, 2024]
    grand_totals = {"total": 0, "errors": 0}
    for year in years:
        json_file = os.path.join(data_dir, f"multimodal_unified_{year}.json")
        # Missing input files are reported but not fatal.
        if not os.path.exists(json_file):
            print(f"⚠️ File not found: {json_file}")
            continue
        print(f"\nπŸ”„ Processing year {year}...")
        stats = vectorizer.process_unified_json(json_file, year)
        if not stats:
            continue
        print(f"πŸ“Š Year {year} Final Statistics:")
        for content_type, count in stats.items():
            print(f" πŸ“ {content_type}: {count}")
        grand_totals["total"] += stats.get("total", 0)
        grand_totals["errors"] += stats.get("errors", 0)
    print(f"\nπŸŽ‰ FINAL PROCESSING SUMMARY:")
    print(f" 🎯 Total documents processed: {grand_totals['total']}")
    print(f" ❌ Total errors: {grand_totals['errors']}")
    # Report how many documents each per-year collection now holds.
    print(f"\nπŸ“š VECTORSTORE STATISTICS:")
    for year in years:
        year_stats = vectorizer.get_vectorstore_stats(year)
        print(f" {year}: {year_stats['total_documents']} documents")


if __name__ == "__main__":
    process_all_unified_files()