Spaces:
Sleeping
Sleeping
| """ | |
| Robust TALib + mplfinance + Gradio example | |
| - Normalizes column names (handles tuple/MultiIndex columns) | |
| - Keeps Volume if present | |
| - Converts everything numeric and drops NaN OHLC rows | |
| - Calls TALib pattern functions safely and plots with mplfinance | |
| """ | |
| import yfinance as yf | |
| import pandas as pd | |
| import numpy as np | |
| import mplfinance as mpf | |
| import talib | |
| import gradio as gr | |
| from datetime import date | |
| import os | |
| from typing import Optional, Tuple, List | |
| # Pattern list from talib (CDL* functions) | |
| TALIB_PATTERNS = sorted([name for name in dir(talib) if name.startswith("CDL")]) | |
| def _normalize_col_name(col) -> str: | |
| """Turn any column key into a simple lowercase string. | |
| Handles tuples (MultiIndex) by joining parts with '_'.""" | |
| if isinstance(col, (tuple, list)): | |
| # join non-empty parts, convert to string | |
| parts = [str(c) for c in col if c is not None] | |
| joined = "_".join(parts) | |
| return joined.strip().lower() | |
| return str(col).strip().lower() | |
| def _find_best_col(key: str, columns: List[str]) -> Optional[str]: | |
| """Given normalized columns, find best matching column for key (open/high/low/close/volume).""" | |
| key = key.lower() | |
| # Exact match | |
| if key in columns: | |
| return key | |
| # Prefer suffix matches (e.g., msft_open) | |
| for c in columns: | |
| if c.endswith("_" + key): | |
| return c | |
| # Fallback: any column containing the key | |
| for c in columns: | |
| if key in c: | |
| return c | |
| return None | |
| def clean_ohlc(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Normalize column names, find OHLC (and optional Volume), convert to numeric, | |
| drop NaNs and ensure DatetimeIndex. Returns a DataFrame with columns: | |
| ['Open','High','Low','Close'] and optionally 'Volume'. | |
| Raises ValueError on irrecoverable problems. | |
| """ | |
| if df is None: | |
| raise ValueError("Gelen dataframe None.") | |
| if not isinstance(df, pd.DataFrame): | |
| raise ValueError(f"Gelen obje DataFrame değil: {type(df)}") | |
| # Make a copy to avoid mutating caller's df | |
| df_work = df.copy() | |
| # Flatten column names into simple lowercase strings | |
| try: | |
| normalized_columns = [_normalize_col_name(c) for c in df_work.columns] | |
| except Exception as e: | |
| # worst-case fallback: convert all to string then normalize | |
| normalized_columns = [str(c).strip().lower() for c in df_work.columns] | |
| # assign normalized columns temporarily (we'll keep mapping) | |
| col_map = dict(zip(normalized_columns, df_work.columns)) | |
| df_work.columns = normalized_columns | |
| # find required columns | |
| required_keys = ['open', 'high', 'low', 'close'] | |
| found = {} | |
| for k in required_keys: | |
| matched = _find_best_col(k, normalized_columns) | |
| if matched is None: | |
| raise ValueError(f"Veride '{k}' sütunu bulunamadı. Mevcut sütunlar: {list(normalized_columns)}") | |
| found[k] = matched | |
| # optional volume | |
| vol_key = _find_best_col('volume', normalized_columns) | |
| include_volume = vol_key is not None | |
| # select and rename to canonical names | |
| cols_to_take = [found['open'], found['high'], found['low'], found['close']] | |
| if include_volume: | |
| cols_to_take.append(vol_key) | |
| df_sel = df_work[cols_to_take].copy() | |
| rename_map = { | |
| found['open']: 'Open', | |
| found['high']: 'High', | |
| found['low']: 'Low', | |
| found['close']: 'Close' | |
| } | |
| if include_volume: | |
| rename_map[vol_key] = 'Volume' | |
| df_sel = df_sel.rename(columns=rename_map) | |
| # index -> DatetimeIndex if possible | |
| if not isinstance(df_sel.index, pd.DatetimeIndex): | |
| # try 'date' column if exists | |
| if 'date' in df_sel.columns: | |
| try: | |
| df_sel.index = pd.to_datetime(df_sel['date'], errors='coerce') | |
| df_sel = df_sel.drop(columns=['date']) | |
| except Exception: | |
| pass | |
| else: | |
| # try to parse existing index | |
| try: | |
| df_sel.index = pd.to_datetime(df_sel.index, errors='coerce') | |
| except Exception: | |
| pass | |
| # convert to numeric | |
| numeric_cols = ['Open', 'High', 'Low', 'Close'] | |
| if include_volume: | |
| numeric_cols.append('Volume') | |
| df_sel[numeric_cols] = df_sel[numeric_cols].apply(pd.to_numeric, errors='coerce') | |
| # drop rows with NaN OHLC or NaT index | |
| if not isinstance(df_sel.index, pd.DatetimeIndex): | |
| # if index still not datetime, try to reset index and parse date column | |
| df_sel = df_sel.reset_index() | |
| if 'index' in df_sel.columns: | |
| try: | |
| df_sel['index'] = pd.to_datetime(df_sel['index'], errors='coerce') | |
| df_sel = df_sel.set_index('index') | |
| except Exception: | |
| pass | |
| # drop any rows with NaT index now | |
| if isinstance(df_sel.index, pd.DatetimeIndex): | |
| df_sel = df_sel[~df_sel.index.isna()] | |
| df_sel = df_sel.dropna(subset=['Open', 'High', 'Low', 'Close']).copy() | |
| if df_sel.empty: | |
| raise ValueError("value error.") | |
| # ensure ordering ascending by date | |
| df_sel = df_sel.sort_index() | |
| return df_sel | |
| def find_candlestick_patterns(df: pd.DataFrame, pattern_name: str) -> Tuple[List, Optional[str]]: | |
| """ | |
| Run TALib pattern and return list of addplots (apds) for mplfinance, | |
| or ([], error_message) on error. | |
| """ | |
| # check callable pattern | |
| try: | |
| pattern_func = getattr(talib, pattern_name) | |
| except AttributeError: | |
| return [], f"Error: '{pattern_name}' formasyonu TALib'te bulunamadı." | |
| if not callable(pattern_func): | |
| return [], f"Error: '{pattern_name}' TALib içinde callable değil." | |
| # clean and prepare data | |
| try: | |
| df_clean = clean_ohlc(df) | |
| except Exception as e: | |
| return [], f"Veri temizleme hatası: {e}" | |
| try: | |
| open_arr = np.asarray(df_clean['Open'].values, dtype=float).ravel() | |
| high_arr = np.asarray(df_clean['High'].values, dtype=float).ravel() | |
| low_arr = np.asarray(df_clean['Low'].values, dtype=float).ravel() | |
| close_arr= np.asarray(df_clean['Close'].values, dtype=float).ravel() | |
| # sanity check lengths | |
| n = len(open_arr) | |
| if not (len(high_arr) == len(low_arr) == len(close_arr) == n): | |
| return [], "Error: OHLC array uzunlukları eşit değil." | |
| except Exception as e: | |
| return [], f"Array dönüşüm hatası: {e}" | |
| # call talib | |
| try: | |
| pattern_result = pattern_func(open_arr, high_arr, low_arr, close_arr) | |
| pattern_series = pd.Series(pattern_result, index=df_clean.index) | |
| except Exception as e: | |
| return [], f"TALib çalıştırma hatası: {e}" | |
| apds = [] | |
| # bullish (positive) | |
| bull_idx = pattern_series[pattern_series > 0].index | |
| if len(bull_idx) > 0: | |
| bullish_points = pd.Series(np.nan, index=df_clean.index) | |
| # vectorized assignment for indices | |
| bullish_points.loc[bull_idx] = df_clean.loc[bull_idx, 'Low'] * 0.98 | |
| apds.append( | |
| mpf.make_addplot( | |
| bullish_points, | |
| type='scatter', | |
| marker='^', | |
| markersize=60, | |
| color='green', | |
| panel=0, | |
| alpha=0.85 | |
| ) | |
| ) | |
| # bearish (negative) | |
| bear_idx = pattern_series[pattern_series < 0].index | |
| if len(bear_idx) > 0: | |
| bearish_points = pd.Series(np.nan, index=df_clean.index) | |
| bearish_points.loc[bear_idx] = df_clean.loc[bear_idx, 'High'] * 1.02 | |
| apds.append( | |
| mpf.make_addplot( | |
| bearish_points, | |
| type='scatter', | |
| marker='v', | |
| markersize=60, | |
| color='red', | |
| panel=0, | |
| alpha=0.85 | |
| ) | |
| ) | |
| return apds, None | |
| def plot_stock_with_patterns(ticker_symbol: str, start_date: str, end_date: str, selected_patterns) -> Tuple[Optional[str], str]: | |
| """ | |
| Main handler for Gradio. Returns (image_filepath or None, status_message). | |
| """ | |
| if not ticker_symbol: | |
| return None, "Error: stock symbol could not empty." | |
| try: | |
| start = pd.to_datetime(start_date) | |
| end = pd.to_datetime(end_date) | |
| if start >= end: | |
| return None, "Error: date error." | |
| except Exception: | |
| return None, "Error: invalid date format" | |
| # download | |
| try: | |
| df = yf.download(ticker_symbol, start=start_date, end=end_date, progress=False) | |
| if df is None or (isinstance(df, pd.DataFrame) and df.empty): | |
| return None, f"Error: '{ticker_symbol}' için veri bulunamadı (yfinance boş döndü)." | |
| except Exception as e: | |
| return None, f"Veri indirilirken hata oluştu: {e}" | |
| # Clean once here to feed to mplfinance and TALib | |
| try: | |
| df_clean = clean_ohlc(df) | |
| except Exception as e: | |
| return None, f"Veri temizleme hatası: {e}" | |
| # patterns | |
| all_apds = [] | |
| if selected_patterns: | |
| # selected_patterns might be tuple/list/str | |
| if isinstance(selected_patterns, str): | |
| selected_patterns = [selected_patterns] | |
| for pattern_name in selected_patterns: | |
| pattern_apds, err = find_candlestick_patterns(df_clean, pattern_name) | |
| if err: | |
| return None, err | |
| all_apds.extend(pattern_apds) | |
| # prepare fig path | |
| os.makedirs('/tmp', exist_ok=True) | |
| safe_ticker = str(ticker_symbol).replace("/", "_").replace("\\", "_") | |
| fig_path = f"/tmp/stock_chart_{safe_ticker}_{pd.Timestamp.now().strftime('%Y%m%d%H%M%S')}.png" | |
| # style | |
| s = mpf.make_mpf_style(base_mpf_style='yahoo', mavcolors=['#1f77b4', '#ff7f0e', '#2ca02c']) | |
| # detect if volume exists | |
| has_volume = 'Volume' in df_clean.columns | |
| try: | |
| fig, _ = mpf.plot( | |
| df_clean, | |
| type='candle', | |
| volume=has_volume, | |
| addplot=all_apds if all_apds else None, | |
| style=s, | |
| title=f"\n{ticker_symbol} Candlestick Chart", | |
| ylabel='Price', | |
| ylabel_lower='Volume' if has_volume else None, | |
| returnfig=True, | |
| figscale=1.5 | |
| ) | |
| fig.savefig(fig_path, dpi=150, bbox_inches='tight') | |
| return fig_path, "Successfully created!" | |
| except Exception as e: | |
| return None, f"error: {e}" | |
| # Gradio arayüzü | |
| iface = gr.Interface( | |
| fn=plot_stock_with_patterns, | |
| inputs=[ | |
| gr.Textbox(label="Stock Symbol (örn: MSFT, AAPL, ^GSPC)", value="MSFT"), | |
| gr.Textbox(label="Starting Date (YYYY-MM-DD)", value="2024-01-01"), | |
| gr.Textbox(label="End Date (YYYY-MM-DD)", value=date.today().strftime("%Y-%m-%d")), | |
| gr.CheckboxGroup( | |
| label="Select one (TALib)", | |
| choices=TALIB_PATTERNS, | |
| value=["CDLHAMMER", "CDLSHOOTINGSTAR"] | |
| ) | |
| ], | |
| outputs=[ | |
| gr.Image(type="filepath", label="Hisse Grafiği"), | |
| gr.Textbox(label="Durum") | |
| ], | |
| title="TALib Tool", | |
| description="bullish green, bearish red" | |
| ) | |
| if __name__ == "__main__": | |
| iface.launch() | |