import pyotp import time import streamlit as st from datetime import datetime from uuid import uuid4 import logging from typing import List, Dict, Optional from utils.security import clear_security_metrics import bcrypt import re from utils.security import ( validate_password, check_rate_limit, generate_jwt_token, verify_jwt_token, log_security_event ) from utils.audit import audit_logger from dashboard_components import dashboard_header, account_linking_form from typing import Callable def dashboard_header(): """ Streamlit version of DashboardHeader.jsx """ col1, col2 = st.columns([2, 3]) with col1: st.title("Digital Inheritance Dashboard") st.markdown("by Muhammad Shaheer", unsafe_allow_html=True) with col2: cols = st.columns(3) with cols[0]: if st.button("🏦 Wallets", use_container_width=True): st.session_state.page = 'wallets' st.rerun() with cols[1]: if st.button("🔗 Link Accounts", use_container_width=True): st.session_state.page = 'link_accounts' st.rerun() with cols[2]: if st.button("🚪 Logout", type="secondary", use_container_width=True): logout_user() def show_dashboard(): """ Updated dashboard view incorporating React components """ # Show the header dashboard_header() # Get user accounts accounts = get_user_accounts(st.session_state.user['id']) if not accounts: st.info("No digital accounts added yet. Click 'Link Accounts' to get started.") return # Display accounts in a modern card layout st.subheader("Your Digital Accounts") for account in accounts: with st.container(): cols = st.columns([1, 2, 1]) with cols[0]: platform_icons = { "facebook": "📘", "twitter": "🐦", "instagram": "📸", "linkedin": "💼" } icon = platform_icons.get(account['platform'].lower(), "🔗") st.markdown(f"### {icon} {account['platform']}") st.write(f"**@{account['username']}**") with cols[1]: st.write(f"**Status:** {account['status']}") st.write(f"**Last Activity:** {account['last_activity']}") if account.get('heir_name'): st.write(f"**Heir:** {account['heir_name']}") with cols[2]: if st.button("Remove Account", key=f"remove_{account['id']}", type="secondary", use_container_width=True): if remove_account(account['id']): st.success("Account removed successfully!") time.sleep(1) st.rerun() def logout_user(): """Handle user logout.""" try: # Log the logout event if 'user' in st.session_state: log_security_event( st.session_state.user['id'], 'logout', f'User {st.session_state.user["username"]} logged out' ) # Clear metrics and session clear_security_metrics() # Unregister Prometheus metrics st.session_state.clear() st.session_state.page = 'login' st.rerun() except Exception as e: logger.error(f"Logout failed: {str(e)}") st.error("An error occurred during logout. Please try again.") def account_linking_form(): """ Streamlit version of AccountLinking.jsx """ st.title("Link Social Media Account") # Platform selection platform = st.radio( "Select Platform", ["Facebook", "Twitter", "Instagram", "LinkedIn"], horizontal=True, format_func=lambda x: { "Facebook": "📘 Facebook", "Twitter": "🐦 Twitter", "Instagram": "📸 Instagram", "LinkedIn": "💼 LinkedIn" }[x] ) if platform: username = st.text_input("Username", placeholder=f"Enter your {platform} username") # Info message with custom styling st.info( "We'll monitor this account for inactivity. " "Make sure the profile is public so we can track activity." ) if username: if st.button("Link Account", type="primary", use_container_width=True): try: # Add account linking logic here success = link_social_account(platform.lower(), username) if success: st.success(f"Successfully linked {platform} account: {username}") time.sleep(1) st.session_state.page = 'dashboard' st.rerun() except Exception as e: st.error(f"Failed to link account: {str(e)}") def apply_custom_css(): """ Apply custom CSS to make the components look more like the original React design """ st.markdown(""" """, unsafe_allow_html=True) # Example usage in your main Streamlit app: def main(): # Initialize session state if 'page' not in st.session_state: st.session_state.page = 'dashboard' # Apply custom CSS apply_custom_css() def logout(): st.session_state.clear() st.rerun() # Show header dashboard_header(logout) # Show different pages based on state if st.session_state.page == 'link_accounts': account_linking_form() # Add other pages as needed # Configure logging logger = logging.getLogger(__name__) def show_dashboard(): """Display the main dashboard.""" # Apply custom styling apply_custom_css() # Show dashboard header dashboard_header(lambda: logout_user()) # Get and display accounts accounts = get_user_accounts(st.session_state.user['id']) if not accounts: st.info("No digital accounts added yet. Click 'Add Account' to get started.") return def logout_user(): st.session_state.clear() st.rerun() def verify_account(platform, username): """Verify if a social media account exists and is public.""" # Implementation needed return True def link_social_account(platform, username): # Validate the account exists account_exists = verify_account(platform, username) if not account_exists: st.error("Account not found or not public") return False # Store account info try: cursor = st.session_state.db.cursor() account_id = str(uuid4()) current_time = datetime.now().isoformat() cursor.execute(""" INSERT INTO social_accounts ( id, user_id, platform, username, last_check, last_activity, created_at, status ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( account_id, st.session_state.user['id'], platform, username, current_time, current_time, current_time, 'active' )) st.session_state.db.commit() return True except Exception as e: st.error(f"Failed to link account: {str(e)}") return False def link_social_account(platform, username): # Validate the account exists account_exists = verify_account(platform, username) if not account_exists: st.error("Account not found or not public") return False # Store account info try: cursor = st.session_state.db.cursor() account_id = str(uuid4()) current_time = datetime.now().isoformat() cursor.execute(""" INSERT INTO social_accounts ( id, user_id, platform, username, last_check, last_activity, created_at, status ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( account_id, st.session_state.user['id'], platform, username, current_time, current_time, current_time, 'active' )) st.session_state.db.commit() return True except Exception as e: st.error(f"Failed to link account: {str(e)}") return False def get_heir_info(heir_id: str) -> Optional[Dict]: """Get information about a specific heir.""" try: cursor = st.session_state.db.cursor() cursor.execute(""" SELECT id, name, email, relationship FROM heirs WHERE id = ? """, (heir_id,)) heir = cursor.fetchone() if heir: return { 'id': heir[0], 'name': heir[1], 'email': heir[2], 'relationship': heir[3] } return None except Exception as e: st.error(f"Failed to fetch heir info: {str(e)}") return None def get_heirs_list(user_id: str) -> List[Dict]: """Get list of heirs for the given user.""" try: cursor = st.session_state.db.cursor() cursor.execute(""" SELECT id, name, email, relationship FROM heirs WHERE user_id = ? """, (user_id,)) heirs = cursor.fetchall() return [{'id': h[0], 'name': h[1], 'email': h[2], 'relationship': h[3]} for h in heirs] except Exception as e: st.error(f"Failed to fetch heirs: {str(e)}") return [] def get_user_accounts(user_id: str) -> List[Dict]: """Get list of social accounts for the given user.""" try: cursor = st.session_state.db.cursor() cursor.execute(""" SELECT id, platform, username, status, last_activity, heir_id FROM social_accounts WHERE user_id = ? """, (user_id,)) accounts = cursor.fetchall() return [{ 'id': a[0], 'platform': a[1], 'username': a[2], 'status': a[3], 'last_activity': a[4], 'heir_id': a[5] } for a in accounts] except Exception as e: st.error(f"Failed to fetch accounts: {str(e)}") return [] def remove_account(account_id: str) -> bool: """Remove a social account.""" try: cursor = st.session_state.db.cursor() cursor.execute("DELETE FROM social_accounts WHERE id = ?", (account_id,)) st.session_state.db.commit() st.success("Account removed successfully!") return True except Exception as e: st.error(f"Failed to remove account: {str(e)}") return False def add_account(platform: str, username: str, password: str, heir_id: str, days: int) -> bool: """Add a new social account with proper validation.""" try: if not all([platform, username, password, heir_id, days]): st.error("Please fill in all required fields") return False if days < 30: st.error("Inactivity threshold must be at least 30 days") return False cursor = st.session_state.db.cursor() # Check if heir exists cursor.execute("SELECT id FROM heirs WHERE id = ? AND user_id = ?", (heir_id, st.session_state.user['id'])) if not cursor.fetchone(): st.error("Selected heir not found") return False # Create account account_id = str(uuid4()) current_time = datetime.now().isoformat() cursor.execute(""" INSERT INTO social_accounts ( id, user_id, platform, username, password, heir_id, last_check, last_activity, inactivity_threshold, created_at, last_modified, status ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( account_id, st.session_state.user['id'], platform, username, password, # Note: In production, this should be encrypted heir_id, current_time, current_time, days, current_time, current_time, 'active' )) st.session_state.db.commit() log_security_event( st.session_state.user['id'], 'account_added', f'Added {platform} account for user {username}' ) st.success("Account added successfully!") return True except Exception as e: st.error(f"Failed to add account: {str(e)}") return False def add_heir(name: str, email: str, relationship: str, phone: Optional[str] = None) -> bool: """Add a new heir.""" try: if not name or not email or not relationship: st.error("Please fill in all required fields") return False cursor = st.session_state.db.cursor() heir_id = str(uuid4()) current_time = datetime.now().isoformat() cursor.execute(""" INSERT INTO heirs (id, user_id, name, email, relationship, phone, created_at, last_modified) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( heir_id, st.session_state.user['id'], name, email, relationship, phone, current_time, current_time )) st.session_state.db.commit() st.success("Heir added successfully!") return True except Exception as e: st.error(f"Failed to add heir: {str(e)}") return False def register_user(username: str, email: str, password: str, confirm: str) -> bool: """Register a new user with validation.""" if password != confirm: st.error("Passwords don't match") return False valid, msg = validate_password(password) if not valid: st.error(msg) return False if not re.match(r"[^@]+@[^@]+\.[^@]+", email): st.error("Invalid email format") return False try: cursor = st.session_state.db.cursor() # Check existing user cursor.execute("SELECT id FROM users WHERE username = ? OR email = ?", (username, email)) if cursor.fetchone(): st.error("Username or email already exists") return False # Create user user_id = str(uuid4()) hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()) current_time = datetime.now().isoformat() cursor.execute(""" INSERT INTO users (id, username, email, password, created_at, last_password_change) VALUES (?, ?, ?, ?, ?, ?) """, ( user_id, username, email, hashed.decode(), current_time, current_time )) st.session_state.db.commit() st.success("Registration successful! Please login.") return True except Exception as e: st.error(f"Registration failed: {str(e)}") return False def login_user(username: str, password: str) -> bool: """Authenticate a user.""" try: cursor = st.session_state.db.cursor() cursor.execute("SELECT id, password, email FROM users WHERE username = ?", (username,)) user = cursor.fetchone() if not user: st.error("Invalid username or password") return False if bcrypt.checkpw(password.encode(), user[1].encode()): st.session_state.user = { 'id': user[0], 'username': username, 'email': user[2] } st.session_state.page = 'dashboard' token = generate_jwt_token(user[0]) st.session_state.token = token log_security_event( user[0], 'login', f'Successful login for user {username}' ) return True st.error("Invalid username or password") return False except Exception as e: st.error(f"Login failed: {str(e)}") return False def get_2fa_status(user_id: str) -> bool: """Check if 2FA is enabled for a user.""" try: cursor = st.session_state.db.cursor() cursor.execute("SELECT totp_secret FROM users WHERE id = ?", (user_id,)) result = cursor.fetchone() return bool(result and result[0]) except Exception as e: st.error(f"Failed to check 2FA status: {str(e)}") return False def enable_2fa_for_user(user_id: str, totp_secret: str) -> bool: """Enable 2FA for a user.""" try: cursor = st.session_state.db.cursor() cursor.execute( "UPDATE users SET totp_secret = ? WHERE id = ?", (totp_secret, user_id) ) st.session_state.db.commit() return True except Exception as e: st.error(f"Failed to enable 2FA: {str(e)}") return False def change_password(current: str, new_pass: str, confirm: str) -> bool: """Change user password with validation.""" try: if new_pass != confirm: st.error("New passwords don't match") return False valid, msg = validate_password(new_pass) if not valid: st.error(msg) return False cursor = st.session_state.db.cursor() cursor.execute( "SELECT password FROM users WHERE id = ?", (st.session_state.user['id'],) ) stored_hash = cursor.fetchone()[0] if not bcrypt.checkpw(current.encode(), stored_hash.encode()): st.error("Current password is incorrect") return False new_hash = bcrypt.hashpw(new_pass.encode(), bcrypt.gensalt()) cursor.execute( "UPDATE users SET password = ?, last_password_change = ? WHERE id = ?", (new_hash.decode(), datetime.now().isoformat(), st.session_state.user['id']) ) st.session_state.db.commit() st.success("Password changed successfully!") return True except Exception as e: st.error(f"Failed to change password: {str(e)}") return False # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Page display functions def show_login_page(): """Display the login/register page.""" st.title("Digital Inheritance System") tab1, tab2 = st.tabs(["Login", "Register"]) with tab1: with st.form("login_form", clear_on_submit=True): username = st.text_input("Username") password = st.text_input("Password", type="password") submitted = st.form_submit_button("Login") if submitted and username and password: if check_rate_limit(username): st.error("Too many login attempts. Please try again later.") return if login_user(username, password): st.success("Login successful!") time.sleep(1) # Give time for message to show st.rerun() with tab2: with st.form("register_form", clear_on_submit=True): col1, col2 = st.columns(2) with col1: reg_username = st.text_input( "Username", help="Choose a unique username" ) reg_email = st.text_input( "Email", help="Enter your email address" ) with col2: reg_password = st.text_input( "Password", type="password", help="Choose a strong password" ) reg_confirm = st.text_input( "Confirm Password", type="password", help="Re-enter your password" ) # Password requirements info st.markdown(""" Password must contain: - At least 8 characters - At least one uppercase letter - At least one lowercase letter - At least one number - At least one special character """) submitted = st.form_submit_button("Register") if submitted: if register_user(reg_username, reg_email, reg_password, reg_confirm): st.success("Registration successful! Please login.") time.sleep(2) # Give time for message to show st.rerun() def get_user_accounts(user_id: str) -> List[Dict]: """Get list of social accounts for the given user.""" try: cursor = st.session_state.db.cursor() cursor.execute(""" SELECT sa.id, sa.platform, sa.username, sa.status, sa.last_activity, sa.heir_id, h.name as heir_name, h.relationship as heir_relationship FROM social_accounts sa LEFT JOIN heirs h ON sa.heir_id = h.id WHERE sa.user_id = ? ORDER BY sa.last_activity DESC """, (user_id,)) accounts = [] for row in cursor.fetchall(): accounts.append({ 'id': row[0], 'platform': row[1], 'username': row[2], 'status': row[3], 'last_activity': row[4], 'heir_id': row[5], 'heir_name': row[6], 'heir_relationship': row[7] }) return accounts except Exception as e: st.error(f"Failed to fetch accounts: {str(e)}") return [] def add_account(platform: str, username: str, password: str, heir_id: str, days: int) -> bool: """Add a new social account with proper validation.""" try: if not all([platform, username, password, heir_id, days]): st.error("Please fill in all required fields") return False if days < 30: st.error("Inactivity threshold must be at least 30 days") return False cursor = st.session_state.db.cursor() # Check if account already exists cursor.execute(""" SELECT id FROM social_accounts WHERE platform = ? AND username = ? AND user_id = ? """, (platform, username, st.session_state.user['id'])) if cursor.fetchone(): st.error("This account is already linked") return False # Create account account_id = str(uuid4()) current_time = datetime.now().isoformat() cursor.execute(""" INSERT INTO social_accounts ( id, user_id, platform, username, password, heir_id, last_check, last_activity, inactivity_threshold, created_at, last_modified, status ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( account_id, st.session_state.user['id'], platform, username, password, # In production, encrypt this heir_id, current_time, current_time, days, current_time, current_time, 'active' )) st.session_state.db.commit() log_security_event( st.session_state.user['id'], 'account_added', f'Added {platform} account for user {username}' ) st.success(f"Account {username} on {platform} added successfully!") time.sleep(1) # Give time for message to show return True except Exception as e: st.error(f"Failed to add account: {str(e)}") return False def show_dashboard(): """Display the main dashboard.""" st.title("Digital Inheritance Dashboard") # Add navigation menu at top menu_col1, menu_col2, menu_col3, menu_col4 = st.columns([2,1,1,1]) with menu_col1: st.subheader(f"Welcome, {st.session_state.user['username']}") with menu_col2: if st.button("Add Account", key="add_account_btn", type="primary"): st.session_state.page = 'add_account' st.rerun() with menu_col3: if st.button("Manage Heirs", key="manage_heirs_btn"): st.session_state.page = 'manage_heirs' st.rerun() with menu_col4: if st.button("Logout", key="logout_btn", type="secondary"): logout_user() # Get and display accounts accounts = get_user_accounts(st.session_state.user['id']) if not accounts: st.info("No digital accounts added yet. Click 'Add Account' to get started.") return st.subheader("Your Digital Accounts") # Use columns for better layout for account in accounts: with st.container(): col1, col2, col3 = st.columns([3,3,1]) with col1: st.markdown(f"### {account['platform']} - {account['username']}") st.write(f"**Status:** {account['status'].title()}") last_activity = datetime.fromisoformat(account['last_activity']).strftime("%Y-%m-%d %H:%M") st.write(f"**Last Activity:** {last_activity}") with col2: heir = get_heir_info(account['heir_id']) if heir: st.write(f"**Heir:** {heir['name']}") st.write(f"**Relationship:** {heir['relationship']}") if heir.get('email'): st.write(f"**Contact:** {heir['email']}") with col3: if st.button("Remove", key=f"remove_{account['id']}", type="secondary"): if remove_account(account['id']): st.success("Account removed successfully!") time.sleep(1) # Give time for message to show st.rerun() def show_add_account_page(): """Display the add account page.""" st.title("Add Digital Account") if st.button("← Back"): st.session_state.page = 'dashboard' st.rerun() heirs = get_heirs_list(st.session_state.user['id']) if not heirs: st.warning("Please add at least one heir before adding accounts.") if st.button("Add Heir"): st.session_state.page = 'manage_heirs' st.rerun() return with st.form("add_account_form", clear_on_submit=True): col1, col2 = st.columns(2) with col1: platform = st.selectbox( "Platform", ["Facebook", "Twitter", "Instagram", "LinkedIn"], help="Select the social media platform" ) username = st.text_input( "Username", help="Enter your username for the selected platform" ) with col2: password = st.text_input( "Password", type="password", help="Enter your account password" ) heir = st.selectbox( "Select Heir", options=heirs, format_func=lambda x: f"{x['name']} ({x['relationship']})", help="Choose the heir for this account" ) days = st.slider( "Inactivity Threshold (days)", min_value=30, max_value=365, value=90, help="Number of days of inactivity before notifying heir" ) submitted = st.form_submit_button("Add Account", use_container_width=True) if submitted: if heir: add_account(platform, username, password, heir['id'], days) else: st.error("Please select an heir") def show_heir_management_page(): """Display the heir management page.""" st.title("Manage Heirs") if st.button("← Back"): st.session_state.page = 'dashboard' st.rerun() tab1, tab2 = st.tabs(["View Heirs", "Add Heir"]) with tab1: heirs = get_heirs_list(st.session_state.user['id']) if not heirs: st.info("No heirs added yet. Use the 'Add Heir' tab to get started.") else: for heir in heirs: with st.expander(f"{heir['name']} - {heir['relationship']}", expanded=True): col1, col2 = st.columns([3,1]) with col1: st.write(f"**Email:** {heir['email']}") if heir.get('phone'): st.write(f"**Phone:** {heir['phone']}") with col2: if st.button("Remove", key=f"remove_heir_{heir['id']}", type="secondary"): remove_heir(heir['id']) st.rerun() with tab2: with st.form("add_heir_form", clear_on_submit=True): col1, col2 = st.columns(2) with col1: name = st.text_input( "Full Name", help="Enter the heir's full name" ) email = st.text_input( "Email", help="Enter the heir's email address" ) with col2: relationship = st.selectbox( "Relationship", ["Family", "Friend", "Legal Representative", "Other"], help="Select your relationship with the heir" ) phone = st.text_input( "Phone (Optional)", help="Enter the heir's phone number (optional)" ) submitted = st.form_submit_button("Add Heir", use_container_width=True) if submitted: add_heir(name, email, relationship, phone) def show_security_settings_page(): """Display the security settings page.""" st.title("Security Settings") if st.button("← Back"): st.session_state.page = 'dashboard' st.rerun() tab1, tab2 = st.tabs(["Two-Factor Authentication", "Change Password"]) with tab1: current_2fa_status = get_2fa_status(st.session_state.user['id']) st.write("Current Status:", "Enabled" if current_2fa_status else "Disabled") if not current_2fa_status: if st.button("Set Up 2FA"): st.session_state.setup_2fa = True st.rerun() if hasattr(st.session_state, 'setup_2fa'): setup_2fa() with tab2: with st.form("change_password_form", clear_on_submit=True): current_password = st.text_input( "Current Password", type="password", help="Enter your current password" ) new_password = st.text_input( "New Password", type="password", help="Enter your new password" ) confirm_password = st.text_input( "Confirm New Password", type="password", help="Confirm your new password" ) submitted = st.form_submit_button("Change Password", use_container_width=True) if submitted: change_password(current_password, new_password, confirm_password) def setup_2fa(): """Set up two-factor authentication for a user.""" if 'totp_secret' not in st.session_state: st.session_state.totp_secret = pyotp.random_base32() totp = pyotp.TOTP(st.session_state.totp_secret) provisioning_uri = totp.provisioning_uri( st.session_state.user['email'], issuer_name="Digital Heir System" ) st.write("1. Scan this QR code with your authenticator app:") st.code(provisioning_uri) # Since we can't generate QR codes directly st.write("2. Enter the code from your authenticator app:") code = st.text_input("Verification Code", key="2fa_code") verify_col1, verify_col2 = st.columns([1,3]) with verify_col1: if st.button("Verify Code"): if totp.verify(code): if enable_2fa_for_user(st.session_state.user['id'], st.session_state.totp_secret): st.success("2FA enabled successfully!") del st.session_state.totp_secret del st.session_state.setup_2fa st.rerun() else: st.error("Invalid code") with verify_col2: if st.button("Cancel"): del st.session_state.totp_secret del st.session_state.setup_2fa st.rerun() def check_account_activity(platform, username): """Check social media account activity.""" if platform == 'twitter': url = f"https://nitter.net/{username}/rss" # Parse RSS feed for latest activity pass elif platform == 'facebook': url = f"https://www.facebook.com/{username}" # Implementation needed pass def remove_heir(heir_id: str) -> bool: """Remove an heir and their associated accounts.""" try: cursor = st.session_state.db.cursor() # First check if there are any accounts associated with this heir cursor.execute(""" SELECT COUNT(*) FROM social_accounts WHERE heir_id = ? AND user_id = ? """, (heir_id, st.session_state.user['id'])) account_count = cursor.fetchone()[0] if account_count > 0: st.error(f"Cannot remove heir: {account_count} accounts are still assigned to this heir. Please reassign or remove these accounts first.") return False # If no accounts are associated, remove the heir cursor.execute(""" DELETE FROM heirs WHERE id = ? AND user_id = ? """, (heir_id, st.session_state.user['id'])) st.session_state.db.commit() st.success("Heir removed successfully!") return True except Exception as e: st.error(f"Failed to remove heir: {str(e)}") return False def check_account_activity(platform, username): if platform == 'twitter': url = f"https://nitter.net/{username}/rss" # Parse RSS feed for latest activity elif platform == 'facebook': url = f"https://www.facebook.com/{username}" def remove_heir(heir_id: str) -> bool: """Remove an heir and their associated accounts.""" try: cursor = st.session_state.db.cursor() # First check if there are any accounts associated with this heir cursor.execute(""" SELECT COUNT(*) FROM social_accounts WHERE heir_id = ? AND user_id = ? """, (heir_id, st.session_state.user['id'])) account_count = cursor.fetchone()[0] if account_count > 0: st.error(f"Cannot remove heir: {account_count} accounts are still assigned to this heir. Please reassign or remove these accounts first.") return False # If no accounts are associated, remove the heir cursor.execute(""" DELETE FROM heirs WHERE id = ? AND user_id = ? """, (heir_id, st.session_state.user['id'])) st.session_state.db.commit() st.success("Heir removed successfully!") return True except Exception as e: st.error(f"Failed to remove heir: {str(e)}") return False def initialize_database(): """ Initialize database tables needed for the application """ try: cursor = st.session_state.db.cursor() # Create login_attempt_history table instead of login_attempts cursor.execute(""" CREATE TABLE IF NOT EXISTS login_attempt_history ( id TEXT PRIMARY KEY, username TEXT NOT NULL, attempt_time TEXT NOT NULL, ip_address TEXT, success BOOLEAN DEFAULT FALSE ) """) # Create other necessary tables if they don't exist cursor.execute(""" CREATE TABLE IF NOT EXISTS social_accounts ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, platform TEXT NOT NULL, username TEXT NOT NULL, status TEXT DEFAULT 'active', last_activity TEXT, heir_id TEXT, FOREIGN KEY (user_id) REFERENCES users (id), FOREIGN KEY (heir_id) REFERENCES heirs (id) ) """) st.session_state.db.commit() except Exception as e: logger.error(f"Database initialization failed: {str(e)}") st.error("Failed to initialize database. Please contact support.") def initialize_services(): """Initialize required services.""" if hasattr(st.session_state, 'db'): audit_logger.initialize_with_connection(st.session_state.db) # Main initialization function def initialize_app(): """Initialize the application.""" st.set_page_config( page_title="DIGITAL INHERITANCE SYSTEM", page_icon="🔐", layout="wide" ) if 'db' in st.session_state: initialize_database() if 'page' not in st.session_state: st.session_state.page = 'login' initialize_services()