#!/usr/bin/env python3
"""
Hugging Face Data Processor - Single Unified Server (Modified)

A complete, self-contained FastAPI application that:
1. Automatically processes all courses from samelias1/Helium and samelias1/Data
2. Merges frame data with cursor information
3. Searches for exact transcription matches in samfred2/ATO
4. Generates combined JSON output and individual course JSONs
5. Uploads all generated files to samfred2/ALL using upload_folder with a robust
   file-by-file retry fallback
6. Provides a REST API for monitoring and management
7. Web dashboard moved to the root path (/)

Run with: python server.py
Then open: http://localhost:8000
"""
import json
import asyncio
import sys
import time
import traceback
from pathlib import Path
from typing import Optional, List, Dict
from datetime import datetime
from enum import Enum

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from huggingface_hub import hf_hub_download, HfApi
from huggingface_hub.utils import HfHubHTTPError
import uvicorn
# ============================================================================
# Configuration
# ============================================================================
DATASET_HELIUM = "samelias1/Helium"
DATASET_DATA = "samelias1/Data"
DATASET_ATO = "samfred2/ATO"
DATASET_OUTPUT = "samfred2/ALL"

OUTPUT_DIR = Path("./output")
OUTPUT_DIR.mkdir(exist_ok=True)
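# NOTE: uploads to DATASET_OUTPUT require write access to that repo. No token
# is passed explicitly in this script; HfApi typically picks up credentials
# from the HF_TOKEN environment variable or a cached `huggingface-cli login`.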
# ============================================================================
# Models & Enums
# ============================================================================
class JobStatus(str, Enum):
    PENDING = "pending"
    FETCHING_FILES = "fetching_files"
    PROCESSING = "processing"
    SAVING = "saving"
    UPLOADING = "uploading"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class ProcessingJob(BaseModel):
    job_id: str
    status: JobStatus
    total_files: int = 0
    processed_files: int = 0
    matched_transcriptions: int = 0
    error_message: Optional[str] = None
    created_at: str
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    output_file: Optional[str] = None
    total_uploads: int = 0
    completed_uploads: int = 0
    progress_percent: float = 0.0
# ============================================================================
# Global State
# ============================================================================
jobs_db: Dict[str, ProcessingJob] = {}
jobs_lock = asyncio.Lock()

# ============================================================================
# FastAPI App Setup
# ============================================================================
app = FastAPI(
    title="Hugging Face Data Processor",
    description="Process and merge Hugging Face datasets automatically",
    version="1.0.0"
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ============================================================================
# Helper Functions (Original)
# ============================================================================
def get_hf_api() -> HfApi:
    """Initialize Hugging Face API client."""
    return HfApi()


def list_dataset_files(dataset_id: str) -> List[str]:
    """Fetch all file names from a Hugging Face dataset."""
    try:
        print(f"[HF] Listing files from {dataset_id}...")
        api = get_hf_api()
        files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")
        file_list = list(files)
        print(f"[HF] Found {len(file_list)} files in {dataset_id}")
        return file_list
    except Exception as e:
        print(f"[ERROR] Failed to list files from {dataset_id}: {e}")
        return []


def download_file(repo_id: str, file_name: str) -> Optional[str]:
    """Download a file from a Hugging Face dataset to the local cache."""
    try:
        path = hf_hub_download(
            repo_id=repo_id,
            filename=file_name,
            repo_type="dataset"
        )
        return path
    except Exception as e:
        print(f"[ERROR] Failed to download {file_name}: {e}")
        return None


def load_json_file(file_path: str) -> Optional[Dict | List]:
    """Load and parse a JSON file."""
    try:
        with open(file_path, "r") as f:
            return json.load(f)
    except Exception as e:
        print(f"[ERROR] Failed to load JSON from {file_path}: {e}")
        return None
def merge_course_data(helium_path: str, data_path: str) -> List[Dict]:
    """Merge frame data from Helium with cursor data from the Data dataset."""
    try:
        helium_data = load_json_file(helium_path)
        data_data = load_json_file(data_path)
        if not helium_data or not data_data:
            return []

        # Create lookup dictionary from the Data dataset, keyed by (course, image_path)
        cursor_lookup = {}
        for item in data_data:
            key = (item.get("course"), item.get("image_path"))
            cursor_lookup[key] = {k: v for k, v in item.items() if k not in ["course", "image_path"]}

        # Merge with Helium data
        merged_data = []
        for index, item in enumerate(helium_data):
            key = (item.get("course"), item.get("image_path"))
            merged_item = item.copy()
            if key in cursor_lookup:
                merged_item.update(cursor_lookup[key])
            # Clean up unwanted fields
            merged_item.pop("server_url", None)
            merged_item.pop("timestamp", None)
            # Renumber image_path sequentially
            merged_item["image_path"] = index + 1
            merged_data.append(merged_item)
        return merged_data
    except Exception as e:
        print(f"[ERROR] Failed to merge course data: {e}")
        return []
def find_exact_transcription(course_file: str, ato_files: List[str]) -> Optional[str]:
    """Search for an exact transcription file match in the ATO dataset."""
    expected_file = course_file.replace("_frames.json", ".json")
    if expected_file in ato_files:
        return expected_file
    return None
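# e.g. a Helium course file "my_course_frames.json" matches an ATO
# transcription named "my_course.json" ("my_course" is an illustrative name).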
# ============================================================================
# Upload Logic with Intelligent Fallback
# ============================================================================
def upload_file_with_retry(api: HfApi, local_path: Path, path_in_repo: str, repo_id: str):
    """Upload a single file to Hugging Face, retrying after a 1-hour pause on rate-limit errors (HTTP 429)."""
    while True:
        try:
            print(f"[HF UPLOAD] Uploading {local_path.name} to {repo_id}/{path_in_repo}...")
            api.upload_file(
                path_or_fileobj=str(local_path),
                path_in_repo=path_in_repo,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Automated upload: {local_path.name}"
            )
            print(f"[HF UPLOAD] ✓ Successfully uploaded {local_path.name}")
            break  # Success, exit the loop
        except HfHubHTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                print(f"\n{'='*70}")
                print(f"[RATE LIMIT HIT] Received HTTP 429 for {local_path.name}.")
                print("Pausing for 1 hour (3600 seconds) before retrying...")
                print(f"{'='*70}\n")
                time.sleep(3600)  # Blocks only this worker thread, not the event loop
                print(f"\n{'='*70}")
                print(f"[RETRY] Resuming upload for {local_path.name}...")
                print(f"{'='*70}\n")
            else:
                print(f"[ERROR] Failed to upload {local_path.name} with unhandled HTTP error: {e}")
                raise  # Re-raise other HTTP errors
        except Exception as e:
            print(f"[ERROR] An unexpected error occurred during upload of {local_path.name}: {e}")
            raise  # Re-raise other errors
def upload_all_files(job: ProcessingJob, all_courses: List[Dict], combined_file_path: Path):
    """
    Save the individual course files and run the combined upload process.
    Attempts upload_folder first, then falls back to file-by-file uploads with retry.
    """
    api = get_hf_api()

    # 1. Save all files (combined and individual) to OUTPUT_DIR
    print("\n[SAVE] Saving individual course JSONs...")
    # The combined file is normally written in the main processing loop; ensure it exists here.
    if not combined_file_path.exists():
        with open(combined_file_path, "w") as f:
            json.dump(all_courses, f, indent=2)

    # Save individual course JSONs
    for course_data in all_courses:
        course_name = course_data["course"]
        individual_file_name = f"{course_name}.json"
        individual_file_path = OUTPUT_DIR / individual_file_name
        with open(individual_file_path, "w") as f:
            json.dump(course_data, f, indent=2)
        print(f"  ✓ Saved {individual_file_name}")

    # Collect the files to upload, for both the fallback path and progress tracking
    files_to_upload = [p for p in OUTPUT_DIR.iterdir() if p.is_file() and p.suffix == ".json"]
    job.total_uploads = len(files_to_upload)
    print(f"\n[UPLOAD] Starting intelligent upload of {job.total_uploads} files to {DATASET_OUTPUT}...")

    # --- Strategy 1: Try upload_folder ---
    try:
        print("[UPLOAD] Attempting bulk upload using HfApi.upload_folder...")
        api.upload_folder(
            folder_path=str(OUTPUT_DIR),
            repo_id=DATASET_OUTPUT,
            repo_type="dataset",
            commit_message=f"Automated bulk upload of {job.total_uploads} files"
        )
        job.completed_uploads = job.total_uploads
        print("[UPLOAD] ✓ Bulk upload successful.")
        return  # Exit if successful
    except Exception as e:
        print(f"\n{'='*70}")
        print(f"[UPLOAD FALLBACK] Bulk upload failed: {e}")
        print("Falling back to file-by-file upload with 1-hour retry mechanism.")
        print(f"{'='*70}\n")

    # --- Strategy 2: Fall back to file-by-file with retry ---
    job.completed_uploads = 0
    for idx, local_path in enumerate(files_to_upload):
        try:
            upload_file_with_retry(
                api=api,
                local_path=local_path,
                path_in_repo=local_path.name,
                repo_id=DATASET_OUTPUT
            )
            job.completed_uploads = idx + 1
        except Exception as upload_e:
            # If even the retry logic fails, log and re-raise to fail the job
            print(f"[FATAL ERROR] File-by-file upload failed for {local_path.name}: {upload_e}")
            raise

    print(f"\n[UPLOAD] All {job.completed_uploads}/{job.total_uploads} files successfully uploaded to {DATASET_OUTPUT}.")
# ============================================================================
# Main Processing Logic (Modified - FIX APPLIED HERE)
# ============================================================================
# FIX: Changed from 'async def' to 'def' because this function contains blocking I/O
# and is intended to be run in a separate thread via asyncio.to_thread.
def process_single_course(
    course_file: str,
    job: ProcessingJob,
    ato_files: List[str]
) -> Optional[Dict]:
    """Process a single course: merge data and fetch the transcription if available."""
    try:
        # Download from Helium and Data
        helium_path = download_file(DATASET_HELIUM, course_file)
        data_path = download_file(DATASET_DATA, course_file)
        if not helium_path or not data_path:
            return None

        # Merge frame data
        merged_frames = merge_course_data(helium_path, data_path)
        if not merged_frames:
            return None

        # Try to find and download the transcription
        transcription_data = None
        expected_ato_file = find_exact_transcription(course_file, ato_files)
        if expected_ato_file:
            ato_path = download_file(DATASET_ATO, expected_ato_file)
            if ato_path:
                transcription_data = load_json_file(ato_path)

        # NOTE: job.matched_transcriptions is a mutable attribute of the job object,
        # which is safe to modify here as only one worker thread runs per job.
        if transcription_data:
            job.matched_transcriptions += 1

        # Prepare output: frames + transcription (or "none")
        course_name = course_file.replace("_frames.json", "")
        output = {
            "course": course_name,
            "frames": merged_frames,
            "transcription": transcription_data if transcription_data else "none"
        }
        return output
    except Exception as e:
        print(f"[ERROR] Failed to process {course_file}: {e}")
        traceback.print_exc()
        return None
async def process_all_courses_background(job_id: str):
    """Main background processing function."""
    job = jobs_db.get(job_id)
    if not job:
        return
    try:
        job.status = JobStatus.FETCHING_FILES
        job.started_at = datetime.utcnow().isoformat()
        print(f"\n{'='*70}")
        print(f"[JOB] Starting job: {job_id}")
        print(f"{'='*70}\n")

        # Fetch file lists from all datasets.
        # list_dataset_files performs blocking I/O, so run it in a worker thread.
        print("[INIT] Fetching file lists from datasets...")
        helium_files = await asyncio.to_thread(list_dataset_files, DATASET_HELIUM)
        ato_files = await asyncio.to_thread(list_dataset_files, DATASET_ATO)

        # Filter to only _frames.json files from Helium
        course_files = [f for f in helium_files if f.endswith("_frames.json")]
        job.total_files = len(course_files)
        print(f"[INIT] Found {len(course_files)} courses to process")
        print(f"[INIT] Found {len(ato_files)} files in ATO dataset\n")

        # Process each course
        job.status = JobStatus.PROCESSING
        all_courses = []
        for idx, course_file in enumerate(course_files):
            # Honor a cancellation requested via the API
            if job.status == JobStatus.CANCELLED:
                print(f"[CANCELLED] Job {job_id} was cancelled; stopping early.")
                return
            try:
                # process_single_course is synchronous and runs in a worker thread
                course_data = await asyncio.to_thread(
                    process_single_course,
                    course_file,
                    job,
                    ato_files
                )
                if course_data:
                    all_courses.append(course_data)
                job.processed_files = idx + 1
                job.progress_percent = (job.processed_files / job.total_files) * 100
                print(f"[PROGRESS] {job.processed_files}/{job.total_files} ({job.progress_percent:.1f}%)")
                # Small delay to avoid rate limiting
                await asyncio.sleep(0.05)
            except Exception as e:
                print(f"[ERROR] Failed to process {course_file}: {e}")
                continue

        # Save combined output (needed for upload_all_files)
        job.status = JobStatus.SAVING
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        output_file_name = f"combined_output_{timestamp}.json"
        output_file = OUTPUT_DIR / output_file_name
        print(f"\n[SAVE] Saving combined output to {output_file}...")
        with open(output_file, "w") as f:
            json.dump(all_courses, f, indent=2)
        job.output_file = str(output_file)

        # Upload all files with intelligent fallback
        job.status = JobStatus.UPLOADING
        await asyncio.to_thread(upload_all_files, job, all_courses, output_file)

        job.status = JobStatus.COMPLETED
        job.completed_at = datetime.utcnow().isoformat()
        print(f"\n{'='*70}")
        print("[SUCCESS] Job completed!")
        print(f"{'='*70}")
        print(f"Total courses processed: {len(all_courses)}")
        print(f"Transcriptions matched: {job.matched_transcriptions}")
        print(f"Output file: {output_file}")
        print(f"File size: {output_file.stat().st_size / (1024*1024):.2f} MB")
        print(f"{'='*70}\n")
    except Exception as e:
        job.status = JobStatus.FAILED
        job.error_message = str(e)
        job.completed_at = datetime.utcnow().isoformat()
        print(f"\n[FAILED] Job failed: {e}")
        traceback.print_exc()
# ============================================================================
# API Endpoints (Modified)
# ============================================================================
# NOTE: The route decorators below were missing; the paths are reconstructed
# from the dashboard JavaScript (API_BASE = "/api") and the startup banner.
@app.get("/api/health")
async def health_check():
    """Health check endpoint (moved from /)."""
    return {
        "status": "running",
        "service": "Hugging Face Data Processor",
        "version": "1.0.0",
        "dashboard": "http://localhost:8000/"
    }


@app.post("/api/jobs/create")
async def create_job(background_tasks: BackgroundTasks):
    """Create and start a new processing job."""
    job_id = f"job_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
    job = ProcessingJob(
        job_id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.utcnow().isoformat()
    )
    async with jobs_lock:
        jobs_db[job_id] = job
    # Start processing in the background
    background_tasks.add_task(process_all_courses_background, job_id)
    return {
        "job_id": job_id,
        "status": "started",
        "message": "Processing job created and started"
    }


@app.get("/api/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Get the status of a processing job."""
    job = jobs_db.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    return job


@app.get("/api/jobs")
async def list_jobs():
    """List all processing jobs."""
    return {
        "total_jobs": len(jobs_db),
        "jobs": list(jobs_db.values())
    }


@app.post("/api/jobs/{job_id}/cancel")  # path not referenced by the dashboard; assumed from the /api/jobs/{job_id} pattern
async def cancel_job(job_id: str):
    """Cancel a processing job."""
    job = jobs_db.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
        raise HTTPException(status_code=400, detail="Cannot cancel a completed, failed, or already-cancelled job")
    job.status = JobStatus.CANCELLED
    job.error_message = "Job cancelled by user"
    job.completed_at = datetime.utcnow().isoformat()
    return {"status": "cancelled", "job_id": job_id}


@app.get("/api/jobs/{job_id}/output")
async def get_job_output(job_id: str):
    """Download the combined output JSON for a completed job."""
    job = jobs_db.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status != JobStatus.COMPLETED:
        raise HTTPException(status_code=400, detail="Job not completed yet")
    if not job.output_file:
        raise HTTPException(status_code=404, detail="Output file not found")
    try:
        return FileResponse(
            path=job.output_file,
            filename=Path(job.output_file).name,
            media_type="application/json"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error reading output: {str(e)}")


@app.get("/api/stats")
async def get_stats():
    """Get overall statistics about all jobs."""
    total_jobs = len(jobs_db)
    completed = sum(1 for j in jobs_db.values() if j.status == JobStatus.COMPLETED)
    failed = sum(1 for j in jobs_db.values() if j.status == JobStatus.FAILED)
    processing = sum(
        1 for j in jobs_db.values()
        if j.status in [JobStatus.PROCESSING, JobStatus.FETCHING_FILES, JobStatus.SAVING, JobStatus.UPLOADING]
    )
    total_files = sum(j.total_files for j in jobs_db.values())
    total_processed = sum(j.processed_files for j in jobs_db.values())
    total_matched = sum(j.matched_transcriptions for j in jobs_db.values())
    return {
        "total_jobs": total_jobs,
        "completed_jobs": completed,
        "failed_jobs": failed,
        "processing_jobs": processing,
        "total_files_processed": total_processed,
        "total_files": total_files,
        "total_transcriptions_matched": total_matched
    }
# ============================================================================
# Web Dashboard (Original - Truncated for brevity, assuming it's the same)
# ============================================================================
DASHBOARD_HTML = """
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Hugging Face Data Processor</title>
    <style>
        /* ... (Original CSS) ... */
        * {
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }
        body {
            font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            min-height: 100vh;
            padding: 20px;
        }
        .container {
            max-width: 1200px;
            margin: 0 auto;
        }
        header {
            background: rgba(255, 255, 255, 0.95);
            padding: 30px;
            border-radius: 12px;
            margin-bottom: 30px;
            box-shadow: 0 10px 40px rgba(0, 0, 0, 0.1);
        }
        h1 {
            color: #333;
            margin-bottom: 10px;
            font-size: 2.5em;
        }
        .subtitle {
            color: #666;
            font-size: 1.1em;
        }
        .controls {
            display: flex;
            gap: 15px;
            margin-top: 20px;
            flex-wrap: wrap;
        }
        button {
            background: #667eea;
            color: white;
            border: none;
            padding: 12px 24px;
            border-radius: 6px;
            cursor: pointer;
            font-size: 1em;
            font-weight: 600;
            transition: all 0.3s ease;
        }
        button:hover {
            background: #764ba2;
            transform: translateY(-2px);
            box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2);
        }
        button:disabled {
            background: #ccc;
            cursor: not-allowed;
            transform: none;
        }
        .grid {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
            gap: 20px;
            margin-bottom: 30px;
        }
        .card {
            background: rgba(255, 255, 255, 0.95);
            padding: 25px;
            border-radius: 12px;
            box-shadow: 0 10px 40px rgba(0, 0, 0, 0.1);
        }
        .card h2 {
            color: #333;
            margin-bottom: 15px;
            font-size: 1.3em;
        }
        .stat {
            display: flex;
            justify-content: space-between;
            padding: 10px 0;
            border-bottom: 1px solid #eee;
        }
        .stat:last-child {
            border-bottom: none;
        }
        .stat-label {
            color: #666;
            font-weight: 500;
        }
        .stat-value {
            color: #333;
            font-weight: 700;
            font-size: 1.1em;
        }
        .job-list {
            background: rgba(255, 255, 255, 0.95);
            padding: 25px;
            border-radius: 12px;
            box-shadow: 0 10px 40px rgba(0, 0, 0, 0.1);
        }
        .job-item {
            padding: 20px;
            border: 1px solid #eee;
            border-radius: 8px;
            margin-bottom: 15px;
            background: #f9f9f9;
        }
        .job-header {
            display: flex;
            justify-content: space-between;
            align-items: center;
            margin-bottom: 15px;
        }
        .job-id {
            font-family: monospace;
            color: #667eea;
            font-weight: 600;
        }
        .job-status {
            padding: 6px 12px;
            border-radius: 20px;
            font-size: 0.9em;
            font-weight: 600;
        }
        .status-pending {
            background: #fff3cd;
            color: #856404;
        }
        .status-processing, .status-fetching_files, .status-saving, .status-uploading {
            background: #cfe2ff;
            color: #084298;
        }
        .status-completed {
            background: #d1e7dd;
            color: #0f5132;
        }
        .status-failed {
            background: #f8d7da;
            color: #842029;
        }
        .status-cancelled {
            background: #e2e3e5;
            color: #495057;
        }
        .progress-bar-container {
            background-color: #e0e0e0;
            border-radius: 5px;
            overflow: hidden;
            margin-top: 10px;
        }
        .progress-bar {
            height: 20px;
            background-color: #667eea;
            text-align: center;
            line-height: 20px;
            color: white;
            transition: width 0.5s ease;
        }
        .job-details {
            font-size: 0.9em;
            color: #555;
        }
        .job-details p {
            margin: 5px 0;
        }
        .job-details strong {
            color: #333;
        }
        .error-message {
            color: #842029;
            background: #f8d7da;
            padding: 10px;
            border-radius: 5px;
            margin-top: 10px;
            font-weight: 500;
        }
        footer {
            text-align: center;
            margin-top: 30px;
            color: rgba(255, 255, 255, 0.8);
            font-size: 0.9em;
        }
    </style>
    <script>
        const API_BASE = "/api";
        let isProcessing = false;

        function formatStatus(status) {
            return status.replace('_', ' ').toUpperCase();
        }

        function getStatusClass(status) {
            return `status-${status}`;
        }

        function updateStats(stats) {
            document.getElementById('total-jobs').textContent = stats.total_jobs;
            document.getElementById('completed-jobs').textContent = stats.completed_jobs;
            document.getElementById('failed-jobs').textContent = stats.failed_jobs;
            document.getElementById('processing-jobs').textContent = stats.processing_jobs;
            document.getElementById('total-files').textContent = stats.total_files;
            document.getElementById('processed-files').textContent = stats.total_files_processed;
            document.getElementById('matched-transcriptions').textContent = stats.total_transcriptions_matched;
        }

        function updateJobList(jobs) {
            const jobList = document.getElementById('job-list');
            jobList.innerHTML = '';
            jobs.sort((a, b) => new Date(b.created_at) - new Date(a.created_at));
            jobs.forEach(job => {
                const jobItem = document.createElement('div');
                jobItem.className = 'job-item';
                const statusClass = getStatusClass(job.status);
                const progress = job.progress_percent.toFixed(1);
                let uploadProgress = '';
                if (job.status === 'uploading' && job.total_uploads > 0) {
                    // Display upload progress based on completed_uploads
                    const uploadPercent = (job.completed_uploads / job.total_uploads) * 100;
                    uploadProgress = `<p><strong>Upload Progress:</strong> ${job.completed_uploads} / ${job.total_uploads} files uploaded (${uploadPercent.toFixed(1)}%)</p>`;
                }
                jobItem.innerHTML = `
                    <div class="job-header">
                        <span class="job-id">${job.job_id}</span>
                        <span class="job-status ${statusClass}">${formatStatus(job.status)}</span>
                    </div>
                    <div class="job-details">
                        <p><strong>Created:</strong> ${new Date(job.created_at).toLocaleString()}</p>
                        ${job.started_at ? `<p><strong>Started:</strong> ${new Date(job.started_at).toLocaleString()}</p>` : ''}
                        ${job.completed_at ? `<p><strong>Completed:</strong> ${new Date(job.completed_at).toLocaleString()}</p>` : ''}
                        <p><strong>Files:</strong> ${job.processed_files} / ${job.total_files} processed</p>
                        <p><strong>Transcriptions Matched:</strong> ${job.matched_transcriptions}</p>
                        ${uploadProgress}
                        ${job.output_file ? `<p><strong>Output:</strong> <a href="${API_BASE}/jobs/${job.job_id}/output" target="_blank">${job.output_file.split('/').pop()}</a></p>` : ''}
                        ${job.error_message ? `<div class="error-message">Error: ${job.error_message}</div>` : ''}
                    </div>
                    <div class="progress-bar-container">
                        <div class="progress-bar" style="width: ${progress}%;">
                            ${progress}%
                        </div>
                    </div>
                `;
                jobList.appendChild(jobItem);
            });
            isProcessing = jobs.some(j => j.status === 'processing' || j.status === 'fetching_files' || j.status === 'saving' || j.status === 'uploading');
            document.getElementById('create-job-btn').disabled = isProcessing;
        }

        async function fetchData() {
            try {
                const [statsResponse, jobsResponse] = await Promise.all([
                    fetch(`${API_BASE}/stats`),
                    fetch(`${API_BASE}/jobs`)
                ]);
                const stats = await statsResponse.json();
                const jobsData = await jobsResponse.json();
                updateStats(stats);
                updateJobList(jobsData.jobs);
            } catch (error) {
                console.error("Error fetching data:", error);
            }
        }

        async function createJob() {
            if (isProcessing) return;
            document.getElementById('create-job-btn').disabled = true;
            document.getElementById('create-job-btn').textContent = 'Starting...';
            try {
                const response = await fetch(`${API_BASE}/jobs/create`, { method: 'POST' });
                const result = await response.json();
                if (response.ok) {
                    console.log("Job created:", result);
                } else {
                    alert(`Failed to create job: ${result.detail || response.statusText}`);
                }
            } catch (error) {
                console.error("Error creating job:", error);
                alert("An error occurred while trying to create the job.");
            } finally {
                document.getElementById('create-job-btn').textContent = 'Start New Processing Job';
                fetchData(); // Refresh immediately after the attempt
            }
        }

        document.addEventListener('DOMContentLoaded', () => {
            document.getElementById('create-job-btn').addEventListener('click', createJob);
            fetchData();
            setInterval(fetchData, 5000); // Refresh every 5 seconds
        });
    </script>
</head>
<body>
    <div class="container">
        <header>
            <h1>Hugging Face Data Processor</h1>
            <p class="subtitle">Automated processing and upload service for Helium/Data datasets.</p>
            <div class="controls">
                <button id="create-job-btn">Start New Processing Job</button>
            </div>
        </header>
        <div class="grid">
            <div class="card">
                <h2>Overall Statistics</h2>
                <div class="stat">
                    <span class="stat-label">Total Jobs</span>
                    <span class="stat-value" id="total-jobs">0</span>
                </div>
                <div class="stat">
                    <span class="stat-label">Completed Jobs</span>
                    <span class="stat-value" id="completed-jobs">0</span>
                </div>
                <div class="stat">
                    <span class="stat-label">Failed Jobs</span>
                    <span class="stat-value" id="failed-jobs">0</span>
                </div>
                <div class="stat">
                    <span class="stat-label">Processing Jobs</span>
                    <span class="stat-value" id="processing-jobs">0</span>
                </div>
            </div>
            <div class="card">
                <h2>Processing Totals</h2>
                <div class="stat">
                    <span class="stat-label">Total Files Found</span>
                    <span class="stat-value" id="total-files">0</span>
                </div>
                <div class="stat">
                    <span class="stat-label">Total Files Processed</span>
                    <span class="stat-value" id="processed-files">0</span>
                </div>
                <div class="stat">
                    <span class="stat-label">Transcriptions Matched</span>
                    <span class="stat-value" id="matched-transcriptions">0</span>
                </div>
            </div>
        </div>
        <div class="job-list">
            <h2>Recent Jobs</h2>
            <div id="job-list">
                <!-- Job items will be inserted here by JavaScript -->
            </div>
        </div>
        <footer>
            Hugging Face Data Processor v1.0.0 | Running on Uvicorn/FastAPI
        </footer>
    </div>
</body>
</html>
"""
# Serve the raw HTML string with an HTMLResponse so the browser renders it.
@app.get("/", response_class=HTMLResponse)
async def dashboard():
    """Web dashboard endpoint (moved to the root path)."""
    return DASHBOARD_HTML
# ============================================================================
# Main Execution Block
# ============================================================================
def main():
    print("="*70)
    print("Hugging Face Data Processor Server")
    print("Dashboard: http://localhost:8000/")
    print("Health Check: http://localhost:8000/api/health")
    print(f"Output Dir: {OUTPUT_DIR.absolute()}")
    print("="*70 + "\n")
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )
| if __name__ == "__main__": | |
| # Ensure the huggingface_hub library is installed | |
| try: | |
| import huggingface_hub | |
| except ImportError: | |
| print("The 'huggingface_hub' library is not installed. Please install it with: pip install huggingface-hub") | |
| sys.exit(1) | |
| main() | |
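# ----------------------------------------------------------------------------
# Example client (hypothetical, for illustration): start a job and poll it
# until it settles. Run from a separate process while the server is up;
# requires the `requests` package.
#
#   import time, requests
#
#   base = "http://localhost:8000/api"
#   job = requests.post(f"{base}/jobs/create").json()
#   while True:
#       status = requests.get(f"{base}/jobs/{job['job_id']}").json()
#       print(status["status"], f"{status['progress_percent']:.1f}%")
#       if status["status"] in ("completed", "failed", "cancelled"):
#           break
#       time.sleep(5)
# ----------------------------------------------------------------------------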