understanding commited on
Commit
f1bd292
·
verified ·
1 Parent(s): 8637af4

Update bot/youtube/resume.py

Browse files
Files changed (1) hide show
  1. bot/youtube/resume.py +93 -54
bot/youtube/resume.py CHANGED
@@ -1,79 +1,118 @@
1
  # PATH: bot/youtube/resume.py
2
  import os
3
- from typing import Callable, Awaitable
4
  import httpx
5
- from bot.core.settings import YOUTUBE_CHUNK_SIZE
6
-
7
- ProgressCB = Callable[[int, int], Awaitable[None]]
8
 
9
- YOUTUBE_RESUMABLE_INIT = "https://www.googleapis.com/upload/youtube/v3/videos"
 
10
 
11
  async def start_resumable_session(access_token: str, metadata: dict) -> str:
12
  """
13
- Returns upload_url (Location header)
14
  """
 
15
  headers = {
16
  "Authorization": f"Bearer {access_token}",
17
  "Content-Type": "application/json; charset=UTF-8",
18
- "X-Upload-Content-Type": "video/*",
 
19
  }
20
- params = {"uploadType": "resumable", "part": "snippet,status"}
21
- async with httpx.AsyncClient(timeout=60, follow_redirects=True) as c:
22
- r = await c.post(YOUTUBE_RESUMABLE_INIT, params=params, headers=headers, json=metadata)
23
- if r.status_code >= 400:
24
- raise RuntimeError(f"resumable_init_failed:{r.status_code}:{r.text[:200]}")
25
- upload_url = r.headers.get("Location") or r.headers.get("location")
 
 
26
  if not upload_url:
27
- raise RuntimeError("resumable_init_no_location")
 
28
  return upload_url
29
 
30
  async def upload_resumable(
31
- upload_url: str,
32
- file_path: str,
33
- access_token: str,
34
- progress_cb: ProgressCB | None = None,
35
  ) -> dict:
36
  """
37
- Upload file in chunks to upload_url.
38
- Returns YouTube API JSON (contains id).
39
  """
40
- total = os.path.getsize(file_path)
41
- sent = 0
42
-
43
- headers_base = {
44
- "Authorization": f"Bearer {access_token}",
45
- }
46
-
47
- async with httpx.AsyncClient(timeout=120, follow_redirects=True) as c:
48
  with open(file_path, "rb") as f:
49
- while sent < total:
50
- chunk = f.read(YOUTUBE_CHUNK_SIZE)
51
- if not chunk:
52
- break
53
- start = sent
54
- end = sent + len(chunk) - 1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
 
56
- headers = dict(headers_base)
57
- headers["Content-Length"] = str(len(chunk))
58
- headers["Content-Type"] = "video/*"
59
- headers["Content-Range"] = f"bytes {start}-{end}/{total}"
60
 
61
- r = await c.put(upload_url, headers=headers, content=chunk)
 
 
 
 
 
 
62
 
63
- # 308 Resume Incomplete is normal for chunked upload
64
- if r.status_code == 308:
65
- sent = end + 1
66
- if progress_cb:
67
- await progress_cb(sent, total)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  continue
69
 
70
- if r.status_code >= 400:
71
- raise RuntimeError(f"resumable_put_failed:{r.status_code}:{r.text[:200]}")
72
-
73
- # success: final response JSON
74
- j = r.json()
75
- if progress_cb:
76
- await progress_cb(total, total)
77
- return j
78
-
79
- raise RuntimeError("resumable_upload_incomplete")
 
1
  # PATH: bot/youtube/resume.py
2
  import os
 
3
  import httpx
4
+ import json
5
+ import asyncio
6
+ from typing import Callable, Awaitable
7
 
8
+ # Standard chunk size (multiple of 256KB). 10MB is a good balance for RAM/Speed.
9
+ CHUNK_SIZE = 10 * 1024 * 1024
10
 
11
  async def start_resumable_session(access_token: str, metadata: dict) -> str:
12
  """
13
+ Initializes the upload and gets the unique Upload URL.
14
  """
15
+ url = "https://www.googleapis.com/upload/youtube/v3/videos?uploadType=resumable&part=snippet,status"
16
  headers = {
17
  "Authorization": f"Bearer {access_token}",
18
  "Content-Type": "application/json; charset=UTF-8",
19
+ "X-Upload-Content-Type": "video/mp4",
20
+ "X-Upload-Content-Length": "0" # Initial draft is 0
21
  }
22
+
23
+ async with httpx.AsyncClient() as client:
24
+ r = await client.post(url, headers=headers, json=metadata, timeout=30.0)
25
+
26
+ if r.status_code != 200:
27
+ raise RuntimeError(f"Initiate failed: {r.status_code} - {r.text}")
28
+
29
+ upload_url = r.headers.get("Location")
30
  if not upload_url:
31
+ raise RuntimeError("No upload URL returned from YouTube.")
32
+
33
  return upload_url
34
 
35
  async def upload_resumable(
36
+ upload_url: str,
37
+ file_path: str,
38
+ access_token: str,
39
+ progress_cb: Callable[[int, int], Awaitable[None]] = None
40
  ) -> dict:
41
  """
42
+ Uploads the file chunks. Handles 308 Resume Incomplete robustly.
 
43
  """
44
+ file_size = os.path.getsize(file_path)
45
+
46
+ # Create a persistent client
47
+ async with httpx.AsyncClient(timeout=120.0) as client:
 
 
 
 
48
  with open(file_path, "rb") as f:
49
+ while True:
50
+ # 1. Query status (Ask YouTube: "How much have you got?")
51
+ # We do this by sending an empty PUT with Content-Range: bytes */total
52
+ r_status = await client.put(
53
+ upload_url,
54
+ headers={"Content-Range": f"bytes */{file_size}"},
55
+ content=None
56
+ )
57
+
58
+ # 2. Determine Offset
59
+ if r_status.status_code == 308:
60
+ range_header = r_status.headers.get("Range")
61
+ if range_header:
62
+ # Header format: "bytes=0-999" -> next byte is 1000
63
+ next_byte = int(range_header.split("-")[1]) + 1
64
+ else:
65
+ # No bytes uploaded yet
66
+ next_byte = 0
67
+ elif r_status.status_code in [200, 201]:
68
+ # Already done!
69
+ if progress_cb: await progress_cb(file_size, file_size)
70
+ return r_status.json()
71
+ else:
72
+ # Some other error (4xx/5xx)
73
+ # Try to start from 0 if status failed, or raise error
74
+ next_byte = 0
75
 
76
+ if next_byte >= file_size:
77
+ break # Should be handled by 200/201 check, but safety first
 
 
78
 
79
+ # 3. Read Chunk
80
+ f.seek(next_byte)
81
+ chunk = f.read(CHUNK_SIZE)
82
+ chunk_len = len(chunk)
83
+
84
+ if chunk_len == 0:
85
+ break
86
 
87
+ # 4. Upload Chunk
88
+ content_range = f"bytes {next_byte}-{next_byte + chunk_len - 1}/{file_size}"
89
+ headers = {
90
+ "Content-Length": str(chunk_len),
91
+ "Content-Range": content_range
92
+ }
93
+
94
+ try:
95
+ r_upload = await client.put(upload_url, headers=headers, content=chunk)
96
+
97
+ if r_upload.status_code in [200, 201]:
98
+ # Upload Complete!
99
+ if progress_cb: await progress_cb(file_size, file_size)
100
+ return r_upload.json()
101
+
102
+ elif r_upload.status_code == 308:
103
+ # Chunk received, loop again to send next
104
+ if progress_cb: await progress_cb(next_byte + chunk_len, file_size)
105
+ continue
106
+
107
+ else:
108
+ # Retryable error? Wait and loop (the status check at top will fix offset)
109
+ print(f"Chunk error {r_upload.status_code}, retrying...")
110
+ await asyncio.sleep(5)
111
+ continue
112
+
113
+ except Exception as e:
114
+ print(f"Network error: {e}, retrying...")
115
+ await asyncio.sleep(5)
116
  continue
117
 
118
+ raise RuntimeError("resumable_upload_incomplete: Loop finished without 200 OK")