import json
import os
import time
import uuid
from pathlib import Path

import datasets
import pandas as pd
from huggingface_hub import CommitScheduler, repo_exists
from PIL import Image as PILImage
def update_dataset_with_new_splits(new_splits: dict, process_name: str = "Main"):
    """
    Add new splits to a regular Hugging Face dataset without downloading existing data.

    This function pushes individual splits to the Hub using the `split` parameter,
    which preserves all existing splits and only adds/updates the specified ones.

    Key Features:
    - No download of the existing dataset is required
    - Existing splits are preserved (not overwritten)
    - Each split is pushed individually via dataset.push_to_hub(split="name")
    - Efficient for large datasets with many splits

    Args:
        new_splits: dict of {split_name: DataFrame} - splits to add/update
        process_name: Name used in log output and commit messages

    Example:
        new_splits = {
            "validation_2024": val_df,
            "test_batch_1": test_df,
            "custom_split": custom_df,
        }
        update_dataset_with_new_splits(new_splits)
    """
    repo_id = os.environ["REPO_ID"]
    hf_token = os.environ["HF_TOKEN"]
    print(f"\n[{process_name}] Starting dataset splits update process...")

    # Bail out early if the target repository does not exist.
    if not repo_exists(repo_id, repo_type="dataset", token=hf_token):
        print(f"[{process_name}] Repository {repo_id} not found. Cannot update.")
        return

    # Skip downloading the existing dataset - only the new splits are pushed.
    print(f"[{process_name}] Preparing to push {len(new_splits)} new splits individually...")

    # Convert each DataFrame into a Dataset so it can be pushed on its own.
    splits_to_push = []
    for split_id, df in new_splits.items():
        new_split_dataset = datasets.Dataset.from_pandas(df)
        splits_to_push.append((split_id, new_split_dataset))
        print(f"[{process_name}] Prepared split '{split_id}' with {len(new_split_dataset)} entries.")

    # Push the individual splits to the Hub with a retry mechanism.
    _push_splits_to_hub(splits_to_push, repo_id, hf_token, process_name)
    print(f"[{process_name}] Finished pushing new dataset splits to Hub.")
def update_dataset_with_new_images(image_df: pd.DataFrame, process_name: str = "Main", scheduler: CommitScheduler = None, dataset_dir: Path = None, jsonl_path: Path = None):
    """
    Add new images to an image Hugging Face dataset using a CommitScheduler
    for efficient incremental updates.

    Key Features:
    - Uses CommitScheduler for incremental updates on existing datasets
    - Saves images as PNG files with unique names
    - Stores metadata in JSONL format for the file-based approach
    - Thread-safe via the scheduler's lock

    Args:
        image_df: DataFrame with 'id', 'image' and 'annotated_image' columns
            (images should be PIL Image objects or numpy arrays)
        process_name: Name used in log output and commit messages
        scheduler: CommitScheduler that periodically commits dataset_dir to the Hub
        dataset_dir: Local directory watched by the scheduler; image files are saved here
        jsonl_path: Path to the JSONL metadata file inside dataset_dir

    Example:
        img_df = pd.DataFrame([{"id": "img1", "image": pil_image, "annotated_image": pil_annotated}])
        update_dataset_with_new_images(img_df, scheduler=scheduler, dataset_dir=dataset_dir, jsonl_path=jsonl_path)
    """
    print(f"\n[{process_name}] Starting image dataset update...")

    # Validate the input type for image datasets.
    if not hasattr(image_df, 'columns'):
        raise ValueError(f"image_df must be a pandas DataFrame with 'id', 'image' and 'annotated_image' columns, got {type(image_df)}")

    # Validate that the required columns are present.
    required_columns = ['id', 'image', 'annotated_image']
    missing_columns = [col for col in required_columns if col not in image_df.columns]
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}. Found columns: {list(image_df.columns)}")

    print(f"[{process_name}] Validated DataFrame with {len(image_df)} entries and columns: {list(image_df.columns)}")
    return _append_images_with_scheduler(image_df, process_name, scheduler, dataset_dir, jsonl_path)
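# A minimal setup sketch for the scheduler arguments above (the IMAGE_REPO_ID
# env var is the one read in _append_images_with_scheduler; the local folder
# name is an assumption): CommitScheduler commits the contents of folder_path
# to the Hub in the background every `every` minutes, and naming the metadata
# file "metadata.jsonl" follows the Hub's imagefolder convention.
#
#     dataset_dir = Path("image_dataset")
#     dataset_dir.mkdir(parents=True, exist_ok=True)
#     jsonl_path = dataset_dir / "metadata.jsonl"
#     scheduler = CommitScheduler(
#         repo_id=os.environ["IMAGE_REPO_ID"],
#         repo_type="dataset",
#         folder_path=dataset_dir,
#         every=5,  # minutes between background commits
#     )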
def _append_images_with_scheduler(image_df: pd.DataFrame, process_name: str, scheduler, dataset_dir, jsonl_path):
    """
    Append images to an existing dataset using CommitScheduler for efficient incremental updates.
    """
    print(f"[{process_name}] Using CommitScheduler for incremental updates...")
    print(f"[IMAGE_SCHEDULER] Using CommitScheduler for {os.environ['IMAGE_REPO_ID']} with local path: {dataset_dir}")

    # Process each image row.
    saved_count = 0
    failed_count = 0
    for idx, row in image_df.iterrows():
        image_id = row.get('id', f"row_{idx}")  # Fallback so the except block can always log an id.
        try:
            image = row['image']
            annotated_image = row['annotated_image']

            # Skip rows with a missing image or annotated image.
            if image is None:
                print(f"[{process_name}] Skipping image {image_id}: image is None")
                failed_count += 1
                continue
            if annotated_image is None:
                print(f"[{process_name}] Warning: annotated_image is None for {image_id}")
                failed_count += 1
                continue

            # Generate unique filenames for the original and annotated images.
            unique_filename_orig = f"{uuid.uuid4()}_orig.png"
            unique_filename_ann = f"{uuid.uuid4()}_ann.png"
            image_path_orig = dataset_dir / unique_filename_orig
            image_path_ann = dataset_dir / unique_filename_ann

            # Save the image files and metadata under the scheduler's lock so a
            # background commit never sees a half-written row.
            with scheduler.lock:
                # Save the original image file.
                if hasattr(image, 'save'):
                    # PIL Image object
                    image.save(image_path_orig, format='PNG')
                elif hasattr(image, 'shape'):
                    # Numpy array
                    PILImage.fromarray(image).save(image_path_orig, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported image type for {image_id}: {type(image)}")
                    failed_count += 1
                    continue

                # Save the annotated image file.
                if hasattr(annotated_image, 'save'):
                    annotated_image.save(image_path_ann, format='PNG')
                elif hasattr(annotated_image, 'shape'):
                    PILImage.fromarray(annotated_image).save(image_path_ann, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported annotated_image type for {image_id}: {type(annotated_image)}")
                    failed_count += 1
                    continue

                # Append the metadata row to the JSONL file.
                metadata = {
                    "id": image_id,
                    "file_name": unique_filename_orig,
                    "annotated_file_name": unique_filename_ann,
                }
                with jsonl_path.open("a", encoding="utf-8") as f:
                    json.dump(metadata, f, ensure_ascii=False)
                    f.write("\n")

            saved_count += 1
            print(f"[{process_name}] Saved image {saved_count}/{len(image_df)}: {image_id} -> {unique_filename_orig}")
        except Exception as e:
            print(f"[{process_name}] Error processing image {image_id}: {e}")
            failed_count += 1
            continue

    print(f"[{process_name}] Finished image dataset update:")
    print(f"[{process_name}]   - Successfully saved: {saved_count} images")
    print(f"[{process_name}]   - Failed: {failed_count} images")
    print(f"[{process_name}]   - Images will be committed to the dataset repository automatically")

    if saved_count == 0:
        print(f"[{process_name}] Warning: No images were successfully saved")

    return saved_count, failed_count
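# A load-back sketch (assumes jsonl_path was dataset_dir / "metadata.jsonl"
# and the scheduler has committed at least once): with a "file_name" column
# in each metadata row, the pushed repo matches the imagefolder layout, so
# the saved files come back as an "image" column.
#
#     from datasets import load_dataset
#
#     ds = load_dataset(os.environ["IMAGE_REPO_ID"], split="train")
#     print(ds[0]["id"], ds[0]["image"].size)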
def _push_splits_to_hub(splits_to_push: list, repo_id: str, hf_token: str, process_name: str):
    """
    Helper function to push individual splits to the Hub with a retry mechanism.

    Args:
        splits_to_push: List of (split_name, dataset) tuples
        repo_id: Hugging Face repository ID
        hf_token: Hugging Face token
        process_name: Process name for logging
    """
    max_retries = 5
    successful_splits = []
    failed_splits = []

    for split_name, split_dataset in splits_to_push:
        for attempt in range(max_retries):
            try:
                print(f"[{process_name}] Pushing split '{split_name}' with {len(split_dataset)} entries (Attempt {attempt + 1}/{max_retries})...")
                split_dataset.push_to_hub(
                    repo_id=repo_id,
                    split=split_name,  # Pushing a named split preserves the existing ones.
                    token=hf_token,
                    commit_message=f"feat: Add split '{split_name}' from {process_name} with {len(split_dataset)} entries"
                )
                print(f"[{process_name}] Split '{split_name}' pushed successfully on attempt {attempt + 1}.")
                successful_splits.append(split_name)
                break  # Exit the retry loop on success.
            except Exception as e:
                print(f"[{process_name}] Split '{split_name}' push attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    wait_time = 5
                    print(f"[{process_name}] Waiting for {wait_time} seconds before retrying...")
                    time.sleep(wait_time)
                else:
                    print(f"[{process_name}] All {max_retries} push attempts failed for split '{split_name}'.")
                    failed_splits.append(split_name)

    # Report the overall results.
    if successful_splits:
        print(f"[{process_name}] Successfully pushed splits: {successful_splits}")
    if failed_splits:
        print(f"[{process_name}] Failed to push splits: {failed_splits}")