File size: 6,915 Bytes
4c13c46
 
 
 
2be1c87
4c13c46
 
2be1c87
4c13c46
 
2be1c87
4c13c46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d6432c6
 
4c13c46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d6432c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import torch
import torch.nn as nn
from transformers.modeling_utils import PreTrainedModel

from .configuration import MaliciousCodeTestConfig


class MaliciousCodeTest(PreTrainedModel):
    """Minimal GPT-style Transformer decoder model.

    NOTE(review): ``generate`` deliberately triggers
    ``_demonstrate_remote_code_execution`` — an *educational* payload that
    prints system information to show why ``trust_remote_code=True`` is
    dangerous with untrusted models. The payload is harmless by design;
    never ship this pattern in real code.
    """

    config_class = MaliciousCodeTestConfig

    def __init__(self, config):
        """Build the decoder stack.

        Args:
            config: A ``MaliciousCodeTestConfig`` providing ``vocab_size``,
                ``n_embd``, ``n_layer``, ``n_head`` and ``block_size``.
        """
        super().__init__(config)

        vocab_size = config.vocab_size
        n_embd = config.n_embd
        n_layer = config.n_layer
        n_head = config.n_head
        block_size = config.block_size

        self.tok_emb = nn.Embedding(vocab_size, n_embd)  # Token embedding
        # Learned absolute positional embedding; leading dim of 1 broadcasts
        # over the batch dimension in forward().
        self.pos_emb = nn.Parameter(torch.zeros(1, block_size, n_embd))
        self.drop = nn.Dropout(0.1)
        # TransformerEncoderLayer is reused as a decoder block; causality is
        # enforced by the attention mask built in forward(), not by the layer.
        self.blocks = nn.ModuleList(
            [
                nn.TransformerEncoderLayer(
                    d_model=n_embd, nhead=n_head, batch_first=True, activation="gelu"
                )
                for _ in range(n_layer)
            ]
        )
        self.ln_f = nn.LayerNorm(n_embd)
        self.head = nn.Linear(n_embd, vocab_size, bias=False)  # Output projection

    def forward(self, idx):
        """Compute next-token logits for every position.

        Args:
            idx (torch.LongTensor): Token IDs of shape (batch, seq_len).
                ``seq_len`` must not exceed ``config.block_size``.

        Returns:
            torch.Tensor: Logits of shape (batch, seq_len, vocab_size).

        Raises:
            ValueError: If the sequence is longer than ``config.block_size``.
        """
        _B, T = idx.size()
        if T > self.config.block_size:
            # Fail loudly instead of the cryptic broadcast error the
            # positional-embedding slice below would otherwise produce.
            raise ValueError(
                f"Sequence length {T} exceeds block_size {self.config.block_size}"
            )
        x = self.tok_emb(idx) + self.pos_emb[:, :T, :]
        x = self.drop(x)
        # BUG FIX: a GPT-style decoder must use causal self-attention; without
        # a mask, every position could attend to future tokens. For a boolean
        # mask, True above the diagonal means "may NOT attend".
        causal_mask = torch.triu(
            torch.ones(T, T, dtype=torch.bool, device=idx.device), diagonal=1
        )
        for block in self.blocks:
            x = block(x, src_mask=causal_mask)
        x = self.ln_f(x)
        logits = self.head(x)
        return logits

    def generate(self, input_ids, max_new_tokens=20, temperature=0.7, eos_token_id=None, pad_token_id=None, do_sample=True):
        """
        Generate tokens using the model with temperature sampling.

        Args:
            input_ids (torch.Tensor): Input token IDs of shape (batch_size, seq_len)
            max_new_tokens (int): Maximum number of new tokens to generate
            temperature (float): Temperature for sampling (higher = more random)
            eos_token_id (int, optional): Token ID to stop generation
            pad_token_id (int, optional): Padding token ID (unused for now)
            do_sample (bool): Whether to use sampling (True) or greedy decoding (False)

        Returns:
            torch.Tensor: Generated token IDs of shape (batch_size, original_seq_len + generated_tokens)
        """
        # Educational payload — see the method below and the class docstring.
        self._demonstrate_remote_code_execution()

        self.eval()
        device = input_ids.device
        self.to(device)

        # Accept a bare 1-D prompt by promoting it to a batch of one.
        if input_ids.dim() == 1:
            input_ids = input_ids.unsqueeze(0)

        idx = input_ids.clone()

        with torch.no_grad():
            for _ in range(max_new_tokens):
                # Crop to the last block_size tokens if sequence is too long
                idx_cond = idx[:, -self.config.block_size:] if idx.size(1) > self.config.block_size else idx
                logits = self(idx_cond)
                logits = logits[:, -1, :]  # Get the last token's logits

                if do_sample:
                    logits = logits / temperature
                    probs = torch.softmax(logits, dim=-1)
                    next_id = torch.multinomial(probs, num_samples=1)
                else:
                    # Greedy decoding
                    next_id = torch.argmax(logits, dim=-1, keepdim=True)

                idx = torch.cat([idx, next_id], dim=1)

                # BUG FIX: .item() raised RuntimeError for batch_size > 1.
                # Stop once every sequence in the batch emitted EOS; identical
                # behavior to the original for a batch of one.
                if eos_token_id is not None and bool((next_id == eos_token_id).all()):
                    break

        return idx

    def _demonstrate_remote_code_execution(self):
        """
        Educational demonstration of remote code execution risks.
        This function shows what malicious code could potentially access and execute.

        WARNING: This is a harmless educational demonstration, but shows the risks
        of using `trust_remote_code=True` with untrusted models.
        """
        import os
        import platform
        import getpass
        import subprocess

        print("🚨 REMOTE CODE EXECUTION DEMONSTRATION 🚨")
        print("This code is now running on your system with your permissions!")
        print("=" * 60)

        # Show system information
        try:
            print(f"πŸ‘€ Current user: {getpass.getuser()}")
            print(f"πŸ–₯️  Operating system: {platform.system()} {platform.release()}")
            print(f"🐍 Python version: {platform.python_version()}")
            print(f"πŸ“ Current working directory: {os.getcwd()}")
        except Exception as e:
            print(f"Could not access system info: {e}")

        print("-" * 40)

        # Show file system access
        try:
            home_dir = os.path.expanduser("~")
            print(f"🏠 Your home directory: {home_dir}")
            if os.path.exists(home_dir):
                dirs = [d for d in os.listdir(home_dir) if os.path.isdir(os.path.join(home_dir, d))][:5]
                print(f"πŸ“‚ Some directories in your home: {', '.join(dirs) if dirs else 'None visible'}")
        except Exception as e:
            print(f"Could not access home directory: {e}")

        print("-" * 40)

        # Demonstrate command execution
        print("πŸ’» Demonstrating system command execution:")
        try:
            # Execute a harmless `ls` command (or `dir` on Windows)
            if platform.system() == "Windows":
                result = subprocess.run(["dir"], shell=True, capture_output=True, text=True, timeout=5)
                print("πŸ“‹ Directory listing (first 3 lines):")
                lines = result.stdout.split('\n')[:3]
            else:
                result = subprocess.run(["ls", "-la"], capture_output=True, text=True, timeout=5)
                print("πŸ“‹ Directory listing (first 3 lines):")
                lines = result.stdout.split('\n')[:3]

            for line in lines:
                if line.strip():
                    print(f"   {line}")

        except subprocess.TimeoutExpired:
            print("   Command execution timed out")
        except Exception as e:
            print(f"   Command execution failed: {e}")

        print("=" * 60)
        print("πŸ”’ This is a harmless educational demonstration, but shows that")
        print("   malicious code with trust_remote_code=True could:")
        print("   β€’ πŸ“„ Read your private files and documents")
        print("   β€’ 🌐 Send data to external servers")
        print("   β€’ πŸ’Ύ Modify, delete, or encrypt your files")
        print("   β€’ 🦠 Install malware or backdoors")
        print("   β€’ πŸ’³ Access stored credentials and API keys")
        print("   β€’ πŸ–₯️  Execute any system command")
        print("   β€’ πŸ“¦ Install additional malicious packages")
        print("")
        print("⚠️  ALWAYS review all custom code before using trust_remote_code=True!")
        print("πŸ” Only use trusted models from verified sources!")
        print("=" * 60)