agent-arena / core /defi_mechanics.py
nice-bill's picture
Add core simulation (Agent, Pool, Simulation)
6669884
"""DeFi mechanics: Constant product AMM pool."""
from typing import Dict, Tuple
from dataclasses import dataclass, field
from config import SWAP_FEE
@dataclass
class Pool:
"""Constant product automated market maker (AMM)."""
reserve_a: float = 1000
reserve_b: float = 1000
liquidity_providers: Dict[str, float] = field(default_factory=dict)
_constant_product: float = None
def __post_init__(self):
self._constant_product = self.reserve_a * self.reserve_b
def swap(self, token_in: str, amount_in: float, agent_name: str) -> Tuple[float, float]:
"""
Execute a swap on the pool.
Returns:
Tuple of (amount_out, fee)
"""
if amount_in <= 0:
return 0, 0
if token_in == "a":
amount_out = self._calculate_output(amount_in, self.reserve_a, self.reserve_b)
fee = amount_out * SWAP_FEE
amount_out -= fee
self.reserve_a += amount_in
self.reserve_b -= amount_out
else:
amount_out = self._calculate_output(amount_in, self.reserve_b, self.reserve_a)
fee = amount_out * SWAP_FEE
amount_out -= fee
self.reserve_b += amount_in
self.reserve_a -= amount_out
self._constant_product = self.reserve_a * self.reserve_b
return amount_out, fee
def provide_liquidity(self, amount_a: float, amount_b: float, agent_name: str) -> float:
"""Add liquidity to the pool and mint LP tokens."""
if amount_a <= 0 or amount_b <= 0:
return 0
# Calculate LP tokens to mint (simple share model)
total_liquidity = sum(self.liquidity_providers.values())
if total_liquidity == 0:
# Initial liquidity - use geometric mean
lp_tokens = (amount_a * amount_b) ** 0.5
else:
# Proportional to existing liquidity
share_a = amount_a / self.reserve_a
share_b = amount_b / self.reserve_b
share = min(share_a, share_b) # Use smaller share to prevent imbalance
lp_tokens = share * (self.reserve_a + self.reserve_b)
self.reserve_a += amount_a
self.reserve_b += amount_b
self.liquidity_providers[agent_name] = (
self.liquidity_providers.get(agent_name, 0) + lp_tokens
)
self._constant_product = self.reserve_a * self.reserve_b
return lp_tokens
def withdraw_liquidity(self, lp_tokens: float, agent_name: str) -> Tuple[float, float]:
"""Remove liquidity and burn LP tokens."""
total_lp = sum(self.liquidity_providers.values())
if total_lp == 0 or lp_tokens <= 0:
return 0, 0
share = lp_tokens / total_lp
amount_a = self.reserve_a * share
amount_b = self.reserve_b * share
self.reserve_a -= amount_a
self.reserve_b -= amount_b
self.liquidity_providers[agent_name] -= lp_tokens
self._constant_product = self.reserve_a * self.reserve_b
return amount_a, amount_b
@property
def price_ab(self) -> float:
"""Get price of A in terms of B."""
return self.reserve_b / self.reserve_a if self.reserve_a > 0 else 0
@property
def price_ba(self) -> float:
"""Get price of B in terms of A."""
return self.reserve_a / self.reserve_b if self.reserve_b > 0 else 0
@property
def total_liquidity(self) -> float:
"""Get total liquidity in the pool."""
return sum(self.liquidity_providers.values())
@property
def constant_product(self) -> float:
"""Get the constant product k = a * b."""
if self._constant_product is None:
self._constant_product = self.reserve_a * self.reserve_b
return self._constant_product
@staticmethod
def _calculate_output(amount_in: float, reserve_in: float, reserve_out: float) -> float:
"""
Calculate output amount using constant product formula.
(x + dx) * (y - dy) = x * y
dy = y * dx / (x + dx)
"""
if amount_in <= 0 or reserve_in <= 0 or reserve_out <= 0:
return 0
numerator = amount_in * reserve_out
denominator = reserve_in + amount_in
return numerator / denominator
def get_state(self) -> Dict:
"""Get pool state for agents."""
return {
"reserve_a": self.reserve_a,
"reserve_b": self.reserve_b,
"price_ab": self.price_ab,
"price_ba": self.price_ba,
"total_liquidity": self.total_liquidity,
"constant_product": self.constant_product
}
def test_pool():
"""Test the Pool class."""
print("Testing Pool class...")
# Create pool
pool = Pool(reserve_a=1000, reserve_b=1000)
print(f"Initial pool: A={pool.reserve_a}, B={pool.reserve_b}")
print(f"Price A/B: {pool.price_ab:.4f}")
print(f"Constant product: {pool.constant_product}")
# Test swap
print("\nTesting swap: 100 A for B...")
amount_out, fee = pool.swap("a", 100, "TestAgent")
print(f"Output: {amount_out:.4f} B, Fee: {fee:.4f}")
print(f"Pool after swap: A={pool.reserve_a:.2f}, B={pool.reserve_b:.2f}")
# Test liquidity provision
print("\nTesting liquidity provision...")
lp = pool.provide_liquidity(200, 200, "TestAgent")
print(f"LP tokens minted: {lp:.4f}")
print(f"Total liquidity: {pool.total_liquidity:.4f}")
print("\nPool test complete!")
if __name__ == "__main__":
test_pool()