""" rita2api โ€” OpenAI-compatible reverse proxy for rita.ai Bridges the OpenAI Chat Completions API to rita.ai's /aichat/completions endpoint. Supports streaming SSE, multi-account rotation with WebUI management, and tool calling. """ import json import os import re import time import hashlib import datetime import requests from dotenv import load_dotenv from flask import Flask, request, Response, jsonify, render_template, session from threading import Lock # MUST load .env before importing AccountManager (which reads env vars at module level) load_dotenv() from accounts import AccountManager import auto_register from quota import get_cost, get_all_costs from database import get_db # ===================== Configuration ===================== DEBUG_MODE = os.getenv("DEBUG", "1") == "1" UPSTREAM_URL = os.getenv("RITA_UPSTREAM", "https://api_v2.rita.ai") RITA_ORIGIN = os.getenv("RITA_ORIGIN", "https://www.rita.ai") HOST = os.getenv("HOST", "0.0.0.0") PORT = int(os.getenv("PORT", "10089")) # Disable SSL verification for api_v2.rita.ai (hostname mismatch in upstream cert) DISABLE_SSL_VERIFY = os.getenv("DISABLE_SSL_VERIFY", "0") == "1" AUTH_TOKEN = os.getenv("AUTH_TOKEN", "") app = Flask(__name__) app.secret_key = os.getenv("FLASK_SECRET", os.urandom(24).hex()) acm = AccountManager() _conv_lock = Lock() # ===================== Auth Middleware ===================== def _check_auth() -> bool: """Return True if the request is authenticated (or auth is disabled).""" if not AUTH_TOKEN: return True # Check Authorization header auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer ") and auth_header[7:] == AUTH_TOKEN: return True # Check query param if request.args.get("auth") == AUTH_TOKEN: return True # Check session cookie if session.get("auth_token") == AUTH_TOKEN: return True return False @app.before_request def require_auth(): path = request.path # Always allow without auth if path == "/" or path == "/health": return None # Allow login and auth-check endpoints if path in ("/api/login", "/api/auth/check"): return None # /v1/* proxy endpoints: check Bearer token (OpenAI client style) if path.startswith("/v1/"): if AUTH_TOKEN and not _check_auth(): return jsonify({"error": {"message": "Incorrect API key provided", "type": "invalid_request_error", "code": "invalid_api_key"}}), 401 return None # /api/* and /debug/*: require auth if path.startswith("/api/") or path.startswith("/debug/"): if not _check_auth(): return jsonify({"error": "Unauthorized", "auth_required": True}), 401 return None # ===================== Usage Statistics ===================== _stats_lock = Lock() _usage_stats = { "total_requests": 0, "total_tokens_approx": 0, "requests_by_model": {}, "requests_today": 0, "requests_today_date": datetime.date.today().isoformat(), "last_reset": time.time(), "uptime_start": time.time(), } def _increment_stats(model: str, messages: list, response_text: str = ""): with _stats_lock: # Reset today counter if day changed today = datetime.date.today().isoformat() if _usage_stats["requests_today_date"] != today: _usage_stats["requests_today"] = 0 _usage_stats["requests_today_date"] = today _usage_stats["total_requests"] += 1 _usage_stats["requests_today"] += 1 _usage_stats["requests_by_model"][model] = \ _usage_stats["requests_by_model"].get(model, 0) + 1 # Rough token estimate: chars / 4 msg_chars = sum(len(str(m.get("content", ""))) for m in messages) resp_chars = len(response_text) _usage_stats["total_tokens_approx"] += (msg_chars + resp_chars) // 4 # In-memory conversation state _conversation_state: dict[str, dict] = {} # Cache _models_cache: dict = {} _models_cache_ts: float = 0 MODELS_CACHE_TTL = 3600 _ai_tools_cache: dict = {} _ai_tools_cache_ts: float = 0 AI_TOOLS_CACHE_TTL = 3600 # ===================== Logging ===================== def log(msg, level="INFO"): if not DEBUG_MODE and level == "DEBUG": return color = {"INFO": "\033[94m", "DEBUG": "\033[92m", "WARNING": "\033[93m", "ERROR": "\033[91m", "SUCCESS": "\033[92m"}.get(level, "") print(f"{color}[{time.strftime('%H:%M:%S')}] {msg}\033[0m") # ===================== Message Format Translation ===================== def extract_text(content) -> str: if isinstance(content, str): return content if isinstance(content, list): parts = [] for block in content: if isinstance(block, dict): if block.get("type") == "text": parts.append(block.get("text", "")) elif block.get("type") == "image_url": parts.append("[image]") return "".join(parts) return str(content) if content else "" def build_rita_messages(messages: list) -> list: rita_msgs = [] system_parts = [] for msg in messages: role = msg.get("role", "") content = msg.get("content", "") if role == "system": system_parts.append(extract_text(content)) elif role in ("user", "assistant"): text = extract_text(content) if text: rita_msgs.append({"type": "text", "text": text}) if system_parts and rita_msgs: rita_msgs[0]["text"] = "\n" + "\n".join(system_parts) + "\n\n\n" + rita_msgs[0]["text"] return rita_msgs # ===================== Model Resolution ===================== def resolve_rita_model(model: str) -> str: """Resolve client model name to Rita's model_xxx format. If already in model_xxx format, pass through directly. Otherwise, try to match known aliases. """ # Already in Rita format if model.startswith("model_"): return model model_lower = model.lower() # Static alias mappings (will be supplemented by dynamic catalog) mappings = { "rita": "model_25", "rita-pro": "model_37", } if model_lower in mappings: return mappings[model_lower] for key, value in mappings.items(): if model_lower.startswith(key): return value # Dynamic mapping: try to find from cached model catalog if _models_cache: for m in _models_cache.get("data", []): mid = m.get("id", "") mname = m.get("name", "").lower() # Match by name (e.g., "GPT-4o" -> "model_xx") if model_lower == mname or model_lower in mname or mname.startswith(model_lower): return mid # Fallback: return as-is (let upstream handle it) return model # ===================== Tool Calling ===================== _TOOL_PROMPT_TMPL = ( "You have access to the following tools. When you need to use a tool, " "respond ONLY with a JSON object (no other text):\n" "{tool_defs}\n" "Format:\n" ' Single tool: {{"tool":"tool_name","args":{{"param":"value"}}}}\n' ' Multiple tools: {{"calls":[{{"tool":"name","args":{{}}}}]}}\n' "If no tool is needed, respond normally.\n\n" "{query}" ) _tool_prompt_cache: dict[str, str] = {} def _tools_hash(tools: list) -> str: key = json.dumps([ t.get("name") or t.get("function", {}).get("name", "") for t in tools ], ensure_ascii=False) return hashlib.md5(key.encode()).hexdigest()[:12] def _compact_tools(tools: list) -> str: parts = [] for t in tools: if t.get("type") == "function": fn = t["function"] name = fn.get("name", "") props = fn.get("parameters", {}).get("properties", {}) req = set(fn.get("parameters", {}).get("required", [])) elif "input_schema" in t: name = t.get("name", "") props = t.get("input_schema", {}).get("properties", {}) req = set(t.get("input_schema", {}).get("required", [])) else: continue params = [f"{p}{'*' if p in req else '?'}" for p in props] parts.append(f"{name}({','.join(params)})" if params else name) return " | ".join(parts) def inject_tool_prompt(user_text: str, tools: list) -> str: th = _tools_hash(tools) if th not in _tool_prompt_cache: _tool_prompt_cache[th] = _compact_tools(tools) return _TOOL_PROMPT_TMPL.format(tool_defs=_tool_prompt_cache[th], query=user_text) _JSON_RE = re.compile(r'\{.*\}', re.DOTALL) def parse_tool_response(raw: str) -> dict: m = _JSON_RE.search(raw) if not m: return {"type": "text", "text": raw} try: obj = json.loads(m.group()) except json.JSONDecodeError: return {"type": "text", "text": raw} if "tool" in obj and "args" in obj: return {"type": "tool_calls", "calls": [{"name": obj["tool"], "input": obj["args"]}]} calls_raw = obj.get("calls", []) if calls_raw: calls = [] for c in calls_raw: name = c.get("tool") or c.get("name", "") inp = c.get("args") or c.get("parameters") or c.get("input") or {} calls.append({"name": name, "input": inp}) return {"type": "tool_calls", "calls": calls} return {"type": "text", "text": raw} # ===================== Conversation State ===================== def get_conv_key(messages: list) -> str: key_parts = [] for msg in messages[:-1]: role = msg.get("role", "") content = extract_text(msg.get("content", ""))[:100] key_parts.append(f"{role}:{content}") return hashlib.md5("|".join(key_parts).encode()).hexdigest()[:16] def get_or_create_conversation(messages: list) -> tuple[int, str]: key = get_conv_key(messages) with _conv_lock: state = _conversation_state.get(key, {}) return state.get("chat_id", 0), state.get("parent", "0") def update_conversation_state(messages: list, assistant_msg_id: str, chat_id: int): key = get_conv_key(messages) with _conv_lock: _conversation_state[key] = { "chat_id": chat_id, "parent": assistant_msg_id, "last_updated": time.time(), } def _detect_chinese(messages: list) -> bool: for msg in messages: content = msg.get("content", "") if isinstance(content, list): for block in content: if block.get("type") == "text": if re.search(r"[\u4e00-\u9fff]", block.get("text", "")): return True elif isinstance(content, str): if re.search(r"[\u4e00-\u9fff]", content): return True return False # ========================================================================= # WebUI # ========================================================================= @app.route("/", methods=["GET"]) def webui(): return render_template("index.html") # ========================================================================= # Auth API # ========================================================================= @app.route("/api/login", methods=["POST"]) def api_login(): data = request.json or {} token = data.get("token", "").strip() if not AUTH_TOKEN: # No auth configured โ€” always succeed return jsonify({"ok": True, "auth_required": False}) if token == AUTH_TOKEN: session["auth_token"] = AUTH_TOKEN return jsonify({"ok": True, "auth_required": True}) return jsonify({"ok": False, "error": "Invalid token"}), 401 @app.route("/api/auth/check", methods=["GET"]) def api_auth_check(): return jsonify({ "auth_required": bool(AUTH_TOKEN), "authenticated": _check_auth(), }) # ========================================================================= # Stats API # ========================================================================= @app.route("/api/stats", methods=["GET"]) def api_stats(): db = get_db() db_stats = db.get_usage_stats() with _stats_lock: uptime_secs = int(time.time() - _usage_stats["uptime_start"]) summary = acm.summary() return jsonify({ **db_stats, "uptime_seconds": uptime_secs, "uptime_start": _usage_stats["uptime_start"], "total_quota": summary.get("total_quota", 0), "active_accounts": summary.get("active", 0), "model_costs": get_all_costs(), }) # ========================================================================= # Account Management API (/api/accounts/*) # ========================================================================= @app.route("/api/accounts", methods=["GET"]) def api_list_accounts(): return jsonify({"accounts": acm.list_all()}) @app.route("/api/accounts/summary", methods=["GET"]) def api_account_summary(): return jsonify(acm.summary()) @app.route("/api/accounts", methods=["POST"]) def api_add_account(): """Add a single account: {token, visitorid?, name?, email?, password?, mail_provider?, mail_api_key?}""" data = request.json or {} token = data.get("token", "").strip() if not token: return jsonify({"error": "token is required"}), 400 acc = acm.add( token=token, visitorid=data.get("visitorid", "").strip(), name=data.get("name", "").strip(), email=data.get("email", "").strip(), password=data.get("password", "").strip(), mail_provider=data.get("mail_provider", "").strip(), mail_api_key=data.get("mail_api_key", "").strip(), ) log(f"โž• Account added: {acc.name} ({acc.id})", "SUCCESS") return jsonify({"ok": True, "account": acc.to_status()}), 201 @app.route("/api/accounts/batch", methods=["POST"]) def api_batch_add(): """ Batch add accounts. Body: { "accounts": [ {token, visitorid?, name?}, ... ] } """ data = request.json or {} items = data.get("accounts", []) if not items: return jsonify({"error": "accounts array is required"}), 400 added = acm.add_batch(items) log(f"๐Ÿ“ฆ Batch added {len(added)} accounts", "SUCCESS") return jsonify({ "ok": True, "added": len(added), "accounts": [a.to_status() for a in added], }), 201 @app.route("/api/accounts/", methods=["PUT"]) def api_update_account(account_id): data = request.json or {} acc = acm.update(account_id, **data) if not acc: return jsonify({"error": "not found"}), 404 log(f"โœ๏ธ Account updated: {acc.name} ({acc.id})", "INFO") return jsonify({"ok": True, "account": acc.to_status()}) @app.route("/api/accounts/", methods=["DELETE"]) def api_delete_account(account_id): if acm.delete(account_id): log(f"๐Ÿ—‘ Account deleted: {account_id}", "WARNING") return jsonify({"ok": True}) return jsonify({"error": "not found"}), 404 @app.route("/api/accounts//toggle", methods=["POST"]) def api_toggle_account(account_id): acc = acm.toggle(account_id) if not acc: return jsonify({"error": "not found"}), 404 state = "enabled" if acc.enabled else "disabled" log(f"โฏ Account {acc.name} โ†’ {state}", "INFO") return jsonify({"ok": True, "enabled": acc.enabled}) @app.route("/api/accounts//test", methods=["POST"]) def api_test_account(account_id): result = acm.test_account(account_id, UPSTREAM_URL, RITA_ORIGIN) return jsonify(result) @app.route("/api/accounts/test-all", methods=["POST"]) def api_test_all(): accounts = acm.list_all() results = {} ok_count = 0 for a in accounts: r = acm.test_account(a["id"], UPSTREAM_URL, RITA_ORIGIN) results[a["id"]] = r if r.get("ok"): ok_count += 1 return jsonify({"results": results, "total": len(accounts), "ok_count": ok_count}) @app.route("/api/accounts//refresh", methods=["POST"]) def api_refresh_account(account_id): """Re-login to get a fresh token using stored email credentials.""" acc = acm.get(account_id) if not acc: return jsonify({"error": "not found"}), 404 if not acc.email: return jsonify({"error": "no email stored for this account, cannot refresh"}), 400 log(f"๐Ÿ”„ Refreshing token for {acc.name} (email={acc.email})...", "INFO") try: result = auto_register.refresh_account_token( email=acc.email, password=acc.password, mail_provider=acc.mail_provider, mail_api_key=acc.mail_api_key, ) new_token = result["token"] # Update account with new token and re-enable acm.reactivate_account(account_id, new_token=new_token) log(f"โœ… Token refreshed for {acc.name}", "SUCCESS") return jsonify({"ok": True, "account": acc.to_status()}) except Exception as e: log(f"โŒ Token refresh failed for {acc.name}: {e}", "ERROR") return jsonify({"error": str(e)}), 500 @app.route("/api/accounts/reset", methods=["POST"]) def api_reset_failures(): count = acm.reset_failures() log(f"๐Ÿ”„ Reset failures for {count} accounts", "INFO") return jsonify({"ok": True, "reset": count}) @app.route("/api/accounts/batch-action", methods=["POST"]) def api_batch_action(): """Batch operations on selected accounts. Body: { "ids": ["id1","id2",...], "action": "enable|disable|delete|test|refresh" } Or: { "all": true, "action": "enable|disable|delete|test|refresh" } """ data = request.json or {} action = data.get("action", "").strip() use_all = data.get("all", False) if use_all: ids = [a["id"] for a in acm.list_all()] else: ids = data.get("ids", []) if not ids: return jsonify({"error": "no accounts specified"}), 400 if action not in ("enable", "disable", "delete", "test", "refresh"): return jsonify({"error": f"unknown action: {action}"}), 400 results = {"action": action, "total": len(ids), "success": 0, "failed": 0, "details": {}} for aid in ids: try: if action == "enable": acm.reactivate_account(aid) results["success"] += 1 elif action == "disable": acc = acm.get(aid) if acc and acc.enabled: acm.toggle(aid) results["success"] += 1 elif action == "delete": if acm.delete(aid): results["success"] += 1 else: results["failed"] += 1 elif action == "test": r = acm.test_account(aid, UPSTREAM_URL, RITA_ORIGIN) results["details"][aid] = r if r.get("ok"): results["success"] += 1 else: results["failed"] += 1 elif action == "refresh": acc = acm.get(aid) if not acc or not acc.email: results["details"][aid] = {"ok": False, "error": "no email"} results["failed"] += 1 continue try: result = auto_register.refresh_account_token( email=acc.email, password=acc.password, mail_provider=acc.mail_provider, mail_api_key=acc.mail_api_key, ) acm.reactivate_account(aid, new_token=result["token"]) results["details"][aid] = {"ok": True} results["success"] += 1 except Exception as e: results["details"][aid] = {"ok": False, "error": str(e)} results["failed"] += 1 except Exception as e: results["details"][aid] = {"ok": False, "error": str(e)} results["failed"] += 1 log(f"๐Ÿ“ฆ Batch {action}: {results['success']}/{results['total']} success", "INFO") return jsonify({"ok": True, **results}) @app.route("/api/accounts/clear", methods=["DELETE"]) def api_clear_all(): count = acm.delete_all() log(f"๐Ÿงน Cleared all {count} accounts", "WARNING") return jsonify({"ok": True, "deleted": count}) @app.route("/api/accounts//reactivate", methods=["POST"]) def api_reactivate_account(account_id): """Re-enable a disabled account, optionally with a new token.""" data = request.json or {} new_token = data.get("token", "").strip() acc = acm.reactivate_account(account_id, new_token) if not acc: return jsonify({"error": "not found"}), 404 log(f"โ™ป๏ธ Account reactivated: {acc.name} ({acc.id})" + (" with new token" if new_token else ""), "SUCCESS") return jsonify({"ok": True, "account": acc.to_status()}) @app.route("/api/accounts/purge-invalid", methods=["POST"]) def api_purge_invalid(): """Remove all accounts with expired/invalid tokens.""" count = acm.purge_invalid() log(f"๐Ÿ—‘ Purged {count} invalid accounts", "WARNING") return jsonify({"ok": True, "purged": count}) @app.route("/api/health-check", methods=["GET"]) def api_health_check_status(): """Return the latest background health check results.""" return jsonify(acm.get_health_status()) @app.route("/api/health-check/run", methods=["POST"]) def api_run_health_check(): """Trigger an immediate health check on all accounts.""" accounts = acm.list_all() results = {} ok_count = 0 disabled_count = 0 for a in accounts: r = acm.test_account(a["id"], UPSTREAM_URL, RITA_ORIGIN) results[a["id"]] = r if r.get("ok"): ok_count += 1 elif r.get("code") == 401: # Auto-disable expired token acc = acm.get(a["id"]) if acc: acc.token_valid = False acc.enabled = False acc.disabled_reason = "token_expired_401" acc.last_error = "Token expired (manual check)" disabled_count += 1 log(f"๐Ÿ”ด Auto-disabled {acc.name}: token expired (401)", "WARNING") if disabled_count > 0: acm._save() return jsonify({ "results": results, "total": len(accounts), "ok_count": ok_count, "auto_disabled": disabled_count, }) # ========================================================================= # Mail Verification Code Query API # ========================================================================= @app.route("/api/mail/check-code", methods=["POST"]) def api_check_mail_code(): """Query the latest verification code for an email address. Body: { "email": "...", "mail_provider": "gptmail", "mail_api_key": "..." } Or: { "account_id": "abc123" } โ€” looks up email/provider from account """ data = request.json or {} account_id = data.get("account_id", "").strip() email = data.get("email", "").strip() mail_provider = data.get("mail_provider", "").strip() mail_api_key = data.get("mail_api_key", "").strip() # If account_id is given, look up the account's email and provider if account_id and not email: acc = acm.get(account_id) if not acc: return jsonify({"ok": False, "error": "account not found"}), 404 if not acc.email: return jsonify({"ok": False, "error": "account has no email set"}), 400 email = acc.email mail_provider = mail_provider or acc.mail_provider or "gptmail" mail_api_key = mail_api_key or acc.mail_api_key or "" if not email: return jsonify({"error": "email or account_id is required"}), 400 mail_provider = mail_provider or "gptmail" try: code = auto_register.wait_for_code_by_provider( email=email, mail_provider=mail_provider, mail_api_key=mail_api_key, timeout=15, # Quick check, not full wait ) if code: return jsonify({"ok": True, "code": code, "email": email}) else: return jsonify({"ok": False, "message": "No verification code found", "email": email}) except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/accounts/emails", methods=["GET"]) def api_list_account_emails(): """Return a list of accounts that have emails configured (for mail code lookup).""" accounts = acm.list_all() result = [ {"id": a["id"], "name": a["name"], "email": a["email"], "mail_provider": a.get("mail_provider", "")} for a in accounts if a.get("email") ] return jsonify({"accounts": result}) @app.route("/api/mail/status", methods=["GET"]) def api_mail_status(): """Return mail service configuration status.""" db = get_db() return jsonify({ "gptmail": { "configured": bool(db.get_config("GPTMAIL_API_KEY")), "api_base": db.get_config("GPTMAIL_API_BASE", "https://mail.chatgpt.org.uk"), }, "yydsmail": { "configured": bool(db.get_config("YYDSMAIL_API_KEY")), "api_base": db.get_config("YYDSMAIL_API_BASE", "https://maliapi.215.im/v1"), }, }) # ========================================================================= # Ticket API (re-login to get ticket) # ========================================================================= @app.route("/api/accounts//ticket", methods=["POST"]) def api_get_ticket(account_id): """Get a fresh ticket for an account by re-authenticating with its token.""" acc = acm.get(account_id) if not acc: return jsonify({"error": "not found"}), 404 if not acc.token: return jsonify({"error": "no token set"}), 400 try: result = auto_register.relogin_for_ticket(acc.token) if result.get("ticket"): return jsonify({"ok": True, "ticket": result["ticket"]}) else: return jsonify({"ok": False, "message": "Could not get ticket, token may be expired"}) except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 # ========================================================================= # Config Management API # ========================================================================= @app.route("/api/config", methods=["GET"]) def api_get_config(): """Return all config key-value pairs.""" db = get_db() configs = db.get_all_config() # Mask sensitive values for display for c in configs: key = c["key"].upper() if any(s in key for s in ("TOKEN", "KEY", "PASSWORD", "SECRET")): val = c["value"] if val: c["value_masked"] = val[:4] + "***" + val[-4:] if len(val) > 8 else "***" else: c["value_masked"] = "" else: c["value_masked"] = c["value"] return jsonify({"configs": configs}) @app.route("/api/config", methods=["PUT"]) def api_set_config(): """Update config values. Body: { "configs": { "key": "value", ... } }""" data = request.json or {} configs = data.get("configs", {}) if not configs: return jsonify({"error": "configs dict is required"}), 400 db = get_db() for key, value in configs.items(): db.set_config(key, value) log(f"Config updated: {list(configs.keys())}", "INFO") return jsonify({"ok": True, "updated": list(configs.keys())}) # ========================================================================= # Auto Registration API # ========================================================================= @app.route("/api/auto-register/config", methods=["GET"]) def api_auto_register_config(): """Check auto-register configuration status.""" return jsonify(auto_register.check_config()) @app.route("/api/auto-register", methods=["POST"]) def api_auto_register(): """Manually trigger registration of new account(s). Body: { "count": 1 } """ data = request.json or {} count = min(data.get("count", 1), 5) # Max 5 at a time config = auto_register.check_config() if not config["ready"]: missing = [] if not config["yescaptcha_configured"]: missing.append("YESCAPTCHA_KEY") if not config["gptmail_configured"]: missing.append("GPTMAIL_API_KEY") return jsonify({ "error": f"Auto-register not configured. Missing: {', '.join(missing)}", "config": config, }), 400 log(f"๐Ÿ”„ Manual auto-register triggered: {count} account(s)", "INFO") results = auto_register.auto_register_batch( count=count, account_manager=acm, upstream_url=UPSTREAM_URL, origin=RITA_ORIGIN, ) return jsonify({ "ok": True, "registered": len(results), "requested": count, "accounts": results, }) # ========================================================================= # Health # ========================================================================= @app.route("/health", methods=["GET"]) def health(): s = acm.summary() return jsonify({ "status": "ok", **s, "upstream": UPSTREAM_URL, }) # ========================================================================= # Model Catalog # ========================================================================= @app.route("/v1/models", methods=["GET"]) def list_models(): global _models_cache, _models_cache_ts now = time.time() if _models_cache and now - _models_cache_ts < MODELS_CACHE_TTL: return jsonify(_models_cache) acc, _ = acm.next() if not acc: return jsonify({"error": "no accounts configured"}), 500 try: r = requests.post( f"{UPSTREAM_URL}/aichat/categoryModels", headers=acm.upstream_headers(acc, RITA_ORIGIN), json={"language": "zh"}, timeout=15, verify=not DISABLE_SSL_VERIFY, ) r.raise_for_status() upstream = r.json() acm.mark_ok(acc) data = [] for cat in upstream.get("data", {}).get("category_models", []): cat_name = cat.get("name", "") for m in cat.get("models", []): mid = m.get("key", "") data.append({ "id": mid, "object": "model", "created": 0, "owned_by": "rita", "name": m.get("name", mid), "description": m.get("desc", ""), "quota": m.get("quota", 0), "tool": m.get("tool", ""), "ability": m.get("ability", ""), "category": cat_name, }) result = {"object": "list", "data": data} _models_cache, _models_cache_ts = result, now return jsonify(result) except Exception as e: log(f"โš ๏ธ Failed to fetch model catalog: {e}", "WARNING") acm.mark_fail(acc, str(e)) return jsonify({"error": str(e)}), 502 # ========================================================================= # AI Tools # ========================================================================= @app.route("/v1/tools", methods=["GET"]) def list_ai_tools(): global _ai_tools_cache, _ai_tools_cache_ts now = time.time() if _ai_tools_cache and now - _ai_tools_cache_ts < AI_TOOLS_CACHE_TTL: return jsonify(_ai_tools_cache) acc, _ = acm.next() if not acc: return jsonify({"error": "no accounts configured"}), 500 try: r = requests.post( f"{UPSTREAM_URL}/gamsai_api/v1/page_service/aiTools", headers=acm.upstream_headers(acc, RITA_ORIGIN), json={"language": "zh"}, timeout=15, verify=not DISABLE_SSL_VERIFY, ) r.raise_for_status() upstream = r.json() acm.mark_ok(acc) tools = upstream.get("data", []) data = [{ "id": t.get("id"), "name": t.get("name"), "description": t.get("description"), "tool_type": t.get("tool_type"), "price": t.get("price", 0), "rules": t.get("rules", {}), } for t in tools] result = {"object": "list", "data": data} _ai_tools_cache, _ai_tools_cache_ts = result, now return jsonify(result) except Exception as e: log(f"โš ๏ธ Failed to fetch AI tools: {e}", "WARNING") acm.mark_fail(acc, str(e)) return jsonify({"error": str(e)}), 502 @app.route("/v1/tools/execute", methods=["POST"]) def execute_ai_tool(): data = request.json or {} tool_id = data.get("tool_id") if not tool_id: return jsonify({"error": "tool_id is required"}), 400 acc, _ = acm.next() if not acc: return jsonify({"error": "no accounts configured"}), 500 try: payload = {"action": data.get("action", "edit_prompt"), "prompt": data.get("prompt", "")} if data.get("image_url"): payload["image_url"] = data["image_url"] r = requests.post( f"{UPSTREAM_URL}/gamsai_api/v1/page_service/aiTools/{tool_id}/execute", headers=acm.upstream_headers(acc, RITA_ORIGIN), json=payload, timeout=60, verify=not DISABLE_SSL_VERIFY, ) r.raise_for_status() acm.mark_ok(acc) return jsonify(r.json()) except Exception as e: log(f"โŒ AI tool execution error: {e}", "ERROR") acm.mark_fail(acc, str(e)) return jsonify({"error": str(e)}), 502 # ========================================================================= # Conversation Management # ========================================================================= @app.route("/v1/conversations", methods=["POST"]) def list_conversations(): data = request.json or {} acc, _ = acm.next() if not acc: return jsonify({"error": "no accounts configured"}), 500 try: r = requests.post( f"{UPSTREAM_URL}/aichat/conversations", headers=acm.upstream_headers(acc, RITA_ORIGIN), json={"page": data.get("page", 1), "limit": data.get("limit", 20)}, timeout=15, verify=not DISABLE_SSL_VERIFY, ) r.raise_for_status() acm.mark_ok(acc) return jsonify(r.json()) except Exception as e: acm.mark_fail(acc, str(e)) return jsonify({"error": str(e)}), 502 @app.route("/v1/chat/init", methods=["POST"]) def new_conversation(): data = request.json or {} acc, _ = acm.next() if not acc: return jsonify({"error": "no accounts configured"}), 500 try: model = data.get("model", "model_25") r = requests.post( f"{UPSTREAM_URL}/chatgpt/newConversation", headers=acm.upstream_headers(acc, RITA_ORIGIN), json={"model": resolve_rita_model(model)}, timeout=15, verify=not DISABLE_SSL_VERIFY, ) r.raise_for_status() acm.mark_ok(acc) return jsonify(r.json()) except Exception as e: acm.mark_fail(acc, str(e)) return jsonify({"error": str(e)}), 502 @app.route("/v1/chat/title", methods=["POST"]) def get_title(): data = request.json or {} acc, _ = acm.next() if not acc: return jsonify({"error": "no accounts configured"}), 500 try: r = requests.post( f"{UPSTREAM_URL}/aichat/getTitle", headers=acm.upstream_headers(acc, RITA_ORIGIN), json={"chat_id": data.get("chat_id"), "messages": build_rita_messages(data.get("messages", []))}, timeout=15, verify=not DISABLE_SSL_VERIFY, ) r.raise_for_status() acm.mark_ok(acc) return jsonify(r.json()) except Exception as e: acm.mark_fail(acc, str(e)) return jsonify({"error": str(e)}), 502 # ========================================================================= # Core Chat Completions # ========================================================================= @app.route("/v1/chat/completions", methods=["POST"]) def chat_completions(): data = request.json or {} messages = data.get("messages", []) model = data.get("model", "gpt-4o") stream = data.get("stream", False) client_tools = data.get("tools", []) log(f"๐Ÿ“ฅ /v1/chat/completions model={model} stream={stream} msgs={len(messages)} tools={len(client_tools)}", "INFO") _increment_stats(model, messages) if not messages: return jsonify({"error": {"message": "messages is required", "type": "invalid_request_error"}}), 400 # Get account (client header override or rotation) client_token = request.headers.get("token", "") client_visitorid = request.headers.get("visitorid", "") acc, _ = acm.next() if not acc and not client_token: return jsonify({"error": {"message": "no accounts configured", "type": "config_error"}}), 500 try: rita_model = resolve_rita_model(model) chat_id, parent = get_or_create_conversation(messages) rita_messages = build_rita_messages(messages) if client_tools and rita_messages: last_text = rita_messages[-1]["text"] rita_messages[-1]["text"] = inject_tool_prompt(last_text, client_tools) log(f"๐Ÿ”ง tool prompt injected ({len(client_tools)} tools)", "DEBUG") # Build headers: prefer client-provided auth, otherwise use rotated account if client_token: hdrs = { "Content-Type": "application/json", "Origin": RITA_ORIGIN, "Referer": RITA_ORIGIN, "token": client_token, "Cookie": f"token={client_token}", } if client_visitorid: hdrs["visitorid"] = client_visitorid else: hdrs = acm.upstream_headers(acc, RITA_ORIGIN) # Auto-create conversation if needed (Rita requires valid chat_id) if not chat_id: try: init_r = requests.post( f"{UPSTREAM_URL}/chatgpt/newConversation", headers=hdrs, json={"model": rita_model}, timeout=15, verify=not DISABLE_SSL_VERIFY, ) init_data = init_r.json() if init_data.get("code") == 0: chat_id = init_data.get("data", {}).get("chat_id", 0) log(f"๐Ÿ“ Auto-created conversation: chat_id={chat_id}", "DEBUG") except Exception as e: log(f"โš ๏ธ Failed to create conversation: {e}", "WARNING") payload = { "model": rita_model, "messages": rita_messages, "online": 0, "model_type_id": 0, "chat_id": chat_id, "parent": parent, } resp = requests.post( f"{UPSTREAM_URL}/aichat/completions", headers=hdrs, json=payload, stream=True, timeout=120, verify=not DISABLE_SSL_VERIFY, ) if resp.status_code >= 500: log(f"๐Ÿ’ฅ Upstream 500: {resp.text[:200]}", "ERROR") if acc: acm.mark_fail(acc, f"HTTP {resp.status_code}") return jsonify({"error": {"message": "upstream error", "type": "upstream_error"}}), 502 # Check if upstream returned a JSON error (not SSE) ct = resp.headers.get("content-type", "") if "application/json" in ct: try: err_body = resp.json() err_code = err_body.get("code", 0) if err_code and err_code != 0: err_msg = err_body.get("error", err_body.get("message", "upstream error")) log(f"โš ๏ธ Upstream JSON error: code={err_code} msg={err_msg}", "WARNING") if acc: acm.mark_fail(acc, str(err_msg)) return jsonify({"error": {"message": str(err_msg), "type": "upstream_error"}}), 502 except Exception: pass resp.raise_for_status() if acc: acm.mark_ok(acc) # Deduct quota points based on model cost = get_cost(rita_model) acm.deduct_quota(acc.id, cost) # Log usage to database db = get_db() msg_chars = sum(len(str(m.get("content", ""))) for m in messages) db.log_usage(acc.id, model, msg_chars // 4) if stream: def gen(): cid = f"chatcmpl-{int(time.time()*1000)}" captured_msg_id = None with resp: resp.encoding = "utf-8" for line in resp.iter_lines(decode_unicode=True): if not line or not line.startswith("data: "): continue raw = line[6:] if raw == "[DONE]": break try: obj = json.loads(raw) except json.JSONDecodeError: continue etype = obj.get("type", "") if etype == "quota_remain": yield f'data: {json.dumps({"id": cid, "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {}, "finish_reason": None}], "x_quota": {"quota_remain": obj.get("quota_remain"), "service_quota_remain": obj.get("service_quota_remain")}})}\n\n' continue if etype in ("assistant_complete", "conv_title"): if etype == "assistant_complete": break continue rid = obj.get("id", "") if not captured_msg_id and rid.startswith("ai"): captured_msg_id = rid choices = obj.get("choices", []) if not choices: continue delta = choices[0].get("delta", {}) content = delta.get("content", "") if content: yield f'data: {json.dumps({"id": cid, "object": "chat.completion.chunk", "created": obj.get("created", int(time.time())), "model": model, "choices": [{"index": 0, "delta": {"content": content}, "finish_reason": None}]}, ensure_ascii=False)}\n\n' if captured_msg_id: update_conversation_state(messages, captured_msg_id, chat_id) yield f'data: {json.dumps({"id": cid, "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]})}\n\n' yield 'data: [DONE]\n\n' return Response(gen(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) else: # Non-streaming: Rita always returns SSE, so we collect the full response cid = f"chatcmpl-{int(time.time()*1000)}" content_parts = [] captured_msg_id = None created_ts = int(time.time()) with resp: resp.encoding = "utf-8" for line in resp.iter_lines(decode_unicode=True): if not line or not line.startswith("data: "): continue raw = line[6:] if raw == "[DONE]": break try: obj = json.loads(raw) except json.JSONDecodeError: continue etype = obj.get("type", "") if etype in ("quota_remain", "conv_title"): continue if etype == "assistant_complete": break rid = obj.get("id", "") if not captured_msg_id and rid.startswith("ai"): captured_msg_id = rid created_ts = obj.get("created", created_ts) choices = obj.get("choices", []) if choices: delta = choices[0].get("delta", {}) c = delta.get("content", "") if c: content_parts.append(c) content = "".join(content_parts) if captured_msg_id: update_conversation_state(messages, captured_msg_id, chat_id) if client_tools and content: parsed = parse_tool_response(content) if parsed["type"] == "tool_calls": log(f"๐Ÿ”ง tool_calls: {[c['name'] for c in parsed['calls']]}", "INFO") oai_tc = [{ "id": f"call_{i}", "type": "function", "function": {"name": c["name"], "arguments": json.dumps(c["input"], ensure_ascii=False)}, } for i, c in enumerate(parsed["calls"])] return jsonify({ "id": cid, "object": "chat.completion", "created": created_ts, "model": model, "choices": [{"index": 0, "message": {"role": "assistant", "content": None, "tool_calls": oai_tc}, "finish_reason": "tool_calls"}], "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, }) content = parsed.get("text", content) return jsonify({ "id": cid, "object": "chat.completion", "created": created_ts, "model": model, "choices": [{"index": 0, "message": {"role": "assistant", "content": content}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, }) except requests.RequestException as e: log(f"โŒ Request error: {e}", "ERROR") if acc: acm.mark_fail(acc, str(e)) return jsonify({"error": {"message": str(e), "type": "upstream_error"}}), 502 except Exception as e: log(f"โŒ Unexpected error: {e}", "ERROR") import traceback; traceback.print_exc() return jsonify({"error": {"message": str(e), "type": "internal_error"}}), 500 # ========================================================================= # OpenAI Responses API (/v1/responses) # ========================================================================= def _responses_parse_input(data: dict) -> list[dict]: """Convert Responses API input to OpenAI messages format. Handles: string input, array of {role, content}, and typed items. """ raw_input = data.get("input", "") instructions = data.get("instructions", "") messages = [] if instructions: messages.append({"role": "system", "content": instructions}) if isinstance(raw_input, str): messages.append({"role": "user", "content": raw_input}) elif isinstance(raw_input, list): for item in raw_input: if isinstance(item, str): messages.append({"role": "user", "content": item}) elif isinstance(item, dict): role = item.get("role", "user") content = item.get("content", "") item_type = item.get("type", "") if item_type == "function_call_output": messages.append({ "role": "tool", "tool_call_id": item.get("call_id", ""), "content": item.get("output", ""), }) elif role in ("user", "assistant", "system", "developer"): if role == "developer": role = "system" # content can be string or array of parts if isinstance(content, list): text_parts = [] for part in content: if isinstance(part, dict): if part.get("type") == "input_text": text_parts.append(part.get("text", "")) elif part.get("type") == "text": text_parts.append(part.get("text", "")) elif isinstance(part, str): text_parts.append(part) content = "".join(text_parts) messages.append({"role": role, "content": content}) return messages def _responses_make_base(resp_id: str, model: str, created: float, instructions=None, status="completed") -> dict: """Build the base response object shared by events and final response.""" return { "id": resp_id, "object": "response", "created_at": created, "status": status, "model": model, "output": [], "parallel_tool_calls": True, "tool_choice": "auto", "tools": [], "temperature": 1.0, "top_p": 1.0, "max_output_tokens": None, "truncation": "disabled", "instructions": instructions, "metadata": {}, "incomplete_details": None, "error": None, "usage": None, } @app.route("/v1/responses", methods=["POST"]) def responses_api(): """OpenAI Responses API compatible endpoint. Accepts: model, input (str or array), instructions, stream, etc. Returns: Responses API format (non-streaming JSON or streaming SSE). """ data = request.json or {} model = data.get("model", "model_25") stream = data.get("stream", False) instructions = data.get("instructions") # Convert input to messages messages = _responses_parse_input(data) if not messages: return jsonify({"error": {"message": "input is required", "type": "invalid_request_error"}}), 400 log(f"๐Ÿ“ฅ /v1/responses model={model} stream={stream} msgs={len(messages)}", "INFO") _increment_stats(model, messages) acc, _ = acm.next() if not acc: return jsonify({"error": {"message": "no accounts configured", "type": "config_error"}}), 500 try: rita_model = resolve_rita_model(model) rita_messages = build_rita_messages(messages) hdrs = acm.upstream_headers(acc, RITA_ORIGIN) # Create conversation chat_id = 0 try: init_r = requests.post( f"{UPSTREAM_URL}/chatgpt/newConversation", headers=hdrs, json={"model": rita_model}, timeout=15, verify=not DISABLE_SSL_VERIFY, ) init_data = init_r.json() if init_data.get("code") == 0: chat_id = init_data.get("data", {}).get("chat_id", 0) except Exception: pass payload = { "model": rita_model, "messages": rita_messages, "online": 0, "model_type_id": 0, "chat_id": chat_id, "parent": "0", } resp = requests.post( f"{UPSTREAM_URL}/aichat/completions", headers=hdrs, json=payload, stream=True, timeout=120, verify=not DISABLE_SSL_VERIFY, ) # Check for JSON error ct = resp.headers.get("content-type", "") if "application/json" in ct: try: err = resp.json() if err.get("code") and err["code"] != 0: acm.mark_fail(acc, str(err.get("error", err.get("message", "")))) return jsonify({"error": {"message": str(err.get("error", err.get("message", "upstream error"))), "type": "upstream_error"}}), 502 except Exception: pass acm.mark_ok(acc) cost = get_cost(rita_model) acm.deduct_quota(acc.id, cost) db = get_db() msg_chars = sum(len(str(m.get("content", ""))) for m in messages) db.log_usage(acc.id, model, msg_chars // 4) resp_id = f"resp_{int(time.time()*1000)}" msg_id = f"msg_{int(time.time()*1000)}" created = time.time() if stream: def gen_responses_stream(): seq = 0 full_text_parts = [] base = _responses_make_base(resp_id, model, created, instructions, "in_progress") # Event 1: response.created yield f"event: response.created\ndata: {json.dumps({'type':'response.created','sequence_number':seq,'response':base}, ensure_ascii=False)}\n\n" seq += 1 # Event 2: response.in_progress yield f"event: response.in_progress\ndata: {json.dumps({'type':'response.in_progress','sequence_number':seq,'response':base}, ensure_ascii=False)}\n\n" seq += 1 # Event 3: output_item.added item_skeleton = {"id": msg_id, "type": "message", "role": "assistant", "status": "in_progress", "content": []} yield f"event: response.output_item.added\ndata: {json.dumps({'type':'response.output_item.added','sequence_number':seq,'output_index':0,'item':item_skeleton}, ensure_ascii=False)}\n\n" seq += 1 # Event 4: content_part.added part_skeleton = {"type": "output_text", "text": "", "annotations": []} yield f"event: response.content_part.added\ndata: {json.dumps({'type':'response.content_part.added','sequence_number':seq,'item_id':msg_id,'output_index':0,'content_index':0,'part':part_skeleton}, ensure_ascii=False)}\n\n" seq += 1 # Read SSE from upstream and emit text deltas with resp: resp.encoding = "utf-8" for line in resp.iter_lines(decode_unicode=True): if not line or not line.startswith("data: "): continue raw = line[6:] if raw == "[DONE]": break try: obj = json.loads(raw) except json.JSONDecodeError: continue etype = obj.get("type", "") if etype in ("quota_remain", "conv_title"): continue if etype == "assistant_complete": break choices = obj.get("choices", []) if choices: delta_content = choices[0].get("delta", {}).get("content", "") if delta_content: full_text_parts.append(delta_content) yield f"event: response.output_text.delta\ndata: {json.dumps({'type':'response.output_text.delta','sequence_number':seq,'item_id':msg_id,'output_index':0,'content_index':0,'delta':delta_content}, ensure_ascii=False)}\n\n" seq += 1 full_text = "".join(full_text_parts) # output_text.done yield f"event: response.output_text.done\ndata: {json.dumps({'type':'response.output_text.done','sequence_number':seq,'item_id':msg_id,'output_index':0,'content_index':0,'text':full_text}, ensure_ascii=False)}\n\n" seq += 1 # content_part.done done_part = {"type": "output_text", "text": full_text, "annotations": []} yield f"event: response.content_part.done\ndata: {json.dumps({'type':'response.content_part.done','sequence_number':seq,'item_id':msg_id,'output_index':0,'content_index':0,'part':done_part}, ensure_ascii=False)}\n\n" seq += 1 # output_item.done done_item = {"id": msg_id, "type": "message", "role": "assistant", "status": "completed", "content": [done_part]} yield f"event: response.output_item.done\ndata: {json.dumps({'type':'response.output_item.done','sequence_number':seq,'output_index':0,'item':done_item}, ensure_ascii=False)}\n\n" seq += 1 # response.completed usage = {"input_tokens": msg_chars // 4, "input_tokens_details": {"cached_tokens": 0}, "output_tokens": len(full_text) // 4, "output_tokens_details": {"reasoning_tokens": 0}, "total_tokens": (msg_chars + len(full_text)) // 4} final = _responses_make_base(resp_id, model, created, instructions, "completed") final["output"] = [done_item] final["usage"] = usage yield f"event: response.completed\ndata: {json.dumps({'type':'response.completed','sequence_number':seq,'response':final}, ensure_ascii=False)}\n\n" return Response(gen_responses_stream(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) else: # Non-streaming: collect SSE content content_parts = [] with resp: resp.encoding = "utf-8" for line in resp.iter_lines(decode_unicode=True): if not line or not line.startswith("data: "): continue raw = line[6:] if raw == "[DONE]": break try: obj = json.loads(raw) except json.JSONDecodeError: continue etype = obj.get("type", "") if etype in ("quota_remain", "conv_title"): continue if etype == "assistant_complete": break choices = obj.get("choices", []) if choices: c = choices[0].get("delta", {}).get("content", "") if c: content_parts.append(c) full_text = "".join(content_parts) output_item = { "id": msg_id, "type": "message", "role": "assistant", "status": "completed", "content": [{"type": "output_text", "text": full_text, "annotations": []}], } usage = { "input_tokens": msg_chars // 4, "input_tokens_details": {"cached_tokens": 0}, "output_tokens": len(full_text) // 4, "output_tokens_details": {"reasoning_tokens": 0}, "total_tokens": (msg_chars + len(full_text)) // 4, } result = _responses_make_base(resp_id, model, created, instructions, "completed") result["output"] = [output_item] result["usage"] = usage return jsonify(result) except requests.RequestException as e: log(f"โŒ Responses API error: {e}", "ERROR") if acc: acm.mark_fail(acc, str(e)) return jsonify({"error": {"message": str(e), "type": "upstream_error"}}), 502 except Exception as e: log(f"โŒ Responses API unexpected error: {e}", "ERROR") import traceback; traceback.print_exc() return jsonify({"error": {"message": str(e), "type": "internal_error"}}), 500 # ========================================================================= # Debug # ========================================================================= @app.route("/debug/state", methods=["GET"]) def debug_state(): with _conv_lock: return jsonify({ "conversations": len(_conversation_state), **acm.summary(), "upstream": UPSTREAM_URL, }) @app.route("/debug/clear", methods=["POST"]) def debug_clear(): with _conv_lock: _conversation_state.clear() log("๐Ÿงน Cleared conversation state", "WARNING") return jsonify({"status": "ok"}) # ========================================================================= # Startup # ========================================================================= if __name__ == "__main__": s = acm.summary() print("\n" + "=" * 60) print("๐Ÿš€ rita2api starting") print(f"๐Ÿ“ Port: {PORT} Upstream: {UPSTREAM_URL}") print(f"๐Ÿ”‘ Accounts: {s['total']} total, {s['active']} active, {s['disabled']} disabled") print(f"๐ŸŒ WebUI: http://localhost:{PORT}/") print("=" * 60 + "\n") # Start background token health checker acm.start_health_checker(UPSTREAM_URL, RITA_ORIGIN, log_fn=log) # Start auto-replenish (registers new accounts when pool runs low) auto_register.start_auto_replenish(acm, UPSTREAM_URL, RITA_ORIGIN, log_fn=log) app.run(host=HOST, port=PORT, debug=False, threaded=True)