# --- Captured Hugging Face Space page metadata (not part of the program) ---
# RobotAI / app.py — kcrobot25 — "kc commit" 12bf2c1 (verified)
# raw | history | blame — 14 kB
# app.py
# KC Robot AI — V4 FINAL (Gradio + REST API /api/* + Telegram)
# Upload this file to a Hugging Face Space (SDK = Gradio).
# Put your secrets in Space Settings:
# HF_API_TOKEN (required)
# TELEGRAM_TOKEN (optional)
# TELEGRAM_CHATID (optional)
# Optional overrides:
# HF_MODEL, HF_STT_MODEL, HF_TTS_MODEL
# Standard-library imports.
import os
import io
import time
import threading
import logging
from typing import Optional, Any, List, Tuple
# Third-party imports (installed in the Space environment).
import requests
import gradio as gr
from langdetect import detect, DetectorFactory
from gtts import gTTS
# Ensure deterministic detection
# langdetect is randomized by default; fixing the seed makes repeated
# detections of the same text return the same language code.
DetectorFactory.seed = 0
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v4")
# ====== Configuration (read once from Space Secrets / environment) ======
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip()               # required for all HF inference calls
HF_MODEL = os.getenv("HF_MODEL", "google/flan-t5-large")           # default text-generation model
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small")   # speech-to-text model
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "")                       # optional; gTTS is used when empty
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "").strip()           # optional bot token
TELEGRAM_CHATID = os.getenv("TELEGRAM_CHATID", "").strip()         # optional chat id for notifications

if not HF_API_TOKEN:
    logger.warning("HF_API_TOKEN not set — put it into Space Secrets for HF inference calls to work.")

# Bearer-auth header shared by every HF Inference API request (empty when no token).
HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}
# ====== In-memory state ======
# (question, answer) pairs accumulated over the process lifetime.
CONVERSATION: List[Tuple[str, str]] = []
# Rolling log of short status lines served by /api/display.
DISPLAY_BUFFER: List[str] = []
DISPLAY_LIMIT = 6


def push_display(line: str):
    """Append *line* to the rolling display buffer, evicting the oldest
    entries so at most DISPLAY_LIMIT lines are retained."""
    DISPLAY_BUFFER.append(line)
    while len(DISPLAY_BUFFER) > DISPLAY_LIMIT:
        DISPLAY_BUFFER.pop(0)
# ====== Hugging Face helpers ======
def _parse_hf_text_response(data: Any) -> str:
try:
if isinstance(data, list) and data and isinstance(data[0], dict):
return data[0].get("generated_text", "") or str(data[0])
if isinstance(data, dict) and "generated_text" in data:
return data["generated_text"]
if isinstance(data, dict) and "text" in data:
return data["text"]
if isinstance(data, dict) and "choices" in data and isinstance(data["choices"], list):
c0 = data["choices"][0]
return c0.get("text") or c0.get("message", {}).get("content", "") or str(c0)
return str(data)
except Exception:
return str(data)
def hf_text_generate(prompt: str, model: Optional[str] = None, max_new_tokens: int = 256, temperature: float = 0.7) -> str:
    """Run text generation on the HF Inference API and return the parsed text.

    Raises RuntimeError when no token is configured or the API answers non-200.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN not configured in environment")
    target = model or HF_MODEL
    endpoint = f"https://api-inference.huggingface.co/models/{target}"
    body = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": int(max_new_tokens), "temperature": float(temperature)},
        # Block until the model is loaded instead of getting a 503.
        "options": {"wait_for_model": True},
    }
    logger.info("HF text gen -> model=%s prompt_len=%d", target, len(prompt))
    resp = requests.post(endpoint, headers=HF_HEADERS, json=body, timeout=120)
    if resp.status_code != 200:
        logger.error("HF text gen error %s: %s", resp.status_code, resp.text[:300])
        raise RuntimeError(f"HF text gen failed: {resp.status_code}: {resp.text}")
    return _parse_hf_text_response(resp.json())
def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Transcribe raw audio bytes via the HF Inference API (Whisper by default).

    Raises RuntimeError when no token is configured or the API answers non-200.
    """
    if not HF_API_TOKEN:
        raise RuntimeError("HF_API_TOKEN not configured")
    target = model or HF_STT_MODEL
    endpoint = f"https://api-inference.huggingface.co/models/{target}"
    # Send the audio as a raw body rather than JSON.
    headers = {**HF_HEADERS, "Content-Type": "application/octet-stream"}
    logger.info("HF STT -> model=%s bytes=%d", target, len(audio_bytes) if audio_bytes else 0)
    resp = requests.post(endpoint, headers=headers, data=audio_bytes, timeout=180)
    if resp.status_code != 200:
        logger.error("HF STT error %s: %s", resp.status_code, resp.text[:300])
        raise RuntimeError(f"HF STT failed: {resp.status_code}: {resp.text}")
    payload = resp.json()
    if isinstance(payload, dict) and "text" in payload:
        return payload["text"]
    return _parse_hf_text_response(payload)
# ====== TTS: prefer gTTS (free). If HF_TTS_MODEL provided you can implement HF TTS similarly. ======
# ====== TTS: prefer gTTS (free). If HF_TTS_MODEL provided you can implement HF TTS similarly. ======
def tts_gtts_bytes(text: str) -> bytes:
    """Render *text* to MP3 bytes with gTTS.

    Picks the Vietnamese voice when langdetect reports 'vi*', English
    otherwise; detection failures default to Vietnamese.
    Raises RuntimeError on empty input.
    """
    if not text:
        raise RuntimeError("Empty text for TTS")
    try:
        detected = detect(text)
    except Exception:
        detected = "vi"
    voice = "vi" if detected.startswith("vi") else "en"
    logger.info("gTTS generating audio lang=%s len=%d", voice, len(text))
    buf = io.BytesIO()
    gTTS(text=text, lang=voice).write_to_fp(buf)
    return buf.getvalue()
# ====== Telegram helpers (optional) ======
# ====== Telegram helpers (optional) ======
def send_telegram(text: str):
    """Best-effort push of *text* to the configured Telegram chat.

    No-op when TELEGRAM_TOKEN / TELEGRAM_CHATID are missing; never raises.
    """
    if not TELEGRAM_TOKEN or not TELEGRAM_CHATID:
        logger.debug("Telegram not configured or missing chat id")
        return
    api = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    try:
        requests.post(api + "/sendMessage", json={"chat_id": TELEGRAM_CHATID, "text": text}, timeout=10)
    except Exception:
        logger.exception("send_telegram failed")
def telegram_poller():
    """Long-poll Telegram getUpdates forever and answer bot commands.

    Supported commands: /ask <q> (HF text generation), /say <text> (gTTS
    audio reply), /status.  Anything else gets a usage hint.  Intended to run
    in a daemon thread; every failure is logged and the loop retries.
    """
    if not TELEGRAM_TOKEN:
        logger.info("Telegram poller disabled")
        return
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    offset = None
    logger.info("Telegram poller started")
    while True:
        try:
            params = {"timeout": 30}
            if offset:
                params["offset"] = offset
            resp = requests.get(base + "/getUpdates", params=params, timeout=35)
            if resp.status_code != 200:
                time.sleep(2)
                continue
            for update in resp.json().get("result", []):
                # Acknowledge this update so it is not delivered again.
                offset = update.get("update_id", 0) + 1
                message = update.get("message") or {}
                chat_id = message.get("chat", {}).get("id")
                text = (message.get("text") or "").strip()
                if not text:
                    continue
                logger.info("TG msg: %s", text)
                lowered = text.lower()
                if lowered.startswith("/ask "):
                    question = text[5:].strip()
                    try:
                        answer = hf_text_generate(question)
                    except Exception as e:
                        answer = f"[HF error] {e}"
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": answer}, timeout=10)
                    except Exception:
                        logger.exception("tg reply failed")
                elif lowered.startswith("/say "):
                    phrase = text[5:].strip()
                    try:
                        audio = tts_gtts_bytes(phrase)
                        files = {"audio": ("reply.mp3", audio, "audio/mpeg")}
                        requests.post(base + "/sendAudio", files=files, data={"chat_id": chat_id}, timeout=30)
                    except Exception:
                        logger.exception("tg say failed")
                elif lowered.startswith("/status"):
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "KC Robot brain running"}, timeout=10)
                    except Exception:
                        pass
                else:
                    try:
                        requests.post(base + "/sendMessage", json={"chat_id": chat_id, "text": "Commands: /ask <q> | /say <text> | /status"}, timeout=10)
                    except Exception:
                        pass
        except Exception:
            logger.exception("telegram poller exception")
            # NOTE(review): source indentation was lost; this back-off is assumed
            # to belong to the except branch (retry after 3s) — confirm intent.
            time.sleep(3)
# Start the Telegram poller in the background only when a bot token is present.
if TELEGRAM_TOKEN:
    t = threading.Thread(target=telegram_poller, daemon=True)
    t.start()
# ====== Gradio UI (chat + TTS + STT) ======
# NOTE(review): `.style(...)` and `gr.Audio(source=...)` are gradio 3.x APIs —
# confirm the Space pins gradio<4.
with gr.Blocks(title="KC Robot AI - Cloud Brain V4") as demo:
    gr.Markdown("## 🤖 KC Robot AI — Cloud Brain (Hugging Face Inference)")
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot([], elem_id="chatbot").style(height=480)
            txt = gr.Textbox(lines=2, placeholder="Nhập câu hỏi (VN/EN) hoặc tiếng Anh...", label="Your message")
            send = gr.Button("Gửi")
            with gr.Row():
                temp = gr.Slider(0.0, 1.0, value=0.7, label="Temperature")
                tokens = gr.Slider(16, 1024, value=256, step=16, label="Max tokens")
            model_override = gr.Textbox(label="Override HF model (optional)")
        with gr.Column(scale=1):
            gr.Markdown("### TTS / STT")
            tts_in = gr.Textbox(lines=2, label="Text → TTS")
            tts_btn = gr.Button("Create TTS")
            tts_audio = gr.Audio(label="TTS audio", interactive=False)
            gr.Markdown("Upload audio for STT")
            up = gr.Audio(source="upload", type="filepath", label="Upload audio")
            stt_btn = gr.Button("Transcribe")
            stt_out = gr.Textbox(label="Transcription")

    def chat_fn(message, history, temperature, max_tokens, model_override_val):
        """Handle one chat turn: build the bilingual prompt, call HF text
        generation, append the turn to the chat history, and clear the box."""
        if not message or not message.strip():
            return history or [], ""
        system = "You are KC Robot AI, bilingual (Vietnamese & English). Answer in the same language as the user. Be clear and helpful."
        prompt = f"{system}\n\nUser: {message}\nAssistant:"
        model = model_override_val.strip() if model_override_val else HF_MODEL
        try:
            ans = hf_text_generate(prompt, model=model, max_new_tokens=int(max_tokens), temperature=float(temperature))
        except Exception as e:
            ans = f"[HF error] {e}"
        history = history or []
        # FIX: gr.Chatbot rows are (user_message, bot_message) pairs. The old
        # code appended ("You", message) and ("Bot", ans), which rendered the
        # literal strings "You"/"Bot" as chat messages.
        history.append((message, ans))
        push_display(f"YOU: {message[:40]}")
        push_display(f"BOT: {ans[:40]}")
        return history, ""

    def tts_fn(text, model_override_val):
        """Synthesize speech for *text* with gTTS and return a playable path.

        Raises gr.Error (shown in the UI) when synthesis fails.
        """
        if not text or not text.strip():
            return None
        try:
            audio = tts_gtts_bytes(text)
            # FIX: gr.Audio cannot play a (bytes, mime) tuple; persist the MP3
            # to a temp file and hand back its path instead. delete=False so
            # the file survives until gradio serves it (small leak per call).
            import tempfile
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as fp:
                fp.write(audio)
                return fp.name
        except Exception as e:
            raise gr.Error(f"TTS failed: {e}")

    def stt_fn(local_path, model_override_val):
        """Read the uploaded audio file and transcribe it via the HF STT model.

        Raises gr.Error (shown in the UI) when transcription fails.
        """
        if not local_path:
            return ""
        with open(local_path, "rb") as f:
            b = f.read()
        try:
            text = hf_stt_from_bytes(b)
        except Exception as e:
            raise gr.Error(f"STT failed: {e}")
        push_display(f"Voice: {text[:40]}")
        return text

    send.click(chat_fn, inputs=[txt, chatbot, temp, tokens, model_override], outputs=[chatbot, txt])
    tts_btn.click(tts_fn, inputs=[tts_in, model_override], outputs=[tts_audio])
    stt_btn.click(stt_fn, inputs=[up, model_override], outputs=[stt_out])
# ====== Expose REST endpoints under same server (Gradio uses FastAPI) ======
# NOTE(review): in some gradio 3.x versions `demo.app` is only created inside
# `demo.launch()`; confirm this attribute exists at import time for the pinned
# gradio version (otherwise `gr.mount_gradio_app` is the supported route).
app = demo.app # FastAPI app
from fastapi import Request, UploadFile, File
from starlette.responses import JSONResponse, Response
@app.post("/api/ask")
async def api_ask(request: Request):
    """POST JSON {"text": ..., "lang": "vi"|"en"|"auto"} -> {"answer": ...}.

    Returns 400 on malformed/non-object JSON or missing text, 500 when the
    HF inference call fails.
    """
    try:
        j = await request.json()
    except Exception:
        return JSONResponse({"error":"invalid json"}, status_code=400)
    # FIX: a valid-JSON body that is not an object (e.g. a list) previously
    # crashed on .get with an unhandled 500; reject it as a 400 instead.
    if not isinstance(j, dict):
        return JSONResponse({"error":"invalid json"}, status_code=400)
    text = (j.get("text","") or "").strip()
    lang = (j.get("lang","auto") or "auto").strip().lower()
    if not text:
        return JSONResponse({"error":"no text"}, status_code=400)
    # Steer the model's answer language via the prompt prefix.
    if lang == "vi":
        prompt = "Bạn là trợ lý thông minh. Trả lời bằng tiếng Việt, rõ ràng:\n\n" + text
    elif lang == "en":
        prompt = "You are a helpful assistant. Answer in English:\n\n" + text
    else:
        prompt = "You are bilingual assistant (Vietnamese/English). Answer in the language of the question.\n\n" + text
    try:
        ans = hf_text_generate(prompt)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    CONVERSATION.append((text, ans))
    push_display(f"YOU: {text[:40]}")
    push_display(f"BOT: {ans[:40]}")
    return {"answer": ans}
@app.post("/api/tts")
async def api_tts(request: Request):
    """POST JSON {"text": ...} -> raw MP3 bytes (audio/mpeg).

    Returns 400 on malformed/non-object JSON or missing text, 500 when
    synthesis fails.
    """
    try:
        j = await request.json()
    except Exception:
        return JSONResponse({"error":"invalid json"}, status_code=400)
    # FIX: a valid-JSON body that is not an object (e.g. a list) previously
    # crashed on .get with an unhandled 500; reject it as a 400 instead.
    if not isinstance(j, dict):
        return JSONResponse({"error":"invalid json"}, status_code=400)
    text = (j.get("text","") or "").strip()
    if not text:
        return JSONResponse({"error":"no text"}, status_code=400)
    # use gTTS (free)
    try:
        mp3 = tts_gtts_bytes(text)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    return Response(content=mp3, media_type="audio/mpeg")
@app.post("/api/stt")
async def api_stt(file: UploadFile = File(...)):
    """POST a multipart audio file -> {"text": transcription}.

    Returns 400 on unreadable/empty uploads, 500 when the HF STT call fails.
    """
    try:
        audio_bytes = await file.read()
    except Exception:
        return JSONResponse({"error":"file read error"}, status_code=400)
    if not audio_bytes:
        return JSONResponse({"error":"no audio content"}, status_code=400)
    try:
        text = hf_stt_from_bytes(audio_bytes)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
    push_display(f"Voice: {text[:40]}")
    # Record the voice turn with an empty answer slot.
    CONVERSATION.append((f"[voice] {text}", ""))
    return {"text": text}
@app.post("/api/presence")
async def api_presence(request: Request):
    """POST JSON {"note": ...} from the robot's presence sensor.

    Logs the detection, records a greeting, and (best-effort) notifies the
    configured Telegram chat.  Returns {"greeting": ...}.
    """
    try:
        j = await request.json()
    except Exception:
        return JSONResponse({"error":"invalid json"}, status_code=400)
    # FIX: a valid-JSON body that is not an object (e.g. a list) previously
    # crashed on .get with an unhandled 500; reject it as a 400 instead.
    if not isinstance(j, dict):
        return JSONResponse({"error":"invalid json"}, status_code=400)
    note = (j.get("note","Có người phía trước") or "").strip()
    greeting = f"Xin chào! {note}"
    push_display(f"RADAR: {note[:40]}")
    CONVERSATION.append(("__presence__", greeting))
    if TELEGRAM_TOKEN and TELEGRAM_CHATID:
        try:
            send_telegram(f"⚠️ Robot: Phát hiện người - {note}")
        except Exception:
            logger.exception("telegram notify failed")
    return {"greeting": greeting}
@app.get("/api/display")
async def api_display():
    """GET -> snapshot of the rolling display lines and conversation length."""
    # Return a copy so the caller never aliases the live buffer.
    return {"lines": list(DISPLAY_BUFFER), "conv_len": len(CONVERSATION)}
# ====== Launch app ======
if __name__ == "__main__":
    # Bind all interfaces; honor the PORT env var (HF Spaces default is 7860).
    port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=port)