# app.py — student-facing RAG Learning Assistant Space
# (removed copy/paste residue from the HuggingFace web UI that was not valid Python)
import gradio as gr
import csv
import os
import re
from datetime import datetime, timedelta
from huggingface_hub import Repository
from RAG_Learning_Assistant_with_Streaming import RAGLearningAssistant
# Configuration for Student Space
# find name of space
def get_space_name():
    """Return this Space's short name, derived from the SPACE_ID env var.

    SPACE_ID is usually "username/space-name"; only the trailing
    "space-name" segment is returned.

    Returns:
        str: the space name, or "unknown_space" when SPACE_ID is unset.
        (The original implicitly returned None here, which leaked the
        literal string "None" into the log filenames built from it.)
    """
    space_id = os.environ.get("SPACE_ID", None)
    if space_id:
        # SPACE_ID is usually "username/space-name"; we only need "space-name".
        return space_id.split("/")[-1]
    # Explicit fallback instead of an implicit None.
    return "unknown_space"
# --- Space / data-storage configuration --------------------------------------

# Short name of this student Space (derived from SPACE_ID at startup).
STUDENT_SPACE_NAME = get_space_name()  # get space name automatically
DATA_STORAGE_REPO = "CIV3283/Data_Storage"  # Centralized data storage repo
DATA_BRANCH_NAME = "data_branch"  # Branch of the storage Space holding the data
LOCAL_DATA_DIR = "temp_data_repo"  # Local clone directory for the repo above

# Session timeout (minutes): a *different* user is blocked while the previous
# user's latest query is more recent than this (see check_session_validity).
SESSION_TIMEOUT_MINUTES = 30  # Adjust this value as needed

# File names expected inside the data-storage repo.
KNOWLEDGE_FILE = "knowledge_base.md"
VECTOR_DB_FILE = "vector_database.csv"
METADATA_FILE = "vector_metadata.json"
VECTORIZER_FILE = "vectorize_knowledge_base.py"

# Student-specific log files, prefixed with this Space's name so every
# student Space writes to its own files inside the shared repo.
QUERY_LOG_FILE = f"{STUDENT_SPACE_NAME}_query_log.csv"
FEEDBACK_LOG_FILE = f"{STUDENT_SPACE_NAME}_feedback_log.csv"

# Required secrets — fail fast at import time if either is missing.
HF_HUB_TOKEN = os.environ.get("HF_HUB_TOKEN", None)
if HF_HUB_TOKEN is None:
    raise ValueError("Set HF_HUB_TOKEN in Space Settings -> Secrets")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", None)
if OPENAI_API_KEY is None:
    raise ValueError("Set OPENAI_API_KEY in Space Settings -> Secrets")

# OpenAI model used by the RAG assistant.
MODEL = "gpt-4.1-nano-2025-04-14"
def check_session_validity(check_id):
    """Decide whether *check_id* may use the assistant right now.

    Policy, based on the most recent row of the query log:
      * same student as the last query          -> allow (continuation),
      * different student, last query recent    -> block,
      * different student, assistant idle long  -> allow,
      * missing/empty/unparseable log           -> allow (fail open).

    Returns:
        tuple: (is_valid: bool, error_message: str)
    """
    try:
        log_path = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)

        # No log yet: this is the very first query.
        if not os.path.exists(log_path):
            print(f"[check_session_validity] No existing log file, allowing first query for student {check_id}")
            return True, ""

        with open(log_path, 'r', encoding='utf-8') as handle:
            records = list(csv.reader(handle))

        # Header only: effectively the first query.
        if len(records) <= 1:
            print(f"[check_session_validity] Only header in log file, allowing first query for student {check_id}")
            return True, ""

        newest = records[-1]
        # Expected columns: [student_space, student_id, timestamp,
        #                    search_info, query_and_response, thumb_feedback]
        if len(newest) < 3:
            print(f"[check_session_validity] Invalid last record format, allowing query")
            return True, ""

        prev_id, prev_ts_text = newest[1], newest[2]
        print(f"[check_session_validity] Last record - Student ID: {prev_id}, Timestamp: {prev_ts_text}")
        print(f"[check_session_validity] Current request - Student ID: {check_id}")

        # Same student: let them continue their session.
        if prev_id == check_id:
            print(f"[check_session_validity] Same user, allowing continuation for student {check_id}")
            return True, ""

        # Different student: decide by how long the assistant has been idle.
        try:
            prev_ts = datetime.strptime(prev_ts_text, '%Y-%m-%d %H:%M:%S')
            elapsed = datetime.now() - prev_ts
            print(f"[check_session_validity] Different user - Time difference: {elapsed.total_seconds()} seconds ({elapsed.total_seconds()/60:.1f} minutes)")

            if elapsed <= timedelta(minutes=SESSION_TIMEOUT_MINUTES):
                # Previous user finished only moments ago — treat as occupied.
                print(f"[check_session_validity] Blocking access - Previous user ({prev_id}) used assistant {elapsed.total_seconds()/60:.1f} minutes ago")
                return False, "⚠️ The assistant is currently being used by another user. Please return to the load distributor page."

            print(f"[check_session_validity] Assistant has been idle for {elapsed.total_seconds()/60:.1f} minutes, allowing new user {check_id}")
            return True, ""
        except ValueError as e:
            # Unparseable timestamp: fail open rather than lock everyone out.
            print(f"[check_session_validity] Error parsing timestamp: {e}")
            return True, ""
    except Exception as e:
        # Any unexpected failure: fail open to avoid blocking legitimate users.
        print(f"[check_session_validity] Error checking session validity: {e}")
        import traceback
        print(f"[check_session_validity] Traceback: {traceback.format_exc()}")
        return True, ""
def init_data_storage_repo():
    """Clone/attach the centralized data-storage Space repo and sync it.

    Configures a git identity for this student Space, pulls the latest
    data branch, and warns about any missing required data files.

    Returns:
        Repository | None: the huggingface_hub handle, or None on failure.
    """
    try:
        storage = Repository(
            local_dir=LOCAL_DATA_DIR,
            clone_from=DATA_STORAGE_REPO,
            revision=DATA_BRANCH_NAME,
            repo_type="space",
            use_auth_token=HF_HUB_TOKEN,
        )

        # Identify commits as coming from this student Space.
        storage.git_config_username_and_email("git_user", f"Student_Space_{STUDENT_SPACE_NAME}")
        storage.git_config_username_and_email("git_email", f"{STUDENT_SPACE_NAME}@student.space")

        print(f"[init_data_storage_repo] Pulling latest changes from {DATA_STORAGE_REPO}...")
        storage.git_pull(rebase=True)

        print(f"[init_data_storage_repo] Successfully connected to data storage repo: {DATA_STORAGE_REPO}")
        print(f"[init_data_storage_repo] Local directory: {LOCAL_DATA_DIR}")
        print(f"[init_data_storage_repo] Branch: {DATA_BRANCH_NAME}")

        # Sanity-check that the assistant's required inputs are present.
        for required in (KNOWLEDGE_FILE, VECTOR_DB_FILE, METADATA_FILE):
            if os.path.exists(os.path.join(LOCAL_DATA_DIR, required)):
                print(f"[init_data_storage_repo] Found required file: {required}")
            else:
                print(f"[init_data_storage_repo] Warning: Missing required file: {required}")

        return storage
    except Exception as e:
        print(f"[init_data_storage_repo] Error initializing repository: {e}")
        import traceback
        print(f"[init_data_storage_repo] Traceback: {traceback.format_exc()}")
        return None
def commit_student_logs(commit_message: str):
    """Commit the per-student log CSVs to the shared data-storage repo.

    Flow: stage existing log files, skip if the tree is clean, commit
    locally, then pull --rebase and push.  On a detected push/pull
    conflict (another Space pushed first) the local commit is undone,
    the repo is re-synced, and the whole sequence is retried with a
    linear backoff, up to ``max_retries`` times.

    Args:
        commit_message: git commit message describing the change.

    Returns:
        bool: True when nothing needed committing or the push succeeded;
        False on any unrecoverable failure.
    """
    # `repo` is the module-level Repository set up by init_data_storage_repo().
    if repo is None:
        print("[commit_student_logs] Error: Repository not initialized")
        return False
    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Only stage log files that actually exist on disk.
            query_log_path = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
            feedback_log_path = os.path.join(LOCAL_DATA_DIR, FEEDBACK_LOG_FILE)
            files_to_add = []
            if os.path.exists(query_log_path):
                files_to_add.append(QUERY_LOG_FILE)
                print(f"[commit_student_logs] Found query log: {query_log_path}")
            if os.path.exists(feedback_log_path):
                files_to_add.append(FEEDBACK_LOG_FILE)
                print(f"[commit_student_logs] Found feedback log: {feedback_log_path}")
            if not files_to_add:
                print("[commit_student_logs] No log files to commit")
                return False
            # Stage each log file individually.
            for file_name in files_to_add:
                print(f"[commit_student_logs] Adding file: {file_name}")
                repo.git_add(pattern=file_name)
            # Skip the commit entirely when the working tree is clean.
            try:
                import subprocess
                result = subprocess.run(
                    ["git", "status", "--porcelain"],
                    cwd=LOCAL_DATA_DIR,
                    capture_output=True,
                    text=True,
                    check=True
                )
                if not result.stdout.strip():
                    print("[commit_student_logs] No changes to commit")
                    return True
                print(f"[commit_student_logs] Changes detected: {result.stdout.strip()}")
            except Exception as status_error:
                # Best effort only — fall through and attempt the commit anyway.
                print(f"[commit_student_logs] Warning: Could not check git status: {status_error}")
            # Commit locally first so a failed pull/push can be rolled back below.
            print(f"[commit_student_logs] Attempt {retry_count + 1}/{max_retries}: Committing locally: {commit_message}")
            repo.git_commit(commit_message)
            # Sync with the remote before pushing.
            print("[commit_student_logs] Pulling latest changes...")
            repo.git_pull(rebase=True)
            # Push changes
            print("[commit_student_logs] Pushing to remote...")
            repo.git_push()
            print(f"[commit_student_logs] Success: {commit_message}")
            return True
        except Exception as e:
            error_msg = str(e)
            print(f"[commit_student_logs] Attempt {retry_count + 1} failed: {error_msg}")
            # Heuristic conflict detection based on git's error text.
            if ("rejected" in error_msg and "fetch first" in error_msg) or ("cannot pull with rebase" in error_msg):
                print("[commit_student_logs] Detected Git conflict, will retry...")
                retry_count += 1
                if retry_count < max_retries:
                    # Roll back the local commit, re-sync, then retry the loop.
                    try:
                        print("[commit_student_logs] Resetting repository state for retry...")
                        # NOTE(review): confirm the installed huggingface_hub
                        # Repository actually exposes git_reset/git_stash —
                        # if not, these calls raise and we fall to the
                        # stash/return-False paths below.
                        repo.git_reset("--hard", "HEAD~1")  # Undo the commit
                        repo.git_pull(rebase=True)  # Get latest changes
                        # Linear backoff to avoid rapid repeated conflicts.
                        import time
                        wait_time = retry_count * 2  # 2, 4, 6 seconds
                        print(f"[commit_student_logs] Waiting {wait_time} seconds before retry...")
                        time.sleep(wait_time)
                        continue
                    except Exception as reset_error:
                        print(f"[commit_student_logs] Reset failed: {reset_error}")
                        # Fallback: stash local changes, sync, restore, retry.
                        try:
                            repo.git_stash()
                            repo.git_pull(rebase=True)
                            repo.git_stash("pop")
                            continue
                        except Exception as stash_error:
                            print(f"[commit_student_logs] Stash approach failed: {stash_error}")
                            return False
                else:
                    print("[commit_student_logs] Max retries reached, giving up")
                    return False
            else:
                # Non-conflict errors are not retriable — give up immediately.
                print(f"[commit_student_logs] Non-conflict error, not retrying: {error_msg}")
                return False
    print("[commit_student_logs] Failed after all retry attempts")
    return False
def save_student_query_to_csv(query, search_info, response, check_id, thumb_feedback=None):
    """Append one query/response record to this Space's shared CSV log.

    Writes the header row on first use, then makes a best-effort attempt
    to commit the updated log to the remote data-storage repo.

    Returns:
        bool: True once the row is written locally (even if the remote
        commit fails), False on any error or missing check_id.
    """
    try:
        if not check_id:
            print("[save_student_query_to_csv] Error: No valid check_id provided")
            return False

        os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
        log_path = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
        need_header = not os.path.isfile(log_path)

        print(f"[save_student_query_to_csv] Saving to: {log_path}")
        print(f"[save_student_query_to_csv] File exists: {not need_header}")
        print(f"[save_student_query_to_csv] Student ID: {check_id}")

        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with open(log_path, 'a', newline='', encoding='utf-8') as out:
            writer = csv.writer(out)
            if need_header:
                print("[save_student_query_to_csv] Writing header row")
                writer.writerow(['student_space', 'student_id', 'timestamp', 'search_info', 'query_and_response', 'thumb_feedback'])
            writer.writerow([
                STUDENT_SPACE_NAME,
                check_id,
                stamp,
                search_info,
                f"Query: {query}\nResponse: {response}",
                thumb_feedback or "",
            ])
        print(f"[save_student_query_to_csv] Query saved to local file: {log_path}")

        # Best-effort sync to the shared repo; local save already succeeded.
        print("[save_student_query_to_csv] Attempting to commit to remote repository...")
        if commit_student_logs(f"Add query log from student {check_id} at {stamp}"):
            print("[save_student_query_to_csv] Successfully committed to remote repository")
        else:
            print("[save_student_query_to_csv] Failed to commit to remote repository")
        return True
    except Exception as e:
        print(f"[save_student_query_to_csv] Error: {e}")
        import traceback
        print(f"[save_student_query_to_csv] Traceback: {traceback.format_exc()}")
        return False
def update_latest_student_query_feedback(feedback_type, check_id):
    """Record thumb feedback on the most recent row of the query log.

    Args:
        feedback_type: feedback label, e.g. "thumbs_up" / "thumbs_down".
        check_id: student identifier; must be truthy.

    Returns:
        bool: True when a row was updated (the remote commit itself is
        best-effort), False otherwise.
    """
    try:
        if not check_id:
            print("[update_latest_student_query_feedback] Error: No valid check_id provided")
            return False

        filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
        if not os.path.exists(filepath):
            print("[update_latest_student_query_feedback] Error: Query log file not found")
            return False

        with open(filepath, 'r', encoding='utf-8') as csvfile:
            rows = list(csv.reader(csvfile))

        # Need at least a header plus one data row.
        if len(rows) <= 1:
            return False

        # Fix: the original assumed the last row always had 6 columns;
        # a short or blank trailing row raised IndexError, which the
        # broad except silently swallowed and the feedback was lost.
        # Pad the row up to the thumb_feedback column before assigning.
        last_row = rows[-1]
        while len(last_row) < 6:
            last_row.append("")
        last_row[5] = feedback_type  # thumb_feedback column

        with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
            csv.writer(csvfile).writerows(rows)
        print(f"[update_latest_student_query_feedback] Updated feedback: {feedback_type}")

        # Best-effort push of the updated log to the shared repo.
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        commit_student_logs(f"Update feedback from student {check_id}: {feedback_type} at {timestamp}")
        return True
    except Exception as e:
        print(f"[update_latest_student_query_feedback] Error: {e}")
        return False
def save_student_comment_feedback(comment, check_id):
    """Append a free-text comment to this Space's feedback CSV and commit it.

    Returns:
        bool: True once the comment is written locally, False on error
        or missing check_id.
    """
    try:
        if not check_id:
            print("[save_student_comment_feedback] Error: No valid check_id provided")
            return False

        path = os.path.join(LOCAL_DATA_DIR, FEEDBACK_LOG_FILE)
        write_header = not os.path.isfile(path)
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        with open(path, 'a', newline='', encoding='utf-8') as out:
            writer = csv.writer(out)
            if write_header:
                writer.writerow(['student_space', 'student_id', 'timestamp', 'comment'])
            writer.writerow([STUDENT_SPACE_NAME, check_id, stamp, comment])

        print(f"[save_student_comment_feedback] Saved comment to {path}")
        # Best-effort sync to the shared repo.
        commit_student_logs(f"Add comment feedback from student {check_id} at {stamp}")
        return True
    except Exception as e:
        print(f"[save_student_comment_feedback] Error: {e}")
        return False
def get_url_params(request: gr.Request):
    """Extract the 'check' query-string parameter from the page request.

    Returns:
        tuple: (page title, check_id or None when absent/empty).
    """
    title = "RAG Learning Assistant - Student"
    if not request:
        return title, None
    check_id = dict(request.query_params).get('check', None)
    return title, (check_id if check_id else None)
def chat_response(message, history, search_info_display, check_id, has_query):
    """Stream an assistant reply for *message*, updating the Gradio chat.

    Generator event handler: yields (history, search_info_markdown,
    has_query) after every streamed chunk; once the stream completes,
    the query/response pair is appended to the shared CSV log.

    Args:
        message: the user's new question.
        history: chat history (legacy tuple format or messages format;
            normalized to messages format below).
        search_info_display: current contents of the search-info panel.
        check_id: student id from the Moodle URL (None when missing).
        has_query: whether a query exists this session (enables rating).

    Raises:
        gr.Error: when check_id is missing or another user's session is
            still considered active.
    """
    # NOTE(review): a bare `return <values>` inside a generator ends the
    # stream without emitting those values — confirm blank input behaves
    # as intended in the UI.
    if not message.strip():
        return history, search_info_display, has_query
    # Reject requests that did not come through the Moodle link.
    if not check_id:
        print(f"[chat_response] Access denied: No valid check ID provided")
        # Raise error dialog for access denial
        raise gr.Error(
            "⚠️ Access Restricted\n\n"
            "Please access this system through the link provided in Moodle.\n\n"
            "If you are a student in this course:\n"
            "1. Go to your Moodle course page\n"
            "2. Find the 'CivASK' link\n"
            "3. Click the link to access the system\n\n"
            "If you continue to experience issues, please contact your instructor.",
            duration=8
        )
    # Enforce the single-active-user session policy before doing any work.
    session_valid, error_message = check_session_validity(check_id)
    if not session_valid:
        print(f"[chat_response] Session invalid for student {check_id}")
        raise gr.Error(error_message, duration=10)
    # Valid access and valid session - proceed with normal AI conversation.
    print(f"[chat_response] Valid access and session for student ID: {check_id}")
    # Normalize legacy tuple-format history into the messages format.
    if history and isinstance(history[0], list):
        # Convert from tuples to messages format
        messages_history = []
        for user_msg, assistant_msg in history:
            messages_history.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages_history.append({"role": "assistant", "content": assistant_msg})
        history = messages_history
    # Append the new user turn plus an empty assistant turn to fill in.
    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": ""})
    search_info_collected = False
    search_info_content = ""
    content_part = ""
    # Chunks up to and including the one containing the "**Response:**"
    # marker go to the search-info panel; all later chunks are the answer.
    # `assistant` is the module-level RAGLearningAssistant set in main().
    for chunk in assistant.generate_response_stream(message):
        if not search_info_collected:
            if "**Response:**" in chunk:  # marker separating analysis from answer
                search_info_content += chunk
                search_info_collected = True
                yield history, search_info_content, has_query
            else:
                search_info_content += chunk
                yield history, search_info_content, has_query
        else:
            content_part += chunk
            # Live-update the last (assistant) message with the partial answer.
            history[-1]["content"] = content_part
            yield history, search_info_content, has_query
    # After streaming completes, persist the exchange (valid access only).
    try:
        print(f"[chat_response] Saving student query to CSV...")
        print(f"Student Space: {STUDENT_SPACE_NAME}")
        print(f"Student ID: {check_id}")
        print(f"Query: {message}")
        save_success = save_student_query_to_csv(message, search_info_content, content_part, check_id)
        if save_success:
            print(f"[chat_response] Student query saved successfully")
            has_query = True  # Mark that we have a query to rate
        else:
            print(f"[chat_response] Failed to save student query")
    except Exception as e:
        # Logging failures must not break the already-delivered response.
        print(f"[chat_response] Error saving student query: {e}")
    return history, search_info_content, has_query
# Module-level singletons, initialized in main().
repo = None       # huggingface_hub Repository handle for the data-storage Space
assistant = None  # RAGLearningAssistant instance used by chat_response()
def main():
    """Initialize the data repo and RAG assistant, then launch the UI.

    Sets the module-level `repo` and `assistant` singletons, builds the
    student-facing Gradio Blocks interface (chat, thumbs feedback,
    comments), wires the event handlers, and starts the server.
    """
    global repo, assistant

    # Connect to the centralized data-storage repository.
    repo = init_data_storage_repo()

    # Initialize the RAG assistant against the synced data directory.
    print(f"[main] Initializing RAG assistant with data directory: {LOCAL_DATA_DIR}")
    print(f"[main] Session timeout set to: {SESSION_TIMEOUT_MINUTES} minutes")
    assistant = RAGLearningAssistant(
        api_key=OPENAI_API_KEY,
        model=MODEL,
        vector_db_path=LOCAL_DATA_DIR  # data lives in the storage-repo clone
    )
    print(f"[main] RAG assistant initialized successfully")
    print(f"[main] Student space: {STUDENT_SPACE_NAME}")
    print(f"[main] Data storage repo: {DATA_STORAGE_REPO}")
    print(f"[main] Query log file: {QUERY_LOG_FILE}")
    print(f"[main] Feedback log file: {FEEDBACK_LOG_FILE}")

    # --- Interface ----------------------------------------------------------
    with gr.Blocks(title=f"RAG Assistant - {STUDENT_SPACE_NAME}") as interface:
        check_id_state = gr.State("1")
        has_query_state = gr.State(False)  # Track if there's a query to rate
        title_display = gr.Markdown(f"# RAG Learning Assistant - {STUDENT_SPACE_NAME}", elem_id="title")
        with gr.Row():
            with gr.Column(scale=4):
                # Fix: LaTeX delimiters use raw strings — the original "\(",
                # "\)", "\[", "\]" are invalid escape sequences (SyntaxWarning
                # on Python 3.12+); the runtime values are unchanged.
                chatbot = gr.Chatbot(
                    label="Ask Your Questions",
                    height=500,
                    resizable=True,
                    type="messages",
                    render_markdown=True,
                    latex_delimiters=[
                        {"left": "$$", "right": "$$", "display": True},
                        {"left": r"\(", "right": r"\)", "display": False},
                        {"left": "$", "right": "$", "display": False},
                        {"left": r"\[", "right": r"\]", "display": True},
                    ],
                )
                msg = gr.Textbox(placeholder="Type your message here...", label="Your Message", show_label=True)
                # Thumbs feedback row.
                with gr.Row():
                    thumbs_up_btn = gr.Button("👍 Good Answer", variant="secondary", size="sm")
                    thumbs_down_btn = gr.Button("👎 Poor Answer", variant="secondary", size="sm")
                feedback_status = gr.Textbox(label="Feedback Status", interactive=False, lines=1)
                # Free-text comment row.
                with gr.Row():
                    comment_input = gr.Textbox(placeholder="Share your comments or suggestions...", label="Comments", lines=2)
                    submit_comment_btn = gr.Button("Submit Comment", variant="primary")
            with gr.Column(scale=1):
                search_info = gr.Markdown(label="Search Analysis Information", value="")

        # --- Event handlers -------------------------------------------------
        def init_from_url(request: gr.Request):
            """Read the Moodle 'check' id from the URL on page load."""
            title, check_id = get_url_params(request)
            print(f"[init_from_url] Extracted check_id: {check_id}")
            return f"# {title}", check_id, False  # reset has_query state

        def handle_thumbs_up(check_id, has_query):
            """Record a thumbs-up against the most recent logged query."""
            if not check_id:
                raise gr.Error(
                    "⚠️ Access Restricted\n\n"
                    "Please access this system through the CivASK link provided in Moodle to use the feedback features.",
                    duration=5
                )
            print(f"[handle_thumbs_up] Student: {STUDENT_SPACE_NAME}, check_id: {check_id}")
            # Only rate when the log has at least one data row beyond the header.
            filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
            if os.path.exists(filepath):
                with open(filepath, 'r', encoding='utf-8') as csvfile:
                    rows = list(csv.reader(csvfile))
                if len(rows) > 1:
                    success = update_latest_student_query_feedback("thumbs_up", check_id)
                    return "👍 Thank you for your positive feedback!" if success else "Failed to save feedback"
            return "No query to rate yet"

        def handle_thumbs_down(check_id, has_query):
            """Record a thumbs-down against the most recent logged query."""
            if not check_id:
                raise gr.Error(
                    "⚠️ Access Restricted\n\n"
                    "Please access this system through the CivASK link provided in Moodle to use the feedback features.",
                    duration=5
                )
            print(f"[handle_thumbs_down] Student: {STUDENT_SPACE_NAME}, check_id: {check_id}")
            # Only rate when the log has at least one data row beyond the header.
            filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
            if os.path.exists(filepath):
                with open(filepath, 'r', encoding='utf-8') as csvfile:
                    rows = list(csv.reader(csvfile))
                if len(rows) > 1:
                    success = update_latest_student_query_feedback("thumbs_down", check_id)
                    return "👎 Thank you for your feedback. We'll work to improve!" if success else "Failed to save feedback"
            return "No query to rate yet"

        def handle_comment_submission(comment, check_id):
            """Save a non-empty comment; return (status text, textbox value)."""
            if not check_id:
                raise gr.Error(
                    "⚠️ Access Restricted\n\n"
                    "Please access this system through the CivASK link provided in Moodle to submit comments.",
                    duration=5
                )
            if comment.strip():
                success = save_student_comment_feedback(comment.strip(), check_id)
                if success:
                    return "💬 Thank you for your comment!", ""
                else:
                    return "Failed to save comment", comment
            return "Please enter a comment", comment

        # --- Wiring ---------------------------------------------------------
        interface.load(fn=init_from_url, outputs=[title_display, check_id_state, has_query_state])
        # Submit a query, then clear the input box.
        msg.submit(
            chat_response,
            [msg, chatbot, search_info, check_id_state, has_query_state],
            [chatbot, search_info, has_query_state]
        ).then(lambda: "", outputs=[msg])
        thumbs_up_btn.click(
            handle_thumbs_up,
            inputs=[check_id_state, has_query_state],
            outputs=[feedback_status]
        )
        thumbs_down_btn.click(
            handle_thumbs_down,
            inputs=[check_id_state, has_query_state],
            outputs=[feedback_status]
        )
        submit_comment_btn.click(
            handle_comment_submission,
            inputs=[comment_input, check_id_state],
            outputs=[feedback_status, comment_input]
        )

    interface.launch()


if __name__ == "__main__":
    main()