"""Course-archive processing service.

Two entry points share the helpers in this module:

* ``main_processing_loop`` downloads one RAR archive per run from a Hugging
  Face dataset repository, extracts it, samples frames from every video it
  contains and runs each frame through the Florence-2 vision model.
* The FastAPI ``app`` exposes the same frame extraction / analysis for
  uploaded videos and archives, plus health and status endpoints.

Importing this module has side effects: the working folders are created and
the Florence-2 model is loaded (on CPU).
"""

import json
import os
import re
import shutil
import smtplib
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from email.message import EmailMessage
from pathlib import Path
from typing import Dict, List, Optional, Set
from urllib.parse import quote

import cv2
import numpy as np
import rarfile
import requests
import torch
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import JSONResponse
from huggingface_hub import HfApi, list_repo_files
from PIL import Image
from transformers import AutoModelForCausalLM, AutoProcessor

# Initialize FastAPI
app = FastAPI()

# ==== CONFIGURATION ====
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")

# Path Configuration
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"
ANALYSIS_OUTPUT_FOLDER = "analysis_results"

os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ANALYSIS_OUTPUT_FOLDER, exist_ok=True)

# State Files
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"

# Processing Parameters
CHUNK_SIZE = 1
PROCESSING_DELAY = 2
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2  # Minimum free space in GB before processing

# Frame Extraction Parameters
DEFAULT_FPS = 3  # Default frames per second for extraction

# File-type filters shared by the batch pipeline and the HTTP endpoints.
VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".mkv")
ARCHIVE_EXTENSIONS = (".rar", ".zip")

# (connect, read) timeouts in seconds so a stalled download cannot hang forever.
REQUEST_TIMEOUT = (10, 120)
DOWNLOAD_CHUNK_BYTES = 1024 * 1024

_BYTES_PER_GB = 1024 ** 3

# Matches the ".partNN.rar" suffix of a multi-part RAR volume (any case).
_RAR_PART_RE = re.compile(r"\.part(\d+)\.rar$", re.IGNORECASE)

# Initialize HF API (an empty token is passed as None, i.e. "no token")
hf_api = HfApi(token=HF_TOKEN or None)

# Global State
processing_status = {
    "is_running": False,
    "current_file": None,
    "total_files": 0,
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,
    "extracted_videos": 0,
    "extracted_frames_count": 0,
    "analyzed_frames_count": 0,
    "last_update": None,
    "logs": [],
}

FLORENCE_MODEL_ID = "microsoft/Florence-2-Base"
device = "cpu"  # Explicitly ensure CPU usage

# NOTE: the "_large" suffix in these names is historical; the checkpoint that
# is actually loaded is the Base model named in FLORENCE_MODEL_ID.
try:
    vision_language_model_large = AutoModelForCausalLM.from_pretrained(
        FLORENCE_MODEL_ID,
        trust_remote_code=True,
    ).to(device).eval()
    vision_language_processor_large = AutoProcessor.from_pretrained(
        FLORENCE_MODEL_ID,
        trust_remote_code=True,
    )
    print("Florence-2 base model and processor loaded successfully on CPU.")
except Exception as e:
    # Keep the module importable; analysis calls report the missing model.
    print(f"Error loading Florence-2 model on CPU: {e}")
    print("Please ensure you have enough RAM and a compatible PyTorch version.")
    vision_language_model_large = None
    vision_language_processor_large = None


def log_message(message: str):
    """Print a timestamped message and record it in ``processing_status``.

    Only the 100 most recent entries are kept in ``processing_status["logs"]``.
    """
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}"
    print(log_entry)
    processing_status["logs"].append(log_entry)
    processing_status["last_update"] = timestamp
    if len(processing_status["logs"]) > 100:
        processing_status["logs"] = processing_status["logs"][-100:]


def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record to ``FAILED_FILES_LOG``."""
    with open(FAILED_FILES_LOG, "a", encoding="utf-8") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")


def get_disk_usage(path: str) -> Dict[str, float]:
    """Return ``total``, ``free`` and ``used`` space in GB for *path*.

    ``free`` is the space available to the current user and ``used`` is
    ``total - free``.
    """
    usage = shutil.disk_usage(path)
    total = usage.total / _BYTES_PER_GB
    free = usage.free / _BYTES_PER_GB
    return {"total": total, "free": free, "used": total - free}


def check_disk_space(path: str = ".") -> bool:
    """Return True when at least ``MIN_FREE_SPACE_GB`` is free at *path*."""
    disk_info = get_disk_usage(path)
    if disk_info["free"] < MIN_FREE_SPACE_GB:
        log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
        return False
    return True


def _remove_file(path: str) -> bool:
    """Delete *path*; return True on success, log (not raise) on failure."""
    try:
        os.remove(path)
    except FileNotFoundError:
        return False
    except OSError as e:
        log_message(f"[WARN] Could not remove {path}: {e}")
        return False
    return True


def cleanup_temp_files():
    """Delete downloaded archives except the one currently being processed."""
    log_message("🧹 Cleaning up temporary files...")

    # Clean old downloads (keep only current processing file)
    current_file = processing_status.get("current_file")
    for name in os.listdir(DOWNLOAD_FOLDER):
        if name == current_file or not name.endswith(ARCHIVE_EXTENSIONS):
            continue
        if _remove_file(os.path.join(DOWNLOAD_FOLDER, name)):
            log_message(f"🗑️ Removed old download: {name}")


def load_json_state(file_path: str, default_value):
    """Load JSON state from *file_path*.

    Returns *default_value* when the file does not exist or is not valid JSON.
    """
    if os.path.exists(file_path):
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except json.JSONDecodeError:
            log_message(f"⚠️ Corrupted state file: {file_path}")
    return default_value


def save_json_state(file_path: str, data):
    """Write *data* to *file_path* as JSON.

    The content is written to a sibling temporary file and then renamed over
    the target, so an interrupted write cannot leave a truncated state file.
    """
    tmp_path = f"{file_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, file_path)


def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Download *url* to *dest_path*, retrying with exponential backoff.

    Returns False without downloading when disk space is insufficient (also
    after a cleanup attempt) or when the advertised size would not leave a
    0.5 GB buffer. A partially written file is removed after a failed attempt.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("❌ Insufficient disk space even after cleanup")
            return False

    # Only send credentials when a token is actually configured.
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

    for attempt in range(max_retries):
        try:
            with requests.get(url, headers=headers, stream=True, timeout=REQUEST_TIMEOUT) as r:
                r.raise_for_status()

                # Check content length if available
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / _BYTES_PER_GB
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:  # Leave 0.5GB buffer
                        log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False

                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                        if chunk:
                            f.write(chunk)
            return True
        except Exception as e:
            # Never leave a truncated archive behind for later steps to trip on.
            _remove_file(dest_path)
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            log_message(f"❌ Download failed after {max_retries} attempts: {e}")
            return False
    return False


def is_multipart_rar(filename: str) -> bool:
    """Return True when *filename* ends with ``.part<digits>.rar`` (any case)."""
    return _RAR_PART_RE.search(filename) is not None


def get_rar_part_base(filename: str) -> str:
    """Return *filename* without its ``.partNN.rar`` or ``.rar`` suffix.

    Names that do not end in ``.rar`` are returned unchanged.
    """
    match = _RAR_PART_RE.search(filename)
    if match:
        return filename[:match.start()]
    if filename.lower().endswith(".rar"):
        return filename[:-len(".rar")]
    return filename


def _first_rar_part_name(filename: str) -> str:
    """Return the name of volume 1 for a multi-part RAR *filename*.

    The digit width of the given part is preserved, so ``x.part3.rar`` maps to
    ``x.part1.rar`` and ``x.part03.rar`` to ``x.part01.rar``.
    """
    match = _RAR_PART_RE.search(filename)
    if match is None:
        return filename
    width = len(match.group(1))
    return filename[:match.start(1)] + "1".zfill(width) + filename[match.end(1):]


def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Extract a RAR archive with the ``unrar`` command-line tool.

    For a multi-part archive the extraction is started from its first volume,
    which must exist next to *rar_path*. Each attempt first runs ``unrar t``;
    attempts after the first add ``-kb`` to keep broken files.

    Returns True on success and False when every attempt fails.
    """
    filename = os.path.basename(rar_path)

    # For multi-part RARs, we need the first part
    if is_multipart_rar(filename):
        first_part = _first_rar_part_name(filename)
        first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
        if not os.path.exists(first_part_path):
            log_message(f"⚠️ Multi-part RAR detected but first part not found: {first_part}")
            return False
        rar_path = first_part_path
        log_message(f"📦 Processing multi-part RAR starting with: {first_part}")

    # unrar's usage text shows the destination with a trailing path separator;
    # appending one makes the argument unambiguous as a directory.
    destination = os.path.join(output_dir, "")

    for attempt in range(max_retries):
        try:
            # Test RAR first
            test_cmd = ["unrar", "t", rar_path]
            test_result = subprocess.run(test_cmd, capture_output=True, text=True)
            if test_result.returncode != 0:
                log_message(f"⚠️ RAR test failed: {test_result.stderr}")
                if attempt == max_retries - 1:
                    return False
                continue

            # Extract RAR
            cmd = ["unrar", "x", "-o+", rar_path, destination]
            if attempt > 0:  # Try recovery on subsequent attempts
                cmd.insert(2, "-kb")

            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                log_message(f"✅ Successfully extracted: {os.path.basename(rar_path)}")
                return True

            error_msg = result.stderr or result.stdout
            log_message(f"⚠️ Extraction attempt {attempt + 1} failed: {error_msg}")
            if "checksum error" in error_msg.lower() or "CRC failed" in error_msg:
                log_message(f"⚠️ Data corruption detected, attempt {attempt + 1}")
            elif result.returncode == 10:
                log_message("⚠️ No files to extract (exit code 10)")
                return False
            elif result.returncode == 1:
                log_message("⚠️ Non-fatal error (exit code 1)")
        except Exception as e:
            log_message(f"❌ Extraction exception: {str(e)}")
            if attempt == max_retries - 1:
                return False
        time.sleep(1)
    return False


def ensure_dir(path):
    """Create *path* (and missing parents) if it does not exist yet."""
    os.makedirs(path, exist_ok=True)


def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
    """Save frames of *video_path* as numbered PNGs at roughly *fps* per second.

    Frames are written to *output_dir* as ``0001.png``, ``0002.png``, ...
    Every N-th frame is kept, where N is the video frame rate divided by *fps*
    (rounded, at least 1). A frame rate of 30 is assumed when the video does
    not report one.

    Returns the number of frames written; 0 when *fps* is not positive or the
    video cannot be opened.
    """
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    ensure_dir(output_dir)

    if fps is None or fps <= 0:
        log_message(f"[ERROR] Invalid fps value: {fps}")
        return 0

    cap = cv2.VideoCapture(str(video_path))
    saved_count = 0
    try:
        if not cap.isOpened():
            log_message(f"[ERROR] Failed to open video file: {video_path}")
            return 0

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        if not (video_fps and video_fps > 0):
            video_fps = 30  # fallback if FPS is not available
            log_message(f"[WARN] Using fallback FPS: {video_fps}")

        # Clamp to 1: a requested fps above the video's own rate would round
        # the interval to 0 and break the modulo below.
        frame_interval = max(1, int(round(video_fps / fps)))

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        log_message(f"[DEBUG] Total frames in video: {total_frames}")

        frame_idx = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                frame_name = f"{saved_count + 1:04d}.png"
                # imwrite reports failure through its return value.
                if cv2.imwrite(str(Path(output_dir) / frame_name), frame):
                    saved_count += 1
                else:
                    log_message(f"[WARN] Failed to write frame {frame_name}")
            frame_idx += 1
    finally:
        cap.release()

    log_message(f"Extracted {saved_count} frames from {video_path} to {output_dir}")
    return saved_count


def analyze_frame_with_florence2(image_path: str, prompt: Optional[str] = "") -> Dict:
    """Describe a single image with the Florence-2 model.

    Args:
        image_path: Path of the image file to analyse.
        prompt: Text passed to the processor; ``None`` is treated as ``""``.

    Returns:
        ``{"image": <file name>, "description": <model output>}``. When the
        model failed to load at import time, the description is an error
        marker instead.
    """
    if vision_language_model_large is None or vision_language_processor_large is None:
        return {
            "image": os.path.basename(image_path),
            "description": "[ERROR] Vision model not loaded.",
        }

    prompt = prompt or ""

    # convert() returns an independent image, so the file can be closed here.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert("RGB")

    inputs = vision_language_processor_large(
        images=image, text=prompt, return_tensors="pt"
    ).to(device)

    with torch.no_grad():
        generated_ids = vision_language_model_large.generate(
            input_ids=inputs["input_ids"],
            pixel_values=inputs["pixel_values"],
            max_new_tokens=512,
            do_sample=False,
            num_beams=3,
        )

    generated_text = vision_language_processor_large.batch_decode(
        generated_ids, skip_special_tokens=False
    )[0]

    # The result dict is keyed by the task string, which is "" here.
    description = vision_language_processor_large.post_process_generation(
        generated_text,
        task="",
        image_size=(image.width, image.height),
    )[""]

    return {
        "image": os.path.basename(image_path),
        "description": description,
    }


def summarize_activities(frame_analyses: List[Dict]) -> Dict:
    """Summarize activities from frame analyses.

    Placeholder: no summarisation is implemented yet, so this always returns
    an empty dict and ignores *frame_analyses*. An earlier draft sketched the
    keys ``steps``, ``high_level_goal``, ``creative_actions``, ``objects`` and
    ``final_goal``.
    """
    return {}


def _analyze_frame_files(frames_dir: Path, prompt: Optional[str]) -> List[Dict]:
    """Analyse every ``*.png`` in *frames_dir*, in sorted file-name order."""
    return [
        analyze_frame_with_florence2(str(frame_file), prompt)
        for frame_file in sorted(Path(frames_dir).glob("*.png"))
    ]


def analyze_frames(frames_dir: str, output_json_path: str, prompt: Optional[str] = None) -> int:
    """Analyse all PNG frames in *frames_dir* and save the results as JSON.

    The JSON document has the keys ``frame_analyses`` and ``summary``. A
    failure to write it is logged, not raised.

    Returns the number of frames analysed.
    """
    log_message(f"[INFO] Analyzing frames in {frames_dir}...")
    frames_dir = Path(frames_dir).resolve()
    output_json_path = Path(output_json_path).resolve()
    ensure_dir(frames_dir)
    ensure_dir(output_json_path.parent)

    frame_analyses = _analyze_frame_files(frames_dir, prompt)

    results = {
        "frame_analyses": frame_analyses,
        "summary": summarize_activities(frame_analyses),
    }

    try:
        with open(output_json_path, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        log_message(f"[SUCCESS] Analysis results saved to {output_json_path}")
    except Exception as e:
        log_message(f"[ERROR] Failed to write output JSON: {e}")

    return len(frame_analyses)


def process_rar_file(rar_path: str) -> bool:
    """Extract one RAR archive and analyse every video found in it.

    Updates the counters in ``processing_status`` along the way. Failures are
    logged and recorded via ``log_failed_file``.

    Returns True on success, False when extraction or processing raised.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename

    # Strips ".partNN.rar" for multi-part archives and ".rar" otherwise.
    course_name = get_rar_part_base(filename)
    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)

    try:
        log_message(f"🔄 Processing: {filename}")

        # Clean up any existing directory
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir, ignore_errors=True)

        # Extract RAR
        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")

        # Count extracted files
        file_count = 0
        video_files_found = []
        for root, _dirs, names in os.walk(extract_dir):
            for name in names:
                file_count += 1
                if name.lower().endswith(VIDEO_EXTENSIONS):
                    video_files_found.append(os.path.join(root, name))

        processing_status["extracted_courses"] += 1
        log_message(f"✅ Successfully extracted '{course_name}' ({file_count} files, {len(video_files_found)} videos)")

        # Process video files for frame extraction and vision analysis
        for video_path in video_files_found:
            video_filename = Path(video_path).name
            output_stem = f"{course_name}_{video_filename.replace('.', '_')}"

            # Create a unique output directory for frames for each video;
            # clear it first so frames from an earlier run are not re-analysed.
            frames_output_dir = os.path.join(FRAMES_OUTPUT_FOLDER, f"{output_stem}_frames")
            shutil.rmtree(frames_output_dir, ignore_errors=True)
            ensure_dir(frames_output_dir)

            extracted_frames_count = extract_frames(video_path, frames_output_dir, fps=DEFAULT_FPS)
            processing_status["extracted_frames_count"] += extracted_frames_count

            if extracted_frames_count > 0:
                processing_status["extracted_videos"] += 1
                log_message(f"[INFO] Extracted {extracted_frames_count} frames from {video_filename}")

                # Perform vision analysis on the extracted frames
                analysis_output_json = os.path.join(ANALYSIS_OUTPUT_FOLDER, f"{output_stem}_analysis.json")
                analyzed_frames = analyze_frames(frames_output_dir, analysis_output_json)
                processing_status["analyzed_frames_count"] += analyzed_frames
                log_message(f"[INFO] Analyzed {analyzed_frames} frames from {video_filename}")
            else:
                log_message(f"[WARN] No frames extracted from {video_filename}")

        return True

    except Exception as e:
        error_msg = str(e)
        log_message(f"❌ Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False

    finally:
        processing_status["current_file"] = None


def main_processing_loop(start_index: int = 0):
    """Download and process the next RAR archive of the source repository.

    Exactly one archive is handled per call. The index of the next archive is
    stored in ``DOWNLOAD_STATE_FILE`` and the names of finished archives in
    ``PROCESS_STATE_FILE``.

    Args:
        start_index: Index (into the sorted list of ``.rar`` files) to process.
            Values of 0 or less mean "continue from the saved state".
    """
    processing_status["is_running"] = True

    try:
        # Load state; .get() tolerates state files that lack the expected key.
        processed_rars = load_json_state(
            PROCESS_STATE_FILE, {"processed_rars": []}
        ).get("processed_rars", [])
        download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})

        # Use start_index if provided, otherwise use the saved state
        if start_index > 0:
            next_index = start_index
        else:
            next_index = download_state.get("next_download_index", 0)

        log_message(f"📊 Starting from index {next_index}")
        log_message(f"📊 Previously processed: {len(processed_rars)} files")

        # Get file list
        try:
            files = hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset")
            rar_files = sorted(f for f in files if f.endswith(".rar"))
        except Exception as e:
            log_message(f"❌ Failed to get file list: {str(e)}")
            return

        processing_status["total_files"] = len(rar_files)
        log_message(f"📁 Found {len(rar_files)} RAR files in repository")

        if next_index >= len(rar_files):
            log_message("✅ All files have been processed!")
            return

        # Process only one file per run
        rar_file = rar_files[next_index]
        filename = os.path.basename(rar_file)

        if filename in processed_rars:
            log_message(f"⏭️ Skipping already processed: {filename}")
            processing_status["processed_files"] += 1
            # Move to next file
            next_index += 1
            save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
            log_message(f"📊 Moving to next file. Progress: {next_index}/{len(rar_files)}")
            return

        log_message(f"📥 Downloading: {filename}")
        dest_path = os.path.join(DOWNLOAD_FOLDER, filename)

        # Quote the repo path so names containing '#', '?' or spaces survive.
        download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{quote(rar_file)}"

        if download_with_retry(download_url, dest_path):
            if process_rar_file(dest_path):
                processed_rars.append(filename)
                save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
                log_message(f"✅ Successfully processed: {filename}")
                processing_status["processed_files"] += 1
            else:
                log_message(f"❌ Failed to process: {filename}")
                processing_status["failed_files"] += 1

            # Clean up downloaded file
            if _remove_file(dest_path):
                log_message(f"🗑️ Cleaned up download: {filename}")
        else:
            log_message(f"❌ Failed to download: {filename}")
            processing_status["failed_files"] += 1

        # Update download state for next run
        next_index += 1
        save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})

        # Status update
        log_message(f"📊 Progress: {next_index}/{len(rar_files)} files processed")
        log_message(f'📊 Extracted: {processing_status["extracted_courses"]} courses')
        log_message(f'📊 Videos Processed: {processing_status["extracted_videos"]}')
        log_message(f'📊 Frames Extracted: {processing_status["extracted_frames_count"]}')
        log_message(f'📊 Frames Analyzed: {processing_status["analyzed_frames_count"]}')
        log_message(f'📊 Failed: {processing_status["failed_files"]} files')

        if next_index < len(rar_files):
            log_message(f"🔄 Run the script again to process the next file: {os.path.basename(rar_files[next_index])}")
        else:
            log_message("🎉 All files have been processed!")

        log_message("🎉 Processing complete!")
        log_message(f'📊 Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, {processing_status["extracted_frames_count"]} frames extracted, {processing_status["analyzed_frames_count"]} frames analyzed')

    except KeyboardInterrupt:
        log_message("⏹️ Processing interrupted by user")
    except Exception as e:
        log_message(f"❌ Fatal error: {str(e)}")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()


def _safe_upload_name(upload: UploadFile) -> str:
    """Return the upload's file name with any directory components removed.

    The client controls the name, so it must not be joined into a path as-is
    (``../x`` or an absolute path would escape the temporary directory).
    Returns ``""`` when the upload carries no file name.
    """
    return Path(upload.filename or "").name


def _save_upload(upload: UploadFile, destination: Path) -> None:
    """Copy the uploaded stream to *destination*."""
    with open(destination, "wb") as buffer:
        shutil.copyfileobj(upload.file, buffer)


def _invalid_fps_response(fps: int) -> JSONResponse:
    """Build the 400 response for a non-positive *fps* form value."""
    return JSONResponse(status_code=400, content={
        "error": "fps must be a positive integer",
        "fps": fps,
    })


# FastAPI Endpoints
# NOTE(review): both analysis endpoints are ``async def`` but do blocking CPU
# and disk work, so they occupy the event loop while a request is processed.


@app.post("/analyze-video")
async def analyze_video_endpoint(
    file: UploadFile = File(...),
    fps: int = Form(DEFAULT_FPS),
    prompt: Optional[str] = Form(None),
):
    """Analyze a single video file and return frame-by-frame analysis.

    Responds with 400 when the file extension is not a supported video type or
    *fps* is not positive.
    """
    upload_name = _safe_upload_name(file)
    if not upload_name.lower().endswith(VIDEO_EXTENSIONS):
        return JSONResponse(status_code=400, content={
            "error": "File type not allowed",
            "allowed_types": list(VIDEO_EXTENSIONS),
        })
    if fps <= 0:
        return _invalid_fps_response(fps)

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        file_path = temp_dir_path / upload_name
        _save_upload(file, file_path)

        frames_dir = temp_dir_path / "frames"
        frame_count = extract_frames(file_path, frames_dir, fps)

        frame_analyses = _analyze_frame_files(frames_dir, prompt)
        summary = summarize_activities(frame_analyses)

        return JSONResponse(content={
            "video_filename": file.filename,
            "frame_count": frame_count,
            "fps": fps,
            "frame_analyses": frame_analyses,
            "summary": summary,
        })


@app.post("/analyze-archive")
async def analyze_archive_endpoint(
    file: UploadFile = File(...),
    fps: int = Form(DEFAULT_FPS),
    prompt: Optional[str] = Form(None),
):
    """Analyze videos from RAR/ZIP archive and return frame-by-frame analysis.

    Responds with 400 when the upload is not a ``.rar``/``.zip`` file, *fps*
    is not positive, the archive cannot be read, or it contains no video.
    """
    upload_name = _safe_upload_name(file)
    if not upload_name.lower().endswith(ARCHIVE_EXTENSIONS):
        return JSONResponse(status_code=400, content={
            "error": "File type not allowed",
            "allowed_types": list(ARCHIVE_EXTENSIONS),
        })
    if fps <= 0:
        return _invalid_fps_response(fps)

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        file_path = temp_dir_path / upload_name
        _save_upload(file, file_path)

        extract_dir = temp_dir_path / "extracted"
        try:
            if upload_name.lower().endswith(".rar"):
                with rarfile.RarFile(file_path) as rf:
                    rf.extractall(extract_dir)
            else:
                with zipfile.ZipFile(file_path) as zf:
                    zf.extractall(extract_dir)
        except (rarfile.Error, zipfile.BadZipFile) as e:
            return JSONResponse(status_code=400, content={
                "error": f"Could not read archive: {e}",
            })

        # Find video files in extracted content. The loop variable must not be
        # called ``file``: that would rebind the upload parameter to a str.
        video_files = []
        for root, _dirs, names in os.walk(extract_dir):
            for name in names:
                if name.lower().endswith(VIDEO_EXTENSIONS):
                    video_files.append(Path(root) / name)

        if not video_files:
            return JSONResponse(status_code=400, content={
                "error": "No video files found in archive"
            })

        results = []
        for index, video_path in enumerate(video_files):
            video_name = video_path.name
            # The index keeps same-named videos from different folders apart.
            frames_dir = temp_dir_path / f"frames_{index}_{video_name}"
            frame_count = extract_frames(video_path, frames_dir, fps)

            frame_analyses = _analyze_frame_files(frames_dir, prompt)
            summary = summarize_activities(frame_analyses)
            results.append({
                "video_filename": video_name,
                "frame_count": frame_count,
                "fps": fps,
                "frame_analyses": frame_analyses,
                "summary": summary,
            })

        return JSONResponse(content={
            "archive_filename": file.filename,
            "videos_processed": len(video_files),
            "results": results,
        })


@app.get("/health")
async def health_check():
    """Health check endpoint.

    ``model_loaded`` reports whether the Florence-2 model and processor were
    loaded at import time.
    """
    # NOTE(review): the "Mock" wording below looks stale, since this module
    # loads the real model; kept unchanged for existing clients. Confirm and
    # update together with any consumers.
    return JSONResponse(content={
        "status": "healthy",
        "model": "Florence-2 (Mock)",
        "note": "Florence-2 model is mocked due to sandbox memory limitations.",
        "model_loaded": (
            vision_language_model_large is not None
            and vision_language_processor_large is not None
        ),
    })


@app.get("/status")
async def get_processing_status():
    """Get current processing status."""
    return JSONResponse(content=processing_status)


# Expose necessary functions and variables.
# "send_email_with_attachment" used to be listed here but is not defined in
# this module, which made ``from <module> import *`` raise AttributeError.
__all__ = [
    "main_processing_loop",
    "processing_status",
    "ANALYSIS_OUTPUT_FOLDER",
    "log_message",
    "analyze_frames",
    "extract_frames",
    "DEFAULT_FPS",
    "ensure_dir",
]