# Streamlit application — YOLO / RT-DETR computer-vision platform (TECHSOLUT).
# (The "Spaces: Sleeping" banner text previously here was Hugging Face Spaces
# page-scrape residue, not part of the program.)
| import os | |
| import time | |
| import tempfile | |
| import json | |
| from zipfile import ZipFile | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| from PIL import Image | |
| import streamlit as st | |
| from fpdf import FPDF | |
| from st_aggrid import AgGrid | |
| from st_aggrid.grid_options_builder import GridOptionsBuilder | |
| from techsolut_theme import apply_techsolut_theme | |
| from ultralytics import YOLO, RTDETR | |
| from streamlit_webrtc import webrtc_streamer, VideoTransformerBase | |
| ############################################################################### | |
| # FONCTIONS LIEES AU MODELE | |
| ############################################################################### | |
def load_model(model_choice, custom_model_path=None):
    """
    Load a YOLO / RT-DETR model matching the user's choice.

    Parameters
    ----------
    model_choice : str
        Either a pretrained checkpoint name (e.g. "yolov8n", "rtdetr-l"),
        the literal "custom", or a local fine-tuned alias ("model1".."model4").
    custom_model_path : str, optional
        Path to a user-uploaded .pt file; required when model_choice == "custom".

    Returns
    -------
    YOLO
        The instantiated Ultralytics model.

    Raises
    ------
    ValueError
        If model_choice is not a known pretrained name, a local alias, or
        "custom" with a valid path.

    NOTE(review): the original docstring claimed results were cached, but no
    @st.cache_resource decorator is present — confirm whether caching was
    intended and add it at the call site if so.
    """
    detection_models = [
        "yolov5nu", "yolov5s", "yolov5m", "yolov5l", "yolov5x",
        "yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x",
        "yolov9c", "yolov9e",
        "yolov10n", "yolov10s", "yolov10m", "yolov10l", "yolov10x",
        "yolo11n", "yolo11s", "yolo11m", "yolo11l", "yolo11x",
        "yolo12n", "yolo12s", "yolo12m", "yolo12l", "yolo12x",
        "rtdetr-l", "rtdetr-x"
    ]
    segmentation_models = [
        "yolov8n-seg", "yolov8s-seg", "yolov8m-seg", "yolov8l-seg", "yolov8x-seg",
        "yolov9c-seg", "yolov9e-seg",
        "yolo11n-seg", "yolo11s-seg", "yolo11m-seg", "yolo11l-seg", "yolo11x-seg"
    ]
    pose_models = [
        "yolov8n-pose", "yolov8s-pose", "yolov8m-pose", "yolov8l-pose", "yolov8x-pose",
        "yolo11n-pose", "yolo11s-pose", "yolo11m-pose", "yolo11l-pose", "yolo11x-pose"
    ]
    # Pretrained checkpoints are downloaded/resolved by name by Ultralytics.
    if model_choice in set(detection_models + segmentation_models + pose_models):
        return YOLO(f"{model_choice}.pt")
    if model_choice == 'custom' and custom_model_path:
        return YOLO(custom_model_path)
    # Local fine-tuned checkpoints referenced by alias. Explicit mapping
    # replaces the old parallel-list .index() lookup, which raised a cryptic
    # ValueError for any unknown choice.
    local_checkpoints = {
        "model1": "best.pt",
        "model2": "best2.pt",
        "model3": "best3.pt",
        "model4": "best5.pt",
    }
    if model_choice in local_checkpoints:
        return YOLO(local_checkpoints[model_choice])
    raise ValueError(f"Unknown model choice: {model_choice!r}")
def detect_objects(model,
                   image,
                   model_type,
                   conf,
                   iou,
                   classes_to_detect=None,
                   max_det=1000,
                   line_width=2,
                   agnostic_nms=False):
    """
    Run inference on a single image.

    The image may be a PIL image or a numpy array; it is converted to a numpy
    array if needed, then passed to the YOLO/RT-DETR model with the class
    filter, max-detection cap, bounding-box line width and agnostic-NMS flag.

    Returns a tuple (annotated_frame, results) where annotated_frame is the
    plotted prediction of the first result and results is the raw model output.
    """
    frame = image if isinstance(image, np.ndarray) else np.array(image)
    predictions = model(
        frame,
        conf=conf,
        iou=iou,
        # An empty class filter means "detect everything".
        classes=classes_to_detect or None,
        max_det=max_det,
        agnostic_nms=agnostic_nms
    )
    annotated = predictions[0].plot(line_width=line_width)
    return annotated, predictions
def count_objects(results, model_type, class_names):
    """
    Tally the detected objects per class name.

    Reads the class ids from the first result's boxes and maps each id to
    its human-readable name via class_names. Returns a dict
    {class_name: count}.
    """
    detected_ids = results[0].boxes.cls.cpu().numpy()
    tally = {}
    for raw_id in detected_ids:
        label = class_names[int(raw_id)]
        if label in tally:
            tally[label] += 1
        else:
            tally[label] = 1
    return tally
| ############################################################################### | |
| # FONCTIONS D'EXPORT (PDF, ZIP, CSV, JSON) | |
| ############################################################################### | |
def export_pdf(images):
    """
    Export a list of PIL images as a single PDF and offer it for download.

    Fixes over the previous version: an empty list no longer raises
    IndexError, and the temporary PDF file is removed after its bytes are
    read (it used to be left behind in the temp directory).
    """
    if not images:
        st.warning("Aucune image à exporter.")
        return
    # Reserve a temp path; PIL writes the PDF into it below.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmpfile:
        pdf_path = tmpfile.name
    try:
        # PIL writes all pages into one PDF when save_all/append_images are used.
        images[0].save(pdf_path, save_all=True, append_images=images[1:])
        with open(pdf_path, "rb") as f:
            pdf_bytes = f.read()
    finally:
        os.remove(pdf_path)
    st.download_button("📄 Télécharger le PDF", data=pdf_bytes, file_name="resultats.pdf")
def export_zip(images):
    """
    Export a list of PIL images inside a ZIP archive and offer it for download.

    Fixes over the previous version: intermediate PNGs are written to the
    temp directory instead of the current working directory (avoiding
    pollution and collisions between concurrent sessions), each PNG is
    removed even if archiving fails, and the temporary ZIP itself is deleted
    after its bytes are read.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmpfile:
        zip_path = tmpfile.name
    tmp_dir = tempfile.gettempdir()
    try:
        with ZipFile(zip_path, 'w') as zipf:
            for i, img in enumerate(images):
                img_name = f"image_{i}.png"
                img_path = os.path.join(tmp_dir, img_name)
                img.save(img_path)
                try:
                    # arcname keeps the archive entry name flat, as before.
                    zipf.write(img_path, arcname=img_name)
                finally:
                    os.remove(img_path)
        with open(zip_path, "rb") as f:
            zip_bytes = f.read()
    finally:
        os.remove(zip_path)
    st.download_button("🗜️ Télécharger le ZIP", data=zip_bytes, file_name="resultats.zip")
def export_csv_rows(csv_rows):
    """
    Offer the collected detection rows as a downloadable UTF-8 CSV file.
    """
    encoded = pd.DataFrame(csv_rows).to_csv(index=False).encode("utf-8")
    st.download_button("📤 Télécharger CSV des détections", data=encoded,
                       file_name="detections.csv", mime="text/csv")
| ############################################################################### | |
| # AFFICHAGE DU TABLEAU (st_aggrid) | |
| ############################################################################### | |
def show_table(data, key=None):
    """
    Render a summary table (class name / detected count) with AgGrid.

    `data` is a {class_name: count} dict; `key` disambiguates multiple grids
    on the same page.
    """
    table = pd.DataFrame(list(data.items()), columns=["Classe", "Nombre"])
    builder = GridOptionsBuilder.from_dataframe(table)
    builder.configure_pagination()
    builder.configure_default_column(editable=False, groupable=True)
    builder.configure_selection('multiple', use_checkbox=True)
    AgGrid(table, gridOptions=builder.build(), theme="streamlit", key=key)
| ############################################################################### | |
| # FONCTIONS ENVOI EMAIL & SAUVEGARDE SUR LE CLOUD | |
| ############################################################################### | |
def send_notification_smtp(to_email, message):
    """
    Send a plain-text notification email over SMTP.

    Credentials and server settings come from st.secrets (with placeholder
    fallbacks). Success or failure is reported through the Streamlit UI
    rather than raised.
    """
    server_host = st.secrets.get("SMTP_SERVER", "smtp.gmail.com")
    server_port = int(st.secrets.get("SMTP_PORT", 587))
    sender = st.secrets.get("SMTP_USER", "your_email@gmail.com")
    password = st.secrets.get("SMTP_PASS", "your_password")
    try:
        mail = MIMEMultipart("alternative")
        mail["Subject"] = "YOLO Detection Notification"
        mail["From"] = sender
        mail["To"] = to_email
        mail.attach(MIMEText(message, "plain"))
        # STARTTLS upgrade before authenticating, as Gmail-style servers require.
        with smtplib.SMTP(server_host, server_port) as connection:
            connection.starttls()
            connection.login(sender, password)
            connection.sendmail(sender, to_email, mail.as_string())
        st.success(f"Email envoyé à {to_email} avec succès!")
    except Exception as exc:
        st.error(f"Erreur lors de l'envoi du mail: {exc}")
def save_to_cloud(file_data, service):
    """
    Illustrative placeholder for saving data to Google Drive / Dropbox / OneDrive.

    No upload actually happens; a hint about the real API is shown, then a
    simulated success message. Replace with real API calls per service.
    """
    api_hints = {
        "Google Drive": "Exemple : utiliser PyDrive ou Google Drive API.",
        "Dropbox": "Exemple : utiliser le SDK Dropbox pour l'upload.",
        "OneDrive": "Exemple : utiliser le SDK OneDrive (MS Graph).",
    }
    hint = api_hints.get(service)
    if hint is not None:
        st.info(hint)
    st.success(f"Sauvegarde simulée sur {service} réalisée avec succès !")
| ############################################################################### | |
| # TRANSFORMER POUR STREAMLIT_WEBRTC (VIDEO TEMPS REEL) | |
| ############################################################################### | |
class VideoTransformer(VideoTransformerBase):
    """
    Real-time frame processor for streamlit-webrtc webcam streams.

    Each incoming frame goes through an optional preprocessing pipeline
    (simple/advanced filters, morphological ops, histogram equalization,
    rotation, resize, detection-zone crop), then YOLO/RT-DETR inference.
    The annotated frame is returned for display; side channels handle the
    FPS overlay, periodic snapshots, local MP4 recording, email notification
    and (simulated) cloud upload.
    """
    def __init__(
        self,
        model,
        conf=0.25,
        iou=0.45,
        show_fps=False,
        auto_snapshot=False,
        snapshot_interval=5,
        output_format="RGB",
        apply_filters=False,
        advanced_filters=False,
        rotation_angle=0,
        resize_width=None,
        resize_height=None,
        detection_zone=None,
        notification_email=None,
        save_to_cloud=False,
        morphological_ops=False,
        equalize_hist=False,
        classes_to_detect=None,
        max_det=1000,
        line_width=2,
        record_output=False,
        agnostic_nms=False
    ):
        """
        Store per-stream settings; parameters mirror the sidebar options
        built in main(). See transform() for how each setting is applied
        to the incoming frames.
        """
        self.model = model
        self.conf = conf
        self.iou = iou
        self.show_fps = show_fps
        self.auto_snapshot = auto_snapshot
        self.snapshot_interval = snapshot_interval
        # Normalized to upper case so the RGB/BGR comparison below is case-insensitive.
        self.output_format = output_format.upper()
        self.apply_filters = apply_filters
        self.advanced_filters = advanced_filters
        self.rotation_angle = rotation_angle
        self.resize_width = resize_width
        self.resize_height = resize_height
        self.detection_zone = detection_zone
        self.notification_email = notification_email
        # Boolean enable flag; the module-level save_to_cloud() function is
        # what actually performs the (simulated) upload in transform().
        self.save_to_cloud = save_to_cloud
        self.morphological_ops = morphological_ops
        self.equalize_hist = equalize_hist
        self.classes_to_detect = classes_to_detect
        self.max_det = max_det
        self.line_width = line_width
        self.record_output = record_output
        self.agnostic_nms = agnostic_nms
        self.last_time = time.time()
        self.last_snapshot_time = time.time()
        # latest_snapshot: last periodic auto-snapshot (annotated BGR ndarray).
        self.latest_snapshot = None
        # last_frame: last raw frame, read back for manual snapshot buttons.
        self.last_frame = None
        # VideoWriter is created lazily on the first frame (size unknown here).
        self.video_writer = None
        if self.record_output:
            self.output_filename = os.path.join(tempfile.gettempdir(),
                                                f"webcam_record_{time.time()}.mp4")
    def transform(self, frame):
        # streamlit-webrtc delivers frames as av.VideoFrame; convert to a BGR ndarray.
        image = frame.to_ndarray(format="bgr24")
        self.last_frame = image.copy()
        # Open the recorder on the first frame, once the real frame size is known.
        if self.record_output and self.video_writer is None:
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            h, w, _ = image.shape
            self.video_writer = cv2.VideoWriter(self.output_filename, fourcc, 20.0, (w, h))
        # Simple filters: grayscale + Gaussian blur, converted back to 3 channels
        # so the rest of the pipeline keeps a BGR-shaped image.
        if self.apply_filters:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            image = cv2.GaussianBlur(image, (5, 5), 0)
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        # Advanced filters: fixed contrast (x1.5) and brightness (+30) boost.
        if self.advanced_filters:
            image = cv2.convertScaleAbs(image, alpha=1.5, beta=30)
        # Morphological opening then closing with a 3x3 kernel.
        if self.morphological_ops:
            kernel = np.ones((3,3), np.uint8)
            image = cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel)
            image = cv2.morphologyEx(image, cv2.MORPH_CLOSE, kernel)
        # Histogram equalization on the luma channel only (via YUV).
        if self.equalize_hist:
            yuv = cv2.cvtColor(image, cv2.COLOR_BGR2YUV)
            yuv[:, :, 0] = cv2.equalizeHist(yuv[:, :, 0])
            image = cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR)
        # Rotation around the frame center (output keeps the original size).
        if self.rotation_angle != 0:
            image = self.rotate_image(image, self.rotation_angle)
        # Optional resize; only applied when BOTH dimensions are set.
        if self.resize_width and self.resize_height:
            image = cv2.resize(image, (self.resize_width, self.resize_height))
        # Crop to the configured (x, y, w, h) detection zone.
        if self.detection_zone:
            x, y, w, h = self.detection_zone
            image = image[y:y+h, x:x+w]
        # YOLO / RT-DETR inference on the preprocessed frame.
        results = self.model(
            image,
            conf=self.conf,
            iou=self.iou,
            classes=self.classes_to_detect if self.classes_to_detect else None,
            max_det=self.max_det,
            agnostic_nms=self.agnostic_nms
        )
        annotated_frame = results[0].plot(line_width=self.line_width)
        # FPS computed from the wall-clock delta between consecutive frames.
        current_time = time.time()
        dt = current_time - self.last_time
        fps = 1.0 / dt if dt > 0 else 0.0
        self.last_time = current_time
        if self.show_fps:
            cv2.putText(annotated_frame, f"FPS: {fps:.2f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        # Keep a copy every snapshot_interval seconds for the download button.
        if self.auto_snapshot and (current_time - self.last_snapshot_time >= self.snapshot_interval):
            self.last_snapshot_time = current_time
            self.latest_snapshot = annotated_frame.copy()
        # Local MP4 recording of the annotated stream.
        if self.record_output and self.video_writer is not None:
            self.video_writer.write(annotated_frame)
        # Output colorspace for display.
        # NOTE(review): when "RGB" is selected the returned array is RGB while
        # the input was bgr24 — confirm the downstream display expects this.
        if self.output_format == "RGB":
            display_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
        else:
            display_frame = annotated_frame
        # NOTE(review): any(results) tests truthiness of the Results objects
        # themselves, not the number of boxes found; presumably an empty
        # Results is falsy — confirm, otherwise an email is sent for every
        # single frame, synchronously, from the video callback.
        if self.notification_email and any(results):
            send_notification_smtp(self.notification_email, "Détection réalisée sur le flux webcam !")
        # save_to_cloud here resolves to the module-level function; the
        # self.save_to_cloud attribute is only the enable flag.
        if self.save_to_cloud and any(results):
            _, buffer = cv2.imencode('.png', annotated_frame)
            save_to_cloud(buffer.tobytes(), "Google Drive")
        return display_frame
    def rotate_image(self, image, angle):
        """Rotate `image` by `angle` degrees around its center, same output size."""
        (h, w) = image.shape[:2]
        center = (w / 2, h / 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        return cv2.warpAffine(image, M, (w, h))
    def __del__(self):
        # Best-effort release of the recorder; relies on GC/interpreter
        # shutdown timing, so the last buffered frames may be lost.
        if self.video_writer:
            self.video_writer.release()
| ############################################################################### | |
| # TRAITEMENT DE VIDEOS (FICHIER LOCAL) | |
| ############################################################################### | |
def process_video_file(
    video_path,
    model,
    conf,
    iou,
    export_type,
    classes_to_detect=None,
    max_det=1000,
    line_width=2,
    agnostic_nms=False
):
    """
    Process a local video file: run detection on one or several sampled
    frames, display the annotated frames, then offer the results for export.

    Parameters
    ----------
    video_path : str
        Path of the video file, opened with OpenCV.
    model :
        Model passed through to detect_objects().
    conf, iou : float
        Confidence and IoU thresholds.
    export_type : str
        One of "PDF", "ZIP", "CSV", "JSON".
    classes_to_detect : list[int], optional
        Class ids to keep; None keeps all classes.
    max_det : int
        Maximum detections per frame.
    line_width : int
        Bounding-box line width.
    agnostic_nms : bool
        Class-agnostic NMS flag.

    Fixes over the previous version: the per-frame pipeline that was
    duplicated between the "first frame only" and "sampled frames" branches
    is factored into one helper, and the capture is released in a finally
    block so it is not leaked when Streamlit reruns or an error occurs.
    """
    output_images = []
    csv_rows = []

    def _process_frame(frame, frame_label, caption):
        # Shared per-frame pipeline: detect, display, and record the counts.
        pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        annotated_frame, results = detect_objects(
            model=model,
            image=pil_img,
            model_type="Vidéo",
            conf=conf,
            iou=iou,
            classes_to_detect=classes_to_detect,
            max_det=max_det,
            line_width=line_width,
            agnostic_nms=agnostic_nms
        )
        st.image(annotated_frame, channels="BGR", caption=caption)
        output_images.append(Image.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)))
        counts = count_objects(results, "Vidéo", model.names if hasattr(model, 'names') else [])
        for cls_name, count_val in counts.items():
            csv_rows.append({"Image": frame_label, "Classe": cls_name, "Nombre": count_val})

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        st.error("🚫 Impossible d'ouvrir la vidéo.")
        return
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        st.info(f"🎞️ Vidéo chargée, {frame_count} frames trouvées.")
        process_only_first = st.checkbox("Traiter seulement la première frame", value=True)
        if process_only_first:
            ret, frame = cap.read()
            if ret:
                _process_frame(frame, "frame_0", "🖼️ Première frame annotée")
            else:
                st.error("🚫 Impossible de lire la première frame.")
        else:
            num_frames_to_process = st.slider("Nombre de frames à traiter", 1, min(frame_count, 50), 10)
            # Sample evenly spaced frames across the whole video.
            interval = max(1, frame_count // num_frames_to_process)
            frame_idx = 0
            processed = 0
            while processed < num_frames_to_process:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_idx % interval == 0:
                    _process_frame(frame, f"frame_{frame_idx}",
                                   f"🖼️ Frame {frame_idx} annotée")
                    processed += 1
                frame_idx += 1
    finally:
        # Always release the capture, even on exceptions/Streamlit reruns.
        cap.release()
    # Export of the annotated frames / detection counts.
    if output_images:
        if export_type == "PDF":
            export_pdf(output_images)
        elif export_type == "ZIP":
            export_zip(output_images)
        elif export_type == "CSV":
            export_csv_rows(csv_rows)
        elif export_type == "JSON":
            json_data = json.dumps(csv_rows, indent=4)
            st.download_button("📥 Télécharger JSON des détections",
                               data=json_data,
                               file_name="detections.json",
                               mime="application/json")
| ############################################################################### | |
| # GESTION OPTIMISEE DES WEBCAMS: AFFICHER PLUSIEURS CAMERAS | |
| ############################################################################### | |
def display_webcam_streams(
    selected_devices,
    model,
    conf,
    iou,
    show_fps,
    auto_snapshot,
    snapshot_interval,
    output_format,
    apply_filters,
    advanced_filters,
    rotation_angle,
    resize_width,
    resize_height,
    detection_zone,
    notification_email,
    save_to_cloud,
    morphological_ops,
    equalize_hist,
    classes_to_detect,
    max_det,
    line_width,
    record_output,
    agnostic_nms
):
    """
    Display several webcams simultaneously, laid out in rows of at most 4.

    Each webcam gets its own streamlit-webrtc streamer with its own
    VideoTransformer instance configured from the sidebar settings, plus a
    manual-snapshot button and (optionally) a download button for the latest
    periodic auto-snapshot.
    """
    if not selected_devices:
        st.warning("Aucune webcam sélectionnée.")
        return
    # Split the camera list into groups of 4 for the grid layout.
    for row_start in range(0, len(selected_devices), 4):
        row_devices = selected_devices[row_start:row_start+4]
        cols = st.columns(len(row_devices))
        for i, device_index in enumerate(row_devices):
            with cols[i]:
                st.markdown(f"**Caméra {device_index}**")
                ctx = webrtc_streamer(
                    key=f"webcam-{device_index}",
                    # model/conf/iou are bound as lambda defaults; the other
                    # settings are captured from the enclosing scope, which is
                    # safe here since they do not change inside this loop.
                    video_transformer_factory=lambda m=model, c=conf, iou_val=iou: VideoTransformer(
                        m,
                        conf=c,
                        iou=iou_val,
                        show_fps=show_fps,
                        auto_snapshot=auto_snapshot,
                        snapshot_interval=snapshot_interval,
                        output_format=output_format,
                        apply_filters=apply_filters,
                        advanced_filters=advanced_filters,
                        rotation_angle=rotation_angle,
                        resize_width=resize_width,
                        resize_height=resize_height,
                        detection_zone=detection_zone,
                        notification_email=notification_email,
                        save_to_cloud=save_to_cloud,
                        morphological_ops=morphological_ops,
                        equalize_hist=equalize_hist,
                        classes_to_detect=classes_to_detect,
                        max_det=max_det,
                        line_width=line_width,
                        record_output=record_output,
                        agnostic_nms=agnostic_nms
                    ),
                    rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
                    media_stream_constraints={
                        # NOTE(review): browsers expect a media-device id string
                        # for deviceId "exact"; an OpenCV-style integer index may
                        # not match any device — confirm on the deployment target.
                        "video": {"deviceId": {"exact": str(device_index)}},
                        "audio": False
                    }
                )
                # Manual snapshot: show the latest raw frame kept by the transformer.
                if st.button(f"📸 Snapshot Caméra {device_index}", key=f"snap-{device_index}"):
                    if ctx.video_transformer:
                        frame = ctx.video_transformer.last_frame
                        if frame is not None:
                            st.image(frame, caption=f"Snapshot Caméra {device_index}", channels="BGR")
                        else:
                            st.warning("🚫 Aucune image capturée pour cette caméra.")
                # Auto snapshot: offer the most recent periodic capture for download.
                if auto_snapshot and ctx.video_transformer and ctx.video_transformer.latest_snapshot is not None:
                    ret, buffer = cv2.imencode('.png', ctx.video_transformer.latest_snapshot)
                    if ret:
                        snapshot_bytes = buffer.tobytes()
                        st.download_button(
                            label="📥 Télécharger Snapshot Auto",
                            data=snapshot_bytes,
                            file_name=f"snapshot_cam_{device_index}.png",
                            key=f"auto_snap_{device_index}"
                        )
| ############################################################################### | |
| # GESTION CAMERA IP | |
| ############################################################################### | |
def display_ip_camera(
    ip_url,
    model,
    conf,
    iou,
    show_fps,
    auto_snapshot,
    snapshot_interval,
    output_format,
    classes_to_detect=None,
    max_det=1000,
    line_width=2,
    agnostic_nms=False
):
    """
    Read frames from an IP camera (RTSP URL), run YOLO on each frame, and
    display the annotated stream in a single placeholder updated in place.
    """
    cap = cv2.VideoCapture(ip_url)
    if not cap.isOpened():
        st.error("🚫 Impossible d'ouvrir la caméra IP.")
        return
    frame_placeholder = st.empty()
    last_time = time.time()
    last_snapshot_time = time.time()
    # NOTE(review): the button value is fixed for this script run; while the
    # loop below is blocking, clicking presumably triggers a Streamlit rerun
    # rather than flipping stop_button mid-loop — confirm the stop flow works.
    stop_button = st.button("⏹️ Arrêter le streaming IP")
    while True:
        if stop_button:
            st.info("Arrêt du streaming IP.")
            break
        ret, frame = cap.read()
        if not ret:
            st.error("🚫 Erreur de lecture du flux IP.")
            break
        # YOLO inference on the raw BGR frame.
        results = model(
            frame,
            conf=conf,
            iou=iou,
            classes=classes_to_detect if classes_to_detect else None,
            max_det=max_det,
            agnostic_nms=agnostic_nms
        )
        annotated_frame = results[0].plot(line_width=line_width)
        # FPS from the wall-clock delta between loop iterations.
        current_time = time.time()
        dt = current_time - last_time
        fps = 1.0 / dt if dt > 0 else 0.0
        last_time = current_time
        if show_fps:
            cv2.putText(annotated_frame, f"FPS: {fps:.2f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        # Periodic snapshot offered for download.
        # NOTE(review): a new download_button (keyed with time.time()) is added
        # each interval, so widgets accumulate over a long session — confirm
        # this is acceptable or reuse a single key.
        if auto_snapshot and (current_time - last_snapshot_time >= snapshot_interval):
            last_snapshot_time = current_time
            snapshot_bytes = cv2.imencode('.png', annotated_frame)[1].tobytes()
            st.download_button(
                "📥 Télécharger Snapshot Auto",
                data=snapshot_bytes,
                file_name="ip_snapshot.png",
                key=f"ip_snapshot_{time.time()}"
            )
        # Convert for display when the user asked for RGB output.
        if output_format.upper() == "RGB":
            annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
        frame_placeholder.image(
            annotated_frame,
            channels="RGB" if output_format.upper()=="RGB" else "BGR"
        )
    cap.release()
| ############################################################################### | |
| # APPLICATION PRINCIPALE | |
| ############################################################################### | |
def main():
    """
    Streamlit entry point.

    Builds the sidebar (model selection, detection parameters, advanced
    options, export and cloud settings), loads the chosen model, then
    dispatches on the selected input source: image files, a video file,
    live webcams, an IP camera, or replay of a recorded video.
    """
    apply_techsolut_theme()
    # st.set_page_config(page_title="Plateforme de Vision par Ordinateur - TECHSOLUT", layout="wide")
    # st.title("👁️ Vision par Ordinateur - TECHSOLUT (Multi-Webcams Optimisé)")
    # 1) Model catalog: task -> family -> checkpoint names. Must stay in sync
    #    with the model name lists inside load_model().
    model_versions = {
        "Détection": {
            "YOLOv5": ["yolov5nu", "yolov5s", "yolov5m", "yolov5l", "yolov5x"],
            "YOLOv8": ["yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x"],
            "YOLOv9": ["yolov9c", "yolov9e"],
            "YOLOv10": ["yolov10n", "yolov10s", "yolov10m", "yolov10l", "yolov10x"],
            "YOLO11": ["yolo11n", "yolo11s", "yolo11m", "yolo11l", "yolo11x"],
            "YOLO12": ["yolo12n", "yolo12s", "yolo12m", "yolo12l", "yolo12x"],
            "RT-DETR": ["rtdetr-l", "rtdetr-x"]
        },
        "Segmentation": {
            "YOLOv8": ["yolov8n-seg", "yolov8s-seg", "yolov8m-seg", "yolov8l-seg", "yolov8x-seg"],
            "YOLOv9": ["yolov9c-seg", "yolov9e-seg"],
            "YOLO11": ["yolo11n-seg", "yolo11s-seg", "yolo11m-seg", "yolo11l-seg", "yolo11x-seg"]
        },
        "Estimation de pose": {
            "YOLOv8": ["yolov8n-pose", "yolov8s-pose", "yolov8m-pose", "yolov8l-pose", "yolov8x-pose"],
            "YOLO11": ["yolo11n-pose", "yolo11s-pose", "yolo11m-pose", "yolo11l-pose", "yolo11x-pose"]
        },
        "Personnalisé": {
            "Custom": ["custom"]
        }
    }
    with st.sidebar:
        with st.expander("🧠 Choix du modèle"):
            task_type = st.selectbox("Type de tâche", list(model_versions.keys()))
            model_family = st.selectbox("Famille de modèle", list(model_versions[task_type].keys()))
            selected_model = st.selectbox("Version du modèle", model_versions[task_type][model_family])
            custom_model_path = None
            if selected_model == "custom":
                uploaded_file = st.file_uploader("📥 Charger un modèle (.pt)", type=["pt"])
                if uploaded_file:
                    # Persist the uploaded weights to disk so YOLO can load from a path.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".pt") as tmp:
                        tmp.write(uploaded_file.read())
                        custom_model_path = tmp.name
        with st.expander("🎯 Paramètres"):
            conf = st.slider("Confiance (confidence threshold)", 0.0, 1.0, 0.25)
            iou = st.slider("IoU threshold", 0.0, 1.0, 0.45)
            # NOTE(review): input_size and processing_mode are collected but
            # never used anywhere below — wire them into inference or drop
            # the widgets.
            input_size = st.selectbox("Taille d'entrée du modèle",
                                      options=[320, 416, 512, 640, 960, 1280],
                                      index=3)
            processing_mode = st.radio("Mode de traitement",
                                       options=["Image par image", "Traitement par lot"],
                                       horizontal=True)
            show_fps = st.checkbox("Afficher le FPS sur la vidéo", value=False)
            auto_snapshot = st.checkbox("Téléchargement automatique des snapshots", value=False)
            output_format = st.selectbox("Format de sortie des frames", options=["BGR", "RGB"], index=1)
            snapshot_interval = st.number_input("Sauvegarder une frame toutes les X secondes",
                                                min_value=1, max_value=60, value=5, step=1)
            apply_filters = st.checkbox("Appliquer des filtres (grayscale + flou)", value=False)
            advanced_filters = st.checkbox("Appliquer des filtres avancés (contraste + luminosité)", value=False)
            rotation_angle = st.number_input("Angle de rotation", min_value=0, max_value=360, value=0, step=1)
            resize_width = st.number_input("Largeur de redimensionnement", min_value=1, value=640, step=1)
            resize_height = st.number_input("Hauteur de redimensionnement", min_value=1, value=480, step=1)
            # detection_zone starts as the checkbox bool, then is replaced by an
            # (x, y, w, h) tuple or None — the shape VideoTransformer expects.
            detection_zone = st.checkbox("Définir une zone de détection")
            if detection_zone:
                x = st.number_input("Zone X", min_value=0, value=0, step=1)
                y = st.number_input("Zone Y", min_value=0, value=0, step=1)
                w = st.number_input("Zone Largeur", min_value=1, value=640, step=1)
                h = st.number_input("Zone Hauteur", min_value=1, value=480, step=1)
                detection_zone = (x, y, w, h)
            else:
                detection_zone = None
            notification_email = st.text_input("Email pour notifications")
            save_to_cloud_flag = st.checkbox("Sauvegarder les résultats sur le cloud")
        with st.expander("🧩 Options Avancées"):
            morphological_ops = st.checkbox("Opérations morphologiques (opening/closing)")
            equalize_hist = st.checkbox("Égaliser l'histogramme (améliorer contraste)")
            custom_classes = st.text_input("Lister les classes (ID, séparés par des virgules) à détecter ou laisser vide")
            if custom_classes.strip():
                # Keep only well-formed integer ids; anything else is silently ignored.
                classes_to_detect = [int(c.strip()) for c in custom_classes.split(",") if c.strip().isdigit()]
            else:
                classes_to_detect = None
            max_det = st.number_input("Max Detections autorisées", min_value=1, max_value=10000, value=1000, step=50)
            line_width = st.slider("Épaisseur des bounding boxes", 1, 10, 2)
            record_output = st.checkbox("Enregistrer les flux webcam en local (format MP4)")
            agnostic_nms = st.checkbox("Agnostic NMS (ignorer les classes lors du NMS)")
        with st.expander("🖍️ Post-traitement"):
            export_type = st.selectbox("Exporter sous", ["PDF", "ZIP", "CSV", "JSON"])
        with st.expander("☁️ Sauvegarde Cloud"):
            cloud_service = st.selectbox("Choisir un service", ["Google Drive", "Dropbox", "OneDrive"])
            cloud_file = st.file_uploader("📤 Sélectionner un fichier à sauvegarder",
                                          type=["pdf", "zip", "csv", "jpg", "png", "json"])
            if cloud_file:
                if st.button(f"Sauvegarder sur {cloud_service}"):
                    save_to_cloud(cloud_file.read(), cloud_service)
    # -------------------------------------------------------------------------
    # 2) MODEL LOADING
    # -------------------------------------------------------------------------
    model = load_model(selected_model, custom_model_path)
    class_names = model.names if hasattr(model, 'names') else []
    # -------------------------------------------------------------------------
    # 3) MAIN SECTION: INPUT SOURCE SELECTION
    # -------------------------------------------------------------------------
    st.subheader("Source : 🖼️ Image, 🎥 Vidéo, 📷 Webcam, 🌐 Caméra IP ou 🔄 Relecture")
    input_type = st.radio("Source", ["Image", "Vidéo", "Webcam", "Caméra IP", "Relecture"], horizontal=True)
    # ================== 1) IMAGE ==================
    if input_type == "Image":
        uploaded_images = st.file_uploader("📁 Choisir des images", type=["jpg", "png"], accept_multiple_files=True)
        if uploaded_images:
            output_images = []
            csv_rows = []
            for img_file in uploaded_images:
                image = Image.open(img_file).convert("RGB")
                st.image(image, caption=f"🖼️ {img_file.name}")
                annotated_image, results = detect_objects(
                    model=model,
                    image=image,
                    model_type=task_type,
                    conf=conf,
                    iou=iou,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    agnostic_nms=agnostic_nms
                )
                st.image(annotated_image, caption="🖼️ Image annotée")
                # The annotated frame comes back BGR; convert to RGB PIL for export.
                output_images.append(Image.fromarray(cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB)))
                counts = count_objects(results, task_type, class_names)
                for cls_name, count_val in counts.items():
                    csv_rows.append({"Image": img_file.name, "Classe": cls_name, "Nombre": count_val})
                # NOTE(review): called without a key; duplicate AgGrid widget
                # keys are likely when several images are uploaded — confirm.
                show_table(counts)
            # Export of the annotated images / detection counts.
            if output_images:
                if export_type == "PDF":
                    export_pdf(output_images)
                elif export_type == "ZIP":
                    export_zip(output_images)
                elif export_type == "CSV":
                    export_csv_rows(csv_rows)
                elif export_type == "JSON":
                    json_data = json.dumps(csv_rows, indent=4)
                    st.download_button("📥 Télécharger JSON des détections",
                                       data=json_data,
                                       file_name="detections.json",
                                       mime="application/json")
            if save_to_cloud_flag:
                st.info("Exemple: sauvegarde ZIP des images sur Cloud.")
                # Build a ZIP of the annotated images, then hand its bytes to
                # the (simulated) cloud-upload helper.
                with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmpfile:
                    zip_path = tmpfile.name
                with ZipFile(zip_path, 'w') as zipf:
                    for i, img in enumerate(output_images):
                        img_filename = f"cloud_image_{i}.png"
                        img.save(img_filename)
                        zipf.write(img_filename)
                        os.remove(img_filename)
                with open(zip_path, "rb") as f:
                    zip_data = f.read()
                save_to_cloud(zip_data, cloud_service)
                os.remove(zip_path)
    # ================== 2) VIDEO ==================
    elif input_type == "Vidéo":
        uploaded_video = st.file_uploader("📁 Choisir une vidéo", type=["mp4", "avi", "mov"], accept_multiple_files=False)
        if uploaded_video is not None:
            # Spool the upload to a temp file so OpenCV can open it by path.
            tfile = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            tfile.write(uploaded_video.read())
            tfile.close()
            process_video_file(
                video_path=tfile.name,
                model=model,
                conf=conf,
                iou=iou,
                export_type=export_type,
                classes_to_detect=classes_to_detect,
                max_det=max_det,
                line_width=line_width,
                agnostic_nms=agnostic_nms
            )
            os.unlink(tfile.name)
    # ================== 3) MULTI-WEBCAM ==================
    elif input_type == "Webcam":
        st.info("Recherche de toutes les webcams disponibles...")
        available_devices = []
        # Probe device indexes 0..20 to discover attached webcams.
        for index in range(21):
            cap = cv2.VideoCapture(index)
            if cap.isOpened():
                ret, _ = cap.read()
                if ret:
                    available_devices.append(index)
                cap.release()
        if len(available_devices) == 0:
            st.error("🚫 Aucune webcam détectée ou accessible.")
        else:
            st.success(f"Webcams détectées : {available_devices}")
            selected_devices = st.multiselect(
                "Sélectionner les caméras à utiliser (max 4 affichées par rangée)",
                options=available_devices,
                default=available_devices[:1],
                format_func=lambda x: f"Caméra {x}"
            )
            if selected_devices:
                display_webcam_streams(
                    selected_devices=selected_devices,
                    model=model,
                    conf=conf,
                    iou=iou,
                    show_fps=show_fps,
                    auto_snapshot=auto_snapshot,
                    snapshot_interval=snapshot_interval,
                    output_format=output_format,
                    apply_filters=apply_filters,
                    advanced_filters=advanced_filters,
                    rotation_angle=rotation_angle,
                    resize_width=resize_width,
                    resize_height=resize_height,
                    detection_zone=detection_zone,
                    notification_email=notification_email,
                    save_to_cloud=save_to_cloud_flag,
                    morphological_ops=morphological_ops,
                    equalize_hist=equalize_hist,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    record_output=record_output,
                    agnostic_nms=agnostic_nms
                )
    # ================== 4) IP CAMERA ==================
    elif input_type == "Caméra IP":
        st.info("Activation de la caméra IP...")
        ip_url = st.text_input("Entrez l'URL RTSP", value="rtsp://")
        if st.button("Démarrer le streaming IP"):
            display_ip_camera(
                ip_url=ip_url,
                model=model,
                conf=conf,
                iou=iou,
                show_fps=show_fps,
                auto_snapshot=auto_snapshot,
                snapshot_interval=snapshot_interval,
                output_format=output_format,
                classes_to_detect=classes_to_detect,
                max_det=max_det,
                line_width=line_width,
                agnostic_nms=agnostic_nms
            )
    # ================== 5) REPLAY ==================
    elif input_type == "Relecture":
        st.info("Relecture de vidéos enregistrées")
        recorded_video = st.file_uploader("📁 Charger une vidéo enregistrée",
                                          type=["mp4", "avi", "mov"],
                                          accept_multiple_files=False)
        if recorded_video is not None:
            st.video(recorded_video)
| ############################################################################### | |
| # LANCEMENT DU SCRIPT | |
| ############################################################################### | |
# Script entry point (run with: streamlit run <this file>).
if __name__ == "__main__":
    main()