ai_exec / src /data_processing /qa_generator.py
Chaitanya-aitf's picture
Upload 38 files
45ee481 verified
"""
Q&A Generator Module
Generate synthetic Q&A training pairs from blog content using Claude/GPT-4 API.
Creates diverse questions and CEO-style answers for fine-tuning.
Example usage:
generator = QAGenerator(provider="anthropic")
qa_pairs = generator.generate_from_segments(segments, num_pairs=500)
"""
import json
import os
import random
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal, Optional
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_exponential
try:
import anthropic
ANTHROPIC_AVAILABLE = True
except ImportError:
ANTHROPIC_AVAILABLE = False
try:
import openai
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
@dataclass
class QAPair:
    """One question/answer training example plus where it came from.

    Fields hold the question text, the answer text, the index and post title
    of the segment the pair was generated from, the question category, and a
    free-form ``metadata`` dict that defaults to a fresh empty dict.
    """

    question: str
    answer: str
    source_segment_index: int
    source_post_title: str
    question_type: str
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the pair as a plain dict suitable for JSON serialization."""
        # Key order mirrors the field declaration order; values are returned
        # as-is (``metadata`` is the same dict object, not a copy).
        exported = (
            "question",
            "answer",
            "source_segment_index",
            "source_post_title",
            "question_type",
            "metadata",
        )
        return {key: getattr(self, key) for key in exported}
# Question categories used to diversify generation. Each key is a category
# name mapped to a list of question templates containing a "{topic}"
# placeholder. In the code visible in this file only the keys are read
# (sampled in QAGenerator._generate_for_segment); the template strings
# themselves are not formatted anywhere here.
QUESTION_TEMPLATES = {
"opinion": [
"What is your view on {topic}?",
"How do you feel about {topic}?",
"What's your take on {topic}?",
"Do you think {topic} is important? Why?",
"What are your thoughts on {topic}?",
],
"strategic": [
"How should companies approach {topic}?",
"What strategy would you recommend for {topic}?",
"What's the best way to handle {topic}?",
"How do you see {topic} evolving in the future?",
"What opportunities do you see in {topic}?",
],
"personal_philosophy": [
"What drives your passion for {topic}?",
"What lessons have you learned about {topic}?",
"How has your thinking on {topic} evolved?",
"What advice would you give about {topic}?",
"What's the most important thing to understand about {topic}?",
],
"factual": [
"Can you explain {topic}?",
"What is {topic} and why does it matter?",
"Tell me about your experience with {topic}.",
"What are the key aspects of {topic}?",
"How does {topic} work in practice?",
],
"challenge": [
"Some people criticize {topic}. How would you respond?",
"What are the main challenges with {topic}?",
"What mistakes do people commonly make regarding {topic}?",
"Is there a downside to {topic}?",
"What are the risks associated with {topic}?",
],
}
class QAGenerator:
"""
Generate synthetic Q&A pairs from text segments using an LLM API.
Supports:
- Anthropic Claude API
- OpenAI GPT-4 API
- Rate limiting (minimum spacing between requests) and retry logic
- Cost estimation before a run and cost accounting from tracked token usage
Example:
>>> generator = QAGenerator(provider="anthropic")
>>> pairs = generator.generate_from_segments(segments, num_pairs=100)
>>> print(f"Generated {len(pairs)} Q&A pairs")
"""
# Approximate USD price per 1M tokens (check current rates). Rates are keyed
# by provider only: the cost helpers look them up via self.provider, so a
# custom `model` is still costed at these figures.
PRICING = {
"anthropic": {"input": 3.0, "output": 15.0}, # Claude 3 Sonnet
"openai": {"input": 10.0, "output": 30.0}, # GPT-4
}
def __init__(
self,
provider: Literal["anthropic", "openai"] = "anthropic",
model: Optional[str] = None,
api_key: Optional[str] = None,
requests_per_minute: int = 20,
ceo_name: str = "Ryouken Okuni",
company_name: str = "Akatsuki AI Technologies",
):
"""
Initialize the Q&A generator.
Args:
provider: API provider ("anthropic" or "openai")
model: Model name (defaults based on provider)
api_key: API key (or uses environment variable)
requests_per_minute: Rate limit
ceo_name: Name of the CEO persona
company_name: Name of the company
"""
self.provider = provider
self.requests_per_minute = requests_per_minute
self.ceo_name = ceo_name
self.company_name = company_name
# Set default models
if model is None:
self.model = (
"claude-sonnet-4-20250514" if provider == "anthropic"
else "gpt-4-turbo-preview"
)
else:
self.model = model
# Initialize client
if provider == "anthropic":
if not ANTHROPIC_AVAILABLE:
raise ImportError("anthropic package not installed. Run: pip install anthropic")
api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY not found in environment")
self.client = anthropic.Anthropic(api_key=api_key)
else:
if not OPENAI_AVAILABLE:
raise ImportError("openai package not installed. Run: pip install openai")
api_key = api_key or os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY not found in environment")
self.client = openai.OpenAI(api_key=api_key)
# Rate limiting
self._last_request_time = 0
self._min_request_interval = 60.0 / requests_per_minute
# Token tracking for cost estimation
self._total_input_tokens = 0
self._total_output_tokens = 0
def estimate_cost(self, num_pairs: int, avg_segment_tokens: int = 400) -> dict:
"""
Estimate API cost before generation.
Args:
num_pairs: Number of Q&A pairs to generate
avg_segment_tokens: Average tokens per segment
Returns:
Dictionary with estimated costs
"""
# Estimate tokens per pair
prompt_tokens = 500 + avg_segment_tokens # System + segment
completion_tokens = 300 # Average response
total_input = num_pairs * prompt_tokens
total_output = num_pairs * completion_tokens
pricing = self.PRICING[self.provider]
input_cost = (total_input / 1_000_000) * pricing["input"]
output_cost = (total_output / 1_000_000) * pricing["output"]
return {
"estimated_input_tokens": total_input,
"estimated_output_tokens": total_output,
"estimated_cost_usd": round(input_cost + output_cost, 2),
"provider": self.provider,
"model": self.model,
}
def get_actual_cost(self) -> dict:
"""Get actual cost based on tracked tokens."""
pricing = self.PRICING[self.provider]
input_cost = (self._total_input_tokens / 1_000_000) * pricing["input"]
output_cost = (self._total_output_tokens / 1_000_000) * pricing["output"]
return {
"total_input_tokens": self._total_input_tokens,
"total_output_tokens": self._total_output_tokens,
"actual_cost_usd": round(input_cost + output_cost, 2),
}
def generate_from_segments(
    self,
    segments: list,
    num_pairs: int = 500,
    questions_per_segment: int = 3,
    output_path: Optional[str | Path] = None,
) -> list[QAPair]:
    """
    Generate Q&A pairs from text segments.

    Segments are visited once each in random order until ``num_pairs`` pairs
    have been collected or the segments run out. A segment whose generation
    fails is logged and skipped.

    Args:
        segments: List of TextSegment objects
        num_pairs: Target number of Q&A pairs (never exceeded)
        questions_per_segment: Max questions per segment; must be >= 1
        output_path: Optional path to save pairs as JSON

    Returns:
        List of QAPair objects

    Raises:
        ValueError: If ``questions_per_segment`` is less than 1.
    """
    if questions_per_segment < 1:
        raise ValueError("questions_per_segment must be at least 1")

    logger.info(f"Generating {num_pairs} Q&A pairs from {len(segments)} segments")

    # Estimate cost first
    estimate = self.estimate_cost(num_pairs)
    logger.info(f"Estimated cost: ${estimate['estimated_cost_usd']:.2f}")

    # Shuffle a copy so the caller's list is left untouched.
    segments_to_use = list(segments)
    random.shuffle(segments_to_use)

    qa_pairs = []
    progress_step = 50
    next_progress_mark = progress_step

    for segment_idx, segment in enumerate(segments_to_use):
        remaining = num_pairs - len(qa_pairs)
        if remaining <= 0:
            break
        questions_for_segment = min(questions_per_segment, remaining)

        try:
            segment_pairs = self._generate_for_segment(
                segment, questions_for_segment
            )
        except Exception as e:
            # Best effort: one failing segment must not abort the whole run.
            logger.warning(f"Failed to generate for segment {segment_idx}: {e}")
            continue

        # The model may return more pairs than requested; keep only what was
        # asked for so the overall target is never overshot.
        segment_pairs = segment_pairs[:questions_for_segment]
        qa_pairs.extend(segment_pairs)
        pairs_generated = len(qa_pairs)
        logger.debug(
            f"Generated {len(segment_pairs)} pairs from segment {segment_idx} "
            f"({pairs_generated}/{num_pairs} total)"
        )

        # Progress update each time another block of 50 pairs is completed.
        # (An exact ``% 50 == 0`` test fires at 0 and is usually skipped
        # because the count grows by several pairs at a time.)
        if pairs_generated >= next_progress_mark:
            logger.info(f"Progress: {pairs_generated}/{num_pairs} pairs generated")
            next_progress_mark = (
                pairs_generated // progress_step + 1
            ) * progress_step

    # Log final cost
    actual_cost = self.get_actual_cost()
    logger.info(f"Actual cost: ${actual_cost['actual_cost_usd']:.2f}")

    # Save if path provided
    if output_path:
        self._save_pairs(qa_pairs, output_path)

    return qa_pairs
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60),
)
def _generate_for_segment(
    self, segment, num_questions: int
) -> list[QAPair]:
    """Request and parse Q&A pairs for one segment (one API call per attempt).

    The whole method is wrapped by the retry decorator, so a raised exception
    re-runs it from the top, including the rate-limit wait.
    """
    # Throttle before doing anything else.
    self._rate_limit()

    # Pick distinct question categories, at most one per requested question.
    available_types = list(QUESTION_TEMPLATES.keys())
    sample_size = min(num_questions, len(available_types))
    question_types = random.sample(available_types, sample_size)

    system_prompt = self._build_system_prompt()
    user_prompt = self._build_generation_prompt(
        segment.content, question_types, num_questions
    )

    response_text, tokens_in, tokens_out = self._call_api(
        system_prompt, user_prompt
    )

    # Accumulate usage for get_actual_cost().
    self._total_input_tokens += tokens_in
    self._total_output_tokens += tokens_out

    return self._parse_response(
        response_text,
        segment.segment_index,
        segment.source_post_title,
        question_types,
    )
def _rate_limit(self) -> None:
"""Enforce rate limiting between requests."""
current_time = time.time()
time_since_last = current_time - self._last_request_time
if time_since_last < self._min_request_interval:
sleep_time = self._min_request_interval - time_since_last
time.sleep(sleep_time)
self._last_request_time = time.time()
def _build_system_prompt(self) -> str:
"""Build the system prompt for Q&A generation."""
return f"""You are helping create training data for an AI assistant that will replicate the communication style of {self.ceo_name}, CEO of {self.company_name}.
Your task is to generate realistic Q&A pairs based on provided blog content. The questions should be ones that stakeholders, employees, journalists, or business partners might ask. The answers should authentically capture the CEO's voice, reasoning patterns, and communication style as demonstrated in the source content.
Guidelines for generating responses:
1. Match the tone and vocabulary of the original content
2. Preserve the CEO's unique way of explaining concepts
3. Maintain the same level of formality/informality
4. Include similar rhetorical patterns (questions, examples, analogies)
5. Stay factually consistent with the source material
6. For topics not directly covered, extrapolate based on evident principles and values
Output format: Return a JSON array of Q&A pairs. Each pair should have:
- "question": The stakeholder's question
- "answer": The CEO's response in their authentic voice
- "question_type": The category of question (opinion, strategic, factual, etc.)"""
def _build_generation_prompt(
self, content: str, question_types: list[str], num_questions: int
) -> str:
"""Build the user prompt for generation."""
types_str = ", ".join(question_types)
return f"""Based on the following blog content, generate {num_questions} Q&A pairs.
Include these question types: {types_str}
Blog content:
---
{content}
---
Generate {num_questions} diverse Q&A pairs that capture the CEO's authentic voice. Return only valid JSON array.
Example format:
[
{{
"question": "What is your view on AI in business?",
"answer": "I believe AI is fundamentally transforming how we...",
"question_type": "opinion"
}}
]"""
def _call_api(
self, system_prompt: str, user_prompt: str
) -> tuple[str, int, int]:
"""Call the LLM API and return response with token counts."""
if self.provider == "anthropic":
response = self.client.messages.create(
model=self.model,
max_tokens=2000,
system=system_prompt,
messages=[{"role": "user", "content": user_prompt}],
)
text = response.content[0].text
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
else:
response = self.client.chat.completions.create(
model=self.model,
max_tokens=2000,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
text = response.choices[0].message.content
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
return text, input_tokens, output_tokens
def _parse_response(
self,
response_text: str,
segment_index: int,
source_title: str,
question_types: list[str],
) -> list[QAPair]:
"""Parse the API response into QAPair objects."""
pairs = []
try:
# Try to extract JSON from response
# Handle markdown code blocks
if "```json" in response_text:
json_start = response_text.find("```json") + 7
json_end = response_text.find("```", json_start)
response_text = response_text[json_start:json_end]
elif "```" in response_text:
json_start = response_text.find("```") + 3
json_end = response_text.find("```", json_start)
response_text = response_text[json_start:json_end]
data = json.loads(response_text.strip())
if isinstance(data, list):
for item in data:
if isinstance(item, dict) and "question" in item and "answer" in item:
pairs.append(QAPair(
question=item["question"],
answer=item["answer"],
source_segment_index=segment_index,
source_post_title=source_title,
question_type=item.get("question_type", "unknown"),
))
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON response: {e}")
# Try to salvage partial response
pairs = self._salvage_partial_response(
response_text, segment_index, source_title
)
return pairs
def _salvage_partial_response(
    self, response_text: str, segment_index: int, source_title: str
) -> list[QAPair]:
    """Attempt to extract Q&A pairs from a malformed JSON response.

    Scans for ``"question": "..."`` followed by ``"answer": "..."`` and builds
    a pair from each match. String bodies may contain backslash escapes
    (including escaped quotes); they are decoded as JSON strings when
    possible and kept verbatim otherwise. Salvaged pairs always get the
    question type "unknown".

    Args:
        response_text: Text that failed JSON parsing
        segment_index: Index of the source segment, copied onto each pair
        source_title: Title of the source post, copied onto each pair

    Returns:
        List of recovered pairs (possibly empty)
    """
    import re

    # A JSON string body: any run of non-quote, non-backslash characters or
    # backslash escapes. A plain [^"]+ would stop at the first escaped quote
    # and truncate the text.
    string_body = r'"((?:[^"\\]|\\.)*)"'
    qa_pattern = re.compile(
        r'"question":\s*' + string_body + r'.*?"answer":\s*' + string_body,
        re.DOTALL,
    )

    def decode(raw: str) -> str:
        # Turn \" \n \uXXXX etc. back into real characters; strict=False
        # tolerates literal newlines inside the captured text.
        try:
            return json.loads(f'"{raw}"', strict=False)
        except json.JSONDecodeError:
            return raw

    pairs = []
    for match in qa_pattern.finditer(response_text):
        question = decode(match.group(1))
        answer = decode(match.group(2))
        if not question or not answer:
            # Empty strings carry no training signal.
            continue
        pairs.append(QAPair(
            question=question,
            answer=answer,
            source_segment_index=segment_index,
            source_post_title=source_title,
            question_type="unknown",
        ))
    return pairs
def _save_pairs(self, pairs: list[QAPair], path: str | Path) -> None:
    """Save Q&A pairs to a JSON file.

    The file is written as an indented UTF-8 JSON array of the dicts produced
    by ``QAPair.to_dict``. Missing parent directories are created so a run's
    results are not lost to a nonexistent output folder.

    Args:
        pairs: Pairs to serialize
        path: Destination file path
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    data = [p.to_dict() for p in pairs]
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    logger.info(f"Saved {len(pairs)} Q&A pairs to: {path}")
@staticmethod
def load_pairs(path: str | Path) -> list[QAPair]:
    """Read a JSON file of serialized pairs and rebuild the QAPair objects.

    Every record must provide all fields except ``metadata``, which defaults
    to an empty dict when absent.
    """
    with open(path, "r", encoding="utf-8") as f:
        records = json.load(f)

    loaded = []
    for record in records:
        pair = QAPair(
            question=record["question"],
            answer=record["answer"],
            source_segment_index=record["source_segment_index"],
            source_post_title=record["source_post_title"],
            question_type=record["question_type"],
            metadata=record.get("metadata", {}),
        )
        loaded.append(pair)
    return loaded
def main():
    """CLI entry point for testing the generator.

    Loads segments from a JSON file, prints a cost estimate, asks for
    confirmation and then generates and saves Q&A pairs.

    Returns:
        Process exit code: 0 on success, estimate-only runs and user
        cancellation; 1 when the segments file cannot be read or is
        malformed, or when the generator cannot be initialized.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Generate Q&A pairs from text segments using LLM APIs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python qa_generator.py segments.json --output qa_pairs.json --num-pairs 100
python qa_generator.py segments.json --provider openai --estimate-only
Environment variables:
ANTHROPIC_API_KEY - Anthropic API key (for Claude)
OPENAI_API_KEY - OpenAI API key (for GPT-4)
""",
    )
    parser.add_argument("input", help="Input segments JSON file")
    parser.add_argument("--output", "-o", help="Output Q&A pairs JSON file")
    parser.add_argument(
        "--num-pairs",
        type=int,
        default=100,
        help="Number of Q&A pairs to generate (default: 100)",
    )
    parser.add_argument(
        "--provider",
        choices=["anthropic", "openai"],
        default="anthropic",
        help="API provider (default: anthropic)",
    )
    parser.add_argument(
        "--model",
        help="Model name (defaults based on provider)",
    )
    parser.add_argument(
        "--estimate-only",
        action="store_true",
        help="Only show cost estimate, don't generate",
    )
    parser.add_argument(
        "--ceo-name",
        default="Ryouken Okuni",
        help="CEO name for persona",
    )
    parser.add_argument(
        "--company-name",
        default="Akatsuki AI Technologies",
        help="Company name for persona",
    )
    args = parser.parse_args()

    # Load segments; report unreadable or invalid files instead of crashing
    # with a traceback. (JSON and Unicode decode errors are ValueErrors.)
    try:
        with open(args.input, "r", encoding="utf-8") as f:
            segments_data = json.load(f)
    except (OSError, ValueError) as e:
        print(f"Error reading segments file: {e}")
        return 1

    # Convert to simple objects exposing the attributes the generator reads.
    from dataclasses import dataclass as dc

    @dc
    class SimpleSegment:
        content: str
        segment_index: int
        source_post_title: str

    try:
        segments = [
            SimpleSegment(
                content=s["content"],
                segment_index=s.get("segment_index", i),
                source_post_title=s.get("source_post_title", "Unknown"),
            )
            for i, s in enumerate(segments_data)
        ]
    except (KeyError, TypeError, AttributeError) as e:
        print(
            "Error: segments file must be a JSON list of objects with a "
            f"'content' field ({type(e).__name__}: {e})"
        )
        return 1

    try:
        generator = QAGenerator(
            provider=args.provider,
            model=args.model,
            ceo_name=args.ceo_name,
            company_name=args.company_name,
        )
    except (ImportError, ValueError) as e:
        print(f"Error initializing generator: {e}")
        return 1

    # Show estimate
    estimate = generator.estimate_cost(args.num_pairs)
    print("\n=== Cost Estimate ===")
    print(f"Provider: {estimate['provider']}")
    print(f"Model: {estimate['model']}")
    print(f"Estimated input tokens: {estimate['estimated_input_tokens']:,}")
    print(f"Estimated output tokens: {estimate['estimated_output_tokens']:,}")
    print(f"Estimated cost: ${estimate['estimated_cost_usd']:.2f}")
    if args.estimate_only:
        return 0

    # Confirm; a closed or non-interactive stdin counts as "no".
    print("\nProceed with generation? [y/N] ", end="")
    try:
        response = input().strip().lower()
    except EOFError:
        response = ""
    if response != "y":
        print("Cancelled.")
        return 0

    # Generate
    output_path = args.output or "qa_pairs.json"
    pairs = generator.generate_from_segments(
        segments, num_pairs=args.num_pairs, output_path=output_path
    )

    # Show results
    actual = generator.get_actual_cost()
    print("\n=== Generation Complete ===")
    print(f"Generated: {len(pairs)} Q&A pairs")
    print(f"Actual cost: ${actual['actual_cost_usd']:.2f}")
    print(f"Saved to: {output_path}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. SystemExit
    # is used directly because the ``exit`` helper is installed by the
    # ``site`` module and is not guaranteed to exist (e.g. under ``python -S``).
    raise SystemExit(main())