File size: 4,672 Bytes
9e3d618 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import argparse
import json
import os
import re
from typing import Any, Dict
# Matches a filename ending in "_YYYYMMDD_HHMMSS.json" and captures the
# "YYYYMMDD_HHMMSS" part (8 digits, underscore, 6 digits) as group 1.
TIMESTAMP_REGEX = re.compile(r"_(\d{8}_\d{6})\.json$")
# Capture kind and timestamp at the END of the filename (the pattern is
# anchored with "$"), like ..._analysis_YYYYMMDD_HHMMSS.json. Any prefix may
# precede the kind; the kind (analysis|iterations|messages) is matched
# case-insensitively. Group 1 is the kind, group 2 the timestamp.
KIND_TS_INFIX_REGEX = re.compile(r"(analysis|iterations|messages)_(\d{8}_\d{6})\.json$", re.IGNORECASE)
def extract_timestamp_from_filename(filename: str) -> str:
    """Return the trailing ``YYYYMMDD_HHMMSS`` stamp of a JSON filename.

    Args:
        filename: File name (or path) to inspect.

    Returns:
        The timestamp captured by ``TIMESTAMP_REGEX``, or an empty string
        when the name does not end in ``_YYYYMMDD_HHMMSS.json``.
    """
    found = TIMESTAMP_REGEX.search(filename)
    if found is None:
        return ""
    return found.group(1)
def remove_name_keys(obj: Any) -> Any:
    """Return a copy of ``obj`` with every ``"name"`` key removed.

    Dicts and lists are walked recursively and rebuilt as new containers;
    any other value is returned unchanged.

    Args:
        obj: Arbitrary decoded JSON value.

    Returns:
        The same structure without any dict entry whose key is ``"name"``.
    """
    if isinstance(obj, list):
        return [remove_name_keys(item) for item in obj]
    if not isinstance(obj, dict):
        # Scalars (str, int, None, ...) have nothing to strip.
        return obj
    stripped = {}
    for key, value in obj.items():
        if key != "name":
            stripped[key] = remove_name_keys(value)
    return stripped
def reduce_payload(original: Any, filename: str) -> Dict[str, Any]:
    """Trim a decoded JSON payload down to function, analysis and timestamp.

    All ``"name"`` keys are stripped first (at any nesting depth). If the
    cleaned payload is a dict, its top-level ``"function"`` and ``"analysis"``
    entries are kept. When it has neither of them, or when the payload is not
    a dict at all (e.g. a list), the whole cleaned payload is wrapped under
    ``"analysis"``.

    The timestamp comes from ``filename`` when it ends in
    ``_YYYYMMDD_HHMMSS.json``. Otherwise an existing top-level
    ``"timestamp"`` entry of the payload is carried over, so re-processing an
    already reduced file whose name no longer carries the stamp does not
    lose it.

    Args:
        original: Decoded JSON content of the file.
        filename: Name of the file the payload was read from.

    Returns:
        A dict containing ``"analysis"`` and, when available, ``"function"``
        and ``"timestamp"``.
    """
    cleaned = remove_name_keys(original)
    timestamp = extract_timestamp_from_filename(filename)

    result: Dict[str, Any] = {}
    if isinstance(cleaned, dict):
        for key in ("function", "analysis"):
            if key in cleaned:
                result[key] = cleaned[key]
    if not result:
        # Neither key present (or non-dict JSON): treat everything as the
        # analysis content. The original code branched on the filename here
        # but both branches produced this same value.
        result = {"analysis": cleaned}

    if timestamp:
        result["timestamp"] = timestamp
    elif isinstance(cleaned, dict) and "timestamp" in cleaned:
        # Filename has no stamp; keep the one already stored in the payload.
        result["timestamp"] = cleaned["timestamp"]
    return result
def compute_new_basename(filename: str) -> str | None:
    """Derive the canonical ``kind_timestamp.json`` name for a file.

    Only the basename of ``filename`` is inspected, using
    ``KIND_TS_INFIX_REGEX``.

    Args:
        filename: File name or path.

    Returns:
        ``"<kind>_<timestamp>.json"`` with the kind lower-cased, or ``None``
        when the basename does not match the pattern.
    """
    found = KIND_TS_INFIX_REGEX.search(os.path.basename(filename))
    if found is None:
        return None
    kind, stamp = found.groups()
    return f"{kind.lower()}_{stamp}.json"
def safe_rename(path: str, new_basename: str) -> str:
    """Rename ``path`` to ``new_basename`` in the same directory.

    If the desired name is already taken by another file, a numeric suffix
    (``_1``, ``_2``, ...) is inserted before the extension and the first
    name that does not exist yet is used.

    Args:
        path: Current location of the file.
        new_basename: Desired file name (no directory component).

    Returns:
        The path the file ends up at; ``path`` itself when it already has
        the desired name.
    """
    folder = os.path.dirname(path)
    destination = os.path.join(folder, new_basename)
    if os.path.abspath(destination) == os.path.abspath(path):
        # Already correctly named; nothing to do.
        return path

    root, suffix = os.path.splitext(new_basename)
    attempt = 0
    while os.path.exists(destination):
        attempt += 1
        destination = os.path.join(folder, f"{root}_{attempt}{suffix}")
    os.replace(path, destination)
    return destination
def process_file(path: str, do_rename: bool) -> str:
    """Reduce one JSON file in place and optionally rename it.

    The file is loaded, passed through ``reduce_payload`` and written back
    to the same path (UTF-8, 2-space indent, trailing newline). When
    ``do_rename`` is true and ``compute_new_basename`` yields a name, the
    file is then moved with ``safe_rename``.

    Args:
        path: JSON file to process.
        do_rename: Whether to rename the file to ``kind_timestamp.json``.

    Returns:
        The final path of the file (changed only if it was renamed).
    """
    with open(path, "r", encoding="utf-8") as source:
        payload = json.load(source)

    trimmed = reduce_payload(payload, os.path.basename(path))

    with open(path, "w", encoding="utf-8") as sink:
        json.dump(trimmed, sink, ensure_ascii=False, indent=2)
        sink.write("\n")

    # Only look for a canonical name when renaming was requested.
    wanted_name = compute_new_basename(path) if do_rename else None
    if wanted_name:
        path = safe_rename(path, wanted_name)

    print(f"Processed: {os.path.basename(path)}")
    return path
def process_directory(target_dir: str, do_rename: bool) -> None:
    """Process every ``.json`` file below ``target_dir``, recursively.

    Files are matched by extension, case-insensitively. A failure on one
    file is reported on stdout (path shown relative to ``target_dir``) and
    does not stop the remaining files from being processed.

    Args:
        target_dir: Root directory to walk.
        do_rename: Forwarded to ``process_file``.

    Raises:
        FileNotFoundError: If ``target_dir`` is not an existing directory.
    """
    if not os.path.isdir(target_dir):
        raise FileNotFoundError(f"Directory not found: {target_dir}")

    for folder, _subdirs, names in os.walk(target_dir):
        json_names = [name for name in names if name.lower().endswith(".json")]
        for name in json_names:
            full_path = os.path.join(folder, name)
            try:
                process_file(full_path, do_rename)
            except Exception as err:
                # Best effort: report and keep going with the other files.
                print(f"Failed: {os.path.relpath(full_path, start=target_dir)}: {err}")
def main() -> None:
    """Command-line entry point.

    Parses ``--path`` and ``--no-rename``, then processes the given file or
    directory. Without ``--path`` the relative directory
    ``mordor_dataset/eval_output/analysis`` is used.

    Raises:
        FileNotFoundError: If the target is neither a directory nor a file.
    """
    parser = argparse.ArgumentParser(
        description="Trim JSONs to keep only function, analysis, and timestamp; remove name fields. Recurses directories."
    )
    # NOTE(review): the help text mentions a "bitsadmin analysis dir" while
    # the default built below is mordor_dataset/eval_output/analysis; confirm
    # which one is intended.
    parser.add_argument(
        "--path",
        help="A file or directory path to process. If omitted, defaults to the bitsadmin analysis dir.",
    )
    parser.add_argument("--no-rename", action="store_true", help="Do not rename files to kind_timestamp.json")
    options = parser.parse_args()

    fallback_dir = os.path.join("mordor_dataset", "eval_output", "analysis")
    target = options.path if options.path else fallback_dir
    rename = not options.no_rename

    if os.path.isdir(target):
        process_directory(target, do_rename=rename)
        return
    if os.path.isfile(target):
        process_file(target, do_rename=rename)
        return
    raise FileNotFoundError(f"Path not found: {target}")


if __name__ == "__main__":
    main()
|