# DeceptivePatternDetector / py_files / dataset_upload.py
# Author: Asmit — "Initial commit" (9012453)
import json
import os
import tempfile
import time
import uuid
from pathlib import Path
import datasets
import pandas as pd
from datasets import Image
from huggingface_hub import repo_exists, CommitScheduler
def update_dataset_with_new_splits(new_splits: dict, process_name: str = "Main"):
    """
    Add new splits to a regular HuggingFace dataset without downloading existing data.

    Each split is pushed individually via ``push_to_hub(split=name)``, which
    preserves all existing splits on the Hub and only adds/updates the ones
    given — efficient for large datasets with many splits.

    Args:
        new_splits: Mapping of {split_name: DataFrame} — splits to add/update.
        process_name: Label used in log output and commit messages.

    Environment:
        REPO_ID: Target HuggingFace dataset repository id.
        HF_TOKEN: HuggingFace auth token.

    Example:
        new_splits = {
            "validation_2024": val_df,
            "test_batch_1": test_df,
            "custom_split": custom_df
        }
        update_dataset_with_new_splits(new_splits)
    """
    # Nothing to push — skip the repo_exists() network round-trip entirely.
    if not new_splits:
        print(f"[{process_name}] No new splits provided; nothing to push.")
        return
    repo_id = os.environ["REPO_ID"]
    hf_token = os.environ["HF_TOKEN"]
    print(f"\n[{process_name}] Starting dataset splits update process...")
    # --- Start of Critical Section ---
    if not repo_exists(repo_id, repo_type="dataset", token=hf_token):
        print(f"[{process_name}] Repository {repo_id} not found. Cannot update.")
        return
    # Skip downloading existing dataset - we'll push only new splits
    print(f"[{process_name}] Preparing to push {len(new_splits)} new splits individually...")
    # Convert every DataFrame up front so a malformed frame fails before any push.
    splits_to_push = []
    for split_id, df in new_splits.items():
        new_split_dataset = datasets.Dataset.from_pandas(df)
        splits_to_push.append((split_id, new_split_dataset))
        print(f"[{process_name}] Prepared split '{split_id}' with {len(new_split_dataset)} entries.")
    # Push individual splits to Hub with retry mechanism
    _push_splits_to_hub(splits_to_push, repo_id, hf_token, process_name)
    print(f"[{process_name}] Finished pushing new dataset splits to Hub.")
def update_dataset_with_new_images(image_df: pd.DataFrame, process_name: str = "Main", scheduler: CommitScheduler=None, dataset_dir: Path=None, jsonl_path: Path=None):
"""
Add new images to an image HuggingFace dataset using smart approach:
- If dataset is empty/doesn't exist: Create proper HuggingFace dataset
- If dataset has data: Use CommitScheduler for efficient incremental updates
Key Features:
- Automatically detects empty datasets and bootstraps them
- Uses CommitScheduler for incremental updates on existing datasets
- Saves images as PNG files with unique names
- Stores metadata in JSONL format for file-based approach
- Thread-safe with scheduler locking
Args:
image_df: DataFrame with 'id' and 'image' columns (image should be PIL Image objects)
process_name: Name for logging and commit messages
Example:
img_df = pd.DataFrame([{"id": "img1", "image": pil_image}])
update_dataset_with_new_images(img_df)
"""
print(f"\n[{process_name}] Starting image dataset update...")
# Validate input format for image datasets
if not hasattr(image_df, 'columns'):
raise ValueError(f"image_df must be a pandas DataFrame with 'id' and 'image' columns, got {type(image_df)}")
# Validate required columns
required_columns = ['id', 'image', 'annotated_image']
missing_columns = [col for col in required_columns if col not in image_df.columns]
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}. Found columns: {list(image_df.columns)}")
print(f"[{process_name}] Validated DataFrame with {len(image_df)} entries and columns: {list(image_df.columns)}")
return _append_images_with_scheduler(image_df, process_name, scheduler, dataset_dir, jsonl_path)
def _save_image_as_png(img, path):
    """Write a PIL Image or numpy array to *path* as PNG.

    Returns True on success, False when the object type is unsupported.
    """
    if hasattr(img, 'save'):
        # PIL Image object (or anything PIL-like exposing .save)
        img.save(path, format='PNG')
        return True
    if hasattr(img, 'shape'):
        # Numpy array — round-trip through PIL for the PNG encode.
        from PIL import Image as PILImage
        PILImage.fromarray(img).save(path, format='PNG')
        return True
    return False


def _append_images_with_scheduler(image_df: pd.DataFrame, process_name: str, scheduler, dataset_dir, jsonl_path):
    """
    Append images to existing dataset using CommitScheduler for efficient incremental updates.

    For each row, the original and annotated images are written as uniquely
    named PNG files under *dataset_dir* and one metadata record is appended to
    *jsonl_path*. File writes happen under ``scheduler.lock`` so the scheduler
    never commits a half-written entry. Per-row failures are counted and
    logged but do not abort the batch.

    Returns:
        (saved_count, failed_count) tuple.
    """
    print(f"[{process_name}] Using CommitScheduler for incremental updates...")
    print(f"[IMAGE_SCHEDULER] Created CommitScheduler for {os.environ['IMAGE_REPO_ID']} with local path: {dataset_dir}")
    # Process each image
    saved_count = 0
    failed_count = 0
    for _idx, row in image_df.iterrows():
        image_id = None  # bound before try so the except handler can always log it
        try:
            image_id = row['id']
            image = row['image']
            annotated_image = row['annotated_image']
            # Skip rows with missing images rather than aborting the batch.
            if image is None:
                print(f"[{process_name}] Skipping image {image_id}: image is None")
                failed_count += 1
                continue
            if annotated_image is None:
                print(f"[{process_name}] Warning: annotated_image is None for {image_id}")
                failed_count += 1
                continue
            # Unique filenames avoid collisions across concurrent batches.
            unique_filename_orig = f"{uuid.uuid4()}_orig.png"
            unique_filename_ann = f"{uuid.uuid4()}_ann.png"
            image_path_orig = dataset_dir / unique_filename_orig
            image_path_ann = dataset_dir / unique_filename_ann
            # Hold the scheduler lock so files and metadata land in one commit.
            with scheduler.lock:
                if not _save_image_as_png(image, image_path_orig):
                    print(f"[{process_name}] Warning: Unsupported image type for {image_id}: {type(image)}")
                    failed_count += 1
                    continue
                if not _save_image_as_png(annotated_image, image_path_ann):
                    print(f"[{process_name}] Warning: Unsupported annotated_image type for {image_id}: {type(annotated_image)}")
                    failed_count += 1
                    continue
                # Append metadata to JSONL
                metadata = {
                    "id": image_id,
                    "file_name": unique_filename_orig,
                    "annotated_file_name": unique_filename_ann
                }
                with jsonl_path.open("a", encoding="utf-8") as f:
                    json.dump(metadata, f, ensure_ascii=False)
                    f.write("\n")
                saved_count += 1
                print(f"[{process_name}] Saved image {saved_count}/{len(image_df)}: {image_id} -> {unique_filename_orig}")
        except Exception as e:
            print(f"[{process_name}] Error processing image {image_id}: {e}")
            failed_count += 1
            continue
    print(f"[{process_name}] Finished image dataset update:")
    print(f"[{process_name}] - Successfully saved: {saved_count} images")
    print(f"[{process_name}] - Failed: {failed_count} images")
    print(f"[{process_name}] - Images will be automatically committed to dataset repository")
    if saved_count == 0:
        print(f"[{process_name}] Warning: No images were successfully saved")
    return saved_count, failed_count
def _push_splits_to_hub(splits_to_push: list, repo_id: str, hf_token: str, process_name: str):
    """
    Helper function to push individual splits to Hub with retry mechanism.

    Each split is pushed with ``push_to_hub(split=name)`` so existing splits in
    the repository are preserved. A failing split is retried up to five times
    with a short pause between attempts; one split failing does not prevent the
    remaining splits from being pushed.

    Args:
        splits_to_push: List of (split_name, dataset) tuples
        repo_id: HuggingFace repository ID
        hf_token: HuggingFace token
        process_name: Process name for logging

    Returns:
        (successful_splits, failed_splits) lists of split names, so callers
        can react to partial failures.
    """
    max_retries = 5
    retry_delay_seconds = 5
    successful_splits = []
    failed_splits = []
    for split_name, split_dataset in splits_to_push:
        print(f"[{process_name}] Pushing split '{split_name}' with {len(split_dataset)} entries...")
        for attempt in range(max_retries):
            try:
                print(f"[{process_name}] Pushing split '{split_name}' (Attempt {attempt + 1}/{max_retries})...")
                split_dataset.push_to_hub(
                    repo_id=repo_id,
                    split=split_name,  # split= preserves all other existing splits
                    token=hf_token,
                    commit_message=f"feat: Add split '{split_name}' from {process_name} with {len(split_dataset)} entries"
                )
                print(f"[{process_name}] Split '{split_name}' pushed successfully on attempt {attempt + 1}.")
                successful_splits.append(split_name)
                break  # Exit retry loop on success
            except Exception as e:
                print(f"[{process_name}] Split '{split_name}' push attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    print(f"[{process_name}] Waiting for {retry_delay_seconds} seconds before retrying...")
                    time.sleep(retry_delay_seconds)
                else:
                    print(f"[{process_name}] All {max_retries} push attempts failed for split '{split_name}'.")
                    failed_splits.append(split_name)
    # Report results
    if successful_splits:
        print(f"[{process_name}] Successfully pushed splits: {successful_splits}")
    if failed_splits:
        print(f"[{process_name}] Failed to push splits: {failed_splits}")
    return successful_splits, failed_splits