PITCHFIGHT_AI / core /voice_handler.py
Aspectgg's picture
Prepare PitchFight AI completion
8fab536
Raw
History Blame Contribute Delete
25 kB
"""Voice input layer for PitchFight AI — Phase 7.
Converts spoken audio to confirmed text + delivery cues via Nemotron Omni API.
Does not replace the battle engine — only produces transcripts for existing flows.
"""
from __future__ import annotations
import base64
import binascii
import logging
import re
import shutil
import subprocess
import tempfile
import uuid
from pathlib import Path
from typing import Any
from core import nvidia_client
from core import session_manager
from core.json_utils import parse_model_json, safe_json_parse
from core.nvidia_client import OmniAudioError
logger = logging.getLogger(__name__)
# Word-boundary-anchored regex patterns for the filler words/phrases that
# count_filler_words() looks for. Patterns are written in lower case because
# the transcript is lower-cased before it is searched.
_FILLER_PATTERNS = [
r"\bum\b", r"\buh\b", r"\buhm\b", r"\ber\b", r"\bah\b",
r"\blike\b", r"\byou know\b", r"\bkind of\b", r"\bsort of\b",
r"\bbasically\b", r"\bliterally\b", r"\bactually\b", r"\bso+\b",
r"\bi mean\b", r"\bwell\b",
]
# Prompt sent together with the opening-pitch audio (see process_voice_pitch).
# It asks the model for a single JSON object holding the transcript, the
# extracted startup fields, and delivery observations. The text is runtime
# data: any edit changes what the model is asked to do.
_VOICE_PITCH_PROMPT = """The founder just recorded an opening startup pitch.
Listen to the audio carefully and extract only what was actually said.
Return ONLY valid JSON.
First character must be {.
Last character must be }.
No markdown.
No explanation.
No reasoning.
Do not hallucinate.
Do not invent traction, users, revenue, competitors, or market data.
If a field was not mentioned, return an empty string.
Do NOT claim emotion, stress, anxiety, or psychological state detection.
Only report observable delivery cues such as filler words, pauses, pacing, repetition, self-corrections, and clarity.
Required JSON:
{
"transcript": "exact words spoken",
"extracted": {
"name": "startup name or empty string",
"problem": "problem described or empty string",
"target_users": "who they are building for or empty string",
"solution": "what the product does or empty string",
"why_ai": "why AI is needed or empty string",
"traction": "any validation/users/pilots mentioned or empty string",
"competitors": "any competitors named or empty string",
"ask": "what they are asking for or empty string"
},
"delivery_observations": {
"filler_words": ["list of filler words heard"],
"pace": "rushed / measured / slow / unclear",
"clarity": "one sentence observation based only on delivery",
"confidence_signal": "confident / mixed / hesitant / unclear based only on observable delivery cues",
"delivery_note": "one concise sentence"
},
"extraction_confidence": "high / medium / low"
}"""
# Prompt sent together with one spoken battle answer (see process_voice_turn).
# It asks the model for a verbatim transcript plus observable delivery cues,
# returned as a single JSON object. The text is runtime data: any edit changes
# what the model is asked to do.
_VOICE_TURN_PROMPT = """Transcribe this spoken battle answer exactly as spoken.
Return ONLY valid JSON.
First character must be {.
Last character must be }.
No markdown.
No explanation.
No reasoning.
Do not interpret or expand the answer.
Do not add words not spoken.
Do NOT claim emotion, stress, anxiety, or psychological state detection.
Only report observable delivery cues such as filler words, pauses, pacing, repetition, self-corrections, and clarity.
Required JSON:
{
"transcript": "exact words spoken",
"delivery_note": "one concise sentence about observable delivery cues. If clean, say Clean delivery.",
"word_count": 0,
"delivery_cues": {
"filler_words": [],
"pace": "rushed / measured / slow / unclear",
"clarity": "clear / mostly clear / unclear",
"repetition": "low / medium / high",
"self_corrections": 0,
"confidence_signal": "confident delivery / mixed delivery / hesitant delivery / unclear"
}
}"""
# Keys that sanitize_voice_json() keeps under "extracted", in this order.
# They mirror the "extracted" object requested by _VOICE_PITCH_PROMPT.
_EXTRACTED_FIELDS = (
"name", "problem", "target_users", "solution",
"why_ai", "traction", "competitors", "ask",
)
# Leading-byte signatures compared against the start of the decoded audio by
# _detect_audio_magic() to identify the container actually sent.
_WAV_RIFF = b"RIFF"
_WEBM_MAGIC = b"\x1aE\xdf\xa3"
_OGG_MAGIC = b"OggS"
_MP3_ID3 = b"ID3"
# Minimum decoded audio size to treat as a real recording. Anything smaller is an
# empty/instant tap that NVIDIA Omni rejects with HTTP 400. ~1s of webm/opus is several KB.
_MIN_AUDIO_BYTES = 1024
# Two-byte prefix used to recognise MP3 data that has no ID3 tag. Only this
# exact byte pair is matched.
_MP3_SYNC = b"\xff\xfb"
def _detect_audio_magic(data: bytes) -> str:
if len(data) >= 4 and data[:4] == _WAV_RIFF:
return "wav"
if len(data) >= 4 and data[:4] == _WEBM_MAGIC:
return "webm"
if len(data) >= 4 and data[:4] == _OGG_MAGIC:
return "ogg"
if len(data) >= 3 and data[:3] == _MP3_ID3:
return "mp3"
if len(data) >= 2 and data[:2] == _MP3_SYNC:
return "mp3"
return "unknown"
def _magic_hex(data: bytes, n: int = 8) -> str:
return data[:n].hex() if data else ""
def _convert_audio_to_wav_ffmpeg(input_bytes: bytes, input_ext: str) -> bytes | None:
    """Transcode audio bytes to 16 kHz mono 16-bit PCM WAV with the ffmpeg CLI.

    Args:
        input_bytes: Raw audio in its source container.
        input_ext: File extension (without dot) given to the temporary input
            file; "webm" is used when it is empty.

    Returns:
        The WAV file contents, or None when ffmpeg is not on PATH, cannot be
        started, exceeds the 45 second timeout, exits non-zero, or leaves no
        output file. Failures are logged as warnings, never raised.
    """
    ffmpeg_path = shutil.which("ffmpeg")
    if not ffmpeg_path:
        return None

    # Both files live in a throwaway directory that is removed on exit.
    with tempfile.TemporaryDirectory() as workdir:
        source_file = Path(workdir) / f"input.{input_ext or 'webm'}"
        target_file = Path(workdir) / "output.wav"
        source_file.write_bytes(input_bytes)

        argv = [ffmpeg_path, "-y", "-hide_banner", "-loglevel", "error"]
        argv += ["-i", str(source_file)]
        argv += ["-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le"]
        argv.append(str(target_file))

        try:
            completed = subprocess.run(argv, capture_output=True, timeout=45, check=False)
        except (OSError, subprocess.TimeoutExpired) as exc:
            logger.warning("voice_handler: ffmpeg conversion failed — %s", exc)
            return None

        if completed.returncode == 0 and target_file.is_file():
            return target_file.read_bytes()

        # Log only the first 200 characters of stderr to keep log lines short.
        stderr = (completed.stderr or b"").decode("utf-8", errors="replace")[:200]
        logger.warning("voice_handler: ffmpeg exit=%s stderr=%s", completed.returncode, stderr)
        return None
def normalize_audio_for_omni(
    audio_base64: str,
    audio_format: str,
    mode: str = "voice_extraction",
) -> dict[str, Any]:
    """Decode browser audio and normalize to WAV for NVIDIA Omni when needed.

    Args:
        audio_base64: Base64-encoded audio. A leading ``data:<mime>;base64,``
            header (the form produced by browser data URLs) is tolerated and
            removed before decoding.
        audio_format: Declared format; case, surrounding whitespace and a
            leading dot are ignored. Defaults to "webm" when empty. Must be
            one of webm, wav, mp3, m4a, ogg.
        mode: Label echoed back in the result and used in log lines.

    Returns:
        On failure, a dict with an "error" message (plus "audio_format",
        "mode" and, depending on the failure, "detail" or "byte_size").
        On success, a dict with:
          * "audio_base64" / "audio_format": the audio to send,
          * "source_format": the declared format,
          * "byte_size": size of the audio being sent,
          * "converted": True when the audio being sent is WAV although the
            declared format was not wav (transcoded by ffmpeg, or already
            WAV on arrival); False otherwise,
          * "mode".
        This function does not raise for bad input; every problem is
        reported through the "error" key.
    """
    fmt = str(audio_format or "webm").strip().lower().lstrip(".")
    if fmt not in {"webm", "wav", "mp3", "m4a", "ogg"}:
        return {
            "error": f"Unsupported audio_format: {fmt}",
            "audio_format": fmt,
            "mode": mode,
        }

    # A missing or non-text payload (e.g. JSON null) must produce an error
    # dict like every other bad input, not an AttributeError on .strip().
    if not isinstance(audio_base64, (str, bytes, bytearray)):
        return {
            "error": "Invalid base64 audio payload",
            "detail": f"expected base64 text, got {type(audio_base64).__name__}",
            "audio_format": fmt,
            "mode": mode,
        }
    payload = audio_base64.strip()
    # Browsers often hand over a data URL; only the part after the comma is
    # base64. Without this the ":" / ";" / "," characters fail validation.
    if isinstance(payload, str) and payload[:5].lower() == "data:":
        header, comma, body = payload.partition(",")
        if comma and header.lower().endswith(";base64"):
            payload = body.strip()

    try:
        raw = base64.b64decode(payload, validate=True)
    except (binascii.Error, ValueError) as exc:
        return {
            "error": "Invalid base64 audio payload",
            "detail": str(exc),
            "audio_format": fmt,
            "mode": mode,
        }
    if not raw:
        return {"error": "Decoded audio is empty", "audio_format": fmt, "mode": mode}

    # Guard against empty/instant taps (e.g. a few-byte payload). These are not real
    # recordings and NVIDIA Omni rejects them with HTTP 400 ("Invalid audio file").
    # Catch it here and return a clean, user-facing message instead of an API error.
    if len(raw) < _MIN_AUDIO_BYTES:
        logger.info(
            "voice_handler: rejecting too-small audio mode=%s bytes=%d (min=%d)",
            mode, len(raw), _MIN_AUDIO_BYTES,
        )
        return {
            "error": "That recording was too short. Tap the mic, speak, then tap again to stop.",
            "audio_format": fmt,
            "byte_size": len(raw),
            "mode": mode,
        }

    detected = _detect_audio_magic(raw)
    logger.info(
        "voice_handler: normalize audio mode=%s declared=%s detected=%s bytes=%d magic=%s",
        mode,
        fmt,
        detected,
        len(raw),
        _magic_hex(raw),
    )

    # Already WAV: send as-is. (A RIFF prefix always yields detected == "wav",
    # so the declared format needs no separate check here.)
    if detected == "wav":
        return {
            "audio_base64": base64.b64encode(raw).decode("ascii"),
            "audio_format": "wav",
            "source_format": fmt,
            "byte_size": len(raw),
            "converted": fmt != "wav",
            "mode": mode,
        }

    # Trust the sniffed container over the declared one when they disagree.
    source_ext = detected if detected != "unknown" else fmt
    wav_bytes = _convert_audio_to_wav_ffmpeg(raw, source_ext)
    if wav_bytes and wav_bytes[:4] == _WAV_RIFF:
        logger.info(
            "voice_handler: converted %s → wav (%d → %d bytes)",
            source_ext,
            len(raw),
            len(wav_bytes),
        )
        return {
            "audio_base64": base64.b64encode(wav_bytes).decode("ascii"),
            "audio_format": "wav",
            "source_format": fmt,
            "byte_size": len(wav_bytes),
            "converted": True,
            "mode": mode,
        }

    # ffmpeg unavailable or conversion failed — pass through original browser audio
    # (restores pre-stability behavior; Omni accepts webm on many setups).
    logger.info(
        "voice_handler: passthrough audio mode=%s format=%s bytes=%d (ffmpeg=%s)",
        mode,
        source_ext,
        len(raw),
        bool(shutil.which("ffmpeg")),
    )
    return {
        "audio_base64": base64.b64encode(raw).decode("ascii"),
        "audio_format": source_ext,
        "source_format": fmt,
        "byte_size": len(raw),
        "converted": False,
        "mode": mode,
    }
def _call_omni_with_normalized_audio(
    prompt: str,
    audio_base64: str,
    audio_format: str,
    mode: str,
) -> str | dict[str, Any]:
    """Normalize the audio, send it to Omni, and retry once as WAV if rejected.

    Args:
        prompt: Instruction text sent alongside the audio.
        audio_base64: Base64 audio as received from the client.
        audio_format: Declared audio format.
        mode: Label forwarded to normalization and to the Omni client.

    Returns:
        The value returned by ``nvidia_client.call_omni_audio_json`` on
        success (annotated as str here), otherwise an error dict. Error dicts
        come from normalization, from ``OmniAudioError.to_error_dict()`` with a
        user-facing "error" message plus "source_format" and "converted", or
        are ``{"error": str(exc)}`` for ValueError/RuntimeError.
    """
    normalized = normalize_audio_for_omni(audio_base64, audio_format, mode=mode)
    if normalized.get("error"):
        return normalized
    source_format = normalized.get("source_format", audio_format)

    def _invoke(payload: dict[str, Any]) -> str:
        return nvidia_client.call_omni_audio_json(
            prompt,
            payload["audio_base64"],
            payload["audio_format"],
            mode=mode,
            source_format=payload.get("source_format", audio_format),
            decoded_bytes=payload.get("byte_size"),
        )

    def _failure(exc: OmniAudioError, converted: Any) -> dict[str, Any]:
        # Keep the client's diagnostic fields, but replace the message with
        # one that is safe to show to the user.
        err = exc.to_error_dict()
        err["source_format"] = source_format
        err["converted"] = converted
        err["error"] = "Voice transcription failed. Try recording again or type your answer."
        return err

    try:
        return _invoke(normalized)
    except OmniAudioError as exc:
        # Bind to another name: the "as" target is cleared when the handler ends.
        first_error = exc
    except (ValueError, RuntimeError) as exc:
        return {"error": str(exc)}

    # The retry runs outside the handler above so that it gets its own
    # ValueError/RuntimeError handling; inside the handler those exceptions
    # would bypass the sibling except clauses and propagate to the caller.
    converted = normalized.get("converted", False)
    if not converted:
        # If passthrough failed and ffmpeg can convert, retry once as WAV.
        raw_bytes = base64.b64decode(normalized["audio_base64"])
        detected = _detect_audio_magic(raw_bytes)
        source_ext = detected if detected != "unknown" else source_format
        wav_bytes = _convert_audio_to_wav_ffmpeg(raw_bytes, source_ext)
        if wav_bytes and wav_bytes[:4] == _WAV_RIFF:
            logger.info("voice_handler: Omni rejected passthrough; retrying as wav")
            retry_payload = {
                "audio_base64": base64.b64encode(wav_bytes).decode("ascii"),
                "audio_format": "wav",
                "source_format": source_format,
                "byte_size": len(wav_bytes),
                "converted": True,
            }
            try:
                return _invoke(retry_payload)
            except OmniAudioError as retry_exc:
                return _failure(retry_exc, True)
            except (ValueError, RuntimeError) as exc:
                return {"error": str(exc)}

    return _failure(first_error, converted)
def count_filler_words(transcript: str, patterns: list[str] | None = None) -> list[str]:
    """Return the distinct filler words/phrases found in *transcript*.

    Despite the name, the result is a list of labels, not a number; each
    filler appears at most once, in pattern order.

    Args:
        transcript: Text to scan; matching is case-insensitive.
        patterns: Regex patterns to search for. Defaults to the module's
            ``_FILLER_PATTERNS``. Each pattern's label is the pattern with its
            ``\\b`` anchors and ``+`` quantifiers removed (``r"\\bso+\\b"``
            is reported as "so").

    Returns:
        Labels of the patterns that matched; empty for an empty transcript.
    """
    if not transcript:
        return []
    if patterns is None:
        patterns = _FILLER_PATTERNS

    text = transcript.lower()
    found: list[str] = []
    for pattern in patterns:
        if not re.search(pattern, text, re.IGNORECASE):
            continue
        # Remove the anchor token as a substring. str.strip(r"\b") would strip
        # the *characters* "\" and "b", turning "basically" into "asically".
        label = pattern.replace("\\b", "").replace("+", "")
        if label not in found:
            found.append(label)
    return found
def estimate_word_count(transcript: str) -> int:
    """Return the number of word-character runs in *transcript*.

    A "word" is a maximal run of ``\\w`` characters, so a contraction such as
    "don't" counts as two. Empty or missing input gives 0.
    """
    if transcript:
        return sum(1 for _ in re.finditer(r"\b\w+\b", transcript))
    return 0
def detect_self_corrections(transcript: str) -> int:
    """Return how many self-correction cues occur in *transcript*.

    Every occurrence of each cue phrase is counted (case-insensitive); the
    total across all cues is returned. Empty or missing input gives 0.
    """
    if not transcript:
        return 0
    lowered = transcript.lower()
    cue_patterns = (
        r"\bi mean\b",
        r"\bwait\b",
        r"\bsorry\b",
        r"\bno,\s",
        r"\bactually\b",
        r"\blet me rephrase\b",
        r"\bwhat i meant\b",
    )
    return sum(len(re.findall(cue, lowered)) for cue in cue_patterns)
def _detect_repeated_phrases(transcript: str) -> int:
"""Count repeated 3-word phrases (simple repetition signal)."""
words = re.findall(r"\b\w+\b", (transcript or "").lower())
if len(words) < 6:
return 0
trigrams: dict[str, int] = {}
for i in range(len(words) - 2):
tri = " ".join(words[i : i + 3])
trigrams[tri] = trigrams.get(tri, 0) + 1
return sum(1 for c in trigrams.values() if c > 1)
def sanitize_voice_json(data: dict[str, Any]) -> dict[str, Any]:
    """Normalize voice JSON fields with safe delivery-only wording.

    Works on a shallow copy of *data*:
      * "transcript" becomes a stripped string,
      * "extracted" (when a dict) is reduced to the ``_EXTRACTED_FIELDS``
        keys with stripped string values,
      * "delivery_observations" (when a dict) is rebuilt with a cleaned
        filler list (at most 20 entries) and defaulted pace/confidence,
      * "extraction_confidence" is forced to high/medium/low, defaulting to
        "medium".
    JSON ``null`` values are treated as empty rather than being stringified
    to the literal text "None". Keys not listed above are passed through.

    Returns:
        The normalized dict, or ``{}`` when *data* is not a dict.
    """
    if not isinstance(data, dict):
        return {}

    def _text(value: Any) -> str:
        # str(None) is "None", which would reach the UI as real content.
        return "" if value is None else str(value).strip()

    out = dict(data)
    out["transcript"] = _text(out.get("transcript"))

    extracted = out.get("extracted")
    if isinstance(extracted, dict):
        out["extracted"] = {key: _text(extracted.get(key)) for key in _EXTRACTED_FIELDS}

    delivery = out.get("delivery_observations")
    if isinstance(delivery, dict):
        fillers = delivery.get("filler_words", [])
        cleaned = [_text(item) for item in fillers] if isinstance(fillers, list) else []
        out["delivery_observations"] = {
            "filler_words": [item for item in cleaned if item][:20],
            "pace": _text(delivery.get("pace")) or "unclear",
            "clarity": _text(delivery.get("clarity")),
            "confidence_signal": _text(delivery.get("confidence_signal")) or "unclear",
            "delivery_note": _text(delivery.get("delivery_note")),
        }

    conf = _text(out.get("extraction_confidence")).lower()
    if conf not in ("high", "medium", "low"):
        conf = "medium"
    out["extraction_confidence"] = conf
    return out
def _sanitize_turn_json(data: dict[str, Any]) -> dict[str, Any]:
if not isinstance(data, dict):
return {}
transcript = str(data.get("transcript", "")).strip()
cues_raw = data.get("delivery_cues", {})
cues: dict[str, Any] = {}
if isinstance(cues_raw, dict):
fillers = cues_raw.get("filler_words", [])
cues = {
"filler_words": [str(f).strip() for f in fillers if str(f).strip()][:20]
if isinstance(fillers, list) else [],
"pace": str(cues_raw.get("pace", "")).strip() or "unclear",
"clarity": str(cues_raw.get("clarity", "")).strip() or "unclear",
"repetition": str(cues_raw.get("repetition", "")).strip() or "low",
"self_corrections": int(cues_raw.get("self_corrections", 0) or 0),
"confidence_signal": str(cues_raw.get("confidence_signal", "")).strip() or "unclear",
}
local_fillers = count_filler_words(transcript)
if not cues.get("filler_words") and local_fillers:
cues["filler_words"] = local_fillers
if cues.get("self_corrections", 0) == 0:
cues["self_corrections"] = detect_self_corrections(transcript)
rep_count = _detect_repeated_phrases(transcript)
if cues.get("repetition") == "low" and rep_count >= 2:
cues["repetition"] = "medium"
return {
"transcript": transcript,
"delivery_note": str(data.get("delivery_note", "")).strip() or "Clean delivery.",
"word_count": estimate_word_count(transcript),
"delivery_cues": cues,
}
def _parse_pitch_json(raw: str) -> dict[str, Any] | None:
    """Parse the model's opening-pitch response into a sanitized dict.

    Tries ``parse_model_json`` first (using the first item of the tuple it
    returns) and falls back to ``safe_json_parse`` when that does not give a
    non-empty dict.

    Returns:
        The result of ``sanitize_voice_json`` with the stripped transcript,
        or None when no non-empty dict could be parsed or the transcript is
        missing, ``null`` or blank.
    """
    parsed, _ = parse_model_json(raw)
    if not isinstance(parsed, dict) or not parsed:
        parsed = safe_json_parse(raw)
    if not isinstance(parsed, dict) or not parsed:
        return None

    transcript_value = parsed.get("transcript")
    # A JSON null must count as "no transcript"; str(None) would yield the
    # non-empty text "None" and slip past the emptiness check below.
    transcript = "" if transcript_value is None else str(transcript_value).strip()
    if not transcript:
        return None

    sanitized = sanitize_voice_json(parsed)
    sanitized["transcript"] = transcript
    return sanitized
def _parse_turn_json(raw: str) -> dict[str, Any] | None:
    """Parse the model's battle-turn response into a sanitized dict.

    Tries ``parse_model_json`` first (using the first item of the tuple it
    returns) and falls back to ``safe_json_parse`` when that does not give a
    non-empty dict.

    Returns:
        The result of ``_sanitize_turn_json``, or None when no non-empty dict
        could be parsed or the transcript is missing, ``null`` or blank.
    """
    parsed, _ = parse_model_json(raw)
    if not isinstance(parsed, dict) or not parsed:
        parsed = safe_json_parse(raw)
    if not isinstance(parsed, dict) or not parsed:
        return None

    transcript_value = parsed.get("transcript")
    # A JSON null must count as "no transcript"; str(None) would yield the
    # non-empty text "None" and slip past the emptiness check below.
    transcript = "" if transcript_value is None else str(transcript_value).strip()
    if not transcript:
        return None
    return _sanitize_turn_json(parsed)
def _repair_pitch_json(raw_bad: str) -> dict[str, Any] | None:
    """Ask the text model to reshape an unparseable pitch response into JSON.

    The first 4000 characters of *raw_bad* are appended to a prompt carrying
    the expected schema; the reply is parsed with ``_parse_pitch_json``.

    Returns:
        The parsed pitch dict, or None when the reply cannot be parsed or
        any exception occurs (the exception is logged as a warning).
    """
    instructions = (
        "Convert the input into valid JSON matching this schema exactly. "
        "Return ONLY JSON. First char { last char }.\n"
    )
    schema = (
        '{"transcript":"","extracted":{"name":"","problem":"","target_users":"",'
        '"solution":"","why_ai":"","traction":"","competitors":"","ask":""},'
        '"delivery_observations":{"filler_words":[],"pace":"","clarity":"",'
        '"confidence_signal":"","delivery_note":""},"extraction_confidence":"medium"}'
    )
    repair_prompt = instructions + schema + "\n\n" + raw_bad[:4000]
    messages = [{"role": "user", "content": repair_prompt}]
    try:
        # Parsing stays inside the try so a parse failure is also best-effort.
        reply = nvidia_client.generate_nemotron_response(
            messages,
            mode="voice_extraction_repair",
        )
        return _parse_pitch_json(reply)
    except Exception as exc:
        logger.warning("voice_handler: pitch repair failed — %s", exc)
        return None
def _repair_turn_json(raw_bad: str) -> dict[str, Any] | None:
    """Ask the text model to reshape an unparseable turn response into JSON.

    The first 3000 characters of *raw_bad* are appended to a prompt carrying
    the expected schema; the reply is parsed with ``_parse_turn_json``.

    Returns:
        The parsed turn dict, or None when the reply cannot be parsed or any
        exception occurs (the exception is logged as a warning).
    """
    instructions = (
        "Convert the input into valid JSON matching this schema exactly. "
        "Return ONLY JSON. First char { last char }.\n"
    )
    schema = (
        '{"transcript":"","delivery_note":"","word_count":0,'
        '"delivery_cues":{"filler_words":[],"pace":"","clarity":"",'
        '"repetition":"low","self_corrections":0,"confidence_signal":""}}'
    )
    repair_prompt = instructions + schema + "\n\n" + raw_bad[:3000]
    messages = [{"role": "user", "content": repair_prompt}]
    try:
        # Parsing stays inside the try so a parse failure is also best-effort.
        reply = nvidia_client.generate_nemotron_response(
            messages,
            mode="voice_turn_repair",
        )
        return _parse_turn_json(reply)
    except Exception as exc:
        logger.warning("voice_handler: turn repair failed — %s", exc)
        return None
def process_voice_pitch(audio_base64: str, audio_format: str) -> dict[str, Any]:
    """Turn an opening spoken pitch into transcript, startup fields and delivery cues.

    Returns:
        The sanitized pitch dict on success. Otherwise a dict with an "error"
        key: when the NVIDIA client is not configured, when audio
        normalization or the Omni call fails, or when the response cannot be
        parsed even after one repair attempt.
    """
    if not nvidia_client.is_configured():
        return {"error": "NVIDIA_API_KEY is not configured on the server."}

    response = _call_omni_with_normalized_audio(
        _VOICE_PITCH_PROMPT,
        audio_base64,
        audio_format,
        mode="voice_extraction",
    )
    # A dict at this point is always an error report, never model output.
    if isinstance(response, dict):
        return response

    pitch = _parse_pitch_json(response)
    if pitch is not None:
        return pitch

    logger.warning("voice_handler: pitch parse failed, attempting repair")
    pitch = _repair_pitch_json(response)
    if pitch is None:
        return {"error": "Could not parse voice pitch response from Nemotron Omni."}
    return pitch
def process_voice_turn(
    session_id: str,
    audio_base64: str,
    audio_format: str,
) -> dict[str, Any]:
    """Transcribe one spoken battle answer and store it as a pending turn.

    The turn is stored through ``session_manager.store_pending_voice_turn``
    with ``confirmed`` set to False and a fresh UUID4 as its id.

    Returns:
        On success, a dict with "session_id", "voice_turn_id", "transcript",
        "delivery_note", "word_count" and "delivery_cues". On failure, a dict
        with "error" and "session_id" (unknown session, client not
        configured, audio/Omni failure, or unparseable response after one
        repair attempt).
    """
    if not session_manager.get_session(session_id):
        return {"error": "Session not found", "session_id": session_id}
    if not nvidia_client.is_configured():
        return {"error": "NVIDIA_API_KEY is not configured on the server.", "session_id": session_id}

    response = _call_omni_with_normalized_audio(
        _VOICE_TURN_PROMPT,
        audio_base64,
        audio_format,
        mode="voice_turn",
    )
    # A dict at this point is always an error report; tag it with the session.
    if isinstance(response, dict):
        response["session_id"] = session_id
        return response

    parsed = _parse_turn_json(response)
    if parsed is None:
        logger.warning("voice_handler: turn parse failed, attempting repair")
        parsed = _repair_turn_json(response)
    if parsed is None:
        return {"error": "Could not parse voice turn response from Nemotron Omni.", "session_id": session_id}

    voice_turn_id = str(uuid.uuid4())
    transcript = parsed["transcript"]
    cues = parsed["delivery_cues"]
    # Prefer the filler list already on the turn; fall back to a local scan.
    filler_words = cues.get("filler_words") or count_filler_words(transcript)

    turn_record = {
        "voice_turn_id": voice_turn_id,
        "transcript": transcript,
        "delivery_note": parsed.get("delivery_note", ""),
        "word_count": parsed.get("word_count", estimate_word_count(transcript)),
        "delivery_cues": cues,
        "filler_word_count": len(filler_words),
        "confirmed": False,
    }
    session_manager.store_pending_voice_turn(session_id, turn_record)

    # The client gets everything except the bookkeeping fields.
    response_keys = ("voice_turn_id", "transcript", "delivery_note", "word_count", "delivery_cues")
    result: dict[str, Any] = {"session_id": session_id}
    for key in response_keys:
        result[key] = turn_record[key]
    return result
def confirm_voice_turn(
    session_id: str,
    voice_turn_id: str,
    final_transcript: str,
) -> bool:
    """Confirm a pending voice turn using the user's final transcript.

    Delegates entirely to ``session_manager.confirm_voice_turn`` and returns
    its result unchanged.
    """
    outcome = session_manager.confirm_voice_turn(
        session_id,
        voice_turn_id,
        final_transcript,
    )
    return outcome
def _is_generic_delivery_note(note: str) -> bool:
"""Skip filler delivery notes that clutter the scorecard UI."""
n = (note or "").strip().lower().rstrip(".")
return n in ("clean delivery", "clean delivery.", "")
def build_voice_delivery_summary(session: dict) -> dict[str, Any] | None:
    """Aggregate voice delivery data from a session into a scorecard summary.

    Reads ``session["voice_pitch"]["delivery_observations"]`` and the
    ``delivery_note`` / ``delivery_cues`` of every dict entry in
    ``session["confirmed_voice_turns"]``. Nothing is sent to any service.

    Returns:
        None when the session has neither confirmed turns nor a voice pitch.
        Otherwise a dict with the turn count, the total and de-duplicated
        filler words (list capped at 12), up to 4 distinct non-generic
        delivery notes, the most frequent pace, the most recent clarity and
        confidence signals, and one overall feedback sentence.
    """
    confirmed = session.get("confirmed_voice_turns") or []
    voice_pitch = session.get("voice_pitch")
    if not (confirmed or voice_pitch):
        return None

    all_fillers: list[str] = []
    delivery_notes: list[str] = []
    pace_counts: dict[str, int] = {}
    clarity_signals: list[str] = []
    confidence_signals: list[str] = []

    def _record_note(raw_note: Any, prefix: str = "") -> None:
        # Keep only notes that say something beyond "clean delivery".
        note = str(raw_note).strip()
        if note and not _is_generic_delivery_note(note):
            delivery_notes.append(f"{prefix}{note}")

    def _record_cues(cues: dict) -> None:
        # Fold one set of delivery cues into the running aggregates.
        for item in cues.get("filler_words") or []:
            filler = str(item).strip()
            if filler:
                all_fillers.append(filler)
        pace = str(cues.get("pace", "")).strip()
        if pace:
            pace_counts[pace] = pace_counts.get(pace, 0) + 1
        clarity = str(cues.get("clarity", "")).strip()
        if clarity:
            clarity_signals.append(clarity)
        confidence = str(cues.get("confidence_signal", "")).strip()
        if confidence:
            confidence_signals.append(confidence)

    # Opening pitch first, so its note and cues precede those of the turns.
    if isinstance(voice_pitch, dict):
        observations = voice_pitch.get("delivery_observations") or {}
        if isinstance(observations, dict):
            _record_note(observations.get("delivery_note", ""), prefix="Opening pitch: ")
            _record_cues(observations)

    for turn in confirmed:
        if not isinstance(turn, dict):
            continue
        _record_note(turn.get("delivery_note", ""))
        turn_cues = turn.get("delivery_cues") or {}
        if isinstance(turn_cues, dict):
            _record_cues(turn_cues)

    total_fillers = len(all_fillers)
    if pace_counts:
        average_pace = max(pace_counts, key=pace_counts.get)
    else:
        average_pace = "unclear"

    if total_fillers == 0 and len(confirmed) >= 2:
        overall = "Voice delivery was generally clear across your spoken answers."
    elif total_fillers > 5:
        overall = (
            f"Filler words appeared often ({total_fillers} total). "
            "Practice pausing briefly instead of using fillers before key claims."
        )
    elif delivery_notes:
        overall = "Review the delivery notes below and practice smoother pacing on your weakest round."
    else:
        overall = "Voice turns recorded — delivery was acceptable for a practice session."

    # dict.fromkeys removes duplicates while keeping first-seen order.
    unique_fillers = list(dict.fromkeys(all_fillers))
    unique_notes = list(dict.fromkeys(delivery_notes))
    return {
        "total_voice_turns": len(confirmed),
        "total_filler_words": total_fillers,
        "filler_word_list": unique_fillers[:12],
        "delivery_notes": unique_notes[:4],
        "average_pace": average_pace,
        "clarity_signal": clarity_signals[-1] if clarity_signals else "unclear",
        "confidence_signal": confidence_signals[-1] if confidence_signals else "unclear",
        "overall_delivery_feedback": overall,
    }