broadfield-dev committed on
Commit
5015673
·
verified ·
1 Parent(s): ff3a113

Update processor.py

Browse files
Files changed (1) hide show
  1. processor.py +72 -113
processor.py CHANGED
@@ -1,11 +1,11 @@
1
  import json
2
  import logging
3
  import datasets
 
4
  from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
5
  from huggingface_hub import HfApi, DatasetCard, DatasetCardData
6
  import re
7
 
8
- # Configure logging
9
  logging.basicConfig(level=logging.INFO)
10
  logger = logging.getLogger(__name__)
11
 
@@ -14,42 +14,28 @@ class DatasetCommandCenter:
14
  self.token = token
15
  self.api = HfApi(token=token)
16
 
17
- # ==========================================
18
- # 1. METADATA & INSPECTION
19
- # ==========================================
20
 
21
  def get_dataset_metadata(self, dataset_id):
22
  configs = ['default']
23
  splits = ['train', 'test', 'validation']
24
  license_name = "unknown"
25
-
26
  try:
27
- # 1. Fetch Configs
28
  try:
29
- found_configs = get_dataset_config_names(dataset_id, token=self.token)
30
- if found_configs: configs = found_configs
31
  except: pass
32
 
33
- # 2. Fetch Metadata
34
  try:
35
- selected = configs[0]
36
  infos = get_dataset_infos(dataset_id, token=self.token)
37
- info = None
38
- if selected in infos: info = infos[selected]
39
- elif 'default' in infos: info = infos['default']
40
- elif infos: info = list(infos.values())[0]
41
-
42
  if info:
43
  splits = list(info.splits.keys())
44
  license_name = info.license or "unknown"
45
  except: pass
46
-
47
- return {
48
- "status": "success",
49
- "configs": configs,
50
- "splits": splits,
51
- "license_detected": license_name
52
- }
53
  except Exception as e:
54
  return {"status": "error", "message": str(e)}
55
 
@@ -61,8 +47,22 @@ class DatasetCommandCenter:
61
  except:
62
  return {"status": "success", "splits": ['train', 'test', 'validation']}
63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  def _flatten_object(self, obj, parent_key='', sep='.'):
65
- """Recursively finds keys for the UI dropdowns."""
66
  items = {}
67
  if isinstance(obj, str):
68
  s = obj.strip()
@@ -73,7 +73,7 @@ class DatasetCommandCenter:
73
  if isinstance(obj, dict):
74
  for k, v in obj.items():
75
  new_key = f"{parent_key}{sep}{k}" if parent_key else k
76
- items.update(self._flatten_object(v, new_key, sep=sep))
77
  elif isinstance(obj, list):
78
  items[parent_key or "list"] = "List"
79
  else:
@@ -92,7 +92,7 @@ class DatasetCommandCenter:
92
  for i, row in enumerate(ds_stream):
93
  if i >= 10: break
94
 
95
- # Clean row for UI (No objects)
96
  clean_row = self._sanitize_for_json(row)
97
  sample_rows.append(clean_row)
98
 
@@ -100,7 +100,7 @@ class DatasetCommandCenter:
100
  flattened = self._flatten_object(row)
101
  available_paths.update(flattened.keys())
102
 
103
- # List Mode Detection
104
  for k, v in row.items():
105
  if k not in schema_map: schema_map[k] = {"type": "Object"}
106
  val = v
@@ -126,26 +126,21 @@ class DatasetCommandCenter:
126
  except Exception as e:
127
  return {"status": "error", "message": str(e)}
128
 
129
- # ==========================================
130
- # 2. CORE LOGIC
131
- # ==========================================
132
 
133
  def _get_value_by_path(self, obj, path):
134
  if not path: return obj
135
  keys = path.split('.')
136
  current = obj
137
-
138
  for key in keys:
139
  if isinstance(current, str):
140
  s = current.strip()
141
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
142
  try: current = json.loads(s)
143
- except: pass
144
-
145
  if isinstance(current, dict) and key in current:
146
  current = current[key]
147
- else:
148
- return None
149
  return current
150
 
151
  def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
@@ -155,118 +150,84 @@ class DatasetCommandCenter:
155
  except: return None
156
  if not isinstance(data, list): return None
157
 
158
- matched_item = None
159
  for item in data:
160
  if str(item.get(filter_key, '')) == str(filter_val):
161
- matched_item = item
162
  break
163
-
164
- if matched_item:
165
- return self._get_value_by_path(matched_item, target_path)
166
  return None
167
 
168
  def _apply_projection(self, row, recipe):
169
  new_row = {}
 
170
  eval_context = row.copy()
171
  eval_context['row'] = row
172
  eval_context['json'] = json
173
  eval_context['re'] = re
174
 
175
- for col_def in recipe['columns']:
176
- t_type = col_def.get('type', 'simple')
177
- target_col = col_def['name']
178
-
179
  try:
180
- if t_type == 'simple':
181
- new_row[target_col] = self._get_value_by_path(row, col_def['source'])
182
- elif t_type == 'list_search':
183
- new_row[target_col] = self._extract_from_list_logic(
184
- row, col_def['source'], col_def['filter_key'], col_def['filter_val'], col_def['target_key']
185
- )
186
- elif t_type == 'python':
187
- val = eval(col_def['expression'], {}, eval_context)
188
- new_row[target_col] = val
189
  except Exception as e:
190
- raise ValueError(f"Column '{target_col}' failed: {str(e)}")
191
-
192
  return new_row
193
 
194
- def _sanitize_for_json(self, obj):
195
- """Helper to ensure objects are JSON serializable (fixes Preview crash)."""
196
- if isinstance(obj, dict):
197
- return {k: self._sanitize_for_json(v) for k, v in obj.items()}
198
- elif isinstance(obj, list):
199
- return [self._sanitize_for_json(v) for v in obj]
200
- elif isinstance(obj, (str, int, float, bool, type(None))):
201
- return obj
202
- else:
203
- return str(obj) # Convert Timestamps, Images, etc to string
204
-
205
- # ==========================================
206
- # 3. PREVIEW & EXECUTE
207
- # ==========================================
208
 
209
  def preview_transform(self, dataset_id, config, split, recipe):
210
  conf = config if config != 'default' else None
211
-
212
  try:
213
- ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
214
- processed = []
215
-
216
- for i, row in enumerate(ds_stream):
217
- if len(processed) >= 5: break
218
 
219
  # Filter
220
- passed = True
221
  if recipe.get('filter_rule'):
222
  try:
223
  ctx = row.copy()
224
  ctx['row'] = row
225
  ctx['json'] = json
226
  ctx['re'] = re
227
- if not eval(recipe['filter_rule'], {}, ctx): passed = False
228
- except: passed = False # Skip crashing rows in preview
229
 
230
- if passed:
231
- try:
232
- projected = self._apply_projection(row, recipe)
233
- # SANITIZE OUTPUT so Flask doesn't crash on Timestamps/Images
234
- clean_projected = self._sanitize_for_json(projected)
235
- processed.append(clean_projected)
236
- except Exception as e:
237
- processed.append({"_preview_error": f"Error: {str(e)}"})
238
-
239
- return processed
240
  except Exception as e:
241
  raise e
242
 
243
  def _generate_card(self, source_id, target_id, recipe, license_name):
244
- card_data = DatasetCardData(
245
- language="en",
246
- license=license_name,
247
- tags=["dataset-command-center", "etl"],
248
- base_model=source_id,
249
- )
250
- content = f"""
251
- # {target_id.split('/')[-1]}
252
- This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
253
- ## Recipe
254
- """
255
- for col in recipe['columns']:
256
- content += f"- **{col['name']}**: {col.get('type')} ({col.get('source') or 'expr'})\n"
257
  content += f"\n**License:** {license_name}"
258
- return DatasetCard.from_template(card_data, content=content)
259
 
260
  def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
261
- logger.info(f"Job: {source_id} -> {target_id}")
262
  conf = config if config != 'default' else None
263
 
264
  def gen():
265
- ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
266
  count = 0
267
- for i, row in enumerate(ds_stream):
268
  if max_rows and count >= int(max_rows): break
269
-
 
270
  if recipe.get('filter_rule'):
271
  try:
272
  ctx = row.copy()
@@ -274,23 +235,21 @@ This dataset is a transformation of [{source_id}](https://huggingface.co/dataset
274
  ctx['json'] = json
275
  ctx['re'] = re
276
  if not eval(recipe['filter_rule'], {}, ctx): continue
277
- except Exception as e:
278
- raise ValueError(f"Filter error row {i}: {e}")
279
 
 
280
  try:
281
  yield self._apply_projection(row, recipe)
282
  count += 1
283
- except ValueError as ve: raise ve
284
- except Exception as e: raise ValueError(f"Error row {i}: {e}")
285
 
286
  try:
287
- new_dataset = datasets.Dataset.from_generator(gen)
288
- new_dataset.push_to_hub(target_id, token=self.token)
289
  try:
290
  card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
291
  card.push_to_hub(target_id, token=self.token)
292
  except: pass
293
- return {"status": "success", "rows_processed": len(new_dataset)}
294
  except Exception as e:
295
- logger.error(f"Job Failed: {e}")
296
  return {"status": "failed", "error": str(e)}
 
1
  import json
2
  import logging
3
  import datasets
4
+ import math
5
  from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
6
  from huggingface_hub import HfApi, DatasetCard, DatasetCardData
7
  import re
8
 
 
9
  logging.basicConfig(level=logging.INFO)
10
  logger = logging.getLogger(__name__)
11
 
 
14
  self.token = token
15
  self.api = HfApi(token=token)
16
 
17
+ # --- 1. INSPECTION ---
 
 
18
 
19
  def get_dataset_metadata(self, dataset_id):
20
  configs = ['default']
21
  splits = ['train', 'test', 'validation']
22
  license_name = "unknown"
 
23
  try:
 
24
  try:
25
+ c = get_dataset_config_names(dataset_id, token=self.token)
26
+ if c: configs = c
27
  except: pass
28
 
 
29
  try:
 
30
  infos = get_dataset_infos(dataset_id, token=self.token)
31
+ sel = configs[0]
32
+ info = infos.get(sel) or infos.get('default') or (list(infos.values())[0] if infos else None)
 
 
 
33
  if info:
34
  splits = list(info.splits.keys())
35
  license_name = info.license or "unknown"
36
  except: pass
37
+
38
+ return {"status": "success", "configs": configs, "splits": splits, "license_detected": license_name}
 
 
 
 
 
39
  except Exception as e:
40
  return {"status": "error", "message": str(e)}
41
 
 
47
  except:
48
  return {"status": "success", "splits": ['train', 'test', 'validation']}
49
 
50
+ def _sanitize_for_json(self, obj):
51
+ """Recursively cleans data for JSON serialization (Fixes NaN crash)."""
52
+ if isinstance(obj, float):
53
+ if math.isnan(obj) or math.isinf(obj):
54
+ return None
55
+ return obj
56
+ elif isinstance(obj, dict):
57
+ return {k: self._sanitize_for_json(v) for k, v in obj.items()}
58
+ elif isinstance(obj, list):
59
+ return [self._sanitize_for_json(v) for v in obj]
60
+ elif isinstance(obj, (str, int, bool, type(None))):
61
+ return obj
62
+ else:
63
+ return str(obj)
64
+
65
  def _flatten_object(self, obj, parent_key='', sep='.'):
 
66
  items = {}
67
  if isinstance(obj, str):
68
  s = obj.strip()
 
73
  if isinstance(obj, dict):
74
  for k, v in obj.items():
75
  new_key = f"{parent_key}{sep}{k}" if parent_key else k
76
+ items.update(self._flatten_object(v, new_key, sep))
77
  elif isinstance(obj, list):
78
  items[parent_key or "list"] = "List"
79
  else:
 
92
  for i, row in enumerate(ds_stream):
93
  if i >= 10: break
94
 
95
+ # Sanitize entire row to prevent JSON crash on UI
96
  clean_row = self._sanitize_for_json(row)
97
  sample_rows.append(clean_row)
98
 
 
100
  flattened = self._flatten_object(row)
101
  available_paths.update(flattened.keys())
102
 
103
+ # List Detection
104
  for k, v in row.items():
105
  if k not in schema_map: schema_map[k] = {"type": "Object"}
106
  val = v
 
126
  except Exception as e:
127
  return {"status": "error", "message": str(e)}
128
 
129
+ # --- 2. LOGIC ---
 
 
130
 
131
  def _get_value_by_path(self, obj, path):
132
  if not path: return obj
133
  keys = path.split('.')
134
  current = obj
 
135
  for key in keys:
136
  if isinstance(current, str):
137
  s = current.strip()
138
  if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
139
  try: current = json.loads(s)
140
+ except: pass
 
141
  if isinstance(current, dict) and key in current:
142
  current = current[key]
143
+ else: return None
 
144
  return current
145
 
146
  def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
 
150
  except: return None
151
  if not isinstance(data, list): return None
152
 
153
+ matched = None
154
  for item in data:
155
  if str(item.get(filter_key, '')) == str(filter_val):
156
+ matched = item
157
  break
158
+ if matched: return self._get_value_by_path(matched, target_path)
 
 
159
  return None
160
 
161
  def _apply_projection(self, row, recipe):
162
  new_row = {}
163
+ # Context
164
  eval_context = row.copy()
165
  eval_context['row'] = row
166
  eval_context['json'] = json
167
  eval_context['re'] = re
168
 
169
+ for col in recipe['columns']:
 
 
 
170
  try:
171
+ c_type = col.get('type', 'simple')
172
+ name = col['name']
173
+ if c_type == 'simple':
174
+ new_row[name] = self._get_value_by_path(row, col['source'])
175
+ elif c_type == 'list_search':
176
+ new_row[name] = self._extract_from_list_logic(row, col['source'], col['filter_key'], col['filter_val'], col['target_key'])
177
+ elif c_type == 'python':
178
+ new_row[name] = eval(col['expression'], {}, eval_context)
 
179
  except Exception as e:
180
+ raise ValueError(f"Column '{col['name']}' error: {e}")
 
181
  return new_row
182
 
183
+ # --- 3. PREVIEW & PUSH ---
 
 
 
 
 
 
 
 
 
 
 
 
 
184
 
185
  def preview_transform(self, dataset_id, config, split, recipe):
186
  conf = config if config != 'default' else None
 
187
  try:
188
+ ds = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
189
+ out = []
190
+ for i, row in enumerate(ds):
191
+ if len(out) >= 5: break
 
192
 
193
  # Filter
 
194
  if recipe.get('filter_rule'):
195
  try:
196
  ctx = row.copy()
197
  ctx['row'] = row
198
  ctx['json'] = json
199
  ctx['re'] = re
200
+ if not eval(recipe['filter_rule'], {}, ctx): continue
201
+ except: continue # Skip crashing filters in preview
202
 
203
+ try:
204
+ # Apply & Sanitize
205
+ proj = self._apply_projection(row, recipe)
206
+ out.append(self._sanitize_for_json(proj))
207
+ except Exception as e:
208
+ out.append({"_preview_error": str(e)})
209
+ return out
 
 
 
210
  except Exception as e:
211
  raise e
212
 
213
  def _generate_card(self, source_id, target_id, recipe, license_name):
214
+ content = f"# {target_id}\nDerived from [{source_id}](https://huggingface.co/datasets/{source_id}).\n\n## Recipe\n"
215
+ for c in recipe['columns']:
216
+ content += f"- **{c['name']}**: {c.get('type')} ({c.get('source') or c.get('expression')})\n"
 
 
 
 
 
 
 
 
 
 
217
  content += f"\n**License:** {license_name}"
218
+ return DatasetCard.from_template(DatasetCardData(license=license_name, tags=["etl"]), content=content)
219
 
220
  def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
221
+ logger.info(f"Pushing {source_id} -> {target_id}")
222
  conf = config if config != 'default' else None
223
 
224
  def gen():
225
+ ds = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
226
  count = 0
227
+ for i, row in enumerate(ds):
228
  if max_rows and count >= int(max_rows): break
229
+
230
+ # Filter
231
  if recipe.get('filter_rule'):
232
  try:
233
  ctx = row.copy()
 
235
  ctx['json'] = json
236
  ctx['re'] = re
237
  if not eval(recipe['filter_rule'], {}, ctx): continue
238
+ except Exception as e: raise ValueError(f"Filter error row {i}: {e}")
 
239
 
240
+ # Project
241
  try:
242
  yield self._apply_projection(row, recipe)
243
  count += 1
244
+ except Exception as e: raise ValueError(f"Row {i} error: {e}")
 
245
 
246
  try:
247
+ new_ds = datasets.Dataset.from_generator(gen)
248
+ new_ds.push_to_hub(target_id, token=self.token)
249
  try:
250
  card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
251
  card.push_to_hub(target_id, token=self.token)
252
  except: pass
253
+ return {"status": "success", "rows_processed": len(new_ds)}
254
  except Exception as e:
 
255
  return {"status": "failed", "error": str(e)}