"""
api/routes/datasets.py — Dataset Discovery REST API (Cloud Registry Version).

Exposes the ``/datasets`` router: listing/searching registry datasets,
Roboflow search and workspace sync, per-dataset detail, analytics and the
"starred" toggle.
"""
from __future__ import annotations

from typing import Optional
from datetime import datetime

from fastapi import APIRouter, HTTPException, Query

from adapters.roboflow_adapter import RoboflowAdapter
from datasets import registry as ds_reg
from models.dataset import (
    Dataset,
    DatasetSummary,
    RoboflowSearchRequest,
)
from models.analytics import DatasetAnalytics, SplitAnalytics, QualityIssues, ClassDistributionItem
from observability.logger import audit, get_logger

log = get_logger("datasets_route")
router = APIRouter(prefix="/datasets", tags=["datasets"])


# ── Analytics ─────────────────────────────────────────────────────────────────

@router.get("/{dataset_id}/analytics", response_model=DatasetAnalytics)
async def get_dataset_analytics(dataset_id: str) -> DatasetAnalytics:
    """
    Fetch comprehensive analytics for a dataset.

    Args:
        dataset_id: Registry identifier of the dataset.

    Returns:
        A ``DatasetAnalytics`` built from the dataset's stored stats.

    Raises:
        HTTPException: 404 when the dataset does not exist or has no stats
            attached.
    """
    ds = await ds_reg.get_dataset(dataset_id)
    if not ds:
        raise HTTPException(404, f"Dataset {dataset_id!r} not found")

    stats = ds.stats
    if not stats:
        # _to_summary treats stats as optional; without this guard a dataset
        # lacking stats would surface as an AttributeError (HTTP 500).
        raise HTTPException(404, f"Dataset {dataset_id!r} has no analytics available")

    split = stats.split
    # Fall back to fixed 70/20/10 values when no split has been recorded.
    has_split = split.total > 0

    # Placeholder distribution: every class is assigned the same count
    # (images divided evenly across classes), not a measured per-class figure.
    per_class = int(ds.images / ds.classes) if ds.classes > 0 else 0

    return DatasetAnalytics(
        dataset_id=dataset_id,
        healthScore=stats.health_score,
        split=SplitAnalytics(
            train=split.train if has_split else 70,
            val=split.val if has_split else 20,
            test=split.test if has_split else 10,
        ),
        qualityIssues=QualityIssues(
            missingLabels=stats.missing_labels,
            emptyImages=stats.empty_images,
            duplicates=stats.duplicate_count,
            outliers=int(ds.images * 0.005),  # placeholder
        ),
        # Only the first 20 class names are reported.
        classDistribution=[
            ClassDistributionItem(name=name, count=per_class)
            for name in ds.class_names[:20]
        ],
    )


# ── List / Search datasets ────────────────────────────────────────────────────

@router.get("", response_model=list[DatasetSummary])
async def list_datasets(
    task: Optional[str] = Query(None),
    format: Optional[str] = Query(None),
    source: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    starred: Optional[bool] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
) -> list[DatasetSummary]:
    """
    List registry datasets as summaries, with optional filters and paging.

    All filter arguments are passed through unchanged to
    ``ds_reg.get_all_datasets``; ``limit`` is constrained to 1..1000 and
    ``offset`` to >= 0 by the query validators.

    Raises:
        HTTPException: 500 when the registry lookup or the summary conversion
            fails.
    """
    try:
        datasets = await ds_reg.get_all_datasets(
            task=task,
            format=format,
            source=source,
            status=status,
            search=search,
            starred=starred,
            limit=limit,
            offset=offset,
        )
        return [_to_summary(d) for d in datasets]
    except Exception as exc:
        log.exception("list_datasets_error")
        # NOTE(review): the raw exception text is returned to the client;
        # consider a generic message if internals must not be exposed.
        raise HTTPException(status_code=500, detail=str(exc)) from exc


# ── Roboflow Search & Sync ────────────────────────────────────────────────────

@router.post("/search/roboflow", response_model=list[DatasetSummary])
async def search_roboflow(req: RoboflowSearchRequest) -> list[DatasetSummary]:
    """
    Live search Roboflow Universe and upsert the hits into the registry.

    NOTE(review): an earlier version of this docstring said results are
    cached for 1 hour; no caching happens in this handler — presumably it
    lives in the adapter. Confirm before relying on it.

    Raises:
        HTTPException: 502 when the Roboflow adapter call fails.
    """
    try:
        datasets = await RoboflowAdapter.search_datasets(
            api_key=req.api_key,
            query=req.query,
            workspace=req.workspace,
            page=req.page,
            page_size=req.page_size,
        )
    except Exception as exc:
        log.error("roboflow_search_error", error=str(exc))
        raise HTTPException(502, f"Roboflow API error: {exc}") from exc

    # Upsert to registry DB
    await ds_reg.bulk_upsert_datasets(datasets)
    await audit("roboflow_search", {"query": req.query, "count": len(datasets)})
    return [_to_summary(d) for d in datasets]


@router.post("/sync/roboflow", response_model=dict)
async def sync_roboflow_workspace(
    api_key: str = Query(..., description="Roboflow API key"),
    workspace: str = Query(..., description="Workspace slug"),
) -> dict:
    """
    Sync all datasets from a Roboflow workspace into the global registry.

    Returns:
        ``{"synced": <value returned by bulk_upsert_datasets>, "workspace": <slug>}``.

    Raises:
        HTTPException: 502 when the Roboflow adapter call fails.
    """
    # NOTE(review): api_key arrives as a query parameter, so it is part of the
    # request URL; moving it to a header/body would be an interface change.
    try:
        datasets = await RoboflowAdapter.list_workspace_datasets(api_key, workspace)
    except Exception as exc:
        # Log upstream failures the same way search_roboflow does.
        log.error("roboflow_sync_error", error=str(exc))
        raise HTTPException(502, f"Roboflow API error: {exc}") from exc

    count = await ds_reg.bulk_upsert_datasets(datasets)
    return {"synced": count, "workspace": workspace}


# ── Dataset detail ────────────────────────────────────────────────────────────

@router.get("/{dataset_id}", response_model=Dataset)
async def get_dataset(dataset_id: str) -> Dataset:
    """
    Return the full registry record for one dataset.

    Raises:
        HTTPException: 404 when the registry has no dataset with this id.
    """
    ds = await ds_reg.get_dataset(dataset_id)
    if not ds:
        raise HTTPException(404, f"Dataset {dataset_id!r} not found")
    return ds


# ── Star ─────────────────────────────────────────────────────────────

@router.post("/{dataset_id}/star", response_model=dict)
async def toggle_star(dataset_id: str) -> dict:
    """
    Flip the "starred" flag of a dataset.

    Returns:
        ``{"dataset_id": <id>, "starred": <value returned by toggle_starred>}``.

    Raises:
        HTTPException: 404 when the registry has no dataset with this id.
    """
    # Same existence check as the other {dataset_id} routes, so an unknown id
    # yields a 404 rather than whatever toggle_starred does for a missing row.
    if not await ds_reg.get_dataset(dataset_id):
        raise HTTPException(404, f"Dataset {dataset_id!r} not found")

    new_val = await ds_reg.toggle_starred(dataset_id)
    return {"dataset_id": dataset_id, "starred": new_val}


# ── Helper ────────────────────────────────────────────────────────────────────

def _to_summary(d: Dataset) -> DatasetSummary:
    """
    Project a full ``Dataset`` onto the lighter ``DatasetSummary`` model.

    ``health_score`` falls back to 0.0 when the dataset has no (or empty)
    ``stats`` or the stats object carries no ``health_score`` attribute.
    Enum-like fields (task, format, source, status) are passed through
    ``str()``.
    """
    # Explicit fallbacks instead of a blanket try/except, so unrelated errors
    # are not silently hidden.
    stats = getattr(d, "stats", None)
    health_score = getattr(stats, "health_score", 0.0) if stats else 0.0

    return DatasetSummary(
        id=d.id,
        name=d.name,
        task=str(d.task),
        format=str(d.format),
        source=str(d.source),
        status=str(d.status),
        images=d.images,
        classes=d.classes,
        size_label=d.size_label,
        tags=d.tags,
        starred=d.starred,
        import_progress=d.import_progress,
        health_score=health_score,
        created_at=d.created_at,
        updated_at=d.updated_at,
    )