rag_template / src /metadata.py
Guilherme Favaron
Sync: Complete project update (Phase 6) - API, Metadata, Eval, Docs
a686b1b
"""
Sistema de gerenciamento de metadados para documentos.
Permite filtrar documentos por tipo, tags, autor, data, etc.
"""
from typing import Dict, List, Optional, Any
from datetime import datetime
import json
from dataclasses import dataclass, asdict
@dataclass
class DocumentMetadata:
    """Document metadata schema.

    Every field is optional and defaults to ``None``. Fields that are
    ``None`` are left out of the serialized form produced by ``to_dict``.
    """

    document_type: Optional[str] = None  # PDF, TXT, MD, etc.
    upload_date: Optional[datetime] = None
    tags: Optional[List[str]] = None
    author: Optional[str] = None
    language: Optional[str] = None
    department: Optional[str] = None  # for enterprise use
    security_level: Optional[str] = None  # public, internal, confidential
    custom: Optional[Dict[str, Any]] = None  # custom fields

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary, serializing ``upload_date`` as ISO 8601.

        Returns:
            A dict containing only the fields whose value is not ``None``.
        """
        data = asdict(self)
        # Only real datetime objects have isoformat(); any other value that
        # ended up in the field is passed through unchanged instead of
        # raising AttributeError.
        if isinstance(self.upload_date, datetime):
            data['upload_date'] = self.upload_date.isoformat()
        # Drop None fields
        return {k: v for k, v in data.items() if v is not None}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'DocumentMetadata':
        """Build an instance from a dictionary.

        A string ``upload_date`` is parsed with ``datetime.fromisoformat``.
        The input dictionary is not modified.

        Args:
            data: Mapping of field names to values.

        Returns:
            The new DocumentMetadata instance.

        Raises:
            TypeError: If ``data`` contains a key that is not a field.
            ValueError: If ``upload_date`` is a string that is not a valid
                ISO 8601 date.
        """
        # Work on a shallow copy so the parsed datetime is not written back
        # into the caller's dictionary.
        values = dict(data)
        upload_date = values.get('upload_date')
        if isinstance(upload_date, str):
            values['upload_date'] = datetime.fromisoformat(upload_date)
        return cls(**values)

    def to_json(self) -> str:
        """Serialize to a JSON string (see ``to_dict``)."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> 'DocumentMetadata':
        """Build an instance from a JSON string (see ``from_dict``)."""
        return cls.from_dict(json.loads(json_str))
class MetadataManager:
    """Manager for document metadata stored in the ``documents`` table.

    All database access goes through ``self.db.get_connection()``, which is
    used as a context manager yielding a DB-API style connection.
    """

    VALID_DOCUMENT_TYPES = ['PDF', 'TXT', 'MD', 'HTML', 'DOCX', 'CSV', 'JSON']
    VALID_SECURITY_LEVELS = ['public', 'internal', 'confidential', 'restricted']

    # Metadata keys reported by get_available_filters (besides tags).
    _DISTINCT_FIELDS = ('document_type', 'author', 'department', 'security_level')

    def __init__(self, db_manager):
        """
        Initialize the manager.

        Args:
            db_manager: DatabaseManager instance
        """
        self.db = db_manager

    def validate_metadata(self, metadata: 'DocumentMetadata') -> bool:
        """
        Validate a metadata object against the schema.

        ``document_type`` is compared case-insensitively against
        VALID_DOCUMENT_TYPES; ``security_level`` must match one of
        VALID_SECURITY_LEVELS exactly; ``tags`` must be a list of strings.
        Unset (falsy) fields are not checked.

        Args:
            metadata: Metadata to validate

        Returns:
            True if valid

        Raises:
            ValueError: If the metadata is invalid
        """
        # Validate document_type
        doc_type = metadata.document_type
        if doc_type and doc_type.upper() not in self.VALID_DOCUMENT_TYPES:
            raise ValueError(
                f"document_type invalido: {metadata.document_type}. "
                f"Validos: {', '.join(self.VALID_DOCUMENT_TYPES)}"
            )

        # Validate security_level
        level = metadata.security_level
        if level and level not in self.VALID_SECURITY_LEVELS:
            raise ValueError(
                f"security_level invalido: {metadata.security_level}. "
                f"Validos: {', '.join(self.VALID_SECURITY_LEVELS)}"
            )

        # Validate tags
        if metadata.tags:
            if not isinstance(metadata.tags, list):
                raise ValueError("tags deve ser uma lista")
            if not all(isinstance(tag, str) for tag in metadata.tags):
                raise ValueError("Todas as tags devem ser strings")

        return True

    def update_document_metadata(
        self,
        document_id: int,
        metadata: 'DocumentMetadata'
    ) -> bool:
        """
        Replace the metadata of a document.

        Args:
            document_id: Document ID
            metadata: New metadata

        Returns:
            True if a row was updated, False if no document has that ID

        Raises:
            ValueError: If the metadata fails validation
        """
        self.validate_metadata(metadata)
        metadata_json = metadata.to_json()

        query = """
            UPDATE documents
            SET metadata = %s::jsonb
            WHERE id = %s
        """
        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (metadata_json, document_id))
                # Read rowcount while the cursor is still open.
                updated = cur.rowcount > 0
            conn.commit()
        return updated

    def get_document_metadata(self, document_id: int) -> Optional['DocumentMetadata']:
        """
        Fetch the metadata of a document.

        Args:
            document_id: Document ID

        Returns:
            DocumentMetadata, or None when the document does not exist or
            has no metadata
        """
        query = """
            SELECT metadata
            FROM documents
            WHERE id = %s
        """
        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (document_id,))
                result = cur.fetchone()

        if not result or not result[0]:
            return None

        raw = result[0]
        # Accept the column as JSON text too, in case the driver does not
        # decode it into a dict.
        if isinstance(raw, str):
            return DocumentMetadata.from_json(raw)
        # Pass a copy so building the object cannot alter the fetched row.
        return DocumentMetadata.from_dict(dict(raw))

    @staticmethod
    def _build_search_query(
        query_embedding: List[float],
        filters: Optional[Dict[str, Any]],
        top_k: int,
        session_id: Optional[str]
    ) -> tuple:
        """Build the filtered vector-search SQL and its parameter list.

        Returns:
            ``(sql, params)`` where ``params`` follows the order in which the
            placeholders appear in ``sql``.
        """
        where_clauses: List[str] = []
        where_params: List[Any] = []

        def add(clause: str, value: Any) -> None:
            where_clauses.append(clause)
            where_params.append(value)

        if session_id:
            add("session_id = %s", session_id)

        if filters:
            # Filter by document_type
            if 'document_type' in filters:
                add("metadata->>'document_type' = %s", filters['document_type'])

            # Filter by tags (any of the given tags present in the array)
            if 'tags' in filters:
                tags = filters['tags'] if isinstance(filters['tags'], list) else [filters['tags']]
                add("metadata->'tags' ?| %s", tags)

            # Exact-match filters on plain text fields
            for field in ('author', 'security_level', 'department'):
                if field in filters:
                    add(f"metadata->>'{field}' = %s", filters[field])

            # Date range filters (both bounds inclusive)
            if 'upload_date_from' in filters:
                add("(metadata->>'upload_date')::timestamp >= %s", filters['upload_date_from'])
            if 'upload_date_to' in filters:
                add("(metadata->>'upload_date')::timestamp <= %s", filters['upload_date_to'])

        where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

        query = f"""
            SELECT
                id,
                title,
                content,
                source,
                metadata,
                1 - (embedding <=> %s::vector) AS similarity
            FROM documents
            WHERE {where_sql}
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """
        # Placeholder order in the SQL text: similarity embedding, WHERE
        # values, ORDER BY embedding, LIMIT. The embedding is therefore
        # supplied twice.
        params = [query_embedding, *where_params, query_embedding, top_k]
        return query, params

    def search_with_filters(
        self,
        query_embedding: List[float],
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 5,
        session_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Vector search restricted by metadata filters.

        Supported filter keys: ``document_type``, ``tags`` (a string or a
        list; matches documents having any of them), ``author``,
        ``security_level``, ``department``, ``upload_date_from`` and
        ``upload_date_to``. Other keys are ignored.

        Args:
            query_embedding: Query embedding
            filters: Filters to apply (e.g. {"document_type": "PDF", "tags": ["tech"]})
            top_k: Maximum number of results
            session_id: Session ID; when given, only that session is searched

        Returns:
            List of dicts with keys id, title, content, source, metadata and
            similarity, in the order returned by the query
        """
        query, params = self._build_search_query(
            query_embedding, filters, top_k, session_id
        )

        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                results = cur.fetchall()

        return [
            {
                'id': row[0],
                'title': row[1],
                'content': row[2],
                'source': row[3],
                'metadata': row[4],
                'similarity': float(row[5])
            }
            for row in results
        ]

    @staticmethod
    def _scoped_where(condition: str, session_id: Optional[str]) -> tuple:
        """Return ``(where_sql, params)`` combining a condition with the session scope."""
        if session_id:
            return f"WHERE session_id = %s AND {condition}", [session_id]
        return f"WHERE {condition}", []

    def get_available_filters(self, session_id: Optional[str] = None) -> Dict[str, List[str]]:
        """
        Return the distinct values available for each filter.

        Args:
            session_id: Session ID (optional); restricts the scan to it

        Returns:
            Dict with keys document_types, authors, departments,
            security_levels and tags, each mapping to a sorted list of the
            distinct non-empty values found
        """
        filters = {
            'document_types': set(),
            'authors': set(),
            'departments': set(),
            'security_levels': set(),
            'tags': set()
        }

        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                # Tags live in a JSONB array. Only expand values that really
                # are arrays: jsonb_array_elements_text raises on anything else.
                where_sql, params = self._scoped_where(
                    "jsonb_typeof(metadata->'tags') = 'array'", session_id
                )
                tags_query = f"""
                    SELECT DISTINCT jsonb_array_elements_text(metadata->'tags') AS tag
                    FROM documents
                    {where_sql}
                """
                cur.execute(tags_query, params)
                filters['tags'].update(row[0] for row in cur.fetchall() if row[0])

                # Scalar fields. `field` comes from a fixed class-level tuple,
                # so interpolating it into the SQL text is safe.
                for field in self._DISTINCT_FIELDS:
                    where_sql, params = self._scoped_where(
                        f"metadata->>'{field}' IS NOT NULL", session_id
                    )
                    field_query = f"""
                        SELECT DISTINCT metadata->>'{field}' AS value
                        FROM documents
                        {where_sql}
                    """
                    cur.execute(field_query, params)
                    filters[f"{field}s"].update(
                        row[0] for row in cur.fetchall() if row[0]
                    )

        # Convert the sets to sorted lists
        return {k: sorted(v) for k, v in filters.items()}

    def get_documents_count_by_metadata(
        self,
        session_id: Optional[str] = None
    ) -> Dict[str, int]:
        """
        Return document counts grouped by metadata.

        Args:
            session_id: Session ID; when given, only that session is counted

        Returns:
            Dict with ``by_type`` and ``by_security`` (value -> count, rows
            with an empty value omitted) and ``total`` (count of all
            documents in scope)
        """
        where_clause = "WHERE session_id = %s" if session_id else ""
        params = [session_id] if session_id else []
        stats = {}

        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                # Count per document_type
                query = f"""
                    SELECT
                        metadata->>'document_type' AS type,
                        COUNT(*) AS count
                    FROM documents
                    {where_clause}
                    GROUP BY metadata->>'document_type'
                """
                cur.execute(query, params)
                stats['by_type'] = {row[0]: row[1] for row in cur.fetchall() if row[0]}

                # Count per security_level
                query = f"""
                    SELECT
                        metadata->>'security_level' AS level,
                        COUNT(*) AS count
                    FROM documents
                    {where_clause}
                    GROUP BY metadata->>'security_level'
                """
                cur.execute(query, params)
                stats['by_security'] = {row[0]: row[1] for row in cur.fetchall() if row[0]}

                # Total count
                query = f"""
                    SELECT COUNT(*)
                    FROM documents
                    {where_clause}
                """
                cur.execute(query, params)
                stats['total'] = cur.fetchone()[0]

        return stats