""" Sistema de gerenciamento de metadados para documentos. Permite filtrar documentos por tipo, tags, autor, data, etc. """ from typing import Dict, List, Optional, Any from datetime import datetime import json from dataclasses import dataclass, asdict @dataclass class DocumentMetadata: """Schema de metadados de documento.""" document_type: Optional[str] = None # PDF, TXT, MD, etc upload_date: Optional[datetime] = None tags: Optional[List[str]] = None author: Optional[str] = None language: Optional[str] = None department: Optional[str] = None # Para enterprise security_level: Optional[str] = None # public, internal, confidential custom: Optional[Dict[str, Any]] = None # Campos customizados def to_dict(self) -> Dict[str, Any]: """Converte para dicionario, lidando com datetime.""" data = asdict(self) if self.upload_date: data['upload_date'] = self.upload_date.isoformat() # Remove campos None return {k: v for k, v in data.items() if v is not None} @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'DocumentMetadata': """Cria a partir de dicionario.""" if 'upload_date' in data and isinstance(data['upload_date'], str): data['upload_date'] = datetime.fromisoformat(data['upload_date']) return cls(**data) def to_json(self) -> str: """Converte para JSON.""" return json.dumps(self.to_dict()) @classmethod def from_json(cls, json_str: str) -> 'DocumentMetadata': """Cria a partir de JSON.""" return cls.from_dict(json.loads(json_str)) class MetadataManager: """Gerenciador de metadados de documentos.""" VALID_DOCUMENT_TYPES = ['PDF', 'TXT', 'MD', 'HTML', 'DOCX', 'CSV', 'JSON'] VALID_SECURITY_LEVELS = ['public', 'internal', 'confidential', 'restricted'] def __init__(self, db_manager): """ Inicializa gerenciador. Args: db_manager: Instancia de DatabaseManager """ self.db = db_manager def validate_metadata(self, metadata: DocumentMetadata) -> bool: """ Valida schema de metadata. Args: metadata: Metadados a validar Returns: True se valido Raises: ValueError: Se metadata invalido """ # Validar document_type if metadata.document_type and metadata.document_type.upper() not in self.VALID_DOCUMENT_TYPES: raise ValueError( f"document_type invalido: {metadata.document_type}. " f"Validos: {', '.join(self.VALID_DOCUMENT_TYPES)}" ) # Validar security_level if metadata.security_level and metadata.security_level not in self.VALID_SECURITY_LEVELS: raise ValueError( f"security_level invalido: {metadata.security_level}. " f"Validos: {', '.join(self.VALID_SECURITY_LEVELS)}" ) # Validar tags if metadata.tags: if not isinstance(metadata.tags, list): raise ValueError("tags deve ser uma lista") if not all(isinstance(tag, str) for tag in metadata.tags): raise ValueError("Todas as tags devem ser strings") return True def update_document_metadata( self, document_id: int, metadata: DocumentMetadata ) -> bool: """ Atualiza metadata de um documento. Args: document_id: ID do documento metadata: Novos metadados Returns: True se atualizado com sucesso """ # Validar metadata self.validate_metadata(metadata) # Converter para JSON metadata_json = metadata.to_json() # Atualizar no banco query = """ UPDATE documents SET metadata = %s::jsonb WHERE id = %s """ with self.db.get_connection() as conn: with conn.cursor() as cur: cur.execute(query, (metadata_json, document_id)) conn.commit() return cur.rowcount > 0 def get_document_metadata(self, document_id: int) -> Optional[DocumentMetadata]: """ Recupera metadata de um documento. Args: document_id: ID do documento Returns: DocumentMetadata ou None """ query = """ SELECT metadata FROM documents WHERE id = %s """ with self.db.get_connection() as conn: with conn.cursor() as cur: cur.execute(query, (document_id,)) result = cur.fetchone() if result and result[0]: return DocumentMetadata.from_dict(result[0]) return None def search_with_filters( self, query_embedding: List[float], filters: Optional[Dict[str, Any]] = None, top_k: int = 5, session_id: Optional[str] = None ) -> List[Dict[str, Any]]: """ Busca vetorial com filtros de metadata. Args: query_embedding: Embedding da query filters: Filtros a aplicar (ex: {"document_type": "PDF", "tags": ["tech"]}) top_k: Numero de resultados session_id: ID da sessao Returns: Lista de documentos com scores """ # Construir WHERE clause baseado em filtros where_clauses = [] params = [query_embedding, top_k] if session_id: where_clauses.append("session_id = %s") params.insert(0, session_id) if filters: # Filtro por document_type if 'document_type' in filters: where_clauses.append("metadata->>'document_type' = %s") params.insert(-1, filters['document_type']) # Filtro por tags (qualquer tag no array) if 'tags' in filters: tags = filters['tags'] if isinstance(filters['tags'], list) else [filters['tags']] where_clauses.append("metadata->'tags' ?| %s") params.insert(-1, tags) # Filtro por author if 'author' in filters: where_clauses.append("metadata->>'author' = %s") params.insert(-1, filters['author']) # Filtro por security_level if 'security_level' in filters: where_clauses.append("metadata->>'security_level' = %s") params.insert(-1, filters['security_level']) # Filtro por department if 'department' in filters: where_clauses.append("metadata->>'department' = %s") params.insert(-1, filters['department']) # Filtro por data (upload_date maior que) if 'upload_date_from' in filters: where_clauses.append("(metadata->>'upload_date')::timestamp >= %s") params.insert(-1, filters['upload_date_from']) # Filtro por data (upload_date menor que) if 'upload_date_to' in filters: where_clauses.append("(metadata->>'upload_date')::timestamp <= %s") params.insert(-1, filters['upload_date_to']) # Montar WHERE clause where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" # Query com filtros query = f""" SELECT id, title, content, source, metadata, 1 - (embedding <=> %s::vector) AS similarity FROM documents WHERE {where_sql} ORDER BY embedding <=> %s::vector LIMIT %s """ with self.db.get_connection() as conn: with conn.cursor() as cur: cur.execute(query, params) results = cur.fetchall() documents = [] for row in results: doc = { 'id': row[0], 'title': row[1], 'content': row[2], 'source': row[3], 'metadata': row[4], 'similarity': float(row[5]) } documents.append(doc) return documents def get_available_filters(self, session_id: Optional[str] = None) -> Dict[str, List[str]]: """ Retorna valores disponiveis para cada filtro. Args: session_id: ID da sessao (opcional) Returns: Dicionario com valores unicos por campo """ where_clause = "WHERE session_id = %s" if session_id else "" params = [session_id] if session_id else [] query = f""" SELECT DISTINCT metadata->>'document_type' as document_type, DISTINCT metadata->>'author' as author, DISTINCT metadata->>'department' as department, DISTINCT metadata->>'security_level' as security_level FROM documents {where_clause} WHERE metadata IS NOT NULL """ with self.db.get_connection() as conn: with conn.cursor() as cur: cur.execute(query, params) # Agregar valores filters = { 'document_types': set(), 'authors': set(), 'departments': set(), 'security_levels': set(), 'tags': set() } # Query para tags (JSONB array) tags_query = f""" SELECT DISTINCT jsonb_array_elements_text(metadata->'tags') as tag FROM documents {where_clause} WHERE metadata->'tags' IS NOT NULL """ cur.execute(tags_query, params) for row in cur.fetchall(): if row[0]: filters['tags'].add(row[0]) # Query para outros campos for field in ['document_type', 'author', 'department', 'security_level']: field_query = f""" SELECT DISTINCT metadata->>'{field}' as value FROM documents {where_clause} WHERE metadata->>'{field}' IS NOT NULL """ cur.execute(field_query, params) key = f"{field}s" for row in cur.fetchall(): if row[0]: filters[key].add(row[0]) # Converter sets para listas ordenadas return {k: sorted(list(v)) for k, v in filters.items()} def get_documents_count_by_metadata( self, session_id: Optional[str] = None ) -> Dict[str, int]: """ Retorna contagem de documentos por metadata. Args: session_id: ID da sessao Returns: Dicionario com contagens """ where_clause = "WHERE session_id = %s" if session_id else "" params = [session_id] if session_id else [] stats = {} with self.db.get_connection() as conn: with conn.cursor() as cur: # Count por document_type query = f""" SELECT metadata->>'document_type' as type, COUNT(*) as count FROM documents {where_clause} GROUP BY metadata->>'document_type' """ cur.execute(query, params) stats['by_type'] = {row[0]: row[1] for row in cur.fetchall() if row[0]} # Count por security_level query = f""" SELECT metadata->>'security_level' as level, COUNT(*) as count FROM documents {where_clause} GROUP BY metadata->>'security_level' """ cur.execute(query, params) stats['by_security'] = {row[0]: row[1] for row in cur.fetchall() if row[0]} # Count total query = f""" SELECT COUNT(*) FROM documents {where_clause} """ cur.execute(query, params) stats['total'] = cur.fetchone()[0] return stats