# NOTE: recovered from a "Spaces: Build error" build-log dump; the original
# table-style wrapping has been stripped from the code below.
| import os | |
| import json | |
| import pandas as pd | |
| from typing import List, Dict, Any, Optional, Tuple, Set | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| import numpy as np | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import re | |
| from langchain_community.embeddings import OpenAIEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_community.chat_models import ChatOpenAI | |
| from langchain.prompts import PromptTemplate | |
| from collections import defaultdict | |
| from vectorization import LangChainMultimodalVectorizer | |
| from year_parser import YearParser | |
| from config import * | |
| load_dotenv() | |
class EnhancedMultimodalRAGSystem:
    """Multimodal RAG system over DTMI UGM curriculum documents.

    Combines a multimodal vectorizer (text/table/image/silabus/curriculum
    chunks), a chat LLM, year-aware query parsing, and metadata-driven
    context expansion.
    """

    def __init__(self):
        """Initialize enhanced RAG system with multimodal capabilities"""
        # Project-local embedding + vector-store wrapper.
        self.vectorizer = LangChainMultimodalVectorizer()
        # Chat model; the explicit key/model kwargs were deliberately disabled —
        # presumably the library resolves them from the environment (TODO confirm).
        self.llm = ChatOpenAI(
            # openai_api_key=os.getenv("OPENAI_API_KEY"),
            # model_name=os.getenv("OPENAI_MODEL", DEFAULT_LLM_MODEL),
            temperature=LLM_TEMPERATURE,
            max_tokens=MAX_TOKENS,
            request_timeout=LLM_TIMEOUT
        )
        # Extracts year references from free-text queries.
        self.year_parser = YearParser()
        # Retrieval thresholds/limits copied from config for per-instance access.
        self.COSINE_SIMILARITY_THRESHOLD = COSINE_SIMILARITY_THRESHOLD
        self.MAX_SIMILAR_CONTEXT = MAX_SIMILAR_CONTEXT
        self.VALID_YEARS = VALID_YEARS
        # New: Context expansion settings
        self.CONTEXT_EXPANSION_ENABLED = True
        self.MAX_CONTEXT_CHUNKS_PER_SOURCE = 5  # Max additional chunks per source
        self.CONTEXT_SIMILARITY_THRESHOLD = 0.7  # Similarity threshold for context expansion
        if VERBOSE_LOGGING:
            print(f"π Enhanced Multimodal RAG System initialized")
            print(f" π§ LLM Model: {os.getenv('OPENAI_MODEL', DEFAULT_LLM_MODEL)}")
            print(f" π Cosine Similarity Threshold: {self.COSINE_SIMILARITY_THRESHOLD}")
            print(f" π Valid Years: {self.VALID_YEARS}")
            print(f" π Context Expansion: {self.CONTEXT_EXPANSION_ENABLED}")
| def get_metadata_similarity_score(self, meta1: Dict, meta2: Dict) -> float: | |
| """Calculate similarity score between two metadata objects""" | |
| similarity_score = 0.0 | |
| total_weight = 0.0 | |
| # Define weights for different metadata fields | |
| field_weights = { | |
| 'year': 0.3, | |
| 'page': 0.2, | |
| 'program': 0.25, | |
| 'semester': 0.15, | |
| 'chapter': 0.2, | |
| 'section': 0.15, | |
| 'subsection': 0.1, | |
| 'content_type': 0.2, | |
| 'course_code': 0.15, | |
| 'mata_kuliah': 0.15 | |
| } | |
| for field, weight in field_weights.items(): | |
| if field in meta1 and field in meta2: | |
| total_weight += weight | |
| if field in ['year', 'page', 'semester']: | |
| if meta1[field] == meta2[field]: | |
| similarity_score += weight | |
| elif field == 'page': | |
| try: | |
| page1, page2 = int(meta1[field]), int(meta2[field]) | |
| page_diff = abs(page1 - page2) | |
| if page_diff == 0: | |
| similarity_score += weight | |
| elif page_diff <= 2: | |
| similarity_score += weight * 0.5 | |
| except: | |
| pass | |
| else: | |
| str1, str2 = str(meta1[field]).lower(), str(meta2[field]).lower() | |
| if str1 == str2: | |
| similarity_score += weight | |
| elif str1 in str2 or str2 in str1: | |
| similarity_score += weight * 0.7 | |
| return similarity_score / total_weight if total_weight > 0 else 0.0 | |
| def find_contextual_chunks(self, base_result: Dict, all_results: List[Dict]) -> List[Dict]: | |
| base_metadata = base_result["metadata"] | |
| contextual_chunks = [] | |
| for result in all_results: | |
| if result["metadata"].get("id") == base_metadata.get("id"): | |
| continue | |
| if result["metadata"].get("year") != base_metadata.get("year"): | |
| continue | |
| similarity_score = self.get_metadata_similarity_score(base_metadata, result["metadata"]) | |
| if similarity_score >= self.CONTEXT_SIMILARITY_THRESHOLD: | |
| result["context_similarity_score"] = similarity_score | |
| contextual_chunks.append(result) | |
| # Sort by similarity score and limit | |
| contextual_chunks.sort(key=lambda x: x["context_similarity_score"], reverse=True) | |
| return contextual_chunks[:self.MAX_CONTEXT_CHUNKS_PER_SOURCE] | |
| def get_document_chunks_by_metadata(self, metadata: Dict, year: int) -> List[Dict]: | |
| """Get all chunks from the same document/source with similar metadata""" | |
| try: | |
| # Build a more specific query based on metadata | |
| search_filters = [] | |
| if metadata.get('program'): | |
| search_filters.append(f"program:{metadata['program']}") | |
| if metadata.get('semester'): | |
| search_filters.append(f"semester:{metadata['semester']}") | |
| if metadata.get('chapter'): | |
| search_filters.append(f"chapter:{metadata['chapter']}") | |
| if metadata.get('section'): | |
| search_filters.append(f"section:{metadata['section']}") | |
| # Create a search query from metadata | |
| search_query = " ".join(search_filters) if search_filters else metadata.get('title', '') | |
| # Get chunks from vectorstore with broader search | |
| results = self.vectorizer.query_multimodal( | |
| query_text=search_query, | |
| year=year, | |
| content_types=None, | |
| n_results=20 # Get more results for context expansion | |
| ) | |
| return results | |
| except Exception as e: | |
| print(f"β Error getting document chunks: {e}") | |
| return [] | |
| def expand_context_for_results(self, initial_results: List[Dict]) -> List[Dict]: | |
| """Expand context by finding related chunks for each initial result""" | |
| if not self.CONTEXT_EXPANSION_ENABLED: | |
| return initial_results | |
| expanded_results = [] | |
| seen_ids = set() | |
| for result in initial_results: | |
| # Add the original result | |
| result_id = result["metadata"].get("id", "") | |
| if result_id not in seen_ids: | |
| result["is_primary_result"] = True | |
| expanded_results.append(result) | |
| seen_ids.add(result_id) | |
| # Find contextual chunks | |
| year = result.get("search_year", result["metadata"].get("year")) | |
| if year: | |
| document_chunks = self.get_document_chunks_by_metadata( | |
| result["metadata"], year | |
| ) | |
| contextual_chunks = self.find_contextual_chunks(result, document_chunks) | |
| # Add contextual chunks | |
| for ctx_chunk in contextual_chunks: | |
| ctx_id = ctx_chunk["metadata"].get("id", "") | |
| if ctx_id not in seen_ids: | |
| ctx_chunk["is_primary_result"] = False | |
| ctx_chunk["parent_result_id"] = result_id | |
| expanded_results.append(ctx_chunk) | |
| seen_ids.add(ctx_id) | |
| if VERBOSE_LOGGING: | |
| print(f"π Added contextual chunk for {result_id}: {ctx_id}") | |
| if VERBOSE_LOGGING: | |
| primary_count = sum(1 for r in expanded_results if r.get("is_primary_result", False)) | |
| context_count = len(expanded_results) - primary_count | |
| print( | |
| f"π Context expansion: {primary_count} primary + {context_count} contextual = {len(expanded_results)} total") | |
| return expanded_results | |
| def group_related_content(self, results: List[Dict]) -> Dict[str, List[Dict]]: | |
| """Group results by their relationships (same document, similar metadata, etc.)""" | |
| groups = defaultdict(list) | |
| for result in results: | |
| metadata = result["metadata"] | |
| # Create grouping key based on metadata | |
| group_key_parts = [] | |
| if metadata.get('program'): | |
| group_key_parts.append(f"prog_{metadata['program']}") | |
| if metadata.get('year'): | |
| group_key_parts.append(f"year_{metadata['year']}") | |
| if metadata.get('semester'): | |
| group_key_parts.append(f"sem_{metadata['semester']}") | |
| if metadata.get('chapter'): | |
| group_key_parts.append(f"ch_{metadata['chapter']}") | |
| if metadata.get('content_type'): | |
| group_key_parts.append(f"type_{metadata['content_type']}") | |
| group_key = "_".join(group_key_parts) if group_key_parts else "general" | |
| groups[group_key].append(result) | |
| return dict(groups) | |
    def retrieve_multimodal_context_enhanced(self, query_context: Dict[str, Any], k: int = 10) -> List[Dict]:
        """Enhanced retrieval with context expansion.

        Pipeline: per-year retrieval (a content-type-targeted pass, then a
        general pass) -> deduplication/ranking -> metadata-driven context
        expansion -> final ranking.  May return up to 2*k entries because
        contextual chunks ride along with the primary hits.
        """
        all_results = []
        # Per-content-type result budget derived from the global strategy ratios.
        content_strategies = {}
        for content_type, ratio in CONTENT_TYPE_STRATEGIES.items():
            content_strategies[content_type] = max(1, int(k * ratio))
        if LOG_RETRIEVAL_DETAILS:
            print(f"π― Content strategies: {content_strategies}")
            print(f"π Searching years: {query_context['years']}")
        # Step 1: Get initial results, year by year.
        for year in query_context["years"]:
            if year not in self.VALID_YEARS:
                print(f"β οΈ Skipping invalid year: {year}")
                continue
            try:
                # Targeted pass: each preferred content type gets its own budget;
                # those hits are flagged content_priority=True.
                if query_context.get("preferred_content_types"):
                    for content_type in query_context["preferred_content_types"]:
                        results = self.vectorizer.query_multimodal(
                            query_text=query_context["cleaned_query"],
                            year=year,
                            content_types=[content_type],
                            n_results=content_strategies.get(content_type, k//4)
                        )
                        for result in results:
                            result["search_year"] = year
                            result["content_priority"] = True
                        all_results.extend(results)
                # General pass: fill the remaining budget with no type filter.
                remaining_k = max(1, k - len(all_results))
                general_results = self.vectorizer.query_multimodal(
                    query_text=query_context["cleaned_query"],
                    year=year,
                    content_types=None,
                    n_results=remaining_k
                )
                for result in general_results:
                    result["search_year"] = year
                    result["content_priority"] = False
                all_results.extend(general_results)
            except Exception as e:
                print(f"β Error retrieving from year {year}: {e}")
        # Step 2: deduplicate and rank across years/passes.
        unique_results = self._deduplicate_and_rank_results(all_results, k)
        # Step 3: expand with metadata-related chunks from the same sources.
        expanded_results = self.expand_context_for_results(unique_results)
        # Step 4: Final ranking and limiting
        final_results = self._final_ranking_with_context(expanded_results, k * 2)  # Allow more results due to context
        if VERBOSE_LOGGING:
            print(f"π Final results with context: {len(final_results)}")
        return final_results
| def _final_ranking_with_context(self, results: List[Dict], max_results: int) -> List[Dict]: | |
| """Final ranking that considers both primary results and their context""" | |
| # Separate primary and contextual results | |
| primary_results = [r for r in results if r.get("is_primary_result", True)] | |
| contextual_results = [r for r in results if not r.get("is_primary_result", True)] | |
| # Sort primary results by score | |
| primary_results.sort(key=lambda x: x.get("score", 0), reverse=True) | |
| # For each primary result, add its best contextual chunks | |
| final_results = [] | |
| for primary in primary_results: | |
| if len(final_results) >= max_results: | |
| break | |
| final_results.append(primary) | |
| # Add related contextual chunks | |
| primary_id = primary["metadata"].get("id", "") | |
| related_contexts = [ | |
| r for r in contextual_results | |
| if r.get("parent_result_id") == primary_id | |
| ] | |
| # Sort contextual chunks by their similarity score | |
| related_contexts.sort(key=lambda x: x.get("context_similarity_score", 0), reverse=True) | |
| # Add top contextual chunks | |
| for ctx in related_contexts[:2]: # Limit to 2 contextual chunks per primary | |
| if len(final_results) < max_results: | |
| final_results.append(ctx) | |
| return final_results | |
| def format_enhanced_context_with_grouping(self, results: List[Dict]) -> str: | |
| """Format context with grouping and relationship indicators""" | |
| if not results: | |
| return "Tidak ada informasi yang relevan ditemukan." | |
| # Group related content | |
| grouped_results = self.group_related_content(results) | |
| context_parts = [] | |
| for group_key, group_results in grouped_results.items(): | |
| context_parts.append(f"\n{'='*60}") | |
| context_parts.append(f"π GRUP: {group_key.replace('_', ' ').upper()}") | |
| context_parts.append(f"{'='*60}") | |
| for i, result in enumerate(group_results, 1): | |
| content_type = result["metadata"]["content_type"] | |
| is_primary = result.get("is_primary_result", True) | |
| # Add indicator for primary vs contextual | |
| result_type = "π― PRIMARY" if is_primary else "π CONTEXT" | |
| # Enhanced formatting based on content type | |
| if content_type == "table": | |
| context_part = self.enhance_table_context_with_markdown(result) | |
| elif content_type == "image": | |
| context_part = self.enhance_image_context_with_details(result) | |
| elif content_type == "silabus": | |
| context_part = self.enhance_silabus_context_detailed(result) | |
| elif content_type == "curriculum": | |
| context_part = self.enhance_curriculum_context_detailed(result) | |
| elif content_type == "text_chunk": | |
| context_part = self.enhance_text_context_detailed(result) | |
| else: | |
| context_part = f""" | |
| **KONTEN {content_type.upper()}:** | |
| - **Tahun:** {result["metadata"].get('year', 'N/A')} | |
| - **Halaman:** {result["metadata"].get('page', 'N/A')} | |
| - **Context:** {result.get('context_text', '')[:200]}... | |
| **Konten:** | |
| {result['content'][:500]}... | |
| """ | |
| header = f"**{result_type} SUMBER {i}:**" | |
| if not is_primary: | |
| similarity_score = result.get("context_similarity_score", 0) | |
| header += f" (Similarity: {similarity_score:.2f})" | |
| context_parts.append(f"{header}\n{context_part}") | |
| return "\n\n".join(context_parts) | |
| def _deduplicate_and_rank_results(self, all_results: List[Dict], k: int) -> List[Dict]: | |
| seen_ids = set() | |
| unique_results = [] | |
| sorted_results = sorted( | |
| all_results, | |
| key=lambda x: (x.get("score", 0), not x.get("content_priority", False)) | |
| ) | |
| content_type_counts = {} | |
| max_per_type = max(1, k // len(CONTENT_TYPE_STRATEGIES)) | |
| for result in sorted_results: | |
| result_id = result["metadata"].get("id", "") | |
| content_type = result["metadata"]["content_type"] | |
| # Skip duplicates | |
| if result_id in seen_ids: | |
| continue | |
| # Limit per content type for diversity (unless priority content) | |
| if not result.get("content_priority", False): | |
| if content_type_counts.get(content_type, 0) >= max_per_type: | |
| continue | |
| seen_ids.add(result_id) | |
| content_type_counts[content_type] = content_type_counts.get(content_type, 0) + 1 | |
| # Enhance with context_text | |
| if "context_text" not in result: | |
| result["context_text"] = result["metadata"].get("context_text", "") | |
| unique_results.append(result) | |
| if len(unique_results) >= k: | |
| break | |
| return unique_results | |
| def enhance_table_context_with_markdown(self, result: Dict) -> str: | |
| """Enhanced table context with markdown formatting""" | |
| metadata = result["metadata"] | |
| context_text = result.get("context_text", "") | |
| enhanced_context = f""" | |
| **TABEL ENHANCED:** | |
| - **Judul:** {metadata.get('title', 'N/A')} | |
| - **Ukuran:** {metadata.get('rows', 0)} baris Γ {metadata.get('cols', 0)} kolom | |
| - **Tahun:** {metadata.get('year', 'N/A')} | |
| - **Halaman:** {metadata.get('page', 'N/A')} | |
| - **Context:** {context_text} | |
| - **Preview:** {result['content'][:300]}... | |
| **Konten Lengkap:** | |
| {result['content']} | |
| """ | |
| return enhanced_context | |
| def enhance_image_context_with_details(self, result: Dict) -> str: | |
| """Enhanced image context with detailed metadata""" | |
| metadata = result["metadata"] | |
| context_text = result.get("context_text", "") | |
| enhanced_context = f""" | |
| **GAMBAR ENHANCED:** | |
| - **Judul:** {metadata.get('title', 'N/A')} | |
| - **Caption:** {metadata.get('caption', 'N/A')} | |
| - **Tahun:** {metadata.get('year', 'N/A')} | |
| - **Halaman:** {metadata.get('page', 'N/A')} | |
| - **Context:** {context_text} | |
| - **Deskripsi:** {result['content'][:300]}... | |
| **Path Gambar:** {metadata.get('image_path', 'N/A')} | |
| """ | |
| return enhanced_context | |
| def enhance_silabus_context_detailed(self, result: Dict) -> str: | |
| """Enhanced silabus context with comprehensive details""" | |
| metadata = result["metadata"] | |
| context_text = result.get("context_text", "") | |
| enhanced_context = f""" | |
| **SILABUS ENHANCED:** | |
| - **Mata Kuliah:** {metadata.get('mata_kuliah', 'N/A')} ({metadata.get('course_code', 'N/A')}) | |
| - **Program Studi:** {metadata.get('program', 'N/A').title()} | |
| - **Semester:** {metadata.get('semester', 'N/A')} | |
| - **SKS:** {metadata.get('sks', 'N/A')} | |
| - **Tipe Silabus:** {metadata.get('silabus_type', 'N/A')} | |
| - **Tahun Kurikulum:** {metadata.get('year', 'N/A')} | |
| - **Halaman:** {metadata.get('page', 'N/A')} | |
| - **Context Text:** {context_text} | |
| **Konten Lengkap:** | |
| {result['content']} | |
| """ | |
| return enhanced_context | |
| def enhance_curriculum_context_detailed(self, result: Dict) -> str: | |
| """Enhanced curriculum context with comprehensive details""" | |
| metadata = result["metadata"] | |
| context_text = result.get("context_text", "") | |
| enhanced_context = f""" | |
| **KURIKULUM ENHANCED:** | |
| - **Program Studi:** {metadata.get('program', 'N/A').title()} | |
| - **Semester:** {metadata.get('semester', 'N/A')} | |
| - **Jenis Tabel:** {metadata.get('table_type', 'N/A')} | |
| - **Jumlah Mata Kuliah:** {metadata.get('rows_count', 'N/A')} | |
| - **Tahun Kurikulum:** {metadata.get('year', 'N/A')} | |
| - **Halaman:** {metadata.get('page', 'N/A')} | |
| - **Context Text:** {context_text} | |
| **Konten Lengkap:** | |
| {result['content']} | |
| """ | |
| return enhanced_context | |
| def enhance_text_context_detailed(self, result: Dict) -> str: | |
| """Enhanced text context with comprehensive details""" | |
| metadata = result["metadata"] | |
| context_text = result.get("context_text", "") | |
| enhanced_context = f""" | |
| **TEKS ENHANCED:** | |
| - **Bab:** {metadata.get('chapter', 'N/A')} | |
| - **Bagian:** {metadata.get('section', 'N/A')} | |
| - **Sub-bagian:** {metadata.get('subsection', 'N/A')} | |
| - **Tahun:** {metadata.get('year', 'N/A')} | |
| - **Halaman:** {metadata.get('page', 'N/A')} | |
| - **Context Text:** {context_text} | |
| **Konten Lengkap:** | |
| {result['content']} | |
| """ | |
| return enhanced_context | |
    def format_enhanced_context(self, results: List[Dict]) -> str:
        """Format context with comprehensive enhancements and grouping.

        Thin alias kept for backward compatibility; delegates to
        ``format_enhanced_context_with_grouping``.
        """
        return self.format_enhanced_context_with_grouping(results)
    def generate_response(self, query: str, context: str, chat_history: Optional[List[Dict]] = None) -> str:
        """Generate a response from the LLM using retrieved context and
        (optionally) recent chat history.

        Retries the LLM call up to MAX_RETRIES times, sleeping RETRY_DELAY
        between attempts, and returns FALLBACK_RESPONSE if every attempt fails.
        """
        # Fold the last CONTEXT_WINDOW_SIZE turns (minus the current message)
        # into a short history preamble.
        chat_history_text = ""
        if chat_history and len(chat_history) > 1:
            recent_messages = chat_history[-CONTEXT_WINDOW_SIZE:]
            chat_history_text = "\n\nRiwayat Percakapan Terakhir:\n"
            for msg in recent_messages[:-1]:  # Exclude current message
                role = "User" if msg["role"] == "user" else "Assistant"
                chat_history_text += f"{role}: {msg['content'][:200]}...\n"
        # Enhanced prompt (Indonesian-language instructions for the assistant).
        enhanced_prompt = f"""
Anda adalah asisten akademik DTMI UGM yang membantu mahasiswa dan dosen.
{chat_history_text}
Pertanyaan Saat Ini: {query}
Konteks Informasi:
{context}
Instruksi:
1. Berikan jawaban yang komprehensif dan akurat
2. Gunakan informasi dari konteks yang relevan
3. Jika merujuk ke tahun atau program studi, sebutkan secara spesifik
4. Format jawaban dengan struktur yang jelas (gunakan bullet points, numbering jika perlu)
5. Jika ada tabel atau data, jelaskan dengan detail
6. Akhiri dengan saran atau informasi tambahan yang berguna
7. Pertimbangkan konteks percakapan sebelumnya jika relevan
8. Manfaatkan informasi kontekstual yang tersedia untuk memberikan jawaban yang lebih lengkap
Jawaban:
"""
        for attempt in range(MAX_RETRIES):
            try:
                response = self.llm.predict(enhanced_prompt)
                return response
            except Exception as e:
                if attempt == MAX_RETRIES - 1:
                    return FALLBACK_RESPONSE
                else:
                    import time  # local import; `time` is not imported at module level
                    time.sleep(RETRY_DELAY)
        # Defensive: unreachable when MAX_RETRIES >= 1, kept as a safety net.
        return FALLBACK_RESPONSE
| def parse_query_context(self, query: str) -> Dict[str, Any]: | |
| """Parse query context with year extraction and content type detection""" | |
| years, cleaned_query, user_mentioned_year, user_mentioned_invalid_year = self.year_parser.extract_years(query) | |
| comparison_keywords = ["bandingkan", "banding", "perbandingan", | |
| "dibanding", "vs", "versus", "perbedaan"] | |
| year_comparison_mode = any(keyword in cleaned_query.lower() | |
| for keyword in comparison_keywords) and len(years) > 1 | |
| content_type_hints = { | |
| "silabus": ["silabus", "mata kuliah", "course", "sks", "pembelajaran", "materi"], | |
| "curriculum": ["kurikulum", "curriculum", "semester", "program studi", "struktur"], | |
| "table": ["tabel", "table", "data", "statistik", "daftar", "distribusi"], | |
| "image": ["gambar", "image", "foto", "diagram", "struktur", "chart"], | |
| "text_chunk": ["informasi", "penjelasan", "deskripsi", "detail", "tentang"] | |
| } | |
| preferred_types = [] | |
| query_lower = cleaned_query.lower() | |
| for content_type, keywords in content_type_hints.items(): | |
| if any(keyword in query_lower for keyword in keywords): | |
| preferred_types.append(content_type) | |
| return { | |
| "original_query": query, | |
| "cleaned_query": cleaned_query, | |
| "years": years, | |
| "preferred_content_types": preferred_types, | |
| "year_comparison_mode": year_comparison_mode | |
| } | |
    def query(self, question: str, k: int = 10, content_filter: Optional[List[str]] = None) -> Dict[str, Any]:
        """Answer *question* end-to-end.

        Parses years and content-type hints, retrieves multimodal context,
        generates an LLM answer, and harvests image/table assets from the
        PRIMARY sources only.  Returns a dict with the answer, the formatted
        context, source lists (all / primary / contextual), and loaded
        image/table payloads.
        """
        years, cleaned_query, user_mentioned_year, user_mentioned_invalid_year = self.year_parser.extract_years(
            question)
        # User asked only about a year outside the database: refuse early with
        # an empty-but-complete response payload.
        if user_mentioned_invalid_year and not years:
            return {
                "question": question,
                "answer": "Maaf, informasi mengenai kurikulum tahun yang Anda minta tidak tersedia dalam konteks database ini.",
                "context": "",
                "sources": [],
                "primary_sources": [],
                "contextual_sources": [],
                "years_searched": [],
                "content_types_used": [],
                "total_sources": 0,
                "primary_sources_count": 0,
                "contextual_sources_count": 0,
                "has_images": False,
                "has_tables": False,
                "image_data": [],
                "table_data": [],
                "image_paths": [],
                "table_paths": [],
                "year_comparison_mode": False,
                "context_expansion_enabled": self.CONTEXT_EXPANSION_ENABLED,
                "processing_time": datetime.now().isoformat()
            }
        if VERBOSE_LOGGING:
            print(f"π Processing query: {question}")
        query_context = self.parse_query_context(question)
        # An explicit caller-supplied filter overrides the inferred types.
        if content_filter:
            query_context["preferred_content_types"] = content_filter
        if LOG_RETRIEVAL_DETAILS:
            print(f"π Years: {query_context['years']}")
            print(f"π― Content types: {query_context['preferred_content_types']}")
            print(f"π Content filter: {content_filter}")
        results = self.retrieve_multimodal_context_enhanced(query_context, k)
        context = self.format_enhanced_context(results)
        try:
            response = self.generate_response(question, context)
        except Exception as e:
            print(f"β Error generating answer: {e}")
            response = FALLBACK_RESPONSE
        image_data = []
        table_data = []
        for result in results:
            metadata = result["metadata"]
            content_type = metadata.get("content_type", "")
            # Filter: only harvest assets from PRIMARY sources.
            is_primary = result.get("is_primary_result", True)
            if not is_primary:
                continue  # Skip contextual sources
            # Extract image information (primary only).
            if content_type == "image":
                original_image_path = metadata.get("image_path", "")
                if original_image_path:
                    # Path fixing: stored paths may be relative to ./src.
                    fixed_path = original_image_path
                    if fixed_path.startswith("./src/"):
                        fixed_path = fixed_path.replace("./src/", "./")
                    elif fixed_path.startswith("src/"):
                        fixed_path = fixed_path.replace("src/", "./")
                    if os.path.exists(fixed_path):
                        image_path = fixed_path
                    elif os.path.exists(original_image_path):
                        image_path = original_image_path
                    else:
                        # Try a few alternative locations before giving up.
                        alternatives = [
                            original_image_path.lstrip('./'),
                            f"../{original_image_path.lstrip('./')}",
                            original_image_path.replace("./src/", "../")
                        ]
                        image_path = None
                        for alt in alternatives:
                            if os.path.exists(alt):
                                image_path = alt
                                break
                        if not image_path:
                            # Fall back to the stored path even if missing on disk.
                            image_path = original_image_path
                    if VERBOSE_LOGGING:
                        print(f"πΌοΈ PRIMARY Image path resolution:")
                        print(f" Original: {original_image_path}")
                        print(f" Fixed: {image_path}")
                        print(f" Exists: {os.path.exists(image_path)}")
                    image_info = {
                        "path": image_path,
                        "original_path": original_image_path,
                        "title": metadata.get("title", "Gambar"),
                        "caption": metadata.get("caption", result['content'][:100] + "..."),
                        "page": metadata.get("page", "N/A"),
                        "year": metadata.get("year", "N/A"),
                        "description": result['content'][:200] + "..." if len(result['content']) > 200 else result['content'],
                        "score": result.get("score", 0.0),
                        "is_primary": True  # everything reaching this point is primary
                    }
                    image_data.append(image_info)
                    if VERBOSE_LOGGING:
                        print(f"πΌοΈ Added PRIMARY image: {image_path}")
            # Extract table information (primary only).
            elif content_type == "table":
                table_path = metadata.get("table_path", "")
                if table_path and os.path.exists(table_path):
                    try:
                        table_info = {
                            "path": table_path,
                            "title": metadata.get("title", "Tabel"),
                            "page": metadata.get("page", "N/A"),
                            "year": metadata.get("year", "N/A"),
                            "rows": metadata.get("rows", 0),
                            "cols": metadata.get("cols", 0),
                            "description": result['content'][:200] + "..." if len(result['content']) > 200 else result['content'],
                            "score": result.get("score", 0.0),
                            "is_primary": True  # everything reaching this point is primary
                        }
                        # Load actual table data
                        if table_path.endswith('.csv'):
                            df = pd.read_csv(table_path)
                            table_info["data"] = df
                            table_info["data_type"] = "dataframe"
                        elif table_path.endswith('.json'):
                            with open(table_path, 'r', encoding='utf-8') as f:
                                json_data = json.load(f)
                            table_info["data"] = json_data
                            table_info["data_type"] = "json"
                        table_data.append(table_info)
                        if VERBOSE_LOGGING:
                            print(f"π Found PRIMARY table: {table_path}")
                    except Exception as e:
                        print(f"β Error loading table {table_path}: {e}")
        primary_results = [r for r in results if r.get("is_primary_result", True)]
        contextual_results = [r for r in results if not r.get("is_primary_result", True)]
        response_data = {
            "question": question,
            "answer": response.strip(),
            "context": context,
            "sources": results,
            "primary_sources": primary_results,
            "contextual_sources": contextual_results,
            "years_searched": query_context["years"],
            "content_types_used": query_context["preferred_content_types"],
            "total_sources": len(results),
            "primary_sources_count": len(primary_results),
            "contextual_sources_count": len(contextual_results),
            "has_images": len(image_data) > 0,
            "has_tables": len(table_data) > 0,
            "image_data": image_data,  # Full image metadata (path, title, etc.)
            "table_data": table_data,  # Loaded table payloads (DataFrame/JSON)
            "image_paths": [img["path"] for img in image_data],
            "table_paths": [tbl["path"] for tbl in table_data],
            "year_comparison_mode": query_context["year_comparison_mode"],
            "context_expansion_enabled": self.CONTEXT_EXPANSION_ENABLED,
            "processing_time": datetime.now().isoformat()
        }
        if VERBOSE_LOGGING:
            print(f"β Query processed successfully")
            print(f"π― Primary sources: {len(primary_results)}")
            print(f"π Contextual sources: {len(contextual_results)}")
            print(f"πΌοΈ Images found: {len(image_data)}")
            print(f"π Tables found: {len(table_data)}")
        return response_data
| def get_context_chain(self, result_id: str, max_depth: int = 3) -> List[Dict]: | |
| """Get a chain of contextually related chunks starting from a specific result""" | |
| try: | |
| # This would work with your vectorstore to find chunks with similar metadata | |
| # Implementation depends on your vectorstore structure | |
| chain = [] | |
| current_id = result_id | |
| for depth in range(max_depth): | |
| # Find chunks with similar metadata to current chunk | |
| similar_chunks = self.vectorizer.find_similar_by_metadata(current_id) | |
| if not similar_chunks: | |
| break | |
| # Add the most similar chunk to chain | |
| best_match = similar_chunks[0] | |
| chain.append(best_match) | |
| current_id = best_match["metadata"]["id"] | |
| return chain | |
| except Exception as e: | |
| print(f"β Error building context chain: {e}") | |
| return [] | |
| def get_full_document_context(self, metadata: Dict, year: int) -> str: | |
| """Get comprehensive context from the entire document/source""" | |
| try: | |
| # Build document identifier | |
| doc_identifiers = [] | |
| if metadata.get('program'): | |
| doc_identifiers.append(metadata['program']) | |
| if metadata.get('year'): | |
| doc_identifiers.append(str(metadata['year'])) | |
| if metadata.get('chapter'): | |
| doc_identifiers.append(metadata['chapter']) | |
| # Search for all chunks from the same document | |
| doc_query = " ".join(doc_identifiers) | |
| # Get broader context | |
| doc_chunks = self.vectorizer.query_multimodal( | |
| query_text=doc_query, | |
| year=year, | |
| content_types=None, | |
| n_results=50 # Get many chunks from same document | |
| ) | |
| # Filter chunks that are actually from the same document | |
| same_doc_chunks = [] | |
| for chunk in doc_chunks: | |
| chunk_meta = chunk["metadata"] | |
| similarity_score = self.get_metadata_similarity_score(metadata, chunk_meta) | |
| if similarity_score > 0.5: # Adjust threshold as needed | |
| same_doc_chunks.append(chunk) | |
| # Sort by page number or similarity | |
| same_doc_chunks.sort(key=lambda x: ( | |
| x["metadata"].get("page", 999), | |
| x.get("score", 0) | |
| )) | |
| # Combine content with clear separators | |
| full_context = "" | |
| for i, chunk in enumerate(same_doc_chunks[:10]): # Limit to avoid token overflow | |
| page = chunk["metadata"].get("page", "N/A") | |
| content_type = chunk["metadata"].get("content_type", "unknown") | |
| full_context += f"\n--- {content_type.upper()} (Page {page}) ---\n" | |
| full_context += chunk["content"][:500] + "...\n" | |
| return full_context | |
| except Exception as e: | |
| print(f"β Error getting full document context: {e}") | |
| return "" | |
    def advanced_context_retrieval(self, query_context: Dict[str, Any], k: int = 10) -> List[Dict]:
        """Advanced retrieval that considers document structure and relationships.

        Context levels in the output: "primary" (direct retrieval hits),
        "document" (a synthetic whole-document summary per hit, at a slightly
        discounted score), and "supplementary" (diversity filler).  Returns at
        most *k* entries.
        """
        # Step 1: Get initial high-quality results (half the budget).
        initial_results = self.retrieve_multimodal_context_enhanced(query_context, k//2)
        # Step 2: For each high-quality result, get its document context
        enhanced_results = []
        seen_ids = set()
        for result in initial_results:
            result_id = result["metadata"].get("id", "")
            if result_id in seen_ids:
                continue
            seen_ids.add(result_id)
            result["context_level"] = "primary"
            enhanced_results.append(result)
            # Get document-level context
            year = result.get("search_year", result["metadata"].get("year"))
            if year:
                doc_context = self.get_full_document_context(result["metadata"], year)
                if doc_context:
                    # Create a synthetic result with full document context
                    doc_result = {
                        "content": doc_context,
                        "metadata": {
                            **result["metadata"],
                            "content_type": "document_context",
                            "id": f"{result_id}_doc_context"
                        },
                        "score": result.get("score", 0) * 0.8,  # Slightly lower score
                        "context_level": "document",
                        "parent_id": result_id
                    }
                    enhanced_results.append(doc_result)
        # Step 3: Fill remaining slots with diverse content
        remaining_k = k - len(enhanced_results)
        if remaining_k > 0:
            additional_results = self.vectorizer.query_multimodal(
                query_text=query_context["cleaned_query"],
                # NOTE(review): hard-coded 2024 fallback year when the query
                # carries none — confirm this matches the newest dataset year.
                year=query_context["years"][0] if query_context["years"] else 2024,
                content_types=None,
                n_results=remaining_k * 2
            )
            for add_result in additional_results:
                add_id = add_result["metadata"].get("id", "")
                if add_id not in seen_ids and len(enhanced_results) < k:
                    add_result["context_level"] = "supplementary"
                    enhanced_results.append(add_result)
                    seen_ids.add(add_id)
        return enhanced_results[:k]