# -*- coding: utf-8 -*-
"""Helpers for uploading files to Google's Gemini server and for loading /
saving the authentication cookies the client needs."""

import json
import mimetypes
from pathlib import Path
from typing import Dict, Tuple, Union, Optional

from curl_cffi import CurlError, CurlMime
from curl_cffi.requests import AsyncSession
from requests.exceptions import RequestException, HTTPError, Timeout  # Timeout kept for callers
from rich.console import Console

# Assuming Endpoint and Headers enums are in 'enums.py' within the same package
from .enums import Endpoint, Headers

console = Console()  # Shared console instance for rich logging

# Fallback MIME types by file extension, consulted only when
# mimetypes.guess_type() cannot determine the content type.
# NOTE: all video extensions deliberately map to "video/mp4" and all audio
# extensions to "audio/mpeg" (server-side coarse typing), matching the
# original behavior.
_EXTENSION_MIME_FALLBACK: Dict[str, str] = {
    '.mp4': "video/mp4", '.avi': "video/mp4", '.mov': "video/mp4",
    '.webm': "video/mp4", '.mkv': "video/mp4", '.flv': "video/mp4",
    '.m4v': "video/mp4",
    '.mp3': "audio/mpeg", '.wav': "audio/mpeg", '.ogg': "audio/mpeg",
    '.flac': "audio/mpeg", '.aac': "audio/mpeg", '.m4a': "audio/mpeg",
    '.wma': "audio/mpeg",
    '.jpg': "image/jpeg", '.jpeg': "image/jpeg",
    '.png': "image/png",
    '.gif': "image/gif",
    '.webp': "image/webp",
}

# Default used when the content type cannot be determined at all
# (raw bytes input, or an unrecognized extension).
_DEFAULT_MIME = "image/jpeg"


async def upload_file(
    file: Union[bytes, str, Path],
    proxy: Optional[Union[str, Dict[str, str]]] = None,
    impersonate: str = "chrome110",
) -> str:
    """
    Uploads a file to Google's Gemini server using curl_cffi and returns its
    identifier.

    Args:
        file (bytes | str | Path): File data in bytes or path to the file to
            be uploaded.
        proxy (str | dict, optional): Proxy URL or dictionary for the request.
        impersonate (str, optional): Browser profile for curl_cffi to
            impersonate. Defaults to "chrome110".

    Returns:
        str: Identifier of the uploaded file.

    Raises:
        HTTPError: If the upload request fails.
        RequestException: For other network-related errors.
        FileNotFoundError: If the file path does not exist.
    """
    # Normalize the proxy argument into the dict shape curl_cffi expects.
    proxies_dict: Optional[Dict[str, str]] = None
    if isinstance(proxy, str):
        proxies_dict = {"http": proxy, "https": proxy}  # curl_cffi uses http/https keys
    elif isinstance(proxy, dict):
        proxies_dict = proxy  # Assume it's already in the correct format

    # Multipart body for the upload; must be closed in all code paths.
    mp = CurlMime()
    try:
        if isinstance(file, bytes):
            # Raw bytes carry no filename, so the type cannot be detected;
            # default to image/jpeg. Callers should pass a path for better
            # type detection.
            mp.addpart(
                name="file",
                content_type=_DEFAULT_MIME,
                filename="file.jpg",
                data=file,
            )
        else:
            file_path = Path(file)
            if not file_path.is_file():
                raise FileNotFoundError(f"File not found at path: {file}")

            # Detect the content type from the extension; fall back to the
            # module-level table, then to the jpeg default.
            content_type, _ = mimetypes.guess_type(str(file_path))
            if not content_type:
                content_type = _EXTENSION_MIME_FALLBACK.get(
                    file_path.suffix.lower(), _DEFAULT_MIME
                )

            mp.addpart(
                name="file",
                content_type=content_type,
                filename=file_path.name,
                local_path=str(file_path),
            )

        # Perform the upload; redirects are handled automatically by curl_cffi.
        async with AsyncSession(
            proxies=proxies_dict,
            impersonate=impersonate,
            headers=Headers.UPLOAD.value,
        ) as client:
            response = await client.post(
                url=Endpoint.UPLOAD.value,
                multipart=mp,
            )
            response.raise_for_status()  # Raises HTTPError for bad responses
            return response.text
    except HTTPError as e:
        console.log(f"[red]HTTP error during file upload: {e.response.status_code} {e}[/red]")
        raise  # Re-raise HTTPError
    except (RequestException, CurlError) as e:
        console.log(f"[red]Network error during file upload: {e}[/red]")
        raise  # Re-raise other request errors
    finally:
        # Always close the multipart object to free memory.
        mp.close()


def load_cookies(cookie_path: str) -> Tuple[str, str, Dict[str, str]]:
    """
    Loads authentication cookies from a JSON file or Netscape HTTP Cookie File
    format. Supports both formats and can read them from the same file.

    Args:
        cookie_path (str): Path to the file containing cookies (JSON or
            Netscape format).

    Returns:
        tuple[str, str, dict]: Tuple containing __Secure-1PSID,
            __Secure-1PSIDTS, and additional cookies dict.

    Raises:
        Exception: If the file is not found, invalid, or required cookies are
            missing.
    """
    try:
        with open(cookie_path, 'r', encoding='utf-8') as file:
            content = file.read()

        session_auth1: Optional[str] = None
        session_auth2: Optional[str] = None
        additional_cookies: Dict[str, str] = {}

        # --- Pass 1: try to parse a JSON prefix (list of {"name","value"}) ---
        # The file may start with a JSON export and continue with Netscape
        # lines, so only the prefix up to the first closing bracket is parsed.
        json_end = content.find(']')
        if json_end == -1:
            json_end = content.find('}')
        if json_end != -1:
            try:
                cookies = json.loads(content[:json_end + 1])
                # Guard against a non-list top level (e.g. a single JSON
                # object): iterating it would yield keys, not cookie dicts,
                # and previously raised a TypeError that aborted the load.
                if not isinstance(cookies, list):
                    raise TypeError("JSON prefix is not a cookie list")
                for item in cookies:
                    name_upper = item['name'].upper()
                    if name_upper == '__SECURE-1PSID':
                        session_auth1 = item['value']
                    elif name_upper == '__SECURE-1PSIDTS':
                        session_auth2 = item['value']
                    else:
                        # Store additional cookies (like COMPASS,
                        # _ga_WC57KJ50ZZ, etc.)
                        additional_cookies[item['name']] = item['value']
            except (json.JSONDecodeError, TypeError, KeyError):
                pass  # Fall through to Netscape format parsing

        # --- Pass 2: parse Netscape HTTP Cookie File lines ---
        # Format: domain\tflag\tpath\tsecure\texpiration\tname\tvalue
        for line in content.split('\n'):
            line = line.strip()
            # Skip comments and empty lines.
            if not line or line.startswith('#'):
                continue
            parts = line.split('\t')
            if len(parts) < 7:
                continue
            domain = parts[0].strip()
            name = parts[5].strip()
            value = parts[6].strip()
            # Only cookies from .gemini.google.com or .google.com matter.
            if '.gemini.google.com' not in domain and '.google.com' not in domain:
                continue
            name_upper = name.upper()
            if name_upper == '__SECURE-1PSID':
                if not session_auth1:  # Only use if not already set from JSON
                    session_auth1 = value
            elif name_upper == '__SECURE-1PSIDTS':
                if not session_auth2:  # Only use if not already set from JSON
                    session_auth2 = value
            else:
                # Store additional cookies (like COMPASS, _ga_WC57KJ50ZZ, etc.)
                additional_cookies[name] = value

        if not session_auth1 or not session_auth2:
            # ValueError instead of the previous StopIteration: raising
            # StopIteration as control flow is unsafe (it silently terminates
            # generators). The caller-visible message is unchanged.
            raise ValueError("Required cookies (__Secure-1PSID or __Secure-1PSIDTS) not found.")

        return session_auth1, session_auth2, additional_cookies
    except FileNotFoundError:
        raise Exception(f"Cookie file not found at path: {cookie_path}")
    except ValueError as e:
        raise Exception(f"{e} Check the cookie file format and content.")
    except Exception as e:  # Catch other potential errors
        raise Exception(f"An unexpected error occurred while loading cookies: {e}")


def save_cookies(
    cookie_path: str,
    secure_1psid: str,
    secure_1psidts: str,
    additional_cookies: Optional[Dict[str, str]] = None,
) -> None:
    """
    Saves authentication cookies to a JSON file. Updates existing cookies or
    creates a new file if it doesn't exist.

    Args:
        cookie_path (str): Path to the JSON file where cookies will be saved.
        secure_1psid (str): The __Secure-1PSID cookie value.
        secure_1psidts (str): The __Secure-1PSIDTS cookie value.
        additional_cookies (dict, optional): Additional cookies to save
            (e.g., COMPASS).

    Raises:
        Exception: If there's an error writing to the file.
    """
    try:
        cookies_list = [
            {"name": "__Secure-1PSID", "value": secure_1psid},
            {"name": "__Secure-1PSIDTS", "value": secure_1psidts},
        ]

        # Append extra cookies, skipping names already present.
        if additional_cookies:
            known_names = {cookie["name"] for cookie in cookies_list}
            for name, value in additional_cookies.items():
                if name not in known_names:
                    cookies_list.append({"name": name, "value": value})
                    known_names.add(name)

        # Ensure the destination directory exists before writing.
        Path(cookie_path).parent.mkdir(parents=True, exist_ok=True)

        with open(cookie_path, 'w', encoding='utf-8') as f:
            json.dump(cookies_list, f, indent=4, ensure_ascii=False)

        console.log(f"[green]Cookies atualizados e salvos em {cookie_path}[/green]")
    except Exception as e:
        console.log(f"[red]Erro ao salvar cookies: {e}[/red]")
        raise Exception(f"Erro ao salvar cookies: {e}")