Spaces:
Build error
Build error
| from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session, safe_join | |
| import json | |
| import random | |
| import os | |
| import string | |
| import logging | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler("app.log"), | |
| logging.StreamHandler() | |
| ]) | |
| logger = logging.getLogger(__name__) | |
| app = Flask(__name__) | |
| app.config['SECRET_KEY'] = 'supersecretkey' # Change this to a random secret key | |
| # Directories for visualizations | |
| VISUALIZATION_DIRS_PLAN_OF_SQLS = { | |
| "TP": "visualizations/TP", | |
| "TN": "visualizations/TN", | |
| "FP": "visualizations/FP", | |
| "FN": "visualizations/FN" | |
| } | |
| VISUALIZATION_DIRS_CHAIN_OF_TABLE = { | |
| "TP": "htmls_COT/TP", | |
| "TN": "htmls_COT/TN", | |
| "FP": "htmls_COT/FP", | |
| "FN": "htmls_COT/FN" | |
| } | |
| # Function to save session data to a file | |
| def save_session_data(username, data): | |
| os.makedirs('session_data', exist_ok=True) | |
| with open(f'session_data/{username}_session.json', 'w') as f: | |
| json.dump(data, f) | |
| # Function to load session data from a file | |
| def load_session_data(username): | |
| try: | |
| with open(f'session_data/{username}_session.json', 'r') as f: | |
| return json.load(f) | |
| except FileNotFoundError: | |
| return None | |
| # Load all sample files from the directories based on the selected method | |
| def load_samples(method): | |
| logger.info(f"Loading samples for method: {method}") | |
| if method == "Chain-of-Table": | |
| visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE | |
| else: | |
| visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS | |
| samples = {"TP": [], "TN": [], "FP": [], "FN": []} | |
| for category, dir_path in visualization_dirs.items(): | |
| try: | |
| for filename in os.listdir(dir_path): | |
| if filename.endswith(".html"): | |
| samples[category].append(filename) | |
| logger.info(f"Loaded {len(samples[category])} samples for category {category}") | |
| except Exception as e: | |
| logger.exception(f"Error loading samples from {dir_path}: {e}") | |
| return samples | |
| # Randomly select balanced samples | |
| def select_balanced_samples(samples): | |
| try: | |
| tp_fp_samples = random.sample(samples["TP"] + samples["FP"], 5) | |
| tn_fn_samples = random.sample(samples["TN"] + samples["FN"], 5) | |
| logger.info(f"Selected balanced samples: {len(tp_fp_samples + tn_fn_samples)}") | |
| return tp_fp_samples + tn_fn_samples | |
| except Exception as e: | |
| logger.exception("Error selecting balanced samples") | |
| return [] | |
| def generate_random_string(length=8): | |
| return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) | |
| def index(): | |
| logger.info("Rendering index page.") | |
| if request.method == 'POST': | |
| username = request.form.get('username') | |
| seed = request.form.get('seed') | |
| method = request.form.get('method') | |
| if not username or not seed or not method: | |
| logger.error("Missing username, seed, or method.") | |
| return "Missing username, seed, or method", 400 | |
| try: | |
| seed = int(seed) | |
| random.seed(seed) | |
| all_samples = load_samples(method) | |
| selected_samples = select_balanced_samples(all_samples) | |
| logger.info(f"Number of selected samples: {len(selected_samples)}") | |
| if len(selected_samples) == 0: | |
| logger.error("No samples were selected.") | |
| return "No samples were selected", 500 | |
| filename = f'{username}_{seed}_{method}_{generate_random_string()}.json' | |
| logger.info(f"Generated filename: {filename}") | |
| # Save session data | |
| session_data = { | |
| 'responses': [], | |
| 'username': username, | |
| 'selected_samples': selected_samples, | |
| 'method': method, | |
| 'filename': filename, | |
| 'current_index': 0 | |
| } | |
| save_session_data(username, session_data) | |
| logger.info(f"Session data saved for user: {username}") | |
| return redirect(url_for('experiment', username=username)) | |
| except Exception as e: | |
| logger.exception(f"Error in index route: {e}") | |
| return "An error occurred", 500 | |
| return render_template('index.html') | |
| def experiment(username): | |
| try: | |
| session_data = load_session_data(username) | |
| if not session_data: | |
| logger.error(f"No session data found for user: {username}") | |
| return redirect(url_for('index')) | |
| selected_samples = session_data['selected_samples'] | |
| method = session_data['method'] | |
| current_index = session_data['current_index'] | |
| if current_index >= len(selected_samples): | |
| return redirect(url_for('completed', username=username)) | |
| visualization_file = selected_samples[current_index] | |
| if method == "Chain-of-Table": | |
| vis_dir = 'htmls_COT' | |
| else: | |
| vis_dir = 'visualizations' | |
| # Determine the correct visualization directory based on the category | |
| for category, dir_path in (VISUALIZATION_DIRS_CHAIN_OF_TABLE if method == "Chain-of-Table" else VISUALIZATION_DIRS_PLAN_OF_SQLS).items(): | |
| if visualization_file in os.listdir(dir_path): | |
| visualization_path = os.path.join(vis_dir, category, visualization_file) | |
| break | |
| else: | |
| logger.error(f"Visualization file {visualization_file} not found.") | |
| return "Visualization file not found", 404 | |
| logger.info(f"Rendering experiment page with visualization: {visualization_path}") | |
| statement = "Please make a decision to Accept/Reject the AI prediction based on the explanation." | |
| return render_template('experiment.html', | |
| sample_id=current_index, | |
| statement=statement, | |
| visualization=url_for('send_visualization', filename=visualization_path), | |
| username=username) | |
| except Exception as e: | |
| logger.exception(f"An error occurred in the experiment route: {e}") | |
| return "An error occurred", 500 | |
| def feedback(): | |
| try: | |
| username = request.form['username'] | |
| feedback = request.form['feedback'] | |
| session_data = load_session_data(username) | |
| if not session_data: | |
| logger.error(f"No session data found for user: {username}") | |
| return redirect(url_for('index')) | |
| # Store the feedback | |
| session_data['responses'].append({ | |
| 'sample_id': session_data['current_index'], | |
| 'feedback': feedback | |
| }) | |
| # Move to the next sample | |
| session_data['current_index'] += 1 | |
| # Save updated session data | |
| save_session_data(username, session_data) | |
| if session_data['current_index'] >= len(session_data['selected_samples']): | |
| return redirect(url_for('completed', username=username)) | |
| return redirect(url_for('experiment', username=username)) | |
| except Exception as e: | |
| logger.exception(f"Error in feedback route: {e}") | |
| return "An error occurred", 500 | |
| def completed(username): | |
| try: | |
| session_data = load_session_data(username) | |
| if not session_data: | |
| logger.error(f"No session data found for user: {username}") | |
| return redirect(url_for('index')) | |
| responses = session_data['responses'] | |
| method = session_data['method'] | |
| if method == "Chain-of-Table": | |
| json_file = 'Tabular_LLMs_human_study_vis_6_COT.json' | |
| else: # Default to Plan-of-SQLs | |
| json_file = 'Tabular_LLMs_human_study_vis_6.json' | |
| with open(json_file, 'r') as f: | |
| ground_truth = json.load(f) | |
| correct_responses = 0 | |
| accept_count = 0 | |
| reject_count = 0 | |
| for response in responses: | |
| sample_id = response['sample_id'] | |
| feedback = response['feedback'] | |
| visualization_file = session_data['selected_samples'][sample_id] | |
| index = visualization_file.split('-')[1].split('.')[0] # Extract index from filename | |
| if feedback.upper() == "TRUE": | |
| accept_count += 1 | |
| elif feedback.upper() == "FALSE": | |
| reject_count += 1 | |
| if method == "Chain-of-Table": | |
| ground_truth_key = f"COT_test-{index}.html" | |
| else: | |
| ground_truth_key = f"POS_test-{index}.html" | |
| if ground_truth_key in ground_truth and ground_truth[ground_truth_key]['answer'].upper() == feedback.upper(): | |
| correct_responses += 1 | |
| else: | |
| logger.warning(f"Missing or mismatched key: {ground_truth_key}") | |
| accuracy = (correct_responses / len(responses)) * 100 if responses else 0 | |
| accuracy = round(accuracy, 2) | |
| accept_percentage = (accept_count / len(responses)) * 100 if len(responses) else 0 | |
| reject_percentage = (reject_count / len(responses)) * 100 if len(responses) else 0 | |
| accept_percentage = round(accept_percentage, 2) | |
| reject_percentage = round(reject_percentage, 2) | |
| return render_template('completed.html', | |
| accuracy=accuracy, | |
| accept_percentage=accept_percentage, | |
| reject_percentage=reject_percentage) | |
| except Exception as e: | |
| logger.exception(f"An error occurred in the completed route: {e}") | |
| return "An error occurred", 500 | |
| def send_visualization(filename): | |
| logger.info(f"Attempting to serve file: {filename}") | |
| # Ensure the path is safe and doesn't allow access to files outside the intended directory | |
| safe_path = safe_join(os.getcwd(), filename) | |
| directory = os.path.dirname(safe_path) | |
| file_name = os.path.basename(safe_path) | |
| logger.info(f"Serving file from directory: {directory}, filename: {file_name}") | |
| return send_from_directory(directory, file_name) | |
| if __name__ == "__main__": | |
| os.makedirs('session_data', exist_ok=True) # Ensure the directory for session files exists | |
| app.run(host="0.0.0.0", port=7860, debug=True) |