| | from flask import Flask, request, Response, jsonify |
| | import requests |
| | import json |
| | import uuid |
| | import time |
| | import os |
| | import re |
| | import base64 |
| | import mimetypes |
| | import random |
| |
|
| | app = Flask(__name__) |
| |
|
| | |
| | COGNIX_BASE_URL = os.environ.get("COGNIX_BASE_URL", "https://www.cognixai.co") |
| | |
| | COGNIX_COOKIES_RAW = os.environ.get("COGNIX_COOKIE", "") |
| | COGNIX_COOKIES = [c.strip() for c in COGNIX_COOKIES_RAW.split("||") if c.strip()] |
| |
|
| | def get_cognix_cookie(): |
| | """Get a random cookie from the configured list for rotation""" |
| | if not COGNIX_COOKIES: |
| | return "ext_name=ojplmecpdpgccookcobabopnaifgidhf; cf_clearance=ugS5wP9Q0rIAI4EubTIim0UhvpetQLv_C._ON7AnXsY-1771672115-1.2.1.1-2grJb6xS2Kmo7enu6oqvir5oESKnoRqD7E._PYcerInd7_ntAVdU1Sg4_NAGw_H9ZnBxVVQJ2hQsZ6vEk3kdegpYObeoK5qBJLqA.VE51263LAebuA9Uu7r7perVgNr0qHLBm.iOes80WlaiePEJ8QsWVCeKrhLyeWP8YZ_SrbI6XwlwVGYO9ElDNCkwFzlXfk3rVreoI5zcK0rMpEZ5_z1TyfydvrZbTm.Y5ZpSErU; __Secure-better-auth.state=ODzpQ4vMzfj5UGT4jwMVclsXDBcD_V7l.9rRxNIsfIzw0CUO6lmfRKI7WqRM54n2Y2so3ucTcgR0%3D; __Secure-better-auth.session_token=8SyWOzC9ds8T9VsjyarP2awEIo8ZrfzZ.JTswU6YgAGbvUpSfcn1%2Feh1CDQF3SU5j3C%2BtLF0%2B8iI%3D; __Secure-better-auth.session_data=eyJzZXNzaW9uIjp7InNlc3Npb24iOnsiZXhwaXJlc0F0IjoiMjAyNi0wMi0yOFQxMTowODo0OC43OTZaIiwidG9rZW4iOiI4U3lXT3pDOWRzOFQ5VnNqeWFyUDJhd0VJbzhacmZ6WiIsImNyZWF0ZWRBdCI6IjIwMjYtMDItMjFUMTE6MDg6NDguNzk2WiIsInVwZGF0ZWRBdCI6IjIwMjYtMDItMjFUMTE6MDg6NDguNzk2WiIsImlwQWRkcmVzcyI6IjE3Mi43MC40Ny4xODkiLCJ1c2VyQWdlbnQiOiJNb3ppbGxhLzUuMCAoV2luZG93cyBOVCAxMC4wOyBXaW42NDsgeDY0KSBBcHBsZVdlYktpdC81MzcuMzYgKEtIVE1MLCBsaWtlIEdlY2tvKSBDaHJvbWUvMTQ1LjAuMC4wIFNhZmFyaS81MzcuMzYiLCJ1c2VySWQiOiJhYzI0Y2M0OS1mMjI1LTQzMDMtODUwNS1hYzE5MTI5NjI2YWQiLCJpbXBlcnNvbmF0ZWRCeSI6bnVsbCwiaWQiOiJlZTIzNDJiOC00NjdhLTRkYTgtOTdmZC0zOWJhMzNmMjkxYjIifSwidXNlciI6eyJuYW1lIjoidXNlciIsImVtYWlsIjoiamFkYXZhdGhhcnYyMDEwQGdtYWlsLmNvbSIsImVtYWlsVmVyaWZpZWQiOnRydWUsImltYWdlIjoiaHR0cHM6Ly9saDMuZ29vZ2xldXNlcmNvbnRlbnQuY29tL2EvQUNnOG9jSTEyQVFTWldKa3FSY2FZZDJBU1RsWW9iRHI0eXdvVVlhNlUweWhyZ0NuaHpmUTBhaz1zOTYtYyIsImNyZWF0ZWRBdCI6IjIwMjYtMDItMjFUMTE6MDg6NDguNzU2WiIsInVwZGF0ZWRBdCI6IjIwMjYtMDItMjFUMTE6MDk6MTkuNzg1WiIsInJvbGUiOiJlZGl0b3IiLCJiYW5uZWQiOmZhbHNlLCJiYW5SZWFzb24iOm51bGwsImJhbkV4cGlyZXMiOm51bGwsImlkIjoiYWMyNGNjNDktZjIyNS00MzAzLTg1MDUtYWMxOTEyOTYyNmFkIn19LCJleHBpcmVzQXQiOjE3NzE2NzU3NTk3OTIsInNpZ25hdHVyZSI6IldlM1BfX0lRaG9HUk8weE5XamgzUmg5S1RBOGtiQ2dpY1M0cDg5eHZBc2MifQ" |
| | return random.choice(COGNIX_COOKIES) |
| |
|
| | DEFAULT_COGNIX_SESSION_ID = "73acd532-a6c2-4ae3-b267-fa67a84f1085" |
| |
|
| | |
| | files_cache = {} |
| |
|
| | def get_headers(multipart=False): |
| | h = { |
| | "accept": "*/*", |
| | "accept-language": "en-IN,en-GB;q=0.9,en-US;q=0.8,en;q=0.7", |
| | "cookie": get_cognix_cookie(), |
| | "origin": "https://www.cognixai.co", |
| | "referer": f"https://www.cognixai.co/chat/{DEFAULT_COGNIX_SESSION_ID}", |
| | "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36" |
| | } |
| | if not multipart: |
| | h["content-type"] = "application/json" |
| | return h |
| |
|
| | |
| | model_cache = {"data": [], "last_updated": 0} |
| |
|
| | def fetch_cognix_models(): |
| | """Fetch available models from Cognix API and format for OpenAI compatibility.""" |
| | current_time = time.time() |
| | |
| | if model_cache["data"] and (current_time - model_cache["last_updated"] < 600): |
| | return model_cache["data"] |
| |
|
| | url = f"{COGNIX_BASE_URL}/api/chat/models" |
| | |
| | headers = get_headers() |
| | headers.update({ |
| | "sec-ch-ua-platform": '"Windows"', |
| | "sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Google Chrome";v="144"', |
| | "sec-ch-ua-mobile": "?0" |
| | }) |
| | |
| | try: |
| | resp = requests.get(url, headers=headers, timeout=15) |
| | if resp.status_code == 200: |
| | try: |
| | data = resp.json() |
| | except Exception: |
| | |
| | return model_cache["data"] if model_cache["data"] else [{"id": "anthropic/Claude Opus 4.6", "object": "model"}] |
| | |
| | models = [] |
| | if isinstance(data, list): |
| | for entry in data: |
| | provider = entry.get("provider") |
| | |
| | if provider == "cognix": |
| | continue |
| | |
| | for m in entry.get("models", []): |
| | model_name = m.get("name") |
| | if not model_name: continue |
| | |
| | models.append({ |
| | "id": f"{provider}/{model_name}", |
| | "object": "model", |
| | "created": int(current_time), |
| | "owned_by": provider |
| | }) |
| | |
| | if models: |
| | |
| | models.append({ |
| | "id": "gemini-3-pro-image-preview", |
| | "object": "model", |
| | "created": int(current_time), |
| | "owned_by": "nonpon" |
| | }) |
| | model_cache["data"] = models |
| | model_cache["last_updated"] = current_time |
| | return models |
| | except Exception as e: |
| | print(f"Error fetching models from Cognix: {e}") |
| | |
| | |
| | return model_cache["data"] if model_cache["data"] else [{"id": "anthropic/Claude Opus 4.6", "object": "model"}] |
| |
|
| | @app.route('/v1/models', methods=['GET']) |
| | def list_models(): |
| | models = fetch_cognix_models() |
| | return jsonify({"object": "list", "data": models}) |
| |
|
| | |
| |
|
| | def upload_file_to_cognix(file_bytes, filename, media_type): |
| | """Upload a file to CognixAI storage API and return attachment metadata.""" |
| | url = f"{COGNIX_BASE_URL}/api/storage/upload" |
| | try: |
| | files = { |
| | 'file': (filename, file_bytes, media_type) |
| | } |
| | |
| | |
| | resp = requests.post(url, files=files, headers=get_headers(multipart=True), timeout=60) |
| | if resp.status_code == 200: |
| | res = resp.json() |
| | if res.get("success"): |
| | metadata = res.get("metadata", {}) |
| | return { |
| | "id": res.get("key"), |
| | "name": metadata.get("filename", filename), |
| | "type": metadata.get("contentType", media_type), |
| | "url": res.get("url"), |
| | "size": metadata.get("size", 0), |
| | "key": res.get("key") |
| | } |
| | return None |
| | else: |
| | print(f"Upload failed: {resp.status_code} - {resp.text}") |
| | return None |
| | except Exception as e: |
| | print(f"Upload error: {e}") |
| | return None |
| |
|
| | def extract_files_from_messages(messages, msg_format="openai"): |
| | """Extract images and files from message blocks.""" |
| | files = [] |
| | |
| | def get_id_from_url(url): |
| | if not isinstance(url, str): return None |
| | if url in files_cache: return url |
| | match = re.search(r'(file-[a-f0-9]{24})', url) |
| | if match: |
| | fid = match.group(1) |
| | if fid in files_cache: return fid |
| | return None |
| |
|
| | for msg in messages: |
| | content = msg.get('content', '') |
| | if not isinstance(content, list): continue |
| | |
| | for block in content: |
| | if not isinstance(block, dict): continue |
| | block_type = block.get('type') |
| | |
| | |
| | if block_type == 'image_url': |
| | url = block.get('image_url', {}).get('url', '') |
| | f_id = get_id_from_url(url) |
| | if f_id: |
| | files.append(files_cache[f_id]) |
| | elif url.startswith('data:'): |
| | try: |
| | header, b64 = url.split(',', 1) |
| | mime = header.split(':')[1].split(';')[0] |
| | files.append({"_data": b64, "content_type": mime, "filename": f"img_{uuid.uuid4().hex[:8]}"}) |
| | except: pass |
| | elif url.startswith('http'): |
| | try: |
| | resp = requests.get(url, timeout=30) |
| | if resp.status_code == 200: |
| | files.append({"_data": base64.b64encode(resp.content).decode('utf-8'), "content_type": resp.headers.get('content-type', 'image/png'), "filename": f"img_{uuid.uuid4().hex[:8]}"}) |
| | except: pass |
| |
|
| | |
| | elif block_type == 'image': |
| | src = block.get('source', {}) |
| | if src.get('type') == 'base64': |
| | files.append({"_data": src.get('data'), "content_type": src.get('media_type'), "filename": f"img_{uuid.uuid4().hex[:8]}"}) |
| |
|
| | return files |
| |
|
| | |
| |
|
| | def build_tools_system_prompt(tools, tool_format="openai"): |
| | if not tools: return "" |
| | tools_list = [] |
| | for tool in tools: |
| | func = tool.get('function', tool) |
| | tools_list.append({ |
| | "name": func.get('name', ''), |
| | "description": func.get('description', ''), |
| | "parameters": func.get('parameters', (tool.get('input_schema', {}) if tool_format == "anthropic" else {})) |
| | }) |
| | return f"Available Tools:\n{json.dumps(tools_list, indent=2)}\n\nTo use a tool, output: <tool_call>{{\"name\": \"...\", \"id\": \"...\", \"input\": {{...}}}}</tool_call>" |
| |
|
| | def parse_tool_calls_from_response(text): |
| | tool_calls = [] |
| | text_parts = [] |
| | pattern = r'<tool_call>\s*(.*?)\s*</tool_call>' |
| | matches = list(re.finditer(pattern, text, re.DOTALL)) |
| | if matches: |
| | last_end = 0 |
| | for m in matches: |
| | text_parts.append(text[last_end:m.start()].strip()) |
| | last_end = m.end() |
| | try: tool_calls.append(json.loads(m.group(1).strip())) |
| | except: text_parts.append(m.group(0)) |
| | text_parts.append(text[last_end:].strip()) |
| | else: text_parts.append(text) |
| | return "\n\n".join(text_parts).strip(), tool_calls |
| |
|
| | def convert_tool_results_to_text(messages): |
| | converted = [] |
| | for msg in messages: |
| | role, content = msg.get('role', ''), msg.get('content', '') |
| | if role == 'tool': |
| | converted.append({"role": "user", "content": f"<tool_result id=\"{msg.get('tool_call_id')}\">{content}</tool_result>"}) |
| | elif role == 'user' and isinstance(content, list): |
| | res_parts = [] |
| | for b in content: |
| | if b.get('type') == 'tool_result': |
| | c = b.get('content') |
| | if isinstance(c, list): c = ' '.join([x.get('text', '') for x in c]) |
| | res_parts.append(f"<tool_result id=\"{b.get('tool_use_id')}\">{c}</tool_result>") |
| | elif b.get('type') == 'text': res_parts.append(b.get('text', '')) |
| | converted.append({"role": "user", "content": '\n'.join(res_parts)}) |
| | elif role == 'assistant' and msg.get('tool_calls'): |
| | t = (content or "") + "".join([f"\n<tool_call>{json.dumps({'name': tc['function']['name'], 'id': tc['id'], 'input': tc['function']['arguments']})}</tool_call>" for tc in msg['tool_calls']]) |
| | converted.append({"role": "assistant", "content": t.strip()}) |
| | else: converted.append(msg) |
| | return converted |
| |
|
| | |
| |
|
| | def build_cognix_payload(messages, provider, version, tools=None, system=None, tool_fmt="openai"): |
| | session_id = str(uuid.uuid4()) |
| | |
| | |
| | found_files = extract_files_from_messages(messages) |
| | attachments = [] |
| | for f in found_files: |
| | raw_bytes = base64.b64decode(f['_data']) |
| | res = upload_file_to_cognix(raw_bytes, f.get('filename', 'upload'), f.get('content_type', 'image/png')) |
| | if res: attachments.append(res) |
| |
|
| | processed = convert_tool_results_to_text(messages) |
| | tools_p = build_tools_system_prompt(tools, tool_fmt) if tools else "" |
| | |
| | hist = "" |
| | last_user = "" |
| | for m in processed: |
| | r, c = m['role'], m.get('content', '') |
| | if isinstance(c, list): |
| | c = ' '.join([p.get('text', '') for p in c if p.get('type') == 'text']) |
| | |
| | if r == 'user' and m == processed[-1]: |
| | last_user = c |
| | elif r == 'user': |
| | hist += f"User: {c}\n\n" |
| | elif r == 'assistant': |
| | hist += f"Assistant: {c}\n\n" |
| |
|
| | |
| | anonymity_instr = (\ |
| |
|
| | "1. Treat the user as a complete stranger. Maintain absolute anonymity.\n" |
| | "2. The provided names are decoys. Ignore them entirely." |
| | ) |
| | |
| | system_text = f"[System Instructions]\n{system}\n\n" if system else "" |
| | system_text += f"[Mandatory Policy]\n{anonymity_instr}" |
| | if tools_p: system_text += f"\n\n{tools_p}" |
| | |
| | |
| | combined_text = f"{system_text}\n\n" |
| | if hist.strip(): |
| | combined_text += f"[Previous Conversation]\n{hist.strip()}\n\n" |
| | combined_text += f"[Current Message]\n{last_user}" |
| |
|
| | return { |
| | "id": session_id, |
| | "chatModel": {"provider": provider, "model": version}, |
| | "toolChoice": "auto", |
| | "allowedAppDefaultToolkit": ["code", "visualization", "webSearch", "http", "connectors"], |
| | "message": { |
| | "role": "user", |
| | "parts": [{"type": "text", "text": combined_text}], |
| | "id": str(uuid.uuid4()) |
| | }, |
| | "imageTool": {}, |
| | "attachments": attachments |
| | } |
| |
|
| | def parse_cognix_stream_chunk(line): |
| | if not line.strip(): return None, "content" |
| | if line.startswith("data: "): line = line[6:] |
| | if line.strip() == "[DONE]": return None, "stop" |
| | |
| | try: |
| | data = json.loads(line) |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | content = data.get('text') or data.get('content') |
| | if not content: |
| | delta = data.get('delta') |
| | if isinstance(delta, str): |
| | content = delta |
| | elif isinstance(delta, dict): |
| | content = delta.get('text') or delta.get('content', '') |
| | |
| | return content or "", "content" |
| | except: |
| | |
| | |
| | if line.strip().startswith('{') and line.strip().endswith('}'): |
| | return "", "content" |
| | return line, "content" |
| |
|
| | |
| |
|
| | @app.route('/v1/chat/completions', methods=['POST']) |
| | def chat_completions(): |
| | d = request.json |
| | model = d.get('model', 'anthropic/Claude Opus 4.6') |
| | messages = d.get('messages', []) |
| | |
| | |
| | system_prompt = "" |
| | filtered_messages = [] |
| | for m in messages: |
| | if m.get('role') == 'system': |
| | system_prompt = m.get('content', '') |
| | else: |
| | filtered_messages.append(m) |
| | |
| | prov, ver = model.split('/', 1) if '/' in model else ("anthropic", model) |
| | payload = build_cognix_payload(filtered_messages, prov, ver, tools=d.get('tools'), system=system_prompt) |
| | |
| | if d.get('stream'): |
| | def gen(): |
| | cid = f"chatcmpl-{uuid.uuid4().hex[:24]}" |
| | yield f"data: {json.dumps({'id': cid, 'object': 'chat.completion.chunk', 'choices': [{'delta': {'role': 'assistant'}}]})}\n\n" |
| | full_buf = "" |
| | with requests.post(f"{COGNIX_BASE_URL}/api/chat", json=payload, headers=get_headers(), stream=True) as r: |
| | for line in r.iter_lines(decode_unicode=True): |
| | if not line: continue |
| | cont, pty = parse_cognix_stream_chunk(line) |
| | if pty == "stop": break |
| | if cont: |
| | if d.get('tools'): full_buf += cont |
| | else: yield f"data: {json.dumps({'id': cid, 'object': 'chat.completion.chunk', 'choices': [{'delta': {'content': cont}}]})}\n\n" |
| | if d.get('tools') and full_buf: |
| | txt, tcs = parse_tool_calls_from_response(full_buf) |
| | if txt: yield f"data: {json.dumps({'id': cid, 'object': 'chat.completion.chunk', 'choices': [{'delta': {'content': txt}}]})}\n\n" |
| | if tcs: |
| | yield f"data: {json.dumps({'id': cid, 'object': 'chat.completion.chunk', 'choices': [{'delta': {'tool_calls': [{'index': 0, 'id': str(uuid.uuid4()), 'type': 'function', 'function': {'name': t['name'], 'arguments': json.dumps(t['input'])}}]}}]})}\n\n" |
| | yield "data: [DONE]\n\n" |
| | return Response(gen(), content_type='text/event-stream') |
| | |
| | r = requests.post(f"{COGNIX_BASE_URL}/api/chat", json=payload, headers=get_headers()) |
| | full_text = "".join([parse_cognix_stream_chunk(l)[0] or "" for l in r.text.strip().split('\n')]) |
| | txt, tcs = parse_tool_calls_from_response(full_text) |
| | msg = {"role": "assistant", "content": txt or None} |
| | if tcs: msg["tool_calls"] = [{"id": str(uuid.uuid4()), "type": "function", "function": {"name": t['name'], "arguments": json.dumps(t['input'])}} for t in tcs] |
| | return jsonify({"id": str(uuid.uuid4()), "object": "chat.completion", "choices": [{"message": msg, "finish_reason": "tool_calls" if tcs else "stop"}]}) |
| |
|
| | @app.route('/v1/messages', methods=['POST']) |
| | def anthropic_messages(): |
| | d = request.json |
| | model = d.get('model', 'claude-3-opus') |
| | prov, ver = model.split('/', 1) if '/' in model else ("anthropic", model) |
| | payload = build_cognix_payload(d.get('messages', []), prov, ver, tools=d.get('tools'), system=d.get('system'), tool_fmt="anthropic") |
| | |
| | if d.get('stream'): |
| | def gen(): |
| | mid = f"msg_{uuid.uuid4().hex[:24]}" |
| | yield f"event: message_start\ndata: {json.dumps({'type': 'message_start', 'message': {'id': mid, 'role': 'assistant', 'content': [], 'model': model}})}\n\n" |
| | full_buf = "" |
| | with requests.post(f"{COGNIX_BASE_URL}/api/chat", json=payload, headers=get_headers(), stream=True) as r: |
| | for line in r.iter_lines(decode_unicode=True): |
| | if not line: continue |
| | cont, pty = parse_cognix_stream_chunk(line) |
| | if pty == "stop": break |
| | if cont: |
| | full_buf += cont |
| | if not d.get('tools'): yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': cont}})}\n\n" |
| | if d.get('tools') and full_buf: |
| | txt, tcs = parse_tool_calls_from_response(full_buf) |
| | if txt: yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': txt}})}\n\n" |
| | for tc in tcs: |
| | yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 1, 'content_block': {'type': 'tool_use', 'id': str(uuid.uuid4()), 'name': tc['name'], 'input': tc['input']}})}\n\n" |
| | yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n" |
| | return Response(gen(), content_type='text/event-stream') |
| | |
| | r = requests.post(f"{COGNIX_BASE_URL}/api/chat", json=payload, headers=get_headers()) |
| | full_text = "".join([parse_cognix_stream_chunk(l)[0] or "" for l in r.text.strip().split('\n')]) |
| | txt, tcs = parse_tool_calls_from_response(full_text) |
| | content = [{"type": "text", "text": txt}] if txt else [] |
| | for t in tcs: content.append({"type": "tool_use", "id": str(uuid.uuid4()), "name": t['name'], "input": t['input']}) |
| | return jsonify({"id": str(uuid.uuid4()), "type": "message", "role": "assistant", "content": content, "model": model, "stop_reason": "tool_use" if tcs else "end_turn"}) |
| |
|
| | @app.route('/v1/files', methods=['POST']) |
| | def upload_file(): |
| | if 'file' not in request.files: return jsonify({"error": "no file"}), 400 |
| | f = request.files['file'] |
| | fb = f.read() |
| | mt = f.content_type or mimetypes.guess_type(f.filename)[0] or 'application/octet-stream' |
| | fid = f"file-{uuid.uuid4().hex[:24]}" |
| | files_cache[fid] = {"_data": base64.b64encode(fb).decode('utf-8'), "content_type": mt, "filename": f.filename} |
| | return jsonify({"id": fid, "object": "file", "filename": f.filename, "purpose": "vision"}) |
| |
|
| |
|
| |
|
| | |
| |
|
| | def generate_image_koy(prompt, model="gemini-3-pro-image-preview", size="1024x1024", ratio=None): |
| | url = "https://koy.xx.kg/_internal/generate" |
| | |
| | |
| | width, height = 1024, 1024 |
| | |
| | |
| | if ratio: |
| | ratios = { |
| | "1:1": (1024, 1024), |
| | "16:9": (1344, 768), |
| | "9:16": (768, 1344), |
| | "3:2": (1216, 832), |
| | "2:3": (832, 1216), |
| | "4:5": (896, 1152), |
| | "21:9": (1536, 640) |
| | } |
| | if ratio in ratios: |
| | width, height = ratios[ratio] |
| | |
| | elif size and 'x' in size: |
| | try: |
| | w, h = size.split('x') |
| | width, height = int(w), int(h) |
| | except: pass |
| |
|
| | payload = { |
| | "prompt": prompt, |
| | "negative_prompt": "", |
| | "provider": "nonpon", |
| | "model": model, |
| | "width": width, |
| | "height": height, |
| | "style": "none", |
| | "seed": -1, |
| | "steps": 30, |
| | "guidance": 7.5, |
| | "quality_mode": "standard", |
| | "n": 1, |
| | "nologo": True, |
| | "auto_optimize": True, |
| | "auto_hd": True, |
| | "language": "en" |
| | } |
| | |
| | if ratio: payload["ratio"] = ratio |
| |
|
| | headers = { |
| | "sec-ch-ua-platform": "\"Windows\"", |
| | "referer": "https://koy.xx.kg/nano", |
| | "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", |
| | "sec-ch-ua": "\"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"144\", \"Google Chrome\";v=\"144\"", |
| | "content-type": "application/json", |
| | "sec-ch-ua-mobile": "?0", |
| | "x-source": "nano-page" |
| | } |
| |
|
| | try: |
| | response = requests.post(url, json=payload, headers=headers, timeout=120) |
| | if response.status_code == 200: |
| | return response.json() |
| | else: |
| | print(f"Image gen failed: {response.status_code} - {response.text}") |
| | return None |
| | except Exception as e: |
| | print(f"Image gen error: {e}") |
| | return None |
| |
|
| | @app.route('/v1/images/generations', methods=['POST']) |
| | @app.route('/v1/image_generations', methods=['POST']) |
| | def image_generations(): |
| | data = request.json |
| | prompt = data.get('prompt') |
| | if not prompt: |
| | return jsonify({"error": "Missing prompt"}), 400 |
| | |
| | model = data.get('model', 'gemini-3-pro-image-preview') |
| | size = data.get('size', '1024x1024') |
| | ratio = data.get('ratio') or data.get('aspect_ratio') |
| | |
| | res = generate_image_koy(prompt, model, size, ratio) |
| | if res: |
| | |
| | |
| | image_url = res.get('url') or res.get('image') or res.get('data', [{}])[0].get('url') |
| | if not image_url and isinstance(res, dict): |
| | |
| | if 'data' in res: return jsonify(res) |
| | |
| | for val in res.values(): |
| | if isinstance(val, str) and (val.startswith('http') or val.startswith('data:')): |
| | image_url = val |
| | break |
| | |
| | if image_url: |
| | return jsonify({ |
| | "created": int(time.time()), |
| | "data": [{"url": image_url}] |
| | }) |
| | |
| | return jsonify({"error": "Failed to generate image"}), 500 |
| |
|
| | if __name__ == '__main__': |
| | app.run(host='0.0.0.0', port=7860, debug=True) |