import asyncio
import sys  # For platform check
import logging
import os
import uuid
import json
import time
import random
import httpx  # For getSpaces API call
from contextlib import asynccontextmanager  # For lifespan
from playwright.async_api import async_playwright, Error as PlaywrightError
from fastapi import FastAPI, Request, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
from dotenv import load_dotenv
import secrets  # Added for secure comparison
from datetime import datetime, timedelta, timezone  # Explicit datetime imports
from zoneinfo import ZoneInfo  # For timezone handling
from typing import List, Optional  # Add List and Optional for typing

from models import (
    ChatMessage,
    ChatCompletionRequest,
    NotionTranscriptConfigValue,
    NotionTranscriptContextValue,
    NotionTranscriptItem,
    NotionDebugOverrides,
    NotionRequestBody,
    ChoiceDelta,
    Choice,
    ChatCompletionChunk,
    Model,
    ModelList,
)

# Load environment variables from .env file
load_dotenv()

# --- Event Loop Policy for Windows ---
# For Playwright compatibility, especially with subprocesses on Windows.
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    logging.info("Set WindowsProactorEventLoopPolicy for asyncio.")

# --- Logging Configuration ---
# Root logger is kept at WARNING to reduce verbosity; only important
# messages and errors will be displayed.
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',  # Standardize date format
)

# Dedicated logger for critical account operations that we always want to see.
account_logger = logging.getLogger("notion_account")
account_logger.setLevel(logging.INFO)


# --- Account Management Class ---
class NotionAccount:
    """Represents a single Notion account with its cookie and fetched IDs.

    Instances start unhealthy; `fetch_and_set_notion_ids` validates the
    cookie against Notion and populates `space_id`, `user_id` and
    `user_email` before marking the account healthy.
    """

    def __init__(self, cookie: str):
        self.cookie = cookie
        self.space_id: Optional[str] = None
        self.user_id: Optional[str] = None
        self.user_email: Optional[str] = None  # Added for identification
        self.is_healthy: bool = False
        # Prevents concurrent validation fetches for the same account.
        self.lock = asyncio.Lock()

    def __str__(self):
        return f"Account(user_email={self.user_email}, user_id={self.user_id}, healthy={self.is_healthy})"


# --- Configuration ---
NOTION_API_URL = "https://www.notion.so/api/v3/runInferenceTranscript"

# IMPORTANT: Load multiple Notion cookies securely from environment variables.
# Use a unique separator like '|' because cookies can contain ';' and ','.
NOTION_COOKIES_RAW = os.getenv("NOTION_COOKIES")

# --- Global State for Account Polling ---
ACCOUNTS: List[NotionAccount] = []
CURRENT_ACCOUNT_INDEX = 0  # Index for round-robin

if not NOTION_COOKIES_RAW:
    # This is a critical error, app cannot function without it.
    logging.error("CRITICAL: NOTION_COOKIES environment variable not set. Application will not work.")
    # In a real app, you might exit or raise a more severe startup error.
# --- Proxy Configuration ---
PROXY_URL = os.getenv("PROXY_URL", "")  # Empty string as default

# --- Authentication ---
EXPECTED_TOKEN = os.getenv("PROXY_AUTH_TOKEN", "default_token")  # Default token
security = HTTPBearer()


def authenticate(credentials: HTTPAuthorizationCredentials = Depends(security)) -> bool:
    """Compares the provided Bearer token with the expected token.

    Uses secrets.compare_digest to avoid timing attacks. Raises 401 on
    mismatch; returns True on success so it can be used as a dependency.
    """
    correct_token = secrets.compare_digest(credentials.credentials, EXPECTED_TOKEN)
    if not correct_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            # WWW-Authenticate header removed for Bearer
        )
    return True  # Indicate successful authentication


# --- Notion Account Management ---
def get_next_account() -> NotionAccount:
    """Selects the next healthy Notion account using a round-robin strategy.

    Raises HTTP 503 when no accounts are configured or none are healthy.
    """
    global CURRENT_ACCOUNT_INDEX
    # This check runs on every request to ensure we don't try to select from
    # an empty list in case all accounts failed during startup.
    if not ACCOUNTS:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No Notion accounts are configured in the server."
        )

    healthy_accounts = [acc for acc in ACCOUNTS if acc.is_healthy]
    if not healthy_accounts:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No healthy Notion accounts available to process the request."
        )

    # Round-robin logic: advance the shared index until a healthy account
    # is found. The healthy_accounts check above guarantees termination.
    start_index = CURRENT_ACCOUNT_INDEX
    while True:
        account = ACCOUNTS[CURRENT_ACCOUNT_INDEX]
        CURRENT_ACCOUNT_INDEX = (CURRENT_ACCOUNT_INDEX + 1) % len(ACCOUNTS)
        if account.is_healthy:
            account_logger.warning(f"Using account: {account.user_email}")
            return account
        if CURRENT_ACCOUNT_INDEX == start_index:
            # Defensive: should be unreachable given the check above.
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Critical error in account selection: No healthy accounts found in rotation."
            )


def _format_notion_timestamp(dt: datetime) -> str:
    """Format an aware datetime exactly as YYYY-MM-DDTHH:MM:SS.fff-HH:MM.

    Notion expects milliseconds (3 digits) and a colon inside the UTC offset,
    which strftime cannot produce directly.
    """
    dt_str = dt.strftime("%Y-%m-%dT%H:%M:%S")
    ms = f"{dt.microsecond // 1000:03d}"  # Ensure 3 digits for milliseconds
    tz_str = dt.strftime("%z")  # Gets +HHMM or -HHMM
    formatted_tz = f"{tz_str[:-2]}:{tz_str[-2:]}"  # Insert colon
    return f"{dt_str}.{ms}{formatted_tz}"


async def fetch_and_set_notion_ids(account: NotionAccount):
    """Fetches space ID, user ID and email for a Notion account via Playwright.

    Launches a headless Chromium, installs the account's cookies, and calls
    Notion's getSpaces and getUserAnalyticsSettings APIs from page context.
    Marks the account healthy only when all three identifiers are resolved;
    any failure leaves the account unhealthy and logs the failing cookie.
    """
    async with account.lock:  # Ensure only one fetch operation per account at a time
        if account.is_healthy:  # Don't re-fetch if already healthy
            return
        if not account.cookie:
            account_logger.error("Account validation failed: Cookie not set")
            account_logger.error(f"Failing cookie (empty): {account.cookie}")
            account.is_healthy = False
            return

        get_spaces_url = "https://www.notion.so/api/v3/getSpaces"
        # Headers for the JS fetch call (cookie is handled by the browser context).
        js_fetch_headers = {
            'Content-Type': 'application/json',
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
            'notion-audit-log-platform': 'web',
            'notion-client-version': '23.13.0.3686',
            'origin': 'https://www.notion.so',
            'priority': 'u=1, i',
            'referer': 'https://www.notion.so/',
            'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36'
        }

        browser = None
        context = None
        page = None
        try:
            async with async_playwright() as p:
                # Configure browser launch with proxy if PROXY_URL is set.
                launch_args = {
                    'headless': True,
                    'args': ['--no-sandbox', '--disable-setuid-sandbox']
                }
                if PROXY_URL:
                    launch_args['proxy'] = {'server': PROXY_URL}
                browser = await p.chromium.launch(**launch_args)
                context = await browser.new_context(user_agent=js_fetch_headers['user-agent'])

                # Add cookies from the account's cookie string ("name=value; ..." format).
                cookies_to_add = []
                cookie_pairs = account.cookie.split('; ')
                for pair in cookie_pairs:
                    if '=' in pair:
                        name, value = pair.split('=', 1)
                        cookies_to_add.append({
                            'name': name.strip(),
                            'value': value.strip(),
                            'domain': '.notion.so',
                            'path': '/',
                            'secure': True,
                            'httpOnly': True,
                            'sameSite': 'Lax'
                        })
                if cookies_to_add:
                    await context.add_cookies(cookies_to_add)
                else:
                    account_logger.error("Account validation failed: No valid cookies parsed")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return

                page = await context.new_page()
                await page.goto("https://www.notion.so/", wait_until="domcontentloaded", timeout=15000)

                # JavaScript to perform the fetch for getSpaces from page context
                # (runs same-origin, so Notion's cookies are attached automatically).
                javascript_code_get_spaces = """
                    async (args) => {
                        const { apiUrl, headers, body } = args;
                        try {
                            const response = await fetch(apiUrl, {
                                method: 'POST',
                                headers: headers,
                                body: JSON.stringify(body) // Ensure body is stringified
                            });
                            if (!response.ok) {
                                console.error('getSpaces Fetch error:', response.status, await response.text());
                                return { success: false, error: `HTTP ${response.status}` };
                            }
                            const data = await response.json();
                            return { success: true, data: data };
                        } catch (error) {
                            console.error('getSpaces JS Exception:', error);
                            return { success: false, error: error.toString() };
                        }
                    }
                """
                js_args = {"apiUrl": get_spaces_url, "headers": js_fetch_headers, "body": {}}  # Empty JSON body for getSpaces
                result = await page.evaluate(javascript_code_get_spaces, js_args)

                if not result or not result.get('success'):
                    # Guard against result being None (page.evaluate returning null).
                    error_detail = (result or {}).get('error', 'Unknown error during getSpaces JS execution')
                    account_logger.error(f"Account validation failed: {error_detail}")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return

                data = result.get('data')
                if not data:
                    account_logger.error("Account validation failed: No data returned")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return

                # Extract user ID (top-level key of the getSpaces payload).
                user_id_key = next(iter(data), None)
                if not user_id_key:
                    account_logger.error("Account validation failed: No user ID found")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return
                account.user_id = user_id_key

                # Extract space ID from the nested user_root record.
                user_root = data.get(user_id_key, {}).get("user_root", {}).get(user_id_key, {})
                space_view_pointers = user_root.get("value", {}).get("value", {}).get("space_view_pointers", [])

                if space_view_pointers and isinstance(space_view_pointers, list) and len(space_view_pointers) > 0:
                    account.space_id = space_view_pointers[0].get("spaceId")
                    # If spaceId is found, proceed to get user email.
                    if account.space_id:
                        get_user_analytics_url = "https://www.notion.so/api/v3/getUserAnalyticsSettings"
                        analytics_headers = {
                            'Content-Type': 'application/json',
                            'accept': '*/*',
                            'notion-audit-log-platform': 'web',
                            'notion-client-version': '23.13.0.3833',  # From user's example
                            'x-notion-active-user-header': account.user_id,
                            'x-notion-space-id': account.space_id,
                            'user-agent': js_fetch_headers['user-agent']
                        }
                        # NOTE: the error body is read once into a const; reading
                        # response.text() twice throws "body stream already read".
                        javascript_code_get_analytics = """
                            async (args) => {
                                const { apiUrl, headers, body } = args;
                                try {
                                    const response = await fetch(apiUrl, {
                                        method: 'POST',
                                        headers: headers,
                                        body: JSON.stringify(body)
                                    });
                                    if (!response.ok) {
                                        const errText = await response.text();
                                        console.error('getUserAnalyticsSettings Fetch error:', response.status, errText);
                                        return { success: false, error: `HTTP ${response.status}: ${errText}` };
                                    }
                                    const data = await response.json();
                                    return { success: true, data: data };
                                } catch (error) {
                                    console.error('getUserAnalyticsSettings JS Exception:', error);
                                    return { success: false, error: error.toString() };
                                }
                            }
                        """
                        analytics_args = {"apiUrl": get_user_analytics_url, "headers": analytics_headers, "body": {}}
                        analytics_result = await page.evaluate(javascript_code_get_analytics, analytics_args)

                        if analytics_result and analytics_result.get('success'):
                            analytics_data = analytics_result.get('data')
                            # Guard against a success response with a null payload.
                            account.user_email = (analytics_data or {}).get('user_email')
                            if account.user_email:
                                account_logger.warning(f"Account validated: {account.user_email}")
                                account.is_healthy = True  # Mark as healthy only on complete success
                            else:
                                account_logger.error(f"Account validation failed: No email found for User ID: {account.user_id}")
                                account_logger.error(f"Failing cookie: {account.cookie}")
                                account.is_healthy = False
                        else:
                            error_detail = analytics_result.get('error', 'Unknown error') if analytics_result else 'No result from JS'
                            account_logger.error(f"Account validation failed: getUserAnalyticsSettings error: {error_detail}")
                            account_logger.error(f"Failing cookie: {account.cookie}")
                            account.is_healthy = False
                    else:
                        account_logger.error(f"Account validation failed: No spaceId for User ID: {account.user_id}")
                        account_logger.error(f"Failing cookie: {account.cookie}")
                        account.is_healthy = False
                else:
                    account_logger.error(f"Account validation failed: No space_view_pointers for User ID: {account.user_id}")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
        except PlaywrightError as e:
            account_logger.error(f"Account validation failed: Playwright error: {e}")
            account_logger.error(f"Failing cookie: {account.cookie}")
            account.is_healthy = False
        except Exception as e:
            account_logger.error(f"Account validation failed: General error: {e}")
            account_logger.error(f"Failing cookie: {account.cookie}")
            account.is_healthy = False
        finally:
            if browser and browser.is_connected():
                try:
                    await browser.close()
                except Exception:
                    pass  # Suppress any errors during browser close


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: load cookies, validate all accounts concurrently on startup."""
    # On startup
    account_logger.warning("Starting application: Initializing Notion accounts...")
    if NOTION_COOKIES_RAW:
        # Split cookies by a unique separator, e.g., '|'.
        cookie_list = [c.strip() for c in NOTION_COOKIES_RAW.split('|') if c.strip()]
        for cookie in cookie_list:
            ACCOUNTS.append(NotionAccount(cookie=cookie))
        account_logger.warning(f"Loaded {len(ACCOUNTS)} Notion account(s) from environment variable.")

    if not ACCOUNTS:
        account_logger.error("CRITICAL: No Notion accounts loaded. The application will not be able to process requests.")
    else:
        # Concurrently fetch IDs for all loaded Notion accounts.
        account_logger.warning("Fetching IDs for all Notion accounts...")
        fetch_tasks = [fetch_and_set_notion_ids(acc) for acc in ACCOUNTS]
        await asyncio.gather(*fetch_tasks)
        healthy_count = sum(1 for acc in ACCOUNTS if acc.is_healthy)
        account_logger.warning(f"Initialization complete. {healthy_count} of {len(ACCOUNTS)} accounts are healthy.")
        if healthy_count == 0:
            account_logger.error("CRITICAL: No healthy Notion accounts available after initialization.")

    yield

    # On shutdown
    account_logger.warning("Application shutdown.")


app = FastAPI(lifespan=lifespan)


# --- Helper Functions ---
def build_notion_request(request_data: ChatCompletionRequest, account: NotionAccount) -> NotionRequestBody:
    """Transforms OpenAI-style messages to Notion transcript format, using the provided account.

    Assistant messages become "markdown-chat" items with a traceId; all other
    roles become "user" items with synthetic Pacific-time timestamps spaced
    3-20 minutes apart, the last one being "now".
    """
    # --- Timestamp and User ID Logic ---
    # Use the user ID from the selected account.
    user_id = account.user_id

    # Get all non-assistant messages to assign timestamps.
    non_assistant_messages = [msg for msg in request_data.messages if msg.role != "assistant"]
    num_non_assistant_messages = len(non_assistant_messages)
    message_timestamps = {}  # Store timestamps keyed by message id

    if num_non_assistant_messages > 0:
        # Get current time specifically in Pacific Time (America/Los_Angeles).
        pacific_tz = ZoneInfo("America/Los_Angeles")
        now_pacific = datetime.now(timezone.utc).astimezone(pacific_tz)
        # Assign timestamp to the last non-assistant message.
        last_msg_id = non_assistant_messages[-1].id
        message_timestamps[last_msg_id] = now_pacific
        # Calculate timestamps for previous non-assistant messages
        # (random 3-20 minute intervals earlier, iterating backwards).
        current_timestamp = now_pacific
        for i in range(num_non_assistant_messages - 2, -1, -1):
            current_timestamp -= timedelta(minutes=random.randint(3, 20))
            message_timestamps[non_assistant_messages[i].id] = current_timestamp

    # --- Build Transcript ---
    # Current time in Pacific timezone for the context item.
    pacific_tz = ZoneInfo("America/Los_Angeles")
    now_pacific = datetime.now(timezone.utc).astimezone(pacific_tz)
    current_datetime_iso = _format_notion_timestamp(now_pacific)

    # Generate random text for userName and spaceName.
    random_words = ["Project", "Workspace", "Team", "Studio", "Lab", "Hub", "Zone", "Space"]
    user_name = f"User{random.randint(100, 999)}"
    space_name = f"{random.choice(random_words)} {random.randint(1, 99)}"

    transcript = [
        NotionTranscriptItem(
            type="config",
            value=NotionTranscriptConfigValue(model=request_data.notion_model)
        ),
        NotionTranscriptItem(
            type="context",
            value=NotionTranscriptContextValue(
                userId=user_id or "",  # Use the user_id from the selected account
                spaceId=account.space_id,  # Use space_id from the selected account
                surface="home_module",
                timezone="America/Los_Angeles",
                userName=user_name,
                spaceName=space_name,
                spaceViewId=str(uuid.uuid4()),  # Random UUID for spaceViewId
                currentDatetime=current_datetime_iso
            )
        ),
        NotionTranscriptItem(
            type="agent-integration"  # No value field needed for agent-integration
        )
    ]

    for message in request_data.messages:
        if message.role == "assistant":
            # Assistant messages get type="markdown-chat" and a traceId.
            transcript.append(NotionTranscriptItem(
                type="markdown-chat",
                value=message.content,
                traceId=str(uuid.uuid4())  # Generate unique traceId for assistant message
            ))
        else:
            # Treat all other roles (user, system, etc.) as "user" type.
            created_at_dt = message_timestamps.get(message.id)  # Use the unified timestamp dict
            created_at_iso = None
            if created_at_dt:
                created_at_iso = _format_notion_timestamp(created_at_dt)

            content = message.content
            # Ensure content is treated as a string for user/system messages.
            if isinstance(content, list):
                # Attempt to extract text from list format, default to empty string.
                text_content = ""
                for part in content:
                    if isinstance(part, dict) and part.get("type") == "text":
                        text_part = part.get("text")
                        if isinstance(text_part, str):
                            text_content += text_part  # Concatenate text parts if needed
                content = text_content if text_content else ""  # Use extracted text or empty string
            elif not isinstance(content, str):
                content = ""  # Default to empty string if not list or string

            # Format value as expected by Notion for user type: [[content_string]].
            notion_value = [[content]] if content else [[""]]
            transcript.append(NotionTranscriptItem(
                type="user",  # Set type to "user" for non-assistant roles
                value=notion_value,
                userId=user_id,  # Assign userId
                createdAt=created_at_iso  # Assign timestamp; no traceId for user/system messages
            ))

    # Use spaceId from the selected account, set createThread=True.
    return NotionRequestBody(
        spaceId=account.space_id,  # From selected account
        transcript=transcript,
        createThread=True,  # Always create a new thread
        # Generate a new traceId for each request.
        traceId=str(uuid.uuid4()),
        # Explicitly set debugOverrides, generateTitle, and saveAllThreadOperations.
        debugOverrides=NotionDebugOverrides(
            cachedInferences={},
            annotationInferences={},
            emitInferences=False
        ),
        generateTitle=False,
        saveAllThreadOperations=False
    )


# --- Background Playwright Task ---
async def _run_playwright_fetch(
    chunk_queue: asyncio.Queue,
    notion_request_body: NotionRequestBody,
    headers_template: dict,
    notion_api_url: str,
    account: NotionAccount  # Pass the whole account object
):
    """Runs the Notion fetch inside headless Chromium, streaming results into a queue.

    The page-side JS streams response chunks back via exposed functions; a
    None sentinel is always pushed to the queue when the stream ends or an
    error occurs, so the consumer's read loop is guaranteed to terminate.
    Raises on failure so the awaiting/consuming side can observe the error
    via the task object.
    """
    browser = None
    context = None
    page = None

    # Construct headers for this specific task run.
    current_headers = headers_template.copy()
    current_headers['x-notion-space-id'] = account.space_id
    if account.user_id:
        current_headers['x-notion-active-user-header'] = account.user_id

    async def handle_chunk(chunk_str: str):
        # Exposed to the page as window.sendChunkToPython.
        await chunk_queue.put(chunk_str)

    async def handle_stream_end():
        # Exposed to the page as window.signalStreamEnd; None terminates the consumer.
        await chunk_queue.put(None)

    try:
        async with async_playwright() as p:
            # Configure browser launch with proxy if PROXY_URL is set.
            launch_args = {
                'headless': True,
                'args': ['--no-sandbox', '--disable-setuid-sandbox']
            }
            if PROXY_URL:
                launch_args['proxy'] = {'server': PROXY_URL}
            browser = await p.chromium.launch(**launch_args)
            context = await browser.new_context(user_agent=current_headers.get('user-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36'))

            if account.cookie:
                # Parse the "name=value; ..." cookie string into Playwright cookie dicts.
                cookies_to_add = []
                cookie_pairs = account.cookie.split('; ')
                for pair in cookie_pairs:
                    if '=' in pair:
                        name, value = pair.split('=', 1)
                        cookies_to_add.append({
                            'name': name.strip(),
                            'value': value.strip(),
                            'domain': '.notion.so',
                            'path': '/',
                            'secure': True,
                            'httpOnly': True,
                            'sameSite': 'Lax'
                        })
                if cookies_to_add:
                    await context.add_cookies(cookies_to_add)
                else:
                    raise ValueError("Server configuration error: No valid cookies for request")
            else:
                raise ValueError("Server configuration error: Notion cookie not set for request")

            page = await context.new_page()
            await page.goto("https://www.notion.so/chat", wait_until="domcontentloaded")
            await page.expose_function("sendChunkToPython", handle_chunk)
            await page.expose_function("signalStreamEnd", handle_stream_end)

            request_body_json_str = notion_request_body.json()
            # Prepare headers for JS fetch (cookie is handled by the browser context).
            js_fetch_headers = current_headers.copy()
            if 'cookie' in js_fetch_headers:
                del js_fetch_headers['cookie']

            # Stream the response body from page context back to Python chunk by chunk.
            javascript_code = """
                async (args) => {
                    const { apiUrl, headers, body } = args;
                    try {
                        const response = await fetch(apiUrl, {
                            method: 'POST',
                            headers: headers,
                            body: body
                        });
                        if (!response.ok) {
                            const errorText = await response.text();
                            console.error('JS Fetch error:', response.status, errorText);
                            await window.signalStreamEnd();
                            return { success: false, status: response.status, error: errorText };
                        }
                        if (!response.body) {
                            console.error('JS Response body is null');
                            await window.signalStreamEnd();
                            return { success: false, error: 'Response body is null' };
                        }
                        const reader = response.body.getReader();
                        const decoder = new TextDecoder();
                        while (true) {
                            const { done, value } = await reader.read();
                            if (done) break;
                            await window.sendChunkToPython(decoder.decode(value, { stream: true }));
                        }
                        await window.signalStreamEnd();
                        return { success: true };
                    } catch (error) {
                        console.error('JS Exception during fetch:', error);
                        await window.signalStreamEnd();
                        return { success: false, error: error.toString() };
                    }
                }
            """
            js_args = {"apiUrl": notion_api_url, "headers": js_fetch_headers, "body": request_body_json_str}
            js_result = await page.evaluate(javascript_code, js_args)

            if not js_result or not js_result.get('success'):
                error_detail = (js_result or {}).get('error', 'Unknown JS execution error')
                # Stream end already signaled to the queue by the JS calling
                # signalStreamEnd; re-raise so the task carries the exception.
                raise PlaywrightError(f"JS Fetch Error: {error_detail}")
    except Exception:
        await chunk_queue.put(None)  # Ensure queue is terminated on error
        # Re-raise so the exception is observable on the task object in the
        # main generator (the original swallowed it, making failures silent).
        raise
    finally:
        if browser and browser.is_connected():
            try:
                await browser.close()
            except Exception:
                pass  # Suppress any errors during browser close


# --- Main Generator Called by Endpoint ---
def _line_to_sse_chunk(line: str, chunk_id: str, created_time: int) -> Optional[str]:
    """Convert one NDJSON line from Notion into an OpenAI-style SSE frame.

    Returns the "data: ..." string for "markdown-chat" content lines, or
    None for blank lines, recordMap lines, and unparseable input.
    """
    if not line.strip():
        return None
    try:
        data = json.loads(line)
        if data.get("type") == "markdown-chat" and isinstance(data.get("value"), str):
            content_chunk = data["value"]
            if content_chunk:
                sse_chunk = ChatCompletionChunk(
                    id=chunk_id,
                    created=created_time,
                    choices=[Choice(delta=ChoiceDelta(content=content_chunk))]
                )
                return f"data: {sse_chunk.json()}\n\n"
        # recordMap (and any other) chunk types are intentionally ignored.
    except json.JSONDecodeError:
        pass  # Ignore JSON decode errors in silent mode
    except Exception as e:
        # Only log critical errors.
        account_logger.error(f"Critical error processing stream: {e}")
    return None


async def stream_notion_response(notion_request_body: NotionRequestBody, account: NotionAccount):
    """Creates a background Playwright task and yields OpenAI-style SSE frames from its queue.

    Yields "data: {chunk}\\n\\n" frames, then a finish_reason="stop" frame
    and "data: [DONE]". On background failure raises HTTPException(500).
    """
    chunk_queue = asyncio.Queue()
    playwright_task = None

    # Defined once per request stream.
    chunk_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(time.time())

    # Template for headers, passed to the background task.
    # 'cookie' and 'x-notion-space-id' are handled/added by _run_playwright_fetch.
    headers_template = {
        'accept': 'application/x-ndjson',
        'accept-language': 'en-US,en;q=0.9',
        'content-type': 'application/json',
        'notion-audit-log-platform': 'web',
        'notion-client-version': '23.13.0.3668',
        'origin': 'https://www.notion.so',
        'priority': 'u=1, i',
        'referer': 'https://www.notion.so/chat',
        'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
    }

    try:
        # Health check is performed by get_next_account() before this function is called.
        playwright_task = asyncio.create_task(
            _run_playwright_fetch(
                chunk_queue,
                notion_request_body,
                headers_template,
                NOTION_API_URL,  # Global constant
                account  # Pass the selected account
            )
        )

        accumulated_line = ""
        while True:
            chunk = await chunk_queue.get()  # Wait for a chunk from the background task
            if chunk is None:  # None sentinel: stream ended (or errored)
                break
            accumulated_line += chunk
            # Emit every complete NDJSON line accumulated so far.
            while '\n' in accumulated_line:
                line, accumulated_line = accumulated_line.split('\n', 1)
                sse = _line_to_sse_chunk(line, chunk_id, created_time)
                if sse:
                    yield sse

        # Process any final accumulated data after the None sentinel.
        if accumulated_line.strip():
            sse = _line_to_sse_chunk(accumulated_line, chunk_id, created_time)
            if sse:
                yield sse

        # Await the background task so any exception it raised is observed.
        # (Checking task.done() here instead would race with browser teardown
        # and could silently miss failures.)
        try:
            await playwright_task
        except Exception as task_exception:
            account_logger.error(f"Background task error: {task_exception}")
            raise HTTPException(status_code=500, detail=f"Error during background processing: {task_exception}")

        final_chunk = ChatCompletionChunk(
            id=chunk_id,
            created=created_time,
            choices=[Choice(delta=ChoiceDelta(), finish_reason="stop")]
        )
        yield f"data: {final_chunk.json()}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        account_logger.error(f"Stream response error: {e}")
        if playwright_task and not playwright_task.done():
            playwright_task.cancel()
        raise
    finally:
        if playwright_task and not playwright_task.done():
            playwright_task.cancel()
            try:
                await playwright_task  # Allow cancellation to propagate
            except asyncio.CancelledError:
                pass
            except Exception:
                pass  # Suppress errors during cleanup


# --- API Endpoint ---
@app.get("/v1/models", response_model=ModelList)
async def list_models(authenticated: bool = Depends(authenticate)):
    """
    Endpoint to list available Notion models, mimicking OpenAI's /v1/models.
    """
    available_models = [
        "openai-gpt-4.1",
        "anthropic-opus-4",
        "anthropic-sonnet-4"
    ]
    model_list = [
        Model(id=model_id, owned_by="notion")  # created uses default_factory
        for model_id in available_models
    ]
    return ModelList(data=model_list)


@app.post("/v1/chat/completions")
async def chat_completions(request_data: ChatCompletionRequest, request: Request, authenticated: bool = Depends(authenticate)):
    """
    Endpoint to mimic OpenAI's chat completions, proxying to Notion.
    It uses round-robin to select a healthy Notion account for each request.
    """
    account = get_next_account()  # Select a healthy account
    notion_request_body = build_notion_request(request_data, account)

    if request_data.stream:
        # Call the Playwright generator, passing the selected account.
        return StreamingResponse(
            stream_notion_response(notion_request_body, account),
            media_type="text/event-stream"
        )
    else:
        # --- Non-Streaming Logic ---
        # Consume the same SSE generator and aggregate its deltas into a
        # single OpenAI-compatible chat.completion response.
        full_response_content = ""
        final_finish_reason = None
        chunk_id = f"chatcmpl-{uuid.uuid4()}"  # Generate ID for the non-streamed response
        created_time = int(time.time())

        try:
            # Call the Playwright generator, passing the selected account.
            async for line in stream_notion_response(notion_request_body, account):
                if line.startswith("data: ") and "[DONE]" not in line:
                    try:
                        data_json = line[len("data: "):].strip()
                        if data_json:
                            chunk_data = json.loads(data_json)
                            if chunk_data.get("choices"):
                                delta = chunk_data["choices"][0].get("delta", {})
                                content = delta.get("content")
                                if content:
                                    full_response_content += content
                                finish_reason = chunk_data["choices"][0].get("finish_reason")
                                if finish_reason:
                                    final_finish_reason = finish_reason
                    except json.JSONDecodeError:
                        pass  # Ignore JSON errors in non-streaming mode
                    except Exception as e:
                        account_logger.error(f"Error processing non-streaming response: {e}")

            # Construct the final OpenAI-compatible non-streaming response.
            return {
                "id": chunk_id,
                "object": "chat.completion",
                "created": created_time,
                "model": request_data.model,  # Return the model requested by the client
                "choices": [
                    {
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": full_response_content,
                        },
                        "finish_reason": final_finish_reason or "stop",  # Default to stop if not explicitly set
                    }
                ],
                "usage": {
                    # Note: Token usage is not available from Notion.
                    "prompt_tokens": None,
                    "completion_tokens": None,
                    "total_tokens": None,
                },
            }
        except HTTPException as e:
            # Re-raise HTTP exceptions from the streaming function.
            raise e
        except Exception as e:
            account_logger.error(f"Error during non-streaming processing: {e}")
            raise HTTPException(status_code=500, detail="Internal server error processing Notion response")


# --- Uvicorn Runner ---
if __name__ == "__main__":
    import uvicorn
    print("Starting server. Access at http://127.0.0.1:7860")
    print("Ensure NOTION_COOKIES is set in your .env file or environment, separated by '|'.")
    uvicorn.run(app, host="127.0.0.1", port=7860)