File size: 11,032 Bytes
531d32b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67a1cd2
531d32b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
"""
Hugging Face uploader class with rate limiting
"""
import os
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import aiohttp
import logging
from sqlalchemy.orm import Session
from models import (
    RateLimitLog,
    UploadQueue,
    UploadStatusEnum,
    UploadErrorLog,
    HFConfig,
)

logger = logging.getLogger(__name__)


class RateLimiter:
    """Hourly upload quota tracker backed by ``RateLimitLog`` rows.

    Uploads are counted per clock hour. Each hour is identified by its
    ``hour_start`` (a naive UTC datetime aligned to minute 0), and at most one
    ``RateLimitLog`` row per hour is read or created by this class.
    """

    def __init__(self, max_uploads_per_hour: int = 128):
        """
        Initialize rate limiter

        Args:
            max_uploads_per_hour: Maximum uploads allowed per hour
        """
        self.max_uploads_per_hour = max_uploads_per_hour

    def get_current_hour_window(self) -> Tuple[datetime, datetime]:
        """
        Get the clock hour that contains "now"

        Returns:
            ``(hour_start, hour_end)`` as naive UTC datetimes, exactly one
            hour apart, with ``hour_start`` truncated to the top of the hour.
        """
        # NOTE(review): naive UTC is kept on purpose so values stay comparable
        # with what is already stored in RateLimitLog.hour_start — confirm the
        # column is timezone-naive before switching to aware datetimes.
        now = datetime.utcnow()
        hour_start = now.replace(minute=0, second=0, microsecond=0)
        hour_end = hour_start + timedelta(hours=1)
        return hour_start, hour_end

    def _get_rate_log(
        self, db: Session, hour_start: datetime
    ) -> Optional[RateLimitLog]:
        """Return the RateLimitLog row for ``hour_start``, or None if absent."""
        return (
            db.query(RateLimitLog)
            .filter(RateLimitLog.hour_start == hour_start)
            .first()
        )

    async def check_rate_limit(self, db: Session) -> Dict:
        """
        Check if we can upload in current hour

        Args:
            db: Database session

        Returns:
            Dictionary with keys:
                can_upload: True when another upload is allowed right now
                remaining_uploads: uploads left in the current hour (>= 0)
                resume_time: when uploads may resume, or None when uploads
                    are currently allowed
        """
        hour_start, hour_end = self.get_current_hour_window()
        rate_log = self._get_rate_log(db, hour_start)

        # No row yet means nothing has been uploaded this hour.
        if not rate_log:
            return {
                "can_upload": True,
                "remaining_uploads": self.max_uploads_per_hour,
                "resume_time": None,
            }

        # The server already told us to stop (see mark_limit_hit).
        if rate_log.limit_hit:
            return {
                "can_upload": False,
                "remaining_uploads": 0,
                "resume_time": rate_log.resume_time,
            }

        remaining = self.max_uploads_per_hour - rate_log.upload_count
        can_upload = remaining > 0
        return {
            "can_upload": can_upload,
            "remaining_uploads": max(0, remaining),
            # When our own quota is exhausted the counter resets with the next
            # hour window, so report that instead of leaving callers with None.
            "resume_time": None if can_upload else hour_end,
        }

    async def increment_counter(self, db: Session) -> None:
        """
        Increment upload counter for current hour

        Creates the hour's RateLimitLog row (with a count of 1) when it does
        not exist yet, then commits.

        Args:
            db: Database session
        """
        hour_start, hour_end = self.get_current_hour_window()
        rate_log = self._get_rate_log(db, hour_start)

        if rate_log:
            rate_log.upload_count += 1
        else:
            rate_log = RateLimitLog(
                upload_count=1,
                hour_start=hour_start,
                hour_end=hour_end,
                limit_hit=False,
            )
            db.add(rate_log)

        db.commit()

    async def mark_limit_hit(self, db: Session) -> None:
        """
        Mark rate limit as hit for current hour

        Sets ``limit_hit`` and a ``resume_time`` one second after the end of
        the current hour window, then commits.

        Args:
            db: Database session
        """
        hour_start, hour_end = self.get_current_hour_window()
        rate_log = self._get_rate_log(db, hour_start)

        if rate_log is None:
            # The counter row is only created on a successful upload, so a
            # rejection on the first attempt of the hour finds no row. Create
            # one here; otherwise the hit would be lost and check_rate_limit
            # would keep allowing uploads.
            rate_log = RateLimitLog(
                upload_count=0,
                hour_start=hour_start,
                hour_end=hour_end,
                limit_hit=True,
            )
            db.add(rate_log)
        else:
            rate_log.limit_hit = True

        rate_log.resume_time = hour_end + timedelta(seconds=1)
        db.commit()


class HFUploader:
    """Object-oriented Hugging Face uploader

    Posts files to a dataset repository endpoint on huggingface.co, keeps the
    ``UploadQueue`` / ``UploadErrorLog`` tables in sync with the outcome, and
    consults a ``RateLimiter`` before every upload.
    """

    def __init__(self, hf_token: str, target_repo: str):
        """
        Initialize uploader

        Args:
            hf_token: Hugging Face API token. When empty or None, the
                ``HF_TOKEN`` environment variable is used instead.
            target_repo: Target repository ID (e.g., "samfred2/ALL2")
        """
        # Honor the explicitly supplied token; the environment is a fallback.
        self.hf_token = hf_token or os.getenv("HF_TOKEN")
        self.target_repo = target_repo
        self.rate_limiter = RateLimiter(max_uploads_per_hour=128)

    async def upload_file(
        self, file_path: str, file_name: str, db: Session
    ) -> Dict:
        """
        Upload a single file to Hugging Face

        Args:
            file_path: Path to file to upload
            file_name: Name of file in repository
            db: Database session (used only to update the rate limiter)

        Returns:
            Upload result dictionary. Always contains ``success``,
            ``file_name`` and ``message``; failures also contain
            ``retryable`` and, for HTTP errors, ``status_code``.
            This method does not raise: any exception is converted into a
            retryable failure result.
        """
        try:
            # Check if file exists
            if not os.path.exists(file_path):
                return {
                    "success": False,
                    "file_name": file_name,
                    "message": "File not found",
                    "retryable": False,
                }

            # Read file
            with open(file_path, "rb") as f:
                file_content = f.read()

            # Create multipart upload
            url = f"https://huggingface.co/api/datasets/{self.target_repo}/upload"

            # Use aiohttp for async upload
            async with aiohttp.ClientSession() as session:
                # Create form data
                # NOTE(review): content type is fixed to JSON regardless of
                # the file — confirm only JSON files go through this path.
                data = aiohttp.FormData()
                data.add_field(
                    "files",
                    file_content,
                    filename=file_name,
                    content_type="application/json",
                )

                headers = {"Authorization": f"Bearer {self.hf_token}"}

                async with session.post(url, data=data, headers=headers) as response:
                    status = response.status

                    # Handle 429 rate limit
                    if status == 429:
                        logger.warning("Rate limit hit (429) for %s", file_name)
                        await self.rate_limiter.mark_limit_hit(db)
                        return {
                            "success": False,
                            "file_name": file_name,
                            "message": "Rate limit hit (429). Will retry after 1 hour.",
                            "status_code": 429,
                            "retryable": True,
                        }

                    # Handle other errors. Any 2xx is a success (e.g. 201),
                    # not just 200.
                    if not 200 <= status < 300:
                        error_text = await response.text()
                        # Server-side errors and request timeouts are worth
                        # retrying; 429 was already handled above.
                        retryable = status >= 500 or status == 408

                        logger.error(
                            "Upload failed for %s: HTTP %s", file_name, status
                        )

                        return {
                            "success": False,
                            "file_name": file_name,
                            "message": f"Upload failed: {response.reason}. {error_text}",
                            "status_code": status,
                            "retryable": retryable,
                        }

                    # Success
                    await self.rate_limiter.increment_counter(db)
                    logger.info("Successfully uploaded %s", file_name)

                    return {
                        "success": True,
                        "file_name": file_name,
                        "message": "File uploaded successfully",
                    }

        except Exception as e:
            # Boundary handler: callers expect a result dict, never a raise.
            logger.exception("Upload error for %s: %s", file_name, e)
            return {
                "success": False,
                "file_name": file_name,
                "message": f"Upload error: {str(e)}",
                "retryable": True,
            }

    async def upload_files_batch(
        self,
        files: List[Dict],
        db: Session,
        batch_size: int = 10,
    ) -> Dict:
        """
        Upload multiple files with rate limiting

        The rate limit is checked before every upload, and the loop stops
        (``paused=True``) as soon as the limiter refuses or an upload comes
        back with HTTP 429.

        Args:
            files: List of dicts with 'id', 'file_name', 'file_path'
            db: Database session
            batch_size: Currently unused; kept so existing callers that pass
                it keep working.

        Returns:
            Dict with ``successful`` and ``failed`` counts, the ``paused``
            flag, and ``results`` (one upload result per attempted file).
        """
        results: List[Dict] = []
        successful = 0
        failed = 0
        paused = False

        for file_info in files:
            # Check rate limit before each upload
            rate_check = await self.rate_limiter.check_rate_limit(db)
            if not rate_check["can_upload"]:
                logger.info(
                    f"Rate limit reached. Pausing uploads. Resume at: {rate_check['resume_time']}"
                )
                paused = True
                break

            # Upload file
            result = await self.upload_file(
                file_info["file_path"], file_info["file_name"], db
            )
            results.append(result)

            # The queue row is needed on both the success and failure paths.
            queue_item = (
                db.query(UploadQueue)
                .filter(UploadQueue.id == file_info["id"])
                .first()
            )

            if result["success"]:
                successful += 1
                if queue_item:
                    queue_item.status = UploadStatusEnum.COMPLETED
                    queue_item.uploaded_at = datetime.utcnow()
                    db.commit()
            else:
                failed += 1
                if queue_item:
                    queue_item.status = UploadStatusEnum.FAILED
                    queue_item.failure_reason = result["message"]
                    # Tolerate a NULL counter on rows that never failed before.
                    queue_item.retry_count = (queue_item.retry_count or 0) + 1

                # Log error; committed together with the queue update.
                # NOTE(review): results without a status code (missing file,
                # exceptions) store the literal string "None" as error_code.
                error_log = UploadErrorLog(
                    file_name=file_info["file_name"],
                    error_code=str(result.get("status_code")),
                    error_message=result["message"],
                    status_code=result.get("status_code"),
                    retryable=result.get("retryable", True),
                )
                db.add(error_log)
                db.commit()

                # A 429 means the server is throttling us: stop right away
                # instead of relying on the next limiter check to notice.
                if result.get("status_code") == 429:
                    paused = True
                    break

            # Rate limiting delay between uploads
            await asyncio.sleep(0.5)

        return {
            "successful": successful,
            "failed": failed,
            "paused": paused,
            "results": results,
        }

    async def get_upload_status(self, db: Session) -> Dict:
        """
        Get current upload status

        Args:
            db: Database session

        Returns:
            Dict with ``rate_limit`` (the result of the limiter check) and
            ``config`` (settings from the first HFConfig row, or None when
            no such row exists).
        """
        rate_check = await self.rate_limiter.check_rate_limit(db)
        config = db.query(HFConfig).first()

        config_info = None
        if config:
            config_info = {
                "max_uploads_per_hour": config.max_uploads_per_hour,
                "upload_batch_size": config.upload_batch_size,
                "target_repo": config.target_repo,
            }

        return {
            "rate_limit": rate_check,
            "config": config_info,
        }