File size: 5,046 Bytes
# =============================================================================
# USAGE EXAMPLES (run from any directory)
# -----------------------------------------------------------------------------
# python file.py -scan C:\Users\User\Desktop\debugrem
# python file.py -scan "D:\My Data\Projects"
# python file.py --scan .
# python file.py -scan C:\Windows\Temp -j 32
#
# FLAGS
#   -scan, --scan PATH   Root folder to scan (required). Only direct children
#                        of PATH are listed; folder sizes are total bytes of
#                        all nested files (recursive).
#   -j, --jobs N         Parallel workers for sizing top-level folders
#                        (default: min(32, CPU count * 4)).
#
# OUTPUT FORMAT (columns separated by two spaces)
#   <name>  <path>  <size_KB>
# Paths use OS separators; directories end with a separator.
# Rows are sorted by size_KB descending (largest first); ties by name.
# =============================================================================
from __future__ import annotations
import argparse
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from os import scandir, stat_result
def _bytes_to_kb(n: int) -> int:
    """Convert a byte count to whole kilobytes (1024 bytes), rounding up.

    Any partial kilobyte counts as a full one, so 1 byte reports as 1 KB
    while 0 bytes reports as 0 KB.
    """
    whole, remainder = divmod(n, 1024)
    if remainder:
        return whole + 1
    return whole
def _tree_size_bytes(root: str) -> int:
    """Return the combined st_size of every regular file found beneath root.

    The walk uses an explicit stack instead of recursion, so deep trees do
    not hit the interpreter's recursion limit. Symbolic links are neither
    followed nor counted. Entries or directories that raise OSError are
    skipped, making the result a best-effort total.
    """
    size_sum = 0
    pending = [root]
    while pending:
        current = pending.pop()
        try:
            with scandir(current) as listing:
                for entry in listing:
                    try:
                        if entry.is_file(follow_symlinks=False):
                            size_sum += entry.stat(follow_symlinks=False).st_size
                        elif entry.is_dir(follow_symlinks=False):
                            pending.append(entry.path)
                    except OSError:
                        # This single entry is unreadable; keep going with the rest.
                        pass
        except OSError:
            # The directory itself cannot be listed; it contributes nothing.
            pass
    return size_sum
def _format_line(name: str, path_for_display: str, size_kb: int) -> str:
    """Render one output row as ``<name>  <path>  <size_KB>``.

    Columns are joined with two spaces, as stated in the OUTPUT FORMAT
    section of the file header.

    Args:
        name: Entry name (file or folder) as listed in the scanned root.
        path_for_display: Path to print for the entry.
        size_kb: Size of the entry in kilobytes.

    Returns:
        The formatted row, without a trailing newline.
    """
    return "  ".join((name, path_for_display, str(size_kb)))
def scan_root(root: str, jobs: int) -> None:
    """Print one row per direct child of root, largest first.

    Files are sized with a single stat call; directories are sized
    recursively via _tree_size_bytes on a thread pool. Each row is rendered
    by _format_line and written to stdout.

    Args:
        root: Directory whose direct children are listed. It is normalized
            and made absolute before use.
        jobs: Requested number of worker threads. The effective count is
            clamped to the range 1..number of sub-directories.

    Exits:
        Status 2 if root is not a directory; status 1 if root cannot be
        listed. In both cases a message is written to stderr first.
    """
    root = os.path.abspath(os.path.normpath(root))
    if not os.path.isdir(root):
        print(f"Not a directory: {root}", file=sys.stderr)
        sys.exit(2)
    sep = os.sep

    # Snapshot the direct children first so the scandir handle is closed
    # before any sizing work starts.
    entries: list[tuple[str, str, bool]] = []
    try:
        with scandir(root) as it:
            for ent in it:
                try:
                    is_dir = ent.is_dir(follow_symlinks=False)
                except OSError:
                    # Entry type cannot be determined; leave it out of the listing.
                    continue
                entries.append((ent.name, ent.path, is_dir))
    except OSError as e:
        print(f"Cannot read directory {root}: {e}", file=sys.stderr)
        sys.exit(1)

    files_ready: list[tuple[str, str, int]] = []
    dir_jobs: list[tuple[str, str]] = []
    for name, fullpath, is_dir in entries:
        if is_dir:
            # The display path for directories is built when their size arrives.
            dir_jobs.append((name, fullpath))
            continue
        try:
            sz = os.stat(fullpath, follow_symlinks=False).st_size
        except OSError:
            # Best effort: an unreadable file is still listed, with size 0.
            sz = 0
        files_ready.append((name, fullpath, _bytes_to_kb(sz)))

    dirs_ready: list[tuple[str, str, int]] = []
    if dir_jobs:
        workers = max(1, min(jobs, len(dir_jobs)))
        with ThreadPoolExecutor(max_workers=workers) as ex:
            futs = {
                ex.submit(_tree_size_bytes, p): (n, p)
                for n, p in dir_jobs
            }
            for fut in as_completed(futs):
                name, fullpath = futs[fut]
                # Directories are shown with a trailing separator.
                display = fullpath if fullpath.endswith(sep) else fullpath + sep
                try:
                    b = fut.result()
                except Exception:
                    # Best effort: a failed sizing job reports 0 rather than
                    # aborting the whole listing.
                    b = 0
                dirs_ready.append((name, display, _bytes_to_kb(b)))

    out: list[tuple[str, str, int]] = [*files_ready, *dirs_ready]
    # Largest first, ties by case-insensitive name. The exact name is the
    # final tiebreak so the order does not depend on which worker finished
    # first.
    out.sort(key=lambda row: (-row[2], row[0].lower(), row[0]))
    for name, path_disp, kb in out:
        print(_format_line(name, path_disp, kb))
def main() -> None:
    """Parse the command line and run the scan.

    Accepts a required root path (-scan / --scan) and an optional worker
    count (-j / --jobs), then hands both to scan_root.
    """
    # Default worker count: four threads per CPU, capped at 32, never below 1.
    default_jobs = max(1, min(32, (os.cpu_count() or 4) * 4))

    parser = argparse.ArgumentParser(
        description="List direct children of PATH with sizes (folders = recursive total).",
    )
    parser.add_argument(
        "-scan",
        "--scan",
        dest="root",
        metavar="PATH",
        required=True,
        help="Root directory to scan",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        default=default_jobs,
        metavar="N",
        help="Thread workers for parallel folder sizing",
    )

    options = parser.parse_args()
    scan_root(options.root, options.jobs)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|