"""Flask + Socket.IO admin dashboard backed by the Airtable REST API.

Provides CSV-based login, inline editing of appointment fields with a CSV
change log, per-user preferences stored in a JSON file, and a webhook that
tells connected clients to refresh their data.
"""

import csv
import hmac
import json
import logging
import os
from datetime import datetime

import requests
from dotenv import load_dotenv
from flask import Flask, jsonify, render_template, request, redirect, url_for, session
from flask_socketio import SocketIO, emit

# --- App Configuration ---
load_dotenv()

app = Flask(__name__)
app.secret_key = os.getenv('FLASK_SECRET_KEY', 'a-very-secret-key-change-in-production')
socketio = SocketIO(app)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

if not os.getenv('FLASK_SECRET_KEY'):
    # The fallback key is public (it is in the source), so sessions signed
    # with it can be forged. Make the misconfiguration visible.
    logger.warning("FLASK_SECRET_KEY is not set; using the insecure built-in default.")

# --- File & Airtable Configuration ---
DATA_DIR = 'data'
USERS_FILE = os.path.join(DATA_DIR, 'users.csv')
CHANGE_LOG_FILE = os.path.join(DATA_DIR, 'change_log.csv')
USER_PREFERENCES_FILE = os.path.join(DATA_DIR, 'user_preferences.json')

# Column order of the change log; shared by every writer of that file.
CHANGE_LOG_HEADERS = [
    'timestamp', 'username', 'table_name', 'record_id',
    'field_name', 'old_value', 'new_value', 'change_type',
]

AIRTABLE_API_KEY = os.getenv('AIRTABLE_API_KEY')
AIRTABLE_BASE_ID = os.getenv('AIRTABLE_BASE_ID')
APPOINTMENTS_TABLE = os.getenv('AIRTABLE_APPOINTMENTS_TABLE', 'Appointments')
CALLS_TABLE = os.getenv('AIRTABLE_CALLS_TABLE', 'CallRecording')
AIRTABLE_API_URL = f"https://api.airtable.com/v0/{AIRTABLE_BASE_ID}"
RESULTS_PER_PAGE = int(os.getenv('RESULT_LIMIT', 50))
N8N_WEBHOOK_SECRET = os.getenv('N8N_WEBHOOK_SECRET')


# --- Helper Functions ---
def airtable_request(table_name, record_id=None, method='GET', params=None, json_data=None):
    """Make a request to the Airtable API and return the decoded JSON body.

    Args:
        table_name: Table segment appended to the base URL.
        record_id: Optional record id appended after the table name.
        method: HTTP method passed to ``requests.request``.
        params: Optional query-string parameters.
        json_data: Optional JSON request body.

    Returns:
        The parsed JSON response, or ``None`` when credentials are missing,
        the request times out (10 s), or any other request error occurs.
    """
    if not AIRTABLE_API_KEY or not AIRTABLE_BASE_ID:
        logger.error("Airtable credentials are not set in the .env file.")
        return None

    url = f"{AIRTABLE_API_URL}/{table_name}"
    if record_id:
        url += f"/{record_id}"

    headers = {
        'Authorization': f'Bearer {AIRTABLE_API_KEY}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.request(
            method, url, headers=headers, params=params, json=json_data, timeout=10
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout:
        logger.error(f"Airtable API timeout for {table_name}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Airtable API Error: {e}")
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            logger.error(f"Response content: {error_response.text}")
        return None


def flatten_airtable_records(records):
    """Flatten Airtable records into plain dicts for the frontend.

    Each result holds the record's ``id`` plus every entry of its ``fields``
    mapping. If a field is itself named ``id`` the record id wins, instead
    of raising ``TypeError`` for a duplicated keyword argument.

    Returns:
        A list of flat dicts; an empty list for empty or ``None`` input.
    """
    if not records:
        return []

    flattened = []
    for rec in records:
        flat = {'id': rec.get('id')}
        fields = rec.get('fields') or {}
        flat.update((key, value) for key, value in fields.items() if key != 'id')
        flattened.append(flat)
    return flattened


def get_all_records(table_name):
    """Fetch all records from an Airtable table, following pagination.

    Pages of 100 records are requested until the response carries no
    ``offset``. If a page request fails, the records collected so far are
    returned (possibly an empty list).
    """
    all_records = []
    params = {'pageSize': 100}

    while True:
        data = airtable_request(table_name, params=params)
        if not data:
            break

        all_records.extend(data.get('records', []))

        # Check for more pages
        offset = data.get('offset')
        if not offset:
            break
        params['offset'] = offset

    return all_records


def validate_user_credentials(username, password):
    """Return True when ``username``/``password`` match a row of the users CSV.

    The password is compared in constant time. Any error (including a
    missing users file) is logged and treated as a failed validation.
    """
    # NOTE(review): passwords are stored and compared in plaintext; hashing
    # them would require migrating the users file, so it is not done here.
    try:
        if not os.path.exists(USERS_FILE):
            logger.error(f"Users file not found: {USERS_FILE}")
            return False

        supplied = password.encode('utf-8')
        with open(USERS_FILE, mode='r', encoding='utf-8') as infile:
            for row in csv.DictReader(infile):
                stored = row.get('password')
                if stored is None or row.get('username') != username:
                    continue
                # Bytes are compared so non-ASCII passwords are supported.
                if hmac.compare_digest(stored.encode('utf-8'), supplied):
                    return True
        return False
    except Exception as e:
        logger.error(f"Error validating credentials: {e}")
        return False


def _ensure_change_log_file():
    """Create the change log with its header row if missing; return True if created."""
    os.makedirs(DATA_DIR, exist_ok=True)
    if os.path.exists(CHANGE_LOG_FILE):
        return False
    with open(CHANGE_LOG_FILE, 'w', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow(CHANGE_LOG_HEADERS)
    return True


def log_change(username, record_id, table_name, field_name, old_value, new_value, change_type='update'):
    """Append one change entry to the CSV change log.

    The log file (and its header row) is created on first use. Old and new
    values are stored as strings. Failures are logged, never raised.
    """
    try:
        _ensure_change_log_file()

        # Append the change
        with open(CHANGE_LOG_FILE, 'a', newline='', encoding='utf-8') as f:
            csv.writer(f).writerow([
                datetime.now().isoformat(),
                username,
                table_name,
                record_id,
                field_name,
                str(old_value),
                str(new_value),
                change_type
            ])

        logger.info(f"Logged change: {username} updated {table_name}:{record_id}.{field_name}")
    except Exception as e:
        logger.error(f"Error logging change: {e}")


def save_user_preferences(username, preferences):
    """Save ``preferences`` for ``username`` in the shared JSON file.

    Existing preferences of other users are preserved.

    Returns:
        True when the file was written, False when an error occurred
        (the error is logged, not raised).
    """
    try:
        os.makedirs(DATA_DIR, exist_ok=True)

        # Load existing preferences
        all_preferences = {}
        if os.path.exists(USER_PREFERENCES_FILE):
            with open(USER_PREFERENCES_FILE, 'r', encoding='utf-8') as f:
                all_preferences = json.load(f)

        # Update preferences for this user
        all_preferences[username] = preferences

        # Save back to file
        with open(USER_PREFERENCES_FILE, 'w', encoding='utf-8') as f:
            json.dump(all_preferences, f, indent=2)
        return True
    except Exception as e:
        logger.error(f"Error saving preferences: {e}")
        return False


def load_user_preferences(username):
    """Return the stored preferences for ``username``.

    Returns an empty dict when the file is missing, the user has no entry,
    or reading fails (the error is logged).
    """
    try:
        if not os.path.exists(USER_PREFERENCES_FILE):
            return {}

        with open(USER_PREFERENCES_FILE, 'r', encoding='utf-8') as f:
            all_preferences = json.load(f)
        return all_preferences.get(username, {})
    except Exception as e:
        logger.error(f"Error loading preferences: {e}")
        return {}


def create_default_files():
    """Create the data directory, users CSV and change log if they don't exist."""
    os.makedirs(DATA_DIR, exist_ok=True)

    # Create users file
    # NOTE(review): this seeds a well-known plaintext login (admin/admin123).
    if not os.path.exists(USERS_FILE):
        with open(USERS_FILE, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['username', 'password'])
            writer.writerow(['admin', 'admin123'])
        logger.info(f"Created default users file at {USERS_FILE}")

    # Create change log file
    if _ensure_change_log_file():
        logger.info(f"Created change log file at {CHANGE_LOG_FILE}")


# --- Authentication Routes ---
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and, on POST, validate the submitted credentials."""
    error = None

    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '').strip()

        if not username or not password:
            error = 'Please enter both username and password.'
        elif validate_user_credentials(username, password):
            session['logged_in'] = True
            session['username'] = username
            session.permanent = True
            logger.info(f"Successful login for user: {username}")
            return redirect(url_for('dashboard'))
        else:
            error = 'Invalid username or password.'
            logger.warning(f"Failed login attempt for user: {username}")

    return render_template('login.html', error=error)


@app.route('/logout')
def logout():
    """Clear the session and redirect to the login page."""
    username = session.get('username', 'Unknown')
    session.clear()
    logger.info(f"User logged out: {username}")
    return redirect(url_for('login'))


# --- Main Application Routes ---
@app.route('/')
def dashboard():
    """Render the dashboard for a logged-in user; otherwise redirect to login."""
    if not session.get('logged_in'):
        return redirect(url_for('login'))

    return render_template(
        'dashboard.html',
        username=session.get('username', 'User'),
        results_per_page=RESULTS_PER_PAGE
    )


# --- API Endpoints ---
@app.route('/api/data')
def get_data():
    """Fetch all data from Airtable tables.

    Returns JSON with flattened ``appointments`` and ``calls`` lists. When
    Airtable is not configured, both lists are empty and an ``error``
    message is included (still with status 200).
    """
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        if not AIRTABLE_API_KEY or not AIRTABLE_BASE_ID:
            logger.warning("Airtable not configured, returning empty data")
            return jsonify({
                'appointments': [],
                'calls': [],
                'error': 'Airtable not configured. Please check your .env file.'
            })

        appointments_records = get_all_records(APPOINTMENTS_TABLE)
        calls_records = get_all_records(CALLS_TABLE)

        return jsonify({
            'appointments': flatten_airtable_records(appointments_records),
            'calls': flatten_airtable_records(calls_records)
        })
    except Exception as e:
        logger.error(f"Error fetching data: {e}")
        return jsonify({'error': str(e)}), 500


# The URL must carry the record id: without the <record_id> variable Flask
# calls the view with no arguments and every request fails with TypeError.
@app.route('/api/appointments/update/<record_id>', methods=['PATCH'])
def update_appointment_field(record_id):
    """Update any field of an appointment.

    Expects a JSON object with ``field_name`` and ``new_value`` (and an
    optional ``old_value`` used only for the change log). Responds 400 for a
    missing/invalid body, 500 when the Airtable update fails, and 200 with
    the flattened updated record on success.
    """
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Field name and new value are required'}), 400

        field_name = data.get('field_name')
        new_value = data.get('new_value')
        old_value = data.get('old_value', '')

        if not field_name or new_value is None:
            return jsonify({'error': 'Field name and new value are required'}), 400

        payload = {'fields': {field_name: new_value}}
        response = airtable_request(
            APPOINTMENTS_TABLE, record_id=record_id, method='PATCH', json_data=payload
        )

        if not response:
            return jsonify({'error': 'Failed to update field in Airtable'}), 500

        username = session.get('username', 'Unknown')
        log_change(username, record_id, 'Appointments', field_name, old_value, new_value)
        logger.info(f"Updated appointment {record_id} field {field_name} to {new_value}")

        return jsonify({
            'success': True,
            'record': flatten_airtable_records([response])[0]
        }), 200
    except Exception as e:
        logger.error(f"Error updating appointment: {e}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/preferences', methods=['GET', 'POST'])
def handle_preferences():
    """Handle user preferences.

    GET returns the logged-in user's stored preferences. POST stores the
    JSON body as that user's preferences; it responds 400 when the body is
    not valid JSON and 500 when the preferences could not be written.
    """
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401

    username = session.get('username')

    if request.method == 'GET':
        return jsonify(load_user_preferences(username))

    preferences = request.get_json(silent=True)
    if preferences is None:
        return jsonify({'error': 'A JSON body is required'}), 400

    # Report a failed write instead of claiming success.
    if not save_user_preferences(username, preferences):
        return jsonify({'error': 'Failed to save preferences'}), 500
    return jsonify({'success': True})


@app.route('/api/change_log')
def get_change_log():
    """Get recent change log entries (the 100 most recent, newest first)."""
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        if not os.path.exists(CHANGE_LOG_FILE):
            return jsonify({'changes': []})

        with open(CHANGE_LOG_FILE, 'r', encoding='utf-8') as f:
            changes = list(csv.DictReader(f))

        # Return most recent 100 changes
        changes.reverse()
        return jsonify({'changes': changes[:100]})
    except Exception as e:
        logger.error(f"Error fetching change log: {e}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/webhook/refresh', methods=['POST'])
def webhook_refresh():
    """Webhook endpoint for n8n to trigger a refresh.

    When ``N8N_WEBHOOK_SECRET`` is configured, the request must carry the
    same value in the ``X-N8N-Webhook-Secret`` header. When it is not
    configured, the endpoint accepts any caller.
    """
    if N8N_WEBHOOK_SECRET:
        webhook_secret = request.headers.get('X-N8N-Webhook-Secret') or ''
        # Constant-time comparison of the shared secret.
        secrets_match = hmac.compare_digest(
            webhook_secret.encode('utf-8'), N8N_WEBHOOK_SECRET.encode('utf-8')
        )
        if not secrets_match:
            logger.warning("Unauthorized webhook attempt.")
            return jsonify({'error': 'Unauthorized'}), 401

    logger.info("Webhook received. Triggering client refresh via Socket.IO.")
    socketio.emit('refresh_data', {'message': 'Airtable data has changed.'})

    return jsonify({'success': True, 'message': 'Refresh event emitted to all clients.'}), 200


# --- Error Handlers ---
@app.errorhandler(404)
def not_found(error):
    """Return a JSON 404 for API paths and the HTML 404 page otherwise."""
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Endpoint not found'}), 404
    return render_template('404.html'), 404


@app.errorhandler(500)
def internal_error(error):
    """Log the error and return a JSON 500 for API paths, else the HTML 500 page."""
    logger.error(f"Internal server error: {error}")
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Internal server error'}), 500
    return render_template('500.html'), 500


# --- Main Execution ---
if __name__ == '__main__':
    create_default_files()

    print("=" * 50)
    print(" ENHANCED DENTAL ADMIN APPLICATION")
    print("=" * 50)

    if not (AIRTABLE_API_KEY and AIRTABLE_BASE_ID):
        print("⚠️ WARNING: Airtable credentials not configured!")
    else:
        print("✅ Airtable configuration loaded")

    print(" Default login: admin / admin123")
    print(" Features: Inline editing, Change logging, Persistent settings")
    print("=" * 50)

    socketio.run(app, debug=False, host="0.0.0.0", port=7860, allow_unsafe_werkzeug=True)