import json
import os
import uuid
from datetime import datetime
from decimal import ROUND_DOWN, Decimal
from typing import Any, Dict, List, Optional, Union

import requests
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

# Slash-less spellings that are rewritten to the "BASE/QUOTE" form used in
# order and market-data requests.
_LEGACY_SYMBOLS = {"BTCUSD": "BTC/USD", "ETHUSD": "ETH/USD"}

# Last-resort prices returned when every market-data endpoint fails.
# NOTE(review): these are static values and will drift from the real market.
_FALLBACK_PRICES = {
    "BTC/USD": 85000.00,  # Approximate BTC price as fallback
}

# Price assumed when verifying that cash covers a minimum-size buy placed
# without a live quote. NOTE(review): this is a BTC-scale figure applied to
# every symbol; presumably intended as a conservative over-estimate - confirm.
_BLIND_ORDER_PRICE_ESTIMATE = 60000.0


def _canonical_symbol(symbol: str, context: str = "") -> str:
    """Return *symbol* in "BASE/QUOTE" form, rewriting known legacy spellings.

    Args:
        symbol: Symbol as supplied by the caller (e.g. "BTCUSD" or "BTC/USD").
        context: Optional caller name appended to the log line.

    Returns:
        The corrected symbol, or *symbol* unchanged if it is not a known
        legacy spelling.
    """
    corrected = _LEGACY_SYMBOLS.get(symbol)
    if corrected is None:
        return symbol
    suffix = f" in {context}" if context else ""
    print(f"Corrected symbol format from {symbol} to {corrected}{suffix}")
    return corrected


def _min_order_size(symbol: str) -> float:
    """Return the smallest quantity this tool will submit for *symbol*."""
    return 0.0001 if "BTC" in symbol else 0.001


def _quantity_decimals(symbol: str) -> int:
    """Return the number of decimal places used for *symbol* quantities."""
    return 8 if "BTC" in symbol else 4


def _base_asset(symbol: str) -> str:
    """Return the part of *symbol* before the slash (the whole symbol if none)."""
    return symbol.split("/")[0]


def _round_down(value: float, decimals: int) -> float:
    """Round *value* toward zero to *decimals* places.

    Used for sell quantities so rounding can never produce more than is held.
    """
    step = Decimal(1).scaleb(-decimals)
    return float(Decimal(str(value)).quantize(step, rounding=ROUND_DOWN))


def _mid_price(ask: Any, bid: Any) -> Optional[float]:
    """Return the bid/ask midpoint, or whichever side is present, else None."""
    ask_price = float(ask) if ask else None
    bid_price = float(bid) if bid else None
    if ask_price and bid_price:
        return (ask_price + bid_price) / 2
    return ask_price or bid_price or None


# Create a specific schema for the tool input
class AlpacaOrderInput(BaseModel):
    """Argument schema for :class:`AlpacaCryptoOrderTool`."""

    action: str = Field(description="'buy', 'sell', or 'check' to check account")
    symbol: str = Field(default="BTC/USD", description="Crypto trading pair (e.g., BTC/USD)")
    quantity: Optional[float] = Field(default=None, description="Specific quantity to trade")
    allocation_percentage: Optional[int] = Field(default=None, description="Percentage of portfolio to allocate")
    confidence: Optional[int] = Field(default=None, description="Confidence level (0-100)")


class AlpacaCryptoOrderTool(BaseTool):
    """CrewAI tool that checks the Alpaca account and places crypto market orders.

    Credentials come from the ``api_key`` / ``api_secret`` fields or, when those
    are empty, from the ``ALPACA_API_KEY`` / ``ALPACA_API_SECRET`` environment
    variables. Every method returns a result instead of raising: failures are
    reported as ``{"error": ..., "success": False}`` dictionaries (or ``None`` /
    ``{}`` for the lookup helpers).
    """

    name: str = "Alpaca Crypto Order Execution Tool"
    description: str = "Execute cryptocurrency buy/sell orders on Alpaca based on trading signals"

    # Define fields
    api_key: Optional[str] = Field(default=None, description="Alpaca API key")
    api_secret: Optional[str] = Field(default=None, description="Alpaca API secret")
    is_paper: bool = Field(default=True, description="Whether to use paper trading API")
    base_url: str = Field(default="https://paper-api.alpaca.markets", description="API base URL for trading")
    data_url: str = Field(default="https://data.alpaca.markets", description="API base URL for market data")
    request_timeout: float = Field(default=10.0, description="Seconds to wait for each Alpaca HTTP request")

    # Define the schema for the arguments
    args_schema: type[AlpacaOrderInput] = AlpacaOrderInput

    def __init__(self, **kwargs):
        """Initialise the tool, filling credentials and trading URL defaults.

        Args:
            **kwargs: Field values. ``base_url`` is derived from ``is_paper``
                unless it is passed explicitly.
        """
        # Initialize the base class first
        super().__init__(**kwargs)

        # Fall back to environment variables for credentials not given directly
        if not self.api_key:
            self.api_key = os.getenv("ALPACA_API_KEY")
        if not self.api_secret:
            self.api_secret = os.getenv("ALPACA_API_SECRET")

        # Only the trading URL depends on paper/live; an explicit base_url wins
        if "base_url" not in kwargs:
            self.base_url = "https://paper-api.alpaca.markets" if self.is_paper else "https://api.alpaca.markets"

        print(f"Initialized Alpaca Order Tool with API key: {'Present' if self.api_key else 'Missing'}")
        print(f"Using API endpoints: Trading={self.base_url}, Data={self.data_url}")

    def _auth_headers(self) -> Dict[str, Any]:
        """Return the authentication headers sent with every Alpaca request."""
        return {
            "Apca-Api-Key-Id": self.api_key,
            "Apca-Api-Secret-Key": self.api_secret,
        }

    def _run(self, **kwargs) -> Dict[str, Any]:
        """Execute a crypto order on Alpaca, or report the account status.

        Keyword Args:
            action: 'buy', 'sell', or 'check' (to check account).
            symbol: Crypto symbol to trade (e.g., "BTC/USD"); defaults to it.
            quantity: Specific quantity to trade (e.g., 0.001 BTC).
            allocation_percentage: Alternative to quantity. For buys it is a
                percentage of account equity; for sells it is a percentage of
                the held position and takes precedence over ``quantity``.
            confidence: Optional confidence level (0-100) embedded in the
                client order id.

        Returns:
            Dictionary with the account or order details and ``success: True``,
            or with ``error`` and ``success: False``.
        """
        try:
            # Extract parameters from kwargs
            action = kwargs.get("action")
            symbol = kwargs.get("symbol") or "BTC/USD"
            quantity = kwargs.get("quantity")
            allocation_percentage = kwargs.get("allocation_percentage")
            confidence = kwargs.get("confidence")

            # Log all inputs for debugging
            print(f"Raw input kwargs: {kwargs}")
            print(f"Extracted params: action={action}, symbol={symbol}, quantity={quantity}, allocation_percentage={allocation_percentage}, confidence={confidence}")

            # Validate action is provided
            if not action:
                print("Missing required parameter: action")
                return {
                    "error": "Missing required parameter: action. Must be 'buy', 'sell', or 'check'.",
                    "success": False,
                }

            # Ensure proper symbol format (convert BTCUSD to BTC/USD if needed)
            symbol = _canonical_symbol(symbol)

            print(f"Order Tool called with action={action}, symbol={symbol}, quantity={quantity}, allocation_percentage={allocation_percentage}")

            # Validate action
            side = action.strip().lower()
            if side not in ("buy", "sell", "check"):
                print(f"Invalid action: {action}")
                return {
                    "error": f"Invalid action: {action}. Use 'buy', 'sell', or 'check'.",
                    "success": False,
                }

            # Every action talks to the API, so credentials are checked first
            if not self.api_key or not self.api_secret:
                print("Missing API credentials")
                return {
                    "error": "Missing API credentials. Please set ALPACA_API_KEY and ALPACA_API_SECRET environment variables.",
                    "success": False,
                }

            # Check account status if requested
            if side == "check":
                print("Checking account status")
                account_info = self._check_account()
                if account_info.get("success", False):
                    print(f"Account check successful: Cash={account_info.get('cash')}, Equity={account_info.get('equity')}")
                return account_info

            # For buy/sell, validate we have either quantity or allocation_percentage
            if quantity is None and allocation_percentage is None:
                print("Missing both quantity and allocation_percentage")
                return {
                    "error": "For buy/sell orders, you must provide either quantity OR allocation_percentage",
                    "success": False,
                }

            # Convert allocation_percentage to integer if it's a string
            if isinstance(allocation_percentage, str):
                try:
                    allocation_percentage = int(allocation_percentage.strip().strip("%"))
                    print(f"Converted allocation_percentage from string to int: {allocation_percentage}")
                except ValueError:
                    return {
                        "error": f"Invalid allocation_percentage format: {allocation_percentage}. Must be an integer or percentage string.",
                        "success": False,
                    }

            # Convert quantity to float if it's a string
            if isinstance(quantity, str):
                try:
                    quantity = float(quantity)
                    print(f"Converted quantity from string to float: {quantity}")
                except ValueError:
                    return {
                        "error": f"Invalid quantity format: {quantity}. Must be a number.",
                        "success": False,
                    }

            # The percentage drives sizing for every sell that supplies one,
            # and for buys only when no explicit quantity was given.
            uses_allocation = allocation_percentage is not None and (side == "sell" or quantity is None)
            if uses_allocation and not 0 < allocation_percentage <= 100:
                return {
                    "error": f"Invalid allocation_percentage: {allocation_percentage}. Must be greater than 0 and at most 100.",
                    "success": False,
                }

            # Determine the quantity based on inputs
            if side == "sell" and allocation_percentage is not None:
                final_quantity, failure = self._sell_quantity_from_allocation(symbol, allocation_percentage)
            elif quantity is not None:
                # Direct quantity specification
                final_quantity, failure = quantity, None
                print(f"Using directly specified quantity: {final_quantity}")
            else:
                final_quantity, failure = self._buy_quantity_from_allocation(symbol, allocation_percentage)

            if failure is not None:
                return failure

            if final_quantity is None or final_quantity <= 0:
                print("Invalid quantity determined")
                return {
                    "error": "Invalid quantity. Please provide either a valid quantity or allocation percentage.",
                    "success": False,
                }

            # Execute the order
            print(f"Placing {action} order for {final_quantity} of {symbol}")
            order_result = self._place_order(
                symbol=symbol,
                side=side,
                quantity=final_quantity,
                confidence=confidence,
            )

            if order_result.get("success", False):
                print(f"Order placed successfully: {order_result.get('order_id')}")
            else:
                print(f"Order failed: {order_result.get('error')}")

            return order_result

        except Exception as e:
            # Tool boundary: the caller always receives a result dictionary.
            print(f"Error executing order: {str(e)}")
            return {
                "error": f"Error executing order: {str(e)}",
                "success": False,
            }

    def _sell_quantity_from_allocation(self, symbol: str, allocation_percentage: float):
        """Size a sell as a percentage of the held position.

        Returns:
            ``(quantity, None)`` on success or ``(None, error_dict)`` when no
            position is found.
        """
        print(f"Selling with allocation percentage: {allocation_percentage}%")

        # Get current positions to determine how much to sell
        position = self._get_positions(symbol)
        if not position:
            return None, {
                "error": f"No position found for {symbol} to sell",
                "success": False,
                "positions_checked": True,
            }

        # Calculate quantity to sell based on allocation percentage
        position_qty = float(position.get("qty", 0))
        sell_qty = position_qty * (allocation_percentage / 100.0)

        # Ensure minimum order size, but never more than is held
        min_order_size = _min_order_size(symbol)
        if sell_qty < min_order_size:
            sell_qty = min(min_order_size, position_qty)

        # Round DOWN to the precision the order is sent with, so the submitted
        # quantity cannot exceed the position.
        final_quantity = _round_down(sell_qty, _quantity_decimals(symbol))
        print(f"Calculated sell quantity: {final_quantity} from position quantity: {position_qty}")
        return final_quantity, None

    def _buy_quantity_from_allocation(self, symbol: str, allocation_percentage: float):
        """Size a buy as a percentage of account equity.

        Returns:
            ``(quantity, None)`` on success or ``(None, error_dict)`` when the
            account lookup fails or the allocation cannot fund an order.
        """
        print(f"Calculating buy quantity based on allocation percentage: {allocation_percentage}%")
        account_info = self._check_account()
        if "error" in account_info:
            return None, account_info

        # Use equity if available, otherwise cash
        cash = float(account_info.get("cash") or 0)
        equity = float(account_info.get("equity") or cash)
        allocation_amount = equity * (allocation_percentage / 100.0)
        print(f"Account equity: ${equity}, Cash: ${cash}, Allocation amount: ${allocation_amount}")

        asset = _base_asset(symbol)
        min_order_size = _min_order_size(symbol)

        # Get current price to calculate quantity
        current_price = self._get_current_price(symbol)
        if current_price is None or current_price <= 0:
            # Every price source failed: fall back to a minimum-size buy if
            # cash clearly covers it at the estimated price.
            print(f"WARNING: Price retrieval failed. Using minimum order size: {min_order_size}")
            estimated_cost = min_order_size * _BLIND_ORDER_PRICE_ESTIMATE
            if cash >= estimated_cost * 1.05:  # Add 5% buffer for price fluctuations
                print(f"Proceeding with minimum order using estimated price ~${_BLIND_ORDER_PRICE_ESTIMATE}")
                return min_order_size, None
            return None, {
                "error": f"Failed to get current price for {symbol} and insufficient cash for minimum order",
                "success": False,
                "price_retrieval_failed": True,
            }

        print(f"Current price for {symbol}: ${current_price}")

        # Calculate the quantity to buy based on allocation amount and price
        final_quantity = round(allocation_amount / current_price, _quantity_decimals(symbol))
        print(f"Calculated quantity to buy: {final_quantity} {asset} (worth ${allocation_amount:.2f})")

        if final_quantity < min_order_size:
            print(f"Calculated quantity {final_quantity} is below minimum order size {min_order_size}")
            if allocation_amount > 5:  # Only place order if allocation is meaningful
                final_quantity = min_order_size
                print(f"Setting to minimum order size: {min_order_size} {asset}")
            else:
                return None, {
                    "error": f"Allocation amount ${allocation_amount:.2f} too small for minimum order size of {min_order_size} {asset} (${min_order_size * current_price:.2f})",
                    "success": False,
                    "allocation_too_small": True,
                }

        return final_quantity, None

    def _check_account(self) -> Dict[str, Any]:
        """Get account information from Alpaca.

        Returns:
            ``account_id``, ``cash``, ``equity``, ``buying_power`` and
            ``success: True``, or ``error`` and ``success: False``.
        """
        try:
            url = f"{self.base_url}/v2/account"
            response = requests.get(url, headers=self._auth_headers(), timeout=self.request_timeout)

            if response.status_code != 200:
                return {
                    "error": f"Failed to get account info: {response.text}",
                    "success": False,
                }

            account_data = response.json()
            return {
                "account_id": account_data.get("id"),
                "cash": account_data.get("cash"),
                "equity": account_data.get("equity"),
                "buying_power": account_data.get("buying_power"),
                "success": True,
            }

        except Exception as e:
            return {
                "error": f"Error checking account: {str(e)}",
                "success": False,
            }

    def _get_current_price(self, symbol: str) -> Optional[float]:
        """Get the current price of a crypto asset.

        Tries the latest-bars, snapshots and orderbook endpoints in that order,
        then the static fallback table.

        Returns:
            The price, or ``None`` if no source (including the fallback table)
            has one for *symbol*.
        """
        try:
            # Ensure proper symbol format (convert BTCUSD to BTC/USD if needed)
            symbol = _canonical_symbol(symbol, "_get_current_price")

            attempts = (
                ("latest bars", self._try_get_price_from_bars),
                ("snapshots", self._try_get_price_from_snapshots),
                ("orderbook", self._try_get_price_from_orderbook),
            )
            for label, fetch in attempts:
                print(f"Attempting to get price for {symbol} using {label} endpoint")
                price = fetch(symbol)
                if price is not None:
                    return price

            # If all attempts fail, use a fallback hardcoded price
            print(f"WARNING: Could not get price for {symbol} from any API endpoint. Using fallback price.")
            if symbol in _FALLBACK_PRICES:
                print(f"Using fallback price for {symbol}: ${_FALLBACK_PRICES[symbol]}")
                return _FALLBACK_PRICES[symbol]

            return None
        except Exception as e:
            print(f"Error getting price: {str(e)}")
            return None

    def _fetch_market_data(self, endpoint: str, label: str, symbol: str) -> Optional[Dict[str, Any]]:
        """GET one crypto market-data endpoint and return its JSON object.

        Args:
            endpoint: Path below ``/v1beta3/crypto/us/``.
            label: Short endpoint name used in log lines.
            symbol: Symbol passed as the ``symbols`` query parameter.

        Returns:
            The decoded response if it is a JSON object and the status is 200,
            otherwise ``None``. Network and decoding errors propagate to the
            caller.
        """
        url = f"{self.data_url}/v1beta3/crypto/us/{endpoint}"
        print(f"Getting current price from {label} endpoint: {url} for symbol {symbol}")
        response = requests.get(
            url,
            headers=self._auth_headers(),
            params={"symbols": symbol},
            timeout=self.request_timeout,
        )

        if response.status_code != 200:
            print(f"Error getting price from {label}: {response.status_code} - {response.text}")
            return None

        data = response.json()
        print(f"{label.capitalize()} response: {json.dumps(data)[:300]}...")
        return data if isinstance(data, dict) else None

    def _try_get_price_from_bars(self, symbol: str) -> Optional[float]:
        """Try to get price from the bars endpoint.

        Returns:
            The close price of the latest bar, or ``None`` on any failure.
        """
        try:
            data = self._fetch_market_data("latest/bars", "bars", symbol)
            if data is None:
                return None

            bar = (data.get("bars") or {}).get(symbol)
            # Accept either a single bar object or a list of bars per symbol.
            if isinstance(bar, list):
                bar = bar[0] if bar else None
            if not bar:
                print(f"No bar data found for {symbol}")
                return None

            # Return the close price of the latest bar
            return float(bar["c"])

        except Exception as e:
            # Best-effort source: any failure just moves on to the next one.
            print(f"Error in _try_get_price_from_bars: {str(e)}")
            return None

    def _try_get_price_from_snapshots(self, symbol: str) -> Optional[float]:
        """Try to get price from the snapshots endpoint.

        Returns:
            A bar close, else a quote midpoint, else the latest trade price;
            ``None`` if the snapshot has none of them or on any failure.
        """
        try:
            data = self._fetch_market_data("snapshots", "snapshots", symbol)
            if data is None:
                return None

            snapshot = (data.get("snapshots") or {}).get(symbol)
            if snapshot is None:
                print(f"No snapshot data found for {symbol}")
                return None

            # NOTE(review): "bar"/"quote" are the keys this code originally
            # read; the camelCase names are presumably what v1beta3 returns -
            # confirm against the API reference.
            for key in ("bar", "minuteBar", "dailyBar"):
                bar = snapshot.get(key)
                if bar and bar.get("c"):
                    return float(bar["c"])

            for key in ("quote", "latestQuote"):
                quote = snapshot.get(key)
                if quote:
                    price = _mid_price(quote.get("ap"), quote.get("bp"))
                    if price is not None:
                        return price

            trade = snapshot.get("latestTrade")
            if trade and trade.get("p"):
                return float(trade["p"])

            return None

        except Exception as e:
            # Best-effort source: any failure just moves on to the next one.
            print(f"Error in _try_get_price_from_snapshots: {str(e)}")
            return None

    def _try_get_price_from_orderbook(self, symbol: str) -> Optional[float]:
        """Try to get price from the orderbook endpoint.

        Returns:
            The midpoint of the first ask and first bid, or the one side that
            is present; ``None`` if the book is empty or on any failure.
        """
        try:
            data = self._fetch_market_data("latest/orderbooks", "orderbook", symbol)
            if data is None:
                return None

            orderbook = (data.get("orderbooks") or {}).get(symbol)
            if orderbook is None:
                print(f"No orderbook data found for {symbol}")
                return None

            asks = orderbook.get("a") or []
            bids = orderbook.get("b") or []
            best_ask = asks[0]["p"] if asks else None
            best_bid = bids[0]["p"] if bids else None
            return _mid_price(best_ask, best_bid)

        except Exception as e:
            # Best-effort source: any failure just moves on to the next one.
            print(f"Error in _try_get_price_from_orderbook: {str(e)}")
            return None

    def _place_order(self, symbol: str, side: str, quantity: float, confidence: Optional[int] = None) -> Dict[str, Any]:
        """Place a crypto market order on Alpaca.

        Args:
            symbol: Trading pair; known legacy spellings are corrected.
            side: "buy" or "sell".
            quantity: Amount of the base asset. It is formatted to 8 decimal
                places for symbols containing "BTC" and 4 otherwise.
            confidence: Optional signal confidence; when given, a
                ``client_order_id`` embedding it is sent with the order.

        Returns:
            Order details with ``success: True``, or ``error`` and
            ``success: False``.
        """
        try:
            # Ensure proper symbol format
            symbol = _canonical_symbol(symbol, "_place_order")

            url = f"{self.base_url}/v2/orders"
            headers = self._auth_headers()
            headers["Content-Type"] = "application/json"

            # The quantity is sent as a string with trailing zeros stripped
            qty_str = f"{quantity:.{_quantity_decimals(symbol)}f}".rstrip("0").rstrip(".")
            print(f"Formatting quantity {quantity} as '{qty_str}' for API call")

            if not qty_str or float(qty_str) <= 0:
                return {
                    "error": f"Quantity {quantity} is too small to place an order for {symbol}",
                    "success": False,
                }

            # Prepare the order data
            order_data = {
                "symbol": symbol,
                "qty": qty_str,  # Use formatted string
                "side": side,
                "type": "market",
                "time_in_force": "gtc",
            }

            # Add custom metadata if confidence is provided. The random suffix
            # keeps ids distinct for orders created within the same second.
            if confidence is not None:
                timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
                order_data["client_order_id"] = f"signal-{confidence}-{timestamp}-{uuid.uuid4().hex[:8]}"

            print(f"Sending order data to Alpaca: {json.dumps(order_data)}")

            # Send the order request. A timeout here is reported as an error
            # even though the request may still have reached the server.
            response = requests.post(url, headers=headers, json=order_data, timeout=self.request_timeout)

            if response.status_code not in (200, 201):
                return {
                    "error": f"Failed to place order: {response.text}",
                    "success": False,
                }

            # Parse the response
            order_response = response.json()

            # Get the current price for reference
            current_price = self._get_current_price(symbol)

            # Return order details
            return {
                "order_id": order_response.get("id"),
                "client_order_id": order_response.get("client_order_id"),
                "symbol": order_response.get("symbol"),
                "side": order_response.get("side"),
                "quantity": order_response.get("qty"),
                "order_type": order_response.get("type"),
                "status": order_response.get("status"),
                "current_price": current_price,
                "created_at": order_response.get("created_at"),
                "confidence": confidence,
                "success": True,
            }

        except Exception as e:
            return {
                "error": f"Error placing order: {str(e)}",
                "success": False,
            }

    def _get_positions(self, symbol: Optional[str] = None) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
        """Get current positions from Alpaca.

        Args:
            symbol: Optional symbol, with or without a slash. When omitted,
                all positions are returned.

        Returns:
            With *symbol*: the matching position dictionary, or ``{}`` if none
            matches. Without *symbol*: the list of all positions. ``{}`` is
            also returned on any HTTP or parsing failure.
        """
        try:
            original_symbol = symbol
            if symbol:
                symbol = _canonical_symbol(symbol)
            print(f"Getting positions for symbol: {symbol}, original input: {original_symbol}")

            url = f"{self.base_url}/v2/positions"

            # Fetch all positions so different symbol spellings can be matched
            response = requests.get(url, headers=self._auth_headers(), timeout=self.request_timeout)

            if response.status_code == 404:
                # No positions found
                print("No positions found (404 response)")
                return {}

            if response.status_code != 200:
                print(f"Error getting positions: {response.text}")
                return {}

            positions_data = response.json()

            # If nothing was returned or not a list
            if not positions_data or not isinstance(positions_data, list):
                print(f"No positions data returned or unexpected format: {positions_data}")
                return {}

            # If we're not looking for a specific symbol, return all positions
            if not symbol:
                return positions_data

            def squash(value: str) -> str:
                # Compare symbols ignoring the slash and letter case.
                return value.replace("/", "").upper()

            by_symbol = {}
            for position in positions_data:
                pos_symbol = position.get("symbol", "")
                print(f"Checking position with symbol: {pos_symbol}")
                by_symbol.setdefault(squash(pos_symbol), position)

            # 1. Exact match on the whole pair (BTC/USD == BTCUSD)
            wanted = squash(symbol)
            if wanted in by_symbol:
                print(f"Symbol match found: {by_symbol[wanted].get('symbol')} matches {symbol}")
                return by_symbol[wanted]

            # 2. Fall back to the USD position of the base asset, e.g. a
            #    BTC/USDT request matching a BTCUSD position.
            #    NOTE(review): assumes holdings are listed against USD - confirm.
            base_usd = f"{_base_asset(symbol).upper()}USD"
            if base_usd in by_symbol:
                print(f"Base asset match found: {by_symbol[base_usd].get('symbol')} matches {symbol}")
                return by_symbol[base_usd]

            # If we got here, no matching position was found
            print(f"No matching position found for {symbol} among: {[p.get('symbol', 'N/A') for p in positions_data]}")
            return {}

        except Exception as e:
            print(f"Error getting positions: {str(e)}")
            return {}