testing-blind / app.py
Thang Truong
initial update
aa4cf69
import threading
from flask import Flask, g, render_template, request, redirect, url_for, session
import os
import time
from huggingface_hub import login, HfApi, hf_hub_download # For Hugging Face integration
import os
import logging
import csv
import random
import shortuuid
import json
import pandas as pd
from filelock import FileLock
# Flask application setup
import os
import secrets

# Create the application exactly once; every @app.route below registers on
# this instance.
app = Flask(__name__)

# SECRET_KEY signs the session cookie. Use a persistent FLASK_SECRET_KEY
# environment variable in prod; the random fallback is only for local/dev,
# because a fresh key on each process start invalidates existing sessions.
app.config["SECRET_KEY"] = os.environ.get("FLASK_SECRET_KEY") or secrets.token_hex(32)
app.config.update(
    SESSION_COOKIE_SAMESITE="None",  # allow cross-site
    SESSION_COOKIE_SECURE=True,  # required for "None"
)
# Root logging configuration: DEBUG and above, timestamped, emitted through a
# single stream handler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,  # Set to DEBUG for more granular logs
)
# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
# Log into Hugging Face when a token is configured; otherwise only warn.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    logger.warning("HF_TOKEN not found in environment variables. Session data will not be uploaded.")
else:
    try:
        login(token=HF_TOKEN)
        logger.info("Logged into Hugging Face successfully.")
    except Exception as e:
        # A failed login is logged (with traceback) but does not stop startup.
        logger.exception(f"Failed to log into Hugging Face: {e}")
# Local files: the question dataset, plus the shared pool of still-unassigned
# question indices and the lock file that guards it.
QUESTIONS_FILE = "./data/dataset.csv"
AVAILABLE_FILE = "/tmp/available.json"
LOCK_FILE = "/tmp/available.lock"

# Hugging Face Hub client and target repository settings.
# NOTE(review): hf_api, HF_REPO_ID and HF_REPO_PATH are not referenced in the
# visible part of this file - confirm whether an upload step is still intended.
HF_REPO_ID = "pooyanrg/BlindTest"  # Update as needed
HF_REPO_PATH = "responses"
hf_api = HfApi()
# Load questions into memory: one dict per CSV row, keyed by the header names.
with open(QUESTIONS_FILE, newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    all_questions = list(reader)

# Track which questions are available. The pool file holds the indices into
# all_questions that have not been handed out yet. Create it only if it does
# not already exist, and do so under the same lock get_questions() takes, so a
# concurrently starting process cannot observe a half-written file.
with FileLock(LOCK_FILE):
    if not os.path.exists(AVAILABLE_FILE):
        with open(AVAILABLE_FILE, "w") as f:
            json.dump(list(range(len(all_questions))), f)
def get_questions(num=10):
    """Draw up to ``num`` random questions and remove them from the shared pool.

    The pool of unassigned question indices lives in AVAILABLE_FILE and is
    read and rewritten while holding LOCK_FILE, so concurrent callers never
    receive the same question.

    Args:
        num: Maximum number of questions to draw. Fewer are returned when the
            pool holds fewer than ``num`` entries.

    Returns:
        A list of question dicts (rows of ``all_questions``), each extended
        with an ``"id"`` key holding its index. Empty when the pool is
        exhausted.
    """
    with FileLock(LOCK_FILE):
        with open(AVAILABLE_FILE, "r") as f:
            available = json.load(f)

        if not available:
            return []  # Not enough questions left

        selected_ids = random.sample(available, min(num, len(available)))

        # Set membership keeps the filter linear in the pool size.
        taken = set(selected_ids)
        remaining = [qid for qid in available if qid not in taken]

        with open(AVAILABLE_FILE, "w") as f:
            json.dump(remaining, f)

    return [all_questions[qid] | {"id": qid} for qid in selected_ids]
@app.route("/", methods=['GET', 'POST'])
def splash():
    """Landing page: show the intro on GET, collect the username on POST.

    A POST with a non-empty ``username`` form field redirects to the
    instructions page; a POST without one re-renders the splash page with an
    error message.
    """
    if request.method != 'POST':
        # GET request - show intro page
        logger.info("Splash page rendered.")
        return render_template('splash.html')

    name = request.form.get('username')
    if name:
        return redirect(url_for('instructions', username=name))

    logger.warning("Username not provided by the user.")
    return render_template('splash.html', error="Please enter a username.")
@app.route('/instructions', methods=['GET', 'POST'])
def instructions():
    """Show the instructions page and start the quiz when the user confirms.

    GET draws a fresh set of questions from the shared pool, stores it in the
    session and renders the instructions. POST (the "Begin Quiz" button)
    reuses the questions the preceding GET stored instead of drawing again;
    drawing on both requests would remove a second batch from the pool and
    discard the first one unanswered. A POST that arrives without questions
    in the session falls back to drawing a batch.

    If no questions are available, the thank-you page is rendered instead.
    """
    username = request.args.get('username')

    if request.method == 'POST':
        questions = session.get('questions') or get_questions(num=2)
    else:
        questions = get_questions(num=2)

    # (Re)initialise the per-user quiz state.
    session['questions'] = questions
    session['answers'] = []

    if not questions:
        return render_template('thanks.html')

    if request.method == 'POST':
        # User clicked the "Begin Quiz" button
        session['start_time'] = time.time()
        session['question_ids'] = [question['image_id'] for question in questions]
        return redirect(url_for('prep', username=username, current_index=0))

    # If GET, render the final instructions page
    return render_template('instructions.html', username=username)
@app.route('/prep', methods=['GET', 'POST'])
def prep():
    """Interstitial page shown before each question.

    GET renders ``prep.html`` with the 1-based question number and the total
    number of questions in the session; POST (the "Start" button) redirects
    to the prompt page for the same index.
    """
    user = request.args.get('username')
    index = int(request.args.get('current_index'))
    quiz = session.get('questions', [])

    if request.method != 'POST':
        return render_template(
            'prep.html',
            question_number=index + 1,
            total=len(quiz),
        )

    # User clicked "Start" button: move on to the actual question display.
    return redirect(url_for('prompt', username=user, current_index=index))
def _truncate_question(question_raw):
    """Return ``question_raw`` cut after its first terminator.

    Questions containing "count" (case-insensitive) are cut after the first
    '.', all others after the first '?'. When the terminator does not occur,
    the full text is returned unchanged instead of raising.
    """
    terminator = '.' if "count" in question_raw.lower() else '?'
    head, sep, _ = question_raw.partition(terminator)
    return head + sep


@app.route('/prompt', methods=['GET', 'POST'])
def prompt():
    """Show the text of the current question before its image is displayed.

    GET renders ``prompt.html`` with the shortened question text; POST
    redirects to the image page for the same index. A GET whose index does
    not match a question stored in the session (for example after the
    session was lost) is sent back to the splash page.
    """
    username = request.args.get('username')
    current_index = int(request.args.get('current_index'))
    questions = session.get('questions', [])

    if request.method == 'POST':
        # User clicked "Start" button: redirect to the image display.
        return redirect(url_for('image', username=username, current_index=current_index))

    if not 0 <= current_index < len(questions):
        return redirect(url_for('splash'))

    entry = questions[current_index]
    task = entry.get('task', '')
    if task in ('Counting Grid - Blank Grids', 'Counting Grid - Word Grids'):
        # Grid tasks always use a fixed prompt, so the raw text is not parsed.
        question = "Count the number of rows and columns."
    else:
        question = _truncate_question(entry.get('question', ''))

    return render_template('prompt.html', question_text=question)
@app.route('/image', methods=['GET', 'POST'])
def image():
    """Display the image that belongs to the current question.

    GET renders ``image.html`` with ``<image_id>.jpg`` as the image name;
    POST redirects to the response page for the same index. A GET whose index
    does not match a question stored in the session is sent back to the
    splash page.
    """
    username = request.args.get('username')
    current_index = int(request.args.get('current_index'))
    questions = session.get('questions', [])

    if request.method == 'POST':
        # Move on to the "respond" route to collect the answer.
        return redirect(url_for('respond', username=username, current_index=current_index))

    if not 0 <= current_index < len(questions):
        return redirect(url_for('splash'))

    image_id = questions[current_index].get('image_id')
    if image_id is None:
        # The fallback name already carries its extension; appending '.jpg'
        # would produce 'default_image.png.jpg'.
        image_name = 'default_image.png'
    else:
        image_name = f"{image_id}.jpg"

    # NOTE(review): the original comment mentions a 10-second countdown; that
    # presumably lives in image.html and is not visible here.
    return render_template('image.html', image_name=image_name)
def _question_and_hint(entry):
    """Return ``(question_text, answer_hint)`` for one question row.

    Grid-counting tasks use a fixed question and hint. For every other task
    the raw question is cut after its first '.' (when it contains "count",
    case-insensitive) or first '?' (otherwise); if that terminator is absent
    the full text is kept instead of raising.
    """
    task = entry.get('task', '')
    if task in ('Counting Grid - Blank Grids', 'Counting Grid - Word Grids'):
        return (
            "Count the number of rows and columns.",
            "Enter two numbers seperated with a comma, for example: 5,6",
        )

    question_raw = entry.get('question', '')
    terminator = '.' if "count" in question_raw.lower() else '?'
    head, sep, _ = question_raw.partition(terminator)

    if task == 'Circled Letter':
        hint = "Enter a letter"
    elif task == 'Touching Circles':
        hint = "Enter Y/N"
    else:
        hint = "Enter a number"
    return head + sep, hint


@app.route('/respond', methods=['GET', 'POST'])
def respond():
    """Collect the answer to the current question.

    GET renders ``respond.html`` with the question text and an input hint
    that depends on the task type. POST appends the submitted answer to the
    session, then either redirects to the prep page of the next question or,
    after the last question, writes all responses plus the elapsed time to a
    JSON file under /tmp and renders the thank-you page.

    A request whose index does not match a question stored in the session
    (for example after the session was lost) is sent back to the splash page.
    """
    username = request.args.get('username')
    current_index = int(request.args.get('current_index'))
    questions = session.get('questions', [])
    answers = session.get('answers', [])

    if not 0 <= current_index < len(questions):
        return redirect(url_for('splash'))

    if request.method == 'POST':
        # Submit the answer
        answers.append({"image_id": questions[current_index]['image_id'],
                        "answer": request.form.get('response')})
        session['answers'] = answers  # reassign so the session is marked modified

        current_index += 1
        if current_index < len(questions):
            return redirect(url_for('prep', username=username, current_index=current_index))

        # Last question answered: store the elapsed time with the responses.
        start_time = session.get('start_time', time.time())
        elapsed_time = time.time() - start_time
        response_all = {'username': username,
                        'time': elapsed_time,
                        'responses': answers}

        # NOTE(review): the result is only written to a local temp file;
        # nothing in this function uploads it anywhere.
        temp_file_path = os.path.join("/tmp", f"{shortuuid.uuid()}.json")
        with open(temp_file_path, 'w', encoding="utf-8") as f:
            json.dump(response_all, f, indent=4)
        return render_template('thanks.html')

    question, form = _question_and_hint(questions[current_index])
    return render_template('respond.html',
                           question_text=question,
                           instruction=form)
if __name__ == "__main__":
    # Run the Flask app directly: listen on every interface, port 7860,
    # with the debugger off.
    app.run(
        host="0.0.0.0",
        port=7860,
        debug=False,
    )