|
|
import os |
|
|
import json |
|
|
import time |
|
|
import asyncio |
|
|
import aiohttp |
|
|
import zipfile |
|
|
from typing import Dict, List, Set, Optional |
|
|
from urllib.parse import quote |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
import io |
|
|
|
|
|
from fastapi import FastAPI, BackgroundTasks, HTTPException, status |
|
|
from pydantic import BaseModel, Field |
|
|
from huggingface_hub import HfApi, hf_hub_download |
|
|
import uvicorn |
|
|
|
|
|
|
|
|
|
|
|
FLOW_ID = os.getenv("FLOW_ID", "flow_default") |
|
|
FLOW_PORT = int(os.getenv("FLOW_PORT", 8001)) |
|
|
|
|
|
|
|
|
MANAGER_URL = os.getenv("MANAGER_URL", "https://fred808-fcord.hf.space") |
|
|
MANAGER_COMPLETE_TASK_URL = f"{MANAGER_URL}/task/complete" |
|
|
|
|
|
|
|
|
HF_TOKEN = os.getenv("HF_TOKEN", "") |
|
|
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3") |
|
|
HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium") |
|
|
|
|
|
|
|
|
CAPTION_SERVERS = [ |
|
|
"https://fred808-pil-4-1.hf.space/analyze", |
|
|
"https://fred808-pil-4-2.hf.space/analyze", |
|
|
"https://fred808-pil-4-3.hf.space/analyze", |
|
|
"https://fred1012-fred1012-gw0j2h.hf.space/analyze", |
|
|
"https://fred1012-fred1012-wqs6c2.hf.space/analyze", |
|
|
"https://fred1012-fred1012-oncray.hf.space/analyze", |
|
|
"https://fred1012-fred1012-4goge7.hf.space/analyze", |
|
|
"https://fred1012-fred1012-z0eh7m.hf.space/analyze", |
|
|
"https://fred1012-fred1012-u95rte.hf.space/analyze", |
|
|
"https://fred1012-fred1012-igje22.hf.space/analyze", |
|
|
"https://fred1012-fred1012-ibkuf8.hf.space/analyze", |
|
|
"https://fred1012-fred1012-nwqthy.hf.space/analyze", |
|
|
"https://fred1012-fred1012-4ldqj4.hf.space/analyze", |
|
|
"https://fred1012-fred1012-pivlzg.hf.space/analyze", |
|
|
"https://fred1012-fred1012-ptlc5u.hf.space/analyze", |
|
|
"https://fred1012-fred1012-u7lh57.hf.space/analyze", |
|
|
"https://fred1012-fred1012-q8djv1.hf.space/analyze", |
|
|
"https://fredalone-fredalone-ozugrp.hf.space/analyze", |
|
|
"https://fredalone-fredalone-9brxj2.hf.space/analyze", |
|
|
"https://fredalone-fredalone-p8vq9a.hf.space/analyze", |
|
|
"https://fredalone-fredalone-vbli2y.hf.space/analyze", |
|
|
"https://fredalone-fredalone-uggger.hf.space/analyze", |
|
|
"https://fredalone-fredalone-nmi7e8.hf.space/analyze", |
|
|
"https://fredalone-fredalone-d1f26d.hf.space/analyze", |
|
|
"https://fredalone-fredalone-461jp2.hf.space/analyze", |
|
|
"https://fredalone-fredalone-3enfg4.hf.space/analyze", |
|
|
"https://fredalone-fredalone-dqdbpv.hf.space/analyze", |
|
|
"https://fredalone-fredalone-ivtjua.hf.space/analyze", |
|
|
"https://fredalone-fredalone-6bezt2.hf.space/analyze", |
|
|
"https://fredalone-fredalone-e0wfnk.hf.space/analyze", |
|
|
"https://fredalone-fredalone-zu2t7j.hf.space/analyze", |
|
|
"https://fredalone-fredalone-dqtv1o.hf.space/analyze", |
|
|
"https://fredalone-fredalone-wclyog.hf.space/analyze", |
|
|
"https://fredalone-fredalone-t27vig.hf.space/analyze", |
|
|
"https://fredalone-fredalone-gahbxh.hf.space/analyze", |
|
|
"https://fredalone-fredalone-kw2po4.hf.space/analyze", |
|
|
"https://fredalone-fredalone-8h285h.hf.space/analyze" |
|
|
] |
|
|
MODEL_TYPE = "Florence-2-large" |
|
|
|
|
|
|
|
|
TEMP_DIR = Path(f"temp_images_{FLOW_ID}") |
|
|
TEMP_DIR.mkdir(exist_ok=True) |
|
|
|
|
|
|
|
|
class ProcessCourseRequest(BaseModel): |
|
|
course_name: Optional[str] = None |
|
|
|
|
|
class CaptionServer: |
|
|
def __init__(self, url): |
|
|
self.url = url |
|
|
self.busy = False |
|
|
self.total_processed = 0 |
|
|
self.total_time = 0 |
|
|
self.model = MODEL_TYPE |
|
|
|
|
|
@property |
|
|
def fps(self): |
|
|
return self.total_processed / self.total_time if self.total_time > 0 else 0 |
|
|
|
|
|
|
|
|
servers = [CaptionServer(url) for url in CAPTION_SERVERS] |
|
|
server_index = 0 |
|
|
|
|
|
|
|
|
|
|
|
async def get_available_server(timeout: float = 300.0) -> CaptionServer: |
|
|
"""Round-robin selection of an available caption server.""" |
|
|
global server_index |
|
|
start_time = time.time() |
|
|
while True: |
|
|
|
|
|
for _ in range(len(servers)): |
|
|
server = servers[server_index] |
|
|
server_index = (server_index + 1) % len(servers) |
|
|
if not server.busy: |
|
|
return server |
|
|
|
|
|
|
|
|
await asyncio.sleep(0.5) |
|
|
|
|
|
|
|
|
if time.time() - start_time > timeout: |
|
|
raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.") |
|
|
|
|
|
async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]: |
|
|
"""Sends a single image to a caption server for processing.""" |
|
|
|
|
|
MAX_RETRIES = 3 |
|
|
for attempt in range(MAX_RETRIES): |
|
|
server = None |
|
|
try: |
|
|
|
|
|
server = await get_available_server() |
|
|
server.busy = True |
|
|
start_time = time.time() |
|
|
|
|
|
|
|
|
if attempt == 0: |
|
|
print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...") |
|
|
|
|
|
|
|
|
form_data = aiohttp.FormData() |
|
|
form_data.add_field('file', |
|
|
image_path.open('rb'), |
|
|
filename=image_path.name, |
|
|
content_type='image/jpeg') |
|
|
form_data.add_field('model_choice', MODEL_TYPE) |
|
|
|
|
|
|
|
|
async with aiohttp.ClientSession() as session: |
|
|
|
|
|
async with session.post(server.url, data=form_data, timeout=600) as resp: |
|
|
if resp.status == 200: |
|
|
result = await resp.json() |
|
|
caption = result.get("caption") |
|
|
|
|
|
if caption: |
|
|
|
|
|
progress_tracker['completed'] += 1 |
|
|
if progress_tracker['completed'] % 50 == 0: |
|
|
print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.") |
|
|
|
|
|
|
|
|
if progress_tracker['completed'] % 50 != 0: |
|
|
print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}") |
|
|
|
|
|
return { |
|
|
"course": course_name, |
|
|
"image_path": image_path.name, |
|
|
"caption": caption, |
|
|
"timestamp": datetime.now().isoformat() |
|
|
} |
|
|
else: |
|
|
print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...") |
|
|
continue |
|
|
else: |
|
|
error_text = await resp.text() |
|
|
print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...") |
|
|
continue |
|
|
|
|
|
except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e: |
|
|
print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...") |
|
|
continue |
|
|
except Exception as e: |
|
|
print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...") |
|
|
continue |
|
|
finally: |
|
|
if server: |
|
|
end_time = time.time() |
|
|
server.busy = False |
|
|
server.total_processed += 1 |
|
|
server.total_time += (end_time - start_time) |
|
|
|
|
|
print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.") |
|
|
return None |
|
|
|
|
|
async def download_and_extract_zip(course_name: str, processed_files: Set[str]) -> Optional[tuple[Path, str, str]]: |
|
|
"""Downloads the zip file for the course and extracts its contents.""" |
|
|
print(f"[{FLOW_ID}] Looking for files starting with '{course_name}' in frames/ directory...") |
|
|
|
|
|
try: |
|
|
api = HfApi(token=HF_TOKEN) |
|
|
|
|
|
|
|
|
repo_files = api.list_repo_files( |
|
|
repo_id=HF_DATASET_ID, |
|
|
repo_type="dataset" |
|
|
) |
|
|
|
|
|
|
|
|
matching_files = [ |
|
|
f for f in repo_files |
|
|
if f.startswith(f"frames/{course_name}") and f.endswith('.zip') |
|
|
] |
|
|
|
|
|
if not matching_files: |
|
|
print(f"[{FLOW_ID}] No zip files found starting with '{course_name}' in frames/ directory.") |
|
|
return None, None |
|
|
|
|
|
|
|
|
unprocessed_files = [f for f in matching_files if f not in processed_files] |
|
|
|
|
|
if not unprocessed_files: |
|
|
print(f"[{FLOW_ID}] No new zip files found for '{course_name}'.") |
|
|
return None, None, None |
|
|
|
|
|
repo_file_full_path = unprocessed_files[0] |
|
|
|
|
|
|
|
|
zip_full_name = Path(repo_file_full_path).name |
|
|
print(f"[{FLOW_ID}] Found new matching file: {repo_file_full_path}. Full name: {zip_full_name}") |
|
|
|
|
|
|
|
|
zip_path = hf_hub_download( |
|
|
repo_id=HF_DATASET_ID, |
|
|
filename=repo_file_full_path, |
|
|
repo_type="dataset", |
|
|
token=HF_TOKEN, |
|
|
) |
|
|
|
|
|
print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...") |
|
|
|
|
|
|
|
|
extract_dir = TEMP_DIR / course_name |
|
|
extract_dir.mkdir(exist_ok=True) |
|
|
|
|
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref: |
|
|
zip_ref.extractall(extract_dir) |
|
|
|
|
|
print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.") |
|
|
|
|
|
|
|
|
return extract_dir, zip_full_name, repo_file_full_path |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[{FLOW_ID}] Error downloading or extracting zip for {course_name}: {e}") |
|
|
return None, None, None |
|
|
|
|
|
async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool: |
|
|
"""Uploads the final captions JSON file to the output dataset. |
|
|
|
|
|
The user requested the output JSON file to be named after the full zip file name. |
|
|
""" |
|
|
|
|
|
caption_filename = Path(zip_full_name).with_suffix('.json').name |
|
|
|
|
|
try: |
|
|
print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...") |
|
|
|
|
|
|
|
|
json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8') |
|
|
|
|
|
api = HfApi(token=HF_TOKEN) |
|
|
api.upload_file( |
|
|
path_or_fileobj=io.BytesIO(json_content), |
|
|
path_in_repo=caption_filename, |
|
|
repo_id=HF_OUTPUT_DATASET_ID, |
|
|
repo_type="dataset", |
|
|
commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}" |
|
|
) |
|
|
|
|
|
print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}") |
|
|
return False |
|
|
|
|
|
async def process_course_task(course_name: str): |
|
|
"""Main task to process a single course, looping until all files are processed.""" |
|
|
print(f"[{FLOW_ID}] Starting continuous processing for course: {course_name}") |
|
|
|
|
|
processed_files = set() |
|
|
all_processed_files_log = [] |
|
|
global_success = True |
|
|
|
|
|
|
|
|
while True: |
|
|
extract_dir = None |
|
|
zip_full_name = None |
|
|
repo_file_full_path = None |
|
|
|
|
|
try: |
|
|
|
|
|
download_result = await download_and_extract_zip(course_name, processed_files) |
|
|
|
|
|
if download_result is None or download_result[0] is None: |
|
|
|
|
|
if download_result is not None and download_result[0] is None and download_result[1] is None: |
|
|
print(f"[{FLOW_ID}] No new files found for {course_name}. Exiting loop.") |
|
|
break |
|
|
else: |
|
|
|
|
|
raise Exception("Failed to download or extract zip file.") |
|
|
|
|
|
extract_dir, zip_full_name, repo_file_full_path = download_result |
|
|
|
|
|
|
|
|
processed_files.add(repo_file_full_path) |
|
|
all_processed_files_log.append(repo_file_full_path) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']] |
|
|
print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.") |
|
|
|
|
|
current_file_success = False |
|
|
|
|
|
if not image_paths: |
|
|
print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.") |
|
|
current_file_success = True |
|
|
else: |
|
|
|
|
|
progress_tracker = { |
|
|
'total': len(image_paths), |
|
|
'completed': 0 |
|
|
} |
|
|
print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...") |
|
|
|
|
|
|
|
|
semaphore = asyncio.Semaphore(len(servers)) |
|
|
|
|
|
async def limited_send_image_for_captioning(image_path, course_name, progress_tracker): |
|
|
async with semaphore: |
|
|
return await send_image_for_captioning(image_path, course_name, progress_tracker) |
|
|
|
|
|
|
|
|
caption_tasks = [] |
|
|
for image_path in image_paths: |
|
|
caption_tasks.append(limited_send_image_for_captioning(image_path, course_name, progress_tracker)) |
|
|
|
|
|
|
|
|
results = await asyncio.gather(*caption_tasks) |
|
|
|
|
|
|
|
|
all_captions = [r for r in results if r is not None] |
|
|
|
|
|
|
|
|
if len(all_captions) == len(image_paths): |
|
|
print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.") |
|
|
current_file_success = True |
|
|
else: |
|
|
print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.") |
|
|
current_file_success = False |
|
|
|
|
|
|
|
|
if all_captions and zip_full_name: |
|
|
|
|
|
print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...") |
|
|
if await upload_captions_to_hf(zip_full_name, all_captions): |
|
|
print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.") |
|
|
|
|
|
if not current_file_success: |
|
|
global_success = False |
|
|
else: |
|
|
print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.") |
|
|
current_file_success = False |
|
|
global_success = False |
|
|
else: |
|
|
print(f"[{FLOW_ID}] No captions generated or zip_full_name is missing. Skipping upload for {zip_full_name}.") |
|
|
current_file_success = False |
|
|
global_success = False |
|
|
|
|
|
|
|
|
|
|
|
except Exception as e: |
|
|
error_message = str(e) |
|
|
print(f"[{FLOW_ID}] Critical error in process_course_task for {course_name}: {error_message}") |
|
|
global_success = False |
|
|
|
|
|
finally: |
|
|
|
|
|
if extract_dir and extract_dir.exists(): |
|
|
print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.") |
|
|
import shutil |
|
|
shutil.rmtree(extract_dir, ignore_errors=True) |
|
|
|
|
|
|
|
|
if download_result is None and extract_dir is None: |
|
|
break |
|
|
|
|
|
|
|
|
print(f"[{FLOW_ID}] All processing loops complete for {course_name}.") |
|
|
print(f"[{FLOW_ID}] Total files processed: {len(all_processed_files_log)}") |
|
|
print(f"[{FLOW_ID}] List of processed files: {all_processed_files_log}") |
|
|
|
|
|
|
|
|
final_error_message = error_message if not global_success else None |
|
|
|
|
|
|
|
|
|
|
|
return global_success |
|
|
|
|
|
async def report_completion(course_name: str, success: bool, error_message: Optional[str] = None): |
|
|
"""Reports the task result back to the Manager Server.""" |
|
|
print(f"[{FLOW_ID}] Reporting completion for {course_name} (Success: {success})...") |
|
|
|
|
|
payload = { |
|
|
"flow_id": FLOW_ID, |
|
|
"course_name": course_name, |
|
|
"success": success, |
|
|
"error_message": error_message |
|
|
} |
|
|
|
|
|
try: |
|
|
async with aiohttp.ClientSession() as session: |
|
|
async with session.post(MANAGER_COMPLETE_TASK_URL, json=payload) as resp: |
|
|
if resp.status != 200: |
|
|
print(f"[{FLOW_ID}] ERROR: Manager reported non-200 status: {resp.status} - {await resp.text()}") |
|
|
else: |
|
|
print(f"[{FLOW_ID}] Successfully reported completion to Manager.") |
|
|
|
|
|
except aiohttp.ClientError as e: |
|
|
print(f"[{FLOW_ID}] CRITICAL ERROR: Could not connect to Manager at {MANAGER_COMPLETE_TASK_URL}. Task completion not reported. Error: {e}") |
|
|
except Exception as e: |
|
|
print(f"[{FLOW_ID}] Unexpected error during reporting: {e}") |
|
|
|
|
|
|
|
|
|
|
|
app = FastAPI( |
|
|
title=f"Flow Server {FLOW_ID} API", |
|
|
description="Fetches, extracts, and captions images for a given course.", |
|
|
version="1.0.0" |
|
|
) |
|
|
|
|
|
@app.on_event("startup") |
|
|
async def startup_event(): |
|
|
print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}. Manager URL: {MANAGER_URL}") |
|
|
|
|
|
@app.get("/") |
|
|
async def root(): |
|
|
return { |
|
|
"flow_id": FLOW_ID, |
|
|
"status": "ready", |
|
|
"manager_url": MANAGER_URL, |
|
|
"total_servers": len(servers), |
|
|
"busy_servers": sum(1 for s in servers if s.busy), |
|
|
} |
|
|
|
|
|
@app.post("/process_course") |
|
|
async def process_course(request: ProcessCourseRequest, background_tasks: BackgroundTasks): |
|
|
""" |
|
|
Receives a course name from the Manager and starts processing in the background. |
|
|
""" |
|
|
course_name = request.course_name |
|
|
|
|
|
if not course_name: |
|
|
print(f"[{FLOW_ID}] Received empty course name. Stopping processing loop.") |
|
|
return {"status": "stopped", "message": "No more courses to process."} |
|
|
|
|
|
print(f"[{FLOW_ID}] Received course: {course_name}. Starting background task.") |
|
|
|
|
|
|
|
|
background_tasks.add_task(process_course_task, course_name) |
|
|
|
|
|
return {"status": "processing", "course_name": course_name, "message": "Processing started in background."} |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT) |