"""Student-facing Gradio app for a RAG Learning Assistant on Hugging Face Spaces.

The app serves one student at a time: it verifies session ownership via a
``check`` URL parameter, streams answers from a RAG assistant, and persists
query/feedback logs to a centralized data-storage Space repository.
"""
import csv
import os
import re
import subprocess
import time
import traceback
from datetime import datetime, timedelta

import gradio as gr
from huggingface_hub import Repository

from RAG_Learning_Assistant_with_Streaming import RAGLearningAssistant


# ---------------------------------------------------------------------------
# Configuration for Student Space
# ---------------------------------------------------------------------------

def get_space_name():
    """Return the bare space name derived from the SPACE_ID env var.

    SPACE_ID is usually "username/space-name"; only the trailing
    "space-name" part is needed.  Returns None when SPACE_ID is unset
    (e.g. when running locally), in which case the log-file names below
    will carry a literal "None" prefix.
    """
    space_id = os.environ.get("SPACE_ID", None)
    if space_id:
        return space_id.split("/")[-1]
    return None


STUDENT_SPACE_NAME = get_space_name()  # get space name automatically
DATA_STORAGE_REPO = "CIV3283/Data_Storage"  # Centralized data storage repo
DATA_BRANCH_NAME = "data_branch"
LOCAL_DATA_DIR = "temp_data_repo"

# Session timeout configuration (in minutes)
SESSION_TIMEOUT_MINUTES = 30  # Adjust this value as needed

# File names in data storage
KNOWLEDGE_FILE = "knowledge_base.md"
VECTOR_DB_FILE = "vector_database.csv"
METADATA_FILE = "vector_metadata.json"
VECTORIZER_FILE = "vectorize_knowledge_base.py"

# Student-specific log files (with space name prefix)
QUERY_LOG_FILE = f"{STUDENT_SPACE_NAME}_query_log.csv"
FEEDBACK_LOG_FILE = f"{STUDENT_SPACE_NAME}_feedback_log.csv"

# Environment variables — both secrets are required at startup.
HF_HUB_TOKEN = os.environ.get("HF_HUB_TOKEN", None)
if HF_HUB_TOKEN is None:
    raise ValueError("Set HF_HUB_TOKEN in Space Settings -> Secrets")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", None)
if OPENAI_API_KEY is None:
    raise ValueError("Set OPENAI_API_KEY in Space Settings -> Secrets")

MODEL = "gpt-4.1-nano-2025-04-14"


def check_session_validity(check_id):
    """Check whether the current session may use the assistant.

    Rules, based on the last row of the query log:
    1. If the requesting user ID matches the last logged query's ID,
       allow the session to continue.
    2. If the IDs differ, look at the elapsed time since the last query:
       - small interval -> block (the previous user likely just finished);
       - large interval -> allow (the assistant has been idle).

    Args:
        check_id: The student ID extracted from the URL.

    Returns:
        tuple: (is_valid: bool, error_message: str).  The message is empty
        when the session is valid.
    """
    try:
        filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)

        # If no log file exists, this is the first query - allow it.
        if not os.path.exists(filepath):
            print(f"[check_session_validity] No existing log file, allowing first query for student {check_id}")
            return True, ""

        # Read all records; only the final row matters.
        with open(filepath, 'r', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            rows = list(reader)

        # If only the header exists, this is effectively the first query.
        if len(rows) <= 1:
            print(f"[check_session_validity] Only header in log file, allowing first query for student {check_id}")
            return True, ""

        # CSV format: [student_space, student_id, timestamp, search_info,
        #              query_and_response, thumb_feedback]
        last_record = rows[-1]
        if len(last_record) < 3:
            print(f"[check_session_validity] Invalid last record format, allowing query")
            return True, ""

        last_student_id = last_record[1]
        last_timestamp_str = last_record[2]
        print(f"[check_session_validity] Last record - Student ID: {last_student_id}, Timestamp: {last_timestamp_str}")
        print(f"[check_session_validity] Current request - Student ID: {check_id}")

        # Same user -> allow continuation.
        if last_student_id == check_id:
            print(f"[check_session_validity] Same user, allowing continuation for student {check_id}")
            return True, ""

        # Different user -> decide by elapsed time since the last query.
        try:
            last_timestamp = datetime.strptime(last_timestamp_str, '%Y-%m-%d %H:%M:%S')
            current_timestamp = datetime.now()
            time_diff = current_timestamp - last_timestamp
            print(f"[check_session_validity] Different user - Time difference: {time_diff.total_seconds()} seconds ({time_diff.total_seconds()/60:.1f} minutes)")

            # Small gap -> block (previous user just finished).
            if time_diff <= timedelta(minutes=SESSION_TIMEOUT_MINUTES):
                error_msg = "⚠️ The assistant is currently being used by another user. Please return to the load distributor page."
                print(f"[check_session_validity] Blocking access - Previous user ({last_student_id}) used assistant {time_diff.total_seconds()/60:.1f} minutes ago")
                return False, error_msg

            # Large gap -> the assistant has been idle; allow the new user.
            print(f"[check_session_validity] Assistant has been idle for {time_diff.total_seconds()/60:.1f} minutes, allowing new user {check_id}")
            return True, ""
        except ValueError as e:
            # Unparseable timestamp: fail open so a corrupt row cannot
            # lock everyone out.
            print(f"[check_session_validity] Error parsing timestamp: {e}")
            return True, ""
    except Exception as e:
        print(f"[check_session_validity] Error checking session validity: {e}")
        print(f"[check_session_validity] Traceback: {traceback.format_exc()}")
        # On error, allow the query to proceed to avoid blocking legitimate users.
        return True, ""


def init_data_storage_repo():
    """Clone/attach the centralized data-storage repo and pull latest changes.

    Returns:
        huggingface_hub.Repository on success, or None when initialization
        fails (callers must handle the None case).

    NOTE(review): ``use_auth_token`` is deprecated in recent huggingface_hub
    releases in favor of ``token`` — confirm against the installed version.
    """
    try:
        repo = Repository(
            local_dir=LOCAL_DATA_DIR,
            clone_from=DATA_STORAGE_REPO,
            revision=DATA_BRANCH_NAME,
            repo_type="space",
            use_auth_token=HF_HUB_TOKEN,
        )

        # Configure git identity so commits are attributable to this space.
        repo.git_config_username_and_email("git_user", f"Student_Space_{STUDENT_SPACE_NAME}")
        repo.git_config_username_and_email("git_email", f"{STUDENT_SPACE_NAME}@student.space")

        print(f"[init_data_storage_repo] Pulling latest changes from {DATA_STORAGE_REPO}...")
        repo.git_pull(rebase=True)
        print(f"[init_data_storage_repo] Successfully connected to data storage repo: {DATA_STORAGE_REPO}")
        print(f"[init_data_storage_repo] Local directory: {LOCAL_DATA_DIR}")
        print(f"[init_data_storage_repo] Branch: {DATA_BRANCH_NAME}")

        # Sanity-check for the files the RAG assistant depends on.
        required_files = [KNOWLEDGE_FILE, VECTOR_DB_FILE, METADATA_FILE]
        for file_name in required_files:
            file_path = os.path.join(LOCAL_DATA_DIR, file_name)
            if os.path.exists(file_path):
                print(f"[init_data_storage_repo] Found required file: {file_name}")
            else:
                print(f"[init_data_storage_repo] Warning: Missing required file: {file_name}")

        return repo
    except Exception as e:
        print(f"[init_data_storage_repo] Error initializing repository: {e}")
        print(f"[init_data_storage_repo] Traceback: {traceback.format_exc()}")
        return None


def commit_student_logs(commit_message: str):
    """Commit and push this student's log files with conflict retry.

    Uses the module-level ``repo``.  On a push/pull conflict the local
    commit is rolled back, the repo re-synced, and the attempt retried
    with a linear backoff (2, 4, 6 seconds), up to three times.

    Args:
        commit_message: Message for the git commit.

    Returns:
        bool: True when the logs were committed (or there was nothing to
        commit), False on failure.
    """
    if repo is None:
        print("[commit_student_logs] Error: Repository not initialized")
        return False

    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Only stage log files that actually exist locally.
            query_log_path = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
            feedback_log_path = os.path.join(LOCAL_DATA_DIR, FEEDBACK_LOG_FILE)
            files_to_add = []
            if os.path.exists(query_log_path):
                files_to_add.append(QUERY_LOG_FILE)
                print(f"[commit_student_logs] Found query log: {query_log_path}")
            if os.path.exists(feedback_log_path):
                files_to_add.append(FEEDBACK_LOG_FILE)
                print(f"[commit_student_logs] Found feedback log: {feedback_log_path}")
            if not files_to_add:
                print("[commit_student_logs] No log files to commit")
                return False

            for file_name in files_to_add:
                print(f"[commit_student_logs] Adding file: {file_name}")
                repo.git_add(pattern=file_name)

            # Skip the commit when the working tree is clean.
            try:
                result = subprocess.run(
                    ["git", "status", "--porcelain"],
                    cwd=LOCAL_DATA_DIR,
                    capture_output=True,
                    text=True,
                    check=True,
                )
                if not result.stdout.strip():
                    print("[commit_student_logs] No changes to commit")
                    return True
                print(f"[commit_student_logs] Changes detected: {result.stdout.strip()}")
            except Exception as status_error:
                print(f"[commit_student_logs] Warning: Could not check git status: {status_error}")

            # Commit locally first, then sync with the remote and push.
            print(f"[commit_student_logs] Attempt {retry_count + 1}/{max_retries}: Committing locally: {commit_message}")
            repo.git_commit(commit_message)
            print("[commit_student_logs] Pulling latest changes...")
            repo.git_pull(rebase=True)
            print("[commit_student_logs] Pushing to remote...")
            repo.git_push()
            print(f"[commit_student_logs] Success: {commit_message}")
            return True
        except Exception as e:
            error_msg = str(e)
            print(f"[commit_student_logs] Attempt {retry_count + 1} failed: {error_msg}")

            # Retry only on push/pull conflicts; any other error is fatal.
            if ("rejected" in error_msg and "fetch first" in error_msg) or ("cannot pull with rebase" in error_msg):
                print("[commit_student_logs] Detected Git conflict, will retry...")
                retry_count += 1
                if retry_count < max_retries:
                    try:
                        print("[commit_student_logs] Resetting repository state for retry...")
                        # NOTE(review): git_reset/git_stash may not exist on
                        # huggingface_hub.Repository — confirm; the except
                        # branches below absorb a failure either way.
                        repo.git_reset("--hard", "HEAD~1")  # Undo the commit
                        repo.git_pull(rebase=True)  # Get latest changes
                        wait_time = retry_count * 2  # 2, 4, 6 seconds
                        print(f"[commit_student_logs] Waiting {wait_time} seconds before retry...")
                        time.sleep(wait_time)
                        continue
                    except Exception as reset_error:
                        print(f"[commit_student_logs] Reset failed: {reset_error}")
                        # Fallback: stash local changes, re-sync, restore.
                        try:
                            repo.git_stash()
                            repo.git_pull(rebase=True)
                            repo.git_stash("pop")
                            continue
                        except Exception as stash_error:
                            print(f"[commit_student_logs] Stash approach failed: {stash_error}")
                            return False
                else:
                    print("[commit_student_logs] Max retries reached, giving up")
                    return False
            else:
                print(f"[commit_student_logs] Non-conflict error, not retrying: {error_msg}")
                return False

    print("[commit_student_logs] Failed after all retry attempts")
    return False


def save_student_query_to_csv(query, search_info, response, check_id, thumb_feedback=None):
    """Append one query record to the student's CSV log and push it.

    Columns: student_space, student_id, timestamp, search_info,
    query_and_response, thumb_feedback.

    Returns:
        bool: True when the local write succeeded (a failed remote commit
        is logged but does not make this return False).
    """
    try:
        if not check_id:
            print("[save_student_query_to_csv] Error: No valid check_id provided")
            return False

        os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
        filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
        file_exists = os.path.isfile(filepath)
        print(f"[save_student_query_to_csv] Saving to: {filepath}")
        print(f"[save_student_query_to_csv] File exists: {file_exists}")
        print(f"[save_student_query_to_csv] Student ID: {check_id}")

        with open(filepath, 'a', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            if not file_exists:
                print("[save_student_query_to_csv] Writing header row")
                writer.writerow(['student_space', 'student_id', 'timestamp', 'search_info', 'query_and_response', 'thumb_feedback'])
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            query_and_response = f"Query: {query}\nResponse: {response}"
            writer.writerow([STUDENT_SPACE_NAME, check_id, timestamp, search_info, query_and_response, thumb_feedback or ""])
        print(f"[save_student_query_to_csv] Query saved to local file: {filepath}")

        print("[save_student_query_to_csv] Attempting to commit to remote repository...")
        commit_success = commit_student_logs(f"Add query log from student {check_id} at {timestamp}")
        if commit_success:
            print("[save_student_query_to_csv] Successfully committed to remote repository")
        else:
            print("[save_student_query_to_csv] Failed to commit to remote repository")
        return True
    except Exception as e:
        print(f"[save_student_query_to_csv] Error: {e}")
        print(f"[save_student_query_to_csv] Traceback: {traceback.format_exc()}")
        return False


def update_latest_student_query_feedback(feedback_type, check_id):
    """Set the thumb_feedback column on the most recent query row.

    Args:
        feedback_type: "thumbs_up" or "thumbs_down".
        check_id: Student ID, used only for the commit message.

    Returns:
        bool: True when the feedback was recorded.
    """
    try:
        if not check_id:
            print("[update_latest_student_query_feedback] Error: No valid check_id provided")
            return False

        filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
        if not os.path.exists(filepath):
            print("[update_latest_student_query_feedback] Error: Query log file not found")
            return False

        with open(filepath, 'r', encoding='utf-8') as csvfile:
            rows = list(csv.reader(csvfile))

        # Update the last row (most recent query); need at least one data
        # row beyond the header.
        if len(rows) > 1:
            last_row = rows[-1]
            # BUGFIX: a short row would raise IndexError on column 5;
            # pad to the full 6-column student format first.
            while len(last_row) < 6:
                last_row.append("")
            last_row[5] = feedback_type  # thumb_feedback column

            with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
                csv.writer(csvfile).writerows(rows)
            print(f"[update_latest_student_query_feedback] Updated feedback: {feedback_type}")

            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            commit_student_logs(f"Update feedback from student {check_id}: {feedback_type} at {timestamp}")
            return True
        return False
    except Exception as e:
        print(f"[update_latest_student_query_feedback] Error: {e}")
        return False


def save_student_comment_feedback(comment, check_id):
    """Append a free-text comment to the student's feedback CSV and push it.

    Returns:
        bool: True when the comment was written locally.
    """
    try:
        if not check_id:
            print("[save_student_comment_feedback] Error: No valid check_id provided")
            return False

        filepath = os.path.join(LOCAL_DATA_DIR, FEEDBACK_LOG_FILE)
        file_exists = os.path.isfile(filepath)
        with open(filepath, 'a', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            if not file_exists:
                writer.writerow(['student_space', 'student_id', 'timestamp', 'comment'])
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            writer.writerow([STUDENT_SPACE_NAME, check_id, timestamp, comment])
        print(f"[save_student_comment_feedback] Saved comment to {filepath}")

        commit_student_logs(f"Add comment feedback from student {check_id} at {timestamp}")
        return True
    except Exception as e:
        print(f"[save_student_comment_feedback] Error: {e}")
        return False


def get_url_params(request: gr.Request):
    """Extract the ``check`` URL parameter from the incoming request.

    Returns:
        tuple: (page title, check_id or None).
    """
    if request:
        query_params = dict(request.query_params)
        check_id = query_params.get('check', None)
        if check_id:
            return "RAG Learning Assistant - Student", check_id
        return "RAG Learning Assistant - Student", None
    return "RAG Learning Assistant - Student", None


def chat_response(message, history, search_info_display, check_id, has_query):
    """Stream the assistant's answer for ``message`` into the chat UI.

    Generator yielding (history, search_info_markdown, has_query) tuples.
    Raises gr.Error when access is denied or another user's session is
    still active.
    """
    if not message.strip():
        # BUGFIX: this function is a generator, so a bare `return value`
        # would be discarded by Gradio — yield the unchanged state instead.
        yield history, search_info_display, has_query
        return

    # Access check: no check_id means the student bypassed the Moodle link.
    if not check_id:
        print(f"[chat_response] Access denied: No valid check ID provided")
        raise gr.Error(
            "⚠️ Access Restricted\n\n"
            "Please access this system through the link provided in Moodle.\n\n"
            "If you are a student in this course:\n"
            "1. Go to your Moodle course page\n"
            "2. Find the 'CivASK' link\n"
            "3. Click the link to access the system\n\n"
            "If you continue to experience issues, please contact your instructor.",
            duration=8
        )

    # Session check: block when another user's session is still active.
    session_valid, error_message = check_session_validity(check_id)
    if not session_valid:
        print(f"[chat_response] Session invalid for student {check_id}")
        raise gr.Error(error_message, duration=10)

    print(f"[chat_response] Valid access and session for student ID: {check_id}")

    # Convert legacy tuple-format history to the messages format if needed.
    if history and isinstance(history[0], list):
        messages_history = []
        for user_msg, assistant_msg in history:
            messages_history.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages_history.append({"role": "assistant", "content": assistant_msg})
        history = messages_history

    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": ""})

    # Chunks before the "**Response:**" marker are search-analysis info and
    # go to the side panel; everything after it is the answer body.
    search_info_collected = False
    search_info_content = ""
    content_part = ""
    for chunk in assistant.generate_response_stream(message):
        if not search_info_collected:
            search_info_content += chunk
            if "**Response:**" in chunk:  # Support English markers
                search_info_collected = True
            yield history, search_info_content, has_query
        else:
            content_part += chunk
            history[-1]["content"] = content_part
            yield history, search_info_content, has_query

    # After streaming completes, persist the exchange.
    try:
        print(f"[chat_response] Saving student query to CSV...")
        print(f"Student Space: {STUDENT_SPACE_NAME}")
        print(f"Student ID: {check_id}")
        print(f"Query: {message}")
        save_success = save_student_query_to_csv(message, search_info_content, content_part, check_id)
        if save_success:
            print(f"[chat_response] Student query saved successfully")
            has_query = True  # Mark that we have a query to rate
        else:
            print(f"[chat_response] Failed to save student query")
    except Exception as e:
        print(f"[chat_response] Error saving student query: {e}")

    # BUGFIX: the original `return`ed this tuple from a generator, so the
    # updated has_query flag never reached the UI state — yield it.
    yield history, search_info_content, has_query


# Global variables, initialized in main()
repo = None
assistant = None


def main():
    """Initialize the data repo and RAG assistant, then launch the UI."""
    global repo, assistant

    repo = init_data_storage_repo()

    print(f"[main] Initializing RAG assistant with data directory: {LOCAL_DATA_DIR}")
    print(f"[main] Session timeout set to: {SESSION_TIMEOUT_MINUTES} minutes")
    assistant = RAGLearningAssistant(
        api_key=OPENAI_API_KEY,
        model=MODEL,
        vector_db_path=LOCAL_DATA_DIR,  # Pass the data storage repo directory
    )
    print(f"[main] RAG assistant initialized successfully")
    print(f"[main] Student space: {STUDENT_SPACE_NAME}")
    print(f"[main] Data storage repo: {DATA_STORAGE_REPO}")
    print(f"[main] Query log file: {QUERY_LOG_FILE}")
    print(f"[main] Feedback log file: {FEEDBACK_LOG_FILE}")

    # Create interface
    with gr.Blocks(title=f"RAG Assistant - {STUDENT_SPACE_NAME}") as interface:
        check_id_state = gr.State("1")
        has_query_state = gr.State(False)  # Track if there's a query to rate
        title_display = gr.Markdown(f"# RAG Learning Assistant - {STUDENT_SPACE_NAME}", elem_id="title")

        with gr.Row():
            with gr.Column(scale=4):
                # BUGFIX: LaTeX delimiters were plain strings with invalid
                # escape sequences ("\(" etc.); raw strings keep the same
                # characters without SyntaxWarning.
                chatbot = gr.Chatbot(
                    label="Ask Your Questions",
                    height=500,
                    resizable=True,
                    type="messages",
                    render_markdown=True,
                    latex_delimiters=[
                        {"left": "$$", "right": "$$", "display": True},
                        {"left": r"\(", "right": r"\)", "display": False},
                        {"left": "$", "right": "$", "display": False},
                        {"left": r"\[", "right": r"\]", "display": True},
                    ],
                )
                msg = gr.Textbox(placeholder="Type your message here...", label="Your Message", show_label=True)

                # Feedback buttons row
                with gr.Row():
                    thumbs_up_btn = gr.Button("👍 Good Answer", variant="secondary", size="sm")
                    thumbs_down_btn = gr.Button("👎 Poor Answer", variant="secondary", size="sm")
                feedback_status = gr.Textbox(label="Feedback Status", interactive=False, lines=1)

                # Comment section
                with gr.Row():
                    comment_input = gr.Textbox(placeholder="Share your comments or suggestions...", label="Comments", lines=2)
                    submit_comment_btn = gr.Button("Submit Comment", variant="primary")
            with gr.Column(scale=1):
                search_info = gr.Markdown(label="Search Analysis Information", value="")

        # Event handlers
        def init_from_url(request: gr.Request):
            """Seed the page title and check_id from the URL on load."""
            title, check_id = get_url_params(request)
            print(f"[init_from_url] Extracted check_id: {check_id}")
            return f"# {title}", check_id, False  # Reset has_query state

        def handle_thumbs_up(check_id, has_query):
            """Record a thumbs-up for the latest query, if one exists."""
            if not check_id:
                raise gr.Error(
                    "⚠️ Access Restricted\n\n"
                    "Please access this system through the CivASK link provided in Moodle to use the feedback features.",
                    duration=5
                )
            print(f"[handle_thumbs_up] Student: {STUDENT_SPACE_NAME}, check_id: {check_id}")
            filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
            if os.path.exists(filepath):
                with open(filepath, 'r', encoding='utf-8') as csvfile:
                    rows = list(csv.reader(csvfile))
                if len(rows) > 1:  # Has header + at least one data row
                    success = update_latest_student_query_feedback("thumbs_up", check_id)
                    return "👍 Thank you for your positive feedback!" if success else "Failed to save feedback"
            return "No query to rate yet"

        def handle_thumbs_down(check_id, has_query):
            """Record a thumbs-down for the latest query, if one exists."""
            if not check_id:
                raise gr.Error(
                    "⚠️ Access Restricted\n\n"
                    "Please access this system through the CivASK link provided in Moodle to use the feedback features.",
                    duration=5
                )
            print(f"[handle_thumbs_down] Student: {STUDENT_SPACE_NAME}, check_id: {check_id}")
            filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
            if os.path.exists(filepath):
                with open(filepath, 'r', encoding='utf-8') as csvfile:
                    rows = list(csv.reader(csvfile))
                if len(rows) > 1:  # Has header + at least one data row
                    success = update_latest_student_query_feedback("thumbs_down", check_id)
                    return "👎 Thank you for your feedback. We'll work to improve!" if success else "Failed to save feedback"
            return "No query to rate yet"

        def handle_comment_submission(comment, check_id):
            """Persist a free-text comment; clears the box on success."""
            if not check_id:
                raise gr.Error(
                    "⚠️ Access Restricted\n\n"
                    "Please access this system through the CivASK link provided in Moodle to submit comments.",
                    duration=5
                )
            if comment.strip():
                success = save_student_comment_feedback(comment.strip(), check_id)
                if success:
                    return "💬 Thank you for your comment!", ""
                return "Failed to save comment", comment
            return "Please enter a comment", comment

        interface.load(fn=init_from_url, outputs=[title_display, check_id_state, has_query_state])

        # Query events
        msg.submit(
            chat_response,
            [msg, chatbot, search_info, check_id_state, has_query_state],
            [chatbot, search_info, has_query_state]
        ).then(lambda: "", outputs=[msg])

        # Feedback events
        thumbs_up_btn.click(
            handle_thumbs_up,
            inputs=[check_id_state, has_query_state],
            outputs=[feedback_status]
        )
        thumbs_down_btn.click(
            handle_thumbs_down,
            inputs=[check_id_state, has_query_state],
            outputs=[feedback_status]
        )
        submit_comment_btn.click(
            handle_comment_submission,
            inputs=[comment_input, check_id_state],
            outputs=[feedback_status, comment_input]
        )

    interface.launch()


if __name__ == "__main__":
    main()