ANISA09 commited on
Commit
fccd019
·
verified ·
1 Parent(s): 749d298

Create media_utils.py

Browse files
Files changed (1) hide show
  1. media_utils.py +150 -0
media_utils.py ADDED
@@ -0,0 +1,150 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # media_utils.py
2
+ import logging
3
+ import re
4
+ from io import BytesIO
5
+ from typing import Optional, Tuple
6
+ from urllib.parse import urlparse
7
+
8
+ import requests
9
+ from PIL import Image, ExifTags
10
+
11
+ # optional OCR and HF pipelines
12
+ try:
13
+ import pytesseract
14
+ except Exception:
15
+ pytesseract = None
16
+
17
+ from transformers import pipeline
18
+ import os
19
+
20
+ logger = logging.getLogger("media_utils")
21
+
22
+ HF_IMAGE_CAPTION = os.getenv("HF_IMAGE_CAPTION", "nlpconnect/vit-gpt2-image-captioning")
23
+ HF_IMAGE_CLASSIFIER = os.getenv("HF_IMAGE_CLASSIFIER", "google/vit-base-patch16-224")
24
+
25
+ # load models best-effort
26
+ img_caption = None
27
+ image_classifier = None
28
+ try:
29
+ img_caption = pipeline("image-to-text", model=HF_IMAGE_CAPTION)
30
+ logger.info("Loaded image caption pipeline")
31
+ except Exception:
32
+ try:
33
+ img_caption = pipeline("image-captioning", model=HF_IMAGE_CAPTION)
34
+ logger.info("Loaded image caption pipeline (fallback name)")
35
+ except Exception as e:
36
+ logger.warning("Image caption pipeline unavailable: %s", e)
37
+ img_caption = None
38
+
39
+ try:
40
+ image_classifier = pipeline("image-classification", model=HF_IMAGE_CLASSIFIER)
41
+ logger.info("Loaded image-classification pipeline")
42
+ except Exception as e:
43
+ logger.warning("Image classifier unavailable: %s", e)
44
+ image_classifier = None
45
+
46
+ def fetch_image_bytes(url: str, timeout: int = 12) -> Tuple[Optional[Image.Image], Optional[bytes], Optional[str]]:
47
+ headers = {"User-Agent": "Mozilla/5.0", "Referer": urlparse(url).scheme + "://" + (urlparse(url).hostname or "")}
48
+ try:
49
+ r = requests.get(url, timeout=timeout, headers=headers, allow_redirects=True)
50
+ r.raise_for_status()
51
+ b = r.content
52
+ try:
53
+ img = Image.open(BytesIO(b)).convert("RGB")
54
+ return img, b, None
55
+ except Exception as e:
56
+ logger.warning("PIL open failed for %s: %s", url, e)
57
+ return None, b, f"PIL open error: {e}"
58
+ except Exception as e:
59
+ logger.error("fetch_image_bytes failed for %s: %s", url, e)
60
+ return None, None, str(e)
61
+
62
+ def extract_exif(img: Image.Image) -> dict:
63
+ out = {}
64
+ try:
65
+ raw = img._getexif()
66
+ if not raw:
67
+ return {}
68
+ for tag_id, val in raw.items():
69
+ tag = ExifTags.TAGS.get(tag_id, tag_id)
70
+ out[tag] = val
71
+ except Exception:
72
+ pass
73
+ return out
74
+
75
+ def image_ocr_text(img: Image.Image) -> Optional[str]:
76
+ if not pytesseract:
77
+ return None
78
+ try:
79
+ return pytesseract.image_to_string(img).strip()
80
+ except Exception:
81
+ return None
82
+
83
+ def hf_image_caption(img: Image.Image) -> Optional[str]:
84
+ if not img_caption:
85
+ return None
86
+ try:
87
+ out = img_caption(img)
88
+ if isinstance(out, list) and out:
89
+ first = out[0]
90
+ if isinstance(first, dict):
91
+ return first.get("generated_text") or first.get("caption") or str(first)
92
+ return str(first)
93
+ return str(out)
94
+ except Exception:
95
+ logger.exception("image_captioning failed")
96
+ return None
97
+
98
+ def hf_image_classify(img: Image.Image) -> list:
99
+ results = []
100
+ if not image_classifier or not isinstance(img, Image.Image):
101
+ logger.warning("Image classifier unavailable or invalid image")
102
+ return results
103
+ try:
104
+ img_resized = img.resize((224, 224))
105
+ out = image_classifier(img_resized, top_k=3)
106
+ if isinstance(out, list):
107
+ for r in out:
108
+ if isinstance(r, dict):
109
+ results.append({"label": str(r.get("label", "unknown")), "score": float(r.get("score", 0))})
110
+ else:
111
+ results.append({"label": str(r), "score": None})
112
+ except Exception as e:
113
+ logger.exception("image_classify failed: %s", e)
114
+ return results
115
+
116
+ # Video helpers (optional, lightweight)
117
+ def extract_video_keyframes(video_path: str, max_frames: int = 8) -> list:
118
+ """
119
+ Try to extract keyframes using opencv if available.
120
+ Returns list of PIL.Image frames (may be empty if opencv not installed).
121
+ """
122
+ try:
123
+ import cv2
124
+ except Exception:
125
+ logger.info("opencv not available; skipping video frame extraction")
126
+ return []
127
+
128
+ frames = []
129
+ try:
130
+ cap = cv2.VideoCapture(video_path)
131
+ total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
132
+ if total <= 0:
133
+ cap.release()
134
+ return frames
135
+ step = max(1, total // max_frames)
136
+ idx = 0
137
+ while len(frames) < max_frames:
138
+ cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
139
+ ret, frame = cap.read()
140
+ if not ret:
141
+ break
142
+ # convert BGR -> RGB
143
+ frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
144
+ pil = Image.fromarray(frame)
145
+ frames.append(pil)
146
+ idx += step
147
+ cap.release()
148
+ except Exception:
149
+ logger.exception("extract_video_keyframes failed")
150
+ return frames