File size: 22,713 Bytes
a1bf219
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
"""
Indicator Agent for technical indicator calculation and interpretation.

This agent computes technical indicators (RSI, MACD, Stochastic) and provides
interpretation of their values for trading decisions.
"""

import json
import logging
import time
from typing import Any, Dict, Optional

import pandas as pd
from langchain_core.messages import HumanMessage, SystemMessage

# Configure logger
logger = logging.getLogger(__name__)

from config.default_config import DEFAULT_CONFIG
from config.models import AGENT_MODELS
from config.prompt_templates import INDICATOR_AGENT_PROMPT
from graph.state.agent_state import add_agent_message, update_analysis_result
from graph.state.trading_state import TechnicalWorkflowState
from utils.charts.chart_generator import ChartGenerator
from utils.formatters.educational_content import (
    generate_macd_explanation,
    generate_rsi_explanation,
    generate_stochastic_explanation,
)
from utils.indicators import calculate_macd, calculate_rsi, calculate_stochastic
from utils.indicators.macd import (
    find_macd_crossovers,
    find_macd_divergence,
    interpret_macd,
)
from utils.indicators.rsi import find_rsi_divergence, interpret_rsi
from utils.indicators.stochastic import find_stochastic_crossovers, interpret_stochastic
from utils.investment_style_helpers import (
    get_investment_style_from_state,
    get_technical_analysis_style_context,
)
from utils.llm.provider_factory import LLMProviderFactory


class IndicatorAgent:
    """
    Technical Indicator Agent.

    Responsibilities:
    - Calculate RSI, MACD, Stochastic Oscillator
    - Interpret indicator values (overbought/oversold, bullish/bearish)
    - Detect divergences and crossovers
    - Provide trading signals based on indicators
    """

    AGENT_NAME = "indicator_agent"

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize Indicator Agent.

        Args:
            config: Optional configuration override
        """
        self.config = config or DEFAULT_CONFIG

        # Initialize LLM - use runtime provider override if available
        from config.models import DEFAULT_MODELS_BY_PROVIDER

        model_config = AGENT_MODELS[self.AGENT_NAME]
        runtime_provider = self.config.get("llm_provider", model_config["provider"])

        # If provider is overridden but model is not, use default model for that provider
        if "llm_provider" in self.config and "llm_model" not in self.config:
            runtime_model = DEFAULT_MODELS_BY_PROVIDER.get(
                runtime_provider, model_config["model"]
            )
        else:
            runtime_model = self.config.get("llm_model", model_config["model"])

        self.llm = LLMProviderFactory.create(
            provider=runtime_provider,
            model=runtime_model,
            temperature=model_config["temperature"],
        )

        # Indicator parameters
        self.indicator_params = self.config["indicator_parameters"]

        # Initialize chart generator
        self.chart_generator = ChartGenerator()

    def run(self, state: TechnicalWorkflowState) -> TechnicalWorkflowState:
        """
        Execute indicator analysis.

        Args:
            state: Current workflow state

        Returns:
            Updated state with indicator analysis
        """
        start_time = time.time()
        ticker = state.get("ticker", "UNKNOWN")
        timeframe = state.get("timeframe", "UNKNOWN")

        logger.info(
            json.dumps(
                {
                    "agent": self.AGENT_NAME,
                    "action": "start",
                    "ticker": ticker,
                    "timeframe": timeframe,
                    "timestamp": time.time(),
                }
            )
        )

        try:
            # Extract market data
            market_data = state["market_data"]
            if not market_data.get("ohlc_data"):
                raise ValueError("No OHLC data available for indicator calculation")

            # Convert serialized DataFrame back to pandas DataFrame
            df = self._deserialize_dataframe(market_data["ohlc_data"])

            # Calculate indicators
            indicators_result = self._calculate_indicators(df)

            # Get investment style from state
            investment_style = get_investment_style_from_state(state)

            # Generate charts and educational notes (User Story 5)
            chart_paths = []
            educational_notes = []

            try:
                # Check if educational mode is enabled
                config = state.get("config", {})
                educational_mode = (
                    config.get("educational_mode", False)
                    if isinstance(config, dict)
                    else False
                )

                # Generate RSI chart
                if "rsi" in indicators_result and "value" in indicators_result["rsi"]:
                    rsi_value = indicators_result["rsi"]["value"]
                    rsi_series_dict = indicators_result["rsi"].get("series")

                    if rsi_series_dict is not None:
                        # Convert dict back to Series
                        rsi_series = pd.Series(rsi_series_dict)

                        fig, filepath = self.chart_generator.generate_rsi_chart(
                            df=df,
                            rsi_series=rsi_series,
                            ticker=ticker,
                            timeframe=timeframe,
                            rsi_period=self.indicator_params["rsi_period"],
                            save=True,
                        )
                        if filepath:
                            chart_paths.append(filepath)
                        self.chart_generator.close_figure(fig)

                    # Add educational note if enabled
                    if educational_mode:
                        educational_notes.append(
                            f"**RSI**: {generate_rsi_explanation(rsi_value)}"
                        )

                # Generate MACD chart
                if "macd" in indicators_result:
                    macd_data = indicators_result["macd"]
                    series_dict = macd_data.get("series", {})

                    logger.info(
                        f"MACD data available: series_dict keys = {list(series_dict.keys()) if series_dict else 'None'}"
                    )

                    if (
                        series_dict
                        and "macd" in series_dict
                        and "signal" in series_dict
                        and "histogram" in series_dict
                    ):
                        # Convert dicts back to Series
                        macd_series = pd.Series(series_dict["macd"])
                        signal_series = pd.Series(series_dict["signal"])
                        histogram_series = pd.Series(series_dict["histogram"])

                        logger.info(
                            f"Generating MACD chart: macd_len={len(macd_series)}, signal_len={len(signal_series)}, hist_len={len(histogram_series)}"
                        )

                        fig, filepath = self.chart_generator.generate_macd_chart(
                            df=df,
                            macd=macd_series,
                            signal=signal_series,
                            histogram=histogram_series,
                            ticker=ticker,
                            timeframe=timeframe,
                            save=True,
                        )
                        logger.info(f"MACD chart generated: filepath={filepath}")
                        if filepath:
                            chart_paths.append(filepath)
                        self.chart_generator.close_figure(fig)
                    else:
                        logger.warning(
                            f"MACD chart skipped - missing series data. series_dict keys: {list(series_dict.keys()) if series_dict else 'None'}"
                        )

                    # Add educational note if enabled
                    if (
                        educational_mode
                        and "macd" in macd_data
                        and "signal" in macd_data
                        and "histogram" in macd_data
                    ):
                        educational_notes.append(
                            f"**MACD**: {generate_macd_explanation(macd_data['macd'], macd_data['signal'], macd_data['histogram'])}"
                        )

                # Generate Stochastic chart
                if "stochastic" in indicators_result:
                    stoch_data = indicators_result["stochastic"]
                    series_dict = stoch_data.get("series", {})

                    if series_dict and "k" in series_dict and "d" in series_dict:
                        # Convert dicts back to Series
                        k_series = pd.Series(series_dict["k"])
                        d_series = pd.Series(series_dict["d"])

                        fig, filepath = self.chart_generator.generate_stochastic_chart(
                            df=df,
                            k_series=k_series,
                            d_series=d_series,
                            ticker=ticker,
                            timeframe=timeframe,
                            save=True,
                        )
                        if filepath:
                            chart_paths.append(filepath)
                        self.chart_generator.close_figure(fig)

                    # Add educational note if enabled
                    if educational_mode and "k" in stoch_data and "d" in stoch_data:
                        educational_notes.append(
                            f"**Stochastic**: {generate_stochastic_explanation(stoch_data['k'], stoch_data['d'])}"
                        )

            except Exception as chart_error:
                logger.warning(
                    json.dumps(
                        {
                            "agent": self.AGENT_NAME,
                            "action": "chart_generation_warning",
                            "ticker": ticker,
                            "error": str(chart_error),
                            "timestamp": time.time(),
                        }
                    )
                )

            # Extract cost tracker from state
            cost_tracker = state.get("_cost_tracker")

            # Interpret indicators using LLM
            interpretation = self._interpret_with_llm(
                state["ticker"],
                state["timeframe"],
                indicators_result,
                df,
                investment_style,
                cost_tracker,
            )

            # Append educational notes to interpretation if available
            if educational_notes:
                interpretation += "\n\n### 📚 Educational Notes\n\n" + "\n\n".join(
                    educational_notes
                )

            # Update state
            new_state = update_analysis_result(state, "indicators", indicators_result)
            new_state = add_agent_message(
                new_state,
                self.AGENT_NAME,
                interpretation,
                metadata={
                    "indicators": indicators_result,
                    "chart_paths": chart_paths,
                    "educational_mode": educational_mode,
                },
            )

            execution_time = time.time() - start_time
            logger.info(
                json.dumps(
                    {
                        "agent": self.AGENT_NAME,
                        "action": "complete",
                        "ticker": ticker,
                        "timeframe": timeframe,
                        "execution_time": execution_time,
                        "indicators_calculated": list(indicators_result.keys()),
                        "timestamp": time.time(),
                    }
                )
            )

            return new_state

        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(
                json.dumps(
                    {
                        "agent": self.AGENT_NAME,
                        "action": "error",
                        "ticker": ticker,
                        "timeframe": timeframe,
                        "execution_time": execution_time,
                        "error": str(e),
                        "timestamp": time.time(),
                    }
                )
            )

            # Add error message to state
            error_state = add_agent_message(
                state,
                self.AGENT_NAME,
                f"Error calculating indicators: {str(e)}",
                metadata={"error": True},
            )
            return error_state

    def _calculate_indicators(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Calculate all technical indicators.

        Args:
            df: OHLC DataFrame

        Returns:
            Dict with indicator results
        """
        result = {}

        # RSI
        try:
            rsi_series = calculate_rsi(
                df,
                period=self.indicator_params["rsi_period"],
            )
            current_rsi = float(rsi_series.iloc[-1])
            rsi_interpretation = interpret_rsi(current_rsi)

            # Try to find divergences, but don't fail if it doesn't work
            try:
                rsi_divergence = find_rsi_divergence(df, rsi_series)
            except Exception:
                rsi_divergence = {"bullish": [], "bearish": []}

            result["rsi"] = {
                "value": current_rsi,
                "interpretation": rsi_interpretation,
                "divergences": rsi_divergence,
                "series": rsi_series.to_dict(),  # For charting
            }
        except Exception as e:
            result["rsi"] = {"error": str(e)}

        # MACD
        try:
            logger.info(f"Calculating MACD with {len(df)} data points")
            macd, signal, hist = calculate_macd(
                df,
                fast_period=self.indicator_params["macd_fast"],
                slow_period=self.indicator_params["macd_slow"],
                signal_period=self.indicator_params["macd_signal"],
            )
            logger.info(
                f"MACD calculation succeeded: macd_len={len(macd)}, valid_values={(~pd.isna(macd)).sum()}"
            )

            current_macd = float(macd.iloc[-1]) if not pd.isna(macd.iloc[-1]) else None
            current_signal = (
                float(signal.iloc[-1]) if not pd.isna(signal.iloc[-1]) else None
            )
            current_hist = float(hist.iloc[-1]) if not pd.isna(hist.iloc[-1]) else None

            prev_hist = (
                float(hist.iloc[-2])
                if len(hist) > 1 and not pd.isna(hist.iloc[-2])
                else None
            )

            macd_interpretation = interpret_macd(
                current_macd, current_signal, current_hist, prev_hist
            )

            # Try to find crossovers and divergences, but don't fail if it doesn't work
            try:
                macd_crossovers = find_macd_crossovers(macd, signal)
            except Exception:
                macd_crossovers = {"bullish": [], "bearish": []}

            try:
                macd_divergence = find_macd_divergence(df, macd)
            except Exception:
                macd_divergence = {"bullish": [], "bearish": []}

            result["macd"] = {
                "macd": current_macd,
                "signal": current_signal,
                "histogram": current_hist,
                "interpretation": macd_interpretation,
                "crossovers": macd_crossovers,
                "divergences": macd_divergence,
                "series": {
                    "macd": macd.to_dict(),
                    "signal": signal.to_dict(),
                    "histogram": hist.to_dict(),
                },
            }
        except Exception as e:
            logger.error(f"MACD calculation failed: {str(e)}")
            result["macd"] = {"error": str(e)}

        # Stochastic Oscillator
        try:
            k_series, d_series = calculate_stochastic(
                df,
                k_period=self.indicator_params["stoch_k_period"],
                d_period=self.indicator_params["stoch_d_period"],
            )

            current_k = (
                float(k_series.iloc[-1]) if not pd.isna(k_series.iloc[-1]) else None
            )
            current_d = (
                float(d_series.iloc[-1]) if not pd.isna(d_series.iloc[-1]) else None
            )

            prev_k = (
                float(k_series.iloc[-2])
                if len(k_series) > 1 and not pd.isna(k_series.iloc[-2])
                else None
            )
            prev_d = (
                float(d_series.iloc[-2])
                if len(d_series) > 1 and not pd.isna(d_series.iloc[-2])
                else None
            )

            stoch_interpretation = interpret_stochastic(
                current_k, current_d, prev_k, prev_d
            )
            stoch_crossovers = find_stochastic_crossovers(k_series, d_series)

            result["stochastic"] = {
                "k": current_k,
                "d": current_d,
                "interpretation": stoch_interpretation,
                "crossovers": stoch_crossovers,
                "series": {
                    "k": k_series.to_dict(),
                    "d": d_series.to_dict(),
                },
            }
        except Exception as e:
            result["stochastic"] = {"error": str(e)}

        return result

    def _interpret_with_llm(
        self,
        ticker: str,
        timeframe: str,
        indicators: Dict[str, Any],
        df: pd.DataFrame,
        investment_style: Optional[str] = None,
        cost_tracker=None,
    ) -> str:
        """
        Use LLM to interpret indicator signals holistically.

        Args:
            ticker: Asset ticker
            timeframe: Analysis timeframe
            indicators: Calculated indicators
            df: OHLC DataFrame
            investment_style: Investment style for context
            cost_tracker: Optional cost tracker for tracking LLM costs

        Returns:
            LLM interpretation string
        """
        # Prepare indicator summary
        current_price = float(df["close"].iloc[-1])

        summary_parts = [
            f"Asset: {ticker}",
            f"Timeframe: {timeframe}",
            f"Current Price: ${current_price:.2f}",
            "",
            "Technical Indicators:",
        ]

        # RSI
        if "rsi" in indicators and "value" in indicators["rsi"]:
            rsi = indicators["rsi"]
            summary_parts.append(
                f"- RSI({self.indicator_params['rsi_period']}): {rsi['value']:.2f}"
            )
            summary_parts.append(f"  {rsi['interpretation']}")
            if rsi.get("divergences", {}).get("bullish"):
                summary_parts.append(
                    f"  Bullish divergences detected at indices: {rsi['divergences']['bullish']}"
                )
            if rsi.get("divergences", {}).get("bearish"):
                summary_parts.append(
                    f"  Bearish divergences detected at indices: {rsi['divergences']['bearish']}"
                )

        # MACD
        if "macd" in indicators and "macd" in indicators["macd"]:
            macd = indicators["macd"]
            summary_parts.append(
                f"- MACD: {macd['macd']:.4f}, Signal: {macd['signal']:.4f}, Histogram: {macd['histogram']:.4f}"
            )
            summary_parts.append(f"  {macd['interpretation']}")
            if macd.get("crossovers", {}).get("bullish"):
                summary_parts.append(
                    f"  Recent bullish crossovers at indices: {macd['crossovers']['bullish'][-3:]}"
                )
            if macd.get("crossovers", {}).get("bearish"):
                summary_parts.append(
                    f"  Recent bearish crossovers at indices: {macd['crossovers']['bearish'][-3:]}"
                )

        # Stochastic
        if "stochastic" in indicators and "k" in indicators["stochastic"]:
            stoch = indicators["stochastic"]
            summary_parts.append(
                f"- Stochastic: %K={stoch['k']:.2f}, %D={stoch['d']:.2f}"
            )
            summary_parts.append(f"  {stoch['interpretation']}")

        indicator_summary = "\n".join(summary_parts)

        # Get investment style context
        style_context = get_technical_analysis_style_context(investment_style)

        # LLM prompt with specialized indicator template
        system_prompt = f"""{INDICATOR_AGENT_PROMPT}

Investment Style Context:
{style_context}

IMPORTANT: Your response MUST follow the exact structure shown in the template above, including:
- Markdown section headers (##)
- Data tables with proper markdown table syntax (| pipes)
- Bullet-pointed insights (-)
- Numbered summary points (1., 2., 3.)
- Clear conclusion with trading implication"""

        user_prompt = f"""Analyze the following technical indicators for {ticker} ({timeframe} timeframe) and provide a comprehensive technical analysis following the template structure:

{indicator_summary}

Generate your response following the exact template structure with all sections, tables, bullet points, and numbered summary."""

        # Call LLM with cost tracking callback
        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=user_prompt),
        ]

        # Create callback if cost tracker is available
        if cost_tracker:
            callback = cost_tracker.get_callback(agent_name=self.AGENT_NAME)
            response = self.llm.invoke(messages, config={"callbacks": [callback]})
        else:
            response = self.llm.invoke(messages)

        return response.content

    def _deserialize_dataframe(self, data: Dict[str, Any]) -> pd.DataFrame:
        """
        Convert serialized data back to DataFrame.

        Args:
            data: Serialized DataFrame data

        Returns:
            pandas DataFrame
        """
        # Assuming data is stored as dict with columns
        # This will be properly implemented when we serialize in the workflow
        df = pd.DataFrame(data)
        if "Date" in df.columns:
            df["Date"] = pd.to_datetime(df["Date"])
            df = df.set_index("Date")
        return df