Spaces:
Sleeping
Sleeping
| """DeFi mechanics: Constant product AMM pool.""" | |
| from typing import Dict, Tuple | |
| from dataclasses import dataclass, field | |
| from config import SWAP_FEE | |
| class Pool: | |
| """Constant product automated market maker (AMM).""" | |
| reserve_a: float = 1000 | |
| reserve_b: float = 1000 | |
| liquidity_providers: Dict[str, float] = field(default_factory=dict) | |
| _constant_product: float = None | |
| def __post_init__(self): | |
| self._constant_product = self.reserve_a * self.reserve_b | |
| def swap(self, token_in: str, amount_in: float, agent_name: str) -> Tuple[float, float]: | |
| """ | |
| Execute a swap on the pool. | |
| Returns: | |
| Tuple of (amount_out, fee) | |
| """ | |
| if amount_in <= 0: | |
| return 0, 0 | |
| if token_in == "a": | |
| amount_out = self._calculate_output(amount_in, self.reserve_a, self.reserve_b) | |
| fee = amount_out * SWAP_FEE | |
| amount_out -= fee | |
| self.reserve_a += amount_in | |
| self.reserve_b -= amount_out | |
| else: | |
| amount_out = self._calculate_output(amount_in, self.reserve_b, self.reserve_a) | |
| fee = amount_out * SWAP_FEE | |
| amount_out -= fee | |
| self.reserve_b += amount_in | |
| self.reserve_a -= amount_out | |
| self._constant_product = self.reserve_a * self.reserve_b | |
| return amount_out, fee | |
| def provide_liquidity(self, amount_a: float, amount_b: float, agent_name: str) -> float: | |
| """Add liquidity to the pool and mint LP tokens.""" | |
| if amount_a <= 0 or amount_b <= 0: | |
| return 0 | |
| # Calculate LP tokens to mint (simple share model) | |
| total_liquidity = sum(self.liquidity_providers.values()) | |
| if total_liquidity == 0: | |
| # Initial liquidity - use geometric mean | |
| lp_tokens = (amount_a * amount_b) ** 0.5 | |
| else: | |
| # Proportional to existing liquidity | |
| share_a = amount_a / self.reserve_a | |
| share_b = amount_b / self.reserve_b | |
| share = min(share_a, share_b) # Use smaller share to prevent imbalance | |
| lp_tokens = share * (self.reserve_a + self.reserve_b) | |
| self.reserve_a += amount_a | |
| self.reserve_b += amount_b | |
| self.liquidity_providers[agent_name] = ( | |
| self.liquidity_providers.get(agent_name, 0) + lp_tokens | |
| ) | |
| self._constant_product = self.reserve_a * self.reserve_b | |
| return lp_tokens | |
| def withdraw_liquidity(self, lp_tokens: float, agent_name: str) -> Tuple[float, float]: | |
| """Remove liquidity and burn LP tokens.""" | |
| total_lp = sum(self.liquidity_providers.values()) | |
| if total_lp == 0 or lp_tokens <= 0: | |
| return 0, 0 | |
| share = lp_tokens / total_lp | |
| amount_a = self.reserve_a * share | |
| amount_b = self.reserve_b * share | |
| self.reserve_a -= amount_a | |
| self.reserve_b -= amount_b | |
| self.liquidity_providers[agent_name] -= lp_tokens | |
| self._constant_product = self.reserve_a * self.reserve_b | |
| return amount_a, amount_b | |
| def price_ab(self) -> float: | |
| """Get price of A in terms of B.""" | |
| return self.reserve_b / self.reserve_a if self.reserve_a > 0 else 0 | |
| def price_ba(self) -> float: | |
| """Get price of B in terms of A.""" | |
| return self.reserve_a / self.reserve_b if self.reserve_b > 0 else 0 | |
| def total_liquidity(self) -> float: | |
| """Get total liquidity in the pool.""" | |
| return sum(self.liquidity_providers.values()) | |
| def constant_product(self) -> float: | |
| """Get the constant product k = a * b.""" | |
| if self._constant_product is None: | |
| self._constant_product = self.reserve_a * self.reserve_b | |
| return self._constant_product | |
| def _calculate_output(amount_in: float, reserve_in: float, reserve_out: float) -> float: | |
| """ | |
| Calculate output amount using constant product formula. | |
| (x + dx) * (y - dy) = x * y | |
| dy = y * dx / (x + dx) | |
| """ | |
| if amount_in <= 0 or reserve_in <= 0 or reserve_out <= 0: | |
| return 0 | |
| numerator = amount_in * reserve_out | |
| denominator = reserve_in + amount_in | |
| return numerator / denominator | |
| def get_state(self) -> Dict: | |
| """Get pool state for agents.""" | |
| return { | |
| "reserve_a": self.reserve_a, | |
| "reserve_b": self.reserve_b, | |
| "price_ab": self.price_ab, | |
| "price_ba": self.price_ba, | |
| "total_liquidity": self.total_liquidity, | |
| "constant_product": self.constant_product | |
| } | |
| def test_pool(): | |
| """Test the Pool class.""" | |
| print("Testing Pool class...") | |
| # Create pool | |
| pool = Pool(reserve_a=1000, reserve_b=1000) | |
| print(f"Initial pool: A={pool.reserve_a}, B={pool.reserve_b}") | |
| print(f"Price A/B: {pool.price_ab:.4f}") | |
| print(f"Constant product: {pool.constant_product}") | |
| # Test swap | |
| print("\nTesting swap: 100 A for B...") | |
| amount_out, fee = pool.swap("a", 100, "TestAgent") | |
| print(f"Output: {amount_out:.4f} B, Fee: {fee:.4f}") | |
| print(f"Pool after swap: A={pool.reserve_a:.2f}, B={pool.reserve_b:.2f}") | |
| # Test liquidity provision | |
| print("\nTesting liquidity provision...") | |
| lp = pool.provide_liquidity(200, 200, "TestAgent") | |
| print(f"LP tokens minted: {lp:.4f}") | |
| print(f"Total liquidity: {pool.total_liquidity:.4f}") | |
| print("\nPool test complete!") | |
| if __name__ == "__main__": | |
| test_pool() | |