eliason1 committed on
Commit
2a3ba2f
Β·
verified Β·
1 Parent(s): 3a86067

Upload 4 files

Browse files
Files changed (4) hide show
  1. Dockerfile +45 -0
  2. download_api.py +267 -0
  3. requirements.txt +11 -0
  4. vision_analyzer.py +692 -0
Dockerfile ADDED
@@ -0,0 +1,45 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
FROM python:3.11-slim-bullseye

# Enable contrib/non-free (required for unrar) and install the runtime
# libraries OpenCV needs (libgl1, libglib2.0-0).
RUN sed -i 's/main/main contrib non-free/' /etc/apt/sources.list && \
    apt-get update && \
    apt-get install -y --no-install-recommends \
        unrar \
        libgl1 \
        libglib2.0-0 \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Upgrade pip and install core build tooling first for better layer caching.
RUN pip install --no-cache-dir --upgrade pip setuptools wheel packaging

# Copy requirements and install; the CPU-only PyTorch wheel index is added
# via --find-links so any torch dependency resolves to a CPU build.
COPY requirements.txt .
RUN pip install --no-cache-dir \
        -r requirements.txt \
        --find-links https://download.pytorch.org/whl/cpu \
        --extra-index-url https://pypi.org/simple && \
    # ML packages pinned here rather than in requirements.txt
    pip install --no-cache-dir \
        accelerate \
        transformers==4.36.2 \
        timm==0.9.12 \
        einops==0.7.0

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1000 user && \
    chown -R user:user /app

USER user

# Suppress progress bars and TensorFlow log noise.
# FIX: huggingface_hub reads HF_HUB_DISABLE_PROGRESS_BARS; the original
# HF_HUB_DISABLE_PROGRESS is not a recognized variable.
ENV HF_HUB_DISABLE_PROGRESS_BARS=1
ENV TF_CPP_MIN_LOG_LEVEL=3

CMD ["uvicorn", "download_api:app", "--host", "0.0.0.0", "--port", "7860"]
download_api.py ADDED
@@ -0,0 +1,267 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import time
4
+ import threading
5
+ import asyncio
6
+ from fastapi import FastAPI, HTTPException, BackgroundTasks
7
+ from fastapi.middleware.cors import CORSMiddleware
8
+ from fastapi.responses import JSONResponse, FileResponse
9
+ from fastapi.staticfiles import StaticFiles
10
+ import uvicorn
11
+ from typing import Dict
12
+ from pathlib import Path
13
+ import subprocess
14
+ from datetime import datetime
15
+
16
+ import torch
17
+
18
+ # Import core functionality
19
+ from vision_analyzer import (
20
+ main_processing_loop,
21
+ processing_status,
22
+ log_message,
23
+ )
24
+
25
+ # FastAPI App Definition
26
+ app = FastAPI(title="Video Analysis API",
27
+ description="API to access video frame analysis results and extracted images",
28
+ version="1.0.0")
29
+
30
+ # Add CORS middleware to allow cross-origin requests
31
+ app.add_middleware(
32
+ CORSMiddleware,
33
+ allow_origins=["*"], # Allows all origins
34
+ allow_credentials=True,
35
+ allow_methods=["*"], # Allows all methods
36
+ allow_headers=["*"],
37
+ )
38
+
39
+ # Global variables for processing and frame tracking
40
+ processing_thread = None
41
+ frame_locks = {} # Dict to track frame locks: {course: {frame: {"locked_by": id, "locked_at": timestamp}}}
42
+ processed_frames = {} # Dict to track processed frames: {course: {frame: {"processed_by": id, "processed_at": timestamp}}}
43
+ LOCK_TIMEOUT = 300 # 5 minutes timeout for locks
44
+ TRACKING_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "frame_tracking.json")
45
+
46
def save_tracking_state():
    """Persist the in-memory lock/processed maps to TRACKING_FILE as JSON."""
    snapshot = {
        "frame_locks": frame_locks,
        "processed_frames": processed_frames,
    }
    try:
        with open(TRACKING_FILE, "w") as fh:
            json.dump(snapshot, fh, indent=2)
    except Exception as exc:
        # Persistence is best-effort; never let a disk error kill the API.
        log_message(f"Error saving tracking state: {exc}")
57
+
58
def load_tracking_state():
    """Restore the lock/processed maps from TRACKING_FILE, if it exists."""
    global frame_locks, processed_frames
    try:
        with open(TRACKING_FILE, "r") as fh:
            state = json.load(fh)
    except FileNotFoundError:
        # First run — nothing to restore.
        log_message("No previous tracking state found")
        return
    except Exception as exc:
        log_message(f"Error loading tracking state: {exc}")
        return
    frame_locks = state.get("frame_locks", {})
    processed_frames = state.get("processed_frames", {})
70
+
71
def check_frame_lock(course: str, frame: str) -> bool:
    """Return True when *frame* holds an unexpired lock.

    Expired locks (older than LOCK_TIMEOUT seconds) are pruned from the
    map and the updated state is persisted.
    """
    lock = frame_locks.get(course, {}).get(frame)
    if lock is None:
        return False
    if time.time() - lock["locked_at"] < LOCK_TIMEOUT:
        return True
    # Lock has expired — drop it and persist the cleanup.
    del frame_locks[course][frame]
    save_tracking_state()
    return False
81
+
82
def lock_frame(course: str, frame: str, requester_id: str) -> bool:
    """Try to lock *frame* for *requester_id*; False when already locked."""
    if check_frame_lock(course, frame):
        return False

    # setdefault creates the per-course map on first use.
    frame_locks.setdefault(course, {})[frame] = {
        "locked_by": requester_id,
        "locked_at": time.time(),
    }
    save_tracking_state()
    return True
96
+
97
def mark_frame_processed(course: str, frame: str, requester_id: str):
    """Record *frame* as processed and release any lock held on it."""
    processed_frames.setdefault(course, {})[frame] = {
        "processed_by": requester_id,
        "processed_at": time.time(),
    }

    # The lock (if any) is now satisfied — drop it.
    if frame in frame_locks.get(course, {}):
        del frame_locks[course][frame]

    save_tracking_state()
112
+
113
def log_message(message):
    """Append a timestamped entry to the shared log buffer and echo to stdout.

    NOTE(review): this definition shadows the log_message imported from
    vision_analyzer at the top of this module — confirm that is intentional.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    log_entry = f"[{stamp}] {message}"
    logs = processing_status["logs"]
    logs.append(log_entry)

    # Keep only the 100 most recent entries.
    if len(logs) > 100:
        processing_status["logs"] = logs[-100:]

    print(log_entry)
124
+
125
@app.on_event("startup")
async def startup_event():
    """On boot: restore frame tracking state and launch the pipeline thread."""
    load_tracking_state()
    log_message("βœ“ Loaded frame tracking state")

    global processing_thread
    already_running = processing_thread is not None and processing_thread.is_alive()
    if not already_running:
        log_message("πŸš€ Starting RAR extraction, frame extraction, and vision analysis pipeline in background...")
        # Daemon thread: it must not block process exit.
        processing_thread = threading.Thread(target=main_processing_loop, daemon=True)
        processing_thread.start()
139
+
140
+ @app.get("/")
141
+ async def root():
142
+ """Root endpoint that returns basic info"""
143
+ return {
144
+ "message": "Video Analysis API",
145
+ "status": "running",
146
+ "endpoints": {
147
+ "/status": "Get processing status",
148
+ "/courses": "List all available course folders",
149
+ "/images/{course_folder}": "List images in a course folder",
150
+ "/images/{course_folder}/{frame_filename}": "Get specific frame image",
151
+ "/start-processing": "Start processing pipeline",
152
+ "/stop-processing": "Stop processing pipeline"
153
+ }
154
+ }
155
+
156
+ @app.get("/status")
157
+ async def get_status():
158
+ """Get current processing status"""
159
+ return {
160
+ "processing_status": processing_status
161
+ }
162
+
163
+ # ===== NEW IMAGE SERVING ENDPOINTS =====
164
+
165
+ @app.post("/middleware/release/frame/{course_folder}/{video}/{frame}")
166
+ async def release_frame(course_folder: str, video: str, frame: str, requester_id: str):
167
+ """Release a frame lock"""
168
+ if course_folder in frame_locks and frame in frame_locks[course_folder]:
169
+ lock = frame_locks[course_folder][frame]
170
+ if lock["locked_by"] == requester_id:
171
+ del frame_locks[course_folder][frame]
172
+ save_tracking_state()
173
+ return {"status": "released"}
174
+ return {"status": "not_found"}
175
+
176
+ @app.post("/middleware/release/course/{course_folder}")
177
+ async def release_course(course_folder: str, requester_id: str):
178
+ """Release all frame locks for a course"""
179
+ if course_folder in frame_locks:
180
+ # Only release frames locked by this requester
181
+ frames_to_release = [
182
+ frame for frame, lock in frame_locks[course_folder].items()
183
+ if lock["locked_by"] == requester_id
184
+ ]
185
+ for frame in frames_to_release:
186
+ del frame_locks[course_folder][frame]
187
+ save_tracking_state()
188
+ return {"status": "released"}
189
+
190
+ """
191
+ List all available course folders with their image counts
192
+ """
193
+ if not os.path.exists(FRAMES_OUTPUT_FOLDER):
194
+ return {"courses": [], "message": "Frames output folder does not exist yet"}
195
+
196
+ courses = []
197
+ for folder in os.listdir(FRAMES_OUTPUT_FOLDER):
198
+ folder_path = os.path.join(FRAMES_OUTPUT_FOLDER, folder)
199
+ if os.path.isdir(folder_path):
200
+ # Count image files
201
+ image_count = len([f for f in os.listdir(folder_path)
202
+ if f.lower().endswith(('.png', '.jpg', '.jpeg'))])
203
+ courses.append({
204
+ "course_folder": folder,
205
+ "image_count": image_count,
206
+ "images_url": f"/images/{folder}",
207
+ "sample_image_url": f"/images/{folder}/0001.png" if image_count > 0 else None
208
+ })
209
+
210
+ return {
211
+ "total_courses": len(courses),
212
+ "courses": courses
213
+ }
214
+
215
+
216
# Signal handlers to prevent accidental shutdown
def handle_shutdown(signum, frame):
    """Swallow SIGTERM/SIGINT so the server keeps running."""
    print(f"\n⚠️ Received signal {signum}. Server will continue running.")
    print("Use Ctrl+Break or kill -9 to force stop.")

# Register the handler for both interrupt and terminate signals.
import signal
signal.signal(signal.SIGINT, handle_shutdown)
signal.signal(signal.SIGTERM, handle_shutdown)
226
+
227
# Server lifecycle events
@app.on_event("shutdown")
async def shutdown_event():
    """Persist tracking state, then stall forever to resist shutdown."""
    save_tracking_state()
    print("πŸ’Ύ Saved tracking state")
    print("⚠️ Server shutdown prevented - use Ctrl+Break or kill -9 to force stop")
    # Deliberately never return so the shutdown sequence cannot complete.
    while True:
        await asyncio.sleep(1)
237
+
238
if __name__ == "__main__":
    # Start the FastAPI server
    print("πŸš€ Starting Video Analysis FastAPI Server (Persistent Mode)...")
    print("API Documentation will be available at: http://localhost:8000/docs")
    print("API Root endpoint: http://localhost:8000/")
    print("⚠️ Server will continue running even after processing completes")
    print("Use Ctrl+Break or kill -9 to force stop")

    # Run the pipeline in a NON-daemon thread so it keeps the process alive.
    processing_thread = threading.Thread(target=main_processing_loop, daemon=False)
    processing_thread.start()

    # Persistent uvicorn configuration.
    server_config = uvicorn.Config(
        app=app,
        host="0.0.0.0",
        port=8000,
        log_level="info",
        reload=False,
        workers=1,
        loop="asyncio",
        timeout_keep_alive=600,  # keep connections alive longer
        access_log=True,
    )
    uvicorn.Server(server_config).run()
requirements.txt ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ accelerate
2
+ fastapi
3
+ uvicorn
4
+ opencv-python-headless
5
+ numpy
6
+ # NOTE: 'pathlib' removed from the install list — it is part of the Python 3
+ # standard library; the PyPI 'pathlib' package is an obsolete Python-2 backport
+ # that can shadow the stdlib module.
7
+ huggingface_hub
8
+ pillow
9
+ rarfile
10
+ python-multipart
11
+ moviepy
vision_analyzer.py ADDED
@@ -0,0 +1,692 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import requests
4
+ import subprocess
5
+ import shutil
6
+ import time
7
+ import re
8
+ import threading
9
+ from typing import Dict, List, Set, Optional, Any
10
+ from huggingface_hub import HfApi, list_repo_files, CommitOperationAdd, hf_hub_download, hf_hub_url
11
+
12
+ import cv2
13
+ import numpy as np
14
+ from pathlib import Path
15
+ import smtplib
16
+ from email.message import EmailMessage
17
+ from moviepy import VideoFileClip
18
+
19
+ # ==== CONFIGURATION ====
20
+ HF_TOKEN = os.getenv("HF_TOKEN", "")
21
+ SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
22
+ TARGET_REPO_ID = os.getenv("TARGET_REPO", "Samfredoly/BG_Vid") # New target repo for uploads
23
+
24
+ # Path Configuration
25
+ DOWNLOAD_FOLDER = "downloads"
26
+ EXTRACT_FOLDER = "extracted"
27
+ LOCAL_STATE_FOLDER = ".state" # Folder to temporarily store the downloaded state file
28
+ FRAMES_OUTPUT_FOLDER = "frames" # Folder to store extracted frames
29
+
30
+ os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
31
+ os.makedirs(EXTRACT_FOLDER, exist_ok=True)
32
+ os.makedirs(LOCAL_STATE_FOLDER, exist_ok=True)
33
+
34
+ # State Files
35
+ FAILED_FILES_LOG = "failed_files.log"
36
+ HF_STATE_FILE = "processing_state2.json" # New remote state file name
37
+
38
+ # Processing Parameters
39
+ CHUNK_SIZE = 2
40
+ PROCESSING_DELAY = 2
41
+ MAX_RETRIES = 3
42
+ MIN_FREE_SPACE_GB = 2 # Minimum free space in GB before processing
43
+
44
+
45
+ # Initialize HF API
46
+ hf_api = HfApi(token=HF_TOKEN)
47
+
48
+ # Global State
49
+ processing_status = {
50
+ "is_running": False,
51
+ "current_file": None,
52
+ "total_files": 0,
53
+ "processed_files": 0,
54
+ "failed_files": 0,
55
+ "extracted_courses": 0,
56
+ "extracted_videos": 0,
57
+ "last_update": None,
58
+ "logs": []
59
+ }
60
+
61
def log_message(message: str, level: str = "INFO"):
    """Print and buffer a timestamped log line; trims the buffer to 100."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {level}: {message}"
    print(entry)

    logs = processing_status["logs"]
    logs.append(entry)
    processing_status["last_update"] = stamp
    if len(logs) > 100:
        # Keep only the most recent 100 entries.
        processing_status["logs"] = logs[-100:]
70
+
71
def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record for *filename* to FAILED_FILES_LOG.

    FIX: the original wrote the literal placeholder "(unknown)" instead of
    the *filename* argument (leaving the parameter unused), which made the
    failure log useless for diagnosing which file failed.
    """
    with open(FAILED_FILES_LOG, "a") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
75
+
76
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return disk statistics for *path* as {"total", "free", "used"} in GB."""
    stats = os.statvfs(path)
    gib = 1024 ** 3
    total = stats.f_frsize * stats.f_blocks / gib
    # f_bavail = blocks available to unprivileged processes.
    free = stats.f_frsize * stats.f_bavail / gib
    return {"total": total, "free": free, "used": total - free}
83
+
84
def check_disk_space(path: str = ".") -> bool:
    """True when free space on *path* is at least MIN_FREE_SPACE_GB."""
    disk_info = get_disk_usage(path)
    if disk_info["free"] >= MIN_FREE_SPACE_GB:
        return True
    log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
    return False
91
+
92
def cleanup_temp_files():
    """Best-effort removal of stale archives to reclaim disk space.

    Deletes every .rar/.zip in DOWNLOAD_FOLDER except the file currently
    being processed (per processing_status["current_file"]).
    """
    log_message("🧹 Cleaning up temporary files...", "INFO")

    # Clean old downloads (keep only the current processing file).
    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"πŸ—‘οΈ Removed old download: {file}", "INFO")
            except OSError:
                # FIX: was a bare `except: pass`, which also swallowed
                # KeyboardInterrupt/SystemExit. Deletion stays best-effort.
                pass
105
+
106
def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load state from a JSON file, migrating older layouts to the new one.

    Returns *default_value* when the file is absent or corrupted. Guarantees
    the result has a 'file_states' dict and a 'next_download_index' int.
    """
    if not os.path.exists(file_path):
        return default_value
    try:
        with open(file_path, "r") as fh:
            data = json.load(fh)
    except json.JSONDecodeError:
        log_message(f"⚠️ Corrupted state file: {file_path}", "WARNING")
        return default_value

    # Migration: old "processed_rars" list becomes the "file_states" dict.
    if isinstance(data.get("processed_rars"), list):
        log_message("ℹ️ Migrating old 'processed_rars' list to new 'file_states' dictionary.", "INFO")
        data["file_states"] = {name: "processed" for name in data.pop("processed_rars")}

    # Guarantee required keys with the right types.
    if not isinstance(data.get("file_states"), dict):
        log_message("ℹ️ Initializing 'file_states' dictionary.", "INFO")
        data["file_states"] = {}
    if "next_download_index" not in data:
        data["next_download_index"] = 0

    return data
+ return default_value
134
+
135
def save_json_state(file_path: str, data: Dict[str, Any]):
    """Write *data* to *file_path* as pretty-printed JSON."""
    with open(file_path, "w") as fh:
        json.dump(data, fh, indent=2)
139
+
140
def download_hf_state(repo_id: str, filename: str) -> Dict[str, Any]:
    """Download the processing-state JSON from the Hugging Face dataset repo.

    Returns the (possibly migrated) state dict, or a fresh default state
    when the file is absent or any step fails.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)
    # New-structure default: per-file states instead of a processed list.
    default_state = {"next_download_index": 0, "file_states": {}}

    try:
        # Check if the file exists in the repo first
        files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        if filename not in files:
            # FIX: the original logged the literal "(unknown)" instead of the
            # actual state filename.
            log_message(f"ℹ️ State file {filename} not found in {repo_id}. Starting from default state.", "INFO")
            return default_state

        # Download the file
        hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False
        )

        log_message(f"βœ… Successfully downloaded state file from {repo_id}.", "INFO")
        # load_json_state handles migration of older state layouts.
        return load_json_state(local_path, default_state)

    except Exception as e:
        log_message(f"⚠️ Failed to download state file from Hugging Face: {str(e)}. Starting from default state.", "WARNING")
        return default_state
169
+
170
def upload_hf_state(repo_id: str, filename: str, state: Dict[str, Any]) -> bool:
    """Persist *state* locally, then push it to the HF dataset repo.

    Returns True on success, False (after logging) on any failure.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)

    try:
        # Write locally first so the exact uploaded bytes are on disk.
        save_json_state(local_path, state)

        hf_api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Update processing state: next_index={state['next_download_index']}"
        )
        log_message(f"βœ… Successfully uploaded updated state file to {repo_id}", "INFO")
        return True
    except Exception as e:
        log_message(f"❌ Failed to upload state file to Hugging Face: {str(e)}", "ERROR")
        return False
191
+
192
+ # --- NEW LOCKING FUNCTIONS ---
193
+
194
def lock_file_for_processing(rar_filename: str, state: Dict[str, Any]) -> bool:
    """Mark *rar_filename* as 'processing' and push the state as a lock."""
    log_message(f"πŸ”’ Attempting to lock file: {rar_filename} (Marking as 'processing')", "INFO")

    state["file_states"][rar_filename] = "processing"

    # Uploading the state file immediately is what makes the lock visible
    # to other workers.
    if not upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"❌ Failed to upload lock for file: {rar_filename}. Aborting processing.", "ERROR")
        # Roll back the local mark so local and remote state stay in sync.
        state["file_states"].pop(rar_filename, None)
        return False

    log_message(f"βœ… Successfully locked file: {rar_filename}", "INFO")
    return True
211
+
212
def unlock_file_as_processed(rar_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Mark *rar_filename* as 'processed', advance the index, push the state."""
    log_message(f"πŸ”“ Attempting to unlock file: {rar_filename} (Marking as 'processed')", "INFO")

    state["file_states"][rar_filename] = "processed"
    state["next_download_index"] = next_index

    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"βœ… Successfully unlocked and marked as processed: {rar_filename}", "INFO")
        return True

    log_message(f"❌ Failed to upload final state for file: {rar_filename}. The file is processed locally but state is not updated.", "ERROR")
    return False
227
+
228
+ # --- Original Utility Functions ---
229
+
230
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Stream *url* to *dest_path* with retry and disk-space guards.

    Order of operations: (1) verify global free space (cleaning temp files
    once if low); (2) create the destination directory; (3) up to
    *max_retries* attempts: stream-download with a per-file size check
    against the Content-Length header when the server provides one.
    Returns True on success, False on any unrecoverable failure.
    """
    # Global free-space check; one cleanup pass before giving up.
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("❌ Insufficient disk space even after cleanup", "ERROR")
            return False

    # NEW FIX: Ensure the directory structure exists before attempting to write the file
    try:
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
    except Exception as e:
        log_message(f"❌ Failed to create directory for download path {os.path.dirname(dest_path)}: {str(e)}", "ERROR")
        return False

    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    for attempt in range(max_retries):
        try:
            with requests.get(url, headers=headers, stream=True) as r:
                r.raise_for_status()

                # Check content length if available
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    # Check if there is enough space for the full download
                    if disk_info["free"] < size_gb + MIN_FREE_SPACE_GB:
                        log_message(f"⚠️ Not enough space for download ({size_gb:.2f}GB required). Freeing space...", "WARNING")
                        cleanup_temp_files()
                        disk_info = get_disk_usage(".")
                        if disk_info["free"] < size_gb + MIN_FREE_SPACE_GB:
                            log_message(f"❌ Still not enough space for download. Required: {size_gb + MIN_FREE_SPACE_GB:.2f}GB, Available: {disk_info['free']:.2f}GB", "ERROR")
                            return False

                # Download the file chunk by chunk
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:  # filter out keep-alive new chunks
                            f.write(chunk)

                log_message(f"βœ… Download successful: {dest_path}", "INFO")
                return True

        except requests.exceptions.RequestException as e:
            # Network-level failure: log and retry after a short delay.
            log_message(f"❌ Download attempt {attempt + 1} failed for {url}: {str(e)}", "WARNING")
            time.sleep(PROCESSING_DELAY)
        except Exception as e:
            # Non-network failure (e.g. disk error): do not retry.
            log_message(f"❌ An unexpected error occurred during download: {str(e)}", "ERROR")
            return False

    log_message(f"❌ Failed to download {url} after {max_retries} attempts.", "ERROR")
    return False
283
+
284
def extract_rar(rar_path: str, extract_path: str) -> bool:
    """Extracts a RAR file using unrar (requires unrar to be installed).

    Strategy: attempt a full extraction first; if it fails with a message
    indicating a missing previous multipart volume, fall back to listing the
    archive and extracting each member individually, skipping members that
    still need the previous volume. Returns True when the full extraction
    succeeds OR at least one member was extracted in the fallback.
    """
    log_message(f"πŸ“¦ Attempting to extract RAR: {rar_path} to {extract_path}", "INFO")

    # Helper to run a command and return (success, completed_process_or_exception)
    def _run(cmd):
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return True, proc
        except subprocess.CalledProcessError as e:
            return False, e

    try:
        # Create the extraction directory if it doesn't exist
        os.makedirs(extract_path, exist_ok=True)

        # First try a normal full extraction
        command = ["unrar", "x", "-o+", "-y", rar_path, extract_path]
        ok, result = _run(command)
        if ok:
            # Successful full extraction; unrar prints "All OK" on clean runs.
            if hasattr(result, 'stdout') and "All OK" not in result.stdout:
                log_message(f"⚠️ RAR extraction finished with warnings/non-fatal errors for {rar_path}: {result.stdout}", "WARNING")
            log_message(f"βœ… Successfully extracted RAR: {rar_path}", "INFO")
            return True

        # If full extraction failed, inspect the error to see if it's a multipart/volume dependency
        stderr = ''
        if isinstance(result, subprocess.CalledProcessError):
            stderr = (result.stderr or '')

        # Common message when a previous volume is required
        if "start extraction from a previous volume" in stderr.lower() or "previous volume" in stderr.lower() or "you need to start extraction" in stderr.lower():
            log_message(f"⚠️ Full extraction failed due to multipart volume dependency for {rar_path}. Will attempt per-file extraction fallback.", "WARNING")

            # Attempt to list files contained in this archive volume
            # ("lb" = list bare: one member name per line).
            list_cmd = ["unrar", "lb", rar_path]
            ok_list, list_result = _run(list_cmd)
            if not ok_list:
                log_message(f"❌ Failed to list archive contents for {rar_path}: {(list_result.stderr if isinstance(list_result, subprocess.CalledProcessError) else str(list_result))}", "ERROR")
                return False

            file_list = [ln.strip() for ln in (list_result.stdout or '').splitlines() if ln.strip()]
            if not file_list:
                log_message(f"⚠️ Archive {rar_path} appears empty or listing failed. Cannot extract.", "WARNING")
                return False

            extracted_any = False
            # Try to extract each file individually; skip files that require previous volumes
            for member in file_list:
                # Use 'unrar x <archive> <member> <dest>' to extract a specific file
                cmd = ["unrar", "x", "-o+", "-y", rar_path, member, extract_path]
                ok_member, member_result = _run(cmd)
                if ok_member:
                    extracted_any = True
                    log_message(f"βœ… Extracted member {member} from {rar_path}", "INFO")
                else:
                    # If this member failed due to missing previous volume, log and continue
                    member_err = ''
                    if isinstance(member_result, subprocess.CalledProcessError):
                        member_err = (member_result.stderr or '')
                    log_message(f"⚠️ Could not extract member {member} from {rar_path}: {member_err.strip()}", "WARNING")

            if extracted_any:
                log_message(f"βœ… Finished partial extraction from {rar_path} (some members extracted)", "INFO")
                return True
            else:
                log_message(f"❌ No members could be extracted from {rar_path} independently.", "ERROR")
                return False

        # Otherwise, full extraction failed for another reason
        log_message(f"❌ RAR extraction failed for {rar_path}. Error: {stderr}", "ERROR")
        return False

    except FileNotFoundError:
        # The unrar binary itself is missing from PATH.
        log_message("❌ 'unrar' command not found. Please ensure 'unrar' is installed.", "ERROR")
        return False
    except Exception as e:
        log_message(f"❌ An unexpected error occurred during RAR extraction: {str(e)}", "ERROR")
        return False
364
+
365
def extract_audio_from_video(video_path: str, output_wav_path: str) -> bool:
    """Extract the audio track of *video_path* to WAV using moviepy.

    Returns True only when a non-empty WAV file was written; videos with no
    audio track, errors, and empty outputs all return False (after logging).
    """
    log_message(f"πŸ”Š Extracting audio from {video_path} to {output_wav_path}", "INFO")

    try:
        # Make sure the destination directory exists.
        os.makedirs(os.path.dirname(output_wav_path), exist_ok=True)

        clip = VideoFileClip(video_path)

        if clip.audio is None:
            log_message(f"⚠️ No audio track found in video: {video_path}", "WARNING")
            clip.close()
            return False

        # Write the audio track; logger=None silences moviepy's progress bar.
        clip.audio.write_audiofile(output_wav_path, logger=None)
        clip.close()

        wrote_file = os.path.exists(output_wav_path) and os.path.getsize(output_wav_path) > 0
        if wrote_file:
            log_message(f"βœ… Successfully extracted audio to WAV: {output_wav_path}", "INFO")
            return True
        log_message(f"❌ Audio extraction produced empty or missing file: {output_wav_path}", "ERROR")
        return False

    except Exception as e:
        log_message(f"❌ An error occurred during audio extraction from {video_path}: {str(e)}", "ERROR")
        return False
396
+
397
def upload_folder_to_hf(folder_path: str, repo_id: str, path_in_repo: str, commit_message: str) -> bool:
    """Upload every file under *folder_path* to the HF repo in one commit.

    Local layout is mirrored under *path_in_repo*. An empty folder counts
    as a successful (no-op) upload.
    """
    log_message(f"⬆️ Uploading folder {folder_path} to {repo_id}/{path_in_repo}", "INFO")
    try:
        # One CommitOperationAdd per file, preserving relative paths.
        operations = [
            CommitOperationAdd(
                path_in_repo=os.path.join(
                    path_in_repo,
                    os.path.relpath(os.path.join(root, name), folder_path),
                ),
                path_or_fileobj=os.path.join(root, name),
            )
            for root, _, files in os.walk(folder_path)
            for name in files
        ]

        if not operations:
            log_message(f"⚠️ Folder {folder_path} is empty. Skipping upload.", "WARNING")
            return True

        # Single atomic commit for the whole folder.
        hf_api.create_commit(
            repo_id=repo_id,
            operations=operations,
            commit_message=commit_message,
            repo_type="dataset"
        )

        log_message(f"βœ… Successfully uploaded {len(operations)} files from {folder_path}", "INFO")
        return True

    except Exception as e:
        log_message(f"❌ Failed to upload folder {folder_path} to Hugging Face: {str(e)}", "ERROR")
        return False
432
+
433
def process_rar_file(rar_path: str) -> bool:
    """
    Main processing logic for a single RAR file:
    1. Extract RAR
    2. Find video files (MP4s and other common containers)
    3. Extract audio from each video and convert to WAV
    4. Upload each WAV file to HF one by one
    5. Clean up local files

    Returns True when at least one audio track was extracted and uploaded;
    False if extraction failed, no videos were found, or every upload failed.
    """
    rar_filename = os.path.basename(rar_path)
    base_name = os.path.splitext(rar_filename)[0]

    # 1. Extract RAR into a per-archive subfolder of EXTRACT_FOLDER
    extract_dir = os.path.join(EXTRACT_FOLDER, base_name)
    if not extract_rar(rar_path, extract_dir):
        log_failed_file(rar_filename, "RAR extraction failed")
        return False

    video_files = []
    # Search for common video extensions recursively.
    # NOTE(review): rglob patterns are case-sensitive on most filesystems,
    # so e.g. "*.MP4" would be missed — confirm whether that matters here.
    for ext in ['*.mp4', '*.mkv', '*.avi', '*.mov', '*.webm']:
        video_files.extend(Path(extract_dir).rglob(ext))

    if not video_files:
        log_message(f"⚠️ No video files found in extracted content for {rar_filename}", "WARNING")
        # Clean up the extracted folder before bailing out
        shutil.rmtree(extract_dir, ignore_errors=True)
        log_message(f"πŸ—‘οΈ Cleaned up extracted folder: {extract_dir}", "INFO")
        log_failed_file(rar_filename, "No video files found")
        return False

    success_count = 0

    for video_path_obj in video_files:
        video_path = str(video_path_obj)
        video_filename = video_path_obj.name
        video_base_name = os.path.splitext(video_filename)[0]

        # 3. Extract audio from video and convert to WAV.
        # NOTE(review): WAV files share EXTRACT_FOLDER's root, so two videos
        # with the same base name would collide — confirm names are unique.
        wav_filename = f"{video_base_name}.wav"
        wav_output_path = os.path.join(EXTRACT_FOLDER, wav_filename)

        if not extract_audio_from_video(video_path, wav_output_path):
            # Log this video's failure but keep processing the rest
            log_failed_file(rar_filename, f"Failed to extract audio from {video_filename}")
            continue

        # 4. Upload each WAV file to HF under the "audio/" prefix
        path_in_repo = f"audio/{wav_filename}"
        commit_message = f"Add audio: {wav_filename} extracted from {video_filename} in archive {rar_filename}"

        try:
            log_message(f"⬆️ Uploading audio: {wav_filename}", "INFO")
            hf_api.upload_file(
                path_or_fileobj=wav_output_path,
                path_in_repo=path_in_repo,
                repo_id=TARGET_REPO_ID,
                repo_type="dataset",
                commit_message=commit_message
            )
            log_message(f"βœ… Successfully uploaded audio: {wav_filename}", "INFO")
            success_count += 1
            processing_status["extracted_videos"] += 1

            # Wait 60 seconds before next upload — throttles commits to the HF API
            log_message(f"⏳ Waiting 60 seconds before next upload...", "INFO")
            time.sleep(60)

        except Exception as e:
            log_message(f"❌ Failed to upload audio {wav_filename}: {str(e)}", "ERROR")
            log_failed_file(rar_filename, f"Failed to upload audio {wav_filename}: {str(e)}")

        finally:
            # Clean up the WAV file after the upload attempt, success or failure,
            # so a failed upload doesn't leak disk space
            if os.path.exists(wav_output_path):
                os.remove(wav_output_path)
                log_message(f"πŸ—‘οΈ Cleaned up WAV file: {wav_output_path}", "INFO")

    # 5. Clean up the extracted folder
    shutil.rmtree(extract_dir, ignore_errors=True)
    log_message(f"πŸ—‘οΈ Cleaned up extracted folder: {extract_dir}", "INFO")

    if success_count > 0:
        processing_status["extracted_courses"] += 1  # Assuming one rar is one course
        return True
    else:
        log_message(f"❌ All audio extraction/upload failed for {rar_filename}", "ERROR")
        return False
def get_next_file_to_process(repo_id: str, state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Finds the next file to process from the source repo.

    Lists the .rar/.zip archives in *repo_id* (sorted, for a stable queue
    order), then scans from the persisted `next_download_index` for the
    first file that is either unseen or previously marked 'failed'.

    Returns: { 'filename': str, 'url': str, 'index': int } or None
    """
    log_message(f"πŸ” Searching for next file to process in {repo_id}", "INFO")

    try:
        # 1. List all files in the source repository
        files_list = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")

        # 2. Filter for .rar and .zip files
        archive_files = sorted([f for f in files_list if f.endswith(('.rar', '.zip'))])

        if not archive_files:
            log_message("ℹ️ No .rar or .zip files found in the source repository.", "INFO")
            return None

        processing_status["total_files"] = len(archive_files)

        # 3. Get the next index from the state
        start_index = state.get("next_download_index", 0)

        # 4. Iterate through files starting from the index
        for index in range(start_index, len(archive_files)):
            filename = archive_files[index]

            # Check the state of the file
            file_state = state["file_states"].get(filename)

            # Only process if the file is not in the state or is marked as 'failed'
            if file_state is None or file_state == "failed":

                # Construct the download URL
                url = hf_hub_url(repo_id=repo_id, filename=filename, repo_type="dataset", subfolder=None)

                # Fix: log the actual filename (these messages previously
                # printed a literal "(unknown)" placeholder)
                log_message(f"βœ… Found next file: {filename} at index {index}", "INFO")
                return {
                    'filename': filename,
                    'url': url,
                    'index': index
                }

            elif file_state == "processing":
                log_message(f"⚠️ File {filename} is currently marked as 'processing'. Skipping for now.", "WARNING")
                # Advance the index if a file is stuck in 'processing' for too long,
                # but for now, we'll just skip it and let the loop continue.

            elif file_state == "processed":
                log_message(f"ℹ️ File {filename} already processed. Skipping.", "INFO")

        log_message("ℹ️ All files up to the current index have been processed or skipped.", "INFO")

        # If we reach the end of the list, reset the index to 0 to check for new files.
        # NOTE(review): this only fires when start_index is already past the end;
        # when the scan above merely finds nothing processable, the index is left
        # untouched — confirm that is the intended behavior.
        if start_index >= len(archive_files):
            log_message("ℹ️ Reached end of file list. Resetting index to 0 for next loop.", "INFO")
            state["next_download_index"] = 0
            upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state)

        return None

    except Exception as e:
        log_message(f"❌ Failed to list files from Hugging Face: {str(e)}", "ERROR")
        return None
def main_processing_loop():
    """The main loop that orchestrates the download, processing, and upload cycle.

    Each iteration: refresh the shared state file from HF, pick the next
    unprocessed archive, lock it, download it, process it, then persist the
    outcome ('processed' or 'failed') and advance the queue index. Runs until
    `processing_status["is_running"]` is cleared, interrupted, or a fatal
    error occurs.
    """

    if processing_status["is_running"]:
        log_message("⚠️ Processing loop is already running.", "WARNING")
        return

    processing_status["is_running"] = True

    try:
        log_message("πŸš€ Starting main processing loop...", "INFO")

        while processing_status["is_running"]:

            # 1. Download the current state
            current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

            # 2. Find the next file to process
            next_file_info = get_next_file_to_process(SOURCE_REPO_ID, current_state)

            if next_file_info is None:
                log_message("πŸ’€ No new files to process. Sleeping for a while...", "INFO")
                time.sleep(PROCESSING_DELAY * 5)  # Sleep longer if nothing to do
                continue

            target_file = next_file_info['filename']
            rar_url = next_file_info['url']
            target_index = next_file_info['index']

            processing_status["current_file"] = target_file
            success = False
            # Bug fix: `continue` inside `try` still executes `finally`, so the
            # previous code hit a NameError on the unbound `local_rar_path` and
            # marked a never-locked file as 'failed'. Initialize both guards
            # up front and only touch remote state when we actually held the lock.
            locked = False
            local_rar_path = None

            try:
                # 3. Lock the file for processing
                if not lock_file_for_processing(target_file, current_state):
                    log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
                    time.sleep(PROCESSING_DELAY)
                    continue  # `finally` still runs, but `locked` stays False
                locked = True

                # 4. Download the file
                local_rar_path = os.path.join(DOWNLOAD_FOLDER, target_file)
                log_message(f"⬇️ Downloading file: {target_file}", "INFO")

                if download_with_retry(rar_url, local_rar_path):

                    # 5. Process the file (extraction, audio conversion, uploads, etc.)
                    if process_rar_file(local_rar_path):
                        success = True
                        log_message(f"βœ… Finished all processing steps for: {target_file}", "INFO")
                    else:
                        log_message(f"❌ Processing failed for: {target_file}", "ERROR")
                else:
                    log_message(f"❌ Download failed for: {target_file}", "ERROR")

            except Exception as e:
                log_message(f"πŸ”₯ An unhandled error occurred while processing {target_file}: {str(e)}", "ERROR")
                log_failed_file(target_file, str(e))

            finally:
                # 6. Release Lock / Update State — only if we actually took the lock
                if locked:
                    # The next index to check will be the one *after* the current
                    # file, regardless of success.
                    next_index_to_save = target_index + 1

                    # Re-download the latest state before the final upload to
                    # ensure we don't overwrite changes made by other workers in
                    # the meantime (e.g. if they processed a file that was before
                    # this one in the queue).
                    current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

                    if success:
                        # Mark as 'processed' and update the next_download_index
                        unlock_file_as_processed(target_file, current_state, next_index_to_save)
                        processing_status["processed_files"] += 1
                    else:
                        # Release the 'processing' lock as 'failed' and still
                        # advance the index so one bad archive can't stall the queue.
                        log_message(f"⚠️ Processing failed for {target_file}. Marking as 'failed' and advancing index.", "WARNING")
                        current_state["file_states"][target_file] = "failed"
                        current_state["next_download_index"] = next_index_to_save
                        upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, current_state)
                        processing_status["failed_files"] += 1

                # Clean up local files (path may be None if locking failed)
                if local_rar_path and os.path.exists(local_rar_path):
                    os.remove(local_rar_path)
                    log_message(f"πŸ—‘οΈ Cleaned up local file: {local_rar_path}", "INFO")

            # Wait a bit before checking for the next file to avoid hammering the HF API
            time.sleep(PROCESSING_DELAY)

        log_message("πŸŽ‰ Processing complete!", "INFO")
        log_message(f'πŸ“Š Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, frames extracted', "INFO")

    except KeyboardInterrupt:
        log_message("⏹️ Processing interrupted by user", "WARNING")
    except Exception as e:
        log_message(f"❌ Fatal error: {str(e)}", "ERROR")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()
+ # Expose necessary functions and variables
688
+ __all__ = [
689
+ "main_processing_loop",
690
+ "processing_status",
691
+ "log_message",
692
+ ]