import os import threading import torch from typing import List, Optional, AsyncGenerator from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor from qwen_vl_utils import process_vision_info from fastapi import HTTPException from image_utils import filter_valid_images style_model = None style_processor = None model_lock = threading.Lock() def ensure_model_loaded(): global style_model, style_processor if style_model is not None: return print("Loading model (lazy load on first request)...") model_id = "Qwen/Qwen2.5-VL-7B-Instruct" hf_token = os.getenv("HF_TOKEN") try: if torch.cuda.is_available(): dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16 max_memory = {0: "14GiB", "cpu": "2GiB"} offload_folder = None else: dtype = torch.float16 max_memory = {"cpu": "14GiB"} offload_folder = "/tmp/model_offload" os.makedirs(offload_folder, exist_ok=True) load_kwargs = { "torch_dtype": dtype, "device_map": "auto", "low_cpu_mem_usage": True, "max_memory": max_memory, } if offload_folder: load_kwargs["offload_folder"] = offload_folder if hf_token: load_kwargs["token"] = hf_token style_model = Qwen2_5_VLForConditionalGeneration.from_pretrained( model_id, **load_kwargs ) style_processor = AutoProcessor.from_pretrained(model_id, token=hf_token) else: style_model = Qwen2_5_VLForConditionalGeneration.from_pretrained( model_id, **load_kwargs ) style_processor = AutoProcessor.from_pretrained(model_id) print(f"Loaded {model_id} with dtype={dtype}, device_map=auto") except Exception as e: print(f"Error loading model: {e}") import traceback traceback.print_exc() raise def generate_chat_response(prompt: str, max_length: int = 512, temperature: float = 0.7, rag_context: Optional[str] = None, system_override: Optional[str] = None, images: Optional[List[str]] = None) -> str: ensure_model_loaded() system_message = system_override if system_override else "You are StyleGPT, a friendly and helpful fashion stylist assistant. You give natural, conversational advice about clothing, colors, and outfit combinations. Always be warm, friendly, and advisory in your responses. When asked your name, say you're StyleGPT. When greeted, respond warmly and offer to help with fashion advice." if rag_context: system_message += f"\n\n{rag_context}\n\nUse this fashion knowledge to provide accurate and helpful advice. Reference this knowledge naturally in your responses." user_content = [] if images: valid_images = filter_valid_images(images) for pil_image in valid_images: user_content.append({"type": "image", "image": pil_image}) user_content.append({"type": "text", "text": prompt}) messages = [ {"role": "system", "content": system_message}, {"role": "user", "content": user_content} ] try: text = style_processor.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) image_inputs, video_inputs = process_vision_info(messages) inputs = style_processor( text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt", ) if hasattr(style_model, 'device'): device = style_model.device elif hasattr(style_model, 'hf_device_map'): device = next(iter(style_model.hf_device_map.values())) if style_model.hf_device_map else torch.device("cpu") else: device = torch.device("cpu") inputs = {k: v.to(device) for k, v in inputs.items()} temperature = max(0.1, min(1.5, temperature)) with model_lock: with torch.no_grad(): try: outputs = style_model.generate( **inputs, max_new_tokens=max_length, temperature=temperature, top_p=0.95, top_k=50, do_sample=True, eos_token_id=style_processor.tokenizer.eos_token_id, pad_token_id=style_processor.tokenizer.pad_token_id, repetition_penalty=1.1, ) except RuntimeError as e: if "probability tensor" in str(e) or "inf" in str(e) or "nan" in str(e): print(f"[GENERATE] Probability error, retrying with greedy decoding") outputs = style_model.generate( **inputs, max_new_tokens=max_length, do_sample=False, eos_token_id=style_processor.tokenizer.eos_token_id, pad_token_id=style_processor.tokenizer.pad_token_id, repetition_penalty=1.1, ) else: raise generated_ids_trimmed = [ out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs['input_ids'], outputs) ] generated_text = style_processor.batch_decode( generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False )[0] generated_text = generated_text.strip() return generated_text except Exception as e: print(f"[GENERATE] Error: {e}") import traceback traceback.print_exc() raise HTTPException(status_code=500, detail=f"Generation error: {str(e)}") async def generate_chat_response_streaming(prompt: str, max_length: int = 512, temperature: float = 0.7, rag_context: Optional[str] = None, system_override: Optional[str] = None, images: Optional[List[str]] = None) -> AsyncGenerator[str, None]: ensure_model_loaded() system_message = system_override if system_override else "You are StyleGPT, a friendly and helpful fashion stylist assistant. You give natural, conversational advice about clothing, colors, and outfit combinations. Always be warm, friendly, and advisory in your responses. When asked your name, say you're StyleGPT. When greeted, respond warmly and offer to help with fashion advice." if rag_context: system_message += f"\n\n{rag_context}\n\nUse this fashion knowledge to provide accurate and helpful advice. Reference this knowledge naturally in your responses." user_content = [] if images: valid_images = filter_valid_images(images) for pil_image in valid_images: user_content.append({"type": "image", "image": pil_image}) user_content.append({"type": "text", "text": prompt}) messages = [ {"role": "system", "content": system_message}, {"role": "user", "content": user_content} ] try: text = style_processor.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) image_inputs, video_inputs = process_vision_info(messages) inputs = style_processor( text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt", ) if hasattr(style_model, 'device'): device = style_model.device elif hasattr(style_model, 'hf_device_map'): device = next(iter(style_model.hf_device_map.values())) if style_model.hf_device_map else torch.device("cpu") else: device = torch.device("cpu") inputs = {k: v.to(device) for k, v in inputs.items()} temperature = max(0.1, min(1.5, temperature)) with model_lock: with torch.no_grad(): try: outputs = style_model.generate( **inputs, max_new_tokens=max_length, temperature=temperature, top_p=0.95, top_k=50, do_sample=True, eos_token_id=style_processor.tokenizer.eos_token_id, pad_token_id=style_processor.tokenizer.pad_token_id, repetition_penalty=1.1, ) except RuntimeError as e: if "probability tensor" in str(e) or "inf" in str(e) or "nan" in str(e): print(f"[GENERATE STREAM] Probability error, retrying with greedy decoding") outputs = style_model.generate( **inputs, max_new_tokens=max_length, do_sample=False, eos_token_id=style_processor.tokenizer.eos_token_id, pad_token_id=style_processor.tokenizer.pad_token_id, repetition_penalty=1.1, ) else: raise generated_ids_trimmed = [ out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs['input_ids'], outputs) ] generated_text = style_processor.batch_decode( generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False )[0] generated_text = generated_text.strip() import asyncio for char in generated_text: yield char await asyncio.sleep(0.01) except Exception as e: print(f"[GENERATE STREAM] Error: {e}") import traceback traceback.print_exc() error_msg = f"I apologize, but I encountered an error generating a response. Please try again." import asyncio for char in error_msg: yield char await asyncio.sleep(0.01)