shaheerawan3's picture
Update services.py
5a968ca verified
import time
import requests
from bs4 import BeautifulSoup
from cryptography.fernet import Fernet
import bcrypt
import jwt
import pyotp
import logging
from datetime import datetime, timedelta
from typing import Dict, Optional, List
import sqlite3
import threading
import schedule
from email.mime.text import MIMEText
import smtplib
import os
from prometheus_client import Counter, Histogram
class MonitoringService:
    """Singleton that owns the application's labelled Prometheus metrics.

    Every ``MonitoringService()`` call hands back the same object, and the
    metric objects are created only the first time ``__init__`` runs on it.

    NOTE(review): this file defines a second class with the same name further
    down, so the module-level name ``MonitoringService`` presumably ends up
    bound to that later definition -- confirm which one callers should get.
    """

    # The single shared instance; stays None until the first construction.
    _instance = None

    def __new__(cls):
        """Return the shared instance, allocating it on the first call."""
        shared = cls._instance
        if shared is not None:
            return shared
        shared = super().__new__(cls)
        # Lets __init__ know the metrics have not been created yet.
        shared.initialized = False
        cls._instance = shared
        return shared

    def __init__(self):
        """Create the metric objects once; later constructions are no-ops."""
        if self.initialized:
            return
        # Login attempts, partitioned by a 'status' label.
        self.login_attempts_metric = Counter(
            'app_login_attempts_total',
            'Total login attempts',
            ['status'],
        )
        # Account activity checks, partitioned by a 'platform' label.
        self.account_checks_metric = Counter(
            'app_account_checks_total',
            'Total account activity checks',
            ['platform'],
        )
        # Response-time observations, partitioned by an 'endpoint' label.
        self.response_time_metric = Histogram(
            'app_response_time_seconds',
            'Response time in seconds',
            ['endpoint'],
        )
        self.initialized = True

    def check_account_activity(self, account_id):
        """Fetch activity for *account_id* and notify when it is flagged unusual.

        Returns None in every case.

        NOTE(review): ``self._get_activity_data``, ``self.ai_monitor`` and
        ``self._notify_unusual_activity`` are not defined anywhere in this
        class -- presumably they are attached elsewhere; confirm, otherwise
        this method fails with AttributeError.
        """
        snapshot = self._get_activity_data(account_id)
        if not self.ai_monitor.detect_unusual_activity(snapshot):
            return
        self._notify_unusual_activity(account_id)
class CryptoService:
    """Fernet encryption, bcrypt password hashing and JWT issuing helpers."""

    def __init__(self, key: str):
        """Build the Fernet cipher from *key*.

        A ``str`` key is encoded to bytes first; any other type is handed to
        ``Fernet`` unchanged.
        """
        self.fernet = Fernet(key.encode() if isinstance(key, str) else key)

    def encrypt(self, data: str) -> bytes:
        """Encrypt the text *data* and return the Fernet token."""
        return self.fernet.encrypt(data.encode())

    def decrypt(self, token: bytes) -> str:
        """Decrypt a Fernet *token* and return the decoded text."""
        return self.fernet.decrypt(token).decode()

    def hash_password(self, password: str) -> bytes:
        """Hash *password* with bcrypt using a freshly generated salt."""
        return bcrypt.hashpw(password.encode(), bcrypt.gensalt())

    def verify_password(self, password: str, hashed: bytes) -> bool:
        """Return whether *password* matches the bcrypt hash *hashed*."""
        return bcrypt.checkpw(password.encode(), hashed)

    def generate_token(self, data: dict, expires_delta: timedelta) -> str:
        """Encode *data* plus an ``exp`` claim as an HS256-signed JWT.

        The expiry is ``datetime.utcnow() + expires_delta``. The signing key
        is read from the ``JWT_SECRET`` environment variable on every call.

        The caller's *data* dict is left untouched: the ``exp`` claim is added
        to a copy, so a dict reused across calls does not silently pick up
        (or carry over) an expiry.

        NOTE(review): if ``JWT_SECRET`` is unset, ``None`` is passed as the
        key -- presumably ``jwt.encode`` rejects that; confirm the deployment
        always sets it.
        """
        expire = datetime.utcnow() + expires_delta
        payload = {**data, "exp": expire}
        return jwt.encode(payload, os.getenv("JWT_SECRET"), algorithm="HS256")
class EmailService:
    """Sends HTML e-mail through an SMTP server configured by environment variables."""

    def __init__(self):
        """Read the SMTP settings from the environment.

        ``SMTP_SERVER`` defaults to ``smtp.gmail.com`` and ``SMTP_PORT`` to
        ``587``; ``SMTP_USERNAME``, ``SMTP_PASSWORD`` and ``SENDER_EMAIL``
        have no default and are None when unset.
        """
        env = os.getenv
        self.smtp_server = env("SMTP_SERVER", "smtp.gmail.com")
        self.smtp_port = int(env("SMTP_PORT", "587"))
        self.username = env("SMTP_USERNAME")
        self.password = env("SMTP_PASSWORD")
        self.sender = env("SENDER_EMAIL")

    def send_email(self, to_email: str, subject: str, body: str) -> bool:
        """Send *body* as an HTML message to *to_email*.

        Returns True once the message has been handed to the server and False
        if connecting, STARTTLS, login or sending raised; the failure is
        logged rather than propagated.
        """
        message = MIMEText(body, 'html')
        headers = (
            ('Subject', subject),
            ('From', self.sender),
            ('To', to_email),
        )
        for header_name, header_value in headers:
            message[header_name] = header_value

        try:
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as connection:
                # Upgrade to TLS before the credentials go over the wire.
                connection.starttls()
                connection.login(self.username, self.password)
                connection.send_message(message)
        except Exception as e:
            logging.error(f"Failed to send email: {str(e)}")
            return False
        else:
            return True
class MonitoringService:
    """Best-effort social-profile probes plus the Prometheus metrics counting them.

    Each probe is a single unauthenticated HTTP GET of the public profile URL
    with a 10-second timeout. Any failure is logged and reported as a result
    dict with ``status == 'error'`` instead of being raised.

    Only ``account_checks`` is updated by this class; ``login_attempts`` and
    ``response_time`` are created here but not touched by any method shown.
    """

    # Metric objects shared by every instance. prometheus_client's default
    # registry rejects a second metric registered under an existing name, so
    # they must be created once per process, not once per instance.
    _metrics = None

    def __init__(self):
        """Attach the shared metric objects, creating them on first use."""
        # ``__class__`` is the class being defined here, regardless of what the
        # module-level name ``MonitoringService`` is bound to (this file binds
        # that name more than once).
        owner = __class__
        if owner._metrics is None:
            owner._metrics = (
                Counter('login_attempts_total', 'Total login attempts'),
                Counter('account_checks_total', 'Total account activity checks'),
                Histogram('response_time_seconds', 'Response time in seconds'),
            )
        self.login_attempts, self.account_checks, self.response_time = owner._metrics

    @staticmethod
    def _profile_url(base_url: str, username: str) -> str:
        """Return ``base_url/username`` with the username percent-encoded.

        Encoding keeps characters such as ``/``, ``?`` and ``#`` in a username
        from changing which path is requested. Letters, digits and ``_.-~``
        pass through unchanged.
        """
        from urllib.parse import quote

        return f"{base_url}/{quote(str(username), safe='')}"

    @staticmethod
    def _error_result() -> dict:
        """Return the result dict reported when a probe fails."""
        return {'exists': False, 'last_activity': None, 'status': 'error'}

    def check_facebook_activity(self, username: str) -> dict:
        """Probe the public Facebook page for *username*.

        Returns a dict with keys ``exists``, ``last_activity`` and ``status``.
        The profile counts as existing unless the page text contains
        "Page Not Found". ``last_activity`` is the time of this check (not the
        post's own timestamp) when an element with class ``timestampContent``
        is found, otherwise None.
        """
        self.account_checks.inc()
        try:
            url = self._profile_url("https://www.facebook.com", username)
            response = requests.get(url, timeout=10)
            soup = BeautifulSoup(response.text, 'html.parser')
            profile_exists = "Page Not Found" not in soup.text
            last_post = soup.find(class_="timestampContent")
            return {
                'exists': profile_exists,
                'last_activity': datetime.now().isoformat() if last_post else None,
                'status': 'active' if profile_exists else 'inactive',
            }
        except Exception as e:
            logging.error(f"Facebook check error: {str(e)}")
            return self._error_result()

    def check_twitter_activity(self, username: str) -> dict:
        """Probe the public Twitter page for *username*.

        Returns a dict with keys ``exists``, ``last_activity`` and ``status``.
        An HTTP 200 response is treated as "exists and active", with
        ``last_activity`` set to the time of this check; any other status code
        yields ``exists`` False, ``last_activity`` None and status 'inactive'.
        """
        self.account_checks.inc()
        try:
            url = self._profile_url("https://twitter.com", username)
            response = requests.get(url, timeout=10)
            found = response.status_code == 200
            return {
                'exists': found,
                'last_activity': datetime.now().isoformat() if found else None,
                'status': 'active' if found else 'inactive',
            }
        except Exception as e:
            logging.error(f"Twitter check error: {str(e)}")
            return self._error_result()
class BackupService:
    """Registers a daily SQLite backup of ``db_path`` and polls for it on a daemon thread."""

    def __init__(self, db_path: str):
        """Remember *db_path* and immediately start the background scheduler."""
        self.db_path = db_path
        self._setup_scheduler()

    def _setup_scheduler(self):
        """Register the 00:00 backup job and start the polling thread."""
        schedule.every().day.at("00:00").do(self._backup_database)
        # Daemon thread: it must not keep the process alive at shutdown.
        thread = threading.Thread(target=self._run_scheduler, daemon=True)
        thread.start()

    def _run_scheduler(self):
        """Poll the ``schedule`` module for due jobs once an hour, forever.

        NOTE(review): with an hourly poll the 00:00 job presumably fires on
        the first poll after midnight, i.e. up to an hour late -- confirm
        that is acceptable.
        """
        while True:
            schedule.run_pending()
            time.sleep(3600)

    def _backup_database(self):
        """Copy the database into ``backups/digital_heir_<YYYYmmdd_HHMM>.db``.

        The ``backups`` directory is relative to the current working
        directory and is created on demand. Failures are logged, never raised.
        """
        # Local import keeps this method self-contained within the module.
        from contextlib import closing

        backup_dir = "backups"
        backup_path = os.path.join(
            backup_dir,
            f"digital_heir_{datetime.now().strftime('%Y%m%d_%H%M')}.db"
        )
        try:
            os.makedirs(backup_dir, exist_ok=True)
            # A sqlite3 connection used directly as a context manager only
            # commits or rolls back; closing() is what releases the handles.
            with closing(sqlite3.connect(self.db_path)) as src, \
                    closing(sqlite3.connect(backup_path)) as dst:
                src.backup(dst)
            logging.info(f"Database backup created: {backup_path}")
        except Exception as e:
            logging.error(f"Backup failed: {str(e)}")
class ActivityMonitor:
    """Re-checks every active social account every 12 hours on a daemon thread.

    NOTE(review): the checks run on the scheduler thread, so ``db_conn``
    presumably has to be a connection that may be used outside the thread
    that created it -- confirm against the caller.
    """

    def __init__(self, db_conn: sqlite3.Connection, monitoring_service: "MonitoringService"):
        """Store the connection and the probe service, then start the scheduler.

        *monitoring_service* must provide ``check_facebook_activity`` and
        ``check_twitter_activity``. The annotation is a string so this class
        does not depend on where ``MonitoringService`` is defined.
        """
        self.db = db_conn
        self.monitor = monitoring_service
        self._setup_scheduler()

    def _setup_scheduler(self):
        """Register the 12-hourly job and start the polling thread."""
        schedule.every(12).hours.do(self._check_accounts)
        # Daemon thread: it must not keep the process alive at shutdown.
        thread = threading.Thread(target=self._run_scheduler, daemon=True)
        thread.start()

    def _run_scheduler(self):
        """Poll the ``schedule`` module for due jobs once an hour, forever."""
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                # A failing job must not end this thread, otherwise every
                # later check would silently stop running.
                logging.error(f"Scheduled job failed: {str(e)}")
            time.sleep(3600)

    def _check_accounts(self):
        """Probe each account whose status is 'active' and store the outcome.

        A failure on one account is logged and the remaining accounts are
        still processed.

        NOTE(review): ``inactivity_threshold`` is selected but not used here.
        NOTE(review): the stored status can become 'inactive' or 'error',
        and such rows no longer match ``status = 'active'`` on later runs --
        confirm a transient probe error is meant to stop monitoring.
        """
        cursor = self.db.cursor()
        cursor.execute("""
            SELECT id, platform, username, inactivity_threshold
            FROM social_accounts
            WHERE status = 'active'
        """)
        for account_id, platform, username, _threshold in cursor.fetchall():
            try:
                activity = self._check_platform_activity(platform, username)
                self._update_account_status(account_id, activity)
            except Exception as e:
                logging.error(f"Account check failed for {account_id}: {str(e)}")

    def _check_platform_activity(self, platform: str, username: str) -> dict:
        """Dispatch to the probe for *platform* (matched case-insensitively).

        Platforms other than 'facebook' and 'twitter' yield the same
        'error' result dict the probes use for failures.
        """
        platform_key = platform.lower()
        if platform_key == 'facebook':
            return self.monitor.check_facebook_activity(username)
        if platform_key == 'twitter':
            return self.monitor.check_twitter_activity(username)
        return {'exists': False, 'last_activity': None, 'status': 'error'}

    def _update_account_status(self, account_id: str, activity: dict):
        """Write the check time, ``last_activity`` and ``status`` for one account and commit."""
        cursor = self.db.cursor()
        cursor.execute("""
            UPDATE social_accounts
            SET last_check = ?, last_activity = ?, status = ?
            WHERE id = ?
        """, (
            datetime.now().isoformat(),
            activity['last_activity'],
            activity['status'],
            account_id,
        ))
        self.db.commit()