broadfield-dev committed on
Commit
a738515
·
verified ·
1 Parent(s): 6baf7e5

Update processor.py

Browse files
Files changed (1) hide show
  1. processor.py +115 -229
processor.py CHANGED
@@ -3,13 +3,18 @@ import logging
3
  import datasets
4
  from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
5
  from huggingface_hub import HfApi, DatasetCard, DatasetCardData
 
6
 
 
7
  logging.basicConfig(level=logging.INFO)
8
  logger = logging.getLogger(__name__)
9
 
10
  class DatasetCommandCenter:
11
  def __init__(self, token=None):
12
  self.token = token
 
 
 
13
 
14
  def get_dataset_metadata(self, dataset_id):
15
  configs = []
@@ -20,26 +25,20 @@ class DatasetCommandCenter:
20
  try:
21
  configs = get_dataset_config_names(dataset_id, token=self.token)
22
  except Exception as e:
23
- logger.warning(f"Could not fetch configs for {dataset_id}: {e}")
24
- # Fallback: if we can't get configs, assume 'default'
25
  configs = ['default']
26
 
27
  # 2. Get Splits & License
28
- # Many datasets return 404 on dataset_infos.json. We must catch this.
29
  try:
30
  selected_config = configs[0] if configs else 'default'
31
-
32
- # This API call frequently fails on datasets without metadata cards
33
  infos = get_dataset_infos(dataset_id, token=self.token)
34
 
35
- # Attempt to find the info object for our config
36
  info_obj = None
37
  if selected_config in infos:
38
  info_obj = infos[selected_config]
39
  elif 'default' in infos:
40
  info_obj = infos['default']
41
  elif len(infos) > 0:
42
- # Fallback to the first available if names don't match
43
  info_obj = list(infos.values())[0]
44
 
45
  if info_obj:
@@ -47,12 +46,9 @@ class DatasetCommandCenter:
47
  license_name = info_obj.license or "unknown"
48
 
49
  except Exception as e:
50
- logger.warning(f"Could not fetch dataset_infos (using fallbacks): {e}")
51
- # Safe Fallback if metadata fails
52
  splits = ['train', 'test', 'validation']
53
- license_name = "unknown"
54
 
55
- # Ensure we NEVER return None for lists
56
  return {
57
  "status": "success",
58
  "configs": configs if configs else ['default'],
@@ -61,107 +57,18 @@ class DatasetCommandCenter:
61
  }
62
 
63
  def get_splits_for_config(self, dataset_id, config_name):
64
- splits = []
65
  try:
66
  infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
67
-
68
  if config_name in infos:
69
  splits = list(infos[config_name].splits.keys())
70
  elif len(infos) > 0:
71
- # Fallback to first available
72
  splits = list(infos.values())[0].splits.keys()
73
-
74
- except Exception as e:
75
- logger.warning(f"Could not fetch splits for config {config_name}: {e}")
76
- # Fallback
77
- splits = ['train', 'test', 'validation']
78
-
79
- return {"status": "success", "splits": list(splits) if splits else ['train']}
80
-
81
- # --- HELPER: Recursive JSON/Dot Notation Getter ---
82
- def _get_value_by_path(self, obj, path):
83
- if not path: return obj
84
- keys = path.split('.')
85
- current = obj
86
-
87
- for key in keys:
88
- # Auto-parse JSON string if encountered
89
- if isinstance(current, str):
90
- s = current.strip()
91
- if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
92
- try:
93
- current = json.loads(s)
94
- except:
95
- pass
96
-
97
- if isinstance(current, dict) and key in current:
98
- current = current[key]
99
  else:
100
- return None
101
- return current
102
-
103
- # --- HELPER: List Search Logic ---
104
- def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
105
- """
106
- Logic: Look inside row[source_col] (which is a list).
107
- Find first item where item[filter_key] == filter_val.
108
- Then extract item[target_path].
109
- """
110
- # 1. Get the list (handling JSON string if needed)
111
- data = row.get(source_col)
112
- if isinstance(data, str):
113
- try:
114
- data = json.loads(data)
115
- except:
116
- return None
117
-
118
- if not isinstance(data, list):
119
- return None
120
-
121
- # 2. Search the list
122
- matched_item = None
123
- for item in data:
124
- # We treat values as strings for comparison to be safe
125
- if str(item.get(filter_key, '')) == str(filter_val):
126
- matched_item = item
127
- break
128
-
129
- if matched_item:
130
- # 3. Extract the target (supporting nested json parsing via dot notation)
131
- # e.g. target_path = "content.analysis"
132
- return self._get_value_by_path(matched_item, target_path)
133
-
134
- return None
135
-
136
- def _flatten_schema(self, obj, parent='', visited=None):
137
- if visited is None: visited = set()
138
- items = []
139
-
140
- # Avoid infinite recursion
141
- if id(obj) in visited: return []
142
- visited.add(id(obj))
143
-
144
- # Handle JSON strings
145
- if isinstance(obj, str):
146
- s = obj.strip()
147
- if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
148
- try:
149
- obj = json.loads(s)
150
- except:
151
- pass
152
-
153
- if isinstance(obj, dict):
154
- for k, v in obj.items():
155
- full_key = f"{parent}.{k}" if parent else k
156
- items.append((full_key, type(v).__name__))
157
- items.extend(self._flatten_schema(v, full_key, visited))
158
- elif isinstance(obj, list) and len(obj) > 0:
159
- # For lists, we just peek at the first item to guess schema
160
- full_key = f"{parent}[]" if parent else "[]"
161
- items.append((parent, "List")) # Mark the parent as a List
162
- items.extend(self._flatten_schema(obj[0], full_key, visited))
163
 
164
- return items
165
 
166
  def inspect_dataset(self, dataset_id, config, split):
167
  try:
@@ -169,7 +76,7 @@ class DatasetCommandCenter:
169
  ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
170
 
171
  sample_rows = []
172
- schema_map = {} # stores { "col_name": { "is_list": bool, "keys": [] } }
173
 
174
  for i, row in enumerate(ds_stream):
175
  if i >= 10: break
@@ -177,6 +84,7 @@ class DatasetCommandCenter:
177
  # Create clean sample for UI
178
  clean_row = {}
179
  for k, v in row.items():
 
180
  if not isinstance(v, (str, int, float, bool, list, dict, type(None))):
181
  clean_row[k] = str(v)
182
  else:
@@ -188,8 +96,8 @@ class DatasetCommandCenter:
188
  if k not in schema_map:
189
  schema_map[k] = {"is_list": False, "keys": set()}
190
 
191
- # Check if it's a list (or json-string list)
192
  val = v
 
193
  if isinstance(val, str):
194
  try:
195
  val = json.loads(val)
@@ -202,7 +110,6 @@ class DatasetCommandCenter:
202
  elif isinstance(val, dict):
203
  schema_map[k]["keys"].update(val.keys())
204
 
205
- # Format schema for UI
206
  formatted_schema = {}
207
  for k, info in schema_map.items():
208
  formatted_schema[k] = {
@@ -219,19 +126,51 @@ class DatasetCommandCenter:
219
  except Exception as e:
220
  return {"status": "error", "message": str(e)}
221
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
222
  def _apply_projection(self, row, recipe):
223
  new_row = {}
224
 
225
- # OPTIMIZATION: Prepare context once per row, but imports are global/cached
226
- # We assume standard libraries are available.
227
- # For safety, we only import them once inside the method scope if needed,
228
- # but Python caches imports so doing it here is fine.
229
- import re
230
- import json
231
-
232
- # 1. Context Creation
233
- # We use a shallow copy. If deep nested edits are needed, users should handle that,
234
- # but shallow copy prevents modifying the original 'row' variable accidentally in the context.
235
  eval_context = row.copy()
236
  eval_context['row'] = row
237
  eval_context['json'] = json
@@ -239,69 +178,44 @@ class DatasetCommandCenter:
239
 
240
  for col_def in recipe['columns']:
241
  t_type = col_def.get('type', 'simple')
 
242
 
243
- if t_type == 'simple':
244
- new_row[col_def['name']] = self._get_value_by_path(row, col_def['source'])
 
245
 
246
- elif t_type == 'list_search':
247
- val = self._extract_from_list_logic(
248
- row,
249
- col_def['source'],
250
- col_def['filter_key'],
251
- col_def['filter_val'],
252
- col_def['target_key']
253
- )
254
- new_row[col_def['name']] = val
255
 
256
- elif t_type == 'python':
257
- # --- CRITICAL FIX: Error Visibility ---
258
- try:
259
- # Execute the user's python string
260
- val = eval(col_def['expression'], {}, eval_context)
261
- new_row[col_def['name']] = val
262
- except Exception as e:
263
- # Instead of silently setting None, we raise a custom error
264
- # that the generator can catch to abort the job.
265
- # We include the column name to help you debug.
266
- raise RuntimeError(f"Python Error in column '{col_def['name']}': {str(e)}")
267
 
268
- return new_row
269
-
270
-
271
 
272
- def _passes_filter(self, row, filter_str):
273
- if not filter_str or not filter_str.strip():
274
- return True
275
- try:
276
- # Fix context here as well so filters like "len(row['text']) > 5" work
277
- context = row.copy()
278
- context['row'] = row
279
- context['json'] = json
280
- import re
281
- context['re'] = re
282
-
283
- return eval(filter_str, {}, context)
284
- except:
285
- return False
286
  return new_row
287
 
 
288
 
289
-
290
-
291
  def _generate_card(self, source_id, target_id, recipe, license_name):
292
- """
293
- Generates a README.md with YAML metadata and a report of operations.
294
- """
295
 
296
- # 1. YAML Metadata
297
  card_data = DatasetCardData(
298
  language="en",
299
  license=license_name,
300
  tags=["dataset-command-center", "etl", "generated-dataset"],
301
- base_model=source_id, # Linking source
302
  )
303
 
304
- # 2. Description & Recipe Table
305
  content = f"""
306
  # {target_id.split('/')[-1]}
307
 
@@ -315,20 +229,15 @@ The following operations were applied to the source data:
315
  | Target Column | Source | Type | Logic / Filter |
316
  |---------------|--------|------|----------------|
317
  """
318
-
319
  for col in recipe['columns']:
320
  c_type = col.get('type', 'simple')
321
  c_name = col['name']
322
  c_src = col.get('source', '-')
323
 
324
- if c_type == 'simple':
325
- logic = "Direct Mapping"
326
- elif c_type == 'list_search':
327
- logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
328
- elif c_type == 'python':
329
- logic = f"`{col.get('expression')}`"
330
- else:
331
- logic = "-"
332
 
333
  content += f"| **{c_name}** | `{c_src}` | {c_type} | {logic} |\n"
334
 
@@ -340,100 +249,77 @@ The following operations were applied to the source data:
340
  card = DatasetCard.from_template(card_data, content=content)
341
  return card
342
 
343
- '''def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None):
344
- logger.info(f"Job started: {source_id}")
345
- conf = config if config != 'default' else None
346
-
347
- def gen():
348
- ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
349
- count = 0
350
- for row in ds_stream:
351
- if max_rows and count >= int(max_rows): break
352
-
353
- if self._passes_filter(row, recipe.get('filter_rule')):
354
- yield self._apply_projection(row, recipe)
355
- count += 1
356
-
357
- try:
358
- new_dataset = datasets.Dataset.from_generator(gen)
359
- new_dataset.push_to_hub(target_id, token=self.token)
360
- # 2. GENERATE & PUSH CARD
361
- try:
362
- card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
363
- card.push_to_hub(target_id, token=self.token)
364
- except Exception as e:
365
- logger.warning(f"Could not push dataset card: {e}")
366
 
367
- return {"status": "success", "rows_processed": len(new_dataset)}
368
- except Exception as e:
369
- return {"status": "error", "message": str(e)}'''
370
  def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
371
  logger.info(f"Job started: {source_id} -> {target_id}")
372
  conf = config if config != 'default' else None
373
 
374
- # We need a way to bubble exceptions out of the generator
375
- # to the main thread.
376
-
377
  def gen():
378
  ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
379
  count = 0
380
-
381
  for i, row in enumerate(ds_stream):
382
- if max_rows and count >= int(max_rows):
383
- break
384
 
385
- # 1. Apply Filter (Safe Guarded)
386
  if recipe.get('filter_rule'):
387
  try:
388
- # Re-create context for filter
389
- import re, json
390
  ctx = row.copy()
391
  ctx['row'] = row
392
- ctx['json'] = json
393
- ctx['re'] = re
394
  if not eval(recipe['filter_rule'], {}, ctx):
395
- continue # Skip this row naturally
396
  except Exception as e:
397
- raise RuntimeError(f"Error in Row Filter: {str(e)}")
398
 
399
- # 2. Apply Projection (The heavy lifter)
400
  try:
401
- projected_row = self._apply_projection(row, recipe)
402
- yield projected_row
403
  count += 1
404
- except RuntimeError as re_err:
405
- # This is our custom error from above.
406
- # We stop the generator immediately.
407
- raise re_err
408
  except Exception as e:
409
- raise RuntimeError(f"Unexpected error processing row {i}: {str(e)}")
410
 
411
  try:
412
- # from_generator will iterate `gen()`. If gen() raises an error,
413
- # from_generator stops and re-raises it.
414
  new_dataset = datasets.Dataset.from_generator(gen)
415
-
416
  new_dataset.push_to_hub(target_id, token=self.token)
417
 
418
- # Generate Card
419
  try:
420
  card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
421
  card.push_to_hub(target_id, token=self.token)
422
- except: pass
 
 
423
 
424
  return {"status": "success", "rows_processed": len(new_dataset)}
425
 
426
  except Exception as e:
427
- # Return the specific error message to the UI
428
  logger.error(f"Job Failed: {e}")
429
  return {"status": "failed", "error": str(e)}
430
-
 
431
  def preview_transform(self, dataset_id, config, split, recipe):
432
  conf = config if config != 'default' else None
433
  ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
434
  processed = []
435
  for row in ds_stream:
436
  if len(processed) >= 5: break
437
- if self._passes_filter(row, recipe.get('filter_rule')):
438
- processed.append(self._apply_projection(row, recipe))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
439
  return processed
 
3
  import datasets
4
  from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
5
  from huggingface_hub import HfApi, DatasetCard, DatasetCardData
6
+ import re
7
 
8
+ # Configure logging
9
  logging.basicConfig(level=logging.INFO)
10
  logger = logging.getLogger(__name__)
11
 
12
  class DatasetCommandCenter:
13
    def __init__(self, token=None):
        """Initialize the command center.

        Args:
            token: Optional Hugging Face access token; forwarded to every
                dataset/hub call so private repos work. May be None for
                public-only access.
        """
        self.token = token
        # Dedicated hub client bound to the same token.
        self.api = HfApi(token=token)
16
+
17
+ # --- 1. METADATA & INSPECTION ---
18
 
19
  def get_dataset_metadata(self, dataset_id):
20
  configs = []
 
25
  try:
26
  configs = get_dataset_config_names(dataset_id, token=self.token)
27
  except Exception as e:
28
+ logger.warning(f"Could not fetch configs: {e}")
 
29
  configs = ['default']
30
 
31
  # 2. Get Splits & License
 
32
  try:
33
  selected_config = configs[0] if configs else 'default'
 
 
34
  infos = get_dataset_infos(dataset_id, token=self.token)
35
 
 
36
  info_obj = None
37
  if selected_config in infos:
38
  info_obj = infos[selected_config]
39
  elif 'default' in infos:
40
  info_obj = infos['default']
41
  elif len(infos) > 0:
 
42
  info_obj = list(infos.values())[0]
43
 
44
  if info_obj:
 
46
  license_name = info_obj.license or "unknown"
47
 
48
  except Exception as e:
49
+ logger.warning(f"Metadata fetch fallback: {e}")
 
50
  splits = ['train', 'test', 'validation']
 
51
 
 
52
  return {
53
  "status": "success",
54
  "configs": configs if configs else ['default'],
 
57
  }
58
 
59
  def get_splits_for_config(self, dataset_id, config_name):
 
60
  try:
61
  infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
 
62
  if config_name in infos:
63
  splits = list(infos[config_name].splits.keys())
64
  elif len(infos) > 0:
 
65
  splits = list(infos.values())[0].splits.keys()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66
  else:
67
+ splits = ['train', 'test']
68
+ except:
69
+ splits = ['train', 'test', 'validation']
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
70
 
71
+ return {"status": "success", "splits": splits}
72
 
73
  def inspect_dataset(self, dataset_id, config, split):
74
  try:
 
76
  ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
77
 
78
  sample_rows = []
79
+ schema_map = {}
80
 
81
  for i, row in enumerate(ds_stream):
82
  if i >= 10: break
 
84
  # Create clean sample for UI
85
  clean_row = {}
86
  for k, v in row.items():
87
+ # Convert objects to strings for display safety
88
  if not isinstance(v, (str, int, float, bool, list, dict, type(None))):
89
  clean_row[k] = str(v)
90
  else:
 
96
  if k not in schema_map:
97
  schema_map[k] = {"is_list": False, "keys": set()}
98
 
 
99
  val = v
100
+ # Check for JSON string
101
  if isinstance(val, str):
102
  try:
103
  val = json.loads(val)
 
110
  elif isinstance(val, dict):
111
  schema_map[k]["keys"].update(val.keys())
112
 
 
113
  formatted_schema = {}
114
  for k, info in schema_map.items():
115
  formatted_schema[k] = {
 
126
  except Exception as e:
127
  return {"status": "error", "message": str(e)}
128
 
129
+ # --- 2. EXTRACTION LOGIC ---
130
+
131
+ def _get_value_by_path(self, obj, path):
132
+ if not path: return obj
133
+ keys = path.split('.')
134
+ current = obj
135
+
136
+ for key in keys:
137
+ if isinstance(current, str):
138
+ s = current.strip()
139
+ if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
140
+ try:
141
+ current = json.loads(s)
142
+ except: pass
143
+
144
+ if isinstance(current, dict) and key in current:
145
+ current = current[key]
146
+ else:
147
+ return None
148
+ return current
149
+
150
+ def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
151
+ data = row.get(source_col)
152
+ if isinstance(data, str):
153
+ try:
154
+ data = json.loads(data)
155
+ except: return None
156
+
157
+ if not isinstance(data, list):
158
+ return None
159
+
160
+ matched_item = None
161
+ for item in data:
162
+ if str(item.get(filter_key, '')) == str(filter_val):
163
+ matched_item = item
164
+ break
165
+
166
+ if matched_item:
167
+ return self._get_value_by_path(matched_item, target_path)
168
+ return None
169
+
170
  def _apply_projection(self, row, recipe):
171
  new_row = {}
172
 
173
+ # Setup Context for Python/Eval
 
 
 
 
 
 
 
 
 
174
  eval_context = row.copy()
175
  eval_context['row'] = row
176
  eval_context['json'] = json
 
178
 
179
  for col_def in recipe['columns']:
180
  t_type = col_def.get('type', 'simple')
181
+ target_col = col_def['name']
182
 
183
+ try:
184
+ if t_type == 'simple':
185
+ new_row[target_col] = self._get_value_by_path(row, col_def['source'])
186
 
187
+ elif t_type == 'list_search':
188
+ new_row[target_col] = self._extract_from_list_logic(
189
+ row,
190
+ col_def['source'],
191
+ col_def['filter_key'],
192
+ col_def['filter_val'],
193
+ col_def['target_key']
194
+ )
 
195
 
196
+ elif t_type == 'python':
197
+ expression = col_def['expression']
198
+ val = eval(expression, {}, eval_context)
199
+ new_row[target_col] = val
 
 
 
 
 
 
 
200
 
201
+ except Exception as e:
202
+ # Fail Fast: Raise error to stop the generator
203
+ raise ValueError(f"Column '{target_col}' failed: {str(e)}")
204
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
205
  return new_row
206
 
207
+ # --- 3. DOCUMENTATION (CARD) ---
208
 
 
 
209
  def _generate_card(self, source_id, target_id, recipe, license_name):
210
+ logger.info(f"Generating card for {target_id} with license {license_name}")
 
 
211
 
 
212
  card_data = DatasetCardData(
213
  language="en",
214
  license=license_name,
215
  tags=["dataset-command-center", "etl", "generated-dataset"],
216
+ base_model=source_id,
217
  )
218
 
 
219
  content = f"""
220
  # {target_id.split('/')[-1]}
221
 
 
229
  | Target Column | Source | Type | Logic / Filter |
230
  |---------------|--------|------|----------------|
231
  """
 
232
  for col in recipe['columns']:
233
  c_type = col.get('type', 'simple')
234
  c_name = col['name']
235
  c_src = col.get('source', '-')
236
 
237
+ logic = "-"
238
+ if c_type == 'simple': logic = "Direct Mapping"
239
+ elif c_type == 'list_search': logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
240
+ elif c_type == 'python': logic = f"`{col.get('expression')}`"
 
 
 
 
241
 
242
  content += f"| **{c_name}** | `{c_src}` | {c_type} | {logic} |\n"
243
 
 
249
  card = DatasetCard.from_template(card_data, content=content)
250
  return card
251
 
252
+ # --- 4. EXECUTION ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
253
 
 
 
 
254
    def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
        """Stream a source dataset, apply the recipe (filter + projection),
        and push the result to the Hub as *target_id*.

        Args:
            source_id: Hub id of the source dataset.
            config: Config name; the literal 'default' means "no named config".
            split: Split to stream (e.g. "train").
            target_id: Hub id to push the transformed dataset to.
            recipe: Dict with 'columns' (projection specs) and an optional
                'filter_rule' Python expression evaluated per row.
            max_rows: Optional cap on the number of rows yielded.
            new_license: License string for the generated dataset card;
                defaults to "unknown" when falsy.

        Returns:
            {"status": "success", "rows_processed": N} on success, or
            {"status": "failed", "error": <message>} on any failure.
        """
        logger.info(f"Job started: {source_id} -> {target_id}")
        # datasets expects name=None (not the string 'default') for configless repos.
        conf = config if config != 'default' else None

        def gen():
            # Streaming avoids downloading the whole source dataset.
            ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
            count = 0
            for i, row in enumerate(ds_stream):
                # `count` tracks yielded (post-filter) rows, so max_rows caps output size.
                if max_rows and count >= int(max_rows): break

                # Filter
                if recipe.get('filter_rule'):
                    try:
                        # Shallow copy so the expression can't rebind the original row's
                        # top-level keys; 'row' is also exposed for row['col'] style access.
                        # SECURITY NOTE(review): eval() of a user-supplied expression —
                        # assumes the recipe author is the trusted operator; verify.
                        ctx = row.copy()
                        ctx['row'] = row
                        if not eval(recipe['filter_rule'], {}, ctx):
                            continue
                    except Exception as e:
                        # Fail fast: a broken filter aborts the whole job with row context.
                        raise ValueError(f"Filter crashed on row {i}: {e}")

                # Projection
                try:
                    yield self._apply_projection(row, recipe)
                    count += 1
                except ValueError as ve:
                    # Projection errors already carry column context — re-raise as-is.
                    raise ve
                except Exception as e:
                    raise ValueError(f"Crash on row {i}: {e}")

        try:
            # 1. Push Data
            # from_generator drains gen(); any ValueError raised above propagates here.
            new_dataset = datasets.Dataset.from_generator(gen)
            new_dataset.push_to_hub(target_id, token=self.token)

            # 2. Push Card
            try:
                card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
                card.push_to_hub(target_id, token=self.token)
            except Exception as e:
                logger.error(f"Failed to push Dataset Card: {e}")
                # We do NOT fail the whole job, but we log it.

            return {"status": "success", "rows_processed": len(new_dataset)}

        except Exception as e:
            # Surface the specific error message to the UI instead of raising.
            logger.error(f"Job Failed: {e}")
            return {"status": "failed", "error": str(e)}
301
+
302
+ # --- 5. PREVIEW ---
303
    def preview_transform(self, dataset_id, config, split, recipe):
        """Apply the recipe to the first few streamed rows for a UI preview.

        Unlike process_and_push, this is best-effort: a failing filter simply
        excludes the row, and a failing projection is shown inline as an
        {"error": ...} entry instead of aborting.

        Args:
            dataset_id: Hub id of the source dataset.
            config: Config name; 'default' means "no named config".
            split: Split to stream.
            recipe: Dict with 'columns' and an optional 'filter_rule'.

        Returns:
            A list of up to 5 projected rows (or per-row error dicts).
        """
        conf = config if config != 'default' else None
        ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
        processed = []
        for row in ds_stream:
            # Preview is capped at 5 output rows.
            if len(processed) >= 5: break

            # Filter
            passed = True
            if recipe.get('filter_rule'):
                try:
                    # Same eval context shape as the real job: row keys + 'row'.
                    # SECURITY NOTE(review): eval() of a user-supplied expression —
                    # assumes a trusted recipe author; verify.
                    ctx = row.copy()
                    ctx['row'] = row
                    if not eval(recipe['filter_rule'], {}, ctx): passed = False
                except: passed = False

            if passed:
                try:
                    processed.append(self._apply_projection(row, recipe))
                except Exception as e:
                    processed.append({"error": str(e)}) # Show error in preview
        
        return processed