# notion2api / main.py
# (Hugging Face listing metadata: uploaded by clash-linux, "Upload 9 files", commit b9ebd37 verified)
import asyncio
import sys # For platform check
import logging
import os
import uuid
import json
import time
import random
import asyncio
import httpx # For getSpaces API call
from contextlib import asynccontextmanager # For lifespan
from playwright.async_api import async_playwright, Error as PlaywrightError
from fastapi import FastAPI, Request, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
from dotenv import load_dotenv
import secrets # Added for secure comparison
from datetime import datetime, timedelta, timezone # Explicit datetime imports
from zoneinfo import ZoneInfo # For timezone handling
from typing import List, Optional # Add List and Optional for typing
from models import (
ChatMessage, ChatCompletionRequest, NotionTranscriptConfigValue,
NotionTranscriptContextValue, NotionTranscriptItem, NotionDebugOverrides,
NotionRequestBody, ChoiceDelta, Choice, ChatCompletionChunk, Model, ModelList
)
# Load environment variables from .env file
load_dotenv()

# --- Logging Configuration ---
# WARNING level keeps routine output quiet; the dedicated account_logger below
# carries the operational messages we always want to see.
# BUGFIX: configure logging BEFORE any logging call. Previously the Windows
# branch called logging.info() first, which implicitly installed a default root
# handler and silently turned this basicConfig() into a no-op (its format and
# datefmt were never applied on win32).
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'  # Standardize date format
)

# --- Event Loop Policy for Windows ---
# Playwright spawns subprocesses, which on Windows require the Proactor loop.
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    logging.info("Set WindowsProactorEventLoopPolicy for asyncio.")

# Custom logger for critical account operations that we always want to see.
# NOTE: the rest of the file intentionally logs routine account events at
# WARNING so they pass the root threshold configured above.
account_logger = logging.getLogger("notion_account")
account_logger.setLevel(logging.INFO)
# --- Account Management Class ---
class NotionAccount:
    """A single Notion account: its raw cookie plus IDs discovered at validation time."""

    def __init__(self, cookie: str):
        self.cookie = cookie
        # Populated by fetch_and_set_notion_ids() after a successful getSpaces call.
        self.space_id: Optional[str] = None
        self.user_id: Optional[str] = None
        self.user_email: Optional[str] = None  # Human-readable identifier for logs
        # Only healthy accounts participate in round-robin selection.
        self.is_healthy: bool = False
        # Serializes validation attempts so the same account is never fetched twice at once.
        self.lock = asyncio.Lock()

    def __str__(self):
        return f"Account(user_email={self.user_email}, user_id={self.user_id}, healthy={self.is_healthy})"
# --- Configuration ---
NOTION_API_URL = "https://www.notion.so/api/v3/runInferenceTranscript"
# IMPORTANT: Load multiple Notion cookies securely from environment variables
# Use a unique separator like '|' because cookies can contain ';' and ','
NOTION_COOKIES_RAW = os.getenv("NOTION_COOKIES")
# --- Global State for Account Polling ---
ACCOUNTS: List[NotionAccount] = []  # Populated from NOTION_COOKIES_RAW during lifespan startup
CURRENT_ACCOUNT_INDEX = 0  # Round-robin cursor advanced by get_next_account()
if not NOTION_COOKIES_RAW:
    # This is a critical error, app cannot function without it.
    logging.error("CRITICAL: NOTION_COOKIES environment variable not set. Application will not work.")
    # In a real app, you might exit or raise a more severe startup error.
# --- Proxy Configuration ---
PROXY_URL = os.getenv("PROXY_URL", "")  # Optional proxy passed to Playwright browser launches; empty = direct
# --- Authentication ---
# NOTE(review): with no PROXY_AUTH_TOKEN set, the API accepts "default_token" —
# confirm this fallback is intentional and not a production footgun.
EXPECTED_TOKEN = os.getenv("PROXY_AUTH_TOKEN", "default_token")  # Default token
security = HTTPBearer()  # FastAPI Bearer scheme consumed by the authenticate() dependency
def authenticate(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validates the client's Bearer token against EXPECTED_TOKEN.

    Uses secrets.compare_digest for a constant-time comparison and raises a
    401 on mismatch; returns True on success so it can be used as a Depends().
    """
    if not secrets.compare_digest(credentials.credentials, EXPECTED_TOKEN):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            # WWW-Authenticate header removed for Bearer
        )
    return True
# --- Notion Account Management ---
# --- Notion Account Management ---
def get_next_account() -> NotionAccount:
    """Picks the next healthy Notion account, advancing the round-robin cursor.

    Raises HTTP 503 when no accounts are configured or none are healthy.
    """
    global CURRENT_ACCOUNT_INDEX
    # Guard against an empty pool (e.g. every account failed during startup).
    if not ACCOUNTS:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No Notion accounts are configured in the server."
        )
    if not any(acc.is_healthy for acc in ACCOUNTS):
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No healthy Notion accounts available to process the request."
        )
    # Walk the ring starting at the cursor; the any() check above guarantees
    # a healthy account exists, so this terminates before wrapping around.
    start_index = CURRENT_ACCOUNT_INDEX
    while True:
        candidate = ACCOUNTS[CURRENT_ACCOUNT_INDEX]
        CURRENT_ACCOUNT_INDEX = (CURRENT_ACCOUNT_INDEX + 1) % len(ACCOUNTS)
        if candidate.is_healthy:
            account_logger.warning(f"Using account: {candidate.user_email}")
            return candidate
        if CURRENT_ACCOUNT_INDEX == start_index:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Critical error in account selection: No healthy accounts found in rotation."
            )
async def fetch_and_set_notion_ids(account: NotionAccount):
    """Fetches space ID and user ID for a given Notion account and marks it as healthy on success.

    Launches a headless Chromium via Playwright, installs the account's cookies,
    and calls Notion's getSpaces and getUserAnalyticsSettings endpoints from
    inside the page so the requests appear as same-origin browser traffic.
    The account becomes healthy only when user_id, space_id AND user_email are
    all obtained; every failure path logs and leaves is_healthy False without raising.
    """
    async with account.lock:  # Ensure only one fetch operation per account at a time
        if account.is_healthy:  # Don't re-fetch if already healthy
            return
        if not account.cookie:
            account_logger.error("Account validation failed: Cookie not set")
            # NOTE(review): failure paths below log the raw cookie value; cookies are
            # credentials — consider redacting before these logs go anywhere shared.
            account_logger.error(f"Failing cookie (empty): {account.cookie}")
            account.is_healthy = False
            return
        get_spaces_url = "https://www.notion.so/api/v3/getSpaces"
        # Headers for the JS fetch call (cookie is handled by context)
        js_fetch_headers = {
            'Content-Type': 'application/json',
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.9',
            'notion-audit-log-platform': 'web',
            'notion-client-version': '23.13.0.3686',
            'origin': 'https://www.notion.so',
            'priority': 'u=1, i',
            'referer': 'https://www.notion.so/',
            'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36'
        }
        browser = None
        context = None
        page = None
        try:
            async with async_playwright() as p:
                # Configure browser launch with proxy if PROXY_URL is set
                launch_args = {
                    'headless': True,
                    'args': ['--no-sandbox', '--disable-setuid-sandbox']
                }
                if PROXY_URL:
                    launch_args['proxy'] = {'server': PROXY_URL}
                browser = await p.chromium.launch(**launch_args)
                context = await browser.new_context(user_agent=js_fetch_headers['user-agent'])
                # Add cookies from the account's cookie string ("name=value; name2=value2" format)
                cookies_to_add = []
                cookie_pairs = account.cookie.split('; ')
                for pair in cookie_pairs:
                    if '=' in pair:
                        name, value = pair.split('=', 1)
                        cookies_to_add.append({
                            'name': name.strip(), 'value': value.strip(),
                            'domain': '.notion.so', 'path': '/',
                            'secure': True, 'httpOnly': True, 'sameSite': 'Lax'
                        })
                if cookies_to_add:
                    await context.add_cookies(cookies_to_add)
                else:
                    account_logger.error("Account validation failed: No valid cookies parsed")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return
                page = await context.new_page()
                # Load notion.so first so the in-page fetch below is same-origin with cookies attached
                await page.goto("https://www.notion.so/", wait_until="domcontentloaded", timeout=15000)
                # JavaScript to perform the fetch for getSpaces
                javascript_code_get_spaces = """
                async (args) => {
                    const { apiUrl, headers, body } = args;
                    try {
                        const response = await fetch(apiUrl, {
                            method: 'POST',
                            headers: headers,
                            body: JSON.stringify(body) // Ensure body is stringified
                        });
                        if (!response.ok) {
                            console.error('getSpaces Fetch error:', response.status, await response.text());
                            return { success: false, error: `HTTP ${response.status}` };
                        }
                        const data = await response.json();
                        return { success: true, data: data };
                    } catch (error) {
                        console.error('getSpaces JS Exception:', error);
                        return { success: false, error: error.toString() };
                    }
                }
                """
                js_args = {"apiUrl": get_spaces_url, "headers": js_fetch_headers, "body": {}}  # Empty JSON body for getSpaces
                result = await page.evaluate(javascript_code_get_spaces, js_args)
                if not result or not result.get('success'):
                    # NOTE(review): if result is None the .get() below raises AttributeError,
                    # which is only caught by the generic except handler — confirm intended.
                    error_detail = result.get('error', 'Unknown error during getSpaces JS execution')
                    account_logger.error(f"Account validation failed: {error_detail}")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return
                data = result.get('data')
                if not data:
                    account_logger.error("Account validation failed: No data returned")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return
                # Extract user ID — assumes the first top-level key of the getSpaces
                # response is the user's ID (TODO confirm against Notion's response schema)
                user_id_key = next(iter(data), None)
                if not user_id_key:
                    account_logger.error("Account validation failed: No user ID found")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
                    return
                account.user_id = user_id_key
                # Extract space ID from the deeply nested user_root record
                user_root = data.get(user_id_key, {}).get("user_root", {}).get(user_id_key, {})
                space_view_pointers = user_root.get("value", {}).get("value", {}).get("space_view_pointers", [])
                if space_view_pointers and isinstance(space_view_pointers, list) and len(space_view_pointers) > 0:
                    # The first space view is taken as the account's working space
                    account.space_id = space_view_pointers[0].get("spaceId")
                    # If spaceId is found, proceed to get user email
                    if account.space_id:
                        get_user_analytics_url = "https://www.notion.so/api/v3/getUserAnalyticsSettings"
                        analytics_headers = {
                            'Content-Type': 'application/json',
                            'accept': '*/*',
                            'notion-audit-log-platform': 'web',
                            'notion-client-version': '23.13.0.3833', # From user's example
                            'x-notion-active-user-header': account.user_id,
                            'x-notion-space-id': account.space_id,
                            'user-agent': js_fetch_headers['user-agent']
                        }
                        # NOTE(review): in the JS error branch below, response.text() is read
                        # twice (once for console.error, once in the template literal); the
                        # second read of an already-consumed body may reject — verify.
                        javascript_code_get_analytics = """
                        async (args) => {
                            const { apiUrl, headers, body } = args;
                            try {
                                const response = await fetch(apiUrl, {
                                    method: 'POST',
                                    headers: headers,
                                    body: JSON.stringify(body)
                                });
                                if (!response.ok) {
                                    console.error('getUserAnalyticsSettings Fetch error:', response.status, await response.text());
                                    return { success: false, error: `HTTP ${response.status}: ${await response.text()}` };
                                }
                                const data = await response.json();
                                return { success: true, data: data };
                            } catch (error) {
                                console.error('getUserAnalyticsSettings JS Exception:', error);
                                return { success: false, error: error.toString() };
                            }
                        }
                        """
                        analytics_args = {"apiUrl": get_user_analytics_url, "headers": analytics_headers, "body": {}}
                        analytics_result = await page.evaluate(javascript_code_get_analytics, analytics_args)
                        if analytics_result and analytics_result.get('success'):
                            analytics_data = analytics_result.get('data')
                            account.user_email = analytics_data.get('user_email')
                            if account.user_email:
                                account_logger.warning(f"Account validated: {account.user_email}")
                                account.is_healthy = True  # Mark as healthy only on complete success
                            else:
                                account_logger.error(f"Account validation failed: No email found for User ID: {account.user_id}")
                                account_logger.error(f"Failing cookie: {account.cookie}")
                                account.is_healthy = False
                        else:
                            error_detail = analytics_result.get('error', 'Unknown error') if analytics_result else 'No result from JS'
                            account_logger.error(f"Account validation failed: getUserAnalyticsSettings error: {error_detail}")
                            account_logger.error(f"Failing cookie: {account.cookie}")
                            account.is_healthy = False
                    else:
                        account_logger.error(f"Account validation failed: No spaceId for User ID: {account.user_id}")
                        account_logger.error(f"Failing cookie: {account.cookie}")
                        account.is_healthy = False
                else:
                    account_logger.error(f"Account validation failed: No space_view_pointers for User ID: {account.user_id}")
                    account_logger.error(f"Failing cookie: {account.cookie}")
                    account.is_healthy = False
        except PlaywrightError as e:
            account_logger.error(f"Account validation failed: Playwright error: {e}")
            account_logger.error(f"Failing cookie: {account.cookie}")
            account.is_healthy = False
        except Exception as e:
            account_logger.error(f"Account validation failed: General error: {e}")
            account_logger.error(f"Failing cookie: {account.cookie}")
            account.is_healthy = False
        finally:
            # Best-effort teardown; the async_playwright context manager handles the rest
            if browser and browser.is_connected():
                try:
                    await browser.close()
                except:
                    pass  # Suppress any errors during browser close
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook.

    Startup: parse NOTION_COOKIES (pipe-separated) into ACCOUNTS and validate
    every account concurrently. Shutdown: log only.
    """
    account_logger.warning("Starting application: Initializing Notion accounts...")
    if NOTION_COOKIES_RAW:
        # Cookies are '|'-separated because ';' and ',' occur inside cookie values.
        for raw_cookie in (c.strip() for c in NOTION_COOKIES_RAW.split('|')):
            if raw_cookie:
                ACCOUNTS.append(NotionAccount(cookie=raw_cookie))
        account_logger.warning(f"Loaded {len(ACCOUNTS)} Notion account(s) from environment variable.")
    if not ACCOUNTS:
        account_logger.error("CRITICAL: No Notion accounts loaded. The application will not be able to process requests.")
    else:
        # Validate every account concurrently before serving traffic.
        account_logger.warning("Fetching IDs for all Notion accounts...")
        await asyncio.gather(*(fetch_and_set_notion_ids(acc) for acc in ACCOUNTS))
        healthy_count = sum(acc.is_healthy for acc in ACCOUNTS)
        account_logger.warning(f"Initialization complete. {healthy_count} of {len(ACCOUNTS)} accounts are healthy.")
        if healthy_count == 0:
            account_logger.error("CRITICAL: No healthy Notion accounts available after initialization.")
    yield
    # On shutdown
    account_logger.warning("Application shutdown.")
# Wire the lifespan handler so accounts are loaded/validated before serving.
app = FastAPI(lifespan=lifespan)
# --- Helper Functions ---
def _format_notion_timestamp(dt: datetime) -> str:
    """Formats an aware datetime as YYYY-MM-DDTHH:MM:SS.fff-HH:MM (the shape Notion expects).

    strftime has no milliseconds-only directive and renders %z without a colon,
    so both pieces are assembled by hand.
    """
    base = dt.strftime("%Y-%m-%dT%H:%M:%S")
    millis = f"{dt.microsecond // 1000:03d}"  # Truncate microseconds to exactly 3 digits
    offset = dt.strftime("%z")  # +HHMM or -HHMM
    return f"{base}.{millis}{offset[:-2]}:{offset[-2:]}"  # Insert the colon into the offset


def build_notion_request(request_data: ChatCompletionRequest, account: NotionAccount) -> NotionRequestBody:
    """Transforms OpenAI-style messages to Notion transcript format, using the provided account.

    Non-assistant messages become "user" transcript items with synthetic Pacific-time
    timestamps (the last one is "now", earlier ones spaced 3-20 random minutes apart);
    assistant messages become "markdown-chat" items with fresh trace IDs.
    """
    # --- Timestamp and User ID Logic ---
    # Use the user ID from the selected account
    user_id = account.user_id
    pacific_tz = ZoneInfo("America/Los_Angeles")
    # All non-assistant messages get synthetic createdAt timestamps, keyed by message id.
    non_assistant_messages = [msg for msg in request_data.messages if msg.role != "assistant"]
    message_timestamps = {}
    if non_assistant_messages:
        now_pacific = datetime.now(timezone.utc).astimezone(pacific_tz)
        # Last non-assistant message is stamped "now"...
        message_timestamps[non_assistant_messages[-1].id] = now_pacific
        # ...and earlier ones walk backwards in random 3-20 minute gaps.
        current_timestamp = now_pacific
        for msg in reversed(non_assistant_messages[:-1]):
            current_timestamp -= timedelta(minutes=random.randint(3, 20))
            message_timestamps[msg.id] = current_timestamp
    # --- Build Transcript ---
    # Current Pacific time for the "context" transcript item
    # (previously this formatting logic was duplicated inline; now shared via helper).
    current_datetime_iso = _format_notion_timestamp(datetime.now(timezone.utc).astimezone(pacific_tz))
    # Random plausible-looking user and space names
    random_words = ["Project", "Workspace", "Team", "Studio", "Lab", "Hub", "Zone", "Space"]
    user_name = f"User{random.randint(100, 999)}"
    space_name = f"{random.choice(random_words)} {random.randint(1, 99)}"
    transcript = [
        NotionTranscriptItem(
            type="config",
            value=NotionTranscriptConfigValue(model=request_data.notion_model)
        ),
        NotionTranscriptItem(
            type="context",
            value=NotionTranscriptContextValue(
                userId=user_id or "",  # Use the user_id from the selected account
                spaceId=account.space_id,  # Use space_id from the selected account
                surface="home_module",
                timezone="America/Los_Angeles",
                userName=user_name,
                spaceName=space_name,
                spaceViewId=str(uuid.uuid4()),  # Random UUID for spaceViewId
                currentDatetime=current_datetime_iso
            )
        ),
        NotionTranscriptItem(
            type="agent-integration"
            # No value field needed for agent-integration
        )
    ]
    for message in request_data.messages:
        if message.role == "assistant":
            # Assistant messages get type="markdown-chat" and a traceId
            transcript.append(NotionTranscriptItem(
                type="markdown-chat",
                value=message.content,
                traceId=str(uuid.uuid4())  # Unique traceId per assistant message
            ))
        else:  # Treat all other roles (user, system, etc.) as "user" type
            created_at_dt = message_timestamps.get(message.id)
            created_at_iso = _format_notion_timestamp(created_at_dt) if created_at_dt else None
            content = message.content
            # Normalize content to a plain string: multimodal list parts are
            # flattened to their text pieces; anything else becomes "".
            if isinstance(content, list):
                content = "".join(
                    part["text"] for part in content
                    if isinstance(part, dict)
                    and part.get("type") == "text"
                    and isinstance(part.get("text"), str)
                )
            elif not isinstance(content, str):
                content = ""
            # Notion expects user content wrapped as [[content_string]]
            notion_value = [[content]] if content else [[""]]
            transcript.append(NotionTranscriptItem(
                type="user",  # Set type to "user" for non-assistant roles
                value=notion_value,
                userId=user_id,  # Attribute the message to the account's user
                createdAt=created_at_iso  # Synthetic timestamp (None for untracked ids)
                # No traceId for user/system messages
            ))
    # Use spaceId from the selected account, set createThread=True
    return NotionRequestBody(
        spaceId=account.space_id,  # From selected account
        transcript=transcript,
        createThread=True,  # Always create a new thread
        # Generate a new traceId for each request
        traceId=str(uuid.uuid4()),
        # Explicitly set debugOverrides, generateTitle, and saveAllThreadOperations
        debugOverrides=NotionDebugOverrides(
            cachedInferences={},
            annotationInferences={},
            emitInferences=False
        ),
        generateTitle=False,
        saveAllThreadOperations=False
    )
# --- Background Playwright Task ---
async def _run_playwright_fetch(
    chunk_queue: asyncio.Queue,
    notion_request_body: NotionRequestBody,
    headers_template: dict,
    notion_api_url: str,
    account: NotionAccount  # Pass the whole account object
):
    """Runs Playwright fetch in the background, putting results into a queue.

    Streams Notion's ndjson response chunk-by-chunk into chunk_queue; a None
    sentinel always terminates the queue, on success and on failure alike.
    Failures are re-raised so the consumer can observe them through
    asyncio.Task.exception().
    """
    browser = None
    context = None
    page = None
    # Construct headers for this specific task run
    current_headers = headers_template.copy()
    current_headers['x-notion-space-id'] = account.space_id
    if account.user_id:
        current_headers['x-notion-active-user-header'] = account.user_id

    async def handle_chunk(chunk_str: str):
        # Exposed to the page as window.sendChunkToPython
        await chunk_queue.put(chunk_str)

    async def handle_stream_end():
        # Exposed to the page as window.signalStreamEnd; None is the end-of-stream sentinel
        await chunk_queue.put(None)

    try:
        async with async_playwright() as p:
            # Configure browser launch with proxy if PROXY_URL is set
            launch_args = {
                'headless': True,
                'args': ['--no-sandbox', '--disable-setuid-sandbox']
            }
            if PROXY_URL:
                launch_args['proxy'] = {'server': PROXY_URL}
            browser = await p.chromium.launch(**launch_args)
            context = await browser.new_context(user_agent=current_headers.get('user-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36'))
            if account.cookie:
                # Parse "name=value; name2=value2" cookie string into Playwright cookies
                cookies_to_add = []
                cookie_pairs = account.cookie.split('; ')
                for pair in cookie_pairs:
                    if '=' in pair:
                        name, value = pair.split('=', 1)
                        cookies_to_add.append({
                            'name': name.strip(), 'value': value.strip(),
                            'domain': '.notion.so', 'path': '/',
                            'secure': True, 'httpOnly': True, 'sameSite': 'Lax'
                        })
                if cookies_to_add:
                    await context.add_cookies(cookies_to_add)
                else:
                    raise ValueError("Server configuration error: No valid cookies for request")
            else:
                raise ValueError("Server configuration error: Notion cookie not set for request")
            page = await context.new_page()
            # Same-origin page so the in-page fetch carries the Notion cookies
            await page.goto("https://www.notion.so/chat", wait_until="domcontentloaded")
            await page.expose_function("sendChunkToPython", handle_chunk)
            await page.expose_function("signalStreamEnd", handle_stream_end)
            request_body_json_str = notion_request_body.json()
            # Prepare headers for JS fetch (cookie is handled by context)
            js_fetch_headers = current_headers.copy()
            if 'cookie' in js_fetch_headers:
                del js_fetch_headers['cookie']
            javascript_code = """
            async (args) => {
                const { apiUrl, headers, body } = args;
                try {
                    const response = await fetch(apiUrl, {
                        method: 'POST',
                        headers: headers,
                        body: body
                    });
                    if (!response.ok) {
                        const errorText = await response.text();
                        console.error('JS Fetch error:', response.status, errorText);
                        await window.signalStreamEnd();
                        return { success: false, status: response.status, error: errorText };
                    }
                    if (!response.body) {
                        console.error('JS Response body is null');
                        await window.signalStreamEnd();
                        return { success: false, error: 'Response body is null' };
                    }
                    const reader = response.body.getReader();
                    const decoder = new TextDecoder();
                    while (true) {
                        const { done, value } = await reader.read();
                        if (done) break;
                        await window.sendChunkToPython(decoder.decode(value, { stream: true }));
                    }
                    await window.signalStreamEnd();
                    return { success: true };
                } catch (error) {
                    console.error('JS Exception during fetch:', error);
                    await window.signalStreamEnd();
                    return { success: false, error: error.toString() };
                }
            }
            """
            js_args = {"apiUrl": notion_api_url, "headers": js_fetch_headers, "body": request_body_json_str}
            js_result = await page.evaluate(javascript_code, js_args)
            if not js_result or not js_result.get('success'):
                # Guard the .get(): js_result may be None (previously this raised AttributeError)
                error_detail = js_result.get('error', 'Unknown JS execution error') if js_result else 'Unknown JS execution error'
                # Error already signaled to queue by JS calling signalStreamEnd
                raise PlaywrightError(f"JS Fetch Error: {error_detail}")
    except Exception:
        await chunk_queue.put(None)  # Always unblock the consumer, even on failure
        # BUGFIX: re-raise instead of swallowing. Previously the exception was
        # dropped here, so playwright_task.exception() in stream_notion_response
        # was always None and its error branch could never fire.
        raise
    finally:
        if browser and browser.is_connected():
            try:
                await browser.close()
            except Exception:
                pass  # Suppress any errors during browser close
# --- Main Generator Called by Endpoint ---
async def stream_notion_response(notion_request_body: NotionRequestBody, account: NotionAccount):
    """Creates background task for Playwright and yields results from queue.

    Async generator producing OpenAI-style SSE lines ("data: {...}\\n\\n").
    A background task streams Notion's ndjson into chunk_queue; this consumer
    reassembles newline-delimited JSON records, converts "markdown-chat" deltas
    into ChatCompletionChunk events, and finishes with a stop chunk + [DONE].
    """
    chunk_queue = asyncio.Queue()
    playwright_task = None
    # These should be defined once per request stream
    chunk_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(time.time())
    # Define the template for headers here, to be passed to the background task
    headers_template = {
        'accept': 'application/x-ndjson',
        'accept-language': 'en-US,en;q=0.9',
        'content-type': 'application/json',
        'notion-audit-log-platform': 'web',
        'notion-client-version': '23.13.0.3668',
        'origin': 'https://www.notion.so',
        'priority': 'u=1, i',
        'referer': 'https://www.notion.so/chat',
        'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
        # 'cookie' and 'x-notion-space-id' will be handled/added by _run_playwright_fetch
    }
    try:
        # Health check is now performed by get_next_account() before this function is called.
        playwright_task = asyncio.create_task(
            _run_playwright_fetch(
                chunk_queue,
                notion_request_body,
                headers_template,
                NOTION_API_URL,  # Global constant
                account  # Pass the selected account
            )
        )
        # Reassemble ndjson lines: chunks may split a JSON record across reads.
        accumulated_line = ""
        while True:
            chunk = await chunk_queue.get()  # Wait for a chunk from the background task
            if chunk is None:
                # None is the producer's end-of-stream sentinel
                break
            accumulated_line += chunk
            while '\n' in accumulated_line:
                line, accumulated_line = accumulated_line.split('\n', 1)
                if not line.strip():
                    continue
                try:
                    data = json.loads(line)
                    # Only "markdown-chat" records carry assistant text deltas
                    if data.get("type") == "markdown-chat" and isinstance(data.get("value"), str):
                        content_chunk = data["value"]
                        if content_chunk:
                            sse_chunk = ChatCompletionChunk(
                                id=chunk_id, created=created_time,
                                choices=[Choice(delta=ChoiceDelta(content=content_chunk))]
                            )
                            yield f"data: {sse_chunk.json()}\n\n"
                    elif "recordMap" in data:
                        pass  # Ignore recordMap chunks
                except json.JSONDecodeError:
                    pass  # Ignore JSON decode errors in silent mode
                except Exception as e:
                    # Only log critical errors
                    account_logger.error(f"Critical error processing stream: {e}")
        # Process any final accumulated data after None sentinel
        if accumulated_line.strip():
            try:
                data = json.loads(accumulated_line)
                if data.get("type") == "markdown-chat" and isinstance(data.get("value"), str):
                    content_chunk = data["value"]
                    if content_chunk:
                        sse_chunk = ChatCompletionChunk(
                            id=chunk_id, created=created_time,
                            choices=[Choice(delta=ChoiceDelta(content=content_chunk))]
                        )
                        yield f"data: {sse_chunk.json()}\n\n"
            except json.JSONDecodeError:
                pass  # Ignore JSON decode errors in silent mode
            except Exception as e:
                # Only log critical errors
                account_logger.error(f"Critical error processing final data: {e}")
        # After loop, check if the background task raised an exception.
        # NOTE(review): this is racy — the task may not yet be done when the
        # sentinel arrives, in which case a failure is silently missed; confirm.
        if playwright_task.done() and playwright_task.exception():
            task_exception = playwright_task.exception()
            account_logger.error(f"Background task error: {task_exception}")
            raise HTTPException(status_code=500, detail=f"Error during background processing: {task_exception}")
        else:
            # Normal termination: emit the stop chunk and the SSE terminator
            final_chunk = ChatCompletionChunk(
                id=chunk_id, created=created_time,
                choices=[Choice(delta=ChoiceDelta(), finish_reason="stop")]
            )
            yield f"data: {final_chunk.json()}\n\n"
            yield "data: [DONE]\n\n"
    except Exception as e:
        account_logger.error(f"Stream response error: {e}")
        if playwright_task and not playwright_task.done():
            playwright_task.cancel()
        raise
    finally:
        # Cancel a still-running producer when the consumer is torn down early
        # (e.g. client disconnect) and await it so cancellation completes.
        if playwright_task and not playwright_task.done():
            playwright_task.cancel()
            try:
                await playwright_task  # Allow cancellation to propagate
            except asyncio.CancelledError:
                pass
            except Exception:
                pass  # Suppress errors during cleanup
# --- API Endpoint ---
@app.get("/v1/models", response_model=ModelList)
async def list_models(authenticated: bool = Depends(authenticate)):
    """
    Endpoint to list available Notion models, mimicking OpenAI's /v1/models.
    """
    # Fixed catalogue of the Notion models this proxy knows how to request.
    model_ids = (
        "openai-gpt-4.1",
        "anthropic-opus-4",
        "anthropic-sonnet-4",
    )
    # Model.created uses its default_factory, so only id/owner are supplied.
    return ModelList(data=[Model(id=model_id, owned_by="notion") for model_id in model_ids])
@app.post("/v1/chat/completions")
async def chat_completions(request_data: ChatCompletionRequest, request: Request, authenticated: bool = Depends(authenticate)):
    """
    Endpoint to mimic OpenAI's chat completions, proxying to Notion.

    A healthy Notion account is selected per request via round-robin. Streaming
    requests return the SSE generator directly; non-streaming requests drain the
    same generator and aggregate the deltas into one chat.completion object.
    """
    account = get_next_account()  # Select a healthy account (503 if none)
    notion_request_body = build_notion_request(request_data, account)

    if request_data.stream:
        return StreamingResponse(
            stream_notion_response(notion_request_body, account),
            media_type="text/event-stream"
        )

    # --- Non-Streaming Logic: consume the SSE stream and merge its deltas ---
    chunk_id = f"chatcmpl-{uuid.uuid4()}"  # ID for the aggregated response
    created_time = int(time.time())
    full_response_content = ""
    final_finish_reason = None
    try:
        async for line in stream_notion_response(notion_request_body, account):
            if not line.startswith("data: ") or "[DONE]" in line:
                continue
            data_json = line[len("data: "):].strip()
            if not data_json:
                continue
            try:
                chunk_data = json.loads(data_json)
                choices = chunk_data.get("choices")
                if choices:
                    content = choices[0].get("delta", {}).get("content")
                    if content:
                        full_response_content += content
                    finish_reason = choices[0].get("finish_reason")
                    if finish_reason:
                        final_finish_reason = finish_reason
            except json.JSONDecodeError:
                pass  # Skip malformed payloads, mirroring streaming behavior
            except Exception as e:
                account_logger.error(f"Error processing non-streaming response: {e}")
        # Construct the final OpenAI-compatible non-streaming response
        return {
            "id": chunk_id,
            "object": "chat.completion",
            "created": created_time,
            "model": request_data.model,  # Echo the model requested by the client
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": full_response_content,
                    },
                    "finish_reason": final_finish_reason or "stop",  # Default to stop
                }
            ],
            "usage": {  # Token usage is not available from Notion
                "prompt_tokens": None,
                "completion_tokens": None,
                "total_tokens": None,
            },
        }
    except HTTPException as e:
        # Re-raise HTTP exceptions from the streaming function
        raise e
    except Exception as e:
        account_logger.error(f"Error during non-streaming processing: {e}")
        raise HTTPException(status_code=500, detail="Internal server error processing Notion response")
# --- Uvicorn Runner ---
if __name__ == "__main__":
    # Imported lazily: uvicorn is only needed when running this module directly.
    import uvicorn

    print("Starting server. Access at http://127.0.0.1:7860")
    print("Ensure NOTION_COOKIES is set in your .env file or environment, separated by '|'.")
    bind_host, bind_port = "127.0.0.1", 7860
    uvicorn.run(app, host=bind_host, port=bind_port)