import akshare as ak
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, KFold, cross_val_score, GridSearchCV
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from scikeras.wrappers import KerasClassifier  # tensorflow.keras.wrappers.scikit_learn was removed in TF 2.x; pip install scikeras
from tensorflow.keras.optimizers import Adam
import datetime
import matplotlib.pyplot as plt
import requests
import time
import gradio as gr
import schedule
import threading
# Step 1: Data Acquisition and Preprocessing
def get_realtime_stock_data(symbol):
    # Real-time snapshot of A-share quotes via AKShare (Eastmoney feed).
    # NOTE: column names vary across AKShare versions, and sector (所属板块) /
    # controlling shareholder (控股股东) are not part of the spot feed -- fetch
    # them separately if needed. `symbol` is the bare code, e.g. "600000".
    realtime_data = ak.stock_zh_a_spot_em()
    stock_data = realtime_data[realtime_data['代码'] == symbol].copy()
    stock_data = stock_data[['代码', '名称', '最新价', '今开', '最高', '最低', '成交量', '换手率']]
    stock_data.columns = ["Symbol", "Stock_Name", "Close", "Open", "High", "Low", "Volume", "Turnover"]
    stock_data['Date'] = pd.to_datetime(datetime.datetime.now())
    stock_data.set_index('Date', inplace=True)
    return stock_data

def get_stock_data(start_date, end_date):
    # Daily, forward-adjusted history; stock_zh_a_hist expects the bare code
    # without the exchange prefix
    stock_data = ak.stock_zh_a_hist(symbol="600000", period="daily", start_date=start_date, end_date=end_date, adjust="qfq")
    stock_data = stock_data[['日期', '开盘', '收盘', '最高', '最低', '成交量', '换手率']]
    stock_data.columns = ["Date", "Open", "Close", "High", "Low", "Volume", "Turnover"]
    stock_data['Date'] = pd.to_datetime(stock_data['Date'])
    stock_data.set_index('Date', inplace=True)
    return stock_data

def get_auction_data(date):
    # Call-auction window (09:15-09:25) tick data for a specific date.
    # NOTE: Tencent tick data is typically only available for recent sessions,
    # and AKShare returns Chinese column names (成交时间/成交价格/成交量);
    # adjust the rename below to match your installed version.
    auction_data = ak.stock_zh_a_tick_tx(symbol="sh600000", trade_date=date.strftime("%Y%m%d"))
    auction_data = auction_data.rename(columns={'成交时间': 'time', '成交价格': 'price', '成交量': 'volume'})
    auction_data = auction_data[(auction_data['time'] >= '09:15:00') & (auction_data['time'] <= '09:25:00')]
    auction_data['price'] = auction_data['price'].astype(float)
    auction_data['volume'] = auction_data['volume'].astype(float)
    # Summarize the auction as mean price and total volume
    auction_summary = auction_data[['price', 'volume']].agg({'price': 'mean', 'volume': 'sum'}).to_dict()
    return auction_summary

def get_sentiment_data(start_date, end_date):
    # Placeholder sentiment series; swap in a real news-sentiment source
    # (e.g. Baidu news sentiment) when one is available
    sentiment_data = []
    date_range = pd.date_range(start=start_date, end=end_date)
    for date in date_range:
        # Placeholder for an actual sentiment API call
        sentiment_score = np.random.uniform(-1, 1)  # random score in [-1, 1] for illustration
        sentiment_data.append({'Date': date, 'Sentiment': sentiment_score})
    sentiment_df = pd.DataFrame(sentiment_data)
    sentiment_df.set_index('Date', inplace=True)
    return sentiment_df

def get_popularity_data():
    # Northbound (沪股通) net buying used as a market-popularity proxy; note
    # the 东方财富 "人气榜" popularity rank is a different interface.
    # NOTE: newer AKShare versions rename this call (e.g.
    # stock_hsgt_stock_statistics_em); adjust to your installed version.
    popularity_data = ak.stock_em_hsgt_stock_statistics(symbol='沪股通')
    popularity_data = popularity_data[['日期', '北向资金净买额']]
    popularity_data.columns = ['Date', 'Popularity']
    popularity_data['Date'] = pd.to_datetime(popularity_data['Date'])
    popularity_data.set_index('Date', inplace=True)
    return popularity_data

def get_valuation_data():
    # PE (市盈率) and PB (市净率) as weak valuation indicators.
    # NOTE: the Legu indicator interface expects a bare code and returns
    # English column names (trade_date, pe_ttm, pb); newer AKShare versions
    # rename the call to stock_a_indicator_lg -- adjust to your version.
    valuation_data = ak.stock_a_lg_indicator(symbol="600000")
    valuation_data = valuation_data[['trade_date', 'pe_ttm', 'pb']]
    valuation_data.columns = ['Date', 'PE_Ratio', 'PB_Ratio']
    valuation_data['Date'] = pd.to_datetime(valuation_data['Date'])
    valuation_data.set_index('Date', inplace=True)
    return valuation_data

def preprocess_data(data, sentiment_data, popularity_data, valuation_data, auction_data):
    # Merge sentiment, popularity, valuation, and auction data
    data = data.join(sentiment_data, how='left')
    data['Sentiment'] = data['Sentiment'].fillna(0)  # missing sentiment -> neutral (0)
    data = data.join(popularity_data, how='left')
    data['Popularity'] = data['Popularity'].fillna(0)  # missing popularity -> neutral (0)
    data = data.join(valuation_data, how='left')
    data['PE_Ratio'] = data['PE_Ratio'].fillna(data['PE_Ratio'].mean())  # missing PE -> column mean
    data['PB_Ratio'] = data['PB_Ratio'].fillna(data['PB_Ratio'].mean())  # missing PB -> column mean
    # Attach the auction summary; a dict of scalars needs an explicit index,
    # here assumed to describe the most recent session in `data`
    auction_df = pd.DataFrame([auction_data], index=[data.index[-1]])
    auction_df.columns = ['Auction_Price', 'Auction_Volume']
    data = data.join(auction_df, how='left')
    data['Auction_Price'] = data['Auction_Price'].fillna(data['Open'])  # fall back to the opening price
    data['Auction_Volume'] = data['Auction_Volume'].fillna(0)  # missing auction volume -> 0
    # Technical indicators (moving averages, MACD, volume change)
    data['MA5'] = data['Close'].rolling(window=5).mean()
    data['MA15'] = data['Close'].rolling(window=15).mean()
    data['MACD'] = data['Close'].ewm(span=12, adjust=False).mean() - data['Close'].ewm(span=26, adjust=False).mean()
    data['Signal'] = data['MACD'].ewm(span=9, adjust=False).mean()
    data['Volume_Change'] = data['Volume'].pct_change()  # day-over-day change in volume
    # Keep only sessions whose open is within +/-3% of the previous close
    data['Prev_Close'] = data['Close'].shift(1)
    data = data[(data['Open'] <= data['Prev_Close'] * 1.03) & (data['Open'] >= data['Prev_Close'] * 0.97)]
    data.dropna(inplace=True)
    # Normalize features; the fitted scaler is returned so that live
    # inference can reuse the same transform
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data)
    return scaled_data, data.index, scaler
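
# The LSTM below expects fixed-length windows (60 timesteps x 19 features),
# but no window builder exists in this script. A minimal sketch follows;
# `create_sequences` and the >=9.9% close-to-close "limit-up" label are
# assumptions, not part of the original pipeline.
def create_sequences(scaled_data, raw_data, lookback=60):
    X, y = [], []
    close = raw_data['Close'].to_numpy()
    prev_close = raw_data['Prev_Close'].to_numpy()
    for i in range(lookback, len(scaled_data) - 1):
        X.append(scaled_data[i - lookback:i])  # one window of `lookback` rows
        # Label: does the NEXT session close near the +10% main-board limit?
        y.append(1 if close[i + 1] >= prev_close[i + 1] * 1.099 else 0)
    return np.array(X), np.array(y)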

# Step 2: LSTM Model Definition with Hyperparameter Tuning
def create_lstm_model(learning_rate=0.001, lstm_units=50, dropout_rate=0.2):
    model = Sequential()
    # 60 timesteps x 19 features (the feature count includes the auction columns)
    model.add(LSTM(lstm_units, return_sequences=True, input_shape=(60, 19)))
    model.add(Dropout(dropout_rate))
    model.add(LSTM(lstm_units, return_sequences=False))
    model.add(Dropout(dropout_rate))
    model.add(Dense(1, activation='sigmoid'))  # binary limit-up probability
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    return model
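
# Hypothetical smoke test (not in the original): one forward pass on zeros
# confirms the (60, 19) input contract before any expensive tuning.
# _model = create_lstm_model()
# _model.predict(np.zeros((1, 60, 19)))  # -> probabilities of shape (1, 1)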

# Step 3: Cross-Validation and Hyperparameter Tuning
def cross_validate_model(X, y):
    # Wrap the Keras model for scikit-learn (scikeras replaces the removed
    # tf.keras wrapper; depending on versions, 3D sequence input may need
    # extra handling)
    model = KerasClassifier(model=create_lstm_model, epochs=50, batch_size=32, verbose=0)
    # NOTE: shuffled KFold leaks future information on time-ordered data;
    # sklearn's TimeSeriesSplit gives a more realistic estimate
    kfold = KFold(n_splits=5, shuffle=True, random_state=42)
    results = cross_val_score(model, X, y, cv=kfold)
    print(f"Cross-validation accuracy: {results.mean():.2f} (+/- {results.std():.2f})")

# Step 4: Model Optimization using Grid Search
def optimize_model(X, y):
    model = KerasClassifier(model=create_lstm_model, verbose=0)
    # scikeras routes arguments of create_lstm_model via the "model__" prefix
    param_grid = {
        'epochs': [50, 100],
        'batch_size': [32, 64],
        'model__learning_rate': [0.001, 0.005, 0.01],
        'model__lstm_units': [50, 100],
        'model__dropout_rate': [0.2, 0.3]
    }
    grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=-1, cv=3)
    grid_result = grid.fit(X, y)
    print(f"Best: {grid_result.best_score_} using {grid_result.best_params_}")
    return grid_result.best_params_
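
# Hypothetical usage (names illustrative): run the grid once, then pass the
# winning configuration straight into the final training in Step 5.
# best_params = optimize_model(X_train, y_train)
# final_model, acc = train_and_evaluate(X_train, y_train, X_test, y_test, best_params)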

# Step 5: Training, Evaluation, and Backtesting
def train_and_evaluate(X_train, y_train, X_test, y_test, best_params):
    # Train the final model with the tuned parameters (keys carry the
    # scikeras "model__" prefix used in the grid above)
    final_model = create_lstm_model(
        learning_rate=best_params['model__learning_rate'],
        lstm_units=best_params['model__lstm_units'],
        dropout_rate=best_params['model__dropout_rate']
    )
    final_model.fit(X_train, y_train, epochs=best_params['epochs'], batch_size=best_params['batch_size'], validation_split=0.2)
    # Predict probabilities and threshold at 0.5
    predictions = (final_model.predict(X_test) >= 0.5).astype(int).ravel()
    accuracy = accuracy_score(y_test, predictions)
    # Return the trained model as well so deployment (Step 6) can reuse it
    return final_model, accuracy
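
# Step 5 mentions backtesting, but none is implemented above. A minimal
# sketch under a simple assumption: when the model predicts limit-up, buy at
# that session's open and exit at its close. `backtest` and the row-level
# alignment of `predictions` with `test_df` are hypothetical.
def backtest(predictions, test_df):
    trade_returns = []
    for pred, (_, day) in zip(predictions, test_df.iterrows()):
        if pred == 1:
            trade_returns.append(day['Close'] / day['Open'] - 1.0)  # intraday return
    # Average per-trade return; 0.0 if the strategy never trades
    return float(np.mean(trade_returns)) if trade_returns else 0.0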

# Step 6: Deploy using Gradio
def predict(input_data):
    # Assumes `scaler` and `final_model` exist as globals from training and
    # that input_data carries the same feature layout used for training
    scaled_input = scaler.transform(input_data)
    prediction = final_model.predict(np.array([scaled_input]))  # batch of one sequence
    result = "Limit-Up" if prediction[0][0] >= 0.5 else "Not Limit-Up"
    # Show whichever descriptive fields are available on the input row
    row = input_data.iloc[0]
    info = {field: row.get(field, 'N/A') for field in ['Sector', 'Symbol', 'Stock_Name', 'Major_Shareholder']}
    return (f"Prediction: {result}\nStock Information:\n"
            f"Sector: {info['Sector']}\nSymbol: {info['Symbol']}\n"
            f"Stock Name: {info['Stock_Name']}\nMajor Shareholder: {info['Major_Shareholder']}")
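
# gradio is imported but never wired up; a minimal sketch (assuming `predict`
# receives a DataFrame prepared exactly like the training features):
# demo = gr.Interface(fn=predict, inputs=gr.Dataframe(), outputs="text")
# demo.launch()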

# Step 7: Schedule Automatic Predictions
def job():
    print("Running scheduled prediction job...")
    # Add the logic to run the prediction here using the trained model

# 09:26 is just after the call auction closes; 14:50 is shortly before the close
schedule.every().day.at("09:26").do(job)
schedule.every().day.at("14:50").do(job)

def run_scheduler():
    while True:
        schedule.run_pending()
        time.sleep(1)

# Start the scheduler in a daemon thread so it does not block the main program
scheduler_thread = threading.Thread(target=run_scheduler)
scheduler_thread.daemon = True
scheduler_thread.start()

if __name__ == "__main__":
    # Define the historical training window
    start_date = "19940530"
    end_date = "20240930"
    # Fetch the raw feature sources
    stock_data = get_stock_data(start_date, end_date)
    sentiment_data = get_sentiment_data(start_date, end_date)
    popularity_data = get_popularity_data()
    valuation_data = get_valuation_data()