broadfield-dev commited on
Commit
cca5bb1
·
verified ·
1 Parent(s): 2f3537a

Update processor.py

Browse files
Files changed (1) hide show
  1. processor.py +41 -59
processor.py CHANGED
@@ -21,8 +21,8 @@ class DatasetCommandCenter:
21
 
22
  def get_dataset_metadata(self, dataset_id):
23
  """
24
- Robustly fetches Configs and Splits.
25
- Handles 404 errors gracefully if metadata files are missing.
26
  """
27
  configs = ['default']
28
  splits = ['train', 'test', 'validation']
@@ -35,12 +35,11 @@ class DatasetCommandCenter:
35
  if found_configs:
36
  configs = found_configs
37
  except Exception:
38
- pass # Keep default
39
 
40
  # 2. Fetch Metadata (Splits & License)
41
  try:
42
  selected = configs[0]
43
- # This API call can fail on some datasets, so we wrap it safely
44
  infos = get_dataset_infos(dataset_id, token=self.token)
45
 
46
  info = None
@@ -55,7 +54,7 @@ class DatasetCommandCenter:
55
  splits = list(info.splits.keys())
56
  license_name = info.license or "unknown"
57
  except Exception:
58
- pass # Keep defaults if metadata fails
59
 
60
  return {
61
  "status": "success",
@@ -67,9 +66,6 @@ class DatasetCommandCenter:
67
  return {"status": "error", "message": str(e)}
68
 
69
  def get_splits_for_config(self, dataset_id, config_name):
70
- """
71
- Updates the Split dropdown when the user changes the Config.
72
- """
73
  try:
74
  infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
75
  if config_name in infos:
@@ -85,7 +81,7 @@ class DatasetCommandCenter:
85
  def _sanitize_for_json(self, obj):
86
  """
87
  Recursively cleans data for JSON serialization.
88
- CRITICAL FIX: Prevents 'Preview' crashes caused by NaN, Infinity, or Timestamps.
89
  """
90
  if isinstance(obj, float):
91
  if math.isnan(obj) or math.isinf(obj):
@@ -98,17 +94,16 @@ class DatasetCommandCenter:
98
  elif isinstance(obj, (str, int, bool, type(None))):
99
  return obj
100
  else:
101
- # Convert complex objects (Images, Dates) to string
102
  return str(obj)
103
 
104
  def _flatten_object(self, obj, parent_key='', sep='.'):
105
  """
106
- Recursively finds all keys in nested dicts or JSON strings
107
- to populate the 'Simple Path' dropdown in the UI.
108
  """
109
  items = {}
110
 
111
- # Transparently parse JSON strings
112
  if isinstance(obj, str):
113
  s = obj.strip()
114
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
@@ -122,39 +117,34 @@ class DatasetCommandCenter:
122
  new_key = f"{parent_key}{sep}{k}" if parent_key else k
123
  items.update(self._flatten_object(v, new_key, sep=sep))
124
  elif isinstance(obj, list):
125
- # We mark lists but do not recurse infinitely
126
  new_key = f"{parent_key}" if parent_key else "list_content"
127
  items[new_key] = "List"
128
  else:
129
- # Leaf node
130
  items[parent_key] = type(obj).__name__
131
 
132
  return items
133
 
134
  def inspect_dataset(self, dataset_id, config, split):
135
- """
136
- Scans the first 10 rows to build a Schema Tree for the UI.
137
- """
138
  try:
139
  conf = config if config != 'default' else None
140
  ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
141
 
142
  sample_rows = []
143
  available_paths = set()
144
- schema_map = {} # Used for List Mode detection
145
 
146
  for i, row in enumerate(ds_stream):
147
  if i >= 10: break
148
 
149
- # 1. Clean row for UI Preview (Handle NaN/Objects)
150
  clean_row = self._sanitize_for_json(row)
151
  sample_rows.append(clean_row)
152
 
153
- # 2. Deep Flattening for "Simple Path" dropdowns
154
  flattened = self._flatten_object(row)
155
  available_paths.update(flattened.keys())
156
 
157
- # 3. Top Level Analysis for "List Mode" detection
158
  for k, v in row.items():
159
  if k not in schema_map:
160
  schema_map[k] = {"type": "Object"}
@@ -167,7 +157,6 @@ class DatasetCommandCenter:
167
  if isinstance(val, list):
168
  schema_map[k]["type"] = "List"
169
 
170
- # Reconstruct Schema Tree for UI grouping
171
  sorted_paths = sorted(list(available_paths))
172
  schema_tree = {}
173
  for path in sorted_paths:
@@ -179,8 +168,8 @@ class DatasetCommandCenter:
179
  return {
180
  "status": "success",
181
  "samples": sample_rows,
182
- "schema_tree": schema_tree, # Used by Simple Path Dropdown
183
- "schema": schema_map, # Used by List Mode Dropdown
184
  "dataset_id": dataset_id
185
  }
186
  except Exception as e:
@@ -192,32 +181,38 @@ class DatasetCommandCenter:
192
 
193
  def _get_value_by_path(self, obj, path):
194
  """
195
- Navigates dot notation (meta.user.id), automatically parsing
196
- JSON strings if encountered along the path.
 
197
  """
198
  if not path: return obj
199
  keys = path.split('.')
200
  current = obj
201
 
202
- for key in keys:
203
- # Auto-parse JSON string if encountered
204
- if isinstance(current, str):
 
 
 
 
 
 
 
 
 
205
  s = current.strip()
206
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
207
  try:
208
  current = json.loads(s)
209
  except:
210
- pass
211
-
212
- if isinstance(current, dict) and key in current:
213
- current = current[key]
214
- else:
215
- return None # Path broken
216
  return current
217
 
218
  def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
219
  """
220
- Logic for: FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path
221
  """
222
  data = row.get(source_col)
223
 
@@ -233,7 +228,6 @@ class DatasetCommandCenter:
233
 
234
  matched_item = None
235
  for item in data:
236
- # String comparison for safety
237
  if str(item.get(filter_key, '')) == str(filter_val):
238
  matched_item = item
239
  break
@@ -244,13 +238,9 @@ class DatasetCommandCenter:
244
  return None
245
 
246
  def _apply_projection(self, row, recipe):
247
- """
248
- Builds the new row based on the recipe.
249
- Raises ValueError if user Python code fails (Fail Fast).
250
- """
251
  new_row = {}
252
 
253
- # Setup Eval Context (Variables available in Python Mode)
254
  eval_context = row.copy()
255
  eval_context['row'] = row
256
  eval_context['json'] = json
@@ -274,13 +264,11 @@ class DatasetCommandCenter:
274
  )
275
 
276
  elif t_type == 'python':
277
- # Execute user code
278
- expression = col_def['expression']
279
- val = eval(expression, {}, eval_context)
280
  new_row[target_col] = val
281
 
282
  except Exception as e:
283
- # Fail Fast: Stop the generator immediately if a column fails
284
  raise ValueError(f"Column '{target_col}' failed: {str(e)}")
285
 
286
  return new_row
@@ -290,13 +278,10 @@ class DatasetCommandCenter:
290
  # ==========================================
291
 
292
  def _generate_card(self, source_id, target_id, recipe, license_name):
293
- """
294
- Creates a high-quality README.md with a Markdown table of operations.
295
- """
296
  card_data = DatasetCardData(
297
  language="en",
298
  license=license_name,
299
- tags=["dataset-command-center", "etl", "generated-dataset"],
300
  base_model=source_id,
301
  )
302
 
@@ -368,17 +353,16 @@ The following operations were applied to the source data:
368
  yield self._apply_projection(row, recipe)
369
  count += 1
370
  except ValueError as ve:
371
- # Pass the specific column error up
372
  raise ve
373
  except Exception as e:
374
  raise ValueError(f"Unexpected crash on row {i}: {e}")
375
 
376
  try:
377
- # 1. Process & Push Data
378
  new_dataset = datasets.Dataset.from_generator(gen)
379
  new_dataset.push_to_hub(target_id, token=self.token)
380
 
381
- # 2. Generate & Push Card
382
  try:
383
  card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
384
  card.push_to_hub(target_id, token=self.token)
@@ -405,7 +389,7 @@ The following operations were applied to the source data:
405
  for i, row in enumerate(ds_stream):
406
  if len(processed) >= 5: break
407
 
408
- # Check Filter
409
  passed = True
410
  if recipe.get('filter_rule'):
411
  try:
@@ -416,19 +400,17 @@ The following operations were applied to the source data:
416
  if not eval(recipe['filter_rule'], {}, ctx):
417
  passed = False
418
  except:
419
- passed = False # Skip invalid rows in preview
420
 
421
  if passed:
422
  try:
423
  new_row = self._apply_projection(row, recipe)
424
- # CRITICAL: Sanitize for JSON (handles NaNs, Dates, Images)
425
  clean_new_row = self._sanitize_for_json(new_row)
426
  processed.append(clean_new_row)
427
  except Exception as e:
428
- # In preview, we want to see the error, not crash
429
  processed.append({"_preview_error": f"Error: {str(e)}"})
430
 
431
  return processed
432
  except Exception as e:
433
- # Return global error if loading fails
434
  raise e
 
21
 
22
  def get_dataset_metadata(self, dataset_id):
23
  """
24
+ Fetches Configs, Splits, and License info.
25
+ Gracefully handles missing metadata/404s.
26
  """
27
  configs = ['default']
28
  splits = ['train', 'test', 'validation']
 
35
  if found_configs:
36
  configs = found_configs
37
  except Exception:
38
+ pass
39
 
40
  # 2. Fetch Metadata (Splits & License)
41
  try:
42
  selected = configs[0]
 
43
  infos = get_dataset_infos(dataset_id, token=self.token)
44
 
45
  info = None
 
54
  splits = list(info.splits.keys())
55
  license_name = info.license or "unknown"
56
  except Exception:
57
+ pass
58
 
59
  return {
60
  "status": "success",
 
66
  return {"status": "error", "message": str(e)}
67
 
68
  def get_splits_for_config(self, dataset_id, config_name):
 
 
 
69
  try:
70
  infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
71
  if config_name in infos:
 
81
  def _sanitize_for_json(self, obj):
82
  """
83
  Recursively cleans data for JSON serialization.
84
+ FIXES: 'Preview' crashes caused by NaN, Infinity, or complex Objects.
85
  """
86
  if isinstance(obj, float):
87
  if math.isnan(obj) or math.isinf(obj):
 
94
  elif isinstance(obj, (str, int, bool, type(None))):
95
  return obj
96
  else:
 
97
  return str(obj)
98
 
99
  def _flatten_object(self, obj, parent_key='', sep='.'):
100
  """
101
+ Recursively finds all keys in nested dicts/JSON to populate
102
+ the 'Simple Path' dropdown in the UI.
103
  """
104
  items = {}
105
 
106
+ # Transparently parse JSON strings for Schema Discovery
107
  if isinstance(obj, str):
108
  s = obj.strip()
109
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
 
117
  new_key = f"{parent_key}{sep}{k}" if parent_key else k
118
  items.update(self._flatten_object(v, new_key, sep=sep))
119
  elif isinstance(obj, list):
 
120
  new_key = f"{parent_key}" if parent_key else "list_content"
121
  items[new_key] = "List"
122
  else:
 
123
  items[parent_key] = type(obj).__name__
124
 
125
  return items
126
 
127
  def inspect_dataset(self, dataset_id, config, split):
 
 
 
128
  try:
129
  conf = config if config != 'default' else None
130
  ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
131
 
132
  sample_rows = []
133
  available_paths = set()
134
+ schema_map = {}
135
 
136
  for i, row in enumerate(ds_stream):
137
  if i >= 10: break
138
 
139
+ # Clean row for UI Preview
140
  clean_row = self._sanitize_for_json(row)
141
  sample_rows.append(clean_row)
142
 
143
+ # Schema Discovery
144
  flattened = self._flatten_object(row)
145
  available_paths.update(flattened.keys())
146
 
147
+ # List Mode Detection
148
  for k, v in row.items():
149
  if k not in schema_map:
150
  schema_map[k] = {"type": "Object"}
 
157
  if isinstance(val, list):
158
  schema_map[k]["type"] = "List"
159
 
 
160
  sorted_paths = sorted(list(available_paths))
161
  schema_tree = {}
162
  for path in sorted_paths:
 
168
  return {
169
  "status": "success",
170
  "samples": sample_rows,
171
+ "schema_tree": schema_tree,
172
+ "schema": schema_map,
173
  "dataset_id": dataset_id
174
  }
175
  except Exception as e:
 
181
 
182
  def _get_value_by_path(self, obj, path):
183
  """
184
+ Extracts values using dot notation.
185
+ FIX: Lazy Parsing. Only parses JSON strings if we strictly need to
186
+ traverse deeper. This preserves raw strings for top-level columns.
187
  """
188
  if not path: return obj
189
  keys = path.split('.')
190
  current = obj
191
 
192
+ for i, key in enumerate(keys):
193
+ # 1. Access Key
194
+ if isinstance(current, dict) and key in current:
195
+ current = current[key]
196
+ else:
197
+ return None
198
+
199
+ # 2. Traverse Deeper?
200
+ is_last_key = (i == len(keys) - 1)
201
+
202
+ # Only parse if we are NOT at the end (we need to go inside)
203
+ if not is_last_key and isinstance(current, str):
204
  s = current.strip()
205
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
206
  try:
207
  current = json.loads(s)
208
  except:
209
+ return None # Broken JSON in path
210
+
 
 
 
 
211
  return current
212
 
213
  def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
214
  """
215
+ FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path
216
  """
217
  data = row.get(source_col)
218
 
 
228
 
229
  matched_item = None
230
  for item in data:
 
231
  if str(item.get(filter_key, '')) == str(filter_val):
232
  matched_item = item
233
  break
 
238
  return None
239
 
240
  def _apply_projection(self, row, recipe):
 
 
 
 
241
  new_row = {}
242
 
243
+ # Eval Context
244
  eval_context = row.copy()
245
  eval_context['row'] = row
246
  eval_context['json'] = json
 
264
  )
265
 
266
  elif t_type == 'python':
267
+ val = eval(col_def['expression'], {}, eval_context)
 
 
268
  new_row[target_col] = val
269
 
270
  except Exception as e:
271
+ # Fail Fast
272
  raise ValueError(f"Column '{target_col}' failed: {str(e)}")
273
 
274
  return new_row
 
278
  # ==========================================
279
 
280
  def _generate_card(self, source_id, target_id, recipe, license_name):
 
 
 
281
  card_data = DatasetCardData(
282
  language="en",
283
  license=license_name,
284
+ tags=["dataset-command-center", "etl"],
285
  base_model=source_id,
286
  )
287
 
 
353
  yield self._apply_projection(row, recipe)
354
  count += 1
355
  except ValueError as ve:
 
356
  raise ve
357
  except Exception as e:
358
  raise ValueError(f"Unexpected crash on row {i}: {e}")
359
 
360
  try:
361
+ # 1. Process & Push
362
  new_dataset = datasets.Dataset.from_generator(gen)
363
  new_dataset.push_to_hub(target_id, token=self.token)
364
 
365
+ # 2. Card
366
  try:
367
  card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
368
  card.push_to_hub(target_id, token=self.token)
 
389
  for i, row in enumerate(ds_stream):
390
  if len(processed) >= 5: break
391
 
392
+ # Filter Check
393
  passed = True
394
  if recipe.get('filter_rule'):
395
  try:
 
400
  if not eval(recipe['filter_rule'], {}, ctx):
401
  passed = False
402
  except:
403
+ passed = False
404
 
405
  if passed:
406
  try:
407
  new_row = self._apply_projection(row, recipe)
408
+ # Sanitize to prevent JSON crashes (NaN, Infinity, Images)
409
  clean_new_row = self._sanitize_for_json(new_row)
410
  processed.append(clean_new_row)
411
  except Exception as e:
 
412
  processed.append({"_preview_error": f"Error: {str(e)}"})
413
 
414
  return processed
415
  except Exception as e:
 
416
  raise e