3v324v23 commited on
Commit
e292e57
·
1 Parent(s): e8e3637

update flow poolling

Browse files
Files changed (1) hide show
  1. app/api/v1/endpoints/report.py +74 -12
app/api/v1/endpoints/report.py CHANGED
@@ -1,19 +1,81 @@
1
  # app/api/v1/endpoints/report.py
2
 
3
- from fastapi import APIRouter
4
- from app.models.models import ReportRequest, ReportResponse
5
- from app.api.v1.dependencies import ReportServiceDep
6
  import time
 
 
 
 
 
 
 
 
7
 
8
  router = APIRouter()
9
 
10
- @router.post("/report")
11
- async def get_report(
12
- request: ReportRequest,
13
- report_service: ReportServiceDep
14
- ) -> ReportResponse:
 
 
 
 
 
 
 
 
 
 
15
  start_time = time.time()
16
- report = await report_service.generate_report(request)
17
- end_time = time.time()
18
- return ReportResponse(readme_content=report, processing_time=end_time - start_time)
19
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # app/api/v1/endpoints/report.py
2
 
3
+ import asyncio
 
 
4
  import time
5
+ from typing import Any, Optional
6
+ from uuid import uuid4
7
+
8
+ from fastapi import APIRouter, HTTPException
9
+ from pydantic import BaseModel, Field
10
+
11
+ from app.api.v1.dependencies import ReportServiceDep
12
+ from app.models.models import ReportRequest
13
 
14
  router = APIRouter()
15
 
16
+ # In-memory job store for polling
17
+ _jobs: dict[str, dict[str, Any]] = {}
18
+ _tasks: dict[str, asyncio.Task] = {}
19
+ _lock = asyncio.Lock()
20
+
21
+
22
+ class ReportJobStatus(BaseModel):
23
+ job_id: str = Field(..., description="Polling job identifier")
24
+ status: str = Field(..., description="queued | processing | succeeded | failed")
25
+ readme_content: Optional[dict[str, Any]] = Field(None, description="Report content when completed")
26
+ processing_time: Optional[float] = Field(None, description="Processing time in seconds when completed")
27
+ error: Optional[str] = Field(None, description="Error message when failed")
28
+
29
+
30
+ async def _run_report_job(job_id: str, request: ReportRequest, report_service: ReportServiceDep) -> None:
31
  start_time = time.time()
32
+ try:
33
+ result = await report_service.generate_report(request)
34
+ duration = time.time() - start_time
35
+ async with _lock:
36
+ _jobs[job_id].update({
37
+ "status": "succeeded",
38
+ "readme_content": result,
39
+ "processing_time": duration,
40
+ })
41
+ except Exception as exc: # keep job record even if it fails
42
+ async with _lock:
43
+ _jobs[job_id].update({
44
+ "status": "failed",
45
+ "error": str(exc),
46
+ })
47
+
48
+
49
+ @router.post("/report", response_model=ReportJobStatus)
50
+ async def start_report_job(
51
+ request: ReportRequest,
52
+ report_service: ReportServiceDep,
53
+ ) -> ReportJobStatus:
54
+ job_id = uuid4().hex
55
+ async with _lock:
56
+ _jobs[job_id] = {
57
+ "job_id": job_id,
58
+ "status": "queued",
59
+ "readme_content": None,
60
+ "processing_time": None,
61
+ "error": None,
62
+ }
63
+ task = asyncio.create_task(_run_report_job(job_id, request, report_service))
64
+ _tasks[job_id] = task
65
+ async with _lock:
66
+ snapshot = _jobs[job_id].copy()
67
+ return ReportJobStatus(**snapshot)
68
+
69
+
70
+ @router.get("/report/{job_id}", response_model=ReportJobStatus)
71
+ async def get_report_job(job_id: str) -> ReportJobStatus:
72
+ async with _lock:
73
+ job = _jobs.get(job_id)
74
+ if not job:
75
+ raise HTTPException(status_code=404, detail="Job not found")
76
+ snapshot = job.copy()
77
+ # Clean up finished jobs to free memory
78
+ if snapshot["status"] in {"succeeded", "failed"}:
79
+ _jobs.pop(job_id, None)
80
+ _tasks.pop(job_id, None)
81
+ return ReportJobStatus(**snapshot)