Spaces:
Runtime error
Runtime error
| from pathlib import Path | |
| import zipfile | |
| import json | |
| import gradio as gr | |
| from openai import AsyncOpenAI | |
| from openai import AsyncOpenAI | |
| import tempfile | |
| import os | |
| import random | |
| import os | |
| from pathlib import Path | |
| import time | |
| import matplotlib.pyplot as plt | |
| import random as r | |
| # BASE_URL = os.getenv("BASE_URL") | |
| API_KEY = os.getenv("API_KEY") | |
| if API_KEY is None: | |
| from keys import openai | |
| API_KEY = openai | |
| BASE_URL = "https://api.openai.com" | |
| if not BASE_URL or not API_KEY: | |
| raise ValueError("BASE_URL or API_KEY environment variables are not set") | |
| client = AsyncOpenAI(api_key=API_KEY) | |
| topics = ["Should social media be regulated as a public utility?","Should the United States federal government ban single-use plastics?","Are charter schools beneficial to the quality of education in the United States?","Should colleges and universities in the United States consider standardized tests in undergraduate admissions decisions?"] | |
| ########################################################################################################## | |
| # HELPER FUNCTIONS # | |
| ########################################################################################################## | |
| # def echo(message, history): | |
| # return random.choice(["Yes", "No"]) | |
| # Prompt chatgpt with a message | |
| async def chatgpt(prompt, history): | |
| messages = [ | |
| {"role": "system", "content": ""} | |
| ] | |
| print(history) | |
| if history: | |
| messages += history | |
| messages += [{"role": "user", "content": prompt}] | |
| try: | |
| response = await client.chat.completions.create( | |
| model="gpt-4o", | |
| messages=messages | |
| ) | |
| except Exception as e: | |
| print(e) | |
| return "I'm sorry, I'm having trouble. Could you please try again?" | |
| return response.choices[0].message.content | |
| async def process_submission(finished_code, user_state): | |
| # Compile and execute user code, generate plot | |
| print("Compiling and plotting code") | |
| print(f"Code: {finished_code}") | |
| with tempfile.NamedTemporaryFile(delete=True, suffix=".py") as f: | |
| f.write(finished_code.encode("utf-8")) | |
| f.flush() | |
| #stdout, stderr, exit_code = await run_command(["python", f.name], timeout=5) | |
| #stdout, stderr, exit_code = await run_command(["python", f.name], timeout=5) | |
| # result = await run_python_code(finished_code) | |
| print(f"Result: {stdout}") | |
| # Check if plot was created | |
| if f"temp_plot_{user_state}.png" in os.listdir(): | |
| return f"temp_plot_{user_state}.png", stdout, stderr | |
| else: | |
| return "No plot generated", stdout, stderr | |
| # return gr.update(value="No plot generated", visible=True), None | |
| # Function to create a zip file | |
| def create_zip_file(jsonl_path, zip_path): | |
| with zipfile.ZipFile(zip_path, 'w') as zipf: | |
| zipf.write(jsonl_path, arcname=Path(jsonl_path).name) | |
| #zipf.write(image_path, arcname=Path(image_path).name) | |
| # Function to assign plots to users randomly | |
| def pick_random_image_for_user(users, images): | |
| assigned_images = {} | |
| for user in users: | |
| assigned_images[user] = random.sample(images, 5) | |
| # print(assigned_images) | |
| return assigned_images | |
| ########################################################################################################## | |
| # GRADIO INTERFACE SETUP # | |
| ########################################################################################################## | |
| # Define each page as a separate function | |
| def create_interface(): | |
| max_num_submissions = 5 | |
| plot_time_limit = 130 | |
| # plot_time_limit = 10 | |
| dialogue_time_limit = 600 | |
| # dialogue_time_limit = 10 | |
| print("Init blocks") | |
| with gr.Blocks() as demo: | |
| user_state = gr.State() | |
| notes_state = gr.State([]) | |
| dialogue_state = gr.State([]) # Store the conversation with the LLM | |
| submission_count = gr.State(0) # Track number of code submissions | |
| produced_codes = gr.State([]) | |
| previous_text = gr.State("") # Track previous text in notepad | |
| random.seed(time.time()) | |
| expertise_survey_responses = gr.State({}) | |
| uncertainty_survey_part_1_responses = gr.State({}) # Store responses to the uncertainty survey | |
| uncertainty_survey_part_2_responses = gr.State({}) # Store responses to the uncertainty survey | |
| uncertainty_survey_part_3_responses = gr.State({}) # Store responses to the uncertainty survey | |
| demographic_survey_responses = gr.State({}) # Store responses to the demographic survey | |
| ########################################################################################################## | |
| # UI SETUP FOR EACH PAGE # | |
| ########################################################################################################## | |
| # Page 1: Login, Add login components | |
| with gr.Column(visible=True) as login_row: | |
| instructions_text = gr.Markdown(f"## Instructions\n\nWelcome to Collaborative Writing! PLEASE READ THE FOLLOWING INSTRUCTIONS CAREFULLY. \ | |
| \n\n You will be asked to write a short essay on a topic. These topics should be largely apolitical and have substantial evidence for both sides. \ | |
| You will have access to an LLM that can assist you in researching and writing this paper. You may also access external resources while writing.\ | |
| Please put effort into this essay as if you were doing it for a class, even though it is not graded.\ | |
| \ | |
| \n\nAt the end of the game, you will be asked to fill out a short demographic survey. \ | |
| Then you will be able to download your session data. Please download and send the zip file to <kaeberlein.c@northeastern.edu>. \ | |
| \n\n**WARNING: You will not be able to go back to previous parts once you proceed, or reload the page.** \ | |
| \n\n**Reminder: this is just a game; your performance will not affect your grade in the class in \ | |
| any form.** \n\n \n\n ### Press the button to start the game. We will first ask some questions about your \ | |
| expertise, and the collaborative writing section will start immediately afterwards.") | |
| #username_input = gr.Textbox(label="Username") | |
| login_button = gr.Button("Continue") | |
| login_error_message = gr.Markdown(visible=False) | |
| message_text = gr.Markdown(f""" | |
| **INFORMATION SHEET**\n | |
| Northeastern University, Khoury\n | |
| Name of Investigator(s): Malihe Alikhani, Asteria Kaeberlein\n | |
| Title of Project: Collaborative Student-LLM Writing | |
| Funded by: Northeastern University | |
| Version date: 5/1/2025 | |
| We are inviting you to participate in a research study. Participating is voluntary; you do not have to participate if you do not want to. You can withdraw from the study at any time. | |
| The purpose of this study is to study how students interact with LLMs. Participating in this research study will include writing a three paragraph essay on a topic through a custom website, then filling out a survey. The collaborative writing will take about 20 minutes to complete. | |
| You can skip questions that you do not want to answer or stop using the website at any time. | |
| Your part in this study will be confidential. Only the researchers on this study will see the information about you. Personal identifiers will not be published or presented. | |
| You will receive $15.00/hour as compensation via Amazon gift cards. This will be emailed to you after you submit the survey. | |
| If you have any questions about this study, please contact Asteria Kaeberlein (kaeberlein.c@northeastern.edu), the person mainly responsible for the research. You can also contact Malihe Alikhani (m.alikhani@northeastern.edu), the Principal Investigator. | |
| If you have any questions about your rights in this research, you can contact the Northeastern University Department of Human Research at Tel: (773) 396-2327, or Email: IRBReview@northeastern.edu . You may call anonymously if you want. | |
| """) | |
| # User Expertise Survey | |
| with gr.Column(visible=False) as expertise_survey: | |
| gr.Markdown("### Student Expertise Survey") | |
| gr.Markdown("Here is a short questionnaire before you get started. Please answer the following questions as accurately as possible.") | |
| expertise_survey_question1 = gr.CheckboxGroup( | |
| ["1 - No experience", "2 - Beginner", "3 - Intermediate", "4 - Advanced", "5 - Expert"], | |
| label="Question 1: How long have you spoken english?" | |
| ) | |
| expertise_survey_question2 = gr.CheckboxGroup( | |
| ["1 - Never ", "2 - A few times before ", "3 - Once a month ", "4 - Once a week", "5 - Daily"], | |
| label="Question 2: How often have you used large language models? " | |
| ) | |
| expertise_survey_submit_button = gr.Button("Submit") | |
| topic = r.choice(topics) | |
| with gr.Column(visible=False) as dialogue_page: | |
| instruction_text = gr.Markdown(f"""## Writing with a collaborator: | |
| Take a position on the following topic, then write ~3 paragraphs collaboratively with ChatGPT, arguing your position. | |
| TOPIC: {topic} | |
| The left side is the "chat" space, the right is the "essay" space. The website is recording the discussion and the edits to the essay. Whenever you send a new message, it records the changes you made to the essay | |
| For your essay, answer the following questions regarding the topic you chose: | |
| 1. What position are you taking regarding this topic? | |
| 2. Why are you taking this position? | |
| 3. What evidence is there to support your position? | |
| 4. What counter arguments are there against your position, and why do you find them unconvincing? | |
| You may use ChatGPT or external sources to draw citations from. | |
| \ | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| # chatbot = gr.ChatInterface(echo, type="messages") | |
| chatbot = gr.ChatInterface(chatgpt, type="messages") | |
| chatbot.chatbot.height = 400 | |
| chatbot.chatbot.label = "Collaborator LLM" | |
| notepad = gr.Textbox(lines=10, placeholder="Write your essay here", value="", label="Essay", elem_id="notepad") | |
| # start_dialogue_button = gr.Button("Start Dialogue") | |
| dialogue_submit_button = gr.Button("Submit") | |
| # Demographic Survey Page | |
| with gr.Column(visible=False) as demographic_survey: | |
| gr.Markdown("### Demographic Survey") | |
| gr.Markdown("Please answer the following questions to help us understand your background.") | |
| demographic_survey_question1 = gr.CheckboxGroup( | |
| ["Undergraduate", "Graduate", "PhD", "Postdoc", "Faculty", "Industry Professional", "Other"], | |
| label="What is your current academic status?" | |
| ) | |
| demographic_survey_question2 = gr.CheckboxGroup( | |
| ["Bouvé College of Health Sciences", "College of Arts, Media and Design", "College of Engineering", "College of Professional Studies", "College of Science", "D'Amore-McKim School of Business", "Khoury College of Computer Sciences", "School of Law", "Mills College at Northeastern", "Other"], | |
| label="What is your college?" | |
| ) | |
| demographic_survey_question3 = gr.CheckboxGroup( | |
| ["18-23", "23-27", "27-31", "31-35", "35-43", "43+"], | |
| label="What is your age group?" | |
| ) | |
| demographic_survey_question4 = gr.CheckboxGroup( | |
| ["Woman", "Man", "Transgender", "Non-binary", "Prefer not to say"], | |
| label="What is your gender identity?" | |
| ) | |
| demographic_survey_question5 = gr.CheckboxGroup( | |
| ["American Indian or Alaska Native", "Asian or Asian American", "Black or African American", "Hispanic or Latino/a/x", "Native Hawaiian or Other Pacific Islander", "Middle Eastern or North African", "White or European", "Other"], | |
| label="What is your ethnicity? (Select all that apply)" | |
| ) | |
| demographic_survey_submit_button = gr.Button("Submit") | |
| # Exit Page | |
| with gr.Column(visible=False) as exit_page: | |
| gr.Markdown("## Thank you for participating in our Collaborative Writing study! \n\nYour responses have been recorded. Please download your session data below, and send the zip file to <kaeberlein.c@northeastern.edu>.") | |
| download_button = gr.Button("Download Session Data") | |
| file_to_download = gr.File(label="Download Results") | |
| ########################################################################################################## | |
| # FUNCTION DEFINITIONS FOR EACH PAGE # | |
| ########################################################################################################## | |
| def on_login(): | |
| def callback(): | |
| #chosen_image = os.path.join(folder_path, random.choice(assigned_images[username])) | |
| return ( | |
| gr.update(visible=False), # login hidden | |
| gr.update(visible=True), # main interface visible | |
| gr.update(visible=False), # login error message hidden | |
| r.randint(0,99999999), | |
| ) | |
| return callback | |
| """def update_all_instruction_images(chosen_image): | |
| return ( | |
| gr.update(value=chosen_image), | |
| gr.update(value=chosen_image), | |
| gr.update(value=chosen_image), | |
| gr.update(value=chosen_image), | |
| gr.update(value=chosen_image), | |
| gr.update(value=chosen_image) | |
| )""" | |
| def extract_code_context(reference_code, user_state): | |
| with open(reference_code, "r") as f: | |
| code_context = f.read() | |
| print(code_context) | |
| # Remove everything between Part 3: Plot Configuration and Rendering and Part 4: Saving Output | |
| start_index = code_context.find("# ===================\n# Part 3: Plot Configuration and Rendering\n# ===================") | |
| end_index = code_context.find("# ===================\n# Part 4: Saving Output\n# ===================") | |
| code_context = code_context[:start_index] + "# ===================\n# Part 3: Plot Configuration and Rendering\n# ===================\n\n # TODO: YOUR CODE GOES HERE #\n\n\n" + code_context[end_index:] | |
| # plt.savefig is the last line of the code, remove it | |
| end_index = code_context.find("plt.savefig") | |
| code_context = code_context[:end_index] | |
| # and replace with plt.show() | |
| code_context += f"plt.savefig('temp_plot_{user_state}.png')\n" | |
| # code_context += "plt.show()\n" | |
| return code_context | |
| def handle_expertise_survey_response(q1, q2): | |
| # Example: Store responses in a dictionary or process as needed | |
| response = { | |
| "Question 1": q1, | |
| "Question 2": q2 | |
| } | |
| return response | |
| # Function to handle form submission | |
| def handle_part1_survey_response(q1): | |
| # Example: Store responses in a dictionary or process as needed | |
| response = { | |
| "Question 1": q1 | |
| } | |
| return response | |
| def handle_part2_survey_response(q1, q2, q3, q4): | |
| # Example: Store responses in a dictionary or process as needed | |
| response = { | |
| "Question 1": q1, | |
| "Question 2": q2, | |
| "Question 3": q3, | |
| "Question 4": q4 | |
| } | |
| return response | |
| def handle_final_survey_response(q1, q2, q3, q4, q5, q6, q7): | |
| # Example: Store responses in a dictionary or process as needed | |
| response = { | |
| "Question 1": q1, | |
| "Question 2": q2, | |
| "Question 3": q3, | |
| "Question 4": q4, | |
| "Question 5": q5, | |
| "Question 6": q6, | |
| "Question 7": q7 | |
| } | |
| return response | |
| def handle_demographic_survey_response(q1, q2, q3, q4, q5): | |
| # Example: Store responses in a dictionary or process as needed | |
| response = { | |
| "Question 1": q1, | |
| "Question 2": q2, | |
| "Question 3": q3, | |
| "Question 4": q4, | |
| "Question 5": q5 | |
| } | |
| return response | |
| # Timer logic for instructions page | |
| def plot_countdown_timer(): | |
| time_limit = plot_time_limit | |
| start_time = time.time() | |
| while time.time() - start_time < time_limit: | |
| mins, secs = divmod(time_limit - int(time.time() - start_time), 60) | |
| yield f"{mins:02}:{secs:02}", gr.update(), gr.update(visible=False) | |
| yield "00:00", gr.update(visible=False), gr.update(visible=True) | |
| # Timer logic for dialogue page | |
| def dialogue_countdown_timer(): | |
| time_limit = dialogue_time_limit | |
| start_time = time.time() | |
| while time.time() - start_time < time_limit: | |
| mins, secs = divmod(time_limit - int(time.time() - start_time), 60) | |
| yield f"{mins:02}:{secs:02}", gr.update(visible=True), gr.update(visible=False) | |
| yield "00:00", gr.update(visible=False), gr.update(visible=True) | |
| # New function to save dialogue state | |
| def save_dialogue_state(dialogue, dialogue_state): | |
| timestamp = time.strftime("%Y-%m-%d %H:%M:%S") | |
| print(dialogue) | |
| print(dialogue_state) | |
| return dialogue_state + [timestamp, dialogue] | |
| # # Save notes, dialogue, and answers into a file for download | |
| # def prepare_download(notes, dialogue, answers): | |
| # results = { | |
| # "notes": notes, | |
| # "dialogue": dialogue, | |
| # "answers": answers | |
| # } | |
| # with open("session_data.json", "w") as f: | |
| # json.dump(results, f) | |
| # return "session_data.json" | |
| # Add download functionality | |
| def get_download_link(user_state, chosen_image, notes_state, dialogue_state, | |
| produced_codes, reference_code, survey1, survey2, survey3, survey4, survey5): | |
| jsonl_path = Path(f"session_data_{user_state}.jsonl") | |
| with open(jsonl_path, "w") as f: | |
| f.write( | |
| json.dumps( | |
| { | |
| "username": user_state, | |
| "notes": notes_state, | |
| "dialogue_state": dialogue_state, | |
| "expertise_survey": survey1, | |
| "demographics_survey": survey5 | |
| } | |
| ) | |
| + "\n" | |
| ) | |
| #image_path = Path(f"temp_plot_{user_state}.png") | |
| zip_path = Path(f"session_data_{user_state}.zip") | |
| create_zip_file(jsonl_path, zip_path) | |
| if not zip_path.exists(): | |
| return None | |
| return gr.File(value=str(zip_path), visible=True) | |
| async def on_submit(finished_code, submission_count, produced_codes, user_state): | |
| if (max_num_submissions-(submission_count+1)) == 0: | |
| # raise gr.Error("Max submissions reached") | |
| yield ( | |
| gr.update(visible=False), | |
| gr.update(visible=False), # Hide run code button | |
| gr.update(visible=False), # Hide retry button | |
| gr.update(visible=True), # Show finished button | |
| gr.update(visible=False), # Hide plot output | |
| submission_count, | |
| produced_codes, | |
| gr.update(visible=False), # stdout | |
| gr.update(visible=False) #submission counter | |
| ) | |
| raise gr.Error("Max submissions reached") | |
| else: | |
| submission_count += 1 | |
| # Show processing message and hide other elements | |
| yield ( | |
| gr.update(visible=True), # Show processing message | |
| gr.update(visible=False), # Hide run code button | |
| gr.update(visible=False), # Hide retry button | |
| gr.update(visible=False), # Hide finished button | |
| gr.update(visible=False), # Hide plot output | |
| submission_count, | |
| produced_codes, | |
| gr.update(visible=False), # stdout | |
| gr.update(value=max_num_submissions-submission_count) #submission counter | |
| ) | |
| # Process the submission | |
| plot_output, stdout, stderr = await process_submission(finished_code, user_state) | |
| # Hide processing message and show result | |
| yield ( | |
| gr.update(visible=False), # Hide processing message | |
| gr.update(visible=False), # Hide submit button | |
| gr.update(visible=True), # Show retry button | |
| gr.update(visible=True), # Show finished button | |
| gr.update(visible=True, value=plot_output), # Show plot output | |
| submission_count, | |
| produced_codes + [finished_code], | |
| gr.update(visible=True, value=stdout+stderr), # stdout | |
| gr.update() #submission counter | |
| ) | |
| def on_retry(finished_code, produced_codes): | |
| # Hide processing message and show result | |
| yield ( | |
| gr.update(visible=False), # Hide processing message | |
| gr.update(visible=True), # Show submit button | |
| gr.update(visible=False), # Hide retry button | |
| gr.update(visible=False), # Hide finished button | |
| gr.update(visible=False), # Hide plot output | |
| produced_codes + [finished_code] | |
| ) | |
| def filter_paste(previous_text, new_text): | |
| # Check if the new input is a result of pasting (by comparing lengths or content) | |
| print(f"New text: {new_text}") | |
| changed_text = new_text.replace(previous_text, "") | |
| if len(changed_text) > 10: # Paste generally increases length significantly | |
| return previous_text, previous_text # Revert to previous text if paste is detected | |
| previous_text = new_text | |
| print(f"Previous text: {previous_text}") | |
| return previous_text, new_text | |
| def save_notes_with_timestamp(notes, notes_state): | |
| timestamp = time.strftime("%Y-%m-%d %H:%M:%S") | |
| notes_state.append(f"{timestamp}: {notes}") | |
| return notes_state | |
| ########################################################################################################## | |
| # EVENT HANDLERS FOR EACH PAGE # | |
| ########################################################################################################## | |
| # Page navigation | |
| login_button.click( | |
| on_login(), | |
| #inputs=[username_input], | |
| outputs=[login_row, expertise_survey, login_error_message, user_state], | |
| ) | |
| # login_button.click(lambda: os.path.join(folder_path, random.choice(images)), outputs=[chosen_image_state]) | |
| # login_button.click(lambda: chosen_image_state.replace(".png", ".py"), inputs=[chosen_image_state], outputs=[reference_code_state]) | |
| expertise_survey_submit_button.click( | |
| handle_expertise_survey_response, | |
| inputs=[expertise_survey_question1, expertise_survey_question2], | |
| outputs=[expertise_survey_responses] | |
| ) | |
| expertise_survey_submit_button.click( | |
| lambda: (gr.update(visible=False), gr.update(visible=True)), # Hide survey, show dialogue | |
| inputs=[], outputs=[expertise_survey, dialogue_page] | |
| ) | |
| #dialogue_submit_button.click( | |
| # handle_dialogue_response, | |
| # inputs=[expertise_survey_question1, expertise_survey_question2], | |
| # outputs=[expertise_survey_responses] | |
| #) | |
| # Update to save dialogue state on change | |
| chatbot.chatbot.change( | |
| save_dialogue_state, | |
| inputs=[chatbot.chatbot, dialogue_state], | |
| outputs=[dialogue_state] | |
| ) | |
| dialogue_submit_button.click( | |
| lambda: (gr.update(visible=False), gr.update(visible=True)), # Hide survey, show dialogue | |
| inputs=[], outputs=[dialogue_page, demographic_survey] | |
| ) | |
| demographic_survey_submit_button.click( | |
| handle_demographic_survey_response, | |
| inputs=[demographic_survey_question1, demographic_survey_question2, demographic_survey_question3, demographic_survey_question4, demographic_survey_question5], | |
| outputs=[demographic_survey_responses] | |
| ) | |
| demographic_survey_submit_button.click( | |
| lambda: (gr.update(visible=False), gr.update(visible=True), gr.update(visible=True)), # Hide survey, show exit page | |
| inputs=[], outputs=[demographic_survey, exit_page, download_button] | |
| ) | |
| # notepad.change(filter_paste, | |
| # inputs=[previous_text, notepad], | |
| # outputs=[previous_text, notepad], trigger_mode="always_last") | |
| demographic_survey_submit_button.click(save_notes_with_timestamp, | |
| inputs=[notepad, notes_state], | |
| outputs=[notes_state]) | |
| download_button.click( | |
| get_download_link, | |
| inputs=[user_state, notes_state, | |
| dialogue_state, produced_codes, | |
| expertise_survey_responses, | |
| uncertainty_survey_part_1_responses, | |
| uncertainty_survey_part_2_responses, | |
| uncertainty_survey_part_3_responses, | |
| demographic_survey_responses], | |
| outputs=[file_to_download] | |
| ) | |
| print("Before Load") | |
| demo.load( | |
| lambda: gr.update(visible=True), # Show login page | |
| outputs=login_row, | |
| ) | |
| return demo | |
| # if __name__ == "__main__": | |
| # users = Path("users.txt").read_text().splitlines() | |
| # users = set(user.strip() for user in users if user.strip()) | |
| # chosen_image = pick_random_image() | |
| # reference_code = chosen_image.replace(".png", ".py") | |
| # # code_context = extract_code_context(reference_code) | |
| # demo = create_interface(users, chosen_image, reference_code) | |
| # # demo.launch( | |
| # # server_name=args.server_name, | |
| # # server_port=args.server_port, | |
| # # share=args.share, | |
| # # ) | |
| # demo.launch() | |
| #users = Path("users.txt").read_text().splitlines() | |
| #users = set(user.strip() for user in users if user.strip()) | |
| # chosen_image = pick_random_image() | |
| # reference_code = chosen_image.replace(".png", ".py") | |
| # code_context = extract_code_context(reference_code) | |
| print("BEFORE CREATE") | |
| demo = create_interface() | |
| demo.launch(share=False, server_port = 8000) | |