Ranjit Behera
FinEE v1.0 - Finance Entity Extractor
dcc24f8
#!/usr/bin/env python3
"""
FinEE Training Pipeline v1.0
Master orchestrator for training the Finance Entity Extractor.
Handles data generation, domain adaptation, fine-tuning, and export.
"""
import argparse
import json
import subprocess
import sys
import logging
import time
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any
# Configure logging
# Records are rendered as "<timestamp> - <LEVEL> - <message>" at INFO and above.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)

# Module-level logger shared by every pipeline step below.
logger = logging.getLogger(__name__)
# Pipeline Configuration
# Each section is declared separately and then assembled into CONFIG so that
# a single section can be read (or diffed) without scanning the whole mapping.

# Model identifiers and on-disk locations.
_MODELS = {
    "base": "microsoft/Phi-3-mini-4k-instruct",
    "domain": "models/base/phi3-finance-base",
    "final": "models/finee-v1.0",
    "adapter": "models/adapters/finee-adapter-v1",
}

# Synthetic training-data generation.
_DATA_GENERATION = {
    "script": "scripts/generate_comprehensive_data.py",
    "output_dir": "data/training",
    "samples": 10000,
}

# Optional domain-adaptation stage; leave disabled when starting from an
# already pre-trained base.
_DOMAIN_PRETRAIN = {
    "enabled": False,
    "script": "scripts/domain_pretrain.py",
    "iters": 2000,
}

# LoRA fine-tuning hyper-parameters.
_FINETUNE = {
    "script": "scripts/retrain_v8.py",  # latest retraining script logic
    "iters": 1000,
    "batch_size": 4,  # increased for M-series
    "learning_rate": 1e-5,
    "lora_layers": 16,
}

# Post-training evaluation.
_EVALUATION = {
    "script": "scripts/test_multi_bank.py",
    "benchmark_dir": "data/benchmark",
}

# Export / upload of the final model.
_EXPORT = {
    "script": "scripts/upload_to_hf.py",
    "repo_id": "Ranjit0034/finance-entity-extractor",
}

CONFIG = {
    "version": "1.0.0",
    "project_name": "finee",
    "models": _MODELS,
    "data_generation": _DATA_GENERATION,
    "domain_pretrain": _DOMAIN_PRETRAIN,
    "finetune": _FINETUNE,
    "evaluation": _EVALUATION,
    "export": _EXPORT,
}
class Pipeline:
    """Run the FinEE training steps as external commands.

    Every step builds a command line from the module-level ``CONFIG`` and
    hands it to :meth:`run_step`, which either executes it or, in dry-run
    mode, only logs it. Each step method returns ``True`` on success and
    ``False`` on failure.

    Args:
        dry_run: When true, commands are logged but never executed.
    """

    def __init__(self, dry_run: bool = False):
        self.dry_run = dry_run
        # Wall-clock start, used by run_all() to report the total duration.
        self.start_time = time.time()
        self.ensure_directories()

    def ensure_directories(self):
        """Create the working directories (relative to the current directory)."""
        dirs = [
            "data/training",
            "data/benchmark",
            "models/base",
            "models/adapters",
            "logs",
        ]
        for d in dirs:
            Path(d).mkdir(parents=True, exist_ok=True)

    def run_step(self, name: str, cmd: List[str], cwd: str = ".") -> bool:
        """Run a single pipeline step.

        Args:
            name: Human-readable step name used in log messages.
            cmd: Command and arguments, passed to ``subprocess.run`` as a list.
            cwd: Working directory for the child process.

        Returns:
            ``True`` if the command exited with status 0 (or this is a dry
            run); ``False`` if it exited non-zero or could not be launched.
        """
        logger.info("▶️ STARTING STEP: %s", name)
        logger.info("Command: %s", ' '.join(cmd))

        if self.dry_run:
            logger.info("Dry run - Skipping execution")
            return True

        try:
            subprocess.run(cmd, cwd=cwd, check=True)
        except subprocess.CalledProcessError as e:
            logger.error("❌ FAILED STEP: %s", name)
            logger.error(str(e))
            return False
        except OSError as e:
            # Raised when the executable (or cwd) does not exist or cannot be
            # run, e.g. a CLI tool that is not on PATH. Report it as a failed
            # step instead of crashing the orchestrator with a traceback.
            logger.error("❌ FAILED STEP: %s", name)
            logger.error("Could not launch command: %s", e)
            return False

        logger.info("✅ COMPLETED STEP: %s", name)
        return True

    def check_dependencies(self) -> bool:
        """Verify that the ``mlx`` and ``finee`` packages can be imported.

        Returns:
            ``True`` when both imports succeed, ``False`` otherwise.
        """
        logger.info("Verifying dependencies...")
        try:
            import mlx.core  # noqa: F401 - imported only to prove it is installed
            import finee
        except ImportError as e:
            logger.error("Missing dependency: %s", e)
            logger.error("Please run: pip install -e .[metal]")
            return False

        # Fall back to a placeholder so a package without __version__ does not
        # turn a successful check into an AttributeError.
        logger.info("Found finee version: %s", getattr(finee, "__version__", "unknown"))
        return True

    def generate_data(self):
        """Step 1: Generate synthetic training data."""
        script = CONFIG["data_generation"]["script"]
        return self.run_step(
            "Data Generation",
            [sys.executable, script],
        )

    def domain_pretrain(self):
        """Step 2: Domain adaptation (optional).

        Skipped, and reported as successful, when
        ``CONFIG["domain_pretrain"]["enabled"]`` is false.
        """
        if not CONFIG["domain_pretrain"]["enabled"]:
            logger.info("Skipping domain pre-training (disabled in config)")
            return True

        script = CONFIG["domain_pretrain"]["script"]
        return self.run_step(
            "Domain Pre-training",
            [sys.executable, script],
        )

    def finetune(self):
        """Step 3: Fine-tune the base model with the ``mlx_lm.lora`` CLI.

        The command is built directly from ``CONFIG`` rather than going
        through ``CONFIG["finetune"]["script"]``, so the exact invocation is
        visible in the logs.
        """
        # NOTE(review): newer mlx-lm releases appear to have renamed
        # --lora-layers to --num-layers; confirm against the installed version.
        cmd = [
            "mlx_lm.lora",
            "--model", CONFIG["models"]["base"],
            "--train",
            "--data", CONFIG["data_generation"]["output_dir"],
            "--adapter-path", CONFIG["models"]["adapter"],
            "--iters", str(CONFIG["finetune"]["iters"]),
            "--batch-size", str(CONFIG["finetune"]["batch_size"]),
            "--learning-rate", str(CONFIG["finetune"]["learning_rate"]),
            "--lora-layers", str(CONFIG["finetune"]["lora_layers"]),
            "--seed", "42",
        ]
        return self.run_step("Fine-tuning", cmd)

    def fuse_model(self):
        """Step 4: Fuse adapters.

        Invokes ``mlx_lm.fuse`` with the base model, the adapter path and
        ``CONFIG["models"]["final"]`` as the save path.
        """
        cmd = [
            "mlx_lm.fuse",
            "--model", CONFIG["models"]["base"],
            "--adapter-path", CONFIG["models"]["adapter"],
            "--save-path", CONFIG["models"]["final"],
        ]
        return self.run_step("Model Fusion", cmd)

    def evaluate(self):
        """Step 5: Run the evaluation script."""
        script = CONFIG["evaluation"]["script"]
        return self.run_step(
            "Evaluation",
            [sys.executable, script],
        )

    def export(self):
        """Step 6: Run the export/upload script."""
        script = CONFIG["export"]["script"]
        return self.run_step(
            "HuggingFace Export",
            [sys.executable, script],
        )

    def run_all(self):
        """Run the full pipeline, stopping at the first failure.

        Raises:
            SystemExit: With status 1 when the dependency check or any step
                fails, so callers and CI see a non-zero exit code.
        """
        if not self.check_dependencies():
            # A failed dependency check is a failed pipeline run; exit non-zero
            # just like a failing step does below.
            logger.error("Pipeline aborted: dependency check failed.")
            sys.exit(1)

        steps = [
            self.generate_data,
            self.domain_pretrain,
            self.finetune,
            self.fuse_model,
            self.evaluate,
            self.export,
        ]

        for step in steps:
            if not step():
                logger.error("Pipeline aborted due to failure.")
                sys.exit(1)

        duration = time.time() - self.start_time
        logger.info(f"🎉 Pipeline completed successfully in {duration/60:.2f} minutes")
def main(argv=None):
    """Command-line entry point for the FinEE training pipeline.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Raises:
        SystemExit: With a non-zero status when argument parsing fails or the
            selected step (or the full pipeline) fails.
    """
    parser = argparse.ArgumentParser(description="FinEE Training Pipeline")
    parser.add_argument(
        "--step",
        choices=["data", "pretrain", "finetune", "fuse", "eval", "export", "all"],
        default="all",
    )
    parser.add_argument("--dry-run", action="store_true", help="Print commands without executing")
    args = parser.parse_args(argv)

    pipeline = Pipeline(dry_run=args.dry_run)

    if args.step == "all":
        pipeline.run_all()
        return

    # In single-step mode dependency problems are logged but do not block the
    # step; the step's own result decides the exit status.
    pipeline.check_dependencies()
    steps = {
        "data": pipeline.generate_data,
        "pretrain": pipeline.domain_pretrain,
        "finetune": pipeline.finetune,
        "fuse": pipeline.fuse_model,
        "eval": pipeline.evaluate,
        "export": pipeline.export,
    }
    if not steps[args.step]():
        # Propagate the failure to the shell instead of exiting with status 0.
        logger.error("Step '%s' failed.", args.step)
        sys.exit(1)


if __name__ == "__main__":
    main()