import os
import json
import random
import sys
from pathlib import Path
import torch
from datasets import Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, DataCollatorForLanguageModeling
# Directory one level above this script's folder; it is put at the front of
# sys.path (once) so imports can be resolved relative to it.
ROOT = Path(__file__).resolve().parents[1]
_root_entry = str(ROOT)
if _root_entry not in sys.path:
    sys.path.insert(0, _root_entry)
# 1. Configuration
# Hub identifier of the base checkpoint that gets fine-tuned.
MODEL_NAME: str = "Qwen/Qwen2.5-1.5B-Instruct"
# JSONL file holding the recorded trajectories (one JSON object per line).
TRAJECTORY_PATH: str = "checkpoints/sft_trajectories.jsonl"
# Directory that receives the fine-tuned model and tokenizer.
OUTPUT_DIR: str = "models/local_policy"
# Prompt template; `{scenario}` is filled in via str.format for each sample.
SYSTEM_PROMPT: str = (
    "You are a Quant Trader. Analyze the scenario and return a single action.\n"
    "Scenario:\n"
    "{scenario}\n"
)
# 2. Load and Tokenize Data
print("Loading model and tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# Only fall back to EOS when the tokenizer ships without a pad token.
# Overwriting an existing pad token with EOS makes the causal-LM collator
# used for training (DataCollatorForLanguageModeling, mlm=False) set every
# EOS label to -100, because it masks all positions equal to pad_token_id.
# The EOS appended to each training sample would then never contribute to
# the loss and the model would not learn when to stop generating.
# NOTE(review): if this fallback ever triggers, the same masking applies;
# a dedicated pad token would be needed to avoid it.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
def tokenize_function(example):
    """Render one record as a single training string and tokenize it.

    The text is laid out as: the formatted system prompt, a blank line, the
    reasoning, two blank lines, the action, a newline, and finally the
    tokenizer's EOS token string.

    Args:
        example: Mapping with "scenario", "reasoning" and "action" entries.

    Returns:
        The result of calling the module-level tokenizer on the text with
        truncation enabled and a 512-token limit.
    """
    header = SYSTEM_PROMPT.format(scenario=example["scenario"])
    # NOTE(review): with truncation at 512 tokens, very long samples may lose
    # their tail (action / EOS) -- confirm typical sample lengths.
    sample = (
        f"{header}\n\n{example['reasoning']}\n\n\n"
        f"{example['action']}\n{tokenizer.eos_token}"
    )
    return tokenizer(sample, max_length=512, truncation=True)
def _row_to_record(row):
    """Map one raw trajectory row to a {scenario, action, reasoning} record.

    Args:
        row: Parsed JSON object with "state", "signals" and "action" keys;
            "signals" must provide "ta_score", "fa_sentiment" and
            "position_limit", and may provide a "reasoning" mapping with a
            "trader" entry.

    Returns:
        Dict with JSON-encoded "scenario" and "action" strings plus the
        "reasoning" text (a fixed default sentence when none is recorded).

    Raises:
        KeyError: If a required key is missing from the row.
    """
    signals = row["signals"]
    scenario = {
        "state": row["state"],
        "signals": {
            "ta": signals["ta_score"],
            "fa": signals["fa_sentiment"],
            "position_limit": signals["position_limit"],
        },
    }
    # `or {}` tolerates an explicit null "reasoning" field in the data.
    reasoning = (signals.get("reasoning") or {}).get(
        "trader",
        "Follow trend, respect the position limit, and size conservatively.",
    )
    return {
        "scenario": json.dumps(scenario),
        "action": json.dumps(row["action"]),
        "reasoning": reasoning,
    }


print(f"Loading data from {TRAJECTORY_PATH}...")
records = []
if os.path.exists(TRAJECTORY_PATH):
    with open(TRAJECTORY_PATH, "r", encoding="utf-8") as f:
        for line in f:
            # Blank lines (e.g. a trailing newline) are not valid JSON; skip them.
            if not line.strip():
                continue
            row = json.loads(line)
            # Keep only trajectories graded at least 0.50; a missing or null
            # grade counts as 0.0.
            if (row.get("final_grade") or 0.0) >= 0.50:
                records.append(_row_to_record(row))
else:
    print(f"Trajectory file not found: {TRAJECTORY_PATH}")

if not records:
    print("No high-quality data found!")
    # sys.exit instead of the interactive `exit()` helper, with a non-zero
    # status so callers can detect that nothing was trained.
    sys.exit(1)

# Subset to save RAM: a random sample of at most 10k records. No seed is set
# in this script, so the selected subset differs between runs.
random.shuffle(records)
records = records[:10000]

dataset = Dataset.from_list(records)
tokenized_dataset = dataset.map(tokenize_function, remove_columns=dataset.column_names)
print(f"Tokenized dataset ready: {len(tokenized_dataset)} samples.")
# 3. Load Model
print("Loading model to CPU...")
# Full-precision (float32) weights, placed on the CPU.
_model_load_options = {
    "torch_dtype": torch.float32,
    "device_map": "cpu",
}
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **_model_load_options)
# 4. Train
print("Starting CPU Training (Lighter on RAM)...")
_training_options = dict(
    output_dir="outputs",
    max_steps=100,  # short run to keep CPU training time manageable
    per_device_train_batch_size=1,  # one sample per forward pass: lowest RAM usage
    gradient_accumulation_steps=8,  # 1 x 8 => effective batch size of 8
    learning_rate=1e-4,
    logging_steps=10,
    save_strategy="no",  # the model is saved explicitly after training instead
    use_cpu=True,
    report_to="none",
)
training_args = TrainingArguments(**_training_options)

# Standard Trainer (skipping SFTTrainer specific helper args)
from transformers import Trainer

# mlm=False selects causal language modeling: labels are derived from input_ids.
_causal_lm_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    data_collator=_causal_lm_collator,
)
trainer.train()
# 5. Save
print(f"Saving fine-tuned model to {OUTPUT_DIR}...")
# Create the target directory (and any missing parents) if it is absent.
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
# Write the model first, then the tokenizer, into the same directory.
for _artifact in (model, tokenizer):
    _artifact.save_pretrained(OUTPUT_DIR)
print("Done! Your model is graduated.")