Spaces:
Sleeping
Sleeping
| """Upload and analysis API endpoints.""" | |
| from __future__ import annotations | |
| import uuid | |
| from typing import Optional | |
| from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, Query, UploadFile | |
| from app.core.config import settings | |
| from app.core.logging import get_logger | |
| from app.core.security import get_api_key | |
| from app.models.schemas import ( | |
| AnalysisResult, | |
| AnalysisStatus, | |
| ComparisonRequest, | |
| ComparisonResult, | |
| FilterParams, | |
| JobStatus, | |
| TopicInfo, | |
| ) | |
| from app.services.analysis_pipeline import ( | |
| filter_entries, | |
| get_all_jobs, | |
| get_job, | |
| run_analysis, | |
| ) | |
| from app.services.file_processing import parse_file | |
# Logger obtained from the project's logging helper, keyed by this module's name.
logger = get_logger(__name__)
# Router configured with the "/api/v1" URL prefix and the "analysis" tag.
router = APIRouter(prefix="/api/v1", tags=["analysis"])
async def upload_file(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    source: Optional[str] = Query(None, description="Data source label"),
    api_key: str = Depends(get_api_key),
):
    """Upload a file for analysis. Supports CSV, JSON, Excel, ZIP.

    The upload is size-checked, parsed with ``parse_file`` and, when it yields
    at least one entry, handed to ``run_analysis`` as a background task under a
    freshly generated job id.

    Args:
        background_tasks: Task queue the analysis run is scheduled on.
        file: The uploaded file; its filename is forwarded to ``parse_file``.
        source: Optional data source label forwarded to ``parse_file``.
        api_key: Result of the ``get_api_key`` dependency; not used in the
            body, declared so the dependency runs for this handler.

    Returns:
        A ``JobStatus`` in the ``PENDING`` state carrying the new job id.

    Raises:
        HTTPException: 400 if the filename is missing, parsing raises
            ``ValueError`` or no entries are found; 413 if the file exceeds
            ``settings.max_upload_size_mb``.
    """
    from datetime import datetime, timezone

    if not file.filename:
        raise HTTPException(status_code=400, detail="No filename provided")

    bytes_per_mb = 1024 * 1024
    limit_mb = settings.max_upload_size_mb

    def too_large(size_mb: float) -> HTTPException:
        # Single place for the 413 response so both size checks agree.
        return HTTPException(
            status_code=413,
            detail=f"File too large ({size_mb:.1f}MB). Maximum: {limit_mb}MB",
        )

    # When the upload object exposes its size, reject oversized files before
    # pulling the whole payload into memory. getattr keeps this working on
    # framework versions whose UploadFile has no ``size`` attribute.
    declared_size = getattr(file, "size", None)
    if declared_size is not None and declared_size / bytes_per_mb > limit_mb:
        raise too_large(declared_size / bytes_per_mb)

    content = await file.read()
    size_mb = len(content) / bytes_per_mb
    if size_mb > limit_mb:
        raise too_large(size_mb)

    try:
        entries = parse_file(content, file.filename, source)
    except ValueError as exc:
        # Chain the cause so the original parse error stays in the traceback.
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    if not entries:
        raise HTTPException(status_code=400, detail="No valid entries found in the uploaded file")

    job_id = uuid.uuid4().hex[:12]
    logger.info(
        "upload_received",
        job_id=job_id,
        filename=file.filename,
        entries=len(entries),
        size_mb=round(size_mb, 2),
    )
    background_tasks.add_task(run_analysis, entries, job_id)

    return JobStatus(
        job_id=job_id,
        status=AnalysisStatus.PENDING,
        progress=0.0,
        message=f"Processing {len(entries)} entries from {file.filename}",
        # Naive UTC timestamp, same value the deprecated datetime.utcnow() gave.
        created_at=datetime.now(timezone.utc).replace(tzinfo=None),
    )
async def upload_chunked(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    chunk_index: int = Query(0, ge=0),
    total_chunks: int = Query(1, ge=1),
    upload_id: Optional[str] = Query(None),
    source: Optional[str] = Query(None),
    api_key: str = Depends(get_api_key),
):
    """Chunked upload for files >10MB.

    Each call stores one chunk under ``settings.upload_path`` in a directory
    named after the upload id. Once every chunk ``0 .. total_chunks - 1`` is on
    disk, the chunks are concatenated in index order, the chunk directory is
    removed, the combined payload is parsed with ``parse_file`` and the
    analysis is scheduled as a background task.

    Args:
        background_tasks: Task queue the analysis run is scheduled on.
        file: The chunk being uploaded.
        chunk_index: Zero-based position of this chunk.
        total_chunks: Number of chunks that make up the whole upload.
        upload_id: Identifier shared by all chunks of one upload; a new one is
            generated when omitted.
        source: Optional data source label forwarded to ``parse_file``.
        api_key: Result of the ``get_api_key`` dependency; not used in the
            body, declared so the dependency runs for this handler.

    Returns:
        A ``JobStatus`` whose ``job_id`` is the upload id: ``PENDING`` while
        chunks are still missing, ``PROCESSING`` once the analysis has been
        scheduled.

    Raises:
        HTTPException: 400 if ``chunk_index`` is not below ``total_chunks``,
            ``upload_id`` contains disallowed characters, parsing raises
            ``ValueError`` or no entries are found.
    """
    import re
    import shutil
    from datetime import datetime, timezone

    if chunk_index >= total_chunks:
        raise HTTPException(
            status_code=400,
            detail=f"chunk_index {chunk_index} is out of range for total_chunks {total_chunks}",
        )

    upload_id = upload_id or uuid.uuid4().hex[:12]
    # upload_id is client-supplied and becomes part of a filesystem path, so
    # restrict it to characters that cannot introduce a path separator.
    if not re.fullmatch(r"[A-Za-z0-9._-]{1,128}", upload_id):
        raise HTTPException(status_code=400, detail="Invalid upload_id")

    chunk_dir = settings.upload_path / f"chunks_{upload_id}"
    chunk_dir.mkdir(parents=True, exist_ok=True)

    def chunk_file(index: int):
        # One naming rule for both writing and reassembly.
        return chunk_dir / f"chunk_{index:04d}"

    content = await file.read()
    chunk_file(chunk_index).write_bytes(content)
    logger.info("chunk_received", upload_id=upload_id, chunk=chunk_index, total=total_chunks)

    def utc_now():
        # Naive UTC timestamp, same value the deprecated datetime.utcnow() gave.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    # Reassemble only when every expected chunk is present, so a final chunk
    # that arrives before earlier ones does not produce a truncated file.
    # The generator short-circuits on the first missing chunk.
    if not all(chunk_file(i).exists() for i in range(total_chunks)):
        return JobStatus(
            job_id=upload_id,
            status=AnalysisStatus.PENDING,
            progress=chunk_index / total_chunks,
            message=f"Received chunk {chunk_index + 1}/{total_chunks}",
            created_at=utc_now(),
        )

    # All chunks received — reassemble in numeric index order (a lexical sort
    # of the file names would misorder indices with more than four digits).
    combined = b"".join(chunk_file(i).read_bytes() for i in range(total_chunks))

    # Clean up the chunk directory, including any stray files left in it.
    shutil.rmtree(chunk_dir, ignore_errors=True)

    filename = file.filename or "upload.csv"
    try:
        entries = parse_file(combined, filename, source)
    except ValueError as exc:
        # Chain the cause so the original parse error stays in the traceback.
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    if not entries:
        raise HTTPException(status_code=400, detail="No valid entries found")

    background_tasks.add_task(run_analysis, entries, upload_id)

    return JobStatus(
        job_id=upload_id,
        status=AnalysisStatus.PROCESSING,
        progress=0.0,
        message=f"All chunks received. Processing {len(entries)} entries.",
        created_at=utc_now(),
    )
async def list_jobs(api_key: str = Depends(get_api_key)):
    """Return a ``JobStatus`` summary for every job from ``get_all_jobs()``.

    Progress is reported as 1.0 for completed jobs and 0.5 for any other
    status; the message is always empty.
    """
    summaries = []
    for record in get_all_jobs():
        if record.status == AnalysisStatus.COMPLETED:
            fraction_done = 1.0
        else:
            fraction_done = 0.5
        summaries.append(
            JobStatus(
                job_id=record.job_id,
                status=record.status,
                progress=fraction_done,
                message="",
                created_at=record.created_at,
                completed_at=record.completed_at,
            )
        )
    return summaries
async def get_job_result(job_id: str, api_key: str = Depends(get_api_key)):
    """Return the stored job object for ``job_id``.

    Raises:
        HTTPException: 404 when ``get_job`` yields nothing for this id.
    """
    record = get_job(job_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
async def get_job_status(job_id: str, api_key: str = Depends(get_api_key)):
    """Report the state of one analysis job as a ``JobStatus``.

    Progress is 1.0 for a completed job and 0.5 for any other status; the
    message is always empty.

    Raises:
        HTTPException: 404 when ``get_job`` yields nothing for this id.
    """
    record = get_job(job_id)
    if record:
        finished = record.status == AnalysisStatus.COMPLETED
        return JobStatus(
            job_id=record.job_id,
            status=record.status,
            progress=1.0 if finished else 0.5,
            message="",
            created_at=record.created_at,
            completed_at=record.completed_at,
        )
    raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
async def filter_job_results(
    job_id: str,
    filters: FilterParams,
    api_key: str = Depends(get_api_key),
):
    """Apply ``filters`` to a completed job's entries and return one page.

    Returns:
        A dict with the total number of matching entries, the requested page
        and page size, and the slice of matches belonging to that page.

    Raises:
        HTTPException: 404 if the job is unknown, 400 if it has not completed.
    """
    record = get_job(job_id)
    if not record:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
    if record.status != AnalysisStatus.COMPLETED:
        raise HTTPException(status_code=400, detail="Analysis not yet completed")

    # Every filter criterion is forwarded by keyword; paging is handled below.
    criteria = {
        "date_from": filters.date_from,
        "date_to": filters.date_to,
        "sentiment_min": filters.sentiment_min,
        "sentiment_max": filters.sentiment_max,
        "topics": filters.topics,
        "languages": filters.languages,
        "sources": filters.sources,
        "search_text": filters.search_text,
    }
    matches = filter_entries(record.entries, **criteria)

    # Pages are 1-based: page 1 starts at offset 0.
    page_number = filters.page
    per_page = filters.page_size
    offset = (page_number - 1) * per_page
    page_entries = matches[offset:offset + per_page]

    return {
        "total": len(matches),
        "page": page_number,
        "page_size": per_page,
        "entries": page_entries,
    }
async def compare_segments(
    job_id: str,
    comparison: ComparisonRequest,
    api_key: str = Depends(get_api_key),
):
    """Compare two data segments from the same job.

    Each segment is obtained by applying its filter set (minus the paging
    fields) to the job's entries with ``filter_entries``. The segments are
    summarised independently; the result reports the change in average
    sentiment (segment B minus segment A) and the topics that occur in only
    one of the two segments. Topic id ``-1`` is excluded from all topic
    statistics.

    Args:
        job_id: Identifier of the job whose entries are compared.
        comparison: Request holding the filter sets ``segment_a`` and
            ``segment_b``.
        api_key: Result of the ``get_api_key`` dependency; not used in the
            body, declared so the dependency runs for this handler.

    Returns:
        A ``ComparisonResult`` with both summaries, the sentiment delta and
        the new / disappeared topic lists (``topic_changes`` is always empty).

    Raises:
        HTTPException: 404 if the job is unknown, 400 if it has not completed.
    """
    job = get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
    if job.status != AnalysisStatus.COMPLETED:
        raise HTTPException(status_code=400, detail="Analysis not yet completed")

    from collections import Counter

    import numpy as np

    from app.models.schemas import AnalysisSummary, SentimentLabel

    excluded_topic = -1
    paging_fields = {"page", "page_size"}

    def select(segment):
        # Paging fields are not filter criteria, so they are not forwarded.
        return filter_entries(job.entries, **segment.model_dump(exclude=paging_fields))

    def topic_ids(entries):
        return {e.topic_id for e in entries} - {excluded_topic}

    def make_summary(entries):
        if not entries:
            # Empty segment: neutral placeholder summary.
            return AnalysisSummary(
                total_entries=0,
                avg_sentiment=0.5,
                dominant_sentiment=SentimentLabel.NEUTRAL,
                num_topics=0,
                top_topics=[],
                languages_detected=[],
            )

        sentiments = [e.sentiment for e in entries]
        label_counts = Counter(s.label.value for s in sentiments)

        # Drop the excluded topic BEFORE ranking; ranking first and filtering
        # afterwards returned only four topics whenever -1 was in the top five.
        topic_counts = Counter(e.topic_id for e in entries)
        topic_counts.pop(excluded_topic, None)

        return AnalysisSummary(
            total_entries=len(entries),
            avg_sentiment=round(float(np.mean([s.score for s in sentiments])), 4),
            dominant_sentiment=SentimentLabel(label_counts.most_common(1)[0][0]),
            num_topics=len(topic_counts),
            top_topics=[
                TopicInfo(topic_id=tid, label=f"Topic {tid}", keywords=[], size=cnt)
                for tid, cnt in topic_counts.most_common(5)
            ],
            languages_detected=list({e.language.language for e in entries}),
        )

    seg_a_entries = select(comparison.segment_a)
    seg_b_entries = select(comparison.segment_b)

    sum_a = make_summary(seg_a_entries)
    sum_b = make_summary(seg_b_entries)

    topics_a = topic_ids(seg_a_entries)
    topics_b = topic_ids(seg_b_entries)

    # NOTE(review): new/disappeared topics are reported with size=0, as before;
    # confirm whether callers expect the per-segment entry count instead.
    return ComparisonResult(
        segment_a=sum_a,
        segment_b=sum_b,
        sentiment_delta=round(sum_b.avg_sentiment - sum_a.avg_sentiment, 4),
        topic_changes=[],
        new_topics=[
            TopicInfo(topic_id=t, label=f"Topic {t}", keywords=[], size=0)
            for t in topics_b - topics_a
        ],
        disappeared_topics=[
            TopicInfo(topic_id=t, label=f"Topic {t}", keywords=[], size=0)
            for t in topics_a - topics_b
        ],
    )