from fastapi import FastAPI, HTTPException
from starlette.responses import JSONResponse
from models import TaskRequest  # Ensure models.py is available
from config import get_settings
import asyncio
import httpx  # Used for making the HTTP notification call
import json  # For parsing the structured JSON response from the LLM
import os  # For configuration and file system operations
import base64
import re
import git  # For local Git operations
import time
import shutil
import stat  # For robust cleanup on Windows

# --- Configuration and Setup ---
settings = get_settings()


# --- Helper Function for Security ---
def verify_secret(secret_from_request: str) -> bool:
    """Checks if the provided secret matches the expected student secret."""
    return secret_from_request == settings.STUDENT_SECRET


# --- GITHUB CONSTANTS ---
GITHUB_API_BASE = "https://api.github.com"
# Pages URL is constructed dynamically using the username from settings
GITHUB_PAGES_BASE = f"https://{settings.GITHUB_USERNAME}.github.io"
# --------------------------

# LLM Configuration
GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent"
# NOTE: API key is read from settings; the execution environment is assumed
# to handle the required authentication.
GEMINI_API_KEY = settings.GEMINI_API_KEY

# Initialize the FastAPI application
app = FastAPI(
    title="Automated Task Receiver & Processor",
    description="Endpoint for receiving task assignments and triggering AI code generation/deployment."
)

# Global storage for the last received task (for demonstration purposes)
received_task_data = {}


# --- REFACTORING: SPLIT deploy_to_github ---
async def setup_local_repo(local_path: str, repo_name: str, repo_url_auth: str,
                           repo_url_http: str, round_index: int) -> git.Repo:
    """
    Handles creating the remote repo (Round 1) or cloning the existing one
    (Round 2+) into an EMPTY directory.

    Args:
        local_path: Empty local directory prepared by the caller.
        repo_name: Name of the GitHub repository.
        repo_url_auth: HTTPS remote URL embedding the auth token (used for push/clone).
        repo_url_http: Plain HTTPS URL of the repository (used for logging only).
        round_index: 1 creates a fresh repo; >= 2 clones the existing one.

    Returns:
        An initialized/cloned git.Repo bound to local_path.

    Raises:
        Exception: If the GitHub API call or the git operation fails.
    """
    github_token = settings.GITHUB_TOKEN
    headers = {
        "Authorization": f"token {github_token}",
        "Accept": "application/vnd.github.v3+json",
        "X-GitHub-Api-Version": "2022-11-28"
    }

    async with httpx.AsyncClient(timeout=45) as client:
        try:
            # 1. CREATE or INITIALIZE REPO / CLONE EXISTING REPO
            if round_index == 1:
                print(f"  -> R1: Creating remote repository '{repo_name}'...")
                payload = {"name": repo_name, "private": False, "auto_init": True}
                response = await client.post(f"{GITHUB_API_BASE}/user/repos", json=payload, headers=headers)
                response.raise_for_status()

                # Initialize local git repo in the EMPTY path
                repo = git.Repo.init(local_path)
                repo.create_remote('origin', repo_url_auth)
                print("  -> R1: Local git repository initialized.")
            else:
                # Crucial part for Round 2+: clone the existing work into the
                # EMPTY local_path (guaranteed empty by the caller's cleanup).
                print(f"  -> R{round_index}: Cloning existing repository from {repo_url_http}...")
                repo = git.Repo.clone_from(repo_url_auth, local_path)
                print(f"  -> R{round_index}: Repository cloned and ready for update.")

            return repo

        except httpx.HTTPStatusError as e:
            print(f"--- [API ERROR] GitHub API call failed with status {e.response.status_code}: {e.response.text} ---")
            raise Exception("GitHub API call failed during repository setup.")
        except git.GitCommandError as e:
            print(f"--- [GIT ERROR] Failed to perform git operation: {e} ---")
            raise Exception("Git operation failed during repository setup.")


async def commit_and_publish(repo: git.Repo, task_id: str, round_index: int, repo_name: str) -> dict:
    """
    Handles adding, committing, pushing, and configuring GitHub Pages after
    the generated files have been saved into the repo's working tree.

    Args:
        repo: The git.Repo prepared by setup_local_repo.
        task_id: Identifier of the task (used in the commit message).
        round_index: Current round number (used in the commit message).
        repo_name: Name of the GitHub repository.

    Returns:
        dict with keys 'repo_url', 'commit_sha', 'pages_url'.

    Raises:
        Exception: If any git operation or GitHub API call fails.
    """
    github_username = settings.GITHUB_USERNAME
    github_token = settings.GITHUB_TOKEN
    headers = {
        "Authorization": f"token {github_token}",
        "Accept": "application/vnd.github.v3+json",
        "X-GitHub-Api-Version": "2022-11-28"
    }
    repo_url_http = f"https://github.com/{github_username}/{repo_name}"

    async with httpx.AsyncClient(timeout=45) as client:
        try:
            # 1. CONFIGURE GIT USER (required for commits in Docker).
            #    Context manager guarantees the config lock is released.
            with repo.config_writer() as config_writer:
                config_writer.set_value("user", "name", "TDS AutoDeploy Bot")
                config_writer.set_value("user", "email", "bot@tds-project.local")

            # 2. ADD, COMMIT, AND PUSH FILES
            # The new files (generated and attachments) are now in the local path.
            repo.git.add(A=True)
            commit_message = f"Task {task_id} - Round {round_index}: LLM-generated app update/creation"
            repo.index.commit(commit_message)
            commit_sha = repo.head.object.hexsha
            print(f"  -> Files committed. SHA: {commit_sha}")

            # Ensure main branch consistency and push
            repo.git.branch('-M', 'main')
            print("  -> Branch renamed to 'main'.")
            repo.git.push('--set-upstream', 'origin', 'main', force=True)
            print("  -> Changes pushed to remote 'main' branch.")

            # Wait for GitHub to register the branch
            print("  -> Waiting 10 seconds for GitHub to register the main branch...")
            await asyncio.sleep(10)

            # 3. ENABLE GITHUB PAGES WITH ROBUST RETRIES
            print("  -> Enabling GitHub Pages with robust retries...")
            pages_api_url = f"{GITHUB_API_BASE}/repos/{github_username}/{repo_name}/pages"
            pages_payload = {"source": {"branch": "main", "path": "/"}}
            pages_max_retries = 5
            pages_base_delay = 3

            for retry_attempt in range(pages_max_retries):
                try:
                    # GET tells us whether a Pages site already exists (200).
                    pages_response = await client.get(pages_api_url, headers=headers)
                    is_configured = (pages_response.status_code == 200)

                    if is_configured:
                        print(f"  -> Pages exists. Updating configuration (Attempt {retry_attempt + 1}).")
                        (await client.put(pages_api_url, json=pages_payload, headers=headers)).raise_for_status()
                    else:
                        print(f"  -> Creating Pages configuration (Attempt {retry_attempt + 1}).")
                        (await client.post(pages_api_url, json=pages_payload, headers=headers)).raise_for_status()

                    print("  -> Pages configuration successful.")
                    break
                except httpx.HTTPStatusError as e:
                    # 422 "main branch must exist" is a GitHub-side timing issue:
                    # retry with exponential backoff; anything else is fatal.
                    if (e.response.status_code == 422
                            and "main branch must exist" in e.response.text
                            and retry_attempt < pages_max_retries - 1):
                        delay = pages_base_delay * (2 ** retry_attempt)
                        print(f"  -> [Timing Issue] Branch not recognized. Retrying in {delay} seconds...")
                        await asyncio.sleep(delay)
                    else:
                        raise
            else:
                raise Exception("Failed to configure GitHub Pages after multiple retries due to branch existence.")

            # 4. CONSTRUCT RETURN VALUES
            print("  -> Waiting 5 seconds for GitHub Pages deployment...")
            await asyncio.sleep(5)
            pages_url = f"{GITHUB_PAGES_BASE}/{repo_name}/"

            return {
                "repo_url": repo_url_http,
                "commit_sha": commit_sha,
                "pages_url": pages_url
            }

        except git.GitCommandError as e:
            print(f"--- [GIT ERROR] Failed to perform git operation: {e} ---")
            raise Exception("Git operation failed during deployment.")
        except httpx.HTTPStatusError as e:
            print(f"--- [API ERROR] GitHub API call failed with status {e.response.status_code}: {e.response.text} ---")
            raise Exception("GitHub API call failed during deployment.")
        except Exception as e:
            print(f"--- [CRITICAL ERROR] Deployment failed: {e} ---")
            raise


def data_uri_to_gemini_part(data_uri: str) -> dict:
    """
    Extracts Base64 data and MIME type from a Data URI and formats it as the
    'inlineData' structure required for a Gemini API multimodal part.

    Returns None (rather than raising) for invalid, unparsable, or non-image
    URIs so callers can simply skip them.
    """
    if not data_uri or not data_uri.startswith("data:"):
        print("ERROR: Invalid Data URI provided.")
        return None

    try:
        # Extract MIME type and Base64 part using named groups.
        # BUGFIX: the group names were missing (`(?P[^;]+)` is a regex syntax
        # error); restored `(?P<name>...)` syntax.
        match = re.search(r"data:(?P<mime_type>[^;]+);base64,(?P<base64_data>.*)", data_uri, re.IGNORECASE)
        if not match:
            print("ERROR: Could not parse MIME type or base64 data from URI.")
            return None

        mime_type = match.group('mime_type')
        base64_data = match.group('base64_data')

        # Check if it's a known image type to ensure we only send images to the LLM
        if not mime_type.startswith("image/"):
            print(f"Skipping attachment with non-image MIME type: {mime_type}")
            return None

        return {
            "inlineData": {
                "data": base64_data,  # The Base64 string itself
                "mimeType": mime_type
            }
        }
    except Exception as e:
        print(f"ERROR creating Gemini Part from URI: {e}")
        return None


def is_image_data_uri(data_uri: str) -> bool:
    """Checks if the data URI refers to an image based on the MIME type."""
    if not data_uri.startswith("data:"):
        return False
    # Check for "image/" prefix in the MIME type part of the URI
    return re.search(r"data:image/[^;]+;base64,", data_uri, re.IGNORECASE) is not None


# --- Helper Functions for File System Operations ---
async def save_generated_files_locally(task_id: str, files: dict) -> str:
    """
    Saves the generated files (index.html, README.md, LICENSE) into a local
    directory named after the task_id within the 'generated_tasks' folder.

    Handles both the old flat format {filename: content} and the new format
    {"files": [{"path": ..., "content": ...}]}.

    Returns:
        The task directory path the files were written to.
    """
    base_dir = "/tmp/generated_tasks"
    task_dir = os.path.join(base_dir, task_id)

    # Ensure the task-specific directory exists
    # NOTE: This directory is created earlier in the main orchestration function
    os.makedirs(task_dir, exist_ok=True)
    print(f"--- [LOCAL_SAVE] Saving files to: {task_dir} ---")

    # Handle new array-based format: {"files": [{"path": "...", "content": "..."}]}
    if "files" in files and isinstance(files["files"], list):
        for file_obj in files["files"]:
            filename = file_obj.get("path", "")
            content = file_obj.get("content", "")

            if not filename:
                print("  -> WARNING: Skipping file with no path")
                continue

            content = _coerce_content_to_str(filename, content)
            file_path = os.path.join(task_dir, filename)
            try:
                # Create subdirectories if needed (e.g., "css/style.css")
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(content)
                print(f"  -> Saved: {filename} (Size: {len(content)} bytes)")
            except Exception as e:
                print(f"  -> ERROR saving {filename}: {e}")
                print(f"  -> Content type: {type(content)}, First 200 chars: {str(content)[:200]}")
                raise Exception(f"Failed to save file {filename} locally.")

    # Handle old flat format: {filename: content} (for backwards compatibility)
    else:
        for filename, content in files.items():
            content = _coerce_content_to_str(filename, content)
            file_path = os.path.join(task_dir, filename)
            try:
                # Create subdirectories if needed, mirroring the new-format path.
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with open(file_path, "w", encoding="utf-8") as f:
                    f.write(content)
                print(f"  -> Saved: {filename} (Size: {len(content)} bytes)")
            except Exception as e:
                print(f"  -> ERROR saving {filename}: {e}")
                raise Exception(f"Failed to save file {filename} locally.")

    return task_dir


def _coerce_content_to_str(filename: str, content) -> str:
    """Normalizes LLM-produced file content to a string (lists are joined by newlines)."""
    if isinstance(content, list):
        print(f"  -> WARNING: Content for {filename} is a list, joining with newlines")
        return "\n".join(str(item) for item in content)
    if not isinstance(content, str):
        print(f"  -> WARNING: Content for {filename} is {type(content)}, converting to string")
        return str(content)
    return content


# --- Helper Functions for External Services ---
async def call_llm_for_code(prompt: str, task_id: str, image_parts: list) -> dict:
    """
    Calls the Gemini API to generate the web application code and structured
    metadata (README and LICENSE), supporting image inputs. The response is
    strictly validated against a JSON schema.

    Raises:
        Exception: If code generation fails after all retries.
    """
    print(f"--- [LLM_CALL] Attempting to generate code for Task: {task_id} using Gemini API ---")

    # Define system instruction for the model
    system_prompt = (
        "You are an expert full-stack engineer and technical writer. Your task is to generate "
        "a complete web application with three files in a structured JSON response:\n\n"
        "Return a JSON object with a 'files' array containing:\n"
        "1. index.html - A single, complete, fully responsive HTML file using Tailwind CSS CDN for styling, "
        "with all JavaScript inline. Must be production-ready and implement ALL requested features.\n"
        "2. README.md - Professional project documentation with title, description, features, usage instructions.\n"
        "3. LICENSE - Full text of the MIT License.\n\n"
        "Example response structure:\n"
        "{\n"
        '  "files": [\n'
        '    {"path": "index.html", "content": "..."},\n'
        '    {"path": "README.md", "content": "# Project Title\\n\\n..."},\n'
        '    {"path": "LICENSE", "content": "MIT License\\n\\nCopyright..."}\n'
        "  ]\n"
        "}\n\n"
        "Make the application beautiful, functional, and complete. Use modern design principles."
    )

    # Define the JSON response structure with proper array-based file list
    response_schema = {
        "type": "OBJECT",
        "properties": {
            "files": {
                "type": "ARRAY",
                "items": {
                    "type": "OBJECT",
                    "properties": {
                        "path": {"type": "STRING", "description": "File name (e.g., 'index.html', 'README.md', 'LICENSE')"},
                        "content": {"type": "STRING", "description": "Full content of the file"}
                    },
                    "required": ["path", "content"]
                }
            }
        },
        "required": ["files"]
    }

    # --- CONSTRUCT THE CONTENTS FIELD ---
    if image_parts:
        # Combine image parts and the text prompt.
        contents = [{"parts": image_parts + [{"text": prompt}]}]
    else:
        # If no images, use the original structure with only the text prompt
        contents = [{"parts": [{"text": prompt}]}]

    # Construct the final API payload
    payload = {
        "contents": contents,
        "systemInstruction": {"parts": [{"text": system_prompt}]},
        "generationConfig": {
            "responseMimeType": "application/json",
            "responseSchema": response_schema
        }
    }

    # Use exponential backoff for the API call
    max_retries = 5
    base_delay = 2

    for attempt in range(max_retries):
        try:
            # Construct the URL with the API key
            url = f"{GEMINI_API_URL}?key={GEMINI_API_KEY}"

            # Long timeout (180s) for complex generation tasks
            async with httpx.AsyncClient(timeout=180.0) as client:
                response = await client.post(
                    url,
                    json=payload,
                    headers={"Content-Type": "application/json"}
                )
                response.raise_for_status()  # Raises an exception for 4xx/5xx status codes

                # Parse the response to get the structured JSON text
                result = response.json()

                # Extract the generated JSON string from the result
                json_text = result['candidates'][0]['content']['parts'][0]['text']

                # The LLM output is a JSON string, so we need to parse it into a Python dict
                generated_files = json.loads(json_text)

                print(f"--- [LLM_CALL] Successfully generated files on attempt {attempt + 1}. ---")
                return generated_files

        except httpx.HTTPStatusError as e:
            print(f"--- [LLM_CALL] HTTP Error on attempt {attempt + 1}: {e.response.status_code} - {e.response.text[:500]} ---")
        except KeyError as e:
            print(f"--- [LLM_CALL] KeyError on attempt {attempt + 1}: Missing expected key {e} in LLM response. ---")
            print(f"--- [LLM_CALL] Full response: {result if 'result' in locals() else 'No response received'} ---")
        except json.JSONDecodeError as e:
            print(f"--- [LLM_CALL] JSON Decode Error on attempt {attempt + 1}: {e} ---")
            print(f"--- [LLM_CALL] Raw LLM output that failed to parse: {json_text[:1000] if 'json_text' in locals() else 'No text extracted'} ---")
        except httpx.RequestError as e:  # Catches network errors
            print(f"--- [LLM_CALL] Network Error on attempt {attempt + 1}: {type(e).__name__}: {str(e)} ---")

        if attempt < max_retries - 1:
            delay = base_delay * (2 ** attempt)
            print(f"--- [LLM_CALL] Retrying LLM call in {delay} seconds... ---")
            await asyncio.sleep(delay)

    # If all retries fail, we raise an exception which is caught downstream
    print("--- [LLM_CALL] Failed to generate code after multiple retries. ---")
    raise Exception("LLM Code Generation Failure")


async def notify_evaluation_server(
    evaluation_url: str,
    email: str,
    task_id: str,
    round_index: int,
    nonce: str,
    repo_url: str,
    commit_sha: str,
    pages_url: str
) -> bool:
    """
    Calls the evaluation_url to notify the server that the code has been deployed.

    Returns:
        True if the server acknowledged the notification, False after all
        retries are exhausted (never raises).
    """
    payload = {
        "email": email,
        "task": task_id,
        "round": round_index,
        "nonce": nonce,
        "repo_url": repo_url,
        "commit_sha": commit_sha,
        "pages_url": pages_url
    }

    max_retries = 3
    base_delay = 1

    print(f"--- [NOTIFICATION] Attempting to notify server at {evaluation_url} ---")
    for attempt in range(max_retries):
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                response = await client.post(evaluation_url, json=payload)
                response.raise_for_status()  # Raises an exception for 4xx/5xx status codes
                print(f"--- [NOTIFICATION] Successfully notified server. Response: {response.status_code} ---")
                return True
        except httpx.HTTPStatusError as e:
            print(f"--- [NOTIFICATION] HTTP Error on attempt {attempt + 1}: {e}. ---")
        except httpx.RequestError as e:
            print(f"--- [NOTIFICATION] Request Error on attempt {attempt + 1}: {e}. ---")

        if attempt < max_retries - 1:
            delay = base_delay * (2 ** attempt)
            print(f"--- [NOTIFICATION] Retrying in {delay} seconds... ---")
            await asyncio.sleep(delay)

    print(f"--- [NOTIFICATION] Failed to notify evaluation server after {max_retries} attempts. ---")
    return False


async def save_attachments_locally(task_dir: str, attachments: list) -> list:
    """
    Decodes and saves attachments (provided as Base64 Data URIs) into the
    task directory.

    Returns:
        A list of saved filenames.

    Raises:
        Exception: If decoding or writing an attachment fails.
    """
    saved_files = []
    print(f"--- [ATTACHMENTS] Processing {len(attachments)} attachments for: {task_dir} ---")

    for attachment in attachments:
        filename = attachment.name
        data_uri = attachment.url

        if not filename or not data_uri or not data_uri.startswith("data:"):
            print(f"  -> WARNING: Skipping invalid attachment entry: {filename}")
            continue

        # Use regex to extract the Base64 part of the URI (after base64,)
        match = re.search(r"base64,(.*)", data_uri, re.IGNORECASE)
        if not match:
            print(f"  -> ERROR: Could not find base64 data in URI for {filename}")
            continue

        base64_data = match.group(1)
        file_path = os.path.join(task_dir, filename)

        try:
            # Decode the base64 string
            file_bytes = base64.b64decode(base64_data)
            # Write the raw bytes to the file
            with open(file_path, "wb") as f:
                f.write(file_bytes)
            print(f"  -> Saved Attachment: {filename} (Size: {len(file_bytes)} bytes)")
            saved_files.append(filename)
        except Exception as e:
            print(f"  -> CRITICAL ERROR saving attachment {filename}: {e}")
            raise Exception(f"Failed to save attachment {filename} locally.")

    return saved_files


# --- Main Orchestration Logic ---
async def generate_files_and_deploy(task_data: TaskRequest):
    """
    The asynchronous background process that executes the main project
    workflow. It adapts the LLM prompt for multi-round tasks and clones the
    repo BEFORE saving files (fixing the Round 2 ordering problem).

    All failures are caught and logged; this coroutine never raises (it runs
    as a detached background task).
    """
    task_id = task_data.task
    email = task_data.email
    round_index = task_data.round
    brief = task_data.brief
    evaluation_url = task_data.evaluation_url
    nonce = task_data.nonce
    attachments = task_data.attachments

    print(f"\n--- [PROCESS START] Starting background task for {task_id}, Round {round_index} ---")

    # Deployment configuration
    repo_name = task_id.replace(' ', '-').lower()
    github_username = settings.GITHUB_USERNAME
    github_token = settings.GITHUB_TOKEN
    repo_url_auth = f"https://{github_username}:{github_token}@github.com/{github_username}/{repo_name}.git"
    repo_url_http = f"https://github.com/{github_username}/{repo_name}"

    try:
        # 0. Setup local directory - use /tmp which is always writable
        base_dir = "/tmp/generated_tasks"
        local_path = os.path.join(base_dir, task_id)

        # --- ROBUST CLEANUP LOGIC ---
        # Crucial: cleans up the local directory before cloning or creating a new repo.
        if os.path.exists(local_path):
            print(f"--- [CLEANUP] Deleting existing local directory: {local_path} ---")

            def onerror(func, path, exc_info):
                """Error handler for shutil.rmtree to handle permission issues."""
                if exc_info[0] is PermissionError or 'WinError 5' in str(exc_info[1]):
                    # Make the path writable and retry the failed operation once.
                    os.chmod(path, stat.S_IWUSR)
                    func(path)
                else:
                    raise

            try:
                shutil.rmtree(local_path, onerror=onerror)
                print("--- [CLEANUP] Directory deleted successfully. ---")
            except Exception as e:
                print(f"!!! CRITICAL: Failed to clean up directory. Error: {e}")
                raise Exception(f"Failed to perform local cleanup: {e}")

        # Create the fresh, EMPTY directory (ready for clone or init)
        os.makedirs(local_path, exist_ok=True)
        # --- END ROBUST CLEANUP ---

        # 1. SETUP REPO (Clone or Init)
        # MUST run before any files are saved to local_path.
        print(f"--- [DEPLOYMENT] Setting up local Git repository for Round {round_index}... ---")
        repo = await setup_local_repo(
            local_path=local_path,
            repo_name=repo_name,
            repo_url_auth=repo_url_auth,
            repo_url_http=repo_url_http,
            round_index=round_index
        )

        # 2. Process Attachments for LLM Input
        image_parts = []
        attachment_list_for_llm_prompt = []
        for attachment in attachments:
            # Collect image parts for multimodal LLM input
            if is_image_data_uri(attachment.url):
                gemini_part = data_uri_to_gemini_part(attachment.url)
                if gemini_part:
                    image_parts.append(gemini_part)
            # List all attachment names for the prompt
            attachment_list_for_llm_prompt.append(attachment.name)

        print(f"--- [LLM_INPUT] Found {len(image_parts)} image(s) to pass to LLM. ---")
        attachment_list_str = ", ".join(attachment_list_for_llm_prompt)

        # 3. AI Code Generation - Adapt Prompt for Round 2
        if round_index > 1:
            # For Round 2+, tell the LLM it's modifying existing work
            llm_prompt = (
                f"UPDATE INSTRUCTION (ROUND {round_index}): You must modify the existing project files "
                f"(index.html, README.md, LICENSE) based on this new brief: '{brief}'. "
                "You must replace all content in 'index.html', 'README.md', and 'LICENSE' with new, complete versions "
                "that implement the requested modifications. The 'index.html' must remain a single, complete, "
                "fully responsive HTML file using Tailwind CSS."
            )
        else:
            # For Round 1, generate a new application
            llm_prompt = (
                f"Generate a complete, single-file HTML web application to achieve the following: {brief}. "
                "Ensure your code is fully responsive, and uses Tailwind CSS. "
                "Provide the code for the main web app, a README.md, and an MIT LICENSE."
            )

        # Add attachment context if files were provided, regardless of round.
        if attachment_list_str:
            llm_prompt += f"\nAdditional context: The following files are available in the project root: {attachment_list_str}. "
            llm_prompt += f"Ensure your code references these files correctly (if applicable)."

        # Call LLM
        generated_files = await call_llm_for_code(llm_prompt, task_id, image_parts)

        # 4. Save Generated Code Locally
        # This overwrites the cloned files (index.html, README.md, LICENSE)
        await save_generated_files_locally(task_id, generated_files)

        # 5. Save Attachments Locally
        # This adds attachments (like data.csv) to the local directory.
        # Saving happens *after* the clone/init, resolving the Round 2 error.
        await save_attachments_locally(local_path, attachments)

        # 6. COMMIT AND PUBLISH
        print(f"--- [DEPLOYMENT] Committing and Publishing task {task_id}, Round {round_index} to GitHub... ---")
        deployment_info = await commit_and_publish(
            repo=repo,
            task_id=task_id,
            round_index=round_index,
            repo_name=repo_name
        )
        repo_url = deployment_info["repo_url"]
        commit_sha = deployment_info["commit_sha"]
        pages_url = deployment_info["pages_url"]
        print(f"--- [DEPLOYMENT] Success! Repo: {repo_url}, Pages: {pages_url} ---")

        # 7. Notify the Evaluation Server
        await notify_evaluation_server(
            evaluation_url=evaluation_url,
            email=email,
            task_id=task_id,
            round_index=round_index,
            nonce=nonce,
            repo_url=repo_url,
            commit_sha=commit_sha,
            pages_url=pages_url
        )

    except Exception as e:
        # Deliberate best-effort: a background task must not propagate.
        print(f"--- [CRITICAL FAILURE] Task {task_id} failed during processing: {e} ---")

    print(f"--- [PROCESS END] Background task for {task_id} completed. ---")


# --- FastAPI Endpoint ---
@app.post("/ready", status_code=200)
async def receive_task(task_data: TaskRequest):
    """
    API endpoint that receives the task payload. It verifies the secret and
    starts the generation/deployment process in the background.
    """
    global received_task_data

    # 1. SECRET VERIFICATION (CRITICAL PROJECT REQUIREMENT)
    if not verify_secret(task_data.secret):
        print(f"--- FAILED SECRET VERIFICATION for task {task_data.task} ---")
        raise HTTPException(
            status_code=401,
            detail="Unauthorized: Secret does not match configured student secret."
        )

    # Store data and print initial confirmation
    received_task_data = task_data.dict()
    print("--- TASK RECEIVED SUCCESSFULLY ---")
    print(f"Task ID: {received_task_data['task']}, Round: {received_task_data['round']}")

    # Start the processing function in the background
    asyncio.create_task(generate_files_and_deploy(task_data))

    # Respond immediately with 200 OK to the evaluation server
    return JSONResponse(
        status_code=200,
        content={"status": "ready", "message": f"Task {task_data.task} received and processing started."}
    )


@app.get("/")
async def root():
    return {"message": "Task Receiver Service is running. Post to /ready to submit a task."}


@app.get("/status")
async def get_status():
    global received_task_data
    if received_task_data:
        # Note: This status only shows the last received request, not the
        # live status of the background task.
        return {"last_received_task": received_task_data}
    else:
        return {"message": "Awaiting first task submission to /ready"}