mlforge / api /routes /datasets.py
senthil2421
fix: remove project dependencies and redundant imports to fix server startup
8302f42
"""
api/routes/datasets.py — Dataset Discovery REST API (Cloud Registry Version).
"""
from __future__ import annotations
from typing import Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException, Query
from adapters.roboflow_adapter import RoboflowAdapter
from datasets import registry as ds_reg
from models.dataset import (
Dataset, DatasetSummary, RoboflowSearchRequest,
)
from models.analytics import DatasetAnalytics, SplitAnalytics, QualityIssues, ClassDistributionItem
from observability.logger import audit, get_logger
log = get_logger("datasets_route")
router = APIRouter(prefix="/datasets", tags=["datasets"])
# ── Analytics ─────────────────────────────────────────────────────────────────
@router.get("/{dataset_id}/analytics", response_model=DatasetAnalytics)
async def get_dataset_analytics(dataset_id: str):
    """
    Build the analytics payload for one dataset in the registry.

    The dataset is looked up by ``dataset_id``; its stored stats are then
    mapped onto a ``DatasetAnalytics`` response.

    Raises:
        HTTPException: 404 when the registry returns nothing for the id.
    """
    record = await ds_reg.get_dataset(dataset_id)
    if not record:
        raise HTTPException(404, f"Dataset {dataset_id!r} not found")

    # NOTE(review): stats is dereferenced unconditionally below, while
    # _to_summary in this module tolerates a missing/empty stats object.
    # Assumes every dataset reaching this route carries stats -- TODO confirm.
    stats = record.stats
    health = stats.health_score

    # When no split totals are recorded, fall back to a 70/20/10 split.
    if stats.split.total > 0:
        split = SplitAnalytics(
            train=stats.split.train,
            val=stats.split.val,
            test=stats.split.test,
        )
    else:
        split = SplitAnalytics(train=70, val=20, test=10)

    issues = QualityIssues(
        missingLabels=stats.missing_labels,
        emptyImages=stats.empty_images,
        duplicates=stats.duplicate_count,
        # Placeholder figure: 0.5% of the image count, not a measured value.
        outliers=int(record.images * 0.005),
    )

    # Only the first 20 class names are reported. Each gets the same count
    # (images divided evenly across classes), i.e. a uniform estimate.
    distribution = [
        ClassDistributionItem(
            name=class_name,
            count=int(record.images / record.classes) if record.classes > 0 else 0,
        )
        for class_name in record.class_names[:20]
    ]

    return DatasetAnalytics(
        dataset_id=dataset_id,
        healthScore=health,
        split=split,
        qualityIssues=issues,
        classDistribution=distribution,
    )
# ── List / Search datasets ────────────────────────────────────────────────────
@router.get("", response_model=list[DatasetSummary])
async def list_datasets(
    task: Optional[str] = Query(None),
    format: Optional[str] = Query(None),
    source: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    starred: Optional[bool] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """
    List datasets from the registry as summaries.

    Every query parameter is forwarded unchanged to
    ``ds_reg.get_all_datasets``; each returned dataset is converted with
    ``_to_summary``.

    Raises:
        HTTPException: 500 for any exception raised while querying or
            converting; the exception text is used as the detail.
    """
    # Collected once so the registry call stays a single readable line.
    filters = {
        "task": task,
        "format": format,
        "source": source,
        "status": status,
        "search": search,
        "starred": starred,
        "limit": limit,
        "offset": offset,
    }
    try:
        rows = await ds_reg.get_all_datasets(**filters)
        summaries = []
        for row in rows:
            summaries.append(_to_summary(row))
        return summaries
    except Exception as exc:
        # log.exception records the traceback before the error is surfaced.
        log.exception("list_datasets_error")
        raise HTTPException(status_code=500, detail=str(exc))
# ── Roboflow Search & Sync ────────────────────────────────────────────────────
@router.post("/search/roboflow", response_model=list[DatasetSummary])
async def search_roboflow(req: RoboflowSearchRequest):
    """
    Run a live Roboflow Universe search and record the hits in the registry.

    The request fields are passed to ``RoboflowAdapter.search_datasets``; the
    returned datasets are bulk-upserted, an audit event is emitted, and the
    datasets are returned as summaries.

    NOTE(review): earlier documentation stated results are cached for one
    hour; no caching is visible in this handler -- presumably it lives in the
    adapter, confirm.

    Raises:
        HTTPException: 502 when the adapter call raises.
    """
    try:
        search_args = {
            "api_key": req.api_key,
            "query": req.query,
            "workspace": req.workspace,
            "page": req.page,
            "page_size": req.page_size,
        }
        found = await RoboflowAdapter.search_datasets(**search_args)
    except Exception as exc:
        log.error("roboflow_search_error", error=str(exc))
        raise HTTPException(502, f"Roboflow API error: {exc}")

    # Persist the hits in the registry DB before responding.
    await ds_reg.bulk_upsert_datasets(found)
    audit_payload = {"query": req.query, "count": len(found)}
    await audit("roboflow_search", audit_payload)
    return [_to_summary(item) for item in found]
@router.post("/sync/roboflow", response_model=dict)
async def sync_roboflow_workspace(
    api_key: str = Query(..., description="Roboflow API key"),
    workspace: str = Query(..., description="Workspace slug"),
):
    """
    Sync all datasets from a Roboflow workspace into the global registry.

    Args:
        api_key: Roboflow API key, passed through to the adapter.
        workspace: Slug of the workspace whose datasets are listed.

    Returns:
        A dict with ``"synced"`` (the value returned by
        ``ds_reg.bulk_upsert_datasets``) and ``"workspace"`` (echoed back).

    Raises:
        HTTPException: 502 when the adapter call raises.
    """
    try:
        datasets = await RoboflowAdapter.list_workspace_datasets(api_key, workspace)
    except Exception as exc:
        # Log the upstream failure before translating it, so it is not lost
        # once the error is reduced to an HTTP 502 response.
        log.error("roboflow_sync_error", error=str(exc))
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(502, f"Roboflow API error: {exc}") from exc

    count = await ds_reg.bulk_upsert_datasets(datasets)
    return {"synced": count, "workspace": workspace}
# ── Dataset detail ────────────────────────────────────────────────────────────
@router.get("/{dataset_id}", response_model=Dataset)
async def get_dataset(dataset_id: str):
    """
    Return the full registry record for ``dataset_id``.

    Raises:
        HTTPException: 404 when the registry returns nothing for the id.
    """
    record = await ds_reg.get_dataset(dataset_id)
    if record:
        return record
    raise HTTPException(404, f"Dataset {dataset_id!r} not found")
# ── Star ─────────────────────────────────────────────────────────────
@router.post("/{dataset_id}/star", response_model=dict)
async def toggle_star(dataset_id: str):
    """
    Toggle the starred flag of a dataset through the registry.

    Returns a dict echoing ``dataset_id`` together with whatever
    ``ds_reg.toggle_starred`` returned under the ``"starred"`` key.
    """
    # NOTE(review): no not-found handling here; behavior for an unknown id
    # depends entirely on ds_reg.toggle_starred -- confirm.
    starred_now = await ds_reg.toggle_starred(dataset_id)
    response = {"dataset_id": dataset_id, "starred": starred_now}
    return response
# ── Helper ────────────────────────────────────────────────────────────────────
def _to_summary(d: Dataset) -> DatasetSummary:
    """
    Convert a full ``Dataset`` record into its ``DatasetSummary`` form.

    Args:
        d: The dataset record to summarise.

    Returns:
        A ``DatasetSummary`` with the identifying fields copied over, the
        ``task``/``format``/``source``/``status`` fields converted with
        ``str()``, and ``health_score`` taken from ``d.stats`` when present.
    """
    # Health score is best-effort: a missing or falsy stats object, or a
    # stats object without a health_score attribute, yields 0.0. The getattr
    # defaults cover those cases explicitly, so no blanket exception
    # swallowing is needed.
    stats = getattr(d, "stats", None)
    health_score = getattr(stats, "health_score", 0.0) if stats else 0.0

    return DatasetSummary(
        id=d.id,
        name=d.name,
        task=str(d.task),
        format=str(d.format),
        source=str(d.source),
        status=str(d.status),
        images=d.images,
        classes=d.classes,
        size_label=d.size_label,
        tags=d.tags,
        starred=d.starred,
        import_progress=d.import_progress,
        health_score=health_score,
        created_at=d.created_at,
        updated_at=d.updated_at,
    )