Update processor.py
Diff of processor.py (+46 lines added, −24 lines removed)
processor.py
CHANGED
|
@@ -179,35 +179,35 @@ class DatasetCommandCenter:
|
|
| 179 |
# 2. CORE EXTRACTION LOGIC
|
| 180 |
# ==========================================
|
| 181 |
|
| 182 |
-
|
| 183 |
"""
|
| 184 |
-
Retrieves
|
| 185 |
"""
|
| 186 |
if not path: return obj
|
| 187 |
|
| 188 |
-
# 1. Try Direct Access (
|
| 189 |
try:
|
| 190 |
-
|
| 191 |
-
|
| 192 |
-
|
|
|
|
|
|
|
| 193 |
|
| 194 |
-
# 2. Try Dot Notation
|
| 195 |
keys = path.split('.')
|
| 196 |
current = obj
|
| 197 |
|
| 198 |
for i, key in enumerate(keys):
|
| 199 |
try:
|
| 200 |
-
#
|
| 201 |
-
if isinstance(current,
|
| 202 |
-
current = current
|
| 203 |
else:
|
| 204 |
-
|
| 205 |
except:
|
| 206 |
return None
|
| 207 |
|
| 208 |
-
if
|
| 209 |
-
|
| 210 |
-
# Lazy Parsing: Only parse string if we need to go deeper
|
| 211 |
is_last_key = (i == len(keys) - 1)
|
| 212 |
if not is_last_key and isinstance(current, str):
|
| 213 |
s = current.strip()
|
|
@@ -219,6 +219,8 @@ class DatasetCommandCenter:
|
|
| 219 |
|
| 220 |
return current
|
| 221 |
|
|
|
|
|
|
|
| 222 |
def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
|
| 223 |
"""
|
| 224 |
FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path
|
|
@@ -249,11 +251,9 @@ class DatasetCommandCenter:
|
|
| 249 |
def _apply_projection(self, row, recipe):
|
| 250 |
new_row = {}
|
| 251 |
|
| 252 |
-
#
|
| 253 |
-
|
| 254 |
-
eval_context
|
| 255 |
-
eval_context['json'] = json
|
| 256 |
-
eval_context['re'] = re
|
| 257 |
|
| 258 |
for col_def in recipe['columns']:
|
| 259 |
t_type = col_def.get('type', 'simple')
|
|
@@ -261,9 +261,11 @@ class DatasetCommandCenter:
|
|
| 261 |
|
| 262 |
try:
|
| 263 |
if t_type == 'simple':
|
|
|
|
| 264 |
new_row[target_col] = self._get_value_by_path(row, col_def['source'])
|
| 265 |
|
| 266 |
elif t_type == 'list_search':
|
|
|
|
| 267 |
new_row[target_col] = self._extract_from_list_logic(
|
| 268 |
row,
|
| 269 |
col_def['source'],
|
|
@@ -273,6 +275,13 @@ class DatasetCommandCenter:
|
|
| 273 |
)
|
| 274 |
|
| 275 |
elif t_type == 'python':
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 276 |
val = eval(col_def['expression'], {}, eval_context)
|
| 277 |
new_row[target_col] = val
|
| 278 |
|
|
@@ -394,19 +403,24 @@ The following operations were applied to the source data:
|
|
| 394 |
conf = config if config != 'default' else None
|
| 395 |
|
| 396 |
try:
|
|
|
|
| 397 |
ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
|
| 398 |
processed = []
|
| 399 |
|
| 400 |
for i, row in enumerate(ds_stream):
|
| 401 |
-
|
|
|
|
|
|
|
| 402 |
|
| 403 |
-
# CRITICAL
|
|
|
|
| 404 |
row = dict(row)
|
| 405 |
|
| 406 |
-
#
|
| 407 |
passed = True
|
| 408 |
if recipe.get('filter_rule'):
|
| 409 |
try:
|
|
|
|
| 410 |
ctx = row.copy()
|
| 411 |
ctx['row'] = row
|
| 412 |
ctx['json'] = json
|
|
@@ -414,17 +428,25 @@ The following operations were applied to the source data:
|
|
| 414 |
if not eval(recipe['filter_rule'], {}, ctx):
|
| 415 |
passed = False
|
| 416 |
except:
|
|
|
|
| 417 |
passed = False
|
| 418 |
|
| 419 |
if passed:
|
| 420 |
try:
|
|
|
|
| 421 |
new_row = self._apply_projection(row, recipe)
|
| 422 |
-
|
|
|
|
|
|
|
| 423 |
clean_new_row = self._sanitize_for_json(new_row)
|
|
|
|
| 424 |
processed.append(clean_new_row)
|
| 425 |
except Exception as e:
|
| 426 |
-
|
|
|
|
| 427 |
|
| 428 |
return processed
|
|
|
|
| 429 |
except Exception as e:
|
|
|
|
| 430 |
raise e
|
|
|
|
| 179 |
# 2. CORE EXTRACTION LOGIC
|
| 180 |
# ==========================================
|
| 181 |
|
| 182 |
+
def _get_value_by_path(self, obj, path):
|
| 183 |
"""
|
| 184 |
+
Retrieves value. PRIORITY: Direct Key Access (Fastest).
|
| 185 |
"""
|
| 186 |
if not path: return obj
|
| 187 |
|
| 188 |
+
# 1. Try Direct Access (Fastest, handles 99% of cases)
|
| 189 |
try:
|
| 190 |
+
# We treat obj as a dict-like object (works for dict, UserDict, LazyRow)
|
| 191 |
+
# We DO NOT check isinstance(dict) to allow duck-typing
|
| 192 |
+
return obj[path]
|
| 193 |
+
except:
|
| 194 |
+
pass
|
| 195 |
|
| 196 |
+
# 2. Try Dot Notation (Only if direct access failed)
|
| 197 |
keys = path.split('.')
|
| 198 |
current = obj
|
| 199 |
|
| 200 |
for i, key in enumerate(keys):
|
| 201 |
try:
|
| 202 |
+
# Array/List index access support (e.g. solutions.0.code)
|
| 203 |
+
if isinstance(current, list) and key.isdigit():
|
| 204 |
+
current = current[int(key)]
|
| 205 |
else:
|
| 206 |
+
current = current[key]
|
| 207 |
except:
|
| 208 |
return None
|
| 209 |
|
| 210 |
+
# Lazy Parsing: Only parse string if we strictly need to go deeper
|
|
|
|
|
|
|
| 211 |
is_last_key = (i == len(keys) - 1)
|
| 212 |
if not is_last_key and isinstance(current, str):
|
| 213 |
s = current.strip()
|
|
|
|
| 219 |
|
| 220 |
return current
|
| 221 |
|
| 222 |
+
|
| 223 |
+
|
| 224 |
def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
|
| 225 |
"""
|
| 226 |
FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path
|
|
|
|
| 251 |
def _apply_projection(self, row, recipe):
|
| 252 |
new_row = {}
|
| 253 |
|
| 254 |
+
# OPTIMIZATION: Only create eval_context if we actually have a Python column.
|
| 255 |
+
# This prevents expensive row.copy() calls for Simple Path operations.
|
| 256 |
+
eval_context = None
|
|
|
|
|
|
|
| 257 |
|
| 258 |
for col_def in recipe['columns']:
|
| 259 |
t_type = col_def.get('type', 'simple')
|
|
|
|
| 261 |
|
| 262 |
try:
|
| 263 |
if t_type == 'simple':
|
| 264 |
+
# Fast path - no context needed
|
| 265 |
new_row[target_col] = self._get_value_by_path(row, col_def['source'])
|
| 266 |
|
| 267 |
elif t_type == 'list_search':
|
| 268 |
+
# Fast path - no context needed
|
| 269 |
new_row[target_col] = self._extract_from_list_logic(
|
| 270 |
row,
|
| 271 |
col_def['source'],
|
|
|
|
| 275 |
)
|
| 276 |
|
| 277 |
elif t_type == 'python':
|
| 278 |
+
# Lazy Context Creation: Only pay the cost if used
|
| 279 |
+
if eval_context is None:
|
| 280 |
+
eval_context = row.copy()
|
| 281 |
+
eval_context['row'] = row
|
| 282 |
+
eval_context['json'] = json
|
| 283 |
+
eval_context['re'] = re
|
| 284 |
+
|
| 285 |
val = eval(col_def['expression'], {}, eval_context)
|
| 286 |
new_row[target_col] = val
|
| 287 |
|
|
|
|
| 403 |
conf = config if config != 'default' else None
|
| 404 |
|
| 405 |
try:
|
| 406 |
+
# Load dataset in streaming mode
|
| 407 |
ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
|
| 408 |
processed = []
|
| 409 |
|
| 410 |
for i, row in enumerate(ds_stream):
|
| 411 |
+
# Stop after 5 successful rows
|
| 412 |
+
if len(processed) >= 5:
|
| 413 |
+
break
|
| 414 |
|
| 415 |
+
# CRITICAL: Force materialization from LazyRow to standard Dict.
|
| 416 |
+
# This fixes the interaction between Streaming datasets and JSON serialization.
|
| 417 |
row = dict(row)
|
| 418 |
|
| 419 |
+
# --- Filter Logic ---
|
| 420 |
passed = True
|
| 421 |
if recipe.get('filter_rule'):
|
| 422 |
try:
|
| 423 |
+
# Create context only for the filter check
|
| 424 |
ctx = row.copy()
|
| 425 |
ctx['row'] = row
|
| 426 |
ctx['json'] = json
|
|
|
|
| 428 |
if not eval(recipe['filter_rule'], {}, ctx):
|
| 429 |
passed = False
|
| 430 |
except:
|
| 431 |
+
# If filter errors out (e.g. missing column), treat as filtered out
|
| 432 |
passed = False
|
| 433 |
|
| 434 |
if passed:
|
| 435 |
try:
|
| 436 |
+
# --- Projection Logic ---
|
| 437 |
new_row = self._apply_projection(row, recipe)
|
| 438 |
+
|
| 439 |
+
# --- Sanitization ---
|
| 440 |
+
# Convert NaNs, Infinity, and complex objects to prevent browser/Flask crash
|
| 441 |
clean_new_row = self._sanitize_for_json(new_row)
|
| 442 |
+
|
| 443 |
processed.append(clean_new_row)
|
| 444 |
except Exception as e:
|
| 445 |
+
# Capture specific row errors for the UI
|
| 446 |
+
processed.append({"_preview_error": f"Row {i} Error: {str(e)}"})
|
| 447 |
|
| 448 |
return processed
|
| 449 |
+
|
| 450 |
except Exception as e:
|
| 451 |
+
# Raise global errors (like 404 Dataset Not Found) so the UI sees them
|
| 452 |
raise e
|