Spaces:
Sleeping
Sleeping
| import json | |
| import logging | |
| import datasets | |
| from datasets import load_dataset, get_dataset_config_names, get_dataset_infos | |
| from huggingface_hub import HfApi, DatasetCard, DatasetCardData | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class DatasetCommandCenter:
    """ETL helper around the Hugging Face Hub.

    Inspects a source dataset (configs, splits, sample rows, coarse schema),
    applies a user-authored column-projection "recipe" to a streamed dataset,
    and pushes the transformed result — plus a generated dataset card — to a
    target repo on the Hub.
    """

    def __init__(self, token=None):
        # Optional HF auth token, forwarded to every hub call.
        self.token = token

    # ------------------------------------------------------------------
    # Hub metadata helpers
    # ------------------------------------------------------------------

    def _detect_license(self, dataset_id):
        """Best-effort license lookup for *dataset_id*.

        Reads the license off the first available config info; returns
        "unknown" when the hub lookup fails or no license is recorded.
        """
        try:
            infos = get_dataset_infos(dataset_id, token=self.token)
            first = next(iter(infos.values()))
            return first.license or "unknown"
        except Exception:
            return "unknown"

    def get_dataset_metadata(self, dataset_id):
        """Return config names and a license hint for *dataset_id*.

        Returns ``{"status": "success", "configs": [...], "license_detected": str}``
        or ``{"status": "error", "message": str}`` on total failure.
        Splits are intentionally not resolved here — the UI asks for them
        per-config via :meth:`get_splits_for_config`.
        """
        try:
            try:
                configs = get_dataset_config_names(dataset_id, token=self.token)
            except Exception:
                # Datasets without explicit configs still load with name=None.
                configs = ['default']
            return {
                "status": "success",
                "configs": configs,
                "license_detected": self._detect_license(dataset_id),
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_splits_for_config(self, dataset_id, config_name):
        """Return split names for one config, falling back to common defaults.

        Always reports status "success" so the UI stays usable even when the
        hub cannot provide split information.
        """
        try:
            infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
            splits = list(infos[config_name].splits.keys())
            return {"status": "success", "splits": splits}
        except Exception:
            return {"status": "success", "splits": ['train', 'test', 'validation']}

    # ------------------------------------------------------------------
    # Pure transformation helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _maybe_parse_json(value):
        """If *value* is a string that looks like a JSON object/array, parse it.

        Returns the parsed container on success, otherwise *value* unchanged.
        """
        if isinstance(value, str):
            s = value.strip()
            if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                try:
                    return json.loads(s)
                except (ValueError, json.JSONDecodeError):
                    pass
        return value

    def _get_value_by_path(self, obj, path):
        """Resolve dot-notation *path* inside *obj*.

        JSON-encoded strings encountered along the way are auto-parsed.
        Returns the resolved value, *obj* itself when *path* is empty, or
        None when any key along the path is missing.
        """
        if not path:
            return obj
        current = obj
        for key in path.split('.'):
            current = self._maybe_parse_json(current)
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current

    def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
        """Find-in-list extraction: GET target_path WHERE filter_key == filter_val.

        Looks inside ``row[source_col]`` (a list, possibly JSON-encoded),
        finds the first dict item whose *filter_key* equals *filter_val*
        (compared as strings so 1 matches "1"), then resolves *target_path*
        inside that item. Returns None on any mismatch or parse failure.
        """
        data = row.get(source_col)
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except (ValueError, json.JSONDecodeError):
                return None
        if not isinstance(data, list):
            return None
        for item in data:
            # Guard against non-dict entries; original code crashed on them.
            if isinstance(item, dict) and str(item.get(filter_key, '')) == str(filter_val):
                return self._get_value_by_path(item, target_path)
        return None

    def _flatten_schema(self, obj, parent='', visited=None):
        """Recursively flatten *obj* into (dotted_key, type_name) pairs.

        Lists produce a ``(parent, "List")`` marker plus the schema of their
        first element under ``parent[]`` (first item is assumed to be
        representative). *visited* holds object ids to break cycles.
        """
        if visited is None:
            visited = set()
        if id(obj) in visited:
            return []
        visited.add(id(obj))
        obj = self._maybe_parse_json(obj)
        items = []
        if isinstance(obj, dict):
            for key, value in obj.items():
                full_key = f"{parent}.{key}" if parent else key
                items.append((full_key, type(value).__name__))
                items.extend(self._flatten_schema(value, full_key, visited))
        elif isinstance(obj, list) and obj:
            items.append((parent, "List"))
            child_key = f"{parent}[]" if parent else "[]"
            items.extend(self._flatten_schema(obj[0], child_key, visited))
        return items

    # ------------------------------------------------------------------
    # Inspection
    # ------------------------------------------------------------------

    def inspect_dataset(self, dataset_id, config, split):
        """Stream up to 10 rows and infer a coarse per-column schema.

        Returns ``{"status": "success", "samples": [...],
        "schema": {col: {"type": "List"|"Object", "keys": [...]}},
        "dataset_id": ...}`` or an error dict.
        """
        try:
            conf = config if config != 'default' else None
            ds_stream = load_dataset(dataset_id, name=conf, split=split,
                                     streaming=True, token=self.token)
            sample_rows = []
            schema_map = {}  # col -> {"is_list": bool, "keys": set}
            for i, row in enumerate(ds_stream):
                if i >= 10:
                    break
                # JSON-safe copy for the UI: stringify exotic types
                # (images, arrays, timestamps, ...).
                sample_rows.append({
                    k: v if isinstance(v, (str, int, float, bool, list, dict, type(None))) else str(v)
                    for k, v in row.items()
                })
                # Schema analysis: detect list / object columns and their keys.
                for k, v in row.items():
                    info = schema_map.setdefault(k, {"is_list": False, "keys": set()})
                    val = self._maybe_parse_json(v)
                    if isinstance(val, list):
                        info["is_list"] = True
                        if val and isinstance(val[0], dict):
                            info["keys"].update(val[0].keys())
                    elif isinstance(val, dict):
                        info["keys"].update(val.keys())
            formatted_schema = {
                k: {"type": "List" if info["is_list"] else "Object",
                    "keys": list(info["keys"])}
                for k, info in schema_map.items()
            }
            return {
                "status": "success",
                "samples": sample_rows,
                "schema": formatted_schema,
                "dataset_id": dataset_id
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    # ------------------------------------------------------------------
    # Recipe application
    # ------------------------------------------------------------------

    def _apply_projection(self, row, recipe):
        """Build one output row as described by ``recipe['columns']``.

        Supported column types: 'simple' (dot-path mapping), 'list_search'
        (find-in-list then extract), 'python' (eval of a user expression).
        Unknown types are skipped; a failing 'python' expression yields None.
        """
        new_row = {}
        for col_def in recipe['columns']:
            t_type = col_def.get('type', 'simple')
            if t_type == 'simple':
                new_row[col_def['name']] = self._get_value_by_path(row, col_def['source'])
            elif t_type == 'list_search':
                new_row[col_def['name']] = self._extract_from_list_logic(
                    row,
                    col_def['source'],
                    col_def['filter_key'],
                    col_def['filter_val'],
                    col_def['target_key'],
                )
            elif t_type == 'python':
                # SECURITY: eval of user-supplied code. Acceptable only
                # because recipes are authored by the same user who runs
                # the job; never feed third-party recipes through this.
                try:
                    context = dict(row)
                    context['json'] = json  # expose json to user expressions
                    new_row[col_def['name']] = eval(col_def['expression'], {}, context)
                except Exception:
                    new_row[col_def['name']] = None
        return new_row

    def _passes_filter(self, row, filter_str):
        """Evaluate the user filter expression against *row*.

        An empty/None filter passes everything; any evaluation error rejects
        the row. SECURITY: eval of user-supplied code — recipes are trusted
        (same user) by design.
        """
        if not filter_str:
            return True
        try:
            return eval(filter_str, {}, dict(row))
        except Exception:
            return False

    # ------------------------------------------------------------------
    # Card generation & push
    # ------------------------------------------------------------------

    def _generate_card(self, source_id, target_id, recipe, license_name):
        """Generate a README.md (DatasetCard) with YAML metadata and a
        human-readable report of the applied transformation recipe.
        """
        # 1. YAML metadata header.
        card_data = DatasetCardData(
            language="en",
            license=license_name,
            tags=["dataset-command-center", "etl", "generated-dataset"],
            base_model=source_id,  # Linking source
        )
        # 2. Description and recipe table.
        content = f"""
# {target_id.split('/')[-1]}
This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
It was generated using the **Hugging Face Dataset Command Center**.
## Transformation Recipe
The following operations were applied to the source data:
| Target Column | Source | Type | Logic / Filter |
|---------------|--------|------|----------------|
"""
        for col in recipe['columns']:
            c_type = col.get('type', 'simple')
            c_name = col['name']
            c_src = col.get('source', '-')
            if c_type == 'simple':
                logic = "Direct Mapping"
            elif c_type == 'list_search':
                logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
            elif c_type == 'python':
                logic = f"`{col.get('expression')}`"
            else:
                logic = "-"
            content += f"| **{c_name}** | `{c_src}` | {c_type} | {logic} |\n"
        if recipe.get('filter_rule'):
            content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"
        content += f"\n## Original License\nThis dataset inherits the license: `{license_name}` from the source."
        card = DatasetCard.from_template(card_data, content=content)
        return card

    def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None):
        """Stream *source_id*, transform rows with *recipe*, push to *target_id*.

        *max_rows* caps the number of OUTPUT rows (counted after filtering).
        A dataset card is generated and pushed best-effort: card failures are
        logged as warnings and do not fail the job.
        """
        logger.info("Job started: %s", source_id)
        conf = config if config != 'default' else None

        def gen():
            ds_stream = load_dataset(source_id, name=conf, split=split,
                                     streaming=True, token=self.token)
            count = 0
            for row in ds_stream:
                if max_rows and count >= int(max_rows):
                    break
                if self._passes_filter(row, recipe.get('filter_rule')):
                    yield self._apply_projection(row, recipe)
                    count += 1

        try:
            new_dataset = datasets.Dataset.from_generator(gen)
            new_dataset.push_to_hub(target_id, token=self.token)
            # Generate & push the card. BUGFIX: the original referenced an
            # undefined name `new_license` here, so the NameError was silently
            # caught and the card was never pushed.
            try:
                license_name = self._detect_license(source_id)
                card = self._generate_card(source_id, target_id, recipe, license_name)
                card.push_to_hub(target_id, token=self.token)
            except Exception as e:
                logger.warning(f"Could not push dataset card: {e}")
            return {"status": "success", "rows_processed": len(new_dataset)}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def preview_transform(self, dataset_id, config, split, recipe):
        """Apply *recipe* to the stream and return the first 5 output rows."""
        conf = config if config != 'default' else None
        ds_stream = load_dataset(dataset_id, name=conf, split=split,
                                 streaming=True, token=self.token)
        processed = []
        for row in ds_stream:
            if len(processed) >= 5:
                break
            if self._passes_filter(row, recipe.get('filter_rule')):
                processed.append(self._apply_projection(row, recipe))
        return processed