# -*- coding: utf-8 -*-
import json
import mimetypes
from pathlib import Path
from typing import Dict, Tuple, Union, Optional
from curl_cffi import CurlError, CurlMime
from curl_cffi.requests import AsyncSession
from requests.exceptions import RequestException, HTTPError
from rich.console import Console
# Assuming Endpoint and Headers enums are in 'enums.py' within the same package
from .enums import Endpoint, Headers
console = Console() # Instantiate console for logging
async def upload_file(
file: Union[bytes, str, Path],
proxy: Optional[Union[str, Dict[str, str]]] = None,
impersonate: str = "chrome110"
) -> str:
"""
Uploads a file to Google's Gemini server using curl_cffi and returns its identifier.
Args:
file (bytes | str | Path): File data in bytes or path to the file to be uploaded.
proxy (str | dict, optional): Proxy URL or dictionary for the request.
impersonate (str, optional): Browser profile for curl_cffi to impersonate. Defaults to "chrome110".
Returns:
str: Identifier of the uploaded file.
Raises:
HTTPError: If the upload request fails.
RequestException: For other network-related errors.
FileNotFoundError: If the file path does not exist.
"""
# Prepare proxy dictionary for curl_cffi
proxies_dict = None
if isinstance(proxy, str):
proxies_dict = {"http": proxy, "https": proxy} # curl_cffi uses http/https keys
elif isinstance(proxy, dict):
proxies_dict = proxy # Assume it's already in the correct format
# Create CurlMime object for multipart upload
mp = CurlMime()
try:
# Handle file input - determine content type and filename
if isinstance(file, bytes):
            # Raw bytes carry no filename, so the content type cannot be
            # inferred; default to image/jpeg. Pass a file path instead for
            # better type detection.
content_type = "image/jpeg"
filename = "file.jpg"
mp.addpart(
name="file",
content_type=content_type,
filename=filename,
data=file
)
else:
# If path, use the file directly
file_path = Path(file)
if not file_path.is_file():
raise FileNotFoundError(f"File not found at path: {file}")
# Determine content type from file extension
content_type, _ = mimetypes.guess_type(str(file_path))
if not content_type:
                # Fall back to a manual extension map for common media types
ext = file_path.suffix.lower()
if ext in ['.mp4', '.avi', '.mov', '.webm', '.mkv', '.flv', '.m4v']:
content_type = "video/mp4"
elif ext in ['.mp3', '.wav', '.ogg', '.flac', '.aac', '.m4a', '.wma']:
content_type = "audio/mpeg"
elif ext in ['.jpg', '.jpeg']:
content_type = "image/jpeg"
elif ext == '.png':
content_type = "image/png"
elif ext == '.gif':
content_type = "image/gif"
elif ext == '.webp':
content_type = "image/webp"
else:
# Default to image/jpeg if can't determine
content_type = "image/jpeg"
mp.addpart(
name="file",
content_type=content_type,
filename=file_path.name,
local_path=str(file_path)
)
# Use AsyncSession from curl_cffi
async with AsyncSession(
proxies=proxies_dict,
impersonate=impersonate,
headers=Headers.UPLOAD.value # Pass headers directly
# follow_redirects is handled automatically by curl_cffi
) as client:
response = await client.post(
url=Endpoint.UPLOAD.value, # Use Endpoint enum
multipart=mp,
)
response.raise_for_status() # Raises HTTPError for bad responses
result = response.text
return result
except HTTPError as e:
console.log(f"[red]HTTP error during file upload: {e.response.status_code} {e}[/red]")
raise # Re-raise HTTPError
except (RequestException, CurlError) as e: # Catch CurlError as well
console.log(f"[red]Network error during file upload: {e}[/red]")
raise # Re-raise other request errors
finally:
# Always close the multipart object to free memory
mp.close()
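# Example usage of upload_file (an illustrative sketch, not part of the module
# API; the file path and proxy URL below are placeholders):
#
#     import asyncio
#
#     async def _demo_upload():
#         file_id = await upload_file("photo.png", proxy="http://127.0.0.1:8080")
#         print(file_id)
#
#     asyncio.run(_demo_upload())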
def load_cookies(cookie_path: str) -> Tuple[str, str, Dict[str, str]]:
"""
    Loads authentication cookies from a JSON file or a Netscape HTTP Cookie File.
    Both formats are supported and may even coexist in the same file.
Args:
cookie_path (str): Path to the file containing cookies (JSON or Netscape format).
Returns:
tuple[str, str, dict]: Tuple containing __Secure-1PSID, __Secure-1PSIDTS, and additional cookies dict.
Raises:
Exception: If the file is not found, invalid, or required cookies are missing.
"""
try:
with open(cookie_path, 'r', encoding='utf-8') as file:
content = file.read()
session_auth1 = None
session_auth2 = None
additional_cookies = {}
# Try to parse JSON format first
json_end = content.find(']')
if json_end == -1:
json_end = content.find('}')
if json_end != -1:
try:
json_content = content[:json_end + 1]
cookies = json.loads(json_content)
for item in cookies:
name_upper = item['name'].upper()
if name_upper == '__SECURE-1PSID':
session_auth1 = item['value']
elif name_upper == '__SECURE-1PSIDTS':
session_auth2 = item['value']
else:
# Store additional cookies (like COMPASS, _ga_WC57KJ50ZZ, etc.)
additional_cookies[item['name']] = item['value']
except json.JSONDecodeError:
pass # Fall through to Netscape format parsing
# Also parse Netscape HTTP Cookie File format
# Format: domain\tflag\tpath\tsecure\texpiration\tname\tvalue
for line in content.split('\n'):
line = line.strip()
# Skip comments and empty lines
if not line or line.startswith('#'):
continue
# Parse Netscape cookie format
parts = line.split('\t')
if len(parts) >= 7:
domain = parts[0].strip()
name = parts[5].strip()
value = parts[6].strip()
# Look for cookies from .gemini.google.com or .google.com
if '.gemini.google.com' in domain or '.google.com' in domain:
name_upper = name.upper()
if name_upper == '__SECURE-1PSID':
if not session_auth1: # Only use if not already set from JSON
session_auth1 = value
elif name_upper == '__SECURE-1PSIDTS':
if not session_auth2: # Only use if not already set from JSON
session_auth2 = value
else:
# Store additional cookies (like COMPASS, _ga_WC57KJ50ZZ, etc.)
additional_cookies[name] = value
if not session_auth1 or not session_auth2:
raise StopIteration("Required cookies (__Secure-1PSID or __Secure-1PSIDTS) not found.")
return session_auth1, session_auth2, additional_cookies
except FileNotFoundError:
raise Exception(f"Cookie file not found at path: {cookie_path}")
except StopIteration as e:
raise Exception(f"{e} Check the cookie file format and content.")
except Exception as e: # Catch other potential errors
raise Exception(f"An unexpected error occurred while loading cookies: {e}")
def save_cookies(cookie_path: str, secure_1psid: str, secure_1psidts: str, additional_cookies: Optional[Dict[str, str]] = None) -> None:
"""
Saves authentication cookies to a JSON file.
Updates existing cookies or creates a new file if it doesn't exist.
Args:
cookie_path (str): Path to the JSON file where cookies will be saved.
secure_1psid (str): The __Secure-1PSID cookie value.
secure_1psidts (str): The __Secure-1PSIDTS cookie value.
additional_cookies (dict, optional): Additional cookies to save (e.g., COMPASS).
Raises:
Exception: If there's an error writing to the file.
"""
try:
cookies_list = [
{
"name": "__Secure-1PSID",
"value": secure_1psid
},
{
"name": "__Secure-1PSIDTS",
"value": secure_1psidts
}
]
# Add additional cookies if provided
if additional_cookies:
for name, value in additional_cookies.items():
# Skip if already in the list
if not any(cookie["name"] == name for cookie in cookies_list):
cookies_list.append({
"name": name,
"value": value
})
# Ensure directory exists
Path(cookie_path).parent.mkdir(parents=True, exist_ok=True)
# Write to file
with open(cookie_path, 'w', encoding='utf-8') as f:
json.dump(cookies_list, f, indent=4, ensure_ascii=False)
console.log(f"[green]Cookies atualizados e salvos em {cookie_path}[/green]")
except Exception as e:
console.log(f"[red]Erro ao salvar cookies: {e}[/red]")
raise Exception(f"Erro ao salvar cookies: {e}") |