Spaces:
Paused
Paused
| """ | |
| Centralized Configuration for Path Management | |
| This module provides environment-aware path management to ensure | |
| compatibility between local development and HuggingFace Space deployment. | |
| Usage: | |
| from code.cube3d.config import DATA_DIR, LABEL_MAPPINGS, get_mapping_paths | |
| # Get paths for a specific mapping set | |
| forward_path, inverse_path = get_mapping_paths("subset_1k") | |
| """ | |
| import os | |
| import json | |
| import re | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Dict, Tuple, Optional | |
# ============================================================================
# Environment Detection
# ============================================================================
def detect_environment() -> str:
    """
    Detect current runtime environment

    Returns:
        "huggingface" if running on HF Space, "local" otherwise
    """
    # HF Spaces inject these variables into the container; either one is
    # sufficient evidence that we are running on a Space.
    hf_markers = ("SPACE_ID", "SPACE_AUTHOR_NAME")
    return "huggingface" if any(os.getenv(name) for name in hf_markers) else "local"
# ============================================================================
# Path Configuration
# ============================================================================
# Resolved once at import time; every path constant below derives from it.
ENVIRONMENT = detect_environment()
# Project root detection
if ENVIRONMENT == "huggingface":
    # HuggingFace Space: app runs from /home/user/app
    PROJECT_ROOT = Path("/home/user/app")
else:
    # Local: calculate from this file's location
    # config.py is at: code/cube3d/config.py
    # So PROJECT_ROOT = ../../.. from here (three .parent hops, resolved
    # to an absolute path so downstream joins are stable regardless of CWD)
    PROJECT_ROOT = Path(__file__).parent.parent.parent.resolve()
# Data directory
DATA_DIR = PROJECT_ROOT / "data"
# Subdirectories
CAR_1K_DIR = DATA_DIR / "car_1k"
# Raw car dataset folder; the non-ASCII name must match the on-disk
# directory exactly — do not transliterate or normalize it.
CAR_DATA_DIR = DATA_DIR / "1313个筛选车结构和对照渲染图"
# HuggingFace model cache directory
# Note: preload_from_hub ALWAYS saves to ~/.cache/huggingface/hub
# It does NOT respect HF_HOME environment variable (HF official docs confirmed)
if ENVIRONMENT == "huggingface":
    # Use default HF cache location where preload_from_hub saves models
    HF_CACHE_DIR = os.path.expanduser("~/.cache/huggingface")
    print(f"[Config] Using HF cache directory: {HF_CACHE_DIR}")
    # Ensure directory exists with proper permissions
    # NOTE(review): mkdir's mode is masked by the process umask, so the
    # resulting permissions may not be exactly 0o755 — confirm if it matters.
    cache_path = Path(HF_CACHE_DIR)
    try:
        cache_path.mkdir(parents=True, exist_ok=True, mode=0o755)
        print(f"[Config] Cache directory created/verified with mode 755")
    except Exception as e:
        # Best-effort: a failure here is logged but never fatal at import time.
        print(f"[Config] Warning: Could not set cache directory permissions: {e}")
    # Clean stale lock files that may prevent model loading
    # NOTE(review): this deletes every *.lock under the cache at import time;
    # presumably safe because the Space is single-process at startup — verify
    # no concurrent downloader holds these locks.
    try:
        lock_files = list(cache_path.glob("**/*.lock"))
        if lock_files:
            print(f"[Config] Found {len(lock_files)} lock files, attempting cleanup...")
            cleaned = 0
            for lock_file in lock_files:
                try:
                    lock_file.unlink()
                    cleaned += 1
                except Exception as e:
                    # Skip locks we cannot remove; report and continue.
                    print(f"[Config] Could not remove {lock_file.name}: {e}")
            print(f"[Config] Cleaned {cleaned}/{len(lock_files)} lock files")
        else:
            print(f"[Config] No stale lock files found")
    except Exception as e:
        print(f"[Config] Lock file cleanup failed: {e}")
else:
    # Local development
    HF_CACHE_DIR = os.path.expanduser("~/.cache/huggingface")
# Ensure cache directory exists (fallback for local)
if ENVIRONMENT != "huggingface":
    os.makedirs(HF_CACHE_DIR, exist_ok=True)
# ============================================================================
# Label Mapping Paths
# ============================================================================
# Registry of known mapping sets, keyed by the mapping_type accepted by
# get_mapping_paths(). "forward" maps part name -> ID and "inverse" maps
# ID -> part name (see generate_label_mappings_from_ldr for the schema).
LABEL_MAPPINGS: Dict[str, Dict[str, Path]] = {
    "subset_self": {
        "forward": CAR_1K_DIR / "subset_self" / "label_mapping.json",
        "inverse": CAR_1K_DIR / "subset_self" / "label_inverse_mapping.json",
    },
    "subset_1k": {
        "forward": CAR_1K_DIR / "subset_1k" / "label_mapping_merge.json",
        "inverse": CAR_1K_DIR / "subset_1k" / "label_inverse_mapping_merge.json",
    },
}
# Runtime-generated mapping cache (for HuggingFace Space with storage limits)
# mapping_type -> (forward_json_path, inverse_json_path), both as str.
_RUNTIME_MAPPING_CACHE: Dict[str, Tuple[str, str]] = {}
# ============================================================================
# Helper Functions
# ============================================================================
def generate_label_mappings_from_ldr(ldr_dir: Path, mapping_type: str = "subset_1k") -> Tuple[str, str]:
    """
    Generate label mappings by scanning LDR files at runtime

    This is a fallback for HuggingFace Spaces where storage limits prevent
    pre-uploading large mapping files. Mappings are cached in memory.

    Args:
        ldr_dir: Directory containing LDR files (scanned recursively)
        mapping_type: Type of mapping to generate; also the cache key and
            the name of the /tmp subdirectory the JSON files land in

    Returns:
        Tuple of (forward_mapping_path, inverse_mapping_path) in /tmp
    """
    # Check cache first (before any logging) so repeat calls are silent
    # no-ops and we never rescan or rewrite files unnecessarily.
    if mapping_type in _RUNTIME_MAPPING_CACHE:
        print(f"✅ Using cached mappings for {mapping_type}")
        return _RUNTIME_MAPPING_CACHE[mapping_type]
    print(f"🔧 Generating label mappings from LDR files in {ldr_dir}...")
    # Scan LDR files
    label_mapping = {}  # part_name -> ID
    label_inverse_mapping = {}  # ID -> part_name
    label_counter = 0
    ldr_files = list(ldr_dir.glob("**/*.ldr"))
    print(f"📂 Found {len(ldr_files)} LDR files to process")
    for ldr_file in ldr_files:
        try:
            with open(ldr_file, 'r', encoding='utf-8', errors='ignore') as f:
                for line in f:
                    # LDraw type-1 (sub-file reference) line:
                    #   1 <colour> <x y z> <9 rotation floats> <file>
                    # maxsplit=14 keeps a filename containing spaces intact
                    # as the single trailing token instead of truncating it.
                    parts = line.split(maxsplit=14)
                    if len(parts) < 15 or parts[0] != '1':
                        continue
                    # Extract part identifier (lowercase, starting digits)
                    filename = parts[14].strip().lower()
                    match = re.match(r'^\d+', filename)
                    part_identifier = match.group() if match else filename
                    if part_identifier not in label_mapping:
                        label_mapping[part_identifier] = label_counter
                        label_inverse_mapping[label_counter] = part_identifier
                        label_counter += 1
        except Exception as e:
            # One unreadable file must not abort the whole scan.
            print(f"⚠️ Error processing {ldr_file}: {e}")
            continue
    print(f"✅ Generated {len(label_mapping)} unique part mappings")
    # Save to /tmp directory (writable on HF Spaces even with storage limits)
    tmp_dir = Path(tempfile.gettempdir()) / "lego_mappings" / mapping_type
    tmp_dir.mkdir(parents=True, exist_ok=True)
    forward_path = tmp_dir / "label_mapping_merge.json"
    inverse_path = tmp_dir / "label_inverse_mapping_merge.json"
    with open(forward_path, 'w', encoding='utf-8') as f:
        json.dump(label_mapping, f, ensure_ascii=False, indent=2)
    # Convert int keys to str keys for JSON (JSON object keys must be strings)
    inverse_str_keys = {str(k): v for k, v in label_inverse_mapping.items()}
    with open(inverse_path, 'w', encoding='utf-8') as f:
        json.dump(inverse_str_keys, f, ensure_ascii=False, indent=2)
    print(f"💾 Saved mappings to:")
    print(f" {forward_path}")
    print(f" {inverse_path}")
    # Cache the paths
    result = (str(forward_path), str(inverse_path))
    _RUNTIME_MAPPING_CACHE[mapping_type] = result
    return result
def get_mapping_paths(mapping_type: str = "subset_1k") -> Tuple[str, str]:
    """
    Get label mapping file paths for a given mapping type

    Automatically generates mappings from LDR files if not found.

    Args:
        mapping_type: Either "subset_self" or "subset_1k"

    Returns:
        Tuple of (forward_mapping_path, inverse_mapping_path) as strings

    Raises:
        ValueError: If mapping_type is invalid
        FileNotFoundError: If mappings are missing and no LDR source exists
    """
    # Guard clause: unknown mapping types fail loudly with the valid choices.
    if mapping_type not in LABEL_MAPPINGS:
        valid = list(LABEL_MAPPINGS.keys())
        raise ValueError(
            f"Invalid mapping_type: {mapping_type}. "
            f"Must be one of: {valid}"
        )
    entry = LABEL_MAPPINGS[mapping_type]
    forward_path, inverse_path = entry["forward"], entry["inverse"]
    # Fast path: both mapping files are already on disk.
    if forward_path.exists() and inverse_path.exists():
        return str(forward_path), str(inverse_path)
    # Slow path: rebuild the mappings from raw LDR files.
    print(f"⚠️ Label mapping files not found for {mapping_type}")
    print(f" Missing: {forward_path}")
    print(f" Missing: {inverse_path}")
    print(f"🔄 Generating label mappings from LDR files (this may take 1-2 minutes)...")
    # Determine LDR directory to scan
    if mapping_type == "subset_1k":
        ldr_dir = CAR_DATA_DIR / "ldr"
        if not ldr_dir.exists():
            ldr_dir = CAR_DATA_DIR  # Try parent directory
    else:
        ldr_dir = CAR_1K_DIR / mapping_type
    if not ldr_dir.exists():
        raise FileNotFoundError(
            f"Cannot generate mappings: LDR directory not found: {ldr_dir}\n"
            f"Please ensure LDR files are available."
        )
    return generate_label_mappings_from_ldr(ldr_dir, mapping_type)
def create_default_mappings(mapping_type: str = "subset_1k") -> Tuple[Dict, Dict]:
    """
    Create minimal default label mappings if files are missing

    This is a fallback for development/testing. Production should have real files.

    Args:
        mapping_type: Mapping type identifier

    Returns:
        Tuple of (label_mapping, label_inverse_mapping) dictionaries
    """
    warnings = (
        f"⚠️ WARNING: Creating default empty mappings for {mapping_type}",
        " This is for fallback only. Production should have real mapping files.",
    )
    for text in warnings:
        print(text)
    # Empty forward/inverse maps: callers get a valid-but-empty structure.
    return {}, {}
def load_mappings_safe(mapping_type: str = "subset_1k") -> Tuple[Dict, Dict]:
    """
    Safely load label mappings with fallback

    Attempts to load from files, falls back to defaults if missing.
    Note: a ValueError from an invalid mapping_type still propagates.

    Args:
        mapping_type: Either "subset_self" or "subset_1k"

    Returns:
        Tuple of (label_mapping, label_inverse_mapping) dictionaries
    """
    def _read_json(path: str) -> Dict:
        # Both mapping files are plain UTF-8 JSON objects.
        with open(path, 'r', encoding='utf-8') as fh:
            return json.load(fh)

    try:
        forward_path, inverse_path = get_mapping_paths(mapping_type)
        return _read_json(forward_path), _read_json(inverse_path)
    except FileNotFoundError as e:
        print(f"⚠️ {e}")
        return create_default_mappings(mapping_type)
# ============================================================================
# Debug Information
# ============================================================================
def print_config_info():
    """Print current configuration for debugging"""
    divider = "=" * 60
    print(divider)
    print("Configuration Information")
    print(divider)
    print(f"Environment: {ENVIRONMENT}")
    print(f"Project Root: {PROJECT_ROOT}")
    print(f"Data Directory: {DATA_DIR}")
    print(f"Data Dir Exists: {DATA_DIR.exists()}")
    print("\nLabel Mapping Paths:")
    for mapping_type, paths in LABEL_MAPPINGS.items():
        print(f"\n {mapping_type}:")
        for key, path in paths.items():
            # Tick/cross marker shows at a glance which files are present.
            marker = "✅" if path.exists() else "❌"
            print(f" {key}: {marker} {path}")
    print(divider)
# ============================================================================
# Module Test
# ============================================================================
if __name__ == "__main__":
    print_config_info()
    # Test loading mappings
    print("\n\nTesting mapping load:")
    # Exercise both registered mapping sets; failures are reported, not fatal.
    for mt in ("subset_1k", "subset_self"):
        try:
            forward, inverse = get_mapping_paths(mt)
            print(f"✅ {mt} paths retrieved successfully")
            print(f" Forward: {forward}")
            print(f" Inverse: {inverse}")
        except Exception as e:
            print(f"❌ Error: {e}")