Spaces:
Build error
Build error
| import os | |
| import cv2 | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload | |
| import io | |
| import time | |
| import torch | |
| import numpy as np | |
| from PIL import Image | |
| from facenet_pytorch import MTCNN, InceptionResnetV1 | |
| import json | |
# --- Google Drive API configuration -----------------------------------------
# Path of the service-account key. The file is named '.env' but is parsed as
# JSON below.
SERVICE_ACCOUNT_FILE = './.env'
# Full Drive scope: the script lists, downloads and uploads files.
SCOPES = ['https://www.googleapis.com/auth/drive']

# Parse the key file and build the Drive v3 client. This runs at import time,
# so importing the module requires the key file to be present.
with open(SERVICE_ACCOUNT_FILE, 'r') as json_file:
    credentials_info = json.load(json_file)
credentials = service_account.Credentials.from_service_account_info(
    credentials_info,
    scopes=SCOPES,
)
drive_service = build('drive', 'v3', credentials=credentials)

# --- Drive folder IDs --------------------------------------------------------
# Reference (Aadhar) images that CCTV faces are compared against.
aadhar_folder_id = '1Qtb5DYzSFE67Mbb5ZgDIqWUtdJaDD2F4'
# CCTV images to be checked.
cphotos_folder_id = '1DGeRqRbCPcfLDdEgP0h5fyX-MF8EQ8AH'
# Destination for CCTV images that match no reference image.
suspects_folder_id = '1N3RMhVD0OygeufLPYod6IYLtqzvlm3Jv'

# --- Face models -------------------------------------------------------------
# MTCNN detects/crops the face; InceptionResnetV1 (switched to eval mode)
# turns the crop into an embedding.
mtcnn = MTCNN(image_size=160, margin=0, min_face_size=20)
resnet = InceptionResnetV1(pretrained='vggface2').eval()
def list_files_in_folder(folder_id):
    """Return metadata for every non-trashed file directly inside a Drive folder.

    The listing is fetched page by page, following ``nextPageToken`` until it
    is exhausted, so folders holding more files than fit in one page are
    returned in full rather than silently truncated.

    Args:
        folder_id: ID of the Drive folder to list.

    Returns:
        A list of file resource dicts limited to the requested fields
        (id, name, mimeType, owners, parents). An empty list is returned when
        the folder has no files or when any request fails; errors are printed
        rather than raised, and a partially fetched listing is discarded.
    """
    print(f"Listing files in folder ID: {folder_id}")
    query = f"'{folder_id}' in parents and trashed=false"
    files = []
    page_token = None
    try:
        while True:
            results = drive_service.files().list(
                q=query,
                pageSize=1000,
                # nextPageToken must be requested explicitly because a
                # ``fields`` mask is in use.
                fields="nextPageToken, files(id, name, mimeType, owners, parents)",
                pageToken=page_token,
            ).execute()
            files.extend(results.get('files', []))
            page_token = results.get('nextPageToken')
            if not page_token:
                break
        if not files:
            print(f"No files found in folder ID: {folder_id}. Ensure that the folder has files and the service account has access.")
        else:
            print(f"Found {len(files)} files in folder ID: {folder_id}.")
        return files
    except Exception as e:
        print(f"Error listing files in folder ID: {folder_id} - {e}")
        return []
def download_file(file_id, file_name):
    """Download a Drive file into the current working directory.

    Args:
        file_id: Drive ID of the file to fetch.
        file_name: Name reported by Drive; used to derive the local file name.

    Returns:
        The local file name the content was written to, or ``None`` if the
        download or the write failed (the error is printed, not raised).
    """
    print(f"Attempting to download file: {file_name} (ID: {file_id})")
    # The name comes from Drive and is arbitrary text. Keep only its final
    # path component so the file is always written inside the working
    # directory, and fall back to the file ID if nothing usable remains.
    local_name = os.path.basename(file_name) or file_id
    # Add extension if missing (assuming files are images)
    if not local_name.lower().endswith(('.jpg', '.jpeg', '.png')):
        local_name += ".jpg"
    try:
        request = drive_service.files().get_media(fileId=file_id)
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(buffer, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()
        # The file is written only after the whole transfer succeeded, so a
        # failed download never leaves a truncated image on disk.
        with open(local_name, 'wb') as f:
            f.write(buffer.getvalue())
        print(f"Successfully downloaded file: {local_name}")
        return local_name
    except Exception as e:
        print(f"Error downloading file ID: {file_id} - {e}")
        return None
def verify_and_fix_image(image_path):
    """Check that an image file is readable and re-save it in place.

    Args:
        image_path: Path of the image file to check.

    Returns:
        ``True`` when the file passed ``verify()`` and was saved back to the
        same path; ``False`` when any step raised (the error is printed).
    """
    try:
        # First pass: integrity check only.
        with Image.open(image_path) as probe:
            probe.verify()
        # Second pass: a fresh handle is opened for the re-save instead of
        # reusing the one that was verified.
        with Image.open(image_path) as fresh:
            fresh.save(image_path)
        print(f"Image verified and cleaned: {image_path}")
    except Exception as err:
        print(f"Image verification failed for {image_path}. Error: {err}")
        return False
    return True
def upload_file(file_path, folder_id):
    """Upload a local file into a Drive folder.

    Args:
        file_path: Path of the local file to upload; its base name becomes
            the Drive file name.
        folder_id: ID of the destination Drive folder.

    Returns:
        The ID of the created Drive file, or ``None`` if the upload failed
        (the error is printed, not raised).
    """
    print(f"Uploading file: {file_path} to folder ID: {folder_id}")
    file_metadata = {'name': os.path.basename(file_path), 'parents': [folder_id]}
    try:
        # Built inside the try block: creating the upload object can itself
        # fail (e.g. the local file is missing), and that must follow the
        # same print-and-return-None path as a failed request.
        media = MediaFileUpload(file_path, resumable=True)
        file = drive_service.files().create(body=file_metadata, media_body=media, fields='id').execute()
        print(f"Uploaded file ID: {file.get('id')}")
        return file.get('id')
    except Exception as e:
        print(f"Error uploading file {file_path} to folder ID: {folder_id} - {e}")
        return None
def extract_face_embedding(image_path):
    """Detect a face in an image file and return its embedding.

    Args:
        image_path: Path of the image file to process.

    Returns:
        The tensor produced by ``resnet`` for the detected face crop (batch
        dimension added via ``unsqueeze(0)``), or ``None`` when no face is
        detected or any step raises (the reason is printed).
    """
    try:
        # The context manager closes the file handle; convert() loads the
        # pixel data first, so the converted copy stays usable afterwards.
        # Converting to RGB keeps RGBA / grayscale / palette files from
        # reaching the detector with an unexpected channel count.
        # NOTE(review): MTCNN is documented as taking RGB input - confirm
        # against the installed facenet-pytorch version.
        with Image.open(image_path) as image:
            rgb_image = image.convert('RGB')
        face = mtcnn(rgb_image)  # Detect and crop the face
        if face is None:
            print(f"No face detected in image: {image_path}")
            return None
        # Inference only: without no_grad every returned embedding would keep
        # its autograd graph alive for as long as the caller stores it.
        with torch.no_grad():
            return resnet(face.unsqueeze(0))  # Get the face embedding
    except Exception as e:
        print(f"Error extracting face embedding for {image_path}. Error: {e}")
        return None
def calculate_similarity(embedding1, embedding2):
    """Return the cosine similarity of two face embeddings as a Python float.

    The similarity is taken along the last dimension, so both ``(1, D)``
    batched embeddings and plain ``(D,)`` vectors are accepted. The inputs
    must still reduce to a single value; ``.item()`` raises otherwise.

    Args:
        embedding1: First embedding tensor.
        embedding2: Second embedding tensor, broadcastable against the first.

    Returns:
        The cosine similarity as a float.
    """
    score = torch.nn.functional.cosine_similarity(embedding1, embedding2, dim=-1)
    return score.item()
def _download_and_embed(file):
    """Download one Drive file, validate it and return ``(local_path, embedding)``.

    ``embedding`` is ``None`` when the download, the image check or the face
    extraction failed; ``local_path`` is whatever ``download_file`` returned.
    """
    file_path = download_file(file['id'], file['name'])
    if not file_path or not verify_and_fix_image(file_path):
        return file_path, None
    return file_path, extract_face_embedding(file_path)


def process_images(aadhar_folder_id, cphotos_folder_id, suspects_folder_id,
                   similarity_threshold=0.8):
    """Compare CCTV faces against Aadhar faces and upload the unmatched ones.

    Every image in the Aadhar folder is embedded once. Each CCTV image is then
    compared with those embeddings; the first one whose cosine similarity
    exceeds the threshold counts as a match. CCTV images with a detectable
    face and no match are uploaded to the suspects folder. Files that cannot
    be downloaded, fail the image check, or contain no detectable face are
    skipped.

    Args:
        aadhar_folder_id: Drive folder ID holding the reference images.
        cphotos_folder_id: Drive folder ID holding the CCTV images.
        suspects_folder_id: Drive folder ID that receives unmatched images.
        similarity_threshold: Similarity a comparison must exceed to count as
            a match. Defaults to 0.8, the value previously hard-coded.
    """
    print("Starting processing of images...")
    aadhar_files = list_files_in_folder(aadhar_folder_id)
    cphotos_files = list_files_in_folder(cphotos_folder_id)

    # Extract embeddings for all Aadhar images
    aadhar_embeddings = []
    for file in aadhar_files:
        print(f"Processing Aadhar file: {file['name']}")
        _, embedding = _download_and_embed(file)
        if embedding is not None:
            aadhar_embeddings.append((file['name'], embedding))

    if not aadhar_embeddings:
        # With nothing to compare against, every CCTV face is treated as
        # unmatched; say so instead of leaving it implicit.
        print("Warning: no Aadhar embeddings available; every CCTV face will be treated as unmatched.")

    # Compare each CCTV image with all Aadhar embeddings
    for file in cphotos_files:
        print(f"Processing CCTV file: {file['name']}")
        file_path, embedding_cctv = _download_and_embed(file)
        if embedding_cctv is None:
            continue
        for aadhar_name, aadhar_embedding in aadhar_embeddings:
            similarity = calculate_similarity(embedding_cctv, aadhar_embedding)
            if similarity > similarity_threshold:
                print(f"Match found for file: {file['name']} with Aadhar file: {aadhar_name}, Similarity: {similarity:.2f}")
                break
        else:
            # No reference image matched. Report success only when the
            # upload actually returned a file ID.
            if upload_file(file_path, suspects_folder_id) is not None:
                print(f"Unmatched image uploaded: {file['name']}")
            else:
                print(f"Unmatched image could not be uploaded: {file['name']}")
    print("Processing of images completed.")
def main():
    """Run the image-matching pass over the configured folders and print the elapsed time."""
    started_at = time.time()
    print("Script started...")
    process_images(aadhar_folder_id, cphotos_folder_id, suspects_folder_id)
    elapsed = time.time() - started_at
    print(f"Script completed in {elapsed:.2f} seconds.")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()