3d_model / scripts /experiments /test_api_with_profiling.py
Azan
Clean deployment build (Squashed)
7a87926
#!/usr/bin/env python3
"""
Test and profile YLFF API endpoints using assets folder.
This script:
1. Tests all available API endpoints
2. Profiles code execution using the built-in profiler
3. Generates performance reports
4. Uses data from assets/ or data/ folders
"""
import argparse
import json
import logging
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import requests
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
try:
from ylff.utils.profiler import Profiler, profile_context
profiler = Profiler.get_instance()
profiler.enabled = True
profiler.reset()
logger.info("Profiler initialized")
except ImportError as e:
logger.warning(f"Could not import profiler: {e}. Continuing without local profiling.")
profiler = None
profile_context = lambda *args, **kwargs: type(
"context", (), {"__enter__": lambda self: self, "__exit__": lambda *args: None}
)()
class APITester:
"""Test and profile YLFF API endpoints."""
def __init__(self, base_url: str = "http://localhost:8000", timeout: int = 300):
"""
Args:
base_url: Base URL of the API server
timeout: Request timeout in seconds
"""
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self.session = requests.Session()
self.results: Dict[str, Any] = {
"start_time": datetime.now().isoformat(),
"endpoints_tested": [],
"errors": [],
"profiling": {},
}
def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
"""Make an API request with profiling."""
url = f"{self.base_url}{endpoint}"
func_name = f"{method.upper()}_{endpoint.replace('/', '_').replace('-', '_')}"
logger.info(f"Making {method} request to {url}")
ctx = (
profile_context(stage="api_request", endpoint=endpoint, method=method)
if profiler
else type(
"context", (), {"__enter__": lambda self: self, "__exit__": lambda *args: None}
)()
)
with ctx:
try:
start_time = time.time()
logger.debug(f"Request starting: {method} {url}")
response = self.session.request(method, url, timeout=self.timeout, **kwargs)
duration = time.time() - start_time
logger.info(
f"Request completed: {method} {url} - "
f"Status: {response.status_code} - "
f"Duration: {duration:.3f}s"
)
if profiler:
profiler.record(
function_name=func_name,
stage="api_request",
duration=duration,
status_code=response.status_code,
endpoint=endpoint,
)
try:
return {
"status_code": response.status_code,
"data": response.json() if response.content else None,
"duration": duration,
"success": 200 <= response.status_code < 300,
}
except json.JSONDecodeError:
return {
"status_code": response.status_code,
"data": response.text,
"duration": duration,
"success": 200 <= response.status_code < 300,
}
except requests.exceptions.RequestException as e:
duration = time.time() - start_time
logger.error(
f"Request failed: {method} {url} - Error: {str(e)} - Duration: {duration:.3f}s"
)
if profiler:
profiler.record(
function_name=func_name,
stage="api_request",
duration=duration,
error=str(e),
endpoint=endpoint,
)
return {"status_code": None, "error": str(e), "success": False}
def test_health(self) -> Dict[str, Any]:
"""Test health endpoint."""
print("\n[1/10] Testing /health endpoint...")
result = self._make_request("GET", "/health")
self.results["endpoints_tested"].append({"endpoint": "/health", "result": result})
if result and result.get("success"):
print(f" ✓ Health check passed: {result.get('data')}")
else:
print(f" ✗ Health check failed: {result}")
self.results["errors"].append(f"Health check failed: {result}")
return result
def test_root(self) -> Dict[str, Any]:
"""Test root endpoint."""
print("\n[2/10] Testing / endpoint...")
result = self._make_request("GET", "/")
self.results["endpoints_tested"].append({"endpoint": "/", "result": result})
if result and result.get("success"):
data = result.get("data", {})
print(f" ✓ API info retrieved: {data.get('name')} v{data.get('version')}")
else:
print(f" ✗ Root endpoint failed: {result}")
return result
def test_models(self) -> Dict[str, Any]:
"""Test models endpoint."""
print("\n[3/10] Testing /models endpoint...")
result = self._make_request("GET", "/models")
self.results["endpoints_tested"].append({"endpoint": "/models", "result": result})
if result and result.get("success"):
data = result.get("data", {})
models = data.get("models", [])
print(f" ✓ Found {len(models)} models")
else:
print(f" ✗ Models endpoint failed: {result}")
return result
def test_validate_sequence(self, sequence_dir: str) -> Dict[str, Any]:
"""Test sequence validation endpoint."""
print("\n[4/10] Testing /api/v1/validate/sequence endpoint...")
print(f" Using sequence: {sequence_dir}")
payload = {
"sequence_dir": sequence_dir,
"model_name": None,
"use_case": "ba_validation",
"accept_threshold": 2.0,
"reject_threshold": 30.0,
}
with profile_context(stage="validate_sequence", sequence_dir=sequence_dir):
result = self._make_request("POST", "/api/v1/validate/sequence", json=payload)
self.results["endpoints_tested"].append(
{"endpoint": "/api/v1/validate/sequence", "result": result}
)
if result and result.get("success"):
data = result.get("data", {})
job_id = data.get("job_id")
print(f" ✓ Validation job queued: {job_id}")
return {"job_id": job_id, "result": result}
else:
print(f" ✗ Validation failed: {result}")
self.results["errors"].append(f"Sequence validation failed: {result}")
return {"job_id": None, "result": result}
def test_validate_arkit(self, arkit_dir: str) -> Dict[str, Any]:
"""Test ARKit validation endpoint."""
print("\n[5/10] Testing /api/v1/validate/arkit endpoint...")
print(f" Using ARKit dir: {arkit_dir}")
payload = {
"arkit_dir": arkit_dir,
"output_dir": "data/test_arkit_output",
"model_name": None,
"max_frames": 10,
"frame_interval": 1,
"device": "cuda",
"gui": False,
}
with profile_context(stage="validate_arkit", arkit_dir=arkit_dir):
result = self._make_request("POST", "/api/v1/validate/arkit", json=payload)
self.results["endpoints_tested"].append(
{"endpoint": "/api/v1/validate/arkit", "result": result}
)
if result and result.get("success"):
data = result.get("data", {})
job_id = data.get("job_id")
print(f" ✓ ARKit validation job queued: {job_id}")
return {"job_id": job_id, "result": result}
else:
print(f" ✗ ARKit validation failed: {result}")
self.results["errors"].append(f"ARKit validation failed: {result}")
return {"job_id": None, "result": result}
def test_job_status(self, job_id: str) -> Dict[str, Any]:
"""Test job status endpoint."""
if not job_id:
return None
print(f"\n[6/10] Testing /api/v1/jobs/{job_id} endpoint...")
result = self._make_request("GET", f"/api/v1/jobs/{job_id}")
if result and result.get("success"):
data = result.get("data", {})
status = data.get("status", "unknown")
print(f" ✓ Job status: {status}")
return result
else:
print(f" ✗ Job status check failed: {result}")
return result
def test_list_jobs(self) -> Dict[str, Any]:
"""Test list jobs endpoint."""
print("\n[7/10] Testing /api/v1/jobs endpoint...")
result = self._make_request("GET", "/api/v1/jobs")
self.results["endpoints_tested"].append({"endpoint": "/api/v1/jobs", "result": result})
if result and result.get("success"):
data = result.get("data", {})
jobs = data.get("jobs", [])
print(f" ✓ Found {len(jobs)} jobs")
else:
print(f" ✗ List jobs failed: {result}")
return result
def test_profiling_metrics(self) -> Dict[str, Any]:
"""Test profiling metrics endpoint."""
print("\n[8/10] Testing /api/v1/profiling/metrics endpoint...")
result = self._make_request("GET", "/api/v1/profiling/metrics")
self.results["endpoints_tested"].append(
{"endpoint": "/api/v1/profiling/metrics", "result": result}
)
if result and result.get("success"):
data = result.get("data", {})
total_entries = data.get("total_entries", 0)
print(f" ✓ Profiling metrics retrieved: {total_entries} entries")
self.results["profiling"]["metrics"] = data
else:
print(f" ✗ Profiling metrics failed: {result}")
return result
def test_profiling_hot_paths(self) -> Dict[str, Any]:
"""Test profiling hot paths endpoint."""
print("\n[9/10] Testing /api/v1/profiling/hot-paths endpoint...")
result = self._make_request("GET", "/api/v1/profiling/hot-paths")
self.results["endpoints_tested"].append(
{"endpoint": "/api/v1/profiling/hot-paths", "result": result}
)
if result and result.get("success"):
data = result.get("data", {})
hot_paths = data.get("hot_paths", [])
print(f" ✓ Hot paths retrieved: {len(hot_paths)} paths")
if hot_paths:
print(" Top 5 hot paths:")
for i, path in enumerate(hot_paths[:5], 1):
print(f" {i}. {path.get('function')}: {path.get('total_time', 0):.3f}s")
else:
print(f" ✗ Hot paths failed: {result}")
return result
def test_profiling_latency(self) -> Dict[str, Any]:
"""Test profiling latency endpoint."""
print("\n[10/11] Testing /api/v1/profiling/latency endpoint...")
result = self._make_request("GET", "/api/v1/profiling/latency")
self.results["endpoints_tested"].append(
{"endpoint": "/api/v1/profiling/latency", "result": result}
)
if result and result.get("success"):
data = result.get("data", {})
breakdown = data.get("breakdown", {})
print(f" ✓ Latency breakdown retrieved: {len(breakdown)} stages")
if breakdown:
print(" Stage breakdown:")
for stage, stats in list(breakdown.items())[:5]:
print(
f" {stage}: {stats.get('avg_time', 0):.3f}s avg, "
f"{stats.get('percentage', 0):.1f}% of total"
)
else:
print(f" ✗ Latency breakdown failed: {result}")
return result
def test_profiling_system(self) -> Dict[str, Any]:
"""Test profiling system metrics endpoint."""
print("\n[11/11] Testing /api/v1/profiling/system endpoint...")
result = self._make_request("GET", "/api/v1/profiling/system")
self.results["endpoints_tested"].append(
{"endpoint": "/api/v1/profiling/system", "result": result}
)
if result and result.get("success"):
data = result.get("data", {})
metrics = data.get("metrics", [])
count = data.get("count", 0)
print(f" ✓ System metrics retrieved: {count} samples")
if metrics:
latest = metrics[-1]
print(" Latest metrics:")
if latest.get("cpu_percent") is not None:
print(f" CPU: {latest.get('cpu_percent'):.1f}%")
if latest.get("memory_percent") is not None:
print(f" Memory: {latest.get('memory_percent'):.1f}%")
if latest.get("gpu_memory_used") is not None:
print(
f" GPU Memory: {latest.get('gpu_memory_used'):.1f} MB / "
f"{latest.get('gpu_memory_total', 0):.1f} MB"
)
else:
print(f" ✗ System metrics failed: {result}")
return result
def get_profiling_summary(self) -> Dict[str, Any]:
"""Get local profiling summary."""
if profiler:
metrics = profiler.get_metrics()
latency = profiler.get_latency_breakdown()
return {"local_profiler": {"metrics": metrics, "latency_breakdown": latency}}
else:
return {"local_profiler": {"metrics": {}, "latency_breakdown": {}}}
def run_all_tests(self, sequence_dir: Optional[str] = None, arkit_dir: Optional[str] = None):
"""Run all API tests."""
logger.info("=" * 80)
logger.info("YLFF API Testing and Profiling")
logger.info("=" * 80)
logger.info(f"Base URL: {self.base_url}")
logger.info(f"Start time: {self.results['start_time']}")
logger.info(f"Sequence dir: {sequence_dir}")
logger.info(f"ARKit dir: {arkit_dir}")
print("=" * 80)
print("YLFF API Testing and Profiling")
print("=" * 80)
print(f"Base URL: {self.base_url}")
print(f"Start time: {self.results['start_time']}")
# Basic endpoints
self.test_health()
self.test_root()
self.test_models()
# Validation endpoints (if data available)
validate_job_id = None
if sequence_dir and Path(sequence_dir).exists():
validate_result = self.test_validate_sequence(sequence_dir)
validate_job_id = validate_result.get("job_id") if validate_result else None
else:
print("\n[4/10] Skipping /api/v1/validate/sequence (no sequence dir provided)")
if arkit_dir and Path(arkit_dir).exists():
arkit_result = self.test_validate_arkit(arkit_dir)
# Store job_id for potential future use
if arkit_result:
_ = arkit_result.get("job_id") # noqa: F841
else:
print("\n[5/10] Skipping /api/v1/validate/arkit (no ARKit dir provided)")
# Job management
self.test_list_jobs()
if validate_job_id:
time.sleep(2) # Wait a bit for job to start
self.test_job_status(validate_job_id)
# Profiling endpoints
self.test_profiling_metrics()
self.test_profiling_hot_paths()
self.test_profiling_latency()
self.test_profiling_system()
# Get local profiling summary
self.results["profiling"]["local"] = self.get_profiling_summary()
# Final summary
self.results["end_time"] = datetime.now().isoformat()
print("\n" + "=" * 80)
print("Testing Complete")
print("=" * 80)
print(f"Endpoints tested: {len(self.results['endpoints_tested'])}")
print(f"Errors: {len(self.results['errors'])}")
if self.results["errors"]:
print("\nErrors encountered:")
for error in self.results["errors"]:
print(f" - {error}")
return self.results
def save_results(self, output_path: Path):
"""Save test results to JSON file."""
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(self.results, f, indent=2, default=str)
print(f"\nResults saved to: {output_path}")
def find_test_data(project_root: Path) -> tuple[Optional[str], Optional[str]]:
"""Find available test data in assets/ or data/ folders."""
# Check for assets folder
assets_dir = project_root / "assets"
if assets_dir.exists():
# Look for ARKit data
arkit_dirs = list(assets_dir.rglob("ARKit")) + list(assets_dir.rglob("arkit"))
if arkit_dirs:
arkit_dir = str(arkit_dirs[0])
else:
arkit_dir = None
# Look for image sequences
image_dirs = [d for d in assets_dir.rglob("*") if d.is_dir() and any(d.glob("*.jpg"))]
sequence_dir = str(image_dirs[0]) if image_dirs else None
if arkit_dir or sequence_dir:
return sequence_dir, arkit_dir
# Fall back to data folder
data_dir = project_root / "data"
if data_dir.exists():
# Use arkit_ba_validation as test data
arkit_test_dir = data_dir / "arkit_ba_validation"
if arkit_test_dir.exists():
arkit_dir = str(arkit_test_dir)
else:
arkit_dir = None
# Use ba_work/images as sequence
ba_images_dir = data_dir / "arkit_ba_validation" / "ba_work" / "images"
if ba_images_dir.exists() and any(ba_images_dir.glob("*.jpg")):
sequence_dir = str(ba_images_dir)
else:
sequence_dir = None
return sequence_dir, arkit_dir
return None, None
def main():
parser = argparse.ArgumentParser(description="Test and profile YLFF API endpoints")
parser.add_argument(
"--base-url",
default="http://localhost:8000",
help="Base URL of the API server (default: http://localhost:8000)",
)
parser.add_argument(
"--sequence-dir",
type=str,
help="Directory containing image sequence (auto-detected if not provided)",
)
parser.add_argument(
"--arkit-dir",
type=str,
help="Directory containing ARKit data (auto-detected if not provided)",
)
parser.add_argument(
"--output",
type=str,
default="data/api_test_results.json",
help="Output path for test results (default: data/api_test_results.json)",
)
parser.add_argument(
"--timeout", type=int, default=300, help="Request timeout in seconds (default: 300)"
)
args = parser.parse_args()
# Find test data if not provided
sequence_dir = args.sequence_dir
arkit_dir = args.arkit_dir
if not sequence_dir or not arkit_dir:
found_sequence, found_arkit = find_test_data(project_root)
if not sequence_dir:
sequence_dir = found_sequence
if not arkit_dir:
arkit_dir = found_arkit
# Create tester and run tests
tester = APITester(base_url=args.base_url, timeout=args.timeout)
results = tester.run_all_tests(sequence_dir=sequence_dir, arkit_dir=arkit_dir)
# Save results
output_path = project_root / args.output
tester.save_results(output_path)
# Print profiling summary
if results.get("profiling", {}).get("local"):
local_prof = results["profiling"]["local"]["local_profiler"]
metrics = local_prof.get("metrics", {})
latency = local_prof.get("latency_breakdown", {})
print("\n" + "=" * 80)
print("Profiling Summary")
print("=" * 80)
print(f"Total entries: {metrics.get('total_entries', 0)}")
print(f"Stages tracked: {len(metrics.get('stage_stats', {}))}")
print(f"Functions tracked: {len(metrics.get('function_stats', {}))}")
if latency.get("breakdown"):
print("\nLatency Breakdown:")
for stage, stats in sorted(
latency["breakdown"].items(), key=lambda x: x[1].get("total_time", 0), reverse=True
)[:10]:
print(
f" {stage:30s} {stats.get('total_time', 0):8.3f}s "
f"({stats.get('percentage', 0):5.1f}%) "
f"avg: {stats.get('avg_time', 0):.3f}s "
f"calls: {stats.get('call_count', 0)}"
)
return 0 if len(results.get("errors", [])) == 0 else 1
if __name__ == "__main__":
sys.exit(main())