Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| import json | |
| import sys | |
| import time | |
| import traceback | |
| from pathlib import Path | |
# Parent of the directory that contains this script.
_PROJECT_ROOT = Path(__file__).parent.parent

# Task directories are looked up under <root>/server/tasks.
TASKS_ROOT = _PROJECT_ROOT.joinpath("server", "tasks")

# Put the root first on the import path so `server.*` modules can be imported.
sys.path.insert(0, str(_PROJECT_ROOT))
def verify_reference_code(task_dir: Path) -> dict:
    """Run a task's reference code and check that it yields a usable shape.

    Loads ``task.json`` and ``reference_code.py`` from *task_dir*, executes the
    reference code with ``cq``, ``cadquery`` and ``math`` pre-bound, measures
    the object the code binds to ``result``, and finally calls
    ``server.preprocessor.preprocess_from_code`` to generate ground truth.

    Args:
        task_dir: Directory holding ``task.json`` and ``reference_code.py``.

    Returns:
        A dict that always contains ``task_id``, ``code_executes``,
        ``shape_valid``, ``volume_gt_zero``, ``ground_truth_generated`` and
        ``error``.  ``volume``, ``bbox`` and ``face_count`` are added as the
        shape is measured; ``gt_volume``, ``gt_euler`` and
        ``gt_dominant_face`` are added when preprocessing succeeds.  Load,
        execution and preprocessing failures are reported through ``error``
        instead of being raised, so one broken task cannot abort a batch run.
    """
    result_dict = {
        # Falls back to the directory name until task.json supplies the id.
        "task_id": task_dir.name,
        "code_executes": False,
        "shape_valid": False,
        "volume_gt_zero": False,
        "ground_truth_generated": False,
        "error": None,
    }

    try:
        with open(task_dir / "task.json") as f:
            task_data = json.load(f)
        result_dict["task_id"] = task_data["id"]
        code = (task_dir / "reference_code.py").read_text()
    except (OSError, ValueError, KeyError, TypeError) as e:
        # Missing/unreadable file, invalid JSON, or no usable "id" entry.
        result_dict["error"] = f"Failed to load task files: {type(e).__name__}: {e}"
        return result_dict

    try:
        import cadquery as cq
        import math

        local_ns = {"cq": cq, "cadquery": cq, "math": math}
        # NOTE(review): exec runs the reference file with full interpreter
        # privileges; only point this at trusted task directories.
        exec(code, local_ns)

        if "result" not in local_ns:
            result_dict["error"] = "No 'result' variable defined"
            return result_dict
        result_dict["code_executes"] = True

        result_obj = local_ns["result"]
        # Objects exposing .val() are unwrapped; anything else is used as-is.
        if hasattr(result_obj, "val"):
            shape = result_obj.val()
        else:
            shape = result_obj

        result_dict["shape_valid"] = shape.isValid()

        vol = shape.Volume()
        result_dict["volume"] = round(vol, 2)
        result_dict["volume_gt_zero"] = vol > 0

        bb = shape.BoundingBox()
        result_dict["bbox"] = [round(bb.xlen, 2), round(bb.ylen, 2), round(bb.zlen, 2)]

        faces = shape.Faces()
        result_dict["face_count"] = len(faces)
    except Exception as e:
        # Reference code is arbitrary, so any exception type may surface here.
        result_dict["error"] = f"{type(e).__name__}: {e}"
        return result_dict

    try:
        from server.preprocessor import preprocess_from_code

        gt = preprocess_from_code(code, str(task_dir), task_id=result_dict["task_id"])
        result_dict["ground_truth_generated"] = True
        result_dict["gt_volume"] = gt.get("volume_mm3")
        result_dict["gt_euler"] = gt.get("euler_characteristic")
        result_dict["gt_dominant_face"] = gt.get("dominant_face_type")
    except Exception as e:
        result_dict["error"] = f"Preprocessing failed: {type(e).__name__}: {e}"

    return result_dict
def _dims_match(a, b, tol=0.05):
    """Return True when every paired dimension agrees within *tol*, relative to *b*.

    Pairs whose reference value in *b* is exactly zero are skipped.
    """
    for ai, bi in zip(a, b):
        if bi == 0:
            continue
        if abs(ai - bi) / max(abs(bi), 1e-6) > tol:
            return False
    return True


def run_reward_verification(task_dir: Path) -> dict:
    """Score a task's own reference code against its stored ground truth.

    The reference code is run through ``server.executor.execute_cadquery_code``
    and executed again locally to obtain the shape, which is normalised,
    sampled (2048 surface points) and voxelised (64) before being compared
    with ``surface_points.npy`` and ``voxels_64.npy`` from *task_dir*.

    The score is assembled as::

        rgeom = 0.60 * iou_best + 0.20 * mean_cd_reward + 0.20 * median_cd_reward
        reval = 0.40 * frame_score + 0.40 * param_score + 0.20 * face_score
        total = 0.70 * rgeom + 0.30 * reval

    Args:
        task_dir: Task directory containing ``ground_truth.json``,
            ``reference_code.py``, ``surface_points.npy`` and ``voxels_64.npy``.

    Returns:
        A dict with ``reward_computed``, ``reward_value`` and ``error``; on
        success it also carries a ``detail`` dict with the intermediate scores.
        Any failure, including a missing ``reference_code.py``, is reported
        through ``error`` (with traceback) rather than raised.
    """
    result_dict = {
        "reward_computed": False,
        "reward_value": 0.0,
        "error": None,
    }

    gt_json = task_dir / "ground_truth.json"
    if not gt_json.exists():
        result_dict["error"] = "No ground_truth.json"
        return result_dict

    try:
        # Read inside the try so a missing reference file becomes a reported
        # error instead of an exception that aborts the caller's loop.
        ref_code = (task_dir / "reference_code.py").read_text()

        from server.executor import execute_cadquery_code

        exec_result = execute_cadquery_code(ref_code, timeout=15.0)
        if not exec_result["success"]:
            result_dict["error"] = f"Execution failed: {exec_result['error']}"
            return result_dict
        props = exec_result["properties"]

        import numpy as np
        from server.preprocessor import sample_surface_points, voxelize, normalize_shape
        import cadquery as cq
        import math

        # Second, in-process execution: the shape object itself is needed for
        # sampling and voxelisation below.
        # NOTE(review): exec runs the reference file with full interpreter
        # privileges; only point this at trusted task directories.
        local_ns = {"cq": cq, "cadquery": cq, "math": math}
        exec(ref_code, local_ns)
        result_obj = local_ns["result"]
        shape = result_obj.val() if hasattr(result_obj, "val") else result_obj

        normalized_shape, _ = normalize_shape(shape)
        agent_points = sample_surface_points(normalized_shape, 2048)
        agent_voxels = voxelize(normalized_shape, 64)

        gt_points = np.load(str(task_dir / "surface_points.npy"))
        gt_voxels = np.load(str(task_dir / "voxels_64.npy"))

        from server.reward import compute_iou, best_of_6_iou, compute_mean_chamfer, compute_median_chamfer

        iou = compute_iou(agent_voxels, gt_voxels)
        iou_best = best_of_6_iou(agent_voxels, gt_voxels)
        mean_cd = compute_mean_chamfer(agent_points, gt_points)
        median_cd = compute_median_chamfer(agent_points, gt_points)

        with open(gt_json) as f:
            gt_data = json.load(f)

        # Chamfer distances are scaled by 10% of the ground-truth bbox diagonal.
        bbox_mm = gt_data.get("bbox_mm", [1, 1, 1])
        bbox_diag = (sum(d**2 for d in bbox_mm)) ** 0.5
        threshold = bbox_diag * 0.1
        mean_cd_r = max(0, 1 - mean_cd / threshold) if threshold > 0 else 0
        median_cd_r = max(0, 1 - median_cd / threshold) if threshold > 0 else 0

        rgeom = 0.60 * iou_best + 0.20 * mean_cd_r + 0.20 * median_cd_r

        # Penalise when the best-orientation IoU beats the as-built IoU by more
        # than 0.15.
        frame_gap = iou_best - iou
        frame_score = 0.1 if frame_gap > 0.15 else 1.0

        norm_bb = normalized_shape.BoundingBox()
        agent_bbox = [round(norm_bb.xlen, 4), round(norm_bb.ylen, 4), round(norm_bb.zlen, 4)]
        # Reuse the value resolved above so the [1, 1, 1] fallback applies
        # here too, instead of re-indexing gt_data and raising KeyError.
        gt_bbox = bbox_mm

        sorted_a = sorted(agent_bbox, reverse=True)
        sorted_t = sorted(gt_bbox, reverse=True)

        # Penalise when the extents match only after sorting, i.e. the right
        # sizes were assigned to the wrong axes.
        s_match = _dims_match(sorted_a, sorted_t)
        u_match = _dims_match(agent_bbox, gt_bbox)
        param_score = 0.1 if (s_match and not u_match) else 1.0

        face_score = 1.0 if props["dominant_face_type"] == gt_data["dominant_face_type"] else 0.0

        reval = 0.40 * frame_score + 0.40 * param_score + 0.20 * face_score
        total = 1.0 * (0.70 * rgeom + 0.30 * reval)

        result_dict["reward_computed"] = True
        result_dict["reward_value"] = round(total, 4)
        result_dict["detail"] = {
            "iou": round(iou, 4),
            "iou_best": round(iou_best, 4),
            "mean_cd": round(mean_cd, 4),
            "mean_cd_reward": round(mean_cd_r, 4),
            "median_cd": round(median_cd, 4),
            "median_cd_reward": round(median_cd_r, 4),
            "rgeom": round(rgeom, 4),
            "frame_score": frame_score,
            "param_score": param_score,
            "face_score": face_score,
            "reval": round(reval, 4),
        }
    except Exception as e:
        result_dict["error"] = f"{type(e).__name__}: {e}\n{traceback.format_exc()}"

    return result_dict
def main():
    """Run both verification phases over every ``task_*`` directory.

    Phase 1 calls ``verify_reference_code`` per task and marks it PASS only
    when the code executes, the shape is valid, its volume is positive and
    ground truth was generated.  Phase 2 calls ``run_reward_verification`` per
    task and marks it PASS above 0.85, WARN above 0.5 and FAIL otherwise.

    Both result lists are written to ``verification_results.json`` next to the
    tasks directory's parent.  The process exits with status 1 when any task
    fails Phase 1 or does not score above 0.85 in Phase 2.
    """
    print("=" * 80)
    print("CadForge Task Verification Pipeline")
    print("=" * 80)

    task_dirs = sorted(TASKS_ROOT.glob("task_*"))
    print(f"\nFound {len(task_dirs)} tasks\n")

    phase1_results = []
    # Tallied with the same predicate as the printed status, so the summary
    # and the exit code cannot disagree with the per-task PASS/FAIL lines.
    phase1_pass = 0
    print("PHASE 1: Verify reference codes execute and generate ground truth")
    print("-" * 60)
    for task_dir in task_dirs:
        t0 = time.time()
        result = verify_reference_code(task_dir)
        elapsed = time.time() - t0

        passed = all([
            result["code_executes"],
            result["shape_valid"],
            result["volume_gt_zero"],
            result["ground_truth_generated"],
        ])
        status = "PASS" if passed else "FAIL"
        if passed:
            phase1_pass += 1

        print(f" {result['task_id']:35s} {status:5s} ({elapsed:.1f}s) "
              f"vol={result.get('volume', 'N/A')} bbox={result.get('bbox', 'N/A')}")
        if result.get("error"):
            print(f" ERROR: {result['error']}")
        phase1_results.append(result)

    print(f"\nPhase 1: {phase1_pass}/{len(phase1_results)} tasks passed\n")

    print("PHASE 2: Verify reward scores for reference code (should be ~1.0)")
    print("-" * 60)
    phase2_results = []
    for task_dir in task_dirs:
        t0 = time.time()
        result = run_reward_verification(task_dir)
        elapsed = time.time() - t0

        task_id = task_dir.name
        reward = result.get("reward_value", 0)
        status = "PASS" if reward > 0.85 else "WARN" if reward > 0.5 else "FAIL"
        print(f" {task_id:35s} {status:5s} reward={reward:.4f} ({elapsed:.1f}s)")
        if result.get("error"):
            # Errors here include a traceback; show only the first 200 chars.
            print(f" ERROR: {result['error'][:200]}")
        if result.get("detail"):
            d = result["detail"]
            print(f" IoU={d['iou_best']:.3f} MeanCD_r={d['mean_cd_reward']:.3f} "
                  f"MedianCD_r={d['median_cd_reward']:.3f} Rgeom={d['rgeom']:.3f} Reval={d['reval']:.3f}")
        phase2_results.append(result)

    phase2_pass = sum(1 for r in phase2_results if r.get("reward_value", 0) > 0.85)
    print(f"\nPhase 2: {phase2_pass}/{len(phase2_results)} tasks score > 0.85")

    print("\n" + "=" * 80)
    print("SUMMARY")
    print(f" Phase 1 (code + ground truth): {phase1_pass}/{len(phase1_results)}")
    print(f" Phase 2 (reward > 0.85): {phase2_pass}/{len(phase2_results)}")
    print("=" * 80)

    with open(TASKS_ROOT.parent / "verification_results.json", "w") as f:
        # default=str stringifies any value json cannot encode natively.
        json.dump({
            "phase1": phase1_results,
            "phase2": phase2_results,
        }, f, indent=2, default=str)
    print("\nDetailed results saved to verification_results.json")

    if phase1_pass < len(phase1_results) or phase2_pass < len(phase2_results):
        sys.exit(1)
# Run the verification pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()