Update server1.py
Browse files — server1.py (+34 lines, −24 lines)
server1.py
CHANGED
|
@@ -8,15 +8,24 @@ from fastapi import FastAPI, File, UploadFile, Request
|
|
| 8 |
from fastapi.responses import JSONResponse
|
| 9 |
from PIL import Image, UnidentifiedImageError, ImageFile
|
| 10 |
from torchvision import transforms as T
|
| 11 |
-
import open_clip
|
| 12 |
|
| 13 |
ImageFile.LOAD_TRUNCATED_IMAGES = True
|
| 14 |
|
| 15 |
-
# caches (
|
| 16 |
-
CACHE_ROOT = os.environ.get("
|
| 17 |
-
os.
|
| 18 |
-
|
| 19 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 20 |
torch.set_num_threads(1)
|
| 21 |
os.environ["OMP_NUM_THREADS"] = "1"
|
| 22 |
os.environ["MKL_NUM_THREADS"] = "1"
|
|
@@ -26,7 +35,7 @@ DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32
|
|
| 26 |
if DEVICE == "cuda":
|
| 27 |
torch.set_float32_matmul_precision("high")
|
| 28 |
|
| 29 |
-
# rutas a embeddings (
|
| 30 |
MODEL_EMB_PATH = os.getenv("MODEL_EMB_PATH", "text_embeddings_modelos_bigg.pt")
|
| 31 |
VERS_EMB_PATH = os.getenv("VERS_EMB_PATH", "text_embeddings_bigg.pt")
|
| 32 |
|
|
@@ -35,23 +44,22 @@ PRETRAINED = "laion2b_s39b_b160k"
|
|
| 35 |
|
| 36 |
app = FastAPI(title="OpenCLIP bigG Vehicle API")
|
| 37 |
|
| 38 |
-
#
|
| 39 |
clip_model, _, preprocess = open_clip.create_model_and_transforms(MODEL_NAME, pretrained=PRETRAINED)
|
| 40 |
clip_model = clip_model.to(device=DEVICE, dtype=DTYPE).eval()
|
| 41 |
for p in clip_model.parameters():
|
| 42 |
p.requires_grad = False
|
| 43 |
|
| 44 |
-
# tomar mean/std del preprocess y hacer letterbox propio
|
| 45 |
normalize = next(t for t in preprocess.transforms if isinstance(t, T.Normalize))
|
| 46 |
-
SIZE =
|
| 47 |
if isinstance(SIZE, (tuple, list)):
|
| 48 |
-
SIZE = max(SIZE)
|
| 49 |
if SIZE is None:
|
| 50 |
-
SIZE = 448 # valor razonable
|
| 51 |
|
| 52 |
transform = T.Compose([T.ToTensor(), T.Normalize(mean=normalize.mean, std=normalize.std)])
|
| 53 |
|
| 54 |
-
#
|
| 55 |
def resize_letterbox(img: Image.Image, size: int) -> Image.Image:
|
| 56 |
if img.mode != "RGB":
|
| 57 |
img = img.convert("RGB")
|
|
@@ -59,13 +67,13 @@ def resize_letterbox(img: Image.Image, size: int) -> Image.Image:
|
|
| 59 |
if w == 0 or h == 0:
|
| 60 |
raise UnidentifiedImageError("imagen invalida")
|
| 61 |
scale = size / max(w, h)
|
| 62 |
-
nw, nh = max(1, int(w
|
| 63 |
img_resized = img.resize((nw, nh), Image.BICUBIC)
|
| 64 |
canvas = Image.new("RGB", (size, size), (0, 0, 0))
|
| 65 |
-
canvas.paste(img_resized, ((size
|
| 66 |
return canvas
|
| 67 |
|
| 68 |
-
#
|
| 69 |
def _ensure_label_list(x):
|
| 70 |
if isinstance(x, (list, tuple)):
|
| 71 |
return list(x)
|
|
@@ -83,17 +91,17 @@ def _load_embeddings(path: str):
|
|
| 83 |
model_labels, model_embeddings = _load_embeddings(MODEL_EMB_PATH)
|
| 84 |
version_labels, version_embeddings = _load_embeddings(VERS_EMB_PATH)
|
| 85 |
|
| 86 |
-
# comprobar dimension
|
| 87 |
with torch.inference_mode():
|
| 88 |
dummy = torch.zeros(1, 3, SIZE, SIZE, device=DEVICE, dtype=DTYPE)
|
| 89 |
-
img_dim =
|
| 90 |
if model_embeddings.shape[1] != img_dim or version_embeddings.shape[1] != img_dim:
|
| 91 |
raise RuntimeError(
|
| 92 |
f"dimension mismatch: image={img_dim}, modelos={model_embeddings.shape[1]}, "
|
| 93 |
f"versiones={version_embeddings.shape[1]}. Recalcula embeddings con {MODEL_NAME}/{PRETRAINED}."
|
| 94 |
)
|
| 95 |
|
| 96 |
-
#
|
| 97 |
@torch.inference_mode()
|
| 98 |
def _encode_pil(img: Image.Image) -> torch.Tensor:
|
| 99 |
img = resize_letterbox(img, SIZE)
|
|
@@ -107,7 +115,7 @@ def _topk_cosine(text_feats: torch.Tensor, text_labels: List[str], img_feat: tor
|
|
| 107 |
sim = (img_feat.float() @ text_feats.to(img_feat.device).float().T)[0]
|
| 108 |
vals, idxs = torch.topk(sim, k=k)
|
| 109 |
conf = torch.softmax(vals, dim=0)
|
| 110 |
-
return [{"label": text_labels[int(i)], "confidence": round(float(c)
|
| 111 |
|
| 112 |
def process_image_bytes(front_bytes: bytes, back_bytes: Optional[bytes] = None):
|
| 113 |
if not front_bytes or len(front_bytes) < 128:
|
|
@@ -127,7 +135,7 @@ def process_image_bytes(front_bytes: bytes, back_bytes: Optional[bytes] = None):
|
|
| 127 |
else:
|
| 128 |
img_feat = feat_front
|
| 129 |
|
| 130 |
-
# paso 1:
|
| 131 |
top_model = _topk_cosine(model_embeddings, model_labels, img_feat, k=1)[0]
|
| 132 |
modelo_full = top_model["label"]
|
| 133 |
|
|
@@ -135,7 +143,7 @@ def process_image_bytes(front_bytes: bytes, back_bytes: Optional[bytes] = None):
|
|
| 135 |
marca = partes[0] if len(partes) >= 1 else ""
|
| 136 |
modelo = partes[1] if len(partes) == 2 else ""
|
| 137 |
|
| 138 |
-
# paso 2: versiones
|
| 139 |
matches = [(lab, idx) for idx, lab in enumerate(version_labels) if lab.startswith(modelo_full)]
|
| 140 |
if not matches:
|
| 141 |
return {"brand": marca.upper(), "model": modelo.title(), "version": ""}
|
|
@@ -144,7 +152,7 @@ def process_image_bytes(front_bytes: bytes, back_bytes: Optional[bytes] = None):
|
|
| 144 |
labels_sub = [lab for lab, _ in matches]
|
| 145 |
embeds_sub = version_embeddings[idxs]
|
| 146 |
|
| 147 |
-
# paso 3:
|
| 148 |
top_ver = _topk_cosine(embeds_sub, labels_sub, img_feat, k=1)[0]
|
| 149 |
raw = top_ver["label"]
|
| 150 |
prefix = modelo_full + " "
|
|
@@ -155,7 +163,9 @@ def process_image_bytes(front_bytes: bytes, back_bytes: Optional[bytes] = None):
|
|
| 155 |
|
| 156 |
return {"brand": marca.upper(), "model": modelo.title(), "version": ver.title() if ver else ""}
|
| 157 |
|
| 158 |
-
#
|
|
|
|
|
|
|
| 159 |
@app.get("/")
|
| 160 |
def root():
|
| 161 |
return {"status": "ok", "device": DEVICE, "model": f"{MODEL_NAME}/{PRETRAINED}", "img_dim": int(model_embeddings.shape[1])}
|
|
|
|
| 8 |
from fastapi.responses import JSONResponse
|
| 9 |
from PIL import Image, UnidentifiedImageError, ImageFile
|
| 10 |
from torchvision import transforms as T
|
|
|
|
| 11 |
|
| 12 |
ImageFile.LOAD_TRUNCATED_IMAGES = True
|
| 13 |
|
| 14 |
+
# ===== caches (use an app-owned path that is writable at runtime) =====
CACHE_ROOT = os.environ.get("APP_CACHE", "/tmp/appcache")
_hf_cache = os.path.join(CACHE_ROOT, "hf")
_clip_cache = os.path.join(CACHE_ROOT, "open_clip")
_torch_cache = os.path.join(CACHE_ROOT, "torch")
# Point every downloader (HF hub, transformers, open_clip, torch hub) at our cache root.
os.environ["XDG_CACHE_HOME"] = CACHE_ROOT
os.environ["HF_HOME"] = _hf_cache
os.environ["HUGGINGFACE_HUB_CACHE"] = _hf_cache
os.environ["TRANSFORMERS_CACHE"] = _hf_cache
os.environ["OPENCLIP_CACHE_DIR"] = _clip_cache
os.environ["TORCH_HOME"] = _torch_cache
for _cache_dir in (_hf_cache, _clip_cache, _torch_cache):
    os.makedirs(_cache_dir, exist_ok=True)

import open_clip  # deliberately imported only AFTER the cache env vars are set

# ===== basic limits: pin BLAS/torch to a single thread =====
torch.set_num_threads(1)
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
|
|
|
|
| 35 |
if DEVICE == "cuda":
|
| 36 |
torch.set_float32_matmul_precision("high")
|
| 37 |
|
| 38 |
+
# ===== paths to the precomputed text-embedding files (bigG) =====
MODEL_EMB_PATH = os.environ.get("MODEL_EMB_PATH", "text_embeddings_modelos_bigg.pt")
VERS_EMB_PATH = os.environ.get("VERS_EMB_PATH", "text_embeddings_bigg.pt")
|
| 41 |
|
|
|
|
| 44 |
|
| 45 |
app = FastAPI(title="OpenCLIP bigG Vehicle API")
|
| 46 |
|
| 47 |
+
# ===== model / preprocess =====
clip_model, _, preprocess = open_clip.create_model_and_transforms(MODEL_NAME, pretrained=PRETRAINED)
clip_model = clip_model.to(device=DEVICE, dtype=DTYPE).eval()
# Freeze every weight — this process only ever runs inference.
clip_model.requires_grad_(False)
|
| 52 |
|
|
|
|
| 53 |
# Reuse mean/std from the pretrained preprocess pipeline; the letterbox resize is our own.
normalize = next(t for t in preprocess.transforms if isinstance(t, T.Normalize))

# Infer the model's expected input side length from the preprocess transforms.
_raw_size = next((getattr(t, "size", None) for t in preprocess.transforms if hasattr(t, "size")), None)
if isinstance(_raw_size, (tuple, list)):
    _raw_size = max(_raw_size)  # may arrive as (H, W)
SIZE = 448 if _raw_size is None else _raw_size  # 448 is a reasonable fallback

transform = T.Compose([T.ToTensor(), T.Normalize(mean=normalize.mean, std=normalize.std)])
|
| 61 |
|
| 62 |
+
# ===== image utils =====
def resize_letterbox(img: Image.Image, size: int) -> Image.Image:
    """Fit *img* inside a size x size square, keeping aspect ratio, and
    center it on a black canvas (letterbox padding).

    Raises UnidentifiedImageError for degenerate zero-sized images.
    """
    if img.mode != "RGB":
        img = img.convert("RGB")
    # NOTE(review): this assignment is missing from the rendered diff (dropped at a
    # hunk boundary) but w/h are used below — restored here; confirm against the repo.
    w, h = img.size
    if w == 0 or h == 0:
        raise UnidentifiedImageError("imagen invalida")
    scale = size / max(w, h)
    # Clamp to >= 1 px so extreme aspect ratios never produce a zero dimension.
    nw, nh = max(1, int(w*scale)), max(1, int(h*scale))
    img_resized = img.resize((nw, nh), Image.BICUBIC)
    canvas = Image.new("RGB", (size, size), (0, 0, 0))
    canvas.paste(img_resized, ((size-nw)//2, (size-nh)//2))
    return canvas
|
| 75 |
|
| 76 |
+
# ===== cargar embeddings =====
|
| 77 |
def _ensure_label_list(x):
|
| 78 |
if isinstance(x, (list, tuple)):
|
| 79 |
return list(x)
|
|
|
|
| 91 |
model_labels, model_embeddings = _load_embeddings(MODEL_EMB_PATH)
|
| 92 |
version_labels, version_embeddings = _load_embeddings(VERS_EMB_PATH)
|
| 93 |
|
| 94 |
+
# Sanity check: the image-embedding dimension must match the stored text embeddings
# (1280 for the bigG variant).
with torch.inference_mode():
    dummy = torch.zeros(1, 3, SIZE, SIZE, device=DEVICE, dtype=DTYPE)
    img_dim = clip_model.encode_image(dummy).shape[-1]
_dims_ok = model_embeddings.shape[1] == img_dim and version_embeddings.shape[1] == img_dim
if not _dims_ok:
    raise RuntimeError(
        f"dimension mismatch: image={img_dim}, modelos={model_embeddings.shape[1]}, "
        f"versiones={version_embeddings.shape[1]}. Recalcula embeddings con {MODEL_NAME}/{PRETRAINED}."
    )
|
| 103 |
|
| 104 |
+
# ===== inferencia =====
|
| 105 |
@torch.inference_mode()
|
| 106 |
def _encode_pil(img: Image.Image) -> torch.Tensor:
|
| 107 |
img = resize_letterbox(img, SIZE)
|
|
|
|
| 115 |
sim = (img_feat.float() @ text_feats.to(img_feat.device).float().T)[0]
|
| 116 |
vals, idxs = torch.topk(sim, k=k)
|
| 117 |
conf = torch.softmax(vals, dim=0)
|
| 118 |
+
return [{"label": text_labels[int(i)], "confidence": round(float(c)*100.0, 2)} for i, c in zip(idxs, conf)]
|
| 119 |
|
| 120 |
def process_image_bytes(front_bytes: bytes, back_bytes: Optional[bytes] = None):
|
| 121 |
if not front_bytes or len(front_bytes) < 128:
|
|
|
|
| 135 |
else:
|
| 136 |
img_feat = feat_front
|
| 137 |
|
| 138 |
+
# paso 1: modelo
|
| 139 |
top_model = _topk_cosine(model_embeddings, model_labels, img_feat, k=1)[0]
|
| 140 |
modelo_full = top_model["label"]
|
| 141 |
|
|
|
|
| 143 |
marca = partes[0] if len(partes) >= 1 else ""
|
| 144 |
modelo = partes[1] if len(partes) == 2 else ""
|
| 145 |
|
| 146 |
+
# paso 2: versiones por prefijo
|
| 147 |
matches = [(lab, idx) for idx, lab in enumerate(version_labels) if lab.startswith(modelo_full)]
|
| 148 |
if not matches:
|
| 149 |
return {"brand": marca.upper(), "model": modelo.title(), "version": ""}
|
|
|
|
| 152 |
labels_sub = [lab for lab, _ in matches]
|
| 153 |
embeds_sub = version_embeddings[idxs]
|
| 154 |
|
| 155 |
+
# paso 3: version
|
| 156 |
top_ver = _topk_cosine(embeds_sub, labels_sub, img_feat, k=1)[0]
|
| 157 |
raw = top_ver["label"]
|
| 158 |
prefix = modelo_full + " "
|
|
|
|
| 163 |
|
| 164 |
return {"brand": marca.upper(), "model": modelo.title(), "version": ver.title() if ver else ""}
|
| 165 |
|
| 166 |
+
# ===== endpoints =====
# NOTE: `app` is already instantiated once near the top of the module (line 45 of the
# new file); the duplicate `app = FastAPI(...)` that this commit added here would have
# discarded that instance, so it has been removed.

@app.get("/")
def root():
    """Health/info endpoint: reports device, model identifier and embedding dim."""
    return {"status": "ok", "device": DEVICE, "model": f"{MODEL_NAME}/{PRETRAINED}", "img_dim": int(model_embeddings.shape[1])}
|