understanding commited on
Commit
12a3d95
·
verified ·
1 Parent(s): d4eabd4

Update bot/youtube/resume.py

Browse files
Files changed (1) hide show
  1. bot/youtube/resume.py +137 -21
bot/youtube/resume.py CHANGED
@@ -1,7 +1,11 @@
1
  # PATH: bot/youtube/resume.py
2
  import os
 
 
3
  from typing import Callable, Awaitable
 
4
  import httpx
 
5
  from bot.core.settings import YOUTUBE_CHUNK_SIZE
6
 
7
  ProgressCB = Callable[[int, int], Awaitable[None]]
@@ -9,6 +13,20 @@ ProgressCB = Callable[[int, int], Awaitable[None]]
9
  YOUTUBE_RESUMABLE_INIT = "https://www.googleapis.com/upload/youtube/v3/videos"
10
 
11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  async def start_resumable_session(access_token: str, metadata: dict) -> str:
13
  """
14
  Returns upload_url (Location header)
@@ -19,7 +37,6 @@ async def start_resumable_session(access_token: str, metadata: dict) -> str:
19
  "X-Upload-Content-Type": "video/*",
20
  }
21
  params = {"uploadType": "resumable", "part": "snippet,status"}
22
-
23
  async with httpx.AsyncClient(timeout=60, follow_redirects=True) as c:
24
  r = await c.post(YOUTUBE_RESUMABLE_INIT, params=params, headers=headers, json=metadata)
25
  if r.status_code >= 400:
@@ -30,12 +47,43 @@ async def start_resumable_session(access_token: str, metadata: dict) -> str:
30
  return upload_url
31
 
32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  async def upload_resumable(
34
  upload_url: str,
35
  file_path: str,
36
  access_token: str,
37
- *,
38
- progress_cb: ProgressCB | None = None, # ✅ keyword-only (prevents “multiple values”)
39
  ) -> dict:
40
  """
41
  Upload file in chunks to upload_url.
@@ -46,38 +94,106 @@ async def upload_resumable(
46
 
47
  headers_base = {
48
  "Authorization": f"Bearer {access_token}",
 
49
  }
50
 
51
- async with httpx.AsyncClient(timeout=120, follow_redirects=True) as c:
 
 
 
 
 
52
  with open(file_path, "rb") as f:
53
  while sent < total:
54
- chunk = f.read(YOUTUBE_CHUNK_SIZE)
 
 
 
 
55
  if not chunk:
56
  break
57
 
58
  start = sent
59
- end = sent + len(chunk) - 1
60
 
61
  headers = dict(headers_base)
62
  headers["Content-Length"] = str(len(chunk))
63
- headers["Content-Type"] = "video/*"
64
  headers["Content-Range"] = f"bytes {start}-{end}/{total}"
65
 
66
- r = await c.put(upload_url, headers=headers, content=chunk)
67
-
68
- # 308 Resume Incomplete is normal for chunked upload
69
- if r.status_code == 308:
70
- sent = end + 1
71
- if progress_cb:
72
- await progress_cb(sent, total)
73
- continue
74
-
75
- if r.status_code >= 400:
76
- raise RuntimeError(f"resumable_put_failed:{r.status_code}:{r.text[:200]}")
77
-
78
- j = r.json()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
  if progress_cb:
80
  await progress_cb(total, total)
81
- return j
 
 
 
 
 
 
 
 
82
 
83
  raise RuntimeError("resumable_upload_incomplete")
 
1
  # PATH: bot/youtube/resume.py
2
  import os
3
+ import re
4
+ import asyncio
5
  from typing import Callable, Awaitable
6
+
7
  import httpx
8
+
9
  from bot.core.settings import YOUTUBE_CHUNK_SIZE
10
 
11
  ProgressCB = Callable[[int, int], Awaitable[None]]
 
13
  YOUTUBE_RESUMABLE_INIT = "https://www.googleapis.com/upload/youtube/v3/videos"
14
 
15
 
16
+ def _parse_range_last_byte(headers: dict) -> int | None:
17
+ """
18
+ Range header examples:
19
+ - "bytes=0-1048575"
20
+ - "0-1048575"
21
+ Return last received byte index, or None if missing/unparseable.
22
+ """
23
+ v = headers.get("Range") or headers.get("range") or ""
24
+ m = re.search(r"(\d+)\s*-\s*(\d+)", v)
25
+ if not m:
26
+ return None
27
+ return int(m.group(2))
28
+
29
+
30
  async def start_resumable_session(access_token: str, metadata: dict) -> str:
31
  """
32
  Returns upload_url (Location header)
 
37
  "X-Upload-Content-Type": "video/*",
38
  }
39
  params = {"uploadType": "resumable", "part": "snippet,status"}
 
40
  async with httpx.AsyncClient(timeout=60, follow_redirects=True) as c:
41
  r = await c.post(YOUTUBE_RESUMABLE_INIT, params=params, headers=headers, json=metadata)
42
  if r.status_code >= 400:
 
47
  return upload_url
48
 
49
 
50
+ async def _query_upload_status(
51
+ c: httpx.AsyncClient,
52
+ upload_url: str,
53
+ headers_base: dict,
54
+ total: int,
55
+ ) -> tuple[int, dict | None]:
56
+ """
57
+ Asks server what it has already received.
58
+ Returns (last_received_byte, final_json_if_completed)
59
+ """
60
+ headers = dict(headers_base)
61
+ headers["Content-Length"] = "0"
62
+ headers["Content-Range"] = f"bytes */{total}"
63
+
64
+ r = await c.put(upload_url, headers=headers)
65
+
66
+ if r.status_code == 308:
67
+ last = _parse_range_last_byte(r.headers)
68
+ if last is None:
69
+ return -1, None
70
+ return last, None
71
+
72
+ if r.status_code in (200, 201):
73
+ j = r.json() if r.text else {}
74
+ return total - 1, j
75
+
76
+ if r.status_code >= 400:
77
+ raise RuntimeError(f"resumable_status_failed:{r.status_code}:{r.text[:200]}")
78
+
79
+ return -1, None
80
+
81
+
82
  async def upload_resumable(
83
  upload_url: str,
84
  file_path: str,
85
  access_token: str,
86
+ progress_cb: ProgressCB | None = None,
 
87
  ) -> dict:
88
  """
89
  Upload file in chunks to upload_url.
 
94
 
95
  headers_base = {
96
  "Authorization": f"Bearer {access_token}",
97
+ "Content-Type": "video/*",
98
  }
99
 
100
+ retryable = {429, 500, 502, 503, 504}
101
+ max_retries = 8
102
+
103
+ timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
104
+
105
+ async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as c:
106
  with open(file_path, "rb") as f:
107
  while sent < total:
108
+ # Always seek to where server says it is
109
+ f.seek(sent)
110
+
111
+ to_read = min(YOUTUBE_CHUNK_SIZE, total - sent)
112
+ chunk = f.read(to_read)
113
  if not chunk:
114
  break
115
 
116
  start = sent
117
+ end = start + len(chunk) - 1
118
 
119
  headers = dict(headers_base)
120
  headers["Content-Length"] = str(len(chunk))
 
121
  headers["Content-Range"] = f"bytes {start}-{end}/{total}"
122
 
123
+ attempt = 0
124
+ while True:
125
+ try:
126
+ r = await c.put(upload_url, headers=headers, content=chunk)
127
+ except (httpx.TimeoutException, httpx.NetworkError):
128
+ # Network issue: ask server what it has, then resume
129
+ last, done_json = await _query_upload_status(c, upload_url, headers_base, total)
130
+ if done_json is not None:
131
+ if progress_cb:
132
+ await progress_cb(total, total)
133
+ return done_json
134
+ sent = max(sent, last + 1)
135
+ if progress_cb:
136
+ await progress_cb(sent, total)
137
+ break # go to next outer loop iteration
138
+
139
+ # Normal: 308 means "keep going" but must read Range
140
+ if r.status_code == 308:
141
+ last = _parse_range_last_byte(r.headers)
142
+ if last is None:
143
+ # If Range missing, do an explicit status query
144
+ last, done_json = await _query_upload_status(c, upload_url, headers_base, total)
145
+ if done_json is not None:
146
+ if progress_cb:
147
+ await progress_cb(total, total)
148
+ return done_json
149
+ sent = max(sent, last + 1)
150
+ if progress_cb:
151
+ await progress_cb(sent, total)
152
+ break # next outer loop chunk (from sent)
153
+
154
+ # Success: final response JSON
155
+ if r.status_code in (200, 201):
156
+ j = r.json() if r.text else {}
157
+ if progress_cb:
158
+ await progress_cb(total, total)
159
+ return j
160
+
161
+ # Retryable server responses
162
+ if r.status_code in retryable and attempt < max_retries:
163
+ attempt += 1
164
+ # Before retrying, query status so we don't duplicate wrong chunk
165
+ last, done_json = await _query_upload_status(c, upload_url, headers_base, total)
166
+ if done_json is not None:
167
+ if progress_cb:
168
+ await progress_cb(total, total)
169
+ return done_json
170
+ sent = max(sent, last + 1)
171
+ if progress_cb:
172
+ await progress_cb(sent, total)
173
+ await asyncio.sleep(min(2 ** attempt, 20))
174
+ # break to outer loop (rebuild chunk from new sent)
175
+ break
176
+
177
+ # Non-retryable error
178
+ if r.status_code >= 400:
179
+ raise RuntimeError(f"resumable_put_failed:{r.status_code}:{r.text[:200]}")
180
+
181
+ # Unexpected status: treat like error
182
+ raise RuntimeError(f"resumable_put_unexpected:{r.status_code}:{r.text[:200]}")
183
+
184
+ # If loop ended, do a final status check instead of blindly erroring
185
+ last, done_json = await _query_upload_status(c, upload_url, headers_base, total)
186
+ if done_json is not None:
187
  if progress_cb:
188
  await progress_cb(total, total)
189
+ return done_json
190
+
191
+ if last >= total - 1:
192
+ # Server claims full data but didn't return JSON (rare), try one more status query
193
+ last2, done_json2 = await _query_upload_status(c, upload_url, headers_base, total)
194
+ if done_json2 is not None:
195
+ if progress_cb:
196
+ await progress_cb(total, total)
197
+ return done_json2
198
 
199
  raise RuntimeError("resumable_upload_incomplete")