understanding commited on
Commit
2e08224
·
verified ·
1 Parent(s): 5fdf621

Update bot/youtube/resume.py

Browse files
Files changed (1) hide show
  1. bot/youtube/resume.py +34 -41
bot/youtube/resume.py CHANGED
@@ -4,7 +4,7 @@ from __future__ import annotations
4
  import asyncio
5
  import os
6
  import re
7
- from typing import Awaitable, Callable, Optional, Tuple
8
 
9
  import httpx
10
 
@@ -15,7 +15,7 @@ ProgressCB = Callable[[int, int], Awaitable[None]]
15
  YOUTUBE_RESUMABLE_INIT = "https://www.googleapis.com/upload/youtube/v3/videos"
16
 
17
 
18
- def _parse_range_last_byte(headers: dict) -> Optional[int]:
19
  """
20
  Range header examples:
21
  - "bytes=0-1048575"
@@ -34,10 +34,8 @@ async def start_resumable_session(
34
  access_token: str,
35
  metadata: dict,
36
  total_bytes: int = 0,
37
- ) -> dict:
38
  """
39
- Starts a YouTube resumable upload session.
40
-
41
  Returns:
42
  {"ok": True, "session_url": "..."}
43
  {"ok": False, "err": "...", "detail": "..."}
@@ -50,29 +48,27 @@ async def start_resumable_session(
50
  "Content-Type": "application/json; charset=UTF-8",
51
  "X-Upload-Content-Type": "video/*",
52
  }
53
- # Helps YouTube know size (recommended)
54
- if isinstance(total_bytes, int) and total_bytes > 0:
55
- headers["X-Upload-Content-Length"] = str(total_bytes)
56
 
57
  params = {"uploadType": "resumable", "part": "snippet,status"}
58
 
59
- timeout = httpx.Timeout(connect=30.0, read=60.0, write=60.0, pool=30.0)
60
-
61
  try:
62
- async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as c:
63
  r = await c.post(YOUTUBE_RESUMABLE_INIT, params=params, headers=headers, json=metadata)
64
  if r.status_code >= 400:
65
  return {
66
  "ok": False,
67
- "err": f"resumable_init_failed:{r.status_code}",
68
- "detail": (r.text or "")[:600],
69
  }
70
  upload_url = r.headers.get("Location") or r.headers.get("location")
71
  if not upload_url:
72
- return {"ok": False, "err": "resumable_init_no_location", "detail": "Missing Location header"}
73
- return {"ok": True, "session_url": upload_url}
74
  except Exception as e:
75
- return {"ok": False, "err": f"resumable_init_exception:{type(e).__name__}", "detail": str(e)[:600]}
76
 
77
 
78
  async def _query_upload_status(
@@ -80,7 +76,7 @@ async def _query_upload_status(
80
  upload_url: str,
81
  headers_base: dict,
82
  total: int,
83
- ) -> Tuple[int, Optional[dict]]:
84
  """
85
  Asks server what it has already received.
86
  Returns (last_received_byte, final_json_if_completed)
@@ -102,7 +98,7 @@ async def _query_upload_status(
102
  return total - 1, j
103
 
104
  if r.status_code >= 400:
105
- raise RuntimeError(f"resumable_status_failed:{r.status_code}:{(r.text or '')[:200]}")
106
 
107
  return -1, None
108
 
@@ -113,8 +109,8 @@ async def upload_resumable(
113
  file_path: str,
114
  access_token: str,
115
  total_bytes: int = 0,
116
- progress_cb: Optional[ProgressCB] = None,
117
- ) -> dict:
118
  """
119
  Upload file in chunks to session_url.
120
 
@@ -122,10 +118,10 @@ async def upload_resumable(
122
  {"ok": True, "response": {...}}
123
  {"ok": False, "err": "...", "detail": "..."}
124
  """
125
- if not session_url:
126
- return {"ok": False, "err": "missing_session_url"}
127
  if not access_token:
128
  return {"ok": False, "err": "missing_access_token"}
 
 
129
  if not file_path or not os.path.exists(file_path):
130
  return {"ok": False, "err": "file_not_found"}
131
 
@@ -172,7 +168,7 @@ async def upload_resumable(
172
  try:
173
  r = await c.put(session_url, headers=headers, content=chunk)
174
  except (httpx.TimeoutException, httpx.NetworkError):
175
- # Ask server status, then resume
176
  last, done_json = await _query_upload_status(c, session_url, headers_base, total)
177
  if done_json is not None:
178
  if progress_cb:
@@ -181,8 +177,9 @@ async def upload_resumable(
181
  sent = max(sent, last + 1)
182
  if progress_cb:
183
  await progress_cb(sent, total)
184
- break # next outer loop
185
 
 
186
  if r.status_code == 308:
187
  last = _parse_range_last_byte(r.headers)
188
  if last is None:
@@ -191,17 +188,19 @@ async def upload_resumable(
191
  if progress_cb:
192
  await progress_cb(total, total)
193
  return {"ok": True, "response": done_json}
194
- sent = max(sent, (last or -1) + 1)
195
  if progress_cb:
196
  await progress_cb(sent, total)
197
  break
198
 
 
199
  if r.status_code in (200, 201):
200
  j = r.json() if r.text else {}
201
  if progress_cb:
202
  await progress_cb(total, total)
203
  return {"ok": True, "response": j}
204
 
 
205
  if r.status_code in retryable and attempt < max_retries:
206
  attempt += 1
207
  last, done_json = await _query_upload_status(c, session_url, headers_base, total)
@@ -215,33 +214,27 @@ async def upload_resumable(
215
  await asyncio.sleep(min(2 ** attempt, 20))
216
  break
217
 
 
218
  if r.status_code >= 400:
219
  return {
220
  "ok": False,
221
- "err": f"resumable_put_failed:{r.status_code}",
222
- "detail": (r.text or "")[:600],
223
  }
224
 
225
  return {
226
  "ok": False,
227
- "err": f"resumable_put_unexpected:{r.status_code}",
228
- "detail": (r.text or "")[:600],
229
  }
230
 
231
- # Final status check
232
- last, done_json = await _query_upload_status(c, session_url, headers_base, total)
233
- if done_json is not None:
234
- if progress_cb:
235
- await progress_cb(total, total)
236
- return {"ok": True, "response": done_json}
237
-
238
- if last >= total - 1:
239
- last2, done_json2 = await _query_upload_status(c, session_url, headers_base, total)
240
- if done_json2 is not None:
241
  if progress_cb:
242
  await progress_cb(total, total)
243
- return {"ok": True, "response": done_json2}
244
 
245
  return {"ok": False, "err": "resumable_upload_incomplete"}
246
  except Exception as e:
247
- return {"ok": False, "err": f"resumable_exception:{type(e).__name__}", "detail": str(e)[:600]}
 
4
  import asyncio
5
  import os
6
  import re
7
+ from typing import Any, Awaitable, Callable, Dict, Optional, Tuple
8
 
9
  import httpx
10
 
 
15
  YOUTUBE_RESUMABLE_INIT = "https://www.googleapis.com/upload/youtube/v3/videos"
16
 
17
 
18
+ def _parse_range_last_byte(headers: dict) -> int | None:
19
  """
20
  Range header examples:
21
  - "bytes=0-1048575"
 
34
  access_token: str,
35
  metadata: dict,
36
  total_bytes: int = 0,
37
+ ) -> Dict[str, Any]:
38
  """
 
 
39
  Returns:
40
  {"ok": True, "session_url": "..."}
41
  {"ok": False, "err": "...", "detail": "..."}
 
48
  "Content-Type": "application/json; charset=UTF-8",
49
  "X-Upload-Content-Type": "video/*",
50
  }
51
+ # Optional but helpful
52
+ if total_bytes and total_bytes > 0:
53
+ headers["X-Upload-Content-Length"] = str(int(total_bytes))
54
 
55
  params = {"uploadType": "resumable", "part": "snippet,status"}
56
 
 
 
57
  try:
58
+ async with httpx.AsyncClient(timeout=60, follow_redirects=True) as c:
59
  r = await c.post(YOUTUBE_RESUMABLE_INIT, params=params, headers=headers, json=metadata)
60
  if r.status_code >= 400:
61
  return {
62
  "ok": False,
63
+ "err": "resumable_init_failed",
64
+ "detail": f"{r.status_code}:{(r.text or '')[:400]}",
65
  }
66
  upload_url = r.headers.get("Location") or r.headers.get("location")
67
  if not upload_url:
68
+ return {"ok": False, "err": "resumable_init_no_location"}
69
+ return {"ok": True, "session_url": str(upload_url)}
70
  except Exception as e:
71
+ return {"ok": False, "err": "resumable_init_exception", "detail": f"{type(e).__name__}:{e}"}
72
 
73
 
74
  async def _query_upload_status(
 
76
  upload_url: str,
77
  headers_base: dict,
78
  total: int,
79
+ ) -> tuple[int, dict | None]:
80
  """
81
  Asks server what it has already received.
82
  Returns (last_received_byte, final_json_if_completed)
 
98
  return total - 1, j
99
 
100
  if r.status_code >= 400:
101
+ raise RuntimeError(f"resumable_status_failed:{r.status_code}:{r.text[:200]}")
102
 
103
  return -1, None
104
 
 
109
  file_path: str,
110
  access_token: str,
111
  total_bytes: int = 0,
112
+ progress_cb: ProgressCB | None = None,
113
+ ) -> Dict[str, Any]:
114
  """
115
  Upload file in chunks to session_url.
116
 
 
118
  {"ok": True, "response": {...}}
119
  {"ok": False, "err": "...", "detail": "..."}
120
  """
 
 
121
  if not access_token:
122
  return {"ok": False, "err": "missing_access_token"}
123
+ if not session_url:
124
+ return {"ok": False, "err": "missing_session_url"}
125
  if not file_path or not os.path.exists(file_path):
126
  return {"ok": False, "err": "file_not_found"}
127
 
 
168
  try:
169
  r = await c.put(session_url, headers=headers, content=chunk)
170
  except (httpx.TimeoutException, httpx.NetworkError):
171
+ # Network issue: ask server what it has, then resume
172
  last, done_json = await _query_upload_status(c, session_url, headers_base, total)
173
  if done_json is not None:
174
  if progress_cb:
 
177
  sent = max(sent, last + 1)
178
  if progress_cb:
179
  await progress_cb(sent, total)
180
+ break # next outer loop iteration
181
 
182
+ # 308 means "resume incomplete" - read Range
183
  if r.status_code == 308:
184
  last = _parse_range_last_byte(r.headers)
185
  if last is None:
 
188
  if progress_cb:
189
  await progress_cb(total, total)
190
  return {"ok": True, "response": done_json}
191
+ sent = max(sent, last + 1)
192
  if progress_cb:
193
  await progress_cb(sent, total)
194
  break
195
 
196
+ # Success
197
  if r.status_code in (200, 201):
198
  j = r.json() if r.text else {}
199
  if progress_cb:
200
  await progress_cb(total, total)
201
  return {"ok": True, "response": j}
202
 
203
+ # Retryable
204
  if r.status_code in retryable and attempt < max_retries:
205
  attempt += 1
206
  last, done_json = await _query_upload_status(c, session_url, headers_base, total)
 
214
  await asyncio.sleep(min(2 ** attempt, 20))
215
  break
216
 
217
+ # Non-retryable error
218
  if r.status_code >= 400:
219
  return {
220
  "ok": False,
221
+ "err": "resumable_put_failed",
222
+ "detail": f"{r.status_code}:{(r.text or '')[:400]}",
223
  }
224
 
225
  return {
226
  "ok": False,
227
+ "err": "resumable_put_unexpected",
228
+ "detail": f"{r.status_code}:{(r.text or '')[:200]}",
229
  }
230
 
231
+ # final status check
232
+ last, done_json = await _query_upload_status(c, session_url, headers_base, total)
233
+ if done_json is not None:
 
 
 
 
 
 
 
234
  if progress_cb:
235
  await progress_cb(total, total)
236
+ return {"ok": True, "response": done_json}
237
 
238
  return {"ok": False, "err": "resumable_upload_incomplete"}
239
  except Exception as e:
240
+ return {"ok": False, "err": "resumable_exception", "detail": f"{type(e).__name__}:{e}"}