# SamChYe's picture
# Publish EdgeEDA agent
# aa677e3 verified
from __future__ import annotations
import glob
import gzip
import json
import logging
import os
from typing import Any, Dict, Optional, Tuple, List
def _list_json_files(dir_path: str):
if not os.path.isdir(dir_path):
return []
out = []
for fn in os.listdir(dir_path):
if fn.endswith(".json"):
out.append(os.path.join(dir_path, fn))
return out
def _list_json_and_gz_files(dir_path: str) -> List[str]:
"""Return .json and .json.gz files in directory (non-recursive)."""
if not os.path.isdir(dir_path):
return []
out: List[str] = []
for fn in os.listdir(dir_path):
if fn.endswith(".json") or fn.endswith(".json.gz"):
out.append(os.path.join(dir_path, fn))
return out
def find_best_metadata_json(
    orfs_flow_dir: str,
    platform: str,
    design: str,
    variant: str,
) -> Optional[str]:
    """
    Locate the most likely metadata/metrics JSON file for an ORFS run.

    ORFS convention:
        reports/<platform>/<design>/<FLOW_VARIANT>/

    Search order (first stage that yields candidates wins):
      1. Recursive pattern matches under the variant dir, tried in order of
         preference (metadata.json, metrics.json, *metadata*.json, ...).
      2. Any .json / .json.gz file directly inside the variant dir.
      3. Any .json file anywhere under the parent dir (sibling variants).

    Returns the most recently modified candidate, or None when the variant
    directory is missing or nothing matches anywhere.
    """
    base = os.path.join(orfs_flow_dir, "reports", platform, design, variant)
    if not os.path.exists(base):
        logging.debug(f"Reports directory does not exist: {base}")
        return None
    if not os.path.isdir(base):
        logging.warning(f"Reports path exists but is not a directory: {base}")
        return None
    # Try multiple patterns, searching recursively so nested report dirs are found
    patterns = [
        "**/metadata.json",
        "**/metrics.json",
        "**/*metadata*.json",
        "**/*metrics*.json",
        "**/*final*.json",
        "**/*report*.json",
        "**/*results*.json",
    ]
    candidates: List[str] = []
    for pattern in patterns:
        matches = glob.glob(os.path.join(base, pattern), recursive=True)
        if matches:
            candidates = matches
            logging.debug(f"Found {len(matches)} files matching pattern '{pattern}' under {base}")
            break  # Prefer the first matching pattern set
    # Fallback: any .json or .json.gz file directly in the dir (non-recursive)
    if not candidates:
        candidates = [
            os.path.join(base, fn)
            for fn in os.listdir(base)
            if fn.endswith((".json", ".json.gz"))
        ]
        if candidates:
            logging.debug(f"Using fallback: found {len(candidates)} JSON(/gz) files in {base}")
    # Last resort: search one level up, across sibling variant dirs.
    # BUG FIX: an early `return None` used to sit before this stage, making it
    # unreachable dead code; the sibling fallback now actually runs.
    if not candidates:
        parent = os.path.dirname(base)
        siblings = glob.glob(os.path.join(parent, "**/*.json"), recursive=True)
        if siblings:
            candidates = siblings
            logging.debug(f"Fallback: found {len(siblings)} JSON files under {parent}")
    if not candidates:
        logging.warning(f"No JSON files found in {base} or nearby")
        return None
    # Sort by modification time (most recent first)
    candidates.sort(key=os.path.getmtime, reverse=True)
    selected = candidates[0]
    logging.debug(f"Selected metadata file: {selected} (from {len(candidates)} candidates)")
    return selected
def load_json(path: str) -> Dict[str, Any]:
    """
    Load a JSON file, transparently handling gzip compression (``.gz`` suffix).

    On a parse error, retries leniently by extracting the first
    ``{ ... }`` span from the raw text and parsing that.

    Raises:
        json.JSONDecodeError: if neither strict nor lenient parsing succeeds.
        FileNotFoundError: if the path does not exist.
    """
    try:
        # '.json.gz' also ends with '.gz', so a single suffix check suffices.
        if path.endswith('.gz'):
            with gzip.open(path, 'rt', encoding='utf-8') as f:
                return json.load(f)
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError as e:
        logging.warning(f"Failed to parse JSON from {path}: {e}. Trying lenient read.")
        # Try a lenient read: read file and attempt to find JSON-like substring
        try:
            if path.endswith('.gz'):
                with gzip.open(path, 'rt', encoding='utf-8', errors='ignore') as f:
                    txt = f.read()
            else:
                with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                    txt = f.read()
            # attempt to locate first JSON object within text
            start = txt.find('{')
            end = txt.rfind('}')
            if start != -1 and end > start:
                return json.loads(txt[start:end + 1])
        except Exception as e2:
            logging.error(f"Lenient parse failed for {path}: {e2}")
            raise
        # BUG FIX: previously this path fell through and implicitly returned
        # None when no '{...}' span was found; re-raise the original error.
        raise
    except FileNotFoundError:
        logging.error(f"JSON file not found: {path}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error loading JSON from {path}: {e}")
        raise
def flatten_metrics(obj: Any, prefix: str = "") -> Dict[str, Any]:
    """
    Flatten arbitrarily nested dicts into a single-level dict whose keys are
    the original key paths joined by '__'.  Non-dict values (including lists)
    are kept as leaves; a non-dict top-level value maps from the empty key.
    """
    if not isinstance(obj, dict):
        return {prefix: obj}
    flat: Dict[str, Any] = {}
    for key, value in obj.items():
        path = f"{prefix}__{key}" if prefix else str(key)
        flat.update(flatten_metrics(value, path))
    return flat
def coerce_float(x: Any) -> Optional[float]:
    """Best-effort conversion of *x* to float.

    Numbers (including bools) convert directly; strings convert when they
    parse as a float; everything else (including None) yields None.
    """
    if isinstance(x, (int, float)):
        return float(x)
    if not isinstance(x, str):
        return None
    try:
        return float(x)
    except ValueError:
        return None
def pick_first(metrics_flat: Dict[str, Any], keys: list[str]) -> Optional[float]:
    """
    Return the value for the first key in *keys* that maps to a
    float-coercible value in *metrics_flat*.

    Exact key matches are tried first; a second pass matches
    case-insensitively.  Returns None when no key yields a usable number.
    """
    for k in keys:
        if k in metrics_flat:
            v = coerce_float(metrics_flat[k])
            if v is not None:
                return v
    # also try case-insensitive match: map lowercased keys to original form
    lower = {kk.lower(): kk for kk in metrics_flat}
    for k in keys:
        kk = lower.get(k.lower())
        # BUG FIX: `if kk:` skipped the empty-string key "" (falsy but valid;
        # flatten_metrics produces it for a non-dict top-level value).
        if kk is not None:
            v = coerce_float(metrics_flat[kk])
            if v is not None:
                return v
    return None