Spaces:
Sleeping
Sleeping
File size: 6,753 Bytes
2758540 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | """
vision/pipeline.py - Full Vision Inference Pipeline
Orchestrates detection β tracking β ReID β attribute recognition per frame
"""
import time
import base64
import numpy as np
import uuid
from io import BytesIO
from PIL import Image
from typing import Dict, List, Optional, Any
from loguru import logger
from vision.detector import PersonDetector
from vision.tracker import TrackerManager
from vision.reid import PersonReID
from vision.attributes import AttributeRecognizer
from config import settings
class VisionPipeline:
"""
End-to-end vision pipeline for a single frame from a camera.
Components initialized lazily (singletons shared across requests).
"""
def __init__(self):
logger.info("Initializing VisionPipeline...")
self.detector = PersonDetector()
self.tracker_manager = TrackerManager()
self.reid = PersonReID()
self.attributes = AttributeRecognizer()
self._frame_counts: Dict[str, int] = {}
self._fps_timers: Dict[str, List[float]] = {}
logger.info("β
VisionPipeline ready.")
def _decode_image(self, image_input: Any) -> Image.Image:
"""Accept PIL Image, numpy array, bytes, or base64 string."""
if isinstance(image_input, Image.Image):
return image_input.convert("RGB")
if isinstance(image_input, np.ndarray):
return Image.fromarray(image_input).convert("RGB")
if isinstance(image_input, bytes):
return Image.open(BytesIO(image_input)).convert("RGB")
if isinstance(image_input, str):
# base64 encoded
data = base64.b64decode(image_input)
return Image.open(BytesIO(data)).convert("RGB")
raise ValueError(f"Unsupported image type: {type(image_input)}")
def _compute_fps(self, camera_id: str) -> float:
"""Compute rolling FPS over last 30 frames."""
now = time.perf_counter()
if camera_id not in self._fps_timers:
self._fps_timers[camera_id] = []
self._fps_timers[camera_id].append(now)
if len(self._fps_timers[camera_id]) > 30:
self._fps_timers[camera_id].pop(0)
times = self._fps_timers[camera_id]
if len(times) < 2:
return 0.0
return round((len(times) - 1) / (times[-1] - times[0]), 2)
def process_frame(
self,
image_input: Any,
camera_id: str,
run_attributes: bool = True,
run_reid: bool = True,
reid_threshold: float = 0.85,
) -> Dict:
"""
Full pipeline for a single frame.
Args:
image_input: PIL Image | numpy array | bytes | base64 str
camera_id: unique camera identifier
run_attributes: whether to run CLIP attribute recognition
run_reid: whether to run ReID matching
reid_threshold: cosine similarity threshold for ReID
Returns:
Result dict with detections, tracks, reid matches, attributes, and latency breakdown.
"""
t_start = time.perf_counter()
# 1. Decode image
image = self._decode_image(image_input)
w, h = image.size
# 2. Detection
detections, det_ms = self.detector.detect(image)
# 3. Tracking
t_track = time.perf_counter()
tracks = self.tracker_manager.update(camera_id, detections)
track_ms = (time.perf_counter() - t_track) * 1000
# 4. Per-person: ReID + Attributes
persons_data = []
for track in tracks:
bbox = track["bbox"]
# Crop person region
try:
crop = PersonDetector.crop_person(image, bbox)
if crop.width < 10 or crop.height < 10:
continue
except Exception:
continue
person_entry: Dict = {
"track_id": track["track_id"],
"bbox": bbox,
"score": track["score"],
"camera_id": camera_id,
"reid_matches": [],
"attributes": {},
"is_new_person": False,
"assigned_person_id": None,
}
# 4a. ReID β try to match against gallery
if run_reid:
t_reid = time.perf_counter()
reid_matches = self.reid.search(crop, top_k=3, similarity_threshold=reid_threshold)
person_entry["reid_matches"] = reid_matches
person_entry["reid_ms"] = round((time.perf_counter() - t_reid) * 1000, 2)
if reid_matches:
person_entry["assigned_person_id"] = reid_matches[0]["person_id"]
else:
# New person β register in gallery with temporary UUID
new_pid = str(uuid.uuid4())
faiss_id = self.reid.add_person(crop, new_pid, camera_id)
person_entry["assigned_person_id"] = new_pid
person_entry["is_new_person"] = True
person_entry["faiss_id"] = faiss_id
import os
os.makedirs("static/thumbnails", exist_ok=True)
try:
crop.save(f"static/thumbnails/{new_pid}.jpg", "JPEG", quality=85)
except Exception as e:
logger.warning(f"Failed to save thumbnail for {new_pid}: {e}")
# 4b. Attribute recognition
if run_attributes:
t_attr = time.perf_counter()
attrs = self.attributes.recognize(crop)
person_entry["attributes"] = attrs
person_entry["attr_ms"] = round((time.perf_counter() - t_attr) * 1000, 2)
# Also store visual embedding for attribute-based search
if run_reid:
self.attributes.add_to_gallery(crop, person_entry["assigned_person_id"])
persons_data.append(person_entry)
total_ms = (time.perf_counter() - t_start) * 1000
fps = self._compute_fps(camera_id)
self._frame_counts[camera_id] = self._frame_counts.get(camera_id, 0) + 1
return {
"camera_id": camera_id,
"frame_id": self._frame_counts[camera_id],
"image_size": {"width": w, "height": h},
"persons": persons_data,
"person_count": len(persons_data),
"detection_count": len(detections),
"latency": {
"detection_ms": round(det_ms, 2),
"tracking_ms": round(track_ms, 2),
"total_ms": round(total_ms, 2),
},
"fps": fps,
"timestamp": time.time(),
}
|