Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import socket | |
| import logging | |
| import dns.resolver | |
| import streamlit as st | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| from pycoingecko import CoinGeckoAPI | |
| import google.generativeai as genai | |
| from langchain_community.tools.yahoo_finance_news import YahooFinanceNewsTool # โ Added import | |
| from fpdf import FPDF | |
| from fpdf.enums import XPos, YPos | |
| import markdown | |
| from datetime import datetime, date, timedelta | |
| from io import BytesIO | |
| from bs4 import BeautifulSoup | |
| import re | |
| from linkup import LinkupClient | |
| import plotly.graph_objects as go # <-- Added for plotting | |
| import numpy as np # <-- Added for calculations | |
| # --- Page config (must be first) --- | |
| st.set_page_config( | |
| page_title="AAN Crypto Simulation Dashboard", | |
| layout="wide" | |
| ) | |
| # --- Inject CSS to slim sidebar --- | |
| st.markdown( | |
| """ | |
| <style> | |
| /* Target the sidebar itself (when expanded) */ | |
| [data-testid="stSidebar"][aria-expanded="true"] { | |
| width: 215px !important; | |
| } | |
| /* Target the inner div for hiding when collapsed */ | |
| [data-testid="stSidebar"][aria-expanded="false"] > div:first-child { | |
| width: 215px !important; /* You might adjust or remove this width */ | |
| margin-left: -215px !important; | |
| } | |
| /* Adjust the main content container's left margin */ | |
| /* This class might change in future Streamlit versions */ | |
| .main .block-container { | |
| padding-left: 2rem; /* Adjust as needed, less than sidebar width */ | |
| padding-right: 2rem; /* Standard padding */ | |
| } | |
| /* Fallback selector if the above doesn't work */ | |
| section.main > div.block-container { | |
| padding-left: 2rem !important; | |
| margin-left: 215px !important; /* Match sidebar width */ | |
| } | |
| /* Attempt to target the specific class if needed, but prefer structural selectors */ | |
| .css-1d391kg { | |
| margin-left: 215px !important; | |
| } | |
| </style> | |
| """, | |
| unsafe_allow_html=True, | |
| ) | |
| # --- PDF Generation Class --- | |
| class PDF_Generator(FPDF): | |
| """ | |
| FPDF subclass with improved table handling and page break management | |
| """ | |
| def __init__(self, orientation='P', unit='mm', format='A4'): | |
| super().__init__(orientation, unit, format) | |
| self.set_auto_page_break(auto=True, margin=15) | |
| self.set_left_margin(15) | |
| self.set_right_margin(15) | |
| self.alias_nb_pages() | |
| self.default_font = 'helvetica' | |
| self.default_font_size = 10 | |
| self.table_row_height = 6 # Fixed row height for tables | |
| self.table_padding = 2 # Vertical padding between rows | |
| def set_default_font(self): | |
| self.set_font(self.default_font, '', self.default_font_size) | |
| def add_html_element(self, tag, styles): | |
| """ Processes a single HTML tag with improved table handling """ | |
| text = tag.get_text() | |
| tag_name = tag.name.lower() | |
| # Basic styling | |
| current_style = '' | |
| if 'b' in styles or 'strong' in styles: | |
| current_style += 'B' | |
| if 'i' in styles or 'em' in styles: | |
| current_style += 'I' | |
| if not current_style: | |
| self.set_default_font() | |
| # Handle different HTML elements | |
| if tag_name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']: | |
| level = int(tag_name[1]) | |
| font_size = {1: 18, 2: 16, 3: 14, 4: 12, 5: 11, 6: 10}.get(level, 10) | |
| self.set_font(self.default_font, 'B', font_size) | |
| self.multi_cell(0, font_size * 0.5, text, align='L') | |
| self.ln(font_size * 0.3) | |
| self.set_default_font() | |
| elif tag_name == 'p': | |
| self.set_font(self.default_font, current_style, self.default_font_size) | |
| self.multi_cell(0, 5, text, align='L') | |
| self.ln(3) | |
| elif tag_name == 'ul': | |
| self.ln(2) | |
| for item in tag.find_all('li', recursive=False): | |
| self.set_default_font() | |
| item_text = item.get_text() | |
| self.cell(5, 5, chr(127)) # Use a bullet character | |
| self.multi_cell(0, 5, item_text, align='L') | |
| self.ln(1) | |
| self.ln(3) | |
| elif tag_name == 'ol': | |
| self.ln(2) | |
| for i, item in enumerate(tag.find_all('li', recursive=False), 1): | |
| self.set_default_font() | |
| item_text = item.get_text() | |
| self.cell(8, 5, f"{i}.") | |
| self.multi_cell(0, 5, item_text, align='L') | |
| self.ln(1) | |
| self.ln(3) | |
| elif tag_name == 'table': | |
| self.ln(5) | |
| if self.h - self.y < 50: # Ensure space for table | |
| self.add_page() | |
| self.process_table(tag) | |
| self.ln(5) | |
| elif tag_name in ['b', 'strong', 'i', 'em']: | |
| # These tags modify style, handled by recursion | |
| pass | |
| elif tag_name == 'br': | |
| self.ln(5) | |
| elif tag_name == 'hr': | |
| self.ln(2) | |
| self.line(self.get_x(), self.get_y(), self.w - self.r_margin, self.get_y()) | |
| self.ln(4) | |
| else: | |
| # Handle other inline elements or just text content | |
| if text.strip(): | |
| self.set_font(self.default_font, current_style, self.default_font_size) | |
| # Use write instead of multi_cell for better inline handling if needed | |
| # For simplicity, using multi_cell here which works for paragraphs etc. | |
| self.multi_cell(0, 5, text, align='L') | |
| self.ln(1) # Minimal line break after generic text | |
| def process_table(self, table_tag): | |
| """Robust table processing with page break awareness""" | |
| rows = table_tag.find_all('tr') | |
| if not rows: | |
| return | |
| # Determine column count from first row | |
| first_row_cells = rows[0].find_all(['th', 'td']) | |
| num_cols = len(first_row_cells) | |
| if num_cols == 0: | |
| return | |
| effective_width = self.w - self.l_margin - self.r_margin | |
| col_width = effective_width / num_cols | |
| # Draw header row first if it exists | |
| header_cells = rows[0].find_all('th') | |
| if header_cells: | |
| self._draw_table_row(header_cells, col_width, is_header=True) | |
| rows = rows[1:] # Process remaining rows | |
| # Process data rows | |
| for row in rows: | |
| cells = row.find_all('td') | |
| if len(cells) == num_cols: # Ensure consistent column count | |
| self._draw_table_row(cells, col_width, is_header=False) | |
| def _draw_table_row(self, cells, col_width, is_header): | |
| """Helper to draw a single table row, handling text wrapping and page breaks.""" | |
| max_lines = 1 | |
| cell_data = [] | |
| # First pass: Calculate required height for the row | |
| temp_font = self.font_family | |
| temp_style = self.font_style | |
| temp_size = self.font_size | |
| for cell in cells: | |
| cell_text = cell.get_text().strip() | |
| font = self.default_font | |
| style = 'B' if is_header else '' | |
| size = 9 # Use smaller font for tables | |
| self.set_font(font, style, size) | |
| wrapped_lines = [] | |
| if cell_text: | |
| # Use FPDF's split_only feature correctly | |
| # Create a temporary multi_cell output to get lines | |
| wrapped_lines = self.multi_cell( | |
| w=col_width - 2, # Adjust width slightly for padding/border | |
| h=self.table_row_height, # Use base height for calculation | |
| txt=cell_text, | |
| border=0, | |
| align='L', | |
| split_only=True | |
| ) | |
| max_lines = max(max_lines, len(wrapped_lines) if wrapped_lines else 1) | |
| cell_data.append({ | |
| "text": cell_text, | |
| "lines": wrapped_lines if wrapped_lines else [""], # Ensure at least one empty line | |
| "is_header": is_header | |
| }) | |
| # Restore original font settings | |
| self.set_font(temp_font, temp_style, temp_size) | |
| # Calculate total row height needed | |
| row_height = (max_lines * self.table_row_height) + self.table_padding | |
| # Check page space (including bottom margin) | |
| space_left = self.h - self.y - self.b_margin | |
| if space_left < row_height: | |
| self.add_page() | |
| # Optional: Redraw table header on new page if needed | |
| # if not is_header and table has a header: self._draw_table_row(header_cells, col_width, True) | |
| # Second pass: Draw the row | |
| start_x = self.get_x() | |
| start_y = self.get_y() | |
| for i, data in enumerate(cell_data): | |
| x = start_x + (i * col_width) | |
| y = start_y | |
| self.set_xy(x, y) | |
| # Apply styling | |
| font = self.default_font | |
| style = 'B' if data["is_header"] else '' | |
| size = 9 | |
| self.set_font(font, style, size) | |
| if data["is_header"]: | |
| self.set_fill_color(230, 230, 230) # Light gray for header | |
| else: | |
| self.set_fill_color(255, 255, 255) # White for data cells | |
| # Use multi_cell to draw the wrapped text within the calculated height | |
| self.multi_cell( | |
| w=col_width, | |
| h=self.table_row_height, # Use fixed height per line for consistency | |
| txt="\n".join(data["lines"]), # Join the pre-calculated lines | |
| border=1, | |
| align='L', | |
| fill=True, | |
| new_x=XPos.RIGHT, # Move to the start of the next cell horizontally | |
| new_y=YPos.TOP # Keep the Y position the same for this row | |
| ) | |
| # Reset fill color after each cell (optional, good practice) | |
| self.set_fill_color(255, 255, 255) | |
| # Move cursor down below the drawn row | |
| self.set_xy(self.l_margin, start_y + row_height - self.table_padding) # Adjusted Y position | |
| self.ln(self.table_padding) # Add padding after the row | |
| def create_pdf_report(report_text): | |
| """ | |
| Creates a PDF from markdown text with proper table handling using the enhanced PDF_Generator. | |
| """ | |
| if not report_text: | |
| raise ValueError("Input report_text cannot be empty.") | |
| try: | |
| # Clean Markdown (Replace non-standard dashes) | |
| cleaned_md = re.sub(r'[โโ]', '-', report_text) | |
| # Remove potential markdown code block fences | |
| cleaned_md = re.sub(r'^```markdown\s*', '', cleaned_md, flags=re.MULTILINE) | |
| cleaned_md = re.sub(r'\s*```$', '', cleaned_md, flags=re.MULTILINE) | |
| cleaned_md = cleaned_md.strip() | |
| # Convert Markdown to HTML | |
| html_content = markdown.markdown(cleaned_md, extensions=['tables', 'fenced_code', 'sane_lists']) | |
| if not html_content or not html_content.strip(): | |
| raise ValueError("Markdown parsing resulted in empty HTML.") | |
| # Parse HTML using BeautifulSoup | |
| soup = BeautifulSoup(html_content, 'html.parser') | |
| # Generate PDF using the custom class | |
| pdf = PDF_Generator() | |
| pdf.add_page() | |
| pdf.set_default_font() | |
| # --- Process HTML elements recursively --- | |
| def process_element(tag, current_styles): | |
| if tag.name is None: # Handle NavigableString | |
| if tag.strip(): | |
| # Apply styles for inline elements | |
| style_str = ''.join(list(current_styles)) | |
| pdf.set_font(pdf.default_font, style_str, pdf.default_font_size) | |
| # Use write for potentially better inline flow control | |
| pdf.write(5, tag) | |
| pdf.set_default_font() # Reset to default after writing styled text | |
| return | |
| local_styles_added = set() | |
| if tag.name in ['b', 'strong']: | |
| current_styles.add('B') | |
| local_styles_added.add('B') | |
| elif tag.name in ['i', 'em']: | |
| current_styles.add('I') | |
| local_styles_added.add('I') | |
| # --- Block Level Elements --- | |
| if tag.name in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ul', 'ol', 'table', 'br', 'hr']: | |
| # Add some vertical space before block elements if needed | |
| if pdf.get_y() > pdf.t_margin + 5: # Avoid adding space at the top of a page | |
| pass # pdf.ln(1) # Minimal spacing | |
| pdf.add_html_element(tag, current_styles.copy()) | |
| # Block elements process their children within add_html_element | |
| else: | |
| # --- Inline or Unhandled Elements: Process Children --- | |
| if hasattr(tag, 'contents'): | |
| for child in tag.contents: | |
| process_element(child, current_styles.copy()) | |
| # Remove local styles after processing the element and its children | |
| current_styles.difference_update(local_styles_added) | |
| # Process top-level elements in the parsed HTML body | |
| body = soup.find('body') # BeautifulSoup often wraps content in <html><body> | |
| elements_to_process = body.contents if body else soup.contents | |
| for element in elements_to_process: | |
| process_element(element, set()) # Start with empty styles | |
| # --- Output PDF --- | |
| pdf_output = pdf.output(dest='S') | |
| # Ensure output is bytes | |
| if isinstance(pdf_output, str): | |
| pdf_output = pdf_output.encode('latin-1') # FPDF default encoding | |
| return BytesIO(pdf_output) | |
| except ImportError as e: | |
| # Raise a more specific error or log it | |
| logger.error(f"PDF Generation Error: Missing library {e.name}") | |
| raise Exception(f"PDF generation failed due to missing library: {e.name}") from e | |
| except Exception as e: | |
| logger.error(f"PDF Generation Error: {e}", exc_info=True) | |
| raise Exception(f"PDF generation failed: {e}") from e | |
| # --- Logging setup --- | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| def setup_dns(): | |
| """Patch socket.getaddrinfo to use custom DNS for api.coingecko.com.""" | |
| nameserver1 = os.getenv('NAMESERVER1', '8.8.8.8') | |
| nameserver2 = os.getenv('NAMESERVER2', '8.8.4.4') | |
| try: | |
| # Ensure dnspython is available | |
| import dns.resolver | |
| resolver = dns.resolver.Resolver() | |
| resolver.nameservers = [nameserver1, nameserver2] | |
| # Store original function | |
| _orig_getaddrinfo = socket.getaddrinfo | |
| def patched_getaddrinfo(*args, **kwargs): | |
| hostname = args[0] if args else None | |
| # Only patch for the specific host we care about | |
| if hostname == 'api.coingecko.com': | |
| try: | |
| # Resolve using custom DNS | |
| answers = resolver.resolve(hostname, 'A') | |
| if answers: | |
| ip = str(answers[0]) | |
| port = args[1] if len(args) > 1 else 0 | |
| # Call original function with resolved IP | |
| # Note: This simplistic patch assumes IPv4 and might need refinement | |
| # for broader use cases (e.g., handling different families) | |
| return _orig_getaddrinfo(ip, port, *args[2:], **kwargs) | |
| except dns.resolver.NXDOMAIN: | |
| logger.warning(f"Custom DNS: NXDOMAIN for {hostname}. Falling back.") | |
| except dns.resolver.NoAnswer: | |
| logger.warning(f"Custom DNS: No A records found for {hostname}. Falling back.") | |
| except dns.exception.Timeout: | |
| logger.warning(f"Custom DNS: Timeout resolving {hostname}. Falling back.") | |
| except Exception as resolve_err: | |
| logger.warning(f"Custom DNS resolution failed for {hostname}: {resolve_err}. Falling back.") | |
| # For any other hostname or if custom resolution fails, use the original function | |
| return _orig_getaddrinfo(*args, **kwargs) | |
| socket.getaddrinfo = patched_getaddrinfo | |
| logger.info(f"socket.getaddrinfo patched to use DNS servers: {resolver.nameservers}") | |
| return True | |
| except ImportError: | |
| logger.warning("dnspython not installed. Cannot patch DNS. Using system default.") | |
| return False | |
| except Exception as e: | |
| logger.error(f"DNS setup failed: {e}") | |
| return False | |
| # --- Load environment & initialize DNS --- | |
| load_dotenv() | |
| if 'dns_setup_done' not in st.session_state: | |
| st.session_state.dns_setup_done = setup_dns() | |
| if not st.session_state.dns_setup_done and os.getenv('NAMESERVER1'): | |
| st.warning("DNS patch failed or dnspython not installed; network requests may use system DNS.", icon="โ ๏ธ") | |
| # --- API clients & keys --- | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| LINKUP_API_KEY = os.getenv("LINKUP_API_KEY") | |
| # CoinGecko (no key required) - Initialize robustly | |
| if 'cg' not in st.session_state: | |
| try: | |
| cg_client = CoinGeckoAPI() | |
| ping_result = cg_client.ping() | |
| if ping_result.get('gecko_says', '').startswith('(V3)'): | |
| st.session_state.cg = cg_client | |
| st.session_state.coingecko_status = "success" | |
| logger.info("CoinGecko API client initialized and ping successful.") | |
| else: | |
| st.session_state.cg = None | |
| st.session_state.coingecko_status = f"CoinGecko Ping Error: Unexpected response {ping_result}" | |
| logger.error(f"CoinGecko API ping failed: {ping_result}") | |
| except Exception as e: | |
| st.session_state.cg = None | |
| # Check for common network errors | |
| if isinstance(e, (socket.gaierror, ConnectionRefusedError)): | |
| st.session_state.coingecko_status = f"Network Error: {e}" | |
| else: | |
| st.session_state.coingecko_status = f"Initialization Error: {e}" | |
| logger.error(f"Failed to initialize CoinGecko API client: {e}", exc_info=True) | |
| # Gemini (needs key) | |
| if 'gemini_model' not in st.session_state: | |
| if GEMINI_API_KEY: | |
| try: | |
| genai.configure(api_key=GEMINI_API_KEY) | |
| # Attempt to use the newer model first | |
| model_name = 'gemini-2.5-pro-exp-03-25' | |
| try: | |
| gem = genai.GenerativeModel(model_name) | |
| st.session_state.gemini_model = gem | |
| st.session_state.gemini_status = "success" | |
| logger.info(f"Gemini AI client initialized successfully (using {model_name}).") | |
| except Exception as e_latest: | |
| logger.warning(f"Failed to initialize {model_name}: {e_latest}. Trying gemini-2.0-flash-thinking-exp") | |
| try: | |
| model_name = 'gemini-2.0-flash-thinking-exp' # Fallback model | |
| gem = genai.GenerativeModel(model_name) | |
| st.session_state.gemini_model = gem | |
| st.session_state.gemini_status = "success" | |
| logger.info(f"Gemini AI client initialized successfully (using {model_name}).") | |
| except Exception as e_pro: | |
| st.session_state.gemini_model = None | |
| st.session_state.gemini_status = f"Gemini Init Error: {e_pro}" | |
| logger.error(f"Failed to initialize Gemini AI client: {e_pro}") | |
| except Exception as e_config: | |
| st.session_state.gemini_model = None | |
| st.session_state.gemini_status = f"Gemini Config Error: {e_config}" | |
| logger.error(f"Failed to configure Gemini AI: {e_config}") | |
| else: | |
| st.session_state.gemini_model = None | |
| st.session_state.gemini_status = "No Gemini API key; AI disabled." | |
| logger.warning("Gemini API key not found.") | |
| # Get clients from session state | |
| cg = st.session_state.get('cg') | |
| coingecko_status = st.session_state.get('coingecko_status', 'Not Initialized') | |
| gemini_model = st.session_state.get('gemini_model') | |
| gemini_status = st.session_state.get('gemini_status', 'Not Initialized') | |
| # --- Instantiate Yahoo Finance News Tool --- | |
| if 'yahoo_news_tool' not in st.session_state: | |
| st.session_state.yahoo_news_tool = YahooFinanceNewsTool() | |
| yahoo_news_tool = st.session_state.yahoo_news_tool | |
| # --- Linkup Client --- | |
| lclient = None | |
| if LINKUP_API_KEY: | |
| try: | |
| lclient = LinkupClient(api_key=LINKUP_API_KEY) | |
| logger.info("Linkup Client initialized.") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize Linkup Client: {e}") | |
| st.session_state.linkup_status = f"Linkup Init Error: {e}" | |
| else: | |
| st.session_state.linkup_status = "No Linkup API key; Research disabled." | |
| logger.warning("Linkup API key not found.") | |
| # --- Constants & symbol maps --- | |
| REFRESH_INTERVAL = 300 # secs (5 minutes) | |
| COINGECKO_IDS = { | |
| 'BTC': 'bitcoin', 'ETH': 'ethereum', 'SOL': 'solana', 'DOGE': 'dogecoin', | |
| 'USDT': 'tether', 'USDC': 'usd-coin', 'XRP': 'ripple', 'ADA': 'cardano', | |
| 'AVAX': 'avalanche-2', 'DOT': 'polkadot', 'LINK': 'chainlink', 'MATIC':'matic-network', | |
| 'LTC': 'litecoin', 'BCH': 'bitcoin-cash', 'BNB': 'binancecoin', 'XLM': 'stellar', | |
| 'SHIB': 'shiba-inu', 'TRX': 'tron', 'DAI': 'dai', 'WBTC': 'wrapped-bitcoin', | |
| 'ETC': 'ethereum-classic', 'ICP': 'internet-computer', 'LEO': 'leo-token', | |
| 'UNI': 'uniswap', 'ATOM': 'cosmos', 'OKB': 'okb', 'TON': 'the-open-network' # Added more coins | |
| } | |
| SYMBOL_MAP = {v: k for k, v in COINGECKO_IDS.items()} | |
| ALL_SYMBOLS = sorted(list(COINGECKO_IDS.keys())) # Ensure it's sorted | |
| # --- Session state defaults --- | |
| if 'portfolio' not in st.session_state: | |
| st.session_state.portfolio = {'USD': {'quantity': 1000.0, 'type': 'cash'}} | |
| if 'all_coin_prices' not in st.session_state: | |
| st.session_state.all_coin_prices = {} | |
| if 'trade_history' not in st.session_state: | |
| st.session_state.trade_history = [] | |
| if 'last_refresh_time' not in st.session_state: | |
| st.session_state.last_refresh_time = 0 | |
| if 'performance_data_cache' not in st.session_state: | |
| st.session_state.performance_data_cache = {} # Cache for historical data | |
| # --- Data utilities --- | |
| def fetch_all_coin_prices(): | |
| if not cg: | |
| logger.warning("CoinGecko client not available, cannot fetch all prices.") | |
| st.session_state.all_coin_prices = st.session_state.get('all_coin_prices', {}) | |
| return False | |
| ids_to_fetch = list(COINGECKO_IDS.values()) | |
| if not ids_to_fetch: | |
| logger.warning("No CoinGecko IDs defined to fetch prices.") | |
| st.session_state.all_coin_prices = {} | |
| return False | |
| try: | |
| logger.info(f"Attempting to fetch prices for {len(ids_to_fetch)} coins...") | |
| # Fetch price and 24h change | |
| markets_data = cg.get_coins_markets(vs_currency='usd', ids=ids_to_fetch, price_change_percentage='24h') | |
| fetched_prices = {} | |
| missing_symbols = [] | |
| if not markets_data: | |
| logger.warning("Received empty market data from CoinGecko.") | |
| st.session_state.all_coin_prices = st.session_state.get('all_coin_prices', {}) # Keep old prices if fetch fails | |
| return False | |
| for coin_data in markets_data: | |
| cg_id = coin_data.get('id') | |
| symbol = SYMBOL_MAP.get(cg_id) | |
| price = coin_data.get('current_price') | |
| if symbol and price is not None: | |
| try: | |
| fetched_prices[symbol] = float(price) | |
| except (ValueError, TypeError): | |
| logger.warning(f"Could not convert price for {symbol} ({cg_id}) to float: {price}") | |
| missing_symbols.append(symbol) | |
| elif symbol: | |
| missing_symbols.append(symbol) | |
| logger.warning(f"Price data missing or null for {symbol} ({cg_id}) in API response.") | |
| # Optionally store 24h change if needed elsewhere | |
| # price_change_24h = coin_data.get('price_change_percentage_24h') | |
| # if symbol and price_change_24h is not None: | |
| # st.session_state.all_coin_prices_24h_change[symbol] = price_change_24h | |
| st.session_state.all_coin_prices = fetched_prices | |
| st.session_state.last_refresh_time = time.time() | |
| logger.info(f"Successfully fetched prices for {len(fetched_prices)} coins.") | |
| if missing_symbols: | |
| logger.warning(f"Could not fetch valid prices for: {', '.join(sorted(set(missing_symbols)))}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error fetching all coin prices: {e}", exc_info=True) | |
| # Preserve existing prices if fetch fails to avoid breaking the UI | |
| st.session_state.all_coin_prices = st.session_state.get('all_coin_prices', {}) | |
| st.toast(f"โ ๏ธ Price fetch failed: {e}. Using cached prices.", icon="๐จ") | |
| # Update status to reflect the error for sidebar display | |
| st.session_state.coingecko_status = f"Fetch Error: {e}" | |
| return False | |
| def update_prices(): | |
| logger.info("Triggering update of all coin prices.") | |
| # Reset status before attempting fetch | |
| if cg: st.session_state.coingecko_status = "success" # Assume success initially | |
| return fetch_all_coin_prices() # Return status of fetch | |
| def calculate_portfolio_value(): | |
| total = st.session_state.portfolio.get('USD', {}).get('quantity', 0.0) | |
| all_prices = st.session_state.get('all_coin_prices', {}) | |
| for sym, data in st.session_state.portfolio.items(): | |
| if data.get('type') == 'crypto': | |
| price = all_prices.get(sym) | |
| if price is not None and price > 0: | |
| total += data.get('quantity', 0) * price | |
| elif price is None: | |
| logger.debug(f"Price for {sym} is None during portfolio calculation.") | |
| return total | |
| def simulate_trade(trade_type, symbol, quantity, price): | |
| symbol = symbol.upper() | |
| if price is None or price <= 0: | |
| st.error(f"Invalid or unavailable price (${price}) for {symbol}. Cannot simulate trade.") | |
| logger.error(f"Simulate trade failed for {symbol}: Invalid price provided ({price})") | |
| return False | |
| cost = quantity * price | |
| p = st.session_state.portfolio | |
| current_time = pd.Timestamp.now(tz='UTC') | |
| if trade_type == 'BUY': | |
| usd_balance = p.get('USD', {}).get('quantity', 0.0) | |
| if usd_balance < cost: | |
| st.error(f"Insufficient USD balance. Need ${cost:.2f}, have ${usd_balance:.2f}") | |
| return False | |
| if 'USD' not in p: p['USD'] = {'quantity': 0.0, 'type': 'cash'} | |
| p['USD']['quantity'] -= cost | |
| if symbol not in p: | |
| p[symbol] = {'quantity': 0.0, 'type': 'crypto'} | |
| elif p[symbol].get('type') != 'crypto': | |
| p[symbol]['type'] = 'crypto' # Ensure type is correct | |
| if 'quantity' not in p[symbol]: p[symbol]['quantity'] = 0.0 | |
| p[symbol]['quantity'] += quantity | |
| st.session_state.trade_history.append({ | |
| 'Timestamp': current_time, 'Type': 'BUY', 'Symbol': symbol, | |
| 'Quantity': quantity, 'Price (USD)': price, 'Cost (USD)': cost | |
| }) | |
| st.success(f"BUY {quantity:.8f} {symbol} @ ${price:.4f} (Cost: ${cost:.2f})") | |
| logger.info(f"Simulated BUY: {quantity} {symbol} @ {price}") | |
| return True | |
| elif trade_type == 'SELL': | |
| # Check if the crypto exists and has quantity key | |
| if symbol not in p or 'quantity' not in p[symbol]: | |
| st.error(f"You do not own any {symbol} to sell.") | |
| return False | |
| crypto_balance = p[symbol].get('quantity', 0.0) | |
| # Use a small epsilon for float comparison | |
| if crypto_balance < quantity - 1e-9: # Check if balance is less than needed quantity | |
| st.error(f"Not enough {symbol} to sell. Have {crypto_balance:.8f}, need {quantity:.8f}") | |
| return False | |
| p[symbol]['quantity'] -= quantity | |
| # Remove symbol from portfolio if quantity becomes effectively zero | |
| if p[symbol]['quantity'] < 1e-9: | |
| del p[symbol] | |
| if 'USD' not in p: # Ensure USD entry exists | |
| p['USD'] = {'quantity': 0.0, 'type': 'cash'} | |
| p['USD']['quantity'] += cost # Add proceeds | |
| st.session_state.trade_history.append({ | |
| 'Timestamp': current_time, 'Type': 'SELL', 'Symbol': symbol, | |
| 'Quantity': quantity, 'Price (USD)': price, 'Proceeds (USD)': cost # Use Proceeds for SELL | |
| }) | |
| st.success(f"SELL {quantity:.8f} {symbol} @ ${price:.4f} (Proceeds: ${cost:.2f})") | |
| logger.info(f"Simulated SELL: {quantity} {symbol} @ {price}") | |
| return True | |
| else: | |
| st.error(f"Unknown trade type: {trade_type}") | |
| logger.error(f"Unknown trade type encountered: {trade_type}") | |
| return False | |
| def should_update_prices(): | |
| last_refresh = st.session_state.get('last_refresh_time', 0) | |
| return (time.time() - last_refresh) >= REFRESH_INTERVAL | |
| def get_coin_news(symbols, max_updates=3): | |
| """ | |
| Fetch the latest financial news for each symbol using YahooFinanceNewsTool. | |
| Returns a dict mapping symbol -> list of news items (strings). | |
| """ | |
| news = {} | |
| # Ensure symbols is iterable and contains strings | |
| valid_symbols = [str(s).upper() for s in (symbols or []) if isinstance(s, str)] | |
| for sym_up in valid_symbols: | |
| try: | |
| # Use the LangChain tool to fetch news | |
| # Append '-USD' which sometimes helps Yahoo Finance find crypto pairs | |
| search_term = f"{sym_up}-USD" | |
| result = yahoo_news_tool.invoke(search_term) | |
| if result and "Item Not Found" not in result and "cannot find" not in result.lower(): | |
| # Split on newlines and remove empty strings | |
| items = [line.strip() for line in result.split('\n') if line.strip()] | |
| # Limit the number of items | |
| news[sym_up] = items[:max_updates] if items else [f"No specific news items found for {sym_up} via Yahoo Finance."] | |
| else: | |
| # Try without '-USD' as a fallback | |
| result_fallback = yahoo_news_tool.invoke(sym_up) | |
| if result_fallback and "Item Not Found" not in result_fallback and "cannot find" not in result_fallback.lower(): | |
| items = [line.strip() for line in result_fallback.split('\n') if line.strip()] | |
| news[sym_up] = items[:max_updates] if items else [f"No specific news items found for {sym_up} via Yahoo Finance."] | |
| else: | |
| news[sym_up] = [f"No news found for {sym_up} via Yahoo Finance."] | |
| except Exception as e: | |
| logger.error(f"News fetch error for {sym_up}: {e}", exc_info=True) | |
| news[sym_up] = [f"Error fetching news for {sym_up}."] # Avoid showing detailed error to user | |
| return news | |
| def get_linkup_news(symbols): | |
| """Fetches news/research using LinkupClient.""" | |
| if not lclient: | |
| return "Linkup client not available (check API key)." | |
| if not symbols: | |
| return "No symbols provided for Linkup research." | |
| symbol_string = ", ".join(symbols) # Create string like "BTC, ETH, SOL" | |
| query = f"Provide the latest crypto news, expert opinions, rumors, and market trends and also tell more about these cryptocurrencies: {symbol_string}." | |
| try: | |
| response = lclient.search( | |
| query=query, | |
| depth="standard", # or 'deep' if needed | |
| output_type="searchResults", # Get structured results if possible, or 'answer' for summary | |
| include_images=False, | |
| ) | |
| return str(response) | |
| except Exception as e: | |
| logger.error(f"Linkup API search failed: {e}", exc_info=True) | |
| return f"Error getting Linkup research: {e}" | |
| def get_gemini_recommendation(market_context, news_map, linkup_research): | |
| """ | |
| Build prompt with strategies, news, portfolio, prices, then call Gemini. | |
| Uses the detailed prompt structure and all_coin_prices. | |
| """ | |
| if not gemini_model: | |
| return "Gemini AI model not available." | |
| # --- Strategies --- | |
| # (Keep the strategies dictionary as defined before) | |
| strategies = { | |
| "Momentum Trading": "Ride strong directional moves by buying assets in an uptrend and selling when momentum fades.", | |
| "Scalping": "Capture small profits via quick inโandโout trades over very short timeframes.", | |
| "Range Trading": "Buy at established support levels and sell at resistance within defined price ranges.", | |
| "Breakout Trading": "Enter trades when price decisively breaks key support or resistance levels for strong followโthrough moves.", | |
| "Basket Trading": "Trade a diversified set of assets simultaneously to reduce idiosyncratic risk.", | |
| "Arbitrage": "Exploit price discrepancies across different markets or exchanges to earn lowโrisk profits.", | |
| "Mean Reversion": "Bet on price reversals by buying when an asset deviates below its historical average and selling when it rises above.", | |
| "Pullback Trading": "Enter on temporary retracements toward trendโdefining support or moving averages within an established trend.", | |
| "Chart Pattern Trading": "Use classical price formationsโlike headโandโshoulders, triangles, and double tops/bottomsโto anticipate trend reversals or continuations.", | |
| "Fibonacci Retracement": "Use horizontal lines at Fibonacci ratios (23.6%,ย 38.2%,ย 61.8%) to identify potential support and resistance levels.", | |
| "Moving Average Crossover": "Execute trades when a shortโterm moving average crosses above or below a longerโterm moving average.", | |
| "News Trading": "Time entries and exits around significant news releases to capture rapid market reactions.", | |
| "Sentiment Analysis": "Leverage socialโmedia and onโchain sentiment indicators to gauge market mood and timing.", | |
| "Statistical Arbitrage": "Automate trades based on quantitative models that exploit temporary price inefficiencies between correlated instruments.", | |
| "Grid Trading": "Place a series of staggered buy and sell orders at predefined price intervals to profit from market oscillations.", | |
| } | |
| strategies_str = "\n".join([f"- **{k}**: {v}" for k, v in strategies.items()]) | |
| # --- News Formatting --- | |
| news_sections = [] | |
| if news_map: | |
| for sym, items in news_map.items(): | |
| if items: # Only include if there are news items | |
| # Format each item as a bullet point | |
| item_bullets = "\n - ".join(items) | |
| news_sections.append(f"**{sym}**:\n - {item_bullets}") | |
| news_str = "\n\n".join(news_sections) if news_sections else "No recent news updates found for portfolio assets via Yahoo Finance." | |
| # --- Portfolio and Price Formatting --- | |
| port_val = calculate_portfolio_value() | |
| all_prices = st.session_state.get('all_coin_prices', {}) | |
| # Include all known symbols' prices, not just portfolio ones | |
| all_known_symbols = ALL_SYMBOLS | |
| price_lines = [] | |
| for s in sorted([sym for sym in all_known_symbols if sym != 'USD']): # Sort symbols | |
| price = all_prices.get(s) | |
| price_str = f"${price:,.4f}" if price is not None else "N/A" | |
| price_lines.append(f"- {s}/USD: {price_str}") | |
| price_str = "\n".join(price_lines) or "No price data available." | |
| # Portfolio holdings formatting | |
| port_lines = [] | |
| portfolio_copy = dict(st.session_state.portfolio) | |
| # Handle USD first | |
| if 'USD' in portfolio_copy: | |
| usd_data = portfolio_copy.pop('USD') | |
| usd_qty = usd_data.get('quantity', 0.0) | |
| usd_pct = (usd_qty / port_val * 100) if port_val > 0 else 0 | |
| port_lines.append(f"- USD (Cash): ${usd_qty:,.2f} ({usd_pct:.1f}%)") | |
| # Handle crypto assets, sorted | |
| for s in sorted(portfolio_copy.keys()): | |
| data = portfolio_copy[s] | |
| qty = data.get('quantity', 0.0) | |
| pr = all_prices.get(s) # Use cached price | |
| if pr is not None and qty > 1e-9: # Only show assets with quantity > 0 | |
| val = qty * pr | |
| pct = (val / port_val * 100) if port_val > 0 else 0 | |
| port_lines.append(f"- {s}: {qty:.6f} @ ${pr:,.4f} = ${val:,.2f} ({pct:.1f}%)") | |
| elif qty > 1e-9: # Show assets even if price is missing | |
| port_lines.append(f"- {s}: {qty:.6f} @ N/A = Value N/A") | |
| portfolio_str = "\n".join(port_lines) if port_lines else "No assets held." | |
| if not portfolio_str and 'USD' not in st.session_state.portfolio: | |
| portfolio_str = "Portfolio is empty." | |
| # --- Build the Prompt --- | |
| prompt = f""" | |
| You are a helpful and engaging crypto trading analyst. Your goal is to provide a structured report with actionable insights. | |
| **Date:** {datetime.now().strftime('%Y-%m-%d')} | |
| **Available Trading Strategies:** | |
| {strategies_str} | |
| **Analysis Request:** | |
| 1. **Strategy Selection:** Choose one or two strategies from the list above that seem most relevant based on the current data. You can also suggest variations or other standard strategies not listed. | |
| 2. **Justification:** Clearly explain *why* you chose these strategies, linking them to the provided news, research, market context, or current portfolio composition. Use clear, accessible language. | |
| 3. **Actionable Recommendations:** Provide **specific, simulated trade ideas** (BUY or SELL). Include: | |
| * Coin Symbol (e.g., BTC, ETH). | |
| * Suggested Trade Size (e.g., '$100 worth', '0.5 ETH', or '% of available cash/holding'). | |
| * Clear reasoning for each trade, tied back to your chosen strategy and the data. | |
| 4. **Summaries:** Briefly summarize the key takeaways from the news and research provided. | |
| **Current Data:** | |
| **1. Recent News (Yahoo Finance):** | |
| {news_str} | |
| **2. Market Research & Trends (Linkup):** | |
| {linkup_research} | |
| **3. User-Provided Context:** | |
| {market_context or 'None provided.'} | |
| **4. Simulated Portfolio:** | |
| {portfolio_str} | |
| Total Portfolio Value: ${port_val:,.2f} | |
| **5. Current Market Prices (USD):** | |
| {price_str} | |
| **Your Report:** | |
| Format your response using Markdown. Use headings, bullet points, and potentially tables for clarity. Ensure all text uses standard ASCII characters suitable for 'helvetica' font (replace special dashes like 'โ' or 'โ' with '-'). | |
| **Report Sections:** | |
| * **News & Research Summary:** | |
| * **Chosen Strategies & Justifications:** | |
| * **Simulated Trade Recommendations:** | |
| * Trade 1: [BUY/SELL] [Symbol] - [Size] - [Reasoning] | |
| * Trade 2: [BUY/SELL] [Symbol] - [Size] - [Reasoning] | |
| * (Add more trades if applicable) | |
| * **Overall Outlook/Next Steps:** (Optional brief concluding thought) | |
| """ | |
| logger.debug(f"--- Gemini Prompt Start ---\n{prompt[:500]}...\n--- Gemini Prompt End ---") # Log truncated prompt | |
| try: | |
| # Configure safety settings if needed (optional) | |
| safety_settings = [ | |
| {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
| {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
| {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
| {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
| ] | |
| # Set generation config (e.g., temperature for creativity) | |
| generation_config = genai.types.GenerationConfig( | |
| temperature=0.7, # Adjust for desired creativity/determinism | |
| max_output_tokens=4096 # Set a reasonable limit | |
| ) | |
| response = gemini_model.generate_content( | |
| prompt, | |
| generation_config=generation_config, | |
| safety_settings=safety_settings | |
| ) | |
| # --- Process Response --- | |
| # Check for blocked content first | |
| if not response.candidates: | |
| block_reason = getattr(response, 'prompt_feedback', {}).get('block_reason', 'Unknown') | |
| block_details = getattr(response, 'prompt_feedback', {}).get('block_reason_message', 'No details provided.') | |
| logger.warning(f"Gemini response blocked. Reason: {block_reason}, Details: {block_details}") | |
| return f"โ ๏ธ AI recommendation blocked by safety filters. Reason: {block_reason}. Please adjust context or retry." | |
| # Access text content correctly based on potential structures | |
| if hasattr(response, 'text'): | |
| recommendation = response.text | |
| elif response.candidates and hasattr(response.candidates[0], 'content') and hasattr(response.candidates[0].content, 'parts'): | |
| recommendation = "".join(part.text for part in response.candidates[0].content.parts if hasattr(part, 'text')) | |
| else: | |
| # Fallback or if structure is unexpected | |
| logger.warning(f"Unexpected Gemini response format: {response}") | |
| return "AI response format was unexpected. Could not extract text." | |
| if recommendation: | |
| # Basic cleaning of the recommendation text | |
| recommendation = recommendation.replace('โ', '-').replace('โ', '-') # Replace dashes | |
| logger.info("Successfully generated Gemini recommendation.") | |
| return recommendation.strip() | |
| else: | |
| logger.warning(f"Gemini response parts did not contain text: {response.candidates}") | |
| return "AI response received but contained no actionable text content." | |
| except Exception as e: | |
| logger.error(f"Gemini API call failed: {e}", exc_info=True) | |
| # Provide a more user-friendly error message | |
| return f"โ Error generating AI recommendation. Details: {str(e)}. Please check API key, quota, or try again later." | |
| # --- Initial Data Fetch --- | |
| needs_initial_fetch = (not st.session_state.all_coin_prices or st.session_state.last_refresh_time == 0) | |
| if needs_initial_fetch and coingecko_status == "success": | |
| logger.info("Performing initial fetch of all coin prices...") | |
| with st.spinner("Loading initial market prices..."): | |
| # update_prices already handles logging and potential errors | |
| if not update_prices(): | |
| st.error("Initial price fetch failed. Some features might be limited. Please try refreshing manually.", icon="๐จ") | |
| elif needs_initial_fetch and coingecko_status != "success": | |
| st.warning(f"CoinGecko API not available ({coingecko_status}). Prices cannot be fetched.", icon="โ ๏ธ") | |
| # --- UI Sections --- | |
| def sidebar_section(): | |
| st.sidebar.title("โ๏ธ Controls") | |
| st.sidebar.divider() | |
| st.sidebar.subheader("Manage Portfolio") | |
| # --- Add Cash --- | |
| # Use session state key for persistence across reruns | |
| if 'cash_input_value' not in st.session_state: | |
| st.session_state.cash_input_value = 0.0 | |
| add_cash = st.sidebar.number_input( | |
| "Add Cash (USD)", | |
| min_value=0.0, | |
| # Use the key to control the value directly | |
| key="add_cash_input_field", # Unique key for the input widget itself | |
| value=st.session_state.cash_input_value, # Display the stored value | |
| step=100.0, | |
| format="%.2f", | |
| help="Enter the amount of USD cash to add to your simulation balance." | |
| ) | |
| # Update the stored value whenever the input changes | |
| st.session_state.cash_input_value = add_cash | |
| if st.sidebar.button("โ Add Cash", key="add_cash_button"): | |
| if st.session_state.cash_input_value > 0: | |
| # Ensure USD entry exists | |
| if 'USD' not in st.session_state.portfolio: | |
| st.session_state.portfolio['USD'] = {'quantity': 0.0, 'type': 'cash'} | |
| st.session_state.portfolio['USD']['quantity'] += st.session_state.cash_input_value | |
| st.sidebar.success(f"Added ${st.session_state.cash_input_value:,.2f}") | |
| logger.info(f"Added ${st.session_state.cash_input_value:.2f} cash.") | |
| # Reset the input field's stored value after adding | |
| st.session_state.cash_input_value = 0.0 | |
| st.rerun() # Update portfolio display immediately | |
| else: | |
| st.sidebar.warning("Please enter a positive amount to add.") | |
| # --- Add/Update Crypto --- | |
| st.sidebar.markdown("---") | |
| # Use selectbox with sorted symbols | |
| # Provide a sensible default, e.g., 'BTC' or the first available symbol | |
| default_symbol = 'BTC' if 'BTC' in ALL_SYMBOLS else (ALL_SYMBOLS[0] if ALL_SYMBOLS else None) | |
| # Find index robustly | |
| symbol_index = 0 | |
| if default_symbol and ALL_SYMBOLS: | |
| try: | |
| symbol_index = ALL_SYMBOLS.index(default_symbol) # Use ALL_SYMBOLS directly as it's sorted | |
| except ValueError: | |
| symbol_index = 0 # Fallback | |
| add_symbol = st.sidebar.selectbox( | |
| "Coin Symbol", | |
| options=ALL_SYMBOLS, # Use the globally sorted list | |
| index=symbol_index, | |
| key="add_symbol_select", | |
| help="Select the cryptocurrency symbol to add or update." | |
| ) | |
| # Get current quantity for the selected symbol to pre-fill the input | |
| current_qty = st.session_state.portfolio.get(add_symbol, {}).get('quantity', 0.0) | |
| add_qty = st.sidebar.number_input( | |
| f"Set Quantity for {add_symbol}", # Dynamic label | |
| min_value=0.0, | |
| value=current_qty, # Show current quantity | |
| step=0.01, | |
| format="%.8f", | |
| key=f"add_qty_input_{add_symbol}", # Key depends on symbol to reset correctly on symbol change | |
| help=f"Enter the total quantity of {add_symbol} you want in your portfolio. This will overwrite the current amount." | |
| ) | |
| if st.sidebar.button("๐พ Set Quantity", key="add_update_button"): | |
| if add_symbol and add_symbol != 'USD': # Ensure a valid crypto symbol is selected | |
| # Update or add the symbol | |
| if add_qty > 1e-9: # Only add/update if quantity is positive | |
| st.session_state.portfolio[add_symbol] = { | |
| 'quantity': add_qty, | |
| 'type': 'crypto' | |
| } | |
| st.sidebar.success(f"{add_symbol} quantity set to {add_qty:.8f}") | |
| logger.info(f"Set portfolio quantity for {add_symbol} to {add_qty}") | |
| elif add_symbol in st.session_state.portfolio: # If quantity is zero, remove it | |
| del st.session_state.portfolio[add_symbol] | |
| st.sidebar.success(f"{add_symbol} removed from portfolio (quantity set to 0).") | |
| logger.info(f"Removed {add_symbol} from portfolio (quantity set to 0).") | |
| else: | |
| # Quantity is 0 and asset wasn't in portfolio anyway | |
| st.sidebar.info(f"{add_symbol} not added (quantity is 0).") | |
| st.rerun() # Rerun to update the portfolio display | |
| elif add_symbol == 'USD': | |
| st.sidebar.warning("Cannot set quantity for USD directly. Use 'Add Cash'.") | |
| else: | |
| st.sidebar.warning("Please select a valid cryptocurrency symbol.") | |
| # --- Remove Crypto --- | |
| # Filter only crypto assets currently in the portfolio with quantity > 0 | |
| removeables = sorted([s for s, d in st.session_state.portfolio.items() | |
| if s != 'USD' and d.get('type') == 'crypto' and d.get('quantity', 0) > 1e-9]) | |
| if removeables: | |
| st.sidebar.markdown("---") | |
| rm_symbol = st.sidebar.selectbox( | |
| "Remove Asset", | |
| options=removeables, | |
| index=0, # Default to the first asset in the list | |
| key="remove_symbol_select", | |
| help="Select a crypto asset to completely remove from your portfolio." | |
| ) | |
| if st.sidebar.button(f"โ Remove {rm_symbol}", type="secondary", key="remove_button"): | |
| if rm_symbol in st.session_state.portfolio: | |
| removed_info = st.session_state.portfolio.pop(rm_symbol) | |
| st.sidebar.success(f"Removed {rm_symbol} (was {removed_info.get('quantity', 0):.8f})") | |
| logger.info(f"Removed {rm_symbol} from portfolio.") | |
| st.rerun() # Update portfolio display | |
| else: | |
| # This case shouldn't happen if options are generated correctly | |
| st.sidebar.warning(f"{rm_symbol} not found in portfolio (already removed?).") | |
| # --- Status & Refresh --- | |
| st.sidebar.header("๐ Status & Sync") | |
| # Display CoinGecko status | |
| cg_status = st.session_state.get('coingecko_status', 'Not Initialized') | |
| if cg_status == "success": | |
| st.sidebar.success("CoinGecko: Connected") | |
| elif "Error" in cg_status: | |
| st.sidebar.error(f"CoinGecko: {cg_status}") | |
| else: # e.g., Not Initialized | |
| st.sidebar.warning(f"CoinGecko: {cg_status}") | |
| # Display Gemini status | |
| gem_status = st.session_state.get('gemini_status', 'Not Initialized') | |
| if gemini_model and gem_status == "success": | |
| st.sidebar.success("Gemini AI: Ready ๐ฅ") | |
| elif "Error" in gem_status or "No Gemini API key" in gem_status: | |
| st.sidebar.error(f"Gemini AI: {gem_status}") | |
| else: | |
| st.sidebar.warning(f"Gemini AI: {gem_status}") | |
| # Display Linkup status (if applicable) | |
| linkup_status = st.session_state.get('linkup_status') | |
| if lclient: | |
| st.sidebar.success("Linkup: Ready ๐") | |
| elif linkup_status: | |
| if "Error" in linkup_status or "No Linkup API key" in linkup_status: | |
| st.sidebar.error(f"Linkup: {linkup_status}") | |
| else: | |
| st.sidebar.warning(f"Linkup: {linkup_status}") | |
| if st.sidebar.button("๐ Refresh Prices Now", key="refresh_button", help="Manually fetch the latest prices for all tracked coins."): | |
| if cg: # Only allow manual refresh if coingecko client exists | |
| with st.spinner("Refreshing all prices..."): | |
| if update_prices(): # Calls fetch_all_coin_prices() which updates status | |
| st.toast("Prices refreshed!", icon="โ ") | |
| else: | |
| # update_prices() or fetch_all_coin_prices() already shows toast/error and updates status | |
| st.sidebar.error("Price refresh failed. Check status above.") | |
| st.rerun() # Rerun to update displays with new prices | |
| else: | |
| st.sidebar.error("CoinGecko connection failed. Cannot refresh.") | |
| # Display last update time | |
| last_refresh = st.session_state.get('last_refresh_time', 0) | |
| if last_refresh > 0: | |
| last_update_dt = pd.Timestamp.fromtimestamp(last_refresh) | |
| # Show relative time if humanize is available | |
| try: | |
| from humanize import naturaltime | |
| # Ensure comparison is timezone-aware or naive vs naive | |
| now_ts = pd.Timestamp.now() | |
| if last_update_dt.tzinfo: | |
| now_ts = now_ts.tz_localize(last_update_dt.tzinfo) # Make 'now' aware if needed | |
| elif now_ts.tzinfo: | |
| last_update_dt = last_update_dt.tz_localize(now_ts.tzinfo) # Make 'last_update' aware | |
| time_ago = naturaltime(now_ts - last_update_dt) | |
| st.sidebar.caption(f"Prices updated: {time_ago}") | |
| st.sidebar.caption(f"({last_update_dt.strftime('%Y-%m-%d %H:%M:%S')})") | |
| except ImportError: | |
| st.sidebar.caption(f"Prices last updated: {last_update_dt.strftime('%Y-%m-%d %H:%M:%S')}") | |
| except Exception as e: # Catch potential timezone comparison errors | |
| logger.warning(f"Error calculating time ago: {e}") | |
| st.sidebar.caption(f"Prices last updated: {last_update_dt.strftime('%Y-%m-%d %H:%M:%S')}") | |
| else: | |
| st.sidebar.caption("Prices not yet loaded.") | |
| st.sidebar.caption(f"Auto-refresh approx. every {REFRESH_INTERVAL/60:.0f} mins") | |
| def portfolio_section(): | |
| st.subheader("๐ Portfolio Overview") | |
| # Check if portfolio contains any assets (crypto or cash > 0) | |
| has_assets = any(d.get('quantity', 0) > 1e-9 for s, d in st.session_state.portfolio.items()) | |
| if not has_assets: | |
| st.info("Your portfolio is empty. Use the sidebar ('Add Cash' or 'Set Quantity') to add assets.", icon="โน๏ธ") | |
| # Display total value as $0.00 explicitly when empty | |
| st.markdown( | |
| f"<h2 style='font-size: 1.2em; margin-bottom: 0;'>Total Portfolio Value (USD)</h2>" | |
| f"<h1 style='font-size: 2.0em; font-weight: bold; margin-top: 0;'>$0.00</h1>", | |
| unsafe_allow_html=True, | |
| ) | |
| return # Stop here if portfolio is empty | |
| portfolio_data = [] | |
| all_prices = st.session_state.get('all_coin_prices', {}) # Get the global prices | |
| # Ensure USD is always first if present and has quantity > 0 | |
| if 'USD' in st.session_state.portfolio and st.session_state.portfolio['USD'].get('quantity', 0.0) > 1e-9: | |
| usd_data = st.session_state.portfolio['USD'] | |
| usd_qty = usd_data.get('quantity', 0.0) | |
| portfolio_data.append({ | |
| 'Symbol': 'USD', | |
| 'Quantity': usd_qty, | |
| 'Type': 'Cash', | |
| 'Price (USD)': 1.0, # USD price is always 1 | |
| 'Value (USD)': usd_qty | |
| }) | |
| # Add crypto assets, sorted alphabetically, only if quantity > 0 | |
| crypto_symbols = sorted([s for s, d in st.session_state.portfolio.items() | |
| if s != 'USD' and d.get('type') == 'crypto' and d.get('quantity', 0) > 1e-9]) | |
| missing_price_symbols = [] | |
| for sym in crypto_symbols: | |
| data = st.session_state.portfolio[sym] | |
| quantity = data.get('quantity', 0.0) | |
| # Price lookup | |
| price = all_prices.get(sym) # Get price from global dict | |
| value = None # Default to None | |
| if price is not None and price > 0: # Ensure price is valid positive number | |
| value = quantity * price # Calculate value | |
| else: | |
| missing_price_symbols.append(sym) # Track symbols with missing/invalid prices | |
| portfolio_data.append({ | |
| 'Symbol': sym, | |
| 'Quantity': quantity, | |
| 'Type': 'Crypto', | |
| 'Price (USD)': price if price is not None else np.nan, # Use NaN for missing prices for better formatting | |
| 'Value (USD)': value if value is not None else np.nan | |
| }) | |
| if portfolio_data: | |
| df = pd.DataFrame(portfolio_data) | |
| # --- Display Summary (First 5 rows or fewer) --- | |
| display_rows = min(len(df), 5) | |
| head_df = df.head(display_rows) | |
| styled_head_df = head_df.style.format({ | |
| 'Quantity': "{:,.8f}".format, | |
| 'Price (USD)': lambda x: "${:,.4f}".format(x) if pd.notnull(x) and x > 0 else "N/A", | |
| 'Value (USD)': lambda x: "${:,.2f}".format(x) if pd.notnull(x) else "N/A", | |
| }).hide(axis="index") # Use hide index directly | |
| # Use st.dataframe for better scrolling and column config | |
| st.dataframe( | |
| styled_head_df, | |
| height= (display_rows + 1) * 35 + 3, # Dynamic height: (rows + header) * approx_row_height + border/padding | |
| use_container_width=True, | |
| # hide_index=True, # Already done in style | |
| column_order=["Symbol", "Quantity", "Price (USD)", "Value (USD)", "Type"], # Define column order | |
| column_config={ # Add tooltips and potentially widths | |
| "Symbol": st.column_config.TextColumn(width="small", help="Asset Symbol"), | |
| "Quantity": st.column_config.NumberColumn(format="%.8f", width="medium", help="Amount of the asset held"), | |
| "Type": st.column_config.TextColumn(width="small", help="Asset Type (Cash or Crypto)"), | |
| "Price (USD)": st.column_config.NumberColumn(format="$%.4f", width="medium", help="Last fetched price per unit in USD (N/A if unavailable/invalid)"), | |
| "Value (USD)": st.column_config.NumberColumn(format="$%.2f", width="medium", help="Total value in USD (Quantity * Price). N/A if price unavailable/invalid."), | |
| } | |
| ) | |
| # --- Expander for Full Portfolio (only if more than 5 assets) --- | |
| if len(df) > display_rows: | |
| # Format the entire DataFrame for the expander | |
| styled_full_df = df.style.format({ | |
| 'Quantity': "{:,.8f}".format, | |
| 'Price (USD)': lambda x: "${:,.4f}".format(x) if pd.notnull(x) and x > 0 else "N/A", | |
| 'Value (USD)': lambda x: "${:,.2f}".format(x) if pd.notnull(x) else "N/A", | |
| }).hide(axis="index") | |
| with st.expander("Show Full Portfolio"): | |
| st.dataframe( | |
| styled_full_df, | |
| use_container_width=True, | |
| # hide_index=True, # Done in style | |
| column_order=["Symbol", "Quantity", "Price (USD)", "Value (USD)", "Type"], | |
| column_config={ # Repeat config for consistency | |
| "Symbol": st.column_config.TextColumn(width="small", help="Asset Symbol"), | |
| "Quantity": st.column_config.NumberColumn(format="%.8f", width="medium", help="Amount of the asset held"), | |
| "Type": st.column_config.TextColumn(width="small", help="Asset Type (Cash or Crypto)"), | |
| "Price (USD)": st.column_config.NumberColumn(format="$%.4f", width="medium", help="Last fetched price per unit in USD (N/A if unavailable/invalid)"), | |
| "Value (USD)": st.column_config.NumberColumn(format="$%.2f", width="medium", help="Total value in USD (Quantity * Price). N/A if price unavailable/invalid."), | |
| } | |
| ) | |
| # Display warning about missing prices if any occurred | |
| if missing_price_symbols: | |
| st.caption(f"โ ๏ธ Note: Price data currently unavailable or invalid for: {', '.join(sorted(missing_price_symbols))}. Value calculated as N/A. Try refreshing prices.") | |
| else: | |
| # This case should be covered by the initial 'has_assets' check | |
| st.write("Portfolio data is currently unavailable.") | |
| # --- Display Total Value --- | |
| total_value = calculate_portfolio_value() # Recalculate based on current data | |
| st.markdown("---") # Add a visual separator | |
| st.markdown( | |
| f"<h2 style='font-size: 1.2em; margin-bottom: 0;'>Total Portfolio Value (USD)</h2>" | |
| f"<h1 style='font-size: 2.0em; font-weight: bold; margin-top: 0;'>${total_value:,.2f}</h1>" | |
| f"<p style='font-size: 0.9em; color: grey;'>Sum of USD cash and the value of crypto holdings based on the latest valid prices.</p>", | |
| unsafe_allow_html=True, | |
| ) | |
| def trade_section(): | |
| st.subheader("๐ ๏ธ Simulate Trade") | |
| # Available symbols: All known symbols from our list, excluding USD | |
| tradeable_symbols = sorted([s for s in ALL_SYMBOLS if s != 'USD']) | |
| if not tradeable_symbols: | |
| st.warning("No tradeable symbols defined (excluding USD).") | |
| return | |
| # Use columns for layout | |
| col1, col2 = st.columns([1, 2]) # Adjust column ratios if needed | |
| with col1: | |
| # Default to a common coin like BTC or the first in the list | |
| default_trade_symbol = 'BTC' if 'BTC' in tradeable_symbols else tradeable_symbols[0] | |
| # Find index robustly | |
| symbol_index = 0 | |
| try: | |
| symbol_index = tradeable_symbols.index(default_trade_symbol) | |
| except ValueError: | |
| symbol_index = 0 # Fallback | |
| symbol = st.selectbox( | |
| "Select Symbol", | |
| tradeable_symbols, | |
| index=symbol_index, | |
| key="trade_symbol", | |
| help="Select the cryptocurrency you want to trade." | |
| ) | |
| action = st.radio( | |
| "**Action**", | |
| ["BUY", "SELL"], | |
| horizontal=True, | |
| key="trade_action", | |
| label_visibility="collapsed" # Hide label if clear from context | |
| ) | |
| with col2: | |
| # Suggest a reasonable step based on typical crypto values | |
| # Use a key dependent on the symbol to potentially reset default value on symbol change | |
| trade_qty_key = f"trade_quantity_{symbol}" | |
| trade_qty = st.number_input( | |
| f"Quantity of {symbol}", | |
| min_value=0.0, | |
| value=0.01 if symbol != 'ETH' else 0.1, # Sensible default based on coin? | |
| step=0.0001, # Smaller step for more precision | |
| format="%.8f", | |
| key=trade_qty_key, | |
| help=f"Enter the amount of {symbol} to trade." | |
| ) | |
| # --- Price Display using cached data --- | |
| # Get the most recently fetched price from the session state | |
| cached_price = st.session_state.all_coin_prices.get(symbol) | |
| trade_price = None # Initialize as None | |
| # Placeholder for dynamic price info | |
| price_display_area = st.empty() | |
| if coingecko_status != 'success' and not cached_price: # If connection bad AND no cached price | |
| price_display_area.error(f"CoinGecko error ({coingecko_status}). Price unavailable. Trade disabled.", icon="๐จ") | |
| elif cached_price is not None and cached_price > 0: | |
| trade_price = cached_price # Valid price found | |
| # Display the cached price that will be used for the trade | |
| price_display_area.success(f"**Trade Execution Price ({symbol}): ${trade_price:,.4f}** (from last refresh)") | |
| # Display estimated cost/proceeds | |
| estimated_value = trade_qty * trade_price | |
| value_label = "Estimated Cost" if action == "BUY" else "Estimated Proceeds" | |
| st.caption(f"{value_label}: ${estimated_value:,.2f}") | |
| else: | |
| # If no valid price is available in the cache for this symbol | |
| status_msg = f"({coingecko_status})" if coingecko_status != 'success' else "" | |
| price_display_area.warning(f"๐จ No valid price data available for {symbol} {status_msg}. Refresh prices. Trade disabled.", icon="โ ๏ธ") | |
| # --- Execute Button --- | |
| # Enable button only if a valid positive price exists and quantity is positive | |
| can_trade = trade_price is not None and trade_price > 0 and trade_qty > 1e-9 # Use epsilon for float comparison | |
| st.button( | |
| f"Execute {action}", | |
| disabled=not can_trade, | |
| key="execute_trade_button", | |
| type="primary", | |
| help=f"Click to simulate {action.lower()}ing {trade_qty:.8f} {symbol} at ${trade_price:,.4f}" if can_trade else "Cannot execute trade (check price > 0 and quantity > 0)", | |
| on_click=execute_trade_callback, # Pass the callback function directly | |
| args=(action, symbol, trade_qty, trade_price) # Pass arguments to the callback | |
| ) | |
| # Callback function MUST be defined before it's used in st.button's on_click | |
| def execute_trade_callback(action, symbol, qty, price): | |
| """Callback function to execute the trade.""" | |
| # simulate_trade now performs checks (price validity, sufficient funds/assets) | |
| if simulate_trade(action, symbol, qty, price): | |
| # Trade succeeded (simulate_trade shows success message) | |
| st.toast(f"{action} order for {qty:.6f} {symbol} successful!", icon="๐") | |
| # Optional: Clear trade quantity input? Maybe not, user might adjust. | |
| # st.session_state[f"trade_quantity_{symbol}"] = 0.01 # Reset default? | |
| # Rerun is handled implicitly by Streamlit after callback + potential state changes | |
| else: | |
| # Trade failed (simulate_trade shows error message) | |
| st.toast(f"{action} order for {symbol} failed.", icon="โ") | |
| # No rerun needed here, error shown by simulate_trade | |
| def trade_history_section(): | |
| st.subheader("๐ Trade History") | |
| if st.session_state.trade_history: | |
| # Create DataFrame copy for manipulation | |
| try: # Add try-except for robustness if history gets corrupted | |
| hist_df = pd.DataFrame(st.session_state.trade_history).copy() | |
| # Ensure Timestamp is datetime and timezone-aware (should be from pd.Timestamp.now(tz='UTC')) | |
| hist_df['Timestamp'] = pd.to_datetime(hist_df['Timestamp'], utc=True) | |
| # Sort by Timestamp descending (most recent first) | |
| hist_df = hist_df.sort_values('Timestamp', ascending=False) | |
| # Prepare columns for display: Combine Cost/Proceeds or handle NaN | |
| hist_df['Value (USD)'] = hist_df.apply( | |
| lambda row: row.get('Cost (USD)') if row['Type'] == 'BUY' else row.get('Proceeds (USD)'), | |
| axis=1 | |
| ) | |
| # Drop original Cost/Proceeds if combined | |
| # hist_df = hist_df.drop(columns=['Cost (USD)', 'Proceeds (USD)'], errors='ignore') | |
| # --- Display Summary (Last 5 trades) --- | |
| display_rows = min(len(hist_df), 5) | |
| head_df = hist_df.head(display_rows) | |
| styled_head_df = head_df.style.format({ | |
| 'Timestamp': '{:%Y-%m-%d %H:%M:%S %Z}'.format, # Include timezone | |
| 'Quantity': "{:,.8f}".format, | |
| 'Price (USD)': "${:,.4f}".format, | |
| 'Cost (USD)': lambda x: "${:,.2f}".format(x) if pd.notnull(x) else "-", # Keep separate Cost/Proceeds | |
| 'Proceeds (USD)': lambda x: "${:,.2f}".format(x) if pd.notnull(x) else "-", | |
| # 'Value (USD)': "${:,.2f}".format, # Format if using combined column | |
| }).map( | |
| lambda action: 'color: green' if action=='BUY' else ('color: red' if action=='SELL' else ''), | |
| subset=['Type'] | |
| ).hide(axis="index") | |
| # Display the last 5 trades with dynamic height | |
| st.dataframe( | |
| styled_head_df, | |
| height= (display_rows + 1) * 35 + 3, # Dynamic height | |
| use_container_width=True, | |
| column_order=("Timestamp", "Type", "Symbol", "Quantity", "Price (USD)", "Cost (USD)", "Proceeds (USD)"), # Adjusted order | |
| column_config={ | |
| "Timestamp": st.column_config.DatetimeColumn("Time", help="Time of trade execution (UTC)"), | |
| "Type": st.column_config.TextColumn("Action", help="BUY (Green) or SELL (Red)"), | |
| "Symbol": st.column_config.TextColumn("Coin", help="Cryptocurrency traded"), | |
| "Quantity": st.column_config.NumberColumn("Amount", format="%.8f", help="Quantity of the coin traded"), | |
| "Price (USD)": st.column_config.NumberColumn("Executed Price", format="$%.4f", help="Price per coin at execution"), | |
| "Cost (USD)": st.column_config.NumberColumn("Cost", format="$%.2f", help="Total USD spent (for BUYs)"), | |
| "Proceeds (USD)": st.column_config.NumberColumn("Proceeds", format="$%.2f", help="Total USD received (for SELLs)"), | |
| # "Value (USD)": st.column_config.NumberColumn("Value", format="$%.2f", help="Total USD Cost (BUY) or Proceeds (SELL)"), # Config for combined column | |
| } | |
| ) | |
| # --- Expander for Full History (if more than 5 trades) --- | |
| if len(hist_df) > display_rows: | |
| # Format the entire DataFrame for the expander | |
| styled_full_df = hist_df.style.format({ | |
| 'Timestamp': '{:%Y-%m-%d %H:%M:%S %Z}'.format, | |
| 'Quantity': "{:,.8f}".format, | |
| 'Price (USD)': "${:,.4f}".format, | |
| 'Cost (USD)': lambda x: "${:,.2f}".format(x) if pd.notnull(x) else "-", | |
| 'Proceeds (USD)': lambda x: "${:,.2f}".format(x) if pd.notnull(x) else "-", | |
| # 'Value (USD)': "${:,.2f}".format, | |
| }).map( | |
| lambda action: 'color: green' if action=='BUY' else ('color: red' if action=='SELL' else ''), | |
| subset=['Type'] | |
| ).hide(axis="index") | |
| with st.expander("Show Full Trade History"): | |
| st.dataframe( | |
| styled_full_df, | |
| use_container_width=True, | |
| column_order=("Timestamp", "Type", "Symbol", "Quantity", "Price (USD)", "Cost (USD)", "Proceeds (USD)"), | |
| column_config={ # Repeat config | |
| "Timestamp": st.column_config.DatetimeColumn("Time", help="Time of trade execution (UTC)"), | |
| "Type": st.column_config.TextColumn("Action", help="BUY (Green) or SELL (Red)"), | |
| "Symbol": st.column_config.TextColumn("Coin", help="Cryptocurrency traded"), | |
| "Quantity": st.column_config.NumberColumn("Amount", format="%.8f", help="Quantity of the coin traded"), | |
| "Price (USD)": st.column_config.NumberColumn("Executed Price", format="$%.4f", help="Price per coin at execution"), | |
| "Cost (USD)": st.column_config.NumberColumn("Cost", format="$%.2f", help="Total USD spent (for BUYs)"), | |
| "Proceeds (USD)": st.column_config.NumberColumn("Proceeds", format="$%.2f", help="Total USD received (for SELLs)"), | |
| # "Value (USD)": st.column_config.NumberColumn("Value", format="$%.2f", help="Total USD Cost (BUY) or Proceeds (SELL)"), | |
| } | |
| ) | |
| except Exception as e: | |
| st.error(f"Error displaying trade history: {e}") | |
| logger.error(f"Failed to process or display trade history: {e}", exc_info=True) | |
| else: | |
| st.caption("No trades recorded yet.") # Displayed only when history is empty | |
| def recommendation_section(): | |
| st.subheader("๐ก AI Trade Recommendations") # Moved subheader here | |
| if gemini_model and gemini_status == "success": | |
| # Get current crypto holdings (symbols only) for news fetch | |
| portfolio_cryptos = [s for s, d in st.session_state.portfolio.items() | |
| if d.get('type') == 'crypto' and d.get('quantity', 0) > 1e-9] | |
| ctx = st.text_area( | |
| "Market Context or ideas you want included (Optional):", | |
| height=100, | |
| key="ai_context", | |
| placeholder="e.g., Overall market sentiment (bullish/bearish), specific events (upcoming ETF decision), personal risk tolerance (conservative/aggressive)..." | |
| ) | |
| # Prepare args for the callback | |
| callback_args = { | |
| "market_context": ctx, | |
| "portfolio_symbols": portfolio_cryptos | |
| } | |
| # Use a state variable to store the recommendation result | |
| if 'ai_recommendation_result' not in st.session_state: | |
| st.session_state.ai_recommendation_result = None | |
| if 'ai_recommendation_running' not in st.session_state: | |
| st.session_state.ai_recommendation_running = False | |
| # Button to trigger the process | |
| if st.button("๐ค Get AI Recommendation", key="ai_rec_button", disabled=st.session_state.ai_recommendation_running): | |
| st.session_state.ai_recommendation_running = True | |
| st.session_state.ai_recommendation_result = None # Clear previous result | |
| st.rerun() # Rerun to show spinner and execute logic below | |
| # Logic to run when button is clicked (after rerun) | |
| if st.session_state.ai_recommendation_running: | |
| with st.spinner("๐ง Gathering data and consulting the AI..."): | |
| try: | |
| # 1. Fetch Yahoo news | |
| news_map = get_coin_news(callback_args["portfolio_symbols"]) | |
| # 2. Fetch Linkup research | |
| linkup_research = get_linkup_news(callback_args["portfolio_symbols"]) | |
| # 3. Call the Gemini recommendation function | |
| recommendation = get_gemini_recommendation( | |
| callback_args["market_context"], | |
| news_map, | |
| linkup_research | |
| ) | |
| # Store result in session state | |
| st.session_state.ai_recommendation_result = recommendation | |
| except Exception as e: | |
| st.error(f"Failed to get recommendation: {e}") | |
| logger.error(f"Unhandled error during AI recommendation generation: {e}", exc_info=True) | |
| st.session_state.ai_recommendation_result = f"Error: {e}" # Store error message | |
| finally: | |
| # Reset running flag regardless of success/failure | |
| st.session_state.ai_recommendation_running = False | |
| st.rerun() # Rerun to display the result or error | |
| # Display the stored recommendation (or error) | |
| if st.session_state.ai_recommendation_result: | |
| st.markdown("---")# Divider | |
| st.markdown("##### AI Advisor Says:") | |
| if "Error:" in st.session_state.ai_recommendation_result or "blocked by safety filters" in st.session_state.ai_recommendation_result : | |
| st.error(st.session_state.ai_recommendation_result) | |
| else: | |
| st.markdown(st.session_state.ai_recommendation_result) | |
| # --- PDF Download Button --- | |
| # Display button only if recommendation was successful | |
| try: | |
| # Generate PDF from the successful recommendation text | |
| pdf_buffer = create_pdf_report(st.session_state.ai_recommendation_result) | |
| st.download_button( | |
| label="โฌ๏ธ Download Report as PDF", | |
| data=pdf_buffer, # Use getvalue() if BytesIO, or direct bytes | |
| file_name=f"AI_Crypto_Rec_{datetime.now().strftime('%Y%m%d_%H%M')}.pdf", | |
| mime="application/pdf", | |
| key="pdf_download_button" | |
| ) | |
| except Exception as pdf_e: | |
| st.error(f"Error generating PDF: {str(pdf_e)}") | |
| logger.error(f"Failed to create PDF report: {pdf_e}", exc_info=True) | |
| else: | |
| st.warning(f"AI Recommendations disabled: {gemini_status}", icon="โ ๏ธ") | |
| # --- NEW: Performance Analysis Tab Section --- | |
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.graph_objects as go | |
| from pycoingecko import CoinGeckoAPI # Assuming cg is globally available/initialized | |
| import logging | |
| from datetime import datetime, timedelta | |
| # --- Assuming these are defined globally --- | |
| logger = logging.getLogger(__name__) | |
| # cg = CoinGeckoAPI() # Or retrieved from st.session_state.cg | |
| # COINGECKO_IDS = {'BTC': 'bitcoin', 'ETH': 'ethereum', ...} | |
| # SYMBOL_MAP = {'bitcoin': 'BTC', 'ethereum': 'ETH', ...} | |
| # ALL_SYMBOLS = ['BTC', 'ETH', ...] | |
| # coingecko_status = "success" # Or retrieved from st.session_state | |
| # --- CORRECTED CACHED FUNCTION --- | |
| # Cache for 10 minutes | |
| def fetch_historical_data(_cg_ref, coin_id, days): | |
| """ | |
| Fetches historical market data for a single coin ID from CoinGecko. | |
| Returns a DataFrame on success, None on failure. | |
| Uses _cg_ref as a dummy argument to help with cache invalidation if needed. | |
| Corrected merge_asof tolerance. | |
| """ | |
| # Access the global client (ensure it's available in the scope where this runs) | |
| global cg | |
| local_cg = None | |
| if 'cg' in st.session_state and st.session_state.cg: | |
| local_cg = st.session_state.cg | |
| elif 'cg' in globals() and cg: | |
| local_cg = cg # Use the global cg if defined | |
| else: | |
| logger.error("CoinGecko client 'cg' not found in fetch_historical_data scope.") | |
| return None # Cannot proceed without client | |
| if not local_cg: | |
| logger.warning(f"CoinGecko client not available for historical data fetch (ID: {coin_id}).") | |
| return None | |
| try: | |
| logger.info(f"Fetching historical data for {coin_id} ({days} days)...") | |
| # API uses 'max' for days, but requires specific 'days' value for interval logic | |
| fetch_days_str = str(days) if days != 'max' else 'max' | |
| # Determine a numerical day count for interval logic, default large value for 'max' | |
| # Coingecko often limits 'max' history, but using a large number helps interval choice | |
| api_days_val = 365 * 10 if days == 'max' else int(days) | |
| # Determine interval: daily for > 90 days, hourly otherwise (default) | |
| interval = 'daily' if api_days_val > 90 else None # None defaults to hourly on CoinGecko API | |
| chart_data = local_cg.get_coin_market_chart_by_id( | |
| id=coin_id, | |
| vs_currency='usd', | |
| days=fetch_days_str, # API expects string 'max' or number as string | |
| interval=interval | |
| ) | |
| # Store prices and volumes if needed | |
| prices = chart_data.get('prices', []) | |
| volumes = chart_data.get('total_volumes', []) | |
| if not prices: | |
| logger.warning(f"No price data returned for {coin_id} ({fetch_days_str} days).") | |
| return None # Return None if no prices | |
| # Create DataFrame for prices | |
| df = pd.DataFrame(prices, columns=['timestamp_ms', 'price']) | |
| df['timestamp'] = pd.to_datetime(df['timestamp_ms'], unit='ms', utc=True) | |
| # Add volume data if available | |
| if volumes: | |
| vol_df = pd.DataFrame(volumes, columns=['timestamp_ms_vol', 'volume']) | |
| # --- FIX: Calculate tolerance in MILLISECONDS --- | |
| if interval == 'daily': | |
| # For daily data, allow merging points within +/- 1 day | |
| tolerance_ms = 24 * 60 * 60 * 1000 | |
| else: | |
| # For hourly data, allow merging points within +/- 12 hours | |
| # (Adjust if needed based on typical data alignment) | |
| tolerance_ms = 12 * 60 * 60 * 1000 | |
| # Check if volume data is reasonably close in length | |
| if abs(len(vol_df) - len(df)) < (len(df) * 0.1): # e.g., less than 10% difference | |
| logger.info(f"Attempting merge_asof for {coin_id} with tolerance {tolerance_ms} ms.") | |
| # Merge using the integer tolerance | |
| df = pd.merge_asof( | |
| df.sort_values('timestamp_ms'), | |
| vol_df[['timestamp_ms_vol', 'volume']].sort_values('timestamp_ms_vol'), | |
| left_on='timestamp_ms', | |
| right_on='timestamp_ms_vol', | |
| direction='nearest', # Find closest timestamp within tolerance | |
| tolerance=tolerance_ms # USE INTEGER MS TOLERANCE | |
| ) | |
| df = df.drop(columns=['timestamp_ms_vol'], errors='ignore') | |
| logger.info(f"Merge successful for {coin_id}.") | |
| else: | |
| logger.warning(f"Volume data length mismatch too large for {coin_id} " | |
| f"(Prices: {len(prices)}, Volumes: {len(volumes)}). Skipping volume merge.") | |
| df['volume'] = np.nan # Add empty volume column | |
| else: | |
| logger.info(f"No volume data returned for {coin_id}. Adding NaN volume column.") | |
| df['volume'] = np.nan # Add volume column even if volumes list is empty | |
| # Final DataFrame structure | |
| df = df[['timestamp', 'price', 'volume']].set_index('timestamp') | |
| # Optional: Handle potential duplicate timestamps if merge created any (unlikely with 'nearest') | |
| df = df[~df.index.duplicated(keep='first')] | |
| logger.info(f"Successfully processed {len(df)} data points for {coin_id}.") | |
| return df | |
| except Exception as e: | |
| logger.error(f"Error processing historical data for {coin_id}: {e}", exc_info=True) | |
| # Return None to indicate failure | |
| return None | |
| # --- Coin performance TAB FUNCTION --- | |
| def performance_analysis_tab(): | |
| st.subheader("๐ Coin Performance Analysis") | |
| # Use status from session state if available | |
| current_coingecko_status = st.session_state.get('coingecko_status', 'Not Initialized') | |
| if current_coingecko_status != "success": | |
| st.warning(f"CoinGecko API not available ({current_coingecko_status}). Performance analysis disabled.", icon="โ ๏ธ") | |
| return | |
| # --- User Selections --- | |
| # Ensure ALL_SYMBOLS is available and sorted | |
| available_symbols = sorted(st.session_state.get('ALL_SYMBOLS', list(COINGECKO_IDS.keys()))) | |
| if not available_symbols: | |
| st.error("No symbols available for analysis.") | |
| return | |
| default_selection = [] | |
| if 'BTC' in available_symbols: default_selection.append('BTC') | |
| if 'ETH' in available_symbols: default_selection.append('ETH') | |
| selected_symbols = st.multiselect( | |
| "Select Coins to Analyze:", | |
| options=available_symbols, | |
| default=default_selection, | |
| key="perf_symbols_select", | |
| help="Choose one or more cryptocurrencies to compare their performance." | |
| ) | |
| # Select Time Range | |
| time_range_options = { | |
| "1 Day": 1, "7 Days": 7, "30 Days": 30, | |
| "90 Days": 90, "1 Year": 365, "Max": 'max' | |
| } | |
| selected_range_label = st.selectbox( | |
| "Select Time Range:", | |
| options=list(time_range_options.keys()), | |
| index=2, # Default to 30 Days | |
| key="perf_range_select" | |
| ) | |
| days_to_fetch = time_range_options[selected_range_label] | |
| if not selected_symbols: | |
| st.info("Select one or more coins to visualize their performance.", icon="๐") | |
| return | |
| # --- Data Fetching and Processing --- | |
| all_perf_data = {} | |
| valid_symbols = [] | |
| failed_symbols = [] # Keep track of symbols that failed to load | |
| # Use a dummy value for cache invalidation if needed | |
| cg_ref = id(st.session_state.cg) if 'cg' in st.session_state and st.session_state.cg else None | |
| # Fetch data sequentially (can be slow for many coins) | |
| # Consider asyncio for parallel fetching if performance becomes an issue | |
| # Spinner applied by @st.cache_data decorator | |
| for symbol in selected_symbols: | |
| coin_id = COINGECKO_IDS.get(symbol) | |
| if not coin_id: | |
| st.warning(f"Could not find CoinGecko ID for {symbol}. Skipping.") | |
| continue | |
| # Fetch data using the cached function | |
| hist_data = fetch_historical_data(cg_ref, coin_id, days_to_fetch) | |
| if hist_data is not None and not hist_data.empty: | |
| all_perf_data[symbol] = hist_data | |
| valid_symbols.append(symbol) # Keep track of symbols with data | |
| else: | |
| # Logged already by fetch_historical_data on error | |
| logger.warning(f"No valid historical data obtained for {symbol} ({coin_id}).") | |
| failed_symbols.append(symbol) | |
| # *** Display toast HERE, outside the cached function *** | |
| st.toast(f"โ ๏ธ Failed to load data for {symbol}. It might be unavailable for the selected range.", icon="๐") | |
| if not all_perf_data: | |
| st.error("Could not load historical data for any selected coin. Please try again later or select different coins/range.", icon="โ") | |
| return | |
| # --- Performance Calculation & Visualization --- | |
| st.markdown("#### Performance Comparison (Normalized)") | |
| st.caption("Shows percentage change relative to the start of the selected period.") | |
| fig_norm = go.Figure() | |
| fig_price = go.Figure() | |
| fig_vol = go.Figure() # Figure for volume | |
| performance_metrics = [] | |
| # Find the common start date across all successfully fetched datasets | |
| try: | |
| common_start_date = max(df.index.min() for df in all_perf_data.values() if not df.empty) | |
| except ValueError: | |
| # Handle case where all_perf_data is empty or contains only empty dataframes | |
| st.error("No valid data points found to determine a common start date.", icon="โ") | |
| return | |
| for symbol in valid_symbols: # Iterate only symbols with data | |
| df = all_perf_data[symbol] | |
| # Filter data starting from the common start date | |
| df_filtered = df[df.index >= common_start_date].copy() | |
| if df_filtered.empty: | |
| logger.warning(f"No data found for {symbol} after common start date {common_start_date}. Skipping plots for this symbol.") | |
| continue # Skip this symbol if no data in the common range | |
| # --- Calculate Metrics & Plot --- | |
| start_price = df_filtered['price'].iloc[0] | |
| end_price = df_filtered['price'].iloc[-1] | |
| # Normalized Price (Percentage Change) | |
| if start_price is not None and start_price > 0: # Avoid division by zero/None | |
| df_filtered['normalized'] = ((df_filtered['price'] / start_price) - 1) * 100 | |
| period_change = df_filtered['normalized'].iloc[-1] # Get change from normalized column | |
| # Add trace for normalized performance | |
| fig_norm.add_trace(go.Scatter( | |
| x=df_filtered.index, | |
| y=df_filtered['normalized'], | |
| mode='lines', | |
| name=f"{symbol} % Change", | |
| hovertemplate=f'<b>{symbol}</b><br>%{{y:.2f}}%<br>%{{x|%Y-%m-%d %H:%M}}<extra></extra>' | |
| )) | |
| else: | |
| df_filtered['normalized'] = 0 # Set to 0 if start price is 0 or None | |
| period_change = 0.0 | |
| logger.warning(f"Start price for {symbol} is zero or invalid ({start_price}), cannot calculate percentage change accurately.") | |
| # Add trace for raw price | |
| fig_price.add_trace(go.Scatter( | |
| x=df_filtered.index, | |
| y=df_filtered['price'], | |
| mode='lines', | |
| name=f"{symbol} Price", | |
| yaxis="y", # Assign to the primary y-axis | |
| hovertemplate=f'<b>{symbol}</b><br>${{y:,.4f}}<br>%{{x|%Y-%m-%d %H:%M}}<extra></extra>' | |
| )) | |
| # Add trace for volume (use secondary y-axis if needed) | |
| # Check if volume column exists and has non-NA values | |
| if 'volume' in df_filtered.columns and df_filtered['volume'].notna().any(): | |
| fig_vol.add_trace(go.Bar( | |
| x=df_filtered.index, | |
| y=df_filtered['volume'], | |
| name=f"{symbol} Volume", | |
| opacity=0.6, # Make bars slightly transparent | |
| # marker_color='rgba(100, 100, 255, 0.6)', # Assign unique colors later if needed | |
| yaxis="y", | |
| hovertemplate=f'<b>{symbol} Vol</b><br>%{{y:,.0f}}<br>%{{x|%Y-%m-%d %H:%M}}<extra></extra>' | |
| )) | |
| # Get current price from global cache for reference | |
| # Ensure all_coin_prices is available | |
| all_prices = st.session_state.get('all_coin_prices', {}) | |
| current_price = all_prices.get(symbol, np.nan) | |
| performance_metrics.append({ | |
| 'Symbol': symbol, | |
| f'{selected_range_label} Change %': period_change, | |
| 'Start Price (USD)': start_price, | |
| 'End Price (USD)': end_price, | |
| 'Current Price (USD)': current_price # Add current price for reference | |
| }) | |
| # --- Display Charts --- | |
| if not fig_norm.data: # Check if any traces were added | |
| st.warning("No data available for plotting normalized performance in the selected range.") | |
| else: | |
| # Customize and display normalized performance chart | |
| fig_norm.update_layout( | |
| title=f"Normalized Performance ({selected_range_label})", | |
| xaxis_title="Date", | |
| yaxis_title="Percentage Change (%)", | |
| hovermode="x unified", | |
| legend_title_text='Coins', | |
| height=450, | |
| margin=dict(l=20, r=20, t=40, b=20) # Tighten margins | |
| ) | |
| st.plotly_chart(fig_norm, use_container_width=True) | |
| # --- Metrics Table --- | |
| if performance_metrics: | |
| metrics_df = pd.DataFrame(performance_metrics).set_index('Symbol') | |
| st.markdown("#### Key Metrics") | |
| st.dataframe(metrics_df.style.format({ | |
| f'{selected_range_label} Change %': "{:,.2f}%", | |
| 'Start Price (USD)': "${:,.4f}", | |
| 'End Price (USD)': "${:,.4f}", | |
| 'Current Price (USD)': lambda x: "${:,.4f}".format(x) if pd.notnull(x) else "N/A", | |
| }).apply( # Use apply for coloring based on value | |
| lambda col: ['color: green' if isinstance(v, (int, float)) and v > 0 else ('color: red' if isinstance(v, (int, float)) and v < 0 else '') for v in col], | |
| subset=[f'{selected_range_label} Change %'], | |
| axis=0 # Apply column-wise | |
| ), | |
| use_container_width=True | |
| ) | |
| # --- Combined Price and Volume Chart --- | |
| st.markdown("---") | |
| chart_tabs = st.tabs(["๐ Price Chart", "๐ Volume Chart"]) | |
| with chart_tabs[0]: | |
| if not fig_price.data: | |
| st.warning("No data available for plotting price history in the selected range.") | |
| else: | |
| # Customize and display raw price chart | |
| fig_price.update_layout( | |
| title=f"Price History ({selected_range_label})", | |
| xaxis_title="Date", | |
| yaxis_title="Price (USD)", | |
| hovermode="x unified", | |
| legend_title_text='Coins', | |
| height=450, | |
| margin=dict(l=20, r=20, t=40, b=20) | |
| ) | |
| st.plotly_chart(fig_price, use_container_width=True) | |
| with chart_tabs[1]: | |
| if not fig_vol.data: | |
| st.warning("No volume data available for plotting in the selected range.") | |
| else: | |
| # Customize and display volume chart | |
| fig_vol.update_layout( | |
| title=f"Trading Volume ({selected_range_label})", | |
| xaxis_title="Date", | |
| yaxis_title="Volume (USD)", | |
| hovermode="x unified", | |
| legend_title_text='Coins', | |
| height=450, | |
| barmode='group', # Group bars if multiple coins have volume | |
| margin=dict(l=20, r=20, t=40, b=20) | |
| ) | |
| # Assign distinct colors if multiple volume traces exist | |
| # colors = ['rgba(99, 110, 250, 0.7)', 'rgba(239, 85, 59, 0.7)', 'rgba(0, 204, 150, 0.7)', ...] | |
| # for i, trace in enumerate(fig_vol.data): | |
| # trace.marker.color = colors[i % len(colors)] | |
| st.plotly_chart(fig_vol, use_container_width=True) | |
| # Display symbols that failed to load, if any | |
| if failed_symbols: | |
| st.caption(f"โ ๏ธ Note: Data could not be loaded for: {', '.join(failed_symbols)}. They are excluded from the charts and metrics.") | |
| # --- Main App Layout & Flow --- | |
| def main(): | |
| st.header("๐ Adriel & Ashriel's Crypto App") # Slightly adjusted title | |
| st.markdown("(Powered by CoinGecko, Google Gemini & Linkup)") | |
| with st.expander("Welcome to our Crypto Trading Simulator๐ฅ", expanded=False): | |
| st.info( | |
| """ | |
| Welcome! Simulate crypto trading, manage your virtual portfolio, get AI-driven insights, and analyze coin performance. | |
| **Sidebar Controls (< icon top-left if hidden):** | |
| * **Manage Portfolio:** Add cash, add/update/remove crypto assets. | |
| * **Status & Sync:** Check API connections, refresh prices manually. | |
| **Tabs:** | |
| * **๐ Portfolio Management & AI:** | |
| * View your current holdings and total value. | |
| * Simulate BUY/SELL trades using the *most recently refreshed price*. | |
| * Review your trade history. | |
| * Generate AI-powered trade recommendations and market analysis. | |
| * **๐ Coin Performance Analysis:** | |
| * Select coins and a time range (1D, 7D, 30D, 1Y, etc.). | |
| * Visualize normalized performance (percentage change) to compare assets. | |
| * View price and volume charts for selected coins. | |
| * See key performance metrics for the chosen period. | |
| """, | |
| icon="๐ช" | |
| ) | |
| # --- Periodic Price Refresh Check --- | |
| # Perform this check early in the script run | |
| if should_update_prices() and coingecko_status == "success": | |
| logger.info("Refresh interval reached. Updating all prices...") | |
| # No spinner here as it should be quick and in the background | |
| if update_prices(): # Fetch prices (updates status on success/failure) | |
| st.toast("Auto-refreshing prices...", icon="โณ") | |
| # Use st.rerun() to ensure UI updates after background refresh | |
| st.rerun() | |
| else: | |
| st.toast("Auto-refresh failed. Using cached prices.", icon="โ ๏ธ") # Notify user if background refresh fails | |
| # --- Render Sidebar --- | |
| # Sidebar is rendered outside the tabs as it controls global state | |
| sidebar_section() | |
| # --- Define Tabs --- | |
| tab1, tab2 = st.tabs(["๐ Portfolio Management & AI", "๐ Coin Performance Analysis"]) | |
| # --- Content for Tab 1: Portfolio Management & AI --- | |
| with tab1: | |
| # Main area layout using columns for better organization | |
| col_left, col_right = st.columns(2) # 50/50 split | |
| with col_left: | |
| portfolio_section() | |
| with col_right: | |
| trade_section() | |
| # Areas below the 50/50 split, each taking full width within the tab | |
| st.divider() | |
| trade_history_section() | |
| st.divider() | |
| recommendation_section() | |
| # --- Content for Tab 2: Coin Performance Analysis --- | |
| with tab2: | |
| performance_analysis_tab() # Call the function dedicated to this tab | |
| # --- Footer --- | |
| st.divider() | |
| st.caption("App created by Adriel & Ashriel, All rights reserved. Simulation purposes only. Market data from CoinGecko may have latency. Not financial advice.") | |
| if __name__ == "__main__": | |
| # Optional: Add imports needed only here (like humanize if not already imported) | |
| try: | |
| import humanize | |
| except ImportError: | |
| logger.info("humanize library not found, relative time display disabled.") | |
| pass # humanize is optional | |
| main() |