import pyotp
import time
import streamlit as st
from datetime import datetime
from uuid import uuid4
import logging
from typing import List, Dict, Optional
from utils.security import clear_security_metrics
import bcrypt
import re
from utils.security import (
validate_password,
check_rate_limit,
generate_jwt_token,
verify_jwt_token,
log_security_event
)
from utils.audit import audit_logger
from dashboard_components import dashboard_header, account_linking_form
from typing import Callable
def dashboard_header(on_logout: Optional[Callable[[], None]] = None):
    """Render the dashboard title bar and the top navigation buttons.

    Streamlit version of DashboardHeader.jsx.

    Args:
        on_logout: Optional zero-argument callback invoked when the Logout
            button is pressed. Other functions in this module call
            ``dashboard_header(callback)``; without this parameter those
            calls raise ``TypeError``. When omitted, ``logout_user()`` is
            called, which is what the header did before.
    """
    title_col, nav_col = st.columns([2, 3])

    with title_col:
        st.title("Digital Inheritance Dashboard")
        st.markdown("by Muhammad Shaheer",
                    unsafe_allow_html=True)

    with nav_col:
        wallets_col, link_col, logout_col = st.columns(3)

        with wallets_col:
            if st.button("🏦 Wallets", use_container_width=True):
                st.session_state.page = 'wallets'
                st.rerun()

        with link_col:
            if st.button("🔗 Link Accounts", use_container_width=True):
                st.session_state.page = 'link_accounts'
                st.rerun()

        with logout_col:
            if st.button("🚪 Logout", type="secondary", use_container_width=True):
                # Prefer the caller-supplied handler; fall back to the
                # module's own logout routine.
                if on_logout is not None:
                    on_logout()
                else:
                    logout_user()
def show_dashboard():
    """Render the card-style dashboard: header bar plus one row per account.

    NOTE: ``show_dashboard`` is defined again later in this module; the last
    definition is the one that stays bound to the name after import.
    """
    # Emoji shown next to each known platform; anything else gets a link icon.
    icon_by_platform = {
        "facebook": "📘",
        "twitter": "🐦",
        "instagram": "📸",
        "linkedin": "💼"
    }

    dashboard_header()

    user_accounts = get_user_accounts(st.session_state.user['id'])
    if not user_accounts:
        st.info("No digital accounts added yet. Click 'Link Accounts' to get started.")
        return

    st.subheader("Your Digital Accounts")
    for account in user_accounts:
        with st.container():
            identity_col, details_col, action_col = st.columns([1, 2, 1])

            with identity_col:
                icon = icon_by_platform.get(account['platform'].lower(), "🔗")
                st.markdown(f"### {icon} {account['platform']}")
                st.write(f"**@{account['username']}**")

            with details_col:
                st.write(f"**Status:** {account['status']}")
                st.write(f"**Last Activity:** {account['last_activity']}")
                if account.get('heir_name'):
                    st.write(f"**Heir:** {account['heir_name']}")

            with action_col:
                remove_clicked = st.button(
                    "Remove Account",
                    key=f"remove_{account['id']}",
                    type="secondary",
                    use_container_width=True,
                )
                # Short-circuit keeps the delete from running unless clicked.
                if remove_clicked and remove_account(account['id']):
                    st.success("Account removed successfully!")
                    time.sleep(1)
                    st.rerun()
def logout_user():
    """Log the logout event, clear metrics and session, and return to login.

    Any exception raised along the way is logged and reported to the user
    instead of propagating.

    NOTE: ``logout_user`` is defined again later in this module; the later
    definition is the one bound to the name after import.
    """
    try:
        session = st.session_state

        # Record who is leaving, if anybody is logged in.
        if 'user' in session:
            log_security_event(
                session.user['id'],
                'logout',
                f'User {st.session_state.user["username"]} logged out'
            )

        clear_security_metrics()  # Unregister Prometheus metrics
        session.clear()
        session.page = 'login'
        st.rerun()
    except Exception as e:
        logger.error(f"Logout failed: {str(e)}")
        st.error("An error occurred during logout. Please try again.")
def account_linking_form():
    """Render the "link a social media account" form.

    Streamlit version of AccountLinking.jsx. The user picks a platform, types
    a username and presses the button; on success the app switches back to
    the dashboard page.
    """
    # Display label (with emoji) for every selectable platform.
    platform_labels = {
        "Facebook": "📘 Facebook",
        "Twitter": "🐦 Twitter",
        "Instagram": "📸 Instagram",
        "LinkedIn": "💼 LinkedIn"
    }

    st.title("Link Social Media Account")

    platform = st.radio(
        "Select Platform",
        ["Facebook", "Twitter", "Instagram", "LinkedIn"],
        horizontal=True,
        format_func=lambda option: platform_labels[option]
    )
    if not platform:
        return

    username = st.text_input("Username", placeholder=f"Enter your {platform} username")

    st.info(
        "We'll monitor this account for inactivity. "
        "Make sure the profile is public so we can track activity."
    )

    # The button is only rendered once a username has been typed.
    if username and st.button("Link Account", type="primary", use_container_width=True):
        try:
            linked = link_social_account(platform.lower(), username)
            if linked:
                st.success(f"Successfully linked {platform} account: {username}")
                time.sleep(1)
                st.session_state.page = 'dashboard'
                st.rerun()
        except Exception as e:
            st.error(f"Failed to link account: {str(e)}")
def apply_custom_css():
    """Inject the app's custom markup so components resemble the React design.

    The injected block is currently just a newline, so nothing is styled yet.
    """
    custom_markup = """
"""
    st.markdown(custom_markup, unsafe_allow_html=True)
# Example usage in your main Streamlit app:
def main():
    """Example entry point: set up session state, styling, header and routing."""
    # Initialize session state
    if 'page' not in st.session_state:
        st.session_state.page = 'dashboard'

    # Apply custom CSS
    apply_custom_css()

    # Show header. The dashboard_header defined in this module wires its
    # Logout button to logout_user() itself; passing a callback positionally,
    # as this function used to, raised TypeError against that definition.
    dashboard_header()

    # Show different pages based on state
    if st.session_state.page == 'link_accounts':
        account_linking_form()
    # Add other pages as needed
# Configure logging
# Module-level logger named after this module; the same name is bound again
# further down, after logging.basicConfig() is called.
logger = logging.getLogger(__name__)
def show_dashboard():
    """Display the main dashboard (header and empty-state message only).

    NOTE: ``show_dashboard`` is defined again later in this module; the last
    definition is the one bound to the name after import.
    """
    # Apply custom styling
    apply_custom_css()

    # Show dashboard header. The dashboard_header defined in this module
    # already routes its Logout button to logout_user(); passing a callback
    # positionally raised TypeError against that definition.
    dashboard_header()

    # Get and display accounts
    accounts = get_user_accounts(st.session_state.user['id'])
    if not accounts:
        st.info("No digital accounts added yet. Click 'Add Account' to get started.")
        return
def logout_user():
    """Log the user out by wiping the whole session and rerunning the script.

    NOTE: this redefinition replaces the earlier ``logout_user`` in this
    module, so the security-event logging and metrics cleanup performed there
    do not run.
    """
    session = st.session_state
    session.clear()
    st.rerun()
def verify_account(platform, username):
    """Report whether a social media account exists and is public.

    Placeholder: no lookup is performed yet, so every platform/username pair
    is accepted.
    """
    # TODO: implement a real existence/visibility check.
    account_is_valid = True
    return account_is_valid
def link_social_account(platform, username):
    """Verify a social account and store it for the logged-in user.

    Returns True once the row is committed. Returns False, after showing an
    error, when verification fails or the insert raises.

    NOTE: an identical definition of this function follows in this module.
    """
    # Validate the account exists
    if not verify_account(platform, username):
        st.error("Account not found or not public")
        return False

    # Store account info
    try:
        db = st.session_state.db
        cursor = db.cursor()
        new_account_id = str(uuid4())
        now = datetime.now().isoformat()

        # last_check, last_activity and created_at all start at "now".
        row = (
            new_account_id,
            st.session_state.user['id'],
            platform,
            username,
            now,
            now,
            now,
            'active'
        )
        cursor.execute("""
            INSERT INTO social_accounts (
                id, user_id, platform, username,
                last_check, last_activity,
                created_at, status
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, row)
        db.commit()
        return True
    except Exception as e:
        st.error(f"Failed to link account: {str(e)}")
        return False
def link_social_account(platform, username):
    """Link a social media account to the current user.

    The account is first passed through ``verify_account``; if accepted, a
    new ``social_accounts`` row with status ``'active'`` is inserted and
    committed.

    Returns:
        True on success; False (with an error shown) if verification or the
        database write fails.
    """
    account_exists = verify_account(platform, username)
    if account_exists:
        try:
            cursor = st.session_state.db.cursor()
            account_id = str(uuid4())
            timestamp = datetime.now().isoformat()
            insert_sql = """
                INSERT INTO social_accounts (
                    id, user_id, platform, username,
                    last_check, last_activity,
                    created_at, status
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """
            owner_id = st.session_state.user['id']
            # The three timestamp columns share the same creation time.
            cursor.execute(
                insert_sql,
                (account_id, owner_id, platform, username,
                 timestamp, timestamp, timestamp, 'active'),
            )
            st.session_state.db.commit()
            return True
        except Exception as e:
            st.error(f"Failed to link account: {str(e)}")
            return False

    st.error("Account not found or not public")
    return False
def get_heir_info(heir_id: str) -> Optional[Dict]:
    """Look up a single heir by id.

    Returns a dict with ``id``, ``name``, ``email`` and ``relationship``, or
    None when no row matches or the query fails (an error is shown then).
    """
    try:
        cursor = st.session_state.db.cursor()
        cursor.execute("""
            SELECT id, name, email, relationship
            FROM heirs
            WHERE id = ?
        """, (heir_id,))
        row = cursor.fetchone()
        if not row:
            return None
        return {
            'id': row[0],
            'name': row[1],
            'email': row[2],
            'relationship': row[3]
        }
    except Exception as e:
        st.error(f"Failed to fetch heir info: {str(e)}")
        return None
def get_heirs_list(user_id: str) -> List[Dict]:
    """Get the list of heirs registered by the given user.

    Each heir is a dict with ``id``, ``name``, ``email``, ``relationship``
    and ``phone``. ``phone`` is included because ``add_heir`` stores it and
    the heir management page reads ``heir.get('phone')``; it was previously
    never selected, so the phone number could never be shown.

    Returns:
        The heirs, or an empty list if the query fails (an error is shown).
    """
    columns = ('id', 'name', 'email', 'relationship', 'phone')
    try:
        cursor = st.session_state.db.cursor()
        cursor.execute("""
            SELECT id, name, email, relationship, phone
            FROM heirs
            WHERE user_id = ?
        """, (user_id,))
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    except Exception as e:
        st.error(f"Failed to fetch heirs: {str(e)}")
        return []
def get_user_accounts(user_id: str) -> List[Dict]:
    """Fetch the social accounts belonging to ``user_id``.

    Returns one dict per account (``id``, ``platform``, ``username``,
    ``status``, ``last_activity``, ``heir_id``), or an empty list when the
    query fails.

    NOTE: ``get_user_accounts`` is defined again later in this module; the
    later definition is the one bound to the name after import.
    """
    try:
        cursor = st.session_state.db.cursor()
        cursor.execute("""
            SELECT id, platform, username, status, last_activity, heir_id
            FROM social_accounts
            WHERE user_id = ?
        """, (user_id,))
        rows = cursor.fetchall()

        result = []
        for row in rows:
            result.append({
                'id': row[0],
                'platform': row[1],
                'username': row[2],
                'status': row[3],
                'last_activity': row[4],
                'heir_id': row[5]
            })
        return result
    except Exception as e:
        st.error(f"Failed to fetch accounts: {str(e)}")
        return []
def remove_account(account_id: str) -> bool:
    """Remove one of the logged-in user's social accounts.

    The DELETE is restricted to rows owned by the current user, mirroring the
    ``user_id`` condition ``remove_heir`` uses, so an id belonging to another
    user can never be deleted through this function.

    Args:
        account_id: Primary key of the ``social_accounts`` row to delete.

    Returns:
        True if the statement was committed, False if an exception occurred
        (an error message is shown in that case).
    """
    try:
        cursor = st.session_state.db.cursor()
        cursor.execute(
            "DELETE FROM social_accounts WHERE id = ? AND user_id = ?",
            (account_id, st.session_state.user['id'])
        )
        st.session_state.db.commit()
        st.success("Account removed successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to remove account: {str(e)}")
        return False
def add_account(platform: str, username: str, password: str, heir_id: str, days: int) -> bool:
    """Validate the inputs and store a new social account for the current user.

    Rejects missing fields, thresholds under 30 days and heirs that do not
    belong to the user. Returns True when the row is committed and False
    otherwise.

    NOTE: ``add_account`` is defined again later in this module; the later
    definition is the one bound to the name after import.
    """
    try:
        required = [platform, username, password, heir_id, days]
        if not all(required):
            st.error("Please fill in all required fields")
            return False

        if days < 30:
            st.error("Inactivity threshold must be at least 30 days")
            return False

        cursor = st.session_state.db.cursor()

        # Check if heir exists
        cursor.execute("SELECT id FROM heirs WHERE id = ? AND user_id = ?",
                       (heir_id, st.session_state.user['id']))
        heir_row = cursor.fetchone()
        if not heir_row:
            st.error("Selected heir not found")
            return False

        # Create account
        new_id = str(uuid4())
        now = datetime.now().isoformat()
        values = (
            new_id,
            st.session_state.user['id'],
            platform,
            username,
            password,  # Note: In production, this should be encrypted
            heir_id,
            now,
            now,
            days,
            now,
            now,
            'active'
        )
        cursor.execute("""
            INSERT INTO social_accounts (
                id, user_id, platform, username, password,
                heir_id, last_check, last_activity,
                inactivity_threshold, created_at, last_modified,
                status
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, values)
        st.session_state.db.commit()

        log_security_event(
            st.session_state.user['id'],
            'account_added',
            f'Added {platform} account for user {username}'
        )
        st.success("Account added successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to add account: {str(e)}")
        return False
def add_heir(name: str, email: str, relationship: str, phone: Optional[str] = None) -> bool:
    """Store a new heir for the logged-in user.

    ``name``, ``email`` and ``relationship`` are mandatory; ``phone`` is
    stored as given. Returns True after a successful commit, otherwise False
    with an error shown.
    """
    try:
        if not (name and email and relationship):
            st.error("Please fill in all required fields")
            return False

        db = st.session_state.db
        cursor = db.cursor()
        new_heir_id = str(uuid4())
        now = datetime.now().isoformat()

        # created_at and last_modified both start at the creation time.
        record = (
            new_heir_id,
            st.session_state.user['id'],
            name,
            email,
            relationship,
            phone,
            now,
            now
        )
        cursor.execute("""
            INSERT INTO heirs (id, user_id, name, email, relationship, phone, created_at, last_modified)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, record)
        db.commit()

        st.success("Heir added successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to add heir: {str(e)}")
        return False
def register_user(username: str, email: str, password: str, confirm: str) -> bool:
    """Register a new user after validating the submitted form values.

    Validation order: required fields, password confirmation, password policy
    (``validate_password``), e-mail format, then uniqueness of the username
    and e-mail. The password is stored as a bcrypt hash.

    Args:
        username: Desired login name; must not be blank.
        email: Contact address; must not be blank and must match the whole
            e-mail pattern.
        password: Chosen password.
        confirm: Repeated password; must equal ``password``.

    Returns:
        True if the user row was committed, False otherwise (the reason is
        shown with ``st.error``).
    """
    # A blank username or e-mail would otherwise be stored as-is.
    if not username or not email:
        st.error("Please fill in all required fields")
        return False

    if password != confirm:
        st.error("Passwords don't match")
        return False

    valid, msg = validate_password(password)
    if not valid:
        st.error(msg)
        return False

    # fullmatch, not match: match only anchors the start, so input such as
    # "a@b.c@d" used to pass because its prefix "a@b.c" matched.
    if not re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email):
        st.error("Invalid email format")
        return False

    try:
        cursor = st.session_state.db.cursor()

        # Check existing user
        cursor.execute("SELECT id FROM users WHERE username = ? OR email = ?", (username, email))
        if cursor.fetchone():
            st.error("Username or email already exists")
            return False

        # Create user
        user_id = str(uuid4())
        hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
        current_time = datetime.now().isoformat()
        cursor.execute("""
            INSERT INTO users (id, username, email, password, created_at, last_password_change)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            user_id,
            username,
            email,
            hashed.decode(),
            current_time,
            current_time
        ))
        st.session_state.db.commit()

        st.success("Registration successful! Please login.")
        return True
    except Exception as e:
        st.error(f"Registration failed: {str(e)}")
        return False
def login_user(username: str, password: str) -> bool:
    """Check the credentials and, if valid, populate the session.

    On success the session receives ``user``, ``page`` (set to the dashboard)
    and a JWT ``token``, and a login security event is logged. Unknown users
    and wrong passwords produce the same error message.
    """
    try:
        cursor = st.session_state.db.cursor()
        cursor.execute("SELECT id, password, email FROM users WHERE username = ?", (username,))
        row = cursor.fetchone()

        if not row:
            st.error("Invalid username or password")
            return False

        password_ok = bcrypt.checkpw(password.encode(), row[1].encode())
        if not password_ok:
            st.error("Invalid username or password")
            return False

        user_id = row[0]
        st.session_state.user = {
            'id': user_id,
            'username': username,
            'email': row[2]
        }
        st.session_state.page = 'dashboard'
        st.session_state.token = generate_jwt_token(user_id)

        log_security_event(
            user_id,
            'login',
            f'Successful login for user {username}'
        )
        return True
    except Exception as e:
        st.error(f"Login failed: {str(e)}")
        return False
def get_2fa_status(user_id: str) -> bool:
    """Return True when the user has a non-empty ``totp_secret`` stored.

    A missing user row, an empty secret or a failed query all yield False.
    """
    try:
        cursor = st.session_state.db.cursor()
        cursor.execute("SELECT totp_secret FROM users WHERE id = ?", (user_id,))
        row = cursor.fetchone()
        if not row:
            return False
        return bool(row[0])
    except Exception as e:
        st.error(f"Failed to check 2FA status: {str(e)}")
        return False
def enable_2fa_for_user(user_id: str, totp_secret: str) -> bool:
    """Persist ``totp_secret`` on the user's row, turning 2FA on.

    Returns True after the commit, or False (with an error shown) if the
    update raises.
    """
    update_sql = "UPDATE users SET totp_secret = ? WHERE id = ?"
    try:
        db = st.session_state.db
        db.cursor().execute(update_sql, (totp_secret, user_id))
        db.commit()
        return True
    except Exception as e:
        st.error(f"Failed to enable 2FA: {str(e)}")
        return False
def change_password(current: str, new_pass: str, confirm: str) -> bool:
    """Change the logged-in user's password after validating the request.

    Args:
        current: The user's present password, verified against the stored
            bcrypt hash.
        new_pass: The replacement password; must satisfy
            ``validate_password``.
        confirm: Repeated replacement password; must equal ``new_pass``.

    Returns:
        True if the new hash was committed, False otherwise (the reason is
        shown with ``st.error``).
    """
    try:
        if new_pass != confirm:
            st.error("New passwords don't match")
            return False

        valid, msg = validate_password(new_pass)
        if not valid:
            st.error(msg)
            return False

        user_id = st.session_state.user['id']
        cursor = st.session_state.db.cursor()
        cursor.execute(
            "SELECT password FROM users WHERE id = ?",
            (user_id,)
        )
        row = cursor.fetchone()
        # Without this guard a missing row surfaced as an opaque
        # "'NoneType' object is not subscriptable" message.
        if row is None:
            st.error("Failed to change password: user record not found")
            return False
        stored_hash = row[0]

        if not bcrypt.checkpw(current.encode(), stored_hash.encode()):
            st.error("Current password is incorrect")
            return False

        new_hash = bcrypt.hashpw(new_pass.encode(), bcrypt.gensalt())
        cursor.execute(
            "UPDATE users SET password = ?, last_password_change = ? WHERE id = ?",
            (new_hash.decode(), datetime.now().isoformat(), user_id)
        )
        st.session_state.db.commit()

        st.success("Password changed successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to change password: {str(e)}")
        return False
# Configure logging
# Set up root logging at INFO level (basicConfig does nothing if the root
# logger already has handlers), then bind the module-level logger. This
# rebinds the `logger` name that was already assigned earlier in the module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Page display functions
def show_login_page():
    """Render the landing page with its "Login" and "Register" tabs.

    The login form checks ``check_rate_limit`` before attempting to
    authenticate; the register form hands its values to ``register_user``.
    Both rerun the script after a successful submission.
    """
    # Same text the original triple-quoted block produced, line by line.
    password_rules = (
        "\n"
        "Password must contain:\n"
        "- At least 8 characters\n"
        "- At least one uppercase letter\n"
        "- At least one lowercase letter\n"
        "- At least one number\n"
        "- At least one special character\n"
    )

    st.title("Digital Inheritance System")
    login_tab, register_tab = st.tabs(["Login", "Register"])

    with login_tab:
        with st.form("login_form", clear_on_submit=True):
            username = st.text_input("Username")
            password = st.text_input("Password", type="password")
            login_clicked = st.form_submit_button("Login")

            if login_clicked and username and password:
                if check_rate_limit(username):
                    st.error("Too many login attempts. Please try again later.")
                    return

                if login_user(username, password):
                    st.success("Login successful!")
                    time.sleep(1)  # Give time for message to show
                    st.rerun()

    with register_tab:
        with st.form("register_form", clear_on_submit=True):
            left_col, right_col = st.columns(2)

            with left_col:
                reg_username = st.text_input(
                    "Username",
                    help="Choose a unique username"
                )
                reg_email = st.text_input(
                    "Email",
                    help="Enter your email address"
                )

            with right_col:
                reg_password = st.text_input(
                    "Password",
                    type="password",
                    help="Choose a strong password"
                )
                reg_confirm = st.text_input(
                    "Confirm Password",
                    type="password",
                    help="Re-enter your password"
                )

            # Password requirements info
            st.markdown(password_rules)

            register_clicked = st.form_submit_button("Register")

            if register_clicked:
                registered = register_user(reg_username, reg_email, reg_password, reg_confirm)
                if registered:
                    st.success("Registration successful! Please login.")
                    time.sleep(2)  # Give time for message to show
                    st.rerun()
def get_user_accounts(user_id: str) -> List[Dict]:
    """Fetch the user's social accounts together with their heir's details.

    Accounts are LEFT JOINed to ``heirs`` (so accounts without an heir are
    kept) and ordered by ``last_activity``, newest first. Returns an empty
    list, after showing an error, if the query fails.
    """
    # Dict keys in the same order as the selected columns.
    fields = (
        'id',
        'platform',
        'username',
        'status',
        'last_activity',
        'heir_id',
        'heir_name',
        'heir_relationship',
    )
    try:
        cursor = st.session_state.db.cursor()
        cursor.execute("""
            SELECT
                sa.id,
                sa.platform,
                sa.username,
                sa.status,
                sa.last_activity,
                sa.heir_id,
                h.name as heir_name,
                h.relationship as heir_relationship
            FROM social_accounts sa
            LEFT JOIN heirs h ON sa.heir_id = h.id
            WHERE sa.user_id = ?
            ORDER BY sa.last_activity DESC
        """, (user_id,))

        return [
            {field: row[position] for position, field in enumerate(fields)}
            for row in cursor.fetchall()
        ]
    except Exception as e:
        st.error(f"Failed to fetch accounts: {str(e)}")
        return []
def add_account(platform: str, username: str, password: str, heir_id: str, days: int) -> bool:
    """Add a new social account for the logged-in user with validation.

    Checks, in order: all fields present, ``days`` at least 30, the heir
    belongs to the current user, and the platform/username pair is not
    already linked for this user. The heir check is also performed by the
    earlier ``add_account`` definition in this module; this definition
    previously omitted it, so any ``heir_id`` value was accepted.

    Args:
        platform: Platform name, e.g. "Facebook".
        username: Account name on that platform.
        password: Account password. It is stored exactly as received.
        heir_id: Id of one of the current user's heirs.
        days: Inactivity threshold in days (minimum 30).

    Returns:
        True when the row was committed, False otherwise (the reason is
        shown with ``st.error``).
    """
    try:
        if not all([platform, username, password, heir_id, days]):
            st.error("Please fill in all required fields")
            return False

        if days < 30:
            st.error("Inactivity threshold must be at least 30 days")
            return False

        user_id = st.session_state.user['id']
        cursor = st.session_state.db.cursor()

        # Check that the heir exists and belongs to this user
        cursor.execute("SELECT id FROM heirs WHERE id = ? AND user_id = ?",
                       (heir_id, user_id))
        if not cursor.fetchone():
            st.error("Selected heir not found")
            return False

        # Check if account already exists
        cursor.execute("""
            SELECT id FROM social_accounts
            WHERE platform = ? AND username = ? AND user_id = ?
        """, (platform, username, user_id))
        if cursor.fetchone():
            st.error("This account is already linked")
            return False

        # Create account
        account_id = str(uuid4())
        current_time = datetime.now().isoformat()
        cursor.execute("""
            INSERT INTO social_accounts (
                id, user_id, platform, username, password,
                heir_id, last_check, last_activity,
                inactivity_threshold, created_at, last_modified,
                status
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            account_id,
            user_id,
            platform,
            username,
            # SECURITY: stored in plaintext; in production, encrypt this.
            password,
            heir_id,
            current_time,
            current_time,
            days,
            current_time,
            current_time,
            'active'
        ))
        st.session_state.db.commit()

        log_security_event(
            user_id,
            'account_added',
            f'Added {platform} account for user {username}'
        )
        st.success(f"Account {username} on {platform} added successfully!")
        time.sleep(1)  # Give time for message to show
        return True
    except Exception as e:
        st.error(f"Failed to add account: {str(e)}")
        return False
def show_dashboard():
    """Display the main dashboard: navigation menu plus one row per account.

    Each account row shows platform, username, status, last activity, the
    assigned heir (looked up with ``get_heir_info``) and a Remove button.
    Missing ``status`` or ``last_activity`` values are rendered as
    placeholders; both columns are nullable in this module's schema and used
    to crash the page via ``.title()`` / ``datetime.fromisoformat``.
    """
    st.title("Digital Inheritance Dashboard")

    # Add navigation menu at top
    menu_col1, menu_col2, menu_col3, menu_col4 = st.columns([2, 1, 1, 1])
    with menu_col1:
        st.subheader(f"Welcome, {st.session_state.user['username']}")
    with menu_col2:
        if st.button("Add Account", key="add_account_btn", type="primary"):
            st.session_state.page = 'add_account'
            st.rerun()
    with menu_col3:
        if st.button("Manage Heirs", key="manage_heirs_btn"):
            st.session_state.page = 'manage_heirs'
            st.rerun()
    with menu_col4:
        if st.button("Logout", key="logout_btn", type="secondary"):
            logout_user()

    # Get and display accounts
    accounts = get_user_accounts(st.session_state.user['id'])
    if not accounts:
        st.info("No digital accounts added yet. Click 'Add Account' to get started.")
        return

    st.subheader("Your Digital Accounts")

    # Use columns for better layout
    for account in accounts:
        with st.container():
            col1, col2, col3 = st.columns([3, 3, 1])
            with col1:
                st.markdown(f"### {account['platform']} - {account['username']}")
                status = str(account['status'] or 'unknown')
                st.write(f"**Status:** {status.title()}")
                last_activity = _format_last_activity(account['last_activity'])
                st.write(f"**Last Activity:** {last_activity}")
            with col2:
                heir = get_heir_info(account['heir_id'])
                if heir:
                    st.write(f"**Heir:** {heir['name']}")
                    st.write(f"**Relationship:** {heir['relationship']}")
                    if heir.get('email'):
                        st.write(f"**Contact:** {heir['email']}")
            with col3:
                if st.button("Remove", key=f"remove_{account['id']}", type="secondary"):
                    if remove_account(account['id']):
                        st.success("Account removed successfully!")
                        time.sleep(1)  # Give time for message to show
                        st.rerun()


def _format_last_activity(raw_value) -> str:
    """Format a stored ISO timestamp as ``YYYY-MM-DD HH:MM`` for display.

    Returns "Unknown" for an empty value and the raw text when it cannot be
    parsed as an ISO timestamp.
    """
    if not raw_value:
        return "Unknown"
    try:
        return datetime.fromisoformat(raw_value).strftime("%Y-%m-%d %H:%M")
    except (TypeError, ValueError):
        return str(raw_value)
def show_add_account_page():
    """Render the "Add Digital Account" page.

    Requires at least one heir: without any, the user is pointed to the heir
    management page instead of being shown the form.
    """
    st.title("Add Digital Account")

    if st.button("← Back"):
        st.session_state.page = 'dashboard'
        st.rerun()

    available_heirs = get_heirs_list(st.session_state.user['id'])
    if not available_heirs:
        st.warning("Please add at least one heir before adding accounts.")
        if st.button("Add Heir"):
            st.session_state.page = 'manage_heirs'
            st.rerun()
        return

    with st.form("add_account_form", clear_on_submit=True):
        left_col, right_col = st.columns(2)

        with left_col:
            platform = st.selectbox(
                "Platform",
                ["Facebook", "Twitter", "Instagram", "LinkedIn"],
                help="Select the social media platform"
            )
            username = st.text_input(
                "Username",
                help="Enter your username for the selected platform"
            )

        with right_col:
            password = st.text_input(
                "Password",
                type="password",
                help="Enter your account password"
            )
            # Options are the heir dicts themselves; only the label is formatted.
            heir = st.selectbox(
                "Select Heir",
                options=available_heirs,
                format_func=lambda x: f"{x['name']} ({x['relationship']})",
                help="Choose the heir for this account"
            )

        days = st.slider(
            "Inactivity Threshold (days)",
            min_value=30,
            max_value=365,
            value=90,
            help="Number of days of inactivity before notifying heir"
        )

        form_sent = st.form_submit_button("Add Account", use_container_width=True)

        if form_sent:
            if not heir:
                st.error("Please select an heir")
            else:
                add_account(platform, username, password, heir['id'], days)
def show_heir_management_page():
    """Display the heir management page with "View Heirs" and "Add Heir" tabs.

    The view tab lists every heir with a Remove button; the add tab contains
    the form that feeds ``add_heir``.
    """
    st.title("Manage Heirs")

    if st.button("← Back"):
        st.session_state.page = 'dashboard'
        st.rerun()

    tab1, tab2 = st.tabs(["View Heirs", "Add Heir"])

    with tab1:
        heirs = get_heirs_list(st.session_state.user['id'])
        if not heirs:
            st.info("No heirs added yet. Use the 'Add Heir' tab to get started.")
        else:
            for heir in heirs:
                with st.expander(f"{heir['name']} - {heir['relationship']}", expanded=True):
                    col1, col2 = st.columns([3, 1])
                    with col1:
                        st.write(f"**Email:** {heir['email']}")
                        if heir.get('phone'):
                            st.write(f"**Phone:** {heir['phone']}")
                    with col2:
                        if st.button("Remove", key=f"remove_heir_{heir['id']}", type="secondary"):
                            # Rerun only after a successful removal. An
                            # unconditional rerun wiped the error that
                            # remove_heir shows when it refuses or fails.
                            if remove_heir(heir['id']):
                                time.sleep(1)  # Give time for message to show
                                st.rerun()

    with tab2:
        with st.form("add_heir_form", clear_on_submit=True):
            col1, col2 = st.columns(2)

            with col1:
                name = st.text_input(
                    "Full Name",
                    help="Enter the heir's full name"
                )
                email = st.text_input(
                    "Email",
                    help="Enter the heir's email address"
                )

            with col2:
                relationship = st.selectbox(
                    "Relationship",
                    ["Family", "Friend", "Legal Representative", "Other"],
                    help="Select your relationship with the heir"
                )
                phone = st.text_input(
                    "Phone (Optional)",
                    help="Enter the heir's phone number (optional)"
                )

            submitted = st.form_submit_button("Add Heir", use_container_width=True)

            if submitted:
                add_heir(name, email, relationship, phone)
def show_security_settings_page():
    """Render the security settings page (2FA status/setup, password change)."""
    st.title("Security Settings")

    if st.button("← Back"):
        st.session_state.page = 'dashboard'
        st.rerun()

    twofa_tab, password_tab = st.tabs(["Two-Factor Authentication", "Change Password"])

    with twofa_tab:
        twofa_enabled = get_2fa_status(st.session_state.user['id'])
        st.write("Current Status:", "Enabled" if twofa_enabled else "Disabled")

        if not twofa_enabled:
            if st.button("Set Up 2FA"):
                st.session_state.setup_2fa = True
                st.rerun()

            # The flag set above survives the rerun and keeps the wizard open.
            if hasattr(st.session_state, 'setup_2fa'):
                setup_2fa()

    with password_tab:
        with st.form("change_password_form", clear_on_submit=True):
            current_password = st.text_input(
                "Current Password",
                type="password",
                help="Enter your current password"
            )
            new_password = st.text_input(
                "New Password",
                type="password",
                help="Enter your new password"
            )
            confirm_password = st.text_input(
                "Confirm New Password",
                type="password",
                help="Confirm your new password"
            )

            change_clicked = st.form_submit_button("Change Password", use_container_width=True)

            if change_clicked:
                change_password(current_password, new_password, confirm_password)
def setup_2fa():
    """Run the 2FA enrolment step for the logged-in user.

    A TOTP secret is generated once and kept in the session. The provisioning
    URI is shown as text and the user confirms with a code from their
    authenticator app. On success the secret is saved through
    ``enable_2fa_for_user``; Cancel discards it.
    """
    session = st.session_state
    if 'totp_secret' not in session:
        session.totp_secret = pyotp.random_base32()

    totp = pyotp.TOTP(session.totp_secret)
    uri = totp.provisioning_uri(
        session.user['email'],
        issuer_name="Digital Heir System"
    )

    st.write("1. Scan this QR code with your authenticator app:")
    st.code(uri)  # Since we can't generate QR codes directly

    st.write("2. Enter the code from your authenticator app:")
    entered_code = st.text_input("Verification Code", key="2fa_code")

    verify_col, cancel_col = st.columns([1, 3])

    with verify_col:
        if st.button("Verify Code"):
            if not totp.verify(entered_code):
                st.error("Invalid code")
            elif enable_2fa_for_user(session.user['id'], session.totp_secret):
                st.success("2FA enabled successfully!")
                # Enrolment finished: drop the temporary setup state.
                del session.totp_secret
                del session.setup_2fa
                st.rerun()

    with cancel_col:
        if st.button("Cancel"):
            del session.totp_secret
            del session.setup_2fa
            st.rerun()
def check_account_activity(platform, username):
    """Check social media account activity (placeholder).

    Only builds the URL that would be queried for Twitter or Facebook; no
    request is made and nothing is returned.

    NOTE: ``check_account_activity`` is defined again later in this module.
    """
    if platform == 'twitter':
        # Parse RSS feed for latest activity
        url = f"https://nitter.net/{username}/rss"
        return None
    if platform == 'facebook':
        # Implementation needed
        url = f"https://www.facebook.com/{username}"
        return None
    return None
def remove_heir(heir_id: str) -> bool:
    """Remove one of the current user's heirs, unless accounts still use it.

    If any of the user's social accounts is assigned to the heir, nothing is
    deleted: an error is shown and False is returned. Otherwise the heir row
    is deleted and committed.

    NOTE: an identical definition of this function follows in this module.
    """
    try:
        db = st.session_state.db
        owner_id = st.session_state.user['id']
        cursor = db.cursor()

        # First check if there are any accounts associated with this heir
        cursor.execute("""
            SELECT COUNT(*) FROM social_accounts
            WHERE heir_id = ? AND user_id = ?
        """, (heir_id, owner_id))
        account_count = cursor.fetchone()[0]

        if account_count > 0:
            st.error(f"Cannot remove heir: {account_count} accounts are still assigned to this heir. Please reassign or remove these accounts first.")
            return False

        # If no accounts are associated, remove the heir
        cursor.execute("""
            DELETE FROM heirs
            WHERE id = ? AND user_id = ?
        """, (heir_id, owner_id))
        db.commit()

        st.success("Heir removed successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to remove heir: {str(e)}")
        return False
def check_account_activity(platform, username):
    """Placeholder activity check: derives the platform URL and does nothing else.

    No network request is made; the function always returns None.
    """
    url = None
    if platform == 'twitter':
        # Parse RSS feed for latest activity
        url = f"https://nitter.net/{username}/rss"
    elif platform == 'facebook':
        url = f"https://www.facebook.com/{username}"
def remove_heir(heir_id: str) -> bool:
    """Delete an heir of the logged-in user when no account references it.

    Returns:
        True after the heir row is deleted and committed; False when accounts
        are still assigned to the heir or when a database error occurs (an
        error message is shown in both cases).
    """
    count_sql = """
        SELECT COUNT(*) FROM social_accounts
        WHERE heir_id = ? AND user_id = ?
    """
    delete_sql = """
        DELETE FROM heirs
        WHERE id = ? AND user_id = ?
    """
    try:
        cursor = st.session_state.db.cursor()

        # Refuse while any account still points at this heir.
        cursor.execute(count_sql, (heir_id, st.session_state.user['id']))
        account_count = cursor.fetchone()[0]
        if account_count > 0:
            st.error(f"Cannot remove heir: {account_count} accounts are still assigned to this heir. Please reassign or remove these accounts first.")
            return False

        cursor.execute(delete_sql, (heir_id, st.session_state.user['id']))
        st.session_state.db.commit()
        st.success("Heir removed successfully!")
        return True
    except Exception as e:
        st.error(f"Failed to remove heir: {str(e)}")
        return False
def initialize_database():
    """Initialize database tables needed for the application.

    Creates ``login_attempt_history`` and ``social_accounts`` if they do not
    exist yet. The ``social_accounts`` definition includes every column that
    the INSERT statements in this module write (``password``, ``last_check``,
    ``inactivity_threshold``, ``created_at``, ``last_modified``). These were
    previously missing, so a table created here rejected those inserts.
    Tables that already exist are left untouched by ``IF NOT EXISTS``.

    Failures are logged and reported to the user rather than raised.
    """
    try:
        cursor = st.session_state.db.cursor()

        # Create login_attempt_history table instead of login_attempts
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS login_attempt_history (
                id TEXT PRIMARY KEY,
                username TEXT NOT NULL,
                attempt_time TEXT NOT NULL,
                ip_address TEXT,
                success BOOLEAN DEFAULT FALSE
            )
        """)

        # Create other necessary tables if they don't exist
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS social_accounts (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                platform TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT,
                status TEXT DEFAULT 'active',
                last_check TEXT,
                last_activity TEXT,
                inactivity_threshold INTEGER,
                heir_id TEXT,
                created_at TEXT,
                last_modified TEXT,
                FOREIGN KEY (user_id) REFERENCES users (id),
                FOREIGN KEY (heir_id) REFERENCES heirs (id)
            )
        """)

        st.session_state.db.commit()
    except Exception as e:
        logger.error(f"Database initialization failed: {str(e)}")
        st.error("Failed to initialize database. Please contact support.")
def initialize_services():
    """Hand the session's database connection to the audit logger, if any.

    Does nothing when the session has no ``db`` attribute yet.
    """
    if not hasattr(st.session_state, 'db'):
        return
    connection = st.session_state.db
    audit_logger.initialize_with_connection(connection)
# Main initialization function
def initialize_app():
    """Bootstrap the app: page config, database tables, start page, services.

    Tables are only initialised when the session already holds a ``db``
    connection; the start page defaults to the login screen.
    """
    page_settings = {
        "page_title": "DIGITAL INHERITANCE SYSTEM",
        "page_icon": "🔐",
        "layout": "wide",
    }
    st.set_page_config(**page_settings)

    session = st.session_state
    if 'db' in session:
        initialize_database()

    if 'page' not in session:
        session.page = 'login'

    initialize_services()