"""PitchFight AI — shared API handler functions for REST and Gradio routes.""" from __future__ import annotations import logging import os import re from typing import Any from dotenv import load_dotenv from core.attack_tags import get_attack_tags, get_next_attack_tag, get_answer_checklist from core.persona_builder import build_persona_prompt from core.samples import get_sample_startup from core.scoring_engine import ( mock_scorecard, generate_real_scorecard, generate_claim_based_scorecard, build_session_aware_fallback_scorecard, ) from core.claim_extractor import extract_concrete_signals from core.judge_settings import normalize_difficulty, get_label, get_pressure_display_label from core import battle_flow from core import model_router from core import session_manager from core.output_sanitizer import sanitize_model_output from core import voice_handler from core import retry_handler from core import session_repository from core.deal_verdict import build_judge_verdict from core.deal_phase import start_deal_phase from core.deal_flow import next_deal_round from core.deal_scoring_engine import generate_deal_scorecard from core.json_utils import parse_model_json load_dotenv() logger = logging.getLogger(__name__) MAX_ROUNDS = int(os.getenv("MAX_ROUNDS", "6")) OPENING_MESSAGES: dict[str, tuple[str, str]] = { "skeptical_vc": ( "Market Size", "How big is this really? Student event discovery sounds like a nice feature, not a venture-scale business.", ), "technical_judge": ( "AI Justification", "Why does this need AI? A sorted event list with filters seems enough. What is the intelligence here?", ), "hackathon_judge": ( "User Pain", "Students already lurk in WhatsApp groups. What pain are you solving that a shared Google Sheet cannot?", ), } MOCK_FOLLOWUPS: dict[str, list[str]] = { "skeptical_vc": [ "You named competitors but did not explain why students switch. What is your wedge for the first 100 users?", "Where is the retention? Why would a student open this weekly instead of once before a hackathon?", "Walk me through revenue. Who pays and why would they pay you instead of Luma or LinkedIn?", "What stops a bigger platform from adding your ranking layer in a weekend?", "Your traction sounds like a prototype. What metric proves demand, not just build activity?", "If I gave you $50k today, what single milestone would prove this is investable?", ], "technical_judge": [ "What data do you rank on, and how do you keep event metadata fresh without manual cleanup?", "If ranking is the core value, why is a small model better than deterministic scoring rules?", "What happens when two students with different goals get the same top recommendation?", "How does this scale beyond one campus without quality collapsing?", "What is your failure mode when event sources break or duplicate listings?", "Show me the simplest non-AI version. Why is that not good enough?", ], "hackathon_judge": [ "In one sentence: what is novel here versus another event aggregator?", "If I only saw a 30-second demo, what would convince me the AI matching is real?", "What did you ship this weekend that proves user pain, not just scraped listings?", "Why is AI load-bearing in the MVP instead of optional polish?", "How does this fit the Backyard AI theme beyond using a model as a label?", "What will I remember about your project after judging 40 teams?", ], } _HISTORY_WINDOW = 12 # max turns sent to Nemotron for live inference def pressure_level(round_number: int) -> str: if round_number <= 2: return "Medium" if round_number <= 4: return "High" return "Extreme" def get_battle_phase(round_number: int) -> str: """Return a battle phase label based on round count.""" if round_number <= 3: return "explore" if round_number <= 6: return "pressure" return "close" import re as _re _HAS_NUMBER = _re.compile(r"\d") _HAS_USER_WORD = _re.compile( r"\b(users?|customers?|students?|people|patients?|teachers?|clients?|founders?|hospitals?)\b", _re.IGNORECASE, ) def _micro_coach_tip(message: str, quality: str, attack_tag: str, difficulty_profile: str) -> str: """One short, encouraging nudge after a round — teaches the next answer, not the score. Local only (no API). Trains the founder toward 'minimum viable answer': one number, one user, one real result. Kept gentle for Practice; empty when nothing useful to add. """ text = (message or "").strip() if not text: return "" if quality == "non_answer": return "Take a real guess next time — even one specific detail beats a blank." has_number = bool(_HAS_NUMBER.search(text)) has_user = bool(_HAS_USER_WORD.search(text)) if not has_number: return "Good start — next time add one number (a count, a result, or a price)." if not has_user: return "Nice, you gave a number — next time say who it's for or who it's from." return "Solid — to go further, tie that proof directly to the question asked." def _recent_history(session_id: str, max_turns: int = _HISTORY_WINDOW) -> list[dict]: """Return at most max_turns recent history entries for live inference.""" full = session_manager.get_history(session_id) return full[-max_turns:] if len(full) > max_turns else full def handle_load_sample() -> dict[str, Any]: """Return the EventRadar AI demo startup.""" return {"startup": get_sample_startup()} # --------------------------------------------------------------------------- # Prompt builders # --------------------------------------------------------------------------- def _build_opening_messages( startup: dict, persona: str, difficulty: str, attack_tag: str, ) -> list[dict[str, str]]: """Build the OpenAI-format messages list for the opening judge question.""" system_prompt = build_persona_prompt(persona, startup, difficulty) tags = get_attack_tags(persona) tags_preview = ", ".join(tags[:4]) user_content = ( f"Current attack focus: {attack_tag}\n" f"Other pressure angles available: {tags_preview}\n\n" "Open the battle. Ask your first hard question about the startup above. " "Do not introduce yourself. Do not say hello. Go straight to the question. " "Attack the weakest claim in the pitch. Keep it under 3 sentences. " "Ask exactly one question." ) return [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}, ] def _build_followup_messages( startup: dict, persona: str, difficulty: str, attack_tag: str, history: list[dict[str, Any]], judge_action: dict[str, Any], answer_quality: dict[str, Any], ) -> list[dict[str, str]]: """Build the OpenAI-format messages list for a follow-up judge question. The prompt instruction varies based on judge_action to guide Nemotron toward the correct Socratic behavior: - follow_up_same_tag → press harder on the same topic - move_next_tag → acknowledge prior point, move cleanly - move_after_limit → briefly flag unresolved point, move on Voice mode note: History entries may originate from typed text or voice transcripts. The prompt wording uses "your answer" rather than "you typed" throughout. """ system_prompt = build_persona_prompt(persona, startup, difficulty) messages: list[dict[str, str]] = [ {"role": "system", "content": system_prompt}, ] # Replay conversation history as role turns (strip attack_tag metadata). # Cap at last 6 entries (3 full Q&A exchanges) so late-round input token # growth never crowds out the output budget on the opponent mode call. trimmed_history = history[-6:] if len(history) > 6 else history for entry in trimmed_history: role = entry.get("role", "user") content = entry.get("content", "") if role == "assistant": messages.append({"role": "assistant", "content": content}) else: messages.append({"role": "user", "content": content}) action = judge_action.get("judge_action", "follow_up_same_tag") prev_tag = judge_action.get("previous_attack_tag", attack_tag) quality = answer_quality.get("quality", "partial") transition = judge_action.get("transition_note", "") if action == "follow_up_same_tag": instruction = ( f"Attack focus: {attack_tag}\n" f"Your answer was classified as {quality}. {transition}\n\n" "The founder's last answer was insufficient. " "Ask one sharper, more specific follow-up on the SAME topic. " "Reference what they just said directly. " "Do not move to a new topic yet. " "Do not give advice. Do not say 'great answer' or 'interesting.' " "Keep it under 3 sentences. Ask exactly one question." ) elif action == "move_next_tag": instruction = ( f"New attack focus: {attack_tag}\n" f"Previous topic ({prev_tag}) is considered resolved. Do NOT revisit it.\n\n" "The founder gave a sufficient answer on the previous point. " "Move immediately to the new attack focus above. " "Do not keep drilling the previous topic. " "Do not say 'great answer', 'good point', 'well done', or any praise. " "Do not ask multiple questions. " "Do not give advice. " "Ask exactly one hard, specific question on the new attack focus. " "Keep the entire response under 3 sentences." ) else: # move_after_limit instruction = ( f"New attack focus: {attack_tag}\n" f"Previous topic ({prev_tag}) remains unresolved. {transition}\n\n" "Briefly note that the previous issue was not fully addressed — " "one short clause only, then move on. " "Ask one hard question on the new attack focus. " "Do not keep drilling the unresolved point. " "Do not give advice. Keep it under 4 sentences. Ask exactly one question." ) messages.append({"role": "user", "content": instruction}) return messages # --------------------------------------------------------------------------- # Handlers # --------------------------------------------------------------------------- def handle_start_session(payload: dict[str, Any]) -> dict[str, Any]: """Create a new pitch battle session and return the opening challenge.""" startup = payload.get("startup") or {} persona = payload.get("persona", "technical_judge") # Accept difficulty_profile (new) or difficulty (legacy) — normalize both raw_difficulty = ( payload.get("difficulty_profile") or payload.get("difficulty") or "practice" ) difficulty_profile = normalize_difficulty(raw_difficulty) difficulty_label = get_label(difficulty_profile) input_mode = payload.get("input_mode", "text") mode = payload.get("mode", "pitch_battle") model_mode = payload.get("model_mode", "premium_nvidia") session = session_manager.create_session( startup, persona, difficulty_profile, input_mode ) session["mode"] = mode session["model_mode"] = model_mode # Store normalized profile so scorecard and chat rounds can use it session["difficulty_profile"] = difficulty_profile session["difficulty_label"] = difficulty_label voice_pitch = payload.get("voice_pitch") if input_mode == "voice" and isinstance(voice_pitch, dict): session_manager.set_voice_pitch(session["session_id"], voice_pitch) mock_attack_tag, mock_ai_message = OPENING_MESSAGES.get( persona, OPENING_MESSAGES["technical_judge"] ) attack_tag = mock_attack_tag ai_message = mock_ai_message model_ok = False provider = "mock" used_model_mode = "mock_fallback" model_error: str | None = None try: messages = _build_opening_messages(startup, persona, difficulty_profile, mock_attack_tag) result = model_router.generate_opponent_response( messages, model_mode=model_mode, persona=persona, attack_tag=mock_attack_tag, ) if result.get("ok") and result.get("content"): ai_message = sanitize_model_output(result["content"]) model_ok = True provider = result.get("provider", "nvidia") used_model_mode = result.get("model_mode", model_mode) else: model_error = result.get("error") or "Model returned empty response" logger.warning("start_session: model not ok — using mock. error=%s", model_error) except Exception as exc: model_error = str(exc) logger.warning("start_session: model call raised — using mock. error=%s", exc) # Initialize battle_state with opening tag battle_flow.init_opening_state(session, attack_tag) session_manager.append_ai_message(session["session_id"], ai_message, attack_tag) # Phase 9.5: persist fully-populated session (opening message already in history). session_repository.save_session(session) _phase_start = get_battle_phase(1) return { "session_id": session["session_id"], "round": 1, "pressure_level": pressure_level(1), "battle_phase": _phase_start, "pressure_label": get_pressure_display_label(difficulty_profile, _phase_start), "attack_tag": attack_tag, "answer_hint": get_answer_checklist(attack_tag), "ai_message": ai_message, "model_mode": used_model_mode, "provider": provider, "model_ok": model_ok, "judge_action": "opening_question", "answer_quality": None, "topic_satisfied": None, "tag_attempt": 1, "soft_round_limit_reached": False, "battle_complete": False, "can_continue": True, "next_action": "continue", "difficulty_profile": difficulty_profile, "difficulty_label": difficulty_label, **({"model_error": model_error} if model_error else {}), } def handle_chat_round(payload: dict[str, Any]) -> dict[str, Any]: """Process a user reply and return the next judge question. Voice mode note: user_message may be a typed string or a transcript from voice input. battle_flow.classify_answer_quality() handles both the same way. """ session_id = payload.get("session_id", "") message = ( payload.get("user_message") or payload.get("message") or "" ).strip() session = session_manager.get_session(session_id) if not session: return { "session_id": session_id, "error": "Session not found", "round": 0, "pressure_level": "High", "attack_tag": "Session Error", "ai_message": "Session expired. Please start a new battle.", "model_ok": False, "provider": "none", "model_mode": "none", } if message: session_manager.append_user_message(session_id, message) persona = session.get("persona", "technical_judge") # Use stored normalized profile; fall back to normalizing legacy difficulty field difficulty_profile = session.get("difficulty_profile") or normalize_difficulty( session.get("difficulty", "practice") ) difficulty_label = session.get("difficulty_label") or get_label(difficulty_profile) startup = session.get("startup", {}) model_mode = session.get("model_mode", "premium_nvidia") next_round = session_manager.increment_round(session_id) soft_limit = next_round >= MAX_ROUNDS # Determine current attack tag from last AI message current_attack_tag = battle_flow.get_current_attack_tag(session) if not current_attack_tag: current_attack_tag = get_next_attack_tag(persona, next_round) # Classify answer quality (rule-based, no extra API call) answer_quality_result = {"quality": "partial", "reason": "No message provided.", "signals": []} if message: try: answer_quality_result = battle_flow.classify_answer_quality(message) except Exception as exc: logger.warning("battle_flow.classify_answer_quality error: %s", exc) quality = answer_quality_result.get("quality", "partial") # Decide judge action judge_action_result: dict[str, Any] = {} try: judge_action_result = battle_flow.decide_next_judge_action( session, current_attack_tag, quality, persona ) except Exception as exc: logger.warning("battle_flow.decide_next_judge_action error: %s", exc) judge_action_result = { "judge_action": "follow_up_same_tag", "next_attack_tag": current_attack_tag, "previous_attack_tag": current_attack_tag, "attempt_number_for_tag": 1, "topic_satisfied": False, "transition_note": "Fallback due to decision error.", } # Update session battle state try: battle_flow.update_battle_state(session, current_attack_tag, answer_quality_result, judge_action_result) except Exception as exc: logger.warning("battle_flow.update_battle_state error: %s", exc) attack_tag = judge_action_result.get("next_attack_tag", current_attack_tag) # Mock fallback followups = MOCK_FOLLOWUPS.get(persona, MOCK_FOLLOWUPS["technical_judge"]) index = min(next_round - 2, len(followups) - 1) mock_ai_message = followups[max(0, index)] ai_message = mock_ai_message model_ok = False provider = "mock" used_model_mode = "mock_fallback" model_error: str | None = None try: # Cap history sent to model; full history preserved in session for scorecard recent_history = _recent_history(session_id) messages = _build_followup_messages( startup, persona, difficulty_profile, attack_tag, recent_history, judge_action_result, answer_quality_result, ) result = model_router.generate_opponent_response( messages, model_mode=model_mode, persona=persona, attack_tag=attack_tag, ) if result.get("ok") and result.get("content"): ai_message = sanitize_model_output(result["content"]) model_ok = True provider = result.get("provider", "nvidia") used_model_mode = result.get("model_mode", model_mode) else: model_error = result.get("error") or "Model returned empty response" logger.warning("chat_round: model not ok — using mock. error=%s", model_error) except Exception as exc: model_error = str(exc) logger.warning("chat_round: model call raised — using mock. error=%s", exc) session_manager.append_ai_message(session_id, ai_message, attack_tag) # Phase 9.5: persist the new user + judge history entries (last 2 appended above). session_repository.update_round(session_id, session.get("history", [])[-2:]) input_mode = payload.get("input_mode") or session.get("input_mode", "text") voice_turn_id = payload.get("voice_turn_id", "") if input_mode == "voice" and voice_turn_id and message: voice_handler.confirm_voice_turn(session_id, voice_turn_id, message) _phase_chat = get_battle_phase(next_round) return { "session_id": session_id, "round": next_round, "pressure_level": pressure_level(next_round), "battle_phase": _phase_chat, "pressure_label": get_pressure_display_label(difficulty_profile, _phase_chat), "attack_tag": attack_tag, "answer_hint": get_answer_checklist(attack_tag), "micro_coach": _micro_coach_tip(message, quality, current_attack_tag, difficulty_profile), "ai_message": ai_message, "model_mode": used_model_mode, "provider": provider, "model_ok": model_ok, "answer_quality": quality, "answer_quality_reason": answer_quality_result.get("reason", ""), "judge_action": judge_action_result.get("judge_action", "follow_up_same_tag"), "previous_attack_tag": judge_action_result.get("previous_attack_tag", current_attack_tag), "topic_satisfied": judge_action_result.get("topic_satisfied", False), "tag_attempt": judge_action_result.get("attempt_number_for_tag", 1), "battle_complete": False, "can_continue": True, "next_action": "continue", "soft_round_limit_reached": soft_limit, "rounds_soft_limit_reached": soft_limit, "recommended_action": "end_battle" if soft_limit else None, "completion_message": ( "You have enough material for a scorecard. You can end the battle now or continue practicing." if soft_limit else None ), "difficulty_profile": difficulty_profile, "difficulty_label": difficulty_label, **({"model_error": model_error} if model_error else {}), } def handle_end_battle(payload: dict[str, Any]) -> dict[str, Any]: """Generate and return a Nemotron scorecard for the completed battle. Falls back to mock_scorecard if the model call or JSON parsing fails. Never crashes for a valid session. Voice mode note: Session history contains plain text regardless of input source. No changes are needed here when voice mode is integrated. """ session_id = payload.get("session_id", "") session = session_manager.get_session(session_id) if not session: return {"error": "Session not found"} try: scorecard = generate_claim_based_scorecard(session) except Exception as exc: logger.warning("handle_end_battle: generate_claim_based_scorecard raised: %s", exc) try: signals = extract_concrete_signals(session) scorecard = build_session_aware_fallback_scorecard( session, signals, f"Scorecard generation error: {type(exc).__name__}" ) except Exception as exc2: logger.warning("handle_end_battle: session-aware fallback also raised: %s", exc2) scorecard = mock_scorecard(session) scorecard["model_error"] = f"Scorecard generation error: {type(exc).__name__}" voice_summary = voice_handler.build_voice_delivery_summary(session) if voice_summary: scorecard["voice_delivery"] = voice_summary session["latest_scorecard"] = scorecard # Phase 9.5: persist scorecard and battle summary after in-memory mutation. session_repository.save_scorecard(session_id, scorecard) session_repository.update_battle_summary(session_id, { "total_rounds": session.get("round", 0), "final_round": session.get("round", 0), "status": "completed", "battle_complete": True, }) try: judge_verdict = build_judge_verdict(session, scorecard) session["judge_verdict"] = judge_verdict scorecard["judge_verdict"] = judge_verdict # Phase 9.5: persist judge verdict after it is stored on session. session_repository.save_judge_verdict(session_id, judge_verdict) except Exception as exc: logger.warning("handle_end_battle: judge verdict failed: %s", exc) return scorecard def handle_start_deal_phase(payload: dict[str, Any]) -> dict[str, Any]: """Start integrated deal phase from pitch session.""" session_id = str(payload.get("session_id", "")).strip() if not session_id: return {"error": "session_id is required"} session = session_manager.get_session(session_id) if not session: return {"error": "Session not found"} try: result = start_deal_phase(session) # Phase 9.5: persist the opening judge deal message after deal_history is populated. if isinstance(result, dict) and "error" not in result: deal_history = session.get("deal_history", []) if deal_history: session_repository.update_deal_round(session_id, deal_history[-1]) return result except Exception as exc: logger.warning("handle_start_deal_phase raised: %s", exc) return {"error": "Could not start deal phase."} def handle_deal_round(payload: dict[str, Any]) -> dict[str, Any]: """Process one deal negotiation round.""" session_id = str(payload.get("session_id", "")).strip() message = str(payload.get("user_message", "")).strip() input_mode = str(payload.get("input_mode", "text") or "text") voice_turn_id = str(payload.get("voice_turn_id", "") or "") if not session_id: return {"error": "session_id is required"} session = session_manager.get_session(session_id) if not session: return {"error": "Session not found"} if input_mode == "voice" and voice_turn_id and message: voice_handler.confirm_voice_turn(session_id, voice_turn_id, message) try: result = next_deal_round(session, message, input_mode=input_mode, voice_turn_id=voice_turn_id) # Phase 9.5: persist the new founder + judge deal entries (last 2 appended above). if isinstance(result, dict) and "error" not in result: deal_history = session.get("deal_history", []) new_entries = deal_history[-2:] if len(deal_history) >= 2 else deal_history if new_entries: session_repository.update_deal_round(session_id, new_entries) return result except Exception as exc: logger.warning("handle_deal_round raised: %s", exc) return {"error": "Could not process deal round."} def handle_end_deal(payload: dict[str, Any]) -> dict[str, Any]: """End deal phase and return deal + combined scorecards.""" session_id = str(payload.get("session_id", "")).strip() if not session_id: return {"error": "session_id is required"} session = session_manager.get_session(session_id) if not session: return {"error": "Session not found"} try: result = generate_deal_scorecard(session) # Phase 9.5: persist deal scorecard + combined scorecard after generation. if isinstance(result, dict) and "error" not in result: session_repository.save_deal_scorecard( session_id, result.get("deal_scorecard") or {}, result.get("combined_scorecard") or {}, ) return result except Exception as exc: logger.warning("handle_end_deal raised: %s", exc) return {"error": "Could not generate deal scorecard."} def handle_retry_weakest_start(payload: dict[str, Any]) -> dict[str, Any]: """Start a retry drill from the latest scorecard answer_to_retry.""" session_id = str(payload.get("session_id", "")).strip() if not session_id: return {"error": "session_id is required"} session = session_manager.get_session(session_id) if not session: return {"error": "Session not found"} try: result = retry_handler.start_retry_drill(session) # Phase 9.5: persist the newly created drill after it is stored on session. retry_id = result.get("retry_id") if isinstance(result, dict) else None if retry_id and "error" not in result: drill = session.get("retry_drills", {}).get(retry_id) if drill: session_repository.save_retry_drill(session_id, drill) return result except Exception as exc: logger.warning("handle_retry_weakest_start raised: %s", exc) return {"error": "Could not start retry drill. Try ending a battle first."} def handle_retry_weakest_submit(payload: dict[str, Any]) -> dict[str, Any]: """Evaluate a retry answer against the original weak answer.""" session_id = str(payload.get("session_id", "")).strip() retry_id = str(payload.get("retry_id", "")).strip() retry_answer = str(payload.get("retry_answer", "")).strip() input_mode = str(payload.get("input_mode", "text") or "text") voice_turn_id = str(payload.get("voice_turn_id", "") or "") if not session_id: return {"error": "session_id is required"} if not retry_id: return {"error": "retry_id is required"} session = session_manager.get_session(session_id) if not session: return {"error": "Session not found"} if input_mode == "voice" and voice_turn_id and retry_answer: voice_handler.confirm_voice_turn(session_id, voice_turn_id, retry_answer) try: result = retry_handler.evaluate_retry_answer( session, retry_id, retry_answer, input_mode=input_mode, voice_turn_id=voice_turn_id, ) # Phase 9.5: persist updated drill, scorecard, and refreshed verdict after eval. if isinstance(result, dict) and "error" not in result: drill = session.get("retry_drills", {}).get(retry_id) if drill: session_repository.save_retry_drill(session_id, drill) latest_scorecard = session.get("latest_scorecard") if isinstance(latest_scorecard, dict): session_repository.save_scorecard(session_id, latest_scorecard) latest_verdict = session.get("judge_verdict") if isinstance(latest_verdict, dict): session_repository.save_judge_verdict(session_id, latest_verdict) return result except Exception as exc: logger.warning("handle_retry_weakest_submit raised: %s", exc) return {"error": "Could not evaluate retry answer. Please try again."} def handle_reset_session(payload: dict[str, Any]) -> dict[str, Any]: """Clear a battle session.""" session_id = payload.get("session_id", "") session_manager.reset_session(session_id) return {"status": "reset"} _STARTUP_CONTEXT_FIELDS = ( "name", "problem", "target_users", "solution", "why_ai", "traction", "competitors", "ask", ) _STRUCTURE_PITCH_PROMPT = """You are structuring a founder's spoken or written startup pitch for a pitch battle app. Extract ONLY what the founder actually said or wrote. Do not hallucinate traction, competitors, funding, or users. If a field was not mentioned, use an empty string and list it in missing_fields. Return ONLY valid JSON. First character must be {. Last character must be }. No markdown. No explanation. No reasoning. Required JSON: { "startup_context": { "name": "", "problem": "", "target_users": "", "solution": "", "why_ai": "", "traction": "", "competitors": "", "ask": "" }, "missing_fields": [], "confidence": "low", "brief_summary": "" } confidence must be one of: low, medium, high brief_summary: one sentence summary of the pitch in the founder's words.""" def _normalize_startup_context(raw: dict[str, Any] | None) -> dict[str, str]: ctx = raw if isinstance(raw, dict) else {} return {field: str(ctx.get(field, "")).strip() for field in _STARTUP_CONTEXT_FIELDS} def _missing_startup_fields(ctx: dict[str, str]) -> list[str]: return [field for field in _STARTUP_CONTEXT_FIELDS if not ctx.get(field)] def _confidence_from_fill(ctx: dict[str, str]) -> str: filled = sum(1 for field in _STARTUP_CONTEXT_FIELDS if ctx.get(field)) if filled >= 5: return "high" if filled >= 3: return "medium" return "low" # --------------------------------------------------------------------------- # Deterministic structure confidence (Part A of confidence-consistency fix) # --------------------------------------------------------------------------- _CONFIDENCE_FIELD_WEIGHTS: dict[str, int] = { "name": 10, "problem": 15, "target_users": 12, "solution": 15, "why_ai": 10, "traction": 15, "competitors": 8, "ask": 10, } # If any of these fields is absent the score cannot exceed its cap value. _CONFIDENCE_CAPS: tuple[tuple[str, int], ...] = ( ("problem", 60), ("solution", 60), ("target_users", 70), ("traction", 74), # cap below 75 so missing traction → at most medium ("competitors", 92), # missing competitors → visible gap from 100; still high ("why_ai", 90), # missing/nonsense why_ai → max 90; one-word answers caught by min-words ("ask", 85), ) _FILLER_VALUES = frozenset({ "not specified", "n/a", "none", "unknown", "tbd", "-", "", "idk", "i don't know", "i dont know", "not sure", "na", "no idea", "dunno", "nothing", "?", "??", "???", "yes", "no", "nope", "yep", "to be determined", "to be decided", "will update", "coming soon", }) # Minimum real-word count for description fields — rejects single-word noise like "idk", "yes", "dunno". # Name, competitors, ask intentionally use min=1 (a single-word name or ask is valid). _CONF_MIN_WORDS: dict[str, int] = { "problem": 2, "solution": 2, "why_ai": 2, "traction": 2, "target_users": 2, } _CONFIDENCE_USER_SEG_RE = re.compile( r"\b(college students?|university students?|indie developers?|small businesses?|" r"enterprise|founders?|educators?|teachers?|researchers?|professionals?|" r"teams?|parents?|teenagers?|consumers?|startup founders?)\b", re.IGNORECASE, ) _CONFIDENCE_CONCRETE_ASK_RE = re.compile( r"(\$[\d,]+[kKmM]?|\d+[kK]\s*(?:usd|dollars?)?|mentorship|campus pilot|" r"equity partner|co.?founder|sponsorship|strategic partner)", re.IGNORECASE, ) def _field_is_filled(field: str, val: str) -> bool: """Return True when val contains genuine, substantive content. Two checks: 1. Not a known filler phrase ("idk", "n/a", "not specified", …) 2. At least _CONF_MIN_WORDS[field] real words — blocks single-word noise on description fields while allowing one-word names / ask phrases. """ clean = str(val or "").strip() if clean.lower() in _FILLER_VALUES: return False min_w = _CONF_MIN_WORDS.get(field, 1) return len(clean.split()) >= min_w def calculate_structure_confidence( startup_context: dict, raw_pitch_text: str = "", ) -> dict[str, Any]: """Deterministic confidence score from field completeness + raw-text evidence. For the same startup_context and raw_pitch_text the result is always identical — no randomness, no model opinion. """ ctx = startup_context or {} text = str(raw_pitch_text or "").strip() reasons: list[str] = [] # --- Field completeness --- score = 0 filled: list[str] = [] missing: list[str] = [] for field, weight in _CONFIDENCE_FIELD_WEIGHTS.items(): if _field_is_filled(field, str(ctx.get(field, "") or "")): score += weight filled.append(field) else: missing.append(field) # --- Signal bonus from raw pitch text (pure regex — deterministic) --- bonus = 0 number_hits = re.findall(r"\b\d[\d,]*\b", text) if len(number_hits) >= 3: bonus += 10 elif number_hits: bonus += 5 if _CONFIDENCE_USER_SEG_RE.search(text): bonus += 5 if _CONFIDENCE_CONCRETE_ASK_RE.search(text): bonus += 5 bonus = min(bonus, 20) score = min(score + bonus, 100) # --- Apply caps for critical missing fields --- for field, cap in _CONFIDENCE_CAPS: if field in missing: score = min(score, cap) score = max(0, min(100, score)) # --- Label --- if score >= 75: label = "high" elif score >= 45: label = "medium" else: label = "low" # --- Human-readable reasons --- strong = [f for f in ("problem", "solution", "target_users", "traction") if f in filled] if strong: reasons.append(f"Strong signals: {', '.join(strong)}") if bonus >= 10: reasons.append("Concrete numbers detected") elif bonus >= 5: reasons.append("Some evidence detected") if missing: reasons.append(f"Not in pitch: {', '.join(missing)}") return { "confidence": label, "confidence_score": score, "confidence_reasons": reasons, "missing_fields": missing, } def _structure_pitch_local_fallback(pitch_text: str) -> dict[str, Any]: """Heuristic extraction when Nemotron is unavailable.""" text = pitch_text.strip() lower = text.lower() sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()] ctx = {field: "" for field in _STARTUP_CONTEXT_FIELDS} name_patterns = [ r"(?:called|named)\s+([A-Z][A-Za-z0-9]+(?:\s+[A-Z][A-Za-z0-9]+){0,3})", r"(?:building|build|creating|launching)\s+([A-Z][A-Za-z0-9]+(?:\s+[A-Z][A-Za-z0-9]+){0,2})", r"\b([A-Z][A-Za-z0-9]*(?:\s+[A-Z][A-Za-z0-9]*){0,2}\s+AI)\b", r"(?:we(?:'re| are))\s+([A-Z][A-Za-z0-9]+(?:\s+[A-Z][A-Za-z0-9]+){0,2})", ] for pattern in name_patterns: match = re.search(pattern, text) if match: ctx["name"] = match.group(1).strip() break problem_kw = ("problem", "pain", "miss", "struggle", "hard to", "difficult", "scattered", "frustrat") for sentence in sentences: sl = sentence.lower() if any(kw in sl for kw in problem_kw): ctx["problem"] = sentence break user_kw = ("students", "founders", "users", "customers", "developers", "teams", "college") for sentence in sentences: sl = sentence.lower() if any(kw in sl for kw in user_kw): ctx["target_users"] = sentence break if not ctx["target_users"]: for kw in user_kw: if kw in lower: ctx["target_users"] = f"Targeting {kw}." break solution_kw = ("we build", "we're building", "we are building", "platform", "app", "product", "tool") for sentence in sentences: sl = sentence.lower() if any(kw in sl for kw in solution_kw): ctx["solution"] = sentence break if "ai" in lower or "machine learning" in lower or "model" in lower: for sentence in sentences: sl = sentence.lower() if "ai" in sl or "model" in sl or "machine learning" in sl: ctx["why_ai"] = sentence break if not ctx["why_ai"]: ctx["why_ai"] = "Uses AI as described in the pitch." traction_patterns = [ r"\b\d[\d,]*\+?\s*(?:users|students|customers|signups|downloads|pilots?)\b", r"\b(?:tested with|pilot with|revenue|mrr|arr)\b[^.?!]*[.?!]?", r"\b\d+%\b[^.?!]*[.?!]?", ] for pattern in traction_patterns: match = re.search(pattern, text, re.IGNORECASE) if match: ctx["traction"] = match.group(0).strip().rstrip(".") break competitor_kw = ("competitor", "versus", " vs ", "compared to", "alternative", "luma", "linkedin") for sentence in sentences: sl = sentence.lower() if any(kw in sl for kw in competitor_kw): ctx["competitors"] = sentence break ask_kw = ("funding", "invest", "mentor", "pilot", "sponsor", "partnership", "raise", "support") for sentence in sentences: sl = sentence.lower() if any(kw in sl for kw in ask_kw): ctx["ask"] = sentence break summary = " ".join(sentences[:2])[:220] if sentences else text[:220] conf = calculate_structure_confidence(ctx, pitch_text) return { "ok": True, "startup_context": ctx, "missing_fields": conf["missing_fields"], "confidence": conf["confidence"], "confidence_score": conf["confidence_score"], "confidence_reasons": conf["confidence_reasons"], "brief_summary": summary, "source": "local_fallback", } def _parse_structure_pitch_response(raw: str) -> dict[str, Any] | None: """Parse Nemotron extraction output. Confidence is NOT taken from the model — it is calculated deterministically by the caller via calculate_structure_confidence.""" parsed, _ = parse_model_json(raw) if not isinstance(parsed, dict): return None ctx = _normalize_startup_context(parsed.get("startup_context")) summary = str(parsed.get("brief_summary", "")).strip() if not summary: summary = ctx.get("solution") or ctx.get("problem") or "" return { "startup_context": ctx, "brief_summary": summary, } def handle_structure_pitch(payload: dict[str, Any]) -> dict[str, Any]: """Structure free-form pitch text into startup_context fields.""" pitch_text = str(payload.get("pitch_text", "")).strip() if not pitch_text: return {"ok": False, "error": "pitch_text is required and must be non-empty."} if len(pitch_text) < 20: return {"ok": False, "error": "pitch_text is too short. Add a few more details about your startup."} model_mode = payload.get("model_mode", "premium_nvidia") messages = [ {"role": "system", "content": _STRUCTURE_PITCH_PROMPT}, {"role": "user", "content": f"Founder pitch:\n\n{pitch_text[:6000]}"}, ] try: result = model_router.generate_structure_pitch_response(messages, model_mode=model_mode) if result.get("ok") and result.get("content"): structured = _parse_structure_pitch_response(result["content"]) if structured is None: repair = model_router.generate_structure_pitch_repair_response( result["content"], model_mode=model_mode ) if repair.get("ok") and repair.get("content"): structured = _parse_structure_pitch_response(repair["content"]) if structured is not None: conf = calculate_structure_confidence( structured["startup_context"], pitch_text ) return { "ok": True, "startup_context": structured["startup_context"], "missing_fields": conf["missing_fields"], "confidence": conf["confidence"], "confidence_score": conf["confidence_score"], "confidence_reasons": conf["confidence_reasons"], "brief_summary": structured["brief_summary"], "source": "nemotron", } logger.warning("structure_pitch: Nemotron returned unparseable JSON — using local fallback") except Exception as exc: logger.warning("structure_pitch: Nemotron call failed — using local fallback: %s", exc) return _structure_pitch_local_fallback(pitch_text) def handle_voice_pitch(payload: dict[str, Any]) -> dict[str, Any]: """Process opening spoken pitch audio via Nemotron Omni.""" audio = payload.get("audio") or payload.get("audio_base64") or "" audio_format = payload.get("audio_format", "webm") return voice_handler.process_voice_pitch(str(audio), str(audio_format)) def handle_voice_turn(payload: dict[str, Any]) -> dict[str, Any]: """Process one spoken battle answer — returns transcript for confirmation.""" session_id = payload.get("session_id", "") audio = payload.get("audio") or payload.get("audio_base64") or "" audio_format = payload.get("audio_format", "webm") return voice_handler.process_voice_turn(session_id, str(audio), str(audio_format)) def handle_deal_session_placeholder(_payload: dict[str, Any] | None = None) -> dict[str, str]: """Reserved endpoint for Deal Battle mode.""" return { "status": "not_implemented", "message": ( "Deal Battle endpoint is reserved and will be connected " "in a later phase." ), } def handle_deck_critique_placeholder(_payload: dict[str, Any] | None = None) -> dict[str, str]: """Reserved endpoint for pitch deck critique.""" return { "status": "not_implemented", "message": ( "Deck critique endpoint is reserved and will be connected " "after MiniCPM-V vision integration." ), }