#!/usr/bin/env python3
"""
BuildwellAI Model V2 - Streaming API Server
A FastAPI-based streaming inference server for the fine-tuned Qwen3-14B model.
Supports:
- Streaming text generation
- Tool calling with MCP integration
- Multi-mode responses (direct, thinking, tool calling)
- OpenAI-compatible API
Usage:
python3 streaming_api.py --model ./output/buildwellai-qwen3-14b-v2/merged --port 8080
"""
import sys
import json
import torch
import asyncio
import argparse
import uvicorn
from typing import Optional, List, Dict, AsyncGenerator
from datetime import datetime
from threading import Thread
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# ============================================================================
# CONFIGURATION
# ============================================================================
DEFAULT_MODEL_PATH = "./output/buildwellai-qwen3-14b-v2/merged"
DEFAULT_PORT = 8080
MAX_TOKENS = 4096
DEFAULT_TEMPERATURE = 0.7
DEFAULT_TOP_P = 0.9
# MCP Tools available for the model
MCP_TOOLS = {
"universalMCP": {
"type": "function",
"function": {
"name": "universalMCP",
"description": "Call any BuildwellAI MCP server for specialized calculations",
"parameters": {
"type": "object",
"properties": {
"mcpServer": {
"type": "string",
"description": "Name of the MCP server (e.g., 'psi-thermal-bridge', 'sap10', 'breeam')"
},
"toolName": {
"type": "string",
"description": "Name of the tool to call on the MCP server"
},
"arguments": {
"type": "object",
"description": "Arguments to pass to the tool"
}
},
"required": ["mcpServer", "toolName", "arguments"]
}
}
},
"webSearch": {
"type": "function",
"function": {
"name": "webSearch",
"description": "Search the web for construction-related information",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"]
}
}
},
"retrieveDiagrams": {
"type": "function",
"function": {
"name": "retrieveDiagrams",
"description": "Search construction diagrams database",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"category": {
"type": "string",
"enum": ["structural", "electrical", "plumbing", "hvac", "site", "general"]
}
},
"required": ["query"]
}
}
}
}
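# Illustrative only: the arguments object the model is expected to emit for a
# universalMCP call has the shape sketched below ("calculateHeatLoss" is a
# hypothetical tool name; real tool names depend on the MCP server deployed).
#   {"mcpServer": "sap10", "toolName": "calculateHeatLoss", "arguments": {"wallArea": 42.0}}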
# ============================================================================
# PYDANTIC MODELS
# ============================================================================
class Message(BaseModel):
role: str
content: Optional[str] = ""
tool_calls: Optional[List[Dict]] = None
tool_call_id: Optional[str] = None
class ChatRequest(BaseModel):
model: str = "buildwellai-qwen3-14b-v2"
messages: List[Message]
temperature: float = DEFAULT_TEMPERATURE
top_p: float = DEFAULT_TOP_P
max_tokens: int = MAX_TOKENS
stream: bool = True
tools: Optional[List[Dict]] = None
tool_choice: Optional[str] = "auto"
class ChatCompletionChoice(BaseModel):
index: int = 0
message: Dict
finish_reason: str = "stop"
class ChatCompletionResponse(BaseModel):
id: str
object: str = "chat.completion"
created: int
model: str
choices: List[ChatCompletionChoice]
usage: Dict = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
# ============================================================================
# MODEL LOADING
# ============================================================================
class ModelManager:
def __init__(self):
self.model = None
self.tokenizer = None
self.model_path = None
self.device = None
def load(self, model_path: str):
"""Load the fine-tuned model."""
from transformers import AutoModelForCausalLM, AutoTokenizer
print(f"Loading model from: {model_path}")
self.tokenizer = AutoTokenizer.from_pretrained(
model_path,
trust_remote_code=True
)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16,
device_map="auto",
trust_remote_code=True,
)
self.model.eval()
self.model_path = model_path
self.device = next(self.model.parameters()).device
print(f"Model loaded on: {self.device}")
return True
def is_loaded(self) -> bool:
return self.model is not None
model_manager = ModelManager()
# ============================================================================
# STREAMING GENERATION
# ============================================================================
async def generate_stream(
messages: List[Message],
temperature: float = DEFAULT_TEMPERATURE,
top_p: float = DEFAULT_TOP_P,
max_tokens: int = MAX_TOKENS,
tools: Optional[List[Dict]] = None
) -> AsyncGenerator[str, None]:
"""Generate streaming response."""
from transformers import TextIteratorStreamer
if not model_manager.is_loaded():
raise HTTPException(status_code=503, detail="Model not loaded")
# Convert messages to dict format
formatted_messages = []
for msg in messages:
m = {"role": msg.role, "content": msg.content or ""}
formatted_messages.append(m)
# Add tools to system message if provided
if tools:
tool_desc = json.dumps(tools, indent=2)
for i, msg in enumerate(formatted_messages):
if msg["role"] == "system":
formatted_messages[i]["content"] += f"\n\nAvailable Tools:\n{tool_desc}"
break
else:
formatted_messages.insert(0, {
"role": "system",
"content": f"You are BuildwellAI. Available Tools:\n{tool_desc}"
})
# Apply chat template
text = model_manager.tokenizer.apply_chat_template(
formatted_messages,
tokenize=False,
add_generation_prompt=True
)
# Tokenize
inputs = model_manager.tokenizer(text, return_tensors="pt").to(model_manager.device)
# Setup streamer
streamer = TextIteratorStreamer(
model_manager.tokenizer,
skip_prompt=True,
skip_special_tokens=True
)
# Generation config
generation_kwargs = {
**inputs,
"max_new_tokens": max_tokens,
"temperature": temperature if temperature > 0 else 0.01,
"top_p": top_p,
"top_k": 50,
"do_sample": temperature > 0,
"streamer": streamer,
"pad_token_id": model_manager.tokenizer.pad_token_id,
"eos_token_id": model_manager.tokenizer.eos_token_id,
}
# Start generation in background
thread = Thread(target=model_manager.model.generate, kwargs=generation_kwargs)
thread.start()
    # Stream tokens. The streamer is a blocking iterator, so pull each token in
    # a worker thread (asyncio.to_thread) to avoid stalling the event loop.
    completion_id = f"chatcmpl-{datetime.now().strftime('%Y%m%d%H%M%S')}"
    full_response = ""
    token_iter = iter(streamer)
    sentinel = object()
    while True:
        token_text = await asyncio.to_thread(next, token_iter, sentinel)
        if token_text is sentinel:
            break
        full_response += token_text
        # SSE format for OpenAI compatibility
        chunk = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": int(datetime.now().timestamp()),
            "model": "buildwellai-qwen3-14b-v2",
            "choices": [{
                "index": 0,
                "delta": {"content": token_text},
                "finish_reason": None
            }]
        }
        yield f"data: {json.dumps(chunk)}\n\n"
# Final chunk
final_chunk = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": int(datetime.now().timestamp()),
"model": "buildwellai-qwen3-14b-v2",
"choices": [{
"index": 0,
"delta": {},
"finish_reason": "stop"
}]
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"
thread.join()
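# Illustrative only: each SSE event emitted above is a single line of the form
#   data: {"id": "chatcmpl-...", "object": "chat.completion.chunk", ...}
# followed by a blank line, and the stream terminates with "data: [DONE]".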
def generate_sync(
messages: List[Message],
temperature: float = DEFAULT_TEMPERATURE,
top_p: float = DEFAULT_TOP_P,
max_tokens: int = MAX_TOKENS,
tools: Optional[List[Dict]] = None
) -> str:
"""Generate non-streaming response."""
if not model_manager.is_loaded():
raise HTTPException(status_code=503, detail="Model not loaded")
# Convert messages
formatted_messages = []
for msg in messages:
m = {"role": msg.role, "content": msg.content or ""}
formatted_messages.append(m)
    # Add tools to the system message (insert one if the conversation has none,
    # mirroring the streaming path above)
    if tools:
        tool_desc = json.dumps(tools, indent=2)
        for i, msg in enumerate(formatted_messages):
            if msg["role"] == "system":
                formatted_messages[i]["content"] += f"\n\nAvailable Tools:\n{tool_desc}"
                break
        else:
            formatted_messages.insert(0, {
                "role": "system",
                "content": f"You are BuildwellAI. Available Tools:\n{tool_desc}"
            })
# Apply template
text = model_manager.tokenizer.apply_chat_template(
formatted_messages,
tokenize=False,
add_generation_prompt=True
)
# Generate
inputs = model_manager.tokenizer(text, return_tensors="pt").to(model_manager.device)
with torch.no_grad():
outputs = model_manager.model.generate(
**inputs,
max_new_tokens=max_tokens,
temperature=temperature if temperature > 0 else 0.01,
top_p=top_p,
do_sample=temperature > 0,
pad_token_id=model_manager.tokenizer.pad_token_id,
eos_token_id=model_manager.tokenizer.eos_token_id,
)
response = model_manager.tokenizer.decode(
outputs[0][inputs.input_ids.shape[1]:],
skip_special_tokens=True
)
return response
# ============================================================================
# FASTAPI APP
# ============================================================================
app = FastAPI(
title="BuildwellAI Streaming API",
description="Streaming inference API for BuildwellAI Qwen3-14B-V2",
version="2.0.0"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
return {
"service": "BuildwellAI Streaming API",
"version": "2.0.0",
"model": model_manager.model_path,
"status": "ready" if model_manager.is_loaded() else "loading"
}
@app.get("/health")
async def health():
return {
"status": "healthy" if model_manager.is_loaded() else "loading",
"model_loaded": model_manager.is_loaded(),
"device": str(model_manager.device) if model_manager.device else None
}
@app.get("/v1/models")
async def list_models():
"""OpenAI-compatible models endpoint."""
return {
"object": "list",
"data": [{
"id": "buildwellai-qwen3-14b-v2",
"object": "model",
"created": int(datetime.now().timestamp()),
"owned_by": "buildwellai"
}]
}
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
"""OpenAI-compatible chat completions endpoint."""
if not model_manager.is_loaded():
raise HTTPException(status_code=503, detail="Model not loaded")
# Use MCP tools if none provided
tools = request.tools or list(MCP_TOOLS.values())
if request.stream:
return StreamingResponse(
generate_stream(
messages=request.messages,
temperature=request.temperature,
top_p=request.top_p,
max_tokens=request.max_tokens,
tools=tools
),
media_type="text/event-stream"
)
    else:
        # Run the blocking generation in a worker thread so it does not stall
        # the event loop
        response = await asyncio.to_thread(
            generate_sync,
            messages=request.messages,
            temperature=request.temperature,
            top_p=request.top_p,
            max_tokens=request.max_tokens,
            tools=tools
        )
return ChatCompletionResponse(
id=f"chatcmpl-{datetime.now().strftime('%Y%m%d%H%M%S')}",
created=int(datetime.now().timestamp()),
model=request.model,
choices=[ChatCompletionChoice(
message={"role": "assistant", "content": response},
finish_reason="stop"
)]
)
@app.post("/chat")
async def simple_chat(request: Request):
"""Simple chat endpoint for direct usage."""
data = await request.json()
messages = [Message(**m) for m in data.get("messages", [])]
stream = data.get("stream", True)
temperature = data.get("temperature", DEFAULT_TEMPERATURE)
max_tokens = data.get("max_tokens", MAX_TOKENS)
if stream:
return StreamingResponse(
generate_stream(messages, temperature=temperature, max_tokens=max_tokens),
media_type="text/event-stream"
)
    else:
        # As above, keep the blocking call off the event loop
        response = await asyncio.to_thread(
            generate_sync, messages, temperature=temperature, max_tokens=max_tokens
        )
        return {"response": response}
# ============================================================================
# INTERACTIVE CLI
# ============================================================================
def interactive_cli():
"""Interactive CLI for testing."""
from transformers import TextIteratorStreamer
print("\n" + "=" * 60)
print("BuildwellAI Interactive Chat")
print("=" * 60)
print("Commands: /tools (toggle), /clear, /quit")
print("=" * 60 + "\n")
messages = [
{"role": "system", "content": "You are BuildwellAI, an expert UK construction assistant."}
]
use_tools = False
while True:
try:
user_input = input("\n👤 You: ").strip()
if not user_input:
continue
if user_input == "/quit":
print("Goodbye!")
break
elif user_input == "/clear":
messages = [messages[0]]
print("✓ Cleared")
continue
elif user_input == "/tools":
use_tools = not use_tools
print(f"✓ Tools {'enabled' if use_tools else 'disabled'}")
continue
messages.append({"role": "user", "content": user_input})
# Generate
formatted = model_manager.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
inputs = model_manager.tokenizer(formatted, return_tensors="pt").to(model_manager.device)
streamer = TextIteratorStreamer(
model_manager.tokenizer,
skip_prompt=True,
skip_special_tokens=True
)
            thread = Thread(
                target=model_manager.model.generate,
                kwargs={
                    **inputs,
                    "max_new_tokens": 1024,
                    "temperature": 0.7,
                    "do_sample": True,
                    "streamer": streamer,
                    # Set pad_token_id explicitly to avoid the open-ended
                    # generation warning, as in generate_stream above
                    "pad_token_id": model_manager.tokenizer.pad_token_id,
                }
            )
thread.start()
print("\n🤖 Assistant: ", end="", flush=True)
response = ""
for token in streamer:
print(token, end="", flush=True)
response += token
print()
messages.append({"role": "assistant", "content": response})
thread.join()
except KeyboardInterrupt:
print("\n\nGoodbye!")
break
except Exception as e:
print(f"\n❌ Error: {e}")
# ============================================================================
# MAIN
# ============================================================================
def main():
parser = argparse.ArgumentParser(description="BuildwellAI Streaming API")
parser.add_argument("--model", type=str, default=DEFAULT_MODEL_PATH,
help="Path to fine-tuned model")
parser.add_argument("--port", type=int, default=DEFAULT_PORT,
help="Server port")
parser.add_argument("--host", type=str, default="0.0.0.0",
help="Server host")
parser.add_argument("--cli", action="store_true",
help="Run interactive CLI instead of server")
args = parser.parse_args()
    # Load model (load() raises on failure, so report the error and exit cleanly)
    try:
        model_manager.load(args.model)
    except Exception as e:
        print(f"Failed to load model: {e}")
        sys.exit(1)
if args.cli:
interactive_cli()
else:
print(f"\n🚀 Starting server on http://{args.host}:{args.port}")
print(f"📖 API docs: http://{args.host}:{args.port}/docs")
uvicorn.run(app, host=args.host, port=args.port)
if __name__ == "__main__":
main()