# DeceptivePatternDetector/py_files/dataset_upload.py
import json
import os
import time
import uuid
from pathlib import Path

import datasets
import pandas as pd
from huggingface_hub import CommitScheduler, repo_exists
from PIL import Image as PILImage
def update_dataset_with_new_splits(new_splits: dict, process_name: str = "Main"):
"""
Add new splits to a regular HuggingFace dataset without downloading existing data.
This function pushes individual splits to the Hub using the split parameter,
which preserves all existing splits and only adds/updates the specified ones.
Key Features:
- No downloading of existing dataset required
- Existing splits are preserved (not overwritten)
- Each split is pushed individually using dataset.push_to_hub(split="name")
- Efficient for large datasets with many splits
Args:
new_splits: dict of {split_name: DataFrame} - splits to add/update
process_name: Name for logging and commit messages
Example:
new_splits = {
"validation_2024": val_df,
"test_batch_1": test_df,
"custom_split": custom_df
}
update_dataset_with_new_splits(new_splits)
"""
repo_id = os.environ["REPO_ID"]
hf_token = os.environ["HF_TOKEN"]
print(f"\n[{process_name}] Starting dataset splits update process...")
    # Verify the target repository exists before attempting any pushes.
if not repo_exists(repo_id, repo_type="dataset", token=hf_token):
print(f"[{process_name}] Repository {repo_id} not found. Cannot update.")
return
# Skip downloading existing dataset - we'll push only new splits
print(f"[{process_name}] Preparing to push {len(new_splits)} new splits individually...")
# Prepare each split for individual pushing
splits_to_push = []
    for split_id, df in new_splits.items():
        # preserve_index=False keeps a spurious __index_level_0__ column out of the split
        new_split_dataset = datasets.Dataset.from_pandas(df, preserve_index=False)
        splits_to_push.append((split_id, new_split_dataset))
        print(f"[{process_name}] Prepared split '{split_id}' with {len(new_split_dataset)} entries.")
# Push individual splits to Hub with Retry Mechanism
_push_splits_to_hub(splits_to_push, repo_id, hf_token, process_name)
print(f"[{process_name}] Finished pushing new dataset splits to Hub.")
def update_dataset_with_new_images(
    image_df: pd.DataFrame,
    process_name: str = "Main",
    scheduler: CommitScheduler = None,
    dataset_dir: Path = None,
    jsonl_path: Path = None,
):
"""
Add new images to an image HuggingFace dataset using smart approach:
- If dataset is empty/doesn't exist: Create proper HuggingFace dataset
- If dataset has data: Use CommitScheduler for efficient incremental updates
Key Features:
- Automatically detects empty datasets and bootstraps them
- Uses CommitScheduler for incremental updates on existing datasets
- Saves images as PNG files with unique names
- Stores metadata in JSONL format for file-based approach
- Thread-safe with scheduler locking
Args:
image_df: DataFrame with 'id' and 'image' columns (image should be PIL Image objects)
process_name: Name for logging and commit messages
Example:
img_df = pd.DataFrame([{"id": "img1", "image": pil_image}])
update_dataset_with_new_images(img_df)
"""
print(f"\n[{process_name}] Starting image dataset update...")
    # Validate that the scheduler wiring was provided
    if scheduler is None or dataset_dir is None or jsonl_path is None:
        raise ValueError("scheduler, dataset_dir, and jsonl_path must all be provided")
    # Validate input type
    if not isinstance(image_df, pd.DataFrame):
        raise ValueError(f"image_df must be a pandas DataFrame with 'id', 'image', and 'annotated_image' columns, got {type(image_df)}")
# Validate required columns
required_columns = ['id', 'image', 'annotated_image']
missing_columns = [col for col in required_columns if col not in image_df.columns]
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}. Found columns: {list(image_df.columns)}")
print(f"[{process_name}] Validated DataFrame with {len(image_df)} entries and columns: {list(image_df.columns)}")
return _append_images_with_scheduler(image_df, process_name, scheduler, dataset_dir, jsonl_path)
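# A minimal sketch of the wiring this function expects from its caller: a local
# folder watched by a CommitScheduler that periodically commits new PNGs and the
# JSONL metadata file to the image dataset repo. The folder names, path_in_repo,
# and the `every` interval are assumptions, not values from the real pipeline.
def _example_image_scheduler_setup():
    dataset_dir = Path("image_dataset")          # hypothetical local staging folder
    dataset_dir.mkdir(parents=True, exist_ok=True)
    jsonl_path = dataset_dir / "metadata.jsonl"  # metadata convention for imagefolder-style datasets
    scheduler = CommitScheduler(
        repo_id=os.environ["IMAGE_REPO_ID"],     # same env var the helper below logs
        repo_type="dataset",
        folder_path=dataset_dir,
        path_in_repo="data",                     # hypothetical target folder in the repo
        every=5,                                 # commit every 5 minutes (assumed interval)
        token=os.environ["HF_TOKEN"],
    )
    return scheduler, dataset_dir, jsonl_path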
def _append_images_with_scheduler(image_df: pd.DataFrame, process_name: str, scheduler, dataset_dir, jsonl_path):
"""
Append images to existing dataset using CommitScheduler for efficient incremental updates.
"""
print(f"[{process_name}] Using CommitScheduler for incremental updates...")
print(f"[IMAGE_SCHEDULER] Created CommitScheduler for {os.environ['IMAGE_REPO_ID']} with local path: {dataset_dir}")
# Process each image
saved_count = 0
failed_count = 0
    for idx, row in image_df.iterrows():
        image_id = row.get('id', '<unknown>')  # Resolved before the try so the except block can always log it
        try:
            image = row['image']
            annotated_image = row['annotated_image']  # Required: a missing annotation is treated as a failure
# Skip if image is None
if image is None:
print(f"[{process_name}] Skipping image {image_id}: image is None")
failed_count += 1
continue
if annotated_image is None:
print(f"[{process_name}] Warning: annotated_image is None for {image_id}")
failed_count += 1
continue
# Generate unique filename
unique_filename_orig = f"{uuid.uuid4()}_orig.png"
unique_filename_ann = f"{uuid.uuid4()}_ann.png"
image_path_orig = dataset_dir / unique_filename_orig
image_path_ann = dataset_dir / unique_filename_ann
# Save image and metadata with scheduler
with scheduler.lock:
                # Save the original image file
                if hasattr(image, 'save'):
                    # PIL Image object
                    image.save(image_path_orig, format='PNG')
                elif hasattr(image, 'shape'):
                    # Numpy array
                    PILImage.fromarray(image).save(image_path_orig, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported image type for {image_id}: {type(image)}")
                    failed_count += 1
                    continue
                # Save the annotated image file
                if hasattr(annotated_image, 'save'):
                    annotated_image.save(image_path_ann, format='PNG')
                elif hasattr(annotated_image, 'shape'):
                    PILImage.fromarray(annotated_image).save(image_path_ann, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported annotated_image type for {image_id}: {type(annotated_image)}")
                    # Clean up the orphaned original PNG so no unreferenced file gets committed
                    image_path_orig.unlink(missing_ok=True)
                    failed_count += 1
                    continue
# Append metadata to JSONL
metadata = {
"id": image_id,
"file_name": unique_filename_orig,
"annotated_file_name": unique_filename_ann
}
with jsonl_path.open("a", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False)
f.write("\n")
saved_count += 1
print(f"[{process_name}] Saved image {saved_count}/{len(image_df)}: {image_id} -> {unique_filename_orig}")
except Exception as e:
print(f"[{process_name}] Error processing image {image_id}: {e}")
failed_count += 1
continue
print(f"[{process_name}] Finished image dataset update:")
print(f"[{process_name}] - Successfully saved: {saved_count} images")
print(f"[{process_name}] - Failed: {failed_count} images")
print(f"[{process_name}] - Images will be automatically committed to dataset repository")
if saved_count == 0:
print(f"[{process_name}] Warning: No images were successfully saved")
return saved_count, failed_count
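# For reference, each line appended to the JSONL metadata file looks like this
# (illustrative UUIDs):
# {"id": "img1", "file_name": "3f2b..._orig.png", "annotated_file_name": "9c1d..._ann.png"}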
def _push_splits_to_hub(splits_to_push: list, repo_id: str, hf_token: str, process_name: str):
"""
Helper function to push individual splits to Hub with retry mechanism.
Args:
splits_to_push: List of (split_name, dataset) tuples
repo_id: HuggingFace repository ID
hf_token: HuggingFace token
process_name: Process name for logging
"""
max_retries = 5
successful_splits = []
failed_splits = []
    for split_name, split_dataset in splits_to_push:
        for attempt in range(max_retries):
            try:
                print(f"[{process_name}] Pushing split '{split_name}' with {len(split_dataset)} entries (Attempt {attempt + 1}/{max_retries})...")
split_dataset.push_to_hub(
repo_id=repo_id,
split=split_name, # This preserves existing splits
token=hf_token,
commit_message=f"feat: Add split '{split_name}' from {process_name} with {len(split_dataset)} entries"
)
print(f"[{process_name}] Split '{split_name}' pushed successfully on attempt {attempt + 1}.")
successful_splits.append(split_name)
break # Exit retry loop on success
except Exception as e:
print(f"[{process_name}] Split '{split_name}' push attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
wait_time = 5
print(f"[{process_name}] Waiting for {wait_time} seconds before retrying...")
time.sleep(wait_time)
else:
print(f"[{process_name}] All {max_retries} push attempts failed for split '{split_name}'.")
failed_splits.append(split_name)
# Report results
if successful_splits:
print(f"[{process_name}] Successfully pushed splits: {successful_splits}")
if failed_splits:
print(f"[{process_name}] Failed to push splits: {failed_splits}")