import os
import pandas as pd
import re
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
try:
import data_designer.config as dd
from data_designer.config.column_configs import Score
from data_designer.interface import DataDesigner
except ImportError:
dd = None
Score = None
DataDesigner = None
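# data_designer is optional at import time so utilities such as
# format_for_qwen stay usable without it; generation itself requires it.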
@dataclass
class AgenticDataConfig:
name: str = "agentic_dataset"
num_records: int = 10
task_description: str = "SQL-to-Natural-Language conversion"
scenarios_path: Optional[str] = None # Optional path to a JSONL file with 'scenario' column
model_alias: str = "llm-text"
judge_model_alias: str = "llm-judge"
output_path: str = "agentic_synthetic_data.jsonl"
min_quality_score: int = 2 # Perplexity often gets penalized for citations even when they are accurate
generate_dpo: bool = False # Whether to generate 'rejected' responses for DPO
generate_reasoning: bool = False # Whether to generate <reasoning>...<answer> format
num_instructions_per_scenario: int = 1 # Number of instructions per scenario for diversity
max_tokens: int = 4096 # Max tokens for generation
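# Example (illustrative): AgenticDataConfig(num_records=5, generate_reasoning=True)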
class AgenticDataGenerator:
    def __init__(self, designer: Optional[DataDesigner] = None):
        if dd is None:
            raise ImportError("data_designer is not installed; it is required for generation.")
        if not designer:
            # Configure providers from whichever API keys are present
model_providers = []
if os.environ.get("OPENAI_API_KEY"):
model_providers.append(dd.ModelProvider(
name="openai",
provider_type="openai",
api_key="OPENAI_API_KEY",
endpoint="https://api.openai.com/v1"
))
if os.environ.get("PERPLEXITY_API_KEY"):
model_providers.append(dd.ModelProvider(
name="perplexity",
provider_type="openai",
api_key="PERPLEXITY_API_KEY",
endpoint="https://api.perplexity.ai"
))
            if os.environ.get("PAPERCLIP_API_KEY") and os.environ.get("PAPERCLIP_API_URL"):
                model_providers.append(dd.ModelProvider(
                    name="paperclip",
                    provider_type="openai",
                    api_key="PAPERCLIP_API_KEY",
                    endpoint=os.environ["PAPERCLIP_API_URL"].rstrip("/") + "/v1"
                ))
            if not model_providers:
                raise ValueError(
                    "No model provider configured: set OPENAI_API_KEY, PERPLEXITY_API_KEY, "
                    "or PAPERCLIP_API_KEY (with PAPERCLIP_API_URL)."
                )
designer = DataDesigner(model_providers=model_providers)
self.designer = designer
    def strip_citations(self, text: str) -> str:
        """Remove Perplexity-style citation markers such as [1], [2].

        Example: strip_citations("SELECT 1; [3]") -> "SELECT 1;"
        """
        if not isinstance(text, str):
            return text
        return re.sub(r'\[\d+\]', '', text).strip()
def generate(self, config: AgenticDataConfig) -> pd.DataFrame:
print(f"Starting advanced agentic data generation for task: {config.task_description}")
        # Provider and model are currently hard-coded to the locally available
        # Paperclip endpoint serving gpt-4o; adjust here to target OpenAI or Perplexity.
        provider_name = "paperclip"
        model_name = "gpt-4o"
        llm_model = dd.ModelConfig(
            alias=config.model_alias,
            model=model_name,
            provider=provider_name,
            inference_parameters=dd.ChatCompletionInferenceParams(
                max_parallel_requests=1,
                max_tokens=config.max_tokens
            )
        )
        # Register the judge alias as well so the LLM-as-a-judge phase can use it
        judge_model = dd.ModelConfig(
            alias=config.judge_model_alias,
            model=model_name,
            provider=provider_name,
            inference_parameters=dd.ChatCompletionInferenceParams(
                max_parallel_requests=1,
                max_tokens=config.max_tokens
            )
        )
        builder = dd.DataDesignerConfigBuilder(model_configs=[llm_model, judge_model])
if config.scenarios_path and os.path.exists(config.scenarios_path):
print(f"Loading scenarios from: {config.scenarios_path}")
scenarios_df = pd.read_json(config.scenarios_path, orient="records", lines=True)
if "scenario" not in scenarios_df.columns:
raise ValueError(f"Input file {config.scenarios_path} must contain a 'scenario' column.")
            # Add the task description, then sample the preloaded scenarios via a category sampler
builder.add_column(
dd.SamplerColumnConfig(
name="task",
sampler_type="category",
params=dd.CategorySamplerParams(values=[config.task_description])
)
)
scenarios = scenarios_df["scenario"].tolist()[:config.num_records]
builder.add_column(
dd.SamplerColumnConfig(
name="scenario",
sampler_type="category",
params=dd.CategorySamplerParams(values=scenarios)
)
)
else:
# Add task description as a sampler column
builder.add_column(
dd.SamplerColumnConfig(
name="task",
sampler_type="category",
params=dd.CategorySamplerParams(values=[config.task_description])
)
)
# Phase 1: Brainstorming Scenarios
builder.add_column(
dd.LLMTextColumnConfig(
name="scenario",
model_alias=config.model_alias,
prompt="Brainstorm a highly complex and challenging scenario for the task: '{{ task }}'. Focus on realistic edge cases, multi-step logic, and potential pitfalls. DO NOT use search. DO NOT use citations. Output a detailed scenario description."
)
)
# Phase 1.1: Solvability & Constraint Verification
builder.add_column(
dd.LLMTextColumnConfig(
name="scenario_verification",
model_alias=config.model_alias,
prompt="Review the scenario: '{{ scenario }}'. Is it clearly defined and solvable without external information? Identify any ambiguities or missing constraints. Output 'VERIFIED' if good, or a list of required clarifications. NO citations."
)
)
# Phase 2: Instruction Generation
instruction_prompt = "Based on the scenario: '{{ scenario }}', create a natural language request that a user might make for the task: '{{ task }}'. Output ONLY the request text. NO citations."
        if config.num_instructions_per_scenario > 1:
            # Generating multiple instructions per scenario would require seed
            # dataset expansion; DataDesigner processes row-by-row, so this
            # script emits one instruction per scenario.
            print("Note: num_instructions_per_scenario > 1 is not supported; generating one per scenario.")
builder.add_column(
dd.LLMTextColumnConfig(
name="instruction",
model_alias=config.model_alias,
prompt=instruction_prompt
)
)
# Phase 2.1: Reasoning Output
output_prompt = "Based on the instruction: '{{ instruction }}', provide the expected output for the task: '{{ task }}'. Output ONLY the direct answer/code, no conversational filler. NO citations."
if config.generate_reasoning:
output_prompt = "Based on the instruction: '{{ instruction }}', provide the expected output for the task: '{{ task }}'. Use the following format: <reasoning>STEP BY STEP REASONING HERE</reasoning><answer>DIRECT ANSWER HERE</answer>. Ensure the reasoning is rigorous, comprehensive, and logically flawless."
builder.add_column(
dd.LLMTextColumnConfig(
name="initial_output",
model_alias=config.model_alias,
prompt=output_prompt
)
)
# Phase 2.2: Critique (Expert Review)
builder.add_column(
dd.LLMTextColumnConfig(
name="critique",
model_alias=config.model_alias,
prompt="Act as an expert reviewer. Critique the initial_output: '{{ initial_output }}' for the instruction: '{{ instruction }}' within scenario: '{{ scenario }}'. Identify any inaccuracies, logical gaps, mathematical errors, or formatting issues. Be extremely critical. DO NOT use search. DO NOT use citations."
)
)
# Phase 2.3: Refinement (Self-Correction)
format_instruction = "Use the following format: <reasoning>STEP BY STEP REASONING HERE</reasoning><answer>DIRECT ANSWER HERE</answer>." if config.generate_reasoning else "Output ONLY the direct answer/code, no conversational filler."
builder.add_column(
dd.LLMTextColumnConfig(
name="output",
model_alias=config.model_alias,
prompt="Based on the original instruction: '{{ instruction }}', the initial_output: '{{ initial_output }}', and the critique: '{{ critique }}', provide a final, verified, and highly accurate version of the output. " + format_instruction + " Ensure every logical step is explicit. NO citations."
)
)
# Phase 2.4: Rejected Generation (for DPO) - Targeted Failure
if config.generate_dpo:
rejected_prompt = "Based on the instruction: '{{ instruction }}' and the critique: '{{ critique }}', provide a response that is WRONG. Specifically, ignore one of the points from the critique or introduce a subtle logical error that a person might miss. " + format_instruction + " NO citations."
builder.add_column(
dd.LLMTextColumnConfig(
name="rejected",
model_alias=config.model_alias,
prompt=rejected_prompt
)
)
# Phase 3: Judging (LLM-as-a-Judge)
builder.add_column(
dd.LLMJudgeColumnConfig(
name="quality_score",
                model_alias=config.judge_model_alias,
prompt="Evaluate the final output: '{{ output }}' based on the instruction: '{{ instruction }}' and scenario: '{{ scenario }}'.",
scores=[
Score(
name="accuracy",
description="Is the output accurate and correct based on the instruction?",
options={1: "Incorrect", 2: "Partially correct / minor issues", 3: "Fully correct"}
),
Score(
name="reasoning",
description="Is the reasoning step-by-step and logically sound?",
options={1: "None/Poor", 2: "Decent but sparse", 3: "Rigorous and detailed"}
)
]
)
)
# Run creation
result = self.designer.create(config_builder=builder, num_records=config.num_records, dataset_name=config.name)
df = result.load_dataset()
# Post-process: Strip citations from all generated text columns
cols_to_strip = ["scenario", "instruction", "initial_output", "critique", "output", "scenario_verification"]
if config.generate_dpo:
cols_to_strip.append("rejected")
for col in cols_to_strip:
if col in df.columns:
df[col] = df[col].apply(self.strip_citations)
# Phase 4: Filtering
if "quality_score" in df.columns:
            def extract_score(val, key="accuracy"):
                # Judge output may be nested ({"accuracy": {"score": 3}}) or flat ({"accuracy": 3})
                if isinstance(val, dict) and key in val:
                    inner = val[key]
                    return inner.get("score", 0) if isinstance(inner, dict) else inner
                return 0
df["accuracy_score"] = df["quality_score"].apply(lambda x: extract_score(x, "accuracy"))
df["reasoning_score"] = df["quality_score"].apply(lambda x: extract_score(x, "reasoning"))
print("Quality Scores (Accuracy):", df["accuracy_score"].tolist())
print("Reasoning Scores:", df["reasoning_score"].tolist())
            # Save the raw (unfiltered) dataset alongside the final output
            raw_path = os.path.join(os.path.dirname(config.output_path), "raw_" + os.path.basename(config.output_path))
            df.to_json(raw_path, orient="records", lines=True)
# Filter by accuracy AND reasoning if reasoning was requested
if config.generate_reasoning:
filtered_df = df[(df["accuracy_score"] >= config.min_quality_score) & (df["reasoning_score"] >= 2)].copy()
else:
filtered_df = df[df["accuracy_score"] >= config.min_quality_score].copy()
print(f"Filtered dataset: {len(filtered_df)}/{len(df)} records passed quality threshold.")
df = filtered_df
# Save to JSONL
df.to_json(config.output_path, orient="records", lines=True)
print(f"Advanced agentic synthetic data saved to {config.output_path}")
return df
def format_for_qwen(self, df: pd.DataFrame) -> List[Dict[str, str]]:
"""Formats the dataframe into ChatML for Qwen training."""
chatml_data = []
for _, row in df.iterrows():
chatml_data.append({
"text": f"<|im_start|>user\n{row['instruction']}<|im_end|>\n<|im_start|>assistant\n{row['output']}<|im_end|>"
})
return chatml_data
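    def save_chatml(self, df: pd.DataFrame, path: str) -> None:
        """Convenience sketch (not part of the original pipeline): write the
        records from format_for_qwen to a JSONL file, one {"text": ...} object
        per line. Adjust if your trainer expects a different schema."""
        records = self.format_for_qwen(df)
        pd.DataFrame(records).to_json(path, orient="records", lines=True, force_ascii=False)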
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Agentic Synthetic Data Generation for Qwen Fine-tuning")
parser.add_argument("--task", type=str, default="SQL-to-Natural-Language conversion", help="Description of the task")
parser.add_argument("--scenarios", type=str, default=None, help="Path to JSONL with scenarios")
parser.add_argument("--num", type=int, default=2, help="Number of records to generate")
parser.add_argument("--output", type=str, default="agentic_synthetic_data.jsonl", help="Output path for the JSONL file")
parser.add_argument("--dpo", action="store_true", help="Generate rejected responses for DPO")
parser.add_argument("--reasoning", action="store_true", help="Generate <reasoning>...<answer> format")
parser.add_argument("--max-tokens", type=int, default=4096, help="Max tokens for generation")
args = parser.parse_args()
config = AgenticDataConfig(
num_records=args.num,
task_description=args.task,
scenarios_path=args.scenarios,
output_path=args.output,
generate_dpo=args.dpo,
generate_reasoning=args.reasoning,
max_tokens=args.max_tokens
)
generator = AgenticDataGenerator()
df = generator.generate(config)
if not df.empty:
print(f"Generated {len(df)} records.")
print("Sample record:")
print(df.iloc[0].to_dict())
else:
print("No records generated.")
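# Example invocation (illustrative; script name is whatever this file is saved as):
#   python generate_agentic_data.py --num 10 --reasoning --dpo --output agentic.jsonl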