# NOTE: removed scraped-page residue that was not valid Python. The original
# header read: "Mateo's picture / filter detection by filter / 5bd4325"
# (GitHub commit metadata: commit 5bd4325, "filter detection by filter").
import logging
import os
import shutil
import subprocess
import tempfile
import time
from hashlib import sha1
from collections import deque
from contextlib import contextmanager
import cv2
import numpy as np
import streamlit as st
from PIL import Image, ImageDraw
from vision import Classifier
from utils import box_iou, nms
# Module-level logger for this app.
LOGGER = logging.getLogger(__name__)
# Logo shown at the top of the Streamlit page.
PYRONEAR_LOGO_URL = (
    "https://raw.githubusercontent.com/pyronear/pyro-engine/develop/docs/source/_static/img/pyronear-logo-dark.png"
)
# Default parameters for motion-based video segmentation; consumed by
# split_video_into_stable_segments_fast (see that function for usage).
DEFAULT_SPLIT_CFG = {
    "n_samples": 16,  # number of frames sampled uniformly from the video
    "max_w": 400,  # frames resized to this width before motion analysis
    "crop_y": (0.25, 0.90),  # vertical band (fractions of height) kept for analysis
    "dx_threshold_px": 1.5,  # max smoothed |dx| (px) for a frame pair to count as stable
    "min_inlier_ratio": 0.20,  # min RANSAC inlier ratio for a trustworthy motion estimate
    "min_stable_frames": 2,  # min consecutive stable pairs to form a segment
    "smooth_window": 2,  # moving-average window applied to |dx|
    "orb_nfeatures": 800,  # ORB keypoint budget per frame
    "orb_fast_threshold": 12,  # ORB FAST corner threshold
    "min_matches": 25,  # min descriptor matches before estimating motion
    "keep_ratio": 0.4,  # fraction of best matches kept for the affine fit
    "jump_meanabs_threshold": 18.0,  # mean |pixel diff| above this marks a scene jump
    "progress_every": 0,  # print progress every N pairs (0 disables)
}
# Feature flag: run motion segmentation instead of plain uniform sampling.
ENABLE_MOTION_SEGMENTATION = os.getenv("ENABLE_MOTION_SEGMENTATION", "0").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
# Tunables read from the environment, clamped to sane minimums.
FAST_N_SAMPLES = max(1, int(os.getenv("FAST_N_SAMPLES", "12")))  # frames sampled when segmentation is off
INFER_BATCH_SIZE = max(1, int(os.getenv("INFER_BATCH_SIZE", "16")))  # model batch size
MODEL_IMGSZ = max(320, int(os.getenv("MODEL_IMGSZ", "1024")))  # model input image size
MAX_INFER_FRAMES_PER_SPLIT = max(0, int(os.getenv("MAX_INFER_FRAMES_PER_SPLIT", "12")))  # 0 = no cap
MIN_MAIN_MATCH_ABS = max(1, int(os.getenv("MIN_MAIN_MATCH_ABS", "3")))  # min frames matching a combined box
MIN_MAIN_MATCH_RATIO = float(os.getenv("MIN_MAIN_MATCH_RATIO", "0.20"))  # min fraction of frames matching
MAIN_DET_MATCH_IOU_THRESHOLD = float(os.getenv("MAIN_DET_MATCH_IOU_THRESHOLD", "0.12"))  # IoU used when combining
MIN_COMBINED_MEDIAN_CONF = float(os.getenv("MIN_COMBINED_MEDIAN_CONF", "0.12"))  # median-confidence filter
DISPLAY_DET_MATCH_IOU_THRESHOLD = float(os.getenv("DISPLAY_DET_MATCH_IOU_THRESHOLD", "0.0"))  # IoU for display-frame pick
def _log_timing_summary(label, stats, wall_time=None, max_items=12):
    """Log a one-line summary of the timing dict *stats*, slowest steps first.

    Each step is shown as seconds, plus a percentage of wall time when a
    positive wall time is known. At most *max_items* steps are listed; the
    rest are collapsed into a "+N more" suffix.
    """
    if not stats:
        LOGGER.info("%s timing | no data", label)
        return
    # Order the non-None entries by duration, descending.
    ordered = sorted(
        ((key, float(val)) for key, val in stats.items() if val is not None),
        key=lambda pair: pair[1],
        reverse=True,
    )
    if wall_time is None:
        wall_time = stats.get("wall")
    steps = [(key, sec) for key, sec in ordered if key != "wall"]
    pieces = []
    if wall_time is not None:
        pieces.append(f"wall={float(wall_time):.3f}s")
    for key, sec in steps[:max_items]:
        if wall_time and wall_time > 0:
            pct = 100.0 * sec / float(wall_time)
            pieces.append(f"{key}={sec:.3f}s ({pct:.1f}%)")
        else:
            pieces.append(f"{key}={sec:.3f}s")
    hidden = max(0, len(steps) - max_items)
    if hidden:
        pieces.append(f"+{hidden} more")
    LOGGER.info("%s timing | %s", label, " | ".join(pieces))
def _sample_indices(total, n):
if total <= 0:
return []
if total <= n:
return list(range(total))
return np.linspace(0, total - 1, n).astype(int).tolist()
def _format_idx_list(indices, max_items=40):
if not indices:
return "[]"
values = [int(i) for i in indices]
if len(values) <= max_items:
return str(values)
head = values[: max_items // 2]
tail = values[-(max_items // 2) :]
return f"{head} ... {tail} (len={len(values)})"
def _sample_uniform_items(items, n):
n = max(1, int(n))
if len(items) <= n:
return items
indices = np.linspace(0, len(items) - 1, n).astype(int).tolist()
return [items[i] for i in indices]
def _parse_fraction(value):
if not value:
return None
txt = str(value).strip()
if not txt or txt == "0/0":
return None
if "/" in txt:
num, den = txt.split("/", 1)
try:
den_f = float(den)
if den_f == 0:
return None
return float(num) / den_f
except Exception:
return None
try:
return float(txt)
except Exception:
return None
def _probe_total_frames_ffprobe(video_path):
    """Return the total frame count of the first video stream, or None.

    Tries the stream's nb_frames field first; if that is absent or invalid,
    estimates the count as round(avg_frame_rate * duration). Returns None
    when ffprobe is missing or neither method yields a positive value.
    Timing for each probe is logged via _log_timing_summary.
    """
    ffprobe = shutil.which("ffprobe")
    if ffprobe is None:
        return None
    timing = {}
    wall_t0 = time.perf_counter()
    video_name = os.path.basename(video_path)
    # Try direct frame count first.
    cmd = [
        ffprobe,
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=nb_frames",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    with timer("ffprobe_nb_frames", timing):
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if proc.returncode == 0:
        raw = proc.stdout.strip()
        if raw.isdigit():
            val = int(raw)
            if val > 0:
                timing["wall"] = time.perf_counter() - wall_t0
                _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
                return val
    # Fallback: estimate from duration * avg frame rate.
    cmd = [
        ffprobe,
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=avg_frame_rate,duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    with timer("ffprobe_fps_duration", timing):
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if proc.returncode != 0:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
        return None
    lines = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
    if len(lines) < 2:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
        return None
    # ffprobe prints avg_frame_rate then duration, one value per line;
    # both may be fractions like "30000/1001".
    fps = _parse_fraction(lines[0])
    duration = _parse_fraction(lines[1])
    if fps is None or duration is None:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
        return None
    estimate = int(round(fps * duration))
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
    return estimate if estimate > 0 else None
def _probe_duration_ffprobe(video_path):
    """Return the container duration in seconds via ffprobe, or None."""
    ffprobe = shutil.which("ffprobe")
    if ffprobe is None:
        return None
    cmd = [
        ffprobe,
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if result.returncode != 0:
        return None
    # The first non-blank stdout line carries the duration field.
    first_line = next((ln.strip() for ln in result.stdout.splitlines() if ln.strip()), None)
    if first_line is None:
        return None
    parsed = _parse_fraction(first_line)
    if parsed is None or parsed <= 0:
        return None
    return float(parsed)
def _probe_video_size_ffprobe(video_path):
    """Return (width, height) of the first video stream via ffprobe, or None."""
    ffprobe = shutil.which("ffprobe")
    if ffprobe is None:
        return None
    cmd = [
        ffprobe,
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=width,height",
        "-of",
        "csv=p=0:s=x",
        video_path,
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if result.returncode != 0:
        return None
    # Expected stdout shape: "WIDTHxHEIGHT" on the first non-blank line.
    line = next((part.strip() for part in result.stdout.splitlines() if part.strip()), "")
    if "x" not in line:
        return None
    w_txt, _, h_txt = line.partition("x")
    if not (w_txt.isdigit() and h_txt.isdigit()):
        return None
    w, h = int(w_txt), int(h_txt)
    if w <= 0 or h <= 0:
        return None
    return w, h
def _extract_bgr_with_ffmpeg_disk(video_path, n):
    """Extract up to *n* uniformly sampled BGR frames via JPEGs on disk.

    Slow fallback path: ffmpeg's select filter keeps exactly the sampled
    frame indices and writes them as JPEGs into a temp directory, which are
    then read back with OpenCV.

    Raises:
        RuntimeError: if ffmpeg is unavailable, the total frame count cannot
            be probed, or the ffmpeg extraction itself fails.
    """
    ffmpeg = shutil.which("ffmpeg")
    if ffmpeg is None:
        raise RuntimeError("ffmpeg is not available")
    timing = {}
    wall_t0 = time.perf_counter()
    video_name = os.path.basename(video_path)
    with timer("probe_total_frames", timing):
        total = _probe_total_frames_ffprobe(video_path)
    if total is None or total <= 0:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        raise RuntimeError("ffprobe could not determine total frame count")
    with timer("sample_indices", timing):
        indices = _sample_indices(total, int(n))
    if not indices:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return []
    LOGGER.info(
        "Frame extraction | video=%s total_frames=%d n_samples=%d sampled_indices=%s",
        os.path.basename(video_path),
        total,
        len(indices),
        _format_idx_list(indices),
    )
    # Build a select filter matching exactly the sampled frame numbers
    # (the comma must be escaped inside the filter expression).
    select_expr = "+".join(f"eq(n\\,{int(i)})" for i in indices)
    vf = f"select={select_expr}"
    with tempfile.TemporaryDirectory(prefix="ffmpeg_frames_") as tmpdir:
        pattern = os.path.join(tmpdir, "frame_%06d.jpg")
        cmd = [
            ffmpeg,
            "-hide_banner",
            "-loglevel",
            "error",
            "-i",
            video_path,
            "-vf",
            vf,
            "-vsync",
            "vfr",
            "-q:v",
            "2",
            pattern,
        ]
        with timer("ffmpeg_extract", timing):
            proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
        if proc.returncode != 0:
            timing["wall"] = time.perf_counter() - wall_t0
            _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
            raise RuntimeError(proc.stderr.strip() or "ffmpeg extraction failed")
        frames = []
        with timer("read_extracted_images", timing):
            # Filenames are zero-padded, so lexical sort preserves frame order.
            for name in sorted(os.listdir(tmpdir)):
                if not name.lower().endswith(".jpg"):
                    continue
                frame = cv2.imread(os.path.join(tmpdir, name), cv2.IMREAD_COLOR)
                if frame is not None:
                    frames.append(frame)
    LOGGER.info(
        "Frame extraction done | video=%s extracted=%d requested=%d",
        os.path.basename(video_path),
        len(frames),
        len(indices),
    )
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
    return frames
def _extract_bgr_with_ffmpeg(video_path, n):
    """Extract ~*n* uniformly sampled BGR frames with one ffmpeg rawvideo pass.

    Fast path: decode once with an `fps=` filter and read raw BGR24 bytes from
    ffmpeg's stdout, then reshape them into numpy frames. Falls back to the
    slower disk-based extraction whenever the duration, the frame size, or the
    rawvideo pass itself is unavailable or fails.

    Raises:
        RuntimeError: if ffmpeg is not installed (propagated from the fallback
            in failure paths as well).
    """
    ffmpeg = shutil.which("ffmpeg")
    if ffmpeg is None:
        raise RuntimeError("ffmpeg is not available")
    n = max(1, int(n))
    timing = {}
    wall_t0 = time.perf_counter()
    video_name = os.path.basename(video_path)
    with timer("probe_duration", timing):
        duration = _probe_duration_ffprobe(video_path)
    if duration is None or duration <= 0:
        LOGGER.warning("Frame extraction | ffprobe duration unavailable, fallback to disk extraction")
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return frames
    with timer("probe_video_size", timing):
        video_size = _probe_video_size_ffprobe(video_path)
    if video_size is None:
        LOGGER.warning("Frame extraction | ffprobe size unavailable, fallback to disk extraction")
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return frames
    width, height = video_size
    # Bytes per decoded frame in bgr24 (3 bytes per pixel).
    frame_size = int(width) * int(height) * 3
    if frame_size <= 0:
        LOGGER.warning("Frame extraction | invalid frame size, fallback to disk extraction")
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return frames
    # Ask ffmpeg for roughly n frames spread over the whole duration.
    sample_fps = max(1e-6, float(n) / float(duration))
    LOGGER.info(
        "Frame extraction (single ffmpeg/rawvideo) | video=%s duration=%.3fs n_samples=%d fps=%.6f size=%dx%d",
        video_name,
        duration,
        n,
        sample_fps,
        width,
        height,
    )
    cmd = [
        ffmpeg,
        "-hide_banner",
        "-loglevel",
        "error",
        "-i",
        video_path,
        "-vf",
        f"fps={sample_fps:.8f}",
        "-frames:v",
        str(n),
        "-f",
        "rawvideo",
        "-pix_fmt",
        "bgr24",
        "-",
    ]
    with timer("ffmpeg_extract_rawvideo", timing):
        # No text=True here: stdout is binary pixel data.
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
    if proc.returncode != 0 or not proc.stdout:
        LOGGER.warning(
            "Frame extraction rawvideo failed | video=%s err=%s",
            video_name,
            (proc.stderr.decode("utf-8", errors="ignore").strip() if proc.stderr else "no stderr"),
        )
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return frames
    with timer("decode_rawvideo", timing):
        raw = proc.stdout
        # Drop any trailing partial frame before reshaping.
        frame_count = len(raw) // frame_size
        usable_bytes = frame_count * frame_size
        if frame_count > 0 and usable_bytes:
            # Reinterpret the byte stream as (count, H, W, 3) uint8 frames;
            # copy each so frames outlive the parent buffer independently.
            arr = np.frombuffer(raw[:usable_bytes], dtype=np.uint8).reshape(frame_count, height, width, 3)
            frames = [arr[idx].copy() for idx in range(frame_count)]
        else:
            frames = []
    if len(frames) > n:
        frames = _sample_uniform_items(frames, n)
    if not frames:
        LOGGER.warning("Frame extraction | rawvideo mode returned 0 frame, fallback to disk extraction")
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)
    LOGGER.info(
        "Frame extraction done | video=%s extracted=%d requested=%d",
        video_name,
        len(frames),
        n,
    )
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
    return frames
def _extract_with_ffmpeg(video_path, n):
    """Sample *n* frames from the video and return them as PIL RGB images."""
    stats = {}
    start = time.perf_counter()
    with timer("extract_bgr", stats):
        bgr_frames = _extract_bgr_with_ffmpeg(video_path, n)
    with timer("bgr_to_pil", stats):
        # Convert OpenCV BGR arrays to PIL images (RGB channel order).
        images = []
        for bgr in bgr_frames:
            images.append(Image.fromarray(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)))
    stats["wall"] = time.perf_counter() - start
    _log_timing_summary("Extract with ffmpeg", stats, wall_time=stats["wall"])
    return images
def split_video(video_path, n=8):
    """Uniformly sample *n* PIL frames from the video; [] when the path is missing."""
    if not video_path or not os.path.exists(video_path):
        return []
    stats = {}
    start = time.perf_counter()
    with timer("extract_with_ffmpeg", stats):
        sampled = _extract_with_ffmpeg(video_path, n)
    stats["wall"] = time.perf_counter() - start
    _log_timing_summary("split_video", stats, wall_time=stats["wall"])
    return sampled
@contextmanager
def timer(name, stats):
    """Context manager accumulating elapsed wall time into ``stats[name]``.

    Repeated use with the same name adds up across calls. The elapsed time is
    recorded in a ``finally`` clause so it is captured even when the timed
    body raises (the previous version silently dropped the entry on error,
    which skewed the timing summaries for failing steps such as subprocess
    calls).
    """
    t0 = time.perf_counter()
    try:
        yield
    finally:
        stats[name] = stats.get(name, 0.0) + (time.perf_counter() - t0)
def _iter_sampled_frames(video_path, n_samples, sampled_frames=None):
    """Yield (index, BGR frame) pairs, reusing *sampled_frames* when provided."""
    stats = {}
    start = time.perf_counter()
    if sampled_frames is not None:
        with timer("reuse_sampled_frames", stats):
            source = sampled_frames
    else:
        with timer("extract_bgr_with_ffmpeg", stats):
            source = _extract_bgr_with_ffmpeg(video_path, int(n_samples))
    stats["wall"] = time.perf_counter() - start
    _log_timing_summary("Iter sampled frames", stats, wall_time=stats["wall"])
    yield from enumerate(source)
def iter_frames(video_path, n_samples, max_w, crop_y, sampled_frames=None):
    """Yield (index, frame) pairs resized to *max_w* and cropped to the *crop_y* band.

    Args:
        video_path: source video (ignored when sampled_frames is provided).
        n_samples: number of frames to sample when extracting.
        max_w: target width in pixels (<= 0 disables resizing).
        crop_y: (top, bottom) fractions of the frame height to keep, or None.
        sampled_frames: optional pre-extracted BGR frames to reuse.
    """
    timing = {"resize": 0.0, "crop": 0.0}
    wall_t0 = time.perf_counter()
    frame_count = 0
    try:
        for out_idx, frame in _iter_sampled_frames(video_path, n_samples, sampled_frames=sampled_frames):
            frame_count += 1
            proc = frame
            if max_w > 0 and proc.shape[1] != max_w:
                t_resize = time.perf_counter()
                # Resize preserving aspect ratio to the requested width.
                scale = max_w / float(proc.shape[1])
                proc = cv2.resize(
                    proc,
                    (max_w, int(proc.shape[0] * scale)),
                    interpolation=cv2.INTER_AREA,
                )
                timing["resize"] += time.perf_counter() - t_resize
            if crop_y is not None:
                t_crop = time.perf_counter()
                h = proc.shape[0]
                # Clamp fractions to [0, 1] before converting to pixel rows.
                y0 = int(max(0.0, min(1.0, float(crop_y[0]))) * h)
                y1 = int(max(0.0, min(1.0, float(crop_y[1]))) * h)
                if y1 > y0:
                    proc = proc[y0:y1, :]
                timing["crop"] += time.perf_counter() - t_crop
            yield out_idx, proc
    finally:
        # Log the summary even when the consumer stops iterating early.
        timing["wall"] = time.perf_counter() - wall_t0
        LOGGER.info(
            "iter_frames summary | n_samples=%d yielded=%d max_w=%d crop_y=%s",
            int(n_samples),
            frame_count,
            int(max_w),
            crop_y,
        )
        _log_timing_summary("iter_frames", timing, wall_time=timing["wall"])
def quick_jump_score(prev_gray, gray, small_w=160):
    """Mean absolute pixel difference between two gray frames (downscaled)."""
    h, w = prev_gray.shape[:2]
    if w <= small_w:
        a, b = prev_gray, gray
    else:
        # Shrink both frames to small_w wide before differencing — a cheap
        # proxy that is enough to flag hard scene jumps.
        factor = small_w / float(w)
        target = (small_w, int(h * factor))
        a = cv2.resize(prev_gray, target, interpolation=cv2.INTER_AREA)
        b = cv2.resize(gray, target, interpolation=cv2.INTER_AREA)
    return float(np.mean(cv2.absdiff(a, b)))
def estimate_dx_orb_affine(prev_gray, gray, orb, bf, min_matches, keep_ratio, timing_pair):
    """Estimate inter-frame translation with ORB matching + a partial affine fit.

    Returns None when there are too few keypoints/matches or RANSAC fails;
    otherwise a dict with dx/dy, motion scores, the inlier ratio, the match
    count and the 2x3 affine matrix M. Per-step timings are accumulated into
    *timing_pair*.
    """
    with timer("orb_detect_compute", timing_pair):
        kp1, des1 = orb.detectAndCompute(prev_gray, None)
        kp2, des2 = orb.detectAndCompute(gray, None)
    # Need a minimum of keypoints on both sides for a meaningful fit.
    if des1 is None or des2 is None or len(kp1) < 8 or len(kp2) < 8:
        return None
    with timer("bf_match", timing_pair):
        matches = bf.match(des1, des2)
    if len(matches) < min_matches:
        return None
    with timer("match_sort_filter", timing_pair):
        # Keep only the best (lowest-distance) fraction of matches, but never
        # fewer than 8 points.
        matches = sorted(matches, key=lambda m: m.distance)
        keep_n = max(8, int(len(matches) * keep_ratio))
        matches = matches[:keep_n]
        pts1 = np.float32([kp1[m.queryIdx].pt for m in matches])
        pts2 = np.float32([kp2[m.trainIdx].pt for m in matches])
    with timer("ransac_affine", timing_pair):
        M, inliers = cv2.estimateAffinePartial2D(
            pts1,
            pts2,
            method=cv2.RANSAC,
            ransacReprojThreshold=3.0,
            maxIters=1500,
            confidence=0.99,
        )
    if M is None:
        return None
    # Translation components of the estimated affine transform.
    dx = float(M[0, 2])
    dy = float(M[1, 2])
    inlier_ratio = float(np.mean(inliers)) if inliers is not None else 0.0
    return {
        "dx": dx,
        "dy": dy,
        "score_dx": float(abs(dx)),
        "score_px": float(np.hypot(dx, dy)),
        "inlier_ratio": inlier_ratio,
        "matches": len(matches),
        "M": M,
    }
def split_video_into_stable_segments_fast(
    video_path,
    n_samples=16,
    max_w=400,
    crop_y=(0.25, 0.90),
    dx_threshold_px=1.5,
    min_inlier_ratio=0.20,
    min_stable_frames=2,
    smooth_window=2,
    orb_nfeatures=800,
    orb_fast_threshold=12,
    min_matches=25,
    keep_ratio=0.4,
    jump_meanabs_threshold=18.0,
    progress_every=200,
    sampled_frames=None,
):
    """Find ranges of sampled frames where the camera is (nearly) static.

    For each consecutive frame pair: a cheap mean-absolute-difference check
    flags hard scene jumps; otherwise ORB + RANSAC estimates the horizontal
    shift dx. Pairs whose smoothed |dx| stays under dx_threshold_px with
    enough RANSAC inliers are "stable"; runs of at least min_stable_frames
    stable pairs become segments.

    Returns:
        (segments, metrics, smoothed_dx, timings) where segments is a list of
        (start, end) sample-index ranges, metrics holds one dict per frame
        pair, and timings has "total" and "per_pair" timing dicts.
    """
    wall_t0 = time.perf_counter()
    timing_total = {}
    timing_pair = {}
    with timer("setup", timing_total):
        orb = cv2.ORB_create(nfeatures=orb_nfeatures, fastThreshold=orb_fast_threshold)
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
    metrics = []
    prev_gray = None
    frame_count = 0
    with timer("loop_total", timing_total):
        for _, frame in iter_frames(
            video_path,
            n_samples=n_samples,
            max_w=max_w,
            crop_y=crop_y,
            sampled_frames=sampled_frames,
        ):
            frame_count += 1
            with timer("to_gray", timing_total):
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if prev_gray is not None:
                with timer("quick_jump", timing_total):
                    q = quick_jump_score(prev_gray, gray)
                if q >= jump_meanabs_threshold:
                    # Hard scene jump: record an "infinite motion" sentinel so
                    # the pair can never be flagged stable.
                    metrics.append(
                        {
                            "dx": np.nan,
                            "dy": np.nan,
                            "score_dx": 1e9,
                            "score_px": 1e9,
                            "inlier_ratio": 0.0,
                            "matches": 0,
                            "M": None,
                            "quick_jump": q,
                        }
                    )
                else:
                    m = estimate_dx_orb_affine(
                        prev_gray,
                        gray,
                        orb=orb,
                        bf=bf,
                        min_matches=min_matches,
                        keep_ratio=keep_ratio,
                        timing_pair=timing_pair,
                    )
                    if m is None:
                        # Estimation failed: treat the pair as unstable too.
                        metrics.append(
                            {
                                "dx": np.nan,
                                "dy": np.nan,
                                "score_dx": 1e9,
                                "score_px": 1e9,
                                "inlier_ratio": 0.0,
                                "matches": 0,
                                "M": None,
                                "quick_jump": q,
                            }
                        )
                    else:
                        m["quick_jump"] = q
                        metrics.append(m)
                if progress_every and (len(metrics) % progress_every == 0):
                    print(f"processed pairs: {len(metrics)}")
            prev_gray = gray
    if frame_count < 2:
        # Not enough frames to form a single pair; return empty results.
        timing_total["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary("Segmentation total", timing_total, wall_time=timing_total["wall"])
        if timing_pair:
            _log_timing_summary(
                "Segmentation pair internals",
                timing_pair,
                wall_time=max(timing_total.get("loop_total", 0.0), 1e-9),
            )
        return [], metrics, [], {"total": timing_total, "per_pair": timing_pair}
    with timer("post_smooth", timing_total):
        raw_dx = [m["score_dx"] for m in metrics]
        raw_inlier = [m["inlier_ratio"] for m in metrics]
        smoothed_dx = []
        # Moving average of |dx|; a non-finite value resets the window.
        q = deque(maxlen=max(1, int(smooth_window)))
        for v in raw_dx:
            if not np.isfinite(v):
                q.clear()
                smoothed_dx.append(np.nan)
            else:
                q.append(v)
                smoothed_dx.append(float(np.mean(q)))
    with timer("post_segments", timing_total):
        min_len = max(1, int(min_stable_frames))
        stable_flags = []
        for dx_s, r in zip(smoothed_dx, raw_inlier):
            if not np.isfinite(dx_s):
                stable_flags.append(False)
            else:
                stable_flags.append((dx_s < dx_threshold_px) and (r >= min_inlier_ratio))
        # Collect maximal runs of True flags that are at least min_len long.
        segments = []
        start = None
        for i, is_stable in enumerate(stable_flags):
            if is_stable and start is None:
                start = i
            if (not is_stable) and start is not None:
                end = i
                if (end - start) >= min_len:
                    segments.append((start, end))
                start = None
        if start is not None:
            # Close a run that extends to the end of the sequence.
            end = len(stable_flags)
            if (end - start) >= min_len:
                segments.append((start, end))
    LOGGER.info(
        "Segmentation summary | sampled_frames=%d pair_metrics=%d stable_segments=%d",
        frame_count,
        len(metrics),
        len(segments),
    )
    if segments:
        LOGGER.info("Segment ranges (sample indices) | %s", segments)
    timing_total["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("Segmentation total", timing_total, wall_time=timing_total["wall"])
    if timing_pair:
        _log_timing_summary(
            "Segmentation pair internals",
            timing_pair,
            wall_time=max(timing_total.get("loop_total", 0.0), 1e-9),
        )
    return segments, metrics, smoothed_dx, {"total": timing_total, "per_pair": timing_pair}
def _bgr_to_pil(frame):
    """Convert an OpenCV BGR frame to a PIL image (RGB channel order)."""
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return Image.fromarray(rgb)
def extract_segment_frames(video_path, segments, n_samples, sampled_frames=None):
    """Group sampled frames (as PIL images) by the stable segment they fall in.

    Args:
        video_path: source video (ignored when sampled_frames is provided).
        segments: list of (start, end) sample-index ranges; end is treated as
            inclusive here.
        n_samples: sampling budget used if frames must be re-extracted.
        sampled_frames: optional pre-extracted BGR frames to reuse.

    Returns:
        A list of non-empty frame groups, one per segment that matched frames.
    """
    timing = {}
    wall_t0 = time.perf_counter()
    if not segments:
        LOGGER.info("Segment frame extraction | no segments found")
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary("Segment frame extraction", timing, wall_time=timing["wall"])
        return []
    with timer("normalize_segments", timing):
        normalized_segments = []
        for start, end in segments:
            # Clamp to non-negative, non-inverted integer ranges.
            s = max(0, int(start))
            e = max(s, int(end))
            normalized_segments.append((s, e))
    with timer("prepare_groups", timing):
        normalized_segments.sort(key=lambda x: x[0])
        grouped_frames = [[] for _ in normalized_segments]
        grouped_indices = [[] for _ in normalized_segments]
    segment_idx = 0
    # Detection runs on original sampled frames (no resize / no crop).
    to_pil_time = 0.0
    with timer("assign_frames_to_segments", timing):
        for frame_idx, frame in _iter_sampled_frames(video_path, n_samples=n_samples, sampled_frames=sampled_frames):
            # Advance past segments that end before this frame index
            # (segments are sorted, frame indices increase monotonically).
            while segment_idx < len(normalized_segments) and frame_idx > normalized_segments[segment_idx][1]:
                segment_idx += 1
            if segment_idx >= len(normalized_segments):
                break
            seg_start, seg_end = normalized_segments[segment_idx]
            if seg_start <= frame_idx <= seg_end:
                t_pil = time.perf_counter()
                grouped_frames[segment_idx].append(_bgr_to_pil(frame))
                to_pil_time += time.perf_counter() - t_pil
                grouped_indices[segment_idx].append(frame_idx)
    timing["to_pil"] = to_pil_time
    LOGGER.info(
        "Segment frame extraction summary | segments=%d n_samples=%d",
        len(normalized_segments),
        n_samples,
    )
    for seg_i, ((seg_start, seg_end), idx_list, frames) in enumerate(
        zip(normalized_segments, grouped_indices, grouped_frames),
        start=1,
    ):
        LOGGER.info(
            "Segment %d | requested_range=[%d,%d] matched_frames=%d matched_indices=%s",
            seg_i,
            seg_start,
            seg_end,
            len(frames),
            _format_idx_list(idx_list),
        )
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("Segment frame extraction", timing, wall_time=timing["wall"])
    # Drop segments that matched no frames.
    return [frames for frames in grouped_frames if frames]
def split_video_stable(video_path, split_cfg=None, fallback_n=16):
    """Split the video into stable (low-motion) frame groups for inference.

    Runs motion segmentation with DEFAULT_SPLIT_CFG (optionally overridden by
    *split_cfg*). When no stable segment is found, falls back to uniform
    sampling of *fallback_n* frames returned as a single group.

    Returns:
        A list of PIL-frame groups ([] when the path is missing/invalid).
    """
    if not video_path or not os.path.exists(video_path):
        return []
    timing = {}
    wall_t0 = time.perf_counter()
    cfg = DEFAULT_SPLIT_CFG.copy()
    if split_cfg:
        cfg.update(split_cfg)
    LOGGER.info("Split config | %s", cfg)
    with timer("extract_sampled_frames", timing):
        # Sample frames once and reuse them for both segmentation and grouping.
        sampled_frames = _extract_bgr_with_ffmpeg(video_path, int(cfg["n_samples"]))
    with timer("split_video_into_stable_segments_fast", timing):
        segments, _, _, _ = split_video_into_stable_segments_fast(video_path, sampled_frames=sampled_frames, **cfg)
    with timer("extract_segment_frames", timing):
        frame_groups = extract_segment_frames(
            video_path,
            segments,
            n_samples=cfg["n_samples"],
            sampled_frames=sampled_frames,
        )
    if frame_groups:
        LOGGER.info(
            "Split result | stable_splits=%d split_frame_counts=%s",
            len(frame_groups),
            [len(group) for group in frame_groups],
        )
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary("split_video_stable", timing, wall_time=timing["wall"])
        return frame_groups
    LOGGER.info("Split result | no stable segment, using fallback sampling n=%d", fallback_n)
    if int(fallback_n) == int(cfg["n_samples"]):
        with timer("fallback_reuse_sampled_frames", timing):
            # Same sampling budget: just convert the already-extracted frames.
            fallback_frames = [_bgr_to_pil(frame) for frame in sampled_frames]
    else:
        with timer("fallback_split_video", timing):
            fallback_frames = split_video(video_path, n=fallback_n)
    LOGGER.info("Fallback frame count | %d", len(fallback_frames))
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("split_video_stable", timing, wall_time=timing["wall"])
    return [fallback_frames] if fallback_frames else []
@st.cache_resource(show_spinner=False)
def _load_model():
    """Build the ONNX Classifier once; cached across Streamlit reruns.

    NOTE(review): conf=0.05 is the model-side confidence threshold passed to
    Classifier — assumed to be a detection threshold, confirm against vision.Classifier.
    """
    model_t0 = time.perf_counter()
    clf = Classifier(format="onnx", conf=0.05, imgsz=MODEL_IMGSZ)
    LOGGER.info("Model init timing | wall=%.3fs", time.perf_counter() - model_t0)
    LOGGER.info("Model config | imgsz=%d", MODEL_IMGSZ)
    return clf
# Module-level model instance, created at import time (Streamlit caches it).
model = _load_model()
def _resolve_video_path(video_input):
if not video_input:
return None
if isinstance(video_input, str):
return video_input
if isinstance(video_input, dict):
for key in ("name", "path", "data", "video"):
value = video_input.get(key)
if isinstance(value, str) and os.path.exists(value):
return value
if isinstance(video_input, (list, tuple)):
for value in video_input:
if isinstance(value, str) and os.path.exists(value):
return value
return None
def _draw_detections(pil_img, preds, subtitle=None):
    """Render normalized xyxy+conf boxes and a detection counter on a copy of the image."""
    canvas = pil_img.copy()
    painter = ImageDraw.Draw(canvas)
    width, height = canvas.size
    accent = (255, 80, 0)
    boxes = np.asarray(preds)

    def _to_px(value, extent):
        # Clamp the normalized coordinate to [0, 1] then scale to pixels.
        return int(max(0.0, min(1.0, float(value))) * extent)

    for bx1, by1, bx2, by2, conf in boxes:
        px1, py1 = _to_px(bx1, width), _to_px(by1, height)
        px2, py2 = _to_px(bx2, width), _to_px(by2, height)
        painter.rectangle([px1, py1, px2, py2], outline=accent, width=3)
        painter.text((px1 + 4, py1 + 4), f"{conf:.2f}", fill=accent)
    painter.text((6, 6), f"detections : {len(boxes)}", fill=accent)
    if subtitle:
        painter.text((6, 26), subtitle, fill=accent)
    return canvas
def _combine_predictions_per_split(frame_preds):
    """Merge per-frame detections of one split into temporally consistent ones.

    Stacks all frame boxes, runs NMS to get candidate "main" boxes, then
    counts how many frames contain a detection overlapping each candidate
    (IoU >= MAIN_DET_MATCH_IOU_THRESHOLD). Candidates matched by too few
    frames, or with a median matched confidence below
    MIN_COMBINED_MEDIAN_CONF, are dropped.

    Args:
        frame_preds: list of (N_i, 5) float arrays [x1, y1, x2, y2, conf],
            one per frame (possibly empty).

    Returns:
        A list of dicts, one per kept combined detection, with the box, the
        match statistics, and the first matching frame/bbox for display.
    """
    n_frames = len(frame_preds)
    if n_frames == 0:
        return []
    boxes = np.zeros((0, 5), dtype=np.float64)
    for bbox in frame_preds:
        if bbox.size > 0:
            boxes = np.vstack([boxes, bbox])
    if boxes.size == 0:
        return []
    main_bboxes = np.asarray(nms(boxes), dtype=np.float64)
    if main_bboxes.size == 0:
        return []
    n_main = len(main_bboxes)
    # Per-candidate accumulators filled while scanning the frames.
    matches_per_main = np.zeros(n_main, dtype=int)
    conf_max_per_main = np.zeros(n_main, dtype=np.float64)
    matched_conf_values_per_main = [[] for _ in range(n_main)]
    matched_frame_indices_per_main = [[] for _ in range(n_main)]
    first_match_frame_idx_per_main = [None for _ in range(n_main)]
    first_match_bbox_per_main = [None for _ in range(n_main)]
    for frame_idx, bbox in enumerate(frame_preds):
        if bbox.size == 0:
            continue
        # NOTE(review): the axis usage below implies this IoU matrix has one
        # row per main box and one column per frame box — confirm the
        # argument order/orientation against utils.box_iou.
        ious = box_iou(bbox[:, :4], main_bboxes[:, :4])
        match_mask = ious >= MAIN_DET_MATCH_IOU_THRESHOLD
        has_match = match_mask.any(axis=1)
        matches_per_main += has_match.astype(int)
        if np.any(has_match):
            # Keep only one bbox per frame for each main bbox (best IoU among matches).
            masked_ious = np.where(match_mask, ious, -1.0)
            best_idx_per_main = np.argmax(masked_ious, axis=1)
            best_conf_per_main = bbox[best_idx_per_main, 4].astype(np.float64)
            matched_conf = np.where(has_match, best_conf_per_main, 0.0)
            conf_max_per_main = np.maximum(conf_max_per_main, matched_conf)
            for main_idx in np.flatnonzero(has_match):
                matched_conf_values_per_main[main_idx].append(float(best_conf_per_main[main_idx]))
                matched_frame_indices_per_main[main_idx].append(int(frame_idx))
                if first_match_frame_idx_per_main[main_idx] is None:
                    # Remember the earliest matching frame/bbox for display.
                    first_match_frame_idx_per_main[main_idx] = int(frame_idx)
                    first_match_bbox_per_main[main_idx] = np.asarray(
                        bbox[int(best_idx_per_main[main_idx])], dtype=np.float64
                    ).copy()
    # A candidate must be confirmed by enough frames, both in absolute count
    # and as a fraction of the frames in the split.
    required_matches = max(MIN_MAIN_MATCH_ABS, int(np.ceil(float(MIN_MAIN_MATCH_RATIO) * n_frames)))
    keep_main = matches_per_main >= required_matches
    if not np.any(keep_main):
        return []
    kept = []
    for idx in np.flatnonzero(keep_main):
        match_count = int(matches_per_main[idx])
        matched_conf_values = matched_conf_values_per_main[idx]
        median_conf = (
            float(np.median(np.asarray(matched_conf_values, dtype=np.float64))) if matched_conf_values else 0.0
        )
        if median_conf < MIN_COMBINED_MEDIAN_CONF:
            LOGGER.info(
                (
                    "Combine drop candidate | matches=%d/%d (required=%d) | "
                    "median_conf=%.2f < min_combined_median_conf=%.2f"
                ),
                match_count,
                n_frames,
                required_matches,
                median_conf,
                MIN_COMBINED_MEDIAN_CONF,
            )
            continue
        kept.append(
            {
                "box": main_bboxes[idx],
                "match_count": match_count,
                "n_frames": int(n_frames),
                "required_matches": int(required_matches),
                "match_ratio": float(match_count / max(n_frames, 1)),
                "median_conf": median_conf,
                "max_conf": float(conf_max_per_main[idx]),
                "matched_conf_values": matched_conf_values,
                "matched_frame_indices": matched_frame_indices_per_main[idx],
                "first_match_frame_idx": first_match_frame_idx_per_main[idx],
                "first_match_bbox": first_match_bbox_per_main[idx],
            }
        )
    return kept
def infer(video_file):
    """Run the full detection pipeline on an uploaded video.

    Samples frames (motion-segmented or plain uniform depending on
    ENABLE_MOTION_SEGMENTATION), runs the model per split, combines the
    per-frame detections into temporally consistent ones, and renders
    annotated images for display.

    Returns:
        dict with "detections" (annotated images for combined detections) and
        "all_frame_predictions" (per-frame annotated images with captions).
    """
    timing = {}
    wall_t0 = time.perf_counter()
    with timer("resolve_video_path", timing):
        video_path = _resolve_video_path(video_file)
    LOGGER.info("Inference start | video=%s", video_path)
    LOGGER.info(
        (
            "Inference config | batch_size=%d motion_segmentation=%s fast_n_samples=%d "
            "max_infer_frames_per_split=%d min_main_match_abs=%d min_main_match_ratio=%.2f "
            "main_det_match_iou_threshold=%.2f min_combined_median_conf=%.2f "
            "display_det_match_iou_threshold=%.2f"
        ),
        INFER_BATCH_SIZE,
        ENABLE_MOTION_SEGMENTATION,
        FAST_N_SAMPLES,
        MAX_INFER_FRAMES_PER_SPLIT,
        MIN_MAIN_MATCH_ABS,
        MIN_MAIN_MATCH_RATIO,
        MAIN_DET_MATCH_IOU_THRESHOLD,
        MIN_COMBINED_MEDIAN_CONF,
        DISPLAY_DET_MATCH_IOU_THRESHOLD,
    )
    with timer("prepare_splits", timing):
        if ENABLE_MOTION_SEGMENTATION:
            split_frames = split_video_stable(video_path)
        else:
            # Fast mode: one single group of uniformly sampled frames.
            fast_frames = split_video(video_path, n=FAST_N_SAMPLES)
            split_frames = [fast_frames] if fast_frames else []
    total_frames = sum(len(frames) for frames in split_frames)
    LOGGER.info("Inference workload | splits=%d total_frames=%d", len(split_frames), total_frames)
    if not split_frames:
        LOGGER.info("Inference stop | no frames available")
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary("Inference", timing, wall_time=timing["wall"])
        return {"detections": [], "all_frame_predictions": []}
    outputs = []
    all_frame_predictions = []
    # Accumulators feeding the final timing summary.
    infer_model = 0.0
    combine_time = 0.0
    iou_time = 0.0
    draw_time = 0.0
    draw_all_frames_time = 0.0
    split_loop_time = 0.0
    for split_idx, frames in enumerate(split_frames):
        split_t0 = time.perf_counter()
        original_len = len(frames)
        # Cap the number of frames sent to the model per split (0 = no cap).
        if MAX_INFER_FRAMES_PER_SPLIT > 0 and original_len > MAX_INFER_FRAMES_PER_SPLIT:
            frames_for_infer = _sample_uniform_items(frames, MAX_INFER_FRAMES_PER_SPLIT)
        else:
            frames_for_infer = frames
        LOGGER.info(
            "Inference split %d | frames=%d used_for_infer=%d",
            split_idx + 1,
            original_len,
            len(frames_for_infer),
        )
        t_model = time.perf_counter()
        # Prefer batched inference when the model supports it.
        if hasattr(model, "infer_batch"):
            frame_preds = model.infer_batch(frames_for_infer, batch_size=INFER_BATCH_SIZE)
        else:
            frame_preds = [model(frame) for frame in frames_for_infer]
        # Normalize predictions to (N, 5) float arrays [x1, y1, x2, y2, conf].
        frame_preds = [np.asarray(bbox, dtype=np.float64).reshape(-1, 5) for bbox in frame_preds]
        for frame_idx, bbox in enumerate(frame_preds):
            if bbox.size == 0:
                LOGGER.info(
                    "Inference split %d frame %d | detections=0",
                    split_idx + 1,
                    frame_idx + 1,
                )
                continue
            confs = bbox[:, 4].astype(np.float64)
            conf_list_txt = ", ".join(f"{float(c):.2f}" for c in confs.tolist())
            LOGGER.info(
                (
                    "Inference split %d frame %d | detections=%d | confs=[%s] | "
                    "frame_max_conf=%.2f | frame_mean_conf_all_bboxes=%.2f"
                ),
                split_idx + 1,
                frame_idx + 1,
                len(bbox),
                conf_list_txt,
                float(np.max(confs)),
                float(np.mean(confs)),
            )
        for frame_idx, (frame, bbox) in enumerate(zip(frames_for_infer, frame_preds)):
            subtitle = f"segment {split_idx + 1} / frame {frame_idx + 1}"
            t_draw_all = time.perf_counter()
            all_frame_predictions.append(
                {
                    "image": _draw_detections(frame, bbox, subtitle=subtitle),
                    "caption": f"Segment {split_idx + 1} - Frame {frame_idx + 1}",
                }
            )
            draw_all_frames_time += time.perf_counter() - t_draw_all
        # NOTE(review): split_model is measured from before the per-frame
        # logging/drawing loops above, so it also includes their cost.
        split_model = time.perf_counter() - t_model
        infer_model += split_model
        split_iou = 0.0
        split_draw = 0.0
        t_combine = time.perf_counter()
        kept_main = _combine_predictions_per_split(frame_preds)
        dt_combine = time.perf_counter() - t_combine
        combine_time += dt_combine
        LOGGER.info(
            "Inference split %d | combined_detections=%d",
            split_idx + 1,
            len(kept_main),
        )
        for det_idx, det_info in enumerate(kept_main):
            conf_values_txt = ", ".join(f"{float(c):.2f}" for c in det_info["matched_conf_values"])
            frame_indices_txt = ", ".join(str(int(i) + 1) for i in det_info["matched_frame_indices"])
            LOGGER.info(
                (
                    "Inference split %d combined detection %d | matches=%d/%d "
                    "(required=%d, ratio=%.2f) | combine_median_conf=%.2f | combine_max_conf=%.2f | "
                    "matched_frames=[%s] | matched_confs=[%s]"
                ),
                split_idx + 1,
                det_idx + 1,
                det_info["match_count"],
                det_info["n_frames"],
                det_info["required_matches"],
                det_info["match_ratio"],
                det_info["median_conf"],
                det_info["max_conf"],
                frame_indices_txt,
                conf_values_txt,
            )
        if not kept_main:
            # Nothing survived the combination step: log timing and move on.
            split_elapsed = time.perf_counter() - split_t0
            split_loop_time += split_elapsed
            LOGGER.info(
                (
                    "Inference split %d timing | total=%.3fs | model=%.3fs | combine=%.3fs | "
                    "iou=%.3fs | draw=%.3fs | avg_model_ms=%.1f"
                ),
                split_idx + 1,
                split_elapsed,
                split_model,
                dt_combine,
                split_iou,
                split_draw,
                (1000.0 * split_model / max(len(frames_for_infer), 1)),
            )
            continue
        for det_idx, det_info in enumerate(kept_main):
            main_box = det_info["box"]
            selected_frame_idx = None
            selected_bbox = None
            selection_source = None
            # Prefer the earliest frame that overlaps the combined detection, using a relaxed
            # threshold for display (so we show the first visible appearance of the event).
            for frame_idx, bbox in enumerate(frame_preds):
                if bbox.size == 0:
                    continue
                t_iou = time.perf_counter()
                ious = box_iou(bbox[:, :4], main_box[:4].reshape(1, 4))
                dt_iou = time.perf_counter() - t_iou
                split_iou += dt_iou
                iou_time += dt_iou
                if (ious > DISPLAY_DET_MATCH_IOU_THRESHOLD).any():
                    match_idx = int(np.argmax(ious[0]))
                    selected_frame_idx = int(frame_idx)
                    selected_bbox = np.asarray(bbox[match_idx], dtype=np.float64).reshape(1, 5)
                    selection_source = "display_first_overlap"
                    break
            first_match_frame_idx = det_info.get("first_match_frame_idx")
            first_match_bbox = det_info.get("first_match_bbox")
            if selected_frame_idx is None or selected_bbox is None:
                # Fall back to the first frame that matched during combination.
                if (
                    first_match_frame_idx is None
                    or first_match_bbox is None
                    or int(first_match_frame_idx) < 0
                    or int(first_match_frame_idx) >= len(frames_for_infer)
                ):
                    LOGGER.warning(
                        "Inference split %d detection %d | missing display frame and first matched frame/bbox",
                        split_idx + 1,
                        det_idx + 1,
                    )
                    continue
                selected_frame_idx = int(first_match_frame_idx)
                selected_bbox = np.asarray(first_match_bbox, dtype=np.float64).reshape(1, 5)
                selection_source = "combine_first_match_fallback"
            frame = frames_for_infer[selected_frame_idx]
            LOGGER.info(
                (
                    "Inference split %d detection %d | selected_frame=%d | source=%s | "
                    "selected frame_conf=%.2f | combine_median_conf=%.2f | combine_max_conf=%.2f"
                ),
                split_idx + 1,
                det_idx + 1,
                selected_frame_idx + 1,
                selection_source,
                float(selected_bbox[0, 4]),
                det_info["median_conf"],
                det_info["max_conf"],
            )
            subtitle = (
                f"segment {split_idx + 1} / detection {det_idx + 1} | "
                f"frame {selected_frame_idx + 1} | "
                f"matchs {det_info['match_count']}/{det_info['n_frames']} | "
                f"conf_med {det_info['median_conf']:.2f}"
            )
            t_draw = time.perf_counter()
            outputs.append(_draw_detections(frame, selected_bbox, subtitle=subtitle))
            dt_draw = time.perf_counter() - t_draw
            split_draw += dt_draw
            draw_time += dt_draw
        split_elapsed = time.perf_counter() - split_t0
        split_loop_time += split_elapsed
        LOGGER.info(
            (
                "Inference split %d timing | total=%.3fs | model=%.3fs | combine=%.3fs | "
                "iou=%.3fs | draw=%.3fs | avg_model_ms=%.1f"
            ),
            split_idx + 1,
            split_elapsed,
            split_model,
            dt_combine,
            split_iou,
            split_draw,
            (1000.0 * split_model / max(len(frames_for_infer), 1)),
        )
    timing["split_loop"] = split_loop_time
    timing["model_infer"] = infer_model
    timing["combine_predictions"] = combine_time
    timing["iou_matching"] = iou_time
    timing["draw_detections"] = draw_time
    timing["draw_all_frame_predictions"] = draw_all_frames_time
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("Inference", timing, wall_time=timing["wall"])
    LOGGER.info(
        "Inference done | output_images=%d all_frame_prediction_images=%d",
        len(outputs),
        len(all_frame_predictions),
    )
    return {"detections": outputs, "all_frame_predictions": all_frame_predictions}
def _upload_signature(uploaded_file):
buffer = uploaded_file.getbuffer()
size = uploaded_file.size if uploaded_file.size is not None else len(buffer)
digest = sha1(buffer).hexdigest()
return (uploaded_file.name or "uploaded.mp4", int(size), digest)
def _write_uploaded_video(uploaded_file):
ext = os.path.splitext(uploaded_file.name or "")[1] or ".mp4"
with tempfile.NamedTemporaryFile(prefix="upload_", suffix=ext, delete=False) as tmp:
tmp.write(uploaded_file.getbuffer())
return tmp.name
def _render_outputs(outputs):
    """Render inference results in the Streamlit page.

    ``outputs`` is either a plain list of annotated detection images or a dict
    with a ``"detections"`` key (the shape returned by ``infer``). The
    all-frame-predictions gallery was intentionally removed from the UI; only
    the per-detection images are shown.
    """
    detections = outputs.get("detections", []) if isinstance(outputs, dict) else outputs
    if not detections:
        st.warning("Aucune detection d'incendie trouvee dans cette video.")
        return
    st.subheader("Incendies detectes")
    columns = st.columns(2)
    for idx, image in enumerate(detections):
        # Alternate between the two columns to lay detections out as a grid.
        columns[idx % 2].image(image, caption=f"Detection {idx + 1}", use_container_width=True)
def main():
    """Streamlit entry point: upload an MP4 and run fire detection on it.

    Inference runs once per distinct upload (tracked via a content fingerprint
    in ``st.session_state``); results and errors are cached so reruns only
    re-render.
    """
    st.set_page_config(page_title="Detection d'incendies Pyronear", layout="wide")
    st.image(PYRONEAR_LOGO_URL, width=220)
    st.title("Detection d'incendies Pyronear")
    st.write("Televersez un MP4 pour lancer la detection automatiquement.")

    uploaded = st.file_uploader("Televerser un MP4", type=["mp4"])
    if uploaded is None:
        st.info("En attente du televersement d'une video.")
        return

    signature = _upload_signature(uploaded)
    if signature != st.session_state.get("upload_signature"):
        # New upload: run inference once and cache the outcome in session state.
        st.session_state["upload_signature"] = signature
        video_path = None
        with st.spinner("Detection d'incendies en cours..."):
            try:
                video_path = _write_uploaded_video(uploaded)
                st.session_state["output_images"] = infer(video_path)
                st.session_state["inference_error"] = None
            except Exception as exc:
                LOGGER.exception("Inference failed")
                st.session_state["output_images"] = []
                st.session_state["inference_error"] = str(exc)
            finally:
                # Always remove the temporary video, even when inference fails.
                if video_path and os.path.exists(video_path):
                    os.remove(video_path)

    if st.session_state.get("inference_error"):
        st.error(f"Echec de la detection : {st.session_state['inference_error']}")
        return

    _render_outputs(st.session_state.get("output_images", []))
# Run the Streamlit app when this file is executed directly (e.g. via ``streamlit run``).
if __name__ == "__main__":
    main()