| """
|
| Gemini Web Reverse Engineering Client
|
| 支持图文请求、上下文对话,OpenAI 格式输入输出
|
| 手动配置 token,无需代码登录
|
| """
|
|
|
| import re
|
| import json
|
| import random |
| import string |
| import base64 |
| import uuid |
| import codecs |
| import httpx |
| from typing import Optional, List, Dict, Any, Union, Iterator |
| from dataclasses import dataclass, field
|
| from datetime import datetime
|
| import time
|
|
|
|
|
class CookieExpiredError(Exception):
    """Signals that the supplied cookie is expired or invalid."""
|
|
|
|
|
class ImageUploadError(Exception):
    """Signals that uploading an image failed."""
|
|
|
|
|
@dataclass
class Message:
    """A single chat message in OpenAI format."""

    # Speaker role, e.g. "user", "assistant" or "system".
    role: str
    # Either plain text or a list of OpenAI-style content-part dicts.
    content: Union[str, List[Dict[str, Any]]]
|
|
|
|
|
@dataclass
class ChatCompletionChoice:
    """One entry of the ``choices`` list of a chat completion response."""

    index: int
    message: Message
    # Defaults to "stop".
    finish_reason: str = "stop"
|
|
|
|
|
@dataclass
class Usage:
    """Token counters reported with a response; every counter defaults to 0."""

    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
|
|
|
|
|
@dataclass
class ChatCompletionResponse:
    """Chat completion response in OpenAI format."""

    id: str
    object: str = "chat.completion"
    created: int = 0
    model: str = "gemini-web"
    choices: List[ChatCompletionChoice] = field(default_factory=list)
    usage: Usage = field(default_factory=Usage)

    def to_dict(self) -> dict:
        """Return the response as a plain nested dict of its fields."""
        serialized_choices = []
        for choice in self.choices:
            serialized_choices.append(
                {
                    "index": choice.index,
                    "message": {
                        "role": choice.message.role,
                        "content": choice.message.content,
                    },
                    "finish_reason": choice.finish_reason,
                }
            )

        token_usage = {
            "prompt_tokens": self.usage.prompt_tokens,
            "completion_tokens": self.usage.completion_tokens,
            "total_tokens": self.usage.total_tokens,
        }

        return {
            "id": self.id,
            "object": self.object,
            "created": self.created,
            "model": self.model,
            "choices": serialized_choices,
            "usage": token_usage,
        }
|
|
|
|
|
| class GeminiClient:
|
| """
|
| Gemini 网页版逆向客户端
|
|
|
| 使用方法:
|
| 1. 打开 https://gemini.google.com 并登录
|
| 2. F12 打开开发者工具 -> Application -> Cookies
|
| 3. 复制以下 cookie 值:
|
| - __Secure-1PSID
|
| - __Secure-1PSIDTS (可选)
|
| 4. Network 标签 -> 找任意请求 -> 复制 SNlM0e 值 (在页面源码中搜索)
|
| """
|
|
|
| BASE_URL = "https://gemini.google.com"
|
|
|
| def __init__(
|
| self,
|
| secure_1psid: str,
|
| secure_1psidts: str = None,
|
| secure_1psidcc: str = None,
|
| snlm0e: str = None,
|
| bl: str = None,
|
| cookies_str: str = None,
|
| push_id: str = None,
|
| model_ids: dict = None,
|
| debug: bool = False,
|
| media_base_url: str = None,
|
| ):
|
| """
|
| 初始化客户端 - 手动填写 token
|
|
|
| Args:
|
| secure_1psid: __Secure-1PSID cookie (必填)
|
| secure_1psidts: __Secure-1PSIDTS cookie (推荐)
|
| secure_1psidcc: __Secure-1PSIDCC cookie (推荐)
|
| snlm0e: SNlM0e token (必填,从页面源码获取)
|
| bl: BL 版本号 (可选,自动获取)
|
| cookies_str: 完整 cookie 字符串 (可选,替代单独设置)
|
| push_id: Push ID for image upload (必填用于图片上传)
|
| model_ids: 模型 ID 映射 {"flash": "xxx", "pro": "xxx", "thinking": "xxx"}
|
| debug: 是否打印调试信息
|
| media_base_url: 媒体文件的基础 URL (如 http://localhost:8000),用于构建完整的媒体访问 URL
|
| """
|
| self.secure_1psid = secure_1psid
|
| self.secure_1psidts = secure_1psidts
|
| self.secure_1psidcc = secure_1psidcc
|
| self.snlm0e = snlm0e
|
| self.bl = bl
|
| self.push_id = push_id
|
| self.debug = debug
|
| self.media_base_url = media_base_url or ""
|
|
|
|
|
| self.model_ids = model_ids or {
|
| "flash": "56fdd199312815e2",
|
| "pro": "e6fa609c3fa255c0",
|
| "thinking": "e051ce1aa80aa576",
|
| }
|
|
|
| self.session = httpx.Client(
|
| timeout=1220.0,
|
| follow_redirects=True,
|
| headers={
|
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
|
| "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
|
| "Origin": self.BASE_URL,
|
| "Referer": f"{self.BASE_URL}/",
|
| "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
| },
|
| )
|
|
|
|
|
| if cookies_str:
|
| self._set_cookies_from_string(cookies_str)
|
| else:
|
| self.session.cookies.set("__Secure-1PSID", secure_1psid, domain=".google.com")
|
| if secure_1psidts:
|
| self.session.cookies.set("__Secure-1PSIDTS", secure_1psidts, domain=".google.com")
|
| if secure_1psidcc:
|
| self.session.cookies.set("__Secure-1PSIDCC", secure_1psidcc, domain=".google.com")
|
|
|
|
|
| self.conversation_id: str = ""
|
| self.response_id: str = ""
|
| self.choice_id: str = ""
|
| self.request_count: int = 0
|
|
|
|
|
| self.messages: List[Message] = []
|
|
|
|
|
| if not self.snlm0e:
|
| raise ValueError(
|
| "SNlM0e 是必填参数!\n"
|
| "获取方法:\n"
|
| "1. 打开 https://gemini.google.com 并登录\n"
|
| "2. F12 -> 查看页面源代码 (Ctrl+U)\n"
|
| "3. 搜索 'SNlM0e' 找到类似: \"SNlM0e\":\"xxxxxx\"\n"
|
| "4. 复制引号内的值"
|
| )
|
|
|
|
|
| if not self.bl:
|
| self._fetch_bl()
|
|
|
| def _set_cookies_from_string(self, cookies_str: str):
|
| """从完整 cookie 字符串解析"""
|
| for item in cookies_str.split(";"):
|
| item = item.strip()
|
| if "=" in item:
|
| key, value = item.split("=", 1)
|
| self.session.cookies.set(key.strip(), value.strip(), domain=".google.com")
|
|
|
| def _fetch_bl(self):
|
| """获取 BL 版本号"""
|
| try:
|
| resp = self.session.get(self.BASE_URL)
|
| match = re.search(r'"cfb2h":"([^"]+)"', resp.text)
|
| if match:
|
| self.bl = match.group(1)
|
| else:
|
|
|
| self.bl = "boq_assistant-bard-web-server_20241209.00_p0"
|
| if self.debug:
|
| print(f"[DEBUG] BL: {self.bl}")
|
| except Exception as e:
|
| self.bl = "boq_assistant-bard-web-server_20241209.00_p0"
|
| if self.debug:
|
| print(f"[DEBUG] 获取 BL 失败,使用默认值: {e}")
|
|
|
def refresh_tokens(self) -> dict:
    """
    Re-read SNlM0e, push_id and BL from the Gemini start page.

    Values that are found are stored on the client; values that are not found
    leave the current ones untouched.

    Returns:
        dict: {"success": bool, "snlm0e": str, "push_id": str, "error": str};
        ``success`` is true only when a new SNlM0e was extracted.
    """
    outcome = {"success": False, "snlm0e": "", "push_id": "", "error": ""}

    # Tried in order; the first pattern that matches wins.
    snlm0e_patterns = [
        r'"SNlM0e":"([^"]+)"',
        r'SNlM0e["\s:]+["\']([^"\']+)["\']',
        r'"at":"([^"]+)"',
    ]
    push_id_patterns = [
        r'"push[_-]?id["\s:]+["\'](feeds/[a-z0-9]+)["\']',
        r'push[_-]?id["\s:=]+["\'](feeds/[a-z0-9]+)["\']',
        r'feedName["\s:]+["\'](feeds/[a-z0-9]+)["\']',
        r'clientId["\s:]+["\'](feeds/[a-z0-9]+)["\']',
        r'(feeds/[a-z0-9]{14,})',
    ]

    try:
        if self.debug:
            print("[DEBUG] 开始刷新 token...")

        page = self.session.get(self.BASE_URL)
        if page.status_code != 200:
            outcome["error"] = f"访问 Gemini 失败: HTTP {page.status_code}"
            return outcome

        html = page.text

        token_hits = (re.search(pattern, html) for pattern in snlm0e_patterns)
        first_token_hit = next((hit for hit in token_hits if hit), None)
        fresh_token = first_token_hit.group(1) if first_token_hit else ""

        if fresh_token:
            self.snlm0e = fresh_token
            outcome["snlm0e"] = fresh_token
            if self.debug:
                print(f"[DEBUG] SNlM0e 已刷新: {fresh_token[:30]}...")

        push_hits = (re.findall(pattern, html, re.IGNORECASE) for pattern in push_id_patterns)
        fresh_push_id = next((found[0] for found in push_hits if found), "")

        if fresh_push_id:
            self.push_id = fresh_push_id
            outcome["push_id"] = fresh_push_id
            if self.debug:
                print(f"[DEBUG] push_id 已刷新: {fresh_push_id}")

        bl_hit = re.search(r'"cfb2h":"([^"]+)"', html)
        if bl_hit:
            self.bl = bl_hit.group(1)

        if fresh_token:
            outcome["success"] = True
        else:
            outcome["success"] = False
            outcome["error"] = "无法从页面提取 SNlM0e,Cookie 可能已完全失效"

        return outcome

    except Exception as e:
        outcome["error"] = f"刷新 token 失败: {str(e)}"
        if self.debug:
            print(f"[DEBUG] 刷新失败: {e}")
        return outcome
|
|
|
def check_token_valid(self) -> bool:
    """
    Probe the Gemini start page to see whether the current token still works.

    Returns:
        bool: True when the page answers with HTTP 200 and mentions ``SNlM0e``;
        False otherwise, including when the request raises.
    """
    try:
        page = self.session.get(self.BASE_URL, timeout=10.0)
        # Short-circuit keeps the body from being read on a non-200 answer.
        return page.status_code == 200 and 'SNlM0e' in page.text
    except Exception:
        return False
|
|
|
|
|
| def _parse_content(self, content: Union[str, List[Dict]]) -> tuple:
|
| """解析 OpenAI 格式 content,返回 (text, images)"""
|
| if isinstance(content, str):
|
| return content, []
|
|
|
| text_parts = []
|
| images = []
|
|
|
| for item in content:
|
| if item.get("type") == "text":
|
| text_parts.append(item.get("text", ""))
|
| elif item.get("type") == "image_url":
|
|
|
| image_url_data = item.get("image_url", {})
|
| if isinstance(image_url_data, str):
|
| url = image_url_data
|
| else:
|
| url = image_url_data.get("url", "")
|
|
|
| if not url:
|
| continue
|
|
|
| if url.startswith("data:"):
|
|
|
| match = re.match(r'data:([^;]+);base64,(.+)', url)
|
| if match:
|
| images.append({"mime_type": match.group(1), "data": match.group(2)})
|
| elif url.startswith("http://") or url.startswith("https://"):
|
|
|
| try:
|
| if self.debug:
|
| print(f"[DEBUG] 尝试下载图片 URL: {url[:100]}...")
|
| resp = httpx.get(url, timeout=30, follow_redirects=True)
|
| if resp.status_code == 200:
|
| mime = resp.headers.get("content-type", "image/jpeg").split(";")[0]
|
| images.append({"mime_type": mime, "data": base64.b64encode(resp.content).decode()})
|
| if self.debug:
|
| print(f"[DEBUG] 图片下载成功: {len(resp.content)} bytes, mime: {mime}")
|
| else:
|
| if self.debug:
|
| print(f"[DEBUG] 图片下载失败: HTTP {resp.status_code}")
|
| except Exception as e:
|
| if self.debug:
|
| print(f"[DEBUG] 下载图片失败: {e}")
|
| else:
|
|
|
| try:
|
|
|
| base64.b64decode(url[:100])
|
| images.append({"mime_type": "image/png", "data": url})
|
| if self.debug:
|
| print(f"[DEBUG] 检测到纯 base64 图片数据")
|
| except:
|
| if self.debug:
|
| print(f"[DEBUG] 无法识别的图片格式: {url[:50]}...")
|
|
|
| return " ".join(text_parts) if text_parts else "", images
|
|
|
| def _upload_image(self, image_data: bytes, mime_type: str = "image/jpeg") -> str:
|
| """
|
| 上传图片到 Gemini 服务器
|
|
|
| Args:
|
| image_data: 图片二进制数据
|
| mime_type: 图片 MIME 类型
|
|
|
| Returns:
|
| str: 上传后的图片路径(带 token)
|
| """
|
| if not self.push_id:
|
| raise CookieExpiredError(
|
| "图片上传需要 push_id\n"
|
| "获取方法: 运行 python get_push_id.py 或从浏览器 Network 中获取"
|
| )
|
|
|
| try:
|
| upload_url = "https://push.clients6.google.com/upload/"
|
| filename = f"image_{random.randint(100000, 999999)}.png"
|
|
|
|
|
| browser_headers = {
|
| "accept": "*/*",
|
| "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
|
| "origin": "https://gemini.google.com",
|
| "referer": "https://gemini.google.com/",
|
| "sec-fetch-dest": "empty",
|
| "sec-fetch-mode": "cors",
|
| "sec-fetch-site": "same-site",
|
| "x-browser-channel": "stable",
|
| "x-browser-copyright": "Copyright 2025 Google LLC. All Rights reserved.",
|
| "x-browser-validation": "Aj9fzfu+SaGLBY9Oqr3S7RokOtM=",
|
| "x-browser-year": "2025",
|
| "x-client-data": "CIa2yQEIpbbJAQipncoBCNvaygEIk6HLAQiFoM0BCJaMzwEIkZHPAQiSpM8BGOyFzwEYsobPAQ==",
|
| }
|
|
|
|
|
| init_headers = {
|
| **browser_headers,
|
| "content-type": "application/x-www-form-urlencoded;charset=utf-8",
|
| "push-id": self.push_id,
|
| "x-goog-upload-command": "start",
|
| "x-goog-upload-header-content-length": str(len(image_data)),
|
| "x-goog-upload-protocol": "resumable",
|
| "x-tenant-id": "bard-storage",
|
| }
|
|
|
| init_resp = self.session.post(upload_url, data={"File name": filename}, headers=init_headers)
|
|
|
| if self.debug:
|
| print(f"[DEBUG] 初始化上传状态: {init_resp.status_code}")
|
|
|
|
|
| if init_resp.status_code == 401 or init_resp.status_code == 403:
|
| raise CookieExpiredError(
|
| f"Cookie 已过期或无效 (HTTP {init_resp.status_code})\n"
|
| "请重新获取以下信息:\n"
|
| "1. __Secure-1PSID\n"
|
| "2. __Secure-1PSIDTS\n"
|
| "3. SNlM0e\n"
|
| "4. push_id"
|
| )
|
|
|
| upload_id = init_resp.headers.get("x-guploader-uploadid")
|
| if not upload_id:
|
| raise CookieExpiredError(
|
| f"未获取到 upload_id (状态码: {init_resp.status_code})\n"
|
| "可能原因: Cookie 已过期,请重新获取所有 token"
|
| )
|
|
|
| if self.debug:
|
| print(f"[DEBUG] Upload ID: {upload_id[:50]}...")
|
|
|
|
|
| final_upload_url = f"{upload_url}?upload_id={upload_id}&upload_protocol=resumable"
|
|
|
| upload_headers = {
|
| **browser_headers,
|
| "content-type": "application/x-www-form-urlencoded;charset=utf-8",
|
| "push-id": self.push_id,
|
| "x-goog-upload-command": "upload, finalize",
|
| "x-goog-upload-offset": "0",
|
| "x-tenant-id": "bard-storage",
|
| "x-client-pctx": "CgcSBWjK7pYx",
|
| }
|
|
|
| upload_resp = self.session.post(
|
| final_upload_url,
|
| headers=upload_headers,
|
| content=image_data
|
| )
|
|
|
| if self.debug:
|
| print(f"[DEBUG] 上传数据状态: {upload_resp.status_code}")
|
| print(f"[DEBUG] 响应头: {dict(upload_resp.headers)}")
|
| print(f"[DEBUG] 响应内容完整: {upload_resp.text}")
|
|
|
|
|
| if upload_resp.status_code == 401 or upload_resp.status_code == 403:
|
| raise CookieExpiredError(
|
| f"上传图片认证失败 (HTTP {upload_resp.status_code})\n"
|
| "Cookie 已过期,请重新获取"
|
| )
|
|
|
| if upload_resp.status_code != 200:
|
| raise Exception(f"上传图片数据失败: {upload_resp.status_code}, 响应: {upload_resp.text[:200] if upload_resp.text else '(empty)'}")
|
|
|
|
|
| response_text = upload_resp.text
|
| image_path = None
|
|
|
|
|
| try:
|
| response_json = json.loads(response_text)
|
| image_path = self._extract_image_path(response_json)
|
| except json.JSONDecodeError:
|
|
|
| match = re.search(r'/contrib_service/[^\s"\']+', response_text)
|
| if match:
|
| image_path = match.group(0)
|
|
|
|
|
| if not image_path:
|
| raise CookieExpiredError(
|
| f"无法从响应中提取图片路径\n"
|
| f"响应内容: {response_text[:300]}\n"
|
| "可能原因: Cookie 已过期,请重新获取所有 token"
|
| )
|
|
|
|
|
| if "/contrib_service/" in image_path:
|
|
|
| if len(image_path) < 40:
|
| raise CookieExpiredError(
|
| f"图片路径不完整\n"
|
| f"返回路径: {image_path}\n"
|
| "原因: Cookie 已过期或权限不足\n"
|
| "解决方法:\n"
|
| "1. 重新登录 https://gemini.google.com\n"
|
| "2. 更新 config.py 中的所有 token:\n"
|
| " - SECURE_1PSID\n"
|
| " - SECURE_1PSIDTS\n"
|
| " - SNLM0E\n"
|
| " - PUSH_ID"
|
| )
|
|
|
| if self.debug:
|
| print(f"[DEBUG] 图片路径: {image_path}")
|
|
|
| return image_path
|
|
|
| except CookieExpiredError:
|
| raise
|
| except Exception as e:
|
| if self.debug:
|
| print(f"[DEBUG] 上传失败: {e}")
|
| raise Exception(f"图片上传失败: {e}")
|
|
|
| def _extract_image_path(self, data: Any) -> str:
|
| """从响应数据中递归提取图片路径"""
|
| if isinstance(data, str):
|
| if data.startswith("/contrib_service/"):
|
| return data
|
| elif isinstance(data, dict):
|
| for value in data.values():
|
| result = self._extract_image_path(value)
|
| if result:
|
| return result
|
| elif isinstance(data, list):
|
| for item in data:
|
| result = self._extract_image_path(item)
|
| if result:
|
| return result
|
| return None
|
|
|
| def _build_request_data(self, text: str, images: List[Dict] = None, image_paths: List[str] = None, model: str = None) -> str:
|
| """构建请求数据 - 基于真实请求格式"""
|
|
|
| conv_id = self.conversation_id or ""
|
| resp_id = self.response_id or ""
|
| choice_id = self.choice_id or ""
|
|
|
|
|
| image_data = None
|
| if image_paths and len(image_paths) > 0:
|
| image_data = []
|
| for i, path in enumerate(image_paths):
|
| mime_type = images[i]["mime_type"] if images and i < len(images) else "image/png"
|
| filename = f"image_{random.randint(100000, 999999)}.png"
|
| image_data.append([[path, 1, None, mime_type], filename])
|
|
|
|
|
| session_id = str(uuid.uuid4()).upper()
|
| timestamp = int(time.time() * 1000)
|
|
|
|
|
|
|
|
|
|
|
| model_code = [[1]]
|
| if model:
|
| model_lower = model.lower()
|
| if "pro" in model_lower:
|
| model_code = [[0]]
|
| elif "thinking" in model_lower or "think" in model_lower:
|
| model_code = [[3]]
|
|
|
|
|
|
|
|
|
| inner_data = [
|
| [text, 0, None, image_data, None, None, 0],
|
| ["zh-CN"],
|
| [conv_id, resp_id, choice_id, None, None, None, None, None, None, ""],
|
| self.snlm0e,
|
| None,
|
| None,
|
| [1],
|
| 1,
|
| None,
|
| None,
|
| 1,
|
| 0,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| model_code,
|
| 0,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| 1,
|
| None,
|
| None,
|
| [4],
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| [1],
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| 0,
|
| None,
|
| None,
|
| None,
|
| None,
|
| None,
|
| session_id,
|
| None,
|
| [],
|
| None,
|
| None,
|
| None,
|
| None,
|
| [timestamp // 1000, (timestamp % 1000) * 1000000]
|
| ]
|
|
|
|
|
| inner_json = json.dumps(inner_data, ensure_ascii=False, separators=(',', ':'))
|
|
|
|
|
| outer_data = [None, inner_json]
|
| f_req_value = json.dumps(outer_data, ensure_ascii=False, separators=(',', ':'))
|
|
|
| return f_req_value
|
|
|
|
|
| def _parse_response(self, response_text: str) -> str:
|
| """解析响应文本 - 修复版"""
|
| try:
|
|
|
| lines = response_text.split("\n")
|
| final_text = ""
|
| generated_images_set = set()
|
| last_inner_json = None
|
|
|
| for line in lines:
|
| line = line.strip()
|
| if not line or line.startswith(")]}'"):
|
| continue
|
|
|
|
|
| if line.isdigit():
|
| continue
|
|
|
| try:
|
| data = json.loads(line)
|
|
|
| if isinstance(data, list) and len(data) > 0 and isinstance(data[0], list):
|
| actual_data = data[0]
|
|
|
| if len(actual_data) >= 3 and actual_data[0] == "wrb.fr" and actual_data[2]:
|
| inner_json = json.loads(actual_data[2])
|
| last_inner_json = inner_json
|
|
|
|
|
| imgs = self._extract_generated_images(inner_json)
|
| if imgs:
|
| for img in imgs:
|
| generated_images_set.add(img)
|
| if self.debug:
|
| print(f"[DEBUG] 从响应中提取到 {len(imgs)} 个图片 URL,当前总数: {len(generated_images_set)}")
|
|
|
|
|
| if inner_json and len(inner_json) > 4 and inner_json[4]:
|
| candidates = inner_json[4]
|
| if candidates and len(candidates) > 0:
|
| candidate = candidates[0]
|
| if candidate and len(candidate) > 1 and candidate[1]:
|
|
|
| text = candidate[1][0] if isinstance(candidate[1], list) else candidate[1]
|
| if isinstance(text, str) and len(text) > len(final_text):
|
| final_text = text
|
|
|
| if len(inner_json) > 1 and inner_json[1]:
|
| if isinstance(inner_json[1], list):
|
| if len(inner_json[1]) > 0:
|
| self.conversation_id = inner_json[1][0] or self.conversation_id
|
| if len(inner_json[1]) > 1:
|
| self.response_id = inner_json[1][1] or self.response_id
|
| if len(candidate) > 0:
|
| self.choice_id = candidate[0] or self.choice_id
|
| except Exception as e:
|
| if self.debug:
|
| print(f"[DEBUG] 解析行时出错: {e}")
|
| continue
|
|
|
|
|
| generated_images = list(generated_images_set)
|
|
|
| if self.debug:
|
| print(f"[DEBUG] 解析完成: final_text长度={len(final_text)}, 图片数量={len(generated_images)}")
|
|
|
|
|
| if generated_images:
|
| if self.debug:
|
| print(f"[DEBUG] 提取到 {len(generated_images)} 个媒体 URL,开始下载...")
|
|
|
|
|
| local_media_urls = []
|
| for i, url in enumerate(generated_images):
|
| if self.debug:
|
| print(f"[DEBUG] 下载媒体 {i+1}/{len(generated_images)}: {url[:80]}...")
|
| local_url = self._download_media_as_data_url(url)
|
| if local_url:
|
| local_media_urls.append(local_url)
|
| if self.debug:
|
| print(f"[DEBUG] 媒体 {i+1} 下载成功: {local_url}")
|
| else:
|
|
|
| local_media_urls.append(url)
|
| if self.debug:
|
| print(f"[DEBUG] 媒体 {i+1} 下载失败,使用原始 URL")
|
|
|
|
|
| has_placeholder = False
|
| if final_text:
|
| has_placeholder = ('image_generation_content' in final_text or
|
| 'video_gen_chip' in final_text)
|
|
|
|
|
| media_parts = []
|
| for i, url in enumerate(local_media_urls):
|
| media_parts.append(f"")
|
|
|
| media_text = "\n\n".join(media_parts)
|
|
|
| if has_placeholder:
|
|
|
| cleaned_text = re.sub(r'https?://googleusercontent\.com/(?:image_generation_content|video_gen_chip)/\d+', '', final_text)
|
| cleaned_text = re.sub(r'http://googleusercontent\.com/(?:image_generation_content|video_gen_chip)/\d+', '', cleaned_text)
|
| cleaned_text = re.sub(r'!\[.*?\]\(\)', '', cleaned_text)
|
| cleaned_text = cleaned_text.strip()
|
| if cleaned_text:
|
| final_text = cleaned_text + "\n\n" + media_text
|
| else:
|
| final_text = media_text
|
| elif final_text:
|
|
|
| final_text = final_text + "\n\n" + media_text
|
| else:
|
|
|
| final_text = media_text
|
|
|
| if self.debug:
|
| print(f"[DEBUG] 媒体处理完成,成功下载 {len([u for u in local_media_urls if u.startswith('/media/')])} 个")
|
|
|
|
|
| is_video_generation = False
|
| if final_text and 'video_gen_chip' in final_text:
|
| is_video_generation = True
|
|
|
|
|
| if final_text:
|
|
|
| final_text = re.sub(r'https?://googleusercontent\.com/(?:image_generation_content|video_gen_chip)/\d+\s*', '', final_text)
|
| final_text = re.sub(r'http://googleusercontent\.com/(?:image_generation_content|video_gen_chip)/\d+\s*', '', final_text)
|
|
|
| final_text = re.sub(r'!\[[^\]]*\]\(https://[^)]*googleusercontent\.com/gg/[^)]+\)', '', final_text)
|
| final_text = re.sub(r'https://lh3\.googleusercontent\.com/gg/[^\s\)]+', '', final_text)
|
| final_text = final_text.strip()
|
|
|
|
|
| if is_video_generation:
|
| video_notice = "\n\n---\n📹 视频为异步生成,生成结果可在官网聊天窗口查看下载。\n\n⏱️ 使用限制:\n- 视频生成 (Veo 模型):每天总共可以生成 3 次\n- 图片生成 (Nano Banana 模型):每天总共可以生成 1000 次"
|
| if final_text:
|
| final_text = final_text + video_notice
|
| else:
|
| final_text = video_notice.strip()
|
|
|
| if final_text:
|
|
|
| final_text = self._optimize_image_urls(final_text)
|
| return final_text
|
|
|
|
|
| if self.debug and last_inner_json:
|
| print(f"[DEBUG] 无法提取内容,inner_json 结构: {str(last_inner_json)[:500]}...")
|
|
|
| except Exception as e:
|
| if self.debug:
|
| print(f"[DEBUG] 解析错误: {e}")
|
|
|
| return "无法解析响应"
|
|
|
| def _extract_generated_media(self, data: Any, depth: int = 0) -> List[str]:
|
| """从响应数据中递归提取生成的图片/视频 URL
|
|
|
| Gemini 会返回两个媒体(带水印和不带水印),我们只保留最后一个(不带水印)
|
| 只提取 AI 生成的媒体 (/gg-dl/ 路径),不提取用户上传的图片 (/gg/ 路径)
|
| """
|
| if depth > 30:
|
| return []
|
|
|
| media_urls = []
|
|
|
| if isinstance(data, list):
|
|
|
|
|
| if (len(data) >= 1 and
|
| isinstance(data[0], list) and len(data[0]) >= 4 and
|
| data[0][0] is None and
|
| isinstance(data[0][1], int) and
|
| isinstance(data[0][2], str) and
|
| isinstance(data[0][3], str) and
|
| data[0][3].startswith('https://') and
|
| 'gg-dl/' in data[0][3]):
|
|
|
| second_url = None
|
| if len(data) >= 4 and isinstance(data[3], list) and len(data[3]) >= 4:
|
| if (data[3][0] is None and
|
| isinstance(data[3][3], str) and
|
| 'gg-dl/' in data[3][3]):
|
| second_url = data[3][3]
|
|
|
|
|
| url = second_url if second_url else data[0][3]
|
| if 'image_generation_content' not in url and 'video_gen_chip' not in url:
|
| media_urls.append(url)
|
| return media_urls
|
|
|
|
|
| if (len(data) >= 4 and
|
| data[0] is None and
|
| isinstance(data[1], int) and
|
| isinstance(data[2], str) and
|
| isinstance(data[3], str) and
|
| data[3].startswith('https://') and
|
| 'gg-dl/' in data[3]):
|
| url = data[3]
|
| if 'image_generation_content' not in url and 'video_gen_chip' not in url:
|
| media_urls.append(url)
|
| return media_urls
|
|
|
|
|
| all_found = []
|
| for item in data:
|
| found = self._extract_generated_media(item, depth + 1)
|
| if found:
|
| all_found.extend(found)
|
|
|
|
|
| if all_found:
|
| seen = set()
|
| unique = []
|
| for u in all_found:
|
| if u not in seen:
|
| seen.add(u)
|
| unique.append(u)
|
|
|
| return [unique[-1]] if unique else []
|
|
|
| elif isinstance(data, dict):
|
| for value in data.values():
|
| found = self._extract_generated_media(value, depth + 1)
|
| if found:
|
| return found
|
|
|
| return media_urls
|
|
|
|
|
| def _extract_generated_images(self, data: Any, depth: int = 0) -> List[str]:
|
| """向后兼容的别名"""
|
| return self._extract_generated_media(data, depth)
|
|
|
| def _download_media_as_data_url(self, url: str) -> str:
|
| """下载媒体文件并保存到本地缓存,返回本地代理 URL
|
|
|
| Args:
|
| url: 媒体文件的 URL
|
|
|
| Returns:
|
| str: 本地代理 URL 或 base64 data URL
|
| 下载失败时返回空字符串
|
| """
|
| try:
|
|
|
| if ("googleusercontent" in url or "ggpht" in url) and not any(ext in url.lower() for ext in ['.mp4', '.webm', 'video']):
|
|
|
| url = re.sub(r'=w\d+(-h\d+)?(-[a-zA-Z]+)*$', '=s0', url)
|
| url = re.sub(r'=s\d+(-[a-zA-Z]+)*$', '=s0', url)
|
| url = re.sub(r'=h\d+(-[a-zA-Z]+)*$', '=s0', url)
|
|
|
| if not url.endswith('=s0') and '=' not in url.split('/')[-1]:
|
| url += '=s0'
|
|
|
| if self.debug:
|
| print(f"[DEBUG] 正在下载媒体 (高清): {url[:100]}...")
|
|
|
|
|
| headers = {
|
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
|
| "Accept": "image/webp,image/apng,image/*,*/*;q=0.8",
|
| "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
| "Referer": "https://gemini.google.com/",
|
| }
|
| resp = self.session.get(url, timeout=60.0, headers=headers)
|
|
|
| if self.debug:
|
| print(f"[DEBUG] 下载状态: {resp.status_code}, 大小: {len(resp.content)} bytes")
|
|
|
| if resp.status_code != 200:
|
| if self.debug:
|
| print(f"[DEBUG] 下载媒体失败: HTTP {resp.status_code}")
|
| return ""
|
|
|
|
|
| if len(resp.content) < 100:
|
| if self.debug:
|
| print(f"[DEBUG] 下载内容太小,可能是错误: {resp.content[:100]}")
|
| return ""
|
|
|
|
|
| content = resp.content
|
| if content[:8] == b'\x89PNG\r\n\x1a\n':
|
| ext = ".png"
|
| mime = "image/png"
|
| elif content[:3] == b'\xff\xd8\xff':
|
| ext = ".jpg"
|
| mime = "image/jpeg"
|
| elif content[:6] in (b'GIF87a', b'GIF89a'):
|
| ext = ".gif"
|
| mime = "image/gif"
|
| elif content[:4] == b'RIFF' and content[8:12] == b'WEBP':
|
| ext = ".webp"
|
| mime = "image/webp"
|
| elif content[4:8] == b'ftyp' or content[:4] == b'\x00\x00\x00\x1c':
|
| ext = ".mp4"
|
| mime = "video/mp4"
|
| else:
|
| ext = ".png"
|
| mime = "image/png"
|
|
|
|
|
| import os
|
| media_id = f"gen_{uuid.uuid4().hex[:16]}"
|
|
|
|
|
| cache_dir = os.path.join(os.path.dirname(__file__), "media_cache")
|
| os.makedirs(cache_dir, exist_ok=True)
|
| file_path = os.path.join(cache_dir, media_id + ext)
|
|
|
| with open(file_path, "wb") as f:
|
| f.write(content)
|
|
|
| if self.debug:
|
| print(f"[DEBUG] 媒体已保存: {file_path}")
|
|
|
|
|
| media_path = f"/media/{media_id}{ext}"
|
| if self.media_base_url:
|
| return f"{self.media_base_url}{media_path}"
|
| return media_path
|
|
|
| except Exception as e:
|
| if self.debug:
|
| print(f"[DEBUG] 下载媒体异常: {e}")
|
| return ""
|
|
|
| def _optimize_image_urls(self, text: str) -> str:
|
| """优化文本中的 Google 图片 URL 为原始高清尺寸
|
|
|
| Google 图片 URL 参数说明:
|
| - =w400 或 =h400: 指定宽度或高度
|
| - =s400: 指定最大边长
|
| - =s0 或 =w0-h0: 原始尺寸
|
| """
|
| import re
|
|
|
| def optimize_url(url: str) -> str:
|
|
|
| if "googleusercontent" not in url and "ggpht" not in url:
|
| return url
|
|
|
| url = re.sub(r'=w\d+(-h\d+)?(-[a-zA-Z]+)*$', '=s0', url)
|
| url = re.sub(r'=s\d+(-[a-zA-Z]+)*$', '=s0', url)
|
| url = re.sub(r'=h\d+(-[a-zA-Z]+)*$', '=s0', url)
|
|
|
| if not url.endswith('=s0') and '=' not in url.split('/')[-1]:
|
| url += '=s0'
|
| return url
|
|
|
|
|
|
|
| def replace_md_img(match):
|
| alt = match.group(1)
|
| url = match.group(2)
|
| return f"})"
|
|
|
| text = re.sub(r'!\[([^\]]*)\]\(([^)]+)\)', replace_md_img, text)
|
|
|
|
|
| def replace_url(match):
|
| return optimize_url(match.group(0))
|
|
|
| text = re.sub(r'https?://[^\s\)]+(?:googleusercontent|ggpht)[^\s\)]*', replace_url, text)
|
|
|
| return text
|
|
|
|
|
| def _extract_text(self, parsed_data: list) -> str:
|
| """从解析后的数据中提取文本"""
|
| try:
|
|
|
| if parsed_data and len(parsed_data) > 1:
|
| if parsed_data[1] and len(parsed_data[1]) > 0:
|
| self.conversation_id = parsed_data[1][0] or self.conversation_id
|
| if parsed_data[1] and len(parsed_data[1]) > 1:
|
| self.response_id = parsed_data[1][1] or self.response_id
|
|
|
|
|
| if parsed_data and len(parsed_data) > 4 and parsed_data[4]:
|
| candidates = parsed_data[4]
|
| if candidates and len(candidates) > 0:
|
| first_candidate = candidates[0]
|
| if first_candidate and len(first_candidate) > 1:
|
| self.choice_id = first_candidate[0] or self.choice_id
|
| content_parts = first_candidate[1]
|
| if content_parts and len(content_parts) > 0:
|
| return content_parts[0] if isinstance(content_parts[0], str) else str(content_parts[0])
|
|
|
|
|
| if parsed_data and len(parsed_data) > 0:
|
| def find_text(obj, depth=0):
|
| if depth > 10:
|
| return None
|
| if isinstance(obj, str) and len(obj) > 50:
|
| return obj
|
| if isinstance(obj, list):
|
| for item in obj:
|
| result = find_text(item, depth + 1)
|
| if result:
|
| return result
|
| return None
|
|
|
| text = find_text(parsed_data)
|
| if text:
|
| return text
|
|
|
| except Exception as e:
|
| pass
|
|
|
| return "无法提取回复内容"
|
|
|
| def chat(
|
| self,
|
| messages: List[Dict[str, Any]] = None,
|
| message: str = None,
|
| image: bytes = None,
|
| image_url: str = None,
|
| reset_context: bool = False,
|
| model: str = None, |
| stream: bool = False, |
| ) -> Union[ChatCompletionResponse, Iterator[Dict[str, Any]]]: |
| """
|
| 发送聊天请求 (OpenAI 兼容格式)
|
|
|
| Args:
|
| messages: OpenAI 格式消息列表
|
| message: 简单文本消息 (与 messages 二选一)
|
| image: 图片二进制数据
|
| image_url: 图片 URL
|
| reset_context: 是否重置上下文
|
| model: 模型名称 (gemini-3.0-flash/gemini-3.0-flash-thinking/gemini-3.0-pro)
|
|
|
| Returns:
|
| ChatCompletionResponse: OpenAI 格式响应
|
| """
|
| if reset_context:
|
| self.reset()
|
|
|
|
|
| text_parts = []
|
| images = []
|
|
|
| if messages:
|
|
|
| for msg in messages:
|
| role = msg.get("role", "user")
|
| content = msg.get("content", "")
|
|
|
| if role == "user":
|
| t, imgs = self._parse_content(content)
|
| if t:
|
| text_parts.append(t)
|
| if imgs:
|
| images.extend(imgs)
|
| elif role == "assistant":
|
|
|
| if isinstance(content, str) and content:
|
| text_parts.append(f"[助手回复]: {content}")
|
| elif role == "system":
|
|
|
| if isinstance(content, str) and content:
|
| text_parts.insert(0, content)
|
|
|
| self.messages.append(Message(role=role, content=content))
|
|
|
| text = "\n\n".join(text_parts)
|
| elif message:
|
| text = message
|
| self.messages.append(Message(role="user", content=message))
|
|
|
| if image:
|
| images = [{"mime_type": "image/jpeg", "data": base64.b64encode(image).decode()}]
|
| elif image_url:
|
| if image_url.startswith("data:"):
|
| match = re.match(r'data:([^;]+);base64,(.+)', image_url)
|
| if match:
|
| images = [{"mime_type": match.group(1), "data": match.group(2)}]
|
| else:
|
| try:
|
| resp = httpx.get(image_url, timeout=30)
|
| mime = resp.headers.get("content-type", "image/jpeg").split(";")[0]
|
| images = [{"mime_type": mime, "data": base64.b64encode(resp.content).decode()}]
|
| except:
|
| pass
|
| else:
|
| text = ""
|
|
|
| if not text:
|
| raise ValueError("消息内容不能为空")
|
|
|
|
|
| if stream: |
| return self._send_request_stream(text, images, model) |
| return self._send_request(text, images, model) |
|
|
|
|
| def _log_gemini_call(self, request_data: dict, response_text: str, error: str = None):
|
| """记录 Gemini 内部调用日志"""
|
| import datetime
|
| log_entry = {
|
| "timestamp": datetime.datetime.now().isoformat(),
|
| "type": "gemini_internal",
|
| "request": request_data,
|
| "response_raw": response_text,
|
| "error": error
|
| }
|
| try:
|
| with open("api_logs.json", "a", encoding="utf-8") as f:
|
| f.write(json.dumps(log_entry, ensure_ascii=False, indent=2) + "\n---\n")
|
| except Exception as e:
|
| print(f"[LOG ERROR] 写入 Gemini 日志失败: {e}")
|
|
|
def _send_request(self, text: str, images: Optional[List[Dict]] = None, model: Optional[str] = None) -> ChatCompletionResponse:
    """Send one non-streaming request to Gemini and return an OpenAI-style response.

    Images, if any, are uploaded first when ``self.push_id`` is set.  A failed
    upload is reported on stdout and the request is still sent, just without
    uploaded image paths.  The POST is attempted up to three times when a
    connection-level error occurs, sleeping 2s and then 4s between attempts.

    Args:
        text: Prompt text handed to ``_build_request_data``.
        images: Optional list of dicts carrying ``"data"`` (base64 string) and
            ``"mime_type"``.
        model: Optional model name.  A name containing "pro" or "think"
            selects the matching entry of ``self.model_ids``; anything else
            uses the "flash" entry.

    Returns:
        A ``ChatCompletionResponse`` with a single assistant choice.  The
        usage figures are character counts of prompt and reply, not real
        token counts.

    Raises:
        Exception: On an HTTP error status, on a network failure after all
            retries, or on any other failure while sending or parsing.  The
            underlying exception is chained as ``__cause__``.
    """
    url = f"{self.BASE_URL}/_/BardChatUi/data/assistant.lamda.BardFrontendService/StreamGenerate"

    params = {
        "bl": self.bl,
        "f.sid": "",
        "hl": "zh-CN",
        "_reqid": str(self.request_count * 100000 + random.randint(10000, 99999)),
        "rt": "c",
    }

    # Resolve the backend model id; "flash" is the default.
    model_id = self.model_ids.get("flash", "56fdd199312815e2")
    if model:
        model_lower = model.lower()
        if "pro" in model_lower:
            model_id = self.model_ids.get("pro", "e6fa609c3fa255c0")
        elif "think" in model_lower:  # also matches "thinking"
            model_id = self.model_ids.get("thinking", "e051ce1aa80aa576")

    # Upload images (best effort): any failure drops all uploaded paths.
    image_paths = []
    if images:
        if not self.push_id:
            print("⚠️ 图片上传需要 push-id,请运行: python get_push_id.py")
            print(" 然后将获取的 push-id 添加到 config.py")
        else:
            try:
                for img in images:
                    img_data = base64.b64decode(img["data"])
                    path = self._upload_image(img_data, img["mime_type"])
                    image_paths.append(path)
                    if self.debug:
                        print(f"[DEBUG] 图片上传成功: {path[:50]}...")
            except Exception as e:
                print(f"⚠️ 图片上传失败: {e}")
                image_paths = []

    req_data = self._build_request_data(text, images, image_paths, model)

    form_data = {
        "f.req": req_data,
        "at": self.snlm0e,
    }

    # The model is selected through this extension header.
    model_headers = {
        "x-goog-ext-525001261-jspb": json.dumps(
            [1, None, None, None, model_id, None, None, 0, [4], None, None, 2],
            separators=(',', ':'),
        ),
    }

    # Summary of the request, handed to _log_gemini_call on every outcome.
    gemini_request_log = {
        "url": url,
        "params": params,
        "text": text,
        "model": model,
        "model_id": model_id,
        "has_images": bool(images),
        "image_paths": image_paths,
        "f_req_preview": req_data[:500] + "..." if len(req_data) > 500 else req_data,
    }

    if self.debug:
        print(f"[DEBUG] 请求 URL: {url}")
        print(f"[DEBUG] AT Token: {self.snlm0e[:30]}...")
        print(f"[DEBUG] 模型: {model or '默认'}, ID: {model_id}")
        if image_paths:
            print(f"[DEBUG] 请求数据前300字符: {req_data[:300]}")

    max_retries = 3
    last_error = None

    for attempt in range(max_retries):
        try:
            resp = self.session.post(url, params=params, data=form_data, headers=model_headers, timeout=60.0)

            if self.debug:
                print(f"[DEBUG] 响应状态: {resp.status_code}")
                print(f"[DEBUG] 响应内容前500字符: {resp.text[:500]}")

                with open("debug_image_response.txt", "w", encoding="utf-8") as f:
                    f.write(resp.text)
                print("[DEBUG] 完整响应已保存到 debug_image_response.txt")

            # Logged before the status check, so an HTTP error response is
            # logged here and again (with its error code) in the handler below.
            self._log_gemini_call(gemini_request_log, resp.text)

            resp.raise_for_status()
            self.request_count += 1

            reply_text = self._parse_response(resp.text)

            # Keep the assistant reply in the conversation history.
            self.messages.append(Message(role="assistant", content=reply_text))

            return ChatCompletionResponse(
                id=f"chatcmpl-{self.conversation_id or 'gemini'}-{int(time.time())}",
                created=int(time.time()),
                model="gemini-web",
                choices=[
                    ChatCompletionChoice(
                        index=0,
                        message=Message(role="assistant", content=reply_text),
                        finish_reason="stop",
                    )
                ],
                usage=Usage(
                    prompt_tokens=len(text),
                    completion_tokens=len(reply_text),
                    total_tokens=len(text) + len(reply_text),
                ),
            )

        except httpx.HTTPStatusError as e:
            # HTTP error statuses are not retried.
            self._log_gemini_call(gemini_request_log, e.response.text, error=f"HTTP {e.response.status_code}")
            raise Exception(f"HTTP 错误: {e.response.status_code}") from e
        except (httpx.RemoteProtocolError, httpx.ReadError, httpx.ConnectError) as e:
            # Connection-level failure: back off and retry, unless this was
            # the last attempt.
            last_error = e
            if attempt < max_retries - 1:
                wait_time = (attempt + 1) * 2
                print(f"⚠️ 连接中断,{wait_time}秒后重试 ({attempt + 1}/{max_retries})...")
                time.sleep(wait_time)
                continue
            self._log_gemini_call(gemini_request_log, "", error=str(e))
            raise Exception(f"网络连接失败(已重试{max_retries}次): {e}") from e
        except Exception as e:
            # NOTE(review): this also wraps anything raised by _parse_response
            # into a plain Exception; confirm callers do not rely on the
            # original exception type (it stays available as __cause__).
            self._log_gemini_call(gemini_request_log, "", error=str(e))
            raise Exception(f"请求失败: {e}") from e

    # Every loop iteration returns, raises or continues, so this is only a
    # safety net.
    if last_error:
        raise Exception(f"请求失败(已重试{max_retries}次): {last_error}")
|
|
|
def _extract_text_from_inner_json(self, inner_json: list) -> str:
    """Extract the reply text from one decoded inner packet and refresh session IDs.

    Positions read, when present and truthy:
      * ``inner_json[1][0]``    -> ``self.conversation_id``
      * ``inner_json[1][1]``    -> ``self.response_id``
      * ``inner_json[4][0][0]`` -> ``self.choice_id``
      * ``inner_json[4][0][1]`` -> reply text (its first element when it is a list)

    Args:
        inner_json: Decoded packet payload, normally a list.

    Returns:
        The reply text, or ``""`` when the packet has no usable text or does
        not have the expected shape.  A non-string text value is converted
        with ``str()``; a ``None`` text slot yields ``""`` rather than the
        literal string ``"None"``.
    """
    try:
        if not inner_json:
            return ""

        # Session identifiers: existing values are kept when the packet's are empty.
        ids = inner_json[1] if len(inner_json) > 1 else None
        if isinstance(ids, list) and ids:
            self.conversation_id = ids[0] or self.conversation_id
            if len(ids) > 1:
                self.response_id = ids[1] or self.response_id

        candidates = inner_json[4] if len(inner_json) > 4 else None
        if not candidates:
            return ""

        # Only the first candidate is used.
        candidate = candidates[0]
        if not candidate or len(candidate) < 2 or not candidate[1]:
            return ""

        self.choice_id = candidate[0] or self.choice_id

        payload = candidate[1]
        text = payload[0] if isinstance(payload, list) else payload
        if text is None:
            # An empty text slot must not be emitted as the string "None".
            return ""
        return text if isinstance(text, str) else str(text)
    except (TypeError, LookupError):
        # The packet did not have the expected nesting; treat it as "no text".
        # IDs assigned before the failure are intentionally kept.
        return ""
|
|
def _send_request_stream(self, text: str, images: Optional[List[Dict]] = None, model: Optional[str] = None) -> Iterator[Dict[str, Any]]:
    """Send a request and yield OpenAI-style ``chat.completion.chunk`` dicts.

    This is a generator: nothing (including image upload) happens until it is
    iterated.  The sequence yielded is one role chunk, zero or more content
    delta chunks, and a final chunk with ``finish_reason="stop"``.

    The response body is read incrementally.  After an optional ``)]}'``
    prefix it is treated as a series of frames, each introduced by a decimal
    length and a newline; the length is counted in UTF-16 code units starting
    right after the digits.  Each complete frame is JSON-decoded and its
    ``"wrb.fr"`` packets are passed to ``_extract_text_from_inner_json``.

    Args:
        text: Prompt text handed to ``_build_request_data``.
        images: Optional list of dicts carrying ``"data"`` (base64 string) and
            ``"mime_type"``.  They are uploaded only when ``self.push_id`` is
            set; an upload error propagates to the caller.
        model: Optional model name.  A name containing "pro" or "think"
            selects the matching entry of ``self.model_ids``; anything else
            uses the "flash" entry.  Also echoed in each chunk's ``"model"``
            field (``"gemini-web"`` when omitted).

    Yields:
        Chunk dicts with the keys ``id``, ``object``, ``created``, ``model``
        and ``choices``.

    Raises:
        Whatever ``_upload_image``, the HTTP session, ``raise_for_status`` or
        ``_parse_response`` raise; there is no retry on this path.
    """
    url = f"{self.BASE_URL}/_/BardChatUi/data/assistant.lamda.BardFrontendService/StreamGenerate"

    params = {
        "bl": self.bl,
        "f.sid": "",
        "hl": "zh-CN",
        "_reqid": str(self.request_count * 100000 + random.randint(10000, 99999)),
        "rt": "c",
    }

    # Resolve the backend model id; "flash" is the default.
    model_id = self.model_ids.get("flash", "56fdd199312815e2")
    if model:
        model_lower = model.lower()
        if "pro" in model_lower:
            model_id = self.model_ids.get("pro", "e6fa609c3fa255c0")
        elif "think" in model_lower:  # also matches "thinking"
            model_id = self.model_ids.get("thinking", "e051ce1aa80aa576")

    # Without a push id the images are skipped silently on this path.
    image_paths = []
    if images and self.push_id:
        for img in images:
            img_data = base64.b64decode(img["data"])
            image_paths.append(self._upload_image(img_data, img["mime_type"]))

    req_data = self._build_request_data(text, images, image_paths, model)
    form_data = {"f.req": req_data, "at": self.snlm0e}
    model_headers = {
        "x-goog-ext-525001261-jspb": json.dumps(
            [1, None, None, None, model_id, None, None, 0, [4], None, None, 2],
            separators=(",", ":"),
        ),
    }

    completion_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
    created = int(time.time())
    chunk_model = model or "gemini-web"

    def make_chunk(delta: Dict[str, Any], finish_reason: Optional[str] = None) -> Dict[str, Any]:
        """Build one chunk dict sharing this completion's id, timestamp and model."""
        return {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": chunk_model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }

    # The role chunk is emitted before the HTTP request is made.
    yield make_chunk({"role": "assistant"})

    full_text = ""  # text already delivered to the consumer
    raw_lines: List[str] = []  # re-serialized packets for the final _parse_response pass

    # Compiled once and matched at an offset, so locating a frame header does
    # not copy the remainder of the buffer for every frame.
    frame_header = re.compile(r"(\d+)\n")

    def utf16_char_len(ch: str) -> int:
        """Return how many UTF-16 code units *ch* occupies (2 above the BMP)."""
        return 2 if ord(ch) > 0xFFFF else 1

    def take_utf16_units(s: str, start: int, target_units: int) -> tuple[int, int]:
        """Count characters of *s* from *start* that fit in *target_units* UTF-16 units.

        Returns ``(characters_taken, units_used)``; ``units_used`` is below
        ``target_units`` when *s* ends first or the next character would
        overshoot the target.
        """
        count = 0
        used = 0
        n = len(s)
        while (start + count) < n and used < target_units:
            u = utf16_char_len(s[start + count])
            if used + u > target_units:
                break
            used += u
            count += 1
        return count, used

    def parse_length_prefixed_frames(buffer: str) -> tuple[List[Any], str]:
        """Decode every complete frame in *buffer*.

        Returns ``(packets, remainder)`` where *remainder* is the unconsumed
        tail (an incomplete frame, or text that does not start with a
        length marker).
        """
        frames: List[Any] = []
        pos = 0
        n = len(buffer)
        while pos < n:
            while pos < n and buffer[pos].isspace():
                pos += 1
            if pos >= n:
                break

            m = frame_header.match(buffer, pos)
            if not m:
                break

            frame_units = int(m.group(1))
            # The declared length starts right after the digits, so it
            # includes the newline that terminates the marker.
            start_content = pos + len(m.group(1))
            char_count, used_units = take_utf16_units(buffer, start_content, frame_units)
            if used_units < frame_units:
                break  # frame not fully received yet

            end_content = start_content + char_count
            chunk = buffer[start_content:end_content].strip()
            pos = end_content

            if not chunk:
                continue

            try:
                parsed = json.loads(chunk)
            except ValueError:
                continue  # skip a frame that is not valid JSON
            if isinstance(parsed, list):
                frames.extend(parsed)
            else:
                frames.append(parsed)

        return frames, buffer[pos:]

    def calc_delta(new_text: str, sent_text: str) -> str:
        """Return the part of *new_text* that has not been sent yet."""
        if not new_text:
            return ""
        if new_text.startswith(sent_text):
            return new_text[len(sent_text):]
        if sent_text.startswith(new_text):
            return ""
        # The text diverged from what was sent: emit everything after the
        # common prefix.
        common = 0
        max_common = min(len(new_text), len(sent_text))
        while common < max_common and new_text[common] == sent_text[common]:
            common += 1
        return new_text[common:]

    def process_packet(packet: Any) -> Iterator[Dict[str, Any]]:
        """Yield a content chunk if *packet* is a ``wrb.fr`` packet carrying new text."""
        nonlocal full_text
        if not isinstance(packet, list) or len(packet) < 3 or packet[0] != "wrb.fr" or not packet[2]:
            return
        try:
            inner_json = json.loads(packet[2])
            raw_lines.append(json.dumps([packet], ensure_ascii=False))
            text_candidate = self._extract_text_from_inner_json(inner_json)
        except Exception:
            # Best effort: a malformed packet is skipped.
            return
        if not text_candidate:
            return
        delta = calc_delta(text_candidate, full_text)
        if not delta:
            return
        full_text = text_candidate
        # Yield outside the try block so an exception thrown into the
        # generator by the consumer is not swallowed.
        yield make_chunk({"content": delta})

    with self.session.stream(
        "POST",
        url,
        params=params,
        data=form_data,
        headers=model_headers,
        timeout=60.0,
    ) as resp:
        resp.raise_for_status()
        self.request_count += 1

        # Incremental decoder: a multi-byte character may be split across
        # network chunks.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        stream_buffer = ""

        for chunk_bytes in resp.iter_bytes():
            if not chunk_bytes:
                continue

            stream_buffer += decoder.decode(chunk_bytes, final=False)
            if stream_buffer.startswith(")]}'"):
                stream_buffer = stream_buffer[4:].lstrip()

            packets, stream_buffer = parse_length_prefixed_frames(stream_buffer)
            for packet in packets:
                yield from process_packet(packet)

            # Safety valve: drop an unparsed remainder that grows beyond
            # 2M characters.
            if len(stream_buffer) > 2_000_000:
                stream_buffer = ""

        # Flush the decoder and parse whatever is left.
        stream_buffer += decoder.decode(b"", final=True)
        tail_packets, _ = parse_length_prefixed_frames(stream_buffer)
        for packet in tail_packets:
            yield from process_packet(packet)

    # Re-parse everything collected with the regular parser; if it found more
    # text than was streamed, send the missing tail.
    final_reply = self._parse_response("\n".join(raw_lines)) if raw_lines else full_text
    if not final_reply:
        final_reply = full_text

    if final_reply and len(final_reply) > len(full_text) and final_reply.startswith(full_text):
        yield make_chunk({"content": final_reply[len(full_text):]})

    self.messages.append(Message(role="assistant", content=final_reply))
    yield make_chunk({}, finish_reason="stop")
|
|
def reset(self):
    """Reset the conversation context.

    Clears the conversation, response and choice identifiers and replaces
    the message history with a new empty list.
    """
    self.conversation_id = self.response_id = self.choice_id = ""
    self.messages = []
|
|
|
def get_history(self) -> List[Dict]:
    """Return the message history as a list of OpenAI-style dicts.

    Each entry has the keys ``"role"`` and ``"content"``, in the order the
    messages are stored in ``self.messages``.
    """
    history = []
    for message in self.messages:
        history.append({"role": message.role, "content": message.content})
    return history
|
|
|
|
|
|
|
class OpenAICompatible:
    """Adapter exposing a ``GeminiClient`` through an OpenAI-SDK-shaped API.

    Usage mirrors the SDK call chain::

        OpenAICompatible(client).chat.completions.create(messages=[...])
    """

    def __init__(self, client: GeminiClient):
        self.client = client
        self.chat = self.Chat(client)

    class Chat:
        """Namespace object providing the ``completions`` attribute."""

        def __init__(self, client: GeminiClient):
            self.client = client
            self.completions = self.Completions(client)

        class Completions:
            """Namespace object providing ``create``."""

            def __init__(self, client: GeminiClient):
                self.client = client

            def create(
                self,
                model: str = "gemini-web",
                messages: Optional[List[Dict]] = None,
                **kwargs
            ) -> Union[ChatCompletionResponse, Iterator[Dict[str, Any]]]:
                """Forward a chat-completion request to ``client.chat``.

                Args:
                    model: Model name passed through to the client.
                    messages: OpenAI-style message dicts.
                    **kwargs: Only ``stream`` is read (coerced to ``bool``);
                        every other keyword is ignored.

                Returns:
                    Whatever ``client.chat`` returns: the complete response
                    object, or an iterator of chunk dicts when ``stream`` is
                    truthy.
                """
                return self.client.chat(
                    messages=messages,
                    model=model,
                    stream=bool(kwargs.get("stream", False)),
                )
|
|