from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# PyYAML is optional: when it cannot be imported, `yaml` is None and
# `_load_yaml` falls back to the small built-in parser `_load_simple_yaml`.
try:
    import yaml
except ImportError:  # pragma: no cover - exercised only in minimal runtimes.
    yaml = None

from time_machine.adapters.fixtures import (
    FixtureConversationEngine,
    FixtureDestinationGenerator,
    FixtureEncounterLibrary,
    FixtureImmersiveExperienceGenerator,
    FixturePersonaGenerator,
    FixtureSTTAdapter,
    FixtureSouvenirGenerator,
    FixtureTTSAdapter,
)
from time_machine.adapters.image_gen import (
    PromptOnlyImmersiveExperienceGenerator,
    TogetherImmersiveExperienceGenerator,
)
from time_machine.adapters.llm import QwenStructuredLLMAdapter
from time_machine.adapters.llm.cloud_completion import (
    create_cloud_completion_fn,
    create_cloud_stream_completion_fn,
)
from time_machine.adapters.model_registry import YamlModelRegistry
from time_machine.adapters.stt import (
    ModalNemotronSTTAdapter,
    NemotronStreamingSTTAdapter,
    WhisperSTTAdapter,
)
from time_machine.adapters.storage import JsonlEncounterStore
from time_machine.adapters.trace import JsonlTraceSink
from time_machine.adapters.tts import KokoroTTSAdapter, ModalQwenTTSAdapter, SapiTTSAdapter
from time_machine.application.encounter_service import EncounterService
from time_machine.application.session_state import InMemorySessionRepository
from time_machine.application.speech_orchestrator import SpeechOrchestrator
from time_machine.application.souvenir_service import SouvenirService
from time_machine.domain.errors import AdapterConfigurationError, ModelBudgetError
from time_machine.domain.models import ModelBudget
from time_machine.ports.speech import TTSAdapter

# `parents[3]` of this file's resolved path. `create_container` looks up
# `config/app.yaml`, `config/models.yaml` and `fixtures/encounters` beneath it
# and resolves relative data/trace directories against it.
REPO_ROOT = Path(__file__).resolve().parents[3]


@dataclass(frozen=True)
class AppContainer:
    """Immutable bundle of the wired application objects.

    Instances are built and returned by `create_container`.
    """

    # Name of the adapter profile the container was built for.
    adapter_profile: str
    encounter_service: EncounterService
    speech_orchestrator: SpeechOrchestrator
    souvenir_service: SouvenirService
    # Budget loaded from `config/models.yaml` by `create_container`.
    model_budget: ModelBudget
    # Readiness flag and optional explanatory message, both taken from the
    # tuple returned by `_create_immersive_generator`.
    image_generation_ready: bool
    image_generation_warning: str | None = None
@dataclass(frozen=True)
class ModelSelection:
    """One model choice reported at startup by `_print_model_selection`."""

    role: str
    provider: str
    model_id: str
    runtime: str
    # Where the choice came from, e.g. "config", "profile", "default" or
    # "env:<VARIABLE_NAME>".
    source: str = "config"


def create_container(adapter_profile: str | None = None) -> AppContainer:
    """Build the application container for the selected adapter profile.

    The profile is taken from ``adapter_profile`` if truthy, otherwise from the
    ``TIME_MACHINE_ADAPTER_PROFILE`` environment variable, otherwise from
    ``app.adapter_profile`` in ``config/app.yaml`` (default ``"fixture"``).
    Supported profiles are ``fixture``, ``local_models``, ``dev`` and ``modal``.

    Args:
        adapter_profile: Optional explicit profile name overriding environment
            and configuration.

    Returns:
        The fully wired, immutable `AppContainer`.

    Raises:
        ModelBudgetError: If the loaded model budget is not within its limit.
        AdapterConfigurationError: If the profile is unknown, or a helper
            rejects an environment value required by the chosen profile.
    """
    app_config = _load_yaml(REPO_ROOT / "config" / "app.yaml")
    configured_profile = app_config.get("app", {}).get("adapter_profile", "fixture")
    profile = adapter_profile or os.getenv("TIME_MACHINE_ADAPTER_PROFILE", configured_profile)

    model_budget = YamlModelRegistry(REPO_ROOT / "config" / "models.yaml").load_budget()
    if not model_budget.is_within_limit:
        raise ModelBudgetError(
            "At least one enabled model is above the per-model limit of "
            f"{model_budget.parameter_limit_billion}B."
        )

    # Environment variables win over app.yaml; relative paths are anchored at
    # the repository root.
    data_dir = Path(os.getenv("TIME_MACHINE_DATA_DIR", app_config["app"]["data_dir"]))
    trace_dir = Path(os.getenv("TIME_MACHINE_TRACE_DIR", app_config["app"]["trace_dir"]))
    if not data_dir.is_absolute():
        data_dir = REPO_ROOT / data_dir
    if not trace_dir.is_absolute():
        trace_dir = REPO_ROOT / trace_dir

    library = FixtureEncounterLibrary(REPO_ROOT / "fixtures" / "encounters")
    sessions = InMemorySessionRepository()
    store = JsonlEncounterStore(data_dir / "encounters" / "encounters.jsonl")
    trace_sink = JsonlTraceSink(trace_dir / "events.jsonl")

    if profile == "fixture":
        model_selection = [
            ModelSelection("llm", "fixture", "fixture-conversation", "fixture", "profile"),
            ModelSelection("stt", "fixture", "fixture-stt", "fixture", "profile"),
            ModelSelection("tts", "fixture", "fixture-tts", "fixture", "profile"),
        ]
        destination_generator = FixtureDestinationGenerator(library)
        persona_generator = FixturePersonaGenerator(library)
        conversation_engine = FixtureConversationEngine(library)
        stt = FixtureSTTAdapter()
        tts = FixtureTTSAdapter()
        souvenir_generator = FixtureSouvenirGenerator(library)
    elif profile == "local_models":
        llm_model_id = _model_id(model_budget, "llm", "Qwen/Qwen3-4B-Instruct")
        llm_runtime = os.getenv("TIME_MACHINE_LLM_RUNTIME", "transformers")
        stt_model_id = _model_id(
            model_budget,
            "stt",
            "nvidia/nemotron-3.5-asr-streaming-0.6b",
        )
        tts_model_id = _model_id(model_budget, "tts_emergency", "hexgrad/Kokoro-82M")
        model_selection = [
            ModelSelection(
                "llm",
                "local",
                llm_model_id,
                llm_runtime,
                _env_source("TIME_MACHINE_LLM_RUNTIME"),
            ),
            ModelSelection("stt", "local", stt_model_id, "nemo", "config"),
            ModelSelection("tts", "local", tts_model_id, "kokoro", "config"),
        ]
        llm = QwenStructuredLLMAdapter(
            model_id=llm_model_id,
            runtime=llm_runtime,
            max_response_chars=_int_env(
                "TIME_MACHINE_MAX_RESPONSE_CHARS",
                int(app_config["app"].get("max_response_chars", 260)),
            ),
        )
        # A single LLM adapter instance fills all four generation roles.
        destination_generator = llm
        persona_generator = llm
        conversation_engine = llm
        souvenir_generator = llm
        stt = NemotronStreamingSTTAdapter(
            model_id=stt_model_id
        )
        tts = KokoroTTSAdapter(
            model_id=tts_model_id,
            output_dir=data_dir / "audio",
        )
    elif profile == "dev":
        # Cloud LLM + local low-latency TTS + Whisper STT.
        # Fastest path to test real inference end-to-end.
        llm_model_id = os.getenv(
            "TIME_MACHINE_LLM_MODEL",
            os.getenv("TIME_MACHINE_LLM_DEV_MODEL", "Qwen/Qwen2.5-7B-Instruct-Turbo"),
        )
        stt_model_size = os.getenv("TIME_MACHINE_WHISPER_MODEL", "base")
        tts_runtime = _dev_tts_runtime()
        tts_model_id = (
            _model_id(model_budget, "tts_emergency", "hexgrad/Kokoro-82M")
            if tts_runtime == "kokoro"
            else "windows-sapi"
        )
        model_selection = [
            ModelSelection(
                "llm",
                _llm_provider_name(),
                llm_model_id,
                "cloud_api",
                _first_env_source("TIME_MACHINE_LLM_MODEL", "TIME_MACHINE_LLM_DEV_MODEL"),
            ),
            ModelSelection(
                "stt",
                "local",
                f"openai-whisper:{stt_model_size}",
                "whisper",
                _env_source("TIME_MACHINE_WHISPER_MODEL"),
            ),
            ModelSelection("tts", "local", tts_model_id, tts_runtime, _env_source("TIME_MACHINE_DEV_TTS")),
        ]
        completion_fn = create_cloud_completion_fn(model=llm_model_id)
        stream_completion_fn = create_cloud_stream_completion_fn(model=llm_model_id)
        llm = QwenStructuredLLMAdapter(
            model_id=llm_model_id,
            completion_fn=completion_fn,
            stream_completion_fn=stream_completion_fn,
            allow_development_fallback=_env_flag(
                "TIME_MACHINE_ALLOW_MODEL_FALLBACK",
                default=False,
            ),
            max_response_chars=_int_env(
                "TIME_MACHINE_MAX_RESPONSE_CHARS",
                int(app_config["app"].get("max_response_chars", 260)),
            ),
        )
        destination_generator = llm
        persona_generator = llm
        conversation_engine = llm
        souvenir_generator = llm
        stt = WhisperSTTAdapter(
            model_size=stt_model_size,
        )
        tts = _create_dev_tts(model_budget, data_dir)
    elif profile == "modal":
        llm_model_id = os.getenv(
            "TIME_MACHINE_LLM_MODEL",
            _model_id(model_budget, "llm", "Qwen/Qwen3-8B"),
        )
        stt_model_id = _model_id(
            model_budget,
            "stt",
            "nvidia/nemotron-3.5-asr-streaming-0.6b",
        )
        tts_model_family = os.getenv("TIME_MACHINE_MODAL_TTS_MODEL_FAMILY", "chatterbox_turbo")
        normalized_tts_family = _normalize_modal_tts_family(tts_model_family)
        tts_model_id = _modal_tts_model_id(model_budget, normalized_tts_family)
        model_selection = [
            ModelSelection(
                "llm",
                _llm_provider_name(),
                llm_model_id,
                "cloud_api",
                _env_source("TIME_MACHINE_LLM_MODEL"),
            ),
            ModelSelection("stt", "modal", stt_model_id, "nemo", "config"),
            ModelSelection(
                "tts",
                "modal",
                tts_model_id,
                normalized_tts_family,
                _env_source("TIME_MACHINE_MODAL_TTS_MODEL_FAMILY", default="config"),
            ),
        ]
        completion_fn = create_cloud_completion_fn(model=llm_model_id)
        stream_completion_fn = create_cloud_stream_completion_fn(model=llm_model_id)
        llm = QwenStructuredLLMAdapter(
            model_id=llm_model_id,
            completion_fn=completion_fn,
            stream_completion_fn=stream_completion_fn,
            allow_development_fallback=_env_flag(
                "TIME_MACHINE_ALLOW_MODEL_FALLBACK",
                default=False,
            ),
            max_response_chars=_int_env(
                "TIME_MACHINE_MAX_RESPONSE_CHARS",
                int(app_config["app"].get("modal_max_response_chars", 120)),
            ),
        )
        destination_generator = llm
        persona_generator = llm
        conversation_engine = llm
        souvenir_generator = llm
        # The same optional bearer token is passed to both Modal adapters.
        modal_bearer_token = os.getenv("TIME_MACHINE_MODAL_BEARER_TOKEN")
        stt = ModalNemotronSTTAdapter(
            endpoint_url=_required_env("TIME_MACHINE_MODAL_STT_URL"),
            timeout_seconds=_float_env("TIME_MACHINE_MODAL_STT_TIMEOUT", 120.0),
            bearer_token=modal_bearer_token,
            language=os.getenv("TIME_MACHINE_MODAL_STT_LANGUAGE", "auto"),
        )
        tts = ModalQwenTTSAdapter(
            endpoint_url=_required_env("TIME_MACHINE_MODAL_TTS_URL"),
            output_dir=data_dir / "audio",
            timeout_seconds=_float_env("TIME_MACHINE_MODAL_TTS_TIMEOUT", 180.0),
            bearer_token=modal_bearer_token,
            language=os.getenv("TIME_MACHINE_MODAL_TTS_LANGUAGE", "English"),
            model_family=normalized_tts_family,
            latency_profile=os.getenv("TIME_MACHINE_MODAL_TTS_LATENCY_PROFILE", "balanced"),
            exaggeration=_float_env("TIME_MACHINE_CHATTERBOX_EXAGGERATION", 0.65),
            cfg_weight=_float_env("TIME_MACHINE_CHATTERBOX_CFG_WEIGHT", 0.35),
            temperature=_float_env("TIME_MACHINE_CHATTERBOX_TEMPERATURE", 0.8),
        )
    else:
        raise AdapterConfigurationError(
            f"Adapter profile '{profile}' is not implemented. "
            "Use 'fixture', 'local_models', 'dev', or 'modal'."
        )

    # Every supported profile finishes by selecting the immersive generator,
    # so the call lives once here, after the profile-specific wiring.
    immersive_generator, image_generation_ready, image_generation_warning = (
        _create_immersive_generator(profile)
    )

    _print_model_selection(profile, model_selection)
    encounter_service = EncounterService(
        sessions=sessions,
        destination_generator=destination_generator,
        persona_generator=persona_generator,
        conversation_engine=conversation_engine,
        tts=tts,
        souvenir_generator=souvenir_generator,
        store=store,
        trace_sink=trace_sink,
        immersive_generator=immersive_generator,
    )
    return AppContainer(
        adapter_profile=profile,
        encounter_service=encounter_service,
        speech_orchestrator=SpeechOrchestrator(stt=stt, encounter_service=encounter_service),
        souvenir_service=SouvenirService(encounter_service),
        model_budget=model_budget,
        image_generation_ready=image_generation_ready,
        image_generation_warning=image_generation_warning,
    )


def _load_yaml(path: Path) -> dict[str, Any]:
    """Read ``path`` as UTF-8 and parse it as YAML.

    Uses PyYAML's ``safe_load`` when the module-level ``yaml`` is available and
    the built-in `_load_simple_yaml` otherwise. An empty document yields ``{}``
    on both paths.
    """
    text = path.read_text(encoding="utf-8")
    if yaml is None:
        return _load_simple_yaml(text)
    loaded = yaml.safe_load(text)
    # safe_load returns None for an empty document; the fallback parser
    # returns {} in that case, so keep the two paths consistent.
    return {} if loaded is None else loaded


def _load_simple_yaml(text: str) -> dict[str, Any]:
    """Small fallback for this repo's config files when PyYAML is unavailable.

    Supports nested mappings (indented with spaces), ``key: scalar`` pairs and
    lists whose items are mappings introduced by ``- key: value``. Blank lines
    and lines starting with ``#`` are skipped. Scalars are converted by
    `_parse_yaml_scalar`.

    Raises:
        ValueError: If a list item appears without a preceding key, is not
            indented deeper than that key, or a line lacks the ``:`` separator.
    """
    root: dict[str, Any] = {}
    # Stack of (indent, container); the sentinel indent -1 keeps root in place.
    stack: list[tuple[int, Any]] = [(-1, root)]
    pending_list_key: tuple[int, dict[str, Any], str] | None = None
    lines = text.splitlines()

    for index, raw_line in enumerate(lines):
        stripped = raw_line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        indent = len(raw_line) - len(raw_line.lstrip(" "))
        # Close every container that is not an ancestor of this line.
        while stack and indent <= stack[-1][0]:
            stack.pop()

        if stripped.startswith("- "):
            if pending_list_key is None:
                raise ValueError("Simple YAML parser found a list without a key.")
            list_indent, parent, key = pending_list_key
            if indent <= list_indent:
                raise ValueError("Simple YAML parser found an incorrectly indented list.")
            items = parent.setdefault(key, [])
            item: dict[str, Any] = {}
            items.append(item)
            stack.append((indent, item))
            remainder = stripped[2:].strip()
            if remainder:
                item_key, item_value = remainder.split(":", 1)
                item[item_key.strip()] = _parse_yaml_scalar(item_value.strip())
            continue

        raw_key, raw_value = stripped.split(":", 1)
        key = raw_key.strip()
        value = raw_value.strip()
        parent = stack[-1][1]
        if value:
            parent[key] = _parse_yaml_scalar(value)
            continue

        # An empty value opens a nested container. Look ahead from THIS line's
        # position: searching by line text would find the first identical line
        # and misclassify keys that repeat under different parents.
        if _list_follows(lines, index + 1):
            pending_list_key = (indent, parent, key)
            parent[key] = []
        else:
            parent[key] = {}
        stack.append((indent, parent[key]))

    return root


def _list_follows(lines: list[str], start: int) -> bool:
    """Return True if the first meaningful line at or after ``start`` is a list item."""
    for line in lines[start:]:
        candidate = line.strip()
        if not candidate or candidate.startswith("#"):
            continue
        return candidate.startswith("- ")
    return False


def _parse_yaml_scalar(value: str) -> Any:
    """Convert a scalar token to a Python value.

    Returns ``{}`` for an empty string, a bool for ``true``/``false`` (any
    case), a float when the token contains ``.`` and parses as one, an int when
    it parses as one, and otherwise the text with surrounding double quotes
    removed.
    """
    if value == "":
        return {}
    lowered = value.lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    try:
        if "." in value:
            return float(value)
        return int(value)
    except ValueError:
        return value.strip('"')


def _next_nonempty_starts_list(text: str, current_line: str) -> bool:
    """Report whether the line after the first occurrence of ``current_line`` starts a list.

    The lookup is by line text, so with repeated lines it refers to the first
    match. `_load_simple_yaml` no longer relies on it for that reason; it is
    kept with its original behaviour for any existing callers.
    """
    lines = text.splitlines()
    try:
        start = lines.index(current_line) + 1
    except ValueError:
        return False
    return _list_follows(lines, start)


def _model_id(model_budget: ModelBudget, role: str, default: str) -> str:
    """Return the id of the first budget model with ``role``, else ``default``."""
    for model in model_budget.models:
        if model.role == role:
            return model.model_id
    return default


def _modal_tts_model_id(model_budget: ModelBudget, model_family: str) -> str:
    """Resolve the TTS model id for the modal profile from the model family."""
    if model_family == "chatterbox_turbo":
        return _model_id(model_budget, "tts", "ResembleAI/chatterbox-turbo")
    return _model_id(
        model_budget,
        "tts_qwen_fallback",
        "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign",
    )


def _normalize_modal_tts_family(value: str) -> str:
    """Map a user-supplied family name to ``"chatterbox_turbo"`` or ``"qwen"``.

    Matching ignores case, surrounding whitespace and ``-`` versus ``_``; any
    unrecognised name maps to ``"qwen"``.
    """
    normalized = value.strip().lower().replace("-", "_")
    if normalized in {"chatterbox", "chatterbox_turbo", "turbo"}:
        return "chatterbox_turbo"
    return "qwen"


def _dev_tts_runtime() -> str:
    """Return the dev-profile TTS runtime name, lower-cased and stripped.

    Reads ``TIME_MACHINE_DEV_TTS``; the default is ``"sapi"`` when ``os.name``
    is ``"nt"`` and ``"kokoro"`` otherwise.
    """
    default_tts = "sapi" if os.name == "nt" else "kokoro"
    return os.getenv("TIME_MACHINE_DEV_TTS", default_tts).strip().lower()


def _llm_provider_name() -> str:
    """Derive a provider label from ``TIME_MACHINE_LLM_BASE_URL``.

    The first marker found as a substring of the URL wins; URLs matching none
    of them are labelled ``"openai_compatible"``.
    """
    base_url = os.getenv("TIME_MACHINE_LLM_BASE_URL", "https://api.together.xyz/v1")
    # Order matters: it is the precedence when a URL contains several markers.
    known_providers = (
        ("together", "together_ai"),
        ("openrouter", "openrouter"),
        ("fireworks", "fireworks"),
        ("openai", "openai"),
    )
    for marker, provider in known_providers:
        if marker in base_url:
            return provider
    return "openai_compatible"


def _env_source(name: str, default: str = "default") -> str:
    """Return ``"env:<name>"`` if the variable is set and non-empty, else ``default``."""
    return f"env:{name}" if os.getenv(name) else default


def _first_env_source(*names: str) -> str:
    """Return ``"env:<name>"`` for the first set, non-empty variable, else ``"default"``."""
    for name in names:
        if os.getenv(name):
            return f"env:{name}"
    return "default"


def _print_model_selection(profile: str, selections: list[ModelSelection]) -> None:
    """Print the profile and one line per selected model to standard output."""
    print(f"AI model selection: profile={profile}")
    for selection in selections:
        print(
            "AI model selection: "
            f"{selection.role} provider={selection.provider} "
            f"runtime={selection.runtime} model={selection.model_id} "
            f"source={selection.source}"
        )


def _create_immersive_generator(
    profile: str,
) -> tuple[
    FixtureImmersiveExperienceGenerator
    | PromptOnlyImmersiveExperienceGenerator
    | TogetherImmersiveExperienceGenerator,
    bool,
    str | None,
]:
    """Choose the immersive-experience generator for ``profile``.

    Returns:
        A ``(generator, image_generation_ready, warning)`` tuple. ``warning``
        is ``None`` only when the Together generator is selected, which
        requires one of ``TIME_MACHINE_IMAGE_API_KEY``, ``TOGETHER_API_KEY`` or
        ``TIME_MACHINE_LLM_API_KEY`` to be set and a non-fixture profile.
    """
    has_image_key = bool(
        os.getenv("TIME_MACHINE_IMAGE_API_KEY")
        or os.getenv("TOGETHER_API_KEY")
        or os.getenv("TIME_MACHINE_LLM_API_KEY")
    )
    if profile == "fixture":
        return FixtureImmersiveExperienceGenerator(), False, "Fixture profile is using local SVG visual fixtures."
    if has_image_key:
        fallback = (
            PromptOnlyImmersiveExperienceGenerator()
            if profile in {"dev", "modal"}
            else FixtureImmersiveExperienceGenerator()
        )
        return TogetherImmersiveExperienceGenerator(fallback=fallback), True, None
    if profile in {"dev", "modal"}:
        return (
            PromptOnlyImmersiveExperienceGenerator(),
            False,
            "Image generation is disabled: set TIME_MACHINE_IMAGE_API_KEY, TOGETHER_API_KEY, or TIME_MACHINE_LLM_API_KEY for demo profiles.",
        )
    return FixtureImmersiveExperienceGenerator(), False, "Image generation is disabled; local fixture images are in use."
def _create_dev_tts(model_budget: ModelBudget, data_dir: Path) -> TTSAdapter:
    """Build the TTS adapter used by the dev profile.

    The runtime name comes from `_dev_tts_runtime`. ``"kokoro"`` yields a
    `KokoroTTSAdapter`; ``"sapi"`` yields a `SapiTTSAdapter` and is only
    accepted when ``os.name`` is ``"nt"``. Audio is written under
    ``data_dir / "audio"``.

    Raises:
        AdapterConfigurationError: For ``"sapi"`` off Windows, or for any
            other runtime name.
    """
    runtime = _dev_tts_runtime()

    if runtime == "kokoro":
        kokoro_model = _model_id(model_budget, "tts_emergency", "hexgrad/Kokoro-82M")
        return KokoroTTSAdapter(model_id=kokoro_model, output_dir=data_dir / "audio")

    if runtime == "sapi":
        if os.name == "nt":
            return SapiTTSAdapter(output_dir=data_dir / "audio")
        raise AdapterConfigurationError("TIME_MACHINE_DEV_TTS=sapi requires Windows.")

    raise AdapterConfigurationError(
        f"Unsupported TIME_MACHINE_DEV_TTS={runtime!r}. Use 'sapi' or 'kokoro'."
    )


def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean environment variable.

    Returns ``default`` when the variable is unset. Otherwise the value is
    true only for ``1``, ``true``, ``yes`` or ``on`` (case-insensitive,
    surrounding whitespace ignored).
    """
    raw = os.getenv(name)
    return default if raw is None else (raw.strip().lower() in {"1", "true", "yes", "on"})


def _int_env(name: str, default: int) -> int:
    """Read an integer environment variable, or ``default`` when it is unset.

    Raises:
        AdapterConfigurationError: If the variable is set but is not an integer.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        parsed = int(raw)
    except ValueError as exc:
        raise AdapterConfigurationError(f"{name} must be an integer, got {raw!r}.") from exc
    return parsed


def _float_env(name: str, default: float) -> float:
    """Read a float environment variable, or ``default`` when it is unset.

    Raises:
        AdapterConfigurationError: If the variable is set but is not a number.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        parsed = float(raw)
    except ValueError as exc:
        raise AdapterConfigurationError(f"{name} must be a number, got {raw!r}.") from exc
    return parsed


def _required_env(name: str) -> str:
    """Return the stripped value of a mandatory environment variable.

    Raises:
        AdapterConfigurationError: If the variable is unset, empty or only
            whitespace.
    """
    raw = os.getenv(name)
    cleaned = raw.strip() if raw else ""
    if not cleaned:
        raise AdapterConfigurationError(f"{name} is required for the modal adapter profile.")
    return cleaned