broadfield-dev committed on
Commit
97353a3
·
verified ·
1 Parent(s): 22285df

Update processor.py

Browse files
Files changed (1) hide show
  1. processor.py +158 -91
processor.py CHANGED
@@ -1,6 +1,7 @@
1
  import json
2
  import logging
3
- from datasets import load_dataset, Dataset, Features, Value
 
4
  from huggingface_hub import HfApi
5
 
6
  # Configure logging
@@ -11,63 +12,115 @@ class DatasetCommandCenter:
11
  def __init__(self, token=None):
12
  self.token = token
13
 
14
- def inspect_dataset(self, dataset_id, split="train", config=None):
15
  """
16
- Loads the first N rows and detects JSON structures.
17
  """
18
  try:
19
- # Load in streaming mode to avoid RAM issues
20
- ds_stream = load_dataset(dataset_id, config, split=split, streaming=True, token=self.token)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
- # Peek at the first 5 rows
23
  sample_rows = []
24
  for i, row in enumerate(ds_stream):
25
  if i >= 5: break
26
- sample_rows.append(row)
 
 
 
 
 
 
 
 
 
 
27
 
28
  # Analyze Columns
29
  analysis = {}
30
- if sample_rows:
31
- keys = sample_rows[0].keys()
32
- for k in keys:
33
- # Check if it looks like JSON
34
- is_json_candidate = False
35
- sample_val = sample_rows[0][k]
36
-
37
- if isinstance(sample_val, str):
38
- sample_val = sample_val.strip()
39
- if (sample_val.startswith('{') and sample_val.endswith('}')) or \
40
- (sample_val.startswith('[') and sample_val.endswith(']')):
41
- try:
42
- json.loads(sample_val)
43
- is_json_candidate = True
44
- except:
45
- pass
46
-
47
- analysis[k] = {
48
- "type": str(type(sample_rows[0][k]).__name__),
49
- "is_json": is_json_candidate
50
- }
51
 
52
  return {
53
  "status": "success",
54
  "samples": sample_rows,
55
- "analysis": analysis,
56
- "dataset_id": dataset_id
57
  }
58
  except Exception as e:
59
  return {"status": "error", "message": str(e)}
60
 
61
  def _apply_transformations(self, row, recipe):
62
  """
63
- Applies a single row transformation based on the recipe.
64
- Recipe format:
65
- {
66
- "json_expansions": [{"col": "meta", "keys": ["url", "id"]}],
67
- "renames": {"old_col": "new_col"},
68
- "drops": ["unwanted_col"],
69
- "filters": ["len(text) > 50"] # List of python eval strings
70
- }
71
  """
72
  new_row = row.copy()
73
 
@@ -75,20 +128,40 @@ class DatasetCommandCenter:
75
  if "json_expansions" in recipe:
76
  for item in recipe["json_expansions"]:
77
  col_name = item["col"]
78
- target_keys = item["keys"] # List of keys to extract
 
 
 
 
 
79
 
80
- if col_name in new_row and isinstance(new_row[col_name], str):
 
 
 
 
81
  try:
82
- data = json.loads(new_row[col_name])
83
- for key in target_keys:
84
- # Support simple dot notation if needed, currently 1 level
 
 
 
 
 
 
 
 
 
 
 
85
  clean_key = key.replace('.', '_')
86
  new_col_name = f"{col_name}_{clean_key}"
87
- new_row[new_col_name] = data.get(key, None)
88
- except:
89
- # If parsing fails, fill None
90
- for key in target_keys:
91
- new_row[f"{col_name}_{key}"] = None
92
 
93
  # 2. Renames
94
  if "renames" in recipe:
@@ -105,64 +178,58 @@ class DatasetCommandCenter:
105
  return new_row
106
 
107
  def _passes_filter(self, row, filters):
108
- """
109
- Safe-ish eval for filtering.
110
- """
111
- if not filters:
112
- return True
113
-
114
- # We create a local context with the row data accessible as variables
115
- # e.g., if row has 'text', user can write "len(text) > 5"
116
  context = row.copy()
117
-
118
  for f_str in filters:
119
  try:
 
120
  if not eval(f_str, {}, context):
121
  return False
122
- except Exception as e:
123
- # If filter crashes (e.g. missing column), we skip the row or default to False
124
  return False
125
  return True
126
 
127
- def preview_transform(self, dataset_id, split, recipe):
128
- """Return a transformed sample of 5 rows"""
129
- ds_stream = load_dataset(dataset_id, split=split, streaming=True, token=self.token)
130
- processed = []
131
 
132
- for row in ds_stream:
133
- if len(processed) >= 5: break
134
-
135
- # Apply Filter
136
- if not self._passes_filter(row, recipe.get("filters", [])):
137
- continue
138
-
139
- # Apply Transform
140
- trans_row = self._apply_transformations(row, recipe)
141
- processed.append(trans_row)
142
-
143
- return processed
144
-
145
- def process_and_push(self, source_id, split, target_id, recipe, max_rows=None):
146
- """
147
- The heavy lifter: Streams, Transforms, Filters, and Pushes to Hub.
148
- """
149
- logger.info(f"Starting job: {source_id} -> {target_id}")
150
 
151
  def gen():
152
- ds_stream = load_dataset(source_id, split=split, streaming=True, token=self.token)
153
  count = 0
154
  for row in ds_stream:
155
  if max_rows and count >= int(max_rows):
156
  break
157
 
158
- if self._passes_filter(row, recipe.get("filters", [])):
159
- yield self._apply_transformations(row, recipe)
 
 
 
 
 
 
 
160
  count += 1
161
 
162
- # Create new dataset from generator
163
- # We let HF infer the features (schema) automatically from the first batch
164
- new_dataset = Dataset.from_generator(gen)
165
-
166
- # Push
167
- new_dataset.push_to_hub(target_id, token=self.token)
168
- return {"status": "success", "rows_processed": len(new_dataset)}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import json
2
  import logging
3
+ import datasets
4
+ from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
5
  from huggingface_hub import HfApi
6
 
7
  # Configure logging
 
12
def __init__(self, token=None):
    """Remember the Hugging Face API token used for all authenticated Hub calls."""
    self.token = token
14
 
15
def get_dataset_metadata(self, dataset_id):
    """
    Step 1: Get available configs (subsets) and splits without downloading data.

    Returns {"status": "success", "configs": [...], "splits": [...]} on
    success, or {"status": "error", "message": str} if metadata lookup fails.
    """
    try:
        # 1. Configs (e.g. 'en', 'fr', or 'default'). Some datasets expose
        # no configs or raise while listing them, so fall back to a single
        # 'default' placeholder entry either way.
        try:
            configs = get_dataset_config_names(dataset_id, token=self.token)
        except Exception:
            configs = ['default']
        if not configs:
            # Guard against an empty list so configs[0] below cannot fail.
            configs = ['default']

        # 2. Splits for the first config, used to pre-populate the UI.
        # Splits for other configs are fetched on demand via
        # get_splits_for_config when the user changes the dropdown.
        selected_config = configs[0]
        try:
            # Fetches metadata (splits, columns) without downloading rows.
            infos = get_dataset_infos(dataset_id, token=self.token)
            # With multiple configs, infos is a dict keyed by config name.
            if selected_config in infos:
                splits = list(infos[selected_config].splits.keys())
            else:
                # Fallback if the info dict is keyed differently (flat layout).
                splits = list(list(infos.values())[0].splits.keys())
        except Exception:
            # Fallback: offer the conventional split names.
            splits = ['train', 'test', 'validation']

        return {
            "status": "success",
            "configs": configs,
            "splits": list(splits),
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
51
+
52
def get_splits_for_config(self, dataset_id, config_name):
    """
    Helper to refresh the split list when the user changes the Config dropdown.

    Always returns status "success"; when Hub metadata cannot be read, the
    conventional split names are returned as a best-effort fallback so the
    UI never blocks on a metadata error.
    """
    try:
        infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
        splits = list(infos[config_name].splits.keys())
        return {"status": "success", "splits": splits}
    except Exception as e:
        # Log the cause instead of silently discarding it, then fall back.
        logger.warning(f"Could not fetch splits for {dataset_id}/{config_name}: {e}")
        return {"status": "success", "splits": ['train', 'test', 'validation']}
63
+
64
def inspect_dataset(self, dataset_id, config, split):
    """
    Step 2: Stream the first 5 rows of a split and detect JSON-string columns.

    Returns {"status": "success", "samples": [...], "analysis": {...}} where
    analysis maps each column name to {"type": ..., "is_json_string": bool},
    or {"status": "error", "message": str} on failure / empty dataset.
    """
    try:
        # 'default' is our UI placeholder for "no named config".
        conf = config if config != 'default' else None

        ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)

        sample_rows = []
        for i, row in enumerate(ds_stream):
            if i >= 5:
                break
            # Convert non-serializable objects (like PIL Images) to strings
            # so the preview payload can be JSON-encoded.
            clean_row = {
                k: v if isinstance(v, (str, int, float, bool, list, dict, type(None))) else str(v)
                for k, v in row.items()
            }
            sample_rows.append(clean_row)

        if not sample_rows:
            return {"status": "error", "message": "Dataset is empty."}

        # Analyze columns based on the first sampled row.
        analysis = {}
        for k, sample_val in sample_rows[0].items():
            is_json_str = False
            # A string that parses as a JSON object/array is flagged so the
            # UI can offer key expansion for that column.
            if isinstance(sample_val, str):
                s = sample_val.strip()
                if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                    try:
                        json.loads(s)
                        is_json_str = True
                    except ValueError:
                        # Looked like JSON but did not parse; leave flag False.
                        pass
            analysis[k] = {
                "type": type(sample_val).__name__,
                "is_json_string": is_json_str,
            }

        return {
            "status": "success",
            "samples": sample_rows,
            "analysis": analysis,
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
120
 
121
  def _apply_transformations(self, row, recipe):
122
  """
123
+ Apply Parsing, Renaming, Dropping, Filtering
 
 
 
 
 
 
 
124
  """
125
  new_row = row.copy()
126
 
 
128
  if "json_expansions" in recipe:
129
  for item in recipe["json_expansions"]:
130
  col_name = item["col"]
131
+ target_keys = item["keys"]
132
+
133
+ # Check if we need to parse string-json first
134
+ source_data = new_row.get(col_name)
135
+
136
+ parsed_obj = None
137
 
138
+ # Case A: It is already a dict (Struct)
139
+ if isinstance(source_data, dict):
140
+ parsed_obj = source_data
141
+ # Case B: It is a string (JSON String)
142
+ elif isinstance(source_data, str):
143
  try:
144
+ parsed_obj = json.loads(source_data)
145
+ except:
146
+ pass
147
+
148
+ if parsed_obj:
149
+ for key in target_keys:
150
+ # Handle Nested Dot Notation (e.g. "meta.url")
151
+ val = parsed_obj
152
+ parts = key.split('.')
153
+ try:
154
+ for p in parts:
155
+ val = val[p]
156
+
157
+ # Create new column name (replace dots with underscores)
158
  clean_key = key.replace('.', '_')
159
  new_col_name = f"{col_name}_{clean_key}"
160
+ new_row[new_col_name] = val
161
+ except:
162
+ # Key not found
163
+ clean_key = key.replace('.', '_')
164
+ new_row[f"{col_name}_{clean_key}"] = None
165
 
166
  # 2. Renames
167
  if "renames" in recipe:
 
178
  return new_row
179
 
180
def _passes_filter(self, row, filters):
    """
    Return True if the row satisfies every filter expression.

    Each filter is a Python expression evaluated with the row's columns
    exposed as local variables (e.g. "len(text) > 50").

    WARNING(security): this calls eval() on user-supplied strings. It is
    only acceptable because recipes come from the tool's own operator —
    never feed it input from untrusted users.
    """
    if not filters:
        return True
    # Copy so filter expressions cannot mutate the caller's row.
    context = row.copy()
    for f_str in filters:
        try:
            if not eval(f_str, {}, context):
                return False
        except Exception:
            # A crashing filter (e.g. referencing a missing column)
            # rejects the row rather than aborting the whole job.
            return False
    return True
191
 
192
def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None):
    """
    The heavy lifter: streams the source split, transforms and filters each
    row, then pushes the result to the Hub as a new dataset.

    Args:
        source_id: source dataset repo id.
        config: config/subset name; 'default' means "no named config".
        split: split to stream (e.g. 'train').
        target_id: destination dataset repo id on the Hub.
        recipe: transformation recipe (json_expansions / renames / drops / filters).
        max_rows: optional cap on the number of source rows read.

    Returns {"status": "success", "rows_processed": int} on success;
    re-raises any push/build exception after logging it.
    """
    logger.info(f"Starting job: {source_id} ({config}/{split}) -> {target_id}")

    conf = config if config != 'default' else None

    def gen():
        ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
        count = 0
        for row in ds_stream:
            if max_rows and count >= int(max_rows):
                break
            # Transform BEFORE filtering so filter expressions can reference
            # columns produced by JSON expansion.
            trans_row = self._apply_transformations(row, recipe)
            if self._passes_filter(trans_row, recipe.get("filters", [])):
                yield trans_row
            # Counts every streamed row (not only yielded ones), so max_rows
            # bounds the amount of source data read.
            count += 1

    try:
        # Dataset.from_generator lets HF infer the new schema automatically
        # from the first batch of yielded rows.
        new_dataset = datasets.Dataset.from_generator(gen)
        new_dataset.push_to_hub(target_id, token=self.token)
        return {"status": "success", "rows_processed": len(new_dataset)}
    except Exception as e:
        logger.error(e)
        raise  # bare raise preserves the original traceback ('raise e' would reset it)
+
226
def preview_transform(self, dataset_id, config, split, recipe):
    """
    Return up to 5 transformed-and-filtered rows for a UI preview.

    Mirrors process_and_push (transform first, then filter) so the preview
    matches what the full job would produce.

    NOTE(review): if the filters reject everything, this scans the entire
    stream before returning an empty list — consider capping scanned rows.
    """
    conf = config if config != 'default' else None
    ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
    processed = []
    for row in ds_stream:
        if len(processed) >= 5:
            break
        trans_row = self._apply_transformations(row, recipe)
        if self._passes_filter(trans_row, recipe.get("filters", [])):
            processed.append(trans_row)
    return processed