SuriRaja commited on
Commit
8140af8
·
verified ·
1 Parent(s): 86a3f51

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +100 -98
app.py CHANGED
@@ -1,3 +1,4 @@
 
1
  import streamlit as st
2
  import cv2
3
  import numpy as np
@@ -6,133 +7,134 @@ import tempfile
6
  import mediapipe as mp
7
  import os
8
  import pickle
 
9
  from googleapiclient.discovery import build
10
- from googleapiclient.http import MediaFileUpload
 
11
  from google.auth.transport.requests import Request
 
12
 
13
  # === CONFIGURATION ===
14
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
15
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
16
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
17
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
18
- MSE_THRESHOLD = 1000
19
 
20
  # === SETUP ===
21
  mp_face_detection = mp.solutions.face_detection
22
- mp_drawing = mp.solutions.drawing_utils
23
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
24
 
25
- def mse(img1, img2):
26
- return np.mean((img1.astype("float") - img2.astype("float")) ** 2)
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
28
- def download_image(url):
29
- resp = requests.get(url, stream=True)
30
- arr = np.asarray(bytearray(resp.content), dtype=np.uint8)
31
- return cv2.imdecode(arr, cv2.IMREAD_COLOR)
32
 
33
- def get_faces_from_drive(folder_id):
 
34
  api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
35
- results = requests.get(api_url).json().get("files", [])
36
- faces = []
37
- for f in results:
 
38
  if f["mimeType"].startswith("image/"):
39
  url = f"https://drive.google.com/uc?id={f['id']}&export=download"
40
- image = download_image(url)
41
- rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
42
- detections = face_detector.process(rgb)
43
- if detections.detections:
44
- for d in detections.detections:
45
- r = d.location_data.relative_bounding_box
46
- ih, iw, _ = image.shape
47
- x, y, w, h = int(r.xmin * iw), int(r.ymin * ih), int(r.width * iw), int(r.height * ih)
48
- face = image[y:y+h, x:x+w]
49
- if face.size:
50
- face = cv2.resize(face, (100, 100))
51
- faces.append(face)
52
- return faces
53
 
54
- def get_video_links(folder_id):
55
  api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
56
- results = requests.get(api_url).json().get("files", [])
57
- return [(f["name"], f"https://drive.google.com/uc?id={f['id']}&export=download")
58
- for f in results if f["mimeType"].startswith("video/")]
59
 
60
  def download_video(link):
61
- resp = requests.get(link, stream=True)
62
  temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
63
- for chunk in resp.iter_content(1024*1024):
64
  temp.write(chunk)
65
  temp.close()
66
  return temp.name
67
 
68
- def upload_intruder(img):
69
- creds = None
70
- if os.path.exists("token.pickle"):
71
- with open("token.pickle", "rb") as token:
72
- creds = pickle.load(token)
73
- if not creds or not creds.valid:
74
- if creds and creds.expired and creds.refresh_token:
75
- creds.refresh(Request())
76
- service = build('drive', 'v3', credentials=creds)
77
- temp = tempfile.NamedTemporaryFile(delete=False, suffix=".jpg")
78
- cv2.imwrite(temp.name, img)
79
- file_metadata = {'name': os.path.basename(temp.name), 'parents': [INTRUDER_FOLDER_ID]}
80
- media = MediaFileUpload(temp.name, mimetype='image/jpeg')
81
- service.files().create(body=file_metadata, media_body=media).execute()
82
- os.remove(temp.name)
 
 
83
 
84
- def process_video(path, reg_faces, intruder_gallery):
85
- cap = cv2.VideoCapture(path)
86
- uploaded_faces = []
87
- uploaded = 0
88
- while cap.isOpened():
89
- ret, frame = cap.read()
90
- if not ret:
91
- break
92
- rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
93
- results = face_detector.process(rgb)
94
- if results.detections:
95
- for d in results.detections:
96
- r = d.location_data.relative_bounding_box
97
- ih, iw, _ = frame.shape
98
- x, y, w, h = int(r.xmin * iw), int(r.ymin * ih), int(r.width * iw), int(r.height * ih)
99
- face = frame[y:y+h, x:x+w]
100
- if face.size:
101
- face_resized = cv2.resize(face, (100, 100))
102
- if not any(mse(face_resized, r) < MSE_THRESHOLD for r in reg_faces + uploaded_faces):
103
- uploaded_faces.append(face_resized)
104
- intruder_gallery.image(face, caption="👀 New Intruder", use_column_width=True)
105
- try:
106
- upload_intruder(face)
107
- uploaded += 1
108
- except:
109
- st.warning("Upload failed.")
110
- cap.release()
111
- return uploaded
112
 
113
- # === STREAMLIT UI ===
114
- st.set_page_config(layout="wide")
115
- st.title("🔐 Intruder Detection (MediaPipe + Google Drive)")
116
- sidebar = st.sidebar
117
- sidebar.header("🧍 Registered Faces")
118
- registered_faces = get_faces_from_drive(REG_FOLDER_ID)
119
- for idx, face in enumerate(registered_faces):
120
- sidebar.image(cv2.cvtColor(face, cv2.COLOR_BGR2RGB), caption=f"Reg {idx+1}", use_column_width=True)
 
 
 
 
 
 
 
 
121
 
122
- intruder_gallery = st.sidebar.container()
123
- intruder_gallery.header("🚨 Intruders")
 
124
 
125
- total_uploaded = 0
126
- for folder_id in CCTVFEED_IDS:
127
- videos = get_video_links(folder_id)
128
- for vname, link in videos:
129
- st.subheader(f"🎥 {vname}")
130
- with st.spinner("Downloading video..."):
131
- video_file = download_video(link)
132
- with st.spinner("Detecting faces..."):
133
- uploaded = process_video(video_file, registered_faces, intruder_gallery)
134
- st.success(f"✅ Uploaded {uploaded} intruder(s) from {vname}")
135
- total_uploaded += uploaded
136
- os.remove(video_file)
137
 
138
- st.info(f"🎯 Done. Total intruders uploaded: {total_uploaded}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
  import streamlit as st
3
  import cv2
4
  import numpy as np
 
7
  import mediapipe as mp
8
  import os
9
  import pickle
10
+ import io
11
  from googleapiclient.discovery import build
12
+ from googleapiclient.http import MediaIoBaseUpload
13
+ from google_auth_oauthlib.flow import InstalledAppFlow
14
  from google.auth.transport.requests import Request
15
+ from PIL import Image
16
 
17
  # === CONFIGURATION ===
18
  API_KEY = "AIzaSyDojJrpauA0XZtCCDUuo9xeQHZQamYKsC4"
19
  CCTVFEED_IDS = ['1KJRkSf2SKEZ1mXS9_si65IwMBtjs6p4n']
20
  REG_FOLDER_ID = '1qkcR7nQTEtiMH9OFUv2bGxVn08E3dKjF'
21
  INTRUDER_FOLDER_ID = '1PPAUWU-wMx7fek73p-hqPqYQypYtG8Ob'
22
+ SCOPES = ['https://www.googleapis.com/auth/drive.file']
23
 
24
  # === SETUP ===
25
  mp_face_detection = mp.solutions.face_detection
 
26
  face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5)
27
 
28
+ # === AUTH ===
29
+ def get_drive_service_with_pickle():
30
+ creds = None
31
+ if os.path.exists("token.pickle"):
32
+ with open("token.pickle", "rb") as token:
33
+ creds = pickle.load(token)
34
+ if not creds or not creds.valid:
35
+ if creds and creds.expired and creds.refresh_token:
36
+ creds.refresh(Request())
37
+ else:
38
+ flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
39
+ creds = flow.run_local_server(port=0)
40
+ with open("token.pickle", "wb") as token:
41
+ pickle.dump(creds, token)
42
+ return build("drive", "v3", credentials=creds)
43
 
44
+ drive_service = get_drive_service_with_pickle()
 
 
 
45
 
46
+ # === UTILITY FUNCTIONS ===
47
+ def get_drive_images(folder_id):
48
  api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
49
+ res = requests.get(api_url)
50
+ files = res.json().get("files", [])
51
+ images = []
52
+ for f in files:
53
  if f["mimeType"].startswith("image/"):
54
  url = f"https://drive.google.com/uc?id={f['id']}&export=download"
55
+ img_resp = requests.get(url)
56
+ img = Image.open(io.BytesIO(img_resp.content)).convert("RGB")
57
+ images.append(np.array(img))
58
+ return images
 
 
 
 
 
 
 
 
 
59
 
60
+ def get_drive_video_links(folder_id):
61
  api_url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
62
+ response = requests.get(api_url)
63
+ files = response.json().get("files", [])
64
+ return [(f["name"], f"https://drive.google.com/uc?id={f['id']}&export=download") for f in files if f["mimeType"].startswith("video/")]
65
 
66
  def download_video(link):
67
+ response = requests.get(link, stream=True)
68
  temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
69
+ for chunk in response.iter_content(chunk_size=1024 * 1024):
70
  temp.write(chunk)
71
  temp.close()
72
  return temp.name
73
 
74
+ def faces_from_frame(frame):
75
+ rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
76
+ results = face_detector.process(rgb)
77
+ faces = []
78
+ if results.detections:
79
+ for det in results.detections:
80
+ bbox = det.location_data.relative_bounding_box
81
+ h, w, _ = frame.shape
82
+ x, y = int(bbox.xmin * w), int(bbox.ymin * h)
83
+ width, height = int(bbox.width * w), int(bbox.height * h)
84
+ face = frame[y:y + height, x:x + width]
85
+ if face.size > 0:
86
+ faces.append(cv2.resize(face, (128, 128)))
87
+ return faces
88
+
89
+ def mse(img1, img2):
90
+ return np.mean((img1.astype("float") - img2.astype("float")) ** 2)
91
 
92
+ def is_duplicate(face, uploaded_faces, threshold=200):
93
+ return any(mse(face, f) < threshold for f in uploaded_faces)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94
 
95
+ def upload_intruder_image(image_np):
96
+ try:
97
+ img = Image.fromarray(image_np)
98
+ buf = io.BytesIO()
99
+ img.save(buf, format='JPEG')
100
+ buf.seek(0)
101
+ media = MediaIoBaseUpload(buf, mimetype="image/jpeg")
102
+ drive_service.files().create(
103
+ body={"name": "intruder.jpg", "parents": [INTRUDER_FOLDER_ID]},
104
+ media_body=media,
105
+ fields="id"
106
+ ).execute()
107
+ return True
108
+ except Exception as e:
109
+ st.error(f"Upload error: {e}")
110
+ return False
111
 
112
+ # === MAIN APP ===
113
+ st.set_page_config(page_title="CCTV Face Intruder Detection", layout="wide")
114
+ st.title("🔐 CCTV Intruder Face Detection (MediaPipe + Google Drive)")
115
 
116
+ registered_faces = get_drive_images(REG_FOLDER_ID)
117
+ st.sidebar.header("📌 Registered Faces")
118
+ for img in registered_faces:
119
+ st.sidebar.image(img, width=100)
 
 
 
 
 
 
 
 
120
 
121
+ uploaded_intruders = []
122
+ with st.spinner("🔍 Processing videos..."):
123
+ for folder_id in CCTVFEED_IDS:
124
+ for name, link in get_drive_video_links(folder_id):
125
+ st.subheader(f"🎥 {name}")
126
+ path = download_video(link)
127
+ cap = cv2.VideoCapture(path)
128
+ while cap.isOpened():
129
+ ret, frame = cap.read()
130
+ if not ret:
131
+ break
132
+ detected_faces = faces_from_frame(frame)
133
+ for f in detected_faces:
134
+ st.sidebar.image(f, width=100, caption="🎯 Detected")
135
+ if not any(mse(f, reg) < 200 for reg in registered_faces) and not is_duplicate(f, uploaded_intruders):
136
+ if upload_intruder_image(f):
137
+ uploaded_intruders.append(f)
138
+ cap.release()
139
+ os.remove(path)
140
+ st.success("✅ Done.")