# Page header captured together with the source ("Spaces: Sleeping"); not part of the program.
import base64
import hmac
import json
import mimetypes
import os
import random
import re
import secrets
import time
import uuid
from datetime import datetime

import requests
from flask import Flask, Response, jsonify, request
# The single Flask application object; the handlers below are attached to it.
app = Flask(__name__)
# ================= CONFIGURATION =================
COGNIX_BASE_URL = "https://www.cognixai.co"
DEFAULT_SESSION_ID = "f351d7e7-a0ba-4888-86a4-76aab9a7a661"
KEYS_FILE = os.path.join(os.getcwd(), "keys.json")

# The Master Key to create other keys. Set the MASTER_KEY environment variable
# to override it; the literal is only the fallback used when the variable is
# unset or empty, and should not be relied on for a public deployment.
MASTER_KEY = os.environ.get("MASTER_KEY") or "sk-master-onyx-2026"

# Tier Limits
RPM_LIMIT = 10

# Models NOT supported in this tier. Each entry is compared as a substring of
# the lower-cased model name sent by the client.
# NOTE(review): entries contain spaces ("claude opus-4.6"); confirm that this
# matches the identifiers clients actually send.
RESTRICTED_MODELS = ["claude opus-4.6", "3-pro", "gpt-5.3 codex"]

# API keys are persisted in a JSON file (not in memory): create an empty store
# on first start so later reads find a well-formed document.
if not os.path.exists(KEYS_FILE):
    with open(KEYS_FILE, "w", encoding="utf-8") as f:
        json.dump({"api_keys": {}}, f)
def load_keys(path=None):
    """Read the API-key store from disk.

    Args:
        path: File to read. Defaults to ``KEYS_FILE`` when omitted.

    Returns:
        The parsed document, guaranteed to be a dict whose ``"api_keys"``
        entry is a dict. If the file is missing, unreadable, not valid JSON,
        or does not have that shape, a fresh ``{"api_keys": {}}`` is returned
        instead so callers can index ``["api_keys"]`` unconditionally.
    """
    source = KEYS_FILE if path is None else path
    try:
        with open(source, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed JSON or
        # undecodable bytes (JSONDecodeError and UnicodeDecodeError derive
        # from it).
        return {"api_keys": {}}

    if not isinstance(data, dict) or not isinstance(data.get("api_keys"), dict):
        return {"api_keys": {}}
    return data
def save_keys(keys_data, path=None):
    """Write the API-key store to disk as indented JSON.

    The document is written to a uniquely named temporary file next to the
    target and then moved over it with ``os.replace``, so a reader never sees
    a half-written file and a failed write leaves the previous store intact.

    Args:
        keys_data: JSON-serialisable document to persist.
        path: File to write. Defaults to ``KEYS_FILE`` when omitted.

    Raises:
        OSError: If the file cannot be written or replaced.
        TypeError: If ``keys_data`` is not JSON-serialisable.
    """
    target = KEYS_FILE if path is None else path
    tmp_path = f"{target}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(keys_data, f, indent=4)
        os.replace(tmp_path, target)
    finally:
        # Only still present when something above failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
# Tracking Data
# Maps an API key to the list of time.time() stamps of its recent requests.
# Held in process memory only, so it is reset whenever the process restarts.
rpm_tracking = {}
| COGNIX_COOKIES = [ | |
| "ext_name=ojplmecpdpgccookcobabopnaifgidhf; cf_clearance=j_nYaeNI0RwDRG1Qyd.bRf0R5YCGgIgAEzEgaQEjCCU-1770908625-1.2.1.1-RMchxpAE5hSG0Xl4XY3BShfT4aXGHCqNiBxN6iyTGkrv8azqzeTMuCOKZZ1lHjBZ5kdtj4.F_hmpP2legrsaaSe16gMqtqa5.FrM7yNuGQczvf1ep45loNu5MhI151HAk0k9T5UKDHdHXHcidlUt_ajlE64FUTSj26Rf6WwTg55n.xeliVOzxYygojzifx7hywAXmXMAqCpKADeDnSuEWqahc2_zDnpJxwy4444gh_o; __Secure-better-auth.state=FOj7ymeub1GeD3s4fiEbm9Hrd-hE0slR.oM0kHle4Je9FhUDPisXmPSHQvH4nkqldTe3kRBrTHJk%3D; __Secure-better-auth.session_token=5npdnyCa90buJBq2qW2wopL6nC3HjO4R.5v3gNhODuU7F0hbVXAJ%2BPFgMPsCPM0j8J%2BHk%2FrqsNdc%3D; __Secure-better-auth.session_data=eyJzZXNzaW9uIjp7InNlc3Npb24iOnsiZXhwaXJlc0F0IjoiMjAyNi0wMi0xOVQxNTowMzo0OC44MjNaIiwidG9rZW4iOiI1bnBkbnlDYTkwYnVKQnEycVcyd29wTDZuQzNIak80UiIsImNyZWF0ZWRBdCI6IjIwMjYtMDItMTJUMTU6MDM6NDguODIzWiIsInVwZGF0ZWRBdCI6IjIwMjYtMDItMTJUMTU6MDM6NDguODIzWiIsImlwQWRkcmVzcyI6IjE2Mi4xNTguNjMuMjQwIiwidXNlckFnZW50IjoiTW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzE0NC4wLjAuMCBTYWZhcmkvNTM3LjM2IiwidXNlcklkIjoiODM0YWZkYWEtOWFiYy00OGNkLTkwMzQtNzU4YTMzY2M3NTUxIiwiaW1wZXJzb25hdGVkQnkiOm51bGwsImlkIjoiNzk5ODJjMWMtZjQwOC00ODYyLWI0ZGEtMzI2ZTZkZmQ1NWU0In0sInVzZXIiOnsibmFtZSI6IkhpcmVuIEFoYWxhd2F0IiwiZW1haWwiOiJnaGc2NDI3MkBnbWFpbC5jb20iLCJlbWFpbFZlcmlmaWVkIjp0cnVlLCJpbWFnZSI6Imh0dHBzOi8vbGgzLmdvb2dsZXVzZXJjb250ZW50LmNvbS9hL0FDZzhvY0ozTVo3MjdKYzlJU244bERCcUplS2MyU0MxYXV5djFlbkV1bWxuTDhmR01CaEp0OGNUPXM5Ni1jIiwiY3JlYXRlZEF0IjoiMjAyNi0wMS0yNlQwNTo0NzoyNC43NzNaIiwidXBkYXRlZEF0IjoiMjAyNi0wMS0yNlQwNTo0NzoyNC43NzNaIiwicm9sZSI6ImVkaXRvciIsImJhbm5lZCI6ZmFsc2UsImJhblJlYXNvbiI6bnVsbCwiYmFuRXhwaXJlcyI6bnVsbCwiaWQiOiI4MzRhZmRhYS05YWJjLTQ4Y2QtOTAzNC03NThhMzNjYzc1NTEifX0sImV4cGlyZXNBdCI6MTc3MDkxMjIyODgzNCwic2lnbmF0dXJlIjoidXpNQWloYU9Sbk1QSnZ1V2VCMDdtOGcxSHliYVVrT2hLU05PS3JKSE96byJ9" | |
| ] | |
def get_headers():
    """Build the HTTP headers sent with every upstream request.

    A cookie is drawn at random from ``COGNIX_COOKIES`` on each call; origin
    and referer are derived from ``COGNIX_BASE_URL`` so they cannot drift from
    the URL the request is actually sent to.

    Returns:
        dict mapping header names to values.

    Raises:
        RuntimeError: If ``COGNIX_COOKIES`` is empty.
    """
    if not COGNIX_COOKIES:
        raise RuntimeError("COGNIX_COOKIES is empty; no upstream session cookie to send")
    return {
        "accept": "*/*",
        "accept-language": "en-IN,en;q=0.9",
        "cookie": random.choice(COGNIX_COOKIES),
        "origin": COGNIX_BASE_URL,
        "referer": f"{COGNIX_BASE_URL}/chat/{DEFAULT_SESSION_ID}",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
    }
# ================= AUTH & LIMITS =================
@app.before_request
def validate_access():
    """Authenticate and rate-limit a request before it reaches its handler.

    Registered as a ``before_request`` hook: returning ``None`` lets the
    request continue, returning a ``(response, status)`` tuple answers it
    immediately.

    Rules, in order:
      * ``/``, ``/v1/models`` and anything outside ``/v1/`` are public.
      * ``/v1/admin/create-key`` requires ``Bearer <MASTER_KEY>`` (403 otherwise).
      * Every other ``/v1/`` path requires a key present in the key store (401
        otherwise) and is limited to ``RPM_LIMIT`` accepted requests per
        rolling 60 seconds per key (429 otherwise).
      * POSTs to ``.../chat/completions`` naming a model that contains an entry
        of ``RESTRICTED_MODELS`` are refused with 403.
    """
    path = request.path
    if path in ("/", "/v1/models") or not path.startswith("/v1/"):
        return None

    auth_header = request.headers.get("Authorization", "")

    # Master Key exclusion for its own route
    if path == "/v1/admin/create-key":
        expected = f"Bearer {MASTER_KEY}"
        # Constant-time comparison so response timing does not leak how much
        # of the key a guess got right. Bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if not hmac.compare_digest(auth_header.encode("utf-8"), expected.encode("utf-8")):
            return jsonify({"error": "Admin Access Denied"}), 403
        return None

    # Standard Key Validation
    if not auth_header.startswith("Bearer "):
        return jsonify({"error": "Unauthorized", "message": "Missing Bearer token"}), 401
    # Strip only the leading scheme; str.replace would also delete any later
    # "Bearer " occurring inside the token itself.
    api_key = auth_header[len("Bearer "):].strip()

    data = load_keys()
    known_keys = data.get("api_keys") if isinstance(data, dict) else None
    if not isinstance(known_keys, dict) or api_key not in known_keys:
        return jsonify({"error": "Unauthorized", "message": "Invalid API Key"}), 401

    # RPM Enforcement: keep only the stamps from the last 60 seconds.
    now = time.time()
    recent = [t for t in rpm_tracking.get(api_key, []) if now - t < 60]
    rpm_tracking[api_key] = recent
    if len(recent) >= RPM_LIMIT:
        return jsonify({"error": "Rate limit exceeded", "message": f"Free Tier: {RPM_LIMIT} RPM"}), 429
    recent.append(now)

    # Model Exclusion Check
    if request.method == "POST" and path.endswith("/chat/completions"):
        body = request.get_json(silent=True)
        # The body may be any JSON value and "model" may be null or non-text;
        # treat everything that is not a string as "no model named".
        requested = body.get("model") if isinstance(body, dict) else None
        model = requested.lower() if isinstance(requested, str) else ""
        if any(rm in model for rm in RESTRICTED_MODELS):
            return jsonify({
                "error": "Access Forbidden",
                "message": f"Model '{model}' is not supported in the Free Tier. Upgrade to Development Plan."
            }), 403

    return None
# ================= ADMIN ROUTES =================
@app.route("/v1/admin/create-key", methods=["POST"])
def create_key():
    """Create and persist a new free-tier API key.

    Expects a JSON object with a non-empty string ``"name"``. The key has the
    form ``sk-free-<slug>-<32 hex chars>``, where the slug is the lower-cased
    name reduced to ``[a-z0-9_-]`` so the key is always safe to send in an
    ``Authorization`` header, and the suffix comes from ``secrets`` so it
    cannot be guessed from the name.

    Returns:
        JSON with the new key on success, or a 400 error when the body is not
        a JSON object or ``name`` is missing/blank/not a string.
    """
    body = request.get_json(silent=True)
    name = body.get("name") if isinstance(body, dict) else None
    if not isinstance(name, str) or not name.strip():
        return jsonify({"error": "Name is required"}), 400

    slug = re.sub(r"[^a-z0-9_]+", "-", name.lower()).strip("-") or "key"
    new_key = f"sk-free-{slug}-{secrets.token_hex(16)}"

    data = load_keys()
    data.setdefault("api_keys", {})[new_key] = {
        "name": name,
        "created_at": datetime.now().isoformat()
    }
    save_keys(data)

    return jsonify({
        "status": "success",
        "key": new_key,
        "name": name,
        "details": f"This key is limited to {RPM_LIMIT} RPM and basic models only."
    })
# ================= CORE PROXY LOGIC =================
def parse_cognix_stream_chunk(line):
    """Extract the text carried by one line of the upstream response stream.

    Args:
        line: One decoded line, with or without an SSE ``data:`` prefix.

    Returns:
        A ``(text, kind)`` tuple:
          * ``(None, "content")`` for a blank line,
          * ``(None, "stop")`` for the ``[DONE]`` terminator,
          * ``(str, "content")`` otherwise. For a JSON object the text is the
            first non-empty of ``text``, ``content``, ``delta`` (if a string),
            ``delta.text``, ``delta.content``; ``""`` when none is present.
            A line that is not a JSON object is passed through verbatim.
        The text is always ``None`` or a ``str`` so callers can concatenate it.
    """
    if not line.strip():
        return None, "content"

    # The SSE field separator is "data:" followed by an optional single space.
    if line.startswith("data:"):
        line = line[len("data:"):]
        if line.startswith(" "):
            line = line[1:]

    if line.strip() == "[DONE]":
        return None, "stop"

    try:
        data = json.loads(line)
    except ValueError:
        # Not JSON: treat the line as raw text.
        return line, "content"
    if not isinstance(data, dict):
        # A bare JSON scalar/array has no fields to read; pass the line through.
        return line, "content"

    content = data.get("text") or data.get("content")
    if not content:
        delta = data.get("delta")
        if isinstance(delta, str):
            content = delta
        elif isinstance(delta, dict):
            content = delta.get("text") or delta.get("content", "")

    if not content:
        return "", "content"
    return (content if isinstance(content, str) else str(content)), "content"
# (connect, read) timeouts in seconds for the upstream call. Without one,
# requests waits indefinitely and a stalled upstream pins a worker forever.
_UPSTREAM_TIMEOUT = (10, 300)


def _message_text(content):
    """Flatten an OpenAI-style message ``content`` value to plain text.

    Accepts a string, a list of parts (strings or dicts with a string
    ``"text"``; other parts are ignored), or ``None``.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        texts = []
        for part in content:
            if isinstance(part, str):
                texts.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                texts.append(part["text"])
        return "\n".join(texts)
    return "" if content is None else str(content)


def _build_cognix_payload(model, messages):
    """Translate an OpenAI-style request into the upstream chat payload.

    Only the first system message and the last non-system message are sent;
    they are joined by a blank line. ``model`` is split on the first ``/``
    into provider and model name, defaulting the provider to ``openai``.
    """
    turns = [m for m in messages if isinstance(m, dict)]
    system = next((_message_text(m.get("content")) for m in turns if m.get("role") == "system"), "")
    non_system = [m for m in turns if m.get("role") != "system"]
    latest = _message_text(non_system[-1].get("content")) if non_system else ""
    prompt = "\n\n".join(part for part in (system, latest) if part)

    provider, version = model.split("/", 1) if "/" in model else ("openai", model)
    return {
        "id": str(uuid.uuid4()),
        "chatModel": {"provider": provider, "model": version},
        "message": {"role": "user", "parts": [{"type": "text", "text": prompt}], "id": str(uuid.uuid4())},
        "allowedAppDefaultToolkit": ["webSearch"],
        "attachments": [],
    }


@app.route("/v1/chat/completions", methods=["POST"])
def chat_completions():
    """OpenAI-compatible chat completion endpoint backed by the upstream chat API.

    Request JSON: ``model`` (default ``openai/gpt-4o``), ``messages`` (list),
    ``stream`` (truthy for server-sent events).

    Returns:
        * 400 when the body is not a JSON object or ``model``/``messages``
          have the wrong type.
        * 502 when the upstream call fails or answers with an HTTP error, so
          an upstream error page is never relayed as assistant text.
        * Otherwise a ``chat.completion`` JSON document, or for ``stream`` a
          ``text/event-stream`` of ``chat.completion.chunk`` events ending
          with a ``finish_reason: "stop"`` chunk and ``data: [DONE]``.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return jsonify({"error": "Bad Request", "message": "Request body must be a JSON object"}), 400
    model = body.get("model") or "openai/gpt-4o"
    messages = body.get("messages") or []
    if not isinstance(model, str) or not isinstance(messages, list):
        return jsonify({"error": "Bad Request", "message": "'model' must be a string and 'messages' a list"}), 400

    payload = _build_cognix_payload(model, messages)
    streaming = bool(body.get("stream"))

    try:
        upstream = requests.post(
            f"{COGNIX_BASE_URL}/api/chat",
            json=payload,
            headers=get_headers(),
            stream=streaming,
            timeout=_UPSTREAM_TIMEOUT,
        )
    except requests.RequestException as exc:
        return jsonify({"error": "Bad Gateway", "message": f"Upstream request failed ({type(exc).__name__})"}), 502

    if upstream.status_code >= 400:
        status = upstream.status_code
        upstream.close()
        return jsonify({"error": "Bad Gateway", "message": f"Upstream returned HTTP {status}"}), 502

    # requests falls back to ISO-8859-1 for text/* responses that declare no
    # charset, which garbles non-ASCII output; decode as UTF-8 instead.
    upstream.encoding = "utf-8"
    created = int(time.time())

    if streaming:
        cid = f"chatcmpl-{uuid.uuid4().hex[:24]}"

        def chunk(delta, finish_reason=None):
            event = {
                "id": cid,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model,
                "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
            }
            return f"data: {json.dumps(event)}\n\n"

        def gen():
            try:
                yield chunk({"role": "assistant"})
                for line in upstream.iter_lines(decode_unicode=True):
                    if not line:
                        continue
                    text, kind = parse_cognix_stream_chunk(line)
                    if kind == "stop":
                        break
                    if text:
                        yield chunk({"content": text if isinstance(text, str) else str(text)})
                yield chunk({}, finish_reason="stop")
                yield "data: [DONE]\n\n"
            finally:
                upstream.close()

        response = Response(gen(), content_type="text/event-stream")
        # Also release the upstream connection if the client disconnects
        # before the generator is ever started.
        response.call_on_close(upstream.close)
        return response

    pieces = []
    # Split on "\n" only: str.splitlines() would also break on characters such
    # as U+2028 that may legitimately occur inside a JSON string.
    for line in upstream.text.strip().split("\n"):
        text, kind = parse_cognix_stream_chunk(line)
        if kind == "stop":
            break
        if text:
            pieces.append(text if isinstance(text, str) else str(text))
    full_text = "".join(pieces)

    return jsonify({
        "id": str(uuid.uuid4()),
        "object": "chat.completion",
        "created": created,
        "model": model,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": full_text},
            "finish_reason": "stop",
        }],
    })
@app.route("/v1/models", methods=["GET"])
def list_models():
    """Return the static model catalogue in OpenAI ``list`` format.

    Served without authentication: ``validate_access`` treats ``/v1/models``
    as a public path.
    """
    return jsonify({
        "object": "list",
        "data": [
            {"id": "openai/gpt-4o", "object": "model"},
            {"id": "anthropic/claude-3-5-sonnet", "object": "model"}
        ]
    })
@app.route("/", methods=["GET"])
def index():
    """Return a small status document describing this tier's limits."""
    return jsonify({
        "tier": "Public Free Tier",
        "rpm_limit": RPM_LIMIT,
        "restricted_models": RESTRICTED_MODELS,
        "status": "online"
    })
if __name__ == '__main__':
    # Port defaults to 7860 and can be overridden with the PORT variable; the
    # banner prints the port that is really bound.
    port = int(os.environ.get("PORT", "7860"))
    # The interactive debugger lets anyone who can reach the server run
    # arbitrary code, so it must be off when listening on 0.0.0.0. Opt in
    # locally with FLASK_DEBUG=1.
    debug = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    print(f"🚀 Public Free Tier Proxy (Port {port}) Started...")
    app.run(host='0.0.0.0', port=port, debug=debug)