|
|
import os |
|
|
import json |
|
|
import time |
|
|
import asyncio |
|
|
import aiohttp |
|
|
from typing import Dict, List, Set, Optional |
|
|
from urllib.parse import quote, urljoin |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from datasets import Dataset, DatasetDict |
|
|
import huggingface_hub |
|
|
|
|
|
from fastapi import FastAPI, BackgroundTasks, HTTPException, status |
|
|
from fastapi.responses import JSONResponse |
|
|
from pydantic import BaseModel, Field |
|
|
import uvicorn |
|
|
|
|
|
|
|
|
# Local directory holding one "<encoded course>_captions.json" file per course.
CAPTIONS_DIR = Path("captions_data")
os.makedirs(CAPTIONS_DIR, exist_ok=True)
|
|
|
|
|
|
|
|
# Hugging Face credentials and destination dataset, read from the environment.
# A missing (or empty) HF_TOKEN aborts module import.
HF_TOKEN = os.environ.get("HF_TOKEN")
HF_DATASET_ID = os.environ.get("HF_DATASET_ID", "fred808/helium")

if not HF_TOKEN:
    raise ValueError("HF_TOKEN environment variable is required")
|
|
|
|
|
def get_caption_file_path(course: str) -> Path:
    """Return the JSON file path used to persist captions for *course*.

    The course name is percent-encoded with no "safe" characters, so the
    result is always a single path component inside CAPTIONS_DIR (a "/" in
    the course name becomes "%2F").
    """
    encoded = quote(course, safe='')
    return CAPTIONS_DIR.joinpath(encoded + "_captions.json")
|
|
|
|
|
def save_captions_to_file(course: str, captions: List[Dict]) -> None:
    """Persist *captions* for *course* as pretty-printed UTF-8 JSON.

    The data is first written to a sibling ``.tmp`` file and then moved over
    the target with ``os.replace``. A crash or error mid-write therefore never
    leaves a truncated JSON file behind. Such a file would be read back as an
    empty list by ``load_captions_from_file``, and the next save would
    overwrite all earlier progress.

    Best-effort: errors are printed, not raised.
    """
    tmp_path = None
    try:
        file_path = get_caption_file_path(course)
        # ".json.tmp" does not match the "*_captions.json" glob used elsewhere.
        tmp_path = file_path.with_name(file_path.name + ".tmp")
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(captions, f, indent=2, ensure_ascii=False)
        # Replace in one step so readers see either the old or the new file.
        os.replace(tmp_path, file_path)
        print(f"β Saved {len(captions)} captions for {course}")
    except Exception as e:
        print(f"Error saving captions for {course}: {e}")
        # Clean up a partially written temp file, if any.
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                pass
|
|
|
|
|
def load_captions_from_file(course: str) -> List[Dict]:
    """Read the previously saved captions for *course*.

    Returns the decoded JSON content of the course's caption file. Returns an
    empty list when the file does not exist, or when reading or decoding it
    fails (the error is printed, not raised).
    """
    try:
        path = get_caption_file_path(course)
        if not path.exists():
            return []
        with open(path, 'r', encoding='utf-8') as handle:
            loaded = json.load(handle)
        print(f"β Loaded {len(loaded)} existing captions for {course}")
        return loaded
    except Exception as e:
        print(f"Error loading captions for {course}: {e}")
        return []
|
|
|
|
|
|
|
|
# Server that lists courses/images and serves the image files themselves.
SOURCE_SERVER = "https://fred808-vssee.hf.space"

# Host names (without ".hf.space") of the Spaces exposing an /analyze route.
_CAPTION_HOSTS = (
    "fred808-pil-4-1",
    "fred808-pil-4-2",
    "fred808-pil-4-3",
    "fred1012-fred1012-gw0j2h",
    "fred1012-fred1012-wqs6c2",
    "fred1012-fred1012-oncray",
    "fred1012-fred1012-4goge7",
    "fred1012-fred1012-z0eh7m",
    "fred1012-fred1012-u95rte",
    "fred1012-fred1012-igje22",
    "fred1012-fred1012-ibkuf8",
    "fred1012-fred1012-nwqthy",
    "fred1012-fred1012-4ldqj4",
    "fred1012-fred1012-pivlzg",
    "fred1012-fred1012-ptlc5u",
    "fred1012-fred1012-u7lh57",
    "fred1012-fred1012-q8djv1",
    "fredalone-fredalone-ozugrp",
    "fredalone-fredalone-9brxj2",
    "fredalone-fredalone-p8vq9a",
    "fredalone-fredalone-vbli2y",
    "fredalone-fredalone-uggger",
    "fredalone-fredalone-nmi7e8",
    "fredalone-fredalone-d1f26d",
    "fredalone-fredalone-461jp2",
    "fredalone-fredalone-3enfg4",
    "fredalone-fredalone-dqdbpv",
    "fredalone-fredalone-ivtjua",
    "fredalone-fredalone-6bezt2",
    "fredalone-fredalone-e0wfnk",
    "fredalone-fredalone-zu2t7j",
    "fredalone-fredalone-dqtv1o",
    "fredalone-fredalone-wclyog",
    "fredalone-fredalone-t27vig",
    "fredalone-fredalone-gahbxh",
    "fredalone-fredalone-kw2po4",
    "fredalone-fredalone-8h285h",
)

# Full caption endpoint URLs, in a fixed order.
CAPTION_SERVERS = [f"https://{host}.hf.space/analyze" for host in _CAPTION_HOSTS]

# Model every caption server must report before it is used.
MODEL_TYPE = "Florence-2-large"
|
|
|
|
|
|
|
|
class CourseInfo(BaseModel):
    # Course identifier; same key the source server's /courses entries carry.
    course_folder: str = Field(...)
|
|
|
|
|
class ImageInfo(BaseModel):
    # Image file name as listed for a course by the source server.
    filename: str = Field(...)
|
|
|
|
|
class CaptionRequest(BaseModel):
    # URL of the image to caption.
    image_url: str = Field(...)
    # Model name forwarded to the caption server.
    model_choice: str = Field(default=MODEL_TYPE)
|
|
|
|
|
class CaptionResponse(BaseModel):
    # Whether captioning succeeded.
    success: bool = Field(...)
    # Caption text on success; None otherwise.
    caption: Optional[str] = Field(default=None)
    # Error description on failure; None otherwise.
    error: Optional[str] = Field(default=None)
|
|
|
|
|
class ServerStatus(BaseModel):
    # Snapshot of one CaptionServer as reported by GET /servers/status.
    url: str = Field(...)
    model: str = Field(...)
    busy: bool = Field(...)
    total_processed: int = Field(...)
    total_time: float = Field(...)
    fps: float = Field(...)
|
|
|
|
|
class ProcessingStatus(BaseModel):
    # Per-course progress summary.
    course: str = Field(...)
    total_images: int = Field(...)
    processed_images: int = Field(...)
    progress_percent: float = Field(...)
    status: str = Field(...)
|
|
|
|
|
class StartProcessingRequest(BaseModel):
    # Restrict processing to these courses; None means "all courses from the source server".
    courses: Optional[List[str]] = Field(default=None)
    # Keep looping after a pass (True) or stop after one pass (False).
    continuous: bool = Field(default=True)
|
|
|
|
|
|
|
|
# HTTP API of the caption coordinator.
app = FastAPI(
    version="1.0.0",
    title="Caption Coordinator API",
    description="Distributed caption processing coordinator",
)
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# In-process state shared by the API endpoints and the processing loop.
# ---------------------------------------------------------------------------

# course -> filenames captioned successfully
processed_images: Dict[str, Set[str]] = {}
# course -> caption records (the records saved to the course's JSON file)
course_captions: Dict[str, List[Dict]] = {}
# course -> filenames that exhausted their retries
failed_images: Dict[str, Set[str]] = {}

# CaptionServer objects, populated by initialize_servers()
servers: List["CaptionServer"] = []

# Processing-loop control
is_processing: bool = False
current_processing_task: Optional[asyncio.Task] = None
auto_start_processing: bool = True
|
|
|
|
|
class CaptionServer:
    """Coordinator-side bookkeeping for one remote caption endpoint."""

    def __init__(self, url):
        # Endpoint URL that caption requests are sent to.
        self.url = url
        # True while a request is in flight on this server.
        self.busy = False
        # Model name reported by the server; "unknown" until queried.
        self.model = "unknown"
        # Count of successful captions and accumulated request seconds.
        self.total_processed = 0
        self.total_time = 0

    @property
    def fps(self):
        """Successful images per second of accumulated request time; 0 before any time is recorded."""
        if not (self.total_time > 0):
            return 0
        return self.total_processed / self.total_time
|
|
|
|
|
|
|
|
def initialize_servers():
    """Rebuild the global ``servers`` list: one CaptionServer per URL in CAPTION_SERVERS."""
    global servers
    servers = list(map(CaptionServer, CAPTION_SERVERS))
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    """Identify the service and report whether processing is active."""
    payload = {"message": "Caption Coordinator API", "status": "running"}
    payload["auto_processing"] = auto_start_processing
    payload["is_processing"] = is_processing
    return payload
|
|
|
|
|
@app.get("/health")
async def health():
    """Liveness probe with a count of idle vs. total caption servers."""
    idle_count = sum(1 for srv in servers if not srv.busy)
    return dict(
        status="healthy",
        servers_available=idle_count,
        total_servers=len(servers),
        is_processing=is_processing,
        auto_processing=auto_start_processing,
    )
|
|
|
|
|
@app.get("/courses")
async def get_courses():
    """Return the course folder names advertised by the source server.

    Delegates to ``fetch_courses`` so the endpoint and the processing loop
    share one implementation of the source-server query instead of two
    copies that can drift apart.

    Raises:
        HTTPException: 500 if fetching or parsing the course listing fails.
    """
    try:
        return await fetch_courses()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching courses: {e}")
|
|
|
|
|
@app.get("/courses/{course}/images")
async def get_course_images(course: str):
    """Return the image entries the source server lists for *course*.

    Delegates to ``fetch_course_images`` so the endpoint and the processing
    loop build the "<course>_frames" URL and parse the response the same way.

    Raises:
        HTTPException: 500 if fetching or parsing the image listing fails.
    """
    try:
        return await fetch_course_images(course)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching images: {e}")
|
|
|
|
|
@app.get("/servers/status")
async def get_servers_status():
    """Report model, busy flag and throughput counters for every caption server."""
    return [
        ServerStatus(
            url=srv.url,
            model=srv.model,
            busy=srv.busy,
            total_processed=srv.total_processed,
            total_time=srv.total_time,
            fps=srv.fps,
        )
        for srv in servers
    ]
|
|
|
|
|
@app.get("/processing/status")
async def get_processing_status():
    """Report per-course progress for every course the processing loop has touched.

    ``total_images`` comes from the source server's current image listing,
    queried concurrently for all courses. The set of processed filenames
    cannot be used as the total, because it only counts finished work and
    would make every course look 100% complete. If a listing cannot be
    fetched, the number of images already accounted for (processed + failed)
    is used as a lower-bound fallback.
    """
    # Snapshot the keys: the awaits below yield to the event loop, during
    # which the processing loop or delete_captions may mutate these dicts.
    courses = list(processed_images)

    async def _total_images(course: str) -> int:
        # Best-effort total image count for one course; never raises.
        known = len(processed_images.get(course, ())) + len(failed_images.get(course, ()))
        try:
            listing = await fetch_course_images(course)
            return max(len(listing), known)
        except Exception:
            return known

    totals = await asyncio.gather(*(_total_images(c) for c in courses))

    status_info = {}
    for course, total in zip(courses, totals):
        processed = len(course_captions.get(course, []))
        failed = len(failed_images.get(course, set()))
        status_info[course] = {
            "course": course,
            "total_images": total,
            "processed_images": processed,
            "failed_images": failed,
            "progress_percent": (processed / total * 100) if total > 0 else 0,
            "status": "completed" if processed + failed >= total else "processing",
        }
    return status_info
|
|
|
|
|
@app.post("/processing/start")
async def start_processing(request: StartProcessingRequest = StartProcessingRequest()):
    """Launch ``processing_loop`` as a background task.

    Raises:
        HTTPException: 400 if processing is already running.
    """
    global is_processing, current_processing_task

    if is_processing:
        raise HTTPException(status_code=400, detail="Processing is already running")

    is_processing = True
    loop_coro = processing_loop(request.courses, request.continuous)
    current_processing_task = asyncio.create_task(loop_coro)

    return dict(
        message="Processing started",
        continuous=request.continuous,
        specific_courses=request.courses,
    )
|
|
|
|
|
@app.post("/processing/stop")
async def stop_processing():
    """Cancel the background processing task and wait for it to finish.

    Raises:
        HTTPException: 400 if processing is not running.
    """
    global is_processing, current_processing_task

    if not is_processing:
        raise HTTPException(status_code=400, detail="Processing is not running")

    # Clearing the flag also ends the loops that poll `is_processing`.
    is_processing = False
    task = current_processing_task
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        current_processing_task = None

    return {"message": "Processing stopped"}
|
|
|
|
|
@app.get("/captions/{course}")
async def get_captions(course: str):
    """Return the captions currently saved on disk for *course*."""
    stored = load_captions_from_file(course)
    return dict(course=course, total_captions=len(stored), captions=stored)
|
|
|
|
|
@app.delete("/captions/{course}")
async def delete_captions(course: str):
    """Delete the caption file for *course* and drop its in-memory state.

    Raises:
        HTTPException: 404 when no caption file exists for the course;
            500 when the file exists but cannot be removed.
    """
    file_path = get_caption_file_path(course)

    # Raised outside any broad ``except`` so it reaches the client as a 404
    # instead of being re-wrapped into a 500.
    if not file_path.exists():
        raise HTTPException(status_code=404, detail=f"No captions found for {course}")

    try:
        file_path.unlink()
    except FileNotFoundError:
        # Removed concurrently between the existence check and unlink.
        raise HTTPException(status_code=404, detail=f"No captions found for {course}")
    except OSError as e:
        raise HTTPException(status_code=500, detail=f"Error deleting captions: {e}")

    # Forget everything cached in memory for this course.
    for state in (processed_images, course_captions, failed_images):
        state.pop(course, None)

    return {"message": f"Captions for {course} deleted"}
|
|
|
|
|
|
|
|
async def fetch_courses() -> List[str]:
    """Return the course folder names listed by the source server's /courses route.

    Expects a JSON object whose ``courses`` value is a list of objects that
    carry a ``course_folder`` string. Entries that are not objects, or whose
    ``course_folder`` is missing, empty or not a string, are skipped. Without
    this, one malformed entry raises ``KeyError`` on every processing-loop
    iteration and no course is ever processed. Returns ``[]`` when the
    payload does not have the expected shape.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{SOURCE_SERVER}/courses") as resp:
            data = await resp.json()

    if not isinstance(data, dict):
        return []
    entries = data.get('courses')
    if not isinstance(entries, list):
        return []

    folders = []
    for entry in entries:
        folder = entry.get('course_folder') if isinstance(entry, dict) else None
        if isinstance(folder, str) and folder:
            folders.append(folder)
    return folders
|
|
|
|
|
async def fetch_course_images(course: str) -> List[Dict]:
    """Return the image entries the source server lists for *course*.

    The request path uses "<course>_frames"; the suffix is appended unless
    *course* already ends with it. Only entries that are objects with a
    non-empty string ``filename`` are returned. Every consumer indexes
    ``image['filename']``, so one malformed entry would otherwise raise in the
    middle of a batch. Returns ``[]`` when the payload has an unexpected shape.
    """
    course_frames = course if course.endswith("_frames") else f"{course}_frames"
    url = f"{SOURCE_SERVER}/images/{quote(course_frames)}"

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.json()

    if not isinstance(data, dict):
        return []
    images = data.get('images')
    if not isinstance(images, list):
        return []

    return [
        img for img in images
        if isinstance(img, dict) and isinstance(img.get('filename'), str) and img['filename']
    ]
|
|
|
|
|
async def get_caption(server: str, image_url: str, timeout: float = 30) -> Optional[Dict]:
    """Ask one caption server to caption *image_url*.

    Sends a GET to *server* with ``image_url`` and ``model_choice`` query
    parameters and returns the decoded JSON body.

    Args:
        server: URL of the caption server's analyze endpoint.
        image_url: URL of the image the caption server should fetch.
        timeout: Total request timeout in seconds (default 30, as before).

    Returns:
        The decoded JSON response, or ``None`` on any error (network failure,
        timeout, non-JSON body). Errors are printed, not raised.
    """
    params = {'image_url': image_url, 'model_choice': MODEL_TYPE}
    # ClientTimeout is aiohttp's documented timeout type; bare numbers are a
    # legacy form.
    client_timeout = aiohttp.ClientTimeout(total=timeout)
    try:
        async with aiohttp.ClientSession(timeout=client_timeout) as session:
            async with session.get(server, params=params) as resp:
                return await resp.json()
    except Exception as e:
        print(f"Error from {server}: {e}")
        return None
|
|
|
|
|
async def get_model_info():
    """Query every caption server's /health route for the model it serves.

    Returns a list with exactly one ``{'url': ..., 'model': ...}`` entry per
    URL in CAPTION_SERVERS, in the same order. The model is read from the
    response's ``model_choice`` key. A server that cannot be reached or
    returns an unusable payload gets model ``'unknown'`` instead of being
    omitted. Callers pair this list with the server list, and a missing
    entry would shift every later server's model onto the wrong server.

    The probes run concurrently, so the total wait is bounded by the slowest
    server rather than the sum of all of them.
    """

    async def _probe(session, server: str) -> Dict:
        # The health route replaces the trailing "/analyze" segment.
        health_url = server.rsplit('/analyze', 1)[0] + '/health'
        try:
            async with session.get(health_url) as resp:
                info = await resp.json()
            model = info.get('model_choice', 'unknown')
        except Exception as e:
            print(f"Couldn't get model info from {server}: {e}")
            model = 'unknown'
        return {'url': server, 'model': model}

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(_probe(session, s) for s in CAPTION_SERVERS))
    return list(results)
|
|
|
|
|
async def process_image(server: CaptionServer, course: str, image: Dict) -> Dict:
    """Caption a single image on *server* and build its metadata record.

    Returns ``None`` immediately if the server is already busy, and ``None``
    on any failure. On success, returns a dict with the image filename,
    caption, server URL, elapsed seconds and an ISO timestamp.

    The server's ``busy`` flag is held for the duration of the call. Its
    ``total_time`` grows by the elapsed time whether or not captioning
    succeeded; ``total_processed`` only grows on success.
    """
    if server.busy:
        return None

    server.busy = True
    started = time.time()

    try:
        frames_dir = course if course.endswith("_frames") else f"{course}_frames"
        image_path = f"/images/{quote(frames_dir)}/{quote(image['filename'])}"
        image_url = urljoin(SOURCE_SERVER, image_path)
        result = await get_caption(server.url, image_url)

        elapsed = time.time() - started
        server.total_time += elapsed

        if not (result and result.get('success') and result.get('caption')):
            reason = result.get('error', 'Unknown error') if result else 'No response'
            print(f"Server {server.url} failed for {image['filename']}: {reason}")
            return None

        server.total_processed += 1
        record = {
            "image": image['filename'],
            "caption": result['caption'],
            "server": server.url,
            "processing_time": elapsed,
            "timestamp": datetime.now().isoformat(),
        }
        print(f"Server {server.url} processed {image['filename']} in {elapsed:.2f}s ({server.fps:.2f} fps)")
        return record

    except asyncio.TimeoutError:
        print(f"Server {server.url} timeout for {image['filename']}")
        return None
    except Exception as e:
        print(f"Error processing {image['filename']} on {server.url}: {e}")
        return None
    finally:
        server.busy = False
|
|
|
|
|
async def upload_to_huggingface(course: str, metadata_list: List[Dict]):
    """Push all captions of *course* to the Hugging Face dataset HF_DATASET_ID.

    Each course becomes its own dataset config, named after the course with
    "/" and " " replaced by "_", and is written to the ``train`` split.

    ``push_to_hub`` is blocking network I/O, so it runs in the default
    thread-pool executor instead of on the event loop. Otherwise every API
    endpoint and in-flight caption request would stall during the push. The
    token is passed straight to ``push_to_hub`` rather than through a global
    ``huggingface_hub.login`` on every upload.

    Args:
        course: Course name (also used to derive the config name).
        metadata_list: Caption records with "image", "caption", "server",
            "processing_time" and "timestamp" keys.

    Returns:
        True on success; False if building or pushing the dataset failed
        (the error is printed).
    """
    try:
        print(f"π€ Uploading {len(metadata_list)} captions for {course} to Hugging Face...")

        dataset = Dataset.from_dict({
            "course": [course] * len(metadata_list),
            "image_filename": [m["image"] for m in metadata_list],
            "caption": [m["caption"] for m in metadata_list],
            "processing_server": [m["server"] for m in metadata_list],
            "processing_time": [m["processing_time"] for m in metadata_list],
            "timestamp": [m["timestamp"] for m in metadata_list],
        })
        config_name = course.replace("/", "_").replace(" ", "_")
        commit_message = f"Add captions for course {course} - {len(metadata_list)} images"

        def _push():
            # Runs in a worker thread.
            dataset.push_to_hub(
                HF_DATASET_ID,
                config_name=config_name,
                split="train",
                commit_message=commit_message,
                token=HF_TOKEN,
            )

        await asyncio.get_running_loop().run_in_executor(None, _push)

        print(f"✅ Successfully uploaded {len(metadata_list)} captions for {course} to {HF_DATASET_ID}")
        return True

    except Exception as e:
        print(f"β Error uploading to Hugging Face: {e}")
        return False
|
|
|
|
|
# Extra attempts an image gets after its first failure before it is marked failed.
_MAX_IMAGE_RETRIES = 5

# course -> signature of the captions most recently uploaded successfully.
# The continuous processing loop revisits completed courses on every pass;
# this prevents re-pushing an unchanged dataset each time.
_uploaded_caption_signatures: Dict[str, tuple] = {}


def _caption_signature(captions: List[Dict]) -> tuple:
    """Cheap fingerprint of a caption list: its length and the last record's timestamp."""
    if not captions:
        return (0, None)
    last = captions[-1]
    return (len(captions), last.get('timestamp') if isinstance(last, dict) else None)


def _init_course_state(course: str) -> None:
    """Ensure in-memory processed/captions/failed entries exist for *course*, seeded from its caption file."""
    if course not in processed_images:
        processed_images[course] = set()
    if course not in course_captions:
        course_captions[course] = load_captions_from_file(course)
    # Every stored caption counts as processed (set.add is idempotent).
    for cap in course_captions[course]:
        processed_images[course].add(cap['image'])
    if course not in failed_images:
        failed_images[course] = set()


async def _upload_if_changed(course: str, announcement: str) -> None:
    """Upload *course*'s captions to Hugging Face unless this exact set was already uploaded."""
    captions = course_captions.get(course) or []
    if not captions:
        return
    signature = _caption_signature(captions)
    if _uploaded_caption_signatures.get(course) == signature:
        print(f"Captions for {course} unchanged since last upload, skipping Hugging Face push")
        return
    print(announcement)
    if await upload_to_huggingface(course, captions):
        _uploaded_caption_signatures[course] = signature
        print(f"✅ Successfully uploaded {course} to Hugging Face")
    else:
        print(f"β Failed to upload {course} to Hugging Face")


async def process_course(course: str, servers: List[CaptionServer]):
    """Caption every not-yet-handled image of *course* using *servers*.

    Images already captioned, or already marked failed, are skipped. Each
    round hands one pending image to every idle server and waits for the
    whole round. A failed image is re-queued up to ``_MAX_IMAGE_RETRIES``
    times before it is marked failed. Captions are saved to disk after every
    round that produced new ones. The loop stops early when the global
    ``is_processing`` flag is cleared.

    Once every listed image is processed or failed, the captions are pushed
    to Hugging Face, but only if they changed since the last successful
    upload in this process.
    """
    _init_course_state(course)

    images = await fetch_course_images(course)
    if not images:
        print(f"No images found for course {course}")
        return

    print(f"\nProcessing {len(images)} images for course {course}")

    # filename -> {'image': entry, 'retries': attempts already retried}
    pending_images = {}
    for img in images:
        filename = img['filename']
        if filename not in processed_images[course] and filename not in failed_images[course]:
            pending_images[filename] = {'image': img, 'retries': 0}

    if not pending_images:
        print(f"All images already processed or failed for course {course}")
        print(f"- Processed: {len(processed_images[course])}, Failed: {len(failed_images[course])}")
        if len(processed_images[course]) + len(failed_images[course]) >= len(images):
            await _upload_if_changed(course, f"π€ Course {course} completed, uploading to Hugging Face...")
        return

    print(f"Images to process: {len(pending_images)} (already processed: {len(processed_images[course])}, failed: {len(failed_images[course])})")

    processed_in_this_run = 0

    while pending_images and is_processing:
        # Assign the oldest pending image to each idle server.
        tasks = []
        assigned_images = []
        for server in servers:
            if server.busy or not pending_images:
                continue
            filename, img_data = next(iter(pending_images.items()))
            tasks.append(process_image(server, course, img_data['image']))
            assigned_images.append((filename, img_data['image'], img_data['retries']))
            del pending_images[filename]

        if not tasks:
            # Every server is busy; wait briefly before trying again.
            await asyncio.sleep(0.1)
            continue

        results = await asyncio.gather(*tasks)

        has_new_results = False
        for (filename, img, current_retries), result in zip(assigned_images, results):
            if result:
                processed_images[course].add(filename)
                course_captions[course].append(result)
                has_new_results = True
                processed_in_this_run += 1
                print(f"β Successfully processed {filename}")
            elif current_retries < _MAX_IMAGE_RETRIES:
                # Re-insert at the end of the queue with one more retry used.
                pending_images[filename] = {'image': img, 'retries': current_retries + 1}
                print(f"β» Retry {current_retries + 1}/{_MAX_IMAGE_RETRIES} for {filename}")
            else:
                failed_images[course].add(filename)
                print(f"β Failed to process {filename} after {_MAX_IMAGE_RETRIES} retries")

        if has_new_results:
            save_captions_to_file(course, course_captions[course])

        total = len(images)
        done = len(processed_images[course])
        failed_count = len(failed_images[course])
        pending_count = len(pending_images)
        progress_percent = (done / total * 100) if total > 0 else 0
        print(f"\rProgress: {done}/{total} ({progress_percent:.1f}%) - {pending_count} pending, {failed_count} failed, {processed_in_this_run} new", end="", flush=True)

        await asyncio.sleep(0.5)

    total = len(images)
    done = len(processed_images[course])
    failed_count = len(failed_images[course])

    if done + failed_count >= total:
        if failed_count > 0:
            print(f"\nβ Course {course} completed with {failed_count} failed images")
        else:
            print(f"\nβ Course {course} fully completed")
        await _upload_if_changed(
            course, f"π€ Uploading {len(course_captions[course])} captions to Hugging Face..."
        )
    else:
        print(f"\nβ Course {course} partially completed: {done}/{total} processed, {failed_count} failed")
|
|
|
|
|
def _select_caption_servers(model_info: List[Dict]) -> List[CaptionServer]:
    """Record each server's reported model and return the servers whose model matches MODEL_TYPE.

    Model info is matched to servers by URL, not by list position, so a
    missing or reordered entry cannot attribute one server's model to another.
    """
    model_by_url = {}
    for info in model_info:
        if isinstance(info, dict) and 'url' in info:
            model_by_url[info['url']] = info.get('model', 'unknown')

    print("\nCaption Servers:")
    selected = []
    for server in servers:
        model = model_by_url.get(server.url, 'unknown')
        # Coerce to str: a null/numeric model would break the `in` test below
        # and the `str` field of ServerStatus.
        server.model = model if isinstance(model, str) else str(model)
        if MODEL_TYPE in server.model:
            selected.append(server)
            print(f"β {server.url} confirmed {MODEL_TYPE}")
        else:
            print(f"β {server.url} using {server.model} - skipping (requires {MODEL_TYPE})")
    return selected


def _print_existing_caption_files() -> None:
    """Log caption files already on disk with their caption counts (informational only)."""
    existing_captions = list(CAPTIONS_DIR.glob("*_captions.json"))
    if not existing_captions:
        return
    print("\nFound existing caption files:")
    for cap_file in existing_captions:
        course = cap_file.stem.replace("_captions", "")
        try:
            with open(cap_file, 'r', encoding='utf-8') as f:
                captions = json.load(f)
            print(f"- {course}: {len(captions)} captions")
        except Exception as e:
            print(f"- Error reading {cap_file.name}: {e}")
    print()


async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = True):
    """Run caption processing until stopped, or for a single pass if *continuous* is False.

    Each pass processes *specific_courses* if given, otherwise every course
    the source server lists. Only servers reporting MODEL_TYPE are used.
    Errors inside a pass are printed and retried after 10 seconds.

    The global ``is_processing`` flag is always cleared on exit, including
    on cancellation or when no usable server is found, so a new run can be
    started afterwards.
    """
    global is_processing

    try:
        processing_servers = _select_caption_servers(await get_model_info())
        if not processing_servers:
            print(f"\nError: No servers with {MODEL_TYPE} available!")
            return

        print(f"\nUsing {len(processing_servers)} servers with {MODEL_TYPE}")

        _print_existing_caption_files()

        start_time = time.time()
        iteration = 0

        while is_processing:
            try:
                iteration += 1
                print(f"\n{'='*50}")
                print(f"Processing Iteration {iteration}")
                print(f"{'='*50}")

                if specific_courses:
                    courses = specific_courses
                    print(f"Processing specific courses: {courses}")
                else:
                    courses = await fetch_courses()
                    print(f"Found {len(courses)} courses")

                if not courses:
                    print("No courses found, waiting...")
                    if not continuous:
                        break
                    await asyncio.sleep(10)
                    continue

                for course in courses:
                    if not is_processing:
                        break
                    print(f"\n--- Processing course: {course} ---")
                    await process_course(course, processing_servers)

                print("\nServer Stats:")
                total_processed = sum(s.total_processed for s in processing_servers)
                elapsed = time.time() - start_time
                if elapsed > 0:
                    print(f"Total images processed: {total_processed}")
                    print(f"Overall speed: {total_processed/elapsed:.2f} fps")
                for s in processing_servers:
                    print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps")
                print()

                if not continuous:
                    print("One-time processing completed")
                    break

                print("Waiting for new courses...")
                await asyncio.sleep(5)

            except asyncio.CancelledError:
                print("Processing cancelled")
                break
            except Exception as e:
                print(f"Error in processing loop: {str(e)}")
                import traceback
                traceback.print_exc()
                await asyncio.sleep(10)
    finally:
        # Reset on every exit path, including cancellation before the loop.
        is_processing = False

    print("Processing loop stopped")
|
|
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Build the caption-server list, log the configuration and, if enabled, start the processing loop."""
    # Declared up front: both names are rebound below.
    global is_processing, current_processing_task

    initialize_servers()
    print("Caption Coordinator API started")
    print(f"Source server: {SOURCE_SERVER}")
    print(f"Caption servers: {len(CAPTION_SERVERS)}")
    print(f"Hugging Face dataset: {HF_DATASET_ID}")
    # The check mark was previously split across two lines by a mis-encoded
    # byte, leaving an unterminated string literal.
    print(f"HF Token: {'✅ Set' if HF_TOKEN else 'β Missing'}")

    if auto_start_processing:
        print("Auto-starting processing loop...")
        is_processing = True
        current_processing_task = asyncio.create_task(processing_loop())
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # uvicorn supports reload only when the app is given as an import string
    # ("module:attr"). With an app object and reload=True it logs an error and
    # exits immediately, so run without reload when launched as a script.
    uvicorn.run(app, host="0.0.0.0", port=8000)