""" ╔══════════════════════════════════════════════════════════════════════════════╗ ║ ║ ║ AUTHENTICATION PLUGIN for NoahsKI ║ ║ Email Verification System ║ ║ ║ ║ Features: ║ ║ ✓ Email Registration & Verification ║ ║ ✓ Verification Code via Email ║ ║ ✓ Session Management ║ ║ ✓ User Data Storage (JSON) ║ ║ ✓ Password Hashing (bcrypt) ║ ║ ✓ Token-based Authentication ║ ║ ║ ╚══════════════════════════════════════════════════════════════════════════════╝ """ import os import json import time import secrets import hashlib import smtplib import tempfile from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from pathlib import Path from typing import Dict, Optional, Tuple from dataclasses import dataclass, asdict from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) # ═══════════════════════════════════════════════════════════════════════════════ # CONFIGURATION # ═══════════════════════════════════════════════════════════════════════════════ class AuthConfig: """Authentication plugin configuration""" # Detect Hugging Face Spaces IS_HF_SPACE = os.getenv('SPACE_ID') is not None # Paths (HF Spaces compatible) if IS_HF_SPACE: BASE_DATA_DIR = Path(tempfile.gettempdir()) / 'noahski_data' else: BASE_DATA_DIR = Path('noahski_data') DATA_DIR = BASE_DATA_DIR / 'auth' USERS_FILE = DATA_DIR / 'users.json' SESSIONS_FILE = DATA_DIR / 'sessions.json' VERIFICATION_FILE = DATA_DIR / 'verifications.json' # Email Settings (SMTP) SMTP_SERVER = os.getenv('SMTP_SERVER', 'smtp.gmail.com') SMTP_PORT = int(os.getenv('SMTP_PORT', 587)) SMTP_USER = os.getenv('SMTP_USER', 'your-email@gmail.com') SMTP_PASSWORD = os.getenv('SMTP_PASSWORD', 'your-app-password') FROM_EMAIL = os.getenv('FROM_EMAIL', 'noreply@noahski.ai') FROM_NAME = os.getenv('FROM_NAME', 'NoahsKI') # Security VERIFICATION_CODE_LENGTH = 6 VERIFICATION_EXPIRY_MINUTES = 15 SESSION_EXPIRY_HOURS = 24 MAX_LOGIN_ATTEMPTS = 5 LOCKOUT_DURATION_MINUTES = 30 # Token TOKEN_LENGTH = 32 # ═══════════════════════════════════════════════════════════════════════════════ # DATA CLASSES # ═══════════════════════════════════════════════════════════════════════════════ @dataclass class User: """User data structure""" email: str password_hash: str created_at: float verified: bool = False last_login: Optional[float] = None login_attempts: int = 0 locked_until: Optional[float] = None username: Optional[str] = None def to_dict(self) -> Dict: return asdict(self) @staticmethod def from_dict(data: Dict) -> 'User': return User(**data) @dataclass class Session: """Session data structure""" token: str email: str created_at: float expires_at: float ip_address: Optional[str] = None user_agent: Optional[str] = None def is_valid(self) -> bool: return time.time() < self.expires_at def to_dict(self) -> Dict: return asdict(self) @staticmethod def from_dict(data: Dict) -> 'Session': return Session(**data) @dataclass class VerificationCode: """Email verification code""" email: str code: str created_at: float expires_at: float attempts: int = 0 def is_valid(self) -> bool: return time.time() < self.expires_at and self.attempts < 3 def to_dict(self) -> Dict: return asdict(self) @staticmethod def from_dict(data: Dict) -> 'VerificationCode': return VerificationCode(**data) # ═══════════════════════════════════════════════════════════════════════════════ # EMAIL SERVICE # ═══════════════════════════════════════════════════════════════════════════════ class EmailService: """Handle email sending""" def __init__(self): self.smtp_server = AuthConfig.SMTP_SERVER self.smtp_port = AuthConfig.SMTP_PORT self.smtp_user = AuthConfig.SMTP_USER self.smtp_password = AuthConfig.SMTP_PASSWORD self.from_email = AuthConfig.FROM_EMAIL self.from_name = AuthConfig.FROM_NAME def send_verification_email(self, to_email: str, code: str) -> bool: """Send verification code email""" try: subject = f"🔐 Dein NoahsKI Verifizierungscode" html_body = f"""

🚀 Willkommen bei NoahsKI!

Danke für deine Registrierung. Hier ist dein Verifizierungscode:

{code}

Dieser Code ist 15 Minuten gültig.

⚠️ Wichtig: Falls du diese Email nicht angefordert hast, ignoriere sie einfach.

""" plain_body = f""" 🚀 Willkommen bei NoahsKI! Danke für deine Registrierung. Hier ist dein Verifizierungscode: {code} Dieser Code ist 15 Minuten gültig. ⚠️ Falls du diese Email nicht angefordert hast, ignoriere sie einfach. © 2026 NoahsKI """ return self._send_email(to_email, subject, html_body, plain_body) except Exception as e: logger.error(f"Failed to send verification email: {e}") return False def send_welcome_email(self, to_email: str, username: str = None) -> bool: """Send welcome email after successful verification""" try: subject = "🎉 Account erfolgreich verifiziert!" name = username or to_email.split('@')[0] html_body = f"""

🎉 Willkommen bei NoahsKI, {name}!

Dein Account wurde erfolgreich verifiziert und ist jetzt aktiv!

Was du jetzt tun kannst:

✅ Mit NoahsKI chatten
🎨 Bilder generieren
🌐 Web-Recherche nutzen
📚 Autonomes Learning System
💡 Code-Generierung

Viel Spaß mit NoahsKI!

""" plain_body = f""" 🎉 Willkommen bei NoahsKI, {name}! Dein Account wurde erfolgreich verifiziert und ist jetzt aktiv! Was du jetzt tun kannst: ✅ Mit NoahsKI chatten 🎨 Bilder generieren 🌐 Web-Recherche nutzen 📚 Autonomes Learning System 💡 Code-Generierung Viel Spaß mit NoahsKI! © 2026 NoahsKI """ return self._send_email(to_email, subject, html_body, plain_body) except Exception as e: logger.error(f"Failed to send welcome email: {e}") return False def _send_email(self, to_email: str, subject: str, html_body: str, plain_body: str) -> bool: """Internal method to send email via SMTP""" try: # Create message msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = f"{self.from_name} <{self.from_email}>" msg['To'] = to_email # Attach both plain and HTML versions part1 = MIMEText(plain_body, 'plain', 'utf-8') part2 = MIMEText(html_body, 'html', 'utf-8') msg.attach(part1) msg.attach(part2) # Send email with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: server.starttls() server.login(self.smtp_user, self.smtp_password) server.send_message(msg) logger.info(f"✓ Email sent to {to_email}") return True except Exception as e: logger.error(f"SMTP error: {e}") return False # ═══════════════════════════════════════════════════════════════════════════════ # AUTH PLUGIN MAIN CLASS # ═══════════════════════════════════════════════════════════════════════════════ class AuthPlugin: """Main authentication plugin""" def __init__(self): logger.info("🔐 Initializing Authentication Plugin...") # Create directories AuthConfig.DATA_DIR.mkdir(parents=True, exist_ok=True) # Initialize services self.email_service = EmailService() # Load data self.users: Dict[str, User] = self._load_users() self.sessions: Dict[str, Session] = self._load_sessions() self.verifications: Dict[str, VerificationCode] = self._load_verifications() logger.info(f"✅ Auth Plugin initialized ({len(self.users)} users)") # ───────────────────────────────────────────────────────────────────────── # DATA PERSISTENCE # ───────────────────────────────────────────────────────────────────────── def _load_users(self) -> Dict[str, User]: """Load users from JSON""" try: if AuthConfig.USERS_FILE.exists(): with open(AuthConfig.USERS_FILE, 'r', encoding='utf-8') as f: data = json.load(f) return {email: User.from_dict(u) for email, u in data.items()} except Exception as e: logger.error(f"Failed to load users: {e}") return {} def _save_users(self): """Save users to JSON""" try: # Ensure directory exists AuthConfig.DATA_DIR.mkdir(parents=True, exist_ok=True) data = {email: u.to_dict() for email, u in self.users.items()} with open(AuthConfig.USERS_FILE, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) except Exception as e: logger.warning(f"⚠️ Failed to save users: {e}") def _load_sessions(self) -> Dict[str, Session]: """Load sessions from JSON""" try: if AuthConfig.SESSIONS_FILE.exists(): with open(AuthConfig.SESSIONS_FILE, 'r', encoding='utf-8') as f: data = json.load(f) sessions = {token: Session.from_dict(s) for token, s in data.items()} # Clean expired sessions return {t: s for t, s in sessions.items() if s.is_valid()} except Exception as e: logger.error(f"Failed to load sessions: {e}") return {} def _save_sessions(self): """Save sessions to JSON""" try: # Ensure directory exists AuthConfig.DATA_DIR.mkdir(parents=True, exist_ok=True) # Only save valid sessions valid_sessions = {t: s for t, s in self.sessions.items() if s.is_valid()} data = {token: s.to_dict() for token, s in valid_sessions.items()} with open(AuthConfig.SESSIONS_FILE, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) except Exception as e: logger.warning(f"⚠️ Failed to save sessions: {e}") def _load_verifications(self) -> Dict[str, VerificationCode]: """Load verification codes from JSON""" try: if AuthConfig.VERIFICATION_FILE.exists(): with open(AuthConfig.VERIFICATION_FILE, 'r', encoding='utf-8') as f: data = json.load(f) codes = {email: VerificationCode.from_dict(v) for email, v in data.items()} # Clean expired codes return {e: v for e, v in codes.items() if v.is_valid()} except Exception as e: logger.error(f"Failed to load verifications: {e}") return {} def _save_verifications(self): """Save verification codes to JSON""" try: # Ensure directory exists AuthConfig.DATA_DIR.mkdir(parents=True, exist_ok=True) # Only save valid codes valid_codes = {e: v for e, v in self.verifications.items() if v.is_valid()} data = {email: v.to_dict() for email, v in valid_codes.items()} with open(AuthConfig.VERIFICATION_FILE, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) except Exception as e: logger.warning(f"⚠️ Failed to save verifications: {e}") # ───────────────────────────────────────────────────────────────────────── # PASSWORD HASHING # ───────────────────────────────────────────────────────────────────────── def _hash_password(self, password: str) -> str: """Hash password with salt""" salt = secrets.token_hex(16) pwd_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000) return f"{salt}${pwd_hash.hex()}" def _verify_password(self, password: str, password_hash: str) -> bool: """Verify password against hash""" try: salt, pwd_hash = password_hash.split('$') computed_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000) return computed_hash.hex() == pwd_hash except: return False # ───────────────────────────────────────────────────────────────────────── # VERIFICATION CODE GENERATION # ───────────────────────────────────────────────────────────────────────── def _generate_verification_code(self) -> str: """Generate 6-digit verification code""" return ''.join([str(secrets.randbelow(10)) for _ in range(AuthConfig.VERIFICATION_CODE_LENGTH)]) # ───────────────────────────────────────────────────────────────────────── # PUBLIC API METHODS # ───────────────────────────────────────────────────────────────────────── def register(self, email: str, password: str, username: str = None) -> Tuple[bool, str]: """ Register new user and send verification email Returns: (success, message) """ try: # Validate email format if '@' not in email or '.' not in email.split('@')[1]: return False, "Ungültige Email-Adresse" # Check if user exists if email in self.users: return False, "Email bereits registriert" # Validate password if len(password) < 8: return False, "Passwort muss mindestens 8 Zeichen lang sein" # Create user (unverified) user = User( email=email, password_hash=self._hash_password(password), created_at=time.time(), verified=False, username=username ) self.users[email] = user self._save_users() # Generate verification code code = self._generate_verification_code() expires_at = time.time() + (AuthConfig.VERIFICATION_EXPIRY_MINUTES * 60) verification = VerificationCode( email=email, code=code, created_at=time.time(), expires_at=expires_at ) self.verifications[email] = verification self._save_verifications() # Send verification email email_sent = self.email_service.send_verification_email(email, code) if email_sent: logger.info(f"✓ User registered: {email}") return True, f"Registrierung erfolgreich! Verifizierungscode wurde an {email} gesendet." else: return True, "Registrierung erfolgreich, aber Email konnte nicht gesendet werden. Bitte kontaktiere den Support." except Exception as e: logger.error(f"Registration error: {e}") return False, f"Fehler bei der Registrierung: {str(e)}" def verify_email(self, email: str, code: str) -> Tuple[bool, str]: """ Verify email with code Returns: (success, message) """ try: # Check if verification exists if email not in self.verifications: return False, "Kein Verifizierungscode gefunden" verification = self.verifications[email] # Check if expired if not verification.is_valid(): return False, "Verifizierungscode abgelaufen oder zu viele Versuche" # Increment attempts verification.attempts += 1 self._save_verifications() # Check code if verification.code != code: return False, f"Falscher Code (Versuch {verification.attempts}/3)" # Mark user as verified if email in self.users: self.users[email].verified = True self._save_users() # Remove verification code del self.verifications[email] self._save_verifications() # Send welcome email self.email_service.send_welcome_email(email, self.users[email].username) logger.info(f"✓ Email verified: {email}") return True, "Email erfolgreich verifiziert! Du kannst dich jetzt anmelden." else: return False, "User nicht gefunden" except Exception as e: logger.error(f"Verification error: {e}") return False, f"Fehler bei der Verifizierung: {str(e)}" def resend_verification(self, email: str) -> Tuple[bool, str]: """Resend verification code""" try: if email not in self.users: return False, "Email nicht registriert" if self.users[email].verified: return False, "Email bereits verifiziert" # Generate new code code = self._generate_verification_code() expires_at = time.time() + (AuthConfig.VERIFICATION_EXPIRY_MINUTES * 60) verification = VerificationCode( email=email, code=code, created_at=time.time(), expires_at=expires_at ) self.verifications[email] = verification self._save_verifications() # Send email email_sent = self.email_service.send_verification_email(email, code) if email_sent: return True, "Neuer Verifizierungscode wurde gesendet" else: return False, "Email konnte nicht gesendet werden" except Exception as e: logger.error(f"Resend error: {e}") return False, f"Fehler: {str(e)}" def login(self, email: str, password: str, ip_address: str = None, user_agent: str = None) -> Tuple[bool, str, Optional[str]]: """ Login user Returns: (success, message, token) """ try: # Check if user exists if email not in self.users: return False, "Email oder Passwort falsch", None user = self.users[email] # Check if account is locked if user.locked_until and time.time() < user.locked_until: remaining = int((user.locked_until - time.time()) / 60) return False, f"Account gesperrt. Versuche es in {remaining} Minuten erneut.", None # Check if verified if not user.verified: return False, "Email noch nicht verifiziert", None # Verify password if not self._verify_password(password, user.password_hash): user.login_attempts += 1 # Lock account after max attempts if user.login_attempts >= AuthConfig.MAX_LOGIN_ATTEMPTS: user.locked_until = time.time() + (AuthConfig.LOCKOUT_DURATION_MINUTES * 60) self._save_users() return False, f"Zu viele fehlgeschlagene Versuche. Account für {AuthConfig.LOCKOUT_DURATION_MINUTES} Minuten gesperrt.", None self._save_users() remaining = AuthConfig.MAX_LOGIN_ATTEMPTS - user.login_attempts return False, f"Email oder Passwort falsch ({remaining} Versuche übrig)", None # Reset login attempts on successful login user.login_attempts = 0 user.last_login = time.time() user.locked_until = None self._save_users() # Create session token = secrets.token_urlsafe(AuthConfig.TOKEN_LENGTH) expires_at = time.time() + (AuthConfig.SESSION_EXPIRY_HOURS * 3600) session = Session( token=token, email=email, created_at=time.time(), expires_at=expires_at, ip_address=ip_address, user_agent=user_agent ) self.sessions[token] = session self._save_sessions() logger.info(f"✓ User logged in: {email}") return True, "Login erfolgreich", token except Exception as e: logger.error(f"Login error: {e}") return False, f"Fehler beim Login: {str(e)}", None def logout(self, token: str) -> Tuple[bool, str]: """Logout user""" try: if token in self.sessions: email = self.sessions[token].email del self.sessions[token] self._save_sessions() logger.info(f"✓ User logged out: {email}") return True, "Logout erfolgreich" else: return False, "Session nicht gefunden" except Exception as e: logger.error(f"Logout error: {e}") return False, f"Fehler beim Logout: {str(e)}" def validate_token(self, token: str) -> Tuple[bool, Optional[str]]: """ Validate session token Returns: (valid, email) """ if token in self.sessions: session = self.sessions[token] if session.is_valid(): return True, session.email else: # Clean up expired session del self.sessions[token] self._save_sessions() return False, None def get_user_info(self, email: str) -> Optional[Dict]: """Get user information (without sensitive data)""" if email in self.users: user = self.users[email] return { 'email': user.email, 'username': user.username, 'created_at': user.created_at, 'verified': user.verified, 'last_login': user.last_login } return None def get_stats(self) -> Dict: """Get authentication statistics""" return { 'total_users': len(self.users), 'verified_users': sum(1 for u in self.users.values() if u.verified), 'active_sessions': len([s for s in self.sessions.values() if s.is_valid()]), 'pending_verifications': len([v for v in self.verifications.values() if v.is_valid()]) } # ═══════════════════════════════════════════════════════════════════════════════ # PLUGIN METADATA # ═══════════════════════════════════════════════════════════════════════════════ PLUGIN_INFO = { 'name': 'auth_plugin', 'version': '1.0.0', 'author': 'NoahsKI Team', 'description': 'Email-based authentication with verification', 'endpoints': [ '/auth/register', '/auth/verify', '/auth/resend', '/auth/login', '/auth/logout', '/auth/validate', '/auth/user', '/auth/stats' ] } __all__ = ['AuthPlugin', 'AuthConfig', 'PLUGIN_INFO']