Fred808 commited on
Commit
02ac6bc
Β·
verified Β·
1 Parent(s): 13544f1

Create processing_logic.py

Browse files
Files changed (1) hide show
  1. processing_logic.py +371 -0
processing_logic.py ADDED
@@ -0,0 +1,371 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import requests
4
+ import subprocess
5
+ import shutil
6
+ import time
7
+ import threading
8
+ from typing import Dict, List, Optional
9
+ from pathlib import Path
10
+ from huggingface_hub import HfApi
11
+ import uuid
12
+
13
+ # ==== CONFIGURATION ====
14
+ HF_TOKEN = os.getenv("HF_TOKEN", "")
15
+ SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
16
+
17
+ # Directory Configuration
18
+ UPLOAD_DIRECTORY = "./uploads"
19
+ DOWNLOAD_FOLDER = "./downloads"
20
+ EXTRACT_FOLDER = "./extracted"
21
+ MP4_OUTPUT_FOLDER = "./mp4_files"
22
+
23
+ # Create directories
24
+ for directory in [UPLOAD_DIRECTORY, DOWNLOAD_FOLDER, EXTRACT_FOLDER, MP4_OUTPUT_FOLDER]:
25
+ os.makedirs(directory, exist_ok=True)
26
+
27
+ # State Files
28
+ DOWNLOAD_STATE_FILE = "download_progress.json"
29
+ PROCESS_STATE_FILE = "process_progress.json"
30
+ FAILED_FILES_LOG = "failed_files.log"
31
+
32
+ # Processing Parameters
33
+ MAX_RETRIES = 3
34
+ MIN_FREE_SPACE_GB = 2
35
+ DEFAULT_RAR_LIMIT = 5 # Default number of RAR files to process
36
+
37
+ # Initialize HF API
38
+ hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
39
+
40
+ # Global State
41
+ processing_status = {
42
+ "is_running": False,
43
+ "current_file": None,
44
+ "total_files": 0,
45
+ "processed_files": 0,
46
+ "failed_files": 0,
47
+ "extracted_courses": 0,
48
+ "extracted_mp4s": 0,
49
+ "last_update": None,
50
+ "logs": []
51
+ }
52
+
53
+ # Store for uploaded MP4s with metadata (this will be managed by the API part, but needs to be accessible)
54
+ uploaded_mp4s = {}
55
+
56
+ def log_message(message: str):
57
+ """Log messages with timestamp"""
58
+ timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
59
+ log_entry = f"[{timestamp}] {message}"
60
+ print(log_entry)
61
+ processing_status["logs"].append(log_entry)
62
+ processing_status["last_update"] = timestamp
63
+ if len(processing_status["logs"]) > 100:
64
+ processing_status["logs"] = processing_status["logs"][-100:]
65
+
66
+ def log_failed_file(filename: str, error: str):
67
+ """Log failed files to persistent file"""
68
+ timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
69
+ with open(FAILED_FILES_LOG, "a") as f:
70
+ f.write(f"{timestamp} - {filename}: {error}\n")
71
+
72
+ def get_disk_usage(path: str) -> Dict[str, float]:
73
+ """Get disk usage statistics in GB"""
74
+ statvfs = os.statvfs(path)
75
+ total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
76
+ free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
77
+ used = total - free
78
+ return {"total": total, "free": free, "used": used}
79
+
80
+ def check_disk_space(path: str = ".") -> bool:
81
+ """Check if there\'s enough disk space"""
82
+ disk_info = get_disk_usage(path)
83
+ if disk_info["free"] < MIN_FREE_SPACE_GB:
84
+ log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
85
+ return False
86
+ return True
87
+
88
+ def cleanup_temp_files():
89
+ """Clean up temporary files to free space"""
90
+ log_message("🧹 Cleaning up temporary files...")
91
+
92
+ # Clean old downloads (keep only current processing file)
93
+ current_file = processing_status.get("current_file")
94
+ for file in os.listdir(DOWNLOAD_FOLDER):
95
+ if file != current_file and file.endswith((".rar", ".zip")):
96
+ try:
97
+ os.remove(os.path.join(DOWNLOAD_FOLDER, file))
98
+ log_message(f"πŸ—‘οΈ Removed old download: {file}")
99
+ except:
100
+ pass
101
+
102
+ def load_json_state(file_path: str, default_value):
103
+ """Load state from JSON file"""
104
+ if os.path.exists(file_path):
105
+ try:
106
+ with open(file_path, "r") as f:
107
+ return json.load(f)
108
+ except json.JSONDecodeError:
109
+ log_message(f"⚠️ Corrupted state file: {file_path}")
110
+ return default_value
111
+
112
+ def save_json_state(file_path: str, data):
113
+ """Save state to JSON file"""
114
+ with open(file_path, "w") as f:
115
+ json.dump(data, f, indent=2)
116
+
117
+ def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
118
+ """Download file with retry logic and disk space checking"""
119
+ if not check_disk_space():
120
+ cleanup_temp_files()
121
+ if not check_disk_space():
122
+ log_message("❌ Insufficient disk space even after cleanup")
123
+ return False
124
+
125
+ headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
126
+ for attempt in range(max_retries):
127
+ try:
128
+ with requests.get(url, headers=headers, stream=True) as r:
129
+ r.raise_for_status()
130
+
131
+ # Check content length if available
132
+ content_length = r.headers.get("content-length")
133
+ if content_length:
134
+ size_gb = int(content_length) / (1024**3)
135
+ disk_info = get_disk_usage(".")
136
+ if size_gb > disk_info["free"] - 0.5: # Leave 0.5GB buffer
137
+ log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
138
+ return False
139
+
140
+ with open(dest_path, "wb") as f:
141
+ for chunk in r.iter_content(chunk_size=8192):
142
+ f.write(chunk)
143
+ return True
144
+ except Exception as e:
145
+ if attempt < max_retries - 1:
146
+ time.sleep(2 ** attempt)
147
+ continue
148
+ log_message(f"❌ Download failed after {max_retries} attempts: {e}")
149
+ return False
150
+ return False
151
+
152
+ def is_multipart_rar(filename: str) -> bool:
153
+ """Check if this is a multi-part RAR file"""
154
+ return ".part" in filename.lower() and filename.lower().endswith(".rar")
155
+
156
+ def get_rar_part_base(filename: str) -> str:
157
+ """Get the base name for multi-part RAR files"""
158
+ if ".part" in filename.lower():
159
+ return filename.split(".part")[0]
160
+ return filename.replace(".rar", "")
161
+
162
+ def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
163
+ """Extract RAR with retry and recovery, handling multi-part archives"""
164
+ filename = os.path.basename(rar_path)
165
+
166
+ # For multi-part RARs, we need the first part
167
+ if is_multipart_rar(filename):
168
+ base_name = get_rar_part_base(filename)
169
+ first_part = f"{base_name}.part01.rar"
170
+ first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
171
+
172
+ if not os.path.exists(first_part_path):
173
+ log_message(f"⚠️ Multi-part RAR detected but first part not found: {first_part}")
174
+ return False
175
+
176
+ rar_path = first_part_path
177
+ log_message(f"πŸ“¦ Processing multi-part RAR starting with: {first_part}")
178
+
179
+ for attempt in range(max_retries):
180
+ try:
181
+ # Test RAR first
182
+ test_cmd = ["unrar", "t", rar_path]
183
+ test_result = subprocess.run(test_cmd, capture_output=True, text=True)
184
+ if test_result.returncode != 0:
185
+ log_message(f"⚠️ RAR test failed: {test_result.stderr}")
186
+ if attempt == max_retries - 1:
187
+ return False
188
+ continue
189
+
190
+ # Extract RAR
191
+ cmd = ["unrar", "x", "-o+", rar_path, output_dir]
192
+ if attempt > 0: # Try recovery on subsequent attempts
193
+ cmd.insert(2, "-kb")
194
+
195
+ result = subprocess.run(cmd, capture_output=True, text=True)
196
+ if result.returncode == 0:
197
+ log_message(f"βœ… Successfully extracted: {os.path.basename(rar_path)}")
198
+ return True
199
+ else:
200
+ error_msg = result.stderr or result.stdout
201
+ log_message(f"⚠️ Extraction attempt {attempt + 1} failed: {error_msg}")
202
+
203
+ except Exception as e:
204
+ log_message(f"❌ Extraction exception: {str(e)}")
205
+ if attempt == max_retries - 1:
206
+ return False
207
+ time.sleep(1)
208
+
209
+ return False
210
+
211
+ def process_rar_file(rar_path: str) -> List[Dict]:
212
+ """Process a single RAR file - extract and find MP4 files"""
213
+ filename = os.path.basename(rar_path)
214
+ processing_status["current_file"] = filename
215
+
216
+ # Handle multi-part RAR naming
217
+ if is_multipart_rar(filename):
218
+ course_name = get_rar_part_base(filename)
219
+ else:
220
+ course_name = filename.replace(".rar", "")
221
+
222
+ # Create a unique directory for this course's extracted MP4s
223
+ course_mp4_output_dir = os.path.join(MP4_OUTPUT_FOLDER, course_name)
224
+ os.makedirs(course_mp4_output_dir, exist_ok=True)
225
+
226
+ extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
227
+ mp4_files = []
228
+
229
+ try:
230
+ log_message(f"πŸ”„ Processing: {filename}")
231
+
232
+ # Clean up any existing directory
233
+ if os.path.exists(extract_dir):
234
+ shutil.rmtree(extract_dir, ignore_errors=True)
235
+
236
+ # Extract RAR
237
+ os.makedirs(extract_dir, exist_ok=True)
238
+ if not extract_with_retry(rar_path, extract_dir):
239
+ raise Exception("RAR extraction failed")
240
+
241
+ # Find and copy MP4 files
242
+ for root, dirs, files in os.walk(extract_dir):
243
+ for file in files:
244
+ if file.lower().endswith(".mp4"):
245
+ source_path = os.path.join(root, file)
246
+ # Use original filename for MP4 output within the course directory
247
+ dest_path = os.path.join(course_mp4_output_dir, file)
248
+
249
+ try:
250
+ shutil.copy2(source_path, dest_path)
251
+ mp4_files.append({
252
+ "id": os.path.join(course_name, file), # Store as course_name/filename.mp4
253
+ "original_name": file,
254
+ "course_name": course_name,
255
+ "size": os.path.getsize(dest_path),
256
+ "created_at": time.strftime("%Y-%m-%d %H:%M:%S")
257
+ })
258
+ log_message(f"βœ… Extracted MP4: {file} -> {os.path.join(course_name, file)}")
259
+ except Exception as e:
260
+ log_message(f"❌ Failed to copy MP4 {file}: {e}")
261
+
262
+ processing_status["extracted_courses"] += 1
263
+ processing_status["extracted_mp4s"] += len(mp4_files)
264
+ log_message(f"βœ… Successfully processed '{course_name}' - found {len(mp4_files)} MP4 files")
265
+
266
+ return mp4_files
267
+
268
+ except Exception as e:
269
+ error_msg = str(e)
270
+ log_message(f"❌ Processing failed: {error_msg}")
271
+ log_failed_file(filename, error_msg)
272
+ return []
273
+
274
+ finally:
275
+ processing_status["current_file"] = None
276
+ # Clean up extracted directory
277
+ if os.path.exists(extract_dir):
278
+ shutil.rmtree(extract_dir, ignore_errors=True)
279
+
280
+ def process_hf_files_background(start_index: int = 5, limit: int = DEFAULT_RAR_LIMIT):
281
+ """Background task to process HuggingFace files"""
282
+ if not hf_api:
283
+ log_message("❌ HuggingFace API not configured (missing HF_TOKEN)")
284
+ return
285
+
286
+ processing_status["is_running"] = True
287
+
288
+ try:
289
+ # Load state
290
+ processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
291
+ download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})
292
+
293
+ # Use start_index if provided, otherwise use the saved state
294
+ current_index = start_index if start_index > 0 else download_state["next_download_index"]
295
+
296
+ log_message(f"πŸ“Š Starting processing from index {current_index} with a limit of {limit} files.")
297
+ log_message(f"πŸ“Š Previously processed: {len(processed_rars)} files")
298
+
299
+ # Get file list
300
+ try:
301
+ files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
302
+ rar_files = sorted([f for f in files if f.endswith(".rar")])
303
+
304
+ processing_status["total_files"] = len(rar_files)
305
+ log_message(f"πŸ“ Found {len(rar_files)} RAR files in repository")
306
+
307
+ if current_index >= len(rar_files):
308
+ log_message("βœ… All files have been processed!")
309
+ return
310
+
311
+ except Exception as e:
312
+ log_message(f"❌ Failed to get file list: {str(e)}")
313
+ return
314
+
315
+ processed_count = 0
316
+ while processed_count < limit and current_index < len(rar_files) and processing_status["is_running"]:
317
+ rar_file = rar_files[current_index]
318
+ filename = os.path.basename(rar_file)
319
+
320
+ if filename in processed_rars:
321
+ log_message(f"⏭️ Skipping already processed: {filename}")
322
+ processing_status["processed_files"] += 1
323
+ current_index += 1
324
+ save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})
325
+ continue
326
+
327
+ log_message(f"πŸ“₯ Downloading: {filename}")
328
+ dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
329
+
330
+ # Download file
331
+ download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
332
+ if download_with_retry(download_url, dest_path):
333
+ # Process file
334
+ mp4_files = process_rar_file(dest_path)
335
+ if mp4_files:
336
+ processed_rars.append(filename)
337
+ save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
338
+ log_message(f"βœ… Successfully processed: {filename}")
339
+ processing_status["processed_files"] += 1
340
+ else:
341
+ log_message(f"❌ Failed to process: {filename}")
342
+ processing_status["failed_files"] += 1
343
+
344
+ # Clean up downloaded file
345
+ try:
346
+ os.remove(dest_path)
347
+ log_message(f"πŸ—‘οΈ Cleaned up download: {filename}")
348
+ except:
349
+ pass
350
+ else:
351
+ log_message(f"❌ Failed to download: {filename}")
352
+ processing_status["failed_files"] += 1
353
+
354
+ # Update download state for next run
355
+ current_index += 1
356
+ processed_count += 1
357
+ save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})
358
+
359
+ if current_index >= len(rar_files):
360
+ log_message("πŸŽ‰ All available RAR files have been processed!")
361
+ elif not processing_status["is_running"]:
362
+ log_message("⏹️ Processing stopped by request.")
363
+ else:
364
+ log_message(f"βœ… Processed {processed_count} RAR files. Next index to process: {current_index}")
365
+
366
+ except Exception as e:
367
+ log_message(f"❌ Fatal error in background processing: {str(e)}")
368
+ finally:
369
+ processing_status["is_running"] = False
370
+ cleanup_temp_files()
371
+