# Streamlit application: real-time vehicle detection and counting with YOLOv8.
| import os | |
| import time | |
| import threading | |
| import queue | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import streamlit as st | |
| from ultralytics import YOLO | |
| import torch | |
# `add_script_run_ctx` may not exist depending on the Streamlit version;
# it is made optional so the app does not crash at startup.
try:
    from streamlit.runtime.scriptrunner import add_script_run_ctx
except Exception:  # pragma: no cover
    def add_script_run_ctx(_t):  # fallback no-op: return the thread unchanged
        return _t
| # ========================= | |
| # === FONCTIONS UTILES === | |
| # ========================= | |
def draw_text_with_background(
    image,
    text,
    position,
    font=cv2.FONT_HERSHEY_SIMPLEX,
    font_scale=1,
    font_thickness=2,
    text_color=(255, 255, 255),
    bg_color=(0, 0, 0),
    padding=5,
):
    """Draw `text` at `position` on an OpenCV image, over a filled background box.

    The background rectangle is clamped to the image bounds so drawing near an
    edge never produces out-of-range coordinates. Mutates `image` in place.
    """
    (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, font_thickness)
    x, y = position
    img_h, img_w = image.shape[:2]
    # Clamp both rectangle corners inside the image.
    top_left = (max(0, x), max(0, y - text_h - padding))
    bottom_right = (min(img_w - 1, x + text_w + padding * 2), min(img_h - 1, y + padding))
    cv2.rectangle(image, top_left, bottom_right, bg_color, -1)
    cv2.putText(
        image,
        text,
        (top_left[0] + padding, min(y, img_h - 1)),
        font,
        font_scale,
        text_color,
        font_thickness,
        cv2.LINE_AA,
    )
def check_camera_availability(max_idx=10):
    """Quick diagnostic: return the indices of local webcams that deliver a frame.

    Probes indices 0..max_idx-1; an index is reported only when the device both
    opens and successfully reads one frame.
    """
    found = []
    for idx in range(max_idx):
        capture = cv2.VideoCapture(idx)
        if capture.isOpened():
            grabbed, _ = capture.read()
            if grabbed:
                found.append(idx)
            capture.release()
    return found
def _alpha_fill_poly(base_img, pts, color_bgr=(0, 255, 0), alpha=0.25, thickness=2):
    """
    Paint a semi-transparent filled polygon plus an opaque outline onto `base_img`.
    cv2.fillPoly has no alpha channel support, so the fill is drawn on a copy
    and blended back into the original in place.
    """
    polygon = np.array(pts, np.int32)
    layer = base_img.copy()
    cv2.fillPoly(layer, [polygon], color_bgr)
    # Blend: alpha * filled layer + (1 - alpha) * original, written back in place.
    cv2.addWeighted(layer, alpha, base_img, 1 - alpha, 0, dst=base_img)
    cv2.polylines(base_img, [polygon], isClosed=True, color=color_bgr, thickness=thickness)
def preview_polygons(poly1, poly2):
    """Render both counting zones on a black 1200x640 canvas and return it.

    Zone 1 is drawn in green, zone 2 in red; each vertex is dotted and labeled,
    and a light 100-px grid with axis labels helps reading coordinates.
    """
    canvas = np.zeros((640, 1200, 3), dtype=np.uint8)
    # (polygon, fill color, label background, label prefix) — drawn in order.
    zones = [
        (poly1, (0, 200, 0), (0, 100, 0), "P1"),
        (poly2, (0, 0, 200), (100, 0, 0), "P2"),
    ]
    for poly, fill_color, label_bg, prefix in zones:
        if len(poly) < 3:
            continue  # need at least a triangle to fill
        _alpha_fill_poly(canvas, poly, fill_color, alpha=0.25)
        for idx, pt in enumerate(poly):
            cv2.circle(canvas, pt, 5, (255, 255, 255), -1)
            draw_text_with_background(
                canvas, f"{prefix}-{idx + 1}: {pt}", (pt[0] + 10, pt[1]), font_scale=0.5, bg_color=label_bg
            )
    draw_text_with_background(canvas, "Zone 1 (Vert)", (10, 30), font_scale=0.7, bg_color=(0, 100, 0))
    draw_text_with_background(canvas, "Zone 2 (Rouge)", (10, 60), font_scale=0.7, bg_color=(100, 0, 0))
    # Reference grid every 100 px, with coordinate labels on both axes.
    step = 100
    shade = (50, 50, 50)
    for gx in range(0, canvas.shape[1], step):
        cv2.line(canvas, (gx, 0), (gx, canvas.shape[0]), shade, 1)
        draw_text_with_background(canvas, str(gx), (gx, 20), font_scale=0.5, bg_color=(30, 30, 30))
    for gy in range(0, canvas.shape[0], step):
        cv2.line(canvas, (0, gy), (canvas.shape[1], gy), shade, 1)
        draw_text_with_background(canvas, str(gy), (5, gy), font_scale=0.5, bg_color=(30, 30, 30))
    return canvas
| # ================================ | |
| # === CLASSE TRAITEMENT YOLOv8 === | |
| # ================================ | |
class YOLOVideoProcessor:
    """Threaded YOLOv8 detector/tracker that counts unique vehicles in two zones.

    The capture loop (`process_webcam`) pushes frames into `frame_queue`; a
    worker thread (`process_webcam_frames`) runs detection + tracking and
    publishes annotated frames plus per-zone counts through `result_queue`.
    Both queues have maxsize=1 so only the freshest item is ever kept.
    """

    def __init__(self, model_path, poly1, poly2, tracker_method="bot"):
        """Load the YOLO model and prepare tracking/counting state.

        Args:
            model_path: path to a YOLO weights file (e.g. "best.pt").
            poly1, poly2: counting zones as lists of (x, y) vertices.
            tracker_method: "bot" selects BoT-SORT, anything else ByteTrack.
        """
        # Inference device: CUDA when available, otherwise CPU.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Default tuning parameters (overridden from the UI afterwards).
        self.frame_skip = 2
        self.downsample_factor = 0.5
        self.img_size = 640
        self.conf_threshold = 0.35
        # Model loading ('task' is not required).
        self.model = YOLO(model_path)
        self.model.to(self.device)
        # Tracking configuration.
        self.tracker_method = tracker_method
        self.tracker_config = "botsort.yaml" if tracker_method.lower() == "bot" else "bytetrack.yaml"
        # Unique track-ID accumulators, one set per zone.
        self.unique_region1_ids = set()
        self.unique_region2_ids = set()
        # Counting polygons.
        self.poly1 = poly1
        self.poly2 = poly2
        # Threading state; maxsize=1 keeps only the freshest frame/result.
        self.stop_processing = False
        self.frame_queue = queue.Queue(maxsize=1)
        self.result_queue = queue.Queue(maxsize=1)

    def is_in_region(self, center, poly):
        """Return True when `center` (x, y) lies inside or on the edge of `poly`.

        FIX: the original definition omitted `self`, so calls made as
        `self.is_in_region((cx, cy), poly)` bound the instance to `center`
        and the point to `poly`, breaking zone counting entirely.
        """
        poly_np = np.array(poly, dtype=np.int32)
        # pointPolygonTest >= 0 means inside (positive) or exactly on the edge (0).
        return cv2.pointPolygonTest(poly_np, center, False) >= 0

    def reset_counts(self):
        """Clear both per-zone unique-ID sets."""
        self.unique_region1_ids.clear()
        self.unique_region2_ids.clear()

    def process_frame(self, frame):
        """Run detection + tracking on one BGR frame; return an annotated copy.

        Returns None when `frame` is None. Updates the per-zone unique-ID sets
        as a side effect.
        """
        if frame is None:
            return None
        # Controlled downscale before inference to reduce latency.
        orig_h, orig_w = frame.shape[:2]
        resized_w = orig_w
        resized_h = orig_h
        if self.downsample_factor < 1.0:
            resized_w = max(1, int(orig_w * self.downsample_factor))
            resized_h = max(1, int(orig_h * self.downsample_factor))
            resized_frame = cv2.resize(frame, (resized_w, resized_h), interpolation=cv2.INTER_AREA)
        else:
            resized_frame = frame
        # Inference + tracking (persist=True keeps track IDs across frames).
        with torch.no_grad():
            results = self.model.track(
                resized_frame,
                persist=True,
                tracker=self.tracker_config,
                conf=self.conf_threshold,
                imgsz=self.img_size,
                device=self.device,
            )
        display = frame.copy()
        # Draw the counting zones (semi-transparent fill).
        _alpha_fill_poly(display, self.poly1, (0, 200, 0), alpha=0.2, thickness=2)
        _alpha_fill_poly(display, self.poly2, (0, 0, 200), alpha=0.2, thickness=2)
        # Scale factors mapping detection boxes back to the original resolution.
        sx = orig_w / float(resized_w)
        sy = orig_h / float(resized_h)
        if results and len(results) > 0 and getattr(results[0], "boxes", None) is not None:
            try:
                boxes_xywh = results[0].boxes.xywh.cpu().numpy()
                # Track IDs can be None on the very first frame.
                ids_tensor = results[0].boxes.id
                track_ids = ids_tensor.int().cpu().tolist() if ids_tensor is not None else [None] * len(boxes_xywh)
                for (x, y, w, h), tid in zip(boxes_xywh, track_ids):
                    # Center + bbox size rescaled to the original frame.
                    cx = int(x * sx)
                    cy = int(y * sy)
                    ww = int(w * sx)
                    hh = int(h * sy)
                    # Count by bbox center; sets deduplicate, so each track ID
                    # is counted at most once per zone.
                    if tid is not None:
                        if self.is_in_region((cx, cy), self.poly1):
                            self.unique_region1_ids.add(tid)
                        if self.is_in_region((cx, cy), self.poly2):
                            self.unique_region2_ids.add(tid)
                    # Draw the bbox, clamped to the image bounds.
                    tl = (max(0, cx - ww // 2), max(0, cy - hh // 2))
                    br = (min(display.shape[1] - 1, cx + ww // 2), min(display.shape[0] - 1, cy + hh // 2))
                    cv2.rectangle(display, tl, br, (0, 255, 0), 2)
            except Exception as e:
                # A single bad frame must not break the display loop.
                draw_text_with_background(display, f"Tracking error: {e}", (10, 60), bg_color=(80, 0, 0))
        # Counter overlay.
        h, w = display.shape[:2]
        draw_text_with_background(display, f"Total Sens 1: {len(self.unique_region1_ids)}", (10, h - 50))
        draw_text_with_background(display, f"Total Sens 2: {len(self.unique_region2_ids)}", (w - 300, h - 50))
        return display

    def process_webcam_frames(self):
        """Worker thread: pop frames from frame_queue, push results into result_queue."""
        while not self.stop_processing:
            try:
                frame = self.frame_queue.get(timeout=0.5)
            except queue.Empty:
                continue
            start = time.time()
            processed = self.process_frame(frame)
            fps = 1.0 / max(1e-6, (time.time() - start))
            if processed is not None:
                draw_text_with_background(processed, f"FPS: {fps:.1f}", (10, 30))
            # Replace the stale result if the queue is already full.
            try:
                if self.result_queue.full():
                    _ = self.result_queue.get_nowait()
                self.result_queue.put_nowait(
                    (processed, len(self.unique_region1_ids), len(self.unique_region2_ids))
                )
            except (queue.Full, queue.Empty):
                # FIX: the consumer thread may pop between full() and
                # get_nowait(); the original only caught queue.Full, so that
                # race raised queue.Empty and silently killed this worker.
                pass
            finally:
                self.frame_queue.task_done()

    def process_webcam(self, camera_id=0, display_placeholder=None, count_placeholders=None):
        """Live capture with a worker thread, rendering into Streamlit placeholders.

        Args:
            camera_id: local camera index or RTSP/HTTP URL string.
            display_placeholder: Streamlit placeholder for the video frame/status.
            count_placeholders: sequence of >= 2 placeholders for the zone metrics.
        """
        cap = None
        # Candidate capture backends; missing constants fall back to CAP_ANY.
        backends = [
            cv2.CAP_ANY,
            getattr(cv2, "CAP_DSHOW", cv2.CAP_ANY),
            getattr(cv2, "CAP_MSMF", cv2.CAP_ANY),
            getattr(cv2, "CAP_V4L2", cv2.CAP_ANY),
            getattr(cv2, "CAP_AVFOUNDATION", cv2.CAP_ANY),
        ]
        # Try each backend until one opens the source.
        for backend in backends:
            try:
                cap = cv2.VideoCapture(camera_id, backend)
                if cap.isOpened():
                    if display_placeholder:
                        display_placeholder.success(f"✅ Webcam connectée (backend: {backend})")
                    break
            except Exception as e:
                if display_placeholder:
                    display_placeholder.warning(f"Backend {backend} échec: {e}")
        if cap is None or not cap.isOpened():
            # Last chance: open without an explicit backend.
            cap = cv2.VideoCapture(camera_id)
            if not cap.isOpened():
                if display_placeholder:
                    display_placeholder.error("⚠️ Impossible d'ouvrir la source vidéo.")
                return
        # Camera parameters (best-effort; many backends ignore these).
        try:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
            cap.set(cv2.CAP_PROP_FPS, 30)
        except Exception:
            if display_placeholder:
                display_placeholder.warning("⚠️ Impossible de régler certains paramètres caméra.")
        # Reset session state before starting.
        self.reset_counts()
        self.stop_processing = False
        # Processing worker thread.
        t = threading.Thread(target=self.process_webcam_frames, daemon=True)
        try:
            add_script_run_ctx(t)
        except Exception:
            pass
        t.start()
        # Read a first frame to validate the source.
        time.sleep(0.3)
        ok, first = cap.read()
        if not ok:
            if display_placeholder:
                display_placeholder.error("⚠️ Lecture impossible depuis la webcam (permissions ?).")
            self.stop_processing = True
            cap.release()
            t.join(timeout=1.0)
            return
        if display_placeholder is not None:
            display_placeholder.image(cv2.cvtColor(first, cv2.COLOR_BGR2RGB), channels="RGB", use_column_width=True,
                                      caption="Webcam connectée !")
        ui_update_interval = 0.03  # ~30 FPS UI refresh cap
        last_ui = 0.0
        frame_idx = 0
        try:
            while not self.stop_processing:
                ok, frame = cap.read()
                if not ok:
                    time.sleep(0.05)
                    continue
                # Feed the worker (frame skipping keeps the load down).
                if frame_idx % self.frame_skip == 0:
                    try:
                        if self.frame_queue.full():
                            _ = self.frame_queue.get_nowait()
                            self.frame_queue.task_done()
                        self.frame_queue.put_nowait(frame)
                    except (queue.Full, queue.Empty):
                        # FIX: the worker may pop between full() and
                        # get_nowait(); catching queue.Empty too keeps the
                        # capture loop alive through that race.
                        pass
                # Refresh the UI when a processed result is available.
                now = time.time()
                if now - last_ui >= ui_update_interval:
                    try:
                        processed, c1, c2 = self.result_queue.get_nowait()
                        if processed is not None and display_placeholder is not None:
                            rgb = cv2.cvtColor(processed, cv2.COLOR_BGR2RGB)
                            display_placeholder.image(Image.fromarray(rgb), channels="RGB", use_column_width=True)
                            if count_placeholders and len(count_placeholders) >= 2:
                                count_placeholders[0].metric("Véhicules Sens 1 (Vert)", c1)
                                count_placeholders[1].metric("Véhicules Sens 2 (Rouge)", c2)
                    except queue.Empty:
                        pass
                    last_ui = now
                frame_idx += 1
                time.sleep(0.001)
        except Exception as e:
            if display_placeholder:
                display_placeholder.error(f"Erreur boucle principale: {e}")
        finally:
            self.stop_processing = True
            cap.release()
            t.join(timeout=1.0)
            if display_placeholder:
                display_placeholder.success("✅ Flux vidéo arrêté.")
| # ========================== | |
| # === INTERFACE STREAMLIT === | |
| # ========================== | |
def main():
    """Streamlit entry point: builds the UI and drives the webcam detection.

    NOTE(review): widget order and labels determine Streamlit's rerun identity;
    this function is documented in place rather than restructured.
    """
    st.set_page_config(
        page_title="Détecteur de Véhicules en Temps Réel",
        page_icon="🚗",
        layout="wide",
        menu_items={"About": "Détection de véhicules avec YOLOv8"},
    )
    st.title("🚗 Détection et comptage de Véhicules en Temps Réel")
    # Session state (persists across Streamlit reruns).
    st.session_state.setdefault("webcam_active", False)
    st.session_state.setdefault("processor", None)
    st.session_state.setdefault("processing_thread", None)
    # Model loading: local file first, then Hugging Face Hub, then a public fallback.
    model_path = "best.pt"
    if not os.path.exists(model_path):
        with st.spinner("📥 Téléchargement du modèle YOLO…"):
            try:
                from huggingface_hub import hf_hub_download
                model_path = hf_hub_download(
                    repo_id="ModuMLTECH/projet_comptage_avance",
                    filename="best.pt",
                )
                st.success("✅ Modèle chargé depuis Hugging Face Hub.")
            except Exception as e:
                st.error(f"❌ Erreur chargement modèle: {e}")
                st.warning("⚠️ Fallback sur un modèle YOLO public (yolov8n.pt).")
                model_path = "yolov8n.pt"
    # === SIDEBAR ===
    with st.sidebar:
        st.header("🔹 Paramètres")
        st.subheader("📍 Polygone 1 (vert)")
        poly1_input = st.text_area("Entrez 4 points (x,y) séparés par des espaces", "465,350 609,350 520,630 3,630")
        st.subheader("📍 Polygone 2 (rouge)")
        poly2_input = st.text_area("Entrez 4 points (x,y) séparés par des espaces", "678,350 815,350 1203,630 743,630")
        tracker_method = st.selectbox("Méthode de tracking", ["bot", "byte"], index=0)
        st.subheader("🚀 Optimisation")
        frame_skip = st.slider("Skip de frames", 1, 5, 2)
        downsample = st.slider("Facteur d'échelle", 0.3, 1.0, 0.5, 0.1)
        conf_threshold = st.slider("Seuil de confiance", 0.1, 0.9, 0.35, 0.05)
        st.subheader("💻 Système")
        device_info = f"GPU: {'Disponible' if torch.cuda.is_available() else 'Non disponible'}"
        if torch.cuda.is_available():
            device_info += f" ({torch.cuda.get_device_name(0)})"
        st.info(device_info)
    def parse_polygon(txt):
        """Parse 'x,y x,y …' (';' also accepted as separator) into a list of
        (int, int) vertices; return [] on any malformed input."""
        try:
            pts = []
            for token in txt.replace(";", " ").split():
                x, y = token.split(",")
                pts.append((int(x), int(y)))
            return pts
        except Exception:
            return []
    poly1 = parse_polygon(poly1_input)
    poly2 = parse_polygon(poly2_input)
    # Both zones must be quadrilaterals for detection to start.
    valid_polygons = len(poly1) == 4 and len(poly2) == 4
    # Preview of the counting zones.
    st.header("🖼️ Prévisualisation des masques")
    if valid_polygons:
        prev = preview_polygons(poly1, poly2)
        st.image(cv2.cvtColor(prev, cv2.COLOR_BGR2RGB), use_column_width=True, caption="Masques de détection")
        st.success("✅ Polygones valides (4 points chacun).")
    else:
        st.warning("⚠️ Définissez deux polygones valides de 4 points chacun.")
    # Webcam section.
    st.header("Détection en Temps Réel avec Webcam")
    available = check_camera_availability()
    if not available:
        st.warning("⚠️ Aucune caméra locale détectée (vous pouvez tester une caméra IP).")
    else:
        st.success(f"✅ Caméras détectées: {available}")
    # Source selection: local indices plus a free-form IP camera URL.
    camera_options = {"Webcam par défaut (0)": 0}
    for i in range(1, 8):
        camera_options[f"Caméra alternative ({i})"] = i
    camera_options["Caméra IP (entrez l'URL)"] = "ip"
    selected = st.selectbox("Source vidéo", list(camera_options.keys()))
    if camera_options[selected] == "ip":
        camera_id = st.text_input("URL RTSP/HTTP", "http://adresse-ip:port/video")
    else:
        camera_id = camera_options[selected]
    # Currently unused beyond the widget itself — see note below.
    display_quality = st.select_slider("Qualité d'affichage", options=["Basse", "Moyenne", "Haute"], value="Moyenne")
    # (placeholder to handle resizing/quality later if needed)
    video_container = st.container()
    video_placeholder = video_container.empty()
    col1, col2 = st.columns(2)
    count_placeholders = [col1.empty(), col2.empty()]
    st.info("ℹ️ Optimisations: multi-threading, resize adaptatif, CUDA si dispo.")
    col_start, col_stop = st.columns(2)
    if col_start.button("▶️ Démarrer la détection"):
        if not valid_polygons:
            st.error("❌ Les polygones doivent avoir exactement 4 points chacun.")
        elif st.session_state.webcam_active:
            st.warning("⚠️ La webcam est déjà active.")
        else:
            video_placeholder.info("🔄 Connexion à la source vidéo…")
            # Build the processor and apply the sidebar tuning values.
            processor = YOLOVideoProcessor(model_path, poly1, poly2, tracker_method)
            processor.frame_skip = frame_skip
            processor.downsample_factor = downsample
            processor.conf_threshold = conf_threshold
            st.session_state.processor = processor
            st.session_state.webcam_active = True
            # Capture/render loop runs in a daemon thread so the script can rerun.
            thread = threading.Thread(
                target=st.session_state.processor.process_webcam,
                args=(camera_id, video_placeholder, count_placeholders),
                daemon=True,
            )
            try:
                add_script_run_ctx(thread)
            except Exception:
                pass
            try:
                thread.start()
                st.session_state.processing_thread = thread
            except Exception as e:
                st.error(f"Erreur au démarrage du thread: {e}")
                st.session_state.webcam_active = False
    if col_stop.button("⏹️ Arrêter"):
        if st.session_state.webcam_active and st.session_state.processor:
            # Signal the capture loop to stop, then wait briefly for it to exit.
            st.session_state.processor.stop_processing = True
            st.session_state.webcam_active = False
            if st.session_state.processing_thread:
                st.session_state.processing_thread.join(timeout=2.0)
                st.session_state.processing_thread = None
            time.sleep(0.3)
            video_placeholder.empty()
            for ph in count_placeholders:
                ph.empty()
        else:
            st.warning("⚠️ Aucune détection en cours.")
if __name__ == "__main__":
    main()