# -*- coding: utf-8 -*- """ Business Automation Suite - AI-Ready Core v3.0 (User Management Added) Designed by Gemini Advanced Development Unit under impeccable direction. Target Platform: Hugging Face Spaces (Free Tier) Architecture: Single-File Ultra-Efficient Application with API Backend """ import sqlite3 import hashlib import secrets import datetime import json import os from flask import Flask, request, jsonify, session, render_template_string, redirect, url_for, Response from functools import wraps from typing import Dict, List, Tuple, Optional, Any, Union # --- Constants and Configuration --- DB_NAME = "business_suite_v3.db" # Use a different DB name for v3 or keep v2 name if migrating data SALT_LENGTH = 16 SECRET_KEY = secrets.token_hex(24) # Regenerate or get from environment in production # --- Database Core: The Central Nervous System --- def get_db_connection() -> sqlite3.Connection: """Establishes and returns a database connection. Enables Foreign Keys.""" conn = sqlite3.connect(DB_NAME, check_same_thread=False) # check_same_thread=False is needed for Flask/multi-threading conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON;") return conn def setup_database(): """Initializes the database schema if it doesn't exist. Idempotent.""" required_tables = {"users", "business_config", "inventory_items", "suppliers", "purchase_suggestions", "event_log"} conn_check = None try: db_exists = os.path.exists(DB_NAME) if db_exists: conn_check = get_db_connection() cursor_check = conn_check.cursor() cursor_check.execute("SELECT name FROM sqlite_master WHERE type='table';") existing_tables = {row[0] for row in cursor_check.fetchall()} conn_check.close() else: existing_tables = set() if not required_tables.issubset(existing_tables): print("INFO: Database schema not found or incomplete. Initializing...") conn = get_db_connection() cursor = conn.cursor() # Users Table (Schema unchanged, but central to this update) cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( username TEXT PRIMARY KEY, hashed_password TEXT NOT NULL, salt TEXT NOT NULL, is_admin INTEGER DEFAULT 0 NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) # Business Configuration cursor.execute(""" CREATE TABLE IF NOT EXISTS business_config ( config_key TEXT PRIMARY KEY, config_value TEXT, description TEXT, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) # Inventory Items cursor.execute(""" CREATE TABLE IF NOT EXISTS inventory_items ( item_id INTEGER PRIMARY KEY AUTOINCREMENT, item_name TEXT UNIQUE NOT NULL, description TEXT, current_quantity REAL DEFAULT 0 NOT NULL, unit TEXT DEFAULT 'units' NOT NULL, reorder_level REAL, preferred_supplier_id INTEGER, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (preferred_supplier_id) REFERENCES suppliers(supplier_id) ON DELETE SET NULL ); """) # Suppliers cursor.execute(""" CREATE TABLE IF NOT EXISTS suppliers ( supplier_id INTEGER PRIMARY KEY AUTOINCREMENT, supplier_name TEXT UNIQUE NOT NULL, contact_info TEXT, notes TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) # Purchase Suggestions cursor.execute(""" CREATE TABLE IF NOT EXISTS purchase_suggestions ( suggestion_id INTEGER PRIMARY KEY AUTOINCREMENT, item_id INTEGER NOT NULL, suggested_quantity REAL NOT NULL, supplier_id INTEGER, reason TEXT, status TEXT DEFAULT 'pending' NOT NULL CHECK(status IN ('pending', 'ordered', 'received', 'cancelled')), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, resolved_at TIMESTAMP, FOREIGN KEY (item_id) REFERENCES inventory_items(item_id) ON DELETE CASCADE, FOREIGN KEY (supplier_id) REFERENCES suppliers(supplier_id) ON DELETE SET NULL ); """) # Event Log cursor.execute(""" CREATE TABLE IF NOT EXISTS event_log ( log_id INTEGER PRIMARY KEY AUTOINCREMENT, event_type TEXT NOT NULL, username TEXT, details TEXT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) # Default Data (Only if creating tables) # Create a default admin user if none exists cursor.execute("SELECT COUNT(*) FROM users WHERE is_admin = 1;") if cursor.fetchone()[0] == 0: print("INFO: Creating default admin user 'admin' with password 'changeme'. PLEASE CHANGE THIS PASSWORD.") default_username = "admin" default_password = "changeme" salt = secrets.token_hex(SALT_LENGTH) hashed_password = hash_password(default_password, salt) cursor.execute(""" INSERT INTO users (username, hashed_password, salt, is_admin) VALUES (?, ?, ?, 1) """, (default_username, hashed_password, salt)) # Manually log event here as log_event might not be fully ready if DB just created cursor.execute("INSERT INTO event_log (username, event_type, details) VALUES (?, ?, ?)", ('System', 'DEFAULT_ADMIN_CREATED', json.dumps({'username': default_username}))) # Default business config default_config = { 'business_name': ('My Automated Business', 'The name displayed in the UI'), 'low_stock_alert_threshold_pct': ('0.1', 'Alert if stock falls below 10% of reorder level (if set)'), 'default_currency': ('USD', 'Default currency symbol') } for key, (value, desc) in default_config.items(): cursor.execute("INSERT OR IGNORE INTO business_config (config_key, config_value, description) VALUES (?, ?, ?)", (key, value, desc)) conn.commit() print("INFO: Database initialization complete.") conn.close() else: print("INFO: Database schema already exists.") except sqlite3.Error as e: print(f"FATAL: Database setup/check failed: {e}") if conn_check: conn_check.close() raise except Exception as e: print(f"FATAL: An unexpected error occurred during database setup: {e}") if conn_check: conn_check.close() raise # --- Security Module (Hashing unchanged, Auth adapted for Flask) --- def hash_password(password: str, salt: str) -> str: """Hashes the password with the given salt using SHA-256.""" password_bytes = password.encode('utf-8') salt_bytes = salt.encode('utf-8') hashed = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 100000) return hashed.hex() def verify_password(stored_password_hash: str, salt: str, provided_password: str) -> bool: """Verifies a provided password against a stored hash and salt.""" provided_hash = hash_password(provided_password, salt) return provided_hash == stored_password_hash # --- Authentication/Authorization Decorators --- def login_required(f): """Decorator to restrict access to logged-in users.""" @wraps(f) def decorated_function(*args, **kwargs): if 'username' not in session: # For API requests, return 401 Unauthorized if request.endpoint and request.endpoint.startswith('api_'): return jsonify({"success": False, "message": "Authentication required"}), 401 # For page loads, redirect to login (index route handles this now) return redirect(url_for('index')) # Redirect back to index which renders login if not auth return f(*args, **kwargs) return decorated_function def admin_required(f): """Decorator to restrict access to admin users.""" @wraps(f) @login_required # Ensure user is logged in first def decorated_function(*args, **kwargs): if not session.get('is_admin', False): # For API requests, return 403 Forbidden if request.endpoint and request.endpoint.startswith('api_'): return jsonify({"success": False, "message": "Admin access required"}), 403 # For page loads (not currently used for admin-only pages, but good practice) return jsonify({"success": False, "message": "Admin access required"}), 403 # Should ideally redirect or render error page return f(*args, **kwargs) return decorated_function # --- Event Logging Module --- def log_event(username: Optional[str], event_type: str, details: Dict[str, Any]): """Logs an event to the database.""" # Ensure details are serializable (handle common non-serializable types) serializable_details = {} for k, v in details.items(): if isinstance(v, (str, int, float, bool, list, dict, tuple)) or v is None: serializable_details[k] = v else: try: serializable_details[k] = json.dumps(v) # Try serializing complex objects except TypeError: serializable_details[k] = str(v) # Fallback to string conversion try: conn = get_db_connection() cursor = conn.cursor() # Add remote_addr to logs where available (especially for login) event_details = serializable_details if request and request.remote_addr: event_details['ip_address'] = request.remote_addr cursor.execute(""" INSERT INTO event_log (username, event_type, details) VALUES (?, ?, ?) """, (username, event_type, json.dumps(event_details))) conn.commit() conn.close() except sqlite3.Error as e: print(f"ERROR: Failed to log event {event_type} for user {username}: {e}") except TypeError as te: print(f"ERROR: Failed to serialize details for event {event_type}: {te} - Details: {serializable_details}") except Exception as ex: # Catch unexpected errors during logging itself print(f"ERROR: Unexpected error in log_event for {event_type}: {ex}") # --- Business Logic Core (Inventory, Suppliers, Suggestions - Unchanged) --- # (Omitted here for brevity as they are the same as v2.0 logic functions, # assuming they are present above the API routes in the final app.py file) # e.g., get_business_config_logic, update_business_config_logic, # get_inventory_summary_logic, add_inventory_item_logic, update_inventory_quantity_logic, # get_suppliers_logic, add_supplier_logic, # check_and_suggest_reorders_logic, get_pending_suggestions_logic, resolve_suggestion_logic # --- NEW: User Management Logic --- def get_users_logic() -> List[Dict[str, Any]]: """Retrieves a list of all users (excluding sensitive info).""" users = [] try: conn = get_db_connection() cursor = conn.cursor() # Select only safe fields for UI cursor.execute("SELECT username, is_admin, created_at FROM users ORDER BY username;") users = [dict(row) for row in cursor.fetchall()] # Convert to dicts conn.close() except sqlite3.Error as e: print(f"ERROR: Failed to retrieve users: {e}") # Consider raising an exception or returning an error structure return users def add_user_logic(username: str, password: str, is_admin: bool, creating_user_username: str) -> Tuple[bool, str]: """Adds a new user to the database.""" if not username or not password: return False, "Username and password are required." try: # Basic sanitization (more robust validation recommended) username = username.strip() if len(username) < 3: return False, "Username must be at least 3 characters long." if len(password) < 6: return False, "Password must be at least 6 characters long." # Example policy salt = secrets.token_hex(SALT_LENGTH) hashed_password = hash_password(password, salt) conn = get_db_connection() cursor = conn.cursor() # Check if username already exists cursor.execute("SELECT COUNT(*) FROM users WHERE username = ?", (username,)) if cursor.fetchone()[0] > 0: conn.close() log_event(creating_user_username, 'ADD_USER_FAILURE', {'reason': 'Username exists', 'new_username': username}) return False, f"Username '{username}' already exists." # Insert the new user cursor.execute(""" INSERT INTO users (username, hashed_password, salt, is_admin) VALUES (?, ?, ?, ?) """, (username, hashed_password, salt, 1 if is_admin else 0)) conn.commit() conn.close() log_event(creating_user_username, 'USER_ADDED', {'new_username': username, 'is_admin': is_admin}) return True, f"User '{username}' added successfully." except sqlite3.IntegrityError: # Should be caught by the SELECT COUNT(*) but defense in depth log_event(creating_user_username, 'ADD_USER_FAILURE', {'reason': 'Integrity error (duplicate)', 'new_username': username}) return False, f"Username '{username}' already exists (integrity error)." except sqlite3.Error as e: print(f"ERROR: Failed to add user {username}: {e}") log_event(creating_user_username, 'ADD_USER_ERROR', {'new_username': username, 'error': str(e)}) return False, f"Database error adding user: {e}" except Exception as e: print(f"ERROR: Unexpected error in add_user_logic for {username}: {e}") log_event(creating_user_username, 'ADD_USER_UNEXPECTED_ERROR', {'new_username': username, 'error': str(e)}) return False, f"An unexpected server error occurred: {e}" # --- Flask Application Setup --- app = Flask(__name__) app.secret_key = SECRET_KEY # MUST be set for sessions to work # --- HTML/CSS/JS Templates (Embedded in Python String) --- # NOTE: This is the *only* place where the HTML structure changes. # The JS part below will be updated to handle the new User section. HTML_TEMPLATE = """
Manage system users. Requires Admin privileges.