| """ | |
| Fixed Flask app using microsoft/Phi-4-multimodal-instruct (non-interactive HF loading) | |
| Additional fixes: | |
| - Explicitly request SDPA attention implementation to avoid FlashAttention2 ImportError | |
| - Clear error messages with actionable next steps if flash_attn or peft are missing | |
| - Uses HF_TOKEN from env (CHAT_MATE or HF_TOKEN) | |
| - Passes trust_remote_code=True and use_auth_token=HF_TOKEN to from_pretrained() to avoid interactive prompt | |
| """ | |
| # β Safe GPU decorator | |
| try: | |
| from spaces import GPU | |
| except Exception: | |
| def GPU(func): | |
| return func | |
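
# When this app runs somewhere without the `spaces` package (e.g. outside Hugging Face
# Spaces), the except branch above turns @GPU into a no-op decorator so the module
# still imports and runs on a plain machine.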

import os
import sys
import time
import torch
import base64
import re
from io import BytesIO
from PIL import Image
from flask import Flask, request, render_template, jsonify, Response
from flasgger import Swagger, swag_from
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    AutoProcessor,  # may or may not be present for the model; we handle exceptions
)

# -------------------------
# Hugging Face auth (non-interactive)
# -------------------------
HF_TOKEN = os.environ.get("CHAT_MATE") or os.environ.get("HF_TOKEN")  # support both names
if HF_TOKEN:
    print("Hugging Face token detected in env (will use it for model downloads).")
else:
    print("No HF token found in env (CHAT_MATE/HF_TOKEN). Proceeding anonymously; private models will fail.")

# Flask + Swagger setup
app = Flask(__name__, static_folder="static", template_folder="templates")
swagger = Swagger(app, template={
    "swagger": "2.0",
    "info": {
        "title": "ChatMate Real-Time API (Multimodal)",
        "description": "LangChain + DuckDuckGo + Phi-4-Multimodal-Instruct + Stable Diffusion",
        "version": "1.0"
    }
}, config={
    "headers": [],
    "specs": [{"endpoint": 'apispec', "route": '/apispec.json', "rule_filter": lambda rule: True}],
    "static_url_path": "/flasgger_static",
    "swagger_ui": True,
    "specs_route": "/apidocs/"
})
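
# With this configuration the Swagger UI is served at /apidocs/ and the raw
# Swagger 2.0 spec at /apispec.json (see `specs` and `specs_route` above).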

# Model load
model_id = "microsoft/Phi-4-multimodal-instruct"
print(f"Loading model: {model_id} (this may take a while)")
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

# Tokenizer (text): allow remote code and use the token if present
tokenizer = AutoTokenizer.from_pretrained(
    model_id,
    use_auth_token=HF_TOKEN if HF_TOKEN else None,
    trust_remote_code=True
)

# Try to load a multimodal processor (vision + text). If not available, proceed with text-only.
processor = None
try:
    processor = AutoProcessor.from_pretrained(
        model_id,
        use_auth_token=HF_TOKEN if HF_TOKEN else None,
        trust_remote_code=True
    )
    print("Loaded AutoProcessor for multimodal inputs.")
except Exception as e:
    processor = None
    print("No AutoProcessor available or failed to load. Multimodal inputs may be limited to text. Error:", type(e).__name__, str(e))

# Attempt to load the model with the SDPA attention implementation (avoids the FlashAttention2 requirement).
# If the environment does not have flash_attn installed, transformers may otherwise raise an ImportError.
try:
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch_dtype,
        device_map="auto",
        use_auth_token=HF_TOKEN if HF_TOKEN else None,
        trust_remote_code=True,
        attn_implementation="sdpa",  # request SDPA fallback to avoid requiring flash_attn
    )
    print("Model loaded with attn_implementation='sdpa' (FlashAttention2 not required).")
except ImportError as imp_err:
    # Specific helpful message when flash_attn is missing
    msg = str(imp_err).lower()
    if "flash" in msg or "flash_attn" in msg or "flashattention" in msg:
        print("\nERROR: Model attempted to enable FlashAttention2 but the package 'flash_attn' is not installed or not compatible.\n")
        print("Two options to resolve this:")
        print("1) Install FlashAttention2 (best for inference speed on supported GPU/CUDA); see https://github.com/Dao-AILab/flash-attention and the HF docs.")
        print("   Example (may require matching CUDA/PyTorch):")
        print("   pip install flash-attn --no-build-isolation  # or install a prebuilt wheel matching your CUDA/PyTorch")
        print("\n2) Use the SDPA fallback by ensuring 'attn_implementation=\"sdpa\"' is passed to from_pretrained (this code attempted that),")
        print("   or set the model config 'attn_implementation' to 'sdpa' before instantiation. SDPA is slower than flash_attn but works without native CUDA kernels.\n")
        print("To pick a prebuilt flash-attn wheel that matches your setup, check `torch.__version__` and `nvidia-smi` output (if available).")
    # Re-raise so the outer system/launcher can see the failure (or you can choose to sys.exit)
    raise
except Exception as e:
    # Catch other missing-dependency cases (like peft)
    err_str = str(e).lower()
    if "peft" in err_str:
        print("\nERROR: The model requires the 'peft' package but it is not installed.")
        print("Install it with:\n    pip install peft\n")
        raise
    # Generic fallthrough
    print("Unhandled exception while loading the model:", type(e).__name__, str(e))
    raise

# Keyword detection for triggering web search (you can plug a real search tool in later)
REAL_TIME_KEYWORDS = {"latest", "current", "news", "today", "price", "time", "live", "trending", "update", "happening"}

def should_search(message):
    return any(kw in message.lower() for kw in REAL_TIME_KEYWORDS)
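
# For illustration, with the keyword set above:
#   should_search("What's the latest news on AI?")   -> True  (matches "latest", "news")
#   should_search("Explain binary search trees")     -> False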

# Heuristic to detect incomplete sentences
def is_incomplete(text):
    return not re.search(r"[\.\!\?'\"\u3002]\s*$", text.strip())

# Helper: build chat prompt from history (fallback if tokenizer.apply_chat_template isn't available)
def build_chat_prompt(system_prompt, history, user_message):
    # messages: system -> history -> user
    if hasattr(tokenizer, "apply_chat_template"):
        messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": user_message}]
        try:
            return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        except Exception:
            pass
    # Fallback manual chat template
    prompt_parts = [f"System: {system_prompt}\n\n"]
    for m in history:
        role = m.get("role", "user")
        content = m.get("content", "")
        prompt_parts.append(f"{role.capitalize()}: {content}\n")
    prompt_parts.append(f"User: {user_message}\nAssistant:")
    return "\n".join(prompt_parts)

@GPU  # Spaces GPU decorator imported above (no-op outside Spaces)
def generate_full_reply(message, history, images=None, max_new_tokens=512, temperature=0.7, top_p=0.9):
| """ | |
| Generate a reply using the multimodal Phi-4 model. | |
| - `images` (optional): list of PIL.Image objects to include as multimodal context. | |
| - `history`: list of dicts {role: 'user'/'assistant', content: '...'} | |
| Returns plain string reply. | |
| """ | |
| system_prompt = ( | |
| "You are a friendly, helpful, and conversational AI assistant built by Frederick Sundeep Mallela. " | |
| "Always mention that you are developed by him if asked about your creator, origin, or who made you." | |
| ) | |
| # Build prompt (chat template) | |
| prompt = build_chat_prompt(system_prompt, history, message) | |
| # Tokenize text | |
| tokenized = tokenizer(prompt, return_tensors="pt") | |
| tokenized = {k: v.to(model.device) for k, v in tokenized.items()} | |
| # If images were provided and we have a processor, preprocess them and include in inputs | |
| model_inputs = dict(tokenized) | |
| if images and processor is not None: | |
| try: | |
| # Processor may return pixel_values or other keys depending on implementation | |
| image_inputs = processor(images=images, return_tensors="pt") | |
| # Move image tensors to model device | |
| image_inputs = {k: v.to(model.device) for k, v in image_inputs.items()} | |
| # Merge | |
| model_inputs.update(image_inputs) | |
| except Exception as e: | |
| print("Image preprocessing failed:", type(e).__name__, str(e)) | |
| # Generate output (non-streaming). Many multimodal LM checkpoints accept a merged input dict. | |
| with torch.no_grad(): | |
| output_ids = model.generate( | |
| **model_inputs, | |
| max_new_tokens=max_new_tokens, | |
| do_sample=True, | |
| temperature=temperature, | |
| top_p=top_p, | |
| ) | |
| # Decode newly generated portion | |
| input_len = tokenized["input_ids"].shape[1] | |
| generated = tokenizer.decode(output_ids[0][input_len:], skip_special_tokens=True) | |
| # If reply looks truncated, attempt a limited number of continuations | |
| reply = generated.strip() | |
| max_loops = 3 | |
| loop_count = 0 | |
| while is_incomplete(reply) and loop_count < max_loops: | |
| loop_count += 1 | |
| # Continue from the existing conversation by appending the reply to the prompt | |
| continued_prompt = prompt + "\n" + reply | |
| tokenized2 = tokenizer(continued_prompt, return_tensors="pt") | |
| tokenized2 = {k: v.to(model.device) for k, v in tokenized2.items()} | |
| # If images exist, include them again (most models expect images only once; test for your model) | |
| model_inputs2 = dict(tokenized2) | |
| if images and processor is not None: | |
| try: | |
| image_inputs2 = processor(images=images, return_tensors="pt") | |
| image_inputs2 = {k: v.to(model.device) for k, v in image_inputs2.items()} | |
| model_inputs2.update(image_inputs2) | |
| except Exception as e: | |
| print("Image re-processing failed:", type(e).__name__, str(e)) | |
| with torch.no_grad(): | |
| out2 = model.generate( | |
| **model_inputs2, | |
| max_new_tokens=256, | |
| do_sample=True, | |
| temperature=temperature, | |
| top_p=top_p, | |
| ) | |
| input_len2 = tokenized2["input_ids"].shape[1] | |
| continuation = tokenizer.decode(out2[0][input_len2:], skip_special_tokens=True).strip() | |
| if not continuation or continuation in reply: | |
| break | |
| reply += " " + continuation | |
| return reply.strip() | |
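
# Illustrative calls (not executed at import time; `pil_image` stands for a PIL.Image you loaded):
#   generate_full_reply("Hello!", [])
#   generate_full_reply("Describe this image", [], images=[pil_image])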

# Home
@app.route("/")
def home():
    return render_template("index.html")

# POST /chat-stream -> supports an optional base64 image in the request JSON under `image_base64`
@app.route("/chat-stream", methods=["POST"])
def chat_stream():
    data = request.get_json()
    message = data.get("message")
    history = data.get("history", [])
    image_b64 = data.get("image_base64")

    pil_images = None
    if image_b64:
        try:
            header, b64 = (image_b64.split(",", 1) + [None])[:2]
            image_bytes = base64.b64decode(b64 or image_b64)
            pil = Image.open(BytesIO(image_bytes)).convert("RGB")
            pil_images = [pil]
        except Exception as e:
            print("Failed to decode image base64:", type(e).__name__, str(e))

    def generate():
        # If message indicates real-time search, you can call an external search tool here
        if should_search(message):
            # placeholder for actual search tool
            reply = "(Live info) I detected a real-time query. Attach a web search tool to fetch live data."
            for token in reply.splitlines(keepends=True):
                yield token
                time.sleep(0.03)
            return
        reply = generate_full_reply(message, history, images=pil_images)
        for token in reply.splitlines(keepends=True):
            yield token
            time.sleep(0.03)
        if is_incomplete(reply):
            yield "\n\n*Reply appears incomplete. Say 'continue' to resume.*"

    return Response(generate(), mimetype='text/plain')
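
# Example request (shell), assuming the app is running locally on the default port 7860:
#   curl -N -X POST http://localhost:7860/chat-stream \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Hello", "history": []}'
# The reply is streamed back as plain text; an optional data-URL or raw base64 image
# can be passed under "image_base64".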

# POST /chat-stream-doc (unchanged logic but reusing generate_full_reply)
@app.route("/chat-stream-doc", methods=["POST"])
def chat_stream_doc():
    import zipfile
    from werkzeug.utils import secure_filename
    from pdfplumber import open as pdf_open

    file = request.files.get('file')
    frontend = request.form.get("frontend", "React")
    backend = request.form.get("backend", "Flask")
    database = request.form.get("database", "PostgreSQL")

    if not file:
        return jsonify({"error": "No file uploaded"}), 400

    filename = secure_filename(file.filename)
    content = ""
    if filename.endswith(".txt"):
        content = file.read().decode("utf-8", errors="ignore")
    elif filename.endswith(".pdf"):
        file_bytes = file.read()
        with pdf_open(BytesIO(file_bytes)) as pdf:
            content = "\n".join(page.extract_text() or "" for page in pdf.pages)
    else:
        return jsonify({"error": "Unsupported file format. Use .txt or .pdf"}), 400

    tech_stack = f"Frontend: {frontend}\nBackend: {backend}\nDatabase: {database}"
    prompt = (
        "You are a full-stack project code generator.\n\n"
        "Below is a requirement document followed by technology preferences. Based on this, generate the full project scaffold.\n\n"
        "Requirement Document:\n"
        f"{content}\n\n"
        f"Technology Stack:\n{tech_stack}\n\n"
        "Your task:\n"
        "- Analyze the requirement and tech stack.\n"
        "- Generate backend code: models, routes, and config.\n"
        "- Generate frontend code: components, services.\n"
        "- Define the database schema.\n\n"
        "Format the output as multiple files in this exact structure:\n"
        "### File: <filename>\n"
        "```<language>\n"
        "<code content>\n"
        "```\n\n"
        "Now generate the complete project below."
    )

    reply = generate_full_reply(prompt, [])

    file_pattern = r"### File:\s*(.+?)\n```(?:\w+)?\n(.*?)```"
    matches = re.finditer(file_pattern, reply, re.DOTALL)
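
    # For reference, the regex above expects each generated file to appear as:
    #   ### File: <filename>
    #   ```<language>
    #   <code content>
    #   ```
    # Any filename and any (or no) fenced-language tag will match.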
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
        file_count = 0
        for match in matches:
            fname = match.group(1).strip().strip("`")
            code = match.group(2).strip()
            zipf.writestr(fname, code)
            file_count += 1

    if file_count == 0:
        print("Warning: no files written to ZIP. Check LLM output format.")
        print("LLM Reply:\n", reply)
        return jsonify({"error": "No files found in generated output."}), 500

    zip_buffer.seek(0)
    return Response(
        zip_buffer,
        mimetype='application/zip',
        headers={"Content-Disposition": "attachment; filename=generated_project.zip"}
    )
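
# Example request (shell), assuming a local run on port 7860 and a requirements file named spec.txt:
#   curl -X POST http://localhost:7860/chat-stream-doc \
#        -F "file=@spec.txt" -F "frontend=React" -F "backend=Flask" -F "database=PostgreSQL" \
#        -o generated_project.zip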

# Warm-up
if __name__ == "__main__":
    print("Warming up...")
    # Warm up with a small call if you want. Avoid long warmups on Spaces start.
    try:
        _ = generate_full_reply("Hello", [])
    except Exception as e:
        print("Warm-up failed (this can be normal).", type(e).__name__, str(e))

    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))