#!/usr/bin/env python3
"""OpenAI-compatible API server with streaming for Qwen3-0.6B."""
import glob, json, os, time, uuid
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from llama_cpp import Llama
# ── locate model ────────────────────────────────────────────────
MODEL_DIR = os.environ.get("MODEL_DIR", "/home/user/models")
gguf_files = glob.glob(os.path.join(MODEL_DIR, "**", "*.gguf"), recursive=True)
if not gguf_files:
    raise RuntimeError(f"No .gguf model found in {MODEL_DIR}")
MODEL_PATH = gguf_files[0]
MODEL_ID = "qwen3-0.6b"
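# Note: the first .gguf found under MODEL_DIR is loaded; MODEL_ID is only the
# name reported by "/" and /v1/models, the "model" field in requests is not
# checked.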
# ── lifespan (load model once) ──────────────────────────────────
llm: Llama | None = None


@asynccontextmanager
async def lifespan(application: FastAPI):
    global llm
    print(f"Loading model: {MODEL_PATH}")
    llm = Llama(
        model_path=MODEL_PATH,
        n_ctx=2048,
        n_threads=int(os.environ.get("N_THREADS", 2)),
        chat_format="chatml",  # Qwen3 uses ChatML
        verbose=False,
    )
    print("Model loaded ✓")
    yield
    llm = None  # release the model on shutdown
app = FastAPI(title="Qwen3-0.6B API", lifespan=lifespan)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── helpers ─────────────────────────────────────────────────────
def _id():
    return f"chatcmpl-{uuid.uuid4().hex[:12]}"


def _ts():
    return int(time.time())
# ── routes ──────────────────────────────────────────────────────
@app.get("/")
async def health():
return {"status": "ok", "model": MODEL_ID}
@app.get("/v1/models")
async def list_models():
return {
"object": "list",
"data": [
{
"id": MODEL_ID,
"object": "model",
"created": _ts(),
"owned_by": "qwen",
}
],
}
# ── /v1/chat/completions ───────────────────────────────────────
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
body = await request.json()
messages = body.get("messages", [])
stream = body.get("stream", False)
temperature = body.get("temperature", 0.7)
max_tokens = body.get("max_tokens", 512)
top_p = body.get("top_p", 0.9)
top_k = body.get("top_k", 40)
params = dict(
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
top_k=top_k,
stream=stream,
)
if stream:
return StreamingResponse(
_stream_chat(params),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
result = llm.create_chat_completion(**params)
return JSONResponse(content=result)
async def _stream_chat(params: dict):
try:
for chunk in llm.create_chat_completion(**params):
yield f"data: {json.dumps(chunk)}\n\n"
except Exception as e:
err = {"error": {"message": str(e), "type": "server_error"}}
yield f"data: {json.dumps(err)}\n\n"
yield "data: [DONE]\n\n"
# ── /v1/completions (text completion) ──────────────────────────
@app.post("/v1/completions")
async def completions(request: Request):
body = await request.json()
params = dict(
prompt=body.get("prompt", ""),
max_tokens=body.get("max_tokens", 512),
temperature=body.get("temperature", 0.7),
top_p=body.get("top_p", 0.9),
stream=body.get("stream", False),
)
if params["stream"]:
return StreamingResponse(
_stream_completion(params),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
return JSONResponse(content=llm.create_completion(**params))
async def _stream_completion(params: dict):
try:
for chunk in llm.create_completion(**params):
yield f"data: {json.dumps(chunk)}\n\n"
except Exception as e:
err = {"error": {"message": str(e), "type": "server_error"}}
yield f"data: {json.dumps(err)}\n\n"
yield "data: [DONE]\n\n"
# ── main ────────────────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)
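# Minimal client sketch (assumes the openai>=1.0 Python package and a server
# running locally on port 7860; the api_key value is arbitrary because this
# server never checks it):
#
#   from openai import OpenAI
#   client = OpenAI(base_url="http://localhost:7860/v1", api_key="unused")
#   stream = client.chat.completions.create(
#       model="qwen3-0.6b",
#       messages=[{"role": "user", "content": "Say hello"}],
#       stream=True,
#   )
#   for chunk in stream:
#       print(chunk.choices[0].delta.content or "", end="")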