"""FastAPI backend for the ErasLang IDE.

Exposes a small HTTP API that validates ErasLang source, transpiles it to
Python with ``ErasCompiler`` and executes the result in a ``worker.py``
subprocess, returning either the program output or a structured error.
"""

import asyncio
import hmac
import logging
import os
import re
import subprocess
import sys
import traceback
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, HTTPException, Request  # type: ignore
from fastapi.middleware.cors import CORSMiddleware  # type: ignore
from pydantic import BaseModel, ValidationError  # type: ignore
from slowapi import Limiter, _rate_limit_exceeded_handler  # type: ignore
from slowapi.util import get_remote_address  # type: ignore
from slowapi.errors import RateLimitExceeded  # type: ignore

from compiler import ErasCompiler  # type: ignore

logger = logging.getLogger(__name__)

# Maximum wall-clock time a submitted program may run before it is killed.
WORKER_TIMEOUT_SECONDS = 10

# Matches the ``int(input())`` calls emitted by the transpiler for QUESTION...?
_INPUT_CALL_RE = re.compile(r'int\s*\(\s*input\s*\(\s*\)\s*\)')

# How many lines above a standalone QUESTION...? are searched for a declaration.
_DECLARATION_LOOKBACK = 10


# Load secret from .env file or environment variable
def load_secret():
    """Load the API secret from the environment or a ``.env`` file.

    The ``SECRET_PASSWORD`` environment variable wins (production). When it is
    unset or empty, the ``.env`` file one directory above this file's
    directory is scanned for a ``SECRET_PASSWORD=...`` line (local
    development); one pair of matching surrounding quotes is removed.

    Returns:
        The secret string.

    Raises:
        ValueError: If no non-empty secret is found in either place.
    """
    # Try environment variable first (for production)
    secret = os.getenv("SECRET_PASSWORD")

    # If not in env, try reading from .env file (for local development)
    if not secret:
        env_path = Path(__file__).parent.parent / ".env"
        if env_path.exists():
            with open(env_path, 'r', encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith('#') and '=' in line:
                        key, value = line.split('=', 1)
                        if key.strip() == 'SECRET_PASSWORD':
                            secret = value.strip()
                            # Remove surrounding quotes if present
                            if (secret.startswith('"') and secret.endswith('"')) or \
                               (secret.startswith("'") and secret.endswith("'")):
                                secret = secret[1:-1]
                            break

    if not secret:
        raise ValueError("SECRET_PASSWORD must be set in environment variable or .env file")

    return secret


API_SECRET = load_secret()

# Initialize FastAPI and Rate Limiter
app = FastAPI(title="ErasLang IDE Backend")
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# SWE Best Practice: Enable CORS so your frontend can communicate with this API
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://eras-lang-ide.vercel.app", "http://localhost:5173"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "OPTIONS"],  # Only allow necessary HTTP methods
    allow_headers=["Content-Type", "Authorization", "Accept", "X-Requested-With"],
)

compiler = ErasCompiler()


class PerformanceRequest(BaseModel):
    """Request body accepted by the ``/perform`` endpoint."""

    code: str
    inputs: list[int] = []
    password: str  # Secret password for authentication (required)


def _error_response(status: str, error: dict) -> dict:
    """Build the failure payload shared by every non-success response."""
    return {"status": status, "output": "", "error": error}


def parse_error_message(error_output: str) -> dict:
    """
    Parse error messages from worker.py to extract structured information.

    The text is classified by the first marker it contains, checked in this
    order: ``EXILE ERROR``, ``VAULT ERROR``, ``BAD BLOOD``, ``RUNTIME ERROR``,
    ``Not enough inputs provided``. Anything else is reported as ``unknown``.

    Args:
        error_output: Raw text printed by the worker on failure.

    Returns:
        A dictionary with ``message``, ``type``, ``line``, ``variable`` and
        ``suggestion`` keys; fields that could not be extracted are ``None``.
    """
    error_info = {
        "message": error_output.strip(),
        "type": "unknown",
        "line": None,
        "variable": None,
        "suggestion": None
    }

    # Parse EXILE ERROR (ZeroDivisionError)
    if "EXILE ERROR" in error_output:
        error_info["type"] = "division_by_zero"
        error_info["message"] = "You tried to divide by zero. You're on your own, kid."
        error_info["suggestion"] = "Check your math operations - make sure you're not dividing by zero."

    # Parse VAULT ERROR (NameError)
    elif "VAULT ERROR" in error_output:
        error_info["type"] = "undefined_variable"
        # Extract variable name and line number
        var_match = re.search(r"'(.+?)' was never declared at Line (\d+)", error_output)
        if var_match:
            error_info["variable"] = var_match.group(1)
            error_info["line"] = var_match.group(2)
            error_info["message"] = f"Variable '{error_info['variable']}' was never declared at Line {error_info['line']}. You left a Blank Space."
            error_info["suggestion"] = f"Use 'BLANK SPACE {error_info['variable']}' to declare the variable before using it."
        else:
            error_info["message"] = error_output.strip()

    # Parse BAD BLOOD (SyntaxError)
    elif "BAD BLOOD" in error_output:
        error_info["type"] = "syntax_error"
        line_match = re.search(r"Line (\d+)", error_output)
        if line_match:
            error_info["line"] = line_match.group(1)
            error_info["message"] = f"The bridge is broken. Syntax error near Line {error_info['line']}."
        else:
            error_info["message"] = error_output.strip()
        error_info["suggestion"] = "Check your ErasLang syntax - make sure all keywords are spelled correctly and properly formatted."

    # Parse RUNTIME ERROR
    elif "RUNTIME ERROR" in error_output:
        error_info["type"] = "runtime_error"
        line_match = re.search(r"Line (\d+):", error_output)
        if line_match:
            error_info["line"] = line_match.group(1)
        # Extract the actual error message
        msg_match = re.search(r"RUNTIME ERROR at Line \d+: (.+)", error_output)
        if msg_match:
            error_info["message"] = msg_match.group(1).strip()
        else:
            error_info["message"] = error_output.strip()
        error_info["suggestion"] = "Check your code logic - there may be an issue with how variables are being used."

    # Parse ValueError (not enough inputs)
    elif "Not enough inputs provided" in error_output:
        error_info["type"] = "insufficient_inputs"
        error_info["message"] = "Not enough inputs provided. Expected more QUESTION...? calls."
        error_info["suggestion"] = "Make sure you provide enough input values in the inputs array for all QUESTION...? calls in your code."

    # Default: use the error output as-is
    else:
        error_info["message"] = error_output.strip()
        if not error_info["message"]:
            error_info["message"] = "An unknown error occurred during execution."
        error_info["suggestion"] = "Please check your code and try again."

    return error_info


def format_transpilation_error(error: Exception) -> dict:
    """
    Format transpilation errors with helpful context.

    Args:
        error: The exception raised while transpiling or preparing the code.

    Returns:
        A dictionary with ``message``, ``type`` (always
        ``"transpilation_error"``), ``line`` (first ``line <n>`` found in the
        message, case-insensitive, else ``None``) and ``suggestion``.
    """
    error_str = str(error)
    lowered = error_str.lower()

    error_info = {
        "message": error_str,
        "type": "transpilation_error",
        "line": None,
        "suggestion": None
    }

    # Try to extract line number from error message
    line_match = re.search(r'line (\d+)', error_str, re.IGNORECASE)
    if line_match:
        error_info["line"] = line_match.group(1)

    # Provide specific suggestions based on error type
    if "not defined" in lowered or "name" in lowered:
        error_info["suggestion"] = "Make sure all variables are declared with 'BLANK SPACE' before use."
    elif "syntax" in lowered:
        error_info["suggestion"] = "Check your ErasLang syntax. Make sure all keywords are spelled correctly."
    elif "unexpected" in lowered:
        error_info["suggestion"] = "There's an unexpected token in your code. Check for typos or missing keywords."
    else:
        error_info["suggestion"] = "Review your ErasLang code for syntax errors or unrecognized keywords."

    return error_info


def validate_code_structure(code: str) -> Optional[dict]:
    """
    Validate ErasLang code structure before transpilation.

    Checks are purely textual (keyword counts per line): the program must
    contain at least one non-comment line, a main block opener and closer,
    and equal numbers of opening and closing keywords for main, IF, WHILE,
    math and function blocks.

    Args:
        code: The raw ErasLang source.

    Returns:
        None if valid, or an error dict (``message``, ``type``,
        ``suggestion``) describing the first problem found.
    """
    lines = code.splitlines()

    def count_containing(keyword: str) -> int:
        """Number of lines that contain ``keyword`` anywhere."""
        return sum(1 for line in lines if keyword in line)

    non_empty_lines = [
        line.strip() for line in lines
        if line.strip() and not line.strip().startswith("DEAR JOHN")
    ]

    if not non_empty_lines:
        return {
            "message": "Code contains no valid ErasLang instructions. Only comments or empty lines found.",
            "type": "empty_code",
            "suggestion": "Add ErasLang code starting with 'ARE YOU READY FOR IT?' and ending with 'LONG LIVE'."
        }

    begin_main_count = count_containing("ARE YOU READY FOR IT?")
    end_main_count = count_containing("LONG LIVE")

    # Check for BEGIN_MAIN
    if begin_main_count == 0:
        return {
            "message": "Code is missing 'ARE YOU READY FOR IT?' - every ErasLang program must start with this.",
            "type": "missing_begin_main",
            "suggestion": "Start your code with 'ARE YOU READY FOR IT?' to begin the main function."
        }

    # Check for END_MAIN
    if end_main_count == 0:
        return {
            "message": "Code is missing 'LONG LIVE' - every ErasLang program must end with this.",
            "type": "missing_end_main",
            "suggestion": "End your code with 'LONG LIVE' to terminate the program."
        }

    # Check for unmatched blocks
    if begin_main_count > end_main_count:
        return {
            "message": f"Found {begin_main_count} 'ARE YOU READY FOR IT?' but only {end_main_count} 'LONG LIVE'. Blocks are not properly closed.",
            "type": "unmatched_blocks",
            "suggestion": "Make sure every 'ARE YOU READY FOR IT?' has a matching 'LONG LIVE'."
        }

    if end_main_count > begin_main_count:
        return {
            "message": f"Found {end_main_count} 'LONG LIVE' but only {begin_main_count} 'ARE YOU READY FOR IT?'. Too many closing statements.",
            "type": "unmatched_blocks",
            "suggestion": "Make sure you have the correct number of 'LONG LIVE' statements matching your 'ARE YOU READY FOR IT?' statements."
        }

    # Check for unmatched IF blocks (the closer must be the whole line)
    if_count = count_containing("I KNEW YOU WERE TROUBLE")
    end_if_count = sum(1 for line in lines if line.strip() == "EXILE")
    if if_count != end_if_count:
        return {
            "message": f"Found {if_count} 'I KNEW YOU WERE TROUBLE' but {end_if_count} 'EXILE'. IF blocks are not properly closed.",
            "type": "unmatched_if_blocks",
            "suggestion": "Make sure every 'I KNEW YOU WERE TROUBLE' has a matching 'EXILE' to close the IF block."
        }

    # Check for unmatched WHILE blocks
    while_count = count_containing("IS IT OVER NOW?")
    end_while_count = count_containing("OUT OF THE WOODS")
    if while_count != end_while_count:
        return {
            "message": f"Found {while_count} 'IS IT OVER NOW?' but {end_while_count} 'OUT OF THE WOODS'. WHILE loops are not properly closed.",
            "type": "unmatched_while_blocks",
            "suggestion": "Make sure every 'IS IT OVER NOW?' has a matching 'OUT OF THE WOODS' to close the WHILE loop."
        }

    # Check for unmatched MATH blocks
    begin_math_count = count_containing("SHAKE IT OFF")
    end_math_count = count_containing("CALL IT WHAT YOU WANT")
    if begin_math_count != end_math_count:
        return {
            "message": f"Found {begin_math_count} 'SHAKE IT OFF' but {end_math_count} 'CALL IT WHAT YOU WANT'. Math blocks are not properly closed.",
            "type": "unmatched_math_blocks",
            "suggestion": "Make sure every 'SHAKE IT OFF' has a matching 'CALL IT WHAT YOU WANT' to close the math block."
        }

    # Check for unmatched FUNCTION blocks
    begin_func_count = count_containing("FROM THE VAULT")
    end_func_count = count_containing("CLEAN")
    if begin_func_count != end_func_count:
        return {
            "message": f"Found {begin_func_count} 'FROM THE VAULT' but {end_func_count} 'CLEAN'. Function blocks are not properly closed.",
            "type": "unmatched_function_blocks",
            "suggestion": "Make sure every 'FROM THE VAULT' has a matching 'CLEAN' to close the function definition."
        }

    return None  # Code structure is valid


def count_input_calls(code: str) -> int:
    """
    Count the number of QUESTION...? calls in the code.

    A line counts when it is exactly ``QUESTION...?`` (after stripping) or
    contains ``YOU BELONG WITH ME QUESTION...?``; the two cases cannot both
    match the same line.
    """
    total = 0
    for line in code.splitlines():
        # Standalone QUESTION...? (INPUT token) or the inline assignment form
        if line.strip() == "QUESTION...?" or "YOU BELONG WITH ME QUESTION...?" in line:
            total += 1
    return total


def validate_inputs(code: str, inputs: list[int]) -> Optional[dict]:
    """
    Validate that the inputs array matches the number of QUESTION...? calls.

    Args:
        code: The raw ErasLang source.
        inputs: Prefilled integer inputs supplied with the request.

    Returns:
        None if the counts match, or an error dict whose ``type`` is one of
        ``unused_inputs``, ``missing_inputs``, ``insufficient_inputs`` or
        ``excess_inputs``.
    """
    input_calls = count_input_calls(code)
    provided = len(inputs) if inputs else 0

    if input_calls == 0:
        # No inputs needed, but if inputs are provided, warn
        if provided:
            return {
                "message": f"Your code has no QUESTION...? calls, but you provided {provided} input value(s). These will be ignored.",
                "type": "unused_inputs",
                "suggestion": "Remove the inputs array or add QUESTION...? calls to your code if you need user input."
            }
        return None  # No inputs needed, none provided - valid

    # Code requires inputs
    if not provided:
        return {
            "message": f"Your code has {input_calls} QUESTION...? call(s) but no input values were provided.",
            "type": "missing_inputs",
            "suggestion": f"Provide {input_calls} integer value(s) in the inputs array to match your QUESTION...? calls."
        }

    if provided < input_calls:
        return {
            "message": f"Your code has {input_calls} QUESTION...? call(s) but only {provided} input value(s) were provided.",
            "type": "insufficient_inputs",
            "suggestion": f"Provide {input_calls} integer value(s) in the inputs array. You need {input_calls - provided} more value(s)."
        }

    if provided > input_calls:
        return {
            "message": f"Your code has {input_calls} QUESTION...? call(s) but {provided} input value(s) were provided. Extra inputs will be ignored.",
            "type": "excess_inputs",
            "suggestion": f"Provide exactly {input_calls} integer value(s) in the inputs array, or remove the extra {provided - input_calls} value(s)."
        }

    return None  # Inputs match perfectly


def validate_syntax(code: str) -> Optional[dict]:
    """
    Perform basic syntax validation before transpilation.

    Two rules are enforced, line by line (blank lines and ``DEAR JOHN``
    comment lines are skipped):

    * ``BLANK SPACE`` must be followed by a valid identifier.
    * A standalone ``QUESTION...?`` must be directly preceded (ignoring blank
      and comment lines, within the last few lines) by a ``BLANK SPACE``
      declaration.

    Args:
        code: The raw ErasLang source.

    Returns:
        None if valid, or an error dict (``message``, ``type``, ``line``,
        ``suggestion``) for the first issue found. ``line`` is 1-based and
        given as a string.
    """
    lines = code.splitlines()

    # Check for common syntax issues
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if not stripped or stripped.startswith("DEAR JOHN"):
            continue

        # Check for invalid variable names in BLANK SPACE
        if stripped.startswith("BLANK SPACE"):
            var_name = stripped.replace("BLANK SPACE", "").strip()
            if not var_name:
                return {
                    "message": f"Line {i}: 'BLANK SPACE' requires a variable name.",
                    "type": "syntax_error",
                    "line": str(i),
                    "suggestion": "Use 'BLANK SPACE [variable_name]' to declare a variable. Example: 'BLANK SPACE myVar'"
                }
            # Check for invalid characters in variable name
            if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', var_name):
                return {
                    "message": f"Line {i}: Invalid variable name '{var_name}'. Variable names must start with a letter or underscore and contain only letters, numbers, and underscores.",
                    "type": "syntax_error",
                    "line": str(i),
                    "suggestion": f"Use a valid variable name like 'myVar', 'x', or '_temp'. Invalid: '{var_name}'"
                }

        # Check for invalid usage of QUESTION...? without BLANK SPACE
        if stripped == "QUESTION...?":
            # ``i`` is 1-based, so the current line sits at index ``i - 1``;
            # the backward scan must therefore begin at ``i - 2``. The stop
            # bound of -1 keeps index 0 (the first line) inside the scan.
            newest = i - 2
            stop = max(-1, newest - _DECLARATION_LOOKBACK)
            found_declaration = False
            for j in range(newest, stop, -1):
                prev_line = lines[j].strip()
                if prev_line.startswith("BLANK SPACE"):
                    found_declaration = True
                    break
                # Any other real statement in between ends the search
                if prev_line and not prev_line.startswith("DEAR JOHN"):
                    break

            if not found_declaration:
                return {
                    "message": f"Line {i}: 'QUESTION...?' is used without a preceding variable declaration.",
                    "type": "syntax_error",
                    "line": str(i),
                    "suggestion": "Declare a variable with 'BLANK SPACE [name]' before using 'QUESTION...?', or use 'YOU BELONG WITH ME QUESTION...?' to assign input to the last declared variable."
                }

    return None  # Syntax appears valid


def _build_input_provider(inputs: list) -> str:
    """Return Python source defining ``_eras_input()`` over the given inputs."""
    values = list(inputs) if inputs else []
    return f"""
# Input provider for prefilled inputs
_input_index = 0
_input_values = {values!r}

def _eras_input():
    global _input_index
    if _input_index < len(_input_values):
        value = _input_values[_input_index]
        _input_index += 1
        return str(value)
    else:
        raise ValueError("Not enough inputs provided. Expected more QUESTION...? calls.")

"""


def _run_worker(worker_path: str, python_source: str):
    """Run worker.py with ``python_source`` on stdin; blocking.

    Returns:
        ``(returncode, stdout, stderr)`` of the finished worker.

    Raises:
        subprocess.TimeoutExpired: If the worker exceeds
            ``WORKER_TIMEOUT_SECONDS``; the process is killed and reaped
            before the exception propagates.
    """
    process = subprocess.Popen(
        [sys.executable, worker_path],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    try:
        stdout_data, stderr_data = process.communicate(
            input=python_source,
            timeout=WORKER_TIMEOUT_SECONDS
        )
    except subprocess.TimeoutExpired:
        process.kill()
        # communicate() after kill() reaps the child and closes its pipes
        process.communicate()
        raise
    return process.returncode, stdout_data, stderr_data


@app.get("/")
@limiter.limit("20/minute")
async def root(request: Request):
    """Landing endpoint; rate limited to 20 requests per minute per client."""
    return {"message": "ErasLang Stage is Live. Ready for the Performance?"}


@app.get("/health")
async def health():
    """
    Health check endpoint with no rate limiting.
    Useful for monitoring and testing the deployed backend.
    """
    return {"status": "healthy", "service": "ErasLang API"}


@app.post("/perform")
@limiter.limit("10/minute")
async def run_eraslang(request: Request, body: PerformanceRequest):
    """
    Main execution endpoint.
    1. Validates secret password
    2. Validates code structure, syntax and inputs
    3. Transpiles ErasLang -> Python
    4. Spawns worker.py subprocess
    5. Returns results or errors

    Every outcome is returned as a JSON object with ``status``, ``output``
    and ``error`` keys; failures are reported in the body rather than via
    HTTP error codes.
    """
    # --- Step 0: Input Validation ---
    if not body.password:
        return _error_response("authentication_error", {
            "message": "Password is required for authentication.",
            "type": "missing_password",
            "suggestion": "Please provide a valid password in your request."
        })

    # Constant-time comparison so response timing does not leak the secret.
    # Bytes are compared because compare_digest rejects non-ASCII str input.
    password_ok = hmac.compare_digest(
        body.password.encode("utf-8"), API_SECRET.encode("utf-8")
    )
    if not password_ok:
        return _error_response("authentication_error", {
            "message": "Invalid password. Access denied.",
            "type": "invalid_password",
            "suggestion": "Please check your password and try again."
        })

    # Validate code is not empty
    if not body.code or not body.code.strip():
        return _error_response("validation_error", {
            "message": "Code cannot be empty. Please provide ErasLang code to execute.",
            "type": "empty_code",
            "suggestion": "Write some ErasLang code starting with 'ARE YOU READY FOR IT?' and ending with 'LONG LIVE'."
        })

    # --- Step 0.5: Pre-transpilation Validation ---
    # Validate code structure
    structure_error = validate_code_structure(body.code)
    if structure_error:
        return _error_response("validation_error", structure_error)

    # Validate syntax
    syntax_error = validate_syntax(body.code)
    if syntax_error:
        return _error_response("validation_error", syntax_error)

    # Validate inputs match QUESTION...? calls.
    # Missing/insufficient inputs are fatal; unused/excess inputs are
    # tolerated and execution proceeds (the extra values are simply ignored).
    input_error = validate_inputs(body.code, body.inputs)
    if input_error and input_error["type"] in ("missing_inputs", "insufficient_inputs"):
        return _error_response("validation_error", input_error)

    # --- Step 1: Transpilation ---
    try:
        # We ensure the compiler adds # ERAS_LINE_X comments for our worker
        python_source = compiler.transpile(body.code)

        # Validate that transpilation produced valid code
        if not python_source or not python_source.strip():
            return _error_response("transpilation_error", {
                "message": "Transpilation produced empty code. Your ErasLang code may be missing required elements.",
                "type": "empty_transpilation",
                "suggestion": "Make sure your code includes 'ARE YOU READY FOR IT?' to start and 'LONG LIVE' to end."
            })

        # Validate Python syntax of transpiled code (compiled only, never run here)
        try:
            compile(python_source, "<transpiled>", "exec")
        except SyntaxError as syn_err:
            return _error_response("transpilation_error", {
                "message": f"Transpiled code has a syntax error at line {syn_err.lineno}: {syn_err.msg}",
                "type": "transpilation_syntax_error",
                "line": str(syn_err.lineno) if syn_err.lineno else None,
                "suggestion": "This usually indicates an issue with your ErasLang code structure. Check for unmatched blocks or invalid syntax."
            })

        # Count how many input() calls we'll need to replace
        input_call_count = len(_INPUT_CALL_RE.findall(python_source))

        # If inputs are provided, inject an input provider function
        if body.inputs or input_call_count > 0:
            provided = len(body.inputs) if body.inputs else 0

            # Validate inputs match input calls
            if provided < input_call_count:
                return _error_response("validation_error", {
                    "message": f"Your code requires {input_call_count} input value(s) but only {provided} were provided.",
                    "type": "insufficient_inputs",
                    "suggestion": f"Provide {input_call_count} integer value(s) in the inputs array. You need {input_call_count - provided} more value(s)."
                })

            # Replace all int(input()) calls with our custom input function
            python_source = _INPUT_CALL_RE.sub('int(_eras_input())', python_source)

            # Prepend the input provider code
            python_source = _build_input_provider(body.inputs) + python_source

    except Exception as e:
        # Format transpilation errors with helpful context
        return _error_response("transpilation_error", format_transpilation_error(e))

    # --- Step 2: Subprocess Execution ---
    try:
        # We call worker.py and pipe the python_source into its stdin
        worker_path = os.path.join(os.path.dirname(__file__), "worker.py")
        if not os.path.exists(worker_path):
            return _error_response("system_error", {
                "message": "Worker script not found. System configuration error.",
                "type": "missing_worker",
                "suggestion": "Please contact support - this is a system configuration issue."
            })

        # The worker call blocks for up to WORKER_TIMEOUT_SECONDS, so it runs
        # in a thread to keep the event loop free for other requests.
        returncode, stdout_data, stderr_data = await asyncio.to_thread(
            _run_worker, worker_path, python_source
        )

    except subprocess.TimeoutExpired:
        # Strict timeout to prevent infinite loops
        return _error_response("timeout", {
            "message": "Performance Timed Out. Did you get stuck in an 'Is it Over Now?' loop?",
            "type": "execution_timeout",
            "line": None,
            "suggestion": f"Your code took longer than {WORKER_TIMEOUT_SECONDS} seconds to execute. Check for infinite loops or very long-running operations. Make sure your 'IS IT OVER NOW?' loops have proper exit conditions."
        })
    except FileNotFoundError:
        return _error_response("system_error", {
            "message": "Python interpreter not found. System configuration error.",
            "type": "missing_python",
            "suggestion": "Please contact support - this is a system configuration issue."
        })
    except PermissionError:
        return _error_response("system_error", {
            "message": "Permission denied when trying to execute code. System configuration error.",
            "type": "permission_error",
            "suggestion": "Please contact support - this is a system configuration issue."
        })
    except Exception as e:
        # Record the full traceback server-side; the client gets a summary only
        logger.exception("Unexpected error while executing worker")
        return _error_response("system_error", {
            "message": f"An unexpected system error occurred: {str(e)}",
            "type": "unexpected_error",
            "suggestion": "Please try again. If the problem persists, contact support with details about what you were trying to do."
        })

    # --- Step 3: Handle Results ---
    if returncode == 0:
        return {
            "status": "success",
            "output": stdout_data,
            "error": None
        }

    # The worker.py prints thematic errors to stdout on failure;
    # fall back to stderr when stdout is blank.
    error_output = stdout_data if stdout_data.strip() else stderr_data
    return {
        "status": "runtime_error",
        "output": stdout_data,
        "error": parse_error_message(error_output)
    }


if __name__ == "__main__":
    import uvicorn  # type: ignore

    # Start the server on port 8000
    uvicorn.run(app, host="0.0.0.0", port=8000)