from __future__ import annotations import ast import json import re from typing import Any from models import ChatRequest def clamp01(x: Any, default: float = 0.5) -> float: try: v = float(x) return max(0.0, min(1.0, v)) except Exception: return default def normalize_spaces(text: str) -> str: return re.sub(r"\s+", " ", text).strip() def clean_math_text(text: str) -> str: t = text t = t.replace("×", "*").replace("÷", "/").replace("–", "-").replace("—", "-") t = t.replace("−", "-").replace("^", "**") return t def extract_text_from_any_payload(payload: Any) -> str: if payload is None: return "" if isinstance(payload, str): s = payload.strip() if (s.startswith("{") and s.endswith("}")) or (s.startswith("[") and s.endswith("]")): try: decoded = json.loads(s) return extract_text_from_any_payload(decoded) except Exception: pass try: decoded = ast.literal_eval(s) if isinstance(decoded, (dict, list)): return extract_text_from_any_payload(decoded) except Exception: pass return s if isinstance(payload, dict): for key in [ "message", "prompt", "query", "text", "user_message", "input", "data", "payload", "body", "content" ]: if key in payload: extracted = extract_text_from_any_payload(payload[key]) if extracted: return extracted strings = [] for v in payload.values(): if isinstance(v, (str, dict, list)): maybe = extract_text_from_any_payload(v) if maybe: strings.append(maybe) return "\n".join(strings).strip() if isinstance(payload, list): parts = [extract_text_from_any_payload(x) for x in payload] return "\n".join([p for p in parts if p]).strip() return str(payload).strip() def get_user_text(req: ChatRequest, raw_body: Any = None) -> str: for field in ["message", "prompt", "query", "text", "user_message"]: value = getattr(req, field, None) if isinstance(value, str) and value.strip(): return value.strip() return extract_text_from_any_payload(raw_body).strip()