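"""
Self-contained fetcher for the Polyvore dataset hosted on the Hugging Face Hub.

Downloads a filtered snapshot of the Stylique/Polyvore dataset repo into
./data/Polyvore, extracts the image archive into ./data/Polyvore/images, and
reports on the resulting directory structure. Run the module directly to fetch
the dataset and print a structure summary.
"""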
import json
import os
import zipfile
from pathlib import Path
from typing import Optional

from huggingface_hub import snapshot_download  # type: ignore

def _unzip_images_if_needed(root: str) -> None:
    """
    If an archive like images.zip exists in the dataset root, extract it to root/images.
    """
    images_dir = os.path.join(root, "images")
    if os.path.isdir(images_dir) and any(Path(images_dir).glob("*")):
        print(f"✅ Images already present in {images_dir}")
        return
    # Common zip names at root or subfolders
    candidates = [os.path.join(root, name) for name in ("images.zip", "polyvore-images.zip", "imgs.zip")]
    # Also search recursively for any *images*.zip
    for p in Path(root).rglob("*images*.zip"):
        candidates.append(str(p))

    for zpath in candidates:
        if os.path.isfile(zpath):
            print(f"🔧 Found image archive: {zpath}")
            print(f"📂 Extracting to: {images_dir}")
            os.makedirs(images_dir, exist_ok=True)
            try:
                with zipfile.ZipFile(zpath, "r") as zf:
                    # Total uncompressed size, used for progress reporting
                    # (guard against an empty archive to avoid division by zero)
                    total_size = sum(f.file_size for f in zf.filelist) or 1
                    extracted_size = 0
                    for file_info in zf.filelist:
                        zf.extract(file_info, images_dir)
                        extracted_size += file_info.file_size
                        # Progress update roughly every 100 MB: fires when the
                        # running total has just crossed a 100 MB boundary
                        if extracted_size % (100 * 1024 * 1024) < file_info.file_size:
                            progress = (extracted_size / total_size) * 100
                            print(f"📦 Extraction progress: {progress:.1f}%")
                    print(f"✅ Successfully extracted {len(zf.filelist)} files")
                return
            except Exception as e:
                print(f"❌ Failed to extract {zpath}: {e}")
                continue
    print("⚠️ No image archive found to extract")

def ensure_dataset_ready() -> Optional[str]:
    """
    Self-contained dataset fetcher for the Polyvore dataset from Hugging Face.

    - Downloads the dataset repo Stylique/Polyvore into ./data/Polyvore
    - Unzips images.zip into ./data/Polyvore/images
    - Returns the dataset root path, or None on failure
    """
    root = os.path.abspath(os.path.join(os.getcwd(), "data", "Polyvore"))
    Path(root).mkdir(parents=True, exist_ok=True)
    print(f"🔍 Checking dataset at: {root}")

    # Check if we already have the essential files
    images_dir = os.path.join(root, "images")
    metadata_files = [
        "polyvore_item_metadata.json",
        "polyvore_outfit_titles.json",
        "categories.csv",
    ]
    has_images = os.path.isdir(images_dir) and any(Path(images_dir).glob("*"))
    has_metadata = all(os.path.exists(os.path.join(root, f)) for f in metadata_files)

    if has_images and has_metadata:
        print("✅ Dataset already complete - skipping download and extraction")
        return root
    # Download the HF dataset snapshot into root
    try:
        print("📥 Downloading Polyvore dataset from Hugging Face...")
        # Only fetch what's needed to run and prepare splits
        allow = [
            "images.zip",
            # root-level (some mirrors place the jsons here)
            "train.json",
            "valid.json",
            "test.json",
            # official splits often live here
            "nondisjoint/train.json",
            "nondisjoint/valid.json",
            "nondisjoint/test.json",
            "disjoint/train.json",
            "disjoint/valid.json",
            "disjoint/test.json",
            # light metadata
            "polyvore_item_metadata.json",
            "polyvore_outfit_titles.json",
            "categories.csv",
        ]
        # Explicit ignores to prevent huge downloads (>10 GB)
        ignore = [
            "**/*hglmm*",
            "**/*.tar",
            "**/*.tar.gz",
            "**/*.7z",
            "**/large/**",
        ]
        # Decide whether metadata/split files still need to be fetched
        need_download = not (
            has_metadata and (
                # any location providing official splits is acceptable
                all(os.path.exists(os.path.join(root, f)) for f in ["train.json", "valid.json", "test.json"]) or
                all(os.path.exists(os.path.join(root, "nondisjoint", f)) for f in ["train.json", "valid.json", "test.json"]) or
                all(os.path.exists(os.path.join(root, "disjoint", f)) for f in ["train.json", "valid.json", "test.json"])
            )
        )
        if not has_images:
            # Images are missing: fetch the full filtered snapshot, then extract
            print("🚀 Starting download...")
            snapshot_download(
                "Stylique/Polyvore",
                repo_type="dataset",
                local_dir=root,
                # deprecated and ignored in recent huggingface_hub releases;
                # kept for compatibility with older versions
                local_dir_use_symlinks=False,
                allow_patterns=allow,
                ignore_patterns=ignore,
            )
            print("✅ Download completed")
            # Extract images after download
            _unzip_images_if_needed(root)
        elif need_download:
            # Images exist but metadata and/or split files are missing
            print("📥 Downloading missing metadata/split files...")
            snapshot_download(
                "Stylique/Polyvore",
                repo_type="dataset",
                local_dir=root,
                local_dir_use_symlinks=False,
                allow_patterns=[p for p in allow if p != "images.zip"],
                ignore_patterns=ignore,
            )
            print("✅ Metadata download completed")
        else:
            print("✅ All required files already present")
    except Exception as e:
        print(f"❌ Failed to download Stylique/Polyvore dataset: {e}")
        print("🔧 Trying to work with existing files...")
        # Check what we have locally
        existing_files = []
        for file_path in Path(root).rglob("*"):
            if file_path.is_file():
                existing_files.append(str(file_path.relative_to(root)))
        if existing_files:
            print(f"📁 Found {len(existing_files)} existing files:")
            for f in sorted(existing_files)[:10]:  # Show first 10
                print(f"  - {f}")
            if len(existing_files) > 10:
                print(f"  ... and {len(existing_files) - 10} more")
        else:
            print("📭 No existing files found")
            return None
    # Unzip images if needed
    _unzip_images_if_needed(root)

    # Final verification
    if os.path.isdir(images_dir) and any(Path(images_dir).glob("*")):
        print(f"✅ Dataset ready at: {root}")
        print(f"🖼️ Images: {len(list(Path(images_dir).glob('*')))} files")
        # Check metadata
        for meta_file in metadata_files:
            meta_path = os.path.join(root, meta_file)
            if os.path.exists(meta_path):
                size_bytes = os.path.getsize(meta_path)
                if size_bytes < 1024 * 1024:  # Less than 1 MB
                    print(f"📄 {meta_file}: {size_bytes / 1024:.1f} KB")
                else:
                    print(f"📄 {meta_file}: {size_bytes / (1024 * 1024):.1f} MB")
            else:
                print(f"⚠️ Missing: {meta_file}")
        return root
    else:
        print("❌ Failed to prepare dataset")
        return None

def check_dataset_structure(root: str) -> dict:
    """Check the structure of the downloaded dataset."""
    structure = {
        "root": root,
        "images": {"exists": False, "count": 0, "path": os.path.join(root, "images")},
        "metadata": {},
        "splits": {},
        "status": "unknown",
    }
    # Check images
    images_dir = os.path.join(root, "images")
    if os.path.isdir(images_dir):
        image_files = list(Path(images_dir).glob("*"))
        structure["images"]["exists"] = True
        structure["images"]["count"] = len(image_files)
        structure["images"]["extensions"] = list(set(f.suffix.lower() for f in image_files))

    # Check metadata files
    metadata_files = [
        "polyvore_item_metadata.json",
        "polyvore_outfit_titles.json",
        "categories.csv",
    ]
    for meta_file in metadata_files:
        meta_path = os.path.join(root, meta_file)
        if os.path.exists(meta_path):
            size_bytes = os.path.getsize(meta_path)
            if size_bytes < 1024 * 1024:  # Less than 1 MB
                structure["metadata"][meta_file] = {"exists": True, "size_kb": size_bytes / 1024}
            else:
                structure["metadata"][meta_file] = {"exists": True, "size_mb": size_bytes / (1024 * 1024)}
        else:
            structure["metadata"][meta_file] = {"exists": False, "size_mb": 0, "size_kb": 0}
    # Check for split files; "root" refers to the dataset root itself
    split_files = ["train.json", "valid.json", "test.json"]
    split_locations = [("root", root)] + [
        (name, os.path.join(root, name)) for name in ("nondisjoint", "disjoint", "splits")
    ]
    for location, location_path in split_locations:
        if os.path.exists(location_path):
            structure["splits"][location] = {}
            for split_file in split_files:
                split_path = os.path.join(location_path, split_file)
                if os.path.exists(split_path):
                    size_bytes = os.path.getsize(split_path)
                    if size_bytes < 1024 * 1024:  # Less than 1 MB
                        structure["splits"][location][split_file] = {"exists": True, "size_kb": size_bytes / 1024}
                    else:
                        structure["splits"][location][split_file] = {"exists": True, "size_mb": size_bytes / (1024 * 1024)}
                else:
                    structure["splits"][location][split_file] = {"exists": False, "size_mb": 0, "size_kb": 0}
        else:
            structure["splits"][location] = "directory_not_found"
    # Determine overall status
    if structure["images"]["exists"] and structure["images"]["count"] > 0:
        if any(meta["exists"] for meta in structure["metadata"].values()):
            structure["status"] = "ready"
        else:
            structure["status"] = "partial"
    else:
        structure["status"] = "incomplete"
    return structure

if __name__ == "__main__":
    # Test the dataset fetcher
    print("🧪 Testing Polyvore dataset fetcher...")
    root = ensure_dataset_ready()
    if root:
        print("\n📊 Dataset structure:")
        structure = check_dataset_structure(root)
        print(json.dumps(structure, indent=2))
    else:
        print("❌ Failed to prepare dataset")