Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import cv2 | |
| import numpy as np | |
| import requests | |
| import tempfile | |
| import mediapipe as mp | |
| import os | |
| import pickle | |
| import io | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaFileUpload | |
| from google.auth.transport.requests import Request | |
| from PIL import Image | |
| from simple_salesforce import Salesforce | |
| from datetime import datetime | |
| import uuid | |
# === SALESFORCE SETUP ===
# Credentials are read from the environment (for example, secrets configured
# on the hosting platform) instead of being committed to the repository.
# The password and security token that were previously hard-coded here were
# published with the source and should be rotated.
# A missing variable raises KeyError naming the variable at startup.
sf = Salesforce(
    username=os.environ["SF_USERNAME"],
    password=os.environ["SF_PASSWORD"],
    security_token=os.environ["SF_SECURITY_TOKEN"],
    domain=os.environ.get("SF_DOMAIN", "login"),
)
def log_intruder_to_salesforce(frame_id):
    """Create a ``Hotel_Security_Alert__c`` record for an unrecognized face.

    Args:
        frame_id: Index of the video frame in which the face was found; it is
            embedded in the alert title and in the detection ID.

    The call is best-effort: any exception raised while creating the record
    is shown with ``st.warning`` and is not propagated to the caller.
    """
    # Local import: the file only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    alert = {
        'Alert_Type__c': 'Critical',
        # Timezone-aware UTC timestamp. ``datetime.utcnow()`` is deprecated
        # and returns a naive value with no offset in its ISO string.
        'Detection_Timestamp__c': datetime.now(timezone.utc).isoformat(),
        # The UUID suffix keeps IDs distinct when the same frame index is
        # reported more than once.
        'HF_Detection_ID__c': f'intruder_{frame_id}_{uuid.uuid4()}',
        'Confidence_Score__c': 0.90,
        'Description__c': 'Detected unrecognized face from CCTV feed',
        'Status__c': 'Active',
        'Title__c': f'Intruder Detected Frame {frame_id}',
        'Priority__c': 'High',
    }
    try:
        sf.Hotel_Security_Alert__c.create(alert)
    except Exception as e:
        st.warning(f"β οΈ Failed to log to Salesforce: {e}")
# === CONFIGURATION ===
# The Google API key is a secret and is read from the environment instead of
# being committed with the source; the key that was previously hard-coded
# here should be rotated. A missing variable raises KeyError at startup.
API_KEY = os.environ["GOOGLE_API_KEY"]
# Google Drive folder IDs: CCTV video folders, registered-face images, and
# the folder that receives cropped intruder faces.
CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
# Only every FRAME_SKIP-th frame of a video is analysed (1..15, default 5).
FRAME_SKIP = st.sidebar.slider("β© Frame Skip", 1, 15, 5)
# === SETUP ===
# MediaPipe face-detection module, and one detector instance shared by every
# call to detect_faces().
mp_face_detection = mp.solutions.face_detection
face_detector = mp_face_detection.FaceDetection(
    min_detection_confidence=0.5,
    model_selection=0,
)
def mse(img1, img2):
    """Return the mean squared error between two images.

    When the shapes differ, ``img2`` is first resized to the width and
    height of ``img1``. Both images are converted to float before
    subtracting. Any exception raised along the way yields ``float("inf")``.
    """
    try:
        candidate = img2
        if img1.shape != img2.shape:
            height, width = img1.shape[0], img1.shape[1]
            candidate = cv2.resize(img2, (width, height))
        difference = img1.astype("float") - candidate.astype("float")
        return np.mean(np.square(difference))
    except Exception:
        return float("inf")
def is_duplicate(img, img_list, threshold=200):
    """Return True if ``img`` is within ``threshold`` MSE of any listed image.

    Images are compared with ``mse``; the scan stops at the first match.
    An empty ``img_list`` gives False.
    """
    for existing in img_list:
        if mse(img, existing) < threshold:
            return True
    return False
def get_drive_file_links(folder_id, filter_video=False):
    """List direct-download links for the images or videos in a Drive folder.

    Args:
        folder_id: ID of the Google Drive folder to list.
        filter_video: When True return only files whose MIME type starts with
            ``video/``; otherwise only those starting with ``image/``.

    Returns:
        A list of ``(file_name, download_url)`` tuples.

    A response without a ``files`` key (for example an API error body)
    contributes no entries, so the result may be empty.
    """
    wanted_prefix = "video/" if filter_video else "image/"
    # Passing the query via ``params`` lets requests URL-encode the quotes,
    # spaces and parentheses instead of hand-building the query string.
    params = {
        "q": f"'{folder_id}' in parents",
        "key": API_KEY,
        "fields": "nextPageToken,files(id,name,mimeType)",
    }
    links = []
    while True:
        # The timeout keeps an unresponsive API from hanging the app.
        response = requests.get(
            "https://www.googleapis.com/drive/v3/files",
            params=params,
            timeout=30,
        )
        payload = response.json()
        for f in payload.get("files", []):
            if f["mimeType"].startswith(wanted_prefix):
                link = f"https://drive.google.com/uc?id={f['id']}&export=download"
                links.append((f["name"], link))
        # The listing is paginated: follow nextPageToken so files beyond the
        # first page are not silently dropped.
        page_token = payload.get("nextPageToken")
        if not page_token:
            break
        params["pageToken"] = page_token
    return links
def download_file(link, suffix):
    """Stream ``link`` into a new temporary file and return the file's path.

    Args:
        link: URL to download.
        suffix: File-name suffix for the temporary file (e.g. ``".jpg"``).

    Returns:
        Path of the temporary file. It is created with ``delete=False``, so
        the caller is responsible for removing it.

    The HTTP status is not checked; whatever body the server returns is
    written to the file. If the transfer fails midway, the partial file is
    removed and the exception is re-raised.
    """
    # (connect, read) timeouts so a stalled server cannot hang the app; the
    # context manager returns the connection to the pool when done.
    with requests.get(link, stream=True, timeout=(10, 60)) as response:
        temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        try:
            with temp:
                for chunk in response.iter_content(1024 * 1024):
                    temp.write(chunk)
        except Exception:
            # Do not leave a half-written file behind.
            os.remove(temp.name)
            raise
    return temp.name
def detect_faces(image):
    """Detect faces in a BGR image and return their cropped regions.

    Args:
        image: BGR image array, or ``None`` (e.g. from a failed
            ``cv2.imread``).

    Returns:
        A list of non-empty crops of ``image``, one per detection whose
        bounding box overlaps the frame. Empty when ``image`` is ``None`` or
        nothing is detected.
    """
    if image is None:
        return []
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = face_detector.process(rgb)
    faces = []
    if not results.detections:
        return faces
    # Frame size is the same for every detection, so read it once.
    h, w = image.shape[:2]
    for det in results.detections:
        bbox = det.location_data.relative_bounding_box
        # The box is relative to the frame size; scale it to pixels.
        x, y = int(bbox.xmin * w), int(bbox.ymin * h)
        bw, bh = int(bbox.width * w), int(bbox.height * h)
        # Clamp both corners to the frame. A negative slice end would wrap
        # around and select the wrong region instead of an empty one.
        x0, y0 = max(0, x), max(0, y)
        x1, y1 = min(w, x + bw), min(h, y + bh)
        if x1 <= x0 or y1 <= y0:
            continue
        crop = image[y0:y1, x0:x1]
        if crop.size > 0:
            faces.append(crop)
    return faces
def upload_to_drive(image_np, filename):
    """Upload a BGR image to the intruder folder on Google Drive as a JPEG.

    Args:
        image_np: BGR image array to upload.
        filename: Name given to the file created in ``INTRUDER_FOLDER_ID``.

    Credentials are loaded from ``token.pickle`` when that file exists, and
    refreshed if they are expired and carry a refresh token. If no usable
    credentials are found, ``build`` is still called with whatever was
    loaded (possibly ``None``).
    NOTE(review): confirm how the client behaves in that case.

    Errors from the Drive client propagate to the caller. The temporary JPEG
    is always removed, whether or not the upload succeeds.
    """
    creds = None
    if os.path.exists("token.pickle"):
        # SECURITY: unpickling executes code embedded in the file, so
        # token.pickle must only ever come from a trusted source.
        with open("token.pickle", "rb") as token:
            creds = pickle.load(token)
    if creds and not creds.valid and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    service = build('drive', 'v3', credentials=creds)

    image_pil = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
    try:
        # Encode straight into the temp file and close it before the Drive
        # client reopens it by name.
        with tmp:
            image_pil.save(tmp, format="JPEG")
        file_metadata = {'name': filename, 'parents': [INTRUDER_FOLDER_ID]}
        media = MediaFileUpload(tmp.name, mimetype='image/jpeg')
        service.files().create(body=file_metadata, media_body=media, fields='id').execute()
    finally:
        # Clean up even when encoding or the upload fails.
        os.remove(tmp.name)
def detect_faces_from_video(video_path, registered_faces):
    """Scan a video for faces that match no registered face.

    Every ``FRAME_SKIP``-th frame is run through ``detect_faces``. A face is
    treated as a new intruder when it is neither close to a registered face
    nor to an intruder already found in this video (both judged by
    ``is_duplicate``). Each new intruder is uploaded to Drive and logged to
    Salesforce.

    Args:
        video_path: Path of the video file to read.
        registered_faces: Face crops of known people.

    Returns:
        ``(detected, unique)``: every face crop found in the analysed frames,
        and the subset classified as new intruders.
    """
    cap = cv2.VideoCapture(video_path)
    detected = []
    unique = []
    frame_id = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_id += 1
            if frame_id % FRAME_SKIP != 0:
                continue
            faces = detect_faces(frame)
            detected.extend(faces)
            for face in faces:
                # is_duplicate's default threshold (200) is the value that
                # was previously hard-coded here for the registered check.
                if is_duplicate(face, registered_faces) or is_duplicate(face, unique):
                    continue
                unique.append(face)
                upload_to_drive(face, f"intruder_{frame_id}.jpg")
                log_intruder_to_salesforce(frame_id)
    finally:
        # Release the capture even if detection or the upload raises.
        cap.release()
    return detected, unique
# === STREAMLIT UI ===
st.title("π Intruder Detection")
st.write("Streams from Google Drive and detects intruders using MediaPipe.")
st.sidebar.header("π Registered Faces")
# Download every image in the registered-faces folder, crop the faces found
# in it, and keep them as the reference set for intruder matching.
reg_faces_raw = get_drive_file_links(REG_FOLDER_ID, filter_video=False)
registered_faces = []
for name, link in reg_faces_raw:
    img_path = download_file(link, ".jpg")
    try:
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread returns None for unreadable files (for example when
            # the download was not actually an image); skip, don't crash.
            st.sidebar.warning(f"Could not read registered image: {name}")
            continue
        faces = detect_faces(img)
        for f in faces:
            registered_faces.append(f)
            st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=name, width=100)
    finally:
        # Always remove the downloaded temp file, including on errors.
        os.remove(img_path)
st.sidebar.header("πΈ Detected Intruders")
# Process every video in every configured CCTV folder and show the intruder
# crops found in each one.
with st.spinner("Processing videos..."):
    # Running count of intruders across all videos; not displayed anywhere
    # in this section.
    total_uploaded = 0
    for folder_id in CCTVFEED_IDS:
        vids = get_drive_file_links(folder_id, filter_video=True)
        for vname, vlink in vids:
            st.subheader(f"π₯ {vname}")
            vfile = download_file(vlink, ".mp4")
            try:
                detected_faces, unique_faces = detect_faces_from_video(vfile, registered_faces)
            finally:
                # Remove the downloaded video even if processing fails.
                os.remove(vfile)
            for i, f in enumerate(unique_faces):
                st.sidebar.image(cv2.cvtColor(f, cv2.COLOR_BGR2RGB), caption=f"Intruder {i+1}", width=100)
            if unique_faces:
                st.success(f"β Uploaded {len(unique_faces)} intruder(s).")
            else:
                st.info("π No new intruders detected.")
            total_uploaded += len(unique_faces)
st.info("π― Completed processing all videos.")