| | import random |
| | import time |
| | import json |
| | import re |
| | import aiohttp |
| | import requests |
| | import asyncio |
| | from flask import Flask, request, Response |
| | from flask_cors import CORS |
| | import uuid |
| |
|
# Flask application with permissive CORS so browser clients can call this
# proxy endpoint directly from any origin.
app = Flask(__name__)
CORS(app)
| |
|
# Session state captured from a logged-in magai.co (Bubble-based) web client
# and replayed on every upstream call.
# SECURITY NOTE(review): this is a live session cookie committed to source;
# it will expire and should be supplied via environment/config, not hard-coded.
MAGAI_TOKEN = {
    # Raw Cookie header replayed to app.magai.co (Bubble session + analytics ids).
    "cookie": "_fbp=fb.1.1731157547017.999175969945264246; _ga=GA1.1.1511565936.1731157547; soquick-mobile_live_u2main=bus|1731157558019x939242898939773400|1731157558072x230077743876915040; soquick-mobile_live_u2main.sig=O-AoJ9Vd_oxtGTWtsC31B_SsLlU; soquick-mobile_u1main=1731157558019x939242898939773400; _ga_N5J29RVHDJ=GS1.1.1731157547.1.1.1731157568.0.0.0",
    # Bubble "app_last_change" revision counter sent with workflow calls.
    "app_last_change": "21388518093",
    # Bubble lookup reference "<created_id>__LOOKUP__<item_id>" for the chat page item;
    # the leading segment namespaces ElementInstance references in get_token().
    "current_page_item": "1348695171700984260__LOOKUP__1726124636560x692535552825360400",
    # Bubble lookup reference for the logged-in user; the trailing segment is the user id.
    "current_user": "1348695171700984260__LOOKUP__1722349236461x685414888067651300",
}
| |
|
# Maps the public model aliases accepted by this API to the OpenRouter-style
# model ids understood by the magai.co upstream. Requests for aliases not
# listed here are rejected with HTTP 400 in get_request_data().
MAGAI_MAPPING = {
    "gpt-4o": "openai/gpt-4o",
    "claude-3.5-sonnet": "anthropic/claude-3.5-sonnet:beta",
    "claude-3-opus": "anthropic/claude-3-opus:beta",
    "gemini-1.5-pro": "google/gemini-pro-1.5"
}
| |
|
# Upper bound for the random component of generated ids, and the modulus for
# the Bubble "uid_generator" seed. Exact ints (10**18) instead of the float
# literal 1e18: `round(...) % MODULO` with a float modulus yields a float,
# which would be JSON-serialized in scientific notation (e.g. 1.23e+17)
# instead of the integer the upstream expects.
UUID_LENGTH = 10 ** 18
MODULO = 10 ** 18
| |
|
def generate_uuid():
    """Build a Bubble-style id: "<ms-timestamp>x<zero-padded random number>".

    The random part is an integer in [0, UUID_LENGTH] left-padded to at
    least 18 digits, mirroring the ids the magai.co web client generates.
    """
    now_ms = int(time.time() * 1000)
    random_part = str(round(random.random() * UUID_LENGTH)).zfill(18)
    return "x".join([str(now_ms), random_part])
| |
|
def create_luid(separator="x"):
    """Build a lightweight locally-unique id.

    The id is the current epoch time in milliseconds, followed by
    *separator* and a literal counter of "1" (matching the Bubble client's
    "x-bubble-pl" header format).
    """
    now_ms = int(time.time() * 1000)
    return "{0}{1}1".format(now_ms, separator)
| |
|
def format_model_name(model_name):
    """Sanitize an OpenRouter-style model id into an underscore-separated key.

    Replaces every "/", ":" and "-" with "_" and then collapses any run of
    consecutive underscores into a single one.
    """
    sanitized = re.sub(r"[/:-]", "_", model_name)
    return re.sub(r"_+", "_", sanitized)
| |
|
| |
|
def find_token_in_object(obj):
    """Depth-first search for the first string stored under a "token" key.

    Recurses through nested dicts *and lists* — the original only descended
    into dict values, so a token inside an array element (common in JSON
    step results) was never found.

    Args:
        obj: Any decoded-JSON value (dict, list, scalar).

    Returns:
        The first token string encountered in iteration order, or None.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            if key == "token" and isinstance(value, str):
                return value
            token = find_token_in_object(value)
            if token:
                return token
    elif isinstance(obj, list):
        # Generalization: JSON payloads frequently nest objects inside arrays.
        for item in obj:
            token = find_token_in_object(item)
            if token:
                return token
    return None
| |
|
| |
|
def get_last_user_content(messages):
    """Return the content of the most recent message with role "user".

    Args:
        messages: OpenAI-style list of {"role": ..., "content": ...} dicts.

    Returns:
        The "content" value of the last user message, or None when no user
        message exists (including an empty list).
    """
    for message in reversed(messages):
        # .get() tolerates malformed entries that omit "role"/"content";
        # the original raised KeyError on those.
        if message.get("role") == "user":
            return message.get("content")
    return None
| |
|
| |
|
async def get_token(model, message):
    """Run the magai.co (Bubble) chat workflow and extract the upstream API token.

    Posts a reverse-engineered Bubble "workflow/start" payload that mimics the
    web client submitting *message* with *model* selected, then scans the
    returned step results for a "token" value.

    Args:
        model: Upstream model id (a MAGAI_MAPPING value, e.g. "openai/gpt-4o").
        message: The last user message text; mirrored into several element states.

    Returns:
        The token string if found in any step result, otherwise None
        (implicitly, when the loop finds nothing).

    Raises:
        Exception: with the raw response dict when the workflow reports
            an "error_class".
    """
    server_call_id = generate_uuid()
    # "current_page_item" is "<created_id>__LOOKUP__<item>"; the leading
    # segment namespaces every ElementInstance reference below.
    created_id = MAGAI_TOKEN["current_page_item"].split("__")[0]
    # "current_user" is "<id>__LOOKUP__<user_id>"; index 2 is the bare user id.
    user_id = MAGAI_TOKEN["current_user"].split("__")[2]
    # Opaque ids captured from the web client; presumably tied to a specific
    # magai.co front-end build — TODO(review): confirm they survive redeploys.
    model_id = "0060f9accd1dbade552f65ac646fb3da"
    item_id = "bUNih7"
    element_id = "bUNib7"

    body = {
        "app_last_change": MAGAI_TOKEN["app_last_change"],
        "calls": [
            {
                "client_state": {
                    # Dehydrated snapshot of the Bubble element tree
                    # (element id -> parent element id), copied from a real
                    # browser session.
                    "element_instances": {
                        "bUNib7": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUNib7",
                            "parent_element_id": "bUMiq3",
                        },
                        "bTezP": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezP",
                            "parent_element_id": "bTezJ",
                        },
                        "bTezE": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezE",
                            "parent_element_id": "bTeqc",
                        },
                        "bTezJ": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezJ",
                            "parent_element_id": "bUKFL2",
                        },
                        "bTezQ": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezQ",
                            "parent_element_id": "bUKFL2",
                        },
                        "bUiru0": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUiru0",
                            "parent_element_id": "bUjNK",
                        },
                        "bUDVj0": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUDVj0",
                            "parent_element_id": "bUMiq3",
                        },
                        "bUXzm2": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUXzm2",
                            "parent_element_id": "bUMhk3",
                        },
                        "bUifI1": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUifI1",
                            "parent_element_id": "bTeqg",
                        },
                        "bUMiq3": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUMiq3",
                            "parent_element_id": "bTezE",
                        },
                        # Page root — no parent.
                        "bTekm": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTekm",
                            "parent_element_id": None,
                        },
                    },
                    # Per-element state; the user's message is injected into
                    # the chat input (bTezP) and two mirror elements
                    # (bUiru0 / bUDVj0, field "AAE" — meaning unknown, copied
                    # from the web client).
                    "element_state": {
                        f"{created_id}__LOOKUP__ElementInstance::bTezP": {
                            "is_visible": True,
                            "value_that_is_valid": message,
                            "value": message,
                        },
                        # No attachments: all upload-related custom states cleared.
                        f"{created_id}__LOOKUP__ElementInstance::bTezE": {
                            "custom.images_": None,
                            "custom.file_": None,
                            "custom.file_content_": None,
                            "custom.file_name_": None,
                            "custom.file_type_": None,
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bTezJ": {
                            "custom.isrecording_": None,
                            "custom.prompt_": None,
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bUiru0": {
                            "AAE": message
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bUDVj0": {
                            "AAE": message
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bUifI1": {
                            "custom.is_visible_": None,
                            "group_data": None,
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bUMiq3": {
                            "group_data": None
                        },
                    },
                    # Fixed viewport values copied from the captured session.
                    "other_data": {
                        "Current Page Scroll Position": 0,
                        "Current Page Width": 661,
                    },
                    # Client-side expression cache: maps the model-selector
                    # expression id to the sanitized model name.
                    "cache": {
                        f"{model_id}": format_model_name(model),
                        "true": True,
                        "CurrentPageItem": MAGAI_TOKEN["current_page_item"],
                        "CurrentUser": MAGAI_TOKEN["current_user"],
                    },
                    "exists": {
                        f"{model_id}": True,
                        "true": True,
                        "CurrentPageItem": True,
                        "CurrentUser": True,
                    },
                },
                "run_id": generate_uuid(),
                "server_call_id": server_call_id,
                "item_id": item_id,
                "element_id": element_id,
                "page_id": "bTekm",
                # Mimics Bubble's client-side uid generator state.
                "uid_generator": {
                    "timestamp": int(time.time() * 1000),
                    "seed": round(random.random() * UUID_LENGTH) % MODULO,
                },
                "random_seed": random.random(),
                "current_date_time": int(time.time() * 1000),
                "current_wf_params": {},
            }
        ],
        "client_breaking_revision": 5,
        # -480 is JS getTimezoneOffset() for UTC+8, consistent with
        # "Asia/Shanghai" below.
        "timezone_offset": -480,
        "timezone_string": "Asia/Shanghai",
        "user_id": user_id,
        "wait_for": [],
    }

    url = "https://app.magai.co/workflow/start"
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            headers={
                # Bubble request-tracing headers; fresh per call.
                "x-bubble-fiber-id": generate_uuid(),
                "x-bubble-pl": create_luid(),
                "accept": "application/json, text/javascript, */*; q=0.01",
                # Session auth rides entirely on the captured cookie.
                "cookie": MAGAI_TOKEN["cookie"],
            },
            json=body,
        ) as response:
            response_data = await response.json()

    # Bubble signals workflow failures via an "error_class" field rather
    # than an HTTP error status.
    if "error_class" in response_data:
        raise Exception(response_data)

    # The result for our call is keyed by the server_call_id we generated.
    server_call_data = response_data.get(server_call_id)
    if not server_call_data or "step_results" not in server_call_data:
        return None
    # Scan each workflow step's return value for an embedded token.
    for step_result in server_call_data["step_results"].values():
        if isinstance(step_result.get("return_value"), dict):
            token = find_token_in_object(step_result["return_value"])
            if token:
                return token
| |
|
| |
|
async def get_request_data(model, messages):
    """Fetch a per-request token from magai.co and forward the chat upstream.

    Args:
        model: Public model alias (a MAGAI_MAPPING key).
        messages: OpenAI-style list of {"role", "content"} dicts.

    Returns:
        A `requests.Response` from the upstream chat-completions endpoint,
        OR a Flask error `Response` (HTTP 400) when the model alias is
        unknown. NOTE(review): callers must distinguish these two types —
        the Flask Response has no iter_lines(), so passing it to
        format_response() fails.
    """
    if model not in MAGAI_MAPPING:
        return Response(
            json.dumps(
                {
                    "error": {
                        "message": "This model is currently unavailable. Please try again later or choose another model.",
                        "code": "model_not_exists",
                    }
                }
            ),
            status=400,
            mimetype="application/json",
        )

    # Token issuance needs the latest user prompt; may be None when no user
    # message exists — TODO(review): confirm upstream tolerates that.
    last_user_message = get_last_user_content(messages)
    token = await get_token(MAGAI_MAPPING[model], last_user_message)
    headers = {
        "Content-Type": "application/json",
        "HTTP-Referer": "https://magai.co",
        "Origin": "https://app.magai.co",
        "Pragma": "no-cache",
        "Referer": "https://app.magai.co/",
        # Per-request bearer obtained from the Bubble workflow above.
        "Token": token,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/500.00 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/500.00",
    }

    json_data = {
        "model": MAGAI_MAPPING[model],
        # Prepend a default system prompt before the caller's messages.
        "messages": [{"role": "system", "content": "You are a helpful assistant."}]
        + messages,
        # Tool definitions mirroring what the magai.co client advertises
        # (web search + image generation); the upstream decides when to call them.
        "tools": [
            {
                "type": "function",
                "function": {
                    "name": "get_actual_time_info",
                    "description": "Returns actual information from web about prompt theme.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "The query string based on users prompt to search information about.",
                            }
                        },
                        "required": ["query"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "generate_image",
                    "description": "Returns generated image URL.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Prompt to image generation AI model, that describes what image to generate.",
                            }
                        },
                        "required": ["query"],
                    },
                },
            },
        ],
        "provider": {"data_collection": "deny"},
        "tool_choice": "auto",
        "stream": True,
    }

    # NOTE(review): synchronous requests.post inside an async function blocks
    # the event loop; consider aiohttp or run_in_executor. Also, stream=True
    # is not set, so the whole SSE body is buffered before iter_lines() runs.
    response = requests.post(
        "https://live.proxy.magai.co:4430/opr/api/v1/chat/completions",
        headers=headers,
        json=json_data,
    )
    return response
| |
|
| |
|
def format_response(response):
    """Collapse an OpenAI-style SSE stream into the full assistant message.

    Iterates the response line by line, parses each "data:" payload as JSON,
    and concatenates every delta "content" fragment. Non-JSON payloads
    (e.g. the "[DONE]" sentinel) are skipped silently.

    Args:
        response: An object exposing iter_lines() yielding byte strings
            (e.g. a streaming requests.Response).

    Returns:
        The concatenated message text (possibly empty).
    """
    pieces = []
    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        text = raw_line.decode("utf-8")
        if not text.startswith("data:"):
            continue
        try:
            payload = json.loads(text[5:].strip())
        except json.JSONDecodeError:
            continue  # sentinel or partial frame — ignore
        if "choices" in payload and len(payload["choices"]) > 0:
            delta = payload["choices"][0].get("delta", {})
            if "content" in delta:
                pieces.append(delta["content"])
    return "".join(pieces)
| |
|
| |
|
@app.route("/hf/v1/chat/completions", methods=["POST"])
def chat_completions():
    """OpenAI-compatible chat completions endpoint.

    Reads {"model", "messages"} from the JSON body, proxies the chat
    through magai.co, and replies with an OpenAI-style SSE stream
    containing one content chunk followed by the [DONE] sentinel.
    """
    data = request.json
    messages = data.get("messages", [])
    model = data.get("model", "claude-3.5-sonnet")

    # asyncio.run creates, runs and *closes* a fresh event loop; the
    # previous new_event_loop()/run_until_complete() pair leaked a loop
    # on every request.
    upstream = asyncio.run(get_request_data(model, messages))

    # Unknown model aliases come back as a ready-made Flask 400 Response;
    # forward it instead of crashing while iterating it as a stream.
    if isinstance(upstream, Response):
        return upstream

    content = format_response(upstream)

    # Build the chunk with json.dumps so quotes, backslashes and control
    # characters in the model output are escaped correctly (the hand-built
    # f-string JSON previously produced invalid payloads), and emit the
    # whole message as a single delta instead of one chunk per character
    # with a fresh id each.
    chunk = {
        "id": str(uuid.uuid4()),
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "system_fingerprint": "fp_45ah8ld5a7",
        "choices": [
            {
                "index": 0,
                "delta": {"content": content},
                "logprobs": None,
                "finish_reason": None,
            }
        ],
    }
    # SSE events are terminated by a blank line — including the final
    # sentinel, which previously lacked it.
    event_stream_response = f"data:{json.dumps(chunk)}\n\ndata:[DONE]\n\n"

    return Response(event_stream_response, mimetype="text/event-stream")
| |
|
| |
|
if __name__ == "__main__":
    # Listen on all interfaces; 7860 is the conventional Hugging Face
    # Spaces port (matching the /hf/ route prefix).
    app.run(host="0.0.0.0", port=7860)