File size: 5,491 Bytes
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""
Dataset Extractor for Janus.

Processes dataset samples (e.g. records streamed from HF Hub), extracts facts
and instruction pairs, and converts them to knowledge entries for the
knowledge base.
"""

import json
import logging
import time
from typing import Dict, List, Any, Optional

# Module-level logger named after this module; DatasetExtractor reports
# per-call extraction counts through it at INFO level.
logger = logging.getLogger(__name__)


class DatasetExtractor:
    """
    Extracts knowledge from HF datasets and converts to knowledge entries.

    The extractor only processes sample records that are handed to it (plain
    dicts, one per dataset row); it does not load or download anything
    itself, so callers can feed it batches taken from a streaming dataset.

    Two kinds of output are produced:

    * "facts" -- long text fields wrapped in knowledge-entry dicts
      (see ``extract_facts_from_samples``);
    * instruction-tuning pairs (see ``extract_instruction_pairs``).
    """

    # A text field must be strictly longer than this to count as a fact.
    MIN_FACT_CHARS = 50
    # Fact text is truncated to this many characters.
    MAX_FACT_CHARS = 1000
    # Confidence attached to every extracted fact (dataset quality varies).
    FACT_CONFIDENCE = 0.7
    # A primary instruction/output field must be strictly longer than this.
    MIN_PAIR_FIELD_CHARS = 10
    # Instruction and output text are truncated to this many characters.
    MAX_PAIR_CHARS = 2000
    # Candidate keys, in priority order, for each side of an instruction pair.
    INSTRUCTION_FIELDS = ("instruction", "prompt", "question", "query", "input")
    OUTPUT_FIELDS = ("output", "response", "answer", "completion", "target")

    def __init__(self):
        # Running counters; updated only by extract_facts_from_samples.
        self.extraction_stats = {
            "total_datasets_processed": 0,
            "total_entries_extracted": 0,
            "last_extraction": None,
        }

    def extract_facts_from_samples(self, samples: List[Dict], topic: str) -> List[Dict]:
        """
        Extract knowledge facts from dataset samples.

        Every text field longer than ``MIN_FACT_CHARS`` becomes one fact,
        truncated to ``MAX_FACT_CHARS`` characters. Each non-empty call counts
        as one processed dataset in ``extraction_stats``.

        Args:
            samples: List of dataset records
            topic: Topic for the extracted knowledge

        Returns:
            List of knowledge entry dicts with the keys ``text``, ``source``,
            ``topic``, ``field``, ``timestamp`` and ``confidence``. Empty (and
            stats untouched) when ``samples`` is empty or None.
        """
        if not samples:
            return []

        facts = []
        for sample in samples:
            for field_name, text in self._extract_text_fields(sample).items():
                # _extract_text_fields already filters by length; the check is
                # kept so an overriding subclass cannot let short text through.
                if len(text) > self.MIN_FACT_CHARS:
                    facts.append(
                        {
                            "text": text[: self.MAX_FACT_CHARS],
                            "source": f"dataset:{topic}",
                            "topic": topic,
                            "field": field_name,
                            "timestamp": time.time(),
                            "confidence": self.FACT_CONFIDENCE,
                        }
                    )

        self.extraction_stats["total_entries_extracted"] += len(facts)
        self.extraction_stats["total_datasets_processed"] += 1
        self.extraction_stats["last_extraction"] = time.time()

        logger.info("Extracted %d facts from %s dataset", len(facts), topic)
        return facts

    def extract_instruction_pairs(self, samples: List[Dict]) -> List[Dict]:
        """
        Extract instruction-tuning pairs from dataset samples.

        Samples that cannot be converted are skipped. This method does not
        update ``extraction_stats``.

        Args:
            samples: List of dataset records (None is treated as empty)

        Returns:
            List of {"instruction", "input", "output", "source"} dicts
        """
        # Tolerate None the same way extract_facts_from_samples does.
        if samples is None:
            samples = []

        pairs = []
        for sample in samples:
            pair = self._convert_to_instruction_pair(sample)
            if pair:
                pairs.append(pair)

        logger.info(
            "Extracted %d instruction pairs from %d samples",
            len(pairs),
            len(samples),
        )
        return pairs

    def _extract_text_fields(self, sample: Dict) -> Dict[str, str]:
        """
        Extract all text fields from a sample.

        A field qualifies when it is a string longer than ``MIN_FACT_CHARS``,
        or a list whose string items, joined with single spaces, are longer
        than ``MIN_FACT_CHARS`` (non-string list items are ignored).

        Returns:
            Mapping of field name to its (possibly joined) text.
        """
        text_fields = {}

        for key, value in sample.items():
            if isinstance(value, str):
                text = value
            elif isinstance(value, list):
                text = " ".join(item for item in value if isinstance(item, str))
            else:
                continue

            if len(text) > self.MIN_FACT_CHARS:
                text_fields[key] = text

        return text_fields

    @staticmethod
    def _text_or_none(value: Any) -> Optional[str]:
        """Return ``value`` if it is a non-empty string, otherwise None."""
        if isinstance(value, str) and value:
            return value
        return None

    def _first_text_field(self, sample: Dict, field_names) -> Optional[str]:
        """Return the first field in ``field_names`` holding a long-enough string."""
        for name in field_names:
            if name in sample:
                value = sample[name]
                if isinstance(value, str) and len(value) > self.MIN_PAIR_FIELD_CHARS:
                    return value
        return None

    def _convert_to_instruction_pair(self, sample: Dict) -> Optional[Dict]:
        """
        Convert a dataset sample to instruction-tuning format.

        Lookup order:

        1. The first ``INSTRUCTION_FIELDS`` / ``OUTPUT_FIELDS`` key holding a
           string longer than ``MIN_PAIR_FIELD_CHARS``.
        2. If no instruction was found, a ``conversations`` list with at least
           two turns: the ``"value"`` of turn 0 is the instruction and the
           ``"value"`` of turn 1 is the output.
        3. If still no instruction, a ``question`` key (any length), with
           ``answer`` as the output when that key is present.

        Values taken in steps 2 and 3 are used only when they are non-empty
        strings; anything else (numbers, lists, None) is treated as missing,
        so it can neither raise when truncated nor leak into the result.

        Returns:
            ``{"instruction", "input", "output", "source"}`` with both texts
            truncated to ``MAX_PAIR_CHARS``, or None when either side is
            missing.
        """
        instruction = self._first_text_field(sample, self.INSTRUCTION_FIELDS)
        output = self._first_text_field(sample, self.OUTPUT_FIELDS)

        # Handle nested formats (e.g., conversations)
        if instruction is None and "conversations" in sample:
            turns = sample["conversations"]
            if isinstance(turns, list) and len(turns) >= 2:
                # First message is instruction, second is output
                if isinstance(turns[0], dict):
                    instruction = self._text_or_none(turns[0].get("value"))
                if isinstance(turns[1], dict):
                    output = self._text_or_none(turns[1].get("value"))

        # Handle Q&A format (also the path for questions of 10 chars or fewer)
        if instruction is None and "question" in sample:
            instruction = self._text_or_none(sample["question"])
            if "answer" in sample:
                output = self._text_or_none(sample["answer"])

        if instruction and output:
            return {
                "instruction": instruction[: self.MAX_PAIR_CHARS],
                "input": "",
                "output": output[: self.MAX_PAIR_CHARS],
                "source": "dataset_extract",
            }

        return None

    def get_extraction_stats(self) -> Dict:
        """Get a shallow copy of the extraction statistics."""
        return self.extraction_stats.copy()


# Module-level DatasetExtractor instance, created when this module is imported.
dataset_extractor = DatasetExtractor()