File size: 2,813 Bytes
b1aa633
8ca4657
 
 
 
 
 
b1aa633
 
8ca4657
6d44285
8ca4657
2bb79a1
8ca4657
 
 
2bb79a1
8ca4657
6d44285
8ca4657
 
6d44285
 
8ca4657
 
 
6d44285
 
 
 
 
 
 
8ca4657
 
 
 
 
2bb79a1
8ca4657
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from fastapi import APIRouter, Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select
from typing import List, Any

from app.api import deps
from app.core.db import get_db
from app.models.models import Workspace, Message, User
from app.schemas.envelope import ResponseEnvelope, wrap_data
from app.services.dispatch_service import DispatchService
from app.services.audit_service import audit_event
from app.core.modules import require_module_enabled, MODULE_DISPATCH_ENGINE
from app.services.entitlements import require_entitlement

# Router that collects the dispatch endpoints declared below (/run and /queue).
router = APIRouter()

@router.post(
    "/run",
    response_model=ResponseEnvelope[dict],
    dependencies=[
        Depends(require_module_enabled(MODULE_DISPATCH_ENGINE, "write")),
        Depends(require_entitlement("dispatch_engine", increment=True)),
    ],
)
async def trigger_dispatch(
    request: Request,
    db: AsyncSession = Depends(get_db),
    workspace: Workspace = Depends(deps.get_active_workspace),
    current_user: User = Depends(deps.get_current_user),
    _ = Depends(deps.require_role(["owner", "member"])),
) -> Any:
    """Run a dispatch pass for the caller's active workspace.

    Invokes ``DispatchService.dispatch_pending_messages`` scoped to the
    workspace, records a ``dispatch_trigger`` audit event carrying the
    resulting counts, commits the session, and returns the counts.

    Returns:
        The ``wrap_data`` envelope around ``processed_count`` and
        ``failed_count``.
    """
    sent_total, error_total = await DispatchService.dispatch_pending_messages(
        db, workspace_id=workspace.id
    )

    # The audit row is written on the same session and committed together
    # with it below.
    audit_fields = {
        "action": "dispatch_trigger",
        "entity_type": "workspace",
        "entity_id": str(workspace.id),
        "actor_user_id": current_user.id,
        "outcome": "success",
        "workspace_id": workspace.id,
        "request": request,
        "metadata": {"processed": sent_total, "failed": error_total},
    }
    await audit_event(db, **audit_fields)
    await db.commit()

    summary = {"processed_count": sent_total, "failed_count": error_total}
    return wrap_data(summary)

# Number of most recent outbound messages returned by the /queue endpoint.
_QUEUE_PAGE_SIZE = 50
# Content longer than this many characters is cut and suffixed with "...".
_PREVIEW_MAX_CHARS = 50


def _content_preview(content: Any) -> Any:
    """Return *content* shortened for display, or unchanged if short or missing."""
    # Guard against a missing body so one such row cannot fail the whole listing.
    if content is None:
        return None
    if len(content) > _PREVIEW_MAX_CHARS:
        return content[:_PREVIEW_MAX_CHARS] + "..."
    return content


def _queue_row(message: Message) -> dict:
    """Convert one ``Message`` into the plain dict shape returned to the UI."""
    created_at = message.created_at
    return {
        "id": str(message.id),
        "content": _content_preview(message.content),
        "status": message.delivery_status,
        "attempts": message.attempt_count,
        "last_error": message.last_error,
        "platform": message.platform,
        # A row without a timestamp is reported as null rather than raising.
        "created_at": created_at.isoformat() if created_at is not None else None,
    }


@router.get(
    "/queue",
    response_model=ResponseEnvelope[List[dict]],
    dependencies=[
        Depends(require_module_enabled(MODULE_DISPATCH_ENGINE, "read")),
        Depends(require_entitlement("dispatch_engine")),
    ],
)
async def get_dispatch_queue(
    db: AsyncSession = Depends(get_db),
    workspace: Workspace = Depends(deps.get_active_workspace),
    _ = Depends(deps.require_role(["owner", "member"]))
) -> Any:
    """Return the latest 50 outbound messages of the active workspace.

    Messages are filtered to ``direction == "outbound"`` for the workspace
    and ordered newest first by ``created_at``. Each entry carries the
    message id, a content preview (at most 50 characters plus ``"..."``),
    delivery status, attempt count, last error, platform, and the creation
    time in ISO 8601 form.

    Returns:
        The ``wrap_data`` envelope around the list of message dicts.
    """
    query = (
        select(Message)
        .where(Message.workspace_id == workspace.id, Message.direction == "outbound")
        .order_by(Message.created_at.desc())
        .limit(_QUEUE_PAGE_SIZE)
    )
    result = await db.execute(query)
    return wrap_data([_queue_row(m) for m in result.scalars().all()])