""" Gerenciamento de conexão e operações com PostgreSQL + pgvector """ import time from typing import Optional, List, Dict, Any, Tuple import psycopg from pgvector.psycopg import register_vector import numpy as np from .config import DATABASE_URL, EMBEDDING_DIM, IVFFLAT_LISTS class DatabaseManager: """Gerenciador de conexão e operações com banco de dados""" def __init__(self): self.conn: Optional[psycopg.Connection] = None self.last_error: str = "" def connect(self) -> Optional[psycopg.Connection]: """Estabelece conexão com o banco de dados""" if not DATABASE_URL: self.last_error = "DATABASE_URL ausente" return None # Verifica se conexão existente ainda está ativa if self.conn is not None: try: with self.conn.cursor() as cur: cur.execute("SELECT 1") return self.conn except Exception: try: self.conn.close() except Exception: pass self.conn = None # Tenta estabelecer nova conexão com retry attempts = 0 delay = 0.5 while attempts < 10: try: self.conn = psycopg.connect(DATABASE_URL, autocommit=True) register_vector(self.conn) with self.conn.cursor() as cur: cur.execute("SELECT 1") cur.fetchone() self.last_error = "" return self.conn except Exception as e: self.last_error = f"Falha na conexão: {str(e)}" time.sleep(delay) attempts += 1 delay = min(delay * 2, 5) self.conn = None return None def init_schema(self) -> bool: """Inicializa schema do banco de dados""" conn = self.connect() if not conn: return False try: with conn.cursor() as cur: # Habilita extensão pgvector cur.execute("CREATE EXTENSION IF NOT EXISTS vector") # Tabela de documentos cur.execute( f""" CREATE TABLE IF NOT EXISTS documents ( id BIGSERIAL PRIMARY KEY, session_id TEXT, title TEXT, content TEXT, embedding vector({EMBEDDING_DIM}), created_at TIMESTAMP DEFAULT NOW() ) """ ) # Adiciona coluna session_id se não existir (migration) cur.execute( """ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name='documents' AND column_name='session_id' ) THEN ALTER TABLE documents ADD COLUMN session_id TEXT; END IF; END $$; """ ) # Tabela de chats cur.execute( """ CREATE TABLE IF NOT EXISTS chats ( id BIGSERIAL PRIMARY KEY, session_id TEXT UNIQUE, created_at TIMESTAMP DEFAULT NOW() ) """ ) # Tabela de mensagens cur.execute( """ CREATE TABLE IF NOT EXISTS messages ( id BIGSERIAL PRIMARY KEY, chat_id BIGINT REFERENCES chats(id) ON DELETE CASCADE, role TEXT, content TEXT, created_at TIMESTAMP DEFAULT NOW() ) """ ) # Tabela de métricas (para monitoramento) cur.execute( """ CREATE TABLE IF NOT EXISTS query_metrics ( id BIGSERIAL PRIMARY KEY, session_id TEXT, query TEXT, num_results INT, retrieval_time_ms FLOAT, generation_time_ms FLOAT, total_time_ms FLOAT, top_k INT, created_at TIMESTAMP DEFAULT NOW() ) """ ) return True except Exception as e: self.last_error = f"Falha ao criar schema: {str(e)}" return False def create_index(self, lists: int = IVFFLAT_LISTS) -> Tuple[bool, str]: """Cria ou recria índice IVFFLAT para embeddings""" conn = self.connect() if not conn: return False, "Banco não conectado" try: with conn.cursor() as cur: cur.execute("DROP INDEX IF EXISTS idx_documents_embedding_cosine") cur.execute( f""" CREATE INDEX idx_documents_embedding_cosine ON documents USING ivfflat (embedding vector_cosine_ops) WITH (lists = {int(lists)}) """ ) cur.execute("ANALYZE documents") return True, f"Índice criado com lists={lists}" except Exception as e: return False, f"Falha ao criar índice: {str(e)}" def insert_document( self, title: str, content: str, embedding: List[float], session_id: Optional[str] = None ) -> Optional[int]: """Insere documento no banco""" conn = self.connect() if not conn: return None try: with conn.cursor() as cur: cur.execute( "INSERT INTO documents (session_id, title, content, embedding) VALUES (%s, %s, %s, %s::vector) RETURNING id", (session_id, title, content, embedding) ) row = cur.fetchone() return row[0] if row else None except Exception as e: self.last_error = f"Falha ao inserir documento: {str(e)}" return None def insert_documents_batch( self, documents: List[Tuple[str, str, List[float]]], session_id: Optional[str] = None, batch_size: int = 100 ) -> Tuple[int, int]: """ Insere múltiplos documentos em lote (otimizado) Args: documents: Lista de tuplas (title, content, embedding) session_id: ID da sessão batch_size: Tamanho do lote para inserção Returns: Tupla (total_inseridos, total_falhas) """ conn = self.connect() if not conn: return 0, len(documents) inserted = 0 failed = 0 try: with conn.cursor() as cur: # Processa em lotes for i in range(0, len(documents), batch_size): batch = documents[i:i + batch_size] # Prepara valores para executemany values = [ (session_id, title, content, embedding) for title, content, embedding in batch ] try: cur.executemany( "INSERT INTO documents (session_id, title, content, embedding) VALUES (%s, %s, %s, %s::vector)", values ) inserted += len(batch) except Exception: failed += len(batch) return inserted, failed except Exception as e: self.last_error = f"Falha no batch insert: {str(e)}" return inserted, len(documents) - inserted def search_similar( self, query_embedding: List[float], k: int = 4, session_id: Optional[str] = None ) -> List[Dict[str, Any]]: """Busca documentos similares usando cosine similarity""" conn = self.connect() if not conn: return [] try: with conn.cursor() as cur: if session_id: # Busca apenas documentos da sessão cur.execute( """ SELECT id, title, content, 1 - (embedding <=> %s::vector) as score FROM documents WHERE session_id = %s ORDER BY embedding <=> %s::vector LIMIT %s """, (query_embedding, session_id, query_embedding, k) ) else: # Busca em todos os documentos (backward compatibility) cur.execute( """ SELECT id, title, content, 1 - (embedding <=> %s::vector) as score FROM documents ORDER BY embedding <=> %s::vector LIMIT %s """, (query_embedding, query_embedding, k) ) rows = cur.fetchall() return [ { "id": r[0], "title": r[1], "content": r[2], "score": float(r[3]) } for r in rows ] except Exception as e: self.last_error = f"Falha na busca: {str(e)}" return [] def get_chat_id(self, session_id: str) -> Optional[int]: """Obtém ou cria ID do chat""" conn = self.connect() if not conn: return None try: with conn.cursor() as cur: cur.execute("SELECT id FROM chats WHERE session_id=%s", (session_id,)) row = cur.fetchone() if row: return row[0] cur.execute( "INSERT INTO chats (session_id) VALUES (%s) RETURNING id", (session_id,) ) row = cur.fetchone() return row[0] if row else None except Exception: return None def save_message(self, chat_id: int, role: str, content: str) -> bool: """Salva mensagem no histórico""" conn = self.connect() if not conn: return False try: with conn.cursor() as cur: cur.execute( "INSERT INTO messages (chat_id, role, content) VALUES (%s, %s, %s)", (chat_id, role, content) ) return True except Exception: return False def save_query_metric( self, session_id: str, query: str, num_results: int, retrieval_time_ms: float, generation_time_ms: float, total_time_ms: float, top_k: int ) -> bool: """Salva métrica de query para monitoramento""" conn = self.connect() if not conn: return False try: with conn.cursor() as cur: cur.execute( """ INSERT INTO query_metrics (session_id, query, num_results, retrieval_time_ms, generation_time_ms, total_time_ms, top_k) VALUES (%s, %s, %s, %s, %s, %s, %s) """, (session_id, query, num_results, retrieval_time_ms, generation_time_ms, total_time_ms, top_k) ) return True except Exception: return False def get_database_stats(self) -> Dict[str, Any]: """Obtém estatísticas do banco de dados""" conn = self.connect() if not conn: return {} try: stats = {} with conn.cursor() as cur: # Total de documentos cur.execute("SELECT COUNT(*) FROM documents") stats['total_documents'] = cur.fetchone()[0] # Total de chunks cur.execute("SELECT COUNT(*) FROM documents") stats['total_chunks'] = cur.fetchone()[0] # Total de chats cur.execute("SELECT COUNT(*) FROM chats") stats['total_chats'] = cur.fetchone()[0] # Total de mensagens cur.execute("SELECT COUNT(*) FROM messages") stats['total_messages'] = cur.fetchone()[0] # Total de queries (métricas) cur.execute("SELECT COUNT(*) FROM query_metrics") stats['total_queries'] = cur.fetchone()[0] return stats except Exception: return {} def get_all_documents(self, limit: int = 100, session_id: Optional[str] = None) -> List[Dict[str, Any]]: """Obtém lista de documentos""" conn = self.connect() if not conn: return [] try: with conn.cursor() as cur: if session_id: # Lista apenas documentos da sessão cur.execute( """ SELECT id, title, content, created_at FROM documents WHERE session_id = %s ORDER BY created_at DESC LIMIT %s """, (session_id, limit) ) else: # Lista todos os documentos (backward compatibility) cur.execute( """ SELECT id, title, content, created_at FROM documents ORDER BY created_at DESC LIMIT %s """, (limit,) ) rows = cur.fetchall() return [ { "id": r[0], "title": r[1], "content": r[2], "created_at": r[3] } for r in rows ] except Exception: return []