# Loomis Green
# Optimize Qwen with streaming and auto device map
# 3425710
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
from pydantic import BaseModel
import torch
import os
import json
# Define Model details
# We use the 1.5B model because it runs fast on CPU and installs instantly (no compilation needed).
MODEL_ID = "Qwen/Qwen2.5-Coder-1.5B-Instruct"

# Load tokenizer and model once at import time; downloads weights on first run.
print(f"Loading {MODEL_ID}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype="auto",  # Changed to "auto" for performance optimization (picks the checkpoint's native dtype)
    device_map="auto"    # Let accelerate place the model on the best available device(s)
)
print("Model Loaded Successfully!")
app = FastAPI()

# Global Conversation History (Simple Server-Side Memory).
# NOTE(review): one shared history for ALL clients — presumably this is a
# single-user local app; confirm before exposing it to multiple users.
DEFAULT_SYSTEM_PROMPT = {
    "role": "system",
    "content": (
        "You are Loomyloo, a smart and helpful AI assistant. "
        "You are chatting with a user named Loomis (unless they tell you otherwise). "
        "Your name is Loomyloo. The user's name is Loomis. "
        "Never confuse your name with the user's name. "
        "You are running on the fast Qwen2.5-Coder-1.5B-Instruct model. "
        "Keep your answers concise, friendly, and helpful."
    )
}
conversation_history = [DEFAULT_SYSTEM_PROMPT]

# Enable CORS.
# NOTE(review): allow_origins=["*"] with allow_credentials=True is wide open —
# fine for local development, tighten before any public deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Reset Memory Route
@app.get('/reset')
def reset_memory():
    """Drop all remembered turns, leaving only the default system prompt."""
    global conversation_history
    conversation_history = [DEFAULT_SYSTEM_PROMPT]
    print("Memory Reset!")
    return {"status": "Memory cleared"}
# Streaming Route (Optimized)
class PromptRequest(BaseModel):
    """Request body for POST /stream: the user's chat message."""
    # Raw user prompt text to append to the conversation.
    prompt: str
@app.post('/stream')
def stream(request: PromptRequest):
    """Stream a chat completion token-by-token as newline-delimited JSON.

    The user message is appended to the shared server-side history, the
    whole history is rendered through the tokenizer's chat template, and
    generation runs on a background thread while tokens are yielded to the
    client as soon as they are produced.
    """
    prompt = request.prompt
    global conversation_history

    # 1. Add User Message to History
    conversation_history.append({"role": "user", "content": prompt})

    # 2. Prune History (Keep System Prompt + Last 10 exchanges, i.e. the
    #    20 most recent messages) so the prompt length stays bounded.
    if len(conversation_history) > 21:
        conversation_history = [DEFAULT_SYSTEM_PROMPT] + conversation_history[-20:]

    # 3. Format inputs using the tokenizer's chat template
    text = tokenizer.apply_chat_template(
        conversation_history,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

    # 4. Setup Streamer.
    # FIX: a timeout is required — without it, if model.generate() raises in
    # the worker thread the streamer is never finalized and iterating it
    # blocks this request forever. With a timeout the iterator raises
    # queue.Empty instead, ending the stream.
    streamer = TextIteratorStreamer(
        tokenizer, skip_prompt=True, skip_special_tokens=True, timeout=120.0
    )
    generation_kwargs = dict(
        model_inputs,
        streamer=streamer,
        max_new_tokens=512,
        temperature=0.7,
        top_p=0.9,
        do_sample=True
    )

    # 5. Run Generation in a separate thread so we can consume the streamer here.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    def stream_generator():
        accumulated_text = ""
        try:
            for new_text in streamer:
                accumulated_text += new_text
                # OPTIMIZATION: Send only the new token to save bandwidth
                yield json.dumps({"token": new_text}) + "\n"
        finally:
            # FIX: always reap the worker thread, and record whatever was
            # generated even if the client disconnected mid-stream — the
            # original appended only after a fully consumed stream, silently
            # losing the assistant turn on early disconnect.
            thread.join()
            conversation_history.append(
                {"role": "assistant", "content": accumulated_text}
            )

    return StreamingResponse(stream_generator(), media_type="application/x-ndjson")
# Standard API Route (Non-streaming fallback)
@app.get('/ask')
def ask(prompt: str):
    """Generate a complete reply (no streaming) and return it as JSON."""
    global conversation_history

    # Record the user turn in the shared server-side memory.
    conversation_history.append({"role": "user", "content": prompt})

    # Bound the context: keep the system prompt plus the 20 most recent
    # messages (about 10 user/assistant exchanges).
    if len(conversation_history) > 21:
        conversation_history = [DEFAULT_SYSTEM_PROMPT] + conversation_history[-20:]
    print(f"Current History Length: {len(conversation_history)}")

    # Render the running conversation through the model's chat template.
    chat_text = tokenizer.apply_chat_template(
        conversation_history,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = tokenizer([chat_text], return_tensors="pt").to(model.device)

    # Sample up to 512 new tokens.
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=512,
        temperature=0.7,
        top_p=0.9,
        do_sample=True
    )

    # Strip the echoed prompt tokens so only the newly generated tail remains.
    completions = [
        full[len(given):]
        for given, full in zip(model_inputs.input_ids, generated_ids)
    ]
    response_text = tokenizer.batch_decode(completions, skip_special_tokens=True)[0]

    # Record the assistant turn, then hand the text back to the caller.
    conversation_history.append({"role": "assistant", "content": response_text})
    return {"generated_text": response_text}
# Serve Static Files from the local ./static directory under /static.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Serve Index: the chat front-end lives at static/index.html.
@app.get("/")
async def read_index():
    return FileResponse('static/index.html')