|
|
import os
|
|
|
import json
|
|
|
import time
|
|
|
import asyncio
|
|
|
import aiohttp
|
|
|
import zipfile
|
|
|
import shutil
|
|
|
from typing import Dict, List, Set, Optional, Tuple, Any
|
|
|
from urllib.parse import quote
|
|
|
from datetime import datetime
|
|
|
from pathlib import Path
|
|
|
import io
|
|
|
|
|
|
from fastapi import FastAPI, BackgroundTasks, HTTPException, status
|
|
|
from pydantic import BaseModel, Field
|
|
|
from huggingface_hub import HfApi, hf_hub_download
|
|
|
|
|
|
|
|
|
# --- Runtime configuration (each value can be overridden via the environment) ---

# Fallback start position used by the API and the startup auto-run.
AUTO_START_INDEX = 0

# Identity and listening port of this worker.
FLOW_ID = os.environ.get("FLOW_ID", "flow_default")
FLOW_PORT = int(os.environ.get("FLOW_PORT", "8001"))

# Hugging Face credentials and repositories.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
HF_DATASET_ID = os.environ.get("HF_DATASET_ID", "Fred808/BG3")  # zips are read from here
HF_OUTPUT_DATASET_ID = os.environ.get("HF_OUTPUT_DATASET_ID", "fred808/helium")  # captions + shared state go here

# Local progress bookkeeping and the shared state file kept in the output repo.
PROGRESS_FILE = Path("processing_progress.json")
HF_STATE_FILE = "processing_state_captions.json"
LOCAL_STATE_FOLDER = Path(".state")
LOCAL_STATE_FOLDER.mkdir(exist_ok=True)

# Only repo files under this prefix (and ending in .zip) are processed.
ZIP_FILE_PREFIX = "frames_zips/"
|
|
|
|
|
|
|
|
|
# Hugging Face Space slugs of the caption workers. Each slug serves the
# endpoint https://<slug>.hf.space/analyze; order is preserved.
_CAPTION_SPACE_SLUGS = (
    "fred808-pil-4-1",
    "fred808-pil-4-2",
    "fred808-pil-4-3",

    "fred1012-fred1012-gw0j2h",
    "fred1012-fred1012-wqs6c2",
    "fred1012-fred1012-oncray",
    "fred1012-fred1012-4goge7",
    "fred1012-fred1012-z0eh7m",
    "fred1012-fred1012-u95rte",
    "fred1012-fred1012-igje22",
    "fred1012-fred1012-ibkuf8",
    "fred1012-fred1012-nwqthy",
    "fred1012-fred1012-4ldqj4",
    "fred1012-fred1012-pivlzg",
    "fred1012-fred1012-ptlc5u",
    "fred1012-fred1012-u7lh57",
    "fred1012-fred1012-q8djv1",

    "fredalone-fredalone-ozugrp",
    "fredalone-fredalone-9brxj2",
    "fredalone-fredalone-p8vq9a",
    "fredalone-fredalone-vbli2y",
    "fredalone-fredalone-uggger",
    "fredalone-fredalone-nmi7e8",
    "fredalone-fredalone-d1f26d",
    "fredalone-fredalone-461jp2",
    "fredalone-fredalone-3enfg4",
    "fredalone-fredalone-dqdbpv",
    "fredalone-fredalone-ivtjua",
    "fredalone-fredalone-6bezt2",
    "fredalone-fredalone-e0wfnk",
    "fredalone-fredalone-zu2t7j",
    "fredalone-fredalone-dqtv1o",
    "fredalone-fredalone-wclyog",
    "fredalone-fredalone-t27vig",
    "fredalone-fredalone-gahbxh",
    "fredalone-fredalone-kw2po4",
    "fredalone-fredalone-8h285h",
)

# Full endpoint URLs that images are POSTed to.
CAPTION_SERVERS = [f"https://{slug}.hf.space/analyze" for slug in _CAPTION_SPACE_SLUGS]

# Model name sent as the ``model_choice`` form field with every request.
MODEL_TYPE = "Florence-2-large"
|
|
|
|
|
|
|
|
|
# Per-flow scratch directory; each zip is extracted into a subfolder of it.
TEMP_DIR = Path("temp_images_" + FLOW_ID)
TEMP_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
|
|
|
class ProcessStartRequest(BaseModel):
    # Request body for POST /start_processing.
    # start_index is a 1-indexed position in the zip file list. The default is
    # clamped to satisfy ge=1 itself: pydantic does not validate default
    # values, so a default below 1 would bypass the constraint and reach
    # process_dataset_task as a non-positive index (0 turns into list index -1,
    # i.e. processing would begin with the last file).
    start_index: int = Field(max(AUTO_START_INDEX, 1), ge=1, description="The index number of the zip file to start processing from (1-indexed).")
|
|
|
|
|
|
class CaptionServer:
    """Mutable bookkeeping record for one remote caption endpoint.

    Attributes:
        url: Endpoint URL that images are POSTed to.
        busy: Set by the captioning code while a request is in flight here.
        total_processed: Request counter maintained by the captioning code.
        total_time: Accumulated seconds spent in requests on this server.
        model: Model name (MODEL_TYPE at construction time).
    """

    def __init__(self, url):
        self.url = url
        self.busy = False
        self.total_processed = 0
        self.total_time = 0
        self.model = MODEL_TYPE

    @property
    def fps(self):
        """Counted requests per second of accumulated time (0 while no time is recorded)."""
        if self.total_time > 0:
            return self.total_processed / self.total_time
        return 0
|
|
|
|
|
|
|
|
|
# One bookkeeping object per configured endpoint, in CAPTION_SERVERS order.
servers = list(map(CaptionServer, CAPTION_SERVERS))

# Round-robin cursor into ``servers``; advanced by get_available_server().
server_index = 0
|
|
|
|
|
|
|
|
|
|
|
|
def load_progress() -> Dict:
    """Load the local processing progress from PROGRESS_FILE.

    The returned dict always contains ``last_processed_index`` (default 0),
    ``processed_files`` (default ``{}``) and ``file_list`` (default ``[]``).
    Values stored in the file override these defaults, and keys the file
    lacks are filled in, so callers can index them directly.

    A missing file yields the defaults. So does an unreadable, corrupted or
    non-object file, after a warning is printed.
    """
    progress: Dict[str, Any] = {
        "last_processed_index": 0,
        "processed_files": {},
        "file_list": [],
    }
    if not PROGRESS_FILE.exists():
        return progress

    try:
        # save_progress writes ASCII-only JSON, so UTF-8 decoding is safe.
        with PROGRESS_FILE.open('r', encoding='utf-8') as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
        return progress
    except OSError as e:
        print(f"[{FLOW_ID}] WARNING: Could not read progress file {PROGRESS_FILE}: {e}. Starting fresh.")
        return progress

    if not isinstance(data, dict):
        print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
        return progress

    progress.update(data)
    return progress
|
|
|
|
|
|
def save_progress(progress_data: Dict):
    """Persist ``progress_data`` to PROGRESS_FILE as JSON indented by 4.

    The JSON goes to a sibling ``<name>.tmp`` file first, which is then moved
    over PROGRESS_FILE with os.replace. A crash or serialization error
    mid-write therefore cannot leave a truncated progress file behind;
    load_progress would treat such a file as corrupted and restart from
    scratch. Errors are printed, not raised, so a failed save never
    interrupts processing.
    """
    tmp_path = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    try:
        with tmp_path.open('w') as f:
            json.dump(progress_data, f, indent=4)
        os.replace(tmp_path, PROGRESS_FILE)
    except Exception as e:
        print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
        # Do not leave a half-written temp file lying around.
        try:
            tmp_path.unlink()
        except OSError:
            pass
|
|
|
|
|
|
def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load the shared processing state from ``file_path``, migrating old layouts.

    The loaded object is patched before it is returned:
    a missing or non-dict ``file_states`` becomes ``{}``, and a missing
    ``next_download_index`` becomes 0.

    Args:
        file_path: Path of the JSON state file.
        default_value: Returned unchanged when the file does not exist, cannot
            be read, is not valid JSON, or does not hold a JSON object.

    Returns:
        The migrated state dict, or ``default_value``.
    """
    if not os.path.exists(file_path):
        return default_value

    try:
        # save_json_state writes ASCII-only JSON, so UTF-8 decoding is safe.
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
        return default_value
    except OSError as e:
        print(f"[{FLOW_ID}] WARNING: Could not read state file {file_path}: {e}")
        return default_value

    # A JSON list/string/number cannot be migrated; previously this raised
    # a TypeError on the item assignment below.
    if not isinstance(data, dict):
        print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
        return default_value

    if not isinstance(data.get("file_states"), dict):
        print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
        data["file_states"] = {}

    if "next_download_index" not in data:
        data["next_download_index"] = 0

    return data
|
|
|
|
|
|
def save_json_state(file_path: str, data: Dict[str, Any]):
    """Overwrite ``file_path`` with ``data`` serialized as JSON indented by 2."""
    with open(file_path, mode="w") as handle:
        json.dump(obj=data, fp=handle, indent=2)
|
|
|
|
|
|
async def download_hf_state() -> Dict[str, Any]:
    """Fetch the shared state file from HF_OUTPUT_DATASET_ID.

    The file is downloaded into LOCAL_STATE_FOLDER and parsed with
    load_json_state. If the repo does not contain HF_STATE_FILE, or anything
    raises, ``{"next_download_index": 0, "file_states": {}}`` is returned.

    NOTE(review): that fallback cannot be told apart from a genuinely empty
    state. A caller that later passes it to upload_hf_state replaces the
    shared file, which would erase every other worker's entries.
    """
    target = LOCAL_STATE_FOLDER / HF_STATE_FILE
    fresh_state = {"next_download_index": 0, "file_states": {}}

    try:
        repo_files = HfApi(token=HF_TOKEN).list_repo_files(
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
        )

        if HF_STATE_FILE in repo_files:
            hf_hub_download(
                repo_id=HF_OUTPUT_DATASET_ID,
                filename=HF_STATE_FILE,
                repo_type="dataset",
                local_dir=LOCAL_STATE_FOLDER,
                local_dir_use_symlinks=False,
                token=HF_TOKEN,
            )
            print(f"[{FLOW_ID}] Successfully downloaded state file.")
            return load_json_state(str(target), fresh_state)

        print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
        return fresh_state

    except Exception as e:
        print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
        return fresh_state
|
|
|
|
|
|
async def upload_hf_state(state: Dict[str, Any]) -> bool:
    """Save ``state`` locally, then replace HF_STATE_FILE in the output repo with it.

    Returns:
        True on success. False if saving, building the commit message
        (which needs ``state['next_download_index']``) or uploading raised;
        the error is printed and never propagated.
    """
    state_path = LOCAL_STATE_FOLDER / HF_STATE_FILE

    try:
        save_json_state(str(state_path), state)

        message = f"Update caption processing state: next_index={state['next_download_index']}"
        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=str(state_path),
            path_in_repo=HF_STATE_FILE,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=message,
        )

        print(f"[{FLOW_ID}] Successfully uploaded state file.")
        return True

    except Exception as e:
        print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
        return False
|
|
|
|
|
|
async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
    """Claim ``zip_filename`` by marking it "processing" and uploading the state.

    Args:
        zip_filename: Key of the zip in ``state["file_states"]``.
        state: Shared state dict; mutated in place.

    Returns:
        True if the updated state was uploaded. On upload failure the file's
        previous status (e.g. "failed") is restored, or removed if it had
        none, and False is returned.

    NOTE: this is not an atomic lock. upload_hf_state replaces the whole
    remote file, so two workers claiming the same zip at the same time can
    both succeed.
    """
    print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")

    file_states = state["file_states"]
    had_previous = zip_filename in file_states
    previous_status = file_states.get(zip_filename)

    file_states[zip_filename] = "processing"

    if await upload_hf_state(state):
        print(f"[{FLOW_ID}] ✅ Successfully locked file: {zip_filename}")
        return True

    print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
    # Undo only our own change. Deleting the entry outright would also
    # discard a status the file had before this attempt.
    if had_previous:
        file_states[zip_filename] = previous_status
    else:
        file_states.pop(zip_filename, None)
    return False
|
|
|
|
|
|
async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Record ``zip_filename`` as finished and publish the state.

    Sets ``state["file_states"][zip_filename] = "processed"`` and
    ``state["next_download_index"] = next_index`` in place, then uploads.

    Returns:
        True if the upload succeeded, otherwise False. The local ``state``
        keeps the new values either way.
    """
    print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")

    state["file_states"][zip_filename] = "processed"
    state["next_download_index"] = next_index

    uploaded = await upload_hf_state(state)
    if not uploaded:
        print(f"[{FLOW_ID}] ❌ Failed to update state for: {zip_filename}")
        return False

    print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {zip_filename}")
    return True
|
|
|
|
|
|
|
|
|
|
|
|
async def get_zip_file_list(progress_data: Dict) -> List[str]:
    """Return the sorted list of zip paths to process.

    A non-empty ``progress_data['file_list']`` is returned as-is. Otherwise
    HF_DATASET_ID is listed and filtered to files that start with
    ZIP_FILE_PREFIX and end with ``.zip``. The result is stored back into
    ``progress_data`` and persisted with save_progress.

    Returns:
        The file list, or ``[]`` if listing failed or no zip file was found.
    """
    cached = progress_data['file_list']
    if cached:
        print(f"[{FLOW_ID}] Using cached file list with {len(cached)} files.")
        return cached

    print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
    try:
        repo_files = HfApi(token=HF_TOKEN).list_repo_files(
            repo_id=HF_DATASET_ID,
            repo_type="dataset",
        )

        zip_files = sorted(
            name
            for name in repo_files
            if name.startswith(ZIP_FILE_PREFIX) and name.endswith('.zip')
        )
        if not zip_files:
            raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")

        print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")

        # Cache the listing so later runs skip the remote call.
        progress_data['file_list'] = zip_files
        save_progress(progress_data)
        return zip_files

    except Exception as e:
        print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
        return []
|
|
|
|
|
|
def _remove_downloaded_zip(zip_path: str) -> None:
    """Delete a file returned by hf_hub_download, including what it links to.

    When hf_hub_download serves from the Hugging Face cache, the returned
    path may be a symlink into the cache's blob store. Removing only the link
    would leave the (large) zip on disk for every processed file. Errors are
    reported rather than raised, because the extraction has already finished
    when this runs.
    """
    try:
        target = os.path.realpath(zip_path)
        os.remove(zip_path)
        # Still present only if zip_path was a link to a different file.
        if os.path.exists(target):
            os.remove(target)
    except OSError as e:
        print(f"[{FLOW_ID}] WARNING: Could not delete downloaded zip {zip_path}: {e}")


def _download_and_extract_sync(file_index: int, repo_file_full_path: str) -> Optional[Path]:
    """Blocking body of download_and_extract_zip_by_index (runs in a worker thread)."""
    zip_full_name = Path(repo_file_full_path).name
    course_name = zip_full_name.replace('.zip', '')

    print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")

    extract_dir = TEMP_DIR / course_name
    zip_path = None
    try:
        zip_path = hf_hub_download(
            repo_id=HF_DATASET_ID,
            filename=repo_file_full_path,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")

        # Start from an empty directory so leftovers from an earlier run of
        # the same zip are not mixed in.
        if extract_dir.exists():
            shutil.rmtree(extract_dir)
        extract_dir.mkdir(exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
        return extract_dir

    except Exception as e:
        print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
        # The caller only cleans up directories it receives, so remove any
        # partial extraction here.
        shutil.rmtree(extract_dir, ignore_errors=True)
        return None

    finally:
        # Free the disk space of the zip on success and on failure alike.
        if zip_path is not None:
            _remove_downloaded_zip(zip_path)


async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
    """Download zip ``repo_file_full_path`` from HF_DATASET_ID and extract it.

    The blocking download and extraction run in the default thread-pool
    executor, so the event loop (and the FastAPI endpoints) stay responsive
    meanwhile.

    Args:
        file_index: 1-indexed position of the zip; only used for logging.
        repo_file_full_path: Path of the zip inside the dataset repo.

    Returns:
        ``TEMP_DIR / <zip name without '.zip'>`` holding the extracted files,
        or None on any failure (no partial extraction is left behind). Once
        downloaded, the zip itself is deleted in both cases.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, _download_and_extract_sync, file_index, repo_file_full_path
    )
|
|
|
|
|
|
async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
    """Publish ``captions`` as ``<zip stem>.json`` at the root of HF_OUTPUT_DATASET_ID.

    The JSON is indented by 2 and keeps non-ASCII characters (UTF-8 encoded).

    Returns:
        True on success. On any error the message is printed and False is
        returned.
    """
    target_name = Path(zip_full_name).with_suffix('.json').name

    try:
        print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {target_name} to {HF_OUTPUT_DATASET_ID}...")

        encoded = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
        HfApi(token=HF_TOKEN).upload_file(
            path_or_fileobj=io.BytesIO(encoded),
            path_in_repo=target_name,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}",
        )

        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
        return True

    except Exception as e:
        print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
        return False
|
|
|
|
|
|
|
|
|
|
|
|
async def get_available_server(timeout: float = 300.0) -> CaptionServer:
    """Return the next idle caption server in round-robin order.

    Each pass inspects every server once, starting at the shared
    ``server_index`` cursor and advancing it past each inspected server.
    If all servers are busy, it sleeps 0.5 s and tries again.

    Raises:
        TimeoutError: no idle server was found within ``timeout`` seconds.
            The deadline is checked after each 0.5 s sleep.
    """
    global server_index
    began = time.time()

    while True:
        for _ in servers:
            candidate = servers[server_index]
            server_index = (server_index + 1) % len(servers)
            if not candidate.busy:
                return candidate

        # Every server is busy: yield to the event loop before rescanning.
        await asyncio.sleep(0.5)

        if time.time() - began > timeout:
            raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
|
|
|
|
|
|
async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
    """Caption one image using the caption-server pool, retrying on failure.

    Up to MAX_RETRIES attempts are made. Each attempt does the following:
    takes an idle server from get_available_server(), marks it busy for the
    duration of the request, and POSTs the image plus MODEL_TYPE as
    multipart form data.

    Args:
        image_path: Local image file (.jpg/.jpeg/.png).
        course_name: Copied verbatim into the returned record.
        progress_tracker: Shared dict with int entries 'completed' and
            'total'. 'completed' is incremented once per captioned image.

    Returns:
        ``{"course", "image_path", "caption", "timestamp"}`` on success, or
        None if every attempt failed.
    """
    MAX_RETRIES = 3
    # Label the upload with the image's real type instead of always JPEG.
    content_type = 'image/png' if image_path.suffix.lower() == '.png' else 'image/jpeg'
    request_timeout = aiohttp.ClientTimeout(total=600)

    for attempt in range(MAX_RETRIES):
        server = None
        succeeded = False
        start_time = time.time()
        try:
            server = await get_available_server()
            server.busy = True
            start_time = time.time()

            if attempt == 0:
                print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")

            # Read the bytes eagerly. Passing an open file object leaked the
            # handle whenever the request failed before aiohttp consumed it.
            form_data = aiohttp.FormData()
            form_data.add_field('file',
                                image_path.read_bytes(),
                                filename=image_path.name,
                                content_type=content_type)
            form_data.add_field('model_choice', MODEL_TYPE)

            async with aiohttp.ClientSession() as session:
                async with session.post(server.url, data=form_data, timeout=request_timeout) as resp:
                    if resp.status == 200:
                        result = await resp.json()
                        caption = result.get("caption")

                        if caption:
                            succeeded = True
                            progress_tracker['completed'] += 1
                            # Every 50th image logs a summary instead of the per-image line.
                            if progress_tracker['completed'] % 50 == 0:
                                print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
                            else:
                                print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")

                            return {
                                "course": course_name,
                                "image_path": image_path.name,
                                "caption": caption,
                                "timestamp": datetime.now().isoformat()
                            }
                        else:
                            print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
                            continue
                    else:
                        error_text = await resp.text()
                        print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
                        continue

        except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
            print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
            continue
        except Exception as e:
            print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
            continue
        finally:
            if server:
                server.busy = False
                server.total_time += (time.time() - start_time)
                # Count only captioned images so ``fps`` reflects real
                # throughput rather than attempts.
                if succeeded:
                    server.total_processed += 1

    print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
    return None
|
|
|
|
|
|
async def _refresh_shared_state(local_state: Dict[str, Any], owned_files: Set[str]) -> Dict[str, Any]:
    """Return ``local_state`` merged with a freshly downloaded shared state.

    The shared state is a single JSON file that each worker overwrites as a
    whole. Uploading a copy fetched long ago would therefore erase locks and
    completions that other workers wrote in the meantime. Merge rules:

    * files in ``owned_files`` (locked by this run) keep their local status;
    * other files take the remote status whenever the remote has one;
    * files known only locally are kept. download_hf_state() returns an
      empty default on any error, so this prevents a failed download from
      leading to a wiped shared state.
    """
    remote_state = await download_hf_state()

    merged_files = dict(remote_state.get("file_states", {}))
    for name, status in local_state.get("file_states", {}).items():
        if name in owned_files or name not in merged_files:
            merged_files[name] = status

    indices = [s.get("next_download_index", 0) for s in (local_state, remote_state)]

    merged = {**local_state, **remote_state}
    merged["file_states"] = merged_files
    merged["next_download_index"] = max(
        (i for i in indices if isinstance(i, int)), default=0
    )
    return merged


def _skip_reason(file_state: Optional[str]) -> Optional[str]:
    """Return the log suffix for a zip that must be skipped, or None if it may be claimed."""
    if file_state == "processed":
        return "Already processed in global state."
    if file_state == "processing":
        return "Currently being processed by another worker."
    return None


async def _caption_images(image_paths: List[Path], course_name: str, zip_full_name: str) -> List[Dict]:
    """Caption ``image_paths`` concurrently and return only the successful records."""
    progress_tracker = {
        'total': len(image_paths),
        'completed': 0
    }
    print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")

    # At most one in-flight request per server. Clamped to 1 because a
    # Semaphore(0) would block forever when no servers are configured.
    semaphore = asyncio.Semaphore(max(len(servers), 1))

    async def limited_send(image_path: Path) -> Optional[Dict]:
        async with semaphore:
            return await send_image_for_captioning(image_path, course_name, progress_tracker)

    results = await asyncio.gather(*(limited_send(p) for p in image_paths))
    return [r for r in results if r is not None]


async def process_dataset_task(start_index: int):
    """Caption every zip of the file list, sequentially, from ``start_index``.

    For each zip (1-indexed position ``file_index``):

    1. skip it if the shared state marks it "processed" or "processing";
    2. otherwise refresh the shared state, check again, and lock it;
    3. download and extract it, caption all .jpg/.jpeg/.png images
       concurrently, and upload the captions (partial results too);
    4. refresh the shared state again, then mark the zip "processed" (also
       recording local progress) or "failed".

    Args:
        start_index: 1-indexed position to start from. Values below 1 are
            treated as 1.

    Returns:
        False if the file list is empty or any zip failed, otherwise True
        (also True when ``start_index`` is past the end of the list).

    NOTE: locking is best-effort. Refreshing before every state write narrows
    the window in which two workers can claim the same zip, but without an
    atomic compare-and-swap on the state file it cannot close it. A zip left
    "processing" by a crashed worker is never retried.
    """
    progress = load_progress()
    current_state = await download_hf_state()
    file_list = await get_zip_file_list(progress)

    if not file_list:
        print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
        return False

    # A non-positive index would become a negative list index and start at
    # the end of the list.
    start_index = max(start_index, 1)

    if start_index > len(file_list):
        print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
        return True

    print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")

    global_success = True
    # Zips locked by this run; their local status wins when merging state.
    owned_files: Set[str] = set()

    for file_index, repo_file_full_path in enumerate(file_list[start_index - 1:], start=start_index):
        zip_full_name = Path(repo_file_full_path).name
        course_name = zip_full_name.replace('.zip', '')

        # Cheap check against the cached state first. Only when the zip looks
        # free, re-check against the latest shared state before claiming it.
        reason = _skip_reason(current_state["file_states"].get(zip_full_name))
        if reason is None:
            current_state = await _refresh_shared_state(current_state, owned_files)
            reason = _skip_reason(current_state["file_states"].get(zip_full_name))
        if reason is not None:
            print(f"[{FLOW_ID}] Skipping {zip_full_name}: {reason}")
            continue

        if not await lock_file_for_processing(zip_full_name, current_state):
            print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
            continue
        owned_files.add(zip_full_name)

        extract_dir = None
        current_file_success = False

        try:
            extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)
            if not extract_dir:
                raise Exception("Failed to download or extract zip file.")

            image_paths = [
                p for p in extract_dir.glob("**/*")
                if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']
            ]
            print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")

            if not image_paths:
                print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
                current_file_success = True
            else:
                all_captions = await _caption_images(image_paths, course_name, zip_full_name)

                if len(all_captions) == len(image_paths):
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
                    current_file_success = True
                else:
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions. Marking as partial failure.")
                    current_file_success = False

                # Partial results are uploaded as well; the zip is still
                # marked failed below so that it will be retried.
                if all_captions:
                    print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
                    if await upload_captions_to_hf(zip_full_name, all_captions):
                        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
                    else:
                        print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
                        current_file_success = False
                else:
                    print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
                    current_file_success = False

        except Exception as e:
            print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
            current_file_success = False
            global_success = False

        finally:
            if extract_dir and extract_dir.exists():
                print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
                shutil.rmtree(extract_dir, ignore_errors=True)

            # Processing a zip can take a long time. Pick up other workers'
            # changes before overwriting the shared state file.
            current_state = await _refresh_shared_state(current_state, owned_files)

            if current_file_success:
                progress['last_processed_index'] = file_index
                progress.setdefault('processed_files', {})[str(file_index)] = repo_file_full_path
                save_progress(progress)

                if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
                    print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
                else:
                    print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
            else:
                current_state["file_states"][zip_full_name] = "failed"
                await upload_hf_state(current_state)
                print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
                global_success = False

    print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
    return global_success
|
|
|
|
|
|
|
|
|
|
|
|
# The FastAPI application; FLOW_ID appears in the API title.
app = FastAPI(
    version="1.0.0",
    title=f"Flow Server {FLOW_ID} API",
    description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
)
|
|
|
|
|
|
# Strong references to fire-and-forget tasks. The event loop keeps only weak
# references to tasks, so a task nobody references may be garbage-collected
# before it finishes.
_background_tasks: Set[asyncio.Task] = set()


def _on_background_task_done(task: asyncio.Task) -> None:
    """Release ``task`` and report it if it ended with an exception."""
    _background_tasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        print(f"[{FLOW_ID}] Background processing task failed: {task.exception()!r}")


@app.on_event("startup")
async def startup_event():
    """Auto-start processing right after the last locally recorded index.

    The start index is ``last_processed_index + 1`` from the local progress
    file, but never below AUTO_START_INDEX. Processing runs as a background
    task, so startup is not delayed.
    """
    print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")

    progress = load_progress()
    start_index = max(progress.get('last_processed_index', 0) + 1, AUTO_START_INDEX)

    print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
    task = asyncio.create_task(process_dataset_task(start_index))
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)
|
|
|
|
|
|
@app.get("/")
async def root():
    # Status snapshot: local progress counters plus caption-server occupancy.
    # .get() with defaults keeps this endpoint from failing with a KeyError
    # (HTTP 500) when the progress file lacks one of the keys.
    progress = load_progress()
    return {
        "flow_id": FLOW_ID,
        "status": "ready",
        "last_processed_index": progress.get('last_processed_index', 0),
        "total_files_in_list": len(progress.get('file_list') or []),
        "processed_files_count": len(progress.get('processed_files') or {}),
        "total_servers": len(servers),
        "busy_servers": sum(1 for s in servers if s.busy),
    }
|
|
|
|
|
|
@app.post("/start_processing")
async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
    """
    Starts the sequential processing of zip files from the given index in the background.
    """
    # Schedule process_dataset_task via FastAPI's BackgroundTasks; it runs
    # after the response is sent. NOTE(review): nothing prevents this from
    # running concurrently with the startup task or an earlier request.
    requested_index = request.start_index
    print(f"[{FLOW_ID}] Received request to start processing from index: {requested_index}. Starting background task.")

    background_tasks.add_task(process_dataset_task, requested_index)

    response = {
        "status": "processing",
        "start_index": requested_index,
        "message": "Dataset processing started in background.",
    }
    return response
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the API on all interfaces at FLOW_PORT.
    import uvicorn

    uvicorn.run(app, port=FLOW_PORT, host="0.0.0.0")
|
|
|
|