import glob
import json
import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from dotenv import load_dotenv
from openai import OpenAI


class AIAnalysis:
    """Analyse NIFTY derivatives bhavcopy data and suggest option strategies.

    On construction the class looks for the newest CSV below a
    ``processed_derivatives/YYYYMMDD/`` directory and, when the
    ``OPENAI_API_KEY`` environment variable is set, creates an OpenAI client.
    Every public analysis method returns a human-readable report string;
    failures are reported as strings starting with a cross mark rather than
    raised, because the callers concatenate the reports for display.

    Attributes:
        df: Full DataFrame read by ``load_data`` (``None`` until loaded).
        nifty_data: Rows of ``df`` that belong to the NIFTY index.
        latest_csv_path: Path of the CSV chosen at construction, or ``None``.
        client: ``OpenAI`` client, or ``None`` when no API key is configured.
    """

    # Symbols tried, in order, as exact ``TckrSymb`` matches.
    _NIFTY_EXACT_SYMBOLS = (
        'NIFTY',
        'NIFTY 50', 'NIFTY50', 'NIFTYINDEX',
        'Nifty', 'nifty',
    )
    # Substrings that mark other indices when falling back to a prefix match.
    _EXCLUDED_INDEX_TOKENS = ('BANK', 'FIN', 'MID')
    # Points away from spot for the short legs and width of each wing.
    _RISK_CONFIG = {
        'conservative': {'distance': 300, 'spread': 200},
        'moderate': {'distance': 200, 'spread': 200},
        'aggressive': {'distance': 100, 'spread': 100},
    }

    def __init__(self):
        self.df = None
        self.nifty_data = None

        possible_paths = [
            '/app/processed_derivatives',
            './processed_derivatives',
            os.path.join(os.path.dirname(__file__), '..', '..', 'processed_derivatives'),
        ]

        # Use the first candidate directory that actually yields a CSV.
        self.latest_csv_path = None
        for path in possible_paths:
            if os.path.exists(path):
                self.latest_csv_path = self.find_latest_in_directory(path)
                if self.latest_csv_path:
                    break

        load_dotenv(override=True)

        api_key = os.getenv('OPENAI_API_KEY')
        self.client = OpenAI(api_key=api_key) if api_key else None

    # ------------------------------------------------------------------
    # File discovery helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _default_base_dir():
        """Return the absolute ``processed_derivatives`` path relative to this file."""
        return os.path.abspath(
            os.path.join(os.path.dirname(__file__), '..', '..', 'processed_derivatives')
        )

    def _data_base_dir(self):
        """Return the directory that holds the dated sub-directories.

        Prefers the directory the constructor found the latest CSV in, so the
        historical loader reads from the same place as ``load_data``.
        """
        latest = getattr(self, 'latest_csv_path', None)
        if latest:
            return os.path.abspath(os.path.dirname(os.path.dirname(latest)))
        return self._default_base_dir()

    @staticmethod
    def _list_date_dirs(base_dir):
        """Return ``(name, path)`` for every valid ``YYYYMMDD`` sub-directory, newest first."""
        date_dirs = []
        for item in os.listdir(base_dir):
            item_path = os.path.join(base_dir, item)
            if not (os.path.isdir(item_path) and item.isdigit() and len(item) == 8):
                continue
            try:
                datetime.strptime(item, '%Y%m%d')
            except ValueError:
                # Eight digits that are not a real calendar date.
                continue
            date_dirs.append((item, item_path))
        date_dirs.sort(key=lambda entry: entry[0], reverse=True)
        return date_dirs

    @staticmethod
    def _pick_csv(date_dir):
        """Return the CSV to load from ``date_dir`` (BhavCopy preferred), or ``None``."""
        # glob order is filesystem dependent; sort so the choice is repeatable.
        csv_files = sorted(glob.glob(os.path.join(date_dir, "*.csv")))
        if not csv_files:
            return None
        bhavcopy_files = [f for f in csv_files if 'BhavCopy' in os.path.basename(f)]
        return bhavcopy_files[0] if bhavcopy_files else csv_files[0]

    def find_latest_in_directory(self, base_dir):
        """Find the CSV in the newest ``YYYYMMDD`` sub-directory of ``base_dir``.

        Returns:
            The file path, or ``None`` when there is no dated directory, the
            newest one holds no CSV, or the directory cannot be read.
        """
        try:
            date_dirs = self._list_date_dirs(base_dir)
            if not date_dirs:
                return None
            return self._pick_csv(date_dirs[0][1])
        except OSError as e:
            print(f"Error finding latest in {base_dir}: {e}")
            return None

    def find_latest_derivatives_data(self):
        """Find the latest derivatives CSV file in processed_derivatives/YYYYMMDD/ directory.

        Same lookup as ``find_latest_in_directory`` but fixed to the directory
        relative to this file and with progress messages printed.
        """
        try:
            base_dir = self._default_base_dir()
            print(f"🔍 Searching for data in: {base_dir}")

            if not os.path.exists(base_dir):
                print(f"❌ Directory not found: {base_dir}")
                return None

            date_dirs = self._list_date_dirs(base_dir)
            if not date_dirs:
                print("❌ No valid date directories found in processed_derivatives")
                return None

            latest_date_dir = date_dirs[0][1]
            print(f"✅ Latest date directory: {os.path.basename(latest_date_dir)}")

            latest_file = self._pick_csv(latest_date_dir)
            if latest_file is None:
                print(f"❌ No CSV files found in {latest_date_dir}")
                return None

            print(f"✅ Found data file: {os.path.basename(latest_file)}")
            return latest_file

        except OSError as e:
            print(f"❌ Error finding data files: {e}")
            return None

    # ------------------------------------------------------------------
    # Data selection helpers
    # ------------------------------------------------------------------
    @classmethod
    def _select_nifty_rows(cls, df):
        """Return ``(rows, description)`` for the NIFTY index rows of ``df``.

        Exact symbol matches are tried first; otherwise symbols that start
        with ``NIFTY`` (case-insensitive) and do not contain ``BANK``, ``FIN``
        or ``MID`` are used. ``rows`` may be empty.
        """
        symbols = df['TckrSymb']
        for pattern in cls._NIFTY_EXACT_SYMBOLS:
            matched = df[symbols == pattern]
            if len(matched) > 0:
                return matched, f"pattern: '{pattern}'"

        # ``Series.str.startswith`` has no ``case`` argument, so normalise the
        # case up front instead of passing ``case=False``.
        upper = symbols.astype(str).str.upper()
        mask = upper.str.startswith('NIFTY')
        for token in cls._EXCLUDED_INDEX_TOKENS:
            mask &= ~upper.str.contains(token, regex=False)
        return df[mask], "filtered contains"

    @staticmethod
    def _spot_price(data):
        """Return the first positive, non-null ``UndrlygPric`` as float, or ``None``."""
        if 'UndrlygPric' not in data.columns:
            return None
        prices = pd.to_numeric(data['UndrlygPric'], errors='coerce').dropna()
        prices = prices[prices > 0]
        return float(prices.iloc[0]) if not prices.empty else None

    @staticmethod
    def _sorted_expiries(data):
        """Return the sorted distinct ``XpryDt`` values of the option rows in ``data``."""
        if 'XpryDt' not in data.columns:
            return []
        if 'OptnTp' in data.columns:
            option_rows = data[data['OptnTp'].isin(['CE', 'PE'])]
        else:
            option_rows = data
        return sorted(option_rows['XpryDt'].dropna().unique())

    def _nearest_expiry_rows(self, data):
        """Restrict ``data`` to its nearest expiry; return it unchanged if none is known."""
        expiries = self._sorted_expiries(data)
        if not expiries:
            return data
        return data[data['XpryDt'] == expiries[0]]

    @staticmethod
    def _split_calls_puts(data):
        """Return ``(calls, puts)`` frames selected by the ``OptnTp`` column."""
        return data[data['OptnTp'] == 'CE'], data[data['OptnTp'] == 'PE']

    @staticmethod
    def _premium_at(data, strike):
        """Return the first ``ClsPric`` for ``strike`` in ``data``, or 0 when missing/NaN."""
        prices = data.loc[data['StrkPric'] == strike, 'ClsPric']
        if prices.empty:
            return 0
        value = prices.iloc[0]
        return 0 if pd.isna(value) else value

    @staticmethod
    def _otm_strikes(ce_data, pe_data, underlying):
        """Return ascending OTM call strikes and ascending OTM put strikes.

        The nearest OTM call is the first call strike; the nearest OTM put is
        the last put strike.
        """
        calls = sorted(s for s in ce_data['StrkPric'].unique() if s > underlying)
        puts = sorted(s for s in pe_data['StrkPric'].unique() if s < underlying)
        return calls, puts

    @staticmethod
    def _ratio_text(numerator, denominator, fmt):
        """Format ``numerator / denominator`` with ``fmt``, or ``N/A`` if not positive."""
        if pd.isna(denominator) or denominator <= 0:
            return "N/A"
        return format(numerator / denominator, fmt)

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def load_historical_data(self, days_back=30):
        """Load historical data from multiple past dates for analysis.

        Args:
            days_back: Maximum number of most recent dated directories to read.

        Returns:
            A list of ``{'date', 'data', 'file_path'}`` dicts (newest first,
            possibly empty), or ``None`` when the data directory is missing,
            unreadable, or has no dated sub-directories.
        """
        try:
            base_dir = self._data_base_dir()
            if not os.path.exists(base_dir):
                return None

            date_dirs = self._list_date_dirs(base_dir)
            if not date_dirs:
                return None

            historical_data = []
            for date_str, date_path in date_dirs[:days_back]:
                file_to_load = self._pick_csv(date_path)
                if file_to_load is None:
                    continue
                try:
                    df = pd.read_csv(file_to_load)
                    nifty_data, _ = self._select_nifty_rows(df)
                    if len(nifty_data) > 0:
                        historical_data.append({
                            'date': date_str,
                            'data': nifty_data,
                            'file_path': file_to_load,
                        })
                except Exception as e:
                    # One unreadable day must not abort the whole history.
                    print(f"⚠️ Could not load data for {date_str}: {e}")

            return historical_data

        except Exception as e:
            print(f"❌ Error loading historical data: {e}")
            return None

    def load_data(self):
        """Load data from the latest CSV file.

        Populates ``self.df`` and ``self.nifty_data``.

        Returns:
            A status string: a summary starting with a check mark on success,
            or a message starting with a cross mark on failure.
        """
        if not self.latest_csv_path:
            return "❌ No data file found. Please ensure processed_derivatives has date directories with CSV files."

        try:
            print(f"📂 Loading data from: {self.latest_csv_path}")
            self.df = pd.read_csv(self.latest_csv_path)
            print(f"✅ Data loaded: {len(self.df):,} records")

            all_symbols = self.df['TckrSymb'].unique()
            print(f"📋 Available symbols: {list(all_symbols)}")

            self.nifty_data, how = self._select_nifty_rows(self.df)
            if len(self.nifty_data) == 0:
                return "❌ No NIFTY data found in the file"

            print(f"✅ Found NIFTY data with {how}")
            print(f"✅ NIFTY data filtered: {len(self.nifty_data):,} records")

            trading_date = "Unknown"
            if 'TradDt' in self.df.columns:
                trading_date = self.df['TradDt'].iloc[0]

            return (
                f"✅ Loaded {len(self.nifty_data):,} NIFTY records\n"
                f"📅 Trading Date: {trading_date}\n"
                f"📁 Source: {os.path.basename(self.latest_csv_path)}"
            )

        except Exception as e:
            return f"❌ Error loading data: {e}"

    # ------------------------------------------------------------------
    # Historical analysis
    # ------------------------------------------------------------------
    def analyze_historical_performance(self):
        """Analyze historical data to find optimal strategies.

        For each loaded day a nearest-strike iron condor is priced on the
        nearest expiry. A day counts as a positive scenario only when such a
        condor can be built for a positive net credit.

        Returns:
            The formatted report, or an error string when no history exists.
        """
        historical_data = self.load_historical_data(days_back=30)

        if not historical_data:
            return "❌ No historical data found for analysis"

        analysis_results = {
            'total_days': len(historical_data),
            'successful_iron_condors': 0,
            'avg_premium': 0,
            'best_strategies': [],
            'volatility_analysis': {},
            'option_selling_opportunities': [],
        }

        premium_sum = 0
        strategy_count = 0

        for historical in historical_data:
            try:
                # Price all four legs on one expiry so premiums are comparable.
                expiry_data = self._nearest_expiry_rows(historical['data'])
                underlying_price = self._spot_price(expiry_data)
                if underlying_price is None:
                    continue

                strategy = self._quick_strategy_analysis(expiry_data, underlying_price)
                if not strategy:
                    continue

                premium = strategy.get('premium', 0)
                premium_sum += premium
                strategy_count += 1
                if premium > 0:
                    analysis_results['successful_iron_condors'] += 1

            except Exception as e:
                print(f"⚠️ Could not analyze data for {historical.get('date')}: {e}")
                continue

        if strategy_count > 0:
            analysis_results['avg_premium'] = premium_sum / strategy_count

        return self._format_historical_analysis(analysis_results, historical_data)

    def _quick_strategy_analysis(self, expiry_data, underlying_price):
        """Quick strategy analysis for historical data.

        Builds an iron condor from the two nearest OTM call strikes and the
        two nearest OTM put strikes.

        Returns:
            ``{'premium', 'ce_sell', 'pe_sell'}`` where ``premium`` is the net
            credit, or ``None`` when fewer than two OTM strikes exist on
            either side.
        """
        ce_data, pe_data = self._split_calls_puts(expiry_data)
        otm_ce_strikes, otm_pe_strikes = self._otm_strikes(ce_data, pe_data, underlying_price)

        if len(otm_ce_strikes) < 2 or len(otm_pe_strikes) < 2:
            return None

        ce_sell, ce_buy = otm_ce_strikes[0], otm_ce_strikes[1]
        pe_sell, pe_buy = otm_pe_strikes[-1], otm_pe_strikes[-2]

        premium = (
            self._premium_at(ce_data, ce_sell) + self._premium_at(pe_data, pe_sell)
            - self._premium_at(ce_data, ce_buy) - self._premium_at(pe_data, pe_buy)
        )

        return {'premium': premium, 'ce_sell': ce_sell, 'pe_sell': pe_sell}

    def _format_historical_analysis(self, analysis, historical_data):
        """Format historical analysis results.

        Args:
            analysis: Dict with ``total_days``, ``successful_iron_condors``
                and ``avg_premium``.
            historical_data: Unused; kept for interface compatibility.
        """
        total_days = analysis['total_days']
        successes = analysis['successful_iron_condors']
        success_rate = (successes / total_days) * 100 if total_days else 0.0

        result = "📊 HISTORICAL PERFORMANCE ANALYSIS\n"
        result += "=" * 50 + "\n"
        result += f"Total Days Analyzed: {total_days}\n"
        result += f"Profitable Iron Condor Scenarios: {successes}\n"
        result += f"Success Rate: {success_rate:.1f}%\n"
        result += f"Average Premium: {analysis['avg_premium']:.2f}\n\n"

        result += "💡 HISTORICAL INSIGHTS:\n"
        result += "• Iron Condor strategies show consistent premium collection\n"
        result += "• Focus on 200-300 point OTM strikes for optimal risk-reward\n"
        result += "• Monitor volatility - high IV periods offer better premiums\n"
        result += "• Consider weekly expiries for faster time decay\n"

        return result

    # ------------------------------------------------------------------
    # OpenAI integration
    # ------------------------------------------------------------------
    def get_openai_strategy_recommendation(self, current_data):
        """Get AI-powered strategy recommendations from OpenAI.

        Args:
            current_data: DataFrame of NIFTY rows to summarise for the model.

        Returns:
            The model's reply text, or an error string when no client is
            configured or the request fails.
        """
        # The client is None when no API key was found at construction.
        if getattr(self, 'client', None) is None:
            return "❌ OpenAI API key not set. Please set OPENAI_API_KEY environment variable."

        try:
            analysis_data = self._prepare_data_for_ai(current_data)

            prompt = (
                "\nAs an expert options trading strategist, analyze this NIFTY options data "
                "and provide specific trading recommendations:\n\n"
                f"{analysis_data}\n\n"
                "Please provide:\n"
                "1. IRON CONDOR STRATEGY: Specific strike recommendations with risk-reward analysis\n"
                "2. OPTION SELLING STRATEGIES: Best naked put/call selling opportunities with rationale\n"
                "3. VOLATILITY ANALYSIS: Current IV levels and implications\n"
                "4. RISK MANAGEMENT: Specific exit criteria and position sizing\n"
                "5. MARKET OUTLOOK: Short-term directional bias based on the data\n\n"
                "Format the response in a clear, actionable manner for traders.\n"
            )

            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": "You are an expert options trading strategist with deep knowledge of NIFTY derivatives. Provide specific, actionable trading recommendations."},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=1500,
                temperature=0.7,
            )

            return response.choices[0].message.content

        except Exception as e:
            return f"❌ Error getting AI recommendations: {e}"

    def _prepare_data_for_ai(self, current_data):
        """Prepare current market data for AI analysis.

        Summarises the nearest expiry: premiums just above/below spot, OTM
        premiums 200 and 400 points out, open interest, volume and the ten
        strikes closest to spot.

        Returns:
            A plain-text summary, or ``"No current data available"`` when
            ``current_data`` is ``None`` or empty.
        """
        if current_data is None or len(current_data) == 0:
            return "No current data available"

        spot = self._spot_price(current_data)
        trading_date = current_data['TradDt'].iloc[0] if 'TradDt' in current_data.columns else "Unknown"
        expiries = self._sorted_expiries(current_data)

        parts = [
            "NIFTY OPTIONS ANALYSIS DATA:\n\n",
            f"TRADING DATE: {trading_date}\n",
            f"CURRENT NIFTY SPOT: {spot if spot is not None else 'Unknown'}\n\n",
            f"AVAILABLE EXPIRIES: {', '.join(str(e) for e in expiries[:5])}\n\n",
        ]

        if not expiries:
            return "".join(parts)

        nearest_expiry = expiries[0]
        expiry_data = current_data[current_data['XpryDt'] == nearest_expiry]
        ce_data, pe_data = self._split_calls_puts(expiry_data)

        parts.append(f"DETAILED ANALYSIS FOR EXPIRY: {nearest_expiry}\n")
        parts.append("=" * 40 + "\n\n")

        if spot is not None:
            # "ATM" here is the first call strike above and put strike below spot.
            atm_ce_strike = self._find_strike_above(ce_data, spot)
            atm_pe_strike = self._find_strike_below(pe_data, spot)

            if atm_ce_strike is not None and atm_pe_strike is not None:
                atm_ce_premium = self._premium_at(ce_data, atm_ce_strike)
                atm_pe_premium = self._premium_at(pe_data, atm_pe_strike)
                atm_ratio = self._ratio_text(atm_pe_premium, atm_ce_premium, ".2f")

                parts.append(f"ATM CALL ({atm_ce_strike}): {atm_ce_premium:.2f}\n")
                parts.append(f"ATM PUT ({atm_pe_strike}): {atm_pe_premium:.2f}\n")
                parts.append(f"PUT-CALL RATIO (ATM): {atm_ratio}\n\n")

        parts.append("OTM OPTIONS PREMIUMS:\n")
        if spot is not None:
            otm_probes = (
                ("OTM CALL +200", ce_data, self._find_strike_above, 200),
                ("OTM CALL +400", ce_data, self._find_strike_above, 400),
                ("OTM PUT -200", pe_data, self._find_strike_below, -200),
                ("OTM PUT -400", pe_data, self._find_strike_below, -400),
            )
            for label, frame, finder, offset in otm_probes:
                strike = finder(frame, spot + offset)
                if strike is not None:
                    parts.append(f"{label} ({strike}): {self._premium_at(frame, strike):.2f}\n")
        parts.append("\n")

        if 'OpnIntrst' in expiry_data.columns:
            total_oi = expiry_data['OpnIntrst'].sum()
            ce_oi = ce_data['OpnIntrst'].sum()
            pe_oi = pe_data['OpnIntrst'].sum()

            parts.append("OPEN INTEREST ANALYSIS:\n")
            parts.append(f"Total OI: {total_oi:,.0f}\n")
            parts.append(f"Call OI: {ce_oi:,.0f}\n")
            parts.append(f"Put OI: {pe_oi:,.0f}\n")
            parts.append(f"PUT-CALL RATIO (OI): {self._ratio_text(pe_oi, ce_oi, '.2f')}\n\n")

        if 'TtlTradgVol' in expiry_data.columns:
            total_volume = expiry_data['TtlTradgVol'].sum()
            parts.append(f"TOTAL VOLUME: {total_volume:,.0f}\n\n")

        parts.append("SAMPLE STRIKES & PREMIUMS:\n")
        if spot is not None:
            in_range = [
                s for s in expiry_data['StrkPric'].dropna().unique()
                if abs(s - spot) <= 600
            ]
            # Keep the ten strikes closest to spot, then list them low to high.
            nearest_ten = sorted(sorted(in_range, key=lambda s: abs(s - spot))[:10])
            for strike in nearest_ten:
                ce_premium = self._premium_at(ce_data, strike)
                pe_premium = self._premium_at(pe_data, strike)
                distance = strike - spot
                parts.append(
                    f"Strike {strike}: CE={ce_premium:.2f} | PE={pe_premium:.2f} | Distance={distance:+.0f}\n"
                )

        return "".join(parts)

    # ------------------------------------------------------------------
    # Option selling strategies
    # ------------------------------------------------------------------
    def analyze_option_selling_strategies(self):
        """Analyze best option selling opportunities.

        Works on the nearest expiry of the loaded NIFTY data.

        Returns:
            A report with credit spreads, naked selling and short strangle
            sections, or an error string when data or spot price is missing.
        """
        if self.nifty_data is None:
            return "❌ Please load data first"

        # All legs of a strategy must come from the same expiry.
        current_data = self._nearest_expiry_rows(self.nifty_data)
        underlying_price = self._spot_price(current_data)

        if underlying_price is None:
            return "❌ Cannot analyze - No underlying price data"

        result = "💰 OPTION SELLING STRATEGIES\n"
        result += "=" * 50 + "\n"

        result += "\n🎯 CREDIT SPREAD OPPORTUNITIES:\n"
        result += self._find_best_credit_spreads(current_data, underlying_price)

        result += "\n🔥 HIGH-PROBABILITY NAKED SELLING:\n"
        result += self._find_naked_selling_opportunities(current_data, underlying_price)

        result += "\n⚡ SHORT STRANGLE SETUPS:\n"
        result += self._find_strangle_opportunities(current_data, underlying_price)

        return result

    def _find_best_credit_spreads(self, data, underlying):
        """Find best credit spread opportunities.

        Describes a bull put spread and a bear call spread built from the two
        nearest OTM strikes on each side of ``underlying``.
        """
        ce_data, pe_data = self._split_calls_puts(data)
        call_strikes, put_strikes = self._otm_strikes(ce_data, pe_data, underlying)

        result = ""

        if len(put_strikes) >= 2:
            short_put, long_put = put_strikes[-1], put_strikes[-2]
            short_premium = self._premium_at(pe_data, short_put)
            long_premium = self._premium_at(pe_data, long_put)

            net_credit = short_premium - long_premium
            max_risk = (short_put - long_put) - net_credit
            reward_risk = self._ratio_text(net_credit, max_risk, ".2f")

            result += f"• BULL PUT SPREAD: Sell {short_put} PE @ {short_premium:.2f} | Buy {long_put} PE @ {long_premium:.2f}\n"
            result += f" Net Credit: {net_credit:.2f} | Max Risk: {max_risk:.2f} | Reward/Risk: {reward_risk}:1\n\n"

        if len(call_strikes) >= 2:
            short_call, long_call = call_strikes[0], call_strikes[1]
            short_premium = self._premium_at(ce_data, short_call)
            long_premium = self._premium_at(ce_data, long_call)

            net_credit = short_premium - long_premium
            max_risk = (long_call - short_call) - net_credit
            reward_risk = self._ratio_text(net_credit, max_risk, ".2f")

            result += f"• BEAR CALL SPREAD: Sell {short_call} CE @ {short_premium:.2f} | Buy {long_call} CE @ {long_premium:.2f}\n"
            result += f" Net Credit: {net_credit:.2f} | Max Risk: {max_risk:.2f} | Reward/Risk: {reward_risk}:1\n"

        return result if result else "• No suitable credit spread opportunities found\n"

    def _find_naked_selling_opportunities(self, data, underlying):
        """Find naked selling opportunities.

        Reports the nearest OTM put and the nearest OTM call with premiums.
        """
        ce_data, pe_data = self._split_calls_puts(data)
        call_strikes, put_strikes = self._otm_strikes(ce_data, pe_data, underlying)

        result = ""

        if put_strikes:
            best_put = put_strikes[-1]
            put_premium = self._premium_at(pe_data, best_put)
            result += f"• NAKED PUT SELL: {best_put} PE @ {put_premium:.2f}\n"
            result += f" Strike is {underlying - best_put:.0f} points OTM | Premium: {put_premium:.2f}\n\n"

        if call_strikes:
            best_call = call_strikes[0]
            call_premium = self._premium_at(ce_data, best_call)
            result += f"• NAKED CALL SELL: {best_call} CE @ {call_premium:.2f}\n"
            result += f" Strike is {best_call - underlying:.0f} points OTM | Premium: {call_premium:.2f}\n"

        return result if result else "• No suitable naked selling opportunities found\n"

    def _find_strangle_opportunities(self, data, underlying):
        """Find short strangle opportunities.

        Sells the nearest OTM call and put; the profit range is the distance
        between the two breakeven points.
        """
        ce_data, pe_data = self._split_calls_puts(data)
        call_strikes, put_strikes = self._otm_strikes(ce_data, pe_data, underlying)

        if not call_strikes or not put_strikes:
            return "• No suitable strangle opportunities found\n"

        call_strike = call_strikes[0]
        put_strike = put_strikes[-1]
        call_premium = self._premium_at(ce_data, call_strike)
        put_premium = self._premium_at(pe_data, put_strike)

        total_credit = call_premium + put_premium
        lower_breakeven = put_strike - total_credit
        upper_breakeven = call_strike + total_credit

        result = f"• SHORT STRANGLE: Sell {call_strike} CE @ {call_premium:.2f} + Sell {put_strike} PE @ {put_premium:.2f}\n"
        result += f" Total Credit: {total_credit:.2f} | Breakeven: {lower_breakeven:.2f} to {upper_breakeven:.2f}\n"
        result += f" Profit Range: {upper_breakeven - lower_breakeven:.2f} points\n"
        return result

    def _find_strike_above(self, data, target_price):
        """Return the lowest ``StrkPric`` strictly above ``target_price``, or ``None``."""
        above_strikes = [s for s in data['StrkPric'].unique() if s > target_price]
        return min(above_strikes) if above_strikes else None

    def _find_strike_below(self, data, target_price):
        """Return the highest ``StrkPric`` strictly below ``target_price``, or ``None``."""
        below_strikes = [s for s in data['StrkPric'].unique() if s < target_price]
        return max(below_strikes) if below_strikes else None

    # ------------------------------------------------------------------
    # Iron condor
    # ------------------------------------------------------------------
    def suggest_iron_condor(self, risk_level='moderate'):
        """Suggest Iron Condor strategy for NIFTY on the nearest expiry.

        Args:
            risk_level: ``'conservative'``, ``'moderate'`` or ``'aggressive'``.

        Returns:
            The strategy report, or an error string.
        """
        if self.nifty_data is None:
            return "❌ Please load data first"

        expiries = self._sorted_expiries(self.nifty_data)
        if not expiries:
            return "❌ No expiry dates found in data"

        return self._suggest_iron_condor_for_expiry(expiries[0], risk_level)

    def _suggest_iron_condor_for_expiry(self, expiry_date, risk_level='moderate'):
        """Suggest Iron Condor for specific expiry.

        Short legs are placed beyond ``distance`` points from spot and long
        legs beyond a further ``spread`` points, per the risk level's
        configuration.

        Returns:
            The formatted position and metrics, or an error string when the
            risk level is unknown, data is missing, or strikes/premiums cannot
            be found.
        """
        config = self._RISK_CONFIG.get(risk_level)
        if config is None:
            valid = ', '.join(self._RISK_CONFIG)
            return f"❌ Unknown risk level '{risk_level}'. Choose from: {valid}"

        expiry_data = self.nifty_data[self.nifty_data['XpryDt'] == expiry_date]
        if len(expiry_data) == 0:
            return f"❌ No data found for expiry {expiry_date}"

        underlying_price = self._spot_price(expiry_data)
        if underlying_price is None:
            return "❌ Cannot analyze - No underlying price data"

        ce_data, pe_data = self._split_calls_puts(expiry_data)

        ce_sell_strike = self._find_strike_above(ce_data, underlying_price + config['distance'])
        ce_buy_strike = (
            self._find_strike_above(ce_data, ce_sell_strike + config['spread'])
            if ce_sell_strike is not None else None
        )
        pe_sell_strike = self._find_strike_below(pe_data, underlying_price - config['distance'])
        pe_buy_strike = (
            self._find_strike_below(pe_data, pe_sell_strike - config['spread'])
            if pe_sell_strike is not None else None
        )

        strikes = (ce_sell_strike, ce_buy_strike, pe_sell_strike, pe_buy_strike)
        if any(strike is None for strike in strikes):
            return "❌ Cannot find suitable strikes for Iron Condor"

        ce_sell_prem = self._premium_at(ce_data, ce_sell_strike)
        ce_buy_prem = self._premium_at(ce_data, ce_buy_strike)
        pe_sell_prem = self._premium_at(pe_data, pe_sell_strike)
        pe_buy_prem = self._premium_at(pe_data, pe_buy_strike)

        # A zero premium means the leg has no usable price.
        if not all([ce_sell_prem, ce_buy_prem, pe_sell_prem, pe_buy_prem]):
            return "❌ Missing premium data"

        net_credit = (ce_sell_prem + pe_sell_prem) - (ce_buy_prem + pe_buy_prem)
        # Only one side can lose at expiry, so risk is set by the wider wing.
        wing_width = max(ce_buy_strike - ce_sell_strike, pe_sell_strike - pe_buy_strike)
        max_risk = wing_width - net_credit
        risk_reward = self._ratio_text(max_risk, net_credit, ".1f")

        report_lines = [
            "",
            f"🎯 NIFTY IRON CONDOR - {risk_level.upper()} RISK",
            f"📅 Expiry: {expiry_date} | 💰 Spot: {underlying_price:.2f}",
            "=" * 50,
            "",
            "POSITION STRUCTURE:",
            f"SELL {ce_sell_strike} CE @ {ce_sell_prem:.2f}",
            f"BUY {ce_buy_strike} CE @ {ce_buy_prem:.2f}",
            f"SELL {pe_sell_strike} PE @ {pe_sell_prem:.2f}",
            f"BUY {pe_buy_strike} PE @ {pe_buy_prem:.2f}",
            "",
            "STRATEGY METRICS:",
            f"Net Credit: {net_credit:.2f}",
            f"Max Profit: {net_credit:.2f}",
            f"Max Risk: {max_risk:.2f}",
            f"Risk-Reward: {risk_reward}:1",
            f"Breakeven: {pe_sell_strike - net_credit:.2f} to {ce_sell_strike + net_credit:.2f}",
            "",
        ]
        return "\n".join(report_lines)


def run_ai_analysis(risk_level='moderate', analysis_type='comprehensive'):
    """Main execution function with enhanced analysis options.

    Args:
        risk_level: Passed to ``AIAnalysis.suggest_iron_condor``.
        analysis_type: ``'comprehensive'`` runs the historical, option
            selling, iron condor and OpenAI sections; ``'option_selling'``
            and ``'historical'`` run only that section; any other value runs
            only the iron condor suggestion.

    Returns:
        All report sections joined by blank lines. When loading fails the
        report holds just the header and the load status message.
    """
    analyzer = AIAnalysis()

    result = ["🤖 ENHANCED NIFTY AI ANALYZER", "=" * 50]

    load_result = analyzer.load_data()
    result.append(load_result)

    # Decide on the analyzer's state rather than on an emoji inside the status
    # text, so rewording the message cannot silently disable the analysis.
    data_loaded = analyzer.nifty_data is not None and len(analyzer.nifty_data) > 0
    if not data_loaded:
        return "\n\n".join(result)

    if analysis_type == 'comprehensive':
        result.append(analyzer.analyze_historical_performance())
        result.append(analyzer.analyze_option_selling_strategies())
        result.append(analyzer.suggest_iron_condor(risk_level))

        # The AI section is optional: report a failure instead of losing the
        # sections that were already computed.
        try:
            ai_recommendation = analyzer.get_openai_strategy_recommendation(analyzer.nifty_data)
            result.append("🧠 AI-POWERED RECOMMENDATIONS:\n" + ai_recommendation)
        except Exception as e:
            result.append(f"ℹ️ AI Analysis: {e}")

    elif analysis_type == 'option_selling':
        result.append(analyzer.analyze_option_selling_strategies())

    elif analysis_type == 'historical':
        result.append(analyzer.analyze_historical_performance())

    else:
        result.append(analyzer.suggest_iron_condor(risk_level))

    return "\n\n".join(result)