Spaces:
Runtime error
Runtime error
File size: 4,728 Bytes
import argparse
import json
import subprocess
from pathlib import Path
from typing import Any, Dict, List
# File suffixes (with leading dot) that sanitize_docs keeps; they are compared
# against the lower-cased Path.suffix, so matching is case-insensitive.
ALLOWED_SUFFIXES = {".pdf", ".docx"}
def _output_signature(path: Path) -> Any:
    """Return ``(mtime_ns, size, inode)`` for *path*, or ``None`` if it cannot be stat'ed."""
    try:
        info = path.stat()
    except OSError:
        return None
    return (info.st_mtime_ns, info.st_size, info.st_ino)


def convert_doc_to_docx(doc_path: Path) -> Dict[str, Any]:
    """Convert a .doc file to .docx using ``soffice`` in headless mode.

    The converted file is written next to the source, with the suffix
    replaced by ``.docx``. This function never raises for a failed
    conversion; the outcome is described by the returned dict.

    Args:
        doc_path: Path of the document to convert.

    Returns:
        A dict with the keys ``source``, ``output``, ``ok``, ``reason``,
        ``stdout`` and ``stderr``. ``reason`` is one of:

        * ``"converted"`` - success (``ok`` is True).
        * ``"soffice_not_found"`` - the ``soffice`` executable is missing.
        * ``"soffice_launch_failed"`` - ``soffice`` exists but could not be
          started (e.g. not executable); ``stderr`` holds the OS error text.
        * ``"conversion_failed"`` - ``soffice`` exited with a non-zero code.
        * ``"output_not_created"`` - ``soffice`` exited with 0 but did not
          write (or rewrite) the expected output file.
    """
    out_dir = doc_path.parent
    target_path = doc_path.with_suffix(".docx")

    def outcome(ok: bool, reason: str, stdout: str = "", stderr: str = "") -> Dict[str, Any]:
        # Single place that defines the shape of the result record.
        return {
            "source": str(doc_path),
            "output": str(target_path),
            "ok": ok,
            "reason": reason,
            "stdout": stdout,
            "stderr": stderr,
        }

    cmd = [
        "soffice",
        "--headless",
        "--convert-to",
        "docx",
        "--outdir",
        str(out_dir),
        str(doc_path),
    ]

    # Snapshot a possibly pre-existing target so that a stale file left over
    # from earlier is not mistaken for the output of this conversion.
    before = _output_signature(target_path)

    try:
        completed = subprocess.run(cmd, check=False, capture_output=True, text=True)
    except FileNotFoundError:
        return outcome(False, "soffice_not_found")
    except OSError as exc:
        # E.g. PermissionError or "Exec format error": report it instead of
        # letting one bad launch abort the caller's whole batch.
        return outcome(False, "soffice_launch_failed", stderr=str(exc))

    if completed.returncode != 0:
        return outcome(False, "conversion_failed", completed.stdout, completed.stderr)

    after = _output_signature(target_path)
    if after is None or after == before:
        # Exit code 0 alone is not trusted: the output must exist and must
        # have been written by this run.
        return outcome(False, "output_not_created", completed.stdout, completed.stderr)

    return outcome(True, "converted", completed.stdout, completed.stderr)
def sanitize_docs(root: Path, delete_original_doc: bool) -> Dict[str, Any]:
    """Convert .doc files under *root* to .docx and remove disallowed files.

    The work happens in three passes over the tree rooted at *root*:

    1. Every ``.doc`` file (suffix matched case-insensitively) is passed to
       ``convert_doc_to_docx``.
    2. Every file whose suffix is not in ``ALLOWED_SUFFIXES`` is deleted.
       An original ``.doc`` is exempt from deletion when its conversion
       failed (so the only copy of the document is not lost) or when
       *delete_original_doc* is False.
    3. Directories left empty are removed, deepest first; *root* itself is
       never removed.

    Args:
        root: Directory to sanitize.
        delete_original_doc: If True, delete each ``.doc`` once it has been
            converted successfully. If False, originals are kept.

    Returns:
        A report dict with the counters ``found_doc``, ``converted_doc_ok``,
        ``converted_doc_failed``, ``removed_non_allowed`` and ``kept_files``
        (which includes preserved ``.doc`` originals), plus the lists
        ``conversions``, ``removed`` and ``errors`` and the ``root`` path.

    Raises:
        FileNotFoundError: If *root* does not exist or is not a directory.
    """
    report: Dict[str, Any] = {
        "root": str(root),
        "found_doc": 0,
        "converted_doc_ok": 0,
        "converted_doc_failed": 0,
        "removed_non_allowed": 0,
        "kept_files": 0,
        "conversions": [],
        "removed": [],
        "errors": [],
    }

    # is_dir() is already False for a missing path.
    if not root.is_dir():
        raise FileNotFoundError(f"Root path not found or is not a directory: {root}")

    # 1) Convert .doc files first.
    doc_files = [p for p in root.rglob("*") if p.is_file() and p.suffix.lower() == ".doc"]
    report["found_doc"] = len(doc_files)

    # Originals that pass 2 must not delete.
    preserved_docs = set()

    for doc_path in doc_files:
        result = convert_doc_to_docx(doc_path)
        report["conversions"].append(result)

        if not result["ok"]:
            report["converted_doc_failed"] += 1
            report["errors"].append({"path": str(doc_path), "reason": result["reason"]})
            # No usable .docx exists, so the source is the only copy.
            preserved_docs.add(doc_path)
            continue

        report["converted_doc_ok"] += 1
        if not delete_original_doc:
            preserved_docs.add(doc_path)
            continue

        try:
            doc_path.unlink(missing_ok=True)
        except OSError as exc:
            report["errors"].append({"path": str(doc_path), "reason": f"delete_doc_failed: {exc}"})

    # 2) Remove files that are not allowed after conversion.
    # The listing is materialized up front so deletions do not disturb the walk.
    for file_path in [p for p in root.rglob("*") if p.is_file()]:
        suffix = file_path.suffix.lower()
        if suffix in ALLOWED_SUFFIXES or file_path in preserved_docs:
            report["kept_files"] += 1
            continue
        try:
            file_path.unlink(missing_ok=True)
        except OSError as exc:
            report["errors"].append({"path": str(file_path), "reason": f"remove_failed: {exc}"})
        else:
            report["removed_non_allowed"] += 1
            report["removed"].append({"path": str(file_path), "reason": f"suffix_not_allowed:{suffix}"})

    # 3) Cleanup empty directories, deepest first so that a parent emptied by
    #    removing its children is itself removed in the same pass.
    for path in sorted(root.rglob("*"), key=lambda p: len(p.parts), reverse=True):
        if not path.is_dir():
            continue
        try:
            if not any(path.iterdir()):
                path.rmdir()
        except OSError as exc:
            # Best effort: a directory that cannot be removed is reported, not fatal.
            report["errors"].append({"path": str(path), "reason": f"rmdir_failed: {exc}"})

    return report
def main() -> None:
    """Command-line entry point: sanitize a directory and write a JSON report.

    Parses ``--root``, ``--report`` and ``--delete-original-doc``, runs
    ``sanitize_docs`` on the resolved root directory and writes the returned
    report as UTF-8 encoded, indented JSON to the resolved report path.

    Raises:
        FileNotFoundError: Propagated from ``sanitize_docs`` when the root
            path does not exist or is not a directory.
    """
    parser = argparse.ArgumentParser(description="Sanitize docs folder: convert DOC to DOCX and keep only PDF/DOCX.")
    parser.add_argument("--root", required=True, help="Root directory to sanitize")
    parser.add_argument("--report", required=True, help="Output report JSON path")
    parser.add_argument(
        "--delete-original-doc",
        action="store_true",
        help="Delete original .doc files after successful conversion",
    )
    args = parser.parse_args()

    root = Path(args.root).resolve()
    report_path = Path(args.report).resolve()

    report = sanitize_docs(root=root, delete_original_doc=args.delete_original_doc)

    # Create the report directory only after sanitizing: sanitize_docs removes
    # empty directories under the root, so a report directory created
    # beforehand inside the root would be deleted before the report is written.
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|