ErasLangBackend / app.py
42Cummer's picture
Upload 2 files
14a97bc verified
import subprocess
import sys
import os
import re
import traceback
from pathlib import Path
from fastapi import FastAPI, HTTPException, Request # type: ignore
from fastapi.middleware.cors import CORSMiddleware # type: ignore
from pydantic import BaseModel, ValidationError # type: ignore
from slowapi import Limiter, _rate_limit_exceeded_handler # type: ignore
from slowapi.util import get_remote_address # type: ignore
from slowapi.errors import RateLimitExceeded # type: ignore
from compiler import ErasCompiler # type: ignore
# Load secret from .env file or environment variable
def load_secret():
"""Load the API secret from .env file or environment variable"""
# Try environment variable first (for production)
secret = os.getenv("SECRET_PASSWORD")
# If not in env, try reading from .env file (for local development)
if not secret:
env_path = Path(__file__).parent.parent / ".env"
if env_path.exists():
with open(env_path, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
if key.strip() == 'SECRET_PASSWORD':
secret = value.strip()
# Remove surrounding quotes if present
if (secret.startswith('"') and secret.endswith('"')) or \
(secret.startswith("'") and secret.endswith("'")):
secret = secret[1:-1]
break
if not secret:
raise ValueError("SECRET_PASSWORD must be set in environment variable or .env file")
return secret
API_SECRET = load_secret()
# Initialize FastAPI and Rate Limiter
app = FastAPI(title="ErasLang IDE Backend")
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# SWE Best Practice: Enable CORS so your frontend can communicate with this API
app.add_middleware(
CORSMiddleware,
allow_origins=["https://eras-lang-ide.vercel.app", "http://localhost:5173"],
allow_credentials=True,
allow_methods=["GET", "POST", "OPTIONS"], # Only allow necessary HTTP methods
allow_headers=["Content-Type", "Authorization", "Accept", "X-Requested-With"],
)
compiler = ErasCompiler()
class PerformanceRequest(BaseModel):
code: str
inputs: list[int] = []
password: str # Secret password for authentication (required)
def parse_error_message(error_output: str) -> dict:
"""
Parse error messages from worker.py to extract structured information.
Returns a dictionary with error details.
"""
error_info = {
"message": error_output.strip(),
"type": "unknown",
"line": None,
"variable": None,
"suggestion": None
}
# Parse EXILE ERROR (ZeroDivisionError)
if "EXILE ERROR" in error_output:
error_info["type"] = "division_by_zero"
error_info["message"] = "You tried to divide by zero. You're on your own, kid."
error_info["suggestion"] = "Check your math operations - make sure you're not dividing by zero."
# Parse VAULT ERROR (NameError)
elif "VAULT ERROR" in error_output:
error_info["type"] = "undefined_variable"
# Extract variable name and line number
var_match = re.search(r"'(.+?)' was never declared at Line (\d+)", error_output)
if var_match:
error_info["variable"] = var_match.group(1)
error_info["line"] = var_match.group(2)
error_info["message"] = f"Variable '{error_info['variable']}' was never declared at Line {error_info['line']}. You left a Blank Space."
error_info["suggestion"] = f"Use 'BLANK SPACE {error_info['variable']}' to declare the variable before using it."
else:
error_info["message"] = error_output.strip()
# Parse BAD BLOOD (SyntaxError)
elif "BAD BLOOD" in error_output:
error_info["type"] = "syntax_error"
line_match = re.search(r"Line (\d+)", error_output)
if line_match:
error_info["line"] = line_match.group(1)
error_info["message"] = f"The bridge is broken. Syntax error near Line {error_info['line']}."
else:
error_info["message"] = error_output.strip()
error_info["suggestion"] = "Check your ErasLang syntax - make sure all keywords are spelled correctly and properly formatted."
# Parse RUNTIME ERROR
elif "RUNTIME ERROR" in error_output:
error_info["type"] = "runtime_error"
line_match = re.search(r"Line (\d+):", error_output)
if line_match:
error_info["line"] = line_match.group(1)
# Extract the actual error message
msg_match = re.search(r"RUNTIME ERROR at Line \d+: (.+)", error_output)
if msg_match:
error_info["message"] = msg_match.group(1).strip()
else:
error_info["message"] = error_output.strip()
error_info["suggestion"] = "Check your code logic - there may be an issue with how variables are being used."
# Parse ValueError (not enough inputs)
elif "Not enough inputs provided" in error_output:
error_info["type"] = "insufficient_inputs"
error_info["message"] = "Not enough inputs provided. Expected more QUESTION...? calls."
error_info["suggestion"] = "Make sure you provide enough input values in the inputs array for all QUESTION...? calls in your code."
# Default: use the error output as-is
else:
error_info["message"] = error_output.strip()
if not error_info["message"]:
error_info["message"] = "An unknown error occurred during execution."
error_info["suggestion"] = "Please check your code and try again."
return error_info
def format_transpilation_error(error: Exception) -> dict:
"""
Format transpilation errors with helpful context.
"""
error_info = {
"message": str(error),
"type": "transpilation_error",
"line": None,
"suggestion": None
}
error_str = str(error)
# Try to extract line number from error message
line_match = re.search(r'line (\d+)', error_str, re.IGNORECASE)
if line_match:
error_info["line"] = line_match.group(1)
# Provide specific suggestions based on error type
if "not defined" in error_str.lower() or "name" in error_str.lower():
error_info["suggestion"] = "Make sure all variables are declared with 'BLANK SPACE' before use."
elif "syntax" in error_str.lower():
error_info["suggestion"] = "Check your ErasLang syntax. Make sure all keywords are spelled correctly."
elif "unexpected" in error_str.lower():
error_info["suggestion"] = "There's an unexpected token in your code. Check for typos or missing keywords."
else:
error_info["suggestion"] = "Review your ErasLang code for syntax errors or unrecognized keywords."
return error_info
def validate_code_structure(code: str) -> dict:
"""
Validate ErasLang code structure before transpilation.
Returns None if valid, or error dict if invalid.
"""
lines = code.splitlines()
non_empty_lines = [line.strip() for line in lines if line.strip() and not line.strip().startswith("DEAR JOHN")]
if not non_empty_lines:
return {
"message": "Code contains no valid ErasLang instructions. Only comments or empty lines found.",
"type": "empty_code",
"suggestion": "Add ErasLang code starting with 'ARE YOU READY FOR IT?' and ending with 'LONG LIVE'."
}
# Check for BEGIN_MAIN
has_begin_main = any("ARE YOU READY FOR IT?" in line for line in lines)
if not has_begin_main:
return {
"message": "Code is missing 'ARE YOU READY FOR IT?' - every ErasLang program must start with this.",
"type": "missing_begin_main",
"suggestion": "Start your code with 'ARE YOU READY FOR IT?' to begin the main function."
}
# Check for END_MAIN
has_end_main = any("LONG LIVE" in line for line in lines)
if not has_end_main:
return {
"message": "Code is missing 'LONG LIVE' - every ErasLang program must end with this.",
"type": "missing_end_main",
"suggestion": "End your code with 'LONG LIVE' to terminate the program."
}
# Check for unmatched blocks
begin_main_count = sum(1 for line in lines if "ARE YOU READY FOR IT?" in line)
end_main_count = sum(1 for line in lines if "LONG LIVE" in line)
if begin_main_count > end_main_count:
return {
"message": f"Found {begin_main_count} 'ARE YOU READY FOR IT?' but only {end_main_count} 'LONG LIVE'. Blocks are not properly closed.",
"type": "unmatched_blocks",
"suggestion": "Make sure every 'ARE YOU READY FOR IT?' has a matching 'LONG LIVE'."
}
if end_main_count > begin_main_count:
return {
"message": f"Found {end_main_count} 'LONG LIVE' but only {begin_main_count} 'ARE YOU READY FOR IT?'. Too many closing statements.",
"type": "unmatched_blocks",
"suggestion": "Make sure you have the correct number of 'LONG LIVE' statements matching your 'ARE YOU READY FOR IT?' statements."
}
# Check for unmatched IF blocks
if_count = sum(1 for line in lines if "I KNEW YOU WERE TROUBLE" in line)
end_if_count = sum(1 for line in lines if line.strip() == "EXILE")
if if_count != end_if_count:
return {
"message": f"Found {if_count} 'I KNEW YOU WERE TROUBLE' but {end_if_count} 'EXILE'. IF blocks are not properly closed.",
"type": "unmatched_if_blocks",
"suggestion": "Make sure every 'I KNEW YOU WERE TROUBLE' has a matching 'EXILE' to close the IF block."
}
# Check for unmatched WHILE blocks
while_count = sum(1 for line in lines if "IS IT OVER NOW?" in line)
end_while_count = sum(1 for line in lines if "OUT OF THE WOODS" in line)
if while_count != end_while_count:
return {
"message": f"Found {while_count} 'IS IT OVER NOW?' but {end_while_count} 'OUT OF THE WOODS'. WHILE loops are not properly closed.",
"type": "unmatched_while_blocks",
"suggestion": "Make sure every 'IS IT OVER NOW?' has a matching 'OUT OF THE WOODS' to close the WHILE loop."
}
# Check for unmatched MATH blocks
begin_math_count = sum(1 for line in lines if "SHAKE IT OFF" in line)
end_math_count = sum(1 for line in lines if "CALL IT WHAT YOU WANT" in line)
if begin_math_count != end_math_count:
return {
"message": f"Found {begin_math_count} 'SHAKE IT OFF' but {end_math_count} 'CALL IT WHAT YOU WANT'. Math blocks are not properly closed.",
"type": "unmatched_math_blocks",
"suggestion": "Make sure every 'SHAKE IT OFF' has a matching 'CALL IT WHAT YOU WANT' to close the math block."
}
# Check for unmatched FUNCTION blocks
begin_func_count = sum(1 for line in lines if "FROM THE VAULT" in line)
end_func_count = sum(1 for line in lines if "CLEAN" in line)
if begin_func_count != end_func_count:
return {
"message": f"Found {begin_func_count} 'FROM THE VAULT' but {end_func_count} 'CLEAN'. Function blocks are not properly closed.",
"type": "unmatched_function_blocks",
"suggestion": "Make sure every 'FROM THE VAULT' has a matching 'CLEAN' to close the function definition."
}
return None # Code structure is valid
def count_input_calls(code: str) -> int:
"""
Count the number of QUESTION...? calls in the code.
"""
# Count standalone QUESTION...? (INPUT token)
standalone_count = sum(1 for line in code.splitlines() if line.strip() == "QUESTION...?")
# Count QUESTION...? in "YOU BELONG WITH ME QUESTION...?"
inline_count = sum(1 for line in code.splitlines() if "YOU BELONG WITH ME QUESTION...?" in line)
return standalone_count + inline_count
def validate_inputs(code: str, inputs: list[int]) -> dict:
"""
Validate that the inputs array matches the number of QUESTION...? calls.
Returns None if valid, or error dict if invalid.
"""
input_calls = count_input_calls(code)
if input_calls == 0:
# No inputs needed, but if inputs are provided, warn
if inputs and len(inputs) > 0:
return {
"message": f"Your code has no QUESTION...? calls, but you provided {len(inputs)} input value(s). These will be ignored.",
"type": "unused_inputs",
"suggestion": "Remove the inputs array or add QUESTION...? calls to your code if you need user input."
}
return None # No inputs needed, none provided - valid
# Code requires inputs
if not inputs or len(inputs) == 0:
return {
"message": f"Your code has {input_calls} QUESTION...? call(s) but no input values were provided.",
"type": "missing_inputs",
"suggestion": f"Provide {input_calls} integer value(s) in the inputs array to match your QUESTION...? calls."
}
if len(inputs) < input_calls:
return {
"message": f"Your code has {input_calls} QUESTION...? call(s) but only {len(inputs)} input value(s) were provided.",
"type": "insufficient_inputs",
"suggestion": f"Provide {input_calls} integer value(s) in the inputs array. You need {input_calls - len(inputs)} more value(s)."
}
if len(inputs) > input_calls:
return {
"message": f"Your code has {input_calls} QUESTION...? call(s) but {len(inputs)} input value(s) were provided. Extra inputs will be ignored.",
"type": "excess_inputs",
"suggestion": f"Provide exactly {input_calls} integer value(s) in the inputs array, or remove the extra {len(inputs) - input_calls} value(s)."
}
return None # Inputs match perfectly
def validate_syntax(code: str) -> dict:
"""
Perform basic syntax validation before transpilation.
Returns None if valid, or error dict if issues found.
"""
lines = code.splitlines()
# Check for common syntax issues
for i, line in enumerate(lines, 1):
stripped = line.strip()
if not stripped or stripped.startswith("DEAR JOHN"):
continue
# Check for invalid variable names in BLANK SPACE
if stripped.startswith("BLANK SPACE"):
var_name = stripped.replace("BLANK SPACE", "").strip()
if not var_name:
return {
"message": f"Line {i}: 'BLANK SPACE' requires a variable name.",
"type": "syntax_error",
"line": str(i),
"suggestion": "Use 'BLANK SPACE [variable_name]' to declare a variable. Example: 'BLANK SPACE myVar'"
}
# Check for invalid characters in variable name
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', var_name):
return {
"message": f"Line {i}: Invalid variable name '{var_name}'. Variable names must start with a letter or underscore and contain only letters, numbers, and underscores.",
"type": "syntax_error",
"line": str(i),
"suggestion": f"Use a valid variable name like 'myVar', 'x', or '_temp'. Invalid: '{var_name}'"
}
# Check for invalid usage of QUESTION...? without BLANK SPACE
if stripped == "QUESTION...?":
# This is valid as standalone, but check context
# Look backwards for BLANK SPACE
found_declaration = False
for j in range(i - 1, max(0, i - 10), -1):
prev_line = lines[j].strip()
if prev_line.startswith("BLANK SPACE"):
found_declaration = True
break
if prev_line and not prev_line.startswith("DEAR JOHN"):
break
if not found_declaration:
return {
"message": f"Line {i}: 'QUESTION...?' is used without a preceding variable declaration.",
"type": "syntax_error",
"line": str(i),
"suggestion": "Declare a variable with 'BLANK SPACE [name]' before using 'QUESTION...?', or use 'YOU BELONG WITH ME QUESTION...?' to assign input to the last declared variable."
}
return None # Syntax appears valid
@app.get("/")
@limiter.limit("20/minute")
async def root(request: Request):
return {"message": "ErasLang Stage is Live. Ready for the Performance?"}
@app.get("/health")
async def health():
"""
Health check endpoint - no CORS restrictions, no rate limiting.
Useful for monitoring and testing the deployed backend.
"""
return {"status": "healthy", "service": "ErasLang API"}
@app.post("/perform")
@limiter.limit("10/minute")
async def run_eraslang(request: Request, body: PerformanceRequest):
"""
Main execution endpoint.
1. Validates secret password
2. Transpiles ErasLang -> Python
3. Spawns worker.py subprocess
4. Returns results or errors
"""
# --- Step 0: Input Validation ---
if not body.password:
return {
"status": "authentication_error",
"output": "",
"error": {
"message": "Password is required for authentication.",
"type": "missing_password",
"suggestion": "Please provide a valid password in your request."
}
}
if body.password != API_SECRET:
return {
"status": "authentication_error",
"output": "",
"error": {
"message": "Invalid password. Access denied.",
"type": "invalid_password",
"suggestion": "Please check your password and try again."
}
}
# Validate code is not empty
if not body.code or not body.code.strip():
return {
"status": "validation_error",
"output": "",
"error": {
"message": "Code cannot be empty. Please provide ErasLang code to execute.",
"type": "empty_code",
"suggestion": "Write some ErasLang code starting with 'ARE YOU READY FOR IT?' and ending with 'LONG LIVE'."
}
}
# --- Step 0.5: Pre-transpilation Validation ---
# Validate code structure
structure_error = validate_code_structure(body.code)
if structure_error:
return {
"status": "validation_error",
"output": "",
"error": structure_error
}
# Validate syntax
syntax_error = validate_syntax(body.code)
if syntax_error:
return {
"status": "validation_error",
"output": "",
"error": syntax_error
}
# Validate inputs match QUESTION...? calls
input_error = validate_inputs(body.code, body.inputs)
if input_error:
# For unused/excess inputs, we can still proceed but warn
# For missing/insufficient inputs, we should error
if input_error["type"] in ["missing_inputs", "insufficient_inputs"]:
return {
"status": "validation_error",
"output": "",
"error": input_error
}
# For unused/excess, we'll proceed but the error will be in the response
# (Actually, let's proceed for now and handle at runtime)
# --- Step 1: Transpilation ---
try:
# We ensure the compiler adds # ERAS_LINE_X comments for our worker
python_source = compiler.transpile(body.code)
# Validate that transpilation produced valid code
if not python_source or not python_source.strip():
return {
"status": "transpilation_error",
"output": "",
"error": {
"message": "Transpilation produced empty code. Your ErasLang code may be missing required elements.",
"type": "empty_transpilation",
"suggestion": "Make sure your code includes 'ARE YOU READY FOR IT?' to start and 'LONG LIVE' to end."
}
}
# Validate Python syntax of transpiled code
try:
compile(python_source, '<string>', 'exec')
except SyntaxError as syn_err:
return {
"status": "transpilation_error",
"output": "",
"error": {
"message": f"Transpiled code has a syntax error at line {syn_err.lineno}: {syn_err.msg}",
"type": "transpilation_syntax_error",
"line": str(syn_err.lineno) if syn_err.lineno else None,
"suggestion": "This usually indicates an issue with your ErasLang code structure. Check for unmatched blocks or invalid syntax."
}
}
# Count how many input() calls we'll need to replace
input_call_count = len(re.findall(r'int\s*\(\s*input\s*\(\s*\)\s*\)', python_source))
# If inputs are provided, inject an input provider function
if body.inputs or input_call_count > 0:
# Validate inputs match input calls
if input_call_count > 0:
if not body.inputs or len(body.inputs) < input_call_count:
return {
"status": "validation_error",
"output": "",
"error": {
"message": f"Your code requires {input_call_count} input value(s) but only {len(body.inputs) if body.inputs else 0} were provided.",
"type": "insufficient_inputs",
"suggestion": f"Provide {input_call_count} integer value(s) in the inputs array. You need {input_call_count - (len(body.inputs) if body.inputs else 0)} more value(s)."
}
}
# Replace all int(input()) calls with our custom input function
python_source = re.sub(
r'int\s*\(\s*input\s*\(\s*\)\s*\)',
'int(_eras_input())',
python_source
)
# Prepend the input provider code
input_provider = f"""
# Input provider for prefilled inputs
_input_index = 0
_input_values = {body.inputs if body.inputs else []}
def _eras_input():
global _input_index
if _input_index < len(_input_values):
value = _input_values[_input_index]
_input_index += 1
return str(value)
else:
raise ValueError("Not enough inputs provided. Expected more QUESTION...? calls.")
"""
python_source = input_provider + python_source
except Exception as e:
# Format transpilation errors with helpful context
error_info = format_transpilation_error(e)
return {
"status": "transpilation_error",
"output": "",
"error": error_info
}
# --- Step 2: Subprocess Execution ---
try:
# We call worker.py and pipe the python_source into its stdin
worker_path = os.path.join(os.path.dirname(__file__), "worker.py")
if not os.path.exists(worker_path):
return {
"status": "system_error",
"output": "",
"error": {
"message": "Worker script not found. System configuration error.",
"type": "missing_worker",
"suggestion": "Please contact support - this is a system configuration issue."
}
}
process = subprocess.Popen(
[sys.executable, worker_path],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# We set a strict timeout (e.g., 10 seconds) to prevent infinite loops
try:
stdout_data, stderr_data = process.communicate(
input=python_source,
timeout=10
)
except subprocess.TimeoutExpired:
process.kill()
process.wait() # Ensure process is terminated
return {
"status": "timeout",
"output": "",
"error": {
"message": "Performance Timed Out. Did you get stuck in an 'Is it Over Now?' loop?",
"type": "execution_timeout",
"line": None,
"suggestion": "Your code took longer than 10 seconds to execute. Check for infinite loops or very long-running operations. Make sure your 'IS IT OVER NOW?' loops have proper exit conditions."
}
}
# --- Step 3: Handle Results ---
if process.returncode == 0:
return {
"status": "success",
"output": stdout_data,
"error": None
}
else:
# The worker.py prints thematic errors to stdout on failure
# Parse the error message for better structure
error_output = stdout_data if stdout_data.strip() else stderr_data
error_info = parse_error_message(error_output)
return {
"status": "runtime_error",
"output": stdout_data if process.returncode != 0 else "",
"error": error_info
}
except FileNotFoundError:
return {
"status": "system_error",
"output": "",
"error": {
"message": "Python interpreter not found. System configuration error.",
"type": "missing_python",
"suggestion": "Please contact support - this is a system configuration issue."
}
}
except PermissionError:
return {
"status": "system_error",
"output": "",
"error": {
"message": "Permission denied when trying to execute code. System configuration error.",
"type": "permission_error",
"suggestion": "Please contact support - this is a system configuration issue."
}
}
except Exception as e:
# Log the full error for debugging (in production, use proper logging)
error_trace = traceback.format_exc()
return {
"status": "system_error",
"output": "",
"error": {
"message": f"An unexpected system error occurred: {str(e)}",
"type": "unexpected_error",
"suggestion": "Please try again. If the problem persists, contact support with details about what you were trying to do."
}
}
if __name__ == "__main__":
import uvicorn # type: ignore
# Start the server on port 8000
uvicorn.run(app, host="0.0.0.0", port=8000)