mgokg's picture
Update app.py
d8e48b4 verified
import asyncio
import os
import numpy as np
import torch
import gradio as gr
from fastapi import FastAPI
from fastrtc import AsyncStreamHandler, Stream, wait_for_item
from transformers import pipeline, AutoTokenizer
# 1. Hardware & Modell-Initialisierung
device = "cuda" if torch.cuda.is_available() else "cpu"
# Wir nutzen 4-bit Quantisierung falls CUDA verfügbar ist, um Latenz zu sparen
model_kwargs = {"torch_dtype": torch.float16, "load_in_4bit": True} if device == "cuda" else {}
# STT: Whisper-tiny für minimale Latenz
stt_pipe = pipeline("automatic-speech-recognition", model="openai/whisper-tiny", device=device)
# LLM: Phi-3-mini (Instruction-tuned)
llm_model = "microsoft/Phi-3-mini-4k-instruct"
tokenizer = AutoTokenizer.from_pretrained(llm_model)
llm_pipe = pipeline("text-generation", model=llm_model, tokenizer=tokenizer, device=device, model_kwargs=model_kwargs)
# TTS: MMS-TTS für Deutsch
tts_pipe = pipeline("text-to-speech", model="facebook/mms-tts-deu", device=device)
class OpenSourceHandler(AsyncStreamHandler):
def __init__(self, expected_layout="mono", output_sample_rate=24000):
super().__init__(expected_layout, output_sample_rate, input_sample_rate=16000)
self.output_queue = asyncio.Queue()
async def receive(self, frame: tuple[int, np.ndarray]) -> None:
rate, array = frame
# Zugriff auf die Gradio-Zusatzeingaben (z.B. Sprache oder System-Prompt)
# latest_args[0] ist der erste zusätzliche Input nach dem Audio
# Hier beispielhaft für zukünftige Erweiterungen:
# system_msg = self.latest_args[0] if self.latest_args else "Du bist ein Assistent."
# 1. STT: Audio -> Text (Whisper erwartet Float32)
audio_fp32 = array.astype(np.float32) / 32768.0
text_result = stt_pipe({"sampling_rate": rate, "raw": audio_fp32})["text"]
# Rausch-Filter: Nur antworten, wenn wirklich Text erkannt wurde
if len(text_result.strip()) > 3:
# 2. LLM: Antwort generieren
# return_full_text=False verhindert, dass der Prompt mit ausgegeben wird
prompt = f"<|user|>\n{text_result}<|end|>\n<|assistant|>"
outputs = llm_pipe(prompt, max_new_tokens=64, do_sample=True, return_full_text=False)
answer = outputs[0]["generated_text"].strip()
# 3. TTS: Text -> Audio
# MMS-TTS gibt ein Dict zurück: {'audio': ndarray, 'sampling_rate': int}
audio_out = tts_pipe(answer)
audio_data = audio_out["audio"] # Das ist bereits ein numpy array
# Resampling / Konvertierung zu Int16 für den Stream
audio_int16 = (audio_data * 32767).astype(np.int16)
# Wir nutzen await für die Queue, um sauberes Async-Verhalten zu garantieren
await self.output_queue.put((self.output_sample_rate, audio_int16))
async def emit(self) -> tuple[int, np.ndarray] | None:
return await wait_for_item(self.output_queue)
def copy(self) -> "OpenSourceHandler":
return OpenSourceHandler(output_sample_rate=self.output_sample_rate)
# 2. FastAPI & Stream Setup
app = FastAPI() # Hier definieren wir die App!
stream = Stream(
modality="audio",
mode="send-receive",
handler=OpenSourceHandler(),
additional_inputs=[
gr.Textbox(label="System Message", value="Du bist ein hilfreicher KI-Assistent."),
],
)
# ... (Handler und Stream Definitionen bleiben gleich)
# Wir nutzen stream.ui, was ein normales gradio.Blocks Objekt ist
app = gr.mount_gradio_app(FastAPI(), stream.ui, path="/")
if __name__ == "__main__":
import uvicorn
import os
port = int(os.getenv("PORT", 7860))
# Wir starten die 'app' (FastAPI), die nun das Gradio UI auf "/" enthält
uvicorn.run(app, host="0.0.0.0", port=port)