Spaces:
Runtime error
Runtime error
| import argparse | |
| import json | |
| import subprocess | |
| from pathlib import Path | |
| from typing import Any, Dict, List | |
# Lower-cased file suffixes (leading dot included) of the document types that
# sanitization is meant to leave in place.
ALLOWED_SUFFIXES = {".pdf", ".docx"}
def convert_doc_to_docx(doc_path: Path) -> Dict[str, Any]:
    """Convert a ``.doc`` file to ``.docx`` using headless ``soffice``.

    ``soffice`` is asked to write into the source file's own directory, so
    the expected output is ``doc_path`` with its suffix replaced by
    ``.docx``. An existing file at that location is overwritten by a
    successful conversion.

    Args:
        doc_path: Path of the ``.doc`` file to convert.

    Returns:
        A dict with the keys ``source``, ``output``, ``ok``, ``reason``,
        ``stdout`` and ``stderr``. ``reason`` is one of:

        * ``"converted"`` - the output file was (re)written; ``ok`` is True.
        * ``"soffice_not_found"`` - the ``soffice`` executable is missing.
        * ``"soffice_exec_failed"`` - ``soffice`` could not be started for
          another OS-level reason; the error text is placed in ``stderr``.
        * ``"conversion_failed"`` - ``soffice`` exited with a non-zero status.
        * ``"output_not_created"`` - ``soffice`` exited with status 0 but did
          not write the expected output file.
    """
    target_path = doc_path.with_suffix(".docx")

    def _result(ok: bool, reason: str, stdout: str = "", stderr: str = "") -> Dict[str, Any]:
        # Single place that defines the shape (and key order) of the result.
        return {
            "source": str(doc_path),
            "output": str(target_path),
            "ok": ok,
            "reason": reason,
            "stdout": stdout,
            "stderr": stderr,
        }

    def _fingerprint(path: Path) -> Any:
        # Identity of the file currently at ``path``; None when it is absent.
        try:
            info = path.stat()
        except OSError:
            return None
        return (info.st_ino, info.st_mtime_ns, info.st_size)

    # Snapshot the target first: a .docx left over from before this run must
    # not be mistaken for the output of this conversion.
    before = _fingerprint(target_path)

    cmd = [
        "soffice",
        "--headless",
        "--convert-to",
        "docx",
        "--outdir",
        str(doc_path.parent),
        str(doc_path),
    ]
    try:
        completed = subprocess.run(
            cmd,
            check=False,
            capture_output=True,
            text=True,
            errors="replace",  # undecodable tool output must not abort the run
        )
    except FileNotFoundError:
        return _result(False, "soffice_not_found")
    except OSError as exc:
        # e.g. PermissionError when the executable exists but cannot be run.
        return _result(False, "soffice_exec_failed", stderr=str(exc))

    if completed.returncode != 0:
        return _result(False, "conversion_failed", completed.stdout, completed.stderr)

    # A zero exit status alone is not trusted: the target must exist and must
    # differ from whatever was there before the run.
    after = _fingerprint(target_path)
    if after is None or after == before:
        return _result(False, "output_not_created", completed.stdout, completed.stderr)

    return _result(True, "converted", completed.stdout, completed.stderr)
def sanitize_docs(root: Path, delete_original_doc: bool) -> Dict[str, Any]:
    """Convert ``.doc`` files under ``root`` and remove non-allowed files.

    Works in three passes over the tree below ``root``:

    1. Every ``.doc`` file (suffix compared case-insensitively) is passed to
       ``convert_doc_to_docx``.
    2. Every file whose lower-cased suffix is not in ``ALLOWED_SUFFIXES`` is
       deleted, except ``.doc`` files whose conversion failed: those are kept
       so a document is never destroyed without a converted replacement.
    3. Directories that are empty afterwards are removed, deepest first.

    Args:
        root: Directory to sanitize.
        delete_original_doc: When true, each successfully converted ``.doc``
            is unlinked right after its conversion. When false, it is removed
            in pass 2 like any other non-allowed file and listed under
            ``removed``.

    Returns:
        A report dict with the counters ``found_doc``, ``converted_doc_ok``,
        ``converted_doc_failed``, ``removed_non_allowed`` and ``kept_files``
        (which includes preserved, unconverted ``.doc`` files), plus the lists
        ``conversions`` (one result per ``.doc``), ``removed`` and ``errors``.

    Raises:
        FileNotFoundError: If ``root`` does not exist or is not a directory.
    """
    if not root.is_dir():
        raise FileNotFoundError(f"Root path not found or is not a directory: {root}")

    report: Dict[str, Any] = {
        "root": str(root),
        "found_doc": 0,
        "converted_doc_ok": 0,
        "converted_doc_failed": 0,
        "removed_non_allowed": 0,
        "kept_files": 0,
        "conversions": [],
        "removed": [],
        "errors": [],
    }

    # 1) Convert .doc files first.
    doc_files = [p for p in root.rglob("*") if p.is_file() and p.suffix.lower() == ".doc"]
    report["found_doc"] = len(doc_files)
    unconverted = set()  # originals that have no converted replacement
    for doc_path in doc_files:
        result = convert_doc_to_docx(doc_path)
        report["conversions"].append(result)
        if not result["ok"]:
            report["converted_doc_failed"] += 1
            report["errors"].append({"path": str(doc_path), "reason": result["reason"]})
            unconverted.add(doc_path)
            continue
        report["converted_doc_ok"] += 1
        if delete_original_doc:
            try:
                doc_path.unlink(missing_ok=True)
            except OSError as exc:
                report["errors"].append({"path": str(doc_path), "reason": f"delete_doc_failed: {exc}"})

    # 2) Remove files that are not allowed after conversion. The listing is
    #    materialized up front so deletions do not disturb the traversal.
    for file_path in [p for p in root.rglob("*") if p.is_file()]:
        suffix = file_path.suffix.lower()
        if suffix in ALLOWED_SUFFIXES or file_path in unconverted:
            # Unconverted originals are kept on purpose; their failure is
            # already recorded in report["errors"].
            report["kept_files"] += 1
            continue
        try:
            file_path.unlink(missing_ok=True)
            report["removed_non_allowed"] += 1
            report["removed"].append({"path": str(file_path), "reason": f"suffix_not_allowed:{suffix}"})
        except OSError as exc:
            report["errors"].append({"path": str(file_path), "reason": f"remove_failed: {exc}"})

    # 3) Cleanup empty directories, deepest first so that a parent emptied by
    #    removing its children is itself removed later in the same pass.
    for path in sorted(root.rglob("*"), key=lambda p: len(p.parts), reverse=True):
        # rmdir() cannot remove a symlink, so links to directories are skipped.
        if path.is_symlink() or not path.is_dir():
            continue
        try:
            if not any(path.iterdir()):
                path.rmdir()
        except OSError as exc:
            report["errors"].append({"path": str(path), "reason": f"rmdir_failed: {exc}"})

    return report
def main() -> None:
    """Command-line entry point: sanitize a directory and write a JSON report.

    Parses ``--root``, ``--report`` and ``--delete-original-doc``, runs
    ``sanitize_docs`` on the resolved root and writes the returned report to
    the resolved report path as UTF-8 JSON.
    """
    parser = argparse.ArgumentParser(description="Sanitize docs folder: convert DOC to DOCX and keep only PDF/DOCX.")
    parser.add_argument("--root", required=True, help="Root directory to sanitize")
    parser.add_argument("--report", required=True, help="Output report JSON path")
    parser.add_argument(
        "--delete-original-doc",
        action="store_true",
        help="Delete original .doc files after successful conversion",
    )
    args = parser.parse_args()

    root = Path(args.root).resolve()
    report_path = Path(args.report).resolve()

    report = sanitize_docs(root=root, delete_original_doc=args.delete_original_doc)

    # Create the report directory only after sanitizing: if it lies inside
    # ``root`` and is still empty, the empty-directory cleanup would delete it
    # again and the write below would fail.
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")


if __name__ == "__main__":
    main()