|
|
import torch |
|
|
import json |
|
|
import os |
|
|
import tempfile |
|
|
import io |
|
|
import warnings |
|
|
import time |
|
|
from pathlib import Path |
|
|
from typing import Optional, Tuple |
|
|
from fastapi import FastAPI, UploadFile, File, HTTPException |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import JSONResponse |
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
from docx import Document as DocxDocument |
|
|
from pptx import Presentation |
|
|
import logging |
|
|
from PIL import Image |
|
|
import pytesseract |
|
|
from pdf2image import convert_from_path |
|
|
import easyocr |
|
|
|
|
|
# Suppress specific third-party warnings: UserWarning from pdfminer,
# FutureWarning from transformers, and any warning whose message matches
# "Cannot set gray".
warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer")
warnings.filterwarnings("ignore", category=FutureWarning, module="transformers")
warnings.filterwarnings("ignore", message=".*Cannot set gray.*")

# Root logging at INFO; this module logs through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Only ERROR and above from pdfminer's own logger.
logging.getLogger("pdfminer").setLevel(logging.ERROR)

# NOTE(review): imported after the filters/log level above, presumably so they
# are already in place when pdfminer is first loaded — confirm before moving.
from pdfminer.high_level import extract_text as extract_pdf_text
|
|
|
|
|
app = FastAPI(
    title="Deckgpt",
    description="Upload your startup pitch deck (PDF, PPT, DOCX) and get an investor-style review",
    version="1.0.0",
)

# Allowed browser origins, as a comma-separated list in CORS_ALLOW_ORIGINS.
# Defaults to "*" (any origin), which matches the previous behaviour.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # A wildcard origin must not be combined with credentialed requests
    # (see the FastAPI CORS documentation), so credentials are only enabled
    # when explicit origins have been configured.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
# Hugging Face model identifier; overridable through the MODEL_ID env variable.
MODEL_ID = os.environ.get("MODEL_ID", "HuggingFaceH4/zephyr-7b-beta")

# Populated by load_model() at application startup; None until then.
model = None
tokenizer = None
# EasyOCR reader; stays None when EasyOCR fails to load (pytesseract is used instead).
ocr_reader = None
|
|
|
|
|
def _select_model_dtype():
    """Return bfloat16 when CUDA is available or the CPU reports bf16 support, else float32."""
    if torch.cuda.is_available():
        return torch.bfloat16
    cpu_backend = getattr(torch, "cpu", None)
    cpu_supports_bf16 = getattr(cpu_backend, "is_bf16_supported", lambda: False)
    return torch.bfloat16 if cpu_supports_bf16() else torch.float32


def _load_ocr_reader():
    """Create the EasyOCR reader, returning None (after logging) when it cannot be initialised."""
    try:
        ocr_model_dir = os.path.join(tempfile.gettempdir(), '.EasyOCR')
        os.makedirs(ocr_model_dir, exist_ok=True)
        os.chmod(ocr_model_dir, 0o777)

        # HOME is pointed at the temp directory only while the reader is
        # constructed, then put back exactly as it was.
        old_home = os.environ.get('HOME')
        try:
            os.environ['HOME'] = tempfile.gettempdir()
            reader = easyocr.Reader(
                ['en'],
                gpu=torch.cuda.is_available(),
                model_storage_directory=ocr_model_dir,
            )
            logger.info("✅ OCR reader loaded successfully!")
            return reader
        finally:
            # `is not None` (rather than truthiness) so an empty-string HOME
            # is restored instead of being deleted.
            if old_home is not None:
                os.environ['HOME'] = old_home
            else:
                os.environ.pop('HOME', None)
    except Exception as e:
        logger.warning(f"⚠️ EasyOCR failed to load, will use pytesseract fallback: {e}")
        return None


@app.on_event("startup")
async def load_model():
    """Load the Zephyr tokenizer/model and OCR reader on startup.

    Sets the module-level ``tokenizer``, ``model`` and ``ocr_reader``.
    Does nothing when tokenizer and model are already loaded. A failure to
    load the tokenizer or model is logged and re-raised; a failure to load
    EasyOCR only leaves ``ocr_reader`` as None.
    """
    global tokenizer, model, ocr_reader
    try:
        if tokenizer is not None and model is not None:
            logger.info("Model already loaded, skipping startup initialization")
            return

        logger.info(f"Loading model: {MODEL_ID} ...")
        logger.info("Loading Zephyr tokenizer and model (device auto)...")

        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        if tokenizer.pad_token is None:
            # generate() needs a pad token; reuse EOS when none is defined.
            tokenizer.pad_token = tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID,
            dtype=_select_model_dtype(),
            device_map="auto",
            low_cpu_mem_usage=True,
        ).eval()
        logger.info("✅ Zephyr loaded successfully (device auto)!")

        logger.info("Loading OCR reader...")
        ocr_reader = _load_ocr_reader()
    except Exception as e:
        logger.error(f"❌ Failed to load model: {e}")
        raise
|
|
|
|
|
|
|
|
def extract_text_with_ocr(image_path_or_image, use_easyocr: bool = True) -> str:
    """Extract text from an image using OCR.

    Args:
        image_path_or_image: A file path (str) or a PIL image.
        use_easyocr: Try the EasyOCR reader first when it has been loaded.

    Returns:
        The recognised text. With EasyOCR, only detections with confidence
        above 0.5 are kept, joined by newlines. Returns "" when every OCR
        backend fails; failures are logged, never raised.
    """
    if use_easyocr and ocr_reader is not None:
        try:
            source = image_path_or_image
            if isinstance(source, Image.Image):
                # EasyOCR's readtext takes a path, bytes or a numpy array;
                # handing it an arbitrary PIL image raises, so convert first.
                import numpy as np

                source = np.array(source.convert("RGB"))
            detections = ocr_reader.readtext(source)
            return "\n".join(
                detection[1] for detection in detections if detection[2] > 0.5
            )
        except Exception as e:
            # Fall through to pytesseract instead of giving up with "".
            logger.warning(f"EasyOCR extraction failed, falling back to pytesseract: {e}")

    try:
        if isinstance(image_path_or_image, str):
            # Close the file handle once tesseract has consumed the image.
            with Image.open(image_path_or_image) as img:
                return pytesseract.image_to_string(img, lang='eng')
        return pytesseract.image_to_string(image_path_or_image, lang='eng')
    except Exception as e:
        logger.warning(f"OCR extraction failed: {e}")
        return ""
|
|
|
|
|
def extract_text_from_pdf(file_path: str, use_ocr: bool = True) -> str:
    """Collect text from a PDF using embedded-text extraction plus page OCR.

    The embedded text layer is read first. When ``use_ocr`` is true, pages
    1-50 are additionally rendered at 300 dpi and OCR'd, and each non-empty
    page result is appended under a "--- Page N (OCR) ---" header. Failures
    in either stage are logged and skipped.

    Raises:
        ValueError: If neither stage produced any text.
    """
    collected = []

    try:
        embedded = extract_pdf_text(file_path)
    except Exception as e:
        logger.warning(f"PDF text extraction failed: {e}")
    else:
        if embedded and embedded.strip():
            collected.append(embedded.strip())

    if use_ocr:
        try:
            pages = convert_from_path(file_path, dpi=300, first_page=1, last_page=50)
            logger.info(f"Processing {len(pages)} pages with OCR...")
            for page_no, page_image in enumerate(pages, start=1):
                page_text = extract_text_with_ocr(page_image, use_easyocr=True)
                if not page_text.strip():
                    continue
                collected.append(f"\n--- Page {page_no} (OCR) ---\n{page_text}")
        except Exception as e:
            logger.warning(f"OCR processing failed: {e}")

    combined_text = "\n\n".join(collected)
    if combined_text and combined_text.strip():
        return combined_text
    raise ValueError("No readable text found in PDF (tried both text extraction and OCR)")
|
|
|
|
|
def extract_text_from_docx(file_path: str) -> str:
    """Extract text from a DOCX file.

    Non-empty paragraphs come first, followed by non-empty table cells in
    table/row/cell order, one item per line.

    Raises:
        ValueError: If the document cannot be parsed (the original error is
            chained as the cause) or if it contains no readable text.
    """
    try:
        doc = DocxDocument(file_path)
        text_parts = [
            paragraph.text.strip()
            for paragraph in doc.paragraphs
            if paragraph.text.strip()
        ]
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    cell_text = cell.text.strip()
                    if cell_text:
                        text_parts.append(cell_text)
    except Exception as e:
        raise ValueError(f"Error extracting text from DOCX: {str(e)}") from e

    # Checked outside the try block so this error is not re-wrapped as a
    # parsing failure.
    text = "\n".join(text_parts)
    if not text.strip():
        raise ValueError("No readable text found in DOCX")
    return text
|
|
|
|
|
def extract_text_from_ppt(file_path: str, use_ocr: bool = True) -> str:
    """Extract text from a PowerPoint file, with OCR for picture shapes.

    For every shape, the first applicable source is used: its text, its
    table cells (one " | "-joined line per row), or, when ``use_ocr`` is
    true, OCR of its embedded image. Slides that yield text are emitted as
    "Slide N:" sections separated by blank lines.

    Raises:
        ValueError: If the file cannot be parsed (the original error is
            chained as the cause) or if no text was found.
    """
    text_parts = []

    try:
        prs = Presentation(file_path)

        for slide_num, slide in enumerate(prs.slides, 1):
            slide_text = []

            for shape in slide.shapes:
                shape_text = getattr(shape, "text", "") or ""
                if shape_text.strip():
                    slide_text.append(shape_text.strip())
                elif getattr(shape, "has_table", False):
                    # `has_table` is checked instead of probing `.table`:
                    # graphic frames that hold a chart expose no usable table,
                    # and touching `.table` on them would abort the whole deck.
                    for row in shape.table.rows:
                        row_text = [
                            cell.text.strip() for cell in row.cells if cell.text.strip()
                        ]
                        if row_text:
                            slide_text.append(" | ".join(row_text))
                elif use_ocr:
                    try:
                        # Accessing `.image` can itself raise (e.g. a picture
                        # without embedded data), so it stays inside the try.
                        image = getattr(shape, "image", None)
                        if image is None:
                            continue
                        with Image.open(io.BytesIO(image.blob)) as img:
                            ocr_text = extract_text_with_ocr(img, use_easyocr=True)
                        if ocr_text.strip():
                            slide_text.append(f"[Image OCR]: {ocr_text.strip()}")
                    except Exception as e:
                        logger.debug(f"OCR on slide {slide_num} image failed: {e}")

            if slide_text:
                text_parts.append(f"Slide {slide_num}:\n" + "\n".join(slide_text))
    except Exception as e:
        raise ValueError(f"Error extracting text from PPT: {str(e)}") from e

    # Checked outside the try block so this error is not re-wrapped as a
    # parsing failure.
    text = "\n\n".join(text_parts)
    if not text.strip():
        raise ValueError("No readable text found in PPT")
    return text
|
|
|
|
|
def extract_text_from_file(file_path: str, file_extension: str) -> str:
    """
    Dispatch to the extractor that matches the (case-insensitive) file extension.

    ".pdf" goes to the PDF extractor, ".docx"/".doc" to the DOCX extractor and
    ".pptx"/".ppt" to the PowerPoint extractor. Any other extension raises
    ValueError.
    """
    extension = file_extension.lower()

    if extension == ".pdf":
        return extract_text_from_pdf(file_path)
    if extension in (".docx", ".doc"):
        return extract_text_from_docx(file_path)
    if extension in (".pptx", ".ppt"):
        return extract_text_from_ppt(file_path)

    raise ValueError(f"Unsupported file type: {extension}. Supported: PDF, DOCX, PPT/PPTX")
|
|
|
|
|
def chunk_text(text: str, chunk_size: int = 6000, overlap: int = 500) -> list:
    """
    Split text into overlapping chunks for processing.

    Each chunk is at most ``chunk_size`` characters. Cuts are made, in order
    of preference, at the last paragraph break, line break or sentence end
    inside the window; otherwise the window is cut at ``chunk_size``. Every
    chunk after the first starts ``overlap`` characters before the previous cut.

    Args:
        text: The text to split.
        chunk_size: Maximum chunk length in characters.
        overlap: Characters shared between consecutive chunks; clamped to
            the range [0, chunk_size - 1].

    Returns:
        A list of chunks; ``[text]`` when the text already fits in one chunk.

    Raises:
        ValueError: If ``chunk_size`` is not positive and the text does not
            fit in a single chunk.
    """
    if len(text) <= chunk_size:
        return [text]
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    overlap = max(0, min(overlap, chunk_size - 1))

    chunks = []
    start = 0
    text_length = len(text)

    while start < text_length:
        end = start + chunk_size
        if end >= text_length:
            chunks.append(text[start:])
            break

        # Only accept a break point that still moves `start` forward once the
        # overlap is subtracted. A separator closer to `start` than that would
        # make the loop stall or walk backwards forever.
        earliest_cut = start + overlap + 1
        chunk_end = -1
        for separator in ('\n\n', '\n', '. '):
            chunk_end = text.rfind(separator, earliest_cut, end)
            if chunk_end != -1:
                break
        if chunk_end == -1:
            chunk_end = end

        chunks.append(text[start:chunk_end])
        start = chunk_end - overlap

    return chunks
|
|
|
|
|
def review_pitchdeck(text: str) -> dict:
    """
    Send text to Zephyr model for VC-level review and return structured JSON.

    Texts of up to 6000 characters are reviewed in a single model pass.
    Longer texts are capped at the overall character budget, split into
    overlapping chunks, reviewed chunk by chunk and then merged into one
    final review. Chunks whose review fails are skipped with a warning.

    Zephyr-7b-beta has 4096 token context limit

    Raises:
        ValueError: If ``text`` is empty/whitespace, or if no chunk of a long
            document could be reviewed. Errors from the single-pass review
            propagate unchanged.
    """
    if not text or not text.strip():
        raise ValueError("No text content provided for review")

    max_tokens = 3800
    estimated_chars_per_token = 4
    max_text_length = int(max_tokens * estimated_chars_per_token * 0.8)
    single_pass_limit = 6000

    if len(text) <= single_pass_limit:
        return _review_chunk(text, is_partial=False)

    # Anything longer than one pass is chunked, so mid-sized decks are no
    # longer silently cut down to the first 6000 characters.
    if len(text) > max_text_length:
        logger.warning(
            f"Text length ({len(text)} chars) exceeds safe limit ({max_text_length} chars); "
            f"only the first {max_text_length} chars will be reviewed"
        )
    bounded_text = text[:max_text_length]

    logger.info(f"Text length ({len(text)} chars) exceeds single-pass limit ({single_pass_limit} chars), using chunking strategy")
    chunks = chunk_text(bounded_text, chunk_size=5000, overlap=500)
    logger.info(f"Processing {len(chunks)} chunks...")

    slide_reviews_combined = []
    all_insights = []
    reviewed_chunks = 0

    for chunk_no, chunk in enumerate(chunks, start=1):
        logger.info(f"Processing chunk {chunk_no}/{len(chunks)} ({len(chunk)} chars)...")
        try:
            chunk_result = _review_chunk(chunk, is_partial=True, chunk_num=chunk_no, total_chunks=len(chunks))
        except ValueError as e:
            # One unparseable model answer should not discard the other chunks.
            logger.warning(f"Chunk {chunk_no}/{len(chunks)} could not be reviewed, skipping: {e}")
            continue
        reviewed_chunks += 1

        chunk_slides = chunk_result.get("slide_reviews")
        if isinstance(chunk_slides, list) and chunk_slides:
            slide_reviews_combined.extend(chunk_slides)

        if chunk_result.get("vc_insights"):
            all_insights.append(chunk_result["vc_insights"])

    if reviewed_chunks == 0:
        raise ValueError("Error during model inference: no chunk of the deck could be reviewed")

    logger.info("Combining chunk results into final review...")
    return _combine_chunk_results(slide_reviews_combined, all_insights, bounded_text)
|
|
|
|
|
def _repair_json(json_str: str) -> str: |
|
|
""" |
|
|
Attempt to repair common JSON syntax errors from model output |
|
|
""" |
|
|
import re |
|
|
|
|
|
json_str = json_str.strip() |
|
|
|
|
|
if not json_str.startswith('{'): |
|
|
json_str = '{' + json_str |
|
|
if not json_str.endswith('}'): |
|
|
json_str = json_str + '}' |
|
|
|
|
|
json_str = re.sub(r',\s*}', '}', json_str) |
|
|
json_str = re.sub(r',\s*]', ']', json_str) |
|
|
|
|
|
json_str = re.sub(r'}\s*{', '},{', json_str) |
|
|
|
|
|
json_str = re.sub(r'"\s*\n\s*"', '","', json_str) |
|
|
|
|
|
json_str = re.sub(r'(\w+):\s*([^,}\]]+?)\s*(?=\n\s*["\w])', r'\1: "\2",', json_str, flags=re.MULTILINE) |
|
|
|
|
|
lines = json_str.split('\n') |
|
|
repaired_lines = [] |
|
|
for i, line in enumerate(lines): |
|
|
line = line.rstrip() |
|
|
if not line: |
|
|
repaired_lines.append('') |
|
|
continue |
|
|
|
|
|
if i < len(lines) - 1: |
|
|
next_line = lines[i + 1].strip() |
|
|
if (line.endswith('"') or line.endswith(']') or line.endswith('}')) and next_line.startswith('"'): |
|
|
if not line.rstrip().endswith(','): |
|
|
line = line.rstrip() + ',' |
|
|
|
|
|
repaired_lines.append(line) |
|
|
|
|
|
json_str = '\n'.join(repaired_lines) |
|
|
|
|
|
json_str = re.sub(r',(\s*[}\]])', r'\1', json_str) |
|
|
|
|
|
return json_str |
|
|
|
|
|
def _review_chunk(deck_text: str, is_partial: bool = False, chunk_num: int = 1, total_chunks: int = 1) -> dict:
    """
    Run one model pass over ``deck_text`` and return the parsed JSON review.

    Args:
        deck_text: Deck text (or one chunk of it) to embed in the prompt.
        is_partial: True when ``deck_text`` is one chunk of a larger deck;
            switches the task instruction to a section-level review and adds
            a chunk marker to the prompt.
        chunk_num: 1-based index of this chunk (used only in the prompt).
        total_chunks: Total number of chunks (used only in the prompt).

    Returns:
        The dict parsed from the model's JSON answer (after one repair
        attempt if the raw answer is not valid JSON).

    Raises:
        ValueError: On any failure (generation, missing JSON, unparseable
            JSON); the original exception is chained as the cause.
    """
    chunk_context = f"\n\n[Processing chunk {chunk_num} of {total_chunks} - focus on slides in this section]" if is_partial else ""

    system_message = """You are a senior venture capitalist with 15+ years of experience evaluating thousands of pitch decks. You know the patterns that lead to funding vs. ghosting. Based on extensive research analyzing hundreds of decks, these are the critical failure points:

1. Beautiful decks missing commercial backbone (GTM, financials, market sizing, clear ask)
2. Giant market claims without credibility - claiming $50B TAM instead of sharp, addressable market
3. Mission over mechanics - purpose without profitable model
4. No Go-To-Market strategy - "we'll figure it out" isn't a plan
5. Traction theatre - vanity metrics instead of real growth (show % growth WOW, paid users, conversion rates)
6. Team slide buried or weak - investors back founders at pre-seed, put team early
7. Missing moat - can't explain defensibility in one clear line
8. Unclear ask - vague "seeking partners" instead of specific: "Raising £400k to reach 10k users, £350k ARR"
9. Overstuffed or underexplained - should be 12-14 slides, 1 key message per slide
10. No financial logic - even pre-seed needs 3-year revenue/burn/milestone map

THE 5 CRITICAL QUESTIONS every deck must answer clearly:
1. What problem are you solving?
2. Who's paying?
3. How do you reach them?
4. Why you?
5. What do you need?

Be brutally honest. Commercial clarity keeps doors open - GTM and financials get you funded. Emotion opens the door, but logic closes the deal."""

    task_instruction = """TASK:
Evaluate this deck against these real-world failure patterns. Check specifically for: commercial backbone, credible market sizing, GTM clarity, real traction metrics, team positioning, moat definition, specific ask, slide count/clarity, and financial logic.""" if not is_partial else """TASK:
Review this section of the deck. Extract slide-by-slide analysis. Focus on identifying slide content, titles, and issues. Note: This is part of a larger deck - provide detailed slide reviews for this section only."""

    user_message = f"""Deck Content{chunk_context}:
{deck_text}

{task_instruction}

Produce ONLY valid JSON with these exact fields:

{{
"verdict": "Invest" | "Follow-up" | "Pass",
"score": 0-100,
"grade": "A+" | "A" | "A-" | "B+" | "B" | "B-" | "C+" | "C" | "C-" | "D" | "F",
"top_line": "1-2 sentence executive summary from VC perspective",
"investment_readiness": "Ready" | "Near-ready" | "Needs-work" | "Not-ready",
"critical_questions_check": {{
"what_problem": "clear" | "unclear" | "missing",
"who_paying": "clear" | "unclear" | "missing",
"how_reach_them": "clear" | "unclear" | "missing",
"why_you": "clear" | "unclear" | "missing",
"what_need": "clear" | "unclear" | "missing"
}},
"common_failures": [
"failure pattern 1 (from top 10 list)",
"failure pattern 2"
],
"deal_breakers": ["critical issue 1", "critical issue 2", ...],
"high_potential_signals": ["positive signal 1", "positive signal 2", ...],
"priority_questions": ["question 1", "question 2", "question 3"],
"scores": {{
"storyline_clarity": 0-100,
"problem_solution": 0-100,
"market_opportunity": 0-100,
"market_credibility": 0-100,
"go_to_market": 0-100,
"traction_quality": 0-100,
"business_model": 0-100,
"team": 0-100,
"moat_defensibility": 0-100,
"financials_ask": 0-100,
"ask_specificity": 0-100,
"design_communication": 0-100
}},
"slide_reviews": [
{{
"slide_no": 1,
"title": "slide title",
"investor_comment": "VC-style critique",
"severity": "critical" | "major" | "minor" | "good",
"rewrite_suggestion": "specific improvement recommendation"
}}
],
"vc_insights": {{
"commercial_backbone": "assessment of GTM, financials, ask clarity",
"market_credibility": "assessment of market sizing realism",
"gtm_clarity": "assessment of distribution strategy",
"traction_reality": "assessment of metrics authenticity vs. vanity",
"investment_thesis": "why invest or pass"
}},
"slide_count": number,
"slide_count_assessment": "optimal (12-14)" | "too_many" | "too_few"
}}"""

    try:
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message},
        ]

        start_time = time.time()
        prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        # NOTE(review): 3800 prompt tokens plus 800 new tokens is more than the
        # 4096-token limit mentioned elsewhere in this file — confirm the budget.
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=3800).to(model.device)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=800,
                temperature=0.2,
                top_p=0.9,
                do_sample=True,
                repetition_penalty=1.08,
                pad_token_id=tokenizer.eos_token_id,
                use_cache=True,
            )

        generation_time = time.time() - start_time

        # Decode only the newly generated tokens. The prompt itself contains a
        # JSON template, so decoding prompt + completion could make the brace
        # search below pick up the template instead of the model's answer.
        prompt_length = inputs["input_ids"].shape[-1]
        raw_text = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
        if "<|assistant|>" in raw_text:
            raw_text = raw_text.split("<|assistant|>")[-1]

        logger.info(f"✅ Generated {len(raw_text)} chars in {generation_time:.2f}s")

        start = raw_text.find('{')
        end = raw_text.rfind('}') + 1
        if start == -1 or end <= start:
            raise ValueError("No JSON object found in model output")

        json_str = raw_text[start:end]
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            # One repair attempt; a second failure is reported by the outer handler.
            return json.loads(_repair_json(json_str))

    except Exception as e:
        logger.error(f"Model generation error: {e}")
        raise ValueError(f"Error during model inference: {str(e)}") from e
|
|
|
|
|
def _combine_chunk_results(slide_reviews: list, insights: list, full_text: str) -> dict:
    """
    Combine results from multiple chunks into a single comprehensive review.

    The model is asked to consolidate the per-chunk slide reviews (first 50)
    and insights. This function never raises: when generation or parsing
    fails, a basic "Follow-up" structure carrying the collected slide reviews
    is returned instead.

    Args:
        slide_reviews: Slide review dicts gathered from all chunks.
        insights: ``vc_insights`` values gathered from all chunks.
        full_text: The reviewed deck text; only its length goes into the prompt.

    Returns:
        The consolidated review dict. If the model's answer has no
        ``slide_reviews``, the first 30 collected ones are filled in.
    """
    def _basic_structure(top_line: str, with_note: bool = True) -> dict:
        # Default review used whenever the model's combined answer is unusable.
        result = {
            "verdict": "Follow-up",
            "score": 70,
            "grade": "B",
            "top_line": top_line,
            "slide_reviews": slide_reviews[:20] if slide_reviews else [],
        }
        if with_note:
            result["note"] = "Combined from chunked processing"
        return result

    system_message = """You are synthesizing multiple partial reviews of a pitch deck into one comprehensive VC evaluation."""

    user_message = f"""You have received partial reviews of a pitch deck. Combine them into one final comprehensive review.

Slide Reviews from chunks:
{json.dumps(slide_reviews[:50], indent=2)}

Key Insights:
{json.dumps(insights, indent=2)}

Full Deck Length: {len(full_text)} characters

Produce a FINAL comprehensive review with the same JSON structure as before, consolidating all findings."""

    try:
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message},
        ]
        prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=3800).to(model.device)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=800,
                temperature=0.2,
                top_p=0.9,
                do_sample=True,
                repetition_penalty=1.05,
                pad_token_id=tokenizer.eos_token_id,
                use_cache=True,
            )

        # Decode only the completion: the prompt embeds JSON (the slide
        # reviews), which must not be mistaken for the model's answer.
        prompt_length = inputs["input_ids"].shape[-1]
        raw_output = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
        if "<|assistant|>" in raw_output:
            raw_output = raw_output.split("<|assistant|>")[-1]

        start = raw_output.find('{')
        end = raw_output.rfind('}') + 1

        if start == -1 or end <= start:
            logger.warning("Failed to parse combined result, returning basic structure")
            return _basic_structure("Deck reviewed across multiple sections")

        json_str = raw_output[start:end]

        try:
            combined_json = json.loads(json_str)
        except json.JSONDecodeError:
            try:
                combined_json = json.loads(_repair_json(json_str))
            except Exception:
                logger.warning("JSON repair failed, returning basic structure")
                return _basic_structure("Deck reviewed across multiple sections")

        if slide_reviews and not combined_json.get("slide_reviews"):
            combined_json["slide_reviews"] = slide_reviews[:30]

        return combined_json

    except Exception as e:
        logger.warning(f"Combining chunks failed: {e}, returning basic structure")
        return _basic_structure("Deck processed in chunks", with_note=False)
|
|
|
|
|
def generate_improvement_pointers(review: dict) -> dict:
    """Generate specific improvement pointers for decks below 80% or lacking clarity.

    A deck needs improvement when its overall ``score`` is below 80 or its
    ``scores.storyline_clarity`` is below 70. Missing or non-numeric values
    count as 0.

    Args:
        review: The review dict produced for the deck.

    Returns:
        ``{"needs_improvement": False, "improvement_pointers": []}`` when no
        improvement is needed. Otherwise the model's parsed JSON answer with
        ``needs_improvement`` set to True, or a generic single-pointer
        fallback when generation or parsing fails. Never raises for model
        failures.
    """
    def _as_number(value) -> float:
        # Model output may carry numbers as strings ("75") or omit them.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    def _generic_pointers(recommendation: str) -> dict:
        # Fallback answer used when the model output is unusable.
        return {
            "needs_improvement": True,
            "improvement_pointers": [{"priority": 1, "category": "General", "recommendation": recommendation}],
        }

    sub_scores = review.get("scores")
    if not isinstance(sub_scores, dict):
        sub_scores = {}
    score = _as_number(review.get("score", 0))
    storyline_clarity = _as_number(sub_scores.get("storyline_clarity", 0))

    needs_improvement = score < 80 or storyline_clarity < 70

    if not needs_improvement:
        return {
            "needs_improvement": False,
            "improvement_pointers": [],
        }

    system_message = """You are a pitch deck consultant with expertise from reviewing hundreds of founder decks. Provide actionable, specific improvement pointers grounded in real-world failure patterns.

Focus on fixing the TOP 10 COMMON FAILURES:
1. Add commercial backbone: GTM plan, financials, market sizing, clear ask
2. Fix market credibility: replace giant TAM claims with sharp, addressable market
3. Add commercial mechanics: show how purpose becomes profit
4. Create GTM strategy: Channels → Cost → Conversion → Timeline (one slide is enough)
5. Replace traction theatre: show real metrics (% growth WOW, paid users, conversion rates, pilot outcomes)
6. Reposition team: move team slide early (after problem/product), anchor with "lived the problem"
7. Define moat: one clear line explaining defensibility
8. Make ask specific: "Raising £X to achieve Y milestone, Z revenue" (not vague "seeking partners")
9. Optimize slide count: 12-14 slides, 1 key message per slide
10. Add financial logic: 3-year revenue/burn/milestone outline

Generate 5-10 prioritized improvement pointers addressing the specific failures identified. Focus on:
- Highest impact changes that will move the needle on score and commercial clarity
- Specific, actionable recommendations (not vague advice)
- What to fix first, second, third
- Slide-by-slide improvements where critical issues were identified
- How to address deal breakers and common failure patterns
- Quick wins vs. strategic changes"""

    user_message = f"""VC Review:
{json.dumps(review, indent=2)}

Return ONLY valid JSON:
{{
"improvement_pointers": [
{{
"priority": 1,
"category": "category name (e.g., GTM, Market Credibility, Traction, etc.)",
"failure_pattern": "which of the top 10 failures this addresses",
"issue": "specific problem from deck",
"recommendation": "actionable fix with example",
"expected_impact": "how this moves the needle"
}}
],
"quick_wins": ["quick fix 1", "quick fix 2"],
"strategic_changes": ["strategic change 1", "strategic change 2"],
"critical_fixes": ["must-fix issue 1", "must-fix issue 2"]
}}"""

    try:
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message},
        ]
        prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=3600).to(model.device)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=600,
                temperature=0.25,
                top_p=0.9,
                do_sample=True,
                repetition_penalty=1.05,
                pad_token_id=tokenizer.eos_token_id,
                use_cache=True,
            )

        # Decode only the completion: the prompt embeds the review JSON and a
        # JSON template, which must not be mistaken for the model's answer.
        prompt_length = inputs["input_ids"].shape[-1]
        raw_output = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
        if "<|assistant|>" in raw_output:
            raw_output = raw_output.split("<|assistant|>")[-1]

        start = raw_output.find('{')
        end = raw_output.rfind('}') + 1

        if start == -1 or end <= start:
            return _generic_pointers("Focus on improving storyline clarity and addressing identified deal breakers")

        json_str = raw_output[start:end]

        try:
            improvement_json = json.loads(json_str)
        except json.JSONDecodeError:
            try:
                improvement_json = json.loads(_repair_json(json_str))
            except Exception:
                logger.warning("JSON repair failed, returning default improvement structure")
                return _generic_pointers("Review and address all deal breakers and low-scoring areas identified in the review")

        improvement_json["needs_improvement"] = True
        return improvement_json

    except Exception as e:
        logger.warning(f"Improvement pointers generation failed: {e}")
        return _generic_pointers("Review and address all deal breakers and low-scoring areas identified in the review")
|
|
|
|
|
@app.get("/")
async def root():
    """Report service status, the configured model id and the accepted upload formats."""
    supported_formats = ["PDF", "DOCX", "PPT", "PPTX"]
    payload = {
        "status": "healthy",
        "message": "Deckgpt API",
        "model": MODEL_ID,
        "supported_formats": supported_formats,
    }
    return payload
|
|
|
|
|
@app.get("/health")
async def health():
    """Report service status and whether both the model and tokenizer are loaded."""
    is_loaded = not (model is None or tokenizer is None)
    return {"status": "healthy", "model_loaded": is_loaded}
|
|
|
|
|
@app.post("/review")
async def review_deck(file: UploadFile = File(...)):
    """
    Upload a pitch deck file and get an investor-style review.

    Supported formats: PDF, DOCX, PPT, PPTX

    The upload is written to a temporary file, its text is extracted, the
    review is generated and improvement pointers are attached under
    ``improvement_analysis``. The temporary file is always removed.

    Raises:
        HTTPException: 503 when the model is not loaded; 400 for a missing
            filename, unsupported extension, empty upload or unreadable
            content; 500 for any other failure.
    """
    try:
        if model is None or tokenizer is None:
            raise HTTPException(status_code=503, detail="Model not loaded yet. Please wait for startup to complete.")

        if not file.filename:
            raise HTTPException(status_code=400, detail="Filename is missing")

        file_extension = Path(file.filename).suffix.lower()
        supported_extensions = [".pdf", ".docx", ".doc", ".ppt", ".pptx"]

        if file_extension not in supported_extensions:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type: {file_extension}. Supported: {', '.join(supported_extensions)}",
            )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in request validation: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Request validation error: {str(e)}")

    temp_path = None
    try:
        # Read first so an empty upload never creates a temp file.
        content = await file.read()
        if not content:
            raise HTTPException(status_code=400, detail="Uploaded file is empty")

        # Write and close the temp file before handing its path to the
        # parsers: reopening a still-open NamedTemporaryFile by name is not
        # portable across platforms.
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file:
            temp_path = temp_file.name
            temp_file.write(content)

        try:
            logger.info(f"Extracting text from {file.filename} ({file_extension})")
            deck_text = extract_text_from_file(temp_path, file_extension)
            logger.info(f"Extracted {len(deck_text)} characters from file")
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
        except Exception as e:
            logger.error(f"File extraction error: {e}")
            raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")

        try:
            # NOTE(review): extraction and model inference run synchronously
            # inside this coroutine, so other requests wait until they finish.
            logger.info("Generating VC-level review...")
            review_result = review_pitchdeck(deck_text)
            logger.info("Review generated successfully")

            logger.info("Checking if improvement pointers are needed...")
            try:
                improvement_pointers = generate_improvement_pointers(review_result)
                review_result["improvement_analysis"] = improvement_pointers
            except Exception as imp_error:
                # The review itself is still returned without the pointers.
                logger.warning(f"Improvement pointers generation failed: {imp_error}, continuing without it")
                review_result["improvement_analysis"] = {
                    "needs_improvement": True,
                    "improvement_pointers": [],
                    "error": "Failed to generate improvement pointers",
                }

            return JSONResponse(content=review_result)
        except ValueError as e:
            logger.error(f"ValueError in review generation: {e}", exc_info=True)
            raise HTTPException(status_code=500, detail=str(e))
        except Exception as e:
            logger.error(f"Review generation error: {e}", exc_info=True)
            raise HTTPException(status_code=500, detail=f"Error generating review: {str(e)}")

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error in review endpoint: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")

    finally:
        if temp_path and os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except Exception as e:
                logger.warning(f"Failed to delete temp file {temp_path}: {e}")
|
|
|
|
|
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run directly.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)
|
|
|