Spaces:
Sleeping
Sleeping
| import threading | |
| from flask import Flask, g, render_template, request, redirect, url_for, session | |
| import os | |
| import time | |
| from huggingface_hub import login, HfApi, hf_hub_download # For Hugging Face integration | |
| import os | |
| import logging | |
| import csv | |
| import random | |
| import shortuuid | |
| import json | |
| import pandas as pd | |
| from filelock import FileLock | |
| # Flask application setup | |
| app = Flask(__name__) | |
| import os, secrets | |
| app = Flask(__name__) | |
| # Use a persistent env var in prod; fallback only for local/dev | |
| app.config["SECRET_KEY"] = os.environ.get("FLASK_SECRET_KEY") or secrets.token_hex(32) | |
| # app.config['SECRET_KEY'] = os.environ.get('FLASK_SECRET_KEY') # required for sessions | |
| app.config.update( | |
| SESSION_COOKIE_SAMESITE="None", # allow cross-site | |
| SESSION_COOKIE_SECURE=True # required for "None" | |
| ) | |
| logging.basicConfig( | |
| level=logging.DEBUG, # Set to DEBUG for more granular logs | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| HF_TOKEN = os.environ.get("HF_TOKEN") | |
| if HF_TOKEN: | |
| try: | |
| login(token=HF_TOKEN) | |
| logger.info("Logged into Hugging Face successfully.") | |
| except Exception as e: | |
| logger.exception(f"Failed to log into Hugging Face: {e}") | |
| else: | |
| logger.warning("HF_TOKEN not found in environment variables. Session data will not be uploaded.") | |
| # Initialize Hugging Face API | |
| hf_api = HfApi() | |
| HF_REPO_ID = "pooyanrg/BlindTest" # Update as needed | |
| HF_REPO_PATH = "responses" | |
| QUESTIONS_FILE = "./data/dataset.csv" | |
| AVAILABLE_FILE = "/tmp/available.json" | |
| LOCK_FILE = "/tmp/available.lock" | |
| # Load questions into memory | |
| with open(QUESTIONS_FILE, newline="", encoding="utf-8") as f: | |
| reader = csv.DictReader(f) | |
| all_questions = list(reader) | |
| # Track which questions are available | |
| if not os.path.exists(AVAILABLE_FILE): | |
| with open(AVAILABLE_FILE, "w") as f: | |
| json.dump(list(range(len(all_questions))), f) | |
| def get_questions(num=10): | |
| with FileLock(LOCK_FILE): | |
| with open(AVAILABLE_FILE, "r") as f: | |
| available = json.load(f) | |
| if len(available) == 0: | |
| return [] # Not enough questions left | |
| sample_size = num if num < len(available) else len(available) | |
| selected_ids = random.sample(available, sample_size) | |
| remaining = [qid for qid in available if qid not in selected_ids] | |
| with open(AVAILABLE_FILE, "w") as f: | |
| json.dump(remaining, f) | |
| return [all_questions[qid] | {"id": qid} for qid in selected_ids] | |
| def splash(): | |
| if request.method == 'POST': | |
| username = request.form.get('username') | |
| if not username: | |
| logger.warning("Username not provided by the user.") | |
| return render_template('splash.html', error="Please enter a username.") | |
| return redirect(url_for('instructions', username=username)) | |
| # GET request - show intro page | |
| logger.info("Splash page rendered.") | |
| return render_template('splash.html') | |
| def instructions(): | |
| username = request.args.get('username') | |
| questions = get_questions(num=2) | |
| current_index = 0 | |
| answers = [] | |
| session['questions'] = questions | |
| session['answers'] = answers | |
| q_ids = [question['image_id'] for question in questions] | |
| if len(questions) == 0: | |
| return render_template('thanks.html') | |
| if request.method == 'POST': | |
| # User clicked the "Begin Quiz" button | |
| start_time = time.time() | |
| session['start_time'] = start_time | |
| session['question_ids'] = q_ids | |
| return redirect(url_for('prep', username=username, current_index=current_index)) | |
| # If GET, render the final instructions page | |
| return render_template('instructions.html', username=username) | |
| def prep(): | |
| username = request.args.get('username') | |
| current_index = int(request.args.get('current_index')) | |
| questions = session.get('questions', []) | |
| if request.method == 'POST': | |
| # User clicked "Start" button | |
| # Redirect to the actual question display | |
| return redirect(url_for('prompt', username=username, current_index=current_index)) | |
| return render_template('prep.html', | |
| question_number=current_index + 1, | |
| total=len(questions)) | |
| def prompt(): | |
| username = request.args.get('username') | |
| current_index = int(request.args.get('current_index')) | |
| questions = session.get('questions', []) | |
| if request.method == 'POST': | |
| # User clicked "Start" button | |
| # Redirect to the image display | |
| return redirect(url_for('image', username=username, current_index=current_index)) | |
| question_raw = questions[current_index].get('question', '') | |
| if "count" in question_raw.lower(): | |
| idx = question_raw.index('.') | |
| else: | |
| idx = question_raw.index('?') | |
| question = question_raw[:idx+1] | |
| task = questions[current_index].get('task', '') | |
| if task == 'Counting Grid - Blank Grids' or task == 'Counting Grid - Word Grids': | |
| question = "Count the number of rows and columns." | |
| return render_template('prompt.html', | |
| question_text=question) | |
| def image(): | |
| username = request.args.get('username') | |
| current_index = int(request.args.get('current_index')) | |
| questions = session.get('questions', []) | |
| image_id = str(questions[current_index].get('image_id', 'default_image.png')) | |
| if request.method == 'POST': | |
| # Move on to the "quiz" route to see if we still have more questions | |
| return redirect(url_for('respond', username=username, current_index=current_index)) | |
| # If GET, display the current image with a 10-seconds countdown | |
| return render_template('image.html', | |
| image_name=image_id + '.jpg') | |
| def respond(): | |
| username = request.args.get('username') | |
| current_index = int(request.args.get('current_index')) | |
| questions = session.get('questions', []) | |
| answers = session.get('answers', []) | |
| if request.method == 'POST': | |
| response = request.form.get('response') | |
| question_id = questions[current_index]['image_id'] | |
| # Submit the answer | |
| answers.append({"image_id": question_id, | |
| "answer": response}) | |
| session['answers'] = answers | |
| current_index += 1 | |
| if current_index >= len(questions): | |
| # Store the elapsed time | |
| start_time = session.get('start_time', time.time()) | |
| q_ids = session.get('question_ids', []) | |
| end_time = time.time() | |
| elapsed_time = end_time - start_time | |
| response_all = {'username': username, | |
| 'time': elapsed_time, | |
| 'responses':answers} | |
| json_data = json.dumps(response_all, indent=4) | |
| file_name = f"{shortuuid.uuid()}.json" | |
| temp_file_path = os.path.join("/tmp", file_name) | |
| with open(temp_file_path, 'w') as f: | |
| f.write(json_data) | |
| return render_template('thanks.html') | |
| return redirect(url_for('prep', username=username, current_index=current_index)) | |
| question_raw = questions[current_index].get('question', '') | |
| if "count" in question_raw.lower(): | |
| idx = question_raw.index('.') | |
| else: | |
| idx = question_raw.index('?') | |
| question = question_raw[:idx+1] | |
| task = questions[current_index].get('task', '') | |
| if task == 'Circled Letter': | |
| form = "Enter a letter" | |
| elif task == 'Touching Circles': | |
| form = "Enter Y/N" | |
| elif task == 'Counting Grid - Blank Grids' or task == 'Counting Grid - Word Grids': | |
| form = "Enter two numbers seperated with a comma, for example: 5,6" | |
| question = "Count the number of rows and columns." | |
| else: | |
| form = "Enter a number" | |
| return render_template('respond.html', | |
| question_text=question, | |
| instruction=form) | |
| if __name__ == "__main__": | |
| # Initialize database when running the script | |
| # example_usage() | |
| # Run Flask app | |
| # app.run(debug=False, threaded=True) | |
| app.run(host="0.0.0.0", port=7860, debug=False) |