habulaj's picture
Update gemini_client/core.py
49b0506 verified
# -*- coding: utf-8 -*-
#########################################
# Code Modified to use curl_cffi
#########################################
import asyncio
import json
import os
import random
import re
import string
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Union, Optional
from gemini_client.enums import Endpoint, Headers, Model
# Use curl_cffi for requests
from curl_cffi import CurlError
from curl_cffi.requests import AsyncSession
# Import common request exceptions (curl_cffi often wraps these)
from requests.exceptions import RequestException, Timeout, HTTPError
# For image models using validation. Adjust based on organization internal pydantic.
# Updated import for Pydantic V2
from pydantic import BaseModel, field_validator
# Rich is retained for logging within image methods.
from rich.console import Console
from rich.markdown import Markdown
console = Console()
#########################################
# New Enums and functions for endpoints,
# headers, models, file upload and images.
#########################################
#########################################
# Cookie loading and Chatbot classes
#########################################
from gemini_client.utils import upload_file, load_cookies, save_cookies
class Chatbot:
"""
Synchronous wrapper for the AsyncChatbot class.
This class provides a synchronous interface to interact with Google Gemini,
handling authentication, conversation management, and message sending.
Attributes:
loop (asyncio.AbstractEventLoop): Event loop for running async tasks.
secure_1psid (str): Authentication cookie.
secure_1psidts (str): Authentication cookie.
async_chatbot (AsyncChatbot): Underlying asynchronous chatbot instance.
"""
def __init__(
self,
cookie_path: str,
proxy: Optional[Union[str, Dict[str, str]]] = None, # Allow string or dict proxy
timeout: int = 20,
model: Model = Model.UNSPECIFIED,
impersonate: str = "chrome110" # Added impersonate
):
# Use asyncio.run() for cleaner async execution in sync context
# Handle potential RuntimeError if an event loop is already running
try:
self.loop = asyncio.get_running_loop()
except RuntimeError:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.secure_1psid, self.secure_1psidts, additional_cookies = load_cookies(cookie_path)
self.async_chatbot = self.loop.run_until_complete(
AsyncChatbot.create(self.secure_1psid, self.secure_1psidts, proxy, timeout, model, impersonate, additional_cookies, cookie_path)
)
def save_conversation(self, file_path: str, conversation_name: str):
return self.loop.run_until_complete(
self.async_chatbot.save_conversation(file_path, conversation_name)
)
def load_conversations(self, file_path: str) -> List[Dict]:
return self.loop.run_until_complete(
self.async_chatbot.load_conversations(file_path)
)
def load_conversation(self, file_path: str, conversation_name: str) -> bool:
return self.loop.run_until_complete(
self.async_chatbot.load_conversation(file_path, conversation_name)
)
def refresh_cookie(self) -> bool:
"""
Atualiza proativamente o cookie __Secure-1PSIDTS e salva automaticamente.
Útil para atualizar o cookie antes que expire.
Returns:
bool: True se o cookie foi atualizado com sucesso, False caso contrário.
"""
return self.loop.run_until_complete(
self.async_chatbot.refresh_cookie()
)
def ask(
self,
message: str,
image: Optional[Union[bytes, str, Path]] = None,
video: Optional[Union[bytes, str, Path]] = None,
audio: Optional[Union[bytes, str, Path]] = None
) -> dict:
# Pass media to async ask method
return self.loop.run_until_complete(self.async_chatbot.ask(message, image=image, video=video, audio=audio))
class AsyncChatbot:
"""
Asynchronous chatbot client for interacting with Google Gemini using curl_cffi.
This class manages authentication, session state, conversation history,
and sending/receiving messages (including images) asynchronously.
Attributes:
headers (dict): HTTP headers for requests.
_reqid (int): Request identifier for Gemini API.
SNlM0e (str): Session token required for API requests.
conversation_id (str): Current conversation ID.
response_id (str): Current response ID.
choice_id (str): Current choice ID.
proxy (str | dict | None): Proxy configuration.
proxies_dict (dict | None): Proxy dictionary for curl_cffi.
secure_1psid (str): Authentication cookie.
secure_1psidts (str): Authentication cookie.
session (AsyncSession): curl_cffi session for HTTP requests.
timeout (int): Request timeout in seconds.
model (Model): Selected Gemini model.
impersonate (str): Browser profile for curl_cffi to impersonate.
"""
__slots__ = [
"headers",
"_reqid",
"SNlM0e",
"conversation_id",
"response_id",
"choice_id",
"proxy", # Store the original proxy config
"proxies_dict", # Store the curl_cffi-compatible proxy dict
"secure_1psidts",
"secure_1psid",
"session",
"timeout",
"model",
"impersonate", # Store impersonate setting
"cookie_path", # Path to cookie file for auto-saving
"additional_cookies", # Store additional cookies
]
def __init__(
self,
secure_1psid: str,
secure_1psidts: str,
proxy: Optional[Union[str, Dict[str, str]]] = None, # Allow string or dict proxy
timeout: int = 20,
model: Model = Model.UNSPECIFIED,
impersonate: str = "chrome110", # Added impersonate
additional_cookies: Optional[Dict[str, str]] = None, # Additional cookies like COMPASS
cookie_path: Optional[str] = None, # Path to cookie file for auto-saving
):
headers = Headers.GEMINI.value.copy()
if model != Model.UNSPECIFIED:
headers.update(model.model_header)
self._reqid = int("".join(random.choices(string.digits, k=7))) # Increased length for less collision chance
self.proxy = proxy # Store original proxy setting
self.impersonate = impersonate # Store impersonate setting
# Prepare proxy dictionary for curl_cffi
self.proxies_dict = None
if isinstance(proxy, str):
self.proxies_dict = {"http": proxy, "https": proxy} # curl_cffi uses http/https keys
elif isinstance(proxy, dict):
self.proxies_dict = proxy # Assume it's already in the correct format
self.conversation_id = ""
self.response_id = ""
self.choice_id = ""
self.secure_1psid = secure_1psid
self.secure_1psidts = secure_1psidts
# Prepare cookies dict with required cookies and any additional ones
cookies_dict = {
"__Secure-1PSID": secure_1psid,
"__Secure-1PSIDTS": secure_1psidts
}
if additional_cookies:
cookies_dict.update(additional_cookies)
# Initialize curl_cffi AsyncSession
self.session = AsyncSession(
headers=headers,
cookies=cookies_dict,
proxies=self.proxies_dict,
timeout=timeout,
impersonate=self.impersonate
# verify and http2 are handled automatically by curl_cffi
)
# No need to set proxies/headers/cookies again, done in constructor
self.timeout = timeout # Store timeout for potential direct use in requests
self.model = model
self.SNlM0e = None # Initialize SNlM0e
self.cookie_path = cookie_path # Store cookie path for auto-saving
self.additional_cookies = additional_cookies or {} # Store additional cookies
@classmethod
async def create(
cls,
secure_1psid: str,
secure_1psidts: str,
proxy: Optional[Union[str, Dict[str, str]]] = None, # Allow string or dict proxy
timeout: int = 20,
model: Model = Model.UNSPECIFIED,
impersonate: str = "chrome110", # Added impersonate
additional_cookies: Optional[Dict[str, str]] = None, # Additional cookies like COMPASS
cookie_path: Optional[str] = None, # Path to cookie file for auto-saving
) -> "AsyncChatbot":
"""
Factory method to create and initialize an AsyncChatbot instance.
Fetches the necessary SNlM0e value asynchronously.
"""
instance = cls(secure_1psid, secure_1psidts, proxy, timeout, model, impersonate, additional_cookies, cookie_path)
try:
instance.SNlM0e = await instance.__get_snlm0e()
except Exception as e:
# Log the error and re-raise or handle appropriately
console.log(f"[red]Error during AsyncChatbot initialization (__get_snlm0e): {e}[/red]", style="bold red")
# Optionally close the session if initialization fails critically
await instance.session.close() # Use close() for AsyncSession
raise # Re-raise the exception to signal failure
return instance
async def refresh_cookie(self) -> bool:
"""
Atualiza proativamente o cookie __Secure-1PSIDTS e salva automaticamente.
Útil para atualizar o cookie antes que expire.
Returns:
bool: True se o cookie foi atualizado com sucesso, False caso contrário.
"""
try:
await self.__rotate_cookies()
# Atualizar SNlM0e após atualizar o cookie
self.SNlM0e = await self.__get_snlm0e()
return True
except Exception as e:
console.log(f"[red]Falha ao atualizar cookie proativamente: {e}[/red]")
return False
async def save_conversation(self, file_path: str, conversation_name: str) -> None:
# Logic remains the same
conversations = await self.load_conversations(file_path)
conversation_data = {
"conversation_name": conversation_name,
"_reqid": self._reqid,
"conversation_id": self.conversation_id,
"response_id": self.response_id,
"choice_id": self.choice_id,
"SNlM0e": self.SNlM0e,
"model_name": self.model.model_name, # Save the model used
"timestamp": datetime.now().isoformat(), # Add timestamp
}
found = False
for i, conv in enumerate(conversations):
if conv.get("conversation_name") == conversation_name:
conversations[i] = conversation_data # Update existing
found = True
break
if not found:
conversations.append(conversation_data) # Add new
try:
# Ensure directory exists
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(conversations, f, indent=4, ensure_ascii=False)
except IOError as e:
console.log(f"[red]Error saving conversation to {file_path}: {e}[/red]")
raise
async def load_conversations(self, file_path: str) -> List[Dict]:
# Logic remains the same
if not os.path.isfile(file_path):
return []
try:
with open(file_path, 'r', encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
console.log(f"[red]Error loading conversations from {file_path}: {e}[/red]")
return []
async def load_conversation(self, file_path: str, conversation_name: str) -> bool:
# Logic remains the same, but update headers on the session
conversations = await self.load_conversations(file_path)
for conversation in conversations:
if conversation.get("conversation_name") == conversation_name:
try:
self._reqid = conversation["_reqid"]
self.conversation_id = conversation["conversation_id"]
self.response_id = conversation["response_id"]
self.choice_id = conversation["choice_id"]
self.SNlM0e = conversation["SNlM0e"]
if "model_name" in conversation:
try:
self.model = Model.from_name(conversation["model_name"])
# Update headers in the session if model changed
self.session.headers.update(self.model.model_header)
except ValueError as e:
console.log(f"[yellow]Warning: Model '{conversation['model_name']}' from saved conversation not found. Using current model '{self.model.model_name}'. Error: {e}[/yellow]")
console.log(f"Loaded conversation '{conversation_name}'")
return True
except KeyError as e:
console.log(f"[red]Error loading conversation '{conversation_name}': Missing key {e}[/red]")
return False
console.log(f"[yellow]Conversation '{conversation_name}' not found in {file_path}[/yellow]")
return False
async def __get_snlm0e(self):
"""Fetches the SNlM0e value required for API requests using curl_cffi."""
if not self.secure_1psid:
raise ValueError("__Secure-1PSID cookie is required.")
try:
# Use the session's get method
resp = await self.session.get(
Endpoint.INIT.value,
timeout=self.timeout # Timeout is already set in session, but can override
# follow_redirects is handled automatically by curl_cffi
)
resp.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
# Check for authentication issues
if "Sign in to continue" in resp.text or "accounts.google.com" in str(resp.url):
raise PermissionError("Authentication failed. Cookies might be invalid or expired. Please update them.")
# Regex to find the SNlM0e value
snlm0e_match = re.search(r'''["']SNlM0e["']\s*:\s*["'](.*?)["']''', resp.text)
if not snlm0e_match:
# Se não encontrou SNlM0e, pode ser cookie expirado - tentar atualizar
console.log("[yellow]SNlM0e não encontrado na resposta. Tentando atualizar cookie...[/yellow]")
try:
# Tentar atualizar cookie
new_cookie = await self.__rotate_cookies()
# Se obteve novo cookie, atualizar na sessão e tentar novamente
if new_cookie and new_cookie != self.secure_1psidts:
# Retentar buscar SNlM0e após atualizar cookie
resp = await self.session.get(
Endpoint.INIT.value,
timeout=self.timeout
)
resp.raise_for_status()
if "Sign in to continue" in resp.text or "accounts.google.com" in str(resp.url):
raise PermissionError("Authentication failed. Cookies might be invalid or expired. Please update them.")
snlm0e_match = re.search(r'''["']SNlM0e["']\s*:\s*["'](.*?)["']''', resp.text)
if snlm0e_match:
console.log("[green]✓ SNlM0e obtido com sucesso após atualização do cookie![/green]")
return snlm0e_match.group(1)
# Se não conseguiu atualizar ou ainda não encontrou SNlM0e
error_message = "SNlM0e value not found in response."
if resp.status_code == 429:
error_message += " Rate limit likely exceeded."
else:
error_message += (
f" Response status: {resp.status_code}. "
f"Por favor, acesse https://gemini.google.com/app e copie os novos cookies "
f"__Secure-1PSID e __Secure-1PSIDTS manualmente."
)
raise ValueError(error_message)
except ValueError as rotate_error:
# Re-raise ValueError (já tem mensagem clara sobre atualização manual)
raise
except Exception as rotate_error:
console.log(f"[red]Falha ao atualizar cookie: {rotate_error}[/red]")
error_message = "SNlM0e value not found in response."
if resp.status_code == 429:
error_message += " Rate limit likely exceeded."
else:
error_message += (
f" Response status: {resp.status_code}. "
f"Falha ao atualizar cookie automaticamente. "
f"Por favor, acesse https://gemini.google.com/app e copie os novos cookies "
f"__Secure-1PSID e __Secure-1PSIDTS manualmente."
)
raise ValueError(error_message)
# Try to refresh PSIDTS if needed
if not self.secure_1psidts and "PSIDTS" not in self.session.cookies:
try:
# Attempt to rotate cookies to get a fresh PSIDTS
await self.__rotate_cookies()
except Exception as e:
console.log(f"[yellow]Warning: Could not refresh PSIDTS cookie: {e}[/yellow]")
# Continue anyway as some accounts don't need PSIDTS
return snlm0e_match.group(1)
except Timeout as e: # Catch requests.exceptions.Timeout
raise TimeoutError(f"Request timed out while fetching SNlM0e: {e}") from e
except (RequestException, CurlError) as e: # Catch general request errors and Curl specific errors
raise ConnectionError(f"Network error while fetching SNlM0e: {e}") from e
except HTTPError as e: # Catch requests.exceptions.HTTPError
# Tentar atualizar cookie automaticamente se for erro de autenticação
if e.response.status_code in (401, 403):
console.log(f"[yellow]Erro de autenticação ao buscar SNlM0e (status {e.response.status_code}). Tentando atualizar cookie...[/yellow]")
try:
await self.__rotate_cookies()
# Retentar buscar SNlM0e após atualizar cookie
resp = await self.session.get(
Endpoint.INIT.value,
timeout=self.timeout
)
resp.raise_for_status()
if "Sign in to continue" in resp.text or "accounts.google.com" in str(resp.url):
raise PermissionError("Authentication failed. Cookies might be invalid or expired. Please update them.")
snlm0e_match = re.search(r'''["']SNlM0e["']\s*:\s*["'](.*?)["']''', resp.text)
if snlm0e_match:
console.log("[green]✓ SNlM0e obtido com sucesso após atualização do cookie![/green]")
return snlm0e_match.group(1)
else:
raise ValueError("SNlM0e value not found in response after cookie update.")
except Exception as rotate_error:
console.log(f"[red]Falha ao atualizar cookie: {rotate_error}[/red]")
raise PermissionError(f"Authentication failed (status {e.response.status_code}). Cookie update failed. {e}") from e
else:
raise Exception(f"HTTP error {e.response.status_code} while fetching SNlM0e: {e}") from e
async def __rotate_cookies(self):
"""
Rotates the __Secure-1PSIDTS cookie and saves it automatically.
Returns the new cookie value if successful.
Usa todos os cookies disponíveis para autenticação.
"""
try:
console.log("[yellow]Atualizando cookie __Secure-1PSIDTS automaticamente...[/yellow]")
# Método mais confiável: Acessar a página do Gemini para obter novos cookies
# Isso funciona porque o Google atualiza o cookie automaticamente quando você acessa a página
console.log("[cyan]Acessando página do Gemini para obter novo cookie...[/cyan]")
init_response = await self.session.get(
Endpoint.INIT.value,
timeout=self.timeout
)
init_response.raise_for_status()
# Verificar se obteve novo cookie na resposta
new_1psidts = init_response.cookies.get("__Secure-1PSIDTS")
if new_1psidts and new_1psidts != self.secure_1psidts:
self.secure_1psidts = new_1psidts
self.session.cookies.set("__Secure-1PSIDTS", new_1psidts)
if self.cookie_path:
try:
save_cookies(
self.cookie_path,
self.secure_1psid,
self.secure_1psidts,
self.additional_cookies
)
console.log("[green]✓ Cookie atualizado e salvo automaticamente via acesso à página![/green]")
except Exception as save_error:
console.log(f"[yellow]Aviso: Cookie atualizado mas não foi possível salvar: {save_error}[/yellow]")
return new_1psidts
# Se não obteve novo cookie, tentar endpoint de rotação como fallback
console.log("[yellow]Não obteve novo cookie via página, tentando endpoint de rotação...[/yellow]")
try:
response = await self.session.post(
Endpoint.ROTATE_COOKIES.value,
headers={
"Content-Type": "application/json",
"Origin": "https://accounts.google.com",
"Referer": "https://accounts.google.com/",
"X-Requested-With": "XMLHttpRequest"
},
json=[0, "-0000000000000000000"],
timeout=self.timeout
)
if response.status_code == 200:
new_1psidts = response.cookies.get("__Secure-1PSIDTS")
if new_1psidts:
self.secure_1psidts = new_1psidts
self.session.cookies.set("__Secure-1PSIDTS", new_1psidts)
if self.cookie_path:
try:
save_cookies(
self.cookie_path,
self.secure_1psid,
self.secure_1psidts,
self.additional_cookies
)
console.log("[green]✓ Cookie atualizado via endpoint de rotação![/green]")
except Exception as save_error:
console.log(f"[yellow]Aviso: Cookie atualizado mas não foi possível salvar: {save_error}[/yellow]")
return new_1psidts
# Se endpoint retornou 404, pode ser que não exista mais ou mudeu
if response.status_code == 404:
console.log("[yellow]Endpoint de rotação retornou 404 - pode ter mudado ou não estar disponível[/yellow]")
except Exception as rotate_error:
console.log(f"[yellow]Endpoint de rotação falhou: {rotate_error}[/yellow]")
# Se não conseguiu atualizar, retornar o cookie atual (pode ainda ser válido)
console.log("[yellow]Não foi possível obter novo cookie - usando cookie atual[/yellow]")
return self.secure_1psidts
except Exception as e:
console.log(f"[red]Falha ao atualizar cookie: {e}[/red]")
# Se falhou completamente, retornar o cookie atual em vez de falhar
# Isso permite que o sistema continue funcionando mesmo se não conseguir atualizar
console.log("[yellow]Retornando cookie atual - sistema continuará funcionando[/yellow]")
return self.secure_1psidts
async def ask(
self,
message: str,
image: Optional[Union[bytes, str, Path]] = None,
video: Optional[Union[bytes, str, Path]] = None,
audio: Optional[Union[bytes, str, Path]] = None
) -> dict:
"""
Sends a message to Google Gemini and returns the response using curl_cffi.
Parameters:
message: str
The message to send.
image: Optional[Union[bytes, str, Path]]
Optional image data (bytes) or path to an image file to include.
video: Optional[Union[bytes, str, Path]]
Optional video data (bytes) or path to a video file to include.
audio: Optional[Union[bytes, str, Path]]
Optional audio data (bytes) or path to an audio file to include.
Returns:
dict: A dictionary containing the response content and metadata.
"""
if self.SNlM0e is None:
raise RuntimeError("AsyncChatbot not properly initialized. Call AsyncChatbot.create()")
params = {
"bl": "boq_assistant-bard-web-server_20240625.13_p0",
"_reqid": str(self._reqid),
"rt": "c",
}
# Handle media upload (image, video, or audio) - only one at a time
media_upload_id = None
media_filename = None
media_mime_type = None
media_file = None
media_type = None # Initialize to None
# Determine which media type to use (priority: image > video > audio)
if image:
media_file = image
media_type = "image"
elif video:
media_file = video
media_type = "video"
elif audio:
media_file = audio
media_type = "audio"
if media_file:
try:
# Get filename and mime type if file is a path
if not isinstance(media_file, bytes):
file_path = Path(media_file)
media_filename = file_path.name
import mimetypes
media_mime_type, _ = mimetypes.guess_type(str(file_path))
if not media_mime_type:
# Fallback based on file extension
ext = file_path.suffix.lower()
if media_type == "video" or ext in ['.mp4', '.avi', '.mov', '.webm', '.mkv']:
media_mime_type = "video/mp4"
elif media_type == "audio" or ext in ['.mp3', '.wav', '.ogg', '.flac', '.aac', '.m4a']:
media_mime_type = "audio/mpeg"
else:
media_mime_type = "image/jpeg"
else:
# Default filenames for bytes
if media_type == "video":
media_filename = "video.mp4"
media_mime_type = "video/mp4"
elif media_type == "audio":
media_filename = "audio.mp3"
media_mime_type = "audio/mpeg"
else:
media_filename = "image.jpg"
media_mime_type = "image/jpeg"
# Pass proxy and impersonate settings to upload_file
media_upload_id = await upload_file(media_file, proxy=self.proxies_dict, impersonate=self.impersonate)
console.log(f"{media_type.capitalize()} uploaded successfully. ID: {media_upload_id}")
except Exception as e:
console.log(f"[red]Error uploading {media_type}: {e}[/red]")
return {"content": f"Error uploading {media_type}: {e}", "error": True}
# Always start a new conversation (no history/context)
# Reset conversation IDs to ensure each message is independent
self.conversation_id = ""
self.response_id = ""
self.choice_id = ""
# Prepare message structure
if media_upload_id:
# Structure when media (image/video/audio) is included (based on real Gemini request):
# The structure has many more fields than just the message
# Element 0: [message, 0, null, [[[media_id, 1, null, mime_type], filename, null*7, [0]]], null, null, 0]
# Element 1: ["pt"] - language
# Element 2: ["", "", "", null, null, null, null, null, null, ""] - empty strings array
# Element 3+: Many more fields (tokens, hashes, etc.)
media_data = [
[media_upload_id, 1, None, media_mime_type],
media_filename,
None, None, None, None, None, None,
[0]
]
# Build the complete structure with all required fields
message_struct = [
[message, 0, None, [media_data], None, None, 0], # Element 0: message with media
["pt"], # Element 1: language
["", "", "", None, None, None, None, None, None, ""], # Element 2: empty strings array
# Note: The real request has many more fields (tokens, hashes, etc.) but
# we'll start with the minimal required structure and see if it works
]
else:
# Even without media, start fresh conversation
message_struct = [
[message],
None,
["", "", ""], # Empty conversation IDs for new conversation
]
# Prepare request data
# Use separators to match exact format (no spaces) like Gemini expects
inner_json = json.dumps(message_struct, ensure_ascii=False, separators=(',', ':'))
data = {
"f.req": json.dumps([None, inner_json], ensure_ascii=False, separators=(',', ':')),
"at": self.SNlM0e,
}
try:
# Debug: log the request structure if there's media
if media_upload_id:
console.log(f"[cyan]Debug - Message struct with {media_type}:[/cyan] {json.dumps(message_struct, indent=2)[:500]}")
console.log(f"[cyan]Debug - {media_type.capitalize()} ID:[/cyan] {media_upload_id}")
console.log(f"[cyan]Debug - {media_type.capitalize()} filename:[/cyan] {media_filename}")
console.log(f"[cyan]Debug - {media_type.capitalize()} mime type:[/cyan] {media_mime_type}")
# Send request
# Use longer timeout for videos and audios (they take longer to process)
request_timeout = self.timeout
if media_upload_id:
if media_type == "video":
request_timeout = max(self.timeout, 300) # At least 5 minutes for videos (they can be slow)
elif media_type == "audio":
request_timeout = max(self.timeout, 180) # At least 3 minutes for audios
else:
request_timeout = max(self.timeout, 60) # At least 1 minute for images
resp = await self.session.post(
Endpoint.GENERATE.value,
params=params,
data=data,
timeout=request_timeout,
)
# Check status before raising
if resp.status_code != 200:
# Detectar erros de autenticação e tentar atualizar cookie automaticamente
if resp.status_code in (401, 403):
console.log(f"[yellow]Erro de autenticação detectado (status {resp.status_code}). Tentando atualizar cookie automaticamente...[/yellow]")
try:
# Tentar atualizar o cookie
await self.__rotate_cookies()
# Atualizar SNlM0e com o novo cookie
self.SNlM0e = await self.__get_snlm0e()
# Atualizar o token na requisição
data["at"] = self.SNlM0e
# Retentar a requisição uma vez
console.log("[cyan]Retentando requisição com cookie atualizado...[/cyan]")
resp = await self.session.post(
Endpoint.GENERATE.value,
params=params,
data=data,
timeout=request_timeout,
)
if resp.status_code == 200:
console.log("[green]✓ Requisição bem-sucedida após atualização do cookie![/green]")
else:
console.log(f"[red]Ainda recebendo status {resp.status_code} após atualização do cookie[/red]")
except Exception as rotate_error:
console.log(f"[red]Falha ao atualizar cookie automaticamente: {rotate_error}[/red]")
if resp.status_code != 200:
console.log(f"[red]Non-200 status code: {resp.status_code}[/red]")
console.log(f"[yellow]Response headers:[/yellow] {dict(resp.headers)}")
console.log(f"[yellow]Response text (first 3000 chars):[/yellow]\n{resp.text[:3000]}")
console.log(f"[yellow]Request URL:[/yellow] {resp.url}")
console.log(f"[yellow]Request params:[/yellow] {params}")
# Log the data being sent (sanitized)
debug_data = data.copy()
if 'f.req' in debug_data:
# Show first 500 chars of f.req
console.log(f"[yellow]f.req (first 500 chars):[/yellow] {debug_data['f.req'][:500]}")
resp.raise_for_status()
# Process response
lines = resp.text.splitlines()
if len(lines) < 3:
raise ValueError(f"Unexpected response format. Status: {resp.status_code}. Content: {resp.text[:200]}...")
# Find the line with the response data - process all JSON lines
body = None
body_index = 0
response_json = None # Store the full response_json for model extraction
all_parsed_parts = [] # Store all parsed parts for model search
# Try to parse all JSON lines in the response
for line_index, line in enumerate(lines):
# Skip empty lines and the prefix line
if not line.strip() or line.startswith(")]}'") or line.isdigit():
continue
# Try to parse JSON lines
if line.startswith("["):
try:
parsed_json = json.loads(line)
response_json = parsed_json # Store for later use
# Process all parts in this JSON array
for part_index, part in enumerate(parsed_json):
try:
if isinstance(part, list) and len(part) > 2:
# part[2] might be a string that needs parsing
if isinstance(part[2], str):
main_part = json.loads(part[2])
else:
main_part = part[2]
# Store all parsed parts for model search
if main_part:
all_parsed_parts.append(main_part)
# Check if this part contains conversation data
if main_part and isinstance(main_part, list) and len(main_part) > 4 and main_part[4]:
body = main_part
body_index = part_index
except (IndexError, TypeError, json.JSONDecodeError, AttributeError):
continue
# If we found a body, stop looking
if body:
break
except json.JSONDecodeError:
continue
if not body:
return {"content": "Failed to parse response body. No valid data found.", "error": True}
# Extract data from the response
try:
# Extract main content
content = ""
if len(body) > 4 and len(body[4]) > 0 and len(body[4][0]) > 1:
content = body[4][0][1][0] if len(body[4][0][1]) > 0 else ""
# Extract conversation metadata
conversation_id = body[1][0] if len(body) > 1 and len(body[1]) > 0 else self.conversation_id
response_id = body[1][1] if len(body) > 1 and len(body[1]) > 1 else self.response_id
# Extract additional data
factualityQueries = body[3] if len(body) > 3 else None
textQuery = body[2][0] if len(body) > 2 and body[2] else ""
# Extract choices
choices = []
if len(body) > 4:
for candidate in body[4]:
if len(candidate) > 1 and isinstance(candidate[1], list) and len(candidate[1]) > 0:
choices.append({"id": candidate[0], "content": candidate[1][0]})
choice_id = choices[0]["id"] if choices else self.choice_id
# Extract images - multiple possible formats
images = []
# Format 1: Regular web images
if len(body) > 4 and len(body[4]) > 0 and len(body[4][0]) > 4 and body[4][0][4]:
for img_data in body[4][0][4]:
try:
img_url = img_data[0][0][0]
img_alt = img_data[2] if len(img_data) > 2 else ""
img_title = img_data[1] if len(img_data) > 1 else "[Image]"
images.append({"url": img_url, "alt": img_alt, "title": img_title})
except (IndexError, TypeError):
console.log("[yellow]Warning: Could not parse image data structure (format 1).[/yellow]")
continue
# Format 2: Generated images in standard location
generated_images = []
if len(body) > 4 and len(body[4]) > 0 and len(body[4][0]) > 12 and body[4][0][12]:
try:
# Path 1: Check for images in [12][7][0]
if body[4][0][12][7] and body[4][0][12][7][0]:
# This is the standard path for generated images
for img_index, img_data in enumerate(body[4][0][12][7][0]):
try:
img_url = img_data[0][3][3]
img_title = f"[Generated Image {img_index+1}]"
img_alt = img_data[3][5][0] if len(img_data[3]) > 5 and len(img_data[3][5]) > 0 else ""
generated_images.append({"url": img_url, "alt": img_alt, "title": img_title})
except (IndexError, TypeError):
continue
# If we found images, but they might be in a different part of the response
if not generated_images:
# Look for image generation data in other response parts
for part_index, part in enumerate(response_json):
if part_index <= body_index:
continue
try:
img_part = json.loads(part[2])
if img_part[4][0][12][7][0]:
for img_index, img_data in enumerate(img_part[4][0][12][7][0]):
try:
img_url = img_data[0][3][3]
img_title = f"[Generated Image {img_index+1}]"
img_alt = img_data[3][5][0] if len(img_data[3]) > 5 and len(img_data[3][5]) > 0 else ""
generated_images.append({"url": img_url, "alt": img_alt, "title": img_title})
except (IndexError, TypeError):
continue
break
except (IndexError, TypeError, json.JSONDecodeError):
continue
except (IndexError, TypeError):
pass
# Format 3: Alternative location for generated images
if len(generated_images) == 0 and len(body) > 4 and len(body[4]) > 0:
try:
# Try to find images in candidate[4] structure
candidate = body[4][0]
if len(candidate) > 22 and candidate[22]:
# Look for URLs in the candidate[22] field
import re
content = candidate[22][0] if isinstance(candidate[22], list) and len(candidate[22]) > 0 else str(candidate[22])
urls = re.findall(r'https?://[^\s]+', content)
for i, url in enumerate(urls):
# Clean up URL if it ends with punctuation
if url[-1] in ['.', ',', ')', ']', '}', '"', "'"]:
url = url[:-1]
generated_images.append({
"url": url,
"title": f"[Generated Image {i+1}]",
"alt": ""
})
except (IndexError, TypeError) as e:
console.log(f"[yellow]Warning: Could not parse alternative image structure: {e}[/yellow]")
# Format 4: Look for image URLs in the text content
if len(images) == 0 and len(generated_images) == 0 and content:
try:
import re
# Look for image URLs in the content - try multiple patterns
# Pattern 1: Standard image URLs
urls = re.findall(r'(https?://[^\s]+\.(jpg|jpeg|png|gif|webp))', content.lower())
# Pattern 2: Google image URLs (which might not have extensions)
google_urls = re.findall(r'(https?://lh\d+\.googleusercontent\.com/[^\s]+)', content)
# Pattern 3: General URLs that might be images
general_urls = re.findall(r'(https?://[^\s]+)', content)
# Combine all found URLs
all_urls = []
if urls:
all_urls.extend([url_tuple[0] for url_tuple in urls])
if google_urls:
all_urls.extend(google_urls)
# Add general URLs only if we didn't find any specific image URLs
if not all_urls and general_urls:
all_urls = general_urls
# Process all found URLs
if all_urls:
for i, url in enumerate(all_urls):
# Clean up URL if it ends with punctuation
if url[-1] in ['.', ',', ')', ']', '}', '"', "'"]:
url = url[:-1]
images.append({
"url": url,
"title": f"[Image in Content {i+1}]",
"alt": ""
})
console.log(f"[green]Found {len(all_urls)} potential image URLs in content.[/green]")
except Exception as e:
console.log(f"[yellow]Warning: Error extracting URLs from content: {e}[/yellow]")
# Combine all images
all_images = images + generated_images
# Extract model name from response
# Model appears near the end of the structure, look for strings like "Fast", "3 Pro", "Thinking"
model_name = self.model.model_name # Default fallback
try:
def find_model_in_structure(obj, depth=0, max_depth=15):
"""Recursively search for model name"""
if depth > max_depth:
return None
if isinstance(obj, list):
# Check elements from end to beginning (model is usually near the end)
for i in range(len(obj) - 1, -1, -1):
item = obj[i]
if isinstance(item, str):
# Common model names: Fast, 3 Pro, Thinking, Pro, Exp
# Check for known model patterns
if item in ["Fast", "3 Pro", "Thinking", "Pro", "Exp", "Raciocinio"]:
# Verify it's followed by a boolean (typical pattern: model name, then bool)
if i + 1 < len(obj) and isinstance(obj[i + 1], bool):
return item
# Also check for patterns like "Pro" in the string
elif any(pattern in item for pattern in ["Pro", "Fast", "Thinking"]):
# More careful check - should be a short string
if len(item) < 20 and (i + 1 < len(obj) and isinstance(obj[i + 1], bool)):
return item
elif isinstance(item, (list, dict)):
result = find_model_in_structure(item, depth + 1, max_depth)
if result:
return result
elif isinstance(obj, dict):
for value in obj.values():
result = find_model_in_structure(value, depth + 1, max_depth)
if result:
return result
return None
# Search in the body structure first
found_model = find_model_in_structure(body)
# If not found in body, search in all parsed parts
if not found_model:
for parsed_part in all_parsed_parts:
found_model = find_model_in_structure(parsed_part)
if found_model:
break
# If still not found, search in the full response_json structure
if not found_model and response_json:
found_model = find_model_in_structure(response_json)
if found_model:
model_name = found_model
except Exception as e:
console.log(f"[yellow]Warning: Could not extract model name from response: {e}[/yellow]")
# Use default model name
# Prepare results
results = {
"content": content,
"conversation_id": conversation_id,
"response_id": response_id,
"factualityQueries": factualityQueries,
"textQuery": textQuery,
"choices": choices,
"images": all_images,
"model": model_name, # Use extracted model name from response
"error": False,
}
# Don't update state - we want each message to be independent (no conversation history)
# Reset IDs to ensure next message is a new conversation
self.conversation_id = ""
self.response_id = ""
self.choice_id = ""
self._reqid += random.randint(1000, 9000)
return results
except (IndexError, TypeError) as e:
console.log(f"[red]Error extracting data from response: {e}[/red]")
return {"content": f"Error extracting data from response: {e}", "error": True}
except json.JSONDecodeError as e:
console.log(f"[red]Error parsing JSON response: {e}[/red]")
return {"content": f"Error parsing JSON response: {e}. Response: {resp.text[:200]}...", "error": True}
except Timeout as e:
console.log(f"[red]Request timed out: {e}[/red]")
return {"content": f"Request timed out: {e}", "error": True}
except (RequestException, CurlError) as e:
error_msg = f"Network error: {e}"
# Try to get more details if it's an HTTPError wrapped
if hasattr(e, 'response') and e.response is not None:
try:
error_msg += f"\nStatus: {e.response.status_code}"
error_msg += f"\nResponse: {e.response.text[:1000]}"
console.log(f"[red]{error_msg}[/red]")
console.log(f"[yellow]Full response text (first 2000 chars):[/yellow]\n{e.response.text[:2000]}")
except:
pass
console.log(f"[red]{error_msg}[/red]")
return {"content": error_msg, "error": True}
except HTTPError as e:
# Detectar erros de autenticação e tentar atualizar cookie automaticamente
if e.response.status_code in (401, 403):
console.log(f"[yellow]Erro de autenticação detectado (status {e.response.status_code}). Tentando atualizar cookie automaticamente...[/yellow]")
try:
# Tentar atualizar o cookie
await self.__rotate_cookies()
# Atualizar SNlM0e com o novo cookie
self.SNlM0e = await self.__get_snlm0e()
# Retentar a requisição original
console.log("[cyan]Retentando requisição com cookie atualizado...[/cyan]")
return await self.ask(message, image=image, video=video, audio=audio)
except Exception as rotate_error:
console.log(f"[red]Falha ao atualizar cookie automaticamente: {rotate_error}[/red]")
error_details = f"HTTP error {e.response.status_code}: {e}"
try:
error_text = e.response.text[:1000] if hasattr(e.response, 'text') else str(e.response)
error_details += f"\nResponse: {error_text}"
console.log(f"[red]{error_details}[/red]")
# Log full response for debugging
if hasattr(e.response, 'text'):
console.log(f"[yellow]Full response text (first 2000 chars):[/yellow]\n{e.response.text[:2000]}")
except:
pass
return {"content": error_details, "error": True}
except Exception as e:
console.log(f"[red]An unexpected error occurred during ask: {e}[/red]", style="bold red")
return {"content": f"An unexpected error occurred: {e}", "error": True}
#########################################
# Imports for refactored classes
#########################################