File size: 4,728 Bytes
1a0b19c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import argparse
import json
import subprocess
from pathlib import Path
from typing import Any, Dict, List

# File suffixes (lower-case, including the leading dot) treated as allowed.
# sanitize_docs() compares each file's lower-cased suffix against this set
# when deciding whether the file is kept.
ALLOWED_SUFFIXES = {".pdf", ".docx"}


def _conversion_result(
    doc_path: Path,
    target_path: Path,
    ok: bool,
    reason: str,
    stdout: str = "",
    stderr: str = "",
) -> Dict[str, Any]:
    """Build the report entry describing one conversion attempt."""
    return {
        "source": str(doc_path),
        "output": str(target_path),
        "ok": ok,
        "reason": reason,
        "stdout": stdout,
        "stderr": stderr,
    }


def convert_doc_to_docx(doc_path: Path) -> Dict[str, Any]:
    """Convert a .doc file to .docx using soffice in headless mode.

    Runs ``soffice --headless --convert-to docx --outdir <parent> <doc_path>``
    and expects the result at ``doc_path.with_suffix(".docx")``.

    Args:
        doc_path: Path of the ``.doc`` file to convert.

    Returns:
        A dict with the keys ``source``, ``output``, ``ok``, ``reason``,
        ``stdout`` and ``stderr``. ``reason`` is one of:

        * ``"converted"`` - the output file was written by this run.
        * ``"soffice_not_found"`` - the ``soffice`` executable is missing.
        * ``"soffice_launch_failed: <error>"`` - the process could not be
          started for another OS-level reason (e.g. permissions).
        * ``"conversion_failed"`` - soffice exited with a non-zero status.
        * ``"output_not_created"`` - soffice exited with 0 but the output
          file does not exist.
        * ``"output_not_updated"`` - soffice exited with 0 but a pre-existing
          output file was left unmodified.

    The function never raises for a failed conversion; failures are reported
    through ``ok``/``reason`` so that a batch run can continue.
    """
    out_dir = doc_path.parent
    target_path = doc_path.with_suffix(".docx")

    # Remember the state of any pre-existing output so that a stale file from
    # an earlier run (or an unrelated .docx with the same stem) is not
    # mistaken for the result of this conversion.
    try:
        mtime_before = target_path.stat().st_mtime_ns
    except OSError:
        mtime_before = None

    cmd = [
        "soffice",
        "--headless",
        "--convert-to",
        "docx",
        "--outdir",
        str(out_dir),
        str(doc_path),
    ]

    try:
        completed = subprocess.run(cmd, check=False, capture_output=True, text=True)
    except FileNotFoundError:
        return _conversion_result(doc_path, target_path, False, "soffice_not_found")
    except OSError as exc:
        # E.g. PermissionError: report it instead of aborting the whole batch.
        return _conversion_result(doc_path, target_path, False, f"soffice_launch_failed: {exc}")

    if completed.returncode != 0:
        return _conversion_result(
            doc_path, target_path, False, "conversion_failed", completed.stdout, completed.stderr
        )

    # A zero exit status alone is not trusted: the output must exist and must
    # have been (re)written by this run.
    try:
        mtime_after = target_path.stat().st_mtime_ns
    except OSError:
        mtime_after = None

    if mtime_after is None:
        return _conversion_result(
            doc_path, target_path, False, "output_not_created", completed.stdout, completed.stderr
        )

    if mtime_after == mtime_before:
        return _conversion_result(
            doc_path, target_path, False, "output_not_updated", completed.stdout, completed.stderr
        )

    return _conversion_result(
        doc_path, target_path, True, "converted", completed.stdout, completed.stderr
    )


def sanitize_docs(root: Path, delete_original_doc: bool) -> Dict[str, Any]:
    """Convert every ``.doc`` under *root* to ``.docx`` and delete disallowed files.

    The work happens in three passes:

    1. Each ``.doc`` file (suffix compared case-insensitively) found under
       *root* is passed to ``convert_doc_to_docx``.
    2. Every file whose lower-cased suffix is not in ``ALLOWED_SUFFIXES`` is
       deleted, except original ``.doc`` files that must be preserved (below).
    3. Directories that are empty afterwards are removed, deepest first.

    An original ``.doc`` is deleted only when its conversion succeeded *and*
    ``delete_original_doc`` is true. Originals whose conversion failed, or
    that the caller asked to keep, are left in place so that no document is
    lost without a converted copy.

    Args:
        root: Directory tree to sanitize.
        delete_original_doc: Whether to delete a ``.doc`` file once it has
            been converted successfully.

    Returns:
        A JSON-serializable report with the counters ``found_doc``,
        ``converted_doc_ok``, ``converted_doc_failed``, ``preserved_doc``,
        ``removed_non_allowed`` and ``kept_files``, plus the lists
        ``conversions`` (one entry per conversion attempt), ``removed`` and
        ``errors``.

    Raises:
        FileNotFoundError: If *root* does not exist or is not a directory.
    """
    report: Dict[str, Any] = {
        "root": str(root),
        "found_doc": 0,
        "converted_doc_ok": 0,
        "converted_doc_failed": 0,
        "preserved_doc": 0,
        "removed_non_allowed": 0,
        "kept_files": 0,
        "conversions": [],
        "removed": [],
        "errors": [],
    }

    # is_dir() is already False for a missing path, so one check covers both cases.
    if not root.is_dir():
        raise FileNotFoundError(f"Root path not found or is not a directory: {root}")

    # 1) Convert .doc files first.
    doc_files: List[Path] = [p for p in root.rglob("*") if p.is_file() and p.suffix.lower() == ".doc"]
    report["found_doc"] = len(doc_files)

    # Originals that pass 2 must not delete: failed conversions and, when the
    # caller did not ask for deletion, successfully converted ones as well.
    preserved_docs = set()

    for doc_path in doc_files:
        result = convert_doc_to_docx(doc_path)
        report["conversions"].append(result)

        if not result["ok"]:
            report["converted_doc_failed"] += 1
            report["errors"].append({"path": str(doc_path), "reason": result["reason"]})
            preserved_docs.add(doc_path)
            continue

        report["converted_doc_ok"] += 1
        if not delete_original_doc:
            preserved_docs.add(doc_path)
            continue

        try:
            doc_path.unlink(missing_ok=True)
        except OSError as exc:
            # Not preserved: pass 2 will retry the removal and report again.
            report["errors"].append({"path": str(doc_path), "reason": f"delete_doc_failed: {exc}"})

    # 2) Remove files that are not allowed after conversion.
    # The listing is materialized first because files are deleted while looping.
    for file_path in [p for p in root.rglob("*") if p.is_file()]:
        if file_path in preserved_docs:
            report["preserved_doc"] += 1
            continue

        suffix = file_path.suffix.lower()
        if suffix in ALLOWED_SUFFIXES:
            report["kept_files"] += 1
            continue

        try:
            file_path.unlink(missing_ok=True)
        except OSError as exc:
            report["errors"].append({"path": str(file_path), "reason": f"remove_failed: {exc}"})
        else:
            report["removed_non_allowed"] += 1
            report["removed"].append({"path": str(file_path), "reason": f"suffix_not_allowed:{suffix}"})

    # 3) Cleanup empty directories, deepest first so that a parent emptied by
    # removing its children is itself removed in the same pass.
    for path in sorted(root.rglob("*"), key=lambda p: len(p.parts), reverse=True):
        if not path.is_dir():
            continue
        try:
            if not any(path.iterdir()):
                path.rmdir()
        except OSError:
            # Best-effort cleanup: a directory that cannot be listed or
            # removed is simply left in place.
            continue

    return report


def main() -> None:
    """Command-line entry point.

    Parses ``--root``, ``--report`` and the optional ``--delete-original-doc``
    switch, runs ``sanitize_docs`` on the resolved root directory and writes
    the returned report as indented UTF-8 JSON to the report path. The report
    file's parent directory is created if it does not exist yet.
    """
    cli = argparse.ArgumentParser(description="Sanitize docs folder: convert DOC to DOCX and keep only PDF/DOCX.")
    cli.add_argument("--root", required=True, help="Root directory to sanitize")
    cli.add_argument("--report", required=True, help="Output report JSON path")
    cli.add_argument(
        "--delete-original-doc",
        help="Delete original .doc files after successful conversion",
        action="store_true",
    )
    options = cli.parse_args()

    # Resolve both locations up front so the report records absolute paths.
    target_dir = Path(options.root).resolve()
    destination = Path(options.report).resolve()
    destination.parent.mkdir(parents=True, exist_ok=True)

    summary = sanitize_docs(root=target_dir, delete_original_doc=options.delete_original_doc)
    payload = json.dumps(summary, ensure_ascii=False, indent=2)
    destination.write_text(payload, encoding="utf-8")


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()