| import base64 |
| import io |
| import os |
| import re |
| import tempfile |
| from functools import lru_cache |
| from pathlib import Path |
| from typing import Literal |
|
|
| import numpy as np |
| import soundfile as sf |
| import torch |
| import uvicorn |
| from fastapi import FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from huggingface_hub import hf_hub_download |
| from pydantic import BaseModel |
| from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline |
| from openai import OpenAI |
|
|
# --- Remote (OpenAI-compatible) LLM API settings ---------------------------
# A non-empty LLM_API makes generate_reply() use the remote API path.
LLM_API = os.environ.get("LLM_API", "").strip()
LLM_API_BASE_URL = os.environ.get("LLM_API_BASE_URL", "https://api.deepseek.com").strip()
LLM_API_MODEL = os.environ.get("LLM_API_MODEL", "deepseek-v4-flash").strip()

# --- Local inference settings ----------------------------------------------
# LLM_BACKEND is lower-cased so comparisons against "llamacpp" are
# case-insensitive with respect to the environment value.
LLM_BACKEND = os.environ.get("LLM_BACKEND", "llamacpp").lower()
TEXT_MODEL = os.environ.get("TEXT_MODEL", "Qwen/Qwen3-4B-Instruct-2507")
GGUF_MODEL_REPO = os.environ.get("GGUF_MODEL_REPO", "Qwen/Qwen3-1.7B-GGUF")
GGUF_MODEL_FILE = os.environ.get("GGUF_MODEL_FILE", "Qwen3-1.7B-Q4_K_M.gguf")
LLAMA_CPP_N_CTX = int(os.environ.get("LLAMA_CPP_N_CTX", "4096"))
# Defaults to the reported CPU count, never less than one thread.
LLAMA_CPP_N_THREADS = int(
    os.environ.get("LLAMA_CPP_N_THREADS", str(max(1, os.cpu_count() or 1)))
)

# --- Speech models and generation budget -----------------------------------
ASR_MODEL = os.environ.get("ASR_MODEL", "openai/whisper-tiny")
KOKORO_LANG_CODE = os.environ.get("KOKORO_LANG_CODE", "z")
KOKORO_VOICE = os.environ.get("KOKORO_VOICE", "zf_xiaobei")
MAX_NEW_TOKENS = int(os.environ.get("MAX_NEW_TOKENS", "220"))
|
|
# System prompt for "VENTING" mode (prompt text is Chinese): the assistant
# sides with the user and vents alongside them, within stated safety limits.
VENTING_SYSTEM_INSTRUCTION = """
你是一个非常懂人性、充满热情的“情绪嘴替”伙伴。
你的头号任务是:和用户站在一起,陪他们宣泄。

规则:
1. 不要讲大道理,不要劝大度。用户在生气时,道理是没用的。
2. 使用感性、强烈、发泄性的词汇。如果用户在骂某人或某事,你要义愤填膺,表达出“这也太离谱了”、“我也是服了”这种情绪。
3. 你的目标是让用户感到“有人懂我,有人替我出气”。
4. 语气像一个铁哥们或闺蜜,语气词可以多一点。
5. 遵守安全底线:不宣扬仇恨犯罪,不进行人身威胁,不鼓励现实伤害。
6. 响应长度要多样化,不要每次都回差不多长度。
"""


# System prompt for "GUIDING" mode (prompt text is Chinese): the assistant
# acts as a calm, empathetic mentor once the user has finished venting.
GUIDING_SYSTEM_INSTRUCTION = """
你现在是一个睿智、温和且具有同理心的心理导师。
用户刚才已经发泄过情绪了,现在他们同意听听你的建议或开导。

规则:
1. 语气平和、坚定、宽容。
2. 从客观角度分析问题,帮用户找到除了生气之外的解决方法,或者心理上的和解点。
3. 肯定用户刚才发泄情绪的必要性,然后引导他们向前看。
4. 每次回答不要太长,要循序渐进。
5. 响应长度要根据用户状态变化。
"""
|
|
|
|
# One turn of the conversation as exchanged with the client.
class Message(BaseModel):
    # Who produced the turn; "model" is mapped to the "assistant" chat role
    # when the prompt is built.
    role: Literal["user", "model"]
    # Text of the turn.
    text: str
    # Client-supplied timestamp; not read by the server code in this file.
    timestamp: int
    # Optional audio payloads carried along with the turn; not read by the
    # server code in this file.
    audio: str | None = None
    aiAudio: str | None = None
|
|
|
|
# Request body for POST /api/chat.
class ChatRequest(BaseModel):
    # Conversation so far; only the most recent turns are sent to the model.
    history: list[Message]
    # Selects which system prompt is used.
    mode: Literal["VENTING", "GUIDING"]
    # Optional base64-encoded audio that is transcribed before replying.
    audioBase64: str | None = None
|
|
|
|
# Request body for POST /api/speech.
class SpeechRequest(BaseModel):
    # Text to synthesize into speech.
    text: str
|
|
|
|
# ASGI application object; routes are registered on it further down the file.
app = FastAPI(title="SPITITOUT HF Space")

# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# permissive combination — confirm it is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
| def _device() -> str: |
| return "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
|
@lru_cache(maxsize=1)
def get_llm():
    """Load the Transformers tokenizer and causal LM for ``TEXT_MODEL``.

    The result is memoized, so the model is loaded at most once per process.
    With CUDA available the model is loaded in float16 with
    ``device_map="auto"``; otherwise it is loaded in float32 and moved to CPU.

    Returns:
        A ``(tokenizer, model)`` tuple with the model in eval mode.
    """
    tokenizer = AutoTokenizer.from_pretrained(TEXT_MODEL, trust_remote_code=True)

    has_cuda = torch.cuda.is_available()
    load_options = {
        "dtype": torch.float16 if has_cuda else torch.float32,
        "device_map": "auto" if has_cuda else None,
        "trust_remote_code": True,
    }
    model = AutoModelForCausalLM.from_pretrained(TEXT_MODEL, **load_options)

    if not has_cuda:
        model.to("cpu")
    model.eval()
    return tokenizer, model
|
|
|
|
@lru_cache(maxsize=1)
def get_llamacpp_llm():
    """Download the configured GGUF file and return a memoized ``Llama`` instance.

    ``llama_cpp`` is imported lazily so the module can be imported even when
    that package is unavailable.

    Raises:
        RuntimeError: if importing ``llama_cpp`` fails for any reason.
    """
    try:
        from llama_cpp import Llama
    except Exception as exc:
        message = (
            "llama-cpp-python is not installed correctly. "
            "Check requirements.txt and Space build logs."
        )
        raise RuntimeError(message) from exc

    gguf_path = hf_hub_download(repo_id=GGUF_MODEL_REPO, filename=GGUF_MODEL_FILE)
    # -1 requests offloading of all layers when CUDA is available; 0 keeps
    # everything on the CPU.
    gpu_layers = -1 if torch.cuda.is_available() else 0
    return Llama(
        model_path=gguf_path,
        n_ctx=LLAMA_CPP_N_CTX,
        n_threads=LLAMA_CPP_N_THREADS,
        n_gpu_layers=gpu_layers,
        verbose=False,
    )
|
|
|
|
@lru_cache(maxsize=1)
def get_api_client():
    """Return a memoized ``OpenAI`` client built from the ``LLM_API*`` settings.

    Raises:
        RuntimeError: if ``LLM_API`` (the API key) is empty.
    """
    if LLM_API:
        return OpenAI(api_key=LLM_API, base_url=LLM_API_BASE_URL)
    raise RuntimeError("LLM_API is not set.")
|
|
|
|
def generate_reply_api(messages: list[dict[str, str]]) -> str:
    """Generate a reply through the remote chat-completions API.

    Args:
        messages: Chat messages as ``{"role", "content"}`` dicts; they are
            copied before being sent, so the caller's list is not mutated.

    Returns:
        The reply with ``<think>`` blocks removed, or a fixed fallback
        sentence when the cleaned reply is empty.
    """
    payload = [dict(entry) for entry in messages]

    completion = get_api_client().chat.completions.create(
        model=LLM_API_MODEL,
        messages=payload,
        # The token budget is capped at 220 even if MAX_NEW_TOKENS is larger.
        max_tokens=min(MAX_NEW_TOKENS, 220),
        temperature=0.85,
        top_p=0.9,
        stream=False,
        # Presumably switches off the provider's reasoning mode — confirm
        # against the provider's API documentation.
        extra_body={
            "thinking": {"type": "disabled"}
        },
    )

    raw_reply = completion.choices[0].message.content or ""
    cleaned = remove_thinking_blocks(raw_reply)
    return cleaned or "我听到了,你继续说。"
|
|
|
|
@lru_cache(maxsize=1)
def get_asr():
    """Build and memoize the speech-recognition pipeline for ``ASR_MODEL``.

    Uses device 0 with float16 when CUDA is available, otherwise device -1
    with float32.
    """
    if torch.cuda.is_available():
        device_id, dtype = 0, torch.float16
    else:
        device_id, dtype = -1, torch.float32
    return pipeline(
        "automatic-speech-recognition",
        model=ASR_MODEL,
        torch_dtype=dtype,
        device=device_id,
    )
|
|
|
|
@lru_cache(maxsize=1)
def get_tts():
    """Return a memoized Kokoro ``KPipeline`` for ``KOKORO_LANG_CODE``.

    ``kokoro`` is imported lazily so the module can be imported even when
    that package is unavailable.

    Raises:
        RuntimeError: if importing ``kokoro`` fails for any reason.
    """
    try:
        from kokoro import KPipeline
    except Exception as exc:
        message = (
            "Kokoro TTS is not installed correctly. "
            "Check requirements.txt and Space build logs."
        )
        raise RuntimeError(message) from exc

    return KPipeline(lang_code=KOKORO_LANG_CODE)
|
|
|
|
def transcribe_audio(audio_base64: str) -> str:
    """Decode base64 audio and return its stripped transcription.

    The decoded bytes are written to a temporary ``.webm`` file whose path is
    handed to the ASR pipeline; the file is removed when the ``with`` block
    exits.

    Args:
        audio_base64: Base64-encoded audio payload.

    Returns:
        The ``"text"`` entry of the ASR result, stripped; empty if missing.
    """
    raw_audio = base64.b64decode(audio_base64)
    with tempfile.NamedTemporaryFile(suffix=".webm", delete=True) as tmp:
        tmp.write(raw_audio)
        # Flush so the bytes are on disk before the pipeline opens the path.
        tmp.flush()
        recognition = get_asr()(tmp.name)
    recognized = recognition.get("text", "")
    return str(recognized).strip()
|
|
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
|
|
def build_chat_messages(request: ChatRequest, transcript: str | None) -> list[dict[str, str]]:
    """Build the chat message list (system prompt + recent turns) for the LLM.

    The system prompt is chosen by ``request.mode`` and extended with extra
    anti-repetition rules. Only the last four history entries are included.

    The transcript, when present, is merged into the final retained turn if
    that turn is a user turn: it replaces the voice-message placeholder text,
    or is appended to the typed text otherwise. If it cannot be merged
    (history is empty, or the final turn is a model turn) it is added as its
    own trailing user turn, so spoken input is never silently discarded.

    Args:
        request: The chat request carrying ``history`` and ``mode``.
        transcript: ASR transcript of the attached audio, or ``None``.

    Returns:
        Messages as ``{"role", "content"}`` dicts, starting with the system
        message; ``"model"`` turns are mapped to the ``"assistant"`` role.
    """
    system = VENTING_SYSTEM_INSTRUCTION if request.mode == "VENTING" else GUIDING_SYSTEM_INSTRUCTION

    system += """
额外规则:
1. 不要复述上一轮回答。
2. 不要使用和上一轮相同的开头。
3. 用户只发短句时,只针对这句短句回应,不要把旧话题整段重复。
4. 每次最多 2 到 4 句话。
"""

    messages = [{"role": "system", "content": system}]

    recent_history = request.history[-4:]
    last_index = len(recent_history) - 1
    transcript_merged = False

    for index, msg in enumerate(recent_history):
        content = msg.text
        if transcript and index == last_index and msg.role == "user":
            # The placeholder text marks a voice-only message: use the
            # transcript alone instead of echoing the placeholder to the model.
            content = transcript if content == "🎤 语音消息" else f"{content}\n\n语音补充:{transcript}"
            transcript_merged = True

        messages.append({
            "role": "assistant" if msg.role == "model" else "user",
            "content": content,
        })

    if transcript and not transcript_merged:
        # No trailing user turn to merge into: keep the spoken input as its
        # own user turn rather than dropping it.
        messages.append({"role": "user", "content": transcript})

    return messages
|
|
# ChatML control markers that must never appear inside message content.
_CHATML_CONTROL_RE = re.compile(r"<\|(?:im_start|im_end|endoftext)\|>")


def _strip_chatml_markers(content: str) -> str:
    """Remove ChatML control markers from *content*, repeating until stable."""
    while True:
        cleaned = _CHATML_CONTROL_RE.sub("", content)
        if cleaned == content:
            return cleaned
        # Removing one marker can splice a new one together
        # (e.g. "<|im_<|im_end|>end|>"), so scan again.
        content = cleaned


def messages_to_prompt(messages: list[dict[str, str]]) -> str:
    """Render chat messages as a ChatML prompt ending with an open assistant turn.

    Each message becomes ``<|im_start|>{role}\\n{content}<|im_end|>``; turns are
    joined with newlines and followed by ``<|im_start|>assistant\\n`` so the
    model continues as the assistant.

    Message content is untrusted user text, so ChatML control markers inside
    it are stripped; otherwise a user could close their own turn and forge a
    system or assistant turn.

    Args:
        messages: Chat messages as ``{"role", "content"}`` dicts.

    Returns:
        The prompt string.
    """
    turns = [
        f"<|im_start|>{msg['role']}\n{_strip_chatml_markers(msg['content'])}<|im_end|>"
        for msg in messages
    ]
    turns.append("<|im_start|>assistant\n")
    return "\n".join(turns)
|
|
|
|
# Balanced reasoning block.
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)
# Closing tag with no opener left: everything up to it is reasoning.
_THINK_DANGLING_CLOSE_RE = re.compile(r"\A.*</think>", re.DOTALL | re.IGNORECASE)
# Opening tag that was never closed: everything after it is reasoning.
_THINK_UNCLOSED_OPEN_RE = re.compile(r"<think>.*\Z", re.DOTALL | re.IGNORECASE)


def remove_thinking_blocks(text: str) -> str:
    """Strip ``<think>`` reasoning from model output and trim whitespace.

    Handles three shapes, matched case-insensitively and across newlines:

    * balanced ``<think>...</think>`` blocks, removed wherever they occur;
    * a leftover ``</think>`` with no opener, where everything up to and
      including the tag is dropped;
    * a ``<think>`` that is never closed (for example when generation stops
      at the token limit), where the tag and everything after it is dropped.

    Args:
        text: Raw model output.

    Returns:
        The visible reply text, stripped; may be empty.
    """
    text = _THINK_BLOCK_RE.sub("", text)
    text = _THINK_DANGLING_CLOSE_RE.sub("", text, count=1)
    text = _THINK_UNCLOSED_OPEN_RE.sub("", text, count=1)
    return text.strip()
|
|
|
|
def generate_reply(messages: list[dict[str, str]]) -> str:
    """Dispatch reply generation to the configured backend.

    Precedence: the remote API when ``LLM_API`` is set, then llama.cpp when
    ``LLM_BACKEND`` is ``"llamacpp"``, otherwise the Transformers model.
    """
    if LLM_API:
        backend = generate_reply_api
    elif LLM_BACKEND == "llamacpp":
        backend = generate_reply_llamacpp
    else:
        backend = generate_reply_transformers
    return backend(messages)
|
|
|
|
def generate_reply_llamacpp(messages: list[dict[str, str]]) -> str:
    """Generate a reply with the local llama.cpp model.

    The messages are copied, ``/no_think`` is appended to the most recent
    user message, and the result is rendered to a prompt with
    ``messages_to_prompt``.

    Returns:
        The completion with ``<think>`` blocks removed, or a fixed fallback
        sentence when the cleaned completion is empty.
    """
    llm = get_llamacpp_llm()

    tagged_messages = [dict(entry) for entry in messages]
    latest_user = next(
        (entry for entry in reversed(tagged_messages) if entry["role"] == "user"),
        None,
    )
    if latest_user is not None:
        latest_user["content"] = f"{latest_user['content']}\n/no_think"

    completion = llm(
        messages_to_prompt(tagged_messages),
        max_tokens=MAX_NEW_TOKENS,
        temperature=0.7,
        top_p=0.8,
        repeat_penalty=1.12,
        stop=["<|im_end|>", "<|endoftext|>"],
    )
    raw_reply = completion["choices"][0]["text"]
    cleaned = remove_thinking_blocks(raw_reply)
    return cleaned or "我听到了,你继续说。"
|
|
|
|
def generate_reply_transformers(messages: list[dict[str, str]]) -> str:
    """Generate a reply with the local Transformers model.

    The chat template is applied with ``enable_thinking=False`` first; if
    that raises ``TypeError`` the template is applied again without the
    argument.

    Returns:
        The decoded continuation (prompt tokens excluded) with ``<think>``
        blocks removed, or a fixed fallback sentence when it is empty.
    """
    tokenizer, model = get_llm()

    template_options = {"tokenize": False, "add_generation_prompt": True}
    try:
        prompt = tokenizer.apply_chat_template(
            messages, enable_thinking=False, **template_options
        )
    except TypeError:
        prompt = tokenizer.apply_chat_template(messages, **template_options)

    inputs = tokenizer([prompt], return_tensors="pt").to(model.device)
    sampling_options = {
        "max_new_tokens": MAX_NEW_TOKENS,
        "do_sample": True,
        "temperature": 0.85,
        "top_p": 0.9,
        "pad_token_id": tokenizer.eos_token_id,
    }
    with torch.inference_mode():
        output_ids = model.generate(**inputs, **sampling_options)

    # Keep only the tokens generated after the prompt.
    prompt_length = inputs.input_ids.shape[-1]
    generated_ids = output_ids[0][prompt_length:]
    decoded = tokenizer.decode(generated_ids, skip_special_tokens=True)
    return remove_thinking_blocks(decoded) or "我听到了,你继续说。"
|
|
|
|
| def synthesize_speech(text: str) -> str | None: |
| if not text.strip(): |
| return None |
|
|
| pipeline_tts = get_tts() |
| chunks = [] |
| for _, _, audio in pipeline_tts(text[:500], voice=KOKORO_VOICE, speed=1.05): |
| chunks.append(np.asarray(audio, dtype=np.float32)) |
| if not chunks: |
| return None |
|
|
| audio = np.concatenate(chunks) |
| wav_io = io.BytesIO() |
| sf.write(wav_io, audio, 24000, format="WAV") |
| return base64.b64encode(wav_io.getvalue()).decode("utf-8") |
|
|
|
|
@app.get("/api/health")
def health():
    """Report service status and the active configuration.

    ``llm_backend`` follows the same precedence as ``generate_reply``:
    the remote API when ``LLM_API`` is set, llama.cpp when ``LLM_BACKEND`` is
    ``"llamacpp"``, and Transformers otherwise. The API base URL and model
    are reported only when the API is in use. The API key is never included.
    """
    using_api = bool(LLM_API)
    if using_api:
        llm_backend = "deepseek_api"
    elif LLM_BACKEND == "llamacpp":
        llm_backend = "llamacpp"
    else:
        # generate_reply() sends every other LLM_BACKEND value to the
        # Transformers path, so report that instead of claiming llama.cpp.
        llm_backend = "transformers"

    return {
        "ok": True,
        "runtime": "api" if using_api else "local",
        "llm_backend": llm_backend,
        "llm_api_base_url": LLM_API_BASE_URL if using_api else None,
        "llm_api_model": LLM_API_MODEL if using_api else None,
        "text_model": TEXT_MODEL,
        "gguf_model_repo": GGUF_MODEL_REPO,
        "gguf_model_file": GGUF_MODEL_FILE,
        "asr_model": ASR_MODEL,
        "kokoro_lang_code": KOKORO_LANG_CODE,
        "kokoro_voice": KOKORO_VOICE,
        "device": _device(),
    }
|
|
|
|
@app.post("/api/chat")
def chat(request: ChatRequest):
    """Handle a chat turn: transcribe optional audio, then generate a reply.

    Returns:
        ``{"text": reply, "transcript": transcript}`` where ``transcript`` is
        ``None`` when no audio was attached.

    Raises:
        HTTPException: status 500 carrying the error text for any failure.
    """
    try:
        transcript = None
        if request.audioBase64:
            transcript = transcribe_audio(request.audioBase64)
        prompt_messages = build_chat_messages(request, transcript)
        reply = generate_reply(prompt_messages)
        return {"text": reply, "transcript": transcript}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
|
|
|
@app.post("/api/speech")
def speech(request: SpeechRequest):
    """Synthesize ``request.text`` and return it as ``{"audio": <base64 or None>}``.

    Raises:
        HTTPException: status 500 carrying the error text for any failure.
    """
    try:
        encoded_audio = synthesize_speech(request.text)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"audio": encoded_audio}
|
|
|
|
# Built frontend directory, expected next to this file.
dist_dir = Path(__file__).parent.joinpath("dist")
if dist_dir.exists():
    # Serve the bundled assets only when a build is present.
    app.mount(
        "/assets",
        StaticFiles(directory=dist_dir / "assets"),
        name="assets",
    )
|
|
|
|
def _resolve_frontend_file(path: str) -> Path | None:
    """Return the file under ``dist_dir`` that *path* names, or ``None``.

    The candidate is resolved (collapsing ``..`` segments and symlinks) and
    accepted only if it is still inside ``dist_dir`` and is a regular file.
    """
    try:
        dist_root = dist_dir.resolve()
        candidate = (dist_root / path).resolve()
        if candidate.is_relative_to(dist_root) and candidate.is_file():
            return candidate
    except (OSError, ValueError, RuntimeError):
        # Malformed paths (e.g. embedded NUL bytes) or filesystem errors are
        # treated as "no such file".
        return None
    return None


@app.get("/{path:path}")
def frontend(path: str):
    """Serve a built frontend file, falling back to ``index.html``.

    ``path`` comes straight from the request URL, so it is confined to
    ``dist_dir``: joining it unchecked would let ``..`` segments or an
    absolute path (a request to ``//etc/passwd`` yields ``/etc/passwd``)
    read arbitrary files. Any path that is not a file inside ``dist_dir``
    gets ``index.html`` when it exists, otherwise a JSON hint to build the
    frontend.
    """
    requested = _resolve_frontend_file(path)
    if requested is not None:
        return FileResponse(requested)
    index = dist_dir / "index.html"
    if index.exists():
        return FileResponse(index)
    return {"message": "Run npm run build before serving the Space frontend."}
|
|
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from PORT, defaulting to 7860.
    port = int(os.environ.get("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=port)
|
|