import hashlib
import os
import time

from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api import deps
from app.models.data import Dataset, DataCleaningJob, DataJobStatus
from app.schemas.data import (
    DatasetResponse,
    DataCleaningJobResponse,
    DataCleaningJobCreate,
    DataQualityReport,
    ImputationRequest,
)
from app.services.datapure.engine import DataPureEngine
from app.tasks.datapure_jobs import trigger_datapure_job


router = APIRouter()
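# Module-level engine shared across requests; this assumes DataPureEngine is
# stateless (or at least safe for concurrent use). If it holds per-run state,
# instantiate it per request instead.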
engine = DataPureEngine()


| @router.post("/upload", response_model=DatasetResponse, status_code=status.HTTP_201_CREATED) |
| async def upload_research_dataset( |
| background_tasks: BackgroundTasks, |
| file: UploadFile = File(...), |
| db: AsyncSession = Depends(deps.get_db), |
| current_user = Depends(deps.get_current_active_user) |
| ): |
| """ |
| Stage 1: Intelligent Ingestion. |
| Supports CSV, Excel, and SPSS formats with chunked processing for 1M row scale. |
| """ |
    file_id = hashlib.sha256(f"{current_user.id}:{file.filename}:{time.time()}".encode()).hexdigest()[:16]
    safe_filename = os.path.basename(file.filename or "upload")
    storage_path = f"storage/datasets/{file_id}_{safe_filename}"

    # Stream the upload to disk in 1 MiB chunks so a large file is never
    # held fully in memory.
    os.makedirs(os.path.dirname(storage_path), exist_ok=True)
    with open(storage_path, "wb") as out:
        while chunk := await file.read(1024 * 1024):
            out.write(chunk)

    new_dataset = Dataset(
        id=file_id,
        user_id=current_user.id,
        filename=safe_filename,
        storage_path=storage_path,
        institution_id=getattr(current_user, 'institution_id', None)
    )
    db.add(new_dataset)
    await db.commit()
    await db.refresh(new_dataset)
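    # Queue an initial profiling pass under the "General" study design.
    # No DataCleaningJob row is created here, so trigger_datapure_job is
    # assumed to create/track its own job record (unlike /clean below).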
| job_id = f"job_{file_id}" |
| |
| background_tasks.add_task( |
| trigger_datapure_job, |
| dataset_id=file_id, |
| job_id=job_id, |
| study_design="General" |
| ) |
|
|
| return new_dataset |


@router.post("/clean", response_model=DataCleaningJobResponse, status_code=status.HTTP_202_ACCEPTED)
async def initiate_cleaning_protocol(
    req: DataCleaningJobCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(deps.get_db),
    current_user=Depends(deps.get_current_active_user)
):
    """
    Stage 4: Cleaning Orchestration.
    """
    result = await db.execute(
        select(Dataset).where(Dataset.id == req.dataset_id, Dataset.user_id == current_user.id)
    )
    dataset = result.scalar_one_or_none()
    if not dataset:
        raise HTTPException(status_code=404, detail="Dataset not found")

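    # Record the job before enqueueing it, so its status is immediately
    # queryable via GET /jobs/{job_id}.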
    job_id = hashlib.sha256(f"{req.dataset_id}:{time.time()}".encode()).hexdigest()[:16]
    new_job = DataCleaningJob(
        id=job_id,
        dataset_id=req.dataset_id,
        status=DataJobStatus.PENDING,
        study_design=req.study_design
    )
    db.add(new_job)
    await db.commit()
    await db.refresh(new_job)
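    # The cleaning itself runs out-of-band; clients poll GET /jobs/{job_id}
    # for progress.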
    background_tasks.add_task(
        trigger_datapure_job,
        dataset_id=req.dataset_id,
        job_id=job_id,
        study_design=req.study_design
    )

    return new_job


@router.get("/jobs/{job_id}", response_model=DataCleaningJobResponse)
async def get_cleaning_status(
    job_id: str,
    db: AsyncSession = Depends(deps.get_db),
    current_user=Depends(deps.get_current_active_user)
):
    # Join through Dataset so users can only see jobs on their own datasets.
    result = await db.execute(
        select(DataCleaningJob)
        .join(Dataset, DataCleaningJob.dataset_id == Dataset.id)
        .where(DataCleaningJob.id == job_id, Dataset.user_id == current_user.id)
    )
    job = result.scalar_one_or_none()
    if not job:
        raise HTTPException(status_code=404, detail="Cleaning job not found")

    return job


@router.post("/impute", status_code=status.HTTP_202_ACCEPTED)
async def trigger_mice_imputation(
    req: ImputationRequest,
    db: AsyncSession = Depends(deps.get_db),
    current_user=Depends(deps.get_current_active_user)
):
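    # Note: despite the 202 status, the MICE run is awaited in-request here.
    # If runs are long, dispatching via BackgroundTasks (as /clean does)
    # would match the semantics better; this assumes run_mice_imputation
    # either completes quickly or enqueues work itself.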
    status_update = await engine.run_mice_imputation(req)
    return status_update


@router.get("/diagnostics/{dataset_id}", response_model=DataQualityReport)
async def get_quality_diagnostics(
    dataset_id: str,
    db: AsyncSession = Depends(deps.get_db),
    current_user=Depends(deps.get_current_active_user)
):
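    # column_metadata is populated by the background profiling job and is
    # assumed to match the DataQualityReport schema.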
    result = await db.execute(
        select(Dataset).where(Dataset.id == dataset_id, Dataset.user_id == current_user.id)
    )
    dataset = result.scalar_one_or_none()
    if not dataset or not dataset.column_metadata:
        raise HTTPException(status_code=404, detail="Diagnostics not yet available")

    return dataset.column_metadata