Spaces:
Sleeping
Sleeping
| """ | |
| Dataset Loader para MGC v5.0 | |
| Sistema automático para descobrir estrutura e carregar múltiplos datasets do Hugging Face | |
| """ | |
| import requests | |
| import json | |
| from typing import Dict, List, Optional, Any | |
| from datasets import load_dataset | |
| from collections import defaultdict | |
| class DatasetStructureDiscover: | |
| """Descobre automaticamente a estrutura de qualquer dataset do Hugging Face""" | |
| def __init__(self): | |
| self.api_base = "https://datasets-server.huggingface.co" | |
| def get_splits(self, dataset_name: str) -> Dict: | |
| """Obtém splits e configurações do dataset""" | |
| url = f"{self.api_base}/splits?dataset={dataset_name}" | |
| try: | |
| response = requests.get(url, timeout=30) | |
| response.raise_for_status() | |
| data = response.json() | |
| return data | |
| except Exception as e: | |
| return {"error": str(e), "splits": []} | |
| def get_first_rows(self, dataset_name: str, split: str = "train", config: str = None, limit: int = 5) -> Dict: | |
| """Obtém as primeiras linhas para entender a estrutura""" | |
| if config: | |
| url = f"{self.api_base}/first-rows?dataset={dataset_name}&config={config}&split={split}" | |
| else: | |
| url = f"{self.api_base}/first-rows?dataset={dataset_name}&split={split}" | |
| try: | |
| response = requests.get(url, timeout=30) | |
| response.raise_for_status() | |
| data = response.json() | |
| return data | |
| except Exception as e: | |
| return {"error": str(e)} | |
| def discover_schema(self, dataset_name: str) -> Dict: | |
| """ | |
| Descobre o schema completo do dataset: | |
| - Colunas disponíveis | |
| - Tipos de dados | |
| - Candidatos a texto | |
| - Candidatos a label | |
| """ | |
| print(f"\n🔍 Descobrindo estrutura do dataset: {dataset_name}") | |
| print("-" * 50) | |
| # 1. Obtém splits | |
| splits_info = self.get_splits(dataset_name) | |
| if "error" in splits_info: | |
| return {"error": splits_info["error"], "name": dataset_name} | |
| splits = splits_info.get("splits", []) | |
| if not splits: | |
| return {"error": "No splits found", "name": dataset_name} | |
| # 2. Usa o primeiro split disponível para inspecionar | |
| first_split = splits[0] | |
| config_name = first_split.get("config", "default") | |
| split_name = first_split.get("split", "train") | |
| # 3. Obtém primeiras linhas | |
| rows_info = self.get_first_rows(dataset_name, split_name, config_name, 10) | |
| if "error" in rows_info: | |
| return {"error": rows_info["error"], "name": dataset_name} | |
| # 4. Analisa as colunas - CORREÇÃO AQUI | |
| features = rows_info.get("features", {}) | |
| # Se features for uma lista, converte para dicionário | |
| if isinstance(features, list): | |
| features_dict = {} | |
| for feat in features: | |
| if isinstance(feat, dict): | |
| feat_name = feat.get("name", "unknown") | |
| features_dict[feat_name] = feat | |
| features = features_dict | |
| rows = rows_info.get("rows", []) | |
| # 5. Identifica colunas de texto e label | |
| text_candidates = [] | |
| label_candidates = [] | |
| if isinstance(features, dict): | |
| for col_name, col_info in features.items(): | |
| col_type = col_info.get("type", "unknown") if isinstance(col_info, dict) else "unknown" | |
| # Colunas de texto comuns | |
| if any(keyword in col_name.lower() for keyword in | |
| ["text", "sentence", "content", "body", "instruction", "input", "question", "premise", "hypothesis"]): | |
| text_candidates.append({"name": col_name, "type": col_type, "reason": "nome sugestivo"}) | |
| elif col_type in ["string", "text", "large_string"]: | |
| text_candidates.append({"name": col_name, "type": col_type, "reason": "tipo string"}) | |
| # Colunas de label comuns | |
| if any(keyword in col_name.lower() for keyword in | |
| ["label", "class", "category", "sentiment", "output", "response", "target"]): | |
| label_candidates.append({"name": col_name, "type": col_type, "reason": "nome sugestivo"}) | |
| # 6. Amostra dos dados | |
| sample_rows = [] | |
| for row in rows[:3]: | |
| row_data = row.get("row", {}) | |
| sample_rows.append(row_data) | |
| schema = { | |
| "name": dataset_name, | |
| "splits": [{"config": s.get("config"), "split": s.get("split")} for s in splits[:5]], | |
| "features": features, | |
| "text_candidates": text_candidates, | |
| "label_candidates": label_candidates, | |
| "sample_rows": sample_rows, | |
| "total_splits": len(splits) | |
| } | |
| print(f" ✅ {len(splits)} splits encontrados") | |
| print(f" 📝 Colunas: {list(features.keys()) if isinstance(features, dict) else 'N/A'}") | |
| print(f" 📖 Candidatos a texto: {[c['name'] for c in text_candidates]}") | |
| print(f" 🏷️ Candidatos a label: {[c['name'] for c in label_candidates]}") | |
| return schema | |
| class MGCDataLoader: | |
| """ | |
| Carrega múltiplos datasets do Hugging Face para a MGC | |
| Estratégia: 1 dataset base grande + datasets complementares | |
| """ | |
| def __init__(self): | |
| self.discover = DatasetStructureDiscover() | |
| self.datasets_info = {} | |
| self.loaded_data = [] | |
| def get_dataset_structure(self, dataset_name: str) -> Dict: | |
| """Interface pública para descobrir estrutura""" | |
| return self.discover.discover_schema(dataset_name) | |
| def load_dataset_with_auto_detection(self, dataset_name: str, limite: int = None) -> List[Dict]: | |
| """ | |
| Carrega dataset automaticamente detectando as colunas corretas | |
| """ | |
| print(f"\n📥 Carregando dataset: {dataset_name}") | |
| # Descobre estrutura | |
| schema = self.discover.discover_schema(dataset_name) | |
| if "error" in schema: | |
| print(f" ❌ Erro: {schema['error']}") | |
| return [] | |
| # Tenta carregar o dataset | |
| try: | |
| # Pega o primeiro split disponível | |
| if schema["splits"]: | |
| first_split = schema["splits"][0] | |
| config = first_split.get("config") | |
| split = first_split.get("split", "train") | |
| if config and config != "default": | |
| dataset = load_dataset(dataset_name, config, split=split, trust_remote_code=True) | |
| else: | |
| dataset = load_dataset(dataset_name, split=split, trust_remote_code=True) | |
| else: | |
| dataset = load_dataset(dataset_name, split="train", trust_remote_code=True) | |
| # Limita tamanho | |
| if limite and len(dataset) > limite: | |
| dataset = dataset.select(range(limite)) | |
| # Detecta colunas corretas | |
| text_col = None | |
| label_col = None | |
| column_names = dataset.column_names | |
| # Prioridade para colunas de texto | |
| for col in schema["text_candidates"]: | |
| if col["name"] in column_names: | |
| text_col = col["name"] | |
| break | |
| # Se não achou, pega primeira coluna string | |
| if not text_col: | |
| for col in column_names: | |
| try: | |
| if dataset[col][0] and isinstance(dataset[col][0], str): | |
| text_col = col | |
| break | |
| except: | |
| pass | |
| # Detecta coluna de label | |
| for col in schema["label_candidates"]: | |
| if col["name"] in column_names: | |
| label_col = col["name"] | |
| break | |
| # Se não achou label, tenta coluna 'label' | |
| if not label_col and 'label' in column_names: | |
| label_col = 'label' | |
| # Converte para formato MGC | |
| data_list = [] | |
| for item in dataset: | |
| texto = "" | |
| label = "" | |
| if text_col: | |
| texto = str(item[text_col])[:500] | |
| if label_col: | |
| label = str(item[label_col])[:100] | |
| else: | |
| # Se não tem label, usa o nome do dataset como categoria | |
| label = dataset_name.split("/")[-1] | |
| if texto and len(texto) > 10: | |
| data_list.append({ | |
| "entrada": texto, | |
| "saida": label, | |
| "fonte": dataset_name | |
| }) | |
| print(f" ✅ Carregado {len(data_list)} exemplos") | |
| print(f" 📝 Coluna de texto: {text_col}") | |
| print(f" 🏷️ Coluna de label: {label_col}") | |
| return data_list | |
| except Exception as e: | |
| print(f" ❌ Erro ao carregar: {str(e)}") | |
| return [] | |
| def carregar_dataset_base(self, limite: int = 5000) -> List[Dict]: | |
| """Carrega o dataset base (Canarim - 300k+ instruções em PT-BR)""" | |
| print("\n" + "="*60) | |
| print("🚀 CARREGANDO DATASET BASE (MILHÕES DE TOKENS)") | |
| print("="*60) | |
| return self.load_dataset_with_auto_detection( | |
| "dominguesm/Canarim-Instruct-PTBR-Dataset", | |
| limite=limite | |
| ) | |
| def carregar_datasets_complementares(self, limites: Dict[str, int] = None) -> List[Dict]: | |
| """Carrega datasets complementares pequenos""" | |
| print("\n" + "="*60) | |
| print("📚 CARREGANDO DATASETS COMPLEMENTARES") | |
| print("="*60) | |
| if limites is None: | |
| limites = { | |
| "assin2": 3000, | |
| "hatebr": 3000, | |
| "fake_br": 3000, | |
| "portuguese_hate_speech": 3000 | |
| } | |
| datasets_complementares = [ | |
| {"name": "assin2", "desc": "Similaridade Semântica"}, | |
| {"name": "hatebr", "desc": "Discurso de Ódio"}, | |
| {"name": "fake_br", "desc": "Fake News"}, | |
| {"name": "portuguese_hate_speech", "desc": "Discurso de Ódio (Tweets)"} | |
| ] | |
| all_data = [] | |
| for ds in datasets_complementares: | |
| nome = ds["name"] | |
| desc = ds["desc"] | |
| limite = limites.get(nome, 3000) | |
| print(f"\n📖 {desc} ({nome})") | |
| dados = self.load_dataset_with_auto_detection(nome, limite=limite) | |
| all_data.extend(dados) | |
| return all_data | |
| def carregar_dataset_ingles(self, limite: int = 5000) -> List[Dict]: | |
| """Carrega dataset em inglês como complemento (opcional)""" | |
| print("\n" + "="*60) | |
| print("🇬🇧 CARREGANDO DATASET EM INGLÊS") | |
| print("="*60) | |
| return self.load_dataset_with_auto_detection("agnews", limite=limite) | |
| def carregar_tudo(self, | |
| limite_base: int = 5000, | |
| carregar_complementares: bool = True, | |
| carregar_ingles: bool = False) -> List[Dict]: | |
| """Carrega todos os datasets configurados""" | |
| todos_dados = [] | |
| # 1. Dataset base (Canarim - milhões de tokens) | |
| dados_base = self.carregar_dataset_base(limite=limite_base) | |
| todos_dados.extend(dados_base) | |
| # 2. Datasets complementares em português | |
| if carregar_complementares: | |
| dados_comp = self.carregar_datasets_complementares() | |
| todos_dados.extend(dados_comp) | |
| # 3. Dataset em inglês (opcional) | |
| if carregar_ingles: | |
| dados_eng = self.carregar_dataset_ingles(limite=3000) | |
| todos_dados.extend(dados_eng) | |
| print("\n" + "="*60) | |
| print(f"✅ TOTAL CARREGADO: {len(todos_dados)} exemplos") | |
| print("="*60) | |
| # Estatísticas por fonte | |
| fontes = defaultdict(int) | |
| for item in todos_dados: | |
| fontes[item["fonte"]] += 1 | |
| print("\n📊 Distribuição por dataset:") | |
| for fonte, qtd in fontes.items(): | |
| print(f" - {fonte}: {qtd}") | |
| return todos_dados | |
| # ============================================================ | |
| # FUNÇÃO PARA INSPEÇÃO MANUAL DE DATASET | |
| # ============================================================ | |
| def inspecionar_dataset(nome_dataset: str) -> Dict: | |
| """Inspeciona a estrutura de qualquer dataset do Hugging Face""" | |
| discover = DatasetStructureDiscover() | |
| schema = discover.discover_schema(nome_dataset) | |
| print("\n" + "="*60) | |
| print(f"📋 RELATÓRIO DO DATASET: {nome_dataset}") | |
| print("="*60) | |
| if "error" in schema: | |
| print(f"❌ Erro: {schema['error']}") | |
| return schema | |
| print(f"\n📂 Splits disponíveis: {schema['total_splits']}") | |
| for s in schema['splits'][:5]: | |
| print(f" - Config: {s.get('config')}, Split: {s.get('split')}") | |
| print(f"\n📝 Colunas: {list(schema.get('features', {}).keys())}") | |
| print(f"\n📖 Candidatos para texto:") | |
| for c in schema.get('text_candidates', []): | |
| print(f" - {c['name']} (tipo: {c['type']}) - {c['reason']}") | |
| print(f"\n🏷️ Candidatos para label:") | |
| for c in schema.get('label_candidates', []): | |
| print(f" - {c['name']} (tipo: {c['type']}) - {c['reason']}") | |
| print(f"\n📄 Amostra dos dados:") | |
| for i, row in enumerate(schema.get('sample_rows', [])[:2]): | |
| print(f"\n Exemplo {i+1}:") | |
| for key, value in list(row.items())[:3]: | |
| valor_str = str(value)[:100] + "..." if len(str(value)) > 100 else str(value) | |
| print(f" {key}: {valor_str}") | |
| return schema | |
| # ============================================================ | |
| # TESTE | |
| # ============================================================ | |
| if __name__ == "__main__": | |
| print("="*60) | |
| print("🧠 INSPEÇÃO DE DATASETS PARA MGC v5.0") | |
| print("="*60) | |
| # Testar inspeção | |
| inspecionar_dataset("assin2") |