File size: 5,425 Bytes
a6e70b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""

VentureForge CLI

================

Run the full multi-agent pipeline from the command line.



Usage:

    python -m src.main --domain "developer tools"

"""

from __future__ import annotations

import argparse
import json

from src.config import settings
from src.graph import GRAPH
from src.state.schema import VentureForgeState


def make_initial_state(

    domain: str,

    max_pain_points: int | None = None,

    ideas_per_run: int | None = None,

    top_n_pitches: int | None = None,

    max_revisions: int | None = None,

) -> VentureForgeState:
    """Construct the initial VentureForgeState for a new run.



    Shared between the CLI entrypoint and the Gradio UI controller so

    that both use the same defaults from ``src.config.settings``.

    """
    return VentureForgeState(
        domain=domain,
        max_pain_points=max_pain_points or settings.max_pain_points,
        ideas_per_run=ideas_per_run or settings.ideas_per_run,
        top_n_pitches=top_n_pitches or settings.top_n_pitches,
        max_revisions=max_revisions or settings.max_revisions,
    )


def run_pipeline(

    domain: str | None,

    max_pain_points: int | None = None,

    *,

    recursion_limit: int = 80,

    resume_run_id: str | None = None,

) -> VentureForgeState:
    """Execute the full end-to-end pipeline and return final state.



    If ``resume_run_id`` is provided, the pipeline resumes from the latest

    checkpoint for that ``run_id`` (thread_id) using the LangGraph SQLite

    checkpointer and ignores ``domain``/``max_pain_points``.

    """

    # Resume mode: load state from existing checkpoints and continue.
    if resume_run_id is not None:
        return GRAPH.invoke(
            None,
            config={
                "recursion_limit": recursion_limit,
                "configurable": {"thread_id": resume_run_id},
            },
        )

    if domain is None:
        raise ValueError("domain is required when not resuming from a previous run")

    state = make_initial_state(domain, max_pain_points=max_pain_points)
    # LangGraph invoke returns updated state. Use the state's run_id as
    # the checkpoint "thread_id" so runs can be resumed/inspected.
    return GRAPH.invoke(
        state,
        config={
            "recursion_limit": recursion_limit,
            "configurable": {"thread_id": state.run_id},
        },
    )


def main() -> None:
    parser = argparse.ArgumentParser(description="VentureForge — AI Startup Discovery")
    parser.add_argument(
        "--domain",
        type=str,
        required=False,
        help="Target domain, e.g. 'developer tools' (ignored when using --resume)",
    )
    parser.add_argument(
        "--max-pain-points",
        type=int,
        default=None,
        help="Override max pain points to extract (new runs only)",
    )
    parser.add_argument(
        "--resume",
        type=str,
        default=None,
        help="Existing run_id to resume from LangGraph checkpoints",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="output.json",
        help="Output JSON file path",
    )
    args = parser.parse_args()

    if args.resume:
        print(f"Resuming VentureForge run: run_id='{args.resume}'")
        result = run_pipeline(
            domain=None,
            max_pain_points=None,
            resume_run_id=args.resume,
        )
    else:
        if not args.domain:
            parser.error("--domain is required for new runs (omit it when using --resume)")
        print(f"VentureForge starting: domain='{args.domain}'")
        result = run_pipeline(args.domain, args.max_pain_points)

    # Serialize final state
    # LangGraph returns a dict that may contain Pydantic models
    # We need to serialize them properly to avoid "Object of type X is not JSON serializable" errors
    if isinstance(result, dict):
        # Convert dict to VentureForgeState to ensure proper serialization
        from src.state.schema import VentureForgeState
        try:
            state = VentureForgeState(**result)
            output = state.model_dump(mode="json", exclude_none=True)
        except Exception as e:
            # Fallback: try direct serialization (may fail if dict contains Pydantic models)
            print(f"Warning: Could not convert result to VentureForgeState: {e}")
            print("Attempting direct serialization...")
            output = result
    else:
        output = result.model_dump(mode="json", exclude_none=True)
    
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)

    print(f"\nPipeline finished in stage: {output.get('current_stage', 'unknown')}")
    print(f"   Run ID     : {output.get('run_id', 'unknown')}")
    print(f"   Duration   : {output.get('agent_timings', {})}")
    print(f"   Pain points: {len(output.get('pain_points', []))}")
    print(f"   Ideas      : {len(output.get('ideas', []))}")
    print(f"   Pitches    : {len(output.get('pitch_briefs', []))}")
    revision_counts = output.get('revision_counts', {})
    total_revisions = sum(revision_counts.values())
    print(f"   Revisions  : {total_revisions} (across {len(revision_counts)} pitches)")
    print(f"\nOutput written to: {args.output}")


if __name__ == "__main__":
    main()