import streamlit as st
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import re
import pickle
# Load word2idx: the token -> integer-index vocabulary saved at training time.
# Index 0 doubles as the unknown-word / padding index (see text_to_sequence).
# NOTE(review): pickle.load executes arbitrary code from the file — only load
# trusted artifacts shipped with this app.
with open('word2idx.pkl', 'rb') as f:
    word2idx = pickle.load(f)
# Clean text function: normalisation applied to raw tweets before tokenisation.
def clean_text(text):
    """Strip emoji, URLs and punctuation from a tweet and lowercase it.

    '#' and '@' markers become spaces (keeping the word that follows them);
    the result contains only lowercase ASCII letters, digits and spaces.
    """
    emoji_re = re.compile(
        "["
        u"\U0001F600-\U0001F64F"   # emoticons
        u"\U0001F300-\U0001F5FF"   # symbols & pictographs
        u"\U0001F680-\U0001F6FF"   # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"   # flags
        u"\U00002702-\U000027B0"   # dingbats
        u"\U000024C2-\U0001F251"   # enclosed characters
        "]+",
        flags=re.UNICODE,
    )
    text = emoji_re.sub('', text)
    text = re.sub(r'https?://\S+|www\.\S+', '', text)
    text = text.replace('#', ' ').replace('@', ' ')
    # Drop anything that is not an ASCII letter, digit or space.
    text = re.sub(r'[^A-Za-z0-9 ]', '', text)
    return text.lower()
# Text to sequence function: map a cleaned tweet to a fixed-length index array.
def text_to_sequence(text, word2idx, maxlen=55):
    """Convert whitespace tokens to vocabulary indices, left-padded to *maxlen*.

    Unknown tokens map to index 0; sequences longer than maxlen are truncated,
    shorter ones are padded on the LEFT with 0 (matching training-time padding).
    Returns a numpy int array of length maxlen.
    """
    indices = [word2idx.get(token, 0) for token in text.split()]
    if len(indices) >= maxlen:
        indices = indices[:maxlen]
    else:
        indices = [0] * (maxlen - len(indices)) + indices
    return np.array(indices)
# Define the BiLSTM class: a binary classifier over frozen pretrained embeddings.
class BiLSTM(nn.Module):
    """Bidirectional LSTM with two fully-connected layers and a sigmoid head.

    The embedding table is initialised from *weights_matrix* and frozen.
    ``forward`` returns the sigmoid probability at the last timestep of each
    sequence, plus the updated LSTM hidden state.
    """

    def __init__(self, weights_matrix, output_size, hidden_dim, hidden_dim2,
                 n_layers, drop_prob=0.5):
        super(BiLSTM, self).__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        # Frozen embedding layer seeded from the pretrained weight matrix.
        vocab_size, emb_dim = weights_matrix.size()
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.embedding.weight.data.copy_(weights_matrix)
        self.embedding.weight.requires_grad = False  # keep vectors fixed
        # Stacked bidirectional LSTM (inter-layer dropout = drop_prob).
        self.lstm = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=drop_prob,
                            bidirectional=True, batch_first=True)
        self.dropout = nn.Dropout(0.3)
        # Classifier head: 2*hidden (both directions) -> hidden_dim2 -> output.
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim2)
        self.fc2 = nn.Linear(hidden_dim2, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, hidden):
        """Run a (batch, seq) LongTensor through the network.

        Returns ``(probs, hidden)`` where probs has shape (batch,) — the
        sigmoid output at the final timestep of each sequence.
        """
        batch_size = x.size(0)
        embedded = self.embedding(x)
        lstm_out, hidden = self.lstm(embedded, hidden)
        # Flatten timesteps so the linear layers see (batch*seq, 2*hidden).
        flat = lstm_out.contiguous().view(-1, self.hidden_dim * 2)
        activations = F.relu(self.fc1(self.dropout(flat)))
        logits = self.fc2(self.dropout(activations))
        probs = self.sigmoid(logits)
        # Back to (batch, seq); keep only the final timestep's probability.
        probs = probs.view(batch_size, -1)
        return probs[:, -1], hidden

    def init_hidden(self, batch_size, train_on_gpu=False):
        """Return a zeroed (h0, c0) pair sized for this network and batch."""
        param = next(self.parameters()).data
        # 2 directions per layer for a bidirectional LSTM.
        shape = (self.n_layers * 2, batch_size, self.hidden_dim)
        h0 = param.new(*shape).zero_()
        c0 = param.new(*shape).zero_()
        if train_on_gpu:
            h0, c0 = h0.cuda(), c0.cuda()
        return (h0, c0)
# Load the embedding weights matrix
# (pretrained word vectors saved as a NumPy array; presumably row i is the
# vector for vocabulary index i in word2idx — verify against training code).
weights_matrix = torch.tensor(np.load('weights_matrix.npy'))
# Instantiate the model
# These hyperparameters must match the ones used when 'state_dict.pt' was
# trained, otherwise load_state_dict below fails on shape mismatches.
output_size = 1
hidden_dim = 128
hidden_dim2 = 64
n_layers = 2
net = BiLSTM(weights_matrix, output_size, hidden_dim, hidden_dim2, n_layers)
# Load the model's state_dict
# NOTE(review): torch.load unpickles the checkpoint — load only trusted files.
net.load_state_dict(torch.load('state_dict.pt', map_location=torch.device('cpu')))
net.eval()
# Streamlit app
def main():
    """Render the Streamlit UI and classify a single tweet on button press."""
    st.title("Disaster Tweet Classifier")
    st.write("Enter a tweet to classify whether it's about a real disaster or not.")
    user_input = st.text_area("Enter Tweet Text:")
    if not st.button("Classify"):
        return
    if not user_input:
        st.warning("Please enter some text to classify.")
        return
    # Preprocess: clean -> index sequence -> (1, maxlen) LongTensor batch.
    seq = text_to_sequence(clean_text(user_input), word2idx)
    input_tensor = torch.from_numpy(seq).unsqueeze(0).type(torch.LongTensor)
    # Fresh zeroed hidden state for a batch of one, detached from any graph.
    hidden = tuple(state.data for state in net.init_hidden(1, train_on_gpu=False))
    # Inference without gradient tracking.
    with torch.no_grad():
        output, hidden = net(input_tensor, hidden)
    prob = output.item()
    pred = int(torch.round(output).item())
    # Display result (threshold at 0.5 via rounding).
    if pred == 1:
        st.success(f"This tweet is about a **real disaster**. (Probability: {prob:.4f})")
    else:
        st.info(f"This tweet is **not about a real disaster**. (Probability: {prob:.4f})")


if __name__ == '__main__':
    main()