Spaces:
Sleeping
Sleeping
| """ | |
| Sistema de gerenciamento de metadados para documentos. | |
| Permite filtrar documentos por tipo, tags, autor, data, etc. | |
| """ | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime | |
| import json | |
| from dataclasses import dataclass, asdict | |
@dataclass
class DocumentMetadata:
    """Document metadata schema.

    All fields are optional; ``None`` fields are omitted from the
    serialized representation.
    """

    document_type: Optional[str] = None      # PDF, TXT, MD, etc.
    upload_date: Optional[datetime] = None
    tags: Optional[List[str]] = None
    author: Optional[str] = None
    language: Optional[str] = None
    department: Optional[str] = None         # for enterprise deployments
    security_level: Optional[str] = None     # public, internal, confidential
    custom: Optional[Dict[str, Any]] = None  # free-form custom fields

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dict, serializing datetime and dropping None fields."""
        data = asdict(self)
        if self.upload_date:
            # datetime is not JSON-serializable; store it as ISO-8601 text.
            data['upload_date'] = self.upload_date.isoformat()
        # Omit unset (None) fields so the stored JSON stays compact.
        return {k: v for k, v in data.items() if v is not None}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'DocumentMetadata':
        """Build an instance from a dict (inverse of :meth:`to_dict`)."""
        # Copy first so the caller's dict is not mutated.
        data = dict(data)
        if 'upload_date' in data and isinstance(data['upload_date'], str):
            data['upload_date'] = datetime.fromisoformat(data['upload_date'])
        return cls(**data)

    def to_json(self) -> str:
        """Serialize to a JSON string."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> 'DocumentMetadata':
        """Build an instance from a JSON string."""
        return cls.from_dict(json.loads(json_str))
class MetadataManager:
    """Manages document metadata stored in the documents.metadata JSONB column.

    Provides validation, read/update access, and metadata-filtered
    vector search over the ``documents`` table.
    """

    # Closed vocabularies for the validated fields.
    VALID_DOCUMENT_TYPES = ['PDF', 'TXT', 'MD', 'HTML', 'DOCX', 'CSV', 'JSON']
    VALID_SECURITY_LEVELS = ['public', 'internal', 'confidential', 'restricted']

    def __init__(self, db_manager):
        """
        Initialize the manager.

        Args:
            db_manager: DatabaseManager instance (must provide get_connection()).
        """
        self.db = db_manager

    def validate_metadata(self, metadata: 'DocumentMetadata') -> bool:
        """
        Validate a metadata object against the allowed vocabularies.

        Args:
            metadata: Metadata to validate.

        Returns:
            True if valid.

        Raises:
            ValueError: If any field is invalid.
        """
        # document_type is matched case-insensitively against the whitelist.
        if metadata.document_type and metadata.document_type.upper() not in self.VALID_DOCUMENT_TYPES:
            raise ValueError(
                f"document_type invalido: {metadata.document_type}. "
                f"Validos: {', '.join(self.VALID_DOCUMENT_TYPES)}"
            )
        # security_level is case-sensitive (stored lowercase).
        if metadata.security_level and metadata.security_level not in self.VALID_SECURITY_LEVELS:
            raise ValueError(
                f"security_level invalido: {metadata.security_level}. "
                f"Validos: {', '.join(self.VALID_SECURITY_LEVELS)}"
            )
        # tags must be a list of strings.
        if metadata.tags:
            if not isinstance(metadata.tags, list):
                raise ValueError("tags deve ser uma lista")
            if not all(isinstance(tag, str) for tag in metadata.tags):
                raise ValueError("Todas as tags devem ser strings")
        return True

    def update_document_metadata(
        self,
        document_id: int,
        metadata: 'DocumentMetadata'
    ) -> bool:
        """
        Update (replace) the metadata of a document.

        Args:
            document_id: Document ID.
            metadata: New metadata.

        Returns:
            True if a row was updated.

        Raises:
            ValueError: If metadata fails validation.
        """
        self.validate_metadata(metadata)
        metadata_json = metadata.to_json()
        query = """
            UPDATE documents
            SET metadata = %s::jsonb
            WHERE id = %s
        """
        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (metadata_json, document_id))
                conn.commit()
                return cur.rowcount > 0

    def get_document_metadata(self, document_id: int) -> Optional['DocumentMetadata']:
        """
        Fetch the metadata of a document.

        Args:
            document_id: Document ID.

        Returns:
            DocumentMetadata, or None if the document does not exist or
            has no metadata.
        """
        query = """
            SELECT metadata
            FROM documents
            WHERE id = %s
        """
        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, (document_id,))
                result = cur.fetchone()
                if result and result[0]:
                    # psycopg deserializes jsonb columns into dicts.
                    return DocumentMetadata.from_dict(result[0])
                return None

    def search_with_filters(
        self,
        query_embedding: List[float],
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 5,
        session_id: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Vector search with metadata filters.

        Args:
            query_embedding: Query embedding.
            filters: Filters to apply (e.g. {"document_type": "PDF", "tags": ["tech"]}).
                Supported keys: document_type, author, security_level,
                department, tags, upload_date_from, upload_date_to.
            top_k: Number of results.
            session_id: Session ID.

        Returns:
            List of document dicts with similarity scores, most similar first.
        """
        # Build the WHERE clause; where_params is kept in the same order as
        # the %s placeholders appended to where_clauses.
        where_clauses = []
        where_params = []
        if session_id:
            where_clauses.append("session_id = %s")
            where_params.append(session_id)
        if filters:
            # Simple equality filters on top-level JSONB keys. `field` comes
            # from the fixed tuple below, never from user input, so
            # interpolating the key name into the SQL is safe.
            for field in ('document_type', 'author', 'security_level', 'department'):
                if field in filters:
                    where_clauses.append(f"metadata->>'{field}' = %s")
                    where_params.append(filters[field])
            # Tag filter: ?| matches rows whose tags array contains ANY of
            # the given tags.
            if 'tags' in filters:
                tags = filters['tags'] if isinstance(filters['tags'], list) else [filters['tags']]
                where_clauses.append("metadata->'tags' ?| %s")
                where_params.append(tags)
            # Upload-date range filters (inclusive bounds).
            if 'upload_date_from' in filters:
                where_clauses.append("(metadata->>'upload_date')::timestamp >= %s")
                where_params.append(filters['upload_date_from'])
            if 'upload_date_to' in filters:
                where_clauses.append("(metadata->>'upload_date')::timestamp <= %s")
                where_params.append(filters['upload_date_to'])
        where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
        query = f"""
            SELECT
                id,
                title,
                content,
                source,
                metadata,
                1 - (embedding <=> %s::vector) AS similarity
            FROM documents
            WHERE {where_sql}
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """
        # Placeholder order in the SQL above: similarity embedding, WHERE
        # params, ORDER BY embedding, LIMIT. The embedding appears twice in
        # the SQL, so it must appear twice in the parameter list (the
        # previous version supplied it only once, misaligning every
        # filtered query).
        params = [query_embedding, *where_params, query_embedding, top_k]
        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                results = cur.fetchall()
        return [
            {
                'id': row[0],
                'title': row[1],
                'content': row[2],
                'source': row[3],
                'metadata': row[4],
                'similarity': float(row[5]),
            }
            for row in results
        ]

    def get_available_filters(self, session_id: Optional[str] = None) -> Dict[str, List[str]]:
        """
        Return the distinct values available for each filterable field.

        Args:
            session_id: Session ID (optional).

        Returns:
            Dict mapping plural field names ('document_types', 'authors',
            'departments', 'security_levels', 'tags') to sorted lists of
            unique values.
        """
        base_conditions = ["session_id = %s"] if session_id else []
        params = [session_id] if session_id else []

        def _where(extra_condition: str) -> str:
            # Merge the optional session filter with a per-query condition
            # into a single WHERE clause. (The previous version emitted two
            # WHERE keywords when session_id was set — invalid SQL.)
            return "WHERE " + " AND ".join(base_conditions + [extra_condition])

        filters = {
            'document_types': set(),
            'authors': set(),
            'departments': set(),
            'security_levels': set(),
            'tags': set(),
        }
        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                # Tags live in a JSONB array, so unnest them before DISTINCT.
                tags_where = _where("metadata->'tags' IS NOT NULL")
                tags_query = f"""
                    SELECT DISTINCT jsonb_array_elements_text(metadata->'tags') AS tag
                    FROM documents
                    {tags_where}
                """
                cur.execute(tags_query, params)
                for row in cur.fetchall():
                    if row[0]:
                        filters['tags'].add(row[0])
                # Scalar fields: one DISTINCT query per field. `field` comes
                # from the fixed list below, so interpolating it is safe.
                for field in ['document_type', 'author', 'department', 'security_level']:
                    field_where = _where(f"metadata->>'{field}' IS NOT NULL")
                    field_query = f"""
                        SELECT DISTINCT metadata->>'{field}' AS value
                        FROM documents
                        {field_where}
                    """
                    cur.execute(field_query, params)
                    key = f"{field}s"
                    for row in cur.fetchall():
                        if row[0]:
                            filters[key].add(row[0])
        # Convert sets to sorted lists for a stable, JSON-friendly result.
        return {k: sorted(v) for k, v in filters.items()}

    def get_documents_count_by_metadata(
        self,
        session_id: Optional[str] = None
    ) -> Dict[str, int]:
        """
        Return document counts grouped by metadata fields.

        Args:
            session_id: Session ID (optional).

        Returns:
            {'by_type': {type: count}, 'by_security': {level: count},
             'total': int}
        """
        where_clause = "WHERE session_id = %s" if session_id else ""
        params = [session_id] if session_id else []
        stats = {}
        with self.db.get_connection() as conn:
            with conn.cursor() as cur:
                # Count per document_type (rows with NULL type are skipped
                # when aggregating below).
                query = f"""
                    SELECT
                        metadata->>'document_type' AS type,
                        COUNT(*) AS count
                    FROM documents
                    {where_clause}
                    GROUP BY metadata->>'document_type'
                """
                cur.execute(query, params)
                stats['by_type'] = {row[0]: row[1] for row in cur.fetchall() if row[0]}
                # Count per security_level.
                query = f"""
                    SELECT
                        metadata->>'security_level' AS level,
                        COUNT(*) AS count
                    FROM documents
                    {where_clause}
                    GROUP BY metadata->>'security_level'
                """
                cur.execute(query, params)
                stats['by_security'] = {row[0]: row[1] for row in cur.fetchall() if row[0]}
                # Total document count.
                query = f"""
                    SELECT COUNT(*)
                    FROM documents
                    {where_clause}
                """
                cur.execute(query, params)
                stats['total'] = cur.fetchone()[0]
        return stats