# NOTE: the next three lines are repository-page residue from the upload, kept as comments.
# vlbandara's picture
# Upload folder using huggingface_hub
# eb27803 verified
import os
import requests
import json
from typing import Dict, Any, Optional, Union
from datetime import datetime
from crewai.tools import BaseTool
from pydantic import Field, BaseModel
# Create a specific schema for the tool input
class AlpacaOrderInput(BaseModel):
    """Argument schema advertised to the agent for the Alpaca order tool.

    Only ``action`` is required; ``symbol`` defaults to ``BTC/USD`` and the
    remaining fields default to ``None``.
    """

    # What the tool should do.
    action: str = Field(
        description="'buy', 'sell', or 'check' to check account",
    )
    # Which trading pair the order applies to.
    symbol: str = Field(
        default="BTC/USD",
        description="Crypto trading pair (e.g., BTC/USD)",
    )
    # Explicit order size.
    quantity: Optional[float] = Field(
        default=None,
        description="Specific quantity to trade",
    )
    # Alternative sizing input, expressed as a percentage.
    allocation_percentage: Optional[int] = Field(
        default=None,
        description="Percentage of portfolio to allocate",
    )
    # Optional signal confidence forwarded with the order.
    confidence: Optional[int] = Field(
        default=None,
        description="Confidence level (0-100)",
    )
class AlpacaCryptoOrderTool(BaseTool):
    """CrewAI tool that checks an Alpaca account and places crypto market orders.

    Orders are sized either from an explicit ``quantity`` or from an
    ``allocation_percentage`` (percent of account equity for buys, percent of
    the held position for sells).  ``_run`` never raises: failures are reported
    as a dict containing ``"success": False`` and an ``"error"`` message.
    """

    name: str = "Alpaca Crypto Order Execution Tool"
    description: str = "Execute cryptocurrency buy/sell orders on Alpaca based on trading signals"
    # Define fields
    api_key: Optional[str] = Field(default=None, description="Alpaca API key")
    api_secret: Optional[str] = Field(default=None, description="Alpaca API secret")
    is_paper: bool = Field(default=True, description="Whether to use paper trading API")
    base_url: str = Field(default="https://paper-api.alpaca.markets", description="API base URL for trading")
    data_url: str = Field(default="https://data.alpaca.markets", description="API base URL for market data")
    # Without a timeout a stalled connection would block the agent forever.
    request_timeout: float = Field(default=10.0, description="Timeout in seconds for each HTTP request")
    # Define the schema for the arguments
    args_schema: type[AlpacaOrderInput] = AlpacaOrderInput

    def __init__(self, **kwargs):
        """Build the tool, filling credentials from the environment when absent.

        ``ALPACA_API_KEY`` / ``ALPACA_API_SECRET`` are read only when the
        corresponding field was not supplied.  Unless ``base_url`` is passed
        explicitly, it is derived from ``is_paper``.
        """
        # Initialize the base class first
        super().__init__(**kwargs)
        # Set the API keys from environment variables if not provided directly
        if not self.api_key:
            self.api_key = os.getenv("ALPACA_API_KEY")
        if not self.api_secret:
            self.api_secret = os.getenv("ALPACA_API_SECRET")
        # An explicitly supplied base_url always wins over the is_paper switch.
        if 'base_url' not in kwargs:
            self.base_url = "https://paper-api.alpaca.markets" if self.is_paper else "https://api.alpaca.markets"
        print(f"Initialized Alpaca Order Tool with API key: {'Present' if self.api_key else 'Missing'}")
        print(f"Using API endpoints: Trading={self.base_url}, Data={self.data_url}")

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def _failure(self, message: str, **flags: Any) -> Dict[str, Any]:
        """Return the standard failure payload, plus any extra marker flags."""
        return {"error": message, "success": False, **flags}

    def _auth_headers(self, with_json: bool = False) -> Dict[str, Any]:
        """Return the Alpaca authentication headers (optionally with a JSON content type)."""
        headers = {
            "Apca-Api-Key-Id": self.api_key,
            "Apca-Api-Secret-Key": self.api_secret,
        }
        if with_json:
            headers["Content-Type"] = "application/json"
        return headers

    def _normalize_symbol(self, symbol: str, context: str = "") -> str:
        """Map the slash-less pairs ``BTCUSD``/``ETHUSD`` to ``BASE/QUOTE`` form.

        Any other value is returned unchanged.  ``context`` is appended to the
        log line so the caller can be identified.
        """
        corrected = {"BTCUSD": "BTC/USD", "ETHUSD": "ETH/USD"}.get(symbol)
        if corrected is None:
            return symbol
        print(f"Corrected symbol format from {symbol} to {corrected}{context}")
        return corrected

    def _base_asset(self, symbol: str) -> str:
        """Return the part of the pair before the slash (used for messages)."""
        return symbol.split("/")[0]

    def _min_order_size(self, symbol: str) -> float:
        """Return the minimum order size this tool enforces for ``symbol``."""
        return 0.0001 if "BTC" in symbol else 0.001

    def _qty_decimals(self, symbol: str) -> int:
        """Return how many decimal places quantities for ``symbol`` are sent with."""
        return 8 if "BTC" in symbol else 4

    def _mid_price(self, ask: Any, bid: Any) -> Optional[float]:
        """Return the bid/ask midpoint, or whichever side is present, else None."""
        if ask and bid:
            return (float(ask) + float(bid)) / 2
        if ask:
            return float(ask)
        if bid:
            return float(bid)
        return None

    # ------------------------------------------------------------------
    # Tool entry point
    # ------------------------------------------------------------------
    def _run(self, **kwargs) -> Dict[str, Any]:
        """
        Execute a crypto order on Alpaca

        Args are read from kwargs:
            action: 'buy', 'sell', or 'check' (to check account)
            symbol: Crypto symbol to trade (e.g., "BTC/USD"); defaults to "BTC/USD"
            quantity: Specific quantity to trade (e.g., 0.001 BTC)
            allocation_percentage: Alternative to quantity - percent of account
                equity to buy, or percent of the held position to sell.  Must
                satisfy 0 < value <= 100 when it is the sizing input.
            confidence: Optional confidence level (0-100) to include in order metadata

        For sells, ``allocation_percentage`` takes precedence over ``quantity``;
        for buys, ``quantity`` takes precedence.

        Returns:
            Dictionary with order (or account) results; on any failure a dict
            with ``"success": False`` and an ``"error"`` message.
        """
        try:
            # Extract parameters from kwargs
            action = kwargs.get('action')
            symbol = kwargs.get('symbol') or 'BTC/USD'
            quantity = kwargs.get('quantity')
            allocation_percentage = kwargs.get('allocation_percentage')
            confidence = kwargs.get('confidence')

            # Log all inputs for debugging
            print(f"Raw input kwargs: {kwargs}")
            print(f"Extracted params: action={action}, symbol={symbol}, quantity={quantity}, allocation_percentage={allocation_percentage}, confidence={confidence}")

            if not action:
                print("Missing required parameter: action")
                return self._failure("Missing required parameter: action. Must be 'buy', 'sell', or 'check'.")

            symbol = self._normalize_symbol(symbol)
            print(f"Order Tool called with action={action}, symbol={symbol}, quantity={quantity}, allocation_percentage={allocation_percentage}")

            side = str(action).strip().lower()
            if side not in ("buy", "sell", "check"):
                print(f"Invalid action: {action}")
                return self._failure(f"Invalid action: {action}. Use 'buy', 'sell', or 'check'.")

            # Every remaining path talks to the API, so fail fast without credentials.
            if not self.api_key or not self.api_secret:
                print("Missing API credentials")
                return self._failure(
                    "Missing API credentials. Please set ALPACA_API_KEY and ALPACA_API_SECRET environment variables."
                )

            if side == "check":
                print("Checking account status")
                account_info = self._check_account()
                if account_info.get("success", False):
                    print(f"Account check successful: Cash={account_info.get('cash')}, Equity={account_info.get('equity')}")
                return account_info

            if quantity is None and allocation_percentage is None:
                print("Missing both quantity and allocation_percentage")
                return self._failure("For buy/sell orders, you must provide either quantity OR allocation_percentage")

            # Agents sometimes pass numbers as strings ("25%", "0.01"); coerce them.
            if isinstance(allocation_percentage, str):
                try:
                    allocation_percentage = int(allocation_percentage.strip().strip('%').strip())
                    print(f"Converted allocation_percentage from string to int: {allocation_percentage}")
                except ValueError:
                    return self._failure(
                        f"Invalid allocation_percentage format: {allocation_percentage}. Must be an integer or percentage string."
                    )
            if isinstance(quantity, str):
                try:
                    quantity = float(quantity)
                    print(f"Converted quantity from string to float: {quantity}")
                except ValueError:
                    return self._failure(f"Invalid quantity format: {quantity}. Must be a number.")

            # Decide which input sizes the order.
            sell_by_allocation = side == "sell" and allocation_percentage is not None
            use_direct_quantity = not sell_by_allocation and quantity is not None

            # A non-positive percentage must never be bumped up to a minimum-size
            # order, and more than 100% of a position/equity cannot be traded.
            if not use_direct_quantity and not 0 < allocation_percentage <= 100:
                return self._failure(
                    f"Invalid allocation_percentage: {allocation_percentage}. Must be greater than 0 and at most 100."
                )

            if sell_by_allocation:
                final_quantity, failure = self._sell_quantity_from_allocation(symbol, allocation_percentage)
            elif use_direct_quantity:
                final_quantity, failure = quantity, None
                print(f"Using directly specified quantity: {final_quantity}")
            else:
                final_quantity, failure = self._buy_quantity_from_allocation(symbol, allocation_percentage)
            if failure is not None:
                return failure

            if final_quantity is None or final_quantity <= 0:
                print("Invalid quantity determined")
                return self._failure("Invalid quantity. Please provide either a valid quantity or allocation percentage.")

            # Execute the order
            print(f"Placing {action} order for {final_quantity} of {symbol}")
            order_result = self._place_order(
                symbol=symbol,
                side=side,
                quantity=final_quantity,
                confidence=confidence,
            )
            if order_result.get("success", False):
                print(f"Order placed successfully: {order_result.get('order_id')}")
            else:
                print(f"Order failed: {order_result.get('error')}")
            return order_result
        except Exception as e:
            # Tool boundary: report the problem to the agent instead of raising.
            print(f"Error executing order: {str(e)}")
            return self._failure(f"Error executing order: {str(e)}")

    def _sell_quantity_from_allocation(
        self, symbol: str, allocation_percentage: Union[int, float]
    ) -> tuple:
        """Size a sell as a percentage of the currently held position.

        Returns:
            ``(quantity, None)`` on success or ``(None, failure_dict)``.
        """
        print(f"Selling with allocation percentage: {allocation_percentage}%")
        # NOTE: _get_positions also returns {} on API errors, so "no position"
        # and "lookup failed" are not distinguishable here.
        position = self._get_positions(symbol)
        if not position:
            return None, self._failure(f"No position found for {symbol} to sell", positions_checked=True)

        position_qty = float(position.get("qty", 0))
        sell_qty = position_qty * (allocation_percentage / 100.0)
        # Below the minimum, sell the minimum (or the whole position if smaller).
        min_order_size = self._min_order_size(symbol)
        if sell_qty < min_order_size:
            sell_qty = min(min_order_size, position_qty)

        # Round to the precision the order is sent with; if rounding went
        # above what is held, step down one unit so the order is not oversized.
        decimals = self._qty_decimals(symbol)
        final_quantity = round(sell_qty, decimals)
        if final_quantity > position_qty:
            final_quantity = max(round(final_quantity - 10 ** -decimals, decimals), 0.0)
        print(f"Calculated sell quantity: {final_quantity} from position quantity: {position_qty}")
        return final_quantity, None

    def _buy_quantity_from_allocation(
        self, symbol: str, allocation_percentage: Union[int, float]
    ) -> tuple:
        """Size a buy as a percentage of account equity at the current price.

        Returns:
            ``(quantity, None)`` on success or ``(None, failure_dict)``.
        """
        print(f"Calculating buy quantity based on allocation percentage: {allocation_percentage}%")
        account_info = self._check_account()
        if "error" in account_info:
            return None, account_info

        # _check_account always sets these keys, possibly to None.
        cash = float(account_info.get("cash") or 0)
        equity = float(account_info.get("equity") or cash)
        # Allocation is a share of total equity, not just free cash.
        allocation_amount = equity * (allocation_percentage / 100.0)
        print(f"Account equity: ${equity}, Cash: ${cash}, Allocation amount: ${allocation_amount}")

        base = self._base_asset(symbol)
        min_order_size = self._min_order_size(symbol)
        current_price = self._get_current_price(symbol)

        if current_price is None:
            # Best effort: without a price, fall back to a minimum-size order,
            # provided cash covers a rough cost estimate.
            # NOTE(review): the estimate uses a hard-coded BTC price even for
            # other assets - confirm this heuristic is still wanted.
            print(f"WARNING: Price retrieval failed. Using minimum order size: {min_order_size}")
            estimated_btc_price = 60000.0
            estimated_cost = min_order_size * estimated_btc_price
            if cash < estimated_cost * 1.05:  # 5% buffer for price fluctuations
                return None, self._failure(
                    f"Failed to get current price for {symbol} and insufficient cash for minimum order",
                    price_retrieval_failed=True,
                )
            print(f"Proceeding with minimum order using estimated price ~${estimated_btc_price}")
            return min_order_size, None

        print(f"Current price for {symbol}: ${current_price}")
        final_quantity = round(allocation_amount / current_price, self._qty_decimals(symbol))
        print(f"Calculated quantity to buy: {final_quantity} {base} (worth ${allocation_amount:.2f})")

        if final_quantity < min_order_size:
            print(f"Calculated quantity {final_quantity} is below minimum order size {min_order_size}")
            # Only round up to the minimum when the allocation is meaningful.
            if allocation_amount <= 5:
                return None, self._failure(
                    f"Allocation amount ${allocation_amount:.2f} too small for minimum order size of {min_order_size} {base} (${min_order_size * current_price:.2f})",
                    allocation_too_small=True,
                )
            final_quantity = min_order_size
            print(f"Setting to minimum order size: {min_order_size} {base}")
        return final_quantity, None

    # ------------------------------------------------------------------
    # Trading API
    # ------------------------------------------------------------------
    def _check_account(self) -> Dict[str, Any]:
        """Get account information from Alpaca.

        Returns:
            On success a dict with ``account_id``, ``cash``, ``equity``,
            ``buying_power`` (values as returned by the API) and
            ``success=True``; otherwise a failure dict.
        """
        try:
            response = requests.get(
                f"{self.base_url}/v2/account",
                headers=self._auth_headers(),
                timeout=self.request_timeout,
            )
            if response.status_code != 200:
                return self._failure(f"Failed to get account info: {response.text}")
            account_data = response.json()
            return {
                "account_id": account_data.get("id"),
                "cash": account_data.get("cash"),
                "equity": account_data.get("equity"),
                "buying_power": account_data.get("buying_power"),
                "success": True,
            }
        except Exception as e:
            return self._failure(f"Error checking account: {str(e)}")

    def _place_order(self, symbol: str, side: str, quantity: float, confidence: Optional[int] = None) -> Dict[str, Any]:
        """Place a crypto market order (time in force ``gtc``) on Alpaca.

        Args:
            symbol: Trading pair, e.g. "BTC/USD".
            side: "buy" or "sell".
            quantity: Amount of the base asset to trade.
            confidence: Optional signal confidence; when given it is embedded
                in the order's ``client_order_id``.

        Returns:
            Order details with ``success=True``, or a failure dict.
        """
        try:
            symbol = self._normalize_symbol(symbol, " in _place_order")

            # The quantity is sent as a string with fixed precision and
            # trailing zeros removed.
            decimals = self._qty_decimals(symbol)
            qty_str = f"{quantity:.{decimals}f}".rstrip('0').rstrip('.')
            print(f"Formatting quantity {quantity} as '{qty_str}' for API call")
            if not qty_str or float(qty_str) <= 0:
                # Rounding can collapse a tiny quantity to zero; don't send it.
                return self._failure(
                    f"Quantity {quantity} is not a positive amount at {decimals} decimal places for {symbol}"
                )

            order_data = {
                "symbol": symbol,
                "qty": qty_str,
                "side": side,
                "type": "market",
                "time_in_force": "gtc",
            }
            if confidence is not None:
                # Microseconds keep the id distinct for orders placed within
                # the same second.
                stamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
                order_data["client_order_id"] = f"signal-{confidence}-{stamp}"
            print(f"Sending order data to Alpaca: {json.dumps(order_data)}")

            response = requests.post(
                f"{self.base_url}/v2/orders",
                headers=self._auth_headers(with_json=True),
                json=order_data,
                timeout=self.request_timeout,
            )
            if response.status_code not in (200, 201):
                return self._failure(f"Failed to place order: {response.text}")

            order_response = response.json()
            # Reference price only; may be the hard-coded fallback.
            current_price = self._get_current_price(symbol)
            return {
                "order_id": order_response.get("id"),
                "client_order_id": order_response.get("client_order_id"),
                "symbol": order_response.get("symbol"),
                "side": order_response.get("side"),
                "quantity": order_response.get("qty"),
                "order_type": order_response.get("type"),
                "status": order_response.get("status"),
                "current_price": current_price,
                "created_at": order_response.get("created_at"),
                "confidence": confidence,
                "success": True,
            }
        except Exception as e:
            return self._failure(f"Error placing order: {str(e)}")

    def _get_positions(self, symbol: Optional[str] = None) -> Union[Dict[str, Any], list]:
        """Get current positions from Alpaca.

        Args:
            symbol: Optional pair to look up, with or without a slash.

        Returns:
            Without ``symbol``: the list of positions returned by the API.
            With ``symbol``: the matching position dict.  In every failure or
            no-match case an empty dict is returned.
        """
        try:
            original_symbol = symbol
            normalized_symbol = None
            if symbol:
                if '/' not in symbol:
                    symbol = self._normalize_symbol(symbol, " in _get_positions")
                    # Guess BASE/QUOTE for other slash-less pairs (3-letter base).
                    if '/' not in symbol and len(symbol) >= 6 and symbol[:3].isalpha() and symbol[3:].isalpha():
                        symbol = f"{symbol[:3]}/{symbol[3:]}"
                # Slash-less, upper-cased form used for format-insensitive comparison.
                normalized_symbol = symbol.replace('/', '').upper()
            print(f"Getting positions for symbol: {symbol}, original input: {original_symbol}, normalized: {normalized_symbol}")

            # Fetch all positions so different symbol formats can be matched locally.
            response = requests.get(
                f"{self.base_url}/v2/positions",
                headers=self._auth_headers(),
                timeout=self.request_timeout,
            )
            if response.status_code == 404:
                print("No positions found (404 response)")
                return {}
            if response.status_code != 200:
                print(f"Error getting positions: {response.text}")
                return {}

            positions_data = response.json()
            if not positions_data or not isinstance(positions_data, list):
                print(f"No positions data returned or unexpected format: {positions_data}")
                return {}

            if not symbol:
                return positions_data

            # Pass 1: exact or slash-insensitive match across ALL positions, so
            # a loose match on an earlier entry can never shadow the real one.
            for position in positions_data:
                pos_symbol = position.get("symbol", "")
                print(f"Checking position with symbol: {pos_symbol}")
                if pos_symbol == symbol:
                    print(f"Direct symbol match found: {pos_symbol}")
                    return position
                if pos_symbol.replace('/', '').upper() == normalized_symbol:
                    print(f"Normalized symbol match found: {pos_symbol} matches {normalized_symbol}")
                    return position

            # Pass 2: loose fallback for BTC/ETH only - the position must be a
            # crypto asset whose symbol starts with the requested base asset.
            # assumes positions carry an "asset_class" field; entries without it
            # are treated as crypto - TODO confirm against the API response.
            base = symbol.split('/')[0].upper()
            if base in ("BTC", "ETH"):
                for position in positions_data:
                    pos_symbol = position.get("symbol", "")
                    is_crypto = position.get("asset_class", "crypto") == "crypto"
                    if is_crypto and pos_symbol.upper().startswith(base):
                        print(f"{base} match found in position symbol: {pos_symbol}")
                        return position

            print(f"No matching position found for {symbol} among: {[p.get('symbol', 'N/A') for p in positions_data]}")
            return {}
        except Exception as e:
            print(f"Error getting positions: {str(e)}")
            return {}

    # ------------------------------------------------------------------
    # Market data API
    # ------------------------------------------------------------------
    def _get_current_price(self, symbol: str) -> Optional[float]:
        """Get the current price of a crypto asset.

        Tries the latest-bars, snapshots and orderbook endpoints in that
        order.  If all fail, a hard-coded fallback price is returned for
        ``BTC/USD``; for any other symbol the result is ``None``.

        NOTE(review): the fallback is a fixed constant, so orders sized from
        it can be far from the intended allocation - consider removing it.
        """
        try:
            symbol = self._normalize_symbol(symbol, " in _get_current_price")
            attempts = (
                ("latest bars", self._try_get_price_from_bars),
                ("snapshots", self._try_get_price_from_snapshots),
                ("orderbook", self._try_get_price_from_orderbook),
            )
            for label, fetch in attempts:
                print(f"Attempting to get price for {symbol} using {label} endpoint")
                price = fetch(symbol)
                # A non-positive price is unusable for sizing; try the next source.
                if price is not None and price > 0:
                    return price

            print(f"WARNING: Could not get price for {symbol} from any API endpoint. Using fallback price.")
            fallback_prices = {
                "BTC/USD": 85000.00,  # Approximate BTC price as fallback
            }
            if symbol in fallback_prices:
                print(f"Using fallback price for {symbol}: ${fallback_prices[symbol]}")
                return fallback_prices[symbol]
            return None
        except Exception as e:
            print(f"Error getting price: {str(e)}")
            return None

    def _fetch_symbol_payload(self, endpoint: str, root_key: str, symbol: str, label: str) -> Optional[Any]:
        """GET a crypto market-data endpoint and return the entry for ``symbol``.

        Args:
            endpoint: Path below ``/v1beta3/crypto/us/``.
            root_key: Top-level key of the response that maps symbols to data.
            symbol: Pair to request and to look up in the response.
            label: Endpoint name used in log messages.

        Returns:
            ``response[root_key][symbol]`` or ``None`` when the request fails
            or the entry is missing/empty.  Exceptions propagate to the caller.
        """
        url = f"{self.data_url}/v1beta3/crypto/us/{endpoint}"
        print(f"Getting current price from {label} endpoint: {url} for symbol {symbol}")
        response = requests.get(
            url,
            headers=self._auth_headers(),
            params={"symbols": symbol},
            timeout=self.request_timeout,
        )
        if response.status_code != 200:
            print(f"Error getting price from {label}: {response.status_code} - {response.text}")
            return None
        data = response.json()
        print(f"{label.capitalize()} response: {json.dumps(data)[:300]}...")
        payload = (data.get(root_key) or {}).get(symbol) if isinstance(data, dict) else None
        if not payload:
            noun = label[:-1] if label.endswith("s") else label
            print(f"No {noun} data found for {symbol}")
            return None
        return payload

    def _try_get_price_from_bars(self, symbol: str) -> Optional[float]:
        """Try to get the close price of the latest bar from the bars endpoint."""
        try:
            bar = self._fetch_symbol_payload("latest/bars", "bars", symbol, "bars")
            if bar is None:
                return None
            # The latest-bars endpoint is documented to return one bar object
            # per symbol; a list is still accepted (first element, as before).
            if isinstance(bar, list):
                bar = bar[0]
            close = bar.get("c")
            return float(close) if close is not None else None
        except Exception as e:
            print(f"Error in _try_get_price_from_bars: {str(e)}")
            return None

    def _try_get_price_from_snapshots(self, symbol: str) -> Optional[float]:
        """Try to get a price from the snapshots endpoint.

        Preference order: minute-level bar close, latest trade price, quote
        midpoint, daily bar close.  Both the documented key names
        (``minuteBar``, ``latestTrade``, ``latestQuote``, ``dailyBar``) and the
        short names the tool previously looked for (``bar``, ``quote``) are
        accepted.
        """
        try:
            snapshot = self._fetch_symbol_payload("snapshots", "snapshots", symbol, "snapshots")
            if snapshot is None:
                return None

            for bar_key in ("bar", "minuteBar"):
                bar = snapshot.get(bar_key)
                if bar and bar.get("c") is not None:
                    return float(bar["c"])

            trade = snapshot.get("latestTrade")
            if trade and trade.get("p"):
                return float(trade["p"])

            for quote_key in ("quote", "latestQuote"):
                quote = snapshot.get(quote_key)
                if quote:
                    price = self._mid_price(quote.get("ap"), quote.get("bp"))
                    if price is not None:
                        return price

            daily = snapshot.get("dailyBar")
            if daily and daily.get("c") is not None:
                return float(daily["c"])
            return None
        except Exception as e:
            print(f"Error in _try_get_price_from_snapshots: {str(e)}")
            return None

    def _try_get_price_from_orderbook(self, symbol: str) -> Optional[float]:
        """Try to get a price from the orderbook endpoint.

        Uses the first ask and first bid entries: their midpoint when both
        exist, otherwise whichever side is available.
        """
        try:
            orderbook = self._fetch_symbol_payload("latest/orderbooks", "orderbooks", symbol, "orderbook")
            if orderbook is None:
                return None
            asks = orderbook.get("a") or []
            bids = orderbook.get("b") or []
            # assumes the first entry on each side is the best price - TODO confirm
            best_ask = asks[0].get("p") if asks else None
            best_bid = bids[0].get("p") if bids else None
            return self._mid_price(best_ask, best_bid)
        except Exception as e:
            print(f"Error in _try_get_price_from_orderbook: {str(e)}")
            return None