| import os | |
| import threading | |
| import torch | |
| from typing import List, Optional, AsyncGenerator | |
| from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor | |
| from qwen_vl_utils import process_vision_info | |
| from fastapi import HTTPException | |
| from image_utils import filter_valid_images | |
| style_model = None | |
| style_processor = None | |
| model_lock = threading.Lock() | |
| def ensure_model_loaded(): | |
| global style_model, style_processor | |
| if style_model is not None: | |
| return | |
| print("Loading model (lazy load on first request)...") | |
| model_id = "Qwen/Qwen2.5-VL-7B-Instruct" | |
| hf_token = os.getenv("HF_TOKEN") | |
| try: | |
| if torch.cuda.is_available(): | |
| dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16 | |
| max_memory = {0: "14GiB", "cpu": "2GiB"} | |
| offload_folder = None | |
| else: | |
| dtype = torch.float16 | |
| max_memory = {"cpu": "14GiB"} | |
| offload_folder = "/tmp/model_offload" | |
| os.makedirs(offload_folder, exist_ok=True) | |
| load_kwargs = { | |
| "torch_dtype": dtype, | |
| "device_map": "auto", | |
| "low_cpu_mem_usage": True, | |
| "max_memory": max_memory, | |
| } | |
| if offload_folder: | |
| load_kwargs["offload_folder"] = offload_folder | |
| if hf_token: | |
| load_kwargs["token"] = hf_token | |
| style_model = Qwen2_5_VLForConditionalGeneration.from_pretrained( | |
| model_id, | |
| **load_kwargs | |
| ) | |
| style_processor = AutoProcessor.from_pretrained(model_id, token=hf_token) | |
| else: | |
| style_model = Qwen2_5_VLForConditionalGeneration.from_pretrained( | |
| model_id, | |
| **load_kwargs | |
| ) | |
| style_processor = AutoProcessor.from_pretrained(model_id) | |
| print(f"Loaded {model_id} with dtype={dtype}, device_map=auto") | |
| except Exception as e: | |
| print(f"Error loading model: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| raise | |
| def generate_chat_response(prompt: str, max_length: int = 512, temperature: float = 0.7, rag_context: Optional[str] = None, system_override: Optional[str] = None, images: Optional[List[str]] = None) -> str: | |
| ensure_model_loaded() | |
| system_message = system_override if system_override else "You are StyleGPT, a friendly and helpful fashion stylist assistant. You give natural, conversational advice about clothing, colors, and outfit combinations. Always be warm, friendly, and advisory in your responses. When asked your name, say you're StyleGPT. When greeted, respond warmly and offer to help with fashion advice." | |
| if rag_context: | |
| system_message += f"\n\n{rag_context}\n\nUse this fashion knowledge to provide accurate and helpful advice. Reference this knowledge naturally in your responses." | |
| user_content = [] | |
| if images: | |
| valid_images = filter_valid_images(images) | |
| for pil_image in valid_images: | |
| user_content.append({"type": "image", "image": pil_image}) | |
| user_content.append({"type": "text", "text": prompt}) | |
| messages = [ | |
| {"role": "system", "content": system_message}, | |
| {"role": "user", "content": user_content} | |
| ] | |
| try: | |
| text = style_processor.apply_chat_template( | |
| messages, tokenize=False, add_generation_prompt=True | |
| ) | |
| image_inputs, video_inputs = process_vision_info(messages) | |
| inputs = style_processor( | |
| text=[text], | |
| images=image_inputs, | |
| videos=video_inputs, | |
| padding=True, | |
| return_tensors="pt", | |
| ) | |
| if hasattr(style_model, 'device'): | |
| device = style_model.device | |
| elif hasattr(style_model, 'hf_device_map'): | |
| device = next(iter(style_model.hf_device_map.values())) if style_model.hf_device_map else torch.device("cpu") | |
| else: | |
| device = torch.device("cpu") | |
| inputs = {k: v.to(device) for k, v in inputs.items()} | |
| temperature = max(0.1, min(1.5, temperature)) | |
| with model_lock: | |
| with torch.no_grad(): | |
| try: | |
| outputs = style_model.generate( | |
| **inputs, | |
| max_new_tokens=max_length, | |
| temperature=temperature, | |
| top_p=0.95, | |
| top_k=50, | |
| do_sample=True, | |
| eos_token_id=style_processor.tokenizer.eos_token_id, | |
| pad_token_id=style_processor.tokenizer.pad_token_id, | |
| repetition_penalty=1.1, | |
| ) | |
| except RuntimeError as e: | |
| if "probability tensor" in str(e) or "inf" in str(e) or "nan" in str(e): | |
| print(f"[GENERATE] Probability error, retrying with greedy decoding") | |
| outputs = style_model.generate( | |
| **inputs, | |
| max_new_tokens=max_length, | |
| do_sample=False, | |
| eos_token_id=style_processor.tokenizer.eos_token_id, | |
| pad_token_id=style_processor.tokenizer.pad_token_id, | |
| repetition_penalty=1.1, | |
| ) | |
| else: | |
| raise | |
| generated_ids_trimmed = [ | |
| out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs['input_ids'], outputs) | |
| ] | |
| generated_text = style_processor.batch_decode( | |
| generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False | |
| )[0] | |
| generated_text = generated_text.strip() | |
| return generated_text | |
| except Exception as e: | |
| print(f"[GENERATE] Error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=f"Generation error: {str(e)}") | |
| async def generate_chat_response_streaming(prompt: str, max_length: int = 512, temperature: float = 0.7, rag_context: Optional[str] = None, system_override: Optional[str] = None, images: Optional[List[str]] = None) -> AsyncGenerator[str, None]: | |
| ensure_model_loaded() | |
| system_message = system_override if system_override else "You are StyleGPT, a friendly and helpful fashion stylist assistant. You give natural, conversational advice about clothing, colors, and outfit combinations. Always be warm, friendly, and advisory in your responses. When asked your name, say you're StyleGPT. When greeted, respond warmly and offer to help with fashion advice." | |
| if rag_context: | |
| system_message += f"\n\n{rag_context}\n\nUse this fashion knowledge to provide accurate and helpful advice. Reference this knowledge naturally in your responses." | |
| user_content = [] | |
| if images: | |
| valid_images = filter_valid_images(images) | |
| for pil_image in valid_images: | |
| user_content.append({"type": "image", "image": pil_image}) | |
| user_content.append({"type": "text", "text": prompt}) | |
| messages = [ | |
| {"role": "system", "content": system_message}, | |
| {"role": "user", "content": user_content} | |
| ] | |
| try: | |
| text = style_processor.apply_chat_template( | |
| messages, tokenize=False, add_generation_prompt=True | |
| ) | |
| image_inputs, video_inputs = process_vision_info(messages) | |
| inputs = style_processor( | |
| text=[text], | |
| images=image_inputs, | |
| videos=video_inputs, | |
| padding=True, | |
| return_tensors="pt", | |
| ) | |
| if hasattr(style_model, 'device'): | |
| device = style_model.device | |
| elif hasattr(style_model, 'hf_device_map'): | |
| device = next(iter(style_model.hf_device_map.values())) if style_model.hf_device_map else torch.device("cpu") | |
| else: | |
| device = torch.device("cpu") | |
| inputs = {k: v.to(device) for k, v in inputs.items()} | |
| temperature = max(0.1, min(1.5, temperature)) | |
| with model_lock: | |
| with torch.no_grad(): | |
| try: | |
| outputs = style_model.generate( | |
| **inputs, | |
| max_new_tokens=max_length, | |
| temperature=temperature, | |
| top_p=0.95, | |
| top_k=50, | |
| do_sample=True, | |
| eos_token_id=style_processor.tokenizer.eos_token_id, | |
| pad_token_id=style_processor.tokenizer.pad_token_id, | |
| repetition_penalty=1.1, | |
| ) | |
| except RuntimeError as e: | |
| if "probability tensor" in str(e) or "inf" in str(e) or "nan" in str(e): | |
| print(f"[GENERATE STREAM] Probability error, retrying with greedy decoding") | |
| outputs = style_model.generate( | |
| **inputs, | |
| max_new_tokens=max_length, | |
| do_sample=False, | |
| eos_token_id=style_processor.tokenizer.eos_token_id, | |
| pad_token_id=style_processor.tokenizer.pad_token_id, | |
| repetition_penalty=1.1, | |
| ) | |
| else: | |
| raise | |
| generated_ids_trimmed = [ | |
| out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs['input_ids'], outputs) | |
| ] | |
| generated_text = style_processor.batch_decode( | |
| generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False | |
| )[0] | |
| generated_text = generated_text.strip() | |
| import asyncio | |
| for char in generated_text: | |
| yield char | |
| await asyncio.sleep(0.01) | |
| except Exception as e: | |
| print(f"[GENERATE STREAM] Error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| error_msg = f"I apologize, but I encountered an error generating a response. Please try again." | |
| import asyncio | |
| for char in error_msg: | |
| yield char | |
| await asyncio.sleep(0.01) | |