# limit_up / app.py
# astacn's picture
# Update app.py
# 62b10b5 verified
import akshare as ak
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, KFold, cross_val_score, GridSearchCV
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from tensorflow.keras.optimizers import Adam
import datetime
import matplotlib.pyplot as plt
import requests
import time
import gradio as gr
import schedule
import threading
# Step 1: Data Acquisition and Preprocessing
def get_realtime_stock_data(symbol):
    """Return the current spot quote row(s) for ``symbol``.

    Fetches the spot table with ``ak.stock_zh_a_spot()``, keeps the rows whose
    '代码' column equals ``symbol``, renames the selected columns to the English
    names used by the rest of this file and indexes the result by the time of
    the call.

    Args:
        symbol: Value compared for equality against the '代码' column.

    Returns:
        A DataFrame indexed by 'Date' with the columns Symbol, Close, Open,
        High, Low, Volume, Turnover, Sector, Stock_Name and Major_Shareholder.
        The frame is empty when no row matches.

    Raises:
        KeyError: If the spot table lacks any of the selected source columns.
    """
    # Source column -> pipeline column; the order here fixes the output order.
    # NOTE(review): confirm the installed akshare spot table really exposes
    # all ten of these source columns.
    column_map = {
        '代码': 'Symbol',
        '最新价': 'Close',
        '开盘': 'Open',
        '最高': 'High',
        '最低': 'Low',
        '成交量': 'Volume',
        '换手率': 'Turnover',
        '所属板块': 'Sector',
        '股票名称': 'Stock_Name',
        '控股股东': 'Major_Shareholder',
    }
    # Fetching real-time stock data using AKshare
    realtime_data = ak.stock_zh_a_spot()
    is_requested = realtime_data['代码'] == symbol
    # Work on an explicit copy so adding 'Date' below is not a chained
    # assignment on a slice of the full spot table.
    stock_data = realtime_data.loc[is_requested, list(column_map)].copy()
    stock_data = stock_data.rename(columns=column_map)
    stock_data['Date'] = pd.to_datetime(datetime.datetime.now())
    return stock_data.set_index('Date')
def get_stock_data(start_date, end_date, symbol="sh600000"):
    """Return forward-adjusted daily history for one stock, indexed by date.

    Calls ``ak.stock_zh_a_hist`` with ``period="daily"`` and ``adjust="qfq"``,
    keeps seven columns and renames them to English.

    Args:
        start_date: Passed through to akshare as ``start_date``.
        end_date: Passed through to akshare as ``end_date``.
        symbol: Passed through to akshare as ``symbol``. Defaults to the value
            that used to be hard-coded here.
            NOTE(review): akshare's documentation shows bare six-digit codes
            for this endpoint - confirm that the "sh" prefix is accepted.

    Returns:
        A DataFrame indexed by 'Date' (datetime) with the columns Open, Close,
        High, Low, Volume and Turnover.

    Raises:
        KeyError: If the akshare result lacks any of the selected columns.
    """
    # Source column -> pipeline column; the order here fixes the output order.
    column_map = {
        '日期': 'Date',
        '开盘': 'Open',
        '收盘': 'Close',
        '最高': 'High',
        '最低': 'Low',
        '成交量': 'Volume',
        '换手率': 'Turnover',
    }
    history = ak.stock_zh_a_hist(
        symbol=symbol,
        period="daily",
        start_date=start_date,
        end_date=end_date,
        adjust="qfq",
    )
    # rename() hands back a new frame, so the assignment below does not write
    # into a slice of the raw akshare result.
    stock_data = history[list(column_map)].rename(columns=column_map)
    stock_data['Date'] = pd.to_datetime(stock_data['Date'])
    return stock_data.set_index('Date')
def get_auction_data(date, symbol="sh600000"):
    """Summarise the 09:15-09:25 ticks of one trading day.

    Args:
        date: Object with ``strftime``; formatted as ``%Y%m%d`` for akshare.
        symbol: Passed through to ``ak.stock_zh_a_tick_tx``. Defaults to the
            value that used to be hard-coded here.

    Returns:
        ``{'price': <mean tick price>, 'volume': <summed tick volume>}`` over
        the ticks whose 'time' lies in the window (both ends inclusive). With
        no tick in the window the mean is NaN and the sum is 0.
    """
    # Example for fetching auction data (9:15-9:25) on a specific date
    ticks = ak.stock_zh_a_tick_tx(symbol=symbol, trade_date=date.strftime("%Y%m%d"))
    # assumes 'time' holds zero-padded HH:MM:SS strings, so lexical order
    # equals chronological order - TODO confirm against the akshare output
    in_window = ticks['time'].between('09:15:00', '09:25:00')
    # Select-and-convert builds a new frame instead of assigning converted
    # columns back into a filtered slice.
    auction = ticks.loc[in_window, ['price', 'volume']].astype(float)
    return {
        'price': auction['price'].mean(),
        'volume': auction['volume'].sum(),
    }
def get_sentiment_data(start_date, end_date):
    """Return placeholder daily sentiment scores for a date range.

    The scores are random draws from ``np.random.uniform(-1, 1)`` and stand in
    for a real sentiment source (replace with an actual API call as needed).
    They come from NumPy's global random state, one draw per calendar day in
    order.

    Args:
        start_date: First day of the range, anything ``pd.date_range`` accepts.
        end_date: Last day of the range (inclusive).

    Returns:
        A DataFrame indexed by 'Date' with a single float 'Sentiment' column.
        An empty range yields an empty frame with the same schema.
    """
    dates = pd.date_range(start=start_date, end=end_date, name='Date')
    # Placeholder for actual sentiment API call: one score per day.
    scores = np.random.uniform(-1, 1, size=len(dates))
    # Building the frame from the index directly keeps the 'Date'/'Sentiment'
    # schema even when the range is empty (set_index on an empty frame raised).
    return pd.DataFrame({'Sentiment': scores}, index=dates)
def get_popularity_data():
    """Return the daily series used as the 'Popularity' feature.

    The author's note describes this as an East Money popularity indicator
    used as an additional sentiment metric. What the code reads is the
    '北向资金净买额' column of ``ak.stock_em_hsgt_stock_statistics`` for
    '沪股通'.
    NOTE(review): confirm that this column is the intended popularity proxy.

    Returns:
        A DataFrame indexed by 'Date' (datetime) with one 'Popularity' column.

    Raises:
        KeyError: If the akshare result lacks '日期' or '北向资金净买额'.
    """
    column_map = {'日期': 'Date', '北向资金净买额': 'Popularity'}
    statistics = ak.stock_em_hsgt_stock_statistics(symbol='沪股通')
    # rename() returns a new frame, so the date conversion below does not
    # write into a slice of the raw akshare result.
    popularity_data = statistics[list(column_map)].rename(columns=column_map)
    popularity_data['Date'] = pd.to_datetime(popularity_data['Date'])
    return popularity_data.set_index('Date')
def get_valuation_data(symbol="sh600000"):
    """Return daily PE (TTM) and PB ratios, used as weak indicators.

    Args:
        symbol: Passed through to ``ak.stock_a_lg_indicator``. Defaults to the
            value that used to be hard-coded here.

    Returns:
        A DataFrame indexed by 'Date' (datetime) with the columns 'PE_Ratio'
        (from '市盈率TTM') and 'PB_Ratio' (from '市净率').

    Raises:
        KeyError: If the akshare result lacks any of the selected columns.
    """
    column_map = {'日期': 'Date', '市盈率TTM': 'PE_Ratio', '市净率': 'PB_Ratio'}
    indicators = ak.stock_a_lg_indicator(symbol=symbol)
    # rename() returns a new frame, so the date conversion below does not
    # write into a slice of the raw akshare result.
    valuation_data = indicators[list(column_map)].rename(columns=column_map)
    valuation_data['Date'] = pd.to_datetime(valuation_data['Date'])
    return valuation_data.set_index('Date')
def preprocess_data(data, sentiment_data, popularity_data, valuation_data, auction_data):
    """Merge all feature sources, add indicators, filter rows and standardise.

    Args:
        data: Price history indexed like the other frames; must provide the
            'Open', 'Close' and 'Volume' columns.
        sentiment_data: Frame with a 'Sentiment' column, joined on the index.
        popularity_data: Frame with a 'Popularity' column, joined on the index.
        valuation_data: Frame with 'PE_Ratio' and 'PB_Ratio' columns.
        auction_data: Mapping (or frame) that ``pd.DataFrame(auction_data).T``
            turns into one row per row label of ``data`` with two columns, in
            the order (price, volume). ``None`` or an empty value means no
            auction data is available.

    Returns:
        ``(scaled_data, index)``: the ``StandardScaler.fit_transform`` output
        for the surviving rows, and those rows' index.

    Raises:
        ValueError: If ``auction_data`` cannot be turned into a frame, or if
            no row survives filtering.

    NOTE(review): the fitted scaler is local and discarded, while ``predict``
    in this file reads a module-level ``scaler`` - confirm where that comes
    from.
    """
    # Merge sentiment, popularity and valuation data.
    data = data.join(sentiment_data, how='left')
    data['Sentiment'] = data['Sentiment'].fillna(0)  # missing -> neutral (0)
    data = data.join(popularity_data, how='left')
    data['Popularity'] = data['Popularity'].fillna(0)  # missing -> neutral (0)
    data = data.join(valuation_data, how='left')
    data['PE_Ratio'] = data['PE_Ratio'].fillna(data['PE_Ratio'].mean())  # missing -> average
    data['PB_Ratio'] = data['PB_Ratio'].fillna(data['PB_Ratio'].mean())  # missing -> average

    # Add auction data to the main dataframe.
    if auction_data is None or len(auction_data) == 0:
        # Nothing to join: create all-missing columns so the fallbacks below
        # apply instead of failing on the positional rename.
        data['Auction_Price'] = np.nan
        data['Auction_Volume'] = np.nan
    else:
        try:
            auction_df = pd.DataFrame(auction_data).T
        except ValueError as err:
            # A single flat {'price': x, 'volume': y} summary lands here: it
            # carries no row label to join on.
            raise ValueError(
                "auction_data must map each row label of `data` to a "
                "(price, volume) summary"
            ) from err
        auction_df.columns = ['Auction_Price', 'Auction_Volume']
        data = data.join(auction_df, how='left')
    data['Auction_Price'] = data['Auction_Price'].fillna(data['Open'])  # missing -> opening price
    data['Auction_Volume'] = data['Auction_Volume'].fillna(0)  # missing -> 0

    # Calculate technical indicators (moving averages, MACD, etc.).
    close = data['Close']
    data['MA5'] = close.rolling(window=5).mean()
    data['MA15'] = close.rolling(window=15).mean()
    data['MACD'] = close.ewm(span=12, adjust=False).mean() - close.ewm(span=26, adjust=False).mean()
    data['Signal'] = data['MACD'].ewm(span=9, adjust=False).mean()
    # Percentage change in volume. A zero previous volume yields +/-inf, which
    # dropna() would keep and StandardScaler would reject, so mark it missing.
    data['Volume_Change'] = data['Volume'].pct_change().replace([np.inf, -np.inf], np.nan)

    # Keep only rows that opened within +/-3% of the previous close.
    data['Prev_Close'] = close.shift(1)
    within_band = data['Open'].between(data['Prev_Close'] * 0.97, data['Prev_Close'] * 1.03)
    # Non-inplace dropna avoids mutating a filtered slice.
    data = data[within_band].dropna()
    if data.empty:
        raise ValueError("no rows left to scale after filtering and dropping missing values")

    # Data normalization
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data)
    return scaled_data, data.index
# Step 2: LSTM Model Definition with Hyperparameter Tuning
def create_lstm_model(learning_rate=0.001, lstm_units=50, dropout_rate=0.2,
                      timesteps=60, n_features=19):
    """Build and compile the two-layer LSTM binary classifier.

    Args:
        learning_rate: Learning rate handed to the Adam optimizer.
        lstm_units: Units in each of the two LSTM layers.
        dropout_rate: Rate of the Dropout layer that follows each LSTM layer.
        timesteps: Length of each input sequence. Defaults to the previously
            hard-coded 60.
        n_features: Features per time step. Defaults to the previously
            hard-coded 19 ("adjusted to include auction data").
            NOTE(review): with the loaders in this file, preprocess_data
            appears to emit 18 columns - confirm the expected count.

    Returns:
        A compiled ``Sequential`` model with a single sigmoid output, using
        binary cross-entropy loss and the accuracy metric.
    """
    model = Sequential()
    model.add(LSTM(lstm_units, return_sequences=True, input_shape=(timesteps, n_features)))
    model.add(Dropout(dropout_rate))
    # Second LSTM returns only its final state, feeding the dense head.
    model.add(LSTM(lstm_units, return_sequences=False))
    model.add(Dropout(dropout_rate))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return model
# Step 3: Cross-Validation and Hyperparameter Tuning
def cross_validate_model(X, y):
    """Run 5-fold cross-validation of the default LSTM and report accuracy.

    Args:
        X: Feature array handed to ``cross_val_score``.
        y: Target array handed to ``cross_val_score``.

    Returns:
        The per-fold scores from ``cross_val_score``. Earlier versions only
        printed the summary and returned nothing.

    NOTE(review): ``tensorflow.keras.wrappers.scikit_learn`` is not shipped by
    recent TensorFlow releases - confirm the pinned version.
    NOTE(review): ``shuffle=True`` mixes samples across time; if X holds
    time-ordered windows an ordered split may be more appropriate - confirm.
    """
    # Create the KerasClassifier wrapper
    model = KerasClassifier(build_fn=create_lstm_model, epochs=50, batch_size=32, verbose=0)
    # Define the KFold cross-validator; the fixed seed keeps folds reproducible
    kfold = KFold(n_splits=5, shuffle=True, random_state=42)
    # Perform cross-validation
    results = cross_val_score(model, X, y, cv=kfold)
    print(f"Cross-validation accuracy: {results.mean():.2f} (+/- {results.std():.2f})")
    return results
# Step 4: Model Optimization using Grid Search
def optimize_model(X, y, param_grid=None, cv=3, n_jobs=-1):
    """Grid-search the LSTM hyperparameters and return the best combination.

    Args:
        X: Feature array handed to ``GridSearchCV.fit``.
        y: Target array handed to ``GridSearchCV.fit``.
        param_grid: Grid to search. ``None`` selects the built-in grid over
            epochs, batch_size, learning_rate, lstm_units and dropout_rate.
        cv: Cross-validation setting for ``GridSearchCV`` (default 3).
        n_jobs: Parallelism for ``GridSearchCV`` (default -1, as before).
            NOTE(review): parallel workers with Keras models are a common
            source of hangs - try ``n_jobs=1`` if the search stalls.

    Returns:
        ``grid_result.best_params_``, a dict keyed like ``param_grid``.
    """
    if param_grid is None:
        param_grid = {
            'epochs': [50, 100],
            'batch_size': [32, 64],
            'learning_rate': [0.001, 0.005, 0.01],
            'lstm_units': [50, 100],
            'dropout_rate': [0.2, 0.3],
        }
    model = KerasClassifier(build_fn=create_lstm_model, verbose=0)
    grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=n_jobs, cv=cv)
    grid_result = grid.fit(X, y)
    print(f"Best: {grid_result.best_score_} using {grid_result.best_params_}")
    return grid_result.best_params_
# Step 5: Training, Evaluation, and Backtesting
def train_and_evaluate(X_train, y_train, X_test, y_test, best_params):
    """Train the LSTM with tuned hyperparameters and score it on a test set.

    Args:
        X_train: Training features handed to ``fit``.
        y_train: Training targets handed to ``fit``.
        X_test: Test features handed to ``predict``.
        y_test: Test targets compared against the thresholded predictions.
        best_params: Dict with the keys 'learning_rate', 'lstm_units',
            'dropout_rate', 'epochs' and 'batch_size'.

    Returns:
        The accuracy of the 0.5-thresholded predictions on the test set.

    Raises:
        KeyError: If ``best_params`` lacks one of the keys listed above.

    NOTE(review): the trained model is local to this function, while
    ``predict`` in this file reads a module-level ``final_model`` - confirm
    where that comes from.
    """
    # Train final model with tuned parameters
    final_model = create_lstm_model(
        learning_rate=best_params['learning_rate'],
        lstm_units=best_params['lstm_units'],
        dropout_rate=best_params['dropout_rate'],
    )
    final_model.fit(
        X_train,
        y_train,
        epochs=best_params['epochs'],
        batch_size=best_params['batch_size'],
        validation_split=0.2,
    )
    # Predict and evaluate. Flatten the model output and threshold it in one
    # vectorized step instead of testing one-element arrays in a Python loop.
    probabilities = np.ravel(final_model.predict(X_test))
    predictions = (probabilities >= 0.5).astype(int)
    return accuracy_score(y_test, predictions)
# Step 6: Deploy using Gradio
def predict(input_data):
    """Classify one stock as "Limit-Up" / "Not Limit-Up" and describe it.

    Relies on the module-level names ``scaler`` and ``final_model``.

    Args:
        input_data: DataFrame holding the descriptive columns 'Sector',
            'Symbol', 'Stock_Name' and 'Major_Shareholder' next to the feature
            columns. The feature columns are assumed to be preprocessed the
            same way as the training data - TODO confirm that their layout
            matches what ``scaler`` and ``final_model`` were fitted on.

    Returns:
        A multi-line string with the prediction and the stock information
        taken from the first row.

    Raises:
        KeyError: If any of the four descriptive columns is missing.
    """
    info_columns = ['Sector', 'Symbol', 'Stock_Name', 'Major_Shareholder']
    # Additional information to display
    stock_info = input_data.iloc[0][info_columns]
    # The descriptive columns are text; scale only the remaining feature
    # columns rather than handing strings to the scaler.
    features = input_data.drop(columns=info_columns)
    scaled_input = scaler.transform(features)
    # Wrap in a batch of one: (1, rows, features).
    prediction = final_model.predict(np.array([scaled_input]))
    # Reduce the model output to a plain float before comparing.
    probability = float(np.ravel(prediction)[0])
    result = "Limit-Up" if probability >= 0.5 else "Not Limit-Up"
    return f"Prediction: {result}\nStock Information:\nSector: {stock_info['Sector']}\nSymbol: {stock_info['Symbol']}\nStock Name: {stock_info['Stock_Name']}\nMajor Shareholder: {stock_info['Major_Shareholder']}"
# Step 7: Schedule Automatic Predictions
def job():
    """Scheduled prediction hook; currently it only announces that it ran."""
    print("Running scheduled prediction job...")
    # TODO: add the logic to run the prediction here using the trained model.
# Register the daily runs: 09:26 is one minute after the 9:15-9:25 auction
# window read by get_auction_data; 14:50 is presumably shortly before the
# close - confirm the intended trading-session times.
schedule.every().day.at("09:26").do(job)
schedule.every().day.at("14:50").do(job)


def run_scheduler():
    """Poll the ``schedule`` registry once per second, forever."""
    while True:
        schedule.run_pending()
        time.sleep(1)


# Start the scheduler in a separate thread. daemon=True means the polling
# loop does not keep the interpreter alive on exit.
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
if __name__ == "__main__":
# Define Dates
start_date = "19940530"
end_date = "20240930"
# Get Data
stock_data = get_stock_data(start_date, end_date)
sentiment_data = get_sentiment_data(start_date, end_date)
popularity_data = get_popularity_data()
valuation_data = get_valuation_data()