|
|
""" |
|
|
Unified configuration for Hugging Face datasets integration. |
|
|
All runner modules should import from this module instead of defining their own paths. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Optional, List, Tuple |
|
|
|
|
|
|
|
|
# Optional dependency: `datasets` provides load_dataset for HF dataset loading.
# Missing library degrades gracefully; the flag gates all dataset features.
try:
    from datasets import load_dataset

    DATASETS_AVAILABLE = True
except ImportError:
    print("⚠️ datasets library not available - HF dataset loading disabled")
    DATASETS_AVAILABLE = False

# Optional dependency: `huggingface_hub` enables direct per-file downloads.
# All loader functions below check this flag before touching the Hub.
try:
    from huggingface_hub import hf_hub_download

    HF_HUB_AVAILABLE = True
except ImportError:
    print("⚠️ huggingface_hub library not available - HF file loading disabled")
    HF_HUB_AVAILABLE = False
|
|
|
|
|
|
|
|
# Hugging Face dataset repository ids; each can be overridden via the
# environment variable of the same name.
ARTEFACT_JSON_DATASET = os.getenv('ARTEFACT_JSON_DATASET', 'samwaugh/artefact-json')
ARTEFACT_EMBEDDINGS_DATASET = os.getenv('ARTEFACT_EMBEDDINGS_DATASET', 'samwaugh/artefact-embeddings')
ARTEFACT_MARKDOWN_DATASET = os.getenv('ARTEFACT_MARKDOWN_DATASET', 'samwaugh/artefact-markdown')

# Hard-coded snapshot paths inside the HF hub cache layout.
# NOTE(review): "snapshots/latest" assumes a directory/symlink named "latest"
# exists — the hub cache normally names snapshots by commit hash. Confirm the
# deployment creates this alias.
JSON_INFO_DIR = "/data/hub/datasets--samwaugh--artefact-json/snapshots/latest"
EMBEDDINGS_DIR = "/data/hub/datasets--samwaugh--artefact-embeddings/snapshots/latest"
MARKDOWN_DIR = "/data/hub/datasets--samwaugh--artefact-markdown/snapshots/latest"

# Embedding artefact file paths derived from EMBEDDINGS_DIR.
CLIP_EMBEDDINGS_ST = Path(EMBEDDINGS_DIR) / "clip_embeddings.safetensors"
PAINTINGCLIP_EMBEDDINGS_ST = Path(EMBEDDINGS_DIR) / "paintingclip_embeddings.safetensors"
CLIP_SENTENCE_IDS = Path(EMBEDDINGS_DIR) / "clip_embeddings_sentence_ids.json"
PAINTINGCLIP_SENTENCE_IDS = Path(EMBEDDINGS_DIR) / "paintingclip_embeddings_sentence_ids.json"
# Both model variants currently share the same embeddings directory.
CLIP_EMBEDDINGS_DIR = EMBEDDINGS_DIR
PAINTINGCLIP_EMBEDDINGS_DIR = EMBEDDINGS_DIR
|
|
|
|
|
|
|
|
# Project root is two directories above this file; read-only data lives under
# <project>/data.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
DATA_READ_ROOT = PROJECT_ROOT / "data"

# Writable root for caches and outputs. Defaults to /data; HF_HOME overrides.
WRITE_ROOT = Path(os.getenv("HF_HOME", "/data"))

# Import-time validation: fail fast if the writable root cannot be created...
if not WRITE_ROOT.exists():
    print(f"⚠️ WRITE_ROOT {WRITE_ROOT} does not exist, trying to create it")
    try:
        WRITE_ROOT.mkdir(parents=True, exist_ok=True)
        print(f"✅ Created WRITE_ROOT: {WRITE_ROOT}")
    except Exception as e:
        print(f"❌ Failed to create {WRITE_ROOT}: {e}")
        raise RuntimeError(f"Cannot create writable directory: {e}")

# ...or if it exists but is not writable by the current user. The permission
# and owner dumps help diagnose container/volume misconfiguration.
if not os.access(WRITE_ROOT, os.W_OK):
    print(f"❌ WRITE_ROOT {WRITE_ROOT} is not writable")
    print(f"❌ Current permissions: {oct(WRITE_ROOT.stat().st_mode)[-3:]}")
    print(f"❌ Owner: {WRITE_ROOT.owner()}")
    raise RuntimeError(f"Directory {WRITE_ROOT} is not writable")

print(f"✅ Using WRITE_ROOT: {WRITE_ROOT}")
print(f"✅ Using READ_ROOT: {DATA_READ_ROOT}")
|
|
|
|
|
|
|
|
# Read-only locations under the project data directory.
MODELS_DIR = DATA_READ_ROOT / "models"
MARKER_DIR = DATA_READ_ROOT / "marker_output"

# Local PaintingCLIP model checkpoint location.
PAINTINGCLIP_MODEL_DIR = MODELS_DIR / "PaintingClip"

# Writable output locations under WRITE_ROOT.
OUTPUTS_DIR = WRITE_ROOT / "outputs"
ARTIFACTS_DIR = WRITE_ROOT / "artifacts"

# Best-effort creation of the writable directories: a failure is logged but
# deliberately not fatal (unlike the WRITE_ROOT checks above).
for dir_path in [OUTPUTS_DIR, ARTIFACTS_DIR]:
    try:
        dir_path.mkdir(parents=True, exist_ok=True)
        print(f"✅ Ensured directory exists: {dir_path}")
    except Exception as e:
        print(f"⚠️ Could not create directory {dir_path}: {e}")
|
|
|
|
|
|
|
|
# Module-level caches populated by load_json_datasets(). They stay empty
# until that function runs; other modules may import them directly.
sentences: Dict[str, Any] = {}
works: Dict[str, Any] = {}
creators: Dict[str, Any] = {}
topics: Dict[str, Any] = {}
topic_names: Dict[str, Any] = {}
|
|
|
|
|
def load_json_from_hf(repo_id: str, filename: str) -> Optional[Dict[str, Any]]:
    """Download a single JSON file from a Hugging Face dataset repo and parse it.

    Args:
        repo_id: HF dataset repository id (e.g. "samwaugh/artefact-json").
        filename: Name of the JSON file within the repository.

    Returns:
        The parsed JSON object, or None when huggingface_hub is unavailable
        or the download/parse fails (the error is logged, never raised).
    """
    if not HF_HUB_AVAILABLE:
        # Fix: these f-strings previously printed the literal "(unknown)"
        # instead of interpolating the actual filename.
        print(f"⚠️ huggingface_hub not available - cannot load {filename}")
        return None

    try:
        print(f"🔍 Downloading {filename} from {repo_id}...")
        file_path = hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset"
        )

        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        print(f"✅ Successfully loaded {filename}: {len(data)} entries")
        return data
    except Exception as e:
        print(f"❌ Failed to load {filename} from {repo_id}: {e}")
        return None
|
|
|
|
|
def load_json_datasets() -> Optional[Dict[str, Any]]:
    """Download every artefact JSON file from Hugging Face.

    Refreshes the module-level caches (sentences, works, creators, topics,
    topic_names) and returns them keyed by name. Returns None when the hub
    library is missing or loading fails entirely.
    """
    if not HF_HUB_AVAILABLE:
        print("⚠️ huggingface_hub library not available - skipping HF dataset loading")
        return None

    global sentences, works, creators, topics, topic_names

    try:
        print("📥 Loading JSON files from Hugging Face repository...")

        # Fetch each file in turn; a failed download degrades to an empty dict
        # so the caches always remain dict-typed.
        creators = load_json_from_hf(ARTEFACT_JSON_DATASET, 'creators.json') or {}
        sentences = load_json_from_hf(ARTEFACT_JSON_DATASET, 'sentences.json') or {}
        works = load_json_from_hf(ARTEFACT_JSON_DATASET, 'works.json') or {}
        topics = load_json_from_hf(ARTEFACT_JSON_DATASET, 'topics.json') or {}
        topic_names = load_json_from_hf(ARTEFACT_JSON_DATASET, 'topic_names.json') or {}

        print(f"✅ Successfully loaded JSON files from HF:")
        for label, table in (
            ("Sentences", sentences),
            ("Works", works),
            ("Creators", creators),
            ("Topics", topics),
            ("Topic Names", topic_names),
        ):
            print(f"   {label}: {len(table)} entries")

        return {
            'creators': creators,
            'sentences': sentences,
            'works': works,
            'topics': topics,
            'topic_names': topic_names,
        }
    except Exception as e:
        print(f"❌ Failed to load JSON datasets from HF: {e}")
        return None
|
|
|
|
|
def load_embeddings_datasets() -> Optional[Dict[str, Any]]:
    """Describe how embeddings should be fetched from Hugging Face.

    Returns a small descriptor dict (not the embeddings themselves) telling
    callers to download the files directly from the repo, or None when the
    hub library is unavailable or something goes wrong.
    """
    if not HF_HUB_AVAILABLE:
        print("⚠️ huggingface_hub library not available - skipping HF embeddings loading")
        return None

    try:
        print(f"📥 Loading embeddings from {ARTEFACT_EMBEDDINGS_DATASET}...")
        descriptor = {'use_direct_download': True, 'repo_id': ARTEFACT_EMBEDDINGS_DATASET}
        return descriptor
    except Exception as e:
        print(f"❌ Failed to load embeddings datasets from HF: {e}")
        return None
|
|
|
|
|
|
|
|
# Memoized result of load_markdown_dataset(); managed by get_markdown_dir().
_markdown_dir_cache = None
|
|
|
|
|
def clear_markdown_cache() -> bool:
    """Delete the local markdown cache so the next load re-downloads everything.

    Returns True on success (including when there was nothing to clear),
    False if deletion failed.
    """
    try:
        import shutil

        cache_path = WRITE_ROOT / "markdown_cache"
        if not cache_path.exists():
            # Nothing on disk; treat as success.
            print(f"ℹ️ No markdown cache found to clear")
            return True

        print(f"🗑️ Clearing markdown cache at {cache_path}")
        shutil.rmtree(cache_path)
        print(f"✅ Markdown cache cleared successfully")
        return True
    except Exception as e:
        print(f"❌ Failed to clear markdown cache: {e}")
        return False
|
|
|
|
|
def get_markdown_cache_info() -> dict:
    """Return statistics about the local markdown cache.

    Returns:
        A dict with keys "exists", "size_gb", "work_count", "file_count" and,
        when the cache exists, "path". On unexpected errors returns
        {"exists": False, "error": <message>}.
    """
    try:
        markdown_cache_dir = WRITE_ROOT / "markdown_cache"
        works_dir = markdown_cache_dir / "works"

        if not works_dir.exists():
            return {
                "exists": False,
                "size_gb": 0,
                "work_count": 0,
                "file_count": 0
            }

        # Single traversal; count only regular files for both size and count.
        # (Fix: file_count previously used len(rglob('*')), which also counted
        # directories. Also removed an unused `import shutil`.)
        total_size = 0
        file_count = 0
        for entry in works_dir.rglob('*'):
            if entry.is_file():
                total_size += entry.stat().st_size
                file_count += 1
        size_gb = total_size / (1024**3)

        # One sub-directory per work id.
        work_count = len([d for d in works_dir.iterdir() if d.is_dir()])

        return {
            "exists": True,
            "size_gb": round(size_gb, 2),
            "work_count": work_count,
            "file_count": file_count,
            "path": str(works_dir)
        }
    except Exception as e:
        print(f"❌ Failed to get cache info: {e}")
        return {"exists": False, "error": str(e)}
|
|
|
|
|
def load_markdown_dataset(force_refresh: bool = False) -> Optional[Path]:
    """Load the markdown dataset from Hugging Face and return its local path.

    Args:
        force_refresh: When True, the local cache is cleared and the dataset
            re-downloaded unconditionally.

    Returns:
        Path to the local "works" directory, or None when huggingface_hub is
        unavailable or the download fails.
    """
    if not HF_HUB_AVAILABLE:
        print("⚠️ huggingface_hub not available - cannot load markdown dataset")
        return None

    try:
        print(f"📥 Loading markdown dataset from {ARTEFACT_MARKDOWN_DATASET}...")

        # Cache lives under the writable root, not the HF hub cache.
        markdown_cache_dir = WRITE_ROOT / "markdown_cache"
        markdown_cache_dir.mkdir(parents=True, exist_ok=True)

        works_dir = markdown_cache_dir / "works"

        if force_refresh:
            print("🔄 Force refresh requested - clearing cache")
            clear_markdown_cache()
        else:
            # Reuse the cache only when it looks reasonably complete.
            cache_info = get_markdown_cache_info()
            if cache_info["exists"]:
                print(f"📊 Cache info: {cache_info['work_count']} works, {cache_info['size_gb']}GB")

                # Heuristic completeness check: require at least 80% of the
                # expected work count, otherwise start over.
                # NOTE(review): 7200 is a hard-coded expectation for this
                # dataset's size — confirm it tracks the upstream repo.
                expected_works = 7200
                if cache_info["work_count"] < expected_works * 0.8:
                    print(f"⚠️ Cache incomplete ({cache_info['work_count']}/{expected_works} works) - clearing and re-downloading")
                    clear_markdown_cache()
                else:
                    print(f"✅ Using cached markdown dataset at {works_dir}")
                    return works_dir

        # Cache missing, incomplete, or force-refreshed: download everything.
        print("📥 Downloading markdown dataset with optimized approach...")
        return _download_markdown_optimized(works_dir)

    except Exception as e:
        print(f"❌ Failed to load markdown dataset: {e}")
        return None
|
|
|
|
|
def _download_markdown_optimized(works_dir: Path) -> Optional[Path]:
    """Download the markdown dataset in two phases: markdown first, then images.

    Args:
        works_dir: Local directory the dataset should be mirrored into.

    Returns:
        works_dir on success, None on failure.
    """
    try:
        # (Fix: removed unused imports of concurrent.futures, threading and
        # time — the parallelism lives in the two helper functions below.)
        from huggingface_hub import list_repo_files

        print("🔍 Discovering files in dataset...")
        files = list_repo_files(repo_id=ARTEFACT_MARKDOWN_DATASET, repo_type="dataset")

        # Collect work ids from repo paths shaped like "works/W<id>/...".
        work_dirs = set()
        for file_path in files:
            if file_path.startswith("works/"):
                parts = file_path.split("/")
                if len(parts) >= 2:
                    work_id = parts[1]
                    if work_id.startswith("W"):
                        work_dirs.add(work_id)

        print(f"📊 Found {len(work_dirs)} work directories to download")

        # Phase 1: small text files, highly parallel.
        print("📄 Phase 1: Downloading markdown files only...")
        _download_markdown_files_parallel(works_dir, work_dirs, files)

        # Phase 2: larger image files, throttled in batches.
        print("🖼️ Phase 2: Downloading images in batches...")
        _download_images_batch(works_dir, work_dirs, files)

        print(f"✅ Successfully downloaded markdown dataset to {works_dir}")
        return works_dir

    except Exception as e:
        print(f"❌ Optimized download failed: {e}")
        return None
|
|
|
|
|
def _download_markdown_files_parallel(works_dir: Path, work_dirs: set, files: list) -> None:
    """Download each work's markdown file concurrently (10 worker threads).

    Args:
        works_dir: Local root directory to copy markdown files into.
        work_dirs: Set of work ids ("W...") to fetch.
        files: Full repo file listing (unused here; kept for signature parity
            with _download_images_batch).
    """
    import concurrent.futures
    # Fix: shutil was previously re-imported inside every download call;
    # hoisted here. Unused `threading` and `time` imports removed.
    import shutil

    def download_markdown_file(work_id: str) -> bool:
        """Fetch works/<id>/<id>.md from the Hub and copy it into the cache."""
        try:
            work_dir = works_dir / work_id
            work_dir.mkdir(parents=True, exist_ok=True)

            md_file = hf_hub_download(
                repo_id=ARTEFACT_MARKDOWN_DATASET,
                filename=f"works/{work_id}/{work_id}.md",
                repo_type="dataset"
            )

            shutil.copy2(md_file, work_dir / f"{work_id}.md")
            return True
        except Exception as e:
            print(f"⚠️ Could not download markdown for {work_id}: {e}")
            return False

    work_list = list(work_dirs)
    completed = 0
    failed = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        future_to_work = {executor.submit(download_markdown_file, work_id): work_id for work_id in work_list}

        for future in concurrent.futures.as_completed(future_to_work):
            work_id = future_to_work[future]
            try:
                if future.result():
                    completed += 1
                else:
                    failed += 1

                # Progress heartbeat every 500 processed works.
                if (completed + failed) % 500 == 0:
                    print(f"📄 Downloaded {completed}/{len(work_list)} markdown files (failed: {failed})")

            except Exception as e:
                print(f"❌ Error processing {work_id}: {e}")
                failed += 1

    print(f"✅ Phase 1 complete: {completed} markdown files downloaded, {failed} failed")
|
|
|
|
|
def _download_images_batch(works_dir: Path, work_dirs: set, files: list) -> None:
    """Download image files in batches to avoid overwhelming the server.

    Works are processed in batches of 50, with 5 download threads per batch
    and a one-second pause between batches.

    Args:
        works_dir: Local root directory the images are copied into.
        work_dirs: Set of work ids ("W...") to fetch images for.
        files: Full repo file listing, used to find each work's image paths.
    """
    import concurrent.futures
    # Fix: shutil was previously re-imported inside the per-image loop;
    # hoisted to a single import.
    import shutil
    import time

    def download_work_images(work_id: str) -> tuple:
        """Download all images for one work; returns (work_id, ok, failed)."""
        try:
            work_dir = works_dir / work_id
            images_dir = work_dir / "images"
            images_dir.mkdir(exist_ok=True)

            # Everything under works/<id>/images/ belongs to this work.
            work_files = [f for f in files if f.startswith(f"works/{work_id}/images/")]

            downloaded = 0
            failed = 0

            for img_file in work_files:
                try:
                    downloaded_file = hf_hub_download(
                        repo_id=ARTEFACT_MARKDOWN_DATASET,
                        filename=img_file,
                        repo_type="dataset"
                    )

                    img_name = img_file.split("/")[-1]
                    shutil.copy2(downloaded_file, images_dir / img_name)
                    downloaded += 1

                except Exception as e:
                    failed += 1

                    # Only report the first few failures per work to limit log spam.
                    if failed <= 3:
                        print(f"⚠️ Could not download image {img_file}: {e}")

            return (work_id, downloaded, failed)

        except Exception as e:
            print(f"❌ Error downloading images for {work_id}: {e}")
            return (work_id, 0, 1)

    work_list = list(work_dirs)
    batch_size = 50
    total_downloaded = 0
    total_failed = 0

    for i in range(0, len(work_list), batch_size):
        batch = work_list[i:i + batch_size]
        print(f"🖼️ Processing image batch {i//batch_size + 1}/{(len(work_list) + batch_size - 1)//batch_size} ({len(batch)} works)")

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_work = {executor.submit(download_work_images, work_id): work_id for work_id in batch}

            for future in concurrent.futures.as_completed(future_to_work):
                work_id = future_to_work[future]
                try:
                    work_id, downloaded, failed = future.result()
                    total_downloaded += downloaded
                    total_failed += failed
                except Exception as e:
                    print(f"❌ Error processing {work_id}: {e}")
                    total_failed += 1

        # Brief pause between batches to be gentle on the Hub.
        time.sleep(1)

    print(f"✅ Phase 2 complete: {total_downloaded} images downloaded, {total_failed} failed")
|
|
|
|
|
def _download_markdown_files_fallback(cache_dir: Path) -> Optional[Path]: |
|
|
"""Fallback method to download markdown files individually""" |
|
|
try: |
|
|
works_dir = cache_dir / "works" |
|
|
works_dir.mkdir(exist_ok=True) |
|
|
|
|
|
|
|
|
|
|
|
print("⚠️ Using fallback markdown loading - some files may be missing") |
|
|
return works_dir |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Fallback markdown loading failed: {e}") |
|
|
return None |
|
|
|
|
|
def get_markdown_dir(force_refresh: bool = False) -> Path:
    """Return the markdown directory, fetching it from HF on first use.

    The result is memoized in _markdown_dir_cache; pass force_refresh=True
    to bypass the memo and re-download. Falls back to the local
    marker_output directory when HF loading is unavailable or failed.
    """
    global _markdown_dir_cache

    if force_refresh or _markdown_dir_cache is None:
        _markdown_dir_cache = load_markdown_dataset(force_refresh=force_refresh)

    if _markdown_dir_cache is not None and _markdown_dir_cache.exists():
        return _markdown_dir_cache

    print("⚠️ Using fallback local markdown directory")
    return DATA_READ_ROOT / "marker_output"
|
|
|
|
|
|
|
|
# NOTE(review): these are aliases to the loader *functions*, not loaded data —
# consumers must call them, e.g. JSON_DATASETS(). If eager loading at import
# time was intended, the call parentheses are missing; confirm with callers
# before changing.
JSON_DATASETS = load_json_datasets
EMBEDDINGS_DATASETS = load_embeddings_datasets
|
|
|