dealflow-ai / src /crew.py
PeterBot22's picture
feat: DealFlow AI MVP — 3-agent CrewAI due diligence system on HF Spaces
8dcf472 verified
"""
DealFlow AI — Main Crew Orchestration
Assembles the 3-agent CrewAI crew and runs the due diligence pipeline.
"""
from __future__ import annotations
import json
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Callable, Optional
from crewai import Crew, Process
from loguru import logger
from src.agents import (
build_financial_analyst_agent,
build_report_writer_agent,
build_researcher_agent,
)
from src.config import AppConfig, get_config, get_llm
from src.tasks import (
build_financial_analysis_task,
build_report_task,
build_research_task,
)
class DealFlowCrew:
    """
    Orchestrates the 3-agent due diligence crew:
    1. Researcher → extracts pitch deck + web research
    2. Financial Analyst → analyzes projections + creates charts
    3. Report Writer → synthesizes investment memo
    """

    def __init__(
        self,
        config: Optional[AppConfig] = None,
        progress_callback: Optional[Callable[[str], None]] = None,
    ):
        """
        Store configuration and the optional progress callback.

        Args:
            config: Application config. Falls back to ``get_config()`` when
                not supplied.
            progress_callback: Optional callable invoked with every progress
                message, in addition to the message being logged.
        """
        self.config = config or get_config()
        # No logger-backed default here: _emit() already logs every message,
        # so a logging fallback callback would write each message twice.
        self.progress_callback = progress_callback
        self._llm = None

    @property
    def llm(self):
        """LLM instance built from the config on first access, then cached."""
        if self._llm is None:
            self._llm = get_llm(self.config)
        return self._llm

    def _emit(self, message: str) -> None:
        """Log a progress message and forward it to the callback, if any."""
        logger.info(message)
        if self.progress_callback is not None:
            self.progress_callback(message)

    @staticmethod
    def _latest_memo(out_dir: Path) -> Optional[Path]:
        """Return the most recently modified ``memo_*.md`` in *out_dir*, or None."""
        # glob() yields entries in no guaranteed order, so taking the last
        # element could return a stale memo left behind by an earlier run.
        return max(
            out_dir.glob("memo_*.md"),
            key=lambda path: path.stat().st_mtime,
            default=None,
        )

    def run(
        self,
        pdf_path: str,
        company_name: str,
        output_dir: Optional[str] = None,
    ) -> dict:
        """
        Run the full due diligence pipeline.

        Args:
            pdf_path: Absolute path to the pitch deck PDF.
            company_name: Startup company name (for naming outputs).
            output_dir: Directory for output files. Defaults to config.output_dir.

        Returns:
            dict with keys:
                company_name: the ``company_name`` argument.
                memo_path: path (str) of the newest ``memo_*.md`` in the
                    output directory, or None if there is none.
                memo_content: UTF-8 text of that memo, or "" if there is none.
                charts: sorted list of ``*.png`` paths (str) in the output
                    directory.
                elapsed_seconds: wall-clock duration, rounded to 0.1 s.
                backend: ``config.llm_backend.value``.
                raw_result: ``str()`` of the value returned by ``crew.kickoff()``.
        """
        start = time.time()
        # Coerce to Path in both branches so a string-valued config still works.
        out_dir = Path(output_dir) if output_dir else Path(self.config.output_dir)
        out_dir.mkdir(parents=True, exist_ok=True)

        self._emit(f"[DealFlow AI] Starting due diligence for: {company_name}")
        self._emit(f"[DealFlow AI] PDF: {pdf_path}")
        self._emit(f"[DealFlow AI] Output dir: {out_dir}")
        self._emit(f"[DealFlow AI] Inference backend: {self.config.llm_backend.value}")

        # ─── Build agents ────────────────────────────────────────────────
        self._emit("[DealFlow AI] Assembling agent crew...")
        verbose = self.config.verbose_agents
        researcher = build_researcher_agent(self.llm, pdf_path=pdf_path, verbose=verbose)
        financial_analyst = build_financial_analyst_agent(self.llm, verbose=verbose)
        report_writer = build_report_writer_agent(self.llm, verbose=verbose)

        # ─── Build tasks ─────────────────────────────────────────────────
        # Each later task receives the earlier tasks as context.
        research_task = build_research_task(researcher, pdf_path, company_name)
        analysis_task = build_financial_analysis_task(
            financial_analyst,
            pdf_path,
            str(out_dir),
            context_tasks=[research_task],
        )
        report_task = build_report_task(
            report_writer,
            company_name,
            str(out_dir),
            context_tasks=[research_task, analysis_task],
        )

        # ─── Assemble crew ───────────────────────────────────────────────
        self._emit("[DealFlow AI] Crew ready. Kicking off sequential pipeline...")
        crew = Crew(
            agents=[researcher, financial_analyst, report_writer],
            tasks=[research_task, analysis_task, report_task],
            process=Process.sequential,
            verbose=verbose,
            max_rpm=10,  # rate limit for API calls
        )

        # ─── Run ─────────────────────────────────────────────────────────
        self._emit("[AGENT 1/3] Researcher: Extracting pitch deck + web research...")
        result = crew.kickoff()
        elapsed = round(time.time() - start, 1)
        self._emit(f"[DealFlow AI] Pipeline complete in {elapsed}s")

        # ─── Collect outputs ─────────────────────────────────────────────
        # Sorted so the returned chart order is deterministic across platforms.
        charts = sorted(out_dir.glob("*.png"))
        memo_file = self._latest_memo(out_dir)
        memo_path = str(memo_file) if memo_file is not None else None
        memo_content = ""
        if memo_file is not None:
            memo_content = memo_file.read_text(encoding="utf-8")

        return {
            "company_name": company_name,
            "memo_path": memo_path,
            "memo_content": memo_content,
            "charts": [str(c) for c in charts],
            "elapsed_seconds": elapsed,
            "backend": self.config.llm_backend.value,
            "raw_result": str(result),
        }