Spaces:
Build error
Build error
| import os | |
| import json | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from pathlib import Path | |
| # LangChain imports | |
| from langchain.embeddings import OpenAIEmbeddings | |
| from langchain.vectorstores import Chroma | |
| from langchain.schema import Document | |
| load_dotenv() | |
class LangChainMultimodalVectorizer:
    """Vectorize multimodal document content into per-year Chroma collections.

    Items are plain dicts carrying a ``content_type`` discriminator
    (``silabus``, ``curriculum``, ``image``, ``table``, or text chunks);
    each type gets a tailored embedding text and metadata schema.
    Embeddings are produced with OpenAI via LangChain, and every year is
    persisted in its own Chroma collection/directory.
    """

    def __init__(self):
        # OpenAIEmbeddings reads OPENAI_API_KEY (and optionally the model
        # name) from the environment by default, so no kwargs are needed.
        self.embeddings = OpenAIEmbeddings()
        # FIX: this assignment was commented out, but
        # get_or_create_vectorstore() reads self.persist_dir and would
        # raise AttributeError on first use without it.
        self.persist_dir = os.getenv("CHROMA_PERSIST_DIR", "./chroma_persist")

    def get_or_create_vectorstore(self, year: int) -> "Chroma":
        """Return the Chroma vectorstore for *year*, creating it if needed.

        Each year lives in collection ``optima_multimodal_<year>`` persisted
        under ``<persist_dir>/year_<year>``.
        """
        collection_name = f"optima_multimodal_{year}"
        # One persist directory per year keeps the collections isolated.
        year_persist_dir = os.path.join(self.persist_dir, f"year_{year}")
        os.makedirs(year_persist_dir, exist_ok=True)
        try:
            # Try to load an existing vectorstore first.
            vectorstore = Chroma(
                collection_name=collection_name,
                embedding_function=self.embeddings,
                persist_directory=year_persist_dir,
            )
            # NOTE(review): _collection is a private LangChain attribute;
            # count() > 0 means the collection already holds documents.
            if vectorstore._collection.count() > 0:
                print(
                    f"Using existing vectorstore: {collection_name} "
                    f"({vectorstore._collection.count()} docs)"
                )
            else:
                print(f"Created new vectorstore: {collection_name}")
        except Exception as e:
            # Loading failed (e.g. missing/corrupt store): build a fresh one.
            print(f"Creating new vectorstore: {collection_name} ({e})")
            vectorstore = Chroma(
                collection_name=collection_name,
                embedding_function=self.embeddings,
                persist_directory=year_persist_dir,
            )
        return vectorstore

    def create_embedding_text(self, item: Dict[str, Any]) -> str:
        """Build the text that gets embedded, enriched per ``content_type``.

        Prefixing the raw content with type-specific context (program,
        semester, titles, captions, ...) improves retrieval relevance.
        Unknown types fall through to the text-chunk format.
        """
        content_type = item.get("content_type", "")
        content = item.get("content", "")
        context_text = item.get("context_text", "")
        if content_type == "silabus":
            mata_kuliah = item.get("mata_kuliah", "")
            course_code = item.get("course_code", "")
            silabus_type = item.get("silabus_type", "")
            program = item.get("program", "")
            semester = item.get("semester", "")
            embedding_text = (
                f"Silabus {program} semester {semester} {mata_kuliah} "
                f"{course_code} {silabus_type}: {content} {context_text}"
            )
        elif content_type == "curriculum":
            program = item.get("program", "")
            semester = item.get("semester", "")
            table_type = item.get("table_type", "")
            embedding_text = (
                f"Kurikulum {program} semester {semester} {table_type}: "
                f"{content} {context_text}"
            )
        elif content_type == "image":
            title = item.get("title", "")
            caption = item.get("caption", "")
            embedding_text = f"Gambar: {title} {caption} {content} {context_text}"
        elif content_type == "table":
            title = item.get("title", "")
            caption = item.get("caption", "")
            rows = item.get("rows", 0)
            cols = item.get("cols", 0)
            embedding_text = f"Tabel {rows}x{cols}: {title} {caption} {content} {context_text}"
        else:  # text_chunk (and any unknown type)
            chapter = item.get("chapter", "")
            section = item.get("section", "")
            embedding_text = f"Teks {chapter} {section}: {content} {context_text}"
        return embedding_text

    def prepare_document_metadata(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Build flat metadata for a LangChain ``Document``.

        Long free-text fields are truncated (Chroma metadata should stay
        small); type-specific keys are layered on top of the common base.
        """
        content_type = item.get("content_type", "")
        # Base metadata common to all content types.
        metadata = {
            "id": item.get("id", ""),
            "content_type": content_type,
            "year": item.get("year", 0),
            "page": item.get("page", 0),
            "filename": item.get("filename", "")[:200],
            "filepath": item.get("filepath", "")[:300],
            "extracted_at": item.get("extracted_at", ""),
        }
        if content_type == "silabus":
            metadata.update({
                "mata_kuliah": item.get("mata_kuliah", "")[:200],
                "course_code": item.get("course_code", ""),
                "sks": item.get("sks", ""),
                "program": item.get("program", ""),
                "semester": item.get("semester", ""),
                "silabus_type": item.get("silabus_type", ""),
            })
        elif content_type == "curriculum":
            metadata.update({
                "program": item.get("program", ""),
                "semester": item.get("semester", ""),
                "table_type": item.get("table_type", ""),
                "content_type_detail": item.get("content_type_detail", ""),
                "rows_count": item.get("rows_count", 0),
            })
        elif content_type == "image":
            metadata.update({
                "title": item.get("title", "")[:200],
                "caption": item.get("caption", "")[:300],
                "image_index": item.get("image_index", 0),
                # Untruncated path so the original image stays retrievable.
                "image_path": item.get("filepath", ""),
            })
        elif content_type == "table":
            metadata.update({
                "title": item.get("title", "")[:200],
                "caption": item.get("caption", "")[:300],
                "table_index": item.get("table_index", 0),
                "rows": item.get("rows", 0),
                "cols": item.get("cols", 0),
                "table_path": item.get("filepath", ""),
            })
        else:  # text_chunk (and any unknown type)
            metadata.update({
                "chapter": item.get("chapter", "")[:200],
                "section": item.get("section", "")[:200],
                "subsection": item.get("subsection", "")[:200],
                "chunk_type": item.get("chunk_type", ""),
                "quality_score": item.get("quality_score", 0.0),
            })
        return metadata

    def process_unified_json(self, json_file_path: str, year: int) -> Dict[str, int]:
        """Vectorize one unified multimodal JSON file into the year's store.

        Accepts either a top-level list of items or a dict with a
        ``content`` key holding that list. Returns per-type counts
        (empty dict on unreadable/unexpected input).
        """
        if not os.path.exists(json_file_path):
            print(f"ERROR: file not found: {json_file_path}")
            return {}
        print(f"Processing: {json_file_path}")
        with open(json_file_path, "r", encoding="utf-8") as f:
            raw_data = json.load(f)
        # Handle the two known JSON layouts.
        if isinstance(raw_data, dict):
            if "content" in raw_data:
                data = raw_data["content"]
                print("Detected structured JSON with 'content' key")
            else:
                print(f"ERROR: unexpected JSON structure: {list(raw_data.keys())}")
                return {}
        elif isinstance(raw_data, list):
            data = raw_data
            print("Detected direct array JSON")
        else:
            print(f"ERROR: unexpected JSON type: {type(raw_data)}")
            return {}
        vectorstore = self.get_or_create_vectorstore(year)
        stats = {
            "text_chunk": 0,
            "image": 0,
            "table": 0,
            "curriculum": 0,
            "silabus": 0,
            "total": 0,
            "errors": 0,
            "skipped": 0,
        }
        print(f"Found {len(data)} items for year {year}")
        # Documents are embedded in batches to limit API round-trips.
        documents = []
        batch_size = 50
        for idx, item in enumerate(data):
            try:
                if not isinstance(item, dict):
                    print(f"WARNING: skipping non-dict item at index {idx}: {type(item)}")
                    stats["skipped"] += 1
                    continue
                content_type = item.get("content_type", "unknown")
                content = item.get("content", "")
                context_text = item.get("context_text", "")
                # Skip items with no meaningful content to embed.
                if not content and not context_text:
                    stats["skipped"] += 1
                    continue
                if len(str(content).strip()) < 3 and len(str(context_text).strip()) < 10:
                    stats["skipped"] += 1
                    continue
                embedding_text = self.create_embedding_text(item)
                metadata = self.prepare_document_metadata(item)
                doc = Document(page_content=embedding_text, metadata=metadata)
                documents.append(doc)
                if content_type in stats:
                    stats[content_type] += 1
                else:
                    stats["unknown"] = stats.get("unknown", 0) + 1
                stats["total"] += 1
                # Flush a full batch to the vectorstore.
                if len(documents) >= batch_size:
                    self.add_documents_to_vectorstore(vectorstore, documents)
                    print(f"  Processed batch {stats['total'] // batch_size} ({stats['total']} items)")
                    documents = []
            except Exception as e:
                print(f"ERROR: failed processing item {idx}: {e}")
                print(f"  Item type: {type(item)}")
                if isinstance(item, dict):
                    print(f"  Item keys: {list(item.keys())[:5]}...")
                else:
                    print(f"  Item content preview: {str(item)[:100]}...")
                stats["errors"] += 1
        # Flush whatever is left of the last partial batch.
        if documents:
            self.add_documents_to_vectorstore(vectorstore, documents)
        vectorstore.persist()
        print(f"Processing complete for year {year}:")
        for key, value in stats.items():
            if value > 0:
                print(f"  {key}: {value}")
        return stats

    def add_documents_to_vectorstore(self, vectorstore: "Chroma", documents: List["Document"]):
        """Add *documents* to *vectorstore*, logging (not raising) failures."""
        try:
            vectorstore.add_documents(documents)
        except Exception as e:
            # Best-effort: a failed batch is reported but does not abort the run.
            print(f"ERROR: could not add documents to vectorstore: {e}")

    def query_multimodal(self, query_text: str, year: Optional[int] = None,
                         content_types: Optional[List[str]] = None,
                         n_results: int = 10) -> List[Dict]:
        """Similarity-search one year (or all years) for *query_text*.

        Optionally restricts to *content_types*. Image/table hits get a
        ``retrievable`` flag telling whether the source file still exists.
        Results are sorted ascending by distance score.
        """
        results = []
        # NOTE(review): `if year` treats year=0 as "all years" — presumably
        # fine since years are calendar years; confirm if 0 is ever valid.
        years_to_search = [year] if year else [2022, 2023, 2024]
        for search_year in years_to_search:
            try:
                vectorstore = self.get_or_create_vectorstore(search_year)
                # Chroma filter syntax: {"field": {"$in": [...]}}.
                type_filter = (
                    {"content_type": {"$in": content_types}} if content_types else None
                )
                docs = vectorstore.similarity_search_with_score(
                    query_text,
                    k=n_results,
                    filter=type_filter,
                )
                for doc, score in docs:
                    result = {
                        "content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": score,
                        "year": search_year,
                    }
                    # Attach source-file availability for binary assets.
                    if result["metadata"]["content_type"] == "image":
                        result["image_path"] = result["metadata"].get("image_path", "")
                        result["retrievable"] = (
                            os.path.exists(result["image_path"]) if result["image_path"] else False
                        )
                    elif result["metadata"]["content_type"] == "table":
                        result["table_path"] = result["metadata"].get("table_path", "")
                        result["retrievable"] = (
                            os.path.exists(result["table_path"]) if result["table_path"] else False
                        )
                    results.append(result)
            except Exception as e:
                print(f"ERROR: querying year {search_year} failed: {e}")
        # Lower score = smaller distance = better match.
        results.sort(key=lambda x: x["score"])
        return results[:n_results]

    def get_vectorstore_stats(self, year: int) -> Dict:
        """Return document count for the year's collection (0 on error)."""
        try:
            vectorstore = self.get_or_create_vectorstore(year)
            count = vectorstore._collection.count()
            return {
                "year": year,
                "total_documents": count,
                "collection_name": f"optima_multimodal_{year}",
            }
        except Exception as e:
            print(f"ERROR: getting stats for year {year} failed: {e}")
            return {"year": year, "total_documents": 0, "error": str(e)}
def process_all_unified_files(data_dir: str = "./chunked"):
    """Vectorize every ``multimodal_unified_<year>.json`` in *data_dir*.

    Iterates the supported years (2022-2024), vectorizes each file that
    exists, and prints per-year, overall, and vectorstore statistics.
    Missing files are skipped rather than treated as fatal.
    """
    vectorizer = LangChainMultimodalVectorizer()
    years = [2022, 2023, 2024]
    total_stats = {"total": 0, "errors": 0}
    for year in years:
        json_file = os.path.join(data_dir, f"multimodal_unified_{year}.json")
        if not os.path.exists(json_file):
            print(f"WARNING: file not found: {json_file}")
            continue
        print(f"\nProcessing year {year}...")
        stats = vectorizer.process_unified_json(json_file, year)
        if stats:
            print(f"Year {year} Final Statistics:")
            for content_type, count in stats.items():
                print(f"  {content_type}: {count}")
            total_stats["total"] += stats.get("total", 0)
            total_stats["errors"] += stats.get("errors", 0)
    print("\nFINAL PROCESSING SUMMARY:")
    print(f"  Total documents processed: {total_stats['total']}")
    print(f"  Total errors: {total_stats['errors']}")
    # get_vectorstore_stats() always returns a dict with 'total_documents'.
    print("\nVECTORSTORE STATISTICS:")
    for year in years:
        stats = vectorizer.get_vectorstore_stats(year)
        print(f"  {year}: {stats['total_documents']} documents")
# Script entry point: run the full vectorization pipeline.
if __name__ == "__main__":
    process_all_unified_files()