"""LSTM-based limit-up prediction pipeline for Chinese A-share stocks.

Pipeline: fetch historical / real-time / call-auction / sentiment /
valuation data via AKShare, engineer features, tune and train an LSTM
classifier, expose a Gradio-style ``predict`` endpoint, and schedule
twice-daily prediction jobs.

NOTE(review): the original file had lost all of its newlines and was not
valid Python; this revision restores formatting and fixes several scope
and pandas-construction bugs (see inline comments).
"""

import datetime
import threading
import time

import akshare as ak
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import schedule
import tensorflow as tf
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import (
    GridSearchCV,
    KFold,
    cross_val_score,
    train_test_split,
)
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
# NOTE(review): this wrapper was removed in TF >= 2.12 — migrate to the
# `scikeras` package when the TF dependency is upgraded.
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier

# Shared artifacts. The original code read `scaler` and `final_model` as
# globals inside `predict()` without ever defining them at module scope;
# they are declared here and assigned during preprocessing / training.
scaler = None
final_model = None


# Step 1: Data Acquisition and Preprocessing
def get_realtime_stock_data(symbol):
    """Return a one-row DataFrame with the real-time quote for `symbol`.

    The row is indexed by the current timestamp.  NOTE(review): assumes
    `ak.stock_zh_a_spot()` exposes all of the listed Chinese columns
    (e.g. '所属板块', '控股股东') — verify against the installed AKShare
    version.
    """
    realtime_data = ak.stock_zh_a_spot()
    # .copy() so the column assignments below do not hit a filtered view
    # (SettingWithCopyWarning in the original).
    stock_data = realtime_data[realtime_data['代码'] == symbol].copy()
    stock_data = stock_data[['代码', '最新价', '开盘', '最高', '最低', '成交量',
                             '换手率', '所属板块', '股票名称', '控股股东']]
    stock_data.columns = ["Symbol", "Close", "Open", "High", "Low", "Volume",
                          "Turnover", "Sector", "Stock_Name", "Major_Shareholder"]
    stock_data['Date'] = pd.to_datetime(datetime.datetime.now())
    stock_data.set_index('Date', inplace=True)
    return stock_data


def get_stock_data(start_date, end_date):
    """Fetch forward-adjusted (qfq) daily history for sh600000.

    Returns a DataFrame indexed by Date with OHLC, Volume and Turnover.
    """
    stock_data = ak.stock_zh_a_hist(symbol="sh600000", period="daily",
                                    start_date=start_date, end_date=end_date,
                                    adjust="qfq")
    stock_data = stock_data[['日期', '开盘', '收盘', '最高', '最低', '成交量', '换手率']]
    stock_data.columns = ["Date", "Open", "Close", "High", "Low", "Volume", "Turnover"]
    stock_data['Date'] = pd.to_datetime(stock_data['Date'])
    stock_data.set_index('Date', inplace=True)
    return stock_data


def get_auction_data(date):
    """Summarise call-auction (09:15-09:25) tick data for sh600000.

    Returns a dict with the mean auction ``price`` and total ``volume``
    for the given ``date`` (a date/datetime object).
    """
    # Example for fetching auction data (9:15-9:25) on a specific date
    auction_data = ak.stock_zh_a_tick_tx(symbol="sh600000",
                                         trade_date=date.strftime("%Y%m%d"))
    auction_data = auction_data[(auction_data['time'] >= '09:15:00') &
                                (auction_data['time'] <= '09:25:00')]
    auction_data['price'] = auction_data['price'].astype(float)
    auction_data['volume'] = auction_data['volume'].astype(float)
    auction_summary = auction_data[['price', 'volume']].agg(
        {'price': 'mean', 'volume': 'sum'}).to_dict()
    return auction_summary


def get_sentiment_data(start_date, end_date):
    """Return a Date-indexed DataFrame with a daily 'Sentiment' score.

    Example using Baidu News Sentiment Analysis (replace with an actual
    data source as needed); currently fills in random scores in [-1, 1]
    as a placeholder.
    """
    sentiment_data = []
    date_range = pd.date_range(start=start_date, end=end_date)
    for date in date_range:
        # Placeholder for actual sentiment API call
        sentiment_score = np.random.uniform(-1, 1)
        sentiment_data.append({'Date': date, 'Sentiment': sentiment_score})
    sentiment_df = pd.DataFrame(sentiment_data)
    sentiment_df.set_index('Date', inplace=True)
    return sentiment_df


def get_popularity_data():
    """Fetch 东方财富人气指标 (northbound net buy) as a sentiment metric.

    Returns a Date-indexed DataFrame with a single 'Popularity' column.
    """
    popularity_data = ak.stock_em_hsgt_stock_statistics(symbol='沪股通')
    popularity_data = popularity_data[['日期', '北向资金净买额']]
    popularity_data.columns = ['Date', 'Popularity']
    popularity_data['Date'] = pd.to_datetime(popularity_data['Date'])
    popularity_data.set_index('Date', inplace=True)
    return popularity_data


def get_valuation_data():
    """Fetch 市盈率 (PE) and 市净率 (PB) for sh600000 as weak indicators.

    Returns a Date-indexed DataFrame with 'PE_Ratio' and 'PB_Ratio'.
    """
    valuation_data = ak.stock_a_lg_indicator(symbol="sh600000")
    valuation_data = valuation_data[['日期', '市盈率TTM', '市净率']]
    valuation_data.columns = ['Date', 'PE_Ratio', 'PB_Ratio']
    valuation_data['Date'] = pd.to_datetime(valuation_data['Date'])
    valuation_data.set_index('Date', inplace=True)
    return valuation_data


def preprocess_data(data, sentiment_data, popularity_data, valuation_data,
                    auction_data):
    """Merge all feature sources, engineer indicators, and normalise.

    Returns ``(scaled_data, index)`` where ``scaled_data`` is the
    StandardScaler-normalised feature matrix and ``index`` the surviving
    DatetimeIndex.  The fitted scaler is stored in the module-level
    ``scaler`` so `predict()` can reuse it.
    """
    global scaler

    # Merge sentiment, popularity, valuation, and auction data
    data = data.join(sentiment_data, how='left')
    data['Sentiment'] = data['Sentiment'].fillna(0)  # neutral for missing days
    data = data.join(popularity_data, how='left')
    data['Popularity'] = data['Popularity'].fillna(0)  # neutral for missing days
    data = data.join(valuation_data, how='left')
    data['PE_Ratio'] = data['PE_Ratio'].fillna(data['PE_Ratio'].mean())
    data['PB_Ratio'] = data['PB_Ratio'].fillna(data['PB_Ratio'].mean())

    # Add auction data to the main dataframe.
    # FIX(review): the original `pd.DataFrame(auction_data).T` raises
    # "If using all scalar values, you must pass an index" for a flat
    # {'price': ..., 'volume': ...} dict; wrap in a list instead.
    auction_df = pd.DataFrame([auction_data])
    auction_df.columns = ['Auction_Price', 'Auction_Volume']
    data = data.join(auction_df, how='left')
    data['Auction_Price'] = data['Auction_Price'].fillna(data['Open'])
    data['Auction_Volume'] = data['Auction_Volume'].fillna(0)

    # Technical indicators (moving averages, MACD, volume change)
    data['MA5'] = data['Close'].rolling(window=5).mean()
    data['MA15'] = data['Close'].rolling(window=15).mean()
    data['MACD'] = (data['Close'].ewm(span=12, adjust=False).mean() -
                    data['Close'].ewm(span=26, adjust=False).mean())
    data['Signal'] = data['MACD'].ewm(span=9, adjust=False).mean()
    data['Volume_Change'] = data['Volume'].pct_change()

    # Keep only days whose open is within ±3% of the previous close
    data['Prev_Close'] = data['Close'].shift(1)
    data = data[(data['Open'] <= data['Prev_Close'] * 1.03) &
                (data['Open'] >= data['Prev_Close'] * 0.97)]
    data.dropna(inplace=True)

    # Data normalization; keep the fitted scaler for inference
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data)
    return scaled_data, data.index


# Step 2: LSTM Model Definition with Hyperparameter Tuning
def create_lstm_model(learning_rate=0.001, lstm_units=50, dropout_rate=0.2):
    """Build a two-layer LSTM binary classifier.

    NOTE(review): input_shape is hard-coded to (60, 19) — 60 timesteps of
    19 features (including auction data); confirm it matches the output
    of `preprocess_data`.
    """
    model = Sequential()
    model.add(LSTM(lstm_units, return_sequences=True, input_shape=(60, 19)))
    model.add(Dropout(dropout_rate))
    model.add(LSTM(lstm_units, return_sequences=False))
    model.add(Dropout(dropout_rate))
    model.add(Dense(1, activation='sigmoid'))
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model


# Step 3: Cross-Validation and Hyperparameter Tuning
def cross_validate_model(X, y):
    """Print 5-fold cross-validation accuracy for the default LSTM."""
    model = KerasClassifier(build_fn=create_lstm_model, epochs=50,
                            batch_size=32, verbose=0)
    kfold = KFold(n_splits=5, shuffle=True, random_state=42)
    results = cross_val_score(model, X, y, cv=kfold)
    print(f"Cross-validation accuracy: {results.mean():.2f} (+/- {results.std():.2f})")


# Step 4: Model Optimization using Grid Search
def optimize_model(X, y):
    """Grid-search LSTM hyperparameters; return the best parameter dict."""
    model = KerasClassifier(build_fn=create_lstm_model, verbose=0)
    param_grid = {
        'epochs': [50, 100],
        'batch_size': [32, 64],
        'learning_rate': [0.001, 0.005, 0.01],
        'lstm_units': [50, 100],
        'dropout_rate': [0.2, 0.3],
    }
    grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=-1, cv=3)
    grid_result = grid.fit(X, y)
    print(f"Best: {grid_result.best_score_} using {grid_result.best_params_}")
    return grid_result.best_params_


# Step 5: Training, Evaluation, and Backtesting
def train_and_evaluate(X_train, y_train, X_test, y_test, best_params):
    """Train the final model with tuned parameters; return test accuracy.

    The trained model is stored in the module-level ``final_model`` so
    the Gradio `predict()` endpoint can use it (the original read this
    global without it ever being assigned).
    """
    global final_model

    final_model = create_lstm_model(
        learning_rate=best_params['learning_rate'],
        lstm_units=best_params['lstm_units'],
        dropout_rate=best_params['dropout_rate'],
    )
    final_model.fit(X_train, y_train, epochs=best_params['epochs'],
                    batch_size=best_params['batch_size'], validation_split=0.2)

    # Predict and evaluate.
    # FIX(review): Keras returns an (n, 1) float array; threshold it
    # explicitly instead of relying on single-element-array truthiness.
    predictions = final_model.predict(X_test)
    predictions = (np.ravel(predictions) >= 0.5).astype(int)
    accuracy = accuracy_score(y_test, predictions)
    return accuracy


# Step 6: Deploy using Gradio
def predict(input_data):
    """Predict limit-up for one preprocessed input row and format a report.

    Relies on the module-level ``scaler`` and ``final_model`` set by
    `preprocess_data` / `train_and_evaluate`.
    """
    # Assuming the input is preprocessed in the same way as training data
    scaled_input = scaler.transform(input_data)
    prediction = final_model.predict(np.array([scaled_input]))
    # FIX(review): index down to the scalar — `prediction[0]` is still an
    # array for a (1, 1) Keras output.
    result = "Limit-Up" if float(prediction[0][0]) >= 0.5 else "Not Limit-Up"
    # Additional information to display
    stock_info = input_data.iloc[0][['Sector', 'Symbol', 'Stock_Name',
                                     'Major_Shareholder']]
    return (f"Prediction: {result}\nStock Information:\n"
            f"Sector: {stock_info['Sector']}\n"
            f"Symbol: {stock_info['Symbol']}\n"
            f"Stock Name: {stock_info['Stock_Name']}\n"
            f"Major Shareholder: {stock_info['Major_Shareholder']}")


# Step 7: Schedule Automatic Predictions
def job():
    """Scheduled prediction hook (body is still a placeholder)."""
    print("Running scheduled prediction job...")
    # Add the logic to run the prediction here using the trained model


# Run shortly after the call auction closes and before the closing rush.
schedule.every().day.at("09:26").do(job)
schedule.every().day.at("14:50").do(job)


def run_scheduler():
    """Busy-loop that services pending scheduled jobs once per second."""
    while True:
        schedule.run_pending()
        time.sleep(1)


# Start the scheduler in a separate daemon thread so it dies with the process.
scheduler_thread = threading.Thread(target=run_scheduler)
scheduler_thread.daemon = True
scheduler_thread.start()


if __name__ == "__main__":
    # Define dates
    start_date = "19940530"
    end_date = "20240930"

    # Get data
    stock_data = get_stock_data(start_date, end_date)
    sentiment_data = get_sentiment_data(start_date, end_date)
    popularity_data = get_popularity_data()
    valuation_data = get_valuation_data()
    # NOTE(review): the visible script ends here — the auction fetch,
    # preprocessing, tuning and training steps defined above are never
    # invoked; confirm whether the rest of the file was truncated.