# trading-tools/agents/technical/pattern_agent.py
# NOTE(review): the following metadata was HuggingFace Spaces page residue
# pasted above the module docstring ("Deploy Bot", "Deploy Trading Analysis
# Platform to HuggingFace Spaces", commit a1bf219). It was not valid Python
# and broke the module at import time; preserved here as a comment.
"""
Pattern Agent for chart pattern recognition and analysis.
This agent identifies candlestick patterns, chart patterns, and support/resistance levels
using both algorithmic detection and LLM vision analysis.
"""
import json
import logging
import time
from typing import Any, Dict, List, Optional
import pandas as pd
from langchain_core.messages import HumanMessage, SystemMessage
# Configure logger
logger = logging.getLogger(__name__)
from config.default_config import DEFAULT_CONFIG
from config.models import AGENT_MODELS
from config.prompt_templates import PATTERN_AGENT_PROMPT
from graph.state.agent_state import add_agent_message, update_analysis_result
from graph.state.trading_state import TechnicalWorkflowState
from utils.charts.annotations import ChartAnnotations
from utils.investment_style_helpers import (
get_investment_style_from_state,
get_technical_analysis_style_context,
)
from utils.llm.provider_factory import LLMProviderFactory
class PatternAgent:
    """
    Chart Pattern Recognition Agent.

    Responsibilities:
    - Identify candlestick patterns (doji, hammer, engulfing, etc.)
    - Detect chart patterns (triangles, channels, head-and-shoulders)
    - Find support and resistance levels
    - Analyze pattern significance and reliability

    Pattern confidences are scaled by a per-timeframe weight (see
    :meth:`_get_timeframe_significance`) so higher-timeframe patterns
    carry more weight in downstream aggregation.
    """

    AGENT_NAME = "pattern_agent"

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize Pattern Agent.

        Args:
            config: Optional configuration override. Recognized keys:
                "llm_provider" and "llm_model" override the defaults
                configured in AGENT_MODELS for this agent.
        """
        self.config = config or DEFAULT_CONFIG
        # Initialize LLM (needs vision capability for chart analysis) - use
        # runtime provider override if available.
        from config.models import DEFAULT_MODELS_BY_PROVIDER

        model_config = AGENT_MODELS[self.AGENT_NAME]
        runtime_provider = self.config.get("llm_provider", model_config["provider"])
        # If the provider is overridden but the model is not, fall back to that
        # provider's default model -- the statically configured model may not
        # exist on the overridden provider.
        if "llm_provider" in self.config and "llm_model" not in self.config:
            runtime_model = DEFAULT_MODELS_BY_PROVIDER.get(
                runtime_provider, model_config["model"]
            )
        else:
            runtime_model = self.config.get("llm_model", model_config["model"])
        self.llm = LLMProviderFactory.create(
            provider=runtime_provider,
            model=runtime_model,
            temperature=model_config["temperature"],
        )

    def _get_timeframe_significance(self, timeframe: str) -> Dict[str, Any]:
        """
        Get timeframe significance level for pattern analysis.

        Args:
            timeframe: Timeframe string (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w)

        Returns:
            Dict with "weight" (0-1 confidence multiplier), "label"
            (human-readable name) and "scope" (trading style it serves).
            Unknown timeframes fall back to a neutral intraday entry that
            echoes the input string as the label.
        """
        timeframe_map = {
            "1m": {"weight": 0.3, "label": "1-minute", "scope": "scalping"},
            "5m": {"weight": 0.4, "label": "5-minute", "scope": "scalping"},
            "15m": {"weight": 0.5, "label": "15-minute", "scope": "day trading"},
            "30m": {"weight": 0.6, "label": "30-minute", "scope": "day trading"},
            "1h": {"weight": 0.7, "label": "1-hour", "scope": "swing trading"},
            "4h": {"weight": 0.8, "label": "4-hour", "scope": "swing trading"},
            "1d": {"weight": 0.9, "label": "daily", "scope": "position trading"},
            "1w": {"weight": 1.0, "label": "weekly", "scope": "long-term"},
        }
        return timeframe_map.get(
            timeframe, {"weight": 0.5, "label": timeframe, "scope": "intraday"}
        )

    def run(self, state: "TechnicalWorkflowState") -> "TechnicalWorkflowState":
        """
        Execute pattern recognition.

        Args:
            state: Current workflow state

        Returns:
            Updated state with pattern analysis; on failure an error
            message is appended to the state instead of raising.
        """
        start_time = time.time()
        ticker = state.get("ticker", "UNKNOWN")
        timeframe = state.get("timeframe", "UNKNOWN")
        logger.info(
            json.dumps(
                {
                    "agent": self.AGENT_NAME,
                    "action": "start",
                    "ticker": ticker,
                    "timeframe": timeframe,
                    "timestamp": time.time(),
                }
            )
        )
        try:
            # Extract market data
            market_data = state["market_data"]
            if not market_data.get("ohlc_data"):
                raise ValueError("No OHLC data available for pattern recognition")
            # Convert serialized DataFrame back to pandas DataFrame
            df = self._deserialize_dataframe(market_data["ohlc_data"])
            # Get timeframe significance for pattern annotation
            timeframe_info = self._get_timeframe_significance(timeframe)
            # Detect patterns with timeframe context
            patterns_result = self._detect_patterns(df, timeframe, timeframe_info)
            # Find support/resistance levels
            levels = ChartAnnotations.find_support_resistance_levels(
                df, window=20, num_levels=3
            )
            patterns_result["support_levels"] = levels["support"]
            patterns_result["resistance_levels"] = levels["resistance"]
            # Get investment style from state
            investment_style = get_investment_style_from_state(state)
            # Pattern agent does not generate separate charts
            # The main candlestick chart above the tabs shows the pricing data
            # Pattern agent only provides textual analysis of detected patterns
            chart_paths = []
            educational_notes = []
            config_dict = state.get("config", {})
            educational_mode = config_dict.get("educational_mode", False)
            candlestick_patterns = patterns_result.get("candlestick_patterns", [])
            # Generate educational notes for detected patterns if enabled
            if educational_mode:
                for pattern in candlestick_patterns[:10]:  # Limit to top 10 patterns
                    try:
                        from utils.formatters.educational_content import (
                            generate_pattern_explanation,
                        )

                        # BUG FIX: candlestick pattern dicts store the display
                        # name under "name" (see _detect_candlestick_patterns);
                        # the previous code read the nonexistent "pattern" key,
                        # so every lookup was empty and headings showed "None".
                        display_name = pattern.get("name", "")
                        pattern_key = (
                            display_name.lower().replace(" ", "_").replace("-", "_")
                        )
                        explanation = generate_pattern_explanation(pattern_key)
                        educational_notes.append(
                            f"**{display_name}**:\n{explanation}"
                        )
                    except Exception as e:
                        logger.warning(f"Failed to generate educational note: {e}")
            # Extract cost tracker from state
            cost_tracker = state.get("_cost_tracker")
            # Interpret patterns using LLM (use the defensive locals fetched
            # above rather than re-indexing the state dict)
            interpretation = self._interpret_with_llm(
                ticker,
                timeframe,
                patterns_result,
                df,
                investment_style,
                cost_tracker,
            )
            # Update state
            new_state = update_analysis_result(state, "patterns", patterns_result)
            new_state = add_agent_message(
                new_state,
                self.AGENT_NAME,
                interpretation,
                metadata={
                    "patterns": patterns_result,
                    "charts": chart_paths,  # Changed from "chart_paths" to "charts" for UI compatibility
                    "educational_notes": "\n\n".join(educational_notes)
                    if educational_notes
                    else None,
                },
            )
            execution_time = time.time() - start_time
            logger.info(
                json.dumps(
                    {
                        "agent": self.AGENT_NAME,
                        "action": "complete",
                        "ticker": ticker,
                        "timeframe": timeframe,
                        "execution_time": execution_time,
                        "candlestick_patterns": len(
                            patterns_result.get("candlestick_patterns", [])
                        ),
                        "chart_patterns": len(
                            patterns_result.get("chart_patterns", [])
                        ),
                        "timestamp": time.time(),
                    }
                )
            )
            return new_state
        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(
                json.dumps(
                    {
                        "agent": self.AGENT_NAME,
                        "action": "error",
                        "ticker": ticker,
                        "timeframe": timeframe,
                        "execution_time": execution_time,
                        "error": str(e),
                        "timestamp": time.time(),
                    }
                )
            )
            # Add error message to state
            error_state = add_agent_message(
                state,
                self.AGENT_NAME,
                f"Error detecting patterns: {str(e)}",
                metadata={"error": True},
            )
            return error_state

    def _detect_patterns(
        self, df: pd.DataFrame, timeframe: str, timeframe_info: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Detect candlestick and chart patterns with timeframe significance.

        Args:
            df: OHLC DataFrame
            timeframe: Timeframe string (e.g., "1d", "4h")
            timeframe_info: Timeframe significance info

        Returns:
            Dict with pattern results including timeframe context.
            "support_levels"/"resistance_levels"/"trend_lines" are
            initialized empty; support/resistance are filled in by run().
        """
        result = {
            "candlestick_patterns": [],
            "chart_patterns": [],
            "support_levels": [],
            "resistance_levels": [],
            "trend_lines": [],
            "timeframe_context": {
                "timeframe": timeframe,
                "label": timeframe_info["label"],
                "scope": timeframe_info["scope"],
                "weight": timeframe_info["weight"],
            },
        }
        # Detect candlestick patterns with timeframe context
        result["candlestick_patterns"] = self._detect_candlestick_patterns(
            df, timeframe, timeframe_info
        )
        # Detect chart patterns with timeframe context
        result["chart_patterns"] = self._detect_chart_patterns(
            df, timeframe, timeframe_info
        )
        return result

    def _detect_candlestick_patterns(
        self, df: pd.DataFrame, timeframe: str, timeframe_info: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Detect common candlestick patterns with timeframe significance.

        Scans the last 10 candles for doji, hammer, shooting star, and
        bullish/bearish engulfing formations using simple body/wick ratios.

        Args:
            df: OHLC DataFrame with "open"/"high"/"low"/"close" columns
            timeframe: Timeframe string
            timeframe_info: Timeframe significance info

        Returns:
            List of detected patterns with timeframe context; each dict has
            "name", "location" (index into df), "date", "signal",
            "confidence" (base confidence * timeframe weight), and context.
        """
        patterns = []
        # Look at last 10 candles for patterns
        window = min(10, len(df))
        recent_df = df.iloc[-window:]
        for i in range(len(recent_df)):
            idx = recent_df.index[i]
            row = recent_df.iloc[i]
            open_price = row["open"]
            close_price = row["close"]
            high = row["high"]
            low = row["low"]
            body_size = abs(close_price - open_price)
            total_range = high - low
            # Doji pattern (small body relative to total range)
            if total_range > 0 and body_size / total_range < 0.1:
                patterns.append(
                    {
                        "name": "Doji",
                        "location": len(df) - window + i,
                        "date": str(idx),
                        "signal": "neutral",
                        "confidence": 0.7
                        * timeframe_info["weight"],  # Adjust confidence by timeframe
                        "description": f"Indecision in market on {timeframe_info['label']} chart, potential reversal",
                        "timeframe": timeframe,
                        "timeframe_label": timeframe_info["label"],
                        "significance": timeframe_info["scope"],
                    }
                )
            # Hammer pattern (bullish reversal)
            if total_range > 0:
                upper_wick = high - max(open_price, close_price)
                lower_wick = min(open_price, close_price) - low
                if lower_wick > 2 * body_size and upper_wick < body_size:
                    patterns.append(
                        {
                            "name": "Hammer",
                            "location": len(df) - window + i,
                            "date": str(idx),
                            "signal": "bullish",
                            "confidence": 0.75 * timeframe_info["weight"],
                            "description": f"Bullish reversal signal on {timeframe_info['label']} chart, sellers exhausted",
                            "timeframe": timeframe,
                            "timeframe_label": timeframe_info["label"],
                            "significance": timeframe_info["scope"],
                        }
                    )
                # Shooting Star pattern (bearish reversal)
                if upper_wick > 2 * body_size and lower_wick < body_size:
                    patterns.append(
                        {
                            "name": "Shooting Star",
                            "location": len(df) - window + i,
                            "date": str(idx),
                            "signal": "bearish",
                            "confidence": 0.75 * timeframe_info["weight"],
                            "description": f"Bearish reversal signal on {timeframe_info['label']} chart, buyers exhausted",
                            "timeframe": timeframe,
                            "timeframe_label": timeframe_info["label"],
                            "significance": timeframe_info["scope"],
                        }
                    )
            # Engulfing patterns (need previous candle)
            if i > 0:
                prev_row = recent_df.iloc[i - 1]
                prev_open = prev_row["open"]
                prev_close = prev_row["close"]
                # Bullish engulfing
                if (
                    prev_close < prev_open  # Previous candle bearish
                    and close_price > open_price  # Current candle bullish
                    and open_price < prev_close  # Opens below previous close
                    and close_price > prev_open
                ):  # Closes above previous open
                    patterns.append(
                        {
                            "name": "Bullish Engulfing",
                            "location": len(df) - window + i,
                            "date": str(idx),
                            "signal": "bullish",
                            "confidence": 0.8 * timeframe_info["weight"],
                            "description": f"Strong bullish reversal on {timeframe_info['label']} chart, buyers taking control",
                            "timeframe": timeframe,
                            "timeframe_label": timeframe_info["label"],
                            "significance": timeframe_info["scope"],
                        }
                    )
                # Bearish engulfing
                if (
                    prev_close > prev_open  # Previous candle bullish
                    and close_price < open_price  # Current candle bearish
                    and open_price > prev_close  # Opens above previous close
                    and close_price < prev_open
                ):  # Closes below previous open
                    patterns.append(
                        {
                            "name": "Bearish Engulfing",
                            "location": len(df) - window + i,
                            "date": str(idx),
                            "signal": "bearish",
                            "confidence": 0.8 * timeframe_info["weight"],
                            "description": f"Strong bearish reversal on {timeframe_info['label']} chart, sellers taking control",
                            "timeframe": timeframe,
                            "timeframe_label": timeframe_info["label"],
                            "significance": timeframe_info["scope"],
                        }
                    )
        return patterns

    def _detect_chart_patterns(
        self, df: pd.DataFrame, timeframe: str, timeframe_info: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Detect chart patterns like triangles, channels, head-and-shoulders.

        This is a simplified algorithmic approach. In production, this would
        use more sophisticated pattern recognition algorithms or TA-Lib.

        Args:
            df: OHLC DataFrame
            timeframe: Timeframe string
            timeframe_info: Timeframe significance info

        Returns:
            List of detected chart patterns with timeframe context
        """
        patterns = []
        # Check for ascending triangle (flat resistance, rising support)
        if len(df) >= 20:
            recent_highs = df["high"].iloc[-20:]
            recent_lows = df["low"].iloc[-20:]
            # Flat top (resistance)
            high_std = recent_highs.std()
            high_mean = recent_highs.mean()
            # Rising lows (support)
            first_half_lows = recent_lows.iloc[:10].mean()
            second_half_lows = recent_lows.iloc[10:].mean()
            # Guard against degenerate all-zero price data before dividing
            if (
                high_mean > 0
                and high_std / high_mean < 0.02
                and second_half_lows > first_half_lows * 1.01
            ):
                patterns.append(
                    {
                        "type": "Ascending Triangle",
                        "confidence": 0.65 * timeframe_info["weight"],
                        "signal": "bullish",
                        "description": f"Bullish continuation pattern on {timeframe_info['label']} chart, breakout likely upward",
                        "resistance": float(high_mean),
                        "support_trend": "rising",
                        "timeframe": timeframe,
                        "timeframe_label": timeframe_info["label"],
                        "significance": timeframe_info["scope"],
                    }
                )
            # Descending triangle (flat support, falling resistance)
            low_std = recent_lows.std()
            low_mean = recent_lows.mean()
            first_half_highs = recent_highs.iloc[:10].mean()
            second_half_highs = recent_highs.iloc[10:].mean()
            if (
                low_mean > 0
                and low_std / low_mean < 0.02
                and second_half_highs < first_half_highs * 0.99
            ):
                patterns.append(
                    {
                        "type": "Descending Triangle",
                        "confidence": 0.65 * timeframe_info["weight"],
                        "signal": "bearish",
                        "description": f"Bearish continuation pattern on {timeframe_info['label']} chart, breakout likely downward",
                        "support": float(low_mean),
                        "resistance_trend": "falling",
                        "timeframe": timeframe,
                        "timeframe_label": timeframe_info["label"],
                        "significance": timeframe_info["scope"],
                    }
                )
        # Check for head-and-shoulders pattern
        if len(df) >= 30:
            head_shoulder_pattern = self._detect_head_and_shoulders(
                df, timeframe_info, timeframe
            )
            if head_shoulder_pattern:
                patterns.append(head_shoulder_pattern)
                logger.info(
                    f"Detected head-and-shoulders pattern: {head_shoulder_pattern}"
                )
            else:
                logger.debug("No head-and-shoulders pattern detected")
        # Check for double-bottom or double-top patterns
        if len(df) >= 20:
            double_patterns = self._detect_double_bottom_top(
                df, timeframe_info, timeframe
            )
            if double_patterns:
                logger.info(
                    f"Detected {len(double_patterns)} double-bottom/top patterns"
                )
                patterns.extend(double_patterns)
            else:
                logger.debug("No double-bottom/top patterns detected")
        logger.info(f"Total chart patterns detected: {len(patterns)}")
        return patterns

    @staticmethod
    def _format_window_start(window: pd.DataFrame) -> str:
        """Return the window's first index label as YYYY-MM-DD when it is a
        datetime, or its plain string form otherwise (e.g. a RangeIndex)."""
        start = window.index[0]
        if hasattr(start, "strftime"):
            return start.strftime("%Y-%m-%d")
        return str(start)

    def _detect_head_and_shoulders(
        self,
        df: pd.DataFrame,
        timeframe_info: Dict[str, Any],
        timeframe: str = "",
    ) -> Optional[Dict[str, Any]]:
        """
        Detect head-and-shoulders pattern.

        Args:
            df: OHLC DataFrame
            timeframe_info: Timeframe significance info
            timeframe: Timeframe string (e.g. "1d"); stored on the pattern
                dict for consistency with the other pattern dicts.

        Returns:
            Pattern dict if detected, None otherwise (including when scipy
            is unavailable).
        """
        # Look at last 30 candlesticks
        window = df.iloc[-30:]
        highs = window["high"].values
        lows = window["low"].values
        # Find local maxima (potential shoulders and head)
        import numpy as np

        try:
            from scipy.signal import argrelextrema

            peaks = argrelextrema(highs, np.greater, order=3)[0]
            if len(peaks) >= 3:
                # Check if we have a head-and-shoulders pattern
                # (left shoulder, head, right shoulder pattern)
                for i in range(len(peaks) - 2):
                    left_idx = peaks[i]
                    head_idx = peaks[i + 1]
                    right_idx = peaks[i + 2]
                    left_price = highs[left_idx]
                    head_price = highs[head_idx]
                    right_price = highs[right_idx]
                    # Head should be higher than both shoulders
                    # Shoulders should be roughly equal (within 3%)
                    if (
                        head_price > left_price
                        and head_price > right_price
                        and abs(left_price - right_price) / left_price < 0.03
                    ):
                        # Find neckline (lows between peaks)
                        valley1_idx = left_idx + np.argmin(lows[left_idx:head_idx])
                        valley2_idx = head_idx + np.argmin(lows[head_idx:right_idx])
                        neckline_price = (lows[valley1_idx] + lows[valley2_idx]) / 2
                        return {
                            "type": "Head and Shoulders",
                            "confidence": 0.70 * timeframe_info["weight"],
                            "signal": "bearish",
                            "description": f"Bearish reversal pattern on {timeframe_info['label']} chart, breakdown likely if neckline breaks",
                            # BUG FIX: "timeframe" used to hold the window's
                            # start date, inconsistent with every other
                            # pattern dict; the date now lives in "start_date".
                            "timeframe": timeframe,
                            "start_date": self._format_window_start(window),
                            "timeframe_label": timeframe_info["label"],
                            "significance": timeframe_info["scope"],
                            "points": {
                                "left_shoulder": int(left_idx),
                                "head": int(head_idx),
                                "right_shoulder": int(right_idx),
                                "neckline": [
                                    (valley1_idx, neckline_price),
                                    (valley2_idx, neckline_price),
                                ],
                            },
                        }
        except Exception as exc:
            # Best-effort detection: scipy may be unavailable or the data
            # malformed. Log instead of silently swallowing so real bugs
            # remain diagnosable.
            logger.debug(f"Head-and-shoulders detection skipped: {exc}")
        return None

    def _detect_double_bottom_top(
        self,
        df: pd.DataFrame,
        timeframe_info: Dict[str, Any],
        timeframe: str = "",
    ) -> List[Dict[str, Any]]:
        """
        Detect double-bottom or double-top patterns.

        Args:
            df: OHLC DataFrame
            timeframe_info: Timeframe significance info
            timeframe: Timeframe string (e.g. "1d"); stored on the pattern
                dict for consistency with the other pattern dicts.

        Returns:
            List of detected patterns (empty when nothing is found or
            scipy is unavailable).
        """
        patterns = []
        window = df.iloc[-20:]
        try:
            import numpy as np
            from scipy.signal import argrelextrema

            # Detect double bottom (two lows at similar levels)
            lows = window["low"].values
            troughs = argrelextrema(lows, np.less, order=2)[0]
            if len(troughs) >= 2:
                # Check last two troughs for double bottom
                first_idx = troughs[-2]
                second_idx = troughs[-1]
                first_price = lows[first_idx]
                second_price = lows[second_idx]
                # Prices should be within 2% of each other (guard div-by-zero)
                if (
                    first_price > 0
                    and abs(first_price - second_price) / first_price < 0.02
                ):
                    # Find resistance (peak between the troughs)
                    middle_peak_idx = first_idx + np.argmax(
                        window["high"].values[first_idx:second_idx]
                    )
                    resistance = window["high"].values[middle_peak_idx]
                    patterns.append(
                        {
                            "type": "Double Bottom",
                            "confidence": 0.70 * timeframe_info["weight"],
                            "signal": "bullish",
                            "description": f"Bullish reversal pattern on {timeframe_info['label']} chart, breakout likely if resistance breaks",
                            # BUG FIX: timeframe string here, start date moved
                            # to "start_date" (was misfiled under "timeframe").
                            "timeframe": timeframe,
                            "start_date": self._format_window_start(window),
                            "timeframe_label": timeframe_info["label"],
                            "significance": timeframe_info["scope"],
                            "points": {
                                "first": int(first_idx),
                                "second": int(second_idx),
                                "resistance_support": float(resistance),
                            },
                        }
                    )
            # Detect double top (two highs at similar levels)
            highs = window["high"].values
            peaks = argrelextrema(highs, np.greater, order=2)[0]
            if len(peaks) >= 2:
                # Check last two peaks for double top
                first_idx = peaks[-2]
                second_idx = peaks[-1]
                first_price = highs[first_idx]
                second_price = highs[second_idx]
                # Prices should be within 2% of each other (guard div-by-zero)
                if (
                    first_price > 0
                    and abs(first_price - second_price) / first_price < 0.02
                ):
                    # Find support (trough between the peaks)
                    middle_trough_idx = first_idx + np.argmin(
                        window["low"].values[first_idx:second_idx]
                    )
                    support = window["low"].values[middle_trough_idx]
                    patterns.append(
                        {
                            "type": "Double Top",
                            "confidence": 0.70 * timeframe_info["weight"],
                            "signal": "bearish",
                            "description": f"Bearish reversal pattern on {timeframe_info['label']} chart, breakdown likely if support breaks",
                            "timeframe": timeframe,
                            "start_date": self._format_window_start(window),
                            "timeframe_label": timeframe_info["label"],
                            "significance": timeframe_info["scope"],
                            "points": {
                                "first": int(first_idx),
                                "second": int(second_idx),
                                "resistance_support": float(support),
                            },
                        }
                    )
        except Exception as exc:
            # Best-effort: scipy may be unavailable or the data malformed.
            logger.debug(f"Double-bottom/top detection skipped: {exc}")
        return patterns

    def _interpret_with_llm(
        self,
        ticker: str,
        timeframe: str,
        patterns: Dict[str, Any],
        df: pd.DataFrame,
        investment_style: Optional[str] = None,
        cost_tracker=None,
    ) -> str:
        """
        Use LLM to interpret pattern significance.

        Args:
            ticker: Asset ticker
            timeframe: Analysis timeframe
            patterns: Detected patterns
            df: OHLC DataFrame
            investment_style: Investment style for context
            cost_tracker: Optional cost tracker for tracking LLM costs

        Returns:
            LLM interpretation string
        """
        current_price = float(df["close"].iloc[-1])
        summary_parts = [
            f"Asset: {ticker}",
            f"Timeframe: {timeframe}",
            f"Current Price: ${current_price:.2f}",
            "",
            "Pattern Analysis:",
        ]
        # Candlestick patterns
        if patterns.get("candlestick_patterns"):
            summary_parts.append("\nCandlestick Patterns:")
            for pattern in patterns["candlestick_patterns"]:
                summary_parts.append(
                    f"- {pattern['name']} ({pattern['signal']}, confidence: {pattern['confidence']:.0%})"
                )
                summary_parts.append(f"  {pattern['description']}")
        else:
            summary_parts.append("\nNo significant candlestick patterns detected")
        # Chart patterns
        if patterns.get("chart_patterns"):
            summary_parts.append("\nChart Patterns:")
            for pattern in patterns["chart_patterns"]:
                summary_parts.append(
                    f"- {pattern['type']} ({pattern['signal']}, confidence: {pattern['confidence']:.0%})"
                )
                summary_parts.append(f"  {pattern['description']}")
        else:
            summary_parts.append("\nNo major chart patterns detected")
        # Support/Resistance levels
        if patterns.get("support_levels"):
            summary_parts.append(
                f"\nSupport Levels: {[f'${s:.2f}' for s in patterns['support_levels']]}"
            )
        if patterns.get("resistance_levels"):
            summary_parts.append(
                f"Resistance Levels: {[f'${r:.2f}' for r in patterns['resistance_levels']]}"
            )
        pattern_summary = "\n".join(summary_parts)
        # Get investment style context
        style_context = get_technical_analysis_style_context(investment_style)
        # LLM prompt with specialized pattern template
        system_prompt = f"""{PATTERN_AGENT_PROMPT}

Investment Style Context:
{style_context}

IMPORTANT: Your response MUST follow the exact structure shown in the template above, including:
- Markdown section headers (##)
- Data tables with proper markdown table syntax (| pipes)
- Bullet-pointed insights (-)
- Numbered summary points (1., 2., 3.)
- Clear conclusion with trading implication"""
        user_prompt = f"""Analyze the following pattern data for {ticker} ({timeframe} timeframe) and provide a comprehensive technical analysis following the template structure:

{pattern_summary}

Generate your response following the exact template structure with all sections, tables, bullet points, and numbered summary."""
        # Call LLM with cost tracking callback
        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=user_prompt),
        ]
        # Create callback if cost tracker is available
        if cost_tracker:
            callback = cost_tracker.get_callback(agent_name=self.AGENT_NAME)
            response = self.llm.invoke(messages, config={"callbacks": [callback]})
        else:
            response = self.llm.invoke(messages)
        return response.content

    def _deserialize_dataframe(self, data: Dict[str, Any]) -> pd.DataFrame:
        """
        Convert serialized data back to DataFrame.

        Args:
            data: Serialized DataFrame data (column -> values mapping)

        Returns:
            pandas DataFrame; when a "Date" column is present it is parsed
            to datetimes and promoted to the index.
        """
        df = pd.DataFrame(data)
        if "Date" in df.columns:
            df["Date"] = pd.to_datetime(df["Date"])
            df = df.set_index("Date")
        return df