understanding commited on
Commit
045dcbe
·
verified ·
1 Parent(s): eb339a4

Update bot/youtube/resume.py

Browse files
Files changed (1) hide show
  1. bot/youtube/resume.py +148 -100
bot/youtube/resume.py CHANGED
@@ -1,8 +1,10 @@
1
  # PATH: bot/youtube/resume.py
 
 
 
2
  import os
3
  import re
4
- import asyncio
5
- from typing import Callable, Awaitable
6
 
7
  import httpx
8
 
@@ -13,7 +15,7 @@ ProgressCB = Callable[[int, int], Awaitable[None]]
13
  YOUTUBE_RESUMABLE_INIT = "https://www.googleapis.com/upload/youtube/v3/videos"
14
 
15
 
16
- def _parse_range_last_byte(headers: dict) -> int | None:
17
  """
18
  Range header examples:
19
  - "bytes=0-1048575"
@@ -27,24 +29,50 @@ def _parse_range_last_byte(headers: dict) -> int | None:
27
  return int(m.group(2))
28
 
29
 
30
- async def start_resumable_session(access_token: str, metadata: dict) -> str:
 
 
 
 
 
31
  """
32
- Returns upload_url (Location header)
 
 
 
 
33
  """
 
 
 
34
  headers = {
35
  "Authorization": f"Bearer {access_token}",
36
  "Content-Type": "application/json; charset=UTF-8",
37
  "X-Upload-Content-Type": "video/*",
38
  }
 
 
 
 
39
  params = {"uploadType": "resumable", "part": "snippet,status"}
40
- async with httpx.AsyncClient(timeout=60, follow_redirects=True) as c:
41
- r = await c.post(YOUTUBE_RESUMABLE_INIT, params=params, headers=headers, json=metadata)
42
- if r.status_code >= 400:
43
- raise RuntimeError(f"resumable_init_failed:{r.status_code}:{r.text[:200]}")
44
- upload_url = r.headers.get("Location") or r.headers.get("location")
45
- if not upload_url:
46
- raise RuntimeError("resumable_init_no_location")
47
- return upload_url
 
 
 
 
 
 
 
 
 
 
48
 
49
 
50
  async def _query_upload_status(
@@ -52,7 +80,7 @@ async def _query_upload_status(
52
  upload_url: str,
53
  headers_base: dict,
54
  total: int,
55
- ) -> tuple[int, dict | None]:
56
  """
57
  Asks server what it has already received.
58
  Returns (last_received_byte, final_json_if_completed)
@@ -74,22 +102,41 @@ async def _query_upload_status(
74
  return total - 1, j
75
 
76
  if r.status_code >= 400:
77
- raise RuntimeError(f"resumable_status_failed:{r.status_code}:{r.text[:200]}")
78
 
79
  return -1, None
80
 
81
 
82
  async def upload_resumable(
83
- upload_url: str,
 
84
  file_path: str,
85
  access_token: str,
86
- progress_cb: ProgressCB | None = None,
 
87
  ) -> dict:
88
  """
89
- Upload file in chunks to upload_url.
90
- Returns YouTube API JSON (contains id).
 
 
 
91
  """
92
- total = os.path.getsize(file_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
93
  sent = 0
94
 
95
  headers_base = {
@@ -102,98 +149,99 @@ async def upload_resumable(
102
 
103
  timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
104
 
105
- async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as c:
106
- with open(file_path, "rb") as f:
107
- while sent < total:
108
- # Always seek to where server says it is
109
- f.seek(sent)
110
-
111
- to_read = min(YOUTUBE_CHUNK_SIZE, total - sent)
112
- chunk = f.read(to_read)
113
- if not chunk:
114
- break
115
-
116
- start = sent
117
- end = start + len(chunk) - 1
118
-
119
- headers = dict(headers_base)
120
- headers["Content-Length"] = str(len(chunk))
121
- headers["Content-Range"] = f"bytes {start}-{end}/{total}"
122
-
123
- attempt = 0
124
- while True:
125
- try:
126
- r = await c.put(upload_url, headers=headers, content=chunk)
127
- except (httpx.TimeoutException, httpx.NetworkError):
128
- # Network issue: ask server what it has, then resume
129
- last, done_json = await _query_upload_status(c, upload_url, headers_base, total)
130
- if done_json is not None:
131
- if progress_cb:
132
- await progress_cb(total, total)
133
- return done_json
134
- sent = max(sent, last + 1)
135
- if progress_cb:
136
- await progress_cb(sent, total)
137
- break # go to next outer loop iteration
138
-
139
- # Normal: 308 means "keep going" but must read Range
140
- if r.status_code == 308:
141
- last = _parse_range_last_byte(r.headers)
142
- if last is None:
143
- # If Range missing, do an explicit status query
144
- last, done_json = await _query_upload_status(c, upload_url, headers_base, total)
145
  if done_json is not None:
146
  if progress_cb:
147
  await progress_cb(total, total)
148
- return done_json
149
- sent = max(sent, last + 1)
150
- if progress_cb:
151
- await progress_cb(sent, total)
152
- break # next outer loop chunk (from sent)
153
-
154
- # Success: final response JSON
155
- if r.status_code in (200, 201):
156
- j = r.json() if r.text else {}
157
- if progress_cb:
158
- await progress_cb(total, total)
159
- return j
160
-
161
- # Retryable server responses
162
- if r.status_code in retryable and attempt < max_retries:
163
- attempt += 1
164
- # Before retrying, query status so we don't duplicate wrong chunk
165
- last, done_json = await _query_upload_status(c, upload_url, headers_base, total)
166
- if done_json is not None:
167
  if progress_cb:
168
- await progress_cb(total, total)
169
- return done_json
170
- sent = max(sent, last + 1)
171
- if progress_cb:
172
- await progress_cb(sent, total)
173
- await asyncio.sleep(min(2 ** attempt, 20))
174
- # break to outer loop (rebuild chunk from new sent)
175
- break
176
-
177
- # Non-retryable error
178
- if r.status_code >= 400:
179
- raise RuntimeError(f"resumable_put_failed:{r.status_code}:{r.text[:200]}")
 
 
 
180
 
181
- # Unexpected status: treat like error
182
- raise RuntimeError(f"resumable_put_unexpected:{r.status_code}:{r.text[:200]}")
 
 
 
183
 
184
- # If loop ended, do a final status check instead of blindly erroring
185
- last, done_json = await _query_upload_status(c, upload_url, headers_base, total)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
  if done_json is not None:
187
  if progress_cb:
188
  await progress_cb(total, total)
189
- return done_json
190
 
191
  if last >= total - 1:
192
- # Server claims full data but didn't return JSON (rare), try one more status query
193
- last2, done_json2 = await _query_upload_status(c, upload_url, headers_base, total)
194
  if done_json2 is not None:
195
  if progress_cb:
196
  await progress_cb(total, total)
197
- return done_json2
198
 
199
- raise RuntimeError("resumable_upload_incomplete")
 
 
 
1
  # PATH: bot/youtube/resume.py
2
+ from __future__ import annotations
3
+
4
+ import asyncio
5
  import os
6
  import re
7
+ from typing import Awaitable, Callable, Optional, Tuple
 
8
 
9
  import httpx
10
 
 
15
  YOUTUBE_RESUMABLE_INIT = "https://www.googleapis.com/upload/youtube/v3/videos"
16
 
17
 
18
+ def _parse_range_last_byte(headers: dict) -> Optional[int]:
19
  """
20
  Range header examples:
21
  - "bytes=0-1048575"
 
29
  return int(m.group(2))
30
 
31
 
32
+ async def start_resumable_session(
33
+ *,
34
+ access_token: str,
35
+ metadata: dict,
36
+ total_bytes: int = 0,
37
+ ) -> dict:
38
  """
39
+ Starts a YouTube resumable upload session.
40
+
41
+ Returns:
42
+ {"ok": True, "session_url": "..."}
43
+ {"ok": False, "err": "...", "detail": "..."}
44
  """
45
+ if not access_token:
46
+ return {"ok": False, "err": "missing_access_token"}
47
+
48
  headers = {
49
  "Authorization": f"Bearer {access_token}",
50
  "Content-Type": "application/json; charset=UTF-8",
51
  "X-Upload-Content-Type": "video/*",
52
  }
53
+ # Helps YouTube know size (recommended)
54
+ if isinstance(total_bytes, int) and total_bytes > 0:
55
+ headers["X-Upload-Content-Length"] = str(total_bytes)
56
+
57
  params = {"uploadType": "resumable", "part": "snippet,status"}
58
+
59
+ timeout = httpx.Timeout(connect=30.0, read=60.0, write=60.0, pool=30.0)
60
+
61
+ try:
62
+ async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as c:
63
+ r = await c.post(YOUTUBE_RESUMABLE_INIT, params=params, headers=headers, json=metadata)
64
+ if r.status_code >= 400:
65
+ return {
66
+ "ok": False,
67
+ "err": f"resumable_init_failed:{r.status_code}",
68
+ "detail": (r.text or "")[:600],
69
+ }
70
+ upload_url = r.headers.get("Location") or r.headers.get("location")
71
+ if not upload_url:
72
+ return {"ok": False, "err": "resumable_init_no_location", "detail": "Missing Location header"}
73
+ return {"ok": True, "session_url": upload_url}
74
+ except Exception as e:
75
+ return {"ok": False, "err": f"resumable_init_exception:{type(e).__name__}", "detail": str(e)[:600]}
76
 
77
 
78
  async def _query_upload_status(
 
80
  upload_url: str,
81
  headers_base: dict,
82
  total: int,
83
+ ) -> Tuple[int, Optional[dict]]:
84
  """
85
  Asks server what it has already received.
86
  Returns (last_received_byte, final_json_if_completed)
 
102
  return total - 1, j
103
 
104
  if r.status_code >= 400:
105
+ raise RuntimeError(f"resumable_status_failed:{r.status_code}:{(r.text or '')[:200]}")
106
 
107
  return -1, None
108
 
109
 
110
  async def upload_resumable(
111
+ *,
112
+ session_url: str,
113
  file_path: str,
114
  access_token: str,
115
+ total_bytes: int = 0,
116
+ progress_cb: Optional[ProgressCB] = None,
117
  ) -> dict:
118
  """
119
+ Upload file in chunks to session_url.
120
+
121
+ Returns:
122
+ {"ok": True, "response": {...}}
123
+ {"ok": False, "err": "...", "detail": "..."}
124
  """
125
+ if not session_url:
126
+ return {"ok": False, "err": "missing_session_url"}
127
+ if not access_token:
128
+ return {"ok": False, "err": "missing_access_token"}
129
+ if not file_path or not os.path.exists(file_path):
130
+ return {"ok": False, "err": "file_not_found"}
131
+
132
+ try:
133
+ total = int(total_bytes) if total_bytes else int(os.path.getsize(file_path))
134
+ except Exception:
135
+ total = 0
136
+
137
+ if total <= 0:
138
+ return {"ok": False, "err": "bad_total_bytes"}
139
+
140
  sent = 0
141
 
142
  headers_base = {
 
149
 
150
  timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
151
 
152
+ try:
153
+ async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as c:
154
+ with open(file_path, "rb") as f:
155
+ while sent < total:
156
+ f.seek(sent)
157
+
158
+ to_read = min(YOUTUBE_CHUNK_SIZE, total - sent)
159
+ chunk = f.read(to_read)
160
+ if not chunk:
161
+ break
162
+
163
+ start = sent
164
+ end = start + len(chunk) - 1
165
+
166
+ headers = dict(headers_base)
167
+ headers["Content-Length"] = str(len(chunk))
168
+ headers["Content-Range"] = f"bytes {start}-{end}/{total}"
169
+
170
+ attempt = 0
171
+ while True:
172
+ try:
173
+ r = await c.put(session_url, headers=headers, content=chunk)
174
+ except (httpx.TimeoutException, httpx.NetworkError):
175
+ # Ask server status, then resume
176
+ last, done_json = await _query_upload_status(c, session_url, headers_base, total)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
177
  if done_json is not None:
178
  if progress_cb:
179
  await progress_cb(total, total)
180
+ return {"ok": True, "response": done_json}
181
+ sent = max(sent, last + 1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
  if progress_cb:
183
+ await progress_cb(sent, total)
184
+ break # next outer loop
185
+
186
+ if r.status_code == 308:
187
+ last = _parse_range_last_byte(r.headers)
188
+ if last is None:
189
+ last, done_json = await _query_upload_status(c, session_url, headers_base, total)
190
+ if done_json is not None:
191
+ if progress_cb:
192
+ await progress_cb(total, total)
193
+ return {"ok": True, "response": done_json}
194
+ sent = max(sent, (last or -1) + 1)
195
+ if progress_cb:
196
+ await progress_cb(sent, total)
197
+ break
198
 
199
+ if r.status_code in (200, 201):
200
+ j = r.json() if r.text else {}
201
+ if progress_cb:
202
+ await progress_cb(total, total)
203
+ return {"ok": True, "response": j}
204
 
205
+ if r.status_code in retryable and attempt < max_retries:
206
+ attempt += 1
207
+ last, done_json = await _query_upload_status(c, session_url, headers_base, total)
208
+ if done_json is not None:
209
+ if progress_cb:
210
+ await progress_cb(total, total)
211
+ return {"ok": True, "response": done_json}
212
+ sent = max(sent, last + 1)
213
+ if progress_cb:
214
+ await progress_cb(sent, total)
215
+ await asyncio.sleep(min(2 ** attempt, 20))
216
+ break
217
+
218
+ if r.status_code >= 400:
219
+ return {
220
+ "ok": False,
221
+ "err": f"resumable_put_failed:{r.status_code}",
222
+ "detail": (r.text or "")[:600],
223
+ }
224
+
225
+ return {
226
+ "ok": False,
227
+ "err": f"resumable_put_unexpected:{r.status_code}",
228
+ "detail": (r.text or "")[:600],
229
+ }
230
+
231
+ # Final status check
232
+ last, done_json = await _query_upload_status(c, session_url, headers_base, total)
233
  if done_json is not None:
234
  if progress_cb:
235
  await progress_cb(total, total)
236
+ return {"ok": True, "response": done_json}
237
 
238
  if last >= total - 1:
239
+ last2, done_json2 = await _query_upload_status(c, session_url, headers_base, total)
 
240
  if done_json2 is not None:
241
  if progress_cb:
242
  await progress_cb(total, total)
243
+ return {"ok": True, "response": done_json2}
244
 
245
+ return {"ok": False, "err": "resumable_upload_incomplete"}
246
+ except Exception as e:
247
+ return {"ok": False, "err": f"resumable_exception:{type(e).__name__}", "detail": str(e)[:600]}