Sophia-AI-English-Tutor / src /core /speaking_tutor.py
rcventura108000's picture
feat: new improvements UX
b82482b
import base64
import logging
import time
import os
import gradio as gr
from typing import Any, Dict, Generator, List, Optional, Tuple
from src.core.base_tutor import BaseTutor
from src.utils.audio import (
extract_audio_from_response,
extract_text_from_response,
get_audio_duration,
save_audio_to_temp_file,
)
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)

# When this logger has no handlers of its own, fall back to a basic root
# configuration so that INFO-level messages are emitted with a timestamp,
# logger name and level.
if not _logger.handlers:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
class SpeakingTutor(BaseTutor):
    """Speaking-mode tutor: transcribes user audio and answers with audio and text.

    The class reads ``self.tutor_parent`` (not assigned anywhere in this class)
    and uses its ``openai_service`` and ``get_system_message`` members. A running
    conversation summary is kept on the instance as ``_running_summary``.
    """

    # Message shown in the chat when no OpenAI service is configured.
    _NO_API_KEY_MESSAGE = "⚠️ No valid OpenAI API key set. Please enter your API key in the settings."
    # Recordings smaller than this are treated as empty (1 KB).
    _MIN_AUDIO_SIZE_BYTES = 1024

    def process_input(
        self,
        input_data: Optional[str],
        history: Optional[List[Dict[str, Any]]],
        level: Optional[str] = None,
        speaking_mode: Optional[str] = None,
    ) -> Generator[Tuple[List[Dict[str, Any]], List[Dict[str, Any]]], None, None]:
        """
        Processes user audio input synchronously, fulfilling the BaseTutor contract.

        Runs transcription, then drains ``handle_bot_response`` to completion and
        yields a single ``(history, history)`` pair holding the final history.
        The Gradio UI uses handle_transcription and handle_bot_response for a better UX.

        Args:
            input_data: Path to the recorded audio file.
            history: Chat history as a list of message dicts, or None.
            level: Learner level forwarded to the system prompt.
            speaking_mode: Speaking mode (e.g. "Immersive") forwarded to both steps.
        """
        _logger.info(
            "SpeakingTutor.process_input (synchronous): Start. audio_file_path='%s', level='%s'",
            input_data,
            level,
        )
        # handle_transcription returns the same list twice (chat + state); keep one.
        updated_history, _ = self.handle_transcription(
            history=history,
            audio_filepath=input_data,
            level=level,
            speaking_mode=speaking_mode,
        )

        # No new user message means transcription was skipped or failed; there is
        # nothing for the bot to answer, so return what we have.
        if not updated_history or updated_history[-1].get("role") != "user":
            _logger.info("SpeakingTutor.process_input (synchronous): No user message to answer.")
            yield updated_history, updated_history
            return

        # handle_bot_response is a generator of intermediate UI states; only the
        # last yielded history matters for a synchronous caller.
        final_history = updated_history
        for bot_history, _state, _audio_path in self.handle_bot_response(
            history=updated_history, level=level, speaking_mode=speaking_mode
        ):
            final_history = bot_history

        _logger.info("SpeakingTutor.process_input (synchronous): Finished. Yielding final history.")
        yield final_history, final_history

    def handle_transcription(
        self,
        history: Optional[List[Dict[str, Any]]],
        audio_filepath: Optional[str] = None,
        level: Optional[str] = None,
        speaking_mode: Optional[str] = None,
    ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Transcribes user audio, adds it to history, and returns the updated history.

        Args:
            history: Chat history as a list of message dicts, or None.
            audio_filepath: Path of the recorded audio file.
            level: Unused here; accepted for signature parity with the other handlers.
            speaking_mode: When "Immersive", the user message carries the audio file
                as content and the transcription under ``text_for_llm``.

        Returns:
            The updated history twice (same list object in both positions).
            On failure an assistant message describing the problem is appended
            instead of a user message; a missing file leaves the history unchanged.
        """
        current_history = list(history) if history else []

        service = self.tutor_parent.openai_service
        if not service:
            current_history.append({"role": "assistant", "content": self._NO_API_KEY_MESSAGE})
            return current_history, current_history

        # Ask for the size directly instead of exists() + getsize(): a file that
        # vanishes between the two calls is then handled like a missing file.
        try:
            audio_size = os.path.getsize(audio_filepath) if audio_filepath else None
        except OSError:
            audio_size = None
        if audio_size is None:
            _logger.warning("Audio file not provided or does not exist.")
            return current_history, current_history

        # Check if the audio file is too small (likely an empty recording)
        if audio_size < self._MIN_AUDIO_SIZE_BYTES:
            _logger.error("Audio file at %s is too small, likely an empty recording.", audio_filepath)
            current_history.append(
                {
                    "role": "assistant",
                    "content": "It seems the audio was empty. Please try recording again.",
                }
            )
            return current_history, current_history

        try:
            transcription = service.transcribe_audio(audio_filepath)
        except Exception as e:
            _logger.error("Error transcribing audio: %s", e, exc_info=True)
            current_history.append({"role": "assistant", "content": f"Error transcribing audio: {str(e)}"})
            return current_history, current_history

        if speaking_mode == "Immersive":
            user_message = {"role": "user", "content": (audio_filepath, None), "text_for_llm": transcription}
        else:
            user_message = {"role": "user", "content": transcription}
        current_history.append(user_message)
        return current_history, current_history

    def handle_bot_response(
        self,
        history: Optional[List[Dict[str, Any]]],
        level: Optional[str] = None,
        speaking_mode: Optional[str] = None,
    ) -> Generator[Tuple[List[Dict[str, Any]], List[Dict[str, Any]], Optional[str]], None, None]:
        """
        Gets bot response, yields audio for immediate playback, waits for it to finish,
        then yields the updated chat history with the bot's text.

        Each yielded item is ``(history, history, audio_path_or_None)``.

        Flow:
            1. Ask the multimodal endpoint for text + audio (with retries).
            2. If no text came back, try a text-only completion.
            3. If text exists but audio does not, try text-to-speech.
            4. Immersive mode: append an audio message and yield once.
               Otherwise: yield the audio, sleep for its duration, then stream
               the text word by word into a new assistant message.

        Args:
            history: Chat history; its last message must have role "user".
            level: Learner level forwarded to the system prompt.
            speaking_mode: "Immersive" selects the audio-only chat message path.
        """
        current_history = list(history) if history else []

        if not self.tutor_parent.openai_service:
            # Report the problem as a chat message (as handle_transcription does)
            # and keep the existing history. Skip the append when the warning is
            # already the last message so chained handlers do not duplicate it.
            last_message = current_history[-1] if current_history else {}
            if last_message.get("content") != self._NO_API_KEY_MESSAGE:
                current_history.append({"role": "assistant", "content": self._NO_API_KEY_MESSAGE})
            yield current_history, current_history, None
            return

        _logger.info("handle_bot_response: Start. History has %d messages.", len(current_history))
        if not current_history or current_history[-1].get("role") != "user":
            _logger.warning("handle_bot_response called with invalid history state. Aborting.")
            yield current_history, current_history, None
            return

        messages_for_llm = self._build_llm_messages(current_history, level)
        # Captured now; no user message is added to the history after this point.
        last_user_text = self._last_user_text(current_history)

        try:
            # Per-mode max_tokens configuration
            default_max_tokens = self._env_int("SPEAKING_MAX_TOKENS_DEFAULT", 700, minimum=1)
            mode_env = (
                "SPEAKING_MAX_TOKENS_IMMERSIVE" if speaking_mode == "Immersive" else "SPEAKING_MAX_TOKENS_HYBRID"
            )
            max_tokens = self._env_int(mode_env, default_max_tokens, minimum=1)
            _logger.info("Using max_tokens=%d for mode=%s", max_tokens, speaking_mode or "hybrid")

            bot_text_response, audio_base64_data = self._request_multimodal(messages_for_llm, max_tokens)

            # If multimodal did not yield text at all, try text-only fallback
            if not bot_text_response:
                bot_text_response = self._text_only_fallback(messages_for_llm, max_tokens)

            # --- Fallback to TTS if no audio is returned ---
            if bot_text_response and not audio_base64_data:
                audio_base64_data = self._tts_fallback(bot_text_response)

            # If we don't have text at all, surface an error message
            if not bot_text_response:
                _logger.error("Failed to get bot response text even after retries and TTS fallback.")
                error_msg = "I'm sorry, I couldn't generate a response."
                current_history.append({"role": "assistant", "content": error_msg})
                yield current_history, current_history, None
                return

            # If we have text but no audio, send text-only fallback so the chat still updates
            if not audio_base64_data:
                _logger.warning("Audio unavailable; sending text-only fallback message.")
                current_history.append({"role": "assistant", "content": bot_text_response})
                self._update_running_summary(last_user_text, bot_text_response, "text-only")
                yield current_history, current_history, None
                return

            # --- Audio-First UX Implementation ---
            audio_bytes = base64.b64decode(audio_base64_data)
            audio_path = save_audio_to_temp_file(audio_bytes)

            if speaking_mode == "Immersive":
                # In immersive mode, just add the audio player to the chat
                _logger.info("Immersive mode: Yielding audio for playback.")
                current_history.append(
                    {
                        "role": "assistant",
                        "content": (audio_path, None),
                        "text_for_llm": bot_text_response,
                    }
                )
                self._update_running_summary(last_user_text, bot_text_response, "immersive")
                yield current_history, current_history, audio_path
                return

            # 1. Yield audio for immediate playback, without updating the chat text.
            _logger.info("Audio-first UX: Yielding audio for playback.")
            yield current_history, current_history, audio_path

            # 2. Wait for the audio to finish playing before showing the text.
            # NOTE(review): assumes get_audio_duration returns seconds; a falsy
            # result (None/0) is treated as zero so the sleep cannot fail.
            duration = get_audio_duration(audio_path) or 0.0
            # Add a small buffer to the wait time
            wait_time = duration + 0.2
            _logger.info("Audio-first UX: Waiting for %.2fs for audio to play.", wait_time)
            time.sleep(wait_time)

            # 3. Now, simulate the streaming of the bot's text.
            _logger.info("Audio-first UX: Simulating text stream.")
            # Add an empty message bubble for the assistant to stream into.
            current_history.append({"role": "assistant", "content": ""})
            for word in bot_text_response.split():
                current_history[-1]["content"] += word + " "
                # Yield the updated state to the UI
                yield current_history, current_history, None
                # Control the streaming speed for a natural feel
                time.sleep(0.05)

            # Word-by-word streaming collapses whitespace and leaves a trailing
            # space; store the real text so line breaks survive in the chat and
            # in later LLM context.
            current_history[-1]["content"] = bot_text_response.strip()
            yield current_history, current_history, None

            # After finishing streaming, update the running summary
            self._update_running_summary(last_user_text, bot_text_response, "hybrid")

        except Exception as e:
            _logger.error(f"Error calling chat_multimodal: {e}", exc_info=True)
            error_msg = "Sorry, an error occurred while I was thinking. Please try again."
            current_history.append({"role": "assistant", "content": error_msg})
            yield current_history, current_history, None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _env_int(name: str, default: int, minimum: int = 0) -> int:
        """Read an integer environment variable, falling back to ``default`` when
        it is unset or malformed and clamping the result to at least ``minimum``."""
        raw = os.getenv(name)
        if raw is None:
            return default
        try:
            value = int(raw)
        except ValueError:
            _logger.warning("Invalid integer for %s=%r; using default %d.", name, raw, default)
            return default
        if value < minimum:
            _logger.warning("%s=%d is below the minimum %d; using %d.", name, value, minimum, minimum)
            return minimum
        return value

    @staticmethod
    def _last_user_text(history: List[Dict[str, Any]]) -> Any:
        """Return the latest user message's text (``text_for_llm`` preferred), or ""."""
        for message in reversed(history):
            if message.get("role") == "user":
                return message.get("text_for_llm") or message.get("content") or ""
        return ""

    def _build_llm_messages(self, history: List[Dict[str, Any]], level: Optional[str]) -> List[Dict[str, Any]]:
        """Build the LLM payload: optional summary, system prompt, last N messages."""
        system_prompt = self.tutor_parent.get_system_message(mode="speaking", level=level)

        # Sanitize history to prevent audio generation bugs: send only role and
        # text, preferring the transcription stored under text_for_llm.
        conversation = [
            {
                "role": message["role"],
                "content": message["text_for_llm"] if "text_for_llm" in message else message["content"],
            }
            for message in history
        ]

        # Prune history to avoid context overflow (system prompt + last N messages).
        # The minimum of 1 avoids the [-0:] slice, which would keep everything.
        max_hist = self._env_int("SPEAKING_MAX_HISTORY", 12, minimum=1)
        if len(conversation) > max_hist:
            conversation = conversation[-max_hist:]

        messages = [{"role": "system", "content": system_prompt}] + conversation
        total_chars = sum(len(str(m.get("content", ""))) for m in messages)
        _logger.info("LLM payload pruned to %d msgs, ~%d chars.", len(messages), total_chars)

        # Inject running summary if available to maintain long-term context
        running_summary = getattr(self, "_running_summary", "")
        if running_summary:
            summary_msg = (
                "Conversation summary so far (for continuity; do not repeat details, use as context only):\n"
                + running_summary
            )
            messages = [{"role": "system", "content": summary_msg}] + messages
            _logger.info("Summary injected: True, summary_len=%d", len(running_summary))
        else:
            _logger.info("Summary injected: False, summary_len=0")
        return messages

    def _update_running_summary(self, last_user_text: Any, bot_text: str, context: str) -> None:
        """Best-effort update of ``self._running_summary`` (LLM-based with truncation
        fallback). Never raises; failures are logged at debug level with ``context``."""
        try:
            # Minimum of 1 avoids the [-0:] slice in the truncation fallback.
            max_chars = self._env_int("SPEAKING_SUMMARY_MAX_CHARS", 1200, minimum=1)
            prev = getattr(self, "_running_summary", "")
            try:
                prompt = (
                    "Update the running summary of a tutoring session.\n"
                    "Keep key facts, goals, corrections, and user preferences. Be concise (<= 120 words).\n\n"
                    f"Current summary:\n{prev}\n\n"
                    "Last exchange:\n"
                    f"User: {last_user_text}\n"
                    f"Tutor: {bot_text}\n\n"
                    "Return only the updated summary."
                )
                messages = [
                    {"role": "system", "content": "You are a concise note taker for an English tutoring session."},
                    {"role": "user", "content": prompt},
                ]
                chunks = self.tutor_parent.openai_service.stream_chat_completion(
                    messages=messages, temperature=0.2, max_tokens=200
                )
                updated = "".join(chunks).strip()
                if updated:
                    self._running_summary = updated[:max_chars]
                    _logger.info("Running summary updated via LLM, new_len=%d", len(self._running_summary))
                    return
            except Exception as e:
                _logger.debug("LLM summary update failed, falling back to truncation: %s", e)

            # Fallback (also used when the LLM returned nothing): keep the tail
            # of the previous summary plus the last exchange.
            combined = (prev + " " + last_user_text + " " + bot_text).strip()
            self._running_summary = combined[-max_chars:]
            _logger.info("Running summary updated via truncation, new_len=%d", len(self._running_summary))
        except Exception as e:
            _logger.debug("Summary update skipped (%s): %s", context, e)

    @staticmethod
    def _log_missing_audio_diagnostics(response: Any) -> None:
        """Log the response's shape when no audio could be extracted from it."""
        try:
            resp_type = type(response).__name__
            has_choices = hasattr(response, "choices") and bool(response.choices)
            has_audio_attr = False
            if has_choices:
                msg = response.choices[0].message
                has_audio_attr = getattr(msg, "audio", None) is not None
            _logger.info(
                "Diag(no-audio): resp_type=%s has_choices=%s has_audio_attr=%s",
                resp_type,
                has_choices,
                has_audio_attr,
            )
        except Exception as diag_e:
            _logger.info("Diag(no-audio): failed to introspect response: %s", diag_e)

    def _request_multimodal(
        self, messages: List[Dict[str, Any]], max_tokens: int
    ) -> Tuple[str, Optional[str]]:
        """Call the multimodal endpoint with retries.

        Returns ``(text, audio_base64)``; text is "" and audio is None/empty when
        nothing usable was obtained. Exceptions from the service are logged and
        swallowed so the caller can fall back to text-only / TTS.
        """
        # Retry policy: default to 1 attempt, configurable via env
        max_retries = self._env_int("AUDIO_RETRY_LIMIT", 1, minimum=0)
        # Backoff base delay in milliseconds (0 = no backoff)
        base_delay = self._env_int("AUDIO_RETRY_BACKOFF_MS", 0, minimum=0) / 1000.0

        text: str = ""
        audio_base64_data: Optional[str] = None

        for attempt in range(max_retries):
            try:
                response = self.tutor_parent.openai_service.chat_multimodal(
                    messages=messages, voice="alloy", max_tokens=max_tokens
                )
                # "or ''" keeps a missing text from aborting the attempt before
                # the audio has been extracted.
                text = extract_text_from_response(response) or ""
                _logger.info("LLM text length: %d chars", len(text))
                audio_base64_data = extract_audio_from_response(response)
                if audio_base64_data:
                    break

                # Check whether the failure is due to the token limit
                if "context_length_exceeded" in str(response).lower():
                    _logger.error("Erro: Limite de tokens excedido, não tentará novamente")
                    break

                # Local diagnostics (in addition to utils.audio)
                self._log_missing_audio_diagnostics(response)
                _logger.warning("No audio data in response (attempt %d/%d)", attempt + 1, max_retries)
            except Exception as e:
                _logger.error(
                    "Erro na tentativa %d: %s (%s)",
                    attempt + 1,
                    str(e),
                    type(e).__name__,
                )
                # If context length exceeded, do not keep retrying the same payload
                error_text = str(e).lower()
                if "context_length" in error_text or "maximum context length" in error_text:
                    _logger.error("Context length exceeded. Will not retry further with same payload.")
                    break

            # Exponential backoff, only when another attempt will actually follow
            if attempt + 1 < max_retries:
                delay = base_delay * (2**attempt)
                if delay > 0:
                    time.sleep(delay)

        return text, audio_base64_data

    def _text_only_fallback(self, messages: List[Dict[str, Any]], max_tokens: int) -> str:
        """Request a text-only completion; returns "" when that fails too."""
        _logger.warning("Multimodal returned no text; attempting text-only fallback.")
        try:
            chunks = self.tutor_parent.openai_service.stream_chat_completion(
                messages=messages, temperature=0.6, max_tokens=max_tokens
            )
            text = "".join(chunks).strip()
        except Exception as e:
            _logger.error("Text-only fallback failed: %s", e, exc_info=True)
            return ""
        _logger.info("Text-only fallback produced %d chars.", len(text))
        return text

    def _tts_fallback(self, text: str) -> Optional[str]:
        """Synthesize ``text`` via the service's text_to_speech and return it
        base64-encoded, or None when synthesis fails."""
        _logger.warning("Multimodal response missing audio, falling back to TTS.")
        try:
            # Generate audio from the text response (with safety cap)
            tts_text = text
            max_tts_chars = self._env_int("TTS_MAX_CHARS", 1200, minimum=1)
            if len(tts_text) > max_tts_chars:
                _logger.info("TTS input too long (%d chars). Truncating to %d.", len(tts_text), max_tts_chars)
                tts_text = tts_text[:max_tts_chars] + "..."
            audio_bytes = self.tutor_parent.openai_service.text_to_speech(tts_text)
            # The service returns raw bytes, so we need to encode it to base64
            encoded = base64.b64encode(audio_bytes).decode("utf-8")
        except Exception as e:
            _logger.error(f"TTS fallback failed: {e}", exc_info=True)
            # If TTS also fails, we proceed with no audio
            return None
        _logger.info("Successfully generated audio using TTS fallback.")
        return encoded