Spaces:
Paused
Paused
"""
Q&A Generator Module

Generate synthetic Q&A training pairs from blog content using the
Anthropic Claude or OpenAI GPT-4 API. Creates diverse questions and
CEO-style answers for fine-tuning.

Example usage:
    generator = QAGenerator(provider="anthropic")
    qa_pairs = generator.generate_from_segments(segments, num_pairs=500)
"""
import json
import os
import random
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal, Optional

from loguru import logger
from tenacity import retry, stop_after_attempt, wait_exponential

try:
    import anthropic
    ANTHROPIC_AVAILABLE = True
except ImportError:
    ANTHROPIC_AVAILABLE = False

try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
@dataclass
class QAPair:
    """Represents a Q&A training pair.

    The ``@dataclass`` decorator is required: the class is constructed with
    keyword arguments and ``metadata`` relies on ``field(default_factory=...)``
    so that every instance gets its own dictionary.

    Attributes:
        question: The question text.
        answer: The answer text.
        source_segment_index: Index of the segment the pair was generated from.
        source_post_title: Title of the post the segment belongs to.
        question_type: Category label for the question (e.g. "opinion").
        metadata: Free-form extra data; defaults to a fresh empty dict.
    """

    question: str
    answer: str
    source_segment_index: int
    source_post_title: str
    question_type: str
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Convert to dictionary for serialization.

        Returns:
            A dict with one key per field. ``metadata`` is returned by
            reference, not copied.
        """
        return {
            "question": self.question,
            "answer": self.answer,
            "source_segment_index": self.source_segment_index,
            "source_post_title": self.source_post_title,
            "question_type": self.question_type,
            "metadata": self.metadata,
        }
# Question phrasings grouped by question type. Each template carries a
# "{topic}" placeholder; the dictionary keys double as the question-type
# labels that are sampled when building a generation prompt.
QUESTION_TEMPLATES = {
    # Asks for a personal stance.
    "opinion": [
        "What is your view on {topic}?",
        "How do you feel about {topic}?",
        "What's your take on {topic}?",
        "Do you think {topic} is important? Why?",
        "What are your thoughts on {topic}?",
    ],
    # Asks for direction or recommendations.
    "strategic": [
        "How should companies approach {topic}?",
        "What strategy would you recommend for {topic}?",
        "What's the best way to handle {topic}?",
        "How do you see {topic} evolving in the future?",
        "What opportunities do you see in {topic}?",
    ],
    # Asks about motivation, lessons and advice.
    "personal_philosophy": [
        "What drives your passion for {topic}?",
        "What lessons have you learned about {topic}?",
        "How has your thinking on {topic} evolved?",
        "What advice would you give about {topic}?",
        "What's the most important thing to understand about {topic}?",
    ],
    # Asks for an explanation.
    "factual": [
        "Can you explain {topic}?",
        "What is {topic} and why does it matter?",
        "Tell me about your experience with {topic}.",
        "What are the key aspects of {topic}?",
        "How does {topic} work in practice?",
    ],
    # Asks about criticism, risks and pitfalls.
    "challenge": [
        "Some people criticize {topic}. How would you respond?",
        "What are the main challenges with {topic}?",
        "What mistakes do people commonly make regarding {topic}?",
        "Is there a downside to {topic}?",
        "What are the risks associated with {topic}?",
    ],
}
| class QAGenerator: | |
| """ | |
| Generate synthetic Q&A pairs using LLM APIs. | |
| Supports: | |
| - Anthropic Claude API | |
| - OpenAI GPT-4 API | |
| - Rate limiting and retry logic | |
| - Cost estimation | |
| Example: | |
| >>> generator = QAGenerator(provider="anthropic") | |
| >>> pairs = generator.generate_from_segments(segments, num_pairs=100) | |
| >>> print(f"Generated {len(pairs)} Q&A pairs") | |
| """ | |
| # Pricing per 1M tokens (approximate, check current rates) | |
| PRICING = { | |
| "anthropic": {"input": 3.0, "output": 15.0}, # Claude 3 Sonnet | |
| "openai": {"input": 10.0, "output": 30.0}, # GPT-4 | |
| } | |
| def __init__( | |
| self, | |
| provider: Literal["anthropic", "openai"] = "anthropic", | |
| model: Optional[str] = None, | |
| api_key: Optional[str] = None, | |
| requests_per_minute: int = 20, | |
| ceo_name: str = "Ryouken Okuni", | |
| company_name: str = "Akatsuki AI Technologies", | |
| ): | |
| """ | |
| Initialize the Q&A generator. | |
| Args: | |
| provider: API provider ("anthropic" or "openai") | |
| model: Model name (defaults based on provider) | |
| api_key: API key (or uses environment variable) | |
| requests_per_minute: Rate limit | |
| ceo_name: Name of the CEO persona | |
| company_name: Name of the company | |
| """ | |
| self.provider = provider | |
| self.requests_per_minute = requests_per_minute | |
| self.ceo_name = ceo_name | |
| self.company_name = company_name | |
| # Set default models | |
| if model is None: | |
| self.model = ( | |
| "claude-sonnet-4-20250514" if provider == "anthropic" | |
| else "gpt-4-turbo-preview" | |
| ) | |
| else: | |
| self.model = model | |
| # Initialize client | |
| if provider == "anthropic": | |
| if not ANTHROPIC_AVAILABLE: | |
| raise ImportError("anthropic package not installed. Run: pip install anthropic") | |
| api_key = api_key or os.environ.get("ANTHROPIC_API_KEY") | |
| if not api_key: | |
| raise ValueError("ANTHROPIC_API_KEY not found in environment") | |
| self.client = anthropic.Anthropic(api_key=api_key) | |
| else: | |
| if not OPENAI_AVAILABLE: | |
| raise ImportError("openai package not installed. Run: pip install openai") | |
| api_key = api_key or os.environ.get("OPENAI_API_KEY") | |
| if not api_key: | |
| raise ValueError("OPENAI_API_KEY not found in environment") | |
| self.client = openai.OpenAI(api_key=api_key) | |
| # Rate limiting | |
| self._last_request_time = 0 | |
| self._min_request_interval = 60.0 / requests_per_minute | |
| # Token tracking for cost estimation | |
| self._total_input_tokens = 0 | |
| self._total_output_tokens = 0 | |
| def estimate_cost(self, num_pairs: int, avg_segment_tokens: int = 400) -> dict: | |
| """ | |
| Estimate API cost before generation. | |
| Args: | |
| num_pairs: Number of Q&A pairs to generate | |
| avg_segment_tokens: Average tokens per segment | |
| Returns: | |
| Dictionary with estimated costs | |
| """ | |
| # Estimate tokens per pair | |
| prompt_tokens = 500 + avg_segment_tokens # System + segment | |
| completion_tokens = 300 # Average response | |
| total_input = num_pairs * prompt_tokens | |
| total_output = num_pairs * completion_tokens | |
| pricing = self.PRICING[self.provider] | |
| input_cost = (total_input / 1_000_000) * pricing["input"] | |
| output_cost = (total_output / 1_000_000) * pricing["output"] | |
| return { | |
| "estimated_input_tokens": total_input, | |
| "estimated_output_tokens": total_output, | |
| "estimated_cost_usd": round(input_cost + output_cost, 2), | |
| "provider": self.provider, | |
| "model": self.model, | |
| } | |
| def get_actual_cost(self) -> dict: | |
| """Get actual cost based on tracked tokens.""" | |
| pricing = self.PRICING[self.provider] | |
| input_cost = (self._total_input_tokens / 1_000_000) * pricing["input"] | |
| output_cost = (self._total_output_tokens / 1_000_000) * pricing["output"] | |
| return { | |
| "total_input_tokens": self._total_input_tokens, | |
| "total_output_tokens": self._total_output_tokens, | |
| "actual_cost_usd": round(input_cost + output_cost, 2), | |
| } | |
| def generate_from_segments( | |
| self, | |
| segments: list, | |
| num_pairs: int = 500, | |
| questions_per_segment: int = 3, | |
| output_path: Optional[str | Path] = None, | |
| ) -> list[QAPair]: | |
| """ | |
| Generate Q&A pairs from text segments. | |
| Args: | |
| segments: List of TextSegment objects | |
| num_pairs: Target number of Q&A pairs | |
| questions_per_segment: Max questions per segment | |
| output_path: Optional path to save pairs as JSON | |
| Returns: | |
| List of QAPair objects | |
| """ | |
| logger.info(f"Generating {num_pairs} Q&A pairs from {len(segments)} segments") | |
| # Estimate cost first | |
| estimate = self.estimate_cost(num_pairs) | |
| logger.info(f"Estimated cost: ${estimate['estimated_cost_usd']:.2f}") | |
| qa_pairs = [] | |
| segments_to_use = list(segments) | |
| random.shuffle(segments_to_use) | |
| pairs_generated = 0 | |
| segment_idx = 0 | |
| while pairs_generated < num_pairs and segment_idx < len(segments_to_use): | |
| segment = segments_to_use[segment_idx] | |
| # Generate questions for this segment | |
| questions_for_segment = min( | |
| questions_per_segment, | |
| num_pairs - pairs_generated, | |
| ) | |
| try: | |
| segment_pairs = self._generate_for_segment( | |
| segment, questions_for_segment | |
| ) | |
| qa_pairs.extend(segment_pairs) | |
| pairs_generated += len(segment_pairs) | |
| logger.debug( | |
| f"Generated {len(segment_pairs)} pairs from segment {segment_idx} " | |
| f"({pairs_generated}/{num_pairs} total)" | |
| ) | |
| except Exception as e: | |
| logger.warning(f"Failed to generate for segment {segment_idx}: {e}") | |
| segment_idx += 1 | |
| # Progress update | |
| if pairs_generated % 50 == 0: | |
| logger.info(f"Progress: {pairs_generated}/{num_pairs} pairs generated") | |
| # Log final cost | |
| actual_cost = self.get_actual_cost() | |
| logger.info(f"Actual cost: ${actual_cost['actual_cost_usd']:.2f}") | |
| # Save if path provided | |
| if output_path: | |
| self._save_pairs(qa_pairs, output_path) | |
| return qa_pairs | |
| def _generate_for_segment( | |
| self, segment, num_questions: int | |
| ) -> list[QAPair]: | |
| """Generate Q&A pairs for a single segment.""" | |
| # Rate limiting | |
| self._rate_limit() | |
| # Select question types | |
| question_types = random.sample( | |
| list(QUESTION_TEMPLATES.keys()), | |
| min(num_questions, len(QUESTION_TEMPLATES)), | |
| ) | |
| # Build prompt | |
| system_prompt = self._build_system_prompt() | |
| user_prompt = self._build_generation_prompt( | |
| segment.content, question_types, num_questions | |
| ) | |
| # Call API | |
| response_text, input_tokens, output_tokens = self._call_api( | |
| system_prompt, user_prompt | |
| ) | |
| # Track tokens | |
| self._total_input_tokens += input_tokens | |
| self._total_output_tokens += output_tokens | |
| # Parse response | |
| pairs = self._parse_response( | |
| response_text, | |
| segment.segment_index, | |
| segment.source_post_title, | |
| question_types, | |
| ) | |
| return pairs | |
| def _rate_limit(self) -> None: | |
| """Enforce rate limiting between requests.""" | |
| current_time = time.time() | |
| time_since_last = current_time - self._last_request_time | |
| if time_since_last < self._min_request_interval: | |
| sleep_time = self._min_request_interval - time_since_last | |
| time.sleep(sleep_time) | |
| self._last_request_time = time.time() | |
| def _build_system_prompt(self) -> str: | |
| """Build the system prompt for Q&A generation.""" | |
| return f"""You are helping create training data for an AI assistant that will replicate the communication style of {self.ceo_name}, CEO of {self.company_name}. | |
| Your task is to generate realistic Q&A pairs based on provided blog content. The questions should be ones that stakeholders, employees, journalists, or business partners might ask. The answers should authentically capture the CEO's voice, reasoning patterns, and communication style as demonstrated in the source content. | |
| Guidelines for generating responses: | |
| 1. Match the tone and vocabulary of the original content | |
| 2. Preserve the CEO's unique way of explaining concepts | |
| 3. Maintain the same level of formality/informality | |
| 4. Include similar rhetorical patterns (questions, examples, analogies) | |
| 5. Stay factually consistent with the source material | |
| 6. For topics not directly covered, extrapolate based on evident principles and values | |
| Output format: Return a JSON array of Q&A pairs. Each pair should have: | |
| - "question": The stakeholder's question | |
| - "answer": The CEO's response in their authentic voice | |
| - "question_type": The category of question (opinion, strategic, factual, etc.)""" | |
| def _build_generation_prompt( | |
| self, content: str, question_types: list[str], num_questions: int | |
| ) -> str: | |
| """Build the user prompt for generation.""" | |
| types_str = ", ".join(question_types) | |
| return f"""Based on the following blog content, generate {num_questions} Q&A pairs. | |
| Include these question types: {types_str} | |
| Blog content: | |
| --- | |
| {content} | |
| --- | |
| Generate {num_questions} diverse Q&A pairs that capture the CEO's authentic voice. Return only valid JSON array. | |
| Example format: | |
| [ | |
| {{ | |
| "question": "What is your view on AI in business?", | |
| "answer": "I believe AI is fundamentally transforming how we...", | |
| "question_type": "opinion" | |
| }} | |
| ]""" | |
| def _call_api( | |
| self, system_prompt: str, user_prompt: str | |
| ) -> tuple[str, int, int]: | |
| """Call the LLM API and return response with token counts.""" | |
| if self.provider == "anthropic": | |
| response = self.client.messages.create( | |
| model=self.model, | |
| max_tokens=2000, | |
| system=system_prompt, | |
| messages=[{"role": "user", "content": user_prompt}], | |
| ) | |
| text = response.content[0].text | |
| input_tokens = response.usage.input_tokens | |
| output_tokens = response.usage.output_tokens | |
| else: | |
| response = self.client.chat.completions.create( | |
| model=self.model, | |
| max_tokens=2000, | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_prompt}, | |
| ], | |
| ) | |
| text = response.choices[0].message.content | |
| input_tokens = response.usage.prompt_tokens | |
| output_tokens = response.usage.completion_tokens | |
| return text, input_tokens, output_tokens | |
| def _parse_response( | |
| self, | |
| response_text: str, | |
| segment_index: int, | |
| source_title: str, | |
| question_types: list[str], | |
| ) -> list[QAPair]: | |
| """Parse the API response into QAPair objects.""" | |
| pairs = [] | |
| try: | |
| # Try to extract JSON from response | |
| # Handle markdown code blocks | |
| if "```json" in response_text: | |
| json_start = response_text.find("```json") + 7 | |
| json_end = response_text.find("```", json_start) | |
| response_text = response_text[json_start:json_end] | |
| elif "```" in response_text: | |
| json_start = response_text.find("```") + 3 | |
| json_end = response_text.find("```", json_start) | |
| response_text = response_text[json_start:json_end] | |
| data = json.loads(response_text.strip()) | |
| if isinstance(data, list): | |
| for item in data: | |
| if isinstance(item, dict) and "question" in item and "answer" in item: | |
| pairs.append(QAPair( | |
| question=item["question"], | |
| answer=item["answer"], | |
| source_segment_index=segment_index, | |
| source_post_title=source_title, | |
| question_type=item.get("question_type", "unknown"), | |
| )) | |
| except json.JSONDecodeError as e: | |
| logger.warning(f"Failed to parse JSON response: {e}") | |
| # Try to salvage partial response | |
| pairs = self._salvage_partial_response( | |
| response_text, segment_index, source_title | |
| ) | |
| return pairs | |
| def _salvage_partial_response( | |
| self, response_text: str, segment_index: int, source_title: str | |
| ) -> list[QAPair]: | |
| """Attempt to extract Q&A pairs from malformed response.""" | |
| pairs = [] | |
| # Look for question/answer patterns | |
| import re | |
| qa_pattern = re.compile( | |
| r'"question":\s*"([^"]+)".*?"answer":\s*"([^"]+)"', | |
| re.DOTALL | |
| ) | |
| for match in qa_pattern.finditer(response_text): | |
| pairs.append(QAPair( | |
| question=match.group(1), | |
| answer=match.group(2), | |
| source_segment_index=segment_index, | |
| source_post_title=source_title, | |
| question_type="unknown", | |
| )) | |
| return pairs | |
| def _save_pairs(self, pairs: list[QAPair], path: str | Path) -> None: | |
| """Save Q&A pairs to JSON file.""" | |
| path = Path(path) | |
| data = [p.to_dict() for p in pairs] | |
| with open(path, "w", encoding="utf-8") as f: | |
| json.dump(data, f, indent=2, ensure_ascii=False) | |
| logger.info(f"Saved {len(pairs)} Q&A pairs to: {path}") | |
| def load_pairs(path: str | Path) -> list[QAPair]: | |
| """Load Q&A pairs from JSON file.""" | |
| with open(path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| return [ | |
| QAPair( | |
| question=item["question"], | |
| answer=item["answer"], | |
| source_segment_index=item["source_segment_index"], | |
| source_post_title=item["source_post_title"], | |
| question_type=item["question_type"], | |
| metadata=item.get("metadata", {}), | |
| ) | |
| for item in data | |
| ] | |
def main():
    """CLI entry point for testing the generator.

    Loads segments from a JSON file, prints a cost estimate, asks for
    confirmation, then generates and saves the Q&A pairs.

    Returns:
        Process exit code: 0 on success, estimate-only, or user
        cancellation; 1 when the input cannot be loaded or the generator
        cannot be initialized.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Generate Q&A pairs from text segments using LLM APIs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python qa_generator.py segments.json --output qa_pairs.json --num-pairs 100
  python qa_generator.py segments.json --provider openai --estimate-only

Environment variables:
  ANTHROPIC_API_KEY - Anthropic API key (for Claude)
  OPENAI_API_KEY - OpenAI API key (for GPT-4)
""",
    )
    parser.add_argument("input", help="Input segments JSON file")
    parser.add_argument("--output", "-o", help="Output Q&A pairs JSON file")
    parser.add_argument(
        "--num-pairs",
        type=int,
        default=100,
        help="Number of Q&A pairs to generate (default: 100)",
    )
    parser.add_argument(
        "--provider",
        choices=["anthropic", "openai"],
        default="anthropic",
        help="API provider (default: anthropic)",
    )
    parser.add_argument(
        "--model",
        help="Model name (defaults based on provider)",
    )
    parser.add_argument(
        "--estimate-only",
        action="store_true",
        help="Only show cost estimate, don't generate",
    )
    parser.add_argument(
        "--ceo-name",
        default="Ryouken Okuni",
        help="CEO name for persona",
    )
    parser.add_argument(
        "--company-name",
        default="Akatsuki AI Technologies",
        help="Company name for persona",
    )
    args = parser.parse_args()

    # Load segments; report problems the same way as generator
    # initialization failures (message + exit code 1).
    try:
        with open(args.input, "r", encoding="utf-8") as f:
            segments_data = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        print(f"Error loading segments from {args.input}: {e}")
        return 1

    # Convert to simple objects for the generator
    from dataclasses import dataclass as dc

    @dc
    class SimpleSegment:
        # Without the dataclass decorator the keyword construction below
        # would raise TypeError.
        content: str
        segment_index: int
        source_post_title: str

    try:
        segments = [
            SimpleSegment(
                content=s["content"],
                segment_index=s.get("segment_index", i),
                source_post_title=s.get("source_post_title", "Unknown"),
            )
            for i, s in enumerate(segments_data)
        ]
    except (KeyError, TypeError, AttributeError) as e:
        # Each record must be an object with at least a "content" key.
        print(f"Error: invalid segment data in {args.input}: {e!r}")
        return 1

    try:
        generator = QAGenerator(
            provider=args.provider,
            model=args.model,
            ceo_name=args.ceo_name,
            company_name=args.company_name,
        )
    except (ImportError, ValueError) as e:
        print(f"Error initializing generator: {e}")
        return 1

    # Show estimate
    estimate = generator.estimate_cost(args.num_pairs)
    print("\n=== Cost Estimate ===")
    print(f"Provider: {estimate['provider']}")
    print(f"Model: {estimate['model']}")
    print(f"Estimated input tokens: {estimate['estimated_input_tokens']:,}")
    print(f"Estimated output tokens: {estimate['estimated_output_tokens']:,}")
    print(f"Estimated cost: ${estimate['estimated_cost_usd']:.2f}")

    if args.estimate_only:
        return 0

    # Confirm
    print("\nProceed with generation? [y/N] ", end="")
    try:
        response = input().strip().lower()
    except EOFError:
        # Closed/non-interactive stdin counts as "no".
        response = ""
    if response not in ("y", "yes"):
        print("Cancelled.")
        return 0

    # Generate
    output_path = args.output or "qa_pairs.json"
    pairs = generator.generate_from_segments(
        segments, num_pairs=args.num_pairs, output_path=output_path
    )

    # Show results
    actual = generator.get_actual_cost()
    print("\n=== Generation Complete ===")
    print(f"Generated: {len(pairs)} Q&A pairs")
    print(f"Actual cost: ${actual['actual_cost_usd']:.2f}")
    print(f"Saved to: {output_path}")
    return 0
if __name__ == "__main__":
    # Use SystemExit directly: the bare exit() helper is injected by the
    # `site` module for interactive use and is absent under `python -S`.
    raise SystemExit(main())