import asyncio import os import numpy as np import torch import gradio as gr from fastapi import FastAPI from fastrtc import AsyncStreamHandler, Stream, wait_for_item from transformers import pipeline, AutoTokenizer # 1. Hardware & Modell-Initialisierung device = "cuda" if torch.cuda.is_available() else "cpu" # Wir nutzen 4-bit Quantisierung falls CUDA verfügbar ist, um Latenz zu sparen model_kwargs = {"torch_dtype": torch.float16, "load_in_4bit": True} if device == "cuda" else {} # STT: Whisper-tiny für minimale Latenz stt_pipe = pipeline("automatic-speech-recognition", model="openai/whisper-tiny", device=device) # LLM: Phi-3-mini (Instruction-tuned) llm_model = "microsoft/Phi-3-mini-4k-instruct" tokenizer = AutoTokenizer.from_pretrained(llm_model) llm_pipe = pipeline("text-generation", model=llm_model, tokenizer=tokenizer, device=device, model_kwargs=model_kwargs) # TTS: MMS-TTS für Deutsch tts_pipe = pipeline("text-to-speech", model="facebook/mms-tts-deu", device=device) class OpenSourceHandler(AsyncStreamHandler): def __init__(self, expected_layout="mono", output_sample_rate=24000): super().__init__(expected_layout, output_sample_rate, input_sample_rate=16000) self.output_queue = asyncio.Queue() async def receive(self, frame: tuple[int, np.ndarray]) -> None: rate, array = frame # Zugriff auf die Gradio-Zusatzeingaben (z.B. Sprache oder System-Prompt) # latest_args[0] ist der erste zusätzliche Input nach dem Audio # Hier beispielhaft für zukünftige Erweiterungen: # system_msg = self.latest_args[0] if self.latest_args else "Du bist ein Assistent." # 1. STT: Audio -> Text (Whisper erwartet Float32) audio_fp32 = array.astype(np.float32) / 32768.0 text_result = stt_pipe({"sampling_rate": rate, "raw": audio_fp32})["text"] # Rausch-Filter: Nur antworten, wenn wirklich Text erkannt wurde if len(text_result.strip()) > 3: # 2. LLM: Antwort generieren # return_full_text=False verhindert, dass der Prompt mit ausgegeben wird prompt = f"<|user|>\n{text_result}<|end|>\n<|assistant|>" outputs = llm_pipe(prompt, max_new_tokens=64, do_sample=True, return_full_text=False) answer = outputs[0]["generated_text"].strip() # 3. TTS: Text -> Audio # MMS-TTS gibt ein Dict zurück: {'audio': ndarray, 'sampling_rate': int} audio_out = tts_pipe(answer) audio_data = audio_out["audio"] # Das ist bereits ein numpy array # Resampling / Konvertierung zu Int16 für den Stream audio_int16 = (audio_data * 32767).astype(np.int16) # Wir nutzen await für die Queue, um sauberes Async-Verhalten zu garantieren await self.output_queue.put((self.output_sample_rate, audio_int16)) async def emit(self) -> tuple[int, np.ndarray] | None: return await wait_for_item(self.output_queue) def copy(self) -> "OpenSourceHandler": return OpenSourceHandler(output_sample_rate=self.output_sample_rate) # 2. FastAPI & Stream Setup app = FastAPI() # Hier definieren wir die App! stream = Stream( modality="audio", mode="send-receive", handler=OpenSourceHandler(), additional_inputs=[ gr.Textbox(label="System Message", value="Du bist ein hilfreicher KI-Assistent."), ], ) # ... (Handler und Stream Definitionen bleiben gleich) # Wir nutzen stream.ui, was ein normales gradio.Blocks Objekt ist app = gr.mount_gradio_app(FastAPI(), stream.ui, path="/") if __name__ == "__main__": import uvicorn import os port = int(os.getenv("PORT", 7860)) # Wir starten die 'app' (FastAPI), die nun das Gradio UI auf "/" enthält uvicorn.run(app, host="0.0.0.0", port=port)