File size: 8,524 Bytes
ac5551d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""
models/benchmark.py — Pydantic domain models for the Benchmark Bridge System.
Single source of truth for all benchmark-related data shapes across API,
execution engine, and database layer.
"""
from __future__ import annotations

import json
from typing import Any

from pydantic import BaseModel, Field, ConfigDict


# ── Input ─────────────────────────────────────────────────────────────────────

class BenchmarkContext(BaseModel):
    """Request payload sent by the UI to start a benchmark run."""

    # ``model_id`` begins with pydantic's protected "model_" prefix; an empty
    # protected_namespaces tuple switches that namespace check off.
    model_config = ConfigDict(protected_namespaces=())

    # What to run and where.
    model_id: str
    dataset_id: str
    task: str
    framework: str
    hardware: str = "cpu"
    precision: str = "FP32"
    # Defaults to 1 and is validated to lie within 1..512 (inclusive).
    batch_size: int = Field(default=1, ge=1, le=512)

    # Task-specific overrides.
    max_tokens: int | None = 512
    sequence_length: int | None = 512
    img_size: int | None = 640
    vid_stride: int | None = 1
    stream: bool | None = False
    input_source: str | None = "dataset"
    video_path: str | None = None
    rtsp_url: str | None = None

    # Object-detection live preview data; each model instance gets its own list.
    detections: list[dict[str, Any]] = Field(default_factory=list)


# ── Validation ────────────────────────────────────────────────────────────────

class ValidationCheck(BaseModel):
    """Outcome of one compatibility gate."""

    name: str
    passed: bool
    detail: str
    # Optional hint attached to the check; absent (None) by default.
    suggestion: str | None = None


class ValidationReport(BaseModel):
    """Combined outcome of every compatibility check for a model+dataset pair."""

    # ``model_id`` begins with pydantic's protected "model_" prefix; an empty
    # protected_namespaces tuple switches that namespace check off.
    model_config = ConfigDict(protected_namespaces=())

    model_id: str
    dataset_id: str
    # True only if ALL checks pass.
    passed: bool
    checks: list[ValidationCheck]
    # Details from failed checks.
    errors: list[str]
    warnings: list[str] = Field(default_factory=list)


# ── Metrics ───────────────────────────────────────────────────────────────────

class BenchmarkMetrics(BaseModel):
    """Task-specific + hardware performance metrics from a completed run.

    Every declared metric is optional and defaults to ``None``, so a run only
    needs to populate the values relevant to its task. Keys that are not
    declared here are accepted and kept on the instance (``extra="allow"``).
    """

    # Pydantic v2 configuration, matching the ConfigDict style used by the
    # other models in this module (the inner ``class Config`` form is the
    # deprecated v1 spelling of the same setting).
    model_config = ConfigDict(extra="allow")

    # Detection / Segmentation
    mAP:             float | None = None
    mAP_50:          float | None = None
    mAP_50_95:       float | None = None
    # Classification
    accuracy:        float | None = None
    top1:            float | None = None
    top5:            float | None = None
    # Segmentation
    iou_mean:        float | None = None
    # NLP / Generation
    rouge_l:         float | None = None
    bleu:            float | None = None
    perplexity:      float | None = None
    tokens_per_sec:  float | None = None
    # Throughput & Latency
    fps:             float | None = None
    latency_mean_ms: float | None = None
    latency_p95_ms:  float | None = None
    latency_p99_ms:  float | None = None
    # Memory
    vram_peak_gb:    float | None = None
    vram_avg_gb:     float | None = None
    # Dataset info
    total_images:    int | None = None
    total_tokens:    int | None = None
    batch_size:      int | None = None


# ── Telemetry ─────────────────────────────────────────────────────────────────

class TelemetrySample(BaseModel):
    """One hardware reading taken while a benchmark is executing."""

    # Unix epoch seconds; the only required field.
    timestamp: float
    # 0–100
    gpu_util_pct: float = 0.0
    vram_used_gb: float = 0.0
    vram_total_gb: float = 0.0
    temp_c: float = 0.0
    power_w: float = 0.0
    batch_idx: int = 0
    # 0.0–1.0
    progress: float = 0.0

    # Optional task-specific live data (e.g. BBoxes for detection).
    live_data: dict[str, Any] = Field(default_factory=dict)
    detections: list[dict[str, Any]] = Field(default_factory=list)


class LayerBreakdown(BaseModel):
    """One layer's entry within a bottleneck analysis."""

    name: str
    time_ms: float
    percent: float


class TelemetrySummary(BaseModel):
    """Telemetry statistics aggregated across an entire benchmark run.

    Every numeric field defaults to 0.0, so an empty payload still yields a
    valid summary.
    """

    # GPU utilisation
    gpu_util_avg: float = 0.0
    gpu_util_peak: float = 0.0
    # VRAM
    vram_avg_gb: float = 0.0
    vram_peak_gb: float = 0.0
    # Temperature
    temp_avg_c: float = 0.0
    temp_peak_c: float = 0.0
    # Power
    power_avg_w: float = 0.0
    power_peak_w: float = 0.0
    # Per-layer entries; empty unless supplied.
    layer_breakdown: list[LayerBreakdown] = Field(default_factory=list)


# ── Job & Result ──────────────────────────────────────────────────────────────

class BenchmarkJob(BaseModel):
    """State of a single benchmark job: what to run plus its lifecycle data.

    Carries the run parameters (model, dataset, task, framework, hardware,
    precision, batch size, free-form config) together with status, progress,
    log lines, an optional error, timestamps and the latest telemetry sample.
    """

    # ``model_id`` begins with pydantic's protected "model_" prefix; an empty
    # protected_namespaces tuple switches that namespace check off.
    model_config = ConfigDict(protected_namespaces=())

    id:         str
    model_id:   str
    dataset_id: str
    task:       str
    framework:  str
    hardware:   str
    precision:  str
    batch_size: int
    config:     dict = Field(default_factory=dict)
    status:     str  = "queued"   # queued|running|completed|failed
    progress:   float = 0.0
    logs:       list[str] = Field(default_factory=list)
    # row_to_job forwards the row's "error" value; without a declared field
    # pydantic's default extra="ignore" silently discarded it.
    # NOTE(review): assumed to be text — confirm against the DB schema.
    error:      str | None = None
    created_at: str | None = None
    updated_at: str | None = None
    started_at: str | None = None
    ended_at:   str | None = None
    last_telemetry: TelemetrySample | None = None


class BenchmarkResult(BaseModel):
    """Stored result of a benchmark job: its metrics and telemetry summary."""

    # ``model_id`` begins with pydantic's protected "model_" prefix; an empty
    # protected_namespaces tuple switches that namespace check off.
    model_config = ConfigDict(protected_namespaces=())

    id: str
    job_id: str
    metrics: BenchmarkMetrics
    telemetry_summary: TelemetrySummary
    created_at: str | None = None

    # Denormalized from Job for UI efficiency.
    model_id: str | None = None
    dataset_id: str | None = None
    task: str | None = None
    framework: str | None = None
    hardware: str | None = None
    precision: str | None = None


# ── API Responses ─────────────────────────────────────────────────────────────

class BenchmarkRunResponse(BaseModel):
    """API response model carrying a job id, its status and a message."""

    job_id: str
    status: str
    message: str


# ── DB Row helpers ────────────────────────────────────────────────────────────

def row_to_job(row: Any) -> BenchmarkJob:
    """Convert a database row into a ``BenchmarkJob``.

    Args:
        row: Anything ``dict()`` accepts (a mapping or mapping-like DB row).
            The ``config``, ``logs`` and ``last_telemetry`` values are
            expected to be JSON text, or empty/``None``.

    Returns:
        The populated ``BenchmarkJob``. An empty/missing ``config`` becomes
        ``{}``, empty/missing ``logs`` becomes ``[]``, an empty/missing
        ``last_telemetry`` becomes ``None``, and an empty/missing/NULL
        ``progress`` becomes ``0.0``.

    Raises:
        KeyError: If a required column (``id``, ``model_id``, ``dataset_id``,
            ``task``, ``framework``, ``hardware``, ``precision``,
            ``batch_size``, ``status``) is absent.
        json.JSONDecodeError: If a JSON column holds malformed text.
    """
    d = dict(row)

    # Read the telemetry column once; only parse it when it is non-empty.
    telemetry_json = d.get("last_telemetry")
    last_telemetry = (
        TelemetrySample(**json.loads(telemetry_json)) if telemetry_json else None
    )

    return BenchmarkJob(
        id=d["id"],
        model_id=d["model_id"],
        dataset_id=d["dataset_id"],
        task=d["task"],
        framework=d["framework"],
        hardware=d["hardware"],
        precision=d["precision"],
        batch_size=d["batch_size"],
        config=json.loads(d.get("config") or "{}"),
        status=d["status"],
        # `or 0.0` covers a column that is present but NULL; float(None)
        # would otherwise raise TypeError.
        progress=float(d.get("progress") or 0.0),
        logs=json.loads(d.get("logs") or "[]"),
        # Forwarded as-is; whether it is kept depends on BenchmarkJob
        # declaring (or allowing) an `error` field.
        error=d.get("error"),
        created_at=d.get("created_at"),
        updated_at=d.get("updated_at"),
        started_at=d.get("started_at"),
        ended_at=d.get("ended_at"),
        last_telemetry=last_telemetry,
    )


def row_to_result(row: Any) -> BenchmarkResult:
    """Convert a database row into a ``BenchmarkResult``.

    The ``metrics`` and ``telemetry_summary`` columns are decoded from JSON
    (an empty or missing value is treated as ``{}``) and wrapped in their
    models; ``id`` and ``job_id`` are required, and the remaining columns are
    copied when present and left as ``None`` otherwise.
    """
    record = dict(row)

    # Decode both JSON columns up front, in the same order as they are used.
    metrics_payload, summary_payload = (
        json.loads(record.get(column) or "{}")
        for column in ("metrics", "telemetry_summary")
    )

    fields = {"id": record["id"], "job_id": record["job_id"]}
    fields["metrics"] = BenchmarkMetrics(**metrics_payload)
    fields["telemetry_summary"] = TelemetrySummary(**summary_payload)

    # Optional columns: the timestamp plus the values denormalized from the job.
    for column in (
        "created_at",
        "model_id",
        "dataset_id",
        "task",
        "framework",
        "hardware",
        "precision",
    ):
        fields[column] = record.get(column)

    return BenchmarkResult(**fields)