Spaces:
Running
Running
File size: 5,491 Bytes
"""
Dataset Extractor for Janus.
Streams datasets from HF Hub, extracts facts and patterns,
converts to knowledge entries for the knowledge base.
"""
import json
import logging
import time
from typing import Dict, List, Any, Optional
logger = logging.getLogger(__name__)


class DatasetExtractor:
    """
    Extracts knowledge from dataset samples and converts it to knowledge entries.

    The extractor only consumes already-fetched sample records (dicts); it does
    not load or stream datasets itself, so the caller decides how many records
    are pulled (e.g. a small batch taken from a streaming dataset).

    The thresholds below are class attributes so that a subclass or an instance
    can tune them without touching the extraction logic.
    """

    # A text field must be strictly longer than this (characters) to become a fact.
    MIN_FACT_LENGTH = 50
    # Fact text is truncated to this many characters.
    MAX_FACT_LENGTH = 1000
    # Confidence attached to every fact; dataset quality varies.
    FACT_CONFIDENCE = 0.7
    # A named instruction/output field must be strictly longer than this to match.
    MIN_PAIR_TEXT_LENGTH = 10
    # Instruction and output text are truncated to this many characters.
    MAX_PAIR_TEXT_LENGTH = 2000
    # Field names probed, in priority order, for each side of an instruction pair.
    INSTRUCTION_FIELDS = ("instruction", "prompt", "question", "query", "input")
    OUTPUT_FIELDS = ("output", "response", "answer", "completion", "target")

    def __init__(self):
        # Running counters; updated by extract_facts_from_samples().
        self.extraction_stats = {
            "total_datasets_processed": 0,
            "total_entries_extracted": 0,
            "last_extraction": None,
        }

    def extract_facts_from_samples(self, samples: List[Dict], topic: str) -> List[Dict]:
        """
        Extract knowledge facts from dataset samples.

        Every sufficiently long text field of every sample becomes one fact.
        A non-empty call counts as one processed dataset in the statistics;
        an empty ``samples`` returns ``[]`` and leaves the statistics untouched.

        Args:
            samples: List of dataset records
            topic: Topic for the extracted knowledge

        Returns:
            List of knowledge entry dicts with the keys "text", "source",
            "topic", "field", "timestamp" and "confidence"
        """
        if not samples:
            return []

        facts = []
        for sample in samples:
            # _extract_text_fields() already drops text that is too short,
            # so no second length check is needed here.
            for field_name, text in self._extract_text_fields(sample).items():
                facts.append(
                    {
                        "text": text[: self.MAX_FACT_LENGTH],
                        "source": f"dataset:{topic}",
                        "topic": topic,
                        "field": field_name,
                        "timestamp": time.time(),
                        "confidence": self.FACT_CONFIDENCE,
                    }
                )

        self.extraction_stats["total_entries_extracted"] += len(facts)
        self.extraction_stats["total_datasets_processed"] += 1
        self.extraction_stats["last_extraction"] = time.time()
        logger.info("Extracted %d facts from %s dataset", len(facts), topic)
        return facts

    def extract_instruction_pairs(self, samples: List[Dict]) -> List[Dict]:
        """
        Extract instruction-tuning pairs from dataset samples.

        Samples that cannot be converted are skipped.

        Args:
            samples: List of dataset records

        Returns:
            List of {"instruction", "input", "output", "source"} dicts
        """
        pairs = [
            pair
            for pair in map(self._convert_to_instruction_pair, samples)
            if pair
        ]
        logger.info(
            "Extracted %d instruction pairs from %d samples", len(pairs), len(samples)
        )
        return pairs

    def _extract_text_fields(self, sample: Dict) -> Dict[str, str]:
        """
        Extract all text fields from a sample.

        String values are used as-is; for list values the string items are
        joined with single spaces (non-string items are ignored). Only text
        strictly longer than MIN_FACT_LENGTH is kept.

        Args:
            sample: A single dataset record

        Returns:
            Mapping of field name to its text
        """
        text_fields = {}
        for key, value in sample.items():
            if isinstance(value, str):
                candidate = value
            elif isinstance(value, list):
                candidate = " ".join(item for item in value if isinstance(item, str))
            else:
                continue
            if len(candidate) > self.MIN_FACT_LENGTH:
                text_fields[key] = candidate
        return text_fields

    def _first_text_field(self, sample: Dict, field_names) -> Optional[str]:
        """Return the first field in field_names holding a long-enough string, else None."""
        for name in field_names:
            value = sample.get(name)
            if isinstance(value, str) and len(value) > self.MIN_PAIR_TEXT_LENGTH:
                return value
        return None

    @staticmethod
    def _as_text(value: object) -> Optional[str]:
        """Return value if it is a string, otherwise None."""
        return value if isinstance(value, str) else None

    def _convert_to_instruction_pair(self, sample: Dict) -> Optional[Dict]:
        """
        Convert a dataset sample to instruction-tuning format.

        Lookup order:
          1. Well-known instruction/output field names holding a string
             longer than MIN_PAIR_TEXT_LENGTH.
          2. If no instruction was found: a "conversations" list whose first
             two entries are dicts with a "value" key (first entry is the
             instruction, second the output).
          3. If still no instruction: plain "question"/"answer" fields of any
             length (covers short Q&A pairs rejected by step 1).

        Only string values are ever accepted. Non-string values in the fallback
        paths are treated as missing, so one malformed record cannot raise and
        abort a whole batch.

        Args:
            sample: A single dataset record

        Returns:
            A {"instruction", "input", "output", "source"} dict, or None when
            the sample has no usable instruction/output pair
        """
        instruction = self._first_text_field(sample, self.INSTRUCTION_FIELDS)
        output = self._first_text_field(sample, self.OUTPUT_FIELDS)

        # Handle nested formats (e.g., conversations)
        if instruction is None and "conversations" in sample:
            convos = sample["conversations"]
            if isinstance(convos, list) and len(convos) >= 2:
                # First message is instruction, second is output
                if isinstance(convos[0], dict):
                    instruction = self._as_text(convos[0].get("value", ""))
                if isinstance(convos[1], dict):
                    output = self._as_text(convos[1].get("value", ""))

        # Handle Q&A format (no minimum length, but still strings only)
        if instruction is None and "question" in sample:
            instruction = self._as_text(sample["question"])
            if "answer" in sample:
                output = self._as_text(sample["answer"])

        if not (instruction and output):
            return None
        return {
            "instruction": instruction[: self.MAX_PAIR_TEXT_LENGTH],
            "input": "",
            "output": output[: self.MAX_PAIR_TEXT_LENGTH],
            "source": "dataset_extract",
        }

    def get_extraction_stats(self) -> Dict:
        """Get a copy of the extraction statistics (safe for the caller to mutate)."""
        return self.extraction_stats.copy()


# Shared module-level instance.
dataset_extractor = DatasetExtractor()
|