# app/api/v1/data.py
import hashlib
import time
import os  # used to sanitize uploaded filenames (path-traversal protection)
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks, UploadFile, File
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.api import deps
from app.models.data import Dataset, DataCleaningJob, DataJobStatus
from app.schemas.data import (
DatasetResponse,
DataCleaningJobResponse,
DataCleaningJobCreate,
# DataProfileRequest removed (Dead Code Cleanup)
DataQualityReport,
ImputationRequest
)
from app.tasks.datapure_jobs import trigger_datapure_job
from app.services.datapure.engine import DataPureEngine
router = APIRouter()
engine = DataPureEngine()
@router.post("/upload", response_model=DatasetResponse, status_code=status.HTTP_201_CREATED)
async def upload_research_dataset(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
db: AsyncSession = Depends(deps.get_db),
current_user = Depends(deps.get_current_active_user)
):
"""
Stage 1: Intelligent Ingestion.
Supports CSV, Excel, and SPSS formats with chunked processing for 1M row scale.
"""
    # 1. Securely handle file storage
    content = await file.read()
    file_id = hashlib.sha256(f"{current_user.id}:{file.filename}:{time.time()}".encode()).hexdigest()[:16]
    # Path traversal fix: keep only the basename so ../ sequences cannot escape the storage directory
    safe_filename = os.path.basename(file.filename)
    storage_path = f"storage/datasets/{file_id}_{safe_filename}"
    # Persist the uploaded bytes at the recorded path so the background job can read them later
    os.makedirs(os.path.dirname(storage_path), exist_ok=True)
    with open(storage_path, "wb") as out_file:
        out_file.write(content)
# 2. Create Dataset Record
new_dataset = Dataset(
id=file_id,
user_id=current_user.id,
filename=safe_filename,
storage_path=storage_path,
institution_id=getattr(current_user, 'institution_id', None)
)
db.add(new_dataset)
await db.commit()
await db.refresh(new_dataset)
# 3. Queue Stage 2 & 3: Profiling and Quality Diagnostics automatically
job_id = f"job_{file_id}"
background_tasks.add_task(
trigger_datapure_job,
dataset_id=file_id,
job_id=job_id,
study_design="General"
)
return new_dataset
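

# --- Illustrative sketch (not wired into any route) ----------------------------------
# The docstring above promises chunked processing for ~1M-row files; the actual chunking
# lives in DataPureEngine / the datapure background job, whose internals are not shown in
# this module. The helper below is only a minimal sketch of what chunked CSV ingestion
# could look like; it assumes pandas is available, and `_iter_csv_chunks` is a
# hypothetical name, not part of this codebase.
def _iter_csv_chunks(path: str, chunk_size: int = 100_000):
    """Yield a large CSV file in fixed-size DataFrame chunks instead of loading it whole."""
    import pandas as pd  # local import: illustrative only

    for chunk in pd.read_csv(path, chunksize=chunk_size):
        yield chunk
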
@router.post("/clean", response_model=DataCleaningJobResponse, status_code=status.HTTP_202_ACCEPTED)
async def initiate_cleaning_protocol(
req: DataCleaningJobCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(deps.get_db),
current_user = Depends(deps.get_current_active_user)
):
"""
Stage 4: Cleaning Orchestration.
"""
result = await db.execute(
select(Dataset).where(Dataset.id == req.dataset_id, Dataset.user_id == current_user.id)
)
dataset = result.scalar_one_or_none()
if not dataset:
raise HTTPException(status_code=404, detail="Dataset not found")
job_id = hashlib.sha256(f"{req.dataset_id}:{time.time()}".encode()).hexdigest()[:16]
new_job = DataCleaningJob(
id=job_id,
dataset_id=req.dataset_id,
status=DataJobStatus.PENDING,
study_design=req.study_design
)
db.add(new_job)
    await db.commit()
    await db.refresh(new_job)
background_tasks.add_task(
trigger_datapure_job,
dataset_id=req.dataset_id,
job_id=job_id,
study_design=req.study_design
)
return new_job
@router.get("/jobs/{job_id}", response_model=DataCleaningJobResponse)
async def get_cleaning_status(
job_id: str,
db: AsyncSession = Depends(deps.get_db),
current_user = Depends(deps.get_current_active_user)
):
    # Scope the lookup to the caller's datasets (mirrors the ownership check in /clean)
    result = await db.execute(
        select(DataCleaningJob)
        .join(Dataset, Dataset.id == DataCleaningJob.dataset_id)
        .where(DataCleaningJob.id == job_id, Dataset.user_id == current_user.id)
    )
job = result.scalar_one_or_none()
if not job:
raise HTTPException(status_code=404, detail="Cleaning job not found")
return job
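

# --- Illustrative client flow (not part of this module) ------------------------------
# A minimal sketch of how a caller might drive the two routes above: start a cleaning
# job, then poll it until it leaves the PENDING state. The base URL, route prefix and
# auth header are assumptions (the router's mount point is configured elsewhere); the
# request fields `dataset_id` and `study_design` are the ones the handler actually
# reads, and the `id`/`status` keys are assumed from DataCleaningJobResponse (the
# literal "PENDING" mirrors DataJobStatus.PENDING, though its serialized form may differ).
def _example_clean_and_poll(dataset_id: str, token: str) -> dict:
    import httpx  # local import: illustrative only

    base = "http://localhost:8000/api/v1/data"  # assumed mount point
    headers = {"Authorization": f"Bearer {token}"}
    job = httpx.post(
        f"{base}/clean",
        json={"dataset_id": dataset_id, "study_design": "General"},
        headers=headers,
    ).json()
    while job.get("status") == "PENDING":
        time.sleep(2)  # uses the module-level `time` import
        job = httpx.get(f"{base}/jobs/{job['id']}", headers=headers).json()
    return job
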
@router.post("/impute", status_code=status.HTTP_202_ACCEPTED)
async def trigger_mice_imputation(
req: ImputationRequest,
db: AsyncSession = Depends(deps.get_db),
current_user = Depends(deps.get_current_active_user)
):
    """Run MICE (Multiple Imputation by Chained Equations) for the given request."""
    status_update = await engine.run_mice_imputation(req)
    return status_update
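

# --- Illustrative sketch (not wired into any route) ----------------------------------
# The /impute route above returns 202 Accepted yet awaits the MICE run inline. Whether
# that is intended is not clear from this module; if the run should be deferred like the
# other stages, a minimal variant using the same BackgroundTasks pattern as /upload and
# /clean could look like the hypothetical helper below.
def _queue_mice_imputation(background_tasks: BackgroundTasks, req: ImputationRequest) -> dict:
    """Illustrative only: schedule the MICE run instead of awaiting it in the request."""
    background_tasks.add_task(engine.run_mice_imputation, req)
    return {"status": "queued"}
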
@router.get("/diagnostics/{dataset_id}", response_model=DataQualityReport)
async def get_quality_diagnostics(
dataset_id: str,
db: AsyncSession = Depends(deps.get_db),
current_user = Depends(deps.get_current_active_user)
):
    """Return the quality diagnostics produced by the automatic profiling stage."""
    result = await db.execute(
        select(Dataset).where(Dataset.id == dataset_id, Dataset.user_id == current_user.id)
    )
dataset = result.scalar_one_or_none()
if not dataset or not dataset.column_metadata:
raise HTTPException(status_code=404, detail="Diagnostics not yet available")
return dataset.column_metadata