import logging
import os
import shutil
import subprocess
import tempfile
import time
from hashlib import sha1
from collections import deque
from contextlib import contextmanager

import cv2
import numpy as np
import streamlit as st
from PIL import Image, ImageDraw

from vision import Classifier
from utils import box_iou, nms

LOGGER = logging.getLogger(__name__)
PYRONEAR_LOGO_URL = (
    "https://raw.githubusercontent.com/pyronear/pyro-engine/develop/docs/source/_static/img/pyronear-logo-dark.png"
)

DEFAULT_SPLIT_CFG = {
    "n_samples": 16,
    "max_w": 400,
    "crop_y": (0.25, 0.90),
    "dx_threshold_px": 1.5,
    "min_inlier_ratio": 0.20,
    "min_stable_frames": 2,
    "smooth_window": 2,
    "orb_nfeatures": 800,
    "orb_fast_threshold": 12,
    "min_matches": 25,
    "keep_ratio": 0.4,
    "jump_meanabs_threshold": 18.0,
    "progress_every": 0,
}
ENABLE_MOTION_SEGMENTATION = os.getenv("ENABLE_MOTION_SEGMENTATION", "0").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
FAST_N_SAMPLES = max(1, int(os.getenv("FAST_N_SAMPLES", "12")))
INFER_BATCH_SIZE = max(1, int(os.getenv("INFER_BATCH_SIZE", "16")))
MODEL_IMGSZ = max(320, int(os.getenv("MODEL_IMGSZ", "1024")))
MAX_INFER_FRAMES_PER_SPLIT = max(0, int(os.getenv("MAX_INFER_FRAMES_PER_SPLIT", "12")))
MIN_MAIN_MATCH_ABS = max(1, int(os.getenv("MIN_MAIN_MATCH_ABS", "3")))
MIN_MAIN_MATCH_RATIO = float(os.getenv("MIN_MAIN_MATCH_RATIO", "0.20"))
MAIN_DET_MATCH_IOU_THRESHOLD = float(os.getenv("MAIN_DET_MATCH_IOU_THRESHOLD", "0.12"))
MIN_COMBINED_MEDIAN_CONF = float(os.getenv("MIN_COMBINED_MEDIAN_CONF", "0.12"))
DISPLAY_DET_MATCH_IOU_THRESHOLD = float(os.getenv("DISPLAY_DET_MATCH_IOU_THRESHOLD", "0.0"))


def _log_timing_summary(label, stats, wall_time=None, max_items=12):
    if not stats:
        LOGGER.info("%s timing | no data", label)
        return

    entries = sorted(
        ((name, float(value)) for name, value in stats.items() if value is not None),
        key=lambda item: item[1],
        reverse=True,
    )
    if wall_time is None:
        wall_time = stats.get("wall")

    step_entries = [(name, sec) for name, sec in entries if name != "wall"]
    parts = []
    if wall_time is not None:
        parts.append(f"wall={float(wall_time):.3f}s")

    for name, sec in step_entries[:max_items]:
        if wall_time and wall_time > 0:
            parts.append(f"{name}={sec:.3f}s ({(100.0 * sec / float(wall_time)):.1f}%)")
        else:
            parts.append(f"{name}={sec:.3f}s")

    remaining = max(0, len(step_entries) - max_items)
    if remaining:
        parts.append(f"+{remaining} more")

    LOGGER.info("%s timing | %s", label, " | ".join(parts))


def _sample_indices(total, n):
    if total <= 0:
        return []
    if total <= n:
        return list(range(total))
    return np.linspace(0, total - 1, n).astype(int).tolist()


def _format_idx_list(indices, max_items=40):
    if not indices:
        return "[]"
    values = [int(i) for i in indices]
    if len(values) <= max_items:
        return str(values)
    head = values[: max_items // 2]
    tail = values[-(max_items // 2) :]
    return f"{head} ... {tail} (len={len(values)})"


def _sample_uniform_items(items, n):
    n = max(1, int(n))
    if len(items) <= n:
        return items
    indices = np.linspace(0, len(items) - 1, n).astype(int).tolist()
    return [items[i] for i in indices]


def _parse_fraction(value):
    if not value:
        return None
    txt = str(value).strip()
    if not txt or txt == "0/0":
        return None
    if "/" in txt:
        num, den = txt.split("/", 1)
        try:
            den_f = float(den)
            if den_f == 0:
                return None
            return float(num) / den_f
        except Exception:
            return None
    try:
        return float(txt)
    except Exception:
        return None


def _probe_total_frames_ffprobe(video_path):
    ffprobe = shutil.which("ffprobe")
    if ffprobe is None:
        return None

    timing = {}
    wall_t0 = time.perf_counter()
    video_name = os.path.basename(video_path)

    # Try direct frame count first.
    cmd = [
        ffprobe,
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=nb_frames",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    with timer("ffprobe_nb_frames", timing):
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if proc.returncode == 0:
        raw = proc.stdout.strip()
        if raw.isdigit():
            val = int(raw)
            if val > 0:
                timing["wall"] = time.perf_counter() - wall_t0
                _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
                return val

    # Fallback: estimate from duration * avg frame rate.
    cmd = [
        ffprobe,
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=avg_frame_rate,duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    with timer("ffprobe_fps_duration", timing):
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if proc.returncode != 0:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
        return None

    lines = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
    if len(lines) < 2:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
        return None

    fps = _parse_fraction(lines[0])
    duration = _parse_fraction(lines[1])
    if fps is None or duration is None:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
        return None

    estimate = int(round(fps * duration))
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary(f"ffprobe ({video_name})", timing, wall_time=timing["wall"])
    return estimate if estimate > 0 else None


def _probe_duration_ffprobe(video_path):
    ffprobe = shutil.which("ffprobe")
    if ffprobe is None:
        return None

    cmd = [
        ffprobe,
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if proc.returncode != 0:
        return None

    lines = [line.strip() for line in proc.stdout.splitlines() if line.strip()]
    if not lines:
        return None

    duration = _parse_fraction(lines[0])
    if duration is None or duration <= 0:
        return None
    return float(duration)


def _probe_video_size_ffprobe(video_path):
    ffprobe = shutil.which("ffprobe")
    if ffprobe is None:
        return None

    cmd = [
        ffprobe,
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "csv=p=0:s=x",
        video_path,
    ]
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    if proc.returncode != 0:
        return None

    line = next((txt.strip() for txt in proc.stdout.splitlines() if txt.strip()), "")
    if "x" not in line:
        return None
    left, right = line.split("x", 1)
    if not left.isdigit() or not right.isdigit():
        return None

    width, height = int(left), int(right)
    if width <= 0 or height <= 0:
        return None
    return width, height


def _extract_bgr_with_ffmpeg_disk(video_path, n):
    ffmpeg = shutil.which("ffmpeg")
    if ffmpeg is None:
        raise RuntimeError("ffmpeg is not available")

    timing = {}
    wall_t0 = time.perf_counter()
    video_name = os.path.basename(video_path)

    with timer("probe_total_frames", timing):
        total = _probe_total_frames_ffprobe(video_path)
    if total is None or total <= 0:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        raise RuntimeError("ffprobe could not determine total frame count")

    with timer("sample_indices", timing):
        indices = _sample_indices(total, int(n))
    if not indices:
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return []

    LOGGER.info(
        "Frame extraction | video=%s total_frames=%d n_samples=%d sampled_indices=%s",
        os.path.basename(video_path),
        total,
        len(indices),
        _format_idx_list(indices),
    )
    select_expr = "+".join(f"eq(n\\,{int(i)})" for i in indices)
    vf = f"select={select_expr}"

    with tempfile.TemporaryDirectory(prefix="ffmpeg_frames_") as tmpdir:
        pattern = os.path.join(tmpdir, "frame_%06d.jpg")
        cmd = [
            ffmpeg,
            "-hide_banner",
            "-loglevel", "error",
            "-i", video_path,
            "-vf", vf,
            "-vsync", "vfr",
            "-q:v", "2",
            pattern,
        ]
        with timer("ffmpeg_extract", timing):
            proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
        if proc.returncode != 0:
            timing["wall"] = time.perf_counter() - wall_t0
            _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
            raise RuntimeError(proc.stderr.strip() or "ffmpeg extraction failed")

        frames = []
        with timer("read_extracted_images", timing):
            for name in sorted(os.listdir(tmpdir)):
                if not name.lower().endswith(".jpg"):
                    continue
                frame = cv2.imread(os.path.join(tmpdir, name), cv2.IMREAD_COLOR)
                if frame is not None:
                    frames.append(frame)

        LOGGER.info(
            "Frame extraction done | video=%s extracted=%d requested=%d",
            os.path.basename(video_path),
            len(frames),
            len(indices),
        )
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return frames


def _extract_bgr_with_ffmpeg(video_path, n):
    ffmpeg = shutil.which("ffmpeg")
    if ffmpeg is None:
        raise RuntimeError("ffmpeg is not available")

    n = max(1, int(n))
    timing = {}
    wall_t0 = time.perf_counter()
    video_name = os.path.basename(video_path)

    with timer("probe_duration", timing):
        duration = _probe_duration_ffprobe(video_path)
    if duration is None or duration <= 0:
        LOGGER.warning("Frame extraction | ffprobe duration unavailable, fallback to disk extraction")
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return frames

    with timer("probe_video_size", timing):
        video_size = _probe_video_size_ffprobe(video_path)
    if video_size is None:
        LOGGER.warning("Frame extraction | ffprobe size unavailable, fallback to disk extraction")
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return frames

    width, height = video_size
    frame_size = int(width) * int(height) * 3
    if frame_size <= 0:
        LOGGER.warning("Frame extraction | invalid frame size, fallback to disk extraction")
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return frames

    sample_fps = max(1e-6, float(n) / float(duration))
    LOGGER.info(
        "Frame extraction (single ffmpeg/rawvideo) | video=%s duration=%.3fs n_samples=%d fps=%.6f size=%dx%d",
        video_name,
        duration,
        n,
        sample_fps,
        width,
        height,
    )
    cmd = [
        ffmpeg,
        "-hide_banner",
        "-loglevel", "error",
        "-i", video_path,
        "-vf", f"fps={sample_fps:.8f}",
        "-frames:v", str(n),
        "-f", "rawvideo",
        "-pix_fmt", "bgr24",
        "-",
    ]
    with timer("ffmpeg_extract_rawvideo", timing):
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
    if proc.returncode != 0 or not proc.stdout:
        LOGGER.warning(
            "Frame extraction rawvideo failed | video=%s err=%s",
            video_name,
            (proc.stderr.decode("utf-8", errors="ignore").strip() if proc.stderr else "no stderr"),
        )
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
        return frames

    with timer("decode_rawvideo", timing):
        raw = proc.stdout
        frame_count = len(raw) // frame_size
        usable_bytes = frame_count * frame_size
        if frame_count > 0 and usable_bytes:
            arr = np.frombuffer(raw[:usable_bytes], dtype=np.uint8).reshape(frame_count, height, width, 3)
            frames = [arr[idx].copy() for idx in range(frame_count)]
        else:
            frames = []

    if len(frames) > n:
        frames = _sample_uniform_items(frames, n)
    if not frames:
        LOGGER.warning("Frame extraction | rawvideo mode returned 0 frame, fallback to disk extraction")
        with timer("fallback_disk_extract", timing):
            frames = _extract_bgr_with_ffmpeg_disk(video_path, n)

    LOGGER.info(
        "Frame extraction done | video=%s extracted=%d requested=%d",
        video_name,
        len(frames),
        n,
    )
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary(f"Frame extraction ({video_name})", timing, wall_time=timing["wall"])
    return frames


def _extract_with_ffmpeg(video_path, n):
    timing = {}
    wall_t0 = time.perf_counter()
    with timer("extract_bgr", timing):
        frames = _extract_bgr_with_ffmpeg(video_path, n)
    with timer("bgr_to_pil", timing):
        pil_frames = [Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) for frame in frames]
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("Extract with ffmpeg", timing, wall_time=timing["wall"])
    return pil_frames


def split_video(video_path, n=8):
    if not video_path or not os.path.exists(video_path):
        return []
    timing = {}
    wall_t0 = time.perf_counter()
    with timer("extract_with_ffmpeg", timing):
        frames = _extract_with_ffmpeg(video_path, n)
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("split_video", timing, wall_time=timing["wall"])
    return frames


@contextmanager
def timer(name, stats):
    t0 = time.perf_counter()
    yield
    stats[name] = stats.get(name, 0.0) + (time.perf_counter() - t0)


def _iter_sampled_frames(video_path, n_samples, sampled_frames=None):
    timing = {}
    wall_t0 = time.perf_counter()
    if sampled_frames is None:
        with timer("extract_bgr_with_ffmpeg", timing):
            frames = _extract_bgr_with_ffmpeg(video_path, int(n_samples))
    else:
        with timer("reuse_sampled_frames", timing):
            frames = sampled_frames
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("Iter sampled frames", timing, wall_time=timing["wall"])
    for out_idx, frame in enumerate(frames):
        yield out_idx, frame


def iter_frames(video_path, n_samples, max_w, crop_y, sampled_frames=None):
    timing = {"resize": 0.0, "crop": 0.0}
    wall_t0 = time.perf_counter()
    frame_count = 0
    try:
        for out_idx, frame in _iter_sampled_frames(video_path, n_samples, sampled_frames=sampled_frames):
            frame_count += 1
            proc = frame
            if max_w > 0 and proc.shape[1] != max_w:
                t_resize = time.perf_counter()
                scale = max_w / float(proc.shape[1])
                proc = cv2.resize(
                    proc,
                    (max_w, int(proc.shape[0] * scale)),
                    interpolation=cv2.INTER_AREA,
                )
                timing["resize"] += time.perf_counter() - t_resize

            if crop_y is not None:
                t_crop = time.perf_counter()
                h = proc.shape[0]
                y0 = int(max(0.0, min(1.0, float(crop_y[0]))) * h)
                y1 = int(max(0.0, min(1.0, float(crop_y[1]))) * h)
                if y1 > y0:
                    proc = proc[y0:y1, :]
                timing["crop"] += time.perf_counter() - t_crop

            yield out_idx, proc
    finally:
        timing["wall"] = time.perf_counter() - wall_t0
        LOGGER.info(
            "iter_frames summary | n_samples=%d yielded=%d max_w=%d crop_y=%s",
            int(n_samples),
            frame_count,
            int(max_w),
            crop_y,
        )
        _log_timing_summary("iter_frames", timing, wall_time=timing["wall"])


def quick_jump_score(prev_gray, gray, small_w=160):
    h, w = prev_gray.shape[:2]
    if w > small_w:
        scale = small_w / float(w)
        prev_s = cv2.resize(prev_gray, (small_w, int(h * scale)), interpolation=cv2.INTER_AREA)
        gray_s = cv2.resize(gray, (small_w, int(h * scale)), interpolation=cv2.INTER_AREA)
    else:
        prev_s = prev_gray
        gray_s = gray
    diff = cv2.absdiff(prev_s, gray_s)
    return float(np.mean(diff))


def estimate_dx_orb_affine(prev_gray, gray, orb, bf, min_matches, keep_ratio, timing_pair):
    with timer("orb_detect_compute", timing_pair):
        kp1, des1 = orb.detectAndCompute(prev_gray, None)
        kp2, des2 = orb.detectAndCompute(gray, None)
    if des1 is None or des2 is None or len(kp1) < 8 or len(kp2) < 8:
        return None

    with timer("bf_match", timing_pair):
        matches = bf.match(des1, des2)
    if len(matches) < min_matches:
        return None

    with timer("match_sort_filter", timing_pair):
        matches = sorted(matches, key=lambda m: m.distance)
        keep_n = max(8, int(len(matches) * keep_ratio))
        matches = matches[:keep_n]
        pts1 = np.float32([kp1[m.queryIdx].pt for m in matches])
        pts2 = np.float32([kp2[m.trainIdx].pt for m in matches])

    with timer("ransac_affine", timing_pair):
        M, inliers = cv2.estimateAffinePartial2D(
            pts1,
            pts2,
            method=cv2.RANSAC,
            ransacReprojThreshold=3.0,
            maxIters=1500,
            confidence=0.99,
        )
    if M is None:
        return None

    dx = float(M[0, 2])
    dy = float(M[1, 2])
    inlier_ratio = float(np.mean(inliers)) if inliers is not None else 0.0
    return {
        "dx": dx,
        "dy": dy,
        "score_dx": float(abs(dx)),
        "score_px": float(np.hypot(dx, dy)),
        "inlier_ratio": inlier_ratio,
        "matches": len(matches),
        "M": M,
    }


def split_video_into_stable_segments_fast(
    video_path,
    n_samples=16,
    max_w=400,
    crop_y=(0.25, 0.90),
    dx_threshold_px=1.5,
    min_inlier_ratio=0.20,
    min_stable_frames=2,
    smooth_window=2,
    orb_nfeatures=800,
    orb_fast_threshold=12,
    min_matches=25,
    keep_ratio=0.4,
    jump_meanabs_threshold=18.0,
    progress_every=200,
    sampled_frames=None,
):
    wall_t0 = time.perf_counter()
    timing_total = {}
    timing_pair = {}

    with timer("setup", timing_total):
        orb = cv2.ORB_create(nfeatures=orb_nfeatures, fastThreshold=orb_fast_threshold)
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        metrics = []
        prev_gray = None
        frame_count = 0

    with timer("loop_total", timing_total):
        for _, frame in iter_frames(
            video_path,
            n_samples=n_samples,
            max_w=max_w,
            crop_y=crop_y,
            sampled_frames=sampled_frames,
        ):
            frame_count += 1
            with timer("to_gray", timing_total):
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            if prev_gray is not None:
                with timer("quick_jump", timing_total):
                    q = quick_jump_score(prev_gray, gray)
                if q >= jump_meanabs_threshold:
                    metrics.append(
                        {
                            "dx": np.nan,
                            "dy": np.nan,
                            "score_dx": 1e9,
                            "score_px": 1e9,
                            "inlier_ratio": 0.0,
                            "matches": 0,
                            "M": None,
                            "quick_jump": q,
                        }
                    )
                else:
                    m = estimate_dx_orb_affine(
                        prev_gray,
                        gray,
                        orb=orb,
                        bf=bf,
                        min_matches=min_matches,
                        keep_ratio=keep_ratio,
                        timing_pair=timing_pair,
                    )
                    if m is None:
                        metrics.append(
                            {
                                "dx": np.nan,
                                "dy": np.nan,
                                "score_dx": 1e9,
                                "score_px": 1e9,
                                "inlier_ratio": 0.0,
                                "matches": 0,
                                "M": None,
                                "quick_jump": q,
                            }
                        )
                    else:
                        m["quick_jump"] = q
                        metrics.append(m)

                if progress_every and (len(metrics) % progress_every == 0):
                    print(f"processed pairs: {len(metrics)}")

            prev_gray = gray

    if frame_count < 2:
        timing_total["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary("Segmentation total", timing_total, wall_time=timing_total["wall"])
        if timing_pair:
            _log_timing_summary(
                "Segmentation pair internals",
                timing_pair,
                wall_time=max(timing_total.get("loop_total", 0.0), 1e-9),
            )
        return [], metrics, [], {"total": timing_total, "per_pair": timing_pair}

    with timer("post_smooth", timing_total):
        raw_dx = [m["score_dx"] for m in metrics]
        raw_inlier = [m["inlier_ratio"] for m in metrics]
        smoothed_dx = []
        q = deque(maxlen=max(1, int(smooth_window)))
        for v in raw_dx:
            if not np.isfinite(v):
                q.clear()
                smoothed_dx.append(np.nan)
            else:
                q.append(v)
                smoothed_dx.append(float(np.mean(q)))

    with timer("post_segments", timing_total):
        min_len = max(1, int(min_stable_frames))
        stable_flags = []
        for dx_s, r in zip(smoothed_dx, raw_inlier):
            if not np.isfinite(dx_s):
                stable_flags.append(False)
            else:
                stable_flags.append((dx_s < dx_threshold_px) and (r >= min_inlier_ratio))

        segments = []
        start = None
        for i, is_stable in enumerate(stable_flags):
            if is_stable and start is None:
                start = i
            if (not is_stable) and start is not None:
                end = i
                if (end - start) >= min_len:
                    segments.append((start, end))
                start = None
        if start is not None:
            end = len(stable_flags)
            if (end - start) >= min_len:
                segments.append((start, end))

    LOGGER.info(
        "Segmentation summary | sampled_frames=%d pair_metrics=%d stable_segments=%d",
        frame_count,
        len(metrics),
        len(segments),
    )
    if segments:
        LOGGER.info("Segment ranges (sample indices) | %s", segments)

    timing_total["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("Segmentation total", timing_total, wall_time=timing_total["wall"])
    if timing_pair:
        _log_timing_summary(
            "Segmentation pair internals",
            timing_pair,
            wall_time=max(timing_total.get("loop_total", 0.0), 1e-9),
        )
    return segments, metrics, smoothed_dx, {"total": timing_total, "per_pair": timing_pair}


def _bgr_to_pil(frame):
    return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))


def extract_segment_frames(video_path, segments, n_samples, sampled_frames=None):
    timing = {}
    wall_t0 = time.perf_counter()
    if not segments:
        LOGGER.info("Segment frame extraction | no segments found")
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary("Segment frame extraction", timing, wall_time=timing["wall"])
        return []

    with timer("normalize_segments", timing):
        normalized_segments = []
        for start, end in segments:
            s = max(0, int(start))
            e = max(s, int(end))
            normalized_segments.append((s, e))

    with timer("prepare_groups", timing):
        normalized_segments.sort(key=lambda x: x[0])
        grouped_frames = [[] for _ in normalized_segments]
        grouped_indices = [[] for _ in normalized_segments]
        segment_idx = 0

    # Detection runs on original sampled frames (no resize / no crop).
    to_pil_time = 0.0
    with timer("assign_frames_to_segments", timing):
        for frame_idx, frame in _iter_sampled_frames(video_path, n_samples=n_samples, sampled_frames=sampled_frames):
            while segment_idx < len(normalized_segments) and frame_idx > normalized_segments[segment_idx][1]:
                segment_idx += 1
            if segment_idx >= len(normalized_segments):
                break

            seg_start, seg_end = normalized_segments[segment_idx]
            if seg_start <= frame_idx <= seg_end:
                t_pil = time.perf_counter()
                grouped_frames[segment_idx].append(_bgr_to_pil(frame))
                to_pil_time += time.perf_counter() - t_pil
                grouped_indices[segment_idx].append(frame_idx)
    timing["to_pil"] = to_pil_time

    LOGGER.info(
        "Segment frame extraction summary | segments=%d n_samples=%d",
        len(normalized_segments),
        n_samples,
    )
    for seg_i, ((seg_start, seg_end), idx_list, frames) in enumerate(
        zip(normalized_segments, grouped_indices, grouped_frames),
        start=1,
    ):
        LOGGER.info(
            "Segment %d | requested_range=[%d,%d] matched_frames=%d matched_indices=%s",
            seg_i,
            seg_start,
            seg_end,
            len(frames),
            _format_idx_list(idx_list),
        )

    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("Segment frame extraction", timing, wall_time=timing["wall"])
    return [frames for frames in grouped_frames if frames]


def split_video_stable(video_path, split_cfg=None, fallback_n=16):
    if not video_path or not os.path.exists(video_path):
        return []

    timing = {}
    wall_t0 = time.perf_counter()
    cfg = DEFAULT_SPLIT_CFG.copy()
    if split_cfg:
        cfg.update(split_cfg)
    LOGGER.info("Split config | %s", cfg)

    with timer("extract_sampled_frames", timing):
        sampled_frames = _extract_bgr_with_ffmpeg(video_path, int(cfg["n_samples"]))
    with timer("split_video_into_stable_segments_fast", timing):
        segments, _, _, _ = split_video_into_stable_segments_fast(video_path, sampled_frames=sampled_frames, **cfg)
    with timer("extract_segment_frames", timing):
        frame_groups = extract_segment_frames(
            video_path,
            segments,
            n_samples=cfg["n_samples"],
            sampled_frames=sampled_frames,
        )
    if frame_groups:
        LOGGER.info(
            "Split result | stable_splits=%d split_frame_counts=%s",
            len(frame_groups),
            [len(group) for group in frame_groups],
        )
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary("split_video_stable", timing, wall_time=timing["wall"])
        return frame_groups

    LOGGER.info("Split result | no stable segment, using fallback sampling n=%d", fallback_n)
    if int(fallback_n) == int(cfg["n_samples"]):
        with timer("fallback_reuse_sampled_frames", timing):
            fallback_frames = [_bgr_to_pil(frame) for frame in sampled_frames]
    else:
        with timer("fallback_split_video", timing):
            fallback_frames = split_video(video_path, n=fallback_n)
    LOGGER.info("Fallback frame count | %d", len(fallback_frames))
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("split_video_stable", timing, wall_time=timing["wall"])
    return [fallback_frames] if fallback_frames else []


@st.cache_resource(show_spinner=False)
def _load_model():
    model_t0 = time.perf_counter()
    clf = Classifier(format="onnx", conf=0.05, imgsz=MODEL_IMGSZ)
    LOGGER.info("Model init timing | wall=%.3fs", time.perf_counter() - model_t0)
    LOGGER.info("Model config | imgsz=%d", MODEL_IMGSZ)
    return clf


model = _load_model()


def _resolve_video_path(video_input):
    if not video_input:
        return None
    if isinstance(video_input, str):
        return video_input
    if isinstance(video_input, dict):
        for key in ("name", "path", "data", "video"):
            value = video_input.get(key)
            if isinstance(value, str) and os.path.exists(value):
                return value
    if isinstance(video_input, (list, tuple)):
        for value in video_input:
            if isinstance(value, str) and os.path.exists(value):
                return value
    return None


def _draw_detections(pil_img, preds, subtitle=None):
    img = pil_img.copy()
    draw = ImageDraw.Draw(img)
    width, height = img.size
    color = (255, 80, 0)
    preds = np.asarray(preds)

    for x1, y1, x2, y2, conf in preds:
        x1 = int(max(0.0, min(1.0, float(x1))) * width)
        y1 = int(max(0.0, min(1.0, float(y1))) * height)
        x2 = int(max(0.0, min(1.0, float(x2))) * width)
        y2 = int(max(0.0, min(1.0, float(y2))) * height)
        draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
        draw.text((x1 + 4, y1 + 4), f"{conf:.2f}", fill=color)

    draw.text((6, 6), f"detections : {len(preds)}", fill=color)
    if subtitle:
        draw.text((6, 26), subtitle, fill=color)
    return img


def _combine_predictions_per_split(frame_preds):
    n_frames = len(frame_preds)
    if n_frames == 0:
        return []

    boxes = np.zeros((0, 5), dtype=np.float64)
    for bbox in frame_preds:
        if bbox.size > 0:
            boxes = np.vstack([boxes, bbox])
    if boxes.size == 0:
        return []

    main_bboxes = np.asarray(nms(boxes), dtype=np.float64)
    if main_bboxes.size == 0:
        return []

    n_main = len(main_bboxes)
    matches_per_main = np.zeros(n_main, dtype=int)
    conf_max_per_main = np.zeros(n_main, dtype=np.float64)
    matched_conf_values_per_main = [[] for _ in range(n_main)]
    matched_frame_indices_per_main = [[] for _ in range(n_main)]
    first_match_frame_idx_per_main = [None for _ in range(n_main)]
    first_match_bbox_per_main = [None for _ in range(n_main)]

    for frame_idx, bbox in enumerate(frame_preds):
        if bbox.size == 0:
            continue

        ious = box_iou(bbox[:, :4], main_bboxes[:, :4])
        match_mask = ious >= MAIN_DET_MATCH_IOU_THRESHOLD
        has_match = match_mask.any(axis=1)
        matches_per_main += has_match.astype(int)

        if np.any(has_match):
            # Keep only one bbox per frame for each main bbox (best IoU among matches).
            masked_ious = np.where(match_mask, ious, -1.0)
            best_idx_per_main = np.argmax(masked_ious, axis=1)
            best_conf_per_main = bbox[best_idx_per_main, 4].astype(np.float64)
            matched_conf = np.where(has_match, best_conf_per_main, 0.0)
            conf_max_per_main = np.maximum(conf_max_per_main, matched_conf)
            for main_idx in np.flatnonzero(has_match):
                matched_conf_values_per_main[main_idx].append(float(best_conf_per_main[main_idx]))
                matched_frame_indices_per_main[main_idx].append(int(frame_idx))
                if first_match_frame_idx_per_main[main_idx] is None:
                    first_match_frame_idx_per_main[main_idx] = int(frame_idx)
                    first_match_bbox_per_main[main_idx] = np.asarray(
                        bbox[int(best_idx_per_main[main_idx])], dtype=np.float64
                    ).copy()

    required_matches = max(MIN_MAIN_MATCH_ABS, int(np.ceil(float(MIN_MAIN_MATCH_RATIO) * n_frames)))
    keep_main = matches_per_main >= required_matches
    if not np.any(keep_main):
        return []

    kept = []
    for idx in np.flatnonzero(keep_main):
        match_count = int(matches_per_main[idx])
        matched_conf_values = matched_conf_values_per_main[idx]
        median_conf = (
            float(np.median(np.asarray(matched_conf_values, dtype=np.float64))) if matched_conf_values else 0.0
        )
        if median_conf < MIN_COMBINED_MEDIAN_CONF:
            LOGGER.info(
                (
                    "Combine drop candidate | matches=%d/%d (required=%d) | "
                    "median_conf=%.2f < min_combined_median_conf=%.2f"
                ),
                match_count,
                n_frames,
                required_matches,
                median_conf,
                MIN_COMBINED_MEDIAN_CONF,
            )
            continue
        kept.append(
            {
                "box": main_bboxes[idx],
                "match_count": match_count,
                "n_frames": int(n_frames),
                "required_matches": int(required_matches),
                "match_ratio": float(match_count / max(n_frames, 1)),
                "median_conf": median_conf,
                "max_conf": float(conf_max_per_main[idx]),
                "matched_conf_values": matched_conf_values,
                "matched_frame_indices": matched_frame_indices_per_main[idx],
                "first_match_frame_idx": first_match_frame_idx_per_main[idx],
                "first_match_bbox": first_match_bbox_per_main[idx],
            }
        )
    return kept


def infer(video_file):
    timing = {}
    wall_t0 = time.perf_counter()
    with timer("resolve_video_path", timing):
        video_path = _resolve_video_path(video_file)
    LOGGER.info("Inference start | video=%s", video_path)
    LOGGER.info(
        (
            "Inference config | batch_size=%d motion_segmentation=%s fast_n_samples=%d "
            "max_infer_frames_per_split=%d min_main_match_abs=%d min_main_match_ratio=%.2f "
            "main_det_match_iou_threshold=%.2f min_combined_median_conf=%.2f "
            "display_det_match_iou_threshold=%.2f"
        ),
        INFER_BATCH_SIZE,
        ENABLE_MOTION_SEGMENTATION,
        FAST_N_SAMPLES,
        MAX_INFER_FRAMES_PER_SPLIT,
        MIN_MAIN_MATCH_ABS,
        MIN_MAIN_MATCH_RATIO,
        MAIN_DET_MATCH_IOU_THRESHOLD,
        MIN_COMBINED_MEDIAN_CONF,
        DISPLAY_DET_MATCH_IOU_THRESHOLD,
    )

    with timer("prepare_splits", timing):
        if ENABLE_MOTION_SEGMENTATION:
            split_frames = split_video_stable(video_path)
        else:
            fast_frames = split_video(video_path, n=FAST_N_SAMPLES)
            split_frames = [fast_frames] if fast_frames else []
    total_frames = sum(len(frames) for frames in split_frames)
    LOGGER.info("Inference workload | splits=%d total_frames=%d", len(split_frames), total_frames)

    if not split_frames:
        LOGGER.info("Inference stop | no frames available")
        timing["wall"] = time.perf_counter() - wall_t0
        _log_timing_summary("Inference", timing, wall_time=timing["wall"])
        return {"detections": [], "all_frame_predictions": []}

    outputs = []
    all_frame_predictions = []
    infer_model = 0.0
    combine_time = 0.0
    iou_time = 0.0
    draw_time = 0.0
    draw_all_frames_time = 0.0
    split_loop_time = 0.0

    for split_idx, frames in enumerate(split_frames):
        split_t0 = time.perf_counter()
        original_len = len(frames)
        if MAX_INFER_FRAMES_PER_SPLIT > 0 and original_len > MAX_INFER_FRAMES_PER_SPLIT:
            frames_for_infer = _sample_uniform_items(frames, MAX_INFER_FRAMES_PER_SPLIT)
        else:
            frames_for_infer = frames
        LOGGER.info(
            "Inference split %d | frames=%d used_for_infer=%d",
            split_idx + 1,
            original_len,
            len(frames_for_infer),
        )

        t_model = time.perf_counter()
        if hasattr(model, "infer_batch"):
            frame_preds = model.infer_batch(frames_for_infer, batch_size=INFER_BATCH_SIZE)
        else:
            frame_preds = [model(frame) for frame in frames_for_infer]
        frame_preds = [np.asarray(bbox, dtype=np.float64).reshape(-1, 5) for bbox in frame_preds]

        for frame_idx, bbox in enumerate(frame_preds):
            if bbox.size == 0:
                LOGGER.info(
                    "Inference split %d frame %d | detections=0",
                    split_idx + 1,
                    frame_idx + 1,
                )
                continue
            confs = bbox[:, 4].astype(np.float64)
            conf_list_txt = ", ".join(f"{float(c):.2f}" for c in confs.tolist())
            LOGGER.info(
                (
                    "Inference split %d frame %d | detections=%d | confs=[%s] | "
                    "frame_max_conf=%.2f | frame_mean_conf_all_bboxes=%.2f"
                ),
                split_idx + 1,
                frame_idx + 1,
                len(bbox),
                conf_list_txt,
                float(np.max(confs)),
                float(np.mean(confs)),
            )

        for frame_idx, (frame, bbox) in enumerate(zip(frames_for_infer, frame_preds)):
            subtitle = f"segment {split_idx + 1} / frame {frame_idx + 1}"
            t_draw_all = time.perf_counter()
            all_frame_predictions.append(
                {
                    "image": _draw_detections(frame, bbox, subtitle=subtitle),
                    "caption": f"Segment {split_idx + 1} - Frame {frame_idx + 1}",
                }
            )
            draw_all_frames_time += time.perf_counter() - t_draw_all

        split_model = time.perf_counter() - t_model
        infer_model += split_model
        split_iou = 0.0
        split_draw = 0.0

        t_combine = time.perf_counter()
        kept_main = _combine_predictions_per_split(frame_preds)
        dt_combine = time.perf_counter() - t_combine
        combine_time += dt_combine
        LOGGER.info(
            "Inference split %d | combined_detections=%d",
            split_idx + 1,
            len(kept_main),
        )
        for det_idx, det_info in enumerate(kept_main):
            conf_values_txt = ", ".join(f"{float(c):.2f}" for c in det_info["matched_conf_values"])
            frame_indices_txt = ", ".join(str(int(i) + 1) for i in det_info["matched_frame_indices"])
            LOGGER.info(
                (
                    "Inference split %d combined detection %d | matches=%d/%d "
                    "(required=%d, ratio=%.2f) | combine_median_conf=%.2f | combine_max_conf=%.2f | "
                    "matched_frames=[%s] | matched_confs=[%s]"
                ),
                split_idx + 1,
                det_idx + 1,
                det_info["match_count"],
                det_info["n_frames"],
                det_info["required_matches"],
                det_info["match_ratio"],
                det_info["median_conf"],
                det_info["max_conf"],
                frame_indices_txt,
                conf_values_txt,
            )

        if not kept_main:
            split_elapsed = time.perf_counter() - split_t0
            split_loop_time += split_elapsed
            LOGGER.info(
                (
                    "Inference split %d timing | total=%.3fs | model=%.3fs | combine=%.3fs | "
                    "iou=%.3fs | draw=%.3fs | avg_model_ms=%.1f"
                ),
                split_idx + 1,
                split_elapsed,
                split_model,
                dt_combine,
                split_iou,
                split_draw,
                (1000.0 * split_model / max(len(frames_for_infer), 1)),
            )
            continue

        for det_idx, det_info in enumerate(kept_main):
            main_box = det_info["box"]
            selected_frame_idx = None
            selected_bbox = None
            selection_source = None

            # Prefer the earliest frame that overlaps the combined detection, using a relaxed
            # threshold for display (so we show the first visible appearance of the event).
            for frame_idx, bbox in enumerate(frame_preds):
                if bbox.size == 0:
                    continue
                t_iou = time.perf_counter()
                ious = box_iou(bbox[:, :4], main_box[:4].reshape(1, 4))
                dt_iou = time.perf_counter() - t_iou
                split_iou += dt_iou
                iou_time += dt_iou
                if (ious > DISPLAY_DET_MATCH_IOU_THRESHOLD).any():
                    match_idx = int(np.argmax(ious[0]))
                    selected_frame_idx = int(frame_idx)
                    selected_bbox = np.asarray(bbox[match_idx], dtype=np.float64).reshape(1, 5)
                    selection_source = "display_first_overlap"
                    break

            # Fall back to the first frame matched while combining if no frame passed the display check.
            first_match_frame_idx = det_info.get("first_match_frame_idx")
            first_match_bbox = det_info.get("first_match_bbox")
            if selected_frame_idx is None or selected_bbox is None:
                if (
                    first_match_frame_idx is None
                    or first_match_bbox is None
                    or int(first_match_frame_idx) < 0
                    or int(first_match_frame_idx) >= len(frames_for_infer)
                ):
                    LOGGER.warning(
                        "Inference split %d detection %d | missing display frame and first matched frame/bbox",
                        split_idx + 1,
                        det_idx + 1,
                    )
                    continue
                selected_frame_idx = int(first_match_frame_idx)
                selected_bbox = np.asarray(first_match_bbox, dtype=np.float64).reshape(1, 5)
                selection_source = "combine_first_match_fallback"

            frame = frames_for_infer[selected_frame_idx]
            LOGGER.info(
                (
                    "Inference split %d detection %d | selected_frame=%d | source=%s | "
                    "selected_frame_conf=%.2f | combine_median_conf=%.2f | combine_max_conf=%.2f"
                ),
                split_idx + 1,
                det_idx + 1,
                selected_frame_idx + 1,
                selection_source,
                float(selected_bbox[0, 4]),
                det_info["median_conf"],
                det_info["max_conf"],
            )
            subtitle = (
                f"segment {split_idx + 1} / detection {det_idx + 1} | "
                f"frame {selected_frame_idx + 1} | "
                f"matches {det_info['match_count']}/{det_info['n_frames']} | "
                f"conf_med {det_info['median_conf']:.2f}"
            )
            t_draw = time.perf_counter()
            outputs.append(_draw_detections(frame, selected_bbox, subtitle=subtitle))
            dt_draw = time.perf_counter() - t_draw
            split_draw += dt_draw
            draw_time += dt_draw

        split_elapsed = time.perf_counter() - split_t0
        split_loop_time += split_elapsed
        LOGGER.info(
            (
                "Inference split %d timing | total=%.3fs | model=%.3fs | combine=%.3fs | "
                "iou=%.3fs | draw=%.3fs | avg_model_ms=%.1f"
            ),
            split_idx + 1,
            split_elapsed,
            split_model,
            dt_combine,
            split_iou,
            split_draw,
            (1000.0 * split_model / max(len(frames_for_infer), 1)),
        )

    timing["split_loop"] = split_loop_time
    timing["model_infer"] = infer_model
    timing["combine_predictions"] = combine_time
    timing["iou_matching"] = iou_time
    timing["draw_detections"] = draw_time
    timing["draw_all_frame_predictions"] = draw_all_frames_time
    timing["wall"] = time.perf_counter() - wall_t0
    _log_timing_summary("Inference", timing, wall_time=timing["wall"])
    LOGGER.info(
        "Inference done | output_images=%d all_frame_prediction_images=%d",
        len(outputs),
        len(all_frame_predictions),
    )
    return {"detections": outputs, "all_frame_predictions": all_frame_predictions}


def _upload_signature(uploaded_file):
    # Identify an upload by name, size, and content hash so reruns skip repeated inference.
    buffer = uploaded_file.getbuffer()
    size = uploaded_file.size if uploaded_file.size is not None else len(buffer)
    digest = sha1(buffer).hexdigest()
    return (uploaded_file.name or "uploaded.mp4", int(size), digest)


def _write_uploaded_video(uploaded_file):
    ext = os.path.splitext(uploaded_file.name or "")[1] or ".mp4"
    with tempfile.NamedTemporaryFile(prefix="upload_", suffix=ext, delete=False) as tmp:
        tmp.write(uploaded_file.getbuffer())
    return tmp.name


def _render_outputs(outputs):
    detections = outputs
    all_frame_predictions = []
    if isinstance(outputs, dict):
        detections = outputs.get("detections", [])
        all_frame_predictions = outputs.get("all_frame_predictions", [])

    if not detections:
        st.warning("No fire detected in this video.")
    else:
        st.subheader("Detected fires")
        columns = st.columns(2)
        for idx, image in enumerate(detections):
            columns[idx % 2].image(image, caption=f"Detection {idx + 1}", use_container_width=True)

    # if all_frame_predictions:
    #     with st.expander(
    #         f"Predictions on all sampled frames ({len(all_frame_predictions)})",
    #         expanded=False,
    #     ):
    #         columns = st.columns(2)
    #         for idx, item in enumerate(all_frame_predictions):
    #             image = item["image"] if isinstance(item, dict) else item
    #             caption = (
    #                 item.get("caption", f"Frame {idx + 1}")
    #                 if isinstance(item, dict)
    #                 else f"Frame {idx + 1}"
    #             )
    #             columns[idx % 2].image(image, caption=caption, use_container_width=True)


def main():
    st.set_page_config(page_title="Pyronear fire detection", layout="wide")
    st.image(PYRONEAR_LOGO_URL, width=220)
    st.title("Pyronear fire detection")
    st.write("Upload an MP4 to start detection automatically.")

    uploaded = st.file_uploader("Upload an MP4", type=["mp4"])
    if uploaded is None:
        st.info("Waiting for a video upload.")
        return

    # Re-run inference only when the uploaded file changes between Streamlit reruns.
    signature = _upload_signature(uploaded)
    previous_signature = st.session_state.get("upload_signature")
    if signature != previous_signature:
        temp_path = None
        st.session_state["upload_signature"] = signature
        with st.spinner("Fire detection in progress..."):
            try:
                temp_path = _write_uploaded_video(uploaded)
                st.session_state["output_images"] = infer(temp_path)
                st.session_state["inference_error"] = None
            except Exception as exc:
                LOGGER.exception("Inference failed")
                st.session_state["output_images"] = []
                st.session_state["inference_error"] = str(exc)
            finally:
                if temp_path and os.path.exists(temp_path):
                    os.remove(temp_path)

    if st.session_state.get("inference_error"):
        st.error(f"Detection failed: {st.session_state['inference_error']}")
        return

    _render_outputs(st.session_state.get("output_images", []))


if __name__ == "__main__":
    main()