import requests
import hashlib
import uuid
import json
import asyncio
import mimetypes
from typing import Dict, Any, Optional, Tuple
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename

# --- Configuration ---
API_BASE_URL = "https://api.grid.plus"
APP_ID = "808645"
PLATFORM = "h5"
APP_VERSION = "8.9.7"
SIGNATURE_SALT = "Pg@photo_photogrid#20250225"
SIGNATURE_PREFIX = "XX"
COMMON_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"
)

app = Flask(__name__)

def generate_uuid() -> str:
    """Return a random UUID4 string."""
    return str(uuid.uuid4())


def hash_md5(data: str) -> str:
    """MD5 hex digest of a UTF-8 string."""
    return hashlib.md5(data.encode('utf-8')).hexdigest()


def hash_sha256(data: str) -> str:
    """SHA-256 hex digest of a UTF-8 string."""
    return hashlib.sha256(data.encode('utf-8')).hexdigest()

async def fetch_current_ip(session: requests.Session) -> str:
    # Placeholder: no real network lookup is performed. A pseudo-random last
    # octet is derived from a UUID so each run produces a distinct value.
    placeholder_ip = f"98.76.54.{generate_uuid()[:3].replace('-', '')}"
    return placeholder_ip


async def generate_ghost_id_once(session: requests.Session, device_id: str) -> str:
    # The "ghost ID" is the MD5 of the device ID concatenated with the IP,
    # computed once and reused for every request in a flow.
    ip_address = await fetch_current_ip(session)
    ghost_id = hash_md5(device_id + ip_address)
    return ghost_id
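
# A minimal sketch of a real public-IP lookup, should the placeholder above
# ever be replaced. Assumption: this uses the third-party echo service
# https://api.ipify.org, which is not part of the original flow.
#
#   async def fetch_real_ip(session: requests.Session) -> str:
#       resp = await asyncio.to_thread(session.get, "https://api.ipify.org", timeout=10)
#       return resp.text.strip()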

def build_headers(token: str, uid: str, device_id: str, mcc: str = "en-US") -> Dict[str, str]:
    """Build the common X-* headers expected by the API."""
    return {
        "X-AppID": APP_ID,
        "X-Platform": PLATFORM,
        "X-Version": APP_VERSION,
        "X-SessionToken": token,
        "X-UniqueID": uid,
        "X-DeviceID": device_id,
        "X-MCC": mcc,
        "User-Agent": COMMON_USER_AGENT,
    }

def _value_to_string_for_signature(value: Any) -> str:
    # Booleans and None must serialize the way JavaScript would render them.
    if isinstance(value, bool):
        return "true" if value else "false"
    if value is None:
        return "null"
    return str(value)


async def create_signature(data_obj: Dict[str, Any], step_name: str) -> str:
    # Sort keys, concatenate each key+value pair, append the salt, hash, prefix.
    # `step_name` is kept for call-site readability; it does not affect the hash.
    sorted_keys = sorted(data_obj.keys())
    concatenated_string = ""
    for key in sorted_keys:
        value_str = _value_to_string_for_signature(data_obj[key])
        concatenated_string += key + value_str
    string_to_hash = concatenated_string + SIGNATURE_SALT
    signature = SIGNATURE_PREFIX + hash_sha256(string_to_hash)
    return signature
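
# Worked example of the signing scheme (illustrative values only):
# for {"b": 1, "a": "x"} the sorted concatenation is "ax" + "b1", so the
# string hashed is "axb1" + SIGNATURE_SALT and the signature returned is
# "XX" + sha256("axb1" + SIGNATURE_SALT).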

async def http_request(
    session: requests.Session,
    endpoint: str,
    data: Optional[Dict[str, Any]],
    method: str,
    content_type_key: str,
    cookies: Dict[str, str],
    precomputed_ghost_id: str,
    step_name: str,
    is_s3_upload: bool = False
) -> Tuple[Optional[Dict[str, Any]], int, Optional[requests.Response]]:
    # For S3 uploads `endpoint` is already a full presigned URL.
    url = API_BASE_URL + endpoint if not is_s3_upload else endpoint
    if is_s3_upload:
        # Presigned S3 PUT: only the Content-Type header and raw bytes are sent.
        s3_headers = {"Content-Type": data.get("Content-Type", "application/octet-stream")}
        request_kwargs = {"method": "PUT", "url": url, "headers": s3_headers,
                          "data": data.get('file_content'), "timeout": 60}
    else:
        default_headers = build_headers(
            token=cookies.get("t", ""),
            uid=cookies.get("u", ""),
            device_id=cookies.get("did", "")
        )
        ghost_id = precomputed_ghost_id
        # Only the X-* headers participate in the signature.
        x_headers_for_sig = {k: v for k, v in default_headers.items() if k.startswith("X-")}
        signable_payload_parts = {}
        if data:
            if content_type_key == "MULTIPART":
                # File bytes are excluded from the signature payload.
                signable_payload_parts = {k: v for k, v in data.items() if k != 'file'}
            else:
                signable_payload_parts = data
        data_to_sign = {**x_headers_for_sig, "X-GhostID": ghost_id, **signable_payload_parts}
        signature = await create_signature(data_to_sign, step_name)
        final_headers = {**default_headers, "sig": signature, "X-GhostID": ghost_id}
        content_type_map = {"JSON": "application/json", "FORM": "application/x-www-form-urlencoded"}
        if content_type_key != "MULTIPART" and content_type_key in content_type_map:
            final_headers["Content-Type"] = content_type_map[content_type_key]
        request_kwargs = {"method": method.upper(), "url": url, "headers": final_headers,
                          "cookies": cookies, "timeout": 30}
        if data:
            if content_type_key == "MULTIPART":
                # Let requests set the multipart boundary itself.
                request_kwargs["files"] = {'file': data.get('file')} if data.get('file') else None
                request_kwargs["data"] = {k: str(v) for k, v in data.items() if k != 'file'}
            elif content_type_key == "JSON":
                request_kwargs["json"] = data
            else:
                request_kwargs["data"] = data
    try:
        # requests is blocking; run it in a worker thread so the event loop stays free.
        response = await asyncio.to_thread(session.request, **request_kwargs)
        if is_s3_upload:
            return ({"s3_status": "success"} if response.ok else
                    {"s3_status": "failed", "raw_text": response.text},
                    response.status_code, response)
        try:
            json_data = response.json()
            return json_data, response.status_code, response
        except json.JSONDecodeError:
            return ({"raw_text": response.text, "code": 0 if response.ok else response.status_code,
                     "_non_json_response": True}, response.status_code, response)
    except requests.RequestException as e:
        status_code = 0
        response_text = ""
        full_response_obj = e.response
        if e.response is not None:
            status_code = e.response.status_code
            response_text = e.response.text
        return ({"error_message": str(e), "raw_text": response_text, "code": -1,
                 "_request_exception": True}, status_code, full_response_obj)

async def upscale_image(image_bytes: bytes, filename: str) -> Optional[str]:
    """Run the full flow: get a presigned URL, upload to S3, trigger the
    upscale task, then poll until a result URL is available."""
    print(f"Starting upscale process for uploaded image: {filename}")
    # Note: a fresh Semaphore per call does not serialize concurrent flows;
    # it is kept here only to preserve the original structure.
    async with asyncio.Semaphore(1):
        with requests.Session() as session:
            device_id = generate_uuid()
            session_token = ""
            unique_id = ""
            _vid_cookie = generate_uuid()
            cookies = {
                "did": device_id, "t": session_token,
                "u": unique_id, "_vid": _vid_cookie
            }
            # One ghost ID is reused for every request in this flow.
            flow_ghost_id = await generate_ghost_id_once(session, device_id)

            # Step 1: request a presigned S3 upload URL.
            file_extension = filename.rsplit('.', 1)[-1].lower()
            if file_extension == "jpg":
                file_extension = "jpeg"
            get_upload_url_payload = {
                'ext': file_extension,
                'method': 'wn_superresolution'
            }
            s3_url_response_data, status, _ = await http_request(
                session, "/v1/ai/web/nologin/getuploadurl",
                get_upload_url_payload, "POST", "MULTIPART", cookies,
                precomputed_ghost_id=flow_ghost_id,
                step_name="GetUploadURL"
            )
            if not s3_url_response_data or s3_url_response_data.get("code") != 0:
                return None
            s3_upload_url = s3_url_response_data.get("data", {}).get("upload_url")
            s3_img_url = s3_url_response_data.get("data", {}).get("img_url")
            if not s3_upload_url or not s3_img_url:
                return None

            # Step 2: PUT the raw image bytes to the presigned URL.
            content_type_for_s3, _ = mimetypes.guess_type(filename)
            content_type_for_s3 = content_type_for_s3 or 'application/octet-stream'
            s3_payload = {'file_content': image_bytes, 'Content-Type': content_type_for_s3}
            s3_upload_response_data, s3_status, _ = await http_request(
                session, s3_upload_url, s3_payload, "PUT", "", cookies,
                precomputed_ghost_id=flow_ghost_id,
                step_name="S3Upload", is_s3_upload=True
            )
            if s3_status not in [200, 201, 204]:
                return None

            # Step 3: trigger the super-resolution task on the uploaded image.
            trigger_upscale_payload = {
                'url': s3_img_url,
                'method': 'wn_superresolution'
            }
            trigger_response_data, status, _ = await http_request(
                session, "/v1/ai/web/super_resolution/nologinupload",
                trigger_upscale_payload, "POST", "MULTIPART", cookies,
                precomputed_ghost_id=flow_ghost_id,
                step_name="TriggerUpscale"
            )
            if not trigger_response_data or trigger_response_data.get("code") != 0:
                return None
            task_id = trigger_response_data.get("task_id") or trigger_response_data.get("data", {}).get("task_id")
            if not task_id:
                return None

            # Step 4: poll for the result. Task status 0/2 = finished,
            # 1 = still processing, anything else = failed.
            max_retries, poll_interval = 20, 7
            for _ in range(max_retries):
                await asyncio.sleep(poll_interval)
                poll_payload = {"task_ids": [task_id]}
                poll_response_data, status, _ = await http_request(
                    session, "/v1/ai/web/super_resolution/nologinbatchresult",
                    poll_payload, "POST", "JSON", cookies,
                    precomputed_ghost_id=flow_ghost_id,
                    step_name="PollResult"
                )
                if not poll_response_data:
                    continue
                if poll_response_data.get("code") == 0:
                    results_list = poll_response_data.get("data", [])
                    if results_list:
                        task_result = results_list[0]
                        current_status = task_result.get("status")
                        if current_status in (0, 2):
                            upscaled_url_list = task_result.get("result_image_url") or task_result.get("image_url")
                            if isinstance(upscaled_url_list, list) and upscaled_url_list:
                                return upscaled_url_list[0]
                            return None
                        elif current_status == 1:
                            continue  # still processing
                        else:
                            return None
                    else:
                        return None
            # Polling exhausted without a terminal status.
            return None

# The route decorator was missing from the original; the "/upscale" path is an
# assumption. Flask's async views require the flask[async] extra (asgiref).
@app.route("/upscale", methods=["POST"])
async def upscale_endpoint():
    # Preferred path: file sent as multipart/form-data under the "file" field.
    if 'file' in request.files:
        file = request.files['file']
        if file.filename == '':
            print("File filename is empty")
            return jsonify({"error": "No file selected"}), 400
        filename = secure_filename(file.filename)
        image_bytes = file.read()
    # Fallback: raw binary body with Content-Type: application/octet-stream.
    elif request.content_type and 'application/octet-stream' in request.content_type:
        image_bytes = request.get_data()
        filename = f"uploaded_{generate_uuid()}.bin"  # default name for raw uploads
    else:
        print("No file part in request.files and no raw binary data")
        return jsonify({"error": "No file provided"}), 400

    print(f"Received file: {filename}, size: {len(image_bytes)} bytes")
    upscaled_url = await upscale_image(image_bytes, filename)
    if upscaled_url:
        return jsonify({"upscaled_url": upscaled_url}), 200
    else:
        return jsonify({"error": "Upscaling failed or timed out"}), 500

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)
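
# Example usage once the server is running (assumes the /upscale route above):
#   curl -X POST -F "file=@photo.jpg" http://localhost:7860/upscale
#   curl -X POST -H "Content-Type: application/octet-stream" \
#        --data-binary @photo.jpg http://localhost:7860/upscale
# A successful response looks like: {"upscaled_url": "https://..."}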