Spaces:
Sleeping
Sleeping
File size: 8,524 Bytes
ac5551d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 | """
models/benchmark.py — Pydantic domain models for the Benchmark Bridge System.
Single source of truth for all benchmark-related data shapes across API,
execution engine, and database layer.
"""
from __future__ import annotations
import json
from typing import Any
from pydantic import BaseModel, Field, ConfigDict
# ── Input ─────────────────────────────────────────────────────────────────────
class BenchmarkContext(BaseModel):
    """Request payload sent by the UI to start a benchmark run."""

    # An empty protected_namespaces tuple lets the ``model_id`` field coexist
    # with pydantic's reserved ``model_`` prefix without a warning.
    model_config = ConfigDict(protected_namespaces=())

    # Required identifiers for the run.
    model_id: str
    dataset_id: str
    task: str
    framework: str

    # Execution settings with defaults.
    hardware: str = "cpu"
    precision: str = "FP32"
    batch_size: int = Field(default=1, ge=1, le=512)  # validated to 1..512

    # Task-specific overrides
    max_tokens: int | None = 512
    sequence_length: int | None = 512
    img_size: int | None = 640
    vid_stride: int | None = 1
    stream: bool | None = False
    input_source: str | None = "dataset"
    video_path: str | None = None
    rtsp_url: str | None = None

    # Object Detection live preview data
    detections: list[dict[str, Any]] = Field(default_factory=list)
# ── Validation ────────────────────────────────────────────────────────────────
class ValidationCheck(BaseModel):
    """Outcome of one compatibility gate."""

    name: str
    passed: bool
    detail: str
    # Optional; defaults to None when not supplied.
    suggestion: str | None = None
class ValidationReport(BaseModel):
    """Combined outcome of every compatibility check for a model+dataset pair."""

    # An empty protected_namespaces tuple lets the ``model_id`` field coexist
    # with pydantic's reserved ``model_`` prefix without a warning.
    model_config = ConfigDict(protected_namespaces=())

    model_id: str
    dataset_id: str
    passed: bool  # True only if ALL checks pass
    checks: list[ValidationCheck]
    errors: list[str]  # details from failed checks
    warnings: list[str] = Field(default_factory=list)
# ── Metrics ───────────────────────────────────────────────────────────────────
class BenchmarkMetrics(BaseModel):
    """Task-specific + hardware performance metrics from a completed run.

    Every declared metric is optional and defaults to None. Keys that are not
    declared below are accepted and kept on the instance (``extra="allow"``).
    """

    # Declared via ConfigDict (not the deprecated nested ``class Config``) so
    # this model is configured the same way as the other models in this file.
    model_config = ConfigDict(extra="allow")

    # Detection / Segmentation
    mAP: float | None = None
    mAP_50: float | None = None
    mAP_50_95: float | None = None

    # Classification
    accuracy: float | None = None
    top1: float | None = None
    top5: float | None = None

    # Segmentation
    iou_mean: float | None = None

    # NLP / Generation
    rouge_l: float | None = None
    bleu: float | None = None
    perplexity: float | None = None
    tokens_per_sec: float | None = None

    # Throughput & Latency
    fps: float | None = None
    latency_mean_ms: float | None = None
    latency_p95_ms: float | None = None
    latency_p99_ms: float | None = None

    # Memory
    vram_peak_gb: float | None = None
    vram_avg_gb: float | None = None

    # Dataset info
    total_images: int | None = None
    total_tokens: int | None = None
    batch_size: int | None = None
# ── Telemetry ─────────────────────────────────────────────────────────────────
class TelemetrySample(BaseModel):
    """One hardware reading taken while a benchmark is executing."""

    timestamp: float  # Unix epoch seconds

    # Hardware readings; each defaults to 0.0 when not supplied.
    gpu_util_pct: float = 0.0  # 0-100
    vram_used_gb: float = 0.0
    vram_total_gb: float = 0.0
    temp_c: float = 0.0
    power_w: float = 0.0

    # Position within the run.
    batch_idx: int = 0
    progress: float = 0.0  # 0.0-1.0

    # Optional task-specific live data (e.g. BBoxes for detection)
    live_data: dict[str, Any] = Field(default_factory=dict)
    detections: list[dict[str, Any]] = Field(default_factory=list)
class LayerBreakdown(BaseModel):
    """One layer's entry in a bottleneck analysis."""

    name: str
    time_ms: float
    percent: float
class TelemetrySummary(BaseModel):
    """Telemetry statistics aggregated across an entire benchmark run.

    Every numeric field defaults to 0.0 and ``layer_breakdown`` to an empty
    list, so the model can be built with no arguments.
    """

    # GPU utilisation
    gpu_util_avg: float = 0.0
    gpu_util_peak: float = 0.0

    # VRAM
    vram_avg_gb: float = 0.0
    vram_peak_gb: float = 0.0

    # Temperature
    temp_avg_c: float = 0.0
    temp_peak_c: float = 0.0

    # Power
    power_avg_w: float = 0.0
    power_peak_w: float = 0.0

    # Per-layer entries; a fresh list is created for each instance.
    layer_breakdown: list[LayerBreakdown] = Field(default_factory=list)
# ── Job & Result ──────────────────────────────────────────────────────────────
class BenchmarkJob(BaseModel):
    """A benchmark job: its run parameters, lifecycle state, and logs."""

    # An empty protected_namespaces tuple lets the ``model_id`` field coexist
    # with pydantic's reserved ``model_`` prefix without a warning.
    model_config = ConfigDict(protected_namespaces=())

    id: str

    # Run parameters (all required).
    model_id: str
    dataset_id: str
    task: str
    framework: str
    hardware: str
    precision: str
    batch_size: int
    config: dict[str, Any] = Field(default_factory=dict)

    # Lifecycle state.
    status: str = "queued"  # queued|running|completed|failed
    progress: float = 0.0
    logs: list[str] = Field(default_factory=list)
    # Declared explicitly so an ``error=`` value supplied by callers is kept;
    # without this field pydantic's default extra handling silently drops it.
    error: str | None = None

    # Timestamps are stored as strings; all are optional.
    created_at: str | None = None
    updated_at: str | None = None
    started_at: str | None = None
    ended_at: str | None = None

    # Optional telemetry sample attached to the job; None when absent.
    last_telemetry: TelemetrySample | None = None
class BenchmarkResult(BaseModel):
    """Stored result of a benchmark: metrics plus a telemetry summary."""

    # An empty protected_namespaces tuple lets the ``model_id`` field coexist
    # with pydantic's reserved ``model_`` prefix without a warning.
    model_config = ConfigDict(protected_namespaces=())

    id: str
    job_id: str
    metrics: BenchmarkMetrics
    telemetry_summary: TelemetrySummary
    created_at: str | None = None

    # Denormalized from Job for UI efficiency
    model_id: str | None = None
    dataset_id: str | None = None
    task: str | None = None
    framework: str | None = None
    hardware: str | None = None
    precision: str | None = None
# ── API Responses ─────────────────────────────────────────────────────────────
class BenchmarkRunResponse(BaseModel):
    """API response body carrying a job id, a status, and a message."""

    job_id: str
    status: str
    message: str
# ── DB Row helpers ────────────────────────────────────────────────────────────
def row_to_job(row: Any) -> BenchmarkJob:
    """Build a BenchmarkJob from a database row.

    Args:
        row: Any object accepted by ``dict(...)`` that yields the job's
            columns. ``config``, ``logs`` and ``last_telemetry`` are decoded
            with ``json.loads`` when non-empty.

    Returns:
        The populated BenchmarkJob.

    Raises:
        KeyError: If one of the required columns (id, model_id, dataset_id,
            task, framework, hardware, precision, batch_size, status) is
            missing.
        json.JSONDecodeError: If a non-empty JSON column is not valid JSON.
    """
    d = dict(row)

    # An empty/NULL column and a stored JSON ``null`` both fall back to the
    # empty container, so model validation does not receive None.
    cfg = json.loads(d.get("config") or "{}") or {}
    logs = json.loads(d.get("logs") or "[]") or []

    # Decode telemetry once. An empty column, JSON ``null`` or ``{}`` yields
    # no sample instead of failing inside TelemetrySample.
    telemetry_raw = d.get("last_telemetry")
    telemetry_fields = json.loads(telemetry_raw) if telemetry_raw else None

    return BenchmarkJob(
        id=d["id"],
        model_id=d["model_id"],
        dataset_id=d["dataset_id"],
        task=d["task"],
        framework=d["framework"],
        hardware=d["hardware"],
        precision=d["precision"],
        batch_size=d["batch_size"],
        config=cfg,
        status=d["status"],
        # ``or 0.0`` covers a column that is present but NULL; a plain
        # ``get`` default would pass None to float() and raise TypeError.
        progress=float(d.get("progress") or 0.0),
        logs=logs,
        # Forwarded as-is; it is only retained if BenchmarkJob declares an
        # ``error`` field.
        error=d.get("error"),
        created_at=d.get("created_at"),
        updated_at=d.get("updated_at"),
        started_at=d.get("started_at"),
        ended_at=d.get("ended_at"),
        last_telemetry=(
            TelemetrySample(**telemetry_fields) if telemetry_fields else None
        ),
    )
def row_to_result(row: Any) -> BenchmarkResult:
    """Build a BenchmarkResult from a database row.

    ``row`` is converted with ``dict(...)``. The ``metrics`` and
    ``telemetry_summary`` columns are decoded with ``json.loads`` (an empty
    or missing column is treated as ``{}``) and expanded into their models.
    ``id`` and ``job_id`` are required; the remaining columns are read with
    ``.get`` and may be None.
    """
    record = dict(row)

    # Decode both JSON columns before touching any other column.
    metrics_payload = json.loads(record.get("metrics") or "{}")
    summary_payload = json.loads(record.get("telemetry_summary") or "{}")

    # Required columns: a missing key raises KeyError here.
    result_id = record["id"]
    job_id = record["job_id"]

    metrics = BenchmarkMetrics(**metrics_payload)
    summary = TelemetrySummary(**summary_payload)

    # Optional columns are passed through unchanged (None when absent).
    optional_columns = (
        "created_at",
        "model_id",
        "dataset_id",
        "task",
        "framework",
        "hardware",
        "precision",
    )
    passthrough = {column: record.get(column) for column in optional_columns}

    return BenchmarkResult(
        id=result_id,
        job_id=job_id,
        metrics=metrics,
        telemetry_summary=summary,
        **passthrough,
    )
|