|
|
""" |
|
|
Unified configuration for Hugging Face datasets integration. |
|
|
All runner modules should import from this module instead of defining their own paths. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Optional, List, Tuple |
|
|
|
|
|
|
|
|
# Optional dependency: `datasets` enables load_dataset-based dataset loading.
# The flag lets callers degrade gracefully when the library is absent.
try:
    from datasets import load_dataset
    DATASETS_AVAILABLE = True
except ImportError:
    print("⚠️ datasets library not available - HF dataset loading disabled")
    DATASETS_AVAILABLE = False

# Optional dependency: `huggingface_hub` enables per-file downloads
# (hf_hub_download); most loaders below check HF_HUB_AVAILABLE first.
try:
    from huggingface_hub import hf_hub_download
    HF_HUB_AVAILABLE = True
except ImportError:
    print("⚠️ huggingface_hub library not available - HF file loading disabled")
    HF_HUB_AVAILABLE = False
|
|
|
|
|
|
|
|
# Hugging Face dataset repository ids; each can be overridden via an
# environment variable of the same name.
ARTEFACT_JSON_DATASET = os.getenv('ARTEFACT_JSON_DATASET', 'samwaugh/artefact-json')
ARTEFACT_EMBEDDINGS_DATASET = os.getenv('ARTEFACT_EMBEDDINGS_DATASET', 'samwaugh/artefact-embeddings')
ARTEFACT_MARKDOWN_DATASET = os.getenv('ARTEFACT_MARKDOWN_DATASET', 'samwaugh/artefact-markdown')

# Expected on-disk snapshot locations inside the HF cache layout.
# NOTE(review): hard-coded "latest" snapshot paths — confirm these match the
# actual hub cache directory layout in deployment.
JSON_INFO_DIR = "/data/hub/datasets--samwaugh--artefact-json/snapshots/latest"
EMBEDDINGS_DIR = "/data/hub/datasets--samwaugh--artefact-embeddings/snapshots/latest"
MARKDOWN_DIR = "/data/hub/datasets--samwaugh--artefact-markdown/snapshots/latest"

# Embedding artefact files: safetensors tensors plus sentence-id manifests.
CLIP_EMBEDDINGS_ST = Path(EMBEDDINGS_DIR) / "clip_embeddings.safetensors"
PAINTINGCLIP_EMBEDDINGS_ST = Path(EMBEDDINGS_DIR) / "paintingclip_embeddings.safetensors"
CLIP_SENTENCE_IDS = Path(EMBEDDINGS_DIR) / "clip_embeddings_sentence_ids.json"
PAINTINGCLIP_SENTENCE_IDS = Path(EMBEDDINGS_DIR) / "paintingclip_embeddings_sentence_ids.json"
# Both embedding families currently share the same snapshot directory.
CLIP_EMBEDDINGS_DIR = EMBEDDINGS_DIR
PAINTINGCLIP_EMBEDDINGS_DIR = EMBEDDINGS_DIR
|
|
|
|
|
|
|
|
# Repository root (two directory levels above this file) and the read-only
# data directory shipped with the repo.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
DATA_READ_ROOT = PROJECT_ROOT / "data"

# Writable root for caches and outputs; HF_HOME overrides the /data default.
WRITE_ROOT = Path(os.getenv("HF_HOME", "/data"))
|
|
|
|
|
|
|
|
# Best-effort creation of the writable root at import time; failure is
# tolerated because local development machines may not have /data.
if not WRITE_ROOT.exists():
    print(f"⚠️ WRITE_ROOT {WRITE_ROOT} does not exist, trying to create it")
    try:
        WRITE_ROOT.mkdir(parents=True, exist_ok=True)
        print(f"✅ Created WRITE_ROOT: {WRITE_ROOT}")
    except Exception as e:
        print(f"⚠️ Failed to create {WRITE_ROOT}: {e}")
        print(f"⚠️ This may be expected in local development - continuing anyway")

# Diagnose (but do not fail on) an unwritable root: log permissions/owner
# so deployment issues are visible in the startup log.
if WRITE_ROOT.exists() and not os.access(WRITE_ROOT, os.W_OK):
    print(f"❌ WRITE_ROOT {WRITE_ROOT} is not writable")
    print(f"❌ Current permissions: {oct(WRITE_ROOT.stat().st_mode)[-3:]}")
    print(f"❌ Owner: {WRITE_ROOT.owner()}")
    print(f"⚠️ This may be expected in local development - continuing anyway")

print(f"✅ Using WRITE_ROOT: {WRITE_ROOT}")
print(f"✅ Using READ_ROOT: {DATA_READ_ROOT}")
|
|
|
|
|
|
|
|
# Read-only inputs shipped with the repository.
MODELS_DIR = DATA_READ_ROOT / "models"
MARKER_DIR = DATA_READ_ROOT / "marker_output"

# Location of the fine-tuned PaintingCLIP checkpoint.
PAINTINGCLIP_MODEL_DIR = MODELS_DIR / "PaintingClip"

# Writable locations for generated outputs and artifacts.
OUTPUTS_DIR = WRITE_ROOT / "outputs"
ARTIFACTS_DIR = WRITE_ROOT / "artifacts"
|
|
|
|
|
|
|
|
# Create the writable output directories up front; tolerate failure so that
# import still succeeds in read-only or local-dev environments.
for dir_path in [OUTPUTS_DIR, ARTIFACTS_DIR]:
    try:
        dir_path.mkdir(parents=True, exist_ok=True)
        print(f"✅ Ensured directory exists: {dir_path}")
    except Exception as e:
        print(f"⚠️ Could not create directory {dir_path}: {e}")
|
|
|
|
|
|
|
|
# Module-level caches populated by load_json_datasets(). They start empty so
# importers can reference them before (or without) the HF download running.
sentences: Dict[str, Any] = {}
works: Dict[str, Any] = {}
creators: Dict[str, Any] = {}
topics: Dict[str, Any] = {}
topic_names: Dict[str, Any] = {}
|
|
|
|
|
def load_json_from_hf(repo_id: str, filename: str) -> Optional[Dict[str, Any]]:
    """Download and parse a single JSON file from a Hugging Face dataset repo.

    Args:
        repo_id: HF dataset repository id (e.g. "samwaugh/artefact-json").
        filename: Name of the JSON file within the repository.

    Returns:
        The parsed JSON object, or None when the hub client is unavailable
        or the download/parse fails. Errors are logged, never raised, so
        callers can fall back (e.g. `... or {}`).
    """
    if not HF_HUB_AVAILABLE:
        # Without the hub client nothing can be downloaded.
        print(f"⚠️ huggingface_hub not available - cannot load {filename}")
        return None

    try:
        # BUG FIX: these messages previously printed the literal text
        # "(unknown)" instead of interpolating the requested filename.
        print(f"🔍 Downloading {filename} from {repo_id}...")
        file_path = hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset"
        )

        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        print(f"✅ Successfully loaded {filename}: {len(data)} entries")
        return data
    except Exception as e:
        print(f"❌ Failed to load {filename} from {repo_id}: {e}")
        return None
|
|
|
|
|
def load_json_datasets() -> Optional[Dict[str, Any]]:
    """Load all JSON datasets from Hugging Face.

    Downloads the five JSON files from ARTEFACT_JSON_DATASET, stores each in
    its module-level cache, and returns them keyed by name. Returns None when
    the hub client is unavailable or loading fails entirely; individual files
    that fail to load degrade to empty dicts.
    """
    if not HF_HUB_AVAILABLE:
        print("⚠️ huggingface_hub library not available - skipping HF dataset loading")
        return None

    try:
        print("📥 Loading JSON files from Hugging Face repository...")

        global sentences, works, creators, topics, topic_names

        # Fetch each file; a failed download yields an empty dict rather
        # than aborting the whole load.
        loaded = {
            name: load_json_from_hf(ARTEFACT_JSON_DATASET, name + '.json') or {}
            for name in ('creators', 'sentences', 'works', 'topics', 'topic_names')
        }
        creators = loaded['creators']
        sentences = loaded['sentences']
        works = loaded['works']
        topics = loaded['topics']
        topic_names = loaded['topic_names']

        print(f"✅ Successfully loaded JSON files from HF:")
        print(f"   Sentences: {len(sentences)} entries")
        print(f"   Works: {len(works)} entries")
        print(f"   Creators: {len(creators)} entries")
        print(f"   Topics: {len(topics)} entries")
        print(f"   Topic Names: {len(topic_names)} entries")

        return loaded
    except Exception as e:
        print(f"❌ Failed to load JSON datasets from HF: {e}")
        return None
|
|
|
|
|
def load_embeddings_datasets() -> Optional[Dict[str, Any]]:
    """Prepare embeddings loading via direct file download.

    No tensors are fetched here; the returned descriptor tells consumers to
    pull individual files from the repo with hf_hub_download on demand.
    Returns None when the hub client is unavailable.
    """
    if not HF_HUB_AVAILABLE:
        print("⚠️ huggingface_hub library not available - skipping HF embeddings loading")
        return None

    try:
        print(f"📥 Loading embeddings from {ARTEFACT_EMBEDDINGS_DATASET}...")
        descriptor = {
            'use_direct_download': True,
            'repo_id': ARTEFACT_EMBEDDINGS_DATASET,
        }
        return descriptor
    except Exception as e:
        print(f"❌ Failed to load embeddings datasets from HF: {e}")
        return None
|
|
|
|
|
|
|
|
# Memoized result of load_markdown_dataset(); managed by get_markdown_dir().
_markdown_dir_cache: Optional[Path] = None
|
|
|
|
|
def clear_markdown_cache() -> bool:
    """Delete the on-disk markdown cache so the next load re-downloads it.

    Returns:
        True when the cache is gone (deleted, or never existed);
        False when deletion raised.
    """
    try:
        import shutil

        cache_dir = WRITE_ROOT / "markdown_cache"
        if not cache_dir.exists():
            # Nothing on disk — already in the desired state.
            print(f"ℹ️ No markdown cache found to clear")
            return True

        print(f"🗑️ Clearing markdown cache at {cache_dir}")
        shutil.rmtree(cache_dir)
        print(f"✅ Markdown cache cleared successfully")
        return True
    except Exception as e:
        print(f"❌ Failed to clear markdown cache: {e}")
        return False
|
|
|
|
|
def get_markdown_cache_info() -> dict:
    """Report size and content statistics for the on-disk markdown cache.

    Returns a dict with:
        exists (bool): whether the cache's works/ directory is present
        size_gb (float): total size of regular files, rounded to 2 decimals
        work_count (int): number of per-work subdirectories
        file_count (int): number of regular files (directories excluded)
        path (str): path of the works/ directory (only when it exists)
        error (str): error message (only on failure)
    """
    try:
        markdown_cache_dir = WRITE_ROOT / "markdown_cache"
        works_dir = markdown_cache_dir / "works"

        if not works_dir.exists():
            return {
                "exists": False,
                "size_gb": 0,
                "work_count": 0,
                "file_count": 0
            }

        # BUG FIX: rglob('*') also yields directories, so the previous
        # file_count over-counted; filter to regular files once and reuse
        # the list for both the size sum and the count. Also dropped an
        # unused local `import shutil`.
        cache_files = [f for f in works_dir.rglob('*') if f.is_file()]
        total_size = sum(f.stat().st_size for f in cache_files)
        size_gb = total_size / (1024**3)

        file_count = len(cache_files)
        work_count = len([d for d in works_dir.iterdir() if d.is_dir()])

        return {
            "exists": True,
            "size_gb": round(size_gb, 2),
            "work_count": work_count,
            "file_count": file_count,
            "path": str(works_dir)
        }
    except Exception as e:
        print(f"❌ Failed to get cache info: {e}")
        return {"exists": False, "error": str(e)}
|
|
|
|
|
def load_markdown_dataset(force_refresh: bool = False) -> Optional[Path]:
    """Load markdown dataset from Hugging Face and return the local path.

    Downloads (or reuses a cached copy of) the per-work markdown + image tree
    under WRITE_ROOT/markdown_cache/works.

    Args:
        force_refresh: When True, wipe the local cache first and re-download.

    Returns:
        Path to the local works/ directory, or None on failure or when the
        huggingface_hub client is unavailable.
    """
    if not HF_HUB_AVAILABLE:
        print("⚠️ huggingface_hub not available - cannot load markdown dataset")
        return None

    try:
        print(f"📥 Loading markdown dataset from {ARTEFACT_MARKDOWN_DATASET}...")

        # Cache lives under the writable root so it persists across runs.
        markdown_cache_dir = WRITE_ROOT / "markdown_cache"
        markdown_cache_dir.mkdir(parents=True, exist_ok=True)

        works_dir = markdown_cache_dir / "works"

        if force_refresh:
            print("🔄 Force refresh requested - clearing cache")
            clear_markdown_cache()
        else:
            cache_info = get_markdown_cache_info()
            if cache_info["exists"]:
                print(f"📊 Cache info: {cache_info['work_count']} works, {cache_info['size_gb']}GB")

                # Heuristic completeness check: fewer than 80% of the expected
                # works means a partial download — start over.
                # NOTE(review): 7200 is a hard-coded expectation; confirm it
                # tracks the actual dataset size.
                expected_works = 7200
                if cache_info["work_count"] < expected_works * 0.8:
                    print(f"⚠️ Cache incomplete ({cache_info['work_count']}/{expected_works} works) - clearing and re-downloading")
                    clear_markdown_cache()
                else:
                    print(f"✅ Using cached markdown dataset at {works_dir}")

                    # Markdown looks complete, but images download in a
                    # separate phase and may still be missing; resume just
                    # that phase if a sample of works lacks images.
                    try:
                        if _images_likely_missing(works_dir):
                            print("🖼️ Images appear to be missing or incomplete – resuming image download phase...")
                            _download_images_phase_only(works_dir)
                        else:
                            print("🖼️ Images appear present for sampled works – skipping image download phase")
                    except Exception as e:
                        print(f"⚠️ Image presence check failed: {e}")
                    return works_dir

        # Cache absent, cleared, or incomplete: run the full two-phase download.
        print("📥 Downloading markdown dataset with optimized approach...")
        return _download_markdown_optimized(works_dir)

    except Exception as e:
        print(f"❌ Failed to load markdown dataset: {e}")
        return None
|
|
|
|
|
def _download_markdown_optimized(works_dir: Path) -> Optional[Path]:
    """Robust markdown dataset download with error handling and progress persistence.

    Runs a two-phase download — (1) every markdown file, (2) images in small
    batches — persisting progress to a JSON file beside the cache so an
    interrupted run can resume without repeating completed work.

    Args:
        works_dir: Target works/ directory inside the local cache.

    Returns:
        works_dir on success, None on failure.
    """
    try:
        # CLEANUP: removed unused function-local imports (concurrent.futures,
        # threading, time) and a redundant `import json` that shadowed the
        # module-level import.
        from huggingface_hub import list_repo_files

        # The progress file sits beside works/ so clear_markdown_cache()
        # removes it along with the cached data.
        progress_file = works_dir.parent / "download_progress.json"

        progress = {"markdown_completed": set(), "image_batches_completed": set()}
        if progress_file.exists():
            try:
                with open(progress_file, 'r') as f:
                    saved_progress = json.load(f)
                progress["markdown_completed"] = set(saved_progress.get("markdown_completed", []))
                progress["image_batches_completed"] = set(saved_progress.get("image_batches_completed", []))
                print(f"📊 Resuming download from previous progress...")
            except Exception as e:
                print(f"⚠️ Could not load progress file: {e}")

        print("🔍 Discovering files in dataset...")
        files = list_repo_files(repo_id=ARTEFACT_MARKDOWN_DATASET, repo_type="dataset")

        # Work ids are the "W..." path segment in works/<id>/... entries.
        work_dirs = set()
        for file_path in files:
            if file_path.startswith("works/"):
                parts = file_path.split("/")
                if len(parts) >= 2:
                    work_id = parts[1]
                    if work_id.startswith("W"):
                        work_dirs.add(work_id)

        print(f"📊 Found {len(work_dirs)} work directories to download")

        print("📄 Phase 1: Downloading markdown files only...")
        _download_markdown_files_robust(works_dir, work_dirs, files, progress, progress_file)

        print("🖼️ Phase 2: Downloading images in smaller batches...")
        _download_images_robust(works_dir, work_dirs, files, progress, progress_file)

        # Both phases finished: drop the resume file.
        if progress_file.exists():
            progress_file.unlink()

        print(f"✅ Successfully downloaded markdown dataset to {works_dir}")
        return works_dir

    except Exception as e:
        print(f"❌ Optimized download failed: {e}")
        import traceback
        traceback.print_exc()
        return None
|
|
|
|
|
def _images_likely_missing(works_dir: Path, sample_size: int = 20) -> bool: |
|
|
"""Quickly assess whether images are present in the cache. |
|
|
|
|
|
We sample up to `sample_size` work directories and check for any .jpg/.png |
|
|
files either under <work>/images/ or directly inside <work>/. |
|
|
|
|
|
Returns True if fewer than 20% of sampled works have at least one image. |
|
|
""" |
|
|
try: |
|
|
work_dirs = [d for d in works_dir.iterdir() if d.is_dir()] |
|
|
if not work_dirs: |
|
|
print("🖼️ Image check: no work directories found – treating as missing") |
|
|
return True |
|
|
|
|
|
sampled = work_dirs[:sample_size] |
|
|
has_images_count = 0 |
|
|
for work_dir in sampled: |
|
|
images_dir = work_dir / "images" |
|
|
found = False |
|
|
if images_dir.exists(): |
|
|
if any(images_dir.glob("*.jpg")) or any(images_dir.glob("*.jpeg")) or any(images_dir.glob("*.png")): |
|
|
found = True |
|
|
|
|
|
if not found: |
|
|
if any(work_dir.glob("*.jpg")) or any(work_dir.glob("*.jpeg")) or any(work_dir.glob("*.png")): |
|
|
found = True |
|
|
if found: |
|
|
has_images_count += 1 |
|
|
|
|
|
ratio = has_images_count / max(1, len(sampled)) |
|
|
print(f"🖼️ Image check: {has_images_count}/{len(sampled)} sampled works have images (ratio={ratio:.2f})") |
|
|
return ratio < 0.2 |
|
|
except Exception as e: |
|
|
print(f"⚠️ Image sampling check failed: {e}") |
|
|
|
|
|
return True |
|
|
|
|
|
def _download_images_phase_only(works_dir: Path) -> Optional[Path]:
    """Resume/perform only the image download phase without touching markdown files.

    Re-discovers the repository's file list, reloads any saved download
    progress, and hands off to the robust image downloader for every work
    directory found. Returns works_dir on success, None on failure.
    """
    try:
        import json
        from huggingface_hub import list_repo_files

        progress_file = works_dir.parent / "download_progress.json"

        # Rehydrate progress (sets for O(1) membership checks).
        progress = {"markdown_completed": set(), "image_batches_completed": set()}
        if progress_file.exists():
            try:
                with open(progress_file, 'r') as f:
                    saved = json.load(f)
                progress["markdown_completed"] = set(saved.get("markdown_completed", []))
                progress["image_batches_completed"] = set(saved.get("image_batches_completed", []))
                print(f"📊 Resuming image download from previous progress...")
            except Exception as e:
                print(f"⚠️ Could not load progress file: {e}")

        print("🔍 Discovering files in dataset (images phase only)...")
        files = list_repo_files(repo_id=ARTEFACT_MARKDOWN_DATASET, repo_type="dataset")

        # Collect the "W..." work ids from works/<id>/... repo paths.
        work_dirs = {
            segments[1]
            for segments in (p.split("/") for p in files if p.startswith("works/"))
            if len(segments) >= 2 and segments[1].startswith("W")
        }

        print(f"📊 Images phase: {len(work_dirs)} work directories discovered")
        _download_images_robust(works_dir, work_dirs, files, progress, progress_file)
        return works_dir
    except Exception as e:
        print(f"❌ Images-phase-only download failed: {e}")
        import traceback
        traceback.print_exc()
        return None
|
|
|
|
|
def _download_markdown_files_robust(works_dir: Path, work_dirs: set, files: list, progress: dict, progress_file: Path) -> None:
    """Download markdown files with retry logic and progress persistence.

    Phase 1 of the dataset download: fetches works/<id>/<id>.md for every
    work not already recorded in `progress["markdown_completed"]`, using a
    small thread pool. Progress is checkpointed to `progress_file` every
    100 completions so interrupted runs can resume. Mutates `progress`
    in place; returns nothing.
    """
    import concurrent.futures
    import threading
    import time
    import json
    from requests.exceptions import ReadTimeout, ConnectionError, HTTPError

    def download_markdown_file_with_retry(work_id: str, max_retries: int = 3) -> bool:
        """Download a single markdown file with retry logic.

        Returns True on success (or when the file already exists locally).
        """
        for attempt in range(max_retries):
            try:
                work_dir = works_dir / work_id
                work_dir.mkdir(parents=True, exist_ok=True)

                # Idempotent: skip files already present from a prior run.
                if (work_dir / f"{work_id}.md").exists():
                    return True

                md_file = hf_hub_download(
                    repo_id=ARTEFACT_MARKDOWN_DATASET,
                    filename=f"works/{work_id}/{work_id}.md",
                    repo_type="dataset"
                )

                # Copy out of the HF cache into our own directory layout.
                import shutil
                shutil.copy2(md_file, work_dir / f"{work_id}.md")
                return True

            except (ReadTimeout, ConnectionError, HTTPError) as e:
                # Transient network errors: exponential backoff, then give up.
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    print(f"⚠️ Retry {attempt + 1}/{max_retries} for {work_id} after {wait_time}s (error: {e})")
                    time.sleep(wait_time)
                else:
                    print(f"❌ Failed to download markdown for {work_id} after {max_retries} attempts: {e}")
                    return False
            except Exception as e:
                # Non-network errors are not retried.
                print(f"❌ Unexpected error downloading {work_id}: {e}")
                return False

        return False

    def save_progress():
        """Save current progress to file (sets serialized as lists)."""
        try:
            progress_data = {
                "markdown_completed": list(progress["markdown_completed"]),
                "image_batches_completed": list(progress["image_batches_completed"])
            }
            with open(progress_file, 'w') as f:
                json.dump(progress_data, f)
        except Exception as e:
            print(f"⚠️ Could not save progress: {e}")

    # Only fetch works not already completed in a previous run.
    remaining_works = [w for w in work_dirs if w not in progress["markdown_completed"]]

    if not remaining_works:
        print("📄 All markdown files already downloaded")
        return

    print(f"📄 Downloading {len(remaining_works)} markdown files...")
    completed = len(progress["markdown_completed"])
    failed = 0

    # max_workers=2 keeps concurrent Hub connections low.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        future_to_work = {executor.submit(download_markdown_file_with_retry, work_id): work_id for work_id in remaining_works}

        for future in concurrent.futures.as_completed(future_to_work):
            work_id = future_to_work[future]
            try:
                success = future.result()
                if success:
                    progress["markdown_completed"].add(work_id)
                    completed += 1
                else:
                    failed += 1

                # Checkpoint + throttle every 100 processed files.
                if (completed + failed) % 100 == 0:
                    save_progress()
                    print(f"📄 Progress: {completed}/{len(work_dirs)} markdown files (failed: {failed})")

                    time.sleep(3)

            except Exception as e:
                print(f"❌ Error processing {work_id}: {e}")
                failed += 1

    # Final checkpoint so phase 2 resumes from a consistent state.
    save_progress()
    print(f"✅ Phase 1 complete: {completed} markdown files downloaded, {failed} failed")
|
|
|
|
|
def _download_images_robust(works_dir: Path, work_dirs: set, files: list, progress: dict, progress_file: Path) -> None:
    """Download images with retry logic, progress persistence, and smaller batches.

    Phase 2 of the dataset download: fetches every file under
    works/<id>/images/ for each work, in batches of 5 works, persisting
    completed batch ids to `progress_file` so interrupted runs can resume.
    Mutates `progress` in place; returns nothing.
    """
    import concurrent.futures
    import time
    import json
    from requests.exceptions import ReadTimeout, ConnectionError, HTTPError

    def download_work_images_with_retry(work_id: str, max_retries: int = 2) -> tuple:
        """Download all images for a single work with retry logic.

        Returns (work_id, downloaded_count, failed_count).
        """
        try:
            work_dir = works_dir / work_id
            images_dir = work_dir / "images"
            images_dir.mkdir(exist_ok=True)

            # Restrict the full repo file list to this work's images/ subtree.
            work_files = [f for f in files if f.startswith(f"works/{work_id}/images/")]
            print(f"🔍 Work {work_id}: Found {len(work_files)} image files to download")

            downloaded = 0
            failed = 0

            for img_file in work_files:
                img_name = img_file.split("/")[-1]
                local_path = images_dir / img_name

                # Already present locally: count it and skip the download.
                if local_path.exists():
                    downloaded += 1
                    continue

                for attempt in range(max_retries):
                    try:
                        downloaded_file = hf_hub_download(
                            repo_id=ARTEFACT_MARKDOWN_DATASET,
                            filename=img_file,
                            repo_type="dataset"
                        )

                        # Copy out of the HF cache into our own layout.
                        import shutil
                        shutil.copy2(downloaded_file, local_path)
                        downloaded += 1
                        break

                    except (ReadTimeout, ConnectionError, HTTPError) as e:
                        # Transient network errors: linear backoff, then give up.
                        if attempt < max_retries - 1:
                            wait_time = 1 + attempt
                            time.sleep(wait_time)
                        else:
                            failed += 1
                            # Cap error spam at 5 messages per work.
                            if failed <= 5:
                                print(f"⚠️ Could not download image {img_file}: {e}")
                    except Exception as e:
                        # Non-network errors are not retried.
                        failed += 1
                        if failed <= 5:
                            print(f"⚠️ Unexpected error downloading {img_file}: {e}")
                        break

            return (work_id, downloaded, failed)

        except Exception as e:
            print(f"❌ Error downloading images for {work_id}: {e}")
            return (work_id, 0, 1)

    def save_progress():
        """Save current progress to file (sets serialized as lists)."""
        try:
            progress_data = {
                "markdown_completed": list(progress["markdown_completed"]),
                "image_batches_completed": list(progress["image_batches_completed"])
            }
            with open(progress_file, 'w') as f:
                json.dump(progress_data, f)
        except Exception as e:
            print(f"⚠️ Could not save progress: {e}")

    # Small batches limit concurrent load and give fine-grained resume points.
    work_list = list(work_dirs)
    batch_size = 5
    total_downloaded = 0
    total_failed = 0

    for i in range(0, len(work_list), batch_size):
        batch = work_list[i:i + batch_size]
        batch_id = f"batch_{i//batch_size + 1}"

        # Resume support: skip batches recorded as done in a previous run.
        # NOTE(review): work_list comes from a set, so batch membership may
        # differ between runs — batch ids are not guaranteed to cover the
        # same works on resume; confirm whether this matters in practice.
        if batch_id in progress["image_batches_completed"]:
            print(f"⏭️ Skipping already completed batch {batch_id}")
            continue

        print(f"🖼️ Processing image batch {i//batch_size + 1}/{(len(work_list) + batch_size - 1)//batch_size} ({len(batch)} works)")

        # max_workers=1: effectively serial per batch; kept as a pool for
        # uniform error handling via futures.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future_to_work = {executor.submit(download_work_images_with_retry, work_id): work_id for work_id in batch}

            for future in concurrent.futures.as_completed(future_to_work):
                work_id = future_to_work[future]
                try:
                    work_id, downloaded, failed = future.result()
                    total_downloaded += downloaded
                    total_failed += failed
                except Exception as e:
                    print(f"❌ Error processing {work_id}: {e}")
                    total_failed += 1

        progress["image_batches_completed"].add(batch_id)
        save_progress()

        # Throttle between batches to be gentle on the Hub.
        print(f"⏳ Waiting 10 seconds before next batch...")
        time.sleep(10)

    print(f"✅ Phase 2 complete: {total_downloaded} images downloaded, {total_failed} failed")
|
|
|
|
|
def _download_markdown_files_fallback(cache_dir: Path) -> Optional[Path]: |
|
|
"""Fallback method to download markdown files individually""" |
|
|
try: |
|
|
works_dir = cache_dir / "works" |
|
|
works_dir.mkdir(exist_ok=True) |
|
|
|
|
|
|
|
|
|
|
|
print("⚠️ Using fallback markdown loading - some files may be missing") |
|
|
return works_dir |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Fallback markdown loading failed: {e}") |
|
|
return None |
|
|
|
|
|
def get_markdown_dir(force_refresh: bool = False) -> Path:
    """Return the markdown directory, fetching it from HF on first use.

    The resolved path is memoized in a module-level cache; pass
    force_refresh=True to bypass the memo and re-download. Falls back to
    the repo-local marker_output directory when loading fails.
    """
    global _markdown_dir_cache

    if force_refresh or _markdown_dir_cache is None:
        _markdown_dir_cache = load_markdown_dataset(force_refresh=force_refresh)

    resolved = _markdown_dir_cache
    if resolved and resolved.exists():
        return resolved

    print("⚠️ Using fallback local markdown directory")
    return DATA_READ_ROOT / "marker_output"
|
|
|
|
|
|
|
|
# Eagerly hydrate the caches at import time so downstream modules can read
# JSON_DATASETS / EMBEDDINGS_DATASETS (and the module-level dicts) directly.
JSON_DATASETS = load_json_datasets()
# BUG FIX: the function was previously assigned without being called, so
# EMBEDDINGS_DATASETS held the function object instead of its result dict.
EMBEDDINGS_DATASETS = load_embeddings_datasets()
|
|
|