# optima/src/RAG.py
# [30.06.25] wicaksono-tmr | ✨ feat (commit 6f320d1)
import os
import json
import pandas as pd
from typing import List, Dict, Any, Optional, Tuple, Set
from datetime import datetime
from dotenv import load_dotenv
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import re
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from collections import defaultdict
from vectorization import LangChainMultimodalVectorizer
from year_parser import YearParser
from config import *
load_dotenv()
class EnhancedMultimodalRAGSystem:
def __init__(self):
"""Initialize enhanced RAG system with multimodal capabilities"""
self.vectorizer = LangChainMultimodalVectorizer()
self.llm = ChatOpenAI(
# openai_api_key=os.getenv("OPENAI_API_KEY"),
# model_name=os.getenv("OPENAI_MODEL", DEFAULT_LLM_MODEL),
temperature=LLM_TEMPERATURE,
max_tokens=MAX_TOKENS,
request_timeout=LLM_TIMEOUT
)
self.year_parser = YearParser()
self.COSINE_SIMILARITY_THRESHOLD = COSINE_SIMILARITY_THRESHOLD
self.MAX_SIMILAR_CONTEXT = MAX_SIMILAR_CONTEXT
self.VALID_YEARS = VALID_YEARS
# New: Context expansion settings
self.CONTEXT_EXPANSION_ENABLED = True
self.MAX_CONTEXT_CHUNKS_PER_SOURCE = 5 # Max additional chunks per source
self.CONTEXT_SIMILARITY_THRESHOLD = 0.7 # Similarity threshold for context expansion
if VERBOSE_LOGGING:
print(f"πŸš€ Enhanced Multimodal RAG System initialized")
print(f" 🧠 LLM Model: {os.getenv('OPENAI_MODEL', DEFAULT_LLM_MODEL)}")
print(f" πŸ“Š Cosine Similarity Threshold: {self.COSINE_SIMILARITY_THRESHOLD}")
print(f" πŸ“… Valid Years: {self.VALID_YEARS}")
print(f" πŸ”— Context Expansion: {self.CONTEXT_EXPANSION_ENABLED}")
def get_metadata_similarity_score(self, meta1: Dict, meta2: Dict) -> float:
"""Calculate similarity score between two metadata objects"""
similarity_score = 0.0
total_weight = 0.0
# Define weights for different metadata fields
field_weights = {
'year': 0.3,
'page': 0.2,
'program': 0.25,
'semester': 0.15,
'chapter': 0.2,
'section': 0.15,
'subsection': 0.1,
'content_type': 0.2,
'course_code': 0.15,
'mata_kuliah': 0.15
}
for field, weight in field_weights.items():
if field in meta1 and field in meta2:
total_weight += weight
if field in ['year', 'page', 'semester']:
if meta1[field] == meta2[field]:
similarity_score += weight
elif field == 'page':
try:
page1, page2 = int(meta1[field]), int(meta2[field])
page_diff = abs(page1 - page2)
if page_diff == 0:
similarity_score += weight
elif page_diff <= 2:
similarity_score += weight * 0.5
except:
pass
else:
str1, str2 = str(meta1[field]).lower(), str(meta2[field]).lower()
if str1 == str2:
similarity_score += weight
elif str1 in str2 or str2 in str1:
similarity_score += weight * 0.7
return similarity_score / total_weight if total_weight > 0 else 0.0
def find_contextual_chunks(self, base_result: Dict, all_results: List[Dict]) -> List[Dict]:
base_metadata = base_result["metadata"]
contextual_chunks = []
for result in all_results:
if result["metadata"].get("id") == base_metadata.get("id"):
continue
if result["metadata"].get("year") != base_metadata.get("year"):
continue
similarity_score = self.get_metadata_similarity_score(base_metadata, result["metadata"])
if similarity_score >= self.CONTEXT_SIMILARITY_THRESHOLD:
result["context_similarity_score"] = similarity_score
contextual_chunks.append(result)
# Sort by similarity score and limit
contextual_chunks.sort(key=lambda x: x["context_similarity_score"], reverse=True)
return contextual_chunks[:self.MAX_CONTEXT_CHUNKS_PER_SOURCE]
def get_document_chunks_by_metadata(self, metadata: Dict, year: int) -> List[Dict]:
"""Get all chunks from the same document/source with similar metadata"""
try:
# Build a more specific query based on metadata
search_filters = []
if metadata.get('program'):
search_filters.append(f"program:{metadata['program']}")
if metadata.get('semester'):
search_filters.append(f"semester:{metadata['semester']}")
if metadata.get('chapter'):
search_filters.append(f"chapter:{metadata['chapter']}")
if metadata.get('section'):
search_filters.append(f"section:{metadata['section']}")
# Create a search query from metadata
search_query = " ".join(search_filters) if search_filters else metadata.get('title', '')
# Get chunks from vectorstore with broader search
results = self.vectorizer.query_multimodal(
query_text=search_query,
year=year,
content_types=None,
n_results=20 # Get more results for context expansion
)
return results
except Exception as e:
print(f"❌ Error getting document chunks: {e}")
return []
def expand_context_for_results(self, initial_results: List[Dict]) -> List[Dict]:
"""Expand context by finding related chunks for each initial result"""
if not self.CONTEXT_EXPANSION_ENABLED:
return initial_results
expanded_results = []
seen_ids = set()
for result in initial_results:
# Add the original result
result_id = result["metadata"].get("id", "")
if result_id not in seen_ids:
result["is_primary_result"] = True
expanded_results.append(result)
seen_ids.add(result_id)
# Find contextual chunks
year = result.get("search_year", result["metadata"].get("year"))
if year:
document_chunks = self.get_document_chunks_by_metadata(
result["metadata"], year
)
contextual_chunks = self.find_contextual_chunks(result, document_chunks)
# Add contextual chunks
for ctx_chunk in contextual_chunks:
ctx_id = ctx_chunk["metadata"].get("id", "")
if ctx_id not in seen_ids:
ctx_chunk["is_primary_result"] = False
ctx_chunk["parent_result_id"] = result_id
expanded_results.append(ctx_chunk)
seen_ids.add(ctx_id)
if VERBOSE_LOGGING:
print(f"πŸ”— Added contextual chunk for {result_id}: {ctx_id}")
if VERBOSE_LOGGING:
primary_count = sum(1 for r in expanded_results if r.get("is_primary_result", False))
context_count = len(expanded_results) - primary_count
print(
f"πŸ“ˆ Context expansion: {primary_count} primary + {context_count} contextual = {len(expanded_results)} total")
return expanded_results
def group_related_content(self, results: List[Dict]) -> Dict[str, List[Dict]]:
"""Group results by their relationships (same document, similar metadata, etc.)"""
groups = defaultdict(list)
for result in results:
metadata = result["metadata"]
# Create grouping key based on metadata
group_key_parts = []
if metadata.get('program'):
group_key_parts.append(f"prog_{metadata['program']}")
if metadata.get('year'):
group_key_parts.append(f"year_{metadata['year']}")
if metadata.get('semester'):
group_key_parts.append(f"sem_{metadata['semester']}")
if metadata.get('chapter'):
group_key_parts.append(f"ch_{metadata['chapter']}")
if metadata.get('content_type'):
group_key_parts.append(f"type_{metadata['content_type']}")
group_key = "_".join(group_key_parts) if group_key_parts else "general"
groups[group_key].append(result)
return dict(groups)
def retrieve_multimodal_context_enhanced(self, query_context: Dict[str, Any], k: int = 10) -> List[Dict]:
"""Enhanced retrieval with context expansion"""
all_results = []
content_strategies = {}
for content_type, ratio in CONTENT_TYPE_STRATEGIES.items():
content_strategies[content_type] = max(1, int(k * ratio))
if LOG_RETRIEVAL_DETAILS:
print(f"🎯 Content strategies: {content_strategies}")
print(f"πŸ“… Searching years: {query_context['years']}")
# Step 1: Get initial results
for year in query_context["years"]:
if year not in self.VALID_YEARS:
print(f"⚠️ Skipping invalid year: {year}")
continue
try:
if query_context.get("preferred_content_types"):
for content_type in query_context["preferred_content_types"]:
results = self.vectorizer.query_multimodal(
query_text=query_context["cleaned_query"],
year=year,
content_types=[content_type],
n_results=content_strategies.get(content_type, k//4)
)
for result in results:
result["search_year"] = year
result["content_priority"] = True
all_results.extend(results)
remaining_k = max(1, k - len(all_results))
general_results = self.vectorizer.query_multimodal(
query_text=query_context["cleaned_query"],
year=year,
content_types=None,
n_results=remaining_k
)
for result in general_results:
result["search_year"] = year
result["content_priority"] = False
all_results.extend(general_results)
except Exception as e:
print(f"❌ Error retrieving from year {year}: {e}")
# Step 2: deduplikasi
unique_results = self._deduplicate_and_rank_results(all_results, k)
# Step 3: Mencari konteks diluar dengana meta
expanded_results = self.expand_context_for_results(unique_results)
# Step 4: Final ranking and limiting
final_results = self._final_ranking_with_context(expanded_results, k * 2) # Allow more results due to context
if VERBOSE_LOGGING:
print(f"πŸ“š Final results with context: {len(final_results)}")
return final_results
def _final_ranking_with_context(self, results: List[Dict], max_results: int) -> List[Dict]:
"""Final ranking that considers both primary results and their context"""
# Separate primary and contextual results
primary_results = [r for r in results if r.get("is_primary_result", True)]
contextual_results = [r for r in results if not r.get("is_primary_result", True)]
# Sort primary results by score
primary_results.sort(key=lambda x: x.get("score", 0), reverse=True)
# For each primary result, add its best contextual chunks
final_results = []
for primary in primary_results:
if len(final_results) >= max_results:
break
final_results.append(primary)
# Add related contextual chunks
primary_id = primary["metadata"].get("id", "")
related_contexts = [
r for r in contextual_results
if r.get("parent_result_id") == primary_id
]
# Sort contextual chunks by their similarity score
related_contexts.sort(key=lambda x: x.get("context_similarity_score", 0), reverse=True)
# Add top contextual chunks
for ctx in related_contexts[:2]: # Limit to 2 contextual chunks per primary
if len(final_results) < max_results:
final_results.append(ctx)
return final_results
def format_enhanced_context_with_grouping(self, results: List[Dict]) -> str:
"""Format context with grouping and relationship indicators"""
if not results:
return "Tidak ada informasi yang relevan ditemukan."
# Group related content
grouped_results = self.group_related_content(results)
context_parts = []
for group_key, group_results in grouped_results.items():
context_parts.append(f"\n{'='*60}")
context_parts.append(f"πŸ“‚ GRUP: {group_key.replace('_', ' ').upper()}")
context_parts.append(f"{'='*60}")
for i, result in enumerate(group_results, 1):
content_type = result["metadata"]["content_type"]
is_primary = result.get("is_primary_result", True)
# Add indicator for primary vs contextual
result_type = "🎯 PRIMARY" if is_primary else "πŸ”— CONTEXT"
# Enhanced formatting based on content type
if content_type == "table":
context_part = self.enhance_table_context_with_markdown(result)
elif content_type == "image":
context_part = self.enhance_image_context_with_details(result)
elif content_type == "silabus":
context_part = self.enhance_silabus_context_detailed(result)
elif content_type == "curriculum":
context_part = self.enhance_curriculum_context_detailed(result)
elif content_type == "text_chunk":
context_part = self.enhance_text_context_detailed(result)
else:
context_part = f"""
**KONTEN {content_type.upper()}:**
- **Tahun:** {result["metadata"].get('year', 'N/A')}
- **Halaman:** {result["metadata"].get('page', 'N/A')}
- **Context:** {result.get('context_text', '')[:200]}...
**Konten:**
{result['content'][:500]}...
"""
header = f"**{result_type} SUMBER {i}:**"
if not is_primary:
similarity_score = result.get("context_similarity_score", 0)
header += f" (Similarity: {similarity_score:.2f})"
context_parts.append(f"{header}\n{context_part}")
return "\n\n".join(context_parts)
def _deduplicate_and_rank_results(self, all_results: List[Dict], k: int) -> List[Dict]:
seen_ids = set()
unique_results = []
sorted_results = sorted(
all_results,
key=lambda x: (x.get("score", 0), not x.get("content_priority", False))
)
content_type_counts = {}
max_per_type = max(1, k // len(CONTENT_TYPE_STRATEGIES))
for result in sorted_results:
result_id = result["metadata"].get("id", "")
content_type = result["metadata"]["content_type"]
# Skip duplicates
if result_id in seen_ids:
continue
# Limit per content type for diversity (unless priority content)
if not result.get("content_priority", False):
if content_type_counts.get(content_type, 0) >= max_per_type:
continue
seen_ids.add(result_id)
content_type_counts[content_type] = content_type_counts.get(content_type, 0) + 1
# Enhance with context_text
if "context_text" not in result:
result["context_text"] = result["metadata"].get("context_text", "")
unique_results.append(result)
if len(unique_results) >= k:
break
return unique_results
def enhance_table_context_with_markdown(self, result: Dict) -> str:
"""Enhanced table context with markdown formatting"""
metadata = result["metadata"]
context_text = result.get("context_text", "")
enhanced_context = f"""
**TABEL ENHANCED:**
- **Judul:** {metadata.get('title', 'N/A')}
- **Ukuran:** {metadata.get('rows', 0)} baris Γ— {metadata.get('cols', 0)} kolom
- **Tahun:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context:** {context_text}
- **Preview:** {result['content'][:300]}...
**Konten Lengkap:**
{result['content']}
"""
return enhanced_context
def enhance_image_context_with_details(self, result: Dict) -> str:
"""Enhanced image context with detailed metadata"""
metadata = result["metadata"]
context_text = result.get("context_text", "")
enhanced_context = f"""
**GAMBAR ENHANCED:**
- **Judul:** {metadata.get('title', 'N/A')}
- **Caption:** {metadata.get('caption', 'N/A')}
- **Tahun:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context:** {context_text}
- **Deskripsi:** {result['content'][:300]}...
**Path Gambar:** {metadata.get('image_path', 'N/A')}
"""
return enhanced_context
def enhance_silabus_context_detailed(self, result: Dict) -> str:
"""Enhanced silabus context with comprehensive details"""
metadata = result["metadata"]
context_text = result.get("context_text", "")
enhanced_context = f"""
**SILABUS ENHANCED:**
- **Mata Kuliah:** {metadata.get('mata_kuliah', 'N/A')} ({metadata.get('course_code', 'N/A')})
- **Program Studi:** {metadata.get('program', 'N/A').title()}
- **Semester:** {metadata.get('semester', 'N/A')}
- **SKS:** {metadata.get('sks', 'N/A')}
- **Tipe Silabus:** {metadata.get('silabus_type', 'N/A')}
- **Tahun Kurikulum:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context Text:** {context_text}
**Konten Lengkap:**
{result['content']}
"""
return enhanced_context
def enhance_curriculum_context_detailed(self, result: Dict) -> str:
"""Enhanced curriculum context with comprehensive details"""
metadata = result["metadata"]
context_text = result.get("context_text", "")
enhanced_context = f"""
**KURIKULUM ENHANCED:**
- **Program Studi:** {metadata.get('program', 'N/A').title()}
- **Semester:** {metadata.get('semester', 'N/A')}
- **Jenis Tabel:** {metadata.get('table_type', 'N/A')}
- **Jumlah Mata Kuliah:** {metadata.get('rows_count', 'N/A')}
- **Tahun Kurikulum:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context Text:** {context_text}
**Konten Lengkap:**
{result['content']}
"""
return enhanced_context
def enhance_text_context_detailed(self, result: Dict) -> str:
"""Enhanced text context with comprehensive details"""
metadata = result["metadata"]
context_text = result.get("context_text", "")
enhanced_context = f"""
**TEKS ENHANCED:**
- **Bab:** {metadata.get('chapter', 'N/A')}
- **Bagian:** {metadata.get('section', 'N/A')}
- **Sub-bagian:** {metadata.get('subsection', 'N/A')}
- **Tahun:** {metadata.get('year', 'N/A')}
- **Halaman:** {metadata.get('page', 'N/A')}
- **Context Text:** {context_text}
**Konten Lengkap:**
{result['content']}
"""
return enhanced_context
def format_enhanced_context(self, results: List[Dict]) -> str:
"""Format context with comprehensive enhancements and grouping"""
return self.format_enhanced_context_with_grouping(results)
def generate_response(self, query: str, context: str, chat_history: List[Dict] = None) -> str:
"""Generate response using LLM with context and chat history"""
# Prepare chat history context
chat_history_text = ""
if chat_history and len(chat_history) > 1:
recent_messages = chat_history[-CONTEXT_WINDOW_SIZE:]
chat_history_text = "\n\nRiwayat Percakapan Terakhir:\n"
for msg in recent_messages[:-1]: # Exclude current message
role = "User" if msg["role"] == "user" else "Assistant"
chat_history_text += f"{role}: {msg['content'][:200]}...\n"
# Enhanced prompt
enhanced_prompt = f"""
Anda adalah asisten akademik DTMI UGM yang membantu mahasiswa dan dosen.
{chat_history_text}
Pertanyaan Saat Ini: {query}
Konteks Informasi:
{context}
Instruksi:
1. Berikan jawaban yang komprehensif dan akurat
2. Gunakan informasi dari konteks yang relevan
3. Jika merujuk ke tahun atau program studi, sebutkan secara spesifik
4. Format jawaban dengan struktur yang jelas (gunakan bullet points, numbering jika perlu)
5. Jika ada tabel atau data, jelaskan dengan detail
6. Akhiri dengan saran atau informasi tambahan yang berguna
7. Pertimbangkan konteks percakapan sebelumnya jika relevan
8. Manfaatkan informasi kontekstual yang tersedia untuk memberikan jawaban yang lebih lengkap
Jawaban:
"""
for attempt in range(MAX_RETRIES):
try:
response = self.llm.predict(enhanced_prompt)
return response
except Exception as e:
if attempt == MAX_RETRIES - 1:
return FALLBACK_RESPONSE
else:
import time
time.sleep(RETRY_DELAY)
return FALLBACK_RESPONSE
def parse_query_context(self, query: str) -> Dict[str, Any]:
"""Parse query context with year extraction and content type detection"""
years, cleaned_query, user_mentioned_year, user_mentioned_invalid_year = self.year_parser.extract_years(query)
comparison_keywords = ["bandingkan", "banding", "perbandingan",
"dibanding", "vs", "versus", "perbedaan"]
year_comparison_mode = any(keyword in cleaned_query.lower()
for keyword in comparison_keywords) and len(years) > 1
content_type_hints = {
"silabus": ["silabus", "mata kuliah", "course", "sks", "pembelajaran", "materi"],
"curriculum": ["kurikulum", "curriculum", "semester", "program studi", "struktur"],
"table": ["tabel", "table", "data", "statistik", "daftar", "distribusi"],
"image": ["gambar", "image", "foto", "diagram", "struktur", "chart"],
"text_chunk": ["informasi", "penjelasan", "deskripsi", "detail", "tentang"]
}
preferred_types = []
query_lower = cleaned_query.lower()
for content_type, keywords in content_type_hints.items():
if any(keyword in query_lower for keyword in keywords):
preferred_types.append(content_type)
return {
"original_query": query,
"cleaned_query": cleaned_query,
"years": years,
"preferred_content_types": preferred_types,
"year_comparison_mode": year_comparison_mode
}
    def query(self, question: str, k: int = 10, content_filter: List[str] = None) -> Dict[str, Any]:
        """Answer a question end-to-end: retrieve multimodal context, generate an
        LLM answer, and collect image/table artifacts from the primary sources.

        Args:
            question: Natural-language question (may mention curriculum years).
            k: Target number of context chunks to retrieve.
            content_filter: Optional explicit list of content types that overrides
                the types inferred from the question text.

        Returns:
            A dict with the answer, formatted context, source lists split into
            primary vs contextual, and image/table payloads (primary sources only).
        """
        # Pre-parse years so a question about years outside the database can be
        # answered immediately without touching the vector store.
        years, cleaned_query, user_mentioned_year, user_mentioned_invalid_year = self.year_parser.extract_years(
            question)
        if user_mentioned_invalid_year and not years:
            # Same response shape as the success path, but empty.
            return {
                "question": question,
                "answer": "Maaf, informasi mengenai kurikulum tahun yang Anda minta tidak tersedia dalam konteks database ini.",
                "context": "",
                "sources": [],
                "primary_sources": [],
                "contextual_sources": [],
                "years_searched": [],
                "content_types_used": [],
                "total_sources": 0,
                "primary_sources_count": 0,
                "contextual_sources_count": 0,
                "has_images": False,
                "has_tables": False,
                "image_data": [],
                "table_data": [],
                "image_paths": [],
                "table_paths": [],
                "year_comparison_mode": False,
                "context_expansion_enabled": self.CONTEXT_EXPANSION_ENABLED,
                "processing_time": datetime.now().isoformat()
            }
        if VERBOSE_LOGGING:
            print(f"πŸ” Processing query: {question}")
        query_context = self.parse_query_context(question)
        if content_filter:
            # An explicit caller-supplied filter wins over inferred content types.
            query_context["preferred_content_types"] = content_filter
        if LOG_RETRIEVAL_DETAILS:
            print(f"πŸ“… Years: {query_context['years']}")
            print(f"🎯 Content types: {query_context['preferred_content_types']}")
            print(f"πŸ” Content filter: {content_filter}")
        results = self.retrieve_multimodal_context_enhanced(query_context, k)
        context = self.format_enhanced_context(results)
        try:
            response = self.generate_response(question, context)
        except Exception as e:
            print(f"❌ Error generating answer: {e}")
            response = FALLBACK_RESPONSE
        image_data = []
        table_data = []
        for result in results:
            metadata = result["metadata"]
            content_type = metadata.get("content_type", "")
            # Only primary sources contribute media payloads.
            is_primary = result.get("is_primary_result", True)
            if not is_primary:
                continue  # skip contextual sources
            # Extract image information (primary sources only).
            if content_type == "image":
                original_image_path = metadata.get("image_path", "")
                if original_image_path:
                    # Stored paths may be relative to a different working
                    # directory; try a few rewrites until one exists on disk.
                    fixed_path = original_image_path
                    if fixed_path.startswith("./src/"):
                        fixed_path = fixed_path.replace("./src/", "./")
                    elif fixed_path.startswith("src/"):
                        fixed_path = fixed_path.replace("src/", "./")
                    if os.path.exists(fixed_path):
                        image_path = fixed_path
                    elif os.path.exists(original_image_path):
                        image_path = original_image_path
                    else:
                        alternatives = [
                            original_image_path.lstrip('./'),
                            f"../{original_image_path.lstrip('./')}",
                            original_image_path.replace("./src/", "../")
                        ]
                        image_path = None
                        for alt in alternatives:
                            if os.path.exists(alt):
                                image_path = alt
                                break
                        if not image_path:
                            # Keep the stored path even if missing on disk.
                            image_path = original_image_path
                    if VERBOSE_LOGGING:
                        print(f"πŸ–ΌοΈ PRIMARY Image path resolution:")
                        print(f" Original: {original_image_path}")
                        print(f" Fixed: {image_path}")
                        print(f" Exists: {os.path.exists(image_path)}")
                    image_info = {
                        "path": image_path,
                        "original_path": original_image_path,
                        "title": metadata.get("title", "Gambar"),
                        "caption": metadata.get("caption", result['content'][:100] + "..."),
                        "page": metadata.get("page", "N/A"),
                        "year": metadata.get("year", "N/A"),
                        "description": result['content'][:200] + "..." if len(result['content']) > 200 else result['content'],
                        "score": result.get("score", 0.0),
                        "is_primary": True  # everything collected here is primary
                    }
                    image_data.append(image_info)
                    if VERBOSE_LOGGING:
                        print(f"πŸ–ΌοΈ Added PRIMARY image: {image_path}")
            # Extract table information (primary sources only).
            elif content_type == "table":
                table_path = metadata.get("table_path", "")
                if table_path and os.path.exists(table_path):
                    try:
                        table_info = {
                            "path": table_path,
                            "title": metadata.get("title", "Tabel"),
                            "page": metadata.get("page", "N/A"),
                            "year": metadata.get("year", "N/A"),
                            "rows": metadata.get("rows", 0),
                            "cols": metadata.get("cols", 0),
                            "description": result['content'][:200] + "..." if len(result['content']) > 200 else result['content'],
                            "score": result.get("score", 0.0),
                            "is_primary": True  # everything collected here is primary
                        }
                        # Load the actual table payload so callers can render it.
                        if table_path.endswith('.csv'):
                            df = pd.read_csv(table_path)
                            table_info["data"] = df
                            table_info["data_type"] = "dataframe"
                        elif table_path.endswith('.json'):
                            with open(table_path, 'r', encoding='utf-8') as f:
                                json_data = json.load(f)
                                table_info["data"] = json_data
                                table_info["data_type"] = "json"
                        table_data.append(table_info)
                        if VERBOSE_LOGGING:
                            print(f"πŸ“Š Found PRIMARY table: {table_path}")
                    except Exception as e:
                        print(f"❌ Error loading table {table_path}: {e}")
        primary_results = [r for r in results if r.get("is_primary_result", True)]
        contextual_results = [r for r in results if not r.get("is_primary_result", True)]
        response_data = {
            "question": question,
            "answer": response.strip(),
            "context": context,
            "sources": results,
            "primary_sources": primary_results,
            "contextual_sources": contextual_results,
            "years_searched": query_context["years"],
            "content_types_used": query_context["preferred_content_types"],
            "total_sources": len(results),
            "primary_sources_count": len(primary_results),
            "contextual_sources_count": len(contextual_results),
            "has_images": len(image_data) > 0,
            "has_tables": len(table_data) > 0,
            "image_data": image_data,  # full image metadata (path, title, ...)
            "table_data": table_data,  # loaded table data (DataFrame / JSON)
            "image_paths": [img["path"] for img in image_data],
            "table_paths": [tbl["path"] for tbl in table_data],
            "year_comparison_mode": query_context["year_comparison_mode"],
            "context_expansion_enabled": self.CONTEXT_EXPANSION_ENABLED,
            "processing_time": datetime.now().isoformat()
        }
        if VERBOSE_LOGGING:
            print(f"βœ… Query processed successfully")
            print(f"🎯 Primary sources: {len(primary_results)}")
            print(f"πŸ”— Contextual sources: {len(contextual_results)}")
            print(f"πŸ–ΌοΈ Images found: {len(image_data)}")
            print(f"πŸ“Š Tables found: {len(table_data)}")
        return response_data
def get_context_chain(self, result_id: str, max_depth: int = 3) -> List[Dict]:
"""Get a chain of contextually related chunks starting from a specific result"""
try:
# This would work with your vectorstore to find chunks with similar metadata
# Implementation depends on your vectorstore structure
chain = []
current_id = result_id
for depth in range(max_depth):
# Find chunks with similar metadata to current chunk
similar_chunks = self.vectorizer.find_similar_by_metadata(current_id)
if not similar_chunks:
break
# Add the most similar chunk to chain
best_match = similar_chunks[0]
chain.append(best_match)
current_id = best_match["metadata"]["id"]
return chain
except Exception as e:
print(f"❌ Error building context chain: {e}")
return []
def get_full_document_context(self, metadata: Dict, year: int) -> str:
"""Get comprehensive context from the entire document/source"""
try:
# Build document identifier
doc_identifiers = []
if metadata.get('program'):
doc_identifiers.append(metadata['program'])
if metadata.get('year'):
doc_identifiers.append(str(metadata['year']))
if metadata.get('chapter'):
doc_identifiers.append(metadata['chapter'])
# Search for all chunks from the same document
doc_query = " ".join(doc_identifiers)
# Get broader context
doc_chunks = self.vectorizer.query_multimodal(
query_text=doc_query,
year=year,
content_types=None,
n_results=50 # Get many chunks from same document
)
# Filter chunks that are actually from the same document
same_doc_chunks = []
for chunk in doc_chunks:
chunk_meta = chunk["metadata"]
similarity_score = self.get_metadata_similarity_score(metadata, chunk_meta)
if similarity_score > 0.5: # Adjust threshold as needed
same_doc_chunks.append(chunk)
# Sort by page number or similarity
same_doc_chunks.sort(key=lambda x: (
x["metadata"].get("page", 999),
x.get("score", 0)
))
# Combine content with clear separators
full_context = ""
for i, chunk in enumerate(same_doc_chunks[:10]): # Limit to avoid token overflow
page = chunk["metadata"].get("page", "N/A")
content_type = chunk["metadata"].get("content_type", "unknown")
full_context += f"\n--- {content_type.upper()} (Page {page}) ---\n"
full_context += chunk["content"][:500] + "...\n"
return full_context
except Exception as e:
print(f"❌ Error getting full document context: {e}")
return ""
def advanced_context_retrieval(self, query_context: Dict[str, Any], k: int = 10) -> List[Dict]:
"""Advanced retrieval that considers document structure and relationships"""
# Step 1: Get initial high-quality results
initial_results = self.retrieve_multimodal_context_enhanced(query_context, k//2)
# Step 2: For each high-quality result, get its document context
enhanced_results = []
seen_ids = set()
for result in initial_results:
result_id = result["metadata"].get("id", "")
if result_id in seen_ids:
continue
seen_ids.add(result_id)
result["context_level"] = "primary"
enhanced_results.append(result)
# Get document-level context
year = result.get("search_year", result["metadata"].get("year"))
if year:
doc_context = self.get_full_document_context(result["metadata"], year)
if doc_context:
# Create a synthetic result with full document context
doc_result = {
"content": doc_context,
"metadata": {
**result["metadata"],
"content_type": "document_context",
"id": f"{result_id}_doc_context"
},
"score": result.get("score", 0) * 0.8, # Slightly lower score
"context_level": "document",
"parent_id": result_id
}
enhanced_results.append(doc_result)
# Step 3: Fill remaining slots with diverse content
remaining_k = k - len(enhanced_results)
if remaining_k > 0:
additional_results = self.vectorizer.query_multimodal(
query_text=query_context["cleaned_query"],
year=query_context["years"][0] if query_context["years"] else 2024,
content_types=None,
n_results=remaining_k * 2
)
for add_result in additional_results:
add_id = add_result["metadata"].get("id", "")
if add_id not in seen_ids and len(enhanced_results) < k:
add_result["context_level"] = "supplementary"
enhanced_results.append(add_result)
seen_ids.add(add_id)
return enhanced_results[:k]