"""Streamlit app: real-time vehicle detection and per-zone counting with YOLOv8.

The page lets the user define two quadrilateral zones, previews them, then
streams a webcam / IP camera through a YOLO tracker and counts the unique
track ids whose box centre falls inside each zone.
"""

import os
import queue
import threading
import time

import cv2
import numpy as np
import streamlit as st
import torch
from PIL import Image
from ultralytics import YOLO

# add_script_run_ctx may not exist depending on the Streamlit version;
# make it optional so the app does not crash at start-up.
try:
    from streamlit.runtime.scriptrunner import add_script_run_ctx
except ImportError:  # pragma: no cover
    def add_script_run_ctx(_t):  # no-op fallback
        return _t


# =========================
# ===  HELPER FUNCTIONS ===
# =========================

def _attach_script_ctx(thread):
    """Best-effort attach of the Streamlit script-run context to ``thread``.

    Returns the same thread object so the call can be chained.
    """
    try:
        add_script_run_ctx(thread)
    except Exception:
        # Deliberate best-effort: a missing context must not prevent the
        # thread from being started.
        pass
    return thread


def draw_text_with_background(
    image,
    text,
    position,
    font=cv2.FONT_HERSHEY_SIMPLEX,
    font_scale=1,
    font_thickness=2,
    text_color=(255, 255, 255),
    bg_color=(0, 0, 0),
    padding=5,
):
    """Draw ``text`` on an OpenCV image over a filled background rectangle.

    Args:
        image: image array, modified in place.
        text: string to render.
        position: ``(x, y)`` of the text baseline origin.
        font, font_scale, font_thickness: forwarded to OpenCV text routines.
        text_color: colour of the glyphs.
        bg_color: colour of the filled rectangle behind the text.
        padding: extra pixels around the text inside the rectangle.
    """
    (text_width, text_height), _ = cv2.getTextSize(
        text, font, font_scale, font_thickness
    )
    x, y = position
    img_h, img_w = image.shape[:2]

    # Clamp the rectangle to the image bounds.
    tl_x = max(0, x)
    tl_y = max(0, y - text_height - padding)
    br_x = min(img_w - 1, x + text_width + padding * 2)
    br_y = min(img_h - 1, y + padding)

    cv2.rectangle(image, (tl_x, tl_y), (br_x, br_y), bg_color, -1)
    cv2.putText(
        image,
        text,
        (tl_x + padding, min(y, img_h - 1)),
        font,
        font_scale,
        text_color,
        font_thickness,
        cv2.LINE_AA,
    )


def check_camera_availability(max_idx=10):
    """Return the local camera indices in ``range(max_idx)`` that yield a frame.

    Each index is opened, one frame is read, and the capture is always
    released, even if reading raises.
    """
    available = []
    for index in range(max_idx):
        cap = cv2.VideoCapture(index)
        try:
            if cap.isOpened():
                ok, _ = cap.read()
                if ok:
                    available.append(index)
        finally:
            cap.release()
    return available


def _alpha_fill_poly(base_img, pts, color_bgr=(0, 255, 0), alpha=0.25, thickness=2):
    """Draw a semi-transparent filled polygon with an opaque outline, in place.

    cv2.fillPoly takes no alpha, so the polygon is filled on a copy of the
    image which is then blended back into ``base_img``.

    Polygons with fewer than 3 points are ignored.
    """
    if pts is None or len(pts) < 3:
        return
    pts_np = np.array(pts, np.int32)
    overlay = base_img.copy()
    cv2.fillPoly(overlay, [pts_np], color_bgr)
    cv2.addWeighted(overlay, alpha, base_img, 1 - alpha, 0, dst=base_img)
    cv2.polylines(
        base_img, [pts_np], isClosed=True, color=color_bgr, thickness=thickness
    )


def preview_polygons(poly1, poly2):
    """Render both zones on a black 640x1200 canvas and return the image (BGR)."""
    preview = np.zeros((640, 1200, 3), dtype=np.uint8)

    # (points, fill colour, label prefix, label background) for each zone.
    zones = (
        (poly1, (0, 200, 0), "P1", (0, 100, 0)),  # zone 1 (green)
        (poly2, (0, 0, 200), "P2", (100, 0, 0)),  # zone 2 (red)
    )
    for points, fill_color, prefix, label_bg in zones:
        if len(points) < 3:
            continue
        _alpha_fill_poly(preview, points, fill_color, alpha=0.25)
        for number, pt in enumerate(points, start=1):
            cv2.circle(preview, pt, 5, (255, 255, 255), -1)
            draw_text_with_background(
                preview,
                f"{prefix}-{number}: {pt}",
                (pt[0] + 10, pt[1]),
                font_scale=0.5,
                bg_color=label_bg,
            )

    draw_text_with_background(
        preview, "Zone 1 (Vert)", (10, 30), font_scale=0.7, bg_color=(0, 100, 0)
    )
    draw_text_with_background(
        preview, "Zone 2 (Rouge)", (10, 60), font_scale=0.7, bg_color=(100, 0, 0)
    )

    # Coordinate grid.
    grid_spacing = 100
    grid_color = (50, 50, 50)
    height, width = preview.shape[:2]
    for x in range(0, width, grid_spacing):
        cv2.line(preview, (x, 0), (x, height), grid_color, 1)
        draw_text_with_background(
            preview, str(x), (x, 20), font_scale=0.5, bg_color=(30, 30, 30)
        )
    for y in range(0, height, grid_spacing):
        cv2.line(preview, (0, y), (width, y), grid_color, 1)
        draw_text_with_background(
            preview, str(y), (5, y), font_scale=0.5, bg_color=(30, 30, 30)
        )

    return preview


def parse_polygon(txt):
    """Parse ``"x,y x,y ..."`` (space or ``;`` separated) into ``[(x, y), ...]``.

    Returns an empty list if any token is not a pair of integers.
    """
    points = []
    try:
        for token in txt.replace(";", " ").split():
            x, y = token.split(",")
            points.append((int(x), int(y)))
    except ValueError:
        # Wrong number of components or a non-integer coordinate.
        return []
    return points


# ==================================
# === YOLOv8 PROCESSING CLASS    ===
# ==================================

class YOLOVideoProcessor:
    """Run YOLO tracking on frames and count unique track ids per zone."""

    def __init__(self, model_path, poly1, poly2, tracker_method="bot"):
        """Load the model and initialise counters, zones and queues.

        Args:
            model_path: path (or model name) handed to ``YOLO``.
            poly1: list of ``(x, y)`` points of zone 1.
            poly2: list of ``(x, y)`` points of zone 2.
            tracker_method: ``"bot"`` selects ``botsort.yaml``; any other
                value selects ``bytetrack.yaml``.
        """
        # Device
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Default parameters (overwritten by the UI afterwards)
        self.frame_skip = 2
        self.downsample_factor = 0.5
        self.img_size = 640
        self.conf_threshold = 0.35

        # Model loading ('task' is not required)
        self.model = YOLO(model_path)
        self.model.to(self.device)

        # Tracking
        self.tracker_method = tracker_method
        self.tracker_config = (
            "botsort.yaml" if tracker_method.lower() == "bot" else "bytetrack.yaml"
        )

        # Sets of unique track ids seen in each zone
        self.unique_region1_ids = set()
        self.unique_region2_ids = set()

        # Polygons
        self.poly1 = poly1
        self.poly2 = poly2

        # Threads / queues. Both queues hold at most one item so that only
        # the most recent frame / result is kept.
        self.stop_processing = False
        self.frame_queue = queue.Queue(maxsize=1)
        self.result_queue = queue.Queue(maxsize=1)

    @staticmethod
    def is_in_region(center, poly):
        """Return True if ``center`` lies inside or on the edge of ``poly``."""
        if poly is None or len(poly) < 3:
            return False
        poly_np = np.array(poly, dtype=np.int32)
        point = (float(center[0]), float(center[1]))
        return cv2.pointPolygonTest(poly_np, point, False) >= 0

    @staticmethod
    def _replace_latest(q, item, mark_done=False):
        """Put ``item`` in the single-slot queue ``q``, dropping any stale item.

        A ``full()`` check followed by ``get_nowait()`` is racy: the consumer
        thread may empty the queue in between. Both operations are therefore
        attempted directly and their ``Empty`` / ``Full`` outcomes handled.

        Args:
            q: the queue to update.
            item: the new item.
            mark_done: call ``q.task_done()`` for a discarded stale item.
        """
        try:
            q.get_nowait()
        except queue.Empty:
            pass
        else:
            if mark_done:
                q.task_done()
        try:
            q.put_nowait(item)
        except queue.Full:
            # Another producer filled the slot first; drop this item.
            pass

    def reset_counts(self):
        """Forget every track id counted so far in both zones."""
        self.unique_region1_ids.clear()
        self.unique_region2_ids.clear()

    def process_frame(self, frame):
        """Track objects in ``frame`` and return an annotated copy.

        The frame is optionally downscaled for inference; boxes are scaled
        back to the original resolution before drawing and counting.
        Returns None when ``frame`` is None.
        """
        if frame is None:
            return None

        # Controlled downscale
        orig_h, orig_w = frame.shape[:2]
        resized_w, resized_h = orig_w, orig_h
        if self.downsample_factor < 1.0:
            resized_w = max(1, int(orig_w * self.downsample_factor))
            resized_h = max(1, int(orig_h * self.downsample_factor))
            resized_frame = cv2.resize(
                frame, (resized_w, resized_h), interpolation=cv2.INTER_AREA
            )
        else:
            resized_frame = frame

        # Inference + tracking
        with torch.no_grad():
            results = self.model.track(
                resized_frame,
                persist=True,
                tracker=self.tracker_config,
                conf=self.conf_threshold,
                imgsz=self.img_size,
                device=self.device,
            )

        display = frame.copy()

        # Draw the zones (semi-transparent)
        _alpha_fill_poly(display, self.poly1, (0, 200, 0), alpha=0.2, thickness=2)
        _alpha_fill_poly(display, self.poly2, (0, 0, 200), alpha=0.2, thickness=2)

        # Scale factors from the inference size back to the original size
        sx = orig_w / float(resized_w)
        sy = orig_h / float(resized_h)

        if results and getattr(results[0], "boxes", None) is not None:
            try:
                boxes = results[0].boxes
                boxes_xywh = boxes.xywh.cpu().numpy()
                # Track ids may be None (e.g. on the first frame)
                ids_tensor = boxes.id
                if ids_tensor is not None:
                    track_ids = ids_tensor.int().cpu().tolist()
                else:
                    track_ids = [None] * len(boxes_xywh)

                max_x = display.shape[1] - 1
                max_y = display.shape[0] - 1
                for (x, y, w, h), tid in zip(boxes_xywh, track_ids):
                    # Rescaled centre and box size
                    cx = int(x * sx)
                    cy = int(y * sy)
                    ww = int(w * sx)
                    hh = int(h * sy)

                    # Counting is based on the box centre
                    if tid is not None:
                        if self.is_in_region((cx, cy), self.poly1):
                            self.unique_region1_ids.add(tid)
                        if self.is_in_region((cx, cy), self.poly2):
                            self.unique_region2_ids.add(tid)

                    # Draw the box, clamped to the image
                    tl = (max(0, cx - ww // 2), max(0, cy - hh // 2))
                    br = (min(max_x, cx + ww // 2), min(max_y, cy + hh // 2))
                    cv2.rectangle(display, tl, br, (0, 255, 0), 2)
            except Exception as e:
                # Do not break the display if one frame causes a problem
                draw_text_with_background(
                    display, f"Tracking error: {e}", (10, 60), bg_color=(80, 0, 0)
                )

        # Counters overlay
        disp_h, disp_w = display.shape[:2]
        draw_text_with_background(
            display,
            f"Total Sens 1: {len(self.unique_region1_ids)}",
            (10, disp_h - 50),
        )
        draw_text_with_background(
            display,
            f"Total Sens 2: {len(self.unique_region2_ids)}",
            (disp_w - 300, disp_h - 50),
        )
        return display

    def process_webcam_frames(self):
        """Worker loop: take frames from frame_queue, push results to result_queue.

        Each result is ``(annotated_frame, zone1_count, zone2_count)``.
        Runs until ``stop_processing`` is set.
        """
        while not self.stop_processing:
            try:
                frame = self.frame_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            try:
                start = time.time()
                processed = self.process_frame(frame)
                fps = 1.0 / max(1e-6, (time.time() - start))
                if processed is not None:
                    draw_text_with_background(processed, f"FPS: {fps:.1f}", (10, 30))

                # Replace the previous result if it was not consumed yet
                self._replace_latest(
                    self.result_queue,
                    (
                        processed,
                        len(self.unique_region1_ids),
                        len(self.unique_region2_ids),
                    ),
                )
            finally:
                # Always balance the get() above, even if processing raised
                self.frame_queue.task_done()

    def _open_capture(self, camera_id, display_placeholder=None):
        """Try several OpenCV backends and return an opened capture, or None.

        Captures that fail to open are released before the next attempt.
        Status messages are written to ``display_placeholder`` when given.
        """
        candidates = [
            cv2.CAP_ANY,
            getattr(cv2, "CAP_DSHOW", cv2.CAP_ANY),
            getattr(cv2, "CAP_MSMF", cv2.CAP_ANY),
            getattr(cv2, "CAP_V4L2", cv2.CAP_ANY),
            getattr(cv2, "CAP_AVFOUNDATION", cv2.CAP_ANY),
        ]
        # Drop duplicates (getattr fallbacks) while keeping the order
        backends = list(dict.fromkeys(candidates))

        for backend in backends:
            cap = None
            try:
                cap = cv2.VideoCapture(camera_id, backend)
                if cap.isOpened():
                    if display_placeholder is not None:
                        display_placeholder.success(
                            f"✅ Webcam connectée (backend: {backend})"
                        )
                    return cap
            except Exception as e:
                if display_placeholder is not None:
                    display_placeholder.warning(f"Backend {backend} échec: {e}")
            if cap is not None:
                cap.release()

        # Last chance without an explicit backend
        cap = cv2.VideoCapture(camera_id)
        if cap.isOpened():
            return cap
        cap.release()
        return None

    def process_webcam(self, camera_id=0, display_placeholder=None, count_placeholders=None):
        """Capture a live source, process it in a worker thread, render in Streamlit.

        Args:
            camera_id: camera index or stream URL given to ``cv2.VideoCapture``.
            display_placeholder: Streamlit placeholder for the video/status.
            count_placeholders: sequence of at least two placeholders that
                receive the zone 1 and zone 2 metrics.
        """
        cap = self._open_capture(camera_id, display_placeholder)
        if cap is None:
            if display_placeholder is not None:
                display_placeholder.error("⚠️ Impossible d'ouvrir la source vidéo.")
            return

        # Camera parameters (best-effort)
        try:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
            cap.set(cv2.CAP_PROP_FPS, 30)
        except Exception:
            if display_placeholder is not None:
                display_placeholder.warning(
                    "⚠️ Impossible de régler certains paramètres caméra."
                )

        # Reset session state
        self.reset_counts()
        self.stop_processing = False

        # Processing thread
        worker = threading.Thread(target=self.process_webcam_frames, daemon=True)
        _attach_script_ctx(worker)
        worker.start()

        # Read a first frame to validate the source
        time.sleep(0.3)
        ok, first = cap.read()
        if not ok:
            if display_placeholder is not None:
                display_placeholder.error(
                    "⚠️ Lecture impossible depuis la webcam (permissions ?)."
                )
            self.stop_processing = True
            cap.release()
            worker.join(timeout=1.0)
            return

        if display_placeholder is not None:
            display_placeholder.image(
                cv2.cvtColor(first, cv2.COLOR_BGR2RGB),
                channels="RGB",
                use_column_width=True,
                caption="Webcam connectée !",
            )

        ui_update_interval = 0.03  # ~30 FPS
        last_ui = 0.0
        frame_idx = 0

        try:
            while not self.stop_processing:
                ok, frame = cap.read()
                if not ok:
                    time.sleep(0.05)
                    continue

                # Hand the frame to the worker (skipping frames to lighten
                # the load); frame_skip is clamped to avoid a modulo by zero.
                skip = max(1, int(self.frame_skip))
                if frame_idx % skip == 0:
                    self._replace_latest(self.frame_queue, frame, mark_done=True)

                # Display a result if one is available
                now = time.time()
                if now - last_ui >= ui_update_interval:
                    try:
                        processed, c1, c2 = self.result_queue.get_nowait()
                    except queue.Empty:
                        pass
                    else:
                        if processed is not None and display_placeholder is not None:
                            rgb = cv2.cvtColor(processed, cv2.COLOR_BGR2RGB)
                            display_placeholder.image(
                                Image.fromarray(rgb),
                                channels="RGB",
                                use_column_width=True,
                            )
                            if count_placeholders and len(count_placeholders) >= 2:
                                count_placeholders[0].metric(
                                    "Véhicules Sens 1 (Vert)", c1
                                )
                                count_placeholders[1].metric(
                                    "Véhicules Sens 2 (Rouge)", c2
                                )
                    last_ui = now

                frame_idx += 1
                time.sleep(0.001)
        except Exception as e:
            if display_placeholder is not None:
                display_placeholder.error(f"Erreur boucle principale: {e}")
        finally:
            self.stop_processing = True
            cap.release()
            worker.join(timeout=1.0)
            if display_placeholder is not None:
                display_placeholder.success("✅ Flux vidéo arrêté.")


# ===========================
# === STREAMLIT INTERFACE ===
# ===========================

def main():
    """Build the Streamlit page and wire the start/stop controls."""
    st.set_page_config(
        page_title="Détecteur de Véhicules en Temps Réel",
        page_icon="🚗",
        layout="wide",
        menu_items={"About": "Détection de véhicules avec YOLOv8"},
    )
    st.title("🚗 Détection et comptage de Véhicules en Temps Réel")

    # Session state
    st.session_state.setdefault("webcam_active", False)
    st.session_state.setdefault("processor", None)
    st.session_state.setdefault("processing_thread", None)

    # If the capture thread ended on its own (e.g. the camera could not be
    # opened), clear the stale "active" flag so the user can start again.
    running_thread = st.session_state.processing_thread
    if st.session_state.webcam_active and (
        running_thread is None or not running_thread.is_alive()
    ):
        st.session_state.webcam_active = False
        st.session_state.processing_thread = None

    # Model loading
    model_path = "best.pt"
    if not os.path.exists(model_path):
        with st.spinner("📥 Téléchargement du modèle YOLO…"):
            try:
                from huggingface_hub import hf_hub_download

                model_path = hf_hub_download(
                    repo_id="ModuMLTECH/projet_comptage_avance",
                    filename="best.pt",
                )
                st.success("✅ Modèle chargé depuis Hugging Face Hub.")
            except Exception as e:
                st.error(f"❌ Erreur chargement modèle: {e}")
                st.warning("⚠️ Fallback sur un modèle YOLO public (yolov8n.pt).")
                model_path = "yolov8n.pt"

    # === SIDEBAR ===
    with st.sidebar:
        st.header("🔹 Paramètres")

        st.subheader("📍 Polygone 1 (vert)")
        poly1_input = st.text_area(
            "Entrez 4 points (x,y) séparés par des espaces",
            "465,350 609,350 520,630 3,630",
        )

        st.subheader("📍 Polygone 2 (rouge)")
        poly2_input = st.text_area(
            "Entrez 4 points (x,y) séparés par des espaces",
            "678,350 815,350 1203,630 743,630",
        )

        tracker_method = st.selectbox("Méthode de tracking", ["bot", "byte"], index=0)

        st.subheader("🚀 Optimisation")
        frame_skip = st.slider("Skip de frames", 1, 5, 2)
        downsample = st.slider("Facteur d'échelle", 0.3, 1.0, 0.5, 0.1)
        conf_threshold = st.slider("Seuil de confiance", 0.1, 0.9, 0.35, 0.05)

        st.subheader("💻 Système")
        device_info = f"GPU: {'Disponible' if torch.cuda.is_available() else 'Non disponible'}"
        if torch.cuda.is_available():
            device_info += f" ({torch.cuda.get_device_name(0)})"
        st.info(device_info)

    poly1 = parse_polygon(poly1_input)
    poly2 = parse_polygon(poly2_input)
    valid_polygons = len(poly1) == 4 and len(poly2) == 4

    # Preview
    st.header("🖼️ Prévisualisation des masques")
    if valid_polygons:
        prev = preview_polygons(poly1, poly2)
        st.image(
            cv2.cvtColor(prev, cv2.COLOR_BGR2RGB),
            use_column_width=True,
            caption="Masques de détection",
        )
        st.success("✅ Polygones valides (4 points chacun).")
    else:
        st.warning("⚠️ Définissez deux polygones valides de 4 points chacun.")

    # Webcam section
    st.header("Détection en Temps Réel avec Webcam")

    available = check_camera_availability()
    if not available:
        st.warning("⚠️ Aucune caméra locale détectée (vous pouvez tester une caméra IP).")
    else:
        st.success(f"✅ Caméras détectées: {available}")

    camera_options = {"Webcam par défaut (0)": 0}
    for i in range(1, 8):
        camera_options[f"Caméra alternative ({i})"] = i
    camera_options["Caméra IP (entrez l'URL)"] = "ip"

    selected = st.selectbox("Source vidéo", list(camera_options.keys()))
    if camera_options[selected] == "ip":
        camera_id = st.text_input("URL RTSP/HTTP", "http://adresse-ip:port/video")
    else:
        camera_id = camera_options[selected]

    # Placeholder widget: reserved for future resize/quality handling.
    display_quality = st.select_slider(
        "Qualité d'affichage", options=["Basse", "Moyenne", "Haute"], value="Moyenne"
    )

    video_container = st.container()
    video_placeholder = video_container.empty()

    col1, col2 = st.columns(2)
    count_placeholders = [col1.empty(), col2.empty()]

    st.info("ℹ️ Optimisations: multi-threading, resize adaptatif, CUDA si dispo.")

    col_start, col_stop = st.columns(2)

    if col_start.button("▶️ Démarrer la détection"):
        if not valid_polygons:
            st.error("❌ Les polygones doivent avoir exactement 4 points chacun.")
        elif st.session_state.webcam_active:
            st.warning("⚠️ La webcam est déjà active.")
        else:
            video_placeholder.info("🔄 Connexion à la source vidéo…")

            processor = YOLOVideoProcessor(model_path, poly1, poly2, tracker_method)
            processor.frame_skip = frame_skip
            processor.downsample_factor = downsample
            processor.conf_threshold = conf_threshold

            st.session_state.processor = processor
            st.session_state.webcam_active = True

            thread = threading.Thread(
                target=st.session_state.processor.process_webcam,
                args=(camera_id, video_placeholder, count_placeholders),
                daemon=True,
            )
            _attach_script_ctx(thread)

            try:
                thread.start()
                st.session_state.processing_thread = thread
            except Exception as e:
                st.error(f"Erreur au démarrage du thread: {e}")
                st.session_state.webcam_active = False

    if col_stop.button("⏹️ Arrêter"):
        if st.session_state.webcam_active and st.session_state.processor:
            st.session_state.processor.stop_processing = True
            st.session_state.webcam_active = False

            if st.session_state.processing_thread:
                st.session_state.processing_thread.join(timeout=2.0)
                st.session_state.processing_thread = None

            time.sleep(0.3)
            video_placeholder.empty()
            for ph in count_placeholders:
                ph.empty()
        else:
            st.warning("⚠️ Aucune détection en cours.")


if __name__ == "__main__":
    main()