rairo's picture
Update app.py
aed0f94 verified
import os
import time
import socket
import logging
import dns.resolver
import streamlit as st
import pandas as pd
from dotenv import load_dotenv
from pycoingecko import CoinGeckoAPI
import google.generativeai as genai
from langchain_community.tools.yahoo_finance_news import YahooFinanceNewsTool # โ† Added import
from fpdf import FPDF
from fpdf.enums import XPos, YPos
import markdown
from datetime import datetime, date, timedelta
from io import BytesIO
from bs4 import BeautifulSoup
import re
from linkup import LinkupClient
import plotly.graph_objects as go # <-- Added for plotting
import numpy as np # <-- Added for calculations
# --- Page config (must be first) ---
# Kept ahead of every other st.* call, as the section title requires.
_PAGE_SETTINGS = {
    "page_title": "AAN Crypto Simulation Dashboard",
    "layout": "wide",
}
st.set_page_config(**_PAGE_SETTINGS)

# --- Inject CSS to slim sidebar ---
# Raw stylesheet that pins the sidebar to 215px and shifts the main container
# accordingly; it is injected verbatim through st.markdown below.
_SIDEBAR_CSS = """
<style>
/* Target the sidebar itself (when expanded) */
[data-testid="stSidebar"][aria-expanded="true"] {
width: 215px !important;
}
/* Target the inner div for hiding when collapsed */
[data-testid="stSidebar"][aria-expanded="false"] > div:first-child {
width: 215px !important; /* You might adjust or remove this width */
margin-left: -215px !important;
}
/* Adjust the main content container's left margin */
/* This class might change in future Streamlit versions */
.main .block-container {
padding-left: 2rem; /* Adjust as needed, less than sidebar width */
padding-right: 2rem; /* Standard padding */
}
/* Fallback selector if the above doesn't work */
section.main > div.block-container {
padding-left: 2rem !important;
margin-left: 215px !important; /* Match sidebar width */
}
/* Attempt to target the specific class if needed, but prefer structural selectors */
.css-1d391kg {
margin-left: 215px !important;
}
</style>
"""
st.markdown(_SIDEBAR_CSS, unsafe_allow_html=True)
# --- PDF Generation Class ---
class PDF_Generator(FPDF):
    """
    FPDF subclass that renders parsed HTML tags (headings, paragraphs, lists,
    tables, rules) with wrap-aware, page-break-aware table rows.
    """

    def __init__(self, orientation='P', unit='mm', format='A4'):
        """Configure margins, automatic page breaks and default font/table metrics."""
        super().__init__(orientation, unit, format)
        self.set_auto_page_break(auto=True, margin=15)
        self.set_left_margin(15)
        self.set_right_margin(15)
        self.alias_nb_pages()
        self.default_font = 'helvetica'
        self.default_font_size = 10
        self.table_row_height = 6  # Height of one text line inside a table cell
        self.table_padding = 2  # Vertical gap added after each table row

    def set_default_font(self):
        """Select the regular (non-bold, non-italic) default font."""
        self.set_font(self.default_font, '', self.default_font_size)

    def add_html_element(self, tag, styles):
        """
        Render one block-level HTML tag at the current cursor position.

        Args:
            tag: parsed tag object exposing ``name``, ``get_text()`` and
                ``find_all()`` (a BeautifulSoup tag in this file).
            styles: collection of inherited style markers. Both the font-style
                letters ('B', 'I') passed by create_pdf_report and the tag
                names ('b', 'strong', 'i', 'em') are recognised.
        """
        text = tag.get_text()
        tag_name = tag.name.lower()
        styles = styles or ()

        # Translate inherited markers into an FPDF style string. The visible
        # caller passes upper-case 'B'/'I', so those must be accepted too.
        current_style = ''
        if any(marker in styles for marker in ('B', 'b', 'strong')):
            current_style += 'B'
        if any(marker in styles for marker in ('I', 'i', 'em')):
            current_style += 'I'
        if not current_style:
            self.set_default_font()

        # Handle different HTML elements
        if tag_name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
            level = int(tag_name[1])
            font_size = {1: 18, 2: 16, 3: 14, 4: 12, 5: 11, 6: 10}.get(level, 10)
            self.set_font(self.default_font, 'B', font_size)
            self.multi_cell(0, font_size * 0.5, text, align='L')
            self.ln(font_size * 0.3)
            self.set_default_font()
        elif tag_name == 'p':
            self.set_font(self.default_font, current_style, self.default_font_size)
            self.multi_cell(0, 5, text, align='L')
            self.ln(3)
        elif tag_name == 'ul':
            self.ln(2)
            for item in tag.find_all('li', recursive=False):
                self.set_default_font()
                item_text = item.get_text()
                self.cell(5, 5, chr(127))  # Use a bullet character
                self.multi_cell(0, 5, item_text, align='L')
                self.ln(1)
            self.ln(3)
        elif tag_name == 'ol':
            self.ln(2)
            for i, item in enumerate(tag.find_all('li', recursive=False), 1):
                self.set_default_font()
                item_text = item.get_text()
                self.cell(8, 5, f"{i}.")
                self.multi_cell(0, 5, item_text, align='L')
                self.ln(1)
            self.ln(3)
        elif tag_name == 'table':
            self.ln(5)
            if self.h - self.y < 50:  # Ensure space for table
                self.add_page()
            self.process_table(tag)
            self.ln(5)
        elif tag_name in ['b', 'strong', 'i', 'em']:
            # These tags only modify style; the caller recurses into them.
            pass
        elif tag_name == 'br':
            self.ln(5)
        elif tag_name == 'hr':
            self.ln(2)
            self.line(self.get_x(), self.get_y(), self.w - self.r_margin, self.get_y())
            self.ln(4)
        else:
            # Any other element: emit its text content as a plain paragraph.
            if text.strip():
                self.set_font(self.default_font, current_style, self.default_font_size)
                self.multi_cell(0, 5, text, align='L')
                self.ln(1)  # Minimal line break after generic text

    def process_table(self, table_tag):
        """
        Draw a <table> tag as equal-width columns.

        The column count comes from the first row. A first row made of <th>
        cells is drawn as a header; data rows whose <td> count differs from
        the column count are skipped.
        """
        rows = table_tag.find_all('tr')
        if not rows:
            return
        # Determine column count from first row
        first_row_cells = rows[0].find_all(['th', 'td'])
        num_cols = len(first_row_cells)
        if num_cols == 0:
            return
        effective_width = self.w - self.l_margin - self.r_margin
        col_width = effective_width / num_cols
        # Draw header row first if it exists
        header_cells = rows[0].find_all('th')
        if header_cells:
            self._draw_table_row(header_cells, col_width, is_header=True)
            rows = rows[1:]  # Process remaining rows
        # Process data rows
        for row in rows:
            cells = row.find_all('td')
            if len(cells) == num_cols:  # Ensure consistent column count
                self._draw_table_row(cells, col_width, is_header=False)

    def _draw_table_row(self, cells, col_width, is_header):
        """
        Draw a single table row, wrapping cell text and breaking the page first
        when the row does not fit above the bottom margin.

        Args:
            cells: cell tags of the row (each exposing ``get_text()``).
            col_width: width of every column, in document units.
            is_header: True to draw bold text on a light-gray fill.
        """
        max_lines = 1
        cell_data = []

        # Remember the active font so the measuring pass can be undone.
        # font_size_pt is used (not font_size) because set_font() takes points,
        # whereas font_size is expressed in document units.
        saved_family = self.font_family
        saved_style = self.font_style
        saved_size_pt = self.font_size_pt

        # First pass: measure how many wrapped lines each cell needs.
        table_style = 'B' if is_header else ''
        table_font_size = 9  # Use smaller font for tables
        self.set_font(self.default_font, table_style, table_font_size)
        for cell in cells:
            cell_text = cell.get_text().strip()
            wrapped_lines = []
            if cell_text:
                # split_only returns the wrapped lines without drawing anything
                wrapped_lines = self.multi_cell(
                    w=col_width - 2,  # Adjust width slightly for padding/border
                    h=self.table_row_height,
                    txt=cell_text,
                    border=0,
                    align='L',
                    split_only=True
                )
            max_lines = max(max_lines, len(wrapped_lines) if wrapped_lines else 1)
            cell_data.append({
                "text": cell_text,
                "lines": wrapped_lines if wrapped_lines else [""],  # At least one empty line
                "is_header": is_header
            })

        # Restore original font settings (only possible if a font was selected)
        if saved_family:
            self.set_font(saved_family, saved_style, saved_size_pt)

        # Calculate total row height needed
        row_height = (max_lines * self.table_row_height) + self.table_padding
        # Check page space (including bottom margin)
        space_left = self.h - self.y - self.b_margin
        if space_left < row_height:
            self.add_page()

        # Second pass: draw the cells left to right from the row origin
        start_x = self.get_x()
        start_y = self.get_y()
        for i, data in enumerate(cell_data):
            self.set_xy(start_x + (i * col_width), start_y)
            self.set_font(self.default_font, 'B' if data["is_header"] else '', table_font_size)
            if data["is_header"]:
                self.set_fill_color(230, 230, 230)  # Light gray for header
            else:
                self.set_fill_color(255, 255, 255)  # White for data cells
            self.multi_cell(
                w=col_width,
                h=self.table_row_height,  # Fixed height per text line
                txt="\n".join(data["lines"]),  # Join the pre-calculated lines
                border=1,
                align='L',
                fill=True,
                new_x=XPos.RIGHT,  # Continue to the right of this cell
                new_y=YPos.TOP  # Stay on the row's top edge
            )
            # Reset fill color after each cell
            self.set_fill_color(255, 255, 255)

        # Move cursor down below the drawn row, then add the inter-row padding
        self.set_xy(self.l_margin, start_y + row_height - self.table_padding)
        self.ln(self.table_padding)
def create_pdf_report(report_text):
    """
    Render a Markdown report to PDF using PDF_Generator.

    Args:
        report_text: Markdown source of the report.

    Returns:
        BytesIO wrapping the generated PDF bytes.

    Raises:
        ValueError: if report_text is empty.
        Exception: on any failure while converting or rendering; the original
            error is chained as the cause.
    """
    if not report_text:
        raise ValueError("Input report_text cannot be empty.")
    try:
        # Map typographic punctuation to ASCII. Escapes are used instead of
        # literal characters so the mapping survives file re-encoding.
        punctuation_map = str.maketrans({
            '\u2013': '-',   # en dash
            '\u2014': '-',   # em dash
            '\u2011': '-',   # non-breaking hyphen
            '\u2018': "'",   # left single quote
            '\u2019': "'",   # right single quote
            '\u201c': '"',   # left double quote
            '\u201d': '"',   # right double quote
            '\u2026': '...', # ellipsis
        })
        cleaned_md = report_text.translate(punctuation_map)
        # Remove potential markdown code block fences
        cleaned_md = re.sub(r'^```markdown\s*', '', cleaned_md, flags=re.MULTILINE)
        cleaned_md = re.sub(r'\s*```$', '', cleaned_md, flags=re.MULTILINE)
        # The built-in helvetica font only covers latin-1; replace anything
        # else with '?' rather than letting rendering fail on one character.
        cleaned_md = cleaned_md.encode('latin-1', errors='replace').decode('latin-1')
        cleaned_md = cleaned_md.strip()

        # Convert Markdown to HTML
        html_content = markdown.markdown(cleaned_md, extensions=['tables', 'fenced_code', 'sane_lists'])
        if not html_content or not html_content.strip():
            raise ValueError("Markdown parsing resulted in empty HTML.")

        # Parse HTML using BeautifulSoup
        soup = BeautifulSoup(html_content, 'html.parser')

        # Generate PDF using the custom class
        pdf = PDF_Generator()
        pdf.add_page()
        pdf.set_default_font()

        block_tags = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ul', 'ol', 'table', 'br', 'hr']

        def process_element(tag, current_styles):
            """Walk the parsed tree, writing text nodes and delegating block tags."""
            if tag.name is None:  # NavigableString (bare text node)
                if tag.strip():
                    style_str = ''.join(list(current_styles))
                    pdf.set_font(pdf.default_font, style_str, pdf.default_font_size)
                    pdf.write(5, tag)
                    pdf.set_default_font()  # Reset after writing styled text
                return

            local_styles_added = set()
            if tag.name in ['b', 'strong']:
                current_styles.add('B')
                local_styles_added.add('B')
            elif tag.name in ['i', 'em']:
                current_styles.add('I')
                local_styles_added.add('I')

            if tag.name in block_tags:
                # Block elements are rendered as a whole by add_html_element
                pdf.add_html_element(tag, current_styles.copy())
            elif hasattr(tag, 'contents'):
                # Inline or unhandled elements: recurse into the children
                for child in tag.contents:
                    process_element(child, current_styles.copy())

            # Drop the styles this element introduced
            current_styles.difference_update(local_styles_added)

        # Process top-level elements (inside <body> when the parser added one)
        body = soup.find('body')
        elements_to_process = body.contents if body else soup.contents
        for element in elements_to_process:
            process_element(element, set())  # Start with empty styles

        # --- Output PDF ---
        pdf_output = pdf.output(dest='S')
        # Older FPDF versions return str; normalise to bytes
        if isinstance(pdf_output, str):
            pdf_output = pdf_output.encode('latin-1')
        return BytesIO(pdf_output)
    except ImportError as e:
        logger.error(f"PDF Generation Error: Missing library {e.name}")
        raise Exception(f"PDF generation failed due to missing library: {e.name}") from e
    except Exception as e:
        logger.error(f"PDF Generation Error: {e}", exc_info=True)
        raise Exception(f"PDF generation failed: {e}") from e
# --- Logging setup ---
# Root logging at INFO with a timestamp/level/message layout; `logger` is the
# module-level logger used throughout this file.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
def setup_dns():
    """
    Patch socket.getaddrinfo to use custom DNS for api.coingecko.com.

    Nameservers come from the NAMESERVER1 / NAMESERVER2 environment variables
    (defaults 8.8.8.8 / 8.8.4.4). Lookups for any other host, and lookups whose
    custom resolution fails, go through the original getaddrinfo.

    Returns:
        True if the patch is in place (newly installed or already present),
        False if dnspython is missing or setup failed.
    """
    # socket.getaddrinfo is shared by the whole process. Without this guard a
    # repeated call would wrap the previous patch in another layer.
    if getattr(socket.getaddrinfo, '_custom_dns_patched', False):
        logger.info("socket.getaddrinfo already patched; skipping DNS setup.")
        return True

    nameserver1 = os.getenv('NAMESERVER1', '8.8.8.8')
    nameserver2 = os.getenv('NAMESERVER2', '8.8.4.4')
    try:
        # Ensure dnspython is available
        import dns.exception
        import dns.resolver
        resolver = dns.resolver.Resolver()
        resolver.nameservers = [nameserver1, nameserver2]
        # Store original function
        _orig_getaddrinfo = socket.getaddrinfo

        def patched_getaddrinfo(*args, **kwargs):
            hostname = args[0] if args else None
            # Only patch for the specific host we care about
            if hostname == 'api.coingecko.com':
                try:
                    # Resolve using custom DNS
                    answers = resolver.resolve(hostname, 'A')
                    if answers:
                        ip = str(answers[0])
                        port = args[1] if len(args) > 1 else 0
                        # Call original function with the resolved IPv4 address.
                        # Only the first A record is used.
                        return _orig_getaddrinfo(ip, port, *args[2:], **kwargs)
                except dns.resolver.NXDOMAIN:
                    logger.warning(f"Custom DNS: NXDOMAIN for {hostname}. Falling back.")
                except dns.resolver.NoAnswer:
                    logger.warning(f"Custom DNS: No A records found for {hostname}. Falling back.")
                except dns.exception.Timeout:
                    logger.warning(f"Custom DNS: Timeout resolving {hostname}. Falling back.")
                except Exception as resolve_err:
                    logger.warning(f"Custom DNS resolution failed for {hostname}: {resolve_err}. Falling back.")
            # For any other hostname or if custom resolution fails, use the original function
            return _orig_getaddrinfo(*args, **kwargs)

        # Marker read by the guard at the top of this function
        patched_getaddrinfo._custom_dns_patched = True
        socket.getaddrinfo = patched_getaddrinfo
        logger.info(f"socket.getaddrinfo patched to use DNS servers: {resolver.nameservers}")
        return True
    except ImportError:
        logger.warning("dnspython not installed. Cannot patch DNS. Using system default.")
        return False
    except Exception as e:
        logger.error(f"DNS setup failed: {e}")
        return False
# --- Load environment & initialize DNS ---
load_dotenv()
# Run the DNS patch once per session and remember whether it succeeded.
if 'dns_setup_done' not in st.session_state:
    st.session_state.dns_setup_done = setup_dns()
    # Only warn when the user explicitly configured a custom nameserver.
    if not st.session_state.dns_setup_done and os.getenv('NAMESERVER1'):
        st.warning(
            "DNS patch failed or dnspython not installed; network requests may use system DNS.",
            icon="\u26a0\ufe0f",  # warning-sign emoji, written as escapes to survive re-encoding
        )
# --- API clients & keys ---
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
LINKUP_API_KEY = os.getenv("LINKUP_API_KEY")

# CoinGecko (no key required): build the client once per session and keep it
# only if the ping answers with the expected "(V3)" greeting.
if 'cg' not in st.session_state:
    try:
        cg_client = CoinGeckoAPI()
        ping_result = cg_client.ping()
        if not ping_result.get('gecko_says', '').startswith('(V3)'):
            st.session_state['cg'] = None
            st.session_state['coingecko_status'] = f"CoinGecko Ping Error: Unexpected response {ping_result}"
            logger.error(f"CoinGecko API ping failed: {ping_result}")
        else:
            st.session_state['cg'] = cg_client
            st.session_state['coingecko_status'] = "success"
            logger.info("CoinGecko API client initialized and ping successful.")
    except Exception as e:
        st.session_state['cg'] = None
        # Distinguish common network failures from any other initialization error
        _is_network_failure = isinstance(e, (socket.gaierror, ConnectionRefusedError))
        _status_prefix = "Network Error" if _is_network_failure else "Initialization Error"
        st.session_state['coingecko_status'] = f"{_status_prefix}: {e}"
        logger.error(f"Failed to initialize CoinGecko API client: {e}", exc_info=True)
# Gemini (needs key): configure once per session, preferring the newer model
# and falling back to a second model if constructing the first one raises.
if 'gemini_model' not in st.session_state:
    if not GEMINI_API_KEY:
        st.session_state['gemini_model'] = None
        st.session_state['gemini_status'] = "No Gemini API key; AI disabled."
        logger.warning("Gemini API key not found.")
    else:
        try:
            genai.configure(api_key=GEMINI_API_KEY)
            # Attempt to use the newer model first
            model_name = 'gemini-2.5-pro-exp-03-25'
            try:
                gem = genai.GenerativeModel(model_name)
                st.session_state['gemini_model'] = gem
                st.session_state['gemini_status'] = "success"
                logger.info(f"Gemini AI client initialized successfully (using {model_name}).")
            except Exception as primary_err:
                logger.warning(f"Failed to initialize {model_name}: {primary_err}. Trying gemini-2.0-flash-thinking-exp")
                try:
                    model_name = 'gemini-2.0-flash-thinking-exp'  # Fallback model
                    gem = genai.GenerativeModel(model_name)
                    st.session_state['gemini_model'] = gem
                    st.session_state['gemini_status'] = "success"
                    logger.info(f"Gemini AI client initialized successfully (using {model_name}).")
                except Exception as fallback_err:
                    st.session_state['gemini_model'] = None
                    st.session_state['gemini_status'] = f"Gemini Init Error: {fallback_err}"
                    logger.error(f"Failed to initialize Gemini AI client: {fallback_err}")
        except Exception as config_err:
            st.session_state['gemini_model'] = None
            st.session_state['gemini_status'] = f"Gemini Config Error: {config_err}"
            logger.error(f"Failed to configure Gemini AI: {config_err}")
# Module-level handles read back from session state (None / 'Not Initialized'
# when the corresponding initialization did not store anything).
cg = st.session_state.get('cg')
gemini_model = st.session_state.get('gemini_model')
coingecko_status = st.session_state.get('coingecko_status', 'Not Initialized')
gemini_status = st.session_state.get('gemini_status', 'Not Initialized')

# --- Instantiate Yahoo Finance News Tool ---
# Created once per session and reused afterwards.
if 'yahoo_news_tool' not in st.session_state:
    st.session_state['yahoo_news_tool'] = YahooFinanceNewsTool()
yahoo_news_tool = st.session_state['yahoo_news_tool']

# --- Linkup Client ---
# lclient stays None when the key is missing or construction fails; the reason
# is recorded in st.session_state.linkup_status in both cases.
lclient = None
if not LINKUP_API_KEY:
    st.session_state['linkup_status'] = "No Linkup API key; Research disabled."
    logger.warning("Linkup API key not found.")
else:
    try:
        lclient = LinkupClient(api_key=LINKUP_API_KEY)
        logger.info("Linkup Client initialized.")
    except Exception as e:
        logger.error(f"Failed to initialize Linkup Client: {e}")
        st.session_state['linkup_status'] = f"Linkup Init Error: {e}"
# --- Constants & symbol maps ---
REFRESH_INTERVAL = 300  # secs (5 minutes)

# Ticker symbol -> CoinGecko coin id
COINGECKO_IDS = {
    'BTC': 'bitcoin',
    'ETH': 'ethereum',
    'SOL': 'solana',
    'DOGE': 'dogecoin',
    'USDT': 'tether',
    'USDC': 'usd-coin',
    'XRP': 'ripple',
    'ADA': 'cardano',
    'AVAX': 'avalanche-2',
    'DOT': 'polkadot',
    'LINK': 'chainlink',
    'MATIC': 'matic-network',
    'LTC': 'litecoin',
    'BCH': 'bitcoin-cash',
    'BNB': 'binancecoin',
    'XLM': 'stellar',
    'SHIB': 'shiba-inu',
    'TRX': 'tron',
    'DAI': 'dai',
    'WBTC': 'wrapped-bitcoin',
    'ETC': 'ethereum-classic',
    'ICP': 'internet-computer',
    'LEO': 'leo-token',
    'UNI': 'uniswap',
    'ATOM': 'cosmos',
    'OKB': 'okb',
    'TON': 'the-open-network',
}

# Reverse lookup: CoinGecko coin id -> ticker symbol
SYMBOL_MAP = {coin_id: ticker for ticker, coin_id in COINGECKO_IDS.items()}

# Alphabetically sorted ticker symbols
ALL_SYMBOLS = sorted(COINGECKO_IDS)
# --- Session state defaults ---
# Each entry is (state key, factory). The factory runs only when the key is
# absent, so values already stored in the session are never overwritten.
_SESSION_DEFAULT_FACTORIES = (
    ('portfolio', lambda: {'USD': {'quantity': 1000.0, 'type': 'cash'}}),
    ('all_coin_prices', lambda: {}),
    ('trade_history', lambda: []),
    ('last_refresh_time', lambda: 0),
    ('performance_data_cache', lambda: {}),  # Cache for historical data
)
for _state_key, _make_default in _SESSION_DEFAULT_FACTORIES:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# --- Data utilities ---
def fetch_all_coin_prices():
    """
    Fetch USD prices for every coin in COINGECKO_IDS and cache them.

    On success, st.session_state.all_coin_prices is replaced with a
    symbol -> float price mapping and last_refresh_time is set to now.
    On failure the previously cached prices are kept.

    Returns:
        True if prices were fetched and stored, False otherwise.
    """
    if not cg:
        logger.warning("CoinGecko client not available, cannot fetch all prices.")
        st.session_state.all_coin_prices = st.session_state.get('all_coin_prices', {})
        return False
    ids_to_fetch = list(COINGECKO_IDS.values())
    if not ids_to_fetch:
        logger.warning("No CoinGecko IDs defined to fetch prices.")
        st.session_state.all_coin_prices = {}
        return False
    try:
        logger.info(f"Attempting to fetch prices for {len(ids_to_fetch)} coins...")
        # Fetch price and 24h change
        markets_data = cg.get_coins_markets(vs_currency='usd', ids=ids_to_fetch, price_change_percentage='24h')
        fetched_prices = {}
        missing_symbols = []
        if not markets_data:
            logger.warning("Received empty market data from CoinGecko.")
            # Keep old prices if fetch fails
            st.session_state.all_coin_prices = st.session_state.get('all_coin_prices', {})
            return False
        for coin_data in markets_data:
            cg_id = coin_data.get('id')
            symbol = SYMBOL_MAP.get(cg_id)
            price = coin_data.get('current_price')
            if symbol and price is not None:
                try:
                    fetched_prices[symbol] = float(price)
                except (ValueError, TypeError):
                    logger.warning(f"Could not convert price for {symbol} ({cg_id}) to float: {price}")
                    missing_symbols.append(symbol)
            elif symbol:
                missing_symbols.append(symbol)
                logger.warning(f"Price data missing or null for {symbol} ({cg_id}) in API response.")
        st.session_state.all_coin_prices = fetched_prices
        st.session_state.last_refresh_time = time.time()
        logger.info(f"Successfully fetched prices for {len(fetched_prices)} coins.")
        if missing_symbols:
            logger.warning(f"Could not fetch valid prices for: {', '.join(sorted(set(missing_symbols)))}")
        return True
    except Exception as e:
        logger.error(f"Error fetching all coin prices: {e}", exc_info=True)
        # Preserve existing prices if fetch fails to avoid breaking the UI
        st.session_state.all_coin_prices = st.session_state.get('all_coin_prices', {})
        # Emoji are written as escapes so they survive file re-encoding:
        # U+26A0 U+FE0F is the warning sign, U+1F6A8 the rotating light.
        st.toast(f"\u26a0\ufe0f Price fetch failed: {e}. Using cached prices.", icon="\U0001F6A8")
        # Update status to reflect the error for sidebar display
        st.session_state.coingecko_status = f"Fetch Error: {e}"
        return False
def update_prices():
    """
    Refresh the cached coin prices.

    Returns:
        The boolean result of fetch_all_coin_prices().
    """
    logger.info("Triggering update of all coin prices.")
    if cg:
        # Optimistically clear any earlier error status; the fetch records a
        # new one if it fails.
        st.session_state.coingecko_status = "success"
    fetch_succeeded = fetch_all_coin_prices()
    return fetch_succeeded
def calculate_portfolio_value():
    """
    Return the portfolio's total USD value: the cash balance plus every crypto
    holding valued at its cached price. Holdings without a positive cached
    price contribute nothing.
    """
    holdings = st.session_state.portfolio
    price_table = st.session_state.get('all_coin_prices', {})
    running_total = holdings.get('USD', {}).get('quantity', 0.0)
    for ticker, position in holdings.items():
        if position.get('type') != 'crypto':
            continue
        unit_price = price_table.get(ticker)
        if unit_price is None:
            logger.debug(f"Price for {ticker} is None during portfolio calculation.")
            continue
        if unit_price > 0:
            running_total += position.get('quantity', 0) * unit_price
    return running_total
def simulate_trade(trade_type, symbol, quantity, price):
    """
    Apply a simulated BUY or SELL to the session portfolio.

    Args:
        trade_type: 'BUY' or 'SELL'.
        symbol: coin ticker; upper-cased before use.
        quantity: amount of the coin to trade; must be greater than zero.
        price: USD price per coin; must be greater than zero.

    Returns:
        True if the trade was applied and appended to trade_history,
        False if it was rejected (an error is shown via st.error).
    """
    symbol = symbol.upper()
    if price is None or price <= 0:
        st.error(f"Invalid or unavailable price (${price}) for {symbol}. Cannot simulate trade.")
        logger.error(f"Simulate trade failed for {symbol}: Invalid price provided ({price})")
        return False
    # A non-positive quantity would make a BUY credit cash and create a
    # negative holding, so reject it up front.
    if quantity is None or quantity <= 0:
        st.error(f"Invalid quantity ({quantity}) for {symbol}. Quantity must be greater than zero.")
        logger.error(f"Simulate trade failed for {symbol}: Invalid quantity provided ({quantity})")
        return False

    cost = quantity * price
    p = st.session_state.portfolio
    current_time = pd.Timestamp.now(tz='UTC')

    if trade_type == 'BUY':
        usd_balance = p.get('USD', {}).get('quantity', 0.0)
        if usd_balance < cost:
            st.error(f"Insufficient USD balance. Need ${cost:.2f}, have ${usd_balance:.2f}")
            return False
        if 'USD' not in p:
            p['USD'] = {'quantity': 0.0, 'type': 'cash'}
        p['USD']['quantity'] -= cost
        if symbol not in p:
            p[symbol] = {'quantity': 0.0, 'type': 'crypto'}
        elif p[symbol].get('type') != 'crypto':
            p[symbol]['type'] = 'crypto'  # Ensure type is correct
        if 'quantity' not in p[symbol]:
            p[symbol]['quantity'] = 0.0
        p[symbol]['quantity'] += quantity
        st.session_state.trade_history.append({
            'Timestamp': current_time, 'Type': 'BUY', 'Symbol': symbol,
            'Quantity': quantity, 'Price (USD)': price, 'Cost (USD)': cost
        })
        st.success(f"BUY {quantity:.8f} {symbol} @ ${price:.4f} (Cost: ${cost:.2f})")
        logger.info(f"Simulated BUY: {quantity} {symbol} @ {price}")
        return True

    if trade_type == 'SELL':
        # The holding must exist and carry a quantity
        if symbol not in p or 'quantity' not in p[symbol]:
            st.error(f"You do not own any {symbol} to sell.")
            return False
        crypto_balance = p[symbol].get('quantity', 0.0)
        # Small epsilon so float rounding does not block selling the full balance
        if crypto_balance < quantity - 1e-9:
            st.error(f"Not enough {symbol} to sell. Have {crypto_balance:.8f}, need {quantity:.8f}")
            return False
        p[symbol]['quantity'] -= quantity
        # Remove symbol from portfolio if quantity becomes effectively zero
        if p[symbol]['quantity'] < 1e-9:
            del p[symbol]
        if 'USD' not in p:  # Ensure USD entry exists
            p['USD'] = {'quantity': 0.0, 'type': 'cash'}
        p['USD']['quantity'] += cost  # Add proceeds
        st.session_state.trade_history.append({
            'Timestamp': current_time, 'Type': 'SELL', 'Symbol': symbol,
            'Quantity': quantity, 'Price (USD)': price, 'Proceeds (USD)': cost
        })
        st.success(f"SELL {quantity:.8f} {symbol} @ ${price:.4f} (Proceeds: ${cost:.2f})")
        logger.info(f"Simulated SELL: {quantity} {symbol} @ {price}")
        return True

    st.error(f"Unknown trade type: {trade_type}")
    logger.error(f"Unknown trade type encountered: {trade_type}")
    return False
def should_update_prices():
    """Return True once at least REFRESH_INTERVAL seconds have passed since the last refresh."""
    seconds_since_refresh = time.time() - st.session_state.get('last_refresh_time', 0)
    return seconds_since_refresh >= REFRESH_INTERVAL
def get_coin_news(symbols, max_updates=3):
    """
    Look up recent news for each symbol through the Yahoo Finance news tool.

    Each symbol is searched as "<SYMBOL>-USD" first and, if that yields
    nothing usable, as the bare symbol. Non-string entries in `symbols` are
    ignored.

    Returns:
        dict mapping upper-cased symbol -> list of at most `max_updates`
        news lines, or a single-element list with a placeholder message.
    """
    news = {}
    tickers = [str(entry).upper() for entry in (symbols or []) if isinstance(entry, str)]
    for sym_up in tickers:
        try:
            # Try the '-USD' pair first, then the bare ticker as a fallback
            for search_term in (f"{sym_up}-USD", sym_up):
                raw = yahoo_news_tool.invoke(search_term)
                usable = (
                    bool(raw)
                    and "Item Not Found" not in raw
                    and "cannot find" not in raw.lower()
                )
                if not usable:
                    continue
                # Split on newlines and drop blank lines
                headlines = [line.strip() for line in raw.split('\n') if line.strip()]
                if headlines:
                    news[sym_up] = headlines[:max_updates]
                else:
                    news[sym_up] = [f"No specific news items found for {sym_up} via Yahoo Finance."]
                break
            else:
                # Neither search term produced a usable result
                news[sym_up] = [f"No news found for {sym_up} via Yahoo Finance."]
        except Exception as e:
            logger.error(f"News fetch error for {sym_up}: {e}", exc_info=True)
            # Keep the detailed error out of the user-facing text
            news[sym_up] = [f"Error fetching news for {sym_up}."]
    return news
def get_linkup_news(symbols):
    """
    Run a Linkup search about the given symbols.

    Returns:
        The search response converted to str, or an explanatory message when
        the client is unavailable, no symbols were given, or the call fails.
    """
    if not lclient:
        return "Linkup client not available (check API key)."
    if not symbols:
        return "No symbols provided for Linkup research."

    symbol_string = ", ".join(symbols)  # e.g. "BTC, ETH, SOL"
    search_kwargs = {
        "query": f"Provide the latest crypto news, expert opinions, rumors, and market trends and also tell more about these cryptocurrencies: {symbol_string}.",
        "depth": "standard",
        "output_type": "searchResults",
        "include_images": False,
    }
    try:
        return str(lclient.search(**search_kwargs))
    except Exception as e:
        logger.error(f"Linkup API search failed: {e}", exc_info=True)
        return f"Error getting Linkup research: {e}"
def get_gemini_recommendation(market_context, news_map, linkup_research):
    """
    Build the analyst prompt (strategies, news, research, portfolio, prices)
    and ask Gemini for a Markdown report.

    Args:
        market_context: free-text context supplied by the user (may be empty).
        news_map: dict of symbol -> list of news strings.
        linkup_research: research text inserted into the prompt as-is.

    Returns:
        The recommendation text, or a human-readable message when the model is
        unavailable, the response is blocked or empty, or the call fails.
        This function does not raise for API errors.
    """
    if not gemini_model:
        return "Gemini AI model not available."

    # --- Strategies ---
    strategies = {
        "Momentum Trading": "Ride strong directional moves by buying assets in an uptrend and selling when momentum fades.",
        "Scalping": "Capture small profits via quick in-and-out trades over very short timeframes.",
        "Range Trading": "Buy at established support levels and sell at resistance within defined price ranges.",
        "Breakout Trading": "Enter trades when price decisively breaks key support or resistance levels for strong follow-through moves.",
        "Basket Trading": "Trade a diversified set of assets simultaneously to reduce idiosyncratic risk.",
        "Arbitrage": "Exploit price discrepancies across different markets or exchanges to earn low-risk profits.",
        "Mean Reversion": "Bet on price reversals by buying when an asset deviates below its historical average and selling when it rises above.",
        "Pullback Trading": "Enter on temporary retracements toward trend-defining support or moving averages within an established trend.",
        "Chart Pattern Trading": "Use classical price formations - like head-and-shoulders, triangles, and double tops/bottoms - to anticipate trend reversals or continuations.",
        "Fibonacci Retracement": "Use horizontal lines at Fibonacci ratios (23.6%, 38.2%, 61.8%) to identify potential support and resistance levels.",
        "Moving Average Crossover": "Execute trades when a short-term moving average crosses above or below a longer-term moving average.",
        "News Trading": "Time entries and exits around significant news releases to capture rapid market reactions.",
        "Sentiment Analysis": "Leverage social-media and on-chain sentiment indicators to gauge market mood and timing.",
        "Statistical Arbitrage": "Automate trades based on quantitative models that exploit temporary price inefficiencies between correlated instruments.",
        "Grid Trading": "Place a series of staggered buy and sell orders at predefined price intervals to profit from market oscillations.",
    }
    strategies_str = "\n".join([f"- **{k}**: {v}" for k, v in strategies.items()])

    # --- News Formatting ---
    news_sections = []
    if news_map:
        for sym, items in news_map.items():
            if items:  # Only include symbols that have news items
                item_bullets = "\n - ".join(items)
                news_sections.append(f"**{sym}**:\n - {item_bullets}")
    news_str = "\n\n".join(news_sections) if news_sections else "No recent news updates found for portfolio assets via Yahoo Finance."

    # --- Portfolio and Price Formatting ---
    port_val = calculate_portfolio_value()
    all_prices = st.session_state.get('all_coin_prices', {})
    # Include all known symbols' prices, not just portfolio ones
    price_lines = []
    for s in sorted(sym for sym in ALL_SYMBOLS if sym != 'USD'):
        price = all_prices.get(s)
        formatted_price = f"${price:,.4f}" if price is not None else "N/A"
        price_lines.append(f"- {s}/USD: {formatted_price}")
    price_str = "\n".join(price_lines) or "No price data available."

    # Portfolio holdings formatting (shallow copy so popping USD leaves the
    # session portfolio untouched)
    port_lines = []
    portfolio_copy = dict(st.session_state.portfolio)
    # Handle USD first
    if 'USD' in portfolio_copy:
        usd_data = portfolio_copy.pop('USD')
        usd_qty = usd_data.get('quantity', 0.0)
        usd_pct = (usd_qty / port_val * 100) if port_val > 0 else 0
        port_lines.append(f"- USD (Cash): ${usd_qty:,.2f} ({usd_pct:.1f}%)")
    # Handle crypto assets, sorted
    for s in sorted(portfolio_copy.keys()):
        data = portfolio_copy[s]
        qty = data.get('quantity', 0.0)
        pr = all_prices.get(s)  # Use cached price
        if pr is not None and qty > 1e-9:  # Only show assets with quantity > 0
            val = qty * pr
            pct = (val / port_val * 100) if port_val > 0 else 0
            port_lines.append(f"- {s}: {qty:.6f} @ ${pr:,.4f} = ${val:,.2f} ({pct:.1f}%)")
        elif qty > 1e-9:  # Show assets even if price is missing
            port_lines.append(f"- {s}: {qty:.6f} @ N/A = Value N/A")
    portfolio_str = "\n".join(port_lines) if port_lines else "No assets held."

    # --- Build the Prompt ---
    prompt = f"""
You are a helpful and engaging crypto trading analyst. Your goal is to provide a structured report with actionable insights.
**Date:** {datetime.now().strftime('%Y-%m-%d')}
**Available Trading Strategies:**
{strategies_str}
**Analysis Request:**
1. **Strategy Selection:** Choose one or two strategies from the list above that seem most relevant based on the current data. You can also suggest variations or other standard strategies not listed.
2. **Justification:** Clearly explain *why* you chose these strategies, linking them to the provided news, research, market context, or current portfolio composition. Use clear, accessible language.
3. **Actionable Recommendations:** Provide **specific, simulated trade ideas** (BUY or SELL). Include:
* Coin Symbol (e.g., BTC, ETH).
* Suggested Trade Size (e.g., '$100 worth', '0.5 ETH', or '% of available cash/holding').
* Clear reasoning for each trade, tied back to your chosen strategy and the data.
4. **Summaries:** Briefly summarize the key takeaways from the news and research provided.
**Current Data:**
**1. Recent News (Yahoo Finance):**
{news_str}
**2. Market Research & Trends (Linkup):**
{linkup_research}
**3. User-Provided Context:**
{market_context or 'None provided.'}
**4. Simulated Portfolio:**
{portfolio_str}
Total Portfolio Value: ${port_val:,.2f}
**5. Current Market Prices (USD):**
{price_str}
**Your Report:**
Format your response using Markdown. Use headings, bullet points, and potentially tables for clarity. Ensure all text uses standard ASCII characters suitable for 'helvetica' font (replace special dashes like '\u2013' or '\u2014' with '-').
**Report Sections:**
* **News & Research Summary:**
* **Chosen Strategies & Justifications:**
* **Simulated Trade Recommendations:**
* Trade 1: [BUY/SELL] [Symbol] - [Size] - [Reasoning]
* Trade 2: [BUY/SELL] [Symbol] - [Size] - [Reasoning]
* (Add more trades if applicable)
* **Overall Outlook/Next Steps:** (Optional brief concluding thought)
"""
    logger.debug(f"--- Gemini Prompt Start ---\n{prompt[:500]}...\n--- Gemini Prompt End ---")  # Log truncated prompt

    def _feedback_field(feedback, field, default):
        """Read a field from prompt_feedback whether it is an object or a dict."""
        if feedback is None:
            return default
        if isinstance(feedback, dict):
            return feedback.get(field) or default
        return getattr(feedback, field, None) or default

    try:
        safety_settings = [
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
        ]
        generation_config = genai.types.GenerationConfig(
            temperature=0.7,  # Adjust for desired creativity/determinism
            max_output_tokens=4096  # Set a reasonable limit
        )
        response = gemini_model.generate_content(
            prompt,
            generation_config=generation_config,
            safety_settings=safety_settings
        )

        # --- Process Response ---
        # No candidates means the prompt itself was blocked
        if not response.candidates:
            feedback = getattr(response, 'prompt_feedback', None)
            block_reason = _feedback_field(feedback, 'block_reason', 'Unknown')
            block_details = _feedback_field(feedback, 'block_reason_message', 'No details provided.')
            logger.warning(f"Gemini response blocked. Reason: {block_reason}, Details: {block_details}")
            return f"\u26a0\ufe0f AI recommendation blocked by safety filters. Reason: {block_reason}. Please adjust context or retry."

        # Prefer the .text accessor. It may be missing (AttributeError) or
        # refuse to produce text (ValueError), so fall back to the parts.
        try:
            recommendation = response.text
        except (AttributeError, ValueError):
            content = getattr(response.candidates[0], 'content', None)
            parts = getattr(content, 'parts', None)
            if parts is None:
                logger.warning(f"Unexpected Gemini response format: {response}")
                return "AI response format was unexpected. Could not extract text."
            recommendation = "".join(part.text for part in parts if hasattr(part, 'text'))

        if recommendation:
            # Replace en/em dashes with plain hyphens
            recommendation = recommendation.replace('\u2013', '-').replace('\u2014', '-')
            logger.info("Successfully generated Gemini recommendation.")
            return recommendation.strip()
        logger.warning(f"Gemini response parts did not contain text: {response.candidates}")
        return "AI response received but contained no actionable text content."
    except Exception as e:
        logger.error(f"Gemini API call failed: {e}", exc_info=True)
        # User-facing message; U+274C is the cross-mark emoji
        return f"\u274c Error generating AI recommendation. Details: {str(e)}. Please check API key, quota, or try again later."
# --- Initial Data Fetch ---
# Fetch prices when nothing is cached yet or no refresh has ever completed.
needs_initial_fetch = (not st.session_state.all_coin_prices or st.session_state.last_refresh_time == 0)
if needs_initial_fetch and coingecko_status == "success":
    logger.info("Performing initial fetch of all coin prices...")
    with st.spinner("Loading initial market prices..."):
        # update_prices already handles logging and potential errors
        if not update_prices():
            st.error(
                "Initial price fetch failed. Some features might be limited. Please try refreshing manually.",
                icon="\U0001F6A8",  # rotating-light emoji, written as an escape to survive re-encoding
            )
elif needs_initial_fetch and coingecko_status != "success":
    st.warning(
        f"CoinGecko API not available ({coingecko_status}). Prices cannot be fetched.",
        icon="\u26a0\ufe0f",  # warning-sign emoji
    )
# --- UI Sections ---
def sidebar_section():
    """Draw the sidebar: portfolio controls, service status lines and price refresh.

    Reads and mutates ``st.session_state.portfolio`` in response to the
    "Add Cash", "Set Quantity" and "Remove" buttons, and triggers a rerun
    after each successful change so the main page reflects the new state.
    """
    state = st.session_state
    sidebar = st.sidebar

    sidebar.title("โš™๏ธ Controls")
    sidebar.divider()
    sidebar.subheader("Manage Portfolio")

    # --- Add Cash ---
    # The typed amount is mirrored into session state so it survives reruns.
    if 'cash_input_value' not in state:
        state.cash_input_value = 0.0
    state.cash_input_value = sidebar.number_input(
        "Add Cash (USD)",
        min_value=0.0,
        key="add_cash_input_field",  # Unique key for the input widget itself
        value=state.cash_input_value,  # Display the stored value
        step=100.0,
        format="%.2f",
        help="Enter the amount of USD cash to add to your simulation balance."
    )
    if sidebar.button("โž• Add Cash", key="add_cash_button"):
        cash_amount = state.cash_input_value
        if cash_amount > 0:
            # Create the USD entry on first use, then top it up.
            usd_entry = state.portfolio.setdefault('USD', {'quantity': 0.0, 'type': 'cash'})
            usd_entry['quantity'] += cash_amount
            sidebar.success(f"Added ${cash_amount:,.2f}")
            logger.info(f"Added ${cash_amount:.2f} cash.")
            # Clear the mirrored amount before redrawing.
            state.cash_input_value = 0.0
            st.rerun()
        else:
            sidebar.warning("Please enter a positive amount to add.")

    # --- Add/Update Crypto ---
    sidebar.markdown("---")
    # Pre-select BTC when it is offered, otherwise the first option.
    symbol_index = ALL_SYMBOLS.index('BTC') if 'BTC' in ALL_SYMBOLS else 0
    add_symbol = sidebar.selectbox(
        "Coin Symbol",
        options=ALL_SYMBOLS,
        index=symbol_index,
        key="add_symbol_select",
        help="Select the cryptocurrency symbol to add or update."
    )
    # Pre-fill the quantity box with what is currently held for that symbol.
    current_qty = state.portfolio.get(add_symbol, {}).get('quantity', 0.0)
    add_qty = sidebar.number_input(
        f"Set Quantity for {add_symbol}",
        min_value=0.0,
        value=current_qty,
        step=0.01,
        format="%.8f",
        key=f"add_qty_input_{add_symbol}",  # Per-symbol key so the box resets on symbol change
        help=f"Enter the total quantity of {add_symbol} you want in your portfolio. This will overwrite the current amount."
    )
    if sidebar.button("๐Ÿ’พ Set Quantity", key="add_update_button"):
        if add_symbol == 'USD':
            sidebar.warning("Cannot set quantity for USD directly. Use 'Add Cash'.")
        elif not add_symbol:
            sidebar.warning("Please select a valid cryptocurrency symbol.")
        else:
            holdings = state.portfolio
            if add_qty > 1e-9:
                # Positive quantity: create or overwrite the holding.
                holdings[add_symbol] = {
                    'quantity': add_qty,
                    'type': 'crypto'
                }
                sidebar.success(f"{add_symbol} quantity set to {add_qty:.8f}")
                logger.info(f"Set portfolio quantity for {add_symbol} to {add_qty}")
            elif add_symbol in holdings:
                # Zero quantity on an existing holding means "remove it".
                del holdings[add_symbol]
                sidebar.success(f"{add_symbol} removed from portfolio (quantity set to 0).")
                logger.info(f"Removed {add_symbol} from portfolio (quantity set to 0).")
            else:
                sidebar.info(f"{add_symbol} not added (quantity is 0).")
            st.rerun()  # Redraw so the portfolio table picks up the change

    # --- Remove Crypto ---
    # Only crypto holdings with a non-negligible quantity can be removed.
    removeables = sorted(
        sym for sym, entry in state.portfolio.items()
        if sym != 'USD' and entry.get('type') == 'crypto' and entry.get('quantity', 0) > 1e-9
    )
    if removeables:
        sidebar.markdown("---")
        rm_symbol = sidebar.selectbox(
            "Remove Asset",
            options=removeables,
            index=0,
            key="remove_symbol_select",
            help="Select a crypto asset to completely remove from your portfolio."
        )
        if sidebar.button(f"โŒ Remove {rm_symbol}", type="secondary", key="remove_button"):
            if rm_symbol not in state.portfolio:
                # Not expected, since the options come from the portfolio itself.
                sidebar.warning(f"{rm_symbol} not found in portfolio (already removed?).")
            else:
                removed_entry = state.portfolio.pop(rm_symbol)
                removed_qty = removed_entry.get('quantity', 0)
                sidebar.success(f"Removed {rm_symbol} (was {removed_qty:.8f})")
                logger.info(f"Removed {rm_symbol} from portfolio.")
                st.rerun()

    # --- Status & Refresh ---
    sidebar.header("๐Ÿ“Š Status & Sync")

    # CoinGecko status line
    cg_status = state.get('coingecko_status', 'Not Initialized')
    if cg_status == "success":
        sidebar.success("CoinGecko: Connected")
    elif "Error" in cg_status:
        sidebar.error(f"CoinGecko: {cg_status}")
    else:
        sidebar.warning(f"CoinGecko: {cg_status}")

    # Gemini status line
    gem_status = state.get('gemini_status', 'Not Initialized')
    if gemini_model and gem_status == "success":
        sidebar.success("Gemini AI: Ready ๐Ÿ”ฅ")
    elif "Error" in gem_status or "No Gemini API key" in gem_status:
        sidebar.error(f"Gemini AI: {gem_status}")
    else:
        sidebar.warning(f"Gemini AI: {gem_status}")

    # Linkup status line (nothing is shown when there is neither a client nor a status)
    linkup_status = state.get('linkup_status')
    if lclient:
        sidebar.success("Linkup: Ready ๐Ÿ”Ž")
    elif linkup_status:
        linkup_failed = "Error" in linkup_status or "No Linkup API key" in linkup_status
        if linkup_failed:
            sidebar.error(f"Linkup: {linkup_status}")
        else:
            sidebar.warning(f"Linkup: {linkup_status}")

    if sidebar.button("๐Ÿ”„ Refresh Prices Now", key="refresh_button", help="Manually fetch the latest prices for all tracked coins."):
        if not cg:
            sidebar.error("CoinGecko connection failed. Cannot refresh.")
        else:
            with st.spinner("Refreshing all prices..."):
                refreshed = update_prices()
                if refreshed:
                    st.toast("Prices refreshed!", icon="โœ…")
                else:
                    sidebar.error("Price refresh failed. Check status above.")
            st.rerun()  # Redraw with the new prices

    # --- Last update time ---
    last_refresh = state.get('last_refresh_time', 0)
    if last_refresh > 0:
        last_update_dt = pd.Timestamp.fromtimestamp(last_refresh)
        absolute_text = last_update_dt.strftime('%Y-%m-%d %H:%M:%S')
        try:
            # humanize is optional; without it only the absolute time is shown.
            from humanize import naturaltime
            now_ts = pd.Timestamp.now()
            # Align timezone awareness before subtracting the two timestamps.
            if last_update_dt.tzinfo:
                now_ts = now_ts.tz_localize(last_update_dt.tzinfo)
            elif now_ts.tzinfo:
                last_update_dt = last_update_dt.tz_localize(now_ts.tzinfo)
            time_ago = naturaltime(now_ts - last_update_dt)
            sidebar.caption(f"Prices updated: {time_ago}")
            sidebar.caption(f"({absolute_text})")
        except ImportError:
            sidebar.caption(f"Prices last updated: {absolute_text}")
        except Exception as e:
            logger.warning(f"Error calculating time ago: {e}")
            sidebar.caption(f"Prices last updated: {absolute_text}")
    else:
        sidebar.caption("Prices not yet loaded.")
    sidebar.caption(f"Auto-refresh approx. every {REFRESH_INTERVAL/60:.0f} mins")
def portfolio_section():
    """Show the holdings table (first five rows plus an optional full view) and the total value.

    Holdings come from ``st.session_state.portfolio`` and prices from
    ``st.session_state.all_coin_prices``. Crypto rows whose price is missing
    or not positive get a NaN value and are listed in a caption below the table.
    """
    st.subheader("๐Ÿ“ˆ Portfolio Overview")

    holdings = st.session_state.portfolio
    if not any(entry.get('quantity', 0) > 1e-9 for entry in holdings.values()):
        st.info("Your portfolio is empty. Use the sidebar ('Add Cash' or 'Set Quantity') to add assets.", icon="โ„น๏ธ")
        # Show an explicit zero total when nothing is held.
        st.markdown(
            "<h2 style='font-size: 1.2em; margin-bottom: 0;'>Total Portfolio Value (USD)</h2>"
            "<h1 style='font-size: 2.0em; font-weight: bold; margin-top: 0;'>$0.00</h1>",
            unsafe_allow_html=True,
        )
        return

    all_prices = st.session_state.get('all_coin_prices', {})
    rows = []

    # Cash goes first when there is any.
    usd_qty = holdings.get('USD', {}).get('quantity', 0.0)
    if usd_qty > 1e-9:
        rows.append({
            'Symbol': 'USD',
            'Quantity': usd_qty,
            'Type': 'Cash',
            'Price (USD)': 1.0,
            'Value (USD)': usd_qty
        })

    # Crypto rows follow in alphabetical order.
    crypto_symbols = sorted(
        sym for sym, entry in holdings.items()
        if sym != 'USD' and entry.get('type') == 'crypto' and entry.get('quantity', 0) > 1e-9
    )
    missing_price_symbols = []
    for sym in crypto_symbols:
        quantity = holdings[sym].get('quantity', 0.0)
        price = all_prices.get(sym)
        price_usable = price is not None and price > 0
        if not price_usable:
            missing_price_symbols.append(sym)
        rows.append({
            'Symbol': sym,
            'Quantity': quantity,
            'Type': 'Crypto',
            'Price (USD)': np.nan if price is None else price,
            'Value (USD)': quantity * price if price_usable else np.nan
        })

    def _styled(frame):
        # Same text formatting for the summary and the full table.
        return frame.style.format({
            'Quantity': "{:,.8f}".format,
            'Price (USD)': lambda x: "${:,.4f}".format(x) if pd.notnull(x) and x > 0 else "N/A",
            'Value (USD)': lambda x: "${:,.2f}".format(x) if pd.notnull(x) else "N/A",
        }).hide(axis="index")

    def _table_options():
        # Built fresh per call so each st.dataframe gets its own config objects.
        return {
            'use_container_width': True,
            'column_order': ["Symbol", "Quantity", "Price (USD)", "Value (USD)", "Type"],
            'column_config': {
                "Symbol": st.column_config.TextColumn(width="small", help="Asset Symbol"),
                "Quantity": st.column_config.NumberColumn(format="%.8f", width="medium", help="Amount of the asset held"),
                "Type": st.column_config.TextColumn(width="small", help="Asset Type (Cash or Crypto)"),
                "Price (USD)": st.column_config.NumberColumn(format="$%.4f", width="medium", help="Last fetched price per unit in USD (N/A if unavailable/invalid)"),
                "Value (USD)": st.column_config.NumberColumn(format="$%.2f", width="medium", help="Total value in USD (Quantity * Price). N/A if price unavailable/invalid."),
            },
        }

    if not rows:
        # Reached only when every held asset is neither USD cash nor typed 'crypto'.
        st.write("Portfolio data is currently unavailable.")
    else:
        df = pd.DataFrame(rows)

        # --- Summary: at most five rows ---
        display_rows = min(len(df), 5)
        st.dataframe(
            _styled(df.head(display_rows)),
            height=(display_rows + 1) * 35 + 3,  # (rows + header) * approx row height + border
            **_table_options()
        )

        # --- Full table, only when the summary truncated something ---
        if len(df) > display_rows:
            styled_full_df = _styled(df)
            with st.expander("Show Full Portfolio"):
                st.dataframe(styled_full_df, **_table_options())

        if missing_price_symbols:
            st.caption(f"โš ๏ธ Note: Price data currently unavailable or invalid for: {', '.join(sorted(missing_price_symbols))}. Value calculated as N/A. Try refreshing prices.")

    # --- Total value ---
    total_value = calculate_portfolio_value()
    st.markdown("---")
    st.markdown(
        f"<h2 style='font-size: 1.2em; margin-bottom: 0;'>Total Portfolio Value (USD)</h2>"
        f"<h1 style='font-size: 2.0em; font-weight: bold; margin-top: 0;'>${total_value:,.2f}</h1>"
        f"<p style='font-size: 0.9em; color: grey;'>Sum of USD cash and the value of crypto holdings based on the latest valid prices.</p>",
        unsafe_allow_html=True,
    )
def trade_section():
    """Render the trade form: symbol, BUY/SELL choice, quantity, price info and execute button.

    The execution price is the cached value in
    ``st.session_state.all_coin_prices``; the button is disabled unless that
    price and the entered quantity are both positive.
    """
    st.subheader("๐Ÿ› ๏ธ Simulate Trade")

    # Every known symbol except USD can be traded.
    tradeable_symbols = sorted(s for s in ALL_SYMBOLS if s != 'USD')
    if not tradeable_symbols:
        st.warning("No tradeable symbols defined (excluding USD).")
        return

    left_col, right_col = st.columns([1, 2])
    with left_col:
        # Pre-select BTC when available, otherwise the first symbol.
        default_index = tradeable_symbols.index('BTC') if 'BTC' in tradeable_symbols else 0
        symbol = st.selectbox(
            "Select Symbol",
            tradeable_symbols,
            index=default_index,
            key="trade_symbol",
            help="Select the cryptocurrency you want to trade."
        )
        action = st.radio(
            "**Action**",
            ["BUY", "SELL"],
            horizontal=True,
            key="trade_action",
            label_visibility="collapsed"
        )
    with right_col:
        # Per-symbol key so the default quantity is restored when the symbol changes.
        default_qty = 0.1 if symbol == 'ETH' else 0.01
        trade_qty = st.number_input(
            f"Quantity of {symbol}",
            min_value=0.0,
            value=default_qty,
            step=0.0001,
            format="%.8f",
            key=f"trade_quantity_{symbol}",
            help=f"Enter the amount of {symbol} to trade."
        )

    # --- Price Display using cached data ---
    cached_price = st.session_state.all_coin_prices.get(symbol)
    trade_price = None  # Stays None unless a valid cached price exists
    price_display_area = st.empty()
    if coingecko_status != 'success' and not cached_price:
        # No connection and nothing cached: the trade cannot be priced.
        price_display_area.error(f"CoinGecko error ({coingecko_status}). Price unavailable. Trade disabled.", icon="๐Ÿšจ")
    elif cached_price is not None and cached_price > 0:
        trade_price = cached_price
        price_display_area.success(f"**Trade Execution Price ({symbol}): ${trade_price:,.4f}** (from last refresh)")
        estimated_value = trade_qty * trade_price
        if action == "BUY":
            value_label = "Estimated Cost"
        else:
            value_label = "Estimated Proceeds"
        st.caption(f"{value_label}: ${estimated_value:,.2f}")
    else:
        status_msg = "" if coingecko_status == 'success' else f"({coingecko_status})"
        price_display_area.warning(f"๐Ÿšจ No valid price data available for {symbol} {status_msg}. Refresh prices. Trade disabled.", icon="โš ๏ธ")

    # --- Execute Button ---
    # Requires a valid positive price and a quantity above the float epsilon.
    can_trade = trade_price is not None and trade_price > 0 and trade_qty > 1e-9
    if can_trade:
        button_help = f"Click to simulate {action.lower()}ing {trade_qty:.8f} {symbol} at ${trade_price:,.4f}"
    else:
        button_help = "Cannot execute trade (check price > 0 and quantity > 0)"
    st.button(
        f"Execute {action}",
        disabled=not can_trade,
        key="execute_trade_button",
        type="primary",
        help=button_help,
        on_click=execute_trade_callback,
        args=(action, symbol, trade_qty, trade_price)
    )
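# Callback for the "Execute" button. It only needs to exist by the time trade_section() is called (the name is looked up at call time), so defining it after trade_section is fine.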
def execute_trade_callback(action, symbol, qty, price):
    """Run the simulated trade for the execute button and toast the outcome.

    Delegates to ``simulate_trade(action, symbol, qty, price)`` and uses its
    truthiness to choose between the success and the failure toast.
    """
    succeeded = simulate_trade(action, symbol, qty, price)
    if not succeeded:
        # Failure path: only the short toast is added here.
        st.toast(f"{action} order for {symbol} failed.", icon="โŒ")
        return
    # Success path. The quantity input is intentionally left untouched so the
    # user can adjust it and trade again.
    st.toast(f"{action} order for {qty:.6f} {symbol} successful!", icon="๐ŸŽ‰")
def trade_history_section():
    """Show recorded trades, newest first: five in a summary table, all in an expander.

    Reads ``st.session_state.trade_history``. Any failure while building or
    rendering the tables is reported with ``st.error`` and logged.
    """
    st.subheader("๐Ÿ“œ Trade History")

    if not st.session_state.trade_history:
        st.caption("No trades recorded yet.")
        return

    def _action_colour(value):
        # BUY rows are green, SELL rows red, anything else unstyled.
        if value == 'BUY':
            return 'color: green'
        if value == 'SELL':
            return 'color: red'
        return ''

    def _styled(frame):
        # Shared text formatting for the summary and the full table.
        return frame.style.format({
            'Timestamp': '{:%Y-%m-%d %H:%M:%S %Z}'.format,
            'Quantity': "{:,.8f}".format,
            'Price (USD)': "${:,.4f}".format,
            'Cost (USD)': lambda x: "${:,.2f}".format(x) if pd.notnull(x) else "-",
            'Proceeds (USD)': lambda x: "${:,.2f}".format(x) if pd.notnull(x) else "-",
        }).map(_action_colour, subset=['Type']).hide(axis="index")

    def _table_options():
        # Built fresh per call so each st.dataframe gets its own config objects.
        return {
            'use_container_width': True,
            'column_order': ("Timestamp", "Type", "Symbol", "Quantity", "Price (USD)", "Cost (USD)", "Proceeds (USD)"),
            'column_config': {
                "Timestamp": st.column_config.DatetimeColumn("Time", help="Time of trade execution (UTC)"),
                "Type": st.column_config.TextColumn("Action", help="BUY (Green) or SELL (Red)"),
                "Symbol": st.column_config.TextColumn("Coin", help="Cryptocurrency traded"),
                "Quantity": st.column_config.NumberColumn("Amount", format="%.8f", help="Quantity of the coin traded"),
                "Price (USD)": st.column_config.NumberColumn("Executed Price", format="$%.4f", help="Price per coin at execution"),
                "Cost (USD)": st.column_config.NumberColumn("Cost", format="$%.2f", help="Total USD spent (for BUYs)"),
                "Proceeds (USD)": st.column_config.NumberColumn("Proceeds", format="$%.2f", help="Total USD received (for SELLs)"),
            },
        }

    try:
        # Work on a copy so the stored history is never modified.
        hist_df = pd.DataFrame(st.session_state.trade_history).copy()
        hist_df['Timestamp'] = pd.to_datetime(hist_df['Timestamp'], utc=True)
        hist_df = hist_df.sort_values('Timestamp', ascending=False)

        # Combined cost/proceeds column; it is computed but not in the displayed column order.
        hist_df['Value (USD)'] = hist_df.apply(
            lambda row: row.get('Cost (USD)') if row['Type'] == 'BUY' else row.get('Proceeds (USD)'),
            axis=1
        )

        # --- Summary: the five most recent trades ---
        display_rows = min(len(hist_df), 5)
        st.dataframe(
            _styled(hist_df.head(display_rows)),
            height=(display_rows + 1) * 35 + 3,  # (rows + header) * approx row height + border
            **_table_options()
        )

        # --- Full history, only when the summary truncated something ---
        if len(hist_df) > display_rows:
            styled_full_df = _styled(hist_df)
            with st.expander("Show Full Trade History"):
                st.dataframe(styled_full_df, **_table_options())
    except Exception as e:
        st.error(f"Error displaying trade history: {e}")
        logger.error(f"Failed to process or display trade history: {e}", exc_info=True)
# Substrings that mark a stored result as a failure message rather than a real
# recommendation. The first two are the markers this section always checked;
# the rest match the fallback texts returned by the Gemini helper earlier in
# this file (unexpected response format, empty response, API call failure).
_AI_FAILURE_MARKERS = (
    "Error:",
    "blocked by safety filters",
    "Error generating AI recommendation",
    "AI response format was unexpected",
    "contained no actionable text content",
)


def _is_ai_failure(text):
    """Return True when `text` contains any known AI failure marker."""
    return any(marker in text for marker in _AI_FAILURE_MARKERS)


def recommendation_section():
    """Render the AI recommendation panel: context box, trigger button, result and PDF export.

    The work is split across reruns: the button only raises the
    ``ai_recommendation_running`` flag and reruns, the next run performs the
    news/research/Gemini calls under a spinner and stores the outcome in
    ``st.session_state.ai_recommendation_result``, and a final rerun shows it.
    Failure messages are shown with ``st.error`` and never offered as a PDF.
    """
    st.subheader("๐Ÿ’ก AI Trade Recommendations")

    if not (gemini_model and gemini_status == "success"):
        st.warning(f"AI Recommendations disabled: {gemini_status}", icon="โš ๏ธ")
        return

    state = st.session_state

    # Symbols of the crypto assets currently held; used for the news lookups.
    portfolio_cryptos = [s for s, d in state.portfolio.items()
                         if d.get('type') == 'crypto' and d.get('quantity', 0) > 1e-9]
    ctx = st.text_area(
        "Market Context or ideas you want included (Optional):",
        height=100,
        key="ai_context",
        placeholder="e.g., Overall market sentiment (bullish/bearish), specific events (upcoming ETF decision), personal risk tolerance (conservative/aggressive)..."
    )

    # Result and in-progress flag live in session state so they survive reruns.
    if 'ai_recommendation_result' not in state:
        state.ai_recommendation_result = None
    if 'ai_recommendation_running' not in state:
        state.ai_recommendation_running = False

    # The button only flips the flag; the actual work happens on the next run.
    if st.button("๐Ÿค– Get AI Recommendation", key="ai_rec_button", disabled=state.ai_recommendation_running):
        state.ai_recommendation_running = True
        state.ai_recommendation_result = None  # Clear previous result
        st.rerun()

    if state.ai_recommendation_running:
        with st.spinner("๐Ÿง  Gathering data and consulting the AI..."):
            try:
                # 1. Yahoo news, 2. Linkup research, 3. Gemini recommendation.
                news_map = get_coin_news(portfolio_cryptos)
                linkup_research = get_linkup_news(portfolio_cryptos)
                state.ai_recommendation_result = get_gemini_recommendation(
                    ctx,
                    news_map,
                    linkup_research
                )
            except Exception as e:
                st.error(f"Failed to get recommendation: {e}")
                logger.error(f"Unhandled error during AI recommendation generation: {e}", exc_info=True)
                state.ai_recommendation_result = f"Error: {e}"  # Shown after the rerun below
            finally:
                # Always re-enable the button, whatever happened above.
                state.ai_recommendation_running = False
        st.rerun()  # Redraw to display the stored result or error

    result = state.ai_recommendation_result
    if not result:
        return

    st.markdown("---")
    st.markdown("##### AI Advisor Says:")
    if _is_ai_failure(result):
        st.error(result)
        return

    st.markdown(result)
    # --- PDF Download Button (successful recommendations only) ---
    try:
        pdf_buffer = create_pdf_report(result)
        st.download_button(
            label="โฌ‡๏ธ Download Report as PDF",
            data=pdf_buffer,
            file_name=f"AI_Crypto_Rec_{datetime.now().strftime('%Y%m%d_%H%M')}.pdf",
            mime="application/pdf",
            key="pdf_download_button"
        )
    except Exception as pdf_e:
        st.error(f"Error generating PDF: {str(pdf_e)}")
        logger.error(f"Failed to create PDF report: {pdf_e}", exc_info=True)
# --- NEW: Performance Analysis Tab Section ---
import streamlit as st
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from pycoingecko import CoinGeckoAPI # Assuming cg is globally available/initialized
import logging
from datetime import datetime, timedelta
# --- Assuming these are defined globally ---
logger = logging.getLogger(__name__)
# cg = CoinGeckoAPI() # Or retrieved from st.session_state.cg
# COINGECKO_IDS = {'BTC': 'bitcoin', 'ETH': 'ethereum', ...}
# SYMBOL_MAP = {'bitcoin': 'BTC', 'ethereum': 'ETH', ...}
# ALL_SYMBOLS = ['BTC', 'ETH', ...]
# coingecko_status = "success" # Or retrieved from st.session_state
# --- CORRECTED CACHED FUNCTION ---
@st.cache_data(ttl=600, show_spinner="Fetching historical data...")  # Cache for 10 minutes
def fetch_historical_data(_cg_ref, coin_id, days):
    """
    Fetch historical USD price and volume data for one CoinGecko coin ID.

    Args:
        _cg_ref: Unused placeholder kept for call compatibility. Streamlit does
            not hash parameters whose name starts with an underscore, so this
            value plays no part in the cache key.
        coin_id: CoinGecko coin identifier passed to the market-chart endpoint.
        days: Number of days of history to request, or the string 'max'.

    Returns:
        A DataFrame indexed by UTC ``timestamp`` with ``price`` and ``volume``
        columns (``volume`` is NaN where it is missing or could not be
        aligned), or None when no client is available, no prices are
        returned, or any error occurs.
    """
    # NOTE(review): because results are cached, a None returned after a
    # transient API failure is presumably served until the TTL expires --
    # confirm whether callers should clear the cache on failure.

    # Prefer the client kept in session state, fall back to the module-level one.
    local_cg = st.session_state.get('cg') or globals().get('cg')
    if not local_cg:
        logger.error("CoinGecko client 'cg' not found in fetch_historical_data scope.")
        return None

    try:
        logger.info(f"Fetching historical data for {coin_id} ({days} days)...")
        fetch_days_str = str(days)  # The API takes a number or the literal 'max'
        # 'max' has no numeric length; treat it as a long range for the interval choice.
        api_days_val = 365 * 10 if days == 'max' else int(days)
        # Ask for daily points on long ranges; otherwise let the API pick its
        # own granularity.
        interval = 'daily' if api_days_val > 90 else None
        # Only send `interval` when it is set: a None value may otherwise be
        # serialised literally into the request's query string.
        request_kwargs = {'interval': interval} if interval else {}
        chart_data = local_cg.get_coin_market_chart_by_id(
            id=coin_id,
            vs_currency='usd',
            days=fetch_days_str,
            **request_kwargs
        )

        prices = chart_data.get('prices', [])
        volumes = chart_data.get('total_volumes', [])
        if not prices:
            logger.warning(f"No price data returned for {coin_id} ({fetch_days_str} days).")
            return None

        df = pd.DataFrame(prices, columns=['timestamp_ms', 'price'])
        df['timestamp'] = pd.to_datetime(df['timestamp_ms'], unit='ms', utc=True)

        if not volumes:
            logger.info(f"No volume data returned for {coin_id}. Adding NaN volume column.")
            df['volume'] = np.nan
        else:
            vol_df = pd.DataFrame(volumes, columns=['timestamp_ms_vol', 'volume'])
            # The merge keys are integer milliseconds, so the tolerance must be
            # an integer in milliseconds too: 1 day for daily data, 12 hours otherwise.
            if interval == 'daily':
                tolerance_ms = 24 * 60 * 60 * 1000
            else:
                tolerance_ms = 12 * 60 * 60 * 1000

            # Merge only when both series have roughly the same length (< 10% apart).
            if abs(len(vol_df) - len(df)) < (len(df) * 0.1):
                logger.info(f"Attempting merge_asof for {coin_id} with tolerance {tolerance_ms} ms.")
                df = pd.merge_asof(
                    df.sort_values('timestamp_ms'),
                    vol_df[['timestamp_ms_vol', 'volume']].sort_values('timestamp_ms_vol'),
                    left_on='timestamp_ms',
                    right_on='timestamp_ms_vol',
                    direction='nearest',  # Closest volume point within the tolerance
                    tolerance=tolerance_ms
                )
                df = df.drop(columns=['timestamp_ms_vol'], errors='ignore')
                logger.info(f"Merge successful for {coin_id}.")
            else:
                logger.warning(f"Volume data length mismatch too large for {coin_id} "
                               f"(Prices: {len(prices)}, Volumes: {len(volumes)}). Skipping volume merge.")
                df['volume'] = np.nan

        # Final shape: timestamp index with price and volume columns.
        df = df[['timestamp', 'price', 'volume']].set_index('timestamp')
        # Keep the first row for any repeated timestamp.
        df = df[~df.index.duplicated(keep='first')]
        logger.info(f"Successfully processed {len(df)} data points for {coin_id}.")
        return df
    except Exception as e:
        logger.error(f"Error processing historical data for {coin_id}: {e}", exc_info=True)
        return None  # Signals failure to the caller
# --- Coin performance TAB FUNCTION ---
def performance_analysis_tab():
    """Render the "Coin Performance Analysis" tab.

    Lets the user pick coins and a time range, fetches historical data for
    each pick via ``fetch_historical_data``, then draws:

    * a normalized (percentage change) comparison chart,
    * a key-metrics table (period change, start/end/current price),
    * raw price and trading volume charts in two sub-tabs.

    All series are trimmed to the latest first timestamp among the loaded
    coins so every coin is compared over the same window. The function
    returns early (after showing a message) when the CoinGecko status in
    session state is not ``"success"``, when no symbols are available or
    selected, or when no historical data could be loaded.
    """
    st.subheader("📊 Coin Performance Analysis")

    # Use status from session state if available
    current_coingecko_status = st.session_state.get('coingecko_status', 'Not Initialized')
    if current_coingecko_status != "success":
        st.warning(
            f"CoinGecko API not available ({current_coingecko_status}). Performance analysis disabled.",
            icon="⚠️",
        )
        return

    # --- User Selections ---
    available_symbols = sorted(st.session_state.get('ALL_SYMBOLS', list(COINGECKO_IDS.keys())))
    if not available_symbols:
        st.error("No symbols available for analysis.")
        return

    # Pre-select BTC and ETH when they are offered.
    default_selection = [sym for sym in ('BTC', 'ETH') if sym in available_symbols]
    selected_symbols = st.multiselect(
        "Select Coins to Analyze:",
        options=available_symbols,
        default=default_selection,
        key="perf_symbols_select",
        help="Choose one or more cryptocurrencies to compare their performance."
    )

    # Label -> value handed to fetch_historical_data as the number of days.
    time_range_options = {
        "1 Day": 1, "7 Days": 7, "30 Days": 30,
        "90 Days": 90, "1 Year": 365, "Max": 'max'
    }
    selected_range_label = st.selectbox(
        "Select Time Range:",
        options=list(time_range_options.keys()),
        index=2,  # Default to 30 Days
        key="perf_range_select"
    )
    days_to_fetch = time_range_options[selected_range_label]

    if not selected_symbols:
        st.info("Select one or more coins to visualize their performance.", icon="📈")
        return

    # --- Data Fetching and Processing ---
    all_perf_data = {}
    valid_symbols = []
    failed_symbols = []  # Symbols whose fetch returned None or an empty frame

    # Identity of the client object, passed as the first argument of
    # fetch_historical_data (presumably so its cache is keyed on the client
    # instance -- confirm against that function's definition).
    cg_ref = id(st.session_state.cg) if 'cg' in st.session_state and st.session_state.cg else None

    # Coins are fetched one after another; this can be slow for many coins.
    for symbol in selected_symbols:
        coin_id = COINGECKO_IDS.get(symbol)
        if not coin_id:
            st.warning(f"Could not find CoinGecko ID for {symbol}. Skipping.")
            continue

        hist_data = fetch_historical_data(cg_ref, coin_id, days_to_fetch)
        if hist_data is not None and not hist_data.empty:
            all_perf_data[symbol] = hist_data
            valid_symbols.append(symbol)
        else:
            logger.warning(f"No valid historical data obtained for {symbol} ({coin_id}).")
            failed_symbols.append(symbol)
            # The toast is shown here rather than inside the fetch function.
            st.toast(
                f"⚠️ Failed to load data for {symbol}. It might be unavailable for the selected range.",
                icon="📉",
            )

    if not all_perf_data:
        st.error(
            "Could not load historical data for any selected coin. Please try again later or select different coins/range.",
            icon="❌",
        )
        return

    # --- Performance Calculation & Visualization ---
    st.markdown("#### Performance Comparison (Normalized)")
    st.caption("Shows percentage change relative to the start of the selected period.")

    fig_norm = go.Figure()
    fig_price = go.Figure()
    fig_vol = go.Figure()
    performance_metrics = []
    change_col = f'{selected_range_label} Change %'

    # The comparison window starts at the latest "first timestamp" among the
    # loaded series, so every coin has data from that point on.
    try:
        common_start_date = max(df.index.min() for df in all_perf_data.values() if not df.empty)
    except ValueError:
        # max() of an empty sequence: nothing usable was loaded.
        st.error("No valid data points found to determine a common start date.", icon="❌")
        return

    # Latest known prices from session state; loop-invariant, so read once.
    all_prices = st.session_state.get('all_coin_prices', {})

    for symbol in valid_symbols:
        df = all_perf_data[symbol]
        df_filtered = df[df.index >= common_start_date].copy()
        if df_filtered.empty:
            logger.warning(f"No data found for {symbol} after common start date {common_start_date}. Skipping plots for this symbol.")
            continue

        # --- Calculate Metrics & Plot ---
        start_price = df_filtered['price'].iloc[0]
        end_price = df_filtered['price'].iloc[-1]

        # Normalized price (percentage change); guarded against a zero,
        # missing or NaN starting price.
        if start_price is not None and start_price > 0:
            df_filtered['normalized'] = ((df_filtered['price'] / start_price) - 1) * 100
            period_change = df_filtered['normalized'].iloc[-1]
            fig_norm.add_trace(go.Scatter(
                x=df_filtered.index,
                y=df_filtered['normalized'],
                mode='lines',
                name=f"{symbol} % Change",
                hovertemplate=f'<b>{symbol}</b><br>%{{y:.2f}}%<br>%{{x|%Y-%m-%d %H:%M}}<extra></extra>'
            ))
        else:
            df_filtered['normalized'] = 0
            period_change = 0.0
            logger.warning(f"Start price for {symbol} is zero or invalid ({start_price}), cannot calculate percentage change accurately.")

        # Raw price trace. Plotly substitutes only `%{...}` placeholders, so
        # the dollar sign must precede `%{y...}`; a bare `${y...}` would be
        # shown literally in the hover box.
        fig_price.add_trace(go.Scatter(
            x=df_filtered.index,
            y=df_filtered['price'],
            mode='lines',
            name=f"{symbol} Price",
            yaxis="y",
            hovertemplate=f'<b>{symbol}</b><br>$%{{y:,.4f}}<br>%{{x|%Y-%m-%d %H:%M}}<extra></extra>'
        ))

        # Volume trace only when the column exists and holds at least one value.
        if 'volume' in df_filtered.columns and df_filtered['volume'].notna().any():
            fig_vol.add_trace(go.Bar(
                x=df_filtered.index,
                y=df_filtered['volume'],
                name=f"{symbol} Volume",
                opacity=0.6,  # Make bars slightly transparent
                yaxis="y",
                hovertemplate=f'<b>{symbol} Vol</b><br>%{{y:,.0f}}<br>%{{x|%Y-%m-%d %H:%M}}<extra></extra>'
            ))

        performance_metrics.append({
            'Symbol': symbol,
            change_col: period_change,
            'Start Price (USD)': start_price,
            'End Price (USD)': end_price,
            'Current Price (USD)': all_prices.get(symbol, np.nan),
        })

    # --- Display Charts ---
    if not fig_norm.data:
        st.warning("No data available for plotting normalized performance in the selected range.")
    else:
        fig_norm.update_layout(
            title=f"Normalized Performance ({selected_range_label})",
            xaxis_title="Date",
            yaxis_title="Percentage Change (%)",
            hovermode="x unified",
            legend_title_text='Coins',
            height=450,
            margin=dict(l=20, r=20, t=40, b=20)  # Tighten margins
        )
        st.plotly_chart(fig_norm, use_container_width=True)

    # --- Metrics Table ---
    if performance_metrics:
        def _change_colors(col):
            """Return one CSS rule per cell: green for gains, red for losses."""
            styles = []
            for v in col:
                is_number = isinstance(v, (int, float))
                if is_number and v > 0:
                    styles.append('color: green')
                elif is_number and v < 0:
                    styles.append('color: red')
                else:
                    styles.append('')
            return styles

        metrics_df = pd.DataFrame(performance_metrics).set_index('Symbol')
        st.markdown("#### Key Metrics")
        styled_metrics = metrics_df.style.format({
            change_col: "{:,.2f}%",
            'Start Price (USD)': "${:,.4f}",
            'End Price (USD)': "${:,.4f}",
            'Current Price (USD)': lambda x: "${:,.4f}".format(x) if pd.notnull(x) else "N/A",
        }).apply(_change_colors, subset=[change_col], axis=0)
        st.dataframe(styled_metrics, use_container_width=True)

    # --- Combined Price and Volume Chart ---
    st.markdown("---")
    chart_tabs = st.tabs(["📈 Price Chart", "📊 Volume Chart"])

    with chart_tabs[0]:
        if not fig_price.data:
            st.warning("No data available for plotting price history in the selected range.")
        else:
            fig_price.update_layout(
                title=f"Price History ({selected_range_label})",
                xaxis_title="Date",
                yaxis_title="Price (USD)",
                hovermode="x unified",
                legend_title_text='Coins',
                height=450,
                margin=dict(l=20, r=20, t=40, b=20)
            )
            st.plotly_chart(fig_price, use_container_width=True)

    with chart_tabs[1]:
        if not fig_vol.data:
            st.warning("No volume data available for plotting in the selected range.")
        else:
            fig_vol.update_layout(
                title=f"Trading Volume ({selected_range_label})",
                xaxis_title="Date",
                yaxis_title="Volume (USD)",
                hovermode="x unified",
                legend_title_text='Coins',
                height=450,
                barmode='group',  # Group bars if multiple coins have volume
                margin=dict(l=20, r=20, t=40, b=20)
            )
            st.plotly_chart(fig_vol, use_container_width=True)

    # Tell the user which selections are missing from the charts and table.
    if failed_symbols:
        st.caption(f"⚠️ Note: Data could not be loaded for: {', '.join(failed_symbols)}. They are excluded from the charts and metrics.")
# --- Main App Layout & Flow ---
def main():
    """Lay out the whole page and run one pass of the app.

    Order of operations: header and welcome text, an automatic price refresh
    when ``should_update_prices()`` says one is due, the sidebar, the two
    top-level tabs (portfolio/AI and coin performance), then the footer.
    """
    st.header("🚀 Adriel & Ashriel's Crypto App")
    st.markdown("(Powered by CoinGecko, Google Gemini & Linkup)")

    with st.expander("Welcome to our Crypto Trading Simulator🔥", expanded=False):
        # Blank lines separate the paragraphs from the lists, and sub-bullets
        # are indented, so Markdown renders headings and nested lists instead
        # of folding "**Tabs:**" into the preceding list item.
        st.info(
            """
            Welcome! Simulate crypto trading, manage your virtual portfolio, get AI-driven insights, and analyze coin performance.

            **Sidebar Controls (< icon top-left if hidden):**

            * **Manage Portfolio:** Add cash, add/update/remove crypto assets.
            * **Status & Sync:** Check API connections, refresh prices manually.

            **Tabs:**

            * **📈 Portfolio Management & AI:**
                * View your current holdings and total value.
                * Simulate BUY/SELL trades using the *most recently refreshed price*.
                * Review your trade history.
                * Generate AI-powered trade recommendations and market analysis.
            * **📊 Coin Performance Analysis:**
                * Select coins and a time range (1D, 7D, 30D, 1Y, etc.).
                * Visualize normalized performance (percentage change) to compare assets.
                * View price and volume charts for selected coins.
                * See key performance metrics for the chosen period.
            """,
            icon="🪙"
        )

    # --- Periodic Price Refresh Check ---
    # Done early in the script run, before any section reads the prices.
    if should_update_prices() and coingecko_status == "success":
        logger.info("Refresh interval reached. Updating all prices...")
        if update_prices():
            st.toast("Auto-refreshing prices...", icon="⏳")
            # Rerun so the UI is rebuilt with the freshly fetched prices.
            st.rerun()
        else:
            # Let the user know the background refresh did not succeed.
            st.toast("Auto-refresh failed. Using cached prices.", icon="⚠️")

    # --- Render Sidebar ---
    # Rendered outside the tabs because it controls global state.
    sidebar_section()

    # --- Define Tabs ---
    tab1, tab2 = st.tabs(["📈 Portfolio Management & AI", "📊 Coin Performance Analysis"])

    # --- Content for Tab 1: Portfolio Management & AI ---
    with tab1:
        col_left, col_right = st.columns(2)  # 50/50 split
        with col_left:
            portfolio_section()
        with col_right:
            trade_section()

        # Full-width areas below the two columns.
        st.divider()
        trade_history_section()
        st.divider()
        recommendation_section()

    # --- Content for Tab 2: Coin Performance Analysis ---
    with tab2:
        performance_analysis_tab()

    # --- Footer ---
    st.divider()
    st.caption("App created by Adriel & Ashriel, All rights reserved. Simulation purposes only. Market data from CoinGecko may have latency. Not financial advice.")
if __name__ == "__main__":
    # humanize is an optional extra: try to import it and merely log when it
    # is missing instead of aborting start-up.
    try:
        import humanize  # noqa: F401
    except ImportError:
        logger.info("humanize library not found, relative time display disabled.")

    # Script entry point.
    main()