| import os
|
| import json
|
| import time
|
| import threading
|
| import asyncio
|
| from fastapi import FastAPI, HTTPException, BackgroundTasks
|
| from fastapi.middleware.cors import CORSMiddleware
|
| from fastapi.responses import JSONResponse, FileResponse
|
| from fastapi.staticfiles import StaticFiles
|
| import uvicorn
|
| from typing import Dict
|
| from pathlib import Path
|
| import subprocess
|
| from datetime import datetime
|
|
|
| import torch
|
|
|
|
|
| from vision_analyzer import (
|
| main_processing_loop,
|
| processing_status,
|
| log_message,
|
| FRAMES_OUTPUT_FOLDER
|
| )
|
|
|
|
|
| app = FastAPI(title="Video Analysis API",
|
| description="API to access video frame analysis results and extracted images",
|
| version="1.0.0")
|
|
|
|
|
| app.add_middleware(
|
| CORSMiddleware,
|
| allow_origins=["*"],
|
| allow_credentials=True,
|
| allow_methods=["*"],
|
| allow_headers=["*"],
|
| )
|
|
|
|
|
| processing_thread = None
|
| frame_locks = {}
|
| processed_frames = {}
|
| LOCK_TIMEOUT = 300
|
| TRACKING_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "frame_tracking.json")
|
|
|
| def save_tracking_state():
|
| """Save frame tracking state to disk"""
|
| state = {
|
| "frame_locks": frame_locks,
|
| "processed_frames": processed_frames
|
| }
|
| try:
|
| with open(TRACKING_FILE, "w") as f:
|
| json.dump(state, f, indent=2)
|
| except Exception as e:
|
| log_message(f"Error saving tracking state: {e}")
|
|
|
| def load_tracking_state():
|
| """Load frame tracking state from disk"""
|
| global frame_locks, processed_frames
|
| try:
|
| with open(TRACKING_FILE, "r") as f:
|
| state = json.load(f)
|
| frame_locks = state.get("frame_locks", {})
|
| processed_frames = state.get("processed_frames", {})
|
| except FileNotFoundError:
|
| log_message("No previous tracking state found")
|
| except Exception as e:
|
| log_message(f"Error loading tracking state: {e}")
|
|
|
| def check_frame_lock(course: str, frame: str) -> bool:
|
| """Check if frame is locked and lock hasn't expired"""
|
| if course in frame_locks and frame in frame_locks[course]:
|
| lock = frame_locks[course][frame]
|
| if time.time() - lock["locked_at"] < LOCK_TIMEOUT:
|
| return True
|
|
|
| del frame_locks[course][frame]
|
| save_tracking_state()
|
| return False
|
|
|
| def lock_frame(course: str, frame: str, requester_id: str) -> bool:
|
| """Attempt to lock a frame for processing"""
|
| if check_frame_lock(course, frame):
|
| return False
|
|
|
| if course not in frame_locks:
|
| frame_locks[course] = {}
|
|
|
| frame_locks[course][frame] = {
|
| "locked_by": requester_id,
|
| "locked_at": time.time()
|
| }
|
| save_tracking_state()
|
| return True
|
|
|
| def mark_frame_processed(course: str, frame: str, requester_id: str):
|
| """Mark a frame as successfully processed"""
|
| if course not in processed_frames:
|
| processed_frames[course] = {}
|
|
|
| processed_frames[course][frame] = {
|
| "processed_by": requester_id,
|
| "processed_at": time.time()
|
| }
|
|
|
|
|
| if course in frame_locks and frame in frame_locks[course]:
|
| del frame_locks[course][frame]
|
|
|
| save_tracking_state()
|
|
|
| def log_message(message):
|
| """Add a log message with timestamp"""
|
| timestamp = datetime.now().strftime("%H:%M:%S")
|
| log_entry = f"[{timestamp}] {message}"
|
| processing_status["logs"].append(log_entry)
|
|
|
|
|
| if len(processing_status["logs"]) > 100:
|
| processing_status["logs"] = processing_status["logs"][-100:]
|
|
|
| print(log_entry)
|
|
|
| @app.on_event("startup")
|
| async def startup_event():
|
| """Initialize frame tracking and start processing loop"""
|
|
|
| load_tracking_state()
|
| log_message("✓ Loaded frame tracking state")
|
|
|
|
|
| global processing_thread
|
| if not (processing_thread and processing_thread.is_alive()):
|
| log_message("🚀 Starting RAR extraction, frame extraction, and vision analysis pipeline in background...")
|
| processing_thread = threading.Thread(target=main_processing_loop)
|
| processing_thread.daemon = True
|
| processing_thread.start()
|
|
|
| @app.get("/")
|
| async def root():
|
| """Root endpoint that returns basic info"""
|
| return {
|
| "message": "Video Analysis API",
|
| "status": "running",
|
| "endpoints": {
|
| "/status": "Get processing status",
|
| "/courses": "List all available course folders",
|
| "/images/{course_folder}": "List images in a course folder",
|
| "/images/{course_folder}/{frame_filename}": "Get specific frame image",
|
| "/start-processing": "Start processing pipeline",
|
| "/stop-processing": "Stop processing pipeline"
|
| }
|
| }
|
|
|
| @app.get("/status")
|
| async def get_status():
|
| """Get current processing status"""
|
| return {
|
| "processing_status": processing_status,
|
| "frames_folder": FRAMES_OUTPUT_FOLDER,
|
| "frames_folder_exists": os.path.exists(FRAMES_OUTPUT_FOLDER)
|
| }
|
|
|
|
|
|
|
| @app.get("/middleware/next/course")
|
| async def get_next_course(requester_id: str):
|
| """Get next available course for processing"""
|
| if not os.path.exists(FRAMES_OUTPUT_FOLDER):
|
| raise HTTPException(status_code=404, detail="No courses available")
|
|
|
|
|
| load_tracking_state()
|
|
|
|
|
| for folder in os.listdir(FRAMES_OUTPUT_FOLDER):
|
| folder_path = os.path.join(FRAMES_OUTPUT_FOLDER, folder)
|
| if not os.path.isdir(folder_path):
|
| continue
|
|
|
|
|
| image_files = [f for f in os.listdir(folder_path)
|
| if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
|
|
|
| for image in image_files:
|
| if (folder not in processed_frames or
|
| image not in processed_frames[folder]):
|
| return {"course": folder}
|
|
|
| raise HTTPException(status_code=404, detail="No courses with unprocessed frames")
|
|
|
| @app.get("/middleware/next/image/{course_folder}")
|
| async def get_next_image(course_folder: str, requester_id: str):
|
| """Get next available image from a course"""
|
| folder_path = os.path.join(FRAMES_OUTPUT_FOLDER, course_folder)
|
|
|
| if not os.path.exists(folder_path):
|
| raise HTTPException(status_code=404, detail=f"Course not found: {course_folder}")
|
|
|
|
|
| load_tracking_state()
|
|
|
|
|
| for file in sorted(os.listdir(folder_path)):
|
| if not file.lower().endswith(('.png', '.jpg', '.jpeg')):
|
| continue
|
|
|
|
|
| if (course_folder in processed_frames and
|
| file in processed_frames[course_folder]):
|
| continue
|
|
|
|
|
| if check_frame_lock(course_folder, file):
|
| continue
|
|
|
|
|
| if lock_frame(course_folder, file, requester_id):
|
| file_path = os.path.join(folder_path, file)
|
| file_stats = os.stat(file_path)
|
| return {
|
| "file_id": f"frame:{course_folder}/{file}",
|
| "frame": file,
|
| "video": os.path.splitext(file)[0],
|
| "size_bytes": file_stats.st_size,
|
| "modified_time": time.ctime(file_stats.st_mtime),
|
| "url": f"/images/{course_folder}/{file}"
|
| }
|
|
|
| raise HTTPException(status_code=404, detail="No available frames in course")
|
|
|
| @app.post("/middleware/release/frame/{course_folder}/{video}/{frame}")
|
| async def release_frame(course_folder: str, video: str, frame: str, requester_id: str):
|
| """Release a frame lock"""
|
| if course_folder in frame_locks and frame in frame_locks[course_folder]:
|
| lock = frame_locks[course_folder][frame]
|
| if lock["locked_by"] == requester_id:
|
| del frame_locks[course_folder][frame]
|
| save_tracking_state()
|
| return {"status": "released"}
|
| return {"status": "not_found"}
|
|
|
| @app.post("/middleware/release/course/{course_folder}")
|
| async def release_course(course_folder: str, requester_id: str):
|
| """Release all frame locks for a course"""
|
| if course_folder in frame_locks:
|
|
|
| frames_to_release = [
|
| frame for frame, lock in frame_locks[course_folder].items()
|
| if lock["locked_by"] == requester_id
|
| ]
|
| for frame in frames_to_release:
|
| del frame_locks[course_folder][frame]
|
| save_tracking_state()
|
| return {"status": "released"}
|
|
|
| @app.get("/images/{course_folder}/{frame_filename}")
|
| async def get_frame_image(course_folder: str, frame_filename: str, requester_id: str = None):
|
| """
|
| Serve extracted frame images from course folders with locking
|
|
|
| Args:
|
| course_folder: The course folder name (e.g., "course1_video1_mp4_frames")
|
| frame_filename: The frame file name (e.g., "0001.png")
|
| requester_id: Optional requester ID for frame locking
|
| """
|
|
|
| load_tracking_state()
|
|
|
|
|
| image_path = os.path.join(FRAMES_OUTPUT_FOLDER, course_folder, frame_filename)
|
|
|
|
|
| if not os.path.exists(image_path):
|
| raise HTTPException(status_code=404, detail=f"Image not found: {course_folder}/{frame_filename}")
|
|
|
|
|
| if not frame_filename.lower().endswith(('.png', '.jpg', '.jpeg')):
|
| raise HTTPException(status_code=400, detail="File must be an image (PNG, JPG, JPEG)")
|
|
|
|
|
| if requester_id:
|
| if check_frame_lock(course_folder, frame_filename):
|
| lock = frame_locks[course_folder][frame_filename]
|
| if lock["locked_by"] != requester_id:
|
| raise HTTPException(status_code=423, detail="Frame is locked by another requester")
|
|
|
|
|
| return FileResponse(image_path)
|
|
|
| @app.get("/images/{course_folder}")
|
| async def list_course_images(course_folder: str):
|
| """
|
| List all available images in a specific course folder
|
|
|
| Args:
|
| course_folder: The course folder name
|
| """
|
| folder_path = os.path.join(FRAMES_OUTPUT_FOLDER, course_folder)
|
|
|
| if not os.path.exists(folder_path):
|
| raise HTTPException(status_code=404, detail=f"Course folder not found: {course_folder}")
|
|
|
|
|
| image_files = []
|
| for file in os.listdir(folder_path):
|
| if file.lower().endswith(('.png', '.jpg', '.jpeg')):
|
| file_path = os.path.join(folder_path, file)
|
| file_stats = os.stat(file_path)
|
| image_files.append({
|
| "filename": file,
|
| "size_bytes": file_stats.st_size,
|
| "modified_time": time.ctime(file_stats.st_mtime),
|
| "url": f"/images/{course_folder}/{file}"
|
| })
|
|
|
| return {
|
| "course_folder": course_folder,
|
| "total_images": len(image_files),
|
| "images": image_files
|
| }
|
|
|
| @app.get("/courses")
|
| async def list_all_courses():
|
| """
|
| List all available course folders with their image counts
|
| """
|
| if not os.path.exists(FRAMES_OUTPUT_FOLDER):
|
| return {"courses": [], "message": "Frames output folder does not exist yet"}
|
|
|
| courses = []
|
| for folder in os.listdir(FRAMES_OUTPUT_FOLDER):
|
| folder_path = os.path.join(FRAMES_OUTPUT_FOLDER, folder)
|
| if os.path.isdir(folder_path):
|
|
|
| image_count = len([f for f in os.listdir(folder_path)
|
| if f.lower().endswith(('.png', '.jpg', '.jpeg'))])
|
| courses.append({
|
| "course_folder": folder,
|
| "image_count": image_count,
|
| "images_url": f"/images/{folder}",
|
| "sample_image_url": f"/images/{folder}/0001.png" if image_count > 0 else None
|
| })
|
|
|
| return {
|
| "total_courses": len(courses),
|
| "courses": courses
|
| }
|
|
|
|
|
|
|
| def handle_shutdown(signum, frame):
|
| """Prevent shutdown on SIGTERM/SIGINT"""
|
| print(f"\n⚠️ Received signal {signum}. Server will continue running.")
|
| print("Use Ctrl+Break or kill -9 to force stop.")
|
|
|
|
|
| import signal
|
| signal.signal(signal.SIGINT, handle_shutdown)
|
| signal.signal(signal.SIGTERM, handle_shutdown)
|
|
|
|
|
| @app.on_event("shutdown")
|
| async def shutdown_event():
|
| """Save state on shutdown attempt"""
|
| save_tracking_state()
|
| print("💾 Saved tracking state")
|
| print("⚠️ Server shutdown prevented - use Ctrl+Break or kill -9 to force stop")
|
|
|
| while True:
|
| await asyncio.sleep(1)
|
|
|
| if __name__ == "__main__":
|
|
|
| print("🚀 Starting Video Analysis FastAPI Server (Persistent Mode)...")
|
| print("API Documentation will be available at: http://localhost:8000/docs")
|
| print("API Root endpoint: http://localhost:8000/")
|
| print("⚠️ Server will continue running even after processing completes")
|
| print("Use Ctrl+Break or kill -9 to force stop")
|
|
|
|
|
| os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
|
|
|
|
|
| processing_thread = threading.Thread(target=main_processing_loop)
|
| processing_thread.daemon = False
|
| processing_thread.start()
|
|
|
|
|
| config = uvicorn.Config(
|
| app=app,
|
| host="0.0.0.0",
|
| port=8000,
|
| log_level="info",
|
| reload=False,
|
| workers=1,
|
| loop="asyncio",
|
| timeout_keep_alive=600,
|
| access_log=True
|
| )
|
|
|
|
|
| server = uvicorn.Server(config)
|
| server.run()
|
|
|
|
|