# app.py — single-file FastAPI service: Qwen vision-language analytics API.
import logging
import os
import secrets
import uuid
from contextlib import asynccontextmanager

import torch
from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException, Security, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.security.api_key import APIKeyHeader
from fastapi.templating import Jinja2Templates
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, AutoModelForCausalLM
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Global model pointers | |
| model = None | |
| processor = None | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the model and processor once at startup; log cleanup on shutdown.

    Bug fix: ``asynccontextmanager`` was imported but never applied — FastAPI's
    ``lifespan=`` parameter expects an async context manager factory, so the
    decorator is required for startup/shutdown to run.

    Args:
        app: The FastAPI application this lifespan is attached to (unused
            directly, required by the lifespan protocol).
    """
    global model, processor
    model_id = "Qwen/Qwen3.5-0.8B"
    logger.info("Loading %s on CPU in FP16 precision...", model_id)
    try:
        # Load strictly on CPU in FP16 (not 4-bit).
        # NOTE(review): float16 on CPU is unsupported/slow for many torch ops —
        # confirm this model actually runs in fp16 on CPU, else use float32.
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.float16,
            device_map="cpu",
        )
        processor = AutoProcessor.from_pretrained(model_id)
        logger.info("Model successfully loaded to memory.")
    except Exception as e:
        # Deliberate best-effort: the app still boots and endpoints return 503
        # (see the model-is-None guard in analyze_media) instead of crashing.
        logger.error(f"Error loading model: {e}")
    yield
    logger.info("Cleaning up resources...")
app = FastAPI(title="Qwen3.5 Vision AI Analytics API", lifespan=lifespan)

# Allow CORS for UI access.
# NOTE(review): wildcard origins is acceptable for a public demo, but tighten
# allow_origins to the real frontend host before exposing anything sensitive.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Robust API key authentication.
# Fallback local key for development; set the API_KEY secret in production.
API_KEY = os.environ.get("API_KEY", "your-super-secure-api-key-2026")
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

def get_api_key(api_key: str = Security(api_key_header)):
    """Validate the ``X-API-Key`` request header.

    Returns:
        The validated key, for downstream dependencies.

    Raises:
        HTTPException: 401 when the header is absent, 403 when it mismatches.
    """
    if not api_key:
        raise HTTPException(status_code=401, detail="API Key is missing")
    # Security fix: constant-time comparison defeats timing side-channel
    # key guessing (plain `!=` short-circuits on the first differing byte).
    if not secrets.compare_digest(api_key, API_KEY):
        raise HTTPException(status_code=403, detail="Invalid API Key")
    return api_key
templates = Jinja2Templates(directory="templates")

@app.get("/", response_class=HTMLResponse)
async def serve_ui(request: Request):
    """Serve the frontend (templates/index.html).

    Bug fix: the handler had no route decorator, so FastAPI never registered
    it and the UI was unreachable.
    """
    return templates.TemplateResponse("index.html", {"request": request})
@app.post("/analyze")
async def analyze_media(
    file: UploadFile = File(...),
    prompt: str = Form(""),
    api_key: str = Security(get_api_key)
):
    """Secured core engine for image & video analytics.

    Saves the upload under a random UUID name, routes it into the Qwen-VL
    chat template as an image or video, generates a report on CPU, and
    always removes the temp file afterwards.

    Bug fixes vs. the previous version: the route decorator was missing (the
    endpoint was never registered), the ``uploads/`` directory was never
    created, a missing/extension-less filename crashed into a 500 instead of
    a clean 400, and ``max_new_tokens`` was an effectively unbounded 242000.

    Raises:
        HTTPException: 503 while the model is initializing, 400 on
            unsupported formats, 500 on any other processing failure.
    """
    if model is None or processor is None:
        raise HTTPException(status_code=503, detail="Model is still initializing. Try again shortly.")
    file_path = None
    try:
        # Robust extension parsing: filename may be None or have no dot —
        # both fall through to the 400 branch below instead of crashing.
        filename = file.filename or ""
        file_ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ""
        # Intelligent file routing — validate before touching the disk so
        # unsupported uploads are rejected without a write.
        if file_ext in ('mp4', 'avi', 'mov', 'mkv', 'webm'):
            media_type = "video"
        elif file_ext in ('jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp'):
            media_type = "image"
        else:
            raise HTTPException(status_code=400, detail="Unsupported file format")
        # Isolate user files under UUID names so the original filename can
        # never be abused for path traversal; ensure the target dir exists.
        os.makedirs("uploads", exist_ok=True)
        file_path = f"uploads/{uuid.uuid4()}.{file_ext}"
        with open(file_path, "wb") as f:
            f.write(await file.read())
        # Fall back to a general reporting prompt when none is provided.
        user_prompt = prompt.strip()
        if not user_prompt:
            user_prompt = "Carefully analyze this media. Generate a highly detailed, professional report describing the contents, context, dynamics, and any notable elements."
        # Qwen-VL message format: the media item first, then the instruction.
        media_content = {"type": media_type, media_type: f"file://{os.path.abspath(file_path)}"}
        messages = [{"role": "user", "content": [media_content, {"type": "text", "text": user_prompt}]}]
        text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt"
        )
        # Route processing to CPU (model was loaded with device_map="cpu").
        inputs = inputs.to("cpu")
        with torch.no_grad():
            generated_ids = model.generate(
                **inputs,
                # Was 242000 — effectively unbounded, letting one rambling
                # generation monopolize the CPU for hours. 1024 new tokens is
                # ample for a detailed report.
                max_new_tokens=1024,
                temperature=0.7,
                do_sample=True
            )
        # Strip the prompt tokens so only newly generated text is decoded.
        generated_ids_trimmed = [
            out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        output_text = processor.batch_decode(
            generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0]
        return JSONResponse(content={"status": "success", "report": output_text})
    except HTTPException:
        # Re-raise deliberate HTTP errors (400/503) untouched.
        raise
    except Exception as e:
        logger.error(f"Analysis Error: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal Server Error during AI processing.")
    finally:
        # Guarantee disk cleanup after the response is captured.
        if file_path and os.path.exists(file_path):
            os.remove(file_path)