| | """ |
| | ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| | β β |
| | β AUTHENTICATION PLUGIN for NoahsKI β |
| | β Email Verification System β |
| | β β |
| | β Features: β |
| | β β Email Registration & Verification β |
| | β β Verification Code via Email β |
| | β β Session Management β |
| | β β User Data Storage (JSON) β |
| | β β Password Hashing (bcrypt) β |
| | β β Token-based Authentication β |
| | β β |
| | ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| | """ |
| |
|
| | import os |
| | import json |
| | import time |
| | import secrets |
| | import hashlib |
| | import smtplib |
| | from email.mime.text import MIMEText |
| | from email.mime.multipart import MIMEMultipart |
| | from pathlib import Path |
| | from typing import Dict, Optional, Tuple |
| | from dataclasses import dataclass, asdict |
| | from datetime import datetime, timedelta |
| | import logging |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class AuthConfig: |
| | """Authentication plugin configuration""" |
| | |
| | |
| | DATA_DIR = Path('noahski_data/auth') |
| | USERS_FILE = DATA_DIR / 'users.json' |
| | SESSIONS_FILE = DATA_DIR / 'sessions.json' |
| | VERIFICATION_FILE = DATA_DIR / 'verifications.json' |
| | |
| | |
| | SMTP_SERVER = os.getenv('SMTP_SERVER', 'smtp.gmail.com') |
| | SMTP_PORT = int(os.getenv('SMTP_PORT', 587)) |
| | SMTP_USER = os.getenv('SMTP_USER', 'your-email@gmail.com') |
| | SMTP_PASSWORD = os.getenv('SMTP_PASSWORD', 'your-app-password') |
| | FROM_EMAIL = os.getenv('FROM_EMAIL', 'noreply@noahski.ai') |
| | FROM_NAME = os.getenv('FROM_NAME', 'NoahsKI') |
| | |
| | |
| | VERIFICATION_CODE_LENGTH = 6 |
| | VERIFICATION_EXPIRY_MINUTES = 15 |
| | SESSION_EXPIRY_HOURS = 24 |
| | MAX_LOGIN_ATTEMPTS = 5 |
| | LOCKOUT_DURATION_MINUTES = 30 |
| | |
| | |
| | TOKEN_LENGTH = 32 |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @dataclass |
| | class User: |
| | """User data structure""" |
| | email: str |
| | password_hash: str |
| | created_at: float |
| | verified: bool = False |
| | last_login: Optional[float] = None |
| | login_attempts: int = 0 |
| | locked_until: Optional[float] = None |
| | username: Optional[str] = None |
| | |
| | def to_dict(self) -> Dict: |
| | return asdict(self) |
| | |
| | @staticmethod |
| | def from_dict(data: Dict) -> 'User': |
| | return User(**data) |
| |
|
| |
|
| | @dataclass |
| | class Session: |
| | """Session data structure""" |
| | token: str |
| | email: str |
| | created_at: float |
| | expires_at: float |
| | ip_address: Optional[str] = None |
| | user_agent: Optional[str] = None |
| | |
| | def is_valid(self) -> bool: |
| | return time.time() < self.expires_at |
| | |
| | def to_dict(self) -> Dict: |
| | return asdict(self) |
| | |
| | @staticmethod |
| | def from_dict(data: Dict) -> 'Session': |
| | return Session(**data) |
| |
|
| |
|
| | @dataclass |
| | class VerificationCode: |
| | """Email verification code""" |
| | email: str |
| | code: str |
| | created_at: float |
| | expires_at: float |
| | attempts: int = 0 |
| | |
| | def is_valid(self) -> bool: |
| | return time.time() < self.expires_at and self.attempts < 3 |
| | |
| | def to_dict(self) -> Dict: |
| | return asdict(self) |
| | |
| | @staticmethod |
| | def from_dict(data: Dict) -> 'VerificationCode': |
| | return VerificationCode(**data) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class EmailService: |
| | """Handle email sending""" |
| | |
| | def __init__(self): |
| | self.smtp_server = AuthConfig.SMTP_SERVER |
| | self.smtp_port = AuthConfig.SMTP_PORT |
| | self.smtp_user = AuthConfig.SMTP_USER |
| | self.smtp_password = AuthConfig.SMTP_PASSWORD |
| | self.from_email = AuthConfig.FROM_EMAIL |
| | self.from_name = AuthConfig.FROM_NAME |
| | |
| | def send_verification_email(self, to_email: str, code: str) -> bool: |
| | """Send verification code email""" |
| | try: |
| | subject = f"π Dein NoahsKI Verifizierungscode" |
| | |
| | html_body = f""" |
| | <!DOCTYPE html> |
| | <html> |
| | <head> |
| | <style> |
| | body {{ font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px; }} |
| | .container {{ background-color: white; padding: 30px; border-radius: 10px; max-width: 600px; margin: 0 auto; }} |
| | .code {{ font-size: 32px; font-weight: bold; color: #4CAF50; text-align: center; padding: 20px; background-color: #f0f0f0; border-radius: 5px; letter-spacing: 5px; }} |
| | .header {{ color: #333; text-align: center; }} |
| | .footer {{ color: #666; font-size: 12px; text-align: center; margin-top: 30px; }} |
| | </style> |
| | </head> |
| | <body> |
| | <div class="container"> |
| | <h1 class="header">π Willkommen bei NoahsKI!</h1> |
| | <p>Danke fΓΌr deine Registrierung. Hier ist dein Verifizierungscode:</p> |
| | |
| | <div class="code">{code}</div> |
| | |
| | <p>Dieser Code ist <strong>15 Minuten</strong> gΓΌltig.</p> |
| | |
| | <p><strong>β οΈ Wichtig:</strong> Falls du diese Email nicht angefordert hast, ignoriere sie einfach.</p> |
| | |
| | <div class="footer"> |
| | <p>Β© 2026 NoahsKI - Dein KI-Assistent</p> |
| | <p>Diese Email wurde automatisch generiert.</p> |
| | </div> |
| | </div> |
| | </body> |
| | </html> |
| | """ |
| | |
| | plain_body = f""" |
| | π Willkommen bei NoahsKI! |
| | |
| | Danke fΓΌr deine Registrierung. Hier ist dein Verifizierungscode: |
| | |
| | {code} |
| | |
| | Dieser Code ist 15 Minuten gΓΌltig. |
| | |
| | β οΈ Falls du diese Email nicht angefordert hast, ignoriere sie einfach. |
| | |
| | Β© 2026 NoahsKI |
| | """ |
| | |
| | return self._send_email(to_email, subject, html_body, plain_body) |
| | |
| | except Exception as e: |
| | logger.error(f"Failed to send verification email: {e}") |
| | return False |
| | |
| | def send_welcome_email(self, to_email: str, username: str = None) -> bool: |
| | """Send welcome email after successful verification""" |
| | try: |
| | subject = "π Account erfolgreich verifiziert!" |
| | name = username or to_email.split('@')[0] |
| | |
| | html_body = f""" |
| | <!DOCTYPE html> |
| | <html> |
| | <head> |
| | <style> |
| | body {{ font-family: Arial, sans-serif; background-color: #f4f4f4; padding: 20px; }} |
| | .container {{ background-color: white; padding: 30px; border-radius: 10px; max-width: 600px; margin: 0 auto; }} |
| | .header {{ color: #4CAF50; text-align: center; }} |
| | .feature {{ padding: 10px 0; }} |
| | .footer {{ color: #666; font-size: 12px; text-align: center; margin-top: 30px; }} |
| | </style> |
| | </head> |
| | <body> |
| | <div class="container"> |
| | <h1 class="header">π Willkommen bei NoahsKI, {name}!</h1> |
| | |
| | <p>Dein Account wurde erfolgreich verifiziert und ist jetzt aktiv!</p> |
| | |
| | <h3>Was du jetzt tun kannst:</h3> |
| | <div class="feature">β
Mit NoahsKI chatten</div> |
| | <div class="feature">π¨ Bilder generieren</div> |
| | <div class="feature">π Web-Recherche nutzen</div> |
| | <div class="feature">π Autonomes Learning System</div> |
| | <div class="feature">π‘ Code-Generierung</div> |
| | |
| | <p style="margin-top: 30px;">Viel SpaΓ mit NoahsKI!</p> |
| | |
| | <div class="footer"> |
| | <p>Β© 2026 NoahsKI - Dein KI-Assistent</p> |
| | </div> |
| | </div> |
| | </body> |
| | </html> |
| | """ |
| | |
| | plain_body = f""" |
| | π Willkommen bei NoahsKI, {name}! |
| | |
| | Dein Account wurde erfolgreich verifiziert und ist jetzt aktiv! |
| | |
| | Was du jetzt tun kannst: |
| | β
Mit NoahsKI chatten |
| | π¨ Bilder generieren |
| | π Web-Recherche nutzen |
| | π Autonomes Learning System |
| | π‘ Code-Generierung |
| | |
| | Viel SpaΓ mit NoahsKI! |
| | |
| | Β© 2026 NoahsKI |
| | """ |
| | |
| | return self._send_email(to_email, subject, html_body, plain_body) |
| | |
| | except Exception as e: |
| | logger.error(f"Failed to send welcome email: {e}") |
| | return False |
| | |
| | def _send_email(self, to_email: str, subject: str, html_body: str, plain_body: str) -> bool: |
| | """Internal method to send email via SMTP""" |
| | try: |
| | |
| | msg = MIMEMultipart('alternative') |
| | msg['Subject'] = subject |
| | msg['From'] = f"{self.from_name} <{self.from_email}>" |
| | msg['To'] = to_email |
| | |
| | |
| | part1 = MIMEText(plain_body, 'plain', 'utf-8') |
| | part2 = MIMEText(html_body, 'html', 'utf-8') |
| | msg.attach(part1) |
| | msg.attach(part2) |
| | |
| | |
| | with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: |
| | server.starttls() |
| | server.login(self.smtp_user, self.smtp_password) |
| | server.send_message(msg) |
| | |
| | logger.info(f"β Email sent to {to_email}") |
| | return True |
| | |
| | except Exception as e: |
| | logger.error(f"SMTP error: {e}") |
| | return False |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class AuthPlugin: |
| | """Main authentication plugin""" |
| | |
| | def __init__(self): |
| | logger.info("π Initializing Authentication Plugin...") |
| | |
| | |
| | AuthConfig.DATA_DIR.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | self.email_service = EmailService() |
| | |
| | |
| | self.users: Dict[str, User] = self._load_users() |
| | self.sessions: Dict[str, Session] = self._load_sessions() |
| | self.verifications: Dict[str, VerificationCode] = self._load_verifications() |
| | |
| | logger.info(f"β
Auth Plugin initialized ({len(self.users)} users)") |
| | |
| | |
| | |
| | |
| | |
| | def _load_users(self) -> Dict[str, User]: |
| | """Load users from JSON""" |
| | try: |
| | if AuthConfig.USERS_FILE.exists(): |
| | with open(AuthConfig.USERS_FILE, 'r', encoding='utf-8') as f: |
| | data = json.load(f) |
| | return {email: User.from_dict(u) for email, u in data.items()} |
| | except Exception as e: |
| | logger.error(f"Failed to load users: {e}") |
| | return {} |
| | |
| | def _save_users(self): |
| | """Save users to JSON""" |
| | try: |
| | data = {email: u.to_dict() for email, u in self.users.items()} |
| | with open(AuthConfig.USERS_FILE, 'w', encoding='utf-8') as f: |
| | json.dump(data, f, indent=2, ensure_ascii=False) |
| | except Exception as e: |
| | logger.error(f"Failed to save users: {e}") |
| | |
| | def _load_sessions(self) -> Dict[str, Session]: |
| | """Load sessions from JSON""" |
| | try: |
| | if AuthConfig.SESSIONS_FILE.exists(): |
| | with open(AuthConfig.SESSIONS_FILE, 'r', encoding='utf-8') as f: |
| | data = json.load(f) |
| | sessions = {token: Session.from_dict(s) for token, s in data.items()} |
| | |
| | return {t: s for t, s in sessions.items() if s.is_valid()} |
| | except Exception as e: |
| | logger.error(f"Failed to load sessions: {e}") |
| | return {} |
| | |
| | def _save_sessions(self): |
| | """Save sessions to JSON""" |
| | try: |
| | |
| | valid_sessions = {t: s for t, s in self.sessions.items() if s.is_valid()} |
| | data = {token: s.to_dict() for token, s in valid_sessions.items()} |
| | with open(AuthConfig.SESSIONS_FILE, 'w', encoding='utf-8') as f: |
| | json.dump(data, f, indent=2, ensure_ascii=False) |
| | except Exception as e: |
| | logger.error(f"Failed to save sessions: {e}") |
| | |
| | def _load_verifications(self) -> Dict[str, VerificationCode]: |
| | """Load verification codes from JSON""" |
| | try: |
| | if AuthConfig.VERIFICATION_FILE.exists(): |
| | with open(AuthConfig.VERIFICATION_FILE, 'r', encoding='utf-8') as f: |
| | data = json.load(f) |
| | codes = {email: VerificationCode.from_dict(v) for email, v in data.items()} |
| | |
| | return {e: v for e, v in codes.items() if v.is_valid()} |
| | except Exception as e: |
| | logger.error(f"Failed to load verifications: {e}") |
| | return {} |
| | |
| | def _save_verifications(self): |
| | """Save verification codes to JSON""" |
| | try: |
| | |
| | valid_codes = {e: v for e, v in self.verifications.items() if v.is_valid()} |
| | data = {email: v.to_dict() for email, v in valid_codes.items()} |
| | with open(AuthConfig.VERIFICATION_FILE, 'w', encoding='utf-8') as f: |
| | json.dump(data, f, indent=2, ensure_ascii=False) |
| | except Exception as e: |
| | logger.error(f"Failed to save verifications: {e}") |
| | |
| | |
| | |
| | |
| | |
| | def _hash_password(self, password: str) -> str: |
| | """Hash password with salt""" |
| | salt = secrets.token_hex(16) |
| | pwd_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000) |
| | return f"{salt}${pwd_hash.hex()}" |
| | |
| | def _verify_password(self, password: str, password_hash: str) -> bool: |
| | """Verify password against hash""" |
| | try: |
| | salt, pwd_hash = password_hash.split('$') |
| | computed_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000) |
| | return computed_hash.hex() == pwd_hash |
| | except: |
| | return False |
| | |
| | |
| | |
| | |
| | |
| | def _generate_verification_code(self) -> str: |
| | """Generate 6-digit verification code""" |
| | return ''.join([str(secrets.randbelow(10)) for _ in range(AuthConfig.VERIFICATION_CODE_LENGTH)]) |
| | |
| | |
| | |
| | |
| | |
| | def register(self, email: str, password: str, username: str = None) -> Tuple[bool, str]: |
| | """ |
| | Register new user and send verification email |
| | |
| | Returns: (success, message) |
| | """ |
| | try: |
| | |
| | if '@' not in email or '.' not in email.split('@')[1]: |
| | return False, "UngΓΌltige Email-Adresse" |
| | |
| | |
| | if email in self.users: |
| | return False, "Email bereits registriert" |
| | |
| | |
| | if len(password) < 8: |
| | return False, "Passwort muss mindestens 8 Zeichen lang sein" |
| | |
| | |
| | user = User( |
| | email=email, |
| | password_hash=self._hash_password(password), |
| | created_at=time.time(), |
| | verified=False, |
| | username=username |
| | ) |
| | |
| | self.users[email] = user |
| | self._save_users() |
| | |
| | |
| | code = self._generate_verification_code() |
| | expires_at = time.time() + (AuthConfig.VERIFICATION_EXPIRY_MINUTES * 60) |
| | |
| | verification = VerificationCode( |
| | email=email, |
| | code=code, |
| | created_at=time.time(), |
| | expires_at=expires_at |
| | ) |
| | |
| | self.verifications[email] = verification |
| | self._save_verifications() |
| | |
| | |
| | email_sent = self.email_service.send_verification_email(email, code) |
| | |
| | if email_sent: |
| | logger.info(f"β User registered: {email}") |
| | return True, f"Registrierung erfolgreich! Verifizierungscode wurde an {email} gesendet." |
| | else: |
| | return True, "Registrierung erfolgreich, aber Email konnte nicht gesendet werden. Bitte kontaktiere den Support." |
| | |
| | except Exception as e: |
| | logger.error(f"Registration error: {e}") |
| | return False, f"Fehler bei der Registrierung: {str(e)}" |
| | |
| | def verify_email(self, email: str, code: str) -> Tuple[bool, str]: |
| | """ |
| | Verify email with code |
| | |
| | Returns: (success, message) |
| | """ |
| | try: |
| | |
| | if email not in self.verifications: |
| | return False, "Kein Verifizierungscode gefunden" |
| | |
| | verification = self.verifications[email] |
| | |
| | |
| | if not verification.is_valid(): |
| | return False, "Verifizierungscode abgelaufen oder zu viele Versuche" |
| | |
| | |
| | verification.attempts += 1 |
| | self._save_verifications() |
| | |
| | |
| | if verification.code != code: |
| | return False, f"Falscher Code (Versuch {verification.attempts}/3)" |
| | |
| | |
| | if email in self.users: |
| | self.users[email].verified = True |
| | self._save_users() |
| | |
| | |
| | del self.verifications[email] |
| | self._save_verifications() |
| | |
| | |
| | self.email_service.send_welcome_email(email, self.users[email].username) |
| | |
| | logger.info(f"β Email verified: {email}") |
| | return True, "Email erfolgreich verifiziert! Du kannst dich jetzt anmelden." |
| | else: |
| | return False, "User nicht gefunden" |
| | |
| | except Exception as e: |
| | logger.error(f"Verification error: {e}") |
| | return False, f"Fehler bei der Verifizierung: {str(e)}" |
| | |
| | def resend_verification(self, email: str) -> Tuple[bool, str]: |
| | """Resend verification code""" |
| | try: |
| | if email not in self.users: |
| | return False, "Email nicht registriert" |
| | |
| | if self.users[email].verified: |
| | return False, "Email bereits verifiziert" |
| | |
| | |
| | code = self._generate_verification_code() |
| | expires_at = time.time() + (AuthConfig.VERIFICATION_EXPIRY_MINUTES * 60) |
| | |
| | verification = VerificationCode( |
| | email=email, |
| | code=code, |
| | created_at=time.time(), |
| | expires_at=expires_at |
| | ) |
| | |
| | self.verifications[email] = verification |
| | self._save_verifications() |
| | |
| | |
| | email_sent = self.email_service.send_verification_email(email, code) |
| | |
| | if email_sent: |
| | return True, "Neuer Verifizierungscode wurde gesendet" |
| | else: |
| | return False, "Email konnte nicht gesendet werden" |
| | |
| | except Exception as e: |
| | logger.error(f"Resend error: {e}") |
| | return False, f"Fehler: {str(e)}" |
| | |
| | def login(self, email: str, password: str, ip_address: str = None, user_agent: str = None) -> Tuple[bool, str, Optional[str]]: |
| | """ |
| | Login user |
| | |
| | Returns: (success, message, token) |
| | """ |
| | try: |
| | |
| | if email not in self.users: |
| | return False, "Email oder Passwort falsch", None |
| | |
| | user = self.users[email] |
| | |
| | |
| | if user.locked_until and time.time() < user.locked_until: |
| | remaining = int((user.locked_until - time.time()) / 60) |
| | return False, f"Account gesperrt. Versuche es in {remaining} Minuten erneut.", None |
| | |
| | |
| | if not user.verified: |
| | return False, "Email noch nicht verifiziert", None |
| | |
| | |
| | if not self._verify_password(password, user.password_hash): |
| | user.login_attempts += 1 |
| | |
| | |
| | if user.login_attempts >= AuthConfig.MAX_LOGIN_ATTEMPTS: |
| | user.locked_until = time.time() + (AuthConfig.LOCKOUT_DURATION_MINUTES * 60) |
| | self._save_users() |
| | return False, f"Zu viele fehlgeschlagene Versuche. Account fΓΌr {AuthConfig.LOCKOUT_DURATION_MINUTES} Minuten gesperrt.", None |
| | |
| | self._save_users() |
| | remaining = AuthConfig.MAX_LOGIN_ATTEMPTS - user.login_attempts |
| | return False, f"Email oder Passwort falsch ({remaining} Versuche ΓΌbrig)", None |
| | |
| | |
| | user.login_attempts = 0 |
| | user.last_login = time.time() |
| | user.locked_until = None |
| | self._save_users() |
| | |
| | |
| | token = secrets.token_urlsafe(AuthConfig.TOKEN_LENGTH) |
| | expires_at = time.time() + (AuthConfig.SESSION_EXPIRY_HOURS * 3600) |
| | |
| | session = Session( |
| | token=token, |
| | email=email, |
| | created_at=time.time(), |
| | expires_at=expires_at, |
| | ip_address=ip_address, |
| | user_agent=user_agent |
| | ) |
| | |
| | self.sessions[token] = session |
| | self._save_sessions() |
| | |
| | logger.info(f"β User logged in: {email}") |
| | return True, "Login erfolgreich", token |
| | |
| | except Exception as e: |
| | logger.error(f"Login error: {e}") |
| | return False, f"Fehler beim Login: {str(e)}", None |
| | |
| | def logout(self, token: str) -> Tuple[bool, str]: |
| | """Logout user""" |
| | try: |
| | if token in self.sessions: |
| | email = self.sessions[token].email |
| | del self.sessions[token] |
| | self._save_sessions() |
| | logger.info(f"β User logged out: {email}") |
| | return True, "Logout erfolgreich" |
| | else: |
| | return False, "Session nicht gefunden" |
| | except Exception as e: |
| | logger.error(f"Logout error: {e}") |
| | return False, f"Fehler beim Logout: {str(e)}" |
| | |
| | def validate_token(self, token: str) -> Tuple[bool, Optional[str]]: |
| | """ |
| | Validate session token |
| | |
| | Returns: (valid, email) |
| | """ |
| | if token in self.sessions: |
| | session = self.sessions[token] |
| | if session.is_valid(): |
| | return True, session.email |
| | else: |
| | |
| | del self.sessions[token] |
| | self._save_sessions() |
| | |
| | return False, None |
| | |
| | def get_user_info(self, email: str) -> Optional[Dict]: |
| | """Get user information (without sensitive data)""" |
| | if email in self.users: |
| | user = self.users[email] |
| | return { |
| | 'email': user.email, |
| | 'username': user.username, |
| | 'created_at': user.created_at, |
| | 'verified': user.verified, |
| | 'last_login': user.last_login |
| | } |
| | return None |
| | |
| | def get_stats(self) -> Dict: |
| | """Get authentication statistics""" |
| | return { |
| | 'total_users': len(self.users), |
| | 'verified_users': sum(1 for u in self.users.values() if u.verified), |
| | 'active_sessions': len([s for s in self.sessions.values() if s.is_valid()]), |
| | 'pending_verifications': len([v for v in self.verifications.values() if v.is_valid()]) |
| | } |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | PLUGIN_INFO = { |
| | 'name': 'auth_plugin', |
| | 'version': '1.0.0', |
| | 'author': 'NoahsKI Team', |
| | 'description': 'Email-based authentication with verification', |
| | 'endpoints': [ |
| | '/auth/register', |
| | '/auth/verify', |
| | '/auth/resend', |
| | '/auth/login', |
| | '/auth/logout', |
| | '/auth/validate', |
| | '/auth/user', |
| | '/auth/stats' |
| | ] |
| | } |
| |
|
| | __all__ = ['AuthPlugin', 'AuthConfig', 'PLUGIN_INFO'] |
| |
|