final_project2 / src /rag_system.py
dnj0's picture
Simplify
b802cc4
raw
history blame
19 kB
"""
RAG основной pipeline
"""
from typing import List, Dict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import base64
import os
from pathlib import Path
from config import (
OPENAI_API_KEY, OPENAI_MODEL, TEMPERATURE, MAX_TOKENS,
LANGUAGE, CHROMA_DB_PATH
)
class VisualMultimodalRAG:
"""
RAG - подготовительный этап:
1. Кодирует изображение в base64 и отправляет в gpt-4o-mini
2. Получает описание изображения
3. Сохраняет описание в векторное хранилище
"""
def __init__(self, api_key: str = None, debug: bool = True):
api_key = api_key or OPENAI_API_KEY
self.debug = debug
self.llm = ChatOpenAI(
model_name=OPENAI_MODEL,
api_key=api_key,
temperature=TEMPERATURE,
max_tokens=MAX_TOKENS,
)
self.language = LANGUAGE
self.visual_summaries_log = []
if self.debug:
print(f"VisualMultimodalRAG with {OPENAI_MODEL}")
def _debug_print(self, label: str, data: any):
"""Debug"""
if self.debug:
print(f"\nDEBUG [{label}]:")
if isinstance(data, (list, dict)):
print(f" Type: {type(data).__name__}")
print(f" Content: {str(data)[:300]}...")
else:
print(f" {data}")
def _image_to_base64(self, image_path: str) -> str:
"""Конвертирует изображение в base64"""
try:
with open(image_path, 'rb') as image_file:
image_data = base64.b64encode(image_file.read()).decode('utf-8')
return image_data
except Exception as e:
print(f"Error converting image to base64: {e}")
return None
def analyze_image_visually(self, image_path: str, image_idx: int) -> str:
"""
Отправляет в модель изображение для суммаризации
"""
if not os.path.exists(image_path):
return f"[Image {image_idx}: File not found - {image_path}]"
try:
image_base64 = self._image_to_base64(image_path)
if not image_base64:
return f"[Image {image_idx}: Error converting to base64]"
file_ext = Path(image_path).suffix.lower()
media_type_map = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.webp': 'image/webp'
}
media_type = media_type_map.get(file_ext, 'image/png')
print(f" Analyzing image {image_idx}...")
message = HumanMessage(
content=[
{
"type": "image_url",
"image_url": {
"url": f"data:{media_type};base64,{image_base64}",
},
},
{
"type": "text",
"text": f"""Ты - ассистент по сбору и обобщению информации. Проанализируй изображение.
По результатам анализа предоставь информацию:
1. Что изображено на картинке - основные объекты и элементы
2. Тип данных и содержимое - числа, графики, зависимости.
3. Назначение изображения - для чего оно представлено и что отображает
4. Связь с текстом
Будь краток и содержателен. Фокусируйся на визуальной информации.
Результат:"""
}
],
)
response = self.llm.invoke([message])
analysis = response.content.strip()
if self.debug:
self._debug_print(f"Image {image_idx} Visual Analysis", analysis)
print(f" Image {image_idx} analyzed successfully")
return analysis
except Exception as e:
error_msg = f"[Image {image_idx}: Vision analysis failed - {str(e)}]"
print(f" Error analyzing image {image_idx}: {e}")
return error_msg
def analyze_images_visually(self, images: List[Dict]) -> List[Dict]:
"""
Считывает изображения и отправляет на анализ
"""
visual_analyses = []
for idx, image in enumerate(images):
image_path = image.get('path', '')
if not image_path:
print(f" Image {idx}: No path")
continue
visual_analysis = self.analyze_image_visually(image_path, idx)
visual_analyses.append({
'type': 'image_visual',
'image_index': idx,
'image_path': image_path,
'visual_analysis': visual_analysis,
'ocr_text': image.get('ocr_text', '')
})
return visual_analyses
def summarize_text_chunks(self, text: str, chunk_size: int = 1500) -> List[Dict]:
"""
Отправляет куски текста на суммаризацию
"""
chunks = []
text_chunks = self._chunk_text(text, chunk_size=chunk_size, overlap=300)
self._debug_print("Text Chunking", f"Created {len(text_chunks)} chunks")
for idx, chunk in enumerate(text_chunks):
if len(chunk.strip()) < 50:
continue
try:
prompt = f"""Ты - ассистент по обобщению и суммаризации информации. Проанализируй и суммаризируй следующий кусок текста.
Выдели основные моменты, факты и идеи. Будь краток.
Текст :
{chunk}
Результат:"""
message = HumanMessage(content=prompt)
response = self.llm.invoke([message])
summary = response.content.strip()
chunks.append({
'type': 'text_chunk',
'chunk_index': len(chunks),
'original_text': chunk[:500],
'summary': summary,
'chunk_length': len(chunk)
})
if self.debug:
self._debug_print(f"Text Chunk {len(chunks)-1} Summary", summary)
except Exception as e:
print(f"Error summarizing text chunk: {e}")
return chunks
def summarize_tables(self, tables: List[Dict]) -> List[Dict]:
"""
Отправляет таблицы на суммаризацию
"""
summaries = []
for idx, table in enumerate(tables):
table_content = table.get('content', '')
if not table_content or len(table_content.strip()) < 10:
continue
try:
prompt = f"""Ты - ассистент по обобщению и суммаризации информации. Проанализируй и суммаризируй следующию таблицу.
Выдели основные моменты, числа, и значения строк/колонок. Будь краток.
Таблица:
{table_content}
Результат:"""
message = HumanMessage(content=prompt)
response = self.llm.invoke([message])
summary = response.content.strip()
summaries.append({
'type': 'table',
'table_index': idx,
'original_content': table_content[:500],
'summary': summary,
'table_length': len(table_content)
})
if self.debug:
self._debug_print(f"Table {idx} Summary", summary)
except Exception as e:
print(f"Error summarizing table {idx}: {e}")
return summaries
def process_and_store_document(
self,
text: str,
images: List[Dict],
tables: List[Dict],
vector_store,
doc_id: str
) -> Dict:
"""
Основной pipeline анализирует и сохраняет документы в хранилище
"""
print(f"PROCESSING ANALYSIS: {doc_id}")
results = {
'doc_id': doc_id,
'image_visual_analyses': [],
'text_summaries': [],
'table_summaries': [],
'total_stored': 0
}
print(f"\n VISUAL IMAGE ANALYSIS ({len(images)} )")
image_analyses = self.analyze_images_visually(images)
results['image_visual_analyses'] = image_analyses
image_docs = {
'text': ' | '.join([
f"Image {a['image_index']}: {a['visual_analysis']}"
for a in image_analyses
]),
'images': [],
'tables': []
}
for analysis in image_analyses:
print(f" Image {analysis['image_index']}")
print(f" Path: {analysis['image_path']}")
print(f" Analysis: {analysis['visual_analysis'][:100]}...")
if image_analyses:
try:
vector_store.add_documents(
image_docs,
f"{doc_id}_images_visual"
)
results['total_stored'] += len(image_analyses)
print(f" Stored {len(image_analyses)} imagу analyses")
except Exception as e:
print(f"Error storing image analyses: {e}")
print(f"\n TEXT CHUNK SUMMARIZATION")
text_summaries = self.summarize_text_chunks(text)
results['text_summaries'] = text_summaries
text_docs = {
'text': ' | '.join([f"Chunk {s['chunk_index']}: {s['summary']}"
for s in text_summaries]),
'images': [],
'tables': []
}
for summary in text_summaries:
print(f" Chunk {summary['chunk_index']}: {summary['summary'][:50]}...")
if text_summaries:
try:
vector_store.add_documents(
text_docs,
f"{doc_id}_text_chunks"
)
results['total_stored'] += len(text_summaries)
print(f" Stored {len(text_summaries)} text chunk summaries")
except Exception as e:
print(f" Error text summaries: {e}")
print(f"\n TABLE SUMMARIZATION ({len(tables)}")
table_summaries = self.summarize_tables(tables)
results['table_summaries'] = table_summaries
table_docs = {
'text': ' | '.join([f"Table {s['table_index']}: {s['summary']}"
for s in table_summaries]),
'images': [],
'tables': []
}
for summary in table_summaries:
print(f" Table {summary['table_index']}: {summary['summary'][:50]}...")
if table_summaries:
try:
vector_store.add_documents(
table_docs,
f"{doc_id}_tables"
)
results['total_stored'] += len(table_summaries)
print(f" Stored {len(table_summaries)} table summaries")
except Exception as e:
print(f" Error storing table summaries: {e}")
print(f" STORAGE SUMMARY")
print(f" Images analyzed: {len(image_analyses)}")
print(f" Text chunks summarized: {len(text_summaries)}")
print(f" Tables summarized: {len(table_summaries)}")
print(f" Total items stored in vector: {results['total_stored']}")
self.visual_summaries_log.append(results)
return results
def _chunk_text(self, text: str, chunk_size: int = 1500, overlap: int = 300) -> List[str]:
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunks.append(text[start:end])
start = end - overlap
return chunks
def get_visual_summaries_log(self) -> List[Dict]:
return self.visual_summaries_log
class AnsweringRAG:
"""
RAG - работа с ответом на запрос:
1. Поиск в векторном хранилище
2. Анализ результатов
3. Предоставление ответа
"""
def __init__(self, api_key: str = None, debug: bool = True):
api_key = api_key or OPENAI_API_KEY
self.debug = debug
self.llm = ChatOpenAI(
model_name=OPENAI_MODEL,
api_key=api_key,
temperature=TEMPERATURE,
max_tokens=MAX_TOKENS,
)
self.language = LANGUAGE
self.answer_log = []
if self.debug:
print(" AnsweringRAG initialized")
def _debug_print(self, label: str, data: any):
"""Debug"""
if self.debug:
print(f"\n🔍 DEBUG [{label}]:")
if isinstance(data, (list, dict)):
print(f" Type: {type(data).__name__}")
print(f" Content: {str(data)[:300]}...")
else:
print(f" {data}")
def analyze_and_answer(
self,
question: str,
search_results: List[Dict]
) -> Dict:
"""
Проанализируй найденные документов и на основе их предоставь ответ на вопрос пользователя
Ответ:
{
'question': user question,
'answer': detailed answer,
'sources_used': number of sources,
'confidence': low/medium/high,
'search_results': original search results
}
"""
print(f"ANALYZING QUESTION & GENERATING ANSWER")
print(f"\n Question: {question}")
print(f" Search Results: {len(search_results)}")
if not search_results:
print(f" No search results found!")
answer = f"""Релевантная информация в документах отсутствует: "{question}"
"""
result = {
'question': question,
'answer': answer,
'sources_used': 0,
'confidence': 'low',
'search_results': []
}
self.answer_log.append(result)
return result
context_parts = []
for idx, result in enumerate(search_results, 1):
content = result.get('content', '')
metadata = result.get('metadata', {})
content_type = result.get('type', 'unknown')
distance = result.get('distance', 0)
relevance = 1 - distance if distance else 0
context_parts.append(f"""
[Source {idx} - {content_type.upper()} (relevance: {relevance:.1%})]
{content}""")
full_context = "\n".join(context_parts)
self._debug_print("Context Prepared", f"{len(context_parts)} sources")
analysis_prompt = f"""Ты - ассистент по анализу документов и ответов на вопросы по ним.
ВОПРОС:
"{question}"
РЕЛЕВАНТНАЯ ИНФОРМАЦИЯ:
{full_context}
ИНСТРУКЦИИ:
1. Проанализируй предоставленный контент
2. Выдели информацию имеющую отношение к вопросу
3. Предоставь понятный и исчерпывающий ответ
4. Если контент полностью не отвечает на вопрос предосавь информацию которая доступна в контенте
5. Построй свой ответ опираясь на ключевые моменты
Ответ:"""
print(f"\n Analyzing search results...")
print(f" Context size: {len(full_context)} chars")
print(f" Sources: {len(search_results)}")
try:
message = HumanMessage(content=analysis_prompt)
response = self.llm.invoke([message])
answer = response.content.strip()
confidence = self._estimate_confidence(len(search_results), answer)
print(f" Answer generated successfully")
print(f" Confidence: {confidence}")
print(f" Answer length: {len(answer)} chars")
result = {
'question': question,
'answer': answer,
'sources_used': len(search_results),
'confidence': confidence,
'search_results': search_results
}
self.answer_log.append(result)
return result
except Exception as e:
print(f" Error generating answer: {e}")
answer = f"Error while analyzing the search results."
result = {
'question': question,
'answer': answer,
'sources_used': len(search_results),
'confidence': 'low',
'error': str(e),
'search_results': search_results
}
self.answer_log.append(result)
return result
def _estimate_confidence(self, sources_count: int, answer: str) -> str:
"""Уверенность в ответе на основании найденных источников информации"""
answer_length = len(answer)
if sources_count >= 3 and answer_length > 500:
return "high"
elif sources_count >= 2 and answer_length > 200:
return "medium"
else:
return "low"