# tutosutiles's picture
# Update app.py
# 6f1421f verified
import os
import time
import tempfile
import json
from zipfile import ZipFile
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import cv2
import numpy as np
import pandas as pd
from PIL import Image
import streamlit as st
from fpdf import FPDF
from st_aggrid import AgGrid
from st_aggrid.grid_options_builder import GridOptionsBuilder
from techsolut_theme import apply_techsolut_theme
from ultralytics import YOLO, RTDETR
from streamlit_webrtc import webrtc_streamer, VideoTransformerBase
###############################################################################
# FONCTIONS LIEES AU MODELE
###############################################################################
@st.cache_resource()
def load_model(model_choice, custom_model_path=None):
    """
    Load the Ultralytics model selected by the user (cached by Streamlit).

    Args:
        model_choice: Name of a pretrained checkpoint (for example
            ``"yolov8n"``, ``"yolo11s-seg"`` or ``"rtdetr-l"``), the literal
            ``"custom"``, or one of the legacy aliases ``"model1"`` to
            ``"model4"``.
        custom_model_path: Path to a ``.pt`` file. Only used when
            ``model_choice`` is ``"custom"``.

    Returns:
        The instantiated model object.

    Raises:
        ValueError: If ``model_choice`` is unknown, or if it is ``"custom"``
            and no ``custom_model_path`` was given.
    """
    pretrained_models = {
        # Detection
        "yolov5nu", "yolov5s", "yolov5m", "yolov5l", "yolov5x",
        "yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x",
        "yolov9c", "yolov9e",
        "yolov10n", "yolov10s", "yolov10m", "yolov10l", "yolov10x",
        "yolo11n", "yolo11s", "yolo11m", "yolo11l", "yolo11x",
        "yolo12n", "yolo12s", "yolo12m", "yolo12l", "yolo12x",
        "rtdetr-l", "rtdetr-x",
        # Segmentation
        "yolov8n-seg", "yolov8s-seg", "yolov8m-seg", "yolov8l-seg", "yolov8x-seg",
        "yolov9c-seg", "yolov9e-seg",
        "yolo11n-seg", "yolo11s-seg", "yolo11m-seg", "yolo11l-seg", "yolo11x-seg",
        # Pose estimation
        "yolov8n-pose", "yolov8s-pose", "yolov8m-pose", "yolov8l-pose", "yolov8x-pose",
        "yolo11n-pose", "yolo11s-pose", "yolo11m-pose", "yolo11l-pose", "yolo11x-pose",
    }
    # Legacy aliases pointing at locally stored checkpoints.
    legacy_checkpoints = {
        "model1": "best.pt",
        "model2": "best2.pt",
        "model3": "best3.pt",
        "model4": "best5.pt",
    }

    if model_choice in pretrained_models:
        weights = f"{model_choice}.pt"
        # RT-DETR checkpoints have a dedicated Ultralytics wrapper class.
        if model_choice.startswith("rtdetr"):
            return RTDETR(weights)
        return YOLO(weights)

    if model_choice == "custom":
        if not custom_model_path:
            raise ValueError(
                "model_choice is 'custom' but no custom_model_path was provided"
            )
        return YOLO(custom_model_path)

    if model_choice in legacy_checkpoints:
        return YOLO(legacy_checkpoints[model_choice])

    raise ValueError(f"Unknown model choice: {model_choice!r}")
def detect_objects(model,
                   image,
                   model_type,
                   conf,
                   iou,
                   classes_to_detect=None,
                   max_det=1000,
                   line_width=2,
                   agnostic_nms=False):
    """
    Run the model on a single image and return the annotated picture.

    Args:
        model: Callable model; invoked as ``model(array, conf=..., ...)``.
        image: A NumPy array (used as is) or any object accepted by
            ``np.array`` such as a PIL image.
        model_type: Task label supplied by callers; not used here.
        conf: Confidence threshold forwarded to the model.
        iou: IoU threshold forwarded to the model.
        classes_to_detect: Class ids to keep; an empty/None value disables
            the filter.
        max_det: Maximum number of detections forwarded to the model.
        line_width: Line width forwarded to ``plot``.
        agnostic_nms: Class-agnostic NMS flag forwarded to the model.

    Returns:
        Tuple ``(annotated_image, results)`` where ``annotated_image`` is
        ``results[0].plot(...)`` and ``results`` is the raw model output.

    NOTE(review): Ultralytics documents ndarray inputs as BGR, while a PIL
    image converted with ``np.array`` is RGB - callers should preferably
    pass BGR arrays; confirm against the Ultralytics predict documentation.
    """
    if isinstance(image, np.ndarray):
        frame = image
    else:
        frame = np.array(image)

    # An empty filter means "all classes", which the model expresses as None.
    class_filter = classes_to_detect or None

    predictions = model(
        frame,
        conf=conf,
        iou=iou,
        classes=class_filter,
        max_det=max_det,
        agnostic_nms=agnostic_nms,
    )
    first_prediction = predictions[0]
    return first_prediction.plot(line_width=line_width), predictions
def count_objects(results, model_type, class_names):
    """
    Count the detected objects per class for the first result.

    Args:
        results: Sequence of model results; only ``results[0]`` is read.
        model_type: Task label supplied by callers; not used here.
        class_names: Mapping or sequence from class id to class name.

    Returns:
        Dict ``{class_name: count}`` in first-seen order. Empty when there
        are no results or the first result carries no boxes. A class id
        that is missing from ``class_names`` is reported under its id
        rendered as a string.
    """
    from collections import Counter

    if not results:
        return {}
    boxes = getattr(results[0], "boxes", None)
    if boxes is None:
        # Some tasks produce results without bounding boxes.
        return {}

    counts = Counter()
    for raw_id in boxes.cls.cpu().numpy():
        class_id = int(raw_id)
        try:
            name = class_names[class_id]
        except (KeyError, IndexError):
            # Unknown id (e.g. empty class_names): fall back to the id itself.
            name = str(class_id)
        counts[name] += 1
    return dict(counts)
###############################################################################
# FONCTIONS D'EXPORT (PDF, ZIP, CSV, JSON)
###############################################################################
def export_pdf(images):
    """
    Offer a list of PIL images as a single multi-page PDF download.

    The PDF is built in memory, so no temporary file is left on disk.

    Args:
        images: List of PIL images, one per page. When empty, a warning is
            shown and no download button is rendered.
    """
    import io

    if not images:
        st.warning("Aucune image à exporter.")
        return

    first_page, *other_pages = images
    pdf_buffer = io.BytesIO()
    first_page.save(pdf_buffer, format="PDF", save_all=True, append_images=other_pages)
    st.download_button(
        "📄 Télécharger le PDF",
        data=pdf_buffer.getvalue(),
        file_name="resultats.pdf",
        mime="application/pdf",
    )
def export_zip(images):
    """
    Offer a list of PIL images as a ZIP archive of PNG files.

    The archive is assembled in memory: nothing is written to the current
    working directory and no temporary file is left behind.

    Args:
        images: List of PIL images; stored as ``image_<index>.png``. When
            empty, a warning is shown and no download button is rendered.
    """
    import io

    if not images:
        st.warning("Aucune image à exporter.")
        return

    archive = io.BytesIO()
    with ZipFile(archive, "w") as zipf:
        for i, img in enumerate(images):
            png_buffer = io.BytesIO()
            img.save(png_buffer, format="PNG")
            zipf.writestr(f"image_{i}.png", png_buffer.getvalue())

    st.download_button(
        "🗜️ Télécharger le ZIP",
        data=archive.getvalue(),
        file_name="resultats.zip",
        mime="application/zip",
    )
def export_csv_rows(csv_rows):
    """
    Render a download button serving the detection rows as a CSV file.

    Args:
        csv_rows: Row records accepted by ``pd.DataFrame`` (callers pass a
            list of dicts).
    """
    table = pd.DataFrame(csv_rows)
    csv_text = table.to_csv(index=False)
    st.download_button(
        "📤 Télécharger CSV des détections",
        data=csv_text.encode("utf-8"),
        file_name="detections.csv",
        mime="text/csv",
    )
###############################################################################
# AFFICHAGE DU TABLEAU (st_aggrid)
###############################################################################
def show_table(data, key=None):
    """
    Display a class / count summary table with st_aggrid.

    Args:
        data: Mapping ``{class_name: count}``.
        key: Optional widget key forwarded to ``AgGrid``.
    """
    rows = list(data.items())
    summary = pd.DataFrame(rows, columns=["Classe", "Nombre"])

    builder = GridOptionsBuilder.from_dataframe(summary)
    builder.configure_pagination()
    builder.configure_default_column(editable=False, groupable=True)
    builder.configure_selection('multiple', use_checkbox=True)

    AgGrid(summary, gridOptions=builder.build(), theme="streamlit", key=key)
###############################################################################
# FONCTIONS ENVOI EMAIL & SAUVEGARDE SUR LE CLOUD
###############################################################################
def send_notification_smtp(to_email, message):
    """
    Send a plain-text notification email through SMTP (STARTTLS).

    Connection settings are read from ``st.secrets`` (``SMTP_SERVER``,
    ``SMTP_PORT``, ``SMTP_USER``, ``SMTP_PASS``). Any failure, including
    missing secrets, is reported with ``st.error`` instead of being raised.

    Args:
        to_email: Recipient address. Nothing is sent when it is empty.
        message: Plain-text body of the email.
    """
    if not to_email:
        return

    # Bound the network wait so a stalled server cannot block the caller forever.
    smtp_timeout_seconds = 10

    try:
        # Read the secrets inside the try block: accessing st.secrets can
        # itself fail when no secrets are configured.
        smtp_server = st.secrets.get("SMTP_SERVER", "smtp.gmail.com")
        smtp_port = int(st.secrets.get("SMTP_PORT", 587))
        smtp_user = st.secrets.get("SMTP_USER", "your_email@gmail.com")
        smtp_pass = st.secrets.get("SMTP_PASS", "your_password")

        msg = MIMEMultipart("alternative")
        msg["Subject"] = "YOLO Detection Notification"
        msg["From"] = smtp_user
        msg["To"] = to_email
        msg.attach(MIMEText(message, "plain"))

        with smtplib.SMTP(smtp_server, smtp_port, timeout=smtp_timeout_seconds) as server:
            server.starttls()
            server.login(smtp_user, smtp_pass)
            server.sendmail(smtp_user, to_email, msg.as_string())
        st.success(f"Email envoyé à {to_email} avec succès!")
    except Exception as e:  # best-effort notification: report, never crash the app
        st.error(f"Erreur lors de l'envoi du mail: {e}")
def save_to_cloud(file_data, service):
    """
    Placeholder for a cloud upload (Google Drive / Dropbox / OneDrive).

    No data is uploaded: the function only shows a hint about the API to
    use for known services, then a simulated success message. Replace it
    with real API code for the chosen service.

    Args:
        file_data: Payload to upload; not used by this placeholder.
        service: Display name of the target service.
    """
    integration_hints = {
        "Google Drive": "Exemple : utiliser PyDrive ou Google Drive API.",
        "Dropbox": "Exemple : utiliser le SDK Dropbox pour l'upload.",
        "OneDrive": "Exemple : utiliser le SDK OneDrive (MS Graph).",
    }
    hint = integration_hints.get(service)
    if hint is not None:
        st.info(hint)
    st.success(f"Sauvegarde simulée sur {service} réalisée avec succès !")
###############################################################################
# TRANSFORMER POUR STREAMLIT_WEBRTC (VIDEO TEMPS REEL)
###############################################################################
class VideoTransformer(VideoTransformerBase):
    """
    Per-frame processor for a streamlit_webrtc stream.

    Each incoming frame is optionally filtered, rotated, resized and
    cropped, then passed to the model; the annotated frame is returned and
    can additionally be recorded, snapshotted, or trigger alerts.
    """

    # Minimum delay between two detection alerts (email / cloud save).
    # Without it an alert would be issued for every frame with a detection.
    ALERT_COOLDOWN_SECONDS = 60.0

    def __init__(
        self,
        model,
        conf=0.25,
        iou=0.45,
        show_fps=False,
        auto_snapshot=False,
        snapshot_interval=5,
        output_format="RGB",
        apply_filters=False,
        advanced_filters=False,
        rotation_angle=0,
        resize_width=None,
        resize_height=None,
        detection_zone=None,
        notification_email=None,
        save_to_cloud=False,
        morphological_ops=False,
        equalize_hist=False,
        classes_to_detect=None,
        max_det=1000,
        line_width=2,
        record_output=False,
        agnostic_nms=False
    ):
        """
        Store the processing options.

        Args:
            model: Callable model used for inference on each frame.
            conf, iou, classes_to_detect, max_det, agnostic_nms: Forwarded
                to the model call.
            show_fps: Draw the measured frame rate on the output.
            auto_snapshot: Keep a copy of the annotated frame every
                ``snapshot_interval`` seconds in ``latest_snapshot``.
            output_format: ``"RGB"`` converts the returned frame from BGR
                to RGB; any other value returns it unchanged.
            apply_filters: Grayscale + Gaussian blur.
            advanced_filters: Contrast / brightness boost.
            morphological_ops: Morphological opening then closing.
            equalize_hist: Histogram equalisation on the luma channel.
            rotation_angle: Rotation in degrees (0 disables it).
            resize_width, resize_height: Target size; both must be set.
            detection_zone: Optional ``(x, y, w, h)`` crop rectangle.
            notification_email: Recipient of detection alerts.
            save_to_cloud: Trigger the cloud-save routine on detections.
            line_width: Line width used when drawing annotations.
            record_output: Write the annotated frames to an MP4 file in
                the system temporary directory.
        """
        self.model = model
        self.conf = conf
        self.iou = iou
        self.show_fps = show_fps
        self.auto_snapshot = auto_snapshot
        self.snapshot_interval = snapshot_interval
        self.output_format = output_format.upper()
        self.apply_filters = apply_filters
        self.advanced_filters = advanced_filters
        self.rotation_angle = rotation_angle
        self.resize_width = resize_width
        self.resize_height = resize_height
        self.detection_zone = detection_zone
        self.notification_email = notification_email
        self.save_to_cloud = save_to_cloud
        self.morphological_ops = morphological_ops
        self.equalize_hist = equalize_hist
        self.classes_to_detect = classes_to_detect
        self.max_det = max_det
        self.line_width = line_width
        self.record_output = record_output
        self.agnostic_nms = agnostic_nms

        self.last_time = time.time()
        self.last_snapshot_time = time.time()
        # 0.0 lets the very first detection raise an alert immediately.
        self.last_alert_time = 0.0
        self.latest_snapshot = None
        self.last_frame = None

        # The writer is created lazily, once the output frame size is known.
        self.video_writer = None
        if self.record_output:
            self.output_filename = os.path.join(tempfile.gettempdir(),
                                                f"webcam_record_{time.time()}.mp4")

    def _preprocess(self, image):
        """Apply the enabled filters, rotation, resize and crop to a BGR frame."""
        # Simple filters: grayscale + blur, converted back to 3 channels.
        if self.apply_filters:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            image = cv2.GaussianBlur(image, (5, 5), 0)
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        # Advanced filters: contrast (alpha) and brightness (beta).
        if self.advanced_filters:
            image = cv2.convertScaleAbs(image, alpha=1.5, beta=30)
        # Morphological opening followed by closing.
        if self.morphological_ops:
            kernel = np.ones((3, 3), np.uint8)
            image = cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel)
            image = cv2.morphologyEx(image, cv2.MORPH_CLOSE, kernel)
        # Histogram equalisation on the Y channel only.
        if self.equalize_hist:
            yuv = cv2.cvtColor(image, cv2.COLOR_BGR2YUV)
            yuv[:, :, 0] = cv2.equalizeHist(yuv[:, :, 0])
            image = cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR)
        if self.rotation_angle != 0:
            image = self.rotate_image(image, self.rotation_angle)
        if self.resize_width and self.resize_height:
            image = cv2.resize(image, (self.resize_width, self.resize_height))
        if self.detection_zone:
            x, y, w, h = self.detection_zone
            cropped = image[y:y + h, x:x + w]
            # A zone lying outside the frame yields an empty array; keep the
            # full frame in that case rather than feeding it to the model.
            if cropped.size:
                image = cropped
        return image

    def transform(self, frame):
        """Process one incoming frame and return the annotated ndarray."""
        image = frame.to_ndarray(format="bgr24")
        self.last_frame = image.copy()

        image = self._preprocess(image)

        # Model inference.
        results = self.model(
            image,
            conf=self.conf,
            iou=self.iou,
            classes=self.classes_to_detect if self.classes_to_detect else None,
            max_det=self.max_det,
            agnostic_nms=self.agnostic_nms
        )
        annotated_frame = results[0].plot(line_width=self.line_width)

        # Frame rate measured between two consecutive calls.
        current_time = time.time()
        dt = current_time - self.last_time
        fps = 1.0 / dt if dt > 0 else 0.0
        self.last_time = current_time
        if self.show_fps:
            cv2.putText(annotated_frame, f"FPS: {fps:.2f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Automatic snapshot.
        if self.auto_snapshot and (current_time - self.last_snapshot_time >= self.snapshot_interval):
            self.last_snapshot_time = current_time
            self.latest_snapshot = annotated_frame.copy()

        # Local recording. The writer is sized from the frame actually
        # written: resize/crop can change the size relative to the camera
        # frame, and a writer expects frames of the size it was opened with.
        if self.record_output:
            if self.video_writer is None:
                fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                h, w = annotated_frame.shape[:2]
                self.video_writer = cv2.VideoWriter(self.output_filename, fourcc, 20.0, (w, h))
            self.video_writer.write(annotated_frame)

        # Output format (RGB vs BGR).
        if self.output_format == "RGB":
            display_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
        else:
            display_frame = annotated_frame

        # Alerts (email / cloud save), rate-limited by ALERT_COOLDOWN_SECONDS.
        # The truthiness of each result object decides whether something was
        # detected - presumably a result is falsy when it holds no detection.
        if (self.notification_email or self.save_to_cloud) and any(results):
            if current_time - self.last_alert_time >= self.ALERT_COOLDOWN_SECONDS:
                self.last_alert_time = current_time
                if self.notification_email:
                    send_notification_smtp(self.notification_email,
                                           "Détection réalisée sur le flux webcam !")
                if self.save_to_cloud:
                    _, buffer = cv2.imencode('.png', annotated_frame)
                    save_to_cloud(buffer.tobytes(), "Google Drive")

        return display_frame

    def rotate_image(self, image, angle):
        """Rotate ``image`` by ``angle`` degrees around its centre, keeping its size."""
        (h, w) = image.shape[:2]
        center = (w / 2, h / 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        return cv2.warpAffine(image, M, (w, h))

    def __del__(self):
        # getattr: __init__ may have failed before video_writer was assigned.
        writer = getattr(self, "video_writer", None)
        if writer is not None:
            writer.release()
###############################################################################
# TRAITEMENT DE VIDEOS (FICHIER LOCAL)
###############################################################################
def _annotate_video_frame(frame, label, caption, model, detect_kwargs):
    """Run detection on one BGR frame, display it, and return (PIL image, CSV rows)."""
    # The BGR frame is passed as is: detect_objects forwards ndarrays
    # unchanged, and Ultralytics documents ndarray inputs as BGR.
    annotated_frame, results = detect_objects(
        model=model,
        image=frame,
        model_type="Vidéo",
        **detect_kwargs
    )
    st.image(annotated_frame, channels="BGR", caption=caption)
    pil_annotated = Image.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB))
    counts = count_objects(results, "Vidéo", model.names if hasattr(model, 'names') else [])
    rows = [
        {"Image": label, "Classe": cls_name, "Nombre": count_val}
        for cls_name, count_val in counts.items()
    ]
    return pil_annotated, rows


def process_video_file(
    video_path,
    model,
    conf,
    iou,
    export_type,
    classes_to_detect=None,
    max_det=1000,
    line_width=2,
    agnostic_nms=False
):
    """
    Process a local video file, show annotated frames and offer an export.

    Either only the first frame is processed, or a user-chosen number of
    frames (at most 50) sampled at a regular interval.

    Args:
        video_path: Path of the video file to open with OpenCV.
        model: Model used for inference.
        conf, iou, classes_to_detect, max_det, line_width, agnostic_nms:
            Forwarded to ``detect_objects``.
        export_type: One of ``"PDF"``, ``"ZIP"``, ``"CSV"``, ``"JSON"``.
    """
    detect_kwargs = {
        "conf": conf,
        "iou": iou,
        "classes_to_detect": classes_to_detect,
        "max_det": max_det,
        "line_width": line_width,
        "agnostic_nms": agnostic_nms,
    }
    output_images = []
    csv_rows = []

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            st.error("🚫 Impossible d'ouvrir la vidéo.")
            return
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        st.info(f"🎞️ Vidéo chargée, {frame_count} frames trouvées.")
        process_only_first = st.checkbox("Traiter seulement la première frame", value=True)

        if process_only_first:
            ret, frame = cap.read()
            if ret:
                pil_annotated, rows = _annotate_video_frame(
                    frame, "frame_0", "🖼️ Première frame annotée", model, detect_kwargs
                )
                output_images.append(pil_annotated)
                csv_rows.extend(rows)
            else:
                st.error("🚫 Impossible de lire la première frame.")
        else:
            # The slider needs min < max and a default within range; very
            # short videos (or an unknown frame count) skip the slider.
            max_frames = min(frame_count, 50)
            if max_frames > 1:
                num_frames_to_process = st.slider(
                    "Nombre de frames à traiter", 1, max_frames, min(10, max_frames)
                )
            else:
                num_frames_to_process = 1
            interval = max(1, frame_count // num_frames_to_process)
            frame_idx = 0
            processed = 0
            while processed < num_frames_to_process:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_idx % interval == 0:
                    pil_annotated, rows = _annotate_video_frame(
                        frame,
                        f"frame_{frame_idx}",
                        f"🖼️ Frame {frame_idx} annotée",
                        model,
                        detect_kwargs,
                    )
                    output_images.append(pil_annotated)
                    csv_rows.extend(rows)
                    processed += 1
                frame_idx += 1
    finally:
        # Always release the capture, including on early return or error.
        cap.release()

    # Export
    if output_images:
        if export_type == "PDF":
            export_pdf(output_images)
        elif export_type == "ZIP":
            export_zip(output_images)
        elif export_type == "CSV":
            export_csv_rows(csv_rows)
        elif export_type == "JSON":
            json_data = json.dumps(csv_rows, indent=4)
            st.download_button("📥 Télécharger JSON des détections",
                               data=json_data,
                               file_name="detections.json",
                               mime="application/json")
###############################################################################
# GESTION OPTIMISEE DES WEBCAMS: AFFICHER PLUSIEURS CAMERAS
###############################################################################
def display_webcam_streams(
    selected_devices,
    model,
    conf,
    iou,
    show_fps,
    auto_snapshot,
    snapshot_interval,
    output_format,
    apply_filters,
    advanced_filters,
    rotation_angle,
    resize_width,
    resize_height,
    detection_zone,
    notification_email,
    save_to_cloud,
    morphological_ops,
    equalize_hist,
    classes_to_detect,
    max_det,
    line_width,
    record_output,
    agnostic_nms
):
    """
    Show several webcam streams at once, laid out in rows of up to four.

    Every camera gets its own ``webrtc_streamer`` (and therefore its own
    ``VideoTransformer``), a manual snapshot button and, when automatic
    snapshots are enabled, a download button for the latest one.

    Args:
        selected_devices: Camera identifiers to display.
        model, conf, iou, ...: Options forwarded unchanged to
            ``VideoTransformer``.
    """
    if not selected_devices:
        st.warning("Aucune webcam sélectionnée.")
        return

    def build_transformer(m=model, c=conf, iou_val=iou):
        # Factory invoked by webrtc_streamer to create the frame processor.
        return VideoTransformer(
            m,
            conf=c,
            iou=iou_val,
            show_fps=show_fps,
            auto_snapshot=auto_snapshot,
            snapshot_interval=snapshot_interval,
            output_format=output_format,
            apply_filters=apply_filters,
            advanced_filters=advanced_filters,
            rotation_angle=rotation_angle,
            resize_width=resize_width,
            resize_height=resize_height,
            detection_zone=detection_zone,
            notification_email=notification_email,
            save_to_cloud=save_to_cloud,
            morphological_ops=morphological_ops,
            equalize_hist=equalize_hist,
            classes_to_detect=classes_to_detect,
            max_det=max_det,
            line_width=line_width,
            record_output=record_output,
            agnostic_nms=agnostic_nms
        )

    # Split the camera list into rows of four for the grid layout.
    device_rows = [
        selected_devices[start:start + 4]
        for start in range(0, len(selected_devices), 4)
    ]
    for row_devices in device_rows:
        columns = st.columns(len(row_devices))
        for column, device_index in zip(columns, row_devices):
            with column:
                st.markdown(f"**Caméra {device_index}**")
                ctx = webrtc_streamer(
                    key=f"webcam-{device_index}",
                    video_transformer_factory=build_transformer,
                    rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
                    media_stream_constraints={
                        "video": {"deviceId": {"exact": str(device_index)}},
                        "audio": False
                    }
                )

                # Manual snapshot button.
                snapshot_requested = st.button(f"📸 Snapshot Caméra {device_index}",
                                               key=f"snap-{device_index}")
                if snapshot_requested and ctx.video_transformer:
                    frame = ctx.video_transformer.last_frame
                    if frame is None:
                        st.warning("🚫 Aucune image capturée pour cette caméra.")
                    else:
                        st.image(frame, caption=f"Snapshot Caméra {device_index}", channels="BGR")

                # Downloadable automatic snapshot.
                if not auto_snapshot:
                    continue
                if not ctx.video_transformer or ctx.video_transformer.latest_snapshot is None:
                    continue
                ret, buffer = cv2.imencode('.png', ctx.video_transformer.latest_snapshot)
                if ret:
                    st.download_button(
                        label="📥 Télécharger Snapshot Auto",
                        data=buffer.tobytes(),
                        file_name=f"snapshot_cam_{device_index}.png",
                        key=f"auto_snap_{device_index}"
                    )
###############################################################################
# GESTION CAMERA IP
###############################################################################
def display_ip_camera(
    ip_url,
    model,
    conf,
    iou,
    show_fps,
    auto_snapshot,
    snapshot_interval,
    output_format,
    classes_to_detect=None,
    max_det=1000,
    line_width=2,
    agnostic_nms=False
):
    """
    Read frames from an IP camera, run the model and display them live.

    The function loops until the stream can no longer be read or the
    script run is interrupted; the capture is released in every case.

    Args:
        ip_url: Stream URL opened with ``cv2.VideoCapture``.
        model: Model used for inference.
        conf, iou, classes_to_detect, max_det, agnostic_nms: Forwarded to
            the model call.
        show_fps: Draw the measured frame rate on the frame.
        auto_snapshot: Offer a PNG download every ``snapshot_interval``
            seconds.
        snapshot_interval: Seconds between two automatic snapshots.
        output_format: ``"RGB"`` (case-insensitive) converts the frame to
            RGB before display; anything else displays it as BGR.
        line_width: Line width used when drawing annotations.
    """
    cap = cv2.VideoCapture(ip_url)
    try:
        if not cap.isOpened():
            st.error("🚫 Impossible d'ouvrir la caméra IP.")
            return

        frame_placeholder = st.empty()
        # The button value is fixed for the whole run, so it is checked once.
        # NOTE(review): a click presumably triggers a Streamlit rerun that
        # interrupts the loop below; the finally clause then frees the capture.
        stop_button = st.button("⏹️ Arrêter le streaming IP")
        if stop_button:
            st.info("Arrêt du streaming IP.")
            return
        # One reusable slot: each new snapshot button replaces the previous
        # one instead of adding a new widget to the page every interval.
        snapshot_placeholder = st.empty()

        as_rgb = output_format.upper() == "RGB"
        class_filter = classes_to_detect if classes_to_detect else None
        last_time = time.time()
        last_snapshot_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                st.error("🚫 Erreur de lecture du flux IP.")
                break

            # Model inference.
            results = model(
                frame,
                conf=conf,
                iou=iou,
                classes=class_filter,
                max_det=max_det,
                agnostic_nms=agnostic_nms
            )
            annotated_frame = results[0].plot(line_width=line_width)

            # Frame rate measured between two consecutive iterations.
            current_time = time.time()
            dt = current_time - last_time
            fps = 1.0 / dt if dt > 0 else 0.0
            last_time = current_time
            if show_fps:
                cv2.putText(annotated_frame, f"FPS: {fps:.2f}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # Automatic snapshot (encoded while the frame is still BGR).
            if auto_snapshot and (current_time - last_snapshot_time >= snapshot_interval):
                last_snapshot_time = current_time
                snapshot_bytes = cv2.imencode('.png', annotated_frame)[1].tobytes()
                snapshot_placeholder.download_button(
                    "📥 Télécharger Snapshot Auto",
                    data=snapshot_bytes,
                    file_name="ip_snapshot.png",
                    key=f"ip_snapshot_{time.time()}"
                )

            # Output format.
            if as_rgb:
                annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
            frame_placeholder.image(
                annotated_frame,
                channels="RGB" if as_rgb else "BGR"
            )
    finally:
        cap.release()
###############################################################################
# APPLICATION PRINCIPALE
###############################################################################
def main():
    """
    Streamlit entry point: build the sidebar, load the model, then handle
    the selected source (image, video, webcam, IP camera or replay).
    """
    apply_techsolut_theme()
    # st.set_page_config(page_title="Plateforme de Vision par Ordinateur - TECHSOLUT", layout="wide")
    # st.title("👁️ Vision par Ordinateur - TECHSOLUT (Multi-Webcams Optimisé)")

    # 1) Model selection: task -> family -> checkpoint names.
    model_versions = {
        "Détection": {
            "YOLOv5": ["yolov5nu", "yolov5s", "yolov5m", "yolov5l", "yolov5x"],
            "YOLOv8": ["yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x"],
            "YOLOv9": ["yolov9c", "yolov9e"],
            "YOLOv10": ["yolov10n", "yolov10s", "yolov10m", "yolov10l", "yolov10x"],
            "YOLO11": ["yolo11n", "yolo11s", "yolo11m", "yolo11l", "yolo11x"],
            "YOLO12": ["yolo12n", "yolo12s", "yolo12m", "yolo12l", "yolo12x"],
            "RT-DETR": ["rtdetr-l", "rtdetr-x"]
        },
        "Segmentation": {
            "YOLOv8": ["yolov8n-seg", "yolov8s-seg", "yolov8m-seg", "yolov8l-seg", "yolov8x-seg"],
            "YOLOv9": ["yolov9c-seg", "yolov9e-seg"],
            "YOLO11": ["yolo11n-seg", "yolo11s-seg", "yolo11m-seg", "yolo11l-seg", "yolo11x-seg"]
        },
        "Estimation de pose": {
            "YOLOv8": ["yolov8n-pose", "yolov8s-pose", "yolov8m-pose", "yolov8l-pose", "yolov8x-pose"],
            "YOLO11": ["yolo11n-pose", "yolo11s-pose", "yolo11m-pose", "yolo11l-pose", "yolo11x-pose"]
        },
        "Personnalisé": {
            "Custom": ["custom"]
        }
    }

    with st.sidebar:
        with st.expander("🧠 Choix du modèle"):
            task_type = st.selectbox("Type de tâche", list(model_versions.keys()))
            model_family = st.selectbox("Famille de modèle", list(model_versions[task_type].keys()))
            selected_model = st.selectbox("Version du modèle", model_versions[task_type][model_family])
            custom_model_path = None
            if selected_model == "custom":
                uploaded_file = st.file_uploader("📥 Charger un modèle (.pt)", type=["pt"])
                if uploaded_file:
                    # main() is re-executed on every interaction: reuse the
                    # temp file of an unchanged upload so that no new file
                    # is written each time and the cached model (keyed by
                    # path) is actually reused.
                    upload_id = (uploaded_file.name, uploaded_file.size)
                    cached_upload = st.session_state.get("custom_model_upload")
                    if (cached_upload
                            and cached_upload[0] == upload_id
                            and os.path.exists(cached_upload[1])):
                        custom_model_path = cached_upload[1]
                    else:
                        with tempfile.NamedTemporaryFile(delete=False, suffix=".pt") as tmp:
                            tmp.write(uploaded_file.getvalue())
                            custom_model_path = tmp.name
                        st.session_state["custom_model_upload"] = (upload_id, custom_model_path)

        with st.expander("🎯 Paramètres"):
            conf = st.slider("Confiance (confidence threshold)", 0.0, 1.0, 0.25)
            iou = st.slider("IoU threshold", 0.0, 1.0, 0.45)
            # NOTE(review): input_size and processing_mode are collected but
            # not used anywhere in this function.
            input_size = st.selectbox("Taille d'entrée du modèle",
                                      options=[320, 416, 512, 640, 960, 1280],
                                      index=3)
            processing_mode = st.radio("Mode de traitement",
                                       options=["Image par image", "Traitement par lot"],
                                       horizontal=True)
            show_fps = st.checkbox("Afficher le FPS sur la vidéo", value=False)
            auto_snapshot = st.checkbox("Téléchargement automatique des snapshots", value=False)
            output_format = st.selectbox("Format de sortie des frames", options=["BGR", "RGB"], index=1)
            snapshot_interval = st.number_input("Sauvegarder une frame toutes les X secondes",
                                                min_value=1, max_value=60, value=5, step=1)
            apply_filters = st.checkbox("Appliquer des filtres (grayscale + flou)", value=False)
            advanced_filters = st.checkbox("Appliquer des filtres avancés (contraste + luminosité)", value=False)
            rotation_angle = st.number_input("Angle de rotation", min_value=0, max_value=360, value=0, step=1)
            resize_width = st.number_input("Largeur de redimensionnement", min_value=1, value=640, step=1)
            resize_height = st.number_input("Hauteur de redimensionnement", min_value=1, value=480, step=1)
            use_detection_zone = st.checkbox("Définir une zone de détection")
            if use_detection_zone:
                x = st.number_input("Zone X", min_value=0, value=0, step=1)
                y = st.number_input("Zone Y", min_value=0, value=0, step=1)
                w = st.number_input("Zone Largeur", min_value=1, value=640, step=1)
                h = st.number_input("Zone Hauteur", min_value=1, value=480, step=1)
                detection_zone = (x, y, w, h)
            else:
                detection_zone = None
            notification_email = st.text_input("Email pour notifications")
            save_to_cloud_flag = st.checkbox("Sauvegarder les résultats sur le cloud")

        with st.expander("🧩 Options Avancées"):
            morphological_ops = st.checkbox("Opérations morphologiques (opening/closing)")
            equalize_hist = st.checkbox("Égaliser l'histogramme (améliorer contraste)")
            custom_classes = st.text_input("Lister les classes (ID, séparés par des virgules) à détecter ou laisser vide")
            if custom_classes.strip():
                # Non-numeric entries are ignored.
                classes_to_detect = [int(c.strip()) for c in custom_classes.split(",") if c.strip().isdigit()]
            else:
                classes_to_detect = None
            max_det = st.number_input("Max Detections autorisées", min_value=1, max_value=10000, value=1000, step=50)
            line_width = st.slider("Épaisseur des bounding boxes", 1, 10, 2)
            record_output = st.checkbox("Enregistrer les flux webcam en local (format MP4)")
            agnostic_nms = st.checkbox("Agnostic NMS (ignorer les classes lors du NMS)")

        with st.expander("🖍️ Post-traitement"):
            export_type = st.selectbox("Exporter sous", ["PDF", "ZIP", "CSV", "JSON"])

        with st.expander("☁️ Sauvegarde Cloud"):
            cloud_service = st.selectbox("Choisir un service", ["Google Drive", "Dropbox", "OneDrive"])
            cloud_file = st.file_uploader("📤 Sélectionner un fichier à sauvegarder",
                                          type=["pdf", "zip", "csv", "jpg", "png", "json"])
            if cloud_file:
                if st.button(f"Sauvegarder sur {cloud_service}"):
                    save_to_cloud(cloud_file.read(), cloud_service)

    # -------------------------------------------------------------------------
    # 2) MODEL LOADING
    # -------------------------------------------------------------------------
    if selected_model == "custom" and not custom_model_path:
        # Without an uploaded checkpoint there is nothing to load yet.
        st.warning("📥 Veuillez charger un modèle (.pt) pour continuer.")
        return
    model = load_model(selected_model, custom_model_path)
    class_names = model.names if hasattr(model, 'names') else []

    # -------------------------------------------------------------------------
    # 3) MAIN AREA: SOURCE SELECTION
    # -------------------------------------------------------------------------
    st.subheader("Source : 🖼️ Image, 🎥 Vidéo, 📷 Webcam, 🌐 Caméra IP ou 🔄 Relecture")
    input_type = st.radio("Source", ["Image", "Vidéo", "Webcam", "Caméra IP", "Relecture"], horizontal=True)

    # ================== 1) IMAGE ==================
    if input_type == "Image":
        uploaded_images = st.file_uploader("📁 Choisir des images", type=["jpg", "png"], accept_multiple_files=True)
        if uploaded_images:
            output_images = []
            csv_rows = []
            for img_idx, img_file in enumerate(uploaded_images):
                image = Image.open(img_file).convert("RGB")
                st.image(image, caption=f"🖼️ {img_file.name}")
                # Ultralytics documents ndarray inputs as BGR, so convert
                # explicitly; the annotated output is then BGR as well.
                image_bgr = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
                annotated_image, results = detect_objects(
                    model=model,
                    image=image_bgr,
                    model_type=task_type,
                    conf=conf,
                    iou=iou,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    agnostic_nms=agnostic_nms
                )
                st.image(annotated_image, channels="BGR", caption="🖼️ Image annotée")
                # Store as an RGB PIL image for the exports.
                output_images.append(Image.fromarray(cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB)))
                counts = count_objects(results, task_type, class_names)
                for cls_name, count_val in counts.items():
                    csv_rows.append({"Image": img_file.name, "Classe": cls_name, "Nombre": count_val})
                # One table per image: give each its own widget key.
                show_table(counts, key=f"counts-table-{img_idx}")

            # Export
            if output_images:
                if export_type == "PDF":
                    export_pdf(output_images)
                elif export_type == "ZIP":
                    export_zip(output_images)
                elif export_type == "CSV":
                    export_csv_rows(csv_rows)
                elif export_type == "JSON":
                    json_data = json.dumps(csv_rows, indent=4)
                    st.download_button("📥 Télécharger JSON des détections",
                                       data=json_data,
                                       file_name="detections.json",
                                       mime="application/json")
                if save_to_cloud_flag:
                    st.info("Exemple: sauvegarde ZIP des images sur Cloud.")
                    # Stage everything in a private temporary directory
                    # (removed automatically) instead of the working directory.
                    with tempfile.TemporaryDirectory() as tmp_dir:
                        zip_path = os.path.join(tmp_dir, "cloud_images.zip")
                        with ZipFile(zip_path, 'w') as zipf:
                            for i, img in enumerate(output_images):
                                img_filename = f"cloud_image_{i}.png"
                                img_path = os.path.join(tmp_dir, img_filename)
                                img.save(img_path)
                                zipf.write(img_path, arcname=img_filename)
                        with open(zip_path, "rb") as f:
                            zip_data = f.read()
                    save_to_cloud(zip_data, cloud_service)

    # ================== 2) VIDEO ==================
    elif input_type == "Vidéo":
        uploaded_video = st.file_uploader("📁 Choisir une vidéo", type=["mp4", "avi", "mov"], accept_multiple_files=False)
        if uploaded_video is not None:
            # Keep the original extension so the container type matches.
            video_suffix = os.path.splitext(uploaded_video.name)[1] or ".mp4"
            tfile = tempfile.NamedTemporaryFile(delete=False, suffix=video_suffix)
            try:
                tfile.write(uploaded_video.read())
                tfile.close()
                process_video_file(
                    video_path=tfile.name,
                    model=model,
                    conf=conf,
                    iou=iou,
                    export_type=export_type,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    agnostic_nms=agnostic_nms
                )
            finally:
                # Remove the temporary copy even when processing fails.
                tfile.close()
                if os.path.exists(tfile.name):
                    os.unlink(tfile.name)

    # ================== 3) MULTI-WEBCAM ==================
    elif input_type == "Webcam":
        st.info("Recherche de toutes les webcams disponibles...")
        available_devices = []
        # Probe indices 0..20 to discover the available cameras.
        for index in range(21):
            cap = cv2.VideoCapture(index)
            if cap.isOpened():
                ret, _ = cap.read()
                if ret:
                    available_devices.append(index)
            cap.release()
        if len(available_devices) == 0:
            st.error("🚫 Aucune webcam détectée ou accessible.")
        else:
            st.success(f"Webcams détectées : {available_devices}")
            selected_devices = st.multiselect(
                "Sélectionner les caméras à utiliser (max 4 affichées par rangée)",
                options=available_devices,
                default=available_devices[:1],
                format_func=lambda x: f"Caméra {x}"
            )
            if selected_devices:
                display_webcam_streams(
                    selected_devices=selected_devices,
                    model=model,
                    conf=conf,
                    iou=iou,
                    show_fps=show_fps,
                    auto_snapshot=auto_snapshot,
                    snapshot_interval=snapshot_interval,
                    output_format=output_format,
                    apply_filters=apply_filters,
                    advanced_filters=advanced_filters,
                    rotation_angle=rotation_angle,
                    resize_width=resize_width,
                    resize_height=resize_height,
                    detection_zone=detection_zone,
                    notification_email=notification_email,
                    save_to_cloud=save_to_cloud_flag,
                    morphological_ops=morphological_ops,
                    equalize_hist=equalize_hist,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    record_output=record_output,
                    agnostic_nms=agnostic_nms
                )

    # ================== 4) IP CAMERA ==================
    elif input_type == "Caméra IP":
        st.info("Activation de la caméra IP...")
        ip_url = st.text_input("Entrez l'URL RTSP", value="rtsp://")
        if st.button("Démarrer le streaming IP"):
            display_ip_camera(
                ip_url=ip_url,
                model=model,
                conf=conf,
                iou=iou,
                show_fps=show_fps,
                auto_snapshot=auto_snapshot,
                snapshot_interval=snapshot_interval,
                output_format=output_format,
                classes_to_detect=classes_to_detect,
                max_det=max_det,
                line_width=line_width,
                agnostic_nms=agnostic_nms
            )

    # ================== 5) REPLAY ==================
    elif input_type == "Relecture":
        st.info("Relecture de vidéos enregistrées")
        recorded_video = st.file_uploader("📁 Charger une vidéo enregistrée",
                                          type=["mp4", "avi", "mov"],
                                          accept_multiple_files=False)
        if recorded_video is not None:
            st.video(recorded_video)
###############################################################################
# LANCEMENT DU SCRIPT
###############################################################################
# Run the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()