File size: 3,569 Bytes
72d2ab9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
Endpoints لإدارة طلبات التحليل.

POST /api/v1/analysis/trigger     - إطلاق تحليل لطلب موجود
GET  /api/v1/analysis/{id}/status - حالة طلب
GET  /api/v1/analysis/pending     - طلبات pending
POST /api/v1/analysis/{id}/retry  - إعادة تشغيل طلب فشل

التشغيل يتم داخل process الـ FastAPI نفسه عبر BackgroundTasks
مع تحكم بالتوازي عبر app.workers.runner (واحد في المرة).
"""

from __future__ import annotations

from typing import Any, Dict, List

from fastapi import APIRouter, BackgroundTasks, HTTPException

from app.database.models import StatusResponse, TriggerRequest
from app.database.supabase_client import (
    get_pending_requests,
    get_request,
    update_request_status,
)
from app.workers.runner import (
    claim,
    in_progress_count,
    is_in_progress,
    run_with_concurrency,
)


router = APIRouter()


@router.post("/trigger")
def trigger_analysis(
    payload: TriggerRequest,
    background_tasks: BackgroundTasks,
) -> Dict[str, Any]:
    """
    يضع المهمة في طابور التنفيذ. الطلب يجب أن يكون موجوداً
    في analysis_requests مسبقاً (ينشئه الـ Frontend).
    """
    request_id = str(payload.request_id)
    request_data = get_request(request_id)
    if request_data is None:
        raise HTTPException(status_code=404, detail="Analysis request not found")

    if request_data.get("status") == "completed":
        return {"status": "already_completed", "request_id": request_id}

    if is_in_progress(request_id):
        return {"status": "already_processing", "request_id": request_id}

    current_status = request_data.get("status")
    if current_status in {"collecting", "analyzing"}:
        return {"status": "already_processing", "request_id": request_id}

    if not claim(request_id):
        return {"status": "already_processing", "request_id": request_id}

    update_request_status(request_id, "pending", 5)
    background_tasks.add_task(run_with_concurrency, request_id, request_data)

    return {
        "status": "queued",
        "request_id": request_id,
        "in_progress_count": in_progress_count(),
    }


@router.get("/{request_id}/status", response_model=StatusResponse)
def status(request_id: str) -> StatusResponse:
    data = get_request(request_id)
    if data is None:
        raise HTTPException(status_code=404, detail="Analysis request not found")
    return StatusResponse(
        request_id=data["id"],
        status=data.get("status", "unknown"),
        progress=int(data.get("progress") or 0),
        error_message=data.get("error_message"),
        updated_at=data.get("updated_at"),
    )


@router.get("/pending")
def list_pending(limit: int = 20) -> List[Dict[str, Any]]:
    return get_pending_requests(limit=limit)


@router.post("/{request_id}/retry")
def retry_analysis(
    request_id: str,
    background_tasks: BackgroundTasks,
) -> Dict[str, Any]:
    data = get_request(request_id)
    if data is None:
        raise HTTPException(status_code=404, detail="Analysis request not found")

    if is_in_progress(request_id):
        return {"status": "already_processing", "request_id": request_id}

    if not claim(request_id):
        return {"status": "already_processing", "request_id": request_id}

    update_request_status(request_id, "pending", 0, error_message=None)
    background_tasks.add_task(run_with_concurrency, request_id, data)
    return {"status": "queued", "request_id": request_id}