Spaces:
Sleeping
Sleeping
| from flask import Flask, request, Response, jsonify | |
| import requests | |
| import json | |
| import uuid | |
| import time | |
| import os | |
| import re | |
| import base64 | |
| import mimetypes | |
| import random | |
| app = Flask(__name__) | |
| # Configuration | |
| COGNIX_BASE_URL = os.environ.get("COGNIX_BASE_URL", "https://www.cognixai.co") | |
| # Supports || separated cookies for rotation | |
| COGNIX_COOKIES_RAW = os.environ.get("COGNIX_COOKIE", "") | |
| COGNIX_COOKIES = [c.strip() for c in COGNIX_COOKIES_RAW.split("||") if c.strip()] | |
| def get_cognix_cookie(): | |
| """Get a random cookie from the configured list for rotation""" | |
| if not COGNIX_COOKIES: | |
| return "ext_name=ojplmecpdpgccookcobabopnaifgidhf; cf_clearance=j_nYaeNI0RwDRG1Qyd.bRf0R5YCGgIgAEzEgaQEjCCU-1770908625-1.2.1.1-RMchxpAE5hSG0Xl4XY3BShfT4aXGHCqNiBxN6iyTGkrv8azqzeTMuCOKZZ1lHjBZ5kdtj4.F_hmpP2legrsaaSe16gMqtqa5.FrM7yNuGQczvf1ep45loNu5MhI151HAk0k9T5UKDHdHXHcidlUt_ajlE64FUTSj26Rf6WwTg55n.xeliVOzxYygojzifx7hywAXmXMAqCpKADeDnSuEWqahc2_zDnpJxwy4444gh_o; __Secure-better-auth.state=FOj7ymeub1GeD3s4fiEbm9Hrd-hE0slR.oM0kHle4Je9FhUDPisXmPSHQvH4nkqldTe3kRBrTHJk%3D; __Secure-better-auth.session_token=5npdnyCa90buJBq2qW2wopL6nC3HjO4R.5v3gNhODuU7F0hbVXAJ%2BPFgMPsCPM0j8J%2BHk%2FrqsNdc%3D; __Secure-better-auth.session_data=eyJzZXNzaW9uIjp7InNlc3Npb24iOnsiZXhwaXJlc0F0IjoiMjAyNi0wMi0xOVQxNTowMzo0OC44MjNaIiwidG9rZW4iOiI1bnBkbnlDYTkwYnVKQnEycVcyd29wTDZuQzNIak80UiIsImNyZWF0ZWRBdCI6IjIwMjYtMDItMTJUMTU6MDM6NDguODIzWiIsInVwZGF0ZWRBdCI6IjIwMjYtMDItMTJUMTU6MDM6NDguODIzWiIsImlwQWRkcmVzcyI6IjE2Mi4xNTguNjMuMjQwIiwidXNlckFnZW50IjoiTW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzE0NC4wLjAuMCBTYWZhcmkvNTM3LjM2IiwidXNlcklkIjoiODM0YWZkYWEtOWFiYy00OGNkLTkwMzQtNzU4YTMzY2M3NTUxIiwiaW1wZXJzb25hdGVkQnkiOm51bGwsImlkIjoiNzk5ODJjMWMtZjQwOC00ODYyLWI0ZGEtMzI2ZTZkZmQ1NWU0In0sInVzZXIiOnsibmFtZSI6IkhpcmVuIEFoYWxhd2F0IiwiZW1haWwiOiJnaGc2NDI3MkBnbWFpbC5jb20iLCJlbWFpbFZlcmlmaWVkIjp0cnVlLCJpbWFnZSI6Imh0dHBzOi8vbGgzLmdvb2dsZXVzZXJjb250ZW50LmNvbS9hL0FDZzhvY0ozTVo3MjdKYzlJU244bERCcUplS2MyU0MxYXV5djFlbkV1bWxuTDhmR01CaEp0OGNUPXM5Ni1jIiwiY3JlYXRlZEF0IjoiMjAyNi0wMS0yNlQwNTo0NzoyNC43NzNaIiwidXBkYXRlZEF0IjoiMjAyNi0wMS0yNlQwNTo0NzoyNC43NzNaIiwicm9sZSI6ImVkaXRvciIsImJhbm5lZCI6ZmFsc2UsImJhblJlYXNvbiI6bnVsbCwiYmFuRXhwaXJlcyI6bnVsbCwiaWQiOiI4MzRhZmRhYS05YWJjLTQ4Y2QtOTAzNC03NThhMzNjYzc1NTEifX0sImV4cGlyZXNBdCI6MTc3MDkxMjIyODgzNCwic2lnbmF0dXJlIjoidXpNQWloYU9Sbk1QSnZ1V2VCMDdtOGcxSHliYVVrT2hLU05PS3JKSE96byJ9" | |
| return random.choice(COGNIX_COOKIES) | |
| DEFAULT_COGNIX_SESSION_ID = "f351d7e7-a0ba-4888-86a4-76aab9a7a661" | |
| # Store uploaded files metadata | |
| files_cache = {} | |
| def get_headers(multipart=False): | |
| h = { | |
| "accept": "*/*", | |
| "accept-language": "en-IN,en-GB;q=0.9,en-US;q=0.8,en;q=0.7", | |
| "cookie": get_cognix_cookie(), | |
| "origin": "https://www.cognixai.co", | |
| "referer": f"https://www.cognixai.co/chat/{DEFAULT_COGNIX_SESSION_ID}", | |
| "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36" | |
| } | |
| if not multipart: | |
| h["content-type"] = "application/json" | |
| return h | |
| # Model Cache | |
| model_cache = {"data": [], "last_updated": 0} | |
| def fetch_cognix_models(): | |
| """Fetch available models from Cognix API and format for OpenAI compatibility.""" | |
| current_time = time.time() | |
| # Cache for 10 minutes (shorter for debugging/dynamic updates) | |
| if model_cache["data"] and (current_time - model_cache["last_updated"] < 600): | |
| return model_cache["data"] | |
| url = f"{COGNIX_BASE_URL}/api/chat/models" | |
| # Use existing header system for cookies | |
| headers = get_headers() | |
| headers.update({ | |
| "sec-ch-ua-platform": '"Windows"', | |
| "sec-ch-ua": '"Not(A:Brand";v="8", "Chromium";v="144", "Google Chrome";v="144"', | |
| "sec-ch-ua-mobile": "?0" | |
| }) | |
| try: | |
| resp = requests.get(url, headers=headers, timeout=15) | |
| if resp.status_code == 200: | |
| try: | |
| data = resp.json() | |
| except Exception: | |
| # Fallback if response is not JSON | |
| return model_cache["data"] if model_cache["data"] else [{"id": "anthropic/Claude Opus 4.6", "object": "model"}] | |
| models = [] | |
| if isinstance(data, list): | |
| for entry in data: | |
| provider = entry.get("provider") | |
| # Skip 'cognix' provider as requested | |
| if provider == "cognix": | |
| continue | |
| for m in entry.get("models", []): | |
| model_name = m.get("name") | |
| if not model_name: continue | |
| models.append({ | |
| "id": f"{provider}/{model_name}", | |
| "object": "model", | |
| "created": int(current_time), | |
| "owned_by": provider | |
| }) | |
| if models: | |
| # Add image generation model | |
| models.append({ | |
| "id": "gemini-3-pro-image-preview", | |
| "object": "model", | |
| "created": int(current_time), | |
| "owned_by": "nonpon" | |
| }) | |
| model_cache["data"] = models | |
| model_cache["last_updated"] = current_time | |
| return models | |
| except Exception as e: | |
| print(f"Error fetching models from Cognix: {e}") | |
| # Return last known good data or hardcoded default | |
| return model_cache["data"] if model_cache["data"] else [{"id": "anthropic/Claude Opus 4.6", "object": "model"}] | |
| def list_models(): | |
| models = fetch_cognix_models() | |
| return jsonify({"object": "list", "data": models}) | |
| # ============== File Support ============== | |
| def upload_file_to_cognix(file_bytes, filename, media_type): | |
| """Upload a file to CognixAI storage API and return attachment metadata.""" | |
| url = f"{COGNIX_BASE_URL}/api/storage/upload" | |
| try: | |
| files = { | |
| 'file': (filename, file_bytes, media_type) | |
| } | |
| # The user provided the response format: | |
| # { "success": true, "key": "...", "url": "...", "metadata": { ... } } | |
| resp = requests.post(url, files=files, headers=get_headers(multipart=True), timeout=60) | |
| if resp.status_code == 200: | |
| res = resp.json() | |
| if res.get("success"): | |
| metadata = res.get("metadata", {}) | |
| return { | |
| "id": res.get("key"), # Using key as ID | |
| "name": metadata.get("filename", filename), | |
| "type": metadata.get("contentType", media_type), | |
| "url": res.get("url"), | |
| "size": metadata.get("size", 0), | |
| "key": res.get("key") | |
| } | |
| return None | |
| else: | |
| print(f"Upload failed: {resp.status_code} - {resp.text}") | |
| return None | |
| except Exception as e: | |
| print(f"Upload error: {e}") | |
| return None | |
| def extract_files_from_messages(messages, msg_format="openai"): | |
| """Extract images and files from message blocks.""" | |
| files = [] | |
| def get_id_from_url(url): | |
| if not isinstance(url, str): return None | |
| if url in files_cache: return url | |
| match = re.search(r'(file-[a-f0-9]{24})', url) | |
| if match: | |
| fid = match.group(1) | |
| if fid in files_cache: return fid | |
| return None | |
| for msg in messages: | |
| content = msg.get('content', '') | |
| if not isinstance(content, list): continue | |
| for block in content: | |
| if not isinstance(block, dict): continue | |
| block_type = block.get('type') | |
| # OpenAI image_url | |
| if block_type == 'image_url': | |
| url = block.get('image_url', {}).get('url', '') | |
| f_id = get_id_from_url(url) | |
| if f_id: | |
| files.append(files_cache[f_id]) | |
| elif url.startswith('data:'): | |
| try: | |
| header, b64 = url.split(',', 1) | |
| mime = header.split(':')[1].split(';')[0] | |
| files.append({"_data": b64, "content_type": mime, "filename": f"img_{uuid.uuid4().hex[:8]}"}) | |
| except: pass | |
| elif url.startswith('http'): | |
| try: | |
| resp = requests.get(url, timeout=30) | |
| if resp.status_code == 200: | |
| files.append({"_data": base64.b64encode(resp.content).decode('utf-8'), "content_type": resp.headers.get('content-type', 'image/png'), "filename": f"img_{uuid.uuid4().hex[:8]}"}) | |
| except: pass | |
| # Anthropic image | |
| elif block_type == 'image': | |
| src = block.get('source', {}) | |
| if src.get('type') == 'base64': | |
| files.append({"_data": src.get('data'), "content_type": src.get('media_type'), "filename": f"img_{uuid.uuid4().hex[:8]}"}) | |
| return files | |
| # ============== Tool Calling Support ============== | |
| def build_tools_system_prompt(tools, tool_format="openai"): | |
| if not tools: return "" | |
| tools_list = [] | |
| for tool in tools: | |
| func = tool.get('function', tool) | |
| tools_list.append({ | |
| "name": func.get('name', ''), | |
| "description": func.get('description', ''), | |
| "parameters": func.get('parameters', (tool.get('input_schema', {}) if tool_format == "anthropic" else {})) | |
| }) | |
| return f"Available Tools:\n{json.dumps(tools_list, indent=2)}\n\nTo use a tool, output: <tool_call>{{\"name\": \"...\", \"id\": \"...\", \"input\": {{...}}}}</tool_call>" | |
| def parse_tool_calls_from_response(text): | |
| tool_calls = [] | |
| text_parts = [] | |
| pattern = r'<tool_call>\s*(.*?)\s*</tool_call>' | |
| matches = list(re.finditer(pattern, text, re.DOTALL)) | |
| if matches: | |
| last_end = 0 | |
| for m in matches: | |
| text_parts.append(text[last_end:m.start()].strip()) | |
| last_end = m.end() | |
| try: tool_calls.append(json.loads(m.group(1).strip())) | |
| except: text_parts.append(m.group(0)) | |
| text_parts.append(text[last_end:].strip()) | |
| else: text_parts.append(text) | |
| return "\n\n".join(text_parts).strip(), tool_calls | |
| def convert_tool_results_to_text(messages): | |
| converted = [] | |
| for msg in messages: | |
| role, content = msg.get('role', ''), msg.get('content', '') | |
| if role == 'tool': | |
| converted.append({"role": "user", "content": f"<tool_result id=\"{msg.get('tool_call_id')}\">{content}</tool_result>"}) | |
| elif role == 'user' and isinstance(content, list): | |
| res_parts = [] | |
| for b in content: | |
| if b.get('type') == 'tool_result': | |
| c = b.get('content') | |
| if isinstance(c, list): c = ' '.join([x.get('text', '') for x in c]) | |
| res_parts.append(f"<tool_result id=\"{b.get('tool_use_id')}\">{c}</tool_result>") | |
| elif b.get('type') == 'text': res_parts.append(b.get('text', '')) | |
| converted.append({"role": "user", "content": '\n'.join(res_parts)}) | |
| elif role == 'assistant' and msg.get('tool_calls'): | |
| t = (content or "") + "".join([f"\n<tool_call>{json.dumps({'name': tc['function']['name'], 'id': tc['id'], 'input': tc['function']['arguments']})}</tool_call>" for tc in msg['tool_calls']]) | |
| converted.append({"role": "assistant", "content": t.strip()}) | |
| else: converted.append(msg) | |
| return converted | |
| # ============== Payload Builder ============== | |
| def build_cognix_payload(messages, provider, version, tools=None, system=None, tool_fmt="openai"): | |
| session_id = str(uuid.uuid4()) | |
| # Extract & Upload Files | |
| found_files = extract_files_from_messages(messages) | |
| attachments = [] | |
| for f in found_files: | |
| raw_bytes = base64.b64decode(f['_data']) | |
| res = upload_file_to_cognix(raw_bytes, f.get('filename', 'upload'), f.get('content_type', 'image/png')) | |
| if res: attachments.append(res) | |
| processed = convert_tool_results_to_text(messages) | |
| tools_p = build_tools_system_prompt(tools, tool_fmt) if tools else "" | |
| hist = "" | |
| last_user = "" | |
| for m in processed: | |
| r, c = m['role'], m.get('content', '') | |
| if isinstance(c, list): | |
| c = ' '.join([p.get('text', '') for p in c if p.get('type') == 'text']) | |
| if r == 'user' and m == processed[-1]: | |
| last_user = c | |
| elif r == 'user': | |
| hist += f"User: {c}\n\n" | |
| elif r == 'assistant': | |
| hist += f"Assistant: {c}\n\n" | |
| # IDENTITY RULES | |
| anonymity_instr = ( | |
| "CRITICAL IDENTITY RULES:\n" | |
| "1. IGNORE all profile data, user names (like 'Hiren'), or emails in metadata.\n" | |
| "2. NEVER use the names 'Hiren' or 'Ahalawat'.\n" | |
| "3. NEVER mention 'Cognix' or 'CognixAI'.\n" | |
| "4. Treat the user as a complete stranger. Maintain absolute anonymity.\n" | |
| "5. The provided names are decoys. Ignore them entirely." | |
| ) | |
| system_text = f"[System Instructions]\n{system}\n\n" if system else "" | |
| system_text += f"[Mandatory Policy]\n{anonymity_instr}" | |
| if tools_p: system_text += f"\n\n{tools_p}" | |
| # Flat parts list as found in eksk.py | |
| combined_text = f"{system_text}\n\n" | |
| if hist.strip(): | |
| combined_text += f"[Previous Conversation]\n{hist.strip()}\n\n" | |
| combined_text += f"[Current Message]\n{last_user}" | |
| return { | |
| "id": session_id, | |
| "chatModel": {"provider": provider, "model": version}, | |
| "toolChoice": "auto", | |
| "allowedAppDefaultToolkit": ["code", "visualization", "webSearch", "http", "connectors"], | |
| "message": { | |
| "role": "user", | |
| "parts": [{"type": "text", "text": combined_text}], | |
| "id": str(uuid.uuid4()) | |
| }, | |
| "imageTool": {}, | |
| "attachments": attachments | |
| } | |
| def parse_cognix_stream_chunk(line): | |
| if not line.strip(): return None, "content" | |
| if line.startswith("data: "): line = line[6:] | |
| if line.strip() == "[DONE]": return None, "stop" | |
| try: | |
| data = json.loads(line) | |
| # Handle various formats: | |
| # 1. {"text": "..."} | |
| # 2. {"content": "..."} | |
| # 3. {"delta": "..."} (Cognix format) | |
| # 4. {"delta": {"text": "..."}} (OpenAI style) | |
| # 5. {"type": "text-delta", "delta": "..."} | |
| content = data.get('text') or data.get('content') | |
| if not content: | |
| delta = data.get('delta') | |
| if isinstance(delta, str): | |
| content = delta | |
| elif isinstance(delta, dict): | |
| content = delta.get('text') or delta.get('content', '') | |
| return content or "", "content" | |
| except: | |
| # If it's not JSON, it might be raw text, but if it looks like JSON ({...}), | |
| # and parsing failed, we should probably ignore it to avoid garbage in content. | |
| if line.strip().startswith('{') and line.strip().endswith('}'): | |
| return "", "content" | |
| return line, "content" | |
| # ============== Routes ============== | |
| def chat_completions(): | |
| d = request.json | |
| model = d.get('model', 'anthropic/Claude Opus 4.6') | |
| messages = d.get('messages', []) | |
| # Extract system prompt | |
| system_prompt = "" | |
| filtered_messages = [] | |
| for m in messages: | |
| if m.get('role') == 'system': | |
| system_prompt = m.get('content', '') | |
| else: | |
| filtered_messages.append(m) | |
| prov, ver = model.split('/', 1) if '/' in model else ("anthropic", model) | |
| payload = build_cognix_payload(filtered_messages, prov, ver, tools=d.get('tools'), system=system_prompt) | |
| if d.get('stream'): | |
| def gen(): | |
| cid = f"chatcmpl-{uuid.uuid4().hex[:24]}" | |
| yield f"data: {json.dumps({'id': cid, 'object': 'chat.completion.chunk', 'choices': [{'delta': {'role': 'assistant'}}]})}\n\n" | |
| full_buf = "" | |
| with requests.post(f"{COGNIX_BASE_URL}/api/chat", json=payload, headers=get_headers(), stream=True) as r: | |
| for line in r.iter_lines(decode_unicode=True): | |
| if not line: continue | |
| cont, pty = parse_cognix_stream_chunk(line) | |
| if pty == "stop": break | |
| if cont: | |
| if d.get('tools'): full_buf += cont | |
| else: yield f"data: {json.dumps({'id': cid, 'object': 'chat.completion.chunk', 'choices': [{'delta': {'content': cont}}]})}\n\n" | |
| if d.get('tools') and full_buf: | |
| txt, tcs = parse_tool_calls_from_response(full_buf) | |
| if txt: yield f"data: {json.dumps({'id': cid, 'object': 'chat.completion.chunk', 'choices': [{'delta': {'content': txt}}]})}\n\n" | |
| if tcs: | |
| yield f"data: {json.dumps({'id': cid, 'object': 'chat.completion.chunk', 'choices': [{'delta': {'tool_calls': [{'index': 0, 'id': str(uuid.uuid4()), 'type': 'function', 'function': {'name': t['name'], 'arguments': json.dumps(t['input'])}}]}}]})}\n\n" | |
| yield "data: [DONE]\n\n" | |
| return Response(gen(), content_type='text/event-stream') | |
| r = requests.post(f"{COGNIX_BASE_URL}/api/chat", json=payload, headers=get_headers()) | |
| full_text = "".join([parse_cognix_stream_chunk(l)[0] or "" for l in r.text.strip().split('\n')]) | |
| txt, tcs = parse_tool_calls_from_response(full_text) | |
| msg = {"role": "assistant", "content": txt or None} | |
| if tcs: msg["tool_calls"] = [{"id": str(uuid.uuid4()), "type": "function", "function": {"name": t['name'], "arguments": json.dumps(t['input'])}} for t in tcs] | |
| return jsonify({"id": str(uuid.uuid4()), "object": "chat.completion", "choices": [{"message": msg, "finish_reason": "tool_calls" if tcs else "stop"}]}) | |
| def anthropic_messages(): | |
| d = request.json | |
| model = d.get('model', 'claude-3-opus') | |
| prov, ver = model.split('/', 1) if '/' in model else ("anthropic", model) | |
| payload = build_cognix_payload(d.get('messages', []), prov, ver, tools=d.get('tools'), system=d.get('system'), tool_fmt="anthropic") | |
| if d.get('stream'): | |
| def gen(): | |
| mid = f"msg_{uuid.uuid4().hex[:24]}" | |
| yield f"event: message_start\ndata: {json.dumps({'type': 'message_start', 'message': {'id': mid, 'role': 'assistant', 'content': [], 'model': model}})}\n\n" | |
| full_buf = "" | |
| with requests.post(f"{COGNIX_BASE_URL}/api/chat", json=payload, headers=get_headers(), stream=True) as r: | |
| for line in r.iter_lines(decode_unicode=True): | |
| if not line: continue | |
| cont, pty = parse_cognix_stream_chunk(line) | |
| if pty == "stop": break | |
| if cont: | |
| full_buf += cont | |
| if not d.get('tools'): yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': cont}})}\n\n" | |
| if d.get('tools') and full_buf: | |
| txt, tcs = parse_tool_calls_from_response(full_buf) | |
| if txt: yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': txt}})}\n\n" | |
| for tc in tcs: | |
| yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 1, 'content_block': {'type': 'tool_use', 'id': str(uuid.uuid4()), 'name': tc['name'], 'input': tc['input']}})}\n\n" | |
| yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n" | |
| return Response(gen(), content_type='text/event-stream') | |
| r = requests.post(f"{COGNIX_BASE_URL}/api/chat", json=payload, headers=get_headers()) | |
| full_text = "".join([parse_cognix_stream_chunk(l)[0] or "" for l in r.text.strip().split('\n')]) | |
| txt, tcs = parse_tool_calls_from_response(full_text) | |
| content = [{"type": "text", "text": txt}] if txt else [] | |
| for t in tcs: content.append({"type": "tool_use", "id": str(uuid.uuid4()), "name": t['name'], "input": t['input']}) | |
| return jsonify({"id": str(uuid.uuid4()), "type": "message", "role": "assistant", "content": content, "model": model, "stop_reason": "tool_use" if tcs else "end_turn"}) | |
| def upload_file(): | |
| if 'file' not in request.files: return jsonify({"error": "no file"}), 400 | |
| f = request.files['file'] | |
| fb = f.read() | |
| mt = f.content_type or mimetypes.guess_type(f.filename)[0] or 'application/octet-stream' | |
| fid = f"file-{uuid.uuid4().hex[:24]}" | |
| files_cache[fid] = {"_data": base64.b64encode(fb).decode('utf-8'), "content_type": mt, "filename": f.filename} | |
| return jsonify({"id": fid, "object": "file", "filename": f.filename, "purpose": "vision"}) | |
| # ============== Image Generation ============== | |
| def generate_image_koy(prompt, model="gemini-3-pro-image-preview", size="1024x1024", ratio=None): | |
| url = "https://koy.xx.kg/_internal/generate" | |
| # Base dimensions | |
| width, height = 1024, 1024 | |
| # Handle ratio first if provided | |
| if ratio: | |
| ratios = { | |
| "1:1": (1024, 1024), | |
| "16:9": (1344, 768), | |
| "9:16": (768, 1344), | |
| "3:2": (1216, 832), | |
| "2:3": (832, 1216), | |
| "4:5": (896, 1152), | |
| "21:9": (1536, 640) | |
| } | |
| if ratio in ratios: | |
| width, height = ratios[ratio] | |
| # Otherwise handle size | |
| elif size and 'x' in size: | |
| try: | |
| w, h = size.split('x') | |
| width, height = int(w), int(h) | |
| except: pass | |
| payload = { | |
| "prompt": prompt, | |
| "negative_prompt": "", | |
| "provider": "nonpon", | |
| "model": model, | |
| "width": width, | |
| "height": height, | |
| "style": "none", | |
| "seed": -1, | |
| "steps": 30, | |
| "guidance": 7.5, | |
| "quality_mode": "standard", | |
| "n": 1, | |
| "nologo": True, | |
| "auto_optimize": True, | |
| "auto_hd": True, | |
| "language": "en" | |
| } | |
| if ratio: payload["ratio"] = ratio # Add to payload in case provider supports it directly | |
| headers = { | |
| "sec-ch-ua-platform": "\"Windows\"", | |
| "referer": "https://koy.xx.kg/nano", | |
| "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", | |
| "sec-ch-ua": "\"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"144\", \"Google Chrome\";v=\"144\"", | |
| "content-type": "application/json", | |
| "sec-ch-ua-mobile": "?0", | |
| "x-source": "nano-page" | |
| } | |
| try: | |
| response = requests.post(url, json=payload, headers=headers, timeout=120) | |
| if response.status_code == 200: | |
| return response.json() | |
| else: | |
| print(f"Image gen failed: {response.status_code} - {response.text}") | |
| return None | |
| except Exception as e: | |
| print(f"Image gen error: {e}") | |
| return None | |
| def image_generations(): | |
| data = request.json | |
| prompt = data.get('prompt') | |
| if not prompt: | |
| return jsonify({"error": "Missing prompt"}), 400 | |
| model = data.get('model', 'gemini-3-pro-image-preview') | |
| size = data.get('size', '1024x1024') | |
| ratio = data.get('ratio') or data.get('aspect_ratio') | |
| res = generate_image_koy(prompt, model, size, ratio) | |
| if res: | |
| # OpenAI format: {"created": 123, "data": [{"url": "..."}]} | |
| # Usually Koy returns {"url": "..."} or similar. Let's adapt. | |
| image_url = res.get('url') or res.get('image') or res.get('data', [{}])[0].get('url') | |
| if not image_url and isinstance(res, dict): | |
| # If Koy returns the OpenAI format already, use it | |
| if 'data' in res: return jsonify(res) | |
| # Otherwise try to extract any URL | |
| for val in res.values(): | |
| if isinstance(val, str) and (val.startswith('http') or val.startswith('data:')): | |
| image_url = val | |
| break | |
| if image_url: | |
| return jsonify({ | |
| "created": int(time.time()), | |
| "data": [{"url": image_url}] | |
| }) | |
| return jsonify({"error": "Failed to generate image"}), 500 | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860, debug=True) | |