Spaces:
Paused
Paused
| import gradio as gr | |
| import csv | |
| import os | |
| import re | |
| from datetime import datetime, timedelta | |
| from huggingface_hub import Repository | |
| from RAG_Learning_Assistant_with_Streaming import RAGLearningAssistant | |
| # Configuration for Student Space | |
| # find name of space | |
def get_space_name():
    """Return the short name of the Space this app is running in.

    Reads the ``SPACE_ID`` environment variable, which usually looks like
    ``"username/space-name"``, and keeps only the part after the last slash.

    Returns:
        The space name, or ``None`` when ``SPACE_ID`` is unset or empty.
    """
    full_id = os.environ.get("SPACE_ID")
    if not full_id:
        return None
    # Only the trailing component (the space name itself) is of interest.
    return full_id.rsplit("/", 1)[-1]
# Name of this student Space, derived automatically from the environment.
STUDENT_SPACE_NAME = get_space_name()

# Centralized data storage repository, the branch used for data, and the
# local directory it is cloned into.
DATA_STORAGE_REPO = "CIV3283/Data_Storage"
DATA_BRANCH_NAME = "data_branch"
LOCAL_DATA_DIR = "temp_data_repo"

# Session timeout in minutes; adjust as needed.
SESSION_TIMEOUT_MINUTES = 30

# File names inside the data storage repository.
KNOWLEDGE_FILE = "knowledge_base.md"
VECTOR_DB_FILE = "vector_database.csv"
METADATA_FILE = "vector_metadata.json"
VECTORIZER_FILE = "vectorize_knowledge_base.py"

# Per-student log files, prefixed with the space name.
QUERY_LOG_FILE = "{}_query_log.csv".format(STUDENT_SPACE_NAME)
FEEDBACK_LOG_FILE = "{}_feedback_log.csv".format(STUDENT_SPACE_NAME)

# Required secrets: fail at import time when either one is missing.
HF_HUB_TOKEN = os.environ.get("HF_HUB_TOKEN")
if HF_HUB_TOKEN is None:
    raise ValueError("Set HF_HUB_TOKEN in Space Settings -> Secrets")

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if OPENAI_API_KEY is None:
    raise ValueError("Set OPENAI_API_KEY in Space Settings -> Secrets")

# Chat model name handed to the RAG assistant.
MODEL = "gpt-4.1-nano-2025-04-14"
def check_session_validity(check_id):
    """Decide whether ``check_id`` may use the assistant right now.

    The most recent row of the local query log identifies the last user:

    * same student ID as that row -> allowed (the session continues);
    * different student ID and the last query is at most
      ``SESSION_TIMEOUT_MINUTES`` old -> blocked;
    * different student ID and the last query is older -> allowed.

    A missing log, a header-only log, a last row with fewer than three
    columns, an unparseable timestamp, or any unexpected error all result in
    the query being allowed, so legitimate users are never locked out.

    Returns:
        tuple: ``(is_valid, error_message)``; the message is ``""`` when valid.
    """
    tag = "[check_session_validity]"
    try:
        log_path = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)

        # No log yet: nobody has used this assistant before.
        if not os.path.exists(log_path):
            print(f"{tag} No existing log file, allowing first query for student {check_id}")
            return True, ""

        with open(log_path, 'r', encoding='utf-8') as handle:
            records = list(csv.reader(handle))

        # A header row on its own carries no previous user.
        if len(records) <= 1:
            print(f"{tag} Only header in log file, allowing first query for student {check_id}")
            return True, ""

        # Row layout: [student_space, student_id, timestamp, search_info,
        #              query_and_response, thumb_feedback]
        newest = records[-1]
        if len(newest) < 3:
            print(f"{tag} Invalid last record format, allowing query")
            return True, ""

        previous_id, previous_stamp = newest[1], newest[2]
        print(f"{tag} Last record - Student ID: {previous_id}, Timestamp: {previous_stamp}")
        print(f"{tag} Current request - Student ID: {check_id}")

        if previous_id == check_id:
            print(f"{tag} Same user, allowing continuation for student {check_id}")
            return True, ""

        # Different user: the age of the last query decides.
        try:
            previous_time = datetime.strptime(previous_stamp, '%Y-%m-%d %H:%M:%S')
            elapsed = datetime.now() - previous_time
            seconds = elapsed.total_seconds()
            minutes = seconds / 60
            print(f"{tag} Different user - Time difference: {seconds} seconds ({minutes:.1f} minutes)")

            if elapsed > timedelta(minutes=SESSION_TIMEOUT_MINUTES):
                # The assistant has been idle long enough for a new user.
                print(f"{tag} Assistant has been idle for {minutes:.1f} minutes, allowing new user {check_id}")
                return True, ""

            # The previous user was active too recently.
            error_msg = "⚠️ The assistant is currently being used by another user. Please return to the load distributor page."
            print(f"{tag} Blocking access - Previous user ({previous_id}) used assistant {minutes:.1f} minutes ago")
            return False, error_msg
        except ValueError as e:
            # Unparseable timestamp: let the query through.
            print(f"{tag} Error parsing timestamp: {e}")
            return True, ""
    except Exception as e:
        print(f"{tag} Error checking session validity: {e}")
        import traceback
        print(f"{tag} Traceback: {traceback.format_exc()}")
        # Fail open so an internal error never blocks a legitimate user.
        return True, ""
def init_data_storage_repo():
    """Clone/open the centralized data storage repository and pull the latest data.

    The repository ``DATA_STORAGE_REPO`` (branch ``DATA_BRANCH_NAME``) is
    cloned into ``LOCAL_DATA_DIR``, a per-space git identity is configured,
    and the latest changes are pulled with rebase. Missing knowledge-base
    files are reported as warnings only.

    Returns:
        The ``Repository`` object on success, or ``None`` if anything failed
        (the error and traceback are printed).
    """
    try:
        repo = Repository(
            local_dir=LOCAL_DATA_DIR,
            clone_from=DATA_STORAGE_REPO,
            revision=DATA_BRANCH_NAME,
            repo_type="space",
            use_auth_token=HF_HUB_TOKEN
        )

        # Configure the git identity used for commits from this space.
        # The values must be passed by keyword: the previous positional calls
        # ("git_user", <name>) / ("git_email", <email>) stored the literal
        # labels as the user name and shifted the real values into the email.
        repo.git_config_username_and_email(
            git_user=f"Student_Space_{STUDENT_SPACE_NAME}",
            git_email=f"{STUDENT_SPACE_NAME}@student.space",
        )

        # Pull latest changes
        print(f"[init_data_storage_repo] Pulling latest changes from {DATA_STORAGE_REPO}...")
        repo.git_pull(rebase=True)
        print(f"[init_data_storage_repo] Successfully connected to data storage repo: {DATA_STORAGE_REPO}")
        print(f"[init_data_storage_repo] Local directory: {LOCAL_DATA_DIR}")
        print(f"[init_data_storage_repo] Branch: {DATA_BRANCH_NAME}")

        # Report (but do not fail on) missing knowledge-base files.
        for file_name in (KNOWLEDGE_FILE, VECTOR_DB_FILE, METADATA_FILE):
            if os.path.exists(os.path.join(LOCAL_DATA_DIR, file_name)):
                print(f"[init_data_storage_repo] Found required file: {file_name}")
            else:
                print(f"[init_data_storage_repo] Warning: Missing required file: {file_name}")
        return repo
    except Exception as e:
        print(f"[init_data_storage_repo] Error initializing repository: {e}")
        import traceback
        print(f"[init_data_storage_repo] Traceback: {traceback.format_exc()}")
        return None
def commit_student_logs(commit_message: str):
    """Commit this student's log files and push them to the data storage repo.

    The query and feedback logs that exist locally are staged and committed
    once; the pull-with-rebase and push steps are then retried up to three
    times when the remote moved in the meantime. The local commit is never
    discarded between retries, so a logged row is not lost on a conflict.

    Args:
        commit_message: Message for the git commit.

    Returns:
        bool: ``True`` when the push succeeded or there was nothing to
        commit; ``False`` when the repository is not initialized, no log
        file exists, a non-conflict error occurred, or all retries failed.
    """
    import subprocess
    import time

    if repo is None:
        print("[commit_student_logs] Error: Repository not initialized")
        return False

    # Only log files that actually exist locally can be staged.
    files_to_add = []
    for label, file_name in (("query", QUERY_LOG_FILE), ("feedback", FEEDBACK_LOG_FILE)):
        file_path = os.path.join(LOCAL_DATA_DIR, file_name)
        if os.path.exists(file_path):
            files_to_add.append(file_name)
            print(f"[commit_student_logs] Found {label} log: {file_path}")
    if not files_to_add:
        print("[commit_student_logs] No log files to commit")
        return False

    max_retries = 3
    committed = False  # the local commit is created once and kept across retries

    for attempt in range(1, max_retries + 1):
        try:
            if not committed:
                for file_name in files_to_add:
                    print(f"[commit_student_logs] Adding file: {file_name}")
                    repo.git_add(pattern=file_name)

                # Skip the commit when the log files have no pending changes.
                # The check is limited to the log files so unrelated files in
                # the clone do not trigger an empty commit.
                try:
                    status = subprocess.run(
                        ["git", "status", "--porcelain", "--", *files_to_add],
                        cwd=LOCAL_DATA_DIR,
                        capture_output=True,
                        text=True,
                        check=True,
                    )
                    if not status.stdout.strip():
                        print("[commit_student_logs] No changes to commit")
                        return True
                    print(f"[commit_student_logs] Changes detected: {status.stdout.strip()}")
                except Exception as status_error:
                    print(f"[commit_student_logs] Warning: Could not check git status: {status_error}")

                print(f"[commit_student_logs] Attempt {attempt}/{max_retries}: Committing locally: {commit_message}")
                repo.git_commit(commit_message)
                committed = True

            # Rebase the local commit onto the latest remote state, then push.
            print("[commit_student_logs] Pulling latest changes...")
            repo.git_pull(rebase=True)
            print("[commit_student_logs] Pushing to remote...")
            repo.git_push()
            print(f"[commit_student_logs] Success: {commit_message}")
            return True

        except Exception as e:
            error_msg = str(e)
            print(f"[commit_student_logs] Attempt {attempt} failed: {error_msg}")

            is_conflict = (
                ("rejected" in error_msg and "fetch first" in error_msg)
                or "cannot pull with rebase" in error_msg
            )
            if not is_conflict:
                # Other types of errors, don't retry
                print(f"[commit_student_logs] Non-conflict error, not retrying: {error_msg}")
                return False

            print("[commit_student_logs] Detected Git conflict, will retry...")
            if attempt >= max_retries:
                print("[commit_student_logs] Max retries reached, giving up")
                return False

            # Make sure no half-finished rebase is left behind before pulling
            # again; when no rebase is in progress git just exits non-zero,
            # which is ignored here.
            try:
                subprocess.run(
                    ["git", "rebase", "--abort"],
                    cwd=LOCAL_DATA_DIR,
                    capture_output=True,
                    text=True,
                    check=False,
                )
            except OSError as abort_error:
                print(f"[commit_student_logs] Warning: Could not run git rebase --abort: {abort_error}")

            # Back off a little longer after each failure (2, then 4 seconds).
            wait_time = attempt * 2
            print(f"[commit_student_logs] Waiting {wait_time} seconds before retry...")
            time.sleep(wait_time)

    print("[commit_student_logs] Failed after all retry attempts")
    return False
def save_student_query_to_csv(query, search_info, response, check_id, thumb_feedback=None):
    """Append one query/response record to the student's query log and push it.

    The row is written to ``QUERY_LOG_FILE`` inside ``LOCAL_DATA_DIR`` (a
    header row is added first when the file is new), then the logs are
    committed through ``commit_student_logs``.

    Returns:
        bool: ``True`` once the row has been written locally, even if the
        remote commit failed; ``False`` when ``check_id`` is falsy or an
        error occurred.
    """
    tag = "[save_student_query_to_csv]"
    try:
        if not check_id:
            print(f"{tag} Error: No valid check_id provided")
            return False

        # The local clone directory may not exist yet.
        os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
        log_path = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
        already_present = os.path.isfile(log_path)
        print(f"{tag} Saving to: {log_path}")
        print(f"{tag} File exists: {already_present}")
        print(f"{tag} Student ID: {check_id}")

        with open(log_path, 'a', newline='', encoding='utf-8') as handle:
            out = csv.writer(handle)
            if not already_present:
                print(f"{tag} Writing header row")
                out.writerow(['student_space', 'student_id', 'timestamp', 'search_info', 'query_and_response', 'thumb_feedback'])
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            record = [
                STUDENT_SPACE_NAME,
                check_id,
                timestamp,
                search_info,
                f"Query: {query}\nResponse: {response}",
                thumb_feedback or "",
            ]
            out.writerow(record)
        print(f"{tag} Query saved to local file: {log_path}")

        # Push the updated log to the data storage repository.
        print(f"{tag} Attempting to commit to remote repository...")
        if commit_student_logs(f"Add query log from student {check_id} at {timestamp}"):
            print(f"{tag} Successfully committed to remote repository")
        else:
            print(f"{tag} Failed to commit to remote repository")
        return True
    except Exception as e:
        print(f"{tag} Error: {e}")
        import traceback
        print(f"{tag} Traceback: {traceback.format_exc()}")
        return False
def update_latest_student_query_feedback(feedback_type, check_id):
    """Record thumb feedback on the latest query row of the student's log.

    The last non-blank data row of ``QUERY_LOG_FILE`` is updated in place
    (column index 5, ``thumb_feedback``), the file is rewritten, and the
    change is committed through ``commit_student_logs``.

    Args:
        feedback_type: Value stored in the ``thumb_feedback`` column.
        check_id: ID of the student giving the feedback.

    Returns:
        bool: ``True`` when the row was updated locally; ``False`` when
        ``check_id`` is falsy, the log is missing or has no data row, the
        latest row belongs to a different student, or an error occurred.
    """
    student_id_col = 1
    thumb_col = 5
    try:
        # Validate check_id
        if not check_id:
            print("[update_latest_student_query_feedback] Error: No valid check_id provided")
            return False

        filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
        if not os.path.exists(filepath):
            print("[update_latest_student_query_feedback] Error: Query log file not found")
            return False

        with open(filepath, 'r', encoding='utf-8') as csvfile:
            rows = list(csv.reader(csvfile))

        # Latest non-blank data row; index 0 is the header. Blank trailing
        # lines are parsed as empty rows and must not be treated as a query.
        target = next((i for i in range(len(rows) - 1, 0, -1) if rows[i]), None)
        if target is None:
            return False
        row = rows[target]

        # Feedback must only be attached to the caller's own query, never to
        # a record left behind by a previous student.
        if len(row) > student_id_col and row[student_id_col] != check_id:
            print("[update_latest_student_query_feedback] Error: Latest query belongs to a different student")
            return False

        # Pad short rows so the feedback column exists instead of raising.
        row.extend([""] * (thumb_col + 1 - len(row)))
        row[thumb_col] = feedback_type

        # Write back to file
        with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
            csv.writer(csvfile).writerows(rows)
        print(f"[update_latest_student_query_feedback] Updated feedback: {feedback_type}")

        # Commit the update
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        commit_student_logs(f"Update feedback from student {check_id}: {feedback_type} at {timestamp}")
        return True
    except Exception as e:
        print(f"[update_latest_student_query_feedback] Error: {e}")
        return False
def save_student_comment_feedback(comment, check_id):
    """Append a free-text comment to the student's feedback log and push it.

    The row ``[space, student_id, timestamp, comment]`` is appended to
    ``FEEDBACK_LOG_FILE`` inside ``LOCAL_DATA_DIR`` (with a header row when
    the file is new) and then committed through ``commit_student_logs``.

    Args:
        comment: Comment text to store.
        check_id: ID of the student submitting the comment.

    Returns:
        bool: ``True`` once the comment has been written locally; ``False``
        when ``check_id`` is falsy or an error occurred.
    """
    try:
        # Validate check_id
        if not check_id:
            print("[save_student_comment_feedback] Error: No valid check_id provided")
            return False

        # Same safeguard as save_student_query_to_csv: the local data
        # directory may not exist (e.g. when the repository clone failed).
        os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
        filepath = os.path.join(LOCAL_DATA_DIR, FEEDBACK_LOG_FILE)
        file_exists = os.path.isfile(filepath)
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        with open(filepath, 'a', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            if not file_exists:
                writer.writerow(['student_space', 'student_id', 'timestamp', 'comment'])
            writer.writerow([STUDENT_SPACE_NAME, check_id, timestamp, comment])
        print(f"[save_student_comment_feedback] Saved comment to {filepath}")

        # Commit student logs
        commit_student_logs(f"Add comment feedback from student {check_id} at {timestamp}")
        return True
    except Exception as e:
        print(f"[save_student_comment_feedback] Error: {e}")
        return False
def get_url_params(request: gr.Request):
    """Return the page title and the ``check`` URL parameter of a request.

    Returns:
        tuple: ``(title, check_id)``. The title is always the same string;
        ``check_id`` is ``None`` when there is no request or the ``check``
        query parameter is absent or empty.
    """
    page_title = "RAG Learning Assistant - Student"
    if not request:
        return page_title, None
    params = dict(request.query_params)
    # A missing or empty value is reported as None.
    return page_title, params.get('check') or None
def chat_response(message, history, search_info_display, check_id, has_query):
    """Stream the assistant's answer for one user message (Gradio generator).

    Every update is delivered with ``yield`` as a
    ``(history, search_info, has_query)`` triple. Because this function is a
    generator, a value passed to ``return`` never reaches the UI, so the
    final state (including ``has_query = True`` after a successful save) is
    yielded as well.

    Args:
        message: Text typed by the user.
        history: Chat history, either in messages format (list of dicts) or
            as a list of ``[user, assistant]`` pairs, which is converted.
        search_info_display: Current content of the search-info panel; it is
            passed through unchanged when the message is blank.
        check_id: Student ID taken from the URL; falsy means no access.
        has_query: Whether a query that can be rated already exists.

    Raises:
        gr.Error: When ``check_id`` is missing or the session check fails.
    """
    if history is None:
        history = []

    # Blank input: leave every output as it is.
    if not message.strip():
        yield history, search_info_display, has_query
        return

    # Check access permission first
    if not check_id:
        print("[chat_response] Access denied: No valid check ID provided")
        # Raise error dialog for access denial
        raise gr.Error(
            "⚠️ Access Restricted\n\n"
            "Please access this system through the link provided in Moodle.\n\n"
            "If you are a student in this course:\n"
            "1. Go to your Moodle course page\n"
            "2. Find the 'CivASK' link\n"
            "3. Click the link to access the system\n\n"
            "If you continue to experience issues, please contact your instructor.",
            duration=8
        )

    # Make sure no other student is currently using this assistant.
    session_valid, error_message = check_session_validity(check_id)
    if not session_valid:
        print(f"[chat_response] Session invalid for student {check_id}")
        raise gr.Error(error_message, duration=10)

    # Valid access and valid session - proceed with normal AI conversation
    print(f"[chat_response] Valid access and session for student ID: {check_id}")

    # Convert [user, assistant] pairs to messages format if needed.
    if history and isinstance(history[0], list):
        messages_history = []
        for user_msg, assistant_msg in history:
            messages_history.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages_history.append({"role": "assistant", "content": assistant_msg})
        history = messages_history

    # Add the user message and an empty assistant message to stream into.
    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": ""})

    search_info_collected = False
    search_info_content = ""
    content_part = ""

    # Chunks go to the search-info panel up to and including the chunk that
    # contains the "**Response:**" marker; later chunks form the answer.
    for chunk in assistant.generate_response_stream(message):
        if not search_info_collected:
            search_info_content += chunk
            if "**Response:**" in chunk:
                search_info_collected = True
        else:
            content_part += chunk
            # Update the last assistant message
            history[-1]["content"] = content_part
        yield history, search_info_content, has_query

    # After streaming is complete, save to CSV (only for valid access)
    try:
        print("[chat_response] Saving student query to CSV...")
        print(f"Student Space: {STUDENT_SPACE_NAME}")
        print(f"Student ID: {check_id}")
        print(f"Query: {message}")
        save_success = save_student_query_to_csv(message, search_info_content, content_part, check_id)
        if save_success:
            print("[chat_response] Student query saved successfully")
            has_query = True  # Mark that we have a query to rate
        else:
            print("[chat_response] Failed to save student query")
    except Exception as e:
        print(f"[chat_response] Error saving student query: {e}")

    # Deliver the final state; a plain ``return`` value would be dropped.
    yield history, search_info_content, has_query
# Module-level handles, both assigned by main():
#   repo      - result of init_data_storage_repo() (None when unavailable)
#   assistant - the RAGLearningAssistant instance
repo = assistant = None
def main():
    """Initialize the data repo and RAG assistant, build the Gradio UI, and launch it.

    Sets the module-level ``repo`` and ``assistant`` globals, wires the chat,
    thumb-feedback and comment handlers, and starts the interface.
    """
    global repo, assistant

    # Initialize data storage repository connection
    repo = init_data_storage_repo()

    # Initialize RAG assistant with centralized data storage directory
    print(f"[main] Initializing RAG assistant with data directory: {LOCAL_DATA_DIR}")
    print(f"[main] Session timeout set to: {SESSION_TIMEOUT_MINUTES} minutes")
    assistant = RAGLearningAssistant(
        api_key=OPENAI_API_KEY,
        model=MODEL,
        vector_db_path=LOCAL_DATA_DIR  # Pass the data storage repo directory
    )
    print("[main] RAG assistant initialized successfully")
    print(f"[main] Student space: {STUDENT_SPACE_NAME}")
    print(f"[main] Data storage repo: {DATA_STORAGE_REPO}")
    print(f"[main] Query log file: {QUERY_LOG_FILE}")
    print(f"[main] Feedback log file: {FEEDBACK_LOG_FILE}")

    def rate_latest_query(feedback_type, thanks_message, check_id):
        """Shared implementation of the thumbs-up / thumbs-down handlers."""
        if not check_id:
            raise gr.Error(
                "⚠️ Access Restricted\n\n"
                "Please access this system through the CivASK link provided in Moodle to use the feedback features.",
                duration=5
            )
        print(f"[handle_{feedback_type}] Student: {STUDENT_SPACE_NAME}, check_id: {check_id}")

        # Feedback only makes sense when the log holds at least one query.
        filepath = os.path.join(LOCAL_DATA_DIR, QUERY_LOG_FILE)
        if os.path.exists(filepath):
            with open(filepath, 'r', encoding='utf-8') as csvfile:
                rows = list(csv.reader(csvfile))
            if len(rows) > 1:  # Has header + at least one data row
                success = update_latest_student_query_feedback(feedback_type, check_id)
                return thanks_message if success else "Failed to save feedback"
        return "No query to rate yet"

    # Create interface
    # NOTE(review): the nesting of the layout below was reconstructed from a
    # copy of the file that had lost its indentation - confirm against the UI.
    with gr.Blocks(title=f"RAG Assistant - {STUDENT_SPACE_NAME}") as interface:
        # No access until the load event supplies the ID from the URL; a
        # non-empty default would pass every access check as a real student.
        check_id_state = gr.State(None)
        has_query_state = gr.State(False)  # Track if there's a query to rate
        title_display = gr.Markdown(f"# RAG Learning Assistant - {STUDENT_SPACE_NAME}", elem_id="title")

        # Only Query Check functionality for students
        with gr.Row():
            with gr.Column(scale=4):
                chatbot = gr.Chatbot(
                    label="Ask Your Questions",
                    height=500,
                    resizable=True,
                    type="messages",
                    render_markdown=True,
                    # Backslashes are escaped explicitly; "\(" and "\[" are
                    # invalid escape sequences in a plain string literal.
                    latex_delimiters=[
                        {"left": "$$", "right": "$$", "display": True},
                        {"left": "\\(", "right": "\\)", "display": False},
                        {"left": "$", "right": "$", "display": False},
                        {"left": "\\[", "right": "\\]", "display": True},
                    ],
                )
                msg = gr.Textbox(placeholder="Type your message here...", label="Your Message", show_label=True)

                # Feedback buttons row
                with gr.Row():
                    thumbs_up_btn = gr.Button("👍 Good Answer", variant="secondary", size="sm")
                    thumbs_down_btn = gr.Button("👎 Poor Answer", variant="secondary", size="sm")
                feedback_status = gr.Textbox(label="Feedback Status", interactive=False, lines=1)

                # Comment section
                with gr.Row():
                    comment_input = gr.Textbox(placeholder="Share your comments or suggestions...", label="Comments", lines=2)
                    submit_comment_btn = gr.Button("Submit Comment", variant="primary")

            with gr.Column(scale=1):
                search_info = gr.Markdown(label="Search Analysis Information", value="")

        # Event handlers
        def init_from_url(request: gr.Request):
            title, check_id = get_url_params(request)
            print(f"[init_from_url] Extracted check_id: {check_id}")
            return f"# {title}", check_id, False  # Reset has_query state

        # Feedback handlers (has_query is accepted to match the wired inputs)
        def handle_thumbs_up(check_id, has_query):
            return rate_latest_query("thumbs_up", "👍 Thank you for your positive feedback!", check_id)

        def handle_thumbs_down(check_id, has_query):
            return rate_latest_query("thumbs_down", "👎 Thank you for your feedback. We'll work to improve!", check_id)

        def handle_comment_submission(comment, check_id):
            if not check_id:
                raise gr.Error(
                    "⚠️ Access Restricted\n\n"
                    "Please access this system through the CivASK link provided in Moodle to submit comments.",
                    duration=5
                )
            if comment.strip():
                success = save_student_comment_feedback(comment.strip(), check_id)
                if success:
                    return "💬 Thank you for your comment!", ""
                return "Failed to save comment", comment
            return "Please enter a comment", comment

        interface.load(fn=init_from_url, outputs=[title_display, check_id_state, has_query_state])

        # Query events: stream the answer, then clear the input box.
        msg.submit(
            chat_response,
            [msg, chatbot, search_info, check_id_state, has_query_state],
            [chatbot, search_info, has_query_state]
        ).then(lambda: "", outputs=[msg])

        # Feedback events
        thumbs_up_btn.click(
            handle_thumbs_up,
            inputs=[check_id_state, has_query_state],
            outputs=[feedback_status]
        )
        thumbs_down_btn.click(
            handle_thumbs_down,
            inputs=[check_id_state, has_query_state],
            outputs=[feedback_status]
        )
        submit_comment_btn.click(
            handle_comment_submission,
            inputs=[comment_input, check_id_state],
            outputs=[feedback_status, comment_input]
        )

    interface.launch()


if __name__ == "__main__":
    main()