"""
╔══════════════════════════════════════════════════════════════════════════════╗
║                                                                              ║
║  AUTHENTICATION PLUGIN for NoahsKI                                           ║
║  Email Verification System                                                   ║
║                                                                              ║
║  Features:                                                                   ║
║  ✓ Email Registration & Verification                                         ║
║  ✓ Verification Code via Email                                               ║
║  ✓ Session Management                                                        ║
║  ✓ User Data Storage (JSON)                                                  ║
║  ✓ Password Hashing (PBKDF2-HMAC-SHA256)                                     ║
║  ✓ Token-based Authentication                                                ║
║                                                                              ║
╚══════════════════════════════════════════════════════════════════════════════╝
"""
|
| |
|
| | import os
|
| | import json
|
| | import time
|
| | import secrets
|
| | import hashlib
|
| | import smtplib
|
| | import tempfile
|
| | from email.mime.text import MIMEText
|
| | from email.mime.multipart import MIMEMultipart
|
| | from pathlib import Path
|
| | from typing import Dict, Optional, Tuple
|
| | from dataclasses import dataclass, asdict
|
| | from datetime import datetime, timedelta
|
| | import logging
|
| |
|
| | logger = logging.getLogger(__name__)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
class AuthConfig:
    """Static settings for the authentication plugin.

    All values are class attributes evaluated once, when the class body runs.
    SMTP settings are read from environment variables with fallbacks.
    """

    # True when the SPACE_ID environment variable is set
    # (presumably a Hugging Face Space deployment -- confirm).
    IS_HF_SPACE = 'SPACE_ID' in os.environ

    # With SPACE_ID set, data goes under the system temp directory;
    # otherwise it goes in a directory relative to the working directory.
    BASE_DATA_DIR = (
        Path(tempfile.gettempdir()) / 'noahski_data'
        if IS_HF_SPACE
        else Path('noahski_data')
    )

    # JSON stores used by the plugin.
    DATA_DIR = BASE_DATA_DIR / 'auth'
    USERS_FILE = DATA_DIR / 'users.json'
    SESSIONS_FILE = DATA_DIR / 'sessions.json'
    VERIFICATION_FILE = DATA_DIR / 'verifications.json'

    # Outgoing mail settings.
    SMTP_SERVER = os.environ.get('SMTP_SERVER', 'smtp.gmail.com')
    SMTP_PORT = int(os.environ.get('SMTP_PORT', 587))
    SMTP_USER = os.environ.get('SMTP_USER', 'your-email@gmail.com')
    SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your-app-password')
    FROM_EMAIL = os.environ.get('FROM_EMAIL', 'noreply@noahski.ai')
    FROM_NAME = os.environ.get('FROM_NAME', 'NoahsKI')

    # Verification, session and lockout policy.
    VERIFICATION_CODE_LENGTH = 6
    VERIFICATION_EXPIRY_MINUTES = 15
    SESSION_EXPIRY_HOURS = 24
    MAX_LOGIN_ATTEMPTS = 5
    LOCKOUT_DURATION_MINUTES = 30

    # Size argument passed to secrets.token_urlsafe for session tokens.
    TOKEN_LENGTH = 32
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
@dataclass
class User:
    """One registered account, as stored in the users JSON file."""

    email: str
    # Stored as "<salt>$<hex digest>".
    password_hash: str
    # Unix timestamp of registration.
    created_at: float
    verified: bool = False
    # Unix timestamp of the most recent successful login, if any.
    last_login: Optional[float] = None
    # Consecutive failed password attempts.
    login_attempts: int = 0
    # Unix timestamp until which logins are refused, if locked.
    locked_until: Optional[float] = None
    username: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return every field of this account as a plain dictionary."""
        fields_as_dict = asdict(self)
        return fields_as_dict

    @staticmethod
    def from_dict(data: Dict) -> 'User':
        """Build a ``User`` from a mapping whose keys are the field names."""
        kwargs = dict(data)
        return User(**kwargs)
|
| |
|
| |
|
@dataclass
class Session:
    """One login session, keyed by its token."""

    token: str
    email: str
    # Unix timestamps bounding the session's lifetime.
    created_at: float
    expires_at: float
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None

    def is_valid(self) -> bool:
        """Return True while the current time is before ``expires_at``."""
        now = time.time()
        return now < self.expires_at

    def to_dict(self) -> Dict:
        """Return every field of this session as a plain dictionary."""
        fields_as_dict = asdict(self)
        return fields_as_dict

    @staticmethod
    def from_dict(data: Dict) -> 'Session':
        """Build a ``Session`` from a mapping whose keys are the field names."""
        kwargs = dict(data)
        return Session(**kwargs)
|
| |
|
| |
|
@dataclass
class VerificationCode:
    """A pending email verification code for one address."""

    email: str
    code: str
    # Unix timestamps bounding the code's lifetime.
    created_at: float
    expires_at: float
    # Number of verification attempts made against this code.
    attempts: int = 0

    def is_valid(self) -> bool:
        """Return True while the code is unexpired and has fewer than 3 attempts."""
        if self.attempts >= 3:
            return False
        return time.time() < self.expires_at

    def to_dict(self) -> Dict:
        """Return every field of this code as a plain dictionary."""
        fields_as_dict = asdict(self)
        return fields_as_dict

    @staticmethod
    def from_dict(data: Dict) -> 'VerificationCode':
        """Build a ``VerificationCode`` from a mapping keyed by field name."""
        kwargs = dict(data)
        return VerificationCode(**kwargs)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
class EmailService:
    """Compose and deliver the plugin's emails over SMTP.

    Connection settings, sender identity and the verification-code lifetime
    are copied from ``AuthConfig`` when the service is constructed.
    """

    # Seconds to wait on the SMTP socket; without a timeout an unreachable
    # server would block the calling request indefinitely.
    SMTP_TIMEOUT_SECONDS = 30

    def __init__(self):
        self.smtp_server = AuthConfig.SMTP_SERVER
        self.smtp_port = AuthConfig.SMTP_PORT
        self.smtp_user = AuthConfig.SMTP_USER
        self.smtp_password = AuthConfig.SMTP_PASSWORD
        self.from_email = AuthConfig.FROM_EMAIL
        self.from_name = AuthConfig.FROM_NAME
        # Shown in the verification email so the text follows the config.
        self.verification_expiry_minutes = AuthConfig.VERIFICATION_EXPIRY_MINUTES

    def send_verification_email(self, to_email: str, code: str) -> bool:
        """Send the verification code to ``to_email``.

        Args:
            to_email: Recipient address.
            code: Verification code to display in the message.

        Returns:
            True if the message was handed to the SMTP server, False on any
            failure (the error is logged, never raised).
        """
        from html import escape

        try:
            subject = "🔐 Dein NoahsKI Verifizierungscode"
            minutes = self.verification_expiry_minutes

            html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <style>
        body {{ font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px; }}
        .container {{ background-color: white; padding: 30px; border-radius: 10px; max-width: 600px; margin: 0 auto; }}
        .code {{ font-size: 32px; font-weight: bold; color: #4CAF50; text-align: center; padding: 20px; background-color: #f0f0f0; border-radius: 5px; letter-spacing: 5px; }}
        .header {{ color: #333; text-align: center; }}
        .footer {{ color: #666; font-size: 12px; text-align: center; margin-top: 30px; }}
    </style>
</head>
<body>
    <div class="container">
        <h1 class="header">🎉 Willkommen bei NoahsKI!</h1>
        <p>Danke für deine Registrierung. Hier ist dein Verifizierungscode:</p>

        <div class="code">{escape(str(code))}</div>

        <p>Dieser Code ist <strong>{minutes} Minuten</strong> gültig.</p>

        <p><strong>⚠️ Wichtig:</strong> Falls du diese Email nicht angefordert hast, ignoriere sie einfach.</p>

        <div class="footer">
            <p>© 2026 NoahsKI - Dein KI-Assistent</p>
            <p>Diese Email wurde automatisch generiert.</p>
        </div>
    </div>
</body>
</html>
"""

            plain_body = "\n".join([
                "🎉 Willkommen bei NoahsKI!",
                "",
                "Danke für deine Registrierung. Hier ist dein Verifizierungscode:",
                "",
                f"{code}",
                "",
                f"Dieser Code ist {minutes} Minuten gültig.",
                "",
                "⚠️ Falls du diese Email nicht angefordert hast, ignoriere sie einfach.",
                "",
                "© 2026 NoahsKI",
                "",
            ])

            return self._send_email(to_email, subject, html_body, plain_body)

        except Exception as e:
            logger.error(f"Failed to send verification email: {e}")
            return False

    def send_welcome_email(self, to_email: str, username: Optional[str] = None) -> bool:
        """Send the welcome email after a successful verification.

        Args:
            to_email: Recipient address.
            username: Display name; when empty or None the part of
                ``to_email`` before the first ``@`` is used instead.

        Returns:
            True if the message was handed to the SMTP server, False on any
            failure (the error is logged, never raised).
        """
        from html import escape

        try:
            subject = "🎉 Account erfolgreich verifiziert!"
            name = username or to_email.split('@')[0]
            # The name is user-supplied, so escape it before it reaches HTML.
            html_name = escape(name)

            html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <style>
        body {{ font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px; }}
        .container {{ background-color: white; padding: 30px; border-radius: 10px; max-width: 600px; margin: 0 auto; }}
        .header {{ color: #4CAF50; text-align: center; }}
        .feature {{ padding: 10px 0; }}
        .footer {{ color: #666; font-size: 12px; text-align: center; margin-top: 30px; }}
    </style>
</head>
<body>
    <div class="container">
        <h1 class="header">🎉 Willkommen bei NoahsKI, {html_name}!</h1>

        <p>Dein Account wurde erfolgreich verifiziert und ist jetzt aktiv!</p>

        <h3>Was du jetzt tun kannst:</h3>
        <div class="feature">✅ Mit NoahsKI chatten</div>
        <div class="feature">🎨 Bilder generieren</div>
        <div class="feature">🔍 Web-Recherche nutzen</div>
        <div class="feature">🎓 Autonomes Learning System</div>
        <div class="feature">💡 Code-Generierung</div>

        <p style="margin-top: 30px;">Viel Spaß mit NoahsKI!</p>

        <div class="footer">
            <p>© 2026 NoahsKI - Dein KI-Assistent</p>
        </div>
    </div>
</body>
</html>
"""

            plain_body = "\n".join([
                f"🎉 Willkommen bei NoahsKI, {name}!",
                "",
                "Dein Account wurde erfolgreich verifiziert und ist jetzt aktiv!",
                "",
                "Was du jetzt tun kannst:",
                "✅ Mit NoahsKI chatten",
                "🎨 Bilder generieren",
                "🔍 Web-Recherche nutzen",
                "🎓 Autonomes Learning System",
                "💡 Code-Generierung",
                "",
                "Viel Spaß mit NoahsKI!",
                "",
                "© 2026 NoahsKI",
                "",
            ])

            return self._send_email(to_email, subject, html_body, plain_body)

        except Exception as e:
            logger.error(f"Failed to send welcome email: {e}")
            return False

    def _send_email(self, to_email: str, subject: str, html_body: str, plain_body: str) -> bool:
        """Send one multipart (plain + HTML) message via SMTP with STARTTLS.

        Returns:
            True on success; False if building or sending the message raised
            (the error is logged).
        """
        try:
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = f"{self.from_name} <{self.from_email}>"
            msg['To'] = to_email

            # Plain part first, HTML second: for multipart/alternative the
            # last attached part is the preferred rendering.
            msg.attach(MIMEText(plain_body, 'plain', 'utf-8'))
            msg.attach(MIMEText(html_body, 'html', 'utf-8'))

            with smtplib.SMTP(
                self.smtp_server,
                self.smtp_port,
                timeout=self.SMTP_TIMEOUT_SECONDS,
            ) as server:
                server.starttls()
                server.login(self.smtp_user, self.smtp_password)
                server.send_message(msg)

            logger.info(f"✓ Email sent to {to_email}")
            return True

        except Exception as e:
            logger.error(f"SMTP error: {e}")
            return False
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | class AuthPlugin:
|
| | """Main authentication plugin"""
|
| |
|
| | def __init__(self):
|
| | logger.info("π Initializing Authentication Plugin...")
|
| |
|
| |
|
| | AuthConfig.DATA_DIR.mkdir(parents=True, exist_ok=True)
|
| |
|
| |
|
| | self.email_service = EmailService()
|
| |
|
| |
|
| | self.users: Dict[str, User] = self._load_users()
|
| | self.sessions: Dict[str, Session] = self._load_sessions()
|
| | self.verifications: Dict[str, VerificationCode] = self._load_verifications()
|
| |
|
| | logger.info(f"β
Auth Plugin initialized ({len(self.users)} users)")
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | def _load_users(self) -> Dict[str, User]:
|
| | """Load users from JSON"""
|
| | try:
|
| | if AuthConfig.USERS_FILE.exists():
|
| | with open(AuthConfig.USERS_FILE, 'r', encoding='utf-8') as f:
|
| | data = json.load(f)
|
| | return {email: User.from_dict(u) for email, u in data.items()}
|
| | except Exception as e:
|
| | logger.error(f"Failed to load users: {e}")
|
| | return {}
|
| |
|
| | def _save_users(self):
|
| | """Save users to JSON"""
|
| | try:
|
| |
|
| | AuthConfig.DATA_DIR.mkdir(parents=True, exist_ok=True)
|
| | data = {email: u.to_dict() for email, u in self.users.items()}
|
| | with open(AuthConfig.USERS_FILE, 'w', encoding='utf-8') as f:
|
| | json.dump(data, f, indent=2, ensure_ascii=False)
|
| | except Exception as e:
|
| | logger.warning(f"β οΈ Failed to save users: {e}")
|
| |
|
| | def _load_sessions(self) -> Dict[str, Session]:
|
| | """Load sessions from JSON"""
|
| | try:
|
| | if AuthConfig.SESSIONS_FILE.exists():
|
| | with open(AuthConfig.SESSIONS_FILE, 'r', encoding='utf-8') as f:
|
| | data = json.load(f)
|
| | sessions = {token: Session.from_dict(s) for token, s in data.items()}
|
| |
|
| | return {t: s for t, s in sessions.items() if s.is_valid()}
|
| | except Exception as e:
|
| | logger.error(f"Failed to load sessions: {e}")
|
| | return {}
|
| |
|
| | def _save_sessions(self):
|
| | """Save sessions to JSON"""
|
| | try:
|
| |
|
| | AuthConfig.DATA_DIR.mkdir(parents=True, exist_ok=True)
|
| |
|
| | valid_sessions = {t: s for t, s in self.sessions.items() if s.is_valid()}
|
| | data = {token: s.to_dict() for token, s in valid_sessions.items()}
|
| | with open(AuthConfig.SESSIONS_FILE, 'w', encoding='utf-8') as f:
|
| | json.dump(data, f, indent=2, ensure_ascii=False)
|
| | except Exception as e:
|
| | logger.warning(f"β οΈ Failed to save sessions: {e}")
|
| |
|
| | def _load_verifications(self) -> Dict[str, VerificationCode]:
|
| | """Load verification codes from JSON"""
|
| | try:
|
| | if AuthConfig.VERIFICATION_FILE.exists():
|
| | with open(AuthConfig.VERIFICATION_FILE, 'r', encoding='utf-8') as f:
|
| | data = json.load(f)
|
| | codes = {email: VerificationCode.from_dict(v) for email, v in data.items()}
|
| |
|
| | return {e: v for e, v in codes.items() if v.is_valid()}
|
| | except Exception as e:
|
| | logger.error(f"Failed to load verifications: {e}")
|
| | return {}
|
| |
|
| | def _save_verifications(self):
|
| | """Save verification codes to JSON"""
|
| | try:
|
| |
|
| | AuthConfig.DATA_DIR.mkdir(parents=True, exist_ok=True)
|
| |
|
| | valid_codes = {e: v for e, v in self.verifications.items() if v.is_valid()}
|
| | data = {email: v.to_dict() for email, v in valid_codes.items()}
|
| | with open(AuthConfig.VERIFICATION_FILE, 'w', encoding='utf-8') as f:
|
| | json.dump(data, f, indent=2, ensure_ascii=False)
|
| | except Exception as e:
|
| | logger.warning(f"β οΈ Failed to save verifications: {e}")
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | def _hash_password(self, password: str) -> str:
|
| | """Hash password with salt"""
|
| | salt = secrets.token_hex(16)
|
| | pwd_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
|
| | return f"{salt}${pwd_hash.hex()}"
|
| |
|
| | def _verify_password(self, password: str, password_hash: str) -> bool:
|
| | """Verify password against hash"""
|
| | try:
|
| | salt, pwd_hash = password_hash.split('$')
|
| | computed_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
|
| | return computed_hash.hex() == pwd_hash
|
| | except:
|
| | return False
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | def _generate_verification_code(self) -> str:
|
| | """Generate 6-digit verification code"""
|
| | return ''.join([str(secrets.randbelow(10)) for _ in range(AuthConfig.VERIFICATION_CODE_LENGTH)])
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | def register(self, email: str, password: str, username: str = None) -> Tuple[bool, str]:
|
| | """
|
| | Register new user and send verification email
|
| |
|
| | Returns: (success, message)
|
| | """
|
| | try:
|
| |
|
| | if '@' not in email or '.' not in email.split('@')[1]:
|
| | return False, "UngΓΌltige Email-Adresse"
|
| |
|
| |
|
| | if email in self.users:
|
| | return False, "Email bereits registriert"
|
| |
|
| |
|
| | if len(password) < 8:
|
| | return False, "Passwort muss mindestens 8 Zeichen lang sein"
|
| |
|
| |
|
| | user = User(
|
| | email=email,
|
| | password_hash=self._hash_password(password),
|
| | created_at=time.time(),
|
| | verified=False,
|
| | username=username
|
| | )
|
| |
|
| | self.users[email] = user
|
| | self._save_users()
|
| |
|
| |
|
| | code = self._generate_verification_code()
|
| | expires_at = time.time() + (AuthConfig.VERIFICATION_EXPIRY_MINUTES * 60)
|
| |
|
| | verification = VerificationCode(
|
| | email=email,
|
| | code=code,
|
| | created_at=time.time(),
|
| | expires_at=expires_at
|
| | )
|
| |
|
| | self.verifications[email] = verification
|
| | self._save_verifications()
|
| |
|
| |
|
| | email_sent = self.email_service.send_verification_email(email, code)
|
| |
|
| | if email_sent:
|
| | logger.info(f"β User registered: {email}")
|
| | return True, f"Registrierung erfolgreich! Verifizierungscode wurde an {email} gesendet."
|
| | else:
|
| | return True, "Registrierung erfolgreich, aber Email konnte nicht gesendet werden. Bitte kontaktiere den Support."
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Registration error: {e}")
|
| | return False, f"Fehler bei der Registrierung: {str(e)}"
|
| |
|
| | def verify_email(self, email: str, code: str) -> Tuple[bool, str]:
|
| | """
|
| | Verify email with code
|
| |
|
| | Returns: (success, message)
|
| | """
|
| | try:
|
| |
|
| | if email not in self.verifications:
|
| | return False, "Kein Verifizierungscode gefunden"
|
| |
|
| | verification = self.verifications[email]
|
| |
|
| |
|
| | if not verification.is_valid():
|
| | return False, "Verifizierungscode abgelaufen oder zu viele Versuche"
|
| |
|
| |
|
| | verification.attempts += 1
|
| | self._save_verifications()
|
| |
|
| |
|
| | if verification.code != code:
|
| | return False, f"Falscher Code (Versuch {verification.attempts}/3)"
|
| |
|
| |
|
| | if email in self.users:
|
| | self.users[email].verified = True
|
| | self._save_users()
|
| |
|
| |
|
| | del self.verifications[email]
|
| | self._save_verifications()
|
| |
|
| |
|
| | self.email_service.send_welcome_email(email, self.users[email].username)
|
| |
|
| | logger.info(f"β Email verified: {email}")
|
| | return True, "Email erfolgreich verifiziert! Du kannst dich jetzt anmelden."
|
| | else:
|
| | return False, "User nicht gefunden"
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Verification error: {e}")
|
| | return False, f"Fehler bei der Verifizierung: {str(e)}"
|
| |
|
| | def resend_verification(self, email: str) -> Tuple[bool, str]:
|
| | """Resend verification code"""
|
| | try:
|
| | if email not in self.users:
|
| | return False, "Email nicht registriert"
|
| |
|
| | if self.users[email].verified:
|
| | return False, "Email bereits verifiziert"
|
| |
|
| |
|
| | code = self._generate_verification_code()
|
| | expires_at = time.time() + (AuthConfig.VERIFICATION_EXPIRY_MINUTES * 60)
|
| |
|
| | verification = VerificationCode(
|
| | email=email,
|
| | code=code,
|
| | created_at=time.time(),
|
| | expires_at=expires_at
|
| | )
|
| |
|
| | self.verifications[email] = verification
|
| | self._save_verifications()
|
| |
|
| |
|
| | email_sent = self.email_service.send_verification_email(email, code)
|
| |
|
| | if email_sent:
|
| | return True, "Neuer Verifizierungscode wurde gesendet"
|
| | else:
|
| | return False, "Email konnte nicht gesendet werden"
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Resend error: {e}")
|
| | return False, f"Fehler: {str(e)}"
|
| |
|
| | def login(self, email: str, password: str, ip_address: str = None, user_agent: str = None) -> Tuple[bool, str, Optional[str]]:
|
| | """
|
| | Login user
|
| |
|
| | Returns: (success, message, token)
|
| | """
|
| | try:
|
| |
|
| | if email not in self.users:
|
| | return False, "Email oder Passwort falsch", None
|
| |
|
| | user = self.users[email]
|
| |
|
| |
|
| | if user.locked_until and time.time() < user.locked_until:
|
| | remaining = int((user.locked_until - time.time()) / 60)
|
| | return False, f"Account gesperrt. Versuche es in {remaining} Minuten erneut.", None
|
| |
|
| |
|
| | if not user.verified:
|
| | return False, "Email noch nicht verifiziert", None
|
| |
|
| |
|
| | if not self._verify_password(password, user.password_hash):
|
| | user.login_attempts += 1
|
| |
|
| |
|
| | if user.login_attempts >= AuthConfig.MAX_LOGIN_ATTEMPTS:
|
| | user.locked_until = time.time() + (AuthConfig.LOCKOUT_DURATION_MINUTES * 60)
|
| | self._save_users()
|
| | return False, f"Zu viele fehlgeschlagene Versuche. Account fΓΌr {AuthConfig.LOCKOUT_DURATION_MINUTES} Minuten gesperrt.", None
|
| |
|
| | self._save_users()
|
| | remaining = AuthConfig.MAX_LOGIN_ATTEMPTS - user.login_attempts
|
| | return False, f"Email oder Passwort falsch ({remaining} Versuche ΓΌbrig)", None
|
| |
|
| |
|
| | user.login_attempts = 0
|
| | user.last_login = time.time()
|
| | user.locked_until = None
|
| | self._save_users()
|
| |
|
| |
|
| | token = secrets.token_urlsafe(AuthConfig.TOKEN_LENGTH)
|
| | expires_at = time.time() + (AuthConfig.SESSION_EXPIRY_HOURS * 3600)
|
| |
|
| | session = Session(
|
| | token=token,
|
| | email=email,
|
| | created_at=time.time(),
|
| | expires_at=expires_at,
|
| | ip_address=ip_address,
|
| | user_agent=user_agent
|
| | )
|
| |
|
| | self.sessions[token] = session
|
| | self._save_sessions()
|
| |
|
| | logger.info(f"β User logged in: {email}")
|
| | return True, "Login erfolgreich", token
|
| |
|
| | except Exception as e:
|
| | logger.error(f"Login error: {e}")
|
| | return False, f"Fehler beim Login: {str(e)}", None
|
| |
|
| | def logout(self, token: str) -> Tuple[bool, str]:
|
| | """Logout user"""
|
| | try:
|
| | if token in self.sessions:
|
| | email = self.sessions[token].email
|
| | del self.sessions[token]
|
| | self._save_sessions()
|
| | logger.info(f"β User logged out: {email}")
|
| | return True, "Logout erfolgreich"
|
| | else:
|
| | return False, "Session nicht gefunden"
|
| | except Exception as e:
|
| | logger.error(f"Logout error: {e}")
|
| | return False, f"Fehler beim Logout: {str(e)}"
|
| |
|
| | def validate_token(self, token: str) -> Tuple[bool, Optional[str]]:
|
| | """
|
| | Validate session token
|
| |
|
| | Returns: (valid, email)
|
| | """
|
| | if token in self.sessions:
|
| | session = self.sessions[token]
|
| | if session.is_valid():
|
| | return True, session.email
|
| | else:
|
| |
|
| | del self.sessions[token]
|
| | self._save_sessions()
|
| |
|
| | return False, None
|
| |
|
| | def get_user_info(self, email: str) -> Optional[Dict]:
|
| | """Get user information (without sensitive data)"""
|
| | if email in self.users:
|
| | user = self.users[email]
|
| | return {
|
| | 'email': user.email,
|
| | 'username': user.username,
|
| | 'created_at': user.created_at,
|
| | 'verified': user.verified,
|
| | 'last_login': user.last_login
|
| | }
|
| | return None
|
| |
|
| | def get_stats(self) -> Dict:
|
| | """Get authentication statistics"""
|
| | return {
|
| | 'total_users': len(self.users),
|
| | 'verified_users': sum(1 for u in self.users.values() if u.verified),
|
| | 'active_sessions': len([s for s in self.sessions.values() if s.is_valid()]),
|
| | 'pending_verifications': len([v for v in self.verifications.values() if v.is_valid()])
|
| | }
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Descriptive metadata for this plugin, including the route paths it lists
# as its endpoints.
PLUGIN_INFO = {
    'name': 'auth_plugin',
    'version': '1.0.0',
    'author': 'NoahsKI Team',
    'description': 'Email-based authentication with verification',
    'endpoints': [
        f'/auth/{action}'
        for action in (
            'register',
            'verify',
            'resend',
            'login',
            'logout',
            'validate',
            'user',
            'stats',
        )
    ],
}
|
| |
|
| | __all__ = ['AuthPlugin', 'AuthConfig', 'PLUGIN_INFO']
|
| |
|