Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -19,15 +19,15 @@ face_detector = mp_face_detection.FaceDetection(model_selection=0, min_detection
|
|
| 19 |
|
| 20 |
# === UTILS ===
|
| 21 |
def get_drive_files(folder_id, mime_prefix="image/"):
|
| 22 |
-
|
| 23 |
-
r = requests.get(
|
| 24 |
files = r.json().get("files", [])
|
| 25 |
return [f for f in files if f["mimeType"].startswith(mime_prefix)]
|
| 26 |
|
| 27 |
def get_image_from_drive(file_id):
|
| 28 |
url = f"https://drive.google.com/uc?id={file_id}&export=download"
|
| 29 |
-
|
| 30 |
-
img_array = np.asarray(bytearray(
|
| 31 |
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
|
| 32 |
return img
|
| 33 |
|
|
@@ -39,7 +39,8 @@ def extract_face_embeddings(img):
|
|
| 39 |
for det in results.detections:
|
| 40 |
bbox = det.location_data.relative_bounding_box
|
| 41 |
h, w, _ = img.shape
|
| 42 |
-
x, y
|
|
|
|
| 43 |
face_crop = img[max(0, y):y+bh, max(0, x):x+bw]
|
| 44 |
if face_crop.size > 0:
|
| 45 |
resized = cv2.resize(face_crop, (128, 128))
|
|
@@ -56,37 +57,44 @@ def compare_faces(face1, face2):
|
|
| 56 |
def upload_intruder_image(img_array, name_hint="intruder"):
|
| 57 |
_, buffer = cv2.imencode('.jpg', img_array)
|
| 58 |
img_bytes = buffer.tobytes()
|
| 59 |
-
|
| 60 |
upload_url = f"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&key={API_KEY}"
|
|
|
|
| 61 |
metadata = {
|
| 62 |
"name": f"{name_hint}.jpg",
|
| 63 |
"parents": [INTRUDER_FOLDER_ID]
|
| 64 |
}
|
|
|
|
| 65 |
boundary = "foo_bar_baz"
|
| 66 |
-
headers = {
|
|
|
|
|
|
|
| 67 |
|
| 68 |
-
|
| 69 |
-
|
|
|
|
| 70 |
"Content-Type: application/json; charset=UTF-8\r\n\r\n"
|
| 71 |
-
|
| 72 |
-
|
| 73 |
"Content-Type: image/jpeg\r\n\r\n"
|
| 74 |
-
).encode("utf-8")
|
|
|
|
|
|
|
|
|
|
| 75 |
|
| 76 |
r = requests.post(upload_url, headers=headers, data=multipart_body)
|
| 77 |
return r.status_code == 200
|
| 78 |
|
| 79 |
def download_video(link):
|
| 80 |
-
|
| 81 |
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
|
| 82 |
-
for chunk in
|
| 83 |
temp.write(chunk)
|
| 84 |
temp.close()
|
| 85 |
return temp.name
|
| 86 |
|
| 87 |
# === MAIN ===
|
| 88 |
st.title("🔐 Intruder Detection from CCTV Feed")
|
| 89 |
-
st.write("Streaming videos from Google Drive, detecting faces using MediaPipe, and uploading unmatched
|
| 90 |
|
| 91 |
# Load registered faces
|
| 92 |
st.sidebar.header("✅ Registered Faces")
|
|
@@ -103,8 +111,8 @@ for f in reg_files:
|
|
| 103 |
with st.spinner("Processing CCTV videos..."):
|
| 104 |
total_detections = 0
|
| 105 |
for folder_id in CCTVFEED_IDS:
|
| 106 |
-
|
| 107 |
-
response = requests.get(
|
| 108 |
files = response.json().get("files", [])
|
| 109 |
for f in files:
|
| 110 |
if f["mimeType"].startswith("video/"):
|
|
@@ -122,12 +130,12 @@ with st.spinner("Processing CCTV videos..."):
|
|
| 122 |
for face in faces:
|
| 123 |
st.sidebar.image(cv2.cvtColor(face, cv2.COLOR_BGR2RGB), caption="Detected", width=100)
|
| 124 |
distances = [compare_faces(face, reg_face) for reg_face in registered_faces]
|
| 125 |
-
if not distances or min(distances) > 45: #
|
| 126 |
uploaded = upload_intruder_image(face, f"intruder_{f['name']}")
|
| 127 |
if uploaded:
|
| 128 |
st.warning("⚠️ Intruder uploaded to Google Drive!")
|
| 129 |
else:
|
| 130 |
-
st.error("Failed to upload intruder
|
| 131 |
found_intruder = True
|
| 132 |
cap.release()
|
| 133 |
os.remove(video_file)
|
|
|
|
| 19 |
|
| 20 |
# === UTILS ===
|
| 21 |
def get_drive_files(folder_id, mime_prefix="image/"):
|
| 22 |
+
url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
|
| 23 |
+
r = requests.get(url)
|
| 24 |
files = r.json().get("files", [])
|
| 25 |
return [f for f in files if f["mimeType"].startswith(mime_prefix)]
|
| 26 |
|
| 27 |
def get_image_from_drive(file_id):
|
| 28 |
url = f"https://drive.google.com/uc?id={file_id}&export=download"
|
| 29 |
+
r = requests.get(url)
|
| 30 |
+
img_array = np.asarray(bytearray(r.content), dtype=np.uint8)
|
| 31 |
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
|
| 32 |
return img
|
| 33 |
|
|
|
|
| 39 |
for det in results.detections:
|
| 40 |
bbox = det.location_data.relative_bounding_box
|
| 41 |
h, w, _ = img.shape
|
| 42 |
+
x, y = int(bbox.xmin * w), int(bbox.ymin * h)
|
| 43 |
+
bw, bh = int(bbox.width * w), int(bbox.height * h)
|
| 44 |
face_crop = img[max(0, y):y+bh, max(0, x):x+bw]
|
| 45 |
if face_crop.size > 0:
|
| 46 |
resized = cv2.resize(face_crop, (128, 128))
|
|
|
|
| 57 |
def upload_intruder_image(img_array, name_hint="intruder"):
|
| 58 |
_, buffer = cv2.imencode('.jpg', img_array)
|
| 59 |
img_bytes = buffer.tobytes()
|
|
|
|
| 60 |
upload_url = f"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&key={API_KEY}"
|
| 61 |
+
|
| 62 |
metadata = {
|
| 63 |
"name": f"{name_hint}.jpg",
|
| 64 |
"parents": [INTRUDER_FOLDER_ID]
|
| 65 |
}
|
| 66 |
+
|
| 67 |
boundary = "foo_bar_baz"
|
| 68 |
+
headers = {
|
| 69 |
+
"Content-Type": f"multipart/related; boundary={boundary}"
|
| 70 |
+
}
|
| 71 |
|
| 72 |
+
meta_json = str(metadata).replace("'", '"')
|
| 73 |
+
body_prefix = (
|
| 74 |
+
"--" + boundary + "\r\n"
|
| 75 |
"Content-Type: application/json; charset=UTF-8\r\n\r\n"
|
| 76 |
+
+ meta_json + "\r\n"
|
| 77 |
+
"--" + boundary + "\r\n"
|
| 78 |
"Content-Type: image/jpeg\r\n\r\n"
|
| 79 |
+
).encode("utf-8")
|
| 80 |
+
|
| 81 |
+
body_suffix = ("\r\n--" + boundary + "--").encode("utf-8")
|
| 82 |
+
multipart_body = body_prefix + img_bytes + body_suffix
|
| 83 |
|
| 84 |
r = requests.post(upload_url, headers=headers, data=multipart_body)
|
| 85 |
return r.status_code == 200
|
| 86 |
|
| 87 |
def download_video(link):
|
| 88 |
+
r = requests.get(link, stream=True)
|
| 89 |
temp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
|
| 90 |
+
for chunk in r.iter_content(chunk_size=1024*1024):
|
| 91 |
temp.write(chunk)
|
| 92 |
temp.close()
|
| 93 |
return temp.name
|
| 94 |
|
| 95 |
# === MAIN ===
|
| 96 |
st.title("🔐 Intruder Detection from CCTV Feed")
|
| 97 |
+
st.write("Streaming videos from Google Drive, detecting faces using MediaPipe, and uploading unmatched intruders.")
|
| 98 |
|
| 99 |
# Load registered faces
|
| 100 |
st.sidebar.header("✅ Registered Faces")
|
|
|
|
| 111 |
with st.spinner("Processing CCTV videos..."):
|
| 112 |
total_detections = 0
|
| 113 |
for folder_id in CCTVFEED_IDS:
|
| 114 |
+
url = f"https://www.googleapis.com/drive/v3/files?q='{folder_id}'+in+parents&key={API_KEY}&fields=files(id,name,mimeType)"
|
| 115 |
+
response = requests.get(url)
|
| 116 |
files = response.json().get("files", [])
|
| 117 |
for f in files:
|
| 118 |
if f["mimeType"].startswith("video/"):
|
|
|
|
| 130 |
for face in faces:
|
| 131 |
st.sidebar.image(cv2.cvtColor(face, cv2.COLOR_BGR2RGB), caption="Detected", width=100)
|
| 132 |
distances = [compare_faces(face, reg_face) for reg_face in registered_faces]
|
| 133 |
+
if not distances or min(distances) > 45: # Distance threshold
|
| 134 |
uploaded = upload_intruder_image(face, f"intruder_{f['name']}")
|
| 135 |
if uploaded:
|
| 136 |
st.warning("⚠️ Intruder uploaded to Google Drive!")
|
| 137 |
else:
|
| 138 |
+
st.error("Failed to upload intruder.")
|
| 139 |
found_intruder = True
|
| 140 |
cap.release()
|
| 141 |
os.remove(video_file)
|