# -*- coding: utf-8 -*- """ Business Automation Suite - AI-Ready Core v3.0 (User Management Added) Designed by Gemini Advanced Development Unit under impeccable direction. Target Platform: Hugging Face Spaces (Free Tier) Architecture: Single-File Ultra-Efficient Application with API Backend """ import sqlite3 import hashlib import secrets import datetime import json import os from flask import Flask, request, jsonify, session, render_template_string, redirect, url_for, Response from functools import wraps from typing import Dict, List, Tuple, Optional, Any, Union # --- Constants and Configuration --- DB_NAME = "business_suite_v3.db" # Use a different DB name for v3 or keep v2 name if migrating data SALT_LENGTH = 16 SECRET_KEY = secrets.token_hex(24) # Regenerate or get from environment in production # --- Database Core: The Central Nervous System --- def get_db_connection() -> sqlite3.Connection: """Establishes and returns a database connection. Enables Foreign Keys.""" conn = sqlite3.connect(DB_NAME, check_same_thread=False) # check_same_thread=False is needed for Flask/multi-threading conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON;") return conn def setup_database(): """Initializes the database schema if it doesn't exist. Idempotent.""" required_tables = {"users", "business_config", "inventory_items", "suppliers", "purchase_suggestions", "event_log"} conn_check = None try: db_exists = os.path.exists(DB_NAME) if db_exists: conn_check = get_db_connection() cursor_check = conn_check.cursor() cursor_check.execute("SELECT name FROM sqlite_master WHERE type='table';") existing_tables = {row[0] for row in cursor_check.fetchall()} conn_check.close() else: existing_tables = set() if not required_tables.issubset(existing_tables): print("INFO: Database schema not found or incomplete. Initializing...") conn = get_db_connection() cursor = conn.cursor() # Users Table (Schema unchanged, but central to this update) cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( username TEXT PRIMARY KEY, hashed_password TEXT NOT NULL, salt TEXT NOT NULL, is_admin INTEGER DEFAULT 0 NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) # Business Configuration cursor.execute(""" CREATE TABLE IF NOT EXISTS business_config ( config_key TEXT PRIMARY KEY, config_value TEXT, description TEXT, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) # Inventory Items cursor.execute(""" CREATE TABLE IF NOT EXISTS inventory_items ( item_id INTEGER PRIMARY KEY AUTOINCREMENT, item_name TEXT UNIQUE NOT NULL, description TEXT, current_quantity REAL DEFAULT 0 NOT NULL, unit TEXT DEFAULT 'units' NOT NULL, reorder_level REAL, preferred_supplier_id INTEGER, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (preferred_supplier_id) REFERENCES suppliers(supplier_id) ON DELETE SET NULL ); """) # Suppliers cursor.execute(""" CREATE TABLE IF NOT EXISTS suppliers ( supplier_id INTEGER PRIMARY KEY AUTOINCREMENT, supplier_name TEXT UNIQUE NOT NULL, contact_info TEXT, notes TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) # Purchase Suggestions cursor.execute(""" CREATE TABLE IF NOT EXISTS purchase_suggestions ( suggestion_id INTEGER PRIMARY KEY AUTOINCREMENT, item_id INTEGER NOT NULL, suggested_quantity REAL NOT NULL, supplier_id INTEGER, reason TEXT, status TEXT DEFAULT 'pending' NOT NULL CHECK(status IN ('pending', 'ordered', 'received', 'cancelled')), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, resolved_at TIMESTAMP, FOREIGN KEY (item_id) REFERENCES inventory_items(item_id) ON DELETE CASCADE, FOREIGN KEY (supplier_id) REFERENCES suppliers(supplier_id) ON DELETE SET NULL ); """) # Event Log cursor.execute(""" CREATE TABLE IF NOT EXISTS event_log ( log_id INTEGER PRIMARY KEY AUTOINCREMENT, event_type TEXT NOT NULL, username TEXT, details TEXT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """) # Default Data (Only if creating tables) # Create a default admin user if none exists cursor.execute("SELECT COUNT(*) FROM users WHERE is_admin = 1;") if cursor.fetchone()[0] == 0: print("INFO: Creating default admin user 'admin' with password 'changeme'. PLEASE CHANGE THIS PASSWORD.") default_username = "admin" default_password = "changeme" salt = secrets.token_hex(SALT_LENGTH) hashed_password = hash_password(default_password, salt) cursor.execute(""" INSERT INTO users (username, hashed_password, salt, is_admin) VALUES (?, ?, ?, 1) """, (default_username, hashed_password, salt)) # Manually log event here as log_event might not be fully ready if DB just created cursor.execute("INSERT INTO event_log (username, event_type, details) VALUES (?, ?, ?)", ('System', 'DEFAULT_ADMIN_CREATED', json.dumps({'username': default_username}))) # Default business config default_config = { 'business_name': ('My Automated Business', 'The name displayed in the UI'), 'low_stock_alert_threshold_pct': ('0.1', 'Alert if stock falls below 10% of reorder level (if set)'), 'default_currency': ('USD', 'Default currency symbol') } for key, (value, desc) in default_config.items(): cursor.execute("INSERT OR IGNORE INTO business_config (config_key, config_value, description) VALUES (?, ?, ?)", (key, value, desc)) conn.commit() print("INFO: Database initialization complete.") conn.close() else: print("INFO: Database schema already exists.") except sqlite3.Error as e: print(f"FATAL: Database setup/check failed: {e}") if conn_check: conn_check.close() raise except Exception as e: print(f"FATAL: An unexpected error occurred during database setup: {e}") if conn_check: conn_check.close() raise # --- Security Module (Hashing unchanged, Auth adapted for Flask) --- def hash_password(password: str, salt: str) -> str: """Hashes the password with the given salt using SHA-256.""" password_bytes = password.encode('utf-8') salt_bytes = salt.encode('utf-8') hashed = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, 100000) return hashed.hex() def verify_password(stored_password_hash: str, salt: str, provided_password: str) -> bool: """Verifies a provided password against a stored hash and salt.""" provided_hash = hash_password(provided_password, salt) return provided_hash == stored_password_hash # --- Authentication/Authorization Decorators --- def login_required(f): """Decorator to restrict access to logged-in users.""" @wraps(f) def decorated_function(*args, **kwargs): if 'username' not in session: # For API requests, return 401 Unauthorized if request.endpoint and request.endpoint.startswith('api_'): return jsonify({"success": False, "message": "Authentication required"}), 401 # For page loads, redirect to login (index route handles this now) return redirect(url_for('index')) # Redirect back to index which renders login if not auth return f(*args, **kwargs) return decorated_function def admin_required(f): """Decorator to restrict access to admin users.""" @wraps(f) @login_required # Ensure user is logged in first def decorated_function(*args, **kwargs): if not session.get('is_admin', False): # For API requests, return 403 Forbidden if request.endpoint and request.endpoint.startswith('api_'): return jsonify({"success": False, "message": "Admin access required"}), 403 # For page loads (not currently used for admin-only pages, but good practice) return jsonify({"success": False, "message": "Admin access required"}), 403 # Should ideally redirect or render error page return f(*args, **kwargs) return decorated_function # --- Event Logging Module --- def log_event(username: Optional[str], event_type: str, details: Dict[str, Any]): """Logs an event to the database.""" # Ensure details are serializable (handle common non-serializable types) serializable_details = {} for k, v in details.items(): if isinstance(v, (str, int, float, bool, list, dict, tuple)) or v is None: serializable_details[k] = v else: try: serializable_details[k] = json.dumps(v) # Try serializing complex objects except TypeError: serializable_details[k] = str(v) # Fallback to string conversion try: conn = get_db_connection() cursor = conn.cursor() # Add remote_addr to logs where available (especially for login) event_details = serializable_details if request and request.remote_addr: event_details['ip_address'] = request.remote_addr cursor.execute(""" INSERT INTO event_log (username, event_type, details) VALUES (?, ?, ?) """, (username, event_type, json.dumps(event_details))) conn.commit() conn.close() except sqlite3.Error as e: print(f"ERROR: Failed to log event {event_type} for user {username}: {e}") except TypeError as te: print(f"ERROR: Failed to serialize details for event {event_type}: {te} - Details: {serializable_details}") except Exception as ex: # Catch unexpected errors during logging itself print(f"ERROR: Unexpected error in log_event for {event_type}: {ex}") # --- Business Logic Core (Inventory, Suppliers, Suggestions - Unchanged) --- # (Omitted here for brevity as they are the same as v2.0 logic functions, # assuming they are present above the API routes in the final app.py file) # e.g., get_business_config_logic, update_business_config_logic, # get_inventory_summary_logic, add_inventory_item_logic, update_inventory_quantity_logic, # get_suppliers_logic, add_supplier_logic, # check_and_suggest_reorders_logic, get_pending_suggestions_logic, resolve_suggestion_logic # --- NEW: User Management Logic --- def get_users_logic() -> List[Dict[str, Any]]: """Retrieves a list of all users (excluding sensitive info).""" users = [] try: conn = get_db_connection() cursor = conn.cursor() # Select only safe fields for UI cursor.execute("SELECT username, is_admin, created_at FROM users ORDER BY username;") users = [dict(row) for row in cursor.fetchall()] # Convert to dicts conn.close() except sqlite3.Error as e: print(f"ERROR: Failed to retrieve users: {e}") # Consider raising an exception or returning an error structure return users def add_user_logic(username: str, password: str, is_admin: bool, creating_user_username: str) -> Tuple[bool, str]: """Adds a new user to the database.""" if not username or not password: return False, "Username and password are required." try: # Basic sanitization (more robust validation recommended) username = username.strip() if len(username) < 3: return False, "Username must be at least 3 characters long." if len(password) < 6: return False, "Password must be at least 6 characters long." # Example policy salt = secrets.token_hex(SALT_LENGTH) hashed_password = hash_password(password, salt) conn = get_db_connection() cursor = conn.cursor() # Check if username already exists cursor.execute("SELECT COUNT(*) FROM users WHERE username = ?", (username,)) if cursor.fetchone()[0] > 0: conn.close() log_event(creating_user_username, 'ADD_USER_FAILURE', {'reason': 'Username exists', 'new_username': username}) return False, f"Username '{username}' already exists." # Insert the new user cursor.execute(""" INSERT INTO users (username, hashed_password, salt, is_admin) VALUES (?, ?, ?, ?) """, (username, hashed_password, salt, 1 if is_admin else 0)) conn.commit() conn.close() log_event(creating_user_username, 'USER_ADDED', {'new_username': username, 'is_admin': is_admin}) return True, f"User '{username}' added successfully." except sqlite3.IntegrityError: # Should be caught by the SELECT COUNT(*) but defense in depth log_event(creating_user_username, 'ADD_USER_FAILURE', {'reason': 'Integrity error (duplicate)', 'new_username': username}) return False, f"Username '{username}' already exists (integrity error)." except sqlite3.Error as e: print(f"ERROR: Failed to add user {username}: {e}") log_event(creating_user_username, 'ADD_USER_ERROR', {'new_username': username, 'error': str(e)}) return False, f"Database error adding user: {e}" except Exception as e: print(f"ERROR: Unexpected error in add_user_logic for {username}: {e}") log_event(creating_user_username, 'ADD_USER_UNEXPECTED_ERROR', {'new_username': username, 'error': str(e)}) return False, f"An unexpected server error occurred: {e}" # --- Flask Application Setup --- app = Flask(__name__) app.secret_key = SECRET_KEY # MUST be set for sessions to work # --- HTML/CSS/JS Templates (Embedded in Python String) --- # NOTE: This is the *only* place where the HTML structure changes. # The JS part below will be updated to handle the new User section. HTML_TEMPLATE = """ Business Suite v3.0

Business Suite Login

Business Suite

Logged in as: {{ session.get('username', '') }} ({{ 'Admin' if session.get('is_admin') else 'User' }})

Inventory Management

Current Inventory

Loading inventory...

Supplier Management

Supplier List

Loading suppliers...

Purchase Suggestions

{/* Initially hidden */}

Pending Suggestions

Loading suggestions...
{# NEW: User Management Section #}

User Management

Manage system users. Requires Admin privileges.

{/* Not using form-grid here for simpler layout */}

System Users

Loading users...
""" # --- Flask Application Setup --- app = Flask(__name__) app.secret_key = SECRET_KEY # --- Flask Routes --- @app.route('/') def index(): """Serves the main application page or login page based on session.""" # render_template_string passes Flask session variables to the Jinja2 template # The HTML template uses these to initially set element visibility return render_template_string(HTML_TEMPLATE, session=session) @app.route('/api/login', methods=['POST']) def api_login(): """Handles user login attempt.""" data = request.get_json() username = data.get('username') password = data.get('password') if not username or not password: # Log missing credentials without associating with a specific user (attacker might be guessing) log_event(None, 'LOGIN_FAILURE', {'reason': 'Missing credentials', 'username_attempt': username, 'ip_address': request.remote_addr}) return jsonify({"success": False, "message": "Username and password are required"}), 400 conn = None try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT username, hashed_password, salt, is_admin FROM users WHERE username = ?", (username,)) user_row = cursor.fetchone() conn.close() # Close connection as early as possible if user_row: stored_hash = user_row['hashed_password'] salt = user_row['salt'] if verify_password(stored_hash, salt, password): # Login successful - store user info in session session['username'] = user_row['username'] session['is_admin'] = bool(user_row['is_admin']) # Store as Python boolean session.permanent = True # Optional: make session permanent log_event(username, 'LOGIN_SUCCESS', {'ip_address': request.remote_addr}) # Return user info including is_admin status to the frontend user_info = {"username": user_row['username'], "is_admin": bool(user_row['is_admin'])} return jsonify({"success": True, "message": "Login successful", "user": user_info}) else: log_event(username, 'LOGIN_FAILURE', {'reason': 'Invalid password', 'ip_address': request.remote_addr}) return jsonify({"success": False, "message": "Invalid username or password"}), 401 else: log_event(username, 'LOGIN_FAILURE', {'reason': 'User not found', 'username_attempt': username, 'ip_address': request.remote_addr}) return jsonify({"success": False, "message": "Invalid username or password"}), 401 except sqlite3.Error as e: print(f"ERROR: Database error during login for user {username}: {e}") log_event(username, 'LOGIN_ERROR', {'error': str(e), 'ip_address': request.remote_addr}) if conn: conn.close() return jsonify({"success": False, "message": "Server error during login"}), 500 except Exception as e: print(f"ERROR: Unexpected error during login for user {username}: {e}") # Log as system event or None user if username couldn't be retrieved log_event(username or 'Unknown', 'LOGIN_UNEXPECTED_ERROR', {'error': str(e), 'ip_address': request.remote_addr}) if conn: conn.close() return jsonify({"success": False, "message": "An unexpected server error occurred"}), 500 @app.route('/api/logout', methods=['POST']) @login_required # Ensure user is logged in to log out def api_logout(): """Logs the user out by clearing the session.""" username = session.get('username', 'Unknown') session.pop('username', None) session.pop('is_admin', None) # Explicitly clear session data from Flask's storage session.clear() log_event(username, 'LOGOUT', {'ip_address': request.remote_addr}) return jsonify({"success": True, "message": "Logged out successfully"}) @app.route('/api/status') def api_status(): """Checks if the user is currently logged in.""" # This endpoint is useful for frontend validation if needed, but index route handles initial state if 'username' in session: user_info = {"username": session['username'], "is_admin": session.get('is_admin', False)} return jsonify({"logged_in": True, "user": user_info}) else: return jsonify({"logged_in": False}) # --- API Routes for Business Logic (Placeholder - Assume logic functions are defined above) --- # Example: # @app.route('/api/inventory', methods=['GET']) # @login_required # def api_get_inventory(): # try: # items = get_inventory_summary_logic() # return jsonify({"success": True, "inventory": items}) # except Exception as e: # return jsonify({"success": False, "message": f"Failed: {e}"}), 500 # ... (other API routes for inventory, suppliers, suggestions from v2) # --- NEW: API Routes for User Management --- @app.route('/api/users', methods=['GET']) @admin_required # Only admins can list users def api_get_users(): """API endpoint to get list of users.""" try: users = get_users_logic() # Exclude password/salt from API response for security safe_users = [{"username": u['username'], "is_admin": bool(u['is_admin']), "created_at": u['created_at']} for u in users] return jsonify({"success": True, "users": safe_users}) except Exception as e: print(f"API ERROR: /api/users GET: {e}") # Log admin-only error under the admin's username log_event(session.get('username', 'Unknown Admin Attempt'), 'GET_USERS_API_ERROR', {'error': str(e)}) return jsonify({"success": False, "message": f"Failed to retrieve users: {e}"}), 500 @app.route('/api/users', methods=['POST']) @admin_required # Only admins can add users def api_add_user(): """API endpoint to add a new user.""" data = request.get_json() if not data: log_event(session.get('username', 'Unknown Admin Attempt'), 'ADD_USER_API_FAILURE', {'reason': 'No data'}) return jsonify({"success": False, "message": "Invalid request body"}), 400 username = data.get('username') password = data.get('password') is_admin = data.get('is_admin', False) # Default to False if not provided creating_user_username = session.get('username', 'Unknown Admin') # The admin performing the action # Pass validation responsibility to the logic function success, message = add_user_logic(username, password, bool(is_admin), creating_user_username) if success: return jsonify({"success": True, "message": message}) else: # Determine status code: 409 Conflict for duplicate, 400 for bad input status_code = 409 if 'already exists' in message else 400 return jsonify({"success": False, "message": message}), status_code # Any unhandled exceptions would be caught by Flask's default error handler or gunicorn/web server # For impeccable error handling, add a broad try...except around the logic call here too # --- Application Entry Point --- if __name__ == "__main__": print("INFO: Starting Business Automation Suite v3.0 (Flask)...") print(f"INFO: Using database file: {os.path.abspath(DB_NAME)}") # Ensure database exists and schema is set up before launching Flask try: setup_database() except Exception as e: print(f"FATAL: Could not initialize database. Exiting. Error: {e}") exit(1) print("INFO: Launching Flask application...") # host='0.0.0.0' is essential for accessibility within Docker/HF Space # debug=False is important for production environments # Use gunicorn or similar WSGI server for production instead of app.run() # For HF Spaces, `app.run()` with server_name/port usually works via their infrastructure app.run(host='0.0.0.0', port=7860, debug=False) print("INFO: Application stopped.")