Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import csv | |
| import json | |
| from flask import Flask, jsonify, render_template, request, redirect, url_for, session | |
| from flask_socketio import SocketIO, emit | |
| from dotenv import load_dotenv | |
| from datetime import datetime | |
| import logging | |
| # --- App Configuration --- | |
| load_dotenv() | |
| app = Flask(__name__) | |
| app.secret_key = os.getenv('FLASK_SECRET_KEY', 'a-very-secret-key-change-in-production') | |
| socketio = SocketIO(app) | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # --- File & Airtable Configuration --- | |
| DATA_DIR = 'data' | |
| USERS_FILE = os.path.join(DATA_DIR, 'users.csv') | |
| CHANGE_LOG_FILE = os.path.join(DATA_DIR, 'change_log.csv') | |
| USER_PREFERENCES_FILE = os.path.join(DATA_DIR, 'user_preferences.json') | |
| AIRTABLE_API_KEY = os.getenv('AIRTABLE_API_KEY') | |
| AIRTABLE_BASE_ID = os.getenv('AIRTABLE_BASE_ID') | |
| APPOINTMENTS_TABLE = os.getenv('AIRTABLE_APPOINTMENTS_TABLE', 'Appointments') | |
| CALLS_TABLE = os.getenv('AIRTABLE_CALLS_TABLE', 'CallRecording') | |
| AIRTABLE_API_URL = f"https://api.airtable.com/v0/{AIRTABLE_BASE_ID}" | |
| RESULTS_PER_PAGE = int(os.getenv('RESULT_LIMIT', 50)) | |
| N8N_WEBHOOK_SECRET = os.getenv('N8N_WEBHOOK_SECRET') | |
| # --- Helper Functions --- | |
| def airtable_request(table_name, record_id=None, method='GET', params=None, json_data=None): | |
| """Generic function to make requests to the Airtable API.""" | |
| url = f"{AIRTABLE_API_URL}/{table_name}" | |
| if record_id: | |
| url += f"/{record_id}" | |
| if not AIRTABLE_API_KEY or not AIRTABLE_BASE_ID: | |
| logger.error("Airtable credentials are not set in the .env file.") | |
| return None | |
| headers = { | |
| 'Authorization': f'Bearer {AIRTABLE_API_KEY}', | |
| 'Content-Type': 'application/json' | |
| } | |
| try: | |
| response = requests.request(method, url, headers=headers, params=params, json=json_data, timeout=10) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.exceptions.Timeout: | |
| logger.error(f"Airtable API timeout for {table_name}") | |
| return None | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"Airtable API Error: {e}") | |
| if hasattr(e, 'response') and e.response is not None: | |
| logger.error(f"Response content: {e.response.text}") | |
| return None | |
| def flatten_airtable_records(records): | |
| """Flatten Airtable records for easier use in frontend.""" | |
| if not records: | |
| return [] | |
| return [dict(id=rec.get('id'), **rec.get('fields', {})) for rec in records] | |
| def get_all_records(table_name): | |
| """Fetch all records from an Airtable table, handling pagination.""" | |
| all_records = [] | |
| params = {'pageSize': 100} | |
| while True: | |
| data = airtable_request(table_name, params=params) | |
| if not data: | |
| break | |
| records = data.get('records', []) | |
| all_records.extend(records) | |
| # Check for more pages | |
| offset = data.get('offset') | |
| if not offset: | |
| break | |
| params['offset'] = offset | |
| return all_records | |
| def validate_user_credentials(username, password): | |
| """Validate user credentials from CSV file.""" | |
| try: | |
| if not os.path.exists(USERS_FILE): | |
| logger.error(f"Users file not found: {USERS_FILE}") | |
| return False | |
| with open(USERS_FILE, mode='r', encoding='utf-8') as infile: | |
| reader = csv.DictReader(infile) | |
| for row in reader: | |
| if row.get('username') == username and row.get('password') == password: | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error validating credentials: {e}") | |
| return False | |
| def log_change(username, record_id, table_name, field_name, old_value, new_value, change_type='update'): | |
| """Log changes to a CSV file.""" | |
| try: | |
| os.makedirs(DATA_DIR, exist_ok=True) | |
| # Create log file with headers if it doesn't exist | |
| if not os.path.exists(CHANGE_LOG_FILE): | |
| with open(CHANGE_LOG_FILE, 'w', newline='', encoding='utf-8') as f: | |
| writer = csv.writer(f) | |
| writer.writerow(['timestamp', 'username', 'table_name', 'record_id', 'field_name', 'old_value', 'new_value', 'change_type']) | |
| # Append the change | |
| with open(CHANGE_LOG_FILE, 'a', newline='', encoding='utf-8') as f: | |
| writer = csv.writer(f) | |
| writer.writerow([ | |
| datetime.now().isoformat(), | |
| username, | |
| table_name, | |
| record_id, | |
| field_name, | |
| str(old_value), | |
| str(new_value), | |
| change_type | |
| ]) | |
| logger.info(f"Logged change: {username} updated {table_name}:{record_id}.{field_name}") | |
| except Exception as e: | |
| logger.error(f"Error logging change: {e}") | |
| def save_user_preferences(username, preferences): | |
| """Save user preferences to JSON file.""" | |
| try: | |
| os.makedirs(DATA_DIR, exist_ok=True) | |
| # Load existing preferences | |
| all_preferences = {} | |
| if os.path.exists(USER_PREFERENCES_FILE): | |
| with open(USER_PREFERENCES_FILE, 'r', encoding='utf-8') as f: | |
| all_preferences = json.load(f) | |
| # Update preferences for this user | |
| all_preferences[username] = preferences | |
| # Save back to file | |
| with open(USER_PREFERENCES_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(all_preferences, f, indent=2) | |
| except Exception as e: | |
| logger.error(f"Error saving preferences: {e}") | |
| def load_user_preferences(username): | |
| """Load user preferences from JSON file.""" | |
| try: | |
| if not os.path.exists(USER_PREFERENCES_FILE): | |
| return {} | |
| with open(USER_PREFERENCES_FILE, 'r', encoding='utf-8') as f: | |
| all_preferences = json.load(f) | |
| return all_preferences.get(username, {}) | |
| except Exception as e: | |
| logger.error(f"Error loading preferences: {e}") | |
| return {} | |
| def create_default_files(): | |
| """Create default CSV files if they don't exist.""" | |
| os.makedirs(DATA_DIR, exist_ok=True) | |
| # Create users file | |
| if not os.path.exists(USERS_FILE): | |
| with open(USERS_FILE, 'w', newline='', encoding='utf-8') as f: | |
| writer = csv.writer(f) | |
| writer.writerow(['username', 'password']) | |
| writer.writerow(['admin', 'admin123']) | |
| logger.info(f"Created default users file at {USERS_FILE}") | |
| # Create change log file | |
| if not os.path.exists(CHANGE_LOG_FILE): | |
| with open(CHANGE_LOG_FILE, 'w', newline='', encoding='utf-8') as f: | |
| writer = csv.writer(f) | |
| writer.writerow(['timestamp', 'username', 'table_name', 'record_id', 'field_name', 'old_value', 'new_value', 'change_type']) | |
| logger.info(f"Created change log file at {CHANGE_LOG_FILE}") | |
| # --- Authentication Routes --- | |
| def login(): | |
| error = None | |
| if request.method == 'POST': | |
| username = request.form.get('username', '').strip() | |
| password = request.form.get('password', '').strip() | |
| if not username or not password: | |
| error = 'Please enter both username and password.' | |
| elif validate_user_credentials(username, password): | |
| session['logged_in'] = True | |
| session['username'] = username | |
| session.permanent = True | |
| logger.info(f"Successful login for user: {username}") | |
| return redirect(url_for('dashboard')) | |
| else: | |
| error = 'Invalid username or password.' | |
| logger.warning(f"Failed login attempt for user: {username}") | |
| return render_template('login.html', error=error) | |
| def logout(): | |
| username = session.get('username', 'Unknown') | |
| session.clear() | |
| logger.info(f"User logged out: {username}") | |
| return redirect(url_for('login')) | |
| # --- Main Application Routes --- | |
| def dashboard(): | |
| if not session.get('logged_in'): | |
| return redirect(url_for('login')) | |
| return render_template( | |
| 'dashboard.html', | |
| username=session.get('username', 'User'), | |
| results_per_page=RESULTS_PER_PAGE | |
| ) | |
| # --- API Endpoints --- | |
| def get_data(): | |
| """Fetch all data from Airtable tables.""" | |
| if not session.get('logged_in'): | |
| return jsonify({'error': 'Unauthorized'}), 401 | |
| try: | |
| if not AIRTABLE_API_KEY or not AIRTABLE_BASE_ID: | |
| logger.warning("Airtable not configured, returning empty data") | |
| return jsonify({ | |
| 'appointments': [], | |
| 'calls': [], | |
| 'error': 'Airtable not configured. Please check your .env file.' | |
| }) | |
| appointments_records = get_all_records(APPOINTMENTS_TABLE) | |
| calls_records = get_all_records(CALLS_TABLE) | |
| return jsonify({ | |
| 'appointments': flatten_airtable_records(appointments_records), | |
| 'calls': flatten_airtable_records(calls_records) | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error fetching data: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| def update_appointment_field(record_id): | |
| """Update any field of an appointment.""" | |
| if not session.get('logged_in'): | |
| return jsonify({'error': 'Unauthorized'}), 401 | |
| try: | |
| data = request.json | |
| field_name = data.get('field_name') | |
| new_value = data.get('new_value') | |
| old_value = data.get('old_value', '') | |
| if not field_name or new_value is None: | |
| return jsonify({'error': 'Field name and new value are required'}), 400 | |
| payload = {'fields': {field_name: new_value}} | |
| response = airtable_request(APPOINTMENTS_TABLE, record_id=record_id, method='PATCH', json_data=payload) | |
| if response: | |
| username = session.get('username', 'Unknown') | |
| log_change(username, record_id, 'Appointments', field_name, old_value, new_value) | |
| logger.info(f"Updated appointment {record_id} field {field_name} to {new_value}") | |
| return jsonify({ | |
| 'success': True, | |
| 'record': flatten_airtable_records([response])[0] | |
| }), 200 | |
| else: | |
| return jsonify({'error': 'Failed to update field in Airtable'}), 500 | |
| except Exception as e: | |
| logger.error(f"Error updating appointment: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| def handle_preferences(): | |
| """Handle user preferences.""" | |
| if not session.get('logged_in'): | |
| return jsonify({'error': 'Unauthorized'}), 401 | |
| username = session.get('username') | |
| if request.method == 'GET': | |
| preferences = load_user_preferences(username) | |
| return jsonify(preferences) | |
| elif request.method == 'POST': | |
| preferences = request.json | |
| save_user_preferences(username, preferences) | |
| return jsonify({'success': True}) | |
| def get_change_log(): | |
| """Get recent change log entries.""" | |
| if not session.get('logged_in'): | |
| return jsonify({'error': 'Unauthorized'}), 401 | |
| try: | |
| if not os.path.exists(CHANGE_LOG_FILE): | |
| return jsonify({'changes': []}) | |
| changes = [] | |
| with open(CHANGE_LOG_FILE, 'r', encoding='utf-8') as f: | |
| reader = csv.DictReader(f) | |
| changes = list(reader) | |
| # Return most recent 100 changes | |
| changes.reverse() | |
| return jsonify({'changes': changes[:100]}) | |
| except Exception as e: | |
| logger.error(f"Error fetching change log: {e}") | |
| return jsonify({'error': str(e)}), 500 | |
| def webhook_refresh(): | |
| """Webhook endpoint for n8n to trigger a refresh.""" | |
| if N8N_WEBHOOK_SECRET: | |
| webhook_secret = request.headers.get('X-N8N-Webhook-Secret') | |
| if webhook_secret != N8N_WEBHOOK_SECRET: | |
| logger.warning("Unauthorized webhook attempt.") | |
| return jsonify({'error': 'Unauthorized'}), 401 | |
| logger.info("Webhook received. Triggering client refresh via Socket.IO.") | |
| socketio.emit('refresh_data', {'message': 'Airtable data has changed.'}) | |
| return jsonify({'success': True, 'message': 'Refresh event emitted to all clients.'}), 200 | |
| # --- Error Handlers --- | |
| def not_found(error): | |
| if request.path.startswith('/api/'): | |
| return jsonify({'error': 'Endpoint not found'}), 404 | |
| return render_template('404.html'), 404 | |
| def internal_error(error): | |
| logger.error(f"Internal server error: {error}") | |
| if request.path.startswith('/api/'): | |
| return jsonify({'error': 'Internal server error'}), 500 | |
| return render_template('500.html'), 500 | |
| # --- Main Execution --- | |
| if __name__ == '__main__': | |
| create_default_files() | |
| print("=" * 50) | |
| print(" ENHANCED DENTAL ADMIN APPLICATION") | |
| print("=" * 50) | |
| if not (AIRTABLE_API_KEY and AIRTABLE_BASE_ID): | |
| print("⚠️ WARNING: Airtable credentials not configured!") | |
| else: | |
| print("✅ Airtable configuration loaded") | |
| print(" Default login: admin / admin123") | |
| print(" Features: Inline editing, Change logging, Persistent settings") | |
| print("=" * 50) | |
| socketio.run(app, debug=False, host="0.0.0.0", port=7860, allow_unsafe_werkzeug=True) |