# elian/vision_analyzer.py
# Last change: "Update vision_analyzer.py" by Fred808, commit 58f427e (verified)
import os
import json
import requests
import subprocess
import shutil
import time
import re
import threading
from typing import Dict, List, Set, Optional, Any
from huggingface_hub import HfApi, list_repo_files, CommitOperationAdd, hf_hub_download, hf_hub_url
import cv2
import numpy as np
from pathlib import Path
import smtplib
from email.message import EmailMessage
# ==== CONFIGURATION ====
# Hugging Face access token; empty string when HF_TOKEN is unset
# (empty token means unauthenticated API calls).
HF_TOKEN = os.getenv("HF_TOKEN", "")
# Dataset repo holding the source RAR archives
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
# Dataset repo receiving the zipped frames and the shared state file
TARGET_REPO_ID = os.getenv("TARGET_REPO", "Fred808/BG3")
# Path Configuration
DOWNLOAD_FOLDER = "downloads"              # downloaded RAR archives
EXTRACT_FOLDER = "extracted"               # extracted archive contents
FRAMES_OUTPUT_FOLDER = "extracted_frames"  # per-video PNG frame folders
ZIP_OUTPUT_FOLDER = "zipped_frames"        # zipped frame folders awaiting upload
LOCAL_STATE_FOLDER = ".state"              # local copy of the remote state file
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ZIP_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(LOCAL_STATE_FOLDER, exist_ok=True)
# State Files
FAILED_FILES_LOG = "failed_files.log"      # append-only log of failed archives
HF_STATE_FILE = "processing_state2.json"   # remote state file name in TARGET_REPO_ID
# Processing Parameters
CHUNK_SIZE = 2        # NOTE(review): appears unused in the visible code
PROCESSING_DELAY = 2  # seconds to wait between main-loop iterations
MAX_RETRIES = 3       # NOTE(review): appears unused; retry counts are per-function defaults
MIN_FREE_SPACE_GB = 2 # minimum free space in GB before processing
# Frame Extraction Parameters
DEFAULT_FPS = 3  # default frames per second sampled from each video
# Initialize HF API (a module-level singleton used by all HF calls below)
hf_api = HfApi(token=HF_TOKEN)
# Global State
# In-memory status snapshot shared across this module (mutated by
# log_message and the processing functions below).
processing_status = {
    "is_running": False,      # True while main_processing_loop is active
    "current_file": None,     # filename currently being processed, else None
    "total_files": 0,         # number of RAR files found in the source repo
    "processed_files": 0,     # archives fully processed this run
    "failed_files": 0,        # archives that failed this run
    "extracted_courses": 0,   # archives successfully extracted
    "extracted_videos": 0,    # videos whose frames were extracted
    "last_update": None,      # timestamp of the most recent log entry
    "logs": []                # rolling buffer of the last 100 log lines
}
def log_message(message: str):
    """Print *message* with a timestamp and append it to the in-memory log.

    Also refreshes processing_status["last_update"] and trims the rolling
    log buffer to the 100 most recent entries.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{now}] {message}"
    print(entry)
    logs = processing_status["logs"]
    logs.append(entry)
    processing_status["last_update"] = now
    if len(logs) > 100:
        processing_status["logs"] = logs[-100:]
def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record for *filename* to FAILED_FILES_LOG.

    Bug fix: the original wrote the literal placeholder text "(unknown)"
    and ignored the *filename* argument entirely, making the failure log
    useless for identifying which file failed.
    """
    with open(FAILED_FILES_LOG, "a") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return disk usage statistics for *path* in gigabytes.

    Improvement: uses shutil.disk_usage, which is portable (Windows and
    POSIX) and reports the same non-superuser "free" figure (f_bavail)
    that the original os.statvfs computation produced; os.statvfs does
    not exist on Windows.

    Returns:
        Dict with "total", "free" and "used" keys, each a float in GB.
    """
    usage = shutil.disk_usage(path)
    gb = 1024 ** 3
    total = usage.total / gb
    free = usage.free / gb
    return {"total": total, "free": free, "used": total - free}
def check_disk_space(path: str = ".") -> bool:
    """Return True when at least MIN_FREE_SPACE_GB of space is free at *path*.

    Logs a warning with the current usage figures when space is low.
    """
    info = get_disk_usage(path)
    if info["free"] >= MIN_FREE_SPACE_GB:
        return True
    log_message(f'⚠️ Low disk space: {info["free"]:.2f}GB free, {info["used"]:.2f}GB used')
    return False
def cleanup_temp_files():
    """Delete stale archive downloads from DOWNLOAD_FOLDER to reclaim space.

    Keeps the file currently being processed; removes every other .rar
    or .zip file. Deletion is best-effort: individual failures are logged
    and skipped rather than aborting the cleanup.

    Bug fix: the original used a bare `except: pass`, which silently
    swallowed every exception including KeyboardInterrupt/SystemExit;
    narrowed to OSError and logged.
    """
    log_message("🧹 Cleaning up temporary files...")
    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file == current_file or not file.endswith((".rar", ".zip")):
            continue
        try:
            os.remove(os.path.join(DOWNLOAD_FOLDER, file))
            log_message(f"πŸ—‘οΈ Removed old download: {file}")
        except OSError as e:
            log_message(f"⚠️ Could not remove {file}: {e}")
def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load state from a JSON file, migrating legacy layouts in memory.

    Migration steps (applied to the loaded dict, not written back):
      1. A legacy "processed_rars" list becomes a "file_states" dict with
         every entry marked "processed".
      2. "file_states" is guaranteed to exist and be a dict.
      3. "next_download_index" is guaranteed to exist (defaults to 0).

    Args:
        file_path: Path of the JSON state file.
        default_value: Returned when the file is missing or unreadable.

    Returns:
        The (possibly migrated) state dict, or *default_value*.

    Robustness fix: the original caught only json.JSONDecodeError, so an
    OSError while reading the file crashed the caller; the try body is
    also narrowed to just the load itself.
    """
    if not os.path.exists(file_path):
        return default_value
    try:
        with open(file_path, "r") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        log_message(f"⚠️ Corrupted state file: {file_path}")
        return default_value
    # 1. Convert old "processed_rars" list to new "file_states" dictionary
    if "processed_rars" in data and isinstance(data["processed_rars"], list):
        log_message("ℹ️ Migrating old 'processed_rars' list to new 'file_states' dictionary.")
        data["file_states"] = {
            filename: "processed" for filename in data.pop("processed_rars")
        }
    # 2. Ensure file_states exists and is a dict
    if "file_states" not in data or not isinstance(data["file_states"], dict):
        log_message("ℹ️ Initializing 'file_states' dictionary.")
        data["file_states"] = {}
    # 3. Ensure next_download_index exists
    if "next_download_index" not in data:
        data["next_download_index"] = 0
    return data
def save_json_state(file_path: str, data: Dict[str, Any]):
    """Persist *data* as pretty-printed (indent=2) JSON at *file_path*."""
    serialized = json.dumps(data, indent=2)
    with open(file_path, "w") as f:
        f.write(serialized)
def download_hf_state(repo_id: str, filename: str) -> Dict[str, Any]:
    """Download the processing-state JSON from a Hugging Face dataset repo.

    Falls back to a fresh default state ({"next_download_index": 0,
    "file_states": {}}) when the file does not exist remotely or the
    download fails for any reason.

    Bug fix: the "not found" log line previously printed the literal
    placeholder "(unknown)" instead of the state filename.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)
    default_state = {"next_download_index": 0, "file_states": {}}
    try:
        # Check existence first to distinguish "missing" from "download error"
        files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        if filename not in files:
            log_message(f"ℹ️ State file {filename} not found in {repo_id}. Starting from default state.")
            return default_state
        hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False
        )
        log_message(f"βœ… Successfully downloaded state file from {repo_id}.")
        # load_json_state handles migration of legacy state layouts
        return load_json_state(local_path, default_state)
    except Exception as e:
        log_message(f"⚠️ Failed to download state file from Hugging Face: {str(e)}. Starting from default state.")
        return default_state
def upload_hf_state(repo_id: str, filename: str, state: Dict[str, Any]) -> bool:
    """Persist *state* locally, then push it to the HF dataset repo.

    Returns True on a successful upload, False otherwise (the failure is
    logged, never raised).
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)
    try:
        # Save locally first, then push that file as the new remote state
        save_json_state(local_path, state)
        hf_api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Update processing state: next_index={state['next_download_index']}",
        )
    except Exception as e:
        log_message(f"❌ Failed to upload state file to Hugging Face: {str(e)}")
        return False
    log_message(f"βœ… Successfully uploaded updated state file to {repo_id}")
    return True
# --- NEW LOCKING FUNCTIONS ---
def lock_file_for_processing(rar_filename: str, state: Dict[str, Any]) -> bool:
    """Mark *rar_filename* as 'processing' and upload the state as a lock.

    Returns True when the lock was published remotely; on upload failure
    the local state is restored and False is returned.

    Bug fix: on a failed upload the original unconditionally deleted the
    file's entry, which dropped any pre-existing marker (e.g. "failed");
    we now restore the previous value instead.
    """
    log_message(f"πŸ”’ Attempting to lock file: {rar_filename} (Marking as 'processing')")
    # Remember the prior marker so a failed upload can restore it exactly
    previous = state["file_states"].get(rar_filename)
    state["file_states"][rar_filename] = "processing"
    # Upload the updated state file immediately to establish the lock
    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"βœ… Successfully locked file: {rar_filename}")
        return True
    log_message(f"❌ Failed to upload lock for file: {rar_filename}. Aborting processing.")
    if previous is None:
        state["file_states"].pop(rar_filename, None)
    else:
        state["file_states"][rar_filename] = previous
    return False
def unlock_file_as_processed(rar_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Mark *rar_filename* as 'processed', advance the index, push the state.

    Returns True when the remote state file was updated successfully.
    When the upload fails the file remains processed locally but the
    remote state is left stale (this is only logged).
    """
    log_message(f"πŸ”“ Attempting to unlock file: {rar_filename} (Marking as 'processed')")
    state["file_states"][rar_filename] = "processed"
    state["next_download_index"] = next_index
    uploaded = upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state)
    if uploaded:
        log_message(f"βœ… Successfully unlocked and marked as processed: {rar_filename}")
    else:
        log_message(f"❌ Failed to upload final state for file: {rar_filename}. The file is processed locally but state is not updated.")
    return uploaded
# --- Original Utility Functions ---
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Download *url* to *dest_path* with retries, backoff and space checks.

    Aborts early when disk space is low (after one cleanup attempt) or
    when the advertised content length would not fit with a 0.5GB buffer.
    Retries with exponential backoff (1s, 2s, 4s, ...).

    Fixes over the original:
      - os.makedirs("") raised FileNotFoundError when dest_path had no
        directory component; the makedirs call is now guarded.
      - added a request timeout so a hung connection cannot stall forever.
      - a partial file is removed before each retry / final failure.

    Returns:
        True when the file was fully written, False otherwise.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("❌ Insufficient disk space even after cleanup")
            return False
    dest_dir = os.path.dirname(dest_path)
    if dest_dir:
        try:
            os.makedirs(dest_dir, exist_ok=True)
        except OSError as e:
            log_message(f"❌ Failed to create directory for download path {dest_dir}: {str(e)}")
            return False
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    for attempt in range(max_retries):
        try:
            with requests.get(url, headers=headers, stream=True, timeout=60) as r:
                r.raise_for_status()
                # Reject files that clearly will not fit on disk
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:  # leave 0.5GB buffer
                        log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            return True
        except Exception as e:
            # Remove any partially written file so a retry starts clean
            if os.path.exists(dest_path):
                try:
                    os.remove(dest_path)
                except OSError:
                    pass
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            log_message(f"❌ Download failed after {max_retries} attempts: {e}")
            return False
    return False
def is_multipart_rar(filename: str) -> bool:
    """Return True when *filename* looks like one volume of a multi-part RAR."""
    lowered = filename.lower()
    return lowered.endswith(".rar") and ".part" in lowered
def get_rar_part_base(filename: str) -> str:
    """Return the archive base name shared by all parts of a RAR set.

    Fixes two defects in the original:
      - the ".part" presence check was case-insensitive while the split
        was case-sensitive, so "X.PART1.RAR" was never trimmed;
      - ``filename.replace(".rar", "")`` removed the substring anywhere
        in the name instead of only a trailing extension.
    """
    lowered = filename.lower()
    part_idx = lowered.find(".part")
    if part_idx != -1:
        return filename[:part_idx]
    if lowered.endswith(".rar"):
        return filename[:-4]
    return filename
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Extract a RAR with retries, treating each volume independently.

    Shells out to the external `unrar` binary: 'e' flattens paths, '-o+'
    overwrites existing files, '-kb' keeps broken extracted files. Because
    each part of a multi-volume set is processed on its own, a "Cannot
    find volume" error is deliberately treated as success.

    Args:
        rar_path: Path to the local .rar file.
        output_dir: Directory that receives the extracted files.
        max_retries: Number of extraction attempts before giving up.

    Returns:
        True when extraction succeeded (or hit the expected missing-volume
        error), False otherwise.
    """
    filename = os.path.basename(rar_path)  # NOTE(review): currently unused
    log_message("πŸ“¦ Processing RAR file independently as requested by user.")
    for attempt in range(max_retries):
        try:
            # Use 'e' (extract) instead of 'x' (extract with full paths) and the -kb switch
            cmd = ["unrar", "e", "-o+", "-kb", rar_path, output_dir]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                log_message(f"βœ… Successfully extracted: {os.path.basename(rar_path)}")
                return True
            else:
                error_msg = result.stderr or result.stdout
                log_message(f"⚠️ Extraction attempt {attempt + 1} failed: {error_msg}")
                # Check for the expected "Cannot find volume" error and treat it as a success
                if "Cannot find volume" in error_msg:
                    log_message(f"βœ… Extracted all possible files from independent part (expected volume error).")
                    return True
                # Classify the failure for the log; every path falls through to retry
                if "checksum error" in error_msg.lower() or "CRC failed" in error_msg:
                    log_message(f"⚠️ Data corruption detected, attempt {attempt + 1}")
                elif result.returncode == 10:
                    log_message(f"⚠️ No files to extract (exit code 10)")
                elif result.returncode == 1:
                    log_message(f"⚠️ Non-fatal error (exit code 1)")
        except Exception as e:
            log_message(f"❌ Extraction exception: {str(e)}")
        # Give up after the last attempt; otherwise pause briefly and retry
        if attempt == max_retries - 1:
            return False
        time.sleep(1)
    return False
# --- Frame Extraction Utilities ---
def ensure_dir(path):
    """Create *path* (including parents) if it does not already exist."""
    Path(path).mkdir(parents=True, exist_ok=True)
def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
    """Extract frames from *video_path* into *output_dir* at ~*fps* frames/sec.

    Frames are written as zero-padded PNGs (0001.png, 0002.png, ...).

    Args:
        video_path: Path to the source video file.
        output_dir: Directory receiving the PNG frames (created if needed).
        fps: Target sampling rate in frames per second.

    Returns:
        The number of frames written (0 when the video cannot be opened).

    Fixes over the original:
      - when the requested fps exceeded the video fps, the rounded
        sampling interval became 0 and the modulo below raised
        ZeroDivisionError; the interval is now clamped to >= 1.
      - cap.release() is now guaranteed even if cv2.imwrite raises.
    """
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    ensure_dir(output_dir)
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        log_message(f"[ERROR] Failed to open video file: {video_path}")
        return 0
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    if not video_fps or video_fps <= 0:
        video_fps = 30  # fallback if FPS is not available
        log_message(f"[WARN] Using fallback FPS: {video_fps}")
    # Clamp to 1 so a high requested fps keeps every frame instead of crashing
    frame_interval = max(1, int(round(video_fps / fps)))
    frame_idx = 0
    saved_idx = 1
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    log_message(f"[DEBUG] Total frames in video: {total_frames}")
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                frame_name = f"{saved_idx:04d}.png"
                cv2.imwrite(str(Path(output_dir) / frame_name), frame)
                saved_idx += 1
            frame_idx += 1
    finally:
        cap.release()
    frame_count = saved_idx - 1
    log_message(f"Extracted {frame_count} frames from {video_path} to {output_dir}")
    return frame_count
def zip_frames_folder(frames_dir: str, output_folder: str) -> Optional[str]:
    """Create a .zip archive of *frames_dir* inside *output_folder*.

    The archive is rooted at the frames folder's parent so the zip
    contains the folder itself, not bare files.

    Returns:
        The path of the created archive, or None when zipping fails.
    """
    folder_name = os.path.basename(frames_dir)
    # make_archive appends ".zip" itself, so pass the base path without it
    target_base = os.path.join(output_folder, folder_name)
    log_message(f"[INFO] Zipping frames from {frames_dir} to {target_base}.zip...")
    try:
        archive_path = shutil.make_archive(
            base_name=target_base,
            format='zip',
            root_dir=os.path.dirname(frames_dir),
            base_dir=folder_name,
        )
    except Exception as e:
        log_message(f"❌ Failed to create zip archive for {frames_dir}: {str(e)}")
        return None
    log_message(f"βœ… Successfully created zip file: {archive_path}")
    return archive_path
def upload_to_huggingface(file_path: str, repo_id: str, path_in_repo: str) -> bool:
    """Upload a single file to a Hugging Face dataset repository.

    Returns True on success, False on failure (the error is logged).

    Bug fix: every log line and the commit message previously contained
    the literal placeholder "(unknown)" instead of the actual filename.
    """
    filename = os.path.basename(file_path)
    log_message(f"[INFO] Uploading {filename} to {repo_id}/{path_in_repo}...")
    try:
        # Single-file upload; creates one commit in the target repo
        hf_api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Add extracted frames zip: {filename}"
        )
        log_message(f"βœ… Successfully uploaded {filename} to {repo_id}")
        return True
    except Exception as e:
        log_message(f"❌ Failed to upload {filename} to Hugging Face: {str(e)}")
        return False
def process_rar_file(rar_path: str) -> bool:
    """Extract a RAR archive, pull frames from each video, zip and upload.

    Pipeline: extract the archive into EXTRACT_FOLDER, walk it for video
    files, sample frames per video at DEFAULT_FPS, zip each frame folder,
    upload the zip to TARGET_REPO_ID, then clean up local artifacts.

    Returns:
        True when the archive was processed end-to-end, False on failure
        (the failure is also appended to FAILED_FILES_LOG).

    Fixes over the original:
      - the initial log line printed the literal "(unknown)" placeholder
        instead of the filename;
      - the local zip was deleted even when its upload failed, directly
        contradicting the "Keeping file for manual inspection" message.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename
    # Multi-part archives share one base course name across all parts
    if is_multipart_rar(filename):
        course_name = get_rar_part_base(filename)
    else:
        course_name = filename.replace(".rar", "")
    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
    try:
        log_message(f"πŸ”„ Processing: {filename}")
        # Extract RAR
        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")
        # Count extracted files and collect the videos to process
        file_count = 0
        video_files_found = []
        for root, dirs, files in os.walk(extract_dir):
            for file in files:
                file_count += 1
                if file.lower().endswith(('.mp4', '.avi', '.mov', '.mkv', '.webm')):
                    video_files_found.append(os.path.join(root, file))
        processing_status["extracted_courses"] += 1
        log_message(f"βœ… Successfully extracted '{course_name}' ({file_count} files, {len(video_files_found)} videos)")
        # Per-video: extract frames, zip them, upload the zip
        for video_path in video_files_found:
            video_filename = Path(video_path).name
            # Unique output directory per video for its frames
            frames_output_dir_name = f"{course_name}_{video_filename.replace('.', '_')}_frames"
            frames_output_dir = os.path.join(
                FRAMES_OUTPUT_FOLDER,
                frames_output_dir_name
            )
            ensure_dir(frames_output_dir)
            # 1. Extract frames
            frame_count = extract_frames(video_path, frames_output_dir, fps=DEFAULT_FPS)
            processing_status["extracted_videos"] += 1
            if frame_count == 0:
                log_message(f"⚠️ No frames extracted from {video_filename}. Skipping zip/upload.")
                # Clean up the empty directory
                shutil.rmtree(frames_output_dir, ignore_errors=True)
                continue
            log_message(f"βœ… {frame_count} frames extracted from {video_filename}")
            # 2. Zip the extracted frames
            zip_path = zip_frames_folder(frames_output_dir, ZIP_OUTPUT_FOLDER)
            if zip_path:
                # 3. Upload the zip file to Hugging Face
                path_in_repo = f"frames/{os.path.basename(zip_path)}"
                uploaded = upload_to_huggingface(zip_path, TARGET_REPO_ID, path_in_repo)
                if uploaded:
                    log_message(f"βœ… Upload successful for {os.path.basename(zip_path)}")
                else:
                    log_message(f"❌ Upload failed for {os.path.basename(zip_path)}. Keeping file for manual inspection.")
                # 4. Clean up local artifacts; keep the zip when its upload failed
                try:
                    if uploaded:
                        os.remove(zip_path)
                        log_message(f"πŸ—‘οΈ Cleaned up local zip file: {os.path.basename(zip_path)}")
                    shutil.rmtree(frames_output_dir, ignore_errors=True)
                    log_message(f"πŸ—‘οΈ Cleaned up local frames directory: {frames_output_dir_name}")
                except Exception as e:
                    log_message(f"⚠️ Failed to clean up temporary files: {str(e)}")
        return True
    except Exception as e:
        error_msg = str(e)
        log_message(f"❌ Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False
    finally:
        processing_status["current_file"] = None
def get_all_rar_files(repo_id: str) -> List[str]:
    """Return the sorted list of .rar filenames in the source dataset repo.

    Returns an empty list when the repository listing fails (logged).
    """
    log_message(f"πŸ” Fetching all files from source repo: {repo_id}")
    try:
        all_files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        rar_files = sorted(name for name in all_files if name.lower().endswith(".rar"))
        log_message(f"Found {len(rar_files)} RAR files.")
        return rar_files
    except Exception as e:
        log_message(f"❌ Failed to list files from source repo: {str(e)}")
        return []
def main_processing_loop(start_index: int = 0):
    """Main processing workflow coordinating multiple workers via a lock file.

    Each iteration: pull the shared state JSON from TARGET_REPO_ID, pick
    the first RAR not yet processed/failed/locked, publish a 'processing'
    lock, download + process the archive, then publish the final state
    ('processed' or 'failed'). Exits when every file is done or locked.

    Args:
        start_index: Optional index into the sorted RAR list to begin
            scanning from. NOTE(review): a positive start_index overrides
            the remote index on EVERY iteration, so each pass re-scans
            from there; progress still occurs because finished files are
            skipped via file_states.
    """
    processing_status["is_running"] = True
    log_message("πŸš€ Starting main processing loop with locking mechanism...")
    try:
        # 1. Get the list of all RAR files from the source repo
        all_rar_files = get_all_rar_files(SOURCE_REPO_ID)
        if not all_rar_files:
            log_message("πŸ›‘ No RAR files found in the source repository. Exiting.")
            return
        processing_status["total_files"] = len(all_rar_files)
        while True:
            # 2. Download the latest state file
            current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)
            file_states = current_state.get("file_states", {})
            # Use start_index if provided, otherwise use the remote state
            next_download_index = start_index if start_index > 0 else current_state.get("next_download_index", 0)
            # 3. Find the next *available* file to process
            target_file = None
            target_index = -1
            # Check if we have processed all files
            if next_download_index >= len(all_rar_files):
                log_message("πŸŽ‰ All files have been processed! Exiting loop.")
                break
            for i in range(next_download_index, len(all_rar_files)):
                rar_filename = all_rar_files[i]
                state = file_states.get(rar_filename)
                if state in ["processed", "failed"]:
                    # Already done (or failed and we are skipping it), skip to the next index
                    continue
                if state == "processing":
                    # This is the lock mechanism: another worker is currently processing this file
                    log_message(f"⚠️ File is currently 'processing' (locked): {rar_filename}. Skipping to next available file.")
                    continue
                # If state is None (not in file_states), it's a new, available file
                target_file = rar_filename
                target_index = i
                break
            if target_file is None:
                log_message("πŸŽ‰ All available files have been processed or are currently locked. Exiting loop.")
                break
            log_message(f"βœ… Selected file for processing: {target_file} (Index: {target_index})")
            processing_status["current_file"] = target_file
            # 4. Acquire Lock: Update state to 'processing' and upload immediately
            if not lock_file_for_processing(target_file, current_state):
                log_message("❌ Failed to acquire lock. Retrying loop to get latest state.")
                time.sleep(PROCESSING_DELAY)
                continue
            # 5. Perform the actual work
            rar_url = hf_hub_url(repo_id=SOURCE_REPO_ID, filename=target_file, repo_type="dataset")
            # The download path now includes the subdirectory from the Hugging Face repo
            local_rar_path = os.path.join(DOWNLOAD_FOLDER, target_file)
            success = False
            try:
                # Download the file
                if download_with_retry(rar_url, local_rar_path):
                    # Process the file (extraction, frame processing, zipping, uploading results, etc.)
                    if process_rar_file(local_rar_path):
                        success = True
                        log_message(f"βœ… Finished all processing steps for: {target_file}")
                    else:
                        log_message(f"❌ Processing failed for: {target_file}")
                else:
                    log_message(f"❌ Download failed for: {target_file}")
            except Exception as e:
                log_message(f"πŸ”₯ An unhandled error occurred while processing {target_file}: {str(e)}")
                log_failed_file(target_file, str(e))
            finally:
                # 6. Release Lock / Update State
                # The next index to check will be the one *after* the current file, regardless of success.
                next_index_to_save = target_index + 1
                # Download the latest state again before final upload to ensure we don't overwrite
                # changes made by other workers in the meantime (e.g. if they processed a file
                # that was before this one in the queue).
                current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)
                if success:
                    # Mark as 'processed' and update the next_download_index
                    unlock_file_as_processed(target_file, current_state, next_index_to_save)
                    processing_status["processed_files"] += 1
                else:
                    # If processing failed, we still want to release the 'processing' lock,
                    # but we mark it as 'failed' instead of 'processed' and still advance the index.
                    log_message(f"⚠️ Processing failed for {target_file}. Marking as 'failed' and advancing index.")
                    current_state["file_states"][target_file] = "failed"
                    current_state["next_download_index"] = next_index_to_save
                    upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, current_state)
                    processing_status["failed_files"] += 1
                # Clean up local files
                if os.path.exists(local_rar_path):
                    os.remove(local_rar_path)
                    log_message(f"πŸ—‘οΈ Cleaned up local file: {local_rar_path}")
            # Wait a bit before checking for the next file to avoid hammering the HF API
            time.sleep(PROCESSING_DELAY)
        log_message("πŸŽ‰ Processing complete!")
        log_message(f'πŸ“Š Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, frames extracted')
    except KeyboardInterrupt:
        log_message("⏹️ Processing interrupted by user")
    except Exception as e:
        log_message(f"❌ Fatal error: {str(e)}")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()
# Expose necessary functions and variables
# Public API when this module is imported with `from vision_analyzer import *`.
__all__ = [
    "main_processing_loop",
    "processing_status",
    "log_message",
    "extract_frames",
    "DEFAULT_FPS",
    "ensure_dir"
]