| """ | |
| ============================================================================= | |
| Transformers + FastAPI β OpenAI-Compatible Server | |
| Base : unsloth/qwen2.5-0.5b-unsloth-bnb-4bit | |
| Adapter: MuhammadNoman7600/mermaid (LoRA r=16 Ξ±=16) | |
| CPU-ONLY fallback β’ TOOL CALLING β’ STREAMING β’ Port 7860 | |
| ============================================================================= | |
| """ | |
import json
import os
import re
import time
import uuid
from threading import Lock, Thread
from typing import Any, Optional, Union

import torch
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from peft import PeftModel
from pydantic import BaseModel
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TextIteratorStreamer,
)
# ────────────────────────── CONFIG ──────────────────────────
BASE_MODEL_NAME = "Qwen/Qwen2.5-0.5B-Instruct"  # CPU-safe (float32); unsloth 4-bit needs CUDA
ADAPTER_NAME = "MuhammadNoman7600/mermaid"
DISPLAY_MODEL_NAME = "MuhammadNoman7600/mermaid"
HOST = "0.0.0.0"
PORT = 7860
MAX_NEW_TOKENS = 32768
# ─────────────────────────────────────────────────────────────
app = FastAPI(
    title="Mermaid Fine-Tuned Qwen2.5-0.5B – OpenAI-Compatible API",
    description="LoRA adapter MuhammadNoman7600/mermaid on Qwen2.5-0.5B with tool calling",
    version="2.0.0",
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ─────────────────────── Pydantic Models ──────────────────────
class FunctionDef(BaseModel):
    name: str
    description: Optional[str] = ""
    parameters: Optional[dict] = None


class ToolDef(BaseModel):
    type: str = "function"
    function: FunctionDef


class FunctionCallModel(BaseModel):
    name: str
    arguments: str


class ToolCallObj(BaseModel):
    id: str
    type: str = "function"
    function: FunctionCallModel


class ChatMessage(BaseModel):
    role: str
    content: Optional[str] = None
    tool_calls: Optional[list[ToolCallObj]] = None
    tool_call_id: Optional[str] = None
    name: Optional[str] = None


class ChatCompletionRequest(BaseModel):
    model: str = DISPLAY_MODEL_NAME
    messages: list[ChatMessage]
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = 1024
    stream: Optional[bool] = False
    stop: Optional[Union[str, list[str]]] = None
    frequency_penalty: Optional[float] = 0.0
    presence_penalty: Optional[float] = 0.0
    repetition_penalty: Optional[float] = 1.0
    n: Optional[int] = 1
    tools: Optional[list[ToolDef]] = None
    tool_choice: Optional[Union[str, dict]] = None


class CompletionRequest(BaseModel):
    model: str = DISPLAY_MODEL_NAME
    prompt: Union[str, list[str]] = ""
    temperature: Optional[float] = 0.7
    top_p: Optional[float] = 0.9
    max_tokens: Optional[int] = 512
    stream: Optional[bool] = False
    stop: Optional[Union[str, list[str]]] = None
    frequency_penalty: Optional[float] = 0.0
    presence_penalty: Optional[float] = 0.0
    repetition_penalty: Optional[float] = 1.0
    n: Optional[int] = 1
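# Example request body accepted by ChatCompletionRequest (illustrative values):
#
#   {
#     "model": "MuhammadNoman7600/mermaid",
#     "messages": [{"role": "user", "content": "Hello"}],
#     "temperature": 0.7,
#     "max_tokens": 256,
#     "stream": false
#   }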
# ─────────────────── Model Loading ────────────────────────────
tokenizer: Any = None
model: Any = None
generate_lock = Lock()
stop_token_ids: list[int] = []


def load_model():
    global tokenizer, model, stop_token_ids
    if model is not None:
        return
    print(f"\nBase model  : {BASE_MODEL_NAME}")
    print(f"LoRA adapter: {ADAPTER_NAME}")
    print(f"HF_HOME     = {os.environ.get('HF_HOME', 'default')}\n")

    # ── Tokenizer ───────────────────────────────────────────────
    # Adapter repos rarely ship a tokenizer; fall back to base.
    try:
        tokenizer = AutoTokenizer.from_pretrained(
            ADAPTER_NAME, use_fast=True, trust_remote_code=True
        )
        print("  Tokenizer loaded from adapter repo.")
    except Exception:
        tokenizer = AutoTokenizer.from_pretrained(
            BASE_MODEL_NAME, use_fast=True, trust_remote_code=True
        )
        print("  Tokenizer loaded from base model repo.")
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    # ── Base model ──────────────────────────────────────────────
    # Load in 4-bit if CUDA is available (matches the training setup),
    # otherwise fall back to float32 on CPU.
    use_4bit = torch.cuda.is_available()
    if use_4bit:
        print("  CUDA detected → loading in 4-bit (bitsandbytes nf4).")
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
            bnb_4bit_compute_dtype=torch.float16,
        )
        base = AutoModelForCausalLM.from_pretrained(
            BASE_MODEL_NAME,
            quantization_config=bnb_config,
            device_map="auto",
            trust_remote_code=True,
        )
    else:
        print("  No CUDA → loading base model in float32 on CPU.")
        # Some 4-bit bases (e.g. unsloth/qwen2.5-0.5b-unsloth-bnb-4bit) ship a
        # bnb-4bit quantization_config baked into the model config. On CPU we
        # strip it defensively so transformers never invokes bitsandbytes
        # (which requires CUDA).
        from transformers import AutoConfig

        cfg = AutoConfig.from_pretrained(BASE_MODEL_NAME, trust_remote_code=True)
        if hasattr(cfg, "quantization_config"):
            del cfg.quantization_config
        base = AutoModelForCausalLM.from_pretrained(
            BASE_MODEL_NAME,
            config=cfg,
            quantization_config=None,
            torch_dtype=torch.float32,
            device_map="cpu",
            trust_remote_code=True,
        )
    # ── Attach LoRA adapter ─────────────────────────────────────
    print("  Attaching LoRA adapter …")
    model = PeftModel.from_pretrained(
        base,
        ADAPTER_NAME,
        is_trainable=False,  # inference only
    )
    model.eval()

    # ── Stop-token IDs ──────────────────────────────────────────
    _stop_ids: set[int] = set()
    if tokenizer.eos_token_id is not None:
        _stop_ids.add(tokenizer.eos_token_id)
    for tok_str in ["<|im_end|>", "<|endoftext|>"]:
        tid = tokenizer.convert_tokens_to_ids(tok_str)
        if tid is not None and tid != tokenizer.unk_token_id:
            _stop_ids.add(tid)
    stop_token_ids = list(_stop_ids)
    print(f"  eos_token      = {tokenizer.eos_token!r}")
    print(f"  stop_token_ids = {stop_token_ids}")
    print("✅ Fine-tuned model ready!\n")
# ──────────────────── Chat-Prompt Builder (ChatML) ────────────
TOOL_SYSTEM_PROMPT_TEMPLATE = """\
You are Qwen, created by Alibaba Cloud. You are a helpful assistant.

# Tools

You may call one or more functions to assist with the user query.

You are provided with function signatures within <tools></tools> XML tags:
<tools>
{tool_definitions}
</tools>

For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
<tool_call>
{{"name": "<function-name>", "arguments": <args-json-object>}}
</tool_call>"""

NO_TOOL_SYSTEM_PROMPT = (
    "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."
)
def _serialize_tool_definitions(tools: list[ToolDef]) -> str:
    lines = []
    for t in tools:
        obj: dict[str, Any] = {
            "type": "function",
            "function": {
                "name": t.function.name,
                "description": t.function.description or "",
            },
        }
        if t.function.parameters:
            obj["function"]["parameters"] = t.function.parameters
        lines.append(json.dumps(obj))
    return "\n".join(lines)
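# Example: a hypothetical get_weather tool serializes to one JSON line that is
# interpolated into the <tools> block of the system prompt:
#
#   {"type": "function", "function": {"name": "get_weather",
#    "description": "Get the current weather", "parameters": {"type": "object",
#    "properties": {"city": {"type": "string"}}, "required": ["city"]}}}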
def build_chat_prompt(
    messages: list[ChatMessage],
    tools: Optional[list[ToolDef]] = None,
    tool_choice: Optional[Union[str, dict]] = None,
) -> str:
    parts: list[str] = []
    has_system = any(m.role == "system" for m in messages)
    default_sys = (
        TOOL_SYSTEM_PROMPT_TEMPLATE.format(
            tool_definitions=_serialize_tool_definitions(tools)
        )
        if tools
        else NO_TOOL_SYSTEM_PROMPT
    )
    if not has_system:
        parts.append(f"<|im_start|>system\n{default_sys}<|im_end|>\n")
    for msg in messages:
        role = msg.role
        if role == "system":
            base_content = msg.content or ""
            if tools:
                tool_block = TOOL_SYSTEM_PROMPT_TEMPLATE.format(
                    tool_definitions=_serialize_tool_definitions(tools)
                )
                merged = f"{base_content}\n\n{tool_block}" if base_content else tool_block
                parts.append(f"<|im_start|>system\n{merged}<|im_end|>\n")
            else:
                parts.append(
                    f"<|im_start|>system\n{base_content or NO_TOOL_SYSTEM_PROMPT}<|im_end|>\n"
                )
        elif role == "user":
            parts.append(f"<|im_start|>user\n{msg.content or ''}<|im_end|>\n")
        elif role == "assistant":
            if msg.tool_calls:
                tc_text = ""
                for tc in msg.tool_calls:
                    args = tc.function.arguments
                    if isinstance(args, dict):
                        args = json.dumps(args)
                    tc_text += (
                        f"\n<tool_call>\n"
                        f'{{"name": "{tc.function.name}", "arguments": {args}}}\n'
                        f"</tool_call>"
                    )
                parts.append(f"<|im_start|>assistant{tc_text}<|im_end|>\n")
            else:
                parts.append(
                    f"<|im_start|>assistant\n{msg.content or ''}<|im_end|>\n"
                )
        elif role == "tool":
            parts.append(
                f"<|im_start|>user\n"
                f"<tool_response>\n{msg.content or ''}\n</tool_response>"
                f"<|im_end|>\n"
            )
    parts.append("<|im_start|>assistant\n")
    return "".join(parts)
# ────────────────── Tool-Call Parser ──────────────────────────
_TOOL_CALL_RE = re.compile(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", re.DOTALL)


def parse_tool_calls(text: str) -> tuple[Optional[str], list[dict]]:
    tool_calls: list[dict] = []
    for raw_json in _TOOL_CALL_RE.findall(text):
        try:
            parsed = json.loads(raw_json)
        except json.JSONDecodeError:
            continue
        name = parsed.get("name", "")
        arguments = parsed.get("arguments", {})
        # Normalize arguments to a JSON string (OpenAI wire format).
        if not isinstance(arguments, str):
            arguments = json.dumps(arguments)
        tool_calls.append({
            "id": f"call_{uuid.uuid4().hex[:24]}",
            "type": "function",
            "function": {"name": name, "arguments": arguments},
        })
    content = _TOOL_CALL_RE.sub("", text).strip() or None
    return content, tool_calls
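# Example: for model output containing (get_weather is hypothetical)
#
#   <tool_call>
#   {"name": "get_weather", "arguments": {"city": "Paris"}}
#   </tool_call>
#
# parse_tool_calls returns content=None plus one entry shaped like
#   {"id": "call_<random>", "type": "function",
#    "function": {"name": "get_weather", "arguments": "{\"city\": \"Paris\"}"}}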
# ────────────────── Generation Helpers ────────────────────────
def _clean_output(text: str) -> str:
    for tok in ["<|im_end|>", "<|im_start|>", "<|endoftext|>"]:
        text = text.replace(tok, "")
    return text.strip()


def _build_gen_kwargs(inputs: dict, req: Any, streamer=None) -> dict:
    kwargs: dict[str, Any] = {
        "input_ids": inputs["input_ids"],
        "attention_mask": inputs.get("attention_mask"),
        "max_new_tokens": req.max_tokens or MAX_NEW_TOKENS,
        "do_sample": True,
        "temperature": max(req.temperature, 0.01),
        "top_p": req.top_p,
        "eos_token_id": stop_token_ids,
        "pad_token_id": tokenizer.pad_token_id,
    }
    rep_penalty = getattr(req, "repetition_penalty", 1.0)
    if rep_penalty and rep_penalty > 1.0:
        kwargs["repetition_penalty"] = rep_penalty
    if streamer is not None:
        kwargs["streamer"] = streamer
    return kwargs


def generate_text(prompt: str, req) -> tuple[str, int, int]:
    # Move inputs onto the model's device so the CUDA (4-bit) path works too.
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_tokens = inputs["input_ids"].shape[1]
    gen_kwargs = _build_gen_kwargs(inputs, req)
    with generate_lock:
        with torch.no_grad():
            output_ids = model.generate(**gen_kwargs)
    new_ids = output_ids[0][prompt_tokens:]
    text = _clean_output(tokenizer.decode(new_ids, skip_special_tokens=False))
    return text, prompt_tokens, len(new_ids)


def generate_text_stream(prompt: str, req):
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    streamer = TextIteratorStreamer(
        tokenizer, skip_prompt=True, skip_special_tokens=False
    )
    gen_kwargs = _build_gen_kwargs(inputs, req, streamer=streamer)
    thread = Thread(target=_generate_in_thread, args=(gen_kwargs,))
    thread.start()
    for token_text in streamer:
        # Stop as soon as an end-of-turn marker shows up in the stream.
        if any(s in token_text for s in ["<|im_end|>", "<|endoftext|>"]):
            cleaned = _clean_output(token_text)
            if cleaned:
                yield cleaned
            break
        yield token_text
    thread.join()


def _generate_in_thread(gen_kwargs: dict):
    with generate_lock:
        with torch.no_grad():
            model.generate(**gen_kwargs)
# ────────────────── Response Builders ─────────────────────────
def _uid(prefix: str = "chatcmpl") -> str:
    return f"{prefix}-{uuid.uuid4().hex[:12]}"


def make_chat_response(
    content: Optional[str],
    tool_calls: list[dict],
    model_name: str,
    prompt_tokens: int,
    completion_tokens: int,
) -> dict:
    message: dict[str, Any] = {"role": "assistant"}
    if tool_calls:
        message["content"] = content
        message["tool_calls"] = tool_calls
        finish_reason = "tool_calls"
    else:
        message["content"] = (content or "").strip()
        finish_reason = "stop"
    return {
        "id": _uid(),
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "message": message, "finish_reason": finish_reason}],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }
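# Example non-streaming payload produced above (OpenAI chat.completion shape,
# illustrative values):
#
#   {"id": "chatcmpl-abc123def456", "object": "chat.completion",
#    "created": 1700000000, "model": "MuhammadNoman7600/mermaid",
#    "choices": [{"index": 0,
#                 "message": {"role": "assistant", "content": "Hi there!"},
#                 "finish_reason": "stop"}],
#    "usage": {"prompt_tokens": 20, "completion_tokens": 4, "total_tokens": 24}}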
def make_completion_response(
    text: str, model_name: str, prompt_tokens: int, completion_tokens: int
) -> dict:
    return {
        "id": _uid("cmpl"),
        "object": "text_completion",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "text": text.strip(), "finish_reason": "stop"}],
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }
# ────────────────── Streaming Helpers ─────────────────────────
def stream_chat_response(prompt: str, req):
    cid, created = _uid(), int(time.time())

    def _chunk(delta: dict, finish: Optional[str] = None) -> str:
        return "data: " + json.dumps({
            "id": cid, "object": "chat.completion.chunk",
            "created": created, "model": req.model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish}],
        }) + "\n\n"

    yield _chunk({"role": "assistant"})
    for token_text in generate_text_stream(prompt, req):
        if token_text:
            yield _chunk({"content": token_text})
    yield _chunk({}, finish="stop")
    yield "data: [DONE]\n\n"
def stream_tool_call_chunks(
    content: Optional[str], tool_calls: list[dict], model_name: str
):
    cid, created = _uid(), int(time.time())

    def _chunk(delta: dict, finish: Optional[str] = None) -> str:
        return "data: " + json.dumps({
            "id": cid, "object": "chat.completion.chunk",
            "created": created, "model": model_name,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish}],
        }) + "\n\n"

    yield _chunk({"role": "assistant"})
    for idx, tc in enumerate(tool_calls):
        yield _chunk({"tool_calls": [{
            "index": idx, "id": tc["id"], "type": "function",
            "function": {"name": tc["function"]["name"], "arguments": ""},
        }]})
        yield _chunk({"tool_calls": [{
            "index": idx,
            "function": {"arguments": tc["function"]["arguments"]},
        }]})
    if content:
        yield _chunk({"content": content})
    yield _chunk({}, finish="tool_calls" if tool_calls else "stop")
    yield "data: [DONE]\n\n"
# ────────────────────── ROUTES ────────────────────────────────
@app.get("/")
async def root():
    return {
        "message": "Mermaid Fine-Tuned Qwen2.5-0.5B OpenAI-Compatible API",
        "base_model": BASE_MODEL_NAME,
        "adapter": ADAPTER_NAME,
        "docs": "/docs",
        "endpoints": {
            "models": "/v1/models",
            "chat": "/v1/chat/completions",
            "completions": "/v1/completions",
            "health": "/health",
        },
    }


@app.get("/v1/models")
async def list_models():
    return {
        "object": "list",
        "data": [{
            "id": DISPLAY_MODEL_NAME,
            "object": "model",
            "created": int(time.time()),
            "owned_by": "MuhammadNoman7600",
        }],
    }
@app.post("/v1/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
    try:
        prompt = build_chat_prompt(req.messages, req.tools, req.tool_choice)
        # Tool calling: generate fully first, then parse out <tool_call> blocks.
        if req.tools:
            text, prompt_tokens, completion_tokens = generate_text(prompt, req)
            content, tool_calls = parse_tool_calls(text)
            if req.stream:
                return StreamingResponse(
                    stream_tool_call_chunks(content, tool_calls, req.model),
                    media_type="text/event-stream",
                )
            return JSONResponse(
                make_chat_response(
                    content, tool_calls, req.model, prompt_tokens, completion_tokens
                )
            )
        # Normal chat with optional streaming
        if req.stream:
            return StreamingResponse(
                stream_chat_response(prompt, req),
                media_type="text/event-stream",
            )
        text, prompt_tokens, completion_tokens = generate_text(prompt, req)
        return JSONResponse(
            make_chat_response(text, [], req.model, prompt_tokens, completion_tokens)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/v1/completions")
async def completions(req: CompletionRequest):
    try:
        # Only the first prompt is served; batch prompts and streaming are not
        # implemented for this endpoint.
        prompts = [req.prompt] if isinstance(req.prompt, str) else req.prompt
        text, prompt_tokens, completion_tokens = generate_text(prompts[0], req)
        return JSONResponse(
            make_completion_response(
                text, req.model, prompt_tokens, completion_tokens
            )
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
    device = "cuda" if torch.cuda.is_available() else "cpu"
    return {
        "status": "ok",
        "base_model": BASE_MODEL_NAME,
        "adapter": ADAPTER_NAME,
        "device": device,
    }
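# Quick smoke test once the server is up (a sketch, assuming a local run):
#
#   curl http://localhost:7860/health
#   curl http://localhost:7860/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "MuhammadNoman7600/mermaid",
#          "messages": [{"role": "user", "content": "Hi"}]}'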
# ────────────────────── MAIN ──────────────────────────────────
if __name__ == "__main__":
    load_model()
    print(f"\n{'=' * 60}")
    print("  OpenAI-compatible API – Fine-Tuned Mermaid Model")
    print(f"  Base   : {BASE_MODEL_NAME}")
    print(f"  Adapter: {ADAPTER_NAME}")
    device_label = "CUDA (4-bit bitsandbytes)" if torch.cuda.is_available() else "CPU (float32)"
    print(f"  Device : {device_label}")
    print(f"  URL    : http://{HOST}:{PORT}/v1")
    print(f"{'=' * 60}\n")
    uvicorn.run(app, host=HOST, port=PORT, log_level="info")