import os
import gzip
import logging
from pathlib import Path
from typing import List, Any

import config

logger = logging.getLogger(__name__)

_BYTES_PER_MB = 1024 * 1024
_GZIP_CHUNK_BYTES = 1024 * 1024  # decompress in 1MB chunks
_XML_SNIFF_BYTES = 1024
_UTF8_BOM = b"\xef\xbb\xbf"


class ValidationError(Exception):
    """Custom exception for ingestion validation errors."""


def _resolve_upload_path(item: Any) -> str:
    """Return the filesystem path for one uploaded item.

    Strings and path-like objects are used as-is. Anything else is treated
    as a file object: its ``.name`` attribute is used when present and
    truthy, otherwise ``str(item)``.
    """
    # pathlib.Path also has a ``.name`` attribute, but it is only the final
    # path component, so path-like objects must be handled before the
    # ``.name`` lookup or the directory part of the path is lost.
    if isinstance(item, (str, os.PathLike)):
        return os.fsdecode(item)
    return getattr(item, "name", None) or str(item)


def _remove_partial_output(path: str) -> None:
    """Best-effort removal of an incomplete output file."""
    try:
        os.remove(path)
    except OSError as exc:
        logger.warning("Could not remove partial output '%s': %s", path, exc)


def validate_uploads(uploaded_files: List[Any]) -> None:
    """
    Validate basic constraints: file count, individual size, total size, and extensions.

    Args:
        uploaded_files: Uploaded items; each is a path string, a path-like
            object, or a file object exposing its path as ``.name``.
            An empty or ``None`` value is accepted and validates trivially.

    Raises:
        ValidationError: If there are more than ``config.MAX_UPLOAD_FILES``
            items, a file is larger than ``config.MAX_UPLOAD_FILE_SIZE_MB``,
            a file name does not end with one of
            ``config.ALLOWED_UPLOAD_EXTENSIONS`` (case-insensitive), or the
            combined size exceeds ``config.MAX_UPLOAD_TOTAL_SIZE_MB``.

    Items whose path cannot be stat'ed (for example, the file does not exist)
    are skipped with a warning and do not count toward the total size.
    """
    if not uploaded_files:
        return

    # 1. File count check
    if len(uploaded_files) > config.MAX_UPLOAD_FILES:
        raise ValidationError(
            f"Too many files uploaded. Maximum allowed is {config.MAX_UPLOAD_FILES}."
        )

    max_file_size_bytes = config.MAX_UPLOAD_FILE_SIZE_MB * _BYTES_PER_MB
    max_total_size_bytes = config.MAX_UPLOAD_TOTAL_SIZE_MB * _BYTES_PER_MB
    allowed = config.ALLOWED_UPLOAD_EXTENSIONS
    # Lower-case the suffixes once; str.endswith accepts a tuple of options.
    allowed_suffixes = tuple(ext.lower() for ext in allowed)

    total_size_bytes = 0
    for f in uploaded_files:
        file_path = _resolve_upload_path(f)

        # Stat once instead of exists() followed by getsize(): the file could
        # disappear between the two calls.
        try:
            file_size = os.path.getsize(file_path)
        except (OSError, ValueError):
            logger.warning("Skipping upload that cannot be read: %s", file_path)
            continue

        filename = Path(file_path).name

        # 2. Individual file size check
        if file_size > max_file_size_bytes:
            raise ValidationError(
                f"File '{filename}' exceeds maximum size of {config.MAX_UPLOAD_FILE_SIZE_MB}MB."
            )
        total_size_bytes += file_size

        # 3. Extension check
        if not filename.lower().endswith(allowed_suffixes):
            raise ValidationError(
                f"File '{filename}' has an unsupported extension. Allowed: {', '.join(allowed)}"
            )

    # 4. Total size check
    if total_size_bytes > max_total_size_bytes:
        raise ValidationError(f"Total upload size exceeds {config.MAX_UPLOAD_TOTAL_SIZE_MB}MB.")


def safe_gzip_decompress(src_path: str, dest_path: str) -> None:
    """
    Decompress a gzip file with size limits to prevent zip bombs.

    Args:
        src_path: Path of the gzip-compressed input file.
        dest_path: Path the decompressed data is written to (overwritten).

    Raises:
        ValidationError: If the input is not valid gzip, if the decompressed
            data exceeds ``config.MAX_GZIP_DECOMPRESSED_SIZE_MB``, or if any
            other error occurs while decompressing. The underlying exception
            is attached as ``__cause__``.

    When decompression fails after ``dest_path`` has been opened, the
    incomplete output file is removed so no truncated data is left behind.
    """
    max_bytes = config.MAX_GZIP_DECOMPRESSED_SIZE_MB * _BYTES_PER_MB
    current_bytes = 0
    dest_opened = False
    completed = False
    try:
        with gzip.open(src_path, "rb") as f_in, open(dest_path, "wb") as f_out:
            # Only from this point on does dest_path hold data written by
            # this call; a failure before it must not delete an existing file.
            dest_opened = True
            while chunk := f_in.read(_GZIP_CHUNK_BYTES):
                current_bytes += len(chunk)
                if current_bytes > max_bytes:
                    raise ValidationError(
                        f"Decompressed file size exceeds {config.MAX_GZIP_DECOMPRESSED_SIZE_MB}MB safety limit."
                    )
                f_out.write(chunk)
        completed = True
    except ValidationError:
        raise
    except gzip.BadGzipFile as exc:
        raise ValidationError("Invalid gzip file.") from exc
    except Exception as exc:
        logger.error("Gzip decompression failed: %s", exc)
        raise ValidationError(f"Failed to decompress file: {exc}") from exc
    finally:
        # Runs after the ``with`` block has closed both files.
        if dest_opened and not completed:
            _remove_partial_output(dest_path)


def is_xml_heuristic(file_path: str) -> bool:
    """
    Quick heuristic check if the file starts with '<' (XML-like).

    Reads the first 1KB, skips a leading UTF-8 byte-order mark and leading
    whitespace, and reports whether the next byte is ``<``.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True if the file looks XML-like; False if it does not, is empty,
        or cannot be read (the read error is logged).
    """
    try:
        with open(file_path, "rb") as f:
            header = f.read(_XML_SNIFF_BYTES)
    except Exception as e:
        # Deliberately broad: this is a best-effort probe and must never raise.
        logger.error("XML heuristic check failed: %s", e)
        return False

    # bytes.lstrip() only removes ASCII whitespace, so a BOM has to be
    # dropped explicitly or BOM-prefixed XML would be rejected.
    if header.startswith(_UTF8_BOM):
        header = header[len(_UTF8_BOM):]

    # Look for common XML starters
    return header.lstrip().startswith(b"<")