import sys
import types
import importlib.util

# Stub out flash_attn before transformers is imported: Florence-2's remote
# code probes for the package at import time, and registering a fake module
# (with a __version__ and a __spec__ so importlib checks pass) presumably
# lets the model load on CPU-only machines without building the CUDA
# extension. The stub is never executed, since attn_implementation="eager"
# is requested when the model is loaded below.
flash_mock = types.ModuleType("flash_attn")
flash_mock.__version__ = "2.0.0"
flash_mock.__spec__ = importlib.util.spec_from_loader("flash_attn", loader=None)
sys.modules["flash_attn"] = flash_mock
sys.modules["flash_attn.flash_attn_interface"] = types.ModuleType("flash_attn.flash_attn_interface")
sys.modules["flash_attn.bert_padding"] = types.ModuleType("flash_attn.bert_padding")

import io
import time
import httpx
import torch
from PIL import Image
from transformers import (
    BlipProcessor, BlipForQuestionAnswering,
    AutoProcessor, AutoModelForCausalLM
)
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from contextlib import asynccontextmanager

BLIP_MODEL_ID = "Salesforce/blip-vqa-base"
FLORENCE_MODEL_ID = "microsoft/Florence-2-large-ft"

# BLIP screening probes; any "yes" answer is treated as possible human content.
QUESTIONS = [
    "is there a person in this image?",
    "is there a woman in this image?",
    "is there a human body part in this image?",
    "is there a hand or arm visible?",
    "is there a face visible?",
    "is there a leg or foot visible?",
    "is there a belly or stomach visible?",
]

FLORENCE_QUESTION = (
    "Is there a woman or any part of a woman's body in this image? "
    "Answer yes or no only."
)

# Model handles, populated at startup by the lifespan handler below.
MODEL_DATA = {}
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Stage 1 model: BLIP VQA.
    print(f"📥 Loading {BLIP_MODEL_ID}...")
    start = time.time()
    MODEL_DATA["blip_processor"] = BlipProcessor.from_pretrained(BLIP_MODEL_ID)
    MODEL_DATA["blip_model"] = BlipForQuestionAnswering.from_pretrained(
        BLIP_MODEL_ID, torch_dtype=torch.float32
    ).eval()
    print(f"✅ BLIP ready in {time.time()-start:.1f}s")

    # Stage 2 model: Florence-2.
    print(f"📥 Loading {FLORENCE_MODEL_ID}...")
    start = time.time()
    MODEL_DATA["florence_processor"] = AutoProcessor.from_pretrained(
        FLORENCE_MODEL_ID, trust_remote_code=True
    )
    MODEL_DATA["florence_model"] = AutoModelForCausalLM.from_pretrained(
        FLORENCE_MODEL_ID,
        torch_dtype=torch.float32,
        trust_remote_code=True,
        attn_implementation="eager"
    ).eval()
    print(f"✅ Florence-2 ready in {time.time()-start:.1f}s")

    yield
    MODEL_DATA.clear()
|
|
app = FastAPI(
    title="AI Shield - Dual Model Detection",
    description="BLIP + Florence-2-large-ft | Compatible with AI Shield Chrome Extension",
    version="6.0.0",
    lifespan=lifespan
)
|
|
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class ImageUrlRequest(BaseModel):
    image_url: str
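
# Example request body for /analyze (illustrative URL):
#   {"image_url": "https://example.com/photo.jpg"}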
|
|
def run_blip(image: Image.Image) -> dict:
    processor = MODEL_DATA["blip_processor"]
    model = MODEL_DATA["blip_model"]
    yes_answers = {}
    no_answers = {}

    for question in QUESTIONS:
        inputs = processor(image, question, return_tensors="pt")
        with torch.no_grad():
            out = model.generate(**inputs, max_new_tokens=5)
        answer = processor.decode(out[0], skip_special_tokens=True).strip().lower()
        # startswith("yes") also matches an exact "yes", so one check suffices.
        if answer.startswith("yes"):
            yes_answers[question] = answer
        else:
            no_answers[question] = answer

    return {"yes": yes_answers, "no": no_answers}
|
|
def run_florence(image: Image.Image) -> dict:
    processor = MODEL_DATA["florence_processor"]
    model = MODEL_DATA["florence_model"]

    task = "<VQA>"
    prompt = f"{task}{FLORENCE_QUESTION}"
    inputs = processor(text=prompt, images=image, return_tensors="pt")

    start = time.time()
    with torch.no_grad():
        generated_ids = model.generate(
            input_ids=inputs["input_ids"],
            pixel_values=inputs["pixel_values"],
            max_new_tokens=10,
            do_sample=False
        )

    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
    parsed = processor.post_process_generation(
        generated_text, task=task,
        image_size=(image.width, image.height)
    )
    answer = parsed.get(task, "").strip().lower()
    elapsed = round(time.time() - start, 2)

    # Fail closed: anything other than an explicit "no" counts as a detection.
    if answer.startswith("no"):
        return {"decision": "ALLOW", "answer": answer, "elapsed": elapsed}
    return {"decision": "BLOCK", "answer": answer, "elapsed": elapsed}
|
|
def process_image(image: Image.Image) -> dict:
    total_start = time.time()

    # Stage 1: BLIP screening.
    blip_start = time.time()
    blip_result = run_blip(image)
    blip_elapsed = round(time.time() - blip_start, 2)

    yes_q = blip_result["yes"]
    no_q = blip_result["no"]

    # Block immediately when BLIP answers "yes" to the direct woman question.
    WOMAN_QUESTIONS = [
        "is there a woman in this image?",
    ]
    woman_detected = any(q in yes_q for q in WOMAN_QUESTIONS)

    if woman_detected:
        return {
            "decision": "BLOCK",
            "reason": "blip_detected_woman_directly",
            "stage": "blip_only",
            "blip_yes": yes_q,
            "blip_no": no_q,
            "blip_time": blip_elapsed,
            "florence_used": False,
            "total_time": round(time.time() - total_start, 2),
            "status": "success"
        }

    # Allow immediately when BLIP saw no person or body part at all.
    if not yes_q:
        return {
            "decision": "ALLOW",
            "reason": "blip_no_human_detected",
            "stage": "blip_only",
            "blip_yes": yes_q,
            "blip_no": no_q,
            "blip_time": blip_elapsed,
            "florence_used": False,
            "total_time": round(time.time() - total_start, 2),
            "status": "success"
        }

    # Stage 2: ambiguous case (person detected but not confirmed as a woman),
    # so defer to Florence-2 for the final decision.
    florence_result = run_florence(image)

    final_decision = florence_result["decision"]
    reason = ("florence_confirmed_woman" if final_decision == "BLOCK"
              else "florence_confirmed_no_woman")

    return {
        "decision": final_decision,
        "reason": reason,
        "stage": "blip_then_florence",
        "blip_yes": yes_q,
        "blip_no": no_q,
        "blip_time": blip_elapsed,
        "florence_answer": florence_result["answer"],
        "florence_time": florence_result["elapsed"],
        "florence_used": True,
        "total_time": round(time.time() - total_start, 2),
        "status": "success"
    }
|
|
@app.get("/health")
def health():
    return {
        "status": "ok",
        "blip_loaded": "blip_model" in MODEL_DATA,
        "florence_loaded": "florence_model" in MODEL_DATA
    }
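
# Illustrative /health response once both models have loaded:
#   {"status": "ok", "blip_loaded": true, "florence_loaded": true}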
|
|
@app.post("/analyze")
async def analyze_from_url(request: ImageUrlRequest):
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.get(request.image_url)
            response.raise_for_status()
            image_bytes = response.content
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to download image: {str(e)}")

    try:
        image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Error reading image: {str(e)}")

    return process_image(image)
|
|
| @app.post("/analyze-file") |
| async def analyze_from_file(file: UploadFile = File(...)): |
| if not file.content_type.startswith("image/"): |
| raise HTTPException(status_code=400, detail="ุงูู
ูู ููุณ ุตูุฑุฉ") |
|
|
| try: |
| image = Image.open(io.BytesIO(await file.read())).convert("RGB") |
| except Exception as e: |
| raise HTTPException(status_code=400, detail=f"ุฎุทุฃ ูู ูุฑุงุกุฉ ุงูุตูุฑุฉ: {str(e)}") |
|
|
| return process_image(image) |
|
|
|
|
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
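
# Example requests once the server is running (illustrative URL and filename):
#   curl -X POST http://localhost:7860/analyze \
#        -H "Content-Type: application/json" \
#        -d '{"image_url": "https://example.com/photo.jpg"}'
#   curl -X POST http://localhost:7860/analyze-file -F "file=@photo.jpg"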