Spaces:
Sleeping
Sleeping
| """ | |
| Indicator Agent for technical indicator calculation and interpretation. | |
| This agent computes technical indicators (RSI, MACD, Stochastic) and provides | |
| interpretation of their values for trading decisions. | |
| """ | |
| import json | |
| import logging | |
| import time | |
| from typing import Any, Dict, Optional | |
| import pandas as pd | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| # Configure logger | |
| logger = logging.getLogger(__name__) | |
| from config.default_config import DEFAULT_CONFIG | |
| from config.models import AGENT_MODELS | |
| from config.prompt_templates import INDICATOR_AGENT_PROMPT | |
| from graph.state.agent_state import add_agent_message, update_analysis_result | |
| from graph.state.trading_state import TechnicalWorkflowState | |
| from utils.charts.chart_generator import ChartGenerator | |
| from utils.formatters.educational_content import ( | |
| generate_macd_explanation, | |
| generate_rsi_explanation, | |
| generate_stochastic_explanation, | |
| ) | |
| from utils.indicators import calculate_macd, calculate_rsi, calculate_stochastic | |
| from utils.indicators.macd import ( | |
| find_macd_crossovers, | |
| find_macd_divergence, | |
| interpret_macd, | |
| ) | |
| from utils.indicators.rsi import find_rsi_divergence, interpret_rsi | |
| from utils.indicators.stochastic import find_stochastic_crossovers, interpret_stochastic | |
| from utils.investment_style_helpers import ( | |
| get_investment_style_from_state, | |
| get_technical_analysis_style_context, | |
| ) | |
| from utils.llm.provider_factory import LLMProviderFactory | |
| class IndicatorAgent: | |
| """ | |
| Technical Indicator Agent. | |
| Responsibilities: | |
| - Calculate RSI, MACD, Stochastic Oscillator | |
| - Interpret indicator values (overbought/oversold, bullish/bearish) | |
| - Detect divergences and crossovers | |
| - Provide trading signals based on indicators | |
| """ | |
| AGENT_NAME = "indicator_agent" | |
| def __init__(self, config: Optional[Dict[str, Any]] = None): | |
| """ | |
| Initialize Indicator Agent. | |
| Args: | |
| config: Optional configuration override | |
| """ | |
| self.config = config or DEFAULT_CONFIG | |
| # Initialize LLM - use runtime provider override if available | |
| from config.models import DEFAULT_MODELS_BY_PROVIDER | |
| model_config = AGENT_MODELS[self.AGENT_NAME] | |
| runtime_provider = self.config.get("llm_provider", model_config["provider"]) | |
| # If provider is overridden but model is not, use default model for that provider | |
| if "llm_provider" in self.config and "llm_model" not in self.config: | |
| runtime_model = DEFAULT_MODELS_BY_PROVIDER.get( | |
| runtime_provider, model_config["model"] | |
| ) | |
| else: | |
| runtime_model = self.config.get("llm_model", model_config["model"]) | |
| self.llm = LLMProviderFactory.create( | |
| provider=runtime_provider, | |
| model=runtime_model, | |
| temperature=model_config["temperature"], | |
| ) | |
| # Indicator parameters | |
| self.indicator_params = self.config["indicator_parameters"] | |
| # Initialize chart generator | |
| self.chart_generator = ChartGenerator() | |
| def run(self, state: TechnicalWorkflowState) -> TechnicalWorkflowState: | |
| """ | |
| Execute indicator analysis. | |
| Args: | |
| state: Current workflow state | |
| Returns: | |
| Updated state with indicator analysis | |
| """ | |
| start_time = time.time() | |
| ticker = state.get("ticker", "UNKNOWN") | |
| timeframe = state.get("timeframe", "UNKNOWN") | |
| logger.info( | |
| json.dumps( | |
| { | |
| "agent": self.AGENT_NAME, | |
| "action": "start", | |
| "ticker": ticker, | |
| "timeframe": timeframe, | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| try: | |
| # Extract market data | |
| market_data = state["market_data"] | |
| if not market_data.get("ohlc_data"): | |
| raise ValueError("No OHLC data available for indicator calculation") | |
| # Convert serialized DataFrame back to pandas DataFrame | |
| df = self._deserialize_dataframe(market_data["ohlc_data"]) | |
| # Calculate indicators | |
| indicators_result = self._calculate_indicators(df) | |
| # Get investment style from state | |
| investment_style = get_investment_style_from_state(state) | |
| # Generate charts and educational notes (User Story 5) | |
| chart_paths = [] | |
| educational_notes = [] | |
| try: | |
| # Check if educational mode is enabled | |
| config = state.get("config", {}) | |
| educational_mode = ( | |
| config.get("educational_mode", False) | |
| if isinstance(config, dict) | |
| else False | |
| ) | |
| # Generate RSI chart | |
| if "rsi" in indicators_result and "value" in indicators_result["rsi"]: | |
| rsi_value = indicators_result["rsi"]["value"] | |
| rsi_series_dict = indicators_result["rsi"].get("series") | |
| if rsi_series_dict is not None: | |
| # Convert dict back to Series | |
| rsi_series = pd.Series(rsi_series_dict) | |
| fig, filepath = self.chart_generator.generate_rsi_chart( | |
| df=df, | |
| rsi_series=rsi_series, | |
| ticker=ticker, | |
| timeframe=timeframe, | |
| rsi_period=self.indicator_params["rsi_period"], | |
| save=True, | |
| ) | |
| if filepath: | |
| chart_paths.append(filepath) | |
| self.chart_generator.close_figure(fig) | |
| # Add educational note if enabled | |
| if educational_mode: | |
| educational_notes.append( | |
| f"**RSI**: {generate_rsi_explanation(rsi_value)}" | |
| ) | |
| # Generate MACD chart | |
| if "macd" in indicators_result: | |
| macd_data = indicators_result["macd"] | |
| series_dict = macd_data.get("series", {}) | |
| logger.info( | |
| f"MACD data available: series_dict keys = {list(series_dict.keys()) if series_dict else 'None'}" | |
| ) | |
| if ( | |
| series_dict | |
| and "macd" in series_dict | |
| and "signal" in series_dict | |
| and "histogram" in series_dict | |
| ): | |
| # Convert dicts back to Series | |
| macd_series = pd.Series(series_dict["macd"]) | |
| signal_series = pd.Series(series_dict["signal"]) | |
| histogram_series = pd.Series(series_dict["histogram"]) | |
| logger.info( | |
| f"Generating MACD chart: macd_len={len(macd_series)}, signal_len={len(signal_series)}, hist_len={len(histogram_series)}" | |
| ) | |
| fig, filepath = self.chart_generator.generate_macd_chart( | |
| df=df, | |
| macd=macd_series, | |
| signal=signal_series, | |
| histogram=histogram_series, | |
| ticker=ticker, | |
| timeframe=timeframe, | |
| save=True, | |
| ) | |
| logger.info(f"MACD chart generated: filepath={filepath}") | |
| if filepath: | |
| chart_paths.append(filepath) | |
| self.chart_generator.close_figure(fig) | |
| else: | |
| logger.warning( | |
| f"MACD chart skipped - missing series data. series_dict keys: {list(series_dict.keys()) if series_dict else 'None'}" | |
| ) | |
| # Add educational note if enabled | |
| if ( | |
| educational_mode | |
| and "macd" in macd_data | |
| and "signal" in macd_data | |
| and "histogram" in macd_data | |
| ): | |
| educational_notes.append( | |
| f"**MACD**: {generate_macd_explanation(macd_data['macd'], macd_data['signal'], macd_data['histogram'])}" | |
| ) | |
| # Generate Stochastic chart | |
| if "stochastic" in indicators_result: | |
| stoch_data = indicators_result["stochastic"] | |
| series_dict = stoch_data.get("series", {}) | |
| if series_dict and "k" in series_dict and "d" in series_dict: | |
| # Convert dicts back to Series | |
| k_series = pd.Series(series_dict["k"]) | |
| d_series = pd.Series(series_dict["d"]) | |
| fig, filepath = self.chart_generator.generate_stochastic_chart( | |
| df=df, | |
| k_series=k_series, | |
| d_series=d_series, | |
| ticker=ticker, | |
| timeframe=timeframe, | |
| save=True, | |
| ) | |
| if filepath: | |
| chart_paths.append(filepath) | |
| self.chart_generator.close_figure(fig) | |
| # Add educational note if enabled | |
| if educational_mode and "k" in stoch_data and "d" in stoch_data: | |
| educational_notes.append( | |
| f"**Stochastic**: {generate_stochastic_explanation(stoch_data['k'], stoch_data['d'])}" | |
| ) | |
| except Exception as chart_error: | |
| logger.warning( | |
| json.dumps( | |
| { | |
| "agent": self.AGENT_NAME, | |
| "action": "chart_generation_warning", | |
| "ticker": ticker, | |
| "error": str(chart_error), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Extract cost tracker from state | |
| cost_tracker = state.get("_cost_tracker") | |
| # Interpret indicators using LLM | |
| interpretation = self._interpret_with_llm( | |
| state["ticker"], | |
| state["timeframe"], | |
| indicators_result, | |
| df, | |
| investment_style, | |
| cost_tracker, | |
| ) | |
| # Append educational notes to interpretation if available | |
| if educational_notes: | |
| interpretation += "\n\n### 📚 Educational Notes\n\n" + "\n\n".join( | |
| educational_notes | |
| ) | |
| # Update state | |
| new_state = update_analysis_result(state, "indicators", indicators_result) | |
| new_state = add_agent_message( | |
| new_state, | |
| self.AGENT_NAME, | |
| interpretation, | |
| metadata={ | |
| "indicators": indicators_result, | |
| "chart_paths": chart_paths, | |
| "educational_mode": educational_mode, | |
| }, | |
| ) | |
| execution_time = time.time() - start_time | |
| logger.info( | |
| json.dumps( | |
| { | |
| "agent": self.AGENT_NAME, | |
| "action": "complete", | |
| "ticker": ticker, | |
| "timeframe": timeframe, | |
| "execution_time": execution_time, | |
| "indicators_calculated": list(indicators_result.keys()), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| return new_state | |
| except Exception as e: | |
| execution_time = time.time() - start_time | |
| logger.error( | |
| json.dumps( | |
| { | |
| "agent": self.AGENT_NAME, | |
| "action": "error", | |
| "ticker": ticker, | |
| "timeframe": timeframe, | |
| "execution_time": execution_time, | |
| "error": str(e), | |
| "timestamp": time.time(), | |
| } | |
| ) | |
| ) | |
| # Add error message to state | |
| error_state = add_agent_message( | |
| state, | |
| self.AGENT_NAME, | |
| f"Error calculating indicators: {str(e)}", | |
| metadata={"error": True}, | |
| ) | |
| return error_state | |
| def _calculate_indicators(self, df: pd.DataFrame) -> Dict[str, Any]: | |
| """ | |
| Calculate all technical indicators. | |
| Args: | |
| df: OHLC DataFrame | |
| Returns: | |
| Dict with indicator results | |
| """ | |
| result = {} | |
| # RSI | |
| try: | |
| rsi_series = calculate_rsi( | |
| df, | |
| period=self.indicator_params["rsi_period"], | |
| ) | |
| current_rsi = float(rsi_series.iloc[-1]) | |
| rsi_interpretation = interpret_rsi(current_rsi) | |
| # Try to find divergences, but don't fail if it doesn't work | |
| try: | |
| rsi_divergence = find_rsi_divergence(df, rsi_series) | |
| except Exception: | |
| rsi_divergence = {"bullish": [], "bearish": []} | |
| result["rsi"] = { | |
| "value": current_rsi, | |
| "interpretation": rsi_interpretation, | |
| "divergences": rsi_divergence, | |
| "series": rsi_series.to_dict(), # For charting | |
| } | |
| except Exception as e: | |
| result["rsi"] = {"error": str(e)} | |
| # MACD | |
| try: | |
| logger.info(f"Calculating MACD with {len(df)} data points") | |
| macd, signal, hist = calculate_macd( | |
| df, | |
| fast_period=self.indicator_params["macd_fast"], | |
| slow_period=self.indicator_params["macd_slow"], | |
| signal_period=self.indicator_params["macd_signal"], | |
| ) | |
| logger.info( | |
| f"MACD calculation succeeded: macd_len={len(macd)}, valid_values={(~pd.isna(macd)).sum()}" | |
| ) | |
| current_macd = float(macd.iloc[-1]) if not pd.isna(macd.iloc[-1]) else None | |
| current_signal = ( | |
| float(signal.iloc[-1]) if not pd.isna(signal.iloc[-1]) else None | |
| ) | |
| current_hist = float(hist.iloc[-1]) if not pd.isna(hist.iloc[-1]) else None | |
| prev_hist = ( | |
| float(hist.iloc[-2]) | |
| if len(hist) > 1 and not pd.isna(hist.iloc[-2]) | |
| else None | |
| ) | |
| macd_interpretation = interpret_macd( | |
| current_macd, current_signal, current_hist, prev_hist | |
| ) | |
| # Try to find crossovers and divergences, but don't fail if it doesn't work | |
| try: | |
| macd_crossovers = find_macd_crossovers(macd, signal) | |
| except Exception: | |
| macd_crossovers = {"bullish": [], "bearish": []} | |
| try: | |
| macd_divergence = find_macd_divergence(df, macd) | |
| except Exception: | |
| macd_divergence = {"bullish": [], "bearish": []} | |
| result["macd"] = { | |
| "macd": current_macd, | |
| "signal": current_signal, | |
| "histogram": current_hist, | |
| "interpretation": macd_interpretation, | |
| "crossovers": macd_crossovers, | |
| "divergences": macd_divergence, | |
| "series": { | |
| "macd": macd.to_dict(), | |
| "signal": signal.to_dict(), | |
| "histogram": hist.to_dict(), | |
| }, | |
| } | |
| except Exception as e: | |
| logger.error(f"MACD calculation failed: {str(e)}") | |
| result["macd"] = {"error": str(e)} | |
| # Stochastic Oscillator | |
| try: | |
| k_series, d_series = calculate_stochastic( | |
| df, | |
| k_period=self.indicator_params["stoch_k_period"], | |
| d_period=self.indicator_params["stoch_d_period"], | |
| ) | |
| current_k = ( | |
| float(k_series.iloc[-1]) if not pd.isna(k_series.iloc[-1]) else None | |
| ) | |
| current_d = ( | |
| float(d_series.iloc[-1]) if not pd.isna(d_series.iloc[-1]) else None | |
| ) | |
| prev_k = ( | |
| float(k_series.iloc[-2]) | |
| if len(k_series) > 1 and not pd.isna(k_series.iloc[-2]) | |
| else None | |
| ) | |
| prev_d = ( | |
| float(d_series.iloc[-2]) | |
| if len(d_series) > 1 and not pd.isna(d_series.iloc[-2]) | |
| else None | |
| ) | |
| stoch_interpretation = interpret_stochastic( | |
| current_k, current_d, prev_k, prev_d | |
| ) | |
| stoch_crossovers = find_stochastic_crossovers(k_series, d_series) | |
| result["stochastic"] = { | |
| "k": current_k, | |
| "d": current_d, | |
| "interpretation": stoch_interpretation, | |
| "crossovers": stoch_crossovers, | |
| "series": { | |
| "k": k_series.to_dict(), | |
| "d": d_series.to_dict(), | |
| }, | |
| } | |
| except Exception as e: | |
| result["stochastic"] = {"error": str(e)} | |
| return result | |
| def _interpret_with_llm( | |
| self, | |
| ticker: str, | |
| timeframe: str, | |
| indicators: Dict[str, Any], | |
| df: pd.DataFrame, | |
| investment_style: Optional[str] = None, | |
| cost_tracker=None, | |
| ) -> str: | |
| """ | |
| Use LLM to interpret indicator signals holistically. | |
| Args: | |
| ticker: Asset ticker | |
| timeframe: Analysis timeframe | |
| indicators: Calculated indicators | |
| df: OHLC DataFrame | |
| investment_style: Investment style for context | |
| cost_tracker: Optional cost tracker for tracking LLM costs | |
| Returns: | |
| LLM interpretation string | |
| """ | |
| # Prepare indicator summary | |
| current_price = float(df["close"].iloc[-1]) | |
| summary_parts = [ | |
| f"Asset: {ticker}", | |
| f"Timeframe: {timeframe}", | |
| f"Current Price: ${current_price:.2f}", | |
| "", | |
| "Technical Indicators:", | |
| ] | |
| # RSI | |
| if "rsi" in indicators and "value" in indicators["rsi"]: | |
| rsi = indicators["rsi"] | |
| summary_parts.append( | |
| f"- RSI({self.indicator_params['rsi_period']}): {rsi['value']:.2f}" | |
| ) | |
| summary_parts.append(f" {rsi['interpretation']}") | |
| if rsi.get("divergences", {}).get("bullish"): | |
| summary_parts.append( | |
| f" Bullish divergences detected at indices: {rsi['divergences']['bullish']}" | |
| ) | |
| if rsi.get("divergences", {}).get("bearish"): | |
| summary_parts.append( | |
| f" Bearish divergences detected at indices: {rsi['divergences']['bearish']}" | |
| ) | |
| # MACD | |
| if "macd" in indicators and "macd" in indicators["macd"]: | |
| macd = indicators["macd"] | |
| summary_parts.append( | |
| f"- MACD: {macd['macd']:.4f}, Signal: {macd['signal']:.4f}, Histogram: {macd['histogram']:.4f}" | |
| ) | |
| summary_parts.append(f" {macd['interpretation']}") | |
| if macd.get("crossovers", {}).get("bullish"): | |
| summary_parts.append( | |
| f" Recent bullish crossovers at indices: {macd['crossovers']['bullish'][-3:]}" | |
| ) | |
| if macd.get("crossovers", {}).get("bearish"): | |
| summary_parts.append( | |
| f" Recent bearish crossovers at indices: {macd['crossovers']['bearish'][-3:]}" | |
| ) | |
| # Stochastic | |
| if "stochastic" in indicators and "k" in indicators["stochastic"]: | |
| stoch = indicators["stochastic"] | |
| summary_parts.append( | |
| f"- Stochastic: %K={stoch['k']:.2f}, %D={stoch['d']:.2f}" | |
| ) | |
| summary_parts.append(f" {stoch['interpretation']}") | |
| indicator_summary = "\n".join(summary_parts) | |
| # Get investment style context | |
| style_context = get_technical_analysis_style_context(investment_style) | |
| # LLM prompt with specialized indicator template | |
| system_prompt = f"""{INDICATOR_AGENT_PROMPT} | |
| Investment Style Context: | |
| {style_context} | |
| IMPORTANT: Your response MUST follow the exact structure shown in the template above, including: | |
| - Markdown section headers (##) | |
| - Data tables with proper markdown table syntax (| pipes) | |
| - Bullet-pointed insights (-) | |
| - Numbered summary points (1., 2., 3.) | |
| - Clear conclusion with trading implication""" | |
| user_prompt = f"""Analyze the following technical indicators for {ticker} ({timeframe} timeframe) and provide a comprehensive technical analysis following the template structure: | |
| {indicator_summary} | |
| Generate your response following the exact template structure with all sections, tables, bullet points, and numbered summary.""" | |
| # Call LLM with cost tracking callback | |
| messages = [ | |
| SystemMessage(content=system_prompt), | |
| HumanMessage(content=user_prompt), | |
| ] | |
| # Create callback if cost tracker is available | |
| if cost_tracker: | |
| callback = cost_tracker.get_callback(agent_name=self.AGENT_NAME) | |
| response = self.llm.invoke(messages, config={"callbacks": [callback]}) | |
| else: | |
| response = self.llm.invoke(messages) | |
| return response.content | |
| def _deserialize_dataframe(self, data: Dict[str, Any]) -> pd.DataFrame: | |
| """ | |
| Convert serialized data back to DataFrame. | |
| Args: | |
| data: Serialized DataFrame data | |
| Returns: | |
| pandas DataFrame | |
| """ | |
| # Assuming data is stored as dict with columns | |
| # This will be properly implemented when we serialize in the workflow | |
| df = pd.DataFrame(data) | |
| if "Date" in df.columns: | |
| df["Date"] = pd.to_datetime(df["Date"]) | |
| df = df.set_index("Date") | |
| return df | |