|
|
|
|
|
import json |
|
|
import mimetypes |
|
|
from pathlib import Path |
|
|
from typing import Dict, Tuple, Union, Optional |
|
|
|
|
|
from curl_cffi import CurlError, CurlMime |
|
|
from curl_cffi.requests import AsyncSession |
|
|
from requests.exceptions import RequestException, HTTPError, Timeout |
|
|
|
|
|
from rich.console import Console |
|
|
|
|
|
|
|
|
from .enums import Endpoint, Headers |
|
|
|
|
|
# Module-wide rich console used for colored status/error logging below.
console = Console()
|
|
|
|
|
async def upload_file(
    file: Union[bytes, str, Path],
    proxy: Optional[Union[str, Dict[str, str]]] = None,
    impersonate: str = "chrome110"
) -> str:
    """
    Uploads a file to Google's Gemini server using curl_cffi and returns its identifier.

    Args:
        file (bytes | str | Path): File data in bytes or path to the file to be uploaded.
        proxy (str | dict, optional): Proxy URL or dictionary for the request.
        impersonate (str, optional): Browser profile for curl_cffi to impersonate. Defaults to "chrome110".

    Returns:
        str: Identifier of the uploaded file (the raw response body).

    Raises:
        HTTPError: If the upload request fails with an HTTP error status.
        RequestException: For other network-related errors (CurlError is re-raised too).
        FileNotFoundError: If the file path does not exist.
    """
    # Normalize the proxy argument into the dict form AsyncSession expects.
    proxies_dict = None
    if isinstance(proxy, str):
        proxies_dict = {"http": proxy, "https": proxy}
    elif isinstance(proxy, dict):
        proxies_dict = proxy

    mp = CurlMime()
    try:
        if isinstance(file, bytes):
            # Raw bytes carry no filename or extension to sniff, so keep the
            # historical default of a generic JPEG part.
            # NOTE(review): non-image byte payloads are mislabeled here — confirm
            # whether the server sniffs content or trusts this header.
            mp.addpart(
                name="file",
                content_type="image/jpeg",
                filename="file.jpg",
                data=file
            )
        else:
            file_path = Path(file)
            if not file_path.is_file():
                raise FileNotFoundError(f"File not found at path: {file}")

            content_type, _ = mimetypes.guess_type(str(file_path))
            if not content_type:
                # Fallback for extensions the mimetypes DB does not know.
                # All video/audio extensions collapse to one generic type each;
                # unknown extensions default to image/jpeg (historical behavior).
                fallback_types = {
                    '.mp4': "video/mp4", '.avi': "video/mp4", '.mov': "video/mp4",
                    '.webm': "video/mp4", '.mkv': "video/mp4", '.flv': "video/mp4",
                    '.m4v': "video/mp4",
                    '.mp3': "audio/mpeg", '.wav': "audio/mpeg", '.ogg': "audio/mpeg",
                    '.flac': "audio/mpeg", '.aac': "audio/mpeg", '.m4a': "audio/mpeg",
                    '.wma': "audio/mpeg",
                    '.jpg': "image/jpeg", '.jpeg': "image/jpeg",
                    '.png': "image/png",
                    '.gif': "image/gif",
                    '.webp': "image/webp",
                }
                content_type = fallback_types.get(file_path.suffix.lower(), "image/jpeg")

            mp.addpart(
                name="file",
                content_type=content_type,
                filename=file_path.name,
                local_path=str(file_path)
            )

        async with AsyncSession(
            proxies=proxies_dict,
            impersonate=impersonate,
            headers=Headers.UPLOAD.value
        ) as client:
            response = await client.post(
                url=Endpoint.UPLOAD.value,
                multipart=mp,
            )
            response.raise_for_status()
            return response.text
    except HTTPError as e:
        # `e.response` may be None on synthetically raised HTTPErrors; read the
        # status defensively so the log line cannot itself raise.
        # NOTE(review): curl_cffi's raise_for_status raises its own HTTPError
        # (a CurlError subclass), which lands in the branch below — confirm.
        status = getattr(getattr(e, "response", None), "status_code", "?")
        console.log(f"[red]HTTP error during file upload: {status} {e}[/red]")
        raise
    except (RequestException, CurlError) as e:
        console.log(f"[red]Network error during file upload: {e}[/red]")
        raise
    finally:
        # Always release the libcurl mime handle, even on failure.
        mp.close()
|
|
|
|
|
def load_cookies(cookie_path: str) -> Tuple[str, str, Dict[str, str]]:
    """
    Loads authentication cookies from a JSON file or Netscape HTTP Cookie File format.

    Both formats are supported, even mixed in the same file: a JSON prefix
    (either a list of {"name", "value"} objects or a flat {name: value}
    mapping) is parsed first, then every remaining tab-separated Netscape
    line is scanned. JSON values take precedence for the two session cookies.

    Args:
        cookie_path (str): Path to the file containing cookies (JSON or Netscape format).

    Returns:
        tuple[str, str, dict]: Tuple containing __Secure-1PSID, __Secure-1PSIDTS, and additional cookies dict.

    Raises:
        Exception: If the file is not found, invalid, or required cookies are missing.
    """
    try:
        with open(cookie_path, 'r', encoding='utf-8') as file:
            content = file.read()

        session_auth1 = None
        session_auth2 = None
        additional_cookies: Dict[str, str] = {}

        # A JSON payload, if present, ends at the first ']' (list form) or,
        # failing that, the first '}' (flat-object form); anything after it
        # may still hold Netscape-format lines.
        json_end = content.find(']')
        if json_end == -1:
            json_end = content.find('}')
        if json_end != -1:
            try:
                parsed = json.loads(content[:json_end + 1])
                if isinstance(parsed, dict):
                    # Flat {name: value} mapping.
                    pairs = list(parsed.items())
                else:
                    # List of {"name": ..., "value": ...} objects.
                    pairs = [(item['name'], item['value']) for item in parsed]
                for name, value in pairs:
                    name_upper = name.upper()
                    if name_upper == '__SECURE-1PSID':
                        session_auth1 = value
                    elif name_upper == '__SECURE-1PSIDTS':
                        session_auth2 = value
                    else:
                        additional_cookies[name] = value
            except (json.JSONDecodeError, KeyError, TypeError):
                # Not a usable JSON prefix; fall through to Netscape parsing.
                pass

        # Netscape format: 7 tab-separated fields per non-comment line
        # (domain, flag, path, secure, expiry, name, value).
        for line in content.split('\n'):
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            parts = line.split('\t')
            if len(parts) < 7:
                continue
            domain = parts[0].strip()
            name = parts[5].strip()
            value = parts[6].strip()
            # Only trust cookies scoped to Google domains.
            if '.google.com' not in domain:
                continue
            name_upper = name.upper()
            if name_upper == '__SECURE-1PSID':
                # Keep a value already provided by the JSON section.
                if not session_auth1:
                    session_auth1 = value
            elif name_upper == '__SECURE-1PSIDTS':
                if not session_auth2:
                    session_auth2 = value
            else:
                additional_cookies[name] = value

        if not session_auth1 or not session_auth2:
            raise ValueError("Required cookies (__Secure-1PSID or __Secure-1PSIDTS) not found.")

        return session_auth1, session_auth2, additional_cookies
    except FileNotFoundError:
        raise Exception(f"Cookie file not found at path: {cookie_path}")
    except ValueError as e:
        raise Exception(f"{e} Check the cookie file format and content.")
    except Exception as e:
        raise Exception(f"An unexpected error occurred while loading cookies: {e}")
|
|
|
|
|
def save_cookies(cookie_path: str, secure_1psid: str, secure_1psidts: str, additional_cookies: Optional[Dict[str, str]] = None) -> None:
    """
    Saves authentication cookies to a JSON file.

    The target file is overwritten wholesale with the provided values (the
    previous content is NOT read or merged); parent directories are created
    as needed.

    Args:
        cookie_path (str): Path to the JSON file where cookies will be saved.
        secure_1psid (str): The __Secure-1PSID cookie value.
        secure_1psidts (str): The __Secure-1PSIDTS cookie value.
        additional_cookies (dict, optional): Additional cookies to save (e.g., COMPASS).

    Raises:
        Exception: If there's an error writing to the file.
    """
    try:
        # The two session cookies always come first, in the list-of-objects
        # layout that load_cookies understands.
        cookies_list = [
            {"name": "__Secure-1PSID", "value": secure_1psid},
            {"name": "__Secure-1PSIDTS", "value": secure_1psidts},
        ]

        if additional_cookies:
            for name, value in additional_cookies.items():
                # Guard against a caller passing the session cookies twice.
                if not any(cookie["name"] == name for cookie in cookies_list):
                    cookies_list.append({"name": name, "value": value})

        # Make sure the destination directory exists before writing.
        Path(cookie_path).parent.mkdir(parents=True, exist_ok=True)

        with open(cookie_path, 'w', encoding='utf-8') as f:
            json.dump(cookies_list, f, indent=4, ensure_ascii=False)

        # User-facing log message intentionally kept in Portuguese (existing output).
        console.log(f"[green]Cookies atualizados e salvos em {cookie_path}[/green]")
    except Exception as e:
        console.log(f"[red]Erro ao salvar cookies: {e}[/red]")
        # Chain explicitly so the original cause survives in tracebacks.
        raise Exception(f"Erro ao salvar cookies: {e}") from e