| from flask import Flask, request, jsonify, render_template, send_from_directory |
| import os |
| import subprocess |
| import tempfile |
| import shutil |
| import sys |
| import google.generativeai as genai |
| import re |
|
|
# Flask application object; every route below is registered on it.
app = Flask(__name__)

# Process-wide workspace state. `temp_dir` is the temporary directory created
# at import time; `current_dir` is the directory commands run in. Both start
# out as the same path; `current_dir` is later reassigned by the "cd" command
# and both are reassigned by the /cleanup route.
temp_dir = current_dir = tempfile.mkdtemp()
|
|
| |
# Authenticate the Gemini SDK. A missing GEMINI_API_KEY raises KeyError here,
# i.e. at import time.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])

# Sampling settings handed to the model below.
generation_config = dict(
    temperature=0.7,
    top_p=1,
    top_k=40,
    max_output_tokens=1024,
)

model = genai.GenerativeModel(
    generation_config=generation_config,
    model_name="gemini-1.5-pro",
)

# Prompt preamble that /execute prepends to every "ai:" request.
system_instruction = """
You are a code generation assistant. Follow these strict rules:

1. Respond ONLY with code in ```python or ```bash blocks
2. Never include explanations or comments
3. For file operations, create complete executable scripts
4. Handle errors within the code itself
5. Format example:
User: Create a file that prints hello
Response: ```python\nprint("hello")\n```
"""

# One chat session, created at import time and shared by every request.
chat = model.start_chat(history=[])
|
|
def execute_command(command, cwd=None, timeout=None):
    """Run a command line through the system shell and return its output.

    Args:
        command: Command line, passed to the shell as a single string.
        cwd: Directory to run in. Falls back to the module-level
            ``current_dir`` when omitted or empty.
        timeout: Optional limit in seconds. ``None`` (the default) waits for
            the command indefinitely.

    Returns:
        Captured stdout followed by captured stderr. When ``timeout``
        expires, whatever output was captured so far followed by an error
        line.
    """
    # NOTE(security): the command line is handed to the shell verbatim, and
    # callers in this file pass client-supplied text, so this executes
    # arbitrary commands by design.
    try:
        completed = subprocess.run(
            command,
            shell=True,
            # Do not inherit the server's stdin: a command that reads input
            # would otherwise block the request forever.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            cwd=cwd or current_dir,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        captured = ""
        for stream in (exc.stdout, exc.stderr):
            # Partial output may arrive as bytes even in text mode.
            if isinstance(stream, bytes):
                stream = stream.decode(errors="replace")
            captured += stream or ""
        return f"{captured}Error: command timed out after {timeout} seconds"
    return completed.stdout + completed.stderr
|
|
def extract_code(response):
    """Return the first fenced python/bash snippet found in a model reply.

    Only fences tagged exactly ```python or ```bash are recognised. The
    snippet is returned with surrounding whitespace stripped; ``None`` is
    returned when the reply contains no such fence.
    """
    fence = re.search(r'```(?:python|bash)\n(.*?)\n```', response, re.DOTALL)
    if fence is None:
        return None
    return fence.group(1).strip()
|
|
@app.route("/")
def index():
    """Serve the front-end page rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page
|
|
def _handle_ai_request(prompt):
    """Ask Gemini for code, then save it as a script or run it as a command."""
    reply = chat.send_message(f"{system_instruction}\nUser request: {prompt}")
    code = extract_code(reply.text)
    if not code:
        return jsonify({"result": "Error: No valid code generated", "type": "error"})

    # Heuristic: anything that looks like Python source is written to disk
    # (and echoed back to the client) instead of being executed.
    if "print(" in code or "def " in code or "import " in code:
        filename = "generated_script.py"
        filepath = os.path.join(current_dir, filename)
        # Python source is UTF-8 by default, so write UTF-8 explicitly rather
        # than relying on the platform's locale encoding.
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(code)
        return jsonify({
            "result": f"File created: {filename}",
            "type": "code",
            "file": filename,
            "content": code,
        })

    # Plain substring test, not a tokenised check: any snippet containing one
    # of these words is executed through the shell.
    if any(cmd in code for cmd in ("pip", "git", "cd", "mkdir")):
        result = execute_command(code)
        return jsonify({
            "result": f"Command executed:\n{result}",
            "type": "command",
        })

    return jsonify({"result": "Unsupported code type", "type": "error"})


@app.route("/execute", methods=["POST"])
def execute_code():
    """Interpret one console command posted as JSON and report the outcome.

    The request body is expected to be a JSON object with a ``code`` string.
    Recognised forms, checked in this order:

    * ``ai: <request>`` -- send the request to Gemini and save or run the
      generated code.
    * ``show files`` -- list the current directory.
    * ``cd <dir>`` -- change the current directory (resolved against the
      current one).
    * ``!<command>`` -- run the rest of the line through the shell.
    * ``pip install ...`` -- run pip via the current interpreter.
    * anything ending in ``.py`` -- run it with the current interpreter.

    Returns:
        A JSON response with a ``result`` string and a ``type`` tag. Failures
        are reported in the body with ``type`` set to ``"error"``.
    """
    global current_dir

    # Validate the payload up front: a missing/invalid JSON body or a
    # non-string "code" value is treated as "no command" instead of raising
    # outside the try block below.
    payload = request.get_json(silent=True)
    raw = payload.get("code", "") if isinstance(payload, dict) else ""
    command = raw.strip() if isinstance(raw, str) else ""
    if not command:
        return jsonify({"result": "Error: No command provided.", "type": "error"})

    try:
        if command.lower().startswith("ai:"):
            return _handle_ai_request(command[3:].strip())

        if command == "show files":
            files = os.listdir(current_dir)
            return jsonify({"result": "Files:\n" + "\n".join(files), "type": "files"})

        if command.startswith("cd "):
            # Strip so that "cd   name" resolves to "name".
            new_dir = os.path.join(current_dir, command[3:].strip())
            if os.path.isdir(new_dir):
                current_dir = os.path.abspath(new_dir)
                return jsonify({"result": f"Changed directory to: {current_dir}", "type": "directory"})
            return jsonify({"result": f"Error: Directory not found: {new_dir}", "type": "error"})

        if command.startswith("!"):
            result = execute_command(command[1:])
            return jsonify({"result": result, "type": "command"})

        if command.startswith("pip install"):
            # Run pip as a module of the interpreter serving this app so the
            # package lands in the same environment.
            result = execute_command(f"{sys.executable} -m {command}")
            return jsonify({"result": result, "type": "package"})

        if command.endswith(".py"):
            result = execute_command(f"{sys.executable} {command}")
            return jsonify({"result": result, "type": "script"})

        return jsonify({"result": "Unrecognized command", "type": "error"})

    except Exception as e:
        # Top-level boundary: report any failure to the client in the body.
        return jsonify({"result": f"Error: {str(e)}", "type": "error"})
|
|
@app.route("/save_file", methods=["POST"])
def save_file():
    """Write posted text to a file inside the current working directory.

    The request body is expected to be a JSON object with string fields
    ``filename`` (a path relative to the current directory) and ``content``.

    Returns:
        A JSON response with a ``result`` message and a ``type`` tag
        (``"file"`` on success, ``"error"`` otherwise).
    """
    try:
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        filename = payload.get("filename")
        content = payload.get("content")

        # Validate before opening: opening with "w" truncates an existing
        # file, so bad input must be rejected before anything is touched.
        if not isinstance(filename, str) or not filename.strip():
            return jsonify({"result": "Save error: filename is required", "type": "error"})
        if not isinstance(content, str):
            return jsonify({"result": "Save error: content must be a string", "type": "error"})

        # Confine the write to the current directory, matching /download,
        # which serves files from that directory only. Absolute paths and
        # ".." components that escape it are rejected.
        base = os.path.realpath(current_dir)
        filepath = os.path.realpath(os.path.join(base, filename))
        if filepath == base or os.path.commonpath([base, filepath]) != base:
            return jsonify({"result": f"Save error: invalid filename: {filename}", "type": "error"})

        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
        return jsonify({"result": f"File {filename} saved", "type": "file"})
    except Exception as e:
        return jsonify({"result": f"Save error: {str(e)}", "type": "error"})
|
|
@app.route("/cleanup", methods=["POST"])
def cleanup():
    """Delete the temporary workspace and replace it with a fresh one.

    Both ``temp_dir`` and ``current_dir`` end up pointing at the newly
    created directory. Any failure is reported in the JSON body.
    """
    global temp_dir, current_dir
    try:
        stale_dir = temp_dir
        if os.path.exists(stale_dir):
            shutil.rmtree(stale_dir)
        fresh_dir = tempfile.mkdtemp()
        temp_dir = fresh_dir
        current_dir = fresh_dir
        return jsonify({"result": "Environment reset", "type": "system"})
    except Exception as exc:
        return jsonify({"result": f"Cleanup error: {str(exc)}", "type": "error"})
|
|
@app.route("/list_files", methods=["GET"])
def list_files():
    """Return the names of the entries in the current working directory."""
    try:
        entries = os.listdir(current_dir)
        payload = {"files": entries, "type": "files"}
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"result": f"List error: {str(exc)}", "type": "error"})
|
|
@app.route("/download/<path:filename>", methods=["GET"])
def download_file(filename):
    """Send a file from the current working directory as an attachment.

    Any exception raised while preparing the response is reported in a JSON
    body instead.
    """
    try:
        attachment = send_from_directory(current_dir, filename, as_attachment=True)
        return attachment
    except Exception as exc:
        return jsonify({"result": f"Download error: {str(exc)}", "type": "error"})
|
|
if __name__ == "__main__":
    # When run as a script, listen on every network interface on port 7860.
    app.run(port=7860, host="0.0.0.0")