Bromeo777 committed
Commit 3185fd9 · verified · 1 Parent(s): 8bd5f4a

Add app/api/v1/data.py

Files changed (1)
  1. app/api/v1/data.py +142 -0
app/api/v1/data.py ADDED
@@ -0,0 +1,142 @@
+ import hashlib
+ import time
+ import os  # Added for secure path handling
+ from typing import List, Dict, Any, Optional
+
+ from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks, UploadFile, File
+ from sqlalchemy.ext.asyncio import AsyncSession
+ from sqlalchemy import select
+
+ from app.api import deps
+ from app.models.data import Dataset, DataCleaningJob, DataJobStatus
+ from app.schemas.data import (
+     DatasetResponse,
+     DataCleaningJobResponse,
+     DataCleaningJobCreate,
+     # DataProfileRequest removed (dead code cleanup)
+     DataQualityReport,
+     ImputationRequest
+ )
+ from app.tasks.datapure_jobs import trigger_datapure_job
+ from app.services.datapure.engine import DataPureEngine
+
+ router = APIRouter()
+ engine = DataPureEngine()
+
+ @router.post("/upload", response_model=DatasetResponse, status_code=status.HTTP_201_CREATED)
+ async def upload_research_dataset(
+     background_tasks: BackgroundTasks,
+     file: UploadFile = File(...),
+     db: AsyncSession = Depends(deps.get_db),
+     current_user = Depends(deps.get_current_active_user)
+ ):
+     """
+     Stage 1: Intelligent Ingestion.
+     Supports CSV, Excel, and SPSS formats, with chunked processing at the 1M-row scale.
+     """
+     # 1. Securely handle file storage
+     content = await file.read()
+     file_id = hashlib.sha256(f"{current_user.id}:{file.filename}:{time.time()}".encode()).hexdigest()[:16]
+
+     # Path traversal fix: basename strips any ../ sequences from the client-supplied name
+     safe_filename = os.path.basename(file.filename)
+     storage_path = f"storage/datasets/{file_id}_{safe_filename}"
+
+     # Persist the upload so the queued profiling job can read it from storage_path
+     os.makedirs(os.path.dirname(storage_path), exist_ok=True)
+     with open(storage_path, "wb") as out:
+         out.write(content)
+
+     # 2. Create the Dataset record
+     new_dataset = Dataset(
+         id=file_id,
+         user_id=current_user.id,
+         filename=safe_filename,
+         storage_path=storage_path,
+         institution_id=getattr(current_user, 'institution_id', None)
+     )
+
+     db.add(new_dataset)
+     await db.commit()
+     await db.refresh(new_dataset)
+
+     # 3. Automatically queue Stages 2 & 3: profiling and quality diagnostics
+     job_id = f"job_{file_id}"
+
+     background_tasks.add_task(
+         trigger_datapure_job,
+         dataset_id=file_id,
+         job_id=job_id,
+         study_design="General"
+     )
+
+     return new_dataset
+
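+ # Illustrative client call (not part of this module): uploading a CSV with httpx.
+ # The base URL, the /api/v1/data mount point, and the bearer token are assumptions.
+ #
+ #   import httpx
+ #   with open("trial_data.csv", "rb") as fh:
+ #       resp = httpx.post(
+ #           "http://localhost:8000/api/v1/data/upload",
+ #           files={"file": ("trial_data.csv", fh, "text/csv")},
+ #           headers={"Authorization": "Bearer <token>"},
+ #       )
+ #   dataset = resp.json()  # DatasetResponse; profiling is queued in the background
+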
+ @router.post("/clean", response_model=DataCleaningJobResponse, status_code=status.HTTP_202_ACCEPTED)
+ async def initiate_cleaning_protocol(
+     req: DataCleaningJobCreate,
+     background_tasks: BackgroundTasks,
+     db: AsyncSession = Depends(deps.get_db),
+     current_user = Depends(deps.get_current_active_user)
+ ):
+     """
+     Stage 4: Cleaning Orchestration.
+     """
+     result = await db.execute(
+         select(Dataset).where(Dataset.id == req.dataset_id, Dataset.user_id == current_user.id)
+     )
+     dataset = result.scalar_one_or_none()
+     if not dataset:
+         raise HTTPException(status_code=404, detail="Dataset not found")
+
+     job_id = hashlib.sha256(f"{req.dataset_id}:{time.time()}".encode()).hexdigest()[:16]
+     new_job = DataCleaningJob(
+         id=job_id,
+         dataset_id=req.dataset_id,
+         status=DataJobStatus.PENDING,
+         study_design=req.study_design
+     )
+     db.add(new_job)
+     await db.commit()
+
+     background_tasks.add_task(
+         trigger_datapure_job,
+         dataset_id=req.dataset_id,
+         job_id=job_id,
+         study_design=req.study_design
+     )
+
+     return new_job
+
+ @router.get("/jobs/{job_id}", response_model=DataCleaningJobResponse)
+ async def get_cleaning_status(
+     job_id: str,
+     db: AsyncSession = Depends(deps.get_db),
+     current_user = Depends(deps.get_current_active_user)
+ ):
+     # Scope the lookup to the caller's own datasets, mirroring the /clean check,
+     # so job status is not readable across users
+     result = await db.execute(
+         select(DataCleaningJob)
+         .join(Dataset, DataCleaningJob.dataset_id == Dataset.id)
+         .where(DataCleaningJob.id == job_id, Dataset.user_id == current_user.id)
+     )
+     job = result.scalar_one_or_none()
+     if not job:
+         raise HTTPException(status_code=404, detail="Cleaning job not found")
+
+     return job
+
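+ # Illustrative flow (not part of this module): start a cleaning job, then poll it.
+ # Paths assume the /api/v1/data mount point above; response field names are
+ # assumed to mirror the ORM models in this file.
+ #
+ #   job = httpx.post(
+ #       "http://localhost:8000/api/v1/data/clean",
+ #       json={"dataset_id": dataset["id"], "study_design": "General"},
+ #       headers={"Authorization": "Bearer <token>"},
+ #   ).json()
+ #   state = httpx.get(
+ #       f"http://localhost:8000/api/v1/data/jobs/{job['id']}",
+ #       headers={"Authorization": "Bearer <token>"},
+ #   ).json()["status"]  # PENDING until trigger_datapure_job finishes
+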
+ @router.post("/impute", status_code=status.HTTP_202_ACCEPTED)
+ async def trigger_mice_imputation(
+     req: ImputationRequest,
+     db: AsyncSession = Depends(deps.get_db),
+     current_user = Depends(deps.get_current_active_user)
+ ):
+     # Delegate MICE (multiple imputation by chained equations) to the shared engine
+     status_update = await engine.run_mice_imputation(req)
+     return status_update
+
+ @router.get("/diagnostics/{dataset_id}", response_model=DataQualityReport)
+ async def get_quality_diagnostics(
+     dataset_id: str,
+     db: AsyncSession = Depends(deps.get_db),
+     current_user = Depends(deps.get_current_active_user)
+ ):
+     # Apply the same ownership filter as /clean so reports stay private to the uploader
+     result = await db.execute(
+         select(Dataset).where(Dataset.id == dataset_id, Dataset.user_id == current_user.id)
+     )
+     dataset = result.scalar_one_or_none()
+
+     # column_metadata is populated by the background profiling job (Stages 2 & 3)
+     if not dataset or not dataset.column_metadata:
+         raise HTTPException(status_code=404, detail="Diagnostics not yet available")
+
+     return dataset.column_metadata