| from flask import Flask, request, jsonify |
| from flask_sqlalchemy import SQLAlchemy |
| from flask_migrate import Migrate |
| import logging |
| import json |
| import os |
| from sqlalchemy.exc import SQLAlchemyError |
| import random |
| import subprocess |
| import time |
|
|
# Flask application object that every route below is registered on.
app = Flask(__name__)

# Logging: timestamped records at INFO and above, plus a module-level logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Database settings: DATABASE_URL wins when it is set in the environment,
# otherwise a SQLite file named trojan.db is used.
app.config.update(
    SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URL', 'sqlite:///trojan.db'),
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
|
|
| |
class TrojanServer(db.Model):
    """Database record for one server endpoint.

    ``server_ip`` and ``server_port`` are mandatory; ``encryption_method``
    and ``deployment_method`` are nullable strings.
    """

    id = db.Column(db.Integer, primary_key=True)
    server_ip = db.Column(db.String(255), nullable=False)
    server_port = db.Column(db.Integer, nullable=False)
    encryption_method = db.Column(db.String(255))
    deployment_method = db.Column(db.String(255))

    def __repr__(self):
        """Return a short, log-friendly description of this record."""
        return (
            f"<TrojanServer id={self.id} "
            f"{self.server_ip}:{self.server_port} "
            f"deployment_method={self.deployment_method!r}>"
        )
|
|
class TrojanClient(db.Model):
    """Database record for one client.

    ``config`` is a mandatory JSON document; ``deployment_method`` is a
    nullable string.
    """

    id = db.Column(db.Integer, primary_key=True)
    config = db.Column(db.JSON, nullable=False)
    deployment_method = db.Column(db.String(255))

    def __repr__(self):
        """Return a short, log-friendly description of this record."""
        # The config document is left out on purpose: it can be arbitrarily large.
        return (
            f"<TrojanClient id={self.id} "
            f"deployment_method={self.deployment_method!r}>"
        )
|
|
| |
@app.route('/servers', methods=['GET', 'POST'])
def manage_trojan_servers():
    """List the stored server records or create a new one.

    POST expects a JSON object containing ``server_ip`` (non-empty string),
    ``server_port`` (integer in 1..65535) and ``deployment_method``;
    ``encryption_method`` is optional. Every other method that reaches this
    view (GET, and the HEAD requests Flask routes to GET views) returns all
    stored records as a JSON array.

    Returns:
        A Flask response or a ``(response, status)`` tuple: 201 with the new
        row id on creation, 400 for a malformed payload, 500 when the
        database raises ``SQLAlchemyError``.
    """
    if request.method == 'POST':
        data = request.get_json()
        required = ('server_ip', 'server_port', 'deployment_method')
        # A JSON string or list would satisfy naive ``in`` checks or fail on
        # indexing, so insist on an object before looking at its keys.
        payload_ok = isinstance(data, dict) and all(key in data for key in required)
        if payload_ok:
            ip, port = data['server_ip'], data['server_port']
            # bool is a subclass of int, so it has to be excluded explicitly.
            payload_ok = (
                isinstance(ip, str) and bool(ip.strip())
                and isinstance(port, int) and not isinstance(port, bool)
                and 1 <= port <= 65535
            )
        if not payload_ok:
            logger.error("Invalid input for creating server")
            return jsonify({'message': 'Invalid input for creating server'}), 400
        try:
            new_server = TrojanServer(
                server_ip=data['server_ip'],
                server_port=data['server_port'],
                encryption_method=data.get('encryption_method'),
                deployment_method=data['deployment_method'],
            )
            db.session.add(new_server)
            db.session.commit()
            logger.info(f"Created new trojan server: {new_server.server_ip}:{new_server.server_port}")
            return jsonify({'message': 'Trojan server created successfully', 'id': new_server.id}), 201
        except SQLAlchemyError as e:
            db.session.rollback()
            logger.exception(f"Database error creating server: {str(e)}")
            # NOTE(review): the raw database error text is returned to the
            # caller; kept for response compatibility, consider removing it.
            return jsonify({'message': 'Error creating trojan server', 'error': str(e)}), 500

    # GET and HEAD: the original if/elif fell through for HEAD and returned
    # None, which Flask turns into a 500.
    try:
        servers = TrojanServer.query.all()
        return jsonify([
            {
                'id': s.id,
                'server_ip': s.server_ip,
                'server_port': s.server_port,
                'encryption_method': s.encryption_method,
                'deployment_method': s.deployment_method,
            }
            for s in servers
        ])
    except SQLAlchemyError as e:
        logger.exception(f"Database error listing servers: {str(e)}")
        return jsonify({'message': 'Error listing trojan servers', 'error': str(e)}), 500
|
|
@app.route('/clients', methods=['GET', 'POST'])
def manage_trojan_clients():
    """List the stored client records or create a new one.

    POST expects a JSON object containing ``config`` and
    ``deployment_method``. Every other method that reaches this view (GET,
    and the HEAD requests Flask routes to GET views) returns all stored
    records as a JSON array.

    Returns:
        A Flask response or a ``(response, status)`` tuple: 201 with the new
        row id on creation, 400 for a malformed payload, 500 when the
        database raises ``SQLAlchemyError``.
    """
    if request.method == 'POST':
        data = request.get_json()
        # A JSON string or list would satisfy naive ``in`` checks or fail on
        # indexing, so insist on an object before looking at its keys.
        if (
            not isinstance(data, dict)
            or 'config' not in data
            or 'deployment_method' not in data
        ):
            logger.error("Invalid input for creating client")
            return jsonify({'message': 'Invalid input for creating client'}), 400
        try:
            new_client = TrojanClient(
                config=data['config'],
                deployment_method=data['deployment_method'],
            )
            db.session.add(new_client)
            db.session.commit()
            logger.info(f"Created new trojan client: {new_client.id}")
            return jsonify({'message': 'Trojan client created successfully', 'id': new_client.id}), 201
        except SQLAlchemyError as e:
            db.session.rollback()
            logger.exception(f"Database error creating client: {str(e)}")
            # NOTE(review): the raw database error text is returned to the
            # caller; kept for response compatibility, consider removing it.
            return jsonify({'message': 'Error creating trojan client', 'error': str(e)}), 500

    # GET and HEAD: the original if/elif fell through for HEAD and returned
    # None, which Flask turns into a 500.
    try:
        clients = TrojanClient.query.all()
        return jsonify([
            {'id': c.id, 'config': c.config, 'deployment_method': c.deployment_method}
            for c in clients
        ])
    except SQLAlchemyError as e:
        logger.exception(f"Database error listing clients: {str(e)}")
        return jsonify({'message': 'Error listing trojan clients', 'error': str(e)}), 500
|
|
@app.route('/generate', methods=['POST'])
def generate_trojan_config_api():
    """Return a configuration produced by ``generate_trojan_config``.

    The request body must be a JSON object with a non-empty ``goal``; an
    optional ``constraints`` value (default ``{}``) is passed through to the
    generator.

    Returns:
        The generated configuration as JSON, a 400 response when ``goal`` is
        missing or the body is not a JSON object, or a 500 response when the
        generator raises.
    """
    data = request.get_json()

    # The body may legally parse to None, a list or a scalar; calling .get()
    # on those raised AttributeError and surfaced as an unhandled 500.
    goal = data.get('goal') if isinstance(data, dict) else None
    if not goal:
        logger.error("AI goal is required")
        return jsonify({'message': 'AI goal is required'}), 400

    constraints = data.get('constraints', {})

    try:
        logger.info(f"Generating trojan config with AI. Goal: {goal}, Constraints: {constraints}")
        ai_config = generate_trojan_config(goal, constraints)
        logger.info(f"AI generated config: {ai_config}")
        return jsonify(ai_config)
    except Exception as e:
        # Top-level request boundary: log with traceback and report a 500.
        logger.exception(f"Error generating trojan config with AI: {str(e)}")
        return jsonify({'message': 'Error generating trojan config with AI', 'error': str(e)}), 500
|
|
@app.route('/deploy/<int:trojan_id>', methods=['POST'])
def deploy_trojan_api(trojan_id):
    """Run ``deploy_trojan`` for ``trojan_id`` and report its outcome.

    Args:
        trojan_id: Integer record id taken from the URL.

    Returns:
        200 with the feedback dict when its ``status`` is ``'success'``;
        500 with the feedback dict when ``deploy_trojan`` reports any other
        status; 500 with the error text when an exception escapes it.
    """
    try:
        logger.info(f"Deploying trojan with ID: {trojan_id}")
        deployment_feedback = deploy_trojan(trojan_id)

        # deploy_trojan signals failure (unknown id, invalid method, non-zero
        # exit) through the returned dict rather than by raising, so the
        # status has to be inspected before claiming success.
        if deployment_feedback.get('status') != 'success':
            logger.error(
                "Trojan %s deployment failed: %s",
                trojan_id,
                deployment_feedback.get('message'),
            )
            return jsonify({'message': 'Trojan deployment failed', 'feedback': deployment_feedback}), 500

        logger.info(f"Trojan {trojan_id} deployed successfully.")
        return jsonify({'message': 'Trojan deployed successfully', 'feedback': deployment_feedback})
    except subprocess.CalledProcessError as e:
        logger.error(f"Subprocess error deploying trojan: {str(e)}")
        return jsonify({'message': f'Subprocess error deploying trojan: {str(e)}', 'error': str(e)}), 500
    except Exception as e:
        # Top-level request boundary: anything else becomes a 500.
        logger.error(f"Error deploying trojan: {str(e)}")
        return jsonify({'message': f'Error deploying trojan: {str(e)}', 'error': str(e)}), 500
|
|
def generate_trojan_config(goal, constraints):
    """Build a randomly chosen configuration dictionary.

    Args:
        goal: Accepted for interface compatibility; not read by this function.
        constraints: Stored unchanged under ``client_config['custom_data']``.

    Returns:
        A dict with ``server_ip`` (``192.168.x.y``, each of x and y in
        1..254), ``server_port`` (1024..65535), ``encryption_method``,
        ``client_config`` and ``deployment_method``. The first three values
        are repeated inside ``client_config``.
    """
    # Draw order matters for reproducibility under a seeded generator:
    # two address octets, the port, the cipher, then the deployment method.
    third_octet = random.randint(1, 254)
    fourth_octet = random.randint(1, 254)
    address = "192.168.%d.%d" % (third_octet, fourth_octet)
    port = random.randint(1024, 65535)
    cipher = random.choice(['AES-256', 'ChaCha20', 'RSA'])
    rollout = random.choice(['ssh', 'powershell', 'manual'])

    # Fields shared by the top-level result and the nested client config.
    endpoint = {
        'server_ip': address,
        'server_port': port,
        'encryption_method': cipher,
    }

    client_config = {
        **endpoint,
        'custom_data': constraints,
        'trojan_version': '1.2.3',
        'os': 'windows',
        'arch': 'x64',
    }

    return {
        **endpoint,
        'client_config': client_config,
        'deployment_method': rollout,
    }
|
|
def deploy_trojan(trojan_id):
    """Run the deployment command configured for the record ``trojan_id``.

    The id is looked up in ``TrojanServer`` first and in ``TrojanClient``
    only when no server row matches, so a server row shadows a client row
    with the same id.

    Args:
        trojan_id: Primary-key value to look up.

    Returns:
        A feedback dict with ``status`` (``'success'`` or ``'error'``) and
        ``message``. When a command was run, ``details`` holds its decoded
        ``stdout`` and ``stderr``; when the command could not be started,
        ``details`` is empty. Lookup failures and unknown deployment methods
        return a dict without ``details``.
    """
    # NOTE(review): fixed one-second delay kept from the original; its
    # purpose is not evident from this file - confirm it is still needed.
    time.sleep(1)

    trojan = TrojanServer.query.get(trojan_id) or TrojanClient.query.get(trojan_id)
    if not trojan:
        return {'status': 'error', 'message': f'Trojan with ID {trojan_id} not found.'}

    deployment_method = trojan.deployment_method

    if isinstance(trojan, TrojanServer):
        target = f"{trojan.server_ip}:{trojan.server_port}"
    else:
        target = "client"

    # Interpreter and script for each script-based deployment method.
    script_launchers = {
        'ssh': ['bash', 'deploy_ssh.sh'],
        'powershell': ['powershell', 'deploy_powershell.ps1'],
    }

    if deployment_method in script_launchers:
        # Only records with a ``config`` attribute pass a JSON document;
        # others pass an empty string as the final argument.
        config_arg = json.dumps(trojan.config) if hasattr(trojan, 'config') else ''
        command = script_launchers[deployment_method] + [target, config_arg]
    elif deployment_method == 'manual':
        command = ['echo', 'Manual deployment required for', target]
    else:
        return {'status': 'error', 'message': f'Invalid deployment method: {deployment_method}'}

    try:
        completed = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except Exception as e:
        # Typically the executable is missing. Callers rely on always
        # receiving a feedback dict from this stage, so nothing is re-raised.
        return {
            'status': 'error',
            'message': f'Error deploying trojan {trojan_id}: {str(e)}',
            'details': {}
        }

    succeeded = completed.returncode == 0
    outcome = 'deployed successfully' if succeeded else 'deployment failed'
    return {
        'status': 'success' if succeeded else 'error',
        'message': f'Trojan {trojan_id} {outcome} using {deployment_method}.',
        'details': {
            # errors='replace' keeps non-UTF-8 output from raising and
            # discarding the result of a run that already finished.
            'stdout': completed.stdout.decode('utf-8', errors='replace'),
            'stderr': completed.stderr.decode('utf-8', errors='replace')
        }
    }
|
|
if __name__ == '__main__':
    # Create any missing tables before serving requests.
    with app.app_context():
        db.create_all()

    # Debug mode enables the Werkzeug interactive debugger, which lets anyone
    # who can reach the port execute code. It is therefore opt-in through the
    # FLASK_DEBUG environment variable instead of being hard-coded on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled)
|
|