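"""Utilities for incrementally updating HuggingFace Hub datasets.

Two public entry points:

- ``update_dataset_with_new_splits``: push individual splits of a regular
  dataset to the Hub without downloading the existing data.
- ``update_dataset_with_new_images``: append image files plus JSONL metadata
  to an image dataset through a ``CommitScheduler``.
"""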
import json
import os
import time
import uuid
from pathlib import Path

import datasets
import pandas as pd
from huggingface_hub import repo_exists, CommitScheduler


def update_dataset_with_new_splits(new_splits: dict, process_name: str = "Main"):
    """
    Add new splits to a regular HuggingFace dataset without downloading existing data.
    
    This function pushes individual splits to the Hub using the split parameter,
    which preserves all existing splits and only adds/updates the specified ones.
    
    Key Features:
    - No downloading of existing dataset required
    - Existing splits are preserved (not overwritten)
    - Each split is pushed individually using dataset.push_to_hub(split="name")
    - Efficient for large datasets with many splits
    
    Args:
        new_splits: dict of {split_name: DataFrame} - splits to add/update
        process_name: Name for logging and commit messages
                         
    Example:
        new_splits = {
            "validation_2024": val_df,
            "test_batch_1": test_df,
            "custom_split": custom_df
        }
        update_dataset_with_new_splits(new_splits)
    """
    repo_id = os.environ["REPO_ID"]
    hf_token = os.environ["HF_TOKEN"]
    print(f"\n[{process_name}] Starting dataset splits update process...")

    # Make sure the target repository exists before doing any work.
    if not repo_exists(repo_id, repo_type="dataset", token=hf_token):
        print(f"[{process_name}] Repository {repo_id} not found. Cannot update.")
        return

    # Skip downloading existing dataset - we'll push only new splits
    print(f"[{process_name}] Preparing to push {len(new_splits)} new splits individually...")
    
    # Prepare each split for individual pushing
    splits_to_push = []
    for split_id, df in new_splits.items():
        new_split_dataset = datasets.Dataset.from_pandas(df)
        splits_to_push.append((split_id, new_split_dataset))
        print(f"[{process_name}] Prepared split '{split_id}' with {len(new_split_dataset)} entries.")

    # Push individual splits to the Hub with a retry mechanism
    _push_splits_to_hub(splits_to_push, repo_id, hf_token, process_name)
    print(f"[{process_name}] Finished pushing new dataset splits to Hub.")


def update_dataset_with_new_images(image_df: pd.DataFrame, process_name: str = "Main", scheduler: CommitScheduler = None, dataset_dir: Path = None, jsonl_path: Path = None):
    """
    Add new images to an image HuggingFace dataset using a CommitScheduler
    for efficient incremental updates.

    Key Features:
    - Uses CommitScheduler for incremental updates, so existing data is never re-downloaded
    - Saves original and annotated images as PNG files with unique names
    - Stores metadata in JSONL format for the file-based approach
    - Thread-safe via the scheduler's lock

    Args:
        image_df: DataFrame with 'id', 'image' and 'annotated_image' columns
                  (images should be PIL Image objects or numpy arrays)
        process_name: Name for logging and commit messages
        scheduler: CommitScheduler that periodically commits dataset_dir to the Hub
        dataset_dir: Local folder, watched by the scheduler, where image files are written
        jsonl_path: Path to the metadata JSONL file inside dataset_dir

    Example:
        img_df = pd.DataFrame([{"id": "img1", "image": pil_image, "annotated_image": annotated_pil_image}])
        update_dataset_with_new_images(img_df, scheduler=scheduler, dataset_dir=dataset_dir, jsonl_path=jsonl_path)
    """
    print(f"\n[{process_name}] Starting image dataset update...")

    # Validate input format for image datasets
    if not hasattr(image_df, 'columns'):
        raise ValueError(f"image_df must be a pandas DataFrame with 'id' and 'image' columns, got {type(image_df)}")
    
    # Validate required columns
    required_columns = ['id', 'image', 'annotated_image']
    missing_columns = [col for col in required_columns if col not in image_df.columns]
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}. Found columns: {list(image_df.columns)}")
    
    print(f"[{process_name}] Validated DataFrame with {len(image_df)} entries and columns: {list(image_df.columns)}")

    return _append_images_with_scheduler(image_df, process_name, scheduler, dataset_dir, jsonl_path)


def _append_images_with_scheduler(image_df: pd.DataFrame, process_name: str, scheduler, dataset_dir, jsonl_path):
    """
    Append images to existing dataset using CommitScheduler for efficient incremental updates.
    """

    print(f"[{process_name}] Using CommitScheduler for incremental updates...")

    print(f"[IMAGE_SCHEDULER] Created CommitScheduler for {os.environ['IMAGE_REPO_ID']} with local path: {dataset_dir}")
    
    # Process each image
    saved_count = 0
    failed_count = 0
    
    for idx, row in image_df.iterrows():
        try:
            image_id = row['id']
            image = row['image']
            annotated_image = row['annotated_image']
            
            # Skip if image is None
            if image is None:
                print(f"[{process_name}] Skipping image {image_id}: image is None")
                failed_count += 1
                continue

            if annotated_image is None:
                print(f"[{process_name}] Warning: annotated_image is None for {image_id}")
                failed_count += 1
                continue
            
            # Generate unique filename
            unique_filename_orig = f"{uuid.uuid4()}_orig.png"
            unique_filename_ann = f"{uuid.uuid4()}_ann.png"

            image_path_orig = dataset_dir / unique_filename_orig
            image_path_ann = dataset_dir / unique_filename_ann
            
            # Save image and metadata with scheduler
            with scheduler.lock:
                # Save image file
                if hasattr(image, 'save'):
                    # PIL Image object
                    image.save(image_path_orig, format='PNG')
                elif hasattr(image, 'shape'):
                    # Numpy array
                    from PIL import Image as PILImage
                    PILImage.fromarray(image).save(image_path_orig, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported image type for {image_id}: {type(image)}")
                    failed_count += 1
                    continue

                # Save annotated image file
                if hasattr(annotated_image, 'save'):
                    annotated_image.save(image_path_ann, format='PNG')
                elif hasattr(annotated_image, 'shape'):
                    from PIL import Image as PILImage
                    PILImage.fromarray(annotated_image).save(image_path_ann, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported annotated_image type for {image_id}: {type(annotated_image)}")
                    failed_count += 1
                    continue
                
                # Append metadata to JSONL
                metadata = {
                    "id": image_id,
                    "file_name": unique_filename_orig,
                    "annotated_file_name": unique_filename_ann
                }

                with jsonl_path.open("a", encoding="utf-8") as f:
                    json.dump(metadata, f, ensure_ascii=False)
                    f.write("\n")
            
            saved_count += 1
            print(f"[{process_name}] Saved image {saved_count}/{len(image_df)}: {image_id} -> {unique_filename_orig}")
            
        except Exception as e:
            # Use row.get so the log still works if the failure happened before image_id was assigned
            failed_id = row.get('id', '<unknown>')
            print(f"[{process_name}] Error processing image {failed_id}: {e}")
            failed_count += 1
            continue
    
    print(f"[{process_name}] Finished image dataset update:")
    print(f"[{process_name}] - Successfully saved: {saved_count} images")
    print(f"[{process_name}] - Failed: {failed_count} images")
    print(f"[{process_name}] - Images will be automatically committed to dataset repository")
    
    if saved_count == 0:
        print(f"[{process_name}] Warning: No images were successfully saved")
        
    return saved_count, failed_count


def _push_splits_to_hub(splits_to_push: list, repo_id: str, hf_token: str, process_name: str):
    """
    Helper function to push individual splits to Hub with retry mechanism.
    
    Args:
        splits_to_push: List of (split_name, dataset) tuples
        repo_id: HuggingFace repository ID
        hf_token: HuggingFace token
        process_name: Process name for logging
    """
    max_retries = 5
    successful_splits = []
    failed_splits = []
    
    for split_name, split_dataset in splits_to_push:
        print(f"[{process_name}] Pushing split '{split_name}' with {len(split_dataset)} entries...")
        
        for attempt in range(max_retries):
            try:
                print(f"[{process_name}] Pushing split '{split_name}' (Attempt {attempt + 1}/{max_retries})...")
                split_dataset.push_to_hub(
                    repo_id=repo_id,
                    split=split_name,  # This preserves existing splits
                    token=hf_token,
                    commit_message=f"feat: Add split '{split_name}' from {process_name} with {len(split_dataset)} entries"
                )
                print(f"[{process_name}] Split '{split_name}' pushed successfully on attempt {attempt + 1}.")
                successful_splits.append(split_name)
                break  # Exit retry loop on success
            except Exception as e:
                print(f"[{process_name}] Split '{split_name}' push attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    wait_time = 5
                    print(f"[{process_name}] Waiting for {wait_time} seconds before retrying...")
                    time.sleep(wait_time)
                else:
                    print(f"[{process_name}] All {max_retries} push attempts failed for split '{split_name}'.")
                    failed_splits.append(split_name)
    
    # Report results
    if successful_splits:
        print(f"[{process_name}] Successfully pushed splits: {successful_splits}")
    if failed_splits:
        print(f"[{process_name}] Failed to push splits: {failed_splits}")