# document-parser / app.py
# Refactor: production-grade error handling, progress bars, zip bomb protection,
# per-file isolation, Gradio 6 compat (commit d60d975, ScottzillaSystems)
"""
πŸ“¦ Document Parser β€” Production-Grade ZIP Document Extraction Tool
Features:
- Upload ZIP files and parse all supported document formats
- Supports 40+ text/code formats, PDF, DOCX, XLSX
- Zip bomb protection (decompression ratio + size limits)
- Per-file error isolation β€” one corrupt file won't crash the whole parse
- Progress bars for real-time feedback
- Concurrency-limited to prevent resource exhaustion
- Full structured JSON export + file detail drill-down
"""
from __future__ import annotations
import io
import logging
import os
import traceback
import zipfile
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import gradio as gr
# ──────────────────────────────────────────────────────────────────────────────
# Configuration constants
# ──────────────────────────────────────────────────────────────────────────────
MAX_ZIP_SIZE_MB = 200          # reject uploads larger than this (compressed size)
MAX_FILES_IN_ZIP = 500         # only the first N archive entries are parsed
MAX_SINGLE_FILE_MB = 50        # per-entry size cap; larger entries are skipped
MAX_DECOMPRESSION_RATIO = 100  # zip bomb guard: reject if total > ratio x compressed
MAX_PREVIEW_CHARS = 5_000      # per-file preview length kept for the UI
MAX_FULL_TEXT_CHARS = 500_000  # cap on a single text payload and on combined output
MAX_XLSX_ROWS = 100            # rows rendered per spreadsheet sheet
CONCURRENCY_LIMIT = 3          # simultaneous parse jobs allowed by the Gradio queue
# ──────────────────────────────────────────────────────────────────────────────
# Logging
# ──────────────────────────────────────────────────────────────────────────────
# Module-wide logger; basicConfig is a no-op if the host app configured logging.
logger = logging.getLogger("document_parser")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
# ──────────────────────────────────────────────────────────────────────────────
# File classification
# ──────────────────────────────────────────────────────────────────────────────
class FileCategory(str, Enum):
    """Coarse file-type buckets used to pick a parser for each archive entry.

    Subclasses ``str`` so members compare (and hash) equal to their plain
    string values — e.g. ``FileCategory.TEXT == "text"`` — which lets them be
    used interchangeably as dict keys and JSON values.
    """
    TEXT = "text"      # plain text / source code — decoded directly
    PDF = "pdf"        # parsed with PyMuPDF
    DOCX = "docx"      # parsed with python-docx
    XLSX = "xlsx"      # parsed with openpyxl
    IMAGE = "image"    # not parsed; listed with a placeholder preview
    BINARY = "binary"  # unknown/unsupported; listed with a placeholder preview
# Extensions decoded as plain text / source code.
# NOTE(review): the dotfile entries here (".gitignore", ".env", ".env.example",
# ...) can never match via os.path.splitext — splitext(".gitignore") yields no
# extension and only the *last* suffix is kept — so bare dotfile names are
# actually caught by KNOWN_TEXT_FILENAMES below; confirm before removing.
TEXT_EXTENSIONS = frozenset({
    ".txt", ".md", ".rst", ".py", ".js", ".ts", ".jsx", ".tsx", ".html",
    ".htm", ".css", ".scss", ".less", ".json", ".jsonl", ".yaml", ".yml",
    ".csv", ".tsv", ".xml", ".toml", ".cfg", ".ini", ".conf", ".properties",
    ".sh", ".bash", ".zsh", ".fish", ".bat", ".ps1", ".cmd",
    ".r", ".rmd", ".java", ".c", ".cpp", ".h", ".hpp", ".cc", ".cxx",
    ".go", ".rs", ".rb", ".php", ".swift", ".kt", ".kts", ".scala", ".clj",
    ".sql", ".graphql", ".gql", ".proto", ".thrift",
    ".dockerfile", ".makefile", ".cmake",
    ".gitignore", ".gitattributes", ".dockerignore", ".editorconfig",
    ".env", ".env.example", ".log", ".tex", ".bib", ".sty",
    ".lua", ".vim", ".el", ".lisp", ".hs", ".ml", ".mli", ".ex", ".exs",
    ".erl", ".hrl", ".dart", ".v", ".sv", ".vhd", ".vhdl",
    ".tf", ".tfvars", ".hcl", ".nix", ".dhall",
    ".ipynb",
})
# Extensionless filenames that are conventionally text; matched
# case-insensitively by classify_file().
KNOWN_TEXT_FILENAMES = frozenset({
    "Makefile", "Dockerfile", "Procfile", "Vagrantfile", "Gemfile",
    "Rakefile", "Brewfile", "Justfile", "Taskfile",
    ".gitignore", ".gitattributes", ".dockerignore", ".editorconfig",
    ".eslintrc", ".prettierrc", ".babelrc", ".browserslistrc",
    "LICENSE", "LICENCE", "COPYING", "AUTHORS", "CONTRIBUTORS",
    "CHANGELOG", "CHANGES", "HISTORY", "NEWS",
    "README", "INSTALL", "TODO", "HACKING",
    "requirements.txt",
})
# Recognized image extensions — listed in output but never parsed.
IMAGE_EXTENSIONS = frozenset({
    ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".svg", ".webp", ".ico",
    ".tiff", ".tif", ".avif", ".heic", ".heif",
})
# Visual marker per category, used by the text and detail renderers.
CATEGORY_EMOJI = {
    FileCategory.TEXT: "πŸ“„",
    FileCategory.PDF: "πŸ“•",
    FileCategory.DOCX: "πŸ“˜",
    FileCategory.XLSX: "πŸ“Š",
    FileCategory.IMAGE: "πŸ–ΌοΈ",
    FileCategory.BINARY: "πŸ“¦",
}
# Upper-cased view of KNOWN_TEXT_FILENAMES for case-insensitive matching.
# Built lazily on first classify_file() call (not at import time) and cached,
# instead of being rebuilt on every call as before.
_KNOWN_TEXT_UPPER: Optional[frozenset] = None


def classify_file(filename: str) -> tuple[FileCategory, str]:
    """Classify an archive entry by extension and known filename patterns.

    Args:
        filename: entry path as stored in the ZIP (forward-slash separated).

    Returns:
        (category, extension) — extension is lower-cased, or "" when the
        basename has none (e.g. "Makefile").
    """
    global _KNOWN_TEXT_UPPER
    # rsplit returns [filename] when there is no "/", so this covers both cases.
    basename = filename.rsplit("/", 1)[-1]
    ext = os.path.splitext(basename)[1].lower()
    if not ext:
        # Single case-insensitive membership test. The previous exact-match
        # pass was redundant: any exact match is also an upper-cased match.
        if _KNOWN_TEXT_UPPER is None:
            _KNOWN_TEXT_UPPER = frozenset(n.upper() for n in KNOWN_TEXT_FILENAMES)
        if basename.upper() in _KNOWN_TEXT_UPPER:
            return FileCategory.TEXT, ""
    if ext in TEXT_EXTENSIONS:
        return FileCategory.TEXT, ext
    if ext == ".pdf":
        return FileCategory.PDF, ext
    if ext == ".docx":
        return FileCategory.DOCX, ext
    # NOTE: legacy .xls is routed to the XLSX parser; openpyxl cannot read it,
    # so such files surface as a per-file parse error rather than crashing.
    if ext in {".xlsx", ".xls"}:
        return FileCategory.XLSX, ext
    if ext in IMAGE_EXTENSIONS:
        return FileCategory.IMAGE, ext
    return FileCategory.BINARY, ext
# ──────────────────────────────────────────────────────────────────────────────
# Data classes
# ──────────────────────────────────────────────────────────────────────────────
@dataclass
class ParsedFile:
    """Result of parsing a single archive entry."""
    filename: str
    category: str          # FileCategory value ("text", "pdf", ...)
    extension: str         # lower-cased extension or "(none)"
    size_bytes: int
    size_display: str      # human-readable size string
    content: str = ""      # full extracted text (possibly truncated)
    preview: str = ""      # short preview for the UI
    error: Optional[str] = None        # fatal per-file error, if any
    warnings: list[str] = field(default_factory=list)

    def to_table_row(self) -> list:
        """Row for the UI dataframe: [status, name, ext, type, size, preview].

        Status precedence now matches build_json(): an error (❌) outranks
        warnings (⚠️), which outrank success (βœ…). Previously a file that
        both errored and carried warnings was shown as ⚠️ in the table while
        the JSON export called it "error".
        """
        if self.error:
            status = "❌"
        elif self.warnings:
            status = "⚠️"
        else:
            status = "βœ…"
        # Errors replace the preview text; newlines would break the table cell.
        preview_text = self.error or self.preview[:200].replace("\n", " ")
        return [
            status,
            self.filename,
            self.extension or "(none)",
            self.category,
            self.size_display,
            preview_text,
        ]
@dataclass
class ParseStats:
    """Aggregate counters for one parse_zip() run."""
    total_files: int = 0                # non-directory entries considered
    parsed_ok: int = 0                  # parsed with no warnings
    parse_warnings: int = 0             # parsed, but with at least one warning
    parse_errors: int = 0               # failed to read or parse
    skipped_dirs: int = 0               # directory entries ignored
    total_compressed_bytes: int = 0     # size of the ZIP file on disk
    total_uncompressed_bytes: int = 0   # declared uncompressed size of entries
    # Entry counts keyed by FileCategory value ("text", "pdf", ...).
    by_category: dict = field(default_factory=lambda: {c.value: 0 for c in FileCategory})
# ──────────────────────────────────────────────────────────────────────────────
# Size formatting
# ──────────────────────────────────────────────────────────────────────────────
def format_size(size_bytes: int) -> str:
    """Render a byte count as a human-friendly string (B, KB, MB or GB)."""
    KB = 1024
    MB = KB * 1024
    GB = MB * 1024
    if size_bytes < 0:
        # Defensive: negative sizes should not occur; render them harmlessly.
        return "0 B"
    if size_bytes < KB:
        return f"{size_bytes} B"
    if size_bytes < MB:
        return f"{size_bytes / KB:.1f} KB"
    if size_bytes < GB:
        return f"{size_bytes / MB:.1f} MB"
    return f"{size_bytes / GB:.2f} GB"
# ──────────────────────────────────────────────────────────────────────────────
# Document parsers β€” each returns (content, warnings) or raises
# ──────────────────────────────────────────────────────────────────────────────
def parse_text_content(data: bytes, filename: str, max_chars: Optional[int] = None) -> tuple[str, list[str]]:
    """Decode a text/code file to str with graceful encoding fallbacks.

    Tries UTF-8 first, then latin-1 (which accepts any byte sequence), then
    UTF-8 with replacement characters as a last resort.

    Args:
        data: raw file bytes.
        filename: entry name (unused; kept for parser-signature symmetry).
        max_chars: truncation limit; defaults to MAX_FULL_TEXT_CHARS. The
            default is resolved lazily so callers can override it without
            touching the module constant.

    Returns:
        (content, warnings) — content ends with a "[TRUNCATED]" marker when
        it exceeded the limit.
    """
    limit = MAX_FULL_TEXT_CHARS if max_chars is None else max_chars
    warnings: list[str] = []
    try:
        content = data.decode("utf-8")
    except UnicodeDecodeError:
        try:
            content = data.decode("latin-1")
            warnings.append("Decoded with latin-1 fallback (not valid UTF-8)")
        except Exception:
            # latin-1 maps every possible byte, so this branch is effectively
            # unreachable — kept as a belt-and-braces fallback.
            content = data.decode("utf-8", errors="replace")
            warnings.append("Contains invalid bytes; replaced with placeholders")
    if len(content) > limit:
        warnings.append(f"Content truncated to {limit:,} characters (original: {len(content):,})")
        content = content[:limit] + "\n\n... [TRUNCATED]"
    return content, warnings
def parse_pdf_content(data: bytes, filename: str) -> tuple[str, list[str]]:
    """Extract text from PDF bytes with PyMuPDF, page by page.

    Args:
        data: raw PDF bytes.
        filename: entry name, used for log context.

    Returns:
        (content, warnings). content is "" on hard failure/encryption and a
        bracketed placeholder when PyMuPDF is missing or no text was found;
        warnings describe per-page failures and image-only documents.
    """
    warnings: list[str] = []
    try:
        import fitz  # PyMuPDF — imported lazily so the app runs without it
    except ImportError:
        return "[PDF library not available]", ["PyMuPDF not installed β€” install with: pip install PyMuPDF"]
    doc = None
    try:
        doc = fitz.open(stream=data, filetype="pdf")
        if doc.is_encrypted:
            return "", ["PDF is password-protected and cannot be parsed"]
        page_count = len(doc)
        if page_count == 0:
            return "", ["PDF has 0 pages"]
        text_parts = []
        empty_pages = 0
        for page_num in range(page_count):
            # Per-page isolation: one bad page must not lose the whole document.
            try:
                page = doc[page_num]
                page_text = page.get_text().strip()
                if page_text:
                    text_parts.append(f"\n--- Page {page_num + 1}/{page_count} ---\n{page_text}")
                else:
                    empty_pages += 1
            except Exception as e:
                warnings.append(f"Page {page_num + 1} failed: {type(e).__name__}: {e}")
        if empty_pages > 0:
            warnings.append(f"{empty_pages}/{page_count} pages had no extractable text (may be scanned/image-based)")
        content = "\n".join(text_parts) if text_parts else "[No extractable text found]"
        if not text_parts and empty_pages == page_count:
            warnings.append("PDF appears to be entirely image-based; OCR would be needed to extract text")
        return content, warnings
    except Exception as e:
        # Fix: log the actual entry name instead of the literal "(unknown)"
        # placeholder the old message printed; lazy %-args per logging best practice.
        logger.error("PDF parse error for %s: %s", filename, e)
        return "", [f"PDF parse failed: {type(e).__name__}: {e}"]
    finally:
        if doc:
            try:
                doc.close()
            except Exception:
                pass
def parse_docx_content(data: bytes, filename: str) -> tuple[str, list[str]]:
    """Extract paragraphs and tables from DOCX bytes via python-docx.

    Args:
        data: raw .docx bytes.
        filename: entry name, used for log context.

    Returns:
        (content, warnings); content is "" on hard failure and a bracketed
        placeholder when python-docx is missing or the document is empty.
    """
    warnings: list[str] = []
    try:
        from docx import Document  # lazy: keeps the app usable without python-docx
    except ImportError:
        return "[DOCX library not available]", ["python-docx not installed"]
    try:
        doc = Document(io.BytesIO(data))
        parts = []
        paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
        if paragraphs:
            parts.extend(paragraphs)
        for i, table in enumerate(doc.tables):
            # Per-table isolation: a malformed table only costs that table.
            try:
                table_text = f"\n--- Table {i + 1} ---\n"
                for row in table.rows:
                    cells = [cell.text.strip() for cell in row.cells]
                    table_text += " | ".join(cells) + "\n"
                parts.append(table_text)
            except Exception as e:
                warnings.append(f"Table {i + 1} extraction failed: {e}")
        content = "\n".join(parts) if parts else "[DOCX: empty document]"
        if not parts:
            warnings.append("Document contains no paragraphs or tables")
        return content, warnings
    except Exception as e:
        # Fix: log the actual entry name instead of the literal "(unknown)"
        # placeholder the old message printed; lazy %-args per logging best practice.
        logger.error("DOCX parse error for %s: %s", filename, e)
        return "", [f"DOCX parse failed: {type(e).__name__}: {e}"]
def parse_xlsx_content(data: bytes, filename: str) -> tuple[str, list[str]]:
    """Render an XLSX workbook as pipe-separated text, sheet by sheet.

    Args:
        data: raw .xlsx bytes (legacy .xls is not supported by openpyxl and
            will surface here as a parse-failure warning).
        filename: entry name, used for log context.

    Returns:
        (content, warnings); each sheet is truncated at MAX_XLSX_ROWS rows.
    """
    warnings: list[str] = []
    try:
        import openpyxl  # lazy: keeps the app usable without openpyxl
    except ImportError:
        return "[XLSX library not available]", ["openpyxl not installed"]
    wb = None
    try:
        # read_only streams rows; data_only yields cached values, not formulas.
        wb = openpyxl.load_workbook(io.BytesIO(data), read_only=True, data_only=True)
        parts = []
        for sheet_name in wb.sheetnames:
            # Per-sheet isolation: one corrupt sheet only costs that sheet.
            try:
                ws = wb[sheet_name]
                sheet_text = f"\n--- Sheet: {sheet_name} ---\n"
                row_count = 0
                for row in ws.iter_rows(values_only=True):
                    if row_count >= MAX_XLSX_ROWS:
                        sheet_text += f"\n... (truncated at {MAX_XLSX_ROWS} rows)\n"
                        warnings.append(f"Sheet '{sheet_name}' truncated at {MAX_XLSX_ROWS} rows")
                        break
                    cells = [str(cell) if cell is not None else "" for cell in row]
                    sheet_text += " | ".join(cells) + "\n"
                    row_count += 1
                if row_count == 0:
                    sheet_text += "(empty sheet)\n"
                parts.append(sheet_text)
            except Exception as e:
                warnings.append(f"Sheet '{sheet_name}' failed: {type(e).__name__}: {e}")
        content = "\n".join(parts) if parts else "[XLSX: empty workbook]"
        return content, warnings
    except Exception as e:
        # Fix: log the actual entry name instead of the literal "(unknown)"
        # placeholder the old message printed; lazy %-args per logging best practice.
        logger.error("XLSX parse error for %s: %s", filename, e)
        return "", [f"XLSX parse failed: {type(e).__name__}: {e}"]
    finally:
        if wb:
            try:
                wb.close()
            except Exception:
                pass
# ──────────────────────────────────────────────────────────────────────────────
# Validation layer
# ──────────────────────────────────────────────────────────────────────────────
def validate_upload(file_path: str | None) -> str:
    """Validate an uploaded ZIP path and return it.

    Checks, in order: presence, existence on disk, non-emptiness, size limit,
    and ZIP magic. Each failure raises gr.Error with a user-facing message.
    """
    if file_path is None:
        raise gr.Error("⚠️ Please upload a ZIP file first.")
    if not os.path.isfile(file_path):
        raise gr.Error("❌ Upload failed β€” file not found on server. Please try again.")
    size_bytes = os.path.getsize(file_path)
    if size_bytes == 0:
        raise gr.Error("❌ The uploaded file is empty (0 bytes).")
    megabytes = size_bytes / (1024 ** 2)
    if megabytes > MAX_ZIP_SIZE_MB:
        raise gr.Error(
            f"❌ File too large: {megabytes:.1f} MB. "
            f"Maximum allowed is {MAX_ZIP_SIZE_MB} MB."
        )
    if not zipfile.is_zipfile(file_path):
        raise gr.Error(
            "❌ Not a valid ZIP archive. The file may be corrupted, "
            "or it may be a different archive format (tar, rar, 7z)."
        )
    return file_path
def check_zip_bomb(zf: zipfile.ZipFile, compressed_size: int) -> list[str]:
    """Heuristic zip-bomb screen based on the archive's declared sizes.

    Raises gr.Error for archives over the decompression-ratio or absolute
    uncompressed-size limits; returns soft warnings for borderline archives.
    """
    soft_warnings: list[str] = []
    declared_total = sum(entry.file_size for entry in zf.infolist() if not entry.is_dir())
    if compressed_size > 0:
        expansion = declared_total / compressed_size
        if expansion > MAX_DECOMPRESSION_RATIO:
            raise gr.Error(
                f"πŸ›‘οΈ Zip bomb detected! Decompression ratio is {expansion:.0f}x "
                f"(compressed: {format_size(compressed_size)}, "
                f"uncompressed: {format_size(declared_total)}). "
                f"Maximum allowed ratio is {MAX_DECOMPRESSION_RATIO}x."
            )
        if expansion > MAX_DECOMPRESSION_RATIO / 2:
            # Suspicious but not conclusive — surface it to the user.
            soft_warnings.append(
                f"High decompression ratio ({expansion:.0f}x) β€” approaching the "
                f"{MAX_DECOMPRESSION_RATIO}x safety limit"
            )
    declared_total_mb = declared_total / (1024 ** 2)
    if declared_total_mb > MAX_ZIP_SIZE_MB * 5:
        raise gr.Error(
            f"πŸ›‘οΈ Uncompressed content too large: {declared_total_mb:.0f} MB. "
            f"Maximum is {MAX_ZIP_SIZE_MB * 5} MB."
        )
    return soft_warnings
# ──────────────────────────────────────────────────────────────────────────────
# Core parsing engine
# ──────────────────────────────────────────────────────────────────────────────
def parse_zip(file_path: str, progress: gr.Progress) -> tuple[list[ParsedFile], ParseStats]:
    """Parse every file in a ZIP archive with per-file error isolation.

    A single unreadable or corrupt entry is recorded as a per-file error and
    processing continues; only archive-level problems (corrupt ZIP, zip bomb,
    no files) abort the whole run via gr.Error.

    Args:
        file_path: path to an already-validated ZIP file on disk.
        progress: Gradio progress tracker driving the UI progress bar.

    Returns:
        (parsed_files, stats) — one ParsedFile per processed entry plus
        aggregate counters.

    Raises:
        gr.Error: for archive-level failures only.
    """
    file_size = os.path.getsize(file_path)
    stats = ParseStats()
    try:
        zf = zipfile.ZipFile(file_path, "r")
    except zipfile.BadZipFile:
        raise gr.Error("❌ ZIP file is corrupted and cannot be opened.")
    except Exception as e:
        raise gr.Error(f"❌ Failed to open ZIP: {type(e).__name__}: {e}")
    try:
        # Raises gr.Error for outright bombs; soft warnings are shown to the
        # user after parsing completes (see bottom of this function).
        bomb_warnings = check_zip_bomb(zf, file_size)
        entries = [info for info in zf.infolist() if not info.is_dir()]
        stats.skipped_dirs = len(zf.infolist()) - len(entries)
        stats.total_files = len(entries)
        stats.total_compressed_bytes = file_size
        if stats.total_files == 0:
            raise gr.Error("❌ ZIP archive contains no files (only directories).")
        truncated = False
        if stats.total_files > MAX_FILES_IN_ZIP:
            # Cap the workload; the user is told only the first N are parsed.
            gr.Warning(
                f"ZIP contains {stats.total_files} files β€” "
                f"processing first {MAX_FILES_IN_ZIP} only."
            )
            entries = entries[:MAX_FILES_IN_ZIP]
            truncated = True
        parsed_files: list[ParsedFile] = []
        # progress.tqdm drives the UI progress bar; the index `i` is unused.
        for i, info in enumerate(progress.tqdm(entries, desc="Parsing documents")):
            category, ext = classify_file(info.filename)
            stats.by_category[category.value] += 1
            stats.total_uncompressed_bytes += info.file_size
            pf = ParsedFile(
                filename=info.filename,
                category=category.value,
                extension=ext or "(none)",
                size_bytes=info.file_size,
                size_display=format_size(info.file_size),
            )
            # Oversized entries are skipped and counted as warnings, not errors.
            file_mb = info.file_size / (1024 ** 2)
            if file_mb > MAX_SINGLE_FILE_MB:
                pf.error = f"Skipped: file too large ({file_mb:.1f} MB > {MAX_SINGLE_FILE_MB} MB limit)"
                pf.warnings.append(pf.error)
                stats.parse_warnings += 1
                parsed_files.append(pf)
                continue
            # Stage 1: read the raw bytes out of the archive.
            try:
                raw_data = zf.read(info)
            except RuntimeError as e:
                # zipfile raises RuntimeError for password-protected entries.
                pf.error = f"Cannot read: {e}"
                if "password" in str(e).lower():
                    pf.error = "File is password-protected"
                stats.parse_errors += 1
                parsed_files.append(pf)
                continue
            except Exception as e:
                pf.error = f"Read failed: {type(e).__name__}: {e}"
                stats.parse_errors += 1
                parsed_files.append(pf)
                continue
            # Stage 2: dispatch to the category-specific parser.
            try:
                if category == FileCategory.TEXT:
                    content, warnings = parse_text_content(raw_data, info.filename)
                elif category == FileCategory.PDF:
                    content, warnings = parse_pdf_content(raw_data, info.filename)
                elif category == FileCategory.DOCX:
                    content, warnings = parse_docx_content(raw_data, info.filename)
                elif category == FileCategory.XLSX:
                    content, warnings = parse_xlsx_content(raw_data, info.filename)
                elif category == FileCategory.IMAGE:
                    content = ""
                    warnings = []
                    pf.preview = f"[Image: {ext}, {pf.size_display}]"
                else:
                    content = ""
                    warnings = []
                    pf.preview = f"[Binary: {ext}, {pf.size_display}]"
                pf.content = content
                # Keep the bracketed placeholder preview for image/binary entries.
                pf.preview = content[:MAX_PREVIEW_CHARS] if content else pf.preview
                pf.warnings = warnings
                if warnings:
                    stats.parse_warnings += 1
                else:
                    stats.parsed_ok += 1
            except MemoryError:
                pf.error = "Out of memory while parsing this file"
                stats.parse_errors += 1
                logger.error(f"MemoryError parsing {info.filename}")
            except Exception as e:
                pf.error = f"Parse failed: {type(e).__name__}: {e}"
                stats.parse_errors += 1
                logger.error(f"Parse error for {info.filename}: {e}")
                traceback.print_exc()
            parsed_files.append(pf)
        if bomb_warnings:
            for w in bomb_warnings:
                gr.Warning(w)
        if truncated:
            # NOTE(review): counts the truncation itself as one extra "warning",
            # which slightly inflates the per-file warning total in the summary.
            stats.parse_warnings += 1
        return parsed_files, stats
    finally:
        try:
            zf.close()
        except Exception:
            pass
# ──────────────────────────────────────────────────────────────────────────────
# Output formatters
# ──────────────────────────────────────────────────────────────────────────────
def build_summary(stats: ParseStats, parsed_files: list[ParsedFile]) -> str:
    """Render the run-level markdown summary: alerts, metrics table, failures."""
    alerts = []
    if stats.parse_errors > 0:
        alerts.append(f"⚠️ **{stats.parse_errors} file(s) failed to parse** β€” see ❌ markers in the file listing")
    if stats.parse_warnings > 0:
        alerts.append(f"ℹ️ **{stats.parse_warnings} file(s) had warnings** β€” see ⚠️ markers in the file listing")
    alert_block = ("\n".join(alerts) + "\n\n") if alerts else ""
    # Failed-files section: at most ten named entries plus an overflow line.
    failed = [pf for pf in parsed_files if pf.error]
    error_block = ""
    if failed:
        failure_lines = [f"- `{pf.filename}`: {pf.error}" for pf in failed[:10]]
        if len(failed) > 10:
            failure_lines.append(f"- ... and {len(failed) - 10} more")
        error_block = "\n### ❌ Failed Files\n" + "\n".join(failure_lines) + "\n\n"
    return f"""## πŸ“¦ ZIP Archive Summary
{alert_block}| Metric | Value |
|--------|-------|
| **Total files** | {stats.total_files} |
| **Parsed successfully** | {stats.parsed_ok} |
| **With warnings** | {stats.parse_warnings} |
| **Failed** | {stats.parse_errors} |
| **Compressed size** | {format_size(stats.total_compressed_bytes)} |
| **Uncompressed size** | {format_size(stats.total_uncompressed_bytes)} |
| **Directories skipped** | {stats.skipped_dirs} |
### πŸ“Š File Types
| Category | Count |
|----------|-------|
| Text/Code | {stats.by_category.get('text', 0)} |
| PDF | {stats.by_category.get('pdf', 0)} |
| DOCX | {stats.by_category.get('docx', 0)} |
| XLSX | {stats.by_category.get('xlsx', 0)} |
| Image | {stats.by_category.get('image', 0)} |
| Binary | {stats.by_category.get('binary', 0)} |
{error_block}"""
def build_full_text(parsed_files: list[ParsedFile]) -> str:
    """Concatenate every extracted text payload, with a banner per file."""
    rule = "=" * 70
    sections = []
    for item in parsed_files:
        if item.content:
            emoji = CATEGORY_EMOJI.get(FileCategory(item.category), "πŸ“„")
            warn_suffix = " ⚠️ " + ", ".join(item.warnings) if item.warnings else ""
            sections.append(
                f"\n{rule}\n"
                f"{emoji} {item.filename}{warn_suffix}\n"
                f"{rule}\n"
                f"{item.content}"
            )
        elif item.error:
            # Failed files still appear, so the reader knows they were seen.
            sections.append(
                f"\n{rule}\n"
                f"❌ {item.filename} β€” ERROR: {item.error}\n"
                f"{rule}"
            )
    if not sections:
        return "(No text content was extracted from any file in the archive.)"
    combined = "\n".join(sections)
    if len(combined) > MAX_FULL_TEXT_CHARS:
        combined = combined[:MAX_FULL_TEXT_CHARS] + "\n\n... [OUTPUT TRUNCATED β€” too large to display fully]"
    return combined
def build_json(parsed_files: list[ParsedFile]) -> list[dict]:
    """Serialize parse results to JSON-safe dicts (preview capped at 1000 chars)."""
    records = []
    for item in parsed_files:
        if item.error:
            status = "error"
        elif item.warnings:
            status = "warning"
        else:
            status = "ok"
        record = {
            "filename": item.filename,
            "category": item.category,
            "extension": item.extension,
            "size_bytes": item.size_bytes,
            "size_display": item.size_display,
            "preview": item.preview[:1000],
            "status": status,
        }
        # Error/warning keys are present only when populated.
        if item.error:
            record["error"] = item.error
        if item.warnings:
            record["warnings"] = item.warnings
        records.append(record)
    return records
def build_detail(file_data: list[dict], evt: gr.SelectData) -> str:
    """Render the drill-down markdown for the table row the user clicked."""
    if not file_data or not isinstance(file_data, list):
        return "ℹ️ Select a file from the **File Listing** tab to see its full preview here."
    try:
        idx = evt.index
        row_idx = idx[0] if isinstance(idx, (list, tuple)) else idx
    except (TypeError, IndexError):
        return "⚠️ Could not determine selected row. Please click a row in the file listing."
    if not (0 <= row_idx < len(file_data)):
        return f"⚠️ Row index {row_idx} is out of range (0–{len(file_data) - 1})."
    item = file_data[row_idx]
    emoji = CATEGORY_EMOJI.get(item.get('category', ''), 'πŸ“„')
    sections = [
        f"## {emoji} {item['filename']}\n",
        f"**Category:** {item.get('category', 'unknown')} | **Size:** {item.get('size_display', 'unknown')}\n\n",
    ]
    error_text = item.get("error")
    if error_text:
        sections.append(f"### ❌ Error\n```\n{error_text}\n```\n")
    warning_list = item.get("warnings")
    if warning_list:
        bullets = "\n".join(f"- {w}" for w in warning_list)
        sections.append("### ⚠️ Warnings\n" + bullets + "\n\n")
    preview = item.get("preview", "")
    # Bracketed previews ("[Image: ...]") are placeholders, not real content.
    if preview and not preview.startswith("["):
        suffix = item.get("extension", "").lstrip(".")
        lang_map = {
            "py": "python", "js": "javascript", "ts": "typescript",
            "json": "json", "yaml": "yaml", "yml": "yaml",
            "html": "html", "htm": "html", "css": "css",
            "sql": "sql", "sh": "bash", "bash": "bash",
            "java": "java", "c": "c", "cpp": "cpp", "go": "go",
            "rs": "rust", "rb": "ruby", "php": "php", "xml": "xml",
            "md": "markdown", "toml": "toml", "csv": "csv",
        }
        fence_lang = lang_map.get(suffix, "")
        sections.append(f"### πŸ“ Content Preview\n```{fence_lang}\n{preview}\n```")
    elif preview:
        sections.append(f"### πŸ“ Info\n{preview}")
    else:
        sections.append("*(No content to preview for this file type.)*")
    return "\n".join(sections)
# ──────────────────────────────────────────────────────────────────────────────
# Main entry point
# ──────────────────────────────────────────────────────────────────────────────
# NOTE: the gr.Progress() default argument is the Gradio idiom — the framework
# substitutes a live tracker per request; the import-time instance is inert.
def run_parse(file_obj, progress=gr.Progress()):
    """Top-level Gradio handler: validate → parse → format all outputs.

    Args:
        file_obj: value from the gr.File input — a filepath string, an object
            exposing ``.name`` (older Gradio file wrappers), or None.
        progress: injected by Gradio; drives the progress bar.

    Returns:
        (summary_markdown, table_rows, full_text, json_data, json_data) —
        json_data is emitted twice: once for the JSON viewer component and
        once for the gr.State consumed by the row-detail view.

    Raises:
        gr.Error: user-facing validation/processing failures.
    """
    try:
        # Accept both a plain path (type="filepath") and a file wrapper object.
        file_path = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", None)
        progress(0.0, desc="Validating upload...")
        file_path = validate_upload(file_path)
        gr.Info(f"πŸ“¦ Processing ZIP file ({format_size(os.path.getsize(file_path))})...")
        parsed_files, stats = parse_zip(file_path, progress)
        progress(0.95, desc="Building output...")
        summary = build_summary(stats, parsed_files)
        table_rows = [pf.to_table_row() for pf in parsed_files]
        full_text = build_full_text(parsed_files)
        json_data = build_json(parsed_files)
        progress(1.0, desc="Done!")
        # Toast severity mirrors the worst per-file outcome of the run.
        if stats.parse_errors > 0:
            gr.Warning(f"{stats.parse_errors} file(s) failed to parse. See details below.")
        elif stats.parse_warnings > 0:
            gr.Info(f"βœ… Parsed {stats.parsed_ok} files with {stats.parse_warnings} warning(s).")
        else:
            gr.Info(f"βœ… Successfully parsed all {stats.parsed_ok} files!")
        return summary, table_rows, full_text, json_data, json_data
    except gr.Error:
        # Already user-facing — let Gradio display it unchanged.
        raise
    except MemoryError:
        logger.error("MemoryError during ZIP processing")
        raise gr.Error(
            "πŸ’₯ Out of memory! The ZIP file contents are too large to process. "
            "Try a smaller archive or one with fewer/smaller files."
        )
    except Exception as e:
        # Last-resort guard: convert anything unexpected into a user-facing error.
        logger.error(f"Unexpected error: {type(e).__name__}: {e}")
        traceback.print_exc()
        raise gr.Error(
            f"πŸ’₯ An unexpected error occurred: {type(e).__name__}: {e}\n\n"
            "If this persists, please report it as a bug."
        )
# ──────────────────────────────────────────────────────────────────────────────
# Gradio UI
# ──────────────────────────────────────────────────────────────────────────────
with gr.Blocks(
    title="πŸ“¦ Document Parser",
) as demo:
    # Header / usage notes; .format() fills in the configured limits.
    gr.Markdown("""
# πŸ“¦ Document Parser
Upload a **ZIP file** and this tool extracts & parses text from every supported document inside it.
**Supported formats:** `.txt`, `.md`, `.py`, `.js`, `.ts`, `.json`, `.yaml`, `.csv`, `.html`, `.xml`,
`.pdf`, `.docx`, `.xlsx`, and **40+ more** text/code formats β€” including `Makefile`, `Dockerfile`, `LICENSE`, etc.
**Limits:** Max ZIP size: {max_zip}MB Β· Max files: {max_files} Β· Max single file: {max_file}MB Β· Zip bomb protection enabled
""".format(max_zip=MAX_ZIP_SIZE_MB, max_files=MAX_FILES_IN_ZIP, max_file=MAX_SINGLE_FILE_MB))
    with gr.Row():
        with gr.Column(scale=1):
            # Upload widget + manual trigger button.
            zip_input = gr.File(
                label="Upload ZIP File",
                file_types=[".zip"],
                type="filepath",  # handler receives a path string
            )
            parse_btn = gr.Button(
                "πŸ” Parse Documents",
                variant="primary",
                size="lg",
            )
    # Run-level summary rendered above the result tabs.
    summary_output = gr.Markdown(label="Summary", value="*Upload a ZIP file to get started.*")
    with gr.Tabs():
        with gr.Tab("πŸ“‹ File Listing"):
            # One row per archive entry; selecting a row feeds the detail tab.
            file_table = gr.Dataframe(
                headers=["Status", "Filename", "Extension", "Type", "Size", "Preview"],
                label="Files in Archive",
                interactive=False,
                wrap=True,
            )
        with gr.Tab("πŸ“ Extracted Text"):
            text_output = gr.Textbox(
                label="Full Extracted Text (all parseable files concatenated)",
                lines=30,
                max_lines=100,
                buttons=["copy"],
            )
        with gr.Tab("πŸ”Ž File Detail"):
            gr.Markdown("*Click a row in the **File Listing** tab, then switch here to see the full preview.*")
            detail_output = gr.Markdown(
                "ℹ️ Select a file from the **File Listing** tab to see its full preview here."
            )
        with gr.Tab("πŸ“Š JSON Export"):
            json_output = gr.JSON(label="Structured Parse Results")
    # Holds build_json() output so the row-select handler can render details.
    file_data_state = gr.State([])
    # Parsing runs on explicit button click AND immediately on upload; both
    # share the "parse_engine" concurrency slot to bound resource use.
    parse_btn.click(
        fn=run_parse,
        inputs=zip_input,
        outputs=[summary_output, file_table, text_output, json_output, file_data_state],
        concurrency_limit=CONCURRENCY_LIMIT,
        concurrency_id="parse_engine",
        trigger_mode="once",  # ignore re-triggers while a run is in flight
    )
    zip_input.upload(
        fn=run_parse,
        inputs=zip_input,
        outputs=[summary_output, file_table, text_output, json_output, file_data_state],
        concurrency_limit=CONCURRENCY_LIMIT,
        concurrency_id="parse_engine",
        trigger_mode="once",
    )
    file_table.select(
        fn=build_detail,
        inputs=file_data_state,
        outputs=detail_output,
    )
# Queue bounds simultaneous jobs and pending requests across all sessions.
demo.queue(default_concurrency_limit=CONCURRENCY_LIMIT, max_size=20)
if __name__ == "__main__":
    # BUG FIX: `theme=` and `css=` are gr.Blocks() constructor arguments, not
    # Blocks.launch() arguments — passing them to launch() raises
    # TypeError ("launch() got an unexpected keyword argument 'theme'").
    # Styling (e.g. gr.themes.Soft(), custom CSS) must be applied where the
    # Blocks app is constructed; launch() only takes server/runtime options.
    demo.launch(
        show_error=True,  # surface handler exceptions in the browser UI
    )