# final_project2/src/rag_system.py - uploaded by dnj0, commit a6680e7 ("Simplify"), 21.8 kB
"""
Enhanced RAG System - Visual Image Analysis
Sends base64 images directly to GPT-4o for visual analysis (not just OCR)
Then stores results in vector store
"""
import base64
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

from config import (
    OPENAI_API_KEY, OPENAI_MODEL, TEMPERATURE, MAX_TOKENS,
    LANGUAGE, CHROMA_DB_PATH
)
class VisualMultimodalRAG:
    """
    Ingestion side of the RAG pipeline.

    For one document it:
    1. Sends every image to the chat model as a base64 data URL and collects
       a visual analysis for each.
    2. Splits the text into overlapping chunks and summarises each chunk.
    3. Summarises each table.
    4. Joins each of the three groups into a single text and hands it to
       ``vector_store.add_documents``.

    Attributes:
        debug: When True, verbose debug output is printed.
        llm: ``ChatOpenAI`` client used for every model call.
        language: Language requested in every prompt (``LANGUAGE`` from config).
        visual_summaries_log: One result dict per processed document.
    """

    # Model used for every call made by this class; it must accept image input.
    VISION_MODEL = "gpt-4o-mini"

    # File extension -> MIME type used in the data URL sent to the model.
    MEDIA_TYPES = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
    }
    # Used when the extension is not listed in MEDIA_TYPES.
    DEFAULT_MEDIA_TYPE = 'image/png'

    # Upper bound for the overlap between consecutive text chunks.
    MAX_CHUNK_OVERLAP = 300

    def __init__(self, api_key: Optional[str] = None, debug: bool = True):
        """
        Create the chat-model client.

        Args:
            api_key: OpenAI API key; falls back to ``OPENAI_API_KEY`` from config.
            debug: Enable verbose debug printing.
        """
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        self.llm = ChatOpenAI(
            model_name=self.VISION_MODEL,
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        self.language = LANGUAGE
        self.visual_summaries_log = []
        if self.debug:
            print(f"✅ VisualMultimodalRAG initialized with {self.VISION_MODEL} (vision model)")

    def _debug_print(self, label: str, data: Any):
        """Print ``data`` under ``label`` when debugging is enabled.

        Lists and dicts are shown as their type name plus the first 300
        characters of their string form; anything else is printed as-is.
        """
        if not self.debug:
            return
        print(f"\n🔍 DEBUG [{label}]:")
        if isinstance(data, (list, dict)):
            print(f" Type: {type(data).__name__}")
            print(f" Content: {str(data)[:300]}...")
        else:
            print(f" {data}")

    def _image_to_base64(self, image_path: str) -> Optional[str]:
        """Return the file's bytes as a base64 string, or None if it cannot be read."""
        try:
            with open(image_path, 'rb') as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        except (OSError, TypeError, ValueError) as e:
            # OSError: missing/unreadable file; TypeError/ValueError: invalid path value.
            print(f"Error converting image to base64: {e}")
            return None

    def _build_image_prompt(self) -> str:
        """Return the text instruction that accompanies an image sent to the model."""
        return (
            f"Analyze this image in detail in {self.language}.\n"
            "Provide a comprehensive visual analysis including:\n"
            "1. **What you see** - Main objects, elements, structure\n"
            "2. **Data/Content** - Any numbers, text, charts, graphs\n"
            "3. **Purpose** - What this image is showing or representing\n"
            "4. **Key insights** - Important patterns, trends, or information\n"
            "5. **Connections** - How this relates to document content\n"
            "Be specific and detailed. Focus on visual information that cannot be extracted from text alone.\n"
            "Analysis:"
        )

    def analyze_image_visually(self, image_path: str, image_idx: int) -> str:
        """
        Send one image to the model as a base64 data URL and return its analysis.

        Args:
            image_path: Path of the image file on disk.
            image_idx: Index used only to label log output and error strings.

        Returns:
            The model's analysis text. On any failure a bracketed placeholder
            string (``"[Image N: ...]"``) is returned instead of raising.
        """
        if not os.path.exists(image_path):
            return f"[Image {image_idx}: File not found - {image_path}]"
        try:
            image_base64 = self._image_to_base64(image_path)
            if not image_base64:
                return f"[Image {image_idx}: Could not convert to base64]"

            file_ext = Path(image_path).suffix.lower()
            media_type = self.MEDIA_TYPES.get(file_ext, self.DEFAULT_MEDIA_TYPE)
            print(f"🔍 Analyzing image {image_idx} visually (as {media_type})...")

            # Multimodal message: the image as a data URL followed by the instruction.
            message = HumanMessage(
                content=[
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{media_type};base64,{image_base64}",
                        },
                    },
                    {
                        "type": "text",
                        "text": self._build_image_prompt(),
                    },
                ],
            )
            response = self.llm.invoke([message])
            analysis = response.content.strip()
            self._debug_print(f"Image {image_idx} Visual Analysis", analysis)
            print(f"✅ Image {image_idx} analyzed successfully")
            return analysis
        except Exception as e:
            # Best effort: one failing image must not abort the whole document.
            print(f"❌ Error analyzing image {image_idx}: {e}")
            return f"[Image {image_idx}: Vision analysis failed - {str(e)}]"

    def analyze_images_visually(self, images: List[Dict]) -> List[Dict]:
        """
        Run ``analyze_image_visually`` on every image dict that has a ``'path'``.

        Args:
            images: Dicts with a ``'path'`` key and optionally ``'ocr_text'``.

        Returns:
            One dict per analysed image with keys ``type``, ``image_index``
            (position in ``images``), ``image_path``, ``visual_analysis`` and
            ``ocr_text``. Entries without a path are skipped.
        """
        visual_analyses = []
        for idx, image in enumerate(images or []):
            image_path = image.get('path', '')
            if not image_path:
                print(f"⚠️ Image {idx}: No path provided")
                continue
            visual_analyses.append({
                'type': 'image_visual',
                'image_index': idx,
                'image_path': image_path,
                'visual_analysis': self.analyze_image_visually(image_path, idx),
                'ocr_text': image.get('ocr_text', ''),  # kept alongside the visual analysis
            })
        return visual_analyses

    def summarize_text_chunks(self, text: str, chunk_size: int = 1500) -> List[Dict]:
        """
        Split ``text`` into overlapping chunks and summarise each with the model.

        Chunks shorter than 50 characters (after stripping) are skipped, and a
        chunk whose summarisation fails is logged and skipped.

        Args:
            text: Full document text.
            chunk_size: Maximum characters per chunk (must be positive).

        Returns:
            One dict per summarised chunk with keys ``type``, ``chunk_index``,
            ``original_text`` (first 500 characters), ``summary`` and
            ``chunk_length``.
        """
        if not text:
            return []
        # 300 characters at the default size; scaled down for small chunk sizes
        # so the overlap always stays below chunk_size.
        overlap = min(self.MAX_CHUNK_OVERLAP, chunk_size // 5)
        text_chunks = self._chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        self._debug_print("Text Chunking", f"Created {len(text_chunks)} chunks")

        chunks = []
        for chunk in text_chunks:
            if len(chunk.strip()) < 50:
                continue
            prompt = (
                f"Summarize this text chunk in {self.language}.\n"
                "Keep it concise. Extract key points, facts, and main ideas.\n"
                "Text Chunk:\n"
                f"{chunk}\n"
                "Summary (2-3 sentences maximum):"
            )
            try:
                response = self.llm.invoke([HumanMessage(content=prompt)])
                summary = response.content.strip()
            except Exception as e:
                print(f"Error summarizing text chunk: {e}")
                continue
            chunks.append({
                'type': 'text_chunk',
                'chunk_index': len(chunks),
                'original_text': chunk[:500],
                'summary': summary,
                'chunk_length': len(chunk),
            })
            self._debug_print(f"Text Chunk {len(chunks)-1} Summary", summary)
        return chunks

    def summarize_tables(self, tables: List[Dict]) -> List[Dict]:
        """
        Summarise each table with the model.

        Tables whose ``'content'`` is missing or shorter than 10 characters
        (after stripping) are skipped, as are tables whose summarisation fails.

        Args:
            tables: Dicts with a ``'content'`` string.

        Returns:
            One dict per summarised table with keys ``type``, ``table_index``
            (position in ``tables``), ``original_content`` (first 500
            characters), ``summary`` and ``table_length``.
        """
        summaries = []
        for idx, table in enumerate(tables or []):
            table_content = table.get('content', '')
            if not table_content or len(table_content.strip()) < 10:
                continue
            prompt = (
                f"Analyze and summarize this table/structured data in {self.language}.\n"
                "Extract key insights, row/column meanings, and important figures.\n"
                "Table Content:\n"
                f"{table_content}\n"
                "Summary (2-3 sentences maximum):"
            )
            try:
                response = self.llm.invoke([HumanMessage(content=prompt)])
                summary = response.content.strip()
            except Exception as e:
                print(f"Error summarizing table {idx}: {e}")
                continue
            summaries.append({
                'type': 'table',
                'table_index': idx,
                'original_content': table_content[:500],
                'summary': summary,
                'table_length': len(table_content),
            })
            self._debug_print(f"Table {idx} Summary", summary)
        return summaries

    def _store_group(
        self,
        vector_store,
        store_id: str,
        entries: List[str],
        stored_label: str,
        error_label: str
    ) -> int:
        """Store ``entries`` joined by ' | ' as one document; return how many were stored.

        Returns 0 when there is nothing to store or when the store call fails.
        """
        if not entries:
            return 0
        payload = {
            'text': ' | '.join(entries),
            'images': [],
            'tables': [],
        }
        try:
            vector_store.add_documents(payload, store_id)
        except Exception as e:
            # Best effort: a failing group must not prevent the others from being stored.
            print(f"❌ Error storing {error_label}: {e}")
            return 0
        print(f"✅ Stored {len(entries)} {stored_label}")
        return len(entries)

    def process_and_store_document(
        self,
        text: str,
        images: List[Dict],
        tables: List[Dict],
        vector_store,
        doc_id: str
    ) -> Dict:
        """
        Analyse a document's images, text and tables and store the results.

        Each group is stored with its own id derived from ``doc_id``:
        ``{doc_id}_images_visual``, ``{doc_id}_text_chunks`` and
        ``{doc_id}_tables``.

        Args:
            text: Full document text.
            images: Image dicts (see ``analyze_images_visually``).
            tables: Table dicts (see ``summarize_tables``).
            vector_store: Object exposing ``add_documents(payload, store_id)``.
            doc_id: Identifier of the document.

        Returns:
            Dict with ``doc_id``, ``image_visual_analyses``, ``text_summaries``,
            ``table_summaries`` and ``total_stored`` (items whose group was
            stored successfully). The dict is also appended to
            ``visual_summaries_log``.
        """
        text = text or ""
        images = images or []
        tables = tables or []
        rule = '=' * 70
        thin_rule = '─' * 70

        print(f"\n{rule}")
        print(f"PROCESSING WITH VISUAL IMAGE ANALYSIS: {doc_id}")
        print(f"{rule}")
        results = {
            'doc_id': doc_id,
            'image_visual_analyses': [],
            'text_summaries': [],
            'table_summaries': [],
            'total_stored': 0,
        }

        # 1. Images
        print(f"\n🖼️ VISUAL IMAGE ANALYSIS ({self.VISION_MODEL} vision) ({len(images)} total)")
        print(f"{thin_rule}")
        image_analyses = self.analyze_images_visually(images)
        results['image_visual_analyses'] = image_analyses
        for analysis in image_analyses:
            print(f" ✅ Image {analysis['image_index']} (visual analysis)")
            print(f" Path: {analysis['image_path']}")
            print(f" Analysis: {analysis['visual_analysis'][:100]}...")
        results['total_stored'] += self._store_group(
            vector_store,
            f"{doc_id}_images_visual",
            [f"Image {a['image_index']}: {a['visual_analysis']}" for a in image_analyses],
            "image visual analyses",
            "image analyses",
        )

        # 2. Text chunks
        print(f"\n📝 TEXT CHUNK SUMMARIZATION")
        print(f"{thin_rule}")
        text_summaries = self.summarize_text_chunks(text)
        results['text_summaries'] = text_summaries
        for summary in text_summaries:
            print(f" ✅ Chunk {summary['chunk_index']}: {summary['summary'][:50]}...")
        results['total_stored'] += self._store_group(
            vector_store,
            f"{doc_id}_text_chunks",
            [f"Chunk {s['chunk_index']}: {s['summary']}" for s in text_summaries],
            "text chunk summaries",
            "text summaries",
        )

        # 3. Tables
        print(f"\n📋 TABLE SUMMARIZATION ({len(tables)} total)")
        print(f"{thin_rule}")
        table_summaries = self.summarize_tables(tables)
        results['table_summaries'] = table_summaries
        for summary in table_summaries:
            print(f" ✅ Table {summary['table_index']}: {summary['summary'][:50]}...")
        results['total_stored'] += self._store_group(
            vector_store,
            f"{doc_id}_tables",
            [f"Table {s['table_index']}: {s['summary']}" for s in table_summaries],
            "table summaries",
            "table summaries",
        )

        # 4. Summary statistics
        print(f"\n{rule}")
        print(f"📊 STORAGE SUMMARY")
        print(f"{rule}")
        print(f" Images analyzed visually & stored: {len(image_analyses)}")
        print(f" Text chunks summarized & stored: {len(text_summaries)}")
        print(f" Tables summarized & stored: {len(table_summaries)}")
        print(f" Total items stored in vector: {results['total_stored']}")
        print(f"{rule}")
        self.visual_summaries_log.append(results)
        return results

    def _chunk_text(self, text: str, chunk_size: int = 1500, overlap: int = 300) -> List[str]:
        """
        Split ``text`` into chunks of at most ``chunk_size`` characters, each
        starting ``chunk_size - overlap`` characters after the previous one.

        Splitting stops as soon as a chunk reaches the end of the text, so no
        trailing chunk consisting only of already-covered overlap is produced.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
                in ``[0, chunk_size)`` (the start position would never advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

        chunks = []
        step = chunk_size - overlap
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            if end >= len(text):
                break  # this chunk already reaches the end of the text
            start += step
        return chunks

    def get_visual_summaries_log(self) -> List[Dict]:
        """Return the list of result dicts recorded by ``process_and_store_document``."""
        return self.visual_summaries_log
class AnsweringRAG:
    """
    Query side of the RAG pipeline.

    Given a question and the results of a vector-store search, it builds a
    context from the results, asks the chat model to answer from that
    context, and attaches a heuristic confidence label.

    Attributes:
        debug: When True, verbose debug output is printed.
        llm: ``ChatOpenAI`` client used to generate answers.
        language: Language requested in the prompt (``LANGUAGE`` from config).
        answer_log: Every result dict returned by ``analyze_and_answer``.
    """

    # Model used to generate answers.
    ANSWER_MODEL = "gpt-4o-mini"

    def __init__(self, api_key: Optional[str] = None, debug: bool = True):
        """
        Create the chat-model client.

        Args:
            api_key: OpenAI API key; falls back to ``OPENAI_API_KEY`` from config.
            debug: Enable verbose debug printing.
        """
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        self.llm = ChatOpenAI(
            model_name=self.ANSWER_MODEL,
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        self.language = LANGUAGE
        self.answer_log = []
        if self.debug:
            print("✅ AnsweringRAG initialized with answer generation")

    def _debug_print(self, label: str, data: Any):
        """Print ``data`` under ``label`` when debugging is enabled.

        Lists and dicts are shown as their type name plus the first 300
        characters of their string form; anything else is printed as-is.
        """
        if not self.debug:
            return
        print(f"\n🔍 DEBUG [{label}]:")
        if isinstance(data, (list, dict)):
            print(f" Type: {type(data).__name__}")
            print(f" Content: {str(data)[:300]}...")
        else:
            print(f" {data}")

    @staticmethod
    def _relevance_from_distance(distance) -> float:
        """Convert a search distance to a relevance score in [0, 1].

        A missing distance (None) yields 0.0. A distance of 0 is a perfect
        match and yields 1.0. Values outside [0, 1] are clamped.
        NOTE(review): assumes smaller distance means a closer match and that
        ``1 - distance`` is a meaningful score for the store's metric -
        confirm against the vector store in use.
        """
        if distance is None:
            return 0.0
        return max(0.0, min(1.0, 1 - distance))

    def _build_context(self, search_results: List[Dict]) -> str:
        """Render the search results as numbered source sections for the prompt."""
        context_parts = []
        for idx, result in enumerate(search_results, 1):
            content = result.get('content', '')
            content_type = str(result.get('type', 'unknown'))
            relevance = self._relevance_from_distance(result.get('distance'))
            context_parts.append(
                f"\n[Source {idx} - {content_type.upper()} (relevance: {relevance:.1%})]\n"
                f"{content}"
            )
        return "\n".join(context_parts)

    def _build_prompt(self, question: str, full_context: str) -> str:
        """Return the instruction prompt that asks the model to answer from the context."""
        return (
            "You are a helpful assistant analyzing document content to answer user questions.\n"
            "USER QUESTION:\n"
            f"\"{question}\"\n"
            "RELEVANT CONTENT FROM DOCUMENT:\n"
            f"{full_context}\n"
            "INSTRUCTIONS:\n"
            "1. Analyze the provided content carefully\n"
            "2. Extract information relevant to the question\n"
            f"3. Synthesize a clear, comprehensive answer in {self.language}\n"
            "4. If the content doesn't fully answer the question, explain what information is available\n"
            "5. Be specific and cite the content when relevant\n"
            "6. Structure your answer clearly with key points\n"
            "ANSWER:"
        )

    def analyze_and_answer(
        self,
        question: str,
        search_results: List[Dict]
    ) -> Dict:
        """
        Generate an answer to ``question`` from ``search_results``.

        Args:
            question: The user's question.
            search_results: Dicts with ``content`` and optionally ``type`` and
                ``distance``. ``None`` is treated as an empty list.

        Returns:
            Dict with keys ``question``, ``answer``, ``sources_used``,
            ``confidence`` ('low' / 'medium' / 'high') and ``search_results``.
            When the model call fails, an ``error`` key holds the exception
            text and ``confidence`` is 'low'. Every result is also appended
            to ``answer_log``.
        """
        search_results = search_results or []
        rule = '=' * 70
        print(f"\n{rule}")
        print(f"ANALYZING QUESTION & GENERATING ANSWER")
        print(f"{rule}")
        print(f"\n❓ Question: {question}")
        print(f"📊 Search Results Found: {len(search_results)}")

        if not search_results:
            # Nothing to ground an answer on: reply with guidance, skip the model call.
            print(f"⚠️ No search results found!")
            answer = (
                f"I could not find relevant information in the document to answer your question: \"{question}\"\n"
                "Try:\n"
                "- Using different keywords\n"
                "- Breaking the question into smaller parts\n"
                "- Asking about other topics in the document"
            )
            result = {
                'question': question,
                'answer': answer,
                'sources_used': 0,
                'confidence': 'low',
                'search_results': [],
            }
            self.answer_log.append(result)
            return result

        full_context = self._build_context(search_results)
        self._debug_print(
            "Context Prepared",
            f"{len(search_results)} sources, {len(full_context)} chars",
        )
        analysis_prompt = self._build_prompt(question, full_context)

        print(f"\n🔍 Analyzing search results...")
        print(f" Context size: {len(full_context)} characters")
        print(f" Sources: {len(search_results)}")
        try:
            message = HumanMessage(content=analysis_prompt)
            response = self.llm.invoke([message])
            answer = response.content.strip()
            confidence = self._estimate_confidence(len(search_results), answer)
            print(f"✅ Answer generated successfully")
            print(f" Confidence: {confidence}")
            print(f" Answer length: {len(answer)} characters")
            result = {
                'question': question,
                'answer': answer,
                'sources_used': len(search_results),
                'confidence': confidence,
                'search_results': search_results,
            }
        except Exception as e:
            # Best effort: report the failure in the result instead of raising.
            print(f"❌ Error generating answer: {e}")
            result = {
                'question': question,
                'answer': "I encountered an error while analyzing the search results. Please try again.",
                'sources_used': len(search_results),
                'confidence': 'low',
                'error': str(e),
                'search_results': search_results,
            }
        self.answer_log.append(result)
        return result

    def _estimate_confidence(self, sources_count: int, answer: str) -> str:
        """
        Heuristic confidence label from source count and answer length.

        Returns:
            'high' for at least 3 sources and more than 500 characters,
            'medium' for at least 2 sources and more than 200 characters,
            otherwise 'low'.
        """
        answer_length = len(answer)
        if sources_count >= 3 and answer_length > 500:
            return "high"
        if sources_count >= 2 and answer_length > 200:
            return "medium"
        return "low"

    def get_answer_with_sources(
        self,
        question: str,
        search_results: List[Dict]
    ) -> Dict:
        """
        Run ``analyze_and_answer`` and add display-ready source entries.

        Returns:
            The ``analyze_and_answer`` result with an extra
            ``formatted_sources`` list; each entry has ``index`` (1-based),
            ``type``, ``content`` and ``relevance`` (0..1).
        """
        result = self.analyze_and_answer(question, search_results)
        result['formatted_sources'] = [
            {
                'index': idx,
                'type': source.get('type', 'unknown'),
                'content': source.get('content', ''),
                'relevance': self._relevance_from_distance(source.get('distance')),
            }
            for idx, source in enumerate(result['search_results'], 1)
        ]
        return result

    def get_answer_log(self) -> List[Dict]:
        """Return every result dict recorded by ``analyze_and_answer``."""
        return self.answer_log

    def print_answer_with_sources(self, result: Dict, max_source_length: int = 300):
        """
        Pretty-print an answer and, when present, its formatted sources.

        Args:
            result: A dict as returned by ``get_answer_with_sources`` (or
                ``analyze_and_answer``, in which case no sources are printed).
            max_source_length: Maximum characters of each source's content shown.
        """
        rule = '=' * 70
        thin_rule = '-' * 70
        print(f"\n{rule}")
        print(f"ANSWER TO: {result['question']}")
        print(f"{rule}")
        print(f"\n📝 ANSWER (Confidence: {result['confidence'].upper()}):")
        print(f"{thin_rule}")
        print(result['answer'])
        print(f"{thin_rule}")
        formatted_sources = result.get('formatted_sources')
        if formatted_sources:
            print(f"\n📚 SOURCES USED ({len(formatted_sources)} total):")
            for source in formatted_sources:
                print(f"\n[Source {source['index']} - {source['type'].upper()} ({source['relevance']:.0%} relevant)]")
                print(f"{source['content'][:max_source_length]}...")
        print(f"\n{rule}")