grok2api / app /dataplane /reverse /transport /_proxy_feedback.py
FUCAT's picture
Deploy grok2api to HF Spaces (Docker)
7e55e53
Raw
History Blame Contribute Delete
1.17 kB
"""Shared helper: map an UpstreamError to the correct ProxyFeedbackKind.
All transport modules (assets, media, livekit, imagine_ws …) use this so the
mapping stays consistent and clearance bundles are properly invalidated.
Rules
-----
401 → UNAUTHORIZED (invalidates clearance bundle)
403 → CHALLENGE (invalidates clearance bundle — treat all 403s as potential CF)
429 → RATE_LIMITED
≥500 → UPSTREAM_5XX
else → TRANSPORT_ERROR
"""
from app.platform.errors import UpstreamError
from app.control.proxy.models import ProxyFeedback, ProxyFeedbackKind
def upstream_feedback(exc: UpstreamError) -> ProxyFeedback:
    """Build the ``ProxyFeedback`` that corresponds to an ``UpstreamError``.

    The feedback kind is selected from ``exc.status``:

    * 401 -> ``UNAUTHORIZED``
    * 403 -> ``CHALLENGE``
    * 429 -> ``RATE_LIMITED``
    * 500 and above -> ``UPSTREAM_5XX``
    * anything else (including a missing status) -> ``TRANSPORT_ERROR``

    A missing or zero status is reported as ``status_code=None``.
    """
    # A falsy status (None or 0) is normalised to 0 so comparisons below work.
    code = exc.status or 0

    # Statuses that map one-to-one onto a feedback kind.
    exact_matches = {
        401: ProxyFeedbackKind.UNAUTHORIZED,
        403: ProxyFeedbackKind.CHALLENGE,
        429: ProxyFeedbackKind.RATE_LIMITED,
    }
    feedback_kind = exact_matches.get(code)

    if feedback_kind is None:
        # No exact match: split the remainder into server errors and the rest.
        if code >= 500:
            feedback_kind = ProxyFeedbackKind.UPSTREAM_5XX
        else:
            feedback_kind = ProxyFeedbackKind.TRANSPORT_ERROR

    return ProxyFeedback(kind=feedback_kind, status_code=code or None)
# Names exported by ``from <this module> import *``.
__all__ = ["upstream_feedback"]