""" ๐Ÿ“ฆ Document Parser โ€” Production-Grade ZIP Document Extraction Tool Features: - Upload ZIP files and parse all supported document formats - Supports 40+ text/code formats, PDF, DOCX, XLSX - Zip bomb protection (decompression ratio + size limits) - Per-file error isolation โ€” one corrupt file won't crash the whole parse - Progress bars for real-time feedback - Concurrency-limited to prevent resource exhaustion - Full structured JSON export + file detail drill-down """ from __future__ import annotations import io import logging import os import traceback import zipfile from dataclasses import dataclass, field from enum import Enum from typing import Optional import gradio as gr # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Configuration constants # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ MAX_ZIP_SIZE_MB = 200 MAX_FILES_IN_ZIP = 500 MAX_SINGLE_FILE_MB = 50 MAX_DECOMPRESSION_RATIO = 100 # zip bomb guard: reject if total > ratio ร— compressed MAX_PREVIEW_CHARS = 5_000 MAX_FULL_TEXT_CHARS = 500_000 MAX_XLSX_ROWS = 100 CONCURRENCY_LIMIT = 3 # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Logging # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ logger = logging.getLogger("document_parser") logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s", ) # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # File classification # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ class FileCategory(str, Enum): TEXT = "text" PDF = "pdf" DOCX = "docx" XLSX = "xlsx" IMAGE = "image" BINARY = "binary" TEXT_EXTENSIONS = frozenset({ ".txt", ".md", ".rst", ".py", ".js", ".ts", ".jsx", ".tsx", ".html", ".htm", ".css", ".scss", ".less", ".json", ".jsonl", ".yaml", ".yml", ".csv", ".tsv", ".xml", ".toml", ".cfg", ".ini", ".conf", ".properties", ".sh", ".bash", ".zsh", ".fish", ".bat", ".ps1", ".cmd", ".r", ".rmd", ".java", ".c", ".cpp", ".h", ".hpp", ".cc", ".cxx", ".go", ".rs", ".rb", ".php", ".swift", ".kt", ".kts", ".scala", ".clj", ".sql", ".graphql", ".gql", ".proto", ".thrift", ".dockerfile", ".makefile", ".cmake", ".gitignore", ".gitattributes", ".dockerignore", ".editorconfig", ".env", ".env.example", ".log", ".tex", ".bib", ".sty", ".lua", ".vim", ".el", ".lisp", ".hs", ".ml", ".mli", ".ex", ".exs", ".erl", ".hrl", ".dart", ".v", ".sv", ".vhd", ".vhdl", ".tf", ".tfvars", ".hcl", ".nix", ".dhall", ".ipynb", }) KNOWN_TEXT_FILENAMES = frozenset({ "Makefile", "Dockerfile", "Procfile", "Vagrantfile", "Gemfile", "Rakefile", "Brewfile", "Justfile", "Taskfile", ".gitignore", ".gitattributes", ".dockerignore", ".editorconfig", ".eslintrc", ".prettierrc", ".babelrc", ".browserslistrc", "LICENSE", "LICENCE", "COPYING", "AUTHORS", "CONTRIBUTORS", "CHANGELOG", "CHANGES", "HISTORY", "NEWS", "README", "INSTALL", "TODO", "HACKING", "requirements.txt", }) IMAGE_EXTENSIONS = frozenset({ ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".svg", ".webp", ".ico", ".tiff", ".tif", ".avif", ".heic", ".heif", }) CATEGORY_EMOJI = { FileCategory.TEXT: "๐Ÿ“„", FileCategory.PDF: "๐Ÿ“•", FileCategory.DOCX: "๐Ÿ“˜", FileCategory.XLSX: "๐Ÿ“Š", FileCategory.IMAGE: "๐Ÿ–ผ๏ธ", FileCategory.BINARY: "๐Ÿ“ฆ", } def classify_file(filename: str) -> tuple[FileCategory, str]: """Classify a file by its extension and known filename patterns.""" basename = filename.rsplit("/", 1)[-1] if "/" in filename else filename ext = os.path.splitext(basename)[1].lower() if not ext and basename in KNOWN_TEXT_FILENAMES: return FileCategory.TEXT, "" if not ext and basename.upper() in {n.upper() for n in KNOWN_TEXT_FILENAMES}: return FileCategory.TEXT, "" if ext in TEXT_EXTENSIONS: return FileCategory.TEXT, ext if ext == ".pdf": return FileCategory.PDF, ext if ext == ".docx": return FileCategory.DOCX, ext if ext in {".xlsx", ".xls"}: return FileCategory.XLSX, ext if ext in IMAGE_EXTENSIONS: return FileCategory.IMAGE, ext return FileCategory.BINARY, ext # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Data classes # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ @dataclass class ParsedFile: filename: str category: str extension: str size_bytes: int size_display: str content: str = "" preview: str = "" error: Optional[str] = None warnings: list[str] = field(default_factory=list) def to_table_row(self) -> list: status = "โš ๏ธ" if self.warnings else ("โŒ" if self.error else "โœ…") preview_text = self.error or self.preview[:200].replace("\n", " ") return [ status, self.filename, self.extension or "(none)", self.category, self.size_display, preview_text, ] @dataclass class ParseStats: total_files: int = 0 parsed_ok: int = 0 parse_warnings: int = 0 parse_errors: int = 0 skipped_dirs: int = 0 total_compressed_bytes: int = 0 total_uncompressed_bytes: int = 0 by_category: dict = field(default_factory=lambda: {c.value: 0 for c in FileCategory}) # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Size formatting # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ def format_size(size_bytes: int) -> str: if size_bytes < 0: return "0 B" if size_bytes < 1024: return f"{size_bytes} B" elif size_bytes < 1024 ** 2: return f"{size_bytes / 1024:.1f} KB" elif size_bytes < 1024 ** 3: return f"{size_bytes / (1024 ** 2):.1f} MB" else: return f"{size_bytes / (1024 ** 3):.2f} GB" # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Document parsers โ€” each returns (content, warnings) or raises # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ def parse_text_content(data: bytes, filename: str) -> tuple[str, list[str]]: """Parse plain text / code files.""" warnings = [] try: content = data.decode("utf-8") except UnicodeDecodeError: try: content = data.decode("latin-1") warnings.append("Decoded with latin-1 fallback (not valid UTF-8)") except Exception: content = data.decode("utf-8", errors="replace") warnings.append("Contains invalid bytes; replaced with placeholders") if len(content) > MAX_FULL_TEXT_CHARS: warnings.append(f"Content truncated to {MAX_FULL_TEXT_CHARS:,} characters (original: {len(content):,})") content = content[:MAX_FULL_TEXT_CHARS] + "\n\n... [TRUNCATED]" return content, warnings def parse_pdf_content(data: bytes, filename: str) -> tuple[str, list[str]]: """Parse PDF bytes to text using PyMuPDF.""" warnings = [] try: import fitz except ImportError: return "[PDF library not available]", ["PyMuPDF not installed โ€” install with: pip install PyMuPDF"] doc = None try: doc = fitz.open(stream=data, filetype="pdf") if doc.is_encrypted: return "", ["PDF is password-protected and cannot be parsed"] page_count = len(doc) if page_count == 0: return "", ["PDF has 0 pages"] text_parts = [] empty_pages = 0 for page_num in range(page_count): try: page = doc[page_num] page_text = page.get_text().strip() if page_text: text_parts.append(f"\n--- Page {page_num + 1}/{page_count} ---\n{page_text}") else: empty_pages += 1 except Exception as e: warnings.append(f"Page {page_num + 1} failed: {type(e).__name__}: {e}") if empty_pages > 0: warnings.append(f"{empty_pages}/{page_count} pages had no extractable text (may be scanned/image-based)") content = "\n".join(text_parts) if text_parts else "[No extractable text found]" if not text_parts and empty_pages == page_count: warnings.append("PDF appears to be entirely image-based; OCR would be needed to extract text") return content, warnings except Exception as e: logger.error(f"PDF parse error for {filename}: {e}") return "", [f"PDF parse failed: {type(e).__name__}: {e}"] finally: if doc: try: doc.close() except Exception: pass def parse_docx_content(data: bytes, filename: str) -> tuple[str, list[str]]: """Parse DOCX bytes to text.""" warnings = [] try: from docx import Document except ImportError: return "[DOCX library not available]", ["python-docx not installed"] try: doc = Document(io.BytesIO(data)) parts = [] paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] if paragraphs: parts.extend(paragraphs) for i, table in enumerate(doc.tables): try: table_text = f"\n--- Table {i + 1} ---\n" for row in table.rows: cells = [cell.text.strip() for cell in row.cells] table_text += " | ".join(cells) + "\n" parts.append(table_text) except Exception as e: warnings.append(f"Table {i + 1} extraction failed: {e}") content = "\n".join(parts) if parts else "[DOCX: empty document]" if not parts: warnings.append("Document contains no paragraphs or tables") return content, warnings except Exception as e: logger.error(f"DOCX parse error for {filename}: {e}") return "", [f"DOCX parse failed: {type(e).__name__}: {e}"] def parse_xlsx_content(data: bytes, filename: str) -> tuple[str, list[str]]: """Parse XLSX bytes to text summary.""" warnings = [] try: import openpyxl except ImportError: return "[XLSX library not available]", ["openpyxl not installed"] wb = None try: wb = openpyxl.load_workbook(io.BytesIO(data), read_only=True, data_only=True) parts = [] for sheet_name in wb.sheetnames: try: ws = wb[sheet_name] sheet_text = f"\n--- Sheet: {sheet_name} ---\n" row_count = 0 for row in ws.iter_rows(values_only=True): if row_count >= MAX_XLSX_ROWS: sheet_text += f"\n... (truncated at {MAX_XLSX_ROWS} rows)\n" warnings.append(f"Sheet '{sheet_name}' truncated at {MAX_XLSX_ROWS} rows") break cells = [str(cell) if cell is not None else "" for cell in row] sheet_text += " | ".join(cells) + "\n" row_count += 1 if row_count == 0: sheet_text += "(empty sheet)\n" parts.append(sheet_text) except Exception as e: warnings.append(f"Sheet '{sheet_name}' failed: {type(e).__name__}: {e}") content = "\n".join(parts) if parts else "[XLSX: empty workbook]" return content, warnings except Exception as e: logger.error(f"XLSX parse error for {filename}: {e}") return "", [f"XLSX parse failed: {type(e).__name__}: {e}"] finally: if wb: try: wb.close() except Exception: pass # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Validation layer # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ def validate_upload(file_path: str | None) -> str: """Validate the uploaded file. Returns the resolved file path. Raises gr.Error on failure.""" if file_path is None: raise gr.Error("โš ๏ธ Please upload a ZIP file first.") if not os.path.isfile(file_path): raise gr.Error("โŒ Upload failed โ€” file not found on server. Please try again.") file_size = os.path.getsize(file_path) if file_size == 0: raise gr.Error("โŒ The uploaded file is empty (0 bytes).") size_mb = file_size / (1024 ** 2) if size_mb > MAX_ZIP_SIZE_MB: raise gr.Error( f"โŒ File too large: {size_mb:.1f} MB. " f"Maximum allowed is {MAX_ZIP_SIZE_MB} MB." ) if not zipfile.is_zipfile(file_path): raise gr.Error( "โŒ Not a valid ZIP archive. The file may be corrupted, " "or it may be a different archive format (tar, rar, 7z)." ) return file_path def check_zip_bomb(zf: zipfile.ZipFile, compressed_size: int) -> list[str]: """Check for zip bomb indicators. Returns warnings. Raises gr.Error if malicious.""" warnings = [] total_uncompressed = sum(info.file_size for info in zf.infolist() if not info.is_dir()) if compressed_size > 0: ratio = total_uncompressed / compressed_size if ratio > MAX_DECOMPRESSION_RATIO: raise gr.Error( f"๐Ÿ›ก๏ธ Zip bomb detected! Decompression ratio is {ratio:.0f}x " f"(compressed: {format_size(compressed_size)}, " f"uncompressed: {format_size(total_uncompressed)}). " f"Maximum allowed ratio is {MAX_DECOMPRESSION_RATIO}x." ) if ratio > MAX_DECOMPRESSION_RATIO / 2: warnings.append( f"High decompression ratio ({ratio:.0f}x) โ€” approaching the " f"{MAX_DECOMPRESSION_RATIO}x safety limit" ) total_uncompressed_mb = total_uncompressed / (1024 ** 2) if total_uncompressed_mb > MAX_ZIP_SIZE_MB * 5: raise gr.Error( f"๐Ÿ›ก๏ธ Uncompressed content too large: {total_uncompressed_mb:.0f} MB. " f"Maximum is {MAX_ZIP_SIZE_MB * 5} MB." ) return warnings # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Core parsing engine # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ def parse_zip(file_path: str, progress: gr.Progress) -> tuple[list[ParsedFile], ParseStats]: """Parse all files in a ZIP archive with per-file error isolation.""" file_size = os.path.getsize(file_path) stats = ParseStats() try: zf = zipfile.ZipFile(file_path, "r") except zipfile.BadZipFile: raise gr.Error("โŒ ZIP file is corrupted and cannot be opened.") except Exception as e: raise gr.Error(f"โŒ Failed to open ZIP: {type(e).__name__}: {e}") try: bomb_warnings = check_zip_bomb(zf, file_size) entries = [info for info in zf.infolist() if not info.is_dir()] stats.skipped_dirs = len(zf.infolist()) - len(entries) stats.total_files = len(entries) stats.total_compressed_bytes = file_size if stats.total_files == 0: raise gr.Error("โŒ ZIP archive contains no files (only directories).") truncated = False if stats.total_files > MAX_FILES_IN_ZIP: gr.Warning( f"ZIP contains {stats.total_files} files โ€” " f"processing first {MAX_FILES_IN_ZIP} only." ) entries = entries[:MAX_FILES_IN_ZIP] truncated = True parsed_files: list[ParsedFile] = [] for i, info in enumerate(progress.tqdm(entries, desc="Parsing documents")): category, ext = classify_file(info.filename) stats.by_category[category.value] += 1 stats.total_uncompressed_bytes += info.file_size pf = ParsedFile( filename=info.filename, category=category.value, extension=ext or "(none)", size_bytes=info.file_size, size_display=format_size(info.file_size), ) file_mb = info.file_size / (1024 ** 2) if file_mb > MAX_SINGLE_FILE_MB: pf.error = f"Skipped: file too large ({file_mb:.1f} MB > {MAX_SINGLE_FILE_MB} MB limit)" pf.warnings.append(pf.error) stats.parse_warnings += 1 parsed_files.append(pf) continue try: raw_data = zf.read(info) except RuntimeError as e: pf.error = f"Cannot read: {e}" if "password" in str(e).lower(): pf.error = "File is password-protected" stats.parse_errors += 1 parsed_files.append(pf) continue except Exception as e: pf.error = f"Read failed: {type(e).__name__}: {e}" stats.parse_errors += 1 parsed_files.append(pf) continue try: if category == FileCategory.TEXT: content, warnings = parse_text_content(raw_data, info.filename) elif category == FileCategory.PDF: content, warnings = parse_pdf_content(raw_data, info.filename) elif category == FileCategory.DOCX: content, warnings = parse_docx_content(raw_data, info.filename) elif category == FileCategory.XLSX: content, warnings = parse_xlsx_content(raw_data, info.filename) elif category == FileCategory.IMAGE: content = "" warnings = [] pf.preview = f"[Image: {ext}, {pf.size_display}]" else: content = "" warnings = [] pf.preview = f"[Binary: {ext}, {pf.size_display}]" pf.content = content pf.preview = content[:MAX_PREVIEW_CHARS] if content else pf.preview pf.warnings = warnings if warnings: stats.parse_warnings += 1 else: stats.parsed_ok += 1 except MemoryError: pf.error = "Out of memory while parsing this file" stats.parse_errors += 1 logger.error(f"MemoryError parsing {info.filename}") except Exception as e: pf.error = f"Parse failed: {type(e).__name__}: {e}" stats.parse_errors += 1 logger.error(f"Parse error for {info.filename}: {e}") traceback.print_exc() parsed_files.append(pf) if bomb_warnings: for w in bomb_warnings: gr.Warning(w) if truncated: stats.parse_warnings += 1 return parsed_files, stats finally: try: zf.close() except Exception: pass # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Output formatters # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ def build_summary(stats: ParseStats, parsed_files: list[ParsedFile]) -> str: """Build a rich markdown summary.""" alerts = [] if stats.parse_errors > 0: alerts.append(f"โš ๏ธ **{stats.parse_errors} file(s) failed to parse** โ€” see โŒ markers in the file listing") if stats.parse_warnings > 0: alerts.append(f"โ„น๏ธ **{stats.parse_warnings} file(s) had warnings** โ€” see โš ๏ธ markers in the file listing") alert_block = "\n".join(alerts) + "\n\n" if alerts else "" error_files = [pf for pf in parsed_files if pf.error] error_block = "" if error_files: error_lines = [] for pf in error_files[:10]: error_lines.append(f"- `{pf.filename}`: {pf.error}") if len(error_files) > 10: error_lines.append(f"- ... and {len(error_files) - 10} more") error_block = "\n### โŒ Failed Files\n" + "\n".join(error_lines) + "\n\n" return f"""## ๐Ÿ“ฆ ZIP Archive Summary {alert_block}| Metric | Value | |--------|-------| | **Total files** | {stats.total_files} | | **Parsed successfully** | {stats.parsed_ok} | | **With warnings** | {stats.parse_warnings} | | **Failed** | {stats.parse_errors} | | **Compressed size** | {format_size(stats.total_compressed_bytes)} | | **Uncompressed size** | {format_size(stats.total_uncompressed_bytes)} | | **Directories skipped** | {stats.skipped_dirs} | ### ๐Ÿ“Š File Types | Category | Count | |----------|-------| | Text/Code | {stats.by_category.get('text', 0)} | | PDF | {stats.by_category.get('pdf', 0)} | | DOCX | {stats.by_category.get('docx', 0)} | | XLSX | {stats.by_category.get('xlsx', 0)} | | Image | {stats.by_category.get('image', 0)} | | Binary | {stats.by_category.get('binary', 0)} | {error_block}""" def build_full_text(parsed_files: list[ParsedFile]) -> str: """Build concatenated text output from all parsed files.""" parts = [] for pf in parsed_files: if pf.content: emoji = CATEGORY_EMOJI.get(FileCategory(pf.category), "๐Ÿ“„") parts.append( f"\n{'=' * 70}\n" f"{emoji} {pf.filename}" f"{' โš ๏ธ ' + ', '.join(pf.warnings) if pf.warnings else ''}\n" f"{'=' * 70}\n" f"{pf.content}" ) elif pf.error: parts.append( f"\n{'=' * 70}\n" f"โŒ {pf.filename} โ€” ERROR: {pf.error}\n" f"{'=' * 70}" ) if not parts: return "(No text content was extracted from any file in the archive.)" full = "\n".join(parts) if len(full) > MAX_FULL_TEXT_CHARS: full = full[:MAX_FULL_TEXT_CHARS] + "\n\n... [OUTPUT TRUNCATED โ€” too large to display fully]" return full def build_json(parsed_files: list[ParsedFile]) -> list[dict]: """Build structured JSON output.""" output = [] for pf in parsed_files: entry = { "filename": pf.filename, "category": pf.category, "extension": pf.extension, "size_bytes": pf.size_bytes, "size_display": pf.size_display, "preview": pf.preview[:1000], "status": "error" if pf.error else ("warning" if pf.warnings else "ok"), } if pf.error: entry["error"] = pf.error if pf.warnings: entry["warnings"] = pf.warnings output.append(entry) return output def build_detail(file_data: list[dict], evt: gr.SelectData) -> str: """Build detail view when user clicks a table row.""" if not file_data or not isinstance(file_data, list): return "โ„น๏ธ Select a file from the **File Listing** tab to see its full preview here." try: row_idx = evt.index[0] if isinstance(evt.index, (list, tuple)) else evt.index except (TypeError, IndexError): return "โš ๏ธ Could not determine selected row. Please click a row in the file listing." if not (0 <= row_idx < len(file_data)): return f"โš ๏ธ Row index {row_idx} is out of range (0โ€“{len(file_data) - 1})." item = file_data[row_idx] header = f"## {CATEGORY_EMOJI.get(item.get('category', ''), '๐Ÿ“„')} {item['filename']}\n" meta = f"**Category:** {item.get('category', 'unknown')} | **Size:** {item.get('size_display', 'unknown')}\n\n" sections = [header, meta] if item.get("error"): sections.append(f"### โŒ Error\n```\n{item['error']}\n```\n") if item.get("warnings"): sections.append("### โš ๏ธ Warnings\n" + "\n".join(f"- {w}" for w in item["warnings"]) + "\n\n") preview = item.get("preview", "") if preview and not preview.startswith("["): ext = item.get("extension", "").lstrip(".") lang_map = { "py": "python", "js": "javascript", "ts": "typescript", "json": "json", "yaml": "yaml", "yml": "yaml", "html": "html", "htm": "html", "css": "css", "sql": "sql", "sh": "bash", "bash": "bash", "java": "java", "c": "c", "cpp": "cpp", "go": "go", "rs": "rust", "rb": "ruby", "php": "php", "xml": "xml", "md": "markdown", "toml": "toml", "csv": "csv", } lang = lang_map.get(ext, "") sections.append(f"### ๐Ÿ“ Content Preview\n```{lang}\n{preview}\n```") elif preview: sections.append(f"### ๐Ÿ“ Info\n{preview}") else: sections.append("*(No content to preview for this file type.)*") return "\n".join(sections) # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Main entry point # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ def run_parse(file_obj, progress=gr.Progress()): """Top-level handler: validate โ†’ parse โ†’ format outputs.""" try: file_path = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", None) progress(0.0, desc="Validating upload...") file_path = validate_upload(file_path) gr.Info(f"๐Ÿ“ฆ Processing ZIP file ({format_size(os.path.getsize(file_path))})...") parsed_files, stats = parse_zip(file_path, progress) progress(0.95, desc="Building output...") summary = build_summary(stats, parsed_files) table_rows = [pf.to_table_row() for pf in parsed_files] full_text = build_full_text(parsed_files) json_data = build_json(parsed_files) progress(1.0, desc="Done!") if stats.parse_errors > 0: gr.Warning(f"{stats.parse_errors} file(s) failed to parse. See details below.") elif stats.parse_warnings > 0: gr.Info(f"โœ… Parsed {stats.parsed_ok} files with {stats.parse_warnings} warning(s).") else: gr.Info(f"โœ… Successfully parsed all {stats.parsed_ok} files!") return summary, table_rows, full_text, json_data, json_data except gr.Error: raise except MemoryError: logger.error("MemoryError during ZIP processing") raise gr.Error( "๐Ÿ’ฅ Out of memory! The ZIP file contents are too large to process. " "Try a smaller archive or one with fewer/smaller files." ) except Exception as e: logger.error(f"Unexpected error: {type(e).__name__}: {e}") traceback.print_exc() raise gr.Error( f"๐Ÿ’ฅ An unexpected error occurred: {type(e).__name__}: {e}\n\n" "If this persists, please report it as a bug." ) # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ # Gradio UI # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ with gr.Blocks( title="๐Ÿ“ฆ Document Parser", ) as demo: gr.Markdown(""" # ๐Ÿ“ฆ Document Parser Upload a **ZIP file** and this tool extracts & parses text from every supported document inside it. **Supported formats:** `.txt`, `.md`, `.py`, `.js`, `.ts`, `.json`, `.yaml`, `.csv`, `.html`, `.xml`, `.pdf`, `.docx`, `.xlsx`, and **40+ more** text/code formats โ€” including `Makefile`, `Dockerfile`, `LICENSE`, etc. **Limits:** Max ZIP size: {max_zip}MB ยท Max files: {max_files} ยท Max single file: {max_file}MB ยท Zip bomb protection enabled """.format(max_zip=MAX_ZIP_SIZE_MB, max_files=MAX_FILES_IN_ZIP, max_file=MAX_SINGLE_FILE_MB)) with gr.Row(): with gr.Column(scale=1): zip_input = gr.File( label="Upload ZIP File", file_types=[".zip"], type="filepath", ) parse_btn = gr.Button( "๐Ÿ” Parse Documents", variant="primary", size="lg", ) summary_output = gr.Markdown(label="Summary", value="*Upload a ZIP file to get started.*") with gr.Tabs(): with gr.Tab("๐Ÿ“‹ File Listing"): file_table = gr.Dataframe( headers=["Status", "Filename", "Extension", "Type", "Size", "Preview"], label="Files in Archive", interactive=False, wrap=True, ) with gr.Tab("๐Ÿ“ Extracted Text"): text_output = gr.Textbox( label="Full Extracted Text (all parseable files concatenated)", lines=30, max_lines=100, buttons=["copy"], ) with gr.Tab("๐Ÿ”Ž File Detail"): gr.Markdown("*Click a row in the **File Listing** tab, then switch here to see the full preview.*") detail_output = gr.Markdown( "โ„น๏ธ Select a file from the **File Listing** tab to see its full preview here." ) with gr.Tab("๐Ÿ“Š JSON Export"): json_output = gr.JSON(label="Structured Parse Results") file_data_state = gr.State([]) parse_btn.click( fn=run_parse, inputs=zip_input, outputs=[summary_output, file_table, text_output, json_output, file_data_state], concurrency_limit=CONCURRENCY_LIMIT, concurrency_id="parse_engine", trigger_mode="once", ) zip_input.upload( fn=run_parse, inputs=zip_input, outputs=[summary_output, file_table, text_output, json_output, file_data_state], concurrency_limit=CONCURRENCY_LIMIT, concurrency_id="parse_engine", trigger_mode="once", ) file_table.select( fn=build_detail, inputs=file_data_state, outputs=detail_output, ) demo.queue(default_concurrency_limit=CONCURRENCY_LIMIT, max_size=20) if __name__ == "__main__": demo.launch( show_error=True, theme=gr.themes.Soft(), css=""" .file-table { font-size: 0.9em; } footer { display: none !important; } """, )