#!/usr/bin/env python3 """ BuildwellAI Model V2 - Streaming API Server A FastAPI-based streaming inference server for the fine-tuned Qwen3-14B model. Supports: - Streaming text generation - Tool calling with MCP integration - Multi-mode responses (direct, thinking, tool calling) - OpenAI-compatible API Usage: python3 streaming_api.py --model ./output/buildwellai-qwen3-14b-v2/merged --port 8080 """ import os import sys import json import torch import asyncio import argparse import uvicorn from pathlib import Path from typing import Optional, List, Dict, Any, AsyncGenerator from datetime import datetime from threading import Thread from fastapi import FastAPI, HTTPException, Request from fastapi.responses import StreamingResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field # ============================================================================ # CONFIGURATION # ============================================================================ DEFAULT_MODEL_PATH = "./output/buildwellai-qwen3-14b-v2/merged" DEFAULT_PORT = 8080 MAX_TOKENS = 4096 DEFAULT_TEMPERATURE = 0.7 DEFAULT_TOP_P = 0.9 # MCP Tools available for the model MCP_TOOLS = { "universalMCP": { "type": "function", "function": { "name": "universalMCP", "description": "Call any BuildwellAI MCP server for specialized calculations", "parameters": { "type": "object", "properties": { "mcpServer": { "type": "string", "description": "Name of the MCP server (e.g., 'psi-thermal-bridge', 'sap10', 'breeam')" }, "toolName": { "type": "string", "description": "Name of the tool to call on the MCP server" }, "arguments": { "type": "object", "description": "Arguments to pass to the tool" } }, "required": ["mcpServer", "toolName", "arguments"] } } }, "webSearch": { "type": "function", "function": { "name": "webSearch", "description": "Search the web for construction-related information", "parameters": { "type": "object", "properties": { "query": {"type": "string", "description": "Search query"} }, "required": ["query"] } } }, "retrieveDiagrams": { "type": "function", "function": { "name": "retrieveDiagrams", "description": "Search construction diagrams database", "parameters": { "type": "object", "properties": { "query": {"type": "string"}, "category": { "type": "string", "enum": ["structural", "electrical", "plumbing", "hvac", "site", "general"] } }, "required": ["query"] } } } } # ============================================================================ # PYDANTIC MODELS # ============================================================================ class Message(BaseModel): role: str content: Optional[str] = "" tool_calls: Optional[List[Dict]] = None tool_call_id: Optional[str] = None class ChatRequest(BaseModel): model: str = "buildwellai-qwen3-14b-v2" messages: List[Message] temperature: float = DEFAULT_TEMPERATURE top_p: float = DEFAULT_TOP_P max_tokens: int = MAX_TOKENS stream: bool = True tools: Optional[List[Dict]] = None tool_choice: Optional[str] = "auto" class ChatCompletionChoice(BaseModel): index: int = 0 message: Dict finish_reason: str = "stop" class ChatCompletionResponse(BaseModel): id: str object: str = "chat.completion" created: int model: str choices: List[ChatCompletionChoice] usage: Dict = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} # ============================================================================ # MODEL LOADING # ============================================================================ class ModelManager: def __init__(self): self.model = None self.tokenizer = None self.model_path = None self.device = None def load(self, model_path: str): """Load the fine-tuned model.""" from transformers import AutoModelForCausalLM, AutoTokenizer print(f"Loading model from: {model_path}") self.tokenizer = AutoTokenizer.from_pretrained( model_path, trust_remote_code=True ) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token self.model = AutoModelForCausalLM.from_pretrained( model_path, torch_dtype=torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16, device_map="auto", trust_remote_code=True, ) self.model.eval() self.model_path = model_path self.device = next(self.model.parameters()).device print(f"Model loaded on: {self.device}") return True def is_loaded(self) -> bool: return self.model is not None model_manager = ModelManager() # ============================================================================ # STREAMING GENERATION # ============================================================================ async def generate_stream( messages: List[Message], temperature: float = DEFAULT_TEMPERATURE, top_p: float = DEFAULT_TOP_P, max_tokens: int = MAX_TOKENS, tools: Optional[List[Dict]] = None ) -> AsyncGenerator[str, None]: """Generate streaming response.""" from transformers import TextIteratorStreamer if not model_manager.is_loaded(): raise HTTPException(status_code=503, detail="Model not loaded") # Convert messages to dict format formatted_messages = [] for msg in messages: m = {"role": msg.role, "content": msg.content or ""} formatted_messages.append(m) # Add tools to system message if provided if tools: tool_desc = json.dumps(tools, indent=2) for i, msg in enumerate(formatted_messages): if msg["role"] == "system": formatted_messages[i]["content"] += f"\n\nAvailable Tools:\n{tool_desc}" break else: formatted_messages.insert(0, { "role": "system", "content": f"You are BuildwellAI. Available Tools:\n{tool_desc}" }) # Apply chat template text = model_manager.tokenizer.apply_chat_template( formatted_messages, tokenize=False, add_generation_prompt=True ) # Tokenize inputs = model_manager.tokenizer(text, return_tensors="pt").to(model_manager.device) # Setup streamer streamer = TextIteratorStreamer( model_manager.tokenizer, skip_prompt=True, skip_special_tokens=True ) # Generation config generation_kwargs = { **inputs, "max_new_tokens": max_tokens, "temperature": temperature if temperature > 0 else 0.01, "top_p": top_p, "top_k": 50, "do_sample": temperature > 0, "streamer": streamer, "pad_token_id": model_manager.tokenizer.pad_token_id, "eos_token_id": model_manager.tokenizer.eos_token_id, } # Start generation in background thread = Thread(target=model_manager.model.generate, kwargs=generation_kwargs) thread.start() # Stream tokens completion_id = f"chatcmpl-{datetime.now().strftime('%Y%m%d%H%M%S')}" full_response = "" for token_text in streamer: full_response += token_text # SSE format for OpenAI compatibility chunk = { "id": completion_id, "object": "chat.completion.chunk", "created": int(datetime.now().timestamp()), "model": "buildwellai-qwen3-14b-v2", "choices": [{ "index": 0, "delta": {"content": token_text}, "finish_reason": None }] } yield f"data: {json.dumps(chunk)}\n\n" # Final chunk final_chunk = { "id": completion_id, "object": "chat.completion.chunk", "created": int(datetime.now().timestamp()), "model": "buildwellai-qwen3-14b-v2", "choices": [{ "index": 0, "delta": {}, "finish_reason": "stop" }] } yield f"data: {json.dumps(final_chunk)}\n\n" yield "data: [DONE]\n\n" thread.join() def generate_sync( messages: List[Message], temperature: float = DEFAULT_TEMPERATURE, top_p: float = DEFAULT_TOP_P, max_tokens: int = MAX_TOKENS, tools: Optional[List[Dict]] = None ) -> str: """Generate non-streaming response.""" if not model_manager.is_loaded(): raise HTTPException(status_code=503, detail="Model not loaded") # Convert messages formatted_messages = [] for msg in messages: m = {"role": msg.role, "content": msg.content or ""} formatted_messages.append(m) # Add tools if tools: tool_desc = json.dumps(tools, indent=2) for i, msg in enumerate(formatted_messages): if msg["role"] == "system": formatted_messages[i]["content"] += f"\n\nAvailable Tools:\n{tool_desc}" break # Apply template text = model_manager.tokenizer.apply_chat_template( formatted_messages, tokenize=False, add_generation_prompt=True ) # Generate inputs = model_manager.tokenizer(text, return_tensors="pt").to(model_manager.device) with torch.no_grad(): outputs = model_manager.model.generate( **inputs, max_new_tokens=max_tokens, temperature=temperature if temperature > 0 else 0.01, top_p=top_p, do_sample=temperature > 0, pad_token_id=model_manager.tokenizer.pad_token_id, eos_token_id=model_manager.tokenizer.eos_token_id, ) response = model_manager.tokenizer.decode( outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True ) return response # ============================================================================ # FASTAPI APP # ============================================================================ app = FastAPI( title="BuildwellAI Streaming API", description="Streaming inference API for BuildwellAI Qwen3-14B-V2", version="2.0.0" ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def root(): return { "service": "BuildwellAI Streaming API", "version": "2.0.0", "model": model_manager.model_path, "status": "ready" if model_manager.is_loaded() else "loading" } @app.get("/health") async def health(): return { "status": "healthy" if model_manager.is_loaded() else "loading", "model_loaded": model_manager.is_loaded(), "device": str(model_manager.device) if model_manager.device else None } @app.get("/v1/models") async def list_models(): """OpenAI-compatible models endpoint.""" return { "object": "list", "data": [{ "id": "buildwellai-qwen3-14b-v2", "object": "model", "created": int(datetime.now().timestamp()), "owned_by": "buildwellai" }] } @app.post("/v1/chat/completions") async def chat_completions(request: ChatRequest): """OpenAI-compatible chat completions endpoint.""" if not model_manager.is_loaded(): raise HTTPException(status_code=503, detail="Model not loaded") # Use MCP tools if none provided tools = request.tools or list(MCP_TOOLS.values()) if request.stream: return StreamingResponse( generate_stream( messages=request.messages, temperature=request.temperature, top_p=request.top_p, max_tokens=request.max_tokens, tools=tools ), media_type="text/event-stream" ) else: response = generate_sync( messages=request.messages, temperature=request.temperature, top_p=request.top_p, max_tokens=request.max_tokens, tools=tools ) return ChatCompletionResponse( id=f"chatcmpl-{datetime.now().strftime('%Y%m%d%H%M%S')}", created=int(datetime.now().timestamp()), model=request.model, choices=[ChatCompletionChoice( message={"role": "assistant", "content": response}, finish_reason="stop" )] ) @app.post("/chat") async def simple_chat(request: Request): """Simple chat endpoint for direct usage.""" data = await request.json() messages = [Message(**m) for m in data.get("messages", [])] stream = data.get("stream", True) temperature = data.get("temperature", DEFAULT_TEMPERATURE) max_tokens = data.get("max_tokens", MAX_TOKENS) if stream: return StreamingResponse( generate_stream(messages, temperature=temperature, max_tokens=max_tokens), media_type="text/event-stream" ) else: response = generate_sync(messages, temperature=temperature, max_tokens=max_tokens) return {"response": response} # ============================================================================ # INTERACTIVE CLI # ============================================================================ def interactive_cli(): """Interactive CLI for testing.""" from transformers import TextIteratorStreamer print("\n" + "=" * 60) print("BuildwellAI Interactive Chat") print("=" * 60) print("Commands: /tools (toggle), /clear, /quit") print("=" * 60 + "\n") messages = [ {"role": "system", "content": "You are BuildwellAI, an expert UK construction assistant."} ] use_tools = False while True: try: user_input = input("\nšŸ‘¤ You: ").strip() if not user_input: continue if user_input == "/quit": print("Goodbye!") break elif user_input == "/clear": messages = [messages[0]] print("āœ“ Cleared") continue elif user_input == "/tools": use_tools = not use_tools print(f"āœ“ Tools {'enabled' if use_tools else 'disabled'}") continue messages.append({"role": "user", "content": user_input}) # Generate formatted = model_manager.tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) inputs = model_manager.tokenizer(formatted, return_tensors="pt").to(model_manager.device) streamer = TextIteratorStreamer( model_manager.tokenizer, skip_prompt=True, skip_special_tokens=True ) thread = Thread( target=model_manager.model.generate, kwargs={ **inputs, "max_new_tokens": 1024, "temperature": 0.7, "do_sample": True, "streamer": streamer, } ) thread.start() print("\nšŸ¤– Assistant: ", end="", flush=True) response = "" for token in streamer: print(token, end="", flush=True) response += token print() messages.append({"role": "assistant", "content": response}) thread.join() except KeyboardInterrupt: print("\n\nGoodbye!") break except Exception as e: print(f"\nāŒ Error: {e}") # ============================================================================ # MAIN # ============================================================================ def main(): parser = argparse.ArgumentParser(description="BuildwellAI Streaming API") parser.add_argument("--model", type=str, default=DEFAULT_MODEL_PATH, help="Path to fine-tuned model") parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Server port") parser.add_argument("--host", type=str, default="0.0.0.0", help="Server host") parser.add_argument("--cli", action="store_true", help="Run interactive CLI instead of server") args = parser.parse_args() # Load model if not model_manager.load(args.model): print("Failed to load model!") sys.exit(1) if args.cli: interactive_cli() else: print(f"\nšŸš€ Starting server on http://{args.host}:{args.port}") print(f"šŸ“– API docs: http://{args.host}:{args.port}/docs") uvicorn.run(app, host=args.host, port=args.port) if __name__ == "__main__": main()