Spaces:
Sleeping
Sleeping
| import re | |
| import json | |
| import time | |
| import requests | |
| import importlib.metadata | |
| import gradio as gr | |
| import os # Needed for writing files | |
| from huggingface_hub import ( | |
| create_repo, upload_file, list_models, constants | |
| ) | |
| from huggingface_hub.utils import build_hf_headers, get_session, hf_raise_for_status | |
| from google import genai | |
| # Import Content and Part types for structured input | |
| from google.genai.types import Content, Part | |
| from google.genai.types import Tool, GenerateContentConfig, GoogleSearch | |
| # --- USER INFO & MODEL LISTING --- | |
def show_profile(profile: gr.OAuthProfile | None) -> str:
    """Return a Markdown status line describing the current login state.

    Args:
        profile: OAuth profile of the signed-in user, or ``None`` (any falsy
            value) when nobody is logged in.

    Returns:
        A confirmation that embeds ``profile.username``, or a
        "not logged in" notice.
    """
    # Guard clause: handle the anonymous case first.
    if not profile:
        return "*Not logged in.*"
    return f"✅ Logged in as **{profile.username}**"
def list_private_models(
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None
) -> str:
    """List the logged-in user's models as a Markdown bullet list.

    Gradio injects ``profile`` and ``oauth_token`` automatically when the
    event is wired with ``inputs=None`` and the signature carries these
    parameter types.

    Args:
        profile: OAuth profile of the current user, or ``None``.
        oauth_token: OAuth token wrapper exposing a ``token`` attribute,
            or ``None``.

    Returns:
        A login prompt when credentials are missing, ``"No models found."``
        when the author has no models, a bullet list of
        ``"<id> (private|public)"`` entries otherwise, or an error message
        if the listing call raises.
    """
    if not profile or not oauth_token or not getattr(oauth_token, "token", None):
        return "Please log in to see your models."
    try:
        models = [
            f"{m.id} ({'private' if m.private else 'public'})"
            for m in list_models(author=profile.username, token=oauth_token.token)
        ]
    except Exception as e:
        # Best-effort UI helper: surface API errors as text instead of raising.
        return f"Error listing models: {e}"
    if not models:
        return "No models found."
    # Prefix the first entry as well, so every model renders as a bullet
    # (previously only the 2nd..nth items received the " - " marker).
    return "Models:\n\n - " + "\n - ".join(models)
| # --- UTILITIES --- | |
def get_sdk_version(sdk_choice: str) -> str:
    """Return the installed version of the SDK package for ``sdk_choice``.

    Args:
        sdk_choice: ``"gradio"`` selects the gradio package; any other value
            is treated as streamlit.

    Returns:
        The version string reported by ``importlib.metadata``, or
        ``"UNKNOWN"`` when the package is not installed.
    """
    package = "streamlit" if sdk_choice != "gradio" else "gradio"
    try:
        installed = importlib.metadata.version(package)
    except importlib.metadata.PackageNotFoundError:
        return "UNKNOWN"
    return installed
def classify_errors(logs: str) -> str:
    """Summarise which broad error categories appear in ``logs``.

    Matching is case-insensitive. The specific categories (``"syntax"`` and
    ``"import"``) are detected independently, so a log containing both is
    reported as ``"syntax, import"``. The catch-all ``"runtime/generic"``
    is reported only when no specific category matched but the log still
    mentions a traceback, exception or error.

    Args:
        logs: Raw build/run log text.

    Returns:
        A comma-separated list of categories in a fixed order, or ``"none"``
        when nothing error-like is found.
    """
    lowered = logs.lower()
    # A list (not a set) keeps the output order deterministic.
    categories = []
    if "syntaxerror" in lowered:
        categories.append("syntax")
    if "importerror" in lowered or "modulenotfounderror" in lowered:
        categories.append("import")
    # "syntaxerror"/"importerror" themselves contain "error", so only fall
    # back to the generic bucket when nothing more specific was recognised.
    if not categories and any(
        marker in lowered for marker in ("traceback", "exception", "error")
    ):
        categories.append("runtime/generic")
    return ", ".join(categories) or "none"
| # --- HF SPACE LOGGING --- | |
def _get_space_jwt(repo_id: str, token: str, timeout: float = 10) -> str:
    """Fetch the JWT used for Space log access with the user's HF token.

    Args:
        repo_id: Space identifier interpolated into the
            ``/api/spaces/{repo_id}/jwt`` endpoint path.
        token: Hugging Face token used to build the request headers.
        timeout: Seconds to wait for the endpoint before giving up. Without
            a timeout a stalled connection would block the caller forever.

    Returns:
        The value of the ``"token"`` field of the JSON response.

    Raises:
        Whatever ``hf_raise_for_status`` raises for a bad HTTP response
        (e.g. 404 if the repo doesn't exist), plus any exception raised by
        the underlying session for connection problems or timeouts.
    """
    url = f"{constants.ENDPOINT}/api/spaces/{repo_id}/jwt"
    headers = build_hf_headers(token=token)
    r = get_session().get(url, headers=headers, timeout=timeout)
    hf_raise_for_status(r)
    return r.json()["token"]
def fetch_logs(repo_id: str, level: str, token: str) -> str:
    """Fetch build or run logs from an HF Space.

    The logs endpoint is read as a stream of ``data: <json>`` lines. Each
    event is rendered as ``"[<timestamp>] <data>"``.

    Args:
        repo_id: Space identifier used in the logs URL.
        level: Log kind placed in the URL path (the visible callers pass
            ``"build"`` or ``"run"``).
        token: Hugging Face token; when falsy no request is made.

    Returns:
        The formatted log lines joined by newlines. If the stream breaks
        after some lines were already received, those lines are returned.
        On failure before any line was read, a human-readable error string
        is returned instead of raising.
    """
    if not token:
        return "Login required to fetch logs."
    try:
        jwt = _get_space_jwt(repo_id, token)
        url = f"https://api.hf.space/v1/{repo_id}/logs/{level}"
        headers = build_hf_headers(token=jwt)
        lines = []
        # The timeout applies to connecting and to each read on the stream.
        with get_session().get(url, headers=headers, stream=True, timeout=10) as resp:
            hf_raise_for_status(resp)
            try:
                for raw in resp.iter_lines(decode_unicode=True, chunk_size=512):
                    if not raw:  # keep-alive / blank separator line
                        continue
                    if isinstance(raw, bytes):
                        # iter_lines yields bytes when the response declares
                        # no encoding, even with decode_unicode=True.
                        raw = raw.decode("utf-8", errors="replace")
                    if not raw.startswith("data: "):
                        continue
                    try:
                        ev = json.loads(raw[len("data: "):])
                        ts, txt = ev.get("timestamp", "N/A"), ev.get("data", "")
                        lines.append(f"[{ts}] {txt}")
                    except json.JSONDecodeError:
                        lines.append(f"Error decoding log line: {raw}")
                    except Exception as e:
                        lines.append(f"Unexpected error processing log line: {raw} - {e}")
            except requests.exceptions.RequestException as stream_err:
                # A log stream of a live Space may simply go idle; a read
                # timeout or dropped connection must not throw away the
                # lines already collected.
                if not lines:
                    raise
                print(f"Log stream for {level} ended early: {stream_err}")
        return "\n".join(lines)
    except requests.exceptions.Timeout:
        return f"Error: Timeout fetching {level} logs."
    except requests.exceptions.RequestException as e:
        return f"Error fetching {level} logs: {e}"
    except Exception as e:
        return f"An unexpected error occurred while fetching logs: {e}"
def check_iframe(url: str, timeout: int = 10) -> bool:
    """Report whether ``url`` answers a GET request with HTTP 200.

    A GET (rather than HEAD) is used deliberately so the check reflects
    whether content is actually being served.

    Args:
        url: Address of the embedded app to probe.
        timeout: Seconds passed to ``requests.get`` as the request timeout.

    Returns:
        ``True`` only for a 200 response; ``False`` for any other status or
        for any request exception (timeout, connection error, etc.).
    """
    try:
        status = requests.get(url, timeout=timeout).status_code
    except requests.exceptions.RequestException:
        # Unreachable for any reason counts as "not accessible".
        return False
    return status == 200
| # --- AGENT PROMPTS --- | |
# System prompt for the Orchestrator agent: it must answer with exactly one
# task name from the list embedded in the prompt.
# NOTE(review): the text contains a literal "{task_description}" placeholder;
# passing this template through str.format() without that key would raise
# KeyError. No use of this constant is visible in this part of the file.
SYSTEM_ORCHESTRATOR = {
    "role": "system",
    "content": (
        "You are **Orchestrator Agent**, the project manager. "
        "Your role is to guide the development process from user request to a deployed HF Space application. "
        "You will analyze the current project state (requirements, plan, files, logs, feedback, status, attempt_count) "
        "and decide the *single* next step/task for the team. "
        "Output *only* the name of the next task from the following list: "
        "'PLANNING', 'CODING - {task_description}', 'PUSHING', 'LOGGING', 'DEBUGGING', 'COMPLETE', 'FAILED'. "
        "If moving to 'CODING', briefly describe the specific area to focus on (e.g., 'CODING - Initial UI', 'CODING - Adding Data Loading', 'CODING - Fixing Import Errors'). "
        # Each fragment ends with a space so adjacent sentences do not fuse
        # together when the literals are concatenated.
        "Analyze the debug feedback and logs carefully to decide the appropriate coding task description. "
        "If the debug feedback indicates 'All clear', transition to 'COMPLETE'. "
        "If maximum attempts are reached or a critical error occurs, transition to 'FAILED'."
    )
}
# System prompt for the Architect (planning) agent. The text is a
# str.format() template with two placeholders: {sdk_choice} and
# {main_app_file}.
SYSTEM_ARCHITECT = dict(
    role="system",
    content="".join([
        "You are **Architect Agent**, the lead planner. ",
        "Given the user requirements and the current project state, ",
        "your task is to devise or refine the high-level plan for the application. ",
        "Outline the main features, suggest a logical structure, ",
        "identify potential files (e.g., `app.py`, `utils.py`, `requirements.txt`), ",
        "and key components needed. ",
        "The target SDK is {sdk_choice}. ",
        "The main application file should be `{main_app_file}`. ",
        "Output the plan clearly, using bullet points or a simple numbered list. ",
        "Do NOT write code. Focus only on the plan.",
    ]),
)
# System prompt for the Code-Gen agent. The text is a str.format() template
# whose only placeholder is {current_task}; it also spells out the
# "`filename`" + fenced-code-block output format that the response must use.
SYSTEM_CODEGEN = {
    "role": "system",
    "content": (
        "You are **Code‑Gen Agent**, a proactive AI developer. "
        "Your sole responsibility is to author and correct code files based on the plan and the assigned task. "
        "You will receive the full project state, including the requirements, plan, existing files, and debug feedback. "
        "Based on the current task assigned by the Orchestrator ('{current_task}'), write or modify the necessary code *only* in the specified file(s). "
        # The example ends with a newline so the closing fence stands on its
        # own line instead of running into the next sentence.
        "Output the *full content* of the updated file(s) in markdown code blocks, clearly indicating the filename(s) immediately before the code block like this: `filename`\n```<language>\ncode goes here\n```\n"
        # Trailing spaces keep the concatenated sentences separated.
        "If the task involves creating a new file, include it in the output. If modifying an existing file, provide the *complete* modified code for that file. "
        "Ensure the code adheres to the plan and addresses the debug feedback if provided. "
        "Only output the code blocks and their preceding filenames. Do not add extra commentary outside the code blocks."
    )
}
# System prompt for the Debug agent. It contains no format placeholders.
# The sentence 'All clear. Project appears complete.' is the success marker
# the agent is told to emit as the first line of its feedback, so that text
# must be kept verbatim.
SYSTEM_DEBUG = {
    "role": "system",
    "content": (
        "You are **Debug Agent**, a meticulous code reviewer and tester. "
        "You have access to the full project state: requirements, plan, code files, build logs, and run logs. "
        "Your task is to analyze the logs and code in the context of the plan and requirements. "
        "Identify errors, potential issues, missing features based on the plan, and suggest concrete improvements or fixes for the Code-Gen agent. "
        "Pay close attention to the build and run logs for specific errors (SyntaxError, ImportError, runtime errors). "
        # Trailing spaces keep the concatenated sentences separated.
        "Also check if the implemented features align with the plan. "
        "If the application appears to be working based on the logs and iframe check, and seems to meet the plan's core requirements, state 'All clear. Project appears complete.' as the *first line* of your feedback. "
        "Otherwise, provide actionable feedback, referencing file names and line numbers where possible. Format feedback clearly. "
        "Example feedback:\n'Error in `app.py`: ModuleNotFoundError for 'missing_library'. Add 'missing_library' to `requirements.txt`.'\n'Issue: The plan required a download button, but it's missing in `app.py`.'\n'Suggestion: Check the loop in `utils.py`, it might cause an infinite loop based on run logs.' "
        "Do NOT write or suggest large code blocks directly in your feedback. Focus on *what* needs fixing/adding and *why*."
    )
}
| # --- AGENT RUNNER HELPER --- | |
def run_agent(client, model_name, system_prompt_template, user_input_state, config):
    """Run a single agent interaction using the project state as input.

    The system prompt template is filled from ``user_input_state`` and sent,
    together with a JSON dump of that same state, as ONE user-role message
    (the original author notes that the targeted models expect input roles
    to be 'user').

    Args:
        client: GenAI client exposing ``models.generate_content``.
        model_name: Name of the model to call.
        system_prompt_template: Dict whose ``"content"`` entry is a
            ``str.format`` template.
        user_input_state: JSON-serialisable dict used both to fill the
            template and as the message payload.
        config: Generation config forwarded unchanged to the client.

    Returns:
        The stripped response text on success. Every failure is reported as
        a string starting with ``"ERROR:"`` rather than raised, because the
        callers branch on that prefix.
    """
    try:
        system_prompt = system_prompt_template["content"].format(**user_input_state)
    except KeyError as e:
        print(f"Error formatting system prompt: Missing key {e}. Prompt template: {system_prompt_template['content']}")
        return f"ERROR: Internal agent error - Missing key {e} for prompt formatting."

    user_message_content = "Project State:\n" + json.dumps(user_input_state, indent=2)

    try:
        messages = [
            Content(role="user", parts=[Part(text=system_prompt + "\n\n" + user_message_content)])
        ]
        response = client.models.generate_content(
            model=model_name,
            contents=messages,
            config=config
        )

        if not response.candidates or not response.candidates[0].content:
            print("Agent returned no candidates or empty content.")
            prompt_feedback = response.prompt_feedback
            if prompt_feedback and prompt_feedback.block_reason:
                block_reason = prompt_feedback.block_reason
                print(f"Prompt was blocked. Reason: {block_reason}")
                # Tolerate a block reason that is not an enum member.
                reason_name = getattr(block_reason, "name", block_reason)
                return f"ERROR: Agent response blocked by safety filters. Reason: {reason_name}"
            return "ERROR: Agent returned no response content."

        # Some responses carry several parts; a part may have no text (and
        # the parts list itself may be missing), so skip those instead of
        # letting str.join fail on None.
        parts = response.candidates[0].content.parts or []
        response_text = "".join(part.text for part in parts if part.text)

        print(f"--- Agent Response --- ({model_name})")
        # The response body is intentionally not printed: it can be very long.
        print("----------------------")

        # Callers own the project_state history; only the text is returned.
        return response_text.strip()
    except Exception as e:
        # Boundary handler: any SDK/API failure becomes an "ERROR:" string.
        print(f"Agent call failed: {e}")
        error_details = str(e)
        http_response = getattr(e, "response", None)
        if http_response is not None:
            # Prefer a structured error body, then raw text, then str(e).
            try:
                error_details = json.dumps(http_response.json(), indent=2)
            except Exception:
                try:
                    error_details = http_response.text
                except Exception:
                    pass  # No usable body; keep str(e).
        return f"ERROR: Agent failed - {error_details}"
| # --- AGENT FUNCTIONS (called by Orchestrator) --- | |
| # These functions now expect only the response text from run_agent | |
def run_planner(client, project_state, config):
    """Ask the Architect agent for a plan and store it in ``project_state``.

    Args:
        client: GenAI client forwarded to ``run_agent``.
        project_state: Mutable project dict. Reads ``requirements``,
            ``sdk_choice``, ``main_app_file`` and ``files``; writes
            ``status_message`` and, on success, ``plan`` plus a new
            ``chat_history`` entry.
        config: Generation config forwarded to ``run_agent``.

    Returns:
        ``True`` when a plan was stored, ``False`` when the agent reply
        starts with ``"ERROR:"``.
    """
    print("Orchestrator: Running Planner Agent...")

    # The planner only needs the requirements, basic project info and any
    # files that already exist.
    planner_keys = ("requirements", "sdk_choice", "main_app_file", "files")
    input_state_for_planner = {key: project_state[key] for key in planner_keys}

    plan_text = run_agent(
        client=client,
        model_name="gemini-2.5-flash-preview-04-17",
        system_prompt_template=SYSTEM_ARCHITECT,
        user_input_state=input_state_for_planner,
        config=config,
    )

    if plan_text.startswith("ERROR:"):
        project_state['status_message'] = plan_text
        return False

    project_state['plan'] = plan_text
    print("Orchestrator: Planner Output Received.")
    project_state['status_message'] = "Planning complete."
    # Show the plan to the user in the chat transcript.
    plan_entry = {"role": "assistant", "content": f"**Plan:**\n{plan_text}"}
    project_state['chat_history'].append(plan_entry)
    return True
def run_codegen(client, project_state, config):
    """Run the Code-Gen agent and merge the files it returns into the state.

    The agent reply is expected to contain one or more blocks of the form
    "`filename`" followed by a fenced code block. Python files get a quick
    ``compile()`` syntax check before anything is accepted.

    Args:
        client: GenAI client forwarded to ``run_agent``.
        project_state: Mutable project dict. Reads ``current_task``,
            ``requirements``, ``plan``, ``files``, ``feedback``,
            ``sdk_choice`` and ``main_app_file``; updates ``files``,
            ``feedback``, ``status_message`` and ``chat_history``.
        config: Generation config forwarded to ``run_agent``.

    Returns:
        ``True`` when at least one file was parsed, every block passed
        validation and ``project_state['files']`` was updated; ``False``
        otherwise (agent error, unparseable output, rejected filename or
        syntax error). On ``False`` no file in the state is modified.
    """
    print(f"Orchestrator: Running Code-Gen Agent for task: {project_state['current_task']}...")
    # Code-Gen needs requirements, plan, existing files, and debug feedback.
    input_state_for_codegen = {
        "current_task": project_state['current_task'],
        "requirements": project_state['requirements'],
        "plan": project_state['plan'],
        "files": project_state['files'],
        "feedback": project_state['feedback'] or 'None',
        "sdk_choice": project_state['sdk_choice'],
        "main_app_file": project_state['main_app_file']
    }
    response_text = run_agent(
        client=client,
        model_name="gemini-2.5-flash-preview-04-17",
        system_prompt_template=SYSTEM_CODEGEN,
        user_input_state=input_state_for_codegen,
        config=config,
    )
    if response_text.startswith("ERROR:"):
        # Only the status is recorded here; the chat history is left alone.
        project_state['status_message'] = response_text
        return False

    # Feedback may be empty or None (see the `or 'None'` above); normalise it
    # once so the string concatenations below can never raise TypeError.
    prior_feedback = project_state['feedback'] or ''

    # Find blocks like `filename`\n```[language]\ncode...\n```
    blocks = re.findall(r"(`[^`]+`)\s*```(?:\w*\n)?([\s\S]*?)```", response_text)
    if not blocks:
        print("Code-Gen Agent did not output any code blocks in expected format.")
        parse_error_msg = "ERROR: Code-Gen Agent failed to output code blocks in `filename`\\n```code``` format."
        project_state['status_message'] = parse_error_msg
        # Keep a truncated copy of the raw reply in the feedback for debugging.
        project_state['feedback'] = prior_feedback + "\n\n" + parse_error_msg + "\nRaw Agent Response (no code blocks detected):\n" + response_text[:1000] + "..."
        project_state['chat_history'].append({"role": "assistant", "content": parse_error_msg + "\nSee Debug Feedback for raw response."})
        return False

    files_updated = {}
    syntax_errors = []
    for filename_match, code_content in blocks:
        filename = filename_match.strip('`').strip()
        if not filename:
            syntax_errors.append("Code block found with empty filename.")
            continue
        # Filenames come from model output and are later written to local
        # disk, so refuse anything that could escape the working directory.
        path_parts = filename.replace("\\", "/").split("/")
        if os.path.isabs(filename) or filename.startswith(("/", "\\")) or ".." in path_parts:
            syntax_errors.append(f"Unsafe filename rejected: {filename}")
            print(f"Unsafe filename rejected: {filename}")
            continue
        files_updated[filename] = code_content.strip()
        # Quick syntax check for Python files.
        if filename.endswith('.py'):
            try:
                compile(code_content, filename, "exec")
            except SyntaxError as e:
                syntax_errors.append(f"Syntax Error in {filename}: {e}")
                print(f"Syntax Error in {filename}: {e}")
            except Exception as e:
                syntax_errors.append(f"Unexpected error during syntax check for {filename}: {e}")
                print(f"Unexpected error during syntax check for {filename}: {e}")

    if not files_updated:
        print("Code-Gen Agent outputted blocks but couldn't parse any valid filenames.")
        parse_error_msg = "ERROR: Code-Gen Agent outputted blocks but couldn't parse any valid filenames."
        project_state['status_message'] = parse_error_msg
        project_state['feedback'] = prior_feedback + "\n\n" + parse_error_msg
        project_state['chat_history'].append({"role": "assistant", "content": parse_error_msg})
        return False

    if syntax_errors:
        # Prepend the errors to the feedback and fail the step without
        # touching project_state['files'].
        syntax_error_msg = "ERROR: Code-Gen Agent introduced syntax errors."
        error_details = "\n".join(syntax_errors)
        project_state['feedback'] = syntax_error_msg + "\n" + error_details + "\n\n" + prior_feedback
        project_state['status_message'] = syntax_error_msg + " Debugging needed."
        project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
        project_state['chat_history'].append({"role": "assistant", "content": "Details:\n" + error_details})
        return False

    project_state['files'].update(files_updated)
    print(f"Orchestrator: Code-Gen Agent updated files: {list(files_updated.keys())}")
    # Show a 500-character snippet of every file in the chat transcript.
    code_summary = "\n".join([f"`{fn}`:\n```python\n{code[:500]}{'...' if len(code) > 500 else ''}\n```" for fn, code in files_updated.items()])
    project_state['chat_history'].append({"role": "assistant", "content": f"**Code Generated/Updated:**\n\n{code_summary}"})
    project_state['status_message'] = f"Code generated/updated: {list(files_updated.keys())}"
    return True
def run_debugger(client, project_state, config):
    """Run the Debug agent and store its feedback in ``project_state``.

    Args:
        client: GenAI client forwarded to ``run_agent``.
        project_state: Mutable project dict. Reads ``requirements``,
            ``plan``, ``files``, ``logs`` (``build``/``run`` entries) and
            ``iframe_ok``; writes ``feedback``, ``status_message`` and
            appends to ``chat_history``.
        config: Generation config forwarded to ``run_agent``.

    Returns:
        ``True`` when feedback was stored, ``False`` when the agent reply
        starts with ``"ERROR:"`` (the error is then appended to the
        existing feedback and to the chat history).
    """
    print("Orchestrator: Running Debug Agent...")
    # Debugger needs requirements, plan, files, logs, and iframe status.
    input_state_for_debugger = {
        "requirements": project_state['requirements'],
        "plan": project_state['plan'],
        "files": project_state['files'],
        "build_logs": project_state['logs'].get('build', 'No build logs.'),
        "run_logs": project_state['logs'].get('run', 'No run logs.'),
        "iframe_status": 'Responding OK' if project_state.get('iframe_ok', False) else 'Not responding or check failed.',
        "error_types_found": classify_errors(project_state['logs'].get('build', '') + '\n' + project_state['logs'].get('run', ''))
    }
    response_text = run_agent(
        client=client,
        model_name="gemini-2.5-flash-preview-04-17",
        system_prompt_template=SYSTEM_DEBUG,
        user_input_state=input_state_for_debugger,
        config=config,
    )
    if response_text.startswith("ERROR:"):
        project_state['status_message'] = response_text
        # `.get('feedback', '')` still yields None when the key exists with a
        # None value; `or ''` keeps this error path from raising TypeError
        # and masking the real agent failure.
        prior_feedback = project_state.get('feedback') or ''
        project_state['feedback'] = prior_feedback + "\n\n" + response_text
        project_state['chat_history'].append({"role": "assistant", "content": response_text})
        return False

    project_state['feedback'] = response_text
    print("Orchestrator: Debug Agent Feedback Received.")
    project_state['status_message'] = "Debug feedback generated."
    # Show the feedback to the user in the chat transcript.
    project_state['chat_history'].append({"role": "assistant", "content": f"**Debug Feedback:**\n{project_state['feedback']}"})
    return True
| # --- MAIN ORCHESTRATION LOGIC --- | |
| def orchestrate_development(client, project_state, config, oauth_token_token): | |
| """Manages the overall development workflow.""" | |
| # Initial step transition | |
| if project_state['current_task'] == 'START': | |
| project_state['current_task'] = 'PLANNING' | |
| project_state['status_message'] = "Starting project: Initializing and moving to Planning." | |
| # Add initial message to chat history | |
| project_state['chat_history'].append({"role": "assistant", "content": "Project initialized. Starting development team."}) | |
| while project_state['status'] == 'In Progress' and project_state['attempt_count'] < 7: | |
| print(f"\n--- Attempt {project_state['attempt_count'] + 1} ---") | |
| print(f"Current Task: {project_state['current_task']}") | |
| current_task = project_state['current_task'] | |
| # Add current task to history for UI visibility | |
| task_message = f"➡️ Task: {current_task}" | |
| if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != task_message.strip(): | |
| project_state['chat_history'].append({"role": "assistant", "content": task_message}) | |
| step_successful = True # Flag to track if the current step completed without error | |
| if current_task == 'PLANNING': | |
| step_successful = run_planner(client, project_state, config) | |
| if step_successful: | |
| project_state['current_task'] = 'CODING - Initial Implementation' # Move to coding after planning | |
| # Add plan to chat history if it wasn't added by run_planner (depends on its implementation) | |
| if project_state['plan'] and not any("**Plan:**" in msg['content'] for msg in project_state['chat_history']): | |
| project_state['chat_history'].append({"role": "assistant", "content": f"**Plan:**\n{project_state['plan']}"}) | |
| else: | |
| project_state['current_task'] = 'FAILED' # Planning failed, end process | |
| elif current_task.startswith('CODING'): | |
| # Ensure minimum files exist before asking CodeGen to code | |
| # This happens once at the start of the first coding task or if syntax errors occurred | |
| # Simplify the stubbing logic - just ensure these files exist in the state before CodeGen runs | |
| if project_state['main_app_file'] not in project_state['files']: | |
| print(f"Adding initial stub for {project_state['main_app_file']}...") | |
| project_state['files'][project_state['main_app_file']] = f"# Initial {project_state['sdk_choice']} app file\n" # Start with a basic stub | |
| if project_state['sdk_choice'] == 'gradio': | |
| project_state['files'][project_state['main_app_file']] += "import gradio as gr\n\n# Define a simple interface\n# For example: gr.Interface(...).launch()\n" | |
| elif project_state['sdk_choice'] == 'streamlit': | |
| project_state['files'][project_state['main_app_file']] += "import streamlit as st\n\n# Your Streamlit app starts here\n# For example: st.write('Hello, world!')\n" | |
| if 'requirements.txt' not in project_state['files']: | |
| print("Adding initial requirements.txt stub...") | |
| req_content = "pandas\n" + ("streamlit\n" if project_state['sdk_choice']=="streamlit" else "gradio\n") + "google-generativeai\nhuggingface-hub\n" | |
| project_state['files']['requirements.txt'] = req_content | |
| if 'README.md' not in project_state['files']: | |
| print("Adding initial README.md stub...") | |
| readme_content = f"""--- | |
| title: {project_state['repo_id']} | |
| emoji: 🐢 | |
| sdk: {project_state['sdk_choice']} | |
| sdk_version: {project_state['sdk_version']} | |
| app_file: {project_state['main_app_file']} | |
| pinned: false | |
| --- | |
| # {project_state['repo_id']} | |
| This is an auto-generated HF Space. | |
| **Requirements:** {project_state['requirements']} | |
| **Plan:** | |
| {project_state['plan']} | |
| """ | |
| project_state['files']['README.md'] = readme_content | |
| step_successful = run_codegen(client, project_state, config) | |
| if step_successful: | |
| project_state['current_task'] = 'PUSHING' # Always push after attempting to code | |
| else: | |
| # Code-gen failed (syntax error, parsing issue, etc.) | |
| # The failure is handled within run_codegen by setting status_message and feedback | |
| # We'll try debugging/coding again in the next attempt loop iteration if attempts allow | |
| print("Code-Gen step failed. Moving to Debugging.") | |
| # attempt_count is incremented AFTER debugging phase analyses results | |
| project_state['current_task'] = 'DEBUGGING' # Go to debugging to analyze the failure | |
| elif current_task == 'PUSHING': | |
| try: | |
| # Create/update repo first | |
| create_repo(repo_id=project_state['repo_id'], token=oauth_token_token, | |
| exist_ok=True, repo_type="space", space_sdk=project_state['sdk_choice']) | |
| # *** FIX: Filter out any empty string keys before iterating *** | |
| files_to_push = { | |
| fn: content | |
| for fn, content in project_state['files'].items() | |
| if fn and fn.strip() # Keep only non-empty, non-whitespace filenames | |
| } | |
| print(f"Attempting to push {len(files_to_push)} valid files...") | |
| # Write and upload all valid files | |
| for fn, content in files_to_push.items(): | |
| # Ensure directories exist for files like utils/data.py | |
| dirpath = os.path.dirname(fn) | |
| if dirpath: # Only create dir if filename has a path component | |
| os.makedirs(dirpath, exist_ok=True) | |
| with open(fn, "w") as f: | |
| f.write(content) | |
| upload_file( | |
| path_or_fileobj=fn, path_in_repo=fn, | |
| repo_id=project_state['repo_id'], token=oauth_token_token, | |
| repo_type="space" | |
| ) | |
| print(f"Pushed {len(files_to_push)} files to {project_state['repo_id']}") | |
| project_state['status_message'] = f"Pushed code to HF Space **{project_state['repo_id']}**. Waiting for build..." | |
| project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']}) | |
| project_state['current_task'] = 'LOGGING' # Move to fetching logs | |
| except Exception as e: | |
| step_successful = False | |
| project_state['status'] = 'Failed' # Pushing is critical, fail if it fails | |
| project_state['status_message'] = f"ERROR: Failed to push to HF Space: {e}" | |
| project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']}) | |
| print(project_state['status_message']) | |
| project_state['current_task'] = 'FINISHED' # End process | |
| elif current_task == 'LOGGING': | |
| # Wait a moment for build to start | |
| time.sleep(5) # Initial wait | |
| wait_time = 5 | |
| max_log_wait = 150 # Increased max wait time for logs | |
| elapsed_log_wait = 0 | |
| logs_fetched = False | |
| iframe_checked = False | |
| status_logging_message = "Fetching logs and checking iframe..." | |
| if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != status_logging_message.strip(): | |
| project_state['chat_history'].append({"role": "assistant", "content": status_logging_message}) | |
| project_state['status_message'] = status_logging_message | |
| while elapsed_log_wait < max_log_wait: | |
| try: | |
| build_logs = fetch_logs(project_state['repo_id'], "build", oauth_token_token) | |
| run_logs = fetch_logs(project_state['repo_id'], "run", oauth_token_token) | |
| project_state['logs']['build'] = build_logs | |
| project_state['logs']['run'] = run_logs | |
| logs_fetched = True | |
| # Only check iframe once logs indicate something might be running, or after a delay | |
| if elapsed_log_wait > 10 or len(run_logs) > 0 or len(build_logs) > 100: | |
| project_state['iframe_ok'] = check_iframe(project_state['iframe_url']) | |
| iframe_checked = True | |
| else: | |
| project_state['iframe_ok'] = False # Assume not ready yet | |
| print(f"Log/Iframe check at {elapsed_log_wait}s. Build logs len: {len(build_logs)}, Run logs len: {len(run_logs)}, Iframe OK: {project_state['iframe_ok']}") | |
| # Conditions to proceed to debugging: | |
| # 1. Iframe is OK (app is running and accessible) - strongest signal | |
| # 2. Build logs show errors (need debugging ASAP) | |
| # 3. Max wait time is almost reached (proceed with whatever logs we have) | |
| # 4. Build logs exist and indicate *some* progress (e.g., contain "Building" or sufficient length) | |
| # 5. Run logs exist (app is at least trying to run) | |
| if project_state['iframe_ok'] or \ | |
| "ERROR" in build_logs.upper() or "FATAL" in build_logs.upper() or \ | |
| elapsed_log_wait >= max_log_wait - wait_time or \ | |
| ("Building" in build_logs or len(build_logs) > 100) and logs_fetched or \ | |
| len(run_logs) > 0: | |
| break # Exit the log fetching wait loop | |
| else: | |
| print(f"Logs or iframe not ready. Waiting {wait_time}s...") | |
| time.sleep(wait_time) | |
| elapsed_log_wait += wait_time | |
| wait_time = min(wait_time * 1.5, 20) # Increase wait time, cap at 20s | |
| except Exception as e: | |
| print(f"Error during log fetching or iframe check: {e}. Will retry.") | |
| time.sleep(wait_time) | |
| elapsed_log_wait += wait_time | |
| wait_time = min(wait_time * 1.5, 20) | |
| # Update status message after the wait loop | |
| if logs_fetched or iframe_checked: # Proceed if we got logs OR checked the iframe | |
| project_state['status_message'] = "Logs fetched and iframe checked (or timeout reached)." | |
| else: | |
| project_state['status_message'] = "Warning: Could not fetch logs or check iframe status within timeout." | |
| step_successful = False # Indicate that this step didn't fully succeed | |
| project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']}) | |
| project_state['current_task'] = 'DEBUGGING' # Always move to debugging after attempting to log/check | |
| elif current_task == 'DEBUGGING': | |
| step_successful = run_debugger(client, project_state, config) | |
| # Debug feedback is added to chat history inside run_debugger now | |
| if step_successful: | |
| # Analyze feedback to decide next step | |
| feedback = project_state['feedback'] | |
| iframe_ok = project_state.get('iframe_ok', False) | |
| error_types = classify_errors(project_state['logs'].get('build', '') + '\n' + project_state['logs'].get('run', '')) | |
| print(f"Debug Analysis - Feedback: {feedback[:100]}... | Iframe OK: {iframe_ok} | Errors: {error_types}") | |
| # Decision Logic: | |
| # 1. Success? Debugger says clear AND iframe works AND no/minor errors in logs AND run logs have some content | |
| is_complete = ("All clear. Project appears complete." in feedback) or \ | |
| (iframe_ok and error_types == "none" and "ERROR" not in feedback.upper() and len(project_state['logs'].get('run', '')) > 10) | |
| if is_complete: | |
| project_state['status'] = 'Complete' | |
| project_state['current_task'] = 'FINISHED' | |
| project_state['status_message'] = "Debug Agent reports clear. Project appears complete." | |
| elif project_state['attempt_count'] >= 6: # Max attempts reached AFTER debugging analysis | |
| project_state['status'] = 'Failed' | |
| project_state['current_task'] = 'FINISHED' | |
| project_state['status_message'] = f"Max attempts ({project_state['attempt_count']+1}/7) reached after debugging. Project failed." | |
| else: | |
| # Errors or issues found, need more coding/debugging | |
| project_state['current_task'] = 'CODING - Addressing Feedback' | |
| project_state['status_message'] = "Debug Agent found issues. Returning to Coding phase to address feedback." | |
| project_state['attempt_count'] += 1 # Increment attempt count AFTER a debug cycle points back to coding | |
| backoff_wait = min(project_state['attempt_count'] * 5, 30) # Backoff before next coding attempt | |
| print(f"Waiting {backoff_wait} seconds before next coding attempt...") | |
| time.sleep(backoff_wait) | |
| else: | |
| # Debugger failed (e.g. API error) | |
| project_state['status'] = 'Failed' | |
| project_state['current_task'] = 'FINISHED' | |
| # status_message and feedback already set by run_debugger | |
| elif current_task == 'FINISHED': | |
| # Exit the main loop | |
| pass # Loop condition handles exit | |
| else: | |
| # Unknown task | |
| step_successful = False | |
| project_state['status'] = 'Failed' | |
| project_state['status_message'] = f"ERROR: Orchestrator entered an unknown task state: {current_task}" | |
| project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']}) | |
| print(project_state['status_message']) | |
| project_state['current_task'] = 'FINISHED' # End process | |
| # If a step failed and didn't explicitly set status to FAILED (like PUSHING), | |
| # the orchestrator logic above should handle transition to FAILED or DEBUGGING. | |
| # This check acts as a safeguard. | |
| if not step_successful and project_state['status'] == 'In Progress': | |
| print(f"Orchestration step '{current_task}' failed, but status is still 'In Progress'. Forcing Failure.") | |
| project_state['status'] = 'Failed' | |
| project_state['status_message'] = project_state.get('status_message', f'An unexpected error caused task failure: {current_task}') | |
| project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']}) | |
| project_state['current_task'] = 'FINISHED' | |
| # --- End of Orchestration Loop --- | |
| # Final status message if loop exited without explicit FINISHED state | |
| if project_state['status'] == 'In Progress': | |
| project_state['status'] = 'Failed' | |
| project_state['status_message'] = project_state.get('status_message', 'Orchestration loop exited unexpectedly.') | |
| # Add final outcome message to history if not already the last message | |
| final_outcome_message = f"**Project Outcome:** {project_state['status']} - {project_state['status_message']}" | |
| if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != final_outcome_message.strip(): | |
| project_state['chat_history'].append({"role": "assistant", "content": final_outcome_message}) | |
| if project_state['status'] == 'Complete': | |
| completion_message = "✅ Application deployed successfully (likely)! Check the preview above." | |
| if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != completion_message.strip(): | |
| project_state['chat_history'].append({"role": "assistant", "content": completion_message}) | |
| elif project_state['status'] == 'Failed': | |
| failure_message = "❌ Project failed to complete. Review logs and feedback for details." | |
| if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != failure_message.strip(): | |
| project_state['chat_history'].append({"role": "assistant", "content": failure_message}) | |
| # Return final state for UI update | |
| return ( | |
| project_state['chat_history'], | |
| project_state['logs'].get('build', 'No build logs.'), | |
| project_state['logs'].get('run', 'No run logs.'), | |
| (f'<iframe src="{project_state["iframe_url"]}" width="100%" height="500px"></iframe>' | |
| + ("" if project_state.get('iframe_ok') else "<p style='color:red;'>⚠️ iframe not responding or check failed.</p>")), | |
| project_state['status_message'] # Return the final status message | |
| ) | |
| # --- MAIN HANDLER (Called by Gradio) --- | |
| # Updated signature to include user_input | |
def _reply_with_error(history, error_msg, preview_html, status_text):
    """Build the five-value UI response used for a validation failure.

    ``error_msg`` is appended to ``history`` as an assistant message unless
    it is already the content of the last entry. The build-log and run-log
    outputs are returned as empty strings.
    """
    if not history or history[-1].get("content") != error_msg:
        history.append({"role": "assistant", "content": error_msg})
    return history, "", "", preview_html, status_text


def handle_user_message(
    history,  # Messages currently shown in the Gradio Chatbot (previous turns)
    user_input: str,  # The new text entered in the requirements textbox
    sdk_choice: str,
    gemini_api_key: str,
    grounding_enabled: bool,
    temperature: float,
    max_output_tokens: int,
    profile: gr.OAuthProfile | None,  # Gradio auto-injects
    oauth_token: gr.OAuthToken | None  # Gradio auto-injects
):
    """Validate the request, then run the development orchestration.

    Args:
        history: Chat messages as ``{"role": ..., "content": ...}`` dicts;
            may be ``None`` or empty. It is copied, never mutated.
        user_input: The application requirements typed by the user.
        sdk_choice: ``"gradio"`` selects ``app.py`` as the main file; any
            other value selects ``streamlit_app.py``.
        gemini_api_key: API key used to create the ``genai.Client``.
        grounding_enabled: When true, a Google Search tool is added to the
            generation config.
        temperature: Sampling temperature for the generation config.
        max_output_tokens: Output token limit; coerced to ``int``.
        profile: Logged-in user's profile, or ``None``.
        oauth_token: Logged-in user's OAuth token, or ``None``.

    Returns:
        A 5-tuple ``(chat_history, build_logs, run_logs, preview_html,
        status_message)``. On a validation failure the two log values are
        empty strings; otherwise the tuple is whatever
        ``orchestrate_development`` returns.
    """
    # Work on a copy: the caller's list stays untouched and a missing
    # (None) history is treated as an empty conversation.
    history = list(history) if history else []

    # Only record a real prompt; a blank one is rejected below and should
    # not leave an empty user bubble in the chat.
    has_prompt = bool(user_input and user_input.strip())
    if has_prompt:
        last = history[-1] if history else None
        is_duplicate = (
            last is not None
            and last.get("role") == "user"
            and last.get("content") == user_input
        )
        if not is_duplicate:
            history.append({"role": "user", "content": user_input})

    if not profile or not oauth_token or not oauth_token.token:
        return _reply_with_error(
            history,
            "⚠️ Please log in first via the Hugging Face button.",
            "<p>Please log in.</p>",
            "Login required.",
        )

    if not gemini_api_key:
        return _reply_with_error(
            history,
            "⚠️ Please provide your Gemini API Key.",
            "<p>Please provide API Key.</p>",
            "API Key required.",
        )

    if not has_prompt:
        return _reply_with_error(
            history,
            "Please enter requirements for the application.",
            "<p>Enter requirements.</p>",
            "Waiting for prompt.",
        )

    try:
        token_limit = int(max_output_tokens)
    except (TypeError, ValueError):
        # The value can arrive as None or non-numeric (e.g. a cleared
        # numeric field); fall back instead of crashing the handler.
        token_limit = 4096

    client = genai.Client(api_key=gemini_api_key)
    repo_id = f"{profile.username}/{profile.username}-auto-space"
    iframe_url = f"https://huggingface.co/spaces/{repo_id}"
    sdk_version = get_sdk_version(sdk_choice)
    # Standard main file name convention per SDK
    code_fn = "app.py" if sdk_choice == "gradio" else "streamlit_app.py"

    # State for this development session; the orchestrator updates it
    # as it moves through its task states.
    project_state = {
        'requirements': user_input,
        'plan': '',
        'files': {},  # {filename: code}
        'logs': {'build': '', 'run': ''},
        'feedback': '',
        'current_task': 'START',  # Entry state of the orchestration state machine
        'status': 'In Progress',
        'status_message': 'Initializing...',
        'attempt_count': 0,
        'sdk_choice': sdk_choice,
        'sdk_version': sdk_version,
        'repo_id': repo_id,
        'iframe_url': iframe_url,
        'main_app_file': code_fn,
        'chat_history': history[:],  # Build upon the conversation so far
    }

    cfg = GenerateContentConfig(
        tools=[Tool(google_search=GoogleSearch())] if grounding_enabled else [],
        response_modalities=["TEXT"],
        temperature=temperature,
        max_output_tokens=token_limit,
    )

    # The orchestrator already returns the five values the UI expects, so
    # its result is passed straight through (token passed as a string).
    return orchestrate_development(client, project_state, cfg, oauth_token.token)
| # --- SIMPLE UI WITH HIGHER MAX TOKENS & STATUS DISPLAY --- | |
def _refresh_logs(
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None,
):
    """Return ``(build_logs, run_logs)`` for the logged-in user's auto-space.

    Declared as a named function with OAuth type hints (like
    ``show_profile``) so the login state is supplied through the
    parameters. A LoginButton passed as an event input does not carry a
    ``(profile, token)`` tuple, so it cannot be indexed for them.
    """
    if not profile or not oauth_token or not oauth_token.token:
        login_required = "Login required to fetch logs."
        return login_required, login_required
    repo_id = f"{profile.username}/{profile.username}-auto-space"
    return (
        fetch_logs(repo_id, "build", oauth_token.token),
        fetch_logs(repo_id, "run", oauth_token.token),
    )


# NOTE(review): the original indentation was lost; the nesting below is
# reconstructed from the order of the widgets - confirm the intended layout.
with gr.Blocks(title="HF Space Auto‑Builder (Team AI)") as demo:
    gr.Markdown("## 🐢 HF Space Auto‑Builder (Team AI)\nUse AI agents to build and deploy a simple Gradio or Streamlit app on a Hugging Face Space.")
    gr.Markdown("1) Log in with Hugging Face. 2) Enter your Gemini API Key. 3) Provide app requirements. 4) Click 'Start Development Team' and watch the process.")

    with gr.Row():
        with gr.Column(scale=1):
            # --- Login button, profile status and model listing ---
            login_btn = gr.LoginButton(variant="huggingface", size="lg")
            status_md = gr.Markdown("*Not logged in.*")
            models_md = gr.Markdown()

            # On app load, show the login status and the user's models
            # (or a prompt to log in). inputs=None lets Gradio inject the
            # OAuth profile/token based on the function signatures.
            demo.load(show_profile, inputs=None, outputs=status_md, api_name="load_profile")
            demo.load(list_private_models, inputs=None, outputs=models_md, api_name="load_models")

            # Refresh the same two panels when the login button is clicked.
            login_btn.click(
                fn=show_profile,
                inputs=None,
                outputs=status_md,
                api_name="login_profile",
            )
            login_btn.click(
                fn=list_private_models,
                inputs=None,
                outputs=models_md,
                api_name="login_models",
            )

            gr.Markdown("---")

            # --- Generation settings ---
            sdk_choice = gr.Radio(["gradio","streamlit"], value="gradio", label="SDK", info="Choose the framework for your app.")
            api_key = gr.Textbox(label="Gemini API Key", type="password", info="Get one from Google AI Studio.")
            grounding = gr.Checkbox(label="Enable Google Search (Grounding)", value=False, info="Allow agents to use Google Search.")
            temp = gr.Slider(0,1,value=0.2, label="Temperature", info="Creativity of agents. Lower is more focused.")
            max_tokens = gr.Number(value=4096, label="Max Output Tokens", minimum=1000, info="Max length of agent responses (code, feedback, etc.). Recommend 4096+.")

        with gr.Column(scale=2):
            project_status_md = gr.Markdown("Waiting for prompt...")
            chatbot = gr.Chatbot(type="messages", label="Team Communication & Status", show_copy_button=True)
            user_in = gr.Textbox(placeholder="Describe the application you want to build...", label="Application Requirements", lines=3)
            send_btn = gr.Button("🚀 Start Development Team")

            # Separate accordions for logs and preview
            with gr.Accordion("Logs", open=False):
                build_box = gr.Textbox(label="Build logs", lines=10, interactive=False, max_lines=20)
                run_box = gr.Textbox(label="Run logs", lines=10, interactive=False, max_lines=20)
                refresh_btn = gr.Button("🔄 Refresh Logs Only")

            with gr.Accordion("App Preview", open=True):
                preview = gr.HTML("<p>App preview will load here when available.</p>")

    # Inputs follow handle_user_message's parameter order; profile and
    # oauth_token are not listed because they are auto-injected from the
    # function signature.
    run_inputs = [
        chatbot,     # history
        user_in,     # user_input
        sdk_choice,
        api_key,
        grounding,
        temp,
        max_tokens,
    ]
    run_outputs = [chatbot, build_box, run_box, preview, project_status_md]

    # The button and pressing Enter in the textbox start the same run.
    send_btn.click(fn=handle_user_message, inputs=run_inputs, outputs=run_outputs)
    user_in.submit(fn=handle_user_message, inputs=run_inputs, outputs=run_outputs)

    # Manual log refresh; login state arrives via _refresh_logs' type hints.
    refresh_btn.click(
        fn=_refresh_logs,
        inputs=None,
        outputs=[build_box, run_box],
    )

# Files written during a run are not cleaned up when the app stops; add
# targeted cleanup here if that becomes necessary.
demo.launch(server_name="0.0.0.0", server_port=7860)