CHATSAM / services /content_filter.py
xbang
fix(content-filter): strip base64, truncate, defensive parse, fail-open
92a7396
Raw
History Blame Contribute Delete
7.42 kB
from __future__ import annotations
import re
from curl_cffi import requests
from fastapi import HTTPException
from services.config import config
from services.proxy_service import proxy_settings
from utils.log import logger
DEFAULT_REVIEW_PROMPT = "判断用户请求是否允许。只回答 ALLOW 或 REJECT。"
# Strip base64 image data URIs before review: a text-only review model can't
# analyze image bytes, and a single inlined image easily blows past the token
# budget of the upstream review service.
_BASE64_DATA_URI = re.compile(r"data:[\w/.+;-]+;base64,[A-Za-z0-9+/=]+")
# Cap aligned to the upstream review service's max context. If text still
# exceeds the cap after base64 stripping, keep equal head/tail halves so both
# the system prompt and the most recent user message survive.
_MAX_REVIEW_TEXT_LEN = 100_000
_TRUNCATION_MARKER = "\n…[truncated]…\n"
def _text(value: object) -> str:
if isinstance(value, str):
return value
if isinstance(value, list):
return "\n".join(_text(item) for item in value)
if isinstance(value, dict):
return "\n".join(_text(value.get(key)) for key in ("text", "input_text", "content", "input", "instructions", "system", "prompt"))
return ""
def request_text(*values: object) -> str:
return "\n".join(part for value in values if (part := _text(value).strip()))
def _sanitize_for_review(text: str) -> tuple[str, dict[str, int]]:
"""Strip base64 data URIs and truncate to the review-service context limit.
Returns (sanitized_text, stats) where stats carries base64_blocks_stripped
and truncated_chars so callers can emit structured logs.
"""
sanitized, base64_blocks_stripped = _BASE64_DATA_URI.subn("[image]", text)
truncated_chars = 0
if len(sanitized) > _MAX_REVIEW_TEXT_LEN:
# Reserve marker space so the result stays within the cap.
half = (_MAX_REVIEW_TEXT_LEN - len(_TRUNCATION_MARKER)) // 2
truncated_chars = len(sanitized) - 2 * half
sanitized = sanitized[:half] + _TRUNCATION_MARKER + sanitized[-half:]
stats = {
"base64_blocks_stripped": base64_blocks_stripped,
"truncated_chars": truncated_chars,
}
return sanitized, stats
def _extract_review_decision(data: object) -> str | None:
"""Defensively pull the decision text out of the review service response.
Returns None when the response shape doesn't match the OpenAI chat-completion
contract (e.g. {"error": ...} with no choices). The caller treats None as
"undecided" and applies the configured fail-open policy.
"""
if not isinstance(data, dict):
return None
choices = data.get("choices")
if not isinstance(choices, list) or not choices:
return None
first = choices[0]
if not isinstance(first, dict):
return None
message = first.get("message")
if not isinstance(message, dict):
return None
content = message.get("content")
if content is None:
return None
return str(content).strip().lower()
def _is_allow_decision(decision: str) -> bool:
return decision.startswith(("allow", "pass", "true", "yes", "通过", "允许", "安全"))
def _is_reject_decision(decision: str) -> bool:
return decision.startswith(("reject", "deny", "block", "false", "no", "拒绝", "不允许", "违规", "禁止"))
def _resolve_fail_open(review: dict) -> bool:
"""Resolve fail_open from review config. Defaults to True."""
value = review.get("fail_open")
if value is None:
return True
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "on"}
return bool(value)
def check_request(text: str) -> None:
text = str(text or "")
if not text.strip():
return
# Local sensitive-word match runs on the raw text (cheap, no network).
for word in config.sensitive_words:
if word in text:
raise HTTPException(status_code=400, detail={"error": "检测到敏感词,拒绝本次任务"})
review = config.ai_review
if not review.get("enabled"):
return
base_url = str(review.get("base_url") or "").strip().rstrip("/")
api_key = str(review.get("api_key") or "").strip()
model = str(review.get("model") or "").strip()
if not base_url or not api_key or not model:
raise HTTPException(status_code=400, detail={"error": "ai review config is incomplete"})
fail_open = _resolve_fail_open(review)
review_text, sanitize_stats = _sanitize_for_review(text)
if sanitize_stats["base64_blocks_stripped"] or sanitize_stats["truncated_chars"]:
logger.info({
"event": "ai_review_text_sanitized",
"original_text_len": len(text),
"review_text_len": len(review_text),
**sanitize_stats,
})
prompt = str(review.get("prompt") or DEFAULT_REVIEW_PROMPT).strip()
content = f"{prompt}\n\n用户请求:\n{review_text}\n\n只回答 ALLOW 或 REJECT。"
# fail_open=True (default): on upstream failure or ambiguous reply, let the
# request through. The review is a soft safety net; one missed review is
# preferable to a 5xx storm when the review service is flaky. Set
# config.ai_review.fail_open=false for strict-compliance deployments.
def _on_failure(event_payload: dict) -> None:
logger.warning(event_payload)
if not fail_open:
raise HTTPException(
status_code=503,
detail={"error": "AI 审核服务暂时不可用,请稍后重试"},
)
try:
response = requests.post(
f"{base_url}/v1/chat/completions",
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
json={"model": model, "messages": [{"role": "user", "content": content}], "temperature": 0},
timeout=60,
**proxy_settings.build_session_kwargs(),
)
except Exception as exc:
_on_failure({
"event": "ai_review_request_failed",
"error": str(exc),
"error_type": exc.__class__.__name__,
"review_text_len": len(review_text),
"original_text_len": len(text),
})
return
try:
data = response.json()
except Exception as exc:
_on_failure({
"event": "ai_review_response_not_json",
"status_code": response.status_code,
"body_preview": str(response.text or "")[:200],
"error": str(exc),
})
return
decision = _extract_review_decision(data)
if decision is None:
_on_failure({
"event": "ai_review_malformed_response",
"status_code": response.status_code,
"body_preview": str(data)[:300],
"review_text_len": len(review_text),
"original_text_len": len(text),
})
return
if _is_allow_decision(decision):
return
if _is_reject_decision(decision):
raise HTTPException(status_code=400, detail={"error": "AI 审核未通过,拒绝本次任务"})
# Ambiguous decisions (e.g. "MAYBE", empty content) fall back to fail-open policy.
_on_failure({
"event": "ai_review_ambiguous_decision",
"decision": decision[:100],
"review_text_len": len(review_text),
})
return