Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import base64 | |
| import re | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any, Literal | |
| import gradio as gr | |
| from echocoach.config import get_echo_coach_config | |
| from echocoach.pipeline import run_echo_coach | |
| from echocoach.prompts import TeacherVoiceMode | |
| from echocoach.recording import ( | |
| ServerRecordingError, | |
| recording_backend_status, | |
| recording_elapsed_seconds, | |
| recording_level_warning, | |
| start_server_recording, | |
| stop_server_recording, | |
| ) | |
| from echocoach.teacher_voice import RAG_MODES, run_teacher_voice_text_turn, run_teacher_voice_turn | |
| from gradio_space.api.serializers import err, ok, unwrap_update, update_value | |
| from gradio_space.model_loading import ( | |
| ensure_model_loaded, | |
| get_active_model_key, | |
| model_status, | |
| reload_model, | |
| select_and_reload_model, | |
| ) | |
| from gradio_space.research_helpers import ( | |
| list_session_choices, | |
| memory_summary, | |
| pick_session_for_topic, | |
| rag_aware_chat, | |
| rag_scope_hint, | |
| resolve_doc_ids, | |
| resolve_session, | |
| ) | |
| from gradio_space.conversation_helpers import format_conversation_context | |
| from gradio_space.tabs.education_pptx import SOURCE_MODES, SEARCH_WORKFLOWS, generate_lesson_slides | |
| from gradio_space.tabs.quiz_maker import generate_quiz | |
| from gradio_space.tabs.research_mind import ( | |
| ask_question, | |
| auto_search_ingest, | |
| discover_sources, | |
| ingest_selected, | |
| ) | |
| from gradio_space.ui.studio_html import ( | |
| render_doc_cards, | |
| render_echo_coach_panel, | |
| render_gallery_strip, | |
| render_slide_canvas, | |
| render_trace_details, | |
| ) | |
| from gradio_space.voice_helpers import speak_last_assistant_reply | |
| from inference.config import get_app_config, get_model_config | |
| from inference.factory import get_backend | |
| from researchmind.config import get_config as get_research_config | |
| from researchmind.ingest import IngestPipeline | |
| _echo_config = get_echo_coach_config() | |
| _app_config = get_app_config() | |
| _SAMPLE_PITCH_AUDIO = ( | |
| Path(__file__).resolve().parents[5] | |
| / "libs" | |
| / "echocoach" | |
| / "tests" | |
| / "fixtures" | |
| / "silence_2s.wav" | |
| ) | |
| _SOURCE_LABELS = {value: label for label, value in SOURCE_MODES} | |
| _WORKFLOW_LABELS = {value: label for label, value in SEARCH_WORKFLOWS} | |
| class _NoopProgress: | |
| def __call__(self, *args: Any, **kwargs: Any) -> None: | |
| return None | |
| def tqdm(self, iterable: Any, **kwargs: Any) -> Any: | |
| return iterable | |
| def _elapsed_seconds_from_log(processing_log: str) -> float | None: | |
| match = re.search(r"Elapsed:\s*([\d.]+)s", processing_log or "") | |
| if not match: | |
| match = re.search(r"\*\*Elapsed:\*\* ([\d.]+)s", processing_log or "") | |
| if not match: | |
| return None | |
| return float(match.group(1)) | |
| def _progress_from_trace(trace_json: str) -> dict[str, Any]: | |
| import json | |
| try: | |
| trace = json.loads(trace_json) | |
| except json.JSONDecodeError: | |
| return {"steps": []} | |
| steps = [] | |
| for step in trace.get("steps", []): | |
| if step.get("type") != "step": | |
| continue | |
| duration_ms = step.get("duration_ms") | |
| steps.append( | |
| { | |
| "name": step.get("name"), | |
| "label": step.get("label"), | |
| "detail": step.get("detail", ""), | |
| "duration_s": round(duration_ms / 1000, 1) if duration_ms is not None else None, | |
| "status": "done", | |
| } | |
| ) | |
| return {"steps": steps} | |
| def _doc_meta(doc: Any) -> str: | |
| uri = str(doc.uri or "") | |
| if len(uri) > 48: | |
| uri = uri[:45] + "…" | |
| return f"{doc.source_type} · {uri}" | |
| def _documents_payload(session_id: str) -> list[dict[str, Any]]: | |
| store = IngestPipeline().store | |
| docs = store.list_documents(session_id=session_id or None) | |
| return [ | |
| { | |
| "id": d.id, | |
| "title": d.title, | |
| "source_type": d.source_type, | |
| "uri": d.uri, | |
| "meta": _doc_meta(d), | |
| } | |
| for d in docs | |
| ] | |
| def _session_has_rag_sources(session_id: str, doc_ids: list[str] | None) -> bool: | |
| if not session_id: | |
| return False | |
| docs = _documents_payload(session_id) | |
| if not docs: | |
| return False | |
| if doc_ids: | |
| valid = {d["id"] for d in docs} | |
| return any(doc_id in valid for doc_id in doc_ids) | |
| return True | |
| def _sessions_payload() -> list[dict[str, str]]: | |
| sessions: list[dict[str, str]] = [] | |
| for label, sid in list_session_choices(): | |
| if sid == "": | |
| continue | |
| topic = label.split(" (")[0] if " (" in label else label | |
| sessions.append({"id": sid, "label": label, "topic": topic}) | |
| return sessions | |
| def _pick_session(topic_hint: str = "") -> str: | |
| return pick_session_for_topic(topic_hint) | |
| def _voice_stack_summary() -> str: | |
| asr = _echo_config.get_asr() | |
| tts = _echo_config.get_tts() | |
| lines = [ | |
| f"ASR: {asr.label} ({_echo_config.asr_preset})", | |
| f"TTS: {tts.label} ({_echo_config.tts_preset})", | |
| f"Coach model: {_echo_config.coach_model}", | |
| f"Coach fallbacks: {', '.join(_echo_config.coach_fallbacks) or 'none'}", | |
| f"Max recording: {_echo_config.max_seconds}s", | |
| ] | |
| return "\n".join(lines) | |
| def _coach_model_key( | |
| coach_model: str | None = None, | |
| *, | |
| language: str = "en", | |
| coach_variant: str = "auto", | |
| ) -> str: | |
| if coach_model and coach_model.strip(): | |
| key = coach_model.strip() | |
| elif coach_variant and coach_variant not in ("auto", ""): | |
| key = coach_variant.strip() | |
| else: | |
| key = _echo_config.coach_model | |
| if key in ("tiny-aya-water", "tiny-aya-fire", "tiny-aya-earth", "auto"): | |
| key = "tiny-aya-global" | |
| return key | |
| def _coach_model_label(model_key: str) -> str: | |
| try: | |
| return get_model_config(model_key).label | |
| except Exception: | |
| return model_key | |
| def _coach_model_candidates( | |
| coach_model: str | None = None, | |
| *, | |
| language: str = "en", | |
| coach_variant: str = "auto", | |
| ) -> list[str]: | |
| if coach_model and coach_model.strip(): | |
| return [coach_model.strip()] | |
| primary = _coach_model_key(None, language=language, coach_variant=coach_variant) | |
| chain: list[str] = [] | |
| seen: set[str] = set() | |
| for key in (primary, *_echo_config.coach_fallbacks): | |
| if key and key not in seen: | |
| seen.add(key) | |
| chain.append(key) | |
| return chain or [primary] | |
| def _ensure_coach_loaded( | |
| coach_model: str | None = None, | |
| *, | |
| language: str = "en", | |
| coach_variant: str = "auto", | |
| ) -> tuple[str, str | None, str | None]: | |
| """Load the first coach preset that succeeds. Returns (key, error, fallback_note).""" | |
| candidates = _coach_model_candidates( | |
| coach_model, | |
| language=language, | |
| coach_variant=coach_variant, | |
| ) | |
| errors: list[str] = [] | |
| for index, key in enumerate(candidates): | |
| load_error = ensure_model_loaded(key) | |
| if not load_error: | |
| if index == 0: | |
| return key, None, None | |
| label = _coach_model_label(key) | |
| note = ( | |
| f"Primary coach unavailable — using fallback **{label}** (`{key}`). " | |
| "Replies still follow your target language via prompts." | |
| ) | |
| return key, None, note | |
| errors.append(load_error) | |
| return candidates[-1], errors[-1], None | |
| def _coach_turn_status(base: str | None, fallback_note: str | None) -> str: | |
| status = (base or "Turn complete.").strip() | |
| if fallback_note: | |
| return f"{fallback_note} {status}".strip() | |
| return status | |
| def _voice_language_codes() -> list[str]: | |
| return [code for _, code in _echo_config.language_choices()] | |
| def _paths_summary() -> str: | |
| rm = get_research_config() | |
| lines = [] | |
| if _app_config.presets_path: | |
| lines.append(f"Model presets: {_app_config.presets_path}") | |
| else: | |
| lines.append("Model presets: built-in defaults") | |
| lines.append(f"ResearchMind store: {rm.data_dir.resolve()}") | |
| return "\n".join(lines) | |
| def _resolve_source_labels( | |
| source_mode: str, | |
| search_workflow: str, | |
| use_rag: bool, | |
| session_id: str, | |
| doc_ids: list[str] | None, | |
| ) -> tuple[str, str, str, list[str]]: | |
| """Return source_label, workflow_label, effective_session, effective_docs.""" | |
| mode = (source_mode or "").strip().lower() | |
| if not mode: | |
| sid = (session_id or "").strip() | |
| has_sources = _session_has_rag_sources(sid, doc_ids) if use_rag else False | |
| if use_rag and has_sources: | |
| return ( | |
| _SOURCE_LABELS["rag"], | |
| _WORKFLOW_LABELS["two_step"], | |
| sid, | |
| doc_ids or [], | |
| ) | |
| return _SOURCE_LABELS["none"], _WORKFLOW_LABELS["two_step"], "", [] | |
| workflow_key = (search_workflow or "two_step").strip().lower() | |
| if workflow_key not in _WORKFLOW_LABELS: | |
| workflow_key = "two_step" | |
| if mode not in _SOURCE_LABELS: | |
| mode = "none" | |
| sid = (session_id or "").strip() | |
| if mode == "rag" and not sid: | |
| sid = "" | |
| return ( | |
| _SOURCE_LABELS[mode], | |
| _WORKFLOW_LABELS[workflow_key], | |
| sid if mode == "rag" else sid, | |
| doc_ids or [] if mode == "rag" else [], | |
| ) | |
| def api_list_sessions() -> dict[str, Any]: | |
| return ok(sessions=_sessions_payload()) | |
| def api_list_documents(session_id: str = "") -> dict[str, Any]: | |
| docs = _documents_payload(session_id) | |
| html_cards = render_doc_cards(docs, rag_active=bool(docs)) | |
| return ok( | |
| session_id=session_id, | |
| documents=docs, | |
| documents_html=html_cards, | |
| memory_markdown=memory_summary(session_id), | |
| ) | |
| def api_session_memory(session_id: str = "") -> dict[str, Any]: | |
| return ok(memory_markdown=memory_summary(session_id)) | |
| def _ingest_response( | |
| status: str, | |
| session_id: str, | |
| trace_json: str = "", | |
| trace_summary: str = "", | |
| ) -> dict[str, Any]: | |
| sid = session_id or "" | |
| docs = _documents_payload(sid) | |
| return ok( | |
| status=status, | |
| session_id=sid, | |
| documents=docs, | |
| documents_html=render_doc_cards(docs, rag_active=bool(docs)), | |
| trace_json=trace_json, | |
| trace_summary=trace_summary, | |
| trace_html=render_trace_details(trace_summary=trace_summary, trace_json=trace_json), | |
| ) | |
| def api_discover_sources(topic: str, session_id: str = "") -> dict[str, Any]: | |
| if not (topic or "").strip(): | |
| return err("Enter a workspace topic before discovering sources.") | |
| summary, url_up, sess_up, trace_sum, trace_json, _memory, _doc_up, _acc_up = discover_sources( | |
| topic, | |
| session_id, | |
| "", | |
| "", | |
| _NoopProgress(), | |
| ) | |
| url_payload = unwrap_update(url_up) | |
| urls = list(url_payload.get("choices") or []) if isinstance(url_payload, dict) else [] | |
| selected = list(url_payload.get("value") or urls) if isinstance(url_payload, dict) else urls | |
| sid = update_value(sess_up, session_id) | |
| trace_str = trace_json if isinstance(trace_json, str) else "" | |
| if summary and "error" in summary.lower() and not urls: | |
| return err(strip_md_summary(summary), status=summary, urls=[], session_id=sid) | |
| return ok( | |
| status=summary, | |
| urls=urls, | |
| selected_urls=selected, | |
| session_id=sid, | |
| trace_summary=trace_sum, | |
| trace_json=trace_str, | |
| trace_html=render_trace_details(trace_summary=trace_sum, trace_json=trace_str), | |
| ) | |
| def api_auto_search_ingest(topic: str, session_id: str = "") -> dict[str, Any]: | |
| if not (topic or "").strip(): | |
| return err("Enter a workspace topic before auto-ingest.") | |
| status, _url_up, sess_up, trace_sum, trace_json, _memory, _doc_up, _acc_up = auto_search_ingest( | |
| topic, | |
| session_id, | |
| "", | |
| "", | |
| _NoopProgress(), | |
| ) | |
| sid = update_value(sess_up, session_id) | |
| if status and "error" in status.lower() and "ingested" not in status.lower(): | |
| return err(strip_md_summary(status), status=status, session_id=sid) | |
| return _ingest_response(status, sid, trace_json=str(trace_json or ""), trace_summary=trace_sum) | |
| def api_ingest_sources( | |
| topic: str, | |
| session_id: str = "", | |
| urls_text: str = "", | |
| selected_urls: list[str] | None = None, | |
| file_paths: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| has_urls = bool((urls_text or "").strip() or (selected_urls or [])) | |
| has_files = bool(file_paths) | |
| if not has_urls and not has_files: | |
| return err("Add URLs, select suggested sources, or upload a file — then ingest.") | |
| status, _memory, trace_json, trace_sum, sess_up, _doc_up = ingest_selected( | |
| topic, | |
| urls_text, | |
| selected_urls or [], | |
| file_paths, | |
| session_id or None, | |
| "", | |
| "", | |
| _NoopProgress(), | |
| ) | |
| sid = update_value(sess_up, session_id) | |
| if status and "error" in status.lower() and "ingested" not in status.lower(): | |
| return err(strip_md_summary(status), status=status, session_id=sid) | |
| return _ingest_response(status, sid, trace_json=str(trace_json or ""), trace_summary=trace_sum) | |
| def strip_md_summary(text: str) -> str: | |
| return re.sub(r"\*\*", "", str(text or "")).strip() | |
| def api_ingest_url(topic: str, url: str, session_id: str = "") -> dict[str, Any]: | |
| if not url.strip(): | |
| return err("Paste a URL to ingest.") | |
| return api_ingest_sources(topic, session_id, urls_text=url.strip()) | |
| def api_ingest_files( | |
| topic: str, | |
| session_id: str, | |
| file_paths: list[str], | |
| ) -> dict[str, Any]: | |
| if not file_paths: | |
| return err("Upload at least one PDF or DOCX file.") | |
| return api_ingest_sources(topic, session_id, file_paths=file_paths) | |
| def api_research_chat( | |
| question: str, | |
| session_id: str = "", | |
| doc_ids: list[str] | None = None, | |
| history: list[dict[str, str]] | None = None, | |
| ) -> dict[str, Any]: | |
| if not question.strip(): | |
| return err("Enter a question.") | |
| hist, trace_json, trace_sum, rag_hint, _cleared = ask_question( | |
| question, | |
| session_id, | |
| doc_ids or [], | |
| history or [], | |
| "", | |
| doc_ids or [], | |
| _NoopProgress(), | |
| ) | |
| assistant = "" | |
| for msg in reversed(hist or []): | |
| if msg.get("role") == "assistant": | |
| assistant = str(msg.get("content") or "") | |
| break | |
| trace_str = trace_json if isinstance(trace_json, str) else "" | |
| return ok( | |
| history=hist, | |
| assistant=assistant, | |
| rag_hint=rag_hint, | |
| trace_json=trace_str, | |
| trace_summary=trace_sum, | |
| trace_html=render_trace_details(trace_summary=trace_sum, trace_json=trace_str), | |
| ) | |
| def api_debug_chat( | |
| message: str, | |
| history: list[list[str]] | None = None, | |
| use_rag: bool = False, | |
| session_id: str = "", | |
| doc_ids: list[str] | None = None, | |
| model_key: str = "", | |
| workspace_session_id: str = "", | |
| workspace_doc_ids: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| if not (message or "").strip(): | |
| return err("Enter a message.") | |
| key = (model_key or "").strip() or get_active_model_key() | |
| load_error = ensure_model_loaded(key) | |
| if load_error: | |
| return err(load_error) | |
| sid = resolve_session(session_id, workspace_session_id) | |
| docs = resolve_doc_ids(doc_ids, workspace_doc_ids) | |
| hist = history or [] | |
| reply, trace_json, trace_summary = rag_aware_chat( | |
| message.strip(), | |
| hist, | |
| key, | |
| use_rag, | |
| sid, | |
| docs, | |
| ) | |
| new_history = list(hist) | |
| new_history.append([message.strip(), reply]) | |
| return ok( | |
| history=new_history, | |
| assistant=reply, | |
| rag_hint=rag_scope_hint(sid, docs), | |
| trace_json=trace_json, | |
| trace_summary=trace_summary, | |
| trace_html=render_trace_details(trace_summary=trace_summary, trace_json=trace_json), | |
| ) | |
| def _build_slide_api_response( | |
| last: tuple, | |
| *, | |
| topic: str, | |
| sid: str, | |
| rag_notice: str = "", | |
| ) -> dict[str, Any]: | |
| ( | |
| outline_md, | |
| preview_html, | |
| gallery, | |
| pptx, | |
| docx, | |
| html_export, | |
| processing_log, | |
| trace_sum, | |
| trace_json, | |
| status, | |
| ) = last | |
| if preview_html and "form-error" in preview_html: | |
| return err(status or "Generation failed.", status=status, progress_log=processing_log) | |
| if rag_notice: | |
| status = f"{rag_notice}\n\n{status or 'Slides generated.'}".strip() | |
| downloads = { | |
| "pptx": pptx, | |
| "docx": docx, | |
| "html": html_export, | |
| } | |
| trace_str = trace_json if isinstance(trace_json, str) else "" | |
| return ok( | |
| topic=topic, | |
| session_id=sid, | |
| outline_md=outline_md, | |
| preview_html=preview_html, | |
| canvas_html=render_slide_canvas(preview_html), | |
| gallery=gallery or [], | |
| gallery_html=render_gallery_strip(gallery or []), | |
| downloads=downloads, | |
| status=status, | |
| rag_fallback=bool(rag_notice), | |
| progress_log=processing_log, | |
| trace_summary=trace_sum, | |
| trace_json=trace_str, | |
| trace_html=render_trace_details( | |
| trace_summary=trace_sum, | |
| trace_json=trace_str, | |
| progress_log=processing_log, | |
| ), | |
| elapsed_seconds=_elapsed_seconds_from_log(processing_log), | |
| progress=_progress_from_trace(trace_str), | |
| ) | |
| def _run_slide_generation(**kwargs) -> dict[str, Any]: | |
| topic = kwargs.pop("topic") | |
| sid = kwargs.pop("sid", "") | |
| rag_notice = kwargs.pop("rag_notice", "") | |
| gen = generate_lesson_slides(topic, **kwargs) | |
| last: tuple | None = None | |
| for item in gen: | |
| last = item | |
| if last is None: | |
| return err("Generation failed before producing output.") | |
| return _build_slide_api_response(last, topic=topic, sid=sid, rag_notice=rag_notice) | |
| def api_generate_slides( | |
| topic: str, | |
| grade: str = "6", | |
| slide_count: int = 5, | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| doc_ids: list[str] | None = None, | |
| source_mode: str = "", | |
| search_workflow: str = "two_step", | |
| urls_text: str = "", | |
| selected_urls: list[str] | None = None, | |
| file_paths: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| rag_docs = doc_ids or [] | |
| sid = (session_id or "").strip() | |
| if not (source_mode or "").strip() and use_rag and not sid: | |
| sid = _pick_session(topic) | |
| source_label, workflow_label, effective_sid, effective_docs = _resolve_source_labels( | |
| source_mode, | |
| search_workflow, | |
| use_rag, | |
| sid, | |
| rag_docs, | |
| ) | |
| rag_notice = "" | |
| if (source_mode or "").strip().lower() == "rag" or ( | |
| not (source_mode or "").strip() and use_rag | |
| ): | |
| has_sources = _session_has_rag_sources(sid, rag_docs) | |
| if use_rag and not has_sources and source_label == _SOURCE_LABELS["rag"]: | |
| rag_notice = ( | |
| "Cross-Reference Sources is on, but this session has no indexed documents — " | |
| "generated from model knowledge only. Ingest sources in Step 1 to enable RAG." | |
| ) | |
| source_label = _SOURCE_LABELS["none"] | |
| effective_sid = "" | |
| effective_docs = [] | |
| upload_files = file_paths if file_paths else None | |
| return _run_slide_generation( | |
| topic=topic, | |
| sid=sid, | |
| rag_notice=rag_notice, | |
| grade=grade, | |
| slide_count=int(slide_count), | |
| source_mode_label=source_label, | |
| search_workflow_label=workflow_label, | |
| urls_text=urls_text or "", | |
| selected_urls=selected_urls or [], | |
| upload_files=upload_files, | |
| session_id=effective_sid, | |
| doc_ids=effective_docs, | |
| workspace_topic=topic, | |
| workspace_session=effective_sid, | |
| workspace_doc_ids=effective_docs, | |
| progress=_NoopProgress(), | |
| skip_preview_images=False, | |
| ) | |
| def _build_quiz_api_response( | |
| last: tuple, | |
| *, | |
| topic: str, | |
| sid: str, | |
| rag_notice: str = "", | |
| ) -> dict[str, Any]: | |
| ( | |
| outline_md, | |
| preview_html, | |
| docx, | |
| html_export, | |
| processing_log, | |
| trace_sum, | |
| trace_json, | |
| status, | |
| ) = last | |
| if preview_html and "form-error" in preview_html: | |
| return err(status or "Generation failed.", status=status, progress_log=processing_log) | |
| if rag_notice: | |
| status = f"{rag_notice}\n\n{status or 'Quiz generated.'}".strip() | |
| downloads = { | |
| "docx": docx, | |
| "html": html_export, | |
| } | |
| trace_str = trace_json if isinstance(trace_json, str) else "" | |
| return ok( | |
| topic=topic, | |
| session_id=sid, | |
| outline_md=outline_md, | |
| preview_html=preview_html, | |
| downloads=downloads, | |
| status=status, | |
| rag_fallback=bool(rag_notice), | |
| progress_log=processing_log, | |
| trace_summary=trace_sum, | |
| trace_json=trace_str, | |
| trace_html=render_trace_details( | |
| trace_summary=trace_sum, | |
| trace_json=trace_str, | |
| progress_log=processing_log, | |
| ), | |
| elapsed_seconds=_elapsed_seconds_from_log(processing_log), | |
| progress=_progress_from_trace(trace_str), | |
| ) | |
| def _run_quiz_generation(**kwargs) -> dict[str, Any]: | |
| topic = kwargs.pop("topic") | |
| sid = kwargs.pop("sid", "") | |
| rag_notice = kwargs.pop("rag_notice", "") | |
| gen = generate_quiz(topic, **kwargs) | |
| last: tuple | None = None | |
| for item in gen: | |
| last = item | |
| if last is None: | |
| return err("Generation failed before producing output.") | |
| return _build_quiz_api_response(last, topic=topic, sid=sid, rag_notice=rag_notice) | |
| def api_generate_quiz( | |
| topic: str, | |
| grade: str = "6", | |
| question_count: int = 5, | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| doc_ids: list[str] | None = None, | |
| source_mode: str = "", | |
| search_workflow: str = "two_step", | |
| urls_text: str = "", | |
| selected_urls: list[str] | None = None, | |
| file_paths: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| rag_docs = doc_ids or [] | |
| sid = (session_id or "").strip() | |
| if not (source_mode or "").strip() and use_rag and not sid: | |
| sid = _pick_session(topic) | |
| source_label, workflow_label, effective_sid, effective_docs = _resolve_source_labels( | |
| source_mode, | |
| search_workflow, | |
| use_rag, | |
| sid, | |
| rag_docs, | |
| ) | |
| rag_notice = "" | |
| if (source_mode or "").strip().lower() == "rag" or ( | |
| not (source_mode or "").strip() and use_rag | |
| ): | |
| has_sources = _session_has_rag_sources(sid, rag_docs) | |
| if use_rag and not has_sources and source_label == _SOURCE_LABELS["rag"]: | |
| rag_notice = ( | |
| "Cross-Reference Sources is on, but this session has no indexed documents — " | |
| "generated from model knowledge only. Ingest sources in Step 1 to enable RAG." | |
| ) | |
| source_label = _SOURCE_LABELS["none"] | |
| effective_sid = "" | |
| effective_docs = [] | |
| upload_files = file_paths if file_paths else None | |
| return _run_quiz_generation( | |
| topic=topic, | |
| sid=sid, | |
| rag_notice=rag_notice, | |
| grade=grade, | |
| question_count=int(question_count), | |
| source_mode_label=source_label, | |
| search_workflow_label=workflow_label, | |
| urls_text=urls_text or "", | |
| selected_urls=selected_urls or [], | |
| upload_files=upload_files, | |
| session_id=effective_sid, | |
| doc_ids=effective_docs, | |
| workspace_topic=topic, | |
| workspace_session=effective_sid, | |
| workspace_doc_ids=effective_docs, | |
| progress=_NoopProgress(), | |
| ) | |
| def api_generate_slides_from_conversation( | |
| history: list | None, | |
| history_kind: str, | |
| topic: str, | |
| grade: str = "6", | |
| slide_count: int = 5, | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| doc_ids: list[str] | None = None, | |
| source_mode: str = "", | |
| search_workflow: str = "two_step", | |
| urls_text: str = "", | |
| selected_urls: list[str] | None = None, | |
| file_paths: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| conversation_text, derived_topic = format_conversation_context(history, history_kind) | |
| if not conversation_text.strip(): | |
| return err("Start a conversation first.") | |
| effective_topic = (topic or "").strip() or derived_topic | |
| if not effective_topic: | |
| return err("Enter a topic or chat about a lesson first.") | |
| rag_docs = doc_ids or [] | |
| sid = (session_id or "").strip() | |
| if not (source_mode or "").strip() and use_rag and not sid: | |
| sid = _pick_session(effective_topic) | |
| source_label, workflow_label, effective_sid, effective_docs = _resolve_source_labels( | |
| source_mode, | |
| search_workflow, | |
| use_rag, | |
| sid, | |
| rag_docs, | |
| ) | |
| rag_notice = "" | |
| if (source_mode or "").strip().lower() == "rag" or ( | |
| not (source_mode or "").strip() and use_rag | |
| ): | |
| has_sources = _session_has_rag_sources(sid, rag_docs) | |
| if use_rag and not has_sources and source_label == _SOURCE_LABELS["rag"]: | |
| rag_notice = ( | |
| "Cross-Reference Sources is on, but this session has no indexed documents — " | |
| "generated from model knowledge only. Ingest sources in Step 1 to enable RAG." | |
| ) | |
| source_label = _SOURCE_LABELS["none"] | |
| effective_sid = "" | |
| effective_docs = [] | |
| upload_files = file_paths if file_paths else None | |
| return _run_slide_generation( | |
| topic=effective_topic, | |
| sid=sid, | |
| rag_notice=rag_notice, | |
| grade=grade, | |
| slide_count=int(slide_count), | |
| source_mode_label=source_label, | |
| search_workflow_label=workflow_label, | |
| urls_text=urls_text or "", | |
| selected_urls=selected_urls or [], | |
| upload_files=upload_files, | |
| session_id=effective_sid, | |
| doc_ids=effective_docs, | |
| workspace_topic=effective_topic, | |
| workspace_session=effective_sid, | |
| workspace_doc_ids=effective_docs, | |
| progress=_NoopProgress(), | |
| skip_preview_images=False, | |
| conversation_context=conversation_text, | |
| conversation_topic=derived_topic, | |
| ) | |
| def api_teacher_voice_turn( | |
| message: str, | |
| mode: TeacherVoiceMode = "lesson", | |
| topic: str = "", | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| history: list | None = None, | |
| doc_ids: list[str] | None = None, | |
| language: str = "en", | |
| asr_preset: str | None = None, | |
| auto_voiceout: bool = True, | |
| coach_model: str = "", | |
| coach_variant: str = "auto", | |
| ) -> dict[str, Any]: | |
| model_key, load_error, fallback_note = _ensure_coach_loaded( | |
| coach_model or None, | |
| language=language, | |
| coach_variant=coach_variant, | |
| ) | |
| if load_error: | |
| return err(load_error) | |
| if not message.strip(): | |
| return err("Enter a message or record audio first.") | |
| hist = history or [] | |
| try: | |
| result = run_teacher_voice_text_turn( | |
| message.strip(), | |
| hist, | |
| mode=mode, | |
| language=language, | |
| topic=topic.strip() or None, | |
| backend=get_backend(model_key), | |
| coach_model=model_key, | |
| use_rag=use_rag and mode in RAG_MODES, | |
| session_id=session_id or None, | |
| doc_ids=doc_ids or None, | |
| auto_voiceout=auto_voiceout, | |
| ) | |
| except Exception as exc: # noqa: BLE001 | |
| return err(str(exc)) | |
| return ok( | |
| history=result.history, | |
| assistant=result.assistant_text, | |
| status=_coach_turn_status(result.rag_status, fallback_note), | |
| voiceout_path=result.voiceout_path, | |
| voiceout_warning=result.voiceout_warning, | |
| rag_references=result.rag_references, | |
| coach_model=model_key, | |
| coach_fallback=bool(fallback_note), | |
| ) | |
| def api_teacher_voice_audio_turn( | |
| audio_path: str, | |
| mode: TeacherVoiceMode = "lesson", | |
| topic: str = "", | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| history: list | None = None, | |
| doc_ids: list[str] | None = None, | |
| language: str = "en", | |
| asr_preset: str | None = None, | |
| auto_voiceout: bool = True, | |
| coach_model: str = "", | |
| coach_variant: str = "auto", | |
| ) -> dict[str, Any]: | |
| model_key, load_error, fallback_note = _ensure_coach_loaded( | |
| coach_model or None, | |
| language=language, | |
| coach_variant=coach_variant, | |
| ) | |
| if load_error: | |
| return err(load_error) | |
| if not audio_path or not Path(audio_path).is_file(): | |
| return err("Record or upload audio first.") | |
| hist = history or [] | |
| preset = asr_preset or _echo_config.asr_preset | |
| max_turn = min(15, _echo_config.max_seconds) | |
| try: | |
| result = run_teacher_voice_turn( | |
| audio_path, | |
| hist, | |
| mode=mode, | |
| language=language, | |
| asr_preset=preset, | |
| topic=topic.strip() or None, | |
| backend=get_backend(model_key), | |
| coach_model=model_key, | |
| use_rag=use_rag and mode in RAG_MODES, | |
| session_id=session_id or None, | |
| doc_ids=doc_ids or None, | |
| max_turn_seconds=max_turn, | |
| auto_voiceout=auto_voiceout, | |
| ) | |
| except Exception as exc: # noqa: BLE001 | |
| return err(str(exc)) | |
| return ok( | |
| history=result.history, | |
| assistant=result.assistant_text, | |
| status=_coach_turn_status(result.rag_status, fallback_note), | |
| voiceout_path=result.voiceout_path, | |
| voiceout_warning=result.voiceout_warning, | |
| user_text=result.user_text, | |
| rag_references=result.rag_references, | |
| coach_model=model_key, | |
| coach_fallback=bool(fallback_note), | |
| ) | |
| def api_language_lesson_turn( | |
| message: str = "", | |
| audio_path: str = "", | |
| mode: TeacherVoiceMode = "lesson", | |
| topic: str = "", | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| history: list | None = None, | |
| doc_ids: list[str] | None = None, | |
| language: str = "en", | |
| asr_preset: str | None = None, | |
| auto_voiceout: bool = True, | |
| coach_model: str = "", | |
| coach_variant: str = "auto", | |
| ) -> dict[str, Any]: | |
| """Unified Language lessons turn — routes to text or audio pipeline.""" | |
| if audio_path and audio_path.strip(): | |
| return api_teacher_voice_audio_turn( | |
| audio_path.strip(), | |
| mode=mode, | |
| topic=topic, | |
| session_id=session_id, | |
| use_rag=use_rag, | |
| history=history, | |
| doc_ids=doc_ids, | |
| language=language, | |
| asr_preset=asr_preset, | |
| auto_voiceout=auto_voiceout, | |
| coach_model=coach_model, | |
| coach_variant=coach_variant, | |
| ) | |
| return api_teacher_voice_turn( | |
| message, | |
| mode=mode, | |
| topic=topic, | |
| session_id=session_id, | |
| use_rag=use_rag, | |
| history=history, | |
| doc_ids=doc_ids, | |
| language=language, | |
| asr_preset=asr_preset, | |
| auto_voiceout=auto_voiceout, | |
| coach_model=coach_model, | |
| coach_variant=coach_variant, | |
| ) | |
| def api_teacher_voice_clear() -> dict[str, Any]: | |
| return ok( | |
| history=[], | |
| assistant="", | |
| status="Conversation cleared.", | |
| ) | |
| def api_teacher_voice_speak( | |
| history: list | None = None, | |
| language: str = "en", | |
| first_sentence_only: bool = False, | |
| ) -> dict[str, Any]: | |
| playback, status = speak_last_assistant_reply( | |
| history or [], | |
| language, | |
| first_sentence_only=first_sentence_only, | |
| ) | |
| if not playback: | |
| return err(status) | |
| return ok(voiceout_path=playback, status=status) | |
| def api_load_sample_pitch() -> dict[str, Any]: | |
| if not _SAMPLE_PITCH_AUDIO.is_file(): | |
| return err( | |
| f"Sample clip missing at `{_SAMPLE_PITCH_AUDIO}`. " | |
| "Run `uv run python libs/echocoach/tests/make_fixture.py`." | |
| ) | |
| return ok( | |
| audio_path=str(_SAMPLE_PITCH_AUDIO), | |
| status="Sample clip loaded — click Analyze pitch when ready.", | |
| ) | |
| def api_analyze_pitch( | |
| audio_path: str, | |
| language: str = "en", | |
| asr_preset: str | None = None, | |
| speak_rewrite: bool = False, | |
| ) -> dict[str, Any]: | |
| model_key, load_error, _fallback_note = _ensure_coach_loaded(None, language=language) | |
| if load_error: | |
| return err(load_error) | |
| if not audio_path or not Path(audio_path).is_file(): | |
| return err("Record or upload audio before analyzing.") | |
| preset = asr_preset or _echo_config.asr_preset | |
| try: | |
| result = run_echo_coach( | |
| audio_path, | |
| language=language, | |
| asr_preset=preset, | |
| coach_model=model_key, | |
| backend=get_backend(model_key), | |
| speak_rewrite=speak_rewrite, | |
| ) | |
| except Exception as exc: # noqa: BLE001 | |
| return err(str(exc)) | |
| panel = render_echo_coach_panel( | |
| pace_score=result.pace.score, | |
| wpm=result.pace.wpm, | |
| tip=result.coach.one_tip, | |
| report_md=result.report_markdown, | |
| transcript_html=result.transcript_html, | |
| filler_chart=result.filler_chart_path, | |
| pace_chart=result.pace_chart_path, | |
| voiceout_path=result.voiceout_path, | |
| ) | |
| return ok( | |
| transcript_html=result.transcript_html, | |
| report_md=result.report_markdown, | |
| pace_score=result.pace.score, | |
| wpm=result.pace.wpm, | |
| tip=result.coach.one_tip, | |
| filler_chart=result.filler_chart_path, | |
| pace_chart=result.pace_chart_path, | |
| voiceout_path=result.voiceout_path, | |
| coach_panel_html=panel, | |
| ) | |
| def api_model_status() -> dict[str, Any]: | |
| key = get_active_model_key() | |
| status_md = model_status(key) | |
| return ok(model_key=key, status_markdown=status_md) | |
| def api_model_choices() -> dict[str, Any]: | |
| key = get_active_model_key() | |
| active = _app_config.get_model(key) | |
| allow_switch = bool( | |
| _app_config.allow_model_switch and len(_app_config.models) > 1 | |
| ) | |
| choices = [] | |
| if allow_switch: | |
| choices = [{"key": k, "label": label} for label, k in _app_config.model_choices()] | |
| return ok( | |
| active_model=key, | |
| active_label=active.label, | |
| active_backend=active.backend, | |
| allow_model_switch=allow_switch, | |
| choices=choices, | |
| voice_stack=_voice_stack_summary(), | |
| paths=_paths_summary(), | |
| ) | |
| def api_set_active_model(model_key: str = "") -> dict[str, Any]: | |
| key = (model_key or "").strip() or get_active_model_key() | |
| try: | |
| status_md = select_and_reload_model(key) | |
| except KeyError as exc: | |
| return err(str(exc), model_key=key) | |
| if status_md.lower().startswith("error") or "failed" in status_md.lower(): | |
| return err(status_md, status_markdown=status_md, model_key=key) | |
| return ok(status_markdown=status_md, model_key=key) | |
| def api_reload_model(model_key: str = "") -> dict[str, Any]: | |
| key = (model_key or "").strip() or get_active_model_key() | |
| status_md = reload_model(key) | |
| if status_md.lower().startswith("error") or "failed" in status_md.lower(): | |
| return err(status_md, status_markdown=status_md, model_key=key) | |
| return ok(status_markdown=status_md, model_key=key) | |
| def api_recording_status() -> dict[str, Any]: | |
| status = recording_backend_status() | |
| return ok( | |
| backend=status, | |
| message=status, | |
| max_seconds=_echo_config.max_seconds, | |
| ) | |
| def api_recording_start(max_seconds: int | None = None) -> dict[str, Any]: | |
| limit = int(max_seconds or _echo_config.max_seconds) | |
| try: | |
| start_server_recording(limit) | |
| except ServerRecordingError as exc: | |
| return err(str(exc)) | |
| return ok( | |
| status=f"Recording… speak now, then stop (auto-stops after {limit}s).", | |
| max_seconds=limit, | |
| ) | |
| def api_recording_stop() -> dict[str, Any]: | |
| try: | |
| elapsed = recording_elapsed_seconds() | |
| path = stop_server_recording() | |
| warning = recording_level_warning(path) | |
| except ServerRecordingError as exc: | |
| return err(str(exc)) | |
| except Exception as exc: # noqa: BLE001 | |
| return err(f"Recording failed: {exc}") | |
| status = f"Recording saved ({elapsed:.1f}s)." | |
| if warning: | |
| status += f" Warning: {warning}" | |
| return ok(path=str(path), elapsed_seconds=elapsed, status=status, warning=warning or "") | |
| def api_voice_presets() -> dict[str, Any]: | |
| tts = _echo_config.get_tts() | |
| voice_langs = _voice_language_codes() | |
| coach_chain = _echo_config.coach_model_chain() | |
| coach_chain_labels = [_coach_model_label(key) for key in coach_chain] | |
| fallback_label = coach_chain_labels[1] if len(coach_chain_labels) > 1 else None | |
| return ok( | |
| languages=[{"label": label, "value": value} for label, value in _echo_config.language_choices()], | |
| asr_presets=[{"label": label, "value": value} for label, value in _echo_config.asr_choices()], | |
| coach_variants=[ | |
| {"label": "Tiny Aya Global (70+ languages)", "value": "tiny-aya-global"}, | |
| ], | |
| default_language=_echo_config.language_choices()[0][1] if _echo_config.language_choices() else "en", | |
| default_asr=_echo_config.asr_preset, | |
| default_coach=_echo_config.coach_model, | |
| coach_fallbacks=list(_echo_config.coach_fallbacks), | |
| coach_chain=coach_chain, | |
| coach_chain_labels=coach_chain_labels, | |
| voice_languages=voice_langs, | |
| max_seconds=_echo_config.max_seconds, | |
| voiceout_note=( | |
| f"Voice in/out: {len(voice_langs)} languages via Piper · " | |
| f"Coach: {coach_chain_labels[0]}" | |
| + (f" (fallback: {fallback_label})" if fallback_label else "") | |
| ), | |
| ) | |
| def api_save_upload(filename: str, content_base64: str) -> dict[str, Any]: | |
| """Save uploaded file bytes to a temp path for downstream ingest/analyze.""" | |
| if not content_base64: | |
| return err("Empty upload.") | |
| try: | |
| raw = base64.b64decode(content_base64) | |
| except Exception as exc: # noqa: BLE001 | |
| return err(f"Invalid upload encoding: {exc}") | |
| suffix = Path(filename or "upload.bin").suffix or ".bin" | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix, prefix="studio_") | |
| tmp.write(raw) | |
| tmp.close() | |
| return ok(path=tmp.name, filename=filename) | |
| def register_studio_apis(server: gr.Server) -> None: | |
| """Register Studio JSON APIs on a gradio.Server instance.""" | |
| def _list_sessions() -> dict[str, Any]: | |
| return api_list_sessions() | |
| def _list_documents(session_id: str = "") -> dict[str, Any]: | |
| return api_list_documents(session_id) | |
| def _session_memory(session_id: str = "") -> dict[str, Any]: | |
| return api_session_memory(session_id) | |
| def _discover_sources(topic: str, session_id: str = "") -> dict[str, Any]: | |
| return api_discover_sources(topic, session_id) | |
| def _auto_search_ingest(topic: str, session_id: str = "") -> dict[str, Any]: | |
| return api_auto_search_ingest(topic, session_id) | |
| def _ingest_sources( | |
| topic: str, | |
| session_id: str = "", | |
| urls_text: str = "", | |
| selected_urls: list[str] | None = None, | |
| file_paths: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| return api_ingest_sources( | |
| topic, session_id, urls_text, selected_urls, file_paths | |
| ) | |
| def _ingest_url(topic: str, url: str, session_id: str = "") -> dict[str, Any]: | |
| return api_ingest_url(topic, url, session_id) | |
| def _research_chat( | |
| question: str, | |
| session_id: str = "", | |
| doc_ids: list[str] | None = None, | |
| history: list[dict[str, str]] | None = None, | |
| ) -> dict[str, Any]: | |
| return api_research_chat(question, session_id, doc_ids, history) | |
| def _debug_chat( | |
| message: str, | |
| history: list[list[str]] | None = None, | |
| use_rag: bool = False, | |
| session_id: str = "", | |
| doc_ids: list[str] | None = None, | |
| model_key: str = "", | |
| workspace_session_id: str = "", | |
| workspace_doc_ids: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| return api_debug_chat( | |
| message, | |
| history, | |
| use_rag, | |
| session_id, | |
| doc_ids, | |
| model_key, | |
| workspace_session_id, | |
| workspace_doc_ids, | |
| ) | |
| def _ingest_files( | |
| topic: str, | |
| session_id: str, | |
| file_paths: list[str], | |
| ) -> dict[str, Any]: | |
| return api_ingest_files(topic, session_id, file_paths) | |
| def _generate_slides( | |
| topic: str, | |
| grade: str = "6", | |
| slide_count: int = 5, | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| doc_ids: list[str] | None = None, | |
| source_mode: str = "", | |
| search_workflow: str = "two_step", | |
| urls_text: str = "", | |
| selected_urls: list[str] | None = None, | |
| file_paths: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| return api_generate_slides( | |
| topic, | |
| grade, | |
| slide_count, | |
| session_id, | |
| use_rag, | |
| doc_ids, | |
| source_mode, | |
| search_workflow, | |
| urls_text, | |
| selected_urls, | |
| file_paths, | |
| ) | |
| def _generate_slides_from_conversation( | |
| history: list | None, | |
| history_kind: str = "gradio", | |
| topic: str = "", | |
| grade: str = "6", | |
| slide_count: int = 5, | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| doc_ids: list[str] | None = None, | |
| source_mode: str = "", | |
| search_workflow: str = "two_step", | |
| urls_text: str = "", | |
| selected_urls: list[str] | None = None, | |
| file_paths: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| return api_generate_slides_from_conversation( | |
| history, | |
| history_kind, | |
| topic, | |
| grade, | |
| slide_count, | |
| session_id, | |
| use_rag, | |
| doc_ids, | |
| source_mode, | |
| search_workflow, | |
| urls_text, | |
| selected_urls, | |
| file_paths, | |
| ) | |
| def _generate_quiz( | |
| topic: str, | |
| grade: str = "6", | |
| question_count: int = 5, | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| doc_ids: list[str] | None = None, | |
| source_mode: str = "", | |
| search_workflow: str = "two_step", | |
| urls_text: str = "", | |
| selected_urls: list[str] | None = None, | |
| file_paths: list[str] | None = None, | |
| ) -> dict[str, Any]: | |
| return api_generate_quiz( | |
| topic, | |
| grade, | |
| question_count, | |
| session_id, | |
| use_rag, | |
| doc_ids, | |
| source_mode, | |
| search_workflow, | |
| urls_text, | |
| selected_urls, | |
| file_paths, | |
| ) | |
| def _language_lesson_turn( | |
| message: str = "", | |
| audio_path: str = "", | |
| mode: Literal["explain", "lesson"] = "lesson", | |
| topic: str = "", | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| history: list | None = None, | |
| doc_ids: list[str] | None = None, | |
| language: str = "en", | |
| asr_preset: str | None = None, | |
| auto_voiceout: bool = True, | |
| coach_model: str = "", | |
| coach_variant: str = "auto", | |
| ) -> dict[str, Any]: | |
| return api_language_lesson_turn( | |
| message, | |
| audio_path, | |
| mode, | |
| topic, | |
| session_id, | |
| use_rag, | |
| history, | |
| doc_ids, | |
| language, | |
| asr_preset, | |
| auto_voiceout, | |
| coach_model, | |
| coach_variant, | |
| ) | |
| def _teacher_voice_turn( | |
| message: str, | |
| mode: Literal["explain", "lesson", "pitch"] = "lesson", | |
| topic: str = "", | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| history: list | None = None, | |
| doc_ids: list[str] | None = None, | |
| language: str = "en", | |
| asr_preset: str | None = None, | |
| auto_voiceout: bool = True, | |
| coach_model: str = "", | |
| coach_variant: str = "auto", | |
| ) -> dict[str, Any]: | |
| return api_teacher_voice_turn( | |
| message, | |
| mode, | |
| topic, | |
| session_id, | |
| use_rag, | |
| history, | |
| doc_ids, | |
| language, | |
| asr_preset, | |
| auto_voiceout, | |
| coach_model, | |
| coach_variant, | |
| ) | |
| def _teacher_voice_audio_turn( | |
| audio_path: str, | |
| mode: Literal["explain", "lesson", "pitch"] = "lesson", | |
| topic: str = "", | |
| session_id: str = "", | |
| use_rag: bool = True, | |
| history: list | None = None, | |
| doc_ids: list[str] | None = None, | |
| language: str = "en", | |
| asr_preset: str | None = None, | |
| auto_voiceout: bool = True, | |
| coach_model: str = "", | |
| coach_variant: str = "auto", | |
| ) -> dict[str, Any]: | |
| return api_teacher_voice_audio_turn( | |
| audio_path, | |
| mode, | |
| topic, | |
| session_id, | |
| use_rag, | |
| history, | |
| doc_ids, | |
| language, | |
| asr_preset, | |
| auto_voiceout, | |
| coach_model, | |
| coach_variant, | |
| ) | |
| def _teacher_voice_clear() -> dict[str, Any]: | |
| return api_teacher_voice_clear() | |
| def _teacher_voice_speak( | |
| history: list | None = None, | |
| language: str = "en", | |
| first_sentence_only: bool = False, | |
| ) -> dict[str, Any]: | |
| return api_teacher_voice_speak(history, language, first_sentence_only) | |
| def _load_sample_pitch() -> dict[str, Any]: | |
| return api_load_sample_pitch() | |
| def _analyze_pitch( | |
| audio_path: str, | |
| language: str = "en", | |
| asr_preset: str | None = None, | |
| speak_rewrite: bool = False, | |
| ) -> dict[str, Any]: | |
| return api_analyze_pitch(audio_path, language, asr_preset, speak_rewrite) | |
| def _model_status() -> dict[str, Any]: | |
| return api_model_status() | |
| def _model_choices() -> dict[str, Any]: | |
| return api_model_choices() | |
| def _set_active_model(model_key: str = "") -> dict[str, Any]: | |
| return api_set_active_model(model_key) | |
| def _reload_model(model_key: str = "") -> dict[str, Any]: | |
| return api_reload_model(model_key) | |
| def _recording_status() -> dict[str, Any]: | |
| return api_recording_status() | |
| def _recording_start(max_seconds: int | None = None) -> dict[str, Any]: | |
| return api_recording_start(max_seconds) | |
| def _recording_stop() -> dict[str, Any]: | |
| return api_recording_stop() | |
| def _voice_presets() -> dict[str, Any]: | |
| return api_voice_presets() | |
| def _save_upload(filename: str, content_base64: str) -> dict[str, Any]: | |
| return api_save_upload(filename, content_base64) | |