Samfredoly committed on
Commit
531d32b
·
verified ·
1 Parent(s): a1c6fc0

Create hf_uploader.py

Browse files
Files changed (1) hide show
  1. hf_uploader.py +351 -0
hf_uploader.py ADDED
@@ -0,0 +1,351 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Hugging Face uploader class with rate limiting
3
+ """
4
+ import os
5
+ import asyncio
6
+ from datetime import datetime, timedelta
7
+ from typing import Dict, List, Optional, Tuple
8
+ import aiohttp
9
+ import logging
10
+ from sqlalchemy.orm import Session
11
+ from models import (
12
+ RateLimitLog,
13
+ UploadQueue,
14
+ UploadStatusEnum,
15
+ UploadErrorLog,
16
+ HFConfig,
17
+ )
18
+
19
+ logger = logging.getLogger(__name__)
20
+
21
+
22
class RateLimiter:
    """Per-hour rate limiter for Hugging Face uploads, backed by RateLimitLog rows."""

    def __init__(self, max_uploads_per_hour: int = 128):
        """
        Initialize rate limiter.

        Args:
            max_uploads_per_hour: Maximum uploads allowed per hour.
        """
        self.max_uploads_per_hour = max_uploads_per_hour

    def get_current_hour_window(self) -> Tuple[datetime, datetime]:
        """Return the (start, end) datetimes of the current UTC clock-hour window."""
        # NOTE(review): naive UTC (utcnow) is kept deliberately, even though it
        # is deprecated in 3.12+, so values compare consistently with the naive
        # datetimes presumably stored in RateLimitLog — confirm before switching
        # to timezone-aware datetimes.
        now = datetime.utcnow()
        hour_start = now.replace(minute=0, second=0, microsecond=0)
        hour_end = hour_start + timedelta(hours=1)
        return hour_start, hour_end

    def _current_log(self, db: Session) -> Optional[RateLimitLog]:
        """Fetch the RateLimitLog row for the current hour window, or None.

        De-duplicates the identical query previously repeated in
        check_rate_limit / increment_counter / mark_limit_hit.
        """
        hour_start, _ = self.get_current_hour_window()
        return (
            db.query(RateLimitLog)
            .filter(RateLimitLog.hour_start == hour_start)
            .first()
        )

    async def check_rate_limit(self, db: Session) -> Dict:
        """
        Check whether an upload is allowed in the current hour.

        Args:
            db: Database session.

        Returns:
            Dict with keys "can_upload", "remaining_uploads", "resume_time".
        """
        rate_log = self._current_log(db)

        # No row yet: nothing uploaded this hour, full budget available.
        if not rate_log:
            return {
                "can_upload": True,
                "remaining_uploads": self.max_uploads_per_hour,
                "resume_time": None,
            }

        # An explicit 429 was recorded for this hour: hold until resume_time.
        if rate_log.limit_hit:
            return {
                "can_upload": False,
                "remaining_uploads": 0,
                "resume_time": rate_log.resume_time,
            }

        remaining = self.max_uploads_per_hour - rate_log.upload_count
        return {
            "can_upload": remaining > 0,
            "remaining_uploads": max(0, remaining),
            "resume_time": None,
        }

    async def increment_counter(self, db: Session) -> None:
        """
        Increment the upload counter for the current hour, creating the
        log row on first upload of the hour.

        Args:
            db: Database session.
        """
        hour_start, hour_end = self.get_current_hour_window()
        rate_log = self._current_log(db)

        if rate_log:
            rate_log.upload_count += 1
        else:
            rate_log = RateLimitLog(
                upload_count=1,
                hour_start=hour_start,
                hour_end=hour_end,
                limit_hit=False,
            )
            db.add(rate_log)

        db.commit()

    async def mark_limit_hit(self, db: Session) -> None:
        """
        Mark the rate limit as hit for the current hour.

        Fix: previously this method silently did nothing when no
        RateLimitLog row existed yet (e.g. a 429 returned on the very
        first upload of the hour), so the limit was never recorded and
        check_rate_limit kept allowing uploads. A row is now created on
        demand before flagging it.

        Args:
            db: Database session.
        """
        hour_start, hour_end = self.get_current_hour_window()
        rate_log = self._current_log(db)

        if rate_log is None:
            # No uploads recorded this hour yet — create the row so the
            # limit flag has somewhere to live.
            rate_log = RateLimitLog(
                upload_count=0,
                hour_start=hour_start,
                hour_end=hour_end,
                limit_hit=False,
            )
            db.add(rate_log)

        rate_log.limit_hit = True
        # Resume just past the hour boundary to avoid re-hitting the limit.
        rate_log.resume_time = hour_end + timedelta(seconds=1)
        db.commit()
128
+
129
+
130
class HFUploader:
    """Object-oriented Hugging Face uploader with per-hour rate limiting."""

    def __init__(
        self,
        hf_token: str,
        target_repo: str,
        max_uploads_per_hour: int = 128,
    ):
        """
        Initialize uploader.

        Args:
            hf_token: Hugging Face API token.
            target_repo: Target repository ID (e.g., "samfred2/ALL2").
            max_uploads_per_hour: Hourly upload cap handed to the rate
                limiter. Default 128 matches the previously hard-coded value,
                so existing callers are unaffected.
        """
        self.hf_token = hf_token
        self.target_repo = target_repo
        self.rate_limiter = RateLimiter(max_uploads_per_hour=max_uploads_per_hour)

    async def upload_file(
        self, file_path: str, file_name: str, db: Session
    ) -> Dict:
        """
        Upload a single file to Hugging Face.

        Args:
            file_path: Path to file to upload.
            file_name: Name of file in repository.
            db: Database session.

        Returns:
            Upload result dictionary with "success", "file_name", "message",
            and, on HTTP failure, "status_code" and "retryable".
        """
        try:
            # Fail fast, before touching the network, if the file is gone.
            if not os.path.exists(file_path):
                return {
                    "success": False,
                    "file_name": file_name,
                    "message": "File not found",
                    "retryable": False,
                }

            with open(file_path, "rb") as f:
                file_content = f.read()

            url = f"https://huggingface.co/api/datasets/{self.target_repo}/upload"

            async with aiohttp.ClientSession() as session:
                data = aiohttp.FormData()
                data.add_field(
                    "files",
                    file_content,
                    filename=file_name,
                    content_type="application/json",
                )
                headers = {"Authorization": f"Bearer {self.hf_token}"}

                async with session.post(url, data=data, headers=headers) as response:
                    # 429: record the hit so the batch loop pauses until the
                    # next hour window.
                    if response.status == 429:
                        logger.warning(f"Rate limit hit (429) for {file_name}")
                        await self.rate_limiter.mark_limit_hit(db)
                        return {
                            "success": False,
                            "file_name": file_name,
                            "message": "Rate limit hit (429). Will retry after 1 hour.",
                            "status_code": 429,
                            "retryable": True,
                        }

                    if response.status != 200:
                        error_text = await response.text()
                        # Server errors and request timeouts are transient and
                        # worth retrying; other 4xx client errors are not.
                        retryable = (
                            response.status >= 500
                            or response.status in (408, 429)
                        )
                        logger.error(
                            f"Upload failed for {file_name}: HTTP {response.status}"
                        )
                        return {
                            "success": False,
                            "file_name": file_name,
                            "message": f"Upload failed: {response.reason}. {error_text}",
                            "status_code": response.status,
                            "retryable": retryable,
                        }

                    # Success: count it against the hourly budget.
                    await self.rate_limiter.increment_counter(db)
                    logger.info(f"Successfully uploaded {file_name}")
                    return {
                        "success": True,
                        "file_name": file_name,
                        "message": "File uploaded successfully",
                    }

        except Exception as e:
            # Unexpected failures (network, IO) are treated as retryable.
            logger.error(f"Upload error for {file_name}: {e}")
            return {
                "success": False,
                "file_name": file_name,
                "message": f"Upload error: {str(e)}",
                "retryable": True,
            }

    def _queue_item(self, db: Session, item_id) -> Optional[UploadQueue]:
        """Fetch the UploadQueue row by id (de-duplicates the twice-repeated lookup)."""
        return (
            db.query(UploadQueue)
            .filter(UploadQueue.id == item_id)
            .first()
        )

    async def upload_files_batch(
        self,
        files: List[Dict],
        db: Session,
        batch_size: int = 10,
    ) -> Dict:
        """
        Upload multiple files with rate limiting.

        Args:
            files: List of dicts with 'id', 'file_name', 'file_path'.
            db: Database session.
            batch_size: Kept for interface compatibility; currently unused —
                the rate limit is checked before every single upload instead.

        Returns:
            Dict with "successful"/"failed" counts, "paused" (True if the
            hourly limit stopped the batch early), and per-file "results".
        """
        results: List[Dict] = []
        successful = 0
        failed = 0
        paused = False

        # Plain iteration: the enumerate index was never used.
        for file_info in files:
            # Check rate limit before each upload so we stop mid-batch.
            rate_check = await self.rate_limiter.check_rate_limit(db)
            if not rate_check["can_upload"]:
                logger.info(
                    f"Rate limit reached. Pausing uploads. Resume at: {rate_check['resume_time']}"
                )
                paused = True
                break

            result = await self.upload_file(
                file_info["file_path"], file_info["file_name"], db
            )
            results.append(result)

            queue_item = self._queue_item(db, file_info["id"])

            if result["success"]:
                successful += 1
                if queue_item:
                    queue_item.status = UploadStatusEnum.COMPLETED
                    queue_item.uploaded_at = datetime.utcnow()
                    db.commit()
            else:
                failed += 1
                if queue_item:
                    queue_item.status = UploadStatusEnum.FAILED
                    queue_item.failure_reason = result["message"]
                    queue_item.retry_count += 1
                    db.commit()

                # NOTE(review): when the result has no "status_code" this
                # stores the literal string "None" in error_code — confirm
                # whether NULL was intended before changing it.
                error_log = UploadErrorLog(
                    file_name=file_info["file_name"],
                    error_code=str(result.get("status_code")),
                    error_message=result["message"],
                    status_code=result.get("status_code"),
                    retryable=result.get("retryable", True),
                )
                db.add(error_log)
                db.commit()

            # Small fixed delay between uploads to avoid hammering the API.
            await asyncio.sleep(0.5)

        return {
            "successful": successful,
            "failed": failed,
            "paused": paused,
            "results": results,
        }

    async def get_upload_status(self, db: Session) -> Dict:
        """
        Get current upload status.

        Args:
            db: Database session.

        Returns:
            Dict with the current rate-limit state and the stored HF
            configuration ("config" is None when no HFConfig row exists).
        """
        rate_check = await self.rate_limiter.check_rate_limit(db)
        config = db.query(HFConfig).first()

        # The original inner `... if config else ...` fallbacks were dead
        # code: this dict was only ever built when config was truthy.
        config_payload = None
        if config:
            config_payload = {
                "max_uploads_per_hour": config.max_uploads_per_hour,
                "upload_batch_size": config.upload_batch_size,
                "target_repo": config.target_repo,
            }

        return {
            "rate_limit": rate_check,
            "config": config_payload,
        }