import asyncio
from typing import Dict, Any, Optional

import torch

from .decent_torch import DecentModel
from .openpeer import OpenPeerClient
from .grammar import LonScriptGrammar
from .modeling_openpeer import OpenPeerLLM
from .configuration_openpeer import OpenPeerConfig
from .tokenization_openpeer import OpenPeerTokenizer

class DecentralizedLLM(DecentModel):
    """A decentralized LLM that serves locally and syncs with an OpenPeer network."""

    def __init__(self, network_url: str = "ws://localhost:8000"):
        super().__init__()

        self.config = OpenPeerConfig()
        self.model = OpenPeerLLM(self.config)
        self.tokenizer = OpenPeerTokenizer()
        self.peer_client = OpenPeerClient(network_url)
        self.grammar = LonScriptGrammar()
        self._ensure_model_on_device()

    def _ensure_model_on_device(self):
        """Move the model to the GPU if available, otherwise keep it on the CPU."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self.model.to(device)
    async def connect_to_network(self):
        """Connect to the peer network and start listening for peer updates."""
        # `self.peer_id` is expected to be provided by the DecentModel base class.
        await self.peer_client.connect(self.peer_id)
        asyncio.create_task(self._handle_peer_updates())

    async def _handle_peer_updates(self):
        """Handle incoming updates from peers."""
        async for update in self.peer_client.receive_updates():
            if update["type"] == "model_update":
                await self._process_model_update(update)

    async def _process_model_update(self, update: Dict[str, Any]):
        """Process a received model update."""
        # Peers serialize tensors as nested lists; rebuild tensors here.
        state_dict = {k: torch.tensor(v) for k, v in update["state"].items()}
        # `state_updates` and `aggregate_states` are expected to be provided
        # by the DecentModel base class.
        self.state_updates[update["peer_id"]] = state_dict
        self.aggregate_states()
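    # Expected shape of a "model_update" message, inferred from the fields read
    # above (the exact schema is an assumption about the OpenPeer protocol):
    # {
    #     "type": "model_update",
    #     "peer_id": "<sender id>",
    #     "state": {"<parameter name>": <nested list of floats>, ...},
    # }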
    def forward(self, input_text: str) -> str:
        """Generate a response for the given input text."""
        inputs = self.tokenizer(input_text, return_tensors="pt")
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=100,
                num_return_sequences=1,
                pad_token_id=self.tokenizer.eos_token_id,
            )

        decoded_output = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return decoded_output
    async def train_step(self, batch: Dict[str, torch.Tensor]):
        """Perform a training step and share the resulting update with peers."""
        outputs = self.model(**batch)
        loss = outputs.loss

        loss.backward()
        # The optimizer step and gradient reset are assumed to happen elsewhere
        # (e.g. in the DecentModel training loop that calls this method).

        await self.peer_client.send_model_update(self.model.state_dict())
    def reason(self, context: str, query: str) -> str:
        """Implement deep reasoning with grammar enhancement."""
        prompt = f"Context: {context}\nQuery: {query}\nReasoned response:"

        initial_response = self.forward(prompt)

        # Post-process the raw generation with LonScript grammar rules.
        enhanced_response = self.grammar.apply_grammar_rules(initial_response)
        return enhanced_response
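

# A minimal usage sketch, not part of the library: it assumes an OpenPeer node
# is reachable at the given `network_url` and that OpenPeerConfig /
# OpenPeerTokenizer can be constructed with default arguments.
if __name__ == "__main__":
    async def main():
        llm = DecentralizedLLM(network_url="ws://localhost:8000")
        await llm.connect_to_network()

        # Plain generation.
        print(llm.forward("Hello, peers!"))

        # Grammar-enhanced reasoning over a context/query pair.
        print(llm.reason(context="LonScript is a small DSL.", query="What is LonScript?"))

    asyncio.run(main())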