Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import time | |
| import threading | |
| import asyncio | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| import uvicorn | |
| from typing import Dict | |
| from pathlib import Path | |
| import subprocess | |
| from datetime import datetime | |
| import torch | |
| # Import core functionality | |
| from vision_analyzer import ( | |
| main_processing_loop, | |
| processing_status, | |
| log_message, | |
| FRAMES_OUTPUT_FOLDER | |
| ) | |
| # FastAPI App Definition | |
| app = FastAPI(title="Video Analysis API", | |
| description="API to access video frame analysis results and extracted images", | |
| version="1.0.0") | |
| # Add CORS middleware to allow cross-origin requests | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # Allows all origins | |
| allow_credentials=True, | |
| allow_methods=["*"], # Allows all methods | |
| allow_headers=["*"], | |
| ) | |
| # Global variables for processing and frame tracking | |
| processing_thread = None | |
| frame_locks = {} # Dict to track frame locks: {course: {frame: {"locked_by": id, "locked_at": timestamp}}} | |
| processed_frames = {} # Dict to track processed frames: {course: {frame: {"processed_by": id, "processed_at": timestamp}}} | |
| LOCK_TIMEOUT = 300 # 5 minutes timeout for locks | |
| TRACKING_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "frame_tracking.json") | |
| def save_tracking_state(): | |
| """Save frame tracking state to disk""" | |
| state = { | |
| "frame_locks": frame_locks, | |
| "processed_frames": processed_frames | |
| } | |
| try: | |
| with open(TRACKING_FILE, "w") as f: | |
| json.dump(state, f, indent=2) | |
| except Exception as e: | |
| log_message(f"Error saving tracking state: {e}") | |
| def load_tracking_state(): | |
| """Load frame tracking state from disk""" | |
| global frame_locks, processed_frames | |
| try: | |
| with open(TRACKING_FILE, "r") as f: | |
| state = json.load(f) | |
| frame_locks = state.get("frame_locks", {}) | |
| processed_frames = state.get("processed_frames", {}) | |
| except FileNotFoundError: | |
| log_message("No previous tracking state found") | |
| except Exception as e: | |
| log_message(f"Error loading tracking state: {e}") | |
| def check_frame_lock(course: str, frame: str) -> bool: | |
| """Check if frame is locked and lock hasn't expired""" | |
| if course in frame_locks and frame in frame_locks[course]: | |
| lock = frame_locks[course][frame] | |
| if time.time() - lock["locked_at"] < LOCK_TIMEOUT: | |
| return True | |
| # Lock expired, remove it | |
| del frame_locks[course][frame] | |
| save_tracking_state() | |
| return False | |
| def lock_frame(course: str, frame: str, requester_id: str) -> bool: | |
| """Attempt to lock a frame for processing""" | |
| if check_frame_lock(course, frame): | |
| return False | |
| if course not in frame_locks: | |
| frame_locks[course] = {} | |
| frame_locks[course][frame] = { | |
| "locked_by": requester_id, | |
| "locked_at": time.time() | |
| } | |
| save_tracking_state() | |
| return True | |
| def mark_frame_processed(course: str, frame: str, requester_id: str): | |
| """Mark a frame as successfully processed""" | |
| if course not in processed_frames: | |
| processed_frames[course] = {} | |
| processed_frames[course][frame] = { | |
| "processed_by": requester_id, | |
| "processed_at": time.time() | |
| } | |
| # Remove the lock if it exists | |
| if course in frame_locks and frame in frame_locks[course]: | |
| del frame_locks[course][frame] | |
| save_tracking_state() | |
| def log_message(message): | |
| """Add a log message with timestamp""" | |
| timestamp = datetime.now().strftime("%H:%M:%S") | |
| log_entry = f"[{timestamp}] {message}" | |
| processing_status["logs"].append(log_entry) | |
| # Keep only the last 100 logs | |
| if len(processing_status["logs"]) > 100: | |
| processing_status["logs"] = processing_status["logs"][-100:] | |
| print(log_entry) | |
| async def startup_event(): | |
| """Initialize frame tracking and start processing loop""" | |
| # Load frame tracking state | |
| load_tracking_state() | |
| log_message("✓ Loaded frame tracking state") | |
| # Start processing thread | |
| global processing_thread | |
| if not (processing_thread and processing_thread.is_alive()): | |
| log_message("🚀 Starting RAR extraction, frame extraction, and vision analysis pipeline in background...") | |
| processing_thread = threading.Thread(target=main_processing_loop) | |
| processing_thread.daemon = True | |
| processing_thread.start() | |
| async def root(): | |
| """Root endpoint that returns basic info""" | |
| return { | |
| "message": "Video Analysis API", | |
| "status": "running", | |
| "endpoints": { | |
| "/status": "Get processing status", | |
| "/courses": "List all available course folders", | |
| "/images/{course_folder}": "List images in a course folder", | |
| "/images/{course_folder}/{frame_filename}": "Get specific frame image", | |
| "/start-processing": "Start processing pipeline", | |
| "/stop-processing": "Stop processing pipeline" | |
| } | |
| } | |
| async def get_status(): | |
| """Get current processing status""" | |
| return { | |
| "processing_status": processing_status, | |
| "frames_folder": FRAMES_OUTPUT_FOLDER, | |
| "frames_folder_exists": os.path.exists(FRAMES_OUTPUT_FOLDER) | |
| } | |
| # ===== NEW IMAGE SERVING ENDPOINTS ===== | |
| async def get_next_course(requester_id: str): | |
| """Get next available course for processing""" | |
| if not os.path.exists(FRAMES_OUTPUT_FOLDER): | |
| raise HTTPException(status_code=404, detail="No courses available") | |
| # Load latest state | |
| load_tracking_state() | |
| # Find a course with unprocessed frames | |
| for folder in os.listdir(FRAMES_OUTPUT_FOLDER): | |
| folder_path = os.path.join(FRAMES_OUTPUT_FOLDER, folder) | |
| if not os.path.isdir(folder_path): | |
| continue | |
| # Check if course has any unprocessed frames | |
| image_files = [f for f in os.listdir(folder_path) | |
| if f.lower().endswith(('.png', '.jpg', '.jpeg'))] | |
| for image in image_files: | |
| if (folder not in processed_frames or | |
| image not in processed_frames[folder]): | |
| return {"course": folder} | |
| raise HTTPException(status_code=404, detail="No courses with unprocessed frames") | |
| async def get_next_image(course_folder: str, requester_id: str): | |
| """Get next available image from a course""" | |
| folder_path = os.path.join(FRAMES_OUTPUT_FOLDER, course_folder) | |
| if not os.path.exists(folder_path): | |
| raise HTTPException(status_code=404, detail=f"Course not found: {course_folder}") | |
| # Load latest state | |
| load_tracking_state() | |
| # Find first unprocessed and unlocked frame | |
| for file in sorted(os.listdir(folder_path)): | |
| if not file.lower().endswith(('.png', '.jpg', '.jpeg')): | |
| continue | |
| # Skip if processed | |
| if (course_folder in processed_frames and | |
| file in processed_frames[course_folder]): | |
| continue | |
| # Skip if locked by another requester | |
| if check_frame_lock(course_folder, file): | |
| continue | |
| # Try to lock the frame | |
| if lock_frame(course_folder, file, requester_id): | |
| file_path = os.path.join(folder_path, file) | |
| file_stats = os.stat(file_path) | |
| return { | |
| "file_id": f"frame:{course_folder}/{file}", | |
| "frame": file, | |
| "video": os.path.splitext(file)[0], | |
| "size_bytes": file_stats.st_size, | |
| "modified_time": time.ctime(file_stats.st_mtime), | |
| "url": f"/images/{course_folder}/{file}" | |
| } | |
| raise HTTPException(status_code=404, detail="No available frames in course") | |
| async def release_frame(course_folder: str, video: str, frame: str, requester_id: str): | |
| """Release a frame lock""" | |
| if course_folder in frame_locks and frame in frame_locks[course_folder]: | |
| lock = frame_locks[course_folder][frame] | |
| if lock["locked_by"] == requester_id: | |
| del frame_locks[course_folder][frame] | |
| save_tracking_state() | |
| return {"status": "released"} | |
| return {"status": "not_found"} | |
| async def release_course(course_folder: str, requester_id: str): | |
| """Release all frame locks for a course""" | |
| if course_folder in frame_locks: | |
| # Only release frames locked by this requester | |
| frames_to_release = [ | |
| frame for frame, lock in frame_locks[course_folder].items() | |
| if lock["locked_by"] == requester_id | |
| ] | |
| for frame in frames_to_release: | |
| del frame_locks[course_folder][frame] | |
| save_tracking_state() | |
| return {"status": "released"} | |
| async def get_frame_image(course_folder: str, frame_filename: str, requester_id: str = None): | |
| """ | |
| Serve extracted frame images from course folders with locking | |
| Args: | |
| course_folder: The course folder name (e.g., "course1_video1_mp4_frames") | |
| frame_filename: The frame file name (e.g., "0001.png") | |
| requester_id: Optional requester ID for frame locking | |
| """ | |
| # Load latest state | |
| load_tracking_state() | |
| # Construct the full path to the image | |
| image_path = os.path.join(FRAMES_OUTPUT_FOLDER, course_folder, frame_filename) | |
| # Check if file exists | |
| if not os.path.exists(image_path): | |
| raise HTTPException(status_code=404, detail=f"Image not found: {course_folder}/{frame_filename}") | |
| # Verify it's an image file | |
| if not frame_filename.lower().endswith(('.png', '.jpg', '.jpeg')): | |
| raise HTTPException(status_code=400, detail="File must be an image (PNG, JPG, JPEG)") | |
| # If requester_id provided, verify frame lock | |
| if requester_id: | |
| if check_frame_lock(course_folder, frame_filename): | |
| lock = frame_locks[course_folder][frame_filename] | |
| if lock["locked_by"] != requester_id: | |
| raise HTTPException(status_code=423, detail="Frame is locked by another requester") | |
| # Return the image file | |
| return FileResponse(image_path) | |
| async def list_course_images(course_folder: str): | |
| """ | |
| List all available images in a specific course folder | |
| Args: | |
| course_folder: The course folder name | |
| """ | |
| folder_path = os.path.join(FRAMES_OUTPUT_FOLDER, course_folder) | |
| if not os.path.exists(folder_path): | |
| raise HTTPException(status_code=404, detail=f"Course folder not found: {course_folder}") | |
| # Get all image files | |
| image_files = [] | |
| for file in os.listdir(folder_path): | |
| if file.lower().endswith(('.png', '.jpg', '.jpeg')): | |
| file_path = os.path.join(folder_path, file) | |
| file_stats = os.stat(file_path) | |
| image_files.append({ | |
| "filename": file, | |
| "size_bytes": file_stats.st_size, | |
| "modified_time": time.ctime(file_stats.st_mtime), | |
| "url": f"/images/{course_folder}/{file}" | |
| }) | |
| return { | |
| "course_folder": course_folder, | |
| "total_images": len(image_files), | |
| "images": image_files | |
| } | |
| async def list_all_courses(): | |
| """ | |
| List all available course folders with their image counts | |
| """ | |
| if not os.path.exists(FRAMES_OUTPUT_FOLDER): | |
| return {"courses": [], "message": "Frames output folder does not exist yet"} | |
| courses = [] | |
| for folder in os.listdir(FRAMES_OUTPUT_FOLDER): | |
| folder_path = os.path.join(FRAMES_OUTPUT_FOLDER, folder) | |
| if os.path.isdir(folder_path): | |
| # Count image files | |
| image_count = len([f for f in os.listdir(folder_path) | |
| if f.lower().endswith(('.png', '.jpg', '.jpeg'))]) | |
| courses.append({ | |
| "course_folder": folder, | |
| "image_count": image_count, | |
| "images_url": f"/images/{folder}", | |
| "sample_image_url": f"/images/{folder}/0001.png" if image_count > 0 else None | |
| }) | |
| return { | |
| "total_courses": len(courses), | |
| "courses": courses | |
| } | |
| # Signal handlers to prevent accidental shutdown | |
| def handle_shutdown(signum, frame): | |
| """Prevent shutdown on SIGTERM/SIGINT""" | |
| print(f"\n⚠️ Received signal {signum}. Server will continue running.") | |
| print("Use Ctrl+Break or kill -9 to force stop.") | |
| # Setup signal handlers for graceful shutdown prevention | |
| import signal | |
| signal.signal(signal.SIGINT, handle_shutdown) | |
| signal.signal(signal.SIGTERM, handle_shutdown) | |
| # Server lifecycle events | |
| async def shutdown_event(): | |
| """Save state on shutdown attempt""" | |
| save_tracking_state() | |
| print("💾 Saved tracking state") | |
| print("⚠️ Server shutdown prevented - use Ctrl+Break or kill -9 to force stop") | |
| # Prevent shutdown by not returning | |
| while True: | |
| await asyncio.sleep(1) | |
| if __name__ == "__main__": | |
| # Start the FastAPI server | |
| print("🚀 Starting Video Analysis FastAPI Server (Persistent Mode)...") | |
| print("API Documentation will be available at: http://localhost:8000/docs") | |
| print("API Root endpoint: http://localhost:8000/") | |
| print("⚠️ Server will continue running even after processing completes") | |
| print("Use Ctrl+Break or kill -9 to force stop") | |
| # Ensure the analysis output folder exists | |
| os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True) | |
| # Start processing in thread instead of blocking | |
| processing_thread = threading.Thread(target=main_processing_loop) | |
| processing_thread.daemon = False # Make non-daemon so it doesn't exit | |
| processing_thread.start() | |
| # Configure uvicorn for persistent running | |
| config = uvicorn.Config( | |
| app=app, | |
| host="0.0.0.0", | |
| port=8000, | |
| log_level="info", | |
| reload=False, | |
| workers=1, | |
| loop="asyncio", | |
| timeout_keep_alive=600, # Keep connections alive longer | |
| access_log=True | |
| ) | |
| # Run server with persistent config | |
| server = uvicorn.Server(config) | |
| server.run() | |