# HF-Dataset-Commander / processor.py
# Source: broadfield-dev's Hugging Face Space (commit d6558bb, verified; 12.7 kB)
import json
import logging
import datasets
from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
from huggingface_hub import HfApi, DatasetCard, DatasetCardData
# Module-level logging: emit INFO and above; logger is named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DatasetCommandCenter:
    """ETL front-end for Hugging Face datasets.

    Inspects a source dataset, applies a column-projection / row-filter
    "recipe", and pushes the transformed dataset (plus a generated
    dataset card) back to the Hub.
    """

    def __init__(self, token: str | None = None):
        # Optional HF API token, forwarded to every Hub / datasets call.
        self.token = token
def get_dataset_metadata(self, dataset_id):
try:
try:
configs = get_dataset_config_names(dataset_id, token=self.token)
except:
configs = ['default']
try:
infos = get_dataset_infos(dataset_id, token=self.token)
first_conf = configs[0]
if first_conf in infos:
splits = list(infos[first_conf].splits.keys())
else:
splits = list(infos.values())[0].splits.keys()
except:
splits = ['train', 'test', 'validation']
# license
try:
configs = get_dataset_config_names(dataset_id, token=self.token)
except:
configs = ['default']
license_name = "unknown"
try:
infos = get_dataset_infos(dataset_id, token=self.token)
# Try to grab license from first config
first = list(infos.values())[0]
license_name = first.license or "unknown"
except:
pass
return {
"status": "success",
"configs": configs,
# We assume user will pick splits later, just return configs + license hint
"license_detected": license_name
}
except Exception as e:
return {"status": "error", "message": str(e)}
def get_splits_for_config(self, dataset_id, config_name):
try:
infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
splits = list(infos[config_name].splits.keys())
return {"status": "success", "splits": splits}
except:
return {"status": "success", "splits": ['train', 'test', 'validation']}
# --- HELPER: Recursive JSON/Dot Notation Getter ---
def _get_value_by_path(self, obj, path):
if not path: return obj
keys = path.split('.')
current = obj
for key in keys:
# Auto-parse JSON string if encountered
if isinstance(current, str):
s = current.strip()
if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
try:
current = json.loads(s)
except:
pass
if isinstance(current, dict) and key in current:
current = current[key]
else:
return None
return current
# --- HELPER: List Search Logic ---
def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
"""
Logic: Look inside row[source_col] (which is a list).
Find first item where item[filter_key] == filter_val.
Then extract item[target_path].
"""
# 1. Get the list (handling JSON string if needed)
data = row.get(source_col)
if isinstance(data, str):
try:
data = json.loads(data)
except:
return None
if not isinstance(data, list):
return None
# 2. Search the list
matched_item = None
for item in data:
# We treat values as strings for comparison to be safe
if str(item.get(filter_key, '')) == str(filter_val):
matched_item = item
break
if matched_item:
# 3. Extract the target (supporting nested json parsing via dot notation)
# e.g. target_path = "content.analysis"
return self._get_value_by_path(matched_item, target_path)
return None
def _flatten_schema(self, obj, parent='', visited=None):
if visited is None: visited = set()
items = []
# Avoid infinite recursion
if id(obj) in visited: return []
visited.add(id(obj))
# Handle JSON strings
if isinstance(obj, str):
s = obj.strip()
if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
try:
obj = json.loads(s)
except:
pass
if isinstance(obj, dict):
for k, v in obj.items():
full_key = f"{parent}.{k}" if parent else k
items.append((full_key, type(v).__name__))
items.extend(self._flatten_schema(v, full_key, visited))
elif isinstance(obj, list) and len(obj) > 0:
# For lists, we just peek at the first item to guess schema
full_key = f"{parent}[]" if parent else "[]"
items.append((parent, "List")) # Mark the parent as a List
items.extend(self._flatten_schema(obj[0], full_key, visited))
return items
def inspect_dataset(self, dataset_id, config, split):
try:
conf = config if config != 'default' else None
ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
sample_rows = []
schema_map = {} # stores { "col_name": { "is_list": bool, "keys": [] } }
for i, row in enumerate(ds_stream):
if i >= 10: break
# Create clean sample for UI
clean_row = {}
for k, v in row.items():
if not isinstance(v, (str, int, float, bool, list, dict, type(None))):
clean_row[k] = str(v)
else:
clean_row[k] = v
sample_rows.append(clean_row)
# Analyze Schema
for k, v in row.items():
if k not in schema_map:
schema_map[k] = {"is_list": False, "keys": set()}
# Check if it's a list (or json-string list)
val = v
if isinstance(val, str):
try:
val = json.loads(val)
except: pass
if isinstance(val, list):
schema_map[k]["is_list"] = True
if len(val) > 0 and isinstance(val[0], dict):
schema_map[k]["keys"].update(val[0].keys())
elif isinstance(val, dict):
schema_map[k]["keys"].update(val.keys())
# Format schema for UI
formatted_schema = {}
for k, info in schema_map.items():
formatted_schema[k] = {
"type": "List" if info["is_list"] else "Object",
"keys": list(info["keys"])
}
return {
"status": "success",
"samples": sample_rows,
"schema": formatted_schema,
"dataset_id": dataset_id
}
except Exception as e:
return {"status": "error", "message": str(e)}
def _apply_projection(self, row, recipe):
new_row = {}
for col_def in recipe['columns']:
t_type = col_def.get('type', 'simple')
if t_type == 'simple':
# Standard Dot Notation
new_row[col_def['name']] = self._get_value_by_path(row, col_def['source'])
elif t_type == 'list_search':
# GET x WHERE y=z
val = self._extract_from_list_logic(
row,
col_def['source'],
col_def['filter_key'],
col_def['filter_val'],
col_def['target_key']
)
new_row[col_def['name']] = val
elif t_type == 'python':
# Advanced Python Eval
try:
context = row.copy()
# We inject 'json' module into context for user scripts
context['json'] = json
val = eval(col_def['expression'], {}, context)
new_row[col_def['name']] = val
except:
new_row[col_def['name']] = None
return new_row
def _passes_filter(self, row, filter_str):
if not filter_str: return True
try:
context = row.copy()
return eval(filter_str, {}, context)
except:
return False
def _generate_card(self, source_id, target_id, recipe, license_name):
"""
Generates a README.md with YAML metadata and a report of operations.
"""
# 1. YAML Metadata
card_data = DatasetCardData(
language="en",
license=license_name,
tags=["dataset-command-center", "etl", "generated-dataset"],
base_model=source_id, # Linking source
)
# 2. Description & Recipe Table
content = f"""
# {target_id.split('/')[-1]}
This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
It was generated using the **Hugging Face Dataset Command Center**.
## Transformation Recipe
The following operations were applied to the source data:
| Target Column | Source | Type | Logic / Filter |
|---------------|--------|------|----------------|
"""
for col in recipe['columns']:
c_type = col.get('type', 'simple')
c_name = col['name']
c_src = col.get('source', '-')
if c_type == 'simple':
logic = "Direct Mapping"
elif c_type == 'list_search':
logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
elif c_type == 'python':
logic = f"`{col.get('expression')}`"
else:
logic = "-"
content += f"| **{c_name}** | `{c_src}` | {c_type} | {logic} |\n"
if recipe.get('filter_rule'):
content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"
content += f"\n## Original License\nThis dataset inherits the license: `{license_name}` from the source."
card = DatasetCard.from_template(card_data, content=content)
return card
def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None):
logger.info(f"Job started: {source_id}")
conf = config if config != 'default' else None
def gen():
ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
count = 0
for row in ds_stream:
if max_rows and count >= int(max_rows): break
if self._passes_filter(row, recipe.get('filter_rule')):
yield self._apply_projection(row, recipe)
count += 1
try:
new_dataset = datasets.Dataset.from_generator(gen)
new_dataset.push_to_hub(target_id, token=self.token)
# 2. GENERATE & PUSH CARD
try:
card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
card.push_to_hub(target_id, token=self.token)
except Exception as e:
logger.warning(f"Could not push dataset card: {e}")
return {"status": "success", "rows_processed": len(new_dataset)}
except Exception as e:
return {"status": "error", "message": str(e)}
def preview_transform(self, dataset_id, config, split, recipe):
conf = config if config != 'default' else None
ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
processed = []
for row in ds_stream:
if len(processed) >= 5: break
if self._passes_filter(row, recipe.get('filter_rule')):
processed.append(self._apply_projection(row, recipe))
return processed