Spaces:
Running
Running
File size: 5,485 Bytes
8dcf472 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 | """
DealFlow AI — Main Crew Orchestration
Assembles the 3-agent CrewAI crew and runs the due diligence pipeline.
"""
from __future__ import annotations
import json
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Callable, Optional
from crewai import Crew, Process
from loguru import logger
from src.agents import (
build_financial_analyst_agent,
build_report_writer_agent,
build_researcher_agent,
)
from src.config import AppConfig, get_config, get_llm
from src.tasks import (
build_financial_analysis_task,
build_report_task,
build_research_task,
)
class DealFlowCrew:
    """
    Orchestrates the 3-agent due diligence crew:

    1. Researcher — extracts pitch deck + web research
    2. Financial Analyst — analyzes projections + creates charts
    3. Report Writer — synthesizes investment memo

    Progress messages are always written to the logger. When a
    ``progress_callback`` is supplied it additionally receives every message.
    """

    def __init__(
        self,
        config: Optional[AppConfig] = None,
        progress_callback: Optional[Callable[[str], None]] = None,
    ):
        """
        Args:
            config: Application configuration. Falls back to ``get_config()``
                when omitted.
            progress_callback: Optional callable invoked with each progress
                message, in addition to the log line ``_emit`` writes.
        """
        self.config = config or get_config()
        # Keep None when no callback is given. _emit() already logs every
        # message, so defaulting to a logging lambda printed each line twice.
        self.progress_callback = progress_callback
        self._llm = None

    @property
    def llm(self):
        """LLM handle built by ``get_llm(self.config)`` on first access, then cached."""
        if self._llm is None:
            self._llm = get_llm(self.config)
        return self._llm

    def _emit(self, message: str) -> None:
        """Log a progress message and forward it to the callback, if one is set."""
        logger.info(message)
        if self.progress_callback:
            self.progress_callback(message)

    @staticmethod
    def _latest_memo(out_dir: Path) -> Optional[Path]:
        """Return the most recently modified ``memo_*.md`` in *out_dir*, or None."""
        # Path.glob() yields entries in arbitrary order, so indexing its result
        # cannot identify the newest memo; compare modification times instead
        # (file name breaks ties so the choice is deterministic).
        return max(
            out_dir.glob("memo_*.md"),
            key=lambda memo: (memo.stat().st_mtime, memo.name),
            default=None,
        )

    def run(
        self,
        pdf_path: str,
        company_name: str,
        output_dir: Optional[str] = None,
    ) -> dict:
        """
        Run the full due diligence pipeline.

        Args:
            pdf_path: Absolute path to the pitch deck PDF.
            company_name: Startup company name (for naming outputs).
            output_dir: Directory for output files. Defaults to config.output_dir.

        Returns:
            dict with keys:
                company_name: the name passed in.
                memo_path: path of the newest ``memo_*.md`` in the output
                    directory, or None when there is none.
                memo_content: text of that memo ("" when there is none).
                charts: sorted paths of every ``*.png`` in the output directory.
                elapsed_seconds: run time in seconds, rounded to 0.1.
                backend: ``config.llm_backend.value``.
                raw_result: ``str()`` of the value returned by ``crew.kickoff()``.
        """
        # Monotonic clock: the measured duration is immune to wall-clock jumps.
        start = time.monotonic()
        # Path() accepts both str and Path, so a string-valued
        # config.output_dir works as well.
        out_dir = Path(output_dir) if output_dir else Path(self.config.output_dir)
        out_dir.mkdir(parents=True, exist_ok=True)

        self._emit(f"[DealFlow AI] Starting due diligence for: {company_name}")
        self._emit(f"[DealFlow AI] PDF: {pdf_path}")
        self._emit(f"[DealFlow AI] Output dir: {out_dir}")
        self._emit(f"[DealFlow AI] Inference backend: {self.config.llm_backend.value}")

        # --- Build agents ---
        self._emit("[DealFlow AI] Assembling agent crew...")
        verbose = self.config.verbose_agents
        researcher = build_researcher_agent(self.llm, pdf_path=pdf_path, verbose=verbose)
        financial_analyst = build_financial_analyst_agent(self.llm, verbose=verbose)
        report_writer = build_report_writer_agent(self.llm, verbose=verbose)

        # --- Build tasks ---
        # Each later task receives the earlier tasks as context.
        research_task = build_research_task(researcher, pdf_path, company_name)
        analysis_task = build_financial_analysis_task(
            financial_analyst,
            pdf_path,
            str(out_dir),
            context_tasks=[research_task],
        )
        report_task = build_report_task(
            report_writer,
            company_name,
            str(out_dir),
            context_tasks=[research_task, analysis_task],
        )

        # --- Assemble crew ---
        self._emit("[DealFlow AI] Crew ready. Kicking off sequential pipeline...")
        crew = Crew(
            agents=[researcher, financial_analyst, report_writer],
            tasks=[research_task, analysis_task, report_task],
            process=Process.sequential,
            verbose=verbose,
            max_rpm=10,  # rate limit for API calls
        )

        # --- Run ---
        self._emit("[AGENT 1/3] Researcher: Extracting pitch deck + web research...")
        result = crew.kickoff()
        elapsed = round(time.monotonic() - start, 1)
        self._emit(f"[DealFlow AI] Pipeline complete in {elapsed}s")

        # --- Collect outputs ---
        # Outputs are discovered by scanning the output directory, so files
        # left there by earlier runs are included as well.
        charts = sorted(out_dir.glob("*.png"))
        memo_file = self._latest_memo(out_dir)
        memo_content = memo_file.read_text(encoding="utf-8") if memo_file else ""

        return {
            "company_name": company_name,
            "memo_path": str(memo_file) if memo_file else None,
            "memo_content": memo_content,
            "charts": [str(chart) for chart in charts],
            "elapsed_seconds": elapsed,
            "backend": self.config.llm_backend.value,
            "raw_result": str(result),
        }
|