import json import os import time import datetime import threading from huggingface_hub import hf_hub_download, list_repo_files, HfApi from fastapi import FastAPI from fastapi.responses import HTMLResponse from uvicorn import run as uvicorn_run import logging # --- Configuration --- ALL_REPO_ID = "samfred2/ALL" ATO_REPO_ID = "samfred2/ATO" OUTPUT_REPO_ID = "samfred2/ALL2" OUTPUT_DIR = "processed_files" HF_TOKEN = os.getenv("HF_TOKEN", "") MAX_UPLOADS_PER_HOUR = 128 RATE_LIMIT_DELAY = 3600 # 1 hour in seconds os.makedirs(OUTPUT_DIR, exist_ok=True) api = HfApi(token=HF_TOKEN) # Logging setup logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Global state processing_state = { 'running': False, 'status': 'idle', 'processed': 0, 'uploaded': 0, 'total': 0, 'current_file': '', 'error': None, 'start_time': None, 'eta': None } # FastAPI app app = FastAPI(title="Dataset Merger & Uploader", version="1.0.0") # HTML Dashboard embedded HTML_DASHBOARD = """ Dataset Merger & Uploader

📊 Dataset Merger & Uploader

Real-time processing dashboard for samfred2/ALL → samfred2/ALL2

Status
⏸️
Current state
IDLE
Processed
0
files completed
Uploaded
0
files to samfred2/ALL2
Total
0
matched pairs
Elapsed Time
0s
since start
ETA
--
estimated remaining
Overall Progress
0%0 / 0
Processing Rate
0/min
Upload Rate
0/min
Avg Time/File
0s
""" @app.on_event("startup") async def startup_event(): """Auto-start processing when server starts.""" logger.info("Server startup - Auto-starting processing...") processing_state['running'] = True processing_state['status'] = 'starting' processing_state['start_time'] = time.time() thread = threading.Thread(target=run_processing_thread, daemon=True) thread.start() @app.get("/", response_class=HTMLResponse) def get_dashboard(): """Serve dashboard HTML.""" return HTML_DASHBOARD @app.get("/status") def get_status(): """Get current processing status.""" return processing_state @app.post("/stop") def stop_processing(): """Stop the processing.""" processing_state['running'] = False return {"message": "Stop signal sent"} # --- Helper Functions --- def download_file(repo_id, filename, local_dir): """Downloads a single file with a delay to mitigate rate-limiting.""" logger.info(f"Downloading {filename} from {repo_id}...") try: time.sleep(1) local_path = hf_hub_download( repo_id=repo_id, filename=filename, repo_type="dataset", local_dir=local_dir, local_dir_use_symlinks=False, token=HF_TOKEN ) logger.info(f"Downloaded to {local_path}") return local_path except Exception as e: logger.error(f"Could not download {filename}. Error: {e}") return None def upload_file_with_rate_limit(api, file_path, path_in_repo, upload_state, progress=None): """ Uploads a file to HF with rate limiting (128 files per hour). Handles 429 errors by waiting an hour and resuming. Tracks rate limit events in progress file. """ while True: try: # Check if we've reached the limit if upload_state['count'] >= MAX_UPLOADS_PER_HOUR: elapsed = time.time() - upload_state['hour_start'] if elapsed < RATE_LIMIT_DELAY: wait_time = RATE_LIMIT_DELAY - elapsed wait_until = datetime.datetime.now() + datetime.timedelta(seconds=wait_time) logger.info(f"Rate limit reached ({MAX_UPLOADS_PER_HOUR} uploads/hour). Waiting until {wait_until}") # Track rate limit event if progress is not None: event = { 'timestamp': datetime.datetime.now().isoformat(), 'type': 'hourly_limit_reached', 'reason': f'Reached {MAX_UPLOADS_PER_HOUR} uploads/hour', 'wait_seconds': int(wait_time), 'resume_time': wait_until.isoformat() } progress.setdefault('rate_limit_events', []).append(event) time.sleep(wait_time) upload_state['hour_start'] = time.time() upload_state['count'] = 0 logger.info(f"Uploading {path_in_repo}... ({upload_state['count']+1}/{MAX_UPLOADS_PER_HOUR})") api.upload_file( path_or_fileobj=file_path, path_in_repo=path_in_repo, repo_id=OUTPUT_REPO_ID, repo_type="dataset", token=HF_TOKEN ) upload_state['count'] += 1 logger.info(f"Uploaded {path_in_repo}") break except Exception as e: if "429" in str(e) or "rate" in str(e).lower(): wait_until = datetime.datetime.now() + datetime.timedelta(seconds=RATE_LIMIT_DELAY) logger.warning(f"Rate limit hit (429). Waiting 1 hour until {wait_until}") # Track rate limit error event if progress is not None: event = { 'timestamp': datetime.datetime.now().isoformat(), 'type': 'http_429_error', 'reason': 'HTTP 429 Too Many Requests from HF', 'wait_seconds': RATE_LIMIT_DELAY, 'resume_time': wait_until.isoformat(), 'file': path_in_repo } progress.setdefault('rate_limit_events', []).append(event) time.sleep(RATE_LIMIT_DELAY) upload_state['hour_start'] = time.time() upload_state['count'] = 0 else: logger.error(f"Could not upload {path_in_repo}. Error: {e}") break def load_json_file(local_path): """Loads and returns content of a JSON file.""" try: with open(local_path, 'r') as f: return json.load(f) except json.JSONDecodeError: logger.error(f"Could not decode JSON for {local_path}. Skipping.") return None except Exception as e: logger.error(f"Could not read file {local_path}. Error: {e}") return None def find_matching_all_file(ato_filename, all_file_names): """ Finds the full course file name in the 'all' dataset that corresponds to the given lesson file name from the 'ato' dataset using suffix matching. """ for all_name in all_file_names: if all_name.endswith(ato_filename): return all_name return None def load_progress(progress_file): """Load progress tracking file.""" if os.path.exists(progress_file): try: with open(progress_file, 'r') as f: return json.load(f) except: return {'processed': [], 'uploaded': [], 'rate_limit_events': []} return {'processed': [], 'uploaded': [], 'rate_limit_events': []} def save_progress(progress_file, progress): """Save progress tracking file.""" with open(progress_file, 'w') as f: json.dump(progress, f, indent=2) def run_processing_thread(): """Wrapper to run process_datasets in thread with error handling.""" try: processing_state['status'] = 'running' process_datasets() processing_state['status'] = 'completed' except Exception as e: processing_state['status'] = 'error' processing_state['error'] = str(e) logger.error(f"Processing error: {e}", exc_info=True) finally: processing_state['running'] = False logger.info("Processing thread finished") # --- Main Logic --- def process_datasets(): """ Two-phase processing: 1. Process matched pairs (ALL files with corresponding ATO transcriptions) 2. Upload remaining ALL files without transcriptions With rate limiting (128 files/hour, 429 error handling). """ # Load progress progress_file = os.path.join(OUTPUT_DIR, "progress.json") progress = load_progress(progress_file) upload_state = {'count': 0, 'hour_start': time.time()} # Ensure output repo exists try: api.create_repo(OUTPUT_REPO_ID, repo_type="dataset", exist_ok=True) logger.info(f"Using repo: {OUTPUT_REPO_ID}") except Exception as e: logger.warning(f"Could not create repo. {e}") # 1. Get the list of all files in both repositories logger.info("--- 1. Listing Repository Files ---") try: all_file_list = list_repo_files(repo_id=ALL_REPO_ID, repo_type="dataset", token=HF_TOKEN) ato_file_list = list_repo_files(repo_id=ATO_REPO_ID, repo_type="dataset", token=HF_TOKEN) except Exception as e: logger.error(f"Could not list repository files. {e}") processing_state['error'] = str(e) return # Filter for JSON files all_json_files = [f for f in all_file_list if f.endswith(".json")] ato_json_files = [f for f in ato_file_list if f.endswith(".json")] logger.info(f"Found {len(all_json_files)} JSON files in {ALL_REPO_ID}") logger.info(f"Found {len(ato_json_files)} JSON files in {ATO_REPO_ID}") # 2. Match ATO files to ALL files logger.info("--- 2. Matching ATO to ALL Files ---") match_map = {} for ato_file in ato_json_files: matching_all_file = find_matching_all_file(ato_file, all_json_files) if matching_all_file: match_map[ato_file] = matching_all_file logger.info(f"Found {len(match_map)} matching pairs.") # Create temporary directories for downloads all_download_dir = os.path.join(OUTPUT_DIR, "all_raw") ato_download_dir = os.path.join(OUTPUT_DIR, "ato_raw") os.makedirs(all_download_dir, exist_ok=True) os.makedirs(ato_download_dir, exist_ok=True) # ============================================================ # PHASE 1: Process matched pairs (with transcriptions) # ============================================================ logger.info("--- PHASE 1: Processing matched pairs (files with transcriptions) ---") logger.info(f"Total pairs to process: {len(match_map)}") logger.info(f"Already processed: {len(progress['processed'])}") logger.info(f"Already uploaded: {len(progress['uploaded'])}") processed_count = 0 matched_all_files = set() # Track which ALL files were processed in phase 1 for ato_filename, all_filename in match_map.items(): if not processing_state['running']: logger.info("Processing stopped by user") break # Skip if already processed if all_filename in progress['processed']: logger.info(f"Skipping already processed: {all_filename}") matched_all_files.add(all_filename) continue processing_state['current_file'] = all_filename logger.info(f"Processing {processed_count + 1}/{len(match_map)}: {ato_filename} <-> {all_filename}") # a. Download ATO file ato_local_path = download_file(ATO_REPO_ID, ato_filename, ato_download_dir) if not ato_local_path: continue ato_data = load_json_file(ato_local_path) if not ato_data: continue # b. Download ALL file all_local_path = download_file(ALL_REPO_ID, all_filename, all_download_dir) if not all_local_path: continue all_data = load_json_file(all_local_path) if not all_data: continue # c. Integrate transcription logger.info("Integrating transcription...") all_data["transcription_content"] = ato_data all_data["transcription_content"]["full_course_name"] = all_filename # d. Save locally final_output_path = os.path.join(OUTPUT_DIR, all_filename) with open(final_output_path, 'w') as f: json.dump(all_data, f, indent=4) logger.info(f"Saved locally to {final_output_path}") # e. Upload to samfred2/ALL2 if all_filename not in progress['uploaded']: upload_file_with_rate_limit(api, final_output_path, all_filename, upload_state, progress) progress['uploaded'].append(all_filename) processing_state['uploaded'] += 1 progress['processed'].append(all_filename) matched_all_files.add(all_filename) save_progress(progress_file, progress) processed_count += 1 processing_state['processed'] = processed_count logger.info(f"Progress: {processed_count}/{len(match_map)} | Uploaded: {len(progress['uploaded'])}") logger.info("--- PHASE 1 Complete ---") logger.info(f"Phase 1 processed: {processed_count} files with transcriptions") # ============================================================ # PHASE 2: Upload remaining ALL files without transcriptions # ============================================================ logger.info("--- PHASE 2: Uploading remaining ALL files without transcriptions ---") # Find files in ALL that don't have matches (no transcription) remaining_files = [f for f in all_json_files if f not in matched_all_files and f not in progress['uploaded']] logger.info(f"Found {len(remaining_files)} files without transcriptions to upload") remaining_count = 0 for all_filename in remaining_files: if not processing_state['running']: logger.info("Processing stopped by user during phase 2") break processing_state['current_file'] = all_filename logger.info(f"Uploading remaining file {remaining_count + 1}/{len(remaining_files)}: {all_filename}") # Download ALL file all_local_path = download_file(ALL_REPO_ID, all_filename, all_download_dir) if not all_local_path: continue # Load and prepare file all_data = load_json_file(all_local_path) if not all_data: continue # Save locally (no transcription added) final_output_path = os.path.join(OUTPUT_DIR, all_filename) with open(final_output_path, 'w') as f: json.dump(all_data, f, indent=4) logger.info(f"Saved locally to {final_output_path}") # Upload to samfred2/ALL2 upload_file_with_rate_limit(api, final_output_path, all_filename, upload_state, progress) progress['uploaded'].append(all_filename) processing_state['uploaded'] += 1 save_progress(progress_file, progress) remaining_count += 1 logger.info(f"Phase 2 Progress: {remaining_count}/{len(remaining_files)} | Total Uploaded: {len(progress['uploaded'])}") logger.info("--- PHASE 2 Complete ---") logger.info(f"Phase 2 uploaded: {remaining_count} files without transcriptions") logger.info("=== ALL PROCESSING COMPLETE ===") logger.info(f"Total processed (with transcriptions): {len(progress['processed'])}") logger.info(f"Total uploaded (all files): {len(progress['uploaded'])}") logger.info(f"Final stats: {len(progress['processed'])} with transcriptions + {remaining_count} without transcriptions = {len(progress['uploaded'])} total") if __name__ == "__main__": import sys logger.info(f"Starting server on http://127.0.0.1:8000") logger.info(f"Dashboard: http://127.0.0.1:8000") logger.info(f"Status: http://127.0.0.1:8000/status") logger.info(f"Stop: POST http://127.0.0.1:8000/stop") logger.info("Processing will auto-start on server startup...") uvicorn_run(app, host="127.0.0.1", port=8000)