from fastapi import APIRouter, Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select
from typing import List, Any
from app.api import deps
from app.core.db import get_db
from app.models.models import Workspace, Message, User
from app.schemas.envelope import ResponseEnvelope, wrap_data
from app.services.dispatch_service import DispatchService
from app.services.audit_service import audit_event
from app.core.modules import require_module_enabled, MODULE_DISPATCH_ENGINE
from app.services.entitlements import require_entitlement
router = APIRouter()
@router.post(
    "/run",
    response_model=ResponseEnvelope[dict],
    dependencies=[
        Depends(require_module_enabled(MODULE_DISPATCH_ENGINE, "write")),
        Depends(require_entitlement("dispatch_engine", increment=True)),
    ],
)
async def trigger_dispatch(
    request: Request,
    db: AsyncSession = Depends(get_db),
    workspace: Workspace = Depends(deps.get_active_workspace),
    current_user: User = Depends(deps.get_current_user),
    _ = Depends(deps.require_role(["owner", "member"])),
) -> Any:
    """Run the dispatcher on demand for the caller's active workspace.

    The route is guarded by the dispatch-engine module dependency (declared
    with "write"), the "dispatch_engine" entitlement dependency (declared
    with ``increment=True``) and a role dependency listing "owner" and
    "member".

    Steps, in order:
      1. Call ``DispatchService.dispatch_pending_messages`` for the workspace
         and unpack its two-element result.
      2. Record a "dispatch_trigger" audit event carrying both values.
      3. Commit the session.

    Returns:
        ``wrap_data`` applied to a dict with ``processed_count`` and
        ``failed_count``.
    """
    dispatched, errored = await DispatchService.dispatch_pending_messages(
        db, workspace_id=workspace.id
    )

    # The audit entry is always written with outcome "success"; the actual
    # numbers travel in the metadata payload.
    audit_fields = {
        "action": "dispatch_trigger",
        "entity_type": "workspace",
        "entity_id": str(workspace.id),
        "actor_user_id": current_user.id,
        "outcome": "success",
        "workspace_id": workspace.id,
        "request": request,
        "metadata": {"processed": dispatched, "failed": errored},
    }
    await audit_event(db, **audit_fields)
    await db.commit()

    summary = {
        "processed_count": dispatched,
        "failed_count": errored,
    }
    return wrap_data(summary)
@router.get(
    "/queue",
    response_model=ResponseEnvelope[List[dict]],
    dependencies=[
        Depends(require_module_enabled(MODULE_DISPATCH_ENGINE, "read")),
        Depends(require_entitlement("dispatch_engine")),
    ],
)
async def get_dispatch_queue(
    db: AsyncSession = Depends(get_db),
    workspace: Workspace = Depends(deps.get_active_workspace),
    _ = Depends(deps.require_role(["owner", "member"])),
) -> Any:
    """Return the latest 50 outbound messages of the workspace with statuses.

    Messages are filtered to the active workspace and ``direction ==
    "outbound"``, ordered by ``created_at`` descending and capped at 50 rows.
    Each row is flattened into a plain dict for the UI; ``content`` is cut to
    a 50-character preview followed by "..." when it is longer than that.

    Returns:
        ``wrap_data`` applied to a list of dicts with the keys ``id``,
        ``content``, ``status``, ``attempts``, ``last_error``, ``platform``
        and ``created_at`` (ISO-8601 string).
    """
    # Two independent limits that merely share the same value.
    max_rows = 50
    preview_chars = 50

    result = await db.execute(
        select(Message)
        .where(Message.workspace_id == workspace.id, Message.direction == "outbound")
        .order_by(Message.created_at.desc())
        .limit(max_rows)
    )

    # Format for UI
    queue = []
    for message in result.scalars().all():
        content = message.content
        # NOTE(review): the Message model is not visible here; if `content`
        # is nullable, calling len() on it would raise and fail the whole
        # listing. Pass None through unchanged instead -- confirm against
        # the model definition.
        if content is not None and len(content) > preview_chars:
            content = content[:preview_chars] + "..."
        queue.append({
            "id": str(message.id),
            "content": content,
            "status": message.delivery_status,
            "attempts": message.attempt_count,
            "last_error": message.last_error,
            "platform": message.platform,
            "created_at": message.created_at.isoformat(),
        })
    return wrap_data(queue)
|