favoredone committed on
Commit
4e80026
·
verified ·
1 Parent(s): e4f005a

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +704 -704
app.py CHANGED
@@ -1,705 +1,705 @@
1
- import os
2
- import json
3
- import time
4
- import asyncio
5
- import aiohttp
6
- from typing import Dict, List, Set, Optional
7
- from urllib.parse import quote, urljoin
8
- from datetime import datetime
9
- from pathlib import Path
10
- from datasets import Dataset, DatasetDict
11
- import huggingface_hub
12
-
13
- from fastapi import FastAPI, BackgroundTasks, HTTPException, status
14
- from fastapi.responses import JSONResponse
15
- from pydantic import BaseModel, Field
16
- import uvicorn
17
-
18
- # Path for storing caption data
19
- CAPTIONS_DIR = Path("captions_data")
20
- CAPTIONS_DIR.mkdir(exist_ok=True)
21
-
22
- # Hugging Face configuration
23
- HF_TOKEN = os.getenv("HF_TOKEN")
24
- HF_DATASET_ID = os.getenv("HF_DATASET_ID", "fred808/helium")
25
-
26
- if not HF_TOKEN:
27
- raise ValueError("HF_TOKEN environment variable is required")
28
-
29
def get_caption_file_path(course: str) -> Path:
    """Return the JSON file under CAPTIONS_DIR that stores a course's captions.

    The course name is percent-encoded with no characters treated as safe
    (so "/" becomes "%2F"), then suffixed with "_captions.json".
    """
    encoded_course = quote(course, safe='')
    filename = f"{encoded_course}_captions.json"
    return CAPTIONS_DIR / filename
33
-
34
def save_captions_to_file(course: str, captions: List[Dict]) -> None:
    """Persist a course's captions as pretty-printed UTF-8 JSON.

    The data is written to a sibling ".tmp" file and then moved over the
    real file with os.replace, so an interruption mid-write cannot leave a
    truncated caption file behind.

    Args:
        course: Course name; mapped to a file by get_caption_file_path.
        captions: Caption metadata dicts to serialize.

    Errors are reported on stdout and swallowed (best-effort save).
    """
    try:
        file_path = get_caption_file_path(course)
        tmp_path = file_path.with_name(file_path.name + ".tmp")
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(captions, f, indent=2, ensure_ascii=False)
        # Replace the previous snapshot only once the new one is fully written.
        os.replace(tmp_path, file_path)
        print(f"✓ Saved {len(captions)} captions for {course}")
    except Exception as e:
        print(f"Error saving captions for {course}: {e}")
43
-
44
def load_captions_from_file(course: str) -> List[Dict]:
    """Load a course's previously saved captions.

    Args:
        course: Course name; mapped to a file by get_caption_file_path.

    Returns:
        The list stored in the course's JSON file, or an empty list when the
        file is missing, unreadable, or does not contain a JSON list.
    """
    try:
        file_path = get_caption_file_path(course)
        if file_path.exists():
            with open(file_path, 'r', encoding='utf-8') as f:
                captions = json.load(f)
            # Callers iterate the result as a list of caption dicts; reject
            # any other top-level JSON type instead of handing it back.
            if not isinstance(captions, list):
                print(
                    f"Error loading captions for {course}: "
                    f"expected a JSON list, got {type(captions).__name__}"
                )
                return []
            print(f"✓ Loaded {len(captions)} existing captions for {course}")
            return captions
    except Exception as e:
        print(f"Error loading captions for {course}: {e}")
    return []
56
-
57
- # Configuration
58
- SOURCE_SERVER = "https://fred808-vs2.hf.space"
59
- CAPTION_SERVERS = [
60
- "https://favoredone-tv88mp.hf.space",
61
- "https://favoredone-7p1dcf.hf.space",
62
- "https://favoredone-k7b4mf.hf.space",
63
- "https://favoredone-mzlxc7.hf.space",
64
- "https://favoredone-aomfwa.hf.space",
65
- "https://favoredone-7g6v04.hf.space",
66
- "https://favoredone-dk1skh.hf.space",
67
- "https://favoredone-z4yo0y.hf.space",
68
- "https://favoredone-f6czeq.hf.space",
69
- "https://favoredone-5fo8ga.hf.space",
70
- "https://favoredone-zde8x6.hf.space",
71
- "https://favoredone-r0biih.hf.space",
72
- "https://favoredone-ljdzkf.hf.space",
73
- "https://favoredone-irrpe5.hf.space",
74
- "https://favoredone-bh9rwz.hf.space",
75
- "https://favoredone-u8c4dt.hf.space",
76
- "https://favoredone-futwyd.hf.space",
77
- "https://favoredone-hg2sot.hf.space",
78
- "https://favoredone-pvweug.hf.space",
79
- "https://favoredone-z6azk2.hf.space",
80
- "https://favoredone-4zid9w.hf.space",
81
- "https://favoredone-be7a1r.hf.space",
82
- "https://favoredone-ayazxa.hf.space",
83
- "https://favoredone-6ckj4m.hf.space",
84
- "https://favoredone-whn0xu.hf.space",
85
- "https://favoredone-t49exm.hf.space",
86
- "https://favoredone-cgrh0a.hf.space",
87
- "https://favoredone-r1kb5g.hf.space"
88
- ]
89
- MODEL_TYPE = "Florence-2-large" # Explicitly request large model
90
-
91
- # FastAPI Models
92
class CourseInfo(BaseModel):
    # Folder name identifying a course.
    course_folder: str


class ImageInfo(BaseModel):
    # File name of a single image.
    filename: str


class CaptionRequest(BaseModel):
    # URL of the image to caption and the model to use (defaults to MODEL_TYPE).
    image_url: str
    model_choice: str = MODEL_TYPE


class CaptionResponse(BaseModel):
    # Outcome flag plus an optional caption text and optional error text.
    success: bool
    caption: Optional[str] = None
    error: Optional[str] = None


class ServerStatus(BaseModel):
    # Snapshot of one caption server's bookkeeping counters.
    url: str
    model: str
    busy: bool
    total_processed: int
    total_time: float
    fps: float


class ProcessingStatus(BaseModel):
    # Per-course progress summary.
    course: str
    total_images: int
    processed_images: int
    progress_percent: float
    status: str


class StartProcessingRequest(BaseModel):
    courses: Optional[List[str]] = None  # If None, process all courses
    continuous: bool = True  # Default to continuous like original
125
-
126
- # FastAPI App
127
- app = FastAPI(
128
- title="Caption Coordinator API",
129
- description="Distributed caption processing coordinator",
130
- version="1.0.0"
131
- )
132
-
133
- # Global state
134
- processed_images: Dict[str, Set[str]] = {} # {course: set(image_names)}
135
- course_captions: Dict[str, List[Dict]] = {} # {course: [{image, caption, metadata}]}
136
- failed_images: Dict[str, Set[str]] = {} # {course: set(image_names)}
137
- servers = []
138
- is_processing = False
139
- current_processing_task = None
140
- auto_start_processing = True # Set to False if you don't want auto-start
141
-
142
class CaptionServer:
    """Bookkeeping record for one caption server.

    Tracks the server URL, a busy flag, the reported model name, and running
    totals of processed images and time spent.
    """

    def __init__(self, url):
        self.url = url
        self.model = "unknown"
        self.busy = False
        self.total_time = 0
        self.total_processed = 0

    @property
    def fps(self):
        """Images processed per unit of total_time, or 0 before any time is recorded."""
        if self.total_time > 0:
            return self.total_processed / self.total_time
        return 0
153
-
154
- # Initialize servers
155
def initialize_servers():
    """Rebuild the global server list with one CaptionServer per configured URL."""
    global servers
    servers = list(map(CaptionServer, CAPTION_SERVERS))
158
-
159
- # API Routes
160
@app.get("/")
async def root():
    """Return a small banner with the service name and processing flags."""
    payload = {
        "message": "Caption Coordinator API",
        "status": "running",
    }
    payload["auto_processing"] = auto_start_processing
    payload["is_processing"] = is_processing
    return payload
168
-
169
@app.get("/health")
async def health():
    """Report how many caption servers are idle, along with the processing flags."""
    idle_servers = sum(1 for s in servers if not s.busy)
    return {
        "status": "healthy",
        "servers_available": idle_servers,
        "total_servers": len(servers),
        "is_processing": is_processing,
        "auto_processing": auto_start_processing
    }
178
-
179
@app.get("/courses")
async def get_courses():
    """Fetch the list of course folder names from the source server.

    Returns:
        The "course_folder" value of every dict entry under the "courses"
        key of the source server's JSON reply; an empty list when the reply
        has no such key. Entries without "course_folder" are skipped rather
        than failing the whole request.

    Raises:
        HTTPException: 500 when the request or JSON decoding fails.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{SOURCE_SERVER}/courses") as resp:
                data = await resp.json()
                if isinstance(data, dict) and 'courses' in data:
                    return [
                        c['course_folder']
                        for c in data['courses']
                        if isinstance(c, dict) and 'course_folder' in c
                    ]
                return []
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching courses: {e}")
191
-
192
@app.get("/courses/{course}/images")
async def get_course_images(course: str):
    """Return the source server's image listing for a course.

    The course name is suffixed with "_frames" unless it already ends that
    way. Any failure is reported as an HTTP 500.
    """
    try:
        folder = course if course.endswith("_frames") else f"{course}_frames"
        listing_url = f"{SOURCE_SERVER}/images/{quote(folder)}"
        async with aiohttp.ClientSession() as session:
            async with session.get(listing_url) as resp:
                payload = await resp.json()
        has_images = isinstance(payload, dict) and 'images' in payload
        return payload['images'] if has_images else []
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching images: {e}")
206
-
207
@app.get("/servers/status")
async def get_servers_status():
    """Return one ServerStatus snapshot per known caption server."""
    return [
        ServerStatus(
            url=entry.url,
            model=entry.model,
            busy=entry.busy,
            total_processed=entry.total_processed,
            total_time=entry.total_time,
            fps=entry.fps
        )
        for entry in servers
    ]
221
-
222
@app.get("/processing/status")
async def get_processing_status():
    """Report per-course caption progress.

    The total image count of a course is not kept in memory, so it is fetched
    from the source server through fetch_course_images (all courses
    concurrently, 10 s limit each). Previously the size of the processed set
    was used as the total, which made every course look complete.

    Returns:
        A dict keyed by course name. Each value holds "course",
        "total_images", "processed_images", "failed_images",
        "progress_percent", "status" and "total_known". When the total could
        not be fetched, "total_known" is False, "total_images" falls back to
        the number of images seen so far and "status" stays "processing".
    """
    # Snapshot the keys: this coroutine awaits below, and the processing task
    # may add courses to the dict in the meantime.
    courses = list(processed_images)

    async def _count_images(course: str) -> Optional[int]:
        # Number of images listed by the source server, or None if unknown.
        try:
            images = await asyncio.wait_for(fetch_course_images(course), timeout=10)
        except Exception:
            return None
        return len(images) if images else None

    totals = await asyncio.gather(*(_count_images(c) for c in courses))

    status_info = {}
    for course, total in zip(courses, totals):
        processed = len(course_captions.get(course, []))
        failed = len(failed_images.get(course, set()))
        total_known = total is not None
        if not total_known:
            total = max(len(processed_images.get(course, ())), processed + failed)
        finished = total_known and processed + failed >= total
        status_info[course] = {
            "course": course,
            "total_images": total,
            "processed_images": processed,
            "failed_images": failed,
            "progress_percent": min(processed / total * 100, 100.0) if total > 0 else 0,
            "status": "completed" if finished else "processing",
            "total_known": total_known
        }
    return status_info
239
-
240
@app.post("/processing/start")
async def start_processing(request: StartProcessingRequest = StartProcessingRequest()):
    """Launch the background processing loop.

    Responds with HTTP 400 if a loop is already marked as running; otherwise
    flags processing as active, schedules processing_loop as a task and
    echoes the request options.
    """
    global is_processing, current_processing_task

    if is_processing:
        raise HTTPException(status_code=400, detail="Processing is already running")

    is_processing = True
    loop_coro = processing_loop(request.courses, request.continuous)
    current_processing_task = asyncio.create_task(loop_coro)

    response = {"message": "Processing started"}
    response["continuous"] = request.continuous
    response["specific_courses"] = request.courses
    return response
258
-
259
@app.post("/processing/stop")
async def stop_processing():
    """Stop the background processing loop.

    Responds with HTTP 400 when nothing is running. Otherwise clears the
    running flag, cancels the stored task, waits for it to finish and drops
    the reference.
    """
    global is_processing, current_processing_task

    running = is_processing
    if not running:
        raise HTTPException(status_code=400, detail="Processing is not running")

    is_processing = False
    if current_processing_task:
        current_processing_task.cancel()
        try:
            await current_processing_task
        except asyncio.CancelledError:
            # Cancellation is the expected outcome here.
            pass
        current_processing_task = None

    return {"message": "Processing stopped"}
277
-
278
@app.get("/captions/{course}")
async def get_captions(course: str):
    """Return the captions stored on disk for a course, with their count."""
    stored = load_captions_from_file(course)
    response = {"course": course}
    response["total_captions"] = len(stored)
    response["captions"] = stored
    return response
287
-
288
@app.delete("/captions/{course}")
async def delete_captions(course: str):
    """Delete a course's caption file and its in-memory tracking state.

    Raises:
        HTTPException: 404 when no caption file exists for the course;
            500 when deleting fails for any other reason.
    """
    try:
        file_path = get_caption_file_path(course)
        if not file_path.exists():
            raise HTTPException(status_code=404, detail=f"No captions found for {course}")
        file_path.unlink()
        # Forget the course everywhere so it is processed again from scratch.
        for registry in (processed_images, course_captions, failed_images):
            registry.pop(course, None)
        return {"message": f"Captions for {course} deleted"}
    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the 404
        # above would be caught below and reported as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting captions: {e}")
306
-
307
- # Core processing functions
308
async def fetch_courses() -> List[str]:
    """Fetch the list of course folder names from the source server.

    Returns:
        The "course_folder" value of every dict entry under the "courses"
        key of the JSON reply, or an empty list when that key is absent.
        Entries without "course_folder" are skipped instead of raising
        KeyError.

    Network and JSON decoding errors propagate to the caller.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SOURCE_SERVER}/courses") as resp:
            data = await resp.json()
            if isinstance(data, dict) and 'courses' in data:
                return [
                    c['course_folder']
                    for c in data['courses']
                    if isinstance(c, dict) and 'course_folder' in c
                ]
            return []
316
-
317
async def fetch_course_images(course: str) -> List[Dict]:
    """Return the source server's image listing for a course.

    The course name is suffixed with "_frames" unless it already ends that
    way. Yields the "images" value of the JSON reply, or an empty list when
    the reply is not a dict carrying that key.
    """
    folder = course if course.endswith("_frames") else f"{course}_frames"
    listing_url = f"{SOURCE_SERVER}/images/{quote(folder)}"
    async with aiohttp.ClientSession() as session:
        async with session.get(listing_url) as resp:
            payload = await resp.json()
    if not (isinstance(payload, dict) and 'images' in payload):
        return []
    return payload['images']
327
-
328
async def get_caption(server: str, image_url: str) -> Optional[Dict]:
    """Request a caption for one image from a caption server.

    Args:
        server: URL the GET request is sent to, used as given.
        image_url: URL of the image, passed as the "image_url" query parameter.

    Returns:
        The decoded JSON reply, or None when the request fails or times out
        (the error is printed, not raised).
    """
    # NOTE(review): CAPTION_SERVERS entries are bare hosts, while
    # get_model_info strips an "/analyze" suffix from them — confirm which
    # path the caption service actually expects here.
    params = {
        'image_url': image_url,
        'model_choice': MODEL_TYPE
    }
    # aiohttp expects a ClientTimeout object; a bare number is deprecated.
    timeout = aiohttp.ClientTimeout(total=30)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(server, params=params, timeout=timeout) as resp:
                return await resp.json()
    except Exception as e:
        print(f"Error from {server}: {e}")
        return None
341
-
342
async def get_model_info():
    """Query every caption server's /health endpoint for its model name.

    Returns:
        One {'url', 'model'} dict per entry of CAPTION_SERVERS, in the same
        order. A server whose health check fails is reported with model
        'unknown' instead of being left out, so the result stays aligned
        with the configured server list.
    """
    model_info = []
    # Bound each health check so one unresponsive server cannot stall startup.
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for server in CAPTION_SERVERS:
            model = 'unknown'
            try:
                health_url = server.rsplit('/analyze', 1)[0] + '/health'
                async with session.get(health_url) as resp:
                    info = await resp.json()
                model = info.get('model_choice', 'unknown')
            except Exception as e:
                print(f"Couldn't get model info from {server}: {e}")
            model_info.append({'url': server, 'model': model})
    return model_info
358
-
359
async def process_image(server: CaptionServer, course: str, image: Dict) -> Dict:
    """Caption a single image on one server.

    Returns the caption metadata dict on success, or None when the server is
    already busy, gives no usable caption, or an error occurs. The server's
    busy flag is set for the duration of the call and always cleared again.
    """
    if server.busy:
        return None

    server.busy = True
    started_at = time.time()

    try:
        # Image URL layout: /images/COURSE_NAME_frames/IMAGE.png
        folder = course if course.endswith("_frames") else f"{course}_frames"
        image_path = f"/images/{quote(folder)}/{quote(image['filename'])}"
        result = await get_caption(server.url, urljoin(SOURCE_SERVER, image_path))

        # Time is accounted for whether or not a caption came back.
        processing_time = time.time() - started_at
        server.total_time += processing_time

        if not (result and result.get('success') and result.get('caption')):
            # Either no response at all, or a response without a caption.
            error_msg = result.get('error', 'Unknown error') if result else 'No response'
            print(f"Server {server.url} failed for {image['filename']}: {error_msg}")
            return None

        server.total_processed += 1
        metadata = {
            "image": image['filename'],
            "caption": result['caption'],
            "server": server.url,
            "processing_time": processing_time,
            "timestamp": datetime.now().isoformat()
        }
        print(f"Server {server.url} processed {image['filename']} in {processing_time:.2f}s ({server.fps:.2f} fps)")
        return metadata

    except asyncio.TimeoutError:
        print(f"Server {server.url} timeout for {image['filename']}")
        return None
    except Exception as e:
        print(f"Error processing {image['filename']} on {server.url}: {e}")
        return None

    finally:
        server.busy = False
402
-
403
async def upload_to_huggingface(course: str, metadata_list: List[Dict]):
    """Upload a course's captions to the Hugging Face repo HF_DATASET_ID.

    The captions are reshaped into column lists, serialized as JSON and
    uploaded as "<course>_captions.json" ("/" and " " in the course name
    replaced by "_"). The blocking hub calls run in the default executor so
    the event loop keeps serving requests during the upload.

    Args:
        course: Course name.
        metadata_list: Caption dicts with the keys "image", "caption",
            "server", "processing_time" and "timestamp".

    Returns:
        True on success, False when anything fails (the error is printed).
    """
    try:
        print(f"📀 Uploading {len(metadata_list)} captions for {course} to Hugging Face...")

        # Column-oriented layout: one list per field.
        dataset_data = {
            "course": [course] * len(metadata_list),
            "image_filename": [m["image"] for m in metadata_list],
            "caption": [m["caption"] for m in metadata_list],
            "processing_server": [m["server"] for m in metadata_list],
            "processing_time": [m["processing_time"] for m in metadata_list],
            "timestamp": [m["timestamp"] for m in metadata_list]
        }

        json_data = json.dumps(dataset_data, indent=2, ensure_ascii=False)
        filename = f"{course.replace('/', '_').replace(' ', '_')}_captions.json"

        def _push() -> None:
            # Synchronous network calls; kept off the event loop thread.
            huggingface_hub.login(token=HF_TOKEN)
            # NOTE(review): no repo_type is passed, so the hub library's
            # default repo type applies — confirm HF_DATASET_ID is reachable
            # that way or pass repo_type="dataset".
            huggingface_hub.upload_file(
                path_or_fileobj=json_data.encode(),
                path_in_repo=filename,
                repo_id=HF_DATASET_ID,
                commit_message=f"Add captions for course {course} - {len(metadata_list)} images"
            )

        await asyncio.get_running_loop().run_in_executor(None, _push)

        print(f"✅ Successfully uploaded {len(metadata_list)} captions for {course} to {HF_DATASET_ID}/{filename}")
        return True

    except Exception as e:
        print(f"❌ Error uploading to Hugging Face: {e}")
        return False
449
-
450
# Courses whose current captions have already been uploaded by this process.
# Lets continuous mode skip re-uploading a finished course on every pass.
_uploaded_courses: Set[str] = set()


async def process_course(course: str, servers: List[CaptionServer]):
    """Caption every not-yet-handled image of a course across the given servers.

    Images already captioned (in memory or in the course's caption file) or
    already marked failed are skipped. Each remaining image is handed to a
    non-busy server; a failed attempt is retried up to 5 times before the
    image is marked failed. Captions are saved to disk after every batch
    that produced new results. Once every image is either captioned or
    failed, the captions are uploaded to Hugging Face — but only if this run
    added captions or the course has not been uploaded by this process yet.

    Args:
        course: Course name.
        servers: Caption servers to distribute images over.
    """
    max_retries = 5

    # Initialize course tracking
    if course not in processed_images:
        processed_images[course] = set()
    if course not in course_captions:
        course_captions[course] = load_captions_from_file(course)
        # Captions loaded from disk count as already processed.
        for cap in course_captions[course]:
            processed_images[course].add(cap['image'])
    if course not in failed_images:
        failed_images[course] = set()

    images = await fetch_course_images(course)
    if not images:
        print(f"No images found for course {course}")
        return

    print(f"\nProcessing {len(images)} images for course {course}")

    # filename -> work item for every image that still needs a caption
    pending_images = {}
    for img in images:
        filename = img['filename']
        if filename not in processed_images[course] and filename not in failed_images[course]:
            pending_images[filename] = {'image': img, 'retries': 0, 'max_retries': max_retries}

    if not pending_images:
        print(f"All images already processed or failed for course {course}")
        print(f"- Processed: {len(processed_images[course])}, Failed: {len(failed_images[course])}")

        course_done = len(processed_images[course]) + len(failed_images[course]) >= len(images)
        # Upload a finished course once, not on every polling pass.
        if course_done and course_captions[course] and course not in _uploaded_courses:
            print(f"📀 Course {course} completed, uploading to Hugging Face...")
            if await upload_to_huggingface(course, course_captions[course]):
                _uploaded_courses.add(course)
        return

    print(f"Images to process: {len(pending_images)} (already processed: {len(processed_images[course])}, failed: {len(failed_images[course])})")

    processed_in_this_run = 0

    while pending_images and is_processing:
        # Hand one pending image to each non-busy server.
        tasks = []
        assigned_images = []

        for server in servers:
            if server.busy or not pending_images:
                continue
            filename, img_data = next(iter(pending_images.items()))
            # Out of the queue while in flight; re-queued below on failure.
            del pending_images[filename]
            tasks.append(process_image(server, course, img_data['image']))
            assigned_images.append((filename, img_data))

        if not tasks:
            # No server free right now; check again shortly.
            await asyncio.sleep(0.1)
            continue

        results = await asyncio.gather(*tasks)

        has_new_results = False
        for (filename, img_data), result in zip(assigned_images, results):
            if result:
                processed_images[course].add(filename)
                course_captions[course].append(result)
                has_new_results = True
                processed_in_this_run += 1
                print(f"✓ Successfully processed {filename}")
                continue

            current_retries = img_data['retries']
            retry_limit = img_data['max_retries']
            if current_retries < retry_limit:
                # Re-queue at the end with the retry counter bumped.
                pending_images[filename] = {
                    'image': img_data['image'],
                    'retries': current_retries + 1,
                    'max_retries': retry_limit
                }
                print(f"↻ Retry {current_retries + 1}/{retry_limit} for {filename}")
            else:
                failed_images[course].add(filename)
                print(f"✗ Failed to process {filename} after {retry_limit} retries")

        # Save progress after each batch with new results
        if has_new_results:
            save_captions_to_file(course, course_captions[course])

        total = len(images)
        done = len(processed_images[course])
        failed_count = len(failed_images[course])
        pending_count = len(pending_images)
        progress_percent = (done / total * 100) if total > 0 else 0

        print(f"\rProgress: {done}/{total} ({progress_percent:.1f}%) - {pending_count} pending, {failed_count} failed, {processed_in_this_run} new", end="", flush=True)

        # Small delay to prevent overwhelming the servers
        await asyncio.sleep(0.5)

    # Final status for this course
    total = len(images)
    done = len(processed_images[course])
    failed_count = len(failed_images[course])

    if done + failed_count < total:
        print(f"\n→ Course {course} partially completed: {done}/{total} processed, {failed_count} failed")
        return

    if failed_count > 0:
        print(f"\n✓ Course {course} completed with {failed_count} failed images")
    else:
        print(f"\n✓ Course {course} fully completed")

    needs_upload = processed_in_this_run > 0 or course not in _uploaded_courses
    if course_captions[course] and needs_upload:
        print(f"📀 Uploading {len(course_captions[course])} captions to Hugging Face...")
        success = await upload_to_huggingface(course, course_captions[course])
        if success:
            _uploaded_courses.add(course)
            print(f"✅ Successfully uploaded {course} to Hugging Face")
        else:
            # Leave the course eligible for another upload attempt next pass.
            _uploaded_courses.discard(course)
            print(f"❌ Failed to upload {course} to Hugging Face")
581
-
582
async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = True):
    """Run caption processing until stopped.

    First queries every caption server's model and keeps only the servers
    whose model name contains MODEL_TYPE; if none qualify, processing is
    switched off and the function returns. Then, while the global
    is_processing flag is set, it processes each course in turn and prints
    server statistics.

    Args:
        specific_courses: Courses to process; when empty or None the course
            list is fetched from the source server on every iteration.
        continuous: When True, keep polling for new work after each pass;
            when False, stop after a single pass.
    """
    global is_processing

    # Match health-check results to servers by URL rather than by position,
    # so the pairing does not depend on the length or order of the result.
    model_info = await get_model_info()
    models_by_url = {
        info['url']: str(info.get('model') or 'unknown')
        for info in model_info
    }

    print("\nCaption Servers:")
    available_servers = []
    for server in servers:
        server.model = models_by_url.get(server.url, "unknown")
        if MODEL_TYPE in server.model:
            available_servers.append(server)
            print(f"✓ {server.url} confirmed {MODEL_TYPE}")
        else:
            print(f"✗ {server.url} using {server.model} - skipping (requires {MODEL_TYPE})")

    if not available_servers:
        print(f"\nError: No servers with {MODEL_TYPE} available!")
        is_processing = False
        return

    processing_servers = available_servers
    print(f"\nUsing {len(processing_servers)} servers with {MODEL_TYPE}")

    # Report caption files left over from earlier runs.
    existing_captions = list(CAPTIONS_DIR.glob("*_captions.json"))
    if existing_captions:
        print("\nFound existing caption files:")
        for cap_file in existing_captions:
            course = cap_file.stem.replace("_captions", "")
            try:
                with open(cap_file, 'r', encoding='utf-8') as f:
                    captions = json.load(f)
                print(f"- {course}: {len(captions)} captions")
            except Exception as e:
                print(f"- Error reading {cap_file.name}: {e}")
        print()

    start_time = time.time()
    iteration = 0

    while is_processing:
        try:
            iteration += 1
            print(f"\n{'='*50}")
            print(f"Processing Iteration {iteration}")
            print(f"{'='*50}")

            if specific_courses:
                courses = specific_courses
                print(f"Processing specific courses: {courses}")
            else:
                courses = await fetch_courses()
                print(f"Found {len(courses)} courses")

            if not courses:
                print("No courses found, waiting...")
                if not continuous:
                    break
                await asyncio.sleep(10)
                continue

            for course in courses:
                # The flag can be cleared by the stop endpoint between courses.
                if not is_processing:
                    break

                print(f"\n--- Processing course: {course} ---")
                await process_course(course, processing_servers)

            print("\nServer Stats:")
            total_processed = sum(s.total_processed for s in processing_servers)
            elapsed = time.time() - start_time
            if elapsed > 0:
                print(f"Total images processed: {total_processed}")
                print(f"Overall speed: {total_processed/elapsed:.2f} fps")
                for s in processing_servers:
                    print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps")
            print()

            if not continuous:
                print("One-time processing completed")
                break

            print("Waiting for new courses...")
            await asyncio.sleep(5)

        except asyncio.CancelledError:
            print("Processing cancelled")
            break
        except Exception as e:
            # Log and retry after a pause instead of killing the loop.
            print(f"Error in processing loop: {str(e)}")
            import traceback
            traceback.print_exc()
            await asyncio.sleep(10)

    is_processing = False
    print("Processing loop stopped")
684
-
685
- # Startup event
686
@app.on_event("startup")
async def startup_event():
    """Build the server list, print the configuration and optionally auto-start.

    When auto_start_processing is true, processing is flagged as running and
    processing_loop is scheduled as a background task with its defaults.
    """
    global is_processing, current_processing_task

    initialize_servers()
    print("Caption Coordinator API started")
    print(f"Source server: {SOURCE_SERVER}")
    print(f"Caption servers: {len(CAPTION_SERVERS)}")
    print(f"Hugging Face dataset: {HF_DATASET_ID}")
    print(f"HF Token: {'✅ Set' if HF_TOKEN else '❌ Missing'}")

    # Start processing automatically (like original main())
    if auto_start_processing:
        print("Auto-starting processing loop...")
        is_processing = True
        current_processing_task = asyncio.create_task(processing_loop())
702
-
703
-
704
if __name__ == "__main__":
    # uvicorn only honours reload=True when the application is passed as an
    # import string; given the app object it refuses to start. Run the object
    # directly without reload. For auto-reload during development, launch
    # `uvicorn <module>:app --reload` from the shell instead.
    uvicorn.run(app, host="0.0.0.0", port=8000)
 
1
+ import os
2
+ import json
3
+ import time
4
+ import asyncio
5
+ import aiohttp
6
+ from typing import Dict, List, Set, Optional
7
+ from urllib.parse import quote, urljoin
8
+ from datetime import datetime
9
+ from pathlib import Path
10
+ from datasets import Dataset, DatasetDict
11
+ import huggingface_hub
12
+
13
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
14
+ from fastapi.responses import JSONResponse
15
+ from pydantic import BaseModel, Field
16
+ import uvicorn
17
+
18
+ # Path for storing caption data
19
+ CAPTIONS_DIR = Path("captions_data")
20
+ CAPTIONS_DIR.mkdir(exist_ok=True)
21
+
22
+ # Hugging Face configuration
23
+ HF_TOKEN = os.getenv("HF_TOKEN")
24
+ HF_DATASET_ID = os.getenv("HF_DATASET_ID", "fred808/helium")
25
+
26
+ if not HF_TOKEN:
27
+ raise ValueError("HF_TOKEN environment variable is required")
28
+
29
+ def get_caption_file_path(course: str) -> Path:
30
+ """Get the path to the JSON file for storing course captions"""
31
+ safe_name = quote(course, safe='')
32
+ return CAPTIONS_DIR / f"{safe_name}_captions.json"
33
+
34
+ def save_captions_to_file(course: str, captions: List[Dict]) -> None:
35
+ """Save captions to a JSON file"""
36
+ try:
37
+ file_path = get_caption_file_path(course)
38
+ with open(file_path, 'w', encoding='utf-8') as f:
39
+ json.dump(captions, f, indent=2, ensure_ascii=False)
40
+ print(f"βœ“ Saved {len(captions)} captions for {course}")
41
+ except Exception as e:
42
+ print(f"Error saving captions for {course}: {e}")
43
+
44
+ def load_captions_from_file(course: str) -> List[Dict]:
45
+ """Load existing captions from JSON file"""
46
+ try:
47
+ file_path = get_caption_file_path(course)
48
+ if file_path.exists():
49
+ with open(file_path, 'r', encoding='utf-8') as f:
50
+ captions = json.load(f)
51
+ print(f"βœ“ Loaded {len(captions)} existing captions for {course}")
52
+ return captions
53
+ except Exception as e:
54
+ print(f"Error loading captions for {course}: {e}")
55
+ return []
56
+
57
+ # Configuration
58
+ SOURCE_SERVER = "https://favoredone-imm.hf.space"
59
+ CAPTION_SERVERS = [
60
+ "https://favoredone-tv88mp.hf.space",
61
+ "https://favoredone-7p1dcf.hf.space",
62
+ "https://favoredone-k7b4mf.hf.space",
63
+ "https://favoredone-mzlxc7.hf.space",
64
+ "https://favoredone-aomfwa.hf.space",
65
+ "https://favoredone-7g6v04.hf.space",
66
+ "https://favoredone-dk1skh.hf.space",
67
+ "https://favoredone-z4yo0y.hf.space",
68
+ "https://favoredone-f6czeq.hf.space",
69
+ "https://favoredone-5fo8ga.hf.space",
70
+ "https://favoredone-zde8x6.hf.space",
71
+ "https://favoredone-r0biih.hf.space",
72
+ "https://favoredone-ljdzkf.hf.space",
73
+ "https://favoredone-irrpe5.hf.space",
74
+ "https://favoredone-bh9rwz.hf.space",
75
+ "https://favoredone-u8c4dt.hf.space",
76
+ "https://favoredone-futwyd.hf.space",
77
+ "https://favoredone-hg2sot.hf.space",
78
+ "https://favoredone-pvweug.hf.space",
79
+ "https://favoredone-z6azk2.hf.space",
80
+ "https://favoredone-4zid9w.hf.space",
81
+ "https://favoredone-be7a1r.hf.space",
82
+ "https://favoredone-ayazxa.hf.space",
83
+ "https://favoredone-6ckj4m.hf.space",
84
+ "https://favoredone-whn0xu.hf.space",
85
+ "https://favoredone-t49exm.hf.space",
86
+ "https://favoredone-cgrh0a.hf.space",
87
+ "https://favoredone-r1kb5g.hf.space"
88
+ ]
89
+ MODEL_TYPE = "Florence-2-large" # Explicitly request large model
90
+
91
+ # FastAPI Models
92
+ class CourseInfo(BaseModel):
93
+ course_folder: str
94
+
95
+ class ImageInfo(BaseModel):
96
+ filename: str
97
+
98
+ class CaptionRequest(BaseModel):
99
+ image_url: str
100
+ model_choice: str = MODEL_TYPE
101
+
102
+ class CaptionResponse(BaseModel):
103
+ success: bool
104
+ caption: Optional[str] = None
105
+ error: Optional[str] = None
106
+
107
+ class ServerStatus(BaseModel):
108
+ url: str
109
+ model: str
110
+ busy: bool
111
+ total_processed: int
112
+ total_time: float
113
+ fps: float
114
+
115
+ class ProcessingStatus(BaseModel):
116
+ course: str
117
+ total_images: int
118
+ processed_images: int
119
+ progress_percent: float
120
+ status: str
121
+
122
+ class StartProcessingRequest(BaseModel):
123
+ courses: Optional[List[str]] = None # If None, process all courses
124
+ continuous: bool = True # Default to continuous like original
125
+
126
+ # FastAPI App
127
+ app = FastAPI(
128
+ title="Caption Coordinator API",
129
+ description="Distributed caption processing coordinator",
130
+ version="1.0.0"
131
+ )
132
+
133
+ # Global state
134
+ processed_images: Dict[str, Set[str]] = {} # {course: set(image_names)}
135
+ course_captions: Dict[str, List[Dict]] = {} # {course: [{image, caption, metadata}]}
136
+ failed_images: Dict[str, Set[str]] = {} # {course: set(image_names)}
137
+ servers = []
138
+ is_processing = False
139
+ current_processing_task = None
140
+ auto_start_processing = True # Set to False if you don't want auto-start
141
+
142
+ class CaptionServer:
143
+ def __init__(self, url):
144
+ self.url = url
145
+ self.busy = False
146
+ self.model = "unknown"
147
+ self.total_processed = 0
148
+ self.total_time = 0
149
+
150
+ @property
151
+ def fps(self):
152
+ return self.total_processed / self.total_time if self.total_time > 0 else 0
153
+
154
+ # Initialize servers
155
+ def initialize_servers():
156
+ global servers
157
+ servers = [CaptionServer(url) for url in CAPTION_SERVERS]
158
+
159
+ # API Routes
160
+ @app.get("/")
161
+ async def root():
162
+ return {
163
+ "message": "Caption Coordinator API",
164
+ "status": "running",
165
+ "auto_processing": auto_start_processing,
166
+ "is_processing": is_processing
167
+ }
168
+
169
+ @app.get("/health")
170
+ async def health():
171
+ return {
172
+ "status": "healthy",
173
+ "servers_available": len([s for s in servers if not s.busy]),
174
+ "total_servers": len(servers),
175
+ "is_processing": is_processing,
176
+ "auto_processing": auto_start_processing
177
+ }
178
+
179
@app.get("/courses")
async def get_courses():
    """Fetch available courses from source server.

    Delegates to fetch_courses() so this route and the background processing
    loop share a single implementation of the upstream request and parsing.

    Returns:
        The list of course folder names (empty when the response has no
        'courses' entry).

    Raises:
        HTTPException: 500 when the request or the response parsing fails.
    """
    try:
        return await fetch_courses()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching courses: {e}") from e
191
+
192
@app.get("/courses/{course}/images")
async def get_course_images(course: str):
    """Fetch images list for a course.

    Delegates to fetch_course_images() so this route and the background
    processing loop build the upstream URL and parse the reply the same way.

    Returns:
        The 'images' list from the source server (empty when absent).

    Raises:
        HTTPException: 500 when the request or the response parsing fails.
    """
    try:
        return await fetch_course_images(course)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching images: {e}") from e
206
+
207
@app.get("/servers/status")
async def get_servers_status():
    """Return one ServerStatus snapshot per known caption server."""
    return [
        ServerStatus(
            url=srv.url,
            model=srv.model,
            busy=srv.busy,
            total_processed=srv.total_processed,
            total_time=srv.total_time,
            fps=srv.fps,
        )
        for srv in servers
    ]
221
+
222
@app.get("/processing/status")
async def get_processing_status():
    """Summarise per-course progress from the in-memory tracking dictionaries."""
    report = {}
    for name, done_names in processed_images.items():
        # NOTE(review): "total_images" is the size of the processed-name set, not
        # the number of images the source server lists - confirm this is intended.
        image_total = len(done_names)
        caption_count = len(course_captions.get(name, []))
        failure_count = len(failed_images.get(name, set()))

        if image_total > 0:
            percent = caption_count / image_total * 100
        else:
            percent = 0

        if caption_count + failure_count >= image_total:
            state = "completed"
        else:
            state = "processing"

        report[name] = {
            "course": name,
            "total_images": image_total,
            "processed_images": caption_count,
            "failed_images": failure_count,
            "progress_percent": percent,
            "status": state,
        }
    return report
239
+
240
@app.post("/processing/start")
async def start_processing(request: StartProcessingRequest = StartProcessingRequest()):
    """Launch the background processing loop unless one is already running."""
    global is_processing, current_processing_task

    if is_processing:
        raise HTTPException(status_code=400, detail="Processing is already running")

    # Flip the flag before scheduling so the loop sees it on its first check.
    is_processing = True
    loop_coro = processing_loop(request.courses, request.continuous)
    current_processing_task = asyncio.create_task(loop_coro)

    response = {"message": "Processing started"}
    response["continuous"] = request.continuous
    response["specific_courses"] = request.courses
    return response
258
+
259
@app.post("/processing/stop")
async def stop_processing():
    """Cancel the background processing task and clear the running flag."""
    global is_processing, current_processing_task

    if not is_processing:
        raise HTTPException(status_code=400, detail="Processing is not running")

    is_processing = False
    running_task = current_processing_task
    if running_task:
        running_task.cancel()
        try:
            # Wait for the task to unwind; cancellation itself is expected here.
            await running_task
        except asyncio.CancelledError:
            pass
        current_processing_task = None

    return {"message": "Processing stopped"}
277
+
278
@app.get("/captions/{course}")
async def get_captions(course: str):
    """Return the captions stored on disk for one course, with their count."""
    stored = load_captions_from_file(course)
    return dict(course=course, total_captions=len(stored), captions=stored)
287
+
288
@app.delete("/captions/{course}")
async def delete_captions(course: str):
    """Delete captions for a specific course.

    Removes the course's caption file and drops the course from the in-memory
    processed / captions / failed tracking dictionaries.

    Raises:
        HTTPException: 404 when no caption file exists for the course;
            500 when removing the file fails.
    """
    try:
        file_path = get_caption_file_path(course)
        if not file_path.exists():
            raise HTTPException(status_code=404, detail=f"No captions found for {course}")

        file_path.unlink()
        for tracked in (processed_images, course_captions, failed_images):
            tracked.pop(course, None)
        return {"message": f"Captions for {course} deleted"}
    except HTTPException:
        # Let the deliberate 404 through; the generic handler below would
        # otherwise rewrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting captions: {e}") from e
306
+
307
+ # Core processing functions
308
async def fetch_courses() -> List[str]:
    """Return the course folder names advertised by the source server."""
    endpoint = f"{SOURCE_SERVER}/courses"
    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint) as resp:
            payload = await resp.json()

    # Anything other than a dict carrying a 'courses' entry yields no courses.
    if not (isinstance(payload, dict) and 'courses' in payload):
        return []

    folders = []
    for entry in payload['courses']:
        if isinstance(entry, dict):
            folders.append(entry['course_folder'])
    return folders
316
+
317
async def fetch_course_images(course: str) -> List[Dict]:
    """Return the image entries the source server lists for a course."""
    # The source server keys image folders by "<course>_frames".
    frames_dir = course if course.endswith("_frames") else f"{course}_frames"
    endpoint = f"{SOURCE_SERVER}/images/{quote(frames_dir)}"

    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint) as resp:
            payload = await resp.json()

    if isinstance(payload, dict) and 'images' in payload:
        return payload['images']
    return []
327
+
328
async def get_caption(server: str, image_url: str) -> Optional[Dict]:
    """Get caption from a specific server.

    Args:
        server: URL of the caption endpoint to query.
        image_url: URL of the image, forwarded as the 'image_url' query parameter.

    Returns:
        The decoded JSON body of the response, or None when the request,
        the 30-second timeout, or the JSON decoding fails (the error is printed).
    """
    params = {
        'image_url': image_url,
        'model_choice': MODEL_TYPE
    }
    # aiohttp expects a ClientTimeout object; a bare number is the deprecated form.
    timeout = aiohttp.ClientTimeout(total=30)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(server, params=params) as resp:
                return await resp.json()
    except Exception as e:
        # Best effort: callers treat None as a failed attempt and may retry.
        print(f"Error from {server}: {e}")
        return None
341
+
342
async def get_model_info():
    """Query each configured caption server's /health endpoint for its model.

    Servers whose health request fails are reported and left out of the result.
    """
    collected = []
    async with aiohttp.ClientSession() as session:
        for endpoint in CAPTION_SERVERS:
            try:
                # Swap the trailing '/analyze' path for '/health'.
                base = endpoint.rsplit('/analyze', 1)[0]
                async with session.get(base + '/health') as resp:
                    payload = await resp.json()
                    collected.append({
                        'url': endpoint,
                        'model': payload.get('model_choice', 'unknown')
                    })
            except Exception as e:
                print(f"Couldn't get model info from {endpoint}: {e}")
    return collected
358
+
359
async def process_image(server: CaptionServer, course: str, image: Dict) -> Dict:
    """Caption one image on one server.

    Returns the caption record on success and None when the server is already
    busy or the attempt fails. The server's busy flag is always released.
    """
    if server.busy:
        return None

    server.busy = True
    began = time.time()

    try:
        # Image URL layout: /images/COURSE_NAME_frames/IMAGE.png
        frames_dir = course if course.endswith("_frames") else f"{course}_frames"
        image_path = f"/images/{quote(frames_dir)}/{quote(image['filename'])}"
        response = await get_caption(server.url, urljoin(SOURCE_SERVER, image_path))

        elapsed = time.time() - began
        server.total_time += elapsed

        succeeded = response and response.get('success') and response.get('caption')
        if not succeeded:
            # The server answered without a usable caption, or did not answer.
            reason = response.get('error', 'Unknown error') if response else 'No response'
            print(f"Server {server.url} failed for {image['filename']}: {reason}")
            return None

        server.total_processed += 1
        record = {
            "image": image['filename'],
            "caption": response['caption'],
            "server": server.url,
            "processing_time": elapsed,
            "timestamp": datetime.now().isoformat()
        }
        print(f"Server {server.url} processed {image['filename']} in {elapsed:.2f}s ({server.fps:.2f} fps)")
        return record

    except asyncio.TimeoutError:
        print(f"Server {server.url} timeout for {image['filename']}")
        return None
    except Exception as e:
        print(f"Error processing {image['filename']} on {server.url}: {e}")
        return None

    finally:
        server.busy = False
402
+
403
async def upload_to_huggingface(course: str, metadata_list: List[Dict]):
    """Upload course captions to Hugging Face dataset.

    The caption records are pivoted into a column-oriented JSON document and
    uploaded as a single file named after the course.

    Args:
        course: Course name; '/' and ' ' are replaced with '_' in the file name.
        metadata_list: Caption records carrying the keys 'image', 'caption',
            'server', 'processing_time' and 'timestamp'.

    Returns:
        True when the upload succeeds, False when anything fails (the error
        is printed rather than raised).
    """
    try:
        print(f"πŸ“€ Uploading {len(metadata_list)} captions for {course} to Hugging Face...")

        # Prepare data for Hugging Face dataset (one list per column)
        dataset_data = {
            "course": [course] * len(metadata_list),
            "image_filename": [m["image"] for m in metadata_list],
            "caption": [m["caption"] for m in metadata_list],
            "processing_server": [m["server"] for m in metadata_list],
            "processing_time": [m["processing_time"] for m in metadata_list],
            "timestamp": [m["timestamp"] for m in metadata_list],
        }

        json_data = json.dumps(dataset_data, indent=2, ensure_ascii=False)
        filename = f"{course.replace('/', '_').replace(' ', '_')}_captions.json"

        def _upload() -> None:
            # repo_type must be given explicitly: the hub client otherwise
            # treats repo_id as a model repository. The token is passed per
            # call instead of logging in on every upload.
            huggingface_hub.upload_file(
                path_or_fileobj=json_data.encode("utf-8"),
                path_in_repo=filename,
                repo_id=HF_DATASET_ID,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message=f"Add captions for course {course} - {len(metadata_list)} images"
            )

        # The hub client is blocking; run it in a worker thread so the API
        # stays responsive while the upload is in flight.
        await asyncio.get_running_loop().run_in_executor(None, _upload)

        print(f"βœ… Successfully uploaded {len(metadata_list)} captions for {course} to {HF_DATASET_ID}/{filename}")
        return True

    except Exception as e:
        print(f"❌ Error uploading to Hugging Face: {e}")
        return False
449
+
450
async def process_course(course: str, servers: List[CaptionServer]):
    """Process all images in a course using available servers with retry logic.

    Images already recorded as processed or failed are skipped. Each remaining
    image is handed to a non-busy server; a failed attempt is re-queued until
    the image has used up its retries, after which it is recorded as failed.
    Captions are saved to disk after every batch that produced new results,
    and uploaded to Hugging Face once every listed image is processed or failed.

    Args:
        course: Course name as reported by the source server.
        servers: Caption servers that may be used for this course.
    """
    max_retries = 5  # retries allowed per image after its first failed attempt

    # Initialize course tracking
    if course not in processed_images:
        processed_images[course] = set()
    if course not in course_captions:
        course_captions[course] = load_captions_from_file(course)
        # Captions loaded from disk count as already processed.
        processed_images[course].update(cap['image'] for cap in course_captions[course])
    if course not in failed_images:
        failed_images[course] = set()

    # Get list of images
    images = await fetch_course_images(course)
    if not images:
        print(f"No images found for course {course}")
        return

    print(f"\nProcessing {len(images)} images for course {course}")

    # Images that still need processing, each with its own retry counter
    pending_images = {
        img['filename']: {'image': img, 'retries': 0, 'max_retries': max_retries}
        for img in images
        if img['filename'] not in processed_images[course]
        and img['filename'] not in failed_images[course]
    }

    if not pending_images:
        print(f"All images already processed or failed for course {course}")
        print(f"- Processed: {len(processed_images[course])}, Failed: {len(failed_images[course])}")

        # If course is completed, upload to Hugging Face
        if len(processed_images[course]) + len(failed_images[course]) >= len(images):
            if course_captions[course]:
                print(f"πŸ“€ Course {course} completed, uploading to Hugging Face...")
                await upload_to_huggingface(course, course_captions[course])
        return

    print(f"Images to process: {len(pending_images)} (already processed: {len(processed_images[course])}, failed: {len(failed_images[course])})")

    processed_in_this_run = 0

    while pending_images and is_processing:
        # Hand one pending image to each server that is not busy.
        tasks = []
        assigned_images = []

        for server in servers:
            if server.busy or not pending_images:
                continue
            filename = next(iter(pending_images))
            # Removed from pending while in flight; re-added below on failure.
            img_data = pending_images.pop(filename)
            tasks.append(process_image(server, course, img_data['image']))
            assigned_images.append((filename, img_data))

        if not tasks:
            # If no servers available, wait a bit
            await asyncio.sleep(0.1)
            continue

        # Process images in parallel across servers
        results = await asyncio.gather(*tasks)

        # Handle results and retry logic
        has_new_results = False
        for (filename, img_data), result in zip(assigned_images, results):
            if result:
                processed_images[course].add(filename)
                course_captions[course].append(result)
                has_new_results = True
                processed_in_this_run += 1
                print(f"βœ“ Successfully processed {filename}")
            elif img_data['retries'] < img_data['max_retries']:
                # Put back in pending for another attempt.
                img_data['retries'] += 1
                pending_images[filename] = img_data
                print(f"↻ Retry {img_data['retries']}/{img_data['max_retries']} for {filename}")
            else:
                # Max retries exceeded, mark as failed
                failed_images[course].add(filename)
                print(f"βœ— Failed to process {filename} after {img_data['max_retries']} retries")

        # Save progress after each batch with new results
        if has_new_results:
            save_captions_to_file(course, course_captions[course])

        # Show progress
        total = len(images)
        done = len(processed_images[course])
        failed_count = len(failed_images[course])
        pending_count = len(pending_images)
        progress_percent = (done / total * 100) if total > 0 else 0

        print(f"\rProgress: {done}/{total} ({progress_percent:.1f}%) - {pending_count} pending, {failed_count} failed, {processed_in_this_run} new", end="", flush=True)

        # Small delay to prevent overwhelming the servers
        await asyncio.sleep(0.5)

    # Final status for this course
    total = len(images)
    done = len(processed_images[course])
    failed_count = len(failed_images[course])

    if done + failed_count >= total:
        if failed_count > 0:
            print(f"\nβœ“ Course {course} completed with {failed_count} failed images")
        else:
            print(f"\nβœ“ Course {course} fully completed")

        # Upload to Hugging Face when course is completed
        if course_captions[course]:
            print(f"πŸ“€ Uploading {len(course_captions[course])} captions to Hugging Face...")
            success = await upload_to_huggingface(course, course_captions[course])
            if success:
                print(f"βœ… Successfully uploaded {course} to Hugging Face")
            else:
                print(f"❌ Failed to upload {course} to Hugging Face")
    else:
        print(f"\n→ Course {course} partially completed: {done}/{total} processed, {failed_count} failed")
581
+
582
async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = True):
    """Main processing loop with proper error handling.

    Checks which caption servers report a model matching MODEL_TYPE, then
    processes courses with those servers until is_processing is cleared, the
    task is cancelled, or (when continuous is False) one pass has finished.

    Args:
        specific_courses: Courses to process; when empty or None the list is
            fetched from the source server on every iteration.
        continuous: Keep polling for courses after each pass when True.
    """
    global is_processing

    # Index health-check results by URL. get_model_info() leaves out servers
    # whose health request failed, so pairing its list with `servers` by
    # position would attribute models to the wrong servers.
    model_info = await get_model_info()
    model_by_url = {info['url']: info.get('model') or 'unknown' for info in model_info}

    print("\nCaption Servers:")
    available_servers = []
    for server in servers:
        if server.url not in model_by_url:
            # No health answer; get_model_info() has already reported it.
            continue
        server.model = model_by_url[server.url]
        if MODEL_TYPE in server.model:
            available_servers.append(server)
            print(f"βœ“ {server.url} confirmed {MODEL_TYPE}")
        else:
            print(f"βœ— {server.url} using {server.model} - skipping (requires {MODEL_TYPE})")

    if not available_servers:
        print(f"\nError: No servers with {MODEL_TYPE} available!")
        is_processing = False
        return

    # Only servers running the required model take part in processing
    processing_servers = available_servers
    print(f"\nUsing {len(processing_servers)} servers with {MODEL_TYPE}")

    # Check for existing caption files and report
    existing_captions = list(CAPTIONS_DIR.glob("*_captions.json"))
    if existing_captions:
        print("\nFound existing caption files:")
        for cap_file in existing_captions:
            course = cap_file.stem.replace("_captions", "")
            try:
                with open(cap_file, 'r', encoding='utf-8') as f:
                    captions = json.load(f)
                print(f"- {course}: {len(captions)} captions")
            except Exception as e:
                print(f"- Error reading {cap_file.name}: {e}")
        print()

    start_time = time.time()
    iteration = 0

    while is_processing:
        try:
            iteration += 1
            print(f"\n{'='*50}")
            print(f"Processing Iteration {iteration}")
            print(f"{'='*50}")

            # Get available courses
            if specific_courses:
                courses = specific_courses
                print(f"Processing specific courses: {courses}")
            else:
                courses = await fetch_courses()
                print(f"Found {len(courses)} courses")

            if not courses:
                print("No courses found, waiting...")
                if not continuous:
                    break
                await asyncio.sleep(10)
                continue

            # Process each course with all available servers
            for course in courses:
                if not is_processing:
                    break

                print(f"\n--- Processing course: {course} ---")
                await process_course(course, processing_servers)

            # Show server stats
            print("\nServer Stats:")
            total_processed = sum(s.total_processed for s in processing_servers)
            elapsed = time.time() - start_time
            if elapsed > 0:
                print(f"Total images processed: {total_processed}")
                print(f"Overall speed: {total_processed/elapsed:.2f} fps")
            for s in processing_servers:
                print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps")
            print()

            if not continuous:
                print("One-time processing completed")
                break

            # Wait before next check
            print("Waiting for new courses...")
            await asyncio.sleep(5)

        except asyncio.CancelledError:
            print("Processing cancelled")
            break
        except Exception as e:
            # Keep the loop alive: report the error, back off, try again.
            print(f"Error in processing loop: {str(e)}")
            import traceback
            traceback.print_exc()
            await asyncio.sleep(10)

    is_processing = False
    print("Processing loop stopped")
684
+
685
+ # Startup event
686
@app.on_event("startup")
async def startup_event():
    """Build the server list, print the configuration, and optionally auto-start."""
    global is_processing, current_processing_task

    initialize_servers()

    token_state = 'βœ… Set' if HF_TOKEN else '❌ Missing'
    print("Caption Coordinator API started")
    print(f"Source server: {SOURCE_SERVER}")
    print(f"Caption servers: {len(CAPTION_SERVERS)}")
    print(f"Hugging Face dataset: {HF_DATASET_ID}")
    print(f"HF Token: {token_state}")

    if not auto_start_processing:
        return

    # Start processing automatically (like original main())
    print("Auto-starting processing loop...")
    is_processing = True
    current_processing_task = asyncio.create_task(processing_loop())
702
+
703
+
704
if __name__ == "__main__":
    # uvicorn only supports reload=True when the app is given as an import
    # string; with an app object it logs a warning and exits without serving.
    # Auto-reload is therefore left off for direct script execution.
    uvicorn.run(app, host="0.0.0.0", port=8000)