""" Pattern Agent for chart pattern recognition and analysis. This agent identifies candlestick patterns, chart patterns, and support/resistance levels using both algorithmic detection and LLM vision analysis. """ import json import logging import time from typing import Any, Dict, List, Optional import pandas as pd from langchain_core.messages import HumanMessage, SystemMessage # Configure logger logger = logging.getLogger(__name__) from config.default_config import DEFAULT_CONFIG from config.models import AGENT_MODELS from config.prompt_templates import PATTERN_AGENT_PROMPT from graph.state.agent_state import add_agent_message, update_analysis_result from graph.state.trading_state import TechnicalWorkflowState from utils.charts.annotations import ChartAnnotations from utils.investment_style_helpers import ( get_investment_style_from_state, get_technical_analysis_style_context, ) from utils.llm.provider_factory import LLMProviderFactory class PatternAgent: """ Chart Pattern Recognition Agent. Responsibilities: - Identify candlestick patterns (doji, hammer, engulfing, etc.) - Detect chart patterns (triangles, channels, head-and-shoulders) - Find support and resistance levels - Analyze pattern significance and reliability """ AGENT_NAME = "pattern_agent" def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize Pattern Agent. Args: config: Optional configuration override """ self.config = config or DEFAULT_CONFIG # Initialize LLM (needs vision capability for chart analysis) - use runtime provider override if available from config.models import DEFAULT_MODELS_BY_PROVIDER model_config = AGENT_MODELS[self.AGENT_NAME] runtime_provider = self.config.get("llm_provider", model_config["provider"]) # If provider is overridden but model is not, use default model for that provider if "llm_provider" in self.config and "llm_model" not in self.config: runtime_model = DEFAULT_MODELS_BY_PROVIDER.get( runtime_provider, model_config["model"] ) else: runtime_model = self.config.get("llm_model", model_config["model"]) self.llm = LLMProviderFactory.create( provider=runtime_provider, model=runtime_model, temperature=model_config["temperature"], ) def _get_timeframe_significance(self, timeframe: str) -> Dict[str, Any]: """ Get timeframe significance level for pattern analysis. Args: timeframe: Timeframe string (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w) Returns: Dict with significance info """ timeframe_map = { "1m": {"weight": 0.3, "label": "1-minute", "scope": "scalping"}, "5m": {"weight": 0.4, "label": "5-minute", "scope": "scalping"}, "15m": {"weight": 0.5, "label": "15-minute", "scope": "day trading"}, "30m": {"weight": 0.6, "label": "30-minute", "scope": "day trading"}, "1h": {"weight": 0.7, "label": "1-hour", "scope": "swing trading"}, "4h": {"weight": 0.8, "label": "4-hour", "scope": "swing trading"}, "1d": {"weight": 0.9, "label": "daily", "scope": "position trading"}, "1w": {"weight": 1.0, "label": "weekly", "scope": "long-term"}, } return timeframe_map.get( timeframe, {"weight": 0.5, "label": timeframe, "scope": "intraday"} ) def run(self, state: TechnicalWorkflowState) -> TechnicalWorkflowState: """ Execute pattern recognition. Args: state: Current workflow state Returns: Updated state with pattern analysis """ start_time = time.time() ticker = state.get("ticker", "UNKNOWN") timeframe = state.get("timeframe", "UNKNOWN") logger.info( json.dumps( { "agent": self.AGENT_NAME, "action": "start", "ticker": ticker, "timeframe": timeframe, "timestamp": time.time(), } ) ) try: # Extract market data market_data = state["market_data"] if not market_data.get("ohlc_data"): raise ValueError("No OHLC data available for pattern recognition") # Convert serialized DataFrame back to pandas DataFrame df = self._deserialize_dataframe(market_data["ohlc_data"]) # Get timeframe significance for pattern annotation timeframe_info = self._get_timeframe_significance(timeframe) # Detect patterns with timeframe context patterns_result = self._detect_patterns(df, timeframe, timeframe_info) # Find support/resistance levels levels = ChartAnnotations.find_support_resistance_levels( df, window=20, num_levels=3 ) patterns_result["support_levels"] = levels["support"] patterns_result["resistance_levels"] = levels["resistance"] # Get investment style from state investment_style = get_investment_style_from_state(state) # Pattern agent does not generate separate charts # The main candlestick chart above the tabs shows the pricing data # Pattern agent only provides textual analysis of detected patterns chart_paths = [] educational_notes = [] config_dict = state.get("config", {}) educational_mode = config_dict.get("educational_mode", False) chart_patterns = patterns_result.get("chart_patterns", []) candlestick_patterns = patterns_result.get("candlestick_patterns", []) # Generate educational notes for detected patterns if enabled if educational_mode: for pattern in candlestick_patterns[:10]: # Limit to top 10 patterns try: from utils.formatters.educational_content import ( generate_pattern_explanation, ) pattern_name = ( pattern.get("pattern", "") .lower() .replace(" ", "_") .replace("-", "_") ) explanation = generate_pattern_explanation(pattern_name) educational_notes.append( f"**{pattern.get('pattern')}**:\n{explanation}" ) except Exception as e: logger.warning(f"Failed to generate educational note: {e}") # Extract cost tracker from state cost_tracker = state.get("_cost_tracker") # Interpret patterns using LLM interpretation = self._interpret_with_llm( state["ticker"], state["timeframe"], patterns_result, df, investment_style, cost_tracker, ) # Update state new_state = update_analysis_result(state, "patterns", patterns_result) new_state = add_agent_message( new_state, self.AGENT_NAME, interpretation, metadata={ "patterns": patterns_result, "charts": chart_paths, # Changed from "chart_paths" to "charts" for UI compatibility "educational_notes": "\n\n".join(educational_notes) if educational_notes else None, }, ) execution_time = time.time() - start_time logger.info( json.dumps( { "agent": self.AGENT_NAME, "action": "complete", "ticker": ticker, "timeframe": timeframe, "execution_time": execution_time, "candlestick_patterns": len( patterns_result.get("candlestick_patterns", []) ), "chart_patterns": len( patterns_result.get("chart_patterns", []) ), "timestamp": time.time(), } ) ) return new_state except Exception as e: execution_time = time.time() - start_time logger.error( json.dumps( { "agent": self.AGENT_NAME, "action": "error", "ticker": ticker, "timeframe": timeframe, "execution_time": execution_time, "error": str(e), "timestamp": time.time(), } ) ) # Add error message to state error_state = add_agent_message( state, self.AGENT_NAME, f"Error detecting patterns: {str(e)}", metadata={"error": True}, ) return error_state def _detect_patterns( self, df: pd.DataFrame, timeframe: str, timeframe_info: Dict[str, Any] ) -> Dict[str, Any]: """ Detect candlestick and chart patterns with timeframe significance. Args: df: OHLC DataFrame timeframe: Timeframe string (e.g., "1d", "4h") timeframe_info: Timeframe significance info Returns: Dict with pattern results including timeframe context """ result = { "candlestick_patterns": [], "chart_patterns": [], "support_levels": [], "resistance_levels": [], "trend_lines": [], "timeframe_context": { "timeframe": timeframe, "label": timeframe_info["label"], "scope": timeframe_info["scope"], "weight": timeframe_info["weight"], }, } # Detect candlestick patterns with timeframe context candlestick_patterns = self._detect_candlestick_patterns( df, timeframe, timeframe_info ) result["candlestick_patterns"] = candlestick_patterns # Detect chart patterns with timeframe context chart_patterns = self._detect_chart_patterns(df, timeframe, timeframe_info) result["chart_patterns"] = chart_patterns return result def _detect_candlestick_patterns( self, df: pd.DataFrame, timeframe: str, timeframe_info: Dict[str, Any] ) -> List[Dict[str, Any]]: """ Detect common candlestick patterns with timeframe significance. Args: df: OHLC DataFrame timeframe: Timeframe string timeframe_info: Timeframe significance info Returns: List of detected patterns with timeframe context """ patterns = [] # Look at last 10 candles for patterns window = min(10, len(df)) recent_df = df.iloc[-window:] for i in range(len(recent_df)): idx = recent_df.index[i] row = recent_df.iloc[i] open_price = row["open"] close_price = row["close"] high = row["high"] low = row["low"] body_size = abs(close_price - open_price) total_range = high - low # Doji pattern (small body) if total_range > 0 and body_size / total_range < 0.1: patterns.append( { "name": "Doji", "location": len(df) - window + i, "date": str(idx), "signal": "neutral", "confidence": 0.7 * timeframe_info["weight"], # Adjust confidence by timeframe "description": f"Indecision in market on {timeframe_info['label']} chart, potential reversal", "timeframe": timeframe, "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], } ) # Hammer pattern (bullish reversal) if total_range > 0: upper_wick = high - max(open_price, close_price) lower_wick = min(open_price, close_price) - low if lower_wick > 2 * body_size and upper_wick < body_size: patterns.append( { "name": "Hammer", "location": len(df) - window + i, "date": str(idx), "signal": "bullish", "confidence": 0.75 * timeframe_info["weight"], "description": f"Bullish reversal signal on {timeframe_info['label']} chart, sellers exhausted", "timeframe": timeframe, "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], } ) # Shooting Star pattern (bearish reversal) if upper_wick > 2 * body_size and lower_wick < body_size: patterns.append( { "name": "Shooting Star", "location": len(df) - window + i, "date": str(idx), "signal": "bearish", "confidence": 0.75 * timeframe_info["weight"], "description": f"Bearish reversal signal on {timeframe_info['label']} chart, buyers exhausted", "timeframe": timeframe, "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], } ) # Engulfing patterns (need previous candle) if i > 0: prev_row = recent_df.iloc[i - 1] prev_open = prev_row["open"] prev_close = prev_row["close"] # Bullish engulfing if ( prev_close < prev_open # Previous candle bearish and close_price > open_price # Current candle bullish and open_price < prev_close # Opens below previous close and close_price > prev_open ): # Closes above previous open patterns.append( { "name": "Bullish Engulfing", "location": len(df) - window + i, "date": str(idx), "signal": "bullish", "confidence": 0.8 * timeframe_info["weight"], "description": f"Strong bullish reversal on {timeframe_info['label']} chart, buyers taking control", "timeframe": timeframe, "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], } ) # Bearish engulfing if ( prev_close > prev_open # Previous candle bullish and close_price < open_price # Current candle bearish and open_price > prev_close # Opens above previous close and close_price < prev_open ): # Closes below previous open patterns.append( { "name": "Bearish Engulfing", "location": len(df) - window + i, "date": str(idx), "signal": "bearish", "confidence": 0.8 * timeframe_info["weight"], "description": f"Strong bearish reversal on {timeframe_info['label']} chart, sellers taking control", "timeframe": timeframe, "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], } ) return patterns def _detect_chart_patterns( self, df: pd.DataFrame, timeframe: str, timeframe_info: Dict[str, Any] ) -> List[Dict[str, Any]]: """ Detect chart patterns like triangles, channels, head-and-shoulders. This is a simplified algorithmic approach. In production, this would use more sophisticated pattern recognition algorithms or TA-Lib. Args: df: OHLC DataFrame timeframe: Timeframe string timeframe_info: Timeframe significance info Returns: List of detected chart patterns with timeframe context """ patterns = [] # Check for ascending triangle (flat resistance, rising support) if len(df) >= 20: recent_highs = df["high"].iloc[-20:] recent_lows = df["low"].iloc[-20:] # Flat top (resistance) high_std = recent_highs.std() high_mean = recent_highs.mean() # Rising lows (support) first_half_lows = recent_lows.iloc[:10].mean() second_half_lows = recent_lows.iloc[10:].mean() if ( high_std / high_mean < 0.02 and second_half_lows > first_half_lows * 1.01 ): patterns.append( { "type": "Ascending Triangle", "confidence": 0.65 * timeframe_info["weight"], "signal": "bullish", "description": f"Bullish continuation pattern on {timeframe_info['label']} chart, breakout likely upward", "resistance": float(high_mean), "support_trend": "rising", "timeframe": timeframe, "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], } ) # Descending triangle (flat support, falling resistance) low_std = recent_lows.std() low_mean = recent_lows.mean() first_half_highs = recent_highs.iloc[:10].mean() second_half_highs = recent_highs.iloc[10:].mean() if ( low_std / low_mean < 0.02 and second_half_highs < first_half_highs * 0.99 ): patterns.append( { "type": "Descending Triangle", "confidence": 0.65 * timeframe_info["weight"], "signal": "bearish", "description": f"Bearish continuation pattern on {timeframe_info['label']} chart, breakout likely downward", "support": float(low_mean), "resistance_trend": "falling", "timeframe": timeframe, "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], } ) # Check for head-and-shoulders pattern if len(df) >= 30: head_shoulder_pattern = self._detect_head_and_shoulders(df, timeframe_info) if head_shoulder_pattern: patterns.append(head_shoulder_pattern) logger.info( f"Detected head-and-shoulders pattern: {head_shoulder_pattern}" ) else: logger.debug("No head-and-shoulders pattern detected") # Check for double-bottom or double-top patterns if len(df) >= 20: double_patterns = self._detect_double_bottom_top(df, timeframe_info) if double_patterns: logger.info( f"Detected {len(double_patterns)} double-bottom/top patterns" ) patterns.extend(double_patterns) else: logger.debug("No double-bottom/top patterns detected") logger.info(f"Total chart patterns detected: {len(patterns)}") return patterns def _detect_head_and_shoulders( self, df: pd.DataFrame, timeframe_info: Dict[str, Any] ) -> Optional[Dict[str, Any]]: """ Detect head-and-shoulders pattern. Args: df: OHLC DataFrame timeframe_info: Timeframe significance info Returns: Pattern dict if detected, None otherwise """ # Look at last 30 candlesticks window = df.iloc[-30:] highs = window["high"].values lows = window["low"].values # Find local maxima (potential shoulders and head) import numpy as np from scipy.signal import argrelextrema try: peaks = argrelextrema(highs, np.greater, order=3)[0] if len(peaks) >= 3: # Check if we have a head-and-shoulders pattern # (left shoulder, head, right shoulder pattern) for i in range(len(peaks) - 2): left_idx = peaks[i] head_idx = peaks[i + 1] right_idx = peaks[i + 2] left_price = highs[left_idx] head_price = highs[head_idx] right_price = highs[right_idx] # Head should be higher than both shoulders # Shoulders should be roughly equal (within 3%) if ( head_price > left_price and head_price > right_price and abs(left_price - right_price) / left_price < 0.03 ): # Find neckline (lows between peaks) valley1_idx = left_idx + np.argmin(lows[left_idx:head_idx]) valley2_idx = head_idx + np.argmin(lows[head_idx:right_idx]) neckline_price = (lows[valley1_idx] + lows[valley2_idx]) / 2 return { "type": "Head and Shoulders", "confidence": 0.70 * timeframe_info["weight"], "signal": "bearish", "description": f"Bearish reversal pattern on {timeframe_info['label']} chart, breakdown likely if neckline breaks", "timeframe": window.index[0].strftime("%Y-%m-%d"), "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], "points": { "left_shoulder": int(left_idx), "head": int(head_idx), "right_shoulder": int(right_idx), "neckline": [ (valley1_idx, neckline_price), (valley2_idx, neckline_price), ], }, } except Exception: pass # scipy not available or pattern not found return None def _detect_double_bottom_top( self, df: pd.DataFrame, timeframe_info: Dict[str, Any] ) -> List[Dict[str, Any]]: """ Detect double-bottom or double-top patterns. Args: df: OHLC DataFrame timeframe_info: Timeframe significance info Returns: List of detected patterns """ patterns = [] window = df.iloc[-20:] try: import numpy as np from scipy.signal import argrelextrema # Detect double bottom (two lows at similar levels) lows = window["low"].values troughs = argrelextrema(lows, np.less, order=2)[0] if len(troughs) >= 2: # Check last two troughs for double bottom first_idx = troughs[-2] second_idx = troughs[-1] first_price = lows[first_idx] second_price = lows[second_idx] # Prices should be within 2% of each other if abs(first_price - second_price) / first_price < 0.02: # Find resistance (peak between the troughs) middle_peak_idx = first_idx + np.argmax( window["high"].values[first_idx:second_idx] ) resistance = window["high"].values[middle_peak_idx] patterns.append( { "type": "Double Bottom", "confidence": 0.70 * timeframe_info["weight"], "signal": "bullish", "description": f"Bullish reversal pattern on {timeframe_info['label']} chart, breakout likely if resistance breaks", "timeframe": window.index[0].strftime("%Y-%m-%d"), "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], "points": { "first": int(first_idx), "second": int(second_idx), "resistance_support": float(resistance), }, } ) # Detect double top (two highs at similar levels) highs = window["high"].values peaks = argrelextrema(highs, np.greater, order=2)[0] if len(peaks) >= 2: # Check last two peaks for double top first_idx = peaks[-2] second_idx = peaks[-1] first_price = highs[first_idx] second_price = highs[second_idx] # Prices should be within 2% of each other if abs(first_price - second_price) / first_price < 0.02: # Find support (trough between the peaks) middle_trough_idx = first_idx + np.argmin( window["low"].values[first_idx:second_idx] ) support = window["low"].values[middle_trough_idx] patterns.append( { "type": "Double Top", "confidence": 0.70 * timeframe_info["weight"], "signal": "bearish", "description": f"Bearish reversal pattern on {timeframe_info['label']} chart, breakdown likely if support breaks", "timeframe": window.index[0].strftime("%Y-%m-%d"), "timeframe_label": timeframe_info["label"], "significance": timeframe_info["scope"], "points": { "first": int(first_idx), "second": int(second_idx), "resistance_support": float(support), }, } ) except Exception: pass # scipy not available or patterns not found return patterns def _interpret_with_llm( self, ticker: str, timeframe: str, patterns: Dict[str, Any], df: pd.DataFrame, investment_style: Optional[str] = None, cost_tracker=None, ) -> str: """ Use LLM to interpret pattern significance. Args: ticker: Asset ticker timeframe: Analysis timeframe patterns: Detected patterns df: OHLC DataFrame investment_style: Investment style for context cost_tracker: Optional cost tracker for tracking LLM costs Returns: LLM interpretation string """ current_price = float(df["close"].iloc[-1]) summary_parts = [ f"Asset: {ticker}", f"Timeframe: {timeframe}", f"Current Price: ${current_price:.2f}", "", "Pattern Analysis:", ] # Candlestick patterns if patterns.get("candlestick_patterns"): summary_parts.append("\nCandlestick Patterns:") for pattern in patterns["candlestick_patterns"]: summary_parts.append( f"- {pattern['name']} ({pattern['signal']}, confidence: {pattern['confidence']:.0%})" ) summary_parts.append(f" {pattern['description']}") else: summary_parts.append("\nNo significant candlestick patterns detected") # Chart patterns if patterns.get("chart_patterns"): summary_parts.append("\nChart Patterns:") for pattern in patterns["chart_patterns"]: summary_parts.append( f"- {pattern['type']} ({pattern['signal']}, confidence: {pattern['confidence']:.0%})" ) summary_parts.append(f" {pattern['description']}") else: summary_parts.append("\nNo major chart patterns detected") # Support/Resistance levels if patterns.get("support_levels"): summary_parts.append( f"\nSupport Levels: {[f'${s:.2f}' for s in patterns['support_levels']]}" ) if patterns.get("resistance_levels"): summary_parts.append( f"Resistance Levels: {[f'${r:.2f}' for r in patterns['resistance_levels']]}" ) pattern_summary = "\n".join(summary_parts) # Get investment style context style_context = get_technical_analysis_style_context(investment_style) # LLM prompt with specialized pattern template system_prompt = f"""{PATTERN_AGENT_PROMPT} Investment Style Context: {style_context} IMPORTANT: Your response MUST follow the exact structure shown in the template above, including: - Markdown section headers (##) - Data tables with proper markdown table syntax (| pipes) - Bullet-pointed insights (-) - Numbered summary points (1., 2., 3.) - Clear conclusion with trading implication""" user_prompt = f"""Analyze the following pattern data for {ticker} ({timeframe} timeframe) and provide a comprehensive technical analysis following the template structure: {pattern_summary} Generate your response following the exact template structure with all sections, tables, bullet points, and numbered summary.""" # Call LLM with cost tracking callback messages = [ SystemMessage(content=system_prompt), HumanMessage(content=user_prompt), ] # Create callback if cost tracker is available if cost_tracker: callback = cost_tracker.get_callback(agent_name=self.AGENT_NAME) response = self.llm.invoke(messages, config={"callbacks": [callback]}) else: response = self.llm.invoke(messages) return response.content def _deserialize_dataframe(self, data: Dict[str, Any]) -> pd.DataFrame: """ Convert serialized data back to DataFrame. Args: data: Serialized DataFrame data Returns: pandas DataFrame """ df = pd.DataFrame(data) if "Date" in df.columns: df["Date"] = pd.to_datetime(df["Date"]) df = df.set_index("Date") return df