Spaces:
Running
Running
| import os | |
| import gzip | |
| import logging | |
| from pathlib import Path | |
| from typing import List, Any | |
| import config | |
| logger = logging.getLogger(__name__) | |
class ValidationError(Exception):
    """Signal that ingestion input failed a validation check.

    Carries no state beyond the message passed to ``Exception``.
    """
def validate_uploads(uploaded_files: List[Any]) -> None:
    """
    Check an upload batch against the limits defined in ``config``.

    The checks run in this order: number of files, per-file size, filename
    extension (case-insensitive suffix match), and finally the combined size
    of all files. Entries whose path does not exist on disk are skipped and
    do not count towards the total size. An empty or missing batch is
    accepted without any further checks.

    Raises:
        ValidationError: as soon as one of the constraints is violated.
    """
    if not uploaded_files:
        return

    # 1. File count check
    if len(uploaded_files) > config.MAX_UPLOAD_FILES:
        raise ValidationError(
            f"Too many files uploaded. Maximum allowed is {config.MAX_UPLOAD_FILES}."
        )

    per_file_cap = config.MAX_UPLOAD_FILE_SIZE_MB * 1024 * 1024
    batch_cap = config.MAX_UPLOAD_TOTAL_SIZE_MB * 1024 * 1024
    running_total = 0

    for item in uploaded_files:
        # An entry is either an object exposing its path as `.name`
        # (e.g. a Gradio file object) or something whose str() is the path.
        named = getattr(item, "name", None)
        path_str = named if named else str(item)
        if not os.path.exists(path_str):
            continue

        size = os.path.getsize(path_str)
        display_name = Path(path_str).name

        # 2. Individual file size check
        if size > per_file_cap:
            raise ValidationError(
                f"File '{display_name}' exceeds maximum size of {config.MAX_UPLOAD_FILE_SIZE_MB}MB."
            )
        running_total += size

        # 3. Extension check: accept the file on the first matching suffix.
        lowered = display_name.lower()
        allowed = config.ALLOWED_UPLOAD_EXTENSIONS
        for ext in allowed:
            if lowered.endswith(ext.lower()):
                break
        else:
            raise ValidationError(
                f"File '{display_name}' has an unsupported extension. Allowed: {', '.join(allowed)}"
            )

    # 4. Total size check (over every file that passed the per-file checks)
    if running_total > batch_cap:
        raise ValidationError(f"Total upload size exceeds {config.MAX_UPLOAD_TOTAL_SIZE_MB}MB.")
def safe_gzip_decompress(src_path: str, dest_path: str) -> None:
    """
    Decompress a gzip file to ``dest_path`` while enforcing a size limit.

    The archive is streamed in 1MB chunks and the running decompressed size
    is compared against ``config.MAX_GZIP_DECOMPRESSED_SIZE_MB`` before each
    chunk is written, so a zip bomb is rejected without ever writing more
    than the limit to disk.

    If decompression fails after the destination file has been opened, the
    partially written destination file is removed so that no truncated or
    oversized output is left behind.

    Args:
        src_path: Path of the gzip-compressed input file.
        dest_path: Path the decompressed data is written to (overwritten).

    Raises:
        ValidationError: if the decompressed size exceeds the limit, the
            input is not valid gzip data, or any other error occurs while
            reading or writing.
    """
    max_bytes = config.MAX_GZIP_DECOMPRESSED_SIZE_MB * 1024 * 1024
    chunk_size = 1024 * 1024  # 1MB chunks
    current_bytes = 0
    dest_opened = False  # only clean up a file this call actually created/truncated
    completed = False

    try:
        with gzip.open(src_path, "rb") as f_in:
            with open(dest_path, "wb") as f_out:
                dest_opened = True
                while True:
                    chunk = f_in.read(chunk_size)
                    if not chunk:
                        break
                    current_bytes += len(chunk)
                    # Check before writing so the output never exceeds the limit.
                    if current_bytes > max_bytes:
                        raise ValidationError(
                            f"Decompressed file size exceeds {config.MAX_GZIP_DECOMPRESSED_SIZE_MB}MB safety limit."
                        )
                    f_out.write(chunk)
        completed = True
    except ValidationError:
        # Already the domain error callers expect; pass it through unchanged.
        raise
    except gzip.BadGzipFile as exc:
        raise ValidationError("Invalid gzip file.") from exc
    except Exception as exc:
        # Boundary handler: any other failure (I/O error, truncated stream, ...)
        # is logged and reported to the caller as a validation failure.
        logger.error("Gzip decompression failed: %s", exc)
        raise ValidationError(f"Failed to decompress file: {str(exc)}") from exc
    finally:
        # Runs after both files are closed. Drop incomplete output so callers
        # never pick up a truncated file or the remains of a zip bomb.
        if dest_opened and not completed:
            try:
                os.remove(dest_path)
            except OSError as cleanup_err:
                logger.warning(
                    "Could not remove partial output %s: %s", dest_path, cleanup_err
                )
def is_xml_heuristic(file_path: str) -> bool:
    """
    Quick heuristic check whether a file looks like XML.

    Reads the first 1KB in binary mode, drops a leading UTF-8 byte-order
    mark if present, skips leading whitespace, and reports whether the
    first remaining byte is '<'. Other encodings' byte-order marks are not
    recognised.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True if the inspected prefix starts with '<'; False if it does not,
        if the prefix is empty or whitespace-only, or if the file cannot be
        read (the error is logged, not raised).
    """
    utf8_bom = b"\xef\xbb\xbf"
    try:
        with open(file_path, "rb") as f:
            header = f.read(1024)
    except Exception as e:
        # Deliberately best-effort: an unreadable file is simply "not XML".
        logger.error("XML heuristic check failed: %s", e)
        return False

    # A BOM is not whitespace, so it has to be removed explicitly before the
    # '<' check; otherwise BOM-prefixed XML would be reported as non-XML.
    if header.startswith(utf8_bom):
        header = header[len(utf8_bom):]

    # An empty or whitespace-only prefix yields b"" here, which is False.
    return header.lstrip().startswith(b"<")