# OpenAI-compatible proxy for Notion AI (FastAPI + Playwright).
| import asyncio | |
| import sys # For platform check | |
| import logging | |
| import os | |
| import uuid | |
| import json | |
| import time | |
| import random | |
| import asyncio | |
| import httpx # For getSpaces API call | |
| from contextlib import asynccontextmanager # For lifespan | |
| from playwright.async_api import async_playwright, Error as PlaywrightError | |
| from fastapi import FastAPI, Request, HTTPException, Depends, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from fastapi.responses import StreamingResponse | |
| from dotenv import load_dotenv | |
| import secrets # Added for secure comparison | |
| from datetime import datetime, timedelta, timezone # Explicit datetime imports | |
| from zoneinfo import ZoneInfo # For timezone handling | |
| from typing import List, Optional # Add List and Optional for typing | |
| from models import ( | |
| ChatMessage, ChatCompletionRequest, NotionTranscriptConfigValue, | |
| NotionTranscriptContextValue, NotionTranscriptItem, NotionDebugOverrides, | |
| NotionRequestBody, ChoiceDelta, Choice, ChatCompletionChunk, Model, ModelList | |
| ) | |
| # Load environment variables from .env file | |
| load_dotenv() | |
# --- Logging Configuration ---
# Configure logging BEFORE any module-level logging call is made: the
# module-level helpers (logging.info etc.) implicitly call basicConfig()
# when the root logger has no handlers, which would make this explicit
# configuration a silent no-op if it ran afterwards.
# Level is WARNING (not INFO) to reduce verbosity: only important messages
# and errors are displayed.
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'  # Standardize date format
)
# Custom logger for critical account operations that we always want to see.
account_logger = logging.getLogger("notion_account")
account_logger.setLevel(logging.INFO)
# --- Event Loop Policy for Windows ---
# For Playwright compatibility, especially with subprocesses on Windows.
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    # INFO-level, so suppressed under the WARNING root level by design.
    logging.info("Set WindowsProactorEventLoopPolicy for asyncio.")
| # --- Account Management Class --- | |
class NotionAccount:
    """One Notion account: its raw cookie plus the IDs discovered at startup."""

    def __init__(self, cookie: str):
        # Raw Cookie header value used to authenticate against notion.so.
        self.cookie = cookie
        # Populated by fetch_and_set_notion_ids() once the account validates.
        self.space_id: Optional[str] = None
        self.user_id: Optional[str] = None
        # Email is fetched purely so operators can identify the account in logs.
        self.user_email: Optional[str] = None
        # Only healthy accounts take part in round-robin selection.
        self.is_healthy: bool = False
        # Serializes validation so one account is never fetched concurrently.
        self.lock = asyncio.Lock()

    def __str__(self):
        return (
            f"Account(user_email={self.user_email}, "
            f"user_id={self.user_id}, healthy={self.is_healthy})"
        )
# --- Configuration ---
NOTION_API_URL = "https://www.notion.so/api/v3/runInferenceTranscript"
# IMPORTANT: Load multiple Notion cookies securely from environment variables.
# Use a unique separator like '|' because cookie values can contain ';' and ','.
NOTION_COOKIES_RAW = os.getenv("NOTION_COOKIES")
# --- Global State for Account Polling ---
# All configured accounts; populated once at startup (lifespan) and then
# rotated over by get_next_account().
ACCOUNTS: List[NotionAccount] = []
CURRENT_ACCOUNT_INDEX = 0  # Index for round-robin
if not NOTION_COOKIES_RAW:
    # This is a critical error, app cannot function without it.
    logging.error("CRITICAL: NOTION_COOKIES environment variable not set. Application will not work.")
    # In a real app, you might exit or raise a more severe startup error.
# --- Proxy Configuration ---
# Optional proxy for the headless browser, e.g. "http://host:port".
PROXY_URL = os.getenv("PROXY_URL", "")  # Empty string as default
# --- Authentication ---
# NOTE(review): with the "default_token" fallback, auth is effectively open
# unless PROXY_AUTH_TOKEN is explicitly set — consider failing hard instead.
EXPECTED_TOKEN = os.getenv("PROXY_AUTH_TOKEN", "default_token")  # Default token
security = HTTPBearer()
def authenticate(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the client's Bearer token against the configured token.

    Raises HTTP 401 on mismatch; returns True on success so it can be used
    as a FastAPI dependency.
    """
    # compare_digest avoids timing side channels on the token comparison.
    if not secrets.compare_digest(credentials.credentials, EXPECTED_TOKEN):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            # WWW-Authenticate header removed for Bearer
        )
    return True
| # --- Notion Account Management --- | |
def get_next_account() -> NotionAccount:
    """Pick the next healthy Notion account in round-robin order.

    Raises:
        HTTPException: 503 when no accounts are configured, or when none of
            the configured accounts is currently healthy.
    """
    global CURRENT_ACCOUNT_INDEX
    # Runs on every request: guards against an empty account list in case
    # every account failed during startup.
    if not ACCOUNTS:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No Notion accounts are configured in the server."
        )
    if not any(acc.is_healthy for acc in ACCOUNTS):
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No healthy Notion accounts available to process the request."
        )
    # Walk the rotation, skipping unhealthy entries; at least one healthy
    # account exists at this point, so the walk terminates.
    first_index = CURRENT_ACCOUNT_INDEX
    while True:
        candidate = ACCOUNTS[CURRENT_ACCOUNT_INDEX]
        CURRENT_ACCOUNT_INDEX = (CURRENT_ACCOUNT_INDEX + 1) % len(ACCOUNTS)
        if candidate.is_healthy:
            account_logger.warning(f"Using account: {candidate.user_email}")
            return candidate
        if CURRENT_ACCOUNT_INDEX == first_index:
            # Defensive: should be unreachable given the any() check above.
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Critical error in account selection: No healthy accounts found in rotation."
            )
async def fetch_and_set_notion_ids(account: NotionAccount):
    """Fetches space ID and user ID for a given Notion account and marks it as healthy on success.

    Launches a headless Chromium session carrying the account's cookies,
    calls Notion's `getSpaces` API from inside the page (same-origin fetch)
    to discover the user ID and space ID, then calls
    `getUserAnalyticsSettings` to obtain the account email. Only when all
    three values are retrieved is `account.is_healthy` set True; every
    failure path leaves it False.

    NOTE(review): every failure path logs the full cookie value. The cookie
    is a credential — consider redacting it in logs.
    """
    async with account.lock:  # Ensure only one fetch operation per account at a time
        if account.is_healthy:  # Don't re-fetch if already healthy
            return
        if not account.cookie:
            account_logger.error("Account validation failed: Cookie not set")
            account_logger.error(f"Failing cookie (empty): {account.cookie}")
            account.is_healthy = False
            return
        get_spaces_url = "https://www.notion.so/api/v3/getSpaces"
        # Headers for the JS fetch call (cookie is handled by context)
        js_fetch_headers = {
            'Content-Type': 'application/json',
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
            'notion-audit-log-platform': 'web',
            'notion-client-version': '23.13.0.3686',
            'origin': 'https://www.notion.so',
            'priority': 'u=1, i',
            'referer': 'https://www.notion.so/',
            'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36'
        }
        browser = None
        context = None
        page = None
        try:
            async with async_playwright() as p:
                # Configure browser launch with proxy if PROXY_URL is set
                launch_args = {
                    'headless': True,
                    'args': ['--no-sandbox', '--disable-setuid-sandbox']
                }
                if PROXY_URL:
                    launch_args['proxy'] = {'server': PROXY_URL}
                browser = await p.chromium.launch(**launch_args)
                context = await browser.new_context(user_agent=js_fetch_headers['user-agent'])
                # Split the raw Cookie header into name/value pairs and
                # register each one on the browser context.
                cookies_to_add = []
                cookie_pairs = account.cookie.split('; ')
                for pair in cookie_pairs:
                    if '=' in pair:
                        name, value = pair.split('=', 1)
                        cookies_to_add.append({
                            'name': name.strip(), 'value': value.strip(),
                            'domain': '.notion.so', 'path': '/',
                            'secure': True, 'httpOnly': True, 'sameSite': 'Lax'
                        })
                if cookies_to_add:
                    await context.add_cookies(cookies_to_add)
                else:
                    account_logger.error("Account validation failed: No valid cookies parsed")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return
                page = await context.new_page()
                # Navigate to notion.so so the in-page fetch below runs same-origin.
                await page.goto("https://www.notion.so/", wait_until="domcontentloaded", timeout=15000)
                # JavaScript to perform the fetch for getSpaces
                javascript_code_get_spaces = """
                async (args) => {
                    const { apiUrl, headers, body } = args;
                    try {
                        const response = await fetch(apiUrl, {
                            method: 'POST',
                            headers: headers,
                            body: JSON.stringify(body) // Ensure body is stringified
                        });
                        if (!response.ok) {
                            console.error('getSpaces Fetch error:', response.status, await response.text());
                            return { success: false, error: `HTTP ${response.status}` };
                        }
                        const data = await response.json();
                        return { success: true, data: data };
                    } catch (error) {
                        console.error('getSpaces JS Exception:', error);
                        return { success: false, error: error.toString() };
                    }
                }
                """
                js_args = {"apiUrl": get_spaces_url, "headers": js_fetch_headers, "body": {}}  # Empty JSON body for getSpaces
                result = await page.evaluate(javascript_code_get_spaces, js_args)
                if not result or not result.get('success'):
                    # NOTE(review): if result is None this .get() raises
                    # AttributeError (then caught by the generic handler below);
                    # compare with the guarded pattern used for analytics_result.
                    error_detail = result.get('error', 'Unknown error during getSpaces JS execution')
                    account_logger.error(f"Account validation failed: {error_detail}")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return
                data = result.get('data')
                if not data:
                    account_logger.error("Account validation failed: No data returned")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return
                # Extract user ID: the getSpaces response is keyed by user ID,
                # and the first key is taken as this account's user.
                user_id_key = next(iter(data), None)
                if not user_id_key:
                    account_logger.error("Account validation failed: No user ID found")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return
                account.user_id = user_id_key
                # Extract space ID from the nested user_root record; the first
                # space view pointer's spaceId is used.
                user_root = data.get(user_id_key, {}).get("user_root", {}).get(user_id_key, {})
                space_view_pointers = user_root.get("value", {}).get("value", {}).get("space_view_pointers", [])
                if space_view_pointers and isinstance(space_view_pointers, list) and len(space_view_pointers) > 0:
                    account.space_id = space_view_pointers[0].get("spaceId")
                    # If spaceId is found, proceed to get user email
                    if account.space_id:
                        get_user_analytics_url = "https://www.notion.so/api/v3/getUserAnalyticsSettings"
                        analytics_headers = {
                            'Content-Type': 'application/json',
                            'accept': '*/*',
                            'notion-audit-log-platform': 'web',
                            'notion-client-version': '23.13.0.3833',  # From user's example
                            'x-notion-active-user-header': account.user_id,
                            'x-notion-space-id': account.space_id,
                            'user-agent': js_fetch_headers['user-agent']
                        }
                        javascript_code_get_analytics = """
                        async (args) => {
                            const { apiUrl, headers, body } = args;
                            try {
                                const response = await fetch(apiUrl, {
                                    method: 'POST',
                                    headers: headers,
                                    body: JSON.stringify(body)
                                });
                                if (!response.ok) {
                                    console.error('getUserAnalyticsSettings Fetch error:', response.status, await response.text());
                                    return { success: false, error: `HTTP ${response.status}: ${await response.text()}` };
                                }
                                const data = await response.json();
                                return { success: true, data: data };
                            } catch (error) {
                                console.error('getUserAnalyticsSettings JS Exception:', error);
                                return { success: false, error: error.toString() };
                            }
                        }
                        """
                        analytics_args = {"apiUrl": get_user_analytics_url, "headers": analytics_headers, "body": {}}
                        analytics_result = await page.evaluate(javascript_code_get_analytics, analytics_args)
                        if analytics_result and analytics_result.get('success'):
                            analytics_data = analytics_result.get('data')
                            account.user_email = analytics_data.get('user_email')
                            if account.user_email:
                                account_logger.warning(f"Account validated: {account.user_email}")
                                account.is_healthy = True  # Mark as healthy only on complete success
                            else:
                                account_logger.error(f"Account validation failed: No email found for User ID: {account.user_id}")
                                account_logger.error(f"Failing cookie: {account.cookie}")
                                account.is_healthy = False
                        else:
                            error_detail = analytics_result.get('error', 'Unknown error') if analytics_result else 'No result from JS'
                            account_logger.error(f"Account validation failed: getUserAnalyticsSettings error: {error_detail}")
                            account_logger.error(f"Failing cookie: {account.cookie}")
                            account.is_healthy = False
                    else:
                        account_logger.error(f"Account validation failed: No spaceId for User ID: {account.user_id}")
                        account_logger.error(f"Failing cookie: {account.cookie}")
                        account.is_healthy = False
                else:
                    account_logger.error(f"Account validation failed: No space_view_pointers for User ID: {account.user_id}")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
        except PlaywrightError as e:
            account_logger.error(f"Account validation failed: Playwright error: {e}")
            account_logger.error(f"Failing cookie: {account.cookie}")
            account.is_healthy = False
        except Exception as e:
            account_logger.error(f"Account validation failed: General error: {e}")
            account_logger.error(f"Failing cookie: {account.cookie}")
            account.is_healthy = False
        finally:
            # Always tear the browser down so failed validations don't leak
            # Chromium processes.
            if browser and browser.is_connected():
                try:
                    await browser.close()
                except:
                    pass  # Suppress any errors during browser close
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: validate all configured Notion accounts at startup.

    Parses NOTION_COOKIES ('|'-separated cookie strings) into NotionAccount
    objects, then fetches IDs for all of them concurrently; only accounts
    that validate become eligible for request routing. The shutdown side of
    the context (after ``yield``) just logs.

    Note: the ``@asynccontextmanager`` decorator is required — FastAPI's
    ``lifespan=`` parameter expects an async context manager factory, and
    plain async-generator lifespans are deprecated in Starlette.
    """
    # --- Startup ---
    account_logger.warning("Starting application: Initializing Notion accounts...")
    if NOTION_COOKIES_RAW:
        # Split on '|' — cookie values themselves may contain ';' and ','.
        cookie_list = [c.strip() for c in NOTION_COOKIES_RAW.split('|') if c.strip()]
        for cookie in cookie_list:
            ACCOUNTS.append(NotionAccount(cookie=cookie))
        account_logger.warning(f"Loaded {len(ACCOUNTS)} Notion account(s) from environment variable.")
    if not ACCOUNTS:
        account_logger.error("CRITICAL: No Notion accounts loaded. The application will not be able to process requests.")
    else:
        # Concurrently fetch IDs for all loaded Notion accounts.
        account_logger.warning("Fetching IDs for all Notion accounts...")
        fetch_tasks = [fetch_and_set_notion_ids(acc) for acc in ACCOUNTS]
        await asyncio.gather(*fetch_tasks)
        healthy_count = sum(1 for acc in ACCOUNTS if acc.is_healthy)
        account_logger.warning(f"Initialization complete. {healthy_count} of {len(ACCOUNTS)} accounts are healthy.")
        if healthy_count == 0:
            account_logger.error("CRITICAL: No healthy Notion accounts available after initialization.")
    yield
    # --- Shutdown ---
    account_logger.warning("Application shutdown.")
app = FastAPI(lifespan=lifespan)
| # --- Helper Functions --- | |
def _format_notion_timestamp(dt: datetime) -> str:
    """Format *dt* exactly as Notion expects: YYYY-MM-DDTHH:MM:SS.fff-HH:MM."""
    base = dt.strftime("%Y-%m-%dT%H:%M:%S")
    millis = f"{dt.microsecond // 1000:03d}"  # always exactly 3 digits
    tz_raw = dt.strftime("%z")  # +HHMM or -HHMM
    # Insert the colon into the UTC offset: +HHMM -> +HH:MM
    return f"{base}.{millis}{tz_raw[:-2]}:{tz_raw[-2:]}"
def build_notion_request(request_data: ChatCompletionRequest, account: NotionAccount) -> NotionRequestBody:
    """Transforms OpenAI-style messages to Notion transcript format, using the provided account.

    Builds the transcript (config + context + agent-integration items, then
    one item per chat message), attaching the account's user/space IDs and
    plausible per-message timestamps. Always creates a new thread.
    """
    # Use the user ID from the selected account.
    user_id = account.user_id
    # Pacific Time is used throughout, mirroring what Notion's web client sends.
    pacific_tz = ZoneInfo("America/Los_Angeles")
    now_pacific = datetime.now(timezone.utc).astimezone(pacific_tz)
    # --- Timestamp Logic ---
    # Non-assistant messages get timestamps: the last one is "now", earlier
    # ones are pushed back by random 3-20 minute intervals, walking backwards.
    # NOTE(review): assumes ChatMessage exposes a stable .id — see models.py.
    non_assistant_messages = [msg for msg in request_data.messages if msg.role != "assistant"]
    message_timestamps = {}  # Keyed by message id
    if non_assistant_messages:
        message_timestamps[non_assistant_messages[-1].id] = now_pacific
        current_timestamp = now_pacific
        for msg in reversed(non_assistant_messages[:-1]):
            current_timestamp -= timedelta(minutes=random.randint(3, 20))
            message_timestamps[msg.id] = current_timestamp
    # --- Build Transcript ---
    current_datetime_iso = _format_notion_timestamp(now_pacific)
    # Randomized user/space names so requests don't share a fingerprint.
    random_words = ["Project", "Workspace", "Team", "Studio", "Lab", "Hub", "Zone", "Space"]
    user_name = f"User{random.randint(100, 999)}"
    space_name = f"{random.choice(random_words)} {random.randint(1, 99)}"
    transcript = [
        NotionTranscriptItem(
            type="config",
            value=NotionTranscriptConfigValue(model=request_data.notion_model)
        ),
        NotionTranscriptItem(
            type="context",
            value=NotionTranscriptContextValue(
                userId=user_id or "",  # From the selected account
                spaceId=account.space_id,  # From the selected account
                surface="home_module",
                timezone="America/Los_Angeles",
                userName=user_name,
                spaceName=space_name,
                spaceViewId=str(uuid.uuid4()),  # Random UUID for spaceViewId
                currentDatetime=current_datetime_iso
            )
        ),
        NotionTranscriptItem(
            type="agent-integration"
            # No value field needed for agent-integration
        )
    ]
    for message in request_data.messages:
        if message.role == "assistant":
            # Assistant messages get type="markdown-chat" and a unique traceId.
            transcript.append(NotionTranscriptItem(
                type="markdown-chat",
                value=message.content,
                traceId=str(uuid.uuid4())
            ))
        else:  # Treat all other roles (user, system, etc.) as "user" type
            created_at_dt = message_timestamps.get(message.id)
            created_at_iso = _format_notion_timestamp(created_at_dt) if created_at_dt else None
            content = message.content
            # Ensure content is a plain string for user/system messages.
            if isinstance(content, list):
                # Multimodal-style content: concatenate the text parts.
                content = "".join(
                    part.get("text") for part in content
                    if isinstance(part, dict)
                    and part.get("type") == "text"
                    and isinstance(part.get("text"), str)
                )
            elif not isinstance(content, str):
                content = ""  # Default to empty string if not list or string
            # Notion expects user values wrapped as [[content_string]].
            notion_value = [[content]] if content else [[""]]
            transcript.append(NotionTranscriptItem(
                type="user",
                value=notion_value,
                userId=user_id,
                createdAt=created_at_iso
                # No traceId for user/system messages
            ))
    # Use spaceId from the selected account, always create a new thread.
    return NotionRequestBody(
        spaceId=account.space_id,
        transcript=transcript,
        createThread=True,
        # Generate a new traceId for each request
        traceId=str(uuid.uuid4()),
        # Explicitly set debugOverrides, generateTitle, and saveAllThreadOperations
        debugOverrides=NotionDebugOverrides(
            cachedInferences={},
            annotationInferences={},
            emitInferences=False
        ),
        generateTitle=False,
        saveAllThreadOperations=False
    )
| # --- Background Playwright Task --- | |
async def _run_playwright_fetch(
    chunk_queue: asyncio.Queue,
    notion_request_body: NotionRequestBody,
    headers_template: dict,
    notion_api_url: str,
    account: NotionAccount  # Pass the whole account object
):
    """Stream the Notion inference response into ``chunk_queue``.

    Launches a headless Chromium page authenticated with the account's
    cookies, performs the runInferenceTranscript POST from inside the page,
    and forwards every decoded response chunk to ``chunk_queue``. A ``None``
    sentinel is always queued when the stream ends — on success or failure —
    so the consumer in stream_notion_response can stop waiting.

    Raises:
        ValueError: when the account has no usable cookie.
        PlaywrightError: when the in-page fetch reports a failure.
        (Any exception is re-raised so the caller can observe it via
        ``playwright_task.exception()``.)
    """
    browser = None
    context = None
    page = None
    # Per-request headers: copy the template, add account-specific IDs.
    # Both IDs are guarded: a healthy account has them set, but a missing
    # value must not become a literal null header in the JS fetch.
    current_headers = headers_template.copy()
    if account.space_id:
        current_headers['x-notion-space-id'] = account.space_id
    if account.user_id:
        current_headers['x-notion-active-user-header'] = account.user_id
    async def handle_chunk(chunk_str: str):
        # Called from page JS for every decoded response chunk.
        await chunk_queue.put(chunk_str)
    async def handle_stream_end():
        # Called from page JS at end-of-stream; None is the sentinel.
        await chunk_queue.put(None)
    try:
        async with async_playwright() as p:
            # Configure browser launch with proxy if PROXY_URL is set
            launch_args = {
                'headless': True,
                'args': ['--no-sandbox', '--disable-setuid-sandbox']
            }
            if PROXY_URL:
                launch_args['proxy'] = {'server': PROXY_URL}
            browser = await p.chromium.launch(**launch_args)
            context = await browser.new_context(user_agent=current_headers.get('user-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36'))
            if account.cookie:
                # Register each cookie pair on the browser context.
                cookies_to_add = []
                cookie_pairs = account.cookie.split('; ')
                for pair in cookie_pairs:
                    if '=' in pair:
                        name, value = pair.split('=', 1)
                        cookies_to_add.append({
                            'name': name.strip(), 'value': value.strip(),
                            'domain': '.notion.so', 'path': '/',
                            'secure': True, 'httpOnly': True, 'sameSite': 'Lax'
                        })
                if cookies_to_add:
                    await context.add_cookies(cookies_to_add)
                else:
                    raise ValueError("Server configuration error: No valid cookies for request")
            else:
                raise ValueError("Server configuration error: Notion cookie not set for request")
            page = await context.new_page()
            await page.goto("https://www.notion.so/chat", wait_until="domcontentloaded")
            # Bridge functions the in-page JS calls to push data back to Python.
            await page.expose_function("sendChunkToPython", handle_chunk)
            await page.expose_function("signalStreamEnd", handle_stream_end)
            request_body_json_str = notion_request_body.json()
            # Prepare headers for JS fetch (cookie is handled by context)
            js_fetch_headers = current_headers.copy()
            if 'cookie' in js_fetch_headers:
                del js_fetch_headers['cookie']
            javascript_code = """
            async (args) => {
                const { apiUrl, headers, body } = args;
                try {
                    const response = await fetch(apiUrl, {
                        method: 'POST',
                        headers: headers,
                        body: body
                    });
                    if (!response.ok) {
                        const errorText = await response.text();
                        console.error('JS Fetch error:', response.status, errorText);
                        await window.signalStreamEnd();
                        return { success: false, status: response.status, error: errorText };
                    }
                    if (!response.body) {
                        console.error('JS Response body is null');
                        await window.signalStreamEnd();
                        return { success: false, error: 'Response body is null' };
                    }
                    const reader = response.body.getReader();
                    const decoder = new TextDecoder();
                    while (true) {
                        const { done, value } = await reader.read();
                        if (done) break;
                        await window.sendChunkToPython(decoder.decode(value, { stream: true }));
                    }
                    await window.signalStreamEnd();
                    return { success: true };
                } catch (error) {
                    console.error('JS Exception during fetch:', error);
                    await window.signalStreamEnd();
                    return { success: false, error: error.toString() };
                }
            }
            """
            js_args = {"apiUrl": notion_api_url, "headers": js_fetch_headers, "body": request_body_json_str}
            js_result = await page.evaluate(javascript_code, js_args)
            if not js_result or not js_result.get('success'):
                # Guard against js_result being None before calling .get() on it.
                error_detail = (
                    js_result.get('error', 'Unknown JS execution error')
                    if js_result else 'No result from JS evaluation'
                )
                # The JS already signaled stream end; re-raise so the failure
                # is visible on the task object.
                raise PlaywrightError(f"JS Fetch Error: {error_detail}")
    except Exception as e:
        account_logger.error(f"Playwright fetch task failed: {e}")
        await chunk_queue.put(None)  # Ensure the consumer is unblocked on error
        # Re-raise so stream_notion_response can observe the failure via
        # playwright_task.exception(); previously the error was swallowed here.
        raise
    finally:
        if browser and browser.is_connected():
            try:
                await browser.close()
            except Exception:
                pass  # Suppress any errors during browser close
| # --- Main Generator Called by Endpoint --- | |
async def stream_notion_response(notion_request_body: NotionRequestBody, account: NotionAccount):
    """Creates background task for Playwright and yields results from queue.

    Async generator: spawns _run_playwright_fetch as a background task,
    consumes raw ndjson chunks from the shared queue, reassembles complete
    lines, and yields OpenAI-style SSE chunks ("data: {...}\\n\\n") ending
    with a finish chunk and "data: [DONE]".
    """
    chunk_queue = asyncio.Queue()
    playwright_task = None
    # These should be defined once per request stream
    chunk_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(time.time())
    # Define the template for headers here, to be passed to the background task
    headers_template = {
        'accept': 'application/x-ndjson',
        'accept-language': 'en-US,en;q=0.9',
        'content-type': 'application/json',
        'notion-audit-log-platform': 'web',
        'notion-client-version': '23.13.0.3668',
        'origin': 'https://www.notion.so',
        'priority': 'u=1, i',
        'referer': 'https://www.notion.so/chat',
        'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
        # 'cookie' and 'x-notion-space-id' will be handled/added by _run_playwright_fetch
    }
    try:
        # Health check is now performed by get_next_account() before this function is called.
        playwright_task = asyncio.create_task(
            _run_playwright_fetch(
                chunk_queue,
                notion_request_body,
                headers_template,
                NOTION_API_URL,  # Global constant
                account  # Pass the selected account
            )
        )
        # Chunks may split ndjson lines arbitrarily; buffer until '\n'.
        accumulated_line = ""
        while True:
            chunk = await chunk_queue.get()  # Wait for a chunk from the background task
            if chunk is None:  # End-of-stream sentinel from the background task
                break
            accumulated_line += chunk
            while '\n' in accumulated_line:
                line, accumulated_line = accumulated_line.split('\n', 1)
                if not line.strip():
                    continue
                try:
                    data = json.loads(line)
                    # Only "markdown-chat" items carry assistant text deltas.
                    if data.get("type") == "markdown-chat" and isinstance(data.get("value"), str):
                        content_chunk = data["value"]
                        if content_chunk:
                            sse_chunk = ChatCompletionChunk(
                                id=chunk_id, created=created_time,
                                choices=[Choice(delta=ChoiceDelta(content=content_chunk))]
                            )
                            yield f"data: {sse_chunk.json()}\n\n"
                    elif "recordMap" in data:
                        pass  # Ignore recordMap chunks
                except json.JSONDecodeError:
                    pass  # Ignore JSON decode errors in silent mode
                except Exception as e:
                    # Only log critical errors
                    account_logger.error(f"Critical error processing stream: {e}")
        # Process any final accumulated data after None sentinel
        if accumulated_line.strip():
            try:
                data = json.loads(accumulated_line)
                if data.get("type") == "markdown-chat" and isinstance(data.get("value"), str):
                    content_chunk = data["value"]
                    if content_chunk:
                        sse_chunk = ChatCompletionChunk(
                            id=chunk_id, created=created_time,
                            choices=[Choice(delta=ChoiceDelta(content=content_chunk))]
                        )
                        yield f"data: {sse_chunk.json()}\n\n"
            except json.JSONDecodeError:
                pass  # Ignore JSON decode errors in silent mode
            except Exception as e:
                # Only log critical errors
                account_logger.error(f"Critical error processing final data: {e}")
        # After loop, check if the background task raised an exception.
        # NOTE(review): the task may not be .done() yet when the sentinel
        # arrives, so a late failure can be missed here — confirm.
        if playwright_task.done() and playwright_task.exception():
            task_exception = playwright_task.exception()
            account_logger.error(f"Background task error: {task_exception}")
            # NOTE(review): if chunks were already streamed, raising here
            # cannot change the HTTP status the client observed.
            raise HTTPException(status_code=500, detail=f"Error during background processing: {task_exception}")
        else:
            # Normal termination: emit the finish chunk and the [DONE] marker.
            final_chunk = ChatCompletionChunk(
                id=chunk_id, created=created_time,
                choices=[Choice(delta=ChoiceDelta(), finish_reason="stop")]
            )
            yield f"data: {final_chunk.json()}\n\n"
            yield "data: [DONE]\n\n"
    except Exception as e:
        account_logger.error(f"Stream response error: {e}")
        if playwright_task and not playwright_task.done():
            playwright_task.cancel()
        raise
    finally:
        # Always cancel and drain the background task so it never outlives
        # the response generator.
        if playwright_task and not playwright_task.done():
            playwright_task.cancel()
            try:
                await playwright_task  # Allow cancellation to propagate
            except asyncio.CancelledError:
                pass
            except Exception:
                pass  # Suppress errors during cleanup
| # --- API Endpoint --- | |
async def list_models(authenticated: bool = Depends(authenticate)):
    """
    Endpoint to list available Notion models, mimicking OpenAI's /v1/models.
    """
    # Fixed catalogue of Notion-backed models; `created` comes from the
    # Model's default_factory.
    model_ids = (
        "openai-gpt-4.1",
        "anthropic-opus-4",
        "anthropic-sonnet-4",
    )
    return ModelList(
        data=[Model(id=model_id, owned_by="notion") for model_id in model_ids]
    )
async def chat_completions(request_data: ChatCompletionRequest, request: Request, authenticated: bool = Depends(authenticate)):
    """
    Endpoint to mimic OpenAI's chat completions, proxying to Notion.
    It uses round-robin to select a healthy Notion account for each request.

    Streaming requests forward the SSE stream directly; non-streaming
    requests drain the same stream internally and return one aggregated
    "chat.completion" object.
    """
    account = get_next_account()  # Select a healthy account
    notion_request_body = build_notion_request(request_data, account)
    if request_data.stream:
        # Call the Playwright generator, passing the selected account
        return StreamingResponse(
            stream_notion_response(notion_request_body, account),
            media_type="text/event-stream"
        )
    else:
        # --- Non-Streaming Logic ---
        # Drain the SSE generator and accumulate delta contents locally.
        full_response_content = ""
        final_finish_reason = None
        chunk_id = f"chatcmpl-{uuid.uuid4()}"  # Generate ID for the non-streamed response
        created_time = int(time.time())
        try:
            # Call the Playwright generator, passing the selected account
            async for line in stream_notion_response(notion_request_body, account):
                if line.startswith("data: ") and "[DONE]" not in line:
                    try:
                        data_json = line[len("data: "):].strip()
                        if data_json:
                            chunk_data = json.loads(data_json)
                            if chunk_data.get("choices"):
                                delta = chunk_data["choices"][0].get("delta", {})
                                content = delta.get("content")
                                if content:
                                    full_response_content += content
                                finish_reason = chunk_data["choices"][0].get("finish_reason")
                                if finish_reason:
                                    final_finish_reason = finish_reason
                    except json.JSONDecodeError:
                        pass  # Ignore JSON errors in non-streaming mode
                    except Exception as e:
                        account_logger.error(f"Error processing non-streaming response: {e}")
            # Construct the final OpenAI-compatible non-streaming response
            return {
                "id": chunk_id,
                "object": "chat.completion",
                "created": created_time,
                "model": request_data.model,  # Return the model requested by the client
                "choices": [
                    {
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": full_response_content,
                        },
                        "finish_reason": final_finish_reason or "stop",  # Default to stop if not explicitly set
                    }
                ],
                "usage": {  # Note: Token usage is not available from Notion
                    "prompt_tokens": None,
                    "completion_tokens": None,
                    "total_tokens": None,
                },
            }
        except HTTPException as e:
            # Re-raise HTTP exceptions from the streaming function
            raise e
        except Exception as e:
            account_logger.error(f"Error during non-streaming processing: {e}")
            raise HTTPException(status_code=500, detail="Internal server error processing Notion response")
| # --- Uvicorn Runner --- | |
# Development entry point; in production run e.g. `uvicorn main:app` instead.
if __name__ == "__main__":
    import uvicorn
    print("Starting server. Access at http://127.0.0.1:7860")
    print("Ensure NOTION_COOKIES is set in your .env file or environment, separated by '|'.")
    uvicorn.run(app, host="127.0.0.1", port=7860)