import os
import json
from typing import List, Dict, Any, Optional
from datetime import datetime
from dotenv import load_dotenv
from pathlib import Path

# LangChain imports
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.schema import Document

load_dotenv()


class LangChainMultimodalVectorizer:
    """Vectorize unified multimodal JSON exports into per-year Chroma collections.

    Each year gets its own persisted Chroma collection named
    ``optima_multimodal_<year>`` stored under ``self.persist_dir/year_<year>``.
    Content items (text chunks, images, tables, curriculum rows, silabus
    entries) are turned into LangChain ``Document`` objects whose page content
    is a type-specific embedding string and whose metadata is flattened for
    Chroma filtering.
    """

    def __init__(self):
        # OpenAIEmbeddings reads OPENAI_API_KEY (and an optional model name)
        # from the environment, so no explicit kwargs are needed here.
        self.embeddings = OpenAIEmbeddings()
        # BUG FIX: this assignment was commented out, but
        # get_or_create_vectorstore() reads self.persist_dir — every call
        # raised AttributeError. Restored with the original default.
        self.persist_dir = os.getenv("CHROMA_PERSIST_DIR", "./chroma_persist")

    def get_or_create_vectorstore(self, year: int) -> Chroma:
        """Get or create Chroma vectorstore for specific year"""
        collection_name = f"optima_multimodal_{year}"

        # Create persist directory for this year
        year_persist_dir = os.path.join(self.persist_dir, f"year_{year}")
        os.makedirs(year_persist_dir, exist_ok=True)

        try:
            # Try to load existing vectorstore
            vectorstore = Chroma(
                collection_name=collection_name,
                embedding_function=self.embeddings,
                persist_directory=year_persist_dir
            )
            # Check if collection exists and has documents
            if vectorstore._collection.count() > 0:
                print(f"šŸ“š Using existing vectorstore: {collection_name} ({vectorstore._collection.count()} docs)")
            else:
                print(f"šŸ†• Created new vectorstore: {collection_name}")
        except Exception as e:
            # Fall back to constructing a fresh store; Chroma will create the
            # collection on first write.
            print(f"šŸ†• Creating new vectorstore: {collection_name}")
            vectorstore = Chroma(
                collection_name=collection_name,
                embedding_function=self.embeddings,
                persist_directory=year_persist_dir
            )

        return vectorstore

    def create_embedding_text(self, item: Dict[str, Any]) -> str:
        """Create optimized text for embedding based on content_type.

        Prefixes the raw content with type-specific context (course, program,
        table shape, chapter, ...) so semantically similar items of the same
        kind cluster together in embedding space.
        """
        content_type = item.get("content_type", "")
        content = item.get("content", "")
        context_text = item.get("context_text", "")

        # Create rich embedding text based on content_type
        if content_type == "silabus":
            mata_kuliah = item.get("mata_kuliah", "")
            course_code = item.get("course_code", "")
            silabus_type = item.get("silabus_type", "")
            program = item.get("program", "")
            semester = item.get("semester", "")
            embedding_text = f"Silabus {program} semester {semester} {mata_kuliah} {course_code} {silabus_type}: {content} {context_text}"
        elif content_type == "curriculum":
            program = item.get("program", "")
            semester = item.get("semester", "")
            table_type = item.get("table_type", "")
            embedding_text = f"Kurikulum {program} semester {semester} {table_type}: {content} {context_text}"
        elif content_type == "image":
            title = item.get("title", "")
            caption = item.get("caption", "")
            embedding_text = f"Gambar: {title} {caption} {content} {context_text}"
        elif content_type == "table":
            title = item.get("title", "")
            caption = item.get("caption", "")
            rows = item.get("rows", 0)
            cols = item.get("cols", 0)
            embedding_text = f"Tabel {rows}x{cols}: {title} {caption} {content} {context_text}"
        else:  # text_chunk
            chapter = item.get("chapter", "")
            section = item.get("section", "")
            embedding_text = f"Teks {chapter} {section}: {content} {context_text}"

        return embedding_text

    def prepare_document_metadata(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Prepare metadata for LangChain Document.

        Long free-text fields are truncated (Chroma metadata values should
        stay small); type-specific keys are merged onto a common base.
        """
        content_type = item.get("content_type", "")

        # Base metadata (common for all types)
        metadata = {
            "id": item.get("id", ""),
            "content_type": content_type,
            "year": item.get("year", 0),
            "page": item.get("page", 0),
            "filename": item.get("filename", "")[:200],
            "filepath": item.get("filepath", "")[:300],
            "extracted_at": item.get("extracted_at", "")
        }

        # Add specific metadata based on content_type
        if content_type == "silabus":
            metadata.update({
                "mata_kuliah": item.get("mata_kuliah", "")[:200],
                "course_code": item.get("course_code", ""),
                "sks": item.get("sks", ""),
                "program": item.get("program", ""),
                "semester": item.get("semester", ""),
                "silabus_type": item.get("silabus_type", "")
            })
        elif content_type == "curriculum":
            metadata.update({
                "program": item.get("program", ""),
                "semester": item.get("semester", ""),
                "table_type": item.get("table_type", ""),
                "content_type_detail": item.get("content_type_detail", ""),
                "rows_count": item.get("rows_count", 0)
            })
        elif content_type == "image":
            metadata.update({
                "title": item.get("title", "")[:200],
                "caption": item.get("caption", "")[:300],
                "image_index": item.get("image_index", 0),
                "image_path": item.get("filepath", "")
            })
        elif content_type == "table":
            metadata.update({
                "title": item.get("title", "")[:200],
                "caption": item.get("caption", "")[:300],
                "table_index": item.get("table_index", 0),
                "rows": item.get("rows", 0),
                "cols": item.get("cols", 0),
                "table_path": item.get("filepath", "")
            })
        else:  # text_chunk
            metadata.update({
                "chapter": item.get("chapter", "")[:200],
                "section": item.get("section", "")[:200],
                "subsection": item.get("subsection", "")[:200],
                "chunk_type": item.get("chunk_type", ""),
                "quality_score": item.get("quality_score", 0.0)
            })

        return metadata

    def process_unified_json(self, json_file_path: str, year: int) -> Dict[str, int]:
        """Process unified multimodal JSON file using LangChain.

        Accepts either a bare list of items or a dict with a ``content`` key.
        Items with no meaningful content are skipped; the rest are embedded
        and added to the year's vectorstore in batches of 50.

        Returns a per-content-type statistics dict (empty dict on failure).
        """
        if not os.path.exists(json_file_path):
            print(f"āŒ File not found: {json_file_path}")
            return {}

        print(f"šŸ”„ Processing: {json_file_path}")

        with open(json_file_path, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        # šŸ”§ Handle different JSON structures
        if isinstance(raw_data, dict):
            if 'content' in raw_data:
                data = raw_data['content']  # Extract from content array
                print(f"šŸ“¦ Detected structured JSON with 'content' key")
            else:
                print(f"āŒ Unexpected JSON structure: {list(raw_data.keys())}")
                return {}
        elif isinstance(raw_data, list):
            data = raw_data  # Direct array
            print(f"šŸ“¦ Detected direct array JSON")
        else:
            print(f"āŒ Unexpected JSON type: {type(raw_data)}")
            return {}

        # Get vectorstore for this year
        vectorstore = self.get_or_create_vectorstore(year)

        # Statistics
        stats = {
            "text_chunk": 0,
            "image": 0,
            "table": 0,
            "curriculum": 0,
            "silabus": 0,
            "total": 0,
            "errors": 0,
            "skipped": 0
        }

        print(f"šŸ“Š Found {len(data)} items for year {year}")

        # Prepare documents for batch processing
        documents = []
        batch_size = 50

        for idx, item in enumerate(data):
            try:
                # šŸ”§ Ensure item is dict
                if not isinstance(item, dict):
                    print(f"āš ļø Skipping non-dict item at index {idx}: {type(item)}")
                    stats["skipped"] += 1
                    continue

                content_type = item.get("content_type", "unknown")
                content = item.get("content", "")
                context_text = item.get("context_text", "")

                # Skip if no meaningful content
                if not content and not context_text:
                    stats["skipped"] += 1
                    continue
                if len(str(content).strip()) < 3 and len(str(context_text).strip()) < 10:
                    stats["skipped"] += 1
                    continue

                # Create embedding text
                embedding_text = self.create_embedding_text(item)

                # Prepare metadata
                metadata = self.prepare_document_metadata(item)

                # Create LangChain Document
                doc = Document(
                    page_content=embedding_text,
                    metadata=metadata
                )
                documents.append(doc)

                # Update stats
                if content_type in stats:
                    stats[content_type] += 1
                else:
                    stats["unknown"] = stats.get("unknown", 0) + 1
                stats["total"] += 1

                # Process batch when full
                if len(documents) >= batch_size:
                    self.add_documents_to_vectorstore(vectorstore, documents)
                    print(f"  āœ… Processed batch {stats['total']//batch_size} ({stats['total']} items)")
                    documents = []  # Reset batch

            except Exception as e:
                print(f"āŒ Error processing item {idx}: {e}")
                print(f"   Item type: {type(item)}")
                if isinstance(item, dict):
                    print(f"   Item keys: {list(item.keys())[:5]}...")
                else:
                    print(f"   Item content preview: {str(item)[:100]}...")
                stats["errors"] += 1

        # Process remaining documents
        if documents:
            self.add_documents_to_vectorstore(vectorstore, documents)

        # Persist the vectorstore
        vectorstore.persist()

        print(f"šŸ“Š Processing complete for year {year}:")
        for key, value in stats.items():
            if value > 0:
                print(f"   šŸ“ {key}: {value}")

        return stats

    def add_documents_to_vectorstore(self, vectorstore: Chroma, documents: List[Document]):
        """Add documents to vectorstore"""
        try:
            vectorstore.add_documents(documents)
        except Exception as e:
            # Best-effort: a failed batch is reported but does not abort the run.
            print(f"āŒ Error adding documents to vectorstore: {e}")

    def query_multimodal(self, query_text: str, year: Optional[int] = None,
                         content_types: Optional[List[str]] = None,
                         n_results: int = 10) -> List[Dict]:
        """Similarity-search one year (or all known years) of multimodal data.

        Optionally restricts results to the given ``content_types`` via a
        Chroma ``$in`` metadata filter. Image/table hits get a ``retrievable``
        flag indicating whether the stored asset path still exists on disk.

        Results are sorted ascending by score (distance: lower is better).
        """
        results = []
        # BUG FIX: use an explicit None check so a falsy-but-valid year
        # would not silently expand the search to every year.
        years_to_search = [year] if year is not None else [2022, 2023, 2024]

        for search_year in years_to_search:
            try:
                vectorstore = self.get_or_create_vectorstore(search_year)

                # Build filter for content types
                search_filter = None
                if content_types:
                    search_filter = {"content_type": {"$in": content_types}}

                # Perform similarity search
                docs = vectorstore.similarity_search_with_score(
                    query_text,
                    k=n_results,
                    filter=search_filter
                )

                # Format results
                for doc, score in docs:
                    result = {
                        "content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": score,
                        "year": search_year
                    }

                    # Add special handling for images
                    if result["metadata"]["content_type"] == "image":
                        result["image_path"] = result["metadata"].get("image_path", "")
                        result["retrievable"] = os.path.exists(result["image_path"]) if result["image_path"] else False
                    # Add special handling for tables
                    elif result["metadata"]["content_type"] == "table":
                        result["table_path"] = result["metadata"].get("table_path", "")
                        result["retrievable"] = os.path.exists(result["table_path"]) if result["table_path"] else False

                    results.append(result)

            except Exception as e:
                print(f"āŒ Error querying year {search_year}: {e}")

        # Sort by score (lower is better for distance-based scoring)
        results.sort(key=lambda x: x["score"])
        return results[:n_results]

    def get_vectorstore_stats(self, year: int) -> Dict:
        """Get statistics for a vectorstore"""
        try:
            vectorstore = self.get_or_create_vectorstore(year)
            count = vectorstore._collection.count()
            return {
                "year": year,
                "total_documents": count,
                "collection_name": f"optima_multimodal_{year}"
            }
        except Exception as e:
            print(f"āŒ Error getting stats for year {year}: {e}")
            return {"year": year, "total_documents": 0, "error": str(e)}


def process_all_unified_files(data_dir: str = "./chunked"):
    """Vectorize every multimodal_unified_<year>.json found in *data_dir*.

    Processes years 2022-2024, printing per-year and aggregate statistics,
    then a final document count per vectorstore.
    """
    vectorizer = LangChainMultimodalVectorizer()
    years = [2022, 2023, 2024]
    total_stats = {"total": 0, "errors": 0}

    for year in years:
        json_file = os.path.join(data_dir, f"multimodal_unified_{year}.json")
        if not os.path.exists(json_file):
            print(f"āš ļø File not found: {json_file}")
            continue

        print(f"\nšŸ”„ Processing year {year}...")
        stats = vectorizer.process_unified_json(json_file, year)

        if stats:
            print(f"šŸ“Š Year {year} Final Statistics:")
            for content_type, count in stats.items():
                print(f"   šŸ“ {content_type}: {count}")
            total_stats["total"] += stats.get("total", 0)
            total_stats["errors"] += stats.get("errors", 0)

    print(f"\nšŸŽ‰ FINAL PROCESSING SUMMARY:")
    print(f"   šŸŽÆ Total documents processed: {total_stats['total']}")
    print(f"   āŒ Total errors: {total_stats['errors']}")

    # Show vectorstore stats
    print(f"\nšŸ“š VECTORSTORE STATISTICS:")
    for year in years:
        stats = vectorizer.get_vectorstore_stats(year)
        print(f"   {year}: {stats['total_documents']} documents")


if __name__ == "__main__":
    process_all_unified_files()