"""Video frame analysis service.

Two entry points share the same helpers:

* ``main_processing_loop`` - batch workflow that downloads one RAR archive per
  run from a Hugging Face dataset repo, extracts it, samples frames from every
  video it contains and captions them with Florence-2.
* FastAPI endpoints - analyse a single uploaded video or archive on demand.

NOTE(review): the emoji-like prefixes in the log messages look mis-encoded in
this file. They are runtime strings and are deliberately left untouched here.
"""

import importlib.util
import json
import os
import re
import shutil
import smtplib
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from email.message import EmailMessage
from pathlib import Path
from typing import Dict, List, Optional, Set
from urllib.parse import quote

import cv2
import numpy as np
import patoolib
import rarfile
import requests
import torch
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import JSONResponse
from huggingface_hub import HfApi, list_repo_files
from patoolib.util import PatoolError
from PIL import Image
from transformers import AutoModelForCausalLM, AutoProcessor

# Expose necessary functions and variables
__all__ = [
    "main_processing_loop",
    "processing_status",
    "ANALYSIS_OUTPUT_FOLDER",
    "log_message",
    "analyze_single_frame",
    "extract_frames",
    "DEFAULT_FPS",
    "ensure_dir",
]

# NOTE(review): the original comment claimed this "forces use of internal
# Python extractor"; whether rarfile can extract compressed members without an
# external tool should be confirmed against the rarfile documentation.
rarfile.UNRAR_TOOL = None
rarfile.PATH_SEP = '/'  # Safe default


def _install_flash_attn() -> None:
    """Best-effort install of flash-attn; failures are reported, never fatal."""
    if importlib.util.find_spec("flash_attn") is not None:
        return  # already importable, nothing to install
    try:
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "flash-attn",
             "--no-build-isolation"],
            # Extend the current environment instead of replacing it: passing
            # only the one variable would drop PATH, HOME, proxy settings, etc.
            env={**os.environ, "FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"},
            check=True,
        )
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Error installing flash-attn: {e}")
        print("Continuing without flash-attn.")


_install_flash_attn()

# Initialize FastAPI
app = FastAPI()

# ==== CONFIGURATION ====
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
MODEL_ID = "microsoft/Florence-2-base"

# Path Configuration
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"
ANALYSIS_OUTPUT_FOLDER = "analysis_results"

for _folder in (DOWNLOAD_FOLDER, EXTRACT_FOLDER, FRAMES_OUTPUT_FOLDER,
                ANALYSIS_OUTPUT_FOLDER):
    os.makedirs(_folder, exist_ok=True)

# State Files
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"

# Processing Parameters
CHUNK_SIZE = 1
PROCESSING_DELAY = 2
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2  # Minimum free space in GB before processing
MAX_LOG_ENTRIES = 100  # Size of the in-memory log ring exposed via /status
DOWNLOAD_TIMEOUT = (10, 120)  # (connect, read) seconds for requests
DOWNLOAD_CHUNK_BYTES = 1024 * 1024
BYTES_PER_GB = 1024 ** 3

# Frame Extraction Parameters
DEFAULT_FPS = 0.1  # Default frames per second for extraction
DEFAULT_MAX_FRAMES = 10  # Per-video cap on saved frames (was hard-coded)

VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".mkv")
ARCHIVE_EXTENSIONS = (".rar", ".zip")

# Matches the volume suffix of a multi-part RAR: ".part1.rar", ".part01.rar"...
_MULTIPART_RAR_RE = re.compile(r"\.part(\d+)\.rar$", re.IGNORECASE)

# Initialize HF API (an empty token is passed as None, i.e. anonymous access)
hf_api = HfApi(token=HF_TOKEN or None)

# Global State
processing_status = {
    "is_running": False,
    "current_file": None,
    "total_files": 0,
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,
    "extracted_videos": 0,
    "extracted_frames_count": 0,
    "analyzed_frames_count": 0,
    "last_update": None,
    "logs": [],
}

device = "cpu"  # Explicitly ensure CPU usage

# Defined up-front so analyze_single_frame can always test them, even when
# loading fails below.
vision_model = None
vision_processor = None
try:
    vision_model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        trust_remote_code=True,
        attn_implementation="eager",
    ).to(device).eval()
    vision_processor = AutoProcessor.from_pretrained(
        MODEL_ID, trust_remote_code=True
    )
except Exception as e:
    print(f"Error loading base model: {e}")
    vision_model = None
    vision_processor = None
    # Names the original failure path defined; kept in case anything reads them.
    vision_language_model_base = None
    vision_language_processor_base = None

# Preprompt templates
PREPROMPT_TEMPLATES = {
    "default": "This image shows: ",
    "design": "This design tutorial frame shows: ",
    "ui": "This user interface demonstrates: ",
    "motion": "This motion design example illustrates: ",
}

# Checked in order; the first group with a keyword found in the name wins.
_PREPROMPT_KEYWORDS = (
    ("ui", ("ui", "interface", "ux")),
    ("design", ("design", "tutorial")),
    ("motion", ("motion", "animation")),
)


def get_preprompt(video_filename: str) -> str:
    """Return the preprompt template selected by keywords in the filename.

    Matching is a case-insensitive substring test, so short keywords such as
    "ui" also match inside longer words (e.g. "guide").
    """
    filename = video_filename.lower()
    for template_key, keywords in _PREPROMPT_KEYWORDS:
        if any(keyword in filename for keyword in keywords):
            return PREPROMPT_TEMPLATES[template_key]
    return PREPROMPT_TEMPLATES["default"]


def log_message(message: str):
    """Print a timestamped message and record it in ``processing_status``.

    Only the newest ``MAX_LOG_ENTRIES`` entries are kept in memory.
    """
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}"
    print(log_entry)
    logs = processing_status["logs"]
    logs.append(log_entry)
    processing_status["last_update"] = timestamp
    if len(logs) > MAX_LOG_ENTRIES:
        del logs[:-MAX_LOG_ENTRIES]


def _remove_file(path) -> bool:
    """Delete ``path``; return True if it was removed, log other failures."""
    try:
        os.remove(path)
    except FileNotFoundError:
        return False
    except OSError as e:
        log_message(f"Could not remove {path}: {e}")
        return False
    return True


def is_multipart_rar(filename: str) -> bool:
    """Return True if the name ends with a ``.partN.rar`` volume suffix."""
    return _MULTIPART_RAR_RE.search(filename) is not None


def get_rar_part_base(filename: str) -> str:
    """Return the name without its ``.partN.rar`` (or plain ``.rar``) suffix."""
    match = _MULTIPART_RAR_RE.search(filename)
    if match:
        return filename[:match.start()]
    if filename.lower().endswith(".rar"):
        return filename[:-len(".rar")]
    return filename


def _find_first_rar_part(directory: str, base_name: str) -> Optional[str]:
    """Return the path of volume 1 of ``base_name`` in ``directory``, if any.

    Accepts any zero padding of the volume number (part1, part01, part001).
    """
    try:
        candidates = os.listdir(directory or ".")
    except OSError:
        return None
    for candidate in sorted(candidates):
        match = _MULTIPART_RAR_RE.search(candidate)
        if not match or candidate[:match.start()] != base_name:
            continue
        if int(match.group(1)) == 1:
            return os.path.join(directory, candidate)
    return None


def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Extract a RAR or ZIP archive with patoolib, retrying on failure.

    For a multi-part RAR the extraction is started from the first volume found
    next to ``rar_path``; if that volume is missing the call fails immediately.

    Returns:
        True when extraction succeeded, False otherwise.
    """
    filename = os.path.basename(rar_path)

    if is_multipart_rar(filename):
        base_name = get_rar_part_base(filename)
        first_part_path = _find_first_rar_part(os.path.dirname(rar_path), base_name)
        if first_part_path is None:
            first_part = f"{base_name}.part01.rar"
            log_message(f"โš ๏ธ Multi-part RAR detected but first part not found: {first_part}")
            return False
        rar_path = first_part_path
        first_part = os.path.basename(first_part_path)
        log_message(f"๐Ÿ“ฆ Processing multi-part RAR starting with: {first_part}")

    for attempt in range(max_retries):
        try:
            os.makedirs(output_dir, exist_ok=True)
            patoolib.extract_archive(rar_path, outdir=output_dir, verbosity=-1)
        except PatoolError as e:
            log_message(f"โš ๏ธ patoolib extraction attempt {attempt + 1} failed: {str(e)}")
        except Exception as e:
            log_message(f"โŒ patoolib exception: {str(e)}")
        else:
            log_message(f"โœ… Successfully extracted: {filename} using patoolib")
            return True
        if attempt < max_retries - 1:
            time.sleep(1)  # brief pause before the next attempt only

    return False


def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record to ``FAILED_FILES_LOG``."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    with open(FAILED_FILES_LOG, "a", encoding="utf-8") as f:
        f.write(f"{stamp} - {filename}: {error}\n")


def get_disk_usage(path: str) -> Dict[str, float]:
    """Return ``total``, ``free`` and ``used`` space in GB for ``path``.

    ``used`` is computed as ``total - free``, where ``free`` is the space
    available to the current user.
    """
    usage = shutil.disk_usage(path)
    total = usage.total / BYTES_PER_GB
    free = usage.free / BYTES_PER_GB
    return {"total": total, "free": free, "used": total - free}


def check_disk_space(path: str = ".") -> bool:
    """Return True if at least ``MIN_FREE_SPACE_GB`` is free at ``path``."""
    disk_info = get_disk_usage(path)
    free_gb = disk_info["free"]
    used_gb = disk_info["used"]
    if free_gb < MIN_FREE_SPACE_GB:
        log_message(f"โš ๏ธ Low disk space: {free_gb:.2f}GB free, {used_gb:.2f}GB used")
        return False
    return True


def cleanup_temp_files():
    """Delete downloaded archives other than the one currently processed."""
    log_message("๐Ÿงน Cleaning up temporary files...")

    current_file = processing_status.get("current_file")
    try:
        entries = os.listdir(DOWNLOAD_FOLDER)
    except OSError as e:
        log_message(f"Could not list {DOWNLOAD_FOLDER}: {e}")
        return

    for entry in entries:
        if entry == current_file or not entry.endswith(ARCHIVE_EXTENSIONS):
            continue
        if _remove_file(os.path.join(DOWNLOAD_FOLDER, entry)):
            log_message(f"๐Ÿ—‘๏ธ Removed old download: {entry}")


def load_json_state(file_path: str, default_value):
    """Load JSON state from ``file_path``.

    Returns ``default_value`` when the file is missing, unreadable or does not
    contain valid JSON.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return default_value
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError
        log_message(f"โš ๏ธ Corrupted state file: {file_path}")
    except OSError as e:
        log_message(f"Could not read state file {file_path}: {e}")
    return default_value


def save_json_state(file_path: str, data):
    """Write ``data`` as JSON to ``file_path``.

    The data is written to a sibling temp file and then renamed into place, so
    an interrupted run cannot leave a truncated state file behind.
    """
    tmp_path = f"{file_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, file_path)


def _state_value(state, key: str, default):
    """Read ``key`` from a loaded state object, tolerating unexpected shapes."""
    if isinstance(state, dict):
        return state.get(key, default)
    return default


def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Download ``url`` to ``dest_path`` with retries and disk-space checks.

    Failed attempts back off exponentially (1s, 2s, ...) and any partially
    written file is removed. A file larger than the free space (minus a 0.5 GB
    buffer) is rejected without retrying.

    Returns:
        True when the file was fully downloaded, False otherwise.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("โŒ Insufficient disk space even after cleanup")
            return False

    # An empty bearer token is not sent: it could turn anonymous access to a
    # public repo into an authentication error.
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

    for attempt in range(max_retries):
        try:
            with requests.get(url, headers=headers, stream=True,
                              timeout=DOWNLOAD_TIMEOUT) as r:
                r.raise_for_status()

                # Check content length if available
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / BYTES_PER_GB
                    free_gb = get_disk_usage(".")["free"]
                    if size_gb > free_gb - 0.5:  # Leave 0.5GB buffer
                        log_message(f"โŒ File too large: {size_gb:.2f}GB, only {free_gb:.2f}GB free")
                        return False

                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                        f.write(chunk)
            return True
        except Exception as e:
            _remove_file(dest_path)  # never leave a truncated archive behind
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            log_message(f"โŒ Download failed after {max_retries} attempts: {e}")
            return False
    return False


def ensure_dir(path):
    """Create ``path`` (and parents) if it does not exist yet."""
    os.makedirs(path, exist_ok=True)


def extract_frames(video_path, output_dir, fps=DEFAULT_FPS,
                   max_frames=DEFAULT_MAX_FRAMES):
    """Save frames sampled from a video as numbered PNG files.

    Args:
        video_path: Video file to read.
        output_dir: Directory receiving ``0001.png``, ``0002.png``, ...
        fps: Sampling rate in frames per second of video; must be > 0.
        max_frames: Maximum number of frames to save, or None for no limit.

    Returns:
        The number of frames written; 0 when ``fps`` is invalid or the video
        cannot be opened.
    """
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    if fps is None or not (fps > 0):
        log_message(f"[ERROR] Invalid fps value: {fps}")
        return 0

    ensure_dir(output_dir)
    cap = cv2.VideoCapture(str(video_path))
    saved_count = 0
    try:
        if not cap.isOpened():
            log_message(f"[ERROR] Failed to open video file: {video_path}")
            return 0

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        if not video_fps or not (video_fps > 0):
            video_fps = 30  # fallback if FPS is not available
            log_message(f"[WARN] Using fallback FPS: {video_fps}")

        # At least 1: a sampling rate above the video's own rate would round
        # to 0 and make the modulo below divide by zero.
        frame_interval = max(1, int(round(video_fps / fps)))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        log_message(f"[DEBUG] Total frames in video: {total_frames}")

        frame_idx = 0
        while max_frames is None or saved_count < max_frames:
            # grab() advances without decoding; only sampled frames are
            # decoded via retrieve().
            if not cap.grab():
                break
            if frame_idx % frame_interval == 0:
                ret, frame = cap.retrieve()
                if not ret:
                    break
                frame_name = f"{saved_count + 1:04d}.png"
                if cv2.imwrite(str(Path(output_dir) / frame_name), frame):
                    saved_count += 1
                else:
                    log_message(f"[ERROR] Failed to write frame: {frame_name}")
            frame_idx += 1
    finally:
        cap.release()

    log_message(f"Extracted {saved_count} frames from {video_path} to {output_dir}")
    return saved_count


def analyze_single_frame(image_path: str, preprompt: str = "") -> dict:
    """Caption one image with the vision model.

    Returns:
        A dict with ``image`` (base filename), ``description`` (the caption, or
        an ``[ERROR] ...`` message) and ``success``. Errors are reported in the
        dict rather than raised.
    """
    image_name = os.path.basename(image_path)
    if vision_model is None or vision_processor is None:
        return {
            "image": image_name,
            "description": "[ERROR] Model not loaded",
            "success": False,
        }

    try:
        # Load and resize image; the file handle is closed right away
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB").resize((224, 224))

        # Ensure tokenizer padding config is safe
        tokenizer = vision_processor.tokenizer
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        tokenizer.padding_side = "right"

        # Preprocess inputs
        inputs = vision_processor(
            images=[image],
            text=preprompt,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=512,
        ).to(device)

        # Safety: add the batch dimension if the processor returned 3-D input
        pixel_values = inputs["pixel_values"]
        if pixel_values.dim() == 3:
            pixel_values = pixel_values.unsqueeze(0)

        # Generate caption
        with torch.no_grad():
            outputs = vision_model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                pixel_values=pixel_values,
                max_new_tokens=500,
                num_beams=5,
                early_stopping=False,
                pad_token_id=tokenizer.pad_token_id,
            )

        caption = vision_processor.batch_decode(
            outputs, skip_special_tokens=True
        )[0].strip()

        return {"image": image_name, "description": caption, "success": True}

    except Exception as e:
        return {
            "image": image_name,
            "description": f"[ERROR] {str(e)}",
            "success": False,
        }


def _analyze_frames_in(frames_dir, prompt: str) -> List[dict]:
    """Analyse every PNG in ``frames_dir`` in filename order."""
    return [
        analyze_single_frame(str(frame_file), prompt)
        for frame_file in sorted(Path(frames_dir).glob("*.png"))
    ]


def process_video_frames(frames_dir: str, video_filename: str, output_file: str) -> bool:
    """Analyse all frames of one video and write the results as JSON.

    The first frame acts as a smoke test: if it fails, nothing is written and
    False is returned. Results are saved every 10 frames and at the end.

    Returns:
        True when the run completed (individual later frames may still have
        failed; see ``failed_frames`` in the output), False otherwise.
    """
    try:
        frames = sorted(Path(frames_dir).glob("*.png"))
        if not frames:
            log_message("โŒ No frames found in directory")
            return False

        preprompt = get_preprompt(video_filename)

        # Validate first frame
        first_frame_result = analyze_single_frame(str(frames[0]), preprompt)
        log_message("\n=== FIRST FRAME VALIDATION ===")
        log_message(f'Image: {first_frame_result["image"]}')
        log_message(f'Result: {first_frame_result["description"]}')
        log_message(f'Status: {"Success" if first_frame_result["success"] else "Failed"}\n')

        if not first_frame_result["success"]:
            log_message("โŒ Aborting due to first frame failure")
            return False

        results = {
            "metadata": {
                "video": video_filename,
                "preprompt": preprompt,
                "total_frames": len(frames),
                "processed_frames": 0,
                "failed_frames": 0,
            },
            "frames": [],
        }
        metadata = results["metadata"]

        for i, frame_path in enumerate(frames):
            # The validated first frame is reused instead of being run through
            # the model a second time.
            if i == 0:
                result = first_frame_result
            else:
                result = analyze_single_frame(str(frame_path), preprompt)
            results["frames"].append(result)

            if result["success"]:
                metadata["processed_frames"] += 1
            else:
                metadata["failed_frames"] += 1

            # Periodic saving
            if i % 10 == 0:
                save_json_state(output_file, results)

        # Final save
        save_json_state(output_file, results)
        return True

    except Exception as e:
        log_message(f"โŒ Processing failed: {str(e)}")
        return False


def summarize_activities(frame_analyses: List[Dict]) -> Dict:
    """Summarize activities from frame analyses.

    Placeholder: currently always returns an empty dict.
    """
    return {}


def _find_videos(root_dir) -> List[str]:
    """Return the sorted paths of all video files below ``root_dir``."""
    found = []
    for root, _, names in os.walk(root_dir):
        for name in names:
            if name.lower().endswith(VIDEO_EXTENSIONS):
                found.append(os.path.join(root, name))
    return sorted(found)


def _unique_name(name: str, used: Set[str]) -> str:
    """Return ``name``, suffixed with ``_2``, ``_3``... if already in ``used``."""
    candidate = name
    counter = 2
    while candidate in used:
        candidate = f"{name}_{counter}"
        counter += 1
    used.add(candidate)
    return candidate


def process_rar_file(rar_path: str) -> bool:
    """Extract one archive, then sample and analyse frames of every video.

    Any failure (extraction, a video yielding no frames, frame analysis) aborts
    the archive, is logged via ``log_failed_file`` and returns False.

    NOTE(review): the extracted content and frame folders are not removed
    afterwards; confirm whether they should be cleaned to save disk space.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename

    # Strips ".partN.rar" for multi-part volumes and ".rar" otherwise
    course_name = get_rar_part_base(filename)
    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)

    try:
        log_message(f"๐Ÿ”„ Processing: {filename}")

        # Clean up any existing directory
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir, ignore_errors=True)

        # Extract RAR
        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")

        video_files = _find_videos(extract_dir)
        processing_status["extracted_courses"] += 1
        log_message(f"โœ… Extracted {len(video_files)} videos from '{course_name}'")

        # Process each video
        used_names: Set[str] = set()
        for video_path in video_files:
            video_filename = Path(video_path).name
            # Same-named videos in different sub-folders get a numeric suffix
            # so they do not overwrite each other's frames and analysis.
            video_filename_clean = _unique_name(
                video_filename.replace(".", "_"), used_names
            )
            frames_dir = os.path.join(
                FRAMES_OUTPUT_FOLDER, f"{course_name}_{video_filename_clean}_frames"
            )
            # Drop frames of an earlier run so they are not analysed again
            shutil.rmtree(frames_dir, ignore_errors=True)
            ensure_dir(frames_dir)

            # Extract frames
            extracted_count = extract_frames(video_path, frames_dir, DEFAULT_FPS)
            if extracted_count == 0:
                raise Exception(f"No frames extracted from {video_filename}")
            processing_status["extracted_frames_count"] += extracted_count

            # Analyze frames
            analysis_output = os.path.join(
                ANALYSIS_OUTPUT_FOLDER,
                f"{course_name}_{video_filename_clean}_analysis.json",
            )
            if not process_video_frames(frames_dir, video_filename, analysis_output):
                raise Exception(f"Frame analysis failed for {video_filename}")
            processing_status["analyzed_frames_count"] += extracted_count
            processing_status["extracted_videos"] += 1

        return True

    except Exception as e:
        error_msg = str(e)
        log_message(f"โŒ Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False

    finally:
        processing_status["current_file"] = None


def main_processing_loop(start_index: int = 0):
    """Process the next RAR archive of the source repository.

    Exactly one archive is handled per call: it is downloaded, processed,
    removed again, and the saved index is advanced for the next run.

    Args:
        start_index: Index into the sorted RAR list to process. Values <= 0
            mean "resume from the saved state".
    """
    processing_status["is_running"] = True

    try:
        # Load state
        process_state = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})
        processed_rars = list(_state_value(process_state, "processed_rars", []))
        download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})
        saved_index = _state_value(download_state, "next_download_index", 0)

        # Use start_index if provided, otherwise use the saved state
        next_index = start_index if start_index > 0 else saved_index

        log_message(f"๐Ÿ“Š Starting from index {next_index}")
        log_message(f"๐Ÿ“Š Previously processed: {len(processed_rars)} files")

        # Get file list
        try:
            files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
        except Exception as e:
            log_message(f"โŒ Failed to get file list: {str(e)}")
            return

        rar_files = sorted(f for f in files if f.endswith(".rar"))
        processing_status["total_files"] = len(rar_files)
        log_message(f"๐Ÿ“ Found {len(rar_files)} RAR files in repository")

        if next_index >= len(rar_files):
            log_message("โœ… All files have been processed!")
            return

        # Process only one file per run
        rar_file = rar_files[next_index]
        filename = os.path.basename(rar_file)

        if filename in processed_rars:
            log_message(f"โญ๏ธ Skipping already processed: {filename}")
            processing_status["processed_files"] += 1
            # Move to next file
            next_index += 1
            save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
            log_message(f"๐Ÿ“Š Moving to next file. Progress: {next_index}/{len(rar_files)}")
            return

        log_message(f"๐Ÿ“ฅ Downloading: {filename}")
        dest_path = os.path.join(DOWNLOAD_FOLDER, filename)

        # Quote the repo path so names containing '#', '?' or spaces resolve
        download_url = (
            f"https://huggingface.co/datasets/{SOURCE_REPO_ID}"
            f"/resolve/main/{quote(rar_file)}"
        )
        if download_with_retry(download_url, dest_path):
            # Process file
            if process_rar_file(dest_path):
                processed_rars.append(filename)
                save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
                log_message(f"โœ… Successfully processed: {filename}")
                processing_status["processed_files"] += 1
            else:
                log_message(f"โŒ Failed to process: {filename}")
                processing_status["failed_files"] += 1

            # Clean up downloaded file
            if _remove_file(dest_path):
                log_message(f"๐Ÿ—‘๏ธ Cleaned up download: {filename}")
        else:
            log_message(f"โŒ Failed to download: {filename}")
            processing_status["failed_files"] += 1

        # Update download state for next run
        next_index += 1
        save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})

        # Status update
        courses = processing_status["extracted_courses"]
        videos = processing_status["extracted_videos"]
        frames_extracted = processing_status["extracted_frames_count"]
        frames_analyzed = processing_status["analyzed_frames_count"]
        failed = processing_status["failed_files"]

        log_message(f"๐Ÿ“Š Progress: {next_index}/{len(rar_files)} files processed")
        log_message(f"๐Ÿ“Š Extracted: {courses} courses")
        log_message(f"๐Ÿ“Š Videos Processed: {videos} videos")
        log_message(f"๐Ÿ“Š Frames Extracted: {frames_extracted} frames")
        log_message(f"๐Ÿ“Š Frames Analyzed: {frames_analyzed} frames")
        log_message(f"๐Ÿ“Š Failed: {failed} files")

        if next_index < len(rar_files):
            upcoming = os.path.basename(rar_files[next_index])
            log_message(f"๐Ÿ”„ Run the script again to process the next file: {upcoming}")
        else:
            log_message("๐ŸŽ‰ All files have been processed!")

        log_message("๐ŸŽ‰ Processing complete!")
        log_message(
            f"๐Ÿ“Š Final stats: {courses} courses extracted, "
            f"{videos} videos processed, "
            f"{frames_extracted} frames extracted, "
            f"{frames_analyzed} frames analyzed"
        )

    except KeyboardInterrupt:
        log_message("โน๏ธ Processing interrupted by user")
    except Exception as e:
        log_message(f"โŒ Fatal error: {str(e)}")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()


def _upload_basename(upload: UploadFile) -> str:
    """Return the upload's filename stripped of any directory components.

    The name is client-controlled; using only its final component keeps the
    saved file inside the temporary directory.
    """
    return Path(upload.filename or "").name


# FastAPI Endpoints
@app.post("/analyze-video")
async def analyze_video_endpoint(
    file: UploadFile = File(...),
    fps: float = Form(DEFAULT_FPS),
    prompt: Optional[str] = Form(None)
):
    """Analyze a single video file and return frame-by-frame analysis."""
    safe_name = _upload_basename(file)
    if not safe_name.lower().endswith(VIDEO_EXTENSIONS):
        return JSONResponse(status_code=400, content={
            "error": "File type not allowed",
            "allowed_types": list(VIDEO_EXTENSIONS)
        })
    if not (fps > 0):
        return JSONResponse(status_code=400, content={
            "error": "fps must be greater than 0"
        })

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        file_path = temp_dir_path / safe_name

        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        frames_dir = temp_dir_path / "frames"
        frame_count = extract_frames(file_path, frames_dir, fps)
        frame_analyses = _analyze_frames_in(frames_dir, prompt or "")

    summary = summarize_activities(frame_analyses)

    return JSONResponse(content={
        "video_filename": file.filename,
        "frame_count": frame_count,
        "fps": fps,
        "frame_analyses": frame_analyses,
        "summary": summary
    })


@app.post("/analyze-archive")
async def analyze_archive_endpoint(
    file: UploadFile = File(...),
    fps: float = Form(DEFAULT_FPS),
    prompt: Optional[str] = Form(None)
):
    """Analyze videos from RAR/ZIP archive and return frame-by-frame analysis."""
    safe_name = _upload_basename(file)
    if not safe_name.lower().endswith(ARCHIVE_EXTENSIONS):
        return JSONResponse(status_code=400, content={
            "error": "File type not allowed",
            "allowed_types": list(ARCHIVE_EXTENSIONS)
        })
    if not (fps > 0):
        return JSONResponse(status_code=400, content={
            "error": "fps must be greater than 0"
        })

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir)
        file_path = temp_dir_path / safe_name

        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        extract_dir = temp_dir_path / "extracted"
        try:
            if safe_name.lower().endswith(".rar"):
                with rarfile.RarFile(file_path) as rf:
                    rf.extractall(extract_dir)
            else:
                with zipfile.ZipFile(file_path) as zf:
                    zf.extractall(extract_dir)
        except (rarfile.Error, zipfile.BadZipFile) as e:
            return JSONResponse(status_code=400, content={
                "error": f"Failed to extract archive: {e}"
            })

        # Find video files in extracted content. The loop variable must not be
        # called "file": that would rebind the upload parameter and break the
        # "archive_filename" field of the response below.
        video_files = [Path(p) for p in _find_videos(extract_dir)]

        if not video_files:
            return JSONResponse(status_code=400, content={
                "error": "No video files found in archive"
            })

        results = []
        for index, video_path in enumerate(video_files):
            video_name = video_path.name
            # The index keeps same-named videos from sharing a frames folder
            frames_dir = temp_dir_path / f"frames_{index}_{video_name}"
            frame_count = extract_frames(video_path, frames_dir, fps)
            frame_analyses = _analyze_frames_in(frames_dir, prompt or "")
            summary = summarize_activities(frame_analyses)

            results.append({
                "video_filename": video_name,
                "frame_count": frame_count,
                "fps": fps,
                "frame_analyses": frame_analyses,
                "summary": summary
            })

    return JSONResponse(content={
        "archive_filename": file.filename,
        "videos_processed": len(video_files),
        "results": results
    })


@app.get("/health")
async def health_check():
    """Health check endpoint; also reports whether the model is loaded."""
    model_loaded = vision_model is not None and vision_processor is not None
    return JSONResponse(content={
        "status": "healthy",
        "model": MODEL_ID,
        "model_loaded": model_loaded,
        "note": f"Using {MODEL_ID} on {device}."
    })


@app.get("/status")
async def get_processing_status():
    """Get current processing status."""
    return JSONResponse(content=processing_status)