Spaces:
Runtime error
Runtime error
| from typing import Tuple | |
| import gradio as gr | |
| from Project import * | |
| from common_functions_v4 import * | |
| from event_handler import create_step1_render, create_step2a_render, create_step2b_render, create_step2c_render, create_step2d_render, create_step3_render, setup_all_handlers | |
| from google_drive import * | |
| from notion import * | |
| from state import state | |
| from prompt_configs import * | |
| from typing import Dict, Any, Tuple | |
| import pandas as pd | |
| from io import StringIO | |
| import importlib | |
| import prompt_configs | |
def update_prompts():
    """Reload the ``prompt_configs`` module and rebind the global ``PROMPTS``.

    Any failure during the reload is reported on stdout instead of being
    raised, so the caller always gets ``None`` back.
    """
    global PROMPTS
    try:
        importlib.reload(prompt_configs)
        # Pick up the freshly evaluated mapping from the reloaded module.
        PROMPTS = prompt_configs.PROMPTS
        print("PROMPTS updated successfully.")
    except Exception as err:
        print(f"Error updating PROMPTS: {err}")
def create_ui_component(config: UIConfig, prompt: str = None) -> Any:
    """Build the Gradio widget described by *config*.

    Args:
        config: UI description; ``component_type`` selects the widget and
            ``label`` / ``lines`` / ``interactive`` / ``visible`` /
            ``show_copy_button`` are forwarded to it where applicable.
        prompt: Optional initial content. Textboxes receive it stripped
            (or ``""`` when falsy); Markdown receives it unchanged.

    Returns:
        A ``gr.Textbox``, ``gr.Markdown`` or ``gr.Dataframe``; ``None`` when
        the component type is none of the three handled here.
    """
    kind = config.component_type

    if kind == UIComponentType.TEXTBOX:
        textbox_kwargs = dict(
            label=config.label,
            lines=config.lines or 1,
            interactive=config.interactive,
            visible=config.visible,
            value=prompt.strip() if prompt else "",
        )
        return gr.Textbox(**textbox_kwargs)

    if kind == UIComponentType.MARKDOWN:
        markdown_kwargs = dict(
            label=config.label,
            visible=config.visible,
            show_copy_button=config.show_copy_button,
            value=prompt,
            elem_classes=["scrollable-markdown"],
        )
        return gr.Markdown(**markdown_kwargs)

    if kind == UIComponentType.DATAFRAME:
        dataframe_kwargs = dict(
            label=config.label,
            interactive=config.interactive,
            visible=config.visible,
        )
        return gr.Dataframe(**dataframe_kwargs)

    return None
def _dedent_prompt(text: str) -> str:
    """Remove the indentation shared by all non-blank lines of *text*.

    Whitespace-only lines are replaced by empty strings.
    """
    rows = text.split('\n')
    shared_indent = min(
        (len(row) - len(row.lstrip()) for row in rows if row.strip()),
        default=0,
    )
    return '\n'.join(row[shared_indent:] if row.strip() else '' for row in rows)


def create_prompt_editor_components(prompt_config: PromptConfig) -> Dict[str, Any]:
    """Create the editor widget and status line for one prompt.

    The first key of ``prompt_config.ui`` ending in ``_prompt_editor``
    selects the editor. When no such key exists nothing is created.

    Args:
        prompt_config: Prompt definition providing ``ui`` (mapping of UI keys
            to UI configs) and ``prompt`` (the text shown in the editor).

    Returns:
        A dict with the editor under ``<editor_key>`` and its status Markdown
        under ``<editor_key>_save_status``; empty when there is no editor key.
    """
    # NOTE(review): block nesting was reconstructed from a listing without
    # indentation - confirm the Row/Column layout against the deployed app.
    components = {}

    with gr.Row():
        with gr.Column(scale=1):
            editor_key = next(
                (k for k in prompt_config.ui.keys() if k.endswith('_prompt_editor')),
                None,
            )
            if not editor_key:
                return components

            editor = create_ui_component(
                prompt_config.ui[editor_key],
                _dedent_prompt(prompt_config.prompt),
            )
            save_status = gr.Markdown("Ready to save changes...", visible=True)

            def find_prompt_key():
                """Return the PROMPTS key whose UI owns this editor, else None."""
                for key, config in PROMPTS.items():
                    # Configs may carry ui=None; skip them instead of raising
                    # TypeError on the membership test.
                    if config.ui and editor_key in config.ui:
                        return key
                return None

            def save_prompt_changes(new_prompt: str, prompt_key: str) -> str:
                """Write *new_prompt* into PROMPTS and into ``prompt_configs.py``.

                *prompt_key* is honoured when it names an existing prompt;
                otherwise the key is looked up through the editor key.
                Returns a status message; errors are reported in the message.

                Not connected to any event inside this function.
                """
                try:
                    if not prompt_key or prompt_key not in PROMPTS:
                        prompt_key = find_prompt_key()
                    if not prompt_key:
                        return "❌ Error: Could not find matching prompt key"

                    # Update in-memory config
                    PROMPTS[prompt_key].prompt = new_prompt.strip()

                    with open("prompt_configs.py", "r", encoding='utf-8') as f:
                        content = f.read()

                    # Locate, in order: the prompt's section, its prompt=
                    # argument, and the triple-quoted literal that follows.
                    # NOTE(review): confirm the whitespace inside these
                    # literals matches the layout of prompt_configs.py.
                    section_start = f' "{prompt_key}": PromptConfig('
                    start_idx = content.find(section_start)
                    if start_idx == -1:
                        return "❌ Error: Could not locate prompt section in config file"

                    prompt_start = content.find('prompt=', start_idx)
                    if prompt_start == -1:
                        return "❌ Error: Could not locate prompt parameter"

                    first_quote = content.find('"""', prompt_start)
                    if first_quote == -1:
                        return "❌ Error: Could not locate start of prompt content"

                    end_quote = content.find('"""', first_quote + 3)
                    if end_quote == -1:
                        return "❌ Error: Could not locate end of prompt content"

                    # Re-indent every line; blank lines keep a lone space.
                    indented_lines = [
                        f' {line.strip()}' if line.strip() else ' '
                        for line in new_prompt.strip().split('\n')
                    ]
                    new_section = (
                        'prompt=\n'
                        ' """\n' +
                        '\n'.join(indented_lines) +
                        '\n """'
                    )

                    # Splice: everything before prompt=, the new literal,
                    # everything after the old closing triple quote.
                    new_content = (
                        content[:prompt_start] +
                        new_section +
                        content[end_quote + 3:]
                    )
                    with open("prompt_configs.py", "w", encoding='utf-8') as f:
                        f.write(new_content)
                    return "✅ Prompt updated successfully"
                except Exception as e:
                    return f"❌ Error updating prompt: {str(e)}"

            # Pending edits for this editor; written by store_changes and not
            # read anywhere else in this function.
            temp_changes = {}

            def store_changes(new_prompt: str):
                """Remember the edited text under its prompt key.

                Raises:
                    ValueError: when no prompt in PROMPTS owns this editor.
                """
                prompt_key = find_prompt_key()
                if not prompt_key:
                    raise ValueError(f"Could not find matching prompt key for editor {editor_key}")
                temp_changes[prompt_key] = new_prompt
                return "Changes stored. Click 'Save All' to apply."

            editor.change(
                fn=store_changes,
                inputs=[editor],
                outputs=[save_status]
            )

            components[editor_key] = editor
            components[f"{editor_key}_save_status"] = save_status

    return components
def create_quotation_generator_section():
    """Build the "Quotation Generator" and "System Prompts" tabs.

    Creates one accordion per step found in ``PROMPTS`` (with per-sub-step
    buttons for "Step 2"), the project-name/upload controls, and one prompt
    editor per DB-configured prompt, then registers all handlers through
    ``setup_all_handlers``.

    Returns:
        A 16-tuple, in this order: ``all_components``, ``step_buttons``,
        ``page_progress_update``, ``page_upload_drive_btn``,
        ``page_upload_notion_btn``, ``quotation_cost``, ``page_recalc_btn``,
        ``page_notes_box``, ``page_save_quotation_btn``, ``project_name``,
        ``step1_results``, ``step2a_results``, ``step2b_results``,
        ``step2c_results``, ``step2d_results``, ``step3_results``.
        Widgets that are only created for a "Step 2" accordion stay ``None``
        when no such step exists.
    """
    # NOTE(review): block nesting was reconstructed from a listing without
    # indentation - confirm the tab/row/column layout against the deployed app.

    # Pre-bind every widget that is created conditionally further down.
    quotation_cost = None
    page_recalc_btn = None
    page_progress_update = None
    page_upload_drive_btn = None
    page_upload_notion_btn = None
    page_notes_box = None
    page_save_quotation_btn = None
    project_name = None

    step1_results = gr.State()
    step2a_results = gr.State()
    step2b_results = gr.State()
    step2c_results = gr.State()
    step2d_results = gr.State()
    step3_results = gr.State()

    # First matching marker wins, mirroring an if/elif chain.
    sub_step_renderers = (
        ("Step 2.1", create_step2a_render, step2a_results),
        ("Step 2.2", create_step2b_render, step2b_results),
        ("Step 2.3", create_step2c_render, step2c_results),
        ("Step 2.4", create_step2d_render, step2d_results),
    )

    with gr.Tab(label="Quotation Generator"):
        with gr.Row():
            with gr.Column(scale=2):
                gr.Markdown("## ⚠️ Instructions #2")
                for instruction in [
                    "1. **Edit System Prompt**: Navigate to the System Prompts tab to edit prompts.",
                    "2. **Generate Output**: Click generate once ready.",
                    "3. **Upload Final Documentation**: Give a project name and click upload to Google Drive or Notion",
                    "4. **Recalculate Cost**: Recalculate the cost if you want to change the mandays estimate"
                ]:
                    gr.Markdown(instruction)
            with gr.Column(scale=1):
                page_progress_update = gr.Textbox(label="Progress Update", lines=6, interactive=False)

        with gr.Row():
            with gr.Column(scale=4):
                # Group prompt keys by step, and by sub-step within a step.
                step_outputs = {}
                sub_steps = {}
                for prompt_key, prompt_config in PROMPTS.items():
                    if prompt_config.step == "Chatbot Prompt Editors":
                        continue
                    step = prompt_config.step
                    sub_step = prompt_config.sub_step
                    if step and step.strip():
                        step_outputs.setdefault(step, []).append(prompt_key)
                        if sub_step:
                            sub_steps.setdefault(step, {}).setdefault(sub_step, []).append(prompt_key)

                all_components = {}
                step_buttons = {}

                # One accordion per step.
                for step_name in step_outputs:
                    with gr.Accordion(step_name, open=False):
                        if "Step 2" in step_name:
                            with gr.Row():
                                with gr.Column(scale=4):
                                    # One button plus result area per sub-step.
                                    for sub_step_name in sub_steps.get(step_name, {}):
                                        button_label = sub_step_name.split(' : ')[1] if ' : ' in sub_step_name else sub_step_name
                                        step_buttons[sub_step_name] = gr.Button(f"✅ {button_label}")
                                        for marker, render, results_state in sub_step_renderers:
                                            if marker in sub_step_name:
                                                render(results_state)
                                                break
                                with gr.Column(scale=1):
                                    quotation_cost = gr.Textbox(label="Cost Summary", lines=3, interactive=False)
                                    page_recalc_btn = gr.Button("Recalculate")
                                    page_notes_box = gr.Textbox(
                                        label="Notes",
                                        lines=3,
                                        placeholder="Add your notes here..."
                                    )
                                    page_save_quotation_btn = gr.Button("Save Quotation with Note")
                        elif "Step 1" in step_name:
                            button_label = step_name.split(' : ')[1] if ' : ' in step_name else step_name
                            step_buttons[step_name] = gr.Button(f"✅ Generate {button_label}")
                            create_step1_render(step1_results)
                        elif "Step 3" in step_name:
                            button_label = step_name.split(' : ')[1] if ' : ' in step_name else step_name
                            step_buttons[step_name] = gr.Button(f"✅ Generate {button_label}")
                            create_step3_render(step3_results)

            with gr.Column(scale=1):
                with gr.Row():
                    with gr.Column(scale=2):
                        project_name = gr.Textbox(
                            placeholder="Please enter a project name to be saved in Google Drive and Notion",
                            label="Project Name",
                            lines=1.5,
                            interactive=True
                        )
                    with gr.Column(scale=1):
                        page_upload_drive_btn = gr.Button("📁 Upload to Google Drive")
                        page_upload_notion_btn = gr.Button("📝 Upload to Notion")

    with gr.Tab(label="System Prompts"):
        with gr.Row():
            refresh_button = gr.Button("🔄 Refresh All Prompts", visible=False)
            bulk_save_button = gr.Button("Save Prompts")
        with gr.Row():
            refresh_status = gr.Markdown("", visible=False)
            bulk_save_status = gr.Markdown("")
        gr.Markdown("Edit the system prompts below to customize the generation process.")

        def refresh_all_prompts():
            """Reload prompts from the DB and copy them onto the editor objects.

            Returns a status message; errors are reported in the message.
            """
            # NOTE(review): this assigns `component.value` on the Python
            # objects; confirm this actually refreshes what the browser shows.
            try:
                state.quotation_project.load_config_from_db()
                updated_count = 0
                for prompt_key, prompt_config in state.quotation_project._db_config.items():
                    if prompt_key not in all_components:
                        continue
                    print(f"Found components for {prompt_key}: {all_components[prompt_key].keys()}")
                    for sub_key, component in all_components[prompt_key].items():
                        if not (sub_key.endswith("_prompt_editor") and hasattr(component, "value")):
                            continue
                        print(f"Found prompt editor component: {sub_key}")
                        # Strip the indentation shared by all non-blank lines.
                        rows = prompt_config.prompt.split('\n')
                        min_indent = min(
                            (len(row) - len(row.lstrip()) for row in rows if row.strip()),
                            default=0,
                        )
                        formatted_prompt = '\n'.join(
                            row[min_indent:] if row.strip() else '' for row in rows
                        )
                        print(f"Setting value for {sub_key} to: {formatted_prompt[:100]}...")
                        component.value = formatted_prompt
                        updated_count += 1
                        print(f"Updated count is now: {updated_count}")
                return f"✅ Successfully refreshed {updated_count} prompts from the database"
            except Exception as e:
                print(f"ERROR in refresh_all_prompts: {str(e)}")
                return f"❌ Error refreshing prompts: {str(e)}"

        # Group the DB-configured prompts by step.
        prompts_by_step = {}
        print("in create_quotation_generator_section: loading config from db to create accordions")
        state.quotation_project.load_config_from_db()
        for prompt_key, prompt_config in state.quotation_project._db_config.items():
            prompts_by_step.setdefault(prompt_config.step, []).append((prompt_key, prompt_config))

        # One accordion per step, one nested accordion per editable prompt.
        # The loop variable is deliberately not called `prompt_configs`, which
        # is also the name of an imported module.
        for step, step_prompts in prompts_by_step.items():
            with gr.Accordion(step, open=False):
                for prompt_key, prompt_config in step_prompts:
                    if prompt_config.ui is None:
                        continue
                    editor_key = next(
                        (k for k in prompt_config.ui.keys() if k.endswith('_prompt_editor')),
                        None,
                    )
                    if not editor_key:
                        continue
                    with gr.Accordion(prompt_config.description, open=False):
                        components = create_prompt_editor_components(prompt_config)
                        all_components.setdefault(prompt_key, {}).update(components)

        def bulk_save_prompts():
            """Serialise every prompt to text, write it to disk and to the DB.

            Returns a status message; errors are reported in the message.
            """
            print("Bulk saving prompts")
            try:
                # Build the whole document first so that a failure while
                # formatting never leaves a truncated export file behind.
                # NOTE(review): confirm the whitespace inside the literals
                # below matches what the DB loader expects.
                parts = ["{\n"]
                for prompt_key, prompt_config in PROMPTS.items():
                    # Each PromptConfig section is delimited by $$ markers.
                    parts.append(f' $$"{prompt_key}": PromptConfig(\n')
                    parts.append(' prompt="""\n')

                    # Prefer the editor's text over the configured prompt.
                    # NOTE(review): `component.value` may be the value the
                    # widget was built with rather than the user's latest
                    # edit - confirm against Gradio's component semantics.
                    updated_prompt = None
                    if prompt_key in all_components:
                        for sub_key, component in all_components[prompt_key].items():
                            if not sub_key.endswith("_prompt_editor"):
                                continue
                            if not hasattr(component, "value"):
                                continue
                            current_value = component.value
                            # Test for None before calling .strip() on it.
                            if current_value is None:
                                print(f"Warning: Prompt value is null for {prompt_key}")
                                continue
                            try:
                                updated_prompt = current_value.strip()
                            except AttributeError:
                                print(f"Could not find updated prompt for {prompt_key}")
                            else:
                                print(f"Found updated prompt for {prompt_key}")
                    else:
                        print(f"No components found for {prompt_key}")

                    prompt_content = updated_prompt if updated_prompt else prompt_config.prompt
                    parts.extend(f' {line}\n' for line in prompt_content.split('\n'))
                    parts.append(' """,\n')

                    # Remaining PromptConfig attributes.
                    parts.append(f' description="{prompt_config.description}",\n')
                    parts.append(f' step="{prompt_config.step or ""}",\n')
                    parts.append(f' sub_step="{prompt_config.sub_step or ""}",\n')
                    parts.append(f' inputs={prompt_config.inputs},\n')
                    parts.append(f' outputs={prompt_config.outputs},\n')
                    parts.append(f' model={prompt_config.model},\n')
                    if prompt_config.thoughts:
                        parts.append(f' thoughts={prompt_config.thoughts},\n')
                    else:
                        parts.append(' thoughts=None,\n')
                    if prompt_config.ui:
                        # The enum prefix is dropped from the repr of the UI config.
                        ui_str = str(prompt_config.ui).replace('UIComponentType.', '')
                        parts.append(' ui=')
                        parts.append(f'{ui_str}\n')
                    else:
                        parts.append(' ui=None,\n')
                    parts.append(' )$$,\n')
                parts.append('}\n')
                output_content = "".join(parts)

                with open("prompt_configs_exp.txt", "w", encoding="utf-8") as f:
                    f.write(output_content)

                save_prompt_to_db(output_content)
                load_msg = state.quotation_project.load_config_from_db()
                print(f"Successfully updated {len(all_components)} prompts in DB")
                return f"✅ Prompts updated successfully; {load_msg}"
            except Exception as e:
                print(f"ERROR in bulk_save_prompts: {str(e)}")
                return f"❌ Error updating prompts: {str(e)}"

        def wait_and_clear(wait_time=4):
            """Reusable function to wait and clear status messages"""
            # Imported here because no explicit `import time` is visible at
            # the top of this file.
            import time
            time.sleep(wait_time)
            return ""

        refresh_button.click(
            fn=refresh_all_prompts,
            inputs=[],
            outputs=[refresh_status]
        )
        bulk_save_button.click(
            fn=bulk_save_prompts,
            inputs=[],
            outputs=[bulk_save_status]
        )

    setup_all_handlers(
        step_buttons,
        all_components,
        page_progress_update,
        quotation_cost,
        page_recalc_btn,
        page_upload_drive_btn,
        page_upload_notion_btn,
        project_name,
        step1_results,
        step2a_results,
        step2b_results,
        step2c_results,
        step2d_results,
        step3_results
    )

    return (all_components, step_buttons,
            page_progress_update, page_upload_drive_btn, page_upload_notion_btn,
            quotation_cost, page_recalc_btn, page_notes_box, page_save_quotation_btn,
            project_name, step1_results, step2a_results, step2b_results, step2c_results, step2d_results, step3_results)
# Application entry script: loads the stylesheet, builds the Gradio Blocks
# app (`demo`) with its tabs, wires the page-level events, and launches the
# app when run as a script.
# NOTE(review): block nesting was reconstructed from a listing without
# indentation - confirm which tab each section belongs to.
with open("page_main.css", "r") as file:
    custom_css = file.read()

with gr.Blocks(title="Quotation Chatbot (with SOW)", css=custom_css) as demo:
    gr.Markdown("# Quotation Chatbot with SOW")
    all_components = {}

    with gr.Tab(label="Page Main"):
        gr.Markdown("## ⚠️ Instructions #1")
        gr.Markdown("### Either select Option 1 or 2 , then scroll down to generate a quotation.")
        gr.Markdown("1. **Start a New Session**: Begin answering questions for a new project at the Edit Area.")
        gr.Markdown("2. **Load an Existing Project**: Navigate to the **Load Project** tab.")
        gr.Markdown("**👇 Note**: You may edit the system prompts for the chatbot as below : ")

        with gr.Row():
            start_btn = gr.Button("Start New Session")
        with gr.Row():
            current_session_display = gr.Markdown(no_active_session)
        with gr.Row():
            with gr.Column(scale=1):
                current_question = gr.Textbox(label="Edit Area", lines=30, interactive=True)
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(height=580, elem_classes=["chatbot-container"])
                gr.ChatInterface(
                    async_process_response,
                    chatbot=chatbot,
                    type="messages",
                    fill_height=True,
                    additional_outputs=[current_question],
                    flagging_mode="manual"
                )

        gr.Markdown("Below are sample answers you can refer to:")
        with gr.Accordion("Sample AR Answers", open=False):
            sample_answers = [
                {'file': 'q1_answer.txt', 'label': 'Company Background & Industry'},
                {'file': 'q2_answer.txt', 'label': 'Current Challenges & Workflow'},
                {'file': 'q3_answer.txt', 'label': 'Project Requirements & Goals'}
            ]
            for i, qa in enumerate(sample_answers, 1):
                # Read through a context manager so the handle is closed
                # instead of being left for the garbage collector.
                with open(qa['file'], 'r') as answer_file:
                    answer_text = answer_file.read()
                with gr.Accordion(f"Q{i}. {qa['label']}", open=False):
                    gr.Markdown(
                        value=answer_text,
                        visible=True,
                        show_copy_button=True
                    )

        (all_components, step_buttons, page_progress_update,
         page_upload_drive_btn, page_upload_notion_btn,
         quotation_cost, page_recalc_btn, page_notes_box, page_save_quotation_btn,
         project_name, step1_results, step2a_results, step2b_results, step2c_results,
         step2d_results, step3_results) = create_quotation_generator_section()

    with gr.Tab(label="Load Project"):
        gr.Markdown("### Past submissions")
        gr.Markdown("Quick hack to load past submissions to regenerate quotations (This page displays Q&A only; previous quotations are not shown yet).")
        gr.Markdown("Use Session ID 15 for test AR")
        with gr.Row():
            session_input = gr.Number(label="Session ID", precision=0)
            message_box = gr.Textbox(label="Message", interactive=False)
            status_box = gr.Textbox(
                label="Project Status",
                value="",
                interactive=False
            )
        fetch_btn = gr.Button("Fetch Session")
        with gr.Tab(label="Requirements"):
            fetched_requirements_box = gr.Markdown(value="")

    def setup_event_handlers():
        """Wire the start-session and fetch-session buttons to their callbacks."""
        start_btn.click(
            fn=start_chat,
            outputs=[current_question, current_session_display]
        )
        fetch_btn.click(
            fn=fetch_session,
            inputs=[session_input],
            outputs=[
                status_box,
                fetched_requirements_box,
                message_box,
                current_session_display,
            ]
        )

    # Registered last so every referenced component already exists.
    setup_event_handlers()

if __name__ == "__main__":
    demo.launch(share=True)