import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
import json
from typing import Tuple, List, Dict, Any
import time
import threading
import queue
import os
import pickle
from datetime import datetime
import hashlib

# --- Dependency Checks ---

# Try to import cv2
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False
    print("Warning: OpenCV (cv2) not available. Using fallback image processing.")

# Try to import face recognition
try:
    import face_recognition
    FACE_REC_AVAILABLE = True
except ImportError:
    FACE_REC_AVAILABLE = False
    print("Warning: face_recognition not available. Using basic face detection only.")

# Try to import audio
try:
    import pyaudio
    AUDIO_AVAILABLE = True
except ImportError:
    AUDIO_AVAILABLE = False
    print("Warning: Audio libraries not available. Using silent alarm mode.")

# Try to import TensorFlow (Pet detection)
try:
    import tensorflow as tf
    from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input, decode_predictions
    TF_AVAILABLE = True
except ImportError:
    TF_AVAILABLE = False
    print("Warning: TensorFlow not available. Using basic pet detection only.")

# Try to import Ultralytics (YOLO)
try:
    from ultralytics import YOLO
    YOLO_AVAILABLE = True
except ImportError:
    YOLO_AVAILABLE = False
    print("Warning: Ultralytics (YOLO) not available. Using MobileNet/Simulation only.")


# --- Audio & Alarm System ---

def generate_tone(frequency, duration, sample_rate=44100, volume=0.5):
    """Generate a simple tone."""
    if not AUDIO_AVAILABLE:
        return None
    frames = int(duration * sample_rate)
    arr = np.zeros(frames)
    for i in range(frames):
        arr[i] = volume * np.sin(2 * np.pi * frequency * i / sample_rate)
    return arr.astype(np.float32)


def play_sound(sound_type):
    """Play different alarm sounds."""
    if not AUDIO_AVAILABLE:
        print(f"Alarm: {sound_type} (audio not available)")
        return
    p = pyaudio.PyAudio()
    sound_patterns = {
        "Beep": [(440, 0.2), (440, 0.2)],
        "Siren": [(600, 0.1), (800, 0.1), (600, 0.1), (800, 0.1)],
        "Chime": [(523, 0.3), (659, 0.3), (784, 0.5)],
        "Alert": [(1000, 0.1), (1500, 0.1), (2000, 0.1)],
        "Buzzer": [(200, 0.5)],
        "Ring": [(800, 0.2), (600, 0.2), (800, 0.2), (600, 0.2)],
        "Woof": [(600, 0.3), (400, 0.2)],
        "Meow": [(800, 0.2), (1000, 0.1)],
    }
    try:
        stream = p.open(format=pyaudio.paFloat32, channels=1, rate=44100, output=True)
        if sound_type in sound_patterns:
            for freq, duration in sound_patterns[sound_type]:
                tone = generate_tone(freq, duration)
                if tone is not None:
                    stream.write(tone.tobytes())
        stream.stop_stream()
        stream.close()
    except Exception as e:
        print(f"Error playing sound: {e}")
    finally:
        p.terminate()


class AlarmSystem:
    def __init__(self):
        self.alarm_queue = queue.Queue()
        self.alarm_thread = threading.Thread(target=self._alarm_worker, daemon=True)
        self.alarm_thread.start()
        self.last_alarm_time = 0
        self.alarm_cooldown = 2

    def _alarm_worker(self):
        while True:
            try:
                sound_type = self.alarm_queue.get(timeout=1)
                if sound_type:
                    play_sound(sound_type)
                self.alarm_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Alarm worker error: {e}")

    def trigger_alarm(self, sound_type):
        current_time = time.time()
        if current_time - self.last_alarm_time > self.alarm_cooldown:
            self.alarm_queue.put(sound_type)
            self.last_alarm_time = current_time
            return True
        return False
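
# Note: alarm playback runs on a background daemon thread fed by a queue, so a
# triggered alarm never blocks the Gradio request that detected it; trigger_alarm()
# also enforces a 2-second cooldown so repeated detections do not stack sounds.
# A minimal usage sketch (illustrative only, not part of the original UI wiring):
#
#   alarm = AlarmSystem()
#   alarm.trigger_alarm("Chime")   # returns False if called again within the cooldown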

# --- Databases ---

class PetDatabase:
    def __init__(self, db_file="pet_database.pkl"):
        self.db_file = db_file
        self.known_pets = {}
        self.load_database()

    def load_database(self):
        if os.path.exists(self.db_file):
            try:
                with open(self.db_file, 'rb') as f:
                    self.known_pets = pickle.load(f)
            except Exception as e:
                print(f"Error loading pet database: {e}")
                self.known_pets = {}

    def save_database(self):
        try:
            with open(self.db_file, 'wb') as f:
                pickle.dump(self.known_pets, f)
        except Exception as e:
            print(f"Error saving pet database: {e}")

    def add_pet(self, name, pet_type, breed, features, image_path=None):
        pet_id = hashlib.md5(f"{name}_{pet_type}_{datetime.now().isoformat()}".encode()).hexdigest()[:8]
        self.known_pets[pet_id] = {
            "name": name,
            "type": pet_type,
            "breed": breed,
            "features": features,
            "image_path": image_path,
            "registered_at": datetime.now().isoformat(),
        }
        self.save_database()
        return pet_id

    def remove_pet(self, pet_id):
        if pet_id in self.known_pets:
            del self.known_pets[pet_id]
            self.save_database()
            return True
        return False

    def get_all_pets(self):
        return self.known_pets

    def recognize_pet(self, features, pet_type, tolerance=0.7):
        if not self.known_pets:
            return None, 0.0
        best_match, best_similarity = None, 0.0
        for pet_id, pet_data in self.known_pets.items():
            if pet_data["type"] == pet_type and pet_data["features"] is not None and features is not None:
                try:
                    known_features = pet_data["features"]
                    # Cosine similarity between the query features and the stored features
                    similarity = np.dot(features, known_features) / (
                        np.linalg.norm(features) * np.linalg.norm(known_features))
                    if similarity > tolerance and similarity > best_similarity:
                        best_similarity = similarity
                        best_match = pet_id
                except Exception:
                    continue
        return (best_match, best_similarity) if best_match else (None, 0.0)


class FaceDatabase:
    def __init__(self, db_file="face_database.pkl"):
        self.db_file = db_file
        self.known_faces = {}
        self.load_database()

    def load_database(self):
        if os.path.exists(self.db_file):
            try:
                with open(self.db_file, 'rb') as f:
                    self.known_faces = pickle.load(f)
            except Exception as e:
                print(f"Error loading face database: {e}")
                self.known_faces = {}

    def save_database(self):
        try:
            with open(self.db_file, 'wb') as f:
                pickle.dump(self.known_faces, f)
        except Exception as e:
            print(f"Error saving face database: {e}")

    def add_face(self, name, encoding, image_path=None):
        face_id = hashlib.md5(f"{name}_{datetime.now().isoformat()}".encode()).hexdigest()[:8]
        self.known_faces[face_id] = {
            "name": name,
            "encoding": encoding,
            "image_path": image_path,
            "registered_at": datetime.now().isoformat(),
        }
        self.save_database()
        return face_id

    def remove_face(self, face_id):
        if face_id in self.known_faces:
            del self.known_faces[face_id]
            self.save_database()
            return True
        return False

    def get_all_faces(self):
        return self.known_faces

    def recognize_face(self, face_encoding, tolerance=0.6):
        if not self.known_faces or not FACE_REC_AVAILABLE:
            return None, 0.0
        best_match, best_distance = None, float('inf')
        for face_id, face_data in self.known_faces.items():
            known_encoding = face_data["encoding"]
            if len(known_encoding) > 0 and len(face_encoding) > 0:
                try:
                    distance = face_recognition.face_distance([known_encoding], face_encoding)[0]
                    if distance < tolerance and distance < best_distance:
                        best_distance = distance
                        best_match = face_id
                except Exception:
                    continue
        return (best_match, 1.0 - best_distance) if best_match else (None, 0.0)


# --- Global Objects ---
alarm_system = AlarmSystem()
face_db = FaceDatabase()
pet_db = PetDatabase()
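
# Note: the two databases use different matching metrics — pets are compared with
# cosine similarity over pixel-derived feature vectors (higher is better, default
# threshold 0.7), while faces use face_recognition's Euclidean face_distance (lower is
# better, default threshold 0.6, reported back as 1 - distance). Both stores persist to
# local pickle files, so deleting pet_database.pkl / face_database.pkl resets them.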

# --- Feature Extraction & Detection Logic ---

def extract_pet_features(image, pet_type):
    """Extract a simple appearance feature vector used for pet recognition."""
    try:
        if isinstance(image, Image.Image):
            image = np.array(image)
        if CV2_AVAILABLE:
            image_resized = cv2.resize(image, (224, 224))
            image_rgb = (cv2.cvtColor(image_resized, cv2.COLOR_BGR2RGB)
                         if image_resized.ndim == 3 and image_resized.shape[2] == 3
                         else image_resized)
            features = (image_rgb.astype(np.float32) / 255.0).flatten()[::8]
            return features
        else:
            image_resized = np.array(Image.fromarray(image).resize((64, 64)))
            features = np.concatenate([
                np.histogram(image_resized[:, :, i], bins=16, range=(0, 256))[0]
                for i in range(3)
            ])
            return features.astype(np.float32)
    except Exception:
        return None


def detect_dogs_and_cats(image, confidence):
    try:
        if isinstance(image, Image.Image):
            image = np.array(image)
        detections = []
        if TF_AVAILABLE:
            try:
                # Use cv2 when present, otherwise fall back to PIL for resizing.
                if CV2_AVAILABLE:
                    image_resized = cv2.resize(image, (224, 224))
                else:
                    image_resized = np.array(Image.fromarray(image).resize((224, 224)))
                image_batch = np.expand_dims(image_resized, axis=0)
                image_preprocessed = preprocess_input(image_batch)
                if not hasattr(detect_dogs_and_cats, 'model'):
                    detect_dogs_and_cats.model = MobileNetV2(weights='imagenet')
                predictions = detect_dogs_and_cats.model.predict(image_preprocessed, verbose=0)
                decoded = decode_predictions(predictions, top=5)[0]
                dog_cat_classes = {
                    'golden_retriever': 'Dog (Golden Retriever)',
                    'Labrador_retriever': 'Dog (Labrador)',
                    'German_shepherd': 'Dog (German Shepherd)',
                    'beagle': 'Dog (Beagle)',
                    'Siamese_cat': 'Cat (Siamese)',
                    'Persian_cat': 'Cat (Persian)',
                    'tabby_cat': 'Cat (Tabby)',
                    'Egyptian_cat': 'Cat (Egyptian)',
                }
                for _, label, score in decoded:
                    for class_key, display_name in dog_cat_classes.items():
                        # Compare case-insensitively; several ImageNet class names are capitalized.
                        if class_key.lower() in label.lower() and score > confidence:
                            detections.append({
                                "bbox": [50, 50, image.shape[1] - 100, image.shape[0] - 100],
                                "confidence": round(float(score), 3),
                                "label": display_name,
                                "type": "pet",
                            })
                            break
            except Exception as e:
                print(f"TensorFlow detection failed: {e}")
        if not detections:
            # Fallback simulation
            if np.random.random() > 0.3:
                pet_types = [{"label": "Dog", "type": "pet"}, {"label": "Cat", "type": "pet"}]
                pet_info = np.random.choice(pet_types)
                detections.append({
                    "bbox": [np.random.randint(0, max(1, image.shape[1] // 2)),
                             np.random.randint(0, max(1, image.shape[0] // 2)), 200, 200],
                    "confidence": round(np.random.uniform(confidence, 0.95), 3),
                    "label": pet_info["label"],
                    "type": pet_info["type"],
                    "recognized": None,
                    "recognition_confidence": 0.0,
                })
        return detections
    except Exception:
        return []


def extract_face_encoding(image):
    if not FACE_REC_AVAILABLE:
        return None
    try:
        # Gradio delivers RGB arrays, which is what face_recognition expects,
        # so no channel conversion (and no cv2 dependency) is needed here.
        rgb_image = image if isinstance(image, np.ndarray) else np.array(image)
        locs = face_recognition.face_locations(rgb_image)
        encs = face_recognition.face_encodings(rgb_image, locs)
        return encs[0] if encs else None
    except Exception:
        return None


def register_face(image, name):
    if image is None or not name.strip():
        return False, "Invalid input"
    encoding = extract_face_encoding(image)
    if encoding is None:
        return False, "No face detected"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    image_filename = f"registered_faces/{name.strip()}_{timestamp}.jpg"
    os.makedirs("registered_faces", exist_ok=True)
    pil_image = image if isinstance(image, Image.Image) else Image.fromarray(image)
    pil_image.save(image_filename)
    face_id = face_db.add_face(name.strip(), encoding, image_filename)
    return True, f"Registered face: {name} (ID: {face_id})"


def register_pet(image, name, pet_type, breed):
    if image is None or not name.strip():
        return False, "Invalid input"
    features = extract_pet_features(image, pet_type)
    if features is None:
        return False, "Could not extract features"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    image_filename = f"registered_pets/{pet_type}_{name.strip()}_{timestamp}.jpg"
    os.makedirs("registered_pets", exist_ok=True)
    pil_image = image if isinstance(image, Image.Image) else Image.fromarray(image)
    pil_image.save(image_filename)
    pet_id = pet_db.add_pet(name.strip(), pet_type, breed, features, image_filename)
    return True, f"Registered {pet_type}: {name} (ID: {pet_id})"
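
# Note: registration stores three things per entry — a reference image on disk
# (registered_faces/ or registered_pets/), the feature vector or face encoding in the
# pickle database, and an 8-character id derived from an MD5 of the name + timestamp.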

def load_detection_models():
    face_cascade, object_net, object_classes = None, None, []
    if CV2_AVAILABLE:
        try:
            face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
            try:
                object_net = cv2.dnn.readNetFromCaffe("MobileNetSSD_deploy.prototxt",
                                                      "MobileNetSSD_deploy.caffemodel")
                object_classes = ["background", "aeroplane", "bicycle", "bird", "boat", "bottle",
                                  "bus", "car", "cat", "chair", "cow", "diningtable", "dog",
                                  "horse", "motorbike", "person", "pottedplant", "sheep", "sofa",
                                  "train", "tvmonitor"]
            except Exception:
                pass
        except Exception:
            pass
    return face_cascade, object_net, object_classes


face_cascade, object_net, object_classes = load_detection_models()


# --- Detection Implementations ---

def detect_faces_cv2(image, face_cascade, confidence):
    try:
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.1, 5)
        results = []
        # Encodings come from face_recognition's own detector, so pairing them with the
        # Haar cascade boxes by index is only an approximation.
        encodings = face_recognition.face_encodings(image) if FACE_REC_AVAILABLE else [None] * len(faces)
        for i, (x, y, w, h) in enumerate(faces):
            rec_name, rec_conf = None, 0.0
            if i < len(encodings) and encodings[i] is not None:
                fid, conf = face_db.recognize_face(encodings[i])
                if fid:
                    rec_name = face_db.known_faces[fid]["name"]
                    rec_conf = conf
            results.append({
                "bbox": [int(x), int(y), int(w), int(h)],
                "confidence": 0.9,
                "label": "face",
                "recognized": rec_name,
                "recognition_confidence": round(rec_conf, 3),
            })
        return results
    except Exception:
        return []


def detect_objects_cv2(image, net, classes, confidence):
    try:
        if net is None:
            return []
        h, w = image.shape[:2]
        blob = cv2.dnn.blobFromImage(image, 0.007843, (300, 300), 127.5)
        net.setInput(blob)
        detections = net.forward()
        objects = []
        for i in range(detections.shape[2]):
            score = detections[0, 0, i, 2]
            if score > confidence:
                idx = int(detections[0, 0, i, 1])
                if idx < len(classes):
                    x1, y1 = int(detections[0, 0, i, 3] * w), int(detections[0, 0, i, 4] * h)
                    x2, y2 = int(detections[0, 0, i, 5] * w), int(detections[0, 0, i, 6] * h)
                    objects.append({
                        "bbox": [x1, y1, x2 - x1, y2 - y1],
                        "confidence": round(float(score), 3),
                        "label": classes[idx],
                    })
        return objects
    except Exception:
        return []


def detect_objects_yolo(image, model_name, confidence):
    """Detect objects using YOLOv8."""
    if not YOLO_AVAILABLE:
        return []
    try:
        # Load model (lazy loading)
        if not hasattr(detect_objects_yolo, 'models'):
            detect_objects_yolo.models = {}
        if model_name not in detect_objects_yolo.models:
            print(f"Loading YOLO model: {model_name}")
            detect_objects_yolo.models[model_name] = YOLO(f"{model_name}.pt")
        model = detect_objects_yolo.models[model_name]
        results = model(image, conf=confidence, verbose=False)
        objects = []
        for r in results:
            boxes = r.boxes
            for box in boxes:
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                conf = float(box.conf[0])
                cls = int(box.cls[0])
                label = model.names[cls]
                objects.append({
                    "bbox": [int(x1), int(y1), int(x2 - x1), int(y2 - y1)],
                    "confidence": round(conf, 3),
                    "label": label,
                })
        return objects
    except Exception as e:
        print(f"YOLO detection error: {e}")
        return []


def detect_objects_pil(image, confidence):
    # Simulation fallback
    width, height = Image.fromarray(image).size
    objects = []
    for i in range(min(5, np.random.randint(0, 5) + 1)):
        # Guard against images smaller than the simulated 100x100 box.
        x, y = np.random.randint(0, max(1, width - 100)), np.random.randint(0, max(1, height - 100))
        objects.append({
            "bbox": [x, y, 100, 100],
            "confidence": round(np.random.uniform(0.4, 0.9), 3),
            "label": np.random.choice(["person", "car", "bottle", "chair", "laptop"]),
        })
    return objects
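
# Note: object detection has three backends — YOLOv8 via ultralytics (weights are
# downloaded lazily on first use and cached per model name), MobileNet SSD via cv2.dnn
# (requires MobileNetSSD_deploy.prototxt/.caffemodel next to the script), and a random
# "simulation" fallback when neither is available. process_image() below picks one
# based on the model dropdown selection and which libraries actually imported.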

def process_image(image, face_cascade, object_net, object_classes,
                  enable_face, enable_objects, enable_pets,
                  face_conf, object_conf, pet_conf, object_model_type):
    face_results, object_results, pet_results = [], [], []

    if enable_face:
        face_results = detect_faces_cv2(image, face_cascade, face_conf) if CV2_AVAILABLE and face_cascade else []

    if enable_objects:
        if object_model_type.startswith("YOLO") and YOLO_AVAILABLE:
            # Map readable name to model filename
            yolo_map = {
                "YOLOv8 Nano": "yolov8n",
                "YOLOv8 Small": "yolov8s",
                "YOLOv8 Medium": "yolov8m",
            }
            model_file = yolo_map.get(object_model_type, "yolov8n")
            object_results = detect_objects_yolo(image, model_file, object_conf)
        elif object_model_type == "MobileNet SSD" and CV2_AVAILABLE and object_net is not None:
            object_results = detect_objects_cv2(image, object_net, object_classes, object_conf)
        else:
            object_results = detect_objects_pil(image, object_conf)

    if enable_pets:
        pet_results = detect_dogs_and_cats(image, pet_conf)
        # Pet recognition logic
        for pet in pet_results:
            pet_type = "dog" if "dog" in pet["label"].lower() else "cat"
            features = extract_pet_features(image, pet_type)
            if features is not None:
                pid, conf = pet_db.recognize_pet(features, pet_type)
                if pid:
                    pet["recognized"] = pet_db.known_pets[pid]["name"]
                    pet["recognition_confidence"] = round(conf, 3)

    return image.copy(), face_results, object_results, pet_results


def draw_detections(image, face_results, object_results, pet_results, show_labels, box_color):
    pil_image = Image.fromarray(image)
    draw = ImageDraw.Draw(pil_image)
    colors = {"red": (255, 0, 0), "green": (0, 255, 0), "blue": (0, 0, 255)}
    base_color = colors.get(box_color, (255, 0, 0))
    for results, default_color in [(face_results, base_color),
                                   (object_results, base_color),
                                   (pet_results, (255, 140, 0))]:
        for res in results:
            x, y, w, h = res["bbox"]
            color = (0, 255, 0) if res.get("recognized") else default_color
            draw.rectangle([x, y, x + w, y + h], outline=color, width=3)
            if show_labels:
                # Fall back to the detection label when the entry was not recognized.
                label = f"{res.get('recognized') or res['label']} {res.get('confidence', '')}"
                draw.text((x, y - 20), label, fill=color)
    return np.array(pil_image)


def check_and_trigger_alarm(face_results, object_results, pet_results, alarm_settings):
    if not alarm_settings.get("alarm_enabled"):
        return False, "Alarm disabled"
    triggered, reason = False, ""
    if alarm_settings.get("face_alarm") and face_results:
        if alarm_settings.get("recognized_faces_only"):
            if any(f.get("recognized") for f in face_results):
                triggered, reason = True, "Recognized Face"
        else:
            triggered, reason = True, "Face Detected"
    elif alarm_settings.get("object_alarm") and object_results:
        targets = alarm_settings.get("target_objects", [])
        if targets:
            if any(obj["label"] in targets for obj in object_results):
                triggered, reason = True, "Target Object"
        else:
            triggered, reason = True, "Object Detected"
    elif alarm_settings.get("pet_alarm") and pet_results:
        triggered, reason = True, "Pet Detected"
    if triggered:
        sound = "Woof" if pet_results and alarm_settings.get("pet_sound_enabled") else alarm_settings.get("alarm_sound", "Beep")
        if alarm_system.trigger_alarm(sound):
            return True, f"🚨 ALARM: {reason}"
        return False, "Alarm Cooldown"
    return False, "No conditions met"


def process_pipeline(image, enable_face, enable_obj, enable_pet, conf_face, conf_obj, conf_pet,
                     draw, labels, color, obj_model, alarm_settings):
    if image is None:
        return None, "", "", "", ""
    if isinstance(image, Image.Image):
        image = np.array(image)
    proc_img, faces, objects, pets = process_image(image, face_cascade, object_net, object_classes,
                                                   enable_face, enable_obj, enable_pet,
                                                   conf_face, conf_obj, conf_pet, obj_model)
    _, alarm_msg = check_and_trigger_alarm(faces, objects, pets, alarm_settings)
    if draw:
        proc_img = draw_detections(proc_img, faces, objects, pets, labels, color)
    return proc_img, json.dumps(faces, indent=2), json.dumps(objects, indent=2), json.dumps(pets, indent=2), alarm_msg
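
# Example (not part of the original UI wiring; "sample.jpg" is a placeholder path) —
# how the pipeline can be exercised outside Gradio for a quick smoke test:
#
#   img = np.array(Image.open("sample.jpg"))
#   settings = {"alarm_enabled": False}
#   out_img, faces_json, objects_json, pets_json, alarm_msg = process_pipeline(
#       img, True, True, True, 0.5, 0.5, 0.5, True, True, "red", "YOLOv8 Nano", settings)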

def get_detection_statistics():
    stats = {
        "face_detection": {
            "model": "Haar Cascade" if CV2_AVAILABLE else "Simulated",
            "registered": len(face_db.known_faces),
        },
        "object_detection": {
            "available_models": ["MobileNet SSD", "YOLOv8 Nano", "YOLOv8 Small"] if YOLO_AVAILABLE else ["MobileNet SSD", "Simulated"],
            "ultralytics_installed": YOLO_AVAILABLE,
            "opencv_installed": CV2_AVAILABLE,
        },
        "pet_detection": {
            "model": "MobileNetV2" if TF_AVAILABLE else "Simulated",
            "registered": len(pet_db.known_pets),
        },
    }
    return json.dumps(stats, indent=2)


# --- Gradio UI ---
custom_css = """
.warning-box { background-color: #fff3cd; border: 1px solid #ffeaa7; padding: 10px; border-radius: 5px; }
.alarm-box { background-color: #f8d7da; border: 1px solid #f5c6cb; padding: 10px; border-radius: 5px; animation: pulse 1s infinite; }
@keyframes pulse { 0% { opacity: 1; } 50% { opacity: 0.7; } 100% { opacity: 1; } }
"""

with gr.Blocks(css=custom_css, title="Security & Recognition System") as demo:
    gr.Markdown("""
    # 🔍 Built with anycoder: Multi-Model Recognition Platform
    Real-time detection with support for **MobileNet SSD** and **YOLOv8** models.
    """)

    # Warnings
    if not CV2_AVAILABLE:
        gr.HTML('