|
|
import logging |
|
|
import datetime |
|
|
from typing import Any, Dict, List, Optional, Tuple |
|
|
import uuid |
|
|
|
|
|
from config import config |
|
|
|
|
|
class Transaction: |
|
|
"""Class representing a token transaction.""" |
|
|
|
|
|
def __init__(self, sender_id: Optional[str], recipient_id: str, amount: float, |
|
|
transaction_type: str, description: str = ""): |
|
|
"""Initialize a transaction.""" |
|
|
self.id = str(uuid.uuid4()) |
|
|
self.sender_id = sender_id |
|
|
self.recipient_id = recipient_id |
|
|
self.amount = amount |
|
|
self.transaction_type = transaction_type |
|
|
self.description = description |
|
|
self.timestamp = datetime.datetime.now().isoformat() |
|
|
self.fee = 0.0 |
|
|
|
|
|
def apply_fee(self, fee_rate: float) -> float: |
|
|
"""Apply a transaction fee and return the fee amount.""" |
|
|
if fee_rate <= 0: |
|
|
return 0.0 |
|
|
|
|
|
self.fee = self.amount * fee_rate |
|
|
return self.fee |
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
"""Convert transaction to dictionary.""" |
|
|
return { |
|
|
"id": self.id, |
|
|
"sender_id": self.sender_id, |
|
|
"recipient_id": self.recipient_id, |
|
|
"amount": self.amount, |
|
|
"fee": self.fee, |
|
|
"transaction_type": self.transaction_type, |
|
|
"description": self.description, |
|
|
"timestamp": self.timestamp |
|
|
} |
|
|
|
|
|
|
|
|
class TokenManager: |
|
|
"""Class for managing the token economy.""" |
|
|
|
|
|
def __init__(self): |
|
|
"""Initialize the token manager.""" |
|
|
self.transactions = [] |
|
|
self.balances = {} |
|
|
self.staked_tokens = {} |
|
|
self.interest_rate = config.get("token_economy.interest_rate", 0.01) |
|
|
self.transaction_fee = config.get("token_economy.transaction_fee", 0.001) |
|
|
self.currency_name = config.get("token_economy.currency_name", "ANNAC") |
|
|
self.minimum_balance = config.get("token_economy.minimum_balance", 0.0) |
|
|
self.created_at = datetime.datetime.now().isoformat() |
|
|
self.logger = logging.getLogger("token_manager") |
|
|
self.logger.info(f"Token manager initialized with currency {self.currency_name}") |
|
|
|
|
|
def create_tokens(self, recipient_id: str, amount: float, reason: str = "") -> bool: |
|
|
"""Create new tokens and assign to a recipient.""" |
|
|
if amount <= 0: |
|
|
self.logger.warning(f"Cannot create non-positive amount: {amount}") |
|
|
return False |
|
|
|
|
|
|
|
|
transaction = Transaction(None, recipient_id, amount, "create", reason) |
|
|
self.transactions.append(transaction) |
|
|
|
|
|
|
|
|
if recipient_id not in self.balances: |
|
|
self.balances[recipient_id] = 0.0 |
|
|
self.balances[recipient_id] += amount |
|
|
|
|
|
self.logger.info(f"Created {amount} {self.currency_name} for {recipient_id}: {reason}") |
|
|
return True |
|
|
|
|
|
def transfer_tokens(self, sender_id: str, recipient_id: str, amount: float, |
|
|
description: str = "") -> bool: |
|
|
"""Transfer tokens from one agent to another.""" |
|
|
if amount <= 0: |
|
|
self.logger.warning(f"Cannot transfer non-positive amount: {amount}") |
|
|
return False |
|
|
|
|
|
if sender_id not in self.balances: |
|
|
self.logger.warning(f"Sender {sender_id} has no balance") |
|
|
return False |
|
|
|
|
|
|
|
|
transaction = Transaction(sender_id, recipient_id, amount, "transfer", description) |
|
|
fee = transaction.apply_fee(self.transaction_fee) |
|
|
total_amount = amount + fee |
|
|
|
|
|
|
|
|
if self.balances[sender_id] < total_amount: |
|
|
self.logger.warning(f"Sender {sender_id} has insufficient balance: {self.balances[sender_id]} < {total_amount}") |
|
|
return False |
|
|
|
|
|
|
|
|
if self.balances[sender_id] - total_amount < self.minimum_balance: |
|
|
self.logger.warning(f"Transfer would put sender {sender_id} below minimum balance {self.minimum_balance}") |
|
|
return False |
|
|
|
|
|
|
|
|
self.balances[sender_id] -= total_amount |
|
|
|
|
|
if recipient_id not in self.balances: |
|
|
self.balances[recipient_id] = 0.0 |
|
|
self.balances[recipient_id] += amount |
|
|
|
|
|
|
|
|
self.transactions.append(transaction) |
|
|
|
|
|
self.logger.info(f"Transferred {amount} {self.currency_name} from {sender_id} to {recipient_id}" + |
|
|
(f" (fee: {fee} {self.currency_name})" if fee > 0 else "")) |
|
|
return True |
|
|
|
|
|
def batch_transfer(self, sender_id: str, recipient_ids: List[str], amounts: List[float], |
|
|
descriptions: List[str] = None) -> bool: |
|
|
"""Transfer tokens to multiple recipients in a single transaction.""" |
|
|
if len(recipient_ids) != len(amounts): |
|
|
self.logger.warning("Recipient IDs and amounts must have the same length") |
|
|
return False |
|
|
|
|
|
if descriptions and len(descriptions) != len(recipient_ids): |
|
|
self.logger.warning("Descriptions must have the same length as recipient IDs") |
|
|
return False |
|
|
|
|
|
|
|
|
total_amount = sum(amounts) |
|
|
total_fee = total_amount * self.transaction_fee |
|
|
|
|
|
|
|
|
if sender_id not in self.balances: |
|
|
self.logger.warning(f"Sender {sender_id} has no balance") |
|
|
return False |
|
|
|
|
|
if self.balances[sender_id] < total_amount + total_fee: |
|
|
self.logger.warning(f"Sender {sender_id} has insufficient balance for batch transfer") |
|
|
return False |
|
|
|
|
|
|
|
|
if self.balances[sender_id] - (total_amount + total_fee) < self.minimum_balance: |
|
|
self.logger.warning(f"Batch transfer would put sender {sender_id} below minimum balance {self.minimum_balance}") |
|
|
return False |
|
|
|
|
|
|
|
|
for i, (recipient_id, amount) in enumerate(zip(recipient_ids, amounts)): |
|
|
description = descriptions[i] if descriptions else "" |
|
|
|
|
|
|
|
|
transaction = Transaction(sender_id, recipient_id, amount, "batch_transfer", description) |
|
|
fee = amount * self.transaction_fee |
|
|
transaction.fee = fee |
|
|
self.transactions.append(transaction) |
|
|
|
|
|
|
|
|
if recipient_id not in self.balances: |
|
|
self.balances[recipient_id] = 0.0 |
|
|
self.balances[recipient_id] += amount |
|
|
|
|
|
|
|
|
self.balances[sender_id] -= (total_amount + total_fee) |
|
|
|
|
|
self.logger.info(f"Batch transferred {total_amount} {self.currency_name} from {sender_id} to {len(recipient_ids)} recipients" + |
|
|
(f" (total fee: {total_fee} {self.currency_name})" if total_fee > 0 else "")) |
|
|
return True |
|
|
|
|
|
def stake_tokens(self, agent_id: str, amount: float) -> bool: |
|
|
"""Stake tokens to earn interest.""" |
|
|
if amount <= 0: |
|
|
self.logger.warning(f"Cannot stake non-positive amount: {amount}") |
|
|
return False |
|
|
|
|
|
if agent_id not in self.balances: |
|
|
self.logger.warning(f"Agent {agent_id} has no balance") |
|
|
return False |
|
|
|
|
|
if self.balances[agent_id] < amount: |
|
|
self.logger.warning(f"Agent {agent_id} has insufficient balance to stake {amount}") |
|
|
return False |
|
|
|
|
|
|
|
|
if self.balances[agent_id] - amount < self.minimum_balance: |
|
|
self.logger.warning(f"Staking would put agent {agent_id} below minimum balance {self.minimum_balance}") |
|
|
return False |
|
|
|
|
|
|
|
|
self.balances[agent_id] -= amount |
|
|
|
|
|
if agent_id not in self.staked_tokens: |
|
|
self.staked_tokens[agent_id] = 0.0 |
|
|
self.staked_tokens[agent_id] += amount |
|
|
|
|
|
|
|
|
transaction = Transaction(agent_id, agent_id, amount, "stake", "Token staking") |
|
|
self.transactions.append(transaction) |
|
|
|
|
|
self.logger.info(f"Agent {agent_id} staked {amount} {self.currency_name}") |
|
|
return True |
|
|
|
|
|
def unstake_tokens(self, agent_id: str, amount: float) -> bool: |
|
|
"""Unstake tokens.""" |
|
|
if amount <= 0: |
|
|
self.logger.warning(f"Cannot unstake non-positive amount: {amount}") |
|
|
return False |
|
|
|
|
|
if agent_id not in self.staked_tokens: |
|
|
self.logger.warning(f"Agent {agent_id} has no staked tokens") |
|
|
return False |
|
|
|
|
|
if self.staked_tokens[agent_id] < amount: |
|
|
self.logger.warning(f"Agent {agent_id} has insufficient staked tokens: {self.staked_tokens[agent_id]} < {amount}") |
|
|
return False |
|
|
|
|
|
|
|
|
self.staked_tokens[agent_id] -= amount |
|
|
|
|
|
if agent_id not in self.balances: |
|
|
self.balances[agent_id] = 0.0 |
|
|
self.balances[agent_id] += amount |
|
|
|
|
|
|
|
|
transaction = Transaction(agent_id, agent_id, amount, "unstake", "Token unstaking") |
|
|
self.transactions.append(transaction) |
|
|
|
|
|
self.logger.info(f"Agent {agent_id} unstaked {amount} {self.currency_name}") |
|
|
return True |
|
|
|
|
|
def apply_interest(self) -> Dict[str, float]: |
|
|
"""Apply interest to staked tokens.""" |
|
|
interest_payments = {} |
|
|
|
|
|
for agent_id, staked_amount in self.staked_tokens.items(): |
|
|
if staked_amount <= 0: |
|
|
continue |
|
|
|
|
|
interest = staked_amount * self.interest_rate |
|
|
|
|
|
|
|
|
if agent_id not in self.balances: |
|
|
self.balances[agent_id] = 0.0 |
|
|
self.balances[agent_id] += interest |
|
|
|
|
|
|
|
|
transaction = Transaction(None, agent_id, interest, "interest", "Interest on staked tokens") |
|
|
self.transactions.append(transaction) |
|
|
|
|
|
interest_payments[agent_id] = interest |
|
|
|
|
|
self.logger.info(f"Applied interest of {interest} {self.currency_name} to agent {agent_id}") |
|
|
|
|
|
return interest_payments |
|
|
|
|
|
def get_balance(self, agent_id: str) -> float: |
|
|
"""Get an agent's token balance.""" |
|
|
return self.balances.get(agent_id, 0.0) |
|
|
|
|
|
def get_staked_amount(self, agent_id: str) -> float: |
|
|
"""Get an agent's staked token amount.""" |
|
|
return self.staked_tokens.get(agent_id, 0.0) |
|
|
|
|
|
def get_total_supply(self) -> float: |
|
|
"""Get the total token supply (balances + staked).""" |
|
|
total_balances = sum(self.balances.values()) |
|
|
total_staked = sum(self.staked_tokens.values()) |
|
|
return total_balances + total_staked |
|
|
|
|
|
def get_transaction_history(self, agent_id: Optional[str] = None, |
|
|
limit: Optional[int] = None) -> List[Dict[str, Any]]: |
|
|
"""Get transaction history, optionally filtered by agent and limited.""" |
|
|
if agent_id: |
|
|
filtered_transactions = [t.to_dict() for t in self.transactions |
|
|
if t.sender_id == agent_id or t.recipient_id == agent_id] |
|
|
else: |
|
|
filtered_transactions = [t.to_dict() for t in self.transactions] |
|
|
|
|
|
if limit: |
|
|
return filtered_transactions[-limit:] |
|
|
return filtered_transactions |
|
|
|
|
|
def get_richest_agents(self, limit: int = 5) -> List[Tuple[str, float]]: |
|
|
"""Get the agents with the highest token balances.""" |
|
|
sorted_balances = sorted(self.balances.items(), key=lambda x: x[1], reverse=True) |
|
|
return sorted_balances[:limit] |
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
"""Convert token manager to dictionary representation.""" |
|
|
return { |
|
|
"currency_name": self.currency_name, |
|
|
"total_supply": self.get_total_supply(), |
|
|
"transaction_count": len(self.transactions), |
|
|
"agent_count": len(self.balances), |
|
|
"interest_rate": self.interest_rate, |
|
|
"transaction_fee": self.transaction_fee, |
|
|
"minimum_balance": self.minimum_balance, |
|
|
"created_at": self.created_at |
|
|
} |
|
|
|
|
|
|