import io
import json
import logging
import os
import smtplib
import tempfile
import threading
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from zipfile import ZipFile

import cv2
import numpy as np
import pandas as pd
import streamlit as st
from fpdf import FPDF
from PIL import Image
from st_aggrid import AgGrid
from st_aggrid.grid_options_builder import GridOptionsBuilder
from streamlit_webrtc import webrtc_streamer, VideoTransformerBase
from ultralytics import YOLO, RTDETR

from techsolut_theme import apply_techsolut_theme

logger = logging.getLogger(__name__)

###############################################################################
# MODEL-RELATED FUNCTIONS
###############################################################################

# Pretrained checkpoints that are loaded as "<name>.pt" through the YOLO class.
_YOLO_PRETRAINED_MODELS = frozenset({
    # Detection
    "yolov5nu", "yolov5s", "yolov5m", "yolov5l", "yolov5x",
    "yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x",
    "yolov9c", "yolov9e",
    "yolov10n", "yolov10s", "yolov10m", "yolov10l", "yolov10x",
    "yolo11n", "yolo11s", "yolo11m", "yolo11l", "yolo11x",
    "yolo12n", "yolo12s", "yolo12m", "yolo12l", "yolo12x",
    # Segmentation
    "yolov8n-seg", "yolov8s-seg", "yolov8m-seg", "yolov8l-seg", "yolov8x-seg",
    "yolov9c-seg", "yolov9e-seg",
    "yolo11n-seg", "yolo11s-seg", "yolo11m-seg", "yolo11l-seg", "yolo11x-seg",
    # Pose estimation
    "yolov8n-pose", "yolov8s-pose", "yolov8m-pose", "yolov8l-pose", "yolov8x-pose",
    "yolo11n-pose", "yolo11s-pose", "yolo11m-pose", "yolo11l-pose", "yolo11x-pose",
})

# RT-DETR checkpoints are loaded through the dedicated RTDETR class.
_RTDETR_PRETRAINED_MODELS = frozenset({"rtdetr-l", "rtdetr-x"})

# Aliases for locally stored weight files (several trained "best*.pt" files).
_LOCAL_MODEL_WEIGHTS = {
    "model1": "best.pt",
    "model2": "best2.pt",
    "model3": "best3.pt",
    "model4": "best5.pt",
}

# Seconds to wait on the SMTP connection before giving up.
_SMTP_TIMEOUT_S = 15


@st.cache_resource()
def load_model(model_choice, custom_model_path=None):
    """
    Load the YOLO / RT-DETR model selected by the user.

    The result is cached by Streamlit (keyed on the arguments) so that a
    rerun of the script does not reload the weights.

    Args:
        model_choice: Name of a pretrained checkpoint (e.g. "yolov8n"),
            the literal "custom", or one of the local aliases
            "model1".."model4".
        custom_model_path: Path of a weights file; only used when
            ``model_choice == "custom"``.

    Returns:
        The loaded Ultralytics model object.

    Raises:
        ValueError: If ``model_choice`` is not recognised (this includes
            "custom" without a ``custom_model_path``).
    """
    if model_choice in _RTDETR_PRETRAINED_MODELS:
        return RTDETR(f"{model_choice}.pt")
    if model_choice in _YOLO_PRETRAINED_MODELS:
        return YOLO(f"{model_choice}.pt")
    if model_choice == 'custom' and custom_model_path:
        return YOLO(custom_model_path)

    weights = _LOCAL_MODEL_WEIGHTS.get(model_choice)
    if weights is None:
        raise ValueError(f"Unknown model choice: {model_choice!r}")
    return YOLO(weights)


def detect_objects(model, image, model_type, conf, iou, classes_to_detect=None,
                   max_det=1000, line_width=2, agnostic_nms=False):
    """
    Run inference on a single image and return the annotated image.

    Args:
        model: Callable Ultralytics model.
        image: A numpy array or any object accepted by ``np.array`` (e.g. a
            PIL image).
        model_type: Unused here; kept for interface compatibility.
        conf: Confidence threshold forwarded to the model.
        iou: IoU threshold forwarded to the model.
        classes_to_detect: Optional list of class ids; an empty list is
            treated like ``None`` (no class filter).
        max_det: Maximum number of detections forwarded to the model.
        line_width: Line width used when drawing the annotations.
        agnostic_nms: Forwarded to the model.

    Returns:
        ``(annotated_image, results)`` where ``annotated_image`` is the array
        produced by ``results[0].plot`` and ``results`` is the raw model
        output.

    NOTE(review): the array is forwarded without any channel reordering.
    Ultralytics documents ndarray inputs as BGR, so a PIL (RGB) image ends up
    with swapped channels; callers should pass a BGR array for correct
    colours.
    """
    image_np = image if isinstance(image, np.ndarray) else np.array(image)

    # YOLO / RT-DETR inference
    results = model(
        image_np,
        conf=conf,
        iou=iou,
        classes=classes_to_detect or None,
        max_det=max_det,
        agnostic_nms=agnostic_nms,
    )
    annotated_image = results[0].plot(line_width=line_width)
    return annotated_image, results


def count_objects(results, model_type, class_names):
    """
    Count the detected objects per class for the first result.

    Args:
        results: Model output; only ``results[0].boxes`` is inspected.
        model_type: Unused here; kept for interface compatibility.
        class_names: Mapping or sequence indexed by integer class id.

    Returns:
        A dict ``{class_name: count}``. It is empty when there are no results
        or no boxes. A class id missing from ``class_names`` is reported
        under its numeric id (as a string) instead of raising.
    """
    if not results:
        return {}
    boxes = results[0].boxes
    if boxes is None:
        return {}

    object_counts = {}
    for cls_id in boxes.cls.cpu().numpy():
        idx = int(cls_id)
        try:
            name = class_names[idx]
        except (KeyError, IndexError):
            name = str(idx)
        object_counts[name] = object_counts.get(name, 0) + 1
    return object_counts


###############################################################################
# EXPORT FUNCTIONS (PDF, ZIP, CSV, JSON)
###############################################################################

def export_pdf(images):
    """
    Offer a list of PIL images as a single multi-page PDF download.

    The PDF is built in memory, so no temporary file is left on disk.
    Nothing is offered (a warning is shown) when ``images`` is empty.
    """
    if not images:
        st.warning("Aucune image à exporter.")
        return

    first, *remaining = images
    buffer = io.BytesIO()
    first.save(buffer, format="PDF", save_all=True, append_images=remaining)
    st.download_button(
        "📄 Télécharger le PDF",
        data=buffer.getvalue(),
        file_name="resultats.pdf",
        mime="application/pdf",
    )


def export_zip(images):
    """
    Offer a list of PIL images as a ZIP archive download.

    Each image is stored as ``image_<index>.png``. The archive is built in
    memory: nothing is written to the working directory or left in the
    temporary directory. A warning is shown when ``images`` is empty.
    """
    if not images:
        st.warning("Aucune image à exporter.")
        return

    archive = io.BytesIO()
    with ZipFile(archive, 'w') as zipf:
        for i, img in enumerate(images):
            png = io.BytesIO()
            img.save(png, format="PNG")
            zipf.writestr(f"image_{i}.png", png.getvalue())
    st.download_button(
        "🗜️ Télécharger le ZIP",
        data=archive.getvalue(),
        file_name="resultats.zip",
        mime="application/zip",
    )


def export_csv_rows(csv_rows):
    """
    Offer the detection rows (a list of dicts) as a UTF-8 CSV download.
    """
    csv_data = pd.DataFrame(csv_rows).to_csv(index=False).encode("utf-8")
    st.download_button(
        "📤 Télécharger CSV des détections",
        data=csv_data,
        file_name="detections.csv",
        mime="text/csv",
    )


def _export_results(export_type, output_images, csv_rows):
    """Dispatch the collected images / rows to the exporter for ``export_type``."""
    if export_type == "PDF":
        export_pdf(output_images)
    elif export_type == "ZIP":
        export_zip(output_images)
    elif export_type == "CSV":
        export_csv_rows(csv_rows)
    elif export_type == "JSON":
        json_data = json.dumps(csv_rows, indent=4)
        st.download_button(
            "📥 Télécharger JSON des détections",
            data=json_data,
            file_name="detections.json",
            mime="application/json",
        )


###############################################################################
# TABLE DISPLAY (st_aggrid)
###############################################################################

def show_table(data, key=None):
    """
    Display a summary table (class / number of detected objects).

    Args:
        data: Dict ``{class_name: count}``.
        key: Optional component key forwarded to ``AgGrid``.
    """
    df = pd.DataFrame(list(data.items()), columns=["Classe", "Nombre"])
    gb = GridOptionsBuilder.from_dataframe(df)
    gb.configure_pagination()
    gb.configure_default_column(editable=False, groupable=True)
    gb.configure_selection('multiple', use_checkbox=True)
    AgGrid(df, gridOptions=gb.build(), theme="streamlit", key=key)


###############################################################################
# EMAIL NOTIFICATION & CLOUD BACKUP
###############################################################################

def _send_email(to_email, message):
    """
    Send a plain-text email through SMTP using the settings in ``st.secrets``.

    Reads SMTP_SERVER, SMTP_PORT, SMTP_USER and SMTP_PASS (with placeholder
    defaults). Any error (missing secrets, network, authentication) is
    propagated to the caller.
    """
    smtp_server = st.secrets.get("SMTP_SERVER", "smtp.gmail.com")
    smtp_port = int(st.secrets.get("SMTP_PORT", 587))
    smtp_user = st.secrets.get("SMTP_USER", "your_email@gmail.com")
    smtp_pass = st.secrets.get("SMTP_PASS", "your_password")

    msg = MIMEMultipart("alternative")
    msg["Subject"] = "YOLO Detection Notification"
    msg["From"] = smtp_user
    msg["To"] = to_email
    msg.attach(MIMEText(message, "plain"))

    # The timeout keeps an unreachable server from hanging the caller.
    with smtplib.SMTP(smtp_server, smtp_port, timeout=_SMTP_TIMEOUT_S) as server:
        server.starttls()
        server.login(smtp_user, smtp_pass)
        server.sendmail(smtp_user, to_email, msg.as_string())


def send_notification_smtp(to_email, message):
    """
    Send an email through SMTP and report the outcome in the Streamlit UI.

    Requires valid credentials in ``st.secrets``. Errors are not raised:
    they are shown with ``st.error`` (this is a UI boundary, hence the broad
    ``except``).
    """
    try:
        _send_email(to_email, message)
    except Exception as e:
        st.error(f"Erreur lors de l'envoi du mail: {e}")
    else:
        st.success(f"Email envoyé à {to_email} avec succès!")


def save_to_cloud(file_data, service):
    """
    Illustrative placeholder for saving data to Google Drive / Dropbox /
    OneDrive. Nothing is uploaded: replace with real API code per service.

    Args:
        file_data: Bytes that would be uploaded (currently unused).
        service: One of "Google Drive", "Dropbox", "OneDrive". Any other
            value only produces a warning.
    """
    hints = {
        "Google Drive": "Exemple : utiliser PyDrive ou Google Drive API.",
        "Dropbox": "Exemple : utiliser le SDK Dropbox pour l'upload.",
        "OneDrive": "Exemple : utiliser le SDK OneDrive (MS Graph).",
    }
    hint = hints.get(service)
    if hint is None:
        # Do not claim success for a service that is not handled at all.
        st.warning(f"Service cloud non pris en charge : {service}")
        return
    st.info(hint)
    st.success(f"Sauvegarde simulée sur {service} réalisée avec succès !")


###############################################################################
# TRANSFORMER FOR STREAMLIT_WEBRTC (REAL-TIME VIDEO)
###############################################################################

class VideoTransformer(VideoTransformerBase):
    """
    Per-stream frame processor: optional filters, rotation, resize and crop,
    then model inference, FPS overlay, snapshots, local recording and
    rate-limited notifications.
    """

    def __init__(
        self,
        model,
        conf=0.25,
        iou=0.45,
        show_fps=False,
        auto_snapshot=False,
        snapshot_interval=5,
        output_format="RGB",
        apply_filters=False,
        advanced_filters=False,
        rotation_angle=0,
        resize_width=None,
        resize_height=None,
        detection_zone=None,
        notification_email=None,
        save_to_cloud=False,
        morphological_ops=False,
        equalize_hist=False,
        classes_to_detect=None,
        max_det=1000,
        line_width=2,
        record_output=False,
        agnostic_nms=False,
        notification_cooldown=60.0,
    ):
        """
        Store the processing options.

        ``notification_cooldown`` is the minimum number of seconds between
        two detection-triggered side effects (email / cloud save); without
        it every frame containing a detection would trigger them. The other
        arguments are stored as attributes of the same name and used by
        ``transform``.
        """
        self.model = model
        self.conf = conf
        self.iou = iou
        self.show_fps = show_fps
        self.auto_snapshot = auto_snapshot
        self.snapshot_interval = snapshot_interval
        self.output_format = output_format.upper()
        self.apply_filters = apply_filters
        self.advanced_filters = advanced_filters
        self.rotation_angle = rotation_angle
        self.resize_width = resize_width
        self.resize_height = resize_height
        self.detection_zone = detection_zone
        self.notification_email = notification_email
        self.save_to_cloud = save_to_cloud
        self.morphological_ops = morphological_ops
        self.equalize_hist = equalize_hist
        self.classes_to_detect = classes_to_detect
        self.max_det = max_det
        self.line_width = line_width
        self.record_output = record_output
        self.agnostic_nms = agnostic_nms
        self.notification_cooldown = notification_cooldown

        self.last_time = time.time()
        self.last_snapshot_time = time.time()
        self.latest_snapshot = None
        self.last_frame = None
        # 0.0 makes the very first detection eligible for a notification.
        self._last_side_effect_time = 0.0

        # Local recording: the writer is created lazily, once the size of the
        # frames that are actually written is known.
        self.video_writer = None
        self._writer_size = None
        if self.record_output:
            self.output_filename = os.path.join(
                tempfile.gettempdir(), f"webcam_record_{time.time()}.mp4"
            )

    def transform(self, frame):
        """
        Process one incoming frame and return the frame to display.

        The raw BGR frame is kept in ``last_frame``; the annotated frame is
        optionally stored as ``latest_snapshot`` and written to the local
        recording.

        NOTE(review): when ``output_format`` is "RGB" the returned array is
        converted to RGB; confirm the consumer does not expect BGR.
        """
        image = frame.to_ndarray(format="bgr24")
        self.last_frame = image.copy()

        image = self._preprocess(image)

        # YOLO inference
        results = self.model(
            image,
            conf=self.conf,
            iou=self.iou,
            classes=self.classes_to_detect or None,
            max_det=self.max_det,
            agnostic_nms=self.agnostic_nms,
        )
        annotated_frame = results[0].plot(line_width=self.line_width)

        # FPS overlay
        current_time = time.time()
        dt = current_time - self.last_time
        fps = 1.0 / dt if dt > 0 else 0.0
        self.last_time = current_time
        if self.show_fps:
            cv2.putText(annotated_frame, f"FPS: {fps:.2f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Automatic snapshot
        if self.auto_snapshot and (
            current_time - self.last_snapshot_time >= self.snapshot_interval
        ):
            self.last_snapshot_time = current_time
            self.latest_snapshot = annotated_frame.copy()

        # Local recording
        if self.record_output:
            self._write_frame(annotated_frame)

        # Output format (RGB vs BGR)
        if self.output_format == "RGB":
            display_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
        else:
            display_frame = annotated_frame

        # Truthiness of each result object decides whether something was
        # detected (presumably via its length — confirm against Ultralytics).
        if (self.notification_email or self.save_to_cloud) and any(results):
            self._handle_detection(annotated_frame, current_time)

        return display_frame

    def _preprocess(self, image):
        """Apply the enabled filters, rotation, resize and crop, in that order."""
        # Simple filters: grayscale + blur, converted back to 3 channels
        if self.apply_filters:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            image = cv2.GaussianBlur(image, (5, 5), 0)
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)

        # Advanced filters (contrast, brightness)
        if self.advanced_filters:
            image = cv2.convertScaleAbs(image, alpha=1.5, beta=30)

        # Morphological operations
        if self.morphological_ops:
            kernel = np.ones((3, 3), np.uint8)
            image = cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel)
            image = cv2.morphologyEx(image, cv2.MORPH_CLOSE, kernel)

        # Histogram equalisation on the luma channel
        if self.equalize_hist:
            yuv = cv2.cvtColor(image, cv2.COLOR_BGR2YUV)
            yuv[:, :, 0] = cv2.equalizeHist(yuv[:, :, 0])
            image = cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR)

        # Rotation
        if self.rotation_angle != 0:
            image = self.rotate_image(image, self.rotation_angle)

        # Resize
        if self.resize_width and self.resize_height:
            image = cv2.resize(
                image, (int(self.resize_width), int(self.resize_height))
            )

        # Detection zone: skipped when it does not intersect the frame, since
        # an empty crop cannot be fed to the model.
        if self.detection_zone:
            x, y, w, h = (int(v) for v in self.detection_zone)
            x, y = max(0, x), max(0, y)
            cropped = image[y:y + h, x:x + w]
            if cropped.size:
                image = cropped

        return image

    def _write_frame(self, frame):
        """Append ``frame`` to the recording, creating the writer on first use."""
        height, width = frame.shape[:2]
        if self.video_writer is None:
            # Size the writer from the frame actually written: after resize
            # or crop it can differ from the camera frame.
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            self._writer_size = (width, height)
            self.video_writer = cv2.VideoWriter(
                self.output_filename, fourcc, 20.0, self._writer_size
            )
        if (width, height) != self._writer_size:
            frame = cv2.resize(frame, self._writer_size)
        self.video_writer.write(frame)

    def _handle_detection(self, annotated_frame, now):
        """Trigger email / cloud side effects, at most once per cooldown."""
        if now - self._last_side_effect_time < self.notification_cooldown:
            return
        self._last_side_effect_time = now

        if self.notification_email:
            # Sent on a daemon thread so SMTP latency never stalls the video.
            threading.Thread(
                target=self._notify,
                args=(self.notification_email,
                      "Détection réalisée sur le flux webcam !"),
                daemon=True,
            ).start()

        if self.save_to_cloud:
            ok, buffer = cv2.imencode('.png', annotated_frame)
            if ok:
                try:
                    save_to_cloud(buffer.tobytes(), "Google Drive")
                except Exception:
                    logger.exception("Cloud save failed")

    @staticmethod
    def _notify(to_email, message):
        """Send the notification email; failures are logged, never raised."""
        try:
            _send_email(to_email, message)
        except Exception:
            logger.exception("Could not send notification email to %s", to_email)

    def rotate_image(self, image, angle):
        """Rotate ``image`` by ``angle`` degrees around its centre, same size."""
        (h, w) = image.shape[:2]
        center = (w / 2, h / 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        return cv2.warpAffine(image, M, (w, h))

    def __del__(self):
        # getattr: __init__ may have failed before the attribute was set.
        writer = getattr(self, "video_writer", None)
        if writer is not None:
            writer.release()


###############################################################################
# VIDEO PROCESSING (LOCAL FILE)
###############################################################################

def _annotate_video_frame(frame, label, caption, model, class_names,
                          output_images, csv_rows, **detect_kwargs):
    """
    Run detection on one BGR frame, display it, and record the results.

    Appends the annotated image (as an RGB PIL image) to ``output_images``
    and one row per detected class to ``csv_rows``.
    """
    # The BGR frame is passed as-is: converting it to RGB first would feed
    # swapped channels to the model and to the BGR display below.
    annotated_frame, results = detect_objects(
        model=model, image=frame, model_type="Vidéo", **detect_kwargs
    )
    st.image(annotated_frame, channels="BGR", caption=caption)
    output_images.append(
        Image.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB))
    )
    counts = count_objects(results, "Vidéo", class_names)
    for cls_name, count_val in counts.items():
        csv_rows.append({"Image": label, "Classe": cls_name, "Nombre": count_val})


def process_video_file(
    video_path,
    model,
    conf,
    iou,
    export_type,
    classes_to_detect=None,
    max_det=1000,
    line_width=2,
    agnostic_nms=False
):
    """
    Process a local video file, display some annotated frames, and offer
    the results for export.

    Either only the first frame is processed, or up to 50 frames sampled at
    a regular interval (the user picks the number). The capture is always
    released, even if processing raises.

    Args:
        video_path: Path of the video file.
        model: Ultralytics model.
        conf, iou, classes_to_detect, max_det, line_width, agnostic_nms:
            Forwarded to ``detect_objects``.
        export_type: One of "PDF", "ZIP", "CSV", "JSON".
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("🚫 Impossible d'ouvrir la vidéo.")
        return

    output_images = []
    csv_rows = []
    class_names = model.names if hasattr(model, 'names') else []
    detect_kwargs = dict(
        conf=conf,
        iou=iou,
        classes_to_detect=classes_to_detect,
        max_det=max_det,
        line_width=line_width,
        agnostic_nms=agnostic_nms,
    )

    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        st.info(f"🎞️ Vidéo chargée, {frame_count} frames trouvées.")

        process_only_first = st.checkbox("Traiter seulement la première frame", value=True)

        if process_only_first:
            ret, frame = cap.read()
            if ret:
                _annotate_video_frame(
                    frame, "frame_0", "🖼️ Première frame annotée",
                    model, class_names, output_images, csv_rows, **detect_kwargs
                )
            else:
                st.error("🚫 Impossible de lire la première frame.")
        else:
            # The reported frame count can be tiny, 0 or negative; keep the
            # slider bounds and its default valid in every case.
            max_frames = max(1, min(frame_count, 50))
            if max_frames > 1:
                num_frames_to_process = st.slider(
                    "Nombre de frames à traiter", 1, max_frames, min(10, max_frames)
                )
            else:
                num_frames_to_process = 1

            interval = max(1, frame_count // num_frames_to_process)
            frame_idx = 0
            processed = 0
            while processed < num_frames_to_process:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_idx % interval == 0:
                    _annotate_video_frame(
                        frame, f"frame_{frame_idx}", f"🖼️ Frame {frame_idx} annotée",
                        model, class_names, output_images, csv_rows, **detect_kwargs
                    )
                    processed += 1
                frame_idx += 1
    finally:
        cap.release()

    # Export
    if output_images:
        _export_results(export_type, output_images, csv_rows)


###############################################################################
# WEBCAM HANDLING: DISPLAY SEVERAL CAMERAS
###############################################################################

def display_webcam_streams(
    selected_devices,
    model,
    conf,
    iou,
    show_fps,
    auto_snapshot,
    snapshot_interval,
    output_format,
    apply_filters,
    advanced_filters,
    rotation_angle,
    resize_width,
    resize_height,
    detection_zone,
    notification_email,
    save_to_cloud,
    morphological_ops,
    equalize_hist,
    classes_to_detect,
    max_det,
    line_width,
    record_output,
    agnostic_nms
):
    """
    Display several webcams at once, laid out in rows of at most 4 cameras.
    Each webcam gets its own ``VideoTransformer`` instance, a manual
    snapshot button and, when enabled, a download button for the latest
    automatic snapshot.
    """
    if not selected_devices:
        st.warning("Aucune webcam sélectionnée.")
        return

    def make_transformer():
        # Called by webrtc_streamer to build one transformer per stream.
        return VideoTransformer(
            model,
            conf=conf,
            iou=iou,
            show_fps=show_fps,
            auto_snapshot=auto_snapshot,
            snapshot_interval=snapshot_interval,
            output_format=output_format,
            apply_filters=apply_filters,
            advanced_filters=advanced_filters,
            rotation_angle=rotation_angle,
            resize_width=resize_width,
            resize_height=resize_height,
            detection_zone=detection_zone,
            notification_email=notification_email,
            save_to_cloud=save_to_cloud,
            morphological_ops=morphological_ops,
            equalize_hist=equalize_hist,
            classes_to_detect=classes_to_detect,
            max_det=max_det,
            line_width=line_width,
            record_output=record_output,
            agnostic_nms=agnostic_nms,
        )

    # Split the camera list into groups of 4 for the grid layout
    for row_start in range(0, len(selected_devices), 4):
        row_devices = selected_devices[row_start:row_start + 4]
        cols = st.columns(len(row_devices))

        for col, device_index in zip(cols, row_devices):
            with col:
                st.markdown(f"**Caméra {device_index}**")
                # NOTE(review): deviceId is matched by the browser; the value
                # passed here is the index chosen by the caller — confirm it
                # is a valid browser device id, since an "exact" constraint
                # fails when nothing matches.
                ctx = webrtc_streamer(
                    key=f"webcam-{device_index}",
                    video_transformer_factory=make_transformer,
                    rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
                    media_stream_constraints={
                        "video": {"deviceId": {"exact": str(device_index)}},
                        "audio": False
                    }
                )

                # Manual snapshot button
                if st.button(f"📸 Snapshot Caméra {device_index}", key=f"snap-{device_index}"):
                    transformer = ctx.video_transformer
                    frame = transformer.last_frame if transformer else None
                    if frame is not None:
                        st.image(frame, caption=f"Snapshot Caméra {device_index}", channels="BGR")
                    else:
                        # Also shown when the stream has not started yet.
                        st.warning("🚫 Aucune image capturée pour cette caméra.")

                # Downloadable automatic snapshot
                if (
                    auto_snapshot
                    and ctx.video_transformer
                    and ctx.video_transformer.latest_snapshot is not None
                ):
                    ret, buffer = cv2.imencode('.png', ctx.video_transformer.latest_snapshot)
                    if ret:
                        st.download_button(
                            label="📥 Télécharger Snapshot Auto",
                            data=buffer.tobytes(),
                            file_name=f"snapshot_cam_{device_index}.png",
                            key=f"auto_snap_{device_index}"
                        )


###############################################################################
# IP CAMERA HANDLING
###############################################################################

def display_ip_camera(
    ip_url,
    model,
    conf,
    iou,
    show_fps,
    auto_snapshot,
    snapshot_interval,
    output_format,
    classes_to_detect=None,
    max_det=1000,
    line_width=2,
    agnostic_nms=False
):
    """
    Read frames from an IP camera (RTSP), run the model on each one and
    display them in real time until the stream fails or the script run is
    interrupted.

    The capture is released in a ``finally`` block so it is not leaked when
    the loop is left through an exception.
    """
    cap = cv2.VideoCapture(ip_url)
    if not cap.isOpened():
        cap.release()
        st.error("🚫 Impossible d'ouvrir la caméra IP.")
        return

    try:
        frame_placeholder = st.empty()
        last_time = time.time()
        last_snapshot_time = time.time()

        stop_button = st.button("⏹️ Arrêter le streaming IP")
        # A single placeholder: each new snapshot replaces the previous
        # download button instead of stacking a new one on the page.
        snapshot_placeholder = st.empty()
        use_rgb = output_format.upper() == "RGB"

        # The button value cannot change while this loop runs, so it is
        # checked once; a click interrupts the run through a rerun.
        if stop_button:
            st.info("Arrêt du streaming IP.")
            return

        while True:
            ret, frame = cap.read()
            if not ret:
                st.error("🚫 Erreur de lecture du flux IP.")
                break

            # YOLO inference
            results = model(
                frame,
                conf=conf,
                iou=iou,
                classes=classes_to_detect or None,
                max_det=max_det,
                agnostic_nms=agnostic_nms,
            )
            annotated_frame = results[0].plot(line_width=line_width)

            # FPS
            current_time = time.time()
            dt = current_time - last_time
            fps = 1.0 / dt if dt > 0 else 0.0
            last_time = current_time
            if show_fps:
                cv2.putText(annotated_frame, f"FPS: {fps:.2f}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # Automatic snapshot
            if auto_snapshot and (current_time - last_snapshot_time >= snapshot_interval):
                last_snapshot_time = current_time
                snapshot_bytes = cv2.imencode('.png', annotated_frame)[1].tobytes()
                snapshot_placeholder.download_button(
                    "📥 Télécharger Snapshot Auto",
                    data=snapshot_bytes,
                    file_name="ip_snapshot.png",
                    key=f"ip_snapshot_{time.time()}"
                )

            # Output format
            if use_rgb:
                annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)

            frame_placeholder.image(
                annotated_frame,
                channels="RGB" if use_rgb else "BGR"
            )
    finally:
        cap.release()
###############################################################################
# MAIN APPLICATION
###############################################################################

def main():
    """
    Build the Streamlit page: sidebar options, model loading, and one of
    five input sources (image, video, webcam, IP camera, replay).
    """
    # Function-scope imports: only this function needs them.
    import hashlib
    import io

    apply_techsolut_theme()
    # NOTE(review): page config and title are disabled below; presumably
    # handled by apply_techsolut_theme — confirm.
    # st.set_page_config(page_title="Plateforme de Vision par Ordinateur - TECHSOLUT", layout="wide")
    # st.title("👁️ Vision par Ordinateur - TECHSOLUT (Multi-Webcams Optimisé)")

    # 1) Model choice
    model_versions = {
        "Détection": {
            "YOLOv5": ["yolov5nu", "yolov5s", "yolov5m", "yolov5l", "yolov5x"],
            "YOLOv8": ["yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x"],
            "YOLOv9": ["yolov9c", "yolov9e"],
            "YOLOv10": ["yolov10n", "yolov10s", "yolov10m", "yolov10l", "yolov10x"],
            "YOLO11": ["yolo11n", "yolo11s", "yolo11m", "yolo11l", "yolo11x"],
            "YOLO12": ["yolo12n", "yolo12s", "yolo12m", "yolo12l", "yolo12x"],
            "RT-DETR": ["rtdetr-l", "rtdetr-x"]
        },
        "Segmentation": {
            "YOLOv8": ["yolov8n-seg", "yolov8s-seg", "yolov8m-seg", "yolov8l-seg", "yolov8x-seg"],
            "YOLOv9": ["yolov9c-seg", "yolov9e-seg"],
            "YOLO11": ["yolo11n-seg", "yolo11s-seg", "yolo11m-seg", "yolo11l-seg", "yolo11x-seg"]
        },
        "Estimation de pose": {
            "YOLOv8": ["yolov8n-pose", "yolov8s-pose", "yolov8m-pose", "yolov8l-pose", "yolov8x-pose"],
            "YOLO11": ["yolo11n-pose", "yolo11s-pose", "yolo11m-pose", "yolo11l-pose", "yolo11x-pose"]
        },
        "Personnalisé": {
            "Custom": ["custom"]
        }
    }

    with st.sidebar:
        with st.expander("🧠 Choix du modèle"):
            task_type = st.selectbox("Type de tâche", list(model_versions.keys()))
            model_family = st.selectbox("Famille de modèle", list(model_versions[task_type].keys()))
            selected_model = st.selectbox("Version du modèle", model_versions[task_type][model_family])

            custom_model_path = None
            if selected_model == "custom":
                uploaded_file = st.file_uploader("📥 Charger un modèle (.pt)", type=["pt"])
                if uploaded_file:
                    # Content-addressed path: the same upload maps to the same
                    # file on every rerun, so no new temp file is created each
                    # time and the path (used as a load_model argument) stays
                    # stable across reruns.
                    weights = uploaded_file.getvalue()
                    digest = hashlib.sha256(weights).hexdigest()[:16]
                    custom_model_path = os.path.join(
                        tempfile.gettempdir(), f"custom_model_{digest}.pt"
                    )
                    if not os.path.exists(custom_model_path):
                        with open(custom_model_path, "wb") as weights_file:
                            weights_file.write(weights)

        with st.expander("🎯 Paramètres"):
            conf = st.slider("Confiance (confidence threshold)", 0.0, 1.0, 0.25)
            iou = st.slider("IoU threshold", 0.0, 1.0, 0.45)
            # NOTE(review): input_size and processing_mode are collected but
            # not used anywhere in this function.
            input_size = st.selectbox("Taille d'entrée du modèle", options=[320, 416, 512, 640, 960, 1280], index=3)
            processing_mode = st.radio("Mode de traitement", options=["Image par image", "Traitement par lot"], horizontal=True)
            show_fps = st.checkbox("Afficher le FPS sur la vidéo", value=False)
            auto_snapshot = st.checkbox("Téléchargement automatique des snapshots", value=False)
            output_format = st.selectbox("Format de sortie des frames", options=["BGR", "RGB"], index=1)
            snapshot_interval = st.number_input("Sauvegarder une frame toutes les X secondes", min_value=1, max_value=60, value=5, step=1)
            apply_filters = st.checkbox("Appliquer des filtres (grayscale + flou)", value=False)
            advanced_filters = st.checkbox("Appliquer des filtres avancés (contraste + luminosité)", value=False)
            rotation_angle = st.number_input("Angle de rotation", min_value=0, max_value=360, value=0, step=1)
            resize_width = st.number_input("Largeur de redimensionnement", min_value=1, value=640, step=1)
            resize_height = st.number_input("Hauteur de redimensionnement", min_value=1, value=480, step=1)
            detection_zone = st.checkbox("Définir une zone de détection")
            if detection_zone:
                x = st.number_input("Zone X", min_value=0, value=0, step=1)
                y = st.number_input("Zone Y", min_value=0, value=0, step=1)
                w = st.number_input("Zone Largeur", min_value=1, value=640, step=1)
                h = st.number_input("Zone Hauteur", min_value=1, value=480, step=1)
                detection_zone = (x, y, w, h)
            else:
                detection_zone = None
            notification_email = st.text_input("Email pour notifications")
            save_to_cloud_flag = st.checkbox("Sauvegarder les résultats sur le cloud")

        with st.expander("🧩 Options Avancées"):
            morphological_ops = st.checkbox("Opérations morphologiques (opening/closing)")
            equalize_hist = st.checkbox("Égaliser l'histogramme (améliorer contraste)")
            custom_classes = st.text_input("Lister les classes (ID, séparés par des virgules) à détecter ou laisser vide")
            if custom_classes.strip():
                # Non-numeric entries are silently ignored.
                classes_to_detect = [
                    int(c.strip()) for c in custom_classes.split(",") if c.strip().isdigit()
                ]
            else:
                classes_to_detect = None
            max_det = st.number_input("Max Detections autorisées", min_value=1, max_value=10000, value=1000, step=50)
            line_width = st.slider("Épaisseur des bounding boxes", 1, 10, 2)
            record_output = st.checkbox("Enregistrer les flux webcam en local (format MP4)")
            agnostic_nms = st.checkbox("Agnostic NMS (ignorer les classes lors du NMS)")

        with st.expander("🖍️ Post-traitement"):
            export_type = st.selectbox("Exporter sous", ["PDF", "ZIP", "CSV", "JSON"])

        with st.expander("☁️ Sauvegarde Cloud"):
            cloud_service = st.selectbox("Choisir un service", ["Google Drive", "Dropbox", "OneDrive"])
            cloud_file = st.file_uploader("📤 Sélectionner un fichier à sauvegarder", type=["pdf", "zip", "csv", "jpg", "png", "json"])
            if cloud_file:
                if st.button(f"Sauvegarder sur {cloud_service}"):
                    save_to_cloud(cloud_file.read(), cloud_service)

    # -------------------------------------------------------------------------
    # 2) MODEL LOADING
    # -------------------------------------------------------------------------
    if selected_model == "custom" and not custom_model_path:
        # load_model cannot resolve "custom" without a weights file; ask for
        # one instead of letting it raise.
        st.info("Veuillez charger un fichier de poids (.pt) pour utiliser un modèle personnalisé.")
        return

    model = load_model(selected_model, custom_model_path)
    class_names = model.names if hasattr(model, 'names') else []

    # -------------------------------------------------------------------------
    # 3) MAIN SECTION: SOURCE CHOICE
    # -------------------------------------------------------------------------
    st.subheader("Source : 🖼️ Image, 🎥 Vidéo, 📷 Webcam, 🌐 Caméra IP ou 🔄 Relecture")
    input_type = st.radio("Source", ["Image", "Vidéo", "Webcam", "Caméra IP", "Relecture"], horizontal=True)

    # ================== 1) IMAGE ==================
    if input_type == "Image":
        uploaded_images = st.file_uploader("📁 Choisir des images", type=["jpg", "png"], accept_multiple_files=True)
        if uploaded_images:
            output_images = []
            csv_rows = []

            for idx, img_file in enumerate(uploaded_images):
                image = Image.open(img_file).convert("RGB")
                st.image(image, caption=f"🖼️ {img_file.name}")

                # Ultralytics documents ndarray inputs as BGR, and the
                # annotated array is handled as BGR below, so convert the
                # RGB PIL image before detection.
                bgr_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
                annotated_image, results = detect_objects(
                    model=model,
                    image=bgr_image,
                    model_type=task_type,
                    conf=conf,
                    iou=iou,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    agnostic_nms=agnostic_nms
                )
                st.image(annotated_image, caption="🖼️ Image annotée", channels="BGR")

                # BGR -> RGB PIL image for storage
                output_images.append(
                    Image.fromarray(cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB))
                )
                counts = count_objects(results, task_type, class_names)
                for cls_name, count_val in counts.items():
                    csv_rows.append({"Image": img_file.name, "Classe": cls_name, "Nombre": count_val})

                # A distinct key per image keeps identical tables from
                # colliding when several images are uploaded.
                show_table(counts, key=f"table_{idx}_{img_file.name}")

            # Export
            if output_images:
                if export_type == "PDF":
                    export_pdf(output_images)
                elif export_type == "ZIP":
                    export_zip(output_images)
                elif export_type == "CSV":
                    export_csv_rows(csv_rows)
                elif export_type == "JSON":
                    json_data = json.dumps(csv_rows, indent=4)
                    st.download_button(
                        "📥 Télécharger JSON des détections",
                        data=json_data,
                        file_name="detections.json",
                        mime="application/json"
                    )

                if save_to_cloud_flag:
                    st.info("Exemple: sauvegarde ZIP des images sur Cloud.")
                    # Build the archive in memory: nothing is written to the
                    # working directory and no temp file needs cleaning up.
                    archive = io.BytesIO()
                    with ZipFile(archive, 'w') as zipf:
                        for i, img in enumerate(output_images):
                            png = io.BytesIO()
                            img.save(png, format="PNG")
                            zipf.writestr(f"cloud_image_{i}.png", png.getvalue())
                    save_to_cloud(archive.getvalue(), cloud_service)

    # ================== 2) VIDEO ==================
    elif input_type == "Vidéo":
        uploaded_video = st.file_uploader("📁 Choisir une vidéo", type=["mp4", "avi", "mov"], accept_multiple_files=False)
        if uploaded_video is not None:
            # Keep the uploaded extension so the temp file matches its format.
            suffix = os.path.splitext(uploaded_video.name)[1] or ".mp4"
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tfile:
                tfile.write(uploaded_video.read())
                video_path = tfile.name

            try:
                process_video_file(
                    video_path=video_path,
                    model=model,
                    conf=conf,
                    iou=iou,
                    export_type=export_type,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    agnostic_nms=agnostic_nms
                )
            finally:
                # Remove the temp file even if processing is interrupted.
                os.unlink(video_path)

    # ================== 3) MULTI-WEBCAM ==================
    elif input_type == "Webcam":
        # Probing 21 device indices is slow, so the result is kept in the
        # session and only recomputed on request.
        rescan = st.button("🔄 Rescanner les webcams")
        if rescan or "available_webcams" not in st.session_state:
            st.info("Recherche de toutes les webcams disponibles...")
            found_devices = []
            # Scan 0..20 to potentially discover more webcams
            for index in range(21):
                cap = cv2.VideoCapture(index)
                try:
                    if cap.isOpened():
                        ret, _ = cap.read()
                        if ret:
                            found_devices.append(index)
                finally:
                    cap.release()
            st.session_state["available_webcams"] = found_devices

        available_devices = st.session_state["available_webcams"]

        if not available_devices:
            st.error("🚫 Aucune webcam détectée ou accessible.")
        else:
            st.success(f"Webcams détectées : {available_devices}")
            selected_devices = st.multiselect(
                "Sélectionner les caméras à utiliser (max 4 affichées par rangée)",
                options=available_devices,
                default=available_devices[:1],
                format_func=lambda x: f"Caméra {x}"
            )
            if selected_devices:
                display_webcam_streams(
                    selected_devices=selected_devices,
                    model=model,
                    conf=conf,
                    iou=iou,
                    show_fps=show_fps,
                    auto_snapshot=auto_snapshot,
                    snapshot_interval=snapshot_interval,
                    output_format=output_format,
                    apply_filters=apply_filters,
                    advanced_filters=advanced_filters,
                    rotation_angle=rotation_angle,
                    resize_width=resize_width,
                    resize_height=resize_height,
                    detection_zone=detection_zone,
                    notification_email=notification_email,
                    save_to_cloud=save_to_cloud_flag,
                    morphological_ops=morphological_ops,
                    equalize_hist=equalize_hist,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    record_output=record_output,
                    agnostic_nms=agnostic_nms
                )

    # ================== 4) IP CAMERA ==================
    elif input_type == "Caméra IP":
        st.info("Activation de la caméra IP...")
        ip_url = st.text_input("Entrez l'URL RTSP", value="rtsp://")
        if st.button("Démarrer le streaming IP"):
            display_ip_camera(
                ip_url=ip_url,
                model=model,
                conf=conf,
                iou=iou,
                show_fps=show_fps,
                auto_snapshot=auto_snapshot,
                snapshot_interval=snapshot_interval,
                output_format=output_format,
                classes_to_detect=classes_to_detect,
                max_det=max_det,
                line_width=line_width,
                agnostic_nms=agnostic_nms
            )

    # ================== 5) REPLAY ==================
    elif input_type == "Relecture":
        st.info("Relecture de vidéos enregistrées")
        recorded_video = st.file_uploader("📁 Charger une vidéo enregistrée", type=["mp4", "avi", "mov"], accept_multiple_files=False)
        if recorded_video is not None:
            st.video(recorded_video)


###############################################################################
# SCRIPT ENTRY POINT
###############################################################################

if __name__ == "__main__":
    main()