"""OpenAI- and Anthropic-style HTTP endpoints that forward to the Cognix chat API.

Routes defined here:
    GET  /v1/models               - list upstream models (cached for 10 minutes)
    POST /v1/chat/completions     - OpenAI-style chat (streaming and non-streaming)
    POST /v1/messages             - Anthropic-style messages (streaming and non-streaming)
    POST /v1/files                - keep an uploaded file in memory for later reference
    POST /v1/images/generations   - image generation through the Koy endpoint
"""

import base64
import json
import mimetypes
import os
import random
import re
import time
import uuid

import requests
from flask import Flask, Response, jsonify, request

app = Flask(__name__)

# Configuration
COGNIX_BASE_URL = os.environ.get("COGNIX_BASE_URL", "https://www.cognixai.co")
# Supports || separated cookies for rotation
COGNIX_COOKIES_RAW = os.environ.get("COGNIX_COOKIE", "")
COGNIX_COOKIES = [c.strip() for c in COGNIX_COOKIES_RAW.split("||") if c.strip()]

# (connect, read) timeout in seconds for upstream chat requests, so a stalled
# upstream cannot block a worker forever.
UPSTREAM_TIMEOUT = (15, 300)

# SECURITY: this is a hard-coded session cookie (it embeds a session token and
# account data). It is kept only so that behaviour without COGNIX_COOKIE is
# unchanged; it should be revoked, removed from source control and supplied
# through the COGNIX_COOKIE environment variable instead.
_FALLBACK_COGNIX_COOKIE = (
    "ext_name=ojplmecpdpgccookcobabopnaifgidhf; "
    "cf_clearance=ugS5wP9Q0rIAI4EubTIim0UhvpetQLv_C._ON7AnXsY-1771672115-1.2.1.1-2grJb6xS2Kmo7enu6oqvir5oESKnoRqD7E._PYcerInd7_ntAVdU1Sg4_NAGw_H9ZnBxVVQJ2hQsZ6vEk3kdegpYObeoK5qBJLqA.VE51263LAebuA9Uu7r7perVgNr0qHLBm.iOes80WlaiePEJ8QsWVCeKrhLyeWP8YZ_SrbI6XwlwVGYO9ElDNCkwFzlXfk3rVreoI5zcK0rMpEZ5_z1TyfydvrZbTm.Y5ZpSErU; "
    "__Secure-better-auth.state=ODzpQ4vMzfj5UGT4jwMVclsXDBcD_V7l.9rRxNIsfIzw0CUO6lmfRKI7WqRM54n2Y2so3ucTcgR0%3D; "
    "__Secure-better-auth.session_token=8SyWOzC9ds8T9VsjyarP2awEIo8ZrfzZ.JTswU6YgAGbvUpSfcn1%2Feh1CDQF3SU5j3C%2BtLF0%2B8iI%3D; "
    "__Secure-better-auth.session_data=eyJzZXNzaW9uIjp7InNlc3Npb24iOnsiZXhwaXJlc0F0IjoiMjAyNi0wMi0yOFQxMTowODo0OC43OTZaIiwidG9rZW4iOiI4U3lXT3pDOWRzOFQ5VnNqeWFyUDJhd0VJbzhacmZ6WiIsImNyZWF0ZWRBdCI6IjIwMjYtMDItMjFUMTE6MDg6NDguNzk2WiIsInVwZGF0ZWRBdCI6IjIwMjYtMDItMjFUMTE6MDg6NDguNzk2WiIsImlwQWRkcmVzcyI6IjE3Mi43MC40Ny4xODkiLCJ1c2VyQWdlbnQiOiJNb3ppbGxhLzUuMCAoV2luZG93cyBOVCAxMC4wOyBXaW42NDsgeDY0KSBBcHBsZVdlYktpdC81MzcuMzYgKEtIVE1MLCBsaWtlIEdlY2tvKSBDaHJvbWUvMTQ1LjAuMC4wIFNhZmFyaS81MzcuMzYiLCJ1c2VySWQiOiJhYzI0Y2M0OS1mMjI1LTQzMDMtODUwNS1hYzE5MTI5NjI2YWQiLCJpbXBlcnNvbmF0ZWRCeSI6bnVsbCwiaWQiOiJlZTIzNDJiOC00NjdhLTRkYTgtOTdmZC0zOWJhMzNmMjkxYjIifSwidXNlciI6eyJuYW1lIjoidXNlciIsImVtYWlsIjoiamFkYXZhdGhhcnYyMDEwQGdtYWlsLmNvbSIsImVtYWlsVmVyaWZpZWQiOnRydWUsImltYWdlIjoiaHR0cHM6Ly9saDMuZ29vZ2xldXNlcmNvbnRlbnQuY29tL2EvQUNnOG9jSTEyQVFTWldKa3FSY2FZZDJBU1RsWW9iRHI0eXdvVVlhNlUweWhyZ0NuaHpmUTBhaz1zOTYtYyIsImNyZWF0ZWRBdCI6IjIwMjYtMDItMjFUMTE6MDg6NDguNzU2WiIsInVwZGF0ZWRBdCI6IjIwMjYtMDItMjFUMTE6MDk6MTkuNzg1WiIsInJvbGUiOiJlZGl0b3IiLCJiYW5uZWQiOmZhbHNlLCJiYW5SZWFzb24iOm51bGwsImJhbkV4cGlyZXMiOm51bGwsImlkIjoiYWMyNGNjNDktZjIyNS00MzAzLTg1MDUtYWMxOTEyOTYyNmFkIn19LCJleHBpcmVzQXQiOjE3NzE2NzU3NTk3OTIsInNpZ25hdHVyZSI6IldlM1BfX0lRaG9HUk8weE5XamgzUmg5S1RBOGtiQ2dpY1M0cDg5eHZBc2MifQ"
)


def get_cognix_cookie():
    """Get a random cookie from the configured list for rotation.

    Falls back to the built-in cookie when COGNIX_COOKIE is not configured.
    """
    if not COGNIX_COOKIES:
        return _FALLBACK_COGNIX_COOKIE
    return random.choice(COGNIX_COOKIES)


DEFAULT_COGNIX_SESSION_ID = "73acd532-a6c2-4ae3-b267-fa67a84f1085"

# Store uploaded files metadata
files_cache = {}


def get_headers(multipart=False):
    """Build the request headers sent to the upstream API.

    Args:
        multipart: when true, ``content-type`` is left unset so that requests
            can generate the multipart boundary itself.
    """
    h = {
        "accept": "*/*",
        "accept-language": "en-IN,en-GB;q=0.9,en-US;q=0.8,en;q=0.7",
        "cookie": get_cognix_cookie(),
        "origin": "https://www.cognixai.co",
        "referer": f"https://www.cognixai.co/chat/{DEFAULT_COGNIX_SESSION_ID}",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36"
    }
    if not multipart:
        h["content-type"] = "application/json"
    return h


# Model Cache
model_cache = {"data": [], "last_updated": 0}


def _fallback_models():
    """Return the last cached model list, or a single hard-coded default."""
    if model_cache["data"]:
        return model_cache["data"]
    return [{"id": "anthropic/Claude Opus 4.6", "object": "model"}]


def fetch_cognix_models():
    """Fetch available models from Cognix API and format for OpenAI compatibility.

    Returns:
        A list of model dicts. On any failure the last cached list is
        returned, or a hard-coded default when nothing has been cached yet.
    """
    current_time = time.time()
    # Cache for 10 minutes (shorter for debugging/dynamic updates)
    if model_cache["data"] and (current_time - model_cache["last_updated"] < 600):
        return model_cache["data"]

    url = f"{COGNIX_BASE_URL}/api/chat/models"
    # Use existing header system for cookies
    headers = get_headers()
    headers.update({
        "sec-ch-ua-platform": '"Windows"',
        "sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Google Chrome";v="144"',
        "sec-ch-ua-mobile": "?0"
    })

    try:
        resp = requests.get(url, headers=headers, timeout=15)
        if resp.status_code == 200:
            try:
                data = resp.json()
            except ValueError:
                # Fallback if response is not JSON
                return _fallback_models()

            models = []
            if isinstance(data, list):
                for entry in data:
                    provider = entry.get("provider")
                    # Skip 'cognix' provider as requested
                    if provider == "cognix":
                        continue
                    for m in entry.get("models", []):
                        model_name = m.get("name")
                        if not model_name:
                            continue
                        models.append({
                            "id": f"{provider}/{model_name}",
                            "object": "model",
                            "created": int(current_time),
                            "owned_by": provider
                        })

            if models:
                # Add image generation model
                models.append({
                    "id": "gemini-3-pro-image-preview",
                    "object": "model",
                    "created": int(current_time),
                    "owned_by": "nonpon"
                })
                model_cache["data"] = models
                model_cache["last_updated"] = current_time
                return models
    except Exception as e:
        # Best effort: any failure (network, unexpected payload shape) falls
        # through to the cached / default list below.
        print(f"Error fetching models from Cognix: {e}")

    # Return last known good data or hardcoded default
    return _fallback_models()


@app.route('/v1/models', methods=['GET'])
def list_models():
    """Return the model list in OpenAI ``list`` format."""
    models = fetch_cognix_models()
    return jsonify({"object": "list", "data": models})


# ============== File Support ==============

def upload_file_to_cognix(file_bytes, filename, media_type):
    """Upload a file to CognixAI storage API and return attachment metadata.

    Returns:
        A dict with ``id``, ``name``, ``type``, ``url``, ``size`` and ``key``,
        or ``None`` when the upload fails or is not reported as successful.
    """
    url = f"{COGNIX_BASE_URL}/api/storage/upload"
    try:
        files = {
            'file': (filename, file_bytes, media_type)
        }
        # The user provided the response format:
        # { "success": true, "key": "...", "url": "...", "metadata": { ... } }
        resp = requests.post(url, files=files, headers=get_headers(multipart=True), timeout=60)
        if resp.status_code == 200:
            res = resp.json()
            if res.get("success"):
                metadata = res.get("metadata", {})
                return {
                    "id": res.get("key"),  # Using key as ID
                    "name": metadata.get("filename", filename),
                    "type": metadata.get("contentType", media_type),
                    "url": res.get("url"),
                    "size": metadata.get("size", 0),
                    "key": res.get("key")
                }
            return None
        print(f"Upload failed: {resp.status_code} - {resp.text}")
        return None
    except Exception as e:
        # Best effort: a failed upload only drops the attachment.
        print(f"Upload error: {e}")
        return None


def extract_files_from_messages(messages, msg_format="openai"):
    """Extract images and files from message blocks.

    Handles OpenAI ``image_url`` blocks (cached file ids, ``data:`` URLs and
    http(s) URLs) and Anthropic base64 ``image`` blocks.

    Returns:
        A list of dicts with ``_data`` (base64 text), ``content_type`` and
        ``filename``.
    """
    files = []

    def get_id_from_url(url):
        """Map a URL or id string to a key of ``files_cache``, if any."""
        if not isinstance(url, str):
            return None
        if url in files_cache:
            return url
        match = re.search(r'(file-[a-f0-9]{24})', url)
        if match:
            fid = match.group(1)
            if fid in files_cache:
                return fid
        return None

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        content = msg.get('content', '')
        if not isinstance(content, list):
            continue
        for block in content:
            if not isinstance(block, dict):
                continue
            block_type = block.get('type')

            # OpenAI image_url
            if block_type == 'image_url':
                image_url = block.get('image_url', {})
                # Some clients send the URL directly instead of {"url": ...}.
                url = image_url.get('url', '') if isinstance(image_url, dict) else image_url
                if not isinstance(url, str):
                    continue
                f_id = get_id_from_url(url)
                if f_id:
                    files.append(files_cache[f_id])
                elif url.startswith('data:'):
                    try:
                        header, b64 = url.split(',', 1)
                        mime = header.split(':')[1].split(';')[0]
                    except (ValueError, IndexError):
                        continue  # malformed data URL: skip the attachment
                    files.append({"_data": b64, "content_type": mime, "filename": f"img_{uuid.uuid4().hex[:8]}"})
                elif url.startswith('http'):
                    # NOTE(security): this fetches a caller-supplied URL from
                    # the server (SSRF risk). Restrict the allowed hosts if
                    # this service is exposed to untrusted clients.
                    try:
                        resp = requests.get(url, timeout=30)
                    except requests.RequestException as e:
                        print(f"Image download failed: {e}")
                        continue
                    if resp.status_code == 200:
                        files.append({
                            "_data": base64.b64encode(resp.content).decode('utf-8'),
                            "content_type": resp.headers.get('content-type', 'image/png'),
                            "filename": f"img_{uuid.uuid4().hex[:8]}"
                        })

            # Anthropic image
            elif block_type == 'image':
                src = block.get('source', {})
                if isinstance(src, dict) and src.get('type') == 'base64':
                    files.append({
                        "_data": src.get('data'),
                        "content_type": src.get('media_type'),
                        "filename": f"img_{uuid.uuid4().hex[:8]}"
                    })
    return files


# ============== Tool Calling Support ==============

_JSON_DECODER = json.JSONDecoder()


def _content_to_text(content):
    """Flatten a message ``content`` value into plain text.

    Lists of blocks are reduced to their ``text`` blocks joined by spaces,
    ``None`` becomes an empty string, anything else is returned unchanged.
    """
    if content is None:
        return ''
    if isinstance(content, list):
        return ' '.join(
            str(p.get('text') or '')
            for p in content
            if isinstance(p, dict) and p.get('type') == 'text'
        )
    return content


def build_tools_system_prompt(tools, tool_format="openai"):
    """Render the tool definitions as a prompt section.

    Accepts OpenAI tools (``{"function": {...}}``) and, when ``tool_format``
    is ``"anthropic"``, tools that carry their schema in ``input_schema``.
    Returns an empty string when ``tools`` is empty.
    """
    if not tools:
        return ""
    tools_list = []
    for tool in tools:
        func = tool.get('function', tool)
        tools_list.append({
            "name": func.get('name', ''),
            "description": func.get('description', ''),
            "parameters": func.get('parameters', (tool.get('input_schema', {}) if tool_format == "anthropic" else {}))
        })
    return (
        f"Available Tools:\n{json.dumps(tools_list, indent=2)}\n\n"
        'To use a tool, output: {"name": "...", "id": "...", "input": {...}}'
    )


def _is_tool_call(obj):
    """Tell whether a decoded JSON value has the tool-call shape from the prompt."""
    return (
        isinstance(obj, dict)
        and isinstance(obj.get('name'), str)
        and bool(obj.get('name'))
        and 'input' in obj
    )


def parse_tool_calls_from_response(text):
    """Split a model reply into plain text and embedded tool calls.

    The tools prompt asks the model to emit bare JSON objects of the form
    ``{"name": ..., "id": ..., "input": {...}}``. The reply is scanned for
    JSON objects; every object with a non-empty string ``name`` and an
    ``input`` key is removed from the text and collected as a tool call.
    Other JSON found in the reply is left in the text.

    Returns:
        ``(text, tool_calls)`` where ``text`` is the remaining text with the
        pieces around tool calls joined by blank lines, and ``tool_calls`` is
        the list of decoded tool-call dicts in order of appearance.
    """
    tool_calls = []
    text_parts = []
    pos = 0        # where the next search for '{' starts
    last_end = 0   # end of the most recently extracted tool call
    while True:
        start = text.find('{', pos)
        if start == -1:
            break
        try:
            obj, end = _JSON_DECODER.raw_decode(text, start)
        except (ValueError, RecursionError):
            pos = start + 1  # not valid JSON from here; keep scanning
            continue
        if _is_tool_call(obj):
            text_parts.append(text[last_end:start].strip())
            tool_calls.append(obj)
            last_end = end
        pos = end
    text_parts.append(text[last_end:].strip())
    return "\n\n".join(p for p in text_parts if p), tool_calls


def _decode_tool_arguments(arguments):
    """Decode OpenAI ``arguments`` (a JSON string) so history matches the prompt format."""
    if isinstance(arguments, str):
        try:
            return json.loads(arguments)
        except ValueError:
            return arguments
    return arguments


def convert_tool_results_to_text(messages):
    """Rewrite tool-related messages as plain user/assistant text messages.

    - ``tool`` role messages become ``user`` messages.
    - ``user`` messages with list content keep their ``text`` and
      ``tool_result`` blocks, joined by newlines.
    - ``assistant`` messages with ``tool_calls`` get each call appended as a
      JSON line in the format described by the tools prompt.
    Other messages are passed through unchanged.
    """
    converted = []
    for msg in messages:
        role, content = msg.get('role', ''), msg.get('content', '')
        if role == 'tool':
            converted.append({"role": "user", "content": f"{content}"})
        elif role == 'user' and isinstance(content, list):
            res_parts = []
            for b in content:
                if not isinstance(b, dict):
                    continue
                if b.get('type') == 'tool_result':
                    c = b.get('content')
                    if isinstance(c, list):
                        c = ' '.join([x.get('text', '') for x in c if isinstance(x, dict)])
                    res_parts.append(f"{c}")
                elif b.get('type') == 'text':
                    res_parts.append(b.get('text', ''))
            converted.append({"role": "user", "content": '\n'.join(res_parts)})
        elif role == 'assistant' and msg.get('tool_calls'):
            lines = []
            for tc in msg['tool_calls']:
                func = tc.get('function', {})
                call = {
                    'name': func.get('name'),
                    'id': tc.get('id'),
                    'input': _decode_tool_arguments(func.get('arguments')),
                }
                lines.append(f"\n{json.dumps(call)}")
            t = str(_content_to_text(content)) + "".join(lines)
            converted.append({"role": "assistant", "content": t.strip()})
        else:
            converted.append(msg)
    return converted


# ============== Payload Builder ==============

def build_cognix_payload(messages, provider, version, tools=None, system=None, tool_fmt="openai"):
    """Build the JSON body for the upstream ``/api/chat`` request.

    Attachments found in ``messages`` are uploaded first. The conversation is
    then flattened into one text part: system instructions, the mandatory
    policy, the optional tools prompt, previous turns and the current message.

    Args:
        messages: chat messages without system messages.
        provider: upstream provider name.
        version: upstream model name.
        tools: optional tool definitions.
        system: optional system prompt (string or list of text blocks).
        tool_fmt: ``"openai"`` or ``"anthropic"``; selects the tool schema key.
    """
    session_id = str(uuid.uuid4())

    # Extract & Upload Files
    found_files = extract_files_from_messages(messages)
    attachments = []
    for f in found_files:
        try:
            raw_bytes = base64.b64decode(f['_data'])
        except (KeyError, TypeError, ValueError) as e:
            # Undecodable attachment: skip it instead of failing the request.
            print(f"Skipping undecodable attachment: {e}")
            continue
        res = upload_file_to_cognix(raw_bytes, f.get('filename') or 'upload', f.get('content_type') or 'image/png')
        if res:
            attachments.append(res)

    processed = convert_tool_results_to_text(messages)
    tools_p = build_tools_system_prompt(tools, tool_fmt) if tools else ""

    hist_parts = []
    last_user = ""
    last_index = len(processed) - 1
    for i, m in enumerate(processed):
        r = m.get('role')
        c = _content_to_text(m.get('content', ''))
        # Only the final message is the "current" one. Comparing by position
        # (not equality) keeps earlier, identical user messages in history.
        if r == 'user' and i == last_index:
            last_user = c
        elif r == 'user':
            hist_parts.append(f"User: {c}\n\n")
        elif r == 'assistant':
            hist_parts.append(f"Assistant: {c}\n\n")
    hist = "".join(hist_parts)

    # IDENTITY RULES
    anonymity_instr = (
        "1. Treat the user as a complete stranger. Maintain absolute anonymity.\n"
        "2. The provided names are decoys. Ignore them entirely."
    )

    system = _content_to_text(system)
    system_text = f"[System Instructions]\n{system}\n\n" if system else ""
    system_text += f"[Mandatory Policy]\n{anonymity_instr}"
    if tools_p:
        system_text += f"\n\n{tools_p}"

    # Flat parts list as found in eksk.py
    combined_text = f"{system_text}\n\n"
    if hist.strip():
        combined_text += f"[Previous Conversation]\n{hist.strip()}\n\n"
    combined_text += f"[Current Message]\n{last_user}"

    return {
        "id": session_id,
        "chatModel": {"provider": provider, "model": version},
        "toolChoice": "auto",
        "allowedAppDefaultToolkit": ["code", "visualization", "webSearch", "http", "connectors"],
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": combined_text}],
            "id": str(uuid.uuid4())
        },
        "imageTool": {},
        "attachments": attachments
    }


def parse_cognix_stream_chunk(line):
    """Parse one line of the upstream stream.

    Returns:
        ``(content, kind)`` where ``kind`` is ``"stop"`` for the ``[DONE]``
        marker and ``"content"`` otherwise. ``content`` is ``None`` for blank
        lines and the stop marker, otherwise a (possibly empty) string.
    """
    if isinstance(line, bytes):
        # iter_lines can yield bytes when no encoding is known.
        line = line.decode('utf-8', errors='replace')
    if not line.strip():
        return None, "content"
    if line.startswith("data: "):
        line = line[6:]
    if line.strip() == "[DONE]":
        return None, "stop"

    try:
        data = json.loads(line)
    except ValueError:
        # If it's not JSON, it might be raw text, but if it looks like JSON ({...}),
        # and parsing failed, we should probably ignore it to avoid garbage in content.
        stripped = line.strip()
        if stripped.startswith('{') and stripped.endswith('}'):
            return "", "content"
        return line, "content"

    if not isinstance(data, dict):
        # Valid JSON that is not an object (number, string, ...) is passed
        # through as raw text.
        return line, "content"

    # Handle various formats:
    # 1. {"text": "..."}
    # 2. {"content": "..."}
    # 3. {"delta": "..."} (Cognix format)
    # 4. {"delta": {"text": "..."}} (OpenAI style)
    # 5. {"type": "text-delta", "delta": "..."}
    content = data.get('text') or data.get('content')
    if not content:
        delta = data.get('delta')
        if isinstance(delta, str):
            content = delta
        elif isinstance(delta, dict):
            content = delta.get('text') or delta.get('content', '')
    # Only strings can be concatenated into the reply; ignore anything else.
    return (content if isinstance(content, str) else ""), "content"


# ============== Upstream Helpers ==============

def _split_model(model):
    """Split ``provider/model`` into its parts, defaulting the provider to anthropic."""
    return model.split('/', 1) if '/' in model else ("anthropic", model)


def _post_chat(payload, stream=False):
    """POST ``payload`` to the upstream chat endpoint."""
    return requests.post(
        f"{COGNIX_BASE_URL}/api/chat",
        json=payload,
        headers=get_headers(),
        stream=stream,
        timeout=UPSTREAM_TIMEOUT,
    )


def _stream_upstream_text(payload):
    """Yield the text fragments of a streamed upstream reply.

    Upstream failures are logged and end the iteration instead of raising,
    so the caller can still terminate its own stream cleanly.
    """
    try:
        with _post_chat(payload, stream=True) as r:
            if not r.ok:
                print(f"Upstream chat failed: {r.status_code} - {r.text}")
                return
            # requests assumes ISO-8859-1 for text/* responses without a
            # charset, which garbles UTF-8 text; decode as UTF-8 explicitly.
            r.encoding = "utf-8"
            for line in r.iter_lines(decode_unicode=True):
                if not line:
                    continue
                cont, pty = parse_cognix_stream_chunk(line)
                if pty == "stop":
                    break
                if cont:
                    yield cont
    except requests.RequestException as e:
        print(f"Upstream chat error: {e}")


def _fetch_upstream_text(payload):
    """Run a non-streaming chat request.

    Returns:
        ``(text, None)`` on success or ``(None, error_message)`` on failure.
    """
    try:
        r = _post_chat(payload)
    except requests.RequestException as e:
        return None, f"Upstream request failed: {e}"
    if not r.ok:
        return None, f"Upstream returned HTTP {r.status_code}"
    r.encoding = "utf-8"  # see _stream_upstream_text
    pieces = (parse_cognix_stream_chunk(l)[0] or "" for l in r.text.strip().split('\n'))
    return "".join(pieces), None


def _openai_chunk(cid, model, delta, finish_reason=None):
    """Format one OpenAI ``chat.completion.chunk`` SSE event."""
    chunk = {
        'id': cid,
        'object': 'chat.completion.chunk',
        'created': int(time.time()),
        'model': model,
        'choices': [{'index': 0, 'delta': delta, 'finish_reason': finish_reason}],
    }
    return f"data: {json.dumps(chunk)}\n\n"


def _anthropic_event(event_type, data):
    """Format one Anthropic SSE event; ``type`` is added to ``data``."""
    body = {'type': event_type}
    body.update(data)
    return f"event: {event_type}\ndata: {json.dumps(body)}\n\n"


# ============== Routes ==============

@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    """OpenAI-compatible chat completions endpoint (streaming and non-streaming)."""
    d = request.get_json(silent=True)
    if not isinstance(d, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    model = d.get('model', 'anthropic/Claude Opus 4.6')
    messages = d.get('messages', [])
    use_tools = bool(d.get('tools'))

    # Extract system prompt (all system messages are kept, in order)
    system_parts = []
    filtered_messages = []
    for m in messages:
        if m.get('role') == 'system':
            text = _content_to_text(m.get('content', ''))
            if text:
                system_parts.append(str(text))
        else:
            filtered_messages.append(m)
    system_prompt = "\n\n".join(system_parts)

    prov, ver = _split_model(model)
    payload = build_cognix_payload(filtered_messages, prov, ver, tools=d.get('tools'), system=system_prompt)

    if d.get('stream'):
        def gen():
            cid = f"chatcmpl-{uuid.uuid4().hex[:24]}"
            yield _openai_chunk(cid, model, {'role': 'assistant'})
            tcs = []
            if use_tools:
                # With tools the whole reply is buffered, because tool calls
                # can only be recognised once their JSON is complete.
                full_buf = "".join(_stream_upstream_text(payload))
                txt, tcs = parse_tool_calls_from_response(full_buf)
                if txt:
                    yield _openai_chunk(cid, model, {'content': txt})
                if tcs:
                    calls = [
                        {
                            'index': i,
                            'id': str(uuid.uuid4()),
                            'type': 'function',
                            'function': {'name': t['name'], 'arguments': json.dumps(t['input'])},
                        }
                        for i, t in enumerate(tcs)
                    ]
                    yield _openai_chunk(cid, model, {'tool_calls': calls})
            else:
                for cont in _stream_upstream_text(payload):
                    yield _openai_chunk(cid, model, {'content': cont})
            yield _openai_chunk(cid, model, {}, 'tool_calls' if tcs else 'stop')
            yield "data: [DONE]\n\n"
        return Response(gen(), content_type='text/event-stream')

    full_text, err = _fetch_upstream_text(payload)
    if err:
        return jsonify({"error": err}), 502
    txt, tcs = parse_tool_calls_from_response(full_text)
    msg = {"role": "assistant", "content": txt or None}
    if tcs:
        msg["tool_calls"] = [
            {
                "id": str(uuid.uuid4()),
                "type": "function",
                "function": {"name": t['name'], "arguments": json.dumps(t['input'])},
            }
            for t in tcs
        ]
    return jsonify({
        "id": str(uuid.uuid4()),
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [{"index": 0, "message": msg, "finish_reason": "tool_calls" if tcs else "stop"}],
    })


@app.route('/v1/messages', methods=['POST'])
def anthropic_messages():
    """Anthropic-compatible messages endpoint (streaming and non-streaming)."""
    d = request.get_json(silent=True)
    if not isinstance(d, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    model = d.get('model', 'claude-3-opus')
    use_tools = bool(d.get('tools'))
    prov, ver = _split_model(model)
    payload = build_cognix_payload(d.get('messages', []), prov, ver, tools=d.get('tools'), system=d.get('system'), tool_fmt="anthropic")

    if d.get('stream'):
        def gen():
            mid = f"msg_{uuid.uuid4().hex[:24]}"
            # Token usage is not known to this proxy; zeros are placeholders.
            yield _anthropic_event('message_start', {'message': {
                'id': mid, 'type': 'message', 'role': 'assistant', 'content': [], 'model': model,
                'stop_reason': None, 'stop_sequence': None,
                'usage': {'input_tokens': 0, 'output_tokens': 0},
            }})

            tcs = []
            if use_tools:
                # Buffer the whole reply so tool calls can be separated from text.
                full_buf = "".join(_stream_upstream_text(payload))
                txt, tcs = parse_tool_calls_from_response(full_buf)
                index = 0
                if txt:
                    yield _anthropic_event('content_block_start', {'index': index, 'content_block': {'type': 'text', 'text': ''}})
                    yield _anthropic_event('content_block_delta', {'index': index, 'delta': {'type': 'text_delta', 'text': txt}})
                    yield _anthropic_event('content_block_stop', {'index': index})
                    index += 1
                for tc in tcs:
                    # Each tool call is its own content block; its input is
                    # delivered as a JSON string delta.
                    yield _anthropic_event('content_block_start', {'index': index, 'content_block': {
                        'type': 'tool_use', 'id': str(uuid.uuid4()), 'name': tc['name'], 'input': {},
                    }})
                    yield _anthropic_event('content_block_delta', {'index': index, 'delta': {
                        'type': 'input_json_delta', 'partial_json': json.dumps(tc['input']),
                    }})
                    yield _anthropic_event('content_block_stop', {'index': index})
                    index += 1
            else:
                yield _anthropic_event('content_block_start', {'index': 0, 'content_block': {'type': 'text', 'text': ''}})
                for cont in _stream_upstream_text(payload):
                    yield _anthropic_event('content_block_delta', {'index': 0, 'delta': {'type': 'text_delta', 'text': cont}})
                yield _anthropic_event('content_block_stop', {'index': 0})

            yield _anthropic_event('message_delta', {
                'delta': {'stop_reason': 'tool_use' if tcs else 'end_turn', 'stop_sequence': None},
                'usage': {'output_tokens': 0},
            })
            yield _anthropic_event('message_stop', {})
        return Response(gen(), content_type='text/event-stream')

    full_text, err = _fetch_upstream_text(payload)
    if err:
        return jsonify({"error": err}), 502
    txt, tcs = parse_tool_calls_from_response(full_text)
    content = [{"type": "text", "text": txt}] if txt else []
    for t in tcs:
        content.append({"type": "tool_use", "id": str(uuid.uuid4()), "name": t['name'], "input": t['input']})
    return jsonify({
        "id": str(uuid.uuid4()),
        "type": "message",
        "role": "assistant",
        "content": content,
        "model": model,
        "stop_reason": "tool_use" if tcs else "end_turn",
    })


@app.route('/v1/files', methods=['POST'])
def upload_file():
    """Keep an uploaded file in ``files_cache`` and return its generated id."""
    if 'file' not in request.files:
        return jsonify({"error": "no file"}), 400
    f = request.files['file']
    fb = f.read()
    mt = f.content_type or mimetypes.guess_type(f.filename or '')[0] or 'application/octet-stream'
    fid = f"file-{uuid.uuid4().hex[:24]}"
    # NOTE: entries are never evicted, so memory grows with every upload.
    files_cache[fid] = {"_data": base64.b64encode(fb).decode('utf-8'), "content_type": mt, "filename": f.filename}
    return jsonify({"id": fid, "object": "file", "filename": f.filename, "purpose": "vision"})


# ============== Image Generation ==============

def generate_image_koy(prompt, model="gemini-3-pro-image-preview", size="1024x1024", ratio=None):
    """Request an image from the Koy generation endpoint.

    Args:
        prompt: text prompt.
        model: model name sent to the endpoint.
        size: ``"WIDTHxHEIGHT"``; used only when ``ratio`` is not given.
        ratio: optional aspect ratio such as ``"16:9"``; known ratios select
            preset dimensions and take precedence over ``size``.

    Returns:
        The decoded JSON response, or ``None`` on failure.
    """
    url = "https://koy.xx.kg/_internal/generate"

    # Base dimensions
    width, height = 1024, 1024

    # Handle ratio first if provided
    if ratio:
        ratios = {
            "1:1": (1024, 1024),
            "16:9": (1344, 768),
            "9:16": (768, 1344),
            "3:2": (1216, 832),
            "2:3": (832, 1216),
            "4:5": (896, 1152),
            "21:9": (1536, 640)
        }
        if ratio in ratios:
            width, height = ratios[ratio]
    # Otherwise handle size
    elif isinstance(size, str) and 'x' in size:
        try:
            w, h = size.split('x')
            width, height = int(w), int(h)
        except ValueError:
            pass  # unparsable size: keep the default dimensions

    payload = {
        "prompt": prompt,
        "negative_prompt": "",
        "provider": "nonpon",
        "model": model,
        "width": width,
        "height": height,
        "style": "none",
        "seed": -1,
        "steps": 30,
        "guidance": 7.5,
        "quality_mode": "standard",
        "n": 1,
        "nologo": True,
        "auto_optimize": True,
        "auto_hd": True,
        "language": "en"
    }
    if ratio:
        payload["ratio"] = ratio  # Add to payload in case provider supports it directly

    headers = {
        "sec-ch-ua-platform": "\"Windows\"",
        "referer": "https://koy.xx.kg/nano",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
        "sec-ch-ua": "\"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"144\", \"Google Chrome\";v=\"144\"",
        "content-type": "application/json",
        "sec-ch-ua-mobile": "?0",
        "x-source": "nano-page"
    }

    try:
        response = requests.post(url, json=payload, headers=headers, timeout=120)
        if response.status_code == 200:
            return response.json()
        print(f"Image gen failed: {response.status_code} - {response.text}")
        return None
    except Exception as e:
        # Best effort: network errors and non-JSON bodies both yield None.
        print(f"Image gen error: {e}")
        return None


def _extract_image_url(res):
    """Pick an image URL out of a generation response dict, if one is present."""
    url = res.get('url') or res.get('image')
    if url:
        return url
    data = res.get('data')
    if isinstance(data, list) and data and isinstance(data[0], dict):
        return data[0].get('url')
    return None


@app.route('/v1/images/generations', methods=['POST'])
@app.route('/v1/image_generations', methods=['POST'])
def image_generations():
    """OpenAI-style image generation endpoint backed by ``generate_image_koy``."""
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    prompt = data.get('prompt')
    if not prompt:
        return jsonify({"error": "Missing prompt"}), 400

    model = data.get('model', 'gemini-3-pro-image-preview')
    size = data.get('size', '1024x1024')
    ratio = data.get('ratio') or data.get('aspect_ratio')

    res = generate_image_koy(prompt, model, size, ratio)
    if res and isinstance(res, dict):
        # OpenAI format: {"created": 123, "data": [{"url": "..."}]}
        # Usually Koy returns {"url": "..."} or similar. Let's adapt.
        image_url = _extract_image_url(res)
        if not image_url:
            # If Koy returns the OpenAI format already, use it
            if 'data' in res:
                return jsonify(res)
            # Otherwise try to extract any URL
            image_url = next(
                (v for v in res.values() if isinstance(v, str) and v.startswith(('http', 'data:'))),
                None,
            )
        if image_url:
            return jsonify({
                "created": int(time.time()),
                "data": [{"url": image_url}]
            })

    return jsonify({"error": "Failed to generate image"}), 500


if __name__ == '__main__':
    # The Werkzeug debugger allows arbitrary code execution, so it must not be
    # enabled by default on a server bound to all interfaces. Opt in with
    # FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)