|
|
|
|
|
""" |
|
|
Instruction-Format Training Data Converter |
|
|
========================================== |
|
|
|
|
|
Converts existing training data to instruction-following format |
|
|
compatible with Llama 3.1 / Qwen2.5 fine-tuning. |
|
|
|
|
|
Formats: |
|
|
1. Alpaca format (instruction/input/output) |
|
|
2. ChatML format (messages with roles) |
|
|
3. Llama 3 format (native template) |
|
|
""" |
|
|
|
|
|
import json |
|
|
import argparse |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Optional |
|
|
from dataclasses import dataclass, asdict |
|
|
import random |
|
|
|
|
|
|
|
|
# System prompts keyed by task name.  InstructionConverter looks one up by
# its `task` argument (falling back to "extraction" for unknown tasks) and
# uses it as the system message / preamble of every generated example.
# NOTE: these strings are emitted verbatim into training data — edit with care.
SYSTEM_PROMPTS = {
# Structured entity extraction from banking messages -> JSON output.
"extraction": """You are a financial entity extraction assistant specialized in Indian banking messages.

Your task is to extract structured information from SMS, email, or bank statements.

Always output valid JSON with these fields (only include if found in the message):
- amount: float (the transaction amount in INR)
- type: "debit" or "credit"
- account: string (last 4 digits of account number)
- bank: string (bank name)
- date: string (transaction date in YYYY-MM-DD format)
- time: string (transaction time if present)
- reference: string (UPI/NEFT/IMPS reference number)
- merchant: string (business/merchant name for P2M transactions)
- beneficiary: string (person name for P2P transfers)
- vpa: string (UPI ID/VPA)
- category: string (food, shopping, travel, bills, investment, transfer, etc.)
- is_p2m: boolean (true if merchant, false if person-to-person)
- balance: float (available balance after transaction)
- status: string (success, failed, pending)

Be precise. Extract exactly what's present in the message.""",

# Single-label categorization of a transaction or merchant name.
"categorization": """You are a financial categorization assistant.

Given a transaction or merchant name, categorize it into one of these categories:
- food: Restaurants, food delivery (Swiggy, Zomato)
- grocery: Supermarkets, grocery stores (BigBasket, Zepto)
- shopping: E-commerce, retail (Amazon, Flipkart)
- transport: Ride-hailing, fuel (Uber, Ola, petrol)
- travel: Flights, hotels, trains (IRCTC, MakeMyTrip)
- bills: Utilities, recharges (Airtel, electricity)
- entertainment: Movies, streaming (Netflix, BookMyShow)
- healthcare: Pharmacy, hospitals (Apollo, PharmEasy)
- investment: Stocks, mutual funds (Zerodha, Groww)
- transfer: P2P money transfers
- salary: Income credits
- emi: Loan EMI payments
- other: Uncategorized

Output a single category name.""",

# Free-form spending analysis / insight over transaction history.
"analysis": """You are a financial analysis assistant.

Help users understand their spending patterns, detect anomalies, and provide insights.
Use the provided transaction history as context.
Be concise and data-driven in your responses."""
}
|
|
|
|
|
|
|
|
@dataclass
class AlpacaFormat:
    """One training example in the Alpaca instruction/input/output layout."""

    instruction: str  # task directive shown to the model
    input: str        # raw source message to operate on
    output: str       # expected completion (here: a JSON string)

    def to_dict(self) -> Dict:
        """Return a plain dict (field order preserved) for JSON serialization."""
        return {
            "instruction": self.instruction,
            "input": self.input,
            "output": self.output,
        }
|
|
|
|
|
|
|
|
@dataclass
class ChatMessage:
    """One message of a chat transcript."""

    role: str     # speaker role, e.g. "system", "user", or "assistant"
    content: str  # the message text
|
|
|
|
|
|
|
|
@dataclass
class ChatMLFormat:
    """A whole conversation in ChatML form: ordered role/content dicts."""

    messages: List[Dict[str, str]]  # each item: {"role": ..., "content": ...}

    def to_dict(self) -> Dict:
        """Wrap the message list in the standard {"messages": [...]} envelope."""
        return dict(messages=self.messages)
|
|
|
|
|
|
|
|
class Llama3Format:
    """Render a (system, user, assistant) triple in the native Llama 3
    chat template, using the model's special tokens."""

    @staticmethod
    def format(system: str, user: str, assistant: str) -> str:
        """Return one fully templated training example as a single string.

        Layout per turn: <|start_header_id|>{role}<|end_header_id|>, a blank
        line, the content, then <|eot_id|>; the whole text is prefixed with
        <|begin_of_text|>.
        """
        turns = (("system", system), ("user", user), ("assistant", assistant))
        pieces = ["<|begin_of_text|>"]
        for role, content in turns:
            pieces.append(
                f"<|start_header_id|>{role}<|end_header_id|>\n\n{content}<|eot_id|>"
            )
        return "".join(pieces)
|
|
|
|
|
|
|
|
class InstructionConverter:
    """Convert raw training records into instruction-tuning formats.

    A raw record is a dict carrying the source message under "input" (or
    "text") and the expected result under "output" (or "ground_truth",
    either a dict or a JSON string).  Each record can be rendered as
    Alpaca, ChatML, or native Llama 3 text.
    """

    def __init__(self, task: str = "extraction"):
        """Create a converter for *task*; unknown task names fall back to
        the "extraction" system prompt.

        Args:
            task: key into SYSTEM_PROMPTS ("extraction", "categorization",
                or "analysis").
        """
        self.task = task
        self.system_prompt = SYSTEM_PROMPTS.get(task, SYSTEM_PROMPTS["extraction"])

        # Instruction phrasings; one is sampled per record (random.choice,
        # module-level RNG) to add surface variety to the training data.
        self.instructions = [
            "Extract financial entities from this message:",
            "Parse the financial information from this banking message:",
            "Identify and extract all transaction details:",
            "Extract structured data from this bank notification:",
            "Parse this banking SMS and extract entities:",
            "Analyze this transaction message and extract details:",
            "Extract the financial entities from the following message:",
            "Identify transaction details in this message:",
        ]

    def _prepare(self, record: Dict) -> tuple:
        """Shared pre-processing for all three output formats.

        Returns (instruction, input_text, output) where *output* is a dict
        with all None-valued fields dropped.  A JSON-string output is
        parsed first; unparseable strings and non-dict JSON values (e.g.
        arrays or scalars) are wrapped as {"raw": value}.
        """
        input_text = record.get("input", record.get("text", ""))
        output = record.get("output", record.get("ground_truth", {}))

        if isinstance(output, str):
            try:
                output = json.loads(output)
            except json.JSONDecodeError:
                output = {"raw": output}
        if not isinstance(output, dict):
            # Fix: the original called .items() unconditionally and crashed
            # when the parsed JSON was a list/scalar; normalize to a dict.
            output = {"raw": output}

        output = {k: v for k, v in output.items() if v is not None}
        return random.choice(self.instructions), input_text, output

    def convert_to_alpaca(self, record: Dict) -> AlpacaFormat:
        """Convert one record to Alpaca (instruction/input/output) format."""
        instruction, input_text, output = self._prepare(record)
        return AlpacaFormat(
            instruction=instruction,
            input=input_text,
            output=json.dumps(output, ensure_ascii=False, indent=2)
        )

    def convert_to_chatml(self, record: Dict) -> ChatMLFormat:
        """Convert one record to ChatML (system/user/assistant messages)."""
        instruction, input_text, output = self._prepare(record)
        return ChatMLFormat(messages=[
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": f"{instruction}\n\n{input_text}"},
            {"role": "assistant", "content": json.dumps(output, ensure_ascii=False)}
        ])

    def convert_to_llama3(self, record: Dict) -> str:
        """Convert one record to a single Llama 3 template string."""
        instruction, input_text, output = self._prepare(record)
        return Llama3Format.format(
            system=self.system_prompt,
            user=f"{instruction}\n\n{input_text}",
            assistant=json.dumps(output, ensure_ascii=False)
        )

    def _convert_record(self, record: Dict, format_type: str) -> Dict:
        """Dispatch one record to the requested format.

        Raises:
            ValueError: if *format_type* is not alpaca/chatml/llama3.
        """
        if format_type == "alpaca":
            return self.convert_to_alpaca(record).to_dict()
        if format_type == "chatml":
            return self.convert_to_chatml(record).to_dict()
        if format_type == "llama3":
            # Llama 3 output is a plain string; wrap it for JSONL storage.
            return {"text": self.convert_to_llama3(record)}
        raise ValueError(f"Unknown format: {format_type}")

    def convert_file(
        self,
        input_path: Path,
        output_path: Path,
        format_type: str = "chatml",
        max_samples: Optional[int] = None
    ) -> int:
        """Convert an entire JSONL file to *format_type*.

        Records are shuffled (using the module-level random state — seed
        beforehand for reproducibility), optionally truncated to
        *max_samples*, and written one JSON object per line.  Malformed
        input lines are skipped silently.

        Returns:
            The number of records written.
        """
        print(f"Converting {input_path} to {format_type} format...")

        records = []
        with open(input_path, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    records.append(json.loads(line))
                except json.JSONDecodeError:
                    continue  # best-effort load: skip malformed lines

        print(f" Loaded {len(records):,} records")

        random.shuffle(records)
        if max_samples:
            records = records[:max_samples]

        output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, 'w', encoding='utf-8') as f:
            for record in records:
                converted = self._convert_record(record, format_type)
                f.write(json.dumps(converted, ensure_ascii=False) + '\n')

        print(f" Saved {len(records):,} records to {output_path}")
        return len(records)
|
|
|
|
|
|
|
|
def create_train_val_test_split(
    input_path: Path,
    output_dir: Path,
    format_type: str = "chatml",
    train_ratio: float = 0.9,
    val_ratio: float = 0.05,
    test_ratio: float = 0.05
):
    """Shuffle a JSONL file and write train/valid/test splits to *output_dir*.

    The train and val slices are cut by their ratios; the test split gets
    whatever remains, so the three ratios must sum to 1.0.  (Previously
    *test_ratio* was silently ignored, so inconsistent ratios produced a
    misleading split with no warning.)  Splits are converted with a default
    ("extraction") InstructionConverter and saved as train.jsonl,
    valid.jsonl, and test.jsonl.

    Raises:
        ValueError: if the ratios do not sum to 1.0, or *format_type*
            is not alpaca/chatml/llama3.
    """
    # Fix: validate instead of silently ignoring test_ratio.
    if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-9:
        raise ValueError("train_ratio + val_ratio + test_ratio must equal 1.0")

    print(f"Creating data splits from {input_path}...")

    records = []
    with open(input_path, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # best-effort load: skip malformed lines

    print(f" Total records: {len(records):,}")

    # Shuffle before slicing so the splits are random; seed the module
    # RNG beforehand for reproducibility.
    random.shuffle(records)

    n = len(records)
    train_end = int(n * train_ratio)
    val_end = train_end + int(n * val_ratio)

    train_records = records[:train_end]
    val_records = records[train_end:val_end]
    test_records = records[val_end:]  # remainder == test_ratio share

    print(f" Train: {len(train_records):,}")
    print(f" Val: {len(val_records):,}")
    print(f" Test: {len(test_records):,}")

    converter = InstructionConverter()
    output_dir.mkdir(parents=True, exist_ok=True)

    for name, split_records in [
        ("train", train_records),
        ("valid", val_records),
        ("test", test_records)
    ]:
        output_path = output_dir / f"{name}.jsonl"

        with open(output_path, 'w', encoding='utf-8') as f:
            for record in split_records:
                if format_type == "alpaca":
                    converted = converter.convert_to_alpaca(record).to_dict()
                elif format_type == "chatml":
                    converted = converter.convert_to_chatml(record).to_dict()
                elif format_type == "llama3":
                    # Llama 3 output is a plain string; wrap for JSONL.
                    converted = {"text": converter.convert_to_llama3(record)}
                else:
                    raise ValueError(f"Unknown format: {format_type}")

                f.write(json.dumps(converted, ensure_ascii=False) + '\n')

        print(f" Saved {output_path}")
|
|
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments and run the requested conversion."""
    parser = argparse.ArgumentParser(description="Convert training data to instruction format")
    parser.add_argument("input", help="Input JSONL file")
    parser.add_argument("-o", "--output", help="Output directory")
    parser.add_argument("-f", "--format", choices=["alpaca", "chatml", "llama3"],
                        default="chatml", help="Output format")
    parser.add_argument("-n", "--max-samples", type=int, help="Max samples")
    parser.add_argument("--split", action="store_true", help="Create train/val/test splits")
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    args = parser.parse_args()

    # Seed once so shuffling and instruction sampling are reproducible.
    random.seed(args.seed)

    source = Path(args.input)

    if args.split:
        # Split mode: write train/valid/test files under the output directory.
        target_dir = Path(args.output) if args.output else Path("data/instruction")
        create_train_val_test_split(source, target_dir, args.format)
        return

    # Single-file mode: derive the output name from the input when not given.
    target = Path(args.output) if args.output else source.with_suffix(f".{args.format}.jsonl")
    InstructionConverter().convert_file(source, target, args.format, args.max_samples)


if __name__ == "__main__":
    main()
|
|
|