Spaces:
Paused
Paused
| import gradio as gr | |
| import zipfile | |
| import os | |
| import shutil | |
| import subprocess | |
| from chat_with_project import query_project | |
| from get_prompts import get_prompt_for_mode | |
| from dotenv import load_dotenv, set_key | |
| from milvus import initialize_milvus, DEFAULT_MILVUS_HOST, DEFAULT_MILVUS_PORT, DEFAULT_COLLECTION_NAME, DEFAULT_DIMENSION, DEFAULT_MAX_RETRIES, DEFAULT_RETRY_DELAY | |
| from pymilvus import connections, MilvusException, utility | |
| import markdown | |
| # --- Configuration and Setup --- | |
| # Define paths for workspace and extraction directories | |
| WORKSPACE_DIR = "workspace" | |
| EXTRACTION_DIR = "extraction" | |
| # Milvus connection status - Assume connected initially | |
| milvus_connected = True | |
| def clear_directories(): | |
| """Clears the workspace and extraction directories.""" | |
| for directory in [WORKSPACE_DIR, EXTRACTION_DIR]: | |
| if os.path.exists(directory): | |
| shutil.rmtree(directory) | |
| os.makedirs(directory, exist_ok=True) | |
| # Clear directories at startup | |
| clear_directories() | |
| # --- API Key Management --- | |
| def ensure_env_file_exists(): | |
| """Ensures that a .env file exists in the project root.""" | |
| if not os.path.exists(".env"): | |
| with open(".env", "w") as f: | |
| f.write("") # Create an empty .env file | |
| def load_api_key(): | |
| """Loads the API key from the .env file or the environment.""" | |
| ensure_env_file_exists() | |
| load_dotenv() | |
| return os.environ.get("OPENAI_API_KEY") | |
| def update_api_key(api_key): | |
| """Updates the API key in the .env file.""" | |
| if api_key: | |
| set_key(".env", "OPENAI_API_KEY", api_key) | |
| load_dotenv() # Reload environment variables | |
| return "API key updated successfully." | |
| else: | |
| return "API key cannot be empty." | |
| def is_api_key_set(): | |
| """Checks if the API key is set.""" | |
| return bool(load_api_key()) | |
| # --- Core Functionalities --- | |
| def process_zip(zip_file_path): | |
| """Extracts a zip file, analyzes content, and stores information.""" | |
| try: | |
| # Clear existing workspace and extraction directories before processing | |
| clear_directories() | |
| # Extract the zip file | |
| with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: | |
| zip_ref.extractall(WORKSPACE_DIR) | |
| # Run extract.py | |
| subprocess.run(["python", "./utils/extract.py", WORKSPACE_DIR], check=True) | |
| return "Processing complete! Results saved in the 'extraction' directory." | |
| except Exception as e: | |
| return f"An error occurred: {e}" | |
| def init_milvus( | |
| milvus_host, milvus_port, collection_name, dimension, max_retries, retry_delay | |
| ): | |
| """Initializes or loads the Milvus vector database.""" | |
| global milvus_connected | |
| try: | |
| # Convert string inputs to appropriate types | |
| milvus_port = int(milvus_port) | |
| dimension = int(dimension) | |
| max_retries = int(max_retries) | |
| retry_delay = int(retry_delay) | |
| # Call the modified function | |
| success = initialize_milvus( | |
| milvus_host, | |
| milvus_port, | |
| collection_name, | |
| dimension, | |
| max_retries, | |
| retry_delay, | |
| ) | |
| if success: | |
| milvus_connected = True | |
| return "Milvus database initialized or loaded successfully." | |
| else: | |
| milvus_connected = False | |
| return "Error initializing Milvus: Unable to establish connection or initialize." | |
| except Exception as e: | |
| milvus_connected = False | |
| return f"Error initializing Milvus: {e}" | |
| # --- Chatbot Verification --- | |
| def is_project_loaded(): | |
| """Checks if a project has been loaded (i.e., if the extraction directory contains .pkl files).""" | |
| extraction_dir = "extraction" | |
| pkl_files = [f for f in os.listdir(extraction_dir) if f.endswith('.pkl')] | |
| return bool(pkl_files) | |
| # --- Helper Function for Developer Mode --- | |
| def extract_files_from_response(response): | |
| """ | |
| Parses the LLM response to extract file paths and their corresponding code content. | |
| Args: | |
| response (str): The raw response string from the LLM. | |
| Returns: | |
| dict: A dictionary where keys are file paths and values are the code content of each file. | |
| """ | |
| files = {} | |
| current_file = None | |
| current_content = [] | |
| for line in response.splitlines(): | |
| if line.startswith("--- BEGIN FILE:"): | |
| if current_file is not None: | |
| # Save previous file content | |
| files[current_file] = "\n".join(current_content) | |
| # Start a new file | |
| current_file = line.replace("--- BEGIN FILE:", "").strip() | |
| current_content = [] | |
| elif line.startswith("--- END FILE:"): | |
| if current_file is not None: | |
| # Save current file content | |
| files[current_file] = "\n".join(current_content) | |
| current_file = None | |
| current_content = [] | |
| elif current_file is not None: | |
| # Append line to current file content | |
| current_content.append(line) | |
| return files | |
| # --- Gradio UI Components --- | |
| # Chat Interface | |
| def chat_ui(query, history, mode): | |
| """Handles the chat interaction for Analyzer, Debugger, and Developer modes.""" | |
| api_key = load_api_key() | |
| if not api_key: | |
| return [ | |
| ( | |
| "Error", | |
| "OpenAI API key not set. Please set the API key in the Settings tab.", | |
| ) | |
| ], [] | |
| if not is_project_loaded(): | |
| return [ | |
| ( | |
| "Error", | |
| "No project loaded. Please upload and process a ZIP file first.", | |
| ) | |
| ], [] | |
| if not milvus_connected: | |
| return [ | |
| ("Error", "Milvus is not connected. Please connect to Milvus first.") | |
| ], [] | |
| # Initialize history if None | |
| if history is None: | |
| history = [] | |
| print(f"Chat Mode: {mode}") | |
| system_prompt = get_prompt_for_mode(mode) | |
| print(f"System Prompt: {system_prompt}") | |
| # Pass the query and system prompt to the LLM | |
| response = query_project(query, system_prompt) | |
| print(f"Type response {type(response)}") | |
| print(f"Response from query_project: {response}") | |
| if response is None or not response.strip(): | |
| response = "An error occurred during processing. Please check the logs." | |
| formatted_response = "" | |
| if mode == "developer": | |
| extracted_files = extract_files_from_response(response) | |
| for filepath, content in extracted_files.items(): | |
| formatted_response += f"## {filepath}\n`\n{content}\n`\n\n" | |
| else: | |
| formatted_response = response | |
| # Use HTML directly for code blocks to preserve formatting | |
| if mode == "developer": | |
| formatted_response = formatted_response.replace("`\n", "<pre><code style=\"white-space: pre-wrap;\">").replace("\n`", "</code></pre>") | |
| else: | |
| # Convert the entire response to HTML at once | |
| md = markdown.Markdown(extensions=['fenced_code']) | |
| formatted_response = md.convert(formatted_response) | |
| history.append((query, formatted_response)) | |
| return history, history | |
| # ZIP Processing Interface | |
| zip_iface = gr.Interface( | |
| fn=process_zip, | |
| inputs=gr.File(label="Upload ZIP File"), | |
| outputs="text", | |
| title="Zip File Analyzer", | |
| description="Upload a zip file to analyze and store its contents.", | |
| ) | |
| # Milvus Initialization Interface | |
| milvus_iface = gr.Interface( | |
| fn=init_milvus, | |
| inputs=[ | |
| gr.Textbox( | |
| label="Milvus Host", | |
| placeholder=DEFAULT_MILVUS_HOST, | |
| value=DEFAULT_MILVUS_HOST, | |
| ), | |
| gr.Textbox( | |
| label="Milvus Port", | |
| placeholder=DEFAULT_MILVUS_PORT, | |
| value=DEFAULT_MILVUS_PORT, | |
| ), | |
| gr.Textbox( | |
| label="Collection Name", | |
| placeholder=DEFAULT_COLLECTION_NAME, | |
| value=DEFAULT_COLLECTION_NAME, | |
| ), | |
| gr.Textbox( | |
| label="Dimension", | |
| placeholder=str(DEFAULT_DIMENSION), | |
| value=str(DEFAULT_DIMENSION), | |
| ), | |
| gr.Textbox( | |
| label="Max Retries", | |
| placeholder=str(DEFAULT_MAX_RETRIES), | |
| value=str(DEFAULT_MAX_RETRIES), | |
| ), | |
| gr.Textbox( | |
| label="Retry Delay (seconds)", | |
| placeholder=str(DEFAULT_RETRY_DELAY), | |
| value=str(DEFAULT_RETRY_DELAY), | |
| ), | |
| ], | |
| outputs="text", | |
| title="Milvus Database Initialization", | |
| description="Initialize or load the Milvus vector database.", | |
| ) | |
| # Gradio Chatbot UI Interface | |
| chat_iface = gr.Interface( | |
| fn=chat_ui, | |
| inputs=[ | |
| gr.Textbox(label="Ask a question", placeholder="Type your question here"), | |
| gr.State(), # Maintains chat history | |
| gr.Radio( | |
| ["analyzer", "debugger", "developer"], | |
| label="Chat Mode", | |
| value="analyzer", | |
| ), | |
| ], | |
| outputs=[ | |
| gr.Chatbot(label="Chat with Project", height=500), | |
| "state", # This is to store the state, | |
| ], | |
| title="Chat with your Project", | |
| description="Ask questions about the data extracted from the zip file.", | |
| # Example usage - Corrected to only include instruction and mode | |
| examples=[ | |
| ["What is this project about?", "analyzer"], | |
| ["Are there any potential bugs?", "debugger"], | |
| ["How does the data flow through the application?", "analyzer"], | |
| ["Explain the main components of the architecture.", "analyzer"], | |
| ["What are the dependencies of this project?", "analyzer"], | |
| ["Are there any potential memory leaks?", "debugger"], | |
| [ | |
| "Identify any areas where the code could be optimized.", | |
| "debugger", | |
| ], | |
| [ | |
| "Please implement basic logging for the main application and save logs to a file.", | |
| "developer", | |
| ], | |
| [ | |
| "Can you add a try/except blocks in main functions to handle exceptions", | |
| "developer", | |
| ], | |
| ], | |
| ) | |
| # Settings Interface | |
| settings_iface = gr.Interface( | |
| fn=update_api_key, | |
| inputs=gr.Textbox(label="OpenAI API Key", type="password"), | |
| outputs="text", | |
| title="Settings", | |
| description="Set your OpenAI API key.", | |
| ) | |
| # Status Interface | |
| def get_api_key_status(): | |
| if is_api_key_set(): | |
| return "API key status: Set" | |
| else: | |
| return "API key status: Not set" | |
| def get_milvus_status(): | |
| if milvus_connected: | |
| return "Milvus: Connected" | |
| else: | |
| return "Milvus: Disconnected" | |
| status_iface = gr.Interface( | |
| fn=lambda: [get_api_key_status(), get_milvus_status()], | |
| inputs=None, | |
| outputs=["text", "text"], | |
| live=True, | |
| title="Status", | |
| ) | |
| # Add credits to the UI | |
| credits = gr.Markdown( | |
| "## Credits\n\nCreated by [Ruslan Magana Vsevolodovna](https://ruslanmv.com/)" | |
| ) | |
| # --- Main Application Launch --- | |
| # Combine the interfaces using Tabs | |
| demo = gr.TabbedInterface( | |
| [zip_iface, milvus_iface, chat_iface, settings_iface, status_iface], | |
| ["Process ZIP", "Init Milvus", "Chat with Project", "Settings", "Status"], | |
| ) | |
| # Launch the app with credits | |
| demo.queue().launch() |