# rag_template/src/database.py
# Author: Guilherme Favaron
# Commit message: Major update: Add hybrid search, reranking, multiple LLMs, and UI improvements
# Commit: 1b447de
"""
Gerenciamento de conexão e operações com PostgreSQL + pgvector
"""
import time
from typing import Optional, List, Dict, Any, Tuple
import psycopg
from pgvector.psycopg import register_vector
import numpy as np
from .config import DATABASE_URL, EMBEDDING_DIM, IVFFLAT_LISTS
class DatabaseManager:
    """Manage a PostgreSQL + pgvector connection and the queries built on it.

    One autocommit connection is opened lazily by ``connect`` and reused by
    every other method. Methods do not raise on database errors: they return
    a neutral value (``None``, ``False``, ``[]``, ``{}`` or a failure tuple)
    and store a description of the problem in ``last_error``.
    """

    # Retry policy used by connect() when opening a new connection.
    _MAX_CONNECT_ATTEMPTS = 10
    _INITIAL_RETRY_DELAY = 0.5  # seconds
    _MAX_RETRY_DELAY = 5  # seconds

    def __init__(self):
        # Open connection, or None until connect() succeeds.
        self.conn: Optional[psycopg.Connection] = None
        # Description of the most recent failure; reset to "" when a new
        # connection is established.
        self.last_error: str = ""

    def _discard_connection(self) -> None:
        """Close and forget the current connection, ignoring close errors."""
        if self.conn is None:
            return
        try:
            self.conn.close()
        except Exception:
            # The connection is already unusable; nothing more to release.
            pass
        self.conn = None

    def connect(self) -> Optional[psycopg.Connection]:
        """Return a live connection, opening one (with retries) if needed.

        An existing connection is probed with ``SELECT 1`` and reused when it
        answers. Otherwise up to ``_MAX_CONNECT_ATTEMPTS`` new connections are
        tried, sleeping between attempts with a doubling delay capped at
        ``_MAX_RETRY_DELAY`` seconds.

        Returns:
            The open connection, or ``None`` when ``DATABASE_URL`` is empty or
            every attempt failed (``last_error`` holds the reason).
        """
        if not DATABASE_URL:
            self.last_error = "DATABASE_URL ausente"
            return None

        # Reuse the current connection if it still answers a trivial query.
        if self.conn is not None:
            try:
                with self.conn.cursor() as cur:
                    cur.execute("SELECT 1")
                return self.conn
            except Exception:
                self._discard_connection()

        delay = self._INITIAL_RETRY_DELAY
        for attempt in range(1, self._MAX_CONNECT_ATTEMPTS + 1):
            try:
                self.conn = psycopg.connect(DATABASE_URL, autocommit=True)
                with self.conn.cursor() as cur:
                    # register_vector() fails when the "vector" type does not
                    # exist yet, so make sure the extension is installed
                    # before registering the adapters on this connection.
                    cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
                register_vector(self.conn)
                with self.conn.cursor() as cur:
                    cur.execute("SELECT 1")
                    cur.fetchone()
                self.last_error = ""
                return self.conn
            except Exception as e:
                self.last_error = f"Falha na conexão: {str(e)}"
                # Do not leak a connection that opened but failed its setup.
                self._discard_connection()
                # No point in sleeping after the final attempt.
                if attempt < self._MAX_CONNECT_ATTEMPTS:
                    time.sleep(delay)
                    delay = min(delay * 2, self._MAX_RETRY_DELAY)
        return None

    def init_schema(self) -> bool:
        """Create the extension and tables used by the application.

        Every statement is idempotent (``IF NOT EXISTS``), so the method can
        be called on each start-up.

        Returns:
            ``True`` on success, ``False`` when there is no connection or a
            statement failed (``last_error`` holds the reason).
        """
        conn = self.connect()
        if not conn:
            return False
        try:
            with conn.cursor() as cur:
                # Enable the pgvector extension.
                cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
                # Documents table.
                cur.execute(
                    f"""
                    CREATE TABLE IF NOT EXISTS documents (
                        id BIGSERIAL PRIMARY KEY,
                        session_id TEXT,
                        title TEXT,
                        content TEXT,
                        embedding vector({EMBEDDING_DIM}),
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                    """
                )
                # Migration: add session_id to tables created before the
                # column existed. ADD COLUMN IF NOT EXISTS targets exactly
                # the table resolved by the search path, unlike a lookup in
                # information_schema by bare table name.
                cur.execute(
                    "ALTER TABLE documents ADD COLUMN IF NOT EXISTS session_id TEXT"
                )
                # Chats table.
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS chats (
                        id BIGSERIAL PRIMARY KEY,
                        session_id TEXT UNIQUE,
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                    """
                )
                # Messages table.
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS messages (
                        id BIGSERIAL PRIMARY KEY,
                        chat_id BIGINT REFERENCES chats(id) ON DELETE CASCADE,
                        role TEXT,
                        content TEXT,
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                    """
                )
                # Query metrics table (monitoring).
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS query_metrics (
                        id BIGSERIAL PRIMARY KEY,
                        session_id TEXT,
                        query TEXT,
                        num_results INT,
                        retrieval_time_ms FLOAT,
                        generation_time_ms FLOAT,
                        total_time_ms FLOAT,
                        top_k INT,
                        created_at TIMESTAMP DEFAULT NOW()
                    )
                    """
                )
            return True
        except Exception as e:
            self.last_error = f"Falha ao criar schema: {str(e)}"
            return False

    def create_index(self, lists: int = IVFFLAT_LISTS) -> Tuple[bool, str]:
        """Drop and rebuild the IVFFlat cosine index on ``documents.embedding``.

        Args:
            lists: Value for the index ``lists`` storage parameter; coerced
                with ``int()`` before being placed in the statement.

        Returns:
            ``(success, message)``. On failure the message is also stored in
            ``last_error``.
        """
        conn = self.connect()
        if not conn:
            return False, "Banco não conectado"
        try:
            # Coerce once so the statement and the message agree; int() also
            # guarantees nothing but a number is interpolated into the SQL.
            n_lists = int(lists)
            with conn.cursor() as cur:
                cur.execute("DROP INDEX IF EXISTS idx_documents_embedding_cosine")
                cur.execute(
                    f"""
                    CREATE INDEX idx_documents_embedding_cosine
                    ON documents USING ivfflat (embedding vector_cosine_ops)
                    WITH (lists = {n_lists})
                    """
                )
                cur.execute("ANALYZE documents")
            return True, f"Índice criado com lists={n_lists}"
        except Exception as e:
            self.last_error = f"Falha ao criar índice: {str(e)}"
            return False, self.last_error

    def insert_document(
        self,
        title: str,
        content: str,
        embedding: List[float],
        session_id: Optional[str] = None
    ) -> Optional[int]:
        """Insert one document row.

        Args:
            title: Document title.
            content: Document text.
            embedding: Embedding values, cast to ``vector`` by the statement.
            session_id: Optional session the document belongs to.

        Returns:
            The new row id, or ``None`` on failure (see ``last_error``).
        """
        conn = self.connect()
        if not conn:
            return None
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO documents (session_id, title, content, embedding) VALUES (%s, %s, %s, %s::vector) RETURNING id",
                    (session_id, title, content, embedding)
                )
                row = cur.fetchone()
            return row[0] if row else None
        except Exception as e:
            self.last_error = f"Falha ao inserir documento: {str(e)}"
            return None

    def insert_documents_batch(
        self,
        documents: List[Tuple[str, str, List[float]]],
        session_id: Optional[str] = None,
        batch_size: int = 100
    ) -> Tuple[int, int]:
        """Insert many documents, ``batch_size`` rows per ``executemany`` call.

        A batch that fails is counted as failed and the remaining batches are
        still attempted.

        Args:
            documents: List of ``(title, content, embedding)`` tuples.
            session_id: Optional session id stored on every row.
            batch_size: Rows per batch; must be at least 1.

        Returns:
            ``(inserted, failed)`` row counts. ``last_error`` describes the
            most recent failure, if any.
        """
        total = len(documents)
        if batch_size < 1:
            # A non-positive step would make range() raise (0) or silently
            # iterate over nothing (negative); report it as a failure.
            self.last_error = f"Falha no batch insert: batch_size inválido ({batch_size})"
            return 0, total
        if total == 0:
            return 0, 0

        conn = self.connect()
        if not conn:
            return 0, total

        inserted = 0
        failed = 0
        try:
            with conn.cursor() as cur:
                for start in range(0, total, batch_size):
                    batch = documents[start:start + batch_size]
                    try:
                        # Built inside the try so a malformed tuple only
                        # fails its own batch.
                        values = [
                            (session_id, title, content, embedding)
                            for title, content, embedding in batch
                        ]
                        cur.executemany(
                            "INSERT INTO documents (session_id, title, content, embedding) VALUES (%s, %s, %s, %s::vector)",
                            values
                        )
                        inserted += len(batch)
                    except Exception as e:
                        self.last_error = f"Falha no batch insert: {str(e)}"
                        failed += len(batch)
            return inserted, failed
        except Exception as e:
            self.last_error = f"Falha no batch insert: {str(e)}"
            return inserted, total - inserted

    def search_similar(
        self,
        query_embedding: List[float],
        k: int = 4,
        session_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Return the ``k`` documents closest to ``query_embedding``.

        Ordering uses the pgvector ``<=>`` operator; ``score`` is
        ``1 - (embedding <=> query)``. Rows without an embedding are skipped.

        Args:
            query_embedding: Embedding of the query, cast to ``vector``.
            k: Maximum number of rows to return.
            session_id: When truthy, only documents of that session are
                searched; otherwise all documents are searched.

        Returns:
            A list of dicts with keys ``id``, ``title``, ``content`` and
            ``score``; empty on failure (see ``last_error``).
        """
        conn = self.connect()
        if not conn:
            return []

        # A NULL embedding yields a NULL score, which cannot be converted to
        # float and would make the whole search fail; filter those rows out.
        conditions = ["embedding IS NOT NULL"]
        params: List[Any] = [query_embedding]
        if session_id:
            conditions.append("session_id = %s")
            params.append(session_id)
        params.extend([query_embedding, k])
        # Only fixed literals are joined here; all values go through params.
        where_clause = " AND ".join(conditions)

        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    SELECT id, title, content, 1 - (embedding <=> %s::vector) as score
                    FROM documents
                    WHERE {where_clause}
                    ORDER BY embedding <=> %s::vector
                    LIMIT %s
                    """,
                    params
                )
                rows = cur.fetchall()
            return [
                {
                    "id": r[0],
                    "title": r[1],
                    "content": r[2],
                    "score": float(r[3])
                }
                for r in rows
            ]
        except Exception as e:
            self.last_error = f"Falha na busca: {str(e)}"
            return []

    def get_chat_id(self, session_id: str) -> Optional[int]:
        """Return the chat id for ``session_id``, creating the chat if needed.

        Args:
            session_id: Session identifier stored in ``chats.session_id``.

        Returns:
            The chat id, or ``None`` on failure (see ``last_error``).
        """
        conn = self.connect()
        if not conn:
            return None
        lookup = "SELECT id FROM chats WHERE session_id=%s"
        try:
            with conn.cursor() as cur:
                cur.execute(lookup, (session_id,))
                row = cur.fetchone()
                if row:
                    return row[0]
                # Another caller may insert the same session between the
                # SELECT above and this INSERT; ON CONFLICT turns that into
                # "no row returned" instead of a unique-violation error.
                cur.execute(
                    "INSERT INTO chats (session_id) VALUES (%s) ON CONFLICT DO NOTHING RETURNING id",
                    (session_id,)
                )
                row = cur.fetchone()
                if row:
                    return row[0]
                # Lost the race: read the row the other caller created.
                cur.execute(lookup, (session_id,))
                row = cur.fetchone()
            return row[0] if row else None
        except Exception as e:
            self.last_error = f"Falha ao obter chat: {str(e)}"
            return None

    def save_message(self, chat_id: int, role: str, content: str) -> bool:
        """Append a message to a chat's history.

        Args:
            chat_id: Id of the chat row the message belongs to.
            role: Value stored in the ``role`` column.
            content: Message text.

        Returns:
            ``True`` on success, ``False`` on failure (see ``last_error``).
        """
        conn = self.connect()
        if not conn:
            return False
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO messages (chat_id, role, content) VALUES (%s, %s, %s)",
                    (chat_id, role, content)
                )
            return True
        except Exception as e:
            self.last_error = f"Falha ao salvar mensagem: {str(e)}"
            return False

    def save_query_metric(
        self,
        session_id: str,
        query: str,
        num_results: int,
        retrieval_time_ms: float,
        generation_time_ms: float,
        total_time_ms: float,
        top_k: int
    ) -> bool:
        """Store one row of query metrics for monitoring.

        Args:
            session_id: Session that issued the query.
            query: Query text.
            num_results: Number of results retrieved.
            retrieval_time_ms: Value for the ``retrieval_time_ms`` column.
            generation_time_ms: Value for the ``generation_time_ms`` column.
            total_time_ms: Value for the ``total_time_ms`` column.
            top_k: Value for the ``top_k`` column.

        Returns:
            ``True`` on success, ``False`` on failure (see ``last_error``).
        """
        conn = self.connect()
        if not conn:
            return False
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO query_metrics
                    (session_id, query, num_results, retrieval_time_ms,
                    generation_time_ms, total_time_ms, top_k)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    """,
                    (session_id, query, num_results, retrieval_time_ms,
                     generation_time_ms, total_time_ms, top_k)
                )
            return True
        except Exception as e:
            self.last_error = f"Falha ao salvar métrica: {str(e)}"
            return False

    def get_database_stats(self) -> Dict[str, Any]:
        """Return row counts for the application's tables.

        Returns:
            A dict with keys ``total_documents``, ``total_chunks``,
            ``total_chats``, ``total_messages`` and ``total_queries``; empty
            on failure (see ``last_error``).
        """
        conn = self.connect()
        if not conn:
            return {}
        # (stats key, table) pairs; the table names are fixed literals, so
        # interpolating them into the statement below is safe.
        counted_tables = (
            ("total_documents", "documents"),
            ("total_chats", "chats"),
            ("total_messages", "messages"),
            ("total_queries", "query_metrics"),
        )
        try:
            stats: Dict[str, Any] = {}
            with conn.cursor() as cur:
                for key, table in counted_tables:
                    cur.execute(f"SELECT COUNT(*) FROM {table}")
                    stats[key] = cur.fetchone()[0]
                    if key == "total_documents":
                        # NOTE(review): total_chunks has always been the same
                        # count as total_documents (both count rows of
                        # "documents"); the value is reused rather than
                        # queried twice. Confirm whether a distinct metric
                        # was intended.
                        stats["total_chunks"] = stats[key]
            return stats
        except Exception as e:
            self.last_error = f"Falha ao obter estatísticas: {str(e)}"
            return {}

    def get_all_documents(self, limit: int = 100, session_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """List documents, newest first.

        Args:
            limit: Maximum number of rows to return.
            session_id: When truthy, only documents of that session are
                listed; otherwise all documents are listed.

        Returns:
            A list of dicts with keys ``id``, ``title``, ``content`` and
            ``created_at``; empty on failure (see ``last_error``).
        """
        conn = self.connect()
        if not conn:
            return []

        params: List[Any] = []
        where_clause = ""
        if session_id:
            # Fixed literal; the session value itself goes through params.
            where_clause = "WHERE session_id = %s"
            params.append(session_id)
        params.append(limit)

        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    SELECT id, title, content, created_at
                    FROM documents
                    {where_clause}
                    ORDER BY created_at DESC
                    LIMIT %s
                    """,
                    params
                )
                rows = cur.fetchall()
            return [
                {
                    "id": r[0],
                    "title": r[1],
                    "content": r[2],
                    "created_at": r[3]
                }
                for r in rows
            ]
        except Exception as e:
            self.last_error = f"Falha ao listar documentos: {str(e)}"
            return []