import os
import streamlit as st
import firebase_admin
from firebase_admin import credentials, db, auth, firestore
import face_recognition
from PIL import Image
import numpy as np
import cv2
import dlib
from io import BytesIO
import requests
# Get the current working directory
current_directory = os.path.dirname(os.path.abspath(__file__))
skp_path = os.path.join(current_directory, "projectinsta-s-firebase-adminsdk-y6vlu-9a1345f468.json")
# Check if the app is already initialized
if not firebase_admin._apps:
# Initialize Firebase Admin SDK
cred = credentials.Certificate(skp_path)
firebase_admin.initialize_app(cred, {
'databaseURL': 'https://projectinsta-s-default-rtdb.firebaseio.com/',
'projectId': 'projectinsta-s'
})
# Reference to the root of your Firebase Realtime Database
ref = db.reference('/')
# Initialize Firestore client
db_firestore = firestore.client()
# Streamlit session state
if "auth_state" not in st.session_state:
st.session_state.auth_state = {
"user": None,
"signed_in": False,
}
# Add the shape predictor model for face alignment
shape_predictor_path = "shape_predictor_68_face_landmarks.dat"
detector = dlib.get_frontal_face_detector()
shape_predictor = dlib.shape_predictor(shape_predictor_path)
# Firebase Authentication
def authenticate_user(email, password):
try:
user = auth.get_user_by_email(email)
# The user is successfully fetched, meaning the email and password are valid.
return True, user
except auth.AuthError as e:
print(f"Authentication error: {str(e)}")
return False, None
# Sign-up Functionality
def create_user(email, password):
try:
user = auth.create_user(
email=email,
password=password
)
return True, user.uid
except Exception as e:
print(f"User creation error: {str(e)}")
return False, None
# Update load_and_encode function to return encodings for all detected faces
def load_and_encode(image_file):
try:
# Read the uploaded file as bytes
image_bytes = image_file.read()
# Convert the bytes to a NumPy array
nparr = np.frombuffer(image_bytes, np.uint8)
# Decode the NumPy array as an image
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
aligned_faces = detect_and_align_faces(image)
if aligned_faces is not None:
encodings = []
for aligned_face in aligned_faces:
encoding = face_recognition.face_encodings(aligned_face)
if encoding:
encodings.append(encoding[0])
if encodings:
return encodings
else:
return None
else:
return None
except Exception as e:
print(f"Error loading and encoding image: {str(e)}")
return None
# Modify detect_and_align_faces function to detect and align multiple faces
def detect_and_align_faces(image):
# Resize the image to a fixed width (you can adjust the width as needed)
target_width = 800
aspect_ratio = image.shape[1] / image.shape[0]
target_height = int(target_width / aspect_ratio)
resized_image = cv2.resize(image, (target_width, target_height))
gray = cv2.cvtColor(resized_image, cv2.COLOR_BGR2GRAY)
# Detect faces using dlib
faces = detector(gray)
if not faces:
return None
aligned_faces = []
for face in faces:
# Use dlib for face alignment
landmarks = shape_predictor(gray, face)
aligned_face = dlib.get_face_chip(resized_image, landmarks, size=256) # Adjust the size as needed
aligned_faces.append(aligned_face)
return aligned_faces
# Add person to database
def add_person(name, image_path, instagram_handle, email=None):
try:
encoding = load_and_encode(image_path)
if not encoding:
return "No face found in the provided image."
# Convert NumPy arrays to lists for JSON serialization
encoding = encoding[0].tolist()
# Save data to Firebase Realtime Database
person_data = {
"encoding": encoding,
"info": {
"instagram_handle": instagram_handle,
"instagram_link": f"https://www.instagram.com/{instagram_handle}/"
},
"added_by": st.session_state.auth_state["user"].email # Added this line
}
if email:
person_data["info"]["email"] = email
ref.child(name).set(person_data)
log_action(st.session_state.auth_state["user"].email, "Added person")
return f"Success: {name} added to the database!"
except Exception as e:
return f"Failed to add person: {str(e)}"
# Update recognize_face function to handle multiple face encodings
def recognize_face(image_path):
if not image_path:
return "Please upload an image."
try:
# Assuming load_and_encode function loads and encodes the image
unknown_encodings = load_and_encode(image_path)
if not unknown_encodings:
return "No face found in the provided image."
matches = []
for unknown_encoding in unknown_encodings:
face_matches = []
for name, data in ref.get().items():
known_encoding = np.array(data["encoding"])
if face_recognition.compare_faces([known_encoding], unknown_encoding)[0]:
info = data["info"]
instagram_handle = info.get("instagram_handle")
email = info.get("email", "Email not provided")
# Fetch additional Instagram data
if instagram_handle:
insta_data = fetch_instagram_data(instagram_handle)
if insta_data:
edge_followed_by = insta_data.get('edge_followed_by', {}).get('count')
edge_follow = insta_data.get('edge_follow', {}).get('count')
full_name = insta_data.get('full_name')
biography = insta_data.get('biography')
is_private = insta_data.get('is_private')
profile_pic_url_hd = insta_data.get('profile_pic_url_hd')
face_matches.append((name, instagram_handle, email, edge_followed_by, edge_follow, full_name, biography, is_private, profile_pic_url_hd))
else:
face_matches.append((name, instagram_handle, email, "N/A", "N/A", "Unknown", "Unknown", "N/A", "N/A"))
else:
face_matches.append((name, "Unknown", email, "N/A", "N/A", "Unknown", "Unknown", "N/A", "N/A"))
if face_matches:
matches.extend(face_matches)
else:
matches.append(("Unknown", "Unknown", "Unknown", "N/A", "N/A", "Unknown", "Unknown", "N/A", "N/A"))
if matches:
results = []
for name, insta_handle, email, edge_followed_by, edge_follow, full_name, biography, is_private, profile_pic_url_hd in matches:
insta_link = f"https://www.instagram.com/{insta_handle}/"
insta_link_html = f'{insta_handle}'
results.append(f"- It's a picture of {name}! Insta handle: {insta_link_html}, Email: {email}, Followers: {edge_followed_by}, Following: {edge_follow}, Full Name: {full_name}, Biography: {biography}, Private: {is_private}, Profile Picture:
")
log_action(st.session_state.auth_state["user"].email, "Recognized face")
return "\n".join(results)
else:
return "Face not found in the database."
except Exception as e:
return f"Failed to recognize face: {str(e)}"
def fetch_instagram_data(instagram_handle):
url = f"https://instagram-scraper-20231.p.rapidapi.com/userinfo/{instagram_handle}"
headers = {
"x-rapidapi-key": "f47a0f4918msha715fc855e591a2p170a28jsnfc038903d424",
"x-rapidapi-host": "instagram-scraper-20231.p.rapidapi.com"
}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
if data.get('status') == 'success':
return data.get('data', {})
else:
return None
else:
return None
except Exception as e:
print(f"Error fetching Instagram data: {str(e)}")
return None
# Update recognize_face_optimal function to handle multiple face encodings
def recognize_face_optimal(image_path):
if not image_path:
return "Please upload an image."
try:
unknown_encodings = load_and_encode(image_path)
if not unknown_encodings:
return "No face found in the provided image."
matches = []
for unknown_encoding in unknown_encodings:
face_matches = []
for name, data in ref.get().items():
known_encoding = np.array(data["encoding"])
similarity_score = face_recognition.face_distance([known_encoding], unknown_encoding)[0]
if similarity_score > 0.50: # Only consider matches above 50.00% similarity
continue
face_matches.append((name, similarity_score))
if face_matches:
best_match = min(face_matches, key=lambda x: x[1])
best_name, best_score = best_match
info = ref.child(best_name).child("info").get()
insta_handle = info["instagram_handle"]
insta_link = info["instagram_link"]
insta_link_html = f'{insta_handle}'
matches.append(f"Best match: {best_name} with a similarity score of {1 - best_score:.2%}. Insta handle: {insta_link_html}")
else:
matches.append("Face not found in the database.")
return "\n".join(matches)
except Exception as e:
return f"Failed to recognize face: {str(e)}"
# Delete person from database
def delete_person(name):
try:
ref.child(name).delete()
log_action(st.session_state.auth_state["user"].email, "Deleted person")
return f"{name} deleted from the database!"
except Exception as e:
return f"Failed to delete person: {str(e)}"
# Delete user from Firebase Authentication
def delete_user(email, password):
# Authenticate user with provided email and password
try:
user = auth.get_user_by_email(email)
if user:
# Delete user if authentication is successful
auth.delete_user(user.uid)
return "User deleted successfully!"
except Exception as e:
return f"Failed to delete user: {str(e)}"
# Send feedback to Firebase
def send_feedback(feedback_data):
try:
db_firestore.collection('feedback').add(feedback_data)
except Exception as e:
st.error(f"Failed to submit feedback: {str(e)}")
# Streamlit interface for adding a person
def add_person_ui():
st.title("π Add Person")
name = st.text_input("Enter Name", help="Enter the name of the person")
image_path = st.file_uploader("Upload Image", help="Upload an image containing the person's face")
email = st.text_input("Enter Email (Optional)", help="Enter the person's email address (optional)")
instagram_handle = st.text_input("Enter Instagram Handle", help="Enter the person's Instagram handle")
if st.button("Add Person"):
if not name or not image_path or not instagram_handle:
st.error("Please fill all the required fields.")
else:
result = add_person(name, image_path, instagram_handle, email)
st.success(result)
# Streamlit interface for recognizing face
def recognize_face_ui():
st.title("π Recognize Face")
image_path = st.file_uploader("Upload Image", help="Upload an image for face recognition")
if st.button("Recognize Face"):
result = recognize_face(image_path)
st.write(result, unsafe_allow_html=True)
def recognize_face_optimal_ui():
st.title("π Recognize Face (Optimal)")
image_path = st.file_uploader("Upload Image", help="Upload an image for optimal face recognition")
if st.button("Recognize Face (Optimal)"):
result = recognize_face_optimal(image_path)
st.write(result, unsafe_allow_html=True)
# Streamlit interface for deleting a person
def delete_person_ui():
st.title("ποΈ Delete Person")
name = st.text_input("Enter Name", help="Enter the name of the person to delete")
if st.button("Delete Person"):
if not name:
st.error("Please enter a name.")
else:
# Check if the person exists in the database
if name not in ref.get().keys():
st.error("Person not found in the database.")
return
# Check if the person was added by the currently signed-in user
person_data = ref.child(name).get()
if person_data["added_by"] != st.session_state.auth_state["user"].email:
st.error("You can only delete the person added by you.")
return
result = delete_person(name)
st.success(result)
# Streamlit interface for feedback
def feedback_ui():
st.title("βοΈ Feedback")
st.write("Your feedback is important to us! Please fill out the form below:")
name = st.text_input("Name (optional)")
email = st.text_input("Email (optional)")
category = st.selectbox("Category", ["Bug Report", "Feature Request", "General Feedback"])
message = st.text_area("Feedback Message")
if st.button("Submit Feedback"):
if not message:
st.error("Please enter your feedback message.")
else:
feedback_data = {
"name": name,
"email": email,
"category": category,
"message": message,
}
send_feedback(feedback_data)
st.success("Feedback submitted successfully! Thank you for your feedback.")
#section for messaging
# Function to send a message
def send_message(sender_email, receiver_email, message_content):
try:
# Add message to Firestore
db_firestore.collection('messages').add({
'sender_email': sender_email,
'receiver_email': receiver_email,
'message_content': message_content,
'timestamp': firestore.SERVER_TIMESTAMP
})
return "Message sent successfully!"
except Exception as e:
return f"Failed to send message: {str(e)}"
# Function to retrieve messages for a user
def get_messages(user_email):
try:
messages = db_firestore.collection('messages').where('receiver_email', '==', user_email).order_by('timestamp', direction=firestore.Query.DESCENDING).stream()
return messages
except Exception as e:
return None
# Streamlit interface for messaging
def messaging_ui():
st.title("π¬ Messaging")
if st.session_state.auth_state["signed_in"]:
sender_email = st.session_state.auth_state["user"].email
receiver_email = st.text_input("Receiver's Email", help="Enter the receiver's email address")
message_content = st.text_area("Message Content")
if st.button("Send Message"):
result = send_message(sender_email, receiver_email, message_content)
st.write(result)
messages = get_messages(sender_email)
if messages:
st.write("Your messages:")
for message in messages:
message_data = message.to_dict()
st.write(f"From: {message_data['sender_email']}")
st.write(f"Message: {message_data['message_content']}")
st.write("---")
else:
st.write("Please sign in to send and view messages.")
#end of messaging section
# History section
def log_action(user_email, action):
try:
db_firestore.collection('history').add({
'user_email': user_email,
'action': action,
'timestamp': firestore.SERVER_TIMESTAMP
})
except Exception as e:
st.error(f"Failed to log action: {str(e)}")
# Display history of actions taken by the user
def display_history(user_email):
st.title("π History")
st.write("Here is the history of actions taken by you:")
try:
history = db_firestore.collection('history').where('user_email', '==', user_email).order_by('timestamp', direction=firestore.Query.DESCENDING).stream()
for entry in history:
entry_data = entry.to_dict()
action = entry_data['action']
timestamp = entry_data['timestamp']
st.write(f"- {action} at {timestamp}")
except Exception as e:
st.error(f"Failed to retrieve history: {str(e)}")
#End of history section
# Streamlit interface for Deleting user
def delete_user_ui():
st.title("Delete User")
email = st.text_input("Enter User's Email", help="Enter the email of the user to delete")
password = st.text_input("Enter Your Password", type="password", help="Enter your password for confirmation")
if st.button("Delete User"):
if not email or not password:
st.error("Please enter the user's email and your password.")
else:
# Display confirmation pop-up
confirmation = st.checkbox("I confirm that I want to delete this user.")
if confirmation:
result = delete_user(email, password)
st.success(result)
def tour_guide_ui():
st.title("πΊοΈ Tour Guide")
st.markdown("This tour will guide you through the application.")
for step in steps:
with st.expander(step["title"]):
st.write(step["content"])
def authenticate_user_ui():
# Display logo and title
c30, c31, c32 = st.columns([0.2, 0.1, 3])
with c30:
st.caption("")
# Display the logo
logo_path = os.path.join(current_directory, "Explore+.png")
logo = Image.open(logo_path)
st.image(logo, width=60)
with c32:
st.title("Insta's EYE")
st.sidebar.title("Options")
if st.session_state.auth_state["signed_in"]:
st.sidebar.button("Sign Out", on_click=logout)
st.title("Welcome!")
main()
else:
option = st.sidebar.radio("Select Option", ["Login", "Sign-Up"])
email = st.text_input("Enter Email", help="Enter your email address")
password = st.text_input("Enter Password", type="password", help="Enter your password")
if option == "Login":
if st.button("Login"):
if not email or not password:
st.error("Please enter both email and password.")
else:
success, user = authenticate_user(email, password)
if success:
st.session_state.auth_state["user"] = user
st.session_state.auth_state["signed_in"] = True
st.success("Authentication successful! You can now manage your set of images and profiles.")
main()
else:
st.error("Authentication failed. Please check your email and password.")
elif option == "Sign-Up":
confirm_password = st.text_input("Confirm Password", type="password", help="Re-enter your password for confirmation")
if st.button("Sign-Up"):
if not email or not password or not confirm_password:
st.error("Please fill all the fields.")
elif password != confirm_password:
st.error("Passwords do not match.")
else:
success, uid = create_user(email, password)
if success:
st.success(f"User with UID: {uid} created successfully! You can now log in.")
else:
st.error("User creation failed. Please try again.")
# Log out user
def logout():
st.session_state.auth_state["user"] = None
st.session_state.auth_state["signed_in"] = False
# Define tour steps
steps = [
{
"title": "Welcome to Insta's EYE",
"content": "This is a tour guide to help you navigate through the application.",
},
{
"title": "Options Sidebar",
"content": "Here you can select different options such as adding a person, recognizing a face, deleting a person, or recognizing a face with optimal identification.",
},
{
"title": "Recognize face",
"content": "This function will return you Id's for your provided face.",
},
{
"title": "Recognize face(optimal)",
"content": "This function will return you Id's of provided face with more accuracy",
},
{
"title": "Add person",
"content": "Here you can add yourself or someone else too",
},
{
"title": "Delete person",
"content": "Here you can delete the person by entering their name",
},
{
"title": "History",
"content": "Here you can track history of your activities",
},
{
"title": "Upload Image",
"content": "You can upload an image here for face recognition or adding a person.",
},
{
"title": "Text Input",
"content": "Enter text here such as the person's name or Instagram handle.",
},
{
"title": "Buttons",
"content": "Click on these buttons to perform actions like adding a person or recognizing a face.",
},
]
# Function to display tour steps
def display_tour_steps(steps):
st.markdown("# Tour Guide")
st.markdown("This tour will guide you through the application.")
st.markdown("---")
for step in steps:
st.markdown(f"## {step['title']}")
st.write(step['content'])
st.markdown("---")
# Update the main function to include the feedback option
def main():
st.sidebar.title("Options")
option = st.sidebar.radio("Select Option", ["Recognize Face", "Add Person", "Recognize Face (Optimal)", "Delete Person", "Tour Guide", "Feedback", "Messaging", "History", "Delete User"])
if option == "Recognize Face":
recognize_face_ui()
elif option == "Recognize Face (Optimal)":
recognize_face_optimal_ui()
elif option == "Add Person":
add_person_ui()
elif option == "Delete Person":
delete_person_ui()
elif option == "Tour Guide":
tour_guide_ui()
elif option == "Feedback":
feedback_ui()
elif option == "Messaging":
messaging_ui()
elif option == "History":
display_history(st.session_state.auth_state["user"].email)
elif option == "Delete User":
delete_user_ui()
# Run the tour guide
if __name__ == "__main__":
authenticate_user_ui()