VSG / vision_analyzer.py
Factor Studios
Update vision_analyzer.py
ca2d4cf verified
import os
import json
import requests
import subprocess
import shutil
import time
import re
import threading
from typing import Dict, List, Set, Optional
from huggingface_hub import HfApi, list_repo_files
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse
from pathlib import Path
import smtplib
from email.message import EmailMessage
import tempfile
import rarfile
import zipfile
import cv2
import numpy as np
from PIL import Image
import torch
from transformers import AutoProcessor, AutoModelForCausalLM
# from transformers import AutoProcessor, AutoModelForCausalLM
# import os
# # Try to ensure unrar is available
# def setup_unrar():
# # Try system installation (may not work on all Spaces)
# if not os.path.exists("/usr/bin/unrar"):
# os.system("apt-get update > /dev/null 2>&1 && apt-get install -y unrar > /dev/null 2>&1 || true")
# # Setup Python alternatives
# try:
# import rarfile
# if os.path.exists("/usr/bin/unrar"):
# rarfile.UNRAR_TOOL = "/usr/bin/unrar"
# except ImportError:
# os.system("pip install rarfile")
# try:
# import patoolib
# except ImportError:
# os.system("pip install patool")
# setup_unrar()
rarfile.UNRAR_TOOL = None # Forces use of internal Python extractor
rarfile.PATH_SEP = '/' # Safe default
def log_message(message: str):
"""Log messages with timestamp"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
print(log_entry)
if 'processing_status' in globals():
processing_status["logs"].append(log_entry)
processing_status["last_update"] = timestamp
if len(processing_status["logs"]) > 100:
processing_status["logs"] = processing_status["logs"][-100:]
# Attempt to install flash-attn
try:
subprocess.run('pip install flash-attn --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, check=True, shell=True)
except subprocess.CalledProcessError as e:
print(f"Error installing flash-attn: {e}")
print("Continuing without flash-attn.")
# Determine the device to use
device = "cuda" if torch.cuda.is_available() else "cpu"
# Initialize FastAPI
app = FastAPI()
# ==== CONFIGURATION ====
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
# Path Configuration
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"
ANALYSIS_OUTPUT_FOLDER = "analysis_results"
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ANALYSIS_OUTPUT_FOLDER, exist_ok=True)
# State Files
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"
# Processing Parameters
CHUNK_SIZE = 1
PROCESSING_DELAY = 2
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2 # Minimum free space in GB before processing
# Frame Extraction Parameters
DEFAULT_FPS = 0.1 # Default frames per second for extraction
# Initialize HF API
hf_api = HfApi(token=HF_TOKEN)
# Global State
processing_status = {
"is_running": False,
"current_file": None,
"total_files": 0,
"processed_files": 0,
"failed_files": 0,
"extracted_courses": 0,
"extracted_videos": 0,
"extracted_frames_count": 0,
"analyzed_frames_count": 0,
"last_update": None,
"logs": []
}
import torch
import subprocess
import sys
device = "cpu" # Explicitly ensure CPU usage
try:
vision_model = AutoModelForCausalLM.from_pretrained('microsoft/Florence-2-base', trust_remote_code=True,
attn_implementation="eager" ).to(device).eval()
vision_processor = AutoProcessor.from_pretrained('microsoft/Florence-2-base', trust_remote_code=True)
except Exception as e:
print(f"Error loading base model: {e}")
vision_language_model_base = None
vision_language_processor_base = None
# Preprompt templates
PREPROMPT_TEMPLATES = {
"default": "This image shows: ",
"design": "This design tutorial frame shows: ",
"ui": "This user interface demonstrates: ",
"motion": "This motion design example illustrates: "
}
def get_preprompt(video_filename: str) -> str:
"""Select appropriate preprompt based on video content"""
filename = video_filename.lower()
if any(x in filename for x in ["ui", "interface", "ux"]):
return PREPROMPT_TEMPLATES["ui"]
elif any(x in filename for x in ["design", "tutorial"]):
return PREPROMPT_TEMPLATES["design"]
elif any(x in filename for x in ["motion", "animation"]):
return PREPROMPT_TEMPLATES["motion"]
return PREPROMPT_TEMPLATES["default"]
def log_message(message: str):
"""Log messages with timestamp"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
print(log_entry)
processing_status["logs"].append(log_entry)
processing_status["last_update"] = timestamp
if len(processing_status["logs"]) > 100:
processing_status["logs"] = processing_status["logs"][-100:]
import patoolib
from patoolib.util import PatoolError
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
"""Extract RAR or ZIP file using patoolib with retry logic"""
filename = os.path.basename(rar_path)
# Handle multi-part RARs (skip them if patoolib can't handle)
if is_multipart_rar(filename):
base_name = get_rar_part_base(filename)
first_part = f"{base_name}.part01.rar"
first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
if not os.path.exists(first_part_path):
log_message(f"⚠️ Multi-part RAR detected but first part not found: {first_part}")
return False
rar_path = first_part_path
log_message(f"📦 Processing multi-part RAR starting with: {first_part}")
for attempt in range(max_retries):
try:
os.makedirs(output_dir, exist_ok=True)
patoolib.extract_archive(rar_path, outdir=output_dir, verbosity=-1)
log_message(f"✅ Successfully extracted: {filename} using patoolib")
return True
except PatoolError as e:
log_message(f"⚠️ patoolib extraction attempt {attempt + 1} failed: {str(e)}")
time.sleep(1)
except Exception as e:
log_message(f"❌ patoolib exception: {str(e)}")
time.sleep(1)
return False
def log_failed_file(filename: str, error: str):
"""Log failed files to persistent file"""
with open(FAILED_FILES_LOG, "a") as f:
f.write(f'{time.strftime("%Y-%m-%d %H:%M:%S")} - {filename}: {error}\n')
def get_disk_usage(path: str) -> Dict[str, float]:
"""Get disk usage statistics in GB"""
statvfs = os.statvfs(path)
total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
used = total - free
return {"total": total, "free": free, "used": used}
def check_disk_space(path: str = ".") -> bool:
"""Check if there\'s enough disk space"""
disk_info = get_disk_usage(path)
if disk_info["free"] < MIN_FREE_SPACE_GB:
log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
return False
return True
def cleanup_temp_files():
"""Clean up temporary files to free space"""
log_message("🧹 Cleaning up temporary files...")
# Clean old downloads (keep only current processing file)
current_file = processing_status.get("current_file")
for file in os.listdir(DOWNLOAD_FOLDER):
if file != current_file and file.endswith((".rar", ".zip")):
try:
os.remove(os.path.join(DOWNLOAD_FOLDER, file))
log_message(f"🗑️ Removed old download: {file}")
except:
pass
def load_json_state(file_path: str, default_value):
"""Load state from JSON file"""
if os.path.exists(file_path):
try:
with open(file_path, "r") as f:
return json.load(f)
except json.JSONDecodeError:
log_message(f"⚠️ Corrupted state file: {file_path}")
return default_value
def save_json_state(file_path: str, data):
"""Save state to JSON file"""
with open(file_path, "w") as f:
json.dump(data, f, indent=2)
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
"""Download file with retry logic and disk space checking"""
if not check_disk_space():
cleanup_temp_files()
if not check_disk_space():
log_message("❌ Insufficient disk space even after cleanup")
return False
headers = {"Authorization": f"Bearer {HF_TOKEN}"}
for attempt in range(max_retries):
try:
with requests.get(url, headers=headers, stream=True) as r:
r.raise_for_status()
# Check content length if available
content_length = r.headers.get("content-length")
if content_length:
size_gb = int(content_length) / (1024**3)
disk_info = get_disk_usage(".")
if size_gb > disk_info["free"] - 0.5: # Leave 0.5GB buffer
log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
return False
with open(dest_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return True
except Exception as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
log_message(f"❌ Download failed after {max_retries} attempts: {e}")
return False
return False
def is_multipart_rar(filename: str) -> bool:
"""Check if this is a multi-part RAR file"""
return ".part" in filename.lower() and filename.lower().endswith(".rar")
def get_rar_part_base(filename: str) -> str:
"""Get the base name for multi-part RAR files"""
if ".part" in filename.lower():
return filename.split(".part")[0]
return filename.replace(".rar", "")
def ensure_dir(path):
os.makedirs(path, exist_ok=True)
def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
"""Extract frames from video at the specified frames per second (fps)."""
log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
ensure_dir(output_dir)
cap = cv2.VideoCapture(str(video_path))
if not cap.isOpened():
log_message(f"[ERROR] Failed to open video file: {video_path}")
return 0
video_fps = cap.get(cv2.CAP_PROP_FPS)
if not video_fps or video_fps <= 0:
video_fps = 30 # fallback if FPS is not available
log_message(f"[WARN] Using fallback FPS: {video_fps}")
frame_interval = int(round(video_fps / fps))
frame_idx = 0
saved_idx = 1
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
log_message(f"[DEBUG] Total frames in video: {total_frames}")
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame_idx % frame_interval == 0:
if saved_idx <= 10: # Limit to 10 frames for testing
frame_name = f"{saved_idx:04d}.png"
cv2.imwrite(str(Path(output_dir) / frame_name), frame)
saved_idx += 1
else:
break # Stop extracting after 10 frames
frame_idx += 1
cap.release()
log_message(f"Extracted {saved_idx-1} frames from {video_path} to {output_dir}")
return saved_idx - 1
def analyze_single_frame(image_path: str, preprompt: str = "") -> dict:
"""Consistent frame processing function with robust error handling"""
if not vision_model or not vision_processor:
return {
"image": os.path.basename(image_path),
"description": "[ERROR] Model not loaded",
"success": False
}
try:
# Load and resize image
image = Image.open(image_path).convert("RGB")
image = image.resize((224, 224))
# Ensure tokenizer padding config is safe
tokenizer = vision_processor.tokenizer
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"
# Preprocess inputs
inputs = vision_processor(
images=[image],
text=preprompt,
return_tensors="pt",
padding="max_length",
truncation=True,
max_length=512
).to(device)
# Safety: check pixel_values shape
pixel_values = inputs["pixel_values"]
if pixel_values.dim() == 3:
pixel_values = pixel_values.unsqueeze(0)
# Generate caption
with torch.no_grad():
outputs = vision_model.generate(
input_ids=inputs["input_ids"],
attention_mask=inputs["attention_mask"],
pixel_values=pixel_values,
max_new_tokens=500,
num_beams=5,
early_stopping=False,
pad_token_id=tokenizer.pad_token_id
)
caption = vision_processor.batch_decode(
outputs,
skip_special_tokens=True
)[0].strip()
return {
"image": os.path.basename(image_path),
"description": caption,
"success": True
}
except Exception as e:
return {
"image": os.path.basename(image_path),
"description": f"[ERROR] {str(e)}",
"success": False
}
def process_video_frames(frames_dir: str, video_filename: str, output_file: str) -> bool:
"""Main processing function with first-frame validation"""
try:
frames = sorted(Path(frames_dir).glob("*.png"))
if not frames:
print("❌ No frames found in directory")
return False
# Validate first frame
first_frame_result = analyze_single_frame(str(frames[0]), get_preprompt(video_filename))
print("\n=== FIRST FRAME VALIDATION ===")
print(f'Image: {first_frame_result["image"]}')
print(f'Result: {first_frame_result["description"]}')
print(f'Status: {"Success" if first_frame_result["success"] else "Failed"}\n')
if not first_frame_result["success"]:
print("❌ Aborting due to first frame failure")
return False
preprompt = get_preprompt(video_filename)
results = {
"metadata": {
"video": video_filename,
"preprompt": preprompt,
"total_frames": len(frames),
"processed_frames": 0,
"failed_frames": 0
},
"frames": []
}
for i, frame_path in enumerate(frames):
result = analyze_single_frame(str(frame_path), preprompt)
results["frames"].append(result)
if result["success"]:
results["metadata"]["processed_frames"] += 1
else:
results["metadata"]["failed_frames"] += 1
# Periodic saving
if i % 10 == 0:
with open(output_file, "w") as f:
json.dump(results, f, indent=2)
# Final save
with open(output_file, "w") as f:
json.dump(results, f, indent=2)
return True
except Exception as e:
print(f"❌ Processing failed: {str(e)}")
return False
def summarize_activities(frame_analyses: List[Dict]) -> Dict:
"""Summarize activities from frame analyses."""
return {}
def process_rar_file(rar_path: str) -> bool:
"""Process a single RAR file with new frame processing"""
filename = os.path.basename(rar_path)
processing_status["current_file"] = filename
# Handle multi-part RAR naming
if is_multipart_rar(filename):
course_name = get_rar_part_base(filename)
else:
course_name = filename.replace(".rar", "")
extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
try:
log_message(f"🔄 Processing: {filename}")
# Clean up any existing directory
if os.path.exists(extract_dir):
shutil.rmtree(extract_dir, ignore_errors=True)
# Extract RAR
os.makedirs(extract_dir, exist_ok=True)
if not extract_with_retry(rar_path, extract_dir):
raise Exception("RAR extraction failed")
# Process video files
video_files = []
for root, _, files in os.walk(extract_dir):
for file in files:
if file.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
video_files.append(os.path.join(root, file))
processing_status["extracted_courses"] += 1
log_message(f"✅ Extracted {len(video_files)} videos from \'{course_name}\'")
# Process each video
for video_path in video_files:
video_filename = Path(video_path).name
video_filename_clean = video_filename.replace(".", "_")
frames_dir = os.path.join(FRAMES_OUTPUT_FOLDER, f"{course_name}_{video_filename_clean}_frames")
ensure_dir(frames_dir)
# Extract frames
extracted_count = extract_frames(video_path, frames_dir, DEFAULT_FPS)
if extracted_count == 0:
raise Exception(f"No frames extracted from {video_filename}")
processing_status["extracted_frames_count"] += extracted_count
# Analyze frames
video_filename_clean = video_filename.replace(".", "_")
analysis_output = os.path.join(ANALYSIS_OUTPUT_FOLDER, f"{course_name}_{video_filename_clean}_analysis.json")
if process_video_frames(frames_dir, video_filename, analysis_output):
processing_status["analyzed_frames_count"] += extracted_count
processing_status["extracted_videos"] += 1
else:
raise Exception(f"Frame analysis failed for {video_filename}")
return True
except Exception as e:
error_msg = str(e)
log_message(f"❌ Processing failed: {error_msg}")
log_failed_file(filename, error_msg)
return False
finally:
processing_status["current_file"] = None
def main_processing_loop(start_index: int = 0):
"""Main processing workflow - extraction, frame extraction, and vision analysis"""
processing_status["is_running"] = True
try:
# Load state
processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})
# Use start_index if provided, otherwise use the saved state
next_index = start_index if start_index > 0 else download_state["next_download_index"]
log_message(f"📊 Starting from index {next_index}")
log_message(f"📊 Previously processed: {len(processed_rars)} files")
# Get file list
try:
files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
rar_files = sorted([f for f in files if f.endswith(".rar")])
processing_status["total_files"] = len(rar_files)
log_message(f"📁 Found {len(rar_files)} RAR files in repository")
if next_index >= len(rar_files):
log_message("✅ All files have been processed!")
return
except Exception as e:
log_message(f"❌ Failed to get file list: {str(e)}")
return
# Process only one file per run
if next_index < len(rar_files):
rar_file = rar_files[next_index]
filename = os.path.basename(rar_file)
if filename in processed_rars:
log_message(f"⏭️ Skipping already processed: {filename}")
processing_status["processed_files"] += 1
# Move to next file
next_index += 1
save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
log_message(f"📊 Moving to next file. Progress: {next_index}/{len(rar_files)}")
return
log_message(f"📥 Downloading: {filename}")
dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
# Download file
download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
if download_with_retry(download_url, dest_path):
# Process file
if process_rar_file(dest_path):
processed_rars.append(filename)
save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
log_message(f"✅ Successfully processed: {filename}")
processing_status["processed_files"] += 1
else:
log_message(f"❌ Failed to process: {filename}")
processing_status["failed_files"] += 1
# Clean up downloaded file
try:
os.remove(dest_path)
log_message(f"🗑️ Cleaned up download: {filename}")
except:
pass
else:
log_message(f"❌ Failed to download: {filename}")
processing_status["failed_files"] += 1
# Update download state for next run
next_index += 1
save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
# Status update
log_message(f"📊 Progress: {next_index}/{len(rar_files)} files processed")
log_message(f"📊 Extracted: {processing_status['extracted_courses']} courses")
log_message(f"📊 Videos Processed: {processing_status['extracted_videos']} videos")
log_message(f"📊 Frames Extracted: {processing_status['extracted_frames_count']} frames")
log_message(f"📊 Frames Analyzed: {processing_status['analyzed_frames_count']} frames")
log_message(f"📊 Failed: {processing_status['failed_files']} files")
if next_index < len(rar_files):
log_message(f"🔄 Run the script again to process the next file: {os.path.basename(rar_files[next_index])}")
else:
log_message("🎉 All files have been processed!")
else:
log_message("✅ All files have been processed!")
log_message(f"🎉 Processing complete!")
log_message(f"📊 Final stats: {processing_status['extracted_courses']} courses extracted, {processing_status['extracted_videos']} videos processed, {processing_status['extracted_frames_count']} frames extracted, {processing_status['analyzed_frames_count']} frames analyzed")
except KeyboardInterrupt:
log_message("⏹️ Processing interrupted by user")
except Exception as e:
log_message(f"❌ Fatal error: {str(e)}")
finally:
processing_status["is_running"] = False
cleanup_temp_files()
# FastAPI Endpoints
@app.post("/analyze-video")
async def analyze_video_endpoint(
file: UploadFile = File(...),
fps: float = Form(DEFAULT_FPS),
prompt: Optional[str] = Form(None)
):
"""Analyze a single video file and return frame-by-frame analysis."""
if not file.filename.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
return JSONResponse(status_code=400, content={
"error": "File type not allowed",
"allowed_types": [".mp4", ".avi", ".mov", ".mkv"]
})
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
file_path = temp_dir_path / file.filename
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
frames_dir = temp_dir_path / "frames"
frame_count = extract_frames(file_path, frames_dir, fps)
frame_analyses = []
for frame_file in sorted(frames_dir.glob("*.png")):
analysis = analyze_single_frame(str(frame_file), prompt or "")
frame_analyses.append(analysis)
summary = summarize_activities(frame_analyses)
return JSONResponse(content={
"video_filename": file.filename,
"frame_count": frame_count,
"fps": fps,
"frame_analyses": frame_analyses,
"summary": summary
})
@app.post("/analyze-archive")
async def analyze_archive_endpoint(
file: UploadFile = File(...),
fps: float = Form(DEFAULT_FPS),
prompt: Optional[str] = Form(None)
):
"""Analyze videos from RAR/ZIP archive and return frame-by-frame analysis."""
if not file.filename.lower().endswith((".rar", ".zip")):
return JSONResponse(status_code=400, content={
"error": "File type not allowed",
"allowed_types": [".rar", ".zip"]
})
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
file_path = temp_dir_path / file.filename
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
extract_dir = temp_dir_path / "extracted"
video_files = []
if file.filename.lower().endswith(".rar"):
with rarfile.RarFile(file_path) as rf:
rf.extractall(extract_dir)
else:
with zipfile.ZipFile(file_path) as zf:
zf.extractall(extract_dir)
# Find video files in extracted content
for root, dirs, files in os.walk(extract_dir):
for file in files:
if file.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
video_files.append(Path(root) / file)
if not video_files:
return JSONResponse(status_code=400, content={
"error": "No video files found in archive"
})
results = []
for video_path in video_files:
video_name = video_path.name
frames_dir = temp_dir_path / f"frames_{video_name}"
frame_count = extract_frames(video_path, frames_dir, fps)
frame_analyses = []
for frame_file in sorted(frames_dir.glob("*.png")):
analysis = analyze_single_frame(str(frame_file), prompt or "")
frame_analyses.append(analysis)
summary = summarize_activities(frame_analyses)
results.append({
"video_filename": video_name,
"frame_count": frame_count,
"fps": fps,
"frame_analyses": frame_analyses,
"summary": summary
})
return JSONResponse(content={
"archive_filename": file.filename,
"videos_processed": len(video_files),
"results": results
})
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return JSONResponse(content={
"status": "healthy",
"model": "GIT",
"note": "Now using GIT model."
})
@app.get("/status")
async def get_processing_status():
"""Get current processing status."""
return JSONResponse(content=processing_status)
# Expose necessary functions and variables
__all__ = [
"main_processing_loop",
"processing_status",
"ANALYSIS_OUTPUT_FOLDER",
"log_message",
"analyze_single_frame",
"extract_frames",
"DEFAULT_FPS",
"ensure_dir"
]