"""
VentureForge CLI
================
Run the full multi-agent pipeline from the command line.

Usage:
    python -m src.main --domain "developer tools"
    python -m src.main --resume <run_id>
"""
from __future__ import annotations

import argparse
import json
import sys

from src.config import settings
from src.graph import GRAPH
from src.state.schema import VentureForgeState
def make_initial_state(
    domain: str,
    max_pain_points: int | None = None,
    ideas_per_run: int | None = None,
    top_n_pitches: int | None = None,
    max_revisions: int | None = None,
) -> VentureForgeState:
    """Construct the initial VentureForgeState for a new run.

    Shared between the CLI entrypoint and the Gradio UI controller so
    that both use the same defaults from ``src.config.settings``.

    Args:
        domain: Target domain for the run, e.g. ``"developer tools"``.
        max_pain_points: Override for ``max_pain_points``; ``None`` uses
            ``settings.max_pain_points``.
        ideas_per_run: Override for ``ideas_per_run``; ``None`` uses
            ``settings.ideas_per_run``.
        top_n_pitches: Override for ``top_n_pitches``; ``None`` uses
            ``settings.top_n_pitches``.
        max_revisions: Override for ``max_revisions``; ``None`` uses
            ``settings.max_revisions``.

    Returns:
        A new ``VentureForgeState`` carrying ``domain`` and the resolved limits.
    """

    def _resolve(value: int | None, default: int) -> int:
        # Only ``None`` means "not provided". An explicit 0 (e.g. "no
        # revisions") must be kept; an ``or`` fallback would silently
        # replace it with the settings default.
        return default if value is None else value

    return VentureForgeState(
        domain=domain,
        max_pain_points=_resolve(max_pain_points, settings.max_pain_points),
        ideas_per_run=_resolve(ideas_per_run, settings.ideas_per_run),
        top_n_pitches=_resolve(top_n_pitches, settings.top_n_pitches),
        max_revisions=_resolve(max_revisions, settings.max_revisions),
    )
def run_pipeline(
    domain: str | None,
    max_pain_points: int | None = None,
    *,
    recursion_limit: int = 80,
    resume_run_id: str | None = None,
    ideas_per_run: int | None = None,
    top_n_pitches: int | None = None,
    max_revisions: int | None = None,
) -> VentureForgeState:
    """Execute the full end-to-end pipeline and return final state.

    If ``resume_run_id`` is provided, the pipeline resumes from the latest
    checkpoint for that ``run_id`` (thread_id) using the LangGraph SQLite
    checkpointer. In that case ``domain`` and every limit override are ignored.

    Args:
        domain: Target domain for a new run. Required unless resuming.
        max_pain_points: Optional override for a new run; ``None`` uses the
            settings default.
        recursion_limit: Passed to LangGraph as ``recursion_limit`` in the
            invoke config.
        resume_run_id: Existing run_id / checkpoint thread_id to continue.
        ideas_per_run: Optional override for a new run; ``None`` uses the
            settings default.
        top_n_pitches: Optional override for a new run; ``None`` uses the
            settings default.
        max_revisions: Optional override for a new run; ``None`` uses the
            settings default.

    Returns:
        Whatever ``GRAPH.invoke`` returns. NOTE(review): the annotation says
        ``VentureForgeState``, but the CLI treats the result as possibly a
        plain dict of state fields, so callers should handle both.

    Raises:
        ValueError: If not resuming and ``domain`` is missing or blank.
    """

    def _invoke_config(thread_id: str) -> dict:
        # The run_id doubles as the LangGraph checkpoint "thread_id", so runs
        # can later be resumed/inspected by passing it as ``resume_run_id``.
        return {
            "recursion_limit": recursion_limit,
            "configurable": {"thread_id": thread_id},
        }

    # Resume mode: load state from existing checkpoints and continue.
    # No new input is supplied; the graph continues from the checkpoint.
    if resume_run_id is not None:
        return GRAPH.invoke(None, config=_invoke_config(resume_run_id))

    # A blank domain would launch a meaningless (and costly) run.
    if domain is None or not domain.strip():
        raise ValueError("domain is required when not resuming from a previous run")

    state = make_initial_state(
        domain,
        max_pain_points=max_pain_points,
        ideas_per_run=ideas_per_run,
        top_n_pitches=top_n_pitches,
        max_revisions=max_revisions,
    )
    return GRAPH.invoke(state, config=_invoke_config(state.run_id))
def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the VentureForge CLI."""
    parser = argparse.ArgumentParser(description="VentureForge — AI Startup Discovery")
    parser.add_argument(
        "--domain",
        type=str,
        required=False,
        help="Target domain, e.g. 'developer tools' (ignored when using --resume)",
    )
    parser.add_argument(
        "--max-pain-points",
        type=int,
        default=None,
        help="Override max pain points to extract (new runs only)",
    )
    parser.add_argument(
        "--resume",
        type=str,
        default=None,
        help="Existing run_id to resume from LangGraph checkpoints",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="output.json",
        help="Output JSON file path",
    )
    return parser


def _json_default(obj: object) -> object:
    """Fallback hook for ``json.dumps``: serialize nested Pydantic models.

    Needed for the raw-dict fallback of ``_serialize_result``, where the
    LangGraph result may still contain model instances. Objects without a
    ``model_dump`` method are rejected with the same ``TypeError`` that
    ``json`` would raise on its own.
    """
    model_dump = getattr(obj, "model_dump", None)
    if callable(model_dump):
        return model_dump(mode="json", exclude_none=True)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def _serialize_result(result: object) -> dict:
    """Convert the pipeline result into a JSON-ready dict.

    LangGraph returns a dict that may contain Pydantic models. A dict result is
    therefore re-validated into ``VentureForgeState`` and dumped in JSON mode.
    If that fails, the raw dict is returned (with a warning on stderr); any
    nested models in it are handled at dump time by ``_json_default``.
    """
    if not isinstance(result, dict):
        return result.model_dump(mode="json", exclude_none=True)
    try:
        state = VentureForgeState(**result)
        return state.model_dump(mode="json", exclude_none=True)
    except Exception as exc:  # CLI boundary: degrade gracefully, but say so.
        print(f"Warning: Could not convert result to VentureForgeState: {exc}", file=sys.stderr)
        print("Attempting direct serialization...", file=sys.stderr)
        return result


def _print_summary(output: dict, output_path: str) -> None:
    """Print a short human-readable summary of a finished run."""
    # ``or`` guards cover the raw-dict fallback, where a field may be present
    # with a None value (exclude_none only applies on the model_dump path).
    revision_counts = output.get("revision_counts") or {}
    total_revisions = sum(revision_counts.values())
    print(f"\nPipeline finished in stage: {output.get('current_stage', 'unknown')}")
    print(f" Run ID : {output.get('run_id', 'unknown')}")
    print(f" Duration : {output.get('agent_timings', {})}")
    print(f" Pain points: {len(output.get('pain_points') or [])}")
    print(f" Ideas : {len(output.get('ideas') or [])}")
    print(f" Pitches : {len(output.get('pitch_briefs') or [])}")
    print(f" Revisions : {total_revisions} (across {len(revision_counts)} pitches)")
    print(f"\nOutput written to: {output_path}")


def main() -> None:
    """CLI entry point: start or resume a run, write JSON output, print a summary."""
    parser = _build_parser()
    args = parser.parse_args()

    if args.resume:
        print(f"Resuming VentureForge run: run_id='{args.resume}'")
        result = run_pipeline(
            domain=None,
            max_pain_points=None,
            resume_run_id=args.resume,
        )
    else:
        if not args.domain:
            parser.error("--domain is required for new runs (omit it when using --resume)")
        print(f"VentureForge starting: domain='{args.domain}'")
        result = run_pipeline(args.domain, args.max_pain_points)

    output = _serialize_result(result)
    # Serialize fully before opening the file, so a serialization error cannot
    # leave a truncated or half-written output file behind.
    serialized = json.dumps(output, indent=2, ensure_ascii=False, default=_json_default)
    with open(args.output, "w", encoding="utf-8") as f:
        f.write(serialized)

    _print_summary(output, args.output)


if __name__ == "__main__":
    main()
|