manikandanj's picture
Prepare AI Time Machine hackathon Space
5862322 verified
Raw
History Blame Contribute Delete
20.4 kB
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any
try:
import yaml
except ImportError: # pragma: no cover - exercised only in minimal runtimes.
yaml = None
from time_machine.adapters.fixtures import (
FixtureConversationEngine,
FixtureDestinationGenerator,
FixtureEncounterLibrary,
FixtureImmersiveExperienceGenerator,
FixturePersonaGenerator,
FixtureSTTAdapter,
FixtureSouvenirGenerator,
FixtureTTSAdapter,
)
from time_machine.adapters.image_gen import (
PromptOnlyImmersiveExperienceGenerator,
TogetherImmersiveExperienceGenerator,
)
from time_machine.adapters.llm import QwenStructuredLLMAdapter
from time_machine.adapters.llm.cloud_completion import (
create_cloud_completion_fn,
create_cloud_stream_completion_fn,
)
from time_machine.adapters.model_registry import YamlModelRegistry
from time_machine.adapters.stt import (
ModalNemotronSTTAdapter,
NemotronStreamingSTTAdapter,
WhisperSTTAdapter,
)
from time_machine.adapters.storage import JsonlEncounterStore
from time_machine.adapters.trace import JsonlTraceSink
from time_machine.adapters.tts import KokoroTTSAdapter, ModalQwenTTSAdapter, SapiTTSAdapter
from time_machine.application.encounter_service import EncounterService
from time_machine.application.session_state import InMemorySessionRepository
from time_machine.application.speech_orchestrator import SpeechOrchestrator
from time_machine.application.souvenir_service import SouvenirService
from time_machine.domain.errors import AdapterConfigurationError, ModelBudgetError
from time_machine.domain.models import ModelBudget
from time_machine.ports.speech import TTSAdapter
# Ancestor directory of this file selected by ``parents[3]`` (three levels
# above the directory containing this module). The ``config/`` and
# ``fixtures/`` folders, and any relative data/trace directories, are resolved
# against it in ``create_container``.
REPO_ROOT = Path(__file__).resolve().parents[3]
@dataclass(frozen=True)
class AppContainer:
    """Immutable bundle of the wired services returned by ``create_container``."""

    # Name of the adapter profile that was actually selected.
    adapter_profile: str
    # Service built from the session repository, generators, TTS, store and trace sink.
    encounter_service: EncounterService
    # Orchestrator combining the selected STT adapter with ``encounter_service``.
    speech_orchestrator: SpeechOrchestrator
    # Souvenir service wrapping ``encounter_service``.
    souvenir_service: SouvenirService
    # Budget loaded from ``config/models.yaml``.
    model_budget: ModelBudget
    # Second element of the tuple returned by ``_create_immersive_generator``.
    image_generation_ready: bool
    # Optional human-readable note from ``_create_immersive_generator``; None when no warning applies.
    image_generation_warning: str | None = None
@dataclass(frozen=True)
class ModelSelection:
    """One line of the model-selection report printed by ``_print_model_selection``.

    Instances are created positionally in ``create_container``, so the field
    order below is part of the interface.
    """

    # Role label; ``create_container`` uses "llm", "stt" and "tts".
    role: str
    # Provider label, e.g. "fixture", "local", "modal" or an LLM provider name.
    provider: str
    # Identifier of the selected model.
    model_id: str
    # Runtime label reported for the model.
    runtime: str
    # Where the choice came from, e.g. "config", "profile", "default" or "env:<VAR>".
    source: str = "config"
def create_container(adapter_profile: str | None = None) -> AppContainer:
    """Build the application's service graph for one adapter profile.

    The profile is resolved in this order: the ``adapter_profile`` argument
    (when truthy), the ``TIME_MACHINE_ADAPTER_PROFILE`` environment variable,
    then ``app.adapter_profile`` from ``config/app.yaml`` (default
    ``"fixture"``).

    Profiles wired below:
        * ``fixture``: fixture adapters for every role.
        * ``local_models``: ``QwenStructuredLLMAdapter`` with a selectable
          runtime, ``NemotronStreamingSTTAdapter`` and ``KokoroTTSAdapter``.
        * ``dev``: ``QwenStructuredLLMAdapter`` backed by cloud completion
          functions, ``WhisperSTTAdapter`` and the TTS adapter chosen by
          ``_create_dev_tts``.
        * ``modal``: ``QwenStructuredLLMAdapter`` backed by cloud completion
          functions, ``ModalNemotronSTTAdapter`` and ``ModalQwenTTSAdapter``.

    Args:
        adapter_profile: Optional explicit profile name; overrides both the
            environment variable and the YAML configuration.

    Returns:
        An ``AppContainer`` holding the wired services and image-generation
        status for the selected profile.

    Raises:
        ModelBudgetError: If the loaded model budget reports that it is not
            within its limit.
        AdapterConfigurationError: If the profile name is not one of the four
            handled here. Helpers called below raise the same error for
            invalid or missing environment settings.
    """
    app_config = _load_yaml(REPO_ROOT / "config" / "app.yaml")
    configured_profile = app_config.get("app", {}).get("adapter_profile", "fixture")
    # Precedence: explicit argument > environment variable > YAML value.
    profile = adapter_profile or os.getenv("TIME_MACHINE_ADAPTER_PROFILE", configured_profile)
    model_budget = YamlModelRegistry(REPO_ROOT / "config" / "models.yaml").load_budget()
    if not model_budget.is_within_limit:
        raise ModelBudgetError(
            "At least one enabled model is above the per-model limit of "
            f"{model_budget.parameter_limit_billion}B."
        )
    # NOTE: unlike the profile lookup above, the "app" section is indexed
    # directly here, so it must exist; "data_dir"/"trace_dir" must also exist
    # because the default argument is evaluated even when the env var is set.
    data_dir = Path(os.getenv("TIME_MACHINE_DATA_DIR", app_config["app"]["data_dir"]))
    trace_dir = Path(os.getenv("TIME_MACHINE_TRACE_DIR", app_config["app"]["trace_dir"]))
    # Relative directories are anchored at the repository root.
    if not data_dir.is_absolute():
        data_dir = REPO_ROOT / data_dir
    if not trace_dir.is_absolute():
        trace_dir = REPO_ROOT / trace_dir
    # Collaborators shared by every profile.
    library = FixtureEncounterLibrary(REPO_ROOT / "fixtures" / "encounters")
    sessions = InMemorySessionRepository()
    store = JsonlEncounterStore(data_dir / "encounters" / "encounters.jsonl")
    trace_sink = JsonlTraceSink(trace_dir / "events.jsonl")
    if profile == "fixture":
        # Every role is served by a fixture adapter; no model is loaded here.
        model_selection = [
            ModelSelection("llm", "fixture", "fixture-conversation", "fixture", "profile"),
            ModelSelection("stt", "fixture", "fixture-stt", "fixture", "profile"),
            ModelSelection("tts", "fixture", "fixture-tts", "fixture", "profile"),
        ]
        destination_generator = FixtureDestinationGenerator(library)
        persona_generator = FixturePersonaGenerator(library)
        conversation_engine = FixtureConversationEngine(library)
        stt = FixtureSTTAdapter()
        tts = FixtureTTSAdapter()
        souvenir_generator = FixtureSouvenirGenerator(library)
        immersive_generator, image_generation_ready, image_generation_warning = _create_immersive_generator(profile)
    elif profile == "local_models":
        # Model ids come from the budget by role, with hard-coded fallbacks.
        llm_model_id = _model_id(model_budget, "llm", "Qwen/Qwen3-4B-Instruct")
        llm_runtime = os.getenv("TIME_MACHINE_LLM_RUNTIME", "transformers")
        stt_model_id = _model_id(
            model_budget,
            "stt",
            "nvidia/nemotron-3.5-asr-streaming-0.6b",
        )
        tts_model_id = _model_id(model_budget, "tts_emergency", "hexgrad/Kokoro-82M")
        model_selection = [
            ModelSelection(
                "llm",
                "local",
                llm_model_id,
                llm_runtime,
                _env_source("TIME_MACHINE_LLM_RUNTIME"),
            ),
            ModelSelection("stt", "local", stt_model_id, "nemo", "config"),
            ModelSelection("tts", "local", tts_model_id, "kokoro", "config"),
        ]
        llm = QwenStructuredLLMAdapter(
            model_id=llm_model_id,
            runtime=llm_runtime,
            max_response_chars=_int_env(
                "TIME_MACHINE_MAX_RESPONSE_CHARS",
                int(app_config["app"].get("max_response_chars", 260)),
            ),
        )
        # The single LLM adapter instance fills all four generator roles.
        destination_generator = llm
        persona_generator = llm
        conversation_engine = llm
        souvenir_generator = llm
        stt = NemotronStreamingSTTAdapter(
            model_id=stt_model_id
        )
        tts = KokoroTTSAdapter(
            model_id=tts_model_id,
            output_dir=data_dir / "audio",
        )
        immersive_generator, image_generation_ready, image_generation_warning = _create_immersive_generator(profile)
    elif profile == "dev":
        # Cloud LLM + local low-latency TTS + Whisper STT.
        # Fastest path to test real inference end-to-end.
        # TIME_MACHINE_LLM_MODEL wins over TIME_MACHINE_LLM_DEV_MODEL.
        llm_model_id = os.getenv(
            "TIME_MACHINE_LLM_MODEL",
            os.getenv("TIME_MACHINE_LLM_DEV_MODEL", "Qwen/Qwen2.5-7B-Instruct-Turbo"),
        )
        stt_model_size = os.getenv("TIME_MACHINE_WHISPER_MODEL", "base")
        tts_runtime = _dev_tts_runtime()
        # Any runtime other than "kokoro" is labelled "windows-sapi" in the
        # report; unsupported values are rejected later by _create_dev_tts.
        tts_model_id = (
            _model_id(model_budget, "tts_emergency", "hexgrad/Kokoro-82M")
            if tts_runtime == "kokoro"
            else "windows-sapi"
        )
        model_selection = [
            ModelSelection(
                "llm",
                _llm_provider_name(),
                llm_model_id,
                "cloud_api",
                _first_env_source("TIME_MACHINE_LLM_MODEL", "TIME_MACHINE_LLM_DEV_MODEL"),
            ),
            ModelSelection(
                "stt",
                "local",
                f"openai-whisper:{stt_model_size}",
                "whisper",
                _env_source("TIME_MACHINE_WHISPER_MODEL"),
            ),
            ModelSelection("tts", "local", tts_model_id, tts_runtime, _env_source("TIME_MACHINE_DEV_TTS")),
        ]
        completion_fn = create_cloud_completion_fn(model=llm_model_id)
        stream_completion_fn = create_cloud_stream_completion_fn(model=llm_model_id)
        llm = QwenStructuredLLMAdapter(
            model_id=llm_model_id,
            completion_fn=completion_fn,
            stream_completion_fn=stream_completion_fn,
            allow_development_fallback=_env_flag(
                "TIME_MACHINE_ALLOW_MODEL_FALLBACK",
                default=False,
            ),
            max_response_chars=_int_env(
                "TIME_MACHINE_MAX_RESPONSE_CHARS",
                int(app_config["app"].get("max_response_chars", 260)),
            ),
        )
        # The single LLM adapter instance fills all four generator roles.
        destination_generator = llm
        persona_generator = llm
        conversation_engine = llm
        souvenir_generator = llm
        stt = WhisperSTTAdapter(
            model_size=stt_model_size,
        )
        tts = _create_dev_tts(model_budget, data_dir)
        immersive_generator, image_generation_ready, image_generation_warning = _create_immersive_generator(profile)
    elif profile == "modal":
        # LLM id: environment override first, then the budget's "llm" role.
        llm_model_id = os.getenv(
            "TIME_MACHINE_LLM_MODEL",
            _model_id(model_budget, "llm", "Qwen/Qwen3-8B"),
        )
        stt_model_id = _model_id(
            model_budget,
            "stt",
            "nvidia/nemotron-3.5-asr-streaming-0.6b",
        )
        tts_model_family = os.getenv("TIME_MACHINE_MODAL_TTS_MODEL_FAMILY", "chatterbox_turbo")
        normalized_tts_family = _normalize_modal_tts_family(tts_model_family)
        tts_model_id = _modal_tts_model_id(model_budget, normalized_tts_family)
        model_selection = [
            ModelSelection(
                "llm",
                _llm_provider_name(),
                llm_model_id,
                "cloud_api",
                _env_source("TIME_MACHINE_LLM_MODEL"),
            ),
            ModelSelection("stt", "modal", stt_model_id, "nemo", "config"),
            ModelSelection(
                "tts",
                "modal",
                tts_model_id,
                normalized_tts_family,
                _env_source("TIME_MACHINE_MODAL_TTS_MODEL_FAMILY", default="config"),
            ),
        ]
        completion_fn = create_cloud_completion_fn(model=llm_model_id)
        stream_completion_fn = create_cloud_stream_completion_fn(model=llm_model_id)
        llm = QwenStructuredLLMAdapter(
            model_id=llm_model_id,
            completion_fn=completion_fn,
            stream_completion_fn=stream_completion_fn,
            allow_development_fallback=_env_flag(
                "TIME_MACHINE_ALLOW_MODEL_FALLBACK",
                default=False,
            ),
            # This profile reads its own config key and a shorter default (120)
            # than the other profiles (260).
            max_response_chars=_int_env(
                "TIME_MACHINE_MAX_RESPONSE_CHARS",
                int(app_config["app"].get("modal_max_response_chars", 120)),
            ),
        )
        # The single LLM adapter instance fills all four generator roles.
        destination_generator = llm
        persona_generator = llm
        conversation_engine = llm
        souvenir_generator = llm
        # Optional token shared by the STT and TTS endpoints (None when unset).
        modal_bearer_token = os.getenv("TIME_MACHINE_MODAL_BEARER_TOKEN")
        stt = ModalNemotronSTTAdapter(
            endpoint_url=_required_env("TIME_MACHINE_MODAL_STT_URL"),
            timeout_seconds=_float_env("TIME_MACHINE_MODAL_STT_TIMEOUT", 120.0),
            bearer_token=modal_bearer_token,
            language=os.getenv("TIME_MACHINE_MODAL_STT_LANGUAGE", "auto"),
        )
        # The same adapter class is used for both TTS families; the family is
        # passed through ``model_family``.
        tts = ModalQwenTTSAdapter(
            endpoint_url=_required_env("TIME_MACHINE_MODAL_TTS_URL"),
            output_dir=data_dir / "audio",
            timeout_seconds=_float_env("TIME_MACHINE_MODAL_TTS_TIMEOUT", 180.0),
            bearer_token=modal_bearer_token,
            language=os.getenv("TIME_MACHINE_MODAL_TTS_LANGUAGE", "English"),
            model_family=normalized_tts_family,
            latency_profile=os.getenv("TIME_MACHINE_MODAL_TTS_LATENCY_PROFILE", "balanced"),
            exaggeration=_float_env("TIME_MACHINE_CHATTERBOX_EXAGGERATION", 0.65),
            cfg_weight=_float_env("TIME_MACHINE_CHATTERBOX_CFG_WEIGHT", 0.35),
            temperature=_float_env("TIME_MACHINE_CHATTERBOX_TEMPERATURE", 0.8),
        )
        immersive_generator, image_generation_ready, image_generation_warning = _create_immersive_generator(profile)
    else:
        raise AdapterConfigurationError(
            f"Adapter profile '{profile}' is not implemented. "
            "Use 'fixture', 'local_models', 'dev', or 'modal'."
        )
    # Report the chosen models on stdout before wiring the services.
    _print_model_selection(profile, model_selection)
    encounter_service = EncounterService(
        sessions=sessions,
        destination_generator=destination_generator,
        persona_generator=persona_generator,
        conversation_engine=conversation_engine,
        tts=tts,
        souvenir_generator=souvenir_generator,
        store=store,
        trace_sink=trace_sink,
        immersive_generator=immersive_generator,
    )
    return AppContainer(
        adapter_profile=profile,
        encounter_service=encounter_service,
        speech_orchestrator=SpeechOrchestrator(stt=stt, encounter_service=encounter_service),
        souvenir_service=SouvenirService(encounter_service),
        model_budget=model_budget,
        image_generation_ready=image_generation_ready,
        image_generation_warning=image_generation_warning,
    )
def _load_yaml(path: Path) -> dict[str, Any]:
    """Read a UTF-8 YAML config file and return its top-level mapping.

    PyYAML's ``safe_load`` is used when the optional ``yaml`` import
    succeeded; otherwise the text is handed to ``_load_simple_yaml``.

    Args:
        path: Location of the YAML file to read.

    Returns:
        The parsed document. An empty (or comment-only) document yields an
        empty dict on both code paths.

    Raises:
        OSError: If the file cannot be read.
    """
    text = path.read_text(encoding="utf-8")
    if yaml is None:
        return _load_simple_yaml(text)
    loaded = yaml.safe_load(text)
    # ``safe_load`` returns None for an empty document, which would break the
    # declared return type and disagree with the fallback parser (which
    # returns {} for empty text). Normalise so callers can always use ``.get``.
    return {} if loaded is None else loaded
def _load_simple_yaml(text: str) -> dict[str, Any]:
    """Small fallback for this repo's config files when PyYAML is unavailable.

    Only a narrow subset of YAML is understood:

    * ``key: value`` pairs with scalar values (see ``_parse_yaml_scalar``),
    * ``key:`` followed by a more deeply indented block, producing a nested
      mapping,
    * ``key:`` followed by more deeply indented ``- `` items, producing a list
      of mappings (the text after ``- `` must itself be ``key: value``).

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped. Indentation is measured in leading spaces only.

    Args:
        text: Full contents of the YAML document.

    Returns:
        The parsed top-level mapping (empty for empty input).

    Raises:
        ValueError: For a list item that has no owning key, a list item that
            is not indented deeper than its key, or a line without a colon.
    """
    root: dict[str, Any] = {}
    # Stack of (indent, mapping): the top entry is the mapping that receives
    # the next key found at a deeper indentation.
    stack: list[tuple[int, dict[str, Any]]] = [(-1, root)]
    # (indent of the list's key, mapping holding the list, key name)
    pending_list_key: tuple[int, dict[str, Any], str] | None = None
    lines = text.splitlines()

    for index, raw_line in enumerate(lines):
        stripped = raw_line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        indent = len(raw_line) - len(raw_line.lstrip(" "))

        # Leave every scope that is indented at least as far as this line.
        # The root entry (indent -1) is never popped.
        while stack and indent <= stack[-1][0]:
            stack.pop()

        if stripped.startswith("- "):
            if pending_list_key is None:
                raise ValueError("Simple YAML parser found a list without a key.")
            list_indent, parent, key = pending_list_key
            if indent <= list_indent:
                raise ValueError("Simple YAML parser found an incorrectly indented list.")
            item: dict[str, Any] = {}
            parent.setdefault(key, []).append(item)
            # Keys on following, deeper lines belong to this list item.
            stack.append((indent, item))
            remainder = stripped[2:].strip()
            if remainder:
                item_key, separator, item_value = remainder.partition(":")
                if not separator:
                    raise ValueError(
                        f"Simple YAML parser expected 'key: value' in list item, got {remainder!r}."
                    )
                item[item_key.strip()] = _parse_yaml_scalar(item_value.strip())
            continue

        key_text, separator, value_text = stripped.partition(":")
        if not separator:
            raise ValueError(f"Simple YAML parser expected 'key: value', got {stripped!r}.")
        key = key_text.strip()
        value = value_text.strip()
        parent = stack[-1][1]

        # A key at or above the indentation of the last list's key means that
        # list has ended; forget it so a stray "- " later raises instead of
        # silently attaching to the old list.
        if pending_list_key is not None and indent <= pending_list_key[0]:
            pending_list_key = None

        if value:
            parent[key] = _parse_yaml_scalar(value)
            continue

        # Empty value: decide between list and nested mapping by peeking at the
        # next meaningful line *after this position*. Looking the line up by
        # its text would pick the first identical line in the document and
        # misclassify repeated keys such as two "  items:" lines.
        following = next(
            (
                candidate.strip()
                for candidate in lines[index + 1:]
                if candidate.strip() and not candidate.lstrip().startswith("#")
            ),
            "",
        )
        if following.startswith("- "):
            pending_list_key = (indent, parent, key)
            parent[key] = []
        else:
            child: dict[str, Any] = {}
            parent[key] = child
            stack.append((indent, child))
    return root
def _parse_yaml_scalar(value: str) -> Any:
if value == "":
return {}
lowered = value.lower()
if lowered == "true":
return True
if lowered == "false":
return False
try:
if "." in value:
return float(value)
return int(value)
except ValueError:
return value.strip('"')
def _next_nonempty_starts_list(text: str, current_line: str) -> bool:
lines = text.splitlines()
try:
start = lines.index(current_line) + 1
except ValueError:
return False
for line in lines[start:]:
if not line.strip() or line.lstrip().startswith("#"):
continue
return line.strip().startswith("- ")
return False
def _model_id(model_budget: ModelBudget, role: str, default: str) -> str:
for model in model_budget.models:
if model.role == role:
return model.model_id
return default
def _modal_tts_model_id(model_budget: ModelBudget, model_family: str) -> str:
    """Pick the TTS model id reported for the modal profile.

    Args:
        model_budget: Budget consulted through ``_model_id``.
        model_family: Normalised family name; ``"chatterbox_turbo"`` selects
            the ``tts`` role, anything else the ``tts_qwen_fallback`` role.

    Returns:
        The budget's model id for the chosen role, or that role's built-in
        default id when the budget has no such entry.
    """
    if model_family == "chatterbox_turbo":
        role, fallback_id = "tts", "ResembleAI/chatterbox-turbo"
    else:
        role, fallback_id = "tts_qwen_fallback", "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign"
    return _model_id(model_budget, role, fallback_id)
def _normalize_modal_tts_family(value: str) -> str:
normalized = value.strip().lower().replace("-", "_")
if normalized in {"chatterbox", "chatterbox_turbo", "turbo"}:
return "chatterbox_turbo"
return "qwen"
def _dev_tts_runtime() -> str:
default_tts = "sapi" if os.name == "nt" else "kokoro"
return os.getenv("TIME_MACHINE_DEV_TTS", default_tts).strip().lower()
def _llm_provider_name() -> str:
base_url = os.getenv("TIME_MACHINE_LLM_BASE_URL", "https://api.together.xyz/v1")
if "together" in base_url:
return "together_ai"
if "openrouter" in base_url:
return "openrouter"
if "fireworks" in base_url:
return "fireworks"
if "openai" in base_url:
return "openai"
return "openai_compatible"
def _env_source(name: str, default: str = "default") -> str:
return f"env:{name}" if os.getenv(name) else default
def _first_env_source(*names: str) -> str:
for name in names:
if os.getenv(name):
return f"env:{name}"
return "default"
def _print_model_selection(profile: str, selections: list[ModelSelection]) -> None:
print(f"AI model selection: profile={profile}")
for selection in selections:
print(
"AI model selection: "
f"{selection.role} provider={selection.provider} "
f"runtime={selection.runtime} model={selection.model_id} "
f"source={selection.source}"
)
def _create_immersive_generator(
    profile: str,
) -> tuple[
    FixtureImmersiveExperienceGenerator | PromptOnlyImmersiveExperienceGenerator | TogetherImmersiveExperienceGenerator,
    bool,
    str | None,
]:
    """Choose the immersive-experience generator for a profile.

    Args:
        profile: Adapter profile name.

    Returns:
        A ``(generator, ready, warning)`` tuple:

        * ``fixture`` profile: the fixture generator, not ready, with a note.
        * Any other profile with an image API key in the environment: the
          Together generator (its fallback is prompt-only for ``dev`` /
          ``modal`` and the fixture generator otherwise), ready, no warning.
        * ``dev`` / ``modal`` without a key: the prompt-only generator, not
          ready, with a warning naming the accepted variables.
        * Anything else without a key: the fixture generator, not ready, with
          a warning.
    """
    if profile == "fixture":
        return FixtureImmersiveExperienceGenerator(), False, "Fixture profile is using local SVG visual fixtures."

    demo_profile = profile in {"dev", "modal"}
    # Any one of these variables, when non-empty, enables real image generation.
    key_variables = ("TIME_MACHINE_IMAGE_API_KEY", "TOGETHER_API_KEY", "TIME_MACHINE_LLM_API_KEY")
    if any(os.getenv(variable) for variable in key_variables):
        if demo_profile:
            fallback = PromptOnlyImmersiveExperienceGenerator()
        else:
            fallback = FixtureImmersiveExperienceGenerator()
        return TogetherImmersiveExperienceGenerator(fallback=fallback), True, None

    if demo_profile:
        return (
            PromptOnlyImmersiveExperienceGenerator(),
            False,
            "Image generation is disabled: set TIME_MACHINE_IMAGE_API_KEY, TOGETHER_API_KEY, or TIME_MACHINE_LLM_API_KEY for demo profiles.",
        )
    return FixtureImmersiveExperienceGenerator(), False, "Image generation is disabled; local fixture images are in use."
def _create_dev_tts(model_budget: ModelBudget, data_dir: Path) -> TTSAdapter:
    """Build the TTS adapter for the dev profile.

    Args:
        model_budget: Budget used to look up the ``tts_emergency`` model id
            for the Kokoro runtime.
        data_dir: Base data directory; audio is written to its ``audio``
            sub-directory.

    Returns:
        A ``SapiTTSAdapter`` for the ``sapi`` runtime or a
        ``KokoroTTSAdapter`` for the ``kokoro`` runtime.

    Raises:
        AdapterConfigurationError: If the runtime is neither ``sapi`` nor
            ``kokoro``, or if ``sapi`` is requested when ``os.name`` is not
            ``"nt"``.
    """
    runtime = _dev_tts_runtime()
    if runtime not in ("sapi", "kokoro"):
        raise AdapterConfigurationError(
            f"Unsupported TIME_MACHINE_DEV_TTS={runtime!r}. Use 'sapi' or 'kokoro'."
        )
    if runtime == "kokoro":
        kokoro_model_id = _model_id(model_budget, "tts_emergency", "hexgrad/Kokoro-82M")
        return KokoroTTSAdapter(
            model_id=kokoro_model_id,
            output_dir=data_dir / "audio",
        )
    # Only the "sapi" runtime remains at this point.
    if os.name != "nt":
        raise AdapterConfigurationError("TIME_MACHINE_DEV_TTS=sapi requires Windows.")
    return SapiTTSAdapter(output_dir=data_dir / "audio")
def _env_flag(name: str, default: bool) -> bool:
value = os.getenv(name)
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def _int_env(name: str, default: int) -> int:
value = os.getenv(name)
if value is None:
return default
try:
return int(value)
except ValueError as exc:
raise AdapterConfigurationError(f"{name} must be an integer, got {value!r}.") from exc
def _float_env(name: str, default: float) -> float:
value = os.getenv(name)
if value is None:
return default
try:
return float(value)
except ValueError as exc:
raise AdapterConfigurationError(f"{name} must be a number, got {value!r}.") from exc
def _required_env(name: str) -> str:
value = os.getenv(name)
if not value or not value.strip():
raise AdapterConfigurationError(f"{name} is required for the modal adapter profile.")
return value.strip()