Spaces:
Sleeping
Sleeping
| """Multimodal: Scotty resumable upload for Gemini image input.""" | |
| import json | |
| import base64 | |
| import urllib.request | |
| import urllib.parse | |
| import time | |
| import ssl | |
| import re | |
| from .config import CONFIG | |
| from .gemini import load_cookie, make_sapisidhash, _get_ssl_ctx, log | |
def _get_page_tokens() -> dict:
    """Fetch WIZ_global_data tokens from the Gemini page (Push-ID, X-Client-Pctx).

    Requests ``https://gemini.google.com/app`` (sending the stored cookie
    when one is available) and pulls token values out of the returned HTML
    with regular expressions.

    Returns:
        A dict holding whichever of the keys ``"push_id"``, ``"pctx"`` and
        ``"at"`` were found in the page. Empty when the request fails or
        nothing matched.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    }
    cookie_str, _sapisid = load_cookie()  # only the cookie string is needed here
    if cookie_str:
        headers["Cookie"] = cookie_str

    # NOTE(review): unlike upload_image, this request does not consult
    # CONFIG["proxy"] — confirm whether that is intentional.
    try:
        req = urllib.request.Request("https://gemini.google.com/app", headers=headers)
        # The context manager closes the connection even if read/decode fails.
        with urllib.request.urlopen(req, context=_get_ssl_ctx(), timeout=30) as resp:
            html = resp.read().decode()
    except Exception as e:
        # Best-effort: callers fall back to built-in defaults on an empty dict.
        log(f"Page token fetch failed: {e}")
        return {}

    tokens = {}
    for key, pattern in (
        ("push_id", r'"qKIAYe":"([^"]+)"'),
        ("pctx", r'"Ylro7b":"([^"]+)"'),
        ("at", r'"thykhd":"([^"]+)"'),
    ):
        m = re.search(pattern, html)
        if m:
            tokens[key] = m.group(1)
    return tokens
# How long (seconds) a successful page-token fetch is reused.
_PAGE_TOKENS_TTL = 600
# How long (seconds) to wait before retrying after a failed fetch.
_PAGE_TOKENS_RETRY = 60

# "tokens": last successfully fetched token dict; "ts": time.time() value
# the TTL check below is measured from.
_page_tokens_cache = {"tokens": {}, "ts": 0}


def _cached_page_tokens() -> dict:
    """Return page tokens, refreshing them at most once per TTL.

    A refresh that comes back empty (``_get_page_tokens`` returns ``{}`` on
    failure) does not overwrite previously cached tokens and is retried
    after ``_PAGE_TOKENS_RETRY`` seconds instead of the full TTL, so a
    transient failure is not pinned for ten minutes.

    Returns:
        The cached token dict (possibly empty if no fetch has succeeded).
    """
    now = time.time()
    if now - _page_tokens_cache["ts"] > _PAGE_TOKENS_TTL:
        fresh = _get_page_tokens()
        if fresh:
            _page_tokens_cache["tokens"] = fresh
            _page_tokens_cache["ts"] = now
        else:
            # Back-date the timestamp so the entry expires again after the
            # retry interval; keep whatever tokens were already cached.
            _page_tokens_cache["ts"] = now - _PAGE_TOKENS_TTL + _PAGE_TOKENS_RETRY
    return _page_tokens_cache["tokens"]
def upload_image(image_bytes: bytes, filename: str = "image.png", mime_type: str = "image/png") -> str:
    """Upload image via Scotty resumable upload. Returns file reference path.

    The upload is two POST requests: a "start" command that yields a session
    URL in the ``X-Goog-Upload-URL`` response header, followed by an
    "upload, finalize" command that sends the bytes to that URL.

    Args:
        image_bytes: Raw image content to upload.
        filename: Used only in the log message.
        mime_type: Sent as ``X-Goog-Upload-Header-Content-Type``.

    Returns:
        The response body of the finalize request, stripped; it must start
        with ``/``.

    Raises:
        RuntimeError: If the start response carries no upload URL, or the
            finalize response is empty or does not start with ``/``.
        Errors raised by ``urllib`` for the HTTP requests propagate unchanged.
    """
    tokens = _cached_page_tokens()
    # Hard-coded fallbacks are used when the page tokens could not be fetched.
    push_id = tokens.get("push_id", "feeds/mcudyrk2a4khkz")
    pctx = tokens.get("pctx", "CgcSBWjK7pYx")
    cookie_str, sapisid = load_cookie()
    ctx = _get_ssl_ctx()
    proxy = CONFIG.get("proxy")
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"

    # Pick the transport once so both requests go through the same path.
    if proxy:
        opener = urllib.request.build_opener(
            urllib.request.ProxyHandler({"http": proxy, "https": proxy}),
            urllib.request.HTTPSHandler(context=ctx),
        )
        send = opener.open
    else:
        def send(request, timeout):
            return urllib.request.urlopen(request, context=ctx, timeout=timeout)

    # Step 1: Initiate resumable upload
    start_headers = {
        "Push-ID": push_id,
        "X-Tenant-Id": "bard-storage",
        "X-Client-Pctx": pctx,
        "X-Goog-Upload-Header-Content-Length": str(len(image_bytes)),
        "X-Goog-Upload-Header-Content-Type": mime_type,
        "X-Goog-Upload-Protocol": "resumable",
        "X-Goog-Upload-Command": "start",
        "Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
        "User-Agent": user_agent,
    }
    if cookie_str:
        start_headers["Cookie"] = cookie_str
    if sapisid:
        start_headers["Authorization"] = make_sapisidhash(sapisid)

    start_url = "https://content-push.googleapis.com/upload/"
    start_req = urllib.request.Request(start_url, data=b"", headers=start_headers, method="POST")
    with send(start_req, timeout=30) as resp:
        # Header lookup is case-insensitive, so a single get() suffices.
        upload_url = resp.headers.get("X-Goog-Upload-URL")
        response_headers = dict(resp.headers)
    if not upload_url:
        raise RuntimeError(f"No upload URL in response headers: {response_headers}")
    log(f"Upload session started: {upload_url[:80]}...")

    # Step 2: Upload file data + finalize
    upload_headers = {
        "X-Goog-Upload-Command": "upload, finalize",
        "X-Goog-Upload-Offset": "0",
        "Content-Type": "application/octet-stream",
        "User-Agent": user_agent,
    }
    upload_req = urllib.request.Request(upload_url, data=image_bytes, headers=upload_headers, method="POST")
    with send(upload_req, timeout=60) as resp2:
        file_ref = resp2.read().decode().strip()

    if not file_ref or not file_ref.startswith("/"):
        raise RuntimeError(f"Invalid file reference: {file_ref[:100]}")
    log(f"Image uploaded: {filename} -> {file_ref[:50]}...")
    return file_ref
def fetch_image_bytes(url: str) -> bytes:
    """Fetch image from URL.

    Args:
        url: Location to read from, passed straight to ``urllib``.

    Returns:
        The full response body, or ``b""`` if anything goes wrong (the
        failure is logged rather than raised).
    """
    # NOTE(review): urlopen also accepts non-HTTP schemes such as file: and
    # data:. If `url` can come from an untrusted client, consider
    # restricting the allowed schemes at the call site.
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # Context manager ensures the connection is released after reading.
        with urllib.request.urlopen(req, timeout=30) as resp:
            return resp.read()
    except Exception as e:
        # Best-effort fetch: callers receive empty bytes on failure.
        log(f"Image fetch failed: {e}")
        return b""