# Video-Analyzer / app.py
# Bluestrikeai's picture
# Create app.py
# 59fde3f verified
import logging
import os
import secrets
import uuid
from contextlib import asynccontextmanager

import torch
from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException, Security, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.security.api_key import APIKeyHeader
from fastapi.templating import Jinja2Templates
from transformers import AutoProcessor, AutoModelForCausalLM
from qwen_vl_utils import process_vision_info
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global model pointers
# Populated by the FastAPI lifespan handler at startup; request handlers
# treat a value of None as "model still initializing" and answer 503.
model = None
processor = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the model once at startup; log cleanup at shutdown.

    Publishes the loaded model/processor through the module-level globals so
    request handlers can reach them. A failed load is logged and leaves both
    globals as ``None``, so the API keeps serving and /api/analyze reports
    503 until the process is restarted (best-effort startup).
    """
    global model, processor
    model_id = "Qwen/Qwen3.5-0.8B"
    logger.info(f"Loading {model_id} on CPU in FP32 precision...")
    try:
        # /api/analyze writes temporary files to uploads/<uuid>.<ext>;
        # make sure the directory exists before the first request arrives.
        os.makedirs("uploads", exist_ok=True)
        # Load strictly on CPU. NOTE(review): the original used float16, but
        # many CPU kernels (e.g. LayerNorm) are not implemented for Half and
        # raise at inference time — float32 is the safe CPU dtype.
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.float32,
            device_map="cpu"
        )
        processor = AutoProcessor.from_pretrained(model_id)
        logger.info("Model successfully loaded to memory.")
    except Exception as e:
        logger.error(f"Error loading model: {e}")
    yield
    logger.info("Cleaning up resources...")
app = FastAPI(title="Qwen3.5 Vision AI Analytics API", lifespan=lifespan)
# Allow CORS for UI access
# NOTE(review): wildcard origins/methods/headers are acceptable for a public
# demo Space; tighten allow_origins before any production deployment.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# Robust API Key Authentication
# Fallback local key for development, use HF Space Secrets in production
API_KEY = os.environ.get("API_KEY", "your-super-secure-api-key-2026")
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)


def get_api_key(api_key: str = Security(api_key_header)):
    """FastAPI dependency validating the ``X-API-Key`` request header.

    Raises:
        HTTPException 401: header missing.
        HTTPException 403: header present but does not match the key.

    Returns the validated key string on success.
    """
    if not api_key:
        raise HTTPException(status_code=401, detail="API Key is missing")
    # Constant-time comparison — a plain != on an untrusted header leaks
    # matching-prefix length through response timing.
    if not secrets.compare_digest(api_key, API_KEY):
        raise HTTPException(status_code=403, detail="Invalid API Key")
    return api_key
templates = Jinja2Templates(directory="templates")


@app.get("/", response_class=HTMLResponse)
async def serve_ui(request: Request):
    """Serve the frontend rendered from ``templates/index.html``."""
    return templates.TemplateResponse("index.html", {"request": request})
@app.post("/api/analyze")
async def analyze_media(
    file: UploadFile = File(...),
    prompt: str = Form(""),
    api_key: str = Security(get_api_key)
):
    """Secured Core Engine for Image & Video Analytics.

    Validates the upload's extension, writes it to a UUID-named temp file,
    feeds it through the Qwen-VL chat template as an image or video, and
    returns the generated report. The temp file is always removed.

    Raises:
        HTTPException 503: model not loaded yet.
        HTTPException 400: unsupported file extension.
        HTTPException 500: any failure during AI processing.
    """
    if model is None or processor is None:
        raise HTTPException(status_code=503, detail="Model is still initializing. Try again shortly.")

    video_exts = {'mp4', 'avi', 'mov', 'mkv', 'webm'}
    image_exts = {'jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp'}

    # Intelligent file routing — validate BEFORE writing to disk so that
    # unsupported (or hostile) filenames are rejected without ever touching
    # the filesystem. ``filename`` may be None on some clients; guard it.
    file_ext = (file.filename or "").rsplit('.', 1)[-1].lower()
    if file_ext in video_exts:
        media_type = "video"
    elif file_ext in image_exts:
        media_type = "image"
    else:
        raise HTTPException(status_code=400, detail="Unsupported file format")

    file_path = None
    try:
        # Isolate and save user files securely with server-generated UUIDs.
        os.makedirs("uploads", exist_ok=True)  # nothing else guarantees the dir exists
        file_path = f"uploads/{uuid.uuid4()}.{file_ext}"
        with open(file_path, "wb") as f:
            f.write(await file.read())

        # Fallback to general reporting feature if no prompt is provided.
        user_prompt = prompt.strip() or (
            "Carefully analyze this media. Generate a highly detailed, "
            "professional report describing the contents, context, dynamics, "
            "and any notable elements."
        )

        # Apply Qwen-VL format: one media item plus the text instruction.
        media_content = {"type": media_type, media_type: f"file://{os.path.abspath(file_path)}"}
        messages = [{"role": "user", "content": [media_content, {"type": "text", "text": user_prompt}]}]
        text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt"
        )
        # Route processing to CPU
        inputs = inputs.to("cpu")

        with torch.no_grad():
            # NOTE(review): original max_new_tokens=242000 would let CPU
            # generation run effectively unbounded; 1024 is a sane report cap.
            generated_ids = model.generate(
                **inputs,
                max_new_tokens=1024,
                temperature=0.7,
                do_sample=True
            )

        # Trim history tokens to get strictly new output.
        generated_ids_trimmed = [
            out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        output_text = processor.batch_decode(
            generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0]
        return JSONResponse(content={"status": "success", "report": output_text})
    except HTTPException:
        # Re-raise deliberate HTTP errors untouched (e.g. the 400 above).
        raise
    except Exception as e:
        logger.error(f"Analysis Error: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal Server Error during AI processing.")
    finally:
        # Guarantee disk cleanup after response is captured.
        if file_path and os.path.exists(file_path):
            os.remove(file_path)