# nse_modules/ai_analysis.py β€” AI-assisted NIFTY derivatives analysis
# (last updated by DivyaShah2025, commit 7e5bd95)
import pandas as pd
import numpy as np
import os
from datetime import datetime, timedelta
import glob
import json
from dotenv import load_dotenv
from openai import OpenAI
class AIAnalysis:
    """AI-assisted analysis of NIFTY derivatives data.

    Locates the most recent processed-derivatives CSV, loads the NIFTY
    option chain, and produces strategy reports (iron condors, credit
    spreads, naked selling, strangles), optionally augmented by OpenAI.
    """

    def __init__(self):
        # Populated lazily by load_data().
        self.df = None          # full bhavcopy DataFrame
        self.nifty_data = None  # rows filtered to the NIFTY index only

        # Try multiple candidate locations for the data directory.
        possible_paths = [
            '/app/processed_derivatives',   # Hugging Face location
            './processed_derivatives',      # Current directory
            os.path.join(os.path.dirname(__file__), '..', '..', 'processed_derivatives'),
        ]
        self.latest_csv_path = None
        for path in possible_paths:
            if os.path.exists(path):
                # Find the latest file in this directory.
                self.latest_csv_path = self.find_latest_in_directory(path)
                if self.latest_csv_path:
                    break

        load_dotenv(override=True)
        # Only construct the OpenAI client when a key is configured;
        # callers must treat `self.client is None` as "AI disabled".
        api_key = os.getenv('OPENAI_API_KEY')
        self.client = OpenAI(api_key=api_key) if api_key else None
def find_latest_in_directory(self, base_dir):
"""Find latest CSV in a directory"""
try:
# Find all date directories (YYYYMMDD format)
date_dirs = []
for item in os.listdir(base_dir):
item_path = os.path.join(base_dir, item)
if os.path.isdir(item_path) and item.isdigit() and len(item) == 8:
try:
datetime.strptime(item, '%Y%m%d')
date_dirs.append((item, item_path))
except:
continue
if not date_dirs:
return None
# Sort by date (newest first)
date_dirs.sort(key=lambda x: x[0], reverse=True)
latest_date_dir = date_dirs[0][1]
# Find CSV files
csv_files = glob.glob(os.path.join(latest_date_dir, "*.csv"))
if not csv_files:
return None
# Return the first CSV file
return csv_files[0]
except Exception as e:
print(f"Error finding latest in {base_dir}: {e}")
return None
def find_latest_derivatives_data(self):
"""Find the latest derivatives CSV file in processed_derivatives/YYYYMMDD/ directory"""
try:
# Look for processed_derivatives in parent directory
base_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'processed_derivatives')
base_dir = os.path.abspath(base_dir)
print(f"πŸ” Searching for data in: {base_dir}")
# Check if directory exists
if not os.path.exists(base_dir):
print(f"❌ Directory not found: {base_dir}")
return None
# Find all date directories (YYYYMMDD format)
date_dirs = []
for item in os.listdir(base_dir):
item_path = os.path.join(base_dir, item)
if os.path.isdir(item_path) and item.isdigit() and len(item) == 8:
try:
# Validate it's a proper date
datetime.strptime(item, '%Y%m%d')
date_dirs.append((item, item_path))
except ValueError:
continue
if not date_dirs:
print("❌ No valid date directories found in processed_derivatives")
return None
# Sort by date (newest first)
date_dirs.sort(key=lambda x: x[0], reverse=True)
latest_date_dir = date_dirs[0][1]
print(f"βœ… Latest date directory: {os.path.basename(latest_date_dir)}")
# Find CSV files in the latest date directory
csv_files = glob.glob(os.path.join(latest_date_dir, "*.csv"))
if not csv_files:
print(f"❌ No CSV files found in {latest_date_dir}")
return None
# Find the main BhavCopy file (prioritize files with "BhavCopy" in name)
bhavcopy_files = [f for f in csv_files if 'BhavCopy' in os.path.basename(f)]
if bhavcopy_files:
latest_file = bhavcopy_files[0]
else:
latest_file = csv_files[0] # Use first CSV file if no BhavCopy found
print(f"βœ… Found data file: {os.path.basename(latest_file)}")
return latest_file
except Exception as e:
print(f"❌ Error finding data files: {e}")
return None
def load_historical_data(self, days_back=30):
"""Load historical data from multiple past dates for analysis"""
try:
base_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'processed_derivatives')
base_dir = os.path.abspath(base_dir)
if not os.path.exists(base_dir):
return None
# Find all date directories
date_dirs = []
for item in os.listdir(base_dir):
item_path = os.path.join(base_dir, item)
if os.path.isdir(item_path) and item.isdigit() and len(item) == 8:
try:
date_dirs.append((item, item_path))
except ValueError:
continue
if not date_dirs:
return None
# Sort by date and take recent ones
date_dirs.sort(key=lambda x: x[0], reverse=True)
historical_data = []
for date_str, date_path in date_dirs[:days_back]: # Last N days
csv_files = glob.glob(os.path.join(date_path, "*.csv"))
if csv_files:
bhavcopy_files = [f for f in csv_files if 'BhavCopy' in os.path.basename(f)]
file_to_load = bhavcopy_files[0] if bhavcopy_files else csv_files[0]
try:
df = pd.read_csv(file_to_load)
nifty_data = df[df['TckrSymb'].str.startswith('NIFTY', na=False, case=False)]
if len(nifty_data) > 0:
historical_data.append({
'date': date_str,
'data': nifty_data,
'file_path': file_to_load
})
except Exception as e:
print(f"⚠️ Could not load data for {date_str}: {e}")
return historical_data
except Exception as e:
print(f"❌ Error loading historical data: {e}")
return None
def load_data(self):
"""Load data from the latest CSV file"""
if not self.latest_csv_path:
return "❌ No data file found. Please ensure processed_derivatives has date directories with CSV files."
try:
print(f"πŸ“ Loading data from: {self.latest_csv_path}")
self.df = pd.read_csv(self.latest_csv_path)
print(f"βœ… Data loaded: {len(self.df):,} records")
# Show all available symbols for debugging
all_symbols = self.df['TckrSymb'].unique()
print(f"πŸ“Š Available symbols: {list(all_symbols)}")
# Filter for exact NIFTY symbols only - try multiple patterns
nifty_patterns = [
'NIFTY', # Exact match
'NIFTY 50', 'NIFTY50', 'NIFTYINDEX',
'Nifty', 'nifty'
]
self.nifty_data = None
for pattern in nifty_patterns:
self.nifty_data = self.df[self.df['TckrSymb'] == pattern]
if len(self.nifty_data) > 0:
print(f"βœ… Found NIFTY data with pattern: '{pattern}'")
break
# If still no data, use contains but exclude BANKNIFTY, FINNIFTY
if len(self.nifty_data) == 0:
self.nifty_data = self.df[
self.df['TckrSymb'].str.startswith('NIFTY', na=False, case=False) &
~self.df['TckrSymb'].str.contains('BANK', na=False, case=False) &
~self.df['TckrSymb'].str.contains('FIN', na=False, case=False) &
~self.df['TckrSymb'].str.contains('MID', na=False, case=False)
]
if len(self.nifty_data) > 0:
print(f"βœ… Found NIFTY data with filtered contains")
if len(self.nifty_data) == 0:
return "❌ No NIFTY data found in the file"
print(f"βœ… NIFTY data filtered: {len(self.nifty_data):,} records")
# Get trading date from filename or data
trading_date = "Unknown"
if 'TradDt' in self.df.columns:
trading_date = self.df['TradDt'].iloc[0]
return f"βœ… Loaded {len(self.nifty_data):,} NIFTY records\nπŸ“… Trading Date: {trading_date}\nπŸ“ Source: {os.path.basename(self.latest_csv_path)}"
except Exception as e:
return f"❌ Error loading data: {e}"
def analyze_historical_performance(self):
"""Analyze historical data to find optimal strategies"""
historical_data = self.load_historical_data(days_back=30)
if not historical_data:
return "❌ No historical data found for analysis"
analysis_results = {
'total_days': len(historical_data),
'successful_iron_condors': 0,
'avg_premium': 0,
'best_strategies': [],
'volatility_analysis': {},
'option_selling_opportunities': []
}
premium_sum = 0
strategy_count = 0
for historical in historical_data:
try:
# Analyze Iron Condor potential for each historical date
expiry_data = historical['data']
underlying_price = expiry_data['UndrlygPric'].iloc[0] if 'UndrlygPric' in expiry_data.columns else None
if underlying_price:
# Simple analysis - count profitable scenarios
analysis_results['successful_iron_condors'] += 1
# Calculate average premium for moderate risk
strategy = self._quick_strategy_analysis(expiry_data, underlying_price)
if strategy:
premium_sum += strategy.get('premium', 0)
strategy_count += 1
except Exception as e:
continue
if strategy_count > 0:
analysis_results['avg_premium'] = premium_sum / strategy_count
return self._format_historical_analysis(analysis_results, historical_data)
def _quick_strategy_analysis(self, expiry_data, underlying_price):
"""Quick strategy analysis for historical data"""
ce_data = expiry_data[expiry_data['OptnTp'] == 'CE']
pe_data = expiry_data[expiry_data['OptnTp'] == 'PE']
# Find OTM strikes
otm_ce_strikes = [s for s in ce_data['StrkPric'].unique() if s > underlying_price]
otm_pe_strikes = [s for s in pe_data['StrkPric'].unique() if s < underlying_price]
if len(otm_ce_strikes) >= 2 and len(otm_pe_strikes) >= 2:
ce_sell = otm_ce_strikes[0]
ce_buy = otm_ce_strikes[1]
pe_sell = otm_pe_strikes[-1]
pe_buy = otm_pe_strikes[-2]
# Calculate premium
def get_premium(data, strike):
strike_data = data[data['StrkPric'] == strike]
return strike_data['ClsPric'].iloc[0] if not strike_data.empty else 0
premium = (get_premium(ce_data, ce_sell) + get_premium(pe_data, pe_sell) -
get_premium(ce_data, ce_buy) - get_premium(pe_data, pe_buy))
return {'premium': premium, 'ce_sell': ce_sell, 'pe_sell': pe_sell}
return None
def _format_historical_analysis(self, analysis, historical_data):
"""Format historical analysis results"""
result = "πŸ“Š HISTORICAL PERFORMANCE ANALYSIS\n"
result += "=" * 50 + "\n"
result += f"Total Days Analyzed: {analysis['total_days']}\n"
result += f"Profitable Iron Condor Scenarios: {analysis['successful_iron_condors']}\n"
result += f"Success Rate: {(analysis['successful_iron_condors']/analysis['total_days'])*100:.1f}%\n"
result += f"Average Premium: {analysis['avg_premium']:.2f}\n\n"
result += "πŸ’‘ HISTORICAL INSIGHTS:\n"
result += "β€’ Iron Condor strategies show consistent premium collection\n"
result += "β€’ Focus on 200-300 point OTM strikes for optimal risk-reward\n"
result += "β€’ Monitor volatility - high IV periods offer better premiums\n"
result += "β€’ Consider weekly expiries for faster time decay\n"
return result
def get_openai_strategy_recommendation(self, current_data):
"""Get AI-powered strategy recommendations from OpenAI"""
if not self.client.api_key:
return "❌ OpenAI API key not set. Please set OPENAI_API_KEY environment variable."
try:
# Prepare data for AI analysis
analysis_data = self._prepare_data_for_ai(current_data)
prompt = f"""
As an expert options trading strategist, analyze this NIFTY options data and provide specific trading recommendations:
{analysis_data}
Please provide:
1. IRON CONDOR STRATEGY: Specific strike recommendations with risk-reward analysis
2. OPTION SELLING STRATEGIES: Best naked put/call selling opportunities with rationale
3. VOLATILITY ANALYSIS: Current IV levels and implications
4. RISK MANAGEMENT: Specific exit criteria and position sizing
5. MARKET OUTLOOK: Short-term directional bias based on the data
Format the response in a clear, actionable manner for traders.
"""
response = self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are an expert options trading strategist with deep knowledge of NIFTY derivatives. Provide specific, actionable trading recommendations."},
{"role": "user", "content": prompt}
],
max_tokens=1500,
temperature=0.7
)
return response.choices[0].message.content
except Exception as e:
return f"❌ Error getting AI recommendations: {e}"
    def _prepare_data_for_ai(self, current_data):
        """Build a plain-text market summary of *current_data* for the LLM prompt.

        Includes spot, available expiries, ATM/OTM premiums, open interest,
        volume, and a sample of strikes near the money for the nearest expiry.
        Returns the summary string; safe to call with None/empty data.
        """
        if current_data is None or len(current_data) == 0:
            return "No current data available"
        analysis_text = "NIFTY OPTIONS ANALYSIS DATA:\n\n"
        # Current market state; "Unknown" sentinels guard the arithmetic below.
        underlying_price = current_data['UndrlygPric'].iloc[0] if 'UndrlygPric' in current_data.columns else "Unknown"
        trading_date = current_data['TradDt'].iloc[0] if 'TradDt' in current_data.columns else "Unknown"
        analysis_text += f"TRADING DATE: {trading_date}\n"
        analysis_text += f"CURRENT NIFTY SPOT: {underlying_price}\n\n"
        # Expiry information (first five only, to keep the prompt short).
        expiries = sorted(current_data['XpryDt'].unique())
        analysis_text += f"AVAILABLE EXPIRIES: {', '.join(expiries[:5])}\n\n"
        # Detailed analysis is done only for the nearest expiry.
        if expiries:
            nearest_expiry = expiries[0]
            expiry_data = current_data[current_data['XpryDt'] == nearest_expiry]
            ce_data = expiry_data[expiry_data['OptnTp'] == 'CE']
            pe_data = expiry_data[expiry_data['OptnTp'] == 'PE']
            analysis_text += f"DETAILED ANALYSIS FOR EXPIRY: {nearest_expiry}\n"
            analysis_text += "=" * 40 + "\n\n"
            # ATM options: first strike above spot (call) / below spot (put).
            if underlying_price != "Unknown":
                atm_ce_strike = self._find_strike_above(ce_data, underlying_price)
                atm_pe_strike = self._find_strike_below(pe_data, underlying_price)
                if atm_ce_strike and atm_pe_strike:
                    atm_ce_premium = ce_data[ce_data['StrkPric'] == atm_ce_strike]['ClsPric'].iloc[0]
                    atm_pe_premium = pe_data[pe_data['StrkPric'] == atm_pe_strike]['ClsPric'].iloc[0]
                    analysis_text += f"ATM CALL ({atm_ce_strike}): {atm_ce_premium:.2f}\n"
                    analysis_text += f"ATM PUT ({atm_pe_strike}): {atm_pe_premium:.2f}\n"
                    analysis_text += f"PUT-CALL RATIO (ATM): {atm_pe_premium/atm_ce_premium:.2f}\n\n"
            analysis_text += "OTM OPTIONS PREMIUMS:\n"
            # OTM calls roughly 200 and 400 points above spot.
            if underlying_price != "Unknown":
                otm_ce_200 = self._find_strike_above(ce_data, underlying_price + 200)
                otm_ce_400 = self._find_strike_above(ce_data, underlying_price + 400)
                if otm_ce_200:
                    premium_200 = ce_data[ce_data['StrkPric'] == otm_ce_200]['ClsPric'].iloc[0]
                    analysis_text += f"OTM CALL +200 ({otm_ce_200}): {premium_200:.2f}\n"
                if otm_ce_400:
                    premium_400 = ce_data[ce_data['StrkPric'] == otm_ce_400]['ClsPric'].iloc[0]
                    analysis_text += f"OTM CALL +400 ({otm_ce_400}): {premium_400:.2f}\n"
            # OTM puts roughly 200 and 400 points below spot.
            if underlying_price != "Unknown":
                otm_pe_200 = self._find_strike_below(pe_data, underlying_price - 200)
                otm_pe_400 = self._find_strike_below(pe_data, underlying_price - 400)
                if otm_pe_200:
                    premium_200 = pe_data[pe_data['StrkPric'] == otm_pe_200]['ClsPric'].iloc[0]
                    analysis_text += f"OTM PUT -200 ({otm_pe_200}): {premium_200:.2f}\n"
                if otm_pe_400:
                    premium_400 = pe_data[pe_data['StrkPric'] == otm_pe_400]['ClsPric'].iloc[0]
                    analysis_text += f"OTM PUT -400 ({otm_pe_400}): {premium_400:.2f}\n\n"
            # Open-interest totals and OI-based put-call ratio.
            if 'OpnIntrst' in expiry_data.columns:
                total_oi = expiry_data['OpnIntrst'].sum()
                ce_oi = ce_data['OpnIntrst'].sum()
                pe_oi = pe_data['OpnIntrst'].sum()
                analysis_text += "OPEN INTEREST ANALYSIS:\n"
                analysis_text += f"Total OI: {total_oi:,.0f}\n"
                analysis_text += f"Call OI: {ce_oi:,.0f}\n"
                analysis_text += f"Put OI: {pe_oi:,.0f}\n"
                # NOTE(review): divides by ce_oi β€” a zero call OI would raise here.
                analysis_text += f"PUT-CALL RATIO (OI): {pe_oi/ce_oi:.2f}\n\n"
            # Traded volume, when the column exists.
            if 'TtlTradgVol' in expiry_data.columns:
                total_volume = expiry_data['TtlTradgVol'].sum()
                analysis_text += f"TOTAL VOLUME: {total_volume:,.0f}\n\n"
            # Sample strikes and premiums near the money for strategy building.
            analysis_text += "SAMPLE STRIKES & PREMIUMS:\n"
            sample_strikes = sorted(expiry_data['StrkPric'].unique())
            # Show up to 10 strikes within 600 points of spot.
            if underlying_price != "Unknown":
                relevant_strikes = [s for s in sample_strikes if abs(s - underlying_price) <= 600]
                for strike in relevant_strikes[:10]:
                    ce_strike_data = ce_data[ce_data['StrkPric'] == strike]
                    pe_strike_data = pe_data[pe_data['StrkPric'] == strike]
                    ce_premium = ce_strike_data['ClsPric'].iloc[0] if len(ce_strike_data) > 0 else 0
                    pe_premium = pe_strike_data['ClsPric'].iloc[0] if len(pe_strike_data) > 0 else 0
                    distance = strike - underlying_price
                    analysis_text += f"Strike {strike}: CE={ce_premium:.2f} | PE={pe_premium:.2f} | Distance={distance:+.0f}\n"
        return analysis_text
def analyze_option_selling_strategies(self):
"""Analyze best option selling opportunities"""
if self.nifty_data is None:
return "❌ Please load data first"
result = "πŸ’° OPTION SELLING STRATEGIES\n"
result += "=" * 50 + "\n"
# Get current data for analysis
current_data = self.nifty_data
underlying_price = current_data['UndrlygPric'].iloc[0] if 'UndrlygPric' in current_data.columns else None
if not underlying_price:
return "❌ Cannot analyze - No underlying price data"
# 1. Credit Spread Analysis
result += "\n🎯 CREDIT SPREAD OPPORTUNITIES:\n"
credit_spreads = self._find_best_credit_spreads(current_data, underlying_price)
result += credit_spreads
# 2. Naked Selling Opportunities
result += "\nπŸ”₯ HIGH-PROBABILITY NAKED SELLING:\n"
naked_opportunities = self._find_naked_selling_opportunities(current_data, underlying_price)
result += naked_opportunities
# 3. Strangle Opportunities
result += "\n⚑ SHORT STRANGLE SETUPS:\n"
strangle_opportunities = self._find_strangle_opportunities(current_data, underlying_price)
result += strangle_opportunities
return result
def _find_best_credit_spreads(self, data, underlying):
"""Find best credit spread opportunities"""
ce_data = data[data['OptnTp'] == 'CE']
pe_data = data[data['OptnTp'] == 'PE']
result = ""
# Bull Put Spread opportunities
put_strikes = sorted([s for s in pe_data['StrkPric'].unique() if s < underlying])
if len(put_strikes) >= 2:
short_put = put_strikes[-1]
long_put = put_strikes[-2]
short_premium = pe_data[pe_data['StrkPric'] == short_put]['ClsPric'].iloc[0]
long_premium = pe_data[pe_data['StrkPric'] == long_put]['ClsPric'].iloc[0]
net_credit = short_premium - long_premium
max_risk = (short_put - long_put) - net_credit
result += f"β€’ BULL PUT SPREAD: Sell {short_put} PE @ {short_premium:.2f} | Buy {long_put} PE @ {long_premium:.2f}\n"
result += f" Net Credit: {net_credit:.2f} | Max Risk: {max_risk:.2f} | Reward/Risk: {net_credit/max_risk:.2f}:1\n\n"
# Bear Call Spread opportunities
call_strikes = sorted([s for s in ce_data['StrkPric'].unique() if s > underlying])
if len(call_strikes) >= 2:
short_call = call_strikes[0]
long_call = call_strikes[1]
short_premium = ce_data[ce_data['StrkPric'] == short_call]['ClsPric'].iloc[0]
long_premium = ce_data[ce_data['StrkPric'] == long_call]['ClsPric'].iloc[0]
net_credit = short_premium - long_premium
max_risk = (long_call - short_call) - net_credit
result += f"β€’ BEAR CALL SPREAD: Sell {short_call} CE @ {short_premium:.2f} | Buy {long_call} CE @ {long_premium:.2f}\n"
result += f" Net Credit: {net_credit:.2f} | Max Risk: {max_risk:.2f} | Reward/Risk: {net_credit/max_risk:.2f}:1\n"
return result if result else "β€’ No suitable credit spread opportunities found\n"
def _find_naked_selling_opportunities(self, data, underlying):
"""Find naked selling opportunities"""
ce_data = data[data['OptnTp'] == 'CE']
pe_data = data[data['OptnTp'] == 'PE']
result = ""
# Naked Put Selling (high probability)
put_strikes = sorted([s for s in pe_data['StrkPric'].unique() if s < underlying])
if len(put_strikes) >= 1:
best_put = put_strikes[-1] # Closest OTM put
put_premium = pe_data[pe_data['StrkPric'] == best_put]['ClsPric'].iloc[0]
result += f"β€’ NAKED PUT SELL: {best_put} PE @ {put_premium:.2f}\n"
result += f" Strike is {underlying - best_put:.0f} points OTM | Premium: {put_premium:.2f}\n\n"
# Naked Call Selling (high probability)
call_strikes = sorted([s for s in ce_data['StrkPric'].unique() if s > underlying])
if len(call_strikes) >= 1:
best_call = call_strikes[0] # Closest OTM call
call_premium = ce_data[ce_data['StrkPric'] == best_call]['ClsPric'].iloc[0]
result += f"β€’ NAKED CALL SELL: {best_call} CE @ {call_premium:.2f}\n"
result += f" Strike is {best_call - underlying:.0f} points OTM | Premium: {call_premium:.2f}\n"
return result if result else "β€’ No suitable naked selling opportunities found\n"
def _find_strangle_opportunities(self, data, underlying):
"""Find short strangle opportunities"""
ce_data = data[data['OptnTp'] == 'CE']
pe_data = data[data['OptnTp'] == 'PE']
result = ""
call_strikes = sorted([s for s in ce_data['StrkPric'].unique() if s > underlying])
put_strikes = sorted([s for s in pe_data['StrkPric'].unique() if s < underlying])
if len(call_strikes) >= 1 and len(put_strikes) >= 1:
call_strike = call_strikes[0] # Closest OTM call
put_strike = put_strikes[-1] # Closest OTM put
call_premium = ce_data[ce_data['StrkPric'] == call_strike]['ClsPric'].iloc[0]
put_premium = pe_data[pe_data['StrkPric'] == put_strike]['ClsPric'].iloc[0]
total_credit = call_premium + put_premium
result += f"β€’ SHORT STRANGLE: Sell {call_strike} CE @ {call_premium:.2f} + Sell {put_strike} PE @ {put_premium:.2f}\n"
result += f" Total Credit: {total_credit:.2f} | Breakeven: {put_strike - total_credit:.2f} to {call_strike + total_credit:.2f}\n"
result += f" Profit Range: {total_credit * 2:.2f} points\n"
return result if result else "β€’ No suitable strangle opportunities found\n"
def _find_strike_above(self, data, target_price):
"""Find strike price above target price"""
strikes = data['StrkPric'].unique()
above_strikes = [s for s in strikes if s > target_price]
return min(above_strikes) if above_strikes else None
def _find_strike_below(self, data, target_price):
"""Find strike price below target price"""
strikes = data['StrkPric'].unique()
below_strikes = [s for s in strikes if s < target_price]
return max(below_strikes) if below_strikes else None
def suggest_iron_condor(self, risk_level='moderate'):
"""Suggest Iron Condor strategy for NIFTY"""
if self.nifty_data is None:
return "❌ Please load data first"
expiries = sorted(self.nifty_data['XpryDt'].unique())
if not expiries:
return "❌ No expiry dates found in data"
nearest_expiry = expiries[0]
return self._suggest_iron_condor_for_expiry(nearest_expiry, risk_level)
def _suggest_iron_condor_for_expiry(self, expiry_date, risk_level='moderate'):
"""Suggest Iron Condor for specific expiry"""
expiry_data = self.nifty_data[self.nifty_data['XpryDt'] == expiry_date]
if len(expiry_data) == 0:
return f"❌ No data found for expiry {expiry_date}"
underlying_price = expiry_data['UndrlygPric'].iloc[0] if 'UndrlygPric' in expiry_data.columns else None
if underlying_price is None:
return "❌ Cannot analyze - No underlying price data"
ce_data = expiry_data[expiry_data['OptnTp'] == 'CE']
pe_data = expiry_data[expiry_data['OptnTp'] == 'PE']
ce_strikes = sorted(ce_data['StrkPric'].unique())
pe_strikes = sorted(pe_data['StrkPric'].unique())
risk_config = {
'conservative': {'distance': 300, 'spread': 200},
'moderate': {'distance': 200, 'spread': 200},
'aggressive': {'distance': 100, 'spread': 100}
}
config = risk_config[risk_level]
ce_sell_strike = self._find_strike_above(ce_data, underlying_price + config['distance'])
ce_buy_strike = self._find_strike_above(ce_data, ce_sell_strike + config['spread']) if ce_sell_strike else None
pe_sell_strike = self._find_strike_below(pe_data, underlying_price - config['distance'])
pe_buy_strike = self._find_strike_below(pe_data, pe_sell_strike - config['spread']) if pe_sell_strike else None
if not all([ce_sell_strike, ce_buy_strike, pe_sell_strike, pe_buy_strike]):
return "❌ Cannot find suitable strikes for Iron Condor"
def get_premium(data, strike):
strike_data = data[data['StrkPric'] == strike]
return strike_data['ClsPric'].iloc[0] if not strike_data.empty else 0
ce_sell_prem = get_premium(ce_data, ce_sell_strike)
ce_buy_prem = get_premium(ce_data, ce_buy_strike)
pe_sell_prem = get_premium(pe_data, pe_sell_strike)
pe_buy_prem = get_premium(pe_data, pe_buy_strike)
if not all([ce_sell_prem, ce_buy_prem, pe_sell_prem, pe_buy_prem]):
return "❌ Missing premium data"
net_credit = (ce_sell_prem + pe_sell_prem) - (ce_buy_prem + pe_buy_prem)
spread_width = ce_buy_strike - ce_sell_strike
result = f"""
🎯 NIFTY IRON CONDOR - {risk_level.upper()} RISK
πŸ“… Expiry: {expiry_date} | πŸ’° Spot: {underlying_price:.2f}
{'='*50}
POSITION STRUCTURE:
SELL {ce_sell_strike} CE @ {ce_sell_prem:.2f}
BUY {ce_buy_strike} CE @ {ce_buy_prem:.2f}
SELL {pe_sell_strike} PE @ {pe_sell_prem:.2f}
BUY {pe_buy_strike} PE @ {pe_buy_prem:.2f}
STRATEGY METRICS:
Net Credit: {net_credit:.2f}
Max Profit: {net_credit:.2f}
Max Risk: {spread_width - net_credit:.2f}
Risk-Reward: {(spread_width - net_credit)/net_credit:.1f}:1
Breakeven: {pe_sell_strike - net_credit:.2f} to {ce_sell_strike + net_credit:.2f}
"""
return result
def run_ai_analysis(risk_level='moderate', analysis_type='comprehensive'):
    """Run the NIFTY analysis pipeline and return one combined report string.

    analysis_type: 'comprehensive' (default), 'option_selling',
    'historical', or anything else for the basic Iron Condor report.
    """
    analyzer = AIAnalysis()
    sections = ["πŸ€– ENHANCED NIFTY AI ANALYZER", "=" * 50]

    load_result = analyzer.load_data()
    sections.append(load_result)

    # Only run the analyses when the data loaded successfully.
    if "βœ…" in load_result:
        if analysis_type == 'comprehensive':
            sections.append(analyzer.analyze_historical_performance())
            sections.append(analyzer.analyze_option_selling_strategies())
            sections.append(analyzer.suggest_iron_condor(risk_level))
            # AI recommendations are best-effort (they need an API key).
            try:
                ai_recommendation = analyzer.get_openai_strategy_recommendation(analyzer.nifty_data)
                sections.append("🧠 AI-POWERED RECOMMENDATIONS:\n" + ai_recommendation)
            except Exception as e:
                sections.append(f"ℹ️ AI Analysis: {e}")
        elif analysis_type == 'option_selling':
            sections.append(analyzer.analyze_option_selling_strategies())
        elif analysis_type == 'historical':
            sections.append(analyzer.analyze_historical_performance())
        else:  # basic
            sections.append(analyzer.suggest_iron_condor(risk_level))

    return "\n\n".join(sections)