"""Flow server.

Sequentially downloads frame-archive zip files from a Hugging Face dataset,
fans the extracted images out to a pool of cursor-tracking caption servers,
and uploads the per-archive results as JSON to an output dataset.
"""

import asyncio
import io
import json
import os
import shutil
import time
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiohttp
from fastapi import BackgroundTasks, FastAPI
from huggingface_hub import HfApi, hf_hub_download
from pydantic import BaseModel, Field

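# --- Configuration (overridable via environment variables) ---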
AUTO_START_INDEX = 1
FLOW_ID = os.getenv("FLOW_ID", "flow_default")
FLOW_PORT = int(os.getenv("FLOW_PORT", "8001"))
HF_TOKEN = os.getenv("HF_TOKEN", "")
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "samfred2/BG4")
HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "samelias1/Data_TG")

PROGRESS_FILE = Path("processing_progress.json")
HF_STATE_FILE = "processing_state_cursors.json"
LOCAL_STATE_FOLDER = Path(".state")
LOCAL_STATE_FOLDER.mkdir(exist_ok=True)

ZIP_FILE_PREFIX = "frames_zips/"

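# Pool of caption/cursor-tracking endpoints. Work is distributed across them
# round-robin (see get_available_server below).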
CAPTION_SERVERS = (
    [f"https://elias5584-supreme-{i}.hf.space/track_cursor" for i in range(1, 21)]
    + [f"https://lovyone-elite-{i}.hf.space/track_cursor" for i in range(1, 21)]
    # NOTE: the eliason1-pia pool starts at 3; pia-1 and pia-2 were not listed.
    + [f"https://eliason1-pia-{i}.hf.space/track_cursor" for i in range(3, 21)]
    + [f"https://jirehlove-clearly-{i}.hf.space/track_cursor" for i in range(1, 21)]
)
MODEL_TYPE = "Florence-2-large"

TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
TEMP_DIR.mkdir(exist_ok=True)

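# --- API models and caption-server bookkeeping ---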
class ProcessStartRequest(BaseModel):
    start_index: int = Field(
        AUTO_START_INDEX,
        ge=1,
        description="The index of the zip file to start processing from (1-indexed).",
    )


class CaptionServer:
    """Tracks one caption endpoint: its busy flag and rough throughput stats."""

    def __init__(self, url: str):
        self.url = url
        self.busy = False
        self.total_processed = 0
        self.total_time = 0.0
        self.model = MODEL_TYPE

    @property
    def fps(self) -> float:
        """Images per second, averaged over every attempt routed to this server."""
        return self.total_processed / self.total_time if self.total_time > 0 else 0.0


servers = [CaptionServer(url) for url in CAPTION_SERVERS]
server_index = 0  # round-robin cursor into `servers`

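# --- Local progress persistence ---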
def load_progress() -> Dict:
    """Loads the local processing progress from the JSON file."""
    if PROGRESS_FILE.exists():
        try:
            with PROGRESS_FILE.open('r') as f:
                return json.load(f)
        except json.JSONDecodeError:
            print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")

    return {
        "last_processed_index": 0,
        "processed_files": {},
        "file_list": []
    }


def save_progress(progress_data: Dict):
    """Saves the local processing progress to the JSON file."""
    try:
        with PROGRESS_FILE.open('w') as f:
            json.dump(progress_data, f, indent=4)
    except Exception as e:
        print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")

def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load state from a JSON file, migrating older files to the current structure."""
    if os.path.exists(file_path):
        try:
            with open(file_path, "r") as f:
                data = json.load(f)

            # Migration: older state files may lack these keys.
            if "file_states" not in data or not isinstance(data["file_states"], dict):
                print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
                data["file_states"] = {}

            if "next_download_index" not in data:
                data["next_download_index"] = 0

            return data
        except json.JSONDecodeError:
            print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
    return default_value


def save_json_state(file_path: str, data: Dict[str, Any]):
    """Save state to a JSON file."""
    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)

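# --- Shared state on the Hugging Face Hub (coordination across workers) ---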
async def download_hf_state() -> Dict[str, Any]:
    """Downloads the shared state file from Hugging Face, or returns a default state."""
    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
    default_state = {"next_download_index": 0, "file_states": {}}

    try:
        files = HfApi(token=HF_TOKEN).list_repo_files(
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset"
        )

        if HF_STATE_FILE not in files:
            print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
            return default_state

        hf_hub_download(
            repo_id=HF_OUTPUT_DATASET_ID,
            filename=HF_STATE_FILE,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False,  # deprecated and ignored on recent huggingface_hub versions
            token=HF_TOKEN
        )

        print(f"[{FLOW_ID}] Successfully downloaded state file.")
        return load_json_state(str(local_path), default_state)

    except Exception as e:
        print(f"[{FLOW_ID}] Failed to download state file: {e}. Starting fresh.")
        return default_state

async def upload_hf_state(state: Dict[str, Any]) -> bool:
    """Uploads the state file to Hugging Face."""
    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE

    try:
        save_json_state(str(local_path), state)

        HfApi(token=HF_TOKEN).upload_file(
            path_or_fileobj=str(local_path),
            path_in_repo=HF_STATE_FILE,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
        )
        print(f"[{FLOW_ID}] Successfully uploaded state file.")
        return True
    except Exception as e:
        print(f"[{FLOW_ID}] Failed to upload state file: {e}")
        return False

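# The two helpers below implement a coarse, best-effort lock over the shared
# state file: a worker writes "processing" before it starts and "processed"
# when it finishes. There is no atomic compare-and-swap on the Hub, so two
# workers that read the state at the same moment can still race.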
async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
    """Marks a file as 'processing' in the state file and uploads the lock."""
    print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")

    state["file_states"][zip_filename] = "processing"

    if await upload_hf_state(state):
        print(f"[{FLOW_ID}] ✅ Successfully locked file: {zip_filename}")
        return True
    else:
        print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
        # Roll back the local mutation so the in-memory state matches the Hub.
        if zip_filename in state["file_states"]:
            del state["file_states"][zip_filename]
        return False

async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Marks a file as 'processed', updates the download index, and uploads the state."""
    print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")

    state["file_states"][zip_filename] = "processed"
    state["next_download_index"] = next_index

    if await upload_hf_state(state):
        print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {zip_filename}")
        return True
    else:
        print(f"[{FLOW_ID}] ❌ Failed to update state for: {zip_filename}")
        return False

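# --- Dataset download, extraction, and result upload ---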
async def get_zip_file_list(progress_data: Dict) -> List[str]:
    """
    Fetches the list of all zip files from the dataset, or uses the cached list.
    Updates progress_data with the file list when a fresh list is fetched.
    """
    if progress_data['file_list']:
        print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
        return progress_data['file_list']

    print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
    try:
        api = HfApi(token=HF_TOKEN)
        repo_files = api.list_repo_files(
            repo_id=HF_DATASET_ID,
            repo_type="dataset"
        )

        zip_files = sorted(
            f for f in repo_files
            if f.startswith(ZIP_FILE_PREFIX) and f.endswith('.zip')
        )

        if not zip_files:
            raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")

        print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")

        progress_data['file_list'] = zip_files
        save_progress(progress_data)

        return zip_files

    except Exception as e:
        print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
        return []

async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
    """Downloads the zip file for the given index and extracts its contents."""
    zip_full_name = Path(repo_file_full_path).name
    course_name = Path(repo_file_full_path).stem

    print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")

    try:
        zip_path = hf_hub_download(
            repo_id=HF_DATASET_ID,
            filename=repo_file_full_path,
            repo_type="dataset",
            token=HF_TOKEN,
        )

        print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")

        extract_dir = TEMP_DIR / course_name

        # Start from a clean directory so a partial earlier extraction can't linger.
        if extract_dir.exists():
            shutil.rmtree(extract_dir)
        extract_dir.mkdir(exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")

        # Delete the downloaded zip (which lives in the hf_hub cache) to save disk.
        os.remove(zip_path)

        return extract_dir

    except Exception as e:
        print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
        return None

async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
    """Uploads the final captions JSON file to the output dataset."""
    caption_filename = Path(zip_full_name).with_suffix('.json').name

    try:
        print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")

        json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')

        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=io.BytesIO(json_content),
            path_in_repo=caption_filename,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
        )

        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
        return True

    except Exception as e:
        print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
        return False

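# --- Caption-server scheduling and per-image requests ---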
async def get_available_server(timeout: float = 300.0) -> CaptionServer:
    """Round-robin selection of an available caption server."""
    global server_index
    start_time = time.time()
    while True:
        # One full pass over the pool. Selection and the caller's busy-flag set
        # happen without an intervening await, so two tasks cannot grab the
        # same server.
        for _ in range(len(servers)):
            server = servers[server_index]
            server_index = (server_index + 1) % len(servers)
            if not server.busy:
                return server

        # Every server is busy; yield to the event loop and try again shortly.
        await asyncio.sleep(0.5)

        if time.time() - start_time > timeout:
            raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")

async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
    """Sends a single image to a caption server, retrying on transient failures."""
    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        server = None
        start_time = time.time()  # re-stamped below once a server is acquired
        try:
            server = await get_available_server()
            server.busy = True
            start_time = time.time()

            if attempt == 0:
                print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")

            # Read the image into memory so no file handle is left open on failure.
            # NOTE: the content type is always reported as image/jpeg, even for
            # PNGs, matching the original behavior.
            form_data = aiohttp.FormData()
            form_data.add_field('file',
                                image_path.read_bytes(),
                                filename=image_path.name,
                                content_type='image/jpeg')
            form_data.add_field('model_choice', MODEL_TYPE)

            async with aiohttp.ClientSession() as session:
                async with session.post(server.url, data=form_data,
                                        timeout=aiohttp.ClientTimeout(total=600)) as resp:
                    if resp.status == 200:
                        result = await resp.json()

                        if result.get('cursor_active') is not None:
                            progress_tracker['completed'] += 1
                            if progress_tracker['completed'] % 50 == 0:
                                print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} detections completed.")
                            else:
                                print(f"[{FLOW_ID}] Success: {image_path.name} processed by {server.url}")

                            return {
                                "course": course_name,
                                "image_path": image_path.name,
                                "cursor_active": result.get('cursor_active', False),
                                "x": result.get('x'),
                                "y": result.get('y'),
                                "confidence": result.get('confidence'),
                                "template": result.get('template'),
                                "image_shape": result.get('image_shape'),
                                "server_url": server.url,
                                "timestamp": datetime.now().isoformat()
                            }
                        else:
                            print(f"[{FLOW_ID}] Server {server.url} returned invalid response format for {image_path.name}. Response: {result}")
                            continue
                    else:
                        error_text = await resp.text()
                        print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
                        continue

        except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
            print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
            continue
        except Exception as e:
            print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
            continue
        finally:
            # Release the server; note the throughput stats include failed attempts.
            if server:
                server.busy = False
                server.total_processed += 1
                server.total_time += time.time() - start_time

    print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
    return None

async def process_dataset_task(start_index: int):
    """Main task: process the dataset sequentially starting from a given 1-indexed file."""
    progress = load_progress()
    current_state = await download_hf_state()
    file_list = await get_zip_file_list(progress)

    if not file_list:
        print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
        return False

    if start_index > len(file_list):
        print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
        return True

    # Convert the 1-indexed start position to a 0-indexed list offset.
    start_list_index = start_index - 1

    print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")

    global_success = True

    for i in range(start_list_index, len(file_list)):
        file_index = i + 1
        repo_file_full_path = file_list[i]
        zip_full_name = Path(repo_file_full_path).name
        course_name = Path(repo_file_full_path).stem

        # Skip files already handled (or currently claimed) by any worker.
        file_state = current_state["file_states"].get(zip_full_name)
        if file_state == "processed":
            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
            continue
        elif file_state == "processing":
            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
            continue

        if not await lock_file_for_processing(zip_full_name, current_state):
            print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
            continue

        extract_dir = None
        current_file_success = False

        try:
            extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)

            if not extract_dir:
                raise Exception("Failed to download or extract zip file.")

            image_paths = [p for p in extract_dir.glob("**/*")
                           if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
            print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")

            if not image_paths:
                print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
                current_file_success = True
            else:
                progress_tracker = {
                    'total': len(image_paths),
                    'completed': 0
                }
                print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")

                # Cap concurrent requests at the size of the server pool.
                semaphore = asyncio.Semaphore(len(servers))

                async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
                    async with semaphore:
                        return await send_image_for_captioning(image_path, course_name, progress_tracker)

                caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]

                results = await asyncio.gather(*caption_tasks)

                all_captions = [r for r in results if r is not None]

                if len(all_captions) == len(image_paths):
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully processed all {len(all_captions)} images.")
                else:
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} images.")

                # Guard against division by zero when every image failed.
                if all_captions:
                    cursor_detected = sum(1 for c in all_captions if c.get('cursor_active', False))
                    print(f"[{FLOW_ID}] Detection Statistics:")
                    print(f"- Total processed: {len(all_captions)}")
                    print(f"- Cursors detected: {cursor_detected}")
                    print(f"- Detection rate: {(cursor_detected / len(all_captions) * 100):.2f}%")

                # Success is decided by the upload below, not by the caption count alone.
                if all_captions:
                    print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
                    if await upload_captions_to_hf(zip_full_name, all_captions):
                        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
                        current_file_success = True
                    else:
                        print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
                        current_file_success = False
                else:
                    print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
                    current_file_success = False

        except Exception as e:
            print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
            current_file_success = False
            global_success = False

        finally:
            if extract_dir and extract_dir.exists():
                shutil.rmtree(extract_dir, ignore_errors=True)
                print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")

            if current_file_success:
                progress['last_processed_index'] = file_index
                progress['processed_files'][str(file_index)] = repo_file_full_path
                save_progress(progress)

                if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
                    print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
                else:
                    print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
            else:
                # Failed files are marked as such; they will be retried on a
                # later run since the skip logic only honors 'processed' and
                # 'processing'.
                current_state["file_states"][zip_full_name] = "failed"
                await upload_hf_state(current_state)
                print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
                global_success = False

    print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
    return global_success

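# --- FastAPI application ---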
app = FastAPI(
    title=f"Flow Server {FLOW_ID} API",
    description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
    version="1.0.0"
)


@app.on_event("startup")  # NOTE: deprecated in recent FastAPI; lifespan handlers are the modern equivalent.
async def startup_event():
    print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")

    progress = load_progress()
    current_state = await download_hf_state()

    hf_next_index = current_state.get("next_download_index", 0)

    # Prefer the shared HF cursor; fall back to local progress, then the default.
    if hf_next_index > 0:
        start_index = hf_next_index
        print(f"[{FLOW_ID}] Using next_download_index from HF state: {start_index}")
    else:
        start_index = progress.get('last_processed_index', 0) + 1
        if start_index < AUTO_START_INDEX:
            start_index = AUTO_START_INDEX

    print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
    # Keep a reference on app.state so the task isn't garbage-collected mid-run.
    app.state.processing_task = asyncio.create_task(process_dataset_task(start_index))

@app.get("/")
async def root():
    progress = load_progress()
    return {
        "flow_id": FLOW_ID,
        "status": "ready",
        "last_processed_index": progress.get('last_processed_index', 0),
        "total_files_in_list": len(progress.get('file_list', [])),
        "processed_files_count": len(progress.get('processed_files', {})),
        "total_servers": len(servers),
        "busy_servers": sum(1 for s in servers if s.busy),
    }

@app.post("/start_processing")
async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
    """
    Starts the sequential processing of zip files from the given index in the background.
    """
    start_index = request.start_index

    print(f"[{FLOW_ID}] Received request to start processing from index: {start_index}. Starting background task.")

    background_tasks.add_task(process_dataset_task, start_index)

    return {"status": "processing", "start_index": start_index, "message": "Dataset processing started in background."}

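# Example (a sketch, assuming the default port and a local deployment):
#   curl -X POST http://localhost:8001/start_processing \
#        -H "Content-Type: application/json" \
#        -d '{"start_index": 5}'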
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)