| from __future__ import annotations |
|
|
| import json |
| from flask import Blueprint, jsonify, request, current_app |
| from .routes_openai import chat_completions |
|
|
| custom_bp = Blueprint("custom", __name__) |
|
|
| @custom_bp.route("/api", methods=["POST"]) |
| @custom_bp.route("/model/api", methods=["POST"]) |
| @custom_bp.route("/<model_name>/api", methods=["POST"]) |
| def custom_api(model_name: str | None = None): |
| raw = request.get_data(as_text=True) or "" |
| try: |
| payload = json.loads(raw) |
| except Exception: |
| return jsonify({"status": "error", "message": "Invalid JSON"}), 400 |
| |
| prompt = payload.get("prompt") |
| if not prompt: |
| return jsonify({"status": "error", "message": "Missing prompt"}), 400 |
|
|
| from .upstream import normalize_model_name, start_upstream_request |
| from .responses_api import instructions_for_model |
| from .reasoning import build_reasoning_param, allowed_efforts_for_model |
| |
| |
| model = normalize_model_name(model_name, current_app.config.get("DEBUG_MODEL")) |
| |
| input_items = [ |
| {"type": "message", "role": "user", "content": [{"type": "input_text", "text": prompt}]} |
| ] |
| |
| reasoning_param = build_reasoning_param( |
| current_app.config.get("REASONING_EFFORT", "medium"), |
| current_app.config.get("REASONING_SUMMARY", "auto"), |
| None, |
| allowed_efforts=allowed_efforts_for_model(model) |
| ) |
| |
| instructions = instructions_for_model(current_app.config, model) |
| |
| upstream, error_resp = start_upstream_request( |
| model, |
| input_items, |
| instructions=instructions, |
| reasoning_param=reasoning_param |
| ) |
| |
| if error_resp: |
| return jsonify({"status": "error", "message": "Upstream error"}), 502 |
| |
| if upstream.status_code >= 400: |
| return jsonify({"status": "error", "message": f"Upstream returned {upstream.status_code}"}), upstream.status_code |
|
|
| full_text = "" |
| try: |
| for raw_line in upstream.iter_lines(decode_unicode=False): |
| if not raw_line: continue |
| line = raw_line.decode("utf-8", errors="ignore") |
| if not line.startswith("data: "): continue |
| data = line[len("data: "):].strip() |
| if not data or data == "[DONE]": break |
| try: |
| evt = json.loads(data) |
| if evt.get("type") == "response.output_text.delta": |
| full_text += evt.get("delta") or "" |
| elif evt.get("type") == "response.completed": |
| break |
| except Exception: |
| continue |
| finally: |
| upstream.close() |
|
|
| |
| clean_text = full_text.strip() |
| |
| |
| replacements = { |
| "\u2019": "'", |
| "\u2018": "'", |
| "\u201d": '"', |
| "\u201c": '"', |
| "\u2014": "-", |
| "\u2013": "-", |
| "\u2022": "*", |
| "\u2026": "..." |
| } |
| for old, new in replacements.items(): |
| clean_text = clean_text.replace(old, new) |
| |
| |
| import re |
| clean_text = re.sub(r'\*\*(.*?)\*\*', r'\1', clean_text) |
| clean_text = re.sub(r'\*(.*?)\*', r'\1', clean_text) |
| clean_text = re.sub(r'__(.*?)__', r'\1', clean_text) |
| clean_text = re.sub(r'_(.*?)_', r'\1', clean_text) |
| |
| |
| clean_text = clean_text.replace("\n\n", " ").replace("\n", " ") |
| |
| |
| |
| clean_text = clean_text.encode('ascii', 'ignore').decode('ascii') |
| |
| |
| clean_text = re.sub(r'\s+', ' ', clean_text).strip() |
|
|
| return jsonify({ |
| "status": "success", |
| "text": clean_text |
| }) |
|
|