testbotai / app.py
dedlepexa's picture
Update app.py
03373ef verified
Raw
History Blame Contribute Delete
4.04 kB
import logging
import threading
import time
from collections import OrderedDict

import torch
import uvicorn
from deep_translator import GoogleTranslator
from fastapi import FastAPI
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
# FastAPI application object; the route handlers in this file are registered on it.
app = FastAPI()
# Model: Hugging Face checkpoint id; tokenizer and weights are loaded once at import time.
model_name = "Qwen/Qwen2.5-1.5B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Weights are requested in float16; device placement is delegated to the
# library through device_map="auto".
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto"
)
# ask() drops the oldest entry of `db` once it holds more than this many entries.
MAX_HISTORY = 40
# Number of background worker threads started at import time.
NUM_WORKERS = 3
# Maps message text -> {"status": "pending" | "done", "reply": str}.
# An OrderedDict so that the oldest entry can be evicted with popitem(last=False).
db = OrderedDict()
# Message texts waiting for generation: ask() appends, worker() pops from the front.
queue = []
# Pydantic schema holding a single chat message.
# NOTE(review): none of the routes visible in this file reference it — confirm
# whether it is still needed.
class Message(BaseModel):
    # Text of the message.
    message: str
# Hard-wrap text into fixed-width lines.
def split_text(text, max_len=100):
    """Break ``text`` into consecutive chunks joined by newlines.

    The text is cut every ``max_len`` characters regardless of word
    boundaries; newlines already present in ``text`` are not treated
    specially and count as ordinary characters.

    Args:
        text: The string to wrap.
        max_len: Maximum number of characters per output line (default 100).

    Returns:
        The chunks joined with ``"\\n"``; an empty string for empty input.

    Raises:
        ValueError: If ``max_len`` is less than 1.
    """
    # A step of 0 would make range() fail with an unrelated-looking message and
    # a negative step would silently yield no chunks at all, so reject both.
    if max_len < 1:
        raise ValueError(f"max_len must be a positive integer, got {max_len!r}")
    return "\n".join(
        text[start:start + max_len] for start in range(0, len(text), max_len)
    )
# Strip Qwen chat-template residue from generated text.
def clean_output(text: str) -> str:
    """Remove chat-template artifacts from model output.

    Three kinds of residue are removed:

    * the special tokens ``<|im_start|>`` and ``<|im_end|>``, anywhere;
    * the phrase ``You are Qwen``, anywhere;
    * lines consisting solely of a role label (``system``, ``user`` or
      ``assistant``, ignoring surrounding whitespace).

    Role labels are dropped only as whole lines so that ordinary words in the
    reply (e.g. "user-friendly", "operating system") are left intact.
    NOTE(review): this presumes the labels appear on their own line once the
    special tokens are gone — confirm against the tokenizer's chat template.

    Args:
        text: Raw decoded model output.

    Returns:
        The cleaned text with leading and trailing whitespace removed.
    """
    markers = ("<|im_start|>", "<|im_end|>", "You are Qwen")
    role_labels = {"system", "user", "assistant"}

    # Remove the special tokens first so that "<|im_start|>assistant" becomes
    # a bare "assistant" line and is caught by the role-label filter below.
    for marker in markers:
        text = text.replace(marker, "")

    kept_lines = [
        line for line in text.split("\n") if line.strip() not in role_labels
    ]
    return "\n".join(kept_lines).strip()
# Generation: run the model for one message and publish the result in `db`.
def generate_ai_stream(message: str):
    """Generate a reply for ``message`` and stream progress into ``db``.

    Builds a chat prompt from a fixed Russian system instruction plus the
    user message, runs ``model.generate`` on a helper thread and consumes the
    decoded text from a ``TextIteratorStreamer``. While text arrives, the
    ``"reply"`` field of ``db[message]`` (if that entry still exists) is
    overwritten with the wrapped partial text. When generation ends, the text
    is cleaned with ``clean_output``, passed through ``GoogleTranslator``
    (target ``ru``, falling back to the untranslated text on failure) and
    stored together with status ``"done"``.

    Args:
        message: The user prompt; also the key of the entry in ``db``.

    Returns:
        The final wrapped reply with the ``" full generated"`` marker appended.
    """
    messages = [
        {
            "role": "system",
            "content": (
                "Ты умный и точный ассистент. "
                "Отвечай логично,кратко и понятно. "
                "отвечай ВСЕГДА на русском."
            ),
        },
        {
            "role": "user",
            "content": message,
        },
    ]
    prompt = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    # skip_prompt=True: by default the streamer also yields the decoded
    # prompt, which would echo the system instruction and the user's own
    # message at the start of every reply.
    streamer = TextIteratorStreamer(
        tokenizer,
        skip_prompt=True,
        skip_special_tokens=True,
    )
    gen_kwargs = dict(
        **inputs,
        max_new_tokens=400,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        streamer=streamer,
        eos_token_id=tokenizer.eos_token_id,
    )

    # generate() blocks until completion, so it runs on its own thread while
    # this thread consumes the streamer.
    thread = threading.Thread(target=model.generate, kwargs=gen_kwargs)
    thread.start()

    partial = ""
    # Streaming: publish the text produced so far after every chunk.
    for text in streamer:
        partial += text
        # Single lookup instead of "in db" + db[message]: ask() may evict the
        # entry from another thread between the two steps, which would raise
        # KeyError here.
        entry = db.get(message)
        if entry is not None:
            entry["reply"] = split_text(partial)
    thread.join()

    # Cleanup of template residue.
    raw = clean_output(partial)

    # Translation is best-effort: on any failure keep the untranslated text.
    try:
        translated = GoogleTranslator(
            source='auto',
            target='ru',
        ).translate(raw)
    except Exception:
        translated = raw

    final_text = split_text(translated) + " full generated"

    entry = db.get(message)
    if entry is not None:
        entry["reply"] = final_text
        entry["status"] = "done"
    return final_text
# Background worker: drains the shared queue and generates replies.
def worker():
    """Process queued messages forever.

    Pops message texts from the front of ``queue`` and calls
    ``generate_ai_stream`` for each one whose ``db`` entry is not already
    marked ``"done"``. Sleeps 10 ms whenever the queue is empty. Never
    returns; intended to run on a daemon thread.
    """
    while True:
        try:
            # Pop first and handle the empty case afterwards: with several
            # workers, a separate "if queue" check can pass and another worker
            # can still take the last item before this pop runs.
            message = queue.pop(0)
        except IndexError:
            time.sleep(0.01)
            continue

        entry = db.get(message)
        if entry is not None and entry["status"] == "done":
            continue

        try:
            generate_ai_stream(message)
        except Exception:
            # An uncaught exception would end this thread and permanently
            # shrink the worker pool; log it and keep serving the queue.
            # The entry's status is left as it was.
            logging.getLogger(__name__).exception(
                "generation failed for a queued message"
            )
# Launch the pool of daemon worker threads when the module is imported.
for _ in range(NUM_WORKERS):
    _worker_thread = threading.Thread(target=worker, daemon=True)
    _worker_thread.start()
# Status endpoint: returns a fixed plain-text banner naming the served model.
@app.get("/")
async def root():
    banner = "AI server running (Qwen2.5 1.5B Instruct)"
    return PlainTextResponse(banner)
# Submit a message for generation.
# Responds "cached" when a finished reply already exists for this exact text,
# otherwise "accepted". A message not seen before is registered as pending and
# queued; one that is already pending is not queued again.
@app.get("/ask")
async def ask(message: str):
    entry = db.get(message)
    if entry is None:
        db[message] = {
            "status": "pending",
            "reply": "",
        }
        queue.append(message)
    elif entry["status"] == "done":
        return PlainTextResponse("cached")

    # Keep the history bounded by dropping the oldest entry.
    if len(db) > MAX_HISTORY:
        db.popitem(last=False)
    return PlainTextResponse("accepted")
# Poll for the reply to a previously submitted message.
# Responds "not found" for unknown messages, "thinking..." while a pending
# entry has no text yet, and otherwise whatever reply text is currently stored.
@app.get("/get")
async def get(message: str):
    data = db.get(message)
    if data is None:
        return PlainTextResponse("not found")

    still_pending = data["status"] == "pending"
    reply = data["reply"]
    if still_pending and not reply:
        reply = "thinking..."
    return PlainTextResponse(reply)
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)