broadfield-dev committed on
Commit
6baf7e5
·
verified ·
1 Parent(s): 917097f

Update processor.py

Browse files
Files changed (1) hide show
  1. processor.py +81 -13
processor.py CHANGED
@@ -222,23 +222,28 @@ class DatasetCommandCenter:
222
  def _apply_projection(self, row, recipe):
223
  new_row = {}
224
 
225
- # 1. Prepare Execution Context for Python Mode
226
- # This allows users to write: row['text'] OR just text
 
 
 
 
 
 
 
 
227
  eval_context = row.copy()
228
- eval_context['row'] = row # <--- CRITICAL FIX: explicit 'row' variable
229
  eval_context['json'] = json
230
- import re
231
- eval_context['re'] = re # Allow regex usage
232
 
233
  for col_def in recipe['columns']:
234
  t_type = col_def.get('type', 'simple')
235
 
236
  if t_type == 'simple':
237
- # Standard Dot Notation
238
  new_row[col_def['name']] = self._get_value_by_path(row, col_def['source'])
239
 
240
  elif t_type == 'list_search':
241
- # GET x WHERE y=z
242
  val = self._extract_from_list_logic(
243
  row,
244
  col_def['source'],
@@ -249,17 +254,21 @@ class DatasetCommandCenter:
249
  new_row[col_def['name']] = val
250
 
251
  elif t_type == 'python':
252
- # Advanced Python Eval
253
  try:
254
- # Logic: eval("row['text'].upper()", globals, locals)
255
  val = eval(col_def['expression'], {}, eval_context)
256
  new_row[col_def['name']] = val
257
  except Exception as e:
258
- # If it fails (e.g. key error on specific row), return None
259
- new_row[col_def['name']] = None
 
 
260
 
261
  return new_row
262
 
 
 
263
  def _passes_filter(self, row, filter_str):
264
  if not filter_str or not filter_str.strip():
265
  return True
@@ -331,7 +340,7 @@ The following operations were applied to the source data:
331
  card = DatasetCard.from_template(card_data, content=content)
332
  return card
333
 
334
- def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None):
335
  logger.info(f"Job started: {source_id}")
336
  conf = config if config != 'default' else None
337
 
@@ -357,8 +366,67 @@ The following operations were applied to the source data:
357
 
358
  return {"status": "success", "rows_processed": len(new_dataset)}
359
  except Exception as e:
360
- return {"status": "error", "message": str(e)}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
361
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
362
 
363
  def preview_transform(self, dataset_id, config, split, recipe):
364
  conf = config if config != 'default' else None
 
222
  def _apply_projection(self, row, recipe):
223
  new_row = {}
224
 
225
+ # OPTIMIZATION: Prepare context once per row, but imports are global/cached
226
+ # We assume standard libraries are available.
227
+ # For safety, we only import them once inside the method scope if needed,
228
+ # but Python caches imports so doing it here is fine.
229
+ import re
230
+ import json
231
+
232
+ # 1. Context Creation
233
+ # We use a shallow copy. If deep nested edits are needed, users should handle that,
234
+ # but shallow copy prevents modifying the original 'row' variable accidentally in the context.
235
  eval_context = row.copy()
236
+ eval_context['row'] = row
237
  eval_context['json'] = json
238
+ eval_context['re'] = re
 
239
 
240
  for col_def in recipe['columns']:
241
  t_type = col_def.get('type', 'simple')
242
 
243
  if t_type == 'simple':
 
244
  new_row[col_def['name']] = self._get_value_by_path(row, col_def['source'])
245
 
246
  elif t_type == 'list_search':
 
247
  val = self._extract_from_list_logic(
248
  row,
249
  col_def['source'],
 
254
  new_row[col_def['name']] = val
255
 
256
  elif t_type == 'python':
257
+ # --- CRITICAL FIX: Error Visibility ---
258
  try:
259
+ # Execute the user's python string
260
  val = eval(col_def['expression'], {}, eval_context)
261
  new_row[col_def['name']] = val
262
  except Exception as e:
263
+ # Instead of silently setting None, we raise a custom error
264
+ # that the generator can catch to abort the job.
265
+ # We include the column name to help you debug.
266
+ raise RuntimeError(f"Python Error in column '{col_def['name']}': {str(e)}")
267
 
268
  return new_row
269
 
270
+
271
+
272
  def _passes_filter(self, row, filter_str):
273
  if not filter_str or not filter_str.strip():
274
  return True
 
340
  card = DatasetCard.from_template(card_data, content=content)
341
  return card
342
 
343
+ '''def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None):
344
  logger.info(f"Job started: {source_id}")
345
  conf = config if config != 'default' else None
346
 
 
366
 
367
  return {"status": "success", "rows_processed": len(new_dataset)}
368
  except Exception as e:
369
+ return {"status": "error", "message": str(e)}'''
370
+ def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
371
+ logger.info(f"Job started: {source_id} -> {target_id}")
372
+ conf = config if config != 'default' else None
373
+
374
+ # We need a way to bubble exceptions out of the generator
375
+ # to the main thread.
376
+
377
+ def gen():
378
+ ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
379
+ count = 0
380
+
381
+ for i, row in enumerate(ds_stream):
382
+ if max_rows and count >= int(max_rows):
383
+ break
384
 
385
+ # 1. Apply Filter (Safe Guarded)
386
+ if recipe.get('filter_rule'):
387
+ try:
388
+ # Re-create context for filter
389
+ import re, json
390
+ ctx = row.copy()
391
+ ctx['row'] = row
392
+ ctx['json'] = json
393
+ ctx['re'] = re
394
+ if not eval(recipe['filter_rule'], {}, ctx):
395
+ continue # Skip this row naturally
396
+ except Exception as e:
397
+ raise RuntimeError(f"Error in Row Filter: {str(e)}")
398
+
399
+ # 2. Apply Projection (The heavy lifter)
400
+ try:
401
+ projected_row = self._apply_projection(row, recipe)
402
+ yield projected_row
403
+ count += 1
404
+ except RuntimeError as re_err:
405
+ # This is our custom error from above.
406
+ # We stop the generator immediately.
407
+ raise re_err
408
+ except Exception as e:
409
+ raise RuntimeError(f"Unexpected error processing row {i}: {str(e)}")
410
+
411
+ try:
412
+ # from_generator will iterate `gen()`. If gen() raises an error,
413
+ # from_generator stops and re-raises it.
414
+ new_dataset = datasets.Dataset.from_generator(gen)
415
+
416
+ new_dataset.push_to_hub(target_id, token=self.token)
417
+
418
+ # Generate Card
419
+ try:
420
+ card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
421
+ card.push_to_hub(target_id, token=self.token)
422
+ except: pass
423
+
424
+ return {"status": "success", "rows_processed": len(new_dataset)}
425
+
426
+ except Exception as e:
427
+ # Return the specific error message to the UI
428
+ logger.error(f"Job Failed: {e}")
429
+ return {"status": "failed", "error": str(e)}
430
 
431
  def preview_transform(self, dataset_id, config, split, recipe):
432
  conf = config if config != 'default' else None