broadfield-dev committed on
Commit
2f3537a
·
verified ·
1 Parent(s): 5015673

Update processor.py

Browse files
Files changed (1) hide show
  1. processor.py +274 -95
processor.py CHANGED
@@ -2,10 +2,11 @@ import json
2
  import logging
3
  import datasets
4
  import math
 
5
  from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
6
  from huggingface_hub import HfApi, DatasetCard, DatasetCardData
7
- import re
8
 
 
9
  logging.basicConfig(level=logging.INFO)
10
  logger = logging.getLogger(__name__)
11
 
@@ -14,41 +15,78 @@ class DatasetCommandCenter:
14
  self.token = token
15
  self.api = HfApi(token=token)
16
 
17
- # --- 1. INSPECTION ---
 
 
18
 
19
  def get_dataset_metadata(self, dataset_id):
 
 
 
 
20
  configs = ['default']
21
  splits = ['train', 'test', 'validation']
22
  license_name = "unknown"
 
23
  try:
 
24
  try:
25
- c = get_dataset_config_names(dataset_id, token=self.token)
26
- if c: configs = c
27
- except: pass
 
 
28
 
 
29
  try:
 
 
30
  infos = get_dataset_infos(dataset_id, token=self.token)
31
- sel = configs[0]
32
- info = infos.get(sel) or infos.get('default') or (list(infos.values())[0] if infos else None)
 
 
 
 
 
 
 
33
  if info:
34
  splits = list(info.splits.keys())
35
  license_name = info.license or "unknown"
36
- except: pass
37
-
38
- return {"status": "success", "configs": configs, "splits": splits, "license_detected": license_name}
 
 
 
 
 
 
39
  except Exception as e:
40
  return {"status": "error", "message": str(e)}
41
 
42
  def get_splits_for_config(self, dataset_id, config_name):
 
 
 
43
  try:
44
  infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
45
- splits = list(infos[config_name].splits.keys())
 
 
 
 
 
46
  return {"status": "success", "splits": splits}
47
  except:
48
  return {"status": "success", "splits": ['train', 'test', 'validation']}
49
 
50
  def _sanitize_for_json(self, obj):
51
- """Recursively cleans data for JSON serialization (Fixes NaN crash)."""
 
 
 
52
  if isinstance(obj, float):
53
  if math.isnan(obj) or math.isinf(obj):
54
  return None
@@ -60,196 +98,337 @@ class DatasetCommandCenter:
60
  elif isinstance(obj, (str, int, bool, type(None))):
61
  return obj
62
  else:
 
63
  return str(obj)
64
 
65
  def _flatten_object(self, obj, parent_key='', sep='.'):
 
 
 
 
66
  items = {}
 
 
67
  if isinstance(obj, str):
68
  s = obj.strip()
69
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
70
- try: obj = json.loads(s)
71
- except: pass
 
 
72
 
73
  if isinstance(obj, dict):
74
  for k, v in obj.items():
75
  new_key = f"{parent_key}{sep}{k}" if parent_key else k
76
- items.update(self._flatten_object(v, new_key, sep))
77
  elif isinstance(obj, list):
78
- items[parent_key or "list"] = "List"
 
 
79
  else:
 
80
  items[parent_key] = type(obj).__name__
 
81
  return items
82
 
83
  def inspect_dataset(self, dataset_id, config, split):
 
 
 
84
  try:
85
  conf = config if config != 'default' else None
86
  ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
87
 
88
  sample_rows = []
89
  available_paths = set()
90
- schema_map = {}
91
 
92
  for i, row in enumerate(ds_stream):
93
  if i >= 10: break
94
 
95
- # Sanitize entire row to prevent JSON crash on UI
96
  clean_row = self._sanitize_for_json(row)
97
  sample_rows.append(clean_row)
98
 
99
- # Schema Discovery
100
  flattened = self._flatten_object(row)
101
  available_paths.update(flattened.keys())
102
 
103
- # List Detection
104
  for k, v in row.items():
105
- if k not in schema_map: schema_map[k] = {"type": "Object"}
 
 
106
  val = v
107
  if isinstance(val, str):
108
  try: val = json.loads(val)
109
  except: pass
110
- if isinstance(val, list): schema_map[k]["type"] = "List"
 
 
111
 
 
112
  sorted_paths = sorted(list(available_paths))
113
  schema_tree = {}
114
  for path in sorted_paths:
115
  root = path.split('.')[0]
116
- if root not in schema_tree: schema_tree[root] = []
 
117
  schema_tree[root].append(path)
118
 
119
  return {
120
  "status": "success",
121
  "samples": sample_rows,
122
- "schema_tree": schema_tree,
123
- "schema": schema_map,
124
  "dataset_id": dataset_id
125
  }
126
  except Exception as e:
127
  return {"status": "error", "message": str(e)}
128
 
129
- # --- 2. LOGIC ---
 
 
130
 
131
  def _get_value_by_path(self, obj, path):
 
 
 
 
132
  if not path: return obj
133
  keys = path.split('.')
134
  current = obj
 
135
  for key in keys:
 
136
  if isinstance(current, str):
137
  s = current.strip()
138
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
139
- try: current = json.loads(s)
140
- except: pass
 
 
 
141
  if isinstance(current, dict) and key in current:
142
  current = current[key]
143
- else: return None
 
144
  return current
145
 
146
  def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
 
 
 
147
  data = row.get(source_col)
 
 
148
  if isinstance(data, str):
149
- try: data = json.loads(data)
150
- except: return None
151
- if not isinstance(data, list): return None
 
 
 
 
152
 
153
- matched = None
154
  for item in data:
 
155
  if str(item.get(filter_key, '')) == str(filter_val):
156
- matched = item
157
  break
158
- if matched: return self._get_value_by_path(matched, target_path)
 
 
 
159
  return None
160
 
161
  def _apply_projection(self, row, recipe):
 
 
 
 
162
  new_row = {}
163
- # Context
 
164
  eval_context = row.copy()
165
  eval_context['row'] = row
166
  eval_context['json'] = json
167
  eval_context['re'] = re
168
 
169
- for col in recipe['columns']:
 
 
 
170
  try:
171
- c_type = col.get('type', 'simple')
172
- name = col['name']
173
- if c_type == 'simple':
174
- new_row[name] = self._get_value_by_path(row, col['source'])
175
- elif c_type == 'list_search':
176
- new_row[name] = self._extract_from_list_logic(row, col['source'], col['filter_key'], col['filter_val'], col['target_key'])
177
- elif c_type == 'python':
178
- new_row[name] = eval(col['expression'], {}, eval_context)
 
 
 
 
 
 
 
 
 
 
179
  except Exception as e:
180
- raise ValueError(f"Column '{col['name']}' error: {e}")
 
 
181
  return new_row
182
 
183
- # --- 3. PREVIEW & PUSH ---
184
-
185
- def preview_transform(self, dataset_id, config, split, recipe):
186
- conf = config if config != 'default' else None
187
- try:
188
- ds = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
189
- out = []
190
- for i, row in enumerate(ds):
191
- if len(out) >= 5: break
192
-
193
- # Filter
194
- if recipe.get('filter_rule'):
195
- try:
196
- ctx = row.copy()
197
- ctx['row'] = row
198
- ctx['json'] = json
199
- ctx['re'] = re
200
- if not eval(recipe['filter_rule'], {}, ctx): continue
201
- except: continue # Skip crashing filters in preview
202
-
203
- try:
204
- # Apply & Sanitize
205
- proj = self._apply_projection(row, recipe)
206
- out.append(self._sanitize_for_json(proj))
207
- except Exception as e:
208
- out.append({"_preview_error": str(e)})
209
- return out
210
- except Exception as e:
211
- raise e
212
 
213
  def _generate_card(self, source_id, target_id, recipe, license_name):
214
- content = f"# {target_id}\nDerived from [{source_id}](https://huggingface.co/datasets/{source_id}).\n\n## Recipe\n"
215
- for c in recipe['columns']:
216
- content += f"- **{c['name']}**: {c.get('type')} ({c.get('source') or c.get('expression')})\n"
217
- content += f"\n**License:** {license_name}"
218
- return DatasetCard.from_template(DatasetCardData(license=license_name, tags=["etl"]), content=content)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
219
 
220
  def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
221
- logger.info(f"Pushing {source_id} -> {target_id}")
222
  conf = config if config != 'default' else None
223
 
224
  def gen():
225
- ds = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
226
  count = 0
227
- for i, row in enumerate(ds):
228
- if max_rows and count >= int(max_rows): break
229
-
230
- # Filter
 
231
  if recipe.get('filter_rule'):
232
  try:
233
  ctx = row.copy()
234
  ctx['row'] = row
235
  ctx['json'] = json
236
  ctx['re'] = re
237
- if not eval(recipe['filter_rule'], {}, ctx): continue
238
- except Exception as e: raise ValueError(f"Filter error row {i}: {e}")
 
 
239
 
240
- # Project
241
  try:
242
  yield self._apply_projection(row, recipe)
243
  count += 1
244
- except Exception as e: raise ValueError(f"Row {i} error: {e}")
 
 
 
 
245
 
246
  try:
247
- new_ds = datasets.Dataset.from_generator(gen)
248
- new_ds.push_to_hub(target_id, token=self.token)
 
 
 
249
  try:
250
  card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
251
  card.push_to_hub(target_id, token=self.token)
252
- except: pass
253
- return {"status": "success", "rows_processed": len(new_ds)}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
254
  except Exception as e:
255
- return {"status": "failed", "error": str(e)}
 
 
2
  import logging
3
  import datasets
4
  import math
5
+ import re
6
  from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
7
  from huggingface_hub import HfApi, DatasetCard, DatasetCardData
 
8
 
9
+ # Configure logging
10
  logging.basicConfig(level=logging.INFO)
11
  logger = logging.getLogger(__name__)
12
 
 
15
  self.token = token
16
  self.api = HfApi(token=token)
17
 
18
+ # ==========================================
19
+ # 1. METADATA & SCHEMA INSPECTION
20
+ # ==========================================
21
 
22
  def get_dataset_metadata(self, dataset_id):
23
+ """
24
+ Robustly fetches Configs and Splits.
25
+ Handles 404 errors gracefully if metadata files are missing.
26
+ """
27
  configs = ['default']
28
  splits = ['train', 'test', 'validation']
29
  license_name = "unknown"
30
+
31
  try:
32
+ # 1. Fetch Configs
33
  try:
34
+ found_configs = get_dataset_config_names(dataset_id, token=self.token)
35
+ if found_configs:
36
+ configs = found_configs
37
+ except Exception:
38
+ pass # Keep default
39
 
40
+ # 2. Fetch Metadata (Splits & License)
41
  try:
42
+ selected = configs[0]
43
+ # This API call can fail on some datasets, so we wrap it safely
44
  infos = get_dataset_infos(dataset_id, token=self.token)
45
+
46
+ info = None
47
+ if selected in infos:
48
+ info = infos[selected]
49
+ elif 'default' in infos:
50
+ info = infos['default']
51
+ elif infos:
52
+ info = list(infos.values())[0]
53
+
54
  if info:
55
  splits = list(info.splits.keys())
56
  license_name = info.license or "unknown"
57
+ except Exception:
58
+ pass # Keep defaults if metadata fails
59
+
60
+ return {
61
+ "status": "success",
62
+ "configs": configs,
63
+ "splits": splits,
64
+ "license_detected": license_name
65
+ }
66
  except Exception as e:
67
  return {"status": "error", "message": str(e)}
68
 
69
  def get_splits_for_config(self, dataset_id, config_name):
70
+ """
71
+ Updates the Split dropdown when the user changes the Config.
72
+ """
73
  try:
74
  infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
75
+ if config_name in infos:
76
+ splits = list(infos[config_name].splits.keys())
77
+ elif len(infos) > 0:
78
+ splits = list(infos.values())[0].splits.keys()
79
+ else:
80
+ splits = ['train', 'test']
81
  return {"status": "success", "splits": splits}
82
  except:
83
  return {"status": "success", "splits": ['train', 'test', 'validation']}
84
 
85
  def _sanitize_for_json(self, obj):
86
+ """
87
+ Recursively cleans data for JSON serialization.
88
+ CRITICAL FIX: Prevents 'Preview' crashes caused by NaN, Infinity, or Timestamps.
89
+ """
90
  if isinstance(obj, float):
91
  if math.isnan(obj) or math.isinf(obj):
92
  return None
 
98
  elif isinstance(obj, (str, int, bool, type(None))):
99
  return obj
100
  else:
101
+ # Convert complex objects (Images, Dates) to string
102
  return str(obj)
103
 
104
  def _flatten_object(self, obj, parent_key='', sep='.'):
105
+ """
106
+ Recursively finds all keys in nested dicts or JSON strings
107
+ to populate the 'Simple Path' dropdown in the UI.
108
+ """
109
  items = {}
110
+
111
+ # Transparently parse JSON strings
112
  if isinstance(obj, str):
113
  s = obj.strip()
114
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
115
+ try:
116
+ obj = json.loads(s)
117
+ except:
118
+ pass
119
 
120
  if isinstance(obj, dict):
121
  for k, v in obj.items():
122
  new_key = f"{parent_key}{sep}{k}" if parent_key else k
123
+ items.update(self._flatten_object(v, new_key, sep=sep))
124
  elif isinstance(obj, list):
125
+ # We mark lists but do not recurse infinitely
126
+ new_key = f"{parent_key}" if parent_key else "list_content"
127
+ items[new_key] = "List"
128
  else:
129
+ # Leaf node
130
  items[parent_key] = type(obj).__name__
131
+
132
  return items
133
 
134
  def inspect_dataset(self, dataset_id, config, split):
135
+ """
136
+ Scans the first 10 rows to build a Schema Tree for the UI.
137
+ """
138
  try:
139
  conf = config if config != 'default' else None
140
  ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
141
 
142
  sample_rows = []
143
  available_paths = set()
144
+ schema_map = {} # Used for List Mode detection
145
 
146
  for i, row in enumerate(ds_stream):
147
  if i >= 10: break
148
 
149
+ # 1. Clean row for UI Preview (Handle NaN/Objects)
150
  clean_row = self._sanitize_for_json(row)
151
  sample_rows.append(clean_row)
152
 
153
+ # 2. Deep Flattening for "Simple Path" dropdowns
154
  flattened = self._flatten_object(row)
155
  available_paths.update(flattened.keys())
156
 
157
+ # 3. Top Level Analysis for "List Mode" detection
158
  for k, v in row.items():
159
+ if k not in schema_map:
160
+ schema_map[k] = {"type": "Object"}
161
+
162
  val = v
163
  if isinstance(val, str):
164
  try: val = json.loads(val)
165
  except: pass
166
+
167
+ if isinstance(val, list):
168
+ schema_map[k]["type"] = "List"
169
 
170
+ # Reconstruct Schema Tree for UI grouping
171
  sorted_paths = sorted(list(available_paths))
172
  schema_tree = {}
173
  for path in sorted_paths:
174
  root = path.split('.')[0]
175
+ if root not in schema_tree:
176
+ schema_tree[root] = []
177
  schema_tree[root].append(path)
178
 
179
  return {
180
  "status": "success",
181
  "samples": sample_rows,
182
+ "schema_tree": schema_tree, # Used by Simple Path Dropdown
183
+ "schema": schema_map, # Used by List Mode Dropdown
184
  "dataset_id": dataset_id
185
  }
186
  except Exception as e:
187
  return {"status": "error", "message": str(e)}
188
 
189
+ # ==========================================
190
+ # 2. CORE EXTRACTION LOGIC
191
+ # ==========================================
192
 
193
  def _get_value_by_path(self, obj, path):
194
+ """
195
+ Navigates dot notation (meta.user.id), automatically parsing
196
+ JSON strings if encountered along the path.
197
+ """
198
  if not path: return obj
199
  keys = path.split('.')
200
  current = obj
201
+
202
  for key in keys:
203
+ # Auto-parse JSON string if encountered
204
  if isinstance(current, str):
205
  s = current.strip()
206
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
207
+ try:
208
+ current = json.loads(s)
209
+ except:
210
+ pass
211
+
212
  if isinstance(current, dict) and key in current:
213
  current = current[key]
214
+ else:
215
+ return None # Path broken
216
  return current
217
 
218
  def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
219
+ """
220
+ Logic for: FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path
221
+ """
222
  data = row.get(source_col)
223
+
224
+ # Parse if string
225
  if isinstance(data, str):
226
+ try:
227
+ data = json.loads(data)
228
+ except:
229
+ return None
230
+
231
+ if not isinstance(data, list):
232
+ return None
233
 
234
+ matched_item = None
235
  for item in data:
236
+ # String comparison for safety
237
  if str(item.get(filter_key, '')) == str(filter_val):
238
+ matched_item = item
239
  break
240
+
241
+ if matched_item:
242
+ return self._get_value_by_path(matched_item, target_path)
243
+
244
  return None
245
 
246
  def _apply_projection(self, row, recipe):
247
+ """
248
+ Builds the new row based on the recipe.
249
+ Raises ValueError if user Python code fails (Fail Fast).
250
+ """
251
  new_row = {}
252
+
253
+ # Setup Eval Context (Variables available in Python Mode)
254
  eval_context = row.copy()
255
  eval_context['row'] = row
256
  eval_context['json'] = json
257
  eval_context['re'] = re
258
 
259
+ for col_def in recipe['columns']:
260
+ t_type = col_def.get('type', 'simple')
261
+ target_col = col_def['name']
262
+
263
  try:
264
+ if t_type == 'simple':
265
+ new_row[target_col] = self._get_value_by_path(row, col_def['source'])
266
+
267
+ elif t_type == 'list_search':
268
+ new_row[target_col] = self._extract_from_list_logic(
269
+ row,
270
+ col_def['source'],
271
+ col_def['filter_key'],
272
+ col_def['filter_val'],
273
+ col_def['target_key']
274
+ )
275
+
276
+ elif t_type == 'python':
277
+ # Execute user code
278
+ expression = col_def['expression']
279
+ val = eval(expression, {}, eval_context)
280
+ new_row[target_col] = val
281
+
282
  except Exception as e:
283
+ # Fail Fast: Stop the generator immediately if a column fails
284
+ raise ValueError(f"Column '{target_col}' failed: {str(e)}")
285
+
286
  return new_row
287
 
288
+ # ==========================================
289
+ # 3. DOCUMENTATION (MODEL CARD)
290
+ # ==========================================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
291
 
292
    def _generate_card(self, source_id, target_id, recipe, license_name):
        """
        Create a README.md dataset card describing the transformation.

        Builds a Markdown table with one line per recipe column (mapping,
        list-search, or Python expression), notes any row filter, and
        records the inherited license.

        Args:
            source_id: Hub id of the source dataset.
            target_id: Hub id of the generated dataset (repo name is the
                part after the last '/').
            recipe: dict with a 'columns' list and optional 'filter_rule'.
            license_name: license string to declare on the card.

        Returns:
            A DatasetCard ready to be pushed to the Hub.
        """
        # YAML front-matter metadata for the card.
        # NOTE(review): base_model is normally a model-card field — confirm
        # it renders meaningfully for dataset cards.
        card_data = DatasetCardData(
            language="en",
            license=license_name,
            tags=["dataset-command-center", "etl", "generated-dataset"],
            base_model=source_id,
        )

        content = f"""
# {target_id.split('/')[-1]}

This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
It was generated using the **Hugging Face Dataset Command Center**.

## Transformation Recipe

The following operations were applied to the source data:

| Target Column | Operation Type | Source / Logic |
|---------------|----------------|----------------|
"""
        # One table row per recipe column, describing its extraction logic.
        for col in recipe['columns']:
            c_type = col.get('type', 'simple')
            c_name = col['name']
            c_src = col.get('source', '-')

            logic = "-"
            if c_type == 'simple':
                logic = f"Mapped from `{c_src}`"
            elif c_type == 'list_search':
                logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
            elif c_type == 'python':
                logic = f"Python: `{col.get('expression')}`"

            content += f"| **{c_name}** | {c_type} | {logic} |\n"

        # Document the row filter, if one was applied.
        if recipe.get('filter_rule'):
            content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"

        content += f"\n## Original License\nThis dataset inherits the license: `{license_name}` from the source."

        card = DatasetCard.from_template(card_data, content=content)
        return card
338
+
339
+ # ==========================================
340
+ # 4. EXECUTION
341
+ # ==========================================
342
 
343
  def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
344
+ logger.info(f"Job started: {source_id} -> {target_id}")
345
  conf = config if config != 'default' else None
346
 
347
  def gen():
348
+ ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
349
  count = 0
350
+ for i, row in enumerate(ds_stream):
351
+ if max_rows and count >= int(max_rows):
352
+ break
353
+
354
+ # 1. Filter
355
  if recipe.get('filter_rule'):
356
  try:
357
  ctx = row.copy()
358
  ctx['row'] = row
359
  ctx['json'] = json
360
  ctx['re'] = re
361
+ if not eval(recipe['filter_rule'], {}, ctx):
362
+ continue
363
+ except Exception as e:
364
+ raise ValueError(f"Filter crashed on row {i}: {e}")
365
 
366
+ # 2. Projection
367
  try:
368
  yield self._apply_projection(row, recipe)
369
  count += 1
370
+ except ValueError as ve:
371
+ # Pass the specific column error up
372
+ raise ve
373
+ except Exception as e:
374
+ raise ValueError(f"Unexpected crash on row {i}: {e}")
375
 
376
  try:
377
+ # 1. Process & Push Data
378
+ new_dataset = datasets.Dataset.from_generator(gen)
379
+ new_dataset.push_to_hub(target_id, token=self.token)
380
+
381
+ # 2. Generate & Push Card
382
  try:
383
  card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
384
  card.push_to_hub(target_id, token=self.token)
385
+ except Exception as e:
386
+ logger.error(f"Failed to push Dataset Card: {e}")
387
+
388
+ return {"status": "success", "rows_processed": len(new_dataset)}
389
+
390
+ except Exception as e:
391
+ logger.error(f"Job Failed: {e}")
392
+ return {"status": "failed", "error": str(e)}
393
+
394
+ # ==========================================
395
+ # 5. PREVIEW
396
+ # ==========================================
397
+
398
+ def preview_transform(self, dataset_id, config, split, recipe):
399
+ conf = config if config != 'default' else None
400
+
401
+ try:
402
+ ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
403
+ processed = []
404
+
405
+ for i, row in enumerate(ds_stream):
406
+ if len(processed) >= 5: break
407
+
408
+ # Check Filter
409
+ passed = True
410
+ if recipe.get('filter_rule'):
411
+ try:
412
+ ctx = row.copy()
413
+ ctx['row'] = row
414
+ ctx['json'] = json
415
+ ctx['re'] = re
416
+ if not eval(recipe['filter_rule'], {}, ctx):
417
+ passed = False
418
+ except:
419
+ passed = False # Skip invalid rows in preview
420
+
421
+ if passed:
422
+ try:
423
+ new_row = self._apply_projection(row, recipe)
424
+ # CRITICAL: Sanitize for JSON (handles NaNs, Dates, Images)
425
+ clean_new_row = self._sanitize_for_json(new_row)
426
+ processed.append(clean_new_row)
427
+ except Exception as e:
428
+ # In preview, we want to see the error, not crash
429
+ processed.append({"_preview_error": f"Error: {str(e)}"})
430
+
431
+ return processed
432
  except Exception as e:
433
+ # Return global error if loading fails
434
+ raise e