Spaces:
Sleeping
Sleeping
Guilherme Favaron
Major update: Add hybrid search, reranking, multiple LLMs, and UI improvements
1b447de
| """ | |
| Gerenciamento de conexão e operações com PostgreSQL + pgvector | |
| """ | |
| import time | |
| from typing import Optional, List, Dict, Any, Tuple | |
| import psycopg | |
| from pgvector.psycopg import register_vector | |
| import numpy as np | |
| from .config import DATABASE_URL, EMBEDDING_DIM, IVFFLAT_LISTS | |
class DatabaseManager:
    """Manage the PostgreSQL + pgvector connection and the app's data operations.

    Every public method obtains a connection through ``connect()``, which caches
    the connection and revalidates it on each call. Errors are not raised to the
    caller: each method returns a neutral value (``None``, ``False``, ``[]``,
    ``{}`` or a failure count) and records a human-readable reason in
    ``last_error``.
    """

    def __init__(self):
        # Cached connection; None until connect() succeeds or after it is found dead.
        self.conn: Optional[psycopg.Connection] = None
        # Reason for the most recent failure ("" after a successful connect()).
        self.last_error: str = ""

    @staticmethod
    def _close_quietly(conn: psycopg.Connection) -> None:
        """Close ``conn``, ignoring errors (it may already be broken)."""
        try:
            conn.close()
        except Exception:
            pass

    @staticmethod
    def _register_vector_if_available(conn: psycopg.Connection) -> bool:
        """Register the pgvector adapters on ``conn`` if the ``vector`` type exists.

        ``register_vector`` raises ``psycopg.ProgrammingError`` when the
        ``vector`` type is not in the database yet. Treating that as fatal would
        make ``connect()`` fail on a fresh database, so ``init_schema()`` could
        never run ``CREATE EXTENSION``. ``init_schema()`` calls this again once
        the extension exists. Any other error propagates to the caller.

        Returns:
            True if the adapters were registered, False if the type is missing.
        """
        try:
            register_vector(conn)
        except psycopg.ProgrammingError:
            return False
        return True

    def connect(self) -> Optional[psycopg.Connection]:
        """Return a live autocommit connection, reconnecting when needed.

        A cached connection is reused if it still answers ``SELECT 1``.
        Otherwise it is closed, and a new connection is opened with up to 10
        attempts. Between attempts there is an exponential backoff of 0.5 s,
        doubling, capped at 5 s.

        Returns:
            The connection, or None if ``DATABASE_URL`` is empty or every
            attempt failed (``last_error`` then holds the reason).
        """
        if not DATABASE_URL:
            self.last_error = "DATABASE_URL ausente"
            return None

        # Reuse the cached connection while it is still healthy.
        if self.conn is not None:
            try:
                with self.conn.cursor() as cur:
                    cur.execute("SELECT 1")
                return self.conn
            except Exception:
                self._close_quietly(self.conn)
                self.conn = None

        max_attempts = 10
        delay = 0.5
        for attempt in range(1, max_attempts + 1):
            conn = None
            try:
                conn = psycopg.connect(DATABASE_URL, autocommit=True)
                self._register_vector_if_available(conn)
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")
                    cur.fetchone()
            except Exception as e:
                self.last_error = f"Falha na conexão: {str(e)}"
                # A connection that opened but failed a later step must be
                # closed rather than leaked.
                if conn is not None:
                    self._close_quietly(conn)
                # Back off between attempts; sleeping after the final attempt
                # would only delay reporting the failure.
                if attempt < max_attempts:
                    time.sleep(delay)
                    delay = min(delay * 2, 5)
                continue
            self.conn = conn
            self.last_error = ""
            return conn
        return None

    def init_schema(self) -> bool:
        """Create the pgvector extension and the application tables if missing.

        Every statement is guarded (``IF NOT EXISTS`` or an existence check),
        so calling this repeatedly is safe. After the extension exists, the
        pgvector adapters are registered on the connection. ``connect()`` could
        not register them if the extension did not exist yet.

        Returns:
            True on success; False otherwise, with ``last_error`` set.
        """
        conn = self.connect()
        if not conn:
            return False
        try:
            with conn.cursor() as cur:
                # Enable the pgvector extension.
                cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
                # Documents table: one row per stored text with its embedding.
                cur.execute(
                    f"""
                    CREATE TABLE IF NOT EXISTS documents (
                        id BIGSERIAL PRIMARY KEY,
                        session_id TEXT,
                        title TEXT,
                        content TEXT,
                        embedding vector({EMBEDDING_DIM}),
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                    """
                )
                # Migration: add session_id to tables created before it existed.
                # The check is restricted to current_schema(), where the
                # unqualified name "documents" above is created/resolved, so a
                # same-named table in another schema cannot mask it.
                cur.execute(
                    """
                    DO $$
                    BEGIN
                        IF NOT EXISTS (
                            SELECT 1 FROM information_schema.columns
                            WHERE table_schema = current_schema()
                              AND table_name = 'documents'
                              AND column_name = 'session_id'
                        ) THEN
                            ALTER TABLE documents ADD COLUMN session_id TEXT;
                        END IF;
                    END $$;
                    """
                )
                # Chats table: one row per session.
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS chats (
                        id BIGSERIAL PRIMARY KEY,
                        session_id TEXT UNIQUE,
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                    """
                )
                # Messages table: deleted together with their chat.
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS messages (
                        id BIGSERIAL PRIMARY KEY,
                        chat_id BIGINT REFERENCES chats(id) ON DELETE CASCADE,
                        role TEXT,
                        content TEXT,
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                    """
                )
                # Query metrics table (for monitoring).
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS query_metrics (
                        id BIGSERIAL PRIMARY KEY,
                        session_id TEXT,
                        query TEXT,
                        num_results INT,
                        retrieval_time_ms FLOAT,
                        generation_time_ms FLOAT,
                        total_time_ms FLOAT,
                        top_k INT,
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                    """
                )
            # The vector type now exists, so the adapters can be registered.
            self._register_vector_if_available(conn)
            return True
        except Exception as e:
            self.last_error = f"Falha ao criar schema: {str(e)}"
            return False

    def create_index(self, lists: int = IVFFLAT_LISTS) -> Tuple[bool, str]:
        """Drop and recreate the IVFFlat cosine index on ``documents.embedding``.

        The table is ANALYZEd afterwards so the planner sees fresh statistics.

        Args:
            lists: Number of IVFFlat lists (coerced with ``int()``).

        Returns:
            ``(True, message)`` on success, ``(False, reason)`` otherwise. On
            failure the reason is also stored in ``last_error``.
        """
        conn = self.connect()
        if not conn:
            return False, "Banco não conectado"
        try:
            n_lists = int(lists)  # int() also guarantees a safe value for the f-string SQL
            with conn.cursor() as cur:
                cur.execute("DROP INDEX IF EXISTS idx_documents_embedding_cosine")
                cur.execute(
                    f"""
                    CREATE INDEX idx_documents_embedding_cosine
                    ON documents USING ivfflat (embedding vector_cosine_ops)
                    WITH (lists = {n_lists})
                    """
                )
                cur.execute("ANALYZE documents")
            return True, f"Índice criado com lists={n_lists}"
        except Exception as e:
            message = f"Falha ao criar índice: {str(e)}"
            self.last_error = message
            return False, message

    def insert_document(
        self,
        title: str,
        content: str,
        embedding: List[float],
        session_id: Optional[str] = None
    ) -> Optional[int]:
        """Insert one document and return its new id.

        Args:
            title: Document title.
            content: Document text.
            embedding: Embedding values, cast to ``vector`` by the query.
            session_id: Owning session, or None.

        Returns:
            The generated id, or None on failure (``last_error`` is set).
        """
        conn = self.connect()
        if not conn:
            return None
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO documents (session_id, title, content, embedding) "
                    "VALUES (%s, %s, %s, %s::vector) RETURNING id",
                    (session_id, title, content, embedding),
                )
                row = cur.fetchone()
                return row[0] if row else None
        except Exception as e:
            self.last_error = f"Falha ao inserir documento: {str(e)}"
            return None

    def insert_documents_batch(
        self,
        documents: List[Tuple[str, str, List[float]]],
        session_id: Optional[str] = None,
        batch_size: int = 100
    ) -> Tuple[int, int]:
        """Insert many documents in batches.

        Each batch runs inside its own transaction, so it is all-or-nothing.
        With autocommit alone, an error part-way through ``executemany`` could
        leave earlier rows of the batch committed while the whole batch was
        counted as failed.

        Args:
            documents: ``(title, content, embedding)`` tuples.
            session_id: Session id stored on every inserted row.
            batch_size: Rows per batch; values below 1 are treated as 1.

        Returns:
            ``(inserted, failed)`` row counts. ``last_error`` holds the most
            recent batch error, if any.
        """
        conn = self.connect()
        if not conn:
            return 0, len(documents)
        batch_size = max(1, batch_size)
        insert_sql = (
            "INSERT INTO documents (session_id, title, content, embedding) "
            "VALUES (%s, %s, %s, %s::vector)"
        )
        inserted = 0
        failed = 0
        try:
            with conn.cursor() as cur:
                for start in range(0, len(documents), batch_size):
                    batch = documents[start:start + batch_size]
                    values = [
                        (session_id, title, content, embedding)
                        for title, content, embedding in batch
                    ]
                    try:
                        with conn.transaction():
                            cur.executemany(insert_sql, values)
                        inserted += len(batch)
                    except Exception as e:
                        # The transaction was rolled back: none of this batch was stored.
                        failed += len(batch)
                        self.last_error = f"Falha no batch insert: {str(e)}"
            return inserted, failed
        except Exception as e:
            self.last_error = f"Falha no batch insert: {str(e)}"
            return inserted, len(documents) - inserted

    def search_similar(
        self,
        query_embedding: List[float],
        k: int = 4,
        session_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Return the ``k`` documents closest to ``query_embedding`` by cosine distance.

        Rows without an embedding are skipped. ``score`` is
        ``1 - cosine_distance``; higher means more similar.

        NOTE: with an IVFFlat index, pgvector applies WHERE filters after the
        index scan, so a session-filtered search may return fewer than ``k``
        rows.

        Args:
            query_embedding: Query vector, cast to ``vector`` by the query.
            k: Maximum number of results.
            session_id: When truthy, restrict the search to that session's
                documents; otherwise search all documents.

        Returns:
            Dicts with ``id``, ``title``, ``content``, ``score``, ordered from
            most to least similar; ``[]`` on failure (``last_error`` is set).
        """
        conn = self.connect()
        if not conn:
            return []
        # Named placeholders let the query vector be passed once though it is
        # referenced twice.
        params: Dict[str, Any] = {"emb": query_embedding, "k": k}
        try:
            with conn.cursor() as cur:
                if session_id:
                    params["session_id"] = session_id
                    cur.execute(
                        """
                        SELECT id, title, content,
                               1 - (embedding <=> %(emb)s::vector) AS score
                        FROM documents
                        WHERE session_id = %(session_id)s
                          AND embedding IS NOT NULL
                        ORDER BY embedding <=> %(emb)s::vector
                        LIMIT %(k)s
                        """,
                        params,
                    )
                else:
                    # Search across all documents (backward compatibility).
                    cur.execute(
                        """
                        SELECT id, title, content,
                               1 - (embedding <=> %(emb)s::vector) AS score
                        FROM documents
                        WHERE embedding IS NOT NULL
                        ORDER BY embedding <=> %(emb)s::vector
                        LIMIT %(k)s
                        """,
                        params,
                    )
                rows = cur.fetchall()
            return [
                {
                    "id": doc_id,
                    "title": title,
                    "content": content,
                    "score": float(score),
                }
                for doc_id, title, content, score in rows
            ]
        except Exception as e:
            self.last_error = f"Falha na busca: {str(e)}"
            return []

    def get_chat_id(self, session_id: str) -> Optional[int]:
        """Return the chat id for ``session_id``, creating the chat if needed.

        ``ON CONFLICT DO NOTHING`` covers the race where another request creates
        the same chat between the SELECT and the INSERT. In that case the
        INSERT returns no row and the id is read back.

        Returns:
            The chat id, or None on failure (``last_error`` is set).
        """
        conn = self.connect()
        if not conn:
            return None
        select_sql = "SELECT id FROM chats WHERE session_id=%s"
        try:
            with conn.cursor() as cur:
                cur.execute(select_sql, (session_id,))
                row = cur.fetchone()
                if row:
                    return row[0]
                cur.execute(
                    "INSERT INTO chats (session_id) VALUES (%s) "
                    "ON CONFLICT DO NOTHING RETURNING id",
                    (session_id,),
                )
                row = cur.fetchone()
                if row is None:
                    # Lost the race: the chat now exists, so fetch its id.
                    cur.execute(select_sql, (session_id,))
                    row = cur.fetchone()
                return row[0] if row else None
        except Exception as e:
            self.last_error = f"Falha ao obter chat: {str(e)}"
            return None

    def save_message(self, chat_id: int, role: str, content: str) -> bool:
        """Append a message to a chat's history.

        Returns:
            True on success; False otherwise, with ``last_error`` set.
        """
        conn = self.connect()
        if not conn:
            return False
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO messages (chat_id, role, content) VALUES (%s, %s, %s)",
                    (chat_id, role, content),
                )
            return True
        except Exception as e:
            self.last_error = f"Falha ao salvar mensagem: {str(e)}"
            return False

    def save_query_metric(
        self,
        session_id: str,
        query: str,
        num_results: int,
        retrieval_time_ms: float,
        generation_time_ms: float,
        total_time_ms: float,
        top_k: int
    ) -> bool:
        """Record timing and result-count metrics for one query.

        Returns:
            True on success; False otherwise, with ``last_error`` set.
        """
        conn = self.connect()
        if not conn:
            return False
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO query_metrics
                        (session_id, query, num_results, retrieval_time_ms,
                         generation_time_ms, total_time_ms, top_k)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    """,
                    (session_id, query, num_results, retrieval_time_ms,
                     generation_time_ms, total_time_ms, top_k),
                )
            return True
        except Exception as e:
            self.last_error = f"Falha ao salvar métrica: {str(e)}"
            return False

    def get_database_stats(self) -> Dict[str, Any]:
        """Return row counts for the application tables.

        All counts are read in a single round-trip.

        Returns:
            Dict with ``total_documents``, ``total_chunks``, ``total_chats``,
            ``total_messages`` and ``total_queries``; ``{}`` on failure
            (``last_error`` is set).
        """
        conn = self.connect()
        if not conn:
            return {}
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT
                        (SELECT COUNT(*) FROM documents),
                        (SELECT COUNT(*) FROM chats),
                        (SELECT COUNT(*) FROM messages),
                        (SELECT COUNT(*) FROM query_metrics)
                    """
                )
                n_documents, n_chats, n_messages, n_queries = cur.fetchone()
            return {
                "total_documents": n_documents,
                # Kept for callers; reports the same row count as total_documents.
                "total_chunks": n_documents,
                "total_chats": n_chats,
                "total_messages": n_messages,
                "total_queries": n_queries,
            }
        except Exception as e:
            self.last_error = f"Falha ao obter estatísticas: {str(e)}"
            return {}

    def get_all_documents(self, limit: int = 100, session_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """List the most recently created documents.

        Args:
            limit: Maximum number of documents returned.
            session_id: When truthy, list only that session's documents;
                otherwise list all documents.

        Returns:
            Dicts with ``id``, ``title``, ``content``, ``created_at``, newest
            first; ``[]`` on failure (``last_error`` is set).
        """
        conn = self.connect()
        if not conn:
            return []
        try:
            with conn.cursor() as cur:
                if session_id:
                    cur.execute(
                        """
                        SELECT id, title, content, created_at
                        FROM documents
                        WHERE session_id = %s
                        ORDER BY created_at DESC
                        LIMIT %s
                        """,
                        (session_id, limit),
                    )
                else:
                    # List every document (backward compatibility).
                    cur.execute(
                        """
                        SELECT id, title, content, created_at
                        FROM documents
                        ORDER BY created_at DESC
                        LIMIT %s
                        """,
                        (limit,),
                    )
                rows = cur.fetchall()
            return [
                {
                    "id": doc_id,
                    "title": title,
                    "content": content,
                    "created_at": created_at,
                }
                for doc_id, title, content, created_at in rows
            ]
        except Exception as e:
            self.last_error = f"Falha ao listar documentos: {str(e)}"
            return []