| """Reverse pipeline executor — the 7-step request lifecycle. |
| |
| Pipeline: plan → account → proxy → serialize → execute → classify → feedback |
| |
| This executor is opt-in. Existing products-layer code that calls transport |
| directly continues to work; the executor wraps the same pattern with |
| structured feedback and classification. |
| """ |
|
|
| import asyncio |
| from typing import Any |
|
|
| from app.platform.logging.logger import logger |
| from app.platform.runtime.clock import now_ms |
| from app.platform.errors import UpstreamError |
| from app.control.model.spec import ModelSpec |
| from app.dataplane.account import AccountDirectory, get_account_directory |
| from app.dataplane.account.lease import AccountLease |
| from app.dataplane.proxy import get_proxy_runtime |
|
|
| from .types import ReversePlan, ReverseLeaseSet, ReverseResult, ResultCategory |
| from .planner import build_plan |
| from .classifier import classify_result |
| from .feedback import build_proxy_feedback |
|
|
|
|
| async def execute( |
| spec: ModelSpec, |
| request: dict[str, Any], |
| *, |
| payload_builder: Any | None = None, |
| ) -> ReverseResult: |
| """Execute the full reverse pipeline for one request. |
| |
| Parameters |
| ---------- |
| spec : ModelSpec |
| The resolved model specification. |
| request : dict |
| The raw API request body. |
| payload_builder : callable, optional |
| ``(plan, token, request) → bytes`` — serializes the request into the |
| upstream payload format. If None, the request dict is JSON-encoded. |
| |
| Returns |
| ------- |
| ReverseResult |
| Classified outcome of the upstream call. |
| """ |
| t0 = now_ms() |
|
|
| |
| plan = build_plan(spec, request) |
|
|
| |
| directory = await get_account_directory() |
|
|
| lease = await directory.reserve(plan.pool_candidates, plan.mode_id) |
| if lease is None: |
| return ReverseResult( |
| category=ResultCategory.RATE_LIMITED, |
| error="No available accounts", |
| latency_ms=int(now_ms() - t0), |
| ) |
|
|
| |
| proxy_runtime = await get_proxy_runtime() |
| proxy_lease = await proxy_runtime.acquire() |
|
|
| leases = ReverseLeaseSet( |
| account_idx=lease.idx, |
| account_token=lease.token, |
| proxy_lease=proxy_lease, |
| ) |
|
|
| |
| result = await _execute_transport(plan, leases, request, payload_builder) |
| result.latency_ms = int(now_ms() - t0) |
|
|
| |
|
|
| |
| asyncio.create_task( |
| _apply_feedback_and_release(plan, leases, result, directory, lease), |
| ) |
|
|
| return result |
|
|
|
|
| async def _execute_transport( |
| plan: ReversePlan, |
| leases: ReverseLeaseSet, |
| request: dict[str, Any], |
| payload_builder: Any | None, |
| ) -> ReverseResult: |
| """Execute the transport call and classify the result.""" |
| try: |
| import orjson |
|
|
| if payload_builder: |
| payload = payload_builder(plan, leases.account_token, request) |
| else: |
| payload = orjson.dumps(request) |
|
|
| from app.dataplane.reverse.transport.http import post_json |
|
|
| raw = await post_json( |
| plan.endpoint, |
| leases.account_token, |
| payload, |
| lease=leases.proxy_lease, |
| timeout_s=plan.timeout_s, |
| content_type=plan.content_type, |
| origin=plan.origin, |
| referer=plan.referer, |
| ) |
| category = classify_result(200) |
| return ReverseResult( |
| category=category, |
| status_code=200, |
| payload=raw, |
| ) |
| except UpstreamError as exc: |
| category = classify_result(exc.status, exc.details.get("body", "")) |
| return ReverseResult( |
| category=category, |
| status_code=exc.status, |
| body=exc.details.get("body", ""), |
| error=str(exc), |
| ) |
| except Exception as exc: |
| logger.error( |
| "reverse transport execution failed: error_type={} error={}", |
| type(exc).__name__, |
| exc, |
| ) |
| return ReverseResult( |
| category=ResultCategory.TRANSPORT_ERR, |
| error=str(exc), |
| ) |
|
|
|
|
| async def _apply_feedback_and_release( |
| plan: ReversePlan, |
| leases: ReverseLeaseSet, |
| result: ReverseResult, |
| directory: AccountDirectory, |
| account_lease: AccountLease, |
| ) -> None: |
| """Apply account and proxy feedback, then release the lease (best-effort).""" |
| try: |
| |
| await directory.release(account_lease) |
|
|
| |
| from app.control.account.enums import FeedbackKind |
|
|
| _CATEGORY_TO_FEEDBACK = { |
| ResultCategory.SUCCESS: FeedbackKind.SUCCESS, |
| ResultCategory.RATE_LIMITED: FeedbackKind.RATE_LIMITED, |
| ResultCategory.AUTH_FAILURE: FeedbackKind.UNAUTHORIZED, |
| ResultCategory.FORBIDDEN: FeedbackKind.FORBIDDEN, |
| ResultCategory.UPSTREAM_5XX: FeedbackKind.SERVER_ERROR, |
| ResultCategory.TRANSPORT_ERR: FeedbackKind.SERVER_ERROR, |
| ResultCategory.UNKNOWN: FeedbackKind.SERVER_ERROR, |
| } |
| fb_kind = _CATEGORY_TO_FEEDBACK.get(result.category) |
| if fb_kind is not None: |
| await directory.feedback( |
| leases.account_token, |
| fb_kind, |
| plan.mode_id, |
| ) |
|
|
| |
| if leases.proxy_lease: |
| proxy_fb = build_proxy_feedback(result) |
| proxy_runtime = await get_proxy_runtime() |
| await proxy_runtime.feedback(leases.proxy_lease, proxy_fb) |
| except Exception as exc: |
| logger.debug("reverse feedback update failed (non-fatal): error={}", exc) |
|
|
|
|
| __all__ = ["execute"] |
|
|