File size: 10,070 Bytes
df62731
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# -*- coding: utf-8 -*-
import json
import mimetypes
from pathlib import Path
from typing import Dict, Tuple, Union, Optional

from curl_cffi import CurlError, CurlMime
from curl_cffi.requests import AsyncSession
from requests.exceptions import RequestException, HTTPError, Timeout # Added Timeout

from rich.console import Console

# Assuming Endpoint and Headers enums are in 'enums.py' within the same package
from .enums import Endpoint, Headers

console = Console() # Instantiate console for logging

async def upload_file(
    file: Union[bytes, str, Path],
    proxy: Optional[Union[str, Dict[str, str]]] = None,
    impersonate: str = "chrome110"
) -> str:
    """
    Uploads a file to Google's Gemini server using curl_cffi and returns its identifier.

    Args:
        file (bytes | str | Path): File data in bytes or path to the file to be uploaded.
        proxy (str | dict, optional): Proxy URL or dictionary for the request.
        impersonate (str, optional): Browser profile for curl_cffi to impersonate. Defaults to "chrome110".

    Returns:
        str: Identifier of the uploaded file.

    Raises:
        HTTPError: If the upload request fails.
        RequestException: For other network-related errors.
        FileNotFoundError: If the file path does not exist.
    """
    # Prepare proxy dictionary for curl_cffi
    proxies_dict = None
    if isinstance(proxy, str):
        proxies_dict = {"http": proxy, "https": proxy} # curl_cffi uses http/https keys
    elif isinstance(proxy, dict):
        proxies_dict = proxy # Assume it's already in the correct format

    # Create CurlMime object for multipart upload
    mp = CurlMime()
    
    try:
        # Handle file input - determine content type and filename
        if isinstance(file, bytes):
            # If bytes, we can't determine type, default to image/jpeg
            # User should provide file path for better type detection
            content_type = "image/jpeg"
            filename = "file.jpg"
            mp.addpart(
                name="file",
                content_type=content_type,
                filename=filename,
                data=file
            )
        else:
            # If path, use the file directly
            file_path = Path(file)
            if not file_path.is_file():
                raise FileNotFoundError(f"File not found at path: {file}")
            
            # Determine content type from file extension
            content_type, _ = mimetypes.guess_type(str(file_path))
            if not content_type:
                # Try to guess from extension
                ext = file_path.suffix.lower()
                if ext in ['.mp4', '.avi', '.mov', '.webm', '.mkv', '.flv', '.m4v']:
                    content_type = "video/mp4"
                elif ext in ['.mp3', '.wav', '.ogg', '.flac', '.aac', '.m4a', '.wma']:
                    content_type = "audio/mpeg"
                elif ext in ['.jpg', '.jpeg']:
                    content_type = "image/jpeg"
                elif ext == '.png':
                    content_type = "image/png"
                elif ext == '.gif':
                    content_type = "image/gif"
                elif ext == '.webp':
                    content_type = "image/webp"
                else:
                    # Default to image/jpeg if can't determine
                    content_type = "image/jpeg"
            
            mp.addpart(
                name="file",
                content_type=content_type,
                filename=file_path.name,
                local_path=str(file_path)
            )

        # Use AsyncSession from curl_cffi
        async with AsyncSession(
            proxies=proxies_dict,
            impersonate=impersonate,
            headers=Headers.UPLOAD.value # Pass headers directly
            # follow_redirects is handled automatically by curl_cffi
        ) as client:
            response = await client.post(
                url=Endpoint.UPLOAD.value, # Use Endpoint enum
                multipart=mp,
            )
            response.raise_for_status() # Raises HTTPError for bad responses
            result = response.text
            return result
    except HTTPError as e:
        console.log(f"[red]HTTP error during file upload: {e.response.status_code} {e}[/red]")
        raise # Re-raise HTTPError
    except (RequestException, CurlError) as e: # Catch CurlError as well
        console.log(f"[red]Network error during file upload: {e}[/red]")
        raise # Re-raise other request errors
    finally:
        # Always close the multipart object to free memory
        mp.close()

def load_cookies(cookie_path: str) -> Tuple[str, str, Dict[str, str]]:
    """
    Loads authentication cookies from a JSON file or Netscape HTTP Cookie File format.
    Supports both formats and can read them from the same file.

    Args:
        cookie_path (str): Path to the file containing cookies (JSON or Netscape format).

    Returns:
        tuple[str, str, dict]: Tuple containing __Secure-1PSID, __Secure-1PSIDTS, and additional cookies dict.

    Raises:
        Exception: If the file is not found, invalid, or required cookies are missing.
    """
    try:
        with open(cookie_path, 'r', encoding='utf-8') as file:
            content = file.read()
        
        session_auth1 = None
        session_auth2 = None
        additional_cookies = {}
        
        # Try to parse JSON format first
        json_end = content.find(']')
        if json_end == -1:
            json_end = content.find('}')
        if json_end != -1:
            try:
                json_content = content[:json_end + 1]
                cookies = json.loads(json_content)
                
                for item in cookies:
                    name_upper = item['name'].upper()
                    if name_upper == '__SECURE-1PSID':
                        session_auth1 = item['value']
                    elif name_upper == '__SECURE-1PSIDTS':
                        session_auth2 = item['value']
                    else:
                        # Store additional cookies (like COMPASS, _ga_WC57KJ50ZZ, etc.)
                        additional_cookies[item['name']] = item['value']
            except json.JSONDecodeError:
                pass  # Fall through to Netscape format parsing
        
        # Also parse Netscape HTTP Cookie File format
        # Format: domain\tflag\tpath\tsecure\texpiration\tname\tvalue
        for line in content.split('\n'):
            line = line.strip()
            # Skip comments and empty lines
            if not line or line.startswith('#'):
                continue
            
            # Parse Netscape cookie format
            parts = line.split('\t')
            if len(parts) >= 7:
                domain = parts[0].strip()
                name = parts[5].strip()
                value = parts[6].strip()
                
                # Look for cookies from .gemini.google.com or .google.com
                if '.gemini.google.com' in domain or '.google.com' in domain:
                    name_upper = name.upper()
                    if name_upper == '__SECURE-1PSID':
                        if not session_auth1:  # Only use if not already set from JSON
                            session_auth1 = value
                    elif name_upper == '__SECURE-1PSIDTS':
                        if not session_auth2:  # Only use if not already set from JSON
                            session_auth2 = value
                    else:
                        # Store additional cookies (like COMPASS, _ga_WC57KJ50ZZ, etc.)
                        additional_cookies[name] = value

        if not session_auth1 or not session_auth2:
             raise StopIteration("Required cookies (__Secure-1PSID or __Secure-1PSIDTS) not found.")

        return session_auth1, session_auth2, additional_cookies
    except FileNotFoundError:
        raise Exception(f"Cookie file not found at path: {cookie_path}")
    except StopIteration as e:
        raise Exception(f"{e} Check the cookie file format and content.")
    except Exception as e: # Catch other potential errors
        raise Exception(f"An unexpected error occurred while loading cookies: {e}")

def save_cookies(cookie_path: str, secure_1psid: str, secure_1psidts: str, additional_cookies: Optional[Dict[str, str]] = None) -> None:
    """
    Saves authentication cookies to a JSON file.
    Updates existing cookies or creates a new file if it doesn't exist.

    Args:
        cookie_path (str): Path to the JSON file where cookies will be saved.
        secure_1psid (str): The __Secure-1PSID cookie value.
        secure_1psidts (str): The __Secure-1PSIDTS cookie value.
        additional_cookies (dict, optional): Additional cookies to save (e.g., COMPASS).

    Raises:
        Exception: If there's an error writing to the file.
    """
    try:
        cookies_list = [
            {
                "name": "__Secure-1PSID",
                "value": secure_1psid
            },
            {
                "name": "__Secure-1PSIDTS",
                "value": secure_1psidts
            }
        ]
        
        # Add additional cookies if provided
        if additional_cookies:
            for name, value in additional_cookies.items():
                # Skip if already in the list
                if not any(cookie["name"] == name for cookie in cookies_list):
                    cookies_list.append({
                        "name": name,
                        "value": value
                    })
        
        # Ensure directory exists
        Path(cookie_path).parent.mkdir(parents=True, exist_ok=True)
        
        # Write to file
        with open(cookie_path, 'w', encoding='utf-8') as f:
            json.dump(cookies_list, f, indent=4, ensure_ascii=False)
        
        console.log(f"[green]Cookies atualizados e salvos em {cookie_path}[/green]")
    except Exception as e:
        console.log(f"[red]Erro ao salvar cookies: {e}[/red]")
        raise Exception(f"Erro ao salvar cookies: {e}")