understanding commited on
Commit
20955c4
·
verified ·
1 Parent(s): 61b7300

Update bot/youtube/resume.py

Browse files
Files changed (1) hide show
  1. bot/youtube/resume.py +74 -57
bot/youtube/resume.py CHANGED
@@ -1,10 +1,8 @@
1
  # PATH: bot/youtube/resume.py
2
- from __future__ import annotations
3
-
4
- import asyncio
5
  import os
6
  import re
7
- from typing import Any, Awaitable, Callable, Dict, Optional, Tuple
 
8
 
9
  import httpx
10
 
@@ -15,7 +13,7 @@ ProgressCB = Callable[[int, int], Awaitable[None]]
15
  YOUTUBE_RESUMABLE_INIT = "https://www.googleapis.com/upload/youtube/v3/videos"
16
 
17
 
18
- def _parse_range_last_byte(headers: dict) -> int | None:
19
  """
20
  Range header examples:
21
  - "bytes=0-1048575"
@@ -26,32 +24,38 @@ def _parse_range_last_byte(headers: dict) -> int | None:
26
  m = re.search(r"(\d+)\s*-\s*(\d+)", v)
27
  if not m:
28
  return None
29
- return int(m.group(2))
 
 
 
30
 
31
 
32
  async def start_resumable_session(
33
  *,
34
  access_token: str,
35
  metadata: dict,
36
- total_bytes: int = 0,
37
  ) -> Dict[str, Any]:
38
  """
 
39
  Returns:
40
  {"ok": True, "session_url": "..."}
41
  {"ok": False, "err": "...", "detail": "..."}
42
  """
43
- if not access_token:
44
- return {"ok": False, "err": "missing_access_token"}
 
 
 
 
 
45
 
46
  headers = {
47
  "Authorization": f"Bearer {access_token}",
48
  "Content-Type": "application/json; charset=UTF-8",
49
  "X-Upload-Content-Type": "video/*",
 
50
  }
51
- # Optional but helpful
52
- if total_bytes and total_bytes > 0:
53
- headers["X-Upload-Content-Length"] = str(int(total_bytes))
54
-
55
  params = {"uploadType": "resumable", "part": "snippet,status"}
56
 
57
  try:
@@ -63,12 +67,15 @@ async def start_resumable_session(
63
  "err": "resumable_init_failed",
64
  "detail": f"{r.status_code}:{(r.text or '')[:400]}",
65
  }
66
- upload_url = r.headers.get("Location") or r.headers.get("location")
 
 
67
  if not upload_url:
68
  return {"ok": False, "err": "resumable_init_no_location"}
69
- return {"ok": True, "session_url": str(upload_url)}
 
70
  except Exception as e:
71
- return {"ok": False, "err": "resumable_init_exception", "detail": f"{type(e).__name__}:{e}"}
72
 
73
 
74
  async def _query_upload_status(
@@ -76,10 +83,10 @@ async def _query_upload_status(
76
  upload_url: str,
77
  headers_base: dict,
78
  total: int,
79
- ) -> tuple[int, dict | None]:
80
  """
81
  Asks server what it has already received.
82
- Returns (last_received_byte, final_json_if_completed)
83
  """
84
  headers = dict(headers_base)
85
  headers["Content-Length"] = "0"
@@ -90,17 +97,20 @@ async def _query_upload_status(
90
  if r.status_code == 308:
91
  last = _parse_range_last_byte(r.headers)
92
  if last is None:
93
- return -1, None
94
- return last, None
95
 
96
  if r.status_code in (200, 201):
97
- j = r.json() if r.text else {}
98
- return total - 1, j
 
 
 
99
 
100
  if r.status_code >= 400:
101
- raise RuntimeError(f"resumable_status_failed:{r.status_code}:{r.text[:200]}")
102
 
103
- return -1, None
104
 
105
 
106
  async def upload_resumable(
@@ -108,30 +118,30 @@ async def upload_resumable(
108
  session_url: str,
109
  file_path: str,
110
  access_token: str,
111
- total_bytes: int = 0,
112
- progress_cb: ProgressCB | None = None,
113
  ) -> Dict[str, Any]:
114
  """
115
  Upload file in chunks to session_url.
116
-
117
  Returns:
118
  {"ok": True, "response": {...}}
119
  {"ok": False, "err": "...", "detail": "..."}
120
  """
121
- if not access_token:
122
- return {"ok": False, "err": "missing_access_token"}
123
- if not session_url:
124
- return {"ok": False, "err": "missing_session_url"}
125
  if not file_path or not os.path.exists(file_path):
126
  return {"ok": False, "err": "file_not_found"}
127
 
128
  try:
129
- total = int(total_bytes) if total_bytes else int(os.path.getsize(file_path))
 
 
 
 
 
130
  except Exception:
131
- total = 0
132
 
133
  if total <= 0:
134
- return {"ok": False, "err": "bad_total_bytes"}
135
 
136
  sent = 0
137
 
@@ -142,7 +152,6 @@ async def upload_resumable(
142
 
143
  retryable = {429, 500, 502, 503, 504}
144
  max_retries = 8
145
-
146
  timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
147
 
148
  try:
@@ -168,53 +177,58 @@ async def upload_resumable(
168
  try:
169
  r = await c.put(session_url, headers=headers, content=chunk)
170
  except (httpx.TimeoutException, httpx.NetworkError):
171
- # Network issue: ask server what it has, then resume
172
- last, done_json = await _query_upload_status(c, session_url, headers_base, total)
173
  if done_json is not None:
174
  if progress_cb:
175
  await progress_cb(total, total)
176
  return {"ok": True, "response": done_json}
177
- sent = max(sent, last + 1)
 
 
178
  if progress_cb:
179
  await progress_cb(sent, total)
180
- break # next outer loop iteration
181
 
182
- # 308 means "resume incomplete" - read Range
183
  if r.status_code == 308:
184
  last = _parse_range_last_byte(r.headers)
185
  if last is None:
186
- last, done_json = await _query_upload_status(c, session_url, headers_base, total)
187
  if done_json is not None:
188
  if progress_cb:
189
  await progress_cb(total, total)
190
  return {"ok": True, "response": done_json}
191
- sent = max(sent, last + 1)
 
192
  if progress_cb:
193
  await progress_cb(sent, total)
194
  break
195
 
196
- # Success
197
  if r.status_code in (200, 201):
198
- j = r.json() if r.text else {}
 
 
 
199
  if progress_cb:
200
  await progress_cb(total, total)
201
  return {"ok": True, "response": j}
202
 
203
- # Retryable
204
  if r.status_code in retryable and attempt < max_retries:
205
  attempt += 1
206
- last, done_json = await _query_upload_status(c, session_url, headers_base, total)
207
  if done_json is not None:
208
  if progress_cb:
209
  await progress_cb(total, total)
210
  return {"ok": True, "response": done_json}
211
- sent = max(sent, last + 1)
 
 
212
  if progress_cb:
213
  await progress_cb(sent, total)
214
- await asyncio.sleep(min(2 ** attempt, 20))
215
- break
216
 
217
- # Non-retryable error
 
 
218
  if r.status_code >= 400:
219
  return {
220
  "ok": False,
@@ -222,19 +236,22 @@ async def upload_resumable(
222
  "detail": f"{r.status_code}:{(r.text or '')[:400]}",
223
  }
224
 
225
- return {
226
- "ok": False,
227
- "err": "resumable_put_unexpected",
228
- "detail": f"{r.status_code}:{(r.text or '')[:200]}",
229
- }
230
 
231
  # final status check
232
- last, done_json = await _query_upload_status(c, session_url, headers_base, total)
233
  if done_json is not None:
234
  if progress_cb:
235
  await progress_cb(total, total)
236
  return {"ok": True, "response": done_json}
237
 
 
 
 
 
 
 
 
238
  return {"ok": False, "err": "resumable_upload_incomplete"}
239
  except Exception as e:
240
- return {"ok": False, "err": "resumable_exception", "detail": f"{type(e).__name__}:{e}"}
 
1
  # PATH: bot/youtube/resume.py
 
 
 
2
  import os
3
  import re
4
+ import asyncio
5
+ from typing import Callable, Awaitable, Optional, Tuple, Dict, Any
6
 
7
  import httpx
8
 
 
13
  YOUTUBE_RESUMABLE_INIT = "https://www.googleapis.com/upload/youtube/v3/videos"
14
 
15
 
16
+ def _parse_range_last_byte(headers: dict) -> Optional[int]:
17
  """
18
  Range header examples:
19
  - "bytes=0-1048575"
 
24
  m = re.search(r"(\d+)\s*-\s*(\d+)", v)
25
  if not m:
26
  return None
27
+ try:
28
+ return int(m.group(2))
29
+ except Exception:
30
+ return None
31
 
32
 
33
  async def start_resumable_session(
34
  *,
35
  access_token: str,
36
  metadata: dict,
37
+ total_bytes: int,
38
  ) -> Dict[str, Any]:
39
  """
40
+ Start YouTube resumable session.
41
  Returns:
42
  {"ok": True, "session_url": "..."}
43
  {"ok": False, "err": "...", "detail": "..."}
44
  """
45
+ try:
46
+ tb = int(total_bytes)
47
+ except Exception:
48
+ tb = 0
49
+
50
+ if tb <= 0:
51
+ return {"ok": False, "err": "bad_total_bytes", "detail": f"total_bytes={total_bytes}"}
52
 
53
  headers = {
54
  "Authorization": f"Bearer {access_token}",
55
  "Content-Type": "application/json; charset=UTF-8",
56
  "X-Upload-Content-Type": "video/*",
57
+ "X-Upload-Content-Length": str(tb),
58
  }
 
 
 
 
59
  params = {"uploadType": "resumable", "part": "snippet,status"}
60
 
61
  try:
 
67
  "err": "resumable_init_failed",
68
  "detail": f"{r.status_code}:{(r.text or '')[:400]}",
69
  }
70
+
71
+ upload_url = r.headers.get("Location") or r.headers.get("location") or ""
72
+ upload_url = str(upload_url).strip()
73
  if not upload_url:
74
  return {"ok": False, "err": "resumable_init_no_location"}
75
+
76
+ return {"ok": True, "session_url": upload_url}
77
  except Exception as e:
78
+ return {"ok": False, "err": "resumable_init_exception", "detail": f"{type(e).__name__}:{str(e)[:400]}"}
79
 
80
 
81
  async def _query_upload_status(
 
83
  upload_url: str,
84
  headers_base: dict,
85
  total: int,
86
+ ) -> Tuple[int, Optional[dict], Optional[str]]:
87
  """
88
  Asks server what it has already received.
89
+ Returns (last_received_byte, final_json_if_completed, err_detail_if_error)
90
  """
91
  headers = dict(headers_base)
92
  headers["Content-Length"] = "0"
 
97
  if r.status_code == 308:
98
  last = _parse_range_last_byte(r.headers)
99
  if last is None:
100
+ return -1, None, None
101
+ return last, None, None
102
 
103
  if r.status_code in (200, 201):
104
+ try:
105
+ j = r.json() if r.text else {}
106
+ except Exception:
107
+ j = {}
108
+ return total - 1, j, None
109
 
110
  if r.status_code >= 400:
111
+ return -1, None, f"{r.status_code}:{(r.text or '')[:250]}"
112
 
113
+ return -1, None, None
114
 
115
 
116
  async def upload_resumable(
 
118
  session_url: str,
119
  file_path: str,
120
  access_token: str,
121
+ total_bytes: int,
122
+ progress_cb: Optional[ProgressCB] = None,
123
  ) -> Dict[str, Any]:
124
  """
125
  Upload file in chunks to session_url.
 
126
  Returns:
127
  {"ok": True, "response": {...}}
128
  {"ok": False, "err": "...", "detail": "..."}
129
  """
 
 
 
 
130
  if not file_path or not os.path.exists(file_path):
131
  return {"ok": False, "err": "file_not_found"}
132
 
133
  try:
134
+ total_fs = int(os.path.getsize(file_path))
135
+ except Exception:
136
+ total_fs = 0
137
+
138
+ try:
139
+ total = int(total_bytes or total_fs)
140
  except Exception:
141
+ total = total_fs
142
 
143
  if total <= 0:
144
+ return {"ok": False, "err": "bad_total_bytes", "detail": f"fs={total_fs} arg={total_bytes}"}
145
 
146
  sent = 0
147
 
 
152
 
153
  retryable = {429, 500, 502, 503, 504}
154
  max_retries = 8
 
155
  timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
156
 
157
  try:
 
177
  try:
178
  r = await c.put(session_url, headers=headers, content=chunk)
179
  except (httpx.TimeoutException, httpx.NetworkError):
180
+ # network issue => ask status and resume
181
+ last, done_json, errd = await _query_upload_status(c, session_url, headers_base, total)
182
  if done_json is not None:
183
  if progress_cb:
184
  await progress_cb(total, total)
185
  return {"ok": True, "response": done_json}
186
+
187
+ if last >= 0:
188
+ sent = max(sent, last + 1)
189
  if progress_cb:
190
  await progress_cb(sent, total)
191
+ break
192
 
 
193
  if r.status_code == 308:
194
  last = _parse_range_last_byte(r.headers)
195
  if last is None:
196
+ last, done_json, errd = await _query_upload_status(c, session_url, headers_base, total)
197
  if done_json is not None:
198
  if progress_cb:
199
  await progress_cb(total, total)
200
  return {"ok": True, "response": done_json}
201
+ if last is not None and last >= 0:
202
+ sent = max(sent, last + 1)
203
  if progress_cb:
204
  await progress_cb(sent, total)
205
  break
206
 
 
207
  if r.status_code in (200, 201):
208
+ try:
209
+ j = r.json() if r.text else {}
210
+ except Exception:
211
+ j = {}
212
  if progress_cb:
213
  await progress_cb(total, total)
214
  return {"ok": True, "response": j}
215
 
 
216
  if r.status_code in retryable and attempt < max_retries:
217
  attempt += 1
218
+ last, done_json, errd = await _query_upload_status(c, session_url, headers_base, total)
219
  if done_json is not None:
220
  if progress_cb:
221
  await progress_cb(total, total)
222
  return {"ok": True, "response": done_json}
223
+
224
+ if last >= 0:
225
+ sent = max(sent, last + 1)
226
  if progress_cb:
227
  await progress_cb(sent, total)
 
 
228
 
229
+ await asyncio.sleep(min(2**attempt, 20))
230
+ break # rebuild chunk from new sent
231
+
232
  if r.status_code >= 400:
233
  return {
234
  "ok": False,
 
236
  "detail": f"{r.status_code}:{(r.text or '')[:400]}",
237
  }
238
 
239
+ return {"ok": False, "err": "resumable_put_unexpected", "detail": str(r.status_code)}
 
 
 
 
240
 
241
  # final status check
242
+ last, done_json, errd = await _query_upload_status(c, session_url, headers_base, total)
243
  if done_json is not None:
244
  if progress_cb:
245
  await progress_cb(total, total)
246
  return {"ok": True, "response": done_json}
247
 
248
+ if last >= total - 1:
249
+ last2, done_json2, _ = await _query_upload_status(c, session_url, headers_base, total)
250
+ if done_json2 is not None:
251
+ if progress_cb:
252
+ await progress_cb(total, total)
253
+ return {"ok": True, "response": done_json2}
254
+
255
  return {"ok": False, "err": "resumable_upload_incomplete"}
256
  except Exception as e:
257
+ return {"ok": False, "err": "resumable_exception", "detail": f"{type(e).__name__}:{str(e)[:400]}"}