| """Voice input layer for PitchFight AI — Phase 7. |
| |
| Converts spoken audio to confirmed text + delivery cues via Nemotron Omni API. |
| Does not replace the battle engine — only produces transcripts for existing flows. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import base64 |
| import binascii |
| import logging |
| import re |
| import shutil |
| import subprocess |
| import tempfile |
| import uuid |
| from pathlib import Path |
| from typing import Any |
|
|
| from core import nvidia_client |
| from core import session_manager |
| from core.json_utils import parse_model_json, safe_json_parse |
| from core.nvidia_client import OmniAudioError |
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# Regexes for filler words/phrases; count_filler_words() searches each one
# against the lowercased transcript.
_FILLER_PATTERNS = [
    r"\bum\b", r"\buh\b", r"\buhm\b", r"\ber\b", r"\bah\b",
    r"\blike\b", r"\byou know\b", r"\bkind of\b", r"\bsort of\b",
    r"\bbasically\b", r"\bliterally\b", r"\bactually\b", r"\bso+\b",
    r"\bi mean\b", r"\bwell\b",
]


# Prompt sent with the opening-pitch audio (see process_voice_pitch). It asks
# for a transcript, the extracted startup fields and observable delivery cues
# as a single JSON object.
_VOICE_PITCH_PROMPT = """The founder just recorded an opening startup pitch.

Listen to the audio carefully and extract only what was actually said.

Return ONLY valid JSON.
First character must be {.
Last character must be }.
No markdown.
No explanation.
No reasoning.
Do not hallucinate.
Do not invent traction, users, revenue, competitors, or market data.
If a field was not mentioned, return an empty string.

Do NOT claim emotion, stress, anxiety, or psychological state detection.
Only report observable delivery cues such as filler words, pauses, pacing, repetition, self-corrections, and clarity.

Required JSON:

{
"transcript": "exact words spoken",
"extracted": {
"name": "startup name or empty string",
"problem": "problem described or empty string",
"target_users": "who they are building for or empty string",
"solution": "what the product does or empty string",
"why_ai": "why AI is needed or empty string",
"traction": "any validation/users/pilots mentioned or empty string",
"competitors": "any competitors named or empty string",
"ask": "what they are asking for or empty string"
},
"delivery_observations": {
"filler_words": ["list of filler words heard"],
"pace": "rushed / measured / slow / unclear",
"clarity": "one sentence observation based only on delivery",
"confidence_signal": "confident / mixed / hesitant / unclear based only on observable delivery cues",
"delivery_note": "one concise sentence"
},
"extraction_confidence": "high / medium / low"
}"""


# Prompt sent with a single battle-answer recording (see process_voice_turn).
# It asks for a verbatim transcript plus delivery cues as a single JSON object.
_VOICE_TURN_PROMPT = """Transcribe this spoken battle answer exactly as spoken.

Return ONLY valid JSON.
First character must be {.
Last character must be }.
No markdown.
No explanation.
No reasoning.
Do not interpret or expand the answer.
Do not add words not spoken.

Do NOT claim emotion, stress, anxiety, or psychological state detection.
Only report observable delivery cues such as filler words, pauses, pacing, repetition, self-corrections, and clarity.

Required JSON:

{
"transcript": "exact words spoken",
"delivery_note": "one concise sentence about observable delivery cues. If clean, say Clean delivery.",
"word_count": 0,
"delivery_cues": {
"filler_words": [],
"pace": "rushed / measured / slow / unclear",
"clarity": "clear / mostly clear / unclear",
"repetition": "low / medium / high",
"self_corrections": 0,
"confidence_signal": "confident delivery / mixed delivery / hesitant delivery / unclear"
}
}"""


# Keys that sanitize_voice_json() keeps (as stripped strings) in "extracted".
_EXTRACTED_FIELDS = (
    "name", "problem", "target_users", "solution",
    "why_ai", "traction", "competitors", "ask",
)


# Leading-byte signatures that _detect_audio_magic() maps to a format label.
_WAV_RIFF = b"RIFF"  # -> "wav"
_WEBM_MAGIC = b"\x1aE\xdf\xa3"  # -> "webm"
_OGG_MAGIC = b"OggS"  # -> "ogg"
_MP3_ID3 = b"ID3"  # -> "mp3"


# Decoded payloads smaller than this many bytes are rejected as "too short"
# by normalize_audio_for_omni().
_MIN_AUDIO_BYTES = 1024
_MP3_SYNC = b"\xff\xfb"  # -> "mp3" (payload without an ID3 tag)
|
|
|
|
def _detect_audio_magic(data: bytes) -> str:
    """Identify the audio container from the leading magic bytes of ``data``.

    Returns ``"wav"``, ``"webm"``, ``"ogg"`` or ``"mp3"`` when the payload
    starts with the corresponding signature, otherwise ``"unknown"``.
    """
    # Checked in order; the first signature that prefixes the payload wins.
    signatures = (
        (_WAV_RIFF, "wav"),
        (_WEBM_MAGIC, "webm"),
        (_OGG_MAGIC, "ogg"),
        (_MP3_ID3, "mp3"),
        (_MP3_SYNC, "mp3"),
    )
    for magic, label in signatures:
        # A payload shorter than the signature yields a shorter slice, which
        # can never compare equal, so no explicit length check is needed.
        if data[: len(magic)] == magic:
            return label
    return "unknown"
|
|
|
|
def _magic_hex(data: bytes, n: int = 8) -> str:
    """Return the first ``n`` bytes of ``data`` as a hex string ("" when there is no data)."""
    if not data:
        return ""
    prefix = data[:n]
    return prefix.hex()
|
|
|
|
def _convert_audio_to_wav_ffmpeg(input_bytes: bytes, input_ext: str) -> bytes | None:
    """Transcode audio bytes to a 16 kHz, mono, 16-bit PCM WAV via the ``ffmpeg`` binary.

    Args:
        input_bytes: Raw audio payload to convert.
        input_ext: File extension used for the temporary input file
            (``"webm"`` when empty).

    Returns:
        The WAV file contents, or ``None`` when ffmpeg is not on PATH, cannot
        be run, times out (45 s), exits non-zero, or produces no output file.
    """
    ffmpeg_path = shutil.which("ffmpeg")
    if not ffmpeg_path:
        return None

    extension = input_ext or "webm"
    # Work inside a throwaway directory so both files vanish on exit.
    with tempfile.TemporaryDirectory() as workdir:
        workdir_path = Path(workdir)
        source_file = workdir_path / f"input.{extension}"
        target_file = workdir_path / "output.wav"
        source_file.write_bytes(input_bytes)

        command = [
            ffmpeg_path,
            "-y",
            "-hide_banner",
            "-loglevel", "error",
            "-i", str(source_file),
            "-ar", "16000",
            "-ac", "1",
            "-c:a", "pcm_s16le",
            str(target_file),
        ]
        try:
            completed = subprocess.run(command, capture_output=True, timeout=45, check=False)
        except (OSError, subprocess.TimeoutExpired) as exc:
            logger.warning("voice_handler: ffmpeg conversion failed — %s", exc)
            return None

        if completed.returncode == 0 and target_file.is_file():
            return target_file.read_bytes()

        # Keep only the head of stderr so one bad upload cannot flood the log.
        stderr_text = (completed.stderr or b"").decode("utf-8", errors="replace")[:200]
        logger.warning("voice_handler: ffmpeg exit=%s stderr=%s", completed.returncode, stderr_text)
        return None
|
|
|
|
def _clean_base64_payload(audio_base64: Any) -> str:
    """Reduce a client-supplied audio payload to a bare base64 string.

    Handles ``None``, bytes input, ``data:<mime>;base64,<payload>`` URLs and
    whitespace/line breaks inside the payload.
    """
    if audio_base64 is None:
        return ""
    if isinstance(audio_base64, (bytes, bytearray)):
        # Undecodable bytes become U+FFFD, which b64decode then rejects.
        audio_base64 = bytes(audio_base64).decode("ascii", errors="replace")
    text = str(audio_base64).strip()
    if text[:5].lower() == "data:":
        # "data:audio/webm;codecs=opus;base64,<payload>" -> "<payload>"
        _, comma, payload = text.partition(",")
        if comma:
            text = payload
    # Strict validation rejects interior whitespace, so remove line wrapping.
    return "".join(text.split())


def normalize_audio_for_omni(
    audio_base64: str,
    audio_format: str,
    mode: str = "voice_extraction",
) -> dict[str, Any]:
    """Decode browser audio and normalize it to WAV for NVIDIA Omni when possible.

    Args:
        audio_base64: Base64 audio. A ``data:`` URL prefix and embedded
            whitespace are tolerated; ``None`` is treated as an empty payload.
        audio_format: Declared format (``webm``, ``wav``, ``mp3``, ``m4a`` or
            ``ogg``; case and a leading dot are ignored; defaults to ``webm``).
        mode: Label echoed in the result and in log lines.

    Returns:
        On failure, a dict with ``"error"`` (plus ``"audio_format"``,
        ``"mode"`` and sometimes ``"detail"`` / ``"byte_size"``).
        On success, a dict with ``"audio_base64"``, ``"audio_format"``
        (the format of the returned bytes), ``"source_format"`` (the declared
        format), ``"byte_size"``, ``"converted"`` and ``"mode"``.
    """
    fmt = str(audio_format or "webm").strip().lower().lstrip(".")
    if fmt not in {"webm", "wav", "mp3", "m4a", "ogg"}:
        return {
            "error": f"Unsupported audio_format: {fmt}",
            "audio_format": fmt,
            "mode": mode,
        }

    try:
        raw = base64.b64decode(_clean_base64_payload(audio_base64), validate=True)
    except (binascii.Error, ValueError) as exc:
        return {
            "error": "Invalid base64 audio payload",
            "detail": str(exc),
            "audio_format": fmt,
            "mode": mode,
        }

    if not raw:
        return {"error": "Decoded audio is empty", "audio_format": fmt, "mode": mode}

    # Recordings this small are almost certainly an accidental tap.
    if len(raw) < _MIN_AUDIO_BYTES:
        logger.info(
            "voice_handler: rejecting too-small audio mode=%s bytes=%d (min=%d)",
            mode, len(raw), _MIN_AUDIO_BYTES,
        )
        return {
            "error": "That recording was too short. Tap the mic, speak, then tap again to stop.",
            "audio_format": fmt,
            "byte_size": len(raw),
            "mode": mode,
        }

    detected = _detect_audio_magic(raw)
    logger.info(
        "voice_handler: normalize audio mode=%s declared=%s detected=%s bytes=%d magic=%s",
        mode,
        fmt,
        detected,
        len(raw),
        _magic_hex(raw),
    )

    if detected == "wav":
        # Already WAV: pass the bytes through untouched. "converted" keeps its
        # historical meaning here: True when the declared format was not wav.
        return {
            "audio_base64": base64.b64encode(raw).decode("ascii"),
            "audio_format": "wav",
            "source_format": fmt,
            "byte_size": len(raw),
            "converted": fmt != "wav",
            "mode": mode,
        }

    # Trust the sniffed container over the declared one when they disagree.
    source_ext = detected if detected != "unknown" else fmt
    wav_bytes = _convert_audio_to_wav_ffmpeg(raw, source_ext)
    if wav_bytes and wav_bytes[:4] == _WAV_RIFF:
        logger.info(
            "voice_handler: converted %s → wav (%d → %d bytes)",
            source_ext,
            len(raw),
            len(wav_bytes),
        )
        return {
            "audio_base64": base64.b64encode(wav_bytes).decode("ascii"),
            "audio_format": "wav",
            "source_format": fmt,
            "byte_size": len(wav_bytes),
            "converted": True,
            "mode": mode,
        }

    # Conversion unavailable or failed: send the original bytes as-is.
    logger.info(
        "voice_handler: passthrough audio mode=%s format=%s bytes=%d (ffmpeg=%s)",
        mode,
        source_ext,
        len(raw),
        bool(shutil.which("ffmpeg")),
    )
    return {
        "audio_base64": base64.b64encode(raw).decode("ascii"),
        "audio_format": source_ext,
        "source_format": fmt,
        "byte_size": len(raw),
        "converted": False,
        "mode": mode,
    }
|
|
|
|
def _call_omni_with_normalized_audio(
    prompt: str,
    audio_base64: str,
    audio_format: str,
    mode: str,
) -> str | dict[str, Any]:
    """Normalize the audio, send it to Omni, and retry once as WAV if rejected.

    Args:
        prompt: Instruction text sent alongside the audio.
        audio_base64: Client-supplied base64 audio.
        audio_format: Declared audio format.
        mode: Label forwarded to normalization and to the Omni client.

    Returns:
        The raw model response string on success, otherwise an error dict
        (always containing ``"error"``). ``OmniAudioError``, ``ValueError``
        and ``RuntimeError`` from the client are converted to error dicts on
        both the first attempt and the WAV retry.
    """
    normalized = normalize_audio_for_omni(audio_base64, audio_format, mode=mode)
    if normalized.get("error"):
        return normalized

    source_format = normalized.get("source_format", audio_format)

    def _invoke(payload: dict[str, Any]) -> str:
        return nvidia_client.call_omni_audio_json(
            prompt,
            payload["audio_base64"],
            payload["audio_format"],
            mode=mode,
            source_format=payload.get("source_format", audio_format),
            decoded_bytes=payload.get("byte_size"),
        )

    def _failure(exc: Any, converted: Any) -> dict[str, Any]:
        # Keep the client's diagnostic fields but show a user-friendly message.
        err = exc.to_error_dict()
        err["source_format"] = source_format
        err["converted"] = converted
        err["error"] = "Voice transcription failed. Try recording again or type your answer."
        return err

    try:
        return _invoke(normalized)
    except OmniAudioError as exc:
        first_error = exc
    except (ValueError, RuntimeError) as exc:
        return {"error": str(exc)}

    already_converted = normalized.get("converted", False)
    if already_converted:
        # The payload was already WAV; there is nothing further to try.
        return _failure(first_error, already_converted)

    # Omni rejected the passthrough bytes: attempt one WAV transcode and retry.
    raw_bytes = base64.b64decode(normalized["audio_base64"])
    detected = _detect_audio_magic(raw_bytes)
    source_ext = detected if detected != "unknown" else source_format
    wav_bytes = _convert_audio_to_wav_ffmpeg(raw_bytes, source_ext)
    if not (wav_bytes and wav_bytes[:4] == _WAV_RIFF):
        return _failure(first_error, already_converted)

    logger.info("voice_handler: Omni rejected passthrough; retrying as wav")
    retry_payload = {
        "audio_base64": base64.b64encode(wav_bytes).decode("ascii"),
        "audio_format": "wav",
        "source_format": source_format,
        "byte_size": len(wav_bytes),
        "converted": True,
    }
    try:
        return _invoke(retry_payload)
    except OmniAudioError as retry_exc:
        return _failure(retry_exc, True)
    except (ValueError, RuntimeError) as exc:
        # Previously these escaped from the retry because the sibling except
        # clauses of the first attempt do not cover code run inside a handler.
        return {"error": str(exc)}
|
|
|
|
def count_filler_words(transcript: str) -> list[str]:
    """Return the distinct filler words/phrases found in ``transcript``.

    Matching is case-insensitive and uses ``_FILLER_PATTERNS``. Despite the
    name, the result is a list of labels (one per matching pattern, in pattern
    order), not a count; an empty or missing transcript yields ``[]``.
    """
    if not transcript:
        return []
    text = transcript.lower()
    found: list[str] = []
    for pattern in _FILLER_PATTERNS:
        if not re.search(pattern, text, re.IGNORECASE):
            continue
        # Build a readable label by deleting the regex syntax (\b anchors and
        # quantifiers). str.strip(r"\b") is wrong for this: it strips the
        # *characters* "\" and "b", which reported "basically" as "asically".
        label = re.sub(r"\\b|[+*?]", "", pattern)
        if label not in found:
            found.append(label)
    return found
|
|
|
|
def estimate_word_count(transcript: str) -> int:
    """Return how many ``\\w+`` word tokens ``transcript`` contains (0 when empty)."""
    if transcript:
        return sum(1 for _ in re.finditer(r"\b\w+\b", transcript))
    return 0
|
|
|
|
def detect_self_corrections(transcript: str) -> int:
    """Return the total number of self-correction cue matches in ``transcript``.

    Every occurrence of every cue counts; matching is done on the lowercased
    text. An empty or missing transcript yields 0.
    """
    if not transcript:
        return 0
    lowered = transcript.lower()
    cue_patterns = (
        r"\bi mean\b",
        r"\bwait\b",
        r"\bsorry\b",
        r"\bno,\s",
        r"\bactually\b",
        r"\blet me rephrase\b",
        r"\bwhat i meant\b",
    )
    return sum(len(re.findall(cue, lowered)) for cue in cue_patterns)
|
|
|
|
def _detect_repeated_phrases(transcript: str) -> int:
    """Return how many distinct 3-word sequences occur more than once.

    Transcripts with fewer than six words always yield 0.
    """
    tokens = re.findall(r"\b\w+\b", (transcript or "").lower())
    if len(tokens) < 6:
        return 0
    occurrences: dict[str, int] = {}
    # Slide a three-token window across the transcript.
    for first, second, third in zip(tokens, tokens[1:], tokens[2:]):
        phrase = " ".join((first, second, third))
        occurrences[phrase] = occurrences.get(phrase, 0) + 1
    return len([seen for seen in occurrences.values() if seen > 1])
|
|
|
|
def sanitize_voice_json(data: dict[str, Any]) -> dict[str, Any]:
    """Normalize a voice-pitch JSON payload into predictable, stripped strings.

    Args:
        data: Parsed model output. Anything that is not a dict yields ``{}``.

    Returns:
        A shallow copy of ``data`` in which:
        * ``transcript`` is a stripped string;
        * ``extracted`` (when it is a dict) holds exactly ``_EXTRACTED_FIELDS``;
        * ``delivery_observations`` (when it is a dict) holds at most 20
          non-empty filler words and ``"unclear"`` defaults for ``pace`` and
          ``confidence_signal``;
        * ``extraction_confidence`` is ``high``/``medium``/``low``
          (defaulting to ``medium``).
        JSON ``null`` values are treated as missing, not as the text "None".
    """
    if not isinstance(data, dict):
        return {}

    def _text(value: Any) -> str:
        # str(None) would leak the literal word "None" into user-facing fields.
        return "" if value is None else str(value).strip()

    out = dict(data)
    out["transcript"] = _text(out.get("transcript"))

    extracted = out.get("extracted")
    if isinstance(extracted, dict):
        out["extracted"] = {k: _text(extracted.get(k)) for k in _EXTRACTED_FIELDS}

    delivery = out.get("delivery_observations")
    if isinstance(delivery, dict):
        fillers = delivery.get("filler_words")
        cleaned = [_text(f) for f in fillers] if isinstance(fillers, list) else []
        out["delivery_observations"] = {
            "filler_words": [f for f in cleaned if f][:20],
            "pace": _text(delivery.get("pace")) or "unclear",
            "clarity": _text(delivery.get("clarity")),
            "confidence_signal": _text(delivery.get("confidence_signal")) or "unclear",
            "delivery_note": _text(delivery.get("delivery_note")),
        }

    conf = _text(out.get("extraction_confidence")).lower()
    if conf not in ("high", "medium", "low"):
        conf = "medium"
    out["extraction_confidence"] = conf
    return out
|
|
|
|
def _sanitize_turn_json(data: dict[str, Any]) -> dict[str, Any]:
    """Normalize a battle-turn JSON payload and back-fill cues from the transcript.

    Args:
        data: Parsed model output. Anything that is not a dict yields ``{}``.

    Returns:
        A dict with ``transcript``, ``delivery_note`` (default
        ``"Clean delivery."``), ``word_count`` (always recomputed locally) and
        ``delivery_cues``. Missing filler words and a zero self-correction
        count are filled in from local transcript heuristics, and ``"low"``
        repetition is raised to ``"medium"`` when at least two 3-word phrases
        repeat. JSON ``null`` values are treated as missing, and a
        non-numeric ``self_corrections`` value is treated as 0 instead of
        raising.
    """
    if not isinstance(data, dict):
        return {}

    def _text(value: Any) -> str:
        # str(None) would leak the literal word "None" into user-facing fields.
        return "" if value is None else str(value).strip()

    def _as_count(value: Any) -> int:
        # Model output is untrusted: "two", "1.5", lists, etc. must not crash.
        try:
            return max(int(value or 0), 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    transcript = _text(data.get("transcript"))
    cues_raw = data.get("delivery_cues", {})
    cues: dict[str, Any] = {}
    if isinstance(cues_raw, dict):
        fillers = cues_raw.get("filler_words")
        cleaned = [_text(f) for f in fillers] if isinstance(fillers, list) else []
        cues = {
            "filler_words": [f for f in cleaned if f][:20],
            "pace": _text(cues_raw.get("pace")) or "unclear",
            "clarity": _text(cues_raw.get("clarity")) or "unclear",
            "repetition": _text(cues_raw.get("repetition")) or "low",
            "self_corrections": _as_count(cues_raw.get("self_corrections")),
            "confidence_signal": _text(cues_raw.get("confidence_signal")) or "unclear",
        }

    # Back-fill from local heuristics only where the model reported nothing.
    local_fillers = count_filler_words(transcript)
    if not cues.get("filler_words") and local_fillers:
        cues["filler_words"] = local_fillers
    if cues.get("self_corrections", 0) == 0:
        cues["self_corrections"] = detect_self_corrections(transcript)
    rep_count = _detect_repeated_phrases(transcript)
    if cues.get("repetition") == "low" and rep_count >= 2:
        cues["repetition"] = "medium"

    return {
        "transcript": transcript,
        "delivery_note": _text(data.get("delivery_note")) or "Clean delivery.",
        "word_count": estimate_word_count(transcript),
        "delivery_cues": cues,
    }
|
|
|
|
def _parse_pitch_json(raw: str) -> dict[str, Any] | None:
    """Parse and sanitize the model's opening-pitch response.

    Tries ``parse_model_json`` first and falls back to ``safe_json_parse``
    when that does not yield a non-empty dict.

    Returns:
        The sanitized payload, or ``None`` when no non-empty dict could be
        parsed or the transcript is missing, ``null`` or blank.
    """
    parsed, _ = parse_model_json(raw)
    if not isinstance(parsed, dict) or not parsed:
        parsed = safe_json_parse(raw)
    if not isinstance(parsed, dict) or not parsed:
        return None

    raw_transcript = parsed.get("transcript")
    # A JSON null must count as "no transcript"; str(None) would otherwise
    # pass the emptiness check as the text "None".
    transcript = "" if raw_transcript is None else str(raw_transcript).strip()
    if not transcript:
        return None

    sanitized = sanitize_voice_json(parsed)
    sanitized["transcript"] = transcript
    return sanitized
|
|
|
|
def _parse_turn_json(raw: str) -> dict[str, Any] | None:
    """Parse and sanitize the model's battle-turn response.

    Tries ``parse_model_json`` first and falls back to ``safe_json_parse``
    when that does not yield a non-empty dict.

    Returns:
        The sanitized payload, or ``None`` when no non-empty dict could be
        parsed or the transcript is missing, ``null`` or blank.
    """
    parsed, _ = parse_model_json(raw)
    if not isinstance(parsed, dict) or not parsed:
        parsed = safe_json_parse(raw)
    if not isinstance(parsed, dict) or not parsed:
        return None

    raw_transcript = parsed.get("transcript")
    # A JSON null must count as "no transcript"; str(None) would otherwise
    # pass the emptiness check as the text "None".
    if raw_transcript is None or not str(raw_transcript).strip():
        return None
    return _sanitize_turn_json(parsed)
|
|
|
|
def _repair_pitch_json(raw_bad: str) -> dict[str, Any] | None:
    """Ask the text model to reshape an unparseable pitch response into the schema.

    Only the first 4000 characters of ``raw_bad`` are sent. Returns the parsed
    payload, or ``None`` when the call or the re-parse fails (the failure is
    logged as a warning).
    """
    instructions = (
        "Convert the input into valid JSON matching this schema exactly. "
        "Return ONLY JSON. First char { last char }."
    )
    schema = (
        '{"transcript":"","extracted":{"name":"","problem":"","target_users":"",'
        '"solution":"","why_ai":"","traction":"","competitors":"","ask":""},'
        '"delivery_observations":{"filler_words":[],"pace":"","clarity":"",'
        '"confidence_signal":"","delivery_note":""},"extraction_confidence":"medium"}'
    )
    repair_prompt = instructions + "\n" + schema + "\n\n" + raw_bad[:4000]
    messages = [{"role": "user", "content": repair_prompt}]
    try:
        content = nvidia_client.generate_nemotron_response(
            messages,
            mode="voice_extraction_repair",
        )
        repaired = _parse_pitch_json(content)
    except Exception as exc:
        # Best-effort repair: any failure simply means "could not repair".
        logger.warning("voice_handler: pitch repair failed — %s", exc)
        return None
    return repaired
|
|
|
|
def _repair_turn_json(raw_bad: str) -> dict[str, Any] | None:
    """Ask the text model to reshape an unparseable turn response into the schema.

    Only the first 3000 characters of ``raw_bad`` are sent. Returns the parsed
    payload, or ``None`` when the call or the re-parse fails (the failure is
    logged as a warning).
    """
    instructions = (
        "Convert the input into valid JSON matching this schema exactly. "
        "Return ONLY JSON. First char { last char }."
    )
    schema = (
        '{"transcript":"","delivery_note":"","word_count":0,'
        '"delivery_cues":{"filler_words":[],"pace":"","clarity":"",'
        '"repetition":"low","self_corrections":0,"confidence_signal":""}}'
    )
    repair_prompt = instructions + "\n" + schema + "\n\n" + raw_bad[:3000]
    messages = [{"role": "user", "content": repair_prompt}]
    try:
        content = nvidia_client.generate_nemotron_response(
            messages,
            mode="voice_turn_repair",
        )
        repaired = _parse_turn_json(content)
    except Exception as exc:
        # Best-effort repair: any failure simply means "could not repair".
        logger.warning("voice_handler: turn repair failed — %s", exc)
        return None
    return repaired
|
|
|
|
def process_voice_pitch(audio_base64: str, audio_format: str) -> dict[str, Any]:
    """Turn an opening-pitch recording into transcript, startup fields and delivery cues.

    Returns the sanitized pitch payload, or a dict with an ``"error"`` key when
    the API key is missing, the audio call fails, or the response cannot be
    parsed even after one repair attempt.
    """
    if not nvidia_client.is_configured():
        return {"error": "NVIDIA_API_KEY is not configured on the server."}

    response = _call_omni_with_normalized_audio(
        _VOICE_PITCH_PROMPT, audio_base64, audio_format, mode="voice_extraction"
    )
    # A dict here is an error payload from normalization or the Omni call.
    if isinstance(response, dict):
        return response

    result = _parse_pitch_json(response)
    if result is None:
        logger.warning("voice_handler: pitch parse failed, attempting repair")
        result = _repair_pitch_json(response)

    if result is not None:
        return result
    return {"error": "Could not parse voice pitch response from Nemotron Omni."}
|
|
|
|
def process_voice_turn(
    session_id: str,
    audio_base64: str,
    audio_format: str,
) -> dict[str, Any]:
    """Transcribe one battle-answer recording and store it as a pending voice turn.

    Returns a dict with ``session_id``, ``voice_turn_id``, ``transcript``,
    ``delivery_note``, ``word_count`` and ``delivery_cues`` on success. On
    failure (unknown session, missing API key, audio error, unparseable
    response) the dict carries ``"error"`` and ``"session_id"`` instead.
    The stored turn starts unconfirmed.
    """
    if not session_manager.get_session(session_id):
        return {"error": "Session not found", "session_id": session_id}

    if not nvidia_client.is_configured():
        return {"error": "NVIDIA_API_KEY is not configured on the server.", "session_id": session_id}

    response = _call_omni_with_normalized_audio(
        _VOICE_TURN_PROMPT, audio_base64, audio_format, mode="voice_turn"
    )
    # A dict here is an error payload; tag it with the session and pass it on.
    if isinstance(response, dict):
        response["session_id"] = session_id
        return response

    turn = _parse_turn_json(response)
    if turn is None:
        logger.warning("voice_handler: turn parse failed, attempting repair")
        turn = _repair_turn_json(response)

    if turn is None:
        return {"error": "Could not parse voice turn response from Nemotron Omni.", "session_id": session_id}

    turn_id = str(uuid.uuid4())
    spoken = turn["transcript"]
    # Prefer the fillers already on the turn; fall back to a local scan.
    heard_fillers = turn["delivery_cues"].get("filler_words") or count_filler_words(spoken)

    record = {
        "voice_turn_id": turn_id,
        "transcript": spoken,
        "delivery_note": turn.get("delivery_note", ""),
        "word_count": turn.get("word_count", estimate_word_count(spoken)),
        "delivery_cues": turn.get("delivery_cues", {}),
        "filler_word_count": len(heard_fillers),
        "confirmed": False,
    }
    session_manager.store_pending_voice_turn(session_id, record)

    reply: dict[str, Any] = {"session_id": session_id}
    for key in ("voice_turn_id", "transcript", "delivery_note", "word_count", "delivery_cues"):
        reply[key] = record[key]
    return reply
|
|
|
|
def confirm_voice_turn(
    session_id: str,
    voice_turn_id: str,
    final_transcript: str,
) -> bool:
    """Confirm a pending voice turn with the user's final transcript.

    Delegates to ``session_manager.confirm_voice_turn`` and returns its result.
    """
    outcome = session_manager.confirm_voice_turn(session_id, voice_turn_id, final_transcript)
    return outcome
|
|
|
|
def _is_generic_delivery_note(note: str) -> bool:
    """Return True for blank or "Clean delivery" notes, which add nothing to the scorecard."""
    # Trailing periods are removed first, so "Clean delivery." needs no own case.
    normalized = (note or "").strip().lower().rstrip(".")
    return normalized == "" or normalized == "clean delivery"
|
|
|
|
| def build_voice_delivery_summary(session: dict) -> dict[str, Any] | None: |
| """Aggregate confirmed voice turns into a scorecard delivery summary (local only).""" |
| confirmed = session.get("confirmed_voice_turns") or [] |
| voice_pitch = session.get("voice_pitch") |
| if not confirmed and not voice_pitch: |
| return None |
|
|
| all_fillers: list[str] = [] |
| delivery_notes: list[str] = [] |
| pace_counts: dict[str, int] = {} |
| clarity_signals: list[str] = [] |
| confidence_signals: list[str] = [] |
|
|
| if isinstance(voice_pitch, dict): |
| obs = voice_pitch.get("delivery_observations") or {} |
| if isinstance(obs, dict): |
| note = str(obs.get("delivery_note", "")).strip() |
| if note and not _is_generic_delivery_note(note): |
| delivery_notes.append(f"Opening pitch: {note}") |
| for f in obs.get("filler_words") or []: |
| if str(f).strip(): |
| all_fillers.append(str(f).strip()) |
| pace = str(obs.get("pace", "")).strip() |
| if pace: |
| pace_counts[pace] = pace_counts.get(pace, 0) + 1 |
| clarity = str(obs.get("clarity", "")).strip() |
| if clarity: |
| clarity_signals.append(clarity) |
| conf = str(obs.get("confidence_signal", "")).strip() |
| if conf: |
| confidence_signals.append(conf) |
|
|
| for turn in confirmed: |
| if not isinstance(turn, dict): |
| continue |
| note = str(turn.get("delivery_note", "")).strip() |
| if note and not _is_generic_delivery_note(note): |
| delivery_notes.append(note) |
| cues = turn.get("delivery_cues") or {} |
| if isinstance(cues, dict): |
| for f in cues.get("filler_words") or []: |
| if str(f).strip(): |
| all_fillers.append(str(f).strip()) |
| pace = str(cues.get("pace", "")).strip() |
| if pace: |
| pace_counts[pace] = pace_counts.get(pace, 0) + 1 |
| clarity = str(cues.get("clarity", "")).strip() |
| if clarity: |
| clarity_signals.append(clarity) |
| conf = str(cues.get("confidence_signal", "")).strip() |
| if conf: |
| confidence_signals.append(conf) |
|
|
| filler_unique = list(dict.fromkeys(all_fillers)) |
| total_fillers = len(all_fillers) |
| avg_pace = max(pace_counts, key=pace_counts.get) if pace_counts else "unclear" |
|
|
| if total_fillers == 0 and len(confirmed) >= 2: |
| overall = "Voice delivery was generally clear across your spoken answers." |
| elif total_fillers > 5: |
| overall = ( |
| f"Filler words appeared often ({total_fillers} total). " |
| "Practice pausing briefly instead of using fillers before key claims." |
| ) |
| elif delivery_notes: |
| overall = "Review the delivery notes below and practice smoother pacing on your weakest round." |
| else: |
| overall = "Voice turns recorded — delivery was acceptable for a practice session." |
|
|
| return { |
| "total_voice_turns": len(confirmed), |
| "total_filler_words": total_fillers, |
| "filler_word_list": filler_unique[:12], |
| "delivery_notes": list(dict.fromkeys(delivery_notes))[:4], |
| "average_pace": avg_pace, |
| "clarity_signal": clarity_signals[-1] if clarity_signals else "unclear", |
| "confidence_signal": confidence_signals[-1] if confidence_signals else "unclear", |
| "overall_delivery_feedback": overall, |
| } |
|
|