from __future__ import annotations import json from flask import Blueprint, jsonify, request, current_app from .routes_openai import chat_completions custom_bp = Blueprint("custom", __name__) @custom_bp.route("/api", methods=["POST"]) @custom_bp.route("/model/api", methods=["POST"]) @custom_bp.route("//api", methods=["POST"]) def custom_api(model_name: str | None = None): raw = request.get_data(as_text=True) or "" try: payload = json.loads(raw) except Exception: return jsonify({"status": "error", "message": "Invalid JSON"}), 400 prompt = payload.get("prompt") if not prompt: return jsonify({"status": "error", "message": "Missing prompt"}), 400 from .upstream import normalize_model_name, start_upstream_request from .responses_api import instructions_for_model from .reasoning import build_reasoning_param, allowed_efforts_for_model # Use model_name from URL if provided, otherwise default to config or None model = normalize_model_name(model_name, current_app.config.get("DEBUG_MODEL")) input_items = [ {"type": "message", "role": "user", "content": [{"type": "input_text", "text": prompt}]} ] reasoning_param = build_reasoning_param( current_app.config.get("REASONING_EFFORT", "medium"), current_app.config.get("REASONING_SUMMARY", "auto"), None, allowed_efforts=allowed_efforts_for_model(model) ) instructions = instructions_for_model(current_app.config, model) upstream, error_resp = start_upstream_request( model, input_items, instructions=instructions, reasoning_param=reasoning_param ) if error_resp: return jsonify({"status": "error", "message": "Upstream error"}), 502 if upstream.status_code >= 400: return jsonify({"status": "error", "message": f"Upstream returned {upstream.status_code}"}), upstream.status_code full_text = "" try: for raw_line in upstream.iter_lines(decode_unicode=False): if not raw_line: continue line = raw_line.decode("utf-8", errors="ignore") if not line.startswith("data: "): continue data = line[len("data: "):].strip() if not data or data == "[DONE]": break try: evt = json.loads(data) if evt.get("type") == "response.output_text.delta": full_text += evt.get("delta") or "" elif evt.get("type") == "response.completed": break except Exception: continue finally: upstream.close() # Clean the text: remove markdown, special unicode, and newlines clean_text = full_text.strip() # 1. Handle common special unicode characters replacements = { "\u2019": "'", "\u2018": "'", "\u201d": '"', "\u201c": '"', "\u2014": "-", "\u2013": "-", "\u2022": "*", "\u2026": "..." } for old, new in replacements.items(): clean_text = clean_text.replace(old, new) # 2. Strip basic markdown (bold, italics) import re clean_text = re.sub(r'\*\*(.*?)\*\*', r'\1', clean_text) # bold clean_text = re.sub(r'\*(.*?)\*', r'\1', clean_text) # italics clean_text = re.sub(r'__(.*?)__', r'\1', clean_text) # bold clean_text = re.sub(r'_(.*?)_', r'\1', clean_text) # italics # 3. Handle newlines: replace with spaces clean_text = clean_text.replace("\n\n", " ").replace("\n", " ") # 4. Remove emojis and other special non-ascii characters for clean plain text # This removes the \ud83d\ude04 type sequences clean_text = clean_text.encode('ascii', 'ignore').decode('ascii') # 5. Final trim clean_text = re.sub(r'\s+', ' ', clean_text).strip() return jsonify({ "status": "success", "text": clean_text })