"""
📦 Document Parser - Production-Grade ZIP Document Extraction Tool

Features:
- Upload ZIP files and parse all supported document formats
- Supports 40+ text/code formats, PDF, DOCX, XLSX
- Zip bomb protection (decompression ratio + size limits)
- Per-file error isolation: one corrupt file won't crash the whole parse
- Progress bars for real-time feedback
- Concurrency-limited to prevent resource exhaustion
- Full structured JSON export + file detail drill-down
"""
from __future__ import annotations

import io
import logging
import os
import traceback
import zipfile
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

import gradio as gr

# ──────────────────────────────────────────────────────────────────────────────
# Configuration constants
# ──────────────────────────────────────────────────────────────────────────────
MAX_ZIP_SIZE_MB = 200
MAX_FILES_IN_ZIP = 500
MAX_SINGLE_FILE_MB = 50
MAX_DECOMPRESSION_RATIO = 100  # zip bomb guard: reject if total > ratio × compressed
MAX_PREVIEW_CHARS = 5_000
MAX_FULL_TEXT_CHARS = 500_000
MAX_XLSX_ROWS = 100
CONCURRENCY_LIMIT = 3

# ──────────────────────────────────────────────────────────────────────────────
# Logging
# ──────────────────────────────────────────────────────────────────────────────
logger = logging.getLogger("document_parser")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

# ──────────────────────────────────────────────────────────────────────────────
# File classification
# ──────────────────────────────────────────────────────────────────────────────
class FileCategory(str, Enum):
    TEXT = "text"
    PDF = "pdf"
    DOCX = "docx"
    XLSX = "xlsx"
    IMAGE = "image"
    BINARY = "binary"

TEXT_EXTENSIONS = frozenset({
    ".txt", ".md", ".rst", ".py", ".js", ".ts", ".jsx", ".tsx", ".html",
    ".htm", ".css", ".scss", ".less", ".json", ".jsonl", ".yaml", ".yml",
    ".csv", ".tsv", ".xml", ".toml", ".cfg", ".ini", ".conf", ".properties",
    ".sh", ".bash", ".zsh", ".fish", ".bat", ".ps1", ".cmd",
    ".r", ".rmd", ".java", ".c", ".cpp", ".h", ".hpp", ".cc", ".cxx",
    ".go", ".rs", ".rb", ".php", ".swift", ".kt", ".kts", ".scala", ".clj",
    ".sql", ".graphql", ".gql", ".proto", ".thrift",
    ".dockerfile", ".makefile", ".cmake",
    ".gitignore", ".gitattributes", ".dockerignore", ".editorconfig",
    ".env", ".env.example", ".log", ".tex", ".bib", ".sty",
    ".lua", ".vim", ".el", ".lisp", ".hs", ".ml", ".mli", ".ex", ".exs",
    ".erl", ".hrl", ".dart", ".v", ".sv", ".vhd", ".vhdl",
    ".tf", ".tfvars", ".hcl", ".nix", ".dhall",
    ".ipynb",
})

KNOWN_TEXT_FILENAMES = frozenset({
    "Makefile", "Dockerfile", "Procfile", "Vagrantfile", "Gemfile",
    "Rakefile", "Brewfile", "Justfile", "Taskfile",
    ".gitignore", ".gitattributes", ".dockerignore", ".editorconfig",
    ".eslintrc", ".prettierrc", ".babelrc", ".browserslistrc",
    "LICENSE", "LICENCE", "COPYING", "AUTHORS", "CONTRIBUTORS",
    "CHANGELOG", "CHANGES", "HISTORY", "NEWS",
    "README", "INSTALL", "TODO", "HACKING",
    "requirements.txt",
})

IMAGE_EXTENSIONS = frozenset({
    ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".svg", ".webp", ".ico",
    ".tiff", ".tif", ".avif", ".heic", ".heif",
})

CATEGORY_EMOJI = {
    FileCategory.TEXT: "📄",
    FileCategory.PDF: "📕",
    FileCategory.DOCX: "📘",
    FileCategory.XLSX: "📊",
    FileCategory.IMAGE: "🖼️",
    FileCategory.BINARY: "📦",
}

def classify_file(filename: str) -> tuple[FileCategory, str]:
    """Classify a file by its extension and known filename patterns."""
    basename = filename.rsplit("/", 1)[-1] if "/" in filename else filename
    ext = os.path.splitext(basename)[1].lower()
    # Case-insensitive match also covers exact matches of known extensionless names.
    if not ext and basename.upper() in {n.upper() for n in KNOWN_TEXT_FILENAMES}:
        return FileCategory.TEXT, ""
    if ext in TEXT_EXTENSIONS:
        return FileCategory.TEXT, ext
    if ext == ".pdf":
        return FileCategory.PDF, ext
    if ext == ".docx":
        return FileCategory.DOCX, ext
    if ext in {".xlsx", ".xls"}:
        return FileCategory.XLSX, ext
    if ext in IMAGE_EXTENSIONS:
        return FileCategory.IMAGE, ext
    return FileCategory.BINARY, ext
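
# Illustrative classifications (a sketch; the paths are hypothetical):
#   classify_file("src/app.py")      -> (FileCategory.TEXT, ".py")
#   classify_file("docs/report.pdf") -> (FileCategory.PDF, ".pdf")
#   classify_file("Makefile")        -> (FileCategory.TEXT, "")
#   classify_file("photo.JPG")       -> (FileCategory.IMAGE, ".jpg")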

# ──────────────────────────────────────────────────────────────────────────────
# Data classes
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class ParsedFile:
    filename: str
    category: str
    extension: str
    size_bytes: int
    size_display: str
    content: str = ""
    preview: str = ""
    error: Optional[str] = None
    warnings: list[str] = field(default_factory=list)

    def to_table_row(self) -> list:
        status = "⚠️" if self.warnings else ("❌" if self.error else "✅")
        preview_text = self.error or self.preview[:200].replace("\n", " ")
        return [
            status,
            self.filename,
            self.extension or "(none)",
            self.category,
            self.size_display,
            preview_text,
        ]


@dataclass
class ParseStats:
    total_files: int = 0
    parsed_ok: int = 0
    parse_warnings: int = 0
    parse_errors: int = 0
    skipped_dirs: int = 0
    total_compressed_bytes: int = 0
    total_uncompressed_bytes: int = 0
    by_category: dict = field(default_factory=lambda: {c.value: 0 for c in FileCategory})

# ──────────────────────────────────────────────────────────────────────────────
# Size formatting
# ──────────────────────────────────────────────────────────────────────────────
def format_size(size_bytes: int) -> str:
    if size_bytes < 0:
        return "0 B"
    if size_bytes < 1024:
        return f"{size_bytes} B"
    elif size_bytes < 1024 ** 2:
        return f"{size_bytes / 1024:.1f} KB"
    elif size_bytes < 1024 ** 3:
        return f"{size_bytes / (1024 ** 2):.1f} MB"
    else:
        return f"{size_bytes / (1024 ** 3):.2f} GB"

# ──────────────────────────────────────────────────────────────────────────────
# Document parsers - each returns (content, warnings) or raises
# ──────────────────────────────────────────────────────────────────────────────
def parse_text_content(data: bytes, filename: str) -> tuple[str, list[str]]:
    """Parse plain text / code files."""
    warnings = []
    try:
        content = data.decode("utf-8")
    except UnicodeDecodeError:
        try:
            # latin-1 maps all 256 byte values, so this decode cannot fail;
            # the errors="replace" branch below is kept as a final safety net.
            content = data.decode("latin-1")
            warnings.append("Decoded with latin-1 fallback (not valid UTF-8)")
        except Exception:
            content = data.decode("utf-8", errors="replace")
            warnings.append("Contains invalid bytes; replaced with placeholders")
    if len(content) > MAX_FULL_TEXT_CHARS:
        warnings.append(f"Content truncated to {MAX_FULL_TEXT_CHARS:,} characters (original: {len(content):,})")
        content = content[:MAX_FULL_TEXT_CHARS] + "\n\n... [TRUNCATED]"
    return content, warnings
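
# Illustrative behaviour (the byte strings are hypothetical):
#   parse_text_content(b"hello", "a.txt")   -> ("hello", [])
#   parse_text_content(b"caf\xe9", "b.txt") -> ("café", ["Decoded with latin-1 fallback (not valid UTF-8)"])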

def parse_pdf_content(data: bytes, filename: str) -> tuple[str, list[str]]:
    """Parse PDF bytes to text using PyMuPDF."""
    warnings = []
    try:
        import fitz  # PyMuPDF
    except ImportError:
        return "[PDF library not available]", ["PyMuPDF not installed; install with: pip install PyMuPDF"]
    doc = None
    try:
        doc = fitz.open(stream=data, filetype="pdf")
        if doc.is_encrypted:
            return "", ["PDF is password-protected and cannot be parsed"]
        page_count = len(doc)
        if page_count == 0:
            return "", ["PDF has 0 pages"]
        text_parts = []
        empty_pages = 0
        for page_num in range(page_count):
            try:
                page = doc[page_num]
                page_text = page.get_text().strip()
                if page_text:
                    text_parts.append(f"\n--- Page {page_num + 1}/{page_count} ---\n{page_text}")
                else:
                    empty_pages += 1
            except Exception as e:
                warnings.append(f"Page {page_num + 1} failed: {type(e).__name__}: {e}")
        if empty_pages > 0:
            warnings.append(f"{empty_pages}/{page_count} pages had no extractable text (may be scanned/image-based)")
        content = "\n".join(text_parts) if text_parts else "[No extractable text found]"
        if not text_parts and empty_pages == page_count:
            warnings.append("PDF appears to be entirely image-based; OCR would be needed to extract text")
        return content, warnings
    except Exception as e:
        logger.error(f"PDF parse error for {filename}: {e}")
        return "", [f"PDF parse failed: {type(e).__name__}: {e}"]
    finally:
        if doc:
            try:
                doc.close()
            except Exception:
                pass

def parse_docx_content(data: bytes, filename: str) -> tuple[str, list[str]]:
    """Parse DOCX bytes to text."""
    warnings = []
    try:
        from docx import Document
    except ImportError:
        return "[DOCX library not available]", ["python-docx not installed"]
    try:
        doc = Document(io.BytesIO(data))
        parts = []
        paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
        if paragraphs:
            parts.extend(paragraphs)
        for i, table in enumerate(doc.tables):
            try:
                table_text = f"\n--- Table {i + 1} ---\n"
                for row in table.rows:
                    cells = [cell.text.strip() for cell in row.cells]
                    table_text += " | ".join(cells) + "\n"
                parts.append(table_text)
            except Exception as e:
                warnings.append(f"Table {i + 1} extraction failed: {e}")
        content = "\n".join(parts) if parts else "[DOCX: empty document]"
        if not parts:
            warnings.append("Document contains no paragraphs or tables")
        return content, warnings
    except Exception as e:
        logger.error(f"DOCX parse error for {filename}: {e}")
        return "", [f"DOCX parse failed: {type(e).__name__}: {e}"]

def parse_xlsx_content(data: bytes, filename: str) -> tuple[str, list[str]]:
    """Parse XLSX bytes to text summary."""
    warnings = []
    try:
        import openpyxl
    except ImportError:
        return "[XLSX library not available]", ["openpyxl not installed"]
    wb = None
    try:
        wb = openpyxl.load_workbook(io.BytesIO(data), read_only=True, data_only=True)
        parts = []
        for sheet_name in wb.sheetnames:
            try:
                ws = wb[sheet_name]
                sheet_text = f"\n--- Sheet: {sheet_name} ---\n"
                row_count = 0
                for row in ws.iter_rows(values_only=True):
                    if row_count >= MAX_XLSX_ROWS:
                        sheet_text += f"\n... (truncated at {MAX_XLSX_ROWS} rows)\n"
                        warnings.append(f"Sheet '{sheet_name}' truncated at {MAX_XLSX_ROWS} rows")
                        break
                    cells = [str(cell) if cell is not None else "" for cell in row]
                    sheet_text += " | ".join(cells) + "\n"
                    row_count += 1
                if row_count == 0:
                    sheet_text += "(empty sheet)\n"
                parts.append(sheet_text)
            except Exception as e:
                warnings.append(f"Sheet '{sheet_name}' failed: {type(e).__name__}: {e}")
        content = "\n".join(parts) if parts else "[XLSX: empty workbook]"
        return content, warnings
    except Exception as e:
        logger.error(f"XLSX parse error for {filename}: {e}")
        return "", [f"XLSX parse failed: {type(e).__name__}: {e}"]
    finally:
        if wb:
            try:
                wb.close()
            except Exception:
                pass

# ──────────────────────────────────────────────────────────────────────────────
# Validation layer
# ──────────────────────────────────────────────────────────────────────────────
def validate_upload(file_path: str | None) -> str:
    """Validate the uploaded file. Returns the resolved file path. Raises gr.Error on failure."""
    if file_path is None:
        raise gr.Error("⚠️ Please upload a ZIP file first.")
    if not os.path.isfile(file_path):
        raise gr.Error("❌ Upload failed - file not found on server. Please try again.")
    file_size = os.path.getsize(file_path)
    if file_size == 0:
        raise gr.Error("❌ The uploaded file is empty (0 bytes).")
    size_mb = file_size / (1024 ** 2)
    if size_mb > MAX_ZIP_SIZE_MB:
        raise gr.Error(
            f"❌ File too large: {size_mb:.1f} MB. "
            f"Maximum allowed is {MAX_ZIP_SIZE_MB} MB."
        )
    if not zipfile.is_zipfile(file_path):
        raise gr.Error(
            "❌ Not a valid ZIP archive. The file may be corrupted, "
            "or it may be a different archive format (tar, rar, 7z)."
        )
    return file_path

def check_zip_bomb(zf: zipfile.ZipFile, compressed_size: int) -> list[str]:
    """Check for zip bomb indicators. Returns warnings. Raises gr.Error if malicious."""
    warnings = []
    total_uncompressed = sum(info.file_size for info in zf.infolist() if not info.is_dir())
    if compressed_size > 0:
        ratio = total_uncompressed / compressed_size
        if ratio > MAX_DECOMPRESSION_RATIO:
            raise gr.Error(
                f"🛡️ Zip bomb detected! Decompression ratio is {ratio:.0f}x "
                f"(compressed: {format_size(compressed_size)}, "
                f"uncompressed: {format_size(total_uncompressed)}). "
                f"Maximum allowed ratio is {MAX_DECOMPRESSION_RATIO}x."
            )
        if ratio > MAX_DECOMPRESSION_RATIO / 2:
            warnings.append(
                f"High decompression ratio ({ratio:.0f}x) - approaching the "
                f"{MAX_DECOMPRESSION_RATIO}x safety limit"
            )
    total_uncompressed_mb = total_uncompressed / (1024 ** 2)
    if total_uncompressed_mb > MAX_ZIP_SIZE_MB * 5:
        raise gr.Error(
            f"🛡️ Uncompressed content too large: {total_uncompressed_mb:.0f} MB. "
            f"Maximum is {MAX_ZIP_SIZE_MB * 5} MB."
        )
    return warnings
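
# Worked example (hypothetical sizes): a 1 MB upload whose entries total
# 150 MB uncompressed has a 150x ratio, above MAX_DECOMPRESSION_RATIO (100x),
# so it is rejected outright; 60 MB gives 60x, which passes but earns a
# warning because it exceeds the 50x (= 100 / 2) soft threshold.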

# ──────────────────────────────────────────────────────────────────────────────
# Core parsing engine
# ──────────────────────────────────────────────────────────────────────────────
def parse_zip(file_path: str, progress: gr.Progress) -> tuple[list[ParsedFile], ParseStats]:
    """Parse all files in a ZIP archive with per-file error isolation."""
    file_size = os.path.getsize(file_path)
    stats = ParseStats()
    try:
        zf = zipfile.ZipFile(file_path, "r")
    except zipfile.BadZipFile:
        raise gr.Error("❌ ZIP file is corrupted and cannot be opened.")
    except Exception as e:
        raise gr.Error(f"❌ Failed to open ZIP: {type(e).__name__}: {e}")
    try:
        bomb_warnings = check_zip_bomb(zf, file_size)
        entries = [info for info in zf.infolist() if not info.is_dir()]
        stats.skipped_dirs = len(zf.infolist()) - len(entries)
        stats.total_files = len(entries)
        stats.total_compressed_bytes = file_size
        if stats.total_files == 0:
            raise gr.Error("❌ ZIP archive contains no files (only directories).")
        truncated = False
        if stats.total_files > MAX_FILES_IN_ZIP:
            gr.Warning(
                f"ZIP contains {stats.total_files} files - "
                f"processing first {MAX_FILES_IN_ZIP} only."
            )
            entries = entries[:MAX_FILES_IN_ZIP]
            truncated = True
        parsed_files: list[ParsedFile] = []
        for info in progress.tqdm(entries, desc="Parsing documents"):
            category, ext = classify_file(info.filename)
            stats.by_category[category.value] += 1
            stats.total_uncompressed_bytes += info.file_size
            pf = ParsedFile(
                filename=info.filename,
                category=category.value,
                extension=ext or "(none)",
                size_bytes=info.file_size,
                size_display=format_size(info.file_size),
            )
            file_mb = info.file_size / (1024 ** 2)
            if file_mb > MAX_SINGLE_FILE_MB:
                pf.error = f"Skipped: file too large ({file_mb:.1f} MB > {MAX_SINGLE_FILE_MB} MB limit)"
                pf.warnings.append(pf.error)
                stats.parse_warnings += 1
                parsed_files.append(pf)
                continue
            try:
                raw_data = zf.read(info)
            except RuntimeError as e:
                pf.error = f"Cannot read: {e}"
                if "password" in str(e).lower():
                    pf.error = "File is password-protected"
                stats.parse_errors += 1
                parsed_files.append(pf)
                continue
            except Exception as e:
                pf.error = f"Read failed: {type(e).__name__}: {e}"
                stats.parse_errors += 1
                parsed_files.append(pf)
                continue
            try:
                if category == FileCategory.TEXT:
                    content, warnings = parse_text_content(raw_data, info.filename)
                elif category == FileCategory.PDF:
                    content, warnings = parse_pdf_content(raw_data, info.filename)
                elif category == FileCategory.DOCX:
                    content, warnings = parse_docx_content(raw_data, info.filename)
                elif category == FileCategory.XLSX:
                    content, warnings = parse_xlsx_content(raw_data, info.filename)
                elif category == FileCategory.IMAGE:
                    content = ""
                    warnings = []
                    pf.preview = f"[Image: {ext}, {pf.size_display}]"
                else:
                    content = ""
                    warnings = []
                    pf.preview = f"[Binary: {ext}, {pf.size_display}]"
                pf.content = content
                pf.preview = content[:MAX_PREVIEW_CHARS] if content else pf.preview
                pf.warnings = warnings
                if warnings:
                    stats.parse_warnings += 1
                else:
                    stats.parsed_ok += 1
            except MemoryError:
                pf.error = "Out of memory while parsing this file"
                stats.parse_errors += 1
                logger.error(f"MemoryError parsing {info.filename}")
            except Exception as e:
                pf.error = f"Parse failed: {type(e).__name__}: {e}"
                stats.parse_errors += 1
                logger.error(f"Parse error for {info.filename}: {e}")
                traceback.print_exc()
            parsed_files.append(pf)
        if bomb_warnings:
            for w in bomb_warnings:
                gr.Warning(w)
        if truncated:
            stats.parse_warnings += 1
        return parsed_files, stats
    finally:
        try:
            zf.close()
        except Exception:
            pass

# ──────────────────────────────────────────────────────────────────────────────
# Output formatters
# ──────────────────────────────────────────────────────────────────────────────
def build_summary(stats: ParseStats, parsed_files: list[ParsedFile]) -> str:
    """Build a rich markdown summary."""
    alerts = []
    if stats.parse_errors > 0:
        alerts.append(f"⚠️ **{stats.parse_errors} file(s) failed to parse** - see ❌ markers in the file listing")
    if stats.parse_warnings > 0:
        alerts.append(f"ℹ️ **{stats.parse_warnings} file(s) had warnings** - see ⚠️ markers in the file listing")
    alert_block = "\n".join(alerts) + "\n\n" if alerts else ""
    error_files = [pf for pf in parsed_files if pf.error]
    error_block = ""
    if error_files:
        error_lines = []
        for pf in error_files[:10]:
            error_lines.append(f"- `{pf.filename}`: {pf.error}")
        if len(error_files) > 10:
            error_lines.append(f"- ... and {len(error_files) - 10} more")
        error_block = "\n### ❌ Failed Files\n" + "\n".join(error_lines) + "\n\n"
    return f"""## 📦 ZIP Archive Summary

{alert_block}| Metric | Value |
|--------|-------|
| **Total files** | {stats.total_files} |
| **Parsed successfully** | {stats.parsed_ok} |
| **With warnings** | {stats.parse_warnings} |
| **Failed** | {stats.parse_errors} |
| **Compressed size** | {format_size(stats.total_compressed_bytes)} |
| **Uncompressed size** | {format_size(stats.total_uncompressed_bytes)} |
| **Directories skipped** | {stats.skipped_dirs} |

### 📊 File Types

| Category | Count |
|----------|-------|
| Text/Code | {stats.by_category.get('text', 0)} |
| PDF | {stats.by_category.get('pdf', 0)} |
| DOCX | {stats.by_category.get('docx', 0)} |
| XLSX | {stats.by_category.get('xlsx', 0)} |
| Image | {stats.by_category.get('image', 0)} |
| Binary | {stats.by_category.get('binary', 0)} |
{error_block}"""

def build_full_text(parsed_files: list[ParsedFile]) -> str:
    """Build concatenated text output from all parsed files."""
    parts = []
    for pf in parsed_files:
        if pf.content:
            emoji = CATEGORY_EMOJI.get(FileCategory(pf.category), "📄")
            parts.append(
                f"\n{'=' * 70}\n"
                f"{emoji} {pf.filename}"
                f"{' ⚠️ ' + ', '.join(pf.warnings) if pf.warnings else ''}\n"
                f"{'=' * 70}\n"
                f"{pf.content}"
            )
        elif pf.error:
            parts.append(
                f"\n{'=' * 70}\n"
                f"❌ {pf.filename} - ERROR: {pf.error}\n"
                f"{'=' * 70}"
            )
    if not parts:
        return "(No text content was extracted from any file in the archive.)"
    full = "\n".join(parts)
    if len(full) > MAX_FULL_TEXT_CHARS:
        full = full[:MAX_FULL_TEXT_CHARS] + "\n\n... [OUTPUT TRUNCATED - too large to display fully]"
    return full
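
# Shape of the concatenated output for one parsed file (illustrative sketch;
# the filename is hypothetical and the rule is 70 "=" characters wide):
#   ======================================================================
#   📄 src/app.py
#   ======================================================================
#   <file content>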

def build_json(parsed_files: list[ParsedFile]) -> list[dict]:
    """Build structured JSON output."""
    output = []
    for pf in parsed_files:
        entry = {
            "filename": pf.filename,
            "category": pf.category,
            "extension": pf.extension,
            "size_bytes": pf.size_bytes,
            "size_display": pf.size_display,
            "preview": pf.preview[:1000],
            "status": "error" if pf.error else ("warning" if pf.warnings else "ok"),
        }
        if pf.error:
            entry["error"] = pf.error
        if pf.warnings:
            entry["warnings"] = pf.warnings
        output.append(entry)
    return output
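
# Shape of one exported entry (illustrative values):
#   {
#       "filename": "src/app.py",
#       "category": "text",
#       "extension": ".py",
#       "size_bytes": 1234,
#       "size_display": "1.2 KB",
#       "preview": "import io...",
#       "status": "ok",  # one of "ok" | "warning" | "error"
#   }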

def build_detail(file_data: list[dict], evt: gr.SelectData) -> str:
    """Build detail view when user clicks a table row."""
    if not file_data or not isinstance(file_data, list):
        return "ℹ️ Select a file from the **File Listing** tab to see its full preview here."
    try:
        row_idx = evt.index[0] if isinstance(evt.index, (list, tuple)) else evt.index
    except (TypeError, IndexError):
        return "⚠️ Could not determine selected row. Please click a row in the file listing."
    if not (0 <= row_idx < len(file_data)):
        return f"⚠️ Row index {row_idx} is out of range (0-{len(file_data) - 1})."
    item = file_data[row_idx]
    header = f"## {CATEGORY_EMOJI.get(item.get('category', ''), '📄')} {item['filename']}\n"
    meta = f"**Category:** {item.get('category', 'unknown')} | **Size:** {item.get('size_display', 'unknown')}\n\n"
    sections = [header, meta]
    if item.get("error"):
        sections.append(f"### ❌ Error\n```\n{item['error']}\n```\n")
    if item.get("warnings"):
        sections.append("### ⚠️ Warnings\n" + "\n".join(f"- {w}" for w in item["warnings"]) + "\n\n")
    preview = item.get("preview", "")
    if preview and not preview.startswith("["):
        ext = item.get("extension", "").lstrip(".")
        lang_map = {
            "py": "python", "js": "javascript", "ts": "typescript",
            "json": "json", "yaml": "yaml", "yml": "yaml",
            "html": "html", "htm": "html", "css": "css",
            "sql": "sql", "sh": "bash", "bash": "bash",
            "java": "java", "c": "c", "cpp": "cpp", "go": "go",
            "rs": "rust", "rb": "ruby", "php": "php", "xml": "xml",
            "md": "markdown", "toml": "toml", "csv": "csv",
        }
        lang = lang_map.get(ext, "")
        sections.append(f"### 📄 Content Preview\n```{lang}\n{preview}\n```")
    elif preview:
        sections.append(f"### 📄 Info\n{preview}")
    else:
        sections.append("*(No content to preview for this file type.)*")
    return "\n".join(sections)

# ──────────────────────────────────────────────────────────────────────────────
# Main entry point
# ──────────────────────────────────────────────────────────────────────────────
def run_parse(file_obj, progress=gr.Progress()):
    """Top-level handler: validate -> parse -> format outputs."""
    try:
        file_path = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", None)
        progress(0.0, desc="Validating upload...")
        file_path = validate_upload(file_path)
        gr.Info(f"📦 Processing ZIP file ({format_size(os.path.getsize(file_path))})...")
        parsed_files, stats = parse_zip(file_path, progress)
        progress(0.95, desc="Building output...")
        summary = build_summary(stats, parsed_files)
        table_rows = [pf.to_table_row() for pf in parsed_files]
        full_text = build_full_text(parsed_files)
        json_data = build_json(parsed_files)
        progress(1.0, desc="Done!")
        if stats.parse_errors > 0:
            gr.Warning(f"{stats.parse_errors} file(s) failed to parse. See details below.")
        elif stats.parse_warnings > 0:
            gr.Info(f"✅ Parsed {stats.parsed_ok} files with {stats.parse_warnings} warning(s).")
        else:
            gr.Info(f"✅ Successfully parsed all {stats.parsed_ok} files!")
        return summary, table_rows, full_text, json_data, json_data
    except gr.Error:
        raise
    except MemoryError:
        logger.error("MemoryError during ZIP processing")
        raise gr.Error(
            "💥 Out of memory! The ZIP file contents are too large to process. "
            "Try a smaller archive or one with fewer/smaller files."
        )
    except Exception as e:
        logger.error(f"Unexpected error: {type(e).__name__}: {e}")
        traceback.print_exc()
        raise gr.Error(
            f"💥 An unexpected error occurred: {type(e).__name__}: {e}\n\n"
            "If this persists, please report it as a bug."
        )

# ──────────────────────────────────────────────────────────────────────────────
# Gradio UI
# ──────────────────────────────────────────────────────────────────────────────
with gr.Blocks(
    title="📦 Document Parser",
    # theme and css belong on gr.Blocks(), not on demo.launch().
    theme=gr.themes.Soft(),
    css="""
    .file-table { font-size: 0.9em; }
    footer { display: none !important; }
    """,
) as demo:
    gr.Markdown("""
    # 📦 Document Parser

    Upload a **ZIP file** and this tool extracts & parses text from every supported document inside it.

    **Supported formats:** `.txt`, `.md`, `.py`, `.js`, `.ts`, `.json`, `.yaml`, `.csv`, `.html`, `.xml`,
    `.pdf`, `.docx`, `.xlsx`, and **40+ more** text/code formats, including `Makefile`, `Dockerfile`, `LICENSE`, etc.

    **Limits:** Max ZIP size: {max_zip} MB · Max files: {max_files} · Max single file: {max_file} MB · Zip bomb protection enabled
    """.format(max_zip=MAX_ZIP_SIZE_MB, max_files=MAX_FILES_IN_ZIP, max_file=MAX_SINGLE_FILE_MB))
    with gr.Row():
        with gr.Column(scale=1):
            zip_input = gr.File(
                label="Upload ZIP File",
                file_types=[".zip"],
                type="filepath",
            )
            parse_btn = gr.Button(
                "🚀 Parse Documents",
                variant="primary",
                size="lg",
            )
    summary_output = gr.Markdown(label="Summary", value="*Upload a ZIP file to get started.*")
    with gr.Tabs():
        with gr.Tab("📋 File Listing"):
            file_table = gr.Dataframe(
                headers=["Status", "Filename", "Extension", "Type", "Size", "Preview"],
                label="Files in Archive",
                interactive=False,
                wrap=True,
            )
        with gr.Tab("📄 Extracted Text"):
            text_output = gr.Textbox(
                label="Full Extracted Text (all parseable files concatenated)",
                lines=30,
                max_lines=100,
                buttons=["copy"],
            )
        with gr.Tab("🔍 File Detail"):
            gr.Markdown("*Click a row in the **File Listing** tab, then switch here to see the full preview.*")
            detail_output = gr.Markdown(
                "ℹ️ Select a file from the **File Listing** tab to see its full preview here."
            )
        with gr.Tab("📊 JSON Export"):
            json_output = gr.JSON(label="Structured Parse Results")
    file_data_state = gr.State([])

    parse_btn.click(
        fn=run_parse,
        inputs=zip_input,
        outputs=[summary_output, file_table, text_output, json_output, file_data_state],
        concurrency_limit=CONCURRENCY_LIMIT,
        concurrency_id="parse_engine",
        trigger_mode="once",
    )
    zip_input.upload(
        fn=run_parse,
        inputs=zip_input,
        outputs=[summary_output, file_table, text_output, json_output, file_data_state],
        concurrency_limit=CONCURRENCY_LIMIT,
        concurrency_id="parse_engine",
        trigger_mode="once",
    )
    file_table.select(
        fn=build_detail,
        inputs=file_data_state,
        outputs=detail_output,
    )

demo.queue(default_concurrency_limit=CONCURRENCY_LIMIT, max_size=20)

if __name__ == "__main__":
    # launch() does not accept theme= or css=; those are set on gr.Blocks() above.
    demo.launch(show_error=True)
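
# To run locally (a sketch; adjust the module name if it is not saved as app.py):
#   pip install gradio PyMuPDF python-docx openpyxl
#   python app.py
# Gradio serves the UI at http://127.0.0.1:7860 by default.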