VS2 / vision_analyzer.py
Factor Studios
Upload 28 files
c8ba96f verified
import os
import json
import requests
import subprocess
import shutil
import time
import re
import threading
from typing import Dict, List, Set, Optional
from huggingface_hub import HfApi, list_repo_files
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse
from pathlib import Path
import smtplib
from email.message import EmailMessage
import tempfile
import rarfile
import zipfile
import cv2
import numpy as np
from PIL import Image
import torch
from transformers import AutoProcessor, AutoModelForCausalLM
# Initialize FastAPI
app = FastAPI()
# ==== CONFIGURATION ====
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
# Path Configuration
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"
ANALYSIS_OUTPUT_FOLDER = "analysis_results"
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ANALYSIS_OUTPUT_FOLDER, exist_ok=True)
# State Files
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"
# Processing Parameters
CHUNK_SIZE = 1
PROCESSING_DELAY = 2
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2 # Minimum free space in GB before processing
# Frame Extraction Parameters
DEFAULT_FPS = 3 # Default frames per second for extraction
# Initialize HF API
hf_api = HfApi(token=HF_TOKEN)
# Global State
processing_status = {
"is_running": False,
"current_file": None,
"total_files": 0,
"processed_files": 0,
"failed_files": 0,
"extracted_courses": 0,
"extracted_videos": 0,
"extracted_frames_count": 0,
"analyzed_frames_count": 0,
"last_update": None,
"logs": []
}
import torch
import subprocess
import sys
device = "cpu" # Explicitly ensure CPU usage
try:
# Load the model, forcing the 'eager' (CPU-compatible) attention implementation
vision_language_model_large = AutoModelForCausalLM.from_pretrained(
"microsoft/Florence-2-Base",
trust_remote_code=True
).to(device).eval()
vision_language_processor_large = AutoProcessor.from_pretrained(
"microsoft/Florence-2-Base",
trust_remote_code=True
)
print("Florence-2 large model and processor loaded successfully on CPU using eager attention.")
except Exception as e:
print(f"Error loading Florence-2 model on CPU: {e}")
print("Please ensure you have enough RAM and a compatible PyTorch version.")
vision_language_model_large = None
vision_language_processor_large = None
def log_message(message: str):
"""Log messages with timestamp"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
print(log_entry)
processing_status["logs"].append(log_entry)
processing_status["last_update"] = timestamp
if len(processing_status["logs"]) > 100:
processing_status["logs"] = processing_status["logs"][-100:]
def log_failed_file(filename: str, error: str):
"""Log failed files to persistent file"""
with open(FAILED_FILES_LOG, "a") as f:
f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
def get_disk_usage(path: str) -> Dict[str, float]:
"""Get disk usage statistics in GB"""
statvfs = os.statvfs(path)
total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
used = total - free
return {"total": total, "free": free, "used": used}
def check_disk_space(path: str = ".") -> bool:
"""Check if there's enough disk space"""
disk_info = get_disk_usage(path)
if disk_info["free"] < MIN_FREE_SPACE_GB:
log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
return False
return True
def cleanup_temp_files():
"""Clean up temporary files to free space"""
log_message("🧹 Cleaning up temporary files...")
# Clean old downloads (keep only current processing file)
current_file = processing_status.get("current_file")
for file in os.listdir(DOWNLOAD_FOLDER):
if file != current_file and file.endswith((".rar", ".zip")):
try:
os.remove(os.path.join(DOWNLOAD_FOLDER, file))
log_message(f"🗑️ Removed old download: {file}")
except:
pass
def load_json_state(file_path: str, default_value):
"""Load state from JSON file"""
if os.path.exists(file_path):
try:
with open(file_path, "r") as f:
return json.load(f)
except json.JSONDecodeError:
log_message(f"⚠️ Corrupted state file: {file_path}")
return default_value
def save_json_state(file_path: str, data):
"""Save state to JSON file"""
with open(file_path, "w") as f:
json.dump(data, f, indent=2)
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
"""Download file with retry logic and disk space checking"""
if not check_disk_space():
cleanup_temp_files()
if not check_disk_space():
log_message("❌ Insufficient disk space even after cleanup")
return False
headers = {"Authorization": f"Bearer {HF_TOKEN}"}
for attempt in range(max_retries):
try:
with requests.get(url, headers=headers, stream=True) as r:
r.raise_for_status()
# Check content length if available
content_length = r.headers.get("content-length")
if content_length:
size_gb = int(content_length) / (1024**3)
disk_info = get_disk_usage(".")
if size_gb > disk_info["free"] - 0.5: # Leave 0.5GB buffer
log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
return False
with open(dest_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return True
except Exception as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
log_message(f"❌ Download failed after {max_retries} attempts: {e}")
return False
return False
def is_multipart_rar(filename: str) -> bool:
"""Check if this is a multi-part RAR file"""
return ".part" in filename.lower() and filename.lower().endswith(".rar")
def get_rar_part_base(filename: str) -> str:
"""Get the base name for multi-part RAR files"""
if ".part" in filename.lower():
return filename.split(".part")[0]
return filename.replace(".rar", "")
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
"""Extract RAR with retry and recovery, handling multi-part archives"""
filename = os.path.basename(rar_path)
# For multi-part RARs, we need the first part
if is_multipart_rar(filename):
base_name = get_rar_part_base(filename)
first_part = f"{base_name}.part01.rar"
first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
if not os.path.exists(first_part_path):
log_message(f"⚠️ Multi-part RAR detected but first part not found: {first_part}")
return False
rar_path = first_part_path
log_message(f"📦 Processing multi-part RAR starting with: {first_part}")
for attempt in range(max_retries):
try:
# Test RAR first
test_cmd = ["unrar", "t", rar_path]
test_result = subprocess.run(test_cmd, capture_output=True, text=True)
if test_result.returncode != 0:
log_message(f"⚠️ RAR test failed: {test_result.stderr}")
if attempt == max_retries - 1:
return False
continue
# Extract RAR
cmd = ["unrar", "x", "-o+", rar_path, output_dir]
if attempt > 0: # Try recovery on subsequent attempts
cmd.insert(2, "-kb")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
log_message(f"✅ Successfully extracted: {os.path.basename(rar_path)}")
return True
else:
error_msg = result.stderr or result.stdout
log_message(f"⚠️ Extraction attempt {attempt + 1} failed: {error_msg}")
if "checksum error" in error_msg.lower() or "CRC failed" in error_msg:
log_message(f"⚠️ Data corruption detected, attempt {attempt + 1}")
elif result.returncode == 10:
log_message(f"⚠️ No files to extract (exit code 10)")
return False
elif result.returncode == 1:
log_message(f"⚠️ Non-fatal error (exit code 1)")
except Exception as e:
log_message(f"❌ Extraction exception: {str(e)}")
if attempt == max_retries - 1:
return False
time.sleep(1)
return False
def ensure_dir(path):
os.makedirs(path, exist_ok=True)
def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
"""Extract frames from video at the specified frames per second (fps)."""
log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
ensure_dir(output_dir)
cap = cv2.VideoCapture(str(video_path))
if not cap.isOpened():
log_message(f"[ERROR] Failed to open video file: {video_path}")
return 0
video_fps = cap.get(cv2.CAP_PROP_FPS)
if not video_fps or video_fps <= 0:
video_fps = 30 # fallback if FPS is not available
log_message(f"[WARN] Using fallback FPS: {video_fps}")
frame_interval = int(round(video_fps / fps))
frame_idx = 0
saved_idx = 1
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
log_message(f"[DEBUG] Total frames in video: {total_frames}")
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame_idx % frame_interval == 0:
frame_name = f"{saved_idx:04d}.png"
cv2.imwrite(str(Path(output_dir) / frame_name), frame)
saved_idx += 1
frame_idx += 1
cap.release()
log_message(f"Extracted {saved_idx-1} frames from {video_path} to {output_dir}")
return saved_idx - 1
def analyze_frame_with_florence2(image_path: str, prompt: str = "<CAPTION>") -> Dict:
"""Analyze a single frame using Florence-2 vision model."""
if not vision_language_model_large or not vision_language_processor_large:
return {
"image": os.path.basename(image_path),
"description": "[ERROR] Vision model not loaded."
}
image = Image.open(image_path).convert("RGB")
inputs = vision_language_processor_large(images=image, text=prompt, return_tensors="pt").to(device)
with torch.no_grad():
generated_ids = vision_language_model_large.generate(
input_ids=inputs["input_ids"],
pixel_values=inputs["pixel_values"],
max_new_tokens=512,
do_sample=False,
num_beams=3
)
generated_text = vision_language_processor_large.batch_decode(generated_ids, skip_special_tokens=False)[0]
description = vision_language_processor_large.post_process_generation(
generated_text,
task="<CAPTION>",
image_size=(image.width, image.height)
)["<CAPTION>"]
return {
"image": os.path.basename(image_path),
"description": description
}
def summarize_activities(frame_analyses: List[Dict]) -> Dict:
"""Summarize activities from frame analyses."""
return {
# "steps": [
# {
# "action": "Open Blender software",
# "description": "User launches Blender 3D modeling application on their computer"
# },
# {
# "action": "Create 3D object",
# "description": "User works with a default cube object in the 3D viewport"
# },
# {
# "action": "Manipulate 3D model",
# "description": "User rotates and transforms the cube using mouse interactions"
# },
# {
# "action": "Navigate interface",
# "description": "User explores different tools and panels in the Blender interface"
# }
# ],
# "high_level_goal": "Learning basic 3D modeling operations in Blender software",
# "creative_actions": "3D object manipulation, interface navigation, basic modeling workflow",
# "objects": ["computer", "monitor", "mouse", "keyboard", "Blender software", "3D cube", "desktop interface"],
# "final_goal": "Introduction to Blender 3D modeling fundamentals and basic object manipulation"
}
def analyze_frames(frames_dir: str, output_json_path: str, prompt: Optional[str] = None) -> int:
"""Analyze all frames in directory using Florence-2 model."""
log_message(f"[INFO] Analyzing frames in {frames_dir}...")
frames_dir = Path(frames_dir).resolve()
output_json_path = Path(output_json_path).resolve()
ensure_dir(frames_dir)
ensure_dir(output_json_path.parent)
frame_analyses = []
analyzed_count = 0
for frame_file in sorted(frames_dir.glob("*.png")):
analysis = analyze_frame_with_florence2(str(frame_file), prompt)
frame_analyses.append(analysis)
analyzed_count += 1
# Generate summary
summary = summarize_activities(frame_analyses)
# Save results
results = {
"frame_analyses": frame_analyses,
"summary": summary
}
try:
with open(output_json_path, "w") as f:
json.dump(results, f, indent=2)
log_message(f"[SUCCESS] Analysis results saved to {output_json_path}")
except Exception as e:
log_message(f"[ERROR] Failed to write output JSON: {e}")
return analyzed_count
def process_rar_file(rar_path: str) -> bool:
"""Process a single RAR file - extract, then process videos for frames and vision analysis"""
filename = os.path.basename(rar_path)
processing_status["current_file"] = filename
# Handle multi-part RAR naming
if is_multipart_rar(filename):
course_name = get_rar_part_base(filename)
else:
course_name = filename.replace(".rar", "")
extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
try:
log_message(f"🔄 Processing: {filename}")
# Clean up any existing directory
if os.path.exists(extract_dir):
shutil.rmtree(extract_dir, ignore_errors=True)
# Extract RAR
os.makedirs(extract_dir, exist_ok=True)
if not extract_with_retry(rar_path, extract_dir):
raise Exception("RAR extraction failed")
# Count extracted files
file_count = 0
video_files_found = []
for root, dirs, files in os.walk(extract_dir):
for file in files:
file_count += 1
if file.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
video_files_found.append(os.path.join(root, file))
processing_status["extracted_courses"] += 1
log_message(f"✅ Successfully extracted '{course_name}' ({file_count} files, {len(video_files_found)} videos)")
# Process video files for frame extraction and vision analysis
for video_path in video_files_found:
video_filename = Path(video_path).name
# Create a unique output directory for frames for each video
frames_output_dir = os.path.join(FRAMES_OUTPUT_FOLDER, f"{course_name}_{video_filename.replace('.', '_')}_frames")
ensure_dir(frames_output_dir)
extracted_frames_count = extract_frames(video_path, frames_output_dir, fps=DEFAULT_FPS)
processing_status["extracted_frames_count"] += extracted_frames_count
if extracted_frames_count > 0:
processing_status["extracted_videos"] += 1
log_message(f"[INFO] Extracted {extracted_frames_count} frames from {video_filename}")
# Perform vision analysis on the extracted frames
analysis_output_json = os.path.join(ANALYSIS_OUTPUT_FOLDER, f"{course_name}_{video_filename.replace('.', '_')}_analysis.json")
analyzed_frames = analyze_frames(frames_output_dir, analysis_output_json)
processing_status["analyzed_frames_count"] += analyzed_frames
log_message(f"[INFO] Analyzed {analyzed_frames} frames from {video_filename}")
else:
log_message(f"[WARN] No frames extracted from {video_filename}")
return True
except Exception as e:
error_msg = str(e)
log_message(f"❌ Processing failed: {error_msg}")
log_failed_file(filename, error_msg)
return False
finally:
processing_status["current_file"] = None
def main_processing_loop(start_index: int = 0):
"""Main processing workflow - extraction, frame extraction, and vision analysis"""
processing_status["is_running"] = True
try:
# Load state
processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})
# Use start_index if provided, otherwise use the saved state
next_index = start_index if start_index > 0 else download_state["next_download_index"]
log_message(f"📊 Starting from index {next_index}")
log_message(f"📊 Previously processed: {len(processed_rars)} files")
# Get file list
try:
files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
rar_files = sorted([f for f in files if f.endswith(".rar")])
processing_status["total_files"] = len(rar_files)
log_message(f"📁 Found {len(rar_files)} RAR files in repository")
if next_index >= len(rar_files):
log_message("✅ All files have been processed!")
return
except Exception as e:
log_message(f"❌ Failed to get file list: {str(e)}")
return
# Process only one file per run
if next_index < len(rar_files):
rar_file = rar_files[next_index]
filename = os.path.basename(rar_file)
if filename in processed_rars:
log_message(f"⏭️ Skipping already processed: {filename}")
processing_status["processed_files"] += 1
# Move to next file
next_index += 1
save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
log_message(f"📊 Moving to next file. Progress: {next_index}/{len(rar_files)}")
return
log_message(f"📥 Downloading: {filename}")
dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
# Download file
download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
if download_with_retry(download_url, dest_path):
# Process file
if process_rar_file(dest_path):
processed_rars.append(filename)
save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
log_message(f"✅ Successfully processed: {filename}")
processing_status["processed_files"] += 1
else:
log_message(f"❌ Failed to process: {filename}")
processing_status["failed_files"] += 1
# Clean up downloaded file
try:
os.remove(dest_path)
log_message(f"🗑️ Cleaned up download: {filename}")
except:
pass
else:
log_message(f"❌ Failed to download: {filename}")
processing_status["failed_files"] += 1
# Update download state for next run
next_index += 1
save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
# Status update
log_message(f"📊 Progress: {next_index}/{len(rar_files)} files processed")
log_message(f'📊 Extracted: {processing_status["extracted_courses"]} courses')
log_message(f'📊 Videos Processed: {processing_status["extracted_videos"]}')
log_message(f'📊 Frames Extracted: {processing_status["extracted_frames_count"]}')
log_message(f'📊 Frames Analyzed: {processing_status["analyzed_frames_count"]}')
log_message(f'📊 Failed: {processing_status["failed_files"]} files')
if next_index < len(rar_files):
log_message(f"🔄 Run the script again to process the next file: {os.path.basename(rar_files[next_index])}")
else:
log_message("🎉 All files have been processed!")
else:
log_message("✅ All files have been processed!")
log_message("🎉 Processing complete!")
log_message(f'📊 Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, {processing_status["extracted_frames_count"]} frames extracted, {processing_status["analyzed_frames_count"]} frames analyzed')
except KeyboardInterrupt:
log_message("⏹️ Processing interrupted by user")
except Exception as e:
log_message(f"❌ Fatal error: {str(e)}")
finally:
processing_status["is_running"] = False
cleanup_temp_files()
# FastAPI Endpoints
@app.post("/analyze-video")
async def analyze_video_endpoint(
file: UploadFile = File(...),
fps: int = Form(DEFAULT_FPS),
prompt: Optional[str] = Form(None)
):
"""Analyze a single video file and return frame-by-frame analysis."""
if not file.filename.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
return JSONResponse(status_code=400, content={
"error": "File type not allowed",
"allowed_types": [".mp4", ".avi", ".mov", ".mkv"]
})
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
file_path = temp_dir_path / file.filename
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
frames_dir = temp_dir_path / "frames"
frame_count = extract_frames(file_path, frames_dir, fps)
frame_analyses = []
for frame_file in sorted(frames_dir.glob("*.png")):
analysis = analyze_frame_with_florence2(str(frame_file), prompt)
frame_analyses.append(analysis)
summary = summarize_activities(frame_analyses)
return JSONResponse(content={
"video_filename": file.filename,
"frame_count": frame_count,
"fps": fps,
"frame_analyses": frame_analyses,
"summary": summary
})
@app.post("/analyze-archive")
async def analyze_archive_endpoint(
file: UploadFile = File(...),
fps: int = Form(DEFAULT_FPS),
prompt: Optional[str] = Form(None)
):
"""Analyze videos from RAR/ZIP archive and return frame-by-frame analysis."""
if not file.filename.lower().endswith((".rar", ".zip")):
return JSONResponse(status_code=400, content={
"error": "File type not allowed",
"allowed_types": [".rar", ".zip"]
})
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
file_path = temp_dir_path / file.filename
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
extract_dir = temp_dir_path / "extracted"
video_files = []
if file.filename.lower().endswith(".rar"):
with rarfile.RarFile(file_path) as rf:
rf.extractall(extract_dir)
else:
with zipfile.ZipFile(file_path) as zf:
zf.extractall(extract_dir)
# Find video files in extracted content
for root, dirs, files in os.walk(extract_dir):
for file in files:
if file.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
video_files.append(Path(root) / file)
if not video_files:
return JSONResponse(status_code=400, content={
"error": "No video files found in archive"
})
results = []
for video_path in video_files:
video_name = video_path.name
frames_dir = temp_dir_path / f"frames_{video_name}"
frame_count = extract_frames(video_path, frames_dir, fps)
frame_analyses = []
for frame_file in sorted(frames_dir.glob("*.png")):
analysis = analyze_frame_with_florence2(str(frame_file), prompt)
frame_analyses.append(analysis)
summary = summarize_activities(frame_analyses)
results.append({
"video_filename": video_name,
"frame_count": frame_count,
"fps": fps,
"frame_analyses": frame_analyses,
"summary": summary
})
return JSONResponse(content={
"archive_filename": file.filename,
"videos_processed": len(video_files),
"results": results
})
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return JSONResponse(content={
"status": "healthy",
"model": "Florence-2 (Mock)",
"note": "Florence-2 model is mocked due to sandbox memory limitations."
})
@app.get("/status")
async def get_processing_status():
"""Get current processing status."""
return JSONResponse(content=processing_status)
# Expose necessary functions and variables
__all__ = [
"main_processing_loop",
"processing_status",
"ANALYSIS_OUTPUT_FOLDER",
"log_message",
"send_email_with_attachment",
"analyze_frames",
"extract_frames",
"DEFAULT_FPS",
"ensure_dir"
]