offrails / app /schemas.py
Omkar Sreekanth
Add FastAPI backend wrapping ML inference pipeline
6ea5be4
"""
Pydantic schemas for the Agent Trace Anomaly Detection API.
Matches the interface defined in scripts/inference.py:
TraceAnomalyDetector.predict() returns:
is_anomalous: bool
confidence: float
label: int (0=normal, 1=anomalous)
anomaly_signals: list[str]
features: dict (xgboost only)
"""
from pydantic import BaseModel, Field
from typing import Optional
# ── Request Schemas ──────────────────────────────────────────────────────────
class TraceMessage(BaseModel):
    """
    A single message in an agent execution trace.
    Accepts both ToolBench/ShareGPT format ('from'/'value')
    and OpenAI format ('role'/'content').
    """

    # The role is populated from the aliased key 'from'; because
    # populate_by_name is enabled below, the field name 'role' is accepted too.
    role: Optional[str] = Field(None, alias="from", description="Message role (OpenAI format)")
    value: Optional[str] = Field(None, description="Message content (ShareGPT format)")
    content: Optional[str] = Field(None, description="Message content (OpenAI format)")

    model_config = {"populate_by_name": True}

    def to_dict(self) -> dict[str, str]:
        """Normalize the message to 'from'/'value' keys (the format inference.py expects).

        Returns:
            A dict with 'from' set when a role was supplied and 'value' set
            when any message text was supplied. Fields left as None are
            omitted, so the result may be empty.
        """
        normalized = {}
        if self.role is not None:
            normalized["from"] = self.role

        # map content -> value for ToolBench compat; when both 'content' and
        # 'value' are given, 'content' takes precedence.
        text = self.content if self.content is not None else self.value
        if text is not None:
            normalized["value"] = text
        return normalized
# Sample trace attached to the PredictRequest JSON schema as an example payload.
_PREDICT_REQUEST_EXAMPLE = {
    "conversations": [
        {"from": "user", "value": "Find me flights from NYC to London"},
        {"from": "assistant", "value": "I'll search for flights using the travel API."},
        {"from": "function", "value": '{"flights": [{"price": 450}]}'},
        {"from": "assistant", "value": "I found flights starting at $450."},
    ]
}


class PredictRequest(BaseModel):
    """Request body for single-trace anomaly prediction."""

    # One trace as an ordered list of raw message dicts. Entries are typed as
    # plain dict, so their keys are not validated by this schema.
    conversations: list[dict] = Field(
        default=...,
        description="List of message dicts in ShareGPT/ToolBench format. "
        "Each dict should have 'from' (role) and 'value' (content) keys.",
    )

    model_config = dict(
        json_schema_extra=dict(examples=[_PREDICT_REQUEST_EXAMPLE]),
    )
class PredictBatchRequest(BaseModel):
    """Request body for batch prediction on multiple traces."""

    # Outer list: one entry per trace. Inner list: that trace's message dicts.
    traces: list[list[dict]] = Field(
        default=...,
        description="List of traces, each trace is a list of message dicts",
    )
class ModelLoadRequest(BaseModel):
    """Request to load a specific model type."""

    # 'model_type' and 'model_dir' start with 'model_', which pydantic v2
    # treats as a protected namespace on releases where that is the default
    # and reports with a UserWarning at class creation. Clearing the protected
    # namespaces silences the warning without renaming the public fields.
    model_config = {"protected_namespaces": ()}

    # Restricted by the regex below to exactly 'xgboost' or 'distilbert'.
    model_type: str = Field(
        "xgboost",
        description="Model to load: 'xgboost' or 'distilbert'",
        pattern="^(xgboost|distilbert)$",
    )
    model_dir: str = Field("models", description="Path to saved models directory")
# ── Response Schemas ─────────────────────────────────────────────────────────
class PredictResponse(BaseModel):
    """Response from the anomaly detector — mirrors TraceAnomalyDetector.predict() output."""

    # 'model_used' starts with 'model_', which pydantic v2 treats as a
    # protected namespace on releases where that is the default and reports
    # with a UserWarning at class creation. Clearing the protected namespaces
    # silences the warning without renaming the public field.
    model_config = {"protected_namespaces": ()}

    is_anomalous: bool = Field(..., description="True if the trace is predicted anomalous")
    # Validated to lie within [0.0, 1.0].
    confidence: float = Field(..., ge=0.0, le=1.0, description="Probability of anomaly")
    label: int = Field(..., description="0 = normal, 1 = anomalous")
    # default_factory gives each instance its own empty list when omitted.
    anomaly_signals: list[str] = Field(
        default_factory=list,
        description="Human-readable explanations of detected anomaly patterns",
    )
    model_used: str = Field(..., description="Which model produced this prediction")
    features: Optional[dict] = Field(
        None, description="Extracted feature values (xgboost only)"
    )
class PredictBatchResponse(BaseModel):
    """Response for batch predictions."""

    # All three fields are required; none carries a default or extra metadata.
    predictions: list[PredictResponse] = Field(default=...)
    anomaly_count: int = Field(default=...)
    total: int = Field(default=...)
class CompareResponse(BaseModel):
    """Side-by-side prediction from both models on the same trace."""

    # Either per-model result may be absent; both default to None.
    xgboost: Optional[PredictResponse] = Field(default=None)
    distilbert: Optional[PredictResponse] = Field(default=None)
    # Required flag supplied by the caller; this schema does not compute it.
    agreement: bool = Field(
        default=...,
        description="Whether both models agree on the label",
    )
class HealthResponse(BaseModel):
    """Service health report: status, loaded model, and available models."""

    # 'model_dir' starts with 'model_', which pydantic v2 treats as a
    # protected namespace on releases where that is the default and reports
    # with a UserWarning at class creation. Clearing the protected namespaces
    # silences the warning without renaming the public field.
    model_config = {"protected_namespaces": ()}

    status: str
    # None when no model name is reported as loaded.
    loaded_model: Optional[str] = None
    available_models: list[str]
    model_dir: str