Spaces:
Running
Running
File size: 3,600 Bytes
import os
import gzip
import logging
from pathlib import Path
from typing import List, Any
import config
logger = logging.getLogger(__name__)
class ValidationError(Exception):
    """Raised when an uploaded or ingested file fails a validation check."""
def validate_uploads(uploaded_files: List[Any]) -> None:
    """
    Validate basic constraints: file count, individual size, total size, and extensions.

    Args:
        uploaded_files: Uploaded items. Each item is either a path string or an
            object exposing its path through a ``name`` attribute. An empty or
            ``None`` value validates trivially.

    Raises:
        ValidationError: If there are more than ``config.MAX_UPLOAD_FILES``
            items, a file is larger than ``config.MAX_UPLOAD_FILE_SIZE_MB``,
            a file name does not end with one of
            ``config.ALLOWED_UPLOAD_EXTENSIONS``, or the combined size of all
            files exceeds ``config.MAX_UPLOAD_TOTAL_SIZE_MB``.

    Paths whose size cannot be read (for example because the file does not
    exist) are skipped and do not count toward any limit.
    """
    if not uploaded_files:
        return

    # 1. File count check
    if len(uploaded_files) > config.MAX_UPLOAD_FILES:
        raise ValidationError(
            f"Too many files uploaded. Maximum allowed is {config.MAX_UPLOAD_FILES}."
        )

    max_file_size_bytes = config.MAX_UPLOAD_FILE_SIZE_MB * 1024 * 1024
    max_total_size_bytes = config.MAX_UPLOAD_TOTAL_SIZE_MB * 1024 * 1024

    # Loop-invariant: lowercase the allowed suffixes once; str.endswith accepts a tuple.
    allowed = config.ALLOWED_UPLOAD_EXTENSIONS
    allowed_suffixes = tuple(ext.lower() for ext in allowed)

    total_size_bytes = 0
    for f in uploaded_files:
        # Gradio 'filepath' type returns a path string or a file object with a .name attribute
        file_path = getattr(f, "name", None) or str(f)

        # Ask for the size directly instead of exists()-then-getsize(): a file
        # that disappears between the two calls must be skipped, not crash.
        # ValueError covers malformed paths (e.g. embedded NUL bytes), which
        # os.path.exists() also treats as "does not exist".
        try:
            file_size = os.path.getsize(file_path)
        except (OSError, ValueError):
            logger.warning("Skipping upload validation for unreadable path: %s", file_path)
            continue

        filename = Path(file_path).name

        # 2. Individual file size check
        if file_size > max_file_size_bytes:
            raise ValidationError(
                f"File '{filename}' exceeds maximum size of {config.MAX_UPLOAD_FILE_SIZE_MB}MB."
            )
        total_size_bytes += file_size

        # 3. Extension check (case-insensitive suffix match on the file name)
        if not filename.lower().endswith(allowed_suffixes):
            raise ValidationError(
                f"File '{filename}' has an unsupported extension. Allowed: {', '.join(allowed)}"
            )

    # 4. Total size check
    if total_size_bytes > max_total_size_bytes:
        raise ValidationError(f"Total upload size exceeds {config.MAX_UPLOAD_TOTAL_SIZE_MB}MB.")
def safe_gzip_decompress(src_path: str, dest_path: str) -> None:
    """
    Decompress a gzip file with size limits to prevent zip bombs.

    The archive is streamed in 1MB chunks and the running decompressed size is
    checked before each chunk is written, so at most
    ``config.MAX_GZIP_DECOMPRESSED_SIZE_MB`` megabytes ever reach the disk.

    Args:
        src_path: Path of the gzip-compressed input file.
        dest_path: Path the decompressed data is written to (overwritten if it
            already exists).

    Raises:
        ValidationError: If the decompressed data exceeds the configured limit,
            the input is not a valid gzip file, or any other error occurs while
            reading or writing. When the failure happens after ``dest_path``
            was opened, the partially written destination file is removed.
    """
    max_bytes = config.MAX_GZIP_DECOMPRESSED_SIZE_MB * 1024 * 1024
    chunk_size = 1024 * 1024  # 1MB chunks
    current_bytes = 0

    # Track whether this call created/truncated dest_path, so a failure that
    # happens before that point never deletes a file we did not touch.
    dest_opened = False
    completed = False
    try:
        with gzip.open(src_path, "rb") as f_in:
            with open(dest_path, "wb") as f_out:
                dest_opened = True
                while True:
                    chunk = f_in.read(chunk_size)
                    if not chunk:
                        break
                    current_bytes += len(chunk)
                    # Check before writing so the output never exceeds the limit.
                    if current_bytes > max_bytes:
                        raise ValidationError(
                            f"Decompressed file size exceeds {config.MAX_GZIP_DECOMPRESSED_SIZE_MB}MB safety limit."
                        )
                    f_out.write(chunk)
        completed = True
    except ValidationError:
        # Our own size-limit error: propagate unchanged.
        raise
    except gzip.BadGzipFile as e:
        raise ValidationError("Invalid gzip file.") from e
    except Exception as e:
        logger.error("Gzip decompression failed: %s", e)
        raise ValidationError(f"Failed to decompress file: {e}") from e
    finally:
        if dest_opened and not completed:
            # Do not leave a truncated (possibly limit-sized) file behind.
            # Cleanup is best effort: a failure here must not mask the
            # original error that is already propagating.
            try:
                Path(dest_path).unlink()
            except OSError:
                pass
def is_xml_heuristic(file_path: str) -> bool:
    """
    Quick heuristic check if the file starts with '<' (XML-like).

    Reads the first 1KB, ignores leading whitespace and an optional UTF-8
    byte order mark, and checks whether the first remaining byte is '<'.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True if the first meaningful byte is '<'. False if the file is empty,
        contains only whitespace in the inspected prefix, starts with anything
        else, or cannot be read (the error is logged).
    """
    utf8_bom = b"\xef\xbb\xbf"
    try:
        with open(file_path, "rb") as f:
            header = f.read(1024)
    except Exception as e:
        # Best-effort heuristic: an unreadable file is reported as "not XML".
        logger.error(f"XML heuristic check failed: {e}")
        return False

    header = header.lstrip()
    # XML documents may legally begin with a byte order mark; without this
    # step a BOM-prefixed document would be misclassified as non-XML.
    if header.startswith(utf8_bom):
        header = header[len(utf8_bom):].lstrip()

    # Look for common XML starters
    return header.startswith(b"<")
|