broadfield-dev committed on
Commit
5c97387
·
verified ·
1 Parent(s): b8b84f7

Create processor.py

Browse files
Files changed (1) hide show
  1. processor.py +168 -0
processor.py ADDED
@@ -0,0 +1,168 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import logging
3
+ from datasets import load_dataset, Dataset, Features, Value
4
+ from huggingface_hub import HfApi
5
+
6
+ # Configure logging
7
+ logging.basicConfig(level=logging.INFO)
8
+ logger = logging.getLogger(__name__)
9
+
10
class DatasetCommandCenter:
    """Inspect, transform, filter, and republish Hugging Face datasets.

    Every dataset read uses streaming mode, so arbitrarily large datasets
    can be processed without loading them fully into RAM.
    """

    # Number of rows sampled for inspection and previews.
    SAMPLE_SIZE = 5

    def __init__(self, token=None):
        # Hub access token, forwarded to every hub call so private/gated
        # datasets work too.  May be None for public datasets.
        self.token = token

    def inspect_dataset(self, dataset_id, split="train", config=None):
        """Peek at the first SAMPLE_SIZE rows and sketch the column schema.

        Args:
            dataset_id: Hub repo id, e.g. ``"user/dataset"``.
            split: Split to stream from (default ``"train"``).
            config: Optional dataset configuration name.

        Returns:
            On success: ``{"status": "success", "samples": [...],
            "analysis": {col: {"type": ..., "is_json": ...}},
            "dataset_id": ...}``.
            On failure: ``{"status": "error", "message": ...}``.
        """
        try:
            # Streaming avoids downloading the whole dataset just to peek.
            ds_stream = load_dataset(dataset_id, config, split=split, streaming=True, token=self.token)

            sample_rows = []
            for i, row in enumerate(ds_stream):
                if i >= self.SAMPLE_SIZE:
                    break
                sample_rows.append(row)

            # Per-column analysis based on the first sampled row: Python
            # type name plus whether a string value parses as JSON.
            analysis = {}
            if sample_rows:
                for key, value in sample_rows[0].items():
                    is_json_candidate = False
                    if isinstance(value, str):
                        stripped = value.strip()
                        looks_like_json = (
                            (stripped.startswith('{') and stripped.endswith('}'))
                            or (stripped.startswith('[') and stripped.endswith(']'))
                        )
                        if looks_like_json:
                            try:
                                json.loads(stripped)
                                is_json_candidate = True
                            except ValueError:
                                # Looked like JSON but does not parse;
                                # leave the flag False.
                                pass
                    analysis[key] = {
                        "type": type(value).__name__,
                        "is_json": is_json_candidate,
                    }

            return {
                "status": "success",
                "samples": sample_rows,
                "analysis": analysis,
                "dataset_id": dataset_id,
            }
        except Exception as e:
            # Boundary method: any failure (bad id, auth, network) is
            # reported as a structured error instead of raising.
            return {"status": "error", "message": str(e)}

    def _apply_transformations(self, row, recipe):
        """Return a transformed copy of *row* according to *recipe*.

        Recipe format::

            {
                "json_expansions": [{"col": "meta", "keys": ["url", "id"]}],
                "renames": {"old_col": "new_col"},
                "drops": ["unwanted_col"],
                "filters": ["len(text) > 50"]   # handled by _passes_filter
            }

        The input row is not mutated.
        """
        new_row = row.copy()

        # 1. JSON expansions: parse a stringified-JSON column and lift the
        #    requested keys into new "<col>_<key>" columns.
        for item in recipe.get("json_expansions", []):
            col_name = item["col"]
            target_keys = item["keys"]

            if col_name not in new_row or not isinstance(new_row[col_name], str):
                continue

            try:
                data = json.loads(new_row[col_name])
            except ValueError:
                data = None

            for key in target_keys:
                # Dots would make awkward column names; normalize them.
                # (Only one nesting level is supported for now.)
                new_col_name = f"{col_name}_{key.replace('.', '_')}"
                if isinstance(data, dict):
                    new_row[new_col_name] = data.get(key, None)
                else:
                    # Unparseable or non-object payload: still create the
                    # columns (as None) so the output schema is identical
                    # for every row — schema inference depends on this.
                    new_row[new_col_name] = None

        # 2. Renames
        for old, new in recipe.get("renames", {}).items():
            if old in new_row:
                new_row[new] = new_row.pop(old)

        # 3. Drops (missing columns are ignored)
        for drop_col in recipe.get("drops", []):
            new_row.pop(drop_col, None)

        return new_row

    def _passes_filter(self, row, filters):
        """Return True if *row* satisfies every expression in *filters*.

        Each filter is a Python expression evaluated with the row's columns
        bound as local variables, e.g. ``"len(text) > 5"``.

        SECURITY NOTE: this calls eval() on user-supplied strings and is
        NOT safe against untrusted recipes — only run recipes you trust.

        A filter that raises (e.g. references a missing column) rejects
        the row.
        """
        if not filters:
            return True

        # The row's values become local variables for the expressions.
        context = row.copy()

        for expression in filters:
            try:
                # An empty globals dict still exposes builtins (len, ...).
                if not eval(expression, {}, context):
                    return False
            except Exception:
                # Bad expression / missing column -> row fails the filter.
                return False
        return True

    def preview_transform(self, dataset_id, split, recipe):
        """Stream the dataset and return the first SAMPLE_SIZE transformed rows."""
        ds_stream = load_dataset(dataset_id, split=split, streaming=True, token=self.token)
        processed = []

        for row in ds_stream:
            if len(processed) >= self.SAMPLE_SIZE:
                break
            if not self._passes_filter(row, recipe.get("filters", [])):
                continue
            processed.append(self._apply_transformations(row, recipe))

        return processed

    def process_and_push(self, source_id, split, target_id, recipe, max_rows=None):
        """Stream *source_id*, apply *recipe*, and push the result to *target_id*.

        Args:
            source_id: Hub repo id of the source dataset.
            split: Split to stream from.
            target_id: Hub repo id to push the transformed dataset to.
            recipe: Transformation recipe (see _apply_transformations).
            max_rows: Optional cap on the number of kept (post-filter) rows.

        Returns:
            ``{"status": "success", "rows_processed": <row count>}``.
        """
        logger.info(f"Starting job: {source_id} -> {target_id}")

        # Hoist the cap conversion out of the per-row loop.  A falsy
        # max_rows (None, 0, "") means "no limit", as before.
        limit = int(max_rows) if max_rows else None

        def gen():
            ds_stream = load_dataset(source_id, split=split, streaming=True, token=self.token)
            count = 0
            for row in ds_stream:
                if limit is not None and count >= limit:
                    break
                if self._passes_filter(row, recipe.get("filters", [])):
                    yield self._apply_transformations(row, recipe)
                    # Only rows that survive the filter count toward the cap.
                    count += 1

        # Materialize from the generator; HF infers the features (schema)
        # automatically from the first batch.
        new_dataset = Dataset.from_generator(gen)

        new_dataset.push_to_hub(target_id, token=self.token)
        return {"status": "success", "rows_processed": len(new_dataset)}