msg encrypted ai
Feat/sprint last 2hours (#22)
aac5f23
Raw
History Blame Contribute Delete
47.9 kB
from __future__ import annotations
import base64
import re
import tempfile
from pathlib import Path
from typing import Any, Literal
import gradio as gr
from echocoach.config import get_echo_coach_config
from echocoach.pipeline import run_echo_coach
from echocoach.prompts import TeacherVoiceMode
from echocoach.recording import (
ServerRecordingError,
recording_backend_status,
recording_elapsed_seconds,
recording_level_warning,
start_server_recording,
stop_server_recording,
)
from echocoach.teacher_voice import RAG_MODES, run_teacher_voice_text_turn, run_teacher_voice_turn
from gradio_space.api.serializers import err, ok, unwrap_update, update_value
from gradio_space.model_loading import (
ensure_model_loaded,
get_active_model_key,
model_status,
reload_model,
select_and_reload_model,
)
from gradio_space.research_helpers import (
list_session_choices,
memory_summary,
pick_session_for_topic,
rag_aware_chat,
rag_scope_hint,
resolve_doc_ids,
resolve_session,
)
from gradio_space.conversation_helpers import format_conversation_context
from gradio_space.tabs.education_pptx import SOURCE_MODES, SEARCH_WORKFLOWS, generate_lesson_slides
from gradio_space.tabs.quiz_maker import generate_quiz
from gradio_space.tabs.research_mind import (
ask_question,
auto_search_ingest,
discover_sources,
ingest_selected,
)
from gradio_space.ui.studio_html import (
render_doc_cards,
render_echo_coach_panel,
render_gallery_strip,
render_slide_canvas,
render_trace_details,
)
from gradio_space.voice_helpers import speak_last_assistant_reply
from inference.config import get_app_config, get_model_config
from inference.factory import get_backend
from researchmind.config import get_config as get_research_config
from researchmind.ingest import IngestPipeline
_echo_config = get_echo_coach_config()
_app_config = get_app_config()
_SAMPLE_PITCH_AUDIO = (
Path(__file__).resolve().parents[5]
/ "libs"
/ "echocoach"
/ "tests"
/ "fixtures"
/ "silence_2s.wav"
)
_SOURCE_LABELS = {value: label for label, value in SOURCE_MODES}
_WORKFLOW_LABELS = {value: label for label, value in SEARCH_WORKFLOWS}
class _NoopProgress:
def __call__(self, *args: Any, **kwargs: Any) -> None:
return None
def tqdm(self, iterable: Any, **kwargs: Any) -> Any:
return iterable
def _elapsed_seconds_from_log(processing_log: str) -> float | None:
match = re.search(r"Elapsed:\s*([\d.]+)s", processing_log or "")
if not match:
match = re.search(r"\*\*Elapsed:\*\* ([\d.]+)s", processing_log or "")
if not match:
return None
return float(match.group(1))
def _progress_from_trace(trace_json: str) -> dict[str, Any]:
import json
try:
trace = json.loads(trace_json)
except json.JSONDecodeError:
return {"steps": []}
steps = []
for step in trace.get("steps", []):
if step.get("type") != "step":
continue
duration_ms = step.get("duration_ms")
steps.append(
{
"name": step.get("name"),
"label": step.get("label"),
"detail": step.get("detail", ""),
"duration_s": round(duration_ms / 1000, 1) if duration_ms is not None else None,
"status": "done",
}
)
return {"steps": steps}
def _doc_meta(doc: Any) -> str:
uri = str(doc.uri or "")
if len(uri) > 48:
uri = uri[:45] + "…"
return f"{doc.source_type} · {uri}"
def _documents_payload(session_id: str) -> list[dict[str, Any]]:
store = IngestPipeline().store
docs = store.list_documents(session_id=session_id or None)
return [
{
"id": d.id,
"title": d.title,
"source_type": d.source_type,
"uri": d.uri,
"meta": _doc_meta(d),
}
for d in docs
]
def _session_has_rag_sources(session_id: str, doc_ids: list[str] | None) -> bool:
if not session_id:
return False
docs = _documents_payload(session_id)
if not docs:
return False
if doc_ids:
valid = {d["id"] for d in docs}
return any(doc_id in valid for doc_id in doc_ids)
return True
def _sessions_payload() -> list[dict[str, str]]:
sessions: list[dict[str, str]] = []
for label, sid in list_session_choices():
if sid == "":
continue
topic = label.split(" (")[0] if " (" in label else label
sessions.append({"id": sid, "label": label, "topic": topic})
return sessions
def _pick_session(topic_hint: str = "") -> str:
return pick_session_for_topic(topic_hint)
def _voice_stack_summary() -> str:
asr = _echo_config.get_asr()
tts = _echo_config.get_tts()
lines = [
f"ASR: {asr.label} ({_echo_config.asr_preset})",
f"TTS: {tts.label} ({_echo_config.tts_preset})",
f"Coach model: {_echo_config.coach_model}",
f"Coach fallbacks: {', '.join(_echo_config.coach_fallbacks) or 'none'}",
f"Max recording: {_echo_config.max_seconds}s",
]
return "\n".join(lines)
def _coach_model_key(
coach_model: str | None = None,
*,
language: str = "en",
coach_variant: str = "auto",
) -> str:
if coach_model and coach_model.strip():
key = coach_model.strip()
elif coach_variant and coach_variant not in ("auto", ""):
key = coach_variant.strip()
else:
key = _echo_config.coach_model
if key in ("tiny-aya-water", "tiny-aya-fire", "tiny-aya-earth", "auto"):
key = "tiny-aya-global"
return key
def _coach_model_label(model_key: str) -> str:
try:
return get_model_config(model_key).label
except Exception:
return model_key
def _coach_model_candidates(
coach_model: str | None = None,
*,
language: str = "en",
coach_variant: str = "auto",
) -> list[str]:
if coach_model and coach_model.strip():
return [coach_model.strip()]
primary = _coach_model_key(None, language=language, coach_variant=coach_variant)
chain: list[str] = []
seen: set[str] = set()
for key in (primary, *_echo_config.coach_fallbacks):
if key and key not in seen:
seen.add(key)
chain.append(key)
return chain or [primary]
def _ensure_coach_loaded(
coach_model: str | None = None,
*,
language: str = "en",
coach_variant: str = "auto",
) -> tuple[str, str | None, str | None]:
"""Load the first coach preset that succeeds. Returns (key, error, fallback_note)."""
candidates = _coach_model_candidates(
coach_model,
language=language,
coach_variant=coach_variant,
)
errors: list[str] = []
for index, key in enumerate(candidates):
load_error = ensure_model_loaded(key)
if not load_error:
if index == 0:
return key, None, None
label = _coach_model_label(key)
note = (
f"Primary coach unavailable — using fallback **{label}** (`{key}`). "
"Replies still follow your target language via prompts."
)
return key, None, note
errors.append(load_error)
return candidates[-1], errors[-1], None
def _coach_turn_status(base: str | None, fallback_note: str | None) -> str:
status = (base or "Turn complete.").strip()
if fallback_note:
return f"{fallback_note} {status}".strip()
return status
def _voice_language_codes() -> list[str]:
return [code for _, code in _echo_config.language_choices()]
def _paths_summary() -> str:
rm = get_research_config()
lines = []
if _app_config.presets_path:
lines.append(f"Model presets: {_app_config.presets_path}")
else:
lines.append("Model presets: built-in defaults")
lines.append(f"ResearchMind store: {rm.data_dir.resolve()}")
return "\n".join(lines)
def _resolve_source_labels(
source_mode: str,
search_workflow: str,
use_rag: bool,
session_id: str,
doc_ids: list[str] | None,
) -> tuple[str, str, str, list[str]]:
"""Return source_label, workflow_label, effective_session, effective_docs."""
mode = (source_mode or "").strip().lower()
if not mode:
sid = (session_id or "").strip()
has_sources = _session_has_rag_sources(sid, doc_ids) if use_rag else False
if use_rag and has_sources:
return (
_SOURCE_LABELS["rag"],
_WORKFLOW_LABELS["two_step"],
sid,
doc_ids or [],
)
return _SOURCE_LABELS["none"], _WORKFLOW_LABELS["two_step"], "", []
workflow_key = (search_workflow or "two_step").strip().lower()
if workflow_key not in _WORKFLOW_LABELS:
workflow_key = "two_step"
if mode not in _SOURCE_LABELS:
mode = "none"
sid = (session_id or "").strip()
if mode == "rag" and not sid:
sid = ""
return (
_SOURCE_LABELS[mode],
_WORKFLOW_LABELS[workflow_key],
sid if mode == "rag" else sid,
doc_ids or [] if mode == "rag" else [],
)
def api_list_sessions() -> dict[str, Any]:
return ok(sessions=_sessions_payload())
def api_list_documents(session_id: str = "") -> dict[str, Any]:
docs = _documents_payload(session_id)
html_cards = render_doc_cards(docs, rag_active=bool(docs))
return ok(
session_id=session_id,
documents=docs,
documents_html=html_cards,
memory_markdown=memory_summary(session_id),
)
def api_session_memory(session_id: str = "") -> dict[str, Any]:
return ok(memory_markdown=memory_summary(session_id))
def _ingest_response(
status: str,
session_id: str,
trace_json: str = "",
trace_summary: str = "",
) -> dict[str, Any]:
sid = session_id or ""
docs = _documents_payload(sid)
return ok(
status=status,
session_id=sid,
documents=docs,
documents_html=render_doc_cards(docs, rag_active=bool(docs)),
trace_json=trace_json,
trace_summary=trace_summary,
trace_html=render_trace_details(trace_summary=trace_summary, trace_json=trace_json),
)
def api_discover_sources(topic: str, session_id: str = "") -> dict[str, Any]:
if not (topic or "").strip():
return err("Enter a workspace topic before discovering sources.")
summary, url_up, sess_up, trace_sum, trace_json, _memory, _doc_up, _acc_up = discover_sources(
topic,
session_id,
"",
"",
_NoopProgress(),
)
url_payload = unwrap_update(url_up)
urls = list(url_payload.get("choices") or []) if isinstance(url_payload, dict) else []
selected = list(url_payload.get("value") or urls) if isinstance(url_payload, dict) else urls
sid = update_value(sess_up, session_id)
trace_str = trace_json if isinstance(trace_json, str) else ""
if summary and "error" in summary.lower() and not urls:
return err(strip_md_summary(summary), status=summary, urls=[], session_id=sid)
return ok(
status=summary,
urls=urls,
selected_urls=selected,
session_id=sid,
trace_summary=trace_sum,
trace_json=trace_str,
trace_html=render_trace_details(trace_summary=trace_sum, trace_json=trace_str),
)
def api_auto_search_ingest(topic: str, session_id: str = "") -> dict[str, Any]:
if not (topic or "").strip():
return err("Enter a workspace topic before auto-ingest.")
status, _url_up, sess_up, trace_sum, trace_json, _memory, _doc_up, _acc_up = auto_search_ingest(
topic,
session_id,
"",
"",
_NoopProgress(),
)
sid = update_value(sess_up, session_id)
if status and "error" in status.lower() and "ingested" not in status.lower():
return err(strip_md_summary(status), status=status, session_id=sid)
return _ingest_response(status, sid, trace_json=str(trace_json or ""), trace_summary=trace_sum)
def api_ingest_sources(
topic: str,
session_id: str = "",
urls_text: str = "",
selected_urls: list[str] | None = None,
file_paths: list[str] | None = None,
) -> dict[str, Any]:
has_urls = bool((urls_text or "").strip() or (selected_urls or []))
has_files = bool(file_paths)
if not has_urls and not has_files:
return err("Add URLs, select suggested sources, or upload a file — then ingest.")
status, _memory, trace_json, trace_sum, sess_up, _doc_up = ingest_selected(
topic,
urls_text,
selected_urls or [],
file_paths,
session_id or None,
"",
"",
_NoopProgress(),
)
sid = update_value(sess_up, session_id)
if status and "error" in status.lower() and "ingested" not in status.lower():
return err(strip_md_summary(status), status=status, session_id=sid)
return _ingest_response(status, sid, trace_json=str(trace_json or ""), trace_summary=trace_sum)
def strip_md_summary(text: str) -> str:
return re.sub(r"\*\*", "", str(text or "")).strip()
def api_ingest_url(topic: str, url: str, session_id: str = "") -> dict[str, Any]:
if not url.strip():
return err("Paste a URL to ingest.")
return api_ingest_sources(topic, session_id, urls_text=url.strip())
def api_ingest_files(
topic: str,
session_id: str,
file_paths: list[str],
) -> dict[str, Any]:
if not file_paths:
return err("Upload at least one PDF or DOCX file.")
return api_ingest_sources(topic, session_id, file_paths=file_paths)
def api_research_chat(
question: str,
session_id: str = "",
doc_ids: list[str] | None = None,
history: list[dict[str, str]] | None = None,
) -> dict[str, Any]:
if not question.strip():
return err("Enter a question.")
hist, trace_json, trace_sum, rag_hint, _cleared = ask_question(
question,
session_id,
doc_ids or [],
history or [],
"",
doc_ids or [],
_NoopProgress(),
)
assistant = ""
for msg in reversed(hist or []):
if msg.get("role") == "assistant":
assistant = str(msg.get("content") or "")
break
trace_str = trace_json if isinstance(trace_json, str) else ""
return ok(
history=hist,
assistant=assistant,
rag_hint=rag_hint,
trace_json=trace_str,
trace_summary=trace_sum,
trace_html=render_trace_details(trace_summary=trace_sum, trace_json=trace_str),
)
def api_debug_chat(
message: str,
history: list[list[str]] | None = None,
use_rag: bool = False,
session_id: str = "",
doc_ids: list[str] | None = None,
model_key: str = "",
workspace_session_id: str = "",
workspace_doc_ids: list[str] | None = None,
) -> dict[str, Any]:
if not (message or "").strip():
return err("Enter a message.")
key = (model_key or "").strip() or get_active_model_key()
load_error = ensure_model_loaded(key)
if load_error:
return err(load_error)
sid = resolve_session(session_id, workspace_session_id)
docs = resolve_doc_ids(doc_ids, workspace_doc_ids)
hist = history or []
reply, trace_json, trace_summary = rag_aware_chat(
message.strip(),
hist,
key,
use_rag,
sid,
docs,
)
new_history = list(hist)
new_history.append([message.strip(), reply])
return ok(
history=new_history,
assistant=reply,
rag_hint=rag_scope_hint(sid, docs),
trace_json=trace_json,
trace_summary=trace_summary,
trace_html=render_trace_details(trace_summary=trace_summary, trace_json=trace_json),
)
def _build_slide_api_response(
last: tuple,
*,
topic: str,
sid: str,
rag_notice: str = "",
) -> dict[str, Any]:
(
outline_md,
preview_html,
gallery,
pptx,
docx,
html_export,
processing_log,
trace_sum,
trace_json,
status,
) = last
if preview_html and "form-error" in preview_html:
return err(status or "Generation failed.", status=status, progress_log=processing_log)
if rag_notice:
status = f"{rag_notice}\n\n{status or 'Slides generated.'}".strip()
downloads = {
"pptx": pptx,
"docx": docx,
"html": html_export,
}
trace_str = trace_json if isinstance(trace_json, str) else ""
return ok(
topic=topic,
session_id=sid,
outline_md=outline_md,
preview_html=preview_html,
canvas_html=render_slide_canvas(preview_html),
gallery=gallery or [],
gallery_html=render_gallery_strip(gallery or []),
downloads=downloads,
status=status,
rag_fallback=bool(rag_notice),
progress_log=processing_log,
trace_summary=trace_sum,
trace_json=trace_str,
trace_html=render_trace_details(
trace_summary=trace_sum,
trace_json=trace_str,
progress_log=processing_log,
),
elapsed_seconds=_elapsed_seconds_from_log(processing_log),
progress=_progress_from_trace(trace_str),
)
def _run_slide_generation(**kwargs) -> dict[str, Any]:
topic = kwargs.pop("topic")
sid = kwargs.pop("sid", "")
rag_notice = kwargs.pop("rag_notice", "")
gen = generate_lesson_slides(topic, **kwargs)
last: tuple | None = None
for item in gen:
last = item
if last is None:
return err("Generation failed before producing output.")
return _build_slide_api_response(last, topic=topic, sid=sid, rag_notice=rag_notice)
def api_generate_slides(
topic: str,
grade: str = "6",
slide_count: int = 5,
session_id: str = "",
use_rag: bool = True,
doc_ids: list[str] | None = None,
source_mode: str = "",
search_workflow: str = "two_step",
urls_text: str = "",
selected_urls: list[str] | None = None,
file_paths: list[str] | None = None,
) -> dict[str, Any]:
rag_docs = doc_ids or []
sid = (session_id or "").strip()
if not (source_mode or "").strip() and use_rag and not sid:
sid = _pick_session(topic)
source_label, workflow_label, effective_sid, effective_docs = _resolve_source_labels(
source_mode,
search_workflow,
use_rag,
sid,
rag_docs,
)
rag_notice = ""
if (source_mode or "").strip().lower() == "rag" or (
not (source_mode or "").strip() and use_rag
):
has_sources = _session_has_rag_sources(sid, rag_docs)
if use_rag and not has_sources and source_label == _SOURCE_LABELS["rag"]:
rag_notice = (
"Cross-Reference Sources is on, but this session has no indexed documents — "
"generated from model knowledge only. Ingest sources in Step 1 to enable RAG."
)
source_label = _SOURCE_LABELS["none"]
effective_sid = ""
effective_docs = []
upload_files = file_paths if file_paths else None
return _run_slide_generation(
topic=topic,
sid=sid,
rag_notice=rag_notice,
grade=grade,
slide_count=int(slide_count),
source_mode_label=source_label,
search_workflow_label=workflow_label,
urls_text=urls_text or "",
selected_urls=selected_urls or [],
upload_files=upload_files,
session_id=effective_sid,
doc_ids=effective_docs,
workspace_topic=topic,
workspace_session=effective_sid,
workspace_doc_ids=effective_docs,
progress=_NoopProgress(),
skip_preview_images=False,
)
def _build_quiz_api_response(
last: tuple,
*,
topic: str,
sid: str,
rag_notice: str = "",
) -> dict[str, Any]:
(
outline_md,
preview_html,
docx,
html_export,
processing_log,
trace_sum,
trace_json,
status,
) = last
if preview_html and "form-error" in preview_html:
return err(status or "Generation failed.", status=status, progress_log=processing_log)
if rag_notice:
status = f"{rag_notice}\n\n{status or 'Quiz generated.'}".strip()
downloads = {
"docx": docx,
"html": html_export,
}
trace_str = trace_json if isinstance(trace_json, str) else ""
return ok(
topic=topic,
session_id=sid,
outline_md=outline_md,
preview_html=preview_html,
downloads=downloads,
status=status,
rag_fallback=bool(rag_notice),
progress_log=processing_log,
trace_summary=trace_sum,
trace_json=trace_str,
trace_html=render_trace_details(
trace_summary=trace_sum,
trace_json=trace_str,
progress_log=processing_log,
),
elapsed_seconds=_elapsed_seconds_from_log(processing_log),
progress=_progress_from_trace(trace_str),
)
def _run_quiz_generation(**kwargs) -> dict[str, Any]:
topic = kwargs.pop("topic")
sid = kwargs.pop("sid", "")
rag_notice = kwargs.pop("rag_notice", "")
gen = generate_quiz(topic, **kwargs)
last: tuple | None = None
for item in gen:
last = item
if last is None:
return err("Generation failed before producing output.")
return _build_quiz_api_response(last, topic=topic, sid=sid, rag_notice=rag_notice)
def api_generate_quiz(
topic: str,
grade: str = "6",
question_count: int = 5,
session_id: str = "",
use_rag: bool = True,
doc_ids: list[str] | None = None,
source_mode: str = "",
search_workflow: str = "two_step",
urls_text: str = "",
selected_urls: list[str] | None = None,
file_paths: list[str] | None = None,
) -> dict[str, Any]:
rag_docs = doc_ids or []
sid = (session_id or "").strip()
if not (source_mode or "").strip() and use_rag and not sid:
sid = _pick_session(topic)
source_label, workflow_label, effective_sid, effective_docs = _resolve_source_labels(
source_mode,
search_workflow,
use_rag,
sid,
rag_docs,
)
rag_notice = ""
if (source_mode or "").strip().lower() == "rag" or (
not (source_mode or "").strip() and use_rag
):
has_sources = _session_has_rag_sources(sid, rag_docs)
if use_rag and not has_sources and source_label == _SOURCE_LABELS["rag"]:
rag_notice = (
"Cross-Reference Sources is on, but this session has no indexed documents — "
"generated from model knowledge only. Ingest sources in Step 1 to enable RAG."
)
source_label = _SOURCE_LABELS["none"]
effective_sid = ""
effective_docs = []
upload_files = file_paths if file_paths else None
return _run_quiz_generation(
topic=topic,
sid=sid,
rag_notice=rag_notice,
grade=grade,
question_count=int(question_count),
source_mode_label=source_label,
search_workflow_label=workflow_label,
urls_text=urls_text or "",
selected_urls=selected_urls or [],
upload_files=upload_files,
session_id=effective_sid,
doc_ids=effective_docs,
workspace_topic=topic,
workspace_session=effective_sid,
workspace_doc_ids=effective_docs,
progress=_NoopProgress(),
)
def api_generate_slides_from_conversation(
history: list | None,
history_kind: str,
topic: str,
grade: str = "6",
slide_count: int = 5,
session_id: str = "",
use_rag: bool = True,
doc_ids: list[str] | None = None,
source_mode: str = "",
search_workflow: str = "two_step",
urls_text: str = "",
selected_urls: list[str] | None = None,
file_paths: list[str] | None = None,
) -> dict[str, Any]:
conversation_text, derived_topic = format_conversation_context(history, history_kind)
if not conversation_text.strip():
return err("Start a conversation first.")
effective_topic = (topic or "").strip() or derived_topic
if not effective_topic:
return err("Enter a topic or chat about a lesson first.")
rag_docs = doc_ids or []
sid = (session_id or "").strip()
if not (source_mode or "").strip() and use_rag and not sid:
sid = _pick_session(effective_topic)
source_label, workflow_label, effective_sid, effective_docs = _resolve_source_labels(
source_mode,
search_workflow,
use_rag,
sid,
rag_docs,
)
rag_notice = ""
if (source_mode or "").strip().lower() == "rag" or (
not (source_mode or "").strip() and use_rag
):
has_sources = _session_has_rag_sources(sid, rag_docs)
if use_rag and not has_sources and source_label == _SOURCE_LABELS["rag"]:
rag_notice = (
"Cross-Reference Sources is on, but this session has no indexed documents — "
"generated from model knowledge only. Ingest sources in Step 1 to enable RAG."
)
source_label = _SOURCE_LABELS["none"]
effective_sid = ""
effective_docs = []
upload_files = file_paths if file_paths else None
return _run_slide_generation(
topic=effective_topic,
sid=sid,
rag_notice=rag_notice,
grade=grade,
slide_count=int(slide_count),
source_mode_label=source_label,
search_workflow_label=workflow_label,
urls_text=urls_text or "",
selected_urls=selected_urls or [],
upload_files=upload_files,
session_id=effective_sid,
doc_ids=effective_docs,
workspace_topic=effective_topic,
workspace_session=effective_sid,
workspace_doc_ids=effective_docs,
progress=_NoopProgress(),
skip_preview_images=False,
conversation_context=conversation_text,
conversation_topic=derived_topic,
)
def api_teacher_voice_turn(
message: str,
mode: TeacherVoiceMode = "lesson",
topic: str = "",
session_id: str = "",
use_rag: bool = True,
history: list | None = None,
doc_ids: list[str] | None = None,
language: str = "en",
asr_preset: str | None = None,
auto_voiceout: bool = True,
coach_model: str = "",
coach_variant: str = "auto",
) -> dict[str, Any]:
model_key, load_error, fallback_note = _ensure_coach_loaded(
coach_model or None,
language=language,
coach_variant=coach_variant,
)
if load_error:
return err(load_error)
if not message.strip():
return err("Enter a message or record audio first.")
hist = history or []
try:
result = run_teacher_voice_text_turn(
message.strip(),
hist,
mode=mode,
language=language,
topic=topic.strip() or None,
backend=get_backend(model_key),
coach_model=model_key,
use_rag=use_rag and mode in RAG_MODES,
session_id=session_id or None,
doc_ids=doc_ids or None,
auto_voiceout=auto_voiceout,
)
except Exception as exc: # noqa: BLE001
return err(str(exc))
return ok(
history=result.history,
assistant=result.assistant_text,
status=_coach_turn_status(result.rag_status, fallback_note),
voiceout_path=result.voiceout_path,
voiceout_warning=result.voiceout_warning,
rag_references=result.rag_references,
coach_model=model_key,
coach_fallback=bool(fallback_note),
)
def api_teacher_voice_audio_turn(
audio_path: str,
mode: TeacherVoiceMode = "lesson",
topic: str = "",
session_id: str = "",
use_rag: bool = True,
history: list | None = None,
doc_ids: list[str] | None = None,
language: str = "en",
asr_preset: str | None = None,
auto_voiceout: bool = True,
coach_model: str = "",
coach_variant: str = "auto",
) -> dict[str, Any]:
model_key, load_error, fallback_note = _ensure_coach_loaded(
coach_model or None,
language=language,
coach_variant=coach_variant,
)
if load_error:
return err(load_error)
if not audio_path or not Path(audio_path).is_file():
return err("Record or upload audio first.")
hist = history or []
preset = asr_preset or _echo_config.asr_preset
max_turn = min(15, _echo_config.max_seconds)
try:
result = run_teacher_voice_turn(
audio_path,
hist,
mode=mode,
language=language,
asr_preset=preset,
topic=topic.strip() or None,
backend=get_backend(model_key),
coach_model=model_key,
use_rag=use_rag and mode in RAG_MODES,
session_id=session_id or None,
doc_ids=doc_ids or None,
max_turn_seconds=max_turn,
auto_voiceout=auto_voiceout,
)
except Exception as exc: # noqa: BLE001
return err(str(exc))
return ok(
history=result.history,
assistant=result.assistant_text,
status=_coach_turn_status(result.rag_status, fallback_note),
voiceout_path=result.voiceout_path,
voiceout_warning=result.voiceout_warning,
user_text=result.user_text,
rag_references=result.rag_references,
coach_model=model_key,
coach_fallback=bool(fallback_note),
)
def api_language_lesson_turn(
message: str = "",
audio_path: str = "",
mode: TeacherVoiceMode = "lesson",
topic: str = "",
session_id: str = "",
use_rag: bool = True,
history: list | None = None,
doc_ids: list[str] | None = None,
language: str = "en",
asr_preset: str | None = None,
auto_voiceout: bool = True,
coach_model: str = "",
coach_variant: str = "auto",
) -> dict[str, Any]:
"""Unified Language lessons turn — routes to text or audio pipeline."""
if audio_path and audio_path.strip():
return api_teacher_voice_audio_turn(
audio_path.strip(),
mode=mode,
topic=topic,
session_id=session_id,
use_rag=use_rag,
history=history,
doc_ids=doc_ids,
language=language,
asr_preset=asr_preset,
auto_voiceout=auto_voiceout,
coach_model=coach_model,
coach_variant=coach_variant,
)
return api_teacher_voice_turn(
message,
mode=mode,
topic=topic,
session_id=session_id,
use_rag=use_rag,
history=history,
doc_ids=doc_ids,
language=language,
asr_preset=asr_preset,
auto_voiceout=auto_voiceout,
coach_model=coach_model,
coach_variant=coach_variant,
)
def api_teacher_voice_clear() -> dict[str, Any]:
return ok(
history=[],
assistant="",
status="Conversation cleared.",
)
def api_teacher_voice_speak(
history: list | None = None,
language: str = "en",
first_sentence_only: bool = False,
) -> dict[str, Any]:
playback, status = speak_last_assistant_reply(
history or [],
language,
first_sentence_only=first_sentence_only,
)
if not playback:
return err(status)
return ok(voiceout_path=playback, status=status)
def api_load_sample_pitch() -> dict[str, Any]:
if not _SAMPLE_PITCH_AUDIO.is_file():
return err(
f"Sample clip missing at `{_SAMPLE_PITCH_AUDIO}`. "
"Run `uv run python libs/echocoach/tests/make_fixture.py`."
)
return ok(
audio_path=str(_SAMPLE_PITCH_AUDIO),
status="Sample clip loaded — click Analyze pitch when ready.",
)
def api_analyze_pitch(
audio_path: str,
language: str = "en",
asr_preset: str | None = None,
speak_rewrite: bool = False,
) -> dict[str, Any]:
model_key, load_error, _fallback_note = _ensure_coach_loaded(None, language=language)
if load_error:
return err(load_error)
if not audio_path or not Path(audio_path).is_file():
return err("Record or upload audio before analyzing.")
preset = asr_preset or _echo_config.asr_preset
try:
result = run_echo_coach(
audio_path,
language=language,
asr_preset=preset,
coach_model=model_key,
backend=get_backend(model_key),
speak_rewrite=speak_rewrite,
)
except Exception as exc: # noqa: BLE001
return err(str(exc))
panel = render_echo_coach_panel(
pace_score=result.pace.score,
wpm=result.pace.wpm,
tip=result.coach.one_tip,
report_md=result.report_markdown,
transcript_html=result.transcript_html,
filler_chart=result.filler_chart_path,
pace_chart=result.pace_chart_path,
voiceout_path=result.voiceout_path,
)
return ok(
transcript_html=result.transcript_html,
report_md=result.report_markdown,
pace_score=result.pace.score,
wpm=result.pace.wpm,
tip=result.coach.one_tip,
filler_chart=result.filler_chart_path,
pace_chart=result.pace_chart_path,
voiceout_path=result.voiceout_path,
coach_panel_html=panel,
)
def api_model_status() -> dict[str, Any]:
key = get_active_model_key()
status_md = model_status(key)
return ok(model_key=key, status_markdown=status_md)
def api_model_choices() -> dict[str, Any]:
key = get_active_model_key()
active = _app_config.get_model(key)
allow_switch = bool(
_app_config.allow_model_switch and len(_app_config.models) > 1
)
choices = []
if allow_switch:
choices = [{"key": k, "label": label} for label, k in _app_config.model_choices()]
return ok(
active_model=key,
active_label=active.label,
active_backend=active.backend,
allow_model_switch=allow_switch,
choices=choices,
voice_stack=_voice_stack_summary(),
paths=_paths_summary(),
)
def api_set_active_model(model_key: str = "") -> dict[str, Any]:
key = (model_key or "").strip() or get_active_model_key()
try:
status_md = select_and_reload_model(key)
except KeyError as exc:
return err(str(exc), model_key=key)
if status_md.lower().startswith("error") or "failed" in status_md.lower():
return err(status_md, status_markdown=status_md, model_key=key)
return ok(status_markdown=status_md, model_key=key)
def api_reload_model(model_key: str = "") -> dict[str, Any]:
key = (model_key or "").strip() or get_active_model_key()
status_md = reload_model(key)
if status_md.lower().startswith("error") or "failed" in status_md.lower():
return err(status_md, status_markdown=status_md, model_key=key)
return ok(status_markdown=status_md, model_key=key)
def api_recording_status() -> dict[str, Any]:
status = recording_backend_status()
return ok(
backend=status,
message=status,
max_seconds=_echo_config.max_seconds,
)
def api_recording_start(max_seconds: int | None = None) -> dict[str, Any]:
limit = int(max_seconds or _echo_config.max_seconds)
try:
start_server_recording(limit)
except ServerRecordingError as exc:
return err(str(exc))
return ok(
status=f"Recording… speak now, then stop (auto-stops after {limit}s).",
max_seconds=limit,
)
def api_recording_stop() -> dict[str, Any]:
try:
elapsed = recording_elapsed_seconds()
path = stop_server_recording()
warning = recording_level_warning(path)
except ServerRecordingError as exc:
return err(str(exc))
except Exception as exc: # noqa: BLE001
return err(f"Recording failed: {exc}")
status = f"Recording saved ({elapsed:.1f}s)."
if warning:
status += f" Warning: {warning}"
return ok(path=str(path), elapsed_seconds=elapsed, status=status, warning=warning or "")
def api_voice_presets() -> dict[str, Any]:
tts = _echo_config.get_tts()
voice_langs = _voice_language_codes()
coach_chain = _echo_config.coach_model_chain()
coach_chain_labels = [_coach_model_label(key) for key in coach_chain]
fallback_label = coach_chain_labels[1] if len(coach_chain_labels) > 1 else None
return ok(
languages=[{"label": label, "value": value} for label, value in _echo_config.language_choices()],
asr_presets=[{"label": label, "value": value} for label, value in _echo_config.asr_choices()],
coach_variants=[
{"label": "Tiny Aya Global (70+ languages)", "value": "tiny-aya-global"},
],
default_language=_echo_config.language_choices()[0][1] if _echo_config.language_choices() else "en",
default_asr=_echo_config.asr_preset,
default_coach=_echo_config.coach_model,
coach_fallbacks=list(_echo_config.coach_fallbacks),
coach_chain=coach_chain,
coach_chain_labels=coach_chain_labels,
voice_languages=voice_langs,
max_seconds=_echo_config.max_seconds,
voiceout_note=(
f"Voice in/out: {len(voice_langs)} languages via Piper · "
f"Coach: {coach_chain_labels[0]}"
+ (f" (fallback: {fallback_label})" if fallback_label else "")
),
)
def api_save_upload(filename: str, content_base64: str) -> dict[str, Any]:
"""Save uploaded file bytes to a temp path for downstream ingest/analyze."""
if not content_base64:
return err("Empty upload.")
try:
raw = base64.b64decode(content_base64)
except Exception as exc: # noqa: BLE001
return err(f"Invalid upload encoding: {exc}")
suffix = Path(filename or "upload.bin").suffix or ".bin"
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix, prefix="studio_")
tmp.write(raw)
tmp.close()
return ok(path=tmp.name, filename=filename)
def register_studio_apis(server: gr.Server) -> None:
"""Register Studio JSON APIs on a gradio.Server instance."""
@server.api(name="list_sessions")
def _list_sessions() -> dict[str, Any]:
return api_list_sessions()
@server.api(name="list_documents")
def _list_documents(session_id: str = "") -> dict[str, Any]:
return api_list_documents(session_id)
@server.api(name="session_memory")
def _session_memory(session_id: str = "") -> dict[str, Any]:
return api_session_memory(session_id)
@server.api(name="discover_sources")
def _discover_sources(topic: str, session_id: str = "") -> dict[str, Any]:
return api_discover_sources(topic, session_id)
@server.api(name="auto_search_ingest")
def _auto_search_ingest(topic: str, session_id: str = "") -> dict[str, Any]:
return api_auto_search_ingest(topic, session_id)
@server.api(name="ingest_sources")
def _ingest_sources(
topic: str,
session_id: str = "",
urls_text: str = "",
selected_urls: list[str] | None = None,
file_paths: list[str] | None = None,
) -> dict[str, Any]:
return api_ingest_sources(
topic, session_id, urls_text, selected_urls, file_paths
)
@server.api(name="ingest_url")
def _ingest_url(topic: str, url: str, session_id: str = "") -> dict[str, Any]:
return api_ingest_url(topic, url, session_id)
@server.api(name="research_chat")
def _research_chat(
question: str,
session_id: str = "",
doc_ids: list[str] | None = None,
history: list[dict[str, str]] | None = None,
) -> dict[str, Any]:
return api_research_chat(question, session_id, doc_ids, history)
@server.api(name="debug_chat")
def _debug_chat(
message: str,
history: list[list[str]] | None = None,
use_rag: bool = False,
session_id: str = "",
doc_ids: list[str] | None = None,
model_key: str = "",
workspace_session_id: str = "",
workspace_doc_ids: list[str] | None = None,
) -> dict[str, Any]:
return api_debug_chat(
message,
history,
use_rag,
session_id,
doc_ids,
model_key,
workspace_session_id,
workspace_doc_ids,
)
@server.api(name="ingest_files")
def _ingest_files(
topic: str,
session_id: str,
file_paths: list[str],
) -> dict[str, Any]:
return api_ingest_files(topic, session_id, file_paths)
@server.api(name="generate_slides")
def _generate_slides(
topic: str,
grade: str = "6",
slide_count: int = 5,
session_id: str = "",
use_rag: bool = True,
doc_ids: list[str] | None = None,
source_mode: str = "",
search_workflow: str = "two_step",
urls_text: str = "",
selected_urls: list[str] | None = None,
file_paths: list[str] | None = None,
) -> dict[str, Any]:
return api_generate_slides(
topic,
grade,
slide_count,
session_id,
use_rag,
doc_ids,
source_mode,
search_workflow,
urls_text,
selected_urls,
file_paths,
)
@server.api(name="generate_slides_from_conversation")
def _generate_slides_from_conversation(
history: list | None,
history_kind: str = "gradio",
topic: str = "",
grade: str = "6",
slide_count: int = 5,
session_id: str = "",
use_rag: bool = True,
doc_ids: list[str] | None = None,
source_mode: str = "",
search_workflow: str = "two_step",
urls_text: str = "",
selected_urls: list[str] | None = None,
file_paths: list[str] | None = None,
) -> dict[str, Any]:
return api_generate_slides_from_conversation(
history,
history_kind,
topic,
grade,
slide_count,
session_id,
use_rag,
doc_ids,
source_mode,
search_workflow,
urls_text,
selected_urls,
file_paths,
)
@server.api(name="generate_quiz")
def _generate_quiz(
topic: str,
grade: str = "6",
question_count: int = 5,
session_id: str = "",
use_rag: bool = True,
doc_ids: list[str] | None = None,
source_mode: str = "",
search_workflow: str = "two_step",
urls_text: str = "",
selected_urls: list[str] | None = None,
file_paths: list[str] | None = None,
) -> dict[str, Any]:
return api_generate_quiz(
topic,
grade,
question_count,
session_id,
use_rag,
doc_ids,
source_mode,
search_workflow,
urls_text,
selected_urls,
file_paths,
)
@server.api(name="language_lesson_turn")
def _language_lesson_turn(
message: str = "",
audio_path: str = "",
mode: Literal["explain", "lesson"] = "lesson",
topic: str = "",
session_id: str = "",
use_rag: bool = True,
history: list | None = None,
doc_ids: list[str] | None = None,
language: str = "en",
asr_preset: str | None = None,
auto_voiceout: bool = True,
coach_model: str = "",
coach_variant: str = "auto",
) -> dict[str, Any]:
return api_language_lesson_turn(
message,
audio_path,
mode,
topic,
session_id,
use_rag,
history,
doc_ids,
language,
asr_preset,
auto_voiceout,
coach_model,
coach_variant,
)
@server.api(name="teacher_voice_turn")
def _teacher_voice_turn(
message: str,
mode: Literal["explain", "lesson", "pitch"] = "lesson",
topic: str = "",
session_id: str = "",
use_rag: bool = True,
history: list | None = None,
doc_ids: list[str] | None = None,
language: str = "en",
asr_preset: str | None = None,
auto_voiceout: bool = True,
coach_model: str = "",
coach_variant: str = "auto",
) -> dict[str, Any]:
return api_teacher_voice_turn(
message,
mode,
topic,
session_id,
use_rag,
history,
doc_ids,
language,
asr_preset,
auto_voiceout,
coach_model,
coach_variant,
)
@server.api(name="teacher_voice_audio_turn")
def _teacher_voice_audio_turn(
audio_path: str,
mode: Literal["explain", "lesson", "pitch"] = "lesson",
topic: str = "",
session_id: str = "",
use_rag: bool = True,
history: list | None = None,
doc_ids: list[str] | None = None,
language: str = "en",
asr_preset: str | None = None,
auto_voiceout: bool = True,
coach_model: str = "",
coach_variant: str = "auto",
) -> dict[str, Any]:
return api_teacher_voice_audio_turn(
audio_path,
mode,
topic,
session_id,
use_rag,
history,
doc_ids,
language,
asr_preset,
auto_voiceout,
coach_model,
coach_variant,
)
@server.api(name="teacher_voice_clear")
def _teacher_voice_clear() -> dict[str, Any]:
return api_teacher_voice_clear()
@server.api(name="teacher_voice_speak")
def _teacher_voice_speak(
history: list | None = None,
language: str = "en",
first_sentence_only: bool = False,
) -> dict[str, Any]:
return api_teacher_voice_speak(history, language, first_sentence_only)
@server.api(name="load_sample_pitch")
def _load_sample_pitch() -> dict[str, Any]:
return api_load_sample_pitch()
@server.api(name="analyze_pitch")
def _analyze_pitch(
audio_path: str,
language: str = "en",
asr_preset: str | None = None,
speak_rewrite: bool = False,
) -> dict[str, Any]:
return api_analyze_pitch(audio_path, language, asr_preset, speak_rewrite)
@server.api(name="model_status")
def _model_status() -> dict[str, Any]:
return api_model_status()
@server.api(name="model_choices")
def _model_choices() -> dict[str, Any]:
return api_model_choices()
@server.api(name="set_active_model")
def _set_active_model(model_key: str = "") -> dict[str, Any]:
return api_set_active_model(model_key)
@server.api(name="reload_model")
def _reload_model(model_key: str = "") -> dict[str, Any]:
return api_reload_model(model_key)
@server.api(name="recording_status")
def _recording_status() -> dict[str, Any]:
return api_recording_status()
@server.api(name="recording_start")
def _recording_start(max_seconds: int | None = None) -> dict[str, Any]:
return api_recording_start(max_seconds)
@server.api(name="recording_stop")
def _recording_stop() -> dict[str, Any]:
return api_recording_stop()
@server.api(name="voice_presets")
def _voice_presets() -> dict[str, Any]:
return api_voice_presets()
@server.api(name="save_upload")
def _save_upload(filename: str, content_base64: str) -> dict[str, Any]:
return api_save_upload(filename, content_base64)