# -*- coding: utf-8 -*- ######################################### # Code Modified to use curl_cffi ######################################### import asyncio import json import os import random import re import string from pathlib import Path from datetime import datetime from typing import Dict, List, Tuple, Union, Optional from gemini_client.enums import Endpoint, Headers, Model # Use curl_cffi for requests from curl_cffi import CurlError from curl_cffi.requests import AsyncSession # Import common request exceptions (curl_cffi often wraps these) from requests.exceptions import RequestException, Timeout, HTTPError # For image models using validation. Adjust based on organization internal pydantic. # Updated import for Pydantic V2 from pydantic import BaseModel, field_validator # Rich is retained for logging within image methods. from rich.console import Console from rich.markdown import Markdown console = Console() ######################################### # New Enums and functions for endpoints, # headers, models, file upload and images. ######################################### ######################################### # Cookie loading and Chatbot classes ######################################### from gemini_client.utils import upload_file, load_cookies, save_cookies class Chatbot: """ Synchronous wrapper for the AsyncChatbot class. This class provides a synchronous interface to interact with Google Gemini, handling authentication, conversation management, and message sending. Attributes: loop (asyncio.AbstractEventLoop): Event loop for running async tasks. secure_1psid (str): Authentication cookie. secure_1psidts (str): Authentication cookie. async_chatbot (AsyncChatbot): Underlying asynchronous chatbot instance. """ def __init__( self, cookie_path: str, proxy: Optional[Union[str, Dict[str, str]]] = None, # Allow string or dict proxy timeout: int = 20, model: Model = Model.UNSPECIFIED, impersonate: str = "chrome110" # Added impersonate ): # Use asyncio.run() for cleaner async execution in sync context # Handle potential RuntimeError if an event loop is already running try: self.loop = asyncio.get_running_loop() except RuntimeError: self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self.secure_1psid, self.secure_1psidts, additional_cookies = load_cookies(cookie_path) self.async_chatbot = self.loop.run_until_complete( AsyncChatbot.create(self.secure_1psid, self.secure_1psidts, proxy, timeout, model, impersonate, additional_cookies, cookie_path) ) def save_conversation(self, file_path: str, conversation_name: str): return self.loop.run_until_complete( self.async_chatbot.save_conversation(file_path, conversation_name) ) def load_conversations(self, file_path: str) -> List[Dict]: return self.loop.run_until_complete( self.async_chatbot.load_conversations(file_path) ) def load_conversation(self, file_path: str, conversation_name: str) -> bool: return self.loop.run_until_complete( self.async_chatbot.load_conversation(file_path, conversation_name) ) def refresh_cookie(self) -> bool: """ Atualiza proativamente o cookie __Secure-1PSIDTS e salva automaticamente. Útil para atualizar o cookie antes que expire. Returns: bool: True se o cookie foi atualizado com sucesso, False caso contrário. """ return self.loop.run_until_complete( self.async_chatbot.refresh_cookie() ) def ask( self, message: str, image: Optional[Union[bytes, str, Path]] = None, video: Optional[Union[bytes, str, Path]] = None, audio: Optional[Union[bytes, str, Path]] = None ) -> dict: # Pass media to async ask method return self.loop.run_until_complete(self.async_chatbot.ask(message, image=image, video=video, audio=audio)) class AsyncChatbot: """ Asynchronous chatbot client for interacting with Google Gemini using curl_cffi. This class manages authentication, session state, conversation history, and sending/receiving messages (including images) asynchronously. Attributes: headers (dict): HTTP headers for requests. _reqid (int): Request identifier for Gemini API. SNlM0e (str): Session token required for API requests. conversation_id (str): Current conversation ID. response_id (str): Current response ID. choice_id (str): Current choice ID. proxy (str | dict | None): Proxy configuration. proxies_dict (dict | None): Proxy dictionary for curl_cffi. secure_1psid (str): Authentication cookie. secure_1psidts (str): Authentication cookie. session (AsyncSession): curl_cffi session for HTTP requests. timeout (int): Request timeout in seconds. model (Model): Selected Gemini model. impersonate (str): Browser profile for curl_cffi to impersonate. """ __slots__ = [ "headers", "_reqid", "SNlM0e", "conversation_id", "response_id", "choice_id", "proxy", # Store the original proxy config "proxies_dict", # Store the curl_cffi-compatible proxy dict "secure_1psidts", "secure_1psid", "session", "timeout", "model", "impersonate", # Store impersonate setting "cookie_path", # Path to cookie file for auto-saving "additional_cookies", # Store additional cookies ] def __init__( self, secure_1psid: str, secure_1psidts: str, proxy: Optional[Union[str, Dict[str, str]]] = None, # Allow string or dict proxy timeout: int = 20, model: Model = Model.UNSPECIFIED, impersonate: str = "chrome110", # Added impersonate additional_cookies: Optional[Dict[str, str]] = None, # Additional cookies like COMPASS cookie_path: Optional[str] = None, # Path to cookie file for auto-saving ): headers = Headers.GEMINI.value.copy() if model != Model.UNSPECIFIED: headers.update(model.model_header) self._reqid = int("".join(random.choices(string.digits, k=7))) # Increased length for less collision chance self.proxy = proxy # Store original proxy setting self.impersonate = impersonate # Store impersonate setting # Prepare proxy dictionary for curl_cffi self.proxies_dict = None if isinstance(proxy, str): self.proxies_dict = {"http": proxy, "https": proxy} # curl_cffi uses http/https keys elif isinstance(proxy, dict): self.proxies_dict = proxy # Assume it's already in the correct format self.conversation_id = "" self.response_id = "" self.choice_id = "" self.secure_1psid = secure_1psid self.secure_1psidts = secure_1psidts # Prepare cookies dict with required cookies and any additional ones cookies_dict = { "__Secure-1PSID": secure_1psid, "__Secure-1PSIDTS": secure_1psidts } if additional_cookies: cookies_dict.update(additional_cookies) # Initialize curl_cffi AsyncSession self.session = AsyncSession( headers=headers, cookies=cookies_dict, proxies=self.proxies_dict, timeout=timeout, impersonate=self.impersonate # verify and http2 are handled automatically by curl_cffi ) # No need to set proxies/headers/cookies again, done in constructor self.timeout = timeout # Store timeout for potential direct use in requests self.model = model self.SNlM0e = None # Initialize SNlM0e self.cookie_path = cookie_path # Store cookie path for auto-saving self.additional_cookies = additional_cookies or {} # Store additional cookies @classmethod async def create( cls, secure_1psid: str, secure_1psidts: str, proxy: Optional[Union[str, Dict[str, str]]] = None, # Allow string or dict proxy timeout: int = 20, model: Model = Model.UNSPECIFIED, impersonate: str = "chrome110", # Added impersonate additional_cookies: Optional[Dict[str, str]] = None, # Additional cookies like COMPASS cookie_path: Optional[str] = None, # Path to cookie file for auto-saving ) -> "AsyncChatbot": """ Factory method to create and initialize an AsyncChatbot instance. Fetches the necessary SNlM0e value asynchronously. """ instance = cls(secure_1psid, secure_1psidts, proxy, timeout, model, impersonate, additional_cookies, cookie_path) try: instance.SNlM0e = await instance.__get_snlm0e() except Exception as e: # Log the error and re-raise or handle appropriately console.log(f"[red]Error during AsyncChatbot initialization (__get_snlm0e): {e}[/red]", style="bold red") # Optionally close the session if initialization fails critically await instance.session.close() # Use close() for AsyncSession raise # Re-raise the exception to signal failure return instance async def refresh_cookie(self) -> bool: """ Atualiza proativamente o cookie __Secure-1PSIDTS e salva automaticamente. Útil para atualizar o cookie antes que expire. Returns: bool: True se o cookie foi atualizado com sucesso, False caso contrário. """ try: await self.__rotate_cookies() # Atualizar SNlM0e após atualizar o cookie self.SNlM0e = await self.__get_snlm0e() return True except Exception as e: console.log(f"[red]Falha ao atualizar cookie proativamente: {e}[/red]") return False async def save_conversation(self, file_path: str, conversation_name: str) -> None: # Logic remains the same conversations = await self.load_conversations(file_path) conversation_data = { "conversation_name": conversation_name, "_reqid": self._reqid, "conversation_id": self.conversation_id, "response_id": self.response_id, "choice_id": self.choice_id, "SNlM0e": self.SNlM0e, "model_name": self.model.model_name, # Save the model used "timestamp": datetime.now().isoformat(), # Add timestamp } found = False for i, conv in enumerate(conversations): if conv.get("conversation_name") == conversation_name: conversations[i] = conversation_data # Update existing found = True break if not found: conversations.append(conversation_data) # Add new try: # Ensure directory exists Path(file_path).parent.mkdir(parents=True, exist_ok=True) with open(file_path, "w", encoding="utf-8") as f: json.dump(conversations, f, indent=4, ensure_ascii=False) except IOError as e: console.log(f"[red]Error saving conversation to {file_path}: {e}[/red]") raise async def load_conversations(self, file_path: str) -> List[Dict]: # Logic remains the same if not os.path.isfile(file_path): return [] try: with open(file_path, 'r', encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, IOError) as e: console.log(f"[red]Error loading conversations from {file_path}: {e}[/red]") return [] async def load_conversation(self, file_path: str, conversation_name: str) -> bool: # Logic remains the same, but update headers on the session conversations = await self.load_conversations(file_path) for conversation in conversations: if conversation.get("conversation_name") == conversation_name: try: self._reqid = conversation["_reqid"] self.conversation_id = conversation["conversation_id"] self.response_id = conversation["response_id"] self.choice_id = conversation["choice_id"] self.SNlM0e = conversation["SNlM0e"] if "model_name" in conversation: try: self.model = Model.from_name(conversation["model_name"]) # Update headers in the session if model changed self.session.headers.update(self.model.model_header) except ValueError as e: console.log(f"[yellow]Warning: Model '{conversation['model_name']}' from saved conversation not found. Using current model '{self.model.model_name}'. Error: {e}[/yellow]") console.log(f"Loaded conversation '{conversation_name}'") return True except KeyError as e: console.log(f"[red]Error loading conversation '{conversation_name}': Missing key {e}[/red]") return False console.log(f"[yellow]Conversation '{conversation_name}' not found in {file_path}[/yellow]") return False async def __get_snlm0e(self): """Fetches the SNlM0e value required for API requests using curl_cffi.""" if not self.secure_1psid: raise ValueError("__Secure-1PSID cookie is required.") try: # Use the session's get method resp = await self.session.get( Endpoint.INIT.value, timeout=self.timeout # Timeout is already set in session, but can override # follow_redirects is handled automatically by curl_cffi ) resp.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) # Check for authentication issues if "Sign in to continue" in resp.text or "accounts.google.com" in str(resp.url): raise PermissionError("Authentication failed. Cookies might be invalid or expired. Please update them.") # Regex to find the SNlM0e value snlm0e_match = re.search(r'''["']SNlM0e["']\s*:\s*["'](.*?)["']''', resp.text) if not snlm0e_match: # Se não encontrou SNlM0e, pode ser cookie expirado - tentar atualizar console.log("[yellow]SNlM0e não encontrado na resposta. Tentando atualizar cookie...[/yellow]") try: # Tentar atualizar cookie new_cookie = await self.__rotate_cookies() # Se obteve novo cookie, atualizar na sessão e tentar novamente if new_cookie and new_cookie != self.secure_1psidts: # Retentar buscar SNlM0e após atualizar cookie resp = await self.session.get( Endpoint.INIT.value, timeout=self.timeout ) resp.raise_for_status() if "Sign in to continue" in resp.text or "accounts.google.com" in str(resp.url): raise PermissionError("Authentication failed. Cookies might be invalid or expired. Please update them.") snlm0e_match = re.search(r'''["']SNlM0e["']\s*:\s*["'](.*?)["']''', resp.text) if snlm0e_match: console.log("[green]✓ SNlM0e obtido com sucesso após atualização do cookie![/green]") return snlm0e_match.group(1) # Se não conseguiu atualizar ou ainda não encontrou SNlM0e error_message = "SNlM0e value not found in response." if resp.status_code == 429: error_message += " Rate limit likely exceeded." else: error_message += ( f" Response status: {resp.status_code}. " f"Por favor, acesse https://gemini.google.com/app e copie os novos cookies " f"__Secure-1PSID e __Secure-1PSIDTS manualmente." ) raise ValueError(error_message) except ValueError as rotate_error: # Re-raise ValueError (já tem mensagem clara sobre atualização manual) raise except Exception as rotate_error: console.log(f"[red]Falha ao atualizar cookie: {rotate_error}[/red]") error_message = "SNlM0e value not found in response." if resp.status_code == 429: error_message += " Rate limit likely exceeded." else: error_message += ( f" Response status: {resp.status_code}. " f"Falha ao atualizar cookie automaticamente. " f"Por favor, acesse https://gemini.google.com/app e copie os novos cookies " f"__Secure-1PSID e __Secure-1PSIDTS manualmente." ) raise ValueError(error_message) # Try to refresh PSIDTS if needed if not self.secure_1psidts and "PSIDTS" not in self.session.cookies: try: # Attempt to rotate cookies to get a fresh PSIDTS await self.__rotate_cookies() except Exception as e: console.log(f"[yellow]Warning: Could not refresh PSIDTS cookie: {e}[/yellow]") # Continue anyway as some accounts don't need PSIDTS return snlm0e_match.group(1) except Timeout as e: # Catch requests.exceptions.Timeout raise TimeoutError(f"Request timed out while fetching SNlM0e: {e}") from e except (RequestException, CurlError) as e: # Catch general request errors and Curl specific errors raise ConnectionError(f"Network error while fetching SNlM0e: {e}") from e except HTTPError as e: # Catch requests.exceptions.HTTPError # Tentar atualizar cookie automaticamente se for erro de autenticação if e.response.status_code in (401, 403): console.log(f"[yellow]Erro de autenticação ao buscar SNlM0e (status {e.response.status_code}). Tentando atualizar cookie...[/yellow]") try: await self.__rotate_cookies() # Retentar buscar SNlM0e após atualizar cookie resp = await self.session.get( Endpoint.INIT.value, timeout=self.timeout ) resp.raise_for_status() if "Sign in to continue" in resp.text or "accounts.google.com" in str(resp.url): raise PermissionError("Authentication failed. Cookies might be invalid or expired. Please update them.") snlm0e_match = re.search(r'''["']SNlM0e["']\s*:\s*["'](.*?)["']''', resp.text) if snlm0e_match: console.log("[green]✓ SNlM0e obtido com sucesso após atualização do cookie![/green]") return snlm0e_match.group(1) else: raise ValueError("SNlM0e value not found in response after cookie update.") except Exception as rotate_error: console.log(f"[red]Falha ao atualizar cookie: {rotate_error}[/red]") raise PermissionError(f"Authentication failed (status {e.response.status_code}). Cookie update failed. {e}") from e else: raise Exception(f"HTTP error {e.response.status_code} while fetching SNlM0e: {e}") from e async def __rotate_cookies(self): """ Rotates the __Secure-1PSIDTS cookie and saves it automatically. Returns the new cookie value if successful. Usa todos os cookies disponíveis para autenticação. """ try: console.log("[yellow]Atualizando cookie __Secure-1PSIDTS automaticamente...[/yellow]") # Método mais confiável: Acessar a página do Gemini para obter novos cookies # Isso funciona porque o Google atualiza o cookie automaticamente quando você acessa a página console.log("[cyan]Acessando página do Gemini para obter novo cookie...[/cyan]") init_response = await self.session.get( Endpoint.INIT.value, timeout=self.timeout ) init_response.raise_for_status() # Verificar se obteve novo cookie na resposta new_1psidts = init_response.cookies.get("__Secure-1PSIDTS") if new_1psidts and new_1psidts != self.secure_1psidts: self.secure_1psidts = new_1psidts self.session.cookies.set("__Secure-1PSIDTS", new_1psidts) if self.cookie_path: try: save_cookies( self.cookie_path, self.secure_1psid, self.secure_1psidts, self.additional_cookies ) console.log("[green]✓ Cookie atualizado e salvo automaticamente via acesso à página![/green]") except Exception as save_error: console.log(f"[yellow]Aviso: Cookie atualizado mas não foi possível salvar: {save_error}[/yellow]") return new_1psidts # Se não obteve novo cookie, tentar endpoint de rotação como fallback console.log("[yellow]Não obteve novo cookie via página, tentando endpoint de rotação...[/yellow]") try: response = await self.session.post( Endpoint.ROTATE_COOKIES.value, headers={ "Content-Type": "application/json", "Origin": "https://accounts.google.com", "Referer": "https://accounts.google.com/", "X-Requested-With": "XMLHttpRequest" }, json=[0, "-0000000000000000000"], timeout=self.timeout ) if response.status_code == 200: new_1psidts = response.cookies.get("__Secure-1PSIDTS") if new_1psidts: self.secure_1psidts = new_1psidts self.session.cookies.set("__Secure-1PSIDTS", new_1psidts) if self.cookie_path: try: save_cookies( self.cookie_path, self.secure_1psid, self.secure_1psidts, self.additional_cookies ) console.log("[green]✓ Cookie atualizado via endpoint de rotação![/green]") except Exception as save_error: console.log(f"[yellow]Aviso: Cookie atualizado mas não foi possível salvar: {save_error}[/yellow]") return new_1psidts # Se endpoint retornou 404, pode ser que não exista mais ou mudeu if response.status_code == 404: console.log("[yellow]Endpoint de rotação retornou 404 - pode ter mudado ou não estar disponível[/yellow]") except Exception as rotate_error: console.log(f"[yellow]Endpoint de rotação falhou: {rotate_error}[/yellow]") # Se não conseguiu atualizar, retornar o cookie atual (pode ainda ser válido) console.log("[yellow]Não foi possível obter novo cookie - usando cookie atual[/yellow]") return self.secure_1psidts except Exception as e: console.log(f"[red]Falha ao atualizar cookie: {e}[/red]") # Se falhou completamente, retornar o cookie atual em vez de falhar # Isso permite que o sistema continue funcionando mesmo se não conseguir atualizar console.log("[yellow]Retornando cookie atual - sistema continuará funcionando[/yellow]") return self.secure_1psidts async def ask( self, message: str, image: Optional[Union[bytes, str, Path]] = None, video: Optional[Union[bytes, str, Path]] = None, audio: Optional[Union[bytes, str, Path]] = None ) -> dict: """ Sends a message to Google Gemini and returns the response using curl_cffi. Parameters: message: str The message to send. image: Optional[Union[bytes, str, Path]] Optional image data (bytes) or path to an image file to include. video: Optional[Union[bytes, str, Path]] Optional video data (bytes) or path to a video file to include. audio: Optional[Union[bytes, str, Path]] Optional audio data (bytes) or path to an audio file to include. Returns: dict: A dictionary containing the response content and metadata. """ if self.SNlM0e is None: raise RuntimeError("AsyncChatbot not properly initialized. Call AsyncChatbot.create()") params = { "bl": "boq_assistant-bard-web-server_20240625.13_p0", "_reqid": str(self._reqid), "rt": "c", } # Handle media upload (image, video, or audio) - only one at a time media_upload_id = None media_filename = None media_mime_type = None media_file = None media_type = None # Initialize to None # Determine which media type to use (priority: image > video > audio) if image: media_file = image media_type = "image" elif video: media_file = video media_type = "video" elif audio: media_file = audio media_type = "audio" if media_file: try: # Get filename and mime type if file is a path if not isinstance(media_file, bytes): file_path = Path(media_file) media_filename = file_path.name import mimetypes media_mime_type, _ = mimetypes.guess_type(str(file_path)) if not media_mime_type: # Fallback based on file extension ext = file_path.suffix.lower() if media_type == "video" or ext in ['.mp4', '.avi', '.mov', '.webm', '.mkv']: media_mime_type = "video/mp4" elif media_type == "audio" or ext in ['.mp3', '.wav', '.ogg', '.flac', '.aac', '.m4a']: media_mime_type = "audio/mpeg" else: media_mime_type = "image/jpeg" else: # Default filenames for bytes if media_type == "video": media_filename = "video.mp4" media_mime_type = "video/mp4" elif media_type == "audio": media_filename = "audio.mp3" media_mime_type = "audio/mpeg" else: media_filename = "image.jpg" media_mime_type = "image/jpeg" # Pass proxy and impersonate settings to upload_file media_upload_id = await upload_file(media_file, proxy=self.proxies_dict, impersonate=self.impersonate) console.log(f"{media_type.capitalize()} uploaded successfully. ID: {media_upload_id}") except Exception as e: console.log(f"[red]Error uploading {media_type}: {e}[/red]") return {"content": f"Error uploading {media_type}: {e}", "error": True} # Always start a new conversation (no history/context) # Reset conversation IDs to ensure each message is independent self.conversation_id = "" self.response_id = "" self.choice_id = "" # Prepare message structure if media_upload_id: # Structure when media (image/video/audio) is included (based on real Gemini request): # The structure has many more fields than just the message # Element 0: [message, 0, null, [[[media_id, 1, null, mime_type], filename, null*7, [0]]], null, null, 0] # Element 1: ["pt"] - language # Element 2: ["", "", "", null, null, null, null, null, null, ""] - empty strings array # Element 3+: Many more fields (tokens, hashes, etc.) media_data = [ [media_upload_id, 1, None, media_mime_type], media_filename, None, None, None, None, None, None, [0] ] # Build the complete structure with all required fields message_struct = [ [message, 0, None, [media_data], None, None, 0], # Element 0: message with media ["pt"], # Element 1: language ["", "", "", None, None, None, None, None, None, ""], # Element 2: empty strings array # Note: The real request has many more fields (tokens, hashes, etc.) but # we'll start with the minimal required structure and see if it works ] else: # Even without media, start fresh conversation message_struct = [ [message], None, ["", "", ""], # Empty conversation IDs for new conversation ] # Prepare request data # Use separators to match exact format (no spaces) like Gemini expects inner_json = json.dumps(message_struct, ensure_ascii=False, separators=(',', ':')) data = { "f.req": json.dumps([None, inner_json], ensure_ascii=False, separators=(',', ':')), "at": self.SNlM0e, } try: # Debug: log the request structure if there's media if media_upload_id: console.log(f"[cyan]Debug - Message struct with {media_type}:[/cyan] {json.dumps(message_struct, indent=2)[:500]}") console.log(f"[cyan]Debug - {media_type.capitalize()} ID:[/cyan] {media_upload_id}") console.log(f"[cyan]Debug - {media_type.capitalize()} filename:[/cyan] {media_filename}") console.log(f"[cyan]Debug - {media_type.capitalize()} mime type:[/cyan] {media_mime_type}") # Send request # Use longer timeout for videos and audios (they take longer to process) request_timeout = self.timeout if media_upload_id: if media_type == "video": request_timeout = max(self.timeout, 300) # At least 5 minutes for videos (they can be slow) elif media_type == "audio": request_timeout = max(self.timeout, 180) # At least 3 minutes for audios else: request_timeout = max(self.timeout, 60) # At least 1 minute for images resp = await self.session.post( Endpoint.GENERATE.value, params=params, data=data, timeout=request_timeout, ) # Check status before raising if resp.status_code != 200: # Detectar erros de autenticação e tentar atualizar cookie automaticamente if resp.status_code in (401, 403): console.log(f"[yellow]Erro de autenticação detectado (status {resp.status_code}). Tentando atualizar cookie automaticamente...[/yellow]") try: # Tentar atualizar o cookie await self.__rotate_cookies() # Atualizar SNlM0e com o novo cookie self.SNlM0e = await self.__get_snlm0e() # Atualizar o token na requisição data["at"] = self.SNlM0e # Retentar a requisição uma vez console.log("[cyan]Retentando requisição com cookie atualizado...[/cyan]") resp = await self.session.post( Endpoint.GENERATE.value, params=params, data=data, timeout=request_timeout, ) if resp.status_code == 200: console.log("[green]✓ Requisição bem-sucedida após atualização do cookie![/green]") else: console.log(f"[red]Ainda recebendo status {resp.status_code} após atualização do cookie[/red]") except Exception as rotate_error: console.log(f"[red]Falha ao atualizar cookie automaticamente: {rotate_error}[/red]") if resp.status_code != 200: console.log(f"[red]Non-200 status code: {resp.status_code}[/red]") console.log(f"[yellow]Response headers:[/yellow] {dict(resp.headers)}") console.log(f"[yellow]Response text (first 3000 chars):[/yellow]\n{resp.text[:3000]}") console.log(f"[yellow]Request URL:[/yellow] {resp.url}") console.log(f"[yellow]Request params:[/yellow] {params}") # Log the data being sent (sanitized) debug_data = data.copy() if 'f.req' in debug_data: # Show first 500 chars of f.req console.log(f"[yellow]f.req (first 500 chars):[/yellow] {debug_data['f.req'][:500]}") resp.raise_for_status() # Process response lines = resp.text.splitlines() if len(lines) < 3: raise ValueError(f"Unexpected response format. Status: {resp.status_code}. Content: {resp.text[:200]}...") # Find the line with the response data - process all JSON lines body = None body_index = 0 response_json = None # Store the full response_json for model extraction all_parsed_parts = [] # Store all parsed parts for model search # Try to parse all JSON lines in the response for line_index, line in enumerate(lines): # Skip empty lines and the prefix line if not line.strip() or line.startswith(")]}'") or line.isdigit(): continue # Try to parse JSON lines if line.startswith("["): try: parsed_json = json.loads(line) response_json = parsed_json # Store for later use # Process all parts in this JSON array for part_index, part in enumerate(parsed_json): try: if isinstance(part, list) and len(part) > 2: # part[2] might be a string that needs parsing if isinstance(part[2], str): main_part = json.loads(part[2]) else: main_part = part[2] # Store all parsed parts for model search if main_part: all_parsed_parts.append(main_part) # Check if this part contains conversation data if main_part and isinstance(main_part, list) and len(main_part) > 4 and main_part[4]: body = main_part body_index = part_index except (IndexError, TypeError, json.JSONDecodeError, AttributeError): continue # If we found a body, stop looking if body: break except json.JSONDecodeError: continue if not body: return {"content": "Failed to parse response body. No valid data found.", "error": True} # Extract data from the response try: # Extract main content content = "" if len(body) > 4 and len(body[4]) > 0 and len(body[4][0]) > 1: content = body[4][0][1][0] if len(body[4][0][1]) > 0 else "" # Extract conversation metadata conversation_id = body[1][0] if len(body) > 1 and len(body[1]) > 0 else self.conversation_id response_id = body[1][1] if len(body) > 1 and len(body[1]) > 1 else self.response_id # Extract additional data factualityQueries = body[3] if len(body) > 3 else None textQuery = body[2][0] if len(body) > 2 and body[2] else "" # Extract choices choices = [] if len(body) > 4: for candidate in body[4]: if len(candidate) > 1 and isinstance(candidate[1], list) and len(candidate[1]) > 0: choices.append({"id": candidate[0], "content": candidate[1][0]}) choice_id = choices[0]["id"] if choices else self.choice_id # Extract images - multiple possible formats images = [] # Format 1: Regular web images if len(body) > 4 and len(body[4]) > 0 and len(body[4][0]) > 4 and body[4][0][4]: for img_data in body[4][0][4]: try: img_url = img_data[0][0][0] img_alt = img_data[2] if len(img_data) > 2 else "" img_title = img_data[1] if len(img_data) > 1 else "[Image]" images.append({"url": img_url, "alt": img_alt, "title": img_title}) except (IndexError, TypeError): console.log("[yellow]Warning: Could not parse image data structure (format 1).[/yellow]") continue # Format 2: Generated images in standard location generated_images = [] if len(body) > 4 and len(body[4]) > 0 and len(body[4][0]) > 12 and body[4][0][12]: try: # Path 1: Check for images in [12][7][0] if body[4][0][12][7] and body[4][0][12][7][0]: # This is the standard path for generated images for img_index, img_data in enumerate(body[4][0][12][7][0]): try: img_url = img_data[0][3][3] img_title = f"[Generated Image {img_index+1}]" img_alt = img_data[3][5][0] if len(img_data[3]) > 5 and len(img_data[3][5]) > 0 else "" generated_images.append({"url": img_url, "alt": img_alt, "title": img_title}) except (IndexError, TypeError): continue # If we found images, but they might be in a different part of the response if not generated_images: # Look for image generation data in other response parts for part_index, part in enumerate(response_json): if part_index <= body_index: continue try: img_part = json.loads(part[2]) if img_part[4][0][12][7][0]: for img_index, img_data in enumerate(img_part[4][0][12][7][0]): try: img_url = img_data[0][3][3] img_title = f"[Generated Image {img_index+1}]" img_alt = img_data[3][5][0] if len(img_data[3]) > 5 and len(img_data[3][5]) > 0 else "" generated_images.append({"url": img_url, "alt": img_alt, "title": img_title}) except (IndexError, TypeError): continue break except (IndexError, TypeError, json.JSONDecodeError): continue except (IndexError, TypeError): pass # Format 3: Alternative location for generated images if len(generated_images) == 0 and len(body) > 4 and len(body[4]) > 0: try: # Try to find images in candidate[4] structure candidate = body[4][0] if len(candidate) > 22 and candidate[22]: # Look for URLs in the candidate[22] field import re content = candidate[22][0] if isinstance(candidate[22], list) and len(candidate[22]) > 0 else str(candidate[22]) urls = re.findall(r'https?://[^\s]+', content) for i, url in enumerate(urls): # Clean up URL if it ends with punctuation if url[-1] in ['.', ',', ')', ']', '}', '"', "'"]: url = url[:-1] generated_images.append({ "url": url, "title": f"[Generated Image {i+1}]", "alt": "" }) except (IndexError, TypeError) as e: console.log(f"[yellow]Warning: Could not parse alternative image structure: {e}[/yellow]") # Format 4: Look for image URLs in the text content if len(images) == 0 and len(generated_images) == 0 and content: try: import re # Look for image URLs in the content - try multiple patterns # Pattern 1: Standard image URLs urls = re.findall(r'(https?://[^\s]+\.(jpg|jpeg|png|gif|webp))', content.lower()) # Pattern 2: Google image URLs (which might not have extensions) google_urls = re.findall(r'(https?://lh\d+\.googleusercontent\.com/[^\s]+)', content) # Pattern 3: General URLs that might be images general_urls = re.findall(r'(https?://[^\s]+)', content) # Combine all found URLs all_urls = [] if urls: all_urls.extend([url_tuple[0] for url_tuple in urls]) if google_urls: all_urls.extend(google_urls) # Add general URLs only if we didn't find any specific image URLs if not all_urls and general_urls: all_urls = general_urls # Process all found URLs if all_urls: for i, url in enumerate(all_urls): # Clean up URL if it ends with punctuation if url[-1] in ['.', ',', ')', ']', '}', '"', "'"]: url = url[:-1] images.append({ "url": url, "title": f"[Image in Content {i+1}]", "alt": "" }) console.log(f"[green]Found {len(all_urls)} potential image URLs in content.[/green]") except Exception as e: console.log(f"[yellow]Warning: Error extracting URLs from content: {e}[/yellow]") # Combine all images all_images = images + generated_images # Extract model name from response # Model appears near the end of the structure, look for strings like "Fast", "3 Pro", "Thinking" model_name = self.model.model_name # Default fallback try: def find_model_in_structure(obj, depth=0, max_depth=15): """Recursively search for model name""" if depth > max_depth: return None if isinstance(obj, list): # Check elements from end to beginning (model is usually near the end) for i in range(len(obj) - 1, -1, -1): item = obj[i] if isinstance(item, str): # Common model names: Fast, 3 Pro, Thinking, Pro, Exp # Check for known model patterns if item in ["Fast", "3 Pro", "Thinking", "Pro", "Exp", "Raciocinio"]: # Verify it's followed by a boolean (typical pattern: model name, then bool) if i + 1 < len(obj) and isinstance(obj[i + 1], bool): return item # Also check for patterns like "Pro" in the string elif any(pattern in item for pattern in ["Pro", "Fast", "Thinking"]): # More careful check - should be a short string if len(item) < 20 and (i + 1 < len(obj) and isinstance(obj[i + 1], bool)): return item elif isinstance(item, (list, dict)): result = find_model_in_structure(item, depth + 1, max_depth) if result: return result elif isinstance(obj, dict): for value in obj.values(): result = find_model_in_structure(value, depth + 1, max_depth) if result: return result return None # Search in the body structure first found_model = find_model_in_structure(body) # If not found in body, search in all parsed parts if not found_model: for parsed_part in all_parsed_parts: found_model = find_model_in_structure(parsed_part) if found_model: break # If still not found, search in the full response_json structure if not found_model and response_json: found_model = find_model_in_structure(response_json) if found_model: model_name = found_model except Exception as e: console.log(f"[yellow]Warning: Could not extract model name from response: {e}[/yellow]") # Use default model name # Prepare results results = { "content": content, "conversation_id": conversation_id, "response_id": response_id, "factualityQueries": factualityQueries, "textQuery": textQuery, "choices": choices, "images": all_images, "model": model_name, # Use extracted model name from response "error": False, } # Don't update state - we want each message to be independent (no conversation history) # Reset IDs to ensure next message is a new conversation self.conversation_id = "" self.response_id = "" self.choice_id = "" self._reqid += random.randint(1000, 9000) return results except (IndexError, TypeError) as e: console.log(f"[red]Error extracting data from response: {e}[/red]") return {"content": f"Error extracting data from response: {e}", "error": True} except json.JSONDecodeError as e: console.log(f"[red]Error parsing JSON response: {e}[/red]") return {"content": f"Error parsing JSON response: {e}. Response: {resp.text[:200]}...", "error": True} except Timeout as e: console.log(f"[red]Request timed out: {e}[/red]") return {"content": f"Request timed out: {e}", "error": True} except (RequestException, CurlError) as e: error_msg = f"Network error: {e}" # Try to get more details if it's an HTTPError wrapped if hasattr(e, 'response') and e.response is not None: try: error_msg += f"\nStatus: {e.response.status_code}" error_msg += f"\nResponse: {e.response.text[:1000]}" console.log(f"[red]{error_msg}[/red]") console.log(f"[yellow]Full response text (first 2000 chars):[/yellow]\n{e.response.text[:2000]}") except: pass console.log(f"[red]{error_msg}[/red]") return {"content": error_msg, "error": True} except HTTPError as e: # Detectar erros de autenticação e tentar atualizar cookie automaticamente if e.response.status_code in (401, 403): console.log(f"[yellow]Erro de autenticação detectado (status {e.response.status_code}). Tentando atualizar cookie automaticamente...[/yellow]") try: # Tentar atualizar o cookie await self.__rotate_cookies() # Atualizar SNlM0e com o novo cookie self.SNlM0e = await self.__get_snlm0e() # Retentar a requisição original console.log("[cyan]Retentando requisição com cookie atualizado...[/cyan]") return await self.ask(message, image=image, video=video, audio=audio) except Exception as rotate_error: console.log(f"[red]Falha ao atualizar cookie automaticamente: {rotate_error}[/red]") error_details = f"HTTP error {e.response.status_code}: {e}" try: error_text = e.response.text[:1000] if hasattr(e.response, 'text') else str(e.response) error_details += f"\nResponse: {error_text}" console.log(f"[red]{error_details}[/red]") # Log full response for debugging if hasattr(e.response, 'text'): console.log(f"[yellow]Full response text (first 2000 chars):[/yellow]\n{e.response.text[:2000]}") except: pass return {"content": error_details, "error": True} except Exception as e: console.log(f"[red]An unexpected error occurred during ask: {e}[/red]", style="bold red") return {"content": f"An unexpected error occurred: {e}", "error": True} ######################################### # Imports for refactored classes #########################################