favoredone committed on
Commit
dab3de5
Β·
verified Β·
1 Parent(s): 93104ee

Upload 3 files

Browse files
Files changed (3) hide show
  1. Dockerfile +19 -0
  2. app.py +748 -0
  3. requirements.txt +7 -0
Dockerfile ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# Read the doc: https://huggingface.co/docs/hub/spaces-sdks-docker
# you will also find guides on how best to write your Dockerfile

FROM python:3.9

# Run as a non-root user (UID 1000), as recommended for Hugging Face Spaces.
RUN useradd -m -u 1000 user
USER user
ENV PATH="/home/user/.local/bin:$PATH"

WORKDIR /app

# Install dependencies before copying the code so Docker layer caching
# survives source-only changes.
COPY --chown=user ./requirements.txt requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt

# NOTE(review): this runs BEFORE the application code is copied, so it only
# affects the still-empty /app directory; 777 is also overly permissive.
# Confirm whether this step is still needed at all.
RUN chmod -R 777 /app


COPY --chown=user . /app
# 7860 is the port Hugging Face Spaces expects the app to listen on.
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "7860"]
app.py ADDED
@@ -0,0 +1,748 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import time
4
+ import asyncio
5
+ import aiohttp
6
+ from typing import Dict, List, Set, Optional
7
+ from urllib.parse import quote, urljoin
8
+ from datetime import datetime
9
+ from pathlib import Path
10
+ import huggingface_hub
11
+ from datasets import Dataset
12
+
13
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
14
+ from fastapi.responses import JSONResponse
15
+ from pydantic import BaseModel, Field
16
+ from contextlib import asynccontextmanager
17
+ import uvicorn
18
+
19
# Path for storing tracking data (one JSON file per course).
TRACKS_DIR = Path("tracks_data")
TRACKS_DIR.mkdir(exist_ok=True)

# Hugging Face configuration, read from the environment at import time.
HF_TOKEN = os.getenv("HF_TOKEN", "")
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "fred808/data")

# Fail fast at import time: uploads cannot work without a token.
if not HF_TOKEN:
    raise ValueError("HF_TOKEN environment variable is required")
29
+
30
def get_track_file_path(course: str) -> Path:
    """Return the JSON file path used to persist tracking results for *course*."""
    # Percent-encode every character so arbitrary course names are filesystem-safe.
    encoded = quote(course, safe='')
    filename = f"{encoded}_tracks.json"
    return TRACKS_DIR / filename
34
+
35
def save_tracks_to_file(course: str, tracks: List[Dict]) -> None:
    """Persist tracking results for *course* as pretty-printed JSON (best effort).

    Errors are logged and swallowed so a failed save never aborts processing.
    """
    try:
        target = get_track_file_path(course)
        with open(target, 'w', encoding='utf-8') as handle:
            json.dump(tracks, handle, indent=2, ensure_ascii=False)
        print(f"βœ“ Saved {len(tracks)} tracks for {course}")
    except Exception as e:
        print(f"Error saving tracks for {course}: {e}")
44
+
45
def load_tracks_from_file(course: str) -> List[Dict]:
    """Load previously saved tracking results for *course*.

    Returns an empty list when no file exists or the file cannot be read.
    """
    try:
        source = get_track_file_path(course)
        if source.exists():
            with open(source, 'r', encoding='utf-8') as handle:
                loaded = json.load(handle)
            print(f"βœ“ Loaded {len(loaded)} existing tracks for {course}")
            return loaded
    except Exception as e:
        print(f"Error loading tracks for {course}: {e}")
    return []
57
+
58
# Configuration
# Server that hosts the course folders and frame images.
SOURCE_SERVER = "https://fred808-vs2.hf.space"
# Worker endpoints that perform the actual cursor detection.
CURSOR_SERVERS = [
    "https://elias2211-elias2211-4zhhex.hf.space/track_cursor_url",
    "https://elias2211-cur1.hf.space/track_cursor_url",

]

# This coordinator now sends image URLs to cursor-tracker servers.
# The servers listed above expose an endpoint (originally /analyze) —
# we will replace the trailing '/analyze' with '/track_cursor_url' when calling.
69
+
70
# FastAPI Models
class CourseInfo(BaseModel):
    """One course entry as returned by the source server's /courses endpoint."""
    # NOTE(review): currently unreferenced in this file — kept for API schema use.
    course_folder: str
73
+
74
class ImageInfo(BaseModel):
    """One image entry from the source server's image listing."""
    filename: str
76
+
77
class TrackRequest(BaseModel):
    """Request body shape for a single cursor-tracking call."""
    image_url: str
    # Minimum template-match confidence; 0.8 mirrors the tracker's default.
    threshold: float = 0.8
80
+
81
class TrackResponse(BaseModel):
    """Shape of a cursor-tracker server's JSON reply.

    All fields except `success` are optional: on failure only `error` is set.
    """
    success: bool
    cursor_active: Optional[bool] = None
    x: Optional[int] = None
    y: Optional[int] = None
    confidence: Optional[float] = None
    template: Optional[str] = None
    error: Optional[str] = None
89
+
90
class ServerStatus(BaseModel):
    """Snapshot of one CursorServer's bookkeeping counters (see /servers/status)."""
    url: str
    model: str
    busy: bool
    total_processed: int
    total_time: float
    fps: float
97
+
98
class ProcessingStatus(BaseModel):
    """Per-course progress summary.

    NOTE(review): /processing/status returns plain dicts rather than this
    model — kept as documentation of the intended shape.
    """
    course: str
    total_images: int
    processed_images: int
    progress_percent: float
    status: str
104
+
105
class StartProcessingRequest(BaseModel):
    """Options accepted by POST /processing/start."""
    courses: Optional[List[str]] = None  # If None, process all courses
    continuous: bool = True  # Default to continuous like original
108
+
109
# Lifespan context manager for startup/shutdown events
# NOTE(review): an identical second definition of `lifespan` appears near the
# bottom of this file; since `app` is constructed immediately after THIS one,
# this is the definition actually used and the later copy is dead code.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager for startup/shutdown events.

    Startup: builds the CursorServer list and (optionally) launches the
    background processing loop.  Shutdown: cancels that loop and waits for it.
    """
    # Startup
    initialize_servers()
    print("Cursor Tracking Coordinator API started")
    print(f"Source server: {SOURCE_SERVER}")
    print(f"Cursor servers: {len(CURSOR_SERVERS)}")
    print(f"Hugging Face dataset: {HF_DATASET_ID}")
    print(f"HF Token: {'βœ… Set' if HF_TOKEN else '❌ Missing'}")

    # Start processing automatically
    if auto_start_processing:
        print("Auto-starting processing loop...")
        # `global` applies to the whole function even though declared here.
        global is_processing, current_processing_task
        is_processing = True
        current_processing_task = asyncio.create_task(processing_loop())

    yield  # Server is running

    # Shutdown: signal the loop to stop, then cancel and await the task.
    if current_processing_task:
        is_processing = False
        current_processing_task.cancel()
        try:
            await current_processing_task
        except asyncio.CancelledError:
            pass
138
+
139
# FastAPI App with lifespan (startup/shutdown handled by `lifespan` above).
app = FastAPI(
    title="Cursor Tracking Coordinator API",
    description="Distributed cursor-tracking coordinator",
    version="1.0.0",
    lifespan=lifespan
)
146
+
147
# Global state (module-level; mutated by the endpoints and the processing loop).
processed_images: Dict[str, Set[str]] = {}  # {course: set(image_names)}
course_tracks: Dict[str, List[Dict]] = {}  # {course: [{image, tracking, metadata}]}
failed_images: Dict[str, Set[str]] = {}  # {course: set(image_names)}
servers = []  # list of CursorServer, populated by initialize_servers()
is_processing = False  # True while the background loop should keep running
current_processing_task = None  # asyncio.Task running processing_loop(), if any
auto_start_processing = True  # Set to False if you don't want auto-start
155
+
156
class CursorServer:
    """Bookkeeping wrapper for a single remote cursor-tracking worker."""

    def __init__(self, url):
        self.url = url
        self.busy = False          # True while a request is in flight
        self.model = "unknown"     # filled in from the server's health info
        self.total_processed = 0   # successfully tracked images
        self.total_time = 0        # cumulative request time in seconds

    @property
    def fps(self):
        """Average images per second; 0 until any time has been recorded."""
        if self.total_time > 0:
            return self.total_processed / self.total_time
        return 0
167
+
168
# Initialize servers
def initialize_servers():
    """(Re)build the global `servers` list from the configured CURSOR_SERVERS."""
    global servers
    fresh = []
    for endpoint in CURSOR_SERVERS:
        fresh.append(CursorServer(endpoint))
    servers = fresh
172
+
173
# API Routes
@app.get("/")
async def root():
    """Service banner plus the current processing flags."""
    return {
        # Fix: this previously said "Caption Coordinator API" — a leftover from
        # the caption coordinator this service was derived from; keep it
        # consistent with the FastAPI title.
        "message": "Cursor Tracking Coordinator API",
        "status": "running",
        "auto_processing": auto_start_processing,
        "is_processing": is_processing
    }
182
+
183
@app.get("/health")
async def health():
    """Liveness probe: reports idle-server count and the processing flags."""
    idle_count = sum(1 for s in servers if not s.busy)
    return {
        "status": "healthy",
        "servers_available": idle_count,
        "total_servers": len(servers),
        "is_processing": is_processing,
        "auto_processing": auto_start_processing
    }
192
+
193
@app.get("/courses")
async def get_courses():
    """List course folder names available on the source server."""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{SOURCE_SERVER}/courses") as resp:
                payload = await resp.json()
        if isinstance(payload, dict) and 'courses' in payload:
            return [entry['course_folder'] for entry in payload['courses'] if isinstance(entry, dict)]
        return []
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching courses: {e}")
205
+
206
@app.get("/courses/{course}/images")
async def get_course_images(course: str):
    """List the frame images for one course."""
    try:
        # Frames live in a "<course>_frames" folder on the source server.
        folder = course if course.endswith("_frames") else f"{course}_frames"
        listing_url = f"{SOURCE_SERVER}/images/{quote(folder)}"
        async with aiohttp.ClientSession() as session:
            async with session.get(listing_url) as resp:
                payload = await resp.json()
        if isinstance(payload, dict) and 'images' in payload:
            return payload['images']
        return []
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching images: {e}")
220
+
221
@app.get("/servers/status")
async def get_servers_status():
    """Snapshot of every cursor server's bookkeeping counters."""
    return [
        ServerStatus(
            url=s.url,
            model=s.model,
            busy=s.busy,
            total_processed=s.total_processed,
            total_time=s.total_time,
            fps=s.fps,
        )
        for s in servers
    ]
235
+
236
@app.get("/processing/status")
async def get_processing_status():
    """Get current processing status"""
    status_info = {}
    for course in processed_images:
        # NOTE(review): `total` here counts *successfully processed* images,
        # not the images in the course (that would require re-fetching the
        # listing), so `progress_percent` is misleading — confirm intent.
        total = len(processed_images[course])
        processed = len(course_tracks.get(course, []))
        failed = len(failed_images.get(course, set()))
        status_info[course] = {
            "course": course,
            "total_images": total,
            "processed_images": processed,
            "failed_images": failed,
            "progress_percent": (processed / total * 100) if total > 0 else 0,
            "status": "completed" if processed + failed >= total else "processing"
        }
    return status_info
253
+
254
@app.post("/processing/start")
async def start_processing(request: StartProcessingRequest = StartProcessingRequest()):
    """Launch the background processing loop (400 if already running)."""
    global is_processing, current_processing_task

    if is_processing:
        raise HTTPException(status_code=400, detail="Processing is already running")

    is_processing = True
    loop_coro = processing_loop(request.courses, request.continuous)
    current_processing_task = asyncio.create_task(loop_coro)

    return {
        "message": "Processing started",
        "continuous": request.continuous,
        "specific_courses": request.courses
    }
272
+
273
@app.post("/processing/stop")
async def stop_processing():
    """Stop the background processing loop and cancel its task (400 if idle)."""
    global is_processing, current_processing_task

    if not is_processing:
        raise HTTPException(status_code=400, detail="Processing is not running")

    # Clear the flag first so the loop exits cleanly even before the cancel lands.
    is_processing = False
    task = current_processing_task
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        current_processing_task = None

    return {"message": "Processing stopped"}
291
+
292
@app.get("/tracks/{course}")
async def get_tracks(course: str):
    """Return all persisted tracking results for a specific course."""
    stored = load_tracks_from_file(course)
    return {
        "course": course,
        "total_tracks": len(stored),
        "tracks": stored,
    }
301
+
302
@app.delete("/tracks/{course}")
async def delete_tracks(course: str):
    """Delete tracking results for a specific course.

    Removes the on-disk JSON file and clears the in-memory state.  Responds
    404 when no track file exists, 500 on unexpected filesystem errors.
    """
    try:
        file_path = get_track_file_path(course)
        if file_path.exists():
            file_path.unlink()
            # Drop any in-memory state so a later run starts fresh.
            processed_images.pop(course, None)
            course_tracks.pop(course, None)
            failed_images.pop(course, None)
            return {"message": f"Tracks for {course} deleted"}
        else:
            raise HTTPException(status_code=404, detail=f"No tracks found for {course}")
    except HTTPException:
        # Bug fix: the broad `except Exception` below used to swallow the 404
        # raised above and re-report it as a 500.  Re-raise it untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting tracks: {e}")
320
+
321
# Core processing functions
async def fetch_courses() -> List[str]:
    """Fetch available course folder names from the source server.

    Unlike the /courses endpoint, errors propagate to the caller.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SOURCE_SERVER}/courses") as resp:
            payload = await resp.json()
    if isinstance(payload, dict) and 'courses' in payload:
        return [entry['course_folder'] for entry in payload['courses'] if isinstance(entry, dict)]
    return []
330
+
331
async def fetch_course_images(course: str) -> List[Dict]:
    """Fetch the image listing for *course* from the source server."""
    # The source server stores frames under "<course>_frames".
    folder = course if course.endswith("_frames") else f"{course}_frames"
    listing_url = f"{SOURCE_SERVER}/images/{quote(folder)}"
    async with aiohttp.ClientSession() as session:
        async with session.get(listing_url) as resp:
            payload = await resp.json()
    if isinstance(payload, dict) and 'images' in payload:
        return payload['images']
    return []
341
+
342
async def get_track(server: str, image_url: str, threshold: float = 0.8) -> Optional[Dict]:
    """POST an image URL to a tracker's /track_cursor_url endpoint.

    Returns the tracker's JSON reply, or None when the request fails.
    """
    try:
        # Legacy config entries may still point at '/analyze'; rewrite them.
        endpoint = server
        if endpoint.endswith('/analyze'):
            endpoint = endpoint.rsplit('/analyze', 1)[0] + '/track_cursor_url'

        form = {
            'image_url': image_url,
            'threshold': str(threshold),
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, data=form, timeout=30) as resp:
                # Expect JSON with keys similar to TrackResponse
                return await resp.json()
    except Exception as e:
        print(f"Error contacting tracker {server}: {e}")
        return None
362
+
363
async def get_server_info():
    """Query each cursor server's /health endpoint.

    Servers that fail the check are logged and omitted from the result.
    """
    collected = []
    async with aiohttp.ClientSession() as session:
        for endpoint in CURSOR_SERVERS:
            try:
                if endpoint.endswith('/analyze'):
                    health_url = endpoint.rsplit('/analyze', 1)[0] + '/health'
                else:
                    health_url = endpoint.rstrip('/') + '/health'
                async with session.get(health_url, timeout=10) as resp:
                    payload = await resp.json()
                collected.append({'url': endpoint, 'info': payload})
            except Exception as e:
                print(f"Couldn't get health info from {endpoint}: {e}")
    return collected
379
+
380
async def process_image(server: CursorServer, course: str, image: Dict) -> Optional[Dict]:
    """Process single image through one cursor server and return tracking metadata.

    Marks *server* busy for the duration of the call.  Returns None when the
    server was already busy, the tracker reported an error, or the call failed.
    """
    if server.busy:
        return None

    server.busy = True
    start_time = time.time()

    try:
        # Structure URL correctly: /images/COURSE_NAME_frames/IMAGE.png
        course_frames = f"{course}_frames" if not course.endswith("_frames") else course
        image_url = urljoin(SOURCE_SERVER, f"/images/{quote(course_frames)}/{quote(image['filename'])}")
        result = await get_track(server.url, image_url)

        # Time spent counts toward the server's fps stats even on failure.
        processing_time = time.time() - start_time
        server.total_time += processing_time

        # Expect result to contain cursor tracking fields; `cursor_active` may
        # legitimately be False, so only its *absence* counts as failure.
        if result and result.get('cursor_active') is not None:
            server.total_processed += 1
            metadata = {
                "image": image['filename'],
                "tracking": {
                    "cursor_active": bool(result.get('cursor_active', False)),
                    "x": result.get('x'),
                    "y": result.get('y'),
                    "confidence": float(result.get('confidence')) if result.get('confidence') is not None else None,
                    "template": result.get('template')
                },
                "server": server.url,
                "processing_time": processing_time,
                "timestamp": datetime.now().isoformat()
            }
            print(f"Server {server.url} tracked {image['filename']} in {processing_time:.2f}s ({server.fps:.2f} fps)")
            return metadata
        else:
            error_msg = result.get('error', 'Unknown error') if isinstance(result, dict) else 'No response'
            print(f"Server {server.url} failed for {image['filename']}: {error_msg}")
            return None

    except asyncio.TimeoutError:
        print(f"Server {server.url} timeout for {image['filename']}")
        return None
    except Exception as e:
        print(f"Error processing {image['filename']} on {server.url}: {e}")
        return None

    finally:
        # Always release the server, whatever happened above.
        server.busy = False
429
+
430
async def upload_to_huggingface(course: str, metadata_list: List[Dict]):
    """Push the tracking rows for *course* to the configured HF dataset.

    Returns True on success, False on any error (logged, never raised).
    """
    try:
        print(f"πŸ“€ Uploading {len(metadata_list)} tracks for {course} to Hugging Face...")

        # Build the columnar payload the `datasets` library expects.
        column_names = [
            "course", "image_filename", "cursor_active", "x", "y",
            "confidence", "template", "processing_server",
            "processing_time", "timestamp",
        ]
        dataset_data = {name: [] for name in column_names}

        for metadata in metadata_list:
            tracking = metadata.get('tracking', {})
            dataset_data["course"].append(course)
            dataset_data["image_filename"].append(metadata.get("image"))
            dataset_data["cursor_active"].append(bool(tracking.get("cursor_active", False)))
            dataset_data["x"].append(tracking.get("x"))
            dataset_data["y"].append(tracking.get("y"))
            dataset_data["confidence"].append(tracking.get("confidence"))
            dataset_data["template"].append(tracking.get("template"))
            dataset_data["processing_server"].append(metadata.get("server"))
            dataset_data["processing_time"].append(metadata.get("processing_time"))
            dataset_data["timestamp"].append(metadata.get("timestamp"))

        dataset = Dataset.from_dict(dataset_data)

        # Authenticate, then push this course as its own config under "train".
        huggingface_hub.login(token=HF_TOKEN)
        dataset.push_to_hub(
            HF_DATASET_ID,
            config_name=course.replace("/", "_").replace(" ", "_"),
            split="train",
            commit_message=f"Add tracks for course {course} - {len(metadata_list)} images"
        )

        print(f"βœ… Successfully uploaded {len(metadata_list)} tracks for {course} to {HF_DATASET_ID}")
        return True

    except Exception as e:
        print(f"❌ Error uploading to Hugging Face: {e}")
        return False
482
+
483
async def process_course(course: str, servers: List[CursorServer]):
    """Process all images in a course using available servers with retry logic.

    Results accumulate in `course_tracks[course]`, are persisted to disk after
    each batch, and uploaded to Hugging Face once the course is complete.
    Fixes: progress messages previously printed the literal "(unknown)"
    instead of the image filename; the unused `batch_size` local was removed.
    """
    # Initialize per-course state, restoring any previously saved progress.
    if course not in processed_images:
        processed_images[course] = set()
    if course not in course_tracks:
        course_tracks[course] = load_tracks_from_file(course)
        # Update processed images set from loaded tracks
        for cap in course_tracks[course]:
            processed_images[course].add(cap['image'])
    if course not in failed_images:
        failed_images[course] = set()

    # Get list of images
    images = await fetch_course_images(course)
    if not images:
        print(f"No images found for course {course}")
        return

    print(f"\nProcessing {len(images)} images for course {course}")

    # Track images that need processing with retry count (5 retries)
    pending_images = {}
    for img in images:
        filename = img['filename']
        if filename not in processed_images[course] and filename not in failed_images[course]:
            pending_images[filename] = {'image': img, 'retries': 0, 'max_retries': 5}

    if not pending_images:
        print(f"All images already processed or failed for course {course}")
        print(f"- Processed: {len(processed_images[course])}, Failed: {len(failed_images[course])}")

        # If course is completed, upload to Hugging Face
        if len(processed_images[course]) + len(failed_images[course]) >= len(images):
            if course_tracks[course]:
                print(f"πŸ“€ Course {course} completed, uploading to Hugging Face...")
                await upload_to_huggingface(course, course_tracks[course])
        return

    print(f"Images to process: {len(pending_images)} (already processed: {len(processed_images[course])}, failed: {len(failed_images[course])})")

    processed_in_this_run = 0

    while pending_images and is_processing:
        # Create tasks for each available server
        tasks = []
        assigned_images = []

        for server in servers:
            if not server.busy and pending_images:
                # Get the next pending image
                filename, img_data = next(iter(pending_images.items()))
                img = img_data['image']

                # Assign this image to the server
                tasks.append(process_image(server, course, img))
                assigned_images.append((filename, img, img_data['retries']))
                # Remove from pending temporarily while it's being processed
                del pending_images[filename]

        if not tasks:
            # If no servers available, wait a bit
            await asyncio.sleep(0.1)
            continue

        # Process images in parallel across servers
        results = await asyncio.gather(*tasks)

        # Handle results and retry logic
        has_new_results = False
        for (filename, img, current_retries), result in zip(assigned_images, results):
            if result:
                # Success - image was processed
                processed_images[course].add(filename)
                course_tracks[course].append(result)
                has_new_results = True
                processed_in_this_run += 1
                # Fix: previously printed the literal "(unknown)" here.
                print(f"βœ“ Successfully processed {filename}")
            else:
                # Failure - check if we should retry
                if current_retries < 5:  # max_retries
                    # Put back in pending for retry with incremented retry count
                    pending_images[filename] = {
                        'image': img,
                        'retries': current_retries + 1,
                        'max_retries': 5
                    }
                    print(f"↻ Retry {current_retries + 1}/5 for {filename}")
                else:
                    # Max retries exceeded, mark as failed
                    failed_images[course].add(filename)
                    print(f"βœ— Failed to process {filename} after 5 retries")

        # Save progress after each batch with new results
        if has_new_results:
            save_tracks_to_file(course, course_tracks[course])

        # Show progress
        total = len(images)
        done = len(processed_images[course])
        failed_count = len(failed_images[course])
        pending_count = len(pending_images)
        progress_percent = (done / total * 100) if total > 0 else 0

        print(f"\rProgress: {done}/{total} ({progress_percent:.1f}%) - {pending_count} pending, {failed_count} failed, {processed_in_this_run} new", end="", flush=True)

        # Small delay to prevent overwhelming the servers
        await asyncio.sleep(0.5)

    # Final status for this course
    total = len(images)
    done = len(processed_images[course])
    failed_count = len(failed_images[course])

    if done + failed_count >= total:
        if failed_count > 0:
            print(f"\nβœ“ Course {course} completed with {failed_count} failed images")
        else:
            print(f"\nβœ“ Course {course} fully completed")

        # Upload to Hugging Face when course is completed
        if course_tracks[course]:
            print(f"πŸ“€ Uploading {len(course_tracks[course])} tracks to Hugging Face...")
            success = await upload_to_huggingface(course, course_tracks[course])
            if success:
                print(f"βœ… Successfully uploaded {course} to Hugging Face")
            else:
                print(f"❌ Failed to upload {course} to Hugging Face")
    else:
        print(f"\n→ Course {course} partially completed: {done}/{total} processed, {failed_count} failed")
614
+
615
async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = True):
    """Main processing loop with proper error handling.

    Health-checks the cursor servers, then repeatedly fetches courses and
    processes each one until `is_processing` is cleared (or once, when
    *continuous* is False).
    """
    global is_processing

    # Health-check the cursor servers and record their advertised model names.
    server_info = await get_server_info()
    print("\nCursor servers (health check):")
    # Bug fix: health results were zip()-ed positionally against `servers`,
    # but get_server_info() omits servers whose health check failed, so one
    # failure misaligned the model info of every following server and silently
    # dropped the last one(s).  Match responses to servers by URL instead.
    info_by_url = {entry['url']: entry.get('info') for entry in server_info if isinstance(entry, dict)}
    available_servers = []
    for server in servers:
        if server.url not in info_by_url:
            continue  # no health response — leave this server out
        info = info_by_url[server.url]
        server.model = info.get('model_choice', 'cursor-tracker') if isinstance(info, dict) else 'cursor-tracker'
        available_servers.append(server)
        print(f"- {server.url}: {server.model}")

    if not available_servers:
        print(f"\nError: No available cursor servers!")
        is_processing = False
        return

    processing_servers = available_servers
    print(f"\nUsing {len(processing_servers)} cursor servers")

    # Check for existing track files and report
    existing_tracks = list(TRACKS_DIR.glob("*_tracks.json"))
    if existing_tracks:
        print("\nFound existing track files:")
        for cap_file in existing_tracks:
            course = cap_file.stem.replace("_tracks", "")
            try:
                with open(cap_file, 'r', encoding='utf-8') as f:
                    tracks = json.load(f)
                print(f"- {course}: {len(tracks)} tracks")
            except Exception as e:
                print(f"- Error reading {cap_file.name}: {e}")
        print()

    start_time = time.time()
    iteration = 0

    while is_processing:
        try:
            iteration += 1
            print(f"\n{'='*50}")
            print(f"Processing Iteration {iteration}")
            print(f"{'='*50}")

            # Get available courses
            if specific_courses:
                courses = specific_courses
                print(f"Processing specific courses: {courses}")
            else:
                courses = await fetch_courses()
                print(f"Found {len(courses)} courses")

            if not courses:
                print("No courses found, waiting...")
                if not continuous:
                    break
                await asyncio.sleep(10)
                continue

            # Process each course with all available servers
            for course in courses:
                if not is_processing:
                    break

                print(f"\n--- Processing course: {course} ---")
                await process_course(course, processing_servers)

            # Show server stats
            print("\nServer Stats:")
            total_processed = sum(s.total_processed for s in processing_servers)
            elapsed = time.time() - start_time
            if elapsed > 0:
                print(f"Total images processed: {total_processed}")
                print(f"Overall speed: {total_processed/elapsed:.2f} fps")
                for s in processing_servers:
                    print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps")
                print()

            if not continuous:
                print("One-time processing completed")
                break

            # Wait before next check
            print("Waiting for new courses...")
            await asyncio.sleep(5)

        except asyncio.CancelledError:
            print("Processing cancelled")
            break
        except Exception as e:
            print(f"Error in processing loop: {str(e)}")
            import traceback
            traceback.print_exc()
            await asyncio.sleep(10)

    is_processing = False
    print("Processing loop stopped")
714
+
715
# Lifespan context manager for startup/shutdown events
# NOTE(review): this is an exact duplicate of the `lifespan` defined earlier
# in the file.  `app = FastAPI(lifespan=lifespan)` was evaluated with the
# first definition, so this redefinition is dead code and can be deleted.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager for startup/shutdown events"""
    # Startup
    initialize_servers()
    print("Cursor Tracking Coordinator API started")
    print(f"Source server: {SOURCE_SERVER}")
    print(f"Cursor servers: {len(CURSOR_SERVERS)}")
    print(f"Hugging Face dataset: {HF_DATASET_ID}")
    print(f"HF Token: {'βœ… Set' if HF_TOKEN else '❌ Missing'}")

    # Start processing automatically
    if auto_start_processing:
        print("Auto-starting processing loop...")
        global is_processing, current_processing_task
        is_processing = True
        current_processing_task = asyncio.create_task(processing_loop())

    yield  # Server is running

    # Shutdown
    if current_processing_task:
        is_processing = False
        current_processing_task.cancel()
        try:
            await current_processing_task
        except asyncio.CancelledError:
            pass
744
+
745
+
746
if __name__ == "__main__":
    # Run with module-style import string for reload support
    # NOTE(review): this local entry point listens on port 8000 while the
    # Dockerfile CMD uses 7860 — presumably dev vs. Spaces deployment; confirm.
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)
requirements.txt ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ fastapi==0.104.1
2
+ uvicorn==0.24.0
3
+ aiofiles==23.2.1
4
+ python-multipart==0.0.6
5
+ huggingface-hub==0.18.0
6
+ aiohttp
7
+ datasets