Spaces:
Running
Running
| import os | |
| import shutil | |
| from pathlib import Path | |
| from fastapi import APIRouter, HTTPException, Query, UploadFile, File | |
| from fastapi.responses import Response | |
| import openslide | |
| from app.models.schemas import SlideLoadRequest, SlideMetadata | |
| from app.services.slide_cache import get_slide, load_slide | |
| from app.services.tile_service import get_tile_jpeg, get_thumbnail_jpeg | |
# Router object exposed by this module.
router = APIRouter()

# Uploaded slide files are stored in the directory named by the
# TILE_SERVER_UPLOAD_DIR environment variable. When it is not set, fall back
# to "data/uploads" under ``parents[2]`` of this file's resolved path.
_fallback_upload_dir = Path(__file__).resolve().parents[2] / "data" / "uploads"
UPLOAD_DIR = Path(os.environ.get("TILE_SERVER_UPLOAD_DIR", _fallback_upload_dir))

# Create the directory (and any missing parents) at import time so later
# writes into it do not fail on a missing folder.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
def load(slide_id: str, payload: SlideLoadRequest):
    """Load a slide through ``load_slide`` using the path in the request body.

    Args:
        slide_id: Identifier handed to ``load_slide`` and echoed in the reply.
        payload: Request body; only its ``path`` attribute is read here.

    Returns:
        ``{"status": "loaded", "slide_id": slide_id}`` when loading succeeds.

    Raises:
        HTTPException: 400 carrying the message of whatever exception
            ``load_slide`` raised.
    """
    try:
        load_slide(slide_id, payload.path)
    except Exception as err:
        # Any loader failure is reported to the client as a bad request.
        raise HTTPException(status_code=400, detail=str(err))
    else:
        return {"status": "loaded", "slide_id": slide_id}
def upload(slide_id: str, file: UploadFile = File(...)):
    """Save an uploaded slide file into ``UPLOAD_DIR`` and load it.

    The file is stored as ``<slide_id><suffix>``. The suffix is taken from the
    client-supplied file name when it is ``.svs``, ``.tif`` or ``.tiff``
    (compared case-insensitively); any other suffix is replaced by ``.svs``.

    Args:
        slide_id: Identifier used as the stored file's base name and passed
            to ``load_slide``. Must be a single path component.
        file: The uploaded file; its underlying stream is always closed
            before this function returns or raises.

    Returns:
        ``{"status": "uploaded", "slide_id": slide_id}`` on success.

    Raises:
        HTTPException: 400 when ``slide_id`` is empty or contains directory
            components, 500 when writing the file fails, and 400 when
            ``load_slide`` raises for the stored file.
    """
    if not slide_id:
        raise HTTPException(status_code=400, detail="Slide id is required")
    # slide_id is client input that becomes part of a file name. Refuse ids
    # with directory components (separators, absolute paths, drive prefixes)
    # so the write below cannot land outside UPLOAD_DIR.
    if Path(slide_id).name != slide_id:
        raise HTTPException(status_code=400, detail="Invalid slide id")

    filename = Path(file.filename or "")
    suffix = filename.suffix.lower()
    if suffix not in {".svs", ".tif", ".tiff"}:
        suffix = ".svs"
    dest_path = UPLOAD_DIR / f"{slide_id}{suffix}"

    # Tracks whether *we* opened (and therefore truncated) the destination,
    # so a failed open never deletes a file that was left untouched.
    opened = False
    try:
        with dest_path.open("wb") as buffer:
            opened = True
            shutil.copyfileobj(file.file, buffer)
    except Exception as exc:
        if opened:
            # The copy died midway: drop the truncated file so that
            # reload_uploaded does not later find and try to open it.
            try:
                dest_path.unlink()
            except OSError:
                pass
        raise HTTPException(
            status_code=500, detail=f"Upload failed: {exc}"
        ) from exc
    finally:
        try:
            file.file.close()
        except Exception:
            # Best effort: a failure to close the upload stream must not
            # mask the real outcome of the request.
            pass

    try:
        load_slide(slide_id, str(dest_path))
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"status": "uploaded", "slide_id": slide_id}
def reload_uploaded(slide_id: str):
    """Load a previously uploaded slide again from ``UPLOAD_DIR``.

    A file belongs to ``slide_id`` when it sits directly in ``UPLOAD_DIR``,
    has an extension, and its name without the final extension equals
    ``slide_id`` exactly. If several such files exist, the most recently
    modified one is loaded.

    Args:
        slide_id: Identifier of the uploaded slide; also passed to
            ``load_slide``.

    Returns:
        ``{"status": "reloaded", "slide_id": slide_id}`` on success.

    Raises:
        HTTPException: 400 when ``slide_id`` is empty, 404 when no stored
            file matches it, and 400 when ``load_slide`` raises.
    """
    if not slide_id:
        raise HTTPException(status_code=400, detail="Slide id is required")

    # Compare names literally instead of globbing on the raw id: characters
    # such as "*" or "[" in slide_id would otherwise act as wildcards, and a
    # pattern like "abc.*" would also pick up "abc.def.svs", which belongs to
    # the slide "abc.def".
    try:
        matches = [
            entry
            for entry in UPLOAD_DIR.iterdir()
            if entry.suffix and entry.stem == slide_id and entry.is_file()
        ]
    except FileNotFoundError:
        # The upload directory vanished after start-up; nothing to reload.
        matches = []
    if not matches:
        raise HTTPException(status_code=404, detail="Uploaded slide not found")

    # The same id may have been uploaded with different extensions. Directory
    # listing order is arbitrary, so pick the most recently modified file.
    newest = max(matches, key=lambda entry: entry.stat().st_mtime)
    try:
        load_slide(slide_id, str(newest))
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"status": "reloaded", "slide_id": slide_id}
def metadata(slide_id: str):
    """Build a ``SlideMetadata`` description of an already loaded slide.

    Args:
        slide_id: Identifier looked up through ``get_slide``.

    Returns:
        A ``SlideMetadata`` holding the slide's width and height, its level
        count, per-level dimensions and downsample factors, and the two
        microns-per-pixel properties as floats (``None`` when a property is
        absent or cannot be converted to a float).

    Raises:
        HTTPException: 404 when ``get_slide`` returns ``None``.
    """

    def _float_or_none(raw):
        # Properties may be missing (None) or hold non-numeric text.
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    slide = get_slide(slide_id)
    if slide is None:
        raise HTTPException(status_code=404, detail="Slide not loaded")

    raw_mpp_x = slide.properties.get(openslide.PROPERTY_NAME_MPP_X)
    raw_mpp_y = slide.properties.get(openslide.PROPERTY_NAME_MPP_Y)

    base_size = slide.dimensions
    # Convert to plain ints/floats and nested lists for the response model.
    per_level_sizes = [
        [int(level_w), int(level_h)]
        for level_w, level_h in slide.level_dimensions
    ]
    per_level_scales = list(map(float, slide.level_downsamples))

    return SlideMetadata(
        width=base_size[0],
        height=base_size[1],
        level_count=slide.level_count,
        level_dimensions=per_level_sizes,
        level_downsamples=per_level_scales,
        mpp_x=_float_or_none(raw_mpp_x),
        mpp_y=_float_or_none(raw_mpp_y),
    )
def tile(
    slide_id: str,
    level: int,
    x: int,
    y: int,
    tile_size: int = Query(256, ge=64, le=2048),
    channel: str = Query("original"),
):
    """Return one tile of a loaded slide as a JPEG response.

    Args:
        slide_id: Identifier looked up through ``get_slide``.
        level: Passed unchanged to ``get_tile_jpeg``.
        x: Passed unchanged to ``get_tile_jpeg``.
        y: Passed unchanged to ``get_tile_jpeg``.
        tile_size: Query parameter, default 256, constrained to 64..2048.
        channel: Query parameter, default ``"original"``.

    Returns:
        A ``Response`` with media type ``image/jpeg`` whose body is the
        bytes produced by ``get_tile_jpeg``.

    Raises:
        HTTPException: 404 when the slide is not loaded; 400 carrying the
            message of any exception raised by ``get_tile_jpeg``.
    """
    source = get_slide(slide_id)
    if source is None:
        raise HTTPException(status_code=404, detail="Slide not loaded")

    try:
        encoded = get_tile_jpeg(source, level, x, y, tile_size, channel)
    except Exception as err:
        # Rendering problems are surfaced to the client as a bad request.
        raise HTTPException(status_code=400, detail=str(err))
    else:
        return Response(content=encoded, media_type="image/jpeg")
def thumbnail(
    slide_id: str,
    size: int = Query(256, ge=64, le=1024),
    channel: str = Query("original"),
):
    """Return a thumbnail of a loaded slide as a JPEG response.

    Args:
        slide_id: Identifier looked up through ``get_slide``.
        size: Query parameter, default 256, constrained to 64..1024.
        channel: Query parameter, default ``"original"``.

    Returns:
        A ``Response`` with media type ``image/jpeg`` whose body is the
        bytes produced by ``get_thumbnail_jpeg``.

    Raises:
        HTTPException: 404 when the slide is not loaded; 400 carrying the
            message of any exception raised by ``get_thumbnail_jpeg``.
    """
    source = get_slide(slide_id)
    if source is None:
        raise HTTPException(status_code=404, detail="Slide not loaded")

    try:
        encoded = get_thumbnail_jpeg(source, size, channel)
    except Exception as err:
        # Rendering problems are surfaced to the client as a bad request.
        raise HTTPException(status_code=400, detail=str(err))
    else:
        return Response(content=encoded, media_type="image/jpeg")