|
|
|
|
|
""" |
|
|
Test and profile YLFF API endpoints using assets folder. |
|
|
|
|
|
This script: |
|
|
1. Tests all available API endpoints |
|
|
2. Profiles code execution using the built-in profiler |
|
|
3. Generates performance reports |
|
|
4. Uses data from assets/ or data/ folders |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import json |
|
|
import logging |
|
|
import sys |
|
|
import time |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Optional |
|
|
import requests |
|
|
|
|
|
|
|
|
# Console logging for the whole script: timestamped INFO-level messages.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Make the project package importable when this script is run directly:
# the repo root is assumed to be one directory above this file.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
|
|
|
|
|
try:
    from ylff.utils.profiler import Profiler, profile_context

    # Local in-process profiler: enabled and cleared for this run.
    profiler = Profiler.get_instance()
    profiler.enabled = True
    profiler.reset()
    logger.info("Profiler initialized")
except ImportError as e:
    logger.warning(f"Could not import profiler: {e}. Continuing without local profiling.")
    profiler = None

    class _NullContext:
        """Do-nothing context manager used when the profiler is unavailable."""

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            return None

    def profile_context(*args, **kwargs):
        """No-op stand-in for ylff.utils.profiler.profile_context."""
        return _NullContext()
|
|
|
|
|
|
|
|
class APITester:
    """Test and profile YLFF API endpoints.

    Each ``test_*`` method exercises one endpoint, prints a human-readable
    progress line, records the outcome in ``self.results``, and returns the
    raw request result so ``run_all_tests`` can chain follow-up checks
    (e.g. polling a queued job's status).
    """

    def __init__(self, base_url: str = "http://localhost:8000", timeout: int = 300):
        """
        Args:
            base_url: Base URL of the API server
            timeout: Request timeout in seconds
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        # One session so TCP connections are reused across all tests.
        self.session = requests.Session()
        self.results: Dict[str, Any] = {
            "start_time": datetime.now().isoformat(),
            "endpoints_tested": [],
            "errors": [],
            "profiling": {},
        }

    def _record_endpoint(self, endpoint: str, result: Optional[Dict[str, Any]]) -> None:
        """Append one endpoint outcome to the run report."""
        self.results["endpoints_tested"].append({"endpoint": endpoint, "result": result})

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Make an API request with profiling.

        Returns:
            Dict with ``status_code``, ``data`` (parsed JSON, raw text on
            decode failure, or None for an empty body), ``duration`` and
            ``success``; network failures yield ``status_code=None`` and an
            ``error`` message instead of ``data``.
        """
        url = f"{self.base_url}{endpoint}"
        func_name = f"{method.upper()}_{endpoint.replace('/', '_').replace('-', '_')}"

        logger.info(f"Making {method} request to {url}")

        # profile_context is always callable: either the real profiler context
        # or the module-level no-op fallback installed on ImportError.
        with profile_context(stage="api_request", endpoint=endpoint, method=method):
            start_time = time.time()
            try:
                logger.debug(f"Request starting: {method} {url}")
                response = self.session.request(method, url, timeout=self.timeout, **kwargs)
                duration = time.time() - start_time
                logger.info(
                    f"Request completed: {method} {url} - "
                    f"Status: {response.status_code} - "
                    f"Duration: {duration:.3f}s"
                )

                if profiler:
                    profiler.record(
                        function_name=func_name,
                        stage="api_request",
                        duration=duration,
                        status_code=response.status_code,
                        endpoint=endpoint,
                    )

                try:
                    # requests raises a ValueError subclass on malformed JSON
                    # (requests.exceptions.JSONDecodeError on modern versions).
                    data = response.json() if response.content else None
                except ValueError:
                    data = response.text
                return {
                    "status_code": response.status_code,
                    "data": data,
                    "duration": duration,
                    "success": 200 <= response.status_code < 300,
                }
            except requests.exceptions.RequestException as e:
                duration = time.time() - start_time
                logger.error(
                    f"Request failed: {method} {url} - Error: {str(e)} - Duration: {duration:.3f}s"
                )
                if profiler:
                    profiler.record(
                        function_name=func_name,
                        stage="api_request",
                        duration=duration,
                        error=str(e),
                        endpoint=endpoint,
                    )
                return {"status_code": None, "error": str(e), "success": False}

    def test_health(self) -> Dict[str, Any]:
        """Test health endpoint."""
        print("\n[1/11] Testing /health endpoint...")
        result = self._make_request("GET", "/health")
        self._record_endpoint("/health", result)
        if result and result.get("success"):
            print(f" ✓ Health check passed: {result.get('data')}")
        else:
            print(f" ✗ Health check failed: {result}")
            self.results["errors"].append(f"Health check failed: {result}")
        return result

    def test_root(self) -> Dict[str, Any]:
        """Test root endpoint."""
        print("\n[2/11] Testing / endpoint...")
        result = self._make_request("GET", "/")
        self._record_endpoint("/", result)
        if result and result.get("success"):
            data = result.get("data", {})
            print(f" ✓ API info retrieved: {data.get('name')} v{data.get('version')}")
        else:
            print(f" ✗ Root endpoint failed: {result}")
        return result

    def test_models(self) -> Dict[str, Any]:
        """Test models endpoint."""
        print("\n[3/11] Testing /models endpoint...")
        result = self._make_request("GET", "/models")
        self._record_endpoint("/models", result)
        if result and result.get("success"):
            data = result.get("data", {})
            models = data.get("models", [])
            print(f" ✓ Found {len(models)} models")
        else:
            print(f" ✗ Models endpoint failed: {result}")
        return result

    def test_validate_sequence(self, sequence_dir: str) -> Dict[str, Any]:
        """Test sequence validation endpoint.

        Returns:
            Dict with the queued ``job_id`` (None on failure) and the raw
            request ``result``.
        """
        print("\n[4/11] Testing /api/v1/validate/sequence endpoint...")
        print(f" Using sequence: {sequence_dir}")

        payload = {
            "sequence_dir": sequence_dir,
            "model_name": None,
            "use_case": "ba_validation",
            "accept_threshold": 2.0,
            "reject_threshold": 30.0,
        }

        with profile_context(stage="validate_sequence", sequence_dir=sequence_dir):
            result = self._make_request("POST", "/api/v1/validate/sequence", json=payload)

        self._record_endpoint("/api/v1/validate/sequence", result)

        if result and result.get("success"):
            data = result.get("data", {})
            job_id = data.get("job_id")
            print(f" ✓ Validation job queued: {job_id}")
            return {"job_id": job_id, "result": result}
        else:
            print(f" ✗ Validation failed: {result}")
            self.results["errors"].append(f"Sequence validation failed: {result}")
            return {"job_id": None, "result": result}

    def test_validate_arkit(self, arkit_dir: str) -> Dict[str, Any]:
        """Test ARKit validation endpoint.

        Returns:
            Dict with the queued ``job_id`` (None on failure) and the raw
            request ``result``.
        """
        print("\n[5/11] Testing /api/v1/validate/arkit endpoint...")
        print(f" Using ARKit dir: {arkit_dir}")

        payload = {
            "arkit_dir": arkit_dir,
            "output_dir": "data/test_arkit_output",
            "model_name": None,
            "max_frames": 10,
            "frame_interval": 1,
            "device": "cuda",
            "gui": False,
        }

        with profile_context(stage="validate_arkit", arkit_dir=arkit_dir):
            result = self._make_request("POST", "/api/v1/validate/arkit", json=payload)

        self._record_endpoint("/api/v1/validate/arkit", result)

        if result and result.get("success"):
            data = result.get("data", {})
            job_id = data.get("job_id")
            print(f" ✓ ARKit validation job queued: {job_id}")
            return {"job_id": job_id, "result": result}
        else:
            print(f" ✗ ARKit validation failed: {result}")
            self.results["errors"].append(f"ARKit validation failed: {result}")
            return {"job_id": None, "result": result}

    def test_job_status(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Test job status endpoint.

        Returns:
            The request result, or None when no job_id is available to poll.
        """
        if not job_id:
            return None

        print(f"\n[6/11] Testing /api/v1/jobs/{job_id} endpoint...")
        result = self._make_request("GET", f"/api/v1/jobs/{job_id}")
        self._record_endpoint(f"/api/v1/jobs/{job_id}", result)

        if result and result.get("success"):
            data = result.get("data", {})
            status = data.get("status", "unknown")
            print(f" ✓ Job status: {status}")
            return result
        else:
            print(f" ✗ Job status check failed: {result}")
            return result

    def test_list_jobs(self) -> Dict[str, Any]:
        """Test list jobs endpoint."""
        print("\n[7/11] Testing /api/v1/jobs endpoint...")
        result = self._make_request("GET", "/api/v1/jobs")
        self._record_endpoint("/api/v1/jobs", result)

        if result and result.get("success"):
            data = result.get("data", {})
            jobs = data.get("jobs", [])
            print(f" ✓ Found {len(jobs)} jobs")
        else:
            print(f" ✗ List jobs failed: {result}")
        return result

    def test_profiling_metrics(self) -> Dict[str, Any]:
        """Test profiling metrics endpoint."""
        print("\n[8/11] Testing /api/v1/profiling/metrics endpoint...")
        result = self._make_request("GET", "/api/v1/profiling/metrics")
        self._record_endpoint("/api/v1/profiling/metrics", result)

        if result and result.get("success"):
            data = result.get("data", {})
            total_entries = data.get("total_entries", 0)
            print(f" ✓ Profiling metrics retrieved: {total_entries} entries")
            # Keep the server-side metrics in the report for later analysis.
            self.results["profiling"]["metrics"] = data
        else:
            print(f" ✗ Profiling metrics failed: {result}")
        return result

    def test_profiling_hot_paths(self) -> Dict[str, Any]:
        """Test profiling hot paths endpoint."""
        print("\n[9/11] Testing /api/v1/profiling/hot-paths endpoint...")
        result = self._make_request("GET", "/api/v1/profiling/hot-paths")
        self._record_endpoint("/api/v1/profiling/hot-paths", result)

        if result and result.get("success"):
            data = result.get("data", {})
            hot_paths = data.get("hot_paths", [])
            print(f" ✓ Hot paths retrieved: {len(hot_paths)} paths")
            if hot_paths:
                print(" Top 5 hot paths:")
                for i, path in enumerate(hot_paths[:5], 1):
                    print(f" {i}. {path.get('function')}: {path.get('total_time', 0):.3f}s")
        else:
            print(f" ✗ Hot paths failed: {result}")
        return result

    def test_profiling_latency(self) -> Dict[str, Any]:
        """Test profiling latency endpoint."""
        print("\n[10/11] Testing /api/v1/profiling/latency endpoint...")
        result = self._make_request("GET", "/api/v1/profiling/latency")
        self._record_endpoint("/api/v1/profiling/latency", result)

        if result and result.get("success"):
            data = result.get("data", {})
            breakdown = data.get("breakdown", {})
            print(f" ✓ Latency breakdown retrieved: {len(breakdown)} stages")
            if breakdown:
                print(" Stage breakdown:")
                for stage, stats in list(breakdown.items())[:5]:
                    print(
                        f" {stage}: {stats.get('avg_time', 0):.3f}s avg, "
                        f"{stats.get('percentage', 0):.1f}% of total"
                    )
        else:
            print(f" ✗ Latency breakdown failed: {result}")
        return result

    def test_profiling_system(self) -> Dict[str, Any]:
        """Test profiling system metrics endpoint."""
        print("\n[11/11] Testing /api/v1/profiling/system endpoint...")
        result = self._make_request("GET", "/api/v1/profiling/system")
        self._record_endpoint("/api/v1/profiling/system", result)

        if result and result.get("success"):
            data = result.get("data", {})
            metrics = data.get("metrics", [])
            count = data.get("count", 0)
            print(f" ✓ System metrics retrieved: {count} samples")
            if metrics:
                # Show only the most recent sample; fields may be absent
                # when the server has no GPU / psutil support.
                latest = metrics[-1]
                print(" Latest metrics:")
                if latest.get("cpu_percent") is not None:
                    print(f" CPU: {latest.get('cpu_percent'):.1f}%")
                if latest.get("memory_percent") is not None:
                    print(f" Memory: {latest.get('memory_percent'):.1f}%")
                if latest.get("gpu_memory_used") is not None:
                    print(
                        f" GPU Memory: {latest.get('gpu_memory_used'):.1f} MB / "
                        f"{latest.get('gpu_memory_total', 0):.1f} MB"
                    )
        else:
            print(f" ✗ System metrics failed: {result}")
        return result

    def get_profiling_summary(self) -> Dict[str, Any]:
        """Get local (in-process) profiling summary; empty when unavailable."""
        if profiler:
            metrics = profiler.get_metrics()
            latency = profiler.get_latency_breakdown()
            return {"local_profiler": {"metrics": metrics, "latency_breakdown": latency}}
        else:
            return {"local_profiler": {"metrics": {}, "latency_breakdown": {}}}

    def run_all_tests(
        self, sequence_dir: Optional[str] = None, arkit_dir: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run all API tests.

        Args:
            sequence_dir: Optional image-sequence directory; the sequence
                validation test is skipped when missing or nonexistent.
            arkit_dir: Optional ARKit data directory; the ARKit validation
                test is skipped when missing or nonexistent.

        Returns:
            The accumulated ``self.results`` report.
        """
        logger.info("=" * 80)
        logger.info("YLFF API Testing and Profiling")
        logger.info("=" * 80)
        logger.info(f"Base URL: {self.base_url}")
        logger.info(f"Start time: {self.results['start_time']}")
        logger.info(f"Sequence dir: {sequence_dir}")
        logger.info(f"ARKit dir: {arkit_dir}")
        print("=" * 80)
        print("YLFF API Testing and Profiling")
        print("=" * 80)
        print(f"Base URL: {self.base_url}")
        print(f"Start time: {self.results['start_time']}")

        # Basic service endpoints.
        self.test_health()
        self.test_root()
        self.test_models()

        # Validation jobs (only when test data is present on disk).
        validate_job_id = None
        if sequence_dir and Path(sequence_dir).exists():
            validate_result = self.test_validate_sequence(sequence_dir)
            validate_job_id = validate_result.get("job_id") if validate_result else None
        else:
            print("\n[4/11] Skipping /api/v1/validate/sequence (no sequence dir provided)")

        if arkit_dir and Path(arkit_dir).exists():
            self.test_validate_arkit(arkit_dir)
        else:
            print("\n[5/11] Skipping /api/v1/validate/arkit (no ARKit dir provided)")

        # Job management endpoints.
        self.test_list_jobs()
        if validate_job_id:
            # Give the queued validation job a moment to start before polling.
            time.sleep(2)
            self.test_job_status(validate_job_id)

        # Server-side profiling endpoints.
        self.test_profiling_metrics()
        self.test_profiling_hot_paths()
        self.test_profiling_latency()
        self.test_profiling_system()

        # Attach the local profiler summary and close the report.
        self.results["profiling"]["local"] = self.get_profiling_summary()
        self.results["end_time"] = datetime.now().isoformat()

        print("\n" + "=" * 80)
        print("Testing Complete")
        print("=" * 80)
        print(f"Endpoints tested: {len(self.results['endpoints_tested'])}")
        print(f"Errors: {len(self.results['errors'])}")
        if self.results["errors"]:
            print("\nErrors encountered:")
            for error in self.results["errors"]:
                print(f" - {error}")

        return self.results

    def save_results(self, output_path: Path) -> None:
        """Save test results to JSON file, creating parent folders as needed."""
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w") as f:
            # default=str stringifies anything json can't encode (e.g. Paths).
            json.dump(self.results, f, indent=2, default=str)
        print(f"\nResults saved to: {output_path}")
|
|
|
|
|
|
|
|
def find_test_data(project_root: Path) -> tuple[Optional[str], Optional[str]]:
    """Find available test data in assets/ or data/ folders.

    Args:
        project_root: Repository root to search under.

    Returns:
        ``(sequence_dir, arkit_dir)`` as path strings; either element is
        ``None`` when no matching data is found.
    """
    # Preferred source: anything under assets/.
    assets_dir = project_root / "assets"
    if assets_dir.exists():
        # Only accept *directories* named ARKit/arkit — rglob also matches
        # plain files with that name.
        arkit_candidates = [
            d
            for d in list(assets_dir.rglob("ARKit")) + list(assets_dir.rglob("arkit"))
            if d.is_dir()
        ]
        arkit_dir = str(arkit_candidates[0]) if arkit_candidates else None

        # Any directory holding at least one .jpg counts as an image sequence.
        image_dirs = [d for d in assets_dir.rglob("*") if d.is_dir() and any(d.glob("*.jpg"))]
        sequence_dir = str(image_dirs[0]) if image_dirs else None

        if arkit_dir or sequence_dir:
            return sequence_dir, arkit_dir

    # Fallback: well-known fixture locations under data/.
    data_dir = project_root / "data"
    if data_dir.exists():
        arkit_test_dir = data_dir / "arkit_ba_validation"
        arkit_dir = str(arkit_test_dir) if arkit_test_dir.exists() else None

        ba_images_dir = data_dir / "arkit_ba_validation" / "ba_work" / "images"
        if ba_images_dir.exists() and any(ba_images_dir.glob("*.jpg")):
            sequence_dir = str(ba_images_dir)
        else:
            sequence_dir = None

        return sequence_dir, arkit_dir

    return None, None
|
|
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run every API test, persist the
    JSON report, and print a local profiling summary.

    Returns:
        Process exit code — 0 when no test recorded an error, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Test and profile YLFF API endpoints")
    parser.add_argument(
        "--base-url",
        default="http://localhost:8000",
        help="Base URL of the API server (default: http://localhost:8000)",
    )
    parser.add_argument(
        "--sequence-dir",
        type=str,
        help="Directory containing image sequence (auto-detected if not provided)",
    )
    parser.add_argument(
        "--arkit-dir",
        type=str,
        help="Directory containing ARKit data (auto-detected if not provided)",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="data/api_test_results.json",
        help="Output path for test results (default: data/api_test_results.json)",
    )
    parser.add_argument(
        "--timeout", type=int, default=300, help="Request timeout in seconds (default: 300)"
    )
    args = parser.parse_args()

    # Fill in whichever data directories weren't supplied on the CLI.
    sequence_dir, arkit_dir = args.sequence_dir, args.arkit_dir
    if not (sequence_dir and arkit_dir):
        detected_sequence, detected_arkit = find_test_data(project_root)
        sequence_dir = sequence_dir or detected_sequence
        arkit_dir = arkit_dir or detected_arkit

    tester = APITester(base_url=args.base_url, timeout=args.timeout)
    results = tester.run_all_tests(sequence_dir=sequence_dir, arkit_dir=arkit_dir)

    # Persist the full report (relative --output paths land under the repo root).
    tester.save_results(project_root / args.output)

    # Console summary of the in-process profiler, when it captured anything.
    if results.get("profiling", {}).get("local"):
        local_prof = results["profiling"]["local"]["local_profiler"]
        metrics = local_prof.get("metrics", {})
        latency = local_prof.get("latency_breakdown", {})

        print("\n" + "=" * 80)
        print("Profiling Summary")
        print("=" * 80)
        print(f"Total entries: {metrics.get('total_entries', 0)}")
        print(f"Stages tracked: {len(metrics.get('stage_stats', {}))}")
        print(f"Functions tracked: {len(metrics.get('function_stats', {}))}")

        if latency.get("breakdown"):
            print("\nLatency Breakdown:")

            def by_total_time(item):
                return item[1].get("total_time", 0)

            slowest = sorted(latency["breakdown"].items(), key=by_total_time, reverse=True)
            for stage, stats in slowest[:10]:
                print(
                    f" {stage:30s} {stats.get('total_time', 0):8.3f}s "
                    f"({stats.get('percentage', 0):5.1f}%) "
                    f"avg: {stats.get('avg_time', 0):.3f}s "
                    f"calls: {stats.get('call_count', 0)}"
                )

    return 1 if results.get("errors") else 0
|
|
|
|
|
|
|
|
# Script entry point: propagate main()'s exit code to the shell
# (0 on success, 1 when any endpoint test recorded an error).
if __name__ == "__main__":
    sys.exit(main())
|
|
|