| | import os |
| | import requests |
| | import json |
| | from typing import Dict, Any, Optional, Union |
| | from datetime import datetime |
| | from crewai.tools import BaseTool |
| | from pydantic import Field, BaseModel |
| |
|
| | |
class AlpacaOrderInput(BaseModel):
    # Argument schema for the Alpaca order tool. Only `action` is required;
    # buy/sell requests additionally need `quantity` or `allocation_percentage`.
    # (Kept as comments rather than a docstring so the generated schema is
    # unchanged.)

    # Operation to perform.
    action: str = Field(
        description="'buy', 'sell', or 'check' to check account",
    )
    # Trading pair in slash notation.
    symbol: str = Field(
        default="BTC/USD",
        description="Crypto trading pair (e.g., BTC/USD)",
    )
    # Explicit order size in units of the base asset.
    quantity: Optional[float] = Field(
        default=None,
        description="Specific quantity to trade",
    )
    # Alternative to `quantity`: size the order as a share of the portfolio.
    allocation_percentage: Optional[int] = Field(
        default=None,
        description="Percentage of portfolio to allocate",
    )
    # Signal confidence forwarded with the order.
    confidence: Optional[int] = Field(
        default=None,
        description="Confidence level (0-100)",
    )
| |
|
class AlpacaCryptoOrderTool(BaseTool):
    """CrewAI tool that places crypto market orders through the Alpaca REST API.

    Credentials are taken from the ``api_key`` / ``api_secret`` fields or, when
    those are unset, from the ``ALPACA_API_KEY`` / ``ALPACA_API_SECRET``
    environment variables. Unless ``base_url`` is passed explicitly, the
    trading endpoint is chosen from ``is_paper``.

    Every public outcome is reported as a dictionary carrying a ``success``
    flag; failures additionally carry an ``error`` message. Progress is
    logged with ``print``.
    """

    name: str = "Alpaca Crypto Order Execution Tool"
    description: str = "Execute cryptocurrency buy/sell orders on Alpaca based on trading signals"

    api_key: Optional[str] = Field(default=None, description="Alpaca API key")
    api_secret: Optional[str] = Field(default=None, description="Alpaca API secret")
    is_paper: bool = Field(default=True, description="Whether to use paper trading API")
    base_url: str = Field(default="https://paper-api.alpaca.markets", description="API base URL for trading")
    data_url: str = Field(default="https://data.alpaca.markets", description="API base URL for market data")
    # Without a timeout `requests` waits forever on a stalled connection.
    request_timeout: float = Field(default=10.0, description="Timeout in seconds for each HTTP request")

    args_schema: type[AlpacaOrderInput] = AlpacaOrderInput

    def __init__(self, **kwargs):
        """Build the tool, filling credentials and the trading URL from defaults.

        Args:
            **kwargs: Field values for the tool (see the class attributes).
        """
        super().__init__(**kwargs)

        # Fall back to the environment for any credential not passed in.
        if not self.api_key:
            self.api_key = os.getenv("ALPACA_API_KEY")
        if not self.api_secret:
            self.api_secret = os.getenv("ALPACA_API_SECRET")

        # An explicit base_url always wins; otherwise derive it from is_paper.
        if 'base_url' not in kwargs:
            self.base_url = "https://paper-api.alpaca.markets" if self.is_paper else "https://api.alpaca.markets"

        print(f"Initialized Alpaca Order Tool with API key: {'Present' if self.api_key else 'Missing'}")
        print(f"Using API endpoints: Trading={self.base_url}, Data={self.data_url}")

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    def _auth_headers(self) -> Dict[str, Any]:
        """Return the authentication headers sent with every Alpaca request."""
        return {
            "Apca-Api-Key-Id": self.api_key,
            "Apca-Api-Secret-Key": self.api_secret,
        }

    def _normalize_symbol(self, symbol: Any) -> Any:
        """Return ``symbol`` upper-cased and in ``BASE/QUOTE`` slash notation.

        Slash-less pairs are split on a known quote-currency suffix
        (``BTCUSD`` -> ``BTC/USD``, ``DOGEUSDT`` -> ``DOGE/USDT``). Values that
        are empty, not strings, or not recognisable as a pair are returned
        without a slash being inserted.
        """
        if not symbol or not isinstance(symbol, str):
            return symbol
        cleaned = symbol.strip().upper()
        if "/" in cleaned:
            return cleaned
        # Longest stablecoin suffixes first so "USDT" is not mistaken for "USD".
        for quote in ("USDT", "USDC", "USD", "BTC"):
            base = cleaned[:-len(quote)]
            # Require a plausible base so tickers such as "WBTC" stay intact.
            if cleaned.endswith(quote) and len(base) >= 2:
                return f"{base}/{quote}"
        return cleaned

    def _base_asset(self, symbol: str) -> str:
        """Return the part of ``symbol`` before the slash (the traded asset)."""
        return symbol.split("/")[0]

    def _min_order_size(self, symbol: str) -> float:
        """Return the minimum order quantity this tool enforces for ``symbol``."""
        return 0.0001 if "BTC" in symbol else 0.001

    def _quantity_decimals(self, symbol: str) -> int:
        """Return how many decimal places quantities for ``symbol`` are kept to."""
        return 8 if "BTC" in symbol else 4

    def _parse_percentage(self, value: str) -> Union[int, float]:
        """Parse strings such as ``"25"``, ``" 25% "`` or ``"12.5%"``.

        Raises:
            ValueError: If the text is not a number.
        """
        number = float(value.strip().strip('%').strip())
        return int(number) if number.is_integer() else number

    def _allocation_error(self, allocation_percentage: Any) -> Optional[Dict[str, Any]]:
        """Return an error result if the percentage is outside (0, 100], else None."""
        if 0 < allocation_percentage <= 100:
            return None
        return {
            "error": f"Invalid allocation_percentage: {allocation_percentage}. Must be greater than 0 and at most 100.",
            "success": False
        }

    # ------------------------------------------------------------------
    # Tool entry point
    # ------------------------------------------------------------------

    def _run(self, **kwargs) -> Dict[str, Any]:
        """Execute a crypto order on Alpaca, or report the account status.

        Args:
            **kwargs: Recognised keys are
                ``action`` - 'buy', 'sell', or 'check' (to check account);
                ``symbol`` - trading pair, defaults to "BTC/USD";
                ``quantity`` - quantity to trade, number or numeric string;
                ``allocation_percentage`` - alternative to quantity, number or
                string such as "25%"; for a buy it is a share of account
                equity, for a sell a share of the open position;
                ``confidence`` - optional value recorded with the order.

        Returns:
            Dictionary with order results. For a sell, ``allocation_percentage``
            takes precedence over ``quantity`` when both are given; for a buy,
            ``quantity`` takes precedence. Any unexpected exception is
            converted into an error result instead of propagating.
        """
        try:
            action = kwargs.get('action')
            symbol = kwargs.get('symbol') or 'BTC/USD'
            quantity = kwargs.get('quantity')
            allocation_percentage = kwargs.get('allocation_percentage')
            confidence = kwargs.get('confidence')

            print(f"Raw input kwargs: {kwargs}")
            print(f"Extracted params: action={action}, symbol={symbol}, quantity={quantity}, allocation_percentage={allocation_percentage}, confidence={confidence}")

            if not action:
                print("Missing required parameter: action")
                return {
                    "error": "Missing required parameter: action. Must be 'buy', 'sell', or 'check'.",
                    "success": False
                }

            # Normalise once; `action` itself is kept for messages.
            side = str(action).strip().lower()

            normalized_symbol = self._normalize_symbol(symbol)
            if normalized_symbol != symbol:
                print(f"Corrected symbol format from {symbol} to {normalized_symbol}")
                symbol = normalized_symbol

            print(f"Order Tool called with action={action}, symbol={symbol}, quantity={quantity}, allocation_percentage={allocation_percentage}")

            if side not in ("buy", "sell", "check"):
                print(f"Invalid action: {action}")
                return {
                    "error": f"Invalid action: {action}. Use 'buy', 'sell', or 'check'.",
                    "success": False
                }

            # Every remaining path talks to the API, including 'check', so
            # credentials are verified before any request is made.
            if not self.api_key or not self.api_secret:
                print("Missing API credentials")
                return {
                    "error": "Missing API credentials. Please set ALPACA_API_KEY and ALPACA_API_SECRET environment variables.",
                    "success": False
                }

            if side == "check":
                print("Checking account status")
                account_info = self._check_account()
                if account_info.get("success", False):
                    print(f"Account check successful: Cash={account_info.get('cash')}, Equity={account_info.get('equity')}")
                return account_info

            if quantity is None and allocation_percentage is None:
                print("Missing both quantity and allocation_percentage")
                return {
                    "error": "For buy/sell orders, you must provide either quantity OR allocation_percentage",
                    "success": False
                }

            # Inputs may arrive as strings; coerce them to numbers.
            if isinstance(allocation_percentage, str):
                try:
                    allocation_percentage = self._parse_percentage(allocation_percentage)
                    print(f"Converted allocation_percentage from string to number: {allocation_percentage}")
                except ValueError:
                    return {
                        "error": f"Invalid allocation_percentage format: {allocation_percentage}. Must be a number or percentage string.",
                        "success": False
                    }

            if isinstance(quantity, str):
                try:
                    quantity = float(quantity)
                    print(f"Converted quantity from string to float: {quantity}")
                except ValueError:
                    return {
                        "error": f"Invalid quantity format: {quantity}. Must be a number.",
                        "success": False
                    }

            final_quantity = None
            min_order_size = self._min_order_size(symbol)
            base_asset = self._base_asset(symbol)

            if side == "sell" and allocation_percentage is not None:
                # Sell a share of the currently held position.
                allocation_error = self._allocation_error(allocation_percentage)
                if allocation_error is not None:
                    return allocation_error

                print(f"Selling with allocation percentage: {allocation_percentage}%")

                position = self._get_positions(symbol)
                if not position:
                    return {
                        "error": f"No position found for {symbol} to sell",
                        "success": False,
                        "positions_checked": True
                    }

                position_qty = float(position.get("qty", 0))
                sell_qty = position_qty * (allocation_percentage / 100.0)

                # Bump tiny sells up to the minimum, capped by what is held.
                if sell_qty < min_order_size:
                    sell_qty = min(min_order_size, position_qty)

                final_quantity = round(sell_qty, 8)
                print(f"Calculated sell quantity: {final_quantity} from position quantity: {position_qty}")

            elif quantity is not None:
                final_quantity = quantity
                print(f"Using directly specified quantity: {final_quantity}")

            else:
                # Only a buy sized by allocation_percentage can reach here:
                # 'check' returned earlier and a sell with a percentage is
                # handled by the first branch.
                allocation_error = self._allocation_error(allocation_percentage)
                if allocation_error is not None:
                    return allocation_error

                print(f"Calculating buy quantity based on allocation percentage: {allocation_percentage}%")
                account_info = self._check_account()
                if "error" in account_info:
                    return account_info

                cash = float(account_info.get("cash") or 0)
                equity = float(account_info.get("equity") or cash)

                allocation_amount = equity * (allocation_percentage / 100.0)

                print(f"Account equity: ${equity}, Cash: ${cash}, Allocation amount: ${allocation_amount}")

                current_price = self._get_current_price(symbol)
                if current_price is None:
                    print(f"WARNING: Price retrieval failed. Using minimum order size: {min_order_size}")

                    # No price available: only place the minimum order, and
                    # only if cash covers it at a rough BTC price plus a 5%
                    # margin. The BTC estimate is applied to every symbol.
                    estimated_btc_price = 60000.0
                    estimated_cost = min_order_size * estimated_btc_price

                    if cash >= estimated_cost * 1.05:
                        print(f"Proceeding with minimum order using estimated price ~${estimated_btc_price}")
                        final_quantity = min_order_size
                    else:
                        return {
                            "error": f"Failed to get current price for {symbol} and insufficient cash for minimum order",
                            "success": False,
                            "price_retrieval_failed": True
                        }
                else:
                    print(f"Current price for {symbol}: ${current_price}")

                    final_quantity = round(
                        allocation_amount / current_price,
                        self._quantity_decimals(symbol),
                    )

                    print(f"Calculated quantity to {action}: {final_quantity} {base_asset} (worth ${allocation_amount:.2f})")

                    if final_quantity < min_order_size:
                        print(f"Calculated quantity {final_quantity} is below minimum order size {min_order_size}")
                        if allocation_amount > 5:
                            final_quantity = min_order_size
                            print(f"Setting to minimum order size: {min_order_size} {base_asset}")
                        else:
                            return {
                                "error": f"Allocation amount ${allocation_amount:.2f} too small for minimum order size of {min_order_size} {base_asset} (${min_order_size * current_price:.2f})",
                                "success": False,
                                "allocation_too_small": True
                            }

            if final_quantity is None or final_quantity <= 0:
                print("Invalid quantity determined")
                return {
                    "error": "Invalid quantity. Please provide either a valid quantity or allocation percentage.",
                    "success": False
                }

            print(f"Placing {action} order for {final_quantity} of {symbol}")
            order_result = self._place_order(
                symbol=symbol,
                side=side,
                quantity=final_quantity,
                confidence=confidence
            )

            if order_result.get("success", False):
                print(f"Order placed successfully: {order_result.get('order_id')}")
            else:
                print(f"Order failed: {order_result.get('error')}")

            return order_result

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            print(f"Error executing order: {str(e)}")
            return {
                "error": f"Error executing order: {str(e)}",
                "success": False
            }

    # ------------------------------------------------------------------
    # Trading API
    # ------------------------------------------------------------------

    def _check_account(self) -> Dict[str, Any]:
        """Get account information from Alpaca.

        Returns:
            On success a dictionary with ``account_id``, ``cash``, ``equity``,
            ``buying_power`` (as returned by the API) and ``success=True``;
            otherwise a dictionary with ``error`` and ``success=False``.
        """
        try:
            url = f"{self.base_url}/v2/account"
            response = requests.get(url, headers=self._auth_headers(), timeout=self.request_timeout)

            if response.status_code != 200:
                return {
                    "error": f"Failed to get account info: {response.text}",
                    "success": False
                }

            account_data = response.json()

            return {
                "account_id": account_data.get("id"),
                "cash": account_data.get("cash"),
                "equity": account_data.get("equity"),
                "buying_power": account_data.get("buying_power"),
                "success": True
            }

        except Exception as e:
            return {
                "error": f"Error checking account: {str(e)}",
                "success": False
            }

    # ------------------------------------------------------------------
    # Market data
    # ------------------------------------------------------------------

    def _get_current_price(self, symbol: str) -> Optional[float]:
        """Get the current price of a crypto asset.

        Tries the latest-bars, snapshots and orderbook endpoints in that
        order and returns the first positive price. If all fail, a hard-coded
        fallback is returned for symbols that have one (currently only
        "BTC/USD"); otherwise None.

        NOTE(review): the fallback price is static and will drift from the
        market; callers size orders from this value.
        """
        try:
            symbol = self._normalize_symbol(symbol)

            price_sources = (
                ("latest bars", self._try_get_price_from_bars),
                ("snapshots", self._try_get_price_from_snapshots),
                ("orderbook", self._try_get_price_from_orderbook),
            )
            for label, fetch_price in price_sources:
                print(f"Attempting to get price for {symbol} using {label} endpoint")
                price = fetch_price(symbol)
                # A zero/negative price would break order sizing downstream.
                if price is not None and price > 0:
                    return price

            print(f"WARNING: Could not get price for {symbol} from any API endpoint.")
            fallback_prices = {
                "BTC/USD": 85000.00,
            }
            if symbol in fallback_prices:
                print(f"Using fallback price for {symbol}: ${fallback_prices[symbol]}")
                return fallback_prices[symbol]

            return None

        except Exception as e:
            print(f"Error getting price: {str(e)}")
            return None

    def _try_get_price_from_bars(self, symbol: str) -> Optional[float]:
        """Try to get price from the bars endpoint.

        Returns:
            The close (``c``) of the latest bar for ``symbol``, or None when
            the request fails or no bar is present.
        """
        try:
            url = f"{self.data_url}/v1beta3/crypto/us/latest/bars"
            params = {"symbols": symbol}

            print(f"Getting current price from bars endpoint: {url} for symbol {symbol}")
            response = requests.get(url, headers=self._auth_headers(), params=params, timeout=self.request_timeout)

            if response.status_code != 200:
                print(f"Error getting price from bars: {response.status_code} - {response.text}")
                return None

            data = response.json()
            print(f"Bars response: {json.dumps(data)[:300]}...")

            bar = (data.get("bars") or {}).get(symbol)
            # The latest-bars endpoint maps each symbol to one bar object;
            # a list of bars is also accepted defensively.
            if isinstance(bar, list):
                bar = bar[0] if bar else None
            if not bar:
                print(f"No bar data found for {symbol}")
                return None

            return float(bar["c"])

        except Exception as e:
            print(f"Error in _try_get_price_from_bars: {str(e)}")
            return None

    def _try_get_price_from_snapshots(self, symbol: str) -> Optional[float]:
        """Try to get price from the snapshots endpoint.

        Looks, in order, at the latest trade price, a recent bar close, the
        bid/ask midpoint (or whichever side is present) and finally the daily
        bar close.

        Returns:
            The first price found, or None.
        """
        try:
            url = f"{self.data_url}/v1beta3/crypto/us/snapshots"
            params = {"symbols": symbol}

            print(f"Getting current price from snapshots endpoint: {url} for symbol {symbol}")
            response = requests.get(url, headers=self._auth_headers(), params=params, timeout=self.request_timeout)

            if response.status_code != 200:
                print(f"Error getting price from snapshots: {response.status_code} - {response.text}")
                return None

            data = response.json()
            print(f"Snapshots response: {json.dumps(data)[:300]}...")

            snapshot = (data.get("snapshots") or {}).get(symbol)
            if not snapshot:
                print(f"No snapshot data found for {symbol}")
                return None

            trade = snapshot.get("latestTrade")
            if trade and trade.get("p"):
                return float(trade["p"])

            # "bar"/"quote" are the key names this code used previously and
            # are still honoured alongside the camelCase snapshot keys.
            for bar_key in ("minuteBar", "bar"):
                bar = snapshot.get(bar_key)
                if bar and bar.get("c"):
                    return float(bar["c"])

            quote = snapshot.get("latestQuote") or snapshot.get("quote")
            if quote:
                ask = quote.get("ap")
                bid = quote.get("bp")
                if ask and bid:
                    return (float(ask) + float(bid)) / 2
                if ask:
                    return float(ask)
                if bid:
                    return float(bid)

            daily_bar = snapshot.get("dailyBar")
            if daily_bar and daily_bar.get("c"):
                return float(daily_bar["c"])

            return None

        except Exception as e:
            print(f"Error in _try_get_price_from_snapshots: {str(e)}")
            return None

    def _try_get_price_from_orderbook(self, symbol: str) -> Optional[float]:
        """Try to get price from the orderbook endpoint.

        Returns:
            The midpoint of the first ask and first bid, the single available
            side if only one exists, or None.
        """
        try:
            url = f"{self.data_url}/v1beta3/crypto/us/latest/orderbooks"
            params = {"symbols": symbol}

            print(f"Getting current price from orderbook endpoint: {url} for symbol {symbol}")
            response = requests.get(url, headers=self._auth_headers(), params=params, timeout=self.request_timeout)

            if response.status_code != 200:
                print(f"Error getting price from orderbook: {response.status_code} - {response.text}")
                return None

            data = response.json()
            print(f"Orderbook response: {json.dumps(data)[:300]}...")

            orderbook = (data.get("orderbooks") or {}).get(symbol)
            if orderbook is None:
                print(f"No orderbook data found for {symbol}")
                return None

            asks = orderbook.get("a") or []
            bids = orderbook.get("b") or []
            best_ask = float(asks[0]["p"]) if asks else None
            best_bid = float(bids[0]["p"]) if bids else None

            if best_ask and best_bid:
                return (best_ask + best_bid) / 2
            if best_ask:
                return best_ask
            if best_bid:
                return best_bid

            return None

        except Exception as e:
            print(f"Error in _try_get_price_from_orderbook: {str(e)}")
            return None

    # ------------------------------------------------------------------
    # Orders and positions
    # ------------------------------------------------------------------

    def _place_order(self, symbol: str, side: str, quantity: float, confidence: Optional[int] = None) -> Dict[str, Any]:
        """Place a crypto order on Alpaca.

        Submits a market order with time-in-force "gtc". When ``confidence``
        is given it is embedded in a generated ``client_order_id``.

        Args:
            symbol: Trading pair; slash-less forms are normalised.
            side: "buy" or "sell".
            quantity: Order size in units of the base asset.
            confidence: Optional value echoed back in the result.

        Returns:
            On success a dictionary describing the accepted order (plus a
            freshly looked-up ``current_price``) with ``success=True``;
            otherwise ``error`` and ``success=False``.
        """
        try:
            symbol = self._normalize_symbol(symbol)

            url = f"{self.base_url}/v2/orders"
            headers = self._auth_headers()
            headers["Content-Type"] = "application/json"

            # Fixed-point text without trailing zeros (never scientific notation).
            decimals = self._quantity_decimals(symbol)
            qty_str = f"{quantity:.{decimals}f}".rstrip('0').rstrip('.')

            print(f"Formatting quantity {quantity} as '{qty_str}' for API call")

            # Rounding can collapse a tiny quantity to zero; don't send that.
            if float(qty_str) <= 0:
                return {
                    "error": f"Quantity {quantity} is too small to place an order for {symbol}",
                    "success": False
                }

            order_data = {
                "symbol": symbol,
                "qty": qty_str,
                "side": side,
                "type": "market",
                "time_in_force": "gtc"
            }

            if confidence is not None:
                # Microseconds keep ids distinct when orders are placed in
                # quick succession with the same confidence.
                order_data["client_order_id"] = f"signal-{confidence}-{datetime.now().strftime('%Y%m%d%H%M%S%f')}"

            print(f"Sending order data to Alpaca: {json.dumps(order_data)}")

            response = requests.post(url, headers=headers, json=order_data, timeout=self.request_timeout)

            if response.status_code not in (200, 201):
                return {
                    "error": f"Failed to place order: {response.text}",
                    "success": False
                }

            order_response = response.json()

            # Informational only; this is not the fill price.
            current_price = self._get_current_price(symbol)

            return {
                "order_id": order_response.get("id"),
                "client_order_id": order_response.get("client_order_id"),
                "symbol": order_response.get("symbol"),
                "side": order_response.get("side"),
                "quantity": order_response.get("qty"),
                "order_type": order_response.get("type"),
                "status": order_response.get("status"),
                "current_price": current_price,
                "created_at": order_response.get("created_at"),
                "confidence": confidence,
                "success": True
            }

        except Exception as e:
            return {
                "error": f"Error placing order: {str(e)}",
                "success": False
            }

    def _get_positions(self, symbol: Optional[str] = None) -> Union[Dict[str, Any], list]:
        """Get current positions from Alpaca.

        Args:
            symbol: Optional trading pair, with or without a slash.

        Returns:
            Without ``symbol``: the list of positions returned by the API.
            With ``symbol``: the matching position dictionary. An exact match
            (ignoring slashes and case) across all positions is preferred;
            only if none exists, and the base asset is BTC or ETH, the first
            position whose symbol starts with that asset is returned.
            An empty dictionary is returned when nothing matches or the
            request fails.
        """
        try:
            original_symbol = symbol
            symbol = self._normalize_symbol(symbol) if symbol else None

            print(f"Getting positions for symbol: {symbol}, original input: {original_symbol}")

            url = f"{self.base_url}/v2/positions"
            response = requests.get(url, headers=self._auth_headers(), timeout=self.request_timeout)

            if response.status_code == 404:
                print("No positions found (404 response)")
                return {}

            if response.status_code != 200:
                print(f"Error getting positions: {response.text}")
                return {}

            positions_data = response.json()

            if not positions_data or not isinstance(positions_data, list):
                print(f"No positions data returned or unexpected format: {positions_data}")
                return {}

            if not symbol:
                return positions_data

            # Pass 1: exact pair, compared without slashes so "BTC/USD" and
            # "BTCUSD" are treated as the same symbol.
            wanted = symbol.replace('/', '')
            for position in positions_data:
                pos_symbol = str(position.get("symbol", ""))
                print(f"Checking position with symbol: {pos_symbol}")
                if pos_symbol.replace('/', '').upper() == wanted:
                    print(f"Symbol match found: {pos_symbol}")
                    return position

            # Pass 2: same base asset under a different quote currency. Run
            # only after pass 1 so it can never shadow an exact match.
            base_asset = self._base_asset(symbol)
            if base_asset in ("BTC", "ETH"):
                for position in positions_data:
                    pos_symbol = str(position.get("symbol", ""))
                    if pos_symbol.upper().startswith(base_asset):
                        print(f"{base_asset} match found in position symbol: {pos_symbol}")
                        return position

            print(f"No matching position found for {symbol} among: {[p.get('symbol', 'N/A') for p in positions_data]}")
            return {}

        except Exception as e:
            print(f"Error getting positions: {str(e)}")
            return {}