"""
=============================================================================
Transformers + FastAPI — OpenAI-Compatible Server

Base   : unsloth/qwen2.5-0.5b-unsloth-bnb-4bit
Adapter: MuhammadNoman7600/mermaid (LoRA r=16 α=16)

CPU-ONLY fallback • TOOL CALLING • STREAMING • Port 7860

NOTE: the checkpoint actually loaded is ``BASE_MODEL_NAME`` below; the
"Base" line above presumably names the checkpoint the adapter was trained
on — confirm against the adapter's own config.
=============================================================================
"""

import json
import os
import re
import time
import uuid
from threading import Lock, Thread
from typing import Any, Optional, Union

import torch
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from peft import PeftModel
from pydantic import BaseModel
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TextIteratorStreamer,
)

# ━━━━━━━━━━━━━━━━━━━━━━━━━━ CONFIG ━━━━━━━━━━━━━━━━━━━━━━━━━━━━
BASE_MODEL_NAME = "Qwen/Qwen2.5-0.5B-Instruct"  # CPU-safe (float32); unsloth 4-bit needs CUDA
ADAPTER_NAME = "MuhammadNoman7600/mermaid"
DISPLAY_MODEL_NAME = "MuhammadNoman7600/mermaid"
HOST = "0.0.0.0"
PORT = 7860
MAX_NEW_TOKENS = 32768
# Used when a request explicitly sends ``"temperature": null``.
DEFAULT_TEMPERATURE = 0.7
# Reported by /v1/models so the "created" stamp is stable across calls.
_STARTED_AT = int(time.time())
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

app = FastAPI(
    title="Mermaid Fine-Tuned Qwen2.5-0.5B — OpenAI-Compatible API",
    description="LoRA adapter MuhammadNoman7600/mermaid on Qwen2.5-0.5B with tool calling",
    version="2.0.0",
)

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests. Harmless while this API uses no cookies or
# HTTP auth, but tighten allow_origins before adding any.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ━━━━━━━━━━━━━━━━━━━━━━━ Pydantic Models ━━━━━━━━━━━━━━━━━━━━━━
class FunctionDef(BaseModel):
    """Function signature advertised to the model as a callable tool."""

    name: str
    description: Optional[str] = ""
    parameters: Optional[dict] = None


class ToolDef(BaseModel):
    """One entry of the request's ``tools`` array."""

    type: str = "function"
    function: FunctionDef


class FunctionCallModel(BaseModel):
    """Name plus JSON-encoded arguments of a tool call."""

    name: str
    arguments: str


class ToolCallObj(BaseModel):
    """A tool call attached to an assistant message."""

    id: str
    type: str = "function"
    function: FunctionCallModel


class ChatMessage(BaseModel):
    """One message of a chat-completions conversation."""

    role: str
    content: Optional[str] = None
    tool_calls: Optional[list[ToolCallObj]] = None
    tool_call_id: Optional[str] = None
    name: Optional[str] = None


class ChatCompletionRequest(BaseModel):
    """Request body of POST /v1/chat/completions."""

    model: str = DISPLAY_MODEL_NAME
    messages: list[ChatMessage]
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = 1024
    stream: Optional[bool] = False
    stop: Optional[Union[str, list[str]]] = None
    frequency_penalty: Optional[float] = 0.0
    presence_penalty: Optional[float] = 0.0
    repetition_penalty: Optional[float] = 1.0
    n: Optional[int] = 1
    tools: Optional[list[ToolDef]] = None
    tool_choice: Optional[Union[str, dict]] = None


class CompletionRequest(BaseModel):
    """Request body of POST /v1/completions."""

    model: str = DISPLAY_MODEL_NAME
    prompt: Union[str, list[str]] = ""
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = 512
    stream: Optional[bool] = False
    stop: Optional[Union[str, list[str]]] = None
    frequency_penalty: Optional[float] = 0.0
    presence_penalty: Optional[float] = 0.0
    repetition_penalty: Optional[float] = 1.0
    n: Optional[int] = 1


# ━━━━━━━━━━━━━━━━━━━ Model Loading ━━━━━━━━━━━━━━━━━━━━━━━━━━━━
tokenizer: Any = None
model: Any = None
generate_lock = Lock()
stop_token_ids: list[int] = []
# Serialises load_model() so concurrent first requests load the weights once.
_load_lock = Lock()


def load_model():
    """Load tokenizer, base model and LoRA adapter into the module globals.

    Idempotent and safe to call from several threads: the first caller does
    the work, later callers return immediately. ``model`` is assigned last so
    that any thread observing a non-None ``model`` also sees the matching
    ``tokenizer`` and ``stop_token_ids``.
    """
    global tokenizer, model, stop_token_ids

    if model is not None:
        return

    with _load_lock:
        # Re-check: another thread may have finished loading while we waited.
        if model is not None:
            return

        print(f"\n🚀 Base model : {BASE_MODEL_NAME}")
        print(f"🔌 LoRA adapter: {ADAPTER_NAME}")
        print(f" HF_HOME = {os.environ.get('HF_HOME', 'default')}\n")

        # ── Tokenizer ───────────────────────────────────────────────
        # Adapter repos rarely ship a tokenizer; fall back to base.
        try:
            tok = AutoTokenizer.from_pretrained(
                ADAPTER_NAME, use_fast=True, trust_remote_code=True
            )
            print(" Tokenizer loaded from adapter repo.")
        except Exception:
            # Deliberately broad: any failure here just means "use the base".
            tok = AutoTokenizer.from_pretrained(
                BASE_MODEL_NAME, use_fast=True, trust_remote_code=True
            )
            print(" Tokenizer loaded from base model repo.")

        if tok.pad_token is None:
            tok.pad_token = tok.eos_token

        # ── Base model ──────────────────────────────────────────────
        # Load in 4-bit if CUDA is available (matches training setup),
        # otherwise fall back to float32 on CPU.
        use_4bit = torch.cuda.is_available()
        if use_4bit:
            print(" CUDA detected — loading in 4-bit (bitsandbytes nf4).")
            bnb_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_use_double_quant=True,
                bnb_4bit_compute_dtype=torch.float16,
            )
            base = AutoModelForCausalLM.from_pretrained(
                BASE_MODEL_NAME,
                quantization_config=bnb_config,
                device_map="auto",
                trust_remote_code=True,
            )
        else:
            print(" No CUDA — loading base model in float32 on CPU.")
            # Checkpoints such as unsloth/qwen2.5-0.5b-unsloth-bnb-4bit have a
            # bnb-4bit quantization_config baked into their model config. On
            # CPU we MUST strip it so that transformers does not attempt to
            # invoke bitsandbytes (which requires CUDA).
            cfg = AutoConfig.from_pretrained(BASE_MODEL_NAME, trust_remote_code=True)
            if hasattr(cfg, "quantization_config"):
                del cfg.quantization_config
            base = AutoModelForCausalLM.from_pretrained(
                BASE_MODEL_NAME,
                config=cfg,
                quantization_config=None,
                dtype=torch.float32,
                device_map="cpu",
                trust_remote_code=True,
            )

        # ── Attach LoRA adapter ─────────────────────────────────────
        print(" Attaching LoRA adapter …")
        peft_model = PeftModel.from_pretrained(
            base,
            ADAPTER_NAME,
            is_trainable=False,  # inference only
        )
        peft_model.eval()

        # ── Stop-token IDs ──────────────────────────────────────────
        stop_ids: set[int] = set()
        if tok.eos_token_id is not None:
            stop_ids.add(tok.eos_token_id)
        for tok_str in ("<|im_end|>", "<|endoftext|>"):
            tid = tok.convert_tokens_to_ids(tok_str)
            if tid is not None and tid != tok.unk_token_id:
                stop_ids.add(tid)

        # Publish: tokenizer and stop ids first, model last (see docstring).
        tokenizer = tok
        stop_token_ids = sorted(stop_ids)
        model = peft_model

        print(f" eos_token = {tokenizer.eos_token!r}")
        print(f" stop_token_ids = {stop_token_ids}")
        print("✅ Fine-tuned model ready!\n")


def _model_device() -> torch.device:
    """Return the device of the model's first parameter (where inputs must live)."""
    return next(model.parameters()).device


# ━━━━━━━━━━━━━━━━━━━━ Chat-Prompt Builder (ChatML) ━━━━━━━━━━━━
NO_TOOL_SYSTEM_PROMPT = (
    "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."
)

# Tool instructions in the Qwen2.5 chat-template format. The <tools> and
# <tool_call> markers are what parse_tool_calls() looks for in the output.
_TOOLS_SECTION_TEMPLATE = """\
# Tools

You may call one or more functions to assist with the user query.

You are provided with function signatures within <tools></tools> XML tags:
<tools>
{tool_definitions}
</tools>

For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
<tool_call>
{{"name": <function-name>, "arguments": <args-json-object>}}
</tool_call>"""

# Default system prompt when tools are supplied and the caller sent no system
# message: identity preamble followed by the tool instructions.
TOOL_SYSTEM_PROMPT_TEMPLATE = NO_TOOL_SYSTEM_PROMPT + "\n\n" + _TOOLS_SECTION_TEMPLATE


def _serialize_tool_definitions(tools: list[ToolDef]) -> str:
    """Render each tool as one line of JSON, joined by newlines."""
    lines = []
    for tool in tools:
        function_obj: dict[str, Any] = {
            "name": tool.function.name,
            "description": tool.function.description or "",
        }
        if tool.function.parameters:
            function_obj["parameters"] = tool.function.parameters
        lines.append(json.dumps({"type": "function", "function": function_obj}))
    return "\n".join(lines)


def _render_assistant_tool_call(tc: ToolCallObj) -> str:
    """Render one prior assistant tool call as a ``<tool_call>`` block.

    The function name is JSON-escaped. Arguments are embedded as a JSON value
    when they parse as JSON, as ``{}`` when empty, and as a JSON string
    otherwise, so the rendered block is always valid JSON.
    """
    raw_args = tc.function.arguments
    if isinstance(raw_args, dict):
        args_json = json.dumps(raw_args)
    else:
        candidate = (raw_args or "").strip() or "{}"
        try:
            json.loads(candidate)
            args_json = candidate
        except json.JSONDecodeError:
            args_json = json.dumps(candidate)
    name_json = json.dumps(tc.function.name)
    return (
        "\n<tool_call>\n"
        f'{{"name": {name_json}, "arguments": {args_json}}}\n'
        "</tool_call>"
    )


def build_chat_prompt(
    messages: list[ChatMessage],
    tools: Optional[list[ToolDef]] = None,
    tool_choice: Optional[Union[str, dict]] = None,
) -> str:
    """Render a conversation as a ChatML prompt ending in an open assistant turn.

    Args:
        messages: Conversation history in request order.
        tools: Tool definitions to advertise in the system prompt, if any.
        tool_choice: ``"none"`` suppresses the tool instructions; other values
            are accepted but do not alter the prompt.

    Returns:
        The prompt string, terminated by ``<|im_start|>assistant\\n``.
    """
    if tool_choice == "none":
        tools = None

    tools_section = (
        _TOOLS_SECTION_TEMPLATE.format(
            tool_definitions=_serialize_tool_definitions(tools)
        )
        if tools
        else None
    )

    parts: list[str] = []
    # The tool instructions go into exactly one system turn.
    tools_injected = False

    if not any(m.role == "system" for m in messages):
        default_sys = NO_TOOL_SYSTEM_PROMPT
        if tools_section:
            default_sys = f"{NO_TOOL_SYSTEM_PROMPT}\n\n{tools_section}"
            tools_injected = True
        parts.append(f"<|im_start|>system\n{default_sys}<|im_end|>\n")

    for msg in messages:
        role = msg.role
        if role == "system":
            # A caller-supplied system prompt replaces the default identity
            # line; the tool instructions are appended after it.
            system_text = msg.content or NO_TOOL_SYSTEM_PROMPT
            if tools_section and not tools_injected:
                system_text = f"{system_text}\n\n{tools_section}"
                tools_injected = True
            parts.append(f"<|im_start|>system\n{system_text}<|im_end|>\n")
        elif role == "user":
            parts.append(f"<|im_start|>user\n{msg.content or ''}<|im_end|>\n")
        elif role == "assistant":
            if msg.tool_calls:
                rendered_calls = "".join(
                    _render_assistant_tool_call(tc) for tc in msg.tool_calls
                )
                # Keep any text the assistant produced alongside its calls.
                lead = f"\n{msg.content}" if msg.content else ""
                parts.append(
                    f"<|im_start|>assistant{lead}{rendered_calls}<|im_end|>\n"
                )
            else:
                parts.append(
                    f"<|im_start|>assistant\n{msg.content or ''}<|im_end|>\n"
                )
        elif role == "tool":
            # Tool results are fed back as a user turn wrapped in
            # <tool_response> tags, as in the Qwen2.5 chat template.
            parts.append(
                "<|im_start|>user\n"
                f"<tool_response>\n{msg.content or ''}\n</tool_response>"
                "<|im_end|>\n"
            )
        # Messages with any other role are ignored.

    parts.append("<|im_start|>assistant\n")
    return "".join(parts)


# ━━━━━━━━━━━━━━━━━━ Tool-Call Parser ━━━━━━━━━━━━━━━━━━━━━━━━━━
# Anchoring on the closing tag lets the lazy ``.*?`` run past inner braces, so
# tool calls whose "arguments" contain nested objects are captured whole.
_TOOL_CALL_RE = re.compile(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", re.DOTALL)


def parse_tool_calls(text: str) -> tuple[Optional[str], list[dict]]:
    """Extract ``<tool_call>`` blocks from generated text.

    Args:
        text: Cleaned model output.

    Returns:
        ``(content, tool_calls)``. ``content`` is the text left after removing
        every ``<tool_call>`` block, or None when nothing is left.
        ``tool_calls`` is a list of OpenAI-style tool-call dicts; blocks whose
        body is not valid JSON are dropped.
    """
    tool_calls: list[dict] = []
    for raw_json in _TOOL_CALL_RE.findall(text):
        try:
            parsed = json.loads(raw_json)
        except json.JSONDecodeError:
            continue
        if not isinstance(parsed, dict):
            continue
        arguments = parsed.get("arguments", {})
        # OpenAI clients expect ``arguments`` to be a JSON-encoded string.
        if not isinstance(arguments, str):
            arguments = json.dumps(arguments)
        tool_calls.append({
            "id": f"call_{uuid.uuid4().hex[:24]}",
            "type": "function",
            "function": {"name": parsed.get("name", ""), "arguments": arguments},
        })
    content = _TOOL_CALL_RE.sub("", text).strip() or None
    return content, tool_calls


# ━━━━━━━━━━━━━━━━━━ Generation Helpers ━━━━━━━━━━━━━━━━━━━━━━━━
# Markers that end the assistant turn when seen in streamed text.
_STREAM_STOP_MARKERS = ("<|im_end|>", "<|endoftext|>")


def _clean_output(text: str) -> str:
    """Remove ChatML control tokens and surrounding whitespace."""
    for tok in ["<|im_end|>", "<|im_start|>", "<|endoftext|>"]:
        text = text.replace(tok, "")
    return text.strip()


def _build_gen_kwargs(inputs: dict, req: Any, streamer=None) -> dict:
    """Translate request sampling options into ``model.generate`` kwargs.

    Args:
        inputs: Tokenizer output holding ``input_ids`` and, optionally,
            ``attention_mask``.
        req: Request object exposing ``max_tokens``, ``temperature``, ``top_p``
            and, optionally, ``repetition_penalty``.
        streamer: Optional streamer forwarded to ``generate``.

    Returns:
        Keyword arguments for ``model.generate``. A temperature of 0 (or less)
        selects greedy decoding; a null temperature falls back to
        ``DEFAULT_TEMPERATURE``.
    """
    temperature = getattr(req, "temperature", None)
    if temperature is None:
        temperature = DEFAULT_TEMPERATURE

    kwargs: dict[str, Any] = {
        "input_ids": inputs["input_ids"],
        "attention_mask": inputs.get("attention_mask"),
        # Never exceed the server-wide ceiling, whatever the client asks for.
        "max_new_tokens": min(req.max_tokens or MAX_NEW_TOKENS, MAX_NEW_TOKENS),
        "eos_token_id": stop_token_ids,
        "pad_token_id": tokenizer.pad_token_id,
    }

    if temperature <= 0:
        # OpenAI semantics: temperature 0 means deterministic output.
        kwargs["do_sample"] = False
    else:
        kwargs["do_sample"] = True
        kwargs["temperature"] = max(temperature, 0.01)
        top_p = getattr(req, "top_p", None)
        if top_p is not None:
            kwargs["top_p"] = top_p

    rep_penalty = getattr(req, "repetition_penalty", 1.0)
    if rep_penalty and rep_penalty > 1.0:
        kwargs["repetition_penalty"] = rep_penalty
    if streamer is not None:
        kwargs["streamer"] = streamer
    return kwargs


def generate_text(prompt: str, req) -> tuple[str, int, int]:
    """Run one blocking generation.

    Args:
        prompt: Fully rendered prompt.
        req: Request carrying the sampling options.

    Returns:
        ``(text, prompt_tokens, completion_tokens)``.
    """
    load_model()  # no-op once loaded; covers `uvicorn module:app` launches
    inputs = tokenizer(prompt, return_tensors="pt").to(_model_device())
    prompt_tokens = inputs["input_ids"].shape[1]
    gen_kwargs = _build_gen_kwargs(inputs, req)
    with generate_lock, torch.no_grad():
        output_ids = model.generate(**gen_kwargs)
    new_ids = output_ids[0][prompt_tokens:]
    text = _clean_output(tokenizer.decode(new_ids, skip_special_tokens=False))
    return text, prompt_tokens, len(new_ids)


def generate_text_stream(prompt: str, req):
    """Yield generated text pieces as they are produced.

    Generation runs in a background thread feeding a ``TextIteratorStreamer``.
    Iteration ends at the first stop marker; text before the marker is still
    yielded, with its whitespace preserved.
    """
    load_model()  # no-op once loaded; covers `uvicorn module:app` launches
    inputs = tokenizer(prompt, return_tensors="pt").to(_model_device())
    streamer = TextIteratorStreamer(
        tokenizer, skip_prompt=True, skip_special_tokens=False
    )
    gen_kwargs = _build_gen_kwargs(inputs, req, streamer=streamer)
    thread = Thread(target=_generate_in_thread, args=(gen_kwargs,), daemon=True)
    thread.start()

    for token_text in streamer:
        cut_positions = [
            pos
            for pos in (token_text.find(m) for m in _STREAM_STOP_MARKERS)
            if pos != -1
        ]
        if cut_positions:
            # Emit only what precedes the earliest stop marker.
            head = token_text[: min(cut_positions)].replace("<|im_start|>", "")
            if head:
                yield head
            break
        yield token_text

    thread.join()


def _generate_in_thread(gen_kwargs: dict):
    """Thread target: run ``model.generate`` under the generation lock.

    If generation raises, the streamer (when present) is closed explicitly so
    the consumer iterating it is released instead of blocking forever.
    """
    try:
        with generate_lock, torch.no_grad():
            model.generate(**gen_kwargs)
    except Exception as exc:
        print(f"⚠️ Generation failed in worker thread: {exc!r}")
        streamer = gen_kwargs.get("streamer")
        if streamer is not None:
            streamer.end()


# ━━━━━━━━━━━━━━━━━━ Response Builders ━━━━━━━━━━━━━━━━━━━━━━━━━
def _uid(prefix: str = "chatcmpl") -> str:
    """Return a short random identifier such as ``chatcmpl-1a2b3c4d5e6f``."""
    return f"{prefix}-{uuid.uuid4().hex[:12]}"


def _usage(prompt_tokens: int, completion_tokens: int) -> dict:
    """Build the OpenAI ``usage`` object."""
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
    }


def make_chat_response(
    content: Optional[str],
    tool_calls: list[dict],
    model_name: str,
    prompt_tokens: int,
    completion_tokens: int,
) -> dict:
    """Build a non-streaming ``chat.completion`` response body.

    With tool calls present, ``content`` is passed through (it may be None)
    and ``finish_reason`` is ``"tool_calls"``; otherwise ``content`` is
    stripped text and ``finish_reason`` is ``"stop"``.
    """
    message: dict[str, Any] = {"role": "assistant"}
    if tool_calls:
        message["content"] = content
        message["tool_calls"] = tool_calls
        finish_reason = "tool_calls"
    else:
        message["content"] = (content or "").strip()
        finish_reason = "stop"
    return {
        "id": _uid(),
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "message": message, "finish_reason": finish_reason}],
        "usage": _usage(prompt_tokens, completion_tokens),
    }


def make_completion_response(
    text: str, model_name: str, prompt_tokens: int, completion_tokens: int
) -> dict:
    """Build a non-streaming ``text_completion`` response body."""
    return {
        "id": _uid("cmpl"),
        "object": "text_completion",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "text": text.strip(), "finish_reason": "stop"}],
        "usage": _usage(prompt_tokens, completion_tokens),
    }


# ━━━━━━━━━━━━━━━━━━ Streaming Helpers ━━━━━━━━━━━━━━━━━━━━━━━━
def _sse_chunk(
    cid: str,
    created: int,
    model_name: str,
    delta: dict,
    finish: Optional[str] = None,
) -> str:
    """Format one ``chat.completion.chunk`` as a server-sent-events frame."""
    payload = {
        "id": cid,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model_name,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish}],
    }
    return "data: " + json.dumps(payload) + "\n\n"


def stream_chat_response(prompt: str, req):
    """Yield SSE frames for a plain (no-tools) streamed chat completion."""
    cid, created = _uid(), int(time.time())

    yield _sse_chunk(cid, created, req.model, {"role": "assistant"})
    for token_text in generate_text_stream(prompt, req):
        if token_text:
            yield _sse_chunk(cid, created, req.model, {"content": token_text})
    yield _sse_chunk(cid, created, req.model, {}, finish="stop")
    yield "data: [DONE]\n\n"


def stream_tool_call_chunks(
    content: Optional[str], tool_calls: list[dict], model_name: str
):
    """Yield SSE frames replaying an already-generated tool-call response.

    Each tool call is sent as two frames: a header with id and name, then the
    arguments string. Any remaining text content follows the tool calls.
    """
    cid, created = _uid(), int(time.time())

    yield _sse_chunk(cid, created, model_name, {"role": "assistant"})
    for idx, tc in enumerate(tool_calls):
        yield _sse_chunk(cid, created, model_name, {"tool_calls": [{
            "index": idx,
            "id": tc["id"],
            "type": "function",
            "function": {"name": tc["function"]["name"], "arguments": ""},
        }]})
        yield _sse_chunk(cid, created, model_name, {"tool_calls": [{
            "index": idx,
            "function": {"arguments": tc["function"]["arguments"]},
        }]})
    if content:
        yield _sse_chunk(cid, created, model_name, {"content": content})
    yield _sse_chunk(
        cid, created, model_name, {}, finish="tool_calls" if tool_calls else "stop"
    )
    yield "data: [DONE]\n\n"


# ━━━━━━━━━━━━━━━━━━━━━━ ROUTES ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@app.get("/")
async def root():
    """Describe the service and list its endpoints."""
    return {
        "message": "Mermaid Fine-Tuned Qwen2.5-0.5B OpenAI-Compatible API",
        "base_model": BASE_MODEL_NAME,
        "adapter": ADAPTER_NAME,
        "docs": "/docs",
        "endpoints": {
            "models": "/v1/models",
            "chat": "/v1/chat/completions",
            "completions": "/v1/completions",
            "health": "/health",
        },
    }


@app.get("/v1/models")
async def list_models():
    """Return the single served model in OpenAI list format."""
    return {
        "object": "list",
        "data": [{
            "id": DISPLAY_MODEL_NAME,
            "object": "model",
            "created": _STARTED_AT,
            "owned_by": "MuhammadNoman7600",
        }],
    }


@app.post("/v1/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
    """Handle an OpenAI-style chat completion, streamed or not.

    Blocking generation runs in the thread pool so the event loop stays free.
    Unexpected failures are reported as HTTP 500 with the exception text.
    """
    try:
        use_tools = bool(req.tools) and req.tool_choice != "none"
        prompt = build_chat_prompt(req.messages, req.tools, req.tool_choice)

        # Tool-calling: generate fully first, then parse
        if use_tools:
            text, prompt_tokens, completion_tokens = await run_in_threadpool(
                generate_text, prompt, req
            )
            content, tool_calls = parse_tool_calls(text)
            if req.stream:
                return StreamingResponse(
                    stream_tool_call_chunks(content, tool_calls, req.model),
                    media_type="text/event-stream",
                )
            return JSONResponse(
                make_chat_response(
                    content, tool_calls, req.model, prompt_tokens, completion_tokens
                )
            )

        # Normal chat with optional streaming
        if req.stream:
            return StreamingResponse(
                stream_chat_response(prompt, req),
                media_type="text/event-stream",
            )

        text, prompt_tokens, completion_tokens = await run_in_threadpool(
            generate_text, prompt, req
        )
        return JSONResponse(
            make_chat_response(text, [], req.model, prompt_tokens, completion_tokens)
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/v1/completions")
async def completions(req: CompletionRequest):
    """Handle an OpenAI-style text completion (first prompt only).

    Raises:
        HTTPException: 400 when ``prompt`` is an empty list; 500 on any
            unexpected failure.
    """
    try:
        prompts = [req.prompt] if isinstance(req.prompt, str) else req.prompt
        if not prompts:
            raise HTTPException(status_code=400, detail="prompt must not be empty")
        text, prompt_tokens, completion_tokens = await run_in_threadpool(
            generate_text, prompts[0], req
        )
        return JSONResponse(
            make_completion_response(
                text, req.model, prompt_tokens, completion_tokens
            )
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/health")
async def health():
    """Report liveness, the configured models, and whether weights are loaded."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    return {
        "status": "ok",
        "base_model": BASE_MODEL_NAME,
        "adapter": ADAPTER_NAME,
        "device": device,
        "model_loaded": model is not None,
    }


# ━━━━━━━━━━━━━━━━━━━━━━ MAIN ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
if __name__ == "__main__":
    load_model()
    print(f"\n{'='*60}")
    print(" OpenAI-compatible API — Fine-Tuned Mermaid Model")
    print(f" Base : {BASE_MODEL_NAME}")
    print(f" Adapter: {ADAPTER_NAME}")
    device_label = "CUDA (4-bit bitsandbytes)" if torch.cuda.is_available() else "CPU (float32)"
    print(f" Device : {device_label}")
    print(f" URL : http://{HOST}:{PORT}/v1")
    print(f"{'='*60}\n")
    uvicorn.run(app, host=HOST, port=PORT, log_level="info")