Spaces:
Running
Running
File size: 10,119 Bytes
14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 14cf01c 8ede5e9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 |
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModel
from typing import List, Union
import json
import logging
import os
import time
import uvicorn
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Model configuration
MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B" # Qwen3 Embedding model
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MAX_LENGTH = 512
# Global variables for model and tokenizer
model = None
tokenizer = None
def load_model():
"""Load the Qwen3 embedding model and tokenizer"""
global model, tokenizer
try:
logger.info(f"Loading Qwen3 embedding model on device: {DEVICE}")
# Load tokenizer and model for Qwen3 embedding
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
model = AutoModel.from_pretrained(
MODEL_NAME,
trust_remote_code=True,
torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32,
device_map="auto" if DEVICE == "cuda" else None
)
if DEVICE == "cpu":
model = model.to(DEVICE)
model.eval()
logger.info("Qwen3 embedding model loaded successfully")
return True
except Exception as e:
logger.error(f"Error loading Qwen3 model: {str(e)}")
# Try fallback to a simpler approach
try:
logger.info("Trying fallback model loading...")
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
tokenizer = None
logger.info("Fallback model loaded successfully")
return True
except Exception as fallback_error:
logger.error(f"Fallback model loading also failed: {str(fallback_error)}")
return False
def generate_embeddings(texts: Union[str, List[str]]) -> Union[List[float], List[List[float]]]:
"""Generate embeddings for input text(s) using Qwen3 or fallback model"""
global model, tokenizer
try:
# Ensure texts is a list
if isinstance(texts, str):
texts = [texts]
single_text = True
else:
single_text = False
# Truncate texts if too long
texts = [text[:MAX_LENGTH] for text in texts]
embeddings = []
for text in texts:
try:
# Method 1: Try using the Qwen model directly
if model and tokenizer:
inputs = tokenizer(
text,
return_tensors="pt",
padding=True,
truncation=True,
max_length=MAX_LENGTH
).to(DEVICE)
with torch.no_grad():
outputs = model(**inputs)
# Use mean pooling of last hidden state
embedding = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy()
embeddings.append(embedding.tolist())
elif model and hasattr(model, 'encode'):
# Method 2: Using sentence transformer fallback
embedding = model.encode(text)
embeddings.append(embedding.tolist())
else:
raise Exception("No model available")
except Exception as e:
logger.warning(f"Error generating embedding for text: {str(e)}")
# Return zero vector as last resort
embeddings.append([0.0] * 384) # Standard dimension for fallback
return embeddings[0] if single_text else embeddings
except Exception as e:
logger.error(f"Error in generate_embeddings: {str(e)}")
# Return zero vectors as fallback
if single_text:
return [0.0] * 384
else:
return [[0.0] * 384] * len(texts)
def compute_similarity(embedding1: List[float], embedding2: List[float]) -> float:
"""Compute cosine similarity between two embeddings"""
try:
# Convert to numpy arrays
emb1 = np.array(embedding1)
emb2 = np.array(embedding2)
# Compute cosine similarity
dot_product = np.dot(emb1, emb2)
norm1 = np.linalg.norm(emb1)
norm2 = np.linalg.norm(emb2)
if norm1 == 0 or norm2 == 0:
return 0.0
similarity = dot_product / (norm1 * norm2)
return float(similarity)
except Exception as e:
logger.error(f"Error computing similarity: {str(e)}")
return 0.0
def batch_embedding_interface(texts: str) -> str:
"""Interface for batch embedding generation"""
try:
# Split texts by newlines
text_list = [text.strip() for text in texts.split('\n') if text.strip()]
if not text_list:
return json.dumps([])
# Generate embeddings
embeddings = generate_embeddings(text_list)
# Return as JSON string
return json.dumps(embeddings)
except Exception as e:
logger.error(f"Error in batch_embedding_interface: {str(e)}")
return json.dumps([])
def single_embedding_interface(text: str) -> str:
"""Interface for single embedding generation"""
try:
if not text.strip():
return json.dumps([])
# Generate embedding
embedding = generate_embeddings(text)
# Return as JSON string
return json.dumps(embedding)
except Exception as e:
logger.error(f"Error in single_embedding_interface: {str(e)}")
return json.dumps([])
def similarity_interface(embedding1: str, embedding2: str) -> float:
"""Interface for computing similarity between two embeddings"""
try:
# Parse embeddings from JSON strings
emb1 = json.loads(embedding1)
emb2 = json.loads(embedding2)
# Compute similarity
similarity = compute_similarity(emb1, emb2)
return similarity
except Exception as e:
logger.error(f"Error in similarity_interface: {str(e)}")
return 0.0
def health_check():
"""Health check endpoint"""
return {"status": "healthy", "model_loaded": model is not None}
# Create FastAPI application
app = FastAPI(
title="Qwen3 Embedding API",
description="A stable API for generating text embeddings using the Qwen3-Embedding-0.6B model",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# FastAPI endpoints
@app.get("/")
async def root():
"""Root endpoint with API information"""
return {
"message": "Qwen3 Embedding API",
"version": "1.0.0",
"model": "Qwen3-Embedding-0.6B",
"endpoints": {
"health": "/health",
"predict": "/api/predict",
"docs": "/docs"
}
}
@app.get("/health")
async def health():
"""Health check endpoint"""
return health_check()
@app.post("/api/predict")
async def predict(data: dict):
"""Main prediction endpoint for embeddings"""
try:
if "data" not in data:
raise HTTPException(status_code=400, detail="Missing 'data' field in request")
input_data = data["data"]
# Handle single text or batch texts
if isinstance(input_data, str):
# Single text
embeddings = generate_embeddings(input_data)
return {"data": [embeddings]}
elif isinstance(input_data, list):
if len(input_data) > 0 and isinstance(input_data[0], str):
# Single text in list
embeddings = generate_embeddings(input_data[0])
return {"data": [embeddings]}
elif len(input_data) > 0 and isinstance(input_data[0], list):
# Batch texts
embeddings = generate_embeddings(input_data[0])
return {"data": [embeddings]}
else:
raise HTTPException(status_code=400, detail="Invalid data format")
else:
raise HTTPException(status_code=400, detail="Invalid data type")
except Exception as e:
logger.error(f"Error in predict endpoint: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.post("/api/similarity")
async def similarity(data: dict):
"""Compute similarity between two embeddings"""
try:
if "embedding1" not in data or "embedding2" not in data:
raise HTTPException(status_code=400, detail="Missing embedding1 or embedding2 field")
emb1 = data["embedding1"]
emb2 = data["embedding2"]
if not isinstance(emb1, list) or not isinstance(emb2, list):
raise HTTPException(status_code=400, detail="Embeddings must be lists")
sim = compute_similarity(emb1, emb2)
return {"similarity": sim}
except Exception as e:
logger.error(f"Error in similarity endpoint: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
def main():
"""Main function to run the application"""
logger.info("Starting Qwen3 Embedding Model API...")
# Load model
if not load_model():
logger.error("Failed to load model. Exiting...")
return
logger.info("Model loaded successfully. Starting FastAPI server...")
# Run with uvicorn
uvicorn.run(
app,
host="0.0.0.0",
port=7860,
log_level="info"
)
if __name__ == "__main__":
main()
|