"""Reverse pipeline executor — the 7-step request lifecycle. Pipeline: plan → account → proxy → serialize → execute → classify → feedback This executor is opt-in. Existing products-layer code that calls transport directly continues to work; the executor wraps the same pattern with structured feedback and classification. """ import asyncio from typing import Any from app.platform.logging.logger import logger from app.platform.runtime.clock import now_ms from app.platform.errors import UpstreamError from app.control.model.spec import ModelSpec from app.dataplane.account import AccountDirectory, get_account_directory from app.dataplane.account.lease import AccountLease from app.dataplane.proxy import get_proxy_runtime from .types import ReversePlan, ReverseLeaseSet, ReverseResult, ResultCategory from .planner import build_plan from .classifier import classify_result from .feedback import build_proxy_feedback async def execute( spec: ModelSpec, request: dict[str, Any], *, payload_builder: Any | None = None, ) -> ReverseResult: """Execute the full reverse pipeline for one request. Parameters ---------- spec : ModelSpec The resolved model specification. request : dict The raw API request body. payload_builder : callable, optional ``(plan, token, request) → bytes`` — serializes the request into the upstream payload format. If None, the request dict is JSON-encoded. Returns ------- ReverseResult Classified outcome of the upstream call. """ t0 = now_ms() # Step 1: Plan plan = build_plan(spec, request) # Step 2: Acquire account directory = await get_account_directory() lease = await directory.reserve(plan.pool_candidates, plan.mode_id) if lease is None: return ReverseResult( category=ResultCategory.RATE_LIMITED, error="No available accounts", latency_ms=int(now_ms() - t0), ) # Step 3: Acquire proxy proxy_runtime = await get_proxy_runtime() proxy_lease = await proxy_runtime.acquire() leases = ReverseLeaseSet( account_idx=lease.idx, account_token=lease.token, proxy_lease=proxy_lease, ) # Step 4-5: Serialize + Execute result = await _execute_transport(plan, leases, request, payload_builder) result.latency_ms = int(now_ms() - t0) # Step 6: Classify (done inside _execute_transport) # Step 7: Feedback + release (fire-and-forget) asyncio.create_task( _apply_feedback_and_release(plan, leases, result, directory, lease), ) return result async def _execute_transport( plan: ReversePlan, leases: ReverseLeaseSet, request: dict[str, Any], payload_builder: Any | None, ) -> ReverseResult: """Execute the transport call and classify the result.""" try: import orjson if payload_builder: payload = payload_builder(plan, leases.account_token, request) else: payload = orjson.dumps(request) from app.dataplane.reverse.transport.http import post_json raw = await post_json( plan.endpoint, leases.account_token, payload, lease=leases.proxy_lease, timeout_s=plan.timeout_s, content_type=plan.content_type, origin=plan.origin, referer=plan.referer, ) category = classify_result(200) return ReverseResult( category=category, status_code=200, payload=raw, ) except UpstreamError as exc: category = classify_result(exc.status, exc.details.get("body", "")) return ReverseResult( category=category, status_code=exc.status, body=exc.details.get("body", ""), error=str(exc), ) except Exception as exc: logger.error( "reverse transport execution failed: error_type={} error={}", type(exc).__name__, exc, ) return ReverseResult( category=ResultCategory.TRANSPORT_ERR, error=str(exc), ) async def _apply_feedback_and_release( plan: ReversePlan, leases: ReverseLeaseSet, result: ReverseResult, directory: AccountDirectory, account_lease: AccountLease, ) -> None: """Apply account and proxy feedback, then release the lease (best-effort).""" try: # Release inflight counter. await directory.release(account_lease) # Account feedback via the directory's feedback API. from app.control.account.enums import FeedbackKind _CATEGORY_TO_FEEDBACK = { ResultCategory.SUCCESS: FeedbackKind.SUCCESS, ResultCategory.RATE_LIMITED: FeedbackKind.RATE_LIMITED, ResultCategory.AUTH_FAILURE: FeedbackKind.UNAUTHORIZED, ResultCategory.FORBIDDEN: FeedbackKind.FORBIDDEN, ResultCategory.UPSTREAM_5XX: FeedbackKind.SERVER_ERROR, ResultCategory.TRANSPORT_ERR: FeedbackKind.SERVER_ERROR, ResultCategory.UNKNOWN: FeedbackKind.SERVER_ERROR, } fb_kind = _CATEGORY_TO_FEEDBACK.get(result.category) if fb_kind is not None: await directory.feedback( leases.account_token, fb_kind, plan.mode_id, ) # Proxy feedback. if leases.proxy_lease: proxy_fb = build_proxy_feedback(result) proxy_runtime = await get_proxy_runtime() await proxy_runtime.feedback(leases.proxy_lease, proxy_fb) except Exception as exc: logger.debug("reverse feedback update failed (non-fatal): error={}", exc) __all__ = ["execute"]