import time import requests from bs4 import BeautifulSoup from cryptography.fernet import Fernet import bcrypt import jwt import pyotp import logging from datetime import datetime, timedelta from typing import Dict, Optional, List import sqlite3 import threading import schedule from email.mime.text import MIMEText import smtplib import os from prometheus_client import Counter, Histogram class MonitoringService: _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(MonitoringService, cls).__new__(cls) cls._instance.initialized = False return cls._instance def __init__(self): if not self.initialized: self.login_attempts_metric = Counter( 'app_login_attempts_total', 'Total login attempts', ['status'] ) self.account_checks_metric = Counter( 'app_account_checks_total', 'Total account activity checks', ['platform'] ) self.response_time_metric = Histogram( 'app_response_time_seconds', 'Response time in seconds', ['endpoint'] ) self.initialized = True def check_account_activity(self, account_id): activity_data = self._get_activity_data(account_id) is_unusual = self.ai_monitor.detect_unusual_activity(activity_data) if is_unusual: self._notify_unusual_activity(account_id) class CryptoService: def __init__(self, key: str): self.fernet = Fernet(key.encode() if isinstance(key, str) else key) def encrypt(self, data: str) -> bytes: return self.fernet.encrypt(data.encode()) def decrypt(self, token: bytes) -> str: return self.fernet.decrypt(token).decode() def hash_password(self, password: str) -> bytes: return bcrypt.hashpw(password.encode(), bcrypt.gensalt()) def verify_password(self, password: str, hashed: bytes) -> bool: return bcrypt.checkpw(password.encode(), hashed) def generate_token(self, data: dict, expires_delta: timedelta) -> str: expire = datetime.utcnow() + expires_delta data.update({"exp": expire}) return jwt.encode(data, os.getenv("JWT_SECRET"), algorithm="HS256") class EmailService: def __init__(self): self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com") self.smtp_port = int(os.getenv("SMTP_PORT", "587")) self.username = os.getenv("SMTP_USERNAME") self.password = os.getenv("SMTP_PASSWORD") self.sender = os.getenv("SENDER_EMAIL") def send_email(self, to_email: str, subject: str, body: str) -> bool: msg = MIMEText(body, 'html') msg['Subject'] = subject msg['From'] = self.sender msg['To'] = to_email try: with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: server.starttls() server.login(self.username, self.password) server.send_message(msg) return True except Exception as e: logging.error(f"Failed to send email: {str(e)}") return False class MonitoringService: def __init__(self): self.login_attempts = Counter('login_attempts_total', 'Total login attempts') self.account_checks = Counter('account_checks_total', 'Total account activity checks') self.response_time = Histogram('response_time_seconds', 'Response time in seconds') def check_facebook_activity(self, username: str) -> dict: self.account_checks.inc() try: url = f"https://www.facebook.com/{username}" response = requests.get(url, timeout=10) soup = BeautifulSoup(response.text, 'html.parser') profile_exists = not ("Page Not Found" in soup.text) last_post = soup.find(class_="timestampContent") return { 'exists': profile_exists, 'last_activity': datetime.now().isoformat() if last_post else None, 'status': 'active' if profile_exists else 'inactive' } except Exception as e: logging.error(f"Facebook check error: {str(e)}") return {'exists': False, 'last_activity': None, 'status': 'error'} def check_twitter_activity(self, username: str) -> dict: self.account_checks.inc() try: url = f"https://twitter.com/{username}" response = requests.get(url, timeout=10) return { 'exists': response.status_code == 200, 'last_activity': datetime.now().isoformat() if response.status_code == 200 else None, 'status': 'active' if response.status_code == 200 else 'inactive' } except Exception as e: logging.error(f"Twitter check error: {str(e)}") return {'exists': False, 'last_activity': None, 'status': 'error'} class BackupService: def __init__(self, db_path: str): self.db_path = db_path self._setup_scheduler() def _setup_scheduler(self): schedule.every().day.at("00:00").do(self._backup_database) thread = threading.Thread(target=self._run_scheduler, daemon=True) thread.start() def _run_scheduler(self): while True: schedule.run_pending() time.sleep(3600) def _backup_database(self): backup_dir = "backups" os.makedirs(backup_dir, exist_ok=True) backup_path = os.path.join( backup_dir, f"digital_heir_{datetime.now().strftime('%Y%m%d_%H%M')}.db" ) try: with sqlite3.connect(self.db_path) as src, sqlite3.connect(backup_path) as dst: src.backup(dst) logging.info(f"Database backup created: {backup_path}") except Exception as e: logging.error(f"Backup failed: {str(e)}") class ActivityMonitor: def __init__(self, db_conn: sqlite3.Connection, monitoring_service: MonitoringService): self.db = db_conn self.monitor = monitoring_service self._setup_scheduler() def _setup_scheduler(self): schedule.every(12).hours.do(self._check_accounts) thread = threading.Thread(target=self._run_scheduler, daemon=True) thread.start() def _run_scheduler(self): while True: schedule.run_pending() time.sleep(3600) def _check_accounts(self): cursor = self.db.cursor() cursor.execute(""" SELECT id, platform, username, inactivity_threshold FROM social_accounts WHERE status = 'active' """) accounts = cursor.fetchall() for account in accounts: activity = self._check_platform_activity(account[1], account[2]) self._update_account_status(account[0], activity) def _check_platform_activity(self, platform: str, username: str) -> dict: if platform.lower() == 'facebook': return self.monitor.check_facebook_activity(username) elif platform.lower() == 'twitter': return self.monitor.check_twitter_activity(username) return {'exists': False, 'last_activity': None, 'status': 'error'} def _update_account_status(self, account_id: str, activity: dict): cursor = self.db.cursor() cursor.execute(""" UPDATE social_accounts SET last_check = ?, last_activity = ?, status = ? WHERE id = ? """, ( datetime.now().isoformat(), activity['last_activity'], activity['status'], account_id )) self.db.commit()