|
|
""" |
|
|
Training Data Preparation Module |
|
|
Converts parsed emails into training format for fine-tuning. |
|
|
""" |
|
|
|
|
|
import json |
|
|
import random |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Tuple, Optional |
|
|
from dataclasses import dataclass |
|
|
|
|
|
|
|
|
import sys |
|
|
sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
from data.extractor import EntityExtractor |
|
|
|
|
|
|
|
|
@dataclass
class TrainingExample:
    """One prompt/completion pair destined for a fine-tuning JSONL file."""

    # Instruction text shown to the model.
    prompt: str
    # Expected model output for that prompt.
    completion: str

    def to_dict(self) -> Dict:
        """Return the example as a plain ``{"prompt", "completion"}`` dict."""
        return dict(prompt=self.prompt, completion=self.completion)

    def to_jsonl(self) -> str:
        """Serialize the example as one JSON object suitable for a JSONL row."""
        payload = self.to_dict()
        return json.dumps(payload)
|
|
|
|
|
|
|
|
class TrainingDataPreparer:
    """
    Prepare training data from parsed emails.

    Converts emails into prompt-completion pairs for fine-tuning:
    filter transaction emails -> label them with the rule-based
    extractor -> split train/validation -> write JSONL files.
    """

    # Prompt template; {subject} and {body} are substituted per email.
    EXTRACTION_PROMPT_TEMPLATE = """Extract financial entities from this email:

Subject: {subject}

Body: {body}"""

    def __init__(
        self,
        emails_path: Path,
        output_dir: Path,
        extractor: Optional[EntityExtractor] = None
    ):
        """
        Initialize preparer.

        Args:
            emails_path: Path to parsed emails JSON
            output_dir: Directory to save training files
            extractor: EntityExtractor instance (created if not provided)

        Raises:
            OSError: If the emails file cannot be opened.
            json.JSONDecodeError: If the emails file is not valid JSON.
        """
        self.emails_path = Path(emails_path)
        self.output_dir = Path(output_dir)
        self.extractor = extractor or EntityExtractor()

        # Eagerly load the whole corpus. Assumed to be a JSON list of
        # email dicts with 'subject' and 'body' keys — TODO confirm
        # against the parser that produces this file.
        with open(self.emails_path, 'r', encoding='utf-8') as f:
            self.all_emails = json.load(f)

        print(f"✅ Loaded {len(self.all_emails):,} emails")

    def filter_transaction_emails(self) -> List[Dict]:
        """Filter emails that contain transaction data.

        An email qualifies when subject+body mention both a transaction
        keyword ('debited', 'credited', ...) and a currency marker.

        Returns:
            List of email dicts that look like transaction notifications.
        """
        transaction_emails = []

        for email in self.all_emails:
            body = email.get('body', '').lower()
            subject = email.get('subject', '').lower()
            combined = f"{subject} {body}"

            has_transaction = any(
                kw in combined
                for kw in ['debited', 'credited', 'payment of', 'transferred']
            )

            # NOTE(review): plain substring check — 'rs' also matches inside
            # ordinary words (e.g. "errors", "cars"); consider a
            # word-boundary regex if false positives show up.
            has_amount = 'rs' in combined or '₹' in combined

            if has_transaction and has_amount:
                transaction_emails.append(email)

        print(f"๐ง Transaction emails found: {len(transaction_emails):,}")
        return transaction_emails

    def create_training_examples(
        self,
        emails: Optional[List[Dict]] = None,
        min_entities: int = 2,
        max_body_length: int = 1500
    ) -> List[TrainingExample]:
        """
        Convert emails to training examples.

        Args:
            emails: List of email dicts (uses transaction emails if None)
            min_entities: Minimum entities required for valid example
            max_body_length: Maximum body length in prompt

        Returns:
            List of TrainingExample objects
        """
        if emails is None:
            emails = self.filter_transaction_emails()

        examples = []
        skipped = 0

        for email in emails:
            # Ground-truth labels come from the rule-based extractor.
            entities = self.extractor.extract_to_dict(email.get('body', ''))

            # Drop emails whose extraction is too sparse to teach anything.
            if len(entities) < min_entities:
                skipped += 1
                continue

            # Truncate subject/body so prompts stay within a bounded length.
            prompt = self.EXTRACTION_PROMPT_TEMPLATE.format(
                subject=email.get('subject', '')[:200],
                body=email.get('body', '')[:max_body_length]
            )

            # indent=2 here is what analyze_balance's pattern matching
            # relies on — keep the two in sync.
            completion = json.dumps(entities, indent=2)

            examples.append(TrainingExample(prompt=prompt, completion=completion))

        print(f"✅ Created {len(examples):,} training examples")
        print(f"โญ๏ธ Skipped {skipped:,} (insufficient entities)")

        return examples

    def split_data(
        self,
        examples: List[TrainingExample],
        train_ratio: float = 0.9,
        seed: int = 42
    ) -> Tuple[List[TrainingExample], List[TrainingExample]]:
        """
        Split examples into train and validation sets.

        Args:
            examples: List of training examples
            train_ratio: Ratio of examples for training (default 0.9)
            seed: Random seed for reproducibility

        Returns:
            Tuple of (train_examples, valid_examples)
        """
        # Use a private Random instance instead of random.seed() so we
        # don't clobber the global RNG state for the rest of the process;
        # Random(seed).shuffle yields the same permutation as
        # random.seed(seed) + random.shuffle for a given seed.
        rng = random.Random(seed)
        shuffled = examples.copy()
        rng.shuffle(shuffled)

        split_idx = int(len(shuffled) * train_ratio)
        train = shuffled[:split_idx]
        valid = shuffled[split_idx:]

        print(f"๐ Train: {len(train):,}, Validation: {len(valid):,}")

        return train, valid

    def save_jsonl(
        self,
        examples: List[TrainingExample],
        filename: str
    ) -> Path:
        """Save examples to JSONL file.

        Args:
            examples: Examples to write, one JSON object per line.
            filename: File name created under self.output_dir.

        Returns:
            Path of the written file.
        """
        output_path = self.output_dir / filename
        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, 'w', encoding='utf-8') as f:
            for example in examples:
                f.write(example.to_jsonl() + '\n')

        print(f"๐พ Saved {len(examples):,} examples to {output_path}")
        return output_path

    def prepare(
        self,
        train_ratio: float = 0.9,
        min_entities: int = 2,
        seed: int = 42
    ) -> Tuple[Path, Path]:
        """
        Full pipeline: filter → create examples → split → save.

        Args:
            train_ratio: Fraction of examples used for training.
            min_entities: Minimum entities per example (see
                create_training_examples).
            seed: Seed for the train/validation shuffle.

        Returns:
            Tuple of (train_path, valid_path)

        Raises:
            ValueError: If no training examples could be created.
        """
        print("\n๐ Starting training data preparation...")

        examples = self.create_training_examples(min_entities=min_entities)

        if not examples:
            raise ValueError("No training examples created!")

        train, valid = self.split_data(examples, train_ratio, seed)

        train_path = self.save_jsonl(train, "train.jsonl")
        valid_path = self.save_jsonl(valid, "valid.jsonl")

        print("\n๐ Summary:")
        print(f" Total examples: {len(examples):,}")
        print(f" Train: {len(train):,}")
        print(f" Valid: {len(valid):,}")
        print(f" Output: {self.output_dir}")

        return train_path, valid_path

    def analyze_balance(self, examples: List[TrainingExample]) -> Dict[str, int]:
        """Analyze balance of transaction types in examples.

        NOTE(review): relies on the exact '"type": "debit"' spacing that
        json.dumps(..., indent=2) emits in create_training_examples; a
        change to the serialization format would silently break this.

        Returns:
            Counts keyed by 'debit', 'credit', and 'other'.
        """
        debit_count = sum(
            1 for e in examples
            if '"type": "debit"' in e.completion
        )
        credit_count = sum(
            1 for e in examples
            if '"type": "credit"' in e.completion
        )

        return {
            'debit': debit_count,
            'credit': credit_count,
            'other': len(examples) - debit_count - credit_count
        }
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Path was already imported at module top; the previous local
    # re-import was redundant and has been removed.

    # Project root containing data/parsed and data/training.
    PROJECT = Path.home() / "llm-mail-trainer"

    preparer = TrainingDataPreparer(
        emails_path=PROJECT / "data/parsed/emails.json",
        output_dir=PROJECT / "data/training"
    )

    train_path, valid_path = preparer.prepare()

    # Re-run example creation to report the debit/credit balance of the
    # full example set (prepare() does not expose its examples).
    examples = preparer.create_training_examples()
    balance = preparer.analyze_balance(examples)
    print(f"\n๐ Data Balance: {balance}")
|
|
|