Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| import numpy as np | |
| import torch | |
| import gradio as gr | |
| from fastapi import FastAPI | |
| from fastrtc import AsyncStreamHandler, Stream, wait_for_item | |
| from transformers import pipeline, AutoTokenizer | |
| # 1. Hardware & Modell-Initialisierung | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| # Wir nutzen 4-bit Quantisierung falls CUDA verfügbar ist, um Latenz zu sparen | |
| model_kwargs = {"torch_dtype": torch.float16, "load_in_4bit": True} if device == "cuda" else {} | |
| # STT: Whisper-tiny für minimale Latenz | |
| stt_pipe = pipeline("automatic-speech-recognition", model="openai/whisper-tiny", device=device) | |
| # LLM: Phi-3-mini (Instruction-tuned) | |
| llm_model = "microsoft/Phi-3-mini-4k-instruct" | |
| tokenizer = AutoTokenizer.from_pretrained(llm_model) | |
| llm_pipe = pipeline("text-generation", model=llm_model, tokenizer=tokenizer, device=device, model_kwargs=model_kwargs) | |
| # TTS: MMS-TTS für Deutsch | |
| tts_pipe = pipeline("text-to-speech", model="facebook/mms-tts-deu", device=device) | |
| class OpenSourceHandler(AsyncStreamHandler): | |
| def __init__(self, expected_layout="mono", output_sample_rate=24000): | |
| super().__init__(expected_layout, output_sample_rate, input_sample_rate=16000) | |
| self.output_queue = asyncio.Queue() | |
| async def receive(self, frame: tuple[int, np.ndarray]) -> None: | |
| rate, array = frame | |
| # Zugriff auf die Gradio-Zusatzeingaben (z.B. Sprache oder System-Prompt) | |
| # latest_args[0] ist der erste zusätzliche Input nach dem Audio | |
| # Hier beispielhaft für zukünftige Erweiterungen: | |
| # system_msg = self.latest_args[0] if self.latest_args else "Du bist ein Assistent." | |
| # 1. STT: Audio -> Text (Whisper erwartet Float32) | |
| audio_fp32 = array.astype(np.float32) / 32768.0 | |
| text_result = stt_pipe({"sampling_rate": rate, "raw": audio_fp32})["text"] | |
| # Rausch-Filter: Nur antworten, wenn wirklich Text erkannt wurde | |
| if len(text_result.strip()) > 3: | |
| # 2. LLM: Antwort generieren | |
| # return_full_text=False verhindert, dass der Prompt mit ausgegeben wird | |
| prompt = f"<|user|>\n{text_result}<|end|>\n<|assistant|>" | |
| outputs = llm_pipe(prompt, max_new_tokens=64, do_sample=True, return_full_text=False) | |
| answer = outputs[0]["generated_text"].strip() | |
| # 3. TTS: Text -> Audio | |
| # MMS-TTS gibt ein Dict zurück: {'audio': ndarray, 'sampling_rate': int} | |
| audio_out = tts_pipe(answer) | |
| audio_data = audio_out["audio"] # Das ist bereits ein numpy array | |
| # Resampling / Konvertierung zu Int16 für den Stream | |
| audio_int16 = (audio_data * 32767).astype(np.int16) | |
| # Wir nutzen await für die Queue, um sauberes Async-Verhalten zu garantieren | |
| await self.output_queue.put((self.output_sample_rate, audio_int16)) | |
| async def emit(self) -> tuple[int, np.ndarray] | None: | |
| return await wait_for_item(self.output_queue) | |
| def copy(self) -> "OpenSourceHandler": | |
| return OpenSourceHandler(output_sample_rate=self.output_sample_rate) | |
| # 2. FastAPI & Stream Setup | |
| app = FastAPI() # Hier definieren wir die App! | |
| stream = Stream( | |
| modality="audio", | |
| mode="send-receive", | |
| handler=OpenSourceHandler(), | |
| additional_inputs=[ | |
| gr.Textbox(label="System Message", value="Du bist ein hilfreicher KI-Assistent."), | |
| ], | |
| ) | |
| # ... (Handler und Stream Definitionen bleiben gleich) | |
| # Wir nutzen stream.ui, was ein normales gradio.Blocks Objekt ist | |
| app = gr.mount_gradio_app(FastAPI(), stream.ui, path="/") | |
| if __name__ == "__main__": | |
| import uvicorn | |
| import os | |
| port = int(os.getenv("PORT", 7860)) | |
| # Wir starten die 'app' (FastAPI), die nun das Gradio UI auf "/" enthält | |
| uvicorn.run(app, host="0.0.0.0", port=port) |