Spaces:
Sleeping
Sleeping
Update tagger.py
Browse files
tagger.py
CHANGED
|
@@ -1,11 +1,10 @@
|
|
| 1 |
from __future__ import annotations
|
| 2 |
|
| 3 |
"""
|
| 4 |
-
|
| 5 |
|
| 6 |
-
-
|
| 7 |
-
-
|
| 8 |
-
- Sidecar: writes ./data/<stem>.json with {"caption","tags","timestamp"}
|
| 9 |
"""
|
| 10 |
|
| 11 |
import os
|
|
@@ -13,42 +12,35 @@ import datetime as _dt
|
|
| 13 |
import json as _json
|
| 14 |
import pathlib as _pl
|
| 15 |
import re as _re
|
| 16 |
-
import
|
| 17 |
-
from typing import List
|
| 18 |
|
| 19 |
import torch
|
| 20 |
from PIL import Image
|
| 21 |
from transformers import BlipForConditionalGeneration, BlipProcessor
|
| 22 |
|
| 23 |
-
#
|
| 24 |
CAP_TAG_DIR = _pl.Path(os.environ.get("CAP_TAG_DIR", "./data")).resolve()
|
| 25 |
CAP_TAG_DIR.mkdir(parents=True, exist_ok=True)
|
| 26 |
|
| 27 |
-
# Device +
|
| 28 |
_device = "cuda" if torch.cuda.is_available() else "cpu"
|
| 29 |
_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
|
| 30 |
_model = BlipForConditionalGeneration.from_pretrained(
|
| 31 |
"Salesforce/blip-image-captioning-base"
|
| 32 |
).to(_device)
|
| 33 |
|
| 34 |
-
#
|
| 35 |
_STOP = {
|
| 36 |
-
"a",
|
| 37 |
-
"of",
|
| 38 |
-
"is",
|
| 39 |
-
"that",
|
| 40 |
-
"up",
|
| 41 |
}
|
| 42 |
|
| 43 |
-
def
|
| 44 |
-
"""
|
| 45 |
-
Convert a caption string to up to k simple tags:
|
| 46 |
-
- lowercase alphanumeric/hyphen tokens
|
| 47 |
-
- remove short/stopword tokens
|
| 48 |
-
- keep first unique occurrences (order-preserving)
|
| 49 |
-
"""
|
| 50 |
tokens = _re.findall(r"[a-z0-9-]+", caption.lower())
|
| 51 |
-
|
| 52 |
for w in tokens:
|
| 53 |
if len(w) <= 2 or w in _STOP:
|
| 54 |
continue
|
|
@@ -64,11 +56,11 @@ def tag_pil_image(
|
|
| 64 |
stem: str,
|
| 65 |
*,
|
| 66 |
top_k: int = 5,
|
| 67 |
-
|
| 68 |
-
|
| 69 |
-
|
| 70 |
-
|
| 71 |
-
|
| 72 |
inputs = _processor(images=img, return_tensors="pt")
|
| 73 |
if _device == "cuda":
|
| 74 |
inputs = {k: v.to(_device) for k, v in inputs.items()}
|
|
@@ -76,23 +68,16 @@ def tag_pil_image(
|
|
| 76 |
ids = _model.generate(**inputs, max_length=30)
|
| 77 |
caption = _processor.decode(ids[0], skip_special_tokens=True)
|
| 78 |
|
| 79 |
-
tags
|
|
|
|
| 80 |
|
|
|
|
| 81 |
payload = {
|
| 82 |
"caption": caption,
|
| 83 |
"tags": tags,
|
| 84 |
"timestamp": _dt.datetime.now(_dt.timezone.utc).isoformat(),
|
| 85 |
}
|
| 86 |
-
(CAP_TAG_DIR / f"{
|
| 87 |
-
return tags
|
| 88 |
|
| 89 |
-
|
| 90 |
-
if len(_sys.argv) < 2:
|
| 91 |
-
_sys.exit("Usage: python tagger.py <image_path> [top_k]")
|
| 92 |
-
path = _pl.Path(_sys.argv[1])
|
| 93 |
-
if not path.exists():
|
| 94 |
-
_sys.exit(f"File not found: {path}")
|
| 95 |
-
k = int(_sys.argv[2]) if len(_sys.argv) > 2 else 5
|
| 96 |
-
with Image.open(path).convert("RGB") as im:
|
| 97 |
-
print("tags:", ", ".join(tag_pil_image(im, path.stem, top_k=k)))
|
| 98 |
|
|
|
|
| 1 |
from __future__ import annotations
|
| 2 |
|
| 3 |
"""
|
| 4 |
+
Caption with BLIP and derive simple tags (no POS/NLTK).
|
| 5 |
|
| 6 |
+
- Tags are first unique non-stopword tokens from the caption.
|
| 7 |
+
- Sidecar saved to ./data/<stem>.json
|
|
|
|
| 8 |
"""
|
| 9 |
|
| 10 |
import os
|
|
|
|
| 12 |
import json as _json
|
| 13 |
import pathlib as _pl
|
| 14 |
import re as _re
|
| 15 |
+
from typing import List, Tuple
|
|
|
|
| 16 |
|
| 17 |
import torch
|
| 18 |
from PIL import Image
|
| 19 |
from transformers import BlipForConditionalGeneration, BlipProcessor
|
| 20 |
|
# Directory for the JSON sidecar files.  Overridable through the CAP_TAG_DIR
# environment variable; the ./data default is a writable location on Spaces.
CAP_TAG_DIR = _pl.Path(os.environ.get("CAP_TAG_DIR", "./data")).resolve()
CAP_TAG_DIR.mkdir(parents=True, exist_ok=True)
|
| 24 |
|
# BLIP checkpoint used for both the processor and the captioning model;
# a single constant keeps the two identifiers from drifting apart.
_BLIP_MODEL_ID = "Salesforce/blip-image-captioning-base"

# Run on GPU when one is available, otherwise fall back to CPU.
_device = "cuda" if torch.cuda.is_available() else "cpu"

# Module-level singletons: downloaded/loaded once at import time and
# reused for every call (loading per call would dominate runtime).
_processor = BlipProcessor.from_pretrained(_BLIP_MODEL_ID)
_model = BlipForConditionalGeneration.from_pretrained(_BLIP_MODEL_ID).to(_device)
| 31 |
|
| 32 |
+
# very small stopword set to clean tags
|
| 33 |
_STOP = {
|
| 34 |
+
"a","an","the","and","or","but","if","then","so","to","from",
|
| 35 |
+
"of","in","on","at","by","for","with","without","into","out",
|
| 36 |
+
"is","are","was","were","be","being","been","it","its","this",
|
| 37 |
+
"that","these","those","as","over","under","near","above","below",
|
| 38 |
+
"up","down","left","right"
|
| 39 |
}
|
| 40 |
|
| 41 |
+
def _caption_to_tags(caption: str, k: int) -> List[str]:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 42 |
tokens = _re.findall(r"[a-z0-9-]+", caption.lower())
|
| 43 |
+
out, seen = [], set()
|
| 44 |
for w in tokens:
|
| 45 |
if len(w) <= 2 or w in _STOP:
|
| 46 |
continue
|
|
|
|
| 56 |
stem: str,
|
| 57 |
*,
|
| 58 |
top_k: int = 5,
|
| 59 |
+
) -> Tuple[str, List[str]]:
|
| 60 |
+
# sanitize stem for filesystem
|
| 61 |
+
safe_stem = _re.sub(r"[^A-Za-z0-9_.-]+", "_", stem) or "upload"
|
| 62 |
+
|
| 63 |
+
# caption
|
| 64 |
inputs = _processor(images=img, return_tensors="pt")
|
| 65 |
if _device == "cuda":
|
| 66 |
inputs = {k: v.to(_device) for k, v in inputs.items()}
|
|
|
|
| 68 |
ids = _model.generate(**inputs, max_length=30)
|
| 69 |
caption = _processor.decode(ids[0], skip_special_tokens=True)
|
| 70 |
|
| 71 |
+
# tags
|
| 72 |
+
tags = _caption_to_tags(caption, top_k)
|
| 73 |
|
| 74 |
+
# sidecar
|
| 75 |
payload = {
|
| 76 |
"caption": caption,
|
| 77 |
"tags": tags,
|
| 78 |
"timestamp": _dt.datetime.now(_dt.timezone.utc).isoformat(),
|
| 79 |
}
|
| 80 |
+
(CAP_TAG_DIR / f"{safe_stem}.json").write_text(_json.dumps(payload, indent=2))
|
|
|
|
| 81 |
|
| 82 |
+
return caption, tags
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 83 |
|