File size: 10,198 Bytes
cb7ce4a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
#!/usr/bin/env python3
"""
Normalize Layer 1 financial instruction datasets into ChatML JSONL format.

Handles 5 financial instruction datasets with different column naming conventions:
- finance_instruct_500k (Josephgflowers/Finance-Instruct-500k)
- sujet_finance_177k (sujet-ai/Sujet-Finance-Instruct-177k)
- financial_qa_10k (virattt/financial-qa-10K)
- fingpt_convfinqa (FinGPT/fingpt-convfinqa)
- earnings_calls_qa (lamini/earnings-calls-qa)

Converts to unified ChatML format with system prompt, user message, and assistant response.
Filters out low-quality samples based on message length thresholds.
"""

import json
import os
from pathlib import Path
from datasets import load_from_disk
from typing import Optional, Dict, List, Tuple


def get_system_prompt(data_dir: Path) -> str:
    """Load the CFO system prompt from file."""
    prompt_path = data_dir / "cfo_system_prompt.txt"
    with open(prompt_path, "r", encoding="utf-8") as f:
        return f.read().strip()


def extract_field(sample: Dict, possible_names: List[str], default: str = "") -> str:
    """
    Extract a field from a sample using multiple possible column names with fallbacks.

    Args:
        sample: The data sample dictionary
        possible_names: List of possible column names to try
        default: Default value if none of the names are found

    Returns:
        The field value as a string, or default if not found
    """
    for name in possible_names:
        if name in sample:
            value = sample[name]
            if value is not None:
                return str(value).strip()
    return default


def is_valid_sample(user_content: str, assistant_content: str,
                   min_user_len: int = 10, min_assistant_len: int = 20) -> bool:
    """
    Check if a sample meets quality thresholds.

    Args:
        user_content: The user message content
        assistant_content: The assistant response content
        min_user_len: Minimum acceptable length for user message
        min_assistant_len: Minimum acceptable length for assistant response

    Returns:
        True if sample meets quality thresholds, False otherwise
    """
    return (len(user_content) >= min_user_len and
            len(assistant_content) >= min_assistant_len)


def normalize_finance_instruct_500k(sample: Dict) -> Optional[Dict]:
    """
    Normalize finance_instruct_500k dataset.
    Likely columns: instruction/context/output or input/output
    """
    # Try instruction + context + output pattern
    instruction = extract_field(sample, ["instruction", "input", "prompt"])
    context = extract_field(sample, ["context", "background", ""])
    output = extract_field(sample, ["output", "response", "answer"])

    # Combine context with instruction if available
    if context:
        user_content = f"{instruction}\n\nContext: {context}".strip()
    else:
        user_content = instruction

    if not is_valid_sample(user_content, output):
        return None

    return {
        "messages": [
            {"role": "system", "content": ""},  # Will be filled later
            {"role": "user", "content": user_content},
            {"role": "assistant", "content": output}
        ]
    }


def normalize_sujet_finance_177k(sample: Dict) -> Optional[Dict]:
    """
    Normalize sujet_finance_177k dataset.
    Likely columns: instruction/output
    """
    instruction = extract_field(sample, ["instruction", "input", "question"])
    output = extract_field(sample, ["output", "response", "answer"])

    if not is_valid_sample(instruction, output):
        return None

    return {
        "messages": [
            {"role": "system", "content": ""},
            {"role": "user", "content": instruction},
            {"role": "assistant", "content": output}
        ]
    }


def normalize_financial_qa_10k(sample: Dict) -> Optional[Dict]:
    """
    Normalize financial_qa_10k dataset.
    Likely columns: question/answer/context
    """
    question = extract_field(sample, ["question", "query", "input"])
    answer = extract_field(sample, ["answer", "response", "output"])
    context = extract_field(sample, ["context", "background", "document"])

    # Combine context with question if available
    if context:
        user_content = f"{question}\n\nDocument context: {context}".strip()
    else:
        user_content = question

    if not is_valid_sample(user_content, answer):
        return None

    return {
        "messages": [
            {"role": "system", "content": ""},
            {"role": "user", "content": user_content},
            {"role": "assistant", "content": answer}
        ]
    }


def normalize_fingpt_convfinqa(sample: Dict) -> Optional[Dict]:
    """
    Normalize fingpt_convfinqa dataset.
    Likely columns: input/output
    """
    user_input = extract_field(sample, ["input", "instruction", "question"])
    output = extract_field(sample, ["output", "response", "answer"])

    if not is_valid_sample(user_input, output):
        return None

    return {
        "messages": [
            {"role": "system", "content": ""},
            {"role": "user", "content": user_input},
            {"role": "assistant", "content": output}
        ]
    }


def normalize_earnings_calls_qa(sample: Dict) -> Optional[Dict]:
    """
    Normalize earnings_calls_qa dataset.
    Likely columns: question/answer
    """
    question = extract_field(sample, ["question", "query", "input"])
    answer = extract_field(sample, ["answer", "response", "output"])

    if not is_valid_sample(question, answer):
        return None

    return {
        "messages": [
            {"role": "system", "content": ""},
            {"role": "user", "content": question},
            {"role": "assistant", "content": answer}
        ]
    }


def process_dataset(dataset_name: str, dataset, normalize_fn) -> Tuple[int, int]:
    """
    Process a single dataset and return (valid_count, filtered_count).

    Args:
        dataset_name: Name of the dataset for logging
        dataset: The loaded dataset object
        normalize_fn: Function to normalize samples from this dataset

    Returns:
        Tuple of (number of valid samples, number of filtered samples)
    """
    valid_count = 0
    filtered_count = 0

    # Handle both single split and multiple splits
    if isinstance(dataset, dict):
        splits = list(dataset.keys())
    else:
        splits = ["train"] if hasattr(dataset, "__len__") else []

    for split in splits:
        split_data = dataset[split] if isinstance(dataset, dict) else dataset

        for sample in split_data:
            normalized = normalize_fn(sample)
            if normalized is not None:
                normalized_samples.append(normalized)
                valid_count += 1
            else:
                filtered_count += 1

    print(f"  {dataset_name}: {valid_count} valid, {filtered_count} filtered")
    return valid_count, filtered_count


def main():
    """Main normalization pipeline."""
    # Setup paths
    script_dir = Path(__file__).parent
    raw_dir = script_dir / "raw"
    processed_dir = script_dir / "processed"
    processed_dir.mkdir(exist_ok=True)

    # Load system prompt
    system_prompt = get_system_prompt(script_dir)

    print(f"Loading datasets from: {raw_dir}")
    print(f"Output will be saved to: {processed_dir / 'layer1.jsonl'}")
    print("-" * 60)

    # Define datasets and their normalization functions
    datasets_config = [
        ("finance_instruct_500k", normalize_finance_instruct_500k),
        ("sujet_finance_177k", normalize_sujet_finance_177k),
        ("financial_qa_10k", normalize_financial_qa_10k),
        ("fingpt_convfinqa", normalize_fingpt_convfinqa),
        ("earnings_calls_qa", normalize_earnings_calls_qa),
    ]

    all_samples = []
    total_valid = 0
    total_filtered = 0

    for dataset_name, normalize_fn in datasets_config:
        dataset_path = raw_dir / dataset_name

        if not dataset_path.exists():
            print(f"  {dataset_name}: SKIPPED (directory not found)")
            continue

        try:
            print(f"\nProcessing {dataset_name}...")

            # Load dataset from disk
            dataset = load_from_disk(str(dataset_path))

            # Process the dataset
            normalized_samples = []
            valid_count = 0
            filtered_count = 0

            # Handle both single split and multiple splits
            if isinstance(dataset, dict):
                splits = list(dataset.keys())
            else:
                splits = ["train"]

            for split in splits:
                split_data = dataset[split] if isinstance(dataset, dict) else dataset

                for sample in split_data:
                    normalized = normalize_fn(sample)
                    if normalized is not None:
                        normalized_samples.append(normalized)
                        valid_count += 1
                    else:
                        filtered_count += 1

            print(f"  {dataset_name}: {valid_count} valid, {filtered_count} filtered")

            all_samples.extend(normalized_samples)
            total_valid += valid_count
            total_filtered += filtered_count

        except Exception as e:
            print(f"  {dataset_name}: ERROR - {type(e).__name__}: {str(e)[:100]}")

    # Add system prompt to all samples
    print("\nAdding system prompt to all samples...")
    for sample in all_samples:
        sample["messages"][0]["content"] = system_prompt

    # Write to output file
    output_path = processed_dir / "layer1.jsonl"
    print(f"\nWriting {len(all_samples)} samples to {output_path}...")

    with open(output_path, "w", encoding="utf-8") as f:
        for sample in all_samples:
            f.write(json.dumps(sample, ensure_ascii=False) + "\n")

    # Print summary
    print("\n" + "=" * 60)
    print("NORMALIZATION SUMMARY")
    print("=" * 60)
    print(f"Total valid samples: {total_valid}")
    print(f"Total filtered samples: {total_filtered}")
    print(f"Output file: {output_path}")
    print(f"Output file size: {output_path.stat().st_size / (1024*1024):.2f} MB")


if __name__ == "__main__":
    main()