from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks from pydantic import BaseModel from typing import Optional from apps.api.dependencies.auth import get_api_key, get_current_user_tier from apps.worker.export_tasks import export_data_task router = APIRouter(prefix="/export", tags=["Export"]) class ExportRequest(BaseModel): hs_code: Optional[str] = None country: Optional[str] = None start_date: str end_date: str email: str @router.post("/") async def request_async_export( req: ExportRequest, api_key: str = Depends(get_api_key), tier: str = Depends(get_current_user_tier) ): """ 提交异步导出任务。 根据用户的层级 (tier),可以限制导出的数据量或范围。 """ if tier == "trial": raise HTTPException(status_code=403, detail="Trial users cannot export data. Please upgrade your plan.") # 将任务推入 Celery 队列 task = export_data_task.delay( query_params={ "hs_code": req.hs_code, "country": req.country, "start_date": req.start_date, "end_date": req.end_date }, user_email=req.email ) return { "message": "Export task submitted successfully.", "task_id": task.id, "tier": tier } @router.get("/status/{task_id}") async def get_export_status(task_id: str, api_key: str = Depends(get_api_key)): """ 查询异步导出任务状态。 """ from packages.core.celery_app import celery_app task = celery_app.AsyncResult(task_id) if task.state == 'PENDING': return {"task_id": task_id, "status": "Pending"} elif task.state == 'SUCCESS': return {"task_id": task_id, "status": "Completed", "result": task.result} elif task.state == 'FAILURE': return {"task_id": task_id, "status": "Failed", "error": str(task.info)} else: return {"task_id": task_id, "status": task.state}