"""Streamlit front end for "Insta's EYE".

The app stores one face encoding per person in the Firebase Realtime
Database, matches uploaded photos against those encodings, and keeps
feedback, messages and an action history in Firestore.
"""
import html
import os
from io import BytesIO
from urllib.parse import quote

import cv2
import dlib
import face_recognition
import firebase_admin
import numpy as np
import requests
import streamlit as st
from firebase_admin import auth, credentials, db, firestore
from PIL import Image

# Get the current working directory
current_directory = os.path.dirname(os.path.abspath(__file__))
skp_path = os.path.join(current_directory, "projectinsta-s-firebase-adminsdk-y6vlu-9a1345f468.json")

# Streamlit re-executes this script on every interaction, so only initialize
# the Firebase Admin SDK when no app has been registered yet.
if not firebase_admin._apps:
    cred = credentials.Certificate(skp_path)
    firebase_admin.initialize_app(cred, {
        'databaseURL': 'https://projectinsta-s-default-rtdb.firebaseio.com/',
        'projectId': 'projectinsta-s'
    })

# Reference to the root of the Firebase Realtime Database
ref = db.reference('/')

# Firestore client (feedback, messages, history)
db_firestore = firestore.client()

# Streamlit session state
if "auth_state" not in st.session_state:
    st.session_state.auth_state = {
        "user": None,
        "signed_in": False,
    }

# Shape predictor model used for face alignment
shape_predictor_path = "shape_predictor_68_face_landmarks.dat"
detector = dlib.get_frontal_face_detector()
shape_predictor = dlib.shape_predictor(shape_predictor_path)

# Largest face distance that recognize_face_optimal still accepts as a match.
MAX_MATCH_DISTANCE = 0.50

# Timeout applied to every outbound HTTP request so a stalled remote service
# cannot hang the Streamlit script run.
REQUEST_TIMEOUT_SECONDS = 10

# Environment variable holding the Firebase *web* API key. When it is set,
# passwords are verified through the Firebase Auth REST API.
FIREBASE_WEB_API_KEY_ENV = "FIREBASE_WEB_API_KEY"
FIREBASE_SIGN_IN_URL = "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword"

# NOTE(review): this key is committed to source control and should be rotated.
# Set RAPIDAPI_KEY in the environment to override the embedded fallback.
RAPIDAPI_KEY = os.environ.get("RAPIDAPI_KEY", "f47a0f4918msha715fc855e591a2p170a28jsnfc038903d424")
RAPIDAPI_HOST = "instagram-scraper-20231.p.rapidapi.com"

# Placeholder row used when a detected face matches nobody in the database.
UNKNOWN_MATCH = ("Unknown", "Unknown", "Unknown", "N/A", "N/A", "Unknown", "Unknown", "N/A", "N/A")


def _escape(value):
    """Return ``value`` as text that is safe to embed in HTML-enabled markdown."""
    return html.escape(str(value))


def _instagram_link_html(instagram_handle):
    """Return an HTML anchor to the Instagram profile, or plain text for unknown handles."""
    safe_handle = _escape(instagram_handle)
    if not instagram_handle or instagram_handle == "Unknown":
        return safe_handle
    insta_link = f"https://www.instagram.com/{quote(str(instagram_handle), safe='')}/"
    return f'<a href="{_escape(insta_link)}" target="_blank">{safe_handle}</a>'


def _verify_password(email, password):
    """Check ``password`` for ``email`` through the Firebase Auth REST API.

    Returns:
        True if the credentials were accepted, False if they were rejected or
        the request failed, and None when no web API key is configured and the
        password therefore could not be checked at all.
    """
    api_key = os.environ.get(FIREBASE_WEB_API_KEY_ENV)
    if not api_key:
        return None
    try:
        response = requests.post(
            FIREBASE_SIGN_IN_URL,
            params={"key": api_key},
            json={"email": email, "password": password, "returnSecureToken": True},
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as e:
        print(f"Password verification error: {str(e)}")
        return False
    return response.status_code == 200


# Firebase Authentication
def authenticate_user(email, password):
    """Look up the user by email and, when possible, verify the password.

    Returns:
        ``(True, user_record)`` on success, ``(False, None)`` otherwise.
    """
    try:
        user = auth.get_user_by_email(email)
    except Exception as e:
        # Broad on purpose: the exception classes exposed by firebase_admin.auth
        # differ between SDK versions (auth.AuthError no longer exists).
        print(f"Authentication error: {str(e)}")
        return False, None

    verified = _verify_password(email, password)
    if verified is None:
        # The Admin SDK cannot check passwords. Without a web API key the
        # legacy behaviour (email lookup only) is kept so existing deployments
        # keep working, but this is NOT real authentication.
        print(f"WARNING: {FIREBASE_WEB_API_KEY_ENV} is not set; password was not verified.")
    elif not verified:
        print("Authentication error: invalid email or password")
        return False, None
    return True, user


# Sign-up Functionality
def create_user(email, password):
    """Create a Firebase Authentication user.

    Returns:
        ``(True, uid)`` on success, ``(False, None)`` if creation failed.
    """
    try:
        user = auth.create_user(
            email=email,
            password=password
        )
        return True, user.uid
    except Exception as e:
        print(f"User creation error: {str(e)}")
        return False, None


def load_and_encode(image_file):
    """Return face encodings for every face found in an uploaded image.

    Args:
        image_file: File-like object containing the encoded image bytes.

    Returns:
        A non-empty list of encodings, or None when the image cannot be
        decoded, contains no detectable face, or processing fails.
    """
    try:
        # Streamlit may hand back the same upload object on a rerun with its
        # read position at the end, so rewind before reading.
        if hasattr(image_file, "seek"):
            image_file.seek(0)
        image_bytes = image_file.read()

        nparr = np.frombuffer(image_bytes, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if image is None:
            print("Error loading and encoding image: could not decode image data")
            return None

        aligned_faces = detect_and_align_faces(image)
        if not aligned_faces:
            return None

        encodings = []
        for aligned_face in aligned_faces:
            encoding = face_recognition.face_encodings(aligned_face)
            if encoding:
                encodings.append(encoding[0])
        return encodings or None
    except Exception as e:
        print(f"Error loading and encoding image: {str(e)}")
        return None


def detect_and_align_faces(image):
    """Detect every face in ``image`` and return the aligned face chips.

    Args:
        image: Image array as produced by ``cv2.imdecode`` (BGR channel order).

    Returns:
        A list of 256x256 aligned face chips, or None if no face was detected.
    """
    # Resize to a fixed width, preserving the aspect ratio.
    target_width = 800
    aspect_ratio = image.shape[1] / image.shape[0]
    target_height = max(1, int(target_width / aspect_ratio))
    resized_image = cv2.resize(image, (target_width, target_height))

    gray = cv2.cvtColor(resized_image, cv2.COLOR_BGR2GRAY)

    # Detect faces using dlib
    faces = detector(gray)
    if not faces:
        return None

    # NOTE(review): the chips are cut from the BGR image while face_recognition
    # documents RGB input. Left unchanged because stored encodings were
    # produced this way; switching would require re-enrolling everyone.
    aligned_faces = []
    for face in faces:
        landmarks = shape_predictor(gray, face)
        aligned_face = dlib.get_face_chip(resized_image, landmarks, size=256)  # Adjust the size as needed
        aligned_faces.append(aligned_face)
    return aligned_faces


def _load_known_people():
    """Return ``{name: record}`` for all database records that carry an encoding."""
    snapshot = ref.get() or {}  # ref.get() returns None for an empty database
    if not isinstance(snapshot, dict):
        return {}
    return {
        name: data
        for name, data in snapshot.items()
        if isinstance(data, dict) and "encoding" in data
    }


# Add person to database
def add_person(name, image_path, instagram_handle, email=None):
    """Store the first detected face of an image under ``name``.

    Args:
        name: Database key for the person.
        image_path: Uploaded image file (file-like object).
        instagram_handle: Instagram handle saved with the record.
        email: Optional contact email saved with the record.

    Returns:
        A message starting with "Success" on success, otherwise a message
        describing the failure.
    """
    try:
        encoding = load_and_encode(image_path)
        if not encoding:
            return "No face found in the provided image."

        # Convert the NumPy array to a list for JSON serialization
        encoding = encoding[0].tolist()

        person_data = {
            "encoding": encoding,
            "info": {
                "instagram_handle": instagram_handle,
                "instagram_link": f"https://www.instagram.com/{instagram_handle}/"
            },
            "added_by": st.session_state.auth_state["user"].email
        }
        if email:
            person_data["info"]["email"] = email

        ref.child(name).set(person_data)
        log_action(st.session_state.auth_state["user"].email, "Added person")
        return f"Success: {name} added to the database!"
    except Exception as e:
        return f"Failed to add person: {str(e)}"


def _describe_match(name, info):
    """Build the result row for a matched person, enriched with Instagram data."""
    instagram_handle = info.get("instagram_handle")
    email = info.get("email", "Email not provided")
    if not instagram_handle:
        return (name, "Unknown", email, "N/A", "N/A", "Unknown", "Unknown", "N/A", "N/A")

    insta_data = fetch_instagram_data(instagram_handle)
    if not insta_data:
        return (name, instagram_handle, email, "N/A", "N/A", "Unknown", "Unknown", "N/A", "N/A")

    return (
        name,
        instagram_handle,
        email,
        insta_data.get('edge_followed_by', {}).get('count'),
        insta_data.get('edge_follow', {}).get('count'),
        insta_data.get('full_name'),
        insta_data.get('biography'),
        insta_data.get('is_private'),
        insta_data.get('profile_pic_url_hd'),
    )


def recognize_face(image_path):
    """Report every stored person matching any face in the uploaded image.

    Args:
        image_path: Uploaded image file (file-like object), or a falsy value.

    Returns:
        A markdown/HTML string with one bullet per match (or an "Unknown" row
        for a face without matches), or a message describing why recognition
        could not be performed.
    """
    if not image_path:
        return "Please upload an image."
    try:
        unknown_encodings = load_and_encode(image_path)
        if not unknown_encodings:
            return "No face found in the provided image."

        # Fetch the database once instead of once per detected face.
        people = _load_known_people()

        matches = []
        for unknown_encoding in unknown_encodings:
            face_matches = []
            for name, data in people.items():
                known_encoding = np.array(data["encoding"])
                if face_recognition.compare_faces([known_encoding], unknown_encoding)[0]:
                    face_matches.append(_describe_match(name, data.get("info") or {}))
            if face_matches:
                matches.extend(face_matches)
            else:
                matches.append(UNKNOWN_MATCH)

        # The result is rendered with unsafe_allow_html=True, so every value
        # coming from the database or from Instagram is escaped.
        results = []
        for (name, insta_handle, email, edge_followed_by, edge_follow,
             full_name, biography, is_private, profile_pic_url_hd) in matches:
            results.append(
                f"- It's a picture of {_escape(name)}! "
                f"Insta handle: {_instagram_link_html(insta_handle)}, "
                f"Email: {_escape(email)}, "
                f"Followers: {_escape(edge_followed_by)}, "
                f"Following: {_escape(edge_follow)}, "
                f"Full Name: {_escape(full_name)}, "
                f"Biography: {_escape(biography)}, "
                f"Private: {_escape(is_private)}, "
                f"Profile Picture: {_escape(profile_pic_url_hd)}"
            )
        log_action(st.session_state.auth_state["user"].email, "Recognized face")
        return "\n".join(results)
    except Exception as e:
        return f"Failed to recognize face: {str(e)}"


def fetch_instagram_data(instagram_handle):
    """Fetch profile data for an Instagram handle from the RapidAPI scraper.

    Returns:
        The ``data`` object of a successful response, or None when the request
        fails, returns a non-200 status, or does not report success.
    """
    url = f"https://{RAPIDAPI_HOST}/userinfo/{quote(str(instagram_handle), safe='')}"
    headers = {
        "x-rapidapi-key": RAPIDAPI_KEY,
        "x-rapidapi-host": RAPIDAPI_HOST
    }
    try:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
        if response.status_code != 200:
            return None
        data = response.json()
        if data.get('status') == 'success':
            return data.get('data', {})
        return None
    except Exception as e:
        print(f"Error fetching Instagram data: {str(e)}")
        return None


def recognize_face_optimal(image_path):
    """Report only the closest stored person for each face in the image.

    A stored person qualifies when the face distance is at most
    ``MAX_MATCH_DISTANCE``; the candidate with the smallest distance wins.

    Args:
        image_path: Uploaded image file (file-like object), or a falsy value.

    Returns:
        One line per detected face, or a message describing why recognition
        could not be performed.
    """
    if not image_path:
        return "Please upload an image."
    try:
        unknown_encodings = load_and_encode(image_path)
        if not unknown_encodings:
            return "No face found in the provided image."

        # Fetch the database once instead of once per detected face.
        people = _load_known_people()

        matches = []
        for unknown_encoding in unknown_encodings:
            candidates = []
            for name, data in people.items():
                known_encoding = np.array(data["encoding"])
                # face_distance is a distance: smaller means more similar.
                distance = face_recognition.face_distance([known_encoding], unknown_encoding)[0]
                if distance <= MAX_MATCH_DISTANCE:
                    candidates.append((name, distance))

            if candidates:
                best_name, best_distance = min(candidates, key=lambda candidate: candidate[1])
                info = people[best_name].get("info") or {}
                insta_handle = info.get("instagram_handle", "Unknown")
                matches.append(
                    f"Best match: {_escape(best_name)} with a similarity score of "
                    f"{1 - best_distance:.2%}. Insta handle: {_instagram_link_html(insta_handle)}"
                )
            else:
                matches.append("Face not found in the database.")
        return "\n".join(matches)
    except Exception as e:
        return f"Failed to recognize face: {str(e)}"


# Delete person from database
def delete_person(name):
    """Delete the record stored under ``name`` and return a status message."""
    try:
        ref.child(name).delete()
        log_action(st.session_state.auth_state["user"].email, "Deleted person")
        return f"{name} deleted from the database!"
    except Exception as e:
        return f"Failed to delete person: {str(e)}"


# Delete user from Firebase Authentication
def delete_user(email, password):
    """Delete the Firebase Authentication user registered under ``email``.

    The password is verified when a Firebase web API key is configured.

    Returns:
        A status message; failures start with "Failed".
    """
    try:
        verified = _verify_password(email, password)
        if verified is None:
            # NOTE(review): without a web API key the password cannot be
            # checked, so the legacy (unverified) behaviour is kept.
            print(f"WARNING: {FIREBASE_WEB_API_KEY_ENV} is not set; password was not verified.")
        elif not verified:
            return "Failed to delete user: invalid email or password."

        user = auth.get_user_by_email(email)
        auth.delete_user(user.uid)
        return "User deleted successfully!"
    except Exception as e:
        return f"Failed to delete user: {str(e)}"


# Send feedback to Firebase
def send_feedback(feedback_data):
    """Store a feedback document in Firestore.

    Returns:
        True if the document was written, False if it failed (an error is
        shown in the UI in that case).
    """
    try:
        db_firestore.collection('feedback').add(feedback_data)
        return True
    except Exception as e:
        st.error(f"Failed to submit feedback: {str(e)}")
        return False


# Streamlit interface for adding a person
def add_person_ui():
    """Render the "Add Person" page."""
    st.title("😎 Add Person")
    name = st.text_input("Enter Name", help="Enter the name of the person")
    image_path = st.file_uploader("Upload Image", help="Upload an image containing the person's face")
    email = st.text_input("Enter Email (Optional)", help="Enter the person's email address (optional)")
    instagram_handle = st.text_input("Enter Instagram Handle", help="Enter the person's Instagram handle")
    if st.button("Add Person"):
        if not name or not image_path or not instagram_handle:
            st.error("Please fill all the required fields.")
        else:
            result = add_person(name, image_path, instagram_handle, email)
            # add_person reports failures as plain messages; only real
            # successes should be shown in green.
            if result.startswith("Success"):
                st.success(result)
            else:
                st.error(result)


# Streamlit interface for recognizing face
def recognize_face_ui():
    """Render the "Recognize Face" page."""
    st.title("🔍 Recognize Face")
    image_path = st.file_uploader("Upload Image", help="Upload an image for face recognition")
    if st.button("Recognize Face"):
        result = recognize_face(image_path)
        st.write(result, unsafe_allow_html=True)


def recognize_face_optimal_ui():
    """Render the "Recognize Face (Optimal)" page."""
    st.title("🔍 Recognize Face (Optimal)")
    image_path = st.file_uploader("Upload Image", help="Upload an image for optimal face recognition")
    if st.button("Recognize Face (Optimal)"):
        result = recognize_face_optimal(image_path)
        st.write(result, unsafe_allow_html=True)


# Streamlit interface for deleting a person
def delete_person_ui():
    """Render the "Delete Person" page; users may only delete their own entries."""
    st.title("🗑️ Delete Person")
    name = st.text_input("Enter Name", help="Enter the name of the person to delete")
    if st.button("Delete Person"):
        if not name:
            st.error("Please enter a name.")
            return

        # A single lookup both checks existence and yields the owner. It is
        # wrapped because ref.child() rejects names that are not valid paths.
        try:
            person_data = ref.child(name).get()
        except Exception as e:
            st.error(f"Failed to delete person: {str(e)}")
            return

        if not person_data:
            st.error("Person not found in the database.")
            return

        # Check if the person was added by the currently signed-in user
        owner = person_data.get("added_by") if isinstance(person_data, dict) else None
        if owner != st.session_state.auth_state["user"].email:
            st.error("You can only delete the person added by you.")
            return

        result = delete_person(name)
        if result.startswith("Failed"):
            st.error(result)
        else:
            st.success(result)


# Streamlit interface for feedback
def feedback_ui():
    """Render the feedback form."""
    st.title("✍️ Feedback")
    st.write("Your feedback is important to us! Please fill out the form below:")

    name = st.text_input("Name (optional)")
    email = st.text_input("Email (optional)")
    category = st.selectbox("Category", ["Bug Report", "Feature Request", "General Feedback"])
    message = st.text_area("Feedback Message")

    if st.button("Submit Feedback"):
        if not message:
            st.error("Please enter your feedback message.")
        else:
            feedback_data = {
                "name": name,
                "email": email,
                "category": category,
                "message": message,
            }
            # send_feedback already shows the error when the write fails.
            if send_feedback(feedback_data):
                st.success("Feedback submitted successfully! Thank you for your feedback.")


# Section for messaging

def send_message(sender_email, receiver_email, message_content):
    """Store a message in Firestore and return a status message."""
    try:
        db_firestore.collection('messages').add({
            'sender_email': sender_email,
            'receiver_email': receiver_email,
            'message_content': message_content,
            'timestamp': firestore.SERVER_TIMESTAMP
        })
        return "Message sent successfully!"
    except Exception as e:
        return f"Failed to send message: {str(e)}"


def get_messages(user_email):
    """Return the messages addressed to ``user_email``, newest first.

    Returns:
        A list of Firestore document snapshots, or None if the query failed.
    """
    try:
        query = (
            db_firestore.collection('messages')
            .where('receiver_email', '==', user_email)
            .order_by('timestamp', direction=firestore.Query.DESCENDING)
        )
        # stream() is lazy; materialize it here so query errors are caught by
        # this try block and callers can test the result for emptiness.
        return list(query.stream())
    except Exception as e:
        print(f"Failed to retrieve messages: {str(e)}")
        return None


# Streamlit interface for messaging
def messaging_ui():
    """Render the messaging page: a send form followed by the user's inbox."""
    st.title("💬 Messaging")

    if not st.session_state.auth_state["signed_in"]:
        st.write("Please sign in to send and view messages.")
        return

    sender_email = st.session_state.auth_state["user"].email
    receiver_email = st.text_input("Receiver's Email", help="Enter the receiver's email address")
    message_content = st.text_area("Message Content")

    if st.button("Send Message"):
        if not receiver_email or not message_content:
            st.error("Please enter the receiver's email and a message.")
        else:
            result = send_message(sender_email, receiver_email, message_content)
            st.write(result)

    messages = get_messages(sender_email)
    if messages:
        st.write("Your messages:")
        for message in messages:
            message_data = message.to_dict()
            st.write(f"From: {message_data['sender_email']}")
            st.write(f"Message: {message_data['message_content']}")
            st.write("---")

# End of messaging section


# History section
def log_action(user_email, action):
    """Append an entry to the Firestore action history of ``user_email``."""
    try:
        db_firestore.collection('history').add({
            'user_email': user_email,
            'action': action,
            'timestamp': firestore.SERVER_TIMESTAMP
        })
    except Exception as e:
        st.error(f"Failed to log action: {str(e)}")


def display_history(user_email):
    """Render the actions logged for ``user_email``, newest first."""
    st.title("📜 History")
    st.write("Here is the history of actions taken by you:")

    try:
        history = (
            db_firestore.collection('history')
            .where('user_email', '==', user_email)
            .order_by('timestamp', direction=firestore.Query.DESCENDING)
            .stream()
        )
        for entry in history:
            entry_data = entry.to_dict()
            action = entry_data['action']
            timestamp = entry_data['timestamp']
            st.write(f"- {action} at {timestamp}")
    except Exception as e:
        st.error(f"Failed to retrieve history: {str(e)}")

# End of history section


# Streamlit interface for deleting a user
def delete_user_ui():
    """Render the "Delete User" page."""
    st.title("Delete User")
    email = st.text_input("Enter User's Email", help="Enter the email of the user to delete")
    password = st.text_input("Enter Your Password", type="password", help="Enter your password for confirmation")
    # The checkbox must be rendered outside the button branch: ticking it
    # triggers a rerun in which st.button() returns False, so a checkbox
    # nested under the button could never lead to a deletion.
    confirmation = st.checkbox("I confirm that I want to delete this user.")
    if st.button("Delete User"):
        if not email or not password:
            st.error("Please enter the user's email and your password.")
        elif not confirmation:
            st.error("Please confirm that you want to delete this user.")
        else:
            result = delete_user(email, password)
            if result.startswith("Failed"):
                st.error(result)
            else:
                st.success(result)


def tour_guide_ui():
    """Render the tour guide as one expander per step."""
    st.title("🗺️ Tour Guide")
    st.markdown("This tour will guide you through the application.")

    for step in steps:
        with st.expander(step["title"]):
            st.write(step["content"])


def authenticate_user_ui():
    """Render the header and either the main app or the login/sign-up forms."""
    # Display logo and title
    c30, c31, c32 = st.columns([0.2, 0.1, 3])
    with c30:
        st.caption("")
        logo_path = os.path.join(current_directory, "Explore+.png")
        logo = Image.open(logo_path)
        st.image(logo, width=60)
    with c32:
        st.title("Insta's EYE")
    st.sidebar.title("Options")

    if st.session_state.auth_state["signed_in"]:
        st.sidebar.button("Sign Out", on_click=logout)
        st.title("Welcome!")
        main()
        return

    option = st.sidebar.radio("Select Option", ["Login", "Sign-Up"])
    email = st.text_input("Enter Email", help="Enter your email address")
    password = st.text_input("Enter Password", type="password", help="Enter your password")

    if option == "Login":
        if st.button("Login"):
            if not email or not password:
                st.error("Please enter both email and password.")
            else:
                success, user = authenticate_user(email, password)
                if success:
                    st.session_state.auth_state["user"] = user
                    st.session_state.auth_state["signed_in"] = True
                    st.success("Authentication successful! You can now manage your set of images and profiles.")
                    main()
                else:
                    st.error("Authentication failed. Please check your email and password.")
    elif option == "Sign-Up":
        confirm_password = st.text_input("Confirm Password", type="password", help="Re-enter your password for confirmation")
        if st.button("Sign-Up"):
            if not email or not password or not confirm_password:
                st.error("Please fill all the fields.")
            elif password != confirm_password:
                st.error("Passwords do not match.")
            else:
                success, uid = create_user(email, password)
                if success:
                    st.success(f"User with UID: {uid} created successfully! You can now log in.")
                else:
                    st.error("User creation failed. Please try again.")


# Log out user
def logout():
    """Clear the signed-in user from the session state."""
    st.session_state.auth_state["user"] = None
    st.session_state.auth_state["signed_in"] = False


# Define tour steps
steps = [
    {
        "title": "Welcome to Insta's EYE",
        "content": "This is a tour guide to help you navigate through the application.",
    },
    {
        "title": "Options Sidebar",
        "content": "Here you can select different options such as adding a person, recognizing a face, deleting a person, or recognizing a face with optimal identification.",
    },
    {
        "title": "Recognize face",
        "content": "This function will return you Id's for your provided face.",
    },
    {
        "title": "Recognize face(optimal)",
        "content": "This function will return you Id's of provided face with more accuracy",
    },
    {
        "title": "Add person",
        "content": "Here you can add yourself or someone else too",
    },
    {
        "title": "Delete person",
        "content": "Here you can delete the person by entering their name",
    },
    {
        "title": "History",
        "content": "Here you can track history of your activities",
    },
    {
        "title": "Upload Image",
        "content": "You can upload an image here for face recognition or adding a person.",
    },
    {
        "title": "Text Input",
        "content": "Enter text here such as the person's name or Instagram handle.",
    },
    {
        "title": "Buttons",
        "content": "Click on these buttons to perform actions like adding a person or recognizing a face.",
    },
]


# Function to display tour steps
def display_tour_steps(steps):
    """Render the given tour steps as a flat markdown document."""
    st.markdown("# Tour Guide")
    st.markdown("This tour will guide you through the application.")
    st.markdown("---")

    for step in steps:
        st.markdown(f"## {step['title']}")
        st.write(step['content'])
        st.markdown("---")


def main():
    """Render the sidebar menu and dispatch to the selected page."""
    st.sidebar.title("Options")
    option = st.sidebar.radio("Select Option", ["Recognize Face", "Add Person", "Recognize Face (Optimal)", "Delete Person", "Tour Guide", "Feedback", "Messaging", "History", "Delete User"])

    pages = {
        "Recognize Face": recognize_face_ui,
        "Recognize Face (Optimal)": recognize_face_optimal_ui,
        "Add Person": add_person_ui,
        "Delete Person": delete_person_ui,
        "Tour Guide": tour_guide_ui,
        "Feedback": feedback_ui,
        "Messaging": messaging_ui,
        # Deferred so the user's email is only read when this page is chosen.
        "History": lambda: display_history(st.session_state.auth_state["user"].email),
        "Delete User": delete_user_ui,
    }
    page = pages.get(option)
    if page is not None:
        page()


# Entry point: show the login screen or the main app
if __name__ == "__main__":
    authenticate_user_ui()