Spaces:
Running
Running
import asyncio
import base64
import io
import json
import logging
import threading
import time
import uuid
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncGenerator, Dict, List, Literal, Optional, Union

import aiohttp
import numpy as np
import onnxruntime as ort
from fastapi import FastAPI, File, HTTPException, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from huggingface_hub import hf_hub_download, list_repo_files
from PIL import Image
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse
from transformers import AutoImageProcessor, PreTrainedTokenizerFast

from config import settings
# Configure process-wide logging. The level name comes from settings and is
# matched case-insensitively against the logging module's level constants.
_log_level = getattr(logging, settings.log_level.upper())
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=_log_level,
)
logger = logging.getLogger(__name__)
| # ============================================================================== | |
| # Pydantic Models for OpenAI-compatible API | |
| # ============================================================================== | |
class ImageContent(BaseModel):
    """An image part of a vision message's content list.

    ``type`` is pinned to the literal ``"image"``. With a plain ``str`` field, a
    ``{"type": "text", "text": ...}`` part could also validate as ImageContent,
    because unknown keys are ignored. Depending on the pydantic union mode, that
    would turn text parts into objects that have no ``.text`` attribute.
    """

    type: Literal["image"] = "image"
    image_url: Optional[str] = None  # data:image/jpeg;base64,... or URL
class TextContent(BaseModel):
    """A text part of a vision message's content list.

    ``type`` is pinned to the literal ``"text"``, so that only genuine text parts
    validate as this model inside ``VisionMessage.content``'s union.
    """

    type: Literal["text"] = "text"
    text: str
class VisionMessage(BaseModel):
    """One chat message whose content is plain text or a list of content parts.

    List parts may be ImageContent, TextContent, or a raw dict (e.g. a part that
    validates as neither model); extract_images_and_text dispatches on ``type``.
    """

    role: str = Field(..., description="Role: 'system', 'user', or 'assistant'")
    content: Union[str, List[Union[ImageContent, TextContent, dict]]] = Field(..., description="Message content")
class VisionCompletionRequest(BaseModel):
    """OpenAI-style chat completion request whose messages may contain images.

    Sampling fields left as None are filled from server ``settings`` by the
    handler (see stream_vision_completion). A falsy ``max_tokens`` also falls back.
    """

    model: str = Field(default="lfm-vision", description="Model identifier")
    messages: List[VisionMessage] = Field(..., description="Conversation messages")
    temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0)  # 0 => greedy decoding
    top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)  # nucleus mass; 1.0 disables
    top_k: Optional[int] = Field(default=None, ge=0)  # 0 disables top-k filtering
    max_tokens: Optional[int] = Field(default=None, ge=1)
    stream: bool = Field(default=False, description="Enable streaming response")
    stop: Optional[Union[str, List[str]]] = Field(default=None)  # one or more stop strings
class ChatMessage(BaseModel):
    """A text-only chat message (also used as the message of a response choice)."""

    role: str = Field(..., description="Role: 'system', 'user', or 'assistant'")
    content: str = Field(..., description="Message content")
class ChatCompletionRequest(BaseModel):
    """OpenAI-style text-only chat completion request (messages carry no image parts)."""

    model: str = Field(default="lfm-vision", description="Model identifier")
    messages: List[ChatMessage] = Field(..., description="Conversation messages")
    temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    top_k: Optional[int] = Field(default=None, ge=0)
    max_tokens: Optional[int] = Field(default=None, ge=1)
    stream: bool = Field(default=False, description="Enable streaming response")
class ChatCompletionChoice(BaseModel):
    """One generated alternative inside a non-streaming ChatCompletionResponse."""

    index: int
    message: ChatMessage
    finish_reason: Optional[str] = None  # presumably "stop" / "length"; confirm at the producer
class ChatCompletionResponse(BaseModel):
    """Non-streaming OpenAI-style completion response (``object="chat.completion"``)."""

    id: str
    object: str = "chat.completion"
    created: int  # Unix timestamp (seconds)
    model: str
    choices: List[ChatCompletionChoice]
    usage: Dict[str, int]  # token counts; keys presumably prompt/completion/total - confirm
class ModelInfo(BaseModel):
    """One model entry of the OpenAI-compatible model listing (see list_models)."""

    id: str
    object: str = "model"
    created: int  # Unix timestamp (seconds)
    owned_by: str = "liquid-ai"
class ModelListResponse(BaseModel):
    """OpenAI-compatible list wrapper (``object="list"``) around ModelInfo entries."""

    object: str = "list"
    data: List[ModelInfo]
| # ============================================================================== | |
| # ONNX Vision Model Manager | |
| # ============================================================================== | |
# Map ONNX Runtime type strings (as reported by NodeArg.type) to numpy dtypes.
# Used to allocate empty decoder cache tensors with the dtype the graph expects.
# Lookups for types not listed here fall back to float32 at the call site.
ONNX_DTYPE = {
    "tensor(float)": np.float32,
    "tensor(float16)": np.float16,
    "tensor(double)": np.float64,
    "tensor(int64)": np.int64,
    "tensor(int32)": np.int32,
    "tensor(int8)": np.int8,
    "tensor(uint8)": np.uint8,
    "tensor(bool)": np.bool_,
}
class Lfm2VlProcessorWrapper:
    """
    Minimal LFM2-VL processor assembled from a separate image processor and tokenizer.

    Used instead of ``AutoProcessor`` to bypass the tokenizer auto-detection bug in
    LFM models. It renders vision chat messages to text with ``<image>`` placeholders
    and expands each placeholder so every image embedding gets its own token position.
    """

    _IMAGE_TOKEN = "<image>"

    def __init__(self, image_processor, tokenizer):
        self.image_processor = image_processor
        self.tokenizer = tokenizer

    def apply_chat_template(self, messages, add_generation_prompt=True, tokenize=False, **kwargs):
        """
        Render vision messages through the tokenizer's chat template.

        List-valued ``content``, e.g.
        ``[{"type": "image"}, {"type": "text", "text": "..."}]``, is flattened into a
        single string. Image parts become ``<image>`` and text parts contribute their
        text. Dict parts of any other type are dropped, and non-dict parts are
        stringified. Messages may be dicts or objects exposing ``role``/``content``;
        missing values default to ``"user"`` / ``""``.

        Returns:
            Whatever ``tokenizer.apply_chat_template`` returns for the flattened
            messages. With HF tokenizers and ``tokenize=False``, that is a prompt string.
        """
        text_messages = []
        for msg in messages:
            if isinstance(msg, dict):
                role = msg.get("role", "user")
                content = msg.get("content", "")
            else:
                role = getattr(msg, "role", "user")
                content = getattr(msg, "content", "")
            if isinstance(content, list):
                content = "".join(self._render_part(part) for part in content)
            text_messages.append({"role": role, "content": content})
        return self.tokenizer.apply_chat_template(
            text_messages,
            add_generation_prompt=add_generation_prompt,
            tokenize=tokenize,
            **kwargs,
        )

    @classmethod
    def _render_part(cls, part) -> str:
        """Render one content part: image -> placeholder, text -> its text, other dict -> ''."""
        if not isinstance(part, dict):
            return str(part)
        part_type = part.get("type", "")
        if part_type == "image":
            return cls._IMAGE_TOKEN
        if part_type == "text":
            return part.get("text", "")
        return ""

    def __call__(self, images=None, text=None, **kwargs):
        """
        Build model inputs: image tensors from ``images`` and token ids from ``text``.

        The vision encoder emits several embeddings per image, and each one needs its
        own ``<image>`` position in ``input_ids``. So the method:

        1. processes the images first, to learn the per-image token count N;
        2. expands every ``<image>`` placeholder in ``text`` to N ``<image>`` tokens;
        3. tokenizes the expanded text.

        Args:
            images: image(s) forwarded to the image processor, or None.
            text: prompt string, or None. A list is collapsed: a single element is
                used as-is, and several elements are joined with spaces.
            **kwargs: ``return_tensors`` is forwarded to both components.
                ``padding``, ``truncation`` and ``max_length`` go to the tokenizer.

        Returns:
            dict merging the image-processor outputs (e.g. ``pixel_values``) with the
            tokenizer outputs (e.g. ``input_ids``, ``attention_mask``).
        """
        result = {}
        return_tensors = kwargs.pop("return_tensors", None)
        tokens_per_image = 0

        if images is not None:
            image_outputs = self.image_processor(images=images, return_tensors=return_tensors)
            result.update(image_outputs)
            if "pixel_values" in image_outputs:
                pv = image_outputs["pixel_values"]
                # Assumed layout [num_images, num_patches, patch_dim] - TODO confirm for tiled
                # inputs. Per the LFM2.5-VL card the MLP projector reduces the patch count
                # by a factor of 4 (1024 patches -> 256 tokens), so this is a per-image count.
                num_patches = pv.shape[1] if hasattr(pv, "shape") else pv.size(1)
                tokens_per_image = num_patches // 4
                logger.debug(f"Image processing: {num_patches} patches → {tokens_per_image} image tokens")

        if text is not None:
            if isinstance(text, list):
                text = text[0] if len(text) == 1 else " ".join(text)
            image_count = text.count(self._IMAGE_TOKEN)
            if tokens_per_image > 0 and image_count:
                # Each placeholder stands for one whole image, so it gets the full per-image
                # count. Dividing N among placeholders would leave multi-image prompts with
                # too few positions for the embeddings.
                text = text.replace(self._IMAGE_TOKEN, self._IMAGE_TOKEN * tokens_per_image)
                logger.debug(f"Expanded {image_count} <image> placeholder(s) to {tokens_per_image} tokens each")
            text_outputs = self.tokenizer(
                text,
                return_tensors=return_tensors,
                padding=kwargs.get("padding", False),
                truncation=kwargs.get("truncation", False),
                max_length=kwargs.get("max_length", None),
            )
            result.update(text_outputs)

        return result
class ONNXVisionModelManager:
    """
    Manage the LFM2.5-VL ONNX export: three ONNX Runtime sessions plus the processor.

    Sessions:
        embed_tokens: token ids -> token embeddings.
        embed_images: processed pixel inputs -> image embeddings.
        decoder: embeddings + attention mask + past cache -> logits + present cache.

    ``load_model`` and ``unload`` are serialized by an internal lock. The generation
    methods do not take the lock.
    """

    # Decoder inputs fed explicitly (or not at all) rather than from the past cache.
    # NOTE(review): position_ids is excluded here but never fed either; confirm the
    # exported decoder does not require it.
    _NON_CACHE_INPUTS = frozenset({"inputs_embeds", "attention_mask", "position_ids"})

    def __init__(self):
        self._embed_tokens = None
        self._embed_images = None
        self._decoder = None
        self._processor = None
        self._cache_template = None
        self._lock = threading.Lock()

    @property
    def is_loaded(self) -> bool:
        """True when every session and the processor are available.

        This is a property because callers read ``manager.is_loaded`` without calling
        it. As a plain method, that expression would be a bound method and therefore
        always truthy.
        """
        return all(
            part is not None
            for part in (self._embed_tokens, self._embed_images, self._decoder, self._processor)
        )

    def download_models(self) -> Dict[str, str]:
        """
        Fetch the ONNX graphs and their external-data files for the configured variants.

        Reads ``settings.model_id``, ``settings.encoder_variant`` and
        ``settings.decoder_variant``. Only ``.onnx_data`` files whose names start with
        the selected graphs' names are downloaded; this also covers split files
        such as ``decoder_q8.onnx_data_1``.

        Returns:
            Mapping of ``"embed_tokens"``, ``"embed_images"`` and ``"decoder"`` to the
            local paths returned by ``hf_hub_download``.
        """
        model_id = settings.model_id
        encoder_var = settings.encoder_variant
        decoder_var = settings.decoder_variant
        logger.info(f"Downloading model: {model_id}")
        logger.info(f" Encoder variant: {encoder_var}")
        logger.info(f" Decoder variant: {decoder_var}")

        # embed_tokens: fp16 file for fp16/q8/q4 encoders, fp32 (no suffix) otherwise.
        embed_suffix = "_fp16" if encoder_var in ["fp16", "q8", "q4"] else ""
        img_suffix = f"_{encoder_var}" if encoder_var != "fp32" else ""
        dec_suffix = f"_{decoder_var}" if decoder_var != "fp32" else ""

        paths = {
            "embed_tokens": hf_hub_download(model_id, f"onnx/embed_tokens{embed_suffix}.onnx"),
            "embed_images": hf_hub_download(model_id, f"onnx/embed_images{img_suffix}.onnx"),
            "decoder": hf_hub_download(model_id, f"onnx/decoder{dec_suffix}.onnx"),
        }

        expected_prefixes = (
            f"onnx/embed_tokens{embed_suffix}.onnx_data",
            f"onnx/embed_images{img_suffix}.onnx_data",
            f"onnx/decoder{dec_suffix}.onnx_data",
        )
        for f in list_repo_files(model_id):
            if f.startswith("onnx/") and ".onnx_data" in f and f.startswith(expected_prefixes):
                logger.info(f"Downloading: {f}")
                hf_hub_download(model_id, f)
        return paths

    def load_model(self) -> None:
        """
        Download (if needed) and load the sessions and processor. No-op when already loaded.

        Every component is built into a local variable and published at the end. A
        failure part-way through therefore leaves the manager unloaded, so a later
        call can retry cleanly.
        """
        with self._lock:
            if self.is_loaded:
                return
            logger.info("=" * 60)
            logger.info("Loading LFM2.5-VL-1.6B Vision-Language ONNX model...")
            logger.info(f"Model: {settings.model_id}")
            logger.info(f"Encoder: {settings.encoder_variant} (Q8 = ~95% accuracy)")
            logger.info(f"Decoder: {settings.decoder_variant}")
            logger.info("=" * 60)
            start_time = time.time()

            paths = self.download_models()

            # CPU-only ONNX Runtime configuration shared by all three sessions.
            sess_options = ort.SessionOptions()
            sess_options.intra_op_num_threads = settings.num_threads
            sess_options.inter_op_num_threads = settings.num_threads
            sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

            def make_session(key: str):
                return ort.InferenceSession(
                    paths[key],
                    sess_options=sess_options,
                    providers=["CPUExecutionProvider"],
                )

            embed_tokens = make_session("embed_tokens")
            embed_images = make_session("embed_images")
            decoder = make_session("decoder")

            # Load the processor components separately to bypass the TokenizersBackend
            # bug: LFM models incorrectly specify TokenizersBackend as tokenizer_class.
            logger.info("Loading image processor...")
            image_processor = AutoImageProcessor.from_pretrained(settings.model_id, trust_remote_code=True)
            logger.info("Loading tokenizer with PreTrainedTokenizerFast...")
            tokenizer = PreTrainedTokenizerFast.from_pretrained(settings.model_id, trust_remote_code=True)
            processor = Lfm2VlProcessorWrapper(image_processor=image_processor, tokenizer=tokenizer)
            logger.info(f"✓ Processor created: {type(processor).__name__}")

            cache_template = self._build_cache_template(decoder)

            # Publish only after everything above succeeded. is_loaded flips when the
            # processor (set last) becomes non-None.
            self._cache_template = cache_template
            self._embed_tokens = embed_tokens
            self._embed_images = embed_images
            self._decoder = decoder
            self._processor = processor

            load_time = time.time() - start_time
            logger.info("=" * 60)
            logger.info(f"✓ Model loaded in {load_time:.2f}s")
            logger.info(f" Threads: {settings.num_threads}")
            logger.info(" Provider: CPU")
            logger.info("=" * 60)

    @classmethod
    def _build_cache_template(cls, decoder) -> Dict[str, tuple]:
        """
        Describe the decoder's past-cache inputs as ``name -> (shape, dtype)``.

        Symbolic dims whose name contains "sequence" become 0, giving an empty past.
        Any other non-integer dim becomes 1. Dtypes come from ONNX_DTYPE, with
        float32 as the fallback.
        """
        template = {}
        for inp in decoder.get_inputs():
            if inp.name in cls._NON_CACHE_INPUTS:
                continue
            shape = [
                0 if isinstance(d, str) and "sequence" in d.lower() else (d if isinstance(d, int) else 1)
                for d in inp.shape
            ]
            template[inp.name] = (shape, ONNX_DTYPE.get(inp.type, np.float32))
        return template

    def _init_cache_template(self) -> None:
        """(Re)build the cache template from the currently loaded decoder session."""
        self._cache_template = self._build_cache_template(self._decoder)

    def _create_empty_cache(self) -> Dict[str, np.ndarray]:
        """Return fresh zero-filled past-cache tensors, one per decoder cache input."""
        if self._cache_template is None:
            raise RuntimeError("Model not loaded")
        return {
            name: np.zeros(shape, dtype=dtype)
            for name, (shape, dtype) in self._cache_template.items()
        }

    @property
    def processor(self):
        """The loaded Lfm2VlProcessorWrapper; raises RuntimeError before load_model()."""
        if self._processor is None:
            raise RuntimeError("Processor not loaded")
        return self._processor

    def process_image(self, image: "Image.Image") -> "Image.Image":
        """Return ``image`` in RGB mode, converting only when needed. No embedding happens here."""
        return image if image.mode == "RGB" else image.convert("RGB")

    def _prepare_prompt_embeds(self, images: "List[Image.Image]", messages: List[dict]) -> np.ndarray:
        """
        Build the float32 prefill embeddings for the chat prompt.

        The prompt is templated and tokenized, and the token ids are embedded. When
        images are present, the rows at ``<image>`` token positions are overwritten,
        in order, with the vision encoder's embeddings.
        """
        tokenizer = self._processor.tokenizer
        prompt = self._processor.apply_chat_template(messages, add_generation_prompt=True)
        inputs = self._processor(images=images if images else None, text=prompt, return_tensors="pt")

        input_ids = inputs["input_ids"].numpy().astype(np.int64)
        token_embeds = self._embed_tokens.run(None, {"input_ids": input_ids})[0]

        if images and "pixel_values" in inputs:
            image_feed = {"pixel_values": inputs["pixel_values"].numpy().astype(np.float32)}
            pixel_attention_mask = inputs.get("pixel_attention_mask", None)
            if pixel_attention_mask is not None:
                image_feed["pixel_attention_mask"] = pixel_attention_mask.numpy().astype(np.int64)
            spatial_shapes = inputs.get("spatial_shapes", None)
            if spatial_shapes is not None:
                image_feed["spatial_shapes"] = spatial_shapes.numpy().astype(np.int64)
            image_embeds = self._embed_images.run(None, image_feed)[0]

            positions = np.flatnonzero(input_ids[0] == tokenizer.convert_tokens_to_ids("<image>"))
            if len(positions) != len(image_embeds):
                logger.warning(
                    "Image token/embedding count mismatch: %d <image> positions vs %d embeddings",
                    len(positions), len(image_embeds),
                )
            count = min(len(positions), len(image_embeds))
            token_embeds[0, positions[:count]] = image_embeds[:count]

        return token_embeds.astype(np.float32)

    def _cache_output_map(self, cache: Dict[str, np.ndarray]) -> List[tuple]:
        """Pair each decoder output index with the past-cache input it feeds on the next step."""
        pairs = []
        for index, out in enumerate(self._decoder.get_outputs()[1:], 1):
            name = out.name.replace("present_conv", "past_conv").replace("present.", "past_key_values.")
            if name in cache:
                pairs.append((index, name))
        return pairs

    @staticmethod
    def _sample_next_token(logits: np.ndarray, temperature: float, top_k: int, top_p: float) -> int:
        """
        Pick the next token id from a 1-D logits vector.

        ``temperature <= 0`` selects greedily. Otherwise the logits are scaled, then
        optionally restricted to the ``top_k`` best and to the smallest top set whose
        probability mass reaches ``top_p``, and a token is sampled from
        ``np.random``. The decoder's output array is never modified in place.
        """
        if temperature <= 0:
            return int(np.argmax(logits))

        scaled = np.asarray(logits, dtype=np.float32) / temperature
        if 0 < top_k < scaled.size:
            keep = np.argpartition(scaled, -top_k)[-top_k:]
            limited = np.full_like(scaled, -np.inf)
            limited[keep] = scaled[keep]
            scaled = limited

        candidates = np.flatnonzero(scaled > -np.inf)
        if candidates.size == 0:
            return int(np.argmax(scaled))

        # Most likely first, so the top-p cut keeps the head of the distribution.
        candidates = candidates[np.argsort(scaled[candidates])[::-1]]
        probs = np.exp(scaled[candidates] - scaled[candidates[0]])
        probs /= probs.sum()

        if top_p < 1.0:
            cutoff = min(int(np.searchsorted(np.cumsum(probs), top_p)) + 1, probs.size)
            candidates = candidates[:cutoff]
            probs = probs[:cutoff] / probs[:cutoff].sum()

        pick = int(np.searchsorted(np.cumsum(probs), np.random.rand(), side="right"))
        # Float round-off can leave cumsum[-1] just below the draw; clamp to the last index.
        return int(candidates[min(pick, candidates.size - 1)])

    def generate(
        self,
        images: "List[Image.Image]",
        messages: List[dict],
        max_tokens: int = 512,
        temperature: float = 0.1,
        top_k: int = 50,
        top_p: float = 0.1,
        stop_tokens: Optional[List[int]] = None,
    ) -> List[int]:
        """
        Generate a whole completion and return its token ids.

        The returned list includes the stop token when one ended generation. Sampling
        and stop semantics are those of ``generate_stream``.
        """
        return list(
            self.generate_stream(
                images,
                messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_k=top_k,
                top_p=top_p,
                stop_tokens=stop_tokens,
            )
        )

    def generate_stream(
        self,
        images: "List[Image.Image]",
        messages: List[dict],
        max_tokens: int = 2000,
        temperature: float = 0.1,
        top_k: int = 50,
        top_p: float = 0.1,
        stop_tokens: Optional[List[int]] = None,
    ):
        """
        Yield generated token ids one at a time (a blocking generator).

        Args:
            images: images referenced by ``{"type": "image"}`` parts of ``messages``.
            messages: chat messages in processor format (see apply_chat_template).
            max_tokens: upper bound on the number of yielded tokens.
            temperature: 0 for greedy decoding, otherwise the sampling temperature.
            top_k: keep only the k most likely tokens; 0 disables the filter.
            top_p: nucleus sampling mass; 1.0 disables the filter.
            stop_tokens: ids that end generation (the stop id itself is yielded).
                Defaults to the tokenizer's EOS id.

        Raises:
            RuntimeError: if the model has not been loaded.
        """
        tokenizer = self.processor.tokenizer
        if stop_tokens is None:
            stop_tokens = [tokenizer.eos_token_id]
        stop_set = set(stop_tokens)

        prompt_embeds = self._prepare_prompt_embeds(images, messages)
        cache = self._create_empty_cache()
        cache_outputs = self._cache_output_map(cache)
        seq_len = prompt_embeds.shape[1]
        # One mask sized for the longest possible sequence; each step uses a prefix.
        attn_mask = np.ones((1, seq_len + max_tokens), dtype=np.int64)

        embeds = prompt_embeds
        last_token = None
        for step in range(max_tokens):
            if last_token is not None:
                token_ids = np.array([[last_token]], dtype=np.int64)
                embeds = self._embed_tokens.run(None, {"input_ids": token_ids})[0].astype(np.float32)

            feed = {"inputs_embeds": embeds, "attention_mask": attn_mask[:, : seq_len + step], **cache}
            outputs = self._decoder.run(None, feed)

            last_token = self._sample_next_token(outputs[0][0, -1], temperature, top_k, top_p)
            yield last_token
            if last_token in stop_set:
                break

            for index, name in cache_outputs:
                cache[name] = outputs[index]

    def unload(self) -> None:
        """Drop references to the sessions, processor and cache template."""
        with self._lock:
            was_loaded = self._embed_tokens is not None
            self._embed_tokens = None
            self._embed_images = None
            self._decoder = None
            self._processor = None
            self._cache_template = None
            if was_loaded:
                logger.info("Models unloaded")
# Global model manager
# Process-wide instance: loaded/unloaded by the lifespan hook and used by the
# request handlers (e.g. stream_vision_completion, health).
model_manager = ONNXVisionModelManager()
| # ============================================================================== | |
| # Image Processing Utilities | |
| # ============================================================================== | |
def resize_image_for_model(image: Image.Image, max_dim: int = 512) -> Image.Image:
    """
    Downscale an image so its longer side is at most ``max_dim``, keeping aspect ratio.

    Uses LANCZOS resampling. Capping the size bounds the vision encoder's input,
    which keeps processing time and memory predictable and avoids tiling large
    images.

    Args:
        image: PIL image to resize.
        max_dim: maximum width/height in pixels (default 512); must be >= 1.

    Returns:
        A resized copy, or the original image if it already fits.

    Raises:
        ValueError: if ``max_dim`` is less than 1.
    """
    if max_dim < 1:
        raise ValueError(f"max_dim must be a positive integer, got {max_dim}")

    width, height = image.size
    if width <= max_dim and height <= max_dim:
        logger.debug(f"Image {width}x{height} already within {max_dim}px limit")
        return image

    ratio = min(max_dim / width, max_dim / height)
    # Clamp to 1px: for extreme aspect ratios (e.g. 5000x3) the short side would
    # otherwise round down to 0, which PIL's resize rejects.
    new_width = max(1, int(width * ratio))
    new_height = max(1, int(height * ratio))
    logger.info(f"Resizing image: {width}x{height} → {new_width}x{new_height} (LANCZOS)")
    return image.resize((new_width, new_height), Image.Resampling.LANCZOS)
async def load_image_from_url(url: str, timeout: float = 30.0) -> Image.Image:
    """
    Download an image, convert it to RGB and downscale it for the model.

    Args:
        url: HTTP(S) address of the image.
        timeout: total seconds allowed for the request (connect + read).

    Returns:
        RGB PIL image whose longer side is at most 512px (resize_image_for_model's default).

    Raises:
        HTTPException(400): on a non-200 response, a network error or timeout, or
            content that PIL cannot decode as an image.
    """
    # NOTE(security): the server fetches a caller-chosen URL (SSRF risk) and reads the
    # whole body into memory; restrict hosts and/or cap the size if exposed publicly.
    try:
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
            async with session.get(url) as response:
                if response.status != 200:
                    raise HTTPException(status_code=400, detail=f"Failed to fetch image from URL: {url}")
                data = await response.read()
    except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
        raise HTTPException(status_code=400, detail=f"Failed to fetch image from URL: {url}") from exc

    try:
        image = Image.open(io.BytesIO(data))
        image.load()  # decode now so corrupt data fails here rather than deep in the pipeline
    except (OSError, ValueError, Image.DecompressionBombError) as exc:
        raise HTTPException(status_code=400, detail=f"URL did not return a valid image: {url}") from exc

    # Convert to RGB to ensure a consistent channel format
    if image.mode != 'RGB':
        image = image.convert('RGB')
    return resize_image_for_model(image)
def load_image_from_base64(data_url: str) -> Image.Image:
    """
    Decode a base64 image (a ``data:`` URL or a bare payload), convert it to RGB, and downscale it.

    Accepts e.g. ``data:image/jpeg;base64,/9j/4AAQ...`` or just the base64 part.
    Whitespace in the payload is ignored, and missing ``=`` padding is restored.

    Returns:
        RGB PIL image whose longer side is at most 512px (resize_image_for_model's default).

    Raises:
        HTTPException(400): if the payload is not valid base64 or not a decodable image.
    """
    _, sep, payload = data_url.partition(",")
    encoded = "".join((payload if sep else data_url).split())
    encoded += "=" * (-len(encoded) % 4)
    try:
        image_data = base64.b64decode(encoded)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        raise HTTPException(status_code=400, detail="Invalid base64 image data") from exc

    try:
        image = Image.open(io.BytesIO(image_data))
        image.load()  # decode now so corrupt data fails here rather than deep in the pipeline
    except (OSError, ValueError, Image.DecompressionBombError) as exc:
        raise HTTPException(status_code=400, detail="Base64 data is not a valid image") from exc

    # Convert to RGB to ensure a consistent channel format
    if image.mode != 'RGB':
        image = image.convert('RGB')
    return resize_image_for_model(image)
async def process_image_content(content: Union[ImageContent, dict]) -> Optional[Image.Image]:
    """
    Resolve one image content part to a PIL image.

    Dicts are validated into ImageContent first. Returns None when the part is not
    of type "image" or carries no ``image_url``. ``data:`` URLs are decoded inline;
    anything else is fetched over the network.
    """
    part = ImageContent(**content) if isinstance(content, dict) else content
    if part.type != "image" or not part.image_url:
        return None
    source = part.image_url
    if not source.startswith("data:"):
        return await load_image_from_url(source)
    return load_image_from_base64(source)
| # ============================================================================== | |
| # Application Lifecycle | |
| # ============================================================================== | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan: load the model before serving and release it on shutdown.

    The blocking download/session setup runs in the default executor via
    ``run_in_executor``, so it does not run on the event-loop thread. Unloading sits
    in ``finally`` so it also happens when the server task is cancelled or errors.
    """
    logger.info("Starting LFM2.5-VL Vision API Server (ONNX Runtime)...")
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, model_manager.load_model)
    try:
        yield
    finally:
        logger.info("Shutting down...")
        model_manager.unload()
| # ============================================================================== | |
| # FastAPI Application | |
| # ============================================================================== | |
# FastAPI application. The lifespan hook loads the model at startup and unloads
# it at shutdown; interactive docs are served at /docs and /redoc.
app = FastAPI(
    title=settings.app_name,
    description="Fast CPU inference for LiquidAI LFM2.5-VL-1.6B Vision-Language model using ONNX Runtime",
    version=settings.app_version,
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)
# Browser origins allowed to call the API with credentials. Presumably local dev
# servers (ports 5500/5501/5173) plus the deployed front end - confirm when adding more.
origins = [
    "http://127.0.0.1:5500",
    "http://127.0.0.1:5501",
    "http://localhost:5500",
    "http://localhost:5173",
    "https://toolboxesai.com"
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.middleware("http")
async def add_cors_for_null_origin(request: Request, call_next):
    """
    Add permissive CORS headers to responses for requests with a ``null`` or missing Origin.

    Pages opened from ``file://`` send ``Origin: null``, which the CORSMiddleware
    ``origins`` allow-list does not include. The function must be registered as
    HTTP middleware; without the decorator it would never run.
    """
    origin = request.headers.get("origin", "")
    response = await call_next(request)
    if origin == "null" or not origin:
        # NOTE(security): any page with an opaque origin (file://, sandboxed iframes)
        # can read these responses; acceptable only while the API exposes no secrets.
        response.headers["Access-Control-Allow-Origin"] = "*"
        response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
        response.headers["Access-Control-Allow-Headers"] = "*"
        response.headers["Access-Control-Expose-Headers"] = "*"
    return response
| # ============================================================================== | |
| # Helper Functions | |
| # ============================================================================== | |
def generate_id() -> str:
    """Return a fresh completion id: ``chatcmpl-`` followed by 12 random hex characters."""
    suffix = uuid.uuid4().hex[:12]
    return "chatcmpl-" + suffix
async def extract_images_and_text(messages: List[VisionMessage]) -> tuple[List[Image.Image], List[dict]]:
    """
    Split request messages into loaded images and processor-format messages.

    String content is passed through unchanged. For list content, each text part
    becomes ``{"type": "text", "text": ...}``. Each image part that loads
    successfully is appended to the image list and leaves a ``{"type": "image"}``
    placeholder in the same position. Parts of other types are ignored.

    Returns:
        (images in placeholder order, processed messages)
    """
    images = []
    processed_messages = []
    for message in messages:
        if isinstance(message.content, str):
            processed_messages.append({"role": message.role, "content": message.content})
            continue

        parts = []
        for part in message.content:
            is_dict = isinstance(part, dict)
            part_type = part.get("type", "") if is_dict else part.type
            if part_type == "text":
                parts.append({"type": "text", "text": part.get("text", "") if is_dict else part.text})
            elif part_type == "image":
                loaded = await process_image_content(part)
                if loaded:
                    images.append(loaded)
                    parts.append({"type": "image"})
        processed_messages.append({"role": message.role, "content": parts})

    return images, processed_messages
async def stream_vision_completion(request: VisionCompletionRequest) -> AsyncGenerator[dict, None]:
    """
    Stream an OpenAI-style ``chat.completion.chunk`` sequence for a vision request.

    ``model_manager.generate_stream`` is blocking, so it runs on a daemon thread and
    passes token ids to this coroutine through an ``asyncio.Queue``. Each yielded
    item is an SSE payload ``{"data": <json string or "[DONE]">}``.

    Text handling:
      * The whole generated sequence is re-decoded and only the new suffix is sent.
        Characters split across byte-level tokens are therefore never sent as
        U+FFFD fragments, and tokenizers that drop leading spaces on single-token
        decodes keep their spacing.
      * ``request.stop`` strings are matched on decoded text. Text that could still
        grow into a stop string is withheld, and the stop string itself is never sent.

    The final chunk reports ``finish_reason`` "stop" (EOS or a stop string) or
    "length" (token limit reached). When the consumer stops iterating (e.g. the
    client disconnects), the worker thread is told to stop at its next token.
    """
    request_id = generate_id()
    created = int(time.time())
    loop = asyncio.get_running_loop()
    async_queue: asyncio.Queue = asyncio.Queue()
    stop_event = threading.Event()

    # Extract images and process messages
    images, processed_messages = await extract_images_and_text(request.messages)
    tokenizer = model_manager.processor.tokenizer

    # Sampling config: request values win, server settings fill the gaps.
    max_tokens = request.max_tokens or settings.max_tokens
    temperature = request.temperature if request.temperature is not None else settings.temperature
    top_k = request.top_k if request.top_k is not None else settings.top_k
    top_p = request.top_p if request.top_p is not None else settings.top_p

    eos_token_id = tokenizer.eos_token_id
    if isinstance(request.stop, str):
        stop_strings = [request.stop] if request.stop else []
    else:
        stop_strings = [s for s in (request.stop or []) if s]

    def post(item) -> None:
        """Hand an item from the worker thread to the event loop."""
        try:
            loop.call_soon_threadsafe(async_queue.put_nowait, item)
        except RuntimeError:
            # The event loop is closed: nobody is listening any more.
            stop_event.set()

    def generate_tokens() -> None:
        try:
            for token in model_manager.generate_stream(
                images,
                processed_messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_k=top_k,
                top_p=top_p,
                stop_tokens=[eos_token_id],
            ):
                if stop_event.is_set():
                    break
                post(("token", token))
        except Exception as e:
            logger.error(f"Stream generation error: {e}")
            post(("error", str(e)))
        finally:
            post(("done", None))

    def decode(ids: List[int]) -> str:
        # No space clean-up, so that decoding a longer sequence only ever appends text.
        return tokenizer.decode(ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)

    def find_stop(text: str) -> Optional[int]:
        """Index of the earliest stop-string occurrence in ``text``, or None."""
        hits = [idx for idx in (text.find(s) for s in stop_strings) if idx != -1]
        return min(hits) if hits else None

    def pending_stop_prefix(text: str) -> int:
        """Length of the longest suffix of ``text`` that is a proper prefix of a stop string."""
        longest = 0
        for s in stop_strings:
            for k in range(min(len(s) - 1, len(text)), longest, -1):
                if text.endswith(s[:k]):
                    longest = k
                    break
        return longest

    def chunk(delta: dict, finish_reason: Optional[str] = None) -> dict:
        payload = {
            "id": request_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": request.model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }
        return {"data": json.dumps(payload)}

    threading.Thread(target=generate_tokens, daemon=True).start()

    token_ids: List[int] = []
    sent = 0  # characters of decoded text already streamed to the client
    finish_reason = "length"
    try:
        while True:
            msg_type, data = await async_queue.get()
            if msg_type == "token":
                if data == eos_token_id:
                    finish_reason = "stop"
                    continue
                token_ids.append(data)
                text = decode(token_ids)
                if text.endswith("\ufffd"):
                    # A multi-byte character is still incomplete; wait for more tokens.
                    continue
                cut = find_stop(text)
                if cut is not None:
                    stop_event.set()
                    if cut > sent:
                        yield chunk({"content": text[sent:cut]})
                    finish_reason = "stop"
                    break
                ready = len(text) - pending_stop_prefix(text)
                if ready > sent:
                    yield chunk({"content": text[sent:ready]})
                    sent = ready
            elif msg_type == "done":
                # Flush anything withheld (a possible stop-string prefix or incomplete char).
                text = decode(token_ids) if token_ids else ""
                cut = find_stop(text)
                if cut is not None:
                    text = text[:cut]
                    finish_reason = "stop"
                if len(text) > sent:
                    yield chunk({"content": text[sent:]})
                break
            elif msg_type == "error":
                logger.error(f"Stream error: {data}")
                yield {"data": json.dumps({"error": {"message": data}})}
                return
        yield chunk({}, finish_reason)
        yield {"data": "[DONE]"}
    except asyncio.CancelledError:
        logger.info(f"Stream cancelled for request {request_id[:8]}")
        raise
    except Exception as e:
        logger.error(f"Streaming error: {e}")
        yield {"data": json.dumps({"error": {"message": str(e)}})}
    finally:
        # Tell the worker thread to stop (no-op if it has already finished).
        stop_event.set()
| # ============================================================================== | |
| # API Endpoints | |
| # ============================================================================== | |
async def health():
    """Health probe: returns ``{"status": "healthy"}`` once the model is loaded.

    Raises:
        HTTPException: 503 while ``model_manager.is_loaded`` is false.
    """
    if model_manager.is_loaded:
        return {"status": "healthy"}
    raise HTTPException(status_code=503, detail="Model not loaded")
async def list_models():
    """List the model ids this server advertises, in OpenAI ``/models`` shape.

    Nothing in this module routes on the id; ``request.model`` is only echoed
    back in responses. ``created`` is stamped with the current time for each
    entry rather than being a real model creation date.
    """
    advertised_ids = ("lfm-vision", "lfm-2.5-vl-1.6b-onnx")
    entries = [ModelInfo(id=model_id, created=int(time.time())) for model_id in advertised_ids]
    return ModelListResponse(data=entries)
async def vision_completions(request: VisionCompletionRequest):
    """Vision-language completion with image support (OpenAI chat-completions shape).

    With ``request.stream`` set, returns an SSE stream produced by
    ``stream_vision_completion``. Otherwise, runs generation in the default
    thread-pool executor so the event loop is not blocked, and returns a single
    ``ChatCompletionResponse``.

    Raises:
        HTTPException: 503 if the model is not loaded. An ``HTTPException``
            raised while preparing the request is propagated with its own
            status code. Any other failure is reported as a 500.
    """
    if not model_manager.is_loaded:
        raise HTTPException(status_code=503, detail="Model not loaded")
    if request.stream:
        # sse_starlette's ``ping`` is in SECONDS. The default ping is an SSE
        # comment line, which clients ignore. A str from ping_message_factory
        # would instead be sent as a ``data:`` event and parsed as a chunk.
        return EventSourceResponse(
            stream_vision_completion(request),
            media_type="text/event-stream",
            ping=30,
        )
    try:
        # Extract images and process messages
        images, processed_messages = await extract_images_and_text(request.messages)
        tokenizer = model_manager.processor.tokenizer
        max_tokens = request.max_tokens or settings.max_tokens
        temperature = request.temperature if request.temperature is not None else settings.temperature
        top_k = request.top_k if request.top_k is not None else settings.top_k
        top_p = request.top_p if request.top_p is not None else settings.top_p
        # NOTE(review): unlike the streaming path, ``request.stop`` is not
        # forwarded here. Confirm whether ``model_manager.generate`` accepts
        # ``stop_tokens`` and wire it through if so.
        start_time = time.perf_counter()
        loop = asyncio.get_running_loop()
        # Generation is blocking; keep it off the event loop.
        tokens = await loop.run_in_executor(
            None,
            lambda: model_manager.generate(
                images,
                processed_messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_k=top_k,
                top_p=top_p
            )
        )
        response_text = tokenizer.decode(tokens, skip_special_tokens=True)
        gen_time = time.perf_counter() - start_time
        logger.debug(f"Generated {len(tokens)} tokens in {gen_time:.2f}s")
        return ChatCompletionResponse(
            id=generate_id(),
            created=int(time.time()),
            model=request.model,
            choices=[
                ChatCompletionChoice(
                    index=0,
                    message=ChatMessage(role="assistant", content=response_text),
                    finish_reason="stop"
                )
            ],
            usage={
                "prompt_tokens": 0,  # Would need to track input tokens
                "completion_tokens": len(tokens),
                "total_tokens": len(tokens)
            }
        )
    except HTTPException:
        # Already carries the intended status (e.g. a 4xx for bad input); don't mask it as 500.
        raise
    except Exception as e:
        logger.exception("Vision completion error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
async def chat_completions(request: ChatCompletionRequest):
    """Text-only chat completion, kept for OpenAI-client compatibility.

    Re-wraps the request as a ``VisionCompletionRequest`` (no images) with the
    same model id, sampling parameters and ``stream`` flag, then delegates to
    ``vision_completions``. Streaming and non-streaming therefore behave
    exactly as on the vision endpoint.

    Raises:
        HTTPException: 503 if the model is not loaded, plus anything raised
            by ``vision_completions``.
    """
    if not model_manager.is_loaded:
        raise HTTPException(status_code=503, detail="Model not loaded")

    # Role/content are copied verbatim; text-only content passes straight through.
    converted = [VisionMessage(role=chat_msg.role, content=chat_msg.content) for chat_msg in request.messages]

    sampling = {
        "temperature": request.temperature,
        "top_p": request.top_p,
        "top_k": request.top_k,
        "max_tokens": request.max_tokens,
    }
    delegated = VisionCompletionRequest(
        model=request.model,
        messages=converted,
        stream=request.stream,
        **sampling,
    )
    return await vision_completions(delegated)
async def upload_image(
    file: UploadFile = File(...),
    prompt: str = "What is in this image?"
):
    """Answer *prompt* about a single uploaded image (direct upload endpoint).

    The upload is validated by file extension or content type against
    ``settings.supported_formats``, and by size against
    ``settings.max_image_size_mb``. It is then decoded with PIL, and one
    non-streaming generation runs with the server's default sampling settings.

    Raises:
        HTTPException: 503 if the model is not loaded; 400 for an unsupported
            format, an oversize upload, or undecodable image data; 500 if
            generation fails.
    """
    if not model_manager.is_loaded:
        raise HTTPException(status_code=503, detail="Model not loaded")

    # Validate file type: accept if either the extension or the content type matches.
    content_type = file.content_type or ""
    file_ext = Path(file.filename or "").suffix.lower().lstrip(".")
    if file_ext not in settings.supported_formats and not any(fmt in content_type for fmt in settings.supported_formats):
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported image format. Supported: {settings.supported_formats}"
        )

    max_bytes = int(settings.max_image_size_mb * 1024 * 1024)
    # Read at most one byte past the limit so an oversize upload is detected
    # without pulling the entire file into memory.
    contents = await file.read(max_bytes + 1)
    if len(contents) > max_bytes:
        raise HTTPException(
            status_code=400,
            detail=f"Image too large. Max size: {settings.max_image_size_mb}MB"
        )

    try:
        image = Image.open(io.BytesIO(contents))
        # Image.open is lazy: force a full decode so truncated or corrupt data
        # is reported here as a 400 instead of failing during generation.
        image.load()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid image: {e}") from e

    # Single user turn: an image placeholder followed by the prompt text.
    messages = [{
        "role": "user",
        "content": [
            {"type": "image"},
            {"type": "text", "text": prompt}
        ]
    }]
    tokenizer = model_manager.processor.tokenizer
    loop = asyncio.get_running_loop()
    try:
        # Generation is blocking; run it in the executor so other requests
        # (and health checks) are not stalled while it runs.
        tokens = await loop.run_in_executor(
            None,
            lambda: model_manager.generate(
                [image],
                messages,
                max_tokens=settings.max_tokens,
                temperature=settings.temperature,
                top_k=settings.top_k,
                top_p=settings.top_p
            )
        )
    except Exception as e:
        logger.exception("Image upload completion error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e

    response_text = tokenizer.decode(tokens, skip_special_tokens=True)
    return {
        "id": generate_id(),
        "model": "lfm-vision",
        "response": response_text
    }
| # ============================================================================== | |
| # Run Server | |
| # ============================================================================== | |
if __name__ == "__main__":
    # Imported here: uvicorn is only needed when this module is run as a script.
    import uvicorn

    logger.info(f"Starting server on {settings.host}:{settings.port}")
    uvicorn.run(
        "app:app",
        host=settings.host,
        port=settings.port,
        reload=False,
        # uvicorn.run resolves log_level through a lowercase-keyed table, while
        # the module's logging.basicConfig call upper-cases this same setting.
        # Normalise so "INFO" and "info" both work.
        log_level=settings.log_level.lower()
    )