PhantomEye / core /analytics.py
Abu-Sameer-66's picture
Upload folder using huggingface_hub
0ab99bb verified
import cv2
import time
import numpy as np
from pathlib import Path
from collections import defaultdict
import sys
sys.path.append(str(Path(__file__).resolve().parent.parent))
from core.detection import PersonDetector
from core.tracker import ByteTracker
from config import OUTPUTS_DIR, HEATMAP_ALPHA, DWELL_TIME_THRESHOLD
class BehavioralAnalyzer:
def __init__(self, frame_width: int, frame_height: int, fps: int = 25):
self.width = frame_width
self.height = frame_height
self.fps = fps
self.heatmap = np.zeros((frame_height, frame_width), dtype=np.float32)
self.dwell = defaultdict(int)
self.last_pos = {}
self.alerts = []
self.frame_num = 0
print("[PhantomEye] Behavioral analyzer ready")
def update(self, active_tracks: list):
self.frame_num += 1
for trk in active_tracks:
cx, cy = trk.center()
cx = max(0, min(cx, self.width - 1))
cy = max(0, min(cy, self.height - 1))
cv2.circle(
self.heatmap,
(cx, cy), 25, 1.0, -1
)
self.dwell[trk.track_id] += 1
dwell_secs = self.dwell[trk.track_id] / self.fps
if dwell_secs >= DWELL_TIME_THRESHOLD:
already = any(
a["id"] == trk.track_id and a["type"] == "loitering"
for a in self.alerts
)
if not already:
self.alerts.append({
"type" : "loitering",
"id" : trk.track_id,
"bbox" : trk.bbox,
"dwell_sec": round(dwell_secs, 1),
"frame" : self.frame_num,
})
def get_heatmap_overlay(self, frame: np.ndarray) -> np.ndarray:
if self.heatmap.max() == 0:
return frame
normalized = cv2.normalize(
self.heatmap, None, 0, 255, cv2.NORM_MINMAX
)
heat_uint8 = normalized.astype(np.uint8)
heat_color = cv2.applyColorMap(heat_uint8, cv2.COLORMAP_JET)
blurred = cv2.GaussianBlur(heat_color, (31, 31), 0)
mask = heat_uint8 > 10
output = frame.copy()
output[mask] = cv2.addWeighted(
frame, 1 - HEATMAP_ALPHA,
blurred, HEATMAP_ALPHA, 0
)[mask]
return output
def draw_alerts(self, frame: np.ndarray, active_tracks: list) -> np.ndarray:
out = frame.copy()
active_ids = {t.track_id for t in active_tracks}
recent = [
a for a in self.alerts
if a["id"] in active_ids and a["type"] == "loitering"
]
for alert in recent:
x1, y1, x2, y2 = alert["bbox"]
cv2.rectangle(out, (x1, y1), (x2, y2), (0, 0, 255), 3)
label = f"ALERT ID:{alert['id']} {alert['dwell_sec']}s"
cv2.putText(
out, label,
(x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6,
(0, 0, 255), 2, cv2.LINE_AA
)
return out
def draw_dwell_info(self, frame: np.ndarray, active_tracks: list) -> np.ndarray:
out = frame.copy()
for trk in active_tracks:
x1, y1, x2, y2 = trk.bbox
secs = round(self.dwell[trk.track_id] / self.fps, 1)
cv2.putText(
out,
f"{secs}s",
(x1, y2 + 16),
cv2.FONT_HERSHEY_SIMPLEX, 0.45,
(255, 255, 0), 1, cv2.LINE_AA
)
return out
def summary(self) -> dict:
if not self.dwell:
return {}
dwell_secs = {
k: round(v / self.fps, 1)
for k, v in self.dwell.items()
}
return {
"total_persons" : len(self.dwell),
"total_alerts" : len(self.alerts),
"avg_dwell_sec" : round(
sum(dwell_secs.values()) / len(dwell_secs), 2
),
"max_dwell_sec" : max(dwell_secs.values()),
"loiterers" : [
a["id"] for a in self.alerts
if a["type"] == "loitering"
],
}
def run_analytics(video_path: str, save: bool = True, show: bool = True):
video_path = Path(video_path)
if not video_path.exists():
print(f"[ERROR] Video not found: {video_path}")
return
detector = PersonDetector()
tracker = ByteTracker()
cap = cv2.VideoCapture(str(video_path))
if not cap.isOpened():
print(f"[ERROR] Cannot open: {video_path}")
return
fps = int(cap.get(cv2.CAP_PROP_FPS)) or 25
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
analyzer = BehavioralAnalyzer(width, height, fps)
print(f"[PhantomEye] {video_path.name}{width}x{height} {fps}fps")
writer = None
if save:
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
out_path = OUTPUTS_DIR / (video_path.stem + "_analytics.mp4")
writer = cv2.VideoWriter(
str(out_path),
cv2.VideoWriter_fourcc(*"mp4v"),
fps, (width, height)
)
print(f"[PhantomEye] Saving to: {out_path}")
show_heat = False
start_time = time.time()
while True:
ret, frame = cap.read()
if not ret:
break
detections = detector.detect(frame)
active_tracks = tracker.update(detections)
analyzer.update(active_tracks)
if show_heat:
display = analyzer.get_heatmap_overlay(frame)
else:
display = frame.copy()
display = tracker.draw(display, active_tracks)
display = analyzer.draw_dwell_info(display, active_tracks)
display = analyzer.draw_alerts(display, active_tracks)
mode_text = "MODE: HEATMAP [H]" if show_heat else "MODE: TRACKING [H]"
cv2.putText(
display, mode_text,
(width - 220, height - 12),
cv2.FONT_HERSHEY_SIMPLEX, 0.45,
(200, 200, 200), 1, cv2.LINE_AA
)
if writer:
writer.write(display)
if show:
cv2.imshow("PhantomEye — Analytics [H=heatmap Q=quit]", display)
key = cv2.waitKey(1) & 0xFF
if key == ord("q"):
break
elif key == ord("h"):
show_heat = not show_heat
if tracker.frame_count % 30 == 0:
print(
f"\r[Frame {tracker.frame_count}/{total}]"
f" Active: {len(active_tracks)}"
f" Alerts: {len(analyzer.alerts)}"
f" Time: {time.time()-start_time:.1f}s",
end=""
)
cap.release()
if writer:
writer.release()
cv2.destroyAllWindows()
summary = analyzer.summary()
print(f"\n\n[PhantomEye] Analytics done!")
print(f" Total persons tracked : {summary.get('total_persons', 0)}")
print(f" Avg dwell time : {summary.get('avg_dwell_sec', 0)}s")
print(f" Max dwell time : {summary.get('max_dwell_sec', 0)}s")
print(f" Loitering alerts : {summary.get('total_alerts', 0)}")
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python core/analytics.py <video_path>")
else:
run_analytics(sys.argv[1])