Spaces:
Build error
Build error
| import os | |
| import logging | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| import json | |
| import hashlib | |
| from datetime import datetime | |
| import google.generativeai as genai | |
| from langchain_google_genai import GoogleGenerativeAI | |
| import pypdf | |
| from dataclasses import dataclass | |
| import logging | |
| logging.basicConfig(level=logging.DEBUG) | |
| # Configuração de logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class DocumentChunk: | |
| """Representa um chunk de documento processado""" | |
| chunk_id: str | |
| content: str | |
| metadata: Dict | |
| page_range: tuple | |
| summary: Optional[str] = None | |
| topics: List[str] = None | |
| keywords: List[str] = None | |
| def __post_init__(self): | |
| """Inicializa listas vazias para campos opcionais se necessário""" | |
| if self.topics is None: | |
| self.topics = [] | |
| if self.keywords is None: | |
| self.keywords = [] | |
| class DocumentProcessor: | |
| """Responsável pelo processamento inicial dos documentos""" | |
| def __init__(self, chunk_size: int = 10): | |
| self.chunk_size = chunk_size | |
| def _generate_chunk_id(self, content: str, metadata: Dict) -> str: | |
| """Gera um ID único para o chunk""" | |
| content_hash = hashlib.md5(content.encode()).hexdigest() | |
| return f"{metadata['source']}_{content_hash[:8]}" | |
| def process_pdf(self, file_path: str) -> List[DocumentChunk]: | |
| """Processa um arquivo PDF e o divide em chunks""" | |
| chunks = [] | |
| try: | |
| with open(file_path, 'rb') as file: | |
| pdf = pypdf.PdfReader(file) | |
| total_pages = len(pdf.pages) | |
| # Processa chunks de páginas | |
| for start_page in range(0, total_pages, self.chunk_size): | |
| end_page = min(start_page + self.chunk_size, total_pages) | |
| content = "" | |
| # Extrai texto das páginas do chunk | |
| for page_num in range(start_page, end_page): | |
| page_text = pdf.pages[page_num].extract_text() | |
| content += f"\n=== Page {page_num + 1} ===\n{page_text}" | |
| metadata = { | |
| "source": os.path.basename(file_path), | |
| "created_at": datetime.now().isoformat(), | |
| "page_range": (start_page + 1, end_page) | |
| } | |
| chunk_id = self._generate_chunk_id(content, metadata) | |
| if content.strip(): | |
| chunk = DocumentChunk( | |
| chunk_id=chunk_id, | |
| content=content, | |
| metadata=metadata, | |
| page_range=(start_page + 1, end_page) | |
| ) | |
| chunks.append(chunk) | |
| else: | |
| logger.warning(f"Chunk vazio ignorado (páginas {start_page + 1}-{end_page})") | |
| logger.info(f"Processadas {total_pages} páginas em {len(chunks)} chunks") | |
| return chunks | |
| except Exception as e: | |
| logger.error(f"Error processing PDF {file_path}: {str(e)}") | |
| raise | |
| class DocumentRepository: | |
| """Gerencia o armazenamento e recuperação de chunks processados""" | |
| def __init__(self, storage_path: str = "processed_documents"): | |
| self.storage_path = Path(storage_path) | |
| self.storage_path.mkdir(exist_ok=True) | |
| self._cache = {} | |
| def save_chunk(self, chunk: 'DocumentChunk'): | |
| """Salva um chunk processado em formato JSON""" | |
| try: | |
| chunk_path = self.storage_path / f"{chunk.chunk_id}.json" | |
| # Converter o chunk para um dicionário manualmente | |
| chunk_data = { | |
| "chunk_id": chunk.chunk_id, | |
| "content": chunk.content, | |
| "metadata": chunk.metadata, | |
| "page_range": list(chunk.page_range), # Converte tupla para lista | |
| "summary": chunk.summary, | |
| "topics": chunk.topics if chunk.topics else [], | |
| "keywords": chunk.keywords if chunk.keywords else [] | |
| } | |
| # Salvar o chunk | |
| with open(chunk_path, 'w', encoding='utf-8') as f: | |
| json.dump(chunk_data, f, ensure_ascii=False, indent=2) | |
| # Atualizar cache | |
| self._cache[chunk.chunk_id] = chunk | |
| logging.debug(f"Chunk {chunk.chunk_id} salvo com sucesso") | |
| except Exception as e: | |
| logging.error(f"Erro ao salvar chunk {chunk.chunk_id}: {str(e)}") | |
| raise | |
| def load_chunk(self, chunk_id: str) -> Optional['DocumentChunk']: | |
| """Carrega um chunk específico""" | |
| try: | |
| # Verificar cache primeiro | |
| if chunk_id in self._cache: | |
| return self._cache[chunk_id] | |
| chunk_path = self.storage_path / f"{chunk_id}.json" | |
| if not chunk_path.exists(): | |
| return None | |
| with open(chunk_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| # Converter lista de volta para tupla no page_range | |
| data['page_range'] = tuple(data['page_range']) | |
| # Criar novo DocumentChunk | |
| chunk = DocumentChunk( | |
| chunk_id=data['chunk_id'], | |
| content=data['content'], | |
| metadata=data['metadata'], | |
| page_range=data['page_range'], | |
| summary=data.get('summary'), | |
| topics=data.get('topics', []), | |
| keywords=data.get('keywords', []) | |
| ) | |
| self._cache[chunk_id] = chunk | |
| return chunk | |
| except Exception as e: | |
| logging.error(f"Erro ao carregar chunk {chunk_id}: {str(e)}") | |
| return None | |
| def get_all_chunks(self) -> List['DocumentChunk']: | |
| """Retorna todos os chunks disponíveis""" | |
| chunks = [] | |
| try: | |
| for chunk_id in self.list_chunks(): | |
| chunk = self.load_chunk(chunk_id) | |
| if chunk: | |
| chunks.append(chunk) | |
| return chunks | |
| except Exception as e: | |
| logging.error(f"Erro ao carregar chunks: {str(e)}") | |
| return [] | |
| def list_chunks(self) -> List[str]: | |
| """Lista todos os chunks disponíveis""" | |
| try: | |
| return [f.stem for f in self.storage_path.glob("*.json")] | |
| except Exception as e: | |
| logging.error(f"Erro ao listar chunks: {str(e)}") | |
| return [] | |
| def clear_storage(self): | |
| """Limpa todos os chunks armazenados""" | |
| try: | |
| for file in self.storage_path.glob("*.json"): | |
| file.unlink() | |
| self._cache.clear() | |
| logging.info("Armazenamento limpo com sucesso") | |
| except Exception as e: | |
| logging.error(f"Erro ao limpar armazenamento: {str(e)}") | |
| raise | |
| class ContentAnalyzer: | |
| """Responsável pela análise do conteúdo usando IA""" | |
| def __init__(self, api_key: str = None): | |
| """ | |
| Inicializa o ContentAnalyzer | |
| Args: | |
| api_key (str, optional): Google API key. Se não fornecida, tentará obter da variável de ambiente. | |
| """ | |
| try: | |
| # Se api_key não for fornecida, tenta obter da variável de ambiente | |
| self.api_key = api_key or os.getenv('GOOGLE_API_KEY') | |
| if not self.api_key: | |
| raise ValueError("API key não fornecida e não encontrada nas variáveis de ambiente") | |
| # Configura o cliente genai | |
| genai.configure(api_key=self.api_key) | |
| # Inicializa o modelo | |
| self.llm = GoogleGenerativeAI( | |
| model="gemini-1.5-pro-002", | |
| temperature=0.5 | |
| ) | |
| logging.info("ContentAnalyzer inicializado com sucesso") | |
| except Exception as e: | |
| logging.error(f"Erro na inicialização do ContentAnalyzer: {str(e)}") | |
| raise | |
| def analyze_chunk(self, chunk: 'DocumentChunk') -> Optional['DocumentChunk']: | |
| """Analisa um chunk com compreensão mais focada""" | |
| try: | |
| # Verifica se o conteúdo está vazio | |
| if not chunk.content.strip(): | |
| logging.warning(f"Chunk {chunk.chunk_id} está vazio") | |
| return None | |
| analysis_prompt = f""" | |
| Analise este texto e forneça: | |
| 1. Um resumo conciso (5 a 10 frases) | |
| 2. 3-5 tópicos principais | |
| 3. 10 palavras-chave essenciais | |
| Texto para análise: | |
| {chunk.content} | |
| Responda no formato: | |
| RESUMO: [seu resumo aqui] | |
| TÓPICOS: [tópico 1], [tópico 2], [tópico 3] | |
| PALAVRAS-CHAVE: [palavra1], [palavra2], [palavra3], [palavra4], [palavra5] | |
| """ | |
| # Log do prompt | |
| logging.debug(f"Enviando prompt para análise do chunk {chunk.chunk_id}") | |
| response = self.llm.invoke(analysis_prompt) | |
| response_text = str(response) | |
| # Log da resposta | |
| logging.debug(f"Resposta recebida para chunk {chunk.chunk_id}: {response_text[:200]}...") | |
| # Parse da resposta | |
| lines = response_text.split('\n') | |
| summary = "" | |
| topics = [] | |
| keywords = [] | |
| for line in lines: | |
| if line.startswith('RESUMO:'): | |
| summary = line.replace('RESUMO:', '').strip() | |
| elif line.startswith('TÓPICOS:'): | |
| topics = [t.strip() for t in line.replace('TÓPICOS:', '').split(',')] | |
| elif line.startswith('PALAVRAS-CHAVE:'): | |
| keywords = [k.strip() for k in line.replace('PALAVRAS-CHAVE:', '').split(',')] | |
| # Atualiza o chunk com os resultados | |
| chunk.summary = summary | |
| chunk.topics = topics[:5] # Limita a 5 tópicos | |
| chunk.keywords = keywords[:5] # Limita a 5 palavras-chave | |
| logging.info(f"Chunk {chunk.chunk_id} analisado com sucesso") | |
| return chunk | |
| except Exception as e: | |
| logging.error(f"Erro na análise do chunk {chunk.chunk_id}: {str(e)}") | |
| return None | |
| class DocumentAgent: | |
| """Agente que gerencia a interação com documentos e geração de respostas""" | |
| def __init__(self): | |
| """Inicializa o DocumentAgent com todos os seus componentes""" | |
| try: | |
| # Configuração da API key | |
| self.api_key = os.getenv('GOOGLE_API_KEY') | |
| if not self.api_key: | |
| raise ValueError("GOOGLE_API_KEY não encontrada nas variáveis de ambiente") | |
| # Inicializa os componentes | |
| self.processor = DocumentProcessor(chunk_size=10) | |
| self.analyzer = ContentAnalyzer(api_key=self.api_key) # Passa a API key aqui | |
| self.repository = DocumentRepository() | |
| # Configura o modelo LLM | |
| genai.configure(api_key=self.api_key) | |
| self.llm = GoogleGenerativeAI( | |
| model="gemini-1.5-pro-002", | |
| temperature=0.5 | |
| ) | |
| # Garante que o diretório de documentos processados existe | |
| os.makedirs("processed_documents", exist_ok=True) | |
| logging.info("DocumentAgent inicializado com sucesso!") | |
| except Exception as e: | |
| logging.error(f"Erro na inicialização do DocumentAgent: {str(e)}") | |
| raise | |
| def reset_state(self): | |
| """Reseta o estado do agente""" | |
| try: | |
| # Reinicializa os componentes | |
| self.processor = DocumentProcessor(chunk_size=10) | |
| self.analyzer = ContentAnalyzer(self.api_key) | |
| self.repository = DocumentRepository() | |
| self.repository.clear_storage() | |
| logging.info("Estado do DocumentAgent resetado com sucesso") | |
| except Exception as e: | |
| logging.error(f"Erro ao resetar estado do DocumentAgent: {str(e)}") | |
| raise | |
| def process_document(self, file_path: str) -> int: | |
| """Processa um novo documento e retorna o número de chunks gerados""" | |
| try: | |
| if not os.path.exists(file_path): | |
| raise FileNotFoundError(f"Arquivo não encontrado: {file_path}") | |
| # Log do início do processamento | |
| logging.info(f"Iniciando processamento do documento: {file_path}") | |
| # Processa o documento em chunks | |
| chunks = self.processor.process_pdf(file_path) | |
| processed_chunks = 0 | |
| # Analisa e salva cada chunk | |
| for chunk in chunks: | |
| try: | |
| logging.debug(f"Processando chunk {chunk.chunk_id}") | |
| # Analisa o chunk | |
| analyzed_chunk = self.analyzer.analyze_chunk(chunk) | |
| if analyzed_chunk: | |
| # Salva o chunk analisado | |
| self.repository.save_chunk(analyzed_chunk) | |
| processed_chunks += 1 | |
| logging.debug(f"Chunk {chunk.chunk_id} processado e salvo com sucesso") | |
| else: | |
| logging.warning(f"Chunk {chunk.chunk_id} não pôde ser analisado") | |
| except Exception as chunk_error: | |
| logging.error(f"Erro ao processar chunk {chunk.chunk_id}: {str(chunk_error)}") | |
| continue | |
| # Log do resultado final | |
| logging.info(f"Documento processado com sucesso: {processed_chunks} chunks gerados") | |
| return processed_chunks | |
| except Exception as e: | |
| logging.error(f"Erro ao processar documento {file_path}: {str(e)}") | |
| raise | |
| def answer_question(self, question: str) -> str: | |
| """Responde perguntas com base nos documentos processados""" | |
| try: | |
| # Verifica se existem chunks processados | |
| chunks = self.repository.get_all_chunks() | |
| if not chunks: | |
| return "Não há documentos processados para responder à pergunta." | |
| # Prepara o contexto para a resposta | |
| context = [] | |
| for chunk in chunks: | |
| context_entry = { | |
| 'source': f"{chunk.metadata['source']} (páginas {chunk.page_range[0]}-{chunk.page_range[1]})", | |
| 'content': chunk.content, | |
| 'summary': chunk.summary if chunk.summary else "", | |
| 'topics': chunk.topics if chunk.topics else [] | |
| } | |
| context.append(context_entry) | |
| # Gera a resposta | |
| response_prompt = f""" | |
| Pergunta: {question} | |
| Contexto disponível: | |
| {json.dumps(context, indent=2, ensure_ascii=False)} | |
| Por favor, responda à pergunta usando apenas as informações fornecidas no contexto acima. | |
| Se não houver informações suficientes, indique isso na resposta. | |
| """ | |
| response = self.llm.invoke(response_prompt) | |
| return str(response) | |
| except Exception as e: | |
| logging.error(f"Erro ao responder pergunta: {str(e)}") | |
| return f"Desculpe, ocorreu um erro ao processar sua pergunta: {str(e)}" |