chat_cheetah / document_agent.py
guifav's picture
all
8559d4d
import os
import logging
from pathlib import Path
from typing import List, Dict, Optional
import json
import hashlib
from datetime import datetime
import google.generativeai as genai
from langchain_google_genai import GoogleGenerativeAI
import pypdf
from dataclasses import dataclass
import logging
logging.basicConfig(level=logging.DEBUG)
# Configuração de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class DocumentChunk:
"""Representa um chunk de documento processado"""
chunk_id: str
content: str
metadata: Dict
page_range: tuple
summary: Optional[str] = None
topics: List[str] = None
keywords: List[str] = None
def __post_init__(self):
"""Inicializa listas vazias para campos opcionais se necessário"""
if self.topics is None:
self.topics = []
if self.keywords is None:
self.keywords = []
class DocumentProcessor:
"""Responsável pelo processamento inicial dos documentos"""
def __init__(self, chunk_size: int = 10):
self.chunk_size = chunk_size
def _generate_chunk_id(self, content: str, metadata: Dict) -> str:
"""Gera um ID único para o chunk"""
content_hash = hashlib.md5(content.encode()).hexdigest()
return f"{metadata['source']}_{content_hash[:8]}"
def process_pdf(self, file_path: str) -> List[DocumentChunk]:
"""Processa um arquivo PDF e o divide em chunks"""
chunks = []
try:
with open(file_path, 'rb') as file:
pdf = pypdf.PdfReader(file)
total_pages = len(pdf.pages)
# Processa chunks de páginas
for start_page in range(0, total_pages, self.chunk_size):
end_page = min(start_page + self.chunk_size, total_pages)
content = ""
# Extrai texto das páginas do chunk
for page_num in range(start_page, end_page):
page_text = pdf.pages[page_num].extract_text()
content += f"\n=== Page {page_num + 1} ===\n{page_text}"
metadata = {
"source": os.path.basename(file_path),
"created_at": datetime.now().isoformat(),
"page_range": (start_page + 1, end_page)
}
chunk_id = self._generate_chunk_id(content, metadata)
if content.strip():
chunk = DocumentChunk(
chunk_id=chunk_id,
content=content,
metadata=metadata,
page_range=(start_page + 1, end_page)
)
chunks.append(chunk)
else:
logger.warning(f"Chunk vazio ignorado (páginas {start_page + 1}-{end_page})")
logger.info(f"Processadas {total_pages} páginas em {len(chunks)} chunks")
return chunks
except Exception as e:
logger.error(f"Error processing PDF {file_path}: {str(e)}")
raise
class DocumentRepository:
"""Gerencia o armazenamento e recuperação de chunks processados"""
def __init__(self, storage_path: str = "processed_documents"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(exist_ok=True)
self._cache = {}
def save_chunk(self, chunk: 'DocumentChunk'):
"""Salva um chunk processado em formato JSON"""
try:
chunk_path = self.storage_path / f"{chunk.chunk_id}.json"
# Converter o chunk para um dicionário manualmente
chunk_data = {
"chunk_id": chunk.chunk_id,
"content": chunk.content,
"metadata": chunk.metadata,
"page_range": list(chunk.page_range), # Converte tupla para lista
"summary": chunk.summary,
"topics": chunk.topics if chunk.topics else [],
"keywords": chunk.keywords if chunk.keywords else []
}
# Salvar o chunk
with open(chunk_path, 'w', encoding='utf-8') as f:
json.dump(chunk_data, f, ensure_ascii=False, indent=2)
# Atualizar cache
self._cache[chunk.chunk_id] = chunk
logging.debug(f"Chunk {chunk.chunk_id} salvo com sucesso")
except Exception as e:
logging.error(f"Erro ao salvar chunk {chunk.chunk_id}: {str(e)}")
raise
def load_chunk(self, chunk_id: str) -> Optional['DocumentChunk']:
"""Carrega um chunk específico"""
try:
# Verificar cache primeiro
if chunk_id in self._cache:
return self._cache[chunk_id]
chunk_path = self.storage_path / f"{chunk_id}.json"
if not chunk_path.exists():
return None
with open(chunk_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Converter lista de volta para tupla no page_range
data['page_range'] = tuple(data['page_range'])
# Criar novo DocumentChunk
chunk = DocumentChunk(
chunk_id=data['chunk_id'],
content=data['content'],
metadata=data['metadata'],
page_range=data['page_range'],
summary=data.get('summary'),
topics=data.get('topics', []),
keywords=data.get('keywords', [])
)
self._cache[chunk_id] = chunk
return chunk
except Exception as e:
logging.error(f"Erro ao carregar chunk {chunk_id}: {str(e)}")
return None
def get_all_chunks(self) -> List['DocumentChunk']:
"""Retorna todos os chunks disponíveis"""
chunks = []
try:
for chunk_id in self.list_chunks():
chunk = self.load_chunk(chunk_id)
if chunk:
chunks.append(chunk)
return chunks
except Exception as e:
logging.error(f"Erro ao carregar chunks: {str(e)}")
return []
def list_chunks(self) -> List[str]:
"""Lista todos os chunks disponíveis"""
try:
return [f.stem for f in self.storage_path.glob("*.json")]
except Exception as e:
logging.error(f"Erro ao listar chunks: {str(e)}")
return []
def clear_storage(self):
"""Limpa todos os chunks armazenados"""
try:
for file in self.storage_path.glob("*.json"):
file.unlink()
self._cache.clear()
logging.info("Armazenamento limpo com sucesso")
except Exception as e:
logging.error(f"Erro ao limpar armazenamento: {str(e)}")
raise
class ContentAnalyzer:
"""Responsável pela análise do conteúdo usando IA"""
def __init__(self, api_key: str = None):
"""
Inicializa o ContentAnalyzer
Args:
api_key (str, optional): Google API key. Se não fornecida, tentará obter da variável de ambiente.
"""
try:
# Se api_key não for fornecida, tenta obter da variável de ambiente
self.api_key = api_key or os.getenv('GOOGLE_API_KEY')
if not self.api_key:
raise ValueError("API key não fornecida e não encontrada nas variáveis de ambiente")
# Configura o cliente genai
genai.configure(api_key=self.api_key)
# Inicializa o modelo
self.llm = GoogleGenerativeAI(
model="gemini-1.5-pro-002",
temperature=0.5
)
logging.info("ContentAnalyzer inicializado com sucesso")
except Exception as e:
logging.error(f"Erro na inicialização do ContentAnalyzer: {str(e)}")
raise
def analyze_chunk(self, chunk: 'DocumentChunk') -> Optional['DocumentChunk']:
"""Analisa um chunk com compreensão mais focada"""
try:
# Verifica se o conteúdo está vazio
if not chunk.content.strip():
logging.warning(f"Chunk {chunk.chunk_id} está vazio")
return None
analysis_prompt = f"""
Analise este texto e forneça:
1. Um resumo conciso (5 a 10 frases)
2. 3-5 tópicos principais
3. 10 palavras-chave essenciais
Texto para análise:
{chunk.content}
Responda no formato:
RESUMO: [seu resumo aqui]
TÓPICOS: [tópico 1], [tópico 2], [tópico 3]
PALAVRAS-CHAVE: [palavra1], [palavra2], [palavra3], [palavra4], [palavra5]
"""
# Log do prompt
logging.debug(f"Enviando prompt para análise do chunk {chunk.chunk_id}")
response = self.llm.invoke(analysis_prompt)
response_text = str(response)
# Log da resposta
logging.debug(f"Resposta recebida para chunk {chunk.chunk_id}: {response_text[:200]}...")
# Parse da resposta
lines = response_text.split('\n')
summary = ""
topics = []
keywords = []
for line in lines:
if line.startswith('RESUMO:'):
summary = line.replace('RESUMO:', '').strip()
elif line.startswith('TÓPICOS:'):
topics = [t.strip() for t in line.replace('TÓPICOS:', '').split(',')]
elif line.startswith('PALAVRAS-CHAVE:'):
keywords = [k.strip() for k in line.replace('PALAVRAS-CHAVE:', '').split(',')]
# Atualiza o chunk com os resultados
chunk.summary = summary
chunk.topics = topics[:5] # Limita a 5 tópicos
chunk.keywords = keywords[:5] # Limita a 5 palavras-chave
logging.info(f"Chunk {chunk.chunk_id} analisado com sucesso")
return chunk
except Exception as e:
logging.error(f"Erro na análise do chunk {chunk.chunk_id}: {str(e)}")
return None
class DocumentAgent:
"""Agente que gerencia a interação com documentos e geração de respostas"""
def __init__(self):
"""Inicializa o DocumentAgent com todos os seus componentes"""
try:
# Configuração da API key
self.api_key = os.getenv('GOOGLE_API_KEY')
if not self.api_key:
raise ValueError("GOOGLE_API_KEY não encontrada nas variáveis de ambiente")
# Inicializa os componentes
self.processor = DocumentProcessor(chunk_size=10)
self.analyzer = ContentAnalyzer(api_key=self.api_key) # Passa a API key aqui
self.repository = DocumentRepository()
# Configura o modelo LLM
genai.configure(api_key=self.api_key)
self.llm = GoogleGenerativeAI(
model="gemini-1.5-pro-002",
temperature=0.5
)
# Garante que o diretório de documentos processados existe
os.makedirs("processed_documents", exist_ok=True)
logging.info("DocumentAgent inicializado com sucesso!")
except Exception as e:
logging.error(f"Erro na inicialização do DocumentAgent: {str(e)}")
raise
def reset_state(self):
"""Reseta o estado do agente"""
try:
# Reinicializa os componentes
self.processor = DocumentProcessor(chunk_size=10)
self.analyzer = ContentAnalyzer(self.api_key)
self.repository = DocumentRepository()
self.repository.clear_storage()
logging.info("Estado do DocumentAgent resetado com sucesso")
except Exception as e:
logging.error(f"Erro ao resetar estado do DocumentAgent: {str(e)}")
raise
def process_document(self, file_path: str) -> int:
"""Processa um novo documento e retorna o número de chunks gerados"""
try:
if not os.path.exists(file_path):
raise FileNotFoundError(f"Arquivo não encontrado: {file_path}")
# Log do início do processamento
logging.info(f"Iniciando processamento do documento: {file_path}")
# Processa o documento em chunks
chunks = self.processor.process_pdf(file_path)
processed_chunks = 0
# Analisa e salva cada chunk
for chunk in chunks:
try:
logging.debug(f"Processando chunk {chunk.chunk_id}")
# Analisa o chunk
analyzed_chunk = self.analyzer.analyze_chunk(chunk)
if analyzed_chunk:
# Salva o chunk analisado
self.repository.save_chunk(analyzed_chunk)
processed_chunks += 1
logging.debug(f"Chunk {chunk.chunk_id} processado e salvo com sucesso")
else:
logging.warning(f"Chunk {chunk.chunk_id} não pôde ser analisado")
except Exception as chunk_error:
logging.error(f"Erro ao processar chunk {chunk.chunk_id}: {str(chunk_error)}")
continue
# Log do resultado final
logging.info(f"Documento processado com sucesso: {processed_chunks} chunks gerados")
return processed_chunks
except Exception as e:
logging.error(f"Erro ao processar documento {file_path}: {str(e)}")
raise
def answer_question(self, question: str) -> str:
"""Responde perguntas com base nos documentos processados"""
try:
# Verifica se existem chunks processados
chunks = self.repository.get_all_chunks()
if not chunks:
return "Não há documentos processados para responder à pergunta."
# Prepara o contexto para a resposta
context = []
for chunk in chunks:
context_entry = {
'source': f"{chunk.metadata['source']} (páginas {chunk.page_range[0]}-{chunk.page_range[1]})",
'content': chunk.content,
'summary': chunk.summary if chunk.summary else "",
'topics': chunk.topics if chunk.topics else []
}
context.append(context_entry)
# Gera a resposta
response_prompt = f"""
Pergunta: {question}
Contexto disponível:
{json.dumps(context, indent=2, ensure_ascii=False)}
Por favor, responda à pergunta usando apenas as informações fornecidas no contexto acima.
Se não houver informações suficientes, indique isso na resposta.
"""
response = self.llm.invoke(response_prompt)
return str(response)
except Exception as e:
logging.error(f"Erro ao responder pergunta: {str(e)}")
return f"Desculpe, ocorreu um erro ao processar sua pergunta: {str(e)}"