import os
import json
import time
from typing import List, Dict, Any, Optional
from datetime import datetime
from collections import defaultdict

import pandas as pd
from dotenv import load_dotenv
from langchain_community.chat_models import ChatOpenAI

from vectorization import LangChainMultimodalVectorizer
from year_parser import YearParser
from config import *

load_dotenv()


class EnhancedMultimodalRAGSystem:
    def __init__(self):
        """Initialize enhanced RAG system with multimodal capabilities"""
        self.vectorizer = LangChainMultimodalVectorizer()
        self.llm = ChatOpenAI(
            # openai_api_key=os.getenv("OPENAI_API_KEY"),  # picked up from the environment
            model_name=os.getenv("OPENAI_MODEL", DEFAULT_LLM_MODEL),
            temperature=LLM_TEMPERATURE,
            max_tokens=MAX_TOKENS,
            request_timeout=LLM_TIMEOUT
        )
        self.year_parser = YearParser()
        self.COSINE_SIMILARITY_THRESHOLD = COSINE_SIMILARITY_THRESHOLD
        self.MAX_SIMILAR_CONTEXT = MAX_SIMILAR_CONTEXT
        self.VALID_YEARS = VALID_YEARS

        # Context expansion settings
        self.CONTEXT_EXPANSION_ENABLED = True
        self.MAX_CONTEXT_CHUNKS_PER_SOURCE = 5  # Max additional chunks per source
        self.CONTEXT_SIMILARITY_THRESHOLD = 0.7  # Metadata-similarity threshold for context expansion

        if VERBOSE_LOGGING:
            print("🚀 Enhanced Multimodal RAG System initialized")
            print(f"   🧠 LLM Model: {os.getenv('OPENAI_MODEL', DEFAULT_LLM_MODEL)}")
            print(f"   📊 Cosine Similarity Threshold: {self.COSINE_SIMILARITY_THRESHOLD}")
            print(f"   📅 Valid Years: {self.VALID_YEARS}")
            print(f"   🔗 Context Expansion: {self.CONTEXT_EXPANSION_ENABLED}")

    def get_metadata_similarity_score(self, meta1: Dict, meta2: Dict) -> float:
        """Calculate a weighted similarity score between two metadata objects."""
        similarity_score = 0.0
        total_weight = 0.0

        # Weights for the metadata fields that contribute to similarity
        field_weights = {
            'year': 0.3,
            'page': 0.2,
            'program': 0.25,
            'semester': 0.15,
            'chapter': 0.2,
            'section': 0.15,
            'subsection': 0.1,
            'content_type': 0.2,
            'course_code': 0.15,
            'mata_kuliah': 0.15
        }

        for field, weight in field_weights.items():
            if field in meta1 and field in meta2:
                total_weight += weight

                if field == 'page':
                    # Pages get proximity scoring: full credit for the same
                    # page, half credit for pages within 2 of each other.
                    try:
                        page1, page2 = int(meta1[field]), int(meta2[field])
                        page_diff = abs(page1 - page2)
                        if page_diff == 0:
                            similarity_score += weight
                        elif page_diff <= 2:
                            similarity_score += weight * 0.5
                    except (TypeError, ValueError):
                        pass
                elif field in ['year', 'semester']:
                    if meta1[field] == meta2[field]:
                        similarity_score += weight
                else:
                    str1, str2 = str(meta1[field]).lower(), str(meta2[field]).lower()
                    if str1 == str2:
                        similarity_score += weight
                    elif str1 in str2 or str2 in str1:
                        similarity_score += weight * 0.7

        return similarity_score / total_weight if total_weight > 0 else 0.0

    def find_contextual_chunks(self, base_result: Dict, all_results: List[Dict]) -> List[Dict]:
        """Find chunks whose metadata is similar enough to serve as context for a base result."""
        base_metadata = base_result["metadata"]
        contextual_chunks = []

        for result in all_results:
            if result["metadata"].get("id") == base_metadata.get("id"):
                continue
            if result["metadata"].get("year") != base_metadata.get("year"):
                continue

            similarity_score = self.get_metadata_similarity_score(base_metadata, result["metadata"])
            if similarity_score >= self.CONTEXT_SIMILARITY_THRESHOLD:
                result["context_similarity_score"] = similarity_score
                contextual_chunks.append(result)

        # Sort by similarity score and limit
        contextual_chunks.sort(key=lambda x: x["context_similarity_score"], reverse=True)
        return contextual_chunks[:self.MAX_CONTEXT_CHUNKS_PER_SOURCE]
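    # Worked example (illustrative values, not from the real corpus): for two
    # chunks from the same 2024 document on adjacent pages,
    #   meta1 = {"year": 2024, "page": 10, "program": "teknik industri"}
    #   meta2 = {"year": 2024, "page": 11, "program": "Teknik Industri"}
    # the shared fields are year (0.3), page (0.2), and program (0.25), so
    # total_weight = 0.75. Scoring gives year 0.3 (equal) + page 0.1 (one page
    # apart, half credit) + program 0.25 (case-insensitive match) = 0.65,
    # i.e. 0.65 / 0.75 ≈ 0.87, which clears CONTEXT_SIMILARITY_THRESHOLD
    # (0.7), so such a chunk would be kept by find_contextual_chunks.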
    def get_document_chunks_by_metadata(self, metadata: Dict, year: int) -> List[Dict]:
        """Get all chunks from the same document/source with similar metadata"""
        try:
            # Build a more specific query based on metadata
            search_filters = []
            if metadata.get('program'):
                search_filters.append(f"program:{metadata['program']}")
            if metadata.get('semester'):
                search_filters.append(f"semester:{metadata['semester']}")
            if metadata.get('chapter'):
                search_filters.append(f"chapter:{metadata['chapter']}")
            if metadata.get('section'):
                search_filters.append(f"section:{metadata['section']}")

            # Create a search query from metadata
            search_query = " ".join(search_filters) if search_filters else metadata.get('title', '')

            # Get chunks from the vectorstore with a broader search
            results = self.vectorizer.query_multimodal(
                query_text=search_query,
                year=year,
                content_types=None,
                n_results=20  # Get more results for context expansion
            )
            return results
        except Exception as e:
            print(f"❌ Error getting document chunks: {e}")
            return []

    def expand_context_for_results(self, initial_results: List[Dict]) -> List[Dict]:
        """Expand context by finding related chunks for each initial result"""
        if not self.CONTEXT_EXPANSION_ENABLED:
            return initial_results

        expanded_results = []
        seen_ids = set()

        for result in initial_results:
            # Add the original result
            result_id = result["metadata"].get("id", "")
            if result_id not in seen_ids:
                result["is_primary_result"] = True
                expanded_results.append(result)
                seen_ids.add(result_id)

                # Find contextual chunks
                year = result.get("search_year", result["metadata"].get("year"))
                if year:
                    document_chunks = self.get_document_chunks_by_metadata(result["metadata"], year)
                    contextual_chunks = self.find_contextual_chunks(result, document_chunks)

                    # Add contextual chunks
                    for ctx_chunk in contextual_chunks:
                        ctx_id = ctx_chunk["metadata"].get("id", "")
                        if ctx_id not in seen_ids:
                            ctx_chunk["is_primary_result"] = False
                            ctx_chunk["parent_result_id"] = result_id
                            expanded_results.append(ctx_chunk)
                            seen_ids.add(ctx_id)
                            if VERBOSE_LOGGING:
                                print(f"🔗 Added contextual chunk for {result_id}: {ctx_id}")

        if VERBOSE_LOGGING:
            primary_count = sum(1 for r in expanded_results if r.get("is_primary_result", False))
            context_count = len(expanded_results) - primary_count
            print(f"📈 Context expansion: {primary_count} primary + {context_count} contextual = {len(expanded_results)} total")

        return expanded_results

    def group_related_content(self, results: List[Dict]) -> Dict[str, List[Dict]]:
        """Group results by their relationships (same document, similar metadata, etc.)"""
        groups = defaultdict(list)

        for result in results:
            metadata = result["metadata"]

            # Create a grouping key based on metadata
            group_key_parts = []
            if metadata.get('program'):
                group_key_parts.append(f"prog_{metadata['program']}")
            if metadata.get('year'):
                group_key_parts.append(f"year_{metadata['year']}")
            if metadata.get('semester'):
                group_key_parts.append(f"sem_{metadata['semester']}")
            if metadata.get('chapter'):
                group_key_parts.append(f"ch_{metadata['chapter']}")
            if metadata.get('content_type'):
                group_key_parts.append(f"type_{metadata['content_type']}")

            group_key = "_".join(group_key_parts) if group_key_parts else "general"
            groups[group_key].append(result)

        return dict(groups)
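    # For example (hypothetical metadata), a silabus chunk tagged with
    # program="industri", year=2024, semester=3, and content_type="silabus"
    # would land in the group "prog_industri_year_2024_sem_3_type_silabus",
    # while chunks with no grouping fields fall back to the "general" group.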
    def retrieve_multimodal_context_enhanced(self, query_context: Dict[str, Any], k: int = 10) -> List[Dict]:
        """Enhanced retrieval with context expansion"""
        all_results = []

        content_strategies = {}
        for content_type, ratio in CONTENT_TYPE_STRATEGIES.items():
            content_strategies[content_type] = max(1, int(k * ratio))

        if LOG_RETRIEVAL_DETAILS:
            print(f"🎯 Content strategies: {content_strategies}")
            print(f"📅 Searching years: {query_context['years']}")

        # Step 1: Get initial results
        for year in query_context["years"]:
            if year not in self.VALID_YEARS:
                print(f"⚠️ Skipping invalid year: {year}")
                continue

            try:
                if query_context.get("preferred_content_types"):
                    for content_type in query_context["preferred_content_types"]:
                        results = self.vectorizer.query_multimodal(
                            query_text=query_context["cleaned_query"],
                            year=year,
                            content_types=[content_type],
                            n_results=content_strategies.get(content_type, k // 4)
                        )
                        for result in results:
                            result["search_year"] = year
                            result["content_priority"] = True
                        all_results.extend(results)

                remaining_k = max(1, k - len(all_results))
                general_results = self.vectorizer.query_multimodal(
                    query_text=query_context["cleaned_query"],
                    year=year,
                    content_types=None,
                    n_results=remaining_k
                )
                for result in general_results:
                    result["search_year"] = year
                    result["content_priority"] = False
                all_results.extend(general_results)
            except Exception as e:
                print(f"❌ Error retrieving from year {year}: {e}")

        # Step 2: Deduplicate and rank
        unique_results = self._deduplicate_and_rank_results(all_results, k)

        # Step 3: Expand context beyond the initial hits using metadata
        expanded_results = self.expand_context_for_results(unique_results)

        # Step 4: Final ranking and limiting (allow more results due to context)
        final_results = self._final_ranking_with_context(expanded_results, k * 2)

        if VERBOSE_LOGGING:
            print(f"📚 Final results with context: {len(final_results)}")

        return final_results

    def _final_ranking_with_context(self, results: List[Dict], max_results: int) -> List[Dict]:
        """Final ranking that considers both primary results and their context"""
        # Separate primary and contextual results
        primary_results = [r for r in results if r.get("is_primary_result", True)]
        contextual_results = [r for r in results if not r.get("is_primary_result", True)]

        # Sort primary results by score
        primary_results.sort(key=lambda x: x.get("score", 0), reverse=True)

        # For each primary result, add its best contextual chunks
        final_results = []
        for primary in primary_results:
            if len(final_results) >= max_results:
                break

            final_results.append(primary)

            # Add related contextual chunks
            primary_id = primary["metadata"].get("id", "")
            related_contexts = [
                r for r in contextual_results
                if r.get("parent_result_id") == primary_id
            ]

            # Sort contextual chunks by their similarity score
            related_contexts.sort(key=lambda x: x.get("context_similarity_score", 0), reverse=True)

            # Add top contextual chunks (limit to 2 per primary)
            for ctx in related_contexts[:2]:
                if len(final_results) < max_results:
                    final_results.append(ctx)

        return final_results
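    # Sketch of the interleaving above (scores are made up): given primaries
    # P1 (score 0.9) and P2 (score 0.8), where P1 has contextual chunks C1a
    # and C1b and P2 has C2a, the returned order is
    #   [P1, C1a, C1b, P2, C2a]
    # so each primary hit is immediately followed by at most two of its own
    # highest-similarity contextual chunks.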
    def format_enhanced_context_with_grouping(self, results: List[Dict]) -> str:
        """Format context with grouping and relationship indicators"""
        if not results:
            return "Tidak ada informasi yang relevan ditemukan."

        # Group related content
        grouped_results = self.group_related_content(results)

        context_parts = []
        for group_key, group_results in grouped_results.items():
            context_parts.append(f"\n{'=' * 60}")
            context_parts.append(f"📂 GRUP: {group_key.replace('_', ' ').upper()}")
            context_parts.append(f"{'=' * 60}")

            for i, result in enumerate(group_results, 1):
                content_type = result["metadata"]["content_type"]
                is_primary = result.get("is_primary_result", True)

                # Add an indicator for primary vs contextual sources
                result_type = "🎯 PRIMARY" if is_primary else "🔗 CONTEXT"

                # Enhanced formatting based on content type
                if content_type == "table":
                    context_part = self.enhance_table_context_with_markdown(result)
                elif content_type == "image":
                    context_part = self.enhance_image_context_with_details(result)
                elif content_type == "silabus":
                    context_part = self.enhance_silabus_context_detailed(result)
                elif content_type == "curriculum":
                    context_part = self.enhance_curriculum_context_detailed(result)
                elif content_type == "text_chunk":
                    context_part = self.enhance_text_context_detailed(result)
                else:
                    context_part = f"""
**KONTEN {content_type.upper()}:**
- **Tahun:** {result["metadata"].get('year', 'N/A')}
- **Halaman:** {result["metadata"].get('page', 'N/A')}
- **Context:** {result.get('context_text', '')[:200]}...

**Konten:** {result['content'][:500]}...
"""

                header = f"**{result_type} SUMBER {i}:**"
                if not is_primary:
                    similarity_score = result.get("context_similarity_score", 0)
                    header += f" (Similarity: {similarity_score:.2f})"

                context_parts.append(f"{header}\n{context_part}")

        return "\n\n".join(context_parts)

    def _deduplicate_and_rank_results(self, all_results: List[Dict], k: int) -> List[Dict]:
        """Deduplicate results by id and enforce per-content-type diversity."""
        seen_ids = set()
        unique_results = []

        # Sort best-first: highest score, with priority content breaking ties
        sorted_results = sorted(
            all_results,
            key=lambda x: (-x.get("score", 0), not x.get("content_priority", False))
        )

        content_type_counts = {}
        max_per_type = max(1, k // len(CONTENT_TYPE_STRATEGIES))

        for result in sorted_results:
            result_id = result["metadata"].get("id", "")
            content_type = result["metadata"]["content_type"]

            # Skip duplicates
            if result_id in seen_ids:
                continue

            # Limit per content type for diversity (unless priority content)
            if not result.get("content_priority", False):
                if content_type_counts.get(content_type, 0) >= max_per_type:
                    continue

            seen_ids.add(result_id)
            content_type_counts[content_type] = content_type_counts.get(content_type, 0) + 1

            # Enhance with context_text
            if "context_text" not in result:
                result["context_text"] = result["metadata"].get("context_text", "")

            unique_results.append(result)

            if len(unique_results) >= k:
                break

        return unique_results
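    # Example of the diversity quota (assuming CONTENT_TYPE_STRATEGIES in
    # config defines five content types): with k=10, max_per_type = 10 // 5
    # = 2, so at most two non-priority chunks of any one content type (e.g.
    # two tables) survive deduplication; priority-typed hits bypass the cap.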
    def enhance_table_context_with_markdown(self, result: Dict) -> str:
        """Enhanced table context with markdown formatting"""
        metadata = result["metadata"]
        context_text = result.get("context_text", "")

        enhanced_context = f"""
**TABEL ENHANCED:**
- **Judul:** {metadata.get('title', 'N/A')}
- **Ukuran:** {metadata.get('rows', 0)} baris × {metadata.get('cols', 0)} kolom
- **Tahun:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context:** {context_text}
- **Preview:** {result['content'][:300]}...

**Konten Lengkap:**
{result['content']}
"""
        return enhanced_context

    def enhance_image_context_with_details(self, result: Dict) -> str:
        """Enhanced image context with detailed metadata"""
        metadata = result["metadata"]
        context_text = result.get("context_text", "")

        enhanced_context = f"""
**GAMBAR ENHANCED:**
- **Judul:** {metadata.get('title', 'N/A')}
- **Caption:** {metadata.get('caption', 'N/A')}
- **Tahun:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context:** {context_text}
- **Deskripsi:** {result['content'][:300]}...

**Path Gambar:** {metadata.get('image_path', 'N/A')}
"""
        return enhanced_context

    def enhance_silabus_context_detailed(self, result: Dict) -> str:
        """Enhanced silabus context with comprehensive details"""
        metadata = result["metadata"]
        context_text = result.get("context_text", "")

        enhanced_context = f"""
**SILABUS ENHANCED:**
- **Mata Kuliah:** {metadata.get('mata_kuliah', 'N/A')} ({metadata.get('course_code', 'N/A')})
- **Program Studi:** {metadata.get('program', 'N/A').title()}
- **Semester:** {metadata.get('semester', 'N/A')}
- **SKS:** {metadata.get('sks', 'N/A')}
- **Tipe Silabus:** {metadata.get('silabus_type', 'N/A')}
- **Tahun Kurikulum:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context Text:** {context_text}

**Konten Lengkap:**
{result['content']}
"""
        return enhanced_context

    def enhance_curriculum_context_detailed(self, result: Dict) -> str:
        """Enhanced curriculum context with comprehensive details"""
        metadata = result["metadata"]
        context_text = result.get("context_text", "")

        enhanced_context = f"""
**KURIKULUM ENHANCED:**
- **Program Studi:** {metadata.get('program', 'N/A').title()}
- **Semester:** {metadata.get('semester', 'N/A')}
- **Jenis Tabel:** {metadata.get('table_type', 'N/A')}
- **Jumlah Mata Kuliah:** {metadata.get('rows_count', 'N/A')}
- **Tahun Kurikulum:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context Text:** {context_text}

**Konten Lengkap:**
{result['content']}
"""
        return enhanced_context

    def enhance_text_context_detailed(self, result: Dict) -> str:
        """Enhanced text context with comprehensive details"""
        metadata = result["metadata"]
        context_text = result.get("context_text", "")

        enhanced_context = f"""
**TEKS ENHANCED:**
- **Bab:** {metadata.get('chapter', 'N/A')}
- **Bagian:** {metadata.get('section', 'N/A')}
- **Sub-bagian:** {metadata.get('subsection', 'N/A')}
- **Tahun:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context Text:** {context_text}

**Konten Lengkap:**
{result['content']}
"""
        return enhanced_context

    def format_enhanced_context(self, results: List[Dict]) -> str:
        """Format context with comprehensive enhancements and grouping"""
        return self.format_enhanced_context_with_grouping(results)
    def generate_response(self, query: str, context: str, chat_history: Optional[List[Dict]] = None) -> str:
        """Generate a response using the LLM with context and chat history"""
        # Prepare chat history context
        chat_history_text = ""
        if chat_history and len(chat_history) > 1:
            recent_messages = chat_history[-CONTEXT_WINDOW_SIZE:]
            chat_history_text = "\n\nRiwayat Percakapan Terakhir:\n"
            for msg in recent_messages[:-1]:  # Exclude the current message
                role = "User" if msg["role"] == "user" else "Assistant"
                chat_history_text += f"{role}: {msg['content'][:200]}...\n"

        # Enhanced prompt
        enhanced_prompt = f"""
Anda adalah asisten akademik DTMI UGM yang membantu mahasiswa dan dosen.
{chat_history_text}

Pertanyaan Saat Ini: {query}

Konteks Informasi:
{context}

Instruksi:
1. Berikan jawaban yang komprehensif dan akurat
2. Gunakan informasi dari konteks yang relevan
3. Jika merujuk ke tahun atau program studi, sebutkan secara spesifik
4. Format jawaban dengan struktur yang jelas (gunakan bullet points, numbering jika perlu)
5. Jika ada tabel atau data, jelaskan dengan detail
6. Akhiri dengan saran atau informasi tambahan yang berguna
7. Pertimbangkan konteks percakapan sebelumnya jika relevan
8. Manfaatkan informasi kontekstual yang tersedia untuk memberikan jawaban yang lebih lengkap

Jawaban:
"""

        for attempt in range(MAX_RETRIES):
            try:
                response = self.llm.predict(enhanced_prompt)
                return response
            except Exception:
                if attempt == MAX_RETRIES - 1:
                    return FALLBACK_RESPONSE
                time.sleep(RETRY_DELAY)

        return FALLBACK_RESPONSE

    def parse_query_context(self, query: str) -> Dict[str, Any]:
        """Parse query context with year extraction and content type detection"""
        years, cleaned_query, user_mentioned_year, user_mentioned_invalid_year = self.year_parser.extract_years(query)

        comparison_keywords = ["bandingkan", "banding", "perbandingan", "dibanding", "vs", "versus", "perbedaan"]
        year_comparison_mode = any(keyword in cleaned_query.lower() for keyword in comparison_keywords) and len(years) > 1

        content_type_hints = {
            "silabus": ["silabus", "mata kuliah", "course", "sks", "pembelajaran", "materi"],
            "curriculum": ["kurikulum", "curriculum", "semester", "program studi", "struktur"],
            "table": ["tabel", "table", "data", "statistik", "daftar", "distribusi"],
            "image": ["gambar", "image", "foto", "diagram", "struktur", "chart"],
            "text_chunk": ["informasi", "penjelasan", "deskripsi", "detail", "tentang"]
        }

        preferred_types = []
        query_lower = cleaned_query.lower()
        for content_type, keywords in content_type_hints.items():
            if any(keyword in query_lower for keyword in keywords):
                preferred_types.append(content_type)

        return {
            "original_query": query,
            "cleaned_query": cleaned_query,
            "years": years,
            "preferred_content_types": preferred_types,
            "year_comparison_mode": year_comparison_mode
        }
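    # Illustrative output (assuming YearParser recognizes both years): for the
    # query "bandingkan kurikulum 2021 dan 2024", parse_query_context would
    # return years=[2021, 2024], preferred_content_types=["curriculum"]
    # (via the keyword "kurikulum"), and year_comparison_mode=True, since the
    # query contains "bandingkan" and mentions more than one year.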
    def query(self, question: str, k: int = 10, content_filter: Optional[List[str]] = None) -> Dict[str, Any]:
        years, cleaned_query, user_mentioned_year, user_mentioned_invalid_year = self.year_parser.extract_years(question)

        if user_mentioned_invalid_year and not years:
            return {
                "question": question,
                "answer": "Maaf, informasi mengenai kurikulum tahun yang Anda minta tidak tersedia dalam konteks database ini.",
                "context": "",
                "sources": [],
                "primary_sources": [],
                "contextual_sources": [],
                "years_searched": [],
                "content_types_used": [],
                "total_sources": 0,
                "primary_sources_count": 0,
                "contextual_sources_count": 0,
                "has_images": False,
                "has_tables": False,
                "image_data": [],
                "table_data": [],
                "image_paths": [],
                "table_paths": [],
                "year_comparison_mode": False,
                "context_expansion_enabled": self.CONTEXT_EXPANSION_ENABLED,
                "processing_time": datetime.now().isoformat()
            }

        if VERBOSE_LOGGING:
            print(f"🔍 Processing query: {question}")

        query_context = self.parse_query_context(question)
        if content_filter:
            query_context["preferred_content_types"] = content_filter

        if LOG_RETRIEVAL_DETAILS:
            print(f"📅 Years: {query_context['years']}")
            print(f"🎯 Content types: {query_context['preferred_content_types']}")
            print(f"🔍 Content filter: {content_filter}")

        results = self.retrieve_multimodal_context_enhanced(query_context, k)
        context = self.format_enhanced_context(results)

        try:
            response = self.generate_response(question, context)
        except Exception as e:
            print(f"❌ Error generating answer: {e}")
            response = FALLBACK_RESPONSE

        image_data = []
        table_data = []

        for result in results:
            metadata = result["metadata"]
            content_type = metadata.get("content_type", "")

            # Filter: only extract media from primary sources
            is_primary = result.get("is_primary_result", True)
            if not is_primary:
                continue  # Skip contextual sources

            # 🖼️ Extract image information (primary sources only)
            if content_type == "image":
                original_image_path = metadata.get("image_path", "")
                if original_image_path:
                    # Path fixing logic (same as before)
                    fixed_path = original_image_path
                    if fixed_path.startswith("./src/"):
                        fixed_path = fixed_path.replace("./src/", "./")
                    elif fixed_path.startswith("src/"):
                        fixed_path = fixed_path.replace("src/", "./")

                    if os.path.exists(fixed_path):
                        image_path = fixed_path
                    elif os.path.exists(original_image_path):
                        image_path = original_image_path
                    else:
                        alternatives = [
                            original_image_path.lstrip('./'),
                            f"../{original_image_path.lstrip('./')}",
                            original_image_path.replace("./src/", "../")
                        ]
                        image_path = None
                        for alt in alternatives:
                            if os.path.exists(alt):
                                image_path = alt
                                break
                        if not image_path:
                            image_path = original_image_path

                    if VERBOSE_LOGGING:
                        print("🖼️ PRIMARY image path resolution:")
                        print(f"   Original: {original_image_path}")
                        print(f"   Fixed: {image_path}")
                        print(f"   Exists: {os.path.exists(image_path)}")

                    image_info = {
                        "path": image_path,
                        "original_path": original_image_path,
                        "title": metadata.get("title", "Gambar"),
                        "caption": metadata.get("caption", result['content'][:100] + "..."),
                        "page": metadata.get("page", "N/A"),
                        "year": metadata.get("year", "N/A"),
                        "description": result['content'][:200] + "..." if len(result['content']) > 200 else result['content'],
                        "score": result.get("score", 0.0),
                        "is_primary": True  # Everything reaching this branch is primary
                    }
                    image_data.append(image_info)

                    if VERBOSE_LOGGING:
                        print(f"🖼️ Added PRIMARY image: {image_path}")
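            # Resolution order tried above (the candidate prefixes depend on
            # where the app is launched from, so they are best-effort): the
            # "./src/"-stripped path first, then the stored path, then a small
            # set of alternative prefixes, finally falling back to the stored
            # string so the UI can still report a missing file.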
            # 📊 Extract table information (primary sources only)
            elif content_type == "table":
                table_path = metadata.get("table_path", "")
                if table_path and os.path.exists(table_path):
                    try:
                        table_info = {
                            "path": table_path,
                            "title": metadata.get("title", "Tabel"),
                            "page": metadata.get("page", "N/A"),
                            "year": metadata.get("year", "N/A"),
                            "rows": metadata.get("rows", 0),
                            "cols": metadata.get("cols", 0),
                            "description": result['content'][:200] + "..." if len(result['content']) > 200 else result['content'],
                            "score": result.get("score", 0.0),
                            "is_primary": True  # Everything reaching this branch is primary
                        }

                        # Load the actual table data
                        if table_path.endswith('.csv'):
                            df = pd.read_csv(table_path)
                            table_info["data"] = df
                            table_info["data_type"] = "dataframe"
                        elif table_path.endswith('.json'):
                            with open(table_path, 'r', encoding='utf-8') as f:
                                json_data = json.load(f)
                            table_info["data"] = json_data
                            table_info["data_type"] = "json"

                        table_data.append(table_info)

                        if VERBOSE_LOGGING:
                            print(f"📊 Found PRIMARY table: {table_path}")
                    except Exception as e:
                        print(f"❌ Error loading table {table_path}: {e}")

        primary_results = [r for r in results if r.get("is_primary_result", True)]
        contextual_results = [r for r in results if not r.get("is_primary_result", True)]

        response_data = {
            "question": question,
            "answer": response.strip(),
            "context": context,
            "sources": results,
            "primary_sources": primary_results,
            "contextual_sources": contextual_results,
            "years_searched": query_context["years"],
            "content_types_used": query_context["preferred_content_types"],
            "total_sources": len(results),
            "primary_sources_count": len(primary_results),
            "contextual_sources_count": len(contextual_results),
            "has_images": len(image_data) > 0,
            "has_tables": len(table_data) > 0,
            "image_data": image_data,  # Full image metadata with path, title, etc.
            "table_data": table_data,  # Loaded table data as DataFrame/JSON
            "image_paths": [img["path"] for img in image_data],
            "table_paths": [tbl["path"] for tbl in table_data],
            "year_comparison_mode": query_context["year_comparison_mode"],
            "context_expansion_enabled": self.CONTEXT_EXPANSION_ENABLED,
            "processing_time": datetime.now().isoformat()
        }

        if VERBOSE_LOGGING:
            print("✅ Query processed successfully")
            print(f"🎯 Primary sources: {len(primary_results)}")
            print(f"🔗 Contextual sources: {len(contextual_results)}")
            print(f"🖼️ Images found: {len(image_data)}")
            print(f"📊 Tables found: {len(table_data)}")

        return response_data

    def get_context_chain(self, result_id: str, max_depth: int = 3) -> List[Dict]:
        """Get a chain of contextually related chunks starting from a specific result"""
        try:
            # This relies on the vectorstore to surface chunks with similar
            # metadata; the implementation depends on the vectorstore structure.
            chain = []
            current_id = result_id

            for _ in range(max_depth):
                # Find chunks with metadata similar to the current chunk
                similar_chunks = self.vectorizer.find_similar_by_metadata(current_id)
                if not similar_chunks:
                    break

                # Add the most similar chunk to the chain
                best_match = similar_chunks[0]
                chain.append(best_match)
                current_id = best_match["metadata"]["id"]

            return chain
        except Exception as e:
            print(f"❌ Error building context chain: {e}")
            return []
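    # Note: get_context_chain assumes the vectorizer exposes a
    # find_similar_by_metadata(chunk_id) method returning matches sorted
    # best-first. A typical call might look like
    #   chain = rag.get_context_chain("silabus_2024_0012", max_depth=2)
    # where the chunk id shown is hypothetical.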
    def get_full_document_context(self, metadata: Dict, year: int) -> str:
        """Get comprehensive context from the entire document/source"""
        try:
            # Build a document identifier
            doc_identifiers = []
            if metadata.get('program'):
                doc_identifiers.append(metadata['program'])
            if metadata.get('year'):
                doc_identifiers.append(str(metadata['year']))
            if metadata.get('chapter'):
                doc_identifiers.append(metadata['chapter'])

            # Search for all chunks from the same document
            doc_query = " ".join(doc_identifiers)

            # Get broader context
            doc_chunks = self.vectorizer.query_multimodal(
                query_text=doc_query,
                year=year,
                content_types=None,
                n_results=50  # Get many chunks from the same document
            )

            # Keep only chunks that are actually from the same document
            same_doc_chunks = []
            for chunk in doc_chunks:
                chunk_meta = chunk["metadata"]
                similarity_score = self.get_metadata_similarity_score(metadata, chunk_meta)
                if similarity_score > 0.5:  # Adjust threshold as needed
                    same_doc_chunks.append(chunk)

            # Sort by page number, then by score
            same_doc_chunks.sort(key=lambda x: (
                x["metadata"].get("page", 999),
                x.get("score", 0)
            ))

            # Combine content with clear separators
            full_context = ""
            for chunk in same_doc_chunks[:10]:  # Limit to avoid token overflow
                page = chunk["metadata"].get("page", "N/A")
                content_type = chunk["metadata"].get("content_type", "unknown")
                full_context += f"\n--- {content_type.upper()} (Page {page}) ---\n"
                full_context += chunk["content"][:500] + "...\n"

            return full_context
        except Exception as e:
            print(f"❌ Error getting full document context: {e}")
            return ""

    def advanced_context_retrieval(self, query_context: Dict[str, Any], k: int = 10) -> List[Dict]:
        """Advanced retrieval that considers document structure and relationships"""
        # Step 1: Get initial high-quality results
        initial_results = self.retrieve_multimodal_context_enhanced(query_context, k // 2)

        # Step 2: For each high-quality result, get its document context
        enhanced_results = []
        seen_ids = set()

        for result in initial_results:
            result_id = result["metadata"].get("id", "")
            if result_id in seen_ids:
                continue

            seen_ids.add(result_id)
            result["context_level"] = "primary"
            enhanced_results.append(result)

            # Get document-level context
            year = result.get("search_year", result["metadata"].get("year"))
            if year:
                doc_context = self.get_full_document_context(result["metadata"], year)
                if doc_context:
                    # Create a synthetic result with the full document context
                    doc_result = {
                        "content": doc_context,
                        "metadata": {
                            **result["metadata"],
                            "content_type": "document_context",
                            "id": f"{result_id}_doc_context"
                        },
                        "score": result.get("score", 0) * 0.8,  # Slightly lower score
                        "context_level": "document",
                        "parent_id": result_id
                    }
                    enhanced_results.append(doc_result)

        # Step 3: Fill remaining slots with diverse content
        remaining_k = k - len(enhanced_results)
        if remaining_k > 0:
            additional_results = self.vectorizer.query_multimodal(
                query_text=query_context["cleaned_query"],
                year=query_context["years"][0] if query_context["years"] else 2024,
                content_types=None,
                n_results=remaining_k * 2
            )
            for add_result in additional_results:
                add_id = add_result["metadata"].get("id", "")
                if add_id not in seen_ids and len(enhanced_results) < k:
                    add_result["context_level"] = "supplementary"
                    enhanced_results.append(add_result)
                    seen_ids.add(add_id)

        return enhanced_results[:k]
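
# Minimal usage sketch (illustrative, not part of the module's original
# surface). It assumes OPENAI_API_KEY is set in the environment (or .env)
# and that the vectorstore has already been populated by
# LangChainMultimodalVectorizer; the sample question is hypothetical.
if __name__ == "__main__":
    rag = EnhancedMultimodalRAGSystem()
    result = rag.query("Apa saja mata kuliah semester 3 pada kurikulum 2021?", k=8)
    print(result["answer"])
    print(f"🎯 Primary sources: {result['primary_sources_count']}, "
          f"🔗 contextual: {result['contextual_sources_count']}")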