FireWatch / detection /views.py
rinogeek's picture
Add application file
fc3299e
"""
Vues pour l'application Detection
Créé par Marino ATOHOUN - FireWatch AI Project
Modifié par BlackBenAI Team - Intégration réelle des modèles YOLOv8
"""
import os
import json
import time
import uuid
import cv2
import numpy as np
from PIL import Image
from django.shortcuts import render
from django.http import JsonResponse, HttpResponse, Http404
from django.views.decorators.http import require_POST, require_GET
from django.views.decorators.csrf import csrf_exempt
from django.conf import settings
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
from django.utils import timezone
from .models import Contact, DetectionSession, Detection, AIModelStatus
import logging
# Par Marino ATOHOUN: Configuration du logging
logger = logging.getLogger(__name__)
# Par BlackBenAI: Variables globales pour les modèles YOLOv8
# Ces variables seront initialisées au démarrage du serveur
fire_model = None
intrusion_model = None
models_loaded = False
# Par BlackBenAI: Fonction pour charger les modèles YOLOv8
def load_yolo_models():
"""
Charge les modèles YOLOv8 pour la détection d'incendie et d'intrusion
À appeler au démarrage du serveur Django
"""
global fire_model, intrusion_model, models_loaded
try:
# Par BlackBenAI: Import de YOLO depuis ultralytics
from ultralytics import YOLO
# Vérifier si les fichiers de modèles existent
fire_model_path = settings.FIRE_MODEL_PATH
intrusion_model_path = settings.INTRUSION_MODEL_PATH
# Charger le modèle d'incendie
if os.path.exists(fire_model_path):
fire_model = YOLO(str(fire_model_path))
# PATCH BlackBenAI: Empêcher le fusing automatique qui cause l'erreur 'Conv' object has no attribute 'bn'
# Le modèle semble déjà fusionné ou incompatible avec cette version d'Ultralytics pour le fusing
try:
if hasattr(fire_model, 'model') and fire_model.model:
fire_model.model.fuse = lambda *args, **kwargs: fire_model.model
logger.info("🔧 Patch 'fuse' appliqué au modèle d'incendie")
except Exception as e:
logger.warning(f"⚠️ Impossible d'appliquer le patch 'fuse' au modèle d'incendie: {e}")
logger.info(f"✅ Modèle d'incendie chargé avec succès: {fire_model_path}")
# Mettre à jour le statut dans la base de données
AIModelStatus.objects.update_or_create(
model_type='fire',
defaults={
'model_path': str(fire_model_path),
'is_loaded': True,
'last_loaded': timezone.now(),
'model_version': '1.0'
}
)
else:
logger.warning(f"⚠️ Modèle d'incendie non trouvé: {fire_model_path}")
AIModelStatus.objects.update_or_create(
model_type='fire',
defaults={
'model_path': str(fire_model_path),
'is_loaded': False,
'last_loaded': None,
'model_version': '1.0'
}
)
# Charger le modèle d'intrusion
if os.path.exists(intrusion_model_path):
intrusion_model = YOLO(str(intrusion_model_path))
# PATCH BlackBenAI: Empêcher le fusing automatique
try:
if hasattr(intrusion_model, 'model') and intrusion_model.model:
intrusion_model.model.fuse = lambda *args, **kwargs: intrusion_model.model
logger.info("🔧 Patch 'fuse' appliqué au modèle d'intrusion")
except Exception as e:
logger.warning(f"⚠️ Impossible d'appliquer le patch 'fuse' au modèle d'intrusion: {e}")
logger.info(f"✅ Modèle d'intrusion chargé avec succès: {intrusion_model_path}")
# Mettre à jour le statut dans la base de données
AIModelStatus.objects.update_or_create(
model_type='intrusion',
defaults={
'model_path': str(intrusion_model_path),
'is_loaded': True,
'last_loaded': timezone.now(),
'model_version': '1.0'
}
)
else:
logger.warning(f"⚠️ Modèle d'intrusion non trouvé: {intrusion_model_path}")
AIModelStatus.objects.update_or_create(
model_type='intrusion',
defaults={
'model_path': str(intrusion_model_path),
'is_loaded': False,
'last_loaded': None,
'model_version': '1.0'
}
)
models_loaded = (fire_model is not None) or (intrusion_model is not None)
if models_loaded:
logger.info("🚀 Modèles YOLOv8 initialisés avec succès!")
else:
logger.warning("⚠️ Aucun modèle YOLOv8 n'a pu être chargé")
except ImportError as e:
logger.error(f"❌ ultralytics n'est pas installé: {e}")
logger.info("💡 Installez avec: pip install ultralytics")
except Exception as e:
logger.error(f"❌ Erreur lors du chargement des modèles: {e}")
# Par BlackBenAI: Mapping des classes pour les modèles
# Ajustez ces mappings selon les classes de vos modèles
FIRE_CLASS_NAMES = {
0: 'fire',
1: 'smoke',
}
INTRUSION_CLASS_NAMES = {
0: 'person',
}
# Par BlackBenAI: Fonction pour effectuer la détection sur une image
def detect_objects_in_image(image_path, session, model_type='both'):
"""
Effectue la détection d'objets sur une image avec les modèles YOLOv8
model_type: 'fire', 'intrusion', ou 'both'
Retourne une liste de détections
"""
detections = []
try:
# Charger l'image avec OpenCV
image = cv2.imread(image_path)
if image is None:
logger.error(f"❌ Impossible de charger l'image: {image_path}")
return detections
# Détection avec le modèle d'incendie
if fire_model is not None and model_type in ['fire', 'both']:
logger.info("🔥 Exécution de la détection d'incendie...")
try:
fire_results = fire_model(image, verbose=False)
for result in fire_results:
boxes = result.boxes
if boxes is not None and len(boxes) > 0:
for box in boxes:
# Extraire les coordonnées de la boîte
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
confidence = float(box.conf[0].cpu().numpy())
class_id = int(box.cls[0].cpu().numpy())
# Obtenir le nom de la classe
class_name = FIRE_CLASS_NAMES.get(class_id, 'fire')
# Filtrer par confiance minimale
if confidence >= 0.5:
detection = Detection.objects.create(
session=session,
class_name=class_name,
confidence=confidence,
bbox_x=float(x1),
bbox_y=float(y1),
bbox_width=float(x2 - x1),
bbox_height=float(y2 - y1)
)
detections.append(detection)
logger.info(f" → Détecté: {class_name} ({confidence:.2%})")
except Exception as e:
logger.error(f"❌ Erreur lors de la détection incendie: {e}")
if "has no attribute 'bn'" in str(e):
logger.error("⚠️ Problème de compatibilité 'fusing' détecté. Vérifiez la version d'Ultralytics.")
# Détection avec le modèle d'intrusion
if intrusion_model is not None and model_type in ['intrusion', 'both']:
logger.info("👤 Exécution de la détection d'intrusion...")
try:
intrusion_results = intrusion_model(image, verbose=False)
for result in intrusion_results:
boxes = result.boxes
if boxes is not None and len(boxes) > 0:
for box in boxes:
# Extraire les coordonnées de la boîte
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
confidence = float(box.conf[0].cpu().numpy())
class_id = int(box.cls[0].cpu().numpy())
# Obtenir le nom de la classe
class_name = INTRUSION_CLASS_NAMES.get(class_id, 'person')
# Filtrer par confiance minimale
if confidence >= 0.5:
detection = Detection.objects.create(
session=session,
class_name=class_name,
confidence=confidence,
bbox_x=float(x1),
bbox_y=float(y1),
bbox_width=float(x2 - x1),
bbox_height=float(y2 - y1)
)
detections.append(detection)
logger.info(f" → Détecté: {class_name} ({confidence:.2%})")
except Exception as e:
logger.error(f"❌ Erreur lors de la détection intrusion: {e}")
if "has no attribute 'bn'" in str(e):
logger.error("⚠️ Problème de compatibilité 'fusing' détecté. Vérifiez la version d'Ultralytics.")
# Fallback si aucun modèle n'est chargé
if fire_model is None and intrusion_model is None:
logger.warning("⚠️ Aucun modèle chargé - Mode simulation activé")
# Mode simulation pour la démo
import random
simulation_detections = [
{'class_name': 'fire', 'confidence': 0.92, 'bbox': [100, 100, 150, 150]},
{'class_name': 'person', 'confidence': 0.87, 'bbox': [200, 150, 100, 200]},
]
for sim_det in simulation_detections:
if random.random() > 0.5:
detection = Detection.objects.create(
session=session,
class_name=sim_det['class_name'],
confidence=sim_det['confidence'],
bbox_x=sim_det['bbox'][0],
bbox_y=sim_det['bbox'][1],
bbox_width=sim_det['bbox'][2],
bbox_height=sim_det['bbox'][3]
)
detections.append(detection)
logger.info(f"✅ Détections terminées: {len(detections)} objets trouvés")
except Exception as e:
logger.error(f"❌ Erreur lors de la détection: {e}")
import traceback
logger.error(traceback.format_exc())
return detections
# Par BlackBenAI: Fonction pour dessiner les boîtes de détection
def draw_detections_on_image(image_path, detections, output_path):
"""
Dessine les boîtes de détection sur l'image et sauvegarde le résultat
"""
try:
# Charger l'image
image = cv2.imread(image_path)
if image is None:
logger.error(f"❌ Impossible de charger l'image pour le dessin: {image_path}")
return
for detection in detections:
x1 = int(detection.bbox_x)
y1 = int(detection.bbox_y)
x2 = int(detection.bbox_x + detection.bbox_width)
y2 = int(detection.bbox_y + detection.bbox_height)
# Couleur selon le type de détection
if detection.class_name in ['fire']:
color = (0, 0, 255) # Rouge pour le feu
label_bg = (0, 0, 180)
elif detection.class_name in ['smoke']:
color = (128, 128, 128) # Gris pour la fumée
label_bg = (100, 100, 100)
else:
color = (255, 165, 0) # Orange pour les personnes/intrusions
label_bg = (200, 130, 0)
# Dessiner la boîte avec épaisseur
cv2.rectangle(image, (x1, y1), (x2, y2), color, 3)
# Préparer le label
label = f"{detection.class_name.upper()}: {detection.confidence:.0%}"
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.6
thickness = 2
# Calculer la taille du texte
(text_width, text_height), baseline = cv2.getTextSize(label, font, font_scale, thickness)
# Dessiner le fond du label
cv2.rectangle(image,
(x1, y1 - text_height - 10),
(x1 + text_width + 10, y1),
label_bg, -1)
# Dessiner le texte
cv2.putText(image, label, (x1 + 5, y1 - 5),
font, font_scale, (255, 255, 255), thickness)
# Ajouter un watermark BlackBenAI
h, w = image.shape[:2]
watermark = "FireWatch AI by BlackBenAI"
font = cv2.FONT_HERSHEY_SIMPLEX
cv2.putText(image, watermark, (10, h - 15),
font, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
# Sauvegarder l'image avec les détections
cv2.imwrite(output_path, image)
logger.info(f"✅ Image de résultat sauvegardée: {output_path}")
except Exception as e:
logger.error(f"❌ Erreur lors du dessin des détections: {e}")
# En cas d'erreur, copier l'image originale
import shutil
shutil.copy2(image_path, output_path)
def index_view(request):
"""
Vue principale - Affiche la page d'accueil
Par Marino ATOHOUN
"""
return render(request, 'detection/index.html')
@require_POST
def contact_view(request):
"""
Gère la soumission du formulaire de contact
Par Marino ATOHOUN
"""
try:
name = request.POST.get('name', '').strip()
email = request.POST.get('email', '').strip()
message = request.POST.get('message', '').strip()
if not all([name, email, message]):
return JsonResponse({
'success': False,
'error': 'Tous les champs sont requis.'
}, status=400)
# Créer le contact dans la base de données
contact = Contact.objects.create(
name=name,
email=email,
message=message
)
logger.info(f"Nouveau contact créé: {contact.id} - {name} ({email})")
return JsonResponse({
'success': True,
'message': 'Votre message a été envoyé avec succès! L\'équipe BlackBenAI vous répondra bientôt.'
})
except Exception as e:
logger.error(f"Erreur lors de la création du contact: {e}")
return JsonResponse({
'success': False,
'error': 'Une erreur est survenue lors de l\'envoi du message.'
}, status=500)
@require_POST
def analyze_image_view(request):
"""
Gère l'upload et l'analyse d'une image avec les modèles YOLOv8
Par Marino ATOHOUN & BlackBenAI Team
"""
try:
if 'image' not in request.FILES:
return JsonResponse({
'success': False,
'error': 'Aucune image fournie.'
}, status=400)
image_file = request.FILES['image']
# Vérifier le type de fichier
allowed_types = ['image/jpeg', 'image/jpg', 'image/png', 'image/bmp', 'image/webp']
if image_file.content_type not in allowed_types:
return JsonResponse({
'success': False,
'error': 'Type de fichier non supporté. Utilisez JPG, PNG, BMP ou WebP.'
}, status=400)
# Créer une session de détection
session = DetectionSession.objects.create(
detection_type='image'
)
# Sauvegarder l'image uploadée
image_path = default_storage.save(
f'uploads/images/{session.session_id}_{image_file.name}',
ContentFile(image_file.read())
)
session.original_file = image_path
session.save()
# Obtenir le chemin complet
full_image_path = os.path.join(settings.MEDIA_ROOT, image_path)
# Mesurer le temps de traitement
start_time = time.time()
# Récupérer le type de modèle choisi (par défaut 'both')
model_type = request.POST.get('model_type', 'both')
logger.info(f"🔍 Analyse demandée avec modèle: {model_type}")
# Par BlackBenAI: Effectuer la détection réelle avec YOLOv8
detections = detect_objects_in_image(full_image_path, session, model_type)
# Créer l'image de résultat avec les détections
result_filename = f'results/result_{session.session_id}.jpg'
result_path = os.path.join(settings.MEDIA_ROOT, result_filename)
os.makedirs(os.path.dirname(result_path), exist_ok=True)
draw_detections_on_image(full_image_path, detections, result_path)
# Sauvegarder le chemin du résultat
session.result_file = result_filename
session.processing_time = time.time() - start_time
session.is_processed = True
session.save()
# Préparer la réponse
detections_data = []
for detection in detections:
detections_data.append({
'class_name': detection.class_name,
'label': detection.get_class_name_display(),
'confidence': detection.confidence,
'bbox': detection.bbox_dict
})
result_image_url = request.build_absolute_uri(settings.MEDIA_URL + result_filename)
logger.info(f"✅ Analyse d'image terminée: Session {session.session_id}, {len(detections)} détections en {session.processing_time:.2f}s")
return JsonResponse({
'success': True,
'session_id': str(session.session_id),
'detections': detections_data,
'result_image_url': result_image_url,
'processing_time': session.processing_time,
'models_used': {
'fire': fire_model is not None,
'intrusion': intrusion_model is not None
}
})
except Exception as e:
logger.error(f"❌ Erreur lors de l'analyse d'image: {e}")
import traceback
logger.error(traceback.format_exc())
return JsonResponse({
'success': False,
'error': 'Une erreur est survenue lors de l\'analyse de l\'image.'
}, status=500)
@require_POST
def analyze_video_view(request):
"""
Gère l'upload et l'analyse d'une vidéo avec les modèles YOLOv8
Par Marino ATOHOUN & BlackBenAI Team
"""
try:
if 'video' not in request.FILES:
return JsonResponse({
'success': False,
'error': 'Aucune vidéo fournie.'
}, status=400)
video_file = request.FILES['video']
# Vérifier le type de fichier
allowed_types = ['video/mp4', 'video/avi', 'video/mov', 'video/mkv', 'video/webm']
if video_file.content_type not in allowed_types:
return JsonResponse({
'success': False,
'error': 'Type de fichier non supporté. Utilisez MP4, AVI, MOV, MKV ou WebM.'
}, status=400)
# Créer une session de détection
session = DetectionSession.objects.create(
detection_type='video'
)
# Sauvegarder la vidéo uploadée
video_path = default_storage.save(
f'uploads/videos/{session.session_id}_{video_file.name}',
ContentFile(video_file.read())
)
session.original_file = video_path
session.save()
# Par BlackBenAI: Analyse réelle de la vidéo
full_video_path = os.path.join(settings.MEDIA_ROOT, video_path)
# Récupérer le type de modèle choisi (par défaut 'both')
model_type = request.POST.get('model_type', 'both')
logger.info(f"🔍 Analyse vidéo demandée avec modèle: {model_type}")
start_time = time.time()
cap = cv2.VideoCapture(full_video_path)
if not cap.isOpened():
return JsonResponse({
'success': False,
'error': 'Impossible d\'ouvrir la vidéo.'
}, status=400)
fps = cap.get(cv2.CAP_PROP_FPS) or 30
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_count = 0
all_detections = []
frames_analyzed = 0
# Analyser une frame toutes les N frames pour optimiser
analyze_every_n = max(1, int(fps)) # Analyser ~1 frame par seconde
# Créer un dossier temporaire pour les frames
temp_dir = os.path.join(settings.MEDIA_ROOT, 'temp', str(session.session_id))
os.makedirs(temp_dir, exist_ok=True)
best_frame = None
best_frame_detections = []
logger.info(f"📹 Analyse vidéo: {total_frames} frames, {fps:.1f} FPS")
while True:
ret, frame = cap.read()
if not ret:
break
# Analyser une frame sur N
if frame_count % analyze_every_n == 0:
# Sauvegarder la frame temporairement
temp_frame_path = os.path.join(temp_dir, f'frame_{frame_count}.jpg')
cv2.imwrite(temp_frame_path, frame)
# Créer une mini-session pour cette frame
frame_detections = detect_objects_in_image(temp_frame_path, session, model_type)
# Ajouter le numéro de frame et timestamp
for detection in frame_detections:
detection.frame_number = frame_count
detection.timestamp = frame_count / fps
detection.save()
all_detections.extend(frame_detections)
frames_analyzed += 1
# Garder la frame avec le plus de détections
if len(frame_detections) > len(best_frame_detections):
best_frame = frame.copy()
best_frame_detections = frame_detections
# Supprimer la frame temporaire
os.remove(temp_frame_path)
frame_count += 1
cap.release()
# Nettoyer le dossier temporaire
try:
os.rmdir(temp_dir)
except:
pass
# Créer une image de résultat avec la meilleure frame
result_filename = f'results/result_{session.session_id}.jpg'
result_path = os.path.join(settings.MEDIA_ROOT, result_filename)
os.makedirs(os.path.dirname(result_path), exist_ok=True)
if best_frame is not None:
temp_best_path = os.path.join(settings.MEDIA_ROOT, 'temp', f'best_{session.session_id}.jpg')
os.makedirs(os.path.dirname(temp_best_path), exist_ok=True)
cv2.imwrite(temp_best_path, best_frame)
draw_detections_on_image(temp_best_path, best_frame_detections, result_path)
os.remove(temp_best_path)
else:
# Si pas de frame, créer une image placeholder
placeholder = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(placeholder, "Aucune detection", (50, 240),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
cv2.imwrite(result_path, placeholder)
session.result_file = result_filename
session.processing_time = time.time() - start_time
session.is_processed = True
session.save()
# Préparer la réponse
detections_data = []
for detection in all_detections:
detections_data.append({
'class_name': detection.class_name,
'label': detection.get_class_name_display(),
'confidence': detection.confidence,
'bbox': detection.bbox_dict,
'frame_number': detection.frame_number,
'timestamp': detection.timestamp
})
result_image_url = request.build_absolute_uri(settings.MEDIA_URL + result_filename)
logger.info(f"✅ Analyse vidéo terminée: Session {session.session_id}, {len(all_detections)} détections sur {frames_analyzed} frames en {session.processing_time:.2f}s")
return JsonResponse({
'success': True,
'session_id': str(session.session_id),
'detections': detections_data,
'result_image_url': result_image_url,
'processing_time': session.processing_time,
'frames_analyzed': frames_analyzed,
'total_frames': total_frames,
'models_used': {
'fire': fire_model is not None,
'intrusion': intrusion_model is not None
}
})
except Exception as e:
logger.error(f"❌ Erreur lors de l'analyse vidéo: {e}")
import traceback
logger.error(traceback.format_exc())
return JsonResponse({
'success': False,
'error': 'Une erreur est survenue lors de l\'analyse de la vidéo.'
}, status=500)
@require_POST
def analyze_camera_view(request):
"""
Gère l'analyse en temps réel depuis la caméra
Par Marino ATOHOUN
"""
# Cette vue est appelée quand l'utilisateur capture une frame depuis la caméra
# Le frontend envoie l'image capturée comme un blob
return analyze_image_view(request) # Réutiliser la logique d'analyse d'image
@require_GET
def get_results_api(request, session_id):
"""
API pour récupérer les résultats d'une session de détection
Par Marino ATOHOUN
"""
try:
session = DetectionSession.objects.get(session_id=session_id)
detections = session.detections.all()
detections_data = []
for detection in detections:
detections_data.append({
'class_name': detection.class_name,
'label': detection.get_class_name_display(),
'confidence': detection.confidence,
'bbox': detection.bbox_dict,
'frame_number': detection.frame_number,
'timestamp': detection.timestamp
})
result_image_url = None
if session.result_file:
result_image_url = request.build_absolute_uri(settings.MEDIA_URL + session.result_file.name)
return JsonResponse({
'success': True,
'session_id': str(session.session_id),
'detection_type': session.detection_type,
'detections': detections_data,
'result_image_url': result_image_url,
'processing_time': session.processing_time,
'is_processed': session.is_processed,
'created_at': session.created_at.isoformat()
})
except DetectionSession.DoesNotExist:
return JsonResponse({
'success': False,
'error': 'Session de détection non trouvée.'
}, status=404)
except Exception as e:
logger.error(f"Erreur lors de la récupération des résultats: {e}")
return JsonResponse({
'success': False,
'error': 'Une erreur est survenue.'
}, status=500)
def download_results(request, session_id):
"""
Permet de télécharger les résultats d'une session
Par Marino ATOHOUN
"""
try:
session = DetectionSession.objects.get(session_id=session_id)
if not session.result_file:
raise Http404("Aucun fichier de résultat disponible")
# Préparer le fichier pour téléchargement
file_path = session.result_file.path
if not os.path.exists(file_path):
raise Http404("Fichier de résultat non trouvé")
with open(file_path, 'rb') as f:
response = HttpResponse(f.read(), content_type='image/jpeg')
response['Content-Disposition'] = f'attachment; filename="firewatch_result_{session_id}.jpg"'
return response
except DetectionSession.DoesNotExist:
raise Http404("Session de détection non trouvée")
except Exception as e:
logger.error(f"Erreur lors du téléchargement: {e}")
raise Http404("Erreur lors du téléchargement")
@require_GET
def models_status_api(request):
"""
API pour obtenir le statut des modèles IA
Par Marino ATOHOUN & BlackBenAI Team
"""
try:
models_status = AIModelStatus.objects.all()
status_data = []
for model in models_status:
status_data.append({
'model_type': model.model_type,
'model_name': model.get_model_type_display(),
'is_loaded': model.is_loaded,
'last_loaded': model.last_loaded.isoformat() if model.last_loaded else None,
'model_version': model.model_version,
'accuracy': model.accuracy
})
return JsonResponse({
'success': True,
'models': status_data,
'runtime_status': {
'fire_model_loaded': fire_model is not None,
'intrusion_model_loaded': intrusion_model is not None,
'models_loaded': models_loaded
}
})
except Exception as e:
logger.error(f"Erreur lors de la récupération du statut des modèles: {e}")
return JsonResponse({
'success': False,
'error': 'Une erreur est survenue.'
}, status=500)
# Par BlackBenAI: Initialiser les modèles au démarrage
def privacy_view(request):
"""Vue pour la politique de confidentialité"""
return render(request, 'detection/privacy.html')
def terms_view(request):
"""Vue pour les conditions d'utilisation"""
return render(request, 'detection/terms.html')
# Cette fonction sera appelée quand Django démarre
def initialize_models():
"""
Initialise les modèles YOLOv8 au démarrage de Django
"""
try:
load_yolo_models()
except Exception as e:
logger.error(f"❌ Erreur lors de l'initialisation des modèles: {e}")
# Par BlackBenAI: Appeler l'initialisation au chargement du module
initialize_models()