Spaces:
Build error
Build error
| import time | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from cryptography.fernet import Fernet | |
| import bcrypt | |
| import jwt | |
| import pyotp | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Optional, List | |
| import sqlite3 | |
| import threading | |
| import schedule | |
| from email.mime.text import MIMEText | |
| import smtplib | |
| import os | |
| from prometheus_client import Counter, Histogram | |
class MonitoringService:
    """Process-wide singleton that owns the application's Prometheus metrics.

    Every call to ``MonitoringService()`` returns the same object; the
    metrics are created only on the first construction.

    NOTE(review): a second class named ``MonitoringService`` is defined later
    in this file and rebinds the module-level name, so this singleton is
    unreachable by name after import. Confirm which definition is intended.

    Attributes:
        login_attempts_metric: Counter labelled by ``status``.
        account_checks_metric: Counter labelled by ``platform``.
        response_time_metric: Histogram labelled by ``endpoint``.
        ai_monitor: Detector used by ``check_account_activity``. Nothing in
            this class assigns it, so it defaults to ``None`` and must be
            set by the caller.
    """

    _instance = None

    # Declared at class level so the attribute always exists; the original
    # code read ``self.ai_monitor`` without ever defining it.
    ai_monitor = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super(MonitoringService, cls).__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        """Register the Prometheus metrics once per process."""
        # __init__ runs on every MonitoringService() call, so the flag set in
        # __new__ keeps the metrics from being created a second time.
        if not self.initialized:
            self.login_attempts_metric = Counter(
                'app_login_attempts_total',
                'Total login attempts',
                ['status']
            )
            self.account_checks_metric = Counter(
                'app_account_checks_total',
                'Total account activity checks',
                ['platform']
            )
            self.response_time_metric = Histogram(
                'app_response_time_seconds',
                'Response time in seconds',
                ['endpoint']
            )
            self.initialized = True

    def check_account_activity(self, account_id):
        """Fetch an account's activity and notify if it is flagged unusual.

        Args:
            account_id: Identifier passed through to the activity lookup and
                to the notification hook.

        Raises:
            AttributeError: If ``ai_monitor`` has not been configured, or if
                the ``_get_activity_data`` / ``_notify_unusual_activity``
                hooks are missing.
        """
        detector = self.ai_monitor
        if detector is None:
            # Fail before fetching anything, with a message that names the
            # missing collaborator (same exception type as before).
            raise AttributeError(
                "MonitoringService.ai_monitor is not configured; assign an "
                "object providing detect_unusual_activity() before calling "
                "check_account_activity()"
            )

        # NOTE(review): _get_activity_data and _notify_unusual_activity are
        # not defined anywhere in this class - presumably supplied by a
        # subclass or assigned at runtime; confirm.
        activity_data = self._get_activity_data(account_id)
        is_unusual = detector.detect_unusual_activity(activity_data)
        if is_unusual:
            self._notify_unusual_activity(account_id)
class CryptoService:
    """Wraps symmetric encryption, password hashing and JWT issuing.

    Encryption uses Fernet, password hashing uses bcrypt, and tokens are
    signed with HS256 using the ``JWT_SECRET`` environment variable.
    """

    def __init__(self, key: str):
        """Build the Fernet cipher.

        Args:
            key: Fernet key; a ``str`` is encoded to bytes, anything else is
                passed through unchanged.
        """
        self.fernet = Fernet(key.encode() if isinstance(key, str) else key)

    def encrypt(self, data: str) -> bytes:
        """Encrypt ``data`` and return the Fernet token."""
        return self.fernet.encrypt(data.encode())

    def decrypt(self, token: bytes) -> str:
        """Decrypt a Fernet ``token`` and return the decoded plaintext."""
        return self.fernet.decrypt(token).decode()

    def hash_password(self, password: str) -> bytes:
        """Return a bcrypt hash of ``password`` using a freshly generated salt."""
        return bcrypt.hashpw(password.encode(), bcrypt.gensalt())

    def verify_password(self, password: str, hashed: bytes) -> bool:
        """Return whether ``password`` matches the bcrypt hash ``hashed``."""
        return bcrypt.checkpw(password.encode(), hashed)

    def generate_token(self, data: dict, expires_delta: timedelta) -> str:
        """Issue an HS256-signed JWT carrying ``data`` plus an ``exp`` claim.

        Args:
            data: Claims to embed. The mapping is copied and never modified.
            expires_delta: Lifetime of the token, measured from now.

        Returns:
            The value returned by ``jwt.encode``.
        """
        # Function-scope import: the file's import block only brings in
        # ``datetime`` and ``timedelta``.
        from datetime import timezone

        # Work on a copy so the caller's dict does not silently gain an
        # ``exp`` key (the original updated ``data`` in place).
        payload = dict(data)
        # Timezone-aware UTC replaces the deprecated naive utcnow().
        payload["exp"] = datetime.now(timezone.utc) + expires_delta

        # NOTE(review): if JWT_SECRET is unset, None is handed to jwt.encode;
        # confirm deployment always provides it.
        return jwt.encode(payload, os.getenv("JWT_SECRET"), algorithm="HS256")
class EmailService:
    """Sends HTML e-mail through an SMTP server configured via environment.

    Environment variables read at construction time:
        SMTP_SERVER (default ``smtp.gmail.com``), SMTP_PORT (default ``587``),
        SMTP_USERNAME, SMTP_PASSWORD, SENDER_EMAIL.
    """

    # Upper bound, in seconds, for connecting to and talking with the SMTP
    # server; without it a stalled server blocks the caller indefinitely.
    SMTP_TIMEOUT_SECONDS = 30

    def __init__(self):
        """Load the SMTP settings from the environment."""
        self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
        self.smtp_port = int(os.getenv("SMTP_PORT", "587"))
        self.username = os.getenv("SMTP_USERNAME")
        self.password = os.getenv("SMTP_PASSWORD")
        self.sender = os.getenv("SENDER_EMAIL")

    def send_email(self, to_email: str, subject: str, body: str) -> bool:
        """Send an HTML message and report whether it was handed to the server.

        Args:
            to_email: Recipient address.
            subject: Subject line.
            body: HTML body.

        Returns:
            ``True`` when the message was sent, ``False`` on any failure
            (failures are logged, never raised).
        """
        # Function-scope import: ``ssl`` is not in the file's import block.
        import ssl

        # Without credentials or a sender the login/send below cannot
        # succeed; report the failure without opening a connection.
        if not (self.username and self.password and self.sender):
            logging.error(
                "Failed to send email: SMTP_USERNAME, SMTP_PASSWORD or "
                "SENDER_EMAIL is not configured"
            )
            return False

        msg = MIMEText(body, 'html')
        msg['Subject'] = subject
        msg['From'] = self.sender
        msg['To'] = to_email

        try:
            with smtplib.SMTP(
                self.smtp_server,
                self.smtp_port,
                timeout=self.SMTP_TIMEOUT_SECONDS,
            ) as server:
                # A default context verifies the server certificate and host
                # name before credentials are sent.
                server.starttls(context=ssl.create_default_context())
                server.login(self.username, self.password)
                server.send_message(msg)
            return True
        except Exception as e:
            # Deliberately broad: this method's contract is a boolean result.
            logging.error(f"Failed to send email: {str(e)}")
            return False
class MonitoringService:
    """Checks public social-media profiles and counts the checks.

    Attributes:
        login_attempts: Prometheus counter ``login_attempts_total``.
        account_checks: Prometheus counter ``account_checks_total``;
            incremented once per profile check.
        response_time: Prometheus histogram ``response_time_seconds``.
    """

    # Metrics are created once per process and shared by every instance:
    # registering the same metric name twice raises in prometheus_client.
    _shared_metrics = None

    def __init__(self):
        """Attach the shared metrics, creating them on first use."""
        if MonitoringService._shared_metrics is None:
            MonitoringService._shared_metrics = (
                Counter('login_attempts_total', 'Total login attempts'),
                Counter('account_checks_total', 'Total account activity checks'),
                Histogram('response_time_seconds', 'Response time in seconds'),
            )
        (
            self.login_attempts,
            self.account_checks,
            self.response_time,
        ) = MonitoringService._shared_metrics

    def check_facebook_activity(self, username: str) -> dict:
        """Fetch a Facebook profile page and summarise what was found.

        Args:
            username: Profile name; percent-encoded before being placed in
                the URL path.

        Returns:
            A dict with keys ``exists``, ``last_activity`` and ``status``.
            ``status`` is ``'active'``, ``'inactive'`` or ``'error'``.
            ``last_activity`` is the time of this check (not the post time)
            when a ``timestampContent`` element is present, else ``None``.
        """
        from urllib.parse import quote

        self.account_checks.inc()
        try:
            # Encode the username so characters such as "/", "?" or "#"
            # cannot change the requested path or add a query string.
            url = f"https://www.facebook.com/{quote(username, safe='')}"
            response = requests.get(url, timeout=10)
            soup = BeautifulSoup(response.text, 'html.parser')
            # NOTE(review): the HTTP status code is not consulted here; only
            # the page text decides whether the profile exists.
            profile_exists = "Page Not Found" not in soup.text
            last_post = soup.find(class_="timestampContent")
            return {
                'exists': profile_exists,
                'last_activity': datetime.now().isoformat() if last_post else None,
                'status': 'active' if profile_exists else 'inactive'
            }
        except Exception as e:
            # Best-effort check: any failure is reported as an error result.
            logging.error(f"Facebook check error: {str(e)}")
            return {'exists': False, 'last_activity': None, 'status': 'error'}

    def check_twitter_activity(self, username: str) -> dict:
        """Request a Twitter profile page and summarise the response.

        Args:
            username: Profile name; percent-encoded before being placed in
                the URL path.

        Returns:
            A dict with keys ``exists``, ``last_activity`` and ``status``.
            A 200 response yields ``'active'`` with the time of this check as
            ``last_activity``; any other status yields ``'inactive'``;
            a failed request yields ``'error'``.
        """
        from urllib.parse import quote

        self.account_checks.inc()
        try:
            url = f"https://twitter.com/{quote(username, safe='')}"
            response = requests.get(url, timeout=10)
            found = response.status_code == 200
            return {
                'exists': found,
                'last_activity': datetime.now().isoformat() if found else None,
                'status': 'active' if found else 'inactive'
            }
        except Exception as e:
            # Best-effort check: any failure is reported as an error result.
            logging.error(f"Twitter check error: {str(e)}")
            return {'exists': False, 'last_activity': None, 'status': 'error'}
class BackupService:
    """Copies a SQLite database into ``backups/`` once a day.

    Constructing the service registers a daily 00:00 job and starts a daemon
    thread that polls for due jobs once an hour.
    """

    def __init__(self, db_path: str):
        """Remember the database location and start the backup schedule.

        Args:
            db_path: Path of the SQLite database to back up.
        """
        self.db_path = db_path
        self._setup_scheduler()

    def _setup_scheduler(self):
        """Register the daily backup job and start the polling thread."""
        # A private scheduler keeps this job away from other threads that
        # poll the module-level default scheduler, which could otherwise run
        # the same job twice or from an unexpected thread.
        self._scheduler = schedule.Scheduler()
        self._scheduler.every().day.at("00:00").do(self._backup_database)
        thread = threading.Thread(target=self._run_scheduler, daemon=True)
        thread.start()

    def _run_scheduler(self):
        """Poll this service's scheduler forever, once per hour."""
        while True:
            self._scheduler.run_pending()
            time.sleep(3600)

    def _backup_database(self):
        """Write a timestamped copy of the database into ``backups/``.

        The copy is named ``digital_heir_<YYYYmmdd_HHMM>.db``. Failures are
        logged and never raised, so the scheduler thread keeps running.
        """
        # Function-scope import: ``contextlib`` is not in the file's imports.
        from contextlib import closing

        backup_dir = "backups"
        backup_path = os.path.join(
            backup_dir,
            f"digital_heir_{datetime.now().strftime('%Y%m%d_%H%M')}.db"
        )
        try:
            # Inside the try so a filesystem error is logged like any other
            # backup failure instead of escaping into the scheduler loop.
            os.makedirs(backup_dir, exist_ok=True)
            # sqlite3's own context manager only commits or rolls back;
            # closing() is what actually releases both connections.
            with closing(sqlite3.connect(self.db_path)) as src, \
                    closing(sqlite3.connect(backup_path)) as dst:
                src.backup(dst)
            logging.info(f"Database backup created: {backup_path}")
        except Exception as e:
            logging.error(f"Backup failed: {str(e)}")
class ActivityMonitor:
    """Periodically checks stored social accounts and records the result.

    Constructing the monitor registers a 12-hour job and starts a daemon
    thread that polls for due jobs once an hour.

    NOTE(review): the job runs on that daemon thread and uses ``db_conn``
    there. A sqlite3 connection opened with the default
    ``check_same_thread=True`` rejects use from another thread - confirm the
    caller opens it with ``check_same_thread=False``.
    """

    def __init__(self, db_conn: sqlite3.Connection, monitoring_service: "MonitoringService"):
        """Store the collaborators and start the check schedule.

        Args:
            db_conn: Connection to the database holding ``social_accounts``.
            monitoring_service: Object providing ``check_facebook_activity``
                and ``check_twitter_activity``.
        """
        self.db = db_conn
        self.monitor = monitoring_service
        self._setup_scheduler()

    def _setup_scheduler(self):
        """Register the 12-hourly check and start the polling thread."""
        # A private scheduler keeps this job away from other threads that
        # poll the module-level default scheduler, which could otherwise run
        # the same job twice or from an unexpected thread.
        self._scheduler = schedule.Scheduler()
        self._scheduler.every(12).hours.do(self._check_accounts)
        thread = threading.Thread(target=self._run_scheduler, daemon=True)
        thread.start()

    def _run_scheduler(self):
        """Poll this monitor's scheduler forever, once per hour."""
        while True:
            self._scheduler.run_pending()
            time.sleep(3600)

    def _check_accounts(self):
        """Check every account whose status is ``'active'`` and store results.

        A failure while listing accounts, or while processing one account, is
        logged; it neither stops the remaining accounts from being processed
        nor propagates into the scheduler loop.
        """
        try:
            cursor = self.db.cursor()
            cursor.execute("""
                SELECT id, platform, username, inactivity_threshold
                FROM social_accounts
                WHERE status = 'active'
            """)
            accounts = cursor.fetchall()
        except sqlite3.Error:
            logging.exception("Could not load accounts for activity check")
            return

        # inactivity_threshold is selected but not used by this method.
        for account_id, platform, username, _inactivity_threshold in accounts:
            try:
                activity = self._check_platform_activity(platform, username)
                self._update_account_status(account_id, activity)
            except Exception:
                # One bad row or transient failure must not abort the sweep
                # or end the daemon thread that runs it.
                logging.exception("Activity check failed for account %s", account_id)

    def _check_platform_activity(self, platform: str, username: str) -> dict:
        """Dispatch to the platform-specific check (case-insensitive).

        Returns:
            The monitoring service's result dict, or an ``'error'`` result
            for any platform other than Facebook or Twitter.
        """
        platform_key = platform.lower()
        if platform_key == 'facebook':
            return self.monitor.check_facebook_activity(username)
        if platform_key == 'twitter':
            return self.monitor.check_twitter_activity(username)
        return {'exists': False, 'last_activity': None, 'status': 'error'}

    def _update_account_status(self, account_id: str, activity: dict):
        """Persist one check result and commit.

        Args:
            account_id: Primary key of the ``social_accounts`` row.
            activity: Result dict with ``last_activity`` and ``status`` keys.
        """
        # NOTE(review): a result of 'inactive' or 'error' overwrites the
        # row's status, so the row no longer matches the status = 'active'
        # filter in _check_accounts and is not checked again - confirm this
        # is intended for transient errors.
        cursor = self.db.cursor()
        cursor.execute("""
            UPDATE social_accounts
            SET last_check = ?, last_activity = ?, status = ?
            WHERE id = ?
        """, (
            datetime.now().isoformat(),
            activity['last_activity'],
            activity['status'],
            account_id
        ))
        self.db.commit()