|
|
import hashlib
import io
import json
import os
import smtplib
import tempfile
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from zipfile import ZipFile

import cv2
import numpy as np
import pandas as pd
import streamlit as st
from fpdf import FPDF
from PIL import Image
from st_aggrid import AgGrid
from st_aggrid.grid_options_builder import GridOptionsBuilder
from streamlit_webrtc import webrtc_streamer, VideoTransformerBase
from ultralytics import YOLO, RTDETR

from techsolut_theme import apply_techsolut_theme
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@st.cache_resource()
def load_model(model_choice, custom_model_path=None):
    """Load (and cache) the Ultralytics model matching the user's choice.

    Args:
        model_choice: Name of a pretrained checkpoint (e.g. ``"yolov8n"``,
            ``"rtdetr-l"``), ``"custom"`` for a user-supplied weights file, or
            one of the local aliases ``"model1"`` .. ``"model4"``.
        custom_model_path: Path to a ``.pt`` file; required when
            ``model_choice == "custom"``.

    Returns:
        The loaded model object (``YOLO`` or ``RTDETR``).

    Raises:
        ValueError: If ``model_choice`` is unknown, or is ``"custom"`` without
            a ``custom_model_path``.
    """
    detection_models = [
        "yolov5nu", "yolov5s", "yolov5m", "yolov5l", "yolov5x",
        "yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x",
        "yolov9c", "yolov9e",
        "yolov10n", "yolov10s", "yolov10m", "yolov10l", "yolov10x",
        "yolo11n", "yolo11s", "yolo11m", "yolo11l", "yolo11x",
        "yolo12n", "yolo12s", "yolo12m", "yolo12l", "yolo12x",
        "rtdetr-l", "rtdetr-x",
    ]
    segmentation_models = [
        "yolov8n-seg", "yolov8s-seg", "yolov8m-seg", "yolov8l-seg", "yolov8x-seg",
        "yolov9c-seg", "yolov9e-seg",
        "yolo11n-seg", "yolo11s-seg", "yolo11m-seg", "yolo11l-seg", "yolo11x-seg",
    ]
    pose_models = [
        "yolov8n-pose", "yolov8s-pose", "yolov8m-pose", "yolov8l-pose", "yolov8x-pose",
        "yolo11n-pose", "yolo11s-pose", "yolo11m-pose", "yolo11l-pose", "yolo11x-pose",
    ]
    pretrained = set(detection_models + segmentation_models + pose_models)

    if model_choice in pretrained:
        weights = f"{model_choice}.pt"
        # RT-DETR checkpoints are loaded through the dedicated RTDETR class,
        # which the file already imports for this purpose.
        if model_choice.startswith("rtdetr"):
            return RTDETR(weights)
        return YOLO(weights)

    if model_choice == 'custom':
        if custom_model_path:
            return YOLO(custom_model_path)
        raise ValueError("model_choice 'custom' requires a custom_model_path")

    # Locally trained checkpoints addressed by alias.
    local_weights = {
        "model1": "best.pt",
        "model2": "best2.pt",
        "model3": "best3.pt",
        "model4": "best5.pt",
    }
    if model_choice not in local_weights:
        raise ValueError(f"Unknown model choice: {model_choice!r}")
    return YOLO(local_weights[model_choice])
|
|
|
|
|
def detect_objects(model,
                   image,
                   model_type,
                   conf,
                   iou,
                   classes_to_detect=None,
                   max_det=1000,
                   line_width=2,
                   agnostic_nms=False):
    """Run detection on a single image and return the annotated result.

    Args:
        model: Callable model; invoked as ``model(array, conf=..., ...)`` and
            expected to return a sequence whose first item has ``.plot()``.
        image: Either a ``np.ndarray`` (passed through untouched, OpenCV/BGR
            convention) or any array-like such as a PIL image (assumed RGB).
        model_type: Unused; kept so existing callers keep working.
        conf: Confidence threshold forwarded to the model.
        iou: IoU threshold forwarded to the model.
        classes_to_detect: Class ids to keep; empty/None means all classes.
        max_det: Maximum number of detections per image.
        line_width: Bounding-box line width used by ``plot``.
        agnostic_nms: Whether NMS ignores class labels.

    Returns:
        ``(annotated_image, results)``. ``annotated_image`` uses the same
        channel order as the input: BGR for ndarray input, RGB for
        PIL/array-like input.
    """
    input_is_array = isinstance(image, np.ndarray)
    image_np = image if input_is_array else np.array(image)

    # Ultralytics interprets numpy input as BGR, while PIL images are RGB.
    # Reverse the channel axis for inference so the model sees correct
    # colours, then reverse the annotated output back so callers still get
    # the channel order they passed in.
    swap_channels = (
        not input_is_array
        and image_np.ndim == 3
        and image_np.shape[2] == 3
    )
    if swap_channels:
        image_np = np.ascontiguousarray(image_np[:, :, ::-1])

    results = model(
        image_np,
        conf=conf,
        iou=iou,
        classes=classes_to_detect if classes_to_detect else None,
        max_det=max_det,
        agnostic_nms=agnostic_nms,
    )
    annotated_image = results[0].plot(line_width=line_width)
    if swap_channels:
        annotated_image = np.ascontiguousarray(annotated_image[:, :, ::-1])
    return annotated_image, results
|
|
|
|
|
def count_objects(results, model_type, class_names):
    """Count detected objects per class name for the first result.

    Args:
        results: Sequence of result objects; only ``results[0].boxes.cls`` is
            read (via ``.cpu().numpy()``).
        model_type: Unused; kept so existing callers keep working.
        class_names: Mapping or sequence from class id to display name. Ids
            that cannot be resolved (e.g. the ``[]`` fallback callers pass
            when the model has no ``names``) are reported as ``str(id)``.

    Returns:
        dict mapping class name to number of detections; empty when there are
        no results or the first result carries no boxes.
    """
    if not results:
        return {}
    boxes = getattr(results[0], "boxes", None)
    if boxes is None:
        return {}

    object_counts = {}
    for cls_id in boxes.cls.cpu().numpy():
        idx = int(cls_id)
        try:
            name = class_names[idx]
        except (KeyError, IndexError, TypeError):
            # No usable name table entry: fall back to the numeric id.
            name = str(idx)
        object_counts[name] = object_counts.get(name, 0) + 1
    return object_counts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_pdf(images):
    """Offer a list of PIL images as a single multi-page PDF download.

    The PDF is assembled in memory, so no temporary file is left on disk.

    Args:
        images: List of PIL images; the first becomes page 1 and the rest are
            appended in order. An empty list shows a warning and returns.
    """
    if not images:
        st.warning("Aucune image à exporter en PDF.")
        return

    buffer = io.BytesIO()
    # A file-like target has no extension, so the format must be explicit.
    images[0].save(buffer, format="PDF", save_all=True, append_images=images[1:])
    st.download_button(
        "📄 Télécharger le PDF",
        data=buffer.getvalue(),
        file_name="resultats.pdf",
        mime="application/pdf",
    )
|
|
|
|
|
def export_zip(images):
    """Offer a list of PIL images as a ZIP archive download.

    Each image is encoded as ``image_<i>.png`` and written straight into an
    in-memory archive, so nothing is staged in the working directory and no
    temporary file is left behind.

    Args:
        images: List of PIL images to pack.
    """
    archive = io.BytesIO()
    with ZipFile(archive, 'w') as zipf:
        for i, img in enumerate(images):
            encoded = io.BytesIO()
            img.save(encoded, format="PNG")
            zipf.writestr(f"image_{i}.png", encoded.getvalue())

    st.download_button(
        "🗜️ Télécharger le ZIP",
        data=archive.getvalue(),
        file_name="resultats.zip",
        mime="application/zip",
    )
|
|
|
|
|
def export_csv_rows(csv_rows):
    """Offer the detection rows as a UTF-8 CSV download.

    Args:
        csv_rows: Row records accepted by ``pd.DataFrame`` (the callers in
            this file pass a list of dicts).
    """
    detections = pd.DataFrame(csv_rows)
    payload = detections.to_csv(index=False).encode("utf-8")
    st.download_button(
        "📤 Télécharger CSV des détections",
        data=payload,
        file_name="detections.csv",
        mime="text/csv",
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def show_table(data, key=None):
    """Render a class / count summary table with AgGrid.

    Args:
        data: Mapping of class name to number of detected objects.
        key: Optional component key forwarded to ``AgGrid``.
    """
    summary = pd.DataFrame(list(data.items()), columns=["Classe", "Nombre"])

    builder = GridOptionsBuilder.from_dataframe(summary)
    builder.configure_pagination()
    builder.configure_default_column(editable=False, groupable=True)
    builder.configure_selection('multiple', use_checkbox=True)

    AgGrid(summary, gridOptions=builder.build(), theme="streamlit", key=key)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def send_notification_smtp(to_email, message):
    """Send a plain-text notification email over SMTP with STARTTLS.

    Connection settings are read from ``st.secrets``: ``SMTP_SERVER``
    (default ``smtp.gmail.com``), ``SMTP_PORT`` (default 587), ``SMTP_USER``
    and ``SMTP_PASS``. The outcome is reported through ``st.success`` /
    ``st.error``; no exception is propagated to the caller.

    Args:
        to_email: Recipient address.
        message: Plain-text body of the email.
    """
    try:
        # Secrets are read inside the try so that a missing secrets file is
        # reported as an error message instead of crashing the caller.
        smtp_server = st.secrets.get("SMTP_SERVER", "smtp.gmail.com")
        smtp_port = int(st.secrets.get("SMTP_PORT", 587))
        smtp_user = st.secrets.get("SMTP_USER")
        smtp_pass = st.secrets.get("SMTP_PASS")

        # Fail fast rather than attempting a login with placeholder
        # credentials that can never succeed.
        if not smtp_user or not smtp_pass:
            st.error("Erreur lors de l'envoi du mail: identifiants SMTP manquants "
                     "(SMTP_USER / SMTP_PASS).")
            return

        msg = MIMEMultipart("alternative")
        msg["Subject"] = "YOLO Detection Notification"
        msg["From"] = smtp_user
        msg["To"] = to_email
        msg.attach(MIMEText(message, "plain"))

        # The timeout keeps an unreachable server from blocking the caller
        # indefinitely (this is invoked from the per-frame video path).
        with smtplib.SMTP(smtp_server, smtp_port, timeout=10) as server:
            server.starttls()
            server.login(smtp_user, smtp_pass)
            server.sendmail(smtp_user, to_email, msg.as_string())

        st.success(f"Email envoyé à {to_email} avec succès!")
    except Exception as e:  # UI boundary: report every failure to the user.
        st.error(f"Erreur lors de l'envoi du mail: {e}")
|
|
|
|
|
def save_to_cloud(file_data, service):
    """Simulate saving data to a cloud storage service.

    Illustrative placeholder: nothing is uploaded and ``file_data`` is not
    used. A hint about the API to use is shown for known services, then a
    success message is always displayed.

    Args:
        file_data: Payload that a real implementation would upload.
        service: Display name of the target service.
    """
    api_hints = {
        "Google Drive": "Exemple : utiliser PyDrive ou Google Drive API.",
        "Dropbox": "Exemple : utiliser le SDK Dropbox pour l'upload.",
        "OneDrive": "Exemple : utiliser le SDK OneDrive (MS Graph).",
    }
    hint = api_hints.get(service)
    if hint is not None:
        st.info(hint)

    st.success(f"Sauvegarde simulée sur {service} réalisée avec succès !")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class VideoTransformer(VideoTransformerBase):
    """Per-stream frame processor: filters, detection, overlay and recording.

    Each incoming webcam frame is optionally filtered, rotated, resized and
    cropped, passed through the model, annotated, and then optionally
    recorded, snapshotted and used to trigger email / cloud alerts.
    """

    def __init__(
        self,
        model,
        conf=0.25,
        iou=0.45,
        show_fps=False,
        auto_snapshot=False,
        snapshot_interval=5,
        output_format="RGB",
        apply_filters=False,
        advanced_filters=False,
        rotation_angle=0,
        resize_width=None,
        resize_height=None,
        detection_zone=None,
        notification_email=None,
        save_to_cloud=False,
        morphological_ops=False,
        equalize_hist=False,
        classes_to_detect=None,
        max_det=1000,
        line_width=2,
        record_output=False,
        agnostic_nms=False,
        alert_cooldown=30.0,
    ):
        """Store the processing options for one webcam stream.

        Args:
            model: Callable detection model.
            conf: Confidence threshold.
            iou: IoU threshold.
            show_fps: Draw the measured FPS on the annotated frame.
            auto_snapshot: Keep a copy of the annotated frame every
                ``snapshot_interval`` seconds in ``latest_snapshot``.
            snapshot_interval: Seconds between automatic snapshots.
            output_format: ``"RGB"`` converts the returned frame BGR->RGB;
                anything else returns it unchanged.
            apply_filters: Grayscale + Gaussian blur.
            advanced_filters: Contrast/brightness scaling.
            rotation_angle: Rotation in degrees (0 disables).
            resize_width: Target width; used only with ``resize_height``.
            resize_height: Target height; used only with ``resize_width``.
            detection_zone: Optional ``(x, y, w, h)`` crop applied before
                inference.
            notification_email: Recipient for detection emails (falsy
                disables).
            save_to_cloud: Whether to call the cloud-save helper on
                detections.
            morphological_ops: Apply opening then closing with a 3x3 kernel.
            equalize_hist: Equalize the luma channel.
            classes_to_detect: Class ids to keep; empty/None means all.
            max_det: Maximum detections per frame.
            line_width: Bounding-box line width.
            record_output: Record annotated frames to an MP4 in the temp dir.
            agnostic_nms: Whether NMS ignores class labels.
            alert_cooldown: Minimum seconds between two emails and between
                two cloud saves. ``None`` or ``<= 0`` disables throttling
                (one alert per detecting frame).
        """
        self.model = model
        self.conf = conf
        self.iou = iou
        self.show_fps = show_fps
        self.auto_snapshot = auto_snapshot
        self.snapshot_interval = snapshot_interval
        self.output_format = output_format.upper()
        self.apply_filters = apply_filters
        self.advanced_filters = advanced_filters
        self.rotation_angle = rotation_angle
        self.resize_width = resize_width
        self.resize_height = resize_height
        self.detection_zone = detection_zone
        self.notification_email = notification_email
        self.save_to_cloud = save_to_cloud

        self.morphological_ops = morphological_ops
        self.equalize_hist = equalize_hist
        self.classes_to_detect = classes_to_detect
        self.max_det = max_det
        self.line_width = line_width
        self.record_output = record_output
        self.agnostic_nms = agnostic_nms
        self.alert_cooldown = alert_cooldown

        self.last_time = time.time()
        self.last_snapshot_time = time.time()
        self.latest_snapshot = None
        self.last_frame = None

        # Timestamps of the last alerts; None means "never sent".
        self._last_notification_time = None
        self._last_cloud_save_time = None

        # The writer is opened lazily on the first recorded frame, because
        # the output size is only known after resize/crop.
        self.video_writer = None
        self.output_filename = None
        if self.record_output:
            self.output_filename = os.path.join(tempfile.gettempdir(),
                                                f"webcam_record_{time.time()}.mp4")

    def transform(self, frame):
        """Process one frame and return the annotated image as an ndarray."""
        image = frame.to_ndarray(format="bgr24")
        # Raw (unprocessed) frame, exposed for manual snapshots.
        self.last_frame = image.copy()

        image = self._preprocess(image)

        results = self.model(
            image,
            conf=self.conf,
            iou=self.iou,
            classes=self.classes_to_detect if self.classes_to_detect else None,
            max_det=self.max_det,
            agnostic_nms=self.agnostic_nms,
        )
        annotated_frame = results[0].plot(line_width=self.line_width)

        # Instantaneous FPS from the time elapsed since the previous frame.
        current_time = time.time()
        dt = current_time - self.last_time
        fps = 1.0 / dt if dt > 0 else 0.0
        self.last_time = current_time
        if self.show_fps:
            cv2.putText(annotated_frame, f"FPS: {fps:.2f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        if self.auto_snapshot and (current_time - self.last_snapshot_time >= self.snapshot_interval):
            self.last_snapshot_time = current_time
            self.latest_snapshot = annotated_frame.copy()

        if self.record_output:
            self._record(annotated_frame)

        # NOTE(review): streamlit-webrtc presumably treats the returned array
        # as BGR; confirm that converting to RGB here is really intended.
        if self.output_format == "RGB":
            display_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
        else:
            display_frame = annotated_frame

        # Alerts are throttled so a continuous detection does not send one
        # email / cloud upload per frame.
        if (self.notification_email and any(results)
                and self._alert_due(self._last_notification_time, current_time)):
            self._last_notification_time = current_time
            send_notification_smtp(self.notification_email, "Détection réalisée sur le flux webcam !")

        if (self.save_to_cloud and any(results)
                and self._alert_due(self._last_cloud_save_time, current_time)):
            self._last_cloud_save_time = current_time
            ok, buffer = cv2.imencode('.png', annotated_frame)
            if ok:
                save_to_cloud(buffer.tobytes(), "Google Drive")

        return display_frame

    def _preprocess(self, image):
        """Apply the enabled filters, rotation, resize and crop, in order."""
        if self.apply_filters:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            image = cv2.GaussianBlur(image, (5, 5), 0)
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)

        if self.advanced_filters:
            image = cv2.convertScaleAbs(image, alpha=1.5, beta=30)

        if self.morphological_ops:
            kernel = np.ones((3, 3), np.uint8)
            image = cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel)
            image = cv2.morphologyEx(image, cv2.MORPH_CLOSE, kernel)

        if self.equalize_hist:
            # Equalize only the luma channel so colours are preserved.
            yuv = cv2.cvtColor(image, cv2.COLOR_BGR2YUV)
            yuv[:, :, 0] = cv2.equalizeHist(yuv[:, :, 0])
            image = cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR)

        if self.rotation_angle != 0:
            image = self.rotate_image(image, self.rotation_angle)

        if self.resize_width and self.resize_height:
            image = cv2.resize(image, (self.resize_width, self.resize_height))

        if self.detection_zone:
            x, y, w, h = self.detection_zone
            cropped = image[y:y+h, x:x+w]
            # A zone lying outside the frame gives an empty crop; keep the
            # full frame in that case instead of feeding the model nothing.
            if cropped.size:
                image = cropped

        return image

    def _record(self, annotated_frame):
        """Append a frame to the recording, opening the writer on first use."""
        if self.video_writer is None:
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            # Size the writer from the frame actually written: after resize
            # or crop it can differ from the raw camera frame.
            h, w = annotated_frame.shape[:2]
            self.video_writer = cv2.VideoWriter(self.output_filename, fourcc, 20.0, (w, h))
        self.video_writer.write(annotated_frame)

    def _alert_due(self, last_sent, now):
        """Return True when the cooldown since ``last_sent`` has elapsed."""
        if last_sent is None or not self.alert_cooldown or self.alert_cooldown <= 0:
            return True
        return now - last_sent >= self.alert_cooldown

    def rotate_image(self, image, angle):
        """Rotate ``image`` by ``angle`` degrees about its centre, same size."""
        (h, w) = image.shape[:2]
        center = (w / 2, h / 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        return cv2.warpAffine(image, M, (w, h))

    def __del__(self):
        # getattr: __init__ may have failed before video_writer was assigned.
        writer = getattr(self, "video_writer", None)
        if writer is not None:
            writer.release()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_video_file(
    video_path,
    model,
    conf,
    iou,
    export_type,
    classes_to_detect=None,
    max_det=1000,
    line_width=2,
    agnostic_nms=False
):
    """Run detection on frames of a local video and offer an export.

    Either only the first frame is processed, or up to 50 frames sampled at a
    regular interval. Each annotated frame is displayed; the annotated images
    and per-class counts are then exported in the requested format.

    Args:
        video_path: Path to the video file.
        model: Detection model passed to ``detect_objects``.
        conf: Confidence threshold.
        iou: IoU threshold.
        export_type: One of ``"PDF"``, ``"ZIP"``, ``"CSV"``, ``"JSON"``.
        classes_to_detect: Class ids to keep; empty/None means all.
        max_det: Maximum detections per frame.
        line_width: Bounding-box line width.
        agnostic_nms: Whether NMS ignores class labels.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("🚫 Impossible d'ouvrir la vidéo.")
        return

    output_images = []
    csv_rows = []

    def annotate(frame, frame_idx, caption):
        """Detect on one BGR frame, display it and record its counts."""
        # The OpenCV frame is passed as-is (BGR ndarray): the model receives
        # the channel order it expects for arrays and the annotated result
        # stays BGR, matching the display and export conversions below.
        annotated_frame, results = detect_objects(
            model=model,
            image=frame,
            model_type="Vidéo",
            conf=conf,
            iou=iou,
            classes_to_detect=classes_to_detect,
            max_det=max_det,
            line_width=line_width,
            agnostic_nms=agnostic_nms
        )
        st.image(annotated_frame, channels="BGR", caption=caption)
        output_images.append(Image.fromarray(cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)))

        counts = count_objects(results, "Vidéo", model.names if hasattr(model, 'names') else [])
        for cls_name, count_val in counts.items():
            csv_rows.append({"Image": f"frame_{frame_idx}", "Classe": cls_name, "Nombre": count_val})

    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        st.info(f"🎞️ Vidéo chargée, {frame_count} frames trouvées.")

        process_only_first = st.checkbox("Traiter seulement la première frame", value=True)

        if process_only_first:
            ret, frame = cap.read()
            if ret:
                annotate(frame, 0, "🖼️ Première frame annotée")
            else:
                st.error("🚫 Impossible de lire la première frame.")
        else:
            # The reported frame count can be 0 or negative; fall back to the
            # 50-frame cap and let the read loop stop at end of stream.
            max_frames = min(frame_count, 50) if frame_count > 0 else 50
            if max_frames > 1:
                # Clamp the default so it never exceeds the slider maximum
                # on videos shorter than 10 frames.
                num_frames_to_process = st.slider(
                    "Nombre de frames à traiter", 1, max_frames, min(10, max_frames)
                )
            else:
                num_frames_to_process = 1

            interval = max(1, frame_count // num_frames_to_process)
            frame_idx = 0
            processed = 0
            while processed < num_frames_to_process:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_idx % interval == 0:
                    annotate(frame, frame_idx, f"🖼️ Frame {frame_idx} annotée")
                    processed += 1
                frame_idx += 1
    finally:
        # Release the capture even if detection or a widget call raises.
        cap.release()

    if output_images:
        if export_type == "PDF":
            export_pdf(output_images)
        elif export_type == "ZIP":
            export_zip(output_images)
        elif export_type == "CSV":
            export_csv_rows(csv_rows)
        elif export_type == "JSON":
            json_data = json.dumps(csv_rows, indent=4)
            st.download_button("📥 Télécharger JSON des détections",
                               data=json_data,
                               file_name="detections.json",
                               mime="application/json")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def display_webcam_streams(
    selected_devices,
    model,
    conf,
    iou,
    show_fps,
    auto_snapshot,
    snapshot_interval,
    output_format,
    apply_filters,
    advanced_filters,
    rotation_angle,
    resize_width,
    resize_height,
    detection_zone,
    notification_email,
    save_to_cloud,
    morphological_ops,
    equalize_hist,
    classes_to_detect,
    max_det,
    line_width,
    record_output,
    agnostic_nms
):
    """Show the selected webcams side by side, at most four per row.

    Every camera gets its own ``webrtc_streamer`` with a dedicated
    ``VideoTransformer`` built from the given options, a manual snapshot
    button, and (when ``auto_snapshot`` is on) a download button for the
    latest automatic snapshot.
    """
    if not selected_devices:
        st.warning("Aucune webcam sélectionnée.")
        return

    # Options shared by every transformer instance.
    transformer_options = dict(
        conf=conf,
        iou=iou,
        show_fps=show_fps,
        auto_snapshot=auto_snapshot,
        snapshot_interval=snapshot_interval,
        output_format=output_format,
        apply_filters=apply_filters,
        advanced_filters=advanced_filters,
        rotation_angle=rotation_angle,
        resize_width=resize_width,
        resize_height=resize_height,
        detection_zone=detection_zone,
        notification_email=notification_email,
        save_to_cloud=save_to_cloud,
        morphological_ops=morphological_ops,
        equalize_hist=equalize_hist,
        classes_to_detect=classes_to_detect,
        max_det=max_det,
        line_width=line_width,
        record_output=record_output,
        agnostic_nms=agnostic_nms,
    )

    def build_transformer():
        """Create a fresh transformer for one stream."""
        return VideoTransformer(model, **transformer_options)

    row_starts = range(0, len(selected_devices), 4)
    for row_devices in (selected_devices[start:start+4] for start in row_starts):
        columns = st.columns(len(row_devices))

        for column, device_index in zip(columns, row_devices):
            with column:
                st.markdown(f"**Caméra {device_index}**")
                ctx = webrtc_streamer(
                    key=f"webcam-{device_index}",
                    video_transformer_factory=build_transformer,
                    rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
                    media_stream_constraints={
                        "video": {"deviceId": {"exact": str(device_index)}},
                        "audio": False
                    }
                )

                # Manual snapshot of the last raw frame seen by the transformer.
                if st.button(f"📸 Snapshot Caméra {device_index}", key=f"snap-{device_index}") \
                        and ctx.video_transformer:
                    frame = ctx.video_transformer.last_frame
                    if frame is None:
                        st.warning("🚫 Aucune image capturée pour cette caméra.")
                    else:
                        st.image(frame, caption=f"Snapshot Caméra {device_index}", channels="BGR")

                # Download button for the most recent automatic snapshot.
                transformer = ctx.video_transformer if auto_snapshot else None
                latest = transformer.latest_snapshot if transformer else None
                if latest is not None:
                    encoded_ok, encoded = cv2.imencode('.png', latest)
                    if encoded_ok:
                        st.download_button(
                            label="📥 Télécharger Snapshot Auto",
                            data=encoded.tobytes(),
                            file_name=f"snapshot_cam_{device_index}.png",
                            key=f"auto_snap_{device_index}"
                        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def display_ip_camera(
    ip_url,
    model,
    conf,
    iou,
    show_fps,
    auto_snapshot,
    snapshot_interval,
    output_format,
    classes_to_detect=None,
    max_det=1000,
    line_width=2,
    agnostic_nms=False
):
    """Stream an IP camera, run detection on each frame and display it live.

    Args:
        ip_url: Stream URL opened with ``cv2.VideoCapture``.
        model: Callable detection model.
        conf: Confidence threshold.
        iou: IoU threshold.
        show_fps: Draw the measured FPS on each frame.
        auto_snapshot: Offer a PNG download every ``snapshot_interval``
            seconds.
        snapshot_interval: Seconds between automatic snapshots.
        output_format: ``"RGB"`` (case-insensitive) converts frames to RGB
            before display; anything else displays them as BGR.
        classes_to_detect: Class ids to keep; empty/None means all.
        max_det: Maximum detections per frame.
        line_width: Bounding-box line width.
        agnostic_nms: Whether NMS ignores class labels.
    """
    cap = cv2.VideoCapture(ip_url)
    if not cap.isOpened():
        cap.release()
        st.error("🚫 Impossible d'ouvrir la caméra IP.")
        return

    frame_placeholder = st.empty()
    last_time = time.time()
    last_snapshot_time = time.time()
    # Evaluated once: its value cannot change while the loop below runs.
    stop_button = st.button("⏹️ Arrêter le streaming IP")
    # Single slot for the snapshot download, so each new snapshot replaces
    # the previous button instead of stacking a new one on the page.
    snapshot_placeholder = st.empty()

    use_rgb = output_format.upper() == "RGB"

    try:
        while True:
            if stop_button:
                st.info("Arrêt du streaming IP.")
                break

            ret, frame = cap.read()
            if not ret:
                st.error("🚫 Erreur de lecture du flux IP.")
                break

            results = model(
                frame,
                conf=conf,
                iou=iou,
                classes=classes_to_detect if classes_to_detect else None,
                max_det=max_det,
                agnostic_nms=agnostic_nms
            )
            annotated_frame = results[0].plot(line_width=line_width)

            # Instantaneous FPS from the time since the previous frame.
            current_time = time.time()
            dt = current_time - last_time
            fps = 1.0 / dt if dt > 0 else 0.0
            last_time = current_time
            if show_fps:
                cv2.putText(annotated_frame, f"FPS: {fps:.2f}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            if auto_snapshot and (current_time - last_snapshot_time >= snapshot_interval):
                last_snapshot_time = current_time
                encoded_ok, encoded = cv2.imencode('.png', annotated_frame)
                if encoded_ok:
                    snapshot_placeholder.download_button(
                        "📥 Télécharger Snapshot Auto",
                        data=encoded.tobytes(),
                        file_name="ip_snapshot.png",
                        key=f"ip_snapshot_{time.time()}"
                    )

            if use_rgb:
                annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)

            frame_placeholder.image(
                annotated_frame,
                channels="RGB" if use_rgb else "BGR"
            )
    finally:
        # Always free the capture, including when the loop is interrupted by
        # an exception rather than leaving through a break.
        cap.release()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Build the Streamlit UI and dispatch to the selected input source.

    The sidebar collects the model choice, detection parameters, advanced
    options, export format and cloud settings. The main area then handles one
    of: uploaded images, an uploaded video, local webcams, an IP camera, or
    playback of a recorded video.
    """
    apply_techsolut_theme()

    # Task -> model family -> available checkpoints.
    model_versions = {
        "Détection": {
            "YOLOv5": ["yolov5nu", "yolov5s", "yolov5m", "yolov5l", "yolov5x"],
            "YOLOv8": ["yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x"],
            "YOLOv9": ["yolov9c", "yolov9e"],
            "YOLOv10": ["yolov10n", "yolov10s", "yolov10m", "yolov10l", "yolov10x"],
            "YOLO11": ["yolo11n", "yolo11s", "yolo11m", "yolo11l", "yolo11x"],
            "YOLO12": ["yolo12n", "yolo12s", "yolo12m", "yolo12l", "yolo12x"],
            "RT-DETR": ["rtdetr-l", "rtdetr-x"]
        },
        "Segmentation": {
            "YOLOv8": ["yolov8n-seg", "yolov8s-seg", "yolov8m-seg", "yolov8l-seg", "yolov8x-seg"],
            "YOLOv9": ["yolov9c-seg", "yolov9e-seg"],
            "YOLO11": ["yolo11n-seg", "yolo11s-seg", "yolo11m-seg", "yolo11l-seg", "yolo11x-seg"]
        },
        "Estimation de pose": {
            "YOLOv8": ["yolov8n-pose", "yolov8s-pose", "yolov8m-pose", "yolov8l-pose", "yolov8x-pose"],
            "YOLO11": ["yolo11n-pose", "yolo11s-pose", "yolo11m-pose", "yolo11l-pose", "yolo11x-pose"]
        },
        "Personnalisé": {
            "Custom": ["custom"]
        }
    }

    with st.sidebar:
        with st.expander("🧠 Choix du modèle"):
            task_type = st.selectbox("Type de tâche", list(model_versions.keys()))
            model_family = st.selectbox("Famille de modèle", list(model_versions[task_type].keys()))
            selected_model = st.selectbox("Version du modèle", model_versions[task_type][model_family])

            custom_model_path = None
            if selected_model == "custom":
                uploaded_file = st.file_uploader("📥 Charger un modèle (.pt)", type=["pt"])
                if uploaded_file:
                    # Name the file after a hash of its content: the same
                    # upload maps to the same path on every rerun, so no new
                    # temp file is written each time and the cached model
                    # loader sees a stable argument.
                    model_bytes = uploaded_file.getvalue()
                    digest = hashlib.sha256(model_bytes).hexdigest()[:16]
                    custom_model_path = os.path.join(
                        tempfile.gettempdir(), f"custom_model_{digest}.pt"
                    )
                    if not os.path.exists(custom_model_path):
                        with open(custom_model_path, "wb") as tmp:
                            tmp.write(model_bytes)

        with st.expander("🎯 Paramètres"):
            conf = st.slider("Confiance (confidence threshold)", 0.0, 1.0, 0.25)
            iou = st.slider("IoU threshold", 0.0, 1.0, 0.45)
            # NOTE(review): input_size and processing_mode are collected but
            # not used anywhere else in this function.
            input_size = st.selectbox("Taille d'entrée du modèle",
                                      options=[320, 416, 512, 640, 960, 1280],
                                      index=3)
            processing_mode = st.radio("Mode de traitement",
                                       options=["Image par image", "Traitement par lot"],
                                       horizontal=True)
            show_fps = st.checkbox("Afficher le FPS sur la vidéo", value=False)
            auto_snapshot = st.checkbox("Téléchargement automatique des snapshots", value=False)
            output_format = st.selectbox("Format de sortie des frames", options=["BGR", "RGB"], index=1)
            snapshot_interval = st.number_input("Sauvegarder une frame toutes les X secondes",
                                                min_value=1, max_value=60, value=5, step=1)

            apply_filters = st.checkbox("Appliquer des filtres (grayscale + flou)", value=False)
            advanced_filters = st.checkbox("Appliquer des filtres avancés (contraste + luminosité)", value=False)
            rotation_angle = st.number_input("Angle de rotation", min_value=0, max_value=360, value=0, step=1)
            resize_width = st.number_input("Largeur de redimensionnement", min_value=1, value=640, step=1)
            resize_height = st.number_input("Hauteur de redimensionnement", min_value=1, value=480, step=1)

            detection_zone = st.checkbox("Définir une zone de détection")
            if detection_zone:
                x = st.number_input("Zone X", min_value=0, value=0, step=1)
                y = st.number_input("Zone Y", min_value=0, value=0, step=1)
                w = st.number_input("Zone Largeur", min_value=1, value=640, step=1)
                h = st.number_input("Zone Hauteur", min_value=1, value=480, step=1)
                detection_zone = (x, y, w, h)
            else:
                detection_zone = None

            notification_email = st.text_input("Email pour notifications")
            save_to_cloud_flag = st.checkbox("Sauvegarder les résultats sur le cloud")

        with st.expander("🧩 Options Avancées"):
            morphological_ops = st.checkbox("Opérations morphologiques (opening/closing)")
            equalize_hist = st.checkbox("Égaliser l'histogramme (améliorer contraste)")

            custom_classes = st.text_input("Lister les classes (ID, séparés par des virgules) à détecter ou laisser vide")
            if custom_classes.strip():
                # Non-numeric tokens are silently ignored.
                classes_to_detect = [int(c.strip()) for c in custom_classes.split(",") if c.strip().isdigit()]
            else:
                classes_to_detect = None

            max_det = st.number_input("Max Detections autorisées", min_value=1, max_value=10000, value=1000, step=50)
            line_width = st.slider("Épaisseur des bounding boxes", 1, 10, 2)
            record_output = st.checkbox("Enregistrer les flux webcam en local (format MP4)")
            agnostic_nms = st.checkbox("Agnostic NMS (ignorer les classes lors du NMS)")

        with st.expander("🖍️ Post-traitement"):
            export_type = st.selectbox("Exporter sous", ["PDF", "ZIP", "CSV", "JSON"])

        with st.expander("☁️ Sauvegarde Cloud"):
            cloud_service = st.selectbox("Choisir un service", ["Google Drive", "Dropbox", "OneDrive"])
            cloud_file = st.file_uploader("📤 Sélectionner un fichier à sauvegarder",
                                          type=["pdf", "zip", "csv", "jpg", "png", "json"])
            if cloud_file:
                if st.button(f"Sauvegarder sur {cloud_service}"):
                    save_to_cloud(cloud_file.read(), cloud_service)

    # A custom model cannot be loaded until its weights file is uploaded;
    # stop here instead of letting the loader fail.
    if selected_model == "custom" and custom_model_path is None:
        st.info("📥 Veuillez charger un fichier de modèle (.pt) pour continuer.")
        return

    model = load_model(selected_model, custom_model_path)
    class_names = model.names if hasattr(model, 'names') else []

    st.subheader("Source : 🖼️ Image, 🎥 Vidéo, 📷 Webcam, 🌐 Caméra IP ou 🔄 Relecture")
    input_type = st.radio("Source", ["Image", "Vidéo", "Webcam", "Caméra IP", "Relecture"], horizontal=True)

    if input_type == "Image":
        uploaded_images = st.file_uploader("📁 Choisir des images", type=["jpg", "png"], accept_multiple_files=True)
        if uploaded_images:
            output_images = []
            csv_rows = []
            for img_file in uploaded_images:
                image = Image.open(img_file).convert("RGB")
                st.image(image, caption=f"🖼️ {img_file.name}")

                annotated_image, results = detect_objects(
                    model=model,
                    image=image,
                    model_type=task_type,
                    conf=conf,
                    iou=iou,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    agnostic_nms=agnostic_nms
                )
                st.image(annotated_image, caption="🖼️ Image annotée")

                # For a PIL (RGB) input the annotated array comes back in the
                # same RGB order, so it is wrapped directly; an extra BGR->RGB
                # conversion here would swap red and blue in the exports.
                output_images.append(Image.fromarray(annotated_image))

                counts = count_objects(results, task_type, class_names)
                for cls_name, count_val in counts.items():
                    csv_rows.append({"Image": img_file.name, "Classe": cls_name, "Nombre": count_val})

                show_table(counts)

            if output_images:
                if export_type == "PDF":
                    export_pdf(output_images)
                elif export_type == "ZIP":
                    export_zip(output_images)
                elif export_type == "CSV":
                    export_csv_rows(csv_rows)
                elif export_type == "JSON":
                    json_data = json.dumps(csv_rows, indent=4)
                    st.download_button("📥 Télécharger JSON des détections",
                                       data=json_data,
                                       file_name="detections.json",
                                       mime="application/json")
                if save_to_cloud_flag:
                    st.info("Exemple: sauvegarde ZIP des images sur Cloud.")
                    # Build the archive in memory: nothing is staged in the
                    # working directory and no temp file needs cleaning up.
                    archive = io.BytesIO()
                    with ZipFile(archive, 'w') as zipf:
                        for i, img in enumerate(output_images):
                            encoded = io.BytesIO()
                            img.save(encoded, format="PNG")
                            zipf.writestr(f"cloud_image_{i}.png", encoded.getvalue())
                    save_to_cloud(archive.getvalue(), cloud_service)

    elif input_type == "Vidéo":
        uploaded_video = st.file_uploader("📁 Choisir une vidéo", type=["mp4", "avi", "mov"], accept_multiple_files=False)
        if uploaded_video is not None:
            # Keep the upload's own extension for the temporary copy.
            suffix = os.path.splitext(uploaded_video.name)[1] or ".mp4"
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tfile:
                tfile.write(uploaded_video.read())

            try:
                process_video_file(
                    video_path=tfile.name,
                    model=model,
                    conf=conf,
                    iou=iou,
                    export_type=export_type,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    agnostic_nms=agnostic_nms
                )
            finally:
                # Remove the temporary copy even if processing fails.
                os.unlink(tfile.name)

    elif input_type == "Webcam":
        st.info("Recherche de toutes les webcams disponibles...")
        available_devices = []

        # Probe device indices 0..20 and keep those that deliver a frame.
        for index in range(21):
            cap = cv2.VideoCapture(index)
            if cap.isOpened():
                ret, _ = cap.read()
                if ret:
                    available_devices.append(index)
            cap.release()

        if not available_devices:
            st.error("🚫 Aucune webcam détectée ou accessible.")
        else:
            st.success(f"Webcams détectées : {available_devices}")
            selected_devices = st.multiselect(
                "Sélectionner les caméras à utiliser (max 4 affichées par rangée)",
                options=available_devices,
                default=available_devices[:1],
                format_func=lambda x: f"Caméra {x}"
            )
            if selected_devices:
                display_webcam_streams(
                    selected_devices=selected_devices,
                    model=model,
                    conf=conf,
                    iou=iou,
                    show_fps=show_fps,
                    auto_snapshot=auto_snapshot,
                    snapshot_interval=snapshot_interval,
                    output_format=output_format,
                    apply_filters=apply_filters,
                    advanced_filters=advanced_filters,
                    rotation_angle=rotation_angle,
                    resize_width=resize_width,
                    resize_height=resize_height,
                    detection_zone=detection_zone,
                    notification_email=notification_email,
                    save_to_cloud=save_to_cloud_flag,
                    morphological_ops=morphological_ops,
                    equalize_hist=equalize_hist,
                    classes_to_detect=classes_to_detect,
                    max_det=max_det,
                    line_width=line_width,
                    record_output=record_output,
                    agnostic_nms=agnostic_nms
                )

    elif input_type == "Caméra IP":
        st.info("Activation de la caméra IP...")
        ip_url = st.text_input("Entrez l'URL RTSP", value="rtsp://")
        if st.button("Démarrer le streaming IP"):
            display_ip_camera(
                ip_url=ip_url,
                model=model,
                conf=conf,
                iou=iou,
                show_fps=show_fps,
                auto_snapshot=auto_snapshot,
                snapshot_interval=snapshot_interval,
                output_format=output_format,
                classes_to_detect=classes_to_detect,
                max_det=max_det,
                line_width=line_width,
                agnostic_nms=agnostic_nms
            )

    elif input_type == "Relecture":
        st.info("Relecture de vidéos enregistrées")
        recorded_video = st.file_uploader("📁 Charger une vidéo enregistrée",
                                          type=["mp4", "avi", "mov"],
                                          accept_multiple_files=False)
        if recorded_video is not None:
            st.video(recorded_video)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: build the UI only when this file is run directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|