|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
import io
import logging
import os
import tempfile
import threading
import time
from typing import Optional, Any, List, Tuple

import requests
import gradio as gr
from langdetect import detect, DetectorFactory
from gtts import gTTS

|
| |
| DetectorFactory.seed = 0 |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger("kcrobot.v4") |
|
|
| |
| HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip() |
| HF_MODEL = os.getenv("HF_MODEL", "google/flan-t5-large") |
| HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small") |
| HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "") |
| TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip() |
| TELEGRAM_CHATID = os.getenv("TELEGRAM_CHATID", "").strip() |
|
|
| if not HF_API_TOKEN: |
| logger.warning("HF_API_TOKEN not set — put it into Space Secrets for HF inference calls to work.") |
|
|
| HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {} |
|
|
| |
| CONVERSATION: List[Tuple[str, str]] = [] |
| DISPLAY_BUFFER: List[str] = [] |
| DISPLAY_LIMIT = 6 |
|
|
def push_display(line: str) -> None:
    """Append a line to the rolling display buffer, keeping only the newest
    DISPLAY_LIMIT entries (oldest lines are dropped first)."""
    DISPLAY_BUFFER.append(line)
    while len(DISPLAY_BUFFER) > DISPLAY_LIMIT:
        DISPLAY_BUFFER.pop(0)
|
|
| |
| def _parse_hf_text_response(data: Any) -> str: |
| try: |
| if isinstance(data, list) and data and isinstance(data[0], dict): |
| return data[0].get("generated_text", "") or str(data[0]) |
| if isinstance(data, dict) and "generated_text" in data: |
| return data["generated_text"] |
| if isinstance(data, dict) and "text" in data: |
| return data["text"] |
| if isinstance(data, dict) and "choices" in data and isinstance(data["choices"], list): |
| c0 = data["choices"][0] |
| return c0.get("text") or c0.get("message", {}).get("content", "") or str(c0) |
| return str(data) |
| except Exception: |
| return str(data) |
|
|
def hf_text_generate(prompt: str, model: Optional[str] = None, max_new_tokens: int = 256, temperature: float = 0.7) -> str:
    """Run text generation on the HF Inference API and return the parsed text.

    Args:
        prompt: Full prompt text sent as the model input.
        model: Optional model id; falls back to the HF_MODEL env default.
        max_new_tokens / temperature: forwarded as generation parameters.

    Raises:
        RuntimeError: if the API token is missing or the HTTP call fails.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN not configured in environment")
    target = model or HF_MODEL
    endpoint = f"https://api-inference.huggingface.co/models/{target}"
    body = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)},
        # Block until the model is loaded instead of getting a 503.
        "options": {"wait_for_model": True},
    }
    logger.info("HF text gen -> model=%s prompt_len=%d", target, len(prompt))
    resp = requests.post(endpoint, headers=HF_HEADERS, json=body, timeout=120)
    if resp.status_code != 200:
        logger.error("HF text gen error %s: %s", resp.status_code, resp.text[:300])
        raise RuntimeError(f"HF text gen failed: {resp.status_code}: {resp.text}")
    return _parse_hf_text_response(resp.json())
|
|
def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Transcribe raw audio bytes via the HF Inference API (Whisper by default).

    Raises:
        RuntimeError: if the API token is missing or the HTTP call fails.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN not configured")
    target = model or HF_STT_MODEL
    endpoint = f"https://api-inference.huggingface.co/models/{target}"
    # Raw audio is posted as an opaque byte stream, not JSON.
    headers = {**HF_HEADERS, "Content-Type": "application/octet-stream"}
    logger.info("HF STT -> model=%s bytes=%d", target, len(audio_bytes) if audio_bytes else 0)
    resp = requests.post(endpoint, headers=headers, data=audio_bytes, timeout=180)
    if resp.status_code != 200:
        logger.error("HF STT error %s: %s", resp.status_code, resp.text[:300])
        raise RuntimeError(f"HF STT failed: {resp.status_code}: {resp.text}")
    payload = resp.json()
    if isinstance(payload, dict) and "text" in payload:
        return payload["text"]
    return _parse_hf_text_response(payload)
|
|
| |
def tts_gtts_bytes(text: str) -> bytes:
    """Synthesize `text` to MP3 bytes with gTTS.

    The speech language is picked by langdetect: Vietnamese when the text
    detects as "vi*", English otherwise (detection failures default to "vi").

    Raises:
        RuntimeError: if `text` is empty.
    """
    if not text:
        raise RuntimeError("Empty text for TTS")
    try:
        detected = detect(text)
    except Exception:
        detected = "vi"
    tts_lang = "vi" if detected.startswith("vi") else "en"
    logger.info("gTTS generating audio lang=%s len=%d", tts_lang, len(text))
    buffer = io.BytesIO()
    gTTS(text=text, lang=tts_lang).write_to_fp(buffer)
    return buffer.getvalue()
|
|
| |
def send_telegram(text: str) -> None:
    """Best-effort push of `text` to the configured Telegram chat.

    Silently returns when the bot token / chat id are not configured;
    network failures are logged, never raised.
    """
    if not TELEGRAM_TOKEN or not TELEGRAM_CHATID:
        logger.debug("Telegram not configured or missing chat id")
        return
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    try:
        requests.post(url, json={"chat_id": TELEGRAM_CHATID, "text": text}, timeout=10)
    except Exception:
        logger.exception("send_telegram failed")
|
|
| def telegram_poller(): |
| if not TELEGRAM_TOKEN: |
| logger.info("Telegram poller disabled") |
| return |
| base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}" |
| offset = None |
| logger.info("Telegram poller started") |
| while True: |
| try: |
| params = {"timeout": 30} |
| if offset: params["offset"] = offset |
| r = requests.get(base + "/getUpdates", params=params, timeout=35) |
| if r.status_code != 200: |
| time.sleep(2); continue |
| data = r.json() |
| for upd in data.get("result", []): |
| offset = upd.get("update_id", 0) + 1 |
| msg = upd.get("message") or {} |
| chat = msg.get("chat", {}) |
| chat_id = chat.get("id") |
| text = (msg.get("text") or "").strip() |
| if not text: |
| continue |
| logger.info("TG msg: %s", text) |
| if text.lower().startswith("/ask "): |
| q = text[5:].strip() |
| try: |
| ans = hf_text_generate(q) |
| except Exception as e: |
| ans = f"[HF error] {e}" |
| try: |
| requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": ans}, timeout=10) |
| except Exception: |
| logger.exception("tg reply failed") |
| elif text.lower().startswith("/say "): |
| phrase = text[5:].strip() |
| try: |
| audio = tts_gtts_bytes(phrase) |
| files = {"audio": ("reply.mp3", audio, "audio/mpeg")} |
| requests.post(base + "/sendAudio", files=files, data={"chat_id": chat_id}, timeout=30) |
| except Exception: |
| logger.exception("tg say failed") |
| elif text.lower().startswith("/status"): |
| try: |
| requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "KC Robot brain running"}, timeout=10) |
| except Exception: |
| pass |
| else: |
| try: |
| requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "Commands: /ask <q> | /say <text> | /status"}, timeout=10) |
| except Exception: |
| pass |
| except Exception: |
| logger.exception("telegram poller exception") |
| time.sleep(3) |
|
|
| if TELEGRAM_TOKEN: |
| t = threading.Thread(target=telegram_poller, daemon=True) |
| t.start() |
|
|
| |
with gr.Blocks(title="KC Robot AI - Cloud Brain V4") as demo:
    gr.Markdown("## 🤖 KC Robot AI — Cloud Brain (Hugging Face Inference)")
    with gr.Row():
        with gr.Column(scale=2):
            # Gradio 3.x styling API (`.style` was removed in Gradio 4).
            chatbot = gr.Chatbot([], elem_id="chatbot").style(height=480)
            txt = gr.Textbox(lines=2, placeholder="Nhập câu hỏi (VN/EN) hoặc tiếng Anh...", label="Your message")
            send = gr.Button("Gửi")
            with gr.Row():
                temp = gr.Slider(0.0, 1.0, value=0.7, label="Temperature")
                tokens = gr.Slider(16, 1024, value=256, step=16, label="Max tokens")
            model_override = gr.Textbox(label="Override HF model (optional)")
        with gr.Column(scale=1):
            gr.Markdown("### TTS / STT")
            tts_in = gr.Textbox(lines=2, label="Text → TTS")
            tts_btn = gr.Button("Create TTS")
            tts_audio = gr.Audio(label="TTS audio", interactive=False)
            gr.Markdown("Upload audio for STT")
            up = gr.Audio(source="upload", type="filepath", label="Upload audio")
            stt_btn = gr.Button("Transcribe")
            stt_out = gr.Textbox(label="Transcription")

    def chat_fn(message, history, temperature, max_tokens, model_override_val):
        """Handle one chat turn: build the prompt, query HF, append the pair.

        Returns the updated chatbot history and "" to clear the textbox.
        """
        if not message or not message.strip():
            return history or [], ""
        system = "You are KC Robot AI, bilingual (Vietnamese & English). Answer in the same language as the user. Be clear and helpful."
        prompt = f"{system}\n\nUser: {message}\nAssistant:"
        model = model_override_val.strip() if model_override_val else HF_MODEL
        try:
            ans = hf_text_generate(prompt, model=model, max_new_tokens=int(max_tokens), temperature=float(temperature))
        except Exception as e:
            ans = f"[HF error] {e}"
        history = history or []
        # BUG FIX: gr.Chatbot takes (user_message, bot_message) pairs. The old
        # code appended ("You", message) and ("Bot", ans) as two separate
        # pairs, which rendered the literal strings "You"/"Bot" as chat
        # bubbles instead of pairing the question with its answer.
        history.append((message, ans))
        push_display(f"YOU: {message[:40]}")
        push_display(f"BOT: {ans[:40]}")
        return history, ""

    def tts_fn(text, model_override_val):
        """Synthesize `text` with gTTS and return an MP3 filepath for gr.Audio."""
        if not text or not text.strip():
            return None
        try:
            audio = tts_gtts_bytes(text)
        except Exception as e:
            raise gr.Error(f"TTS failed: {e}")
        # BUG FIX: gr.Audio cannot consume a (bytes, mime-type) tuple — it
        # expects a filepath or a (sample_rate, ndarray) pair, so the old
        # return value was misinterpreted. Persist the MP3 bytes to a temp
        # file and hand Gradio the path instead.
        tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
        tmp.write(audio)
        tmp.close()
        return tmp.name

    def stt_fn(local_path, model_override_val):
        """Read the uploaded audio file and transcribe it via the HF STT model."""
        if not local_path:
            return ""
        with open(local_path, "rb") as f:
            b = f.read()
        try:
            text = hf_stt_from_bytes(b)
        except Exception as e:
            raise gr.Error(f"STT failed: {e}")
        push_display(f"Voice: {text[:40]}")
        return text

    send.click(chat_fn, inputs=[txt, chatbot, temp, tokens, model_override], outputs=[chatbot, txt])
    tts_btn.click(tts_fn, inputs=[tts_in, model_override], outputs=[tts_audio])
    stt_btn.click(stt_fn, inputs=[up, model_override], outputs=[stt_out])
|
|
| |
| app = demo.app |
|
|
| from fastapi import Request, UploadFile, File |
| from starlette.responses import JSONResponse, Response |
|
|
@app.post("/api/ask")
async def api_ask(request: Request):
    """REST endpoint: answer a text question via HF text generation.

    Body: {"text": str, "lang": "vi" | "en" | "auto"}.
    Returns {"answer": str}, or a JSON error with status 400/500.
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error":"invalid json"}, status_code=400)
    text = (body.get("text","") or "").strip()
    lang = (body.get("lang","auto") or "auto").strip().lower()
    if not text:
        return JSONResponse({"error":"no text"}, status_code=400)
    # Pick the instruction prefix by requested language; anything other than
    # an explicit "vi"/"en" falls back to the bilingual prompt.
    prefixes = {
        "vi": "Bạn là trợ lý thông minh. Trả lời bằng tiếng Việt, rõ ràng:\n\n",
        "en": "You are a helpful assistant. Answer in English:\n\n",
    }
    bilingual = "You are bilingual assistant (Vietnamese/English). Answer in the language of the question.\n\n"
    prompt = prefixes.get(lang, bilingual) + text
    try:
        ans = hf_text_generate(prompt)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    CONVERSATION.append((text, ans))
    push_display(f"YOU: {text[:40]}")
    push_display(f"BOT: {ans[:40]}")
    return {"answer": ans}
|
|
@app.post("/api/tts")
async def api_tts(request: Request):
    """REST endpoint: synthesize the posted text and return raw MP3 bytes.

    Body: {"text": str}. Returns audio/mpeg, or a JSON error (400/500).
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error":"invalid json"}, status_code=400)
    text = (body.get("text","") or "").strip()
    if not text:
        return JSONResponse({"error":"no text"}, status_code=400)
    try:
        audio_bytes = tts_gtts_bytes(text)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    return Response(content=audio_bytes, media_type="audio/mpeg")
|
|
@app.post("/api/stt")
async def api_stt(file: UploadFile = File(...)):
    """REST endpoint: transcribe an uploaded audio file.

    Returns {"text": str}; records the transcription in the display buffer
    and the conversation log (with an empty answer slot).
    """
    try:
        raw = await file.read()
    except Exception:
        return JSONResponse({"error":"file read error"}, status_code=400)
    if not raw:
        return JSONResponse({"error":"no audio content"}, status_code=400)
    try:
        transcription = hf_stt_from_bytes(raw)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    push_display(f"Voice: {transcription[:40]}")
    CONVERSATION.append((f"[voice] {transcription}", ""))
    return {"text": transcription}
|
|
@app.post("/api/presence")
async def api_presence(request: Request):
    """REST endpoint: the robot reports a person detected in front of it.

    Body: {"note": str} (Vietnamese default note when omitted). Logs the
    event, records a greeting, and best-effort notifies Telegram.
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error":"invalid json"}, status_code=400)
    note = (body.get("note","Có người phía trước") or "").strip()
    greeting = f"Xin chào! {note}"
    push_display(f"RADAR: {note[:40]}")
    CONVERSATION.append(("__presence__", greeting))
    if TELEGRAM_TOKEN and TELEGRAM_CHATID:
        try:
            send_telegram(f"⚠️ Robot: Phát hiện người - {note}")
        except Exception:
            logger.exception("telegram notify failed")
    return {"greeting": greeting}
|
|
@app.get("/api/display")
async def api_display():
    """REST endpoint: snapshot of the rolling display lines plus the number
    of recorded conversation turns."""
    return {"lines": list(DISPLAY_BUFFER), "conv_len": len(CONVERSATION)}
|
|
| |
| if __name__ == "__main__": |
| demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860))) |
|
|