File size: 12,717 Bytes
5c97387
 
97353a3
 
d6558bb
5c97387
 
 
 
 
 
 
 
97353a3
5c97387
97353a3
 
4425935
97353a3
 
 
4425935
 
 
97353a3
 
 
 
d6558bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97353a3
 
 
 
 
 
 
 
4425935
97353a3
 
a76c50f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4425935
a76c50f
 
 
4425935
a76c50f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4425935
a76c50f
 
 
 
 
 
 
 
 
 
 
4425935
a76c50f
 
4425935
a76c50f
4425935
a76c50f
4425935
 
 
a76c50f
 
 
 
 
 
 
 
4425935
 
 
97353a3
 
 
 
5c97387
 
a76c50f
 
5c97387
a76c50f
4425935
a76c50f
97353a3
 
 
a76c50f
97353a3
 
 
 
a76c50f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5c97387
 
a76c50f
 
 
4425935
5c97387
 
 
 
4425935
 
a76c50f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5c97387
 
4425935
a76c50f
4425935
 
 
 
a76c50f
5c97387
d6558bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97353a3
a76c50f
97353a3
5c97387
 
97353a3
5c97387
 
a76c50f
5c97387
4425935
a76c50f
5c97387
a76c50f
97353a3
 
 
d6558bb
 
 
 
 
 
 
97353a3
 
a76c50f
d6558bb
a76c50f
97353a3
 
 
 
4425935
97353a3
4425935
a76c50f
97353a3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import json
import logging
import datasets
from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
from huggingface_hub import HfApi, DatasetCard, DatasetCardData

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DatasetCommandCenter:
    def __init__(self, token=None):
        self.token = token

    def get_dataset_metadata(self, dataset_id):
        """Fetch config names and a license hint for a hub dataset.

        Args:
            dataset_id: Hub repo id, e.g. ``"user/my-dataset"``.

        Returns:
            ``{"status": "success", "configs": [...], "license_detected": str}``
            on success, or ``{"status": "error", "message": str}`` on failure.
        """
        try:
            # Config discovery is best-effort: fall back to a single
            # 'default' entry when the hub call fails. (The original
            # fetched configs AND infos twice and computed splits that
            # were never returned; splits are resolved per-config later
            # by get_splits_for_config.)
            try:
                configs = get_dataset_config_names(dataset_id, token=self.token)
            except Exception:
                configs = ['default']

            # License lives in the per-config DatasetInfo; read it from
            # the first config available and degrade to "unknown".
            license_name = "unknown"
            try:
                infos = get_dataset_infos(dataset_id, token=self.token)
                first = list(infos.values())[0]
                license_name = first.license or "unknown"
            except Exception:
                pass

            return {
                "status": "success",
                "configs": configs,
                # We assume user will pick splits later, just return configs + license hint
                "license_detected": license_name
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_splits_for_config(self, dataset_id, config_name):
        """Return the split names for one config, with a generic fallback.

        Never raises: when hub metadata cannot be read (missing config,
        auth, network), the conventional train/test/validation triple is
        returned so the UI stays usable.
        """
        try:
            infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
            splits = list(infos[config_name].splits.keys())
            return {"status": "success", "splits": splits}
        except Exception:
            # Narrowed from a bare except; intent (best-effort default)
            # is preserved.
            return {"status": "success", "splits": ['train', 'test', 'validation']}

    # --- HELPER: Recursive JSON/Dot Notation Getter ---
    def _get_value_by_path(self, obj, path):
        if not path: return obj
        keys = path.split('.')
        current = obj
        
        for key in keys:
            # Auto-parse JSON string if encountered
            if isinstance(current, str):
                s = current.strip()
                if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                    try:
                        current = json.loads(s)
                    except:
                        pass 

            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current

    # --- HELPER: List Search Logic ---
    def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
        """
        Logic: Look inside row[source_col] (which is a list).
        Find first item where item[filter_key] == filter_val.
        Then extract item[target_path].
        """
        # 1. Get the list (handling JSON string if needed)
        data = row.get(source_col)
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except:
                return None
        
        if not isinstance(data, list):
            return None

        # 2. Search the list
        matched_item = None
        for item in data:
            # We treat values as strings for comparison to be safe
            if str(item.get(filter_key, '')) == str(filter_val):
                matched_item = item
                break
        
        if matched_item:
            # 3. Extract the target (supporting nested json parsing via dot notation)
            # e.g. target_path = "content.analysis"
            return self._get_value_by_path(matched_item, target_path)
        
        return None

    def _flatten_schema(self, obj, parent='', visited=None):
        if visited is None: visited = set()
        items = []
        
        # Avoid infinite recursion
        if id(obj) in visited: return []
        visited.add(id(obj))

        # Handle JSON strings
        if isinstance(obj, str):
            s = obj.strip()
            if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                try:
                    obj = json.loads(s)
                except:
                    pass

        if isinstance(obj, dict):
            for k, v in obj.items():
                full_key = f"{parent}.{k}" if parent else k
                items.append((full_key, type(v).__name__))
                items.extend(self._flatten_schema(v, full_key, visited))
        elif isinstance(obj, list) and len(obj) > 0:
            # For lists, we just peek at the first item to guess schema
            full_key = f"{parent}[]" if parent else "[]"
            items.append((parent, "List")) # Mark the parent as a List
            items.extend(self._flatten_schema(obj[0], full_key, visited))
            
        return items

    def inspect_dataset(self, dataset_id, config, split):
        """Stream the first 10 rows of a split and infer a coarse schema.

        Returns ``{"status": "success", "samples": [...], "schema": {...},
        "dataset_id": ...}`` where schema maps each column to
        ``{"type": "List"|"Object", "keys": [...]}``, or an error dict.
        """
        try:
            conf = config if config != 'default' else None
            ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)

            sample_rows = []
            schema_map = {}  # col_name -> {"is_list": bool, "keys": set}

            for i, row in enumerate(ds_stream):
                if i >= 10:
                    break

                # Stringify exotic types (images, tensors, ...) so the
                # sample stays JSON-serializable for the UI.
                clean_row = {}
                for k, v in row.items():
                    if isinstance(v, (str, int, float, bool, list, dict, type(None))):
                        clean_row[k] = v
                    else:
                        clean_row[k] = str(v)
                sample_rows.append(clean_row)

                # Accumulate structural hints per column.
                for k, v in row.items():
                    entry = schema_map.setdefault(k, {"is_list": False, "keys": set()})

                    # A column may hold its list/dict as a JSON string.
                    val = v
                    if isinstance(val, str):
                        try:
                            val = json.loads(val)
                        except ValueError:
                            # Not JSON; keep the raw string (narrowed
                            # from a bare except).
                            pass

                    if isinstance(val, list):
                        entry["is_list"] = True
                        if val and isinstance(val[0], dict):
                            entry["keys"].update(val[0].keys())
                    elif isinstance(val, dict):
                        entry["keys"].update(val.keys())

            # Sets are unordered; sort keys for a stable UI display.
            formatted_schema = {
                k: {
                    "type": "List" if info["is_list"] else "Object",
                    "keys": sorted(info["keys"]),
                }
                for k, info in schema_map.items()
            }

            return {
                "status": "success",
                "samples": sample_rows,
                "schema": formatted_schema,
                "dataset_id": dataset_id
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def _apply_projection(self, row, recipe):
        new_row = {}
        for col_def in recipe['columns']:
            t_type = col_def.get('type', 'simple')
            
            if t_type == 'simple':
                # Standard Dot Notation
                new_row[col_def['name']] = self._get_value_by_path(row, col_def['source'])
                
            elif t_type == 'list_search':
                # GET x WHERE y=z
                val = self._extract_from_list_logic(
                    row, 
                    col_def['source'], 
                    col_def['filter_key'], 
                    col_def['filter_val'], 
                    col_def['target_key']
                )
                new_row[col_def['name']] = val
                
            elif t_type == 'python':
                # Advanced Python Eval
                try:
                    context = row.copy()
                    # We inject 'json' module into context for user scripts
                    context['json'] = json 
                    val = eval(col_def['expression'], {}, context)
                    new_row[col_def['name']] = val
                except:
                    new_row[col_def['name']] = None
                    
        return new_row

    def _passes_filter(self, row, filter_str):
        if not filter_str: return True
        try:
            context = row.copy()
            return eval(filter_str, {}, context)
        except:
            return False

            
    def _generate_card(self, source_id, target_id, recipe, license_name):
        """
        Generates a README.md with YAML metadata and a report of operations.
        """
        
        # 1. YAML Metadata
        card_data = DatasetCardData(
            language="en",
            license=license_name,
            tags=["dataset-command-center", "etl", "generated-dataset"],
            base_model=source_id, # Linking source
        )
        
        # 2. Description & Recipe Table
        content = f"""
# {target_id.split('/')[-1]}

This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
It was generated using the **Hugging Face Dataset Command Center**.

## Transformation Recipe

The following operations were applied to the source data:

| Target Column | Source | Type | Logic / Filter |
|---------------|--------|------|----------------|
"""
        
        for col in recipe['columns']:
            c_type = col.get('type', 'simple')
            c_name = col['name']
            c_src = col.get('source', '-')
            
            if c_type == 'simple':
                logic = "Direct Mapping"
            elif c_type == 'list_search':
                logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
            elif c_type == 'python':
                logic = f"`{col.get('expression')}`"
            else:
                logic = "-"
            
            content += f"| **{c_name}** | `{c_src}` | {c_type} | {logic} |\n"

        if recipe.get('filter_rule'):
            content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"

        content += f"\n## Original License\nThis dataset inherits the license: `{license_name}` from the source."

        card = DatasetCard.from_template(card_data, content=content)
        return card

    def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, license_name=None):
        """Stream-transform *source_id* and push the result to *target_id*.

        Args:
            source_id: Hub id of the dataset to read.
            config: Config name; 'default' means no named config.
            split: Split to stream (e.g. "train").
            target_id: Hub id to push the transformed dataset to.
            recipe: Column/filter recipe (see ``_apply_projection``).
            max_rows: Optional cap on emitted rows.
            license_name: License recorded on the generated card; defaults
                to "unknown". (New optional parameter — the original
                referenced an undefined ``new_license``, a NameError that
                was silently swallowed so the card was never pushed.)

        Returns:
            ``{"status": "success", "rows_processed": int}`` or an error dict.
        """
        logger.info(f"Job started: {source_id}")
        conf = config if config != 'default' else None

        def gen():
            # Streaming keeps memory flat regardless of dataset size.
            ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
            count = 0
            for row in ds_stream:
                if max_rows and count >= int(max_rows):
                    break
                if self._passes_filter(row, recipe.get('filter_rule')):
                    yield self._apply_projection(row, recipe)
                    count += 1

        try:
            new_dataset = datasets.Dataset.from_generator(gen)
            new_dataset.push_to_hub(target_id, token=self.token)
            # Card push is best-effort: the data is already on the hub.
            try:
                card = self._generate_card(source_id, target_id, recipe, license_name or "unknown")
                card.push_to_hub(target_id, token=self.token)
            except Exception as e:
                logger.warning(f"Could not push dataset card: {e}")

            return {"status": "success", "rows_processed": len(new_dataset)}
        except Exception as e:
            return {"status": "error", "message": str(e)}

            
    def preview_transform(self, dataset_id, config, split, recipe):
        """Apply the recipe to a live stream and return up to 5 result rows.

        Unlike process_and_push, errors propagate to the caller so the UI
        can surface them during recipe authoring.
        """
        name = None if config == 'default' else config
        stream = load_dataset(dataset_id, name=name, split=split, streaming=True, token=self.token)
        rule = recipe.get('filter_rule')
        preview = []
        for record in stream:
            if len(preview) >= 5:
                break
            if self._passes_filter(record, rule):
                preview.append(self._apply_projection(record, recipe))
        return preview