import asyncio
import concurrent.futures
import threading

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from huggingface_hub import hf_hub_download
from llama_cpp import Llama, LlamaRAMCache
from pydantic import BaseModel
|
|
| |
| |
| |
# Hugging Face repository and GGUF file name passed to hf_hub_download().
MODEL_REPO = "Qwen/Qwen2.5-3B-Instruct-GGUF"
MODEL_FILE = "qwen2.5-3b-instruct-q4_k_m.gguf"

# System turn in <|im_start|>/<|im_end|> chat markup; format_prompt() places
# it in front of every user request.
SYSTEM_PROMPT = "".join(
    (
        "<|im_start|>system\n",
        "You are a highly capable technical assistant.",
        "<|im_end|>\n",
    )
)
|
|
| |
| |
| |
# ---------------------------------------------------------------------------
# Model download and load. Runs once, at import time.
# ---------------------------------------------------------------------------
print("Downloading model...")
model_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE)

# Count the system-prompt tokens with a small throw-away instance.
# special=True makes "<|im_start|>" / "<|im_end|>" tokenize as single control
# tokens rather than as plain text, so the count reflects the tokens the
# chat markup actually occupies.
print("Tokenizing system prompt for n_keep...")
_tmp = Llama(model_path=model_path, n_ctx=64, verbose=False)
_system_token_count = len(_tmp.tokenize(SYSTEM_PROMPT.encode(), special=True))
del _tmp
print(f"System prompt is {_system_token_count} tokens - pinning in KV cache.")

print("Loading model into memory...")
llm = Llama(
    model_path=model_path,
    n_threads=8,
    n_ctx=16384,
    # NOTE(review): confirm the installed llama-cpp-python honours n_keep on
    # the constructor; if Llama() only swallows it as an extra keyword
    # argument, the system prompt is not actually pinned.
    n_keep=_system_token_count,
    n_batch=1024,
    n_ubatch=512,
    use_mmap=True,
    use_mlock=True,
    verbose=False,
)

# Attach an in-RAM cache with an 8 GiB capacity to the model.
cache = LlamaRAMCache(capacity_bytes=8 * 1024**3)
llm.set_cache(cache)
print("RAM cache initialized (8 GB).")
|
|
| |
| |
# Executor with a single worker thread: the request handlers submit every
# call into `llm` here, so model calls run one at a time and off the event
# loop thread.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

# The title names the model size that MODEL_FILE actually loads (3B).
app = FastAPI(title="Qwen-2.5 3B Local CPU API")
|
|
|
|
class QueryRequest(BaseModel):
    """JSON body accepted by the /generate and /stream endpoints."""

    # Raw user text; format_prompt() wraps it in chat markup before inference.
    prompt: str
    # Forwarded to the model call as `max_tokens`.
    max_tokens: int = 512
    # Forwarded to the model call as `temperature`.
    temperature: float = 0.7
|
|
|
|
def format_prompt(user_text: str) -> str:
    """Build the full prompt string for one user message.

    The result is SYSTEM_PROMPT, followed by *user_text* wrapped as a user
    turn, followed by an opened assistant turn for the model to complete.
    """
    user_turn = f"<|im_start|>user\n{user_text}<|im_end|>\n"
    assistant_header = "<|im_start|>assistant\n"
    return "".join((SYSTEM_PROMPT, user_turn, assistant_header))
|
|
|
|
| |
| |
| |
@app.on_event("startup")
async def warmup():
    """Run one 4-token completion on the inference executor at startup.

    The call goes through ``_executor`` like every other model call, so the
    event loop is not blocked while it runs.
    """
    print("Warming up model...")

    def _warmup_call():
        return llm(
            format_prompt("Explain what recursion is in one sentence."),
            max_tokens=4,
            stream=False,
        )

    # get_running_loop() is the recommended accessor inside a coroutine; it
    # always returns the loop that is executing this handler.
    await asyncio.get_running_loop().run_in_executor(_executor, _warmup_call)
    print("Warmup complete. Server is ready.")
|
|
|
|
| |
| |
| |
@app.get("/")
def root():
    """Return a static status payload together with the configured model file."""
    return dict(status="ok", model=MODEL_FILE)
|
|
|
|
| |
| |
| |
@app.post("/generate")
async def generate_full(request: QueryRequest):
    """Generate a complete (non-streamed) completion for ``request.prompt``.

    The model call runs on ``_executor`` so the event loop stays responsive.

    Returns:
        ``{"response": <text of the first choice>}``.

    Raises:
        Any exception raised by the model call propagates to the caller of
        this handler.
    """

    def _complete():
        return llm(
            format_prompt(request.prompt),
            max_tokens=request.max_tokens,
            stream=False,
            temperature=request.temperature,
            # Stop as soon as the model starts or closes another chat turn.
            stop=["<|im_end|>", "<|im_start|>"],
        )

    # get_running_loop() is the recommended accessor inside a coroutine.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(_executor, _complete)
    return {"response": response["choices"][0]["text"]}
|
|
|
|
| |
| |
| |
def _format_sse_data(text: str) -> str:
    """Frame *text* as one Server-Sent Event.

    A line break inside a ``data:`` field would end the field early, so each
    line of *text* is sent as its own ``data:`` line; an SSE client joins
    them back together with "\n". Text without line breaks is framed as
    ``data: <text>`` followed by a blank line.
    """
    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


@app.post("/stream")
async def generate_stream(request: QueryRequest):
    """Stream a completion for ``request.prompt`` as Server-Sent Events.

    Inference runs on ``_executor``; each non-empty text chunk is handed to
    the event loop through a one-slot queue and sent as a ``data:`` event.
    The stream ends with ``data: [DONE]``. If inference raises, an
    ``event: error`` is sent before the terminator.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    formatted_prompt = format_prompt(request.prompt)
    loop = asyncio.get_running_loop()

    # One slot only: the worker can be at most one chunk ahead of the client.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    # Set when the response generator exits for any reason, including a
    # client disconnect, so the worker thread stops instead of blocking.
    cancelled = threading.Event()

    def _emit(item: object) -> bool:
        """Put *item* on the queue from the worker thread.

        Returns False (without blocking indefinitely) once the consumer is
        gone, so the single executor thread is always released.
        """
        if cancelled.is_set():
            return False
        fut = asyncio.run_coroutine_threadsafe(queue.put(item), loop)
        while True:
            try:
                fut.result(timeout=0.5)
                return True
            except concurrent.futures.TimeoutError:
                # Queue still full: keep waiting unless nobody is reading.
                if cancelled.is_set():
                    fut.cancel()
                    return False
            except concurrent.futures.CancelledError:
                return False

    def run_inference():
        # The client may have left while this job waited for the executor.
        if cancelled.is_set():
            return
        try:
            stream = llm(
                formatted_prompt,
                max_tokens=request.max_tokens,
                stream=True,
                temperature=request.temperature,
                stop=["<|im_end|>", "<|im_start|>"],
            )
            for chunk in stream:
                text = chunk["choices"][0]["text"]
                if text and not _emit(text):
                    break
        except Exception as exc:
            # Surface the failure instead of ending the stream silently.
            print(f"Streaming inference failed: {exc!r}")
            _emit(exc)
        finally:
            # None is the end-of-stream sentinel for stream_generator().
            _emit(None)

    async def stream_generator():
        # SSE comment line (leading ":") padded with 2048 spaces; clients
        # ignore it. Presumably sent to push past intermediary buffering.
        yield f": {' ' * 2048}\n\n"

        # Start inference only after the first bytes have been handed off.
        loop.run_in_executor(_executor, run_inference)

        try:
            while True:
                item = await queue.get()
                if item is None:
                    break
                if isinstance(item, Exception):
                    yield "event: error\ndata: inference failed\n\n"
                    continue
                yield _format_sse_data(item)
                # Give the event loop a turn between chunks.
                await asyncio.sleep(0)

            yield "data: [DONE]\n\n"
        finally:
            cancelled.set()

    return StreamingResponse(
        stream_generator(),
        media_type="text/event-stream",
        headers={
            "X-Accel-Buffering": "no",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "text/event-stream; charset=utf-8",
        },
    )
|
|
|
|
| |
| |
| |
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )