| """
|
| VentureForge CLI
|
| ================
|
| Run the full multi-agent pipeline from the command line.
|
|
|
| Usage:
|
| python -m src.main --domain "developer tools"
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import argparse
|
| import json
|
|
|
| from src.config import settings
|
| from src.graph import GRAPH
|
| from src.state.schema import VentureForgeState
|
|
|
|
|
| def make_initial_state(
|
| domain: str,
|
| max_pain_points: int | None = None,
|
| ideas_per_run: int | None = None,
|
| top_n_pitches: int | None = None,
|
| max_revisions: int | None = None,
|
| ) -> VentureForgeState:
|
| """Construct the initial VentureForgeState for a new run.
|
|
|
| Shared between the CLI entrypoint and the Gradio UI controller so
|
| that both use the same defaults from ``src.config.settings``.
|
| """
|
| return VentureForgeState(
|
| domain=domain,
|
| max_pain_points=max_pain_points or settings.max_pain_points,
|
| ideas_per_run=ideas_per_run or settings.ideas_per_run,
|
| top_n_pitches=top_n_pitches or settings.top_n_pitches,
|
| max_revisions=max_revisions or settings.max_revisions,
|
| )
|
|
|
|
|
| def run_pipeline(
|
| domain: str | None,
|
| max_pain_points: int | None = None,
|
| *,
|
| recursion_limit: int = 80,
|
| resume_run_id: str | None = None,
|
| ) -> VentureForgeState:
|
| """Execute the full end-to-end pipeline and return final state.
|
|
|
| If ``resume_run_id`` is provided, the pipeline resumes from the latest
|
| checkpoint for that ``run_id`` (thread_id) using the LangGraph SQLite
|
| checkpointer and ignores ``domain``/``max_pain_points``.
|
| """
|
|
|
|
|
| if resume_run_id is not None:
|
| return GRAPH.invoke(
|
| None,
|
| config={
|
| "recursion_limit": recursion_limit,
|
| "configurable": {"thread_id": resume_run_id},
|
| },
|
| )
|
|
|
| if domain is None:
|
| raise ValueError("domain is required when not resuming from a previous run")
|
|
|
| state = make_initial_state(domain, max_pain_points=max_pain_points)
|
|
|
|
|
| return GRAPH.invoke(
|
| state,
|
| config={
|
| "recursion_limit": recursion_limit,
|
| "configurable": {"thread_id": state.run_id},
|
| },
|
| )
|
|
|
|
|
| def main() -> None:
|
| parser = argparse.ArgumentParser(description="VentureForge — AI Startup Discovery")
|
| parser.add_argument(
|
| "--domain",
|
| type=str,
|
| required=False,
|
| help="Target domain, e.g. 'developer tools' (ignored when using --resume)",
|
| )
|
| parser.add_argument(
|
| "--max-pain-points",
|
| type=int,
|
| default=None,
|
| help="Override max pain points to extract (new runs only)",
|
| )
|
| parser.add_argument(
|
| "--resume",
|
| type=str,
|
| default=None,
|
| help="Existing run_id to resume from LangGraph checkpoints",
|
| )
|
| parser.add_argument(
|
| "--output",
|
| type=str,
|
| default="output.json",
|
| help="Output JSON file path",
|
| )
|
| args = parser.parse_args()
|
|
|
| if args.resume:
|
| print(f"Resuming VentureForge run: run_id='{args.resume}'")
|
| result = run_pipeline(
|
| domain=None,
|
| max_pain_points=None,
|
| resume_run_id=args.resume,
|
| )
|
| else:
|
| if not args.domain:
|
| parser.error("--domain is required for new runs (omit it when using --resume)")
|
| print(f"VentureForge starting: domain='{args.domain}'")
|
| result = run_pipeline(args.domain, args.max_pain_points)
|
|
|
| |
| |
| |
| if isinstance(result, dict): |
| |
| from src.state.schema import VentureForgeState |
| try: |
| state = VentureForgeState(**result) |
| output = state.model_dump(mode="json", exclude_none=True) |
| except Exception as e: |
| |
| print(f"Warning: Could not convert result to VentureForgeState: {e}") |
| print("Attempting direct serialization...") |
| output = result |
| else: |
| output = result.model_dump(mode="json", exclude_none=True) |
| |
| with open(args.output, "w", encoding="utf-8") as f: |
| json.dump(output, f, indent=2, ensure_ascii=False) |
|
|
| print(f"\nPipeline finished in stage: {output.get('current_stage', 'unknown')}") |
| print(f" Run ID : {output.get('run_id', 'unknown')}") |
| print(f" Duration : {output.get('agent_timings', {})}") |
| print(f" Pain points: {len(output.get('pain_points', []))}") |
| print(f" Ideas : {len(output.get('ideas', []))}") |
| print(f" Pitches : {len(output.get('pitch_briefs', []))}") |
| revision_counts = output.get('revision_counts', {}) |
| total_revisions = sum(revision_counts.values()) |
| print(f" Revisions : {total_revisions} (across {len(revision_counts)} pitches)") |
| print(f"\nOutput written to: {args.output}") |
|
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|