|
|
"""Flow worker: sequentially pulls frame zips from a Hugging Face dataset,
captions every image through a pool of remote caption servers, and uploads
the caption JSON plus a shared processing state back to Hugging Face."""

import asyncio
import io
import json
import os
import shutil
import time
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiohttp
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel, Field
from huggingface_hub import HfApi, hf_hub_download

# --- Configuration ---

AUTO_START_INDEX = 20  # default 1-indexed zip file to begin from

FLOW_ID = os.getenv("FLOW_ID", "flow_default")
FLOW_PORT = int(os.getenv("FLOW_PORT", "8001"))
HF_TOKEN = os.getenv("HF_TOKEN", "")
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3")
HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium")

# Local per-worker progress file, plus the shared state file kept in the
# output dataset so multiple workers can coordinate.
PROGRESS_FILE = Path("processing_progress2.json")
HF_STATE_FILE = "processing_state_captions2.json"
LOCAL_STATE_FOLDER = Path(".state")
LOCAL_STATE_FOLDER.mkdir(exist_ok=True)

# Folder inside the input dataset that holds the frame zips.
ZIP_FILE_PREFIX = "frames_zips/"
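# Example launch, assuming this module is saved as flow_server.py (the file
# name and token are illustrative placeholders, not real values):
#   FLOW_ID=flow_1 FLOW_PORT=8001 HF_TOKEN=hf_xxx python flow_server.py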
|
|
|
|
|
|
|
|
CAPTION_SERVERS = [
    "https://eliason1-fserve-1.hf.space/analyze",
    "https://eliason1-fserve-2.hf.space/analyze",
    "https://eliason1-fserve-3.hf.space/analyze",
    "https://eliason1-fserve-4.hf.space/analyze",
    "https://eliason1-fserve-5.hf.space/analyze",
    "https://eliason1-fserve-6.hf.space/analyze",
    "https://eliason1-fserve-7.hf.space/analyze",
    "https://eliason1-fserve-8.hf.space/analyze",
    "https://eliason1-fserve-9.hf.space/analyze",
    "https://eliason1-fserve-10.hf.space/analyze",
    "https://eliason1-fserve-11.hf.space/analyze",
    "https://eliason1-fserve-12.hf.space/analyze",
    "https://eliason1-fserve-13.hf.space/analyze",
    "https://eliason1-fserve-14.hf.space/analyze",
    "https://eliason1-fserve-15.hf.space/analyze",
    "https://eliason1-fserve-16.hf.space/analyze",
    "https://eliason1-fserve-17.hf.space/analyze",
    "https://eliason1-fserve-18.hf.space/analyze",
]

MODEL_TYPE = "Florence-2-large"

# Scratch space for extracted frames, namespaced per worker.
TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
TEMP_DIR.mkdir(exist_ok=True)


class ProcessStartRequest(BaseModel):
    start_index: int = Field(
        AUTO_START_INDEX,
        ge=1,
        description="The index of the zip file to start processing from (1-indexed).",
    )


class CaptionServer:
    """Tracks availability and throughput for one remote caption endpoint."""

    def __init__(self, url: str):
        self.url = url
        self.busy = False
        self.total_processed = 0
        self.total_time = 0.0
        self.model = MODEL_TYPE

    @property
    def fps(self) -> float:
        return self.total_processed / self.total_time if self.total_time > 0 else 0.0


# Round-robin pool state shared by all captioning tasks.
servers = [CaptionServer(url) for url in CAPTION_SERVERS]
server_index = 0

def load_progress() -> Dict:
    """Loads the local processing progress from the JSON file."""
    if PROGRESS_FILE.exists():
        try:
            with PROGRESS_FILE.open('r') as f:
                return json.load(f)
        except json.JSONDecodeError:
            print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")

    return {
        "last_processed_index": 0,
        "processed_files": {},
        "file_list": []
    }


def save_progress(progress_data: Dict):
    """Saves the local processing progress to the JSON file."""
    try:
        with PROGRESS_FILE.open('w') as f:
            json.dump(progress_data, f, indent=4)
    except Exception as e:
        print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")

def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load state from a JSON file, migrating older files to the new structure."""
    if os.path.exists(file_path):
        try:
            with open(file_path, "r") as f:
                data = json.load(f)

            # Migration: older state files may predate these keys.
            if "file_states" not in data or not isinstance(data["file_states"], dict):
                print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
                data["file_states"] = {}

            if "next_download_index" not in data:
                data["next_download_index"] = 0

            return data
        except json.JSONDecodeError:
            print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
            return default_value

    # The file does not exist yet: start from the default state.
    return default_value


def save_json_state(file_path: str, data: Dict[str, Any]):
    """Save state to a JSON file."""
    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)
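
# save_json_state truncates the target file before writing, so a crash
# mid-write can leave a corrupted state file. A minimal illustrative variant
# (not wired into the flow), assuming a POSIX-style filesystem where
# os.replace is an atomic rename:
def save_json_state_atomic(file_path: str, data: Dict[str, Any]):
    """Write JSON to a temp file, then atomically swap it into place."""
    tmp_path = file_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, file_path)  # atomic on POSIX; replaces the original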
|
|
|
|
|
async def download_hf_state() -> Dict[str, Any]:
    """Downloads the state file from Hugging Face or returns a default state."""
    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
    default_state = {"next_download_index": 0, "file_states": {}}

    try:
        files = HfApi(token=HF_TOKEN).list_repo_files(
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset"
        )

        if HF_STATE_FILE not in files:
            print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
            return default_state

        hf_hub_download(
            repo_id=HF_OUTPUT_DATASET_ID,
            filename=HF_STATE_FILE,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False,
            token=HF_TOKEN
        )

        print(f"[{FLOW_ID}] Successfully downloaded state file.")
        return load_json_state(str(local_path), default_state)

    except Exception as e:
        print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
        return default_state

async def upload_hf_state(state: Dict[str, Any]) -> bool:
    """Uploads the state file to Hugging Face."""
    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE

    try:
        save_json_state(str(local_path), state)

        HfApi(token=HF_TOKEN).upload_file(
            path_or_fileobj=str(local_path),
            path_in_repo=HF_STATE_FILE,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
        )
        print(f"[{FLOW_ID}] Successfully uploaded state file.")
        return True
    except Exception as e:
        print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
        return False

async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
    """Marks a file as 'processing' in the state file and uploads the lock."""
    print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")

    state["file_states"][zip_filename] = "processing"

    if await upload_hf_state(state):
        print(f"[{FLOW_ID}] ✅ Successfully locked file: {zip_filename}")
        return True
    else:
        print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
        # Roll back the local mark so the in-memory state stays consistent.
        if zip_filename in state["file_states"]:
            del state["file_states"][zip_filename]
        return False
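
# NOTE: this lock is best-effort, not atomic. Two workers that both read the
# state before either uploads can each "acquire" the same file; the last
# upload wins. True mutual exclusion would need compare-and-swap semantics
# (e.g. a commit conditioned on the parent revision), which the plain
# upload_file call used here does not provide.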
|
|
|
|
|
async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Marks a file as 'processed', updates the index, and uploads the state."""
    print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")

    state["file_states"][zip_filename] = "processed"
    state["next_download_index"] = next_index

    if await upload_hf_state(state):
        print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {zip_filename}")
        return True
    else:
        print(f"[{FLOW_ID}] ❌ Failed to update state for: {zip_filename}")
        return False

async def get_zip_file_list(progress_data: Dict) -> List[str]:
    """
    Fetches the list of all zip files from the dataset, or uses the cached list.
    Updates progress_data with the file list whenever a fresh list is fetched.
    """
    if progress_data.get('file_list'):
        print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
        return progress_data['file_list']

    print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
    try:
        api = HfApi(token=HF_TOKEN)
        repo_files = api.list_repo_files(
            repo_id=HF_DATASET_ID,
            repo_type="dataset"
        )

        # Keep only the frame zips, sorted so 1-indexed positions are
        # reproducible across runs and workers.
        zip_files = sorted([
            f for f in repo_files
            if f.startswith(ZIP_FILE_PREFIX) and f.endswith('.zip')
        ])

        if not zip_files:
            raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")

        print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")

        progress_data['file_list'] = zip_files
        save_progress(progress_data)

        return zip_files

    except Exception as e:
        print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
        return []

async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
    """Downloads the zip file for the given index and extracts its contents."""
    zip_full_name = Path(repo_file_full_path).name
    course_name = zip_full_name.replace('.zip', '')

    print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")

    try:
        zip_path = hf_hub_download(
            repo_id=HF_DATASET_ID,
            filename=repo_file_full_path,
            repo_type="dataset",
            token=HF_TOKEN,
        )

        print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")

        extract_dir = TEMP_DIR / course_name
        if extract_dir.exists():
            shutil.rmtree(extract_dir)
        extract_dir.mkdir(parents=True, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")

        # Delete the downloaded archive (it lives in the HF cache) to free disk.
        os.remove(zip_path)

        return extract_dir

    except Exception as e:
        print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
        return None

async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
    """Uploads the final captions JSON file to the output dataset."""
    caption_filename = Path(zip_full_name).with_suffix('.json').name

    try:
        print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")

        json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')

        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=io.BytesIO(json_content),
            path_in_repo=caption_filename,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
        )

        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
        return True

    except Exception as e:
        print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
        return False
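
# Each uploaded record mirrors the dict built in send_image_for_captioning,
# e.g. (values illustrative):
#   {
#     "course": "course_001",
#     "image_path": "frame_0001.jpg",
#     "caption": "A slide showing ...",
#     "timestamp": "2024-01-01T12:00:00"
#   }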
|
|
|
|
|
|
|
|
|
|
|
async def get_available_server(timeout: float = 300.0) -> CaptionServer:
    """Round-robin selection of an available caption server."""
    global server_index
    start_time = time.time()
    while True:
        for _ in range(len(servers)):
            server = servers[server_index]
            server_index = (server_index + 1) % len(servers)
            if not server.busy:
                return server

        await asyncio.sleep(0.5)

        if time.time() - start_time > timeout:
            raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
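
# The busy flags need no lock here: all captioning coroutines run on one
# asyncio event loop, and a coroutine only yields control at await points,
# so the check-then-set on server.busy cannot interleave with another task.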
|
|
|
|
|
async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
    """Sends a single image to a caption server for processing."""
    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        server = None
        start_time = time.time()
        try:
            server = await get_available_server()
            server.busy = True
            start_time = time.time()

            if attempt == 0:
                print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")

            # Read the file up front so no file handle is left open if the
            # request fails partway through.
            form_data = aiohttp.FormData()
            form_data.add_field('file',
                                image_path.read_bytes(),
                                filename=image_path.name,
                                content_type='image/jpeg')
            form_data.add_field('model_choice', MODEL_TYPE)

            async with aiohttp.ClientSession() as session:
                async with session.post(server.url, data=form_data,
                                        timeout=aiohttp.ClientTimeout(total=600)) as resp:
                    if resp.status == 200:
                        result = await resp.json()
                        caption = result.get("caption")

                        if caption:
                            progress_tracker['completed'] += 1
                            # Summarize every 50th caption; log the rest individually.
                            if progress_tracker['completed'] % 50 == 0:
                                print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
                            else:
                                print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")

                            return {
                                "course": course_name,
                                "image_path": image_path.name,
                                "caption": caption,
                                "timestamp": datetime.now().isoformat()
                            }
                        else:
                            print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
                            continue
                    else:
                        error_text = await resp.text()
                        print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
                        continue

        except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
            print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
            continue
        except Exception as e:
            print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
            continue
        finally:
            if server:
                end_time = time.time()
                server.busy = False
                server.total_processed += 1
                server.total_time += (end_time - start_time)

    print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
    return None
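
# Assumed /analyze response shape, inferred from result.get("caption") above
# (not independently verified against the fserve endpoints):
#   {"caption": "<generated caption text>", ...}
# A non-200 status or a missing "caption" key both count as a failed attempt.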
|
|
|
|
|
async def process_dataset_task(start_index: int):
    """Main task to process the dataset sequentially starting from a given index."""
    progress = load_progress()
    current_state = await download_hf_state()
    file_list = await get_zip_file_list(progress)

    if not file_list:
        print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
        return False

    if start_index > len(file_list):
        print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
        return True

    # Convert the public 1-indexed position to a 0-indexed list offset.
    start_list_index = start_index - 1

    print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")

    global_success = True

    for i in range(start_list_index, len(file_list)):
        file_index = i + 1
        repo_file_full_path = file_list[i]
        zip_full_name = Path(repo_file_full_path).name
        course_name = zip_full_name.replace('.zip', '')

        # Skip files the shared state says are done or claimed by another worker.
        file_state = current_state["file_states"].get(zip_full_name)
        if file_state == "processed":
            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
            continue
        elif file_state == "processing":
            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
            continue

        if not await lock_file_for_processing(zip_full_name, current_state):
            print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
            continue

        extract_dir = None
        current_file_success = False

        try:
            extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)
            if not extract_dir:
                raise RuntimeError("Failed to download or extract zip file.")

            image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
            print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")

            if not image_paths:
                print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
                current_file_success = True
            else:
                progress_tracker = {
                    'total': len(image_paths),
                    'completed': 0
                }
                print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")

                # Cap in-flight requests at one per caption server; within that
                # cap, get_available_server hands out idle servers round-robin.
                semaphore = asyncio.Semaphore(len(servers))

                async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
                    async with semaphore:
                        return await send_image_for_captioning(image_path, course_name, progress_tracker)

                caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]
                results = await asyncio.gather(*caption_tasks)

                all_captions = [r for r in results if r is not None]

                if len(all_captions) == len(image_paths):
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
                    current_file_success = True
                else:
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions. Marking as partial failure.")
                    current_file_success = False

                # Upload whatever was captioned, even on partial failure.
                if all_captions:
                    print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
                    if await upload_captions_to_hf(zip_full_name, all_captions):
                        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
                    else:
                        print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
                        current_file_success = False
                else:
                    print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
                    current_file_success = False

        except Exception as e:
            print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
            current_file_success = False
            global_success = False

        finally:
            if extract_dir and extract_dir.exists():
                shutil.rmtree(extract_dir, ignore_errors=True)
                print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")

            if current_file_success:
                progress['last_processed_index'] = file_index
                progress['processed_files'][str(file_index)] = repo_file_full_path
                save_progress(progress)

                if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
                    print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
                else:
                    print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
            else:
                # Record the failure so other workers do not retry it blindly.
                current_state["file_states"][zip_full_name] = "failed"
                await upload_hf_state(current_state)
                print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
                global_success = False

    print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
    return global_success

app = FastAPI(
    title=f"Flow Server {FLOW_ID} API",
    description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
    version="1.0.0"
)

@app.on_event("startup")
async def startup_event():
    print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")

    progress = load_progress()

    # Resume after the last locally completed file, but never before the
    # configured auto-start index.
    start_index = progress.get('last_processed_index', 0) + 1
    if start_index < AUTO_START_INDEX:
        start_index = AUTO_START_INDEX

    print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
    # Keep a reference on app.state so the task is not garbage-collected.
    app.state.processing_task = asyncio.create_task(process_dataset_task(start_index))

@app.get("/")
async def root():
    progress = load_progress()
    return {
        "flow_id": FLOW_ID,
        "status": "ready",
        "last_processed_index": progress.get('last_processed_index', 0),
        "total_files_in_list": len(progress.get('file_list', [])),
        "processed_files_count": len(progress.get('processed_files', {})),
        "total_servers": len(servers),
        "busy_servers": sum(1 for s in servers if s.busy),
    }

@app.post("/start_processing")
async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
    """
    Starts the sequential processing of zip files from the given index in the background.
    """
    start_index = request.start_index

    print(f"[{FLOW_ID}] Received request to start processing from index: {start_index}. Starting background task.")

    background_tasks.add_task(process_dataset_task, start_index)

    return {"status": "processing", "start_index": start_index, "message": "Dataset processing started in background."}
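
# Example request to kick off processing manually (host, port, and index are
# illustrative; adjust to your deployment):
#   curl -X POST http://localhost:8001/start_processing \
#        -H "Content-Type: application/json" \
#        -d '{"start_index": 20}'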
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)