polarbearblue committed on
Commit
8718031
·
verified ·
1 Parent(s): 4726024

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +840 -835
main.py CHANGED
@@ -1,835 +1,840 @@
1
- import os
2
- import uuid
3
- import json
4
- import time
5
- import asyncio
6
- import random
7
- import threading
8
- from curl_cffi.requests import AsyncSession
9
- from fastapi import FastAPI, Request, HTTPException, Depends, status
10
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
11
- from fastapi.responses import StreamingResponse
12
- from dotenv import load_dotenv
13
- import secrets
14
- from pydantic import BaseModel, Field
15
- from typing import List, Optional, Dict, Any, Literal, Union
16
- from contextlib import asynccontextmanager
17
-
18
# Load environment variables from .env file
load_dotenv()

# --- Concurrency configuration ---
CONCURRENT_REQUESTS = 1  # number of parallel upstream attempts per request (tunable)

# --- Retry configuration ---
MAX_RETRIES = 3
RETRY_DELAY = 1  # seconds between retries
- # --- Models (Integrated from models.py) ---
29
-
30
- # Input Models (OpenAI-like)
31
class ChatMessage(BaseModel):
    """A single message in the OpenAI-style chat request."""
    role: Literal["system", "user", "assistant"]
    content: str
34
-
35
class ChatCompletionRequest(BaseModel):
    """OpenAI-compatible chat-completion request body accepted by this proxy."""
    messages: List[ChatMessage]
    model: str = "notion-proxy"
    stream: bool = False
    # Identifier of the underlying Notion model to run the inference with
    notion_model: str = "anthropic-opus-4"
40
-
41
-
42
- # Notion Models
43
class NotionTranscriptConfigValue(BaseModel):
    """Value of the leading "config" transcript item sent to Notion."""
    type: str = "markdown-chat"
    model: str  # e.g., "anthropic-opus-4"
46
-
47
class NotionTranscriptItem(BaseModel):
    """One entry of the Notion inference transcript.

    The value shape depends on type: "config" carries a
    NotionTranscriptConfigValue, "user" a nested string list, and
    "markdown-chat" a plain string (assistant turn).
    """
    type: Literal["config", "user", "markdown-chat"]
    value: Union[List[List[str]], str, NotionTranscriptConfigValue]
50
-
51
class NotionDebugOverrides(BaseModel):
    """Debug knobs forwarded verbatim to Notion; all disabled by default."""
    cachedInferences: Dict = Field(default_factory=dict)
    annotationInferences: Dict = Field(default_factory=dict)
    emitInferences: bool = False
55
-
56
class NotionRequestBody(BaseModel):
    """Payload for Notion's /api/v3/runInferenceTranscript endpoint."""
    traceId: str = Field(default_factory=lambda: str(uuid.uuid4()))
    spaceId: str
    transcript: List[NotionTranscriptItem]
    # threadId is removed, createThread will be set to true
    createThread: bool = True
    debugOverrides: NotionDebugOverrides = Field(default_factory=NotionDebugOverrides)
    generateTitle: bool = False
    # NOTE(review): defaults to True here, but build_notion_request explicitly
    # sends False — confirm which is intended.
    saveAllThreadOperations: bool = True
65
-
66
-
67
- # Output Models (OpenAI SSE)
68
class ChoiceDelta(BaseModel):
    """Incremental content delta of one OpenAI SSE chunk."""
    content: Optional[str] = None
70
-
71
class Choice(BaseModel):
    """Single choice entry inside a streamed chat-completion chunk."""
    index: int = 0
    delta: ChoiceDelta
    finish_reason: Optional[Literal["stop", "length"]] = None
75
-
76
class ChatCompletionChunk(BaseModel):
    """OpenAI-compatible SSE chunk emitted by the streaming endpoints."""
    id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4()}")
    object: str = "chat.completion.chunk"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str = "notion-proxy"  # Or could reflect the underlying Notion model
    choices: List[Choice]
82
-
83
-
84
- # Models for /v1/models Endpoint
85
class Model(BaseModel):
    """One model entry returned by the /v1/models endpoint."""
    id: str
    object: str = "model"
    created: int = Field(default_factory=lambda: int(time.time()))
    owned_by: str = "notion"  # Or specify based on actual model origin if needed
90
-
91
class ModelList(BaseModel):
    """OpenAI-style list wrapper for /v1/models responses."""
    object: str = "list"
    data: List[Model]
94
-
95
# --- Configuration ---
NOTION_API_URL = "https://www.notion.so/api/v3/runInferenceTranscript"
# IMPORTANT: Load the Notion cookie securely from environment variables
NOTION_COOKIE = os.getenv("NOTION_COOKIE")

NOTION_SPACE_ID = os.getenv("NOTION_SPACE_ID")
if not NOTION_COOKIE:
    print("Error: NOTION_COOKIE environment variable not set.")
    # Consider raising HTTPException or exiting in a real app
if not NOTION_SPACE_ID:
    print("Warning: NOTION_SPACE_ID environment variable not set. Using a default UUID.")
    # Using a default might not be ideal, depends on Notion's behavior
    # Consider raising an error instead: raise ValueError("NOTION_SPACE_ID not set")
    NOTION_SPACE_ID = str(uuid.uuid4())  # Default or raise error
109
-
110
# --- Cookie Management ---
browser_cookies = ""  # cached Cookie header string; guarded by cookie_lock
cookie_lock = threading.Lock()
last_cookie_update = 0  # epoch seconds of the last successful refresh
COOKIE_UPDATE_INTERVAL = 30 * 60  # 30 minutes in seconds
115
-
116
async def get_browser_cookies():
    """Fetch fresh browser cookies from notion.so and cache them globally.

    Hits https://www.notion.so with a Chrome-impersonating session, extracts
    the .notion.so cookies via several fallback strategies (cookie-jar APIs
    differ between curl_cffi versions), appends the env-provided token_v2,
    and stores the joined "name=value; ..." string in `browser_cookies`.

    Returns True on success (including the env-cookie-only fallback),
    False when no cookie source is available.
    """
    global browser_cookies, last_cookie_update

    try:
        print("正在获取Notion浏览器cookie...")
        async with AsyncSession(impersonate="chrome136") as session:
            response = await session.get("https://www.notion.so")

            if response.status_code == 200:
                # Collect all cookies from the response
                cookies = response.cookies
                notion_so_cookies = []

                # Work around CookieConflict issues: only keep .notion.so cookies
                try:
                    # Strategy 1: filter by domain to avoid name conflicts
                    if hasattr(cookies, 'get_dict'):
                        # Use get_dict with an explicit domain
                        notion_so_dict = cookies.get_dict(domain='.notion.so')
                        for name, value in notion_so_dict.items():
                            notion_so_cookies.append(f"{name}={value}")
                    elif hasattr(cookies, 'jar'):
                        # Strategy 2: iterate the underlying jar and filter by domain
                        for cookie in cookies.jar:
                            if hasattr(cookie, 'domain') and cookie.domain:
                                if '.notion.so' in cookie.domain and '.notion.com' not in cookie.domain:
                                    notion_so_cookies.append(f"{cookie.name}={cookie.value}")
                    else:
                        # Strategy 3: build the cookie string manually from
                        # raw Set-Cookie response headers
                        set_cookie_headers = response.headers.get_list('Set-Cookie') if hasattr(response.headers, 'get_list') else []
                        if not set_cookie_headers and 'Set-Cookie' in response.headers:
                            set_cookie_headers = [response.headers['Set-Cookie']]

                        for cookie_header in set_cookie_headers:
                            if 'domain=.notion.so' in cookie_header or ('notion.so' in cookie_header and 'notion.com' not in cookie_header):
                                # Extract the "name=value" part before any attributes
                                cookie_parts = cookie_header.split(';')[0].strip()
                                if '=' in cookie_parts:
                                    notion_so_cookies.append(cookie_parts)

                        # Strategy 4: last resort, iterate response.cookies directly
                        if not notion_so_cookies and hasattr(response, 'cookies'):
                            try:
                                for cookie in response.cookies:
                                    if hasattr(cookie, 'domain') and cookie.domain and '.notion.so' in cookie.domain:
                                        notion_so_cookies.append(f"{cookie.name}={cookie.value}")
                            except Exception as inner_e:
                                print(f"内部cookie处理错误: {inner_e}")

                except Exception as cookie_error:
                    print(f"处理cookie时出现错误: {cookie_error}")
                    # If every strategy failed, try the session's cookie store
                    if hasattr(session, 'cookies'):
                        try:
                            for name, value in session.cookies.items():
                                notion_so_cookies.append(f"{name}={value}")
                        except:
                            pass

                # Always append the env-provided auth cookie as token_v2
                if NOTION_COOKIE:
                    notion_so_cookies.append(f"token_v2={NOTION_COOKIE}")

                # If nothing was collected, fall back to the env cookie alone
                if not notion_so_cookies and NOTION_COOKIE:
                    notion_so_cookies = [f"token_v2={NOTION_COOKIE}"]

                with cookie_lock:
                    browser_cookies = "; ".join(notion_so_cookies)
                    last_cookie_update = time.time()

                # Collect cookie names for logging only (values are not logged)
                cookie_names = []
                for cookie_str in notion_so_cookies:
                    if '=' in cookie_str:
                        name = cookie_str.split('=')[0]
                        cookie_names.append(name)

                print(f"成功获取到 {len(notion_so_cookies)} 个cookie")
                print(f"Cookie名称列表: {', '.join(cookie_names)}")
                return True
            else:
                print(f"获取cookie失败,HTTP状态码: {response.status_code}")
                return False

    except Exception as e:
        print(f"获取browser cookie时出错: {e}")
        print(f"错误详情: {type(e).__name__}: {str(e)}")

        # Complete failure: fall back to the env cookie if available
        if NOTION_COOKIE:
            with cookie_lock:
                browser_cookies = f"token_v2={NOTION_COOKIE}"
                last_cookie_update = time.time()
            print("使用环境变量cookie作为备用")
            return True
        return False
216
-
217
def should_update_cookies():
    """Return True when the cached cookies are older than the refresh interval."""
    elapsed = time.time() - last_cookie_update
    return elapsed > COOKIE_UPDATE_INTERVAL
220
-
221
async def ensure_cookies_available():
    """Ensure a usable cookie string is cached, refreshing it when stale.

    Raises HTTPException(500) only when no cookies can be obtained at all
    (refresh failed, no cached value, and no NOTION_COOKIE env fallback).
    """
    global browser_cookies

    if not browser_cookies or should_update_cookies():
        success = await get_browser_cookies()
        if not success and not browser_cookies:
            # Refresh failed and nothing is cached: fall back to the env cookie
            if NOTION_COOKIE:
                with cookie_lock:
                    browser_cookies = f"token_v2={NOTION_COOKIE}"
                print("使用环境变量cookie作为备用")
            else:
                raise HTTPException(status_code=500, detail="无法获取Notion cookie")
235
-
236
def start_cookie_updater():
    """Start a daemon thread that refreshes the Notion cookies periodically."""
    def cookie_updater():
        # The worker thread owns a private event loop to run the async refresh.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        while True:
            try:
                if should_update_cookies():
                    print("开始定时更新cookie...")
                    loop.run_until_complete(get_browser_cookies())
                time.sleep(60)  # poll once per minute
            except Exception as e:
                # Keep the updater alive on any error and retry after a minute
                print(f"定时更新cookie时出错: {e}")
                time.sleep(60)

    # daemon=True so the thread never blocks interpreter shutdown
    thread = threading.Thread(target=cookie_updater, daemon=True)
    thread.start()
    print("cookie定时更新器已启动")
255
-
256
# --- Authentication ---
EXPECTED_TOKEN = os.getenv("PROXY_AUTH_TOKEN", "default_token")  # Default token
security = HTTPBearer()
259
-
260
def authenticate(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the Bearer token against EXPECTED_TOKEN in constant time."""
    # secrets.compare_digest avoids timing side channels on the comparison
    if secrets.compare_digest(credentials.credentials, EXPECTED_TOKEN):
        return True  # authentication succeeded
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        # WWW-Authenticate header removed for Bearer
    )
270
-
271
- # --- Lifespan Event Handler ---
272
- @asynccontextmanager
273
- async def lifespan(app: FastAPI):
274
- """应用生命周期管理"""
275
- # 启动时的初始化
276
- print("正在初始化Notion浏览器cookie...")
277
- await get_browser_cookies()
278
- # 启动cookie定时更新器
279
- start_cookie_updater()
280
- yield
281
- # 关闭时的清理(如果需要)
282
-
283
# --- FastAPI App ---
app = FastAPI(lifespan=lifespan)  # lifespan handles cookie init/refresh
285
-
286
- # --- Helper Functions ---
287
-
288
def build_notion_request(request_data: ChatCompletionRequest) -> NotionRequestBody:
    """Convert an OpenAI-style message list into a Notion inference request.

    The transcript starts with a "config" item selecting the Notion model,
    followed by the conversation history: assistant turns become
    "markdown-chat" items, everything else (user, system, ...) becomes a
    "user" item.
    """
    config_item = NotionTranscriptItem(
        type="config",
        value=NotionTranscriptConfigValue(model=request_data.notion_model),
    )

    history = []
    for msg in request_data.messages:
        if msg.role == "assistant":
            # Notion stores assistant replies as "markdown-chat" entries
            history.append(NotionTranscriptItem(type="markdown-chat", value=msg.content))
        else:
            # user, system, and any other roles all map to "user"
            history.append(NotionTranscriptItem(type="user", value=[[msg.content]]))

    # Always start a new thread with a fresh traceId per request, using the
    # globally configured space id.
    return NotionRequestBody(
        spaceId=NOTION_SPACE_ID,
        transcript=[config_item] + history,
        createThread=True,
        traceId=str(uuid.uuid4()),
        debugOverrides=NotionDebugOverrides(
            cachedInferences={},
            annotationInferences={},
            emitInferences=False,
        ),
        generateTitle=False,
        saveAllThreadOperations=False,
    )
321
-
322
-
323
async def check_first_response_line(session: AsyncSession, notion_request_body: NotionRequestBody, headers: dict, request_id: int):
    """Probe one upstream request and classify it by its first NDJSON line.

    Posts a copy of the request (with a fresh traceId) and reads the stream
    just far enough to parse the first complete JSON line.

    Returns a 3-tuple:
      - success: ((response, buffered_text), None, None) — the buffered text
        must be replayed by the caller before continuing to stream;
      - upstream "error code 500": (None, response, "500 error");
      - non-200 status: (None, response, "HTTP <code>");
      - stream ended without a parseable line: (None, response, "No valid response");
      - exception: (None, None, str(e)).
    """
    try:
        # With more than one concurrent request, stagger them randomly so
        # they do not hit Notion at exactly the same instant.
        if CONCURRENT_REQUESTS > 1:
            delay = random.uniform(0, 1.0)
            print(f"并发请求 {request_id} 延迟 {delay:.2f}秒")
            await asyncio.sleep(delay)

        # Each concurrent attempt gets its own body with a fresh traceId
        request_body_copy = notion_request_body.model_copy()
        request_body_copy.traceId = str(uuid.uuid4())

        response = await session.post(
            NOTION_API_URL,
            json=request_body_copy.model_dump(),
            headers=headers,
            stream=True
        )

        if response.status_code != 200:
            return None, response, f"HTTP {response.status_code}"

        # Read until the first complete JSON line can be parsed
        buffer = ""
        async for chunk in response.aiter_content():
            if isinstance(chunk, bytes):
                chunk = chunk.decode('utf-8')
            buffer += chunk

            # Try to parse the first complete JSON line seen so far
            lines = buffer.split('\n')
            for line in lines:
                line = line.strip()
                if line:
                    try:
                        data = json.loads(line)
                        if (data.get("type") == "error" and
                            data.get("message") and
                            "error code 500" in data.get("message", "")):
                            print(f"并发请求 {request_id} 检测到500错误: {data}")
                            return None, response, "500 error"
                        else:
                            # Healthy response: hand back the response plus
                            # everything already consumed from it
                            print(f"并发请求 {request_id} 响应正常")
                            return (response, buffer), None, None
                    except json.JSONDecodeError:
                        # Incomplete line — keep accumulating
                        continue

        return None, response, "No valid response"
    except Exception as e:
        print(f"并发请求 {request_id} 发生异常: {e}")
        return None, None, str(e)
376
-
377
def _emit_openai_chunks(lines, tail, chunk_id, created_time):
    """Translate complete NDJSON lines into OpenAI SSE ``data:`` strings.

    Shared by both passes of stream_notion_response_single (the original code
    duplicated this ~40-line loop verbatim).  For each parseable line:
      - "markdown-chat" payloads are wrapped in a ChatCompletionChunk and
        yielded as an SSE string;
      - a line containing "recordMap" marks the end of the answer: any
        complete JSON object still sitting in the partial-line ``tail`` is
        flushed as a final chunk, then ``None`` is yielded as a sentinel
        telling the caller to stop streaming.
    Undecodable or otherwise problematic lines are logged and skipped.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            data = json.loads(line)

            if data.get("type") == "markdown-chat" and isinstance(data.get("value"), str):
                content_chunk = data["value"]
                if content_chunk:
                    chunk_obj = ChatCompletionChunk(
                        id=chunk_id,
                        created=created_time,
                        choices=[Choice(delta=ChoiceDelta(content=content_chunk))]
                    )
                    yield f"data: {chunk_obj.model_dump_json()}\n\n"
            elif "recordMap" in data:
                print("Detected recordMap, stopping stream.")
                # Flush any complete JSON object left in the tail buffer
                if tail.strip():
                    try:
                        last_data = json.loads(tail.strip())
                        if last_data.get("type") == "markdown-chat" and isinstance(last_data.get("value"), str):
                            if last_data["value"]:
                                last_chunk = ChatCompletionChunk(
                                    id=chunk_id,
                                    created=created_time,
                                    choices=[Choice(delta=ChoiceDelta(content=last_data["value"]))]
                                )
                                yield f"data: {last_chunk.model_dump_json()}\n\n"
                    except Exception:
                        # Partial/unparseable tail — nothing to flush
                        pass
                yield None  # sentinel: stop streaming
                return
        except json.JSONDecodeError as e:
            print(f"Warning: Could not decode JSON line: {line[:100]}... Error: {str(e)}")
        except Exception as e:
            print(f"Error processing line: {str(e)}")


async def stream_notion_response_single(session: AsyncSession, response, initial_buffer: str, chunk_id: str, created_time: int):
    """Stream one successful Notion response as OpenAI SSE chunks.

    ``initial_buffer`` holds the text already consumed while probing the
    first line (see check_first_response_line); it is replayed first, then
    the remainder of ``response`` is streamed.  ``session`` is unused here
    but kept for interface compatibility — the caller owns its lifetime.
    Stops (without a finish chunk; the caller emits that) once a
    "recordMap" line is seen.
    """
    buffer = initial_buffer

    # Pass 1: process what was already read during the first-line probe.
    lines = buffer.split('\n')
    buffer = lines[-1]  # keep the trailing partial line
    for item in _emit_openai_chunks(lines[:-1], buffer, chunk_id, created_time):
        if item is None:
            return
        yield item

    # Pass 2: continue with the rest of the live response.
    async for chunk in response.aiter_content():
        if isinstance(chunk, bytes):
            chunk = chunk.decode('utf-8')
        buffer += chunk

        lines = buffer.split('\n')
        buffer = lines[-1]  # keep the trailing partial line
        for item in _emit_openai_chunks(lines[:-1], buffer, chunk_id, created_time):
            if item is None:
                return
            yield item
471
-
472
async def stream_notion_response(notion_request_body: NotionRequestBody):
    """Streams the request to Notion and yields OpenAI-compatible SSE chunks.

    Strategy: first race CONCURRENT_REQUESTS parallel probe requests and
    stream from the first one whose opening line is healthy.  Only if every
    probe fails does it fall back to a sequential retry loop (MAX_RETRIES
    attempts).  All errors are reported in-band as SSE chunks, never raised,
    since HTTP status cannot change once streaming has begun.
    """

    # Make sure usable cookies are cached before contacting Notion
    await ensure_cookies_available()

    with cookie_lock:
        current_cookies = browser_cookies

    # Browser-like headers; the cookie header carries the cached cookie string
    headers = {
        'accept': 'application/x-ndjson',
        'accept-encoding': 'gzip, deflate, br, zstd',
        'accept-language': 'en-US,zh;q=0.9',
        'content-type': 'application/json',
        'dnt': '1',
        'notion-audit-log-platform': 'web',
        'notion-client-version': '23.13.0.3661',
        'origin': 'https://www.notion.so',
        'referer': 'https://www.notion.so/',
        'priority': 'u=1, i',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'cookie': current_cookies,
        'x-notion-space-id': NOTION_SPACE_ID
    }

    # Conditionally add the active user header
    notion_active_user = os.getenv("NOTION_ACTIVE_USER_HEADER")
    if notion_active_user:  # Checks for None and empty string implicitly
        headers['x-notion-active-user-header'] = notion_active_user

    # One id/timestamp shared by every chunk of this completion
    chunk_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(time.time())

    # Use the global retry configuration
    max_retries = MAX_RETRIES
    retry_delay = RETRY_DELAY

    # Phase 1: race the concurrent probe requests
    print(f"同时发起 {CONCURRENT_REQUESTS} 个并发请求...")
    async with AsyncSession(impersonate="chrome136") as session:
        # Launch all probes as independent tasks
        tasks = []
        for i in range(CONCURRENT_REQUESTS):
            task = asyncio.create_task(
                check_first_response_line(session, notion_request_body, headers, i + 1)
            )
            tasks.append(task)

        # Wait until one succeeds or all have finished
        successful_response = None
        failed_count = 0
        completed_tasks = set()

        while len(completed_tasks) < CONCURRENT_REQUESTS and not successful_response:
            # Wait for any not-yet-handled task to finish
            done, pending = await asyncio.wait(
                [t for t in tasks if t not in completed_tasks],
                return_when=asyncio.FIRST_COMPLETED
            )

            for task in done:
                completed_tasks.add(task)
                result, response, error = await task
                if result:
                    # First healthy response wins; cancel the rest
                    successful_response = result
                    print(f"找到成功的并发响应,立即使用")
                    for t in tasks:
                        if t not in completed_tasks:
                            t.cancel()
                    break
                else:
                    # Record the failure and keep waiting
                    failed_count += 1
                    if error:
                        print(f"并发请求失败: {error}")

        # If a probe succeeded, stream from it and finish
        if successful_response:
            response, initial_buffer = successful_response
            print("使用成功的并发响应进行流式传输")

            # Relay the upstream stream as OpenAI SSE chunks
            async for data in stream_notion_response_single(session, response, initial_buffer, chunk_id, created_time):
                yield data

            # Send the final chunk indicating stop
            final_chunk = ChatCompletionChunk(
                id=chunk_id,
                created=created_time,
                choices=[Choice(delta=ChoiceDelta(), finish_reason="stop")]
            )
            yield f"data: {final_chunk.model_dump_json()}\n\n"
            yield "data: [DONE]\n\n"
            return

    # Phase 2: every probe failed — sequential retry loop
    print(f"所有 {CONCURRENT_REQUESTS} 个并发请求都失败,开始单请求重试流程...")

    # Original (non-concurrent) retry logic
    for attempt in range(max_retries):
        try:
            # Using curl_cffi with chrome136 impersonation for better anti-bot bypass
            async with AsyncSession(impersonate="chrome136") as session:
                # Stream the response
                response = await session.post(
                    NOTION_API_URL,
                    json=notion_request_body.model_dump(),
                    headers=headers,
                    stream=True
                )

                if response.status_code != 200:
                    error_content = await response.atext()
                    print(f"Error from Notion API: {response.status_code}")
                    print(f"Response: {error_content}")
                    raise HTTPException(status_code=response.status_code, detail=f"Notion API Error: {error_content}")

                # Process streaming response
                # curl_cffi streaming works differently - we need to read the content in chunks
                buffer = ""
                first_line_checked = False
                is_error_response = False

                async for chunk in response.aiter_content():
                    # Decode chunk if it's bytes
                    if isinstance(chunk, bytes):
                        chunk = chunk.decode('utf-8')

                    buffer += chunk

                    # Split by newlines and process complete lines
                    lines = buffer.split('\n')
                    # Keep the last incomplete line in the buffer
                    buffer = lines[-1]

                    for line in lines[:-1]:
                        line = line.strip()
                        if not line:
                            continue

                        try:
                            data = json.loads(line)

                            # Check whether the very first line is a 500 error
                            if not first_line_checked:
                                first_line_checked = True
                                if (data.get("type") == "error" and
                                    data.get("message") and
                                    "error code 500" in data.get("message", "")):
                                    print(f"检测到Notion API 500错误 (重试 {attempt + 1}/{max_retries}): {data}")
                                    is_error_response = True
                                    break

                            # Not an error: forward content in real time
                            # Check if it's the type of message containing text chunks
                            if data.get("type") == "markdown-chat" and isinstance(data.get("value"), str):
                                content_chunk = data["value"]
                                if content_chunk:  # Only send if there's content
                                    chunk_obj = ChatCompletionChunk(
                                        id=chunk_id,
                                        created=created_time,
                                        choices=[Choice(delta=ChoiceDelta(content=content_chunk))]
                                    )
                                    yield f"data: {chunk_obj.model_dump_json()}\n\n"
                            # Add logic here to detect the end of the stream if Notion has a specific marker
                            # For now, we assume markdown-chat stops when the main content is done.
                            # If we see a recordMap, it's definitely past the text stream.
                            elif "recordMap" in data:
                                print("Detected recordMap, stopping stream.")
                                # Process any remaining buffer
                                if buffer.strip():
                                    try:
                                        last_data = json.loads(buffer.strip())
                                        if last_data.get("type") == "markdown-chat" and isinstance(last_data.get("value"), str):
                                            if last_data["value"]:
                                                last_chunk = ChatCompletionChunk(
                                                    id=chunk_id,
                                                    created=created_time,
                                                    choices=[Choice(delta=ChoiceDelta(content=last_data["value"]))]
                                                )
                                                yield f"data: {last_chunk.model_dump_json()}\n\n"
                                    except:
                                        pass
                                # Exit the loop
                                break

                        except json.JSONDecodeError as e:
                            print(f"Warning: Could not decode JSON line: {line[:100]}... Error: {str(e)}")
                        except Exception as e:
                            print(f"Error processing line: {str(e)}")
                            # Continue processing other lines

                    if is_error_response:
                        break

                # A detected 500 error triggers a retry (or a final error chunk)
                if is_error_response:
                    if attempt < max_retries - 1:
                        print(f"等待 {retry_delay} 秒后重试...")
                        await asyncio.sleep(retry_delay)
                        continue  # retry
                    else:
                        # All retries exhausted: report the error in-band
                        print("所有重试都失败,返回500错误给客户端")
                        error_chunk = ChatCompletionChunk(
                            id=chunk_id,
                            created=created_time,
                            choices=[Choice(delta=ChoiceDelta(content="Error: Notion API returned error code 500 after all retries"), finish_reason="stop")]
                        )
                        yield f"data: {error_chunk.model_dump_json()}\n\n"
                        yield "data: [DONE]\n\n"
                        return

                # No error: send the final chunk indicating stop
                final_chunk = ChatCompletionChunk(
                    id=chunk_id,
                    created=created_time,
                    choices=[Choice(delta=ChoiceDelta(), finish_reason="stop")]
                )
                yield f"data: {final_chunk.model_dump_json()}\n\n"
                yield "data: [DONE]\n\n"

                # Completed successfully — leave the retry loop
                break

        except HTTPException:
            # Cannot raise inside a streaming response; report in-band instead
            if attempt < max_retries - 1:
                print(f"HTTP异常,等待 {retry_delay} 秒后重试...")
                await asyncio.sleep(retry_delay)
                continue
            else:
                print("HTTP异常且无更多重试,返回错误信息")
                error_chunk = ChatCompletionChunk(
                    id=chunk_id,
                    created=created_time,
                    choices=[Choice(delta=ChoiceDelta(content="Error: HTTP exception occurred after all retries"), finish_reason="stop")]
                )
                yield f"data: {error_chunk.model_dump_json()}\n\n"
                yield "data: [DONE]\n\n"
                return
        except Exception as e:
            print(f"Unexpected error during streaming (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                print(f"等待 {retry_delay} 秒后重试...")
                await asyncio.sleep(retry_delay)
                continue
            else:
                print("意外错误且无更多重试,返回错误信息")
                error_chunk = ChatCompletionChunk(
                    id=chunk_id,
                    created=created_time,
                    choices=[Choice(delta=ChoiceDelta(content=f"Error: Internal server error during streaming: {e}"), finish_reason="stop")]
                )
                yield f"data: {error_chunk.model_dump_json()}\n\n"
                yield "data: [DONE]\n\n"
                return
736
-
737
-
738
- # --- API Endpoints ---
739
-
740
@app.get("/v1/models", response_model=ModelList)
async def list_models(authenticated: bool = Depends(authenticate)):
    """
    Endpoint to list available Notion models, mimicking OpenAI's /v1/models.
    """
    available_models = [
        "openai-gpt-4.1",
        "anthropic-opus-4",
        "anthropic-sonnet-4"
    ]
    # `created` is filled by each Model's default_factory
    return ModelList(
        data=[Model(id=model_id, owned_by="notion") for model_id in available_models]
    )
755
-
756
@app.post("/v1/chat/completions")
async def chat_completions(request_data: ChatCompletionRequest, request: Request, authenticated: bool = Depends(authenticate)):
    """
    Endpoint to mimic OpenAI's chat completions, proxying to Notion.

    stream=True returns a text/event-stream of ChatCompletionChunk SSE
    events; stream=False drains the same generator internally and returns a
    single OpenAI-style completion object (token usage is unavailable from
    Notion, so usage fields are None).
    """
    if not NOTION_COOKIE:
        raise HTTPException(status_code=500, detail="Server configuration error: Notion cookie not set.")

    notion_request_body = build_notion_request(request_data)

    if request_data.stream:
        return StreamingResponse(
            stream_notion_response(notion_request_body),
            media_type="text/event-stream"
        )
    else:
        # --- Non-Streaming Logic (Optional - Collects stream internally) ---
        # Note: The primary goal is streaming, but a non-streaming version
        # might be useful for testing or simpler clients.
        # This requires collecting all chunks from the async generator.
        full_response_content = ""
        final_finish_reason = None
        chunk_id = f"chatcmpl-{uuid.uuid4()}"  # Generate ID for the non-streamed response
        created_time = int(time.time())

        try:
            # Drain the SSE generator, concatenating the content deltas
            async for line in stream_notion_response(notion_request_body):
                if line.startswith("data: ") and "[DONE]" not in line:
                    try:
                        data_json = line[len("data: "):].strip()
                        if data_json:
                            chunk_data = json.loads(data_json)
                            if chunk_data.get("choices"):
                                delta = chunk_data["choices"][0].get("delta", {})
                                content = delta.get("content")
                                if content:
                                    full_response_content += content
                                finish_reason = chunk_data["choices"][0].get("finish_reason")
                                if finish_reason:
                                    final_finish_reason = finish_reason
                    except json.JSONDecodeError:
                        print(f"Warning: Could not decode JSON line in non-streaming mode: {line}")

            # Construct the final OpenAI-compatible non-streaming response
            return {
                "id": chunk_id,
                "object": "chat.completion",
                "created": created_time,
                "model": request_data.model,  # Return the model requested by the client
                "choices": [
                    {
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": full_response_content,
                        },
                        "finish_reason": final_finish_reason or "stop",  # Default to stop if not explicitly set
                    }
                ],
                "usage": {  # Note: Token usage is not available from Notion
                    "prompt_tokens": None,
                    "completion_tokens": None,
                    "total_tokens": None,
                },
            }
        except HTTPException as e:
            # Re-raise HTTP exceptions from the streaming function
            raise e
        except Exception as e:
            print(f"Error during non-streaming processing: {e}")
            raise HTTPException(status_code=500, detail="Internal server error processing Notion response")
827
-
828
if __name__ == "__main__":
    import uvicorn
    print("Starting server. Access at http://localhost:7860")
    print("Ensure NOTION_COOKIE is set in your .env file or environment.")
    print("Cookie管理系统已启用,将自动获取和更新Notion浏览器cookie")

    # Run the ASGI server
    uvicorn.run(app, host="0.0.0.0", port=7860)
 
 
 
 
 
 
1
+ import os
2
+ import uuid
3
+ import json
4
+ import time
5
+ import asyncio
6
+ import random
7
+ import threading
8
+ from curl_cffi.requests import AsyncSession
9
+ from fastapi import FastAPI, Request, HTTPException, Depends, status
10
+ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
11
+ from fastapi.responses import StreamingResponse
12
+ from dotenv import load_dotenv
13
+ import secrets
14
+ from pydantic import BaseModel, Field
15
+ from typing import List, Optional, Dict, Any, Literal, Union
16
+ from contextlib import asynccontextmanager
17
+
18
+ # Load environment variables from .env file
19
+ load_dotenv()
20
+
21
+ # --- 并发请求配置 ---
22
+ CONCURRENT_REQUESTS = 1 # 可自定义并发请求数量
23
+
24
+ # --- 重试配置 ---
25
+ MAX_RETRIES = 3
26
+ RETRY_DELAY = 1 # 秒
27
+
28
+ # --- Models (Integrated from models.py) ---
29
+
30
+ # Input Models (OpenAI-like)
31
+ class ChatMessage(BaseModel):
32
+ role: Literal["system", "user", "assistant"]
33
+ content: str
34
+
35
+ class ChatCompletionRequest(BaseModel):
36
+ messages: List[ChatMessage]
37
+ model: str = "notion-proxy"
38
+ stream: bool = False
39
+ notion_model: str = "anthropic-opus-4"
40
+
41
+
42
+ # Notion Models
43
+ class NotionTranscriptConfigValue(BaseModel):
44
+ type: str = "markdown-chat"
45
+ model: str # e.g., "anthropic-opus-4"
46
+
47
+ class NotionTranscriptItem(BaseModel):
48
+ type: Literal["config", "user", "markdown-chat"]
49
+ value: Union[List[List[str]], str, NotionTranscriptConfigValue]
50
+
51
+ class NotionDebugOverrides(BaseModel):
52
+ cachedInferences: Dict = Field(default_factory=dict)
53
+ annotationInferences: Dict = Field(default_factory=dict)
54
+ emitInferences: bool = False
55
+
56
+ class NotionRequestBody(BaseModel):
57
+ traceId: str = Field(default_factory=lambda: str(uuid.uuid4()))
58
+ spaceId: str
59
+ transcript: List[NotionTranscriptItem]
60
+ # threadId is removed, createThread will be set to true
61
+ createThread: bool = True
62
+ debugOverrides: NotionDebugOverrides = Field(default_factory=NotionDebugOverrides)
63
+ generateTitle: bool = False
64
+ saveAllThreadOperations: bool = True
65
+
66
+
67
+ # Output Models (OpenAI SSE)
68
+ class ChoiceDelta(BaseModel):
69
+ content: Optional[str] = None
70
+
71
+ class Choice(BaseModel):
72
+ index: int = 0
73
+ delta: ChoiceDelta
74
+ finish_reason: Optional[Literal["stop", "length"]] = None
75
+
76
+ class ChatCompletionChunk(BaseModel):
77
+ id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4()}")
78
+ object: str = "chat.completion.chunk"
79
+ created: int = Field(default_factory=lambda: int(time.time()))
80
+ model: str = "notion-proxy" # Or could reflect the underlying Notion model
81
+ choices: List[Choice]
82
+
83
+
84
+ # Models for /v1/models Endpoint
85
+ class Model(BaseModel):
86
+ id: str
87
+ object: str = "model"
88
+ created: int = Field(default_factory=lambda: int(time.time()))
89
+ owned_by: str = "notion" # Or specify based on actual model origin if needed
90
+
91
+ class ModelList(BaseModel):
92
+ object: str = "list"
93
+ data: List[Model]
94
+
95
# --- Configuration ---
NOTION_API_URL = "https://www.notion.so/api/v3/runInferenceTranscript"
# IMPORTANT: Load the Notion cookie securely from environment variables
NOTION_COOKIE = os.getenv("NOTION_COOKIE")

NOTION_SPACE_ID = os.getenv("NOTION_SPACE_ID")
if not NOTION_COOKIE:
    print("Error: NOTION_COOKIE environment variable not set.")
    # Consider raising HTTPException or exiting in a real app
if not NOTION_SPACE_ID:
    print("Warning: NOTION_SPACE_ID environment variable not set. Using a default UUID.")
    # Using a default might not be ideal, depends on Notion's behavior
    # Consider raising an error instead: raise ValueError("NOTION_SPACE_ID not set")
    # NOTE(review): a random UUID is presumably not a valid Notion space id —
    # requests will likely fail downstream; confirm whether failing fast is better.
    NOTION_SPACE_ID = str(uuid.uuid4())  # Default or raise error
109
+
110
# --- Cookie Management ---
# Cached "name=value; name=value" cookie string shared by all requests.
# Writes happen under cookie_lock; reads in should_update_cookies are unlocked.
browser_cookies = ""
cookie_lock = threading.Lock()
# Unix timestamp of the last successful refresh (0 = never refreshed).
last_cookie_update = 0
COOKIE_UPDATE_INTERVAL = 30 * 60  # 30 minutes in seconds
115
+
116
async def get_browser_cookies():
    """Fetch browser cookies from notion.so and cache them in ``browser_cookies``.

    Tries several extraction strategies (curl_cffi's cookie API differs across
    versions), always appends the env ``NOTION_COOKIE`` as ``token_v2``, and
    falls back to the env cookie alone on total failure.

    Returns:
        bool: True if a usable cookie string was cached, False otherwise.
    """
    global browser_cookies, last_cookie_update

    try:
        print("正在获取Notion浏览器cookie...")
        async with AsyncSession(impersonate="chrome136") as session:
            response = await session.get("https://www.notion.so")

            if response.status_code == 200:
                # Collect all cookies from the response.
                cookies = response.cookies
                notion_so_cookies = []

                # Work around CookieConflict: keep only cookies for the
                # .notion.so domain (Notion also sets .notion.com cookies).
                try:
                    # Strategy 1: filter by domain via get_dict, if available.
                    if hasattr(cookies, 'get_dict'):
                        notion_so_dict = cookies.get_dict(domain='.notion.so')
                        for name, value in notion_so_dict.items():
                            notion_so_cookies.append(f"{name}={value}")
                    # Strategy 2: walk the underlying cookie jar and filter.
                    elif hasattr(cookies, 'jar'):
                        for cookie in cookies.jar:
                            if hasattr(cookie, 'domain') and cookie.domain:
                                if '.notion.so' in cookie.domain and '.notion.com' not in cookie.domain:
                                    notion_so_cookies.append(f"{cookie.name}={cookie.value}")
                    else:
                        # Strategy 3: parse Set-Cookie headers by hand to avoid
                        # the conflict entirely.
                        set_cookie_headers = response.headers.get_list('Set-Cookie') if hasattr(response.headers, 'get_list') else []
                        if not set_cookie_headers and 'Set-Cookie' in response.headers:
                            set_cookie_headers = [response.headers['Set-Cookie']]

                        for cookie_header in set_cookie_headers:
                            if 'domain=.notion.so' in cookie_header or ('notion.so' in cookie_header and 'notion.com' not in cookie_header):
                                # Extract the leading "name=value" pair.
                                cookie_parts = cookie_header.split(';')[0].strip()
                                if '=' in cookie_parts:
                                    notion_so_cookies.append(cookie_parts)

                    # Last resort: iterate response.cookies directly and filter.
                    if not notion_so_cookies and hasattr(response, 'cookies'):
                        try:
                            for cookie in response.cookies:
                                if hasattr(cookie, 'domain') and cookie.domain and '.notion.so' in cookie.domain:
                                    notion_so_cookies.append(f"{cookie.name}={cookie.value}")
                        except Exception as inner_e:
                            print(f"内部cookie处理错误: {inner_e}")

                except Exception as cookie_error:
                    print(f"处理cookie时出现错误: {cookie_error}")
                    # All filtered strategies failed — take whatever the
                    # session holds, unfiltered.
                    if hasattr(session, 'cookies'):
                        try:
                            for name, value in session.cookies.items():
                                notion_so_cookies.append(f"{name}={value}")
                        except:
                            pass

                # Always append the env cookie as the token_v2 auth cookie.
                if NOTION_COOKIE:
                    notion_so_cookies.append(f"token_v2={NOTION_COOKIE}")

                # If nothing was scraped, at least use the env cookie alone.
                if not notion_so_cookies and NOTION_COOKIE:
                    notion_so_cookies = [f"token_v2={NOTION_COOKIE}"]

                with cookie_lock:
                    browser_cookies = "; ".join(notion_so_cookies)
                    last_cookie_update = time.time()

                # Extract cookie names for logging only (values are secrets).
                cookie_names = []
                for cookie_str in notion_so_cookies:
                    if '=' in cookie_str:
                        name = cookie_str.split('=')[0]
                        cookie_names.append(name)

                print(f"成功获取到 {len(notion_so_cookies)} 个cookie")
                print(f"Cookie名称列表: {', '.join(cookie_names)}")
                return True
            else:
                print(f"获取cookie失败,HTTP状态码: {response.status_code}")
                return False

    except Exception as e:
        print(f"获取browser cookie时出错: {e}")
        print(f"错误详情: {type(e).__name__}: {str(e)}")

        # Network/session failure: fall back to the env cookie if present.
        if NOTION_COOKIE:
            with cookie_lock:
                browser_cookies = f"token_v2={NOTION_COOKIE}"
                last_cookie_update = time.time()
            print("使用环境变量cookie作为备用")
            return True
        return False
216
+
217
def should_update_cookies():
    """Return True when the cached cookies are older than the refresh interval."""
    elapsed = time.time() - last_cookie_update
    return elapsed > COOKIE_UPDATE_INTERVAL
220
+
221
async def ensure_cookies_available():
    """Make sure ``browser_cookies`` is populated, refreshing it when stale.

    Raises:
        HTTPException: 500 when no cookie can be obtained and no env fallback
            (``NOTION_COOKIE``) exists.
    """
    global browser_cookies

    # Refresh on first use or when the cached cookies exceed their TTL.
    if not browser_cookies or should_update_cookies():
        success = await get_browser_cookies()
        if not success and not browser_cookies:
            # Fetch failed and nothing cached: fall back to the env cookie.
            if NOTION_COOKIE:
                with cookie_lock:
                    browser_cookies = f"token_v2={NOTION_COOKIE}"
                print("使用环境变量cookie作为备用")
            else:
                raise HTTPException(status_code=500, detail="无法获取Notion cookie")
235
+
236
def start_cookie_updater():
    """Start a daemon thread that refreshes cookies when they become stale.

    The thread owns a private event loop (it cannot use the server's loop from
    another thread) and polls ``should_update_cookies`` once a minute.
    """
    def cookie_updater():
        # Dedicated loop for this background thread.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        while True:
            try:
                if should_update_cookies():
                    print("开始定时更新cookie...")
                    loop.run_until_complete(get_browser_cookies())
                time.sleep(60)  # poll once per minute
            except Exception as e:
                print(f"定时更新cookie时出错: {e}")
                time.sleep(60)

    # daemon=True so this thread never blocks interpreter shutdown.
    thread = threading.Thread(target=cookie_updater, daemon=True)
    thread.start()
    print("cookie定时更新器已启动")
255
+
256
# --- Authentication ---
# NOTE(review): falling back to "default_token" means the proxy is effectively
# unauthenticated unless PROXY_AUTH_TOKEN is set — consider failing fast instead.
EXPECTED_TOKEN = os.getenv("PROXY_AUTH_TOKEN", "default_token")  # Default token
security = HTTPBearer()

def authenticate(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Compares provided token with the expected token.

    Uses a constant-time comparison to avoid timing side channels.

    Raises:
        HTTPException: 401 when the bearer token does not match.
    """
    correct_token = secrets.compare_digest(credentials.credentials, EXPECTED_TOKEN)
    if not correct_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            # WWW-Authenticate header removed for Bearer
        )
    return True  # Indicate successful authentication
270
+
271
# --- Lifespan Event Handler ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """App lifecycle: prime the cookie cache and start the background refresher."""
    # Startup: fetch cookies once before serving traffic.
    print("正在初始化Notion浏览器cookie...")
    await get_browser_cookies()
    # Start the periodic cookie refresher thread.
    start_cookie_updater()
    yield
    # Shutdown: nothing to clean up (refresher thread is a daemon).

# --- FastAPI App ---
app = FastAPI(lifespan=lifespan)
285
+
286
# --- Helper Functions ---

def build_notion_request(request_data: ChatCompletionRequest) -> NotionRequestBody:
    """Convert an OpenAI-style chat request into a Notion transcript payload."""
    # The transcript always leads with a config item selecting the Notion model.
    items: List[NotionTranscriptItem] = [
        NotionTranscriptItem(
            type="config",
            value=NotionTranscriptConfigValue(model=request_data.notion_model),
        )
    ]

    for msg in request_data.messages:
        if msg.role == "assistant":
            # Notion records prior assistant replies as "markdown-chat" entries.
            item = NotionTranscriptItem(type="markdown-chat", value=msg.content)
        else:
            # user, system, and any other role all become "user" entries.
            item = NotionTranscriptItem(type="user", value=[[msg.content]])
        items.append(item)

    return NotionRequestBody(
        traceId=str(uuid.uuid4()),   # fresh trace id per request
        spaceId=NOTION_SPACE_ID,     # from environment variable
        transcript=items,
        createThread=True,           # always create a new thread
        debugOverrides=NotionDebugOverrides(
            cachedInferences={},
            annotationInferences={},
            emitInferences=False,
        ),
        generateTitle=False,
        saveAllThreadOperations=False,
    )
321
+
322
+
323
async def check_first_response_line(session: AsyncSession, notion_request_body: NotionRequestBody, headers: dict, request_id: int):
    """Open one Notion streaming request and inspect its first JSON line.

    Used for racing several identical requests: the first parseable NDJSON line
    tells us whether Notion answered normally or with an embedded 500 error.

    Returns a ``(result, response, error)`` triple:
    - success: ``((response, consumed_buffer), None, None)`` — the caller must
      continue reading from ``response`` after replaying ``consumed_buffer``;
    - failure: ``(None, response_or_None, error_message)``.
    """
    try:
        # With more than one racer, add jitter so requests don't land at once.
        if CONCURRENT_REQUESTS > 1:
            delay = random.uniform(0, 1.0)
            print(f"并发请求 {request_id} 延迟 {delay:.2f}秒")
            await asyncio.sleep(delay)

        # Each racer gets its own body copy with a distinct traceId.
        request_body_copy = notion_request_body.model_copy()
        request_body_copy.traceId = str(uuid.uuid4())

        response = await session.post(
            NOTION_API_URL,
            json=request_body_copy.model_dump(),
            headers=headers,
            stream=True
        )

        if response.status_code != 200:
            return None, response, f"HTTP {response.status_code}"

        # Read until the first complete JSON line arrives.
        buffer = ""
        async for chunk in response.aiter_content():
            if isinstance(chunk, bytes):
                chunk = chunk.decode('utf-8')
            buffer += chunk

            # Try to parse the first complete JSON line in the buffer.
            lines = buffer.split('\n')
            for line in lines:
                line = line.strip()
                if line:
                    try:
                        data = json.loads(line)
                        if (data.get("type") == "error" and
                            data.get("message") and
                            "error code 500" in data.get("message", "")):
                            print(f"并发请求 {request_id} 检测到500错误: {data}")
                            return None, response, "500 error"
                        else:
                            # Healthy stream: hand back the response plus the
                            # bytes already consumed from it.
                            print(f"并发请求 {request_id} 响应正常")
                            return (response, buffer), None, None
                    except json.JSONDecodeError:
                        # Partial line — keep accumulating.
                        continue

        # Stream ended without a single parseable line.
        return None, response, "No valid response"
    except Exception as e:
        print(f"并发请求 {request_id} 发生异常: {e}")
        return None, None, str(e)
376
+
377
async def stream_notion_response_single(session: AsyncSession, response, initial_buffer: str, chunk_id: str, created_time: int):
    """Relay one Notion NDJSON stream as OpenAI-style SSE ``data:`` lines.

    ``initial_buffer`` is the text already consumed from ``response`` by
    check_first_response_line; it is replayed before the live stream so no
    content is lost. ``session`` is unused but kept for interface stability.

    The original implementation duplicated the whole line-parsing loop once for
    the initial buffer and once for the live stream; both passes are now fed
    through a single loop via ``_text_pieces``, which preserves the exact
    buffering behavior (split on newlines, keep the trailing partial line).
    """
    async def _text_pieces():
        # Replay the already-consumed prefix first, then the live stream.
        if initial_buffer:
            yield initial_buffer
        async for chunk in response.aiter_content():
            yield chunk.decode('utf-8') if isinstance(chunk, bytes) else chunk

    buffer = ""
    async for piece in _text_pieces():
        buffer += piece

        lines = buffer.split('\n')
        buffer = lines[-1]  # keep the trailing (possibly partial) line

        for line in lines[:-1]:
            line = line.strip()
            if not line:
                continue

            try:
                data = json.loads(line)

                if data.get("type") == "markdown-chat" and isinstance(data.get("value"), str):
                    content_chunk = data["value"]
                    if content_chunk:
                        chunk_obj = ChatCompletionChunk(
                            id=chunk_id,
                            created=created_time,
                            choices=[Choice(delta=ChoiceDelta(content=content_chunk))]
                        )
                        yield f"data: {chunk_obj.model_dump_json()}\n\n"
                elif "recordMap" in data:
                    # recordMap marks the end of the text stream.
                    print("Detected recordMap, stopping stream.")
                    # Best-effort flush of any trailing partial line.
                    if buffer.strip():
                        try:
                            last_data = json.loads(buffer.strip())
                            if (last_data.get("type") == "markdown-chat"
                                    and isinstance(last_data.get("value"), str)
                                    and last_data["value"]):
                                last_chunk = ChatCompletionChunk(
                                    id=chunk_id,
                                    created=created_time,
                                    choices=[Choice(delta=ChoiceDelta(content=last_data["value"]))]
                                )
                                yield f"data: {last_chunk.model_dump_json()}\n\n"
                        except Exception:
                            # Partial/garbled trailing line — drop it silently,
                            # matching the original best-effort behavior.
                            pass
                    return
            except json.JSONDecodeError as e:
                print(f"Warning: Could not decode JSON line: {line[:100]}... Error: {str(e)}")
            except Exception as e:
                print(f"Error processing line: {str(e)}")
471
+
472
async def stream_notion_response(notion_request_body: NotionRequestBody):
    """Streams the request to Notion and yields OpenAI-compatible SSE chunks.

    Strategy: race CONCURRENT_REQUESTS identical requests and stream the first
    healthy one; if all racers fail, fall back to MAX_RETRIES sequential
    attempts with in-band 500 detection. All errors are reported in-stream
    (never raised) because SSE has already started.
    """

    # Make sure a cookie string is cached before building headers.
    await ensure_cookies_available()

    with cookie_lock:
        current_cookies = browser_cookies

    headers = {
        'accept': 'application/x-ndjson',
        'accept-encoding': 'gzip, deflate, br, zstd',
        'accept-language': 'en-US,zh;q=0.9',
        'content-type': 'application/json',
        'dnt': '1',
        'notion-audit-log-platform': 'web',
        'notion-client-version': '23.13.0.3661',
        'origin': 'https://www.notion.so',
        'referer': 'https://www.notion.so/',
        'priority': 'u=1, i',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'cookie': current_cookies,
        'x-notion-space-id': NOTION_SPACE_ID
    }

    # Conditionally add the active user header
    notion_active_user = os.getenv("NOTION_ACTIVE_USER_HEADER")
    if notion_active_user:  # Checks for None and empty string implicitly
        headers['x-notion-active-user-header'] = notion_active_user

    chunk_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(time.time())

    # Global retry configuration.
    max_retries = MAX_RETRIES
    retry_delay = RETRY_DELAY

    # Phase 1: race concurrent requests.
    print(f"同时发起 {CONCURRENT_REQUESTS} 个并发请求...")
    async with AsyncSession(impersonate="chrome136") as session:
        # Launch the racers as independent tasks.
        tasks = []
        for i in range(CONCURRENT_REQUESTS):
            task = asyncio.create_task(
                check_first_response_line(session, notion_request_body, headers, i + 1)
            )
            tasks.append(task)

        # Wait until one succeeds or all complete.
        successful_response = None
        failed_count = 0
        completed_tasks = set()

        while len(completed_tasks) < CONCURRENT_REQUESTS and not successful_response:
            # Wake on the first racer to finish.
            done, pending = await asyncio.wait(
                [t for t in tasks if t not in completed_tasks],
                return_when=asyncio.FIRST_COMPLETED
            )

            for task in done:
                completed_tasks.add(task)
                result, response, error = await task
                if result:
                    # First healthy response wins; cancel the rest.
                    # NOTE(review): responses of losing/failed racers are never
                    # explicitly closed — presumably reclaimed when the session
                    # exits, but verify against curl_cffi's docs.
                    successful_response = result
                    print(f"找到成功的并发响应,立即使用")
                    for t in tasks:
                        if t not in completed_tasks:
                            t.cancel()
                    break
                else:
                    failed_count += 1
                    if error:
                        print(f"并发请求失败: {error}")

        # Stream the winning response, then emit the SSE terminators.
        if successful_response:
            response, initial_buffer = successful_response
            print("使用成功的并发响应进行流式传输")

            async for data in stream_notion_response_single(session, response, initial_buffer, chunk_id, created_time):
                yield data

            # Send the final chunk indicating stop
            final_chunk = ChatCompletionChunk(
                id=chunk_id,
                created=created_time,
                choices=[Choice(delta=ChoiceDelta(), finish_reason="stop")]
            )
            yield f"data: {final_chunk.model_dump_json()}\n\n"
            yield "data: [DONE]\n\n"
            return

    # Phase 2: all racers failed — sequential retries without concurrency.
    print(f"所有 {CONCURRENT_REQUESTS} 个并发请求都失败,开始单请求重试流程...")

    for attempt in range(max_retries):
        try:
            # Using curl_cffi with chrome136 impersonation for better anti-bot bypass
            async with AsyncSession(impersonate="chrome136") as session:
                # Stream the response
                response = await session.post(
                    NOTION_API_URL,
                    json=notion_request_body.model_dump(),
                    headers=headers,
                    stream=True
                )

                if response.status_code != 200:
                    error_content = await response.atext()
                    print(f"Error from Notion API: {response.status_code}")
                    print(f"Response: {error_content}")
                    raise HTTPException(status_code=response.status_code, detail=f"Notion API Error: {error_content}")

                # curl_cffi streaming: accumulate chunks, split on newlines,
                # keep the trailing partial line buffered.
                buffer = ""
                first_line_checked = False
                is_error_response = False

                async for chunk in response.aiter_content():
                    if isinstance(chunk, bytes):
                        chunk = chunk.decode('utf-8')

                    buffer += chunk

                    lines = buffer.split('\n')
                    # Keep the last incomplete line in the buffer
                    buffer = lines[-1]

                    for line in lines[:-1]:
                        line = line.strip()
                        if not line:
                            continue

                        try:
                            data = json.loads(line)

                            # The first parseable line may be an in-band 500.
                            if not first_line_checked:
                                first_line_checked = True
                                if (data.get("type") == "error" and
                                    data.get("message") and
                                    "error code 500" in data.get("message", "")):
                                    print(f"检测到Notion API 500错误 (重试 {attempt + 1}/{max_retries}): {data}")
                                    is_error_response = True
                                    break

                            # Healthy line: forward markdown-chat text as SSE.
                            if data.get("type") == "markdown-chat" and isinstance(data.get("value"), str):
                                content_chunk = data["value"]
                                if content_chunk:  # Only send if there's content
                                    chunk_obj = ChatCompletionChunk(
                                        id=chunk_id,
                                        created=created_time,
                                        choices=[Choice(delta=ChoiceDelta(content=content_chunk))]
                                    )
                                    yield f"data: {chunk_obj.model_dump_json()}\n\n"
                            # recordMap is past the text stream: flush & stop.
                            elif "recordMap" in data:
                                print("Detected recordMap, stopping stream.")
                                # Process any remaining buffer
                                if buffer.strip():
                                    try:
                                        last_data = json.loads(buffer.strip())
                                        if last_data.get("type") == "markdown-chat" and isinstance(last_data.get("value"), str):
                                            if last_data["value"]:
                                                last_chunk = ChatCompletionChunk(
                                                    id=chunk_id,
                                                    created=created_time,
                                                    choices=[Choice(delta=ChoiceDelta(content=last_data["value"]))]
                                                )
                                                yield f"data: {last_chunk.model_dump_json()}\n\n"
                                    except:
                                        pass
                                # Exit the loop
                                break

                        except json.JSONDecodeError as e:
                            print(f"Warning: Could not decode JSON line: {line[:100]}... Error: {str(e)}")
                        except Exception as e:
                            print(f"Error processing line: {str(e)}")
                            # Continue processing other lines

                    if is_error_response:
                        break

                # In-band 500 detected: retry or report in-stream.
                if is_error_response:
                    if attempt < max_retries - 1:
                        print(f"等待 {retry_delay} 秒后重试...")
                        await asyncio.sleep(retry_delay)
                        continue  # next attempt
                    else:
                        # All retries exhausted: report the error via SSE.
                        print("所有重试都失败,返回500错误给客户端")
                        error_chunk = ChatCompletionChunk(
                            id=chunk_id,
                            created=created_time,
                            choices=[Choice(delta=ChoiceDelta(content="Error: Notion API returned error code 500 after all retries"), finish_reason="stop")]
                        )
                        yield f"data: {error_chunk.model_dump_json()}\n\n"
                        yield "data: [DONE]\n\n"
                        return

                # Clean completion: send the final stop chunk and terminator.
                final_chunk = ChatCompletionChunk(
                    id=chunk_id,
                    created=created_time,
                    choices=[Choice(delta=ChoiceDelta(), finish_reason="stop")]
                )
                yield f"data: {final_chunk.model_dump_json()}\n\n"
                yield "data: [DONE]\n\n"

                # Success: leave the retry loop.
                break

        except HTTPException:
            # Cannot raise inside an active SSE stream: retry, else report in-stream.
            if attempt < max_retries - 1:
                print(f"HTTP异常,等待 {retry_delay} 秒后重试...")
                await asyncio.sleep(retry_delay)
                continue
            else:
                print("HTTP异常且无更多重试,返回错误信息")
                error_chunk = ChatCompletionChunk(
                    id=chunk_id,
                    created=created_time,
                    choices=[Choice(delta=ChoiceDelta(content="Error: HTTP exception occurred after all retries"), finish_reason="stop")]
                )
                yield f"data: {error_chunk.model_dump_json()}\n\n"
                yield "data: [DONE]\n\n"
                return
        except Exception as e:
            print(f"Unexpected error during streaming (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                print(f"等待 {retry_delay} 秒后重试...")
                await asyncio.sleep(retry_delay)
                continue
            else:
                print("意外错误且无更多重试,返回错误信息")
                error_chunk = ChatCompletionChunk(
                    id=chunk_id,
                    created=created_time,
                    choices=[Choice(delta=ChoiceDelta(content=f"Error: Internal server error during streaming: {e}"), finish_reason="stop")]
                )
                yield f"data: {error_chunk.model_dump_json()}\n\n"
                yield "data: [DONE]\n\n"
                return
736
+
737
+
738
# --- API Endpoints ---

@app.get("/v1/models", response_model=ModelList)
async def list_models(authenticated: bool = Depends(authenticate)):
    """
    Endpoint to list available Notion models, mimicking OpenAI's /v1/models.
    """
    # Fix: "openai-gpt-4.1" was listed twice, so clients received a duplicate
    # model entry; each id now appears exactly once.
    available_models = [
        "openai-gpt-4.1",
        "anthropic-opus-4.1",
        "anthropic-sonnet-4",
        "anthropic-sonnet-3.x-stable",
        "google-gemini-2.5-pro",
        "google-gemini-2.5-flash",
        "openai-gpt-5-beta"
    ]
    model_list = [
        Model(id=model_id, owned_by="notion")  # created uses default_factory
        for model_id in available_models
    ]
    return ModelList(data=model_list)
760
+
761
@app.post("/v1/chat/completions")
async def chat_completions(request_data: ChatCompletionRequest, request: Request, authenticated: bool = Depends(authenticate)):
    """
    Endpoint to mimic OpenAI's chat completions, proxying to Notion.

    Streaming requests return the SSE generator directly; non-streaming
    requests drain the same generator internally and assemble one response.
    """
    if not NOTION_COOKIE:
        raise HTTPException(status_code=500, detail="Server configuration error: Notion cookie not set.")

    notion_request_body = build_notion_request(request_data)

    if request_data.stream:
        return StreamingResponse(
            stream_notion_response(notion_request_body),
            media_type="text/event-stream"
        )
    else:
        # --- Non-Streaming Logic (Optional - Collects stream internally) ---
        # Note: The primary goal is streaming, but a non-streaming version
        # might be useful for testing or simpler clients.
        # This requires collecting all chunks from the async generator.
        full_response_content = ""
        final_finish_reason = None
        chunk_id = f"chatcmpl-{uuid.uuid4()}"  # Generate ID for the non-streamed response
        created_time = int(time.time())

        try:
            # Drain the SSE generator, accumulating delta content.
            async for line in stream_notion_response(notion_request_body):
                if line.startswith("data: ") and "[DONE]" not in line:
                    try:
                        data_json = line[len("data: "):].strip()
                        if data_json:
                            chunk_data = json.loads(data_json)
                            if chunk_data.get("choices"):
                                delta = chunk_data["choices"][0].get("delta", {})
                                content = delta.get("content")
                                if content:
                                    full_response_content += content
                                finish_reason = chunk_data["choices"][0].get("finish_reason")
                                if finish_reason:
                                    final_finish_reason = finish_reason
                    except json.JSONDecodeError:
                        print(f"Warning: Could not decode JSON line in non-streaming mode: {line}")

            # Construct the final OpenAI-compatible non-streaming response
            return {
                "id": chunk_id,
                "object": "chat.completion",
                "created": created_time,
                "model": request_data.model,  # Return the model requested by the client
                "choices": [
                    {
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": full_response_content,
                        },
                        "finish_reason": final_finish_reason or "stop",  # Default to stop if not explicitly set
                    }
                ],
                "usage": {  # Note: Token usage is not available from Notion
                    "prompt_tokens": None,
                    "completion_tokens": None,
                    "total_tokens": None,
                },
            }
        except HTTPException as e:
            # Re-raise HTTP exceptions from the streaming function
            raise e
        except Exception as e:
            print(f"Error during non-streaming processing: {e}")
            raise HTTPException(status_code=500, detail="Internal server error processing Notion response")
832
+
833
# Script entry point: run the proxy with uvicorn on port 7860.
if __name__ == "__main__":
    import uvicorn
    print("Starting server. Access at http://localhost:7860")
    print("Ensure NOTION_COOKIE is set in your .env file or environment.")
    print("Cookie管理系统已启用,将自动获取和更新Notion浏览器cookie")

    # Run the server (binds all interfaces).
    uvicorn.run(app, host="0.0.0.0", port=7860)