| """Idea Generator — clusters pain points into themes and generates distinct
|
| startup ideas.
|
|
|
| Pipeline flow
|
| =============
|
| 1. Receive filtered pain points from state.
|
| 2. Cap to a manageable number (~50) to fit context window.
|
| 3. LLM brainstorms ideas grouped by themes, returns structured JSON.
|
| 4. Code validates that every ``addresses_pain_point_ids`` references a real
|
| pain point UUID that was present in the input.
|
| 5. Return validated Idea objects.
|
| """
|
| from __future__ import annotations
|
|
|
| import json
|
| import logging
|
| import time
|
| from uuid import UUID, uuid4
|
|
|
| from langchain_core.messages import HumanMessage, SystemMessage
|
|
|
| from src.llm.client import extract_json, get_llm
|
| from src.llm.prompts import get_prompt
|
| from src.state.schema import Idea, PipelineStage, VentureForgeState
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
| _MAX_PAIN_POINTS_CONTEXT: int = 50
|
| _IDEAS_PER_RUN_DEFAULT: int = 5
|
|
|
|
|
| def _build_system_prompt() -> str:
|
| return get_prompt("idea_generator")
|
|
|
|
|
| def _build_user_prompt(state: VentureForgeState) -> str: |
| |
| |
| sorted_pps = sorted( |
| state.filtered_pain_points, |
| key=lambda pp: len(pp.evidence), |
| reverse=True |
| ) |
| pps = sorted_pps[:_MAX_PAIN_POINTS_CONTEXT] |
| domain = state.domain |
| count = state.ideas_per_run or _IDEAS_PER_RUN_DEFAULT |
| feedback = state.revision_feedback or "None" |
|
|
| |
| pp_blobs: list[dict] = [ |
| { |
| "id": str(pp.id), |
| "title": pp.title, |
| "description": pp.description, |
| "evidence": [ |
| { |
| "source_url": ev.source_url, |
| "raw_quote": ev.raw_quote, |
| "source": ev.source.value, |
| } |
| for ev in pp.evidence |
| ], |
| "evidence_count": len(pp.evidence), |
| } |
| for pp in pps |
| ] |
|
|
|
|
|
|
|
|
|
|
|
|
| revision_block = ""
|
| if state.revision_feedback:
|
| revision_block = (
|
| "THIS IS A REVISION ROUND. The critic flagged weaknesses in "
|
| "positioning (e.g., target user not a contained community, "
|
| "or weak competitive thesis). You MUST address the following "
|
| "feedback before generating ideas:\n"
|
| f"- Critic feedback: {feedback}\n\n"
|
| "In your new ideas, make the target_user a specific, named, "
|
| "reachable community (a 'contained fire') and make the "
|
| "competition thesis explicit: what are users doing today and "
|
| "what incumbents are afraid to do.\n\n"
|
| )
|
|
|
| |
| |
| min_refs = min(2, len(pps)) |
| |
| |
| if len(pps) == 1: |
| requirement_block = ( |
| "**SPECIAL CASE: Only 1 pain point available.**\\n" |
| "Generate ideas that deeply address this single pain point. " |
| "Each idea must reference this pain point UUID in 'addresses_pain_point_ids'. " |
| "Focus on different solution angles, user segments, or implementation approaches for variety.\\n\\n" |
| ) |
| elif len(pps) >= 2: |
| requirement_block = ( |
| f"**CRITICAL REQUIREMENT: Each idea MUST reference AT LEAST {min_refs} pain point UUIDs in 'addresses_pain_point_ids'.**\\n" |
| f"Ideas with fewer than {min_refs} references will be REJECTED. " |
| "Cross-pollinate pain points to create stronger, more defensible ideas that solve multiple problems.\\n\\n" |
| ) |
| else: |
| requirement_block = "ERROR: No pain points provided. Cannot generate ideas.\\n\\n" |
| |
| user_text = ( |
| f"Domain: {domain}\\n" |
| f"Ideas to generate: {count}\\n\\n" |
| f"PAIN POINTS ({len(pps)} provided):\\n" |
| f"{json.dumps(pp_blobs, indent=2)}\\n\\n" |
| f"{revision_block}" |
| f"{requirement_block}" |
| "Only use UUIDs from the pain points list above — do not invent new UUIDs.\\n\\n" |
| "Return JSON: {\\\"ideas\\\": [ ... ]}." |
| ) |
| return user_text |
|
|
|
|
| def _build_user_prompt_single(state: VentureForgeState, idea_number: int, total_ideas: int) -> str: |
| """Build prompt for generating a SINGLE idea to reduce token usage. |
| |
| Generates one idea at a time for: |
| - Better fit within vLLM 2048 token limit |
| - More focused, comprehensive ideas |
| - LLM can concentrate on each idea individually |
| |
| Args: |
| state: Current pipeline state |
| idea_number: Which idea this is (1-indexed for display) |
| total_ideas: Total number of ideas to generate |
| """ |
| |
| sorted_pps = sorted( |
| state.filtered_pain_points, |
| key=lambda pp: len(pp.evidence), |
| reverse=True |
| ) |
| pps = sorted_pps[:_MAX_PAIN_POINTS_CONTEXT] |
| domain = state.domain |
| feedback = state.revision_feedback or "None" |
| |
| |
| pp_blobs: list[dict] = [ |
| { |
| "id": str(pp.id), |
| "title": pp.title, |
| "description": pp.description, |
| "evidence": [ |
| { |
| "source_url": ev.source_url, |
| "raw_quote": ev.raw_quote[:300], |
| "source": ev.source.value, |
| } |
| for ev in pp.evidence[:2] |
| ], |
| "evidence_count": len(pp.evidence), |
| } |
| for pp in pps |
| ] |
| |
| |
| revision_block = "" |
| if state.revision_feedback: |
| revision_block = ( |
| "THIS IS A REVISION ROUND. The critic flagged weaknesses in positioning. " |
| "You MUST address the following feedback:\n" |
| f"- Critic feedback: {feedback}\n\n" |
| "Make the target_user a specific, named, reachable community (a 'contained fire') " |
| "and make the competition thesis explicit.\n\n" |
| ) |
| |
| |
| min_refs = min(2, len(pps)) |
| |
| |
| if len(pps) == 1: |
| requirement_block = ( |
| "**SPECIAL CASE: Only 1 pain point available.**\n" |
| "Generate an idea that deeply addresses this single pain point. " |
| "The idea must reference this pain point UUID in 'addresses_pain_point_ids'.\n\n" |
| ) |
| elif len(pps) >= 2: |
| requirement_block = ( |
| f"**CRITICAL: The idea MUST reference AT LEAST {min_refs} pain point UUIDs in 'addresses_pain_point_ids'.**\n" |
| f"Ideas with fewer than {min_refs} references will be REJECTED. " |
| "Cross-pollinate pain points to create a stronger, more defensible idea.\n\n" |
| ) |
| else: |
| requirement_block = "ERROR: No pain points provided.\n\n" |
| |
| user_text = ( |
| f"Domain: {domain}\n" |
| f"Generating idea {idea_number} of {total_ideas}\n\n" |
| f"PAIN POINTS ({len(pps)} provided):\n" |
| f"{json.dumps(pp_blobs, indent=2)}\n\n" |
| f"{revision_block}" |
| f"{requirement_block}" |
| "Only use UUIDs from the pain points list above — do not invent new UUIDs.\n\n" |
| "Return a single JSON object (not an array): {\"title\": ..., \"one_liner\": ..., ...}" |
| ) |
| return user_text |
|
|
|
|
| def _invoke_llm_single(state: VentureForgeState, idea_number: int, total_ideas: int, retry_count: int = 0) -> dict | None: |
| """Invoke LLM to generate a SINGLE idea. |
| |
| Args: |
| state: Current pipeline state |
| idea_number: Which idea this is (1-indexed) |
| total_ideas: Total number of ideas to generate |
| retry_count: Current retry attempt (0-indexed) |
| |
| Returns: |
| Raw idea dict, or None on failure |
| """ |
| llm = get_llm(temperature=0.7, max_tokens=16384, reasoning=False) |
| |
| system_prompt = _build_system_prompt() |
| system_prompt += "\n\n**CRITICAL: Output ONLY a single JSON object. No markdown fences, no explanations. Start with { and end with }.**" |
| |
| messages = [ |
| SystemMessage(content=system_prompt), |
| HumanMessage(content=_build_user_prompt_single(state, idea_number, total_ideas)), |
| ] |
| |
| start = time.monotonic() |
| try: |
| raw = llm.invoke(messages) |
| content = raw.content if hasattr(raw, "content") else str(raw) |
| except Exception as e: |
| logger.error(f"[idea_generator] LLM invocation failed for idea {idea_number} (attempt {retry_count + 1}): {e}") |
| return None |
| |
| elapsed = time.monotonic() - start |
| logger.info(f"[idea_generator] LLM responded in {elapsed:.1f}s for idea {idea_number} (attempt {retry_count + 1})") |
| |
| |
| if content and not content.rstrip().endswith('}'): |
| logger.warning( |
| f"[idea_generator] Response may be truncated for idea {idea_number}. " |
| f"Last 100 chars: {content[-100:]}" |
| ) |
| |
| parsed = extract_json(content) |
| if parsed is None: |
| logger.error( |
| f"[idea_generator] JSON extraction failed for idea {idea_number} (attempt {retry_count + 1}). " |
| f"Response length: {len(content)} chars" |
| ) |
| logger.error(f"[idea_generator] Response preview: {content[:500]}") |
| return None |
| |
| |
| if isinstance(parsed, dict): |
| if "ideas" in parsed and isinstance(parsed["ideas"], list): |
| return parsed["ideas"][0] if parsed["ideas"] else None |
| return parsed |
| |
| return None |
|
|
|
|
| def _invoke_llm(state: VentureForgeState) -> list[dict]: |
| """Call LLM, parse JSON, return raw idea dicts.""" |
| llm = get_llm(temperature=0.7, max_tokens=16384, reasoning=False) |
| |
| |
| system_prompt = _build_system_prompt() |
| system_prompt += "\n\n**CRITICAL: Output ONLY the JSON object. No markdown code fences, no explanations, no preamble. Start with { and end with }.**" |
| |
| messages = [ |
| SystemMessage(content=system_prompt), |
| HumanMessage(content=_build_user_prompt(state)), |
| ] |
|
|
| start = time.monotonic() |
| try: |
| raw = llm.invoke(messages) |
| content = raw.content if hasattr(raw, "content") else str(raw) |
| except Exception as e: |
| logger.error(f"[idea_generator] LLM invocation failed after {time.monotonic()-start:.1f}s: {e}") |
| return [] |
|
|
| logger.info(f"[idea_generator] LLM responded in {time.monotonic()-start:.1f}s") |
| |
| |
| logger.info(f"[idea_generator] Response preview (first 500 chars): {content[:500]}") |
|
|
| parsed = extract_json(content) |
| if parsed is None: |
| logger.error(f"[idea_generator] JSON extraction failed. Response length: {len(content)} chars") |
| logger.error(f"[idea_generator] Full response (first 2000 chars): {content[:2000]}") |
| return [] |
|
|
| ideas = parsed.get("ideas") if isinstance(parsed, dict) else parsed if isinstance(parsed, list) else [] |
| if not isinstance(ideas, list):
|
| logger.warning("[idea_generator] LLM did not return an array of ideas")
|
| return []
|
| return ideas
|
|
|
|
|
| def _validate_idea(raw: dict, valid_ids: set[UUID], min_refs: int = 2) -> Idea | None: |
| """Return an Idea if it references real pain points, else None. |
| |
| Args: |
| raw: Raw idea dict from LLM |
| valid_ids: Set of valid pain point UUIDs |
| min_refs: Minimum number of pain point references required (default 2) |
| """ |
| |
| raw_ids = raw.get("addresses_pain_point_ids", []) |
| resolved: list[UUID] = [] |
| for rid in raw_ids: |
| try: |
| uid = UUID(str(rid)) |
| if uid in valid_ids: |
| resolved.append(uid) |
| except (ValueError, TypeError): |
| continue |
|
|
| if len(resolved) < min_refs: |
| logger.debug( |
| f"[idea_generator] REJECTED — idea '{raw.get('title', '?')}' " |
| f"references only {len(resolved)} valid pain point(s), need {min_refs}" |
| ) |
| return None |
|
|
| try:
|
| idea = Idea(
|
| id=uuid4(),
|
| title=raw["title"],
|
| one_liner=raw["one_liner"],
|
| problem=raw["problem"],
|
| solution=raw["solution"],
|
| target_user=raw["target_user"],
|
| key_features=raw.get("key_features", []),
|
| addresses_pain_point_ids=resolved,
|
| )
|
| return idea
|
| except Exception as e:
|
| logger.debug(f"[idea_generator] REJECTED — malformed idea: {e}")
|
| return None
|
|
|
|
|
|
|
|
|
|
|
| def run(state: VentureForgeState) -> dict: |
| pps = state.filtered_pain_points |
| if not pps: |
| logger.warning("[idea_generator] no pain points available — returning empty") |
| patch = { |
| "ideas": [], |
| "current_stage": PipelineStage.GENERATING, |
| "next_node": "orchestrator", |
| "idea_generation_attempts": state.idea_generation_attempts + 1, |
| } |
| patch.update( |
| state.add_event( |
| agent="idea_generator", |
| stage=PipelineStage.GENERATING, |
| kind="warning", |
| message="No pain points available to generate ideas from.", |
| ) |
| ) |
| return patch |
|
|
| valid_ids = {pp.id for pp in pps} |
| min_refs = min(2, len(pps)) |
| |
| |
| |
| if state.current_revision_idea_id: |
| count = 1 |
| logger.info(f"[idea_generator] Revision mode: generating 1 replacement idea for {state.current_revision_idea_id}") |
| else: |
| count = state.ideas_per_run or _IDEAS_PER_RUN_DEFAULT |
| logger.info(f"[idea_generator] Initial generation: generating {count} ideas one at a time") |
| |
| MAX_RETRIES = 3 |
| raw_ideas = [] |
| |
| |
| for i in range(count): |
| idea_number = i + 1 |
| logger.info(f"[idea_generator] Generating idea {idea_number} of {count}") |
| |
| raw_idea = None |
| for retry in range(MAX_RETRIES): |
| raw_idea = _invoke_llm_single(state, idea_number, count, retry_count=retry) |
| |
| if raw_idea: |
| logger.info(f"[idea_generator] Successfully generated idea {idea_number} on attempt {retry + 1}") |
| raw_ideas.append(raw_idea) |
| break |
| |
| if retry < MAX_RETRIES - 1: |
| logger.warning( |
| f"[idea_generator] Attempt {retry + 1}/{MAX_RETRIES} failed for idea {idea_number}. Retrying..." |
| ) |
| else: |
| logger.error( |
| f"[idea_generator] All {MAX_RETRIES} attempts failed for idea {idea_number}." |
| ) |
| |
| logger.info(f"[idea_generator] LLM produced {len(raw_ideas)} raw ideas") |
| |
| |
| if raw_ideas: |
| logger.info(f"[idea_generator] Sample raw idea: {json.dumps(raw_ideas[0], indent=2)}") |
| logger.info(f"[idea_generator] Valid pain point IDs: {[str(vid) for vid in list(valid_ids)[:3]]}") |
|
|
| validated: list[Idea] = [] |
| for raw in raw_ideas: |
| idea = _validate_idea(raw, valid_ids, min_refs) |
| if idea: |
| validated.append(idea) |
|
|
| final = validated[:count] |
|
|
| logger.info( |
| f"[idea_generator] {len(final)}/{len(raw_ideas)} ideas validated " |
| f"(required <= {count} with >={min_refs} real pain point refs each)" |
| ) |
|
|
| |
| if state.current_revision_idea_id: |
| |
| all_ideas = state.ideas + final |
| else: |
| |
| all_ideas = final |
|
|
| patch = { |
| "ideas": all_ideas, |
| "idea_generation_attempts": state.idea_generation_attempts + 1, |
| "current_revision_idea_id": None, |
| "next_node": "orchestrator", |
| } |
| patch.update( |
| state.add_event( |
| agent="idea_generator", |
| stage=PipelineStage.GENERATING, |
| kind="info", |
| message=( |
| f"Generated {len(final)} ideas (requested {count}) " |
| f"addressing ≥{min_refs} validated pain points each." |
| ), |
| ) |
| ) |
| return patch |
|
|