Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import time | |
| import asyncio | |
| import aiohttp | |
| from typing import Dict, List, Set, Optional | |
| from urllib.parse import quote, urljoin | |
| from datetime import datetime | |
| from pathlib import Path | |
| from datasets import Dataset, DatasetDict | |
| import huggingface_hub | |
| from fastapi import FastAPI, BackgroundTasks, HTTPException, status | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel, Field | |
| import uvicorn | |
| # Path for storing caption data | |
| CAPTIONS_DIR = Path("captions_data") | |
| CAPTIONS_DIR.mkdir(exist_ok=True) | |
| # Hugging Face configuration | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| HF_DATASET_ID = os.getenv("HF_DATASET_ID", "fred808/helium") | |
| if not HF_TOKEN: | |
| raise ValueError("HF_TOKEN environment variable is required") | |
| def get_caption_file_path(course: str) -> Path: | |
| """Get the path to the JSON file for storing course captions""" | |
| safe_name = quote(course, safe='') | |
| return CAPTIONS_DIR / f"{safe_name}_captions.json" | |
| def save_captions_to_file(course: str, captions: List[Dict]) -> None: | |
| """Save captions to a JSON file""" | |
| try: | |
| file_path = get_caption_file_path(course) | |
| with open(file_path, 'w', encoding='utf-8') as f: | |
| json.dump(captions, f, indent=2, ensure_ascii=False) | |
| print(f"β Saved {len(captions)} captions for {course}") | |
| except Exception as e: | |
| print(f"Error saving captions for {course}: {e}") | |
| def load_captions_from_file(course: str) -> List[Dict]: | |
| """Load existing captions from JSON file""" | |
| try: | |
| file_path = get_caption_file_path(course) | |
| if file_path.exists(): | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| captions = json.load(f) | |
| print(f"β Loaded {len(captions)} existing captions for {course}") | |
| return captions | |
| except Exception as e: | |
| print(f"Error loading captions for {course}: {e}") | |
| return [] | |
| # Configuration | |
| SOURCE_SERVER = "https://favoredone-flowk.hf.space" | |
| CAPTION_SERVERS = [ | |
| "https://favoredone-favoredone-tv88mp.hf.space/analyze", | |
| "https://favoredone-favoredone-7p1dcf.hf.space/analyze", | |
| "https://favoredone-favoredone-k7b4mf.hf.space/analyze", | |
| "https://favoredone-favoredone-mzlxc7.hf.space/analyze", | |
| "https://favoredone-favoredone-aomfwa.hf.space/analyze", | |
| "https://favoredone-favoredone-7g6v04.hf.space/analyze", | |
| "https://favoredone-favoredone-dk1skh.hf.space/analyze", | |
| "https://favoredone-favoredone-z4yo0y.hf.space/analyze", | |
| "https://favoredone-favoredone-f6czeq.hf.space/analyze", | |
| "https://favoredone-favoredone-5fo8ga.hf.space/analyze", | |
| "https://favoredone-favoredone-zde8x6.hf.space/analyze", | |
| "https://favoredone-favoredone-r0biih.hf.space/analyze", | |
| "https://favoredone-favoredone-ljdzkf.hf.space/analyze", | |
| "https://favoredone-favoredone-irrpe5.hf.space/analyze", | |
| "https://favoredone-favoredone-bh9rwz.hf.space/analyze", | |
| "https://favoredone-favoredone-u8c4dt.hf.space/analyze", | |
| "https://favoredone-favoredone-futwyd.hf.space/analyze", | |
| "https://favoredone-favoredone-hg2sot.hf.space/analyze", | |
| "https://favoredone-favoredone-pvweug.hf.space/analyze", | |
| "https://favoredone-favoredone-z6azk2.hf.space/analyze", | |
| "https://favoredone-favoredone-4zid9w.hf.space/analyze", | |
| "https://favoredone-favoredone-be7a1r.hf.space/analyze", | |
| "https://favoredone-favoredone-ayazxa.hf.space/analyze", | |
| "https://favoredone-favoredone-6ckj4m.hf.space/analyze", | |
| "https://favoredone-favoredone-whn0xu.hf.space/analyze", | |
| "https://favoredone-favoredone-t49exm.hf.space/analyze", | |
| "https://favoredone-favoredone-cgrh0a.hf.space/analyze", | |
| "https://favoredone-favoredone-r1kb5g.hf.space/analyze" | |
| ] | |
| MODEL_TYPE = "Florence-2-large" # Explicitly request large model | |
| # FastAPI Models | |
| class CourseInfo(BaseModel): | |
| course_folder: str | |
| class ImageInfo(BaseModel): | |
| filename: str | |
| class CaptionRequest(BaseModel): | |
| image_url: str | |
| model_choice: str = MODEL_TYPE | |
| class CaptionResponse(BaseModel): | |
| success: bool | |
| caption: Optional[str] = None | |
| error: Optional[str] = None | |
| class ServerStatus(BaseModel): | |
| url: str | |
| model: str | |
| busy: bool | |
| total_processed: int | |
| total_time: float | |
| fps: float | |
| class ProcessingStatus(BaseModel): | |
| course: str | |
| total_images: int | |
| processed_images: int | |
| progress_percent: float | |
| status: str | |
| class StartProcessingRequest(BaseModel): | |
| courses: Optional[List[str]] = None # If None, process all courses | |
| continuous: bool = True # Default to continuous like original | |
| # FastAPI App | |
| app = FastAPI( | |
| title="Caption Coordinator API", | |
| description="Distributed caption processing coordinator", | |
| version="1.0.0" | |
| ) | |
| # Global state | |
| processed_images: Dict[str, Set[str]] = {} # {course: set(image_names)} | |
| course_captions: Dict[str, List[Dict]] = {} # {course: [{image, caption, metadata}]} | |
| failed_images: Dict[str, Set[str]] = {} # {course: set(image_names)} | |
| servers = [] | |
| is_processing = False | |
| current_processing_task = None | |
| auto_start_processing = True # Set to False if you don't want auto-start | |
| class CaptionServer: | |
| def __init__(self, url): | |
| self.url = url | |
| self.busy = False | |
| self.model = "unknown" | |
| self.total_processed = 0 | |
| self.total_time = 0 | |
| def fps(self): | |
| return self.total_processed / self.total_time if self.total_time > 0 else 0 | |
| # Initialize servers | |
| def initialize_servers(): | |
| global servers | |
| servers = [CaptionServer(url) for url in CAPTION_SERVERS] | |
| # API Routes | |
| async def root(): | |
| return { | |
| "message": "Caption Coordinator API", | |
| "status": "running", | |
| "auto_processing": auto_start_processing, | |
| "is_processing": is_processing | |
| } | |
| async def health(): | |
| return { | |
| "status": "healthy", | |
| "servers_available": len([s for s in servers if not s.busy]), | |
| "total_servers": len(servers), | |
| "is_processing": is_processing, | |
| "auto_processing": auto_start_processing | |
| } | |
| async def get_courses(): | |
| """Fetch available courses from source server""" | |
| try: | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(f"{SOURCE_SERVER}/courses") as resp: | |
| data = await resp.json() | |
| if isinstance(data, dict) and 'courses' in data: | |
| return [c['course_folder'] for c in data['courses'] if isinstance(c, dict)] | |
| return [] | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error fetching courses: {e}") | |
| async def get_course_images(course: str): | |
| """Fetch images list for a course""" | |
| try: | |
| course_frames = f"{course}_frames" if not course.endswith("_frames") else course | |
| url = f"{SOURCE_SERVER}/images/{quote(course_frames)}" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url) as resp: | |
| data = await resp.json() | |
| if isinstance(data, dict) and 'images' in data: | |
| return data['images'] | |
| return [] | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error fetching images: {e}") | |
| async def get_servers_status(): | |
| """Get status of all caption servers""" | |
| server_statuses = [] | |
| for server in servers: | |
| server_statuses.append(ServerStatus( | |
| url=server.url, | |
| model=server.model, | |
| busy=server.busy, | |
| total_processed=server.total_processed, | |
| total_time=server.total_time, | |
| fps=server.fps | |
| )) | |
| return server_statuses | |
| async def get_processing_status(): | |
| """Get current processing status""" | |
| status_info = {} | |
| for course in processed_images: | |
| total = len(processed_images[course]) | |
| processed = len(course_captions.get(course, [])) | |
| failed = len(failed_images.get(course, set())) | |
| status_info[course] = { | |
| "course": course, | |
| "total_images": total, | |
| "processed_images": processed, | |
| "failed_images": failed, | |
| "progress_percent": (processed / total * 100) if total > 0 else 0, | |
| "status": "completed" if processed + failed >= total else "processing" | |
| } | |
| return status_info | |
| async def start_processing(request: StartProcessingRequest = StartProcessingRequest()): | |
| """Start caption processing""" | |
| global is_processing, current_processing_task | |
| if is_processing: | |
| raise HTTPException(status_code=400, detail="Processing is already running") | |
| is_processing = True | |
| current_processing_task = asyncio.create_task( | |
| processing_loop(request.courses, request.continuous) | |
| ) | |
| return { | |
| "message": "Processing started", | |
| "continuous": request.continuous, | |
| "specific_courses": request.courses | |
| } | |
| async def stop_processing(): | |
| """Stop caption processing""" | |
| global is_processing, current_processing_task | |
| if not is_processing: | |
| raise HTTPException(status_code=400, detail="Processing is not running") | |
| is_processing = False | |
| if current_processing_task: | |
| current_processing_task.cancel() | |
| try: | |
| await current_processing_task | |
| except asyncio.CancelledError: | |
| pass | |
| current_processing_task = None | |
| return {"message": "Processing stopped"} | |
| async def get_captions(course: str): | |
| """Get captions for a specific course""" | |
| captions = load_captions_from_file(course) | |
| return { | |
| "course": course, | |
| "total_captions": len(captions), | |
| "captions": captions | |
| } | |
| async def delete_captions(course: str): | |
| """Delete captions for a specific course""" | |
| try: | |
| file_path = get_caption_file_path(course) | |
| if file_path.exists(): | |
| file_path.unlink() | |
| if course in processed_images: | |
| del processed_images[course] | |
| if course in course_captions: | |
| del course_captions[course] | |
| if course in failed_images: | |
| del failed_images[course] | |
| return {"message": f"Captions for {course} deleted"} | |
| else: | |
| raise HTTPException(status_code=404, detail=f"No captions found for {course}") | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Error deleting captions: {e}") | |
| # Core processing functions | |
| async def fetch_courses() -> List[str]: | |
| """Fetch available courses from source server""" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(f"{SOURCE_SERVER}/courses") as resp: | |
| data = await resp.json() | |
| if isinstance(data, dict) and 'courses' in data: | |
| return [c['course_folder'] for c in data['courses'] if isinstance(c, dict)] | |
| return [] | |
| async def fetch_course_images(course: str) -> List[Dict]: | |
| """Fetch images list for a course""" | |
| course_frames = f"{course}_frames" if not course.endswith("_frames") else course | |
| url = f"{SOURCE_SERVER}/images/{quote(course_frames)}" | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url) as resp: | |
| data = await resp.json() | |
| if isinstance(data, dict) and 'images' in data: | |
| return data['images'] | |
| return [] | |
| async def get_caption(server: str, image_url: str) -> Dict: | |
| """Get caption from a specific server""" | |
| params = { | |
| 'image_url': image_url, | |
| 'model_choice': MODEL_TYPE | |
| } | |
| try: | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(server, params=params, timeout=30) as resp: | |
| return await resp.json() | |
| except Exception as e: | |
| print(f"Error from {server}: {e}") | |
| return None | |
| async def get_model_info(): | |
| """Get model information from caption servers""" | |
| model_info = [] | |
| async with aiohttp.ClientSession() as session: | |
| for server in CAPTION_SERVERS: | |
| try: | |
| health_url = server.rsplit('/analyze', 1)[0] + '/health' | |
| async with session.get(health_url) as resp: | |
| info = await resp.json() | |
| model_info.append({ | |
| 'url': server, | |
| 'model': info.get('model_choice', 'unknown') | |
| }) | |
| except Exception as e: | |
| print(f"Couldn't get model info from {server}: {e}") | |
| return model_info | |
| async def process_image(server: CaptionServer, course: str, image: Dict) -> Dict: | |
| """Process single image through one caption server with better error handling""" | |
| if server.busy: | |
| return None | |
| server.busy = True | |
| start_time = time.time() | |
| try: | |
| # Structure URL correctly: /images/COURSE_NAME_frames/IMAGE.png | |
| course_frames = f"{course}_frames" if not course.endswith("_frames") else course | |
| image_url = urljoin(SOURCE_SERVER, f"/images/{quote(course_frames)}/{quote(image['filename'])}") | |
| result = await get_caption(server.url, image_url) | |
| processing_time = time.time() - start_time | |
| server.total_time += processing_time | |
| if result and result.get('success') and result.get('caption'): | |
| server.total_processed += 1 | |
| metadata = { | |
| "image": image['filename'], | |
| "caption": result['caption'], | |
| "server": server.url, | |
| "processing_time": processing_time, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| print(f"Server {server.url} processed {image['filename']} in {processing_time:.2f}s ({server.fps:.2f} fps)") | |
| return metadata | |
| else: | |
| # Server responded but no caption (might be error or empty response) | |
| error_msg = result.get('error', 'Unknown error') if result else 'No response' | |
| print(f"Server {server.url} failed for {image['filename']}: {error_msg}") | |
| return None | |
| except asyncio.TimeoutError: | |
| print(f"Server {server.url} timeout for {image['filename']}") | |
| return None | |
| except Exception as e: | |
| print(f"Error processing {image['filename']} on {server.url}: {e}") | |
| return None | |
| finally: | |
| server.busy = False | |
| async def upload_to_huggingface(course: str, metadata_list: List[Dict]): | |
| """Upload course captions to Hugging Face dataset""" | |
| try: | |
| print(f"π€ Uploading {len(metadata_list)} captions for {course} to Hugging Face...") | |
| # Prepare data for Hugging Face dataset | |
| dataset_data = { | |
| "course": [], | |
| "image_filename": [], | |
| "caption": [], | |
| "processing_server": [], | |
| "processing_time": [], | |
| "timestamp": [] | |
| } | |
| for metadata in metadata_list: | |
| dataset_data["course"].append(course) | |
| dataset_data["image_filename"].append(metadata["image"]) | |
| dataset_data["caption"].append(metadata["caption"]) | |
| dataset_data["processing_server"].append(metadata["server"]) | |
| dataset_data["processing_time"].append(metadata["processing_time"]) | |
| dataset_data["timestamp"].append(metadata["timestamp"]) | |
| # Create dataset | |
| dataset = Dataset.from_dict(dataset_data) | |
| # Login to Hugging Face | |
| huggingface_hub.login(token=HF_TOKEN) | |
| # Push to hub | |
| dataset.push_to_hub( | |
| HF_DATASET_ID, | |
| config_name=course.replace("/", "_").replace(" ", "_"), | |
| split="train", # You can change this to "train", "validation", "test" as needed | |
| commit_message=f"Add captions for course {course} - {len(metadata_list)} images" | |
| ) | |
| print(f"β Successfully uploaded {len(metadata_list)} captions for {course} to {HF_DATASET_ID}") | |
| return True | |
| except Exception as e: | |
| print(f"β Error uploading to Hugging Face: {e}") | |
| return False | |
| async def process_course(course: str, servers: List[CaptionServer]): | |
| """Process all images in a course using available servers with proper retry logic""" | |
| # Initialize course tracking | |
| if course not in processed_images: | |
| processed_images[course] = set() | |
| if course not in course_captions: | |
| course_captions[course] = load_captions_from_file(course) | |
| # Update processed images set from loaded captions | |
| for cap in course_captions[course]: | |
| processed_images[course].add(cap['image']) | |
| if course not in failed_images: | |
| failed_images[course] = set() | |
| # Get list of images | |
| images = await fetch_course_images(course) | |
| if not images: | |
| print(f"No images found for course {course}") | |
| return | |
| print(f"\nProcessing {len(images)} images for course {course}") | |
| # Track images that need processing with retry count (5 retries) | |
| pending_images = {} | |
| for img in images: | |
| filename = img['filename'] | |
| if filename not in processed_images[course] and filename not in failed_images[course]: | |
| pending_images[filename] = {'image': img, 'retries': 0, 'max_retries': 5} | |
| if not pending_images: | |
| print(f"All images already processed or failed for course {course}") | |
| print(f"- Processed: {len(processed_images[course])}, Failed: {len(failed_images[course])}") | |
| # If course is completed, upload to Hugging Face | |
| if len(processed_images[course]) + len(failed_images[course]) >= len(images): | |
| if course_captions[course]: | |
| print(f"π€ Course {course} completed, uploading to Hugging Face...") | |
| await upload_to_huggingface(course, course_captions[course]) | |
| return | |
| print(f"Images to process: {len(pending_images)} (already processed: {len(processed_images[course])}, failed: {len(failed_images[course])})") | |
| batch_size = len([s for s in servers if not s.busy]) | |
| processed_in_this_run = 0 | |
| while pending_images and is_processing: | |
| # Create tasks for each available server | |
| tasks = [] | |
| assigned_images = [] | |
| for server in servers: | |
| if not server.busy and pending_images: | |
| # Get the next pending image | |
| filename, img_data = next(iter(pending_images.items())) | |
| img = img_data['image'] | |
| # Assign this image to the server | |
| tasks.append(process_image(server, course, img)) | |
| assigned_images.append((filename, img, img_data['retries'])) | |
| # Remove from pending temporarily while it's being processed | |
| del pending_images[filename] | |
| if not tasks: | |
| # If no servers available, wait a bit | |
| await asyncio.sleep(0.1) | |
| continue | |
| # Process images in parallel across servers | |
| results = await asyncio.gather(*tasks) | |
| # Handle results and retry logic | |
| has_new_results = False | |
| for (filename, img, current_retries), result in zip(assigned_images, results): | |
| if result: | |
| # Success - image was processed | |
| processed_images[course].add(filename) | |
| course_captions[course].append(result) | |
| has_new_results = True | |
| processed_in_this_run += 1 | |
| print(f"β Successfully processed {filename}") | |
| else: | |
| # Failure - check if we should retry | |
| if current_retries < 5: # max_retries | |
| # Put back in pending for retry with incremented retry count | |
| pending_images[filename] = { | |
| 'image': img, | |
| 'retries': current_retries + 1, | |
| 'max_retries': 5 | |
| } | |
| print(f"β» Retry {current_retries + 1}/5 for {filename}") | |
| else: | |
| # Max retries exceeded, mark as failed | |
| failed_images[course].add(filename) | |
| print(f"β Failed to process {filename} after 5 retries") | |
| # Save progress after each batch with new results | |
| if has_new_results: | |
| save_captions_to_file(course, course_captions[course]) | |
| # Show progress | |
| total = len(images) | |
| done = len(processed_images[course]) | |
| failed_count = len(failed_images[course]) | |
| pending_count = len(pending_images) | |
| progress_percent = (done / total * 100) if total > 0 else 0 | |
| print(f"\rProgress: {done}/{total} ({progress_percent:.1f}%) - {pending_count} pending, {failed_count} failed, {processed_in_this_run} new", end="", flush=True) | |
| # Small delay to prevent overwhelming the servers | |
| await asyncio.sleep(0.5) | |
| # Final status for this course | |
| total = len(images) | |
| done = len(processed_images[course]) | |
| failed_count = len(failed_images[course]) | |
| if done + failed_count >= total: | |
| if failed_count > 0: | |
| print(f"\nβ Course {course} completed with {failed_count} failed images") | |
| else: | |
| print(f"\nβ Course {course} fully completed") | |
| # Upload to Hugging Face when course is completed | |
| if course_captions[course]: | |
| print(f"π€ Uploading {len(course_captions[course])} captions to Hugging Face...") | |
| success = await upload_to_huggingface(course, course_captions[course]) | |
| if success: | |
| print(f"β Successfully uploaded {course} to Hugging Face") | |
| else: | |
| print(f"β Failed to upload {course} to Hugging Face") | |
| else: | |
| print(f"\nβ Course {course} partially completed: {done}/{total} processed, {failed_count} failed") | |
| async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = True): | |
| """Main processing loop with proper error handling""" | |
| global is_processing | |
| # Get model information and verify Florence-2-large availability | |
| model_info = await get_model_info() | |
| print("\nCaption Servers:") | |
| available_servers = [] | |
| for info, server in zip(model_info, servers): | |
| server.model = info['model'] | |
| if MODEL_TYPE in info.get('model', ''): | |
| available_servers.append(server) | |
| print(f"β {server.url} confirmed {MODEL_TYPE}") | |
| else: | |
| print(f"β {server.url} using {server.model} - skipping (requires {MODEL_TYPE})") | |
| if not available_servers: | |
| print(f"\nError: No servers with {MODEL_TYPE} available!") | |
| is_processing = False | |
| return | |
| # Update servers list to only use those with large model | |
| processing_servers = available_servers | |
| print(f"\nUsing {len(processing_servers)} servers with {MODEL_TYPE}") | |
| # Check for existing caption files and report | |
| existing_captions = list(CAPTIONS_DIR.glob("*_captions.json")) | |
| if existing_captions: | |
| print("\nFound existing caption files:") | |
| for cap_file in existing_captions: | |
| course = cap_file.stem.replace("_captions", "") | |
| try: | |
| with open(cap_file, 'r', encoding='utf-8') as f: | |
| captions = json.load(f) | |
| print(f"- {course}: {len(captions)} captions") | |
| except Exception as e: | |
| print(f"- Error reading {cap_file.name}: {e}") | |
| print() | |
| start_time = time.time() | |
| iteration = 0 | |
| while is_processing: | |
| try: | |
| iteration += 1 | |
| print(f"\n{'='*50}") | |
| print(f"Processing Iteration {iteration}") | |
| print(f"{'='*50}") | |
| # Get available courses | |
| if specific_courses: | |
| courses = specific_courses | |
| print(f"Processing specific courses: {courses}") | |
| else: | |
| courses = await fetch_courses() | |
| print(f"Found {len(courses)} courses") | |
| if not courses: | |
| print("No courses found, waiting...") | |
| if not continuous: | |
| break | |
| await asyncio.sleep(10) | |
| continue | |
| # Process each course with all available servers | |
| for course in courses: | |
| if not is_processing: | |
| break | |
| print(f"\n--- Processing course: {course} ---") | |
| await process_course(course, processing_servers) | |
| # Show server stats | |
| print("\nServer Stats:") | |
| total_processed = sum(s.total_processed for s in processing_servers) | |
| elapsed = time.time() - start_time | |
| if elapsed > 0: | |
| print(f"Total images processed: {total_processed}") | |
| print(f"Overall speed: {total_processed/elapsed:.2f} fps") | |
| for s in processing_servers: | |
| print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps") | |
| print() | |
| if not continuous: | |
| print("One-time processing completed") | |
| break | |
| # Wait before next check | |
| print("Waiting for new courses...") | |
| await asyncio.sleep(5) | |
| except asyncio.CancelledError: | |
| print("Processing cancelled") | |
| break | |
| except Exception as e: | |
| print(f"Error in processing loop: {str(e)}") | |
| import traceback | |
| traceback.print_exc() | |
| await asyncio.sleep(10) | |
| is_processing = False | |
| print("Processing loop stopped") | |
| # Startup event | |
| async def startup_event(): | |
| """Initialize servers and start processing on startup""" | |
| initialize_servers() | |
| print("Caption Coordinator API started") | |
| print(f"Source server: {SOURCE_SERVER}") | |
| print(f"Caption servers: {len(CAPTION_SERVERS)}") | |
| print(f"Hugging Face dataset: {HF_DATASET_ID}") | |
| print(f"HF Token: {'β Set' if HF_TOKEN else 'β Missing'}") | |
| # Start processing automatically (like original main()) | |
| if auto_start_processing: | |
| print("Auto-starting processing loop...") | |
| global is_processing, current_processing_task | |
| is_processing = True | |
| current_processing_task = asyncio.create_task(processing_loop()) | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=8000, reload=True) |