Bc-AI commited on
Commit
af68acb
·
verified ·
1 Parent(s): a774e49

Upload folder using huggingface_hub

Browse files
Dockerfile ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
FROM python:3.10-slim

WORKDIR /app

# Install system dependencies (compilers needed to build native wheels)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first to leverage Docker cache
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY worker_app.py .
COPY model_architecture.py .
COPY model_manager.py .
# FIX: "COPY ../shared" is invalid -- Docker forbids COPY sources outside the
# build context, so the original line made every build fail. The shared/
# directory ships inside this repo/build context, so copy it from there.
COPY shared ./shared

# Expose port for the API
EXPOSE 8000

# Start the application
CMD ["python", "worker_app.py"]
README.md CHANGED
@@ -1,10 +1,12 @@
1
- ---
2
- title: Worker Universal
3
- emoji: 📉
4
- colorFrom: pink
5
- colorTo: blue
6
- sdk: docker
7
- pinned: false
8
- ---
9
-
10
- Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
 
 
 
1
+ # SACCP Worker_Universal Node
2
+ This is a worker_universal node in the SACCP (Scalable Accelerated Compute Protocol) distributed computing network.
3
+
4
+ ## Node Type: WORKER_UNIVERSAL
5
+ - Processes tasks according to SACCP protocol
6
+ - Contributes computational resources to the network
7
+ - Earns cloud credits for resource contribution
8
+
9
+ ## Architecture
10
+ - Built with FastAPI and TensorFlow/Keras
11
+ - Implements fault-tolerant operations
12
+ - Integrated with SACCP credit system
model_architecture.py ADDED
@@ -0,0 +1,205 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import tensorflow as tf
2
+ import keras
3
+ import numpy as np
4
+
5
@keras.saving.register_keras_serializable()
class RotaryEmbedding(keras.layers.Layer):
    """Rotary positional embedding (RoPE) applied to query/key tensors.

    Lazily caches cos/sin tables of shape (max_len, dim) on first call.
    """

    def __init__(self, dim, max_len=2048, theta=10000, **kwargs):
        super().__init__(**kwargs)
        self.dim = dim          # per-head dimension (split in half by rotate_half, so should be even)
        self.max_len = max_len  # maximum sequence length covered by the cache
        self.theta = theta      # RoPE base frequency
        self.built_cache = False
        self.cos_cached = None
        self.sin_cached = None

    def build(self, input_shape):
        super().build(input_shape)

    def _build_cache(self):
        """Build the cos/sin lookup tables once.

        FIX: the original materialized the tables via emb.numpy() and
        np.cos/np.sin, which only works in eager mode and forces a
        device->host round trip. tf.cos/tf.sin compute identical values
        and also work inside tf.function graphs.
        """
        if not self.built_cache:
            inv_freq = 1.0 / (self.theta ** (tf.range(0, self.dim, 2, dtype=tf.float32) / self.dim))
            t = tf.range(self.max_len, dtype=tf.float32)
            freqs = tf.einsum("i,j->ij", t, inv_freq)
            emb = tf.concat([freqs, freqs], axis=-1)  # (max_len, dim)
            self.cos_cached = tf.cos(emb)
            self.sin_cached = tf.sin(emb)
            self.built_cache = True

    def rotate_half(self, x):
        """Rotate the last dimension: (x1, x2) -> (-x2, x1)."""
        x1, x2 = tf.split(x, 2, axis=-1)
        return tf.concat([-x2, x1], axis=-1)

    def call(self, q, k, offset=0):
        """Apply rotary embeddings with position offset.

        q, k: (batch, heads, seq, head_dim) -- seq is axis 2, per the
        tf.shape(q)[2] below. `offset` shifts positions (for cached decoding).
        """
        self._build_cache()
        seq_len = tf.shape(q)[2]
        dtype = q.dtype

        cos = tf.cast(self.cos_cached[offset:offset + seq_len, :], dtype)[None, None, :, :]
        sin = tf.cast(self.sin_cached[offset:offset + seq_len, :], dtype)[None, None, :, :]

        q_embed = (q * cos) + (self.rotate_half(q) * sin)
        k_embed = (k * cos) + (self.rotate_half(k) * sin)
        return q_embed, k_embed

    def get_config(self):
        config = super().get_config()
        config.update({"dim": self.dim, "max_len": self.max_len, "theta": self.theta})
        return config
50
+
51
+
52
@keras.saving.register_keras_serializable()
class RMSNorm(keras.layers.Layer):
    """Root-mean-square layer normalization with a learned per-feature scale."""

    def __init__(self, epsilon=1e-5, **kwargs):
        super().__init__(**kwargs)
        self.epsilon = epsilon  # numerical stabilizer added under the rsqrt
        self.scale = None       # created in build()

    def build(self, input_shape):
        # One learned gain per feature on the last axis.
        self.scale = self.add_weight(name="scale", shape=(input_shape[-1],), initializer="ones")
        super().build(input_shape)

    def call(self, x):
        # Normalize by the RMS over the feature axis, then apply the gain.
        mean_square = tf.reduce_mean(x * x, axis=-1, keepdims=True)
        normalized = x * tf.math.rsqrt(mean_square + self.epsilon)
        return normalized * self.scale

    def get_config(self):
        config = super().get_config()
        config.update({"epsilon": self.epsilon})
        return config
71
+
72
+
73
@keras.saving.register_keras_serializable()
class TransformerBlock(keras.layers.Layer):
    """Pre-norm decoder block: RMSNorm -> causal MHA with RoPE -> residual,
    then RMSNorm -> SwiGLU FFN -> residual."""

    def __init__(self, d_model, n_heads, ff_dim, dropout, max_len, rope_theta, layer_idx=0, **kwargs):
        super().__init__(**kwargs)
        self.d_model = d_model          # model (embedding) width
        self.n_heads = n_heads          # number of attention heads
        self.ff_dim = ff_dim            # FFN inner width
        self.dropout_rate = dropout     # dropout applied after attention and FFN
        self.max_len = max_len          # passed to RotaryEmbedding cache
        self.rope_theta = rope_theta    # RoPE base frequency
        self.head_dim = d_model // n_heads
        self.layer_idx = layer_idx      # position of this block in the stack (serialized only)

    def build(self, input_shape):
        # All projections are bias-free, LLaMA-style.
        self.pre_attn_norm = RMSNorm(name="pre_attn_norm")
        self.pre_ffn_norm = RMSNorm(name="pre_ffn_norm")
        self.q_proj = keras.layers.Dense(self.d_model, use_bias=False, name="q_proj")
        self.k_proj = keras.layers.Dense(self.d_model, use_bias=False, name="k_proj")
        self.v_proj = keras.layers.Dense(self.d_model, use_bias=False, name="v_proj")
        self.out_proj = keras.layers.Dense(self.d_model, use_bias=False, name="o_proj")
        self.rope = RotaryEmbedding(self.head_dim, max_len=self.max_len, theta=self.rope_theta)
        # SwiGLU FFN: down(silu(gate(x)) * up(x))
        self.gate_proj = keras.layers.Dense(self.ff_dim, use_bias=False, name="gate_proj")
        self.up_proj = keras.layers.Dense(self.ff_dim, use_bias=False, name="up_proj")
        self.down_proj = keras.layers.Dense(self.d_model, use_bias=False, name="down_proj")
        self.dropout = keras.layers.Dropout(self.dropout_rate)
        super().build(input_shape)

    def call(self, x, training=None, past_kv=None, use_cache=False):
        """Simplified call without KV cache for this example"""
        # past_kv/use_cache are accepted for interface compatibility but ignored here.
        B, T, D = tf.shape(x)[0], tf.shape(x)[1], self.d_model
        dtype = x.dtype

        res = x
        y = self.pre_attn_norm(x)

        # Multi-head attention: project then reshape to (B, heads, T, head_dim)
        q = tf.transpose(tf.reshape(self.q_proj(y), [B, T, self.n_heads, self.head_dim]), [0, 2, 1, 3])
        k = tf.transpose(tf.reshape(self.k_proj(y), [B, T, self.n_heads, self.head_dim]), [0, 2, 1, 3])
        v = tf.transpose(tf.reshape(self.v_proj(y), [B, T, self.n_heads, self.head_dim]), [0, 2, 1, 3])

        # Apply RoPE (offset=0 since there is no cache in this simplified path)
        q, k = self.rope(q, k, offset=0)

        # Attention scores, scaled by sqrt(head_dim)
        scores = tf.matmul(q, k, transpose_b=True) / tf.sqrt(tf.cast(self.head_dim, dtype))

        # Causal mask: band_part(ones, -1, 0) keeps the LOWER triangle
        # (position i may attend to j <= i); disallowed entries get -1e9.
        mask = tf.linalg.band_part(tf.ones([T, T], dtype=dtype), -1, 0)
        mask = tf.where(mask == 0, tf.constant(-1e9, dtype=dtype), tf.constant(0.0, dtype=dtype))
        scores = scores + mask[None, None, :, :]

        attn = tf.nn.softmax(scores, axis=-1)
        attn_out = tf.matmul(attn, v)
        attn_out = tf.transpose(attn_out, [0, 2, 1, 3])   # back to (B, T, heads, head_dim)
        attn_out = tf.reshape(attn_out, [B, T, self.d_model])

        # Residual connection around the attention sub-layer
        x = res + self.dropout(self.out_proj(attn_out), training=training)

        # FFN (SwiGLU) with its own pre-norm and residual
        res = x
        y = self.pre_ffn_norm(x)
        ffn = self.down_proj(keras.activations.silu(self.gate_proj(y)) * self.up_proj(y))
        output = res + self.dropout(ffn, training=training)

        return output, None  # Return None for past_kv in this simplified version

    def get_config(self):
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "n_heads": self.n_heads,
            "ff_dim": self.ff_dim,
            "dropout": self.dropout_rate,
            "max_len": self.max_len,
            "rope_theta": self.rope_theta,
            "layer_idx": self.layer_idx
        })
        return config
151
+
152
+
153
@keras.saving.register_keras_serializable()
class SAM1Model(keras.Model):
    """Decoder-only LM: token embedding -> N TransformerBlocks -> RMSNorm -> LM head.

    The constructor accepts its configuration in three forms so both direct
    construction and keras deserialization (which passes config=...) work.
    """

    def __init__(self, **kwargs):
        super().__init__()
        # Resolve the config dict, in priority order:
        # 1) SAM1Model(config={...}) -- the form produced by get_config()
        # 2) SAM1Model(vocab_size=..., d_model=..., ...) -- flat kwargs
        # 3) SAM1Model(cfg={...}) -- legacy fallback; else use kwargs as-is
        if 'config' in kwargs and isinstance(kwargs['config'], dict):
            self.cfg = kwargs['config']
        elif 'vocab_size' in kwargs:
            self.cfg = kwargs
        else:
            self.cfg = kwargs.get('cfg', kwargs)

        self.embed = keras.layers.Embedding(self.cfg['vocab_size'], self.cfg['d_model'], name="embed_tokens")
        # FFN inner width is expressed as a multiplier of d_model
        ff_dim = int(self.cfg['d_model'] * self.cfg['ff_mult'])
        block_args = {
            'd_model': self.cfg['d_model'],
            'n_heads': self.cfg['n_heads'],
            'ff_dim': ff_dim,
            'dropout': self.cfg['dropout'],
            'max_len': self.cfg['max_len'],
            'rope_theta': self.cfg['rope_theta']
        }
        self.blocks = [
            TransformerBlock(name=f"block_{i}", layer_idx=i, **block_args)
            for i in range(self.cfg['n_layers'])
        ]
        self.norm = RMSNorm(name="final_norm")
        # NOTE: lm_head is a separate Dense; weights are NOT tied to the embedding.
        self.lm_head = keras.layers.Dense(self.cfg['vocab_size'], use_bias=False, name="lm_head")

    def call(self, input_ids, training=None, past_kv=None, use_cache=False):
        """
        Simplified call without full KV cache implementation

        Returns (logits, None); past_kv/use_cache are accepted but ignored.
        """
        x = self.embed(input_ids)

        for block in self.blocks:
            x, _ = block(x, training=training, past_kv=None, use_cache=False)

        logits = self.lm_head(self.norm(x))
        return logits, None  # Return None for past_kv in this simplified version

    def get_config(self):
        base_config = super().get_config()
        base_config['config'] = self.cfg
        return base_config
197
+
198
+
199
def count_parameters(model):
    """Return the total number of scalar parameters in *model*.

    Counts from each weight's static shape instead of calling
    ``weight.numpy().size`` as the original did -- that pulled every full
    weight tensor to host memory just to read its element count. The
    product of an empty shape is 1, so scalar weights are counted correctly.
    (Assumes the model is built, i.e. shapes are fully defined.)
    """
    total_params = 0
    for weight in model.weights:
        total_params += int(np.prod(weight.shape))
    return total_params
model_manager.py ADDED
@@ -0,0 +1,167 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import tensorflow as tf
4
+ import keras
5
+ import numpy as np
6
+ from tokenizers import Tokenizer
7
+ from huggingface_hub import hf_hub_download
8
+ from transformers import GPT2Tokenizer
9
+ import threading
10
+ from typing import Dict, Optional
11
+
12
+ from model_architecture import SAM1Model
13
+
14
class ModelManager:
    """
    Manages multiple models and their loading/unloading based on demand

    Caches loaded models, tokenizers, and raw configs per model type and
    serializes loading through a single lock.
    """

    def __init__(self):
        # Caches keyed by model type ("sam-x-nano", ...)
        self.models: Dict[str, keras.Model] = {}
        self.tokenizers: Dict[str, Tokenizer] = {}
        self.model_configs: Dict[str, dict] = {}
        self.lock = threading.Lock()  # guards get_model's load-if-missing path

        # Model mapping: local model type -> Hugging Face repo id
        self.model_repos = {
            "sam-x-nano": "Smilyai-labs/Sam-nano",
            "sam-x-mini": "Smilyai-labs/Sam-mini",
            "sam-x-fast": "Smilyai-labs/Sam-fast",
            "sam-x-large": "Smilyai-labs/Sam-large-2",  # Using Sam-large-2 as the large model
            "sam-large-2": "Smilyai-labs/Sam-large-2"
        }

        # Performance optimizations that should be applied before TF import
        # NOTE(review): tensorflow is imported at module top, so by the time
        # this runs the TF_NUM_*_THREADS / CUDA_VISIBLE_DEVICES /
        # TF_ENABLE_ONEDNN_OPTS variables are set too late to be read by TF --
        # they likely have no effect here. Move them before `import tensorflow`
        # (e.g. into the launcher) to make them effective.
        NUM_CORES = os.cpu_count() or 4
        os.environ['TF_NUM_INTEROP_THREADS'] = str(NUM_CORES)
        os.environ['TF_NUM_INTRAOP_THREADS'] = str(NUM_CORES)
        os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # Force CPU only for consistency
        os.environ['TF_ENABLE_ONEDNN_OPTS'] = '1'  # Intel optimization
        os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Reduce TF logging

        # Configure TF threading
        # NOTE(review): these raise RuntimeError if the TF context was already
        # initialized before this constructor runs -- confirm call order.
        tf.config.threading.set_inter_op_parallelism_threads(NUM_CORES)
        tf.config.threading.set_intra_op_parallelism_threads(NUM_CORES)

        print(f"✅ CPU optimized: {NUM_CORES} threads, oneDNN enabled")

    def get_model_repo(self, model_type: str) -> str:
        """Get the Hugging Face repository for a given model type.

        Unknown types fall back to the "sam-x-large" repo.
        """
        return self.model_repos.get(model_type, self.model_repos["sam-x-large"])

    def load_tokenizer(self, model_type: str) -> Tokenizer:
        """Load tokenizer for a specific model type (cached after first load).

        Builds a GPT-2 tokenizer, adds model-specific special tokens, and
        converts it to a fast `tokenizers.Tokenizer` via a temp save dir.
        Raises whatever the underlying load raises, after logging it.
        """
        if model_type in self.tokenizers:
            return self.tokenizers[model_type]

        print(f"🚀 Loading tokenizer for {model_type}...")

        try:
            # Load base tokenizer
            from transformers import AutoTokenizer
            hf_tokenizer = AutoTokenizer.from_pretrained("gpt2")

            # Add special tokens specific to your models
            # NOTE(review): the four "\n" entries are duplicates -- the
            # tokenizer will collapse them to a single added token.
            special_tokens = [
                "\n", "\n", "\n", "\n",
                "<CONTINUE>",
                "<im end for model tun>"
            ]
            hf_tokenizer.add_special_tokens({"additional_special_tokens": special_tokens})

            # Save temporarily to create tokenizers instance
            # NOTE(review): the temp directory is never cleaned up.
            os.makedirs(f"./temp_tokenizer_{model_type}", exist_ok=True)
            hf_tokenizer.save_pretrained(f"./temp_tokenizer_{model_type}")
            tokenizer = Tokenizer.from_file(f"./temp_tokenizer_{model_type}/tokenizer.json")

            print(f"✅ Tokenizer loaded for {model_type} with vocab size: {tokenizer.get_vocab_size()}")

            self.tokenizers[model_type] = tokenizer
            return tokenizer

        except Exception as e:
            print(f"❌ Error loading tokenizer for {model_type}: {e}")
            raise

    def load_model(self, model_type: str) -> keras.Model:
        """Load a specific model by type (cached after first load).

        Downloads config.json from the HF repo, builds a SAM1Model from it,
        then best-effort loads model.weights.h5 (random init if missing) and
        runs a warmup forward pass. Raises on config/build failure.
        """
        if model_type in self.models:
            return self.models[model_type]

        print(f"🚀 Loading {model_type} model...")

        try:
            # Get the appropriate model repo
            model_repo = self.get_model_repo(model_type)
            cache_dir = f"./model_cache/{model_type}"

            # Download config
            config_path = hf_hub_download(model_repo, "config.json", cache_dir=cache_dir)
            with open(config_path, 'r') as f:
                config = json.load(f)

            # Store model config
            self.model_configs[model_type] = config

            # Build model from config, translating HF config keys to SAM1Model's
            # names; defaults approximate a GPT-2-small-sized model.
            model_config = {
                'vocab_size': config.get('vocab_size', 50432),
                'd_model': config.get('hidden_size', 768),
                'n_layers': config.get('num_hidden_layers', 12),
                'n_heads': config.get('num_attention_heads', 12),
                'ff_mult': config.get('intermediate_size', 3072) / config.get('hidden_size', 768),
                'max_len': config.get('max_position_embeddings', 2048),
                'dropout': 0.1,
                'rope_theta': config.get('rope_theta', 10000)
            }

            model = SAM1Model(config=model_config)

            # Build model with dummy input (creates all weights before loading)
            dummy_input = tf.zeros((1, 16), dtype=tf.int32)
            _ = model(dummy_input, training=False, use_cache=False)

            print(f"✅ Model {model_type} loaded: {config.get('num_hidden_layers', 12)} layers")

            # Try to load weights; deliberately best-effort -- a missing weights
            # file leaves the model randomly initialized rather than failing.
            try:
                weights_path = hf_hub_download(model_repo, "model.weights.h5", cache_dir=cache_dir)
                model.load_weights(weights_path)
                print(f"✅ Model weights loaded successfully for {model_type}!")
            except Exception as e:
                print(f"⚠️ Could not load weights for {model_type}, using random initialization: {e}")

            # Warm up the model
            print(f"🔥 Warming up model {model_type}...")
            warmup_input = tf.constant([[1, 2, 3, 4, 5]], dtype=tf.int32)
            _, _ = model(warmup_input, training=False, use_cache=True)
            print(f"✅ Model {model_type} warmed up")

            # Store the model
            self.models[model_type] = model
            return model

        except Exception as e:
            print(f"❌ Error loading model {model_type}: {e}")
            raise

    def get_model(self, model_type: str) -> tuple:
        """Get (model, tokenizer, raw_config) for a type, loading if necessary.

        Thread-safe: the whole load-if-missing sequence runs under self.lock.
        """
        with self.lock:
            # Ensure tokenizer is loaded
            if model_type not in self.tokenizers:
                self.load_tokenizer(model_type)

            # Ensure model is loaded
            if model_type not in self.models:
                self.load_model(model_type)

            return self.models[model_type], self.tokenizers[model_type], self.model_configs[model_type]

    def list_available_models(self) -> list:
        """Get list of available model types"""
        return list(self.model_repos.keys())

    def is_model_loaded(self, model_type: str) -> bool:
        """Check if a model is currently loaded"""
        return model_type in self.models
requirements.txt ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Requirements for Worker Nodes
2
+ keras==2.15.0
3
+ tensorflow==2.15.0
4
+ fastapi==0.104.1
5
+ uvicorn==0.24.0
6
+ requests==2.31.0
7
+ huggingface_hub==0.20.1
8
+ tokenizers==0.15.0
9
+ transformers==4.35.2
10
+ numpy==1.24.3
11
+ pytz==2023.3.post1
shared/approval_system.py ADDED
@@ -0,0 +1,168 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Smilyai Approval System for SACCP Network
3
+ Handles approval of HEAD nodes and other security features
4
+ """
5
+
6
+ from enum import Enum
7
+ from pydantic import BaseModel
8
+ from typing import Optional, List, Dict, Any
9
+ import time
10
+ import uuid
11
+
12
+
13
class ApprovalStatus(str, Enum):
    """Lifecycle states of an approval request."""
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    REVOKED = "revoked"    # previously approved, later withdrawn


class ApprovalType(str, Enum):
    """Kinds of actions that require smilyai approval."""
    HEAD_NODE = "head_node"
    SPECIAL_ACCESS = "special_access"
    RESOURCE_INTENSIVE_TASK = "resource_intensive_task"
+
25
+
26
class ApprovalRequest(BaseModel):
    """Request for smilyai approval"""
    request_id: str                  # unique id, generated by request_approval()
    node_id: str                     # node asking for approval
    endpoint: str                    # node's network endpoint
    request_type: ApprovalType
    request_data: Dict[str, Any]     # free-form payload (endpoint/capabilities for HEAD nodes)
    reason: str                      # human-readable justification
    requested_at: int                # unix timestamp (int(time.time()))
    requested_by: str  # User or system that requested


class ApprovalResponse(BaseModel):
    """Response to an approval request"""
    request_id: str
    status: ApprovalStatus
    approved_by: Optional[str] = None        # reviewer id; set when reviewed
    approved_at: Optional[int] = None        # unix timestamp; only set on approval
    rejection_reason: Optional[str] = None   # only set on rejection
    notes: Optional[str] = None
+
47
+
48
class SmilyaiApprovalSystem:
    """System for managing smilyai approvals.

    In-memory store of approval requests/responses plus the set of approved
    node ids. Not persisted and not thread-safe; state is lost on restart.
    """

    def __init__(self):
        self.approval_requests: Dict[str, ApprovalRequest] = {}
        self.approval_responses: Dict[str, ApprovalResponse] = {}
        self.approved_nodes: set = set()
        self.approval_rules: List[Dict[str, Any]] = []  # reserved for future rule-based review

    def request_approval(self, node_id: str, endpoint: str, request_type: ApprovalType,
                         request_data: Dict[str, Any], reason: str, requested_by: str) -> str:
        """Request smilyai approval for an action.

        Returns the generated request id. HEAD-node requests that pass the
        basic requirement check are auto-approved immediately; everything
        else stays pending until review_approval_request() is called.
        """
        request_id = f"approval_{int(time.time())}_{uuid.uuid4().hex[:8]}"

        approval_request = ApprovalRequest(
            request_id=request_id,
            node_id=node_id,
            endpoint=endpoint,
            request_type=request_type,
            request_data=request_data,
            reason=reason,
            requested_at=int(time.time()),
            requested_by=requested_by
        )

        self.approval_requests[request_id] = approval_request

        # For HEAD nodes, auto-approve if they meet basic requirements
        if request_type == ApprovalType.HEAD_NODE:
            basic_approved = self._check_basic_requirements(request_data)
            if basic_approved:
                # In a real system, this would go to human review, but for now we'll auto-approve
                # with a short delay to simulate the review process
                response = ApprovalResponse(
                    request_id=request_id,
                    status=ApprovalStatus.APPROVED,
                    approved_by="smilyai_system",
                    approved_at=int(time.time()),
                    notes="Basic requirements met, auto-approved"
                )
                self.approval_responses[request_id] = response
                self.approved_nodes.add(node_id)
                return request_id

        return request_id

    def _check_basic_requirements(self, request_data: Dict[str, Any]) -> bool:
        """Check if a node meets basic requirements for approval.

        Requirements for HEAD nodes: a secure (HTTPS) endpoint OR sufficiently
        robust hardware, and a profile appropriate for a HEAD role.
        """
        endpoint = request_data.get('endpoint', '')
        capabilities = request_data.get('capabilities', {})

        # Check if endpoint is secure.
        # FIX: the original used a substring test ('https://' in endpoint),
        # which any URL such as "http://evil/?u=https://x" passes. The scheme
        # must be at the start of the endpoint.
        has_secure_endpoint = endpoint.startswith('https://')

        # Check minimum resources required for HEAD nodes
        min_cpu = capabilities.get('cpu_count', 0) >= 4
        min_memory = capabilities.get('memory_gb', 0) >= 16  # At least 16GB RAM for HEAD
        min_disk = capabilities.get('disk_space_gb', 0) >= 50  # At least 50GB disk

        # For HEAD nodes specifically, we want robust systems
        has_good_hardware = min_cpu and min_memory and min_disk

        # Check if it's a GPU node (which might be inappropriate for HEAD)
        is_gpu_node = capabilities.get('gpu_available', False)

        # HEAD nodes should be dedicated compute/storage, not primarily GPU-focused
        is_appropriate_for_head = not is_gpu_node or capabilities.get('node_type') != 'gpu'

        return (has_secure_endpoint or has_good_hardware) and is_appropriate_for_head

    def review_approval_request(self, request_id: str, status: ApprovalStatus,
                                reviewer: str, rejection_reason: Optional[str] = None,
                                notes: Optional[str] = None) -> bool:
        """Review and respond to an approval request.

        Returns False if the request id is unknown. APPROVED adds the node to
        the approved set; REJECTED/REVOKED removes it.
        """
        if request_id not in self.approval_requests:
            return False

        response = ApprovalResponse(
            request_id=request_id,
            status=status,
            approved_by=reviewer,
            approved_at=int(time.time()) if status == ApprovalStatus.APPROVED else None,
            rejection_reason=rejection_reason,
            notes=notes
        )

        self.approval_responses[request_id] = response

        # Update approved nodes set
        if status == ApprovalStatus.APPROVED:
            request = self.approval_requests[request_id]
            self.approved_nodes.add(request.node_id)
        elif status in [ApprovalStatus.REJECTED, ApprovalStatus.REVOKED]:
            request = self.approval_requests[request_id]
            self.approved_nodes.discard(request.node_id)

        return True

    def is_approved(self, node_id: str, approval_type: ApprovalType) -> bool:
        """Check if a node is approved for a specific type of access."""
        if approval_type == ApprovalType.HEAD_NODE:
            return node_id in self.approved_nodes
        return False  # Other types would have different checks

    def get_pending_requests(self) -> List[ApprovalRequest]:
        """Get list of approval requests with no response or a PENDING response."""
        pending = []
        for req_id, req in self.approval_requests.items():
            response = self.approval_responses.get(req_id)
            if not response or response.status == ApprovalStatus.PENDING:
                pending.append(req)
        return pending


# Global instance of the approval system
smilyai_approval_system = SmilyaiApprovalSystem()
shared/chat_history.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import time
4
+ from datetime import datetime
5
+ from typing import List, Dict, Any
6
+ from .models import ChatMessage
7
+
8
+
9
def save_chat_history(messages: List[ChatMessage], model_name: str, response: str, filename: str = "chat.md"):
    """
    Append one chat session to a markdown history file.

    Writes a timestamped section header naming the model, followed by every
    message in the conversation and the assistant's final response.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Session header
    sections = [f"""
## Chat Session: {timestamp}
**Model Used:** {model_name}

---
"""]

    # One markdown paragraph per conversation message
    for message in messages:
        prefix = "**User:**" if message.role.lower() == "user" else "**Assistant:**"
        sections.append(f"\n{prefix} {message.content}\n\n")

    # Final assistant response, followed by a section separator
    sections.append(f"\n**Assistant Response:** {response}\n\n---\n\n")

    # Append the whole session in a single write
    with open(filename, "a", encoding="utf-8") as log_file:
        log_file.write("".join(sections))
34
+
35
+
36
def save_detailed_chat_log(request_data: Dict[str, Any], response_data: str, model_name: str, processing_time: float, filename: str = "chat.md"):
    """
    Append a detailed request/response log entry (with metadata) to a markdown file.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Metadata header for this request
    chunks = [f"""
## Chat Request Log: {timestamp}
- **Model:** {model_name}
- **Processing Time:** {processing_time:.2f}s
- **Max Tokens:** {request_data.get('max_tokens', 512)}
- **Temperature:** {request_data.get('temperature', 0.8)}

### Input Messages:
"""]

    # One bullet per input message
    for entry in request_data.get('messages', []):
        speaker = "**User**" if entry.get('role', 'unknown').lower() == 'user' else "**Assistant**"
        chunks.append(f"- {speaker}: {entry.get('content', '')}\n")

    chunks.append(f"\n### Model Response:\n{response_data}\n\n---\n\n")

    # Append everything in a single write
    with open(filename, "a", encoding="utf-8") as log_file:
        log_file.write("".join(chunks))
65
+
66
+
67
def initialize_chat_file(filename: str = "chat.md"):
    """
    Create the chat history file with a header if it does not already exist.

    An existing file is left untouched.
    """
    if os.path.exists(filename):
        return

    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    header = f"""# Chat History
Last updated: {stamp}

This file contains the history of all chat conversations processed by the multi-node API system.

---
"""
    with open(filename, "w", encoding="utf-8") as chat_file:
        chat_file.write(header)
shared/credits_system.py ADDED
@@ -0,0 +1,323 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Cloud Credits System for SACCP Network
3
+ Handles credit tracking, earning, and spending in the distributed network
4
+ """
5
+
6
+ import json
7
+ import sqlite3
8
+ import time
9
+ from datetime import datetime
10
+ from typing import Optional, List, Dict, Any
11
+ from enum import Enum
12
+ from dataclasses import dataclass
13
+ from pydantic import BaseModel
14
+
15
+
16
class TransactionType(str, Enum):
    """Direction of a credit movement."""
    EARNED = "earned"
    SPENT = "spent"
    TRANSFERRED = "transferred"


class CreditReason(str, Enum):
    """Business reason attached to a credit transaction."""
    TASK_COMPLETION = "task_completion"
    RESOURCE_CONTRIBUTION = "resource_contribution"
    SERVICE_PURCHASE = "service_purchase"
    REFERRAL_BONUS = "referral_bonus"
    STAKING_REWARD = "staking_reward"
28
+
29
+
30
@dataclass
class CreditTransaction:
    """Represents a credit transaction"""
    transaction_id: str                       # unique id for the transaction record
    node_id: str                              # node whose balance is affected
    amount: float                             # positive for earnings; debits are stored negative
    transaction_type: TransactionType
    reason: CreditReason
    timestamp: int                            # unix timestamp (int(time.time()))
    service_type: Optional[str] = None        # service involved, if any
    metadata: Optional[Dict[str, Any]] = None  # stored as a JSON string in the DB
41
+
42
+
43
class CreditBalance(BaseModel):
    """Model for node credit balance"""
    node_id: str
    balance: float        # current spendable credits
    total_earned: float   # lifetime credits earned
    total_spent: float    # lifetime credits spent
    last_updated: int     # unix timestamp of the last balance change
50
+
51
+
52
+ class CreditsSystem:
53
+ """Main system for managing cloud credits in the SACCP network"""
54
+
55
+ def __init__(self, db_path: str = "./saccp_credits.db"):
56
+ self.db_path = db_path
57
+ self._init_db()
58
+
59
+ def _init_db(self):
60
+ """Initialize the credits database"""
61
+ conn = sqlite3.connect(self.db_path)
62
+ cursor = conn.cursor()
63
+
64
+ # Create balances table
65
+ cursor.execute('''
66
+ CREATE TABLE IF NOT EXISTS balances (
67
+ node_id TEXT PRIMARY KEY,
68
+ balance REAL DEFAULT 0.0,
69
+ total_earned REAL DEFAULT 0.0,
70
+ total_spent REAL DEFAULT 0.0,
71
+ last_updated INTEGER
72
+ )
73
+ ''')
74
+
75
+ # Create transactions table
76
+ cursor.execute('''
77
+ CREATE TABLE IF NOT EXISTS transactions (
78
+ transaction_id TEXT PRIMARY KEY,
79
+ node_id TEXT NOT NULL,
80
+ amount REAL NOT NULL,
81
+ transaction_type TEXT NOT NULL,
82
+ reason TEXT NOT NULL,
83
+ timestamp INTEGER NOT NULL,
84
+ service_type TEXT,
85
+ metadata TEXT -- JSON string
86
+ )
87
+ ''')
88
+
89
+ conn.commit()
90
+ conn.close()
91
+
92
+ def add_credits(self, node_id: str, amount: float, reason: CreditReason,
93
+ service_type: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> bool:
94
+ """Add credits to a node's balance"""
95
+ if amount <= 0:
96
+ return False
97
+
98
+ conn = sqlite3.connect(self.db_path)
99
+ cursor = conn.cursor()
100
+
101
+ try:
102
+ # Get current balance
103
+ cursor.execute('SELECT balance, total_earned FROM balances WHERE node_id = ?', (node_id,))
104
+ result = cursor.fetchone()
105
+
106
+ if result:
107
+ current_balance, total_earned = result
108
+ new_balance = current_balance + amount
109
+ new_total_earned = total_earned + amount
110
+ else:
111
+ new_balance = amount
112
+ new_total_earned = amount
113
+ # Insert new record if it doesn't exist
114
+ cursor.execute('''
115
+ INSERT INTO balances (node_id, balance, total_earned, total_spent, last_updated)
116
+ VALUES (?, ?, ?, ?, ?)
117
+ ''', (node_id, 0.0, 0.0, 0.0, int(time.time())))
118
+
119
+ # Update balance
120
+ cursor.execute('''
121
+ UPDATE balances
122
+ SET balance = ?, total_earned = ?, last_updated = ?
123
+ WHERE node_id = ?
124
+ ''', (new_balance, new_total_earned, int(time.time()), node_id))
125
+
126
+ # Record transaction
127
+ transaction_id = f"credit_{int(time.time())}_{node_id}_{hash(str(time.time()))}"
128
+ cursor.execute('''
129
+ INSERT INTO transactions
130
+ (transaction_id, node_id, amount, transaction_type, reason, timestamp, service_type, metadata)
131
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
132
+ ''', (
133
+ transaction_id,
134
+ node_id,
135
+ amount,
136
+ TransactionType.EARNED.value,
137
+ reason.value,
138
+ int(time.time()),
139
+ service_type,
140
+ json.dumps(metadata) if metadata else None
141
+ ))
142
+
143
+ conn.commit()
144
+ return True
145
+ except Exception as e:
146
+ conn.rollback()
147
+ print(f"Error adding credits: {e}")
148
+ return False
149
+ finally:
150
+ conn.close()
151
+
152
def spend_credits(self, node_id: str, amount: float, reason: CreditReason,
                  service_type: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> bool:
    """Spend credits from a node's balance.

    The debit is performed as a single conditional UPDATE guarded by
    ``balance >= ?`` so the read-check-write race of the original
    (SELECT balance, then UPDATE in separate statements, racing with
    other connections) cannot overdraw the account.

    Args:
        node_id: Node whose balance is debited.
        amount: Positive number of credits to spend.
        reason: Why the credits are being spent (recorded on the transaction).
        service_type: Optional service label stored with the transaction.
        metadata: Optional JSON-serializable extra data.

    Returns:
        True on success; False for a non-positive amount, an unknown node,
        insufficient balance, or a database error.
    """
    if amount <= 0:
        return False

    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()

    try:
        now = int(time.time())

        # Atomic conditional debit: affects 0 rows when the node does not
        # exist OR the balance is insufficient — both map to False, exactly
        # like the original two-step check.
        cursor.execute('''
            UPDATE balances
            SET balance = balance - ?, total_spent = total_spent + ?, last_updated = ?
            WHERE node_id = ? AND balance >= ?
        ''', (amount, amount, now, node_id, amount))

        if cursor.rowcount == 0:
            return False  # Node missing or insufficient credits

        # Record transaction
        transaction_id = f"debit_{int(time.time())}_{node_id}_{hash(str(time.time()))}"
        cursor.execute('''
            INSERT INTO transactions
            (transaction_id, node_id, amount, transaction_type, reason, timestamp, service_type, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            transaction_id,
            node_id,
            -amount,  # Negative because it's a debit
            TransactionType.SPENT.value,
            reason.value,
            now,
            service_type,
            json.dumps(metadata) if metadata else None
        ))

        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        print(f"Error spending credits: {e}")
        return False
    finally:
        conn.close()
206
+
207
def get_balance(self, node_id: str) -> CreditBalance:
    """Return the CreditBalance record for *node_id*.

    Nodes without a row in the ``balances`` table are reported as an
    all-zero balance stamped with the current time, rather than raising.
    """
    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()

    cursor.execute('''
        SELECT balance, total_earned, total_spent, last_updated
        FROM balances
        WHERE node_id = ?
    ''', (node_id,))

    row = cursor.fetchone()
    conn.close()

    if row is None:
        # Unknown node: synthesize an empty balance.
        return CreditBalance(
            node_id=node_id,
            balance=0.0,
            total_earned=0.0,
            total_spent=0.0,
            last_updated=int(time.time())
        )

    return CreditBalance(
        node_id=node_id,
        balance=row[0],
        total_earned=row[1],
        total_spent=row[2],
        last_updated=row[3]
    )
239
+
240
def get_transaction_history(self, node_id: str, limit: int = 50) -> List[CreditTransaction]:
    """Return up to *limit* most-recent transactions for *node_id*, newest first."""
    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()

    cursor.execute('''
        SELECT transaction_id, amount, transaction_type, reason, timestamp, service_type, metadata
        FROM transactions
        WHERE node_id = ?
        ORDER BY timestamp DESC
        LIMIT ?
    ''', (node_id, limit))

    rows = cursor.fetchall()
    conn.close()

    history: List[CreditTransaction] = []
    for tx_id, amount, tx_type, reason, ts, service_type, meta_json in rows:
        history.append(CreditTransaction(
            transaction_id=tx_id,
            node_id=node_id,
            amount=amount,
            transaction_type=TransactionType(tx_type),
            reason=CreditReason(reason),
            timestamp=ts,
            service_type=service_type,
            # metadata is persisted as a JSON blob; NULL means no metadata.
            metadata=json.loads(meta_json) if meta_json else None
        ))
    return history
274
+
275
def transfer_credits(self, from_node_id: str, to_node_id: str, amount: float,
                     reason: CreditReason = CreditReason.TRANSFERRED) -> bool:
    """Transfer credits from one node to another.

    Implemented as a debit on the sender followed by a credit on the
    receiver; if the credit step fails, the sender is refunded.

    NOTE(review): the two steps run in separate SQLite transactions, so
    this is best-effort rather than atomic — the compensating refund
    could itself fail and leave the sender short. Confirm whether a
    single-transaction implementation is required.

    Returns:
        True on success; False for a non-positive amount, insufficient
        sender balance, or a failed (and refunded) credit step.
    """
    if amount <= 0:
        return False

    # First spend from sender
    if not self.spend_credits(from_node_id, amount, reason):
        return False

    # Then add to receiver
    if not self.add_credits(to_node_id, amount, reason):
        # Rollback: if adding to receiver fails, refund sender
        self.add_credits(from_node_id, amount, CreditReason.REFUND,
                         metadata={"original_transaction": "transfer_failed"})
        return False

    return True
293
+
294
def get_top_nodes_by_balance(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Return up to *limit* nodes ordered by descending credit balance.

    Each entry is a plain dict with ``node_id``, ``balance``,
    ``total_earned`` and ``total_spent`` keys.
    """
    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()

    cursor.execute('''
        SELECT node_id, balance, total_earned, total_spent
        FROM balances
        ORDER BY balance DESC
        LIMIT ?
    ''', (limit,))

    rows = cursor.fetchall()
    conn.close()

    # Re-shape each SQL row into the public dict form.
    return [
        {
            "node_id": node_id,
            "balance": balance,
            "total_earned": total_earned,
            "total_spent": total_spent
        }
        for node_id, balance, total_earned, total_spent in rows
    ]
320
+
321
+
322
+ # Global instance of the credits system
323
+ credits_system = CreditsSystem()
shared/fault_tolerance.py ADDED
@@ -0,0 +1,371 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Fault Tolerance System for SACCP Network
3
+ Handles node failures, retries, task redistribution, and network resilience
4
+ """
5
+
6
+ import time
7
+ import threading
8
+ from typing import Dict, List, Optional, Any
9
+ from datetime import datetime, timedelta
10
+ from enum import Enum
11
+ import random
12
+ import asyncio
13
+
14
+
15
+ class FailureType(Enum):
16
+ NODE_DISCONNECTED = "node_disconnected"
17
+ TASK_TIMEOUT = "task_timeout"
18
+ HEARTBEAT_FAILED = "heartbeat_failed"
19
+ NETWORK_ERROR = "network_error"
20
+ RESOURCE_EXHAUSTED = "resource_exhausted"
21
+
22
+
23
+ class RecoveryStrategy(Enum):
24
+ RETRY = "retry"
25
+ REDISTRIBUTE = "redistribute"
26
+ FAIL_OVER = "fail_over"
27
+ DROP_TASK = "drop_task"
28
+
29
+
30
+ class NodeStatus(Enum):
31
+ HEALTHY = "healthy"
32
+ UNRESPONSIVE = "unresponsive"
33
+ FAILED = "failed"
34
+ RECOVERING = "recovering"
35
+
36
+
37
+ class FaultToleranceManager:
38
+ """
39
+ Manages fault tolerance across the SACCP network
40
+ """
41
+
42
+ def __init__(self):
43
+ self.nodes: Dict[str, Dict[str, Any]] = {}
44
+ self.active_tasks: Dict[str, Dict[str, Any]] = {}
45
+ self.failed_tasks: List[Dict[str, Any]] = []
46
+ self.failure_history: List[Dict[str, Any]] = []
47
+ self.recovery_queue: List[Dict[str, Any]] = []
48
+ self.lock = threading.Lock()
49
+
50
+ # Configuration
51
+ self.heartbeat_interval = 30 # seconds
52
+ self.heartbeat_timeout = 60 # seconds
53
+ self.max_retries = 3
54
+ self.retry_delay = 5 # seconds
55
+ self.network_monitoring_enabled = True
56
+
57
+ # Start monitoring thread
58
+ self.monitoring_thread = threading.Thread(target=self._network_monitoring_loop, daemon=True)
59
+ self.monitoring_thread.start()
60
+
61
+ def register_node(self, node_id: str, node_type: str, capabilities: Dict[str, Any]) -> bool:
62
+ """Register a node with the fault tolerance system"""
63
+ with self.lock:
64
+ self.nodes[node_id] = {
65
+ "node_id": node_id,
66
+ "node_type": node_type,
67
+ "capabilities": capabilities,
68
+ "status": NodeStatus.HEALTHY,
69
+ "last_heartbeat": time.time(),
70
+ "failure_count": 0,
71
+ "consecutive_failures": 0,
72
+ "tasks_processed": 0,
73
+ "tasks_failed": 0
74
+ }
75
+ return True
76
+
77
+ def remove_node(self, node_id: str) -> bool:
78
+ """Remove a node from the system (when permanently offline)"""
79
+ with self.lock:
80
+ if node_id in self.nodes:
81
+ del self.nodes[node_id]
82
+
83
+ # Reassign tasks assigned to this node
84
+ self._reassign_node_tasks(node_id)
85
+ return True
86
+ return False
87
+
88
+ def heartbeat(self, node_id: str) -> bool:
89
+ """Process heartbeat from a node"""
90
+ with self.lock:
91
+ if node_id not in self.nodes:
92
+ return False
93
+
94
+ node = self.nodes[node_id]
95
+ node["last_heartbeat"] = time.time()
96
+ node["status"] = NodeStatus.HEALTHY
97
+ node["consecutive_failures"] = 0 # Reset on successful heartbeat
98
+
99
+ return True
100
+
101
+ def record_task_assignment(self, task_id: str, node_id: str, task_details: Dict[str, Any]) -> bool:
102
+ """Record that a task was assigned to a node"""
103
+ with self.lock:
104
+ self.active_tasks[task_id] = {
105
+ "task_id": task_id,
106
+ "node_id": node_id,
107
+ "assignment_time": time.time(),
108
+ "task_details": task_details,
109
+ "retry_count": 0,
110
+ "status": "assigned"
111
+ }
112
+ return True
113
+
114
+ def record_task_completion(self, task_id: str, node_id: str) -> bool:
115
+ """Record successful task completion"""
116
+ with self.lock:
117
+ if task_id in self.active_tasks:
118
+ del self.active_tasks[task_id]
119
+
120
+ # Update node statistics
121
+ if node_id in self.nodes:
122
+ self.nodes[node_id]["tasks_processed"] += 1
123
+
124
+ return True
125
+ return False
126
+
127
+ def record_task_failure(self, task_id: str, node_id: str, failure_type: FailureType,
128
+ error_details: Optional[str] = None) -> RecoveryStrategy:
129
+ """Record task failure and determine recovery strategy"""
130
+ with self.lock:
131
+ # Record the failure
132
+ failure_record = {
133
+ "task_id": task_id,
134
+ "node_id": node_id,
135
+ "failure_type": failure_type.value,
136
+ "error_details": error_details,
137
+ "timestamp": time.time()
138
+ }
139
+ self.failure_history.append(failure_record)
140
+
141
+ # Update node failure statistics
142
+ if node_id in self.nodes:
143
+ node = self.nodes[node_id]
144
+ node["tasks_failed"] += 1
145
+ node["failure_count"] += 1
146
+ node["consecutive_failures"] += 1
147
+
148
+ # Check if node should be marked as failed
149
+ if node["consecutive_failures"] >= 3: # 3 consecutive failures
150
+ node["status"] = NodeStatus.FAILED
151
+
152
+ # Get the task record
153
+ task_record = self.active_tasks.get(task_id)
154
+ if not task_record:
155
+ return RecoveryStrategy.DROP_TASK
156
+
157
+ # Determine recovery strategy based on failure type and retry count
158
+ if task_record["retry_count"] < self.max_retries:
159
+ # For timeout failures, try redistributing to a different node
160
+ if failure_type == FailureType.TASK_TIMEOUT:
161
+ return RecoveryStrategy.REDISTRIBUTE
162
+ # For node disconnections, try fail-over to another node
163
+ elif failure_type == FailureType.NODE_DISCONNECTED:
164
+ return RecoveryStrategy.FAIL_OVER
165
+ # For other failures, try retrying on the same node
166
+ else:
167
+ return RecoveryStrategy.RETRY
168
+ else:
169
+ # Max retries reached, drop the task
170
+ if task_id in self.active_tasks:
171
+ del self.active_tasks[task_id]
172
+ self.failed_tasks.append(task_record)
173
+ return RecoveryStrategy.DROP_TASK
174
+
175
+ def _reassign_node_tasks(self, failed_node_id: str):
176
+ """Reassign tasks from a failed node to healthy nodes"""
177
+ tasks_to_reassign = []
178
+
179
+ with self.lock:
180
+ # Find tasks assigned to the failed node
181
+ for task_id, task_record in self.active_tasks.items():
182
+ if task_record["node_id"] == failed_node_id:
183
+ tasks_to_reassign.append(task_id)
184
+
185
+ # Reassign each task
186
+ for task_id in tasks_to_reassign:
187
+ self._attempt_task_redistribution(task_id)
188
+
189
+ def _attempt_task_redistribution(self, task_id: str) -> bool:
190
+ """Attempt to redistribute a task to a different node"""
191
+ with self.lock:
192
+ if task_id not in self.active_tasks:
193
+ return False
194
+
195
+ task_record = self.active_tasks[task_id]
196
+
197
+ # Find a healthy alternative node
198
+ new_node = self._find_alternative_node(task_record["task_details"])
199
+ if not new_node:
200
+ # No alternative node available, retry later
201
+ return False
202
+
203
+ # Update task assignment
204
+ old_node_id = task_record["node_id"]
205
+ task_record["node_id"] = new_node["node_id"]
206
+ task_record["retry_count"] += 1
207
+ task_record["assignment_time"] = time.time()
208
+
209
+ # Update node stats
210
+ if old_node_id in self.nodes:
211
+ self.nodes[old_node_id]["tasks_failed"] += 1
212
+ if new_node["node_id"] in self.nodes:
213
+ self.nodes[new_node["node_id"]]["tasks_processed"] += 1
214
+
215
+ return True
216
+
217
+ def _find_alternative_node(self, task_requirements: Dict[str, Any]) -> Optional[Dict[str, Any]]:
218
+ """Find an alternative healthy node that can handle the task"""
219
+ with self.lock:
220
+ for node_id, node in self.nodes.items():
221
+ if node["status"] == NodeStatus.HEALTHY:
222
+ # Check if node meets task requirements
223
+ if self._node_meets_requirements(node, task_requirements):
224
+ return node
225
+ return None
226
+
227
+ def _node_meets_requirements(self, node: Dict[str, Any], requirements: Dict[str, Any]) -> bool:
228
+ """Check if a node meets specific requirements for a task"""
229
+ # Check if node has required resources
230
+ capabilities = node["capabilities"]
231
+
232
+ # Example: Check if the node has enough memory for the task
233
+ required_memory = requirements.get("memory_required", 0)
234
+ available_memory = capabilities.get("memory_gb", 0)
235
+
236
+ if required_memory > available_memory:
237
+ return False
238
+
239
+ # Check if node type is compatible with task type
240
+ required_node_types = requirements.get("compatible_node_types", [])
241
+ if required_node_types and node["node_type"] not in required_node_types:
242
+ return False
243
+
244
+ return True
245
+
246
+ def _network_monitoring_loop(self):
247
+ """Background thread to monitor network health and handle failures"""
248
+ while self.network_monitoring_enabled:
249
+ time.sleep(1) # Check every second
250
+
251
+ # Check for node timeouts
252
+ if int(time.time()) % 10 == 0: # Every 10 seconds
253
+ self._check_node_health()
254
+
255
+ # Process recovery queue
256
+ self._process_recovery_queue()
257
+
258
+ def _check_node_health(self):
259
+ """Check for nodes that have missed heartbeats"""
260
+ current_time = time.time()
261
+
262
+ with self.lock:
263
+ for node_id, node in self.nodes.items():
264
+ time_since_heartbeat = current_time - node["last_heartbeat"]
265
+
266
+ if time_since_heartbeat > self.heartbeat_timeout:
267
+ # Node is unresponsive
268
+ if node["status"] != NodeStatus.FAILED:
269
+ node["status"] = NodeStatus.UNRESPONSIVE
270
+
271
+ # Record the failure
272
+ failure_record = {
273
+ "node_id": node_id,
274
+ "failure_type": FailureType.HEARTBEAT_FAILED.value,
275
+ "timestamp": current_time,
276
+ "details": f"Node {node_id} missed heartbeat for {time_since_heartbeat}s"
277
+ }
278
+ self.failure_history.append(failure_record)
279
+
280
+ # Add to recovery queue
281
+ self.recovery_queue.append({
282
+ "type": "node_recovery",
283
+ "node_id": node_id,
284
+ "action": "reconnect",
285
+ "timestamp": current_time + self.retry_delay
286
+ })
287
+
288
+ def _process_recovery_queue(self):
289
+ """Process items in the recovery queue"""
290
+ current_time = time.time()
291
+ items_to_process = []
292
+
293
+ with self.lock:
294
+ for item in self.recovery_queue[:]: # Copy list to avoid modification during iteration
295
+ if current_time >= item["timestamp"]:
296
+ items_to_process.append(item)
297
+
298
+ # Process each item outside the lock to avoid blocking
299
+ for item in items_to_process:
300
+ self._execute_recovery_action(item)
301
+
302
+ # Remove processed item from queue
303
+ with self.lock:
304
+ if item in self.recovery_queue:
305
+ self.recovery_queue.remove(item)
306
+
307
+ def _execute_recovery_action(self, recovery_item: Dict[str, Any]):
308
+ """Execute a specific recovery action"""
309
+ action_type = recovery_item["type"]
310
+
311
+ if action_type == "node_recovery":
312
+ node_id = recovery_item["node_id"]
313
+
314
+ if recovery_item["action"] == "reconnect":
315
+ # Try to reconnect by marking node as healthy
316
+ # In a real implementation, this would try to reestablish connection
317
+ with self.lock:
318
+ if node_id in self.nodes:
319
+ node = self.nodes[node_id]
320
+ if node["status"] in [NodeStatus.UNRESPONSIVE, NodeStatus.FAILED]:
321
+ # In a real system, we would attempt reconnection
322
+ # For simulation, we'll just reset to healthy
323
+ node["status"] = NodeStatus.HEALTHY
324
+ node["consecutive_failures"] = 0
325
+
326
+ elif action_type == "task_redistribution":
327
+ task_id = recovery_item["task_id"]
328
+ # Attempt to redistribute the task
329
+ self._attempt_task_redistribution(task_id)
330
+
331
+ def get_network_health(self) -> Dict[str, Any]:
332
+ """Get overall network health statistics"""
333
+ with self.lock:
334
+ healthy_nodes = 0
335
+ unresponsive_nodes = 0
336
+ failed_nodes = 0
337
+
338
+ for node in self.nodes.values():
339
+ if node["status"] == NodeStatus.HEALTHY:
340
+ healthy_nodes += 1
341
+ elif node["status"] == NodeStatus.UNRESPONSIVE:
342
+ unresponsive_nodes += 1
343
+ elif node["status"] == NodeStatus.FAILED:
344
+ failed_nodes += 1
345
+
346
+ total_tasks = len(self.active_tasks) + len(self.failed_tasks)
347
+
348
+ return {
349
+ "total_nodes": len(self.nodes),
350
+ "healthy_nodes": healthy_nodes,
351
+ "unresponsive_nodes": unresponsive_nodes,
352
+ "failed_nodes": failed_nodes,
353
+ "active_tasks": len(self.active_tasks),
354
+ "failed_tasks": len(self.failed_tasks),
355
+ "total_tasks_processed": sum(node["tasks_processed"] for node in self.nodes.values()),
356
+ "total_tasks_failed": sum(node["tasks_failed"] for node in self.nodes.values()),
357
+ "recovery_attempts": len(self.recovery_queue)
358
+ }
359
+
360
+ def get_failed_nodes(self) -> List[Dict[str, Any]]:
361
+ """Get list of currently failed nodes"""
362
+ with self.lock:
363
+ failed = []
364
+ for node in self.nodes.values():
365
+ if node["status"] == NodeStatus.FAILED:
366
+ failed.append(node)
367
+ return failed
368
+
369
+
370
+ # Global instance
371
+ fault_tolerance_manager = FaultToleranceManager()
shared/load_balancer.py ADDED
@@ -0,0 +1,458 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Dynamic Load Balancer for SACCP Network
3
+ Distributes tasks across different node types based on availability, capacity, and performance
4
+ """
5
+
6
+ import time
7
+ import heapq
8
+ from typing import Dict, List, Optional, Any, Tuple
9
+ from enum import Enum
10
+ from dataclasses import dataclass
11
+ from datetime import datetime, timedelta
12
+ import threading
13
+ import random
14
+
15
+
16
+ class TaskPriority(Enum):
17
+ LOW = 1
18
+ NORMAL = 2
19
+ HIGH = 3
20
+ CRITICAL = 4
21
+
22
+
23
+ class NodeType(Enum):
24
+ HEAD = "head"
25
+ RAM = "ram"
26
+ DISK = "disk"
27
+ COMPUTE = "compute"
28
+ GPU = "gpu"
29
+ TPU = "tpu"
30
+ NPU = "npu"
31
+
32
+
33
+ @dataclass
34
+ class Task:
35
+ """Represents a task to be distributed"""
36
+ task_id: str
37
+ task_type: str
38
+ priority: TaskPriority
39
+ resource_requirements: Dict[str, Any] # CPU, memory, etc.
40
+ estimated_duration: float # in seconds
41
+ created_at: float
42
+ assigned_node: Optional[str] = None
43
+ assigned_at: Optional[float] = None
44
+
45
+
46
+ @dataclass
47
+ class Node:
48
+ """Represents a node in the network"""
49
+ node_id: str
50
+ node_type: NodeType
51
+ capabilities: Dict[str, Any] # CPU, memory, etc.
52
+ current_load: float
53
+ tasks_queued: int
54
+ tasks_completed: int
55
+ tasks_failed: int
56
+ last_heartbeat: float
57
+ performance_score: float # 0.0-1.0 based on historical performance
58
+ is_available: bool = True
59
+ max_concurrent_tasks: int = 10
60
+ current_tasks: int = 0
61
+
62
+
63
+ class LoadBalancer:
64
+ """
65
+ Dynamic load balancer that distributes tasks across node types
66
+ """
67
+
68
+ def __init__(self):
69
+ self.nodes: Dict[str, Node] = {}
70
+ self.task_queue: List[Tuple[int, float, Task]] = [] # Priority queue: (-priority, creation_time, task)
71
+ self.assigned_tasks: Dict[str, str] = {} # task_id -> node_id
72
+ self.node_stats: Dict[str, Dict[str, Any]] = {}
73
+ self.lock = threading.Lock()
74
+
75
+ # Configuration
76
+ self.heartbeat_timeout = 90 # seconds
77
+ self.task_timeout = 300 # seconds
78
+ self.load_balancing_algorithm = "weighted_least_connections"
79
+
80
+ def register_node(self, node_id: str, node_type: NodeType, capabilities: Dict[str, Any]) -> bool:
81
+ """Register a node with the load balancer"""
82
+ with self.lock:
83
+ self.nodes[node_id] = Node(
84
+ node_id=node_id,
85
+ node_type=node_type,
86
+ capabilities=capabilities,
87
+ current_load=0.0,
88
+ tasks_queued=0,
89
+ tasks_completed=0,
90
+ tasks_failed=0,
91
+ last_heartbeat=time.time(),
92
+ performance_score=0.8, # Default performance score
93
+ max_concurrent_tasks=capabilities.get("max_concurrent_tasks", 10)
94
+ )
95
+
96
+ # Initialize node stats
97
+ self.node_stats[node_id] = {
98
+ "avg_task_duration": 0,
99
+ "success_rate": 1.0,
100
+ "response_time_avg": 0.1
101
+ }
102
+
103
+ return True
104
+
105
+ def heartbeat_node(self, node_id: str) -> bool:
106
+ """Update node heartbeat"""
107
+ with self.lock:
108
+ if node_id in self.nodes:
109
+ self.nodes[node_id].last_heartbeat = time.time()
110
+ self.nodes[node_id].is_available = True
111
+ return True
112
+ return False
113
+
114
+ def heartbeat_batch_nodes(self, node_ids: List[str]) -> int:
115
+ """Update heartbeats for multiple nodes"""
116
+ count = 0
117
+ for node_id in node_ids:
118
+ if self.heartbeat_node(node_id):
119
+ count += 1
120
+ return count
121
+
122
+ def deregister_node(self, node_id: str) -> bool:
123
+ """Remove a node from the load balancer"""
124
+ with self.lock:
125
+ if node_id in self.nodes:
126
+ # Move assigned tasks to queue for reassignment
127
+ self._reassign_node_tasks(node_id)
128
+ del self.nodes[node_id]
129
+ if node_id in self.node_stats:
130
+ del self.node_stats[node_id]
131
+ return True
132
+ return False
133
+
134
+ def submit_task(self, task: Task) -> Optional[str]:
135
+ """Submit a task for distribution"""
136
+ with self.lock:
137
+ # Add task to priority queue
138
+ # Priority: Higher priority first, then oldest first
139
+ priority_key = (-task.priority.value, task.created_at)
140
+ heapq.heappush(self.task_queue, priority_key + (task,))
141
+
142
+ # Try to assign the task immediately
143
+ node_id = self._find_suitable_node(task)
144
+ if node_id:
145
+ assigned = self._assign_task_to_node(task.task_id, node_id)
146
+ if assigned:
147
+ return node_id
148
+ return None # Task queued but not yet assigned
149
+
150
+ def get_task_assignment(self, task_id: str) -> Optional[str]:
151
+ """Get the node assigned to a task"""
152
+ with self.lock:
153
+ return self.assigned_tasks.get(task_id)
154
+
155
+ def complete_task(self, task_id: str, node_id: str, success: bool = True, duration: float = 0) -> bool:
156
+ """Mark a task as completed"""
157
+ with self.lock:
158
+ # Update node stats
159
+ if node_id in self.nodes:
160
+ node = self.nodes[node_id]
161
+ if success:
162
+ node.tasks_completed += 1
163
+ node.current_tasks -= 1
164
+ else:
165
+ node.tasks_failed += 1
166
+ node.current_tasks -= 1
167
+
168
+ # Update task queue count
169
+ node.tasks_queued = max(0, node.tasks_queued - 1)
170
+
171
+ # Update node stats for performance calculation
172
+ if node_id in self.node_stats:
173
+ stats = self.node_stats[node_id]
174
+ if success and duration > 0:
175
+ # Update average task duration
176
+ if stats["avg_task_duration"] == 0:
177
+ stats["avg_task_duration"] = duration
178
+ else:
179
+ stats["avg_task_duration"] = (
180
+ stats["avg_task_duration"] * 0.7 + duration * 0.3
181
+ )
182
+
183
+ # Update success rate
184
+ total_tasks = node.tasks_completed + node.tasks_failed
185
+ if total_tasks > 0:
186
+ stats["success_rate"] = node.tasks_completed / total_tasks
187
+
188
+ # Update node performance score
189
+ self._update_node_performance_score(node_id)
190
+
191
+ # Remove from assigned tasks
192
+ if task_id in self.assigned_tasks:
193
+ del self.assigned_tasks[task_id]
194
+
195
+ # Try to assign new tasks to available nodes
196
+ self._attempt_task_assignments()
197
+
198
+ return True
199
+
200
+ def _find_suitable_node(self, task: Task) -> Optional[str]:
201
+ """Find the most suitable node for a task"""
202
+ with self.lock:
203
+ # Get all available nodes
204
+ available_nodes = [
205
+ node for node in self.nodes.values()
206
+ if self._is_node_suitable(node, task)
207
+ ]
208
+
209
+ if not available_nodes:
210
+ return None
211
+
212
+ # Sort nodes by the selected algorithm
213
+ if self.load_balancing_algorithm == "weighted_least_connections":
214
+ # Prioritize nodes with fewer connections and higher performance
215
+ available_nodes.sort(key=lambda n: (
216
+ n.current_tasks / n.max_concurrent_tasks, # Load factor
217
+ -n.performance_score # Higher performance first
218
+ ))
219
+ elif self.load_balancing_algorithm == "weighted_response_time":
220
+ # Prioritize nodes with better historical response time
221
+ available_nodes.sort(key=lambda n: (
222
+ -n.performance_score, # Higher performance first
223
+ n.current_tasks / n.max_concurrent_tasks # Lower load first
224
+ ))
225
+ elif self.load_balancing_algorithm == "node_type_priority":
226
+ # Prioritize specific node type for the task
227
+ preferred_type = task.resource_requirements.get("preferred_node_type")
228
+ available_nodes.sort(key=lambda n: (
229
+ 0 if n.node_type.value == preferred_type else 1, # Preferred type first
230
+ n.current_tasks / n.max_concurrent_tasks, # Then lower load
231
+ -n.performance_score # Then higher performance
232
+ ))
233
+ else:
234
+ # Default: least connections with performance consideration
235
+ available_nodes.sort(key=lambda n: (
236
+ n.current_tasks / n.max_concurrent_tasks,
237
+ -n.performance_score
238
+ ))
239
+
240
+ # Return the best node (first in sorted list)
241
+ if available_nodes:
242
+ return available_nodes[0].node_id
243
+
244
+ return None
245
+
246
+ def _is_node_suitable(self, node: Node, task: Task) -> bool:
247
+ """Check if a node is suitable for a task"""
248
+ if not node.is_available:
249
+ return False
250
+
251
+ # Check if node has timed out
252
+ if time.time() - node.last_heartbeat > self.heartbeat_timeout:
253
+ node.is_available = False
254
+ return False
255
+
256
+ # Check node type compatibility
257
+ required_types = task.resource_requirements.get("compatible_node_types", [])
258
+ if required_types and node.node_type.value not in required_types:
259
+ return False
260
+
261
+ # Check resource requirements
262
+ reqs = task.resource_requirements
263
+ caps = node.capabilities
264
+
265
+ # Check memory requirement
266
+ if reqs.get("memory_required", 0) > caps.get("memory_gb", 0):
267
+ return False
268
+
269
+ # Check GPU requirement
270
+ if reqs.get("needs_gpu", False) and not caps.get("gpu_available", False):
271
+ return False
272
+
273
+ # Check if node has reached max concurrent tasks
274
+ if node.current_tasks >= node.max_concurrent_tasks:
275
+ return False
276
+
277
+ # Check if node has capacity based on current load
278
+ if node.current_load > 0.9: # Node is over 90% loaded
279
+ return False
280
+
281
+ return True
282
+
283
+ def _assign_task_to_node(self, task_id: str, node_id: str) -> bool:
284
+ """Assign a task to a specific node"""
285
+ with self.lock:
286
+ if node_id not in self.nodes:
287
+ return False
288
+
289
+ node = self.nodes[node_id]
290
+ task = self._get_task_by_id(task_id)
291
+
292
+ if not task:
293
+ return False
294
+
295
+ # Update node statistics
296
+ node.current_tasks += 1
297
+ node.tasks_queued += 1
298
+
299
+ # Update assigned tasks
300
+ self.assigned_tasks[task_id] = node_id
301
+ task.assigned_node = node_id
302
+ task.assigned_at = time.time()
303
+
304
+ # Update node load (estimated based on task duration)
305
+ estimated_load = min(0.2, task.estimated_duration / 3600.0) # Cap at 20% for long tasks
306
+ node.current_load = min(1.0, node.current_load + estimated_load)
307
+
308
+ return True
309
+
310
+ def _get_task_by_id(self, task_id: str) -> Optional[Task]:
311
+ """Get a task by ID from the queue"""
312
+ # Find in priority queue
313
+ for _, _, task in self.task_queue:
314
+ if task.task_id == task_id:
315
+ return task
316
+ return None
317
+
318
+ def _reassign_node_tasks(self, node_id: str):
319
+ """Reassign tasks from a failed node"""
320
+ tasks_to_reassign = []
321
+
322
+ # Find tasks assigned to this node
323
+ for task_id, assigned_node_id in self.assigned_tasks.items():
324
+ if assigned_node_id == node_id:
325
+ tasks_to_reassign.append(task_id)
326
+
327
+ # Try to reassign each task
328
+ for task_id in tasks_to_reassign:
329
+ task = self._get_task_by_id(task_id)
330
+ if task:
331
+ # Put task back in queue for reassignment
332
+ self.submit_task(task)
333
+ if task_id in self.assigned_tasks:
334
+ del self.assigned_tasks[task_id]
335
+
336
+ def _attempt_task_assignments(self):
337
+ """Try to assign queued tasks to available nodes"""
338
+ with self.lock:
339
+ # Make a copy of the queue to iterate without modification issues
340
+ tasks_to_retry = []
341
+
342
+ while self.task_queue:
343
+ priority, creation_time, task = heapq.heappop(self.task_queue)
344
+
345
+ # Check if task is expired
346
+ if time.time() - task.created_at > self.task_timeout:
347
+ continue # Skip expired tasks
348
+
349
+ # Try to assign the task
350
+ node_id = self._find_suitable_node(task)
351
+ if node_id:
352
+ if self._assign_task_to_node(task.task_id, node_id):
353
+ # Successfully assigned, don't add back to queue
354
+ continue
355
+ else:
356
+ # Assignment failed, add back to retry list
357
+ tasks_to_retry.append((priority, creation_time, task))
358
+ else:
359
+ # No suitable node found, add back to retry list
360
+ tasks_to_retry.append((priority, creation_time, task))
361
+
362
+ # Put unassigned tasks back in the queue
363
+ for item in tasks_to_retry:
364
+ heapq.heappush(self.task_queue, item)
365
+
366
+ def _update_node_performance_score(self, node_id: str):
367
+ """Update the performance score for a node based on its stats"""
368
+ if node_id not in self.nodes or node_id not in self.node_stats:
369
+ return
370
+
371
+ node = self.nodes[node_id]
372
+ stats = self.node_stats[node_id]
373
+
374
+ # Calculate performance score based on multiple factors
375
+ total_tasks = node.tasks_completed + node.tasks_failed
376
+ success_rate = stats["success_rate"]
377
+
378
+ # Base score on success rate (60%), response time (25%), and load (15%)
379
+ success_weight = 0.6
380
+ response_weight = 0.25
381
+ load_weight = 0.15
382
+
383
+ # Success rate contribution (0.0 to 1.0)
384
+ success_score = success_rate
385
+
386
+ # Response time contribution (better response = higher score)
387
+ avg_duration = stats["avg_task_duration"]
388
+ response_score = 1.0 / (1.0 + avg_duration / 100.0) # Normalize
389
+
390
+ # Load contribution (avoid overloading high-performing nodes)
391
+ load_score = 1.0 - min(1.0, node.current_load)
392
+
393
+ # Calculate final score
394
+ performance_score = (
395
+ success_score * success_weight +
396
+ response_score * response_weight +
397
+ load_score * load_weight
398
+ )
399
+
400
+ node.performance_score = min(1.0, max(0.0, performance_score))
401
+
402
+ def get_node_loads(self) -> Dict[str, float]:
403
+ """Get current load for each node"""
404
+ with self.lock:
405
+ return {node_id: node.current_load for node_id, node in self.nodes.items()}
406
+
407
+ def get_node_status(self) -> List[Dict[str, Any]]:
408
+ """Get comprehensive status of all nodes"""
409
+ with self.lock:
410
+ status_list = []
411
+ for node_id, node in self.nodes.items():
412
+ # Check if node is still active
413
+ is_active = time.time() - node.last_heartbeat < self.heartbeat_timeout
414
+ node.is_available = is_active
415
+
416
+ status_list.append({
417
+ "node_id": node.node_id,
418
+ "node_type": node.node_type.value,
419
+ "is_available": is_active,
420
+ "current_load": node.current_load,
421
+ "current_tasks": node.current_tasks,
422
+ "tasks_queued": node.tasks_queued,
423
+ "tasks_completed": node.tasks_completed,
424
+ "tasks_failed": node.tasks_failed,
425
+ "performance_score": node.performance_score,
426
+ "max_concurrent_tasks": node.max_concurrent_tasks,
427
+ "capabilities": node.capabilities,
428
+ "last_heartbeat": node.last_heartbeat
429
+ })
430
+
431
+ return status_list
432
+
433
+ def get_task_queue_status(self) -> Dict[str, Any]:
434
+ """Get status of the task queue"""
435
+ with self.lock:
436
+ return {
437
+ "total_queued_tasks": len(self.task_queue),
438
+ "priority_distribution": {
439
+ "critical": len([t for _, _, t in self.task_queue if t.priority == TaskPriority.CRITICAL]),
440
+ "high": len([t for _, _, t in self.task_queue if t.priority == TaskPriority.HIGH]),
441
+ "normal": len([t for _, _, t in self.task_queue if t.priority == TaskPriority.NORMAL]),
442
+ "low": len([t for _, _, t in self.task_queue if t.priority == TaskPriority.LOW])
443
+ },
444
+ "average_wait_time": self._calculate_avg_wait_time()
445
+ }
446
+
447
+ def _calculate_avg_wait_time(self) -> float:
448
+ """Calculate average wait time for tasks in queue"""
449
+ if not self.task_queue:
450
+ return 0
451
+
452
+ current_time = time.time()
453
+ total_wait = sum(current_time - task.created_at for _, _, task in self.task_queue)
454
+ return total_wait / len(self.task_queue) if self.task_queue else 0
455
+
456
+
457
+ # Global instance
458
+ load_balancer = LoadBalancer()
shared/models.py ADDED
@@ -0,0 +1,70 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic import BaseModel
2
+ from typing import List, Optional, Dict, Any
3
+ from enum import Enum
4
+
5
+
6
class NodeType(str, Enum):
    """Kinds of nodes in the SACCP network (str-valued for clean JSON/pydantic use).

    NOTE(review): this enum is duplicated verbatim in shared/node_types.py —
    keep the two definitions in sync (or consolidate to one module).
    """
    HEAD = "head"        # coordinator node
    RAM = "ram"          # memory-contribution node
    DISK = "disk"        # storage node
    COMPUTE = "compute"  # generic CPU compute node
    GPU = "gpu"
    TPU = "tpu"
    NPU = "npu"
14
+
15
+
16
class ChatMessage(BaseModel):
    """One turn of a chat conversation."""
    role: str  # "user" or "assistant"
    content: str
19
+
20
+
21
class ChatRequest(BaseModel):
    """OpenAI-style chat-completion request body accepted by the worker."""
    messages: List[ChatMessage]
    model: str = "sam-x-nano"  # logical model name; the worker resolves it to an available model
    max_tokens: Optional[int] = 512
    temperature: Optional[float] = 0.8
    top_k: Optional[int] = 40
    top_p: Optional[float] = 0.9
    repetition_penalty: Optional[float] = 1.1
    stream: Optional[bool] = False  # Support for streaming
    use_token_distribution: Optional[bool] = False  # Enable token-by-token distribution for autoregressive models
31
+
32
+
33
class ChatResponse(BaseModel):
    """OpenAI-style non-streaming chat-completion response.

    NOTE(review): ``created`` has no default — constructors must supply it
    or pydantic validation fails.
    """
    id: str
    object: str = "chat.completion"
    created: int  # Unix timestamp (seconds)
    model: str
    choices: List[Dict[str, Any]]
    usage: Optional[Dict[str, int]] = None
40
+
41
+
42
class StreamChoice(BaseModel):
    """A single choice entry inside a streaming chunk."""
    index: int
    delta: Dict[str, Any]  # For streaming, contains the delta content
    finish_reason: Optional[str] = None  # None until the terminal chunk ("stop")
46
+
47
+
48
class ChatStreamResponse(BaseModel):
    """OpenAI-style streaming chunk (one SSE `data:` payload)."""
    id: str
    object: str = "chat.completion.chunk"
    created: int  # Unix timestamp (seconds)
    model: str
    choices: List[StreamChoice]
54
+
55
+
56
class WorkerStatus(BaseModel):
    """Self-reported status of a worker node."""
    model_name: str
    node_type: Optional[NodeType] = None
    is_active: bool
    load: float  # current load fraction/score — units defined by the reporter
    last_heartbeat: int  # Unix timestamp of last heartbeat (seconds)
    capabilities: Optional[Dict[str, Any]] = None
63
+
64
+
65
class TaskFileRequest(BaseModel):
    """Request to submit a task (with payload) to the SACCP network."""
    task_type: str
    model_name: str
    task_data: Dict[str, Any]
    priority: str = "normal"  # e.g. "critical" / "high" / "normal" / "low"
    max_workers: int = 1
shared/node_types.py ADDED
@@ -0,0 +1,104 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic import BaseModel
2
+ from typing import List, Optional, Dict, Any
3
+ from enum import Enum
4
+
5
+
6
class NodeType(str, Enum):
    """Kinds of nodes in the SACCP network (str-valued for clean JSON/pydantic use).

    NOTE(review): duplicate of the enum in shared/models.py — keep in sync.
    """
    HEAD = "head"        # coordinator node
    RAM = "ram"          # memory-contribution node
    DISK = "disk"        # storage node
    COMPUTE = "compute"  # generic CPU compute node
    GPU = "gpu"
    TPU = "tpu"
    NPU = "npu"
14
+
15
+
16
class NodeCapabilities(BaseModel):
    """Capabilities of a node in the SACCP network"""
    node_type: NodeType
    cpu_count: int
    memory_gb: float
    disk_space_gb: float
    gpu_available: bool
    gpu_info: Optional[Dict[str, Any]] = None  # free-form accelerator details
    tpu_available: bool
    npu_available: bool
    network_bandwidth_mbps: Optional[float] = None
    uptime_hours: Optional[float] = None
    smilyai_approved: bool = False  # For HEAD nodes approval
    performance_score: float = 1.0  # relative scheduling weight (1.0 = baseline)
30
+
31
+
32
class NodeRegistrationRequest(BaseModel):
    """Request model for node registration with the SACCP network"""
    node_id: str
    endpoint: str  # base URL at which the network can reach this node
    capabilities: NodeCapabilities
    node_version: str = "1.0.0"
38
+
39
+
40
class NodeRegistrationResponse(BaseModel):
    """Response model for node registration"""
    success: bool
    node_id: str
    message: str
    approval_status: str  # pending, approved, rejected
46
+
47
+
48
class NodeListResponse(BaseModel):
    """Response model for listing network nodes"""
    nodes: List[Dict[str, Any]]
    total_nodes: int   # all registered nodes
    online_nodes: int  # subset currently heartbeating
53
+
54
+
55
class NodeStatus(BaseModel):
    """Status of a node in the network"""
    node_id: str
    node_type: NodeType
    endpoint: str
    is_online: bool
    last_heartbeat: int  # Unix timestamp (seconds)
    capabilities: NodeCapabilities
    tasks_completed: int
    tasks_failed: int
    credits_earned: float  # lifetime credits, see CreditTransaction
66
+
67
+
68
class CreditTransaction(BaseModel):
    """Model for credit transactions in the SACCP ecosystem"""
    transaction_id: str
    node_id: str
    amount: float  # positive credit amount; direction given by transaction_type
    transaction_type: str  # 'earned', 'spent', 'transferred'
    reason: str  # 'task_completion', 'resource_contribution', 'service_purchase', etc.
    timestamp: int  # Unix timestamp (seconds)
    service_type: Optional[str] = None  # For service purchases
77
+
78
+
79
class CreditBalance(BaseModel):
    """Model for node credit balance"""
    node_id: str
    balance: float       # current spendable balance
    total_earned: float  # lifetime earnings
    total_spent: float   # lifetime spend
    transactions: List[CreditTransaction]
86
+
87
+
88
class ServiceOffering(BaseModel):
    """Model for services available in the SACCP marketplace"""
    service_id: str
    service_name: str
    description: str
    price_per_unit: float  # price in credits per unit_type
    unit_type: str  # 'hour', 'gb_storage', 'compute_hour', etc.
    provider_node_id: Optional[str] = None  # None for network-provided services
    availability: bool = True
97
+
98
+
99
class ServiceRequest(BaseModel):
    """Request for a service from the marketplace"""
    service_id: str
    node_id: str  # requesting (paying) node
    quantity: float  # amount in the offering's unit_type
    parameters: Optional[Dict[str, Any]] = None
space-config.yaml ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
# SACCP Node Space Configuration
# Hugging Face Space runtime settings for this worker container.
runtime:
  cpu: "medium"
  memory: "16x"
  accelerator: "cpu"  # Will be configured based on node type
# Environment variables injected into the container at startup.
env:
  NODE_TYPE: "universal"
  MODEL_TYPE: "universal"
worker_app.py ADDED
@@ -0,0 +1,564 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import os

# ---------------------------------------------------------------------------
# CPU / TensorFlow environment configuration.
# These variables MUST be exported before TensorFlow is imported: TF reads
# them once at module-initialisation time, so the original code (which set
# them after `import tensorflow`) had no effect.
# ---------------------------------------------------------------------------
NUM_CORES = os.cpu_count() or 4
os.environ['TF_NUM_INTEROP_THREADS'] = str(NUM_CORES)
os.environ['TF_NUM_INTRAOP_THREADS'] = str(NUM_CORES)
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # Force CPU only
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '1'  # Intel oneDNN optimization
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'   # Reduce TF logging noise

import time
import json
import asyncio
from datetime import datetime
from typing import Dict, List, Optional

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import uvicorn
from pydantic import BaseModel
import tensorflow as tf
import keras
import numpy as np
from tokenizers import Tokenizer
from huggingface_hub import hf_hub_download
import requests
from transformers import GPT2Tokenizer

from shared.models import ChatRequest, ChatResponse, ChatMessage
# Absolute import: the container runs this file as a script
# (`python worker_app.py`, see Dockerfile CMD), where the original relative
# import `from .model_manager import ...` raises
# "attempted relative import with no known parent package".
from model_manager import ModelManager

app = FastAPI(
    title="Universal Worker Node for Sam-X Models",
    description="Processing node that supports all Sam-X model types dynamically",
    version="2.0.0"
)

# Global model manager instance (models are lazy-loaded on first use)
model_manager = ModelManager()
model_loaded = True  # Always true since we're using lazy loading

# Configure TF threading (runtime-level API; safe to call after import)
tf.config.threading.set_inter_op_parallelism_threads(NUM_CORES)
tf.config.threading.set_intra_op_parallelism_threads(NUM_CORES)

print(f"✅ CPU optimized: {NUM_CORES} threads, oneDNN enabled")
44
+
45
+
46
def format_chat_prompt(messages: List[Dict[str, str]]) -> str:
    """Flatten chat messages into a single plain-text prompt for the model.

    User and assistant messages become newline-delimited blocks; any other
    role (e.g. "system") is appended verbatim with a trailing newline, and a
    final blank block acts as the assistant-response prefix.

    NOTE(review): user and assistant turns are formatted identically — no
    role markers are inserted. Confirm this matches the model's training
    format before changing the template strings.

    Args:
        messages: list of ``{"role": ..., "content": ...}`` dicts.

    Returns:
        The concatenated prompt string.
    """
    prompt = ""

    for msg in messages:
        role = msg.get('role', 'user')
        content = msg.get('content', '')

        if role.lower() == 'user':
            prompt += f"""
{content}
"""
        elif role.lower() == 'assistant':
            prompt += f"""
{content}
"""
        else:
            # System or other roles
            prompt += f"{content}\n"

    # Add assistant prefix for the response
    prompt += """

"""

    return prompt
72
+
73
+
74
def sample_token(logits, temperature=0.8, top_k=40, top_p=0.9, repetition_penalty=1.1,
                 generated_ids=None):
    """Sample the id of the next token from a 1-D logits array.

    Args:
        logits: np.ndarray of shape (vocab_size,) with raw model logits.
        temperature: softmax temperature (> 0); lower values are greedier.
        top_k: if > 0, restrict sampling to the k most probable tokens.
        top_p: nucleus-sampling cumulative-probability cutoff; only reached
            when the top-k branch did not apply (top-k takes precedence,
            matching the original behaviour).
        repetition_penalty: CTRL-style penalty (> 1 discourages repeats).
        generated_ids: optional sequence of already-generated token ids.
            When provided, the repetition penalty is applied only to those
            ids (the standard formulation). When None, the original
            behaviour — penalising every logit uniformly, which merely
            rescales the distribution — is kept for backward compatibility.

    Returns:
        The sampled token index (integer).
    """
    # Temperature scaling; `/` creates a copy, so the caller's array is safe.
    logits = logits / temperature

    # Repetition penalty.
    if repetition_penalty != 1.0:
        if generated_ids:
            # Penalise only tokens that already appeared in the output.
            ids = np.fromiter(set(int(i) for i in generated_ids), dtype=np.int64)
            vals = logits[ids]
            logits[ids] = np.where(vals < 0, vals * repetition_penalty, vals / repetition_penalty)
        else:
            # Legacy global penalty (applies to the whole vocabulary).
            logits = np.where(logits < 0, logits * repetition_penalty, logits / repetition_penalty)

    # Softmax with max-subtraction for numerical stability.
    probs = np.exp(logits - np.max(logits))
    probs = probs / np.sum(probs)

    # Top-k filtering (returns immediately, so it takes precedence over top-p).
    if 0 < top_k < len(probs):
        top_k_idx = np.argpartition(probs, -top_k)[-top_k:]
        top_k_probs = probs[top_k_idx]
        top_k_probs = top_k_probs / np.sum(top_k_probs)  # Renormalize
        return top_k_idx[np.random.choice(len(top_k_idx), p=top_k_probs)]

    # Top-p (nucleus) sampling.
    if top_p < 1.0:
        sorted_idx = np.argsort(probs)[::-1]
        cumulative_probs = np.cumsum(probs[sorted_idx])
        cutoff_idx = min(np.searchsorted(cumulative_probs, top_p) + 1, len(sorted_idx))
        nucleus_idx = sorted_idx[:cutoff_idx]
        nucleus_probs = probs[nucleus_idx]
        nucleus_probs = nucleus_probs / np.sum(nucleus_probs)  # Renormalize
        return nucleus_idx[np.random.choice(len(nucleus_idx), p=nucleus_probs)]

    # Unfiltered sampling from the full distribution.
    return np.random.choice(len(probs), p=probs)
111
+
112
+
113
def generate_response(model: keras.Model, tokenizer: Tokenizer, config: dict,
                      prompt: str, max_tokens: int = 512, temperature: float = 0.8,
                      top_k: int = 40, top_p: float = 0.9, repetition_penalty: float = 1.1) -> str:
    """Autoregressively generate a completion for *prompt* and return the text.

    Runs the model token-by-token on CPU without a KV cache, stopping after
    ``max_tokens`` tokens or when a stop token is produced.
    """
    # Tokenize the prompt
    prompt_ids = tokenizer.encode(prompt).ids
    current_ids = tf.constant([prompt_ids], dtype=tf.int32)

    # The stop-token set is constant for the whole generation: hoist it out
    # of the loop (the original rebuilt it every iteration) and drop Nones —
    # tokenizer.token_to_id returns None for unknown tokens, and None must
    # never vacuously sit in the stop list.
    stop_token_ids = {tid for tid in (
        config.get('eos_token_id', 50256),
        tokenizer.token_to_id("\n"),
        tokenizer.token_to_id("<im end for model tun>"),
    ) if tid is not None}

    generated_ids = []
    for _ in range(max_tokens):
        with tf.device('/CPU:0'):  # CPU-only inference
            logits, _ = model(current_ids, training=False, use_cache=False)
            next_token_logits = logits[0, -1, :].numpy()

        next_token_id = sample_token(next_token_logits, temperature, top_k, top_p, repetition_penalty)
        generated_ids.append(next_token_id)
        # Feed only the new token back (matches the original no-cache call pattern).
        current_ids = tf.constant([[next_token_id]], dtype=tf.int32)

        if next_token_id in stop_token_ids:
            break

    # Decode the generated tokens
    generated_text = tokenizer.decode(generated_ids)

    # Trim anything at/after the first stop marker that leaked into the text.
    for marker in ("\n", "<im end for model tun>"):
        idx = generated_text.find(marker)
        if idx != -1:
            generated_text = generated_text[:idx]

    return generated_text.strip()
157
+
158
+
159
async def generate_streaming_response(model: keras.Model, tokenizer: Tokenizer, config: dict,
                                      prompt: str, max_tokens: int = 512, temperature: float = 0.8,
                                      top_k: int = 40, top_p: float = 0.9, repetition_penalty: float = 1.1):
    """Yield OpenAI-style SSE chunks ("data: {...}\\n\\n") for *prompt*.

    Every chunk carries the placeholder model name "dynamic_model"; the
    calling endpoint rewrites it to the requested model before sending.
    """
    # One id for the whole stream (OpenAI chunks share a completion id).
    response_id = f"chat-{int(time.time())}"

    def sse(choices):
        # Wrap one chunk payload in the SSE wire format.
        return "data: " + json.dumps({
            "id": response_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": "dynamic_model",
            "choices": choices,
        }) + "\n\n"

    # Tokenize the prompt
    prompt_ids = tokenizer.encode(prompt).ids
    current_ids = tf.constant([prompt_ids], dtype=tf.int32)

    # Constant stop set, hoisted out of the loop; Nones from unknown-token
    # lookups are dropped so they can never match a sampled id.
    stop_token_ids = {tid for tid in (
        config.get('eos_token_id', 50256),
        tokenizer.token_to_id("\n"),
        tokenizer.token_to_id("<im end for model tun>"),
    ) if tid is not None}

    # Initial chunk announcing the assistant role.
    yield sse([{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}])

    # Token loop — this is where SACCP token distribution would plug in.
    for _ in range(max_tokens):
        with tf.device('/CPU:0'):  # CPU-only inference
            logits, _ = model(current_ids, training=False, use_cache=False)
            next_token_logits = logits[0, -1, :].numpy()

        next_token_id = sample_token(next_token_logits, temperature, top_k, top_p, repetition_penalty)

        # Check stop BEFORE yielding: the original streamed the stop marker's
        # text (e.g. "<im end for model tun>") to the client and only then broke.
        if next_token_id in stop_token_ids:
            break

        token_text = tokenizer.decode([next_token_id])
        yield sse([{"index": 0, "delta": {"content": token_text}, "finish_reason": None}])

        current_ids = tf.constant([[next_token_id]], dtype=tf.int32)

    # Terminal chunk.
    yield sse([{"index": 0, "delta": {}, "finish_reason": "stop"}])
237
+
238
+
239
async def generate_token_by_token_streaming_response(model: keras.Model, tokenizer: Tokenizer, config: dict,
                                                     prompt: str, max_tokens: int = 512, temperature: float = 0.8,
                                                     top_k: int = 40, top_p: float = 0.9, repetition_penalty: float = 1.1):
    """Stream SSE chunks one token at a time (SACCP-distributable variant).

    Same wire format as generate_streaming_response, but each token chunk
    carries a per-token id ("token-<i>-<ts>") so a head node can track
    individual generation steps.
    """
    # Tokenize the prompt
    prompt_ids = tokenizer.encode(prompt).ids
    current_ids = tf.constant([prompt_ids], dtype=tf.int32)

    # Constant stop set, hoisted out of the loop; Nones from unknown-token
    # lookups are dropped so they can never match a sampled id.
    stop_token_ids = {tid for tid in (
        config.get('eos_token_id', 50256),
        tokenizer.token_to_id("\n"),
        tokenizer.token_to_id("<im end for model tun>"),
    ) if tid is not None}

    # Initial chunk announcing the assistant role.
    initial_chunk = {
        "id": f"chat-{int(time.time())}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "dynamic_model",
        "choices": [{
            "index": 0,
            "delta": {"role": "assistant", "content": ""},
            "finish_reason": None
        }]
    }
    yield f"data: {json.dumps(initial_chunk)}\n\n"

    for i in range(max_tokens):
        # One forward pass per token (in a real SACCP deployment this step
        # could be dispatched to a remote worker).
        with tf.device('/CPU:0'):
            logits, _ = model(current_ids, training=False, use_cache=False)
            next_token_logits = logits[0, -1, :].numpy()

        next_token_id = sample_token(next_token_logits, temperature, top_k, top_p, repetition_penalty)

        # Check stop BEFORE yielding: the original emitted the stop marker's
        # text to the client before breaking.
        if next_token_id in stop_token_ids:
            break

        token_text = tokenizer.decode([next_token_id])
        chunk = {
            "id": f"token-{i}-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": "dynamic_model",
            "choices": [{
                "index": 0,
                "delta": {"content": token_text},
                "finish_reason": None
            }]
        }
        yield f"data: {json.dumps(chunk)}\n\n"

        # Prepare for next iteration
        current_ids = tf.constant([[next_token_id]], dtype=tf.int32)

    # Final chunk
    final_chunk = {
        "id": f"chat-{int(time.time())}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "dynamic_model",
        "choices": [{
            "index": 0,
            "delta": {},
            "finish_reason": "stop"
        }]
    }
    yield f"data: {json.dumps(final_chunk)}\n\n"
319
+
320
+
321
@app.on_event("startup")
def startup_event():
    """Log worker readiness at startup and mark the node unhealthy on failure."""
    global model_loaded

    print("Initializing universal worker...")
    print(f"Available models: {model_manager.list_available_models()}")

    try:
        banner = (
            "✅ Universal worker initialized successfully!",
            "This worker can dynamically load any Sam-X model based on requests",
        )
        for line in banner:
            print(line)
    except Exception as exc:
        # Flip the health flag so /health reports "unhealthy".
        print(f"❌ Worker initialization failed: {exc}")
        model_loaded = False
335
+
336
+
337
@app.post("/chat/completions")
async def chat_completions(request: ChatRequest):
    """OpenAI-compatible chat-completion endpoint (streaming and non-streaming).

    Resolves the requested model against the models this worker can load,
    formats the conversation into a prompt, and either streams SSE chunks or
    returns a full completion payload.
    """
    try:
        model_type = request.model.lower()

        # Resolve the requested model: exact name first, then substring match.
        available_models = model_manager.list_available_models()
        if model_type not in available_models:
            matching_models = [m for m in available_models if model_type in m or m in model_type]
            if matching_models:
                model_type = matching_models[0]  # Use first available match
            else:
                raise HTTPException(
                    status_code=400,
                    detail=f"Model {request.model} not available. Available models: {available_models}"
                )

        # Get the appropriate model and tokenizer for this request
        model, tokenizer, config = model_manager.get_model(model_type)

        # Format the messages into a single prompt
        messages = [{"role": msg.role, "content": msg.content} for msg in request.messages]
        prompt = format_chat_prompt(messages)

        # If streaming is requested, return StreamingResponse
        if request.stream:
            async def generate():
                prefix = "data: "
                async for chunk in generate_streaming_response(
                    model=model,
                    tokenizer=tokenizer,
                    config=config,
                    prompt=prompt,
                    max_tokens=request.max_tokens,
                    temperature=request.temperature,
                    top_k=request.top_k,
                    top_p=request.top_p,
                    repetition_penalty=request.repetition_penalty
                ):
                    # Rewrite the placeholder model name in each SSE chunk.
                    # Strip exactly the "data: " prefix and the trailing
                    # newlines: the original sliced chunk[7:-4], which cut
                    # into the JSON payload and made json.loads raise on
                    # every chunk.
                    chunk_data = json.loads(chunk[len(prefix):].strip())
                    chunk_data["model"] = request.model
                    yield f"data: {json.dumps(chunk_data)}\n\n"

            return StreamingResponse(generate(), media_type="text/event-stream")

        # Otherwise, generate the full response synchronously.
        start_time = time.time()
        response_text = generate_response(
            model=model,
            tokenizer=tokenizer,
            config=config,
            prompt=prompt,
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            top_k=request.top_k,
            top_p=request.top_p,
            repetition_penalty=request.repetition_penalty
        )
        processing_time = time.time() - start_time

        # Create response in OpenAI-compatible format. `created` is a
        # required field of ChatResponse (no default); the original omitted
        # it, so every non-streaming request failed pydantic validation.
        now = int(time.time())
        response = ChatResponse(
            id=f"chat-{now}",
            created=now,
            model=request.model,  # Use original model name
            choices=[
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": response_text},
                    "finish_reason": "stop"
                }
            ],
            # NOTE(review): these are character counts, not token counts —
            # kept as-is for backward compatibility with existing consumers.
            usage={
                "prompt_tokens": len(prompt),
                "completion_tokens": len(response_text),
                "total_tokens": len(prompt) + len(response_text)
            }
        )

        print(f"Generated response in {processing_time:.2f}s for model {request.model} (loaded as {model_type})")

        return response.dict()

    except HTTPException:
        # Preserve deliberate HTTP errors (e.g. the 400 above); the original
        # broad handler converted them into generic 500s.
        raise
    except Exception as e:
        print(f"Error processing request: {e}")
        raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}")
429
+
430
+
431
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    status = "healthy" if model_loaded else "unhealthy"
    return {
        "status": status,
        "model_loaded": model_loaded,
        "timestamp": int(time.time()),
        "supported_models": model_manager.list_available_models(),
        "loaded_models": list(model_manager.models.keys()),
    }
441
+
442
+
443
@app.get("/model-info")
async def model_info(model_type: str = "sam-x-large"):
    """Return metadata for a specific model (loads it on demand)."""
    try:
        if model_type not in model_manager.list_available_models():
            raise HTTPException(
                status_code=404,
                detail=f"Model {model_type} not available. Available: {model_manager.list_available_models()}"
            )

        model, tokenizer, config = model_manager.get_model(model_type)

        return {
            "model_type": model_type,
            "vocab_size": tokenizer.get_vocab_size(),
            "parameters": int(model.count_params()) if model else 0,
            "max_context_length": config.get('max_position_embeddings', 2048),
            "loaded": model_manager.is_model_loaded(model_type),
            "num_hidden_layers": config.get('num_hidden_layers', 12),
            "hidden_size": config.get('hidden_size', 768),
            "num_attention_heads": config.get('num_attention_heads', 12)
        }
    except HTTPException:
        # Keep the intended 404; the original broad handler rewrapped it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting model info: {str(e)}")
467
+
468
+
469
@app.get("/models")
async def list_models():
    """List all available models"""
    created = int(time.time())
    entries = []
    for model_name in model_manager.list_available_models():
        entries.append({
            "id": model_name,
            "object": "model",
            "created": created,
            "owned_by": "universal-worker",
        })
    return {"object": "list", "data": entries}
484
+
485
+
486
@app.post("/saccp/process-task")
async def process_saccp_task(request: dict):
    """Process a SACCP task — the worker-side interface for distributed computing.

    Supported task types:
      * "inference": full prompt-to-text generation.
      * "token_generation": produce a single next token for a given context
        (building block for distributed autoregressive decoding).
    """
    try:
        task_type = request.get("task_type", "inference")
        model_type = request.get("model_name", "sam-x-large")
        task_data = request.get("task_data", {})

        # Get the appropriate model and tokenizer (lazy-loaded).
        model, tokenizer, config = model_manager.get_model(model_type)

        if task_type == "inference":
            result = generate_response(
                model=model,
                tokenizer=tokenizer,
                config=config,
                prompt=task_data.get("prompt", ""),
                max_tokens=task_data.get("max_tokens", 512),
                temperature=task_data.get("temperature", 0.8)
            )
            return {
                "status": "success",
                "result": result,
                "model_used": model_type
            }

        if task_type == "token_generation":
            current_context = task_data.get("current_context", [])
            generation_params = task_data.get("generation_params", {})

            if not current_context:
                raise HTTPException(status_code=400, detail="Current context required for token generation")

            # Convert context to tensor and run one forward pass.
            input_ids = tf.constant([current_context], dtype=tf.int32)
            with tf.device('/CPU:0'):
                logits, _ = model(input_ids, training=False, use_cache=False)
                next_token_logits = logits[0, -1, :].numpy()

            next_token_id = sample_token(
                next_token_logits,
                generation_params.get("temperature", 0.8),
                generation_params.get("top_k", 40),
                generation_params.get("top_p", 0.9),
                generation_params.get("repetition_penalty", 1.1)
            )

            return {
                "status": "success",
                "token_id": int(next_token_id),
                "token_text": tokenizer.decode([next_token_id]),
                "model_used": model_type,
                "next_position": len(current_context)
            }

        # For other task types, we can extend this
        raise HTTPException(status_code=400, detail=f"Task type {task_type} not supported")

    except HTTPException:
        # Keep the intended 400s; the original broad handler rewrapped them as 500s.
        raise
    except Exception as e:
        print(f"Error processing SACCP task: {e}")
        raise HTTPException(status_code=500, detail=f"Error processing SACCP task: {str(e)}")
560
+
561
+
562
if __name__ == "__main__":
    # Honour the PORT environment variable (default 8000, matching the
    # Dockerfile's EXPOSE) and listen on all interfaces.
    listen_port = int(os.environ.get("PORT", 8000))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)