"""Download RAR archives from a Hugging Face dataset and analyse their videos.

Pipeline for one archive: download -> ``unrar`` extraction -> frame sampling
with OpenCV -> cursor template matching in a process pool -> JSON results
(optionally uploaded to a Hugging Face dataset and e-mailed).

Progress is persisted in small JSON state files so that each invocation of
``main_processing_loop`` handles the next archive in the repository.
"""
import os
import json
import requests
import subprocess
import shutil
import time
import re
import threading
from typing import Dict, List, Set, Optional
from huggingface_hub import HfApi, list_repo_files
import cv2
import numpy as np
from pathlib import Path
import smtplib
from email.message import EmailMessage
import multiprocessing

# ==== CONFIGURATION ====
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")

# Path Configuration
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"  # Sampled video frames
CURSOR_TRACKING_OUTPUT_FOLDER = "cursor_tracking_results"  # Per-video JSON results
CURSOR_TEMPLATES_DIR = "cursors"

os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(CURSOR_TRACKING_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(CURSOR_TEMPLATES_DIR, exist_ok=True)  # Ensure cursor templates directory exists

# State Files
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"

# Processing Parameters
CHUNK_SIZE = 1
PROCESSING_DELAY = 2
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2  # Minimum free space in GB before processing

# Frame Extraction Parameters
DEFAULT_FPS = 3  # Default frames per second for extraction

# Cursor Tracking Parameters
CURSOR_THRESHOLD = 0.8

# Internal tuning knobs (not part of the public API)
_REQUEST_TIMEOUT = (10, 60)  # (connect, read) seconds; read applies per socket read
_DOWNLOAD_CHUNK_BYTES = 1024 * 1024
_MAX_LOG_ENTRIES = 100
_VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".mkv")
# "<base>.part<digits>.rar", case-insensitive, anchored at the end of the name
_RAR_PART_RE = re.compile(r"^(?P<base>.+)\.part(?P<num>\d+)\.rar$", re.IGNORECASE)
_RAR_SUFFIX_RE = re.compile(r"\.rar$", re.IGNORECASE)

# Initialize HF API
hf_api = HfApi(token=HF_TOKEN)

# Global State
processing_status = {
    "is_running": False,
    "current_file": None,
    "total_files": 0,
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,
    "extracted_videos": 0,
    "extracted_frames_count": 0,
    "tracked_cursors_count": 0,
    "last_update": None,
    "logs": []
}

# Guards processing_status["logs"]: the log-listener thread and the main
# thread both call log_message() while cursor tracking is running.
_LOG_LOCK = threading.Lock()


def log_message(message: str):
    """Print *message* with a timestamp and keep it in ``processing_status``.

    Only the most recent ``_MAX_LOG_ENTRIES`` entries are retained.
    """
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}"
    print(log_entry)
    with _LOG_LOCK:
        logs = processing_status["logs"]
        logs.append(log_entry)
        processing_status["last_update"] = timestamp
        if len(logs) > _MAX_LOG_ENTRIES:
            processing_status["logs"] = logs[-_MAX_LOG_ENTRIES:]


def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record for *filename* to ``FAILED_FILES_LOG``."""
    with open(FAILED_FILES_LOG, "a", encoding="utf-8") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")


def get_disk_usage(path: str) -> Dict[str, float]:
    """Return ``{"total", "free", "used"}`` in GB for the filesystem holding *path*.

    ``free`` is the space available to the current user and ``used`` is
    ``total - free``. ``shutil.disk_usage`` is used so this also works on
    platforms without ``os.statvfs``.
    """
    usage = shutil.disk_usage(path)
    total = usage.total / (1024**3)
    free = usage.free / (1024**3)
    return {"total": total, "free": free, "used": total - free}


def check_disk_space(path: str = ".") -> bool:
    """Return True when at least ``MIN_FREE_SPACE_GB`` is free at *path*."""
    disk_info = get_disk_usage(path)
    if disk_info["free"] < MIN_FREE_SPACE_GB:
        log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
        return False
    return True


def cleanup_temp_files():
    """Delete downloaded archives other than the one currently being processed."""
    log_message("🧹 Cleaning up temporary files...")

    # Clean old downloads (keep only current processing file)
    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"🗑️ Removed old download: {file}")
            except OSError as e:
                # Best effort: report and keep cleaning the remaining files.
                log_message(f"⚠️ Could not remove old download {file}: {e}")


def load_json_state(file_path: str, default_value):
    """Return the JSON content of *file_path*, or *default_value*.

    The default is returned when the file does not exist, cannot be read or
    does not contain valid JSON.
    """
    if os.path.exists(file_path):
        try:
            with open(file_path, "r") as f:
                return json.load(f)
        except (OSError, ValueError):  # JSONDecodeError is a ValueError
            log_message(f"⚠️ Corrupted state file: {file_path}")
    return default_value


def save_json_state(file_path: str, data):
    """Write *data* as JSON to *file_path*.

    The content goes to a temporary sibling file first and is then moved into
    place, so an interrupted write cannot leave a truncated state file.
    """
    tmp_path = f"{file_path}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, file_path)


def _state_value(state, key, default):
    """Return ``state[key]`` when *state* is a dict containing *key*, else *default*."""
    if isinstance(state, dict):
        return state.get(key, default)
    return default


def _remove_quietly(path: str):
    """Remove *path* if it exists; ignore filesystem errors."""
    try:
        os.remove(path)
    except OSError:
        pass


def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Stream *url* to *dest_path*, retrying with exponential back-off.

    Returns False without retrying when disk space is insufficient (before the
    request, or according to the response's ``content-length``). A partially
    written file is removed after every failed attempt.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("❌ Insufficient disk space even after cleanup")
            return False

    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
    for attempt in range(max_retries):
        try:
            # Without a timeout a stalled connection would block forever.
            with requests.get(url, headers=headers, stream=True, timeout=_REQUEST_TIMEOUT) as r:
                r.raise_for_status()

                # Check content length if available
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:  # Leave 0.5GB buffer
                        log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False

                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=_DOWNLOAD_CHUNK_BYTES):
                        if chunk:
                            f.write(chunk)
            return True
        except Exception as e:
            _remove_quietly(dest_path)
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            log_message(f"❌ Download failed after {max_retries} attempts: {e}")
            return False
    return False


def is_multipart_rar(filename: str) -> bool:
    """Return True for names of the form ``<base>.part<digits>.rar`` (any case)."""
    return _RAR_PART_RE.match(filename) is not None


def get_rar_part_base(filename: str) -> str:
    """Return *filename* without its ``.partNN.rar`` / ``.rar`` suffix."""
    match = _RAR_PART_RE.match(filename)
    if match:
        return match.group("base")
    return _RAR_SUFFIX_RE.sub("", filename)


def _first_rar_part_name(filename: str) -> Optional[str]:
    """Return the name of volume 1 for a multi-part name, keeping its digit width."""
    match = _RAR_PART_RE.match(filename)
    if not match:
        return None
    width = len(match.group("num"))
    prefix = filename[:match.start("num")]
    suffix = filename[match.end("num"):]
    return f"{prefix}{'1'.zfill(width)}{suffix}"


def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Test and extract *rar_path* into *output_dir* using the ``unrar`` CLI.

    For a multi-part archive the first volume, looked up next to *rar_path*, is
    extracted instead. From the second attempt on, ``-kb`` (keep broken files)
    is added. Returns True on a zero exit status of ``unrar x``.
    """
    if shutil.which("unrar") is None:
        # Retrying cannot help when the tool itself is missing.
        log_message("❌ Extraction exception: 'unrar' executable not found on PATH")
        return False

    filename = os.path.basename(rar_path)

    # For multi-part RARs, we need the first part
    first_part = _first_rar_part_name(filename)
    if first_part is not None:
        first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
        if not os.path.exists(first_part_path):
            log_message(f"⚠️ Multi-part RAR detected but first part not found: {first_part}")
            return False
        rar_path = first_part_path
        log_message(f"📦 Processing multi-part RAR starting with: {first_part}")

    # A trailing separator marks the argument as a destination directory;
    # NOTE(review): unrar also appears to accept an existing directory
    # without it -- the separator is kept to be unambiguous.
    destination = os.path.join(output_dir, "")

    for attempt in range(max_retries):
        try:
            # Test RAR first
            test_cmd = ["unrar", "t", rar_path]
            test_result = subprocess.run(test_cmd, capture_output=True, text=True)
            if test_result.returncode != 0:
                log_message(f"⚠️ RAR test failed: {test_result.stderr}")
                if attempt == max_retries - 1:
                    return False
                continue

            # Extract RAR
            cmd = ["unrar", "x", "-o+", rar_path, destination]
            if attempt > 0:  # Try recovery on subsequent attempts
                cmd.insert(2, "-kb")
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                log_message(f"✅ Successfully extracted: {os.path.basename(rar_path)}")
                return True

            error_msg = result.stderr or result.stdout
            log_message(f"⚠️ Extraction attempt {attempt + 1} failed: {error_msg}")
            if "checksum error" in error_msg.lower() or "CRC failed" in error_msg:
                log_message(f"⚠️ Data corruption detected, attempt {attempt + 1}")
            elif result.returncode == 10:
                log_message("⚠️ No files to extract (exit code 10)")
                return False
            elif result.returncode == 1:
                log_message("⚠️ Non-fatal error (exit code 1)")
        except Exception as e:
            log_message(f"❌ Extraction exception: {str(e)}")
            if attempt == max_retries - 1:
                return False
            time.sleep(1)
    return False


# --- Frame Extraction Utilities ---
def ensure_dir(path):
    """Create directory *path* (and parents) if it does not exist."""
    os.makedirs(path, exist_ok=True)


def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
    """Save frames of *video_path* into *output_dir* at roughly *fps* per second.

    Every ``round(video_fps / fps)``-th frame is written as ``0001.png``,
    ``0002.png``, ... Returns the number of frames actually written; 0 when
    the video cannot be opened or *fps* is not positive.
    """
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    if not fps or fps <= 0:
        log_message(f"[ERROR] Invalid fps value: {fps}")
        return 0
    ensure_dir(output_dir)

    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        cap.release()
        log_message(f"[ERROR] Failed to open video file: {video_path}")
        return 0

    saved_count = 0
    failed_writes = 0
    try:
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        if not video_fps or video_fps <= 0:
            video_fps = 30  # fallback if FPS is not available
            log_message(f"[WARN] Using fallback FPS: {video_fps}")

        frame_interval = max(1, int(round(video_fps / fps)))

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        log_message(f"[DEBUG] Total frames in video: {total_frames}")

        frame_idx = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                frame_name = f"{saved_count + 1:04d}.png"
                # imwrite reports failure through its return value, not an
                # exception; only count frames that really reached the disk.
                if cv2.imwrite(str(Path(output_dir) / frame_name), frame):
                    saved_count += 1
                else:
                    failed_writes += 1
            frame_idx += 1
    finally:
        cap.release()

    if failed_writes:
        log_message(f"[WARN] {failed_writes} frames could not be written to {output_dir}")
    log_message(f"Extracted {saved_count} frames from {video_path} to {output_dir}")
    return saved_count


# --- Cursor Tracking Utilities (multiprocessing) ---
def to_rgb(img):
    """Return *img* as a 3-channel image (OpenCV BGR order); None stays None.

    Single-channel input is expanded and a 4-channel input loses its alpha
    channel; anything else is returned unchanged.
    """
    if img is None:
        return None
    if len(img.shape) == 2:
        return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    if img.shape[2] == 4:
        return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
    return img


def get_mask_from_alpha(template_img):
    """Return a 0/255 ``uint8`` mask from the alpha channel, or None without alpha."""
    if template_img is not None and len(template_img.shape) == 3 and template_img.shape[2] == 4:
        # Use alpha channel as mask (nonzero alpha = 255)
        return (template_img[:, :, 3] > 0).astype(np.uint8) * 255
    return None


def detect_cursor_in_frame_multi(frame, cursor_templates, threshold=CURSOR_THRESHOLD):
    """Locate the best-matching cursor template in *frame*.

    Returns ``(position, confidence, template_name)``. ``position`` is the
    ``(x, y)`` centre of the best match and ``template_name`` its key in
    *cursor_templates* when the best confidence reaches *threshold*;
    otherwise both are None. ``confidence`` is the best score seen, or -1
    when no template could be matched.
    """
    best_pos = None
    best_conf = -1
    best_template_name = None

    frame_rgb = to_rgb(frame)
    if frame_rgb is None:
        return None, best_conf, None

    for template_name, cursor_template in cursor_templates.items():
        template_rgb = to_rgb(cursor_template)
        if template_rgb is None or template_rgb.shape[2] != frame_rgb.shape[2]:
            # Channel mismatch or load error
            continue
        mask = get_mask_from_alpha(cursor_template)
        try:
            result = cv2.matchTemplate(frame_rgb, template_rgb, cv2.TM_CCOEFF_NORMED, mask=mask)
        except Exception:
            # e.g. template larger than the frame; skip this template.
            continue

        # Normalised matching with a mask can produce NaN/inf scores
        # (presumably on flat image regions); treat them as "no match" so
        # they are neither reported as detections nor written to JSON.
        result = np.nan_to_num(result, nan=-1.0, posinf=-1.0, neginf=-1.0)

        _min_val, max_val, _min_loc, max_loc = cv2.minMaxLoc(result)
        if max_val > best_conf:
            best_conf = max_val
            if max_val >= threshold:
                cursor_w, cursor_h = template_rgb.shape[1], template_rgb.shape[0]
                cursor_x = max_loc[0] + cursor_w // 2
                cursor_y = max_loc[1] + cursor_h // 2
                best_pos = (cursor_x, cursor_y)
                best_template_name = template_name

    if best_conf >= threshold:
        return best_pos, best_conf, best_template_name
    return None, best_conf, None


# Multiprocessing worker init and worker function
# These globals are loaded in each worker process via initializer for efficiency
_WORKER_CURSOR_TEMPLATES = None
_WORKER_THRESHOLD = None


def _init_worker(template_paths, threshold):
    """Initializer for pool workers: load templates into process-local global variable"""
    global _WORKER_CURSOR_TEMPLATES
    global _WORKER_THRESHOLD
    _WORKER_CURSOR_TEMPLATES = {}
    for tp in template_paths:
        try:
            img = cv2.imread(tp, cv2.IMREAD_UNCHANGED)
            if img is not None:
                _WORKER_CURSOR_TEMPLATES[os.path.basename(tp)] = img
        except Exception:
            pass
    _WORKER_THRESHOLD = threshold


def track_cursor_worker(frame_file, cursor_templates, threshold, log_queue):
    """Detect the cursor in one frame file and return a JSON-serialisable record.

    *cursor_templates* / *threshold* may be None, in which case the values
    installed in this process by ``_init_worker`` are used. One log line per
    frame is put on *log_queue*.
    """
    if cursor_templates is None:
        cursor_templates = _WORKER_CURSOR_TEMPLATES or {}
    if threshold is None:
        threshold = CURSOR_THRESHOLD if _WORKER_THRESHOLD is None else _WORKER_THRESHOLD

    frame = cv2.imread(str(frame_file), cv2.IMREAD_UNCHANGED)
    if frame is None:
        log_queue.put(f"[WARN] Frame unreadable: {frame_file.name}")
        return {
            "frame": frame_file.name,
            "cursor_active": False,
            "x": None,
            "y": None,
            "confidence": -1,
            "template": None
        }

    pos, conf, template_name = detect_cursor_in_frame_multi(frame, cursor_templates, threshold)
    if pos is not None:
        log_queue.put(
            f"[FRAME] {frame_file.name} → FOUND cursor at ({pos[0]},{pos[1]}) conf={conf:.3f} template={template_name}"
        )
        return {
            "frame": frame_file.name,
            "cursor_active": True,
            "x": pos[0],
            "y": pos[1],
            "confidence": conf,
            "template": template_name
        }

    log_queue.put(
        f"[FRAME] {frame_file.name} → NO cursor (max_conf={conf:.3f})"
    )
    return {
        "frame": frame_file.name,
        "cursor_active": False,
        "x": None,
        "y": None,
        "confidence": conf,
        "template": None
    }


def upload_to_hf_dataset(local_path, dataset_repo_id="Fred808/data", hf_token=None):
    """Upload *local_path* to ``results/<name>`` in a Hugging Face dataset repo.

    Failures are logged, not raised.
    """
    try:
        api = HfApi(token=hf_token or HF_TOKEN)
        filename = os.path.basename(local_path)
        repo_path = f"results/{filename}"
        api.upload_file(
            path_or_fileobj=str(local_path),
            path_in_repo=repo_path,
            repo_id=dataset_repo_id,
            repo_type="dataset"
        )
        log_message(f"[UPLOAD] ✅ Uploaded {filename} to {dataset_repo_id}/{repo_path}")
    except Exception as e:
        log_message(f"[UPLOAD ERROR] {e}")


def log_listener(log_queue):
    """Forward messages from *log_queue* to ``log_message`` until ``"STOP"`` arrives."""
    while True:
        msg = log_queue.get()
        if msg == "STOP":
            break
        log_message(msg)


def _list_numbered_frames(frames_dir, start_frame):
    """Return PNG files named by an integer >= *start_frame*, in numeric order."""
    numbered = []
    for frame_path in frames_dir.glob("*.png"):
        try:
            index = int(frame_path.stem)
        except ValueError:
            continue  # not one of our numbered frames
        if index >= start_frame:
            numbered.append((index, frame_path))
    # Numeric sort: a lexicographic sort would place "10000" before "9999".
    numbered.sort(key=lambda item: (item[0], item[1].name))
    return [frame_path for _, frame_path in numbered]


def track_cursor_parallel(frames_dir, cursor_templates_dir, output_json_path, threshold=CURSOR_THRESHOLD, start_frame=1, batch_size=100, email_results=False):
    """Track the cursor in every numbered PNG frame of *frames_dir* in parallel.

    Templates are the ``*.png`` files of *cursor_templates_dir*. Per-frame
    records are written to *output_json_path* (also as a checkpoint every 50
    frames), uploaded via ``upload_to_hf_dataset`` and, when *email_results*
    is true and TO_EMAIL / FROM_EMAIL / GMAIL_APP_PASSWORD are set, e-mailed.

    Returns the number of frames in which a cursor was found. Errors during
    tracking are logged and the count collected so far is returned.
    """
    log_message(f"[INFO] Tracking cursors in {frames_dir} with real-time logging...")
    frames_dir = Path(frames_dir).resolve()
    output_json_path = Path(output_json_path).resolve()
    cursor_templates_dir = Path(cursor_templates_dir).resolve()
    ensure_dir(output_json_path.parent)

    # Only paths are handed to the workers; each worker process loads the
    # images once in _init_worker instead of receiving them with every task.
    template_paths = [
        str(template_file)
        for template_file in sorted(cursor_templates_dir.glob("*.png"))
        if cv2.imread(str(template_file), cv2.IMREAD_UNCHANGED) is not None
    ]
    if not template_paths:
        log_message(f"[ERROR] No cursor templates found in {cursor_templates_dir}")
        return 0

    # List frames
    all_frames = _list_numbered_frames(frames_dir, start_frame)
    total_frames = len(all_frames)
    if not total_frames:
        log_message("[WARN] No frames found to process.")
        return 0
    log_message(f"[INFO] Total frames to track: {total_frames}")

    batch_size = max(1, int(batch_size))
    worker_count = max(1, min(multiprocessing.cpu_count(), total_frames))

    results = []
    processed = 0
    pool = None
    listener = None
    log_queue = None
    manager = multiprocessing.Manager()
    try:
        log_queue = manager.Queue()
        pool = multiprocessing.Pool(
            worker_count,
            initializer=_init_worker,
            initargs=(template_paths, threshold),
        )
        # The listener is a thread of this process so that worker messages
        # end up in this process's processing_status["logs"].
        listener = threading.Thread(target=log_listener, args=(log_queue,), daemon=True)
        listener.start()

        # Feed tasks to pool in batches
        for i in range(0, total_frames, batch_size):
            batch = all_frames[i:i + batch_size]
            tasks = [
                pool.apply_async(track_cursor_worker, (frame_file, None, None, log_queue))
                for frame_file in batch
            ]
            for t in tasks:
                results.append(t.get())
                processed += 1
                if processed % 50 == 0 or processed == total_frames:
                    log_message(f"[PROGRESS] {processed}/{total_frames} frames processed")
                    with open(output_json_path, "w") as f:
                        json.dump(results, f, indent=2)

        pool.close()
        pool.join()

        # Final write
        with open(output_json_path, "w") as f:
            json.dump(results, f, indent=2)
        log_message(f"[SUCCESS] Cursor tracking results saved to {output_json_path}")

        upload_to_hf_dataset(output_json_path, dataset_repo_id="Fred808/data", hf_token=HF_TOKEN)

        if email_results:
            log_message("[INFO] Sending email results (if configured)...")
            to_email = os.environ.get("TO_EMAIL")
            from_email = os.environ.get("FROM_EMAIL")
            app_password = os.environ.get("GMAIL_APP_PASSWORD")
            if to_email and from_email and app_password:
                send_email_with_attachment(
                    subject="Cursor Tracking Results",
                    body="See attached JSON results.",
                    to_email=to_email,
                    from_email=from_email,
                    app_password=app_password,
                    attachment_path=output_json_path
                )
    except Exception as e:
        log_message(f"[ERROR] Exception during parallel tracking: {e}")
    finally:
        if pool is not None:
            # Harmless after a normal close()/join(); stops workers otherwise.
            pool.terminate()
            pool.join()
        if listener is not None:
            log_queue.put("STOP")
            listener.join()
        manager.shutdown()

    active = len([r for r in results if r["cursor_active"]])
    log_message(f"[DONE] {active}/{total_frames} frames contained cursors.")
    return active


def send_email_with_attachment(subject, body, to_email, from_email, app_password, attachment_path):
    """Send *attachment_path* by e-mail through Gmail's SMTP-over-SSL endpoint.

    Reading the attachment may raise ``OSError``; SMTP failures are logged
    and not raised.
    """
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = from_email
    msg["To"] = to_email
    msg.set_content(body)

    with open(attachment_path, "rb") as f:
        file_data = f.read()
    file_name = Path(attachment_path).name
    msg.add_attachment(file_data, maintype="application", subtype="octet-stream", filename=file_name)

    try:
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
            smtp.login(from_email, app_password)
            smtp.send_message(msg)
        log_message(f"[SUCCESS] Email sent to {to_email}")
    except Exception as e:
        log_message(f"[ERROR] Failed to send email: {e}")


def track_cursor(frames_dir, cursor_templates_dir, output_json_path, threshold=CURSOR_THRESHOLD, start_frame=1, batch_size=100, email_results=False):
    """
    Backwards-compatible wrapper that calls the parallel implementation.
    Keep this name so other parts of your code that call track_cursor() keep working.
    """
    return track_cursor_parallel(frames_dir, cursor_templates_dir, output_json_path, threshold, start_frame, batch_size, email_results)


def process_rar_file(rar_path: str) -> bool:
    """Extract one archive, then sample frames and track cursors for each video.

    Returns True when the archive was extracted and all videos were handled;
    on failure the error is logged, recorded in ``FAILED_FILES_LOG`` and
    False is returned. ``processing_status`` counters are updated throughout.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename

    # Strips ".partNN.rar" for multi-part archives and ".rar" otherwise
    course_name = get_rar_part_base(filename)
    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)

    try:
        log_message(f"🔄 Processing: {filename}")

        # Clean up any existing directory
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir, ignore_errors=True)

        # Extract RAR
        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")

        # Count extracted files
        file_count = 0
        video_files_found = []
        for root, _dirs, files in os.walk(extract_dir):
            for file in files:
                file_count += 1
                if file.lower().endswith(_VIDEO_EXTENSIONS):
                    video_files_found.append(os.path.join(root, file))
        video_files_found.sort()  # deterministic processing order

        processing_status["extracted_courses"] += 1
        log_message(f"✅ Successfully extracted '{course_name}' ({file_count} files, {len(video_files_found)} videos)")

        # Process video files for frame extraction and cursor tracking
        for video_path in video_files_found:
            video_filename = Path(video_path).name

            # Output names are derived from course and video file name only.
            # NOTE(review): two videos with the same file name in different
            # sub-folders would share these paths -- confirm whether that
            # can happen in the source archives.
            safe_video_name = video_filename.replace(".", "_")
            frames_output_dir = os.path.join(FRAMES_OUTPUT_FOLDER, f"{course_name}_{safe_video_name}_frames")
            ensure_dir(frames_output_dir)

            extracted_frames_count = extract_frames(video_path, frames_output_dir, fps=DEFAULT_FPS)
            processing_status["extracted_frames_count"] += extracted_frames_count

            if extracted_frames_count > 0:
                processing_status["extracted_videos"] += 1
                log_message(f"[INFO] Extracted {extracted_frames_count} frames from {video_filename}")

                # Perform cursor tracking on the extracted frames
                cursor_output_json = os.path.join(CURSOR_TRACKING_OUTPUT_FOLDER, f"{course_name}_{safe_video_name}_cursor_data.json")
                tracked_cursors = track_cursor(frames_output_dir, CURSOR_TEMPLATES_DIR, cursor_output_json, threshold=CURSOR_THRESHOLD, batch_size=100)
                processing_status["tracked_cursors_count"] += tracked_cursors
                log_message(f"[INFO] Tracked {tracked_cursors} cursors in frames from {video_filename}")
            else:
                log_message(f"[WARN] No frames extracted from {video_filename}")

        return True

    except Exception as e:
        error_msg = str(e)
        log_message(f"❌ Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False

    finally:
        processing_status["current_file"] = None


def main_processing_loop(start_index: int = 0):
    """Process the next RAR archive of ``SOURCE_REPO_ID`` (one archive per call).

    *start_index* > 0 selects the archive by its position in the sorted list
    of ``.rar`` files; otherwise the index saved in ``DOWNLOAD_STATE_FILE``
    is used. The saved index is advanced whether or not the archive could be
    processed, and failures are recorded in ``FAILED_FILES_LOG``.
    """
    processing_status["is_running"] = True

    try:
        # Load state (tolerating state files with an unexpected shape)
        process_state = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})
        processed_rars = list(_state_value(process_state, "processed_rars", []))

        # NOTE(review): the default start index of 5 is kept from the
        # original code -- confirm that skipping the first five files when no
        # state exists is intended.
        download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 5})
        saved_index = _state_value(download_state, "next_download_index", 5)

        # Use start_index if provided, otherwise use the saved state
        next_index = start_index if start_index > 0 else saved_index
        next_index = max(0, int(next_index))  # a negative index would count from the end

        log_message(f"📊 Starting from index {next_index}")
        log_message(f"📊 Previously processed: {len(processed_rars)} files")

        # Get file list
        try:
            files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
            rar_files = sorted([f for f in files if f.endswith(".rar")])

            processing_status["total_files"] = len(rar_files)
            log_message(f"📁 Found {len(rar_files)} RAR files in repository")

            if next_index >= len(rar_files):
                log_message("✅ All files have been processed!")
                return

        except Exception as e:
            log_message(f"❌ Failed to get file list: {str(e)}")
            return

        # Process one file per run
        rar_file = rar_files[next_index]
        filename = os.path.basename(rar_file)

        if filename in processed_rars:
            log_message(f"⏭️ Skipping already processed: {filename}")
            processing_status["processed_files"] += 1
            # Move to next file
            next_index += 1
            save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
            log_message(f"📊 Moving to next file. Progress: {next_index}/{len(rar_files)}")
            return

        log_message(f"📥 Downloading: {filename}")
        dest_path = os.path.join(DOWNLOAD_FOLDER, filename)

        # Download file
        # NOTE(review): only this single file is downloaded and it is removed
        # again below, so the other volumes of a multi-part archive are not
        # available to extract_with_retry -- confirm how multi-part archives
        # are meant to be handled.
        download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
        if download_with_retry(download_url, dest_path):
            # Process file
            if process_rar_file(dest_path):
                processed_rars.append(filename)
                save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
                log_message(f"✅ Successfully processed: {filename}")
                processing_status["processed_files"] += 1
            else:
                log_message(f"❌ Failed to process: {filename}")
                processing_status["failed_files"] += 1

            # Clean up downloaded file
            try:
                os.remove(dest_path)
                log_message(f"🗑️ Cleaned up download: {filename}")
            except OSError:
                pass  # best effort; cleanup_temp_files() runs in finally
        else:
            log_message(f"❌ Failed to download: {filename}")
            # The index still advances, so keep a persistent record of the skip.
            log_failed_file(filename, "download failed")
            processing_status["failed_files"] += 1

        # Update download state for next run
        next_index += 1
        save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})

        # Status update
        log_message(f"📊 Progress: {next_index}/{len(rar_files)} files processed")
        log_message(f'📊 Extracted: {processing_status["extracted_courses"]} courses')
        log_message(f'📊 Videos Processed: {processing_status["extracted_videos"]}')
        log_message(f'📊 Frames Extracted: {processing_status["extracted_frames_count"]}')
        log_message(f'📊 Cursors Tracked: {processing_status["tracked_cursors_count"]}')
        log_message(f'📊 Failed: {processing_status["failed_files"]} files')

        if next_index < len(rar_files):
            log_message(f"🔄 Run the script again to process the next file: {os.path.basename(rar_files[next_index])}")
        else:
            log_message("🎉 All files have been processed!")

        log_message("🎉 Processing complete!")
        log_message(f'📊 Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, {processing_status["extracted_frames_count"]} frames extracted, {processing_status["tracked_cursors_count"]} cursors tracked')

    except KeyboardInterrupt:
        log_message("⏹️ Processing interrupted by user")
    except Exception as e:
        log_message(f"❌ Fatal error: {str(e)}")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()


# Expose necessary functions and variables for download_api.py
__all__ = [
    "main_processing_loop",
    "processing_status",
    "CURSOR_TRACKING_OUTPUT_FOLDER",
    "CURSOR_TEMPLATES_DIR",
    "log_message",
    "send_email_with_attachment",
    "track_cursor",
    "extract_frames",
    "DEFAULT_FPS",
    "CURSOR_THRESHOLD",
    "ensure_dir"
]