File size: 9,280 Bytes
4839ed5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
import os, sys
sys.path.insert(0, os.getcwd())
import pandas as pd
import global_variables as gb
import urlexpander
import tweepy
import requests, json
import re
import global_utils as utils
class PreProcessor():
    """Turn the raw text of a tweet into a more descriptive, expanded text.

    The helpers replace user handles with display names, URLs with the
    titles of the pages they point to, and media links with a "best guess"
    label returned by a local image-search service.  Every helper is
    best-effort: on failure it logs the problem and returns an empty string
    (or the unchanged input) instead of raising.
    """

    # Markers looked for inside the string returned by ``urlexpander.expand``;
    # when one is present the URL is retried with plain ``requests``.
    _EXPAND_ERROR_MARKERS = ("__CLIENT_ERROR__", "__CONNECTIONPOOL_ERROR__")

    # Lower-cased fragments of a page title that mean "this page is missing".
    _MISSING_PAGE_MARKERS = ("page not found", "page 404")

    # Endpoint of the image "best guess" service and how long to wait for it.
    MEDIA_GUESS_SERVER_URL = "http://localhost:5000/search"
    MEDIA_GUESS_TIMEOUT = 60  # seconds; without a timeout the POST can block forever

    # Guesses that are too generic to be worth adding to the text.
    _GENERIC_GUESSES = ("language", "event")

    # A handle needs at least one word character, so a lone "@" is ignored.
    _HANDLE_PATTERN = re.compile(r"@\w+")
    # "pic.twitter" that is not already preceded by a scheme.
    _BARE_PIC_TWITTER = re.compile(r"(?<!://)pic\.twitter")
    # A URL scheme glued to the preceding non-space character.
    _GLUED_URL_SCHEME = re.compile(r"(?<=\S)https?://")

    def __init__(self) -> None:
        self.logger = utils.get_index_preprocessor_logger()
        self.api = self.get_api()

    def expand_url_using_requests(self, url):
        """Follow the redirects of ``url`` with a HEAD request.

        Returns the final URL, or "" when the request fails for any reason.
        """
        try:
            # The context manager closes the session (and its pooled
            # connections) even when the request raises.
            with requests.Session() as session:
                resp = session.head(url, allow_redirects=True, timeout=10)
                return resp.url
        except Exception:
            return ""

    def expand_url(self, shortened_url):
        """Expand a shortened URL, returning "" when it cannot be expanded.

        ``urlexpander.expand`` is tried first.  If its result contains one of
        the error markers, ``expand_url_using_requests`` is used as a
        fallback; if the marker is still present afterwards the failure is
        logged and "" is returned.
        """
        expanded_url = ""
        try:
            expanded_url = urlexpander.expand(shortened_url)
            for marker in self._EXPAND_ERROR_MARKERS:
                if marker not in expanded_url:
                    continue
                # Second attempt with the custom expander.
                expanded_url = self.expand_url_using_requests(shortened_url)
                if marker in expanded_url:
                    self.logger.warning(
                        "%s while expanding url: %s", marker, shortened_url)
                    expanded_url = ""
        except Exception as e:
            self.logger.warning(
                "Cannot expand this url %s for this reason \n %s ",
                shortened_url, e)
            expanded_url = ""
        return expanded_url

    def get_webpage_title(self, expanded_url):
        """Return the title of the page at ``expanded_url``.

        Returns "" when the title is missing, when it looks like a
        "page not found" / "page 404" title (compared case-insensitively),
        or when fetching the metadata fails.
        """
        title = ""
        try:
            meta = urlexpander.html_utils.get_webpage_meta(expanded_url)
            title = meta["title"] or ""
            lowered = title.lower()
            if any(marker in lowered for marker in self._MISSING_PAGE_MARKERS):
                self.logger.warning(
                    "Page not found for this url: %s", expanded_url)
                title = ""
        except Exception as e:
            self.logger.warning(
                "Cannot find the title for this url %s for this reason \n %s ",
                expanded_url, e)
            title = ""
        return title

    def get_api(self,):
        """Build a tweepy API client authenticated with the keys held by ``cf``."""
        # NOTE(review): ``cf`` is not imported in the visible part of this
        # file -- confirm which module provides the credentials, otherwise
        # this raises NameError.
        auth = tweepy.OAuthHandler(cf.consumer_key, cf.consumer_secret)
        auth.set_access_token(cf.access_token, cf.access_token_secret)
        return tweepy.API(auth)

    def get_username_out_of_handle(self, user_handle):
        """Return the display name of the account behind ``user_handle``.

        The handle is lower-cased and stripped of "@" and spaces before the
        lookup.  Returns "" when the lookup fails.
        """
        user_name = ""
        try:
            user_handle = user_handle.lower().replace('@', '').replace(' ', '')
            # ``screen_name`` is passed by keyword: newer tweepy releases no
            # longer accept the handle positionally.
            user = self.api.get_user(screen_name=user_handle)
            user_name = user.name or ""
        except Exception as e:
            self.logger.warning(
                "Cannot get username for this handle %s for this reason \n %s ",
                user_handle, e)
            user_name = ""
        return user_name

    def get_media_guess_from_url(self, image_url):
        """Ask the media-guess service for a label describing ``image_url``.

        Posts ``{"image_url": ...}`` as JSON to ``MEDIA_GUESS_SERVER_URL`` and
        reads ``best_guess`` from the JSON response.  Returns "" when the
        guess is one of the generic words, is empty, or the request fails.
        """
        try:
            response = requests.post(
                self.MEDIA_GUESS_SERVER_URL,
                headers={'Content-type': 'application/json'},
                data=json.dumps({"image_url": image_url}),
                timeout=self.MEDIA_GUESS_TIMEOUT,
            )
            # A null guess is normalised to "" so callers can concatenate it.
            best_guess = response.json()['best_guess'] or ""
            if best_guess in self._GENERIC_GUESSES:
                # A generic word on its own adds nothing to the query.
                best_guess = ""
        except Exception as e:
            self.logger.warning(
                "cannot get the best guess for this image url: %s for this reason \n %s ",
                image_url, e)
            best_guess = ""
        return best_guess

    def get_media_guess(self, tweet):
        """Return the space-separated best guesses for all media of ``tweet``.

        ``tweet`` may be a plain dict or an object exposing the dict as
        ``_json`` (as used in ``get_media_guess_from_tweet_id``).  Each media
        entry is queried through its ``media_url`` (image) first; then every
        entry is queried again through its ``expanded_url`` (video).  Each
        guess is followed by a single space.  Returns "" when the tweet has
        no media.
        """
        # Work on the dict form so that key access behaves the same for both
        # accepted input kinds.
        data = getattr(tweet, "_json", tweet)
        if 'media' not in data.get('entities', {}):
            return ""

        media_items = data.get('extended_entities', {}).get('media', [])
        media_best_guess = ""
        for url_key in ('media_url', 'expanded_url'):
            for media in media_items:
                try:
                    guess = self.get_media_guess_from_url(media[url_key])
                    media_best_guess = media_best_guess + guess + " "
                except Exception:
                    self.logger.exception(
                        "Error: Unable to extract best guess for this tweet : %s",
                        data.get('id_str'))
        return media_best_guess

    def get_media_guess_from_tweet_id(self, tweet_id):
        """Fetch the tweet with ``tweet_id`` and return its media best guesses.

        Returns "" when the tweet cannot be fetched.
        """
        try:
            tweet = self.api.get_status(tweet_id, tweet_mode="extended")
        except Exception:
            self.logger.exception(
                "Error: No data available for specified ID %s ", tweet_id)
            return ""
        # NOTE(review): this dump to the working directory looks like a
        # debugging leftover; kept because other code may read the file.
        with open('tweet.json', 'w', encoding='utf8') as file:
            json.dump(tweet._json, file)
        return self.get_media_guess(tweet)

    def reformat_urls(self, tweet):
        """Separate glued URLs with a space and add https:// to bare pic.twitter links.

        URLs that already start with a scheme or are already preceded by
        whitespace are left untouched.
        """
        tweet = self._BARE_PIC_TWITTER.sub("https://pic.twitter", tweet)
        return self._GLUED_URL_SCHEME.sub(lambda m: " " + m.group(), tweet)

    def remove_handle_from_second_part(self, tweet):
        """Blank out the handles in the text after the first em dash.

        Only the segment between the first and the second "—" is modified;
        each handle there is replaced by a single space.  The input is
        returned unchanged when it has no "—" or cannot be processed.
        """
        try:
            if '—' in tweet:
                parts = tweet.split('—')
                # Rewrite the segment in place instead of str.replace on its
                # text, which would also alter identical text before the dash.
                parts[1] = re.sub(r"@[\w]*", " ", parts[1])
                tweet = '—'.join(parts)
        except Exception:
            self.logger.warning(
                "Cannot remove handle from second part for this tweet: %s",
                tweet)
        return tweet

    def get_tweet_id(self, tweet_url):
        """Extract the tweet id from a status URL.

        Takes what follows the last "status/", drops the query string and
        keeps the first path segment.  Returns "" when ``tweet_url`` cannot
        be processed.
        """
        try:
            after_status = tweet_url.split('status/')[-1]
            without_query = after_status.split('?')[0]
            tweet_id = without_query.split('/')[0]
        except Exception:
            self.logger.warning(
                "Error: cannot get the id out of this url: %s", tweet_url)
            tweet_id = ""
        return tweet_id

    def expand_tweet(self, tweet):
        """Return the text of ``tweet`` with handles, URLs and media expanded.

        ``tweet`` is read as a dict with ``full_text``, ``entities`` and
        ``id_str``.  Steps: (1) reformat URLs, (2) replace each handle with
        the account's display name, (3) replace each URL with the title of
        its page, (4) replace each media link with " , " plus its best guess.
        """
        # 1. Make URLs readable.
        tweet_text = self.reformat_urls(tweet['full_text'])

        # 2. Replace handles with their names.  Substituting match by match
        #    keeps "@bob" from clobbering "@bobby"; the cache avoids looking
        #    up a handle that occurs several times more than once.
        names = {}

        def _handle_to_name(match):
            handle = match.group()
            if handle not in names:
                names[handle] = self.get_username_out_of_handle(handle)
            return names[handle]

        tweet_text = self._HANDLE_PATTERN.sub(_handle_to_name, tweet_text)

        # 3. Replace URLs with their titles.
        for item in tweet["entities"].get('urls', []):
            try:
                url = item['url']
                webpage_title = self.get_webpage_title(item['expanded_url'])
                # Drop trailing punctuation from the title.
                webpage_title = re.sub(r'\W*$', '', webpage_title)
                if webpage_title in tweet_text:  # avoid repeating the title
                    webpage_title = ""
                tweet_text = tweet_text.replace(url, webpage_title)
            except Exception:
                self.logger.exception(
                    "Error: Unable to replace a URL with its title for this tweet : %s ",
                    tweet.get('id_str'))

        if 'media' not in tweet['entities']:  # no images or videos
            return tweet_text

        # 4. Replace images/videos with their best guesses.
        for media in tweet['entities']['media']:
            try:
                url = media["url"]
                media_best_guess = " , " + self.get_media_guess_from_url(
                    media['expanded_url'])
                tweet_text = tweet_text.replace(url, media_best_guess)
            except Exception:
                self.logger.exception(
                    "Error: Unable to extract best guess for this tweet : %s ",
                    tweet.get('id_str'))
        return tweet_text
|