""" Robust TALib + mplfinance + Gradio example - Normalizes column names (handles tuple/MultiIndex columns) - Keeps Volume if present - Converts everything numeric and drops NaN OHLC rows - Calls TALib pattern functions safely and plots with mplfinance """ import yfinance as yf import pandas as pd import numpy as np import mplfinance as mpf import talib import gradio as gr from datetime import date import os from typing import Optional, Tuple, List # Pattern list from talib (CDL* functions) TALIB_PATTERNS = sorted([name for name in dir(talib) if name.startswith("CDL")]) def _normalize_col_name(col) -> str: """Turn any column key into a simple lowercase string. Handles tuples (MultiIndex) by joining parts with '_'.""" if isinstance(col, (tuple, list)): # join non-empty parts, convert to string parts = [str(c) for c in col if c is not None] joined = "_".join(parts) return joined.strip().lower() return str(col).strip().lower() def _find_best_col(key: str, columns: List[str]) -> Optional[str]: """Given normalized columns, find best matching column for key (open/high/low/close/volume).""" key = key.lower() # Exact match if key in columns: return key # Prefer suffix matches (e.g., msft_open) for c in columns: if c.endswith("_" + key): return c # Fallback: any column containing the key for c in columns: if key in c: return c return None def clean_ohlc(df: pd.DataFrame) -> pd.DataFrame: """ Normalize column names, find OHLC (and optional Volume), convert to numeric, drop NaNs and ensure DatetimeIndex. Returns a DataFrame with columns: ['Open','High','Low','Close'] and optionally 'Volume'. Raises ValueError on irrecoverable problems. """ if df is None: raise ValueError("Gelen dataframe None.") if not isinstance(df, pd.DataFrame): raise ValueError(f"Gelen obje DataFrame değil: {type(df)}") # Make a copy to avoid mutating caller's df df_work = df.copy() # Flatten column names into simple lowercase strings try: normalized_columns = [_normalize_col_name(c) for c in df_work.columns] except Exception as e: # worst-case fallback: convert all to string then normalize normalized_columns = [str(c).strip().lower() for c in df_work.columns] # assign normalized columns temporarily (we'll keep mapping) col_map = dict(zip(normalized_columns, df_work.columns)) df_work.columns = normalized_columns # find required columns required_keys = ['open', 'high', 'low', 'close'] found = {} for k in required_keys: matched = _find_best_col(k, normalized_columns) if matched is None: raise ValueError(f"Veride '{k}' sütunu bulunamadı. Mevcut sütunlar: {list(normalized_columns)}") found[k] = matched # optional volume vol_key = _find_best_col('volume', normalized_columns) include_volume = vol_key is not None # select and rename to canonical names cols_to_take = [found['open'], found['high'], found['low'], found['close']] if include_volume: cols_to_take.append(vol_key) df_sel = df_work[cols_to_take].copy() rename_map = { found['open']: 'Open', found['high']: 'High', found['low']: 'Low', found['close']: 'Close' } if include_volume: rename_map[vol_key] = 'Volume' df_sel = df_sel.rename(columns=rename_map) # index -> DatetimeIndex if possible if not isinstance(df_sel.index, pd.DatetimeIndex): # try 'date' column if exists if 'date' in df_sel.columns: try: df_sel.index = pd.to_datetime(df_sel['date'], errors='coerce') df_sel = df_sel.drop(columns=['date']) except Exception: pass else: # try to parse existing index try: df_sel.index = pd.to_datetime(df_sel.index, errors='coerce') except Exception: pass # convert to numeric numeric_cols = ['Open', 'High', 'Low', 'Close'] if include_volume: numeric_cols.append('Volume') df_sel[numeric_cols] = df_sel[numeric_cols].apply(pd.to_numeric, errors='coerce') # drop rows with NaN OHLC or NaT index if not isinstance(df_sel.index, pd.DatetimeIndex): # if index still not datetime, try to reset index and parse date column df_sel = df_sel.reset_index() if 'index' in df_sel.columns: try: df_sel['index'] = pd.to_datetime(df_sel['index'], errors='coerce') df_sel = df_sel.set_index('index') except Exception: pass # drop any rows with NaT index now if isinstance(df_sel.index, pd.DatetimeIndex): df_sel = df_sel[~df_sel.index.isna()] df_sel = df_sel.dropna(subset=['Open', 'High', 'Low', 'Close']).copy() if df_sel.empty: raise ValueError("value error.") # ensure ordering ascending by date df_sel = df_sel.sort_index() return df_sel def find_candlestick_patterns(df: pd.DataFrame, pattern_name: str) -> Tuple[List, Optional[str]]: """ Run TALib pattern and return list of addplots (apds) for mplfinance, or ([], error_message) on error. """ # check callable pattern try: pattern_func = getattr(talib, pattern_name) except AttributeError: return [], f"Error: '{pattern_name}' formasyonu TALib'te bulunamadı." if not callable(pattern_func): return [], f"Error: '{pattern_name}' TALib içinde callable değil." # clean and prepare data try: df_clean = clean_ohlc(df) except Exception as e: return [], f"Veri temizleme hatası: {e}" try: open_arr = np.asarray(df_clean['Open'].values, dtype=float).ravel() high_arr = np.asarray(df_clean['High'].values, dtype=float).ravel() low_arr = np.asarray(df_clean['Low'].values, dtype=float).ravel() close_arr= np.asarray(df_clean['Close'].values, dtype=float).ravel() # sanity check lengths n = len(open_arr) if not (len(high_arr) == len(low_arr) == len(close_arr) == n): return [], "Error: OHLC array uzunlukları eşit değil." except Exception as e: return [], f"Array dönüşüm hatası: {e}" # call talib try: pattern_result = pattern_func(open_arr, high_arr, low_arr, close_arr) pattern_series = pd.Series(pattern_result, index=df_clean.index) except Exception as e: return [], f"TALib çalıştırma hatası: {e}" apds = [] # bullish (positive) bull_idx = pattern_series[pattern_series > 0].index if len(bull_idx) > 0: bullish_points = pd.Series(np.nan, index=df_clean.index) # vectorized assignment for indices bullish_points.loc[bull_idx] = df_clean.loc[bull_idx, 'Low'] * 0.98 apds.append( mpf.make_addplot( bullish_points, type='scatter', marker='^', markersize=60, color='green', panel=0, alpha=0.85 ) ) # bearish (negative) bear_idx = pattern_series[pattern_series < 0].index if len(bear_idx) > 0: bearish_points = pd.Series(np.nan, index=df_clean.index) bearish_points.loc[bear_idx] = df_clean.loc[bear_idx, 'High'] * 1.02 apds.append( mpf.make_addplot( bearish_points, type='scatter', marker='v', markersize=60, color='red', panel=0, alpha=0.85 ) ) return apds, None def plot_stock_with_patterns(ticker_symbol: str, start_date: str, end_date: str, selected_patterns) -> Tuple[Optional[str], str]: """ Main handler for Gradio. Returns (image_filepath or None, status_message). """ if not ticker_symbol: return None, "Error: stock symbol could not empty." try: start = pd.to_datetime(start_date) end = pd.to_datetime(end_date) if start >= end: return None, "Error: date error." except Exception: return None, "Error: invalid date format" # download try: df = yf.download(ticker_symbol, start=start_date, end=end_date, progress=False) if df is None or (isinstance(df, pd.DataFrame) and df.empty): return None, f"Error: '{ticker_symbol}' için veri bulunamadı (yfinance boş döndü)." except Exception as e: return None, f"Veri indirilirken hata oluştu: {e}" # Clean once here to feed to mplfinance and TALib try: df_clean = clean_ohlc(df) except Exception as e: return None, f"Veri temizleme hatası: {e}" # patterns all_apds = [] if selected_patterns: # selected_patterns might be tuple/list/str if isinstance(selected_patterns, str): selected_patterns = [selected_patterns] for pattern_name in selected_patterns: pattern_apds, err = find_candlestick_patterns(df_clean, pattern_name) if err: return None, err all_apds.extend(pattern_apds) # prepare fig path os.makedirs('/tmp', exist_ok=True) safe_ticker = str(ticker_symbol).replace("/", "_").replace("\\", "_") fig_path = f"/tmp/stock_chart_{safe_ticker}_{pd.Timestamp.now().strftime('%Y%m%d%H%M%S')}.png" # style s = mpf.make_mpf_style(base_mpf_style='yahoo', mavcolors=['#1f77b4', '#ff7f0e', '#2ca02c']) # detect if volume exists has_volume = 'Volume' in df_clean.columns try: fig, _ = mpf.plot( df_clean, type='candle', volume=has_volume, addplot=all_apds if all_apds else None, style=s, title=f"\n{ticker_symbol} Candlestick Chart", ylabel='Price', ylabel_lower='Volume' if has_volume else None, returnfig=True, figscale=1.5 ) fig.savefig(fig_path, dpi=150, bbox_inches='tight') return fig_path, "Successfully created!" except Exception as e: return None, f"error: {e}" # Gradio arayüzü iface = gr.Interface( fn=plot_stock_with_patterns, inputs=[ gr.Textbox(label="Stock Symbol (örn: MSFT, AAPL, ^GSPC)", value="MSFT"), gr.Textbox(label="Starting Date (YYYY-MM-DD)", value="2024-01-01"), gr.Textbox(label="End Date (YYYY-MM-DD)", value=date.today().strftime("%Y-%m-%d")), gr.CheckboxGroup( label="Select one (TALib)", choices=TALIB_PATTERNS, value=["CDLHAMMER", "CDLSHOOTINGSTAR"] ) ], outputs=[ gr.Image(type="filepath", label="Hisse Grafiği"), gr.Textbox(label="Durum") ], title="TALib Tool", description="bullish green, bearish red" ) if __name__ == "__main__": iface.launch()