|
|
import os
import json
import shutil
import smtplib
import subprocess
import threading
import time
import multiprocessing
from email.message import EmailMessage
from pathlib import Path
from typing import Dict

import cv2
import numpy as np
import requests
from huggingface_hub import HfApi
|
|
|
|
|
|
|
|
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
|
|
|
|
|
|
|
|
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"
CURSOR_TRACKING_OUTPUT_FOLDER = "cursor_tracking_results"
CURSOR_TEMPLATES_DIR = "cursors"

os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(CURSOR_TRACKING_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(CURSOR_TEMPLATES_DIR, exist_ok=True)
|
|
|
|
|
|
|
|
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"
|
|
|
|
|
|
|
|
CHUNK_SIZE = 1          # reserved for batching; not referenced in this module
PROCESSING_DELAY = 2    # reserved inter-file delay (seconds); not referenced in this module
MAX_RETRIES = 3         # default number of download attempts
MIN_FREE_SPACE_GB = 2   # minimum free disk space required before downloading

DEFAULT_FPS = 3         # frames sampled per second of video

CURSOR_THRESHOLD = 0.8  # minimum template-match confidence to count a cursor hit
|
|
|
|
|
|
|
|
hf_api = HfApi(token=HF_TOKEN) |
|
|
|
|
|
|
|
|
processing_status = {
    "is_running": False,
    "current_file": None,
    "total_files": 0,
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,
    "extracted_videos": 0,
    "extracted_frames_count": 0,
    "tracked_cursors_count": 0,
    "last_update": None,
    "logs": []
}
|
|
|
|
|
|
|
|
def log_message(message: str):
    """Log messages with timestamp"""
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}"
    print(log_entry)
    processing_status["logs"].append(log_entry)
    processing_status["last_update"] = timestamp
    if len(processing_status["logs"]) > 100:
        processing_status["logs"] = processing_status["logs"][-100:]
|
|
|
|
|
|
|
|
def log_failed_file(filename: str, error: str):
    """Log failed files to persistent file"""
    with open(FAILED_FILES_LOG, "a") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
|
|
|
|
|
|
|
|
def get_disk_usage(path: str) -> Dict[str, float]:
    """Get disk usage statistics in GB (POSIX-only: relies on os.statvfs)."""
    statvfs = os.statvfs(path)
    total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
    free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
    used = total - free
    return {"total": total, "free": free, "used": used}
|
|
|
|
|
|
|
|
def check_disk_space(path: str = ".") -> bool:
    """Check if there's enough disk space"""
    disk_info = get_disk_usage(path)
    if disk_info["free"] < MIN_FREE_SPACE_GB:
        log_message(f'[WARN] Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
        return False
    return True
|
|
|
|
|
|
|
|
def cleanup_temp_files():
    """Clean up temporary files to free space"""
    log_message("[INFO] Cleaning up temporary files...")

    # Remove downloaded archives except the one currently being processed
    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"[INFO] Removed old download: {file}")
            except Exception:
                pass
|
|
|
|
|
|
|
|
def load_json_state(file_path: str, default_value):
    """Load state from JSON file"""
    if os.path.exists(file_path):
        try:
            with open(file_path, "r") as f:
                return json.load(f)
        except json.JSONDecodeError:
            log_message(f"[WARN] Corrupted state file: {file_path}")
    return default_value
|
|
|
|
|
|
|
|
def save_json_state(file_path: str, data):
    """Save state to JSON file"""
    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)
|
|
|
|
|
|
|
|
def download_with_retry(url: str, dest_path: str, max_retries: int = MAX_RETRIES) -> bool:
    """Download file with retry logic and disk space checking"""
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("[ERROR] Insufficient disk space even after cleanup")
            return False

    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
    for attempt in range(max_retries):
        try:
            # The timeout guards against a hung connection stalling the whole run
            with requests.get(url, headers=headers, stream=True, timeout=60) as r:
                r.raise_for_status()

                # Refuse downloads that would exhaust the remaining disk space
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:
                        log_message(f'[ERROR] File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False

                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                return True
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff between attempts
                continue
            log_message(f"[ERROR] Download failed after {max_retries} attempts: {e}")
            return False
    return False
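# A minimal usage sketch (hypothetical archive name), using the same resolve
# URL pattern the main loop builds:
#
#   url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/course01.rar"
#   download_with_retry(url, os.path.join(DOWNLOAD_FOLDER, "course01.rar"))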
|
|
|
|
|
|
|
|
def is_multipart_rar(filename: str) -> bool:
    """Check if this is a multi-part RAR file"""
    return ".part" in filename.lower() and filename.lower().endswith(".rar")


def get_rar_part_base(filename: str) -> str:
    """Get the base name for multi-part RAR files"""
    if ".part" in filename.lower():
        return filename.split(".part")[0]
    return filename.replace(".rar", "")
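# Naming convention these helpers assume (hypothetical file names):
#
#   is_multipart_rar("Course.part03.rar")   -> True
#   get_rar_part_base("Course.part03.rar")  -> "Course"
#   get_rar_part_base("Course.rar")         -> "Course"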
|
|
|
|
|
|
|
|
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Extract RAR with retry and recovery, handling multi-part archives"""
    filename = os.path.basename(rar_path)

    # Multi-part extraction must start from the first volume; WinRAR names it
    # .part1.rar, .part01.rar, or .part001.rar depending on its settings.
    if is_multipart_rar(filename):
        base_name = get_rar_part_base(filename)
        parent_dir = os.path.dirname(rar_path)
        first_part_path = None
        for candidate in (f"{base_name}.part1.rar", f"{base_name}.part01.rar", f"{base_name}.part001.rar"):
            candidate_path = os.path.join(parent_dir, candidate)
            if os.path.exists(candidate_path):
                first_part_path = candidate_path
                break

        if first_part_path is None:
            log_message(f"[WARN] Multi-part RAR detected but first part not found for: {filename}")
            return False

        rar_path = first_part_path
        log_message(f"[INFO] Processing multi-part RAR starting with: {os.path.basename(rar_path)}")

    for attempt in range(max_retries):
        try:
            # Verify archive integrity before extracting
            test_cmd = ["unrar", "t", rar_path]
            test_result = subprocess.run(test_cmd, capture_output=True, text=True)
            if test_result.returncode != 0:
                log_message(f"[WARN] RAR test failed: {test_result.stderr}")
                if attempt == max_retries - 1:
                    return False
                continue

            # unrar only treats the last argument as the destination directory
            # when it ends with a path separator
            dest_dir = output_dir if output_dir.endswith(os.sep) else output_dir + os.sep
            cmd = ["unrar", "x", "-o+", rar_path, dest_dir]
            if attempt > 0:
                cmd.insert(2, "-kb")  # keep broken extracted files on retry

            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                log_message(f"[SUCCESS] Successfully extracted: {os.path.basename(rar_path)}")
                return True
            else:
                error_msg = result.stderr or result.stdout
                log_message(f"[WARN] Extraction attempt {attempt + 1} failed: {error_msg}")

                if "checksum error" in error_msg.lower() or "crc failed" in error_msg.lower():
                    log_message(f"[WARN] Data corruption detected, attempt {attempt + 1}")
                elif result.returncode == 10:
                    log_message("[WARN] No files to extract (exit code 10)")
                    return False
                elif result.returncode == 1:
                    log_message("[WARN] Non-fatal error (exit code 1)")

        except Exception as e:
            log_message(f"[ERROR] Extraction exception: {str(e)}")
            if attempt == max_retries - 1:
                return False
        time.sleep(1)

    return False
|
|
|
|
|
|
|
|
|
|
|
def ensure_dir(path):
    os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
|
|
def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
    """Extract frames from video at the specified frames per second (fps)."""
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    ensure_dir(output_dir)
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        log_message(f"[ERROR] Failed to open video file: {video_path}")
        return 0
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    if not video_fps or video_fps <= 0:
        video_fps = 30
        log_message(f"[WARN] Using fallback FPS: {video_fps}")
    # Save every Nth frame so that roughly `fps` frames are kept per second
    frame_interval = int(round(video_fps / fps))
    if frame_interval <= 0:
        frame_interval = 1
    frame_idx = 0
    saved_idx = 1
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    log_message(f"[DEBUG] Total frames in video: {total_frames}")
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if frame_idx % frame_interval == 0:
            frame_name = f"{saved_idx:04d}.png"
            cv2.imwrite(str(Path(output_dir) / frame_name), frame)
            saved_idx += 1
        frame_idx += 1
    cap.release()
    log_message(f"[INFO] Extracted {saved_idx - 1} frames from {video_path} to {output_dir}")
    return saved_idx - 1
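# A minimal usage sketch (hypothetical file names): sample a clip at 2 fps.
#
#   n = extract_frames("lesson01.mp4", "demo_frames", fps=2)
#   print(f"saved {n} frames")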
|
|
|
|
|
|
|
|
|
|
|
def to_rgb(img):
    """Normalize an image to 3-channel BGR (grayscale and BGRA inputs are converted)."""
    if img is None:
        return None
    if len(img.shape) == 2:
        return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    if img.shape[2] == 4:
        return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
    return img


def get_mask_from_alpha(template_img):
    """Build a binary matchTemplate mask from a BGRA template's alpha channel."""
    if template_img is not None and len(template_img.shape) == 3 and template_img.shape[2] == 4:
        return (template_img[:, :, 3] > 0).astype(np.uint8) * 255
    return None
|
|
|
|
|
|
|
|
def detect_cursor_in_frame_multi(frame, cursor_templates, threshold=CURSOR_THRESHOLD):
    """Detect cursor position in a frame using multiple templates. Returns best match above threshold."""
    best_pos = None
    best_conf = -1
    best_template_name = None
    frame_rgb = to_rgb(frame)
    for template_name, cursor_template in cursor_templates.items():
        template_rgb = to_rgb(cursor_template)
        mask = get_mask_from_alpha(cursor_template)
        if template_rgb is None or frame_rgb is None or template_rgb.shape[2] != frame_rgb.shape[2]:
            continue
        try:
            result = cv2.matchTemplate(frame_rgb, template_rgb, cv2.TM_CCOEFF_NORMED, mask=mask)
        except cv2.error:
            # Some OpenCV builds only support masks with TM_SQDIFF and
            # TM_CCORR_NORMED; fall back so alpha-channel templates still match.
            try:
                result = cv2.matchTemplate(frame_rgb, template_rgb, cv2.TM_CCORR_NORMED, mask=mask)
            except cv2.error:
                continue
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
        # Masked matching can yield non-finite scores on degenerate regions
        if not np.isfinite(max_val):
            continue
        if max_val > best_conf:
            best_conf = max_val
            if max_val >= threshold:
                cursor_w, cursor_h = template_rgb.shape[1], template_rgb.shape[0]
                cursor_x = max_loc[0] + cursor_w // 2
                cursor_y = max_loc[1] + cursor_h // 2
                best_pos = (cursor_x, cursor_y)
                best_template_name = template_name
    if best_conf >= threshold:
        return best_pos, best_conf, best_template_name
    return None, best_conf, None
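# A minimal usage sketch (hypothetical paths): run detection on one frame.
#
#   frame = cv2.imread("extracted_frames/demo/0001.png")
#   templates = {"arrow.png": cv2.imread("cursors/arrow.png", cv2.IMREAD_UNCHANGED)}
#   pos, conf, name = detect_cursor_in_frame_multi(frame, templates)
#   print(pos, conf, name)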
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_WORKER_CURSOR_TEMPLATES = None
_WORKER_THRESHOLD = None


def _init_worker(template_paths, threshold):
    """Initializer for pool workers: load templates into process-local globals.

    Note: track_cursor_parallel currently passes templates with every task
    instead of using this initializer; see the sketch below for how it could
    be wired in.
    """
    global _WORKER_CURSOR_TEMPLATES
    global _WORKER_THRESHOLD
    _WORKER_CURSOR_TEMPLATES = {}
    for tp in template_paths:
        try:
            img = cv2.imread(tp, cv2.IMREAD_UNCHANGED)
            if img is not None:
                _WORKER_CURSOR_TEMPLATES[os.path.basename(tp)] = img
        except Exception:
            pass
    _WORKER_THRESHOLD = threshold
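# A minimal sketch (not active in this module) of wiring the initializer into
# the pool so templates are loaded once per worker process:
#
#   template_paths = [str(p) for p in Path(CURSOR_TEMPLATES_DIR).glob("*.png")]
#   pool = multiprocessing.Pool(
#       multiprocessing.cpu_count(),
#       initializer=_init_worker,
#       initargs=(template_paths, CURSOR_THRESHOLD),
#   )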
|
|
|
|
|
|
|
|
def track_cursor_worker(frame_file, cursor_templates, threshold, log_queue):
    """Worker function that tracks cursor and sends logs back."""
    frame = cv2.imread(str(frame_file), cv2.IMREAD_UNCHANGED)
    if frame is None:
        log_queue.put(f"[WARN] Frame unreadable: {frame_file.name}")
        return {
            "frame": frame_file.name,
            "cursor_active": False,
            "x": None,
            "y": None,
            "confidence": -1,
            "template": None
        }

    pos, conf, template_name = detect_cursor_in_frame_multi(frame, cursor_templates, threshold)

    if pos is not None:
        log_queue.put(
            f"[FRAME] {frame_file.name} -> FOUND cursor at ({pos[0]},{pos[1]}) conf={conf:.3f} template={template_name}"
        )
        return {
            "frame": frame_file.name,
            "cursor_active": True,
            "x": pos[0],
            "y": pos[1],
            "confidence": conf,
            "template": template_name
        }
    else:
        log_queue.put(
            f"[FRAME] {frame_file.name} -> NO cursor (max_conf={conf:.3f})"
        )
        return {
            "frame": frame_file.name,
            "cursor_active": False,
            "x": None,
            "y": None,
            "confidence": conf,
            "template": None
        }
|
|
|
|
|
|
|
|
|
|
|
def upload_to_hf_dataset(local_path, dataset_repo_id="Fred808/data", hf_token=None):
    """Upload JSON tracking results to Hugging Face dataset repo"""
    try:
        api = HfApi(token=hf_token or HF_TOKEN)
        filename = os.path.basename(local_path)
        repo_path = f"results/{filename}"
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=repo_path,
            repo_id=dataset_repo_id,
            repo_type="dataset"
        )
        log_message(f"[UPLOAD] Uploaded {filename} to {dataset_repo_id}/{repo_path}")
    except Exception as e:
        log_message(f"[UPLOAD ERROR] {e}")
|
|
|
|
|
def log_listener(log_queue):
    """Continuously print log messages from worker processes."""
    while True:
        msg = log_queue.get()
        if msg == "STOP":
            break
        log_message(msg)
|
|
|
|
|
def track_cursor_parallel(frames_dir, cursor_templates_dir, output_json_path,
                          threshold=CURSOR_THRESHOLD, start_frame=1,
                          batch_size=100, email_results=False):
    """Parallelized cursor tracking with real-time logging"""
    log_message(f"[INFO] Tracking cursors in {frames_dir} with real-time logging...")

    frames_dir = Path(frames_dir).resolve()
    output_json_path = Path(output_json_path).resolve()
    cursor_templates_dir = Path(cursor_templates_dir).resolve()
    ensure_dir(output_json_path.parent)

    # Load cursor templates with their alpha channels preserved
    cursor_templates = {}
    for template_file in cursor_templates_dir.glob("*.png"):
        img = cv2.imread(str(template_file), cv2.IMREAD_UNCHANGED)
        if img is not None:
            cursor_templates[template_file.name] = img
    if not cursor_templates:
        log_message(f"[ERROR] No cursor templates found in {cursor_templates_dir}")
        return 0

    # Collect frames, skipping any files without numeric names
    all_frames = sorted(frames_dir.glob("*.png"))
    all_frames = [f for f in all_frames if f.stem.isdigit() and int(f.stem) >= start_frame]
    total_frames = len(all_frames)
    if not total_frames:
        log_message("[WARN] No frames found to process.")
        return 0

    log_message(f"[INFO] Total frames to track: {total_frames}")

    # Run the log listener in a thread rather than a process so log_message
    # updates the parent's processing_status; the manager queue is safe to
    # share with the pool workers.
    manager = multiprocessing.Manager()
    log_queue = manager.Queue()
    listener = threading.Thread(target=log_listener, args=(log_queue,))
    listener.start()

    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    results = []
    processed = 0

    try:
        for i in range(0, total_frames, batch_size):
            batch = all_frames[i:i + batch_size]
            tasks = [
                pool.apply_async(
                    track_cursor_worker,
                    (frame_file, cursor_templates, threshold, log_queue)
                )
                for frame_file in batch
            ]

            for t in tasks:
                res = t.get()
                results.append(res)
                processed += 1
                if processed % 50 == 0 or processed == total_frames:
                    log_message(f"[PROGRESS] {processed}/{total_frames} frames processed")
                    # Checkpoint partial results so progress survives a crash
                    with open(output_json_path, "w") as f:
                        json.dump(results, f, indent=2)

        pool.close()
        pool.join()

        with open(output_json_path, "w") as f:
            json.dump(results, f, indent=2)
        log_message(f"[SUCCESS] Cursor tracking results saved to {output_json_path}")

        upload_to_hf_dataset(output_json_path, dataset_repo_id="Fred808/data", hf_token=HF_TOKEN)

        if email_results:
            log_message("[INFO] Sending email results (if configured)...")
            to_email = os.environ.get("TO_EMAIL")
            from_email = os.environ.get("FROM_EMAIL")
            app_password = os.environ.get("GMAIL_APP_PASSWORD")
            if to_email and from_email and app_password:
                send_email_with_attachment(
                    subject="Cursor Tracking Results",
                    body="See attached JSON results.",
                    to_email=to_email,
                    from_email=from_email,
                    app_password=app_password,
                    attachment_path=output_json_path
                )

    except Exception as e:
        log_message(f"[ERROR] Exception during parallel tracking: {e}")
        pool.terminate()
        pool.join()

    finally:
        log_queue.put("STOP")
        listener.join()

    active = len([r for r in results if r["cursor_active"]])
    log_message(f"[DONE] {active}/{total_frames} frames contained cursors.")
    return active
|
|
|
|
|
def send_email_with_attachment(subject, body, to_email, from_email, app_password, attachment_path):
    """Send an email with a single file attachment via Gmail's SSL SMTP endpoint."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = from_email
    msg["To"] = to_email
    msg.set_content(body)
    with open(attachment_path, "rb") as f:
        file_data = f.read()
        file_name = Path(attachment_path).name
    msg.add_attachment(file_data, maintype="application", subtype="octet-stream", filename=file_name)
    try:
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
            smtp.login(from_email, app_password)
            smtp.send_message(msg)
        log_message(f"[SUCCESS] Email sent to {to_email}")
    except Exception as e:
        log_message(f"[ERROR] Failed to send email: {e}")
|
|
|
|
|
|
|
|
def track_cursor(frames_dir, cursor_templates_dir, output_json_path, threshold=CURSOR_THRESHOLD,
                 start_frame=1, batch_size=100, email_results=False):
    """Backwards-compatible wrapper that calls the parallel implementation.

    Keeps the original name so existing callers of track_cursor() continue to work.
    """
    return track_cursor_parallel(frames_dir, cursor_templates_dir, output_json_path,
                                 threshold, start_frame, batch_size, email_results)
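# A minimal end-to-end sketch (hypothetical paths): track cursors in one
# frames directory using the bundled templates.
#
#   track_cursor("extracted_frames/demo", CURSOR_TEMPLATES_DIR,
#                "cursor_tracking_results/demo_cursor_data.json")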
|
|
|
|
|
|
|
|
def process_rar_file(rar_path: str) -> bool:
    """Process a single RAR file - extract, then process videos for frames and cursor tracking"""
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename

    # Multi-part archives share one course name across all volumes
    if is_multipart_rar(filename):
        course_name = get_rar_part_base(filename)
    else:
        course_name = filename.replace(".rar", "")

    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)

    try:
        log_message(f"[INFO] Processing: {filename}")

        # Start from a clean extraction directory
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir, ignore_errors=True)
        os.makedirs(extract_dir, exist_ok=True)

        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")

        # Inventory the extracted files and collect videos
        file_count = 0
        video_files_found = []
        for root, dirs, files in os.walk(extract_dir):
            for file in files:
                file_count += 1
                if file.lower().endswith((".mp4", ".avi", ".mov", ".mkv")):
                    video_files_found.append(os.path.join(root, file))

        processing_status["extracted_courses"] += 1
        log_message(f"[SUCCESS] Successfully extracted '{course_name}' ({file_count} files, {len(video_files_found)} videos)")

        # Extract frames and track cursors for each video
        for video_path in video_files_found:
            video_filename = Path(video_path).name
            safe_video_name = video_filename.replace(".", "_")
            frames_output_dir = os.path.join(FRAMES_OUTPUT_FOLDER, f"{course_name}_{safe_video_name}_frames")
            ensure_dir(frames_output_dir)

            extracted_frames_count = extract_frames(video_path, frames_output_dir, fps=DEFAULT_FPS)
            processing_status["extracted_frames_count"] += extracted_frames_count
            if extracted_frames_count > 0:
                processing_status["extracted_videos"] += 1
                log_message(f"[INFO] Extracted {extracted_frames_count} frames from {video_filename}")

                cursor_output_json = os.path.join(CURSOR_TRACKING_OUTPUT_FOLDER, f"{course_name}_{safe_video_name}_cursor_data.json")
                tracked_cursors = track_cursor(frames_output_dir, CURSOR_TEMPLATES_DIR, cursor_output_json, threshold=CURSOR_THRESHOLD, batch_size=100)
                processing_status["tracked_cursors_count"] += tracked_cursors
                log_message(f"[INFO] Tracked {tracked_cursors} cursors in frames from {video_filename}")
            else:
                log_message(f"[WARN] No frames extracted from {video_filename}")

        return True

    except Exception as e:
        error_msg = str(e)
        log_message(f"[ERROR] Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False

    finally:
        processing_status["current_file"] = None
|
|
|
|
|
|
|
|
def main_processing_loop(start_index: int = 0):
    """Main processing workflow - extraction, frame extraction, and cursor tracking"""
    processing_status["is_running"] = True

    try:
        # Restore persisted state so runs can resume where they left off
        processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
        download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})

        next_index = start_index if start_index > 0 else download_state["next_download_index"]

        log_message(f"[INFO] Starting from index {next_index}")
        log_message(f"[INFO] Previously processed: {len(processed_rars)} files")

        try:
            files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
            rar_files = sorted([f for f in files if f.endswith(".rar")])

            processing_status["total_files"] = len(rar_files)
            log_message(f"[INFO] Found {len(rar_files)} RAR files in repository")

            if next_index >= len(rar_files):
                log_message("[DONE] All files have been processed!")
                return

        except Exception as e:
            log_message(f"[ERROR] Failed to get file list: {str(e)}")
            return

        if next_index < len(rar_files):
            rar_file = rar_files[next_index]
            filename = os.path.basename(rar_file)

            if filename in processed_rars:
                log_message(f"[INFO] Skipping already processed: {filename}")
                processing_status["processed_files"] += 1

                next_index += 1
                save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
                log_message(f"[INFO] Moving to next file. Progress: {next_index}/{len(rar_files)}")
                return

            log_message(f"[INFO] Downloading: {filename}")
            dest_path = os.path.join(DOWNLOAD_FOLDER, filename)

            download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
            if download_with_retry(download_url, dest_path):
                if process_rar_file(dest_path):
                    processed_rars.append(filename)
                    save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
                    log_message(f"[SUCCESS] Successfully processed: {filename}")
                    processing_status["processed_files"] += 1
                else:
                    log_message(f"[ERROR] Failed to process: {filename}")
                    processing_status["failed_files"] += 1

                # Remove the archive either way to reclaim disk space
                try:
                    os.remove(dest_path)
                    log_message(f"[INFO] Cleaned up download: {filename}")
                except Exception:
                    pass
            else:
                log_message(f"[ERROR] Failed to download: {filename}")
                processing_status["failed_files"] += 1

            # Advance the index even on failure so one bad file cannot stall the pipeline
            next_index += 1
            save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})

            log_message(f"[PROGRESS] {next_index}/{len(rar_files)} files processed")
            log_message(f'[INFO] Extracted: {processing_status["extracted_courses"]} courses')
            log_message(f'[INFO] Videos processed: {processing_status["extracted_videos"]}')
            log_message(f'[INFO] Frames extracted: {processing_status["extracted_frames_count"]}')
            log_message(f'[INFO] Cursors tracked: {processing_status["tracked_cursors_count"]}')
            log_message(f'[INFO] Failed: {processing_status["failed_files"]} files')

            if next_index < len(rar_files):
                log_message(f"[INFO] Run the script again to process the next file: {os.path.basename(rar_files[next_index])}")
            else:
                log_message("[DONE] All files have been processed!")
        else:
            log_message("[DONE] All files have been processed!")

        log_message("[DONE] Processing complete!")
        log_message(f'[INFO] Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, {processing_status["extracted_frames_count"]} frames extracted, {processing_status["tracked_cursors_count"]} cursors tracked')

    except KeyboardInterrupt:
        log_message("[WARN] Processing interrupted by user")
    except Exception as e:
        log_message(f"[ERROR] Fatal error: {str(e)}")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()
|
|
|
|
|
|
|
|
|
|
|
__all__ = [
    "main_processing_loop",
    "processing_status",
    "CURSOR_TRACKING_OUTPUT_FOLDER",
    "CURSOR_TEMPLATES_DIR",
    "log_message",
    "send_email_with_attachment",
    "track_cursor",
    "extract_frames",
    "DEFAULT_FPS",
    "CURSOR_THRESHOLD",
    "ensure_dir"
]
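if __name__ == "__main__":
    # Minimal entry-point sketch: process the next pending archive; pass a
    # non-zero start_index to override the saved download state.
    main_processing_loop()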
|
|
|